/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.connector.sink2;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator;
import org.apache.flink.streaming.api.connector.sink2.IntegerSerializer;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.SerializableSupplier;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link GlobalCommitterOperator}.
 *
 * <p>Each test feeds {@link CommittableMessage}s into the operator through a
 * {@link OneInputStreamOperatorTestHarness} and inspects which committables reached the
 * recording {@link MockCommitter}. In every scenario the harness output is expected to stay
 * empty.
 */
class GlobalCommitterOperatorTest {

    /**
     * A checkpoint-complete notification must not trigger a commit while only one of the two
     * committables has been processed; once the second one has arrived, the next notification
     * commits both.
     */
    @Test
    void testWaitForCommittablesOfLatestCheckpointBeforeCommitting() throws Exception {
        MockCommitter committer = new MockCommitter();
        // try-with-resources closes the harness even when an assertion fails.
        try (OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> testHarness =
                createTestHarness(committer)) {
            testHarness.open();

            // NOTE(review): the fourth summary argument (2) is presumably the number of
            // committables announced for checkpoint 1 - confirm against CommittableSummary.
            testHarness.processElement(
                    new StreamRecord<>(new CommittableSummary<>(1, 1, 1L, 2, 0, 0)));
            testHarness.processElement(
                    new StreamRecord<>(new CommittableWithLineage<>(1, 1L, 1)));

            // Only the first committable has been seen so far: nothing may be committed yet.
            testHarness.notifyOfCompletedCheckpoint(1L);
            Assertions.assertThat(testHarness.getOutput()).isEmpty();
            Assertions.assertThat(committer.committed).isEmpty();

            testHarness.processElement(
                    new StreamRecord<>(new CommittableWithLineage<>(2, 1L, 1)));

            // Both committables are present now, so the next notification commits them.
            testHarness.notifyOfCompletedCheckpoint(2L);
            Assertions.assertThat(testHarness.getOutput()).isEmpty();
            Assertions.assertThat(committer.committed).containsExactly(1, 2);
        }
    }

    /**
     * A committable that was snapshotted but not committed must be committed by a fresh
     * operator instance once it has been initialized from that snapshot and opened.
     */
    @Test
    void testStateRestore() throws Exception {
        MockCommitter committer = new MockCommitter();
        OperatorSubtaskState snapshot;

        try (OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> testHarness =
                createTestHarness(committer)) {
            testHarness.open();

            CommittableSummary<Integer> committableSummary =
                    new CommittableSummary<>(1, 1, 0L, 1, 1, 0);
            testHarness.processElement(new StreamRecord<>(committableSummary));
            CommittableWithLineage<Integer> first = new CommittableWithLineage<>(1, 0L, 1);
            testHarness.processElement(new StreamRecord<>(first));

            snapshot = testHarness.snapshot(0L, 2L);
            Assertions.assertThat(testHarness.getOutput()).isEmpty();
        }
        // Checked after the first harness is closed: closing must not have committed anything.
        Assertions.assertThat(committer.committed).isEmpty();

        try (OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> restored =
                createTestHarness(committer)) {
            restored.initializeState(snapshot);
            restored.open();

            // Inspect the restored harness; the first harness is already closed at this point.
            Assertions.assertThat(restored.getOutput()).isEmpty();
            Assertions.assertThat(committer.committed).containsExactly(1);
        }
    }

    /**
     * Committables that carry a {@code null} checkpoint id are committed when the input ends,
     * without any checkpoint notification.
     */
    @Test
    void testCommitAllCommittablesOnEndOfInput() throws Exception {
        MockCommitter committer = new MockCommitter();
        try (OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> testHarness =
                createTestHarness(committer)) {
            testHarness.open();

            // Two summaries (first argument 1 and 2), each followed below by one committable.
            CommittableSummary<Integer> committableSummary =
                    new CommittableSummary<>(1, 2, null, 1, 1, 0);
            testHarness.processElement(new StreamRecord<>(committableSummary));
            CommittableSummary<Integer> committableSummary2 =
                    new CommittableSummary<>(2, 2, null, 1, 1, 0);
            testHarness.processElement(new StreamRecord<>(committableSummary2));

            CommittableWithLineage<Integer> first = new CommittableWithLineage<>(1, null, 1);
            testHarness.processElement(new StreamRecord<>(first));
            CommittableWithLineage<Integer> second = new CommittableWithLineage<>(2, null, 2);
            testHarness.processElement(new StreamRecord<>(second));

            testHarness.endInput();

            Assertions.assertThat(testHarness.getOutput()).isEmpty();
            Assertions.assertThat(committer.committed).containsExactly(1, 2);
        }
    }

    /**
     * Wraps a new {@link GlobalCommitterOperator} in a test harness.
     *
     * @param committer the committer instance handed out by the operator's committer supplier
     * @return an unopened harness around the operator
     * @throws Exception if the harness cannot be constructed
     */
    private OneInputStreamOperatorTestHarness<CommittableMessage<Integer>, Void> createTestHarness(
            Committer<Integer> committer) throws Exception {
        return new OneInputStreamOperatorTestHarness<>(
                new GlobalCommitterOperator<>(() -> committer, IntegerSerializer::new));
    }

    /** {@link Committer} that records every committable it is asked to commit, in call order. */
    private static class MockCommitter implements Committer<Integer> {

        /** Committables received through {@link #commit(Collection)}, in arrival order. */
        final List<Integer> committed = new ArrayList<>();

        /** No resources to release. */
        @Override
        public void close() throws Exception {
        }

        /**
         * Appends the committable of each request to {@link #committed}.
         *
         * @param committables the commit requests to record
         */
        @Override
        public void commit(Collection<Committer.CommitRequest<Integer>> committables)
                throws IOException, InterruptedException {
            for (Committer.CommitRequest<Integer> request : committables) {
                committed.add(request.getCommittable());
            }
        }
    }
}

