/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.connector.source.lib;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.util.SimpleUserCodeClassLoader;
import org.apache.flink.util.UserCodeClassLoader;
import org.junit.Assert;
import org.junit.Test;

public class NumberSequenceSourceTest {
    @Test
    public void testReaderCheckpoints() throws Exception {
        long from = 177L;
        long mid = 333L;
        long to = 563L;
        long elementsPerCycle = 128L;
        TestingReaderOutput out = new TestingReaderOutput();
        SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> reader = NumberSequenceSourceTest.createReader();
        reader.addSplits(Arrays.asList(new NumberSequenceSource.NumberSequenceSplit("split-1", 177L, 333L), new NumberSequenceSource.NumberSequenceSplit("split-2", 334L, 563L)));
        long remainingInCycle = 128L;
        while (reader.pollNext(out) != InputStatus.END_OF_INPUT) {
            if (--remainingInCycle > 0L) continue;
            remainingInCycle = 128L;
            List splits = reader.snapshotState(1L);
            reader = NumberSequenceSourceTest.createReader();
            if (splits.isEmpty()) {
                reader.notifyNoMoreSplits();
                continue;
            }
            reader.addSplits(splits);
        }
        ArrayList<Long> result = out.getEmittedRecords();
        NumberSequenceSourceTest.validateSequence(result, 177L, 563L);
    }

    private static void validateSequence(List<Long> sequence, long from, long to) {
        if ((long)sequence.size() != to - from + 1L) {
            NumberSequenceSourceTest.failSequence(sequence, from, to);
        }
        long nextExpected = from;
        for (Long next : sequence) {
            if (next == nextExpected++) continue;
            NumberSequenceSourceTest.failSequence(sequence, from, to);
        }
    }

    private static void failSequence(List<Long> sequence, long from, long to) {
        Assert.fail((String)String.format("Expected: A sequence [%d, %d], but found: sequence (size %d) : %s", from, to, sequence.size(), sequence));
    }

    private static SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> createReader() {
        return new NumberSequenceSource(0L, 0L).createReader((SourceReaderContext)new DummyReaderContext());
    }

    private static final class TestingReaderOutput<E>
    implements ReaderOutput<E> {
        private final ArrayList<E> emittedRecords = new ArrayList();

        private TestingReaderOutput() {
        }

        public void collect(E record) {
            this.emittedRecords.add(record);
        }

        public void collect(E record, long timestamp) {
            this.collect(record);
        }

        public void emitWatermark(Watermark watermark) {
            throw new UnsupportedOperationException();
        }

        public void markIdle() {
            throw new UnsupportedOperationException();
        }

        public void markActive() {
            throw new UnsupportedOperationException();
        }

        public SourceOutput<E> createOutputForSplit(String splitId) {
            return this;
        }

        public void releaseOutputForSplit(String splitId) {
        }

        public ArrayList<E> getEmittedRecords() {
            return this.emittedRecords;
        }
    }

    private static final class DummyReaderContext
    implements SourceReaderContext {
        private DummyReaderContext() {
        }

        public SourceReaderMetricGroup metricGroup() {
            return UnregisteredMetricsGroup.createSourceReaderMetricGroup();
        }

        public Configuration getConfiguration() {
            return new Configuration();
        }

        public String getLocalHostName() {
            return "localhost";
        }

        public int getIndexOfSubtask() {
            return 0;
        }

        public void sendSplitRequest() {
        }

        public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
        }

        public UserCodeClassLoader getUserCodeClassLoader() {
            return SimpleUserCodeClassLoader.create((ClassLoader)this.getClass().getClassLoader());
        }

        public int currentParallelism() {
            return 1;
        }
    }
}

