/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;

class WithAdapterSinkWriterOperatorTest
extends SinkWriterOperatorTestBase {
    WithAdapterSinkWriterOperatorTest() {
    }

    @Override
    InspectableSink sinkWithoutCommitter() {
        TestSink.DefaultSinkWriter sinkWriter = new TestSink.DefaultSinkWriter();
        return new InspectableSink(TestSink.newBuilder().setWriter(sinkWriter).build());
    }

    @Override
    InspectableSink sinkWithCommitter() {
        TestSink.DefaultSinkWriter sinkWriter = new TestSink.DefaultSinkWriter();
        return new InspectableSink(TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build());
    }

    @Override
    InspectableSink sinkWithTimeBasedWriter() {
        TimeBasedBufferingSinkWriter sinkWriter = new TimeBasedBufferingSinkWriter();
        return new InspectableSink(TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter().build());
    }

    @Override
    InspectableSink sinkWithState(boolean withState, String stateName) {
        TestSink.DefaultSinkWriter sinkWriter = new TestSink.DefaultSinkWriter();
        TestSink.Builder builder = TestSink.newBuilder().setWriter(sinkWriter).setDefaultCommitter();
        if (withState) {
            builder.withWriterState();
            if (stateName != null) {
                builder.setCompatibleStateNames(stateName);
            }
        }
        return new InspectableSink(builder.build());
    }

    class InspectableSink
    extends SinkWriterOperatorTestBase.AbstractInspectableSink<Sink<Integer>> {
        private final TestSink<Integer> sink;

        InspectableSink(TestSink<Integer> sink) {
            super(sink.asV2());
            this.sink = sink;
        }

        @Override
        public long getLastCheckpointId() {
            return this.sink.getWriter().lastCheckpointId;
        }

        @Override
        public List<String> getRecordsOfCurrentCheckpoint() {
            return this.sink.getWriter().elements;
        }

        @Override
        public List<Watermark> getWatermarks() {
            return this.sink.getWriter().watermarks;
        }

        @Override
        public int getRecordCountFromState() {
            return this.sink.getWriter().getRecordCount();
        }
    }

    private static class TimeBasedBufferingSinkWriter
    extends TestSink.DefaultSinkWriter<Integer>
    implements Sink.ProcessingTimeService.ProcessingTimeCallback {
        private final List<String> cachedCommittables = new ArrayList<String>();

        private TimeBasedBufferingSinkWriter() {
        }

        @Override
        public void write(Integer element, SinkWriter.Context context) {
            this.cachedCommittables.add(Tuple3.of((Object)element, (Object)context.timestamp(), (Object)context.currentWatermark()).toString());
        }

        @Override
        void setProcessingTimerService(Sink.ProcessingTimeService processingTimerService) {
            super.setProcessingTimerService(processingTimerService);
            this.processingTimerService.registerProcessingTimer(1000L, (Sink.ProcessingTimeService.ProcessingTimeCallback)this);
        }

        public void onProcessingTime(long time) {
            this.elements.addAll(this.cachedCommittables);
            this.cachedCommittables.clear();
            this.processingTimerService.registerProcessingTimer(time + 1000L, (Sink.ProcessingTimeService.ProcessingTimeCallback)this);
        }
    }
}

