/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil;
import org.apache.flink.streaming.runtime.operators.sink.SinkV1WriterCommittableSerializer;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class SinkWriterOperatorTest {
    /** Package-private no-arg constructor; it performs no setup because every test builds its own harness. */
    SinkWriterOperatorTest() {
    }

    /**
     * Checks a sink built without a committer: the operator output must stay empty both after
     * an element is processed and after the pre-barrier snapshot hook, while the writer is
     * asserted to hold the element before the hook and to be empty afterwards.
     */
    @Test
    void testNotEmitCommittablesWithoutCommitter() throws Exception {
        String expectedRecord = "(1,1,-9223372036854775808)";
        TestSink.DefaultSinkWriter writer = new TestSink.DefaultSinkWriter();
        SinkWriterOperatorFactory factory = new SinkWriterOperatorFactory(
                TestSink.newBuilder().setWriter(writer).build().asV2());
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness(factory);
        harness.open();

        // The element is recorded by the writer but nothing is forwarded downstream.
        harness.processElement(1, 1L);
        Assertions.assertThat(harness.getOutput()).isEmpty();
        Assertions.assertThat(writer.elements).containsOnly(new Object[]{expectedRecord});

        // After the pre-barrier hook the writer buffer is empty and the output still is.
        harness.prepareSnapshotPreBarrier(1L);
        Assertions.assertThat(harness.getOutput()).isEmpty();
        Assertions.assertThat(writer.elements).isEmpty();

        harness.close();
    }

    /**
     * Sends two consecutive watermarks through the operator and asserts that they appear, in
     * order, both in the operator output and in the list of watermarks recorded by the writer.
     */
    @Test
    void testWatermarkPropagatedToSinkWriter() throws Exception {
        long initialTime = 0L;
        long nextTime = initialTime + 1L;
        TestSink.DefaultSinkWriter sinkWriter = new TestSink.DefaultSinkWriter();
        SinkWriterOperatorFactory factory = new SinkWriterOperatorFactory(
                TestSink.newBuilder().setWriter(sinkWriter).build().asV2());
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness(factory);
        harness.open();

        harness.processWatermark(initialTime);
        harness.processWatermark(nextTime);

        // Downstream sees the streaming-API watermark type ...
        Assertions.assertThat(harness.getOutput())
                .containsExactly(new Object[]{new Watermark(initialTime), new Watermark(nextTime)});
        // ... while the writer records the event-time API watermark type.
        Assertions.assertThat(sinkWriter.watermarks)
                .containsExactly(new Object[]{
                        new org.apache.flink.api.common.eventtime.Watermark(initialTime),
                        new org.apache.flink.api.common.eventtime.Watermark(nextTime)});

        harness.close();
    }

    /**
     * Exercises {@link TimeBasedBufferingSinkWriter}, which only moves written elements into
     * the committable list from its processing-time callback. The first pre-barrier snapshot is
     * expected to carry zero committables; the one taken after the clock is advanced to 2001 is
     * expected to carry both.
     */
    @Test
    public void testTimeBasedBufferingSinkWriter() throws Exception {
        long initialTime = 0L;
        SinkWriterOperatorFactory factory = new SinkWriterOperatorFactory(
                TestSink.newBuilder()
                        .setDefaultCommitter()
                        .setWriter(new TimeBasedBufferingSinkWriter())
                        .build()
                        .asV2());
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness(factory);
        harness.open();
        harness.setProcessingTime(initialTime);
        harness.processElement(1, 1L);
        harness.processElement(2, 2L);

        // Checkpoint 1: only a summary announcing zero committables is expected.
        harness.prepareSnapshotPreBarrier(1L);
        assertBasicOutput(harness.getOutput(), 0, 1L);
        // Remove that summary so the next check only sees the output of checkpoint 2.
        harness.getOutput().poll();

        // Checkpoint 2: taken after the clock moved past the writer's registered timer.
        harness.getProcessingTimeService().setCurrentTime(2001L);
        harness.prepareSnapshotPreBarrier(2L);
        assertBasicOutput(harness.getOutput(), 2, 2L);

        harness.close();
    }

    /**
     * With a default committer configured, asserts that nothing is emitted right after opening
     * and that the pre-barrier snapshot for checkpoint 1 emits a summary plus one committable
     * per processed element.
     */
    @Test
    void testEmitOnFlushWithCommitter() throws Exception {
        SinkWriterOperatorFactory factory = new SinkWriterOperatorFactory(
                TestSink.newBuilder().setDefaultCommitter().build().asV2());
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness(factory);
        harness.open();
        Assertions.assertThat(harness.getOutput()).isEmpty();

        harness.processElement(1, 1L);
        harness.processElement(2, 2L);

        harness.prepareSnapshotPreBarrier(1L);
        assertBasicOutput(harness.getOutput(), 2, 1L);

        harness.close();
    }

    /**
     * Asserts that signalling end of input, without any checkpoint, emits a summary and the
     * single committable tagged with checkpoint id {@link Long#MAX_VALUE}.
     */
    @Test
    void testEmitOnEndOfInputInBatchMode() throws Exception {
        SinkWriterOperatorFactory writerOperatorFactory = new SinkWriterOperatorFactory(
                TestSink.newBuilder().setDefaultCommitter().build().asV2());
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(writerOperatorFactory);
        testHarness.open();
        Assertions.assertThat(testHarness.getOutput()).isEmpty();

        testHarness.processElement(1, 1L);
        testHarness.endInput();
        SinkWriterOperatorTest.assertBasicOutput(testHarness.getOutput(), 1, Long.MAX_VALUE);

        // Release the harness like the other tests in this class do.
        testHarness.close();
    }

    /**
     * Takes a snapshot from one harness and restores it into a second one.
     *
     * <p>When {@code stateful} is true the writer is expected to have been asked for its state
     * (last checkpoint id 1) and the restored operator is expected to emit the two buffered
     * elements at end of input. Otherwise the writer is expected to be untouched and the
     * restored operator to emit an all-zero summary.
     *
     * @param stateful whether the sink is built with writer state enabled
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testStateRestore(boolean stateful) throws Exception {
        long initialTime = 0L;
        SnapshottingBufferingSinkWriter writer = new SnapshottingBufferingSinkWriter();
        OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> harness =
                createTestHarnessWithBufferingSinkWriter(writer, stateful);
        harness.open();
        harness.processWatermark(initialTime);
        harness.processElement(1, 1L);
        harness.processElement(2, 2L);
        harness.prepareSnapshotPreBarrier(1L);
        OperatorSubtaskState snapshot = harness.snapshot(1L, 1L);

        // Output holds the forwarded watermark plus one more element.
        ((AbstractCollectionAssert)Assertions.assertThat(harness.getOutput()).hasSize(2))
                .contains(new Object[]{new Watermark(initialTime)});
        long expectedLastCheckpointId = stateful ? 1L : SnapshottingBufferingSinkWriter.NOT_SNAPSHOTTED;
        Assertions.assertThat((long)writer.lastCheckpointId).isEqualTo(expectedLastCheckpointId);
        harness.close();

        OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> restoredHarness =
                createTestHarnessWithBufferingSinkWriter(new SnapshottingBufferingSinkWriter(), stateful);
        restoredHarness.initializeState(snapshot);
        restoredHarness.open();
        restoredHarness.endInput();
        long checkpointId = 2L;
        restoredHarness.prepareSnapshotPreBarrier(checkpointId);

        if (!stateful) {
            // Nothing was restored, so the first emitted record must be an empty summary.
            ThrowingConsumer emptySummary = cs -> SinkV2Assertions.assertThat((CommittableSummary)cs)
                    .hasOverallCommittables(0)
                    .hasPendingCommittables(0)
                    .hasFailedCommittables(0);
            Object firstValue = SinkTestUtil.fromOutput(restoredHarness.getOutput()).get(0).asRecord().getValue();
            ((ObjectAssert)Assertions.assertThat((Object)firstValue).isInstanceOf(CommittableSummary.class))
                    .satisfies(new ThrowingConsumer[]{emptySummary});
        } else {
            assertBasicOutput(restoredHarness.getOutput(), 2, Long.MAX_VALUE);
        }
        restoredHarness.close();
    }

    /**
     * Builds state with {@link DummySinkOperator}, feeds it into a writer operator configured
     * with a compatible state name, and then restores a second writer operator from the first
     * one's snapshot.
     *
     * <p>The first operator is expected to emit the previous sink's inputs (only when
     * {@code stateful}) plus its own element; the second is expected to emit only its own two
     * elements.
     *
     * @param stateful whether the sink is built with writer state enabled
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testLoadPreviousSinkState(boolean stateful) throws Exception {
        List<String> previousSinkInputs = Arrays.asList(
                "bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth", "cost", "prompt");

        // Step 1: produce a snapshot from the stand-in "previous" sink operator.
        OneInputStreamOperatorTestHarness<String, String> previousSink =
                new OneInputStreamOperatorTestHarness<String, String>(new DummySinkOperator(), StringSerializer.INSTANCE);
        OperatorSubtaskState previousSinkState = TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs);

        // Step 2: load that snapshot into the writer operator and process one element.
        OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> compatibleWriter =
                createCompatibleStateTestHarness(stateful);
        List<String> expectedFirst = new ArrayList<String>();
        if (stateful) {
            expectedFirst.addAll(previousSinkInputs);
        }
        expectedFirst.add(Tuple3.of(1, 1, Long.MIN_VALUE).toString());

        compatibleWriter.initializeState(previousSinkState);
        compatibleWriter.open();
        compatibleWriter.processElement(1, 1L);
        compatibleWriter.endInput();
        compatibleWriter.prepareSnapshotPreBarrier(1L);
        OperatorSubtaskState writerSnapshot = compatibleWriter.snapshot(1L, 1L);
        compatibleWriter.close();
        assertEmitted(expectedFirst, compatibleWriter.getOutput());

        // Step 3: restore from the writer's own snapshot; only new elements are expected.
        OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> restoredWriter =
                createCompatibleStateTestHarness(stateful);
        List<String> expectedSecond = Arrays.asList(
                Tuple3.of(2, 2, Long.MIN_VALUE).toString(),
                Tuple3.of(3, 3, Long.MIN_VALUE).toString());

        restoredWriter.initializeState(writerSnapshot);
        restoredWriter.open();
        restoredWriter.processElement(2, 2L);
        restoredWriter.processElement(3, 3L);
        restoredWriter.endInput();
        restoredWriter.prepareSnapshotPreBarrier(2L);
        assertEmitted(expectedSecond, restoredWriter.getOutput());
        restoredWriter.close();
    }

    /**
     * Builds state with {@link TestCommitterOperator} and restores a writer operator from it.
     *
     * <p>After the pre-barrier snapshot for checkpoint 2 the output is expected to hold four
     * records: a summary for checkpoint 1 announcing the restored committables, the two
     * restored committables themselves, and an empty summary for checkpoint 2.
     */
    @Test
    void testRestoreCommitterState() throws Exception {
        List<String> committables = Arrays.asList("state1", "state2");
        OneInputStreamOperatorTestHarness<String, String> committer =
                new OneInputStreamOperatorTestHarness<String, String>(
                        (OneInputStreamOperator<String, String>)new TestCommitterOperator(),
                        (TypeSerializer<String>)StringSerializer.INSTANCE);
        OperatorSubtaskState committerState = TestHarnessUtil.buildSubtaskState(committer, committables);

        TestSink.DefaultSinkWriter sinkWriter = new TestSink.DefaultSinkWriter();
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(
                new SinkWriterOperatorFactory(
                        TestSink.newBuilder().setDefaultCommitter().setWriter(sinkWriter).build().asV2()));
        testHarness.initializeState(committerState);
        testHarness.open();
        testHarness.prepareSnapshotPreBarrier(2L);

        List<StreamElement> output = SinkTestUtil.fromOutput(testHarness.getOutput());
        Assertions.assertThat(output).hasSize(4);

        // Record 0: summary describing the restored committables (checkpoint 1).
        ((ObjectAssert)Assertions.assertThat((Object)output.get(0).asRecord().getValue()).isInstanceOf(CommittableSummary.class))
                .satisfies(new ThrowingConsumer[]{cs -> SinkV2Assertions.assertThat((CommittableSummary)cs)
                        .hasPendingCommittables(committables.size())
                        .hasCheckpointId(1L)
                        .hasOverallCommittables(committables.size())
                        .hasFailedCommittables(0)});
        // Records 1 and 2: the restored committables, in their original order.
        SinkWriterOperatorTest.assertRestoredCommitterCommittable(output.get(1).asRecord().getValue(), committables.get(0));
        SinkWriterOperatorTest.assertRestoredCommitterCommittable(output.get(2).asRecord().getValue(), committables.get(1));
        // Record 3: empty summary for the checkpoint that was just prepared.
        ((ObjectAssert)Assertions.assertThat((Object)output.get(3).asRecord().getValue()).isInstanceOf(CommittableSummary.class))
                .satisfies(new ThrowingConsumer[]{cs -> SinkV2Assertions.assertThat((CommittableSummary)cs)
                        .hasPendingCommittables(0)
                        .hasCheckpointId(2L)
                        .hasOverallCommittables(0)
                        .hasFailedCommittables(0)});

        // Release the harness like the other tests in this class do.
        testHarness.close();
    }

    /**
     * Asserts that a processed element stays in the writer until end of input, after which it
     * is emitted as a committable and the writer is empty, with or without a following
     * pre-barrier snapshot.
     *
     * @param isCheckpointingEnabled whether a pre-barrier snapshot is triggered after end of input
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception {
        String expectedRecord = "(1,1,-9223372036854775808)";
        TestSink.DefaultSinkWriter writer = new TestSink.DefaultSinkWriter();
        SinkWriterOperatorFactory factory = new SinkWriterOperatorFactory(
                TestSink.newBuilder().setWriter(writer).setDefaultCommitter().build().asV2());
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness(factory);
        harness.open();

        // Before end of input: element buffered in the writer, nothing emitted.
        harness.processElement(1, 1L);
        Assertions.assertThat(harness.getOutput()).isEmpty();
        Assertions.assertThat(writer.elements).containsOnly(new Object[]{expectedRecord});

        harness.endInput();
        if (isCheckpointingEnabled) {
            harness.prepareSnapshotPreBarrier(1L);
        }

        // After end of input: the element has been emitted exactly once.
        assertEmitted(Collections.singletonList(expectedRecord), harness.getOutput());
        Assertions.assertThat(writer.elements).isEmpty();

        harness.close();
    }

    @Test
    void testInitContext() throws Exception {
        AtomicReference initContext = new AtomicReference();
        Sink & Serializable sink = (Sink & Serializable)context -> {
            initContext.set(context);
            return null;
        };
        boolean subtaskId = true;
        int parallelism = 10;
        StringSerializer typeSerializer = StringSerializer.INSTANCE;
        JobID jobID = new JobID();
        MockEnvironment environment = MockEnvironment.builder().setSubtaskIndex(1).setParallelism(10).setMaxParallelism(10).setJobID(jobID).setExecutionConfig(new ExecutionConfig().enableObjectReuse()).build();
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(new SinkWriterOperatorFactory((Sink)sink), typeSerializer, environment);
        testHarness.open();
        Assertions.assertThat((Object)((Sink.InitContext)initContext.get()).getUserCodeClassLoader()).isNotNull();
        Assertions.assertThat((Object)((Sink.InitContext)initContext.get()).getMailboxExecutor()).isNotNull();
        Assertions.assertThat((Object)((Sink.InitContext)initContext.get()).getProcessingTimeService()).isNotNull();
        Assertions.assertThat((int)((Sink.InitContext)initContext.get()).getSubtaskId()).isEqualTo(1);
        Assertions.assertThat((int)((Sink.InitContext)initContext.get()).getNumberOfParallelSubtasks()).isEqualTo(10);
        Assertions.assertThat((int)((Sink.InitContext)initContext.get()).getAttemptNumber()).isEqualTo(0);
        Assertions.assertThat((Object)((Sink.InitContext)initContext.get()).metricGroup()).isNotNull();
        Assertions.assertThat((OptionalLong)((Sink.InitContext)initContext.get()).getRestoredCheckpointId()).isNotPresent();
        Assertions.assertThat((boolean)((Sink.InitContext)initContext.get()).isObjectReuseEnabled()).isTrue();
        Assertions.assertThat((Object)((Sink.InitContext)initContext.get()).createInputSerializer()).isEqualTo((Object)typeSerializer);
        Assertions.assertThat((Comparable)((Sink.InitContext)initContext.get()).getJobId()).isEqualTo((Object)jobID);
        testHarness.close();
    }

    /**
     * Asserts that {@code record} is a {@link CommittableWithLineage} wrapping
     * {@code committable}, with checkpoint id 1 and subtask id 0.
     *
     * @param record value taken from an emitted stream record
     * @param committable the committable expected inside the record
     */
    private static void assertRestoredCommitterCommittable(Object record, String committable) {
        ThrowingConsumer lineageCheck = cl -> SinkV2Assertions.assertThat((CommittableWithLineage)cl)
                .hasCommittable(committable)
                .hasCheckpointId(1L)
                .hasSubtaskId(0);
        ObjectAssert typedRecord =
                (ObjectAssert)Assertions.assertThat((Object)record).isInstanceOf(CommittableWithLineage.class);
        typedRecord.satisfies(new ThrowingConsumer[]{lineageCheck});
    }

    /**
     * Asserts that {@code output} consists of one {@link CommittableSummary} announcing
     * {@code records.size()} pending committables and no failures, followed by exactly that
     * many {@link CommittableWithLineage} elements whose committables equal {@code records}
     * in any order.
     *
     * @param records the committables expected to have been emitted
     * @param output the raw harness output to inspect
     */
    private static void assertEmitted(List<String> records, Queue<Object> output) {
        int expectedCount = records.size();
        List<StreamElement> elements = SinkTestUtil.fromOutput(output);
        Assertions.assertThat(elements).hasSize(expectedCount + 1);

        ThrowingConsumer summaryCheck = cs -> SinkV2Assertions.assertThat((CommittableSummary)cs)
                .hasPendingCommittables(records.size())
                .hasOverallCommittables(records.size())
                .hasFailedCommittables(0);
        ((ObjectAssert)Assertions.assertThat((Object)elements.get(0).asRecord().getValue()).isInstanceOf(CommittableSummary.class))
                .satisfies(new ThrowingConsumer[]{summaryCheck});

        // Everything after the summary must be a committable; collect the payloads.
        List<Object> emitted = new ArrayList<Object>();
        for (StreamElement element : elements.subList(1, expectedCount + 1)) {
            Object value = element.asRecord().getValue();
            Assertions.assertThat((Object)value).isInstanceOf(CommittableWithLineage.class);
            emitted.add(((CommittableWithLineage)value).getCommittable());
        }
        Assertions.assertThat(emitted).containsExactlyInAnyOrderElementsOf(records);
    }

    /**
     * Creates a harness around a writer operator whose sink uses the default committer and
     * the given writer.
     *
     * @param snapshottingWriter the writer the sink should use
     * @param stateful when true, the sink is additionally built with writer state
     * @return an unopened test harness
     */
    private static OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> createTestHarnessWithBufferingSinkWriter(SnapshottingBufferingSinkWriter snapshottingWriter, boolean stateful) throws Exception {
        TestSink.Builder<Integer> sinkBuilder = TestSink.newBuilder()
                .setDefaultCommitter()
                .setWriter(snapshottingWriter);
        if (stateful) {
            sinkBuilder.withWriterState();
        }
        SinkWriterOperatorFactory factory = new SinkWriterOperatorFactory(sinkBuilder.build().asV2());
        return new OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>>(
                (OneInputStreamOperatorFactory<Integer, CommittableMessage<Integer>>)factory);
    }

    /**
     * Creates a harness around a writer operator whose sink uses the default committer, a
     * fresh {@link SnapshottingBufferingSinkWriter}, and declares the state name used by
     * {@link DummySinkOperator} as a compatible state name.
     *
     * @param stateful when true, the sink is additionally built with writer state
     * @return an unopened test harness
     */
    private static OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> createCompatibleStateTestHarness(boolean stateful) throws Exception {
        SnapshottingBufferingSinkWriter snapshottingWriter = new SnapshottingBufferingSinkWriter();
        // Reference the shared constant so the name cannot drift from the one the dummy
        // operator registers its state under.
        TestSink.Builder<Integer> builder = TestSink.newBuilder()
                .setDefaultCommitter()
                .setCompatibleStateNames(DummySinkOperator.DUMMY_SINK_STATE_NAME)
                .setWriter(snapshottingWriter);
        if (stateful) {
            builder.withWriterState();
        }
        SinkWriterOperatorFactory writerOperatorFactory = new SinkWriterOperatorFactory(builder.build().asV2());
        return new OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>>(
                (OneInputStreamOperatorFactory<Integer, CommittableMessage<Integer>>)writerOperatorFactory);
    }

    /**
     * Asserts that {@code queuedOutput} consists of one {@link CommittableSummary} announcing
     * {@code numberOfCommittables} pending committables and no failures, followed by that many
     * {@link CommittableWithLineage} elements carrying {@code checkpointId} and subtask id 0.
     *
     * @param queuedOutput the raw harness output to inspect
     * @param numberOfCommittables how many committables are expected after the summary
     * @param checkpointId checkpoint id every committable is expected to carry
     */
    private static void assertBasicOutput(Collection<Object> queuedOutput, int numberOfCommittables, @Nullable Long checkpointId) {
        List<StreamElement> elements = SinkTestUtil.fromOutput(queuedOutput);
        Assertions.assertThat(elements).hasSize(numberOfCommittables + 1);

        ThrowingConsumer summaryCheck = cs -> SinkV2Assertions.assertThat((CommittableSummary)cs)
                .hasOverallCommittables(numberOfCommittables)
                .hasPendingCommittables(numberOfCommittables)
                .hasFailedCommittables(0);
        ((ObjectAssert)Assertions.assertThat((Object)elements.get(0).asRecord().getValue()).isInstanceOf(CommittableSummary.class))
                .satisfies(new ThrowingConsumer[]{summaryCheck});

        int index = 1;
        while (index <= numberOfCommittables) {
            Object value = elements.get(index).asRecord().getValue();
            ((ObjectAssert)Assertions.assertThat((Object)value).isInstanceOf(CommittableWithLineage.class))
                    .satisfies(new ThrowingConsumer[]{cl -> SinkV2Assertions.assertThat((CommittableWithLineage)cl)
                            .hasCheckpointId(checkpointId)
                            .hasSubtaskId(0)});
            ++index;
        }
    }

    private static class TestingCommittableSerializer
    extends SinkV1WriterCommittableSerializer<String> {
        private final SimpleVersionedSerializer<String> committableSerializer;

        public TestingCommittableSerializer(SimpleVersionedSerializer<String> committableSerializer) {
            super(committableSerializer);
            this.committableSerializer = committableSerializer;
        }

        public byte[] serialize(List<String> obj) throws IOException {
            DataOutputSerializer out = new DataOutputSerializer(256);
            out.writeInt(-1189141204);
            SimpleVersionedSerialization.writeVersionAndSerializeList(this.committableSerializer, obj, (DataOutputView)out);
            return out.getCopyOfBuffer();
        }
    }

    /**
     * Writer that keeps its elements until a flushing {@code prepareCommit}, exposes them as
     * its snapshot state, and remembers the id of the last checkpoint it was snapshotted for.
     */
    private static class SnapshottingBufferingSinkWriter
    extends TestSink.DefaultSinkWriter<Integer> {
        /** Value of {@link #lastCheckpointId} while no snapshot has been taken. */
        public static final int NOT_SNAPSHOTTED = -1;
        long lastCheckpointId = -1L;

        private SnapshottingBufferingSinkWriter() {
        }

        /** Records the checkpoint id and returns the current element list itself (not a copy). */
        @Override
        public List<String> snapshotState(long checkpointId) {
            this.lastCheckpointId = checkpointId;
            return this.elements;
        }

        /** Replaces the element list with a mutable copy of the restored states. */
        @Override
        void restoredFrom(List<String> states) {
            this.elements = new ArrayList<String>(states);
        }

        /**
         * Hands over all buffered elements when flushing and starts a new empty buffer;
         * returns an empty list and keeps the buffer otherwise.
         */
        @Override
        public List<String> prepareCommit(boolean flush) {
            if (flush) {
                List<String> drained = this.elements;
                this.elements = new ArrayList<String>();
                return drained;
            }
            return Collections.emptyList();
        }
    }

    private static class DummySinkOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        static final String DUMMY_SINK_STATE_NAME = "dummy_sink_state";
        static final ListStateDescriptor<byte[]> SINK_STATE_DESC = new ListStateDescriptor("dummy_sink_state", (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE);
        ListState<String> sinkState;

        private DummySinkOperator() {
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            this.sinkState = new SimpleVersionedListState(context.getOperatorStateStore().getListState(SINK_STATE_DESC), (SimpleVersionedSerializer)TestSink.StringCommittableSerializer.INSTANCE);
        }

        public void processElement(StreamRecord<String> element) throws Exception {
            this.sinkState.add(element.getValue());
        }
    }

    private static class TestCommitterOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC = new ListStateDescriptor("streaming_committer_raw_states", (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE);
        private ListState<List<String>> committerState;
        private final List<String> buffer = new ArrayList<String>();

        private TestCommitterOperator() {
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            this.committerState = new SimpleVersionedListState(context.getOperatorStateStore().getListState(STREAMING_COMMITTER_RAW_STATES_DESC), (SimpleVersionedSerializer)new TestingCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE));
        }

        public void processElement(StreamRecord<String> element) throws Exception {
            this.buffer.add((String)element.getValue());
        }

        public void snapshotState(StateSnapshotContext context) throws Exception {
            super.snapshotState(context);
            this.committerState.add(this.buffer);
        }
    }

    /**
     * Writer that parks written elements in a private cache and only moves them into the
     * inherited {@code elements} list when its processing-time callback runs. The callback is
     * first registered for time 1000 and re-registers itself 1000 after each firing.
     */
    private static class TimeBasedBufferingSinkWriter
    extends TestSink.DefaultSinkWriter<Integer>
    implements Sink.ProcessingTimeService.ProcessingTimeCallback {
        /** Elements written since the callback last ran. */
        private final List<String> cachedCommittables = new ArrayList<String>();

        private TimeBasedBufferingSinkWriter() {
        }

        /** Caches the string form of (element, timestamp, current watermark). */
        @Override
        public void write(Integer element, SinkWriter.Context context) {
            String formatted = Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString();
            this.cachedCommittables.add(formatted);
        }

        /** Stores the timer service via the superclass and registers the first timer. */
        @Override
        void setProcessingTimerService(Sink.ProcessingTimeService processingTimerService) {
            super.setProcessingTimerService(processingTimerService);
            this.processingTimerService.registerProcessingTimer(1000L, this);
        }

        /** Publishes the cached elements, empties the cache and schedules the next firing. */
        @Override
        public void onProcessingTime(long time) {
            this.elements.addAll(this.cachedCommittables);
            this.cachedCommittables.clear();
            long nextFiring = time + 1000L;
            this.processingTimerService.registerProcessingTimer(nextFiring, this);
        }
    }
}

