/*
 * Decompiled with CFR 0.152.
 */
package com.github.yingzhuo.nsqj;

import com.github.yingzhuo.nsqj.BasePubSub;
import com.github.yingzhuo.nsqj.Batcher;
import com.github.yingzhuo.nsqj.Client;
import com.github.yingzhuo.nsqj.HostAndPort;
import com.github.yingzhuo.nsqj.Messages;
import com.github.yingzhuo.nsqj.NSQException;
import com.github.yingzhuo.nsqj.PubConnection;
import com.github.yingzhuo.nsqj.Util;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Publishes messages to a primary nsqd and, when one is configured, to a
 * failover nsqd after a publish to the primary fails.
 *
 * <p>Addresses given without a port use port 4150. Every method that touches
 * the connection, the failover state or the batchers is {@code synchronized}
 * on this instance.
 */
@ThreadSafe
public class Publisher extends BasePubSub {
    private static final Logger log = LoggerFactory.getLogger(Publisher.class);

    /** Port applied to nsqd addresses that do not specify one. */
    private static final int DEFAULT_NSQD_PORT = 4150;
    /** First numeric argument handed to a {@link Batcher} created on demand by {@code publishBuffered}. */
    private static final int DEFAULT_MAX_BATCH_SIZE = 16384;
    /** Second numeric argument handed to a {@link Batcher} created on demand by {@code publishBuffered}. */
    private static final int DEFAULT_MAX_BATCH_DELAY = 300;
    /** Value passed to {@code Util.sleepQuietly} before retrying the primary; presumably milliseconds. */
    private static final int NO_FAILOVER_RETRY_WAIT = 10000;

    private final HostAndPort nsqd;
    /** Failover address, or {@code null} when no failover nsqd was configured. */
    private final HostAndPort failoverNsqd;
    /** Current connection, or {@code null} when none is open. */
    private PubConnection con;
    /** {@code true} while {@link #con} was opened against {@link #failoverNsqd}. */
    private boolean isFailover = false;
    /** {@code Util.clock()} reading taken when the publisher last switched to the failover nsqd. */
    private long failoverStart;
    private int failoverDurationSecs = 300;
    /** One batcher per topic, created lazily by {@code publishBuffered} or explicitly by {@code setBatchConfig}. */
    private final Map<String, Batcher> batchers = new HashMap<>();
    private ScheduledExecutorService batchExecutor;

    /**
     * Creates a publisher without a failover nsqd.
     *
     * @param client client this publisher registers itself with
     * @param string address of the primary nsqd
     */
    public Publisher(Client client, String string) {
        this(client, string, null);
    }

    /**
     * Creates a publisher and registers it with {@code client}.
     *
     * @param client  client this publisher registers itself with
     * @param string  address of the primary nsqd
     * @param string2 address of the failover nsqd, or {@code null} for none
     */
    public Publisher(Client client, String string, String string2) {
        super(client);
        this.nsqd = HostAndPort.fromString(string).withDefaultPort(DEFAULT_NSQD_PORT);
        this.failoverNsqd = string2 != null
                ? HostAndPort.fromString(string2).withDefaultPort(DEFAULT_NSQD_PORT)
                : null;
        client.addPublisher(this);
    }

    /**
     * Creates a publisher on the default client.
     *
     * @param string  address of the primary nsqd
     * @param string2 address of the failover nsqd, or {@code null} for none
     */
    public Publisher(String string, String string2) {
        this(Client.getDefaultClient(), string, string2);
    }

    /**
     * Creates a publisher on the default client without a failover nsqd.
     *
     * @param string address of the primary nsqd
     */
    public Publisher(String string) {
        this(string, null);
    }

    /**
     * Makes sure a connection object exists before publishing, and switches
     * back to the primary once the failover period has elapsed.
     *
     * @throws IOException  if connecting fails
     * @throws NSQException if there is no connection and the publisher is stopping
     */
    @GuardedBy("this")
    private void checkConnection() throws IOException {
        if (con == null) {
            if (isStopping) {
                throw new NSQException("publisher stopped");
            }
            // A fresh connection always targets the primary, so the flag must not
            // keep claiming that the failover nsqd is in use.
            isFailover = false;
            connect(nsqd);
        } else if (isFailover
                && Util.clock() - failoverStart > TimeUnit.SECONDS.toMillis(failoverDurationSecs)) {
            // TimeUnit does the conversion in long arithmetic; an int multiplication
            // by 1000 would wrap for large durations.
            isFailover = false;
            connect(nsqd);
            log.info("using primary nsqd");
        }
    }

