/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.ipc;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.protobuf.BlockingService;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
import org.apache.hadoop.hbase.ipc.CallRunner;
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.internal.verification.VerificationModeFactory;
import org.mockito.verification.VerificationMode;

public abstract class AbstractTestIPC {
    private static final Log LOG = LogFactory.getLog(AbstractTestIPC.class);
    private static byte[] CELL_BYTES = Bytes.toBytes((String)"xyz");
    private static KeyValue CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, CELL_BYTES);
    static byte[] BIG_CELL_BYTES = new byte[10240];
    static KeyValue BIG_CELL = new KeyValue(CELL_BYTES, CELL_BYTES, CELL_BYTES, BIG_CELL_BYTES);
    static final Configuration CONF = HBaseConfiguration.create();
    static final BlockingService SERVICE = TestRpcServiceProtos.TestProtobufRpcProto.newReflectiveBlockingService(new TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface(){

        @Override
        public TestProtos.EmptyResponseProto ping(RpcController controller, TestProtos.EmptyRequestProto request) throws ServiceException {
            return null;
        }

        @Override
        public TestProtos.EmptyResponseProto error(RpcController controller, TestProtos.EmptyRequestProto request) throws ServiceException {
            return null;
        }

        @Override
        public TestProtos.EchoResponseProto echo(RpcController controller, TestProtos.EchoRequestProto request) throws ServiceException {
            if (controller instanceof PayloadCarryingRpcController) {
                PayloadCarryingRpcController pcrc = (PayloadCarryingRpcController)controller;
                CellScanner cellScanner = pcrc.cellScanner();
                ArrayList<Cell> list = null;
                if (cellScanner != null) {
                    list = new ArrayList<Cell>();
                    try {
                        while (cellScanner.advance()) {
                            list.add(cellScanner.current());
                        }
                    }
                    catch (IOException e) {
                        throw new ServiceException((Throwable)e);
                    }
                }
                cellScanner = CellUtil.createCellScanner(list);
                ((PayloadCarryingRpcController)controller).setCellScanner(cellScanner);
            }
            return TestProtos.EchoResponseProto.newBuilder().setMessage(request.getMessage()).build();
        }
    });

    protected abstract AbstractRpcClient createRpcClientNoCodec(Configuration var1);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testNoCodec() throws InterruptedException, IOException {
        Configuration conf = HBaseConfiguration.create();
        AbstractRpcClient client = this.createRpcClientNoCodec(conf);
        TestRpcServer rpcServer = new TestRpcServer();
        try {
            rpcServer.start();
            Descriptors.MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
            String message = "hello";
            TestProtos.EchoRequestProto param = TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
            InetSocketAddress address = rpcServer.getListenerAddress();
            if (address == null) {
                throw new IOException("Listener channel is closed");
            }
            Pair r = client.call(null, md, (Message)param, (Message)md.getOutputType().toProto(), User.getCurrent(), address);
            Assert.assertTrue((r.getSecond() == null ? 1 : 0) != 0);
            Assert.assertTrue((boolean)((Message)r.getFirst()).toString().contains("hello"));
        }
        finally {
            client.close();
            rpcServer.stop();
        }
    }

    protected abstract AbstractRpcClient createRpcClient(Configuration var1);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCompressCellBlock() throws IOException, InterruptedException, SecurityException, NoSuchMethodException, ServiceException {
        Configuration conf = new Configuration(HBaseConfiguration.create());
        conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
        ArrayList<KeyValue> cells = new ArrayList<KeyValue>();
        int count = 3;
        for (int i = 0; i < count; ++i) {
            cells.add(CELL);
        }
        AbstractRpcClient client = this.createRpcClient(conf);
        TestRpcServer rpcServer = new TestRpcServer();
        try {
            rpcServer.start();
            Descriptors.MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
            TestProtos.EchoRequestProto param = TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
            PayloadCarryingRpcController pcrc = new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
            InetSocketAddress address = rpcServer.getListenerAddress();
            if (address == null) {
                throw new IOException("Listener channel is closed");
            }
            Pair r = client.call(pcrc, md, (Message)param, (Message)md.getOutputType().toProto(), User.getCurrent(), address);
            int index = 0;
            while (((CellScanner)r.getSecond()).advance()) {
                Assert.assertTrue((boolean)CELL.equals((Object)((CellScanner)r.getSecond()).current()));
                ++index;
            }
            Assert.assertEquals((long)count, (long)index);
        }
        finally {
            client.close();
            rpcServer.stop();
        }
    }

    protected abstract AbstractRpcClient createRpcClientRTEDuringConnectionSetup(Configuration var1) throws IOException;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRTEDuringConnectionSetup() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        TestRpcServer rpcServer = new TestRpcServer();
        AbstractRpcClient client = this.createRpcClientRTEDuringConnectionSetup(conf);
        try {
            rpcServer.start();
            Descriptors.MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
            TestProtos.EchoRequestProto param = TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
            InetSocketAddress address = rpcServer.getListenerAddress();
            if (address == null) {
                throw new IOException("Listener channel is closed");
            }
            client.call(null, md, (Message)param, null, User.getCurrent(), address);
            Assert.fail((String)"Expected an exception to have been thrown!");
        }
        catch (Exception e) {
            LOG.info((Object)("Caught expected exception: " + e.toString()));
            Assert.assertTrue((boolean)StringUtils.stringifyException((Throwable)e).contains("Injected fault"));
        }
        finally {
            client.close();
            rpcServer.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRpcScheduler() throws IOException, InterruptedException {
        RpcScheduler scheduler = (RpcScheduler)Mockito.spy((Object)new FifoRpcScheduler(CONF, 1));
        TestRpcServer rpcServer = new TestRpcServer(scheduler);
        ((RpcScheduler)Mockito.verify((Object)scheduler)).init((RpcScheduler.Context)Matchers.anyObject());
        AbstractRpcClient client = this.createRpcClient(CONF);
        try {
            rpcServer.start();
            ((RpcScheduler)Mockito.verify((Object)scheduler)).start();
            Descriptors.MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
            TestProtos.EchoRequestProto param = TestProtos.EchoRequestProto.newBuilder().setMessage("hello").build();
            InetSocketAddress address = rpcServer.getListenerAddress();
            if (address == null) {
                throw new IOException("Listener channel is closed");
            }
            for (int i = 0; i < 10; ++i) {
                client.call(new PayloadCarryingRpcController(CellUtil.createCellScanner((Iterable)ImmutableList.of((Object)CELL))), md, (Message)param, (Message)md.getOutputType().toProto(), User.getCurrent(), address);
            }
            ((RpcScheduler)Mockito.verify((Object)scheduler, (VerificationMode)VerificationModeFactory.times((int)10))).dispatch((CallRunner)Matchers.anyObject());
        }
        finally {
            rpcServer.stop();
            ((RpcScheduler)Mockito.verify((Object)scheduler)).stop();
        }
    }

    @Test
    public void testWrapException() throws Exception {
        AbstractRpcClient client = (AbstractRpcClient)RpcClientFactory.createClient((Configuration)CONF, (String)"AbstractTestIPC");
        InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
        Assert.assertTrue((boolean)(client.wrapException(address, (Exception)new ConnectException()) instanceof ConnectException));
        Assert.assertTrue((boolean)(client.wrapException(address, (Exception)new SocketTimeoutException()) instanceof SocketTimeoutException));
        Assert.assertTrue((boolean)(client.wrapException(address, (Exception)new ConnectionClosingException("Test AbstractRpcClient#wrapException")) instanceof ConnectionClosingException));
        Assert.assertTrue((boolean)(client.wrapException(address, (Exception)new CallTimeoutException("Test AbstractRpcClient#wrapException")).getCause() instanceof CallTimeoutException));
    }

    static class TestRpcServer
    extends RpcServer {
        TestRpcServer() throws IOException {
            this((RpcScheduler)new FifoRpcScheduler(CONF, 1));
        }

        TestRpcServer(RpcScheduler scheduler) throws IOException {
            super(null, "testRpcServer", (List)Lists.newArrayList((Object[])new RpcServer.BlockingServiceAndInterface[]{new RpcServer.BlockingServiceAndInterface(SERVICE, null)}), new InetSocketAddress("localhost", 0), CONF, scheduler);
        }

        public Pair<Message, CellScanner> call(BlockingService service, Descriptors.MethodDescriptor md, Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status) throws IOException {
            return super.call(service, md, param, cellScanner, receiveTime, status);
        }
    }
}

