package com.juicefs;

import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Constructor;
import java.net.URI;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/juicefs/ReplicateFileSystem.class */
public class ReplicateFileSystem extends FilterFileSystem {
    /** Configuration key holding the URI of the replica ("dst") filesystem; read in {@code initialize}. */
    public static final String DST_FS_KEY = "juicefs.replicate.dst-fs";
    /**
     * Configuration key holding the URI of the primary ("src") filesystem. {@code initialize} only
     * sets up a replica when this URI has the same scheme and authority as the URI being initialized.
     */
    public static final String SRC_FS_KEY = "juicefs.replicate.src-fs";
    /** Configuration key for the replica timeout; {@code initialize} multiplies the value by 1000 to get milliseconds. */
    public static final String REPLICATION_TIMEOUT_KEY = "juicefs.replicate.timeout";
    /** Default value for {@link #REPLICATION_TIMEOUT_KEY}. */
    public static final long REPLICATION_TIMEOUT = 10;
    /** Configuration key for the number of threads in the replica write pool. */
    public static final String REPLICATION_WRITE_POOL_THREADS_KEY = "juicefs.replicate.threads";
    /** Configuration key for the capacity of the replica write pool's task queue. */
    public static final String REPLICATION_WRITE_POOL_QUEUE_SIZE_KEY = "juicefs.replicate.queue-size";
    /** Replica filesystem; {@code null} means nothing is mirrored (see {@code writable()}). */
    private FileSystem dstFs;
    /** Pool that runs replica-side hflush/hsync/close tasks for replicated output streams. */
    private ThreadPoolExecutor dataPool;
    /** How long a replicated stream waits for a replica-side task, in milliseconds. */
    private long timeout;
    /** Host part of the replica URI, or the empty string when the URI has no host. */
    private String name;
    /**
     * When {@code true}, {@code writable()} returns {@code false} and operations are not mirrored.
     * Volatile because it is also written by the background refresh task started in {@code initialize}.
     */
    public volatile boolean writeClose;
    /** Single-threaded scheduler that periodically refreshes {@link #writeClose}. */
    private ScheduledExecutorService writeSwitchSchedule;
    /** Reflectively resolved constructor of {@code ReplicateOutputStreamWithStreamCapabilities}. */
    private Constructor<?> constructor;
    /** Set in {@code initialize} when {@code org.apache.hadoop.fs.StreamCapabilities} can be loaded. */
    private boolean withStreamCapability;
    private static final Logger LOG = LoggerFactory.getLogger(ReplicateFileSystem.class);
    /** Default for {@link #REPLICATION_WRITE_POOL_THREADS_KEY}. NOTE(review): public and non-final, so it can be reassigned at runtime. */
    public static int REPLICATION_WRITE_POOL_THREADS = 10;
    /** Default for {@link #REPLICATION_WRITE_POOL_QUEUE_SIZE_KEY}. NOTE(review): public and non-final, so it can be reassigned at runtime. */
    public static int REPLICATION_WRITE_POOL_QUEUE_SIZE = 100;
    /** Marker file on the primary filesystem; while it exists, {@link #writeClose} is set to {@code true}. */
    public static final Path WRITE_SWITCH_FILE = new Path("/NO_REPLICATE");

