package com.juicefs;

import com.juicefs.security.ranger.RangerPermissionChecker;
import com.juicefs.utils.PatchUtil;
import com.sun.jna.platform.win32.WinError;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/juicefs/DataPrefetch.class */
public class DataPrefetch {
    /**
     * Maximum number of prefetch requests that may wait in {@link #paths}.
     * The value is 10000; it used to be spelled through an unrelated JNA
     * Winsock error constant that merely happens to have the same value.
     */
    private static final int MAX_PENDING_TASKS = 10000;

    /** Worker pool that runs the per-file prefetch reads; (re)created by registerPrefetch(). */
    private static ExecutorService threadPool;

    /** Set by close(); the dispatcher loop checks it to stop handing out work. */
    private volatile boolean isClosed = false;

    /** File system the prefetch reads are issued against. */
    private final FileSystem fs;

    /** Configuration taken from {@link #fs} at construction time. */
    private final Configuration conf;

    private static final Logger LOG = LoggerFactory.getLogger(DataPrefetch.class);

    /** Bounded queue of prefetch requests, filled by prefetchData() and drained by the dispatcher thread. */
    private static final BlockingQueue<TaskFiles> paths = new LinkedBlockingQueue<>(MAX_PENDING_TASKS);

    /** Prefetch requests keyed by task id, so that completePrefetch() can find and finish them. */
    private static final Map<String, TaskFiles> openedStreams = new ConcurrentHashMap<>();

    /** Whether the engine-specific patch has already been attempted in this JVM. */
    private static boolean prefetchPatched = false;

    /* loaded from: input_file:com/juicefs/DataPrefetch$RunEnv.class */
    /**
     * Execution engines this class distinguishes when deciding which
     * engine-specific patch to install. Constant order is significant for
     * {@code ordinal()} and must not be changed.
     */
    enum RunEnv {
        /** Spark SQL execution. */
        SPARK,

        /** Hive running on Tez. */
        HIVE_TEZ,

        /** Hive running on MapReduce. */
        HIVE_MR,

        /** Presto; registerPrefetch() has no patch for this value and returns without doing anything. */
        PRESTO;
    }

    /* loaded from: input_file:com/juicefs/DataPrefetch$TaskFiles.class */
    /**
     * The set of files one task asked to have prefetched, together with the
     * streams that were opened for them. The streams are kept open until the
     * task reports completion via {@link #setComplete()}.
     */
    private static class TaskFiles {
        /** Task identifier; also the key under which this object is stored in openedStreams. */
        private final String id;
        /** Paths of the files to prefetch. */
        private final List<String> files;
        /** Streams opened so far; only mutated while holding this object's monitor. */
        private final List<InputStream> streams;
        /** Becomes true once the task is done; no new stream is retained afterwards. */
        private volatile boolean complete;

        /**
         * @param str  task identifier
         * @param list paths of the files to prefetch; must not be null
         */
        public TaskFiles(String str, List<String> list) {
            this.id = str;
            this.files = list;
            this.streams = Collections.synchronizedList(new ArrayList<>(list.size()));
        }

        /**
         * Marks the task as finished, closes every stream retained so far and
         * removes this task from the map of open prefetches. Close failures
         * are logged and do not stop the remaining streams from being closed.
         */
        public void setComplete() {
            synchronized (this) {
                this.complete = true;
                for (InputStream stream : this.streams) {
                    try {
                        stream.close();
                    } catch (IOException e) {
                        DataPrefetch.LOG.warn("prefetch failed to close", e);
                    }
                }
                // Drop the references so closed streams can be collected.
                this.streams.clear();
                DataPrefetch.openedStreams.remove(this.id);
            }
        }

        /** @return true once {@link #setComplete()} has been called */
        public boolean isComplete() {
            return this.complete;
        }

        /**
         * Reads up to {@code i} bytes from the tail of the given file and, unless
         * the task completed in the meantime, retains the opened stream until
         * {@link #setComplete()}. I/O failures are logged, never thrown.
         *
         * @param fileSystem file system used to stat and open the file
         * @param str        path of the file to prefetch
         * @param i          maximum number of bytes to read from the end of the file
         * @param bArr       scratch buffer receiving the bytes; its content is not used afterwards
         */
        public void prefetch(FileSystem fileSystem, String str, int i, byte[] bArr) {
            if (this.complete) {
                return;
            }
            FSDataInputStream in = null;
            boolean retained = false;
            try {
                Path path = new Path(str);
                FileStatus fileStatus = fileSystem.getFileStatus(path);
                long fileLen = fileStatus.getLen();
                // Never read more than the file holds, the caller asked for, or the buffer can take.
                int len = (int) Math.min(Math.min(fileLen, (long) i), (long) bArr.length);
                in = fileSystem.open(path);
                // Positioned read of the last `len` bytes of the file.
                in.read(fileLen - len, bArr, 0, len);
                synchronized (this) {
                    if (!this.complete) {
                        this.streams.add(in);
                        retained = true;
                    }
                }
            } catch (IOException e) {
                DataPrefetch.LOG.warn("prefetch failed", e);
            } finally {
                // Close the stream when it was not handed over to `streams`:
                // either the task completed meanwhile or the read failed.
                if (in != null && !retained) {
                    try {
                        in.close();
                    } catch (IOException e) {
                        DataPrefetch.LOG.warn("prefetch failed to close", e);
                    }
                }
            }
        }
    }

