/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline.metrics;

import io.debezium.annotation.ThreadSafe;
import io.debezium.connector.base.ChangeEventQueueMetrics;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.metrics.Metrics;
import io.debezium.pipeline.metrics.ChangeEventSourceMetricsMXBean;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Clock;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.connect.data.Struct;

/**
 * Base class for change-event-source metrics that are fed by
 * {@link DataChangeEventListener} callbacks and read through
 * {@link ChangeEventSourceMetricsMXBean} getters.
 *
 * <p>Counters are held in {@link AtomicLong}s and the last-event summary in a
 * {@code volatile} field, so each individual value can be read and written
 * from different threads. The values are not updated as one atomic group: a
 * reader may observe a counter that has already been bumped together with a
 * last-event summary that has not yet been replaced.
 */
@ThreadSafe
public abstract class PipelineMetrics
extends Metrics
implements DataChangeEventListener,
ChangeEventSourceMetricsMXBean {
    /** Produces the summary string stored as the "last event". */
    protected final EventMetadataProvider metadataProvider;
    /** Incremented for every event reported to this listener: regular, filtered or erroneous. */
    protected final AtomicLong totalNumberOfEventsSeen = new AtomicLong();
    /** Incremented by {@link #onFilteredEvent(String)}. */
    private final AtomicLong numberOfEventsFiltered = new AtomicLong();
    /** Incremented by {@link #onErroneousEvent(String)}. */
    protected final AtomicLong numberOfErroneousEvents = new AtomicLong();
    /** Clock reading (milliseconds) taken when the latest event was reported; {@code -1} until one is. */
    protected final AtomicLong lastEventTimestamp = new AtomicLong(-1L);
    /** Summary of the latest regular event; {@code null} until one is reported or after {@link #reset()}. */
    private volatile String lastEvent;
    /** Time source, taken from the task context in the constructor. */
    protected final Clock clock;
    /** Source of the queue capacity figures exposed by this bean. */
    private final ChangeEventQueueMetrics changeEventQueueMetrics;
    /** Task context this metrics instance was created for. */
    protected final CdcSourceTaskContext taskContext;

    /**
     * Creates the metrics instance.
     *
     * @param taskContext task context; passed to the {@link Metrics} superclass and used to obtain the clock
     * @param contextName context name passed through to the {@link Metrics} superclass
     * @param changeEventQueueMetrics provider of queue total/remaining capacity
     * @param metadataProvider provider used to summarize events for {@link #getLastEvent()}
     * @param <T> concrete task context type
     */
    protected <T extends CdcSourceTaskContext> PipelineMetrics(T taskContext, String contextName, ChangeEventQueueMetrics changeEventQueueMetrics, EventMetadataProvider metadataProvider) {
        super(taskContext, contextName);
        this.taskContext = taskContext;
        this.clock = taskContext.getClock();
        this.changeEventQueueMetrics = changeEventQueueMetrics;
        this.metadataProvider = metadataProvider;
    }

    /**
     * Records a regular event: updates the total count and last-event
     * timestamp, then stores the summary string produced by the metadata
     * provider as the last event.
     */
    @Override
    public void onEvent(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
        this.updateCommonEventMetrics();
        this.lastEvent = this.metadataProvider.toSummaryString(source, offset, key, value);
    }

    /** Bumps the total event counter and stamps the current clock time as the last-event time. */
    private void updateCommonEventMetrics() {
        this.totalNumberOfEventsSeen.incrementAndGet();
        this.lastEventTimestamp.set(this.clock.currentTimeInMillis());
    }

    /**
     * Records a filtered event. It counts towards both the filtered and the
     * total counters; the last-event summary is left unchanged.
     *
     * @param event not used by this implementation
     */
    @Override
    public void onFilteredEvent(String event) {
        this.numberOfEventsFiltered.incrementAndGet();
        this.updateCommonEventMetrics();
    }

    /**
     * Records an erroneous event. It counts towards both the erroneous and
     * the total counters; the last-event summary is left unchanged.
     *
     * @param event not used by this implementation
     */
    @Override
    public void onErroneousEvent(String event) {
        this.numberOfErroneousEvents.incrementAndGet();
        this.updateCommonEventMetrics();
    }

    /**
     * @return summary of the latest regular event, or {@code null} if none
     *         has been reported since construction or the last {@link #reset()}
     */
    @Override
    public String getLastEvent() {
        return this.lastEvent;
    }

    /**
     * @return milliseconds between the latest reported event and the current
     *         clock time, or {@code -1} if no event has been reported
     */
    @Override
    public long getMilliSecondsSinceLastEvent() {
        // Read the timestamp exactly once. Reading it separately for the
        // sentinel check and for the subtraction lets a concurrent reset()
        // slip in between, which would yield "now - (-1)" instead of -1.
        final long lastSeen = this.lastEventTimestamp.get();
        if (lastSeen == -1L) {
            return -1L;
        }
        return this.clock.currentTimeInMillis() - lastSeen;
    }

    /** @return number of events reported so far, including filtered and erroneous ones */
    @Override
    public long getTotalNumberOfEventsSeen() {
        return this.totalNumberOfEventsSeen.get();
    }

    /** @return number of events reported through {@link #onFilteredEvent(String)} */
    @Override
    public long getNumberOfEventsFiltered() {
        return this.numberOfEventsFiltered.get();
    }

    /** @return number of events reported through {@link #onErroneousEvent(String)} */
    @Override
    public long getNumberOfErroneousEvents() {
        return this.numberOfErroneousEvents.get();
    }

    /**
     * Restores every counter, the last-event timestamp and the last-event
     * summary to their initial values. The fields are cleared one by one, not
     * as a single atomic step.
     */
    @Override
    public void reset() {
        this.totalNumberOfEventsSeen.set(0L);
        this.lastEventTimestamp.set(-1L);
        this.numberOfEventsFiltered.set(0L);
        this.numberOfErroneousEvents.set(0L);
        this.lastEvent = null;
    }

    /** @return total capacity as reported by the change event queue metrics */
    @Override
    public int getQueueTotalCapacity() {
        return this.changeEventQueueMetrics.totalCapacity();
    }

    /** @return remaining capacity as reported by the change event queue metrics */
    @Override
    public int getQueueRemainingCapacity() {
        return this.changeEventQueueMetrics.remainingCapacity();
    }
}

