/*
 * Decompiled with CFR 0.152.
 */
package edu.ucar.unidata.sruth;

import edu.ucar.unidata.sruth.Archive;
import edu.ucar.unidata.sruth.ArchivePath;
import edu.ucar.unidata.sruth.CancellingExecutor;
import edu.ucar.unidata.sruth.DataProduct;
import edu.ucar.unidata.sruth.DataProductListener;
import edu.ucar.unidata.sruth.Decoder;
import edu.ucar.unidata.sruth.Filter;
import edu.ucar.unidata.sruth.Predicate;
import edu.ucar.unidata.sruth.Processor;
import edu.ucar.unidata.sruth.SinkNode;
import edu.ucar.unidata.sruth.Subscription;
import edu.ucar.unidata.sruth.Util;
import edu.ucar.unidata.sruth.XmlActionFile;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;

/**
 * Couples a {@link SinkNode} with a {@link Processor} over a local
 * {@link Archive}: every data-product reported by the archive is offered to
 * the processor, and {@link #call()} executes the processor and the sink node
 * concurrently until one of them terminates.
 * <p>
 * An instance can be executed at most once; a second invocation of
 * {@link #call()} throws {@link IllegalStateException}.
 */
@ThreadSafe
public final class Subscriber
implements Callable<Void> {
    private static final Logger logger = Util.getLogger();

    /** Usage message logged by {@link #main(String[])} on an invalid invocation. */
    private static final String USAGE =
            "Usage: ... [-a actions] [-d archive] [-s port] subscription\n"
            + "where:\n"
            + "    -a actions     URL or pathname of the XML document specifying local\n"
            + "                   processing actions. The default is to do no local\n"
            + "                   processing of received data-products: the instance\n"
            + "                   becomes a pure relay node in the network.\n"
            + "    -d archive     Pathname of the root of the temporary data archive.\n"
            + "                   The default is the subdirectory \"SRUTH\" of the\n"
            + "                   user's home-directory.\n"
            + "    -s port        Port number on which the local data-exchange server\n"
            + "                   will listen for connections. If zero, then an ephemeral\n"
            + "                   port will be chosen by the operating-system (which is\n"
            + "                   the default).\n"
            + "    subscription   URL or pathname of the XML document that contains\n"
            + "                   the subscription information.\n";

    /** The sink node that is executed by {@link #call()}. */
    private final SinkNode sinkNode;
    /** The caller's predicate augmented with the tracker-files filter. */
    private final Predicate predicate;
    /** Set once {@link #call()} has been entered; never reset. */
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    /** The archive rooted at the directory given to the constructor. */
    private final Archive archive;
    /** The processor that receives data-products and is executed by {@link #call()}. */
    private final Processor processor;

    /**
     * Constructs from the archive root, tracker address, predicate and
     * processor, using {@code 0} as the server port.
     *
     * @param rootDir        pathname of the root of the archive
     * @param trackerAddress address of the tracker
     * @param predicate      predicate for selecting data
     * @param processor      processor that is offered every data-product
     * @throws IOException          if the delegated constructor throws it
     * @throws NullPointerException if any argument is {@code null}
     */
    public Subscriber(Path rootDir, InetSocketAddress trackerAddress, Predicate predicate, Processor processor) throws IOException {
        this(rootDir, trackerAddress, predicate, processor, 0);
    }

    /**
     * Constructs from the archive root, tracker address, predicate, processor
     * and server port.
     * <p>
     * The filter obtained from the archive's distributed tracker files is
     * added to {@code predicate}; the result is what {@link #getPredicate()}
     * returns and what the sink node is given.
     *
     * @param rootDir        pathname of the root of the archive
     * @param trackerAddress address of the tracker
     * @param predicate      predicate for selecting data
     * @param processor      processor that is offered every data-product and
     *                       that is executed by {@link #call()}
     * @param serverPort     port number handed to the {@link SinkNode}
     * @throws IOException          if one of the collaborators cannot be created
     * @throws NullPointerException if any reference argument is {@code null}
     */
    public Subscriber(Path rootDir, InetSocketAddress trackerAddress, Predicate predicate, final Processor processor, int serverPort) throws IOException {
        if (null == rootDir) {
            throw new NullPointerException("rootDir");
        }
        if (null == trackerAddress) {
            throw new NullPointerException("trackerAddress");
        }
        if (null == predicate) {
            throw new NullPointerException("predicate");
        }
        if (null == processor) {
            throw new NullPointerException("processor");
        }
        this.archive = new Archive(rootDir);
        final Archive.DistributedTrackerFiles distributedTrackerFiles = new Archive.DistributedTrackerFiles(this.archive, trackerAddress);
        final Filter trackerFilesFilter = distributedTrackerFiles.getFilter();
        final Predicate augmentedPredicate = predicate.add(trackerFilesFilter);
        this.archive.addDataProductListener(new DataProductListener(){

            @Override
            public void process(DataProduct dataProduct) {
                if (!processor.offer(dataProduct)) {
                    logger.error("Couldn't process data-product: {}", dataProduct);
                }
            }
        });
        this.sinkNode = new SinkNode(this.archive, augmentedPredicate, trackerAddress, serverPort);
        this.predicate = augmentedPredicate;
        // Must be the very processor the listener above offers to: it is the
        // one executed by call(), so a different instance would leave the
        // offered data-products unprocessed.
        this.processor = processor;
    }

    /**
     * Returns the predicate in effect, i.e. the constructor's predicate with
     * the tracker-files filter added.
     *
     * @return the augmented predicate
     */
    Predicate getPredicate() {
        return this.predicate;
    }

    /**
     * Returns the root directory reported by the archive.
     *
     * @return the archive's root directory
     */
    Path getRootDir() {
        return this.archive.getRootDir();
    }

    /**
     * Executes the processor and the sink node on a dedicated two-thread
     * executor and waits for them. The current thread is renamed to
     * {@link #toString()} for the duration of the call.
     * <p>
     * On exit the executor is shut down and awaited, the archive is closed
     * (best effort) and the original thread name is restored.
     *
     * @return {@code null} always
     * @throws IllegalStateException if this method was already invoked
     * @throws InterruptedException  if the current thread is interrupted while waiting
     * @throws IOException           if the sink node failed with an {@link IOException}
     * @throws RuntimeException      if the processor failed, or the sink node
     *                               failed with something other than an
     *                               {@link IOException}
     * @throws AssertionError        if the processor completed normally
     */
    @Override
    public Void call() throws InterruptedException, IOException {
        logger.trace("Starting up: {}", this);
        if (!this.isRunning.compareAndSet(false, true)) {
            throw new IllegalStateException("Already started: " + this);
        }
        final Thread currentThread = Thread.currentThread();
        final String origThreadName = currentThread.getName();
        currentThread.setName(this.toString());
        final CancellingExecutor executor = new CancellingExecutor(2, 2, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        try {
            final ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<Void>(executor);
            final Future<Void> processingFuture = completionService.submit(this.processor);
            final Future<Void> sinkNodeFuture = completionService.submit(this.sinkNode);
            for (int i = 0; i < 2; ++i) {
                final Future<Void> future = completionService.take();
                if (future.isCancelled()) {
                    break;
                }
                if (future == processingFuture) {
                    // The processor is not expected to return on its own.
                    try {
                        future.get();
                        throw new AssertionError();
                    }
                    catch (ExecutionException e) {
                        throw new RuntimeException("Unexpected error: " + this.processor, e.getCause());
                    }
                }
                assert (future == sinkNodeFuture);
                try {
                    future.get();
                    // The sink node finished normally: stop the processor too.
                    processingFuture.cancel(true);
                }
                catch (ExecutionException e) {
                    final Throwable cause = e.getCause();
                    logger.trace("Execution exception: {}", cause.toString());
                    if (cause instanceof IOException) {
                        throw new IOException("IO error: " + this.sinkNode, cause);
                    }
                    throw new RuntimeException("Unexpected error: " + this.sinkNode, cause);
                }
            }
        }
        finally {
            try {
                executor.shutdownNow();
                // Clear any pending interrupt so the wait below isn't aborted by it.
                Thread.interrupted();
                executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
            }
            finally {
                // Runs even if the wait above is interrupted, so the archive
                // is always closed and the thread name always restored.
                try {
                    this.archive.close();
                }
                catch (IOException e) {
                    // Best effort: a close failure must not mask the outcome.
                    logger.debug("Couldn't close archive: {}", e.toString());
                }
                currentThread.setName(origThreadName);
                logger.trace("Done: {}", this);
            }
        }
        return null;
    }

    /**
     * Waits until both the processor and the sink node report that they are
     * running.
     *
     * @throws InterruptedException if the current thread is interrupted
     */
    public void waitUntilRunning() throws InterruptedException {
        this.processor.waitUntilRunning();
        this.sinkNode.waitUntilRunning();
    }

    /**
     * Returns the received-file count reported by the sink node.
     *
     * @return the sink node's received-file count
     */
    public long getReceivedFileCount() {
        return this.sinkNode.getReceivedFileCount();
    }

    /**
     * Returns the sum of the sink node's client count and servlet count.
     *
     * @return number of clients plus number of servlets of the sink node
     */
    public int getPeerCount() {
        return this.sinkNode.getClientCount() + this.sinkNode.getServletCount();
    }

    /**
     * Resolves an archive pathname against the archive.
     *
     * @param archivePath the archive pathname
     * @return the result of {@code Archive.resolve(archivePath)}
     */
    Path getAbsolutePath(ArchivePath archivePath) {
        return this.archive.resolve(archivePath);
    }

    @Override
    public String toString() {
        return "Subscriber [sinkNode=" + this.sinkNode + "]";
    }

    /**
     * Command-line entry point: decodes the options and the subscription
     * argument, creates a {@link Subscriber} and executes it.
     * <p>
     * Exits with status 1 after logging the usage message when the invocation
     * is invalid (and also, without the usage message, when the {@code -a}
     * argument can't be decoded); exits with status 0 when the subscriber
     * returns or is interrupted.
     *
     * @param args {@code [-a actions] [-d archive] [-s port] subscription}
     * @throws SecurityException if thrown by the underlying calls
     * @throws IOException       if creating or executing the subscriber fails
     *                           with an I/O error
     */
    public static void main(String[] args) throws SecurityException, IOException {
        Path archivePath = Paths.get(System.getProperty("user.home") + File.separatorChar + Util.PACKAGE_NAME);
        Processor processor = new Processor();
        Subscription subscription = null;
        int serverPort = 0;
        try {
            int iarg;
            for (iarg = 0; iarg < args.length; ++iarg) {
                String arg = args[iarg];
                try {
                    // An empty argument makes charAt() throw; a trailing option
                    // without a value makes the array access throw. Both are
                    // reported by the catch clause below.
                    if (arg.charAt(0) != '-') {
                        break;
                    }
                    final String optString = arg.substring(1);
                    arg = args[++iarg];
                    if (optString.equals("a")) {
                        try {
                            processor = Util.decodeUrlOrFile(arg, new Decoder<Processor>(){

                                @Override
                                public Processor decode(InputStream input) throws IOException {
                                    return XmlActionFile.getProcessor(input);
                                }
                            });
                        }
                        catch (Exception e) {
                            logger.error("Couldn't process local-actions argument: \"{}\": {}", arg, e.toString());
                            System.exit(1);
                        }
                    }
                    else if (optString.equals("d")) {
                        try {
                            archivePath = Paths.get(arg);
                        }
                        catch (InvalidPathException e) {
                            logger.error("Couldn't process archive argument: \"{}\"", arg);
                            throw new IllegalArgumentException();
                        }
                    }
                    else if (optString.equals("s")) {
                        try {
                            serverPort = Integer.parseInt(arg);
                        }
                        catch (NumberFormatException e) {
                            logger.error("Couldn't decode server-port argument: \"{}\": {}", arg, e.toString());
                            throw new IllegalArgumentException();
                        }
                    }
                    else {
                        logger.error("Invalid option: \"{}\"", optString);
                        throw new IllegalArgumentException();
                    }
                }
                catch (IndexOutOfBoundsException e) {
                    logger.error("Invalid argument: \"{}\"", arg);
                    throw new IllegalArgumentException();
                }
            }
            if (iarg >= args.length) {
                logger.error("The subscription argument is missing");
                throw new IllegalArgumentException();
            }
            final String subscriptionArg = args[iarg++];
            try {
                subscription = Util.decodeUrlOrFile(subscriptionArg, new Decoder<Subscription>(){

                    @Override
                    public Subscription decode(InputStream input) throws IOException {
                        return new Subscription(input);
                    }
                });
            }
            catch (Exception e) {
                logger.error("Couldn't process subscription argument: \"{}\": {}", subscriptionArg, e.toString());
                throw new IllegalArgumentException();
            }
            if (iarg < args.length) {
                logger.error("Too many arguments");
                throw new IllegalArgumentException();
            }
        }
        catch (IllegalArgumentException e) {
            // Every invalid-invocation path above funnels through here.
            logger.info(USAGE);
            System.exit(1);
        }
        final Subscriber subscriber = new Subscriber(archivePath, subscription.getTrackerAddress(), subscription.getPredicate(), processor, serverPort);
        try {
            subscriber.call();
        }
        catch (InterruptedException ignored) {
            // Interruption is the normal way to stop; the JVM exits right below.
        }
        System.exit(0);
    }
}

