/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm;

import com.google.common.collect.Sets;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.ILocalCluster;
import org.apache.storm.ISubmitterHook;
import org.apache.storm.blobstore.NimbusBlobStore;
import org.apache.storm.dependency.DependencyPropertiesParser;
import org.apache.storm.dependency.DependencyUploader;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.NotAliveException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.SubmitOptions;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologyInitialStatus;
import org.apache.storm.hooks.SubmitterHookException;
import org.apache.storm.scheduler.resource.ResourceUtils;
import org.apache.storm.security.auth.AuthUtils;
import org.apache.storm.security.auth.IAutoCredentials;
import org.apache.storm.utils.BufferFileInputStream;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Utils;
import org.apache.storm.validation.ConfigValidation;
import org.apache.thrift.TException;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StormSubmitter {
    public static final Logger LOG = LoggerFactory.getLogger(StormSubmitter.class);
    private static final int THRIFT_CHUNK_SIZE_BYTES = 307200;
    private static ILocalCluster localNimbus = null;
    public static final Pattern zkDigestPattern = Pattern.compile("\\S+:\\S+");

    private static String generateZookeeperDigestSecretPayload() {
        return Utils.secureRandomLong() + ":" + Utils.secureRandomLong();
    }

    public static boolean validateZKDigestPayload(String payload) {
        if (payload != null) {
            Matcher m = zkDigestPattern.matcher(payload);
            return m.matches();
        }
        return false;
    }

    public static Map prepareZookeeperAuthentication(Map<String, Object> conf) {
        HashMap<String, String> toRet = new HashMap<String, String>();
        if (!conf.containsKey("storm.zookeeper.topology.auth.payload") || conf.get("storm.zookeeper.topology.auth.payload") == null || !StormSubmitter.validateZKDigestPayload((String)conf.get("storm.zookeeper.topology.auth.payload"))) {
            String secretPayload = StormSubmitter.generateZookeeperDigestSecretPayload();
            toRet.put("storm.zookeeper.topology.auth.payload", secretPayload);
            LOG.info("Generated ZooKeeper secret payload for MD5-digest: " + secretPayload);
        }
        toRet.put("storm.zookeeper.topology.auth.scheme", "digest");
        return toRet;
    }

    private static Map<String, String> populateCredentials(Map<String, Object> conf, Map<String, String> creds) {
        HashMap<String, String> ret = new HashMap<String, String>();
        for (IAutoCredentials autoCred : AuthUtils.GetAutoCredentials(conf)) {
            LOG.info("Running " + autoCred);
            autoCred.populateCredentials(ret);
        }
        if (creds != null) {
            ret.putAll(creds);
        }
        return ret;
    }

    public static void pushCredentials(String name, Map<String, Object> topoConf, Map<String, String> credentials) throws AuthorizationException, NotAliveException, InvalidTopologyException {
        topoConf = new HashMap<String, Object>(topoConf);
        topoConf.putAll(Utils.readCommandLineOpts());
        Map<String, Object> conf = Utils.readStormConfig();
        conf.putAll(topoConf);
        Map<String, String> fullCreds = StormSubmitter.populateCredentials(conf, credentials);
        if (fullCreds.isEmpty()) {
            LOG.warn("No credentials were found to push to " + name);
            return;
        }
        try {
            if (localNimbus != null) {
                LOG.info("Pushing Credentials to topology {} in local mode", (Object)name);
                localNimbus.uploadNewCredentials(name, new Credentials(fullCreds));
            } else {
                try (NimbusClient client = NimbusClient.getConfiguredClient(conf);){
                    LOG.info("Uploading new credentials to {}", (Object)name);
                    client.getClient().uploadNewCredentials(name, new Credentials(fullCreds));
                }
            }
            LOG.info("Finished pushing creds to topology: {}", (Object)name);
        }
        catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    public static void submitTopology(String name, Map<String, Object> topoConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        StormSubmitter.submitTopology(name, topoConf, topology, null, null);
    }

    public static void submitTopology(String name, Map<String, Object> topoConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        StormSubmitter.submitTopology(name, topoConf, topology, opts, null);
    }

    public static void submitTopologyAs(String name, Map<String, Object> topoConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IllegalArgumentException {
        Map<String, Object> conf;
        block29: {
            Map<String, String> fullCreds;
            Credentials tmpCreds;
            if (!Utils.isValidConf(topoConf)) {
                throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
            }
            topoConf = new HashMap<String, Object>(topoConf);
            topoConf.putAll(Utils.readCommandLineOpts());
            conf = Utils.readStormConfig();
            conf.putAll(topoConf);
            topoConf.putAll(StormSubmitter.prepareZookeeperAuthentication(conf));
            StormSubmitter.validateConfs(conf, topology);
            Map<String, String> passedCreds = new HashMap<String, String>();
            if (opts != null && (tmpCreds = opts.get_creds()) != null) {
                passedCreds = tmpCreds.get_creds();
            }
            if (!(fullCreds = StormSubmitter.populateCredentials(conf, passedCreds)).isEmpty()) {
                if (opts == null) {
                    opts = new SubmitOptions(TopologyInitialStatus.ACTIVE);
                }
                opts.set_creds(new Credentials(fullCreds));
            }
            try {
                if (localNimbus != null) {
                    LOG.info("Submitting topology " + name + " in local mode");
                    if (opts != null) {
                        localNimbus.submitTopologyWithOpts(name, topoConf, topology, opts);
                    } else {
                        localNimbus.submitTopology(name, topoConf, topology);
                    }
                    LOG.info("Finished submitting topology: " + name);
                    break block29;
                }
                String serConf = JSONValue.toJSONString(topoConf);
                try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser);){
                    List<String> artifactsBlobKeys;
                    if (StormSubmitter.topologyNameExists(name, client)) {
                        throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
                    }
                    List<String> jarsBlobKeys = Collections.emptyList();
                    DependencyUploader uploader = new DependencyUploader();
                    try {
                        uploader.init();
                        jarsBlobKeys = StormSubmitter.uploadDependencyJarsToBlobStore(uploader);
                        artifactsBlobKeys = StormSubmitter.uploadDependencyArtifactsToBlobStore(uploader);
                    }
                    catch (Throwable e) {
                        uploader.deleteBlobs(jarsBlobKeys);
                        uploader.shutdown();
                        throw e;
                    }
                    try {
                        StormSubmitter.setDependencyBlobsToTopology(topology, jarsBlobKeys, artifactsBlobKeys);
                        StormSubmitter.submitTopologyInDistributeMode(name, topology, opts, progressListener, asUser, conf, serConf, client);
                    }
                    catch (AlreadyAliveException | AuthorizationException | InvalidTopologyException e) {
                        uploader.deleteBlobs(jarsBlobKeys);
                        throw e;
                    }
                    finally {
                        uploader.shutdown();
                    }
                }
            }
            catch (TException e) {
                throw new RuntimeException(e);
            }
        }
        StormSubmitter.invokeSubmitterHook(name, asUser, conf, topology);
    }

    private static List<String> uploadDependencyJarsToBlobStore(DependencyUploader uploader) {
        LOG.info("Uploading dependencies - jars...");
        DependencyPropertiesParser propertiesParser = new DependencyPropertiesParser();
        String depJarsProp = System.getProperty("storm.dependency.jars", "");
        List<File> depJars = propertiesParser.parseJarsProperties(depJarsProp);
        try {
            return uploader.uploadFiles(depJars, true);
        }
        catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }

    private static List<String> uploadDependencyArtifactsToBlobStore(DependencyUploader uploader) {
        LOG.info("Uploading dependencies - artifacts...");
        DependencyPropertiesParser propertiesParser = new DependencyPropertiesParser();
        String depArtifactsProp = System.getProperty("storm.dependency.artifacts", "{}");
        Map<String, File> depArtifacts = propertiesParser.parseArtifactsProperties(depArtifactsProp);
        try {
            return uploader.uploadArtifacts(depArtifacts);
        }
        catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }

    private static void setDependencyBlobsToTopology(StormTopology topology, List<String> jarsBlobKeys, List<String> artifactsBlobKeys) {
        LOG.info("Dependency Blob keys - jars : {} / artifacts : {}", jarsBlobKeys, artifactsBlobKeys);
        topology.set_dependency_jars(jarsBlobKeys);
        topology.set_dependency_artifacts(artifactsBlobKeys);
    }

    private static void submitTopologyInDistributeMode(String name, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser, Map<String, Object> conf, String serConf, NimbusClient client) throws TException {
        try {
            String jar = StormSubmitter.submitJarAs(conf, System.getProperty("storm.jar"), progressListener, client);
            LOG.info("Submitting topology {} in distributed mode with conf {}", (Object)name, (Object)serConf);
            if (opts != null) {
                client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
            } else {
                client.getClient().submitTopology(name, jar, serConf, topology);
            }
            LOG.info("Finished submitting topology: {}", (Object)name);
        }
        catch (InvalidTopologyException e) {
            LOG.warn("Topology submission exception: {}", (Object)e.get_msg());
            throw e;
        }
        catch (AlreadyAliveException e) {
            LOG.warn("Topology already alive exception", (Throwable)((Object)e));
            throw e;
        }
    }

    private static void invokeSubmitterHook(String name, String asUser, Map<String, Object> topoConf, StormTopology topology) {
        String submissionNotifierClassName = null;
        try {
            if (topoConf.containsKey("storm.topology.submission.notifier.plugin.class")) {
                submissionNotifierClassName = topoConf.get("storm.topology.submission.notifier.plugin.class").toString();
                LOG.info("Initializing the registered ISubmitterHook [{}]", (Object)submissionNotifierClassName);
                if (submissionNotifierClassName == null || submissionNotifierClassName.isEmpty()) {
                    throw new IllegalArgumentException("storm.topology.submission.notifier.plugin.class property must be a non empty string.");
                }
                ISubmitterHook submitterHook = (ISubmitterHook)Class.forName(submissionNotifierClassName).newInstance();
                TopologyInfo topologyInfo = Utils.getTopologyInfo(name, asUser, topoConf);
                LOG.info("Invoking the registered ISubmitterHook [{}]", (Object)submissionNotifierClassName);
                submitterHook.notify(topologyInfo, topoConf, topology);
            }
        }
        catch (Exception e) {
            LOG.warn("Error occurred in invoking submitter hook:[{}] ", submissionNotifierClassName, (Object)e);
            throw new SubmitterHookException(e);
        }
    }

    public static void submitTopology(String name, Map<String, Object> topoConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        StormSubmitter.submitTopologyAs(name, topoConf, topology, opts, progressListener, null);
    }

    public static void submitTopologyWithProgressBar(String name, Map<String, Object> topoConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        StormSubmitter.submitTopologyWithProgressBar(name, topoConf, topology, null);
    }

    public static void submitTopologyWithProgressBar(String name, Map<String, Object> topoConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        StormSubmitter.submitTopology(name, topoConf, topology, opts, new ProgressListener(){

            @Override
            public void onStart(String srcFile, String targetFile, long totalBytes) {
                System.out.printf("Start uploading file '%s' to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
            }

            @Override
            public void onProgress(String srcFile, String targetFile, long bytesUploaded, long totalBytes) {
                int length = 50;
                int p = (int)((long)length * bytesUploaded / totalBytes);
                String progress = StringUtils.repeat((String)"=", (int)p);
                String todo = StringUtils.repeat((String)" ", (int)(length - p));
                System.out.printf("\r[%s%s] %d / %d", progress, todo, bytesUploaded, totalBytes);
            }

            @Override
            public void onCompleted(String srcFile, String targetFile, long totalBytes) {
                System.out.printf("\nFile '%s' uploaded to '%s' (%d bytes)\n", srcFile, targetFile, totalBytes);
            }
        });
    }

    private static boolean topologyNameExists(String name, NimbusClient client) {
        try {
            return !client.getClient().isTopologyNameAllowed(name);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static String submitJar(Map<String, Object> conf, ProgressListener listener) {
        return StormSubmitter.submitJar(conf, System.getProperty("storm.jar"), listener);
    }

    public static String submitJar(Map<String, Object> conf, String localJar) {
        return StormSubmitter.submitJar(conf, localJar, null);
    }

    public static String submitJarAs(Map<String, Object> conf, String localJar, ProgressListener listener, NimbusClient client) {
        if (localJar == null) {
            throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
        }
        try {
            String uploadLocation = client.getClient().beginFileUpload();
            LOG.info("Uploading topology jar " + localJar + " to assigned location: " + uploadLocation);
            BufferFileInputStream is = new BufferFileInputStream(localJar, 307200);
            long totalSize = new File(localJar).length();
            if (listener != null) {
                listener.onStart(localJar, uploadLocation, totalSize);
            }
            long bytesUploaded = 0L;
            while (true) {
                byte[] toSubmit = is.read();
                bytesUploaded += (long)toSubmit.length;
                if (listener != null) {
                    listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize);
                }
                if (toSubmit.length == 0) break;
                client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
            }
            client.getClient().finishFileUpload(uploadLocation);
            if (listener != null) {
                listener.onCompleted(localJar, uploadLocation, totalSize);
            }
            LOG.info("Successfully uploaded topology jar to assigned location: " + uploadLocation);
            return uploadLocation;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static String submitJarAs(Map<String, Object> conf, String localJar, ProgressListener listener, String asUser) {
        if (localJar == null) {
            throw new RuntimeException("Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
        }
        try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser);){
            String string = StormSubmitter.submitJarAs(conf, localJar, listener, client);
            return string;
        }
    }

    public static String submitJar(Map<String, Object> conf, String localJar, ProgressListener listener) {
        return StormSubmitter.submitJarAs(conf, localJar, listener, (String)null);
    }

    private static void validateConfs(Map<String, Object> topoConf, StormTopology topology) throws IllegalArgumentException, InvalidTopologyException {
        ConfigValidation.validateFields(topoConf);
        StormSubmitter.validateTopologyWorkerMaxHeapSizeMBConfigs(topoConf, topology);
        Utils.validateTopologyBlobStoreMap(topoConf, StormSubmitter.getListOfKeysFromBlobStore(topoConf));
    }

    private static void validateTopologyWorkerMaxHeapSizeMBConfigs(Map<String, Object> topoConf, StormTopology topology) {
        double largestMemReq = StormSubmitter.getMaxExecutorMemoryUsageForTopo(topology, topoConf);
        Double topologyWorkerMaxHeapSize = ObjectReader.getDouble(topoConf.get("topology.worker.max.heap.size.mb"));
        if (topologyWorkerMaxHeapSize < largestMemReq) {
            throw new IllegalArgumentException("Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB=" + ObjectReader.getDouble(topoConf.get("topology.worker.max.heap.size.mb")) + " < " + largestMemReq + " (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount");
        }
    }

    private static double getMaxExecutorMemoryUsageForTopo(StormTopology topology, Map<String, Object> topologyConf) {
        double memoryRequirement;
        double largestMemoryOperator = 0.0;
        for (Map<String, Double> entry : ResourceUtils.getBoltsResources(topology, topologyConf).values()) {
            memoryRequirement = entry.get("topology.component.resources.offheap.memory.mb") + entry.get("topology.component.resources.onheap.memory.mb");
            if (!(memoryRequirement > largestMemoryOperator)) continue;
            largestMemoryOperator = memoryRequirement;
        }
        for (Map<String, Double> entry : ResourceUtils.getSpoutsResources(topology, topologyConf).values()) {
            memoryRequirement = entry.get("topology.component.resources.offheap.memory.mb") + entry.get("topology.component.resources.onheap.memory.mb");
            if (!(memoryRequirement > largestMemoryOperator)) continue;
            largestMemoryOperator = memoryRequirement;
        }
        return largestMemoryOperator;
    }

    private static Set<String> getListOfKeysFromBlobStore(Map<String, Object> topoConf) {
        try (NimbusBlobStore client = new NimbusBlobStore();){
            client.prepare(topoConf);
            HashSet hashSet = Sets.newHashSet(client.listKeys());
            return hashSet;
        }
    }

    public static interface ProgressListener {
        public void onStart(String var1, String var2, long var3);

        public void onProgress(String var1, String var2, long var3, long var5);

        public void onCompleted(String var1, String var2, long var3);
    }
}