    /**
     * Creates a prefetcher bound to the given file system and remembers the
     * configuration that file system reports.
     *
     * @param fileSystem file system to prefetch from; must not be null
     */
    public DataPrefetch(FileSystem fileSystem) {
        this.conf = fileSystem.getConf();
        this.fs = fileSystem;
    }

    /**
     * Detects the execution engine from the current call stack, installs the
     * matching patch once per JVM, then starts the prefetch worker pool and the
     * dispatcher thread that feeds it from the request queue.
     *
     * <p>Does nothing when no known engine is found on the call stack.
     *
     * <p>Pool size comes from {@code prefetch-threads} (default: twice the
     * number of available processors) and the per-file read length from
     * {@code prefetch-len} (default 16384), both resolved through getConf.
     *
     * <p>NOTE(review): every call replaces the static worker pool and starts a
     * new dispatcher thread without shutting the previous pool down; confirm
     * callers invoke this at most once per JVM.
     *
     * @throws NumberFormatException if either configured value is not an integer
     */
    public synchronized void registerPrefetch() {
        RunEnv runEnv = detectRunEnv(Thread.currentThread().getStackTrace());
        if (runEnv == null) {
            return;
        }
        if (!prefetchPatched) {
            switch (runEnv) {
                case SPARK:
                    patchSpark(this.fs.getScheme());
                    break;
                case HIVE_TEZ:
                    patchHiveTez(this.fs.getScheme());
                    break;
                case HIVE_MR:
                    patchHiveMR(this.fs.getScheme());
                    break;
                default:
                    return;
            }
            prefetchPatched = true;
        }
        final int threads = Integer.parseInt(getConf(this.conf, "prefetch-threads", String.valueOf(Runtime.getRuntime().availableProcessors() * 2)));
        final int prefetchLen = Integer.parseInt(getConf(this.conf, "prefetch-len", "16384"));
        // Rejected submissions (e.g. after shutdown) are dropped silently: prefetching is best effort.
        threadPool = new ThreadPoolExecutor(threads, threads, 3L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), runnable -> {
            Thread worker = new Thread(runnable, "jfs-prefetch");
            worker.setDaemon(true);
            return worker;
        }, new ThreadPoolExecutor.DiscardPolicy());
        Thread dispatcher = new Thread(() -> {
            while (!this.isClosed) {
                final TaskFiles task;
                try {
                    // Bounded wait so that isClosed is re-checked at least once a second.
                    task = paths.poll(1L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    // Somebody wants this thread to stop: keep the flag and leave the loop.
                    Thread.currentThread().interrupt();
                    return;
                }
                if (task == null || task.isComplete()) {
                    continue;
                }
                // One scratch buffer per task; its content is never read back.
                final byte[] buffer = new byte[prefetchLen];
                for (String file : task.files) {
                    if (this.isClosed) {
                        break;
                    }
                    threadPool.execute(() -> task.prefetch(this.fs, file, prefetchLen, buffer));
                }
            }
        }, "prefetch dispatcher");
        dispatcher.setDaemon(true);
        dispatcher.start();
    }

    /**
     * Scans the stack frames from the top and returns the engine of the first
     * frame whose class name matches a known marker, or null if none matches.
     */
    private static RunEnv detectRunEnv(StackTraceElement[] stackTrace) {
        for (StackTraceElement frame : stackTrace) {
            String className = frame.getClassName();
            if (className.contains("org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader")) {
                LOG.info("prefetch in hive tez");
                return RunEnv.HIVE_TEZ;
            }
            if (className.contains("org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader") || className.contains("org.apache.hadoop.hive.ql.exec.mr.MapRedTask")) {
                LOG.info("prefetch in hive mr");
                return RunEnv.HIVE_MR;
            }
            if (className.contains("org.apache.spark.sql.execution")) {
                LOG.info("prefetch in spark");
                return RunEnv.SPARK;
            }
        }
        return null;
    }

