/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.pendingack.impl;

import io.netty.util.Timer;
import io.prometheus.client.CollectorRegistry;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.exception.pendingack.TransactionPendingAckException;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterMetricsStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MLPendingAckStoreProvider
implements TransactionPendingAckStoreProvider {
    private static final Logger log = LoggerFactory.getLogger(MLPendingAckStoreProvider.class);
    private static volatile TxnLogBufferedWriterMetricsStats bufferedWriterMetrics = DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void initBufferedWriterMetrics(String brokerAdvertisedAddress) {
        if (bufferedWriterMetrics != DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS) {
            return;
        }
        Class<MLPendingAckStoreProvider> clazz = MLPendingAckStoreProvider.class;
        synchronized (MLPendingAckStoreProvider.class) {
            if (bufferedWriterMetrics != DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS) {
                // ** MonitorExit[var1_1] (shouldn't be in output)
                return;
            }
            bufferedWriterMetrics = new MLTxnPendingAckLogBufferedWriterMetrics(brokerAdvertisedAddress);
            // ** MonitorExit[var1_1] (shouldn't be in output)
            return;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static void closeBufferedWriterMetrics() {
        Class<MLPendingAckStoreProvider> clazz = MLPendingAckStoreProvider.class;
        synchronized (MLPendingAckStoreProvider.class) {
            if (bufferedWriterMetrics == DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS) {
                // ** MonitorExit[var0] (shouldn't be in output)
                return;
            }
            bufferedWriterMetrics.close();
            bufferedWriterMetrics = DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
            // ** MonitorExit[var0] (shouldn't be in output)
            return;
        }
    }

    @Override
    public CompletableFuture<PendingAckStore> newPendingAckStore(final PersistentSubscription subscription) {
        final CompletableFuture<PendingAckStore> pendingAckStoreFuture = new CompletableFuture<PendingAckStore>();
        if (subscription == null) {
            pendingAckStoreFuture.completeExceptionally(new TransactionPendingAckException.TransactionPendingAckStoreProviderException("The subscription is null."));
            return pendingAckStoreFuture;
        }
        final PersistentTopic originPersistentTopic = (PersistentTopic)subscription.getTopic();
        PulsarService pulsarService = originPersistentTopic.getBrokerService().getPulsar();
        final Timer brokerClientSharedTimer = pulsarService.getBrokerClientSharedTimer();
        ServiceConfiguration serviceConfiguration = pulsarService.getConfiguration();
        final TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig();
        txnLogBufferedWriterConfig.setBatchEnabled(serviceConfiguration.isTransactionPendingAckBatchedWriteEnabled());
        txnLogBufferedWriterConfig.setBatchedWriteMaxRecords(serviceConfiguration.getTransactionPendingAckBatchedWriteMaxRecords());
        txnLogBufferedWriterConfig.setBatchedWriteMaxSize(serviceConfiguration.getTransactionPendingAckBatchedWriteMaxSize());
        txnLogBufferedWriterConfig.setBatchedWriteMaxDelayInMillis(serviceConfiguration.getTransactionPendingAckBatchedWriteMaxDelayInMillis());
        String pendingAckTopicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), subscription.getName());
        ((CompletableFuture)originPersistentTopic.getBrokerService().getManagedLedgerFactory().asyncExists(TopicName.get((String)pendingAckTopicName).getPersistenceNamingEncoding()).thenAccept(exist -> {
            TopicName topicName = exist != false ? TopicName.get((String)pendingAckTopicName) : TopicName.get((String)originPersistentTopic.getName());
            ((CompletableFuture)originPersistentTopic.getBrokerService().getManagedLedgerConfig(topicName).thenAccept(config -> {
                config.setCreateIfMissing(true);
                originPersistentTopic.getBrokerService().getManagedLedgerFactory().asyncOpen(TopicName.get((String)pendingAckTopicName).getPersistenceNamingEncoding(), config, new AsyncCallbacks.OpenLedgerCallback(){

                    public void openLedgerComplete(final ManagedLedger ledger, Object ctx) {
                        ledger.asyncOpenCursor(MLPendingAckStore.getTransactionPendingAckStoreCursorName(), CommandSubscribe.InitialPosition.Earliest, new AsyncCallbacks.OpenCursorCallback(){

                            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                                pendingAckStoreFuture.complete(new MLPendingAckStore(ledger, cursor, subscription.getCursor(), originPersistentTopic.getBrokerService().getPulsar().getConfiguration().getTransactionPendingAckLogIndexMinLag(), txnLogBufferedWriterConfig, brokerClientSharedTimer, bufferedWriterMetrics));
                                if (log.isDebugEnabled()) {
                                    log.debug("{},{} open MLPendingAckStore cursor success", (Object)originPersistentTopic.getName(), (Object)subscription.getName());
                                }
                            }

                            public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                                log.error("{},{} open MLPendingAckStore cursor failed.", new Object[]{originPersistentTopic.getName(), subscription.getName(), exception});
                                pendingAckStoreFuture.completeExceptionally(exception);
                            }
                        }, null);
                    }

                    public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
                        log.error("{}, {} open MLPendingAckStore managedLedger failed.", new Object[]{originPersistentTopic.getName(), subscription.getName(), exception});
                        pendingAckStoreFuture.completeExceptionally(exception);
                    }
                }, () -> CompletableFuture.completedFuture(true), null);
            })).exceptionally(e -> {
                Throwable t = FutureUtil.unwrapCompletionException((Throwable)e);
                log.error("[{}] [{}] Failed to get managedLedger config when init pending ack store!", new Object[]{originPersistentTopic, subscription, t});
                pendingAckStoreFuture.completeExceptionally(t);
                return null;
            });
        })).exceptionally(e -> {
            Throwable t = FutureUtil.unwrapCompletionException((Throwable)e);
            log.error("[{}] [{}] Failed to check the pending ack topic exist when init pending ack store!", new Object[]{originPersistentTopic, subscription, t});
            pendingAckStoreFuture.completeExceptionally(t);
            return null;
        });
        return pendingAckStoreFuture;
    }

    @Override
    public CompletableFuture<Boolean> checkInitializedBefore(PersistentSubscription subscription) {
        PersistentTopic originPersistentTopic = (PersistentTopic)subscription.getTopic();
        String pendingAckTopicName = MLPendingAckStore.getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), subscription.getName());
        return originPersistentTopic.getBrokerService().getManagedLedgerFactory().asyncExists(TopicName.get((String)pendingAckTopicName).getPersistenceNamingEncoding());
    }

    private static class MLTxnPendingAckLogBufferedWriterMetrics
    extends TxnLogBufferedWriterMetricsStats {
        private MLTxnPendingAckLogBufferedWriterMetrics(String brokerAdvertisedAddress) {
            super("pulsar_txn_pending_ack_store", new String[]{"broker"}, new String[]{brokerAdvertisedAddress}, CollectorRegistry.defaultRegistry);
        }
    }
}

