/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.RoutingPolicy;
import com.google.cloud.pubsublite.internal.wire.ApiServiceUtils;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatcher;
import com.google.cloud.pubsublite.internal.wire.PartitionPublisherFactory;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.common.collect.ImmutableMap;
import com.google.common.flogger.GoogleLogger;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.io.IOException;
import java.util.Optional;
import java.util.stream.LongStream;

public class PartitionCountWatchingPublisher
extends ProxyService
implements Publisher<MessageMetadata> {
    private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
    private final PartitionPublisherFactory publisherFactory;
    private final RoutingPolicy.Factory policyFactory;
    private final CloseableMonitor monitor = new CloseableMonitor();
    @GuardedBy(value="monitor.monitor")
    private boolean shutdown = false;
    @GuardedBy(value="monitor.monitor")
    private Optional<PartitionsWithRouting> partitionsWithRouting = Optional.empty();

    PartitionCountWatchingPublisher(PartitionPublisherFactory publisherFactory, RoutingPolicy.Factory policyFactory, PartitionCountWatcher.Factory configWatcherFactory) {
        super(new ApiService[0]);
        this.publisherFactory = publisherFactory;
        this.policyFactory = policyFactory;
        PartitionCountWatcher configWatcher = configWatcherFactory.newWatcher(this::handleConfig);
        this.addServices(configWatcher, ApiServiceUtils.autoCloseableAsApiService(publisherFactory));
    }

    @Override
    public ApiFuture<MessageMetadata> publish(PubSubMessage message) {
        Optional<PartitionsWithRouting> partitions;
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            partitions = this.partitionsWithRouting;
        }
        if (!partitions.isPresent()) {
            throw new IllegalStateException("Publish called before start or after shutdown");
        }
        try {
            return partitions.get().publish(message);
        }
        catch (CheckedApiException e) {
            this.onPermanentError(e);
            return ApiFutures.immediateFailedFuture((Throwable)e);
        }
    }

    @Override
    public void cancelOutstandingPublishes() {
        Optional<PartitionsWithRouting> partitions;
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            partitions = this.partitionsWithRouting;
        }
        if (!partitions.isPresent()) {
            throw new IllegalStateException("Cancel outstanding publishes called before start or after shutdown");
        }
        partitions.get().cancelOutstandingPublishes();
    }

    @Override
    public void flush() throws IOException {
        Optional<PartitionsWithRouting> partitions;
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            partitions = this.partitionsWithRouting;
        }
        if (!partitions.isPresent()) {
            throw new IllegalStateException("Publish called before start or after shutdown");
        }
        partitions.get().flush();
    }

    private ImmutableMap<Partition, Publisher<MessageMetadata>> getNewPartitionPublishers(LongStream newPartitions) {
        ImmutableMap.Builder mapBuilder = ImmutableMap.builder();
        newPartitions.forEach(i -> {
            Publisher<MessageMetadata> p = this.publisherFactory.newPublisher(Partition.of(i));
            p.addListener(new ApiService.Listener(){

                public void failed(ApiService.State from, Throwable failure) {
                    PartitionCountWatchingPublisher.this.onPermanentError(ExtractStatus.toCanonical(failure));
                }
            }, SystemExecutors.getFuturesExecutor());
            mapBuilder.put((Object)Partition.of(i), p);
            p.startAsync();
        });
        ImmutableMap partitions = mapBuilder.build();
        partitions.values().forEach(ApiService::awaitRunning);
        return partitions;
    }

    private void handleConfig(long partitionCount) {
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            if (this.shutdown) {
                return;
            }
            Optional<PartitionsWithRouting> current = this.partitionsWithRouting;
            long currentSize = current.map(withRouting -> withRouting.publishers.size()).orElse(0).intValue();
            if (partitionCount == currentSize) {
                return;
            }
            if (partitionCount < currentSize) {
                ((GoogleLogger.Api)log.atWarning()).log("Received an unexpected decrease in partition count. Previous partition count %s, new count %s", currentSize, partitionCount);
                return;
            }
            ImmutableMap.Builder mapBuilder = ImmutableMap.builder();
            current.ifPresent(p -> p.publishers.forEach((arg_0, arg_1) -> ((ImmutableMap.Builder)mapBuilder).put(arg_0, arg_1)));
            this.getNewPartitionPublishers(LongStream.range(currentSize, partitionCount)).forEach((arg_0, arg_1) -> ((ImmutableMap.Builder)mapBuilder).put(arg_0, arg_1));
            this.partitionsWithRouting = Optional.of(new PartitionsWithRouting(mapBuilder.build(), this.policyFactory.newPolicy(partitionCount)));
        }
    }

    @Override
    protected void stop() {
        Optional<PartitionsWithRouting> current;
        try (CloseableMonitor.Hold h = this.monitor.enter();){
            this.shutdown = true;
            current = this.partitionsWithRouting;
            this.partitionsWithRouting = Optional.empty();
        }
        current.ifPresent(PartitionsWithRouting::stop);
    }

    @Override
    protected void handlePermanentError(CheckedApiException error) {
        try {
            this.stop();
        }
        catch (Exception e) {
            ((GoogleLogger.Api)((GoogleLogger.Api)log.atWarning()).withCause((Throwable)e)).log("Encountered exception while trying to handle failure");
        }
    }

    private static class PartitionsWithRouting {
        public final ImmutableMap<Partition, Publisher<MessageMetadata>> publishers;
        private final RoutingPolicy routingPolicy;

        private PartitionsWithRouting(ImmutableMap<Partition, Publisher<MessageMetadata>> publishers, RoutingPolicy routingPolicy) {
            this.publishers = publishers;
            this.routingPolicy = routingPolicy;
        }

        public ApiFuture<MessageMetadata> publish(PubSubMessage message) throws CheckedApiException {
            try {
                Partition routedPartition = this.routingPolicy.route(message);
                CheckedApiPreconditions.checkState(this.publishers.containsKey((Object)routedPartition), "Routed to partition %s for which there is no publisher available.", routedPartition);
                return ((Publisher)this.publishers.get((Object)routedPartition)).publish(message);
            }
            catch (Throwable t) {
                throw ExtractStatus.toCanonical(t);
            }
        }

        public void cancelOutstandingPublishes() {
            for (Publisher publisher : this.publishers.values()) {
                publisher.cancelOutstandingPublishes();
            }
        }

        public void flush() throws IOException {
            for (Publisher publisher : this.publishers.values()) {
                publisher.flush();
            }
        }

        public void stop() {
            ApiServiceUtils.blockingShutdown((Iterable<? extends ApiService>)this.publishers.values());
        }
    }
}

