/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.util.LatencyStats;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class LatencyStatsTest {
    private static final OperatorID OPERATOR_ID = new OperatorID();
    private static final OperatorID SOURCE_ID_1 = new OperatorID();
    private static final OperatorID SOURCE_ID_2 = new OperatorID();
    private static final int OPERATOR_SUBTASK_INDEX = 64;
    private static final String PARENT_GROUP_NAME = "parent";

    LatencyStatsTest() {
    }

    @Test
    void testLatencyStatsSingle() {
        LatencyStatsTest.testLatencyStats(LatencyStats.Granularity.SINGLE, registrations -> {
            Assertions.assertThat((List)registrations).hasSize(1);
            Tuple2 registration = (Tuple2)registrations.get(0);
            LatencyStatsTest.assertName((String)registration.f0);
            Assertions.assertThat((long)((Histogram)registration.f1).getCount()).isEqualTo(5L);
        });
    }

    @Test
    void testLatencyStatsOperator() {
        LatencyStatsTest.testLatencyStats(LatencyStats.Granularity.OPERATOR, registrations -> {
            Assertions.assertThat((List)registrations).hasSize(2);
            Tuple2 registration = (Tuple2)registrations.get(0);
            LatencyStatsTest.assertName((String)registration.f0, SOURCE_ID_1);
            Assertions.assertThat((long)((Histogram)registration.f1).getCount()).isEqualTo(3L);
            registration = (Tuple2)registrations.get(1);
            LatencyStatsTest.assertName((String)registration.f0, SOURCE_ID_2);
            Assertions.assertThat((long)((Histogram)registration.f1).getCount()).isEqualTo(2L);
        });
    }

    @Test
    void testLatencyStatsSubtask() {
        LatencyStatsTest.testLatencyStats(LatencyStats.Granularity.SUBTASK, registrations -> {
            Assertions.assertThat((List)registrations).hasSize(4);
            Tuple2 registration = (Tuple2)registrations.get(0);
            LatencyStatsTest.assertName((String)registration.f0, SOURCE_ID_1, 0);
            Assertions.assertThat((long)((Histogram)registration.f1).getCount()).isEqualTo(2L);
            registration = (Tuple2)registrations.get(1);
            LatencyStatsTest.assertName((String)registration.f0, SOURCE_ID_1, 1);
            Assertions.assertThat((long)((Histogram)registration.f1).getCount()).isOne();
            registration = (Tuple2)registrations.get(2);
            LatencyStatsTest.assertName((String)registration.f0, SOURCE_ID_2, 2);
            Assertions.assertThat((long)((Histogram)registration.f1).getCount()).isOne();
            registration = (Tuple2)registrations.get(3);
            LatencyStatsTest.assertName((String)registration.f0, SOURCE_ID_2, 3);
            Assertions.assertThat((long)((Histogram)registration.f1).getCount()).isOne();
        });
    }

    private static void testLatencyStats(LatencyStats.Granularity granularity, Consumer<List<Tuple2<String, Histogram>>> verifier) {
        InternalOperatorMetricGroup dummyGroup = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
        ArrayList latencyHistograms = new ArrayList(4);
        TestingMetricRegistry registry = TestingMetricRegistry.builder().setRegisterConsumer((metric, metricName, group) -> {
            if (metric instanceof Histogram) {
                latencyHistograms.add(Tuple2.of((Object)group.getMetricIdentifier(metricName), (Object)((Histogram)metric)));
            }
        }).build();
        GenericMetricGroup parentGroup = new GenericMetricGroup((MetricRegistry)registry, (AbstractMetricGroup)dummyGroup, PARENT_GROUP_NAME);
        LatencyStats latencyStats = new LatencyStats((MetricGroup)parentGroup, ((Integer)MetricOptions.LATENCY_HISTORY_SIZE.defaultValue()).intValue(), 64, OPERATOR_ID, granularity);
        latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0));
        latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0));
        latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 1));
        latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 2));
        latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 3));
        verifier.accept(latencyHistograms);
    }

    private static String sanitizeName(String registrationName) {
        return registrationName.substring(registrationName.lastIndexOf(PARENT_GROUP_NAME) + PARENT_GROUP_NAME.length() + 1);
    }

    private static void assertName(String registrationName) {
        String sanitizedName = LatencyStatsTest.sanitizeName(registrationName);
        Assertions.assertThat((String)sanitizedName).isEqualTo("operator_id." + OPERATOR_ID + ".operator_subtask_index.64.latency");
    }

    private static void assertName(String registrationName, OperatorID sourceId) {
        String sanitizedName = LatencyStatsTest.sanitizeName(registrationName);
        Assertions.assertThat((String)sanitizedName).isEqualTo("source_id." + sourceId + ".operator_id." + OPERATOR_ID + ".operator_subtask_index.64.latency");
    }

    private static void assertName(String registrationName, OperatorID sourceId, int sourceIndex) {
        String sanitizedName = LatencyStatsTest.sanitizeName(registrationName);
        Assertions.assertThat((String)sanitizedName).isEqualTo("source_id." + sourceId + ".source_subtask_index." + sourceIndex + ".operator_id." + OPERATOR_ID + ".operator_subtask_index.64.latency");
    }
}

