package fr.inserm.u1078.tludwig.vcfprocessor.functions;

import fr.inserm.u1078.tludwig.maok.tools.DateTools;
import fr.inserm.u1078.tludwig.maok.tools.Message;
import fr.inserm.u1078.tludwig.vcfprocessor.files.PedException;
import fr.inserm.u1078.tludwig.vcfprocessor.files.VCF;
import fr.inserm.u1078.tludwig.vcfprocessor.files.VCFException;
import fr.inserm.u1078.tludwig.vcfprocessor.genetics.Variant;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:fr/inserm/u1078/tludwig/vcfprocessor/functions/ParallelVCFFunction.class */
public abstract class ParallelVCFFunction extends VCFFunction {
    /** Value 200; the same number is the capacity of the Analyzer's pending-analysis queue. */
    public static final int QUEUE_DEPTH = 200;
    /** Value 10000; the Consumer logs a progress message whenever an output index is a multiple of this number. */
    public static final int STEP = 10000;
    /** Value 10; the same number of milliseconds is slept between polls of {@link #isStillConsuming()} in {@link #executeFunction()}. */
    public static final int DELAY = 10;
    /** Sentinel value: pushed by each Worker when it runs out of lines, and pushed to the Analyzer to make it stop. */
    public static final String END_MESSAGE = "XXX_NO_MORE_LINES_XXX";
    /** Sentinel value: an output line equal to this string is skipped by the Consumer instead of being printed. */
    public static final String EMPTY = "ZZZ_EMPTY_ZZZ";
    /** The VCF being processed; assigned through {@link #setVCF(VCF)}. */
    private VCF vcf;
    /** Bounded queue of results handed from the Workers to the Consumer; created in {@link #executeFunction()}. */
    private LinkedBlockingQueue<Output> outputLines;
    /** Background analysis thread; created and started in {@link #executeFunction()}. */
    private Analyzer analyzer;
    /** The newline character encoded with the platform default charset. Not referenced elsewhere in this file. */
    public static final byte[] EOL = "\n".getBytes();
    /** Shared empty array. Not referenced elsewhere in this file; presumably meant as a "nothing to print" result for subclasses - confirm. */
    public static final String[] NO_OUTPUT = new String[0];
    /** Number of Worker tasks: available processors minus 3, clamped to the range [1, 8]. */
    public static final int WORKERS = Math.max(1, Math.min(8, Runtime.getRuntime().availableProcessors() - 3));

    /* loaded from: input_file:fr/inserm/u1078/tludwig/vcfprocessor/functions/ParallelVCFFunction$Analyzer.class */
    /**
     * Thread that receives analysis objects through {@link #push(Object)} and hands them, one at a
     * time and in arrival order, to {@link ParallelVCFFunction#checkAndProcessAnalysis(Object)}.
     * It stops once the {@link ParallelVCFFunction#END_MESSAGE} sentinel is taken from its queue.
     */
    public class Analyzer extends Thread {
        /**
         * True until the end sentinel has been consumed. Written by this thread and polled by the
         * enclosing class from another thread, hence volatile.
         */
        private volatile boolean stillRunning = true;
        /** Bounded queue of pending analyses; {@link #push(Object)} blocks while it is full. */
        private final LinkedBlockingQueue<Object> analyzes = new LinkedBlockingQueue<>(QUEUE_DEPTH);

        public Analyzer() {
        }

        /**
         * Takes analyses from the queue until the end sentinel arrives. Objects rejected by
         * {@code checkAndProcessAnalysis} are reported with a warning. If the thread is interrupted
         * it stops instead of spinning, and the interrupt status is restored.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                while (this.stillRunning) {
                    Object analysis = this.analyzes.take();
                    if (END_MESSAGE.equals(analysis)) {
                        this.stillRunning = false;
                    } else if (!ParallelVCFFunction.this.checkAndProcessAnalysis(analysis)) {
                        Message.warning("Unexpected Analysis [" + analysis + "]");
                    }
                }
            } catch (InterruptedException e) {
                // Stop cleanly so that pollers of stillRunning are released, and keep the interrupt visible.
                Message.warning("Analyzer interrupted, pending analyses are discarded");
                this.stillRunning = false;
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Queues an analysis object, blocking while the queue is full.
         *
         * @param obj the object to analyze (must not be null, the queue rejects null elements)
         */
        public void push(Object obj) {
            try {
                this.analyzes.put(obj);
            } catch (InterruptedException e) {
                // The object could not be queued; report it and preserve the interrupt status for the caller.
                Message.warning("Interrupted while queueing analysis [" + obj + "]");
                Thread.currentThread().interrupt();
            }
        }

