/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.zmq;

import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZContext;
import org.zeromq.ZLoop;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
import reactor.rx.broadcast.Broadcaster;

public abstract class ZeroMQWorker
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(ZeroMQWorker.class);
    public static final byte[] CLOSE_MSG = "ZMQ.EVENT_CLOSED".getBytes();
    private final ZLoop zloop = new ZLoop();
    private final UUID id;
    private final int socketType;
    private final int ioThreadCount;
    private final ZLoop.IZLoopHandler inputHandler;
    private final Broadcaster<ZMsg> b;
    private volatile boolean closed;
    private volatile boolean shutdownCtx;
    private volatile ZContext zmq;
    private volatile ZMQ.Socket socket;
    private volatile ZMQ.PollItem pollin;

    public ZeroMQWorker(UUID id, int socketType, int ioThreadCount, ZContext zmq, final Broadcaster<ZMsg> b) {
        this.id = id;
        this.socketType = socketType;
        this.ioThreadCount = ioThreadCount;
        this.zmq = zmq;
        this.b = b;
        this.inputHandler = new ZLoop.IZLoopHandler(){

            public int handle(ZLoop loop, ZMQ.PollItem item, Object arg) {
                ZMsg msg = ZMsg.recvMsg((ZMQ.Socket)ZeroMQWorker.this.socket);
                if (null == msg || msg.size() == 0) {
                    return 0;
                }
                if (ZeroMQWorker.this.closed) {
                    return -1;
                }
                b.onNext((Object)msg);
                return 0;
            }
        };
    }

    @Override
    public void run() {
        try {
            if (this.closed) {
                return;
            }
            if (null == this.zmq) {
                this.zmq = new ZContext(this.ioThreadCount);
                this.shutdownCtx = true;
            }
            this.socket = this.zmq.createSocket(this.socketType);
            this.socket.setIdentity(this.id.toString().getBytes());
            this.configure(this.socket);
            this.pollin = new ZMQ.PollItem(this.socket, 1);
            if (log.isDebugEnabled()) {
                this.zloop.verbose(true);
            }
            this.zloop.addPoller(this.pollin, this.inputHandler, null);
            this.start(this.socket);
            this.zloop.start();
            this.zmq.destroySocket(this.socket);
        }
        catch (Exception e) {
            this.b.onError((Throwable)e);
        }
    }

    public void shutdown() {
        if (this.closed) {
            return;
        }
        this.zloop.removePoller(this.pollin);
        this.zloop.destroy();
        this.closed = true;
        if (this.shutdownCtx) {
            this.zmq.destroy();
        }
        this.b.onComplete();
    }

    protected abstract void configure(ZMQ.Socket var1);

    protected abstract void start(ZMQ.Socket var1);
}

