/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.BackupJournalManager;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.INodeSymlink;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.security.token.delegation.DelegationKey;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class FSEditLog {
    static final String NO_JOURNAL_STREAMS_WARNING = "!!! WARNING !!! File system changes are not persistent. No journal streams.";
    static final Log LOG = LogFactory.getLog(FSEditLog.class);
    private State state = State.UNINITIALIZED;
    private List<JournalAndStream> journals = Lists.newArrayList();
    private long txid = 0L;
    private long synctxid = 0L;
    private long curSegmentTxId = -12345L;
    private long lastPrintTime;
    private volatile boolean isSyncRunning = false;
    private volatile boolean isAutoSyncScheduled = false;
    private Runtime runtime = Runtime.getRuntime();
    private long numTransactions;
    private long numTransactionsBatchedInSync;
    private long totalTimeTransactions;
    private NameNodeMetrics metrics;
    private NNStorage storage;
    private static final ThreadLocal<TransactionId> myTransactionId = new ThreadLocal<TransactionId>(){

        @Override
        protected synchronized TransactionId initialValue() {
            return new TransactionId(Long.MAX_VALUE);
        }
    };

    FSEditLog(NNStorage storage) {
        this.storage = storage;
        this.metrics = NameNode.getNameNodeMetrics();
        this.lastPrintTime = Util.now();
        for (Storage.StorageDirectory sd : storage.dirIterable(NNStorage.NameNodeDirType.EDITS)) {
            this.journals.add(new JournalAndStream(new FileJournalManager(sd)));
        }
        if (this.journals.isEmpty()) {
            LOG.error((Object)"No edits directories configured!");
        }
        this.state = State.BETWEEN_LOG_SEGMENTS;
    }

    synchronized void open() throws IOException {
        Preconditions.checkState((this.state == State.BETWEEN_LOG_SEGMENTS ? 1 : 0) != 0);
        this.startLogSegment(this.getLastWrittenTxId() + 1L, true);
        assert (this.state == State.IN_SEGMENT) : "Bad state: " + (Object)((Object)this.state);
    }

    synchronized boolean isOpen() {
        return this.state == State.IN_SEGMENT;
    }

    synchronized void close() {
        if (this.state == State.CLOSED) {
            LOG.warn((Object)"Closing log when already closed", (Throwable)new Exception());
            return;
        }
        if (this.state == State.IN_SEGMENT) {
            assert (!this.journals.isEmpty());
            this.waitForSyncToFinish();
            this.endCurrentLogSegment(true);
        }
        this.state = State.CLOSED;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void logEdit(final FSEditLogOp op) {
        FSEditLog fSEditLog = this;
        synchronized (fSEditLog) {
            assert (this.state != State.CLOSED);
            this.waitIfAutoSyncScheduled();
            if (this.journals.isEmpty()) {
                throw new IllegalStateException(NO_JOURNAL_STREAMS_WARNING);
            }
            long start = this.beginTransaction();
            op.setTransactionId(this.txid);
            this.mapJournalsAndReportErrors(new JournalClosure(){

                @Override
                public void apply(JournalAndStream jas) throws IOException {
                    if (!jas.isActive()) {
                        return;
                    }
                    jas.stream.write(op);
                }
            }, "logging edit");
            this.endTransaction(start);
            if (!this.shouldForceSync()) {
                return;
            }
            this.isAutoSyncScheduled = true;
        }
        this.logSync();
    }

    synchronized void waitIfAutoSyncScheduled() {
        try {
            while (this.isAutoSyncScheduled) {
                this.wait(1000L);
            }
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    synchronized void doneWithAutoSyncScheduling() {
        if (this.isAutoSyncScheduled) {
            this.isAutoSyncScheduled = false;
            this.notifyAll();
        }
    }

    private boolean shouldForceSync() {
        for (JournalAndStream jas : this.journals) {
            if (!jas.isActive() || !jas.getCurrentStream().shouldForceSync()) continue;
            return true;
        }
        return false;
    }

    private long beginTransaction() {
        assert (Thread.holdsLock(this));
        ++this.txid;
        TransactionId id = myTransactionId.get();
        id.txid = this.txid;
        return Util.now();
    }

    private void endTransaction(long start) {
        assert (Thread.holdsLock(this));
        long end = Util.now();
        ++this.numTransactions;
        this.totalTimeTransactions += end - start;
        if (this.metrics != null) {
            this.metrics.addTransaction(end - start);
        }
    }

    synchronized long getLastWrittenTxId() {
        return this.txid;
    }

    synchronized long getCurSegmentTxId() {
        Preconditions.checkState((this.state == State.IN_SEGMENT ? 1 : 0) != 0, (String)"Bad state: %s", (Object[])new Object[]{this.state});
        return this.curSegmentTxId;
    }

    synchronized void setNextTxId(long nextTxId) {
        Preconditions.checkArgument((this.synctxid <= this.txid && nextTxId >= this.txid ? 1 : 0) != 0, (String)"May not decrease txid. synctxid=%s txid=%s nextTxId=%s", (Object[])new Object[]{this.synctxid, this.txid, nextTxId});
        this.txid = nextTxId - 1L;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void logSyncAll() throws IOException {
        FSEditLog fSEditLog = this;
        synchronized (fSEditLog) {
            TransactionId id = myTransactionId.get();
            id.txid = this.txid;
        }
        this.logSync();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void logSync() {
        long syncStart = 0L;
        long mytxid = FSEditLog.myTransactionId.get().txid;
        ArrayList candidateJournals = Lists.newArrayListWithCapacity((int)this.journals.size());
        ArrayList badJournals = Lists.newArrayList();
        boolean sync = false;
        try {
            FSEditLog fSEditLog = this;
            synchronized (fSEditLog) {
                try {
                    this.printStatistics(false);
                    while (mytxid > this.synctxid && this.isSyncRunning) {
                        try {
                            this.wait(1000L);
                        }
                        catch (InterruptedException ie) {}
                    }
                    if (mytxid <= this.synctxid) {
                        ++this.numTransactionsBatchedInSync;
                        if (this.metrics == null) return;
                        this.metrics.incrTransactionsBatchedInSync();
                        return;
                    }
                    syncStart = this.txid;
                    this.isSyncRunning = true;
                    sync = true;
                    assert (!this.journals.isEmpty()) : "no editlog streams";
                    for (JournalAndStream jas : this.journals) {
                        if (!jas.isActive()) continue;
                        try {
                            jas.getCurrentStream().setReadyToFlush();
                            candidateJournals.add(jas);
                        }
                        catch (IOException ie) {
                            LOG.error((Object)"Unable to get ready to flush.", (Throwable)ie);
                            badJournals.add(jas);
                        }
                    }
                }
                finally {
                    this.doneWithAutoSyncScheduling();
                }
            }
            long start = Util.now();
            for (JournalAndStream jas : candidateJournals) {
                if (!jas.isActive()) continue;
                try {
                    jas.getCurrentStream().flush();
                }
                catch (IOException ie) {
                    LOG.error((Object)"Unable to sync edit log.", (Throwable)ie);
                    badJournals.add(jas);
                }
            }
            long elapsed = Util.now() - start;
            this.disableAndReportErrorOnJournals(badJournals);
            if (this.metrics == null) return;
            this.metrics.addSync(elapsed);
            return;
        }
        finally {
            FSEditLog ie = this;
            synchronized (ie) {
                if (sync) {
                    if (badJournals.size() >= this.journals.size()) {
                        LOG.fatal((Object)("Could not sync any journal to persistent storage. Unsynced transactions: " + (this.txid - this.synctxid)), (Throwable)new Exception());
                        this.runtime.exit(1);
                    }
                    this.synctxid = syncStart;
                    this.isSyncRunning = false;
                }
                this.notifyAll();
            }
        }
    }

    private void printStatistics(boolean force) {
        long now = Util.now();
        if (this.lastPrintTime + 60000L > now && !force) {
            return;
        }
        if (this.journals.isEmpty()) {
            return;
        }
        this.lastPrintTime = now;
        StringBuilder buf = new StringBuilder();
        buf.append("Number of transactions: ");
        buf.append(this.numTransactions);
        buf.append(" Total time for transactions(ms): ");
        buf.append(this.totalTimeTransactions);
        buf.append("Number of transactions batched in Syncs: ");
        buf.append(this.numTransactionsBatchedInSync);
        buf.append(" Number of syncs: ");
        for (JournalAndStream jas : this.journals) {
            if (!jas.isActive()) continue;
            buf.append(jas.getCurrentStream().getNumSync());
            break;
        }
        buf.append(" SyncTimes(ms): ");
        for (JournalAndStream jas : this.journals) {
            if (!jas.isActive()) continue;
            EditLogOutputStream eStream = jas.getCurrentStream();
            buf.append(eStream.getTotalSyncTime());
            buf.append(" ");
        }
        LOG.info((Object)buf);
    }

    public void logOpenFile(String path, INodeFileUnderConstruction newNode) {
        FSEditLogOp.AddOp op = (FSEditLogOp.AddOp)((FSEditLogOp.AddCloseOp)((FSEditLogOp.AddCloseOp)((FSEditLogOp.AddCloseOp)((FSEditLogOp.AddCloseOp)((FSEditLogOp.AddCloseOp)((FSEditLogOp.AddCloseOp)((FSEditLogOp.AddCloseOp)((FSEditLogOp.AddCloseOp)FSEditLogOp.AddOp.getInstance().setPath(path)).setReplication(newNode.getReplication())).setModificationTime(newNode.getModificationTime())).setAccessTime(newNode.getAccessTime())).setBlockSize(newNode.getPreferredBlockSize())).setBlocks(newNode.getBlocks())).setPermissionStatus(newNode.getPermissionStatus())).setClientName(newNode.getClientName())).setClientMachine(newNode.getClientMachine());
        this.logEdit(op);
    }

    public void logCloseFile(String path, INodeFile newNode) {
        FSEditLogOp.CloseOp op = (FSEditLogOp.CloseOp)((FSEditLogOp.AddCloseOp)((FSEditLogOp.AddCloseOp)((FSEditLogOp.AddCloseOp)((FSEditLogOp.AddCloseOp)((FSEditLogOp.AddCloseOp)((FSEditLogOp.AddCloseOp)FSEditLogOp.CloseOp.getInstance().setPath(path)).setReplication(newNode.getReplication())).setModificationTime(newNode.getModificationTime())).setAccessTime(newNode.getAccessTime())).setBlockSize(newNode.getPreferredBlockSize())).setBlocks(newNode.getBlocks())).setPermissionStatus(newNode.getPermissionStatus());
        this.logEdit(op);
    }

    public void logMkDir(String path, INode newNode) {
        FSEditLogOp.MkdirOp op = FSEditLogOp.MkdirOp.getInstance().setPath(path).setTimestamp(newNode.getModificationTime()).setPermissionStatus(newNode.getPermissionStatus());
        this.logEdit(op);
    }

    void logRename(String src, String dst, long timestamp) {
        FSEditLogOp.RenameOldOp op = FSEditLogOp.RenameOldOp.getInstance().setSource(src).setDestination(dst).setTimestamp(timestamp);
        this.logEdit(op);
    }

    void logRename(String src, String dst, long timestamp, Options.Rename ... options) {
        FSEditLogOp.RenameOp op = FSEditLogOp.RenameOp.getInstance().setSource(src).setDestination(dst).setTimestamp(timestamp).setOptions(options);
        this.logEdit(op);
    }

    void logSetReplication(String src, short replication) {
        FSEditLogOp.SetReplicationOp op = FSEditLogOp.SetReplicationOp.getInstance().setPath(src).setReplication(replication);
        this.logEdit(op);
    }

    void logSetQuota(String src, long nsQuota, long dsQuota) {
        FSEditLogOp.SetQuotaOp op = FSEditLogOp.SetQuotaOp.getInstance().setSource(src).setNSQuota(nsQuota).setDSQuota(dsQuota);
        this.logEdit(op);
    }

    void logSetPermissions(String src, FsPermission permissions) {
        FSEditLogOp.SetPermissionsOp op = FSEditLogOp.SetPermissionsOp.getInstance().setSource(src).setPermissions(permissions);
        this.logEdit(op);
    }

    void logSetOwner(String src, String username, String groupname) {
        FSEditLogOp.SetOwnerOp op = FSEditLogOp.SetOwnerOp.getInstance().setSource(src).setUser(username).setGroup(groupname);
        this.logEdit(op);
    }

    void logConcat(String trg, String[] srcs, long timestamp) {
        FSEditLogOp.ConcatDeleteOp op = FSEditLogOp.ConcatDeleteOp.getInstance().setTarget(trg).setSources(srcs).setTimestamp(timestamp);
        this.logEdit(op);
    }

    void logDelete(String src, long timestamp) {
        FSEditLogOp.DeleteOp op = FSEditLogOp.DeleteOp.getInstance().setPath(src).setTimestamp(timestamp);
        this.logEdit(op);
    }

    void logGenerationStamp(long genstamp) {
        FSEditLogOp.SetGenstampOp op = FSEditLogOp.SetGenstampOp.getInstance().setGenerationStamp(genstamp);
        this.logEdit(op);
    }

    void logTimes(String src, long mtime, long atime) {
        FSEditLogOp.TimesOp op = FSEditLogOp.TimesOp.getInstance().setPath(src).setModificationTime(mtime).setAccessTime(atime);
        this.logEdit(op);
    }

    void logSymlink(String path, String value, long mtime, long atime, INodeSymlink node) {
        FSEditLogOp.SymlinkOp op = FSEditLogOp.SymlinkOp.getInstance().setPath(path).setValue(value).setModificationTime(mtime).setAccessTime(atime).setPermissionStatus(node.getPermissionStatus());
        this.logEdit(op);
    }

    void logGetDelegationToken(DelegationTokenIdentifier id, long expiryTime) {
        FSEditLogOp.GetDelegationTokenOp op = FSEditLogOp.GetDelegationTokenOp.getInstance().setDelegationTokenIdentifier(id).setExpiryTime(expiryTime);
        this.logEdit(op);
    }

    void logRenewDelegationToken(DelegationTokenIdentifier id, long expiryTime) {
        FSEditLogOp.RenewDelegationTokenOp op = FSEditLogOp.RenewDelegationTokenOp.getInstance().setDelegationTokenIdentifier(id).setExpiryTime(expiryTime);
        this.logEdit(op);
    }

    void logCancelDelegationToken(DelegationTokenIdentifier id) {
        FSEditLogOp.CancelDelegationTokenOp op = FSEditLogOp.CancelDelegationTokenOp.getInstance().setDelegationTokenIdentifier(id);
        this.logEdit(op);
    }

    void logUpdateMasterKey(DelegationKey key) {
        FSEditLogOp.UpdateMasterKeyOp op = FSEditLogOp.UpdateMasterKeyOp.getInstance().setDelegationKey(key);
        this.logEdit(op);
    }

    void logReassignLease(String leaseHolder, String src, String newHolder) {
        FSEditLogOp.ReassignLeaseOp op = FSEditLogOp.ReassignLeaseOp.getInstance().setLeaseHolder(leaseHolder).setPath(src).setNewHolder(newHolder);
        this.logEdit(op);
    }

    private int countActiveJournals() {
        int count = 0;
        for (JournalAndStream jas : this.journals) {
            if (!jas.isActive()) continue;
            ++count;
        }
        return count;
    }

    @VisibleForTesting
    List<JournalAndStream> getJournals() {
        return this.journals;
    }

    @VisibleForTesting
    synchronized void setRuntimeForTesting(Runtime runtime) {
        this.runtime = runtime;
    }

    public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId) throws IOException {
        ArrayList allLogs = Lists.newArrayList();
        for (JournalAndStream j : this.journals) {
            if (!(j.getManager() instanceof FileJournalManager)) continue;
            FileJournalManager fjm = (FileJournalManager)j.getManager();
            try {
                allLogs.addAll(fjm.getRemoteEditLogs(fromTxId));
            }
            catch (Throwable t) {
                LOG.warn((Object)("Cannot list edit logs in " + fjm), t);
            }
        }
        ImmutableListMultimap logsByStartTxId = Multimaps.index((Iterable)allLogs, RemoteEditLog.GET_START_TXID);
        long curStartTxId = fromTxId;
        ArrayList logs = Lists.newArrayList();
        while (true) {
            ImmutableList logGroup;
            if ((logGroup = logsByStartTxId.get((Object)curStartTxId)).isEmpty()) {
                SortedSet<Object> startTxIds = Sets.newTreeSet((Iterable)logsByStartTxId.keySet());
                if ((startTxIds = startTxIds.tailSet(curStartTxId)).isEmpty()) break;
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Found gap in logs at " + curStartTxId + ": " + "not returning previous logs in manifest."));
                }
                logs.clear();
                curStartTxId = (Long)startTxIds.first();
                continue;
            }
            RemoteEditLog bestLog = (RemoteEditLog)Collections.max(logGroup);
            logs.add(bestLog);
            curStartTxId = bestLog.getEndTxId() + 1L;
        }
        RemoteEditLogManifest ret = new RemoteEditLogManifest(logs);
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Generated manifest for logs since " + fromTxId + ":" + ret));
        }
        return ret;
    }

    synchronized long rollEditLog() throws IOException {
        LOG.info((Object)"Rolling edit logs.");
        this.endCurrentLogSegment(true);
        long nextTxId = this.getLastWrittenTxId() + 1L;
        this.startLogSegment(nextTxId, true);
        assert (this.curSegmentTxId == nextTxId);
        return nextTxId;
    }

    synchronized void startLogSegment(final long segmentTxId, boolean writeHeaderTxn) throws IOException {
        LOG.info((Object)("Starting log segment at " + segmentTxId));
        Preconditions.checkArgument((segmentTxId > 0L ? 1 : 0) != 0, (String)"Bad txid: %s", (Object[])new Object[]{segmentTxId});
        Preconditions.checkState((this.state == State.BETWEEN_LOG_SEGMENTS ? 1 : 0) != 0, (String)"Bad state: %s", (Object[])new Object[]{this.state});
        Preconditions.checkState((segmentTxId > this.curSegmentTxId ? 1 : 0) != 0, (Object)("Cannot start writing to log segment " + segmentTxId + " when previous log segment started at " + this.curSegmentTxId));
        Preconditions.checkArgument((segmentTxId == this.txid + 1L ? 1 : 0) != 0, (String)"Cannot start log segment at txid %s when next expected txid is %s", (Object[])new Object[]{segmentTxId, this.txid + 1L});
        this.numTransactionsBatchedInSync = 0L;
        this.totalTimeTransactions = 0L;
        this.numTransactions = 0L;
        this.storage.attemptRestoreRemovedStorage();
        this.mapJournalsAndReportErrors(new JournalClosure(){

            @Override
            public void apply(JournalAndStream jas) throws IOException {
                jas.startLogSegment(segmentTxId);
            }
        }, "starting log segment " + segmentTxId);
        if (this.countActiveJournals() == 0) {
            throw new IOException("Unable to start log segment " + segmentTxId + ": no journals successfully started.");
        }
        this.curSegmentTxId = segmentTxId;
        this.state = State.IN_SEGMENT;
        if (writeHeaderTxn) {
            this.logEdit(FSEditLogOp.LogSegmentOp.getInstance(FSEditLogOpCodes.OP_START_LOG_SEGMENT));
            this.logSync();
        }
    }

    synchronized void endCurrentLogSegment(boolean writeEndTxn) {
        LOG.info((Object)("Ending log segment " + this.curSegmentTxId));
        Preconditions.checkState((this.state == State.IN_SEGMENT ? 1 : 0) != 0, (String)"Bad state: %s", (Object[])new Object[]{this.state});
        if (writeEndTxn) {
            this.logEdit(FSEditLogOp.LogSegmentOp.getInstance(FSEditLogOpCodes.OP_END_LOG_SEGMENT));
            this.logSync();
        }
        this.printStatistics(true);
        final long lastTxId = this.getLastWrittenTxId();
        this.mapJournalsAndReportErrors(new JournalClosure(){

            @Override
            public void apply(JournalAndStream jas) throws IOException {
                if (jas.isActive()) {
                    jas.close(lastTxId);
                }
            }
        }, "ending log segment");
        this.state = State.BETWEEN_LOG_SEGMENTS;
    }

    synchronized void abortCurrentLogSegment() {
        this.mapJournalsAndReportErrors(new JournalClosure(){

            @Override
            public void apply(JournalAndStream jas) throws IOException {
                jas.abort();
            }
        }, "aborting all streams");
        this.state = State.BETWEEN_LOG_SEGMENTS;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void purgeLogsOlderThan(final long minTxIdToKeep) {
        FSEditLog fSEditLog = this;
        synchronized (fSEditLog) {
            assert (this.curSegmentTxId == -12345L || minTxIdToKeep <= this.curSegmentTxId) : "cannot purge logs older than txid " + minTxIdToKeep + " when current segment starts at " + this.curSegmentTxId;
        }
        this.mapJournalsAndReportErrors(new JournalClosure(){

            @Override
            public void apply(JournalAndStream jas) throws IOException {
                jas.manager.purgeLogsOlderThan(minTxIdToKeep);
            }
        }, "purging logs older than " + minTxIdToKeep);
    }

    synchronized void waitForSyncToFinish() {
        while (this.isSyncRunning) {
            try {
                this.wait(1000L);
            }
            catch (InterruptedException interruptedException) {}
        }
    }

    synchronized long getSyncTxId() {
        return this.synctxid;
    }

    public void setOutputBufferCapacity(int size) {
        for (JournalAndStream jas : this.journals) {
            jas.manager.setOutputBufferCapacity(size);
        }
    }

    synchronized void registerBackupNode(NamenodeRegistration bnReg, NamenodeRegistration nnReg) throws IOException {
        if (bnReg.isRole(HdfsServerConstants.NamenodeRole.CHECKPOINT)) {
            return;
        }
        JournalAndStream jas = this.findBackupJournalAndStream(bnReg);
        if (jas != null) {
            LOG.info((Object)("Backup node " + bnReg + " re-registers"));
            return;
        }
        LOG.info((Object)("Registering new backup node: " + bnReg));
        BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
        this.journals.add(new JournalAndStream(bjm));
    }

    synchronized void releaseBackupStream(NamenodeRegistration registration) {
        Iterator<JournalAndStream> iter = this.journals.iterator();
        while (iter.hasNext()) {
            JournalAndStream jas = iter.next();
            if (!(jas.manager instanceof BackupJournalManager) || !((BackupJournalManager)jas.manager).matchesRegistration(registration)) continue;
            jas.abort();
            LOG.info((Object)("Removing backup journal " + jas));
            iter.remove();
        }
    }

    private synchronized JournalAndStream findBackupJournalAndStream(NamenodeRegistration bnReg) {
        for (JournalAndStream jas : this.journals) {
            BackupJournalManager bjm;
            if (!(jas.manager instanceof BackupJournalManager) || !(bjm = (BackupJournalManager)jas.manager).matchesRegistration(bnReg)) continue;
            return jas;
        }
        return null;
    }

    synchronized void logEdit(final int length, final byte[] data) {
        long start = this.beginTransaction();
        this.mapJournalsAndReportErrors(new JournalClosure(){

            @Override
            public void apply(JournalAndStream jas) throws IOException {
                if (jas.isActive()) {
                    jas.getCurrentStream().writeRaw(data, 0, length);
                }
            }
        }, "Logging edit");
        this.endTransaction(start);
    }

    private void mapJournalsAndReportErrors(JournalClosure closure, String status) {
        LinkedList badJAS = Lists.newLinkedList();
        for (JournalAndStream jas : this.journals) {
            try {
                closure.apply(jas);
            }
            catch (Throwable t) {
                LOG.error((Object)("Error " + status + " (journal " + jas + ")"), t);
                badJAS.add(jas);
            }
        }
        this.disableAndReportErrorOnJournals(badJAS);
    }

    private void disableAndReportErrorOnJournals(List<JournalAndStream> badJournals) {
        if (badJournals == null || badJournals.isEmpty()) {
            return;
        }
        for (JournalAndStream j : badJournals) {
            LOG.error((Object)("Disabling journal " + j));
            j.abort();
        }
    }

    synchronized EditLogInputStream getInProgressFileInputStream() throws IOException {
        for (JournalAndStream jas : this.journals) {
            if (!jas.isActive()) continue;
            try {
                EditLogInputStream in = jas.getInProgressInputStream();
                if (in == null) continue;
                return in;
            }
            catch (IOException ioe) {
                LOG.warn((Object)("Unable to get the in-progress input stream from " + jas), (Throwable)ioe);
            }
        }
        throw new IOException("No in-progress stream provided edits");
    }

    static class JournalAndStream {
        private final JournalManager manager;
        private EditLogOutputStream stream;
        private long segmentStartsAtTxId = -12345L;

        private JournalAndStream(JournalManager manager) {
            this.manager = manager;
        }

        private void startLogSegment(long txId) throws IOException {
            Preconditions.checkState((this.stream == null ? 1 : 0) != 0);
            this.stream = this.manager.startLogSegment(txId);
            this.segmentStartsAtTxId = txId;
        }

        private void close(long lastTxId) throws IOException {
            Preconditions.checkArgument((lastTxId >= this.segmentStartsAtTxId ? 1 : 0) != 0, (String)"invalid segment: lastTxId %s >= segment starting txid %s", (Object[])new Object[]{lastTxId, this.segmentStartsAtTxId});
            if (this.stream == null) {
                return;
            }
            this.stream.close();
            this.manager.finalizeLogSegment(this.segmentStartsAtTxId, lastTxId);
            this.stream = null;
        }

        @VisibleForTesting
        void abort() {
            if (this.stream == null) {
                return;
            }
            try {
                this.stream.abort();
            }
            catch (IOException ioe) {
                LOG.error((Object)("Unable to abort stream " + this.stream), (Throwable)ioe);
            }
            this.stream = null;
            this.segmentStartsAtTxId = -12345L;
        }

        private boolean isActive() {
            return this.stream != null;
        }

        @VisibleForTesting
        EditLogOutputStream getCurrentStream() {
            return this.stream;
        }

        public String toString() {
            return "JournalAndStream(mgr=" + this.manager + ", " + "stream=" + this.stream + ")";
        }

        @VisibleForTesting
        void setCurrentStreamForTests(EditLogOutputStream stream) {
            this.stream = stream;
        }

        @VisibleForTesting
        JournalManager getManager() {
            return this.manager;
        }

        private EditLogInputStream getInProgressInputStream() throws IOException {
            return this.manager.getInProgressInputStream(this.segmentStartsAtTxId);
        }
    }

    private static interface JournalClosure {
        public void apply(JournalAndStream var1) throws IOException;
    }

    private static class TransactionId {
        public long txid;

        TransactionId(long value) {
            this.txid = value;
        }
    }

    private static enum State {
        UNINITIALIZED,
        BETWEEN_LOG_SEGMENTS,
        IN_SEGMENT,
        CLOSED;

    }
}