        /** Queues the end sentinel; everything pushed before it is still processed. */
        public void willEnd() {
            push(END_MESSAGE);
        }
    }

    /* loaded from: input_file:fr/inserm/u1078/tludwig/vcfprocessor/functions/ParallelVCFFunction$Consumer.class */
    /**
     * Task that takes {@link Output} objects from the shared output queue and processes them in
     * strictly increasing order of {@link Output#n}, starting at 1. Outputs that arrive ahead of
     * their turn are buffered until the missing ones have been processed.
     */
    public class Consumer extends Thread {
        /** Outputs received before their turn. */
        private final ArrayList<Output> unqueuedOutput = new ArrayList<>();
        /** Timestamp (milliseconds) taken when {@link #run()} starts; used for throughput messages. */
        private long start;

        public Consumer() {
        }

        /**
         * Logs how many variants were processed so far and the resulting throughput.
         *
         * @param variants number of variants to report
         */
        private void logProgress(int variants) {
            double duration = DateTools.duration(this.start);
            Message.info(variants + " variants processed from " + ParallelVCFFunction.this.vcffile.getFilename() + " in " + duration + "s (" + ((int) (variants / duration)) + " variants/s)");
        }

        /**
         * Processes every line of one output.
         *
         * @param output the output whose turn has come
         * @return false if the end sentinel was found among the lines, true otherwise
         */
        private boolean process(Output output) {
            boolean moreExpected = true;
            if (output.n % STEP == 0) {
                logProgress(output.n);
            }
            for (String line : output.lines) {
                switch (line) {
                    case END_MESSAGE:
                        // The sentinel carries the index following the last variant.
                        logProgress(output.n - 1);
                        moreExpected = false;
                        break;
                    case EMPTY:
                        // Placeholder line: nothing to emit.
                        break;
                    default:
                        ParallelVCFFunction.this.processOutput(line);
                        break;
                }
            }
            return moreExpected;
        }

        /**
         * Removes and returns the buffered output with the given index.
         *
         * @param i the index looked for
         * @return the matching output, or null if it has not been received yet
         */
        private Output remove(int i) {
            for (int pos = 0; pos < this.unqueuedOutput.size(); pos++) {
                if (this.unqueuedOutput.get(pos).n == i) {
                    return this.unqueuedOutput.remove(pos);
                }
            }
            return null;
        }

        /**
         * Consumes the output queue until an output containing the end sentinel has been processed.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            this.start = new Date().getTime();
            boolean running = true;
            int expected = 1;
            while (running) {
                try {
                    Output output = ParallelVCFFunction.this.outputLines.take();
                    if (output.n == expected) {
                        if (!process(output)) {
                            running = false;
                        }
                        expected++;
                    } else {
                        this.unqueuedOutput.add(output);
                    }
                    // Always flush buffered successors, so that nothing stays stuck waiting for
                    // another arrival once its predecessor has been processed.
                    Output buffered;
                    while ((buffered = remove(expected)) != null) {
                        if (!process(buffered)) {
                            running = false;
                        }
                        expected++;
                    }
                } catch (InterruptedException e) {
                    ParallelVCFFunction.this.fatalAndDie("Consumer interrupted", e);
                }
            }
        }
    }

    /* loaded from: input_file:fr/inserm/u1078/tludwig/vcfprocessor/functions/ParallelVCFFunction$Output.class */
    /**
     * Immutable pairing of a sequence index with the lines produced for it.
     * The array is stored as given, without copying.
     */
    public static class Output {
        /** Sequence index; the Consumer processes outputs in increasing order of this value. */
        public final int n;
        /** Lines attached to this index (possibly empty). */
        public final String[] lines;

        /**
         * @param index sequence index of this output
         * @param outputLines lines attached to that index
         */
        public Output(int index, String[] outputLines) {
            this.lines = outputLines;
            this.n = index;
        }
    }

