/*
 * Decompiled with CFR 0.152.
 */
package org.hibernate.search.mapper.orm.outboxpolling.event.impl;

import jakarta.persistence.OptimisticLockException;
import jakarta.persistence.PersistenceException;
import jakarta.persistence.PessimisticLockException;
import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.hibernate.Session;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.search.engine.backend.orchestration.spi.SingletonTask;
import org.hibernate.search.engine.cfg.ConfigurationPropertySource;
import org.hibernate.search.engine.cfg.spi.ConfigurationProperty;
import org.hibernate.search.engine.cfg.spi.OptionalConfigurationProperty;
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.mapper.orm.automaticindexing.spi.AutomaticIndexingMappingContext;
import org.hibernate.search.mapper.orm.common.spi.SessionHelper;
import org.hibernate.search.mapper.orm.common.spi.TransactionHelper;
import org.hibernate.search.mapper.orm.outboxpolling.cluster.impl.AgentRepositoryProvider;
import org.hibernate.search.mapper.orm.outboxpolling.cluster.impl.ShardAssignmentDescriptor;
import org.hibernate.search.mapper.orm.outboxpolling.event.impl.AgentClusterLinkContextProvider;
import org.hibernate.search.mapper.orm.outboxpolling.event.impl.OutboxConfigUtils;
import org.hibernate.search.mapper.orm.outboxpolling.event.impl.OutboxEvent;
import org.hibernate.search.mapper.orm.outboxpolling.event.impl.OutboxEventFinder;
import org.hibernate.search.mapper.orm.outboxpolling.event.impl.OutboxEventFinderProvider;
import org.hibernate.search.mapper.orm.outboxpolling.event.impl.OutboxEventLoader;
import org.hibernate.search.mapper.orm.outboxpolling.event.impl.OutboxEventProcessingPlan;
import org.hibernate.search.mapper.orm.outboxpolling.event.impl.OutboxEventUpdater;
import org.hibernate.search.mapper.orm.outboxpolling.event.impl.OutboxPollingEventProcessingInstructions;
import org.hibernate.search.mapper.orm.outboxpolling.event.impl.OutboxPollingEventProcessorClusterLink;
import org.hibernate.search.mapper.orm.outboxpolling.event.impl.ShardAssignment;
import org.hibernate.search.mapper.orm.outboxpolling.logging.impl.Log;
import org.hibernate.search.util.common.impl.Closer;
import org.hibernate.search.util.common.logging.impl.LoggerFactory;
import org.hibernate.search.util.common.spi.ToStringTreeAppendable;
import org.hibernate.search.util.common.spi.ToStringTreeAppender;

