/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators.co;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Queue;
import java.util.stream.Collectors;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.IntervalJoinOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OutputTag;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.IterableAssert;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
class IntervalJoinOperatorTest {
    private final boolean lhsFasterThanRhs;

    @Parameters(name="lhs faster than rhs: {0}")
    private static Collection<Object[]> data() {
        return Arrays.asList({true}, {false});
    }

    /**
     * Creates one parameterized instance of the test.
     *
     * @param lhsFasterThanRhs when {@code true} the helpers feed all left-side elements before the
     *     right-side ones; when {@code false} the right side is fed first
     */
    public IntervalJoinOperatorTest(boolean lhsFasterThanRhs) {
        this.lhsFasterThanRhs = lhsFasterThanRhs;
    }

    /**
     * Runs the join with bounds [1, 3) and then with the mirrored bounds (-3, -1], and expects
     * the second run to emit the same pairs as the first with left and right swapped.
     */
    @TestTemplate
    void testImplementationMirrorsCorrectly() throws Exception {
        final long lowerBound = 1L;
        final long upperBound = 3L;
        final boolean lowerBoundInclusive = true;
        final boolean upperBoundInclusive = false;

        // Forward direction: right timestamp lies in [left + 1, left + 3).
        JoinTestBuilder forward =
                setupHarness(lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive);
        forward.processElementsAndWatermarks(1, 4);
        forward.andExpect(
                streamRecordOf(1L, 2L),
                streamRecordOf(1L, 3L),
                streamRecordOf(2L, 3L),
                streamRecordOf(2L, 4L),
                streamRecordOf(3L, 4L));
        forward.noLateRecords();
        forward.close();

        // Mirrored direction: bounds negated and swapped, inclusiveness swapped with them.
        JoinTestBuilder mirrored =
                setupHarness(-1L * upperBound, upperBoundInclusive, -1L * lowerBound, lowerBoundInclusive);
        mirrored.processElementsAndWatermarks(1, 4);
        mirrored.andExpect(
                streamRecordOf(2L, 1L),
                streamRecordOf(3L, 1L),
                streamRecordOf(3L, 2L),
                streamRecordOf(4L, 2L),
                streamRecordOf(4L, 3L));
        mirrored.noLateRecords();
        mirrored.close();
    }

    /** Joins timestamps 1..4 from both inputs with the inclusive bounds [-2, -1]. */
    @TestTemplate
    void testNegativeInclusiveAndNegativeInclusive() throws Exception {
        JoinTestBuilder join = setupHarness(-2L, true, -1L, true);
        join.processElementsAndWatermarks(1, 4);
        join.andExpect(
                streamRecordOf(2L, 1L),
                streamRecordOf(3L, 1L),
                streamRecordOf(3L, 2L),
                streamRecordOf(4L, 2L),
                streamRecordOf(4L, 3L));
        join.noLateRecords();
        join.close();
    }

    /** Joins timestamps 1..4 from both inputs with the inclusive bounds [-1, 1]. */
    @TestTemplate
    void testNegativeInclusiveAndPositiveInclusive() throws Exception {
        JoinTestBuilder join = setupHarness(-1L, true, 1L, true);
        join.processElementsAndWatermarks(1, 4);
        join.andExpect(
                streamRecordOf(1L, 1L),
                streamRecordOf(1L, 2L),
                streamRecordOf(2L, 1L),
                streamRecordOf(2L, 2L),
                streamRecordOf(2L, 3L),
                streamRecordOf(3L, 2L),
                streamRecordOf(3L, 3L),
                streamRecordOf(3L, 4L),
                streamRecordOf(4L, 3L),
                streamRecordOf(4L, 4L));
        join.noLateRecords();
        join.close();
    }

    /** Joins timestamps 1..4 from both inputs with the inclusive bounds [1, 2]. */
    @TestTemplate
    void testPositiveInclusiveAndPositiveInclusive() throws Exception {
        JoinTestBuilder join = setupHarness(1L, true, 2L, true);
        join.processElementsAndWatermarks(1, 4);
        join.andExpect(
                streamRecordOf(1L, 2L),
                streamRecordOf(1L, 3L),
                streamRecordOf(2L, 3L),
                streamRecordOf(2L, 4L),
                streamRecordOf(3L, 4L));
        join.noLateRecords();
        join.close();
    }

    /** Joins timestamps 1..4 from both inputs with the exclusive bounds (-3, -1). */
    @TestTemplate
    void testNegativeExclusiveAndNegativeExlusive() throws Exception {
        JoinTestBuilder join = setupHarness(-3L, false, -1L, false);
        join.processElementsAndWatermarks(1, 4);
        join.andExpect(streamRecordOf(3L, 1L), streamRecordOf(4L, 2L));
        join.noLateRecords();
        join.close();
    }

