/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.kafka.core;

import com.gs.collections.api.block.function.Function;
import com.gs.collections.api.block.predicate.Predicate;
import com.gs.collections.api.partition.PartitionIterable;
import com.gs.collections.impl.block.factory.Functions;
import com.gs.collections.impl.map.mutable.UnifiedMap;
import com.gs.collections.impl.utility.Iterate;
import com.gs.collections.impl.utility.ListIterate;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import kafka.client.ClientUtils$;
import kafka.common.ErrorMapping;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.kafka.core.BrokerAddress;
import org.springframework.integration.kafka.core.Configuration;
import org.springframework.integration.kafka.core.Connection;
import org.springframework.integration.kafka.core.ConnectionFactory;
import org.springframework.integration.kafka.core.DefaultConnection;
import org.springframework.integration.kafka.core.MetadataCache;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.core.PartitionNotFoundException;
import org.springframework.integration.kafka.core.TopicNotFoundException;
import org.springframework.util.Assert;
import scala.collection.JavaConversions;
import scala.collection.Set;

/**
 * Default {@link ConnectionFactory} implementation that caches one {@link Connection}
 * per {@link BrokerAddress} and keeps a {@link MetadataCache} of topic metadata that is
 * refreshed on demand.
 *
 * <p>Access to the connection cache and to metadata refreshes is guarded by a single
 * {@link ReentrantReadWriteLock}: lookups take the read lock, while cache population,
 * metadata refresh, disconnect and destroy take the write lock.
 */
