/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.NMClient;
import org.apache.hadoop.yarn.client.NMClientImpl;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;

@InterfaceStability.Unstable
@InterfaceStability.Evolving
public class NMClientAsync
extends AbstractService {
    private static final Log LOG = LogFactory.getLog(NMClientAsync.class);
    protected static final int INITIAL_THREAD_POOL_SIZE = 10;
    protected ThreadPoolExecutor threadPool;
    protected int maxThreadPoolSize;
    protected Thread eventDispatcherThread;
    protected AtomicBoolean stopped = new AtomicBoolean(false);
    protected BlockingQueue<ContainerEvent> events = new LinkedBlockingQueue<ContainerEvent>();
    protected NMClient client;
    protected CallbackHandler callbackHandler;
    protected ConcurrentMap<ContainerId, StatefulContainer> containers = new ConcurrentHashMap<ContainerId, StatefulContainer>();

    public NMClientAsync(CallbackHandler callbackHandler) {
        this(NMClientAsync.class.getName(), callbackHandler);
    }

    public NMClientAsync(String name, CallbackHandler callbackHandler) {
        this(name, new NMClientImpl(), callbackHandler);
    }

    @InterfaceAudience.Private
    @VisibleForTesting
    protected NMClientAsync(String name, NMClient client, CallbackHandler callbackHandler) {
        super(name);
        this.client = client;
        this.callbackHandler = callbackHandler;
    }

    protected void serviceInit(Configuration conf) throws Exception {
        this.maxThreadPoolSize = conf.getInt("yarn.client.nodemanager-client-async.thread-pool-max-size", 500);
        LOG.info((Object)("Upper bound of the thread pool size is " + this.maxThreadPoolSize));
        this.client.init(conf);
        super.serviceInit(conf);
    }

    protected void serviceStart() throws Exception {
        this.client.start();
        ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(((Object)((Object)this)).getClass().getName() + " #%d").setDaemon(true).build();
        int initSize = Math.min(10, this.maxThreadPoolSize);
        this.threadPool = new ThreadPoolExecutor(initSize, Integer.MAX_VALUE, 1L, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
        this.eventDispatcherThread = new Thread(){

            @Override
            public void run() {
                ContainerEvent event = null;
                HashSet<String> allNodes = new HashSet<String>();
                while (!NMClientAsync.this.stopped.get() && !Thread.currentThread().isInterrupted()) {
                    int nodeNum;
                    int idealThreadPoolSize;
                    try {
                        event = NMClientAsync.this.events.take();
                    }
                    catch (InterruptedException e) {
                        if (!NMClientAsync.this.stopped.get()) {
                            LOG.error((Object)"Returning, thread interrupted", (Throwable)e);
                        }
                        return;
                    }
                    allNodes.add(event.getNodeId().toString());
                    int threadPoolSize = NMClientAsync.this.threadPool.getCorePoolSize();
                    if (threadPoolSize != NMClientAsync.this.maxThreadPoolSize && threadPoolSize < (idealThreadPoolSize = Math.min(NMClientAsync.this.maxThreadPoolSize, nodeNum = allNodes.size()))) {
                        int newThreadPoolSize = Math.min(NMClientAsync.this.maxThreadPoolSize, idealThreadPoolSize + 10);
                        LOG.info((Object)("Set NMClientAsync thread pool size to " + newThreadPoolSize + " as the number of nodes to talk to is " + nodeNum));
                        NMClientAsync.this.threadPool.setCorePoolSize(newThreadPoolSize);
                    }
                    NMClientAsync.this.threadPool.execute(NMClientAsync.this.getContainerEventProcessor(event));
                }
            }
        };
        this.eventDispatcherThread.setName("Container  Event Dispatcher");
        this.eventDispatcherThread.setDaemon(false);
        this.eventDispatcherThread.start();
        super.serviceStart();
    }

    protected void serviceStop() throws Exception {
        if (this.stopped.getAndSet(true)) {
            return;
        }
        if (this.eventDispatcherThread != null) {
            this.eventDispatcherThread.interrupt();
            try {
                this.eventDispatcherThread.join();
            }
            catch (InterruptedException e) {
                LOG.error((Object)("The thread of " + this.eventDispatcherThread.getName() + " didn't finish normally."), (Throwable)e);
            }
        }
        if (this.threadPool != null) {
            this.threadPool.shutdownNow();
        }
        if (this.client != null) {
            if ((!(this.client instanceof NMClientImpl) || ((NMClientImpl)this.client).cleanupRunningContainers.get()) && this.containers != null) {
                this.containers.clear();
            }
            this.client.stop();
        }
        super.serviceStop();
    }

    public void startContainer(Container container, ContainerLaunchContext containerLaunchContext) {
        if (this.containers.putIfAbsent(container.getId(), new StatefulContainer(this, container.getId())) != null) {
            this.callbackHandler.onStartContainerError(container.getId(), RPCUtil.getRemoteException((String)("Container " + container.getId() + " is already started or scheduled to start")));
        }
        try {
            this.events.put(new StartContainerEvent(container, containerLaunchContext));
        }
        catch (InterruptedException e) {
            LOG.warn((Object)("Exception when scheduling the event of starting Container " + container.getId()));
            this.callbackHandler.onStartContainerError(container.getId(), e);
        }
    }

    public void stopContainer(ContainerId containerId, NodeId nodeId, Token containerToken) {
        if (this.containers.get(containerId) == null) {
            this.callbackHandler.onStopContainerError(containerId, RPCUtil.getRemoteException((String)("Container " + containerId + " is neither started nor scheduled to start")));
        }
        try {
            this.events.put(new ContainerEvent(containerId, nodeId, containerToken, ContainerEventType.STOP_CONTAINER));
        }
        catch (InterruptedException e) {
            LOG.warn((Object)("Exception when scheduling the event of stopping Container " + containerId));
            this.callbackHandler.onStopContainerError(containerId, e);
        }
    }

    public void getContainerStatus(ContainerId containerId, NodeId nodeId, Token containerToken) {
        try {
            this.events.put(new ContainerEvent(containerId, nodeId, containerToken, ContainerEventType.QUERY_CONTAINER));
        }
        catch (InterruptedException e) {
            LOG.warn((Object)("Exception when scheduling the event of querying the status of Container " + containerId));
            this.callbackHandler.onGetContainerStatusError(containerId, e);
        }
    }

    protected boolean isCompletelyDone(StatefulContainer container) {
        return container.getState() == ContainerState.DONE || container.getState() == ContainerState.FAILED;
    }

    protected ContainerEventProcessor getContainerEventProcessor(ContainerEvent event) {
        return new ContainerEventProcessor(event);
    }

    public static interface CallbackHandler {
        public void onContainerStarted(ContainerId var1, Map<String, ByteBuffer> var2);

        public void onContainerStatusReceived(ContainerId var1, ContainerStatus var2);

        public void onContainerStopped(ContainerId var1);

        public void onStartContainerError(ContainerId var1, Throwable var2);

        public void onGetContainerStatusError(ContainerId var1, Throwable var2);

        public void onStopContainerError(ContainerId var1, Throwable var2);
    }

    protected class ContainerEventProcessor
    implements Runnable {
        protected ContainerEvent event;

        public ContainerEventProcessor(ContainerEvent event) {
            this.event = event;
        }

        @Override
        public void run() {
            ContainerId containerId = this.event.getContainerId();
            LOG.info((Object)("Processing Event " + (Object)((Object)this.event) + " for Container " + containerId));
            if (this.event.getType() == ContainerEventType.QUERY_CONTAINER) {
                try {
                    ContainerStatus containerStatus = NMClientAsync.this.client.getContainerStatus(containerId, this.event.getNodeId(), this.event.getContainerToken());
                    try {
                        NMClientAsync.this.callbackHandler.onContainerStatusReceived(containerId, containerStatus);
                    }
                    catch (Throwable thr) {
                        LOG.info((Object)("Unchecked exception is thrown from onContainerStatusReceived for Container " + this.event.getContainerId()), thr);
                    }
                }
                catch (YarnException e) {
                    this.onExceptionRaised(containerId, e);
                }
                catch (IOException e) {
                    this.onExceptionRaised(containerId, e);
                }
                catch (Throwable t) {
                    this.onExceptionRaised(containerId, t);
                }
            } else {
                StatefulContainer container = (StatefulContainer)NMClientAsync.this.containers.get(containerId);
                if (container == null) {
                    LOG.info((Object)("Container " + containerId + " is already stopped or failed"));
                } else {
                    container.handle(this.event);
                    if (NMClientAsync.this.isCompletelyDone(container)) {
                        NMClientAsync.this.containers.remove(containerId);
                    }
                }
            }
        }

        private void onExceptionRaised(ContainerId containerId, Throwable t) {
            try {
                NMClientAsync.this.callbackHandler.onGetContainerStatusError(containerId, t);
            }
            catch (Throwable thr) {
                LOG.info((Object)("Unchecked exception is thrown from onGetContainerStatusError for Container " + containerId), thr);
            }
        }
    }

    protected static class StatefulContainer
    implements EventHandler<ContainerEvent> {
        protected static final StateMachineFactory<StatefulContainer, ContainerState, ContainerEventType, ContainerEvent> stateMachineFactory = new StateMachineFactory((Enum)ContainerState.PREP).addTransition((Enum)ContainerState.PREP, EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED), (Enum)ContainerEventType.START_CONTAINER, (MultipleArcTransition)new StartContainerTransition()).addTransition((Enum)ContainerState.PREP, (Enum)ContainerState.DONE, (Enum)ContainerEventType.STOP_CONTAINER, (SingleArcTransition)new OutOfOrderTransition()).addTransition((Enum)ContainerState.RUNNING, EnumSet.of(ContainerState.DONE, ContainerState.FAILED), (Enum)ContainerEventType.STOP_CONTAINER, (MultipleArcTransition)new StopContainerTransition()).addTransition((Enum)ContainerState.DONE, (Enum)ContainerState.DONE, EnumSet.of(ContainerEventType.START_CONTAINER, ContainerEventType.STOP_CONTAINER)).addTransition((Enum)ContainerState.FAILED, (Enum)ContainerState.FAILED, EnumSet.of(ContainerEventType.START_CONTAINER, ContainerEventType.STOP_CONTAINER));
        private final NMClientAsync nmClientAsync;
        private final ContainerId containerId;
        private final StateMachine<ContainerState, ContainerEventType, ContainerEvent> stateMachine;
        private final ReentrantReadWriteLock.ReadLock readLock;
        private final ReentrantReadWriteLock.WriteLock writeLock;

        public StatefulContainer(NMClientAsync client, ContainerId containerId) {
            this.nmClientAsync = client;
            this.containerId = containerId;
            this.stateMachine = stateMachineFactory.make((Object)this);
            ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
            this.readLock = lock.readLock();
            this.writeLock = lock.writeLock();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void handle(ContainerEvent event) {
            this.writeLock.lock();
            try {
                try {
                    this.stateMachine.doTransition(event.getType(), (Object)event);
                }
                catch (InvalidStateTransitonException e) {
                    LOG.error((Object)"Can't handle this event at current state", (Throwable)e);
                }
            }
            finally {
                this.writeLock.unlock();
            }
        }

        public ContainerId getContainerId() {
            return this.containerId;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public ContainerState getState() {
            this.readLock.lock();
            try {
                ContainerState containerState = (ContainerState)this.stateMachine.getCurrentState();
                return containerState;
            }
            finally {
                this.readLock.unlock();
            }
        }

        protected static class OutOfOrderTransition
        implements SingleArcTransition<StatefulContainer, ContainerEvent> {
            protected static final String STOP_BEFORE_START_ERROR_MSG = "Container was killed before it was launched";

            protected OutOfOrderTransition() {
            }

            public void transition(StatefulContainer container, ContainerEvent event) {
                try {
                    ((StatefulContainer)container).nmClientAsync.callbackHandler.onStartContainerError(event.getContainerId(), RPCUtil.getRemoteException((String)STOP_BEFORE_START_ERROR_MSG));
                }
                catch (Throwable thr) {
                    LOG.info((Object)("Unchecked exception is thrown from onStartContainerError for Container " + event.getContainerId()), thr);
                }
            }
        }

        protected static class StopContainerTransition
        implements MultipleArcTransition<StatefulContainer, ContainerEvent, ContainerState> {
            protected StopContainerTransition() {
            }

            public ContainerState transition(StatefulContainer container, ContainerEvent event) {
                ContainerId containerId = event.getContainerId();
                try {
                    ((StatefulContainer)container).nmClientAsync.client.stopContainer(containerId, event.getNodeId(), event.getContainerToken());
                    try {
                        ((StatefulContainer)container).nmClientAsync.callbackHandler.onContainerStopped(event.getContainerId());
                    }
                    catch (Throwable thr) {
                        LOG.info((Object)("Unchecked exception is thrown from onContainerStopped for Container " + event.getContainerId()), thr);
                    }
                    return ContainerState.DONE;
                }
                catch (YarnException e) {
                    return this.onExceptionRaised(container, event, e);
                }
                catch (IOException e) {
                    return this.onExceptionRaised(container, event, e);
                }
                catch (Throwable t) {
                    return this.onExceptionRaised(container, event, t);
                }
            }

            private ContainerState onExceptionRaised(StatefulContainer container, ContainerEvent event, Throwable t) {
                try {
                    ((StatefulContainer)container).nmClientAsync.callbackHandler.onStopContainerError(event.getContainerId(), t);
                }
                catch (Throwable thr) {
                    LOG.info((Object)("Unchecked exception is thrown from onStopContainerError for Container " + event.getContainerId()), thr);
                }
                return ContainerState.FAILED;
            }
        }

        protected static class StartContainerTransition
        implements MultipleArcTransition<StatefulContainer, ContainerEvent, ContainerState> {
            protected StartContainerTransition() {
            }

            public ContainerState transition(StatefulContainer container, ContainerEvent event) {
                ContainerId containerId = event.getContainerId();
                try {
                    StartContainerEvent scEvent = null;
                    if (event instanceof StartContainerEvent) {
                        scEvent = (StartContainerEvent)event;
                    }
                    assert (scEvent != null);
                    Map<String, ByteBuffer> allServiceResponse = ((StatefulContainer)container).nmClientAsync.client.startContainer(scEvent.getContainer(), scEvent.getContainerLaunchContext());
                    try {
                        ((StatefulContainer)container).nmClientAsync.callbackHandler.onContainerStarted(containerId, allServiceResponse);
                    }
                    catch (Throwable thr) {
                        LOG.info((Object)("Unchecked exception is thrown from onContainerStarted for Container " + containerId), thr);
                    }
                    return ContainerState.RUNNING;
                }
                catch (YarnException e) {
                    return this.onExceptionRaised(container, event, e);
                }
                catch (IOException e) {
                    return this.onExceptionRaised(container, event, e);
                }
                catch (Throwable t) {
                    return this.onExceptionRaised(container, event, t);
                }
            }

            private ContainerState onExceptionRaised(StatefulContainer container, ContainerEvent event, Throwable t) {
                try {
                    ((StatefulContainer)container).nmClientAsync.callbackHandler.onStartContainerError(event.getContainerId(), t);
                }
                catch (Throwable thr) {
                    LOG.info((Object)("Unchecked exception is thrown from onStartContainerError for Container " + event.getContainerId()), thr);
                }
                return ContainerState.FAILED;
            }
        }
    }

    protected static class StartContainerEvent
    extends ContainerEvent {
        private Container container;
        private ContainerLaunchContext containerLaunchContext;

        public StartContainerEvent(Container container, ContainerLaunchContext containerLaunchContext) {
            super(container.getId(), container.getNodeId(), container.getContainerToken(), ContainerEventType.START_CONTAINER);
            this.container = container;
            this.containerLaunchContext = containerLaunchContext;
        }

        public Container getContainer() {
            return this.container;
        }

        public ContainerLaunchContext getContainerLaunchContext() {
            return this.containerLaunchContext;
        }
    }

    protected static class ContainerEvent
    extends AbstractEvent<ContainerEventType> {
        private ContainerId containerId;
        private NodeId nodeId;
        private Token containerToken;

        public ContainerEvent(ContainerId containerId, NodeId nodeId, Token containerToken, ContainerEventType type) {
            super((Enum)type);
            this.containerId = containerId;
            this.nodeId = nodeId;
            this.containerToken = containerToken;
        }

        public ContainerId getContainerId() {
            return this.containerId;
        }

        public NodeId getNodeId() {
            return this.nodeId;
        }

        public Token getContainerToken() {
            return this.containerToken;
        }
    }

    protected static enum ContainerEventType {
        START_CONTAINER,
        STOP_CONTAINER,
        QUERY_CONTAINER;

    }

    protected static enum ContainerState {
        PREP,
        FAILED,
        RUNNING,
        DONE;

    }
}

