/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.pubsublite.cloudpubsub.internal;

import com.google.api.core.ApiService;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.cloudpubsub.ReassignmentHandler;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.internal.PartitionSubscriberFactory;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.ApiServiceUtils;
import com.google.cloud.pubsublite.internal.wire.Assigner;
import com.google.cloud.pubsublite.internal.wire.AssignerFactory;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.common.collect.ImmutableSet;
import com.google.common.flogger.GoogleLogger;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class AssigningSubscriber
extends ProxyService
implements Subscriber {
    private static final GoogleLogger LOG = GoogleLogger.forEnclosingClass();
    private final PartitionSubscriberFactory subscriberFactory;
    private final ReassignmentHandler reassignmentHandler;
    @GuardedBy(value="this")
    private final Map<Partition, Subscriber> liveSubscriberMap = new HashMap<Partition, Subscriber>();
    @GuardedBy(value="this")
    private boolean shutdown = false;

    public AssigningSubscriber(PartitionSubscriberFactory subscriberFactory, ReassignmentHandler reassignmentHandler, AssignerFactory assignerFactory) throws ApiException {
        super(new ApiService[0]);
        this.subscriberFactory = subscriberFactory;
        this.reassignmentHandler = reassignmentHandler;
        Assigner assigner = assignerFactory.New(this::handleAssignment);
        this.addServices(assigner, ApiServiceUtils.autoCloseableAsApiService(subscriberFactory));
    }

    @Override
    protected synchronized void stop() {
        this.shutdown = true;
        ApiServiceUtils.blockingShutdown(this.liveSubscriberMap.values());
        this.liveSubscriberMap.clear();
    }

    @Override
    protected void handlePermanentError(CheckedApiException error) {
        this.stop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleAssignment(Set<Partition> assignment) {
        try {
            ImmutableSet livePartitions;
            ArrayList<Subscriber> removed = new ArrayList<Subscriber>();
            AssigningSubscriber assigningSubscriber = this;
            synchronized (assigningSubscriber) {
                if (this.shutdown) {
                    return;
                }
                livePartitions = ImmutableSet.copyOf(this.liveSubscriberMap.keySet());
                for (Partition partition : livePartitions) {
                    if (assignment.contains(partition)) continue;
                    removed.add(Objects.requireNonNull(this.liveSubscriberMap.remove(partition)));
                }
                for (Partition partition : assignment) {
                    if (this.liveSubscriberMap.containsKey(partition)) continue;
                    this.startSubscriber(partition);
                }
            }
            ApiServiceUtils.blockingShutdown(removed);
            this.reassignmentHandler.handleReassignment((Set<Partition>)livePartitions, assignment);
        }
        catch (Throwable t) {
            this.onPermanentError(ExtractStatus.toCanonical(t));
        }
    }

    private synchronized void startSubscriber(Partition partition) throws CheckedApiException {
        CheckedApiPreconditions.checkState(!this.liveSubscriberMap.containsKey(partition));
        Subscriber subscriber = this.subscriberFactory.newSubscriber(partition);
        subscriber.addListener(new ApiService.Listener(){

            public void failed(ApiService.State from, Throwable failure) {
                if (ApiService.State.STOPPING.equals((Object)from)) {
                    return;
                }
                AssigningSubscriber.this.onPermanentError(ExtractStatus.toCanonical(failure));
            }
        }, SystemExecutors.getFuturesExecutor());
        this.liveSubscriberMap.put(partition, subscriber);
        subscriber.startAsync();
    }
}

