/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.executor;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.storm.daemon.Shutdownable;
import org.apache.storm.daemon.Task;
import org.apache.storm.executor.Executor;
import org.apache.storm.executor.IRunningExecutor;
import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.ExecutorStats;
import org.apache.storm.hooks.ITaskHook;
import org.apache.storm.spout.ISpout;
import org.apache.storm.task.IBolt;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.TupleImpl;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutorShutdown
implements Shutdownable,
IRunningExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorShutdown.class);
    private final Executor executor;
    private final List<Utils.SmartThread> threads;
    private final Map<Integer, Task> taskDatas;

    public ExecutorShutdown(Executor executor, List<Utils.SmartThread> threads, Map<Integer, Task> taskDatas) {
        this.executor = executor;
        this.threads = threads;
        this.taskDatas = taskDatas;
    }

    @Override
    public ExecutorStats renderStats() {
        return this.executor.getStats().renderStats();
    }

    @Override
    public List<Long> getExecutorId() {
        return this.executor.getExecutorId();
    }

    @Override
    public void credentialsChanged(Credentials credentials) {
        TupleImpl tuple = new TupleImpl(this.executor.getWorkerTopologyContext(), new Values(credentials), -1, "__credentials");
        ArrayList addressedTuple = Lists.newArrayList((Object[])new AddressedTuple[]{new AddressedTuple(-2, tuple)});
        this.executor.getReceiveQueue().publish(addressedTuple);
    }

    @Override
    public boolean getBackPressureFlag() {
        return this.executor.getBackpressure();
    }

    @Override
    public void shutdown() {
        try {
            LOG.info("Shutting down executor " + this.executor.getComponentId() + ":" + this.executor.getExecutorId());
            this.executor.getReceiveQueue().haltWithInterrupt();
            this.executor.getTransferWorkerQueue().haltWithInterrupt();
            for (Utils.SmartThread t : this.threads) {
                t.interrupt();
            }
            for (Utils.SmartThread t : this.threads) {
                LOG.debug("Executor " + this.executor.getComponentId() + ":" + this.executor.getExecutorId() + " joining thread " + t.getName());
                t.join();
            }
            this.executor.getStats().cleanupStats();
            for (Task task : this.taskDatas.values()) {
                TopologyContext userContext = task.getUserContext();
                for (ITaskHook hook : userContext.getHooks()) {
                    hook.cleanup();
                }
            }
            this.executor.getStormClusterState().disconnect();
            if (this.executor.getOpenOrPrepareWasCalled().get()) {
                for (Task task : this.taskDatas.values()) {
                    Object object = task.getTaskObject();
                    if (object instanceof ISpout) {
                        ((ISpout)object).close();
                        continue;
                    }
                    if (object instanceof IBolt) {
                        ((IBolt)object).cleanup();
                        continue;
                    }
                    LOG.error("unknown component object");
                }
            }
            LOG.info("Shut down executor " + this.executor.getComponentId() + ":" + this.executor.getExecutorId());
        }
        catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        }
    }
}