    /**
     * Closes the current connection, if any, and opens a new one to {@code target}.
     *
     * @param target nsqd to connect to
     * @throws IOException if closing the old connection or opening the new one fails
     */
    @GuardedBy("this")
    private void connect(HostAndPort target) throws IOException {
        if (con != null) {
            con.close();
        }
        con = new PubConnection(client, target, this);
        con.connect(config);
        log.info("publisher connected:{}", target);
    }

    /**
     * Forgets {@code pubConnection} if it is the connection currently in use.
     * A notification for any other connection is ignored.
     *
     * @param pubConnection connection that has been closed
     */
    public synchronized void connectionClosed(PubConnection pubConnection) {
        if (con == pubConnection) {
            con = null;
            log.debug("removed closed publisher connection:{}", (Object) pubConnection.getHost());
        }
    }

    /**
     * Publishes one message. If that fails for any reason the message is retried
     * through the failover path.
     *
     * @param string  topic to publish to; must not be null
     * @param byArray message body; must not be null or empty
     * @throws NSQException if the failover attempt fails as well
     */
    public synchronized void publish(String string, byte[] byArray) {
        Util.checkNotNull(string);
        Util.checkNotNull(byArray);
        Util.checkArgument(byArray.length > 0);
        try {
            checkConnection();
            con.publish(string, byArray);
        } catch (Exception exception) {
            // NOTE(review): the "publisher stopped" exception from checkConnection() also
            // lands here and is routed into the failover path, which reconnects.
            // Confirm whether publishing after stop() is meant to do that.
            log.error("publish error with:{}", currentTarget(), exception);
            publishFailover(string, byArray);
        }
    }

