/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.canal.CanalSourceConfig;
import org.apache.pulsar.io.canal.MessageUtils;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public abstract class CanalAbstractSource<V>
extends PushSource<V> {
    private static final Logger log = LoggerFactory.getLogger(CanalAbstractSource.class);
    protected Thread thread = null;
    protected volatile boolean running = false;
    private CanalConnector connector;
    private CanalSourceConfig canalSourceConfig;
    private static final String DESTINATION = "destination";
    protected final Thread.UncaughtExceptionHandler handler = (t, e) -> log.error("[{}] parse events has an error", (Object)t.getName(), (Object)e);

    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        this.canalSourceConfig = CanalSourceConfig.load(config, sourceContext);
        if (this.canalSourceConfig.getCluster().booleanValue()) {
            this.connector = CanalConnectors.newClusterConnector((String)this.canalSourceConfig.getZkServers(), (String)this.canalSourceConfig.getDestination(), (String)this.canalSourceConfig.getUsername(), (String)this.canalSourceConfig.getPassword());
            log.info("Start canal connect in cluster mode, canal cluster info {}", (Object)this.canalSourceConfig.getZkServers());
        } else {
            this.connector = CanalConnectors.newSingleConnector((SocketAddress)new InetSocketAddress(this.canalSourceConfig.getSingleHostname(), this.canalSourceConfig.getSinglePort()), (String)this.canalSourceConfig.getDestination(), (String)this.canalSourceConfig.getUsername(), (String)this.canalSourceConfig.getPassword());
            log.info("Start canal connect in standalone mode, canal server info {}:{}", (Object)this.canalSourceConfig.getSingleHostname(), (Object)this.canalSourceConfig.getSinglePort());
        }
        log.info("canal source destination {}", (Object)this.canalSourceConfig.getDestination());
        this.start();
    }

    protected void start() {
        Objects.requireNonNull(this.connector, "connector is null");
        this.thread = new Thread(this::process);
        this.thread.setName("canal source thread");
        this.thread.setUncaughtExceptionHandler(this.handler);
        this.running = true;
        this.thread.start();
    }

    public void close() throws InterruptedException {
        log.info("close canal source");
        if (!this.running) {
            return;
        }
        this.running = false;
        if (this.thread != null) {
            this.thread.interrupt();
            this.thread.join();
        }
        if (this.connector != null) {
            this.connector.disconnect();
        }
        MDC.remove((String)DESTINATION);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void process() {
        while (this.running) {
            try {
                MDC.put((String)DESTINATION, (String)this.canalSourceConfig.getDestination());
                this.connector.connect();
                log.info("start canal process");
                this.connector.subscribe();
                while (this.running) {
                    Message message = this.connector.getWithoutAck(this.canalSourceConfig.getBatchSize());
                    message.setRaw(false);
                    List<FlatMessage> flatMessages = MessageUtils.messageConverter(message);
                    long batchId = this.getMessageId(message);
                    int size = message.getEntries().size();
                    if (batchId == -1L || size == 0) {
                        try {
                            Thread.sleep(1000L);
                        }
                        catch (InterruptedException interruptedException) {}
                        continue;
                    }
                    if (flatMessages == null) continue;
                    CanalRecord<V> canalRecord = new CanalRecord<V>(this.connector);
                    canalRecord.setId(batchId);
                    canalRecord.setRecord(this.extractValue(flatMessages));
                    this.consume(canalRecord);
                }
            }
            catch (Exception e) {
                log.error("process error!", (Throwable)e);
            }
            finally {
                this.connector.disconnect();
                MDC.remove((String)DESTINATION);
            }
        }
    }

    public abstract Long getMessageId(Message var1);

    public abstract V extractValue(List<FlatMessage> var1);

    private static class CanalRecord<V>
    implements Record<V> {
        private V record;
        private Long id;
        private CanalConnector connector;

        public CanalRecord(CanalConnector connector) {
            this.connector = connector;
        }

        public Optional<String> getKey() {
            return Optional.of(Long.toString(this.id));
        }

        public V getValue() {
            return this.record;
        }

        public Optional<Long> getRecordSequence() {
            return Optional.of(this.id);
        }

        public void ack() {
            log.info("CanalRecord ack id is {}", (Object)this.id);
            this.connector.ack(this.id.longValue());
        }

        public V getRecord() {
            return this.record;
        }

        public Long getId() {
            return this.id;
        }

        public CanalConnector getConnector() {
            return this.connector;
        }

        public void setRecord(V record) {
            this.record = record;
        }

        public void setId(Long id) {
            this.id = id;
        }

        public void setConnector(CanalConnector connector) {
            this.connector = connector;
        }
    }
}

