/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.server.consumer;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.converter.MessageConversionException;
import org.apache.qpid.server.queue.SuspendedConsumerLoggingTicker;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base implementation of {@link ConsumerTarget} that keeps track of the
 * {@link MessageInstanceConsumer}s attached to this target, the open/closed state,
 * the "notify work desired" flag and the counters of unacknowledged messages/bytes.
 *
 * <p>Subclasses supply the actual delivery in
 * {@link #doSend(MessageInstanceConsumer, MessageInstance, boolean)}.
 *
 * @param <T> the concrete target type (self type); {@link #notifyWork()} passes
 *            {@code this} to the session as a {@code T}
 */
public abstract class AbstractConsumerTarget<T extends AbstractConsumerTarget<T>>
implements ConsumerTarget<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumerTarget.class);

    /** Log subject used whenever the target does not have exactly one consumer that is itself a {@link LogSubject}. */
    private static final LogSubject MULTI_QUEUE_LOG_SUBJECT = () -> "[(** Multi-Queue **)] ";

    /** Backing counter for {@link #getUnacknowledgedBytes()}; not modified by this class itself. */
    protected final AtomicLong _unacknowledgedBytes = new AtomicLong(0L);

    /** Backing counter for {@link #getUnacknowledgedMessages()}; not modified by this class itself. */
    protected final AtomicLong _unacknowledgedCount = new AtomicLong(0L);

    private final AtomicReference<ConsumerTarget.State> _state = new AtomicReference<>(ConsumerTarget.State.OPEN);
    private final boolean _isMultiQueue;

    /** Ticker registered with the session while notify-work is not desired (see {@link #setNotifyWorkDesired(boolean)}). */
    private final SuspendedConsumerLoggingTicker _suspendedConsumerLoggingTicker;

    private final List<MessageInstanceConsumer> _consumers = new CopyOnWriteArrayList<>();
    private final AtomicBoolean _scheduled = new AtomicBoolean();

    /** Position in the consumer list at which the next {@link #sendNextMessage()} call resumes pulling. */
    private volatile Iterator<MessageInstanceConsumer> _pullIterator;

    private volatile boolean _notifyWorkDesired;

    /**
     * Creates a target in the {@code OPEN} state with no consumers.
     *
     * @param isMultiQueue   value reported by {@link #isMultiQueue()}
     * @param amqpConnection connection whose context value
     *                       {@code consumer.suspendNotificationPeriod} configures the
     *                       suspended-consumer ticker and whose event logger receives its messages
     */
    protected AbstractConsumerTarget(boolean isMultiQueue, final AMQPConnection<?> amqpConnection) {
        _isMultiQueue = isMultiQueue;
        _suspendedConsumerLoggingTicker = new SuspendedConsumerLoggingTicker(amqpConnection.getContextValue(Long.class, "consumer.suspendNotificationPeriod")) {

            @Override
            protected void log(long period) {
                amqpConnection.getEventLogger().message(AbstractConsumerTarget.this.getLogSubject(), SubscriptionMessages.STATE(period));
            }
        };
    }

    /**
     * Picks the log subject for ticker messages: the sole consumer when there is exactly
     * one and it implements {@link LogSubject}, otherwise the shared multi-queue subject.
     */
    private LogSubject getLogSubject() {
        if (_consumers.size() == 1 && _consumers.get(0) instanceof LogSubject) {
            return (LogSubject) _consumers.get(0);
        }
        return MULTI_QUEUE_LOG_SUBJECT;
    }

    /** No-op in this base class. */
    @Override
    public void acquisitionRemoved(MessageInstance node) {
    }

    @Override
    public boolean isMultiQueue() {
        return _isMultiQueue;
    }

    /** Asks the owning session to schedule work for this target. */
    @Override
    public void notifyWork() {
        // The class is declared with a self-referential bound, so 'this' is expected to be a T.
        @SuppressWarnings("unchecked")
        final T target = (T) this;
        getSession().notifyWork(target);
    }

    /**
     * Updates the notify-work-desired flag. Does nothing when the value is unchanged.
     *
     * <p>On a change to {@code true} the suspended-consumer ticker is removed from the
     * session; on a change to {@code false} the ticker's start time is set to the current
     * time and it is added to the session. In both cases the new value is forwarded to
     * every registered consumer before the flag itself is stored.
     *
     * @param desired the new value of the flag
     */
    protected final void setNotifyWorkDesired(boolean desired) {
        if (desired == _notifyWorkDesired) {
            return;
        }
        if (desired) {
            getSession().removeTicker(_suspendedConsumerLoggingTicker);
        } else {
            _suspendedConsumerLoggingTicker.setStartTime(System.currentTimeMillis());
            getSession().addTicker(_suspendedConsumerLoggingTicker);
        }
        for (MessageInstanceConsumer consumer : _consumers) {
            consumer.setNotifyWorkDesired(desired);
        }
        _notifyWorkDesired = desired;
    }

    @Override
    public final boolean isNotifyWorkDesired() {
        return _notifyWorkDesired;
    }

    /**
     * Attempts to send one message via {@link #sendNextMessage()}.
     *
     * @return {@code false} when there is no session or the connection reports that the
     *         current thread is not its IO thread; otherwise the result of
     *         {@link #sendNextMessage()}
     */
    @Override
    public boolean processPending() {
        if (getSession() == null || !getSession().getAMQPConnection().isIOThread()) {
            return false;
        }
        return sendNextMessage();
    }

    /** Registers {@code sub} with this target. */
    @Override
    public void consumerAdded(MessageInstanceConsumer sub) {
        _consumers.add(sub);
    }

    /**
     * Schedules removal of {@code sub} through the connection's
     * {@code doOnIOThreadAsync}; the target is closed once its last consumer is removed.
     *
     * @param sub the consumer to remove
     * @return the future of the scheduled removal, or an already-completed future when
     *         {@code sub} is not currently registered
     */
    @Override
    public ListenableFuture<Void> consumerRemoved(MessageInstanceConsumer sub) {
        if (_consumers.contains(sub)) {
            return doOnIoThreadAsync(() -> consumerRemovedInternal(sub));
        }
        return Futures.immediateFuture(null);
    }

    /** Hands {@code task} to the connection of the owning session via {@code doOnIOThreadAsync}. */
    private ListenableFuture<Void> doOnIoThreadAsync(Runnable task) {
        return getSession().getAMQPConnection().doOnIOThreadAsync(task);
    }

    /** Removes {@code sub} and closes this target if no consumers remain. */
    private void consumerRemovedInternal(MessageInstanceConsumer sub) {
        _consumers.remove(sub);
        if (_consumers.isEmpty()) {
            close();
        }
    }

    /**
     * Returns the consumers currently registered with this target.
     *
     * @return the internal consumer list itself (not a copy)
     */
    public List<MessageInstanceConsumer> getConsumers() {
        return _consumers;
    }

    /** A target is suspended exactly when notify-work is not desired. */
    @Override
    public final boolean isSuspended() {
        return !isNotifyWorkDesired();
    }

    @Override
    public final ConsumerTarget.State getState() {
        return _state.get();
    }

    /**
     * Delivers {@code entry} through {@link #doSend(MessageInstanceConsumer, MessageInstance, boolean)}
     * and afterwards, if the consumer acquires messages, makes the acquisition stealable.
     *
     * @param consumer the consumer the entry was pulled for
     * @param entry    the message instance to deliver
     * @param batch    passed through unchanged to {@code doSend}
     */
    @Override
    public final void send(MessageInstanceConsumer consumer, MessageInstance entry, boolean batch) {
        doSend(consumer, entry, batch);
        if (consumer.acquires()) {
            entry.makeAcquisitionStealable();
        }
    }

    @Override
    public long getUnacknowledgedMessages() {
        return _unacknowledgedCount.longValue();
    }

    @Override
    public long getUnacknowledgedBytes() {
        return _unacknowledgedBytes.longValue();
    }

    /**
     * Performs the delivery of {@code entry} for {@code consumer}; invoked by
     * {@link #send(MessageInstanceConsumer, MessageInstance, boolean)}.
     */
    protected abstract void doSend(MessageInstanceConsumer var1, MessageInstance var2, boolean var3);

    /**
     * Pulls a message from the registered consumers and sends it.
     *
     * <p>Consumers are polled starting where the previous call stopped (the iterator is
     * kept between calls). When the iterator is exhausted it is restarted from the
     * beginning of the consumer list at most once per call, so each call stops after the
     * first consumer that yields a message or after that single restart has been used up.
     *
     * <p>If sending fails with a {@link MessageConversionException}, the failure is handled
     * according to the owning resource's conversion-exception handling policy. The message
     * reference held by the pulled container, if any, is always released.
     *
     * @return {@code true} if a message was pulled (including the case where conversion
     *         failed but the policy handled it without throwing), {@code false} if no
     *         consumer had a message
     * @throws ConnectionScopedRuntimeException if conversion fails and the policy is
     *         {@code CLOSE} or the owning resource is not a {@link MessageSource}
     * @throws ServerScopedRuntimeException if conversion fails and the policy is not recognised
     */
    @Override
    public boolean sendNextMessage() {
        MessageContainer messageContainer = null;
        MessageInstanceConsumer consumer = null;
        boolean iteratedCompleteList = false;
        while (messageContainer == null) {
            if (_pullIterator == null || !_pullIterator.hasNext()) {
                if (iteratedCompleteList) {
                    break;
                }
                iteratedCompleteList = true;
                _pullIterator = getConsumers().iterator();
            }
            if (_pullIterator.hasNext()) {
                consumer = _pullIterator.next();
                messageContainer = consumer.pullMessage();
            }
        }

        if (messageContainer == null) {
            return false;
        }

        MessageInstance entry = messageContainer.getMessageInstance();
        try {
            send(consumer, entry, false);
        } catch (MessageConversionException mce) {
            handleMessageConversionFailure(consumer, entry, mce);
        } finally {
            if (messageContainer.getMessageReference() != null) {
                messageContainer.getMessageReference().release();
            }
        }
        return true;
    }

    /**
     * Applies the owning resource's conversion-exception handling policy after
     * {@code entry} could not be converted for {@code consumer}.
     *
     * <p>Credit for the message is restored first. Returns normally only for the
     * {@code ROUTE_TO_ALTERNATE} and {@code REJECT} policies.
     */
    private void handleMessageConversionFailure(MessageInstanceConsumer consumer,
                                                MessageInstance entry,
                                                MessageConversionException mce) {
        restoreCredit(entry.getMessage());
        TransactionLogResource owningResource = entry.getOwningResource();
        if (!(owningResource instanceof MessageSource)) {
            throw new ConnectionScopedRuntimeException(String.format("Unable to convert message %s for this consumer", entry.getMessage()), mce);
        }

        MessageSource.MessageConversionExceptionHandlingPolicy handlingPolicy = ((MessageSource) owningResource).getMessageConversionExceptionHandlingPolicy();
        switch (handlingPolicy) {
            case CLOSE: {
                entry.release(consumer);
                throw new ConnectionScopedRuntimeException(String.format("Unable to convert message %s for this consumer", entry.getMessage()), mce);
            }
            case ROUTE_TO_ALTERNATE: {
                if (consumer.acquires()) {
                    int enqueues = entry.routeToAlternate(null, null);
                    if (enqueues == 0) {
                        LOGGER.info("Failed to convert message {} for this consumer because '{}'.  Message discarded.", entry.getMessage(), mce.getMessage());
                    } else {
                        LOGGER.info("Failed to convert message {} for this consumer because '{}'.  Message routed to alternate.", entry.getMessage(), mce.getMessage());
                    }
                } else {
                    // A non-acquiring consumer only logs; the entry is not routed.
                    LOGGER.info("Failed to convert message {} for this browser because '{}'.  Message skipped.", entry.getMessage(), mce.getMessage());
                }
                break;
            }
            case REJECT: {
                entry.reject(consumer);
                entry.release(consumer);
                LOGGER.info("Failed to convert message {} for this consumer because '{}'.  Message skipped.", entry.getMessage(), mce.getMessage());
                break;
            }
            default: {
                throw new ServerScopedRuntimeException("Unrecognised policy " + handlingPolicy);
            }
        }
    }

    /**
     * Closes this target if it is still open: clears the notify-work-desired flag,
     * empties the consumer list, closes every consumer that was registered and removes
     * the suspended-consumer ticker from the session.
     *
     * @return {@code true} if this call performed the OPEN to CLOSED transition,
     *         {@code false} if the target was not open
     */
    @Override
    public final boolean close() {
        if (!_state.compareAndSet(ConsumerTarget.State.OPEN, ConsumerTarget.State.CLOSED)) {
            return false;
        }
        setNotifyWorkDesired(false);
        // Copy before clearing so each previously registered consumer can still be closed.
        List<MessageInstanceConsumer> consumers = new ArrayList<>(_consumers);
        _consumers.clear();
        for (MessageInstanceConsumer consumer : consumers) {
            consumer.close();
        }
        getSession().removeTicker(_suspendedConsumerLoggingTicker);
        return true;
    }

    /**
     * Atomically marks this target as scheduled.
     *
     * @return {@code true} if the flag was previously clear and is now set
     */
    final boolean setScheduled() {
        return _scheduled.compareAndSet(false, true);
    }

    /** Clears the scheduled flag set by {@link #setScheduled()}. */
    final void clearScheduled() {
        _scheduled.set(false);
    }
}

