/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.mocks.MockSourceReader;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.SourceOperatorTestContext;
import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.util.CollectionUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SourceOperatorTest {
    @Nullable
    private SourceOperatorTestContext context;
    @Nullable
    private SourceOperator<Integer, MockSourceSplit> operator;
    @Nullable
    private MockSourceReader mockSourceReader;
    @Nullable
    private MockOperatorEventGateway mockGateway;

    @Before
    public void setup() throws Exception {
        this.context = new SourceOperatorTestContext();
        this.operator = this.context.getOperator();
        this.mockSourceReader = this.context.getSourceReader();
        this.mockGateway = this.context.getGateway();
    }

    @After
    public void tearDown() throws Exception {
        this.context.close();
        this.context = null;
        this.operator = null;
        this.mockSourceReader = null;
        this.mockGateway = null;
    }

    @Test
    public void testInitializeState() throws Exception {
        StateInitializationContext stateContext = this.context.createStateContext();
        this.operator.initializeState(stateContext);
        Assert.assertNotNull((Object)stateContext.getOperatorStateStore().getListState(SourceOperator.SPLITS_STATE_DESC));
    }

    @Test
    public void testOpen() throws Exception {
        this.operator.initializeState(this.context.createStateContext());
        this.operator.open();
        Assert.assertEquals(Collections.singletonList(SourceOperatorTestContext.MOCK_SPLIT), (Object)this.mockSourceReader.getAssignedSplits());
        Assert.assertTrue((boolean)this.mockSourceReader.isStarted());
        Assert.assertEquals((long)1L, (long)this.mockGateway.getEventsSent().size());
        OperatorEvent operatorEvent = (OperatorEvent)this.mockGateway.getEventsSent().get(0);
        Assert.assertTrue((boolean)(operatorEvent instanceof ReaderRegistrationEvent));
        Assert.assertEquals((long)1L, (long)((ReaderRegistrationEvent)operatorEvent).subtaskId());
    }

    @Test
    public void testStop() throws Exception {
        this.operator.initializeState(this.context.createStateContext());
        this.operator.open();
        Assert.assertEquals(Collections.singletonList(SourceOperatorTestContext.MOCK_SPLIT), (Object)this.mockSourceReader.getAssignedSplits());
        CollectingDataOutput dataOutput = new CollectingDataOutput();
        Assert.assertEquals((Object)DataInputStatus.NOTHING_AVAILABLE, (Object)this.operator.emitNext(dataOutput));
        Assert.assertFalse((boolean)this.operator.isAvailable());
        CompletableFuture sourceStopped = this.operator.stop(StopMode.DRAIN);
        Assert.assertTrue((boolean)this.operator.isAvailable());
        Assert.assertFalse((boolean)sourceStopped.isDone());
        Assert.assertEquals((Object)DataInputStatus.END_OF_DATA, (Object)this.operator.emitNext(dataOutput));
        this.operator.finish();
        Assert.assertTrue((boolean)sourceStopped.isDone());
    }

    @Test
    public void testHandleAddSplitsEvent() throws Exception {
        this.operator.initializeState(this.context.createStateContext());
        this.operator.open();
        MockSourceSplit newSplit = new MockSourceSplit(2);
        this.operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Collections.singletonList(newSplit), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        Assert.assertEquals(Arrays.asList(SourceOperatorTestContext.MOCK_SPLIT, newSplit), (Object)this.mockSourceReader.getAssignedSplits());
    }

    @Test
    public void testHandleAddSourceEvent() throws Exception {
        this.operator.initializeState(this.context.createStateContext());
        this.operator.open();
        SourceEvent event = new SourceEvent(){};
        this.operator.handleOperatorEvent((OperatorEvent)new SourceEventWrapper(event));
        Assert.assertEquals(Collections.singletonList(event), (Object)this.mockSourceReader.getReceivedSourceEvents());
    }

    @Test
    public void testSnapshotState() throws Exception {
        StateInitializationContext stateContext = this.context.createStateContext();
        this.operator.initializeState(stateContext);
        this.operator.open();
        MockSourceSplit newSplit = new MockSourceSplit(2);
        this.operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Collections.singletonList(newSplit), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        this.operator.snapshotState((StateSnapshotContext)new StateSnapshotContextSynchronousImpl(100L, 100L));
        List splitsInState = CollectionUtil.iterableToList((Iterable)((Iterable)this.operator.getReaderState().get()));
        Assert.assertEquals(Arrays.asList(SourceOperatorTestContext.MOCK_SPLIT, newSplit), (Object)splitsInState);
    }

    @Test
    public void testNotifyCheckpointComplete() throws Exception {
        StateInitializationContext stateContext = this.context.createStateContext();
        this.operator.initializeState(stateContext);
        this.operator.open();
        this.operator.snapshotState((StateSnapshotContext)new StateSnapshotContextSynchronousImpl(100L, 100L));
        this.operator.notifyCheckpointComplete(100L);
        Assert.assertEquals((long)100L, (long)((Long)this.mockSourceReader.getCompletedCheckpoints().get(0)));
    }

    @Test
    public void testNotifyCheckpointAborted() throws Exception {
        StateInitializationContext stateContext = this.context.createStateContext();
        this.operator.initializeState(stateContext);
        this.operator.open();
        this.operator.snapshotState((StateSnapshotContext)new StateSnapshotContextSynchronousImpl(100L, 100L));
        this.operator.notifyCheckpointAborted(100L);
        Assert.assertEquals((long)100L, (long)((Long)this.mockSourceReader.getAbortedCheckpoints().get(0)));
    }
}

