/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.hadoop.fs;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamWriter;
import org.springframework.core.serializer.Serializer;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

public class HdfsItemWriter<T>
implements ItemStreamWriter<T> {
    private static final String BUFFER_KEY_PREFIX = HdfsItemWriter.class.getName() + ".BUFFER_KEY";
    private final String bufferKey;
    private String fileName;
    private FileSystem fileSystem;
    private FSDataOutputStream fsDataOutputStream;
    private Serializer<T> itemSerializer;

    public HdfsItemWriter(FileSystem fileSystem, Serializer<T> itemSerializer, String fileName) {
        Assert.notNull((Object)fileSystem, (String)"Hadoop FileSystem is required.");
        Assert.notNull(itemSerializer, (String)"A Serializer implementation is required");
        Assert.isTrue((boolean)StringUtils.hasText((String)fileName), (String)"A non-empty fileName is required.");
        this.fileSystem = fileSystem;
        this.bufferKey = BUFFER_KEY_PREFIX + "." + this.hashCode();
        this.itemSerializer = itemSerializer;
        this.fileName = fileName;
    }

    private List<? extends T> getCurrentBuffer() {
        if (!TransactionSynchronizationManager.hasResource((Object)this.bufferKey)) {
            TransactionSynchronizationManager.bindResource((Object)this.bufferKey, new ArrayList());
            TransactionSynchronizationManager.registerSynchronization((TransactionSynchronization)new TransactionSynchronizationAdapter(){

                public void beforeCommit(boolean readOnly) {
                    List items = (List)TransactionSynchronizationManager.getResource((Object)HdfsItemWriter.this.bufferKey);
                    if (!CollectionUtils.isEmpty((Collection)items) && !readOnly) {
                        HdfsItemWriter.this.doWrite(items);
                    }
                }

                public void afterCompletion(int status) {
                    if (TransactionSynchronizationManager.hasResource((Object)HdfsItemWriter.this.bufferKey)) {
                        TransactionSynchronizationManager.unbindResource((Object)HdfsItemWriter.this.bufferKey);
                    }
                }
            });
        }
        return (List)TransactionSynchronizationManager.getResource((Object)this.bufferKey);
    }

    protected void doWrite(List<? extends T> items) {
        if (!CollectionUtils.isEmpty(items)) {
            try {
                this.fsDataOutputStream.write(this.getPayloadAsBytes(items));
            }
            catch (IOException ioe) {
                throw new RuntimeException("Error writing to HDFS", ioe);
            }
        }
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        try {
            Path name = null;
            name = new Path(this.fileName);
            this.fileSystem.createNewFile(name);
            this.fsDataOutputStream = this.fileSystem.create(name);
        }
        catch (IOException ioe) {
            throw new RuntimeException("Unable to open file to write to", ioe);
        }
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
    }

    public void write(List<? extends T> items) throws Exception {
        if (!this.transactionActive()) {
            this.doWrite(items);
            return;
        }
        List<T> bufferedItems = this.getCurrentBuffer();
        bufferedItems.addAll(items);
    }

    public void close() {
        if (this.fsDataOutputStream != null) {
            IOUtils.closeStream((Closeable)this.fsDataOutputStream);
        }
    }

    private byte[] getPayloadAsBytes(List<? extends T> items) throws IOException {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        for (T item : items) {
            this.itemSerializer.serialize(item, (OutputStream)stream);
        }
        return stream.toByteArray();
    }

    private boolean transactionActive() {
        return TransactionSynchronizationManager.isActualTransactionActive();
    }
}

