/*
 * Decompiled with CFR 0.152.
 */
package io.awspring.cloud.sqs.listener.pipeline;

import io.awspring.cloud.sqs.CompletableFutures;
import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.listener.ListenerExecutionFailedException;
import io.awspring.cloud.sqs.listener.MessageProcessingContext;
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
import io.awspring.cloud.sqs.listener.pipeline.MessageProcessingConfiguration;
import io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipeline;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

public class ErrorHandlerExecutionStage<T>
implements MessageProcessingPipeline<T> {
    private static final Logger logger = LoggerFactory.getLogger(ErrorHandlerExecutionStage.class);
    private final AsyncErrorHandler<T> errorHandler;

    public ErrorHandlerExecutionStage(MessageProcessingConfiguration<T> context) {
        this.errorHandler = context.getErrorHandler();
    }

    @Override
    public CompletableFuture<Message<T>> process(CompletableFuture<Message<T>> messageFuture, MessageProcessingContext<T> context) {
        return this.errorHandler == null ? messageFuture : CompletableFutures.exceptionallyCompose(messageFuture, t -> this.handleError(ListenerExecutionFailedException.unwrapMessage(t), (Throwable)t));
    }

    private CompletableFuture<Message<T>> handleError(Message<T> failedMessage, Throwable t) {
        logger.debug("Handling error {} for message {}", (Object)t, (Object)MessageHeaderUtils.getId(failedMessage));
        return CompletableFutures.exceptionallyCompose(this.errorHandler.handle(failedMessage, t).thenApply(theVoid -> failedMessage), eht -> CompletableFutures.failedFuture(this.maybeWrap(failedMessage, (Throwable)eht)));
    }

    private Throwable maybeWrap(Message<T> failedMessage, Throwable eht) {
        return ListenerExecutionFailedException.hasListenerException(eht) ? eht : new ListenerExecutionFailedException("Error handler returned an exception", eht, failedMessage);
    }

    @Override
    public CompletableFuture<Collection<Message<T>>> processMany(CompletableFuture<Collection<Message<T>>> messagesFuture, MessageProcessingContext<T> context) {
        return this.errorHandler == null ? messagesFuture : CompletableFutures.exceptionallyCompose(messagesFuture, t -> this.handleErrors(ListenerExecutionFailedException.unwrapMessages(t), (Throwable)t));
    }

    private CompletableFuture<Collection<Message<T>>> handleErrors(Collection<Message<T>> failedMessages, Throwable t) {
        logger.debug("Handling error {} for message {}", (Object)t, (Object)MessageHeaderUtils.getId(failedMessages));
        return CompletableFutures.exceptionallyCompose(this.errorHandler.handle(failedMessages, t).thenApply(theVoid -> failedMessages), eht -> CompletableFutures.failedFuture(this.maybeWrap(failedMessages, (Throwable)eht)));
    }

    private Throwable maybeWrap(Collection<Message<T>> failedMessages, Throwable eht) {
        return ListenerExecutionFailedException.hasListenerException(eht) ? eht : new ListenerExecutionFailedException("Error handler returned an exception", eht, failedMessages);
    }
}