    /** Joins timestamps 1..4 from both inputs with the exclusive bounds (-1, 1). */
    @TestTemplate
    void testNegativeExclusiveAndPositiveExlusive() throws Exception {
        JoinTestBuilder join = setupHarness(-1L, false, 1L, false);
        join.processElementsAndWatermarks(1, 4);
        join.andExpect(
                streamRecordOf(1L, 1L),
                streamRecordOf(2L, 2L),
                streamRecordOf(3L, 3L),
                streamRecordOf(4L, 4L));
        join.noLateRecords();
        join.close();
    }

    /** Joins timestamps 1..4 from both inputs with the exclusive bounds (1, 3). */
    @TestTemplate
    void testPositiveExclusiveAndPositiveExlusive() throws Exception {
        JoinTestBuilder join = setupHarness(1L, false, 3L, false);
        join.processElementsAndWatermarks(1, 4);
        join.andExpect(streamRecordOf(1L, 3L), streamRecordOf(2L, 4L));
        join.noLateRecords();
        join.close();
    }

    /**
     * With inclusive bounds [-1, 0], feeds timestamps 1..5 into both inputs and checks which
     * timestamps remain in the left and right buffers after watermarks 1, 4 and 6.
     */
    @TestTemplate
    void testStateCleanupNegativeInclusiveNegativeInclusive() throws Exception {
        JoinTestBuilder join = setupHarness(-1L, true, 0L, true);

        // All left elements first, then all right elements.
        for (int ts = 1; ts <= 5; ts++) {
            join.processElement1(ts);
        }
        for (int ts = 1; ts <= 5; ts++) {
            join.processElement2(ts);
        }

        join.processWatermark1(1);
        join.processWatermark2(1);
        join.assertLeftBufferContainsOnly(2L, 3L, 4L, 5L);
        join.assertRightBufferContainsOnly(1L, 2L, 3L, 4L, 5L);

        join.processWatermark1(4);
        join.processWatermark2(4);
        join.assertLeftBufferContainsOnly(5L);
        join.assertRightBufferContainsOnly(4L, 5L);

        join.processWatermark1(6);
        join.processWatermark2(6);
        join.assertLeftBufferEmpty();
        join.assertRightBufferEmpty();

        join.close();
    }

    /**
     * With exclusive bounds (-2, 1), feeds timestamps 1..5 into both inputs and checks which
     * timestamps remain in the left and right buffers after watermarks 1, 4 and 6.
     */
    @TestTemplate
    void testStateCleanupNegativePositiveNegativeExlusive() throws Exception {
        JoinTestBuilder join = setupHarness(-2L, false, 1L, false);

        // All left elements first, then all right elements.
        for (int ts = 1; ts <= 5; ts++) {
            join.processElement1(ts);
        }
        for (int ts = 1; ts <= 5; ts++) {
            join.processElement2(ts);
        }

        join.processWatermark1(1);
        join.processWatermark2(1);
        join.assertLeftBufferContainsOnly(2L, 3L, 4L, 5L);
        join.assertRightBufferContainsOnly(1L, 2L, 3L, 4L, 5L);

        join.processWatermark1(4);
        join.processWatermark2(4);
        join.assertLeftBufferContainsOnly(5L);
        join.assertRightBufferContainsOnly(4L, 5L);

        join.processWatermark1(6);
        join.processWatermark2(6);
        join.assertLeftBufferEmpty();
        join.assertRightBufferEmpty();

        join.close();
    }

    /**
     * With inclusive bounds [0, 1], feeds timestamps 1..5 into both inputs and checks which
     * timestamps remain in the left and right buffers after watermarks 1, 4 and 6.
     */
    @TestTemplate
    void testStateCleanupPositiveInclusivePositiveInclusive() throws Exception {
        JoinTestBuilder join = setupHarness(0L, true, 1L, true);

        // All left elements first, then all right elements.
        for (int ts = 1; ts <= 5; ts++) {
            join.processElement1(ts);
        }
        for (int ts = 1; ts <= 5; ts++) {
            join.processElement2(ts);
        }

        join.processWatermark1(1);
        join.processWatermark2(1);
        join.assertLeftBufferContainsOnly(1L, 2L, 3L, 4L, 5L);
        join.assertRightBufferContainsOnly(2L, 3L, 4L, 5L);

        join.processWatermark1(4);
        join.processWatermark2(4);
        join.assertLeftBufferContainsOnly(4L, 5L);
        join.assertRightBufferContainsOnly(5L);

        join.processWatermark1(6);
        join.processWatermark2(6);
        join.assertLeftBufferEmpty();
        join.assertRightBufferEmpty();

        join.close();
    }

