/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.zookeeper;

import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
import javax.security.auth.Subject;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.storm.blobstore.BlobStore;
import org.apache.storm.blobstore.InputStreamWithMeta;
import org.apache.storm.callback.DefaultWatcherCallBack;
import org.apache.storm.callback.WatcherCallBack;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.nimbus.ILeaderElector;
import org.apache.storm.nimbus.NimbusInfo;
import org.apache.storm.security.auth.ReqContext;
import org.apache.storm.utils.Utils;
import org.apache.storm.zookeeper.ClientZookeeper;
import org.apache.storm.zookeeper.LeaderElectorImp;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Zookeeper {
    private static Logger LOG = LoggerFactory.getLogger(Zookeeper.class);
    private static final Zookeeper INSTANCE;
    private static Zookeeper _instance;

    public static void setInstance(Zookeeper u) {
        _instance = u;
    }

    public static void resetInstance() {
        _instance = INSTANCE;
    }

    public static List mkInprocessZookeeper(String localdir, Integer port) throws Exception {
        File localfile = new File(localdir);
        ZooKeeperServer zk = new ZooKeeperServer(localfile, localfile, 2000);
        NIOServerCnxnFactory factory = null;
        int report = 2000;
        int limitPort = 65535;
        if (port != null) {
            report = port;
            limitPort = port;
        }
        while (true) {
            try {
                factory = new NIOServerCnxnFactory();
                factory.configure(new InetSocketAddress(report), 0);
            }
            catch (BindException e) {
                if (++report <= limitPort) continue;
                throw new RuntimeException("No port is available to launch an inprocess zookeeper");
            }
            break;
        }
        LOG.info("Starting inprocess zookeeper at port {} and dir {}", (Object)report, (Object)localdir);
        factory.startup(zk);
        return Arrays.asList(new Long(report), factory);
    }

    public static void shutdownInprocessZookeeper(NIOServerCnxnFactory handle) {
        handle.shutdown();
    }

    public static NimbusInfo toNimbusInfo(Participant participant) {
        String id = participant.getId();
        if (StringUtils.isBlank((String)id)) {
            throw new RuntimeException("No nimbus leader participant host found, have you started your nimbus hosts?");
        }
        NimbusInfo nimbusInfo = NimbusInfo.parse((String)id);
        nimbusInfo.setLeader(participant.isLeader());
        return nimbusInfo;
    }

    public static LeaderLatchListener leaderLatchListenerImpl(final Map<String, Object> conf, final CuratorFramework zk, final BlobStore blobStore, final LeaderLatch leaderLatch) throws UnknownHostException {
        final String hostName = InetAddress.getLocalHost().getCanonicalHostName();
        return new LeaderLatchListener(){
            final String STORM_JAR_SUFFIX = "-stormjar.jar";
            final String STORM_CODE_SUFFIX = "-stormcode.ser";
            final String STORM_CONF_SUFFIX = "-stormconf.ser";

            public void isLeader() {
                TreeSet<String> activeTopologyIds = new TreeSet<String>(ClientZookeeper.getChildren((CuratorFramework)zk, (String)(conf.get("storm.zookeeper.root") + "/storms"), (boolean)false));
                Set<String> activeTopologyBlobKeys = this.populateTopologyBlobKeys(activeTopologyIds);
                Set<String> activeTopologyCodeKeys = this.filterTopologyCodeKeys(activeTopologyBlobKeys);
                HashSet allLocalBlobKeys = Sets.newHashSet((Iterator)blobStore.listKeys());
                Set<String> allLocalTopologyBlobKeys = this.filterTopologyBlobKeys(allLocalBlobKeys);
                Sets.SetView diffTopology = Sets.difference(activeTopologyBlobKeys, allLocalTopologyBlobKeys);
                LOG.info("active-topology-blobs [{}] local-topology-blobs [{}] diff-topology-blobs [{}]", new Object[]{this.generateJoinedString(activeTopologyIds), this.generateJoinedString(allLocalTopologyBlobKeys), this.generateJoinedString((Set<String>)diffTopology)});
                if (diffTopology.isEmpty()) {
                    Set<String> activeTopologyDependencies = this.getTopologyDependencyKeys(activeTopologyCodeKeys);
                    Sets.SetView diffDependencies = Sets.difference(activeTopologyDependencies, (Set)allLocalBlobKeys);
                    LOG.info("active-topology-dependencies [{}] local-blobs [{}] diff-topology-dependencies [{}]", new Object[]{this.generateJoinedString(activeTopologyDependencies), this.generateJoinedString(allLocalBlobKeys), this.generateJoinedString((Set<String>)diffDependencies)});
                    if (diffDependencies.isEmpty()) {
                        LOG.info("Accepting leadership, all active topologies and corresponding dependencies found locally.");
                    } else {
                        LOG.info("Code for all active topologies is available locally, but some dependencies are not found locally, giving up leadership.");
                        this.closeLatch();
                    }
                } else {
                    LOG.info("code for all active topologies not available locally, giving up leadership.");
                    this.closeLatch();
                }
            }

            public void notLeader() {
                LOG.info("{} lost leadership.", (Object)hostName);
            }

            private String generateJoinedString(Set<String> activeTopologyIds) {
                return Joiner.on((String)",").join(activeTopologyIds);
            }

            private Set<String> populateTopologyBlobKeys(Set<String> activeTopologyIds) {
                TreeSet<String> activeTopologyBlobKeys = new TreeSet<String>();
                for (String activeTopologyId : activeTopologyIds) {
                    activeTopologyBlobKeys.add(activeTopologyId + "-stormjar.jar");
                    activeTopologyBlobKeys.add(activeTopologyId + "-stormcode.ser");
                    activeTopologyBlobKeys.add(activeTopologyId + "-stormconf.ser");
                }
                return activeTopologyBlobKeys;
            }

            private Set<String> filterTopologyBlobKeys(Set<String> blobKeys) {
                HashSet<String> topologyBlobKeys = new HashSet<String>();
                for (String blobKey : blobKeys) {
                    if (!blobKey.endsWith("-stormjar.jar") && !blobKey.endsWith("-stormcode.ser") && !blobKey.endsWith("-stormconf.ser")) continue;
                    topologyBlobKeys.add(blobKey);
                }
                return topologyBlobKeys;
            }

            private Set<String> filterTopologyCodeKeys(Set<String> blobKeys) {
                HashSet<String> topologyCodeKeys = new HashSet<String>();
                for (String blobKey : blobKeys) {
                    if (!blobKey.endsWith("-stormcode.ser")) continue;
                    topologyCodeKeys.add(blobKey);
                }
                return topologyCodeKeys;
            }

            private Set<String> getTopologyDependencyKeys(Set<String> activeTopologyCodeKeys) {
                TreeSet<String> activeTopologyDependencies = new TreeSet<String>();
                Subject subject = ReqContext.context().subject();
                for (String activeTopologyCodeKey : activeTopologyCodeKeys) {
                    try {
                        InputStreamWithMeta blob = blobStore.getBlob(activeTopologyCodeKey, subject);
                        byte[] blobContent = IOUtils.readFully((InputStream)blob, (int)new Long(blob.getFileLength()).intValue());
                        StormTopology stormCode = (StormTopology)Utils.deserialize((byte[])blobContent, StormTopology.class);
                        if (stormCode.is_set_dependency_jars()) {
                            activeTopologyDependencies.addAll(stormCode.get_dependency_jars());
                        }
                        if (!stormCode.is_set_dependency_artifacts()) continue;
                        activeTopologyDependencies.addAll(stormCode.get_dependency_artifacts());
                    }
                    catch (IOException | AuthorizationException | KeyNotFoundException e) {
                        LOG.error("Exception occurs while reading blob for key: " + activeTopologyCodeKey + ", exception: " + e, e);
                        throw new RuntimeException("Exception occurs while reading blob for key: " + activeTopologyCodeKey + ", exception: " + e, e);
                    }
                }
                return activeTopologyDependencies;
            }

            private void closeLatch() {
                try {
                    leaderLatch.close();
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    public static ILeaderElector zkLeaderElector(Map<String, Object> conf, BlobStore blobStore) throws UnknownHostException {
        return _instance.zkLeaderElectorImpl(conf, blobStore);
    }

    protected ILeaderElector zkLeaderElectorImpl(Map<String, Object> conf, BlobStore blobStore) throws UnknownHostException {
        List servers = (List)conf.get("storm.zookeeper.servers");
        Object port = conf.get("storm.zookeeper.port");
        CuratorFramework zk = ClientZookeeper.mkClient(conf, (List)servers, (Object)port, (String)"", (WatcherCallBack)new DefaultWatcherCallBack(), conf);
        String leaderLockPath = conf.get("storm.zookeeper.root") + "/leader-lock";
        String id = NimbusInfo.fromConf(conf).toHostPortString();
        AtomicReference<LeaderLatch> leaderLatchAtomicReference = new AtomicReference<LeaderLatch>(new LeaderLatch(zk, leaderLockPath, id));
        AtomicReference<LeaderLatchListener> leaderLatchListenerAtomicReference = new AtomicReference<LeaderLatchListener>(Zookeeper.leaderLatchListenerImpl(conf, zk, blobStore, leaderLatchAtomicReference.get()));
        return new LeaderElectorImp(conf, servers, zk, leaderLockPath, id, leaderLatchAtomicReference, leaderLatchListenerAtomicReference, blobStore);
    }

    static {
        _instance = INSTANCE = new Zookeeper();
    }
}

