/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata.bookkeeper;

import java.io.File;
import java.io.IOException;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.Generated;
import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.bookie.Cookie;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.component.ComponentStarter;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.component.Lifecycle;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.component.LifecycleComponent;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.component.LifecycleComponentStack;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.BookieServer;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.replication.AutoRecoveryMain;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.server.Main;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.server.conf.BookieConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.util.IOUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.io.FileUtils;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataBookieDriver;
import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver;
import org.apache.pulsar.metadata.bookkeeper.PulsarRegistrationManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BKCluster
implements AutoCloseable {
    /** Class-wide SLF4J logger. */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(BKCluster.class);
    /** Settings this cluster was built from (see {@link BKClusterConf}). */
    private final BKClusterConf clusterConf;
    /** Metadata store created in the constructor and closed by {@link #close()}. */
    private final MetadataStoreExtended store;
    /** Temporary directories created via createTempDir; removed by cleanupTempDirs. */
    private final List<File> tmpDirs = new ArrayList<File>();
    /** Component stacks of the bookies that have been started and not yet stopped. */
    private final List<LifecycleComponentStack> bookieComponents = new ArrayList<LifecycleComponentStack>();
    /** Server configurations recorded by startNewBookie; exposed through getBsConfs. */
    private final List<ServerConfiguration> bsConfs = new ArrayList<ServerConfiguration>();
    /** Template server configuration that every per-bookie configuration is copied from. */
    protected final ServerConfiguration baseConf;
    /** Client configuration used by {@link #newClient()}. */
    protected final ClientConfiguration baseClientConf;
    /** Ports obtained from PortManager.nextLockedFreePort(); released in {@link #close()}. */
    private final List<Integer> lockedPorts = new ArrayList<Integer>();
    /** Flipped to true by the first call to {@link #close()}. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Auto-recovery processes keyed by bookie server; only written by startAutoRecovery. */
    private final Map<BookieServer, AutoRecoveryMain> autoRecoveryProcesses = new HashMap<BookieServer, AutoRecoveryMain>();
    /** Gate checked by startAutoRecovery; nothing in this class sets it to true. */
    boolean isAutoRecoveryEnabled = false;

    /**
     * Creates a fresh, default-initialised cluster configuration.
     *
     * @return a new {@link BKClusterConf} whose {@code build()} method creates the cluster
     */
    public static BKClusterConf builder() {
        BKClusterConf freshConf = new BKClusterConf();
        return freshConf;
    }

    /**
     * Builds the cluster: resolves the base configurations, opens the metadata store,
     * registers the Pulsar metadata drivers through system properties and starts the
     * requested number of bookies.
     *
     * <p>If startup fails after the metadata store has been opened, everything created so
     * far (bookies, locked ports, temporary directories, the store) is released through
     * {@link #close()} before the original exception is rethrown; the caller never receives
     * a reference it could close itself.
     *
     * @param bkClusterConf settings to build the cluster from
     * @throws Exception if the metadata store cannot be created or any bookie fails to start
     */
    private BKCluster(BKClusterConf bkClusterConf) throws Exception {
        this.clusterConf = bkClusterConf;
        this.baseConf = bkClusterConf.baseServerConfiguration != null ? bkClusterConf.baseServerConfiguration : this.newBaseServerConfiguration();
        this.baseClientConf = BKCluster.newBaseClientConfiguration();
        this.store = MetadataStoreExtended.create(this.clusterConf.metadataServiceUri, MetadataStoreConfig.builder().metadataStoreName("metadata-store").build());
        try {
            this.baseConf.setJournalRemovePagesFromCache(false);
            // Hand the already-open store instance to both configurations.
            this.baseConf.setProperty("metadata-store-instance", this.store);
            this.baseClientConf.setProperty("metadata-store-instance", this.store);
            System.setProperty("bookkeeper.metadata.bookie.drivers", PulsarMetadataBookieDriver.class.getName());
            System.setProperty("bookkeeper.metadata.client.drivers", PulsarMetadataClientDriver.class.getName());
            this.startBKCluster(bkClusterConf.numBookies);
        }
        catch (Exception startupFailure) {
            // The half-built instance is unreachable for the caller, so clean up here.
            try {
                this.close();
            }
            catch (Exception cleanupFailure) {
                startupFailure.addSuppressed(cleanupFailure);
            }
            throw startupFailure;
        }
    }

    /**
     * Shuts the cluster down. Only the first invocation does any work; later calls return
     * immediately.
     *
     * <p>Failures while stopping bookies or deleting temporary directories are logged and
     * do not prevent the remaining steps. A failure while closing the metadata store is
     * propagated.
     *
     * @throws Exception if closing the metadata store fails
     */
    @Override
    public void close() throws Exception {
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        try {
            stopBKCluster();
        }
        catch (Exception stopError) {
            log.error("Got Exception while trying to stop BKCluster", (Throwable)stopError);
        }
        // Give back every port that was reserved for a bookie.
        for (Integer reservedPort : lockedPorts) {
            PortManager.releaseLockedPort(reservedPort);
        }
        lockedPorts.clear();
        try {
            cleanupTempDirs();
        }
        catch (Exception cleanupError) {
            log.error("Got Exception while trying to cleanupTempDirs", (Throwable)cleanupError);
        }
        store.close();
    }

    /**
     * Creates a temporary directory and remembers it so that it can be deleted later by
     * {@code cleanupTempDirs()}.
     *
     * @param prefix prefix passed to {@code IOUtils.createTempDir}
     * @param suffix suffix passed to {@code IOUtils.createTempDir}
     * @return the newly created directory
     * @throws IOException if the directory cannot be created
     */
    private File createTempDir(String prefix, String suffix) throws IOException {
        File created = IOUtils.createTempDir(prefix, suffix);
        tmpDirs.add(created);
        return created;
    }

    /**
     * Initialises a new cluster under {@code /ledgers} in the metadata store, points both
     * base configurations at the metadata service and starts the bookies one by one.
     *
     * @param numBookies number of bookies to start; indices run from 0 to numBookies - 1
     * @throws Exception if cluster initialisation or any bookie start fails
     */
    private void startBKCluster(int numBookies) throws Exception {
        // NOTE(review): the registration manager is not closed here - confirm whether it
        // holds resources that should be released after initNewCluster().
        PulsarRegistrationManager registrationManager = new PulsarRegistrationManager(store, "/ledgers", baseConf);
        registrationManager.initNewCluster();

        String serviceUri = "metadata-store:" + clusterConf.metadataServiceUri;
        baseConf.setMetadataServiceUri(serviceUri);
        baseClientConf.setMetadataServiceUri(serviceUri);

        int bookieIndex = 0;
        while (bookieIndex < numBookies) {
            startNewBookie(bookieIndex);
            bookieIndex++;
        }
    }

    /**
     * Creates a new BookKeeper client configured from this cluster's base client
     * configuration. The caller owns the returned client.
     *
     * @return a new {@link BookKeeper} client
     * @throws Exception if the client cannot be created
     */
    public BookKeeper newClient() throws Exception {
        BookKeeper client = new BookKeeper(baseClientConf);
        return client;
    }

    /**
     * Closes every running bookie component stack and forgets them. The recorded server
     * configurations are left untouched.
     *
     * @throws Exception declared for subclasses; closing a component may fail at runtime
     */
    protected void stopBKCluster() throws Exception {
        for (LifecycleComponentStack bookie : bookieComponents) {
            bookie.close();
        }
        bookieComponents.clear();
    }

    /**
     * Deletes every temporary directory created through {@code createTempDir}.
     *
     * <p>Deletion is attempted for all directories even when one of them fails, so that a
     * single undeletable directory does not leave the others behind. The first failure is
     * rethrown once the loop has finished, with any later failures attached as suppressed
     * exceptions.
     *
     * @throws Exception the first {@link IOException} raised while deleting a directory
     */
    protected void cleanupTempDirs() throws Exception {
        IOException firstFailure = null;
        for (File dir : this.tmpDirs) {
            try {
                FileUtils.deleteDirectory(dir);
            }
            catch (IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Builds the server configuration for the bookie with the given index.
     *
     * <p>The data directory is the configured {@code dataDir} (index 0), a numbered
     * sub-path of it (other indices), or a fresh temporary directory when no
     * {@code dataDir} is configured. If the directory already holds a cookie under
     * {@code current}, the bookie address recorded there wins: its host becomes the
     * advertised address on the base configuration and its port is reused. Only when no
     * such address exists is a port chosen, so no port is locked just to be discarded.
     *
     * @param index zero-based bookie index
     * @return the configuration for the new bookie
     * @throws Exception if the data directory cannot be prepared, the cookie cannot be read,
     *                   or the address stored in the cookie is not of the form host:port
     */
    private ServerConfiguration newServerConfiguration(int index) throws Exception {
        File dataDir;
        if (this.clusterConf.dataDir == null) {
            dataDir = this.createTempDir("bookie", "test-" + index);
        } else if (index == 0) {
            dataDir = new File(this.clusterConf.dataDir);
        } else {
            dataDir = new File(this.clusterConf.dataDir + "/" + index);
        }
        if (this.clusterConf.clearOldData && dataDir.exists()) {
            log.info("Wiping Bookie data directory at {}", (Object)dataDir.getAbsolutePath());
            FileUtils.cleanDirectory(dataDir);
        }

        // Look for an existing cookie first; listFiles yields null when dataDir is missing.
        String existBookieAddr = null;
        File[] cookieDir = dataDir.listFiles(file -> file.getName().equals("current"));
        if (cookieDir != null && cookieDir.length > 0) {
            existBookieAddr = this.parseBookieAddressFromCookie(cookieDir[0]);
        }

        int port;
        if (existBookieAddr != null) {
            String[] hostAndPort = existBookieAddr.split(":");
            if (hostAndPort.length < 2) {
                throw new IllegalStateException("Bookie address in cookie is not of the form host:port: " + existBookieAddr);
            }
            this.baseConf.setAdvertisedAddress(hostAndPort[0]);
            port = Integer.parseInt(hostAndPort[1]);
        } else if (this.baseConf.isEnableLocalTransport() || !this.baseConf.getAllowEphemeralPorts() || this.clusterConf.bkPort == 0) {
            port = PortManager.nextLockedFreePort();
            this.lockedPorts.add(port);
        } else {
            port = this.clusterConf.bkPort;
        }
        return this.newServerConfiguration(port, dataDir, new File[]{dataDir});
    }

    /**
     * Matches the quoted {@code bookieHost} value in a cookie's text form. DOTALL lets
     * {@code .} cross line terminators, since the cookie text spans several lines.
     */
    private static final Pattern BOOKIE_HOST_PATTERN = Pattern.compile(".*bookieHost: \"(.*?)\".*", Pattern.DOTALL);

    /**
     * Reads the cookie stored in the given directory and extracts the bookie address
     * recorded in it.
     *
     * @param dir directory to read the cookie from
     * @return the quoted {@code bookieHost} value, or {@code null} when the cookie text
     *         contains no such entry
     * @throws IOException if the cookie cannot be read
     */
    private String parseBookieAddressFromCookie(File dir) throws IOException {
        Cookie cookie = Cookie.readFromDirectory(dir);
        Matcher m = BOOKIE_HOST_PATTERN.matcher(cookie.toString());
        return m.find() ? m.group(1) : null;
    }

    /**
     * Creates a client configuration seeded from the base <em>server</em> configuration.
     *
     * @return a new {@link ClientConfiguration} copied from {@code baseConf}
     */
    private ClientConfiguration newClientConfiguration() {
        // NOTE(review): copies from baseConf, not baseClientConf - confirm this is intended.
        ClientConfiguration copy = new ClientConfiguration(baseConf);
        return copy;
    }

    /**
     * Derives a per-bookie server configuration from the base configuration.
     *
     * @param port       port the bookie listens on
     * @param journalDir directory used for the journal
     * @param ledgerDirs directories used for ledger storage
     * @return a copy of the base configuration with port, directories, task execution
     *         stats and the unpooled-heap allocator policy applied
     */
    private ServerConfiguration newServerConfiguration(int port, File journalDir, File[] ledgerDirs) {
        // Translate the ledger directories to their path strings first.
        String[] ledgerPaths = new String[ledgerDirs.length];
        int slot = 0;
        for (File ledgerDir : ledgerDirs) {
            ledgerPaths[slot++] = ledgerDir.getPath();
        }

        ServerConfiguration bookieConf = new ServerConfiguration(baseConf);
        bookieConf.setBookiePort(port);
        bookieConf.setJournalDirName(journalDir.getPath());
        bookieConf.setLedgerDirNames(ledgerPaths);
        bookieConf.setEnableTaskExecutionStats(true);
        bookieConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
        return bookieConf;
    }

    /**
     * Stops every bookie; shorthand for {@code stopAllBookies(true)}.
     *
     * @throws Exception if stopping a bookie fails
     */
    protected void stopAllBookies() throws Exception {
        final boolean shutdownClient = true;
        stopAllBookies(shutdownClient);
    }

    /**
     * Closes every running bookie and discards both the component stacks and the recorded
     * server configurations. Because the configurations are discarded too, a later
     * {@code startAllBookies()} has nothing to restart.
     *
     * @param shutdownClient not read by this implementation
     * @throws Exception declared for subclasses; closing a component may fail at runtime
     */
    protected void stopAllBookies(boolean shutdownClient) throws Exception {
        for (LifecycleComponent bookie : bookieComponents) {
            bookie.close();
        }
        bookieComponents.clear();
        bsConfs.clear();
    }

    /**
     * Starts one bookie for each recorded server configuration and tracks the resulting
     * component stacks.
     *
     * @throws Exception if any bookie fails to start
     */
    protected void startAllBookies() throws Exception {
        for (ServerConfiguration bookieConf : bsConfs) {
            LifecycleComponentStack started = startBookie(bookieConf);
            bookieComponents.add(started);
        }
    }

    /**
     * Configures and starts an additional bookie.
     *
     * <p>The configuration is recorded before the bookie is started, so it stays in
     * {@code getBsConfs()} even when the start fails.
     *
     * @param index zero-based index used to derive the bookie's data directory
     * @return the port the new bookie is configured with
     * @throws Exception if the configuration cannot be built or the bookie fails to start
     */
    public int startNewBookie(int index) throws Exception {
        ServerConfiguration bookieConf = newServerConfiguration(index);
        bsConfs.add(bookieConf);

        log.info("Starting new bookie on port: {}", (Object)bookieConf.getBookiePort());
        bookieComponents.add(startBookie(bookieConf));

        return bookieConf.getBookiePort();
    }

    /**
     * Builds a bookie server from the given configuration, starts it and waits until it
     * reports the STARTED lifecycle state.
     *
     * <p>The state is polled every 10 ms for at most 3000 iterations. If the bookie does
     * not reach STARTED in that window, or starting/waiting is aborted by an exception
     * (including interruption), the component stack is closed before the exception leaves
     * this method, so a half-started bookie is not left behind.
     *
     * @param conf server configuration of the bookie to start
     * @return the started component stack
     * @throws Exception if the bookie cannot be built or started, the wait is interrupted,
     *                   or the bookie does not start within the timeout
     */
    protected LifecycleComponentStack startBookie(ServerConfiguration conf) throws Exception {
        LifecycleComponentStack server = Main.buildBookieServer(new BookieConfiguration(conf));
        BookieId address = BookieImpl.getBookieId(conf);
        boolean started = false;
        try {
            ComponentStarter.startComponent(server);
            for (int i = 0; i < 3000 && server.lifecycleState() != Lifecycle.State.STARTED; ++i) {
                Thread.sleep(10L);
            }
            if (server.lifecycleState() != Lifecycle.State.STARTED) {
                throw new RuntimeException("Bookie failed to start within timeout period");
            }
            started = true;
        }
        finally {
            if (!started) {
                // The caller never sees this stack, so it must be released here.
                try {
                    server.close();
                }
                catch (RuntimeException closeError) {
                    log.warn("Failed to close bookie '{}' after unsuccessful start", (Object)address, (Object)closeError);
                }
            }
        }
        log.info("New bookie '{}' has been created.", (Object)address);
        return server;
    }

    /**
     * Starts an auto-recovery process for the given bookie when auto-recovery is enabled,
     * and records it in {@code autoRecoveryProcesses}. Does nothing otherwise.
     *
     * @param bserver bookie server the recovery process is associated with
     * @param conf    server configuration used to create the recovery process
     * @throws Exception if the recovery process cannot be created or started
     */
    private void startAutoRecovery(BookieServer bserver, ServerConfiguration conf) throws Exception {
        if (!this.isAutoRecoveryEnabled()) {
            return;
        }
        AutoRecoveryMain autoRecoveryProcess = new AutoRecoveryMain(conf);
        autoRecoveryProcess.start();
        this.autoRecoveryProcesses.put(bserver, autoRecoveryProcess);
        // Parameterized form: the message is only assembled when debug logging is on.
        log.debug("Starting Auditor Recovery for the bookie:{}", (Object)bserver.getBookieId());
    }

    /**
     * Creates the default base server configuration used when the cluster configuration
     * does not supply one.
     *
     * @return a new server configuration bound to the loopback interface and to the
     *         bookie port from the cluster configuration
     */
    private ServerConfiguration newBaseServerConfiguration() {
        ServerConfiguration base = new ServerConfiguration();

        // Network and TLS.
        base.setTLSEnabledProtocols("TLSv1.2,TLSv1.1");
        base.setAllowEphemeralPorts(true);
        base.setBookiePort(clusterConf.bkPort);

        // Journal.
        base.setJournalFlushWhenQueueEmpty(true);
        base.setJournalFormatVersionToWrite(5);
        base.setJournalWriteData(false);
        base.setProperty("journalPreAllocSizeMB", 1);

        // GC and disk usage thresholds.
        base.setGcWaitTime(1000L);
        base.setDiskUsageThreshold(0.999f);
        base.setDiskUsageWarnThreshold(0.99f);

        // Allocator and storage caches.
        base.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
        base.setProperty("dbStorage_writeCacheMaxSizeMb", 4);
        base.setProperty("dbStorage_readAheadCacheMaxSizeMb", 4);

        return setLoopbackInterfaceAndAllowLoopback(base);
    }

    /**
     * Creates the default client configuration, with the enabled TLS protocols set to
     * {@code TLSv1.2,TLSv1.1}.
     *
     * @return a new client configuration
     */
    public static ClientConfiguration newBaseClientConfiguration() {
        ClientConfiguration base = new ClientConfiguration();
        base.setTLSEnabledProtocols("TLSv1.2,TLSv1.1");
        return base;
    }

    /**
     * Finds the name of the first loopback network interface on this machine.
     *
     * @return the interface name, or {@code null} when no loopback interface is found or
     *         the interfaces cannot be enumerated (a warning is logged in both cases)
     */
    private static String getLoopbackInterfaceName() {
        try {
            Enumeration<NetworkInterface> nifs = NetworkInterface.getNetworkInterfaces();
            // Older JDKs document a null return when the machine has no interfaces.
            if (nifs != null) {
                while (nifs.hasMoreElements()) {
                    NetworkInterface nif = nifs.nextElement();
                    if (nif.isLoopback()) {
                        return nif.getName();
                    }
                }
            }
        }
        catch (SocketException e) {
            log.warn("Exception while figuring out loopback interface. Will use null.", (Throwable)e);
            return null;
        }
        log.warn("Unable to deduce loopback interface. Will use null");
        return null;
    }

    /**
     * Makes the given configuration listen on the detected loopback interface and permits
     * loopback addresses.
     *
     * @param serverConf configuration to modify in place
     * @return the same {@code serverConf} instance
     */
    private static ServerConfiguration setLoopbackInterfaceAndAllowLoopback(ServerConfiguration serverConf) {
        // May be null when no loopback interface could be determined.
        String loopbackName = getLoopbackInterfaceName();
        serverConf.setListeningInterface(loopbackName);
        serverConf.setAllowLoopback(true);
        return serverConf;
    }

    /**
     * Reports whether {@link #close()} has been called.
     *
     * @return {@code true} once closing has begun, {@code false} before that
     */
    public boolean isClosed() {
        return closed.get();
    }

    /**
     * Returns the metadata store backing this cluster.
     *
     * @return the store created by the constructor; it is closed by {@link #close()}
     */
    @Generated
    public MetadataStoreExtended getStore() {
        return store;
    }

    /**
     * Returns the server configurations of the bookies started so far.
     *
     * @return the live internal list (not a copy); changes made by the cluster are visible
     *         through it
     */
    @Generated
    public List<ServerConfiguration> getBsConfs() {
        return bsConfs;
    }

    /**
     * Reports whether auto-recovery processes are started alongside bookies.
     *
     * @return the current value of the {@code isAutoRecoveryEnabled} flag
     */
    @Generated
    public boolean isAutoRecoveryEnabled() {
        return isAutoRecoveryEnabled;
    }

    /**
     * Fluent configuration holder for {@link BKCluster}. Each setter stores its argument
     * and returns this instance so calls can be chained; {@link #build()} creates and
     * starts the cluster.
     */
    public static class BKClusterConf {
        /** URI of the metadata service the cluster stores its state in. */
        private String metadataServiceUri;
        /** Optional base server configuration; a default one is created when null. */
        private ServerConfiguration baseServerConfiguration;
        /** Number of bookies started at build time; defaults to 1. */
        private int numBookies = 1;
        /** Bookie port; the default 0 makes the cluster pick a locked free port. */
        private int bkPort = 0;
        /** Optional data directory; temporary directories are used when null. */
        private String dataDir;
        /** Whether an existing data directory is wiped before the bookie uses it. */
        private boolean clearOldData;

        /**
         * @param baseServerConfiguration template configuration for all bookies
         * @return this configuration
         */
        public BKClusterConf baseServerConfiguration(ServerConfiguration baseServerConfiguration) {
            this.baseServerConfiguration = baseServerConfiguration;
            return this;
        }

        /**
         * @param metadataServiceUri URI of the metadata service
         * @return this configuration
         */
        public BKClusterConf metadataServiceUri(String metadataServiceUri) {
            this.metadataServiceUri = metadataServiceUri;
            return this;
        }

        /**
         * @param numBookies number of bookies to start
         * @return this configuration
         */
        public BKClusterConf numBookies(int numBookies) {
            this.numBookies = numBookies;
            return this;
        }

        /**
         * @param dataDir base data directory for the bookies
         * @return this configuration
         */
        public BKClusterConf dataDir(String dataDir) {
            this.dataDir = dataDir;
            return this;
        }

        /**
         * @param bkPort bookie port, or 0 to have a free port chosen
         * @return this configuration
         */
        public BKClusterConf bkPort(int bkPort) {
            this.bkPort = bkPort;
            return this;
        }

        /**
         * @param clearOldData true to wipe an existing data directory before use
         * @return this configuration
         */
        public BKClusterConf clearOldData(boolean clearOldData) {
            this.clearOldData = clearOldData;
            return this;
        }

        /**
         * Creates and starts a cluster from the current settings.
         *
         * @return the running cluster
         * @throws Exception if the cluster cannot be created or started
         */
        public BKCluster build() throws Exception {
            BKCluster cluster = new BKCluster(this);
            return cluster;
        }
    }
}