    /**
     * With exclusive bounds (-1, 2), feeds timestamps 1..5 into both inputs and checks which
     * timestamps remain in the left and right buffers after watermarks 1, 4 and 6.
     *
     * <p>Note that, despite the method name, the lower bound used here is -1.
     */
    @TestTemplate
    void testStateCleanupPositiveExlusivePositiveExclusive() throws Exception {
        JoinTestBuilder join = setupHarness(-1L, false, 2L, false);

        // All left elements first, then all right elements.
        for (int ts = 1; ts <= 5; ts++) {
            join.processElement1(ts);
        }
        for (int ts = 1; ts <= 5; ts++) {
            join.processElement2(ts);
        }

        join.processWatermark1(1);
        join.processWatermark2(1);
        join.assertLeftBufferContainsOnly(1L, 2L, 3L, 4L, 5L);
        join.assertRightBufferContainsOnly(2L, 3L, 4L, 5L);

        join.processWatermark1(4);
        join.processWatermark2(4);
        join.assertLeftBufferContainsOnly(4L, 5L);
        join.assertRightBufferContainsOnly(5L);

        join.processWatermark1(6);
        join.processWatermark2(6);
        join.assertLeftBufferEmpty();
        join.assertRightBufferEmpty();

        join.close();
    }

    /**
     * Processes timestamps 1..3 on both inputs, snapshots the operator, then restores the snapshot
     * into a fresh harness and processes timestamp 4 on both inputs. The restored harness is
     * expected to emit the pairs that combine the new elements with the buffered timestamp 3.
     */
    @TestTemplate
    void testRestoreFromSnapshot() throws Exception {
        final int lowerBound = -1;
        final boolean lowerBoundInclusive = true;
        final int upperBound = 1;
        final boolean upperBoundInclusive = true;

        OperatorSubtaskState handles;
        try (TestHarness testHarness = this.createTestHarness(lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive)) {
            testHarness.setup();
            testHarness.open();

            testHarness.processElement1(IntervalJoinOperatorTest.createStreamRecord(1L, "lhs"));
            testHarness.processWatermark1(new Watermark(1L));
            testHarness.processElement2(IntervalJoinOperatorTest.createStreamRecord(1L, "rhs"));
            testHarness.processWatermark2(new Watermark(1L));

            testHarness.processElement1(IntervalJoinOperatorTest.createStreamRecord(2L, "lhs"));
            testHarness.processWatermark1(new Watermark(2L));
            testHarness.processElement2(IntervalJoinOperatorTest.createStreamRecord(2L, "rhs"));
            testHarness.processWatermark2(new Watermark(2L));

            testHarness.processElement1(IntervalJoinOperatorTest.createStreamRecord(3L, "lhs"));
            testHarness.processWatermark1(new Watermark(3L));
            testHarness.processElement2(IntervalJoinOperatorTest.createStreamRecord(3L, "rhs"));
            testHarness.processWatermark2(new Watermark(3L));

            // Take the snapshot, then close the first harness before validating its output.
            handles = testHarness.snapshot(0L, 0L);
            testHarness.close();

            ArrayList<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput = Lists.newArrayList(
                    this.streamRecordOf(1L, 1L),
                    this.streamRecordOf(1L, 2L),
                    this.streamRecordOf(2L, 1L),
                    this.streamRecordOf(2L, 2L),
                    this.streamRecordOf(2L, 3L),
                    this.streamRecordOf(3L, 2L),
                    this.streamRecordOf(3L, 3L));
            TestHarnessUtil.assertNoLateRecords(testHarness.getOutput());
            this.assertOutput(expectedOutput, testHarness.getOutput());
        }

        try (TestHarness newTestHarness = this.createTestHarness(lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive)) {
            // Restore the snapshot taken above before opening the operator.
            newTestHarness.setup();
            newTestHarness.initializeState(handles);
            newTestHarness.open();

            newTestHarness.processElement1(IntervalJoinOperatorTest.createStreamRecord(4L, "lhs"));
            newTestHarness.processWatermark1(new Watermark(4L));
            newTestHarness.processElement2(IntervalJoinOperatorTest.createStreamRecord(4L, "rhs"));
            newTestHarness.processWatermark2(new Watermark(4L));

            ArrayList<StreamRecord<Tuple2<TestElem, TestElem>>> expectedOutput = Lists.newArrayList(
                    this.streamRecordOf(3L, 4L),
                    this.streamRecordOf(4L, 3L),
                    this.streamRecordOf(4L, 4L));
            TestHarnessUtil.assertNoLateRecords(newTestHarness.getOutput());
            this.assertOutput(expectedOutput, newTestHarness.getOutput());
        }
    }

