/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.UUID;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.DataSinkTask;
import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.TaskTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.AbstractFileAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.MapAssert;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class DataSinkTaskTest
extends TaskTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(DataSinkTaskTest.class);
    private static final int MEMORY_MANAGER_SIZE = 0x300000;
    private static final int NETWORK_BUFFER_SIZE = 1024;

    DataSinkTaskTest() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testDataSinkTask() {
        InputStreamReader fr = null;
        BufferedReader br = null;
        try {
            int keyCnt = 100;
            int valCnt = 20;
            super.initEnvironment(0x300000L, 1024);
            super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
            DataSinkTask testTask = new DataSinkTask((Environment)this.mockEnv);
            File tempTestFile = new File(this.tempFolder.toFile(), UUID.randomUUID().toString());
            super.registerFileOutputTask(MockOutputFormat.class, tempTestFile.toURI().toString(), new Configuration());
            testTask.invoke();
            ((AbstractFileAssert)Assertions.assertThat((File)tempTestFile).withFailMessage("Temp output file does not exist", new Object[0])).exists();
            fr = new FileReader(tempTestFile);
            br = new BufferedReader(fr);
            HashMap keyValueCountMap = new HashMap(keyCnt);
            while (br.ready()) {
                String line = br.readLine();
                Integer key = Integer.parseInt(line.substring(0, line.indexOf("_")));
                Integer val = Integer.parseInt(line.substring(line.indexOf("_") + 1, line.length()));
                if (!keyValueCountMap.containsKey(key)) {
                    keyValueCountMap.put(key, new HashSet());
                }
                ((HashSet)keyValueCountMap.get(key)).add(val);
            }
            ((MapAssert)Assertions.assertThat(keyValueCountMap).withFailMessage("Invalid key count in out file. Expected: %d Actual: %d", new Object[]{keyCnt, keyValueCountMap.size()})).hasSize(keyCnt);
            for (Integer key : keyValueCountMap.keySet()) {
                ((AbstractCollectionAssert)Assertions.assertThat((Collection)((Collection)keyValueCountMap.get(key))).withFailMessage("Invalid value count for key: %d. Expected: %d Actual: %d", new Object[]{key, valCnt, ((HashSet)keyValueCountMap.get(key)).size()})).hasSize(valCnt);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)e.getMessage());
        }
        finally {
            if (br != null) {
                try {
                    br.close();
                }
                catch (Throwable throwable) {}
            }
            if (fr != null) {
                try {
                    fr.close();
                }
                catch (Throwable throwable) {}
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testUnionDataSinkTask() {
        int keyCnt = 10;
        int valCnt = 20;
        super.initEnvironment(0x300000L, 1024);
        IteratorWrappingTestSingleInputGate[] readers = new IteratorWrappingTestSingleInputGate[]{super.addInput(new UniformRecordGenerator(keyCnt, valCnt, 0, 0, false), 0, false), super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt, 0, false), 0, false), super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 2, 0, false), 0, false), super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 3, 0, false), 0, false)};
        DataSinkTask testTask = new DataSinkTask((Environment)this.mockEnv);
        File tempTestFile = new File(this.tempFolder.toFile(), UUID.randomUUID().toString());
        super.registerFileOutputTask(MockOutputFormat.class, tempTestFile.toURI().toString(), new Configuration());
        try {
            for (IteratorWrappingTestSingleInputGate reader : readers) {
                reader.notifyNonEmpty();
            }
            testTask.invoke();
        }
        catch (Exception e) {
            LOG.debug("Exception while invoking the test task.", (Throwable)e);
            Assertions.fail((String)"Invoke method caused exception.");
        }
        ((AbstractFileAssert)Assertions.assertThat((File)tempTestFile).withFailMessage("Temp output file does not exist", new Object[0])).exists();
        FileReader fr = null;
        BufferedReader br = null;
        try {
            fr = new FileReader(tempTestFile);
            br = new BufferedReader(fr);
            HashMap keyValueCountMap = new HashMap(keyCnt);
            while (br.ready()) {
                String line = br.readLine();
                Integer key = Integer.parseInt(line.substring(0, line.indexOf("_")));
                Integer val = Integer.parseInt(line.substring(line.indexOf("_") + 1, line.length()));
                if (!keyValueCountMap.containsKey(key)) {
                    keyValueCountMap.put(key, new HashSet());
                }
                ((HashSet)keyValueCountMap.get(key)).add(val);
            }
            ((MapAssert)Assertions.assertThat(keyValueCountMap).withFailMessage("Invalid key count in out file. Expected: %d Actual: %d", new Object[]{keyCnt * 4, keyValueCountMap.size()})).hasSize(keyCnt * 4);
            for (Integer key : keyValueCountMap.keySet()) {
                ((AbstractCollectionAssert)Assertions.assertThat((Collection)((Collection)keyValueCountMap.get(key))).withFailMessage("Invalid value count for key: %d. Expected: %d Actual: %d", new Object[]{key, valCnt, ((HashSet)keyValueCountMap.get(key)).size()})).hasSize(valCnt);
            }
        }
        catch (FileNotFoundException e) {
            Assertions.fail((String)"Out file got lost...");
        }
        catch (IOException ioe) {
            Assertions.fail((String)"Caught IOE while reading out file");
        }
        finally {
            if (br != null) {
                try {
                    br.close();
                }
                catch (Throwable e) {}
            }
            if (fr != null) {
                try {
                    fr.close();
                }
                catch (Throwable e) {}
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testSortingDataSinkTask() {
        int keyCnt = 100;
        int valCnt = 20;
        double memoryFraction = 1.0;
        super.initEnvironment(0x300000L, 1024);
        super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
        DataSinkTask testTask = new DataSinkTask((Environment)this.mockEnv);
        super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
        super.getTaskConfig().setInputComparator((TypeComparatorFactory)new RecordComparatorFactory(new int[]{1}, new Class[]{IntValue.class}), 0);
        super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
        super.getTaskConfig().setFilehandlesInput(0, 8);
        super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
        File tempTestFile = new File(this.tempFolder.toFile(), UUID.randomUUID().toString());
        super.registerFileOutputTask(MockOutputFormat.class, tempTestFile.toURI().toString(), new Configuration());
        try {
            testTask.invoke();
        }
        catch (Exception e) {
            LOG.debug("Exception while invoking the test task.", (Throwable)e);
            Assertions.fail((String)"Invoke method caused exception.");
        }
        ((AbstractFileAssert)Assertions.assertThat((File)tempTestFile).withFailMessage("Temp output file does not exist", new Object[0])).exists();
        FileReader fr = null;
        BufferedReader br = null;
        try {
            fr = new FileReader(tempTestFile);
            br = new BufferedReader(fr);
            HashSet<Integer> keys = new HashSet<Integer>();
            int curVal = -1;
            while (br.ready()) {
                String line = br.readLine();
                Integer key = Integer.parseInt(line.substring(0, line.indexOf("_")));
                Integer val = Integer.parseInt(line.substring(line.indexOf("_") + 1, line.length()));
                ((AbstractIntegerAssert)Assertions.assertThat((Integer)val).withFailMessage("Values not in ascending order", new Object[0])).isGreaterThanOrEqualTo(curVal);
                if (val > curVal) {
                    if (curVal != -1) {
                        ((AbstractCollectionAssert)Assertions.assertThat(keys).withFailMessage("Keys missing for value", new Object[0])).hasSize(100);
                    }
                    keys.clear();
                    curVal = val;
                }
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)keys.add(key)).withFailMessage("Duplicate key for value", new Object[0])).isTrue();
            }
        }
        catch (FileNotFoundException e) {
            Assertions.fail((String)"Out file got lost...");
        }
        catch (IOException ioe) {
            Assertions.fail((String)"Caught IOE while reading out file");
        }
        finally {
            if (br != null) {
                try {
                    br.close();
                }
                catch (Throwable e) {}
            }
            if (fr != null) {
                try {
                    fr.close();
                }
                catch (Throwable e) {}
            }
        }
    }

    @Test
    void testFailingDataSinkTask() {
        int keyCnt = 100;
        int valCnt = 20;
        super.initEnvironment(0x300000L, 1024);
        super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
        DataSinkTask testTask = new DataSinkTask((Environment)this.mockEnv);
        Configuration stubParams = new Configuration();
        File tempTestFile = new File(this.tempFolder.toFile(), UUID.randomUUID().toString());
        super.registerFileOutputTask(MockFailingOutputFormat.class, tempTestFile.toURI().toString(), stubParams);
        boolean stubFailed = false;
        try {
            testTask.invoke();
        }
        catch (Exception e) {
            stubFailed = true;
        }
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)stubFailed).withFailMessage("Function exception was not forwarded.", new Object[0])).isTrue();
        ((AbstractFileAssert)Assertions.assertThat((File)tempTestFile).withFailMessage("Temp output file has not been removed", new Object[0])).doesNotExist();
    }

    @Test
    void testFailingSortingDataSinkTask() {
        int keyCnt = 100;
        int valCnt = 20;
        double memoryFraction = 1.0;
        super.initEnvironment(0x300000L, 1024);
        super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
        DataSinkTask testTask = new DataSinkTask((Environment)this.mockEnv);
        Configuration stubParams = new Configuration();
        super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
        super.getTaskConfig().setInputComparator((TypeComparatorFactory)new RecordComparatorFactory(new int[]{1}, new Class[]{IntValue.class}), 0);
        super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
        super.getTaskConfig().setFilehandlesInput(0, 8);
        super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
        File tempTestFile = new File(this.tempFolder.toFile(), UUID.randomUUID().toString());
        super.registerFileOutputTask(MockFailingOutputFormat.class, tempTestFile.toURI().toString(), stubParams);
        boolean stubFailed = false;
        try {
            testTask.invoke();
        }
        catch (Exception e) {
            stubFailed = true;
        }
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)stubFailed).withFailMessage("Function exception was not forwarded.", new Object[0])).isTrue();
        ((AbstractFileAssert)Assertions.assertThat((File)tempTestFile).withFailMessage("Temp output file has not been removed", new Object[0])).doesNotExist();
    }

    @Test
    void testCancelDataSinkTask() throws Exception {
        super.initEnvironment(0x300000L, 1024);
        super.addInput(new InfiniteInputIterator(), 0);
        final DataSinkTask testTask = new DataSinkTask((Environment)this.mockEnv);
        Configuration stubParams = new Configuration();
        File tempTestFile = new File(this.tempFolder.toFile(), UUID.randomUUID().toString());
        super.registerFileOutputTask(MockOutputFormat.class, tempTestFile.toURI().toString(), stubParams);
        Thread taskRunner = new Thread(){

            @Override
            public void run() {
                try {
                    testTask.invoke();
                }
                catch (Exception ie) {
                    ie.printStackTrace();
                    Assertions.fail((String)"Task threw exception although it was properly canceled");
                }
            }
        };
        taskRunner.start();
        long deadline = System.currentTimeMillis() + 60000L;
        while (!tempTestFile.exists() && System.currentTimeMillis() < deadline) {
            Thread.sleep(10L);
        }
        ((AbstractFileAssert)Assertions.assertThat((File)tempTestFile).withFailMessage("Task did not create file within 60 seconds", new Object[0])).exists();
        Thread.sleep(500L);
        testTask.cancel();
        taskRunner.interrupt();
        taskRunner.join();
        ((AbstractFileAssert)Assertions.assertThat((File)tempTestFile).withFailMessage("Task did not create file within 60 seconds", new Object[0])).doesNotExist();
    }

    @Test
    void testCancelSortingDataSinkTask() {
        double memoryFraction = 1.0;
        super.initEnvironment(0x300000L, 1024);
        super.addInput(new InfiniteInputIterator(), 0);
        final DataSinkTask testTask = new DataSinkTask((Environment)this.mockEnv);
        Configuration stubParams = new Configuration();
        super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
        super.getTaskConfig().setInputComparator((TypeComparatorFactory)new RecordComparatorFactory(new int[]{1}, new Class[]{IntValue.class}), 0);
        super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
        super.getTaskConfig().setFilehandlesInput(0, 8);
        super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
        File tempTestFile = new File(this.tempFolder.toFile(), UUID.randomUUID().toString());
        super.registerFileOutputTask(MockOutputFormat.class, tempTestFile.toURI().toString(), stubParams);
        Thread taskRunner = new Thread(){

            @Override
            public void run() {
                try {
                    testTask.invoke();
                }
                catch (Exception ie) {
                    ie.printStackTrace();
                    Assertions.fail((String)"Task threw exception although it was properly canceled");
                }
            }
        };
        taskRunner.start();
        TaskCancelThread tct = new TaskCancelThread(2, taskRunner, (AbstractInvokable)testTask);
        tct.start();
        try {
            tct.join();
            taskRunner.join();
        }
        catch (InterruptedException ie) {
            Assertions.fail((String)"Joining threads failed");
        }
    }

    public static class MockFailingOutputFormat
    extends MockOutputFormat {
        private static final long serialVersionUID = 1L;
        int cnt = 0;

        @Override
        public void configure(Configuration parameters) {
            super.configure(parameters);
        }

        @Override
        public void writeRecord(Record rec) throws IOException {
            if (++this.cnt >= 10) {
                throw new RuntimeException("Expected Test Exception");
            }
            super.writeRecord(rec);
        }
    }

    public static class MockOutputFormat
    extends FileOutputFormat<Record> {
        private static final long serialVersionUID = 1L;
        final StringBuilder bld = new StringBuilder();

        public void configure(Configuration parameters) {
            super.configure(parameters);
        }

        public void writeRecord(Record rec) throws IOException {
            IntValue key = (IntValue)rec.getField(0, IntValue.class);
            IntValue value = (IntValue)rec.getField(1, IntValue.class);
            this.bld.setLength(0);
            this.bld.append(key.getValue());
            this.bld.append('_');
            this.bld.append(value.getValue());
            this.bld.append('\n');
            byte[] bytes = this.bld.toString().getBytes(ConfigConstants.DEFAULT_CHARSET);
            this.stream.write(bytes);
        }
    }
}

