/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink.committables;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
import org.apache.flink.shaded.guava32.com.google.common.collect.Streams;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.IntegerSerializer;
import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
import org.apache.flink.streaming.runtime.operators.sink.committables.SubtaskCommittableManager;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;

class CommittableCollectorSerializerTest {
    private static final SimpleVersionedSerializer<Integer> COMMITTABLE_SERIALIZER = new IntegerSerializer();
    private static final int SUBTASK_ID = 1;
    private static final int NUMBER_OF_SUBTASKS = 1;
    private static final SinkCommitterMetricGroup METRIC_GROUP = MetricsGroupTestUtils.mockCommitterMetricGroup();
    private static final CommittableCollectorSerializer<Integer> SERIALIZER = new CommittableCollectorSerializer(COMMITTABLE_SERIALIZER, 1, 1, METRIC_GROUP);

    CommittableCollectorSerializerTest() {
    }

    @Test
    void testCommittableCollectorV1SerDe() throws IOException {
        List<Integer> legacyState = Arrays.asList(1, 2, 3);
        DataOutputSerializer out = new DataOutputSerializer(256);
        out.writeInt(-1189141204);
        SimpleVersionedSerialization.writeVersionAndSerializeList(COMMITTABLE_SERIALIZER, legacyState, (DataOutputView)out);
        byte[] serialized = out.getCopyOfBuffer();
        CommittableCollector committableCollector = SERIALIZER.deserialize(1, serialized);
        Assertions.assertThat((boolean)committableCollector.isFinished()).isFalse();
        ((ListAssert)((ObjectAssert)Assertions.assertThat((Collection)committableCollector.getCheckpointCommittables()).singleElement()).extracting(checkpointCommittable -> checkpointCommittable.getSubtaskCommittableManager(0).getPendingRequests().map(CommitRequestImpl::getCommittable), InstanceOfAssertFactories.stream(Integer.class))).containsExactly((Object[])new Integer[]{1, 2, 3});
    }

    @Test
    void testCommittableCollectorV2SerDe() throws IOException {
        int subtaskId = 2;
        int numberOfSubtasks = 3;
        CommittableCollectorSerializer ccSerializer = new CommittableCollectorSerializer(COMMITTABLE_SERIALIZER, subtaskId, numberOfSubtasks, METRIC_GROUP);
        CommittableCollector committableCollector = new CommittableCollector(METRIC_GROUP);
        committableCollector.addMessage((CommittableMessage)new CommittableSummary(subtaskId, numberOfSubtasks, 1L, 1, 1, 0));
        committableCollector.addMessage((CommittableMessage)new CommittableSummary(subtaskId, numberOfSubtasks, 2L, 1, 1, 0));
        committableCollector.addMessage((CommittableMessage)new CommittableWithLineage((Object)1, 1L, subtaskId));
        committableCollector.addMessage((CommittableMessage)new CommittableWithLineage((Object)2, 2L, subtaskId));
        CommittableCollector copy = ccSerializer.deserialize(2, SERIALIZER.serialize(committableCollector));
        Assertions.assertThat((boolean)copy.isFinished()).isFalse();
        this.assertCommittableCollector("Original CommittableCollector", subtaskId, numberOfSubtasks, (CommittableCollector<Integer>)committableCollector, Arrays.asList(Collections.singletonList(1), Collections.singletonList(2)));
        this.assertCommittableCollector("Deserialized CommittableCollector", subtaskId, numberOfSubtasks, (CommittableCollector<Integer>)copy, Arrays.asList(Collections.singletonList(1), Collections.singletonList(2)));
    }

