/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.JoinDriver;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
import org.apache.flink.runtime.testutils.recordutils.RecordPairComparatorFactory;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AtomicBooleanAssert;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.TestTemplate;

class JoinTaskTest
extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
    private static final long HASH_MEM = 0x600000L;
    private static final long SORT_MEM = 0x300000L;
    private static final int NUM_SORTER = 2;
    private static final long BNLJN_MEM = 327680L;
    private final double bnljn_frac;
    private final double hash_frac;
    private final RecordComparator comparator1 = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});
    private final RecordComparator comparator2 = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});
    private final List<Record> outList = new ArrayList<Record>();

    JoinTaskTest(ExecutionConfig config) {
        super(config, 0x600000L, 2, 0x300000L);
        this.bnljn_frac = 327680.0 / (double)this.getMemoryManager().getMemorySize();
        this.hash_frac = 6291456.0 / (double)this.getMemoryManager().getMemorySize();
    }

    @TestTemplate
    void testSortBoth1MatchTask() {
        int keyCnt1 = 20;
        boolean valCnt1 = true;
        int keyCnt2 = 10;
        int valCnt2 = 2;
        this.setOutput(this.outList);
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
        this.getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        this.setNumFileHandlesForSort(4);
        JoinDriver testTask = new JoinDriver();
        try {
            this.addInputSorted(new UniformRecordGenerator(20, 1, false), this.comparator1.duplicate());
            this.addInputSorted(new UniformRecordGenerator(10, 2, false), this.comparator2.duplicate());
            this.testDriver((Driver)testTask, MockMatchStub.class);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)"The test caused an exception.");
        }
        int expCnt = 2 * Math.min(20, 10);
        ((ListAssert)Assertions.assertThat(this.outList).withFailMessage("Resultset size was %d. Expected was %d", new Object[]{this.outList.size(), expCnt})).hasSize(expCnt);
        this.outList.clear();
    }

    @TestTemplate
    void testSortBoth2MatchTask() {
        int keyCnt1 = 20;
        int valCnt1 = 1;
        int keyCnt2 = 20;
        int valCnt2 = 1;
        this.setOutput(this.outList);
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
        this.getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        this.setNumFileHandlesForSort(4);
        JoinDriver testTask = new JoinDriver();
        try {
            this.addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
            this.addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
            this.testDriver((Driver)testTask, MockMatchStub.class);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)"The test caused an exception.");
        }
        int expCnt = valCnt1 * valCnt2 * Math.min(keyCnt1, keyCnt2);
        ((ListAssert)Assertions.assertThat(this.outList).withFailMessage("Resultset size was %d. Expected was %d", new Object[]{this.outList.size(), expCnt})).hasSize(expCnt);
        this.outList.clear();
    }

    @TestTemplate
    void testSortBoth3MatchTask() {
        int keyCnt1 = 20;
        int valCnt1 = 1;
        int keyCnt2 = 20;
        int valCnt2 = 20;
        this.setOutput(this.outList);
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
        this.getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        this.setNumFileHandlesForSort(4);
        JoinDriver testTask = new JoinDriver();
        try {
            this.addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
            this.addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
            this.testDriver((Driver)testTask, MockMatchStub.class);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)"The test caused an exception.");
        }
        int expCnt = valCnt1 * valCnt2 * Math.min(keyCnt1, keyCnt2);
        ((ListAssert)Assertions.assertThat(this.outList).withFailMessage("Resultset size was %d. Expected was %d", new Object[]{this.outList.size(), expCnt})).hasSize(expCnt);
        this.outList.clear();
    }

    @TestTemplate
    void testSortBoth4MatchTask() {
        int keyCnt1 = 20;
        int valCnt1 = 20;
        int keyCnt2 = 20;
        int valCnt2 = 1;
        this.setOutput(this.outList);
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
        this.getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        this.setNumFileHandlesForSort(4);
        JoinDriver testTask = new JoinDriver();
        try {
            this.addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
            this.addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
            this.testDriver((Driver)testTask, MockMatchStub.class);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)"The test caused an exception.");
        }
        int expCnt = valCnt1 * valCnt2 * Math.min(keyCnt1, keyCnt2);
        ((ListAssert)Assertions.assertThat(this.outList).withFailMessage("Resultset size was %d. Expected was %d", new Object[]{this.outList.size(), expCnt})).hasSize(expCnt);
        this.outList.clear();
    }

    @TestTemplate
    void testSortBoth5MatchTask() {
        int keyCnt1 = 20;
        int valCnt1 = 20;
        int keyCnt2 = 20;
        int valCnt2 = 20;
        this.setOutput(this.outList);
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
        this.getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        this.setNumFileHandlesForSort(4);
        JoinDriver testTask = new JoinDriver();
        try {
            this.addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
            this.addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
            this.testDriver((Driver)testTask, MockMatchStub.class);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)"The test caused an exception.");
        }
        int expCnt = valCnt1 * valCnt2 * Math.min(keyCnt1, keyCnt2);
        ((ListAssert)Assertions.assertThat(this.outList).withFailMessage("Resultset size was %d. Expected was %d", new Object[]{this.outList.size(), expCnt})).hasSize(expCnt);
        this.outList.clear();
    }

    @TestTemplate
    void testSortFirstMatchTask() {
        int keyCnt1 = 20;
        int valCnt1 = 20;
        int keyCnt2 = 20;
        int valCnt2 = 20;
        this.setOutput(this.outList);
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
        this.getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        this.setNumFileHandlesForSort(4);
        JoinDriver testTask = new JoinDriver();
        try {
            this.addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
            this.addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
            this.testDriver((Driver)testTask, MockMatchStub.class);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)"The test caused an exception.");
        }
        int expCnt = valCnt1 * valCnt2 * Math.min(keyCnt1, keyCnt2);
        ((ListAssert)Assertions.assertThat(this.outList).withFailMessage("Resultset size was %d. Expected was %d", new Object[]{this.outList.size(), expCnt})).hasSize(expCnt);
        this.outList.clear();
    }

    @TestTemplate
    void testSortSecondMatchTask() {
        int keyCnt1 = 20;
        int valCnt1 = 20;
        int keyCnt2 = 20;
        int valCnt2 = 20;
        this.setOutput(this.outList);
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
        this.getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        this.setNumFileHandlesForSort(4);
        JoinDriver testTask = new JoinDriver();
        try {
            this.addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
            this.addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
            this.testDriver((Driver)testTask, MockMatchStub.class);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)"The test caused an exception.");
        }
        int expCnt = valCnt1 * valCnt2 * Math.min(keyCnt1, keyCnt2);
        ((ListAssert)Assertions.assertThat(this.outList).withFailMessage("Resultset size was %d. Expected was %d", new Object[]{this.outList.size(), expCnt})).hasSize(expCnt);
        this.outList.clear();
    }

    @TestTemplate
    void testMergeMatchTask() {
        int keyCnt1 = 20;
        int valCnt1 = 20;
        int keyCnt2 = 20;
        int valCnt2 = 20;
        this.setOutput(this.outList);
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
        this.getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        this.setNumFileHandlesForSort(4);
        JoinDriver testTask = new JoinDriver();
        this.addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
        this.addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
        try {
            this.testDriver((Driver)testTask, MockMatchStub.class);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)"The test caused an exception.");
        }
        int expCnt = valCnt1 * valCnt2 * Math.min(keyCnt1, keyCnt2);
        ((ListAssert)Assertions.assertThat(this.outList).withFailMessage("Resultset size was %d. Expected was %d", new Object[]{this.outList.size(), expCnt})).hasSize(expCnt);
        this.outList.clear();
    }

    @TestTemplate
    void testFailingMatchTask() {
        int keyCnt1 = 20;
        int valCnt1 = 20;
        int keyCnt2 = 20;
        int valCnt2 = 20;
        this.setOutput(new NirvanaOutputList());
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
        this.getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        this.setNumFileHandlesForSort(4);
        JoinDriver testTask = new JoinDriver();
        this.addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
        this.addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
        Assertions.assertThatThrownBy(() -> this.testDriver((Driver)testTask, MockFailingMatchStub.class)).isInstanceOf(ExpectedTestException.class);
    }

    @TestTemplate
    void testCancelMatchTaskWhileSort1() {
        int keyCnt = 20;
        int valCnt = 20;
        try {
            this.setOutput(new NirvanaOutputList());
            this.addDriverComparator(this.comparator1);
            this.addDriverComparator(this.comparator2);
            this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
            this.getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
            this.getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
            this.setNumFileHandlesForSort(4);
            final JoinDriver testTask = new JoinDriver();
            try {
                this.addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
                this.addInput(new UniformRecordGenerator(20, 20, true));
            }
            catch (Exception e) {
                e.printStackTrace();
                Assertions.fail((String)"The test caused an exception.");
            }
            final AtomicReference error = new AtomicReference();
            Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort1()"){

                @Override
                public void run() {
                    try {
                        JoinTaskTest.this.testDriver((Driver)testTask, MockMatchStub.class);
                    }
                    catch (Throwable t) {
                        error.set(t);
                    }
                }
            };
            taskRunner.start();
            Thread.sleep(1000L);
            this.cancel();
            taskRunner.interrupt();
            taskRunner.join(60000L);
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)taskRunner.isAlive()).withFailMessage("Task thread did not finish within 60 seconds", new Object[0])).isFalse();
            Throwable taskError = (Throwable)error.get();
            ((AbstractThrowableAssert)Assertions.assertThat((Throwable)taskError).withFailMessage("Error in task while canceling: %s", new Object[]{taskError})).isNull();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)e.getMessage());
        }
    }

    @TestTemplate
    void testCancelMatchTaskWhileSort2() {
        int keyCnt = 20;
        int valCnt = 20;
        try {
            this.setOutput(new NirvanaOutputList());
            this.addDriverComparator(this.comparator1);
            this.addDriverComparator(this.comparator2);
            this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
            this.getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
            this.getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
            this.setNumFileHandlesForSort(4);
            final JoinDriver testTask = new JoinDriver();
            try {
                this.addInput(new UniformRecordGenerator(20, 20, true));
                this.addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
            }
            catch (Exception e) {
                e.printStackTrace();
                Assertions.fail((String)"The test caused an exception.");
            }
            final AtomicReference error = new AtomicReference();
            Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort2()"){

                @Override
                public void run() {
                    try {
                        JoinTaskTest.this.testDriver((Driver)testTask, MockMatchStub.class);
                    }
                    catch (Throwable t) {
                        error.set(t);
                    }
                }
            };
            taskRunner.start();
            Thread.sleep(1000L);
            this.cancel();
            taskRunner.interrupt();
            taskRunner.join(60000L);
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)taskRunner.isAlive()).withFailMessage("Task thread did not finish within 60 seconds", new Object[0])).isFalse();
            Throwable taskError = (Throwable)error.get();
            ((AbstractThrowableAssert)Assertions.assertThat((Throwable)taskError).withFailMessage("Error in task while canceling: %s", new Object[]{taskError})).isNull();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)e.getMessage());
        }
    }

    @TestTemplate
    void testCancelMatchTaskWhileMatching() {
        int keyCnt = 20;
        int valCnt = 20;
        try {
            this.setOutput(new NirvanaOutputList());
            this.addDriverComparator(this.comparator1);
            this.addDriverComparator(this.comparator2);
            this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
            this.getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
            this.getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
            this.setNumFileHandlesForSort(4);
            final JoinDriver testTask = new JoinDriver();
            this.addInput(new UniformRecordGenerator(20, 20, true));
            this.addInput(new UniformRecordGenerator(20, 20, true));
            final AtomicReference error = new AtomicReference();
            Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileMatching()"){

                @Override
                public void run() {
                    try {
                        JoinTaskTest.this.testDriver((Driver)testTask, MockDelayingMatchStub.class);
                    }
                    catch (Throwable t) {
                        error.set(t);
                    }
                }
            };
            taskRunner.start();
            Thread.sleep(1000L);
            this.cancel();
            taskRunner.interrupt();
            taskRunner.join(60000L);
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)taskRunner.isAlive()).withFailMessage("Task thread did not finish within 60 seconds", new Object[0])).isFalse();
            Throwable taskError = (Throwable)error.get();
            ((AbstractThrowableAssert)Assertions.assertThat((Throwable)taskError).withFailMessage("Error in task while canceling:\n%s", new Object[]{taskError})).isNull();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)e.getMessage());
        }
    }

    @TestTemplate
    void testHash1MatchTask() {
        int keyCnt1 = 20;
        int valCnt1 = 1;
        int keyCnt2 = 10;
        int valCnt2 = 2;
        this.addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
        this.addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.setOutput(this.outList);
        this.getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
        this.getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        JoinDriver testTask = new JoinDriver();
        try {
            this.testDriver((Driver)testTask, MockMatchStub.class);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)"Test caused an exception.");
        }
        int expCnt = valCnt1 * valCnt2 * Math.min(keyCnt1, keyCnt2);
        Assertions.assertThat(this.outList).hasSize(expCnt);
        this.outList.clear();
    }

    @TestTemplate
    void testHash2MatchTask() {
        int keyCnt1 = 20;
        int valCnt1 = 1;
        int keyCnt2 = 20;
        int valCnt2 = 1;
        this.addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
        this.addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.setOutput(this.outList);
        this.getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        this.getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        JoinDriver testTask = new JoinDriver();
        try {
            this.testDriver((Driver)testTask, MockMatchStub.class);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)"Test caused an exception.");
        }
        int expCnt = valCnt1 * valCnt2 * Math.min(keyCnt1, keyCnt2);
        ((ListAssert)Assertions.assertThat(this.outList).withFailMessage("Wrong result set size.", new Object[0])).hasSize(expCnt);
        this.outList.clear();
    }

    @TestTemplate
    void testHash3MatchTask() {
        int keyCnt1 = 20;
        int valCnt1 = 1;
        int keyCnt2 = 20;
        int valCnt2 = 20;
        this.addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
        this.addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.setOutput(this.outList);
        this.getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
        this.getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        JoinDriver testTask = new JoinDriver();
        try {
            this.testDriver((Driver)testTask, MockMatchStub.class);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)"Test caused an exception.");
        }
        int expCnt = valCnt1 * valCnt2 * Math.min(keyCnt1, keyCnt2);
        ((ListAssert)Assertions.assertThat(this.outList).withFailMessage("Wrong result set size.", new Object[0])).hasSize(expCnt);
        this.outList.clear();
    }

    @TestTemplate
    void testHash4MatchTask() {
        int keyCnt1 = 20;
        int valCnt1 = 20;
        int keyCnt2 = 20;
        int valCnt2 = 1;
        this.addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
        this.addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.setOutput(this.outList);
        this.getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        this.getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        JoinDriver testTask = new JoinDriver();
        try {
            this.testDriver((Driver)testTask, MockMatchStub.class);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)"Test caused an exception.");
        }
        int expCnt = valCnt1 * valCnt2 * Math.min(keyCnt1, keyCnt2);
        ((ListAssert)Assertions.assertThat(this.outList).withFailMessage("Wrong result set size.", new Object[0])).hasSize(expCnt);
        this.outList.clear();
    }

    @TestTemplate
    void testHash5MatchTask() {
        int keyCnt1 = 20;
        int valCnt1 = 20;
        int keyCnt2 = 20;
        int valCnt2 = 20;
        this.addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
        this.addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.setOutput(this.outList);
        this.getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
        this.getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        JoinDriver testTask = new JoinDriver();
        try {
            this.testDriver((Driver)testTask, MockMatchStub.class);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)"Test caused an exception.");
        }
        int expCnt = valCnt1 * valCnt2 * Math.min(keyCnt1, keyCnt2);
        ((ListAssert)Assertions.assertThat(this.outList).withFailMessage("Wrong result set size.", new Object[0])).hasSize(expCnt);
        this.outList.clear();
    }

    @TestTemplate
    void testFailingHashFirstMatchTask() {
        int keyCnt1 = 20;
        int valCnt1 = 20;
        int keyCnt2 = 20;
        int valCnt2 = 20;
        this.addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
        this.addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.setOutput(new NirvanaOutputList());
        this.getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
        this.getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        JoinDriver testTask = new JoinDriver();
        Assertions.assertThatThrownBy(() -> this.testDriver((Driver)testTask, MockFailingMatchStub.class)).isInstanceOf(ExpectedTestException.class);
    }

    @TestTemplate
    void testFailingHashSecondMatchTask() {
        int keyCnt1 = 20;
        int valCnt1 = 20;
        int keyCnt2 = 20;
        int valCnt2 = 20;
        this.addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
        this.addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.setOutput(new NirvanaOutputList());
        this.getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        this.getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        JoinDriver testTask = new JoinDriver();
        Assertions.assertThatThrownBy(() -> this.testDriver((Driver)testTask, MockFailingMatchStub.class)).isInstanceOf(ExpectedTestException.class);
    }

    @TestTemplate
    void testCancelHashMatchTaskWhileBuildFirst() {
        int keyCnt = 20;
        int valCnt = 20;
        try {
            this.addInput(new DelayingInfinitiveInputIterator(100));
            this.addInput(new UniformRecordGenerator(20, 20, false));
            this.addDriverComparator(this.comparator1);
            this.addDriverComparator(this.comparator2);
            this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
            this.setOutput(new NirvanaOutputList());
            this.getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
            this.getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
            final JoinDriver testTask = new JoinDriver();
            final AtomicBoolean success = new AtomicBoolean(false);
            Thread taskRunner = new Thread(){

                @Override
                public void run() {
                    try {
                        JoinTaskTest.this.testDriver((Driver)testTask, MockMatchStub.class);
                        success.set(true);
                    }
                    catch (Exception ie) {
                        ie.printStackTrace();
                    }
                }
            };
            taskRunner.start();
            Thread.sleep(1000L);
            this.cancel();
            try {
                taskRunner.join();
            }
            catch (InterruptedException ie) {
                Assertions.fail((String)"Joining threads failed");
            }
            ((AtomicBooleanAssert)Assertions.assertThat((AtomicBoolean)success).withFailMessage("Test threw an exception even though it was properly canceled.", new Object[0])).isTrue();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)e.getMessage());
        }
    }

    @TestTemplate
    void testHashCancelMatchTaskWhileBuildSecond() {
        int keyCnt = 20;
        int valCnt = 20;
        try {
            this.addInput(new UniformRecordGenerator(20, 20, false));
            this.addInput(new DelayingInfinitiveInputIterator(100));
            this.addDriverComparator(this.comparator1);
            this.addDriverComparator(this.comparator2);
            this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
            this.setOutput(new NirvanaOutputList());
            this.getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
            this.getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
            final JoinDriver testTask = new JoinDriver();
            final AtomicBoolean success = new AtomicBoolean(false);
            Thread taskRunner = new Thread(){

                @Override
                public void run() {
                    try {
                        JoinTaskTest.this.testDriver((Driver)testTask, MockMatchStub.class);
                        success.set(true);
                    }
                    catch (Exception ie) {
                        ie.printStackTrace();
                    }
                }
            };
            taskRunner.start();
            Thread.sleep(1000L);
            this.cancel();
            try {
                taskRunner.join();
            }
            catch (InterruptedException ie) {
                Assertions.fail((String)"Joining threads failed");
            }
            ((AtomicBooleanAssert)Assertions.assertThat((AtomicBoolean)success).withFailMessage("Test threw an exception even though it was properly canceled.", new Object[0])).isTrue();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)e.getMessage());
        }
    }

    @TestTemplate
    void testHashFirstCancelMatchTaskWhileMatching() {
        int keyCnt = 20;
        int valCnt = 20;
        this.addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
        this.addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.setOutput(new NirvanaOutputList());
        this.getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
        this.getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        final JoinDriver testTask = new JoinDriver();
        final AtomicBoolean success = new AtomicBoolean(false);
        Thread taskRunner = new Thread(){

            @Override
            public void run() {
                try {
                    JoinTaskTest.this.testDriver((Driver)testTask, MockMatchStub.class);
                    success.set(true);
                }
                catch (Exception ie) {
                    ie.printStackTrace();
                }
            }
        };
        taskRunner.start();
        TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
        tct.start();
        try {
            tct.join();
            taskRunner.join();
        }
        catch (InterruptedException ie) {
            Assertions.fail((String)"Joining threads failed");
        }
        ((AtomicBooleanAssert)Assertions.assertThat((AtomicBoolean)success).withFailMessage("Test threw an exception even though it was properly canceled.", new Object[0])).isTrue();
    }

    @TestTemplate
    void testHashSecondCancelMatchTaskWhileMatching() {
        int keyCnt = 20;
        int valCnt = 20;
        this.addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
        this.addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.setOutput(new NirvanaOutputList());
        this.getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        this.getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        final JoinDriver testTask = new JoinDriver();
        final AtomicBoolean success = new AtomicBoolean(false);
        Thread taskRunner = new Thread(){

            @Override
            public void run() {
                try {
                    JoinTaskTest.this.testDriver((Driver)testTask, MockMatchStub.class);
                    success.set(true);
                }
                catch (Exception ie) {
                    ie.printStackTrace();
                }
            }
        };
        taskRunner.start();
        TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
        tct.start();
        try {
            tct.join();
            taskRunner.join();
        }
        catch (InterruptedException ie) {
            Assertions.fail((String)"Joining threads failed");
        }
        ((AtomicBooleanAssert)Assertions.assertThat((AtomicBoolean)success).withFailMessage("Test threw an exception even though it was properly canceled.", new Object[0])).isTrue();
    }

    public static final class MockDelayingMatchStub
    implements FlatJoinFunction<Record, Record, Record> {
        private static final long serialVersionUID = 1L;

        public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
    }

    public static final class MockFailingMatchStub
    implements FlatJoinFunction<Record, Record, Record> {
        private static final long serialVersionUID = 1L;
        private int cnt = 0;

        public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
            if (++this.cnt >= 10) {
                throw new ExpectedTestException();
            }
            out.collect((Object)record1);
        }
    }

    public static final class MockMatchStub
    implements FlatJoinFunction<Record, Record, Record> {
        private static final long serialVersionUID = 1L;

        public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
            out.collect((Object)record1);
        }
    }
}

