/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.sink;

import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.ContentDump;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.testutils.logging.LoggerAuditingExtension;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.event.Level;

class TwoPhaseCommitSinkFunctionTest {
    private ContentDumpSinkFunction sinkFunction;
    private OneInputStreamOperatorTestHarness<String, Object> harness;
    private AtomicBoolean throwException = new AtomicBoolean();
    private ContentDump targetDirectory;
    private ContentDump tmpDirectory;
    private SettableClock clock;
    @RegisterExtension
    private LoggerAuditingExtension testLoggerResource = new LoggerAuditingExtension(TwoPhaseCommitSinkFunction.class, Level.WARN);

    TwoPhaseCommitSinkFunctionTest() {
    }

    @BeforeEach
    void setUp() throws Exception {
        this.targetDirectory = new ContentDump();
        this.tmpDirectory = new ContentDump();
        this.clock = new SettableClock();
        this.setUpTestHarness();
    }

    @AfterEach
    void tearDown() throws Exception {
        this.closeTestHarness();
    }

    private void setUpTestHarness() throws Exception {
        this.sinkFunction = new ContentDumpSinkFunction();
        this.harness = new OneInputStreamOperatorTestHarness(new StreamSink((SinkFunction)this.sinkFunction), StringSerializer.INSTANCE);
        this.harness.setup();
    }

    private void closeTestHarness() throws Exception {
        this.harness.close();
    }

    @Test
    void testSubsumedNotificationOfPreviousCheckpoint() throws Exception {
        this.harness.open();
        this.harness.processElement("42", 0L);
        this.harness.snapshot(0L, 1L);
        this.harness.processElement("43", 2L);
        this.harness.snapshot(1L, 3L);
        this.harness.processElement("44", 4L);
        this.harness.snapshot(2L, 5L);
        this.harness.notifyOfCompletedCheckpoint(2L);
        this.harness.notifyOfCompletedCheckpoint(1L);
        this.assertExactlyOnce(Arrays.asList("42", "43", "44"));
        Assertions.assertThat(this.tmpDirectory.listFiles()).hasSize(1);
    }

    @Test
    void testNoTransactionAfterSinkFunctionFinish() throws Exception {
        this.harness.open();
        this.harness.processElement("42", 0L);
        this.harness.snapshot(0L, 1L);
        this.harness.processElement("43", 2L);
        this.harness.snapshot(1L, 3L);
        this.harness.processElement("44", 4L);
        this.sinkFunction.finish();
        this.harness.snapshot(2L, 5L);
        this.harness.notifyOfCompletedCheckpoint(1L);
        this.harness.snapshot(3L, 6L);
        Assertions.assertThatThrownBy(() -> this.harness.processElement("45", 7L)).isInstanceOf(NullPointerException.class);
        this.assertExactlyOnce(Arrays.asList("42", "43"));
        Assertions.assertThat(this.tmpDirectory.listFiles()).hasSize(1);
    }

    @Test
    void testRecoverFromStateAfterFinished() throws Exception {
        this.harness.open();
        this.harness.processElement("42", 0L);
        this.sinkFunction.finish();
        OperatorSubtaskState operatorSubtaskState = this.harness.snapshot(2L, 5L);
        this.closeTestHarness();
        this.setUpTestHarness();
        this.harness.initializeState(operatorSubtaskState);
        this.harness.open();
        Assertions.assertThat(this.sinkFunction.abortedTransactions).isEmpty();
    }

    @Test
    void testNotifyOfCompletedCheckpoint() throws Exception {
        this.harness.open();
        this.harness.processElement("42", 0L);
        this.harness.snapshot(0L, 1L);
        this.harness.processElement("43", 2L);
        this.harness.snapshot(1L, 3L);
        this.harness.processElement("44", 4L);
        this.harness.snapshot(2L, 5L);
        this.harness.notifyOfCompletedCheckpoint(1L);
        this.assertExactlyOnce(Arrays.asList("42", "43"));
        Assertions.assertThat(this.tmpDirectory.listFiles()).hasSize(2);
    }

    @Test
    void testFailBeforeNotify() throws Exception {
        this.harness.open();
        this.harness.processElement("42", 0L);
        this.harness.snapshot(0L, 1L);
        this.harness.processElement("43", 2L);
        OperatorSubtaskState snapshot = this.harness.snapshot(1L, 3L);
        this.tmpDirectory.setWritable(false);
        Assertions.assertThatThrownBy(() -> {
            this.harness.processElement("44", 4L);
            this.harness.snapshot(2L, 5L);
        }).hasCauseInstanceOf(ContentDump.NotWritableException.class);
        this.closeTestHarness();
        this.tmpDirectory.setWritable(true);
        this.setUpTestHarness();
        this.harness.initializeState(snapshot);
        this.assertExactlyOnce(Arrays.asList("42", "43"));
        this.closeTestHarness();
        Assertions.assertThat(this.tmpDirectory.listFiles()).isEmpty();
    }

