/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode;

import com.google.protobuf.BlockingService;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * Receives {@link JournalProtocol} calls and forwards them to a
 * {@link JournalListener}.
 *
 * <p>On {@link #start()} the service contacts the namenode at the configured
 * address: it requests the namenode's version information, registers itself,
 * and asks the namenode to roll its edit log. Incoming {@code journal} and
 * {@code startLogSegment} calls are accepted only when the caller's
 * registration ID matches the ID held in this service's own registration.
 *
 * <p>Lifecycle state is kept in a {@code volatile} field and every state
 * transition is performed while holding this object's monitor.
 */
public class JournalService
implements JournalProtocol {
    public static final Log LOG = LogFactory.getLog(JournalService.class.getName());

    /** Delay between two startup attempts, in milliseconds. */
    private static final long STARTUP_RETRY_INTERVAL_MS = 1000L;

    private final JournalListener listener;
    /** True when this service created its own RPC server and therefore starts/stops it. */
    private final boolean internalRpcServer;
    private final InetSocketAddress nnAddress;
    private final NamenodeRegistration registration;
    private final NamenodeProtocol namenode;
    private volatile State state = State.INIT;
    private RPC.Server rpcServer;

    /**
     * Creates a service that attaches its protocol to an RPC server supplied by
     * the caller. That server is neither started nor stopped by this service.
     *
     * @param conf configuration used for the namenode proxy and protocol registration
     * @param nnAddr address of the namenode to connect to
     * @param listener receiver of the journal callbacks
     * @param rpcServer existing RPC server to add the journal protocol to
     * @param reg registration presented to the namenode and used to verify callers
     * @throws IOException if the namenode proxy or protocol registration fails
     */
    JournalService(Configuration conf, InetSocketAddress nnAddr, JournalListener listener, RPC.Server rpcServer, NamenodeRegistration reg) throws IOException {
        this.nnAddress = nnAddr;
        this.listener = listener;
        this.registration = reg;
        this.internalRpcServer = false;
        this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr, NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true).getProxy();
        initRpcServer(conf, rpcServer);
    }

    /**
     * Creates a service that builds its own RPC server bound to
     * {@code serverAddress}. The server is started by {@link #start()} and
     * stopped by {@link #stop()}. The registration is built from the server's
     * listener address with the {@code BACKUP} role.
     *
     * @param conf configuration used for the namenode proxy and RPC server
     * @param nnAddr address of the namenode to connect to
     * @param serverAddress host and port the internal RPC server binds to
     * @param listener receiver of the journal callbacks
     * @throws IOException if the namenode proxy or RPC server cannot be created
     */
    JournalService(Configuration conf, InetSocketAddress nnAddr, InetSocketAddress serverAddress, JournalListener listener) throws IOException {
        this.nnAddress = nnAddr;
        this.listener = listener;
        this.internalRpcServer = true;
        this.namenode = NameNodeProxies.createNonHAProxy(conf, nnAddr, NamenodeProtocol.class, UserGroupInformation.getCurrentUser(), true).getProxy();
        initRpcServer(conf, serverAddress);
        // Use the address the server actually listens on rather than the requested one.
        String addr = NetUtils.getHostPortString(this.rpcServer.getListenerAddress());
        StorageInfo storage = new StorageInfo(LayoutVersion.getCurrentLayoutVersion(), 0, "", 0L);
        this.registration = new NamenodeRegistration(addr, "", storage, HdfsServerConstants.NamenodeRole.BACKUP);
    }

    /**
     * Starts the service. Does nothing (other than logging) unless the service
     * is in the {@code INIT} state.
     *
     * <p>Starts the internal RPC server if there is one, then repeats the
     * startup sequence (version handshake, registration, edit-log roll) once
     * per second until every step has succeeded or the state is changed by
     * another thread, e.g. through {@link #stop()}. Steps that already
     * succeeded are not repeated. The service becomes {@code RUNNING} only if
     * it is still {@code STARTING_UP} when the loop ends, so a concurrent
     * {@link #stop()} is never undone.
     */
    public void start() {
        synchronized (this) {
            if (state != State.INIT) {
                LOG.info("Service cannot be started in state - " + state);
                return;
            }
            state = State.STARTING_UP;
        }
        if (internalRpcServer) {
            LOG.info("Starting rpc server");
            rpcServer.start();
        }
        boolean handshakeComplete = false;
        boolean registered = false;
        while (state == State.STARTING_UP) {
            try {
                if (!handshakeComplete) {
                    handshake();
                    handshakeComplete = true;
                    LOG.info("handshake completed");
                }
                if (!registered) {
                    registerWithNamenode();
                    registered = true;
                    LOG.info("Registration completed");
                }
                namenode.rollEditLog();
                LOG.info("Editlog roll completed");
                // All three steps succeeded; startup is done.
                break;
            }
            catch (Exception e) {
                // Any failure (I/O or otherwise) is logged and the sequence is retried.
                LOG.warn("Encountered exception ", e);
            }
            try {
                Thread.sleep(STARTUP_RETRY_INTERVAL_MS);
            }
            catch (InterruptedException ie) {
                // NOTE(review): the interrupt is only logged and startup keeps
                // retrying; confirm whether callers expect interruption to
                // cancel startup.
                LOG.warn("Encountered exception ", ie);
            }
        }
        synchronized (this) {
            // The loop also ends when another thread changed the state (for
            // example stop() set STOPPED); do not overwrite that state.
            if (state == State.STARTING_UP) {
                state = State.RUNNING;
            }
        }
    }

    /**
     * Stops the service. The first call marks the service {@code STOPPED} and,
     * when the RPC server was created by this service, stops and releases it.
     * Later calls return immediately.
     */
    public void stop() {
        synchronized (this) {
            if (state == State.STOPPED) {
                return;
            }
            state = State.STOPPED;
        }
        if (internalRpcServer && rpcServer != null) {
            rpcServer.stop();
            rpcServer = null;
        }
    }

    /**
     * Verifies the caller's registration and forwards the records to the listener.
     *
     * @param registration registration of the calling node
     * @param firstTxnId value passed through to the listener as the first transaction id
     * @param numTxns value passed through to the listener as the transaction count
     * @param records raw record bytes passed through to the listener
     * @throws IOException if the registration ID does not match or the listener fails
     */
    @Override
    public void journal(NamenodeRegistration registration, long firstTxnId, int numTxns, byte[] records) throws IOException {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Received journal " + firstTxnId + " " + numTxns);
        }
        verify(registration);
        listener.journal(this, firstTxnId, numTxns, records);
    }

    /**
     * Verifies the caller's registration and asks the listener to roll its logs.
     *
     * @param registration registration of the calling node
     * @param txid transaction id passed through to the listener
     * @throws IOException if the registration ID does not match or the listener fails
     */
    @Override
    public void startLogSegment(NamenodeRegistration registration, long txid) throws IOException {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Received startLogSegment " + txid);
        }
        verify(registration);
        listener.rollLogs(this, txid);
    }

    /**
     * Creates an RPC server of its own, bound to {@code serverAddress}, that
     * serves the journal protocol backed by this object.
     */
    private void initRpcServer(Configuration conf, InetSocketAddress serverAddress) throws IOException {
        RPC.setProtocolEngine(conf, JournalProtocolPB.class, ProtobufRpcEngine.class);
        JournalProtocolServerSideTranslatorPB xlator = new JournalProtocolServerSideTranslatorPB(this);
        BlockingService service = JournalProtocolProtos.JournalProtocolService.newReflectiveBlockingService(xlator);
        this.rpcServer = RPC.getServer(JournalProtocolPB.class, service, serverAddress.getHostName(), serverAddress.getPort(), 1, false, conf, null);
    }

    /** Adds the journal protocol, backed by this object, to an existing RPC server. */
    private void initRpcServer(Configuration conf, RPC.Server server) throws IOException {
        this.rpcServer = server;
        JournalProtocolServerSideTranslatorPB xlator = new JournalProtocolServerSideTranslatorPB(this);
        BlockingService service = JournalProtocolProtos.JournalProtocolService.newReflectiveBlockingService(xlator);
        DFSUtil.addPBProtocol(conf, JournalProtocolPB.class, service, this.rpcServer);
    }

    /**
     * Rejects callers whose registration ID differs from this service's own.
     *
     * @throws UnregisteredNodeException if the IDs differ
     */
    private void verify(NamenodeRegistration reg) throws IOException {
        String expectedId = registration.getRegistrationID();
        if (!expectedId.equals(reg.getRegistrationID())) {
            LOG.warn("Invalid registrationID - expected: " + registration.getRegistrationID() + " received: " + reg.getRegistrationID());
            throw new UnregisteredNodeException(reg);
        }
    }

    /**
     * Registers this service with the namenode.
     *
     * @throws IOException if the namenode returns no registration or the
     *         returned registration does not have the {@code NAMENODE} role
     */
    private void registerWithNamenode() throws IOException {
        NamenodeRegistration nnReg = namenode.register(registration);
        String msg = null;
        if (nnReg == null) {
            msg = "Registration rejected by " + nnAddress;
        } else if (!nnReg.isRole(HdfsServerConstants.NamenodeRole.NAMENODE)) {
            msg = " Name-node " + nnAddress + " is not active";
        }
        if (msg != null) {
            LOG.error(msg);
            throw new IOException(msg);
        }
    }

    /**
     * Fetches the namenode's namespace info, lets the listener verify it, and
     * copies it into this service's registration as its storage info.
     */
    private void handshake() throws IOException {
        NamespaceInfo nsInfo = namenode.versionRequest();
        listener.verifyVersion(this, nsInfo);
        registration.setStorageInfo(nsInfo);
    }

    /** Callbacks through which a {@link JournalService} reports what it receives. */
    public interface JournalListener {
        /**
         * Called during the startup handshake with the namespace info
         * returned by the namenode.
         */
        void verifyVersion(JournalService service, NamespaceInfo info);

        /** Called for every verified {@code journal} call with its arguments. */
        void journal(JournalService service, long firstTxnId, int numTxns, byte[] records) throws IOException;

        /** Called for every verified {@code startLogSegment} call with its transaction id. */
        void rollLogs(JournalService service, long txid) throws IOException;
    }

    /** Lifecycle states of the service. */
    enum State {
        INIT,
        STARTING_UP,
        RUNNING,
        STOPPED
    }
}

