/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.image.loader;

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.image.loader.LogDeltaManifest;
import org.apache.kafka.image.loader.MetadataLoaderMetrics;
import org.apache.kafka.image.loader.SnapshotManifest;
import org.apache.kafka.image.publisher.MetadataPublisher;
import org.apache.kafka.image.writer.ImageReWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.FaultHandlerException;
import org.apache.kafka.snapshot.SnapshotReader;
import org.slf4j.Logger;

public class MetadataLoader
implements RaftClient.Listener<ApiMessageAndVersion>,
AutoCloseable {
    private static final String INITIALIZE_NEW_PUBLISHERS = "InitializeNewPublishers";
    private final Logger log;
    private final Time time;
    private final FaultHandler faultHandler;
    private final MetadataLoaderMetrics metrics;
    private final Supplier<OptionalLong> highWaterMarkAccessor;
    private final LinkedHashMap<String, MetadataPublisher> uninitializedPublishers;
    private final LinkedHashMap<String, MetadataPublisher> publishers;
    private boolean catchingUp = true;
    private LeaderAndEpoch currentLeaderAndEpoch = LeaderAndEpoch.UNKNOWN;
    private MetadataImage image;
    private final KafkaEventQueue eventQueue;

    /**
     * Creates a loader. The constructor is private; instances are created through
     * {@link Builder#build()}.
     *
     * <p>The loader starts with {@link MetadataImage#EMPTY} as its current image and with no
     * publishers registered. Note that the event queue is always constructed with
     * {@link Time#SYSTEM}; the {@code time} argument is only stored for the loader's own
     * elapsed-time measurements.
     *
     * @param time                  clock stored for elapsed-time measurements
     * @param logContext            context used to create the logger and the event queue
     * @param threadNamePrefix      prefix handed to the event queue
     * @param faultHandler          receives every fault reported by the loader
     * @param metrics               metrics sink updated while loading
     * @param highWaterMarkAccessor supplies the current high water mark, if it is known
     */
    private MetadataLoader(
            Time time,
            LogContext logContext,
            String threadNamePrefix,
            FaultHandler faultHandler,
            MetadataLoaderMetrics metrics,
            Supplier<OptionalLong> highWaterMarkAccessor) {
        this.log = logContext.logger(MetadataLoader.class);
        this.time = time;
        this.faultHandler = faultHandler;
        this.metrics = metrics;
        this.highWaterMarkAccessor = highWaterMarkAccessor;
        // Diamond operator rather than raw types, so both maps are type-checked
        // against their declared <String, MetadataPublisher> parameters.
        this.uninitializedPublishers = new LinkedHashMap<>();
        this.publishers = new LinkedHashMap<>();
        this.image = MetadataImage.EMPTY;
        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix);
    }

    /**
     * Decides whether the loader is still in its initial catch-up phase.
     *
     * <p>Once this method has observed that the loaded offset reached the high water mark,
     * it clears the {@code catchingUp} flag and every later call returns {@code false}
     * without consulting the high water mark again.
     *
     * @param where  name of the calling operation, used as a prefix in log messages
     * @param offset the highest offset loaded so far
     * @return {@code true} if the high water mark is unknown or is more than one past
     *         {@code offset}; {@code false} otherwise
     */
    private boolean stillNeedToCatchUp(String where, long offset) {
        if (!catchingUp) {
            log.trace("{}: we are not in the initial catching up state.", where);
            return false;
        }
        final OptionalLong maybeHwm = highWaterMarkAccessor.get();
        if (!maybeHwm.isPresent()) {
            log.info("{}: the loader is still catching up because we still don't know the high water mark yet.", where);
            return true;
        }
        final long hwm = maybeHwm.getAsLong();
        if (offset < hwm - 1L) {
            // The last loaded offset is still behind the record just below the high water mark.
            log.info("{}: The loader is still catching up because we have loaded up to offset " + offset + ", but the high water mark is {}", where, hwm);
            return true;
        }
        log.info("{}: The loader finished catching up to the current high water mark of {}", where, hwm);
        catchingUp = false;
        return false;
    }

    /**
     * Schedules a deferred {@link #initializeNewPublishers()} run on the event queue under
     * the {@code InitializeNewPublishers} tag.
     *
     * <p>The deadline is computed from the event queue's own clock, not from the loader's
     * {@code time} field. Any throwable raised by the initialization is passed to the
     * fault handler instead of escaping from the event.
     *
     * @param delayNs delay, in nanoseconds, added to the event queue's current time
     */
    void scheduleInitializeNewPublishers(long delayNs) {
        long deadlineNs = this.eventQueue.time().nanoseconds() + delayNs;
        // No raw (Function) cast: the deadline function is passed with its own type
        // parameters so the call is checked against scheduleDeferred's signature.
        // NOTE(review): EarliestDeadlineFunction presumably keeps the earlier deadline when
        // an event with this tag is already scheduled -- confirm against EventQueue.
        this.eventQueue.scheduleDeferred(
            INITIALIZE_NEW_PUBLISHERS,
            new EventQueue.EarliestDeadlineFunction(deadlineNs),
            () -> {
                try {
                    this.initializeNewPublishers();
                } catch (Throwable e) {
                    this.faultHandler.handleFault("Unhandled error initializing new publishers", e);
                }
            });
    }

    /**
     * Hands the current image, as a snapshot, to every publisher that has been installed
     * but not yet initialized.
     *
     * <p>If there is nothing to initialize the method returns immediately. If the loader is
     * still catching up, it reschedules itself 100 ms later. Otherwise it rewrites the
     * current image into a delta based on {@link MetadataImage#EMPTY} and publishes that to
     * each waiting publisher. A publisher is removed from the uninitialized set before it is
     * published to; it is added to the active set only if publishing did not throw.
     */
    void initializeNewPublishers() {
        if (uninitializedPublishers.isEmpty()) {
            log.debug("InitializeNewPublishers: nothing to do.");
            return;
        }
        if (stillNeedToCatchUp("initializeNewPublishers", image.highestOffsetAndEpoch().offset())) {
            log.debug("InitializeNewPublishers: unable to initialize new publisher(s) {} because we are still catching up with quorum metadata. Rescheduling.", uninitializedPublisherNames());
            scheduleInitializeNewPublishers(TimeUnit.MILLISECONDS.toNanos(100L));
            return;
        }
        log.debug("InitializeNewPublishers: setting up snapshot image for new publisher(s): {}", uninitializedPublisherNames());

        // Re-express the whole current image as a delta on top of the empty image.
        final long beginNs = time.nanoseconds();
        MetadataDelta snapshotDelta = new MetadataDelta.Builder()
            .setImage(MetadataImage.EMPTY)
            .build();
        ImageReWriter reWriter = new ImageReWriter(snapshotDelta);
        ImageWriterOptions writerOptions = new ImageWriterOptions.Builder()
            .setMetadataVersion(image.features().metadataVersion())
            .build();
        image.write(reWriter, writerOptions);
        SnapshotManifest snapshotManifest =
            new SnapshotManifest(image.provenance(), time.nanoseconds() - beginNs);

        for (Iterator<MetadataPublisher> pending = uninitializedPublishers.values().iterator(); pending.hasNext(); ) {
            MetadataPublisher publisher = pending.next();
            // Removed up front: a publisher that fails below is not retried.
            pending.remove();
            try {
                log.info("InitializeNewPublishers: initializing {} with a snapshot at offset {}", publisher.name(), image.highestOffsetAndEpoch().offset());
                publisher.publishSnapshot(snapshotDelta, image, snapshotManifest);
                publishers.put(publisher.name(), publisher);
            } catch (Throwable t) {
                faultHandler.handleFault("Unhandled error initializing " + publisher.name() + " with a snapshot at offset " + image.highestOffsetAndEpoch().offset(), t);
            }
        }
    }

    /**
     * Builds a comma-separated list of the names of publishers awaiting initialization.
     *
     * @return the keys of the uninitialized-publisher map joined with {@code ", "}, in the
     *         map's iteration order
     */
    private String uninitializedPublisherNames() {
        final String separator = ", ";
        return String.join(separator, uninitializedPublishers.keySet());
    }

    /**
     * Enqueues the processing of a batch of committed records.
     *
     * <p>On the event queue thread the records are replayed into a delta on top of the
     * current image, the delta is applied to produce the new image, and, unless the loader
     * is still catching up, the delta is published to every active publisher. Faults are
     * reported to the fault handler rather than thrown.
     *
     * @param reader the committed batches; it is closed exactly once, by the {@code finally}
     *               block, on every path
     */
    @Override
    public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
        this.eventQueue.append(() -> {
            try {
                MetadataDelta delta = new MetadataDelta.Builder().setImage(this.image).build();
                LogDeltaManifest manifest = this.loadLogDelta(delta, reader);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("handleCommit: Generated a metadata delta between {} and {} from {} batch(es) in {} us.", new Object[]{this.image.offset(), manifest.provenance().lastContainedOffset(), manifest.numBatches(), TimeUnit.NANOSECONDS.toMicros(manifest.elapsedNs())});
                }
                try {
                    this.image = delta.apply(manifest.provenance());
                } catch (Throwable e) {
                    this.faultHandler.handleFault("Error generating new metadata image from metadata delta between offset " + this.image.offset() + " and " + manifest.provenance().lastContainedOffset(), e);
                    // The finally block below closes the reader; closing it here as well
                    // would close it twice.
                    return;
                }
                if (this.stillNeedToCatchUp("handleCommit", manifest.provenance().lastContainedOffset())) {
                    return;
                }
                if (this.log.isDebugEnabled()) {
                    this.log.debug("handleCommit: publishing new image with provenance {}.", (Object) this.image.provenance());
                }
                for (MetadataPublisher publisher : this.publishers.values()) {
                    try {
                        publisher.publishLogDelta(delta, this.image, manifest);
                    } catch (Throwable e) {
                        // One failing publisher must not prevent the others from being updated.
                        this.faultHandler.handleFault("Unhandled error publishing the new metadata image ending at " + manifest.provenance().lastContainedOffset() + " with publisher " + publisher.name(), e);
                    }
                }
                this.metrics.updateLastAppliedImageProvenance(this.image.provenance());
                // Kick off initialization only when publishers are actually waiting;
                // scheduling it with nothing waiting would be a no-op.
                if (!this.uninitializedPublishers.isEmpty()) {
                    this.scheduleInitializeNewPublishers(0L);
                }
            } catch (Throwable e) {
                // Catch-all for anything not covered by the narrower handlers above.
                this.faultHandler.handleFault("Unhandled fault in MetadataLoader#handleCommit. Last image offset was " + this.image.offset(), e);
            } finally {
                reader.close();
            }
        });
    }

    /**
     * Replays every record of every batch in {@code reader} into {@code delta}.
     *
     * <p>A record that fails to replay is reported to the fault handler and replay continues
     * with the next record. The batch size and the total processing time are reported to the
     * metrics object.
     *
     * @param delta  the delta that receives the replayed records
     * @param reader the batches to replay; it is not closed by this method
     * @return a manifest carrying the provenance of the last batch read (or the current
     *         image's provenance if the reader had no batches), the current leader and
     *         epoch, the number of batches, the elapsed nanoseconds and the byte count
     */
    LogDeltaManifest loadLogDelta(MetadataDelta delta, BatchReader<ApiMessageAndVersion> reader) {
        long startNs = this.time.nanoseconds();
        int numBatches = 0;
        long numBytes = 0L;
        // Start from the current image's provenance so an empty reader leaves it unchanged.
        long lastOffset = this.image.provenance().lastContainedOffset();
        int lastEpoch = this.image.provenance().lastContainedEpoch();
        long lastContainedLogTimeMs = this.image.provenance().lastContainedLogTimeMs();
        while (reader.hasNext()) {
            Batch<ApiMessageAndVersion> batch = reader.next();
            int indexWithinBatch = 0;
            for (ApiMessageAndVersion record : batch.records()) {
                try {
                    delta.replay(record.message());
                } catch (Throwable e) {
                    // Parenthesized so the record's offset is computed arithmetically instead
                    // of the index being appended to the base offset as text.
                    this.faultHandler.handleFault("Error loading metadata log record from offset " + (batch.baseOffset() + indexWithinBatch), e);
                }
                ++indexWithinBatch;
            }
            this.metrics.updateBatchSize(batch.records().size());
            lastOffset = batch.lastOffset();
            lastEpoch = batch.epoch();
            lastContainedLogTimeMs = batch.appendTimestamp();
            numBytes += (long) batch.sizeInBytes();
            ++numBatches;
        }
        MetadataProvenance provenance = new MetadataProvenance(lastOffset, lastEpoch, lastContainedLogTimeMs);
        long elapsedNs = this.time.nanoseconds() - startNs;
        this.metrics.updateBatchProcessingTime(elapsedNs);
        return new LogDeltaManifest(provenance, this.currentLeaderAndEpoch, numBatches, elapsedNs, numBytes);
    }

    /**
     * Enqueues the loading of a snapshot.
     *
     * <p>On the event queue thread the snapshot records are replayed into a delta on top of
     * the current image, the delta is applied to produce the new image, and, unless the
     * loader is still catching up, the result is published to every active publisher.
     * Faults are reported to the fault handler rather than thrown.
     *
     * @param reader the snapshot to load; it is closed exactly once, by the {@code finally}
     *               block, on every path
     */
    @Override
    public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
        this.eventQueue.append(() -> {
            try {
                MetadataDelta delta = new MetadataDelta.Builder().setImage(this.image).build();
                SnapshotManifest manifest = this.loadSnapshot(delta, reader);
                this.log.info("handleSnapshot: generated a metadata delta from a snapshot at offset {} in {} us.", (Object) manifest.provenance().lastContainedOffset(), (Object) TimeUnit.NANOSECONDS.toMicros(manifest.elapsedNs()));
                try {
                    this.image = delta.apply(manifest.provenance());
                } catch (Throwable e) {
                    this.faultHandler.handleFault("Error generating new metadata image from snapshot at offset " + reader.lastContainedLogOffset(), e);
                    // The finally block below closes the reader; closing it here as well
                    // would close it twice.
                    return;
                }
                if (this.stillNeedToCatchUp("handleSnapshot", manifest.provenance().lastContainedOffset())) {
                    return;
                }
                this.log.info("handleSnapshot: publishing new snapshot image with provenance {}.", (Object) this.image.provenance());
                for (MetadataPublisher publisher : this.publishers.values()) {
                    try {
                        publisher.publishSnapshot(delta, this.image, manifest);
                    } catch (Throwable e) {
                        // One failing publisher must not prevent the others from being updated.
                        this.faultHandler.handleFault("Unhandled error publishing the new metadata image from snapshot at offset " + reader.lastContainedLogOffset() + " with publisher " + publisher.name(), e);
                    }
                }
                this.metrics.updateLastAppliedImageProvenance(this.image.provenance());
                // Kick off initialization only when publishers are actually waiting;
                // scheduling it with nothing waiting would be a no-op.
                if (!this.uninitializedPublishers.isEmpty()) {
                    this.scheduleInitializeNewPublishers(0L);
                }
            } catch (Throwable e) {
                // Catch-all for anything not covered by the narrower handlers above.
                this.faultHandler.handleFault("Unhandled fault in MetadataLoader#handleSnapshot. Snapshot offset was " + reader.lastContainedLogOffset(), e);
            } finally {
                reader.close();
            }
        });
    }

    /**
     * Replays every record of a snapshot into {@code delta} and then marks the snapshot as
     * finished on the delta.
     *
     * <p>A record that fails to replay is reported to the fault handler, identified by its
     * running index across the whole snapshot, and replay continues with the next record.
     *
     * @param delta  the delta that receives the replayed records
     * @param reader the snapshot to read; it is not closed by this method
     * @return a manifest carrying the snapshot's last contained offset, epoch and timestamp
     *         as provenance, plus the elapsed nanoseconds
     */
    SnapshotManifest loadSnapshot(MetadataDelta delta, SnapshotReader<ApiMessageAndVersion> reader) {
        long startNs = this.time.nanoseconds();
        // Counts records across all batches; used only to identify a failing record.
        int snapshotIndex = 0;
        while (reader.hasNext()) {
            // Typed rather than a raw Batch, so records() yields ApiMessageAndVersion elements.
            Batch<ApiMessageAndVersion> batch = reader.next();
            for (ApiMessageAndVersion record : batch.records()) {
                try {
                    delta.replay(record.message());
                } catch (Throwable e) {
                    this.faultHandler.handleFault("Error loading metadata log record " + snapshotIndex + " in snapshot at offset " + reader.lastContainedLogOffset(), e);
                }
                ++snapshotIndex;
            }
        }
        delta.finishSnapshot();
        MetadataProvenance provenance = new MetadataProvenance(reader.lastContainedLogOffset(), reader.lastContainedLogEpoch(), reader.lastContainedLogTimestamp());
        return new SnapshotManifest(provenance, this.time.nanoseconds() - startNs);
    }

    /**
     * Records a new leader and epoch.
     *
     * <p>The field is updated from an event appended to the event queue, not by the
     * calling thread.
     *
     * @param leaderAndEpoch the leader and epoch to record as current
     */
    public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
        eventQueue.append(() -> {
            // Stored so that later log-delta manifests carry this leader and epoch.
            currentLeaderAndEpoch = leaderAndEpoch;
        });
    }

    /**
     * Registers new publishers and schedules their initialization.
     *
     * <p>An empty list completes immediately. Otherwise the work runs on the event queue:
     * every new publisher's name is first checked against both the active and the
     * uninitialized publishers, and only if none collides are they all added to the
     * uninitialized set.
     *
     * @param newPublishers the publishers to install
     * @return a future completed with {@code null} on success, or completed exceptionally
     *         with the fault handler's exception if a name collides or anything else fails
     */
    public CompletableFuture<Void> installPublishers(List<? extends MetadataPublisher> newPublishers) {
        if (newPublishers.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        final CompletableFuture<Void> result = new CompletableFuture<>();
        eventQueue.append(() -> {
            try {
                // Validate everything first so a collision installs none of the publishers.
                for (MetadataPublisher candidate : newPublishers) {
                    MetadataPublisher existing = publishers.get(candidate.name());
                    if (existing == null) {
                        existing = uninitializedPublishers.get(candidate.name());
                    }
                    if (existing != null) {
                        if (existing == candidate) {
                            throw faultHandler.handleFault("Attempted to install publisher " + candidate.name() + ", which is already installed.");
                        } else {
                            throw faultHandler.handleFault("Attempted to install a new publisher named " + candidate.name() + ", but there is already a publisher with that name.");
                        }
                    }
                }
                for (MetadataPublisher candidate : newPublishers) {
                    uninitializedPublishers.put(candidate.name(), candidate);
                }
                scheduleInitializeNewPublishers(0L);
                result.complete(null);
            } catch (Throwable t) {
                result.completeExceptionally(faultHandler.handleFault("Unhandled fault in MetadataLoader#installPublishers", t));
            }
        });
        return result;
    }

    /**
     * Blocks until an event appended to the event queue by this call has run.
     *
     * @throws Exception if waiting on the marker future fails, for example because the
     *                   calling thread is interrupted
     */
    void waitForAllEventsToBeHandled() throws Exception {
        // Parameterized instead of a raw CompletableFuture.
        CompletableFuture<Void> future = new CompletableFuture<>();
        this.eventQueue.append(() -> future.complete(null));
        future.get();
    }

    /**
     * Unregisters a publisher and closes it, on the event queue.
     *
     * <p>The publisher is removed only if the entry under its name maps to this same
     * publisher, looking first among the active and then among the uninitialized ones.
     *
     * @param publisher the publisher to remove
     * @return a future completed with {@code null} once the publisher has been closed, or
     *         completed exceptionally if it was not installed or another error occurred
     */
    public CompletableFuture<Void> removeAndClosePublisher(MetadataPublisher publisher) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        eventQueue.append(() -> {
            try {
                boolean removed = publishers.remove(publisher.name(), publisher);
                if (!removed) {
                    // Not active; it may still be waiting for initialization.
                    removed = uninitializedPublishers.remove(publisher.name(), publisher);
                }
                if (!removed) {
                    throw faultHandler.handleFault("Attempted to remove publisher " + publisher.name() + ", which is not installed.");
                }
                closePublisher(publisher);
                result.complete(null);
            } catch (Throwable t) {
                result.completeExceptionally(t);
            }
        });
        return result;
    }

    /**
     * Reports the last applied offset as tracked by the metrics object.
     *
     * @return the value returned by the metrics object's {@code lastAppliedOffset()}
     */
    public long lastAppliedOffset() {
        final long appliedOffset = metrics.lastAppliedOffset();
        return appliedOffset;
    }

    /**
     * Starts shutting down the event queue, passing it a cleanup event that closes and
     * removes every publisher: first the uninitialized ones, then the active ones.
     */
    public void beginShutdown() {
        eventQueue.beginShutdown("beginShutdown", () -> {
            for (Iterator<MetadataPublisher> pending = uninitializedPublishers.values().iterator(); pending.hasNext(); ) {
                closePublisher(pending.next());
                pending.remove();
            }
            for (Iterator<MetadataPublisher> active = publishers.values().iterator(); active.hasNext(); ) {
                closePublisher(active.next());
                active.remove();
            }
        });
    }

    /**
     * Returns the clock this loader was constructed with.
     *
     * @return the loader's {@code time} field
     */
    Time time() {
        return time;
    }

    /**
     * Closes a publisher, routing any throwable it raises to the fault handler.
     *
     * @param publisher the publisher to close
     */
    private void closePublisher(MetadataPublisher publisher) {
        try {
            publisher.close();
        } catch (Throwable closeFailure) {
            final String message = "Got unexpected exception while closing publisher " + publisher.name();
            faultHandler.handleFault(message, closeFailure);
        }
    }

    /**
     * Begins shutdown (which closes all publishers) and then closes the event queue.
     *
     * @throws Exception if closing the event queue fails
     */
    @Override
    public void close() throws Exception {
        beginShutdown();
        eventQueue.close();
    }

    /**
     * Builder for {@link MetadataLoader}.
     *
     * <p>Only the high water mark accessor is mandatory. The defaults are: node id
     * {@code -1}, {@link Time#SYSTEM}, an empty thread name prefix, a fault handler that
     * wraps each fault in a {@link FaultHandlerException}, and a metrics object that only
     * remembers the last applied offset.
     */
    public static class Builder {
        private int nodeId = -1;
        private Time time = Time.SYSTEM;
        // There is no setter for this field; build() creates it on first use.
        private LogContext logContext = null;
        private String threadNamePrefix = "";
        private FaultHandler faultHandler = (message, cause) -> new FaultHandlerException(message, cause);
        private MetadataLoaderMetrics metrics = new MetadataLoaderMetrics() {
            // Written by updateLastAppliedImageProvenance, read by lastAppliedOffset.
            private volatile long appliedOffset = -1L;

            @Override
            public void updateBatchProcessingTime(long elapsedNs) {
                // Not recorded by the default metrics.
            }

            @Override
            public void updateBatchSize(int size) {
                // Not recorded by the default metrics.
            }

            @Override
            public void updateLastAppliedImageProvenance(MetadataProvenance provenance) {
                appliedOffset = provenance.lastContainedOffset();
            }

            @Override
            public long lastAppliedOffset() {
                return appliedOffset;
            }

            @Override
            public void close() throws Exception {
                // Nothing to release.
            }
        };
        private Supplier<OptionalLong> highWaterMarkAccessor = null;

        /** Sets the node id used in the default log prefix; returns this builder. */
        public Builder setNodeId(int nodeId) {
            this.nodeId = nodeId;
            return this;
        }

        /** Sets the clock handed to the loader; returns this builder. */
        public Builder setTime(Time time) {
            this.time = time;
            return this;
        }

        /** Sets the thread name prefix handed to the loader; returns this builder. */
        public Builder setThreadNamePrefix(String threadNamePrefix) {
            this.threadNamePrefix = threadNamePrefix;
            return this;
        }

        /** Sets the handler that receives the loader's faults; returns this builder. */
        public Builder setFaultHandler(FaultHandler faultHandler) {
            this.faultHandler = faultHandler;
            return this;
        }

        /** Replaces the default metrics object; returns this builder. */
        public Builder setMetadataLoaderMetrics(MetadataLoaderMetrics metrics) {
            this.metrics = metrics;
            return this;
        }

        /** Sets the mandatory supplier of the high water mark; returns this builder. */
        public Builder setHighWaterMarkAccessor(Supplier<OptionalLong> highWaterMarkAccessor) {
            this.highWaterMarkAccessor = highWaterMarkAccessor;
            return this;
        }

        /**
         * Creates the loader from the configured values.
         *
         * @return a new {@link MetadataLoader}
         * @throws RuntimeException if no high water mark accessor has been set
         */
        public MetadataLoader build() {
            if (logContext == null) {
                // Created once and kept on the builder, so it reflects the node id at this point.
                logContext = new LogContext("[MetadataLoader " + nodeId + "] ");
            }
            if (highWaterMarkAccessor == null) {
                throw new RuntimeException("You must set the high water mark accessor.");
            }
            return new MetadataLoader(
                time,
                logContext,
                threadNamePrefix,
                faultHandler,
                metrics,
                highWaterMarkAccessor);
        }
    }
}

