/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;

/**
 * Supplies the sinks exercised by {@link SinkWriterOperatorTestBase}, each built through
 * {@link TestSink#newBuilder()} and converted with {@code asV2()} before being handed to the
 * base test.
 *
 * <p>Every factory method returns a {@code SinkAndSuppliers} bundling the converted sink with
 * suppliers for the writer's recorded elements, its recorded watermarks, a checkpoint id, and
 * a committable serializer.
 */
class WithAdapterSinkWriterOperatorTest extends SinkWriterOperatorTestBase {

    /**
     * Builds a sink around a plain {@link TestSink.DefaultSinkWriter} with no committer
     * configured.
     *
     * @return the sink together with suppliers reading the writer's state; the checkpoint id
     *     supplier always yields {@code -1}
     */
    @Override
    SinkWriterOperatorTestBase.SinkAndSuppliers sinkWithoutCommitter() {
        TestSink.DefaultSinkWriter<Integer> sinkWriter = new TestSink.DefaultSinkWriter<>();
        return new SinkWriterOperatorTestBase.SinkAndSuppliers(
                TestSink.newBuilder().setWriter(sinkWriter).build().asV2(),
                () -> sinkWriter.elements,
                () -> sinkWriter.watermarks,
                () -> -1L,
                () -> new TestSink.StringCommittableSerializer());
    }

    /**
     * Builds a sink around a plain {@link TestSink.DefaultSinkWriter} with the builder's
     * default committer enabled.
     *
     * @return the sink together with suppliers reading the writer's state; the checkpoint id
     *     supplier always yields {@code -1}
     */
    @Override
    SinkWriterOperatorTestBase.SinkAndSuppliers sinkWithCommitter() {
        TestSink.DefaultSinkWriter<Integer> sinkWriter = new TestSink.DefaultSinkWriter<>();
        return new SinkWriterOperatorTestBase.SinkAndSuppliers(
                TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build().asV2(),
                () -> sinkWriter.elements,
                () -> sinkWriter.watermarks,
                () -> -1L,
                () -> new TestSink.StringCommittableSerializer());
    }

    /**
     * Builds a sink whose writer buffers input and only publishes it from a processing-time
     * callback (see {@link TimeBasedBufferingSinkWriter}).
     *
     * @return the sink together with suppliers reading the writer's state; the checkpoint id
     *     supplier always yields {@code -1}
     */
    @Override
    SinkWriterOperatorTestBase.SinkAndSuppliers sinkWithTimeBasedWriter() {
        TimeBasedBufferingSinkWriter sinkWriter = new TimeBasedBufferingSinkWriter();
        return new SinkWriterOperatorTestBase.SinkAndSuppliers(
                TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build().asV2(),
                () -> sinkWriter.elements,
                () -> sinkWriter.watermarks,
                () -> -1L,
                () -> new TestSink.StringCommittableSerializer());
    }

    /**
     * Builds a sink whose writer records the id of the last checkpoint it was snapshotted for
     * (see {@link SnapshottingBufferingSinkWriter}).
     *
     * @param withState whether {@code withWriterState()} is applied to the builder
     * @param stateName state name passed to {@code setCompatibleStateNames}; skipped when
     *     {@code null}
     * @return the sink together with suppliers reading the writer's state, including its
     *     {@code lastCheckpointId}
     */
    @Override
    SinkWriterOperatorTestBase.SinkAndSuppliers sinkWithSnapshottingWriter(
            boolean withState, String stateName) {
        SnapshottingBufferingSinkWriter sinkWriter = new SnapshottingBufferingSinkWriter();
        TestSink.Builder<Integer> builder =
                TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter();
        if (withState) {
            builder.withWriterState();
        }
        if (stateName != null) {
            builder.setCompatibleStateNames(stateName);
        }
        return new SinkWriterOperatorTestBase.SinkAndSuppliers(
                builder.build().asV2(),
                () -> sinkWriter.elements,
                () -> sinkWriter.watermarks,
                () -> sinkWriter.lastCheckpointId,
                () -> new TestSink.StringCommittableSerializer());
    }

    /**
     * Writer that exposes its buffered elements as snapshot state and remembers the id of the
     * most recent checkpoint it was snapshotted for.
     */
    private static class SnapshottingBufferingSinkWriter
            extends TestSink.DefaultSinkWriter<Integer> {
        /** Value of {@link #lastCheckpointId} before any snapshot has been taken. */
        public static final int NOT_SNAPSHOTTED = -1;

        long lastCheckpointId = NOT_SNAPSHOTTED;

        /**
         * Records {@code checkpointId} and returns the current element buffer as state.
         *
         * @param checkpointId id of the checkpoint being taken
         * @return the live {@code elements} list (not a copy)
         */
        @Override
        public List<String> snapshotState(long checkpointId) {
            this.lastCheckpointId = checkpointId;
            return this.elements;
        }

        /**
         * Replaces the element buffer with a mutable copy of the restored state.
         *
         * @param states previously snapshotted elements
         */
        @Override
        void restoredFrom(List<String> states) {
            this.elements = new ArrayList<>(states);
        }

        /**
         * Hands over the buffered elements as committables, but only when flushing.
         *
         * @param flush whether the buffer should be drained
         * @return an empty list when {@code flush} is {@code false}; otherwise the buffered
         *     elements, after which the writer starts over with a fresh empty buffer
         */
        @Override
        public List<String> prepareCommit(boolean flush) {
            if (!flush) {
                return Collections.emptyList();
            }
            List<String> result = this.elements;
            this.elements = new ArrayList<>();
            return result;
        }
    }

    /**
     * Writer that caches written records and moves them into {@code elements} only when its
     * processing-time timer fires; the timer re-registers itself on every firing.
     */
    private static class TimeBasedBufferingSinkWriter extends TestSink.DefaultSinkWriter<Integer>
            implements Sink.ProcessingTimeService.ProcessingTimeCallback {
        /** Records written since the last timer firing. */
        private final List<String> cachedCommittables = new ArrayList<>();

        /**
         * Caches the string form of {@code (element, timestamp, currentWatermark)} instead of
         * publishing it immediately.
         *
         * @param element the record being written
         * @param context supplies the record timestamp and the current watermark
         */
        @Override
        public void write(Integer element, SinkWriter.Context context) {
            this.cachedCommittables.add(
                    Tuple3.of(element, context.timestamp(), context.currentWatermark())
                            .toString());
        }

        /**
         * Delegates to the superclass, then registers this writer as the callback for a timer
         * at time {@code 1000}.
         *
         * @param processingTimerService timer service passed on to the superclass
         */
        @Override
        void setProcessingTimerService(Sink.ProcessingTimeService processingTimerService) {
            super.setProcessingTimerService(processingTimerService);
            this.processingTimerService.registerProcessingTimer(1000L, this);
        }

        /**
         * Publishes all cached records into {@code elements}, clears the cache and registers
         * the next timer {@code 1000} after {@code time}.
         *
         * @param time the time value the callback was invoked with
         */
        @Override
        public void onProcessingTime(long time) {
            this.elements.addAll(this.cachedCommittables);
            this.cachedCommittables.clear();
            this.processingTimerService.registerProcessingTimer(time + 1000L, this);
        }
    }
}

