/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.dynamodb;

import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.google.common.base.Preconditions;
import java.net.InetAddress;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.io.aws.AbstractAwsConnector;
import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.apache.pulsar.io.dynamodb.DynamoDBSourceConfig;
import org.apache.pulsar.io.dynamodb.StreamsRecord;
import org.apache.pulsar.io.dynamodb.StreamsRecordProcessorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Connector(name="dynamodb", type=IOType.SOURCE, help="A source connector that copies messages from DynamoDB Streams to Pulsar", configClass=DynamoDBSourceConfig.class)
public class DynamoDBSource
extends AbstractAwsConnector
implements Source<byte[]> {
    private static final Logger log = LoggerFactory.getLogger(DynamoDBSource.class);
    private LinkedBlockingQueue<StreamsRecord> queue;
    private DynamoDBSourceConfig dynamodbSourceConfig;
    private KinesisClientLibConfiguration kinesisClientLibConfig;
    private IRecordProcessorFactory recordProcessorFactory;
    private String workerId;
    private Worker worker;
    private Thread workerThread;
    private Throwable threadEx;

    public void close() throws Exception {
        this.worker.shutdown();
    }

    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
        this.dynamodbSourceConfig = DynamoDBSourceConfig.load(config, sourceContext);
        Preconditions.checkArgument((boolean)StringUtils.isNotBlank((CharSequence)this.dynamodbSourceConfig.getAwsDynamodbStreamArn()), (Object)"empty dynamo-stream arn");
        Preconditions.checkArgument((boolean)StringUtils.isNotBlank((CharSequence)this.dynamodbSourceConfig.getAwsRegion()), (Object)"The aws-region must be set");
        Preconditions.checkArgument((boolean)StringUtils.isNotBlank((CharSequence)this.dynamodbSourceConfig.getAwsCredentialPluginParam()), (Object)"empty aws-credential param");
        if (this.dynamodbSourceConfig.getInitialPositionInStream() == InitialPositionInStream.AT_TIMESTAMP) {
            Preconditions.checkArgument((this.dynamodbSourceConfig.getStartAtTime() != null ? 1 : 0) != 0, (Object)"Timestamp must be specified");
        }
        this.queue = new LinkedBlockingQueue(this.dynamodbSourceConfig.getReceiveQueueSize());
        this.workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
        AwsCredentialProviderPlugin credentialsProvider = this.createCredentialProvider(this.dynamodbSourceConfig.getAwsCredentialPluginName(), this.dynamodbSourceConfig.getAwsCredentialPluginParam());
        AmazonDynamoDBStreams dynamoDBStreamsClient = this.dynamodbSourceConfig.buildDynamoDBStreamsClient(credentialsProvider);
        AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(dynamoDBStreamsClient);
        this.recordProcessorFactory = new StreamsRecordProcessorFactory(this.queue, this.dynamodbSourceConfig);
        this.kinesisClientLibConfig = new KinesisClientLibConfiguration(this.dynamodbSourceConfig.getApplicationName(), this.dynamodbSourceConfig.getAwsDynamodbStreamArn(), credentialsProvider.getCredentialProvider(), this.workerId).withRegionName(this.dynamodbSourceConfig.getAwsRegion()).withInitialPositionInStream(this.dynamodbSourceConfig.getInitialPositionInStream());
        if (this.kinesisClientLibConfig.getInitialPositionInStream() == InitialPositionInStream.AT_TIMESTAMP) {
            this.kinesisClientLibConfig.withTimestampAtInitialPositionInStream(this.dynamodbSourceConfig.getStartAtTime());
        }
        this.worker = StreamsWorkerFactory.createDynamoDbStreamsWorker((IRecordProcessorFactory)this.recordProcessorFactory, (KinesisClientLibConfiguration)this.kinesisClientLibConfig, (AmazonDynamoDBStreamsAdapterClient)adapterClient, (AmazonDynamoDB)this.dynamodbSourceConfig.buildDynamoDBClient(credentialsProvider), (AmazonCloudWatch)this.dynamodbSourceConfig.buildCloudwatchClient(credentialsProvider));
        this.workerThread = new Thread((Runnable)this.worker);
        this.workerThread.setDaemon(true);
        this.threadEx = null;
        this.workerThread.setUncaughtExceptionHandler((t, ex) -> {
            this.threadEx = ex;
            log.error("Worker died with error", ex);
        });
        this.workerThread.start();
    }

    public StreamsRecord read() throws Exception {
        try {
            return this.queue.take();
        }
        catch (InterruptedException ex) {
            log.warn("Got interrupted when trying to fetch out of the queue");
            if (this.threadEx != null) {
                log.error("error from scheduler", this.threadEx);
            }
            throw ex;
        }
    }
}

