/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SystemTopicTxnBufferSnapshotService<T> {
    private static final Logger log = LoggerFactory.getLogger(SystemTopicTxnBufferSnapshotService.class);
    protected final ConcurrentHashMap<NamespaceName, SystemTopicClient<T>> clients;
    protected final NamespaceEventsSystemTopicFactory namespaceEventsSystemTopicFactory;
    protected final Class<T> schemaType;
    protected final EventType systemTopicType;
    private final ConcurrentHashMap<NamespaceName, ReferenceCountedWriter<T>> refCountedWriterMap;

    public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType systemTopicType, Class<T> schemaType) {
        this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
        this.systemTopicType = systemTopicType;
        this.schemaType = schemaType;
        this.clients = new ConcurrentHashMap();
        this.refCountedWriterMap = new ConcurrentHashMap();
    }

    public CompletableFuture<SystemTopicClient.Reader<T>> createReader(TopicName topicName) {
        return this.getTransactionBufferSystemTopicClient(topicName.getNamespaceObject()).newReaderAsync();
    }

    public void removeClient(TopicName topicName, SystemTopicClientBase<T> transactionBufferSystemTopicClient) {
        if (transactionBufferSystemTopicClient.getReaders().size() == 0 && transactionBufferSystemTopicClient.getWriters().size() == 0) {
            this.clients.remove(topicName.getNamespaceObject());
        }
    }

    public ReferenceCountedWriter<T> getReferenceWriter(NamespaceName namespaceName) {
        return this.refCountedWriterMap.compute(namespaceName, (k, v) -> {
            if (v != null && v.retain()) {
                return v;
            }
            return new ReferenceCountedWriter<T>(namespaceName, this.getTransactionBufferSystemTopicClient(namespaceName).newWriterAsync(), this);
        });
    }

    private SystemTopicClient<T> getTransactionBufferSystemTopicClient(NamespaceName namespaceName) {
        TopicName systemTopicName = NamespaceEventsSystemTopicFactory.getSystemTopicName(namespaceName, this.systemTopicType);
        if (systemTopicName == null) {
            throw new RuntimeException((Throwable)new PulsarClientException.InvalidTopicNameException("Can't get the TB system topic client for namespace " + namespaceName + " with type " + this.systemTopicType + "."));
        }
        return this.clients.computeIfAbsent(namespaceName, v -> this.namespaceEventsSystemTopicFactory.createTransactionBufferSystemTopicClient(systemTopicName, this, this.schemaType));
    }

    public void close() throws Exception {
        for (Map.Entry<NamespaceName, SystemTopicClient<T>> entry : this.clients.entrySet()) {
            entry.getValue().close();
        }
    }

    public static class ReferenceCountedWriter<T> {
        private final AtomicLong referenceCount = new AtomicLong(1L);
        private final NamespaceName namespaceName;
        private final CompletableFuture<SystemTopicClient.Writer<T>> future;
        private final SystemTopicTxnBufferSnapshotService<T> snapshotService;

        public ReferenceCountedWriter(NamespaceName namespaceName, CompletableFuture<SystemTopicClient.Writer<T>> future, SystemTopicTxnBufferSnapshotService<T> snapshotService) {
            this.namespaceName = namespaceName;
            this.snapshotService = snapshotService;
            this.future = future;
            this.future.exceptionally(t -> {
                log.error("[{}] Failed to create TB snapshot writer.", (Object)namespaceName, t);
                snapshotService.refCountedWriterMap.remove(namespaceName, this);
                return null;
            });
        }

        public CompletableFuture<SystemTopicClient.Writer<T>> getFuture() {
            return this.future;
        }

        private synchronized boolean retain() {
            return this.referenceCount.incrementAndGet() > 0L;
        }

        public synchronized void release() {
            if (this.referenceCount.decrementAndGet() == 0L) {
                this.snapshotService.refCountedWriterMap.remove(this.namespaceName, this);
                this.future.thenAccept(writer -> {
                    String topicName = writer.getSystemTopicClient().getTopicName().toString();
                    writer.closeAsync().exceptionally(t -> {
                        if (t != null) {
                            log.error("[{}] Failed to close TB snapshot writer.", (Object)topicName, t);
                        } else if (log.isDebugEnabled()) {
                            log.debug("[{}] Success to close TB snapshot writer.", (Object)topicName);
                        }
                        return null;
                    });
                });
            }
        }
    }
}

