/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.leaderretrieval;

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalEventHandler;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.OptionalAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class ZooKeeperLeaderRetrievalConnectionHandlingTest {
    @RegisterExtension
    private final TestingFatalErrorHandlerExtension fatalErrorHandlerResource = new TestingFatalErrorHandlerExtension();
    @RegisterExtension
    private final EachCallbackWrapper<ZooKeeperExtension> zooKeeperExtension = new EachCallbackWrapper((CustomExtension)new ZooKeeperExtension());
    @Nullable
    private CuratorFramework zooKeeperClient;

    ZooKeeperLeaderRetrievalConnectionHandlingTest() {
    }

    @BeforeEach
    void before() throws Exception {
        this.zooKeeperClient = ((ZooKeeperExtension)this.zooKeeperExtension.getCustomExtension()).getZooKeeperClient(this.fatalErrorHandlerResource.getTestingFatalErrorHandler());
        this.zooKeeperClient.blockUntilConnected();
    }

    private ZooKeeperExtension getZooKeeper() {
        return (ZooKeeperExtension)this.zooKeeperExtension.getCustomExtension();
    }

    @Test
    void testConnectionSuspendedHandlingDuringInitialization() throws Exception {
        this.testWithQueueLeaderElectionListener((FunctionWithException<QueueLeaderElectionListener, ZooKeeperLeaderRetrievalDriver, Exception>)((FunctionWithException)queueLeaderElectionListener -> ZooKeeperUtils.createLeaderRetrievalDriverFactory((CuratorFramework)this.zooKeeperClient).createLeaderRetrievalDriver((LeaderRetrievalEventHandler)queueLeaderElectionListener, (FatalErrorHandler)this.fatalErrorHandlerResource.getTestingFatalErrorHandler())), (BiConsumerWithException<ZooKeeperLeaderRetrievalDriver, QueueLeaderElectionListener, Exception>)((BiConsumerWithException)(leaderRetrievalDriver, queueLeaderElectionListener) -> {
            ((OptionalAssert)Assertions.assertThat(queueLeaderElectionListener.next(Duration.ofMillis(50L))).as("No results are expected, yet, since no leader was elected.", new Object[0])).isNotPresent();
            this.getZooKeeper().restart();
            String secondAddress = queueLeaderElectionListener.next().getLeaderAddress();
            ((AbstractStringAssert)Assertions.assertThat((String)secondAddress).as("The next result is expected to be null.", new Object[0])).isNull();
        }));
    }

    @Test
    void testConnectionSuspendedHandling() throws Exception {
        this.testWithQueueLeaderElectionListener((FunctionWithException<QueueLeaderElectionListener, ZooKeeperLeaderRetrievalDriver, Exception>)((FunctionWithException)queueLeaderElectionListener -> new ZooKeeperLeaderRetrievalDriver(this.zooKeeperClient, "/testConnectionSuspendedHandling", (LeaderRetrievalEventHandler)queueLeaderElectionListener, ZooKeeperLeaderRetrievalDriver.LeaderInformationClearancePolicy.ON_SUSPENDED_CONNECTION, (FatalErrorHandler)this.fatalErrorHandlerResource.getTestingFatalErrorHandler())), (BiConsumerWithException<ZooKeeperLeaderRetrievalDriver, QueueLeaderElectionListener, Exception>)((BiConsumerWithException)(leaderRetrievalDriver, queueLeaderElectionListener) -> {
            String leaderAddress = "localhost";
            this.writeLeaderInformationToZooKeeper(leaderRetrievalDriver.getConnectionInformationPath(), "localhost", UUID.randomUUID());
            String firstAddress = queueLeaderElectionListener.next().getLeaderAddress();
            ((AbstractStringAssert)Assertions.assertThat((String)firstAddress).as("The first result is expected to be the initially set leader address.", new Object[0])).isEqualTo("localhost");
            this.getZooKeeper().restart();
            String secondAddress = queueLeaderElectionListener.next().getLeaderAddress();
            ((AbstractStringAssert)Assertions.assertThat((String)secondAddress).as("The next result is expected to be null.", new Object[0])).isNull();
        }));
    }

    @Test
    void testSuspendedConnectionDoesNotClearLeaderInformationIfClearanceOnLostConnection() throws Exception {
        this.testWithQueueLeaderElectionListener((FunctionWithException<QueueLeaderElectionListener, ZooKeeperLeaderRetrievalDriver, Exception>)((FunctionWithException)queueLeaderElectionListener -> new ZooKeeperLeaderRetrievalDriver(this.zooKeeperClient, "/testConnectionSuspendedHandling", (LeaderRetrievalEventHandler)queueLeaderElectionListener, ZooKeeperLeaderRetrievalDriver.LeaderInformationClearancePolicy.ON_LOST_CONNECTION, (FatalErrorHandler)this.fatalErrorHandlerResource.getTestingFatalErrorHandler())), (BiConsumerWithException<ZooKeeperLeaderRetrievalDriver, QueueLeaderElectionListener, Exception>)((BiConsumerWithException)(leaderRetrievalDriver, queueLeaderElectionListener) -> {
            String leaderAddress = "localhost";
            this.writeLeaderInformationToZooKeeper(leaderRetrievalDriver.getConnectionInformationPath(), "localhost", UUID.randomUUID());
            String firstAddress = queueLeaderElectionListener.next().getLeaderAddress();
            ((AbstractStringAssert)Assertions.assertThat((String)firstAddress).as("The first result is expected to be the initially set leader address.", new Object[0])).isEqualTo("localhost");
            this.getZooKeeper().close();
            Assertions.assertThat(queueLeaderElectionListener.next(Duration.ofMillis(100L))).isNotPresent();
        }));
    }

    @Test
    void testSameLeaderAfterReconnectTriggersListenerNotification() throws Exception {
        this.testWithQueueLeaderElectionListener((FunctionWithException<QueueLeaderElectionListener, ZooKeeperLeaderRetrievalDriver, Exception>)((FunctionWithException)queueLeaderElectionListener -> new ZooKeeperLeaderRetrievalDriver(this.zooKeeperClient, "/testSameLeaderAfterReconnectTriggersListenerNotification", (LeaderRetrievalEventHandler)queueLeaderElectionListener, ZooKeeperLeaderRetrievalDriver.LeaderInformationClearancePolicy.ON_SUSPENDED_CONNECTION, (FatalErrorHandler)this.fatalErrorHandlerResource.getTestingFatalErrorHandler())), (BiConsumerWithException<ZooKeeperLeaderRetrievalDriver, QueueLeaderElectionListener, Exception>)((BiConsumerWithException)(leaderRetrievalDriver, queueLeaderElectionListener) -> {
            String leaderAddress = "foobar";
            UUID sessionId = UUID.randomUUID();
            this.writeLeaderInformationToZooKeeper(leaderRetrievalDriver.getConnectionInformationPath(), "foobar", sessionId);
            queueLeaderElectionListener.next();
            this.getZooKeeper().stop();
            queueLeaderElectionListener.next();
            this.getZooKeeper().restart();
            LeaderInformation connectionReconnect = queueLeaderElectionListener.next();
            Assertions.assertThat((String)connectionReconnect.getLeaderAddress()).isEqualTo("foobar");
        }));
    }

    private void writeLeaderInformationToZooKeeper(String retrievalPath, String leaderAddress, UUID sessionId) throws Exception {
        byte[] data = this.createLeaderInformation(leaderAddress, sessionId);
        if (this.zooKeeperClient.checkExists().forPath(retrievalPath) != null) {
            this.zooKeeperClient.setData().forPath(retrievalPath, data);
        } else {
            this.zooKeeperClient.create().creatingParentsIfNeeded().forPath(retrievalPath, data);
        }
    }

    /*
     * Exception decompiling
     */
    private byte[] createLeaderInformation(String leaderAddress, UUID sessionId) throws IOException {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    @Test
    void testNewLeaderAfterReconnectTriggersListenerNotification() throws Exception {
        this.testWithQueueLeaderElectionListener((FunctionWithException<QueueLeaderElectionListener, ZooKeeperLeaderRetrievalDriver, Exception>)((FunctionWithException)queueLeaderElectionListener -> new ZooKeeperLeaderRetrievalDriver(this.zooKeeperClient, "/testNewLeaderAfterReconnectTriggersListenerNotification", (LeaderRetrievalEventHandler)queueLeaderElectionListener, ZooKeeperLeaderRetrievalDriver.LeaderInformationClearancePolicy.ON_SUSPENDED_CONNECTION, (FatalErrorHandler)this.fatalErrorHandlerResource.getTestingFatalErrorHandler())), (BiConsumerWithException<ZooKeeperLeaderRetrievalDriver, QueueLeaderElectionListener, Exception>)((BiConsumerWithException)(leaderRetrievalDriver, queueLeaderElectionListener) -> {
            String leaderAddress = "foobar";
            UUID sessionId = UUID.randomUUID();
            this.writeLeaderInformationToZooKeeper(leaderRetrievalDriver.getConnectionInformationPath(), "foobar", sessionId);
            queueLeaderElectionListener.next();
            this.getZooKeeper().stop();
            queueLeaderElectionListener.next();
            this.getZooKeeper().restart();
            String newLeaderAddress = "barfoo";
            UUID newSessionId = UUID.randomUUID();
            this.writeLeaderInformationToZooKeeper(leaderRetrievalDriver.getConnectionInformationPath(), "barfoo", newSessionId);
            CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)((SupplierWithException)() -> {
                LeaderInformation afterConnectionReconnect = queueLeaderElectionListener.next();
                return afterConnectionReconnect.getLeaderAddress() != null && afterConnectionReconnect.getLeaderAddress().equals("barfoo");
            }));
        }));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testWithQueueLeaderElectionListener(FunctionWithException<QueueLeaderElectionListener, ZooKeeperLeaderRetrievalDriver, Exception> driverFactoryMethod, BiConsumerWithException<ZooKeeperLeaderRetrievalDriver, QueueLeaderElectionListener, Exception> testCallback) throws Exception {
        QueueLeaderElectionListener queueLeaderElectionListener = new QueueLeaderElectionListener(1);
        ZooKeeperLeaderRetrievalDriver leaderRetrievalDriver = null;
        try {
            leaderRetrievalDriver = (ZooKeeperLeaderRetrievalDriver)driverFactoryMethod.apply((Object)queueLeaderElectionListener);
            testCallback.accept((Object)leaderRetrievalDriver, (Object)queueLeaderElectionListener);
        }
        finally {
            queueLeaderElectionListener.clearUnhandledEvents();
            if (leaderRetrievalDriver != null) {
                leaderRetrievalDriver.close();
            }
        }
    }

    private static class QueueLeaderElectionListener
    implements LeaderRetrievalEventHandler {
        private final BlockingQueue<LeaderInformation> queue;

        public QueueLeaderElectionListener(int expectedCalls) {
            this.queue = new ArrayBlockingQueue<LeaderInformation>(expectedCalls);
        }

        public void notifyLeaderAddress(LeaderInformation leaderInformation) {
            try {
                this.queue.put(leaderInformation);
            }
            catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }

        public LeaderInformation next() {
            try {
                return this.queue.take();
            }
            catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }

        public Optional<LeaderInformation> next(Duration timeout) {
            try {
                return Optional.ofNullable(this.queue.poll(timeout.toMillis(), TimeUnit.MILLISECONDS));
            }
            catch (InterruptedException e) {
                throw new IllegalStateException(e);
            }
        }

        public void clearUnhandledEvents() {
            while (!this.queue.isEmpty()) {
                this.queue.poll();
            }
        }
    }
}