    @Test
    void testIgnoreCommitExceptionDuringRecovery() throws Exception {
        this.clock.setEpochMilli(0L);
        this.harness.open();
        this.harness.processElement("42", 0L);
        OperatorSubtaskState snapshot = this.harness.snapshot(0L, 1L);
        this.harness.notifyOfCompletedCheckpoint(1L);
        this.throwException.set(true);
        this.closeTestHarness();
        this.setUpTestHarness();
        long transactionTimeout = 1000L;
        this.sinkFunction.setTransactionTimeout(1000L);
        this.sinkFunction.ignoreFailuresAfterTransactionTimeout();
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> this.harness.initializeState(snapshot)).isInstanceOf(RuntimeException.class)).hasMessage("Expected exception");
        this.clock.setEpochMilli(1001L);
        this.harness.initializeState(snapshot);
        this.assertExactlyOnce(Collections.singletonList("42"));
    }

    @Test
    void testLogTimeoutAlmostReachedWarningDuringCommit() throws Exception {
        this.clock.setEpochMilli(0L);
        long transactionTimeout = 1000L;
        double warningRatio = 0.5;
        this.sinkFunction.setTransactionTimeout(1000L);
        this.sinkFunction.enableTransactionTimeoutWarnings(0.5);
        this.harness.open();
        this.harness.snapshot(0L, 1L);
        long elapsedTime = 502L;
        this.clock.setEpochMilli(502L);
        this.harness.notifyOfCompletedCheckpoint(1L);
        Assertions.assertThat((List)this.testLoggerResource.getMessages()).anyMatch(item -> item.contains("has been open for 502 ms. This is close to or even exceeding the transaction timeout of 1000 ms."));
    }

    @Test
    void testLogTimeoutAlmostReachedWarningDuringRecovery() throws Exception {
        this.clock.setEpochMilli(0L);
        long transactionTimeout = 1000L;
        double warningRatio = 0.5;
        this.sinkFunction.setTransactionTimeout(1000L);
        this.sinkFunction.enableTransactionTimeoutWarnings(0.5);
        this.harness.open();
        OperatorSubtaskState snapshot = this.harness.snapshot(0L, 1L);
        long elapsedTime = 502L;
        this.clock.setEpochMilli(502L);
        this.closeTestHarness();
        this.setUpTestHarness();
        this.sinkFunction.setTransactionTimeout(1000L);
        this.sinkFunction.enableTransactionTimeoutWarnings(0.5);
        this.harness.initializeState(snapshot);
        this.harness.open();
        this.closeTestHarness();
        Assertions.assertThat((List)this.testLoggerResource.getMessages()).anyMatch(item -> item.contains("has been open for 502 ms. This is close to or even exceeding the transaction timeout of 1000 ms."));
    }

    private void assertExactlyOnce(List<String> expectedValues) {
        ArrayList<String> actualValues = new ArrayList<String>();
        for (String name : this.targetDirectory.listFiles()) {
            actualValues.addAll(this.targetDirectory.read(name));
        }
        Collections.sort(actualValues);
        Collections.sort(expectedValues);
        Assertions.assertThat(actualValues).isEqualTo(expectedValues);
    }

    private static class SettableClock
    extends Clock {
        private final ZoneId zoneId;
        private long epochMilli;

        private SettableClock() {
            this.zoneId = ZoneOffset.UTC;
        }

        public SettableClock(ZoneId zoneId, long epochMilli) {
            this.zoneId = zoneId;
            this.epochMilli = epochMilli;
        }

        public void setEpochMilli(long epochMilli) {
            this.epochMilli = epochMilli;
        }

        @Override
        public ZoneId getZone() {
            return this.zoneId;
        }

        @Override
        public Clock withZone(ZoneId zone) {
            if (zone.equals(this.zoneId)) {
                return this;
            }
            return new SettableClock(zone, this.epochMilli);
        }

        @Override
        public Instant instant() {
            return Instant.ofEpochMilli(this.epochMilli);
        }
    }

    private static class ContentTransactionSerializer
    extends KryoSerializer<ContentTransaction> {
        public ContentTransactionSerializer() {
            super(ContentTransaction.class, (SerializerConfig)new SerializerConfigImpl());
        }

        public KryoSerializer<ContentTransaction> duplicate() {
            return this;
        }

        public String toString() {
            return "ContentTransactionSerializer";
        }
    }

    private static class ContentTransaction {
        private ContentDump.ContentWriter tmpContentWriter;

        public ContentTransaction(ContentDump.ContentWriter tmpContentWriter) {
            this.tmpContentWriter = tmpContentWriter;
        }

        public String toString() {
            return String.format("ContentTransaction[%s]", this.tmpContentWriter.getName());
        }
    }

    private class ContentDumpSinkFunction
    extends TwoPhaseCommitSinkFunction<String, ContentTransaction, Void> {
        final List<ContentTransaction> abortedTransactions;

        public ContentDumpSinkFunction() {
            super((TypeSerializer)new ContentTransactionSerializer(), (TypeSerializer)VoidSerializer.INSTANCE, (Clock)TwoPhaseCommitSinkFunctionTest.this.clock);
            this.abortedTransactions = new ArrayList<ContentTransaction>();
        }

        protected void invoke(ContentTransaction transaction, String value, SinkFunction.Context context) throws Exception {
            transaction.tmpContentWriter.write(value);
        }

        protected ContentTransaction beginTransaction() throws Exception {
            return new ContentTransaction(TwoPhaseCommitSinkFunctionTest.this.tmpDirectory.createWriter(UUID.randomUUID().toString()));
        }

        protected void preCommit(ContentTransaction transaction) throws Exception {
            transaction.tmpContentWriter.flush();
            transaction.tmpContentWriter.close();
        }

        protected void commit(ContentTransaction transaction) {
            if (TwoPhaseCommitSinkFunctionTest.this.throwException.get()) {
                throw new RuntimeException("Expected exception");
            }
            ContentDump.move(transaction.tmpContentWriter.getName(), TwoPhaseCommitSinkFunctionTest.this.tmpDirectory, TwoPhaseCommitSinkFunctionTest.this.targetDirectory);
        }

        protected void abort(ContentTransaction transaction) {
            this.abortedTransactions.add(transaction);
            transaction.tmpContentWriter.close();
            TwoPhaseCommitSinkFunctionTest.this.tmpDirectory.delete(transaction.tmpContentWriter.getName());
        }
    }
}