    /**
     * Registers a source snippet with {@code PatchUtil.patchBefore} for the
     * {@code initNextRecordReader} method of Hive's
     * {@code HadoopShimsSecure$CombineFileRecordReader}.
     *
     * <p>The snippet only acts on {@code CombineHiveInputSplit} splits. When
     * {@code idx == 1} it generates a random UUID, stores it in {@code jc}
     * under {@code jfs-mr-task-id}, collects the URI paths of all split paths
     * whose URI scheme equals {@code str} and, if any were found, passes them
     * to {@code DataPrefetch.prefetchData}. When {@code idx} has reached
     * {@code split.getNumPaths()} it calls {@code DataPrefetch.completePrefetch}
     * with the stored id. Any Throwable raised inside the snippet is printed
     * to stderr and not propagated.
     *
     * <p>NOTE(review): {@code idx}, {@code split} and {@code jc} are presumably
     * members of the patched Hive class, and PatchUtil presumably injects the
     * snippet at the start of the target method; confirm against PatchUtil and
     * the Hive version in use.
     *
     * @param str URI scheme of the file system whose files should be prefetched;
     *            it is spliced into the snippet as a string literal
     */
    private void patchHiveMR(String str) {
        PatchUtil.patchBefore("org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader", "initNextRecordReader", null, "if (split instanceof org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.CombineHiveInputSplit) {\n        try {\n          if (idx == 1) {\n            String id = java.util.UUID.randomUUID().toString();\n            jc.set(\"jfs-mr-task-id\", id);\n            java.util.Set res = new java.util.LinkedHashSet();\n            org.apache.hadoop.fs.Path[] paths = ((org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.CombineHiveInputSplit) split).getPaths();\n            for (int i = 0; i < paths.length; i++) {\n              org.apache.hadoop.fs.Path p = paths[i];\n              if (\"" + str + "\".equals(p.toUri().getScheme())) {\n                res.add(paths[i].toUri().getPath());\n              }\n            }\n            if (res.size() > 0) {\n              com.juicefs.DataPrefetch.prefetchData(id, new java.util.ArrayList(res));\n            }\n          }\n          if (idx >= split.getNumPaths()) {\n            com.juicefs.DataPrefetch.completePrefetch(jc.get(\"jfs-mr-task-id\"));\n          }\n        } catch (Throwable e) {\n          System.err.println(\"prefetch hive mr failed to send back\" + e.getMessage());\n        }\n      }");
    }

    /**
     * Registers a source snippet with {@code PatchUtil.patchBefore} for the
     * {@code initNextRecordReader} method of Tez's
     * {@code TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader}.
     *
     * <p>When {@code idx == 1} the snippet generates a random UUID, stores it
     * in {@code job} under {@code jfs-tez-task-id}, walks the grouped splits,
     * and for every {@code HiveInputSplit} whose path has URI scheme
     * {@code str} collects the URI path; if any were found they are passed to
     * {@code DataPrefetch.prefetchData}. When {@code idx} has reached the
     * number of grouped splits it calls {@code DataPrefetch.completePrefetch}
     * with the stored id. Any Throwable raised inside the snippet is printed
     * to stderr and not propagated.
     *
     * <p>NOTE(review): {@code idx}, {@code groupedSplit} and {@code job} are
     * presumably members of the patched Tez class, and PatchUtil presumably
     * injects the snippet at the start of the target method; confirm against
     * PatchUtil and the Tez version in use.
     *
     * @param str URI scheme of the file system whose files should be prefetched;
     *            it is spliced into the snippet as a string literal
     */
    private void patchHiveTez(String str) {
        PatchUtil.patchBefore("org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader", "initNextRecordReader", null, "try {\n        if (idx == 1) {\n          String id = java.util.UUID.randomUUID().toString();\n          job.set(\"jfs-tez-task-id\", id);\n          java.util.Set paths = new java.util.LinkedHashSet();\n          java.util.List splits = groupedSplit.getGroupedSplits();\n          for (int i = 0; i < splits.size(); i++) {\n            if (splits.get(i) instanceof org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit) {\n              org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit hiveSplit = (org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit) splits.get(i);\n              org.apache.hadoop.fs.Path p = hiveSplit.getPath();\n              if (\"" + str + "\".equals(p.toUri().getScheme())) {\n                paths.add(p.toUri().getPath());\n              }\n            }\n          }\n          if (paths.size() > 0) {\n            com.juicefs.DataPrefetch.prefetchData(id, new java.util.ArrayList(paths));\n          }\n        }\n        if (idx >= groupedSplit.getGroupedSplits().size()) {\n          com.juicefs.DataPrefetch.completePrefetch(job.get(\"jfs-tez-task-id\"));\n        }\n      } catch (Throwable e) {\n        System.err.println(\"tez prefetch failed to send back \" + e.getMessage());\n      }");
    }