    /* loaded from: input_file:fr/inserm/u1078/tludwig/vcfprocessor/functions/ParallelVCFFunction$Worker.class */
    /**
     * Task that repeatedly fetches an indexed line, processes it through
     * {@link ParallelVCFFunction#pushOutput(int, String)} and, once a wrapper without line is
     * returned, pushes the end sentinel with that wrapper's index.
     */
    public class Worker implements Runnable {
        /** Reader used to fetch the first line of this worker. */
        private final VCF.Reader reader;

        /**
         * @param reader the reader this worker takes its first line from
         */
        public Worker(VCF.Reader reader) {
            this.reader = reader;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                // NOTE(review): the first wrapper comes from the reader, the following ones from the
                // VCF itself - presumably both draw from the same source; confirm against VCF.
                VCF.Wrapper current = this.reader.nextLine();
                for (; current.line != null; current = ParallelVCFFunction.this.getVCF().getNextLineWrapper()) {
                    ParallelVCFFunction.this.pushOutput(current.index, current.line);
                }
                // No line left: signal the end using the index of the terminating wrapper.
                ParallelVCFFunction.this.pushOutput(current.index, END_MESSAGE);
            } catch (VCFException readFailure) {
                ParallelVCFFunction.this.fatalAndDie("Unable to read next line from VCF file", readFailure);
            }
        }
    }

    /**
     * Handles one result line whose turn has come. The Consumer calls this for every line that is
     * neither the end sentinel nor the empty placeholder. This default implementation prints it.
     *
     * @param str the line to emit
     */
    public void processOutput(String str) {
        println(str);
    }

    /**
     * Hook called by {@link #executeFunction()} after the VCF has been opened and before the
     * headers are printed. Does nothing by default.
     */
    public void begin() {
    }

    /**
     * Extra header lines that {@link #getHeaders()} passes to the VCF through
     * {@code addExtraHeaders}.
     *
     * @return null by default
     */
    public String[] getExtraHeaders() {
        return null;
    }

    /**
     * Registers the extra headers on the VCF, then returns the VCF's full headers as an array.
     *
     * @return the header lines printed by {@link #printHeaders()}
     */
    public String[] getHeaders() {
        final VCF source = getVCF();
        source.addExtraHeaders(getExtraHeaders());
        final String[] target = new String[0];
        return (String[]) source.getFullHeaders().toArray(target);
    }

    /**
     * Prints every line returned by {@link #getHeaders()}, in order. Prints nothing when that
     * method returns null.
     */
    public final void printHeaders() {
        final String[] headerLines = getHeaders();
        if (headerLines == null) {
            return;
        }
        for (int idx = 0; idx < headerLines.length; idx++) {
            println(headerLines[idx]);
        }
    }

    /**
     * Prints every line returned by {@link #getFooters()}, in order. Prints nothing when that
     * method returns null.
     */
    public final void printFooters() {
        final String[] footerLines = getFooters();
        if (footerLines == null) {
            return;
        }
        for (int idx = 0; idx < footerLines.length; idx++) {
            println(footerLines[idx]);
        }
    }

    /**
     * Lines printed by {@link #printFooters()} once processing is over.
     *
     * @return null by default, meaning no footer
     */
    public String[] getFooters() {
        return null;
    }

    /**
     * Hook called by {@link #executeFunction()} once {@link #isStillConsuming()} returns false,
     * just before the footers are printed. Does nothing by default.
     */
    public void end() {
    }

    /**
     * @return the VCF last given to {@link #setVCF(VCF)}, or null if none was set
     */
    public final VCF getVCF() {
        return this.vcf;
    }

    /**
     * Obtains the VCF from the {@code vcffile} field and stores it with {@link #setVCF(VCF)}.
     * NOTE(review): the meaning of the -1 argument is not visible here - confirm against
     * the {@code getVCF(int)} declaration.
     *
     * @throws VCFException declared for failures while obtaining the VCF
     * @throws PedException declared for failures while obtaining the VCF
     */
    public void openVCF() throws VCFException, PedException {
        setVCF(this.vcffile.getVCF(-1));
    }

    /**
     * Stores the VCF returned afterwards by {@link #getVCF()}.
     *
     * @param vcf the VCF to process
     */
    public final void setVCF(VCF vcf) {
        this.vcf = vcf;
    }

    /**
     * Wraps the string form of a variant into a one-element array.
     *
     * @param variant the variant to convert (must not be null)
     * @return a new array containing only {@code variant.toString()}
     */
    public static String[] asOutput(Variant variant) {
        final String[] single = new String[1];
        single[0] = variant.toString();
        return single;
    }

