/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.daemon.drpc;

import com.codahale.metrics.Meter;
import com.google.common.annotations.VisibleForTesting;
import java.security.Principal;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.drpc.BlockingOutstandingRequest;
import org.apache.storm.daemon.drpc.OutstandingRequest;
import org.apache.storm.daemon.drpc.RequestFactory;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.DRPCExceptionType;
import org.apache.storm.generated.DRPCExecutionException;
import org.apache.storm.generated.DRPCRequest;
import org.apache.storm.logging.ThriftAccessLogger;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.security.auth.IAuthorizer;
import org.apache.storm.security.auth.ReqContext;
import org.apache.storm.utils.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In-memory bookkeeping for DRPC requests.
 *
 * <p>Each call to {@link #execute} registers an {@link OutstandingRequest} under a freshly
 * generated id and appends it to a per-function queue. Workers pull requests with
 * {@link #fetchRequest} and complete them with {@link #returnResult} or {@link #failRequest}.
 * A {@link Timer} task periodically fails and removes requests that
 * {@link OutstandingRequest#isTimedOut} reports as expired.
 *
 * <p>State is held in concurrent collections because the timer thread runs the cleanup sweep
 * while other threads call the public methods.
 */
public class DRPC implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(DRPC.class);

    /** Returned by {@link #fetchRequest} when no request is queued for the function. */
    private static final DRPCRequest NOTHING_REQUEST = new DRPCRequest("", "");

    private static final DRPCExecutionException TIMED_OUT = new DRPCExecutionException("Timed Out");
    private static final DRPCExecutionException SHUT_DOWN = new DRPCExecutionException("Server Shutting Down");
    private static final DRPCExecutionException DEFAULT_FAILED = new DRPCExecutionException("Request failed");

    static {
        TIMED_OUT.set_type(DRPCExceptionType.SERVER_TIMEOUT);
        SHUT_DOWN.set_type(DRPCExceptionType.SERVER_SHUTDOWN);
        DEFAULT_FAILED.set_type(DRPCExceptionType.FAILED_REQUEST);
    }

    private static final Meter meterServerTimedOut =
        StormMetricsRegistry.registerMeter("drpc:num-server-timedout-requests");
    private static final Meter meterExecuteCalls =
        StormMetricsRegistry.registerMeter("drpc:num-execute-calls");
    private static final Meter meterResultCalls =
        StormMetricsRegistry.registerMeter("drpc:num-result-calls");
    private static final Meter meterFailRequestCalls =
        StormMetricsRegistry.registerMeter("drpc:num-failRequest-calls");
    private static final Meter meterFetchRequestCalls =
        StormMetricsRegistry.registerMeter("drpc:num-fetchRequest-calls");

    /** Function name -> requests waiting to be fetched for that function. */
    private final ConcurrentHashMap<String, ConcurrentLinkedQueue<OutstandingRequest>> _queues =
        new ConcurrentHashMap<String, ConcurrentLinkedQueue<OutstandingRequest>>();
    /** Request id -> request, for every request that has not been cleaned up yet. */
    private final ConcurrentHashMap<String, OutstandingRequest> _requests =
        new ConcurrentHashMap<String, OutstandingRequest>();
    private final Timer _timer = new Timer();
    /** Source of request ids; see {@link #nextId()}. */
    private final AtomicLong _ctr = new AtomicLong(0L);
    /** Authorizer consulted for every operation; {@code null} disables the permit check. */
    private final IAuthorizer _auth;

    /**
     * Builds the authorizer via {@link StormCommon#mkAuthorizationHandler}, converting any
     * checked failure into a {@link RuntimeException} that keeps the original as its cause.
     */
    private static IAuthorizer mkAuthorizationHandler(String klassname, Map<String, Object> conf) {
        try {
            return StormCommon.mkAuthorizationHandler(klassname, conf);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Logs the access (when a request context is available) and asks {@code auth} whether the
     * operation is permitted for the given function.
     *
     * @param reqContext the caller's context; may be {@code null}, in which case no access log
     *                   entry is written and a denied caller is reported as {@code "unknown"}
     * @param auth       the authorizer to consult; {@code null} means nothing is checked
     * @param operation  the operation name passed to the authorizer and used in the error message
     * @param function   the DRPC function name, passed to the authorizer under {@code "function.name"}
     * @throws AuthorizationException if {@code auth} does not permit the operation
     */
    @VisibleForTesting
    static void checkAuthorization(ReqContext reqContext, IAuthorizer auth, String operation, String function)
        throws AuthorizationException {
        if (reqContext != null) {
            ThriftAccessLogger.logAccessFunction(reqContext.requestID(), reqContext.remoteAddress(),
                reqContext.principal(), operation, function);
        }
        if (auth == null) {
            return;
        }
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("function.name", function);
        if (!auth.permit(reqContext, operation, map)) {
            // reqContext is allowed to be null (see the guard above), so it must not be
            // dereferenced blindly while building the error message.
            Principal principal = reqContext != null ? reqContext.principal() : null;
            String user = principal != null ? principal.getName() : "unknown";
            throw new AuthorizationException(
                "DRPC request '" + operation + "' for '" + user + "' user is not authorized");
        }
    }

    /**
     * Creates a DRPC instance from configuration: the authorizer class is read from
     * {@code "drpc.authorizer"} and the request timeout from {@code "drpc.request.timeout.secs"}
     * (default 600 seconds).
     *
     * @param conf the configuration map
     */
    public DRPC(Map<String, Object> conf) {
        // Multiply as long so that a large configured timeout cannot overflow int arithmetic.
        this(DRPC.mkAuthorizationHandler((String) conf.get("drpc.authorizer"), conf),
            ObjectReader.getInt(conf.get("drpc.request.timeout.secs"), Integer.valueOf(600)) * 1000L);
    }

    /**
     * Creates a DRPC instance and schedules the timeout sweep to run every
     * {@code timeoutMs / 2} milliseconds, starting after the same delay.
     *
     * @param auth      the authorizer to consult, or {@code null} to skip permit checks
     * @param timeoutMs the timeout, in milliseconds, handed to {@link OutstandingRequest#isTimedOut}
     */
    public DRPC(IAuthorizer auth, final long timeoutMs) {
        this._auth = auth;
        this._timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    DRPC.this.cleanupAll(timeoutMs, TIMED_OUT);
                } catch (RuntimeException e) {
                    // An exception escaping a TimerTask terminates the Timer thread, which would
                    // silently stop every later timeout sweep. Log it and keep the timer alive.
                    LOG.error("Error while cleaning up timed out DRPC requests", e);
                }
            }
        }, timeoutMs / 2L, timeoutMs / 2L);
    }

    /** Checks authorization for the current thread's {@link ReqContext} against this instance's authorizer. */
    private void checkAuthorization(String operation, String function) throws AuthorizationException {
        DRPC.checkAuthorization(ReqContext.context(), this._auth, operation, function);
    }

    /**
     * Forgets the request with the given id and, if it was never fetched, also removes it from
     * its function queue.
     */
    private void cleanup(String id) {
        OutstandingRequest req = this._requests.remove(id);
        if (req == null || req.wasFetched()) {
            return;
        }
        // execute() registers the request before it looks up/creates the function queue, so a
        // concurrent sweep can get here while no queue exists for the function yet.
        ConcurrentLinkedQueue<OutstandingRequest> queue = this._queues.get(req.getFunction());
        if (queue != null) {
            queue.remove(req);
        }
    }

    /**
     * Fails with {@code exp} and removes every registered request for which
     * {@code isTimedOut(timeoutMs)} is true, marking the server-timeout meter once per request.
     */
    private void cleanupAll(long timeoutMs, DRPCExecutionException exp) {
        for (Map.Entry<String, OutstandingRequest> entry : this._requests.entrySet()) {
            OutstandingRequest req = entry.getValue();
            if (!req.isTimedOut(timeoutMs)) {
                continue;
            }
            req.fail(exp);
            this.cleanup(entry.getKey());
            meterServerTimedOut.mark();
        }
    }

    /** Returns the next request id: the incremented counter rendered as a decimal string. */
    private String nextId() {
        return String.valueOf(this._ctr.incrementAndGet());
    }

    /** Returns the queue for {@code function}, creating and registering it if none exists. */
    private ConcurrentLinkedQueue<OutstandingRequest> getQueue(String function) {
        ConcurrentLinkedQueue<OutstandingRequest> queue = this._queues.get(function);
        if (queue == null) {
            ConcurrentLinkedQueue<OutstandingRequest> created = new ConcurrentLinkedQueue<OutstandingRequest>();
            // putIfAbsent returns the queue another thread registered first, or null if ours won.
            ConcurrentLinkedQueue<OutstandingRequest> existing = this._queues.putIfAbsent(function, created);
            queue = existing != null ? existing : created;
        }
        return queue;
    }

    /**
     * Delivers a result to the request with the given id. Unknown ids are ignored.
     *
     * @param id     the request id
     * @param result the result to hand to the request
     * @throws AuthorizationException if the caller may not perform {@code "result"} on the
     *                                request's function
     */
    public void returnResult(String id, String result) throws AuthorizationException {
        meterResultCalls.mark();
        LOG.debug("Got a result {} {}", id, result);
        OutstandingRequest req = this._requests.get(id);
        if (req != null) {
            this.checkAuthorization("result", req.getFunction());
            req.returnResult(result);
        }
    }

    /**
     * Takes the next queued request for a function and marks it as fetched.
     *
     * @param functionName the DRPC function name
     * @return the next request, or a request with empty arguments and empty id when the queue
     *         is empty
     * @throws AuthorizationException if the caller may not perform {@code "fetchRequest"} on
     *                                the function
     */
    public DRPCRequest fetchRequest(String functionName) throws AuthorizationException {
        meterFetchRequestCalls.mark();
        this.checkAuthorization("fetchRequest", functionName);
        OutstandingRequest req = this.getQueue(functionName).poll();
        if (req == null) {
            return NOTHING_REQUEST;
        }
        req.fetched();
        return req.getRequest();
    }

    /**
     * Fails the request with the given id. Unknown ids are ignored.
     *
     * @param id the request id
     * @param e  the failure to report; when {@code null} a generic "Request failed" exception of
     *           type {@code FAILED_REQUEST} is used
     * @throws AuthorizationException if the caller may not perform {@code "failRequest"} on the
     *                                request's function
     */
    public void failRequest(String id, DRPCExecutionException e) throws AuthorizationException {
        meterFailRequestCalls.mark();
        LOG.debug("Got a fail {}", id);
        OutstandingRequest req = this._requests.get(id);
        if (req != null) {
            this.checkAuthorization("failRequest", req.getFunction());
            req.fail(e != null ? e : DEFAULT_FAILED);
        }
    }

    /**
     * Registers a new request for a function and queues it for fetching.
     *
     * @param functionName the DRPC function name
     * @param funcArgs     the function arguments
     * @param factory      creates the concrete {@link OutstandingRequest}
     * @param <T>          the concrete request type
     * @return the newly created request
     * @throws AuthorizationException if the caller may not perform {@code "execute"} on the
     *                                function
     */
    public <T extends OutstandingRequest> T execute(String functionName, String funcArgs, RequestFactory<T> factory)
        throws AuthorizationException {
        meterExecuteCalls.mark();
        this.checkAuthorization("execute", functionName);
        String id = this.nextId();
        LOG.debug("Execute {} {}", functionName, funcArgs);
        T req = factory.mkRequest(functionName, new DRPCRequest(funcArgs, id));
        // Register by id before queueing so that a worker which fetches the request can
        // always find it again when it reports the result or a failure.
        this._requests.put(id, req);
        this.getQueue(functionName).add(req);
        return req;
    }

    /**
     * Executes a request and waits for its result via {@link BlockingOutstandingRequest#getResult}.
     * The request is always cleaned up afterwards, whether it succeeded or failed.
     *
     * @param functionName the DRPC function name
     * @param funcArgs     the function arguments
     * @return the result of the request
     * @throws DRPCExecutionException if the request fails
     * @throws AuthorizationException if the caller may not perform {@code "execute"} on the
     *                                function
     */
    public String executeBlocking(String functionName, String funcArgs)
        throws DRPCExecutionException, AuthorizationException {
        BlockingOutstandingRequest req = this.execute(functionName, funcArgs, BlockingOutstandingRequest.FACTORY);
        try {
            LOG.debug("Waiting for result {} {}", functionName, funcArgs);
            return req.getResult();
        } finally {
            this.cleanup(req.getRequest().get_request_id());
        }
    }

    /**
     * Cancels the timer and runs one final sweep with a timeout of zero, failing the swept
     * requests with the "Server Shutting Down" exception.
     */
    @Override
    public void close() {
        this._timer.cancel();
        this.cleanupAll(0L, SHUT_DOWN);
    }
}