    @Test
    void testCommittablesForSameSubtaskIdV2SerDe() throws IOException {
        int subtaskId = 1;
        int numberOfSubtasks = 3;
        CommittableCollectorSerializer ccSerializer = new CommittableCollectorSerializer(COMMITTABLE_SERIALIZER, subtaskId, numberOfSubtasks, METRIC_GROUP);
        CommittableCollector committableCollector = new CommittableCollector(METRIC_GROUP);
        committableCollector.addMessage((CommittableMessage)new CommittableSummary(subtaskId, numberOfSubtasks, 1L, 1, 1, 0));
        committableCollector.addMessage((CommittableMessage)new CommittableSummary(subtaskId + 1, numberOfSubtasks, 1L, 1, 1, 0));
        committableCollector.addMessage((CommittableMessage)new CommittableWithLineage((Object)1, 1L, subtaskId));
        committableCollector.addMessage((CommittableMessage)new CommittableWithLineage((Object)1, 1L, subtaskId + 1));
        CommittableCollector copy = ccSerializer.deserialize(2, SERIALIZER.serialize(committableCollector));
        Assertions.assertThat((boolean)copy.isFinished()).isFalse();
        this.assertCommittableCollector("Original CommittableCollector", subtaskId, numberOfSubtasks, (CommittableCollector<Integer>)committableCollector, Collections.singletonList(Collections.singletonList(1)));
        Assertions.assertThat((Object)copy).isEqualTo((Object)committableCollector);
    }

    @Test
    void testAlignSubtaskCommittableManagerCheckpointWithCheckpointCommittableManagerCheckpointId() throws IOException {
        long checkpointId = 2L;
        CommittableCollector committableCollector = new CommittableCollector(METRIC_GROUP);
        committableCollector.addMessage((CommittableMessage)new CommittableSummary(1, 1, checkpointId, 1, 1, 0));
        committableCollector.addMessage((CommittableMessage)new CommittableWithLineage((Object)1, checkpointId, 1));
        CommittableCollector copy = SERIALIZER.deserialize(2, SERIALIZER.serialize(committableCollector));
        Collection checkpointCommittables = copy.getCheckpointCommittables();
        Assertions.assertThat((Collection)checkpointCommittables).hasSize(1);
        CheckpointCommittableManagerImpl committableManager = (CheckpointCommittableManagerImpl)checkpointCommittables.iterator().next();
        Assertions.assertThat((long)committableManager.getSubtaskCommittableManager(1).getCheckpointId()).isEqualTo(committableManager.getCheckpointId());
    }

    private void assertCommittableCollector(String assertMessageHeading, int subtaskId, int numberOfSubtasks, CommittableCollector<Integer> committableCollector, List<List<Integer>> committablesPerSubtaskPerCheckpoint) {
        org.junit.jupiter.api.Assertions.assertAll((String)assertMessageHeading, (Executable[])new Executable[]{() -> {
            Collection checkpointCommittables = committableCollector.getCheckpointCommittables();
            int expectedCommittableSize = committablesPerSubtaskPerCheckpoint.size();
            Assertions.assertThat((Collection)checkpointCommittables).hasSize(expectedCommittableSize);
            Streams.zip(checkpointCommittables.stream(), committablesPerSubtaskPerCheckpoint.stream(), Pair::of).forEach(pair -> {
                CheckpointCommittableManagerImpl checkpointCommittableManager = (CheckpointCommittableManagerImpl)pair.getKey();
                List expectedPendingRequestCount = (List)pair.getValue();
                SubtaskCommittableManager subtaskCommittableManager = checkpointCommittableManager.getSubtaskCommittableManager(subtaskId);
                SinkV2Assertions.assertThat(checkpointCommittableManager.getSummary(subtaskId, numberOfSubtasks)).hasSubtaskId(subtaskId).hasNumberOfSubtasks(numberOfSubtasks);
                this.assertPendingRequests((SubtaskCommittableManager<Integer>)subtaskCommittableManager, expectedPendingRequestCount);
                Assertions.assertThat((int)subtaskCommittableManager.getSubtaskId()).isEqualTo(subtaskId);
            });
        }});
    }

    private void assertPendingRequests(SubtaskCommittableManager<Integer> subtaskCommittableManagerCheckpoint, List<Integer> expectedPendingRequestCount) {
        Assertions.assertThat(subtaskCommittableManagerCheckpoint.getPendingRequests().map(CommitRequestImpl::getCommittable).collect(Collectors.toList())).containsExactlyElementsOf(expectedPendingRequestCount);
    }
}