    /**
     * Verifies that, for every joined pair, {@code ctx.getLeftTimestamp()} equals the timestamp
     * stored in the left element.
     */
    @TestTemplate
    void testContextCorrectLeftTimestamp() throws Exception {
        IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
                new IntervalJoinOperator<>(
                        -1L,
                        1L,
                        true,
                        true,
                        null,
                        null,
                        TestElem.serializer(),
                        TestElem.serializer(),
                        new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {

                            @Override
                            public void processElement(TestElem left, TestElem right, Context ctx, Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
                                Assertions.assertThat(ctx.getLeftTimestamp()).isEqualTo(left.ts);
                            }
                        });

        // The lambdas are typed by the constructor's KeySelector<TestElem, String> parameters;
        // a raw-type cast here would leave "elem" as Object and "elem.key" unresolvable.
        try (TestHarness testHarness = new TestHarness(op, elem -> elem.key, elem -> elem.key, TypeInformation.of(String.class))) {
            testHarness.setup();
            testHarness.open();
            this.processElementsAndWatermarks(testHarness);
        }
    }

    /**
     * Verifies that, for every joined pair, {@code ctx.getTimestamp()} equals the larger of the
     * left and right element timestamps.
     */
    @TestTemplate
    void testReturnsCorrectTimestamp() throws Exception {
        IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
                new IntervalJoinOperator<>(
                        -1L,
                        1L,
                        true,
                        true,
                        null,
                        null,
                        TestElem.serializer(),
                        TestElem.serializer(),
                        new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public void processElement(TestElem left, TestElem right, Context ctx, Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
                                Assertions.assertThat(ctx.getTimestamp()).isEqualTo(Math.max(left.ts, right.ts));
                            }
                        });

