/*
 * Decompiled with CFR 0.152.
 */
package com.sproutsocial.nsq;

import com.sproutsocial.nsq.BalanceStrategy;
import com.sproutsocial.nsq.BasePubSub;
import com.sproutsocial.nsq.Batcher;
import com.sproutsocial.nsq.Client;
import com.sproutsocial.nsq.ConnectionDetails;
import com.sproutsocial.nsq.ListBasedBalanceStrategy;
import com.sproutsocial.nsq.NSQException;
import com.sproutsocial.nsq.PubConnection;
import com.sproutsocial.nsq.SingleNsqdBalanceStrategy;
import com.sproutsocial.nsq.Util;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class Publisher
extends BasePubSub {
    private static final int DEFAULT_MAX_BATCH_SIZE = 16384;
    private static final int DEFUALT_MAX_BATCH_DELAY = 300;
    private static final Logger logger = LoggerFactory.getLogger(Publisher.class);
    private final BalanceStrategy balanceStrategy;
    private final Map<String, Batcher> batchers = new HashMap<String, Batcher>();
    private ScheduledExecutorService batchExecutor;

    public Publisher(Client client, String nsqd, String failoverNsqd) {
        this(client, Publisher.getBalanceStrategyBiFunction(nsqd, failoverNsqd));
    }

    private static BiFunction<Client, Publisher, BalanceStrategy> getBalanceStrategyBiFunction(String nsqd, String failoverNsqd) {
        Objects.requireNonNull(nsqd);
        if (failoverNsqd == null) {
            return (c, p) -> new SingleNsqdBalanceStrategy((Client)c, (Publisher)p, nsqd);
        }
        return ListBasedBalanceStrategy.getFailoverStrategyBuilder(Arrays.asList(nsqd, failoverNsqd));
    }

    public Publisher(Client client, BiFunction<Client, Publisher, BalanceStrategy> balanceStrategyFactory) {
        super(client);
        client.addPublisher(this);
        this.balanceStrategy = balanceStrategyFactory.apply(client, this);
    }

    public Publisher(String nsqd, String failoverNsqd) {
        this(Client.getDefaultClient(), nsqd, failoverNsqd);
    }

    public Publisher(String nsqd) {
        this(Client.getDefaultClient(), nsqd, null);
    }

    public synchronized void connectionClosed(PubConnection closedCon) {
        this.balanceStrategy.connectionClosed(closedCon);
    }

    public synchronized void publish(String topic, byte[] data) {
        Util.checkNotNull(topic);
        Util.checkNotNull(data);
        Util.checkArgument(data.length > 0);
        ConnectionDetails connectionDetails = this.balanceStrategy.getConnectionDetails();
        try {
            connectionDetails.getCon().publish(topic, data);
        }
        catch (Exception e) {
            connectionDetails.markFailure();
            logger.error("publish error with", (Throwable)e);
            this.publish(topic, data);
        }
    }

    public synchronized void publishDeferred(String topic, byte[] data, long delay, TimeUnit unit) {
        Util.checkNotNull(topic);
        Util.checkNotNull(data);
        Util.checkArgument(data.length > 0);
        Util.checkArgument(delay > 0L);
        Util.checkNotNull((Object)unit);
        ConnectionDetails connection = this.balanceStrategy.getConnectionDetails();
        try {
            connection.getCon().publishDeferred(topic, data, unit.toMillis(delay));
        }
        catch (Exception e) {
            connection.markFailure();
            throw new NSQException("deferred publish failed", e);
        }
    }

    public synchronized void publishDeferredWithRetry(String topic, byte[] data, long delay, TimeUnit unit) {
        Util.checkNotNull(topic);
        Util.checkNotNull(data);
        Util.checkArgument(data.length > 0);
        Util.checkArgument(delay > 0L);
        Util.checkNotNull((Object)unit);
        ConnectionDetails connection = this.balanceStrategy.getConnectionDetails();
        try {
            connection.getCon().publishDeferred(topic, data, unit.toMillis(delay));
        }
        catch (Exception e) {
            logger.error("Deferred publish error", (Throwable)e);
            connection.markFailure();
            this.publishDeferredWithRetry(topic, data, delay, unit);
        }
    }

    public synchronized void publish(String topic, List<byte[]> dataList) {
        Util.checkNotNull(topic);
        Util.checkNotNull(dataList);
        Util.checkArgument(dataList.size() > 0);
        ConnectionDetails connectionDetails = this.balanceStrategy.getConnectionDetails();
        try {
            connectionDetails.getCon().publish(topic, dataList);
        }
        catch (Exception e) {
            logger.error("publish error", (Throwable)e);
            connectionDetails.markFailure();
            for (byte[] data : dataList) {
                this.publish(topic, data);
            }
        }
    }

    public synchronized void publishBuffered(String topic, byte[] data) {
        Util.checkNotNull(topic);
        Util.checkNotNull(data);
        Util.checkArgument(data.length > 0);
        Batcher batcher = this.batchers.get(topic);
        if (batcher == null) {
            batcher = new Batcher(this, topic, 16384, 300);
            this.batchers.put(topic, batcher);
        }
        batcher.publish(data);
    }

    public synchronized void setBatchConfig(String topic, int maxSizeBytes, int maxDelayMillis) {
        Batcher batcher = this.batchers.get(topic);
        if (batcher != null) {
            batcher.sendBatch();
        }
        batcher = new Batcher(this, topic, maxSizeBytes, maxDelayMillis);
        this.batchers.put(topic, batcher);
    }

    synchronized ScheduledExecutorService getBatchExecutor() {
        if (this.batchExecutor == null) {
            this.batchExecutor = Executors.newScheduledThreadPool(1, Util.threadFactory("nsq-batch"));
        }
        return this.batchExecutor;
    }

    @Override
    public synchronized void stop() {
        this.flushBatchers();
        super.stop();
        if (this.batchExecutor != null) {
            Util.shutdownAndAwaitTermination(this.batchExecutor, 40L, TimeUnit.MILLISECONDS);
        }
        if (this.client.isLonePublisher(this)) {
            Util.shutdownAndAwaitTermination(this.client.getSchedExecutor(), 40L, TimeUnit.MILLISECONDS);
        }
    }

    protected void flushBatchers() {
        for (Batcher batcher : this.batchers.values()) {
            batcher.sendBatch();
        }
    }

    public synchronized int getFailoverDurationSecs() {
        return this.balanceStrategy.getFailoverDurationSecs();
    }

    public synchronized void setFailoverDurationSecs(int failoverDurationSecs) {
        this.balanceStrategy.setFailoverDurationSecs(failoverDurationSecs);
    }
}

