/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.client;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Cancellable;
import org.apache.hadoop.hbase.client.ClientScanner;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultBoundedCompletionService;
import org.apache.hadoop.hbase.client.RetryingCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

/**
 * A {@link RetryingCallable} that issues a scanner RPC through the current
 * {@link ScannerCallable} and, if no answer arrives within {@code timeBeforeReplicas},
 * also submits the same scan to callables created for the other replica ids (unless the
 * scan uses {@link Consistency#STRONG}). The callable that answers first becomes the
 * current one; the remaining outstanding callables are asked to close.
 *
 * <p>Most accessor-style methods simply delegate to {@link #currentScannerCallable}.
 * NOTE(review): {@link #call(int)} sets that field to {@code null} after executing a
 * close, so the delegating methods will throw {@link NullPointerException} if invoked
 * afterwards - confirm callers never do that.
 */
@InterfaceAudience.Private
class ScannerCallableWithReplicas
implements RetryingCallable<Result[]> {
    private static final Log LOG = LogFactory.getLog(ScannerCallableWithReplicas.class);
    /** The callable all delegating methods forward to; replaced when another replica answers first. */
    volatile ScannerCallable currentScannerCallable;
    /** Set to {@code true} by a {@link #call(int)} whose winning callable differs from the previous one. */
    AtomicBoolean replicaSwitched = new AtomicBoolean(false);
    final ClusterConnection cConnection;
    protected final ExecutorService pool;
    /** How long to wait for the current replica before trying the others (polled in microseconds). */
    protected final int timeBeforeReplicas;
    private final Scan scan;
    private final int retries;
    /** Last row returned by the most recent successful call; used to position replica scans. */
    private Result lastResult;
    private final RpcRetryingCaller<Result[]> caller;
    private final TableName tableName;
    private final Configuration conf;
    private final int scannerTimeout;
    /** Callables submitted during the current {@link #call(int)} that have not been settled yet. */
    private final Set<ScannerCallable> outstandingCallables = new HashSet<ScannerCallable>();
    private boolean someRPCcancelled = false;

    /**
     * @param tableName table being scanned
     * @param cConnection connection used to look up region locations
     * @param baseCallable callable used for the first RPC
     * @param pool executor used for the replica RPCs and for closing losing scanners
     * @param timeBeforeReplicas wait before contacting other replicas; must not be negative
     * @param scan the scan definition (its consistency decides whether replicas are used)
     * @param retries retry count passed along when an execution failure is rethrown
     * @param scannerTimeout timeout handed to the submitted RPCs
     * @param caching accepted for signature compatibility; not used by this constructor
     * @param conf configuration used to instantiate caller factories
     * @param caller caller used by the RPC wrappers unless the scan is TIMELINE-consistent
     * @throws IllegalArgumentException if {@code timeBeforeReplicas} is negative
     */
    public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection, ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, int retries, int scannerTimeout, int caching, Configuration conf, RpcRetryingCaller<Result[]> caller) {
        this.currentScannerCallable = baseCallable;
        this.cConnection = cConnection;
        this.pool = pool;
        if (timeBeforeReplicas < 0) {
            throw new IllegalArgumentException("Invalid value of operation timeout on the primary");
        }
        this.timeBeforeReplicas = timeBeforeReplicas;
        this.scan = scan;
        this.retries = retries;
        this.tableName = tableName;
        this.conf = conf;
        this.scannerTimeout = scannerTimeout;
        this.caller = caller;
    }

    /** Delegates to the current scanner callable. */
    public void setClose() {
        this.currentScannerCallable.setClose();
    }

    /** Delegates to the current scanner callable. */
    public void setRenew(boolean val) {
        this.currentScannerCallable.setRenew(val);
    }

    /** Delegates to the current scanner callable. */
    public void setCaching(int caching) {
        this.currentScannerCallable.setCaching(caching);
    }

    /** Delegates to the current scanner callable. */
    public int getCaching() {
        return this.currentScannerCallable.getCaching();
    }

    /** Delegates to the current scanner callable. */
    public HRegionInfo getHRegionInfo() {
        return this.currentScannerCallable.getHRegionInfo();
    }

    /** Delegates to the current scanner callable. */
    public boolean getServerHasMoreResults() {
        return this.currentScannerCallable.getServerHasMoreResults();
    }

    /** Delegates to the current scanner callable. */
    public void setServerHasMoreResults(boolean serverHasMoreResults) {
        this.currentScannerCallable.setServerHasMoreResults(serverHasMoreResults);
    }

    /** Delegates to the current scanner callable. */
    public boolean hasMoreResultsContext() {
        return this.currentScannerCallable.hasMoreResultsContext();
    }

    /** Delegates to the current scanner callable. */
    public void setHasMoreResultsContext(boolean serverHasMoreResultsContext) {
        this.currentScannerCallable.setHasMoreResultsContext(serverHasMoreResultsContext);
    }

    /**
     * Executes one scanner RPC.
     *
     * <p>If the current callable is already flagged closed, the call is sent to that exact
     * callable only and the current callable is then cleared. Otherwise the current callable
     * is submitted first; if it has not completed within {@code timeBeforeReplicas}
     * microseconds, callables for the other replica ids are submitted as well and the first
     * completed one wins.
     *
     * @param timeout timeout in milliseconds used when waiting for a completed RPC
     * @return the results of the winning RPC; {@code null} if the winning RPC produced no
     *         pair or if nothing completed within {@code timeout}
     * @throws IOException on RPC failure; cancellation, interruption and timeouts while
     *         waiting are reported as {@link InterruptedIOException} carrying the
     *         original exception as cause
     */
    @Override
    public Result[] call(int timeout) throws IOException {
        if (this.currentScannerCallable != null && this.currentScannerCallable.closed) {
            // Closing targets the exact scanner that was in use; no replica fan-out.
            if (LOG.isTraceEnabled()) {
                LOG.trace("Closing scanner id=" + this.currentScannerCallable.scannerId);
            }
            Result[] r = this.currentScannerCallable.call(timeout);
            this.currentScannerCallable = null;
            return r;
        }
        RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(true, 0, this.cConnection, this.tableName, this.currentScannerCallable.getRow());
        // Sized as a multiple of the replica count; presumably the task capacity of the service.
        ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs = new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(RpcRetryingCallerFactory.instantiate(this.conf), this.pool, rl.size() * 5);
        AtomicBoolean done = new AtomicBoolean(false);
        this.replicaSwitched.set(false);
        this.addCallsForCurrentReplica(cs, rl);
        ResultBoundedCompletionService.QueueingFuture<Pair<Result[], ScannerCallable>> f;
        try {
            // Note the unit: timeBeforeReplicas is polled in MICROSECONDS.
            f = cs.poll(this.timeBeforeReplicas, TimeUnit.MICROSECONDS);
            if (f != null) {
                return this.acceptResponse(f.get(timeout, TimeUnit.MILLISECONDS), done);
            }
        }
        catch (ExecutionException e) {
            RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, this.retries);
        }
        catch (CancellationException e) {
            throw ScannerCallableWithReplicas.asInterruptedIOException(e);
        }
        catch (InterruptedException e) {
            // The interrupt status is intentionally left as-is (unchanged behaviour);
            // the InterruptedIOException is what signals the interruption to the caller.
            throw ScannerCallableWithReplicas.asInterruptedIOException(e);
        }
        catch (TimeoutException e) {
            throw ScannerCallableWithReplicas.asInterruptedIOException(e);
        }
        // The current replica was too slow: fan out to every other replica id.
        this.addCallsForOtherReplicas(cs, rl, 0, rl.size() - 1);
        try {
            f = cs.poll(timeout, TimeUnit.MILLISECONDS);
            if (f != null) {
                return this.acceptResponse(f.get(timeout, TimeUnit.MILLISECONDS), done);
            }
        }
        catch (ExecutionException e) {
            RpcRetryingCallerWithReadReplicas.throwEnrichedException(e, this.retries);
        }
        catch (CancellationException e) {
            throw ScannerCallableWithReplicas.asInterruptedIOException(e);
        }
        catch (InterruptedException e) {
            throw ScannerCallableWithReplicas.asInterruptedIOException(e);
        }
        catch (TimeoutException e) {
            throw ScannerCallableWithReplicas.asInterruptedIOException(e);
        }
        finally {
            // Whatever happened, stop every task still registered with the service.
            cs.cancelAll();
        }
        return null;
    }

    /**
     * Records the callable that produced {@code response} as the serving one (when the
     * pair names a callable) and extracts the results.
     *
     * @return the results in {@code response}, or {@code null} if {@code response} is null
     */
    private Result[] acceptResponse(Pair<Result[], ScannerCallable> response, AtomicBoolean done) {
        if (response == null) {
            return null;
        }
        if (response.getSecond() != null) {
            this.updateCurrentlyServingReplica(response.getSecond(), response.getFirst(), done, this.pool);
        }
        return response.getFirst();
    }

    /**
     * Builds an {@link InterruptedIOException} with the message of {@code cause} and with
     * {@code cause} attached, so the original stack trace is not lost.
     */
    private static InterruptedIOException asInterruptedIOException(Exception cause) {
        InterruptedIOException wrapped = new InterruptedIOException(cause.getMessage());
        wrapped.initCause(cause);
        return wrapped;
    }

    /**
     * Makes {@code scanner} the current callable, remembers the last returned row and
     * asks every other outstanding callable to close via the executor. Only the first
     * invocation per {@code done} flag has any effect.
     */
    private void updateCurrentlyServingReplica(ScannerCallable scanner, Result[] result, AtomicBoolean done, ExecutorService pool) {
        if (!done.compareAndSet(false, true)) {
            return;
        }
        if (this.currentScannerCallable != scanner) {
            this.replicaSwitched.set(true);
        }
        this.currentScannerCallable = scanner;
        if (result != null && result.length != 0) {
            this.lastResult = result[result.length - 1];
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Setting current scanner as id=" + this.currentScannerCallable.scannerId + " associated with replica=" + this.currentScannerCallable.getHRegionInfo().getReplicaId());
        }
        // The winner stays open; everything else still outstanding lost the race.
        this.outstandingCallables.remove(scanner);
        for (ScannerCallable s : this.outstandingCallables) {
            if (LOG.isTraceEnabled()) {
                // Report the replica id (not the region id) so the "replica=" label is accurate.
                LOG.trace("Closing scanner id=" + s.scannerId + ", replica=" + s.getHRegionInfo().getReplicaId() + " because slow and replica=" + this.currentScannerCallable.getHRegionInfo().getReplicaId() + " succeeded");
            }
            s.setClose();
            final RetryingRPC r = new RetryingRPC(s);
            // Run the close RPC on the pool so the caller is not blocked by slow replicas.
            pool.submit(new Callable<Void>(){

                @Override
                public Void call() throws Exception {
                    r.call(ScannerCallableWithReplicas.this.scannerTimeout);
                    return null;
                }
            });
        }
        this.outstandingCallables.clear();
    }

    /** @return whether the most recent {@link #call(int)} ended on a different callable than it started with */
    public boolean switchedToADifferentReplica() {
        return this.replicaSwitched.get();
    }

    /** @return {@code false} if there is no current callable, otherwise what the current callable reports */
    public boolean isHeartbeatMessage() {
        return this.currentScannerCallable != null && this.currentScannerCallable.isHeartbeatMessage();
    }

    /** Submits the current callable to {@code cs} and tracks it as outstanding. {@code rl} is unused. */
    private void addCallsForCurrentReplica(ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
        RetryingRPC retryingOnReplica = new RetryingRPC(this.currentScannerCallable);
        this.outstandingCallables.add(this.currentScannerCallable);
        cs.submit(retryingOnReplica, this.scannerTimeout, this.currentScannerCallable.id);
    }

    /**
     * Submits a callable for every replica id in {@code [min, max]} except the current
     * one. Does nothing for {@link Consistency#STRONG} scans. {@code rl} is unused.
     */
    private void addCallsForOtherReplicas(ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl, int min, int max) {
        if (this.scan.getConsistency() == Consistency.STRONG) {
            return;
        }
        for (int id = min; id <= max; ++id) {
            if (this.currentScannerCallable.id == id) {
                continue;
            }
            ScannerCallable s = this.currentScannerCallable.getScannerCallableForReplica(id);
            this.setStartRowForReplicaCallable(s);
            this.outstandingCallables.add(s);
            RetryingRPC retryingOnReplica = new RetryingRPC(s);
            cs.submit(retryingOnReplica, this.scannerTimeout, id);
        }
    }

    /**
     * Positions the scan of {@code callable} relative to the last row already returned.
     * No-op when nothing has been returned yet or {@code callable} is null.
     */
    private void setStartRowForReplicaCallable(ScannerCallable callable) {
        if (this.lastResult == null || callable == null) {
            return;
        }
        if (this.lastResult.isPartial()) {
            // A partial result means its row is not finished: restart at that same row.
            callable.getScan().setStartRow(this.lastResult.getRow());
        } else if (callable.getScan().isReversed()) {
            callable.getScan().setStartRow(ClientScanner.createClosestRowBefore(this.lastResult.getRow()));
        } else {
            // Appending a single zero byte yields the row key that sorts right after the last row.
            callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
        }
    }

    /** @return whether {@link RetryingRPC#cancel()} has been invoked on any wrapper created by this instance */
    @VisibleForTesting
    boolean isAnyRPCcancelled() {
        return this.someRPCcancelled;
    }

    /** No-op; preparation is done per wrapped callable in {@link RetryingRPC#prepare(boolean)}. */
    @Override
    public void prepare(boolean reload) throws IOException {
    }

    /** Delegates to the current scanner callable. */
    @Override
    public void throwable(Throwable t, boolean retrying) {
        this.currentScannerCallable.throwable(t, retrying);
    }

    /** Delegates to the current scanner callable. */
    @Override
    public String getExceptionMessageAdditionalDetail() {
        return this.currentScannerCallable.getExceptionMessageAdditionalDetail();
    }

    /** Delegates to the current scanner callable. */
    @Override
    public long sleep(long pause, int tries) {
        return this.currentScannerCallable.sleep(pause, tries);
    }

    /**
     * Cancellable wrapper around a single {@link ScannerCallable}. Its result pairs the
     * RPC results with the callable that produced them so the winner can be identified.
     */
    class RetryingRPC
    implements RetryingCallable<Pair<Result[], ScannerCallable>>,
    Cancellable {
        final ScannerCallable callable;
        RpcRetryingCaller<Result[]> caller;
        private volatile boolean cancelled = false;

        RetryingRPC(ScannerCallable callable) {
            this.callable = callable;
            this.caller = ScannerCallableWithReplicas.this.caller;
            if (ScannerCallableWithReplicas.this.scan.getConsistency() == Consistency.TIMELINE) {
                // TIMELINE scans get a caller of their own instead of the shared one.
                this.caller = RpcRetryingCallerFactory.instantiate(ScannerCallableWithReplicas.this.conf).newCaller();
            }
        }

        /**
         * @return the RPC results paired with the wrapped callable, or {@code null} if
         *         this wrapper was cancelled before the call started
         */
        @Override
        public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
            if (this.cancelled) {
                return null;
            }
            Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
            return new Pair<Result[], ScannerCallable>(res, this.callable);
        }

        /**
         * Prepares the wrapped callable unless cancelled.
         *
         * @throws InterruptedIOException if the current thread was interrupted (the
         *         interrupt flag is cleared by the check)
         */
        @Override
        public void prepare(boolean reload) throws IOException {
            if (this.cancelled) {
                return;
            }
            if (Thread.interrupted()) {
                throw new InterruptedIOException();
            }
            this.callable.prepare(reload);
        }

        @Override
        public void throwable(Throwable t, boolean retrying) {
            this.callable.throwable(t, retrying);
        }

        @Override
        public String getExceptionMessageAdditionalDetail() {
            return this.callable.getExceptionMessageAdditionalDetail();
        }

        @Override
        public long sleep(long pause, int tries) {
            return this.callable.sleep(pause, tries);
        }

        /**
         * Marks this wrapper cancelled, cancels its caller, starts cancellation on the
         * callable's controller when one exists, and records the cancellation on the
         * enclosing instance.
         */
        @Override
        public void cancel() {
            this.cancelled = true;
            this.caller.cancel();
            if (this.callable.getController() != null) {
                this.callable.getController().startCancel();
            }
            ScannerCallableWithReplicas.this.someRPCcancelled = true;
        }

        @Override
        public boolean isCancelled() {
            return this.cancelled;
        }
    }
}

