/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.client.JobInitializationException;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess;
import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
import org.apache.flink.runtime.jobmaster.JobMasterService;
import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
import org.apache.flink.runtime.jobmaster.TestingJobMasterService;
import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.util.SerializedThrowable;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.CompletableFutureAssert;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link DefaultJobMasterServiceProcess}.
 *
 * <p>Each test builds a process around a {@link CompletableFuture} of {@link JobMasterService}
 * that the test completes (normally or exceptionally) itself, and then asserts on the futures
 * and state exposed by the process.
 */
class DefaultJobMasterServiceProcessTest {

    /** Job id shared by every process instance created in this test class. */
    private static final JobID jobId = new JobID();

    /**
     * Factory handed to the process under test; it turns a failure cause into a sparse
     * {@link ArchivedExecutionGraph} in state {@link JobStatus#FAILED} for {@link #jobId}.
     */
    private static final Function<Throwable, ArchivedExecutionGraph>
            failedArchivedExecutionGraphFactory =
                    throwable ->
                            ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
                                    jobId, "test", JobStatus.FAILED, throwable, null, 1337L);

    /**
     * An exceptionally completed service future must complete the result future with an
     * initialization failure that wraps the original cause in a
     * {@link JobInitializationException}.
     */
    @Test
    void testInitializationFailureCompletesResultFuture() {
        final CompletableFuture<JobMasterService> jobMasterServiceFuture =
                new CompletableFuture<>();
        final DefaultJobMasterServiceProcess serviceProcess =
                createTestInstance(jobMasterServiceFuture);
        final RuntimeException originalCause = new RuntimeException("Init error");

        jobMasterServiceFuture.completeExceptionally(originalCause);

        final JobManagerRunnerResult actualJobManagerResult =
                serviceProcess.getResultFuture().join();
        Assertions.assertThat(actualJobManagerResult.isInitializationFailure()).isTrue();

        final Throwable initializationFailure = actualJobManagerResult.getInitializationFailure();
        Assertions.assertThat(initializationFailure)
                .isInstanceOf(JobInitializationException.class)
                .hasCause(originalCause);
    }

    /**
     * After an initialization failure, the archived execution graph of the result must carry
     * a non-null failure info holding the wrapped cause and a timestamp taken while the
     * failure was being reported.
     */
    @Test
    void testInitializationFailureSetsFailureInfoProperly()
            throws ExecutionException, InterruptedException {
        final CompletableFuture<JobMasterService> jobMasterServiceFuture =
                new CompletableFuture<>();
        final DefaultJobMasterServiceProcess serviceProcess =
                createTestInstance(jobMasterServiceFuture);
        final RuntimeException originalCause = new RuntimeException("Expected RuntimeException");

        // Bracket the failure with wall-clock readings to bound the recorded timestamp.
        final long beforeFailureTimestamp = System.currentTimeMillis();
        jobMasterServiceFuture.completeExceptionally(originalCause);
        final long afterFailureTimestamp = System.currentTimeMillis();

        final JobManagerRunnerResult result = serviceProcess.getResultFuture().get();
        final ErrorInfo executionGraphFailure =
                result.getExecutionGraphInfo().getArchivedExecutionGraph().getFailureInfo();

        Assertions.assertThat(executionGraphFailure).isNotNull();
        assertInitializationException(
                executionGraphFailure.getException(),
                originalCause,
                executionGraphFailure.getTimestamp(),
                beforeFailureTimestamp,
                afterFailureTimestamp);
    }

    /**
     * After an initialization failure, the exception history of the result must contain
     * exactly one entry: a global one holding the wrapped cause and a timestamp taken while
     * the failure was being reported.
     */
    @Test
    void testInitializationFailureSetsExceptionHistoryProperly()
            throws ExecutionException, InterruptedException {
        final CompletableFuture<JobMasterService> jobMasterServiceFuture =
                new CompletableFuture<>();
        final DefaultJobMasterServiceProcess serviceProcess =
                createTestInstance(jobMasterServiceFuture);
        final RuntimeException originalCause = new RuntimeException("Expected RuntimeException");

        // Bracket the failure with wall-clock readings to bound the recorded timestamp.
        final long beforeFailureTimestamp = System.currentTimeMillis();
        jobMasterServiceFuture.completeExceptionally(originalCause);
        final long afterFailureTimestamp = System.currentTimeMillis();

        // getOnlyElement also enforces that the history holds a single entry.
        final RootExceptionHistoryEntry entry =
                Iterables.getOnlyElement(
                        serviceProcess
                                .getResultFuture()
                                .get()
                                .getExecutionGraphInfo()
                                .getExceptionHistory());

        assertInitializationException(
                entry.getException(),
                originalCause,
                entry.getTimestamp(),
                beforeFailureTimestamp,
                afterFailureTimestamp);
        Assertions.assertThat(entry.isGlobal()).isTrue();
    }

    /**
     * Asserts that a serialized failure is a {@link JobInitializationException} with the
     * expected cause and that its timestamp lies within the given inclusive bounds.
     *
     * @param actualException serialized exception to deserialize and inspect
     * @param expectedCause cause the deserialized exception must have
     * @param actualTimestamp timestamp recorded alongside the exception
     * @param expectedLowerTimestampThreshold inclusive lower bound for {@code actualTimestamp}
     * @param expectedUpperTimestampThreshold inclusive upper bound for {@code actualTimestamp}
     */
    private static void assertInitializationException(
            SerializedThrowable actualException,
            Throwable expectedCause,
            long actualTimestamp,
            long expectedLowerTimestampThreshold,
            long expectedUpperTimestampThreshold) {
        final Throwable deserializedException =
                actualException.deserializeError(Thread.currentThread().getContextClassLoader());

        Assertions.assertThat(deserializedException)
                .isInstanceOf(JobInitializationException.class)
                .hasCause(expectedCause);
        Assertions.assertThat(actualTimestamp)
                .isGreaterThanOrEqualTo(expectedLowerTimestampThreshold)
                .isLessThanOrEqualTo(expectedUpperTimestampThreshold);
    }

    /**
     * Closing after a failed initialization must leave the result future completed with an
     * initialization failure and the gateway future completed exceptionally.
     */
    @Test
    void testCloseAfterInitializationFailure() throws Exception {
        final CompletableFuture<JobMasterService> jobMasterServiceFuture =
                new CompletableFuture<>();
        final DefaultJobMasterServiceProcess serviceProcess =
                createTestInstance(jobMasterServiceFuture);

        jobMasterServiceFuture.completeExceptionally(new RuntimeException("Init error"));
        serviceProcess.closeAsync().get();

        Assertions.assertThat(serviceProcess.getResultFuture())
                .isCompletedWithValueMatching(JobManagerRunnerResult::isInitializationFailure);
        Assertions.assertThat(serviceProcess.getJobMasterGatewayFuture())
                .isCompletedExceptionally();
    }

    /**
     * Closing after a successful initialization must close the underlying service and fail
     * the result future with a {@link JobNotFinishedException} somewhere in the cause chain.
     */
    @Test
    void testCloseAfterInitializationSuccess() throws Exception {
        final CompletableFuture<JobMasterService> jobMasterServiceFuture =
                new CompletableFuture<>();
        final DefaultJobMasterServiceProcess serviceProcess =
                createTestInstance(jobMasterServiceFuture);
        final TestingJobMasterService testingJobMasterService = new TestingJobMasterService();

        jobMasterServiceFuture.complete(testingJobMasterService);
        serviceProcess.closeAsync().get();

        Assertions.assertThat(testingJobMasterService.isClosed()).isTrue();
        FlinkAssertions.assertThatFuture(serviceProcess.getResultFuture())
                .eventuallyFailsWith(ExecutionException.class)
                .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
                .anySatisfy(
                        t -> Assertions.assertThat(t).isInstanceOf(JobNotFinishedException.class));
    }

    /**
     * An exceptional completion of the service's termination future must fail the result
     * future with that same exception somewhere in the cause chain.
     */
    @Test
    void testJobMasterTerminationIsHandled() {
        final CompletableFuture<JobMasterService> jobMasterServiceFuture =
                new CompletableFuture<>();
        final DefaultJobMasterServiceProcess serviceProcess =
                createTestInstance(jobMasterServiceFuture);
        final CompletableFuture<Void> jobMasterTerminationFuture = new CompletableFuture<>();
        final TestingJobMasterService testingJobMasterService =
                new TestingJobMasterService("localhost", jobMasterTerminationFuture, null);

        jobMasterServiceFuture.complete(testingJobMasterService);

        final RuntimeException testException =
                new RuntimeException("Fake exception from JobMaster");
        jobMasterTerminationFuture.completeExceptionally(testException);

        FlinkAssertions.assertThatFuture(serviceProcess.getResultFuture())
                .eventuallyFailsWith(ExecutionException.class)
                .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
                .anySatisfy(t -> Assertions.assertThat(t).isEqualTo(testException));
    }

    /**
     * Once the service is available, the gateway future of the process must be completed
     * with the gateway the service was constructed with.
     */
    @Test
    void testJobMasterGatewayGetsForwarded() {
        final CompletableFuture<JobMasterService> jobMasterServiceFuture =
                new CompletableFuture<>();
        final DefaultJobMasterServiceProcess serviceProcess =
                createTestInstance(jobMasterServiceFuture);
        final TestingJobMasterGateway testingGateway =
                new TestingJobMasterGatewayBuilder().build();
        final TestingJobMasterService testingJobMasterService =
                new TestingJobMasterService("localhost", null, testingGateway);

        jobMasterServiceFuture.complete(testingJobMasterService);

        Assertions.assertThat(serviceProcess.getJobMasterGatewayFuture())
                .isCompletedWithValue(testingGateway);
    }

    /**
     * Once the service is available, the leader address future of the process must be
     * completed with the address the service was constructed with.
     */
    @Test
    void testLeaderAddressGetsForwarded() throws Exception {
        final CompletableFuture<JobMasterService> jobMasterServiceFuture =
                new CompletableFuture<>();
        final DefaultJobMasterServiceProcess serviceProcess =
                createTestInstance(jobMasterServiceFuture);
        final String testingAddress = "yolohost";
        final TestingJobMasterService testingJobMasterService =
                new TestingJobMasterService(testingAddress, null, null);

        jobMasterServiceFuture.complete(testingJobMasterService);

        Assertions.assertThat(serviceProcess.getLeaderAddressFuture())
                .isCompletedWithValue(testingAddress);
    }

    /** While the service future is still pending, the process must not report as running. */
    @Test
    void testIsNotInitialized() {
        final DefaultJobMasterServiceProcess serviceProcess =
                createTestInstance(new CompletableFuture<>());

        Assertions.assertThat(serviceProcess.isInitializedAndRunning()).isFalse();
    }

    /** Once the service future completes normally, the process must report as running. */
    @Test
    void testIsInitialized() {
        final CompletableFuture<JobMasterService> jobMasterServiceFuture =
                new CompletableFuture<>();
        final DefaultJobMasterServiceProcess serviceProcess =
                createTestInstance(jobMasterServiceFuture);

        jobMasterServiceFuture.complete(new TestingJobMasterService());

        Assertions.assertThat(serviceProcess.isInitializedAndRunning()).isTrue();
    }

    /**
     * After a close has been requested, the process must no longer report as running.
     *
     * <p>The future returned by {@code closeAsync()} is not awaited; the state is checked
     * right after the close request.
     */
    @Test
    void testIsNotInitializedAfterClosing() {
        final CompletableFuture<JobMasterService> jobMasterServiceFuture =
                new CompletableFuture<>();
        final DefaultJobMasterServiceProcess serviceProcess =
                createTestInstance(jobMasterServiceFuture);

        jobMasterServiceFuture.complete(new TestingJobMasterService());
        serviceProcess.closeAsync();

        Assertions.assertThat(serviceProcess.isInitializedAndRunning()).isFalse();
    }

    /**
     * Reporting a globally terminal state must complete the result future with a successful
     * result whose archived execution graph is in state {@link JobStatus#FINISHED}.
     */
    @Test
    void testSuccessOnTerminalState() throws Exception {
        final CompletableFuture<JobMasterService> jobMasterServiceFuture =
                new CompletableFuture<>();
        final DefaultJobMasterServiceProcess serviceProcess =
                createTestInstance(jobMasterServiceFuture);
        jobMasterServiceFuture.complete(new TestingJobMasterService());

        final ArchivedExecutionGraph archivedExecutionGraph =
                new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();
        serviceProcess.jobReachedGloballyTerminalState(
                new ExecutionGraphInfo(archivedExecutionGraph));

        Assertions.assertThat(serviceProcess.getResultFuture())
                .isCompletedWithValueMatching(JobManagerRunnerResult::isSuccess)
                .isCompletedWithValueMatching(
                        r ->
                                r.getExecutionGraphInfo().getArchivedExecutionGraph().getState()
                                        == JobStatus.FINISHED);
    }

    /**
     * Creates the process under test for {@link #jobId} with a random leader session id.
     *
     * @param jobMasterServiceFuture future returned by the testing service factory; the
     *     calling test completes it to simulate the outcome of the service initialization
     * @return a new process wired to the given future and the failed-graph factory
     */
    private DefaultJobMasterServiceProcess createTestInstance(
            CompletableFuture<JobMasterService> jobMasterServiceFuture) {
        return new DefaultJobMasterServiceProcess(
                jobId,
                UUID.randomUUID(),
                new TestingJobMasterServiceFactory(ignored -> jobMasterServiceFuture),
                failedArchivedExecutionGraphFactory);
    }
}

