/*
 * Decompiled with CFR 0.152.
 */
package edu.ucar.unidata.sruth;

import edu.ucar.unidata.sruth.AbstractNode;
import edu.ucar.unidata.sruth.Archive;
import edu.ucar.unidata.sruth.CancellingExecutor;
import edu.ucar.unidata.sruth.ClearingHouse;
import edu.ucar.unidata.sruth.ClientManager;
import edu.ucar.unidata.sruth.Filter;
import edu.ucar.unidata.sruth.InetSocketAddressSet;
import edu.ucar.unidata.sruth.PortNumberSet;
import edu.ucar.unidata.sruth.Predicate;
import edu.ucar.unidata.sruth.Server;
import edu.ucar.unidata.sruth.SinkServer;
import edu.ucar.unidata.sruth.TrackerProxy;
import edu.ucar.unidata.sruth.Util;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;

@ThreadSafe
final class SinkNode
extends AbstractNode {
    private final InetSocketAddress trackerAddress;
    @GuardedBy(value="this")
    private final ArrayList<ClientManager> clientManagers;
    private final TrackerProxy trackerProxy;
    private static final Logger logger = Util.getLogger();

    SinkNode(Archive archive, Predicate predicate, InetSocketAddress trackerAddress) throws IOException {
        this(archive, predicate, trackerAddress, new InetSocketAddressSet());
    }

    SinkNode(Archive archive, Predicate predicate, InetSocketAddress trackerAddress, int serverPort) throws UnknownHostException, IOException {
        this(archive, predicate, trackerAddress, new InetSocketAddressSet(InetAddress.getLocalHost(), PortNumberSet.getInstance(serverPort, serverPort)));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    SinkNode(Archive archive, Predicate predicate, InetSocketAddress trackerAddress, InetSocketAddressSet inetSockAddrSet) throws IOException {
        super(archive, predicate, inetSockAddrSet, new CancellingExecutor(1, 1 + predicate.getFilterCount(), 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()));
        if (trackerAddress == null) {
            throw new NullPointerException();
        }
        this.trackerAddress = trackerAddress;
        Archive.DistributedTrackerFiles distributedTrackerFiles = this.clearingHouse.getDistributedTrackerFiles(trackerAddress);
        this.trackerProxy = new TrackerProxy(trackerAddress, this.localServer.getSocketAddress(), distributedTrackerFiles);
        this.clientManagers = new ArrayList(predicate.getFilterCount());
        SinkNode sinkNode = this;
        synchronized (sinkNode) {
            for (Filter filter : this.getPredicate()) {
                ClientManager clientManager = new ClientManager(this.localServer.getSocketAddress(), this.clearingHouse, filter, this.trackerProxy);
                this.clientManagers.add(clientManager);
            }
        }
    }

    @Override
    Server createServer(ClearingHouse clearingHouse, InetSocketAddressSet inetSockAddrSet) throws SocketException, IOException {
        return new SinkServer(clearingHouse, inetSockAddrSet);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    @Override
    public Void call() throws InterruptedException, IOException {
        logger.trace("Starting up: {}", this);
        String prevName = Thread.currentThread().getName();
        Thread.currentThread().setName(this.toString());
        try {
            int clientManagerCount;
            SinkNode sinkNode = this;
            synchronized (sinkNode) {
                clientManagerCount = this.clientManagers.size();
            }
            try {
                ExecutorCompletionService<Void> completionService = new ExecutorCompletionService<Void>(this.executorService);
                HashMap<Future<Void>, ClientManager> clientManagerMap = new HashMap<Future<Void>, ClientManager>(clientManagerCount);
                Future<Void> serverFuture = completionService.submit(this.localServer);
                SinkNode sinkNode2 = this;
                synchronized (sinkNode2) {
                    for (ClientManager clientManager : this.clientManagers) {
                        Future<Void> future = completionService.submit(clientManager);
                        clientManagerMap.put(future, clientManager);
                    }
                }
                while (clientManagerMap.size() > 0) {
                    Future future = completionService.take();
                    if (future == serverFuture) {
                        if (future.isCancelled()) continue;
                        try {
                            future.get();
                            throw new AssertionError();
                        }
                        catch (ExecutionException e) {
                            Throwable cause = e.getCause();
                            if (!(cause instanceof IOException)) throw new RuntimeException("Unexpected error: " + this.localServer, cause);
                            throw new IOException("Local sink-server crashed: " + this.localServer, cause);
                        }
                    }
                    ClientManager clientManager = (ClientManager)clientManagerMap.remove(future);
                    assert (future != null);
                    if (future.isCancelled()) {
                        logger.debug("Cancelled: {}", clientManager);
                        continue;
                    }
                    try {
                        future.get();
                    }
                    catch (ExecutionException e) {
                        Throwable cause = e.getCause();
                        if (!(cause instanceof IOException)) throw new RuntimeException("Unexpected error: " + clientManager, cause);
                        throw new IOException("I/O error: " + clientManager, cause);
                        return null;
                    }
                }
            }
            finally {
                this.executorService.shutdownNow();
                this.awaitCompletion();
            }
        }
        finally {
            Thread.currentThread().setName(prevName);
            logger.trace("Done: {}", this);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void waitUntilRunning() throws InterruptedException {
        this.localServer.waitUntilRunning();
        SinkNode sinkNode = this;
        synchronized (sinkNode) {
            for (ClientManager clientManager : this.clientManagers) {
                clientManager.waitUntilRunning();
            }
        }
    }

    long getReceivedFileCount() {
        return this.clearingHouse.getReceivedFileCount();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    int getClientCount() {
        int n = 0;
        SinkNode sinkNode = this;
        synchronized (sinkNode) {
            for (ClientManager clientManager : this.clientManagers) {
                n += clientManager.getClientCount();
            }
        }
        return n;
    }

    public synchronized String toString() {
        return "SinkNode [localServer=" + this.localServer + ", trackerAddress=" + this.trackerAddress + "]";
    }
}

