/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.daemon.worker;

import com.google.common.base.Preconditions;
import com.lmax.disruptor.EventHandler;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.security.auth.Subject;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.ObjectUtils;
import org.apache.storm.cluster.ClusterStateContext;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.IStateStorage;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.DaemonCommon;
import org.apache.storm.daemon.Shutdownable;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.worker.LogConfigManager;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.executor.Executor;
import org.apache.storm.executor.ExecutorShutdown;
import org.apache.storm.executor.IRunningExecutor;
import org.apache.storm.executor.LocalExecutor;
import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.ExecutorStats;
import org.apache.storm.generated.LSWorkerHeartbeat;
import org.apache.storm.generated.LogConfig;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.IContext;
import org.apache.storm.security.auth.AuthUtils;
import org.apache.storm.security.auth.IAutoCredentials;
import org.apache.storm.stats.StatsUtil;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.DisruptorBackpressureCallback;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.WorkerBackpressureCallback;
import org.apache.storm.utils.WorkerBackpressureThread;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import uk.org.lidalia.sysoutslf4j.context.SysOutOverSLF4J;

public class Worker
implements Shutdownable,
DaemonCommon {
    private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
    private final Map<String, Object> conf;
    private final IContext context;
    private final String topologyId;
    private final String assignmentId;
    private final int port;
    private final String workerId;
    private final LogConfigManager logConfigManager;
    private WorkerState workerState;
    private AtomicReference<List<IRunningExecutor>> executorsAtom;
    private Thread transferThread;
    private WorkerBackpressureThread backpressureThread;
    private AtomicReference<Credentials> credentialsAtom;
    private Subject subject;
    private Collection<IAutoCredentials> autoCreds;

    public Worker(Map<String, Object> conf, IContext context, String topologyId, String assignmentId, int port, String workerId) {
        this.conf = conf;
        this.context = context;
        this.topologyId = topologyId;
        this.assignmentId = assignmentId;
        this.port = port;
        this.workerId = workerId;
        this.logConfigManager = new LogConfigManager();
    }

    public void start() throws Exception {
        LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", new Object[]{this.topologyId, this.assignmentId, this.port, this.workerId, this.conf});
        if (!ConfigUtils.isLocalMode(this.conf)) {
            SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
            String pid = Utils.processPid();
            FileUtils.touch((File)new File(ConfigUtils.workerPidPath(this.conf, this.workerId, pid)));
            FileUtils.writeStringToFile((File)new File(ConfigUtils.workerArtifactsPidPath(this.conf, this.topologyId, this.port)), (String)pid, (Charset)Charset.forName("UTF-8"));
        }
        final Map topologyConf = ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(this.conf, this.topologyId));
        List<ACL> acls = Utils.getWorkerACL(topologyConf);
        final IStateStorage stateStorage = ClusterUtils.mkStateStorage(this.conf, topologyConf, acls, new ClusterStateContext(DaemonType.WORKER));
        final IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, acls, new ClusterStateContext());
        final Credentials initialCredentials = stormClusterState.credentials(this.topologyId, null);
        final HashMap<String, String> initCreds = new HashMap<String, String>();
        if (initialCredentials != null) {
            initCreds.putAll(initialCredentials.get_creds());
        }
        this.autoCreds = AuthUtils.GetAutoCredentials(topologyConf);
        this.subject = AuthUtils.populateSubject(null, this.autoCreds, initCreds);
        Subject.doAs(this.subject, new PrivilegedExceptionAction<Object>(){

            @Override
            public Object run() throws Exception {
                Worker.this.workerState = new WorkerState(Worker.this.conf, Worker.this.context, Worker.this.topologyId, Worker.this.assignmentId, Worker.this.port, Worker.this.workerId, topologyConf, stateStorage, stormClusterState);
                Worker.this.doHeartBeat();
                Worker.this.executorsAtom = new AtomicReference<Object>(null);
                ((Worker)Worker.this).workerState.heartbeatTimer.scheduleRecurring(0, (Integer)Worker.this.conf.get("worker.heartbeat.frequency.secs"), () -> {
                    try {
                        Worker.this.doHeartBeat();
                    }
                    catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });
                ((Worker)Worker.this).workerState.executorHeartbeatTimer.scheduleRecurring(0, (Integer)Worker.this.conf.get("worker.heartbeat.frequency.secs"), Worker.this::doExecutorHeartbeats);
                Worker.this.workerState.registerCallbacks();
                Worker.this.workerState.refreshConnections(null);
                Worker.this.workerState.activateWorkerWhenAllConnectionsReady();
                Worker.this.workerState.refreshStormActive(null);
                Worker.this.workerState.runWorkerStartHooks();
                ArrayList<ExecutorShutdown> newExecutors = new ArrayList<ExecutorShutdown>();
                for (List<Long> e : Worker.this.workerState.getExecutors()) {
                    if (ConfigUtils.isLocalMode(topologyConf)) {
                        newExecutors.add(LocalExecutor.mkExecutor(Worker.this.workerState, e, initCreds).execute());
                        continue;
                    }
                    newExecutors.add(Executor.mkExecutor(Worker.this.workerState, e, initCreds).execute());
                }
                Worker.this.executorsAtom.set(newExecutors);
                EventHandler tupleHandler = (packets, seqId, batchEnd) -> Worker.this.workerState.sendTuplesToRemoteWorker((HashMap)packets, seqId, batchEnd);
                Worker.this.transferThread = Utils.asyncLoop(() -> {
                    ((Worker)Worker.this).workerState.transferQueue.consumeBatchWhenAvailable((EventHandler<Object>)tupleHandler);
                    return 0L;
                });
                DisruptorBackpressureCallback disruptorBackpressureHandler = Worker.this.mkDisruptorBackpressureHandler(Worker.this.workerState);
                ((Worker)Worker.this).workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
                ((Worker)Worker.this).workerState.transferQueue.setEnableBackpressure((Boolean)topologyConf.get("topology.backpressure.enable"));
                ((Worker)Worker.this).workerState.transferQueue.setHighWaterMark(ObjectReader.getDouble(topologyConf.get("backpressure.disruptor.high.watermark")));
                ((Worker)Worker.this).workerState.transferQueue.setLowWaterMark(ObjectReader.getDouble(topologyConf.get("backpressure.disruptor.low.watermark")));
                WorkerBackpressureCallback backpressureCallback = Worker.this.mkBackpressureHandler();
                Worker.this.backpressureThread = new WorkerBackpressureThread(((Worker)Worker.this).workerState.backpressureTrigger, Worker.this.workerState, backpressureCallback);
                if (((Boolean)topologyConf.get("topology.backpressure.enable")).booleanValue()) {
                    Worker.this.backpressureThread.start();
                    stormClusterState.topologyBackpressure(Worker.this.topologyId, Worker.this.workerState::refreshThrottle);
                    int pollingSecs = ObjectReader.getInt(topologyConf.get("task.backpressure.poll.secs"));
                    ((Worker)Worker.this).workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, Worker.this.workerState::refreshThrottle);
                }
                Worker.this.credentialsAtom = new AtomicReference<Credentials>(initialCredentials);
                Worker.this.establishLogSettingCallback();
                ((Worker)Worker.this).workerState.stormClusterState.credentials(Worker.this.topologyId, Worker.this::checkCredentialsChanged);
                ((Worker)Worker.this).workerState.refreshCredentialsTimer.scheduleRecurring(0, (Integer)Worker.this.conf.get("task.credentials.poll.secs"), new Runnable(){

                    @Override
                    public void run() {
                        Worker.this.checkCredentialsChanged();
                        if (((Boolean)topologyConf.get("topology.backpressure.enable")).booleanValue()) {
                            Worker.this.checkThrottleChanged();
                        }
                    }
                });
                if (!((Boolean)topologyConf.get("topology.disable.loadaware.messaging")).booleanValue()) {
                    ((Worker)Worker.this).workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, Worker.this.workerState::refreshLoad);
                }
                ((Worker)Worker.this).workerState.refreshConnectionsTimer.scheduleRecurring(0, (Integer)Worker.this.conf.get("task.refresh.poll.secs"), Worker.this.workerState::refreshConnections);
                ((Worker)Worker.this).workerState.resetLogLevelsTimer.scheduleRecurring(0, (Integer)Worker.this.conf.get("worker.log.level.reset.poll.secs"), Worker.this.logConfigManager::resetLogLevels);
                ((Worker)Worker.this).workerState.refreshActiveTimer.scheduleRecurring(0, (Integer)Worker.this.conf.get("task.refresh.poll.secs"), Worker.this.workerState::refreshStormActive);
                LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, "storm.zookeeper.topology.auth.payload"));
                LOG.info("Worker {} for storm {} on {}:{}  has finished loading", new Object[]{Worker.this.workerId, Worker.this.topologyId, Worker.this.assignmentId, Worker.this.port});
                return this;
            }
        });
    }

    public void doHeartBeat() throws IOException {
        LocalState state = ConfigUtils.workerState(this.workerState.conf, this.workerState.workerId);
        state.setWorkerHeartBeat(new LSWorkerHeartbeat(Time.currentTimeSecs(), this.workerState.topologyId, this.workerState.executors.stream().map(executor -> new ExecutorInfo(((Long)executor.get(0)).intValue(), ((Long)executor.get(1)).intValue())).collect(Collectors.toList()), this.workerState.port));
        state.cleanup(60);
    }

    public void doExecutorHeartbeats() {
        List<IRunningExecutor> executors = this.executorsAtom.get();
        Map<List<Integer>, ExecutorStats> stats = null == executors ? StatsUtil.mkEmptyExecutorZkHbs(this.workerState.executors) : StatsUtil.convertExecutorZkHbs(executors.stream().collect(Collectors.toMap(IRunningExecutor::getExecutorId, IRunningExecutor::renderStats)));
        Map<String, Object> zkHB = StatsUtil.mkZkWorkerHb(this.workerState.topologyId, stats, this.workerState.uptime.upTime());
        try {
            this.workerState.stormClusterState.workerHeartbeat(this.workerState.topologyId, this.workerState.assignmentId, Long.valueOf(this.workerState.port), StatsUtil.thriftifyZkWorkerHb(zkHB));
        }
        catch (Exception ex) {
            LOG.error("Worker failed to write heartbeats to ZK or Pacemaker...will retry", (Throwable)ex);
        }
    }

    public void checkCredentialsChanged() {
        Credentials newCreds = this.workerState.stormClusterState.credentials(this.topologyId, null);
        if (!ObjectUtils.equals((Object)newCreds, (Object)this.credentialsAtom.get())) {
            AuthUtils.updateSubject(this.subject, this.autoCreds, null == newCreds ? null : newCreds.get_creds());
            for (IRunningExecutor executor : this.executorsAtom.get()) {
                executor.credentialsChanged(newCreds);
            }
            this.credentialsAtom.set(newCreds);
        }
    }

    public void checkThrottleChanged() {
        boolean throttleOn = this.workerState.stormClusterState.topologyBackpressure(this.topologyId, this::checkThrottleChanged);
        this.workerState.throttleOn.set(throttleOn);
    }

    public void checkLogConfigChanged() {
        LogConfig logConfig = this.workerState.stormClusterState.topologyLogConfig(this.topologyId, null);
        this.logConfigManager.processLogConfigChange(logConfig);
        this.establishLogSettingCallback();
    }

    public void establishLogSettingCallback() {
        this.workerState.stormClusterState.topologyLogConfig(this.topologyId, this::checkLogConfigChanged);
    }

    private DisruptorBackpressureCallback mkDisruptorBackpressureHandler(final WorkerState workerState) {
        return new DisruptorBackpressureCallback(){

            @Override
            public void highWaterMark() throws Exception {
                LOG.debug("worker {} transfer-queue is congested, checking backpressure state", (Object)workerState.workerId);
                WorkerBackpressureThread.notifyBackpressureChecker(workerState.backpressureTrigger);
            }

            @Override
            public void lowWaterMark() throws Exception {
                LOG.debug("worker {} transfer-queue is not congested, checking backpressure state", (Object)workerState.workerId);
                WorkerBackpressureThread.notifyBackpressureChecker(workerState.backpressureTrigger);
            }
        };
    }

    private WorkerBackpressureCallback mkBackpressureHandler() {
        final List<IRunningExecutor> executors = this.executorsAtom.get();
        return new WorkerBackpressureCallback(){

            @Override
            public void onEvent(Object obj) {
                boolean prevBackpressureFlag;
                String topologyId = ((Worker)Worker.this).workerState.topologyId;
                String assignmentId = ((Worker)Worker.this).workerState.assignmentId;
                int port = ((Worker)Worker.this).workerState.port;
                IStormClusterState stormClusterState = ((Worker)Worker.this).workerState.stormClusterState;
                boolean currBackpressureFlag = prevBackpressureFlag = ((Worker)Worker.this).workerState.backpressure.get();
                if (null != executors) {
                    boolean bl = currBackpressureFlag = ((Worker)Worker.this).workerState.transferQueue.getThrottleOn() || executors.stream().map(IRunningExecutor::getBackPressureFlag).reduce((op1, op2) -> op1 != false || op2 != false).get() != false;
                }
                if (currBackpressureFlag != prevBackpressureFlag) {
                    try {
                        LOG.debug("worker backpressure flag changing from {} to {}", (Object)prevBackpressureFlag, (Object)currBackpressureFlag);
                        stormClusterState.workerBackpressure(topologyId, assignmentId, Long.valueOf(port), currBackpressureFlag);
                        ((Worker)Worker.this).workerState.backpressure.set(currBackpressureFlag);
                    }
                    catch (Exception ex) {
                        LOG.error("workerBackpressure update failed when connecting to ZK ... will retry", (Throwable)ex);
                    }
                }
            }
        };
    }

    @Override
    public void shutdown() {
        try {
            LOG.info("Shutting down worker {} {} {}", new Object[]{this.topologyId, this.assignmentId, this.port});
            for (IConnection socket : this.workerState.cachedNodeToPortSocket.get().values()) {
                socket.close();
            }
            LOG.info("Terminating messaging context");
            LOG.info("Shutting down executors");
            for (IRunningExecutor executor : this.executorsAtom.get()) {
                ((ExecutorShutdown)executor).shutdown();
            }
            LOG.info("Shut down executors");
            this.workerState.mqContext.term();
            LOG.info("Shutting down transfer thread");
            this.workerState.transferQueue.haltWithInterrupt();
            this.transferThread.interrupt();
            this.transferThread.join();
            LOG.info("Shut down transfer thread");
            this.backpressureThread.terminate();
            LOG.info("Shut down backpressure thread");
            this.workerState.heartbeatTimer.close();
            this.workerState.refreshConnectionsTimer.close();
            this.workerState.refreshCredentialsTimer.close();
            this.workerState.refreshBackpressureTimer.close();
            this.workerState.refreshActiveTimer.close();
            this.workerState.executorHeartbeatTimer.close();
            this.workerState.userTimer.close();
            this.workerState.refreshLoadTimer.close();
            this.workerState.resetLogLevelsTimer.close();
            this.workerState.closeResources();
            LOG.info("Trigger any worker shutdown hooks");
            this.workerState.runWorkerShutdownHooks();
            this.workerState.stormClusterState.removeWorkerHeartbeat(this.topologyId, this.assignmentId, Long.valueOf(this.port));
            this.workerState.stormClusterState.removeWorkerBackpressure(this.topologyId, this.assignmentId, Long.valueOf(this.port));
            LOG.info("Disconnecting from storm cluster state context");
            this.workerState.stormClusterState.disconnect();
            this.workerState.stateStorage.close();
            LOG.info("Shut down worker {} {} {}", new Object[]{this.topologyId, this.assignmentId, this.port});
        }
        catch (Exception ex) {
            throw Utils.wrapInRuntime(ex);
        }
    }

    @Override
    public boolean isWaiting() {
        return this.workerState.heartbeatTimer.isTimerWaiting() && this.workerState.refreshConnectionsTimer.isTimerWaiting() && this.workerState.refreshLoadTimer.isTimerWaiting() && this.workerState.refreshCredentialsTimer.isTimerWaiting() && this.workerState.refreshBackpressureTimer.isTimerWaiting() && this.workerState.refreshActiveTimer.isTimerWaiting() && this.workerState.executorHeartbeatTimer.isTimerWaiting() && this.workerState.userTimer.isTimerWaiting();
    }

    public static void main(String[] args) throws Exception {
        Preconditions.checkArgument((args.length == 4 ? 1 : 0) != 0, (Object)("Illegal number of arguemtns. Expected: 4, Actual: " + args.length));
        String stormId = args[0];
        String assignmentId = args[1];
        String portStr = args[2];
        String workerId = args[3];
        Map<String, Object> conf = Utils.readStormConfig();
        Utils.setupDefaultUncaughtExceptionHandler();
        StormCommon.validateDistributedMode(conf);
        Worker worker = new Worker(conf, null, stormId, assignmentId, Integer.parseInt(portStr), workerId);
        worker.start();
        Utils.addShutdownHookWithForceKillIn1Sec(worker::shutdown);
    }
}

