/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.executor;

import com.google.common.annotations.VisibleForTesting;
import com.lmax.disruptor.EventHandler;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.Callable;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.serialization.KryoTupleSerializer;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.DisruptorQueue;
import org.apache.storm.utils.MutableObject;
import org.apache.storm.utils.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutorTransfer
implements EventHandler,
Callable {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);
    private final WorkerState workerData;
    private final DisruptorQueue batchTransferQueue;
    private final Map<String, Object> topoConf;
    private final KryoTupleSerializer serializer;
    private final MutableObject cachedEmit;
    private final boolean isDebug;

    public ExecutorTransfer(WorkerState workerData, DisruptorQueue batchTransferQueue, Map<String, Object> topoConf) {
        this.workerData = workerData;
        this.batchTransferQueue = batchTransferQueue;
        this.topoConf = topoConf;
        this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext());
        this.cachedEmit = new MutableObject(new ArrayList());
        this.isDebug = ObjectReader.getBoolean(topoConf.get("topology.debug"), false);
    }

    public void transfer(int task, Tuple tuple) {
        AddressedTuple val = new AddressedTuple(task, tuple);
        if (this.isDebug) {
            LOG.info("TRANSFERRING tuple {}", (Object)val);
        }
        this.batchTransferQueue.publish(val);
    }

    @VisibleForTesting
    public DisruptorQueue getBatchTransferQueue() {
        return this.batchTransferQueue;
    }

    public Object call() throws Exception {
        this.batchTransferQueue.consumeBatchWhenAvailable(this);
        return 0L;
    }

    public String getName() {
        return this.batchTransferQueue.getName();
    }

    public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        ArrayList cachedEvents = (ArrayList)this.cachedEmit.getObject();
        cachedEvents.add(event);
        if (endOfBatch) {
            this.workerData.transfer(this.serializer, cachedEvents);
            this.cachedEmit.setObject(new ArrayList());
        }
    }
}