    /* loaded from: input_file:com/juicefs/ReplicateFileSystem$DeleteFailFilePolicy.class */
    /**
     * Rejection handler for the replica write pool. When a {@link TaskItem} cannot be accepted,
     * the replica copy of its file is deleted (non-recursively) and the task is cancelled.
     * Runnables of any other type are ignored.
     */
    public static class DeleteFailFilePolicy implements RejectedExecutionHandler {
        @Override // java.util.concurrent.RejectedExecutionHandler
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
            if (!(runnable instanceof TaskItem)) {
                return;
            }
            TaskItem rejected = (TaskItem) runnable;
            try {
                boolean removed = rejected.dstFs.delete(rejected.p, false);
                if (removed) {
                    ReplicateFileSystem.LOG.warn("{} was discarded because dst filesystem is slow now", rejected.p);
                }
            } catch (Throwable deleteError) {
                // Best effort: a failed delete must not prevent the cancellation below.
                ReplicateFileSystem.LOG.warn(String.format("dst filesystem delete %s failed", rejected.p), deleteError);
            }
            // Cancelling lets the submitting stream see isCancelled() and skip waiting on the task.
            rejected.cancel(true);
        }
    }

    /* loaded from: input_file:com/juicefs/ReplicateFileSystem$ReplicateOutputStream.class */
    public static class ReplicateOutputStream extends OutputStream implements Syncable {
        public OutputStream ou;
        public OutputStream dou;
        volatile boolean failed = false;
        ExecutorService pool;
        long timeout;
        Path p;
        FileSystem dstFs;

        public ReplicateOutputStream(OutputStream outputStream, OutputStream outputStream2, ExecutorService executorService, long j, Path path, FileSystem fileSystem) {
            this.ou = outputStream;
            this.dou = outputStream2;
            this.pool = executorService;
            this.timeout = j;
            this.p = path;
            this.dstFs = fileSystem;
        }

        @Override // java.io.OutputStream
        public void write(int i) throws IOException {
            this.ou.write(i);
            try {
                if (this.failed) {
                    return;
                }
                this.dou.write(i);
            } catch (Throwable th) {
                ReplicateFileSystem.LOG.warn("dst filesystem write failed", th);
                fail();
            }
        }

        @Override // java.io.OutputStream
        public void write(byte[] bArr, int i, int i2) throws IOException {
            this.ou.write(bArr, i, i2);
            try {
                if (this.failed) {
                    return;
                }
                this.dou.write(bArr, i, i2);
            } catch (Throwable th) {
                ReplicateFileSystem.LOG.warn("dst filesystem write failed", th);
                fail();
            }
        }

        public void hflush() throws IOException {
            TaskItem taskItem = new TaskItem(() -> {
                if (this.failed) {
                    return false;
                }
                if (this.dou instanceof Syncable) {
                    this.dou.hflush();
                }
                return true;
            }, this.dstFs, this.p);
            this.pool.execute(taskItem);
            if (this.ou instanceof Syncable) {
                this.ou.hflush();
            }
            try {
                if (!taskItem.isCancelled()) {
                    taskItem.get(this.timeout, TimeUnit.MILLISECONDS);
                }
            } catch (Throwable th) {
                ReplicateFileSystem.LOG.warn(String.format("dst filesystem hflush failed for %s", this.p), th);
                taskItem.cancel(true);
                fail();
            }
        }

        public void hsync() throws IOException {
            TaskItem taskItem = new TaskItem(() -> {
                if (this.failed) {
                    return false;
                }
                if (this.dou instanceof Syncable) {
                    this.dou.hsync();
                }
                return true;
            }, this.dstFs, this.p);
            this.pool.execute(taskItem);
            if (this.ou instanceof Syncable) {
                this.ou.hsync();
            }
            try {
                if (!taskItem.isCancelled()) {
                    taskItem.get(this.timeout, TimeUnit.MILLISECONDS);
                }
            } catch (Throwable th) {
                ReplicateFileSystem.LOG.warn(String.format("dst filesystem hsync failed for %s", this.p), th);
                taskItem.cancel(true);
                fail();
            }
        }

        private void fail() {
            if (this.failed) {
                return;
            }
            this.failed = true;
            if (this.dstFs != null) {
                try {
                    this.dstFs.delete(this.p, false);
                } catch (Throwable th) {
                    ReplicateFileSystem.LOG.warn(String.format("dst filesystem delete %s failed", this.p), th);
                }
            }
        }

        @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            TaskItem taskItem = new TaskItem(() -> {
                if (this.failed) {
                    return false;
                }
                this.dou.close();
                return true;
            }, this.dstFs, this.p);
            this.pool.execute(taskItem);
            this.ou.close();
            try {
                if (!taskItem.isCancelled()) {
                    taskItem.get(this.timeout, TimeUnit.MILLISECONDS);
                }
            } catch (Throwable th) {
                ReplicateFileSystem.LOG.warn(String.format("dst filesystem close failed for %s", this.p), th);
                taskItem.cancel(true);
                fail();
            }
        }
    }

    /* loaded from: input_file:com/juicefs/ReplicateFileSystem$ReplicateOutputStreamWithStreamCapabilities.class */
    static class ReplicateOutputStreamWithStreamCapabilities extends ReplicateOutputStream implements StreamCapabilities {
        public ReplicateOutputStreamWithStreamCapabilities(OutputStream outputStream, OutputStream outputStream2, ExecutorService executorService, long j, Path path, FileSystem fileSystem) {
            super(outputStream, outputStream2, executorService, j, path, fileSystem);
        }

        public boolean hasCapability(String str) {
            return str.equalsIgnoreCase("hsync") || str.equalsIgnoreCase("hflush");
        }
    }

    /* loaded from: input_file:com/juicefs/ReplicateFileSystem$TaskItem.class */
    /**
     * A replica-side task that also carries the replica filesystem and path it operates on,
     * so that a rejection handler can clean up the corresponding file.
     */
    static class TaskItem extends FutureTask<Boolean> {
        /** Replica filesystem the task writes to. */
        FileSystem dstFs;
        /** Path of the file on the replica filesystem. */
        Path p;

        public TaskItem(Callable<Boolean> work, FileSystem replicaFs, Path replicaPath) {
            super(work);
            dstFs = replicaFs;
            p = replicaPath;
        }
    }

    /**
     * Initializes the primary filesystem for {@code uri} and, when configured, a replica
     * filesystem that mirrors mutations.
     *
     * <p>A replica is set up only when {@link #SRC_FS_KEY} resolves to the same scheme and
     * authority as {@code uri} and {@link #DST_FS_KEY} is set. Replica setup is best effort: any
     * failure is logged and leaves replication disabled rather than failing initialization.
     * Finally a background task is started that re-checks {@link #WRITE_SWITCH_FILE} on the
     * primary filesystem every 10 seconds and updates {@link #writeClose}.
     *
     * @param uri           URI of the primary filesystem
     * @param configuration Hadoop configuration; it is copied before being modified
     * @throws IOException              if the primary filesystem cannot be created or queried
     * @throws IllegalArgumentException if {@code MigratingFileSystem.patchGetDfs} reports a DFSAdmin call
     */
    public void initialize(URI uri, Configuration configuration) throws IOException {
        setConf(configuration);
        if (MigratingFileSystem.patchGetDfs(uri, configuration)) {
            throw new IllegalArgumentException("DFSAdmin is not supported for " + ReplicateFileSystem.class.getName());
        }

        String srcSpec = configuration.get(SRC_FS_KEY);
        URI srcUri = srcSpec != null ? URI.create(srcSpec) : null;
        if (srcUri != null && srcUri.getAuthority() == null) {
            // A source URI without authority falls back to the default filesystem of the same scheme.
            URI defaultUri = getDefaultUri(configuration);
            if (srcUri.getScheme() != null && srcUri.getScheme().equals(defaultUri.getScheme())) {
                srcUri = defaultUri;
            }
        }

        String scheme = uri.getScheme();
        String authority = uri.getAuthority();
        Configuration fsConf = new Configuration(configuration);
        fsConf.set("fs." + scheme + ".impl", MigratingFileSystem.fsImpl.get(scheme));
        fsConf.set("fs.AbstractFileSystem." + scheme + ".impl", MigratingFileSystem.fsImpl.get(scheme + ".abstract"));

        boolean isReplicatedSource = srcUri != null
                && scheme.equals(srcUri.getScheme())
                && Objects.equals(authority, srcUri.getAuthority());
        String dstSpec = isReplicatedSource ? configuration.get(DST_FS_KEY) : null;
        if (dstSpec != null) {
            initReplica(dstSpec, fsConf);
        }

        this.fs = (FileSystem) ReflectionUtils.newInstance(getFileSystemClass(scheme, fsConf), fsConf);
        this.fs.initialize(uri, fsConf);
        this.writeClose = this.fs.exists(WRITE_SWITCH_FILE);
        this.writeSwitchSchedule = Executors.newScheduledThreadPool(1, task -> {
            Thread thread = new Thread(task, "juicefs-replicate-refresh");
            thread.setDaemon(true);
            return thread;
        });
        this.writeSwitchSchedule.scheduleAtFixedRate(() -> {
            try {
                this.writeClose = this.fs.exists(WRITE_SWITCH_FILE);
            } catch (Exception e) {
                // Catch everything: an exception escaping a fixed-rate task suppresses all
                // later runs. When the switch cannot be read, replication is turned off.
                LOG.warn(e.getMessage(), e);
                this.writeClose = true;
            }
        }, 0L, 10L, TimeUnit.SECONDS);
    }

    /**
     * Creates the replica filesystem, its write pool and related settings. On any failure the
     * partially built replica is torn down so that replication stays disabled.
     */
    private void initReplica(String dstSpec, Configuration fsConf) {
        URI dstUri = URI.create(dstSpec);
        this.name = dstUri.getHost() == null ? "" : dstUri.getHost();
        if ("jfs".equals(dstUri.getScheme())) {
            fsConf.set("juicefs.name", this.name);
        }
        try {
            this.dstFs = (FileSystem) ReflectionUtils.newInstance(getFileSystemClass(dstUri.getScheme(), fsConf), fsConf);
            this.dstFs.initialize(dstUri, fsConf);
            LOG.debug("{} was init success as replicate filesystem", dstSpec);

            int threads = fsConf.getInt(REPLICATION_WRITE_POOL_THREADS_KEY, REPLICATION_WRITE_POOL_THREADS);
            int queueSize = fsConf.getInt(REPLICATION_WRITE_POOL_QUEUE_SIZE_KEY, REPLICATION_WRITE_POOL_QUEUE_SIZE);
            this.dataPool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(queueSize), task -> {
                        Thread thread = new Thread(task, "write-data-thread");
                        thread.setDaemon(true);
                        return thread;
                    }, new DeleteFailFilePolicy());

            long timeoutSeconds = fsConf.getLong(REPLICATION_TIMEOUT_KEY, REPLICATION_TIMEOUT);
            // Non-positive means "wait forever"; values too large to express in ms are clamped as well.
            this.timeout = (timeoutSeconds <= 0 || timeoutSeconds > Long.MAX_VALUE / 1000)
                    ? Long.MAX_VALUE
                    : timeoutSeconds * 1000;

            try {
                Class.forName("org.apache.hadoop.fs.StreamCapabilities");
                this.withStreamCapability = true;
            } catch (ClassNotFoundException e) {
                this.withStreamCapability = false;
            }
            if (this.withStreamCapability) {
                // Resolved by name so the capability-aware stream class is only touched when usable.
                this.constructor = Class.forName("com.juicefs.ReplicateFileSystem$ReplicateOutputStreamWithStreamCapabilities")
                        .getConstructor(OutputStream.class, OutputStream.class, ExecutorService.class, Long.TYPE, Path.class, FileSystem.class);
            }
        } catch (Throwable th) {
            LOG.warn(String.format("%s init failed", dstUri), th);
            abandonReplica();
        }
    }

    /**
     * Releases whatever part of the replica was set up and resets {@link #dstFs} to {@code null},
     * so that {@code writable()} reports {@code false} instead of exposing a half-initialized replica.
     */
    private void abandonReplica() {
        FileSystem broken = this.dstFs;
        this.dstFs = null;
        if (this.dataPool != null) {
            this.dataPool.shutdownNow();
            this.dataPool = null;
        }
        if (broken != null) {
            try {
                broken.close();
            } catch (Throwable closeError) {
                LOG.warn("dst fs close failed", closeError);
            }
        }
    }

    /**
     * Overrides the replication switch.
     *
     * @param z {@code true} to stop mirroring operations to the replica, {@code false} to allow it
     */
    public void setWriteClose(boolean z) {
        writeClose = z;
    }

    /** Returns {@code true} when a replica filesystem exists and the replication switch is not closed. */
    private boolean writable() {
        return this.dstFs != null && !this.writeClose;
    }

    /** Returns the scheme reported by the wrapped primary filesystem. */
    public String getScheme() {
        FileSystem primary = this.fs;
        return primary.getScheme();
    }

    /**
     * Builds the stream that tees writes to the primary and replica streams.
     *
     * <p>When stream capabilities are available, the capability-aware subclass is created through
     * the constructor resolved during initialization; otherwise a plain
     * {@link ReplicateOutputStream} is returned. Any reflective failure is rethrown wrapped in a
     * {@link RuntimeException}.
     */
    private OutputStream creatOutputStream(OutputStream outputStream, OutputStream outputStream2, ExecutorService executorService, long j, Path path, FileSystem fileSystem) throws IOException {
        if (this.withStreamCapability) {
            try {
                Object stream = this.constructor.newInstance(outputStream, outputStream2, executorService, Long.valueOf(j), path, fileSystem);
                return (OutputStream) stream;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return new ReplicateOutputStream(outputStream, outputStream2, executorService, j, path, fileSystem);
    }

    /** Strips scheme and authority from {@code path}, keeping only its path component. */
    private Path convertPath(Path path) {
        String pathOnly = path.toUri().getPath();
        return new Path(pathOnly);
    }

    /**
     * Opens {@code path} for append on the primary filesystem and, when replication is active,
     * on the replica as well. If the replica side cannot be opened, the failure is logged and the
     * plain primary stream is returned.
     */
    public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException {
        FSDataOutputStream srcOut = this.fs.append(path, i, progressable);
        if (!writable()) {
            return srcOut;
        }
        Path dstPath = convertPath(path);
        try {
            OutputStream srcRaw = srcOut.getWrappedStream();
            OutputStream dstRaw = this.dstFs.append(dstPath, i, progressable).getWrappedStream();
            OutputStream tee = creatOutputStream(srcRaw, dstRaw, this.dataPool, this.timeout, dstPath, this.dstFs);
            // Start the position counter where the primary stream already is.
            return new FSDataOutputStream(tee, getStatistics(this.fs.getScheme(), this.fs.getClass()), srcOut.getPos());
        } catch (Throwable th) {
            LOG.warn(String.format("dst filesystem append %s failed due to: ", dstPath), th);
            return srcOut;
        }
    }

    /**
     * Creates {@code path} on the primary filesystem and, when replication is active, on the
     * replica with the same arguments. If the replica side cannot be created, the failure is
     * logged and the plain primary stream is returned.
     */
    public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean z, int i, short s, long j, Progressable progressable) throws IOException {
        FSDataOutputStream srcOut = this.fs.create(path, fsPermission, z, i, s, j, progressable);
        if (!writable()) {
            return srcOut;
        }
        Path dstPath = convertPath(path);
        try {
            OutputStream srcRaw = srcOut.getWrappedStream();
            OutputStream dstRaw = this.dstFs.create(dstPath, fsPermission, z, i, s, j, progressable).getWrappedStream();
            OutputStream tee = creatOutputStream(srcRaw, dstRaw, this.dataPool, this.timeout, dstPath, this.dstFs);
            return new FSDataOutputStream(tee, getStatistics(this.fs.getScheme(), this.fs.getClass()), 0L);
        } catch (Throwable th) {
            LOG.warn(String.format("dst filesystem create %s failed due to: ", dstPath), th);
            return srcOut;
        }
    }

    /**
     * Non-recursive variant of {@code create}: creates the file on the primary filesystem and,
     * when replication is active, on the replica. A replica-side failure is logged and the plain
     * primary stream is returned.
     */
    public FSDataOutputStream createNonRecursive(Path path, FsPermission fsPermission, EnumSet<CreateFlag> enumSet, int i, short s, long j, Progressable progressable) throws IOException {
        FSDataOutputStream srcOut = this.fs.createNonRecursive(path, fsPermission, enumSet, i, s, j, progressable);
        if (!writable()) {
            return srcOut;
        }
        Path dstPath = convertPath(path);
        try {
            OutputStream srcRaw = srcOut.getWrappedStream();
            OutputStream dstRaw = this.dstFs.createNonRecursive(dstPath, fsPermission, enumSet, i, s, j, progressable).getWrappedStream();
            OutputStream tee = creatOutputStream(srcRaw, dstRaw, this.dataPool, this.timeout, dstPath, this.dstFs);
            return new FSDataOutputStream(tee, getStatistics(this.fs.getScheme(), this.fs.getClass()), 0L);
        } catch (Throwable th) {
            LOG.warn(String.format("dst filesystem createNonRecursive %s failed due to: ", dstPath), th);
            return srcOut;
        }
    }

    /**
     * Concatenates {@code pathArr} into {@code path} on the primary filesystem, then repeats the
     * operation on the replica when replication is active. Replica failures are only logged.
     */
    public void concat(Path path, Path[] pathArr) throws IOException {
        this.fs.concat(path, pathArr);
        if (!writable()) {
            return;
        }
        Path dstTarget = convertPath(path);
        Path[] dstParts = new Path[pathArr.length];
        for (int idx = 0; idx < pathArr.length; idx++) {
            dstParts[idx] = convertPath(pathArr[idx]);
        }
        try {
            this.dstFs.concat(dstTarget, dstParts);
        } catch (Throwable th) {
            LOG.warn(String.format("dst filesystem concat %s to %s failed due to: ", Arrays.toString(dstParts), dstTarget), th);
        }
    }

    /**
     * Renames on the primary filesystem and mirrors the rename on the replica when replication
     * is active. A differing replica result or a replica error is logged; the primary result is
     * always what gets returned.
     */
    public boolean rename(Path path, Path path2) throws IOException {
        boolean srcResult = this.fs.rename(path, path2);
        if (!writable()) {
            return srcResult;
        }
        Path dstFrom = convertPath(path);
        Path dstTo = convertPath(path2);
        try {
            boolean dstResult = this.dstFs.rename(dstFrom, dstTo);
            if (dstResult != srcResult) {
                LOG.warn(String.format("rename in src and dst are not consistent for %s to %s", dstFrom, dstTo));
            }
        } catch (Throwable th) {
            LOG.warn(String.format("dst filesystem rename from %s to %s failed", dstFrom, dstTo), th);
        }
        return srcResult;
    }

    /**
     * Truncates on the primary filesystem and mirrors the truncate on the replica when
     * replication is active. A differing replica result or a replica error is logged; the
     * primary result is returned.
     */
    public boolean truncate(Path path, long j) throws IOException {
        boolean srcResult = this.fs.truncate(path, j);
        if (!writable()) {
            return srcResult;
        }
        Path dstPath = convertPath(path);
        try {
            boolean dstResult = this.dstFs.truncate(dstPath, j);
            if (dstResult != srcResult) {
                LOG.warn(String.format("truncate in src and dst are not consistent for %s to %s", dstPath, j));
            }
        } catch (Throwable th) {
            LOG.warn(String.format("dst filesystem truncate %s to %s failed", dstPath, j), th);
        }
        return srcResult;
    }

    /**
     * Deletes on the primary filesystem and mirrors the delete on the replica when replication
     * is active. A differing replica result or a replica error is logged; the primary result is
     * returned.
     */
    public boolean delete(Path path, boolean z) throws IOException {
        boolean srcResult = this.fs.delete(path, z);
        if (!writable()) {
            return srcResult;
        }
        Path dstPath = convertPath(path);
        try {
            boolean dstResult = this.dstFs.delete(dstPath, z);
            if (dstResult != srcResult) {
                LOG.warn(String.format("delete in src and dst are not consistent for %s", dstPath));
            }
        } catch (Throwable th) {
            LOG.warn(String.format("dst filesystem delete %s failed", dstPath), th);
        }
        return srcResult;
    }

    /**
     * Creates directories on the primary filesystem and mirrors the call on the replica when
     * replication is active. A differing replica result or a replica error is logged; the
     * primary result is returned.
     */
    public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
        boolean srcResult = this.fs.mkdirs(path, fsPermission);
        if (!writable()) {
            return srcResult;
        }
        Path dstPath = convertPath(path);
        try {
            boolean dstResult = this.dstFs.mkdirs(dstPath, fsPermission);
            if (dstResult != srcResult) {
                LOG.warn(String.format("mkdirs in src and dst are not consistent for %s", dstPath));
            }
        } catch (Throwable th) {
            LOG.warn(String.format("dst filesystem mkdirs %s failed", dstPath), th);
        }
        return srcResult;
    }

    /** Sets the permission on the primary filesystem, then best-effort on the replica. */
    public void setPermission(Path path, FsPermission fsPermission) throws IOException {
        this.fs.setPermission(path, fsPermission);
        if (!writable()) {
            return;
        }
        Path dstPath = convertPath(path);
        try {
            this.dstFs.setPermission(dstPath, fsPermission);
        } catch (Throwable th) {
            LOG.warn(String.format("dst filesystem setPermission %s failed", dstPath), th);
        }
    }

    /** Sets owner and group on the primary filesystem, then best-effort on the replica. */
    public void setOwner(Path path, String str, String str2) throws IOException {
        this.fs.setOwner(path, str, str2);
        if (!writable()) {
            return;
        }
        Path dstPath = convertPath(path);
        try {
            this.dstFs.setOwner(dstPath, str, str2);
        } catch (Throwable th) {
            LOG.warn(String.format("dst filesystem setOwner %s failed", dstPath), th);
        }
    }

    /** Sets the file times on the primary filesystem, then best-effort on the replica. */
    public void setTimes(Path path, long j, long j2) throws IOException {
        this.fs.setTimes(path, j, j2);
        if (!writable()) {
            return;
        }
        Path dstPath = convertPath(path);
        try {
            this.dstFs.setTimes(dstPath, j, j2);
        } catch (Throwable th) {
            LOG.warn(String.format("dst filesystem setTimes %s failed", dstPath), th);
        }
    }

    /** Sets an extended attribute on the primary filesystem, then best-effort on the replica. */
    public void setXAttr(Path path, String str, byte[] bArr, EnumSet<XAttrSetFlag> enumSet) throws IOException {
        this.fs.setXAttr(path, str, bArr, enumSet);
        if (!writable()) {
            return;
        }
        Path dstPath = convertPath(path);
        try {
            this.dstFs.setXAttr(dstPath, str, bArr, enumSet);
        } catch (Throwable th) {
            LOG.warn(String.format("dst filesystem setXAttr %s failed", dstPath), th);
        }
    }

    /** Removes an extended attribute on the primary filesystem, then best-effort on the replica. */
    public void removeXAttr(Path path, String str) throws IOException {
        this.fs.removeXAttr(path, str);
        if (!writable()) {
            return;
        }
        Path dstPath = convertPath(path);
        try {
            this.dstFs.removeXAttr(dstPath, str);
        } catch (Throwable th) {
            LOG.warn(String.format("dst filesystem removeXAttr %s failed", dstPath), th);
        }
    }

    /** Modifies ACL entries on the primary filesystem, then best-effort on the replica. */
    public void modifyAclEntries(Path path, List<AclEntry> list) throws IOException {
        this.fs.modifyAclEntries(path, list);
        if (!writable()) {
            return;
        }
        Path dstPath = convertPath(path);
        try {
            this.dstFs.modifyAclEntries(dstPath, list);
        } catch (Throwable th) {
            LOG.warn(String.format("dst filesystem modifyAclEntries %s failed", dstPath), th);
        }
    }

    /** Removes ACL entries on the primary filesystem, then best-effort on the replica. */
    public void removeAclEntries(Path path, List<AclEntry> list) throws IOException {
        this.fs.removeAclEntries(path, list);
        if (!writable()) {
            return;
        }
        Path dstPath = convertPath(path);
        try {
            this.dstFs.removeAclEntries(dstPath, list);
        } catch (Throwable th) {
            LOG.warn(String.format("dst filesystem removeAclEntries %s failed", dstPath), th);
        }
    }

    /** Replaces the ACL on the primary filesystem, then best-effort on the replica. */
    public void setAcl(Path path, List<AclEntry> list) throws IOException {
        this.fs.setAcl(path, list);
        if (!writable()) {
            return;
        }
        Path dstPath = convertPath(path);
        try {
            this.dstFs.setAcl(dstPath, list);
        } catch (Throwable th) {
            LOG.warn(String.format("dst filesystem setAcl %s failed", dstPath), th);
        }
    }

    /** Removes the default ACL on the primary filesystem, then best-effort on the replica. */
    public void removeDefaultAcl(Path path) throws IOException {
        this.fs.removeDefaultAcl(path);
        if (!writable()) {
            return;
        }
        Path dstPath = convertPath(path);
        try {
            this.dstFs.removeDefaultAcl(dstPath);
        } catch (Throwable th) {
            LOG.warn(String.format("dst filesystem removeDefaultAcl %s failed", dstPath), th);
        }
    }

    /** Removes the ACL on the primary filesystem, then best-effort on the replica. */
    public void removeAcl(Path path) throws IOException {
        this.fs.removeAcl(path);
        if (!writable()) {
            return;
        }
        Path dstPath = convertPath(path);
        try {
            this.dstFs.removeAcl(dstPath);
        } catch (Throwable th) {
            LOG.warn(String.format("dst filesystem removeAcl %s failed", dstPath), th);
        }
    }

    /**
     * Closes the primary filesystem and releases all replication resources.
     *
     * <p>The refresh scheduler is stopped first so it no longer polls a filesystem that is being
     * closed. The replica filesystem and the write pool are released even when closing the
     * primary filesystem throws, and each resource is released independently so that one failure
     * cannot leak the others.
     *
     * @throws IOException if closing the primary filesystem fails
     */
    public void close() throws IOException {
        // The scheduler exists regardless of whether a replica is configured.
        if (this.writeSwitchSchedule != null) {
            this.writeSwitchSchedule.shutdownNow();
        }
        try {
            super.close();
        } finally {
            if (this.dstFs != null) {
                try {
                    this.dstFs.close();
                } catch (Throwable th) {
                    LOG.warn("dst fs close failed", th);
                }
            }
            // The pool may be absent when replica setup did not complete.
            if (this.dataPool != null) {
                this.dataPool.shutdownNow();
            }
        }
    }

    /**
     * Reports whether the file at {@code path} is closed, delegating to the primary filesystem.
     *
     * @param path file to check
     * @return the primary filesystem's answer
     * @throws IOException                   if the primary filesystem fails
     * @throws UnsupportedOperationException if the primary filesystem is neither a
     *                                       {@link DistributedFileSystem} nor a {@code JuiceFileSystem}
     */
    public boolean isFileClosed(Path path) throws IOException {
        // The base FileSystem type does not declare isFileClosed, so each call needs the
        // concrete type that was just checked.
        if (this.fs instanceof DistributedFileSystem) {
            return ((DistributedFileSystem) this.fs).isFileClosed(path);
        }
        if (this.fs instanceof JuiceFileSystem) {
            return ((JuiceFileSystem) this.fs).isFileClosed(path);
        }
        throw new UnsupportedOperationException("Method isFileClosed is not supported");
    }
}