public final class OutboxPollingEventProcessor
implements ToStringTreeAppendable {
    private static final Log log = (Log)LoggerFactory.make(Log.class, (MethodHandles.Lookup)MethodHandles.lookup());
    private static final ConfigurationProperty<Integer> POLLING_INTERVAL = ConfigurationProperty.forKey((String)"event_processor.polling_interval").asIntegerStrictlyPositive().withDefault((Object)100).build();
    private static final ConfigurationProperty<Integer> PULSE_INTERVAL = ConfigurationProperty.forKey((String)"event_processor.pulse_interval").asIntegerStrictlyPositive().withDefault((Object)2000).build();
    private static final ConfigurationProperty<Integer> PULSE_EXPIRATION = ConfigurationProperty.forKey((String)"event_processor.pulse_expiration").asIntegerStrictlyPositive().withDefault((Object)30000).build();
    private static final ConfigurationProperty<Integer> BATCH_SIZE = ConfigurationProperty.forKey((String)"event_processor.batch_size").asIntegerStrictlyPositive().withDefault((Object)50).build();
    private static final OptionalConfigurationProperty<Integer> TRANSACTION_TIMEOUT = ConfigurationProperty.forKey((String)"event_processor.transaction_timeout").asIntegerStrictlyPositive().build();
    private static final ConfigurationProperty<Integer> RETRY_DELAY = ConfigurationProperty.forKey((String)"event_processor.retry_delay").asIntegerPositiveOrZero().withDefault((Object)30).build();
    private final String name;
    private final AutomaticIndexingMappingContext mapping;
    private final OutboxEventLoader loader;
    private final long pollingInterval;
    private final int batchSize;
    private final int retryDelay;
    private final AtomicReference<Status> status = new AtomicReference<Status>(Status.STOPPED);
    private final OutboxPollingEventProcessorClusterLink clusterLink;
    private final TransactionHelper transactionHelper;
    private final SessionHelper sessionHelper;
    private final AgentClusterLinkContextProvider clusterLinkContextProvider;
    private final FailureHandler failureHandler;
    private final Worker worker;
    private final SingletonTask processingTask;

    public static String namePrefix(String tenantId) {
        StringBuilder prefix = new StringBuilder("Outbox event processor");
        if (tenantId != null) {
            prefix.append(" - Tenant <").append(tenantId).append(">");
        }
        return prefix.toString();
    }

    public static Factory factory(AutomaticIndexingMappingContext mapping, Clock clock, String tenantId, ConfigurationPropertySource configurationSource) {
        OutboxEventLoader loader = new OutboxEventLoader(mapping.sessionFactory().getJdbcServices().getDialect());
        Duration pollingInterval = (Duration)POLLING_INTERVAL.getAndTransform(configurationSource, Duration::ofMillis);
        Duration pulseInterval = (Duration)PULSE_INTERVAL.getAndTransform(configurationSource, v -> OutboxConfigUtils.checkPulseInterval(Duration.ofMillis(v.intValue()), pollingInterval));
        Duration pulseExpiration = (Duration)PULSE_EXPIRATION.getAndTransform(configurationSource, v -> OutboxConfigUtils.checkPulseExpiration(Duration.ofMillis(v.intValue()), pulseInterval));
        int batchSize = (Integer)BATCH_SIZE.get(configurationSource);
        int retryDelay = (Integer)RETRY_DELAY.get(configurationSource);
        Integer transactionTimeout = ((Optional)TRANSACTION_TIMEOUT.get(configurationSource)).orElse(null);
        return new Factory(mapping, clock, tenantId, loader, pollingInterval, pulseInterval, pulseExpiration, batchSize, retryDelay, transactionTimeout);
    }

    public OutboxPollingEventProcessor(String name, Factory factory, ScheduledExecutorService executor, AgentRepositoryProvider agentRepositoryProvider, OutboxPollingEventProcessorClusterLink clusterLink) {
        this.name = name;
        this.mapping = factory.mapping;
        String tenantId = factory.tenantId;
        this.loader = factory.loader;
        this.pollingInterval = factory.pollingInterval.toMillis();
        this.batchSize = factory.batchSize;
        this.retryDelay = factory.retryDelay;
        this.clusterLink = clusterLink;
        this.transactionHelper = new TransactionHelper(this.mapping.sessionFactory(), factory.transactionTimeout);
        this.sessionHelper = new SessionHelper(this.mapping.sessionFactory(), (Object)tenantId);
        this.clusterLinkContextProvider = new AgentClusterLinkContextProvider(this.transactionHelper, this.sessionHelper, agentRepositoryProvider);
        this.failureHandler = this.mapping.failureHandler();
        this.worker = new Worker();
        this.processingTask = new SingletonTask(name, (SingletonTask.Worker)this.worker, (SingletonTask.Scheduler)new Scheduler(executor), this.failureHandler);
    }

    public String toString() {
        return this.toStringTree();
    }

    public void appendTo(ToStringTreeAppender appender) {
        appender.attribute("name", (Object)this.name).attribute("loader", (Object)this.loader).attribute("pollingInterval", (Object)this.pollingInterval).attribute("batchSize", (Object)this.batchSize).attribute("retryDelay", (Object)this.retryDelay).attribute("clusterLink", (Object)this.clusterLink);
    }

    public void start() {
        log.startingOutboxEventProcessor(this.name, this);
        this.status.set(Status.STARTED);
        this.processingTask.ensureScheduled();
    }

    public CompletableFuture<?> completion() {
        return this.processingTask.completion();
    }

    public CompletableFuture<?> preStop() {
        this.status.set(Status.STOPPED);
        return this.processingTask.completion();
    }

    public void stop() {
        log.stoppingOutboxEventProcessor(this.name);
        try (Closer closer = new Closer();){
            closer.push(SingletonTask::stop, (Object)this.processingTask);
            closer.push(OutboxPollingEventProcessor::leaveCluster, (Object)this);
        }
    }

    private void leaveCluster() {
        this.clusterLinkContextProvider.inTransaction(this.clusterLink::leaveCluster);
    }

    public static class Factory {
        private final AutomaticIndexingMappingContext mapping;
        private final Clock clock;
        private final String tenantId;
        private final OutboxEventLoader loader;
        private final Duration pollingInterval;
        private final Duration pulseInterval;
        private final Duration pulseExpiration;
        private final int batchSize;
        private final int retryDelay;
        private final Integer transactionTimeout;

        private Factory(AutomaticIndexingMappingContext mapping, Clock clock, String tenantId, OutboxEventLoader loader, Duration pollingInterval, Duration pulseInterval, Duration pulseExpiration, int batchSize, int retryDelay, Integer transactionTimeout) {
            this.mapping = mapping;
            this.clock = clock;
            this.tenantId = tenantId;
            this.loader = loader;
            this.pollingInterval = pollingInterval;
            this.pulseInterval = pulseInterval;
            this.pulseExpiration = pulseExpiration;
            this.batchSize = batchSize;
            this.retryDelay = retryDelay;
            this.transactionTimeout = transactionTimeout;
        }

        public OutboxPollingEventProcessor create(ScheduledExecutorService scheduledExecutor, OutboxEventFinderProvider finderProvider, AgentRepositoryProvider agentRepositoryProvider, ShardAssignmentDescriptor shardAssignmentOrNull) {
            String agentName = OutboxPollingEventProcessor.namePrefix(this.tenantId) + (String)(shardAssignmentOrNull == null ? "" : " - " + shardAssignmentOrNull.assignedShardIndex);
            OutboxPollingEventProcessorClusterLink clusterLink = new OutboxPollingEventProcessorClusterLink(agentName, this.mapping.failureHandler(), this.clock, new ShardAssignment.Provider(finderProvider), this.pollingInterval, this.pulseInterval, this.pulseExpiration, shardAssignmentOrNull);
            return new OutboxPollingEventProcessor(agentName, this, scheduledExecutor, agentRepositoryProvider, clusterLink);
        }
    }

    private static enum Status {
        STOPPED,
        STARTED;

    }

    private class Worker
    implements SingletonTask.Worker {
        private volatile OutboxPollingEventProcessingInstructions instructions;
        private volatile boolean lastExecutionProcessedEvents;

        private Worker() {
        }

        public CompletableFuture<?> work() {
            block12: {
                block11: {
                    this.lastExecutionProcessedEvents = false;
                    if (this.instructions == null) break block11;
                    if (this.instructions.isStillValid()) break block12;
                }
                this.instructions = OutboxPollingEventProcessor.this.clusterLinkContextProvider.inTransaction(OutboxPollingEventProcessor.this.clusterLink::pulse);
            }
            if (!this.instructions.eventFinder.isPresent()) {
                return CompletableFuture.completedFuture(null);
            }
            try (SessionImplementor session = OutboxPollingEventProcessor.this.sessionHelper.openSession();){
                OutboxEventProcessingPlan eventProcessing = new OutboxEventProcessingPlan(OutboxPollingEventProcessor.this.mapping, (Session)session);
                OutboxPollingEventProcessor.this.transactionHelper.inTransaction((SharedSessionContractImplementor)session, () -> {
                    List<OutboxEvent> events;
                    Optional<OutboxEventFinder> eventFinder = this.instructions.eventFinder;
                    if (!eventFinder.isPresent()) {
                        return;
                    }
                    try {
                        events = eventFinder.get().findOutboxEvents((Session)session, OutboxPollingEventProcessor.this.batchSize);
                        if (events.isEmpty()) {
                            return;
                        }
                    }
                    catch (OptimisticLockException | PessimisticLockException lockException) {
                        log.eventProcessorFindEventsUnableToLock(OutboxPollingEventProcessor.this.name, (PersistenceException)lockException);
                        return;
                    }
                    this.lastExecutionProcessedEvents = true;
                    this.ensureScheduled();
                    log.tracef("Processing %d outbox events for '%s': '%s'", events.size(), OutboxPollingEventProcessor.this.name, events);
                    eventProcessing.processEvents(events);
                });
                OutboxEventUpdater eventUpdater = new OutboxEventUpdater(OutboxPollingEventProcessor.this.failureHandler, OutboxPollingEventProcessor.this.loader, eventProcessing, session, OutboxPollingEventProcessor.this.name, OutboxPollingEventProcessor.this.retryDelay);
                while (eventUpdater.thereAreStillEventsToProcess()) {
                    OutboxPollingEventProcessor.this.transactionHelper.inTransaction((SharedSessionContractImplementor)session, eventUpdater::process);
                }
                CompletableFuture<Object> completableFuture = CompletableFuture.completedFuture(null);
                return completableFuture;
            }
        }

        public void complete() {
            this.ensureScheduled();
        }

        private void ensureScheduled() {
            if (OutboxPollingEventProcessor.this.status.get() == Status.STARTED) {
                OutboxPollingEventProcessor.this.processingTask.ensureScheduled();
            }
        }
    }

    private class Scheduler
    implements SingletonTask.Scheduler {
        private final ScheduledExecutorService delegate;

        private Scheduler(ScheduledExecutorService delegate) {
            this.delegate = delegate;
        }

        public Future<?> schedule(Runnable runnable) {
            OutboxPollingEventProcessingInstructions instructions = OutboxPollingEventProcessor.this.worker.instructions;
            if (instructions == null) {
                return this.delegate.schedule(runnable, OutboxPollingEventProcessor.this.pollingInterval, TimeUnit.MILLISECONDS);
            }
            if (instructions.eventFinder.isPresent()) {
                if (OutboxPollingEventProcessor.this.worker.lastExecutionProcessedEvents) {
                    return this.delegate.submit(runnable);
                }
                return this.delegate.schedule(runnable, OutboxPollingEventProcessor.this.pollingInterval, TimeUnit.MILLISECONDS);
            }
            return this.delegate.schedule(runnable, instructions.timeInMillisecondsToExpiration(), TimeUnit.MILLISECONDS);
        }
    }
}