    /**
     * Publishes the UTF-8 encoding of {@code string2}.
     *
     * @param string  topic to publish to
     * @param string2 message text
     */
    public void publish(String string, String string2) {
        this.publish(string, string2.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Publishes one message with a delay. The failover path is not used here:
     * any failure is reported to the caller.
     *
     * @param string   topic to publish to; must not be null
     * @param byArray  message body; must not be null or empty
     * @param l        delay amount; must be greater than zero
     * @param timeUnit unit of {@code l}; the delay is passed on in milliseconds
     * @throws NSQException if connecting or publishing fails
     */
    public synchronized void publishDeferred(String string, byte[] byArray, long l, TimeUnit timeUnit) {
        Util.checkNotNull(string);
        Util.checkNotNull(byArray);
        Util.checkArgument(byArray.length > 0);
        Util.checkArgument(l > 0L);
        Util.checkNotNull((Object) timeUnit);
        try {
            checkConnection();
            con.publishDeferred(string, byArray, timeUnit.toMillis(l));
        } catch (Exception exception) {
            throw new NSQException("deferred publish failed", exception);
        }
    }

    /**
     * Publishes the UTF-8 encoding of {@code string2} with a delay.
     *
     * @param string   topic to publish to
     * @param string2  message text
     * @param l        delay amount
     * @param timeUnit unit of {@code l}
     */
    public void publishDeferred(String string, String string2, long l, TimeUnit timeUnit) {
        this.publishDeferred(string, string2.getBytes(StandardCharsets.UTF_8), l, timeUnit);
    }

    /**
     * Publishes several messages in one call. If that fails, the messages are
     * republished one by one through the failover path.
     *
     * @param string topic to publish to; must not be null
     * @param list   message bodies; must not be null or empty
     * @throws NSQException if the failover attempt fails as well
     */
    public synchronized void publish(String string, List<byte[]> list) {
        Util.checkNotNull(string);
        Util.checkNotNull(list);
        Util.checkArgument(list.size() > 0);
        try {
            checkConnection();
            con.publish(string, list);
        } catch (Exception exception) {
            log.error("publish error with:{}", currentTarget(), exception);
            publishFailover(string, list);
        }
    }

    /**
     * Publishes the data held by {@code messages}.
     *
     * @param string   topic to publish to
     * @param messages source of the message bodies
     */
    public void publish(String string, Messages messages) {
        this.publish(string, messages.getData());
    }

    /** Returns the nsqd that the failover flag says the current connection targets. */
    @GuardedBy("this")
    private HostAndPort currentTarget() {
        return isFailover ? failoverNsqd : nsqd;
    }

    /**
     * Gets the connection ready for a retry after a failed publish.
     *
     * <p>Without a failover nsqd this waits and reconnects to the primary. With
     * one, it switches to the failover nsqd unless that switch already happened,
     * in which case the existing connection is left untouched.
     *
     * @throws IOException if reconnecting fails
     */
    @GuardedBy("this")
    private void prepareFailoverConnection() throws IOException {
        if (failoverNsqd == null) {
            log.warn("publish failed but no failoverNsqd configured. Will wait and retry once.");
            Util.sleepQuietly(NO_FAILOVER_RETRY_WAIT);
            connect(nsqd);
        } else if (!isFailover) {
            failoverStart = Util.clock();
            isFailover = true;
            connect(failoverNsqd);
            log.info("using failover nsqd:{}", failoverNsqd);
        }
    }

    /**
     * Drops the connection and failover state after a failed retry.
     *
     * @param cause failure that ended the retry
     * @return the exception for the caller to throw
     */
    @GuardedBy("this")
    private NSQException failoverFailed(Exception cause) {
        Util.closeQuietly(con);
        con = null;
        isFailover = false;
        return new NSQException("publish failed", cause);
    }

    /**
     * Retries a single message after the regular publish failed.
     *
     * @param topic topic to publish to
     * @param data  message body
     * @throws NSQException if the retry fails
     */
    @GuardedBy("this")
    private void publishFailover(String topic, byte[] data) {
        try {
            prepareFailoverConnection();
            con.publish(topic, data);
        } catch (Exception exception) {
            throw failoverFailed(exception);
        }
    }

    /**
     * Retries a batch after the regular publish failed. The connection is
     * prepared once for the whole batch, so the wait used when no failover nsqd
     * is configured happens once rather than once per message. The messages are
     * then sent individually.
     *
     * @param topic    topic to publish to
     * @param messages message bodies
     * @throws NSQException if preparing the connection or any single publish fails;
     *                      messages after the failing one are not sent
     */
    @GuardedBy("this")
    private void publishFailover(String topic, List<byte[]> messages) {
        try {
            prepareFailoverConnection();
            for (byte[] data : messages) {
                con.publish(topic, data);
            }
        } catch (Exception exception) {
            throw failoverFailed(exception);
        }
    }

    /**
     * Hands a message to the topic's {@link Batcher}, creating one with the
     * default batch settings if the topic has none yet.
     *
     * @param string  topic to publish to; must not be null
     * @param byArray message body; must not be null or empty
     */
    public synchronized void publishBuffered(String string, byte[] byArray) {
        Util.checkNotNull(string);
        Util.checkNotNull(byArray);
        Util.checkArgument(byArray.length > 0);
        Batcher batcher = batchers.get(string);
        if (batcher == null) {
            batcher = new Batcher(this, string, DEFAULT_MAX_BATCH_SIZE, DEFAULT_MAX_BATCH_DELAY);
            batchers.put(string, batcher);
        }
        batcher.publish(byArray);
    }

    /**
     * Hands the UTF-8 encoding of {@code string2} to the topic's batcher.
     *
     * @param string  topic to publish to
     * @param string2 message text
     */
    public void publishBuffered(String string, String string2) {
        this.publishBuffered(string, string2.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Replaces the topic's batcher with a new one built from the given settings.
     * An existing batcher is asked to send its batch first.
     *
     * @param string topic the settings apply to
     * @param n      first numeric {@link Batcher} argument (takes the place of the default 16384)
     * @param n2     second numeric {@link Batcher} argument (takes the place of the default 300)
     */
    public synchronized void setBatchConfig(String string, int n, int n2) {
        Batcher previous = batchers.get(string);
        if (previous != null) {
            previous.sendBatch();
        }
        batchers.put(string, new Batcher(this, string, n, n2));
    }

    /**
     * Returns the scheduler shared by this publisher's batchers, creating a
     * single-threaded one on first use.
     */
    synchronized ScheduledExecutorService getBatchExecutor() {
        if (batchExecutor == null) {
            batchExecutor = Executors.newScheduledThreadPool(1, Util.threadFactory("nsq-batch"));
        }
        return batchExecutor;
    }

    /**
     * Asks every batcher to send its batch, stops the base class, closes the
     * connection and shuts down the batch executor if one was created.
     */
    @Override
    public synchronized void stop() {
        // Batches go out before super.stop() so they are sent while the publisher is still running.
        for (Batcher batcher : batchers.values()) {
            batcher.sendBatch();
        }
        super.stop();
        Util.closeQuietly(con);
        con = null;
        if (batchExecutor != null) {
            Util.shutdownAndAwaitTermination(batchExecutor, 40L, TimeUnit.MILLISECONDS);
        }
    }

    /** Returns how many seconds the publisher stays on the failover nsqd before going back to the primary. */
    public synchronized int getFailoverDurationSecs() {
        return failoverDurationSecs;
    }

    /**
     * Sets how many seconds the publisher stays on the failover nsqd before
     * going back to the primary.
     *
     * @param n duration in seconds
     */
    public synchronized void setFailoverDurationSecs(int n) {
        failoverDurationSecs = n;
    }
}