        // The lambdas are typed by the constructor's KeySelector<TestElem, String> parameters;
        // a raw-type cast here would leave "elem" as Object and "elem.key" unresolvable.
        try (TestHarness testHarness = new TestHarness(op, elem -> elem.key, elem -> elem.key, TypeInformation.of(String.class))) {
            testHarness.setup();
            testHarness.open();
            this.processElementsAndWatermarks(testHarness);
        }
    }

    /**
     * Verifies that, for every joined pair, {@code ctx.getRightTimestamp()} equals the timestamp
     * stored in the right element.
     */
    @TestTemplate
    void testContextCorrectRightTimestamp() throws Exception {
        IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> op =
                new IntervalJoinOperator<>(
                        -1L,
                        1L,
                        true,
                        true,
                        null,
                        null,
                        TestElem.serializer(),
                        TestElem.serializer(),
                        new ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>>() {

                            @Override
                            public void processElement(TestElem left, TestElem right, Context ctx, Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
                                Assertions.assertThat(ctx.getRightTimestamp()).isEqualTo(right.ts);
                            }
                        });

        // The lambdas are typed by the constructor's KeySelector<TestElem, String> parameters;
        // a raw-type cast here would leave "elem" as Object and "elem.key" unresolvable.
        try (TestHarness testHarness = new TestHarness(op, elem -> elem.key, elem -> elem.key, TypeInformation.of(String.class))) {
            testHarness.setup();
            testHarness.open();
            this.processElementsAndWatermarks(testHarness);
        }
    }

    /**
     * Expects a {@link FlinkException} when a record built without a timestamp is pushed into
     * the first (left) input.
     */
    @TestTemplate
    void testFailsWithNoTimestampsLeft() throws Exception {
        try (TestHarness harness = createTestHarness(0L, true, 0L, true)) {
            harness.setup();
            harness.open();

            // The single-argument StreamRecord constructor is used, so no timestamp is supplied.
            Assertions.assertThatThrownBy(
                            () -> harness.processElement1(new StreamRecord<>(new TestElem(0L, "lhs"))))
                    .isInstanceOf(FlinkException.class);
        }
    }

    /**
     * Expects a {@link FlinkException} when a record built without a timestamp is pushed into
     * the second (right) input.
     */
    @TestTemplate
    void testFailsWithNoTimestampsRight() throws Exception {
        try (TestHarness harness = createTestHarness(0L, true, 0L, true)) {
            harness.setup();
            harness.open();

            // The single-argument StreamRecord constructor is used, so no timestamp is supplied.
            Assertions.assertThatThrownBy(
                            () -> harness.processElement2(new StreamRecord<>(new TestElem(0L, "rhs"))))
                    .isInstanceOf(FlinkException.class);
        }
    }

    /**
     * With bounds [-1, 1], sends a left element with timestamp 1 after both watermarks have
     * advanced to 3 and expects the output to contain only the pairs listed below, with no late
     * records on the main output.
     */
    @TestTemplate
    void testDiscardsLateData() throws Exception {
        JoinTestBuilder join = setupHarness(-1L, true, 1L, true);

        // Timestamps 1..3, alternating left and right.
        for (int ts = 1; ts <= 3; ts++) {
            join.processElement1(ts);
            join.processElement2(ts);
        }
        join.processWatermark1(3);
        join.processWatermark2(3);

        // This left element arrives behind the watermark.
        join.processElement1(1);

        // Timestamps 4..5, alternating left and right.
        for (int ts = 4; ts <= 5; ts++) {
            join.processElement1(ts);
            join.processElement2(ts);
        }

        join.andExpect(
                streamRecordOf(1L, 1L),
                streamRecordOf(1L, 2L),
                streamRecordOf(2L, 1L),
                streamRecordOf(2L, 2L),
                streamRecordOf(2L, 3L),
                streamRecordOf(3L, 2L),
                streamRecordOf(3L, 3L),
                streamRecordOf(3L, 4L),
                streamRecordOf(4L, 3L),
                streamRecordOf(4L, 4L),
                streamRecordOf(4L, 5L),
                streamRecordOf(5L, 4L),
                streamRecordOf(5L, 5L));
        join.noLateRecords();
        join.close();
    }

    /**
     * With bounds [-1, 1] and side-output tags configured for both inputs, sends a left element
     * (timestamp 1) and a right element (timestamp 2) after both watermarks reached 3, and expects
     * each of them on the side output of its own input.
     */
    @TestTemplate
    void testLateData() throws Exception {
        OutputTag<TestElem> leftLateTag = new OutputTag<TestElem>("left_late"){};
        OutputTag<TestElem> rightLateTag = new OutputTag<TestElem>("right_late"){};

        JoinTestBuilder join = setupHarness(-1L, true, 1L, true, leftLateTag, rightLateTag);

        join.processElement1(3);
        join.processElement2(3);
        join.processWatermark1(3);
        join.processWatermark2(3);

        join.processElement1(4);
        join.processElement2(4);

        // These two arrive behind the watermark.
        join.processElement1(1);
        join.processElement2(2);

        join.processElement1(5);
        join.processElement2(5);

        join.andExpect(
                streamRecordOf(3L, 3L),
                streamRecordOf(3L, 4L),
                streamRecordOf(4L, 3L),
                streamRecordOf(4L, 4L),
                streamRecordOf(4L, 5L),
                streamRecordOf(5L, 4L),
                streamRecordOf(5L, 5L));
        join.expectLateRecords(leftLateTag, createStreamRecord(1L, "lhs"));
        join.expectLateRecords(rightLateTag, createStreamRecord(2L, "rhs"));
        join.close();
    }

    /**
     * Asserts that the given map state currently exposes no keys.
     *
     * @param state the state to inspect
     * @throws Exception if reading the state keys fails
     */
    private void assertEmpty(MapState<Long, ?> state) throws Exception {
        Iterable<Long> keys = state.keys();
        Assertions.assertThat(keys).isEmpty();
    }

    /**
     * Asserts that the given map state contains every timestamp in {@code ts} as a key and that
     * its number of keys equals {@code ts.length}.
     *
     * @param state the state to inspect
     * @param ts the timestamps expected as keys
     * @throws Exception if reading the state fails
     */
    private void assertContainsOnly(MapState<Long, ?> state, long ... ts) throws Exception {
        for (long t : ts) {
            String message = "Keys not found in state. \n Expected: " + Arrays.toString(ts) + "\n Actual:   " + state.keys();
            // The key type is Long, so the primitive is boxed directly; an Object-typed
            // argument does not match MapState<Long, ?>#contains.
            Assertions.assertThat(state.contains(t)).as(message).isTrue();
        }
        String message = "Too many objects in state. \n Expected: " + Arrays.toString(ts) + "\n Actual:   " + state.keys();
        Assertions.assertThat(state.keys()).as(message).hasSize(ts.length);
    }

    /**
     * Asserts that the number of {@link StreamRecord}s in {@code actualOutput} equals the number
     * of expected records and that every expected record is contained in the actual output.
     * Elements of {@code actualOutput} that are not stream records are ignored when counting.
     *
     * @param expectedOutput the records that must be present
     * @param actualOutput the queue produced by the harness
     */
    private <T1, T2> void assertOutput(Iterable<StreamRecord<T1>> expectedOutput, Queue<T2> actualOutput) {
        int expectedSize = Iterables.size(expectedOutput);
        int actualSize = (int) actualOutput.stream().filter(StreamRecord.class::isInstance).count();

        Assertions.assertThat(actualSize)
                .as("Expected and actual size of stream records different")
                .isEqualTo(expectedSize);

        expectedOutput.forEach(
                expected -> Assertions.assertThat(actualOutput.contains(expected)).isTrue());
    }

    /**
     * Builds a harness around an {@link IntervalJoinOperator} that uses {@link PassthroughFunction}
     * and no late-data output tags. The harness is returned without being set up or opened.
     *
     * @param lowerBound lower bound of the join interval
     * @param lowerBoundInclusive whether the lower bound is inclusive
     * @param upperBound upper bound of the join interval
     * @param upperBoundInclusive whether the upper bound is inclusive
     * @return the new harness, keyed by {@code TestElem.key} on both inputs
     */
    private TestHarness createTestHarness(long lowerBound, boolean lowerBoundInclusive, long upperBound, boolean upperBoundInclusive) throws Exception {
        IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
                new IntervalJoinOperator<>(
                        lowerBound,
                        upperBound,
                        lowerBoundInclusive,
                        upperBoundInclusive,
                        null,
                        null,
                        TestElem.serializer(),
                        TestElem.serializer(),
                        new PassthroughFunction());

        // The lambdas are typed by the constructor's KeySelector<TestElem, String> parameters;
        // a raw-type cast here would leave "elem" as Object and "elem.key" unresolvable.
        return new TestHarness(operator, elem -> elem.key, elem -> elem.key, TypeInformation.of(String.class));
    }

    /**
     * Builds an {@link IntervalJoinOperator} using {@link PassthroughFunction} and the given
     * late-data output tags, wraps it in a harness, and returns a {@link JoinTestBuilder} for it.
     *
     * @param lowerBound lower bound of the join interval
     * @param lowerBoundInclusive whether the lower bound is inclusive
     * @param upperBound upper bound of the join interval
     * @param upperBoundInclusive whether the upper bound is inclusive
     * @param leftLateDataOutputTag tag passed to the operator for the left input, may be {@code null}
     * @param rightLateDataOutputTag tag passed to the operator for the right input, may be {@code null}
     * @return a builder wrapping the new harness and operator
     */
    private JoinTestBuilder setupHarness(long lowerBound, boolean lowerBoundInclusive, long upperBound, boolean upperBoundInclusive, OutputTag<TestElem> leftLateDataOutputTag, OutputTag<TestElem> rightLateDataOutputTag) throws Exception {
        IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator =
                new IntervalJoinOperator<>(
                        lowerBound,
                        upperBound,
                        lowerBoundInclusive,
                        upperBoundInclusive,
                        leftLateDataOutputTag,
                        rightLateDataOutputTag,
                        TestElem.serializer(),
                        TestElem.serializer(),
                        new PassthroughFunction());

        // The lambdas are typed by the constructor's KeySelector<TestElem, String> parameters;
        // a raw-type cast here would leave "elem" as Object and "elem.key" unresolvable.
        TestHarness t = new TestHarness(operator, elem -> elem.key, elem -> elem.key, TypeInformation.of(String.class));
        return new JoinTestBuilder(t, operator);
    }

    /**
     * Convenience overload of the six-argument {@code setupHarness} that passes {@code null} for
     * both late-data output tags.
     */
    private JoinTestBuilder setupHarness(long lowerBound, boolean lowerBoundInclusive, long upperBound, boolean upperBoundInclusive) throws Exception {
        final OutputTag<TestElem> noLeftLateTag = null;
        final OutputTag<TestElem> noRightLateTag = null;
        return setupHarness(lowerBound, lowerBoundInclusive, upperBound, upperBoundInclusive, noLeftLateTag, noRightLateTag);
    }

    /**
     * Builds the output record expected for a joined pair: a tuple of an "lhs" element and an
     * "rhs" element, stamped with the larger of the two timestamps.
     *
     * @param lhsTs timestamp of the left element
     * @param rhsTs timestamp of the right element
     * @return the expected stream record
     */
    private StreamRecord<Tuple2<TestElem, TestElem>> streamRecordOf(long lhsTs, long rhsTs) {
        Tuple2<TestElem, TestElem> pair =
                Tuple2.of(new TestElem(lhsTs, "lhs"), new TestElem(rhsTs, "rhs"));
        return new StreamRecord<>(pair, Math.max(lhsTs, rhsTs));
    }

    /**
     * Wraps a new {@link TestElem} in a stream record that carries the same timestamp.
     *
     * @param ts timestamp used for both the element and the record
     * @param source value stored in the element's {@code source} field
     * @return the new stream record
     */
    private static StreamRecord<TestElem> createStreamRecord(long ts, String source) {
        return new StreamRecord<>(new TestElem(ts, source), ts);
    }

    /**
     * Feeds timestamps 1..4 into both inputs of the harness, each element followed by a watermark
     * of the same timestamp on the same input. When {@code lhsFasterThanRhs} is set, the whole
     * left side is fed before the right side; otherwise the right side goes first.
     *
     * @param testHarness the harness to drive
     */
    private void processElementsAndWatermarks(TestHarness testHarness) throws Exception {
        if (!this.lhsFasterThanRhs) {
            for (long ts = 1L; ts <= 4L; ts++) {
                testHarness.processElement2(createStreamRecord(ts, "rhs"));
                testHarness.processWatermark2(new Watermark(ts));
            }
            for (long ts = 1L; ts <= 4L; ts++) {
                testHarness.processElement1(createStreamRecord(ts, "lhs"));
                testHarness.processWatermark1(new Watermark(ts));
            }
            return;
        }

        for (long ts = 1L; ts <= 4L; ts++) {
            testHarness.processElement1(createStreamRecord(ts, "lhs"));
            testHarness.processWatermark1(new Watermark(ts));
        }
        for (long ts = 1L; ts <= 4L; ts++) {
            testHarness.processElement2(createStreamRecord(ts, "rhs"));
            testHarness.processWatermark2(new Watermark(ts));
        }
    }

    /**
     * Shorthand for a {@link KeyedTwoInputStreamOperatorTestHarness} with the type arguments used
     * throughout this test: {@code String} keys, {@link TestElem} on both inputs, and a tuple of
     * two {@link TestElem}s as output.
     */
    private static class TestHarness
            extends KeyedTwoInputStreamOperatorTestHarness<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> {

        /** Forwards all arguments unchanged to the superclass constructor. */
        TestHarness(
                TwoInputStreamOperator<TestElem, TestElem, Tuple2<TestElem, TestElem>> operator,
                KeySelector<TestElem, String> keySelector1,
                KeySelector<TestElem, String> keySelector2,
                TypeInformation<String> keyType) throws Exception {
            super(operator, keySelector1, keySelector2, keyType);
        }
    }

    /**
     * Element type used on both join inputs: a fixed key, a timestamp, and a label naming the
     * input it belongs to. Equality and hashing cover all three fields.
     */
    private static class TestElem {
        String key = "key";
        long ts;
        String source;

        public TestElem(long ts, String source) {
            this.ts = ts;
            this.source = source;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            TestElem other = (TestElem) o;
            // java.util.Objects is referenced by its qualified name because the file does not
            // import it; the null-safe comparison is the same as the hand-written one.
            return ts == other.ts
                    && java.util.Objects.equals(key, other.key)
                    && java.util.Objects.equals(source, other.source);
        }

        @Override
        public int hashCode() {
            int keyHash = key == null ? 0 : key.hashCode();
            int sourceHash = source == null ? 0 : source.hashCode();
            // Long.hashCode folds the high and low 32 bits together with XOR.
            return 31 * (31 * keyHash + Long.hashCode(ts)) + sourceHash;
        }

        @Override
        public String toString() {
            return source + ":" + ts;
        }

        /** Creates a serializer for this type from its {@link TypeInformation}. */
        public static TypeSerializer<TestElem> serializer() {
            TypeInformation<TestElem> typeInfo = TypeInformation.of(new TypeHint<TestElem>(){});
            SerializerConfig config = new SerializerConfigImpl();
            return typeInfo.createSerializer(config);
        }
    }

    private static class PassthroughFunction
    extends ProcessJoinFunction<TestElem, TestElem, Tuple2<TestElem, TestElem>> {
        private PassthroughFunction() {
        }

        public void processElement(TestElem left, TestElem right, ProcessJoinFunction.Context ctx, Collector<Tuple2<TestElem, TestElem>> out) throws Exception {
            out.collect((Object)Tuple2.of((Object)left, (Object)right));
        }
    }

    /**
     * Fluent driver used by the tests: wraps a {@link TestHarness} and the operator under test,
     * and offers chainable methods to feed elements and watermarks and to assert on the harness
     * output and on the operator's left and right buffers.
     *
     * <p>This is an inner (non-static) class because it reads {@code lhsFasterThanRhs} and calls
     * the assertion helpers of the enclosing test instance.
     */
    private class JoinTestBuilder {
        private final IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator;
        private final TestHarness testHarness;

        /**
         * Stores the harness and operator and prepares the harness for use.
         *
         * @param t the harness to drive
         * @param operator the operator wrapped by {@code t}
         */
        public JoinTestBuilder(TestHarness t, IntervalJoinOperator<String, TestElem, TestElem, Tuple2<TestElem, TestElem>> operator) throws Exception {
            this.testHarness = t;
            this.operator = operator;
            // setup() before open(), matching the order used by every other harness in this file.
            t.setup();
            t.open();
        }

        /** Returns the wrapped harness. */
        public TestHarness get() {
            return this.testHarness;
        }

        /** Sends an "lhs" element with the given timestamp to the first input. */
        public JoinTestBuilder processElement1(int ts) throws Exception {
            this.testHarness.processElement1(IntervalJoinOperatorTest.createStreamRecord(ts, "lhs"));
            return this;
        }

        /** Sends an "rhs" element with the given timestamp to the second input. */
        public JoinTestBuilder processElement2(int ts) throws Exception {
            this.testHarness.processElement2(IntervalJoinOperatorTest.createStreamRecord(ts, "rhs"));
            return this;
        }

        /** Sends a watermark with the given timestamp to the first input. */
        public JoinTestBuilder processWatermark1(int ts) throws Exception {
            this.testHarness.processWatermark1(new Watermark((long)ts));
            return this;
        }

        /** Sends a watermark with the given timestamp to the second input. */
        public JoinTestBuilder processWatermark2(int ts) throws Exception {
            this.testHarness.processWatermark2(new Watermark((long)ts));
            return this;
        }

        /**
         * Feeds timestamps {@code from..to} (both inclusive) into both inputs, each element
         * followed by a watermark of the same timestamp on the same input. The side that goes
         * first is chosen by the enclosing test's {@code lhsFasterThanRhs} flag.
         */
        public JoinTestBuilder processElementsAndWatermarks(int from, int to) throws Exception {
            if (IntervalJoinOperatorTest.this.lhsFasterThanRhs) {
                feedLeft(from, to);
                feedRight(from, to);
            } else {
                feedRight(from, to);
                feedLeft(from, to);
            }
            return this;
        }

        /** Sends element + watermark pairs for {@code from..to} to the first input. */
        private void feedLeft(int from, int to) throws Exception {
            for (int i = from; i <= to; ++i) {
                this.testHarness.processElement1(IntervalJoinOperatorTest.createStreamRecord(i, "lhs"));
                this.testHarness.processWatermark1(new Watermark((long)i));
            }
        }

        /** Sends element + watermark pairs for {@code from..to} to the second input. */
        private void feedRight(int from, int to) throws Exception {
            for (int i = from; i <= to; ++i) {
                this.testHarness.processElement2(IntervalJoinOperatorTest.createStreamRecord(i, "rhs"));
                this.testHarness.processWatermark2(new Watermark((long)i));
            }
        }

        /** Asserts the harness output against the given expected records. */
        @SafeVarargs
        public final JoinTestBuilder andExpect(StreamRecord<Tuple2<TestElem, TestElem>> ... elems) {
            // The array is passed with its element type intact so the resulting list is an
            // Iterable of StreamRecords, as assertOutput requires.
            IntervalJoinOperatorTest.this.assertOutput(Lists.newArrayList(elems), this.testHarness.getOutput());
            return this;
        }

        /** Asserts that the operator's left buffer holds exactly the given timestamps as keys. */
        public JoinTestBuilder assertLeftBufferContainsOnly(long ... timestamps) {
            try {
                IntervalJoinOperatorTest.this.assertContainsOnly(this.operator.getLeftBuffer(), timestamps);
            }
            catch (Exception e) {
                // Rethrown unchecked so the fluent chain needs no throws clause here.
                throw new RuntimeException(e);
            }
            return this;
        }

        /** Asserts that the operator's right buffer holds exactly the given timestamps as keys. */
        public JoinTestBuilder assertRightBufferContainsOnly(long ... timestamps) {
            try {
                IntervalJoinOperatorTest.this.assertContainsOnly(this.operator.getRightBuffer(), timestamps);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return this;
        }

        /** Asserts that the operator's left buffer has no keys. */
        public JoinTestBuilder assertLeftBufferEmpty() {
            try {
                IntervalJoinOperatorTest.this.assertEmpty(this.operator.getLeftBuffer());
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return this;
        }

        /** Asserts that the operator's right buffer has no keys. */
        public JoinTestBuilder assertRightBufferEmpty() {
            try {
                IntervalJoinOperatorTest.this.assertEmpty(this.operator.getRightBuffer());
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            return this;
        }

        /** Asserts the side output for {@code tag} against the given expected records. */
        @SafeVarargs
        public final JoinTestBuilder expectLateRecords(OutputTag<TestElem> tag, StreamRecord<TestElem> ... elems) {
            IntervalJoinOperatorTest.this.assertOutput(Lists.newArrayList(elems), this.testHarness.getSideOutput(tag));
            return this;
        }

        /** Delegates to {@code TestHarnessUtil.assertNoLateRecords} on the harness output. */
        public JoinTestBuilder noLateRecords() {
            TestHarnessUtil.assertNoLateRecords(this.testHarness.getOutput());
            return this;
        }

        /** Closes the wrapped harness. */
        public void close() throws Exception {
            this.testHarness.close();
        }
    }
}

