/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.jdbc.datasource.connections.xa;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import javax.annotation.concurrent.NotThreadSafe;
import javax.sql.XAConnection;
import javax.sql.XADataSource;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.jdbc.datasource.connections.xa.XaCommand;
import org.apache.flink.connector.jdbc.datasource.connections.xa.XaConnectionProvider;
import org.apache.flink.connector.jdbc.datasource.transactions.xa.exceptions.EmptyTransactionXaException;
import org.apache.flink.connector.jdbc.datasource.transactions.xa.exceptions.XaError;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link XaConnectionProvider} backed by a single {@link XAConnection} obtained from an
 * {@link XADataSource}.
 *
 * <p>The provider holds at most one XA connection at a time. {@link #open()} acquires the XA
 * connection, its {@link XAResource} and the JDBC {@link Connection}; {@link #close()} releases
 * them. All XA operations ({@code start}, {@code end}, {@code prepare}, {@code commit},
 * {@code rollback}, {@code recover}, {@code forget}) are funnelled through
 * {@link #execute(XaCommand)}, which translates failures into {@link FlinkRuntimeException}s.
 *
 * <p>The runtime handles are {@code transient}; only the data source supplier and the optional
 * timeout are kept as regular fields.
 */
@NotThreadSafe
@Internal
public class SimpleXaConnectionProvider
implements XaConnectionProvider {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(SimpleXaConnectionProvider.class);

    /** Upper bound on follow-up {@code XAResource.recover} calls made during one scan. */
    private static final int MAX_RECOVER_CALLS = 100;

    private final Supplier<XADataSource> dataSourceSupplier;

    /** Transaction timeout in seconds applied on {@link #open()}; {@code null} leaves it unset. */
    private final Integer timeoutSec;

    private transient XAResource xaResource;
    private transient Connection connection;
    private transient XAConnection xaConnection;

    /** Creates a provider for the given data source without setting a transaction timeout. */
    @VisibleForTesting
    public static SimpleXaConnectionProvider from(XADataSource ds) {
        return SimpleXaConnectionProvider.from(ds, null);
    }

    /**
     * Creates a provider that always uses the given data source instance.
     *
     * @param ds data source to obtain the XA connection from
     * @param timeoutSec transaction timeout in seconds, or {@code null} to not set one
     */
    public static SimpleXaConnectionProvider from(XADataSource ds, Integer timeoutSec) {
        return SimpleXaConnectionProvider.from(() -> ds, timeoutSec);
    }

    /**
     * Creates a provider that asks the supplier for a data source each time it is opened.
     *
     * @param dsSupplier supplier of the data source; must not be {@code null}
     * @param timeoutSec transaction timeout in seconds, or {@code null} to not set one
     */
    public static SimpleXaConnectionProvider from(Supplier<XADataSource> dsSupplier, Integer timeoutSec) {
        return new SimpleXaConnectionProvider(dsSupplier, timeoutSec);
    }

    private SimpleXaConnectionProvider(Supplier<XADataSource> dataSourceSupplier, Integer timeoutSec) {
        this.dataSourceSupplier = Preconditions.checkNotNull(dataSourceSupplier);
        this.timeoutSec = timeoutSec;
    }

    /**
     * Acquires the XA connection, its XA resource and a read-write, non-auto-commit JDBC
     * connection.
     *
     * <p>The fields are only assigned once every step succeeded. If any step fails, the freshly
     * obtained XA connection is closed again and the provider stays in the "not open" state, so
     * a failed {@code open()} neither leaks the connection nor leaves a half-initialised provider.
     *
     * @throws IllegalStateException if the provider is already open, or if auto-commit could not
     *     be disabled
     * @throws SQLException if the connection cannot be obtained or configured; an
     *     {@link XAException} from setting the transaction timeout is wrapped as its cause
     */
    @Override
    public void open() throws SQLException {
        Preconditions.checkState(!this.isOpen(), "already connected");
        XADataSource ds = this.dataSourceSupplier.get();
        XAConnection newXaConnection = ds.getXAConnection();
        try {
            XAResource newXaResource = newXaConnection.getXAResource();
            if (this.timeoutSec != null) {
                try {
                    newXaResource.setTransactionTimeout(this.timeoutSec);
                }
                catch (XAException e) {
                    throw new SQLException(e);
                }
            }
            Connection newConnection = newXaConnection.getConnection();
            newConnection.setReadOnly(false);
            newConnection.setAutoCommit(false);
            Preconditions.checkState(!newConnection.getAutoCommit());

            // Publish state only after everything succeeded.
            this.xaConnection = newXaConnection;
            this.xaResource = newXaResource;
            this.connection = newConnection;
        }
        catch (SQLException | RuntimeException e) {
            try {
                newXaConnection.close();
            }
            catch (SQLException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Releases the JDBC connection and the XA connection and resets the provider to "not open".
     *
     * <p>Safe to call when the provider was never opened or was already closed. A failure to
     * close the XA connection is only logged (best-effort, as before); a failure to close the
     * JDBC connection is propagated, but the XA connection is still closed and the internal
     * state is still reset.
     *
     * @throws SQLException if closing the JDBC connection fails
     */
    @Override
    public void close() throws SQLException {
        try {
            if (this.connection != null) {
                this.connection.close();
            }
        }
        finally {
            this.connection = null;
            try {
                if (this.xaConnection != null) {
                    this.xaConnection.close();
                }
            }
            catch (SQLException e) {
                LOG.warn("unable to close XA connection", e);
            }
            finally {
                this.xaConnection = null;
                this.xaResource = null;
            }
        }
    }

    /**
     * Returns the current JDBC connection.
     *
     * @throws NullPointerException if there is no connection (provider not open)
     */
    @Override
    public Connection getConnection() {
        Preconditions.checkNotNull(this.connection);
        return this.connection;
    }

    /** Returns {@code true} while an XA resource is held, i.e. between open and close. */
    @Override
    public boolean isOpen() {
        return this.xaResource != null;
    }

    /**
     * Checks whether the provider is open and its JDBC connection is still valid.
     *
     * <p>{@link Connection#getNetworkTimeout()} reports milliseconds whereas
     * {@link Connection#isValid(int)} expects seconds, so the value is converted. The conversion
     * rounds up so that a sub-second network timeout does not turn into {@code 0}, which
     * {@code isValid} interprets as "no timeout".
     *
     * @throws SQLException if the driver fails to report the timeout or rejects the check
     */
    @Override
    public boolean isConnectionValid() throws SQLException {
        if (!this.isOpen() || this.connection == null) {
            return false;
        }
        int networkTimeoutMs = this.connection.getNetworkTimeout();
        int validationTimeoutSec = networkTimeoutMs / 1000 + (networkTimeoutMs % 1000 == 0 ? 0 : 1);
        return this.connection.isValid(validationTimeoutSec);
    }

    /** Opens the provider if necessary and returns the JDBC connection. */
    @Override
    public Connection getOrEstablishConnection() throws SQLException {
        if (!this.isOpen()) {
            this.open();
        }
        return this.connection;
    }

    /** Closes the provider, logging (instead of propagating) any {@link SQLException}. */
    @Override
    public void closeConnection() {
        try {
            this.close();
        }
        catch (SQLException e) {
            LOG.warn("Connection close failed.", e);
        }
    }

    /** Not supported by this provider. */
    @Override
    public Connection reestablishConnection() {
        throw new UnsupportedOperationException();
    }

    /** Starts a new transaction branch for {@code xid} (no flags). */
    @Override
    public void start(Xid xid) {
        this.execute(XaCommand.fromRunnable("start", xid,
                () -> this.xaResource.start(xid, XAResource.TMNOFLAGS)));
    }

    /**
     * Ends the branch successfully and prepares it.
     *
     * @throws EmptyTransactionXaException if prepare reports {@link XAResource#XA_RDONLY}
     * @throws FlinkRuntimeException if prepare returns anything other than
     *     {@link XAResource#XA_OK}, or if one of the XA calls fails
     */
    @Override
    public void endAndPrepare(Xid xid) {
        this.execute(XaCommand.fromRunnable("end", xid,
                () -> this.xaResource.end(xid, XAResource.TMSUCCESS)));
        int prepResult = this.execute(new XaCommand<Integer>("prepare", xid,
                () -> this.xaResource.prepare(xid)));
        if (prepResult == XAResource.XA_RDONLY) {
            throw new EmptyTransactionXaException(xid);
        }
        if (prepResult != XAResource.XA_OK) {
            throw new FlinkRuntimeException(
                    SimpleXaConnectionProvider.formatErrorMessage("prepare", xid, "response: " + prepResult));
        }
    }

    /**
     * Ends the branch with {@link XAResource#TMFAIL} and rolls it back.
     *
     * <p>If that fails with an error code at or above {@link XAException#XA_RBBASE}, a plain
     * {@link #rollback(Xid)} is attempted; any other error code is logged as a warning.
     */
    @Override
    public void failAndRollback(Xid xid) {
        this.execute(XaCommand.fromRunnable("end (fail)", xid, () -> {
            this.xaResource.end(xid, XAResource.TMFAIL);
            this.xaResource.rollback(xid);
        }, err -> {
            if (err.errorCode >= XAException.XA_RBBASE) {
                this.rollback(xid);
            } else {
                LOG.warn(SimpleXaConnectionProvider.formatErrorMessage("end (fail)", xid, err.errorCode));
            }
        }));
    }

    /**
     * Commits the prepared branch (two-phase, i.e. {@code onePhase = false}).
     *
     * @param ignoreUnknown forwarded to {@link XaError#buildCommitErrorDesc} when deciding how to
     *     report a failed commit
     */
    @Override
    public void commit(Xid xid, boolean ignoreUnknown) {
        this.execute(XaCommand.fromRunnableRecoverByWarn("commit", xid,
                () -> this.xaResource.commit(xid, false),
                e -> this.buildCommitErrorDesc(e, ignoreUnknown)));
    }

    /** Rolls back the branch; error reporting is delegated to {@link XaError#buildRollbackErrorDesc}. */
    @Override
    public void rollback(Xid xid) {
        this.execute(XaCommand.fromRunnableRecoverByWarn("rollback", xid,
                () -> this.xaResource.rollback(xid),
                this::buildRollbackErrorDesc));
    }

    /** Asks the resource manager to forget the branch; used by {@link #execute(XaCommand)}. */
    private void forget(Xid xid) {
        this.execute(XaCommand.fromRunnableRecoverByWarn("forget", xid,
                () -> this.xaResource.forget(xid),
                e -> Optional.of("manual cleanup may be required")));
    }

    /**
     * Collects the transaction branches reported by the resource manager.
     *
     * <p>Starts a scan with {@link XAResource#TMSTARTRSCAN}, keeps polling with
     * {@link XAResource#TMNOFLAGS} until a poll returns nothing, and always finishes with
     * {@link XAResource#TMENDRSCAN}. At most {@link #MAX_RECOVER_CALLS} non-empty follow-up polls
     * are accepted.
     *
     * @return the Xids gathered by the start call and the follow-up polls
     * @throws IllegalStateException if the follow-up polls keep returning Xids beyond the limit
     */
    @Override
    public Collection<Xid> recover() {
        return this.execute(new XaCommand<List<Xid>>("recover", null, () -> {
            List<Xid> list = this.recover(XAResource.TMSTARTRSCAN);
            try {
                for (int i = 0; list.addAll(this.recover(XAResource.TMNOFLAGS)); i++) {
                    Preconditions.checkState(i < MAX_RECOVER_CALLS, "too many xa_recover() calls");
                }
            }
            finally {
                // NOTE(review): Xids returned by this closing call are discarded, as before.
                this.recover(XAResource.TMENDRSCAN);
            }
            return list;
        }));
    }

    /**
     * Performs one {@code XAResource.recover} call and returns the result as a growable list.
     *
     * <p>The result is copied into an {@link ArrayList} because {@link Arrays#asList} is
     * fixed-size and would make the {@code addAll} in {@link #recover()} throw
     * {@link UnsupportedOperationException} as soon as a follow-up poll returned an Xid. A
     * {@code null} array from the driver is treated as empty.
     */
    private List<Xid> recover(int flags) throws XAException {
        Xid[] xids = this.xaResource.recover(flags);
        if (xids == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(Arrays.asList(xids));
    }

    /**
     * Runs the command against the open XA resource and maps failures.
     *
     * <p>On an {@link XAException} whose code is heuristic according to
     * {@link XaError#isHeurErrorCode}, {@code forget} is attempted for the command's Xid (if any)
     * before the command's own recovery is applied. {@link FlinkRuntimeException}s pass through
     * unchanged; any other exception is wrapped.
     *
     * @throws IllegalStateException if the provider is not open
     */
    private <T> T execute(XaCommand<T> cmd) throws FlinkRuntimeException {
        Preconditions.checkState(this.isOpen(), "not connected");
        try {
            return cmd.execute();
        }
        catch (XAException e) {
            if (XaError.isHeurErrorCode(e.errorCode)) {
                cmd.getXid().ifPresent(this::forget);
            }
            return cmd.recover(e);
        }
        catch (FlinkRuntimeException e) {
            throw e;
        }
        catch (Exception e) {
            throw SimpleXaConnectionProvider.wrapException(cmd.getName(), cmd.getXid().orElse(null), e);
        }
    }

    private static FlinkRuntimeException wrapException(String action, Xid xid, Exception ex) {
        return XaError.wrapException(action, xid, ex);
    }

    private Optional<String> buildCommitErrorDesc(XAException err, boolean ignoreUnknown) {
        return XaError.buildCommitErrorDesc(err, ignoreUnknown);
    }

    private Optional<String> buildRollbackErrorDesc(XAException err) {
        return XaError.buildRollbackErrorDesc(err);
    }

    private static String formatErrorMessage(String action, Xid xid, String ... more) {
        return SimpleXaConnectionProvider.formatErrorMessage(action, xid, null, more);
    }

    private static String formatErrorMessage(String action, Xid xid, Integer errorCode, String ... more) {
        return XaError.errorMessage(action, xid, errorCode, more);
    }
}

