/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinatorEventsExactlyOnceITCase;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
extends CoordinatorEventsExactlyOnceITCase {
    /** Number of events; the {@code numEvents} value used by every test in this class. */
    private static final int NUM_EVENTS = 100;
    /**
     * The {@code delay} value handed to the operator factories by every test in this class. The
     * unit is not visible in this file (presumably milliseconds - TODO confirm against the
     * coordinator base class).
     */
    private static final int DELAY = 10;
    /** Execution environment used to build the job; re-created by {@code setup()} before each test. */
    private StreamExecutionEnvironment env;

    /**
     * Package-private no-argument constructor. It performs no initialization; per-test state is
     * prepared in {@code setup()}.
     */
    CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase() {
    }

    /**
     * Builds a fresh execution environment (parallelism 1, checkpointing enabled with an interval
     * argument of 100) and resets every piece of static state shared between tests: the source
     * close flag, both checkpoint-unblocking flags and the registered test scripts.
     */
    @BeforeEach
    void setup() {
        StreamExecutionEnvironment freshEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        freshEnv.setParallelism(1);
        freshEnv.enableCheckpointing(100L);
        env = freshEnv;

        // Static flags survive across test methods, so they are cleared explicitly.
        ManuallyClosedSourceFunction.shouldCloseSource = false;
        EventReceivingOperator.shouldUnblockAllCheckpoint = false;
        EventReceivingOperator.shouldUnblockNextCheckpoint = false;
        CoordinatorEventsExactlyOnceITCase.TestScript.reset();
    }

    /** Runs the job with the plain event-receiving operator and verifies the received events. */
    @Test
    void testCheckpoint() throws Exception {
        EventReceivingOperatorFactory<Long, Long> factory =
                new EventReceivingOperatorFactory<>("eventReceiving", NUM_EVENTS, DELAY);
        executeAndVerifyResults(env, factory);
    }

    /**
     * Same scenario as the aligned-checkpoint test, but with unaligned checkpoints enabled on the
     * environment before the job is built.
     */
    @Test
    void testUnalignedCheckpoint() throws Exception {
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        EventReceivingOperatorFactory<Long, Long> factory =
                new EventReceivingOperatorFactory<>("eventReceiving", NUM_EVENTS, DELAY);
        executeAndVerifyResults(env, factory);
    }

    /**
     * Runs the job with the failure-injecting operator, verifies the received events, and then
     * checks that the test script registered for subtask 0 recorded a failure.
     */
    @Test
    void testCheckpointWithSubtaskFailure() throws Exception {
        EventReceivingOperatorFactory<Long, Long> failingFactory =
                new EventReceivingOperatorWithFailureFactory<>(
                        "eventReceivingWithFailure", NUM_EVENTS, DELAY);
        executeAndVerifyResults(env, failingFactory);

        boolean failureRecorded =
                CoordinatorEventsExactlyOnceITCase.TestScript.getForOperator(
                                "eventReceivingWithFailure-subtask0")
                        .hasAlreadyFailed();
        Assertions.assertThat(failureRecorded).isTrue();
    }

    /**
     * Failure-injection scenario with unaligned checkpoints enabled: runs the job, verifies the
     * received events, and checks that the test script for subtask 0 recorded a failure.
     */
    @Test
    void testUnalignedCheckpointWithSubtaskFailure() throws Exception {
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        EventReceivingOperatorFactory<Long, Long> failingFactory =
                new EventReceivingOperatorWithFailureFactory<>(
                        "eventReceivingWithFailure", NUM_EVENTS, DELAY);
        executeAndVerifyResults(env, failingFactory);

        boolean failureRecorded =
                CoordinatorEventsExactlyOnceITCase.TestScript.getForOperator(
                                "eventReceivingWithFailure-subtask0")
                        .hasAlreadyFailed();
        Assertions.assertThat(failureRecorded).isTrue();
    }

    /**
     * Builds the job {@code source -> (unchained) event-receiving operator -> discarding sink},
     * executes it on the mini cluster until it finishes, and verifies the integers reported
     * through the operator's accumulator.
     *
     * @param env environment the job is built on
     * @param factory factory of the event-receiving operator; its {@code name} is also used as
     *     the transformation name
     * @throws Exception if job execution or the verification fails
     */
    private void executeAndVerifyResults(
            StreamExecutionEnvironment env, EventReceivingOperatorFactory<Long, Long> factory)
            throws Exception {
        // disableChaining() keeps the receiving operator out of the source's operator chain
        // (presumably so the two run in separate tasks - confirm against the test's intent).
        env.addSource(new ManuallyClosedSourceFunction<>(), TypeInformation.of(Long.class))
                .disableChaining()
                .transform(factory.name, TypeInformation.of(Long.class), factory)
                .sinkTo(new DiscardingSink<>());

        JobExecutionResult executionResult =
                MINI_CLUSTER.getMiniCluster().executeJobBlocking(env.getStreamGraph().getJobGraph());

        // Read the accumulator through the operator's own constant so the name cannot drift.
        List<Integer> receivedInts =
                executionResult.getAccumulatorResult(EventReceivingOperator.ACCUMULATOR_NAME);
        CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.checkListContainsSequence(
                receivedInts, NUM_EVENTS);
    }

    /**
     * Variant of {@link EventReceivingOperator} that throws once while handling integer events.
     *
     * <p>The first integer event whose value exceeds a randomly chosen threshold makes the
     * operator throw, unless the shared test script already recorded a failure. When (re)started,
     * the operator reports the last integer it holds to the coordinator through the start event.
     */
    private static class EventReceivingOperatorWithFailure<T> extends EventReceivingOperator<T> {

        /** Operator name; used to look up the shared test script. */
        private final String name;

        /** Events with a value above this threshold trigger the one-time failure. */
        private final int maxNumberBeforeFailure;

        /** Script shared under the key {@code name + "-subtask0"}; assigned in {@code setup}. */
        private CoordinatorEventsExactlyOnceITCase.TestScript testScript;

        /**
         * @param name operator name used to look up the test script
         * @param numEvents total number of events; the failure threshold is drawn from
         *     {@code [numEvents / 3, numEvents / 3 + numEvents / 6)}
         * @throws IllegalArgumentException if {@code numEvents < 6}, because the random bound
         *     {@code numEvents / 6} must be positive
         */
        private EventReceivingOperatorWithFailure(String name, int numEvents) {
            this.name = name;
            this.maxNumberBeforeFailure = numEvents / 3 + new Random().nextInt(numEvents / 6);
        }

        /**
         * Delegates to the base setup, asserts that this is subtask 0 and resolves the test
         * script for that subtask.
         */
        @Override
        public void setup(
                StreamTask<?, ?> containingTask,
                StreamConfig config,
                Output<StreamRecord<T>> output) {
            super.setup(containingTask, config, output);
            Preconditions.checkState(containingTask.getIndexInSubtaskGroup() == 0);
            this.testScript =
                    CoordinatorEventsExactlyOnceITCase.TestScript.getForOperator(
                            name + "-subtask0");
        }

        /**
         * Records integer events, injecting the one-time failure when the threshold is crossed.
         * On the end event the accumulator is persisted to state, and the source is released only
         * if the failure has already happened.
         *
         * @throws RuntimeException for the injected failure, or wrapping a state update failure
         * @throws UnsupportedOperationException for any other event type
         */
        @Override
        public void handleOperatorEvent(OperatorEvent evt) {
            if (evt instanceof CoordinatorEventsExactlyOnceITCase.IntegerEvent) {
                int value = ((CoordinatorEventsExactlyOnceITCase.IntegerEvent) evt).value;
                if (value > maxNumberBeforeFailure && !testScript.hasAlreadyFailed()) {
                    // Mark the failure before throwing so that it is injected only once.
                    testScript.recordHasFailed();
                    throw new RuntimeException();
                }
                accumulator.add(value);
            } else if (evt instanceof CoordinatorEventsExactlyOnceITCase.EndEvent) {
                try {
                    state.update(accumulator.getLocalValue());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                if (testScript.hasAlreadyFailed()) {
                    ManuallyClosedSourceFunction.shouldCloseSource = true;
                }
            } else {
                throw new UnsupportedOperationException();
            }
        }

        /**
         * Sends a start event carrying the last integer currently held in the accumulator, or
         * {@code -1} when the accumulator is empty.
         */
        @Override
        protected void sendStartEvent() throws IOException {
            List<Integer> received = accumulator.getLocalValue();
            int lastValue = received.isEmpty() ? -1 : received.get(received.size() - 1);
            getContainingTask()
                    .getEnvironment()
                    .getOperatorCoordinatorEventGateway()
                    .sendOperatorEventToCoordinator(
                            getOperatorID(),
                            new SerializedValue<>(
                                    new CoordinatorEventsExactlyOnceITCase.StartEvent(lastValue)));
        }
    }

    /**
     * Factory that creates an {@link EventReceivingOperatorWithFailure} instead of the plain
     * event-receiving operator; everything else is inherited from
     * {@link EventReceivingOperatorFactory}.
     */
    private static class EventReceivingOperatorWithFailureFactory<IN, OUT>
            extends EventReceivingOperatorFactory<IN, OUT> {

        /**
         * @param name operator name, forwarded to the created operator
         * @param numEvents number of events, forwarded to the created operator
         * @param delay delay value stored by the parent factory
         */
        public EventReceivingOperatorWithFailureFactory(String name, int numEvents, int delay) {
            super(name, numEvents, delay);
        }

        /**
         * Creates the failure-injecting operator, sets it up with the task, config and output
         * from {@code parameters}, and registers it as the handler for events addressed to its
         * operator ID.
         */
        @Override
        @SuppressWarnings("unchecked") // T is fixed by the caller; the operator is a StreamOperator<OUT>.
        public <T extends StreamOperator<OUT>> T createStreamOperator(
                StreamOperatorParameters<OUT> parameters) {
            EventReceivingOperatorWithFailure<OUT> operator =
                    new EventReceivingOperatorWithFailure<>(name, numEvents);
            operator.setup(
                    parameters.getContainingTask(),
                    parameters.getStreamConfig(),
                    parameters.getOutput());
            parameters
                    .getOperatorEventDispatcher()
                    .registerEventHandler(parameters.getStreamConfig().getOperatorID(), operator);
            return (T) operator;
        }
    }

    private static class EventReceivingOperator<T>
    extends AbstractStreamOperator<T>
    implements OneInputStreamOperator<T, T>,
    OperatorEventHandler {
        protected static final String ACCUMULATOR_NAME = "receivedIntegers";
        private static boolean shouldUnblockAllCheckpoint;
        private static boolean shouldUnblockNextCheckpoint;
        protected final ListAccumulator<Integer> accumulator = new ListAccumulator();
        protected ListState<Integer> state;

        private EventReceivingOperator() {
        }

        public void open() throws Exception {
            super.open();
            this.getRuntimeContext().addAccumulator(ACCUMULATOR_NAME, this.accumulator);
        }

        public void processElement(StreamRecord<T> element) throws Exception {
            throw new UnsupportedOperationException();
        }

        public void handleOperatorEvent(OperatorEvent evt) {
            if (evt instanceof CoordinatorEventsExactlyOnceITCase.IntegerEvent) {
                this.accumulator.add((Object)((CoordinatorEventsExactlyOnceITCase.IntegerEvent)evt).value);
            } else if (evt instanceof CoordinatorEventsExactlyOnceITCase.EndEvent) {
                try {
                    this.state.update((List)this.accumulator.getLocalValue());
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
                ManuallyClosedSourceFunction.shouldCloseSource = true;
            } else {
                throw new UnsupportedOperationException();
            }
        }

        public void snapshotState(StateSnapshotContext context) throws Exception {
            super.snapshotState(context);
            while (!shouldUnblockAllCheckpoint && !shouldUnblockNextCheckpoint) {
                Thread.sleep(100L);
            }
            if (shouldUnblockNextCheckpoint) {
                shouldUnblockNextCheckpoint = false;
            }
            this.state.update((List)this.accumulator.getLocalValue());
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            this.state = context.getOperatorStateStore().getListState(new ListStateDescriptor("accumulatorState", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO));
            this.accumulator.resetLocal();
            ((Iterable)this.state.get()).forEach(arg_0 -> this.accumulator.add(arg_0));
            this.sendStartEvent();
        }

        protected void sendStartEvent() throws IOException {
            this.getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway().sendOperatorEventToCoordinator(this.getOperatorID(), new SerializedValue((Object)new CoordinatorEventsExactlyOnceITCase.StartEvent(-1)));
        }
    }

    /**
     * Coordinator that holds back event sending until checkpoint milestones are reached, and that
     * fails the job once after a checkpoint that follows the first post-checkpoint event.
     *
     * <p>The fields {@code nextNumber}, {@code maxNumberBeforeFailure}, {@code nextToComplete},
     * {@code testScript} and {@code context} are inherited from the superclass and are not
     * declared in this file. NOTE(review): their exact semantics should be confirmed against
     * {@code CoordinatorEventsExactlyOnceITCase.EventSendingCoordinator}.
     */
    private static class EventSendingCoordinatorWithGuaranteedCheckpoint
    extends CoordinatorEventsExactlyOnceITCase.EventSendingCoordinator {
        /** Random threshold in {@code [0, numEvents / 6)}; gates sending before the first coordinator checkpoint. */
        private final int maxNumberBeforeFirstCheckpoint;
        /** Set once an event has been sent while the coordinator's first checkpoint counted as completed. */
        private boolean isEventSentAfterFirstCheckpoint;
        /** Set in handleCheckpoint / resetToCheckpoint; cleared again on an aborted checkpoint before the job's first completed one. */
        private boolean isCoordinatorFirstCheckpointCompleted;
        /** Set on notifyCheckpointComplete and on resetToCheckpoint. */
        private boolean isJobFirstCheckpointCompleted;

        /**
         * Forwards all arguments to the superclass and draws the random first-checkpoint
         * threshold. {@code Random.nextInt} requires a positive bound, so {@code numEvents} must
         * be at least 6.
         */
        public EventSendingCoordinatorWithGuaranteedCheckpoint(OperatorCoordinator.Context context, String name, int numEvents, int delay) {
            super(context, name, numEvents, delay);
            this.maxNumberBeforeFirstCheckpoint = new Random().nextInt(numEvents / 6);
            this.isEventSentAfterFirstCheckpoint = false;
            this.isCoordinatorFirstCheckpointCompleted = false;
            this.isJobFirstCheckpointCompleted = false;
        }

        /**
         * Sends the next event unless one of the two gates below holds it back. The first event
         * sent after the coordinator's first checkpoint unblocks all operator snapshots.
         */
        protected void sendNextEvent() {
            // Gate 1: past the first threshold, wait for the coordinator's first checkpoint.
            if (!this.isCoordinatorFirstCheckpointCompleted && this.nextNumber > this.maxNumberBeforeFirstCheckpoint) {
                return;
            }
            // Gate 2: from the failure threshold on, wait until a checkpoint completed (or a restore happened).
            if (!this.isJobFirstCheckpointCompleted && this.nextNumber >= this.maxNumberBeforeFailure) {
                return;
            }
            super.sendNextEvent();
            if (!this.isEventSentAfterFirstCheckpoint && this.isCoordinatorFirstCheckpointCompleted) {
                this.isEventSentAfterFirstCheckpoint = true;
                // Operators block in snapshotState until this static flag is set.
                EventReceivingOperator.shouldUnblockAllCheckpoint = true;
            }
        }

        /**
         * Marks the coordinator's first checkpoint as completed when {@code nextToComplete} is
         * set, delegates to the superclass, and afterwards may inject the one-time job failure.
         */
        protected void handleCheckpoint() {
            if (this.nextToComplete != null) {
                this.isCoordinatorFirstCheckpointCompleted = true;
            }
            super.handleCheckpoint();
            // nextToComplete is read again after the super call; the statement order here matters.
            if (this.nextToComplete != null && this.isEventSentAfterFirstCheckpoint && !this.testScript.hasAlreadyFailed()) {
                // Record first so that the failure is injected only once.
                this.testScript.recordHasFailed();
                this.context.failJob((Throwable)new Exception("test failure"));
            }
        }

        /** After the superclass reset, marks both checkpoint milestones as reached (via the mailbox). */
        public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {
            super.resetToCheckpoint(checkpointId, checkpointData);
            this.runInMailbox(() -> {
                this.isCoordinatorFirstCheckpointCompleted = true;
                this.isJobFirstCheckpointCompleted = true;
            });
        }

        /**
         * Rolls back the coordinator-checkpoint milestone if a checkpoint is aborted before the
         * job's first checkpoint has completed.
         */
        public void notifyCheckpointAborted(long checkpointId) {
            super.notifyCheckpointAborted(checkpointId);
            this.runInMailbox(() -> {
                if (!this.isJobFirstCheckpointCompleted) {
                    this.isCoordinatorFirstCheckpointCompleted = false;
                }
            });
        }

        /** Marks the job's first checkpoint as completed (via the mailbox). */
        public void notifyCheckpointComplete(long checkpointId) {
            super.notifyCheckpointComplete(checkpointId);
            this.runInMailbox(() -> {
                this.isJobFirstCheckpointCompleted = true;
            });
        }
    }

    private static class EventReceivingOperatorFactory<IN, OUT>
    extends AbstractStreamOperatorFactory<OUT>
    implements CoordinatedOperatorFactory<OUT>,
    OneInputStreamOperatorFactory<IN, OUT> {
        protected final String name;
        protected final int numEvents;
        protected final int delay;

        public EventReceivingOperatorFactory(String name, int numEvents, int delay) {
            this.name = name;
            this.numEvents = numEvents;
            this.delay = delay;
        }

        public OperatorCoordinator.Provider getCoordinatorProvider(String operatorName, final OperatorID operatorID) {
            return new OperatorCoordinator.Provider(){

                public OperatorID getOperatorId() {
                    return operatorID;
                }

                public OperatorCoordinator create(OperatorCoordinator.Context context) {
                    return new EventSendingCoordinatorWithGuaranteedCheckpoint(context, name, numEvents, delay);
                }
            };
        }

        public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> parameters) {
            EventReceivingOperator operator = new EventReceivingOperator();
            operator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
            parameters.getOperatorEventDispatcher().registerEventHandler(parameters.getStreamConfig().getOperatorID(), operator);
            return (T)((Object)operator);
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return EventReceivingOperator.class;
        }
    }

    private static class ManuallyClosedSourceFunction<T>
    implements SourceFunction<T> {
        private static boolean shouldCloseSource;

        private ManuallyClosedSourceFunction() {
        }

        public void run(SourceFunction.SourceContext<T> ctx) throws Exception {
            while (!shouldCloseSource) {
                Thread.sleep(100L);
            }
        }

        public void cancel() {
        }
    }
}

