/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.elasticsearch.client.elastic;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant;
import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation;
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
import org.apache.pulsar.io.elasticsearch.RandomExponentialRetry;
import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElasticBulkProcessor
implements BulkProcessor {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(ElasticBulkProcessor.class);
    private final ElasticSearchConfig config;
    private final ElasticsearchClient client;
    private final AtomicLong executionIdGen = new AtomicLong();
    private final int bulkActions;
    private final long bulkSize;
    private final List<BulkOperationWithPulsarRecord> pendingOperations = new ArrayList<BulkOperationWithPulsarRecord>();
    private final BulkRequestHandler bulkRequestHandler;
    private volatile boolean closed = false;
    private final ReentrantLock lock;
    private final ExecutorService internalExecutorService;
    private ScheduledFuture<?> futureFlushTask;
    private final ObjectMapper mapper = new ObjectMapper();

    public ElasticBulkProcessor(ElasticSearchConfig config, ElasticsearchClient client, BulkProcessor.Listener listener) {
        this.config = config;
        this.client = client;
        this.lock = new ReentrantLock();
        this.bulkActions = config.getBulkActions();
        this.bulkSize = config.getBulkSizeInMb() * 1024L * 1024L;
        this.internalExecutorService = Executors.newFixedThreadPool(Math.max(1, config.getBulkConcurrentRequests()), new ThreadFactoryBuilder().setNameFormat("elastic-bulk-executor-%d").build());
        this.bulkRequestHandler = new BulkRequestHandler(new RandomExponentialRetry(config.getMaxRetryTimeInSec()), config.getBulkConcurrentRequests(), listener);
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("elastic-flush-task-%d").build());
        if (config.getBulkFlushIntervalInMs() > 0L) {
            this.futureFlushTask = executor.scheduleWithFixedDelay(new Flush(), config.getBulkFlushIntervalInMs(), config.getBulkFlushIntervalInMs(), TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void appendIndexRequest(BulkProcessor.BulkIndexRequest request) throws IOException {
        Map mapped = (Map)this.mapper.readValue(request.getDocumentSource(), Map.class);
        IndexOperation indexOperation = ((IndexOperation.Builder)((IndexOperation.Builder)new IndexOperation.Builder().index(request.getIndex())).id(request.getDocumentId())).document((Object)mapped).build();
        long sourceLength = 0L;
        if (this.config.getBulkSizeInMb() > 0L) {
            sourceLength = request.getDocumentSource().getBytes(StandardCharsets.UTF_8).length;
        }
        this.add(BulkOperationWithPulsarRecord.indexOperation(indexOperation, request.getRecord(), sourceLength));
    }

    @Override
    public void appendDeleteRequest(BulkProcessor.BulkDeleteRequest request) {
        DeleteOperation deleteOperation = ((DeleteOperation.Builder)((DeleteOperation.Builder)new DeleteOperation.Builder().index(request.getIndex())).id(request.getDocumentId())).build();
        this.add(BulkOperationWithPulsarRecord.deleteOperation(deleteOperation, request.getRecord()));
    }

    protected void ensureOpen() {
        if (this.closed) {
            throw new IllegalStateException("bulk process already closed");
        }
    }

    private BulkRequest createBulkRequestAndResetPendingOps() {
        BulkRequest bulkRequest = new BulkRequest.Builder().operations(new ArrayList<BulkOperationWithPulsarRecord>(this.pendingOperations)).build();
        this.pendingOperations.clear();
        return bulkRequest;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void execute(boolean force) {
        long executionId;
        BulkRequest bulkRequest;
        this.lock.lock();
        try {
            this.ensureOpen();
            if (this.pendingOperations.isEmpty()) {
                return;
            }
            if (!force && !this.isOverTheLimit()) {
                return;
            }
            bulkRequest = this.createBulkRequestAndResetPendingOps();
            executionId = this.executionIdGen.incrementAndGet();
        }
        finally {
            this.lock.unlock();
        }
        this.execute(bulkRequest, executionId);
    }

    private boolean isOverTheLimit() {
        if (this.pendingOperations.isEmpty()) {
            return false;
        }
        if (this.bulkActions > 0 && this.pendingOperations.size() >= this.bulkActions) {
            return true;
        }
        return this.bulkSize > 0L && this.pendingOperations.stream().mapToLong(op -> op.getEstimatedSizeInBytes()).sum() >= this.bulkSize;
    }

    @Override
    public void flush() {
        this.execute(true);
    }

    private void execute(BulkRequest bulkRequest, long executionId) {
        this.bulkRequestHandler.execute(bulkRequest, executionId);
    }

    private void executeIfNeeded() {
        this.execute(false);
    }

    public void add(BulkOperationWithPulsarRecord bulkOperation) {
        this.lock.lock();
        try {
            this.ensureOpen();
            this.pendingOperations.add(bulkOperation);
        }
        finally {
            this.lock.unlock();
        }
        this.executeIfNeeded();
    }

    @Override
    public void close() {
        try {
            this.lock.lock();
            try {
                if (this.closed) {
                    return;
                }
                if (this.futureFlushTask != null) {
                    this.futureFlushTask.cancel(false);
                }
                this.flush();
                this.bulkRequestHandler.awaitClose(5000L, TimeUnit.MILLISECONDS);
                this.closed = true;
            }
            finally {
                this.lock.unlock();
            }
        }
        catch (InterruptedException var2) {
            Thread.currentThread().interrupt();
        }
    }

    public final class BulkRequestHandler {
        private final BulkProcessor.Listener listener;
        private final Semaphore semaphore;
        private final RandomExponentialRetry retry;
        private final int concurrentRequests;

        BulkRequestHandler(RandomExponentialRetry retry, int concurrentRequests, BulkProcessor.Listener listener) {
            assert (concurrentRequests >= 0);
            this.concurrentRequests = concurrentRequests;
            this.retry = retry;
            this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
            this.listener = listener;
        }

        public void execute(BulkRequest bulkRequest, long executionId) {
            try {
                this.semaphore.acquire();
            }
            catch (InterruptedException ex2) {
                Thread.currentThread().interrupt();
                this.listener.afterBulk(executionId, this.convertBulkRequest(bulkRequest), ex2);
                return;
            }
            CompletableFuture promise = new CompletableFuture();
            Runnable responseCallable = () -> {
                Callable<BulkResponse> callable = () -> ElasticBulkProcessor.this.client.bulk(bulkRequest);
                try {
                    if (log.isDebugEnabled()) {
                        log.debug("Sending bulk {}", (Object)executionId);
                    }
                    BulkResponse bulkResponse = this.retry.retry(callable, ElasticBulkProcessor.this.config.getMaxRetries(), ElasticBulkProcessor.this.config.getRetryBackoffInMs(), "bulk");
                    if (log.isDebugEnabled()) {
                        log.debug("Sending bulk {} completed", (Object)executionId);
                    }
                    promise.complete(bulkResponse);
                }
                catch (Throwable ex) {
                    log.warn("Failed to execute bulk request {}", (Object)executionId, (Object)ex);
                    promise.completeExceptionally(ex);
                }
            };
            ElasticBulkProcessor.this.internalExecutorService.execute(responseCallable);
            CompletableFuture listenerCalledPromise = new CompletableFuture();
            ((CompletableFuture)promise.thenApply(bulkResponse -> {
                this.semaphore.release();
                this.listener.afterBulk(executionId, this.convertBulkRequest(bulkRequest), this.convertBulkResponse((BulkResponse)bulkResponse));
                listenerCalledPromise.complete(null);
                return null;
            })).exceptionally(ex -> {
                this.semaphore.release();
                this.listener.afterBulk(executionId, this.convertBulkRequest(bulkRequest), (Throwable)ex);
                log.warn("Failed to execute bulk request " + executionId, ex);
                listenerCalledPromise.complete(null);
                return null;
            });
            if (ElasticBulkProcessor.this.config.getBulkConcurrentRequests() == 0) {
                listenerCalledPromise.join();
            }
        }

        boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
            if (this.semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
                this.semaphore.release(this.concurrentRequests);
                return true;
            }
            return false;
        }

        private List<BulkProcessor.BulkOperationRequest> convertBulkRequest(BulkRequest bulkRequest) {
            return bulkRequest.operations().stream().map(op -> {
                BulkOperationWithPulsarRecord opWithRecord = (BulkOperationWithPulsarRecord)((Object)op);
                return BulkProcessor.BulkOperationRequest.builder().pulsarRecord(opWithRecord.getPulsarRecord()).build();
            }).collect(Collectors.toList());
        }

        private List<BulkProcessor.BulkOperationResult> convertBulkResponse(BulkResponse bulkResponse) {
            return bulkResponse.items().stream().map(responseItem -> {
                String error = responseItem.error() != null ? responseItem.error().type() : null;
                return BulkProcessor.BulkOperationResult.builder().error(error).index(responseItem.index()).documentId(responseItem.id()).build();
            }).collect(Collectors.toList());
        }
    }

    class Flush
    implements Runnable {
        Flush() {
        }

        @Override
        public void run() {
            if (!ElasticBulkProcessor.this.closed) {
                ElasticBulkProcessor.this.flush();
            }
        }
    }

    public static class BulkOperationWithPulsarRecord
    extends BulkOperation {
        private static final int REQUEST_OVERHEAD = 50;
        private final Record pulsarRecord;
        private final long estimatedSizeInBytes;

        public static BulkOperationWithPulsarRecord indexOperation(IndexOperation indexOperation, Record pulsarRecord, long sourceLength) {
            long estimatedSizeInBytes = 50L + sourceLength;
            return new BulkOperationWithPulsarRecord((BulkOperationVariant)indexOperation, pulsarRecord, estimatedSizeInBytes);
        }

        public static BulkOperationWithPulsarRecord deleteOperation(DeleteOperation indexOperation, Record pulsarRecord) {
            return new BulkOperationWithPulsarRecord((BulkOperationVariant)indexOperation, pulsarRecord, 50L);
        }

        public BulkOperationWithPulsarRecord(BulkOperationVariant value, Record pulsarRecord, long estimatedSizeInBytes) {
            super(value);
            this.pulsarRecord = pulsarRecord;
            this.estimatedSizeInBytes = estimatedSizeInBytes;
        }

        public Record getPulsarRecord() {
            return this.pulsarRecord;
        }

        public long getEstimatedSizeInBytes() {
            return this.estimatedSizeInBytes;
        }
    }
}

