/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import java.io.IOException;
import java.net.URL;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateUtils;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.runtime.taskmanager.RuntimeEnvironment;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskInputSplitProvider;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

public class Task
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
    private static final ThreadGroup TASK_THREADS_GROUP = new ThreadGroup("Flink Task Threads");
    private static final AtomicReferenceFieldUpdater<Task, ExecutionState> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Task.class, ExecutionState.class, "executionState");
    private final JobID jobId;
    private final JobVertexID vertexId;
    private final ExecutionAttemptID executionId;
    private final TaskInfo taskInfo;
    private final String taskNameWithSubtask;
    private final Configuration jobConfiguration;
    private final Configuration taskConfiguration;
    private final List<BlobKey> requiredJarFiles;
    private final List<URL> requiredClasspaths;
    private final String nameOfInvokableClass;
    private final TaskManagerRuntimeInfo taskManagerConfig;
    private final MemoryManager memoryManager;
    private final IOManager ioManager;
    private final BroadcastVariableManager broadcastVariableManager;
    private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
    private final ResultPartition[] producedPartitions;
    private final ResultPartitionWriter[] writers;
    private final SingleInputGate[] inputGates;
    private final Map<IntermediateDataSetID, SingleInputGate> inputGatesById;
    private final ActorGateway taskManager;
    private final ActorGateway jobManager;
    private final List<ActorGateway> executionListenerActors;
    private final FiniteDuration actorAskTimeout;
    private final LibraryCacheManager libraryCache;
    private final FileCache fileCache;
    private final NetworkEnvironment network;
    private final AccumulatorRegistry accumulatorRegistry;
    private final Thread executingThread;
    private final TaskMetricGroup metrics;
    private final AtomicBoolean invokableHasBeenCanceled;
    private volatile AbstractInvokable invokable;
    private volatile ExecutionState executionState = ExecutionState.CREATED;
    private volatile Throwable failureCause;
    private volatile ExecutorService asyncCallDispatcher;
    private volatile SerializedValue<StateHandle<?>> operatorState;
    private long taskCancellationInterval;

    public Task(TaskDeploymentDescriptor tdd, MemoryManager memManager, IOManager ioManager, NetworkEnvironment networkEnvironment, BroadcastVariableManager bcVarManager, ActorGateway taskManagerActor, ActorGateway jobManagerActor, FiniteDuration actorAskTimeout, LibraryCacheManager libraryCache, FileCache fileCache, TaskManagerRuntimeInfo taskManagerConfig, TaskMetricGroup metricGroup) {
        int i;
        this.taskInfo = (TaskInfo)Preconditions.checkNotNull((Object)tdd.getTaskInfo());
        this.jobId = (JobID)Preconditions.checkNotNull((Object)tdd.getJobID());
        this.vertexId = (JobVertexID)((Object)Preconditions.checkNotNull((Object)((Object)tdd.getVertexID())));
        this.executionId = (ExecutionAttemptID)((Object)Preconditions.checkNotNull((Object)((Object)tdd.getExecutionId())));
        this.taskNameWithSubtask = this.taskInfo.getTaskNameWithSubtasks();
        this.jobConfiguration = (Configuration)Preconditions.checkNotNull((Object)tdd.getJobConfiguration());
        this.taskConfiguration = (Configuration)Preconditions.checkNotNull((Object)tdd.getTaskConfiguration());
        this.requiredJarFiles = (List)Preconditions.checkNotNull(tdd.getRequiredJarFiles());
        this.requiredClasspaths = (List)Preconditions.checkNotNull(tdd.getRequiredClasspaths());
        this.nameOfInvokableClass = (String)Preconditions.checkNotNull((Object)tdd.getInvokableClassName());
        this.operatorState = tdd.getOperatorState();
        this.serializedExecutionConfig = (SerializedValue)Preconditions.checkNotNull(tdd.getSerializedExecutionConfig());
        this.taskCancellationInterval = this.jobConfiguration.getLong("task.cancellation-interval", 30000L);
        this.memoryManager = (MemoryManager)Preconditions.checkNotNull((Object)memManager);
        this.ioManager = (IOManager)Preconditions.checkNotNull((Object)ioManager);
        this.broadcastVariableManager = (BroadcastVariableManager)Preconditions.checkNotNull((Object)bcVarManager);
        this.accumulatorRegistry = new AccumulatorRegistry(this.jobId, this.executionId);
        this.jobManager = (ActorGateway)Preconditions.checkNotNull((Object)jobManagerActor);
        this.taskManager = (ActorGateway)Preconditions.checkNotNull((Object)taskManagerActor);
        this.actorAskTimeout = (FiniteDuration)Preconditions.checkNotNull((Object)actorAskTimeout);
        this.libraryCache = (LibraryCacheManager)Preconditions.checkNotNull((Object)libraryCache);
        this.fileCache = (FileCache)Preconditions.checkNotNull((Object)fileCache);
        this.network = (NetworkEnvironment)Preconditions.checkNotNull((Object)networkEnvironment);
        this.taskManagerConfig = (TaskManagerRuntimeInfo)Preconditions.checkNotNull((Object)taskManagerConfig);
        this.executionListenerActors = new CopyOnWriteArrayList<ActorGateway>();
        this.metrics = metricGroup;
        String taskNameWithSubtaskAndId = this.taskNameWithSubtask + " (" + (Object)((Object)this.executionId) + ')';
        List<ResultPartitionDeploymentDescriptor> partitions = tdd.getProducedPartitions();
        List<InputGateDeploymentDescriptor> consumedPartitions = tdd.getInputGates();
        this.producedPartitions = new ResultPartition[partitions.size()];
        this.writers = new ResultPartitionWriter[partitions.size()];
        for (i = 0; i < this.producedPartitions.length; ++i) {
            ResultPartitionDeploymentDescriptor desc = partitions.get(i);
            ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), this.executionId);
            this.producedPartitions[i] = new ResultPartition(taskNameWithSubtaskAndId, this.jobId, partitionId, desc.getPartitionType(), desc.getEagerlyDeployConsumers(), desc.getNumberOfSubpartitions(), networkEnvironment.getPartitionManager(), networkEnvironment.getPartitionConsumableNotifier(), ioManager, networkEnvironment.getDefaultIOMode());
            this.writers[i] = new ResultPartitionWriter(this.producedPartitions[i]);
        }
        this.inputGates = new SingleInputGate[consumedPartitions.size()];
        this.inputGatesById = new HashMap<IntermediateDataSetID, SingleInputGate>();
        for (i = 0; i < this.inputGates.length; ++i) {
            SingleInputGate gate;
            this.inputGates[i] = gate = SingleInputGate.create(taskNameWithSubtaskAndId, this.jobId, this.executionId, consumedPartitions.get(i), networkEnvironment, metricGroup.getIOMetricGroup());
            this.inputGatesById.put(gate.getConsumedResultId(), gate);
        }
        this.invokableHasBeenCanceled = new AtomicBoolean(false);
        this.executingThread = new Thread(TASK_THREADS_GROUP, this, this.taskNameWithSubtask);
    }

    public JobID getJobID() {
        return this.jobId;
    }

    public JobVertexID getJobVertexId() {
        return this.vertexId;
    }

    public ExecutionAttemptID getExecutionId() {
        return this.executionId;
    }

    public TaskInfo getTaskInfo() {
        return this.taskInfo;
    }

    public Configuration getJobConfiguration() {
        return this.jobConfiguration;
    }

    public Configuration getTaskConfiguration() {
        return this.taskConfiguration;
    }

    public ResultPartitionWriter[] getAllWriters() {
        return this.writers;
    }

    public SingleInputGate[] getAllInputGates() {
        return this.inputGates;
    }

    public ResultPartition[] getProducedPartitions() {
        return this.producedPartitions;
    }

    public SingleInputGate getInputGateById(IntermediateDataSetID id) {
        return this.inputGatesById.get((Object)id);
    }

    public AccumulatorRegistry getAccumulatorRegistry() {
        return this.accumulatorRegistry;
    }

    public Thread getExecutingThread() {
        return this.executingThread;
    }

    public ExecutionState getExecutionState() {
        return this.executionState;
    }

    public boolean isCanceledOrFailed() {
        return this.executionState == ExecutionState.CANCELING || this.executionState == ExecutionState.CANCELED || this.executionState == ExecutionState.FAILED;
    }

    public Throwable getFailureCause() {
        return this.failureCause;
    }

    public void startTaskThread() {
        this.executingThread.start();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Unable to fully structure code
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public void run() {
        while (true) {
            block45: {
                if ((current = this.executionState) != ExecutionState.CREATED) break block45;
                if (!Task.STATE_UPDATER.compareAndSet(this, ExecutionState.CREATED, ExecutionState.DEPLOYING)) continue;
                distributedCacheEntries = new HashMap<String, Future<Path>>();
                invokable = null;
                ** try [egrp 0[TRYBLOCK] [4, 8 : 95->759)] { 
lbl7:
                // 1 sources

                ** GOTO lbl-1000
            }
            if (current == ExecutionState.FAILED) {
                this.notifyFinalState();
                return;
            }
            if (current != ExecutionState.CANCELING) throw new IllegalStateException("Invalid state for beginning of task operation");
            if (Task.STATE_UPDATER.compareAndSet(this, ExecutionState.CANCELING, ExecutionState.CANCELED)) break;
        }
        this.notifyFinalState();
        return;
lbl-1000:
        // 1 sources

        {
            Task.LOG.info("Loading JAR files for task " + this.taskNameWithSubtask);
            userCodeClassLoader = this.createUserCodeClassloader(this.libraryCache);
            executionConfig = (ExecutionConfig)this.serializedExecutionConfig.deserializeValue(userCodeClassLoader);
            if (executionConfig.getTaskCancellationInterval() >= 0L) {
                this.taskCancellationInterval = executionConfig.getTaskCancellationInterval();
            }
            invokable = this.loadAndInstantiateInvokable(userCodeClassLoader, this.nameOfInvokableClass);
            if (this.isCanceledOrFailed()) {
                throw new CancelTaskException();
            }
            Task.LOG.info("Registering task at network: " + this);
            this.network.registerTask(this);
            try {
                for (Map.Entry entry : DistributedCache.readFileInfoFromConfig((Configuration)this.jobConfiguration)) {
                    Task.LOG.info("Obtaining local cache file for '" + (String)entry.getKey() + '\'');
                    cp = this.fileCache.createTmpFile((String)entry.getKey(), (DistributedCache.DistributedCacheEntry)entry.getValue(), this.jobId);
                    distributedCacheEntries.put((String)entry.getKey(), cp);
                }
            }
            catch (Exception e) {
                throw new Exception("Exception while adding files to distributed cache.", e);
            }
            if (this.isCanceledOrFailed()) {
                throw new CancelTaskException();
            }
            splitProvider = new TaskInputSplitProvider(this.jobManager, this.jobId, this.vertexId, this.executionId, userCodeClassLoader, this.actorAskTimeout);
            env = new RuntimeEnvironment(this.jobId, this.vertexId, this.executionId, executionConfig, this.taskInfo, this.jobConfiguration, this.taskConfiguration, userCodeClassLoader, this.memoryManager, this.ioManager, this.broadcastVariableManager, this.accumulatorRegistry, splitProvider, distributedCacheEntries, this.writers, this.inputGates, this.jobManager, this.taskManagerConfig, this.metrics, this);
            invokable.setEnvironment(env);
            operatorState = this.operatorState;
            if (operatorState != null) {
                if (invokable instanceof StatefulTask == false) throw new IllegalStateException("Found operator state for a non-stateful task invokable");
                try {
                    state = (StateHandle)operatorState.deserializeValue(userCodeClassLoader);
                    op = (StatefulTask)invokable;
                    StateUtils.setOperatorState(op, state);
                }
                catch (Exception e) {
                    throw new RuntimeException("Failed to deserialize state handle and setup initial operator state.", e);
                }
            }
            operatorState = null;
            this.operatorState = null;
            this.invokable = invokable;
            if (!Task.STATE_UPDATER.compareAndSet(this, ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }
            this.notifyObservers(ExecutionState.RUNNING, null);
            this.taskManager.tell(new TaskMessages.UpdateTaskExecutionState(new TaskExecutionState(this.jobId, this.executionId, ExecutionState.RUNNING)));
            this.executingThread.setContextClassLoader(userCodeClassLoader);
            invokable.invoke();
            if (this.isCanceledOrFailed()) {
                throw new CancelTaskException();
            }
            for (ResultPartition partition : this.producedPartitions) {
                if (partition == null) continue;
                partition.finish();
            }
            if (Task.STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, ExecutionState.FINISHED) == false) throw new CancelTaskException();
            this.notifyObservers(ExecutionState.FINISHED, null);
        }
        try {
            Task.LOG.info("Freeing task resources for " + this.taskNameWithSubtask);
            dispatcher = this.asyncCallDispatcher;
            if (dispatcher != null && !dispatcher.isShutdown()) {
                dispatcher.shutdownNow();
            }
            this.network.unregisterTask(this);
            if (invokable != null) {
                this.memoryManager.releaseAll(invokable);
            }
            this.libraryCache.unregisterTask(this.jobId, this.executionId);
            this.removeCachedFiles(distributedCacheEntries, this.fileCache);
            this.notifyFinalState();
        }
        catch (Throwable t) {
            message = "FATAL - exception in task resource cleanup";
            Task.LOG.error(message, t);
            this.notifyFatalError(message, t);
        }
        try {
            this.metrics.close();
            return;
        }
        catch (Throwable t) {
            Task.LOG.error("Error during metrics de-registration", t);
            return;
        }
lbl92:
        // 1 sources

        catch (Throwable t) {
            block44: {
                try {
                    try {}
                    catch (Throwable tt) {
                        message = "FATAL - exception in task exception handler";
                        Task.LOG.error(message, tt);
                        this.notifyFatalError(message, tt);
                        break block44;
                    }
                }
lbl101:
                // 2 sources

                catch (Throwable var12_24) {
                    try {
                        Task.LOG.info("Freeing task resources for " + this.taskNameWithSubtask);
                        dispatcher = this.asyncCallDispatcher;
                        if (dispatcher != null && !dispatcher.isShutdown()) {
                            dispatcher.shutdownNow();
                        }
                        this.network.unregisterTask(this);
                        if (invokable != null) {
                            this.memoryManager.releaseAll(invokable);
                        }
                        this.libraryCache.unregisterTask(this.jobId, this.executionId);
                        this.removeCachedFiles(distributedCacheEntries, this.fileCache);
                        this.notifyFinalState();
                    }
                    catch (Throwable t) {
                        message = "FATAL - exception in task resource cleanup";
                        Task.LOG.error(message, t);
                        this.notifyFatalError(message, t);
                    }
                    try {
                        this.metrics.close();
                        throw var12_24;
                    }
                    catch (Throwable t) {
                        Task.LOG.error("Error during metrics de-registration", t);
                    }
                    throw var12_24;
                }
                while (true) {
                    if ((current = this.executionState) == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
                        if (t instanceof CancelTaskException) {
                            if (!Task.STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) continue;
                            this.cancelInvokable();
                            this.notifyObservers(ExecutionState.CANCELED, null);
                            break block44;
                        } else {
                            if (!Task.STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) continue;
                            Task.LOG.error("Task execution failed. ", t);
                            this.failureCause = t;
                            this.cancelInvokable();
                            this.notifyObservers(ExecutionState.FAILED, t);
                        }
                        break block44;
                    }
                    if (current == ExecutionState.CANCELING) {
                        if (!Task.STATE_UPDATER.compareAndSet(this, current, ExecutionState.CANCELED)) continue;
                        this.notifyObservers(ExecutionState.CANCELED, null);
                        break block44;
                    }
                    if (current == ExecutionState.FAILED) break block44;
                    if (Task.STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) break;
                }
                Task.LOG.error("Unexpected state in Task during an exception: " + (Object)current);
            }
            try {
                Task.LOG.info("Freeing task resources for " + this.taskNameWithSubtask);
                dispatcher = this.asyncCallDispatcher;
                if (dispatcher != null && !dispatcher.isShutdown()) {
                    dispatcher.shutdownNow();
                }
                this.network.unregisterTask(this);
                if (invokable != null) {
                    this.memoryManager.releaseAll(invokable);
                }
                this.libraryCache.unregisterTask(this.jobId, this.executionId);
                this.removeCachedFiles(distributedCacheEntries, this.fileCache);
                this.notifyFinalState();
            }
            catch (Throwable t) {
                message = "FATAL - exception in task resource cleanup";
                Task.LOG.error(message, t);
                this.notifyFatalError(message, t);
            }
            try {
                this.metrics.close();
                return;
            }
            catch (Throwable t) {
                Task.LOG.error("Error during metrics de-registration", t);
                return;
            }
        }
    }

    private ClassLoader createUserCodeClassloader(LibraryCacheManager libraryCache) throws Exception {
        long startDownloadTime = System.currentTimeMillis();
        libraryCache.registerTask(this.jobId, this.executionId, this.requiredJarFiles, this.requiredClasspaths);
        LOG.debug("Register task {} at library cache manager took {} milliseconds", (Object)this.executionId, (Object)(System.currentTimeMillis() - startDownloadTime));
        ClassLoader userCodeClassLoader = libraryCache.getClassLoader(this.jobId);
        if (userCodeClassLoader == null) {
            throw new Exception("No user code classloader available.");
        }
        return userCodeClassLoader;
    }

    private AbstractInvokable loadAndInstantiateInvokable(ClassLoader classLoader, String className) throws Exception {
        Class<AbstractInvokable> invokableClass;
        try {
            invokableClass = Class.forName(className, true, classLoader).asSubclass(AbstractInvokable.class);
        }
        catch (Throwable t) {
            throw new Exception("Could not load the task's invokable class.", t);
        }
        try {
            return invokableClass.newInstance();
        }
        catch (Throwable t) {
            throw new Exception("Could not instantiate the task's invokable class.", t);
        }
    }

    private void removeCachedFiles(Map<String, Future<Path>> entries, FileCache fileCache) {
        try {
            for (Map.Entry<String, Future<Path>> entry : entries.entrySet()) {
                String name = entry.getKey();
                try {
                    fileCache.deleteTmpFile(name, this.jobId);
                }
                catch (Exception e) {
                    LOG.error("Distributed Cache could not remove cached file registered under '" + name + "'.", (Throwable)e);
                }
            }
        }
        catch (Throwable t) {
            LOG.error("Error while removing cached local files from distributed cache.");
        }
    }

    private void notifyFinalState() {
        this.taskManager.tell(new TaskMessages.TaskInFinalState(this.executionId));
    }

    private void notifyFatalError(String message, Throwable cause) {
        this.taskManager.tell(new TaskManagerMessages.FatalError(message, cause));
    }

    public void stopExecution() throws UnsupportedOperationException {
        LOG.info("Attempting to stop task " + this.taskNameWithSubtask);
        if (!(this.invokable instanceof StoppableTask)) {
            throw new UnsupportedOperationException("Stopping not supported by this task.");
        }
        Runnable runnable = new Runnable(){

            @Override
            public void run() {
                try {
                    ((StoppableTask)((Object)Task.this.invokable)).stop();
                }
                catch (RuntimeException e) {
                    LOG.error("Stopping task " + Task.this.taskNameWithSubtask + " failed.", (Throwable)e);
                    Task.this.taskManager.tell(new TaskMessages.FailTask(Task.this.executionId, e));
                }
            }
        };
        this.executeAsyncCallRunnable(runnable, "Stopping source task " + this.taskNameWithSubtask);
    }

    public void cancelExecution() {
        LOG.info("Attempting to cancel task " + this.taskNameWithSubtask);
        this.cancelOrFailAndCancelInvokable(ExecutionState.CANCELING, null);
    }

    public void failExternally(Throwable cause) {
        LOG.info("Attempting to fail task externally " + this.taskNameWithSubtask);
        this.cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause);
    }

    private void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwable cause) {
        ExecutionState current;
        block4: {
            while (true) {
                if ((current = this.executionState).isTerminal() || current == ExecutionState.CANCELING) {
                    LOG.info("Task " + this.taskNameWithSubtask + " is already in state " + (Object)((Object)current));
                    return;
                }
                if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) {
                    if (!STATE_UPDATER.compareAndSet(this, current, targetState)) continue;
                    this.failureCause = cause;
                    this.notifyObservers(targetState, cause);
                    return;
                }
                if (current != ExecutionState.RUNNING) break block4;
                if (STATE_UPDATER.compareAndSet(this, ExecutionState.RUNNING, targetState)) break;
            }
            if (this.invokable != null && this.invokableHasBeenCanceled.compareAndSet(false, true)) {
                this.failureCause = cause;
                this.notifyObservers(targetState, cause);
                LOG.info("Triggering cancellation of task code {} ({}).", (Object)this.taskNameWithSubtask, (Object)this.executionId);
                TaskCanceler canceler = new TaskCanceler(LOG, this.invokable, this.executingThread, this.taskNameWithSubtask, this.taskCancellationInterval, this.producedPartitions, this.inputGates);
                Thread cancelThread = new Thread(this.executingThread.getThreadGroup(), canceler, "Canceler for " + this.taskNameWithSubtask);
                cancelThread.setDaemon(true);
                cancelThread.start();
            }
            return;
        }
        throw new IllegalStateException("Unexpected task state: " + (Object)((Object)current));
    }

    public void registerExecutionListener(ActorGateway listener) {
        this.executionListenerActors.add(listener);
    }

    private void notifyObservers(ExecutionState newState, Throwable error) {
        if (error == null) {
            LOG.info(this.taskNameWithSubtask + " switched to " + (Object)((Object)newState));
        } else {
            LOG.info(this.taskNameWithSubtask + " switched to " + (Object)((Object)newState) + " with exception.", error);
        }
        TaskExecutionState stateUpdate = new TaskExecutionState(this.jobId, this.executionId, newState, error);
        TaskMessages.UpdateTaskExecutionState actorMessage = new TaskMessages.UpdateTaskExecutionState(stateUpdate);
        for (ActorGateway listener : this.executionListenerActors) {
            listener.tell(actorMessage);
        }
    }

    public void triggerCheckpointBarrier(final long checkpointID, final long checkpointTimestamp) {
        AbstractInvokable invokable = this.invokable;
        if (this.executionState == ExecutionState.RUNNING && invokable != null) {
            if (invokable instanceof StatefulTask) {
                final StatefulTask statefulTask = (StatefulTask)((Object)invokable);
                final String taskName = this.taskNameWithSubtask;
                Runnable runnable = new Runnable(){

                    @Override
                    public void run() {
                        block3: {
                            try {
                                boolean success = statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
                                if (!success) {
                                    DeclineCheckpoint decline = new DeclineCheckpoint(Task.this.jobId, Task.this.getExecutionId(), checkpointID, checkpointTimestamp);
                                    Task.this.jobManager.tell(decline);
                                }
                            }
                            catch (Throwable t) {
                                if (Task.this.getExecutionState() != ExecutionState.RUNNING) break block3;
                                Task.this.failExternally(new RuntimeException("Error while triggering checkpoint for " + taskName, t));
                            }
                        }
                    }
                };
                this.executeAsyncCallRunnable(runnable, "Checkpoint Trigger for " + taskName);
            } else {
                LOG.error("Task received a checkpoint request, but is not a checkpointing task - " + this.taskNameWithSubtask);
            }
        } else {
            LOG.debug("Ignoring request to trigger a checkpoint for non-running task.");
        }
    }

    public void notifyCheckpointComplete(final long checkpointID) {
        AbstractInvokable invokable = this.invokable;
        if (this.executionState == ExecutionState.RUNNING && invokable != null) {
            if (invokable instanceof StatefulTask) {
                final StatefulTask statefulTask = (StatefulTask)((Object)invokable);
                String taskName = this.taskNameWithSubtask;
                Runnable runnable = new Runnable(){

                    @Override
                    public void run() {
                        block2: {
                            try {
                                statefulTask.notifyCheckpointComplete(checkpointID);
                            }
                            catch (Throwable t) {
                                if (Task.this.getExecutionState() != ExecutionState.RUNNING) break block2;
                                Task.this.failExternally(new RuntimeException("Error while confirming checkpoint", t));
                            }
                        }
                    }
                };
                this.executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " + taskName);
            } else {
                LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - " + this.taskNameWithSubtask);
            }
        } else {
            LOG.debug("Ignoring checkpoint commit notification for non-running task.");
        }
    }

    public void onPartitionStateUpdate(IntermediateDataSetID resultId, IntermediateResultPartitionID partitionId, ExecutionState partitionState) throws IOException, InterruptedException {
        if (this.executionState == ExecutionState.RUNNING) {
            SingleInputGate inputGate = this.inputGatesById.get((Object)resultId);
            if (inputGate != null) {
                if (partitionState == ExecutionState.RUNNING) {
                    inputGate.retriggerPartitionRequest(partitionId);
                } else if (partitionState == ExecutionState.CANCELED || partitionState == ExecutionState.CANCELING || partitionState == ExecutionState.FAILED) {
                    this.cancelExecution();
                } else {
                    this.failExternally(new IllegalStateException("Received unexpected partition state " + (Object)((Object)partitionState) + " for partition request. This is a bug."));
                }
            } else {
                this.failExternally(new IllegalStateException("Received partition state for unknown input gate " + (Object)((Object)resultId) + ". This is a bug."));
            }
        } else {
            LOG.debug("Ignoring partition state notification for not running task.");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void executeAsyncCallRunnable(Runnable runnable, String callName) {
        Task task = this;
        synchronized (task) {
            block8: {
                if (this.executionState != ExecutionState.RUNNING) {
                    return;
                }
                ExecutorService executor = this.asyncCallDispatcher;
                if (executor == null) {
                    this.asyncCallDispatcher = executor = Executors.newSingleThreadExecutor(new DispatcherThreadFactory(TASK_THREADS_GROUP, "Async calls on " + this.taskNameWithSubtask));
                    if (this.executionState != ExecutionState.RUNNING) {
                        executor.shutdown();
                        this.asyncCallDispatcher = null;
                        return;
                    }
                }
                LOG.debug("Invoking async call {} on task {}", (Object)callName, (Object)this.taskNameWithSubtask);
                try {
                    executor.submit(runnable);
                }
                catch (RejectedExecutionException e) {
                    if (this.executionState != ExecutionState.RUNNING) break block8;
                    throw new RuntimeException("Async call was rejected, even though the task is running.", e);
                }
            }
        }
    }

    private void cancelInvokable() {
        if (this.invokable != null && this.invokable != null && this.invokableHasBeenCanceled.compareAndSet(false, true)) {
            try {
                this.invokable.cancel();
            }
            catch (Throwable t) {
                LOG.error("Error while canceling task " + this.taskNameWithSubtask, t);
            }
        }
    }

    public String toString() {
        return this.taskNameWithSubtask + " [" + (Object)((Object)this.executionState) + ']';
    }

    private static class TaskCanceler
    implements Runnable {
        private final Logger logger;
        private final AbstractInvokable invokable;
        private final Thread executer;
        private final String taskName;
        private final long taskCancellationIntervalMillis;
        private final ResultPartition[] producedPartitions;
        private final SingleInputGate[] inputGates;

        public TaskCanceler(Logger logger, AbstractInvokable invokable, Thread executer, String taskName, long cancelationInterval, ResultPartition[] producedPartitions, SingleInputGate[] inputGates) {
            this.logger = logger;
            this.invokable = invokable;
            this.executer = executer;
            this.taskName = taskName;
            this.taskCancellationIntervalMillis = cancelationInterval;
            this.producedPartitions = producedPartitions;
            this.inputGates = inputGates;
        }

        @Override
        public void run() {
            try {
                try {
                    this.invokable.cancel();
                }
                catch (Throwable t) {
                    this.logger.error("Error while canceling the task", t);
                }
                for (ResultPartition partition : this.producedPartitions) {
                    try {
                        partition.destroyBufferPool();
                    }
                    catch (Throwable t) {
                        LOG.error("Failed to release result partition buffer pool.", t);
                    }
                }
                for (SingleInputGate inputGate : this.inputGates) {
                    try {
                        inputGate.releaseAllResources();
                    }
                    catch (Throwable t) {
                        LOG.error("Failed to release input gate.", t);
                    }
                }
                this.executer.interrupt();
                try {
                    this.executer.join(this.taskCancellationIntervalMillis);
                }
                catch (InterruptedException t) {
                    // empty catch block
                }
                while (this.executer.isAlive()) {
                    StackTraceElement[] stack;
                    StringBuilder bld = new StringBuilder();
                    for (StackTraceElement e : stack = this.executer.getStackTrace()) {
                        bld.append(e).append('\n');
                    }
                    this.logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}", (Object)this.taskName, (Object)bld.toString());
                    this.executer.interrupt();
                    try {
                        this.executer.join(this.taskCancellationIntervalMillis);
                    }
                    catch (InterruptedException interruptedException) {}
                }
            }
            catch (Throwable t) {
                this.logger.error("Error in the task canceler", t);
            }
        }
    }
}

