/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={TestLoggerExtension.class})
abstract class CheckpointIDCounterTestBase {
    CheckpointIDCounterTestBase() {
    }

    protected abstract CheckpointIDCounter createCheckpointIdCounter() throws Exception;

    @Test
    void testCounterIsNeverNegative() throws Exception {
        CheckpointIDCounter counter = this.createCheckpointIdCounter();
        try {
            counter.start();
            Assertions.assertThat((long)counter.get()).isGreaterThanOrEqualTo(0L);
        }
        finally {
            counter.shutdown(JobStatus.FINISHED).join();
        }
    }

    @Test
    void testSerialIncrementAndGet() throws Exception {
        CheckpointIDCounter counter = this.createCheckpointIdCounter();
        try {
            counter.start();
            Assertions.assertThat((long)counter.getAndIncrement()).isOne();
            Assertions.assertThat((long)counter.get()).isEqualTo(2L);
            Assertions.assertThat((long)counter.getAndIncrement()).isEqualTo(2L);
            Assertions.assertThat((long)counter.get()).isEqualTo(3L);
            Assertions.assertThat((long)counter.getAndIncrement()).isEqualTo(3L);
            Assertions.assertThat((long)counter.get()).isEqualTo(4L);
            Assertions.assertThat((long)counter.getAndIncrement()).isEqualTo(4L);
        }
        finally {
            counter.shutdown(JobStatus.FINISHED).join();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testConcurrentGetAndIncrement() throws Exception {
        int numThreads = 8;
        CountDownLatch startLatch = new CountDownLatch(1);
        CheckpointIDCounter counter = this.createCheckpointIdCounter();
        counter.start();
        ExecutorService executor = null;
        try {
            executor = Executors.newFixedThreadPool(8);
            ArrayList<Future<List<Long>>> resultFutures = new ArrayList<Future<List<Long>>>(8);
            for (int i = 0; i < 8; ++i) {
                resultFutures.add(executor.submit(new Incrementer(startLatch, counter)));
            }
            startLatch.countDown();
            int expectedTotal = 1024;
            ArrayList<Long> all = new ArrayList<Long>(1024);
            for (Future future : resultFutures) {
                List counts = (List)future.get();
                CheckpointIDCounterTestBase.assertStrictlyMonotonous(counts);
                all.addAll(counts);
            }
            Collections.sort(all);
            Assertions.assertThat((int)all.size()).isEqualTo(1024);
            CheckpointIDCounterTestBase.assertStrictlyMonotonous(all);
            long lastCheckpointId = (Long)all.get(all.size() - 1);
            Assertions.assertThat((long)lastCheckpointId).isLessThan(counter.get());
            Assertions.assertThat((long)lastCheckpointId).isLessThan(counter.getAndIncrement());
        }
        finally {
            if (executor != null) {
                executor.shutdown();
            }
            counter.shutdown(JobStatus.FINISHED).join();
        }
    }

    private static void assertStrictlyMonotonous(List<Long> checkpointIds) {
        long current = -1L;
        for (long checkpointId : checkpointIds) {
            Assertions.assertThat((long)current).isLessThan(checkpointId);
            current = checkpointId;
        }
    }

    @Test
    void testSetCount() throws Exception {
        CheckpointIDCounter counter = this.createCheckpointIdCounter();
        counter.start();
        counter.setCount(1337L);
        Assertions.assertThat((long)counter.get()).isEqualTo(1337L);
        Assertions.assertThat((long)counter.getAndIncrement()).isEqualTo(1337L);
        Assertions.assertThat((long)counter.get()).isEqualTo(1338L);
        Assertions.assertThat((long)counter.getAndIncrement()).isEqualTo(1338L);
        counter.shutdown(JobStatus.FINISHED).join();
    }

    private static class Incrementer
    implements Callable<List<Long>> {
        private static final int NumIncrements = 128;
        private final CountDownLatch startLatch;
        private final CheckpointIDCounter counter;

        public Incrementer(CountDownLatch startLatch, CheckpointIDCounter counter) {
            this.startLatch = startLatch;
            this.counter = counter;
        }

        @Override
        public List<Long> call() throws Exception {
            Random rand = new Random();
            ArrayList<Long> counts = new ArrayList<Long>();
            this.startLatch.await();
            for (int i = 0; i < 128; ++i) {
                counts.add(this.counter.getAndIncrement());
                Thread.sleep(rand.nextInt(20));
            }
            return counts;
        }
    }
}