    /**
     * Runs the whole pipeline: opens the VCF, calls {@link #begin()}, prints the headers, then
     * submits the reader, one Consumer and {@link #WORKERS} Worker tasks to a fixed thread pool
     * while an Analyzer thread runs on the side. Once the pool has terminated, the Analyzer is
     * told to finish; after it has stopped, {@link #end()} is called and the footers are printed.
     *
     * @throws Exception anything thrown while opening the VCF or by the hooks
     */
    @Override // fr.inserm.u1078.tludwig.vcfprocessor.functions.Function
    public final void executeFunction() throws Exception {
        openVCF();
        begin();
        printHeaders();

        this.outputLines = new LinkedBlockingQueue<>(20 * WORKERS);
        // Pool size: the workers plus the reader and the consumer.
        final ExecutorService pool = Executors.newFixedThreadPool(WORKERS + 2);
        this.analyzer = new Analyzer();
        this.analyzer.start();

        try {
            final VCF.Reader lineReader = getVCF().getReaderWithoutStarting();
            pool.submit(lineReader);
            pool.submit(new Consumer());
            for (int remaining = WORKERS; remaining > 0; remaining--) {
                pool.submit(new Worker(lineReader));
            }
            pool.shutdown();
            pool.awaitTermination(100L, TimeUnit.DAYS);
        } catch (InterruptedException interruption) {
            Message.error("Thread was interupted", interruption);
        }

        // Everything pushed to the analyzer before this point is still handled before it stops.
        this.analyzer.willEnd();
        while (isStillConsuming()) {
            try {
                Thread.sleep(DELAY);
            } catch (Exception ignored) {
                // Keep polling until the analyzer has finished.
            }
        }

        end();
        printFooters();
    }

    /**
     * Wraps the lines into an {@link Output} and places it on the output queue, blocking while
     * the queue is full. An interruption while waiting is reported through {@code fatalAndDie}.
     *
     * @param i sequence index of the result
     * @param strArr lines produced for that index
     */
    public void putOutput(int i, String[] strArr) {
        final Output result = new Output(i, strArr);
        try {
            this.outputLines.put(result);
        } catch (InterruptedException interruption) {
            fatalAndDie("Synchronisation Exception", interruption);
        }
    }

    /**
     * Turns one input line into an output and queues it under the given index:
     * the end sentinel is forwarded as is, a filtered line yields an empty output, and any other
     * line is transformed by {@link #processInputLine(String)}. Any exception raised while
     * transforming or queueing a regular line is reported through {@code fatalAndDie}.
     *
     * @param i sequence index of the line
     * @param str the line, or one of the sentinel values
     */
    public void pushOutput(int i, String str) {
        if (END_MESSAGE.equals(str)) {
            putOutput(i, new String[]{END_MESSAGE});
        } else if (VCF.FILTERED_LINE.equals(str)) {
            // Still queue something so that the consumer can move past this index.
            putOutput(i, new String[0]);
        } else {
            try {
                putOutput(i, processInputLine(str));
            } catch (Exception failure) {
                fatalAndDie("Unable to process line \n" + str, failure);
            }
        }
    }

    /**
     * Transforms one input line into the lines to output for it. Called from Worker tasks
     * through {@link #pushOutput(int, String)}, so several calls may run at the same time.
     *
     * @param str the input line
     * @return the lines to output; the result is iterated by the Consumer, so it must not be null
     */
    public abstract String[] processInputLine(String str);

    /**
     * Handles one object previously given to {@link #pushAnalysis(Object)}. Called from the
     * Analyzer thread, one object at a time.
     *
     * @param obj the pushed object
     * @return false if the object was not expected, in which case the Analyzer logs a warning
     */
    public abstract boolean checkAndProcessAnalysis(Object obj);

    /**
     * Hands an object to the Analyzer. The analyzer is only created in
     * {@link #executeFunction()}, so this must not be called before that method has started it.
     *
     * @param obj the object to analyze
     */
    public final void pushAnalysis(Object obj) {
        this.analyzer.push(obj);
    }

    /**
     * Tells whether the Analyzer has not yet taken its end sentinel. Polled by
     * {@link #executeFunction()} before calling {@link #end()}.
     *
     * @return the Analyzer's running flag
     */
    public boolean isStillConsuming() {
        return this.analyzer.stillRunning;
    }
}
