/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.eureka.cluster;

import com.netflix.appinfo.InstanceInfo;
import com.netflix.discovery.shared.EurekaHttpClient;
import com.netflix.eureka.EurekaServerConfig;
import com.netflix.eureka.PeerAwareInstanceRegistryImpl;
import com.netflix.eureka.cluster.HttpReplicationClient;
import com.netflix.eureka.cluster.InstanceReplicationTask;
import com.netflix.eureka.cluster.ReplicationTask;
import com.netflix.eureka.cluster.protocol.ReplicationInstance;
import com.netflix.eureka.cluster.protocol.ReplicationInstanceResponse;
import com.netflix.eureka.cluster.protocol.ReplicationList;
import com.netflix.eureka.cluster.protocol.ReplicationListResponse;
import com.netflix.eureka.util.batcher.MessageBatcher;
import com.netflix.eureka.util.batcher.MessageProcessor;
import com.netflix.servo.monitor.DynamicCounter;
import java.io.IOException;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReplicationTaskProcessor {
    private static final Logger logger = LoggerFactory.getLogger(ReplicationTaskProcessor.class);
    private final String peerId;
    private final HttpReplicationClient replicationClient;
    private final EurekaServerConfig config;
    private final long retrySleepTimeMs;
    private final long serverUnavailableSleepTime;
    private final MessageBatcher<ReplicationTask> batcher;

    ReplicationTaskProcessor(String peerId, String batcherName, String batchedAction, HttpReplicationClient replicationClient, EurekaServerConfig config, long maxDelay, long retrySleepTimeMs, long serverUnavailableSleepTime) {
        this.peerId = peerId;
        this.replicationClient = replicationClient;
        this.config = config;
        this.retrySleepTimeMs = retrySleepTimeMs;
        this.serverUnavailableSleepTime = serverUnavailableSleepTime;
        String absoluteBatcherName = batcherName + '-' + batchedAction;
        this.batcher = new MessageBatcher<ReplicationTask>(absoluteBatcherName, this.createMessageProcessor(), config.getMaxElementsInPeerReplicationPool(), maxDelay, config.getMinThreadsForPeerReplication(), config.getMaxThreadsForPeerReplication(), config.getMaxIdleThreadAgeInMinutesForPeerReplication() * 60L * 1000L, true);
    }

    private MessageProcessor<ReplicationTask> createMessageProcessor() {
        return new MessageProcessor<ReplicationTask>(){

            @Override
            public void process(List<ReplicationTask> tasks) {
                if (tasks.get(0).isBatchingSupported() && ReplicationTaskProcessor.this.config.shouldBatchReplication()) {
                    ReplicationTaskProcessor.this.executeBatch(tasks);
                } else {
                    ReplicationTaskProcessor.this.executeSingle(tasks);
                }
            }
        };
    }

    public boolean process(ReplicationTask replicationTask) {
        boolean success = this.batcher.process(replicationTask);
        if (!success) {
            logger.error("Cannot find space in the replication pool for peer {}. Check the network connectivity or the traffic", (Object)this.peerId);
        }
        return success;
    }

    public void shutdown() {
        this.batcher.stop();
    }

    private void executeSingle(List<ReplicationTask> tasks) {
        for (ReplicationTask task : tasks) {
            boolean done;
            long lastNetworkErrorTime = 0L;
            do {
                done = true;
                try {
                    if (this.isLate(task)) continue;
                    DynamicCounter.increment((String)("Single_" + task.getAction().name() + "_tries"), (String[])new String[0]);
                    EurekaHttpClient.HttpResponse<?> httpResponse = task.execute();
                    int statusCode = httpResponse.getStatusCode();
                    Object entity = httpResponse.getEntity();
                    if (logger.isDebugEnabled()) {
                        logger.debug("Replication task {} completed with status {}, (includes entity {})", new Object[]{task.getTaskName(), statusCode, entity != null});
                    }
                    if (ReplicationTaskProcessor.isSuccess(statusCode)) {
                        DynamicCounter.increment((String)("Single_" + task.getAction().name() + "_success"), (String[])new String[0]);
                        task.handleSuccess();
                        continue;
                    }
                    DynamicCounter.increment((String)("Single_" + task.getAction().name() + "_failure"), (String[])new String[0]);
                    task.handleFailure(statusCode, entity);
                }
                catch (Throwable e) {
                    if (ReplicationTaskProcessor.isNetworkConnectException(e)) {
                        long now = System.currentTimeMillis();
                        if (now - lastNetworkErrorTime > 10000L) {
                            lastNetworkErrorTime = now;
                            logger.error("Network level connection to peer " + this.peerId + " for task " + task.getTaskName() + "; retrying after delay", e);
                        }
                        try {
                            Thread.sleep(this.retrySleepTimeMs);
                        }
                        catch (InterruptedException ignore) {
                            // empty catch block
                        }
                        DynamicCounter.increment((String)(task.getAction().name() + "_retries"), (String[])new String[0]);
                        done = false;
                        continue;
                    }
                    logger.error(this.peerId + ": " + task.getTaskName() + "Not re-trying this exception because it does not seem to be a network exception", e);
                }
            } while (!done);
        }
    }

    private void executeBatch(List<ReplicationTask> tasks) {
        boolean done;
        ReplicationList list = this.createReplicationListOf(tasks);
        if (list.getReplicationList().isEmpty()) {
            return;
        }
        PeerAwareInstanceRegistryImpl.Action action = list.getReplicationList().get(0).getAction();
        DynamicCounter.increment((String)("Batch_" + (Object)((Object)action) + "_tries"), (String[])new String[0]);
        long lastNetworkErrorTime = 0L;
        do {
            done = true;
            try {
                EurekaHttpClient.HttpResponse<ReplicationListResponse> response = this.replicationClient.submitBatchUpdates(list);
                int statusCode = response.getStatusCode();
                if (!ReplicationTaskProcessor.isSuccess(statusCode)) {
                    if (statusCode == 503) {
                        logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", (Object)this.peerId);
                        this.rescheduleAfterDelay(tasks);
                    } else {
                        logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", (Object)statusCode, (Object)tasks.size());
                    }
                    return;
                }
                DynamicCounter.increment((String)("Batch_" + (Object)((Object)action) + "_success"), (String[])new String[0]);
                this.handleBatchResponse(tasks, ((ReplicationListResponse)response.getEntity()).getResponseList());
            }
            catch (Throwable e) {
                if (ReplicationTaskProcessor.isNetworkConnectException(e)) {
                    long now = System.currentTimeMillis();
                    if (now - lastNetworkErrorTime > 10000L) {
                        lastNetworkErrorTime = now;
                        logger.error("Network level connection to peer " + this.peerId + "; retrying after delay", e);
                    }
                    try {
                        Thread.sleep(this.retrySleepTimeMs);
                    }
                    catch (InterruptedException ignore) {
                        // empty catch block
                    }
                    done = false;
                    DynamicCounter.increment((String)("Batch_" + (Object)((Object)action) + "_retries"), (String[])new String[0]);
                    continue;
                }
                logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
            }
        } while (!done);
    }

    private void handleBatchResponse(List<ReplicationTask> tasks, List<ReplicationInstanceResponse> responseList) {
        if (tasks.size() != responseList.size()) {
            logger.error("Batch response size different from submitted task list ({} != {}); skipping response analysis", (Object)responseList.size(), (Object)tasks.size());
            return;
        }
        for (int i = 0; i < tasks.size(); ++i) {
            this.handleBatchResponse(tasks.get(i), responseList.get(i));
        }
    }

    private void handleBatchResponse(ReplicationTask task, ReplicationInstanceResponse response) {
        int statusCode = response.getStatusCode();
        if (ReplicationTaskProcessor.isSuccess(statusCode)) {
            task.handleSuccess();
            return;
        }
        try {
            task.handleFailure(response.getStatusCode(), response.getResponseEntity());
        }
        catch (Throwable e) {
            logger.error("Replication task " + task.getTaskName() + " error handler failure", e);
        }
    }

    private void rescheduleAfterDelay(List<ReplicationTask> tasks) {
        try {
            Thread.sleep(this.serverUnavailableSleepTime);
        }
        catch (InterruptedException e) {
            // empty catch block
        }
        for (ReplicationTask task : tasks) {
            if (this.isLate(task)) continue;
            this.process(task);
        }
    }

    private ReplicationList createReplicationListOf(List<ReplicationTask> tasks) {
        ReplicationList list = new ReplicationList();
        for (ReplicationTask task : tasks) {
            if (this.isLate(task)) continue;
            list.addReplicationInstance(ReplicationTaskProcessor.createReplicationInstanceOf((InstanceReplicationTask)task));
        }
        return list;
    }

    private boolean isLate(ReplicationTask task) {
        boolean late;
        long now = System.currentTimeMillis();
        boolean bl = late = now - task.getSubmitTime() > (long)this.config.getMaxTimeForReplication();
        if (late) {
            DynamicCounter.increment((String)("Replication_" + task.getAction().name() + "_expiry"), (String[])new String[0]);
            logger.warn("Replication task {} older than the threshold (submit time {}", (Object)task.getTaskName(), (Object)task.getSubmitTime());
            task.cancel();
        }
        return late;
    }

    private static boolean isSuccess(int statusCode) {
        return statusCode >= 200 && statusCode < 300;
    }

    private static boolean isNetworkConnectException(Throwable e) {
        do {
            if (!IOException.class.isInstance(e)) continue;
            return true;
        } while ((e = e.getCause()) != null);
        return false;
    }

    private static ReplicationInstance createReplicationInstanceOf(InstanceReplicationTask task) {
        ReplicationInstance.ReplicationInstanceBuilder instanceBuilder = ReplicationInstance.ReplicationInstanceBuilder.aReplicationInstance();
        instanceBuilder.withAppName(task.getAppName());
        instanceBuilder.withId(task.getId());
        InstanceInfo instanceInfo = task.getInstanceInfo();
        if (instanceInfo != null) {
            String overriddenStatus = task.getOverriddenStatus() == null ? null : task.getOverriddenStatus().name();
            instanceBuilder.withOverriddenStatus(overriddenStatus);
            instanceBuilder.withLastDirtyTimestamp(instanceInfo.getLastDirtyTimestamp());
            if (task.shouldReplicateInstanceInfo()) {
                instanceBuilder.withInstanceInfo(instanceInfo);
            }
            String instanceStatus = instanceInfo.getStatus() == null ? null : instanceInfo.getStatus().name();
            instanceBuilder.withStatus(instanceStatus);
        }
        instanceBuilder.withAction(task.getAction());
        return instanceBuilder.build();
    }
}