public class DefaultConnectionFactory
implements InitializingBean,
ConnectionFactory,
DisposableBean {
    private static final Log log = LogFactory.getLog(DefaultConnectionFactory.class);
    public static final Predicate<TopicMetadata> errorlessTopicMetadataPredicate = new ErrorlessTopicMetadataPredicate();
    private final GetBrokersByPartitionFunction getBrokersByPartitionFunction = new GetBrokersByPartitionFunction();
    private final Configuration configuration;
    private final AtomicReference<MetadataCache> metadataCacheHolder = new AtomicReference<MetadataCache>(new MetadataCache(Collections.<TopicMetadata>emptySet()));
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final UnifiedMap<BrokerAddress, Connection> kafkaBrokersCache = UnifiedMap.newMap();

    /**
     * Creates a factory backed by the given configuration.
     *
     * @param configuration the Kafka configuration; checked for {@code null} in
     *                      {@link #afterPropertiesSet()}
     */
    public DefaultConnectionFactory(Configuration configuration) {
        this.configuration = configuration;
    }

    /**
     * @return the configuration this factory was created with
     */
    public Configuration getConfiguration() {
        return this.configuration;
    }

    /**
     * Verifies that a configuration has been supplied.
     *
     * @throws Exception if the configuration is {@code null}
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(this.configuration, "Kafka configuration cannot be empty");
    }

    /**
     * Closes every cached connection and empties the connection cache.
     *
     * <p>Runs under the write lock because the underlying map is not thread-safe and
     * every other access to it is lock-guarded.
     */
    @Override
    public void destroy() throws Exception {
        this.lock.writeLock().lock();
        try {
            for (Connection connection : this.kafkaBrokersCache.values()) {
                connection.close();
            }
            // Drop the closed connections so that they can never be handed out again.
            this.kafkaBrokersCache.clear();
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Maps each partition to the leader currently held in the metadata cache.
     *
     * <p>Unlike {@link #getLeader(Partition)}, this method neither refreshes the
     * metadata nor throws for unknown partitions; each value is whatever the cache
     * returns for that partition.
     *
     * @param partitions the partitions to look up
     * @return a map from partition to its cached leader
     */
    @Override
    public Map<Partition, BrokerAddress> getLeaders(Iterable<Partition> partitions) {
        return Iterate.toMap(partitions, Functions.<Partition>getPassThru(), this.getBrokersByPartitionFunction);
    }

    /**
     * Returns the leader of the given partition, refreshing the topic's metadata once
     * if the leader is not in the cache.
     *
     * @param partition the partition whose leader is requested
     * @return the leader's address, never {@code null}
     * @throws PartitionNotFoundException if no leader is known even after the refresh
     */
    @Override
    public BrokerAddress getLeader(Partition partition) {
        BrokerAddress leader;
        this.lock.readLock().lock();
        try {
            leader = this.getMetadataCache().getLeader(partition);
        }
        finally {
            this.lock.readLock().unlock();
        }
        if (leader == null) {
            this.lock.writeLock().lock();
            try {
                // Re-check: another thread may have refreshed between the two locks.
                leader = this.getMetadataCache().getLeader(partition);
                if (leader == null) {
                    this.refreshMetadata(Collections.singleton(partition.getTopic()));
                    leader = this.getMetadataCache().getLeader(partition);
                }
            }
            finally {
                this.lock.writeLock().unlock();
            }
        }
        if (leader == null) {
            throw new PartitionNotFoundException(partition);
        }
        return leader;
    }

    /**
     * Returns the cached connection for the broker, creating and caching a new
     * {@link DefaultConnection} if none exists yet.
     *
     * @param brokerAddress the broker to connect to
     * @return the connection associated with the broker
     */
    @Override
    public Connection connect(BrokerAddress brokerAddress) {
        Connection connection;
        this.lock.readLock().lock();
        try {
            connection = this.kafkaBrokersCache.get(brokerAddress);
        }
        finally {
            this.lock.readLock().unlock();
        }
        if (connection == null) {
            this.lock.writeLock().lock();
            try {
                // Re-check: another thread may have created it between the two locks.
                connection = this.kafkaBrokersCache.get(brokerAddress);
                if (connection == null) {
                    connection = new DefaultConnection(brokerAddress, this.configuration.getClientId(), this.configuration.getBufferSize(), this.configuration.getSocketTimeout(), this.configuration.getMinBytes(), this.configuration.getMaxWait());
                    this.kafkaBrokersCache.put(brokerAddress, connection);
                }
            }
            finally {
                this.lock.writeLock().unlock();
            }
        }
        return connection;
    }

    /**
     * Fetches metadata for the given topics from the configured brokers and merges the
     * error-free entries into the metadata cache. Topics whose metadata carries an
     * error code are logged at INFO level and left out of the merge.
     *
     * @param topics the topics whose metadata should be refreshed
     */
    @Override
    public void refreshMetadata(Collection<String> topics) {
        this.lock.writeLock().lock();
        try {
            String brokerAddressesAsString = ListIterate.collect(this.configuration.getBrokerAddresses(), Functions.getToString()).makeString(",");
            TopicMetadataResponse topicMetadataResponse = new TopicMetadataResponse(ClientUtils$.MODULE$.fetchTopicMetadata(JavaConversions.asScalaSet(new HashSet<String>(topics)), ClientUtils$.MODULE$.parseBrokerList(brokerAddressesAsString), this.configuration.getClientId(), this.configuration.getFetchMetadataTimeout(), 0));
            PartitionIterable<TopicMetadata> selectWithoutErrors = Iterate.partition(topicMetadataResponse.topicsMetadata(), errorlessTopicMetadataPredicate);
            this.metadataCacheHolder.set(this.metadataCacheHolder.get().merge(selectWithoutErrors.getSelected()));
            if (log.isInfoEnabled()) {
                for (TopicMetadata topicMetadata : selectWithoutErrors.getRejected()) {
                    log.info(String.format("No metadata could be retrieved for '%s'", topicMetadata.topic()), ErrorMapping.exceptionFor(topicMetadata.errorCode()));
                }
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Closes the cached connection for the broker, if any, and evicts it from the
     * cache so that a later {@link #connect(BrokerAddress)} creates a fresh connection
     * instead of returning the closed one.
     *
     * @param brokerAddress the broker to disconnect from
     */
    @Override
    public void disconnect(BrokerAddress brokerAddress) {
        this.lock.writeLock().lock();
        try {
            Connection connection = this.kafkaBrokersCache.remove(brokerAddress);
            if (connection != null) {
                connection.close();
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /**
     * Returns the partitions of the given topic, refreshing the topic's metadata once
     * if they are not in the cache.
     *
     * @param topic the topic name
     * @return the partitions of the topic, never {@code null}
     * @throws TopicNotFoundException if the topic is unknown even after the refresh
     */
    @Override
    public Collection<Partition> getPartitions(String topic) {
        Collection<Partition> returnedPartitions;
        this.lock.readLock().lock();
        try {
            returnedPartitions = this.getMetadataCache().getPartitions(topic);
        }
        finally {
            this.lock.readLock().unlock();
        }
        if (returnedPartitions == null) {
            this.lock.writeLock().lock();
            try {
                // Re-check: another thread may have refreshed between the two locks.
                returnedPartitions = this.getMetadataCache().getPartitions(topic);
                if (returnedPartitions == null) {
                    this.refreshMetadata(Collections.singleton(topic));
                    returnedPartitions = this.getMetadataCache().getPartitions(topic);
                }
            }
            finally {
                this.lock.writeLock().unlock();
            }
        }
        if (returnedPartitions == null) {
            throw new TopicNotFoundException(topic);
        }
        return returnedPartitions;
    }

    /** @return the current metadata cache snapshot */
    private MetadataCache getMetadataCache() {
        return this.metadataCacheHolder.get();
    }

    /**
     * Resolves a partition to the leader held in the enclosing factory's current
     * metadata cache; used as the value function in {@link #getLeaders(Iterable)}.
     */
    private class GetBrokersByPartitionFunction
    implements Function<Partition, BrokerAddress> {
        private GetBrokersByPartitionFunction() {
        }

        @Override
        public BrokerAddress valueOf(Partition partition) {
            return DefaultConnectionFactory.this.metadataCacheHolder.get().getLeader(partition);
        }
    }

    /** Accepts topic metadata whose error code equals {@code ErrorMapping.NoError()}. */
    private static class ErrorlessTopicMetadataPredicate
    implements Predicate<TopicMetadata> {
        private ErrorlessTopicMetadataPredicate() {
        }

        @Override
        public boolean accept(TopicMetadata topicMetadata) {
            return topicMetadata.errorCode() == ErrorMapping.NoError();
        }
    }
}

