/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata.bookkeeper;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import lombok.Generated;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.runtime.shaded.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.BKException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.versioning.LongVersion;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.versioning.Version;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.metadata.api.CacheGetResult;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.bookkeeper.BookieServiceInfoSerde;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PulsarRegistrationClient
implements RegistrationClient {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(PulsarRegistrationClient.class);
    private final AbstractMetadataStore store;
    private final String ledgersRootPath;
    private final String bookieRegistrationPath;
    private final String bookieAllRegistrationPath;
    private final String bookieReadonlyRegistrationPath;
    private final Set<RegistrationClient.RegistrationListener> writableBookiesWatchers = new CopyOnWriteArraySet<RegistrationClient.RegistrationListener>();
    private final Set<RegistrationClient.RegistrationListener> readOnlyBookiesWatchers = new CopyOnWriteArraySet<RegistrationClient.RegistrationListener>();
    private final MetadataCache<BookieServiceInfo> bookieServiceInfoMetadataCache;
    private final ScheduledExecutorService executor;
    private final Map<BookieId, Versioned<BookieServiceInfo>> writableBookieInfo;
    private final Map<BookieId, Versioned<BookieServiceInfo>> readOnlyBookieInfo;
    private final FutureUtil.Sequencer<Void> sequencer;
    private SessionEvent lastMetadataSessionEvent;

    public PulsarRegistrationClient(MetadataStore store, String ledgersRootPath) {
        this.store = (AbstractMetadataStore)store;
        this.ledgersRootPath = ledgersRootPath;
        this.bookieServiceInfoMetadataCache = store.getMetadataCache(BookieServiceInfoSerde.INSTANCE);
        this.sequencer = FutureUtil.Sequencer.create();
        this.writableBookieInfo = new ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>>();
        this.readOnlyBookieInfo = new ConcurrentHashMap<BookieId, Versioned<BookieServiceInfo>>();
        this.bookieRegistrationPath = ledgersRootPath + "/available";
        this.bookieAllRegistrationPath = ledgersRootPath + "/cookies";
        this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/readonly";
        this.executor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-registration-client"));
        store.registerListener(this::updatedBookies);
        this.store.registerSessionListener(this::refreshBookies);
    }

    @Override
    public void close() {
        this.executor.shutdownNow();
    }

    private void refreshBookies(SessionEvent sessionEvent) {
        this.lastMetadataSessionEvent = sessionEvent;
        if (!SessionEvent.Reconnected.equals((Object)sessionEvent) && !SessionEvent.SessionReestablished.equals((Object)sessionEvent)) {
            return;
        }
        this.store.invalidateCaches(this.bookieRegistrationPath, this.bookieAllRegistrationPath, this.bookieReadonlyRegistrationPath);
        this.bookieServiceInfoMetadataCache.invalidateAll();
        this.getReadOnlyBookies().thenAccept(bookies -> this.readOnlyBookiesWatchers.forEach(w -> this.executor.execute(() -> w.onBookiesChanged((Versioned<Set<BookieId>>)bookies))));
        this.getWritableBookies().thenAccept(bookies -> this.writableBookiesWatchers.forEach(w -> this.executor.execute(() -> w.onBookiesChanged((Versioned<Set<BookieId>>)bookies))));
    }

    @Override
    public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
        return this.getBookiesThenFreshCache(this.bookieRegistrationPath);
    }

    @Override
    public CompletableFuture<Versioned<Set<BookieId>>> getAllBookies() {
        return this.getBookiesThenFreshCache(this.bookieAllRegistrationPath);
    }

    @Override
    public CompletableFuture<Versioned<Set<BookieId>>> getReadOnlyBookies() {
        return this.getBookiesThenFreshCache(this.bookieReadonlyRegistrationPath);
    }

    private CompletableFuture<Versioned<Set<BookieId>>> getBookiesThenFreshCache(String path) {
        if (path == null || path.isEmpty()) {
            return CompletableFuture.failedFuture(new IllegalArgumentException("parameter [path] can not be null or empty."));
        }
        return ((CompletableFuture)this.store.getChildren(path).thenComposeAsync(children -> {
            Set<BookieId> bookieIds = PulsarRegistrationClient.convertToBookieAddresses(children);
            ArrayList<CompletionStage> bookieInfoUpdated = new ArrayList<CompletionStage>(bookieIds.size());
            for (BookieId id : bookieIds) {
                if (path.equals(this.bookieReadonlyRegistrationPath) && this.readOnlyBookieInfo.get(id) == null) {
                    bookieInfoUpdated.add(this.readBookieInfoAsReadonlyBookie(id));
                    continue;
                }
                if (path.equals(this.bookieRegistrationPath) && this.writableBookieInfo.get(id) == null) {
                    bookieInfoUpdated.add(this.readBookieInfoAsWritableBookie(id));
                    continue;
                }
                if (!path.equals(this.bookieAllRegistrationPath) || this.writableBookieInfo.get(id) != null || this.readOnlyBookieInfo.get(id) != null) continue;
                CompletionStage revalidateAllBookiesFuture = this.readBookieInfoAsWritableBookie(id).thenCompose(writableBookieInfo -> writableBookieInfo.map(bookieServiceInfo -> CompletableFuture.completedFuture(null)).orElseGet(() -> this.readBookieInfoAsReadonlyBookie(id)));
                bookieInfoUpdated.add(revalidateAllBookiesFuture);
            }
            if (bookieInfoUpdated.isEmpty()) {
                return CompletableFuture.completedFuture(bookieIds);
            }
            return FutureUtil.waitForAll(bookieInfoUpdated).thenApply(___ -> bookieIds);
        })).thenApply(s -> new Versioned<Set>((Set)s, Version.NEW));
    }

    @Override
    public CompletableFuture<Void> watchWritableBookies(RegistrationClient.RegistrationListener registrationListener) {
        this.writableBookiesWatchers.add(registrationListener);
        return this.getWritableBookies().thenAcceptAsync(bookies -> this.writableBookiesWatchers.forEach(w -> w.onBookiesChanged((Versioned<Set<BookieId>>)bookies)), (Executor)this.executor);
    }

    @Override
    public void unwatchWritableBookies(RegistrationClient.RegistrationListener registrationListener) {
        this.writableBookiesWatchers.remove(registrationListener);
    }

    @Override
    public CompletableFuture<Void> watchReadOnlyBookies(RegistrationClient.RegistrationListener registrationListener) {
        this.readOnlyBookiesWatchers.add(registrationListener);
        return this.getReadOnlyBookies().thenAcceptAsync(bookies -> this.readOnlyBookiesWatchers.forEach(w -> w.onBookiesChanged((Versioned<Set<BookieId>>)bookies)), (Executor)this.executor);
    }

    @Override
    public void unwatchReadOnlyBookies(RegistrationClient.RegistrationListener registrationListener) {
        this.readOnlyBookiesWatchers.remove(registrationListener);
    }

    private void updatedBookies(Notification n) {
        String path = n.getPath();
        if (!path.startsWith(this.bookieReadonlyRegistrationPath) && !path.startsWith(this.bookieRegistrationPath)) {
            return;
        }
        if (path.equals(this.bookieReadonlyRegistrationPath) || path.equals(this.bookieRegistrationPath)) {
            return;
        }
        BookieId bookieId = PulsarRegistrationClient.stripBookieIdFromPath(n.getPath());
        this.sequencer.sequential(() -> {
            switch (n.getType()) {
                case Created: {
                    log.info("Bookie {} created. path: {}", (Object)bookieId, (Object)n.getPath());
                    if (path.startsWith(this.bookieReadonlyRegistrationPath)) {
                        return this.getReadOnlyBookies().thenAccept(bookies -> this.readOnlyBookiesWatchers.forEach(w -> this.executor.execute(() -> w.onBookiesChanged((Versioned<Set<BookieId>>)bookies))));
                    }
                    return this.getWritableBookies().thenAccept(bookies -> this.writableBookiesWatchers.forEach(w -> this.executor.execute(() -> w.onBookiesChanged((Versioned<Set<BookieId>>)bookies))));
                }
                case Modified: {
                    if (bookieId == null) {
                        return CompletableFuture.completedFuture(null);
                    }
                    log.info("Bookie {} modified. path: {}", (Object)bookieId, (Object)n.getPath());
                    if (path.startsWith(this.bookieReadonlyRegistrationPath)) {
                        return this.readBookieInfoAsReadonlyBookie(bookieId).thenApply(__ -> null);
                    }
                    return this.readBookieInfoAsWritableBookie(bookieId).thenApply(__ -> null);
                }
                case Deleted: {
                    if (bookieId == null) {
                        return CompletableFuture.completedFuture(null);
                    }
                    log.info("Bookie {} deleted. path: {}", (Object)bookieId, (Object)n.getPath());
                    if (path.startsWith(this.bookieReadonlyRegistrationPath)) {
                        this.readOnlyBookieInfo.remove(bookieId);
                        return this.getReadOnlyBookies().thenAccept(bookies -> this.readOnlyBookiesWatchers.forEach(w -> this.executor.execute(() -> w.onBookiesChanged((Versioned<Set<BookieId>>)bookies))));
                    }
                    if (path.startsWith(this.bookieRegistrationPath)) {
                        this.writableBookieInfo.remove(bookieId);
                        return this.getWritableBookies().thenAccept(bookies -> this.writableBookiesWatchers.forEach(w -> this.executor.execute(() -> w.onBookiesChanged((Versioned<Set<BookieId>>)bookies))));
                    }
                    return CompletableFuture.completedFuture(null);
                }
            }
            return CompletableFuture.completedFuture(null);
        });
    }

    private static BookieId stripBookieIdFromPath(String path) {
        if (path == null) {
            return null;
        }
        int slash = path.lastIndexOf(47);
        if (slash >= 0) {
            try {
                return BookieId.parse(path.substring(slash + 1));
            }
            catch (IllegalArgumentException e) {
                log.warn("Cannot decode bookieId from {}, error: {}", (Object)path, (Object)e.getMessage());
            }
        }
        return null;
    }

    private static Set<BookieId> convertToBookieAddresses(List<String> children) {
        HashSet<BookieId> newBookieAddrs = new HashSet<BookieId>();
        for (String bookieAddrString : children) {
            if ("readonly".equals(bookieAddrString)) continue;
            BookieId bookieAddr = BookieId.parse(bookieAddrString);
            newBookieAddrs.add(bookieAddr);
        }
        return newBookieAddrs;
    }

    @Override
    public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(BookieId bookieId) {
        Versioned<BookieServiceInfo> info = this.writableBookieInfo.get(bookieId);
        if (info == null) {
            info = this.readOnlyBookieInfo.get(bookieId);
        }
        if (log.isDebugEnabled()) {
            log.debug("getBookieServiceInfo {} -> {}", (Object)bookieId, info);
        }
        if (info != null) {
            return CompletableFuture.completedFuture(info);
        }
        return FutureUtils.exception(new BKException.BKBookieHandleNotAvailableException());
    }

    public CompletableFuture<Optional<CacheGetResult<BookieServiceInfo>>> readBookieInfoAsWritableBookie(BookieId bookieId) {
        String asWritable = this.bookieRegistrationPath + "/" + String.valueOf(bookieId);
        return this.bookieServiceInfoMetadataCache.getWithStats(asWritable).thenApply(bkInfoWithStats -> {
            if (bkInfoWithStats.isPresent()) {
                CacheGetResult r = (CacheGetResult)bkInfoWithStats.get();
                log.info("Update BookieInfoCache (writable bookie) {} -> {}", (Object)bookieId, r.getValue());
                this.writableBookieInfo.put(bookieId, new Versioned<BookieServiceInfo>((BookieServiceInfo)r.getValue(), new LongVersion(r.getStat().getVersion())));
            }
            return bkInfoWithStats;
        });
    }

    final CompletableFuture<Optional<CacheGetResult<BookieServiceInfo>>> readBookieInfoAsReadonlyBookie(BookieId bookieId) {
        String asReadonly = this.bookieReadonlyRegistrationPath + "/" + String.valueOf(bookieId);
        return this.bookieServiceInfoMetadataCache.getWithStats(asReadonly).thenApply(bkInfoWithStats -> {
            if (bkInfoWithStats.isPresent()) {
                CacheGetResult r = (CacheGetResult)bkInfoWithStats.get();
                log.info("Update BookieInfoCache (readonly bookie) {} -> {}", (Object)bookieId, r.getValue());
                this.readOnlyBookieInfo.put(bookieId, new Versioned<BookieServiceInfo>((BookieServiceInfo)r.getValue(), new LongVersion(r.getStat().getVersion())));
            }
            return bkInfoWithStats;
        });
    }
}