    private void patchSpark(String str) {
        boolean z = false;
        try {
            if (Class.forName("org.apache.spark.sql.execution.datasources.FilePartition").getDeclaredField("files").getType().isArray()) {
                z = true;
            }
            PatchUtil.patchBeforeAndAfter("org.apache.spark.scheduler.ShuffleMapTask", "runTask", new String[]{"org.apache.spark.TaskContext"}, z ? "     if (partition instanceof org.apache.spark.sql.execution.datasources.FilePartition) {\n         java.util.List paths = new java.util.ArrayList();\n         try {\n             org.apache.spark.sql.execution.datasources.PartitionedFile[] files = ((org.apache.spark.sql.execution.datasources.FilePartition) partition).files();\n             for (int i = 0; i < files.length; i++) {\n                 org.apache.spark.sql.execution.datasources.PartitionedFile file = files[i];\n                 String f = file.filePath();\n                 if (f.trim().startsWith(\"" + str + "\")) {\n                     paths.add(f);\n                 }\n             }\n             if (paths.size() > 0) {\n                 com.juicefs.DataPrefetch.prefetchData(String.valueOf(context.taskAttemptId()), paths);\n             }\n         } catch (Throwable e) {\n             System.err.println(\"prefetch failed to send back: \" + e.getMessage());\n         }\n     }" : "    if (partition instanceof org.apache.spark.sql.execution.datasources.FilePartition) {\n      java.util.List paths = new java.util.ArrayList();\n      try {\n        scala.collection.Iterator iterator = ((org.apache.spark.sql.execution.datasources.FilePartition) partition).files().iterator();\n        while (iterator.hasNext()) {\n          org.apache.spark.sql.execution.datasources.PartitionedFile file = (org.apache.spark.sql.execution.datasources.PartitionedFile) iterator.next();\n          String f = file.filePath();\n          if (f.trim().startsWith(\"" + str + "\")) {\n            paths.add(f);\n          }\n        }\n        \n        if (paths.size() > 0) {\n          com.juicefs.DataPrefetch.prefetchData(String.valueOf(context.taskAttemptId()), paths);\n        }\n      } catch (Throwable e) {\n        System.err.println(\"prefetch failed to send back: \" + 
e.getMessage());\n      }\n    }", "com.juicefs.DataPrefetch.completePrefetch(String.valueOf(context.taskAttemptId()));");
        } catch (ClassNotFoundException | NoSuchFieldException e) {
            LOG.warn("spark prefetch patch failed");
        }
    }

    /**
     * Queues the given files for prefetching on behalf of a task. Called from
     * the snippets injected into the execution engine.
     *
     * <p>The request is registered under its id so that
     * {@code completePrefetch} can later close its streams. If the request
     * queue is full the request is dropped: it is unregistered again and a
     * warning is logged, since prefetching is best effort.
     *
     * @param str  task identifier
     * @param list paths of the files to prefetch; must not be null
     */
    public static void prefetchData(String str, List<String> list) {
        LOG.info("prefetching {} files for {}", list.size(), str);
        TaskFiles taskFiles = new TaskFiles(str, list);
        openedStreams.put(str, taskFiles);
        if (!paths.offer(taskFiles)) {
            // Never dispatched, so there is nothing to close later; only remove
            // our own entry in case the id has been re-registered meanwhile.
            openedStreams.remove(str, taskFiles);
            LOG.warn("prefetch queue is full, skip prefetching for {}", str);
        }
    }

    /**
     * Finishes the prefetch registered under the given task id, if there is
     * one. A null id or an unknown id is ignored.
     *
     * @param str task identifier, may be null
     */
    public static void completePrefetch(String str) {
        if (str == null) {
            return;
        }
        TaskFiles task = openedStreams.get(str);
        if (task != null) {
            LOG.info("complete prefetch for {}", str);
            task.setComplete();
        }
    }

    /**
     * Looks up a {@code juicefs.}-prefixed setting. When the file system URI
     * has a non-empty host, the host-qualified key takes precedence and the
     * unqualified value serves as its default.
     *
     * @param configuration configuration to read from
     * @param str           setting name without the {@code juicefs.} prefix
     * @param str2          default used when the setting is absent
     * @return the trimmed value, or null if the lookup produced null
     */
    private String getConf(Configuration configuration, String str, String str2) {
        final String host = this.fs.getUri().getHost();
        final String globalValue = configuration.get("juicefs." + str, str2);

        final boolean hostScoped = host != null && !host.isEmpty();
        final String resolved = hostScoped
                ? configuration.get("juicefs." + host + RangerPermissionChecker.DEFAULT_FILENAME_EXTENSION_SEPARATOR + str, globalValue)
                : globalValue;

        return resolved == null ? null : resolved.trim();
    }

    /**
     * Flags this prefetcher as closed, which makes its dispatcher loop stop,
     * and shuts down the shared worker pool if one has been created.
     */
    public void close() {
        this.isClosed = true;
        if (threadPool == null) {
            return;
        }
        threadPool.shutdownNow();
    }
}
