/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.ConfirmationStatus;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.ObservationCollector;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.Clock;
import com.rabbitmq.stream.impl.MessageAccumulator;
import com.rabbitmq.stream.impl.StreamProducer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.ToLongFunction;

/**
 * {@link MessageAccumulator} backed by a bounded {@link LinkedBlockingQueue}.
 *
 * <p>Each message passed to {@link #add(Message, ConfirmationHandler)} is encoded, checked against
 * the maximum frame size, wrapped in an accumulated entity and appended to the queue. Entities are
 * removed one at a time, in insertion order, by {@link #get()}.
 */
class SimpleMessageAccumulator
implements MessageAccumulator {
    /** Fallback extractor used when no filter value extractor is supplied; always yields {@code null}. */
    private static final Function<Message, String> NULL_FILTER_VALUE_EXTRACTOR = m -> null;
    /** Maximum time, in seconds, {@link #add} waits for free space in the queue. */
    private static final long OFFER_TIMEOUT_SECONDS = 60L;
    protected final BlockingQueue<MessageAccumulator.AccumulatedEntity> messages;
    protected final Clock clock;
    private final int capacity;
    protected final Codec codec;
    private final int maxFrameSize;
    private final ToLongFunction<Message> publishSequenceFunction;
    private final Function<Message, String> filterValueExtractor;
    final String stream;
    final ObservationCollector<Object> observationCollector;

    /**
     * Creates an accumulator holding at most {@code capacity} entities.
     *
     * @param capacity bound of the underlying queue
     * @param codec codec used to encode each message
     * @param maxFrameSize frame size passed to {@code Client.checkMessageFitsInFrame}
     * @param publishSequenceFunction computes the publishing ID of each message
     * @param filterValueExtractor computes the filter value of each message; when {@code null}, an
     *     extractor that always returns {@code null} is used
     * @param clock source of the timestamp stored with each entity
     * @param stream stream name handed to the observation collector
     * @param observationCollector collector notified before publishing and when an entity is taken
     */
    SimpleMessageAccumulator(int capacity, Codec codec, int maxFrameSize, ToLongFunction<Message> publishSequenceFunction, Function<Message, String> filterValueExtractor, Clock clock, String stream, ObservationCollector<?> observationCollector) {
        this.capacity = capacity;
        this.messages = new LinkedBlockingQueue<MessageAccumulator.AccumulatedEntity>(capacity);
        this.codec = codec;
        this.maxFrameSize = maxFrameSize;
        this.publishSequenceFunction = publishSequenceFunction;
        this.filterValueExtractor = filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor;
        this.clock = clock;
        this.stream = stream;
        // The context produced by prePublish is only ever handed back to the same collector
        // (see get()), so treating the context type as Object is safe.
        @SuppressWarnings("unchecked")
        ObservationCollector<Object> collector = (ObservationCollector<Object>) observationCollector;
        this.observationCollector = collector;
    }

    /**
     * Encodes the message and appends it to the queue, waiting up to
     * {@value #OFFER_TIMEOUT_SECONDS} seconds for space to become available.
     *
     * <p>The observation collector's {@code prePublish} is invoked first; the context it returns
     * is stored with the entity.
     *
     * @param message the message to accumulate
     * @param confirmationHandler handler invoked through the entity's confirmation callback
     * @return {@code true} if the queue size equals the capacity after the insertion
     * @throws StreamException if no space became available before the timeout, or if the calling
     *     thread was interrupted while waiting (the interrupt flag is restored in that case)
     */
    @Override
    public boolean add(Message message, ConfirmationHandler confirmationHandler) {
        // NOTE(review): if any step below throws, the context started here is never passed to
        // published(); confirm whether the collector needs an explicit error notification.
        Object observationContext = this.observationCollector.prePublish(this.stream, message);
        Codec.EncodedMessage encodedMessage = this.codec.encode(message);
        Client.checkMessageFitsInFrame(this.maxFrameSize, encodedMessage);
        long publishingId = this.publishSequenceFunction.applyAsLong(message);
        MessageAccumulator.AccumulatedEntity entity = new SimpleAccumulatedEntity(
                this.clock.time(),
                publishingId,
                this.filterValueExtractor.apply(message),
                encodedMessage,
                new SimpleConfirmationCallback(message, confirmationHandler),
                observationContext);
        try {
            boolean offered = this.messages.offer(entity, OFFER_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (!offered) {
                throw new StreamException("Could not accumulate outbound message");
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new StreamException("Error while accumulating outbound message", e);
        }
        return this.messages.size() == this.capacity;
    }

    /**
     * Removes and returns the head of the queue without blocking.
     *
     * <p>When an entity is returned, the observation collector's {@code published} is invoked
     * with the entity's observation context and original message.
     *
     * @return the next accumulated entity, or {@code null} if the queue is empty
     */
    @Override
    public MessageAccumulator.AccumulatedEntity get() {
        MessageAccumulator.AccumulatedEntity entity = this.messages.poll();
        if (entity != null) {
            this.observationCollector.published(entity.observationContext(), entity.confirmationCallback().message());
        }
        return entity;
    }

    /** @return {@code true} if no entity is currently queued */
    @Override
    public boolean isEmpty() {
        return this.messages.isEmpty();
    }

    /** @return the number of entities currently queued */
    @Override
    public int size() {
        return this.messages.size();
    }

    /**
     * Confirmation callback for a single message: forwards the outcome to the
     * {@link ConfirmationHandler} as a {@link ConfirmationStatus}.
     */
    private static final class SimpleConfirmationCallback
    implements StreamProducer.ConfirmationCallback {
        private final Message message;
        private final ConfirmationHandler confirmationHandler;

        private SimpleConfirmationCallback(Message message, ConfirmationHandler confirmationHandler) {
            this.message = message;
            this.confirmationHandler = confirmationHandler;
        }

        /**
         * Notifies the handler with the message, the confirmation flag and the code.
         *
         * @return always {@code 1} (presumably the number of messages handled; confirm against
         *     {@code StreamProducer.ConfirmationCallback})
         */
        @Override
        public int handle(boolean confirmed, short code) {
            this.confirmationHandler.handle(new ConfirmationStatus(this.message, confirmed, code));
            return 1;
        }

        /** @return the message this callback was created for */
        @Override
        public Message message() {
            return this.message;
        }
    }

    /** Immutable holder for everything stored in the queue for one accumulated message. */
    private static final class SimpleAccumulatedEntity
    implements MessageAccumulator.AccumulatedEntity {
        private final long time;
        private final long publishingId;
        private final String filterValue;
        private final Codec.EncodedMessage encodedMessage;
        private final StreamProducer.ConfirmationCallback confirmationCallback;
        private final Object observationContext;

        private SimpleAccumulatedEntity(long time, long publishingId, String filterValue, Codec.EncodedMessage encodedMessage, StreamProducer.ConfirmationCallback confirmationCallback, Object observationContext) {
            this.time = time;
            this.publishingId = publishingId;
            this.encodedMessage = encodedMessage;
            this.filterValue = filterValue;
            this.confirmationCallback = confirmationCallback;
            this.observationContext = observationContext;
        }

        /** @return the publishing ID computed when the message was added */
        @Override
        public long publishingId() {
            return this.publishingId;
        }

        /** @return the filter value extracted from the message; may be {@code null} */
        @Override
        public String filterValue() {
            return this.filterValue;
        }

        /** @return the encoded message */
        @Override
        public Object encodedEntity() {
            return this.encodedMessage;
        }

        /** @return the clock reading taken when the message was added */
        @Override
        public long time() {
            return this.time;
        }

        /** @return the callback used to report the confirmation outcome */
        @Override
        public StreamProducer.ConfirmationCallback confirmationCallback() {
            return this.confirmationCallback;
        }

        /** @return the context returned by the observation collector's {@code prePublish} */
        @Override
        public Object observationContext() {
            return this.observationContext;
        }
    }
}

