/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.flume.sink;

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.flume.FlumeConfig;
import org.apache.pulsar.io.flume.FlumeConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractSink<T>
implements Sink<T> {
    private static final Logger log = LoggerFactory.getLogger(AbstractSink.class);
    protected static BlockingQueue<Object> records;
    protected FlumeConnector flumeConnector;

    public abstract T extractValue(Record<T> var1);

    public static BlockingQueue<Object> getQueue() {
        return records;
    }

    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
        records = new LinkedBlockingQueue<Object>();
        FlumeConfig flumeConfig = FlumeConfig.load(config);
        this.flumeConnector = new FlumeConnector();
        this.flumeConnector.startConnector(flumeConfig);
    }

    public void write(Record<T> record) {
        try {
            T message = this.extractValue(record);
            records.put(message);
            record.ack();
        }
        catch (InterruptedException e) {
            record.fail();
            log.error("error", (Throwable)e);
        }
    }

    public void close() {
        if (this.flumeConnector != null) {
            this.flumeConnector.stop();
        }
    }
}

