/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil;
import org.apache.flink.streaming.runtime.operators.sink.SinkV1WriterCommittableSerializer;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

abstract class SinkWriterOperatorTestBase {
    /** Package-private no-arg constructor; performs no initialization of its own. */
    SinkWriterOperatorTestBase() {}

    /**
     * Checks that the operator output stays empty for the sink returned by {@link
     * #sinkWithoutCommitter()}, both right after a record is processed and after the pre-barrier
     * snapshot hook, and that the sink's per-checkpoint record list is empty after that hook.
     */
    @Test
    void testNotEmitCommittablesWithoutCommitter() throws Exception {
        final InspectableSink inspectableSink = this.sinkWithoutCommitter();
        final SinkWriterOperatorFactory writerFactory =
                new SinkWriterOperatorFactory(inspectableSink.getSink());
        final OneInputStreamOperatorTestHarness harness =
                new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory) writerFactory);
        harness.open();

        // One record in: the sink has seen it, but nothing is forwarded downstream.
        harness.processElement(1, 1L);
        Assertions.assertThat((Collection) harness.getOutput()).isEmpty();
        Assertions.assertThat(inspectableSink.getRecordsOfCurrentCheckpoint())
                .containsOnly("(1,1,-9223372036854775808)");

        // After the pre-barrier hook: still no output, and the per-checkpoint records are gone.
        harness.prepareSnapshotPreBarrier(1L);
        Assertions.assertThat((Collection) harness.getOutput()).isEmpty();
        Assertions.assertThat(inspectableSink.getRecordsOfCurrentCheckpoint()).isEmpty();

        harness.close();
    }

    /**
     * Checks that watermarks fed into the operator are both forwarded to the operator output and
     * recorded by the sink (as reported by {@link InspectableSink#getWatermarks()}), in order.
     */
    @Test
    void testWatermarkPropagatedToSinkWriter() throws Exception {
        // Base timestamp; previously declared but unused while its value was repeated as literals.
        final long initialTime = 0L;
        final InspectableSink sink = this.sinkWithoutCommitter();
        final OneInputStreamOperatorTestHarness testHarness =
                new OneInputStreamOperatorTestHarness(
                        (OneInputStreamOperatorFactory)
                                new SinkWriterOperatorFactory(sink.getSink()));
        testHarness.open();

        testHarness.processWatermark(initialTime);
        testHarness.processWatermark(initialTime + 1);

        // Downstream sees the streaming-API watermark type ...
        Assertions.assertThat((Collection) testHarness.getOutput())
                .containsExactly(new Watermark(initialTime), new Watermark(initialTime + 1));
        // ... while the sink reports the event-time API watermark type.
        Assertions.assertThat(sink.getWatermarks())
                .containsExactly(
                        new org.apache.flink.api.common.eventtime.Watermark(initialTime),
                        new org.apache.flink.api.common.eventtime.Watermark(initialTime + 1));

        testHarness.close();
    }

    /**
     * Exercises the sink returned by {@link #sinkWithTimeBasedWriter()}: the first pre-barrier
     * snapshot (processing time 0) must yield a summary with no committables, and the second one,
     * taken after processing time is moved to 2001, must yield the two buffered committables.
     */
    @Test
    void testTimeBasedBufferingSinkWriter() throws Exception {
        // Base timestamp; previously declared but unused while its value was repeated as literals.
        final long initialTime = 0L;
        final OneInputStreamOperatorTestHarness testHarness =
                new OneInputStreamOperatorTestHarness(
                        (OneInputStreamOperatorFactory)
                                new SinkWriterOperatorFactory(
                                        this.sinkWithTimeBasedWriter().getSink()));
        testHarness.open();
        testHarness.setProcessingTime(0L);

        testHarness.processElement(1, initialTime + 1);
        testHarness.processElement(2, initialTime + 2);
        testHarness.prepareSnapshotPreBarrier(1L);
        // Only a summary with zero committables is expected at this point.
        SinkWriterOperatorTestBase.assertBasicOutput(testHarness.getOutput(), 0, 1L);
        // Drop that summary so the next check sees only the second checkpoint's output.
        testHarness.getOutput().poll();

        testHarness.getProcessingTimeService().setCurrentTime(2001L);
        testHarness.prepareSnapshotPreBarrier(2L);
        SinkWriterOperatorTestBase.assertBasicOutput(testHarness.getOutput(), 2, 2L);

        testHarness.close();
    }

    /**
     * Checks that, for the sink returned by {@link #sinkWithCommitter()}, nothing is emitted on
     * open and that the pre-barrier snapshot hook emits a summary plus one committable per
     * processed record for checkpoint 1.
     */
    @Test
    void testEmitOnFlushWithCommitter() throws Exception {
        final Sink<Integer> sinkUnderTest = this.sinkWithCommitter().getSink();
        final OneInputStreamOperatorTestHarness harness =
                new OneInputStreamOperatorTestHarness(
                        (OneInputStreamOperatorFactory)
                                new SinkWriterOperatorFactory(sinkUnderTest));
        harness.open();
        // Opening alone must not produce any output.
        Assertions.assertThat((Collection) harness.getOutput()).isEmpty();

        harness.processElement(1, 1L);
        harness.processElement(2, 2L);
        harness.prepareSnapshotPreBarrier(1L);

        SinkWriterOperatorTestBase.assertBasicOutput(harness.getOutput(), 2, 1L);
        harness.close();
    }

    /**
     * Checks that signalling end of input without any checkpoint emits a summary and the single
     * committable, both attributed to checkpoint ID {@link Long#MAX_VALUE}.
     */
    @Test
    void testEmitOnEndOfInputInBatchMode() throws Exception {
        final SinkWriterOperatorFactory writerOperatorFactory =
                new SinkWriterOperatorFactory(this.sinkWithCommitter().getSink());
        final OneInputStreamOperatorTestHarness testHarness =
                new OneInputStreamOperatorTestHarness(
                        (OneInputStreamOperatorFactory) writerOperatorFactory);
        testHarness.open();
        Assertions.assertThat((Collection) testHarness.getOutput()).isEmpty();

        testHarness.processElement(1, 1L);
        testHarness.endInput();
        SinkWriterOperatorTestBase.assertBasicOutput(testHarness.getOutput(), 1, Long.MAX_VALUE);

        // Release the harness like every sibling test does; it was previously left open.
        testHarness.close();
    }

    /**
     * Snapshots an operator after two records and restores a fresh operator from that snapshot.
     *
     * <p>The restored sink must report a record count of 2 when {@code stateful} is true and 0
     * otherwise; the original sink must report checkpoint ID 1 when stateful and -1 otherwise.
     *
     * @param stateful forwarded to {@link #sinkWithState(boolean, String)}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testStateRestore(boolean stateful) throws Exception {
        // Base timestamp; previously declared but unused while its value was repeated as literals.
        final long initialTime = 0L;
        final InspectableSink sink = this.sinkWithState(stateful, null);
        final OneInputStreamOperatorTestHarness testHarness =
                new OneInputStreamOperatorTestHarness(
                        (OneInputStreamOperatorFactory)
                                new SinkWriterOperatorFactory(sink.getSink()));
        testHarness.open();

        testHarness.processWatermark(initialTime);
        testHarness.processElement(1, initialTime + 1);
        testHarness.processElement(2, initialTime + 2);
        testHarness.prepareSnapshotPreBarrier(1L);
        final OperatorSubtaskState snapshot = testHarness.snapshot(1L, 1L);

        Assertions.assertThat(sink.getRecordCountFromState()).isEqualTo(2);
        Assertions.assertThat(sink.getLastCheckpointId()).isEqualTo(stateful ? 1L : -1L);
        testHarness.close();

        // Restore into a brand-new sink/operator pair and inspect what it recovered.
        final InspectableSink restoredSink = this.sinkWithState(stateful, null);
        final OneInputStreamOperatorTestHarness restoredTestHarness =
                new OneInputStreamOperatorTestHarness(
                        (OneInputStreamOperatorFactory)
                                new SinkWriterOperatorFactory(restoredSink.getSink()));
        restoredTestHarness.initializeState(snapshot);
        restoredTestHarness.open();

        Assertions.assertThat(restoredSink.getRecordCountFromState()).isEqualTo(stateful ? 2 : 0);
        restoredTestHarness.close();
    }

    /**
     * Checks that a writer operator can load state written by a differently implemented previous
     * sink operator under the compatible state name, and that a snapshot taken afterwards can be
     * restored by a fresh writer operator.
     *
     * @param stateful forwarded to {@link #sinkWithState(boolean, String)}; when false the
     *     expected restored record count is 0
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testLoadPreviousSinkState(boolean stateful) throws Exception {
        // 1. Build the state of the previous sink.
        final List<String> previousSinkInputs =
                Arrays.asList(
                        "bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth",
                        "cost", "prompt");
        final InspectableSink sink =
                this.sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME);
        final int expectedState = 5;
        final OneInputStreamOperatorTestHarness previousSink =
                new OneInputStreamOperatorTestHarness(
                        new CompatibleStateSinkOperator<Integer>(
                                (SimpleVersionedSerializer<Integer>) TestSinkV2.WRITER_SERIALIZER,
                                expectedState),
                        (TypeSerializer) StringSerializer.INSTANCE);
        final OperatorSubtaskState previousSinkState =
                TestHarnessUtil.buildSubtaskState(
                        (OneInputStreamOperatorTestHarness) previousSink, previousSinkInputs);

        // 2. Load the previous sink's state into the writer operator and verify it.
        final OneInputStreamOperatorTestHarness compatibleWriterOperator =
                new OneInputStreamOperatorTestHarness(
                        (OneInputStreamOperatorFactory)
                                new SinkWriterOperatorFactory(sink.getSink()));
        compatibleWriterOperator.initializeState(previousSinkState);
        Assertions.assertThat(sink.getRecordCountFromState())
                .isEqualTo(stateful ? expectedState : 0);

        // 3. Snapshot the writer operator.
        compatibleWriterOperator.prepareSnapshotPreBarrier(1L);
        final OperatorSubtaskState snapshot = compatibleWriterOperator.snapshot(1L, 1L);
        compatibleWriterOperator.close();

        // 4. Restore a fresh sink from that snapshot.
        final InspectableSink sink2 =
                this.sinkWithState(stateful, CompatibleStateSinkOperator.SINK_STATE_NAME);
        final OneInputStreamOperatorTestHarness restoredSinkOperator =
                new OneInputStreamOperatorTestHarness(
                        (OneInputStreamOperatorFactory)
                                new SinkWriterOperatorFactory(sink2.getSink()));
        restoredSinkOperator.initializeState(snapshot);
        // Inspect the freshly restored sink. The check used to re-read the first sink, which
        // had already been verified in step 2 and told nothing about this restore.
        Assertions.assertThat(sink2.getRecordCountFromState())
                .isEqualTo(stateful ? expectedState : 0);
        restoredSinkOperator.close();
    }

    /**
     * Feeds state written by {@link TestCommitterOperator} into a writer operator and checks that
     * the first pre-barrier snapshot emits four records: a summary for checkpoint 1 covering the
     * restored committables, the two restored committables themselves, and an empty summary for
     * the current checkpoint 2.
     */
    @Test
    void testRestoreCommitterState() throws Exception {
        final List<String> committables = Arrays.asList("state1", "state2");
        final InspectableSink sink = this.sinkWithCommitter();

        // Produce the committer-side state that the writer operator is expected to pick up.
        final OneInputStreamOperatorTestHarness committer =
                new OneInputStreamOperatorTestHarness(
                        (OneInputStreamOperator)
                                new TestCommitterOperator(
                                        (SimpleVersionedSerializer<String>)
                                                TestSinkV2.COMMITTABLE_SERIALIZER),
                        (TypeSerializer) StringSerializer.INSTANCE);
        final OperatorSubtaskState committerState =
                TestHarnessUtil.buildSubtaskState(
                        (OneInputStreamOperatorTestHarness) committer, committables);

        final OneInputStreamOperatorTestHarness testHarness =
                new OneInputStreamOperatorTestHarness(
                        (OneInputStreamOperatorFactory)
                                new SinkWriterOperatorFactory(sink.getSink()));
        testHarness.initializeState(committerState);
        testHarness.open();
        testHarness.prepareSnapshotPreBarrier(2L);

        final List<StreamElement> output = SinkTestUtil.fromOutput(testHarness.getOutput());
        Assertions.assertThat(output).hasSize(4);

        Assertions.assertThat((Object) output.get(0).asRecord().getValue())
                .isInstanceOf(CommittableSummary.class)
                .satisfies(
                        cs ->
                                SinkV2Assertions.assertThat((CommittableSummary) cs)
                                        .hasPendingCommittables(committables.size())
                                        .hasCheckpointId(1L)
                                        .hasOverallCommittables(committables.size())
                                        .hasFailedCommittables(0));
        SinkWriterOperatorTestBase.assertRestoredCommitterCommittable(
                output.get(1).asRecord().getValue(), committables.get(0));
        SinkWriterOperatorTestBase.assertRestoredCommitterCommittable(
                output.get(2).asRecord().getValue(), committables.get(1));
        Assertions.assertThat((Object) output.get(3).asRecord().getValue())
                .isInstanceOf(CommittableSummary.class)
                .satisfies(
                        cs ->
                                SinkV2Assertions.assertThat((CommittableSummary) cs)
                                        .hasPendingCommittables(0)
                                        .hasCheckpointId(2L)
                                        .hasOverallCommittables(0)
                                        .hasFailedCommittables(0));

        // Release the harness like every sibling test does; it was previously left open.
        testHarness.close();
    }

    /**
     * Checks that after end of input the processed record is emitted as a committable (preceded
     * by its summary) and that the sink's per-checkpoint record list is empty afterwards.
     *
     * @param isCheckpointingEnabled when true, a pre-barrier snapshot for checkpoint 1 is
     *     triggered after end of input before the output is inspected
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception {
        // Single source of truth for the expected record; it used to be declared but never
        // read, with the same literal repeated at each use site.
        final String record = "(1,1,-9223372036854775808)";
        final InspectableSink sink = this.sinkWithCommitter();
        final OneInputStreamOperatorTestHarness testHarness =
                new OneInputStreamOperatorTestHarness(
                        (OneInputStreamOperatorFactory)
                                new SinkWriterOperatorFactory(sink.getSink()));
        testHarness.open();

        testHarness.processElement(1, 1L);
        Assertions.assertThat((Collection) testHarness.getOutput()).isEmpty();
        Assertions.assertThat(sink.getRecordsOfCurrentCheckpoint()).containsOnly(record);

        testHarness.endInput();
        if (isCheckpointingEnabled) {
            testHarness.prepareSnapshotPreBarrier(1L);
        }

        SinkWriterOperatorTestBase.assertEmitted(
                Collections.singletonList(record), testHarness.getOutput());
        Assertions.assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty();
        testHarness.close();
    }

    @Test
    void testInitContext() throws Exception {
        AtomicReference initContext = new AtomicReference();
        Sink & Serializable sink = (Sink & Serializable)context -> {
            initContext.set(context);
            return null;
        };
        boolean subtaskId = true;
        int parallelism = 10;
        StringSerializer typeSerializer = StringSerializer.INSTANCE;
        JobID jobID = new JobID();
        MockEnvironment environment = MockEnvironment.builder().setSubtaskIndex(1).setParallelism(10).setMaxParallelism(10).setJobID(jobID).setExecutionConfig(new ExecutionConfig().enableObjectReuse()).build();
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory)new SinkWriterOperatorFactory((Sink)sink), (TypeSerializer)typeSerializer, environment);
        testHarness.open();
        Assertions.assertThat((Object)((WriterInitContext)initContext.get()).getUserCodeClassLoader()).isNotNull();
        Assertions.assertThat((Object)((WriterInitContext)initContext.get()).getMailboxExecutor()).isNotNull();
        Assertions.assertThat((Object)((WriterInitContext)initContext.get()).getProcessingTimeService()).isNotNull();
        Assertions.assertThat((int)((WriterInitContext)initContext.get()).getTaskInfo().getIndexOfThisSubtask()).isEqualTo(1);
        Assertions.assertThat((int)((WriterInitContext)initContext.get()).getTaskInfo().getNumberOfParallelSubtasks()).isEqualTo(10);
        Assertions.assertThat((int)((WriterInitContext)initContext.get()).getTaskInfo().getAttemptNumber()).isZero();
        Assertions.assertThat((Object)((WriterInitContext)initContext.get()).metricGroup()).isNotNull();
        Assertions.assertThat((OptionalLong)((WriterInitContext)initContext.get()).getRestoredCheckpointId()).isNotPresent();
        Assertions.assertThat((boolean)((WriterInitContext)initContext.get()).isObjectReuseEnabled()).isTrue();
        Assertions.assertThat((Object)((WriterInitContext)initContext.get()).createInputSerializer()).isEqualTo((Object)typeSerializer);
        Assertions.assertThat((Comparable)((WriterInitContext)initContext.get()).getJobInfo().getJobId()).isEqualTo((Object)jobID);
        testHarness.close();
    }

    /**
     * Asserts that two writer init contexts expose equal values through every accessor compared
     * below.
     *
     * @param initContext the context under test
     * @param original the context providing the expected values
     */
    private static void assertContextsEqual(
            WriterInitContext initContext, WriterInitContext original) {
        // Runtime services.
        Assertions.assertThat(initContext.getUserCodeClassLoader().asClassLoader())
                .isEqualTo(original.getUserCodeClassLoader().asClassLoader());
        Assertions.assertThat(initContext.getMailboxExecutor())
                .isEqualTo(original.getMailboxExecutor());
        Assertions.assertThat(initContext.getProcessingTimeService())
                .isEqualTo(original.getProcessingTimeService());

        // Task identity.
        Assertions.assertThat(initContext.getTaskInfo().getIndexOfThisSubtask())
                .isEqualTo(original.getTaskInfo().getIndexOfThisSubtask());
        Assertions.assertThat(initContext.getTaskInfo().getNumberOfParallelSubtasks())
                .isEqualTo(original.getTaskInfo().getNumberOfParallelSubtasks());
        Assertions.assertThat(initContext.getTaskInfo().getAttemptNumber())
                .isEqualTo(original.getTaskInfo().getAttemptNumber());

        // Remaining accessors.
        Assertions.assertThat(initContext.metricGroup()).isEqualTo(original.metricGroup());
        Assertions.assertThat(initContext.getRestoredCheckpointId())
                .isEqualTo(original.getRestoredCheckpointId());
        Assertions.assertThat(initContext.isObjectReuseEnabled())
                .isEqualTo(original.isObjectReuseEnabled());
        Assertions.assertThat(initContext.createInputSerializer())
                .isEqualTo(original.createInputSerializer());
        Assertions.assertThat(initContext.getJobInfo().getJobId())
                .isEqualTo(original.getJobInfo().getJobId());
        Assertions.assertThat(initContext.metadataConsumer())
                .isEqualTo(original.metadataConsumer());
    }

    /**
     * Asserts that {@code record} is a {@link CommittableWithLineage} carrying the given
     * committable, attributed to checkpoint 1 and subtask 0.
     *
     * @param record the emitted value to inspect
     * @param committable the committable payload expected inside it
     */
    private static void assertRestoredCommitterCommittable(Object record, String committable) {
        Assertions.assertThat(record)
                .isInstanceOf(CommittableWithLineage.class)
                .satisfies(
                        cl ->
                                SinkV2Assertions.assertThat((CommittableWithLineage) cl)
                                        .hasCommittable(committable)
                                        .hasCheckpointId(1L)
                                        .hasSubtaskId(0));
    }

    /**
     * Asserts that {@code output} holds a {@link CommittableSummary} followed by exactly one
     * {@link CommittableWithLineage} per expected record, with the committables matching {@code
     * records} in any order.
     *
     * @param records the committable payloads expected to have been emitted
     * @param output the raw operator output to inspect
     */
    private static void assertEmitted(List<String> records, Queue<Object> output) {
        final int expectedCount = records.size();
        final List<StreamElement> elements = SinkTestUtil.fromOutput(output);
        Assertions.assertThat(elements).hasSize(expectedCount + 1);

        // The leading element summarises everything that follows.
        final Object summary = elements.get(0).asRecord().getValue();
        Assertions.assertThat(summary)
                .isInstanceOf(CommittableSummary.class)
                .satisfies(
                        cs ->
                                SinkV2Assertions.assertThat((CommittableSummary) cs)
                                        .hasPendingCommittables(expectedCount)
                                        .hasOverallCommittables(expectedCount)
                                        .hasFailedCommittables(0));

        // Everything after the summary must be a committable; gather the payloads.
        final List<String> emitted = new ArrayList<>();
        for (StreamElement element : elements.subList(1, elements.size())) {
            final Object payload = element.asRecord().getValue();
            Assertions.assertThat(payload).isInstanceOf(CommittableWithLineage.class);
            emitted.add((String) ((CommittableWithLineage) payload).getCommittable());
        }
        Assertions.assertThat(emitted).containsExactlyInAnyOrderElementsOf(records);
    }

    /**
     * Asserts that the output consists of one {@link CommittableSummary} followed by {@code
     * numberOfCommittables} {@link CommittableWithLineage} records for the given checkpoint and
     * subtask 0.
     *
     * @param queuedOutput the raw operator output to inspect
     * @param numberOfCommittables expected overall and pending count in the summary, and the
     *     number of committable records expected after it
     * @param checkpointId checkpoint ID every committable record must carry
     */
    private static void assertBasicOutput(
            Collection<Object> queuedOutput, int numberOfCommittables, long checkpointId) {
        final List<StreamElement> elements = SinkTestUtil.fromOutput(queuedOutput);
        Assertions.assertThat(elements).hasSize(numberOfCommittables + 1);

        final Object summary = elements.get(0).asRecord().getValue();
        Assertions.assertThat(summary)
                .isInstanceOf(CommittableSummary.class)
                .satisfies(
                        cs ->
                                SinkV2Assertions.assertThat((CommittableSummary) cs)
                                        .hasOverallCommittables(numberOfCommittables)
                                        .hasPendingCommittables(numberOfCommittables)
                                        .hasFailedCommittables(0));

        // The size check above guarantees the tail holds exactly numberOfCommittables items.
        for (StreamElement element : elements.subList(1, elements.size())) {
            final Object committable = element.asRecord().getValue();
            Assertions.assertThat(committable)
                    .isInstanceOf(CommittableWithLineage.class)
                    .satisfies(
                            cl ->
                                    SinkV2Assertions.assertThat((CommittableWithLineage) cl)
                                            .hasCheckpointId(checkpointId)
                                            .hasSubtaskId(0));
        }
    }

    /**
     * Supplies the sink used by the tests that expect no committables on the operator output.
     * NOTE(review): presumably a sink configured without a committer, per the name; confirm in
     * the subclasses.
     */
    abstract InspectableSink sinkWithoutCommitter();

    /**
     * Supplies the sink used by {@code testTimeBasedBufferingSinkWriter}, which expects no
     * committables at processing time 0 and two once processing time reaches 2001.
     */
    abstract InspectableSink sinkWithTimeBasedWriter();

    /**
     * Supplies the sink used by the state snapshot/restore tests.
     *
     * @param var1 passed the tests' {@code stateful} flag; when false the tests expect no record
     *     count to be restored
     * @param var2 passed either {@code null} or {@code "compatible_sink_state"} by the tests;
     *     presumably the name of a previous sink's state to stay compatible with (TODO confirm)
     */
    abstract InspectableSink sinkWithState(boolean var1, String var2);

    /** Supplies the sink used by the tests that expect a summary and committables on the output. */
    abstract InspectableSink sinkWithCommitter();

    static abstract class AbstractInspectableSink<S extends Sink<Integer>>
    implements InspectableSink {
        private final S sink;

        protected AbstractInspectableSink(S sink) {
            this.sink = sink;
        }

        public S getSink() {
            return this.sink;
        }
    }

    /**
     * Test-side view of a sink: exposes the sink instance to hand to the operator factory plus
     * accessors the tests use to inspect what the sink has observed.
     */
    interface InspectableSink {
        /**
         * Returns the last checkpoint ID seen by the sink. The tests expect 1 after snapshotting
         * checkpoint 1 with a stateful sink and -1 with a non-stateful one.
         */
        long getLastCheckpointId();

        /** Returns the records attributed to the current checkpoint, as strings. */
        List<String> getRecordsOfCurrentCheckpoint();

        /** Returns the watermarks the sink has received. */
        List<org.apache.flink.api.common.eventtime.Watermark> getWatermarks();

        /** Returns the record count the tests compare against written or restored state. */
        int getRecordCountFromState();

        /** Returns the sink to be wrapped in a writer operator factory. */
        Sink<Integer> getSink();
    }

    private static class TestingCommittableSerializer
    extends SinkV1WriterCommittableSerializer<String> {
        private final SimpleVersionedSerializer<String> committableSerializer;

        public TestingCommittableSerializer(SimpleVersionedSerializer<String> committableSerializer) {
            super(committableSerializer);
            this.committableSerializer = committableSerializer;
        }

        public byte[] serialize(List<String> obj) throws IOException {
            DataOutputSerializer out = new DataOutputSerializer(256);
            out.writeInt(-1189141204);
            SimpleVersionedSerialization.writeVersionAndSerializeList(this.committableSerializer, obj, (DataOutputView)out);
            return out.getCopyOfBuffer();
        }
    }

    private static class CompatibleStateSinkOperator<T>
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        static final String SINK_STATE_NAME = "compatible_sink_state";
        static final ListStateDescriptor<byte[]> SINK_STATE_DESC = new ListStateDescriptor("compatible_sink_state", (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE);
        ListState<T> sinkState;
        private final SimpleVersionedSerializer<T> serializer;
        private final T initialState;

        public CompatibleStateSinkOperator(SimpleVersionedSerializer<T> serializer, T initialState) {
            this.serializer = serializer;
            this.initialState = initialState;
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            this.sinkState = new SimpleVersionedListState(context.getOperatorStateStore().getListState(SINK_STATE_DESC), this.serializer);
            if (!context.isRestored()) {
                this.sinkState.add(this.initialState);
            }
        }

        public void processElement(StreamRecord<String> element) {
        }
    }

    private static class TestCommitterOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC = new ListStateDescriptor("streaming_committer_raw_states", (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE);
        private ListState<List<String>> committerState;
        private final List<String> buffer = new ArrayList<String>();
        private final SimpleVersionedSerializer<String> serializer;

        public TestCommitterOperator(SimpleVersionedSerializer<String> serializer) {
            this.serializer = serializer;
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            this.committerState = new SimpleVersionedListState(context.getOperatorStateStore().getListState(STREAMING_COMMITTER_RAW_STATES_DESC), (SimpleVersionedSerializer)new TestingCommittableSerializer(this.serializer));
        }

        public void processElement(StreamRecord<String> element) throws Exception {
            this.buffer.add((String)element.getValue());
        }

        public void snapshotState(StateSnapshotContext context) throws Exception {
            super.snapshotState(context);
            this.committerState.add(this.buffer);
        }
    }
}

