/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.io.Serializable;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class MailboxOperatorTest {
    MailboxOperatorTest() {
    }

    @Test
    void testAvoidTaskStarvation() throws Exception {
        int numRecords = 3;
        StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder(OneInputStreamTask::new, BasicTypeInfo.INT_TYPE_INFO).addInput((TypeInformation<?>)BasicTypeInfo.INT_TYPE_INFO).setupOperatorChain((StreamOperator<?>)new StreamMap((MapFunction & Serializable)i1 -> i1)).chain(new StreamMap((MapFunction & Serializable)i -> i), IntSerializer.INSTANCE).finish();
        try (StreamTaskMailboxTestHarness<Integer> testHarness = builder.build();){
            ReplicatingMail mail1 = this.createReplicatingMail(3, testHarness, 0);
            ReplicatingMail mail2 = this.createReplicatingMail(3, testHarness, 1);
            for (int i2 = 0; i2 < 3; ++i2) {
                testHarness.processElement(new StreamRecord((Object)i2));
            }
            while (testHarness.getOutput().size() < 3) {
                testHarness.processSingleStep();
            }
            int[] output = testHarness.getOutput().stream().mapToInt(r -> (Integer)((StreamRecord)r).getValue()).toArray();
            Assertions.assertThat((int[])output).isEqualTo((Object)IntStream.range(0, 3).toArray());
            Assertions.assertThat((int)mail1.getMailCount()).isEqualTo(4);
            Assertions.assertThat((int)mail2.getMailCount()).isEqualTo(4);
        }
    }

    @Nonnull
    private ReplicatingMail createReplicatingMail(int numRecords, StreamTaskMailboxTestHarness<Integer> testHarness, int priority) {
        MailboxExecutor mailboxExecutor = testHarness.getExecutor(priority);
        ReplicatingMail mail1 = new ReplicatingMail(mailboxExecutor, numRecords + 1);
        mailboxExecutor.submit((RunnableWithException)mail1, "Initial mail");
        return mail1;
    }

    private static class ReplicatingMail
    implements RunnableWithException {
        private int mailCount = -1;
        private final MailboxExecutor mailboxExecutor;
        private final int maxMails;

        ReplicatingMail(MailboxExecutor mailboxExecutor, int maxMails) {
            this.mailboxExecutor = mailboxExecutor;
            this.maxMails = maxMails;
        }

        public void run() {
            try {
                if (this.mailCount < this.maxMails) {
                    this.mailboxExecutor.execute((ThrowingRunnable)this, "Blocking mail" + ++this.mailCount);
                }
            }
            catch (RejectedExecutionException rejectedExecutionException) {
                // empty catch block
            }
        }

        int getMailCount() {
            return this.mailCount;
        }
    }
}

