/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.transport.amqp.protocol;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.transport.amqp.AmqpProtocolConverter;
import org.apache.activemq.transport.amqp.AmqpSupport;
import org.apache.activemq.transport.amqp.protocol.AmqpAbstractReceiver;
import org.apache.activemq.transport.amqp.protocol.AmqpSession;
import org.apache.activemq.transport.amqp.protocol.ResponseHandler;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transaction.Declare;
import org.apache.qpid.proton.amqp.transaction.Declared;
import org.apache.qpid.proton.amqp.transaction.Discharge;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AmqpTransactionCoordinator
extends AmqpAbstractReceiver {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionCoordinator.class);
    private final Set<AmqpSession> txSessions = new HashSet<AmqpSession>();

    public AmqpTransactionCoordinator(AmqpSession session, Receiver endpoint) {
        super(session, endpoint);
    }

    @Override
    protected void processDelivery(final Delivery delivery, Buffer deliveryBytes) throws Exception {
        int decoded;
        Message message = Proton.message();
        int offset = deliveryBytes.offset;
        for (int len = deliveryBytes.length; len > 0; len -= decoded) {
            decoded = message.decode(deliveryBytes.data, offset, len);
            assert (decoded > 0) : "Make progress decoding the message";
            offset += decoded;
        }
        final AmqpSession session = (AmqpSession)((Receiver)this.getEndpoint()).getSession().getContext();
        ConnectionId connectionId = session.getConnection().getConnectionId();
        final Object action = ((AmqpValue)message.getBody()).getValue();
        LOG.debug("COORDINATOR received: {}, [{}]", action, (Object)deliveryBytes);
        if (action instanceof Declare) {
            Declare declare = (Declare)action;
            if (declare.getGlobalId() != null) {
                throw new Exception("don't know how to handle a declare /w a set GlobalId");
            }
            LocalTransactionId txId = session.getConnection().getNextTransactionId();
            TransactionInfo txInfo = new TransactionInfo(connectionId, (TransactionId)txId, 0);
            session.getConnection().registerTransaction((TransactionId)txId, this);
            this.sendToActiveMQ((Command)txInfo, null);
            LOG.trace("started transaction {}", (Object)txId);
            Declared declared = new Declared();
            declared.setTxnId(new Binary(AmqpSupport.toBytes(txId.getValue())));
            delivery.disposition((DeliveryState)declared);
            delivery.settle();
        } else if (action instanceof Discharge) {
            byte operation;
            Discharge discharge = (Discharge)action;
            LocalTransactionId txId = new LocalTransactionId(connectionId, AmqpSupport.toLong(discharge.getTxnId()));
            if (discharge.getFail().booleanValue()) {
                LOG.trace("rollback transaction {}", (Object)txId);
                operation = 4;
            } else {
                LOG.trace("commit transaction {}", (Object)txId);
                operation = 2;
            }
            for (AmqpSession txSession : this.txSessions) {
                if (operation == 4) {
                    txSession.rollback(txId);
                    continue;
                }
                txSession.commit(txId);
            }
            this.txSessions.clear();
            session.getConnection().unregisterTransaction((TransactionId)txId);
            TransactionInfo txinfo = new TransactionInfo(connectionId, (TransactionId)txId, operation);
            this.sendToActiveMQ((Command)txinfo, new ResponseHandler(){

                @Override
                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
                    if (response.isException()) {
                        ExceptionResponse er = (ExceptionResponse)response;
                        Rejected rejected = new Rejected();
                        rejected.setError(new ErrorCondition(Symbol.valueOf((String)"failed"), er.getException().getMessage()));
                        delivery.disposition((DeliveryState)rejected);
                    } else {
                        delivery.disposition((DeliveryState)Accepted.getInstance());
                    }
                    LOG.debug("TX: {} settling {}", (Object)operation, action);
                    delivery.settle();
                    session.pumpProtonToSocket();
                }
            });
            if (operation == 4) {
                session.flushPendingMessages();
            }
        } else {
            throw new Exception("Expected coordinator message type: " + action.getClass());
        }
        this.replenishCredit();
    }

    private void replenishCredit() {
        if ((double)((Receiver)this.getEndpoint()).getCredit() <= (double)this.getConfiguredReceiverCredit() * 0.2) {
            LOG.debug("Sending more credit ({}) to transaction coordinator on session {}", (Object)(this.getConfiguredReceiverCredit() - ((Receiver)this.getEndpoint()).getCredit()), (Object)this.session.getSessionId());
            ((Receiver)this.getEndpoint()).flow(this.getConfiguredReceiverCredit() - ((Receiver)this.getEndpoint()).getCredit());
            this.session.pumpProtonToSocket();
        }
    }

    @Override
    public ActiveMQDestination getDestination() {
        return null;
    }

    @Override
    public void setDestination(ActiveMQDestination destination) {
    }

    public void enlist(AmqpSession session) {
        this.txSessions.add(session);
    }
}

