/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.sort;

import java.io.IOException;
import java.io.Serializable;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.sort.MultiInputSortingDataInput;
import org.apache.flink.streaming.api.operators.sort.SortingDataInput;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class LargeSortingDataInputITCase {
    /** Package-private no-argument constructor; it performs no initialization. */
    LargeSortingDataInputITCase() {
    }

    @Test
    void intKeySorting() throws Exception {
        int numberOfRecords = 500000;
        GeneratedRecordsDataInput input = new GeneratedRecordsDataInput(numberOfRecords, 0);
        KeySelector & Serializable keySelector = (KeySelector & Serializable)value -> (Integer)value.f0;
        try (MockEnvironment environment = MockEnvironment.builder().build();
             SortingDataInput sortingDataInput = new SortingDataInput((StreamTaskInput)input, GeneratedRecordsDataInput.SERIALIZER, (TypeSerializer)new IntSerializer(), (KeySelector)keySelector, environment.getMemoryManager(), environment.getIOManager(), true, 1.0, new Configuration(), (TaskInvokable)new DummyInvokable(), new ExecutionConfig());){
            DataInputStatus inputStatus;
            VerifyingOutput output = new VerifyingOutput(keySelector);
            while ((inputStatus = sortingDataInput.emitNext(output)) != DataInputStatus.END_OF_INPUT) {
            }
            Assertions.assertThat((int)output.getSeenRecords()).isEqualTo(numberOfRecords);
        }
    }

    @Test
    void stringKeySorting() throws Exception {
        int numberOfRecords = 500000;
        GeneratedRecordsDataInput input = new GeneratedRecordsDataInput(numberOfRecords, 0);
        KeySelector & Serializable keySelector = (KeySelector & Serializable)value -> (String)value.f1;
        try (MockEnvironment environment = MockEnvironment.builder().build();
             SortingDataInput sortingDataInput = new SortingDataInput((StreamTaskInput)input, GeneratedRecordsDataInput.SERIALIZER, (TypeSerializer)new StringSerializer(), (KeySelector)keySelector, environment.getMemoryManager(), environment.getIOManager(), true, 1.0, new Configuration(), (TaskInvokable)new DummyInvokable(), new ExecutionConfig());){
            DataInputStatus inputStatus;
            VerifyingOutput output = new VerifyingOutput(keySelector);
            while ((inputStatus = sortingDataInput.emitNext(output)) != DataInputStatus.END_OF_INPUT) {
            }
            Assertions.assertThat((int)output.getSeenRecords()).isEqualTo(numberOfRecords);
        }
    }

    @Test
    void multiInputKeySorting() throws Exception {
        int numberOfRecords = 500000;
        GeneratedRecordsDataInput input1 = new GeneratedRecordsDataInput(numberOfRecords, 0);
        GeneratedRecordsDataInput input2 = new GeneratedRecordsDataInput(numberOfRecords, 1);
        KeySelector & Serializable keySelector = (KeySelector & Serializable)value -> (String)value.f1;
        try (MockEnvironment environment = MockEnvironment.builder().build();){
            MultiInputSortingDataInput.SelectableSortingInputs selectableSortingInputs = MultiInputSortingDataInput.wrapInputs((TaskInvokable)new DummyInvokable(), (StreamTaskInput[])new StreamTaskInput[]{input1, input2}, (KeySelector[])new KeySelector[]{keySelector, keySelector}, (TypeSerializer[])new TypeSerializer[]{GeneratedRecordsDataInput.SERIALIZER, GeneratedRecordsDataInput.SERIALIZER}, (TypeSerializer)new StringSerializer(), (StreamTaskInput[])new StreamTaskInput[0], (MemoryManager)environment.getMemoryManager(), (IOManager)environment.getIOManager(), (boolean)true, (double)1.0, (Configuration)new Configuration(), (ExecutionConfig)new ExecutionConfig());
            StreamTaskInput[] sortingDataInputs = selectableSortingInputs.getSortedInputs();
            try (StreamTaskInput sortedInput1 = sortingDataInputs[0];
                 StreamTaskInput sortedInput2 = sortingDataInputs[1];){
                DataInputStatus inputStatus;
                VerifyingOutput output = new VerifyingOutput(keySelector);
                StreamMultipleInputProcessor multiSortedProcessor = new StreamMultipleInputProcessor(new MultipleInputSelectionHandler(selectableSortingInputs.getInputSelectable(), 2), new StreamOneInputProcessor[]{new StreamOneInputProcessor(sortedInput1, output, (BoundedMultiInput)new DummyOperatorChain()), new StreamOneInputProcessor(sortedInput2, output, (BoundedMultiInput)new DummyOperatorChain())});
                while ((inputStatus = multiSortedProcessor.processInput()) != DataInputStatus.END_OF_INPUT) {
                }
                Assertions.assertThat((int)output.getSeenRecords()).isEqualTo(numberOfRecords * 2);
            }
        }
    }

    private static class DummyOperatorChain
    implements BoundedMultiInput {
        private DummyOperatorChain() {
        }

        public void endInput(int inputId) throws Exception {
        }
    }

    private static final class GeneratedRecordsDataInput
    implements StreamTaskInput<Tuple3<Integer, String, byte[]>> {
        private static final TypeSerializer<Tuple3<Integer, String, byte[]>> SERIALIZER = new TupleSerializer(Tuple3.class, new TypeSerializer[]{new IntSerializer(), new StringSerializer(), new BytePrimitiveArraySerializer()});
        private static final String ALPHA_NUM = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
        private final long numberOfRecords;
        private final int inputIdx;
        private int recordsGenerated;
        private final Random rnd = new Random();
        private final byte[] buffer;

        private GeneratedRecordsDataInput(int numberOfRecords, int inputIdx) {
            this.numberOfRecords = numberOfRecords;
            this.recordsGenerated = 0;
            this.buffer = new byte[500];
            this.rnd.nextBytes(this.buffer);
            this.inputIdx = inputIdx;
        }

        public DataInputStatus emitNext(PushingAsyncDataInput.DataOutput<Tuple3<Integer, String, byte[]>> output) throws Exception {
            if ((long)this.recordsGenerated == this.numberOfRecords) {
                ++this.recordsGenerated;
                return DataInputStatus.END_OF_DATA;
            }
            if ((long)this.recordsGenerated > this.numberOfRecords) {
                return DataInputStatus.END_OF_INPUT;
            }
            output.emitRecord(new StreamRecord((Object)Tuple3.of((Object)this.rnd.nextInt(), (Object)this.randomString(this.rnd.nextInt(256)), (Object)this.buffer), 1L));
            if ((long)this.recordsGenerated++ == this.numberOfRecords) {
                return DataInputStatus.END_OF_DATA;
            }
            return DataInputStatus.MORE_AVAILABLE;
        }

        public CompletableFuture<?> getAvailableFuture() {
            return AvailabilityProvider.AVAILABLE;
        }

        private String randomString(int len) {
            StringBuilder sb = new StringBuilder(len);
            for (int i = 0; i < len; ++i) {
                sb.append(ALPHA_NUM.charAt(this.rnd.nextInt(ALPHA_NUM.length())));
            }
            return sb.toString();
        }

        public int getInputIndex() {
            return this.inputIdx;
        }

        public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws CheckpointException {
            return CompletableFuture.completedFuture(null);
        }

        public void close() throws IOException {
        }
    }

    private static final class VerifyingOutput<E>
    implements PushingAsyncDataInput.DataOutput<Tuple3<Integer, String, byte[]>> {
        private final KeySelector<Tuple3<Integer, String, byte[]>, E> keySelector;
        private final Set<E> seenKeys = new LinkedHashSet();
        private E currentKey = null;
        private int seenRecords = 0;

        private VerifyingOutput(KeySelector<Tuple3<Integer, String, byte[]>, E> keySelector) {
            this.keySelector = keySelector;
        }

        public void emitRecord(StreamRecord<Tuple3<Integer, String, byte[]>> streamRecord) throws Exception {
            ++this.seenRecords;
            Object incomingKey = this.keySelector.getKey((Object)((Tuple3)streamRecord.getValue()));
            if (!Objects.equals(incomingKey, this.currentKey)) {
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.seenKeys.add(incomingKey)).as("Received an out of order key: " + incomingKey, new Object[0])).isTrue();
                this.currentKey = incomingKey;
            }
        }

        public void emitWatermark(Watermark watermark) throws Exception {
        }

        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
        }

        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
        }

        public void emitRecordAttributes(RecordAttributes recordAttributes) throws Exception {
        }

        public int getSeenRecords() {
            return this.seenRecords;
        }
    }
}

