/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.persistence.remote.upgrade;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.BitSet;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.cache.impl.InvocationHelper;
import org.infinispan.client.hotrod.MetadataValue;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.write.ComputeCommand;
import org.infinispan.commons.dataconversion.ByteArrayWrapper;
import org.infinispan.commons.io.UnsignedNumeric;
import org.infinispan.commons.marshall.AbstractExternalizer;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.EnumUtil;
import org.infinispan.commons.util.Util;
import org.infinispan.container.versioning.EntryVersion;
import org.infinispan.container.versioning.NumericVersion;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.encoding.DataConversion;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.threads.BlockingThreadFactory;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.metadata.EmbeddedMetadata;
import org.infinispan.metadata.Metadata;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.remote.ExternalizerIds;
import org.infinispan.persistence.remote.RemoteStore;
import org.infinispan.persistence.remote.logging.Log;
import org.infinispan.persistence.remote.upgrade.RemovedFilter;
import org.infinispan.util.logging.LogFactory;

public class MigrationTask
implements Function<EmbeddedCacheManager, Integer> {
    private static final Log log = (Log)LogFactory.getLog(MigrationTask.class, Log.class);
    private static final String THREAD_NAME = "RollingUpgrade-MigrationTask";
    private final String cacheName;
    private final Set<Integer> segments;
    private final int readBatch;
    private final int threads;
    private final Set<Object> deletedKeys = ConcurrentHashMap.newKeySet();
    private InvocationHelper invocationHelper;
    private CommandsFactory commandsFactory;
    private KeyPartitioner keyPartitioner;

    public MigrationTask(String cacheName, Set<Integer> segments, int readBatch, int threads) {
        this.cacheName = cacheName;
        this.segments = segments;
        this.readBatch = readBatch;
        this.threads = threads;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public Integer apply(EmbeddedCacheManager embeddedCacheManager) {
        ExecutorService executorService;
        AtomicInteger counter;
        block4: {
            counter = new AtomicInteger(0);
            BlockingThreadFactory threadFactory = new BlockingThreadFactory(null, 1, "RollingUpgrade-MigrationTask-%t", null, null);
            executorService = Executors.newFixedThreadPool(this.threads, (ThreadFactory)threadFactory);
            RemoveListener listener = null;
            AdvancedCache advancedCache = embeddedCacheManager.getCache(this.cacheName).getAdvancedCache();
            AdvancedCache cache = advancedCache.withStorageMediaType();
            try {
                ComponentRegistry cr = cache.getAdvancedCache().getComponentRegistry();
                this.invocationHelper = (InvocationHelper)cr.getComponent(InvocationHelper.class);
                this.commandsFactory = cr.getCommandsFactory();
                this.keyPartitioner = (KeyPartitioner)cr.getComponent(KeyPartitioner.class);
                PersistenceManager loaderManager = (PersistenceManager)cr.getComponent(PersistenceManager.class);
                Set stores = loaderManager.getStores(RemoteStore.class);
                listener = new RemoveListener();
                cache.addFilteredListener((Object)listener, new RemovedFilter(), null, Util.asSet((Object[])new Class[]{CacheEntryRemoved.class}));
                Iterator storeIterator = stores.iterator();
                if (storeIterator.hasNext()) {
                    RemoteStore store = (RemoteStore)storeIterator.next();
                    RemoteCache<Object, Object> storeCache = store.getRemoteCache();
                    this.migrateEntriesWithMetadata(storeCache, counter, executorService, (Cache<Object, Object>)cache);
                }
                if (listener == null) break block4;
            }
            catch (Throwable throwable) {
                if (listener != null) {
                    cache.removeListener(listener);
                }
                executorService.shutdown();
                throw throwable;
            }
            cache.removeListener((Object)listener);
        }
        executorService.shutdown();
        return counter.get();
    }

    private void migrateEntriesWithMetadata(RemoteCache<Object, Object> sourceCache, AtomicInteger counter, ExecutorService executorService, Cache<Object, Object> cache) {
        AdvancedCache destinationCache = cache.getAdvancedCache();
        DataConversion keyDataConversion = destinationCache.getKeyDataConversion();
        DataConversion valueDataConversion = destinationCache.getValueDataConversion();
        try (CloseableIterator iterator = sourceCache.retrieveEntriesWithMetadata(this.segments, this.readBatch);){
            CompletableFuture[] completableFutures = (CompletableFuture[])StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0), false).map(entry -> {
                Object key = entry.getKey();
                MetadataValue metadataValue = (MetadataValue)entry.getValue();
                int lifespan = metadataValue.getLifespan();
                int maxIdle = metadataValue.getMaxIdle();
                long version = metadataValue.getVersion();
                Metadata metadata = new EmbeddedMetadata.Builder().version((EntryVersion)new NumericVersion(version)).lifespan((long)lifespan, TimeUnit.SECONDS).maxIdle((long)maxIdle, TimeUnit.SECONDS).build();
                if (!this.deletedKeys.contains(ByteArrayWrapper.INSTANCE.wrap(key))) {
                    return CompletableFuture.supplyAsync(() -> {
                        int currentCount = counter.incrementAndGet();
                        if (log.isDebugEnabled() && currentCount % 100 == 0) {
                            log.debugf(">>    Migrated %s entries\n", currentCount);
                        }
                        return this.writeToDestinationCache((Map.Entry<Object, MetadataValue<Object>>)entry, metadata, keyDataConversion, valueDataConversion);
                    }, executorService);
                }
                return CompletableFuture.completedFuture(null);
            }).toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(completableFutures).join();
        }
    }

    private Object writeToDestinationCache(Map.Entry<Object, MetadataValue<Object>> entry, Metadata metadata, DataConversion keyDataConversion, DataConversion valueDataConversion) {
        Object key = keyDataConversion.toStorage(entry.getKey());
        Object value = valueDataConversion.toStorage(entry.getValue().getValue());
        int segment = this.keyPartitioner.getSegment(key);
        long flags = EnumUtil.bitSetOf((Enum)Flag.SKIP_CACHE_LOAD, (Enum)Flag.ROLLING_UPGRADE);
        ComputeCommand computeCommand = this.commandsFactory.buildComputeCommand(key, new EntryWriter(value), false, segment, metadata, flags);
        InvocationContext context = this.invocationHelper.createInvocationContextWithImplicitTransaction(1, true);
        return this.invocationHelper.invoke(context, (VisitableCommand)computeCommand);
    }

    @Listener(clustered=true)
    private class RemoveListener {
        private RemoveListener() {
        }

        @CacheEntryRemoved
        public void entryRemoved(CacheEntryRemovedEvent event) {
            MigrationTask.this.deletedKeys.add(ByteArrayWrapper.INSTANCE.wrap(event.getKey()));
        }
    }

    public static class EntryWriter<K, V>
    implements BiFunction<K, V, V> {
        private final V newEntry;

        public EntryWriter(V newEntry) {
            this.newEntry = newEntry;
        }

        @Override
        public V apply(K k, V v) {
            return v == null ? this.newEntry : v;
        }
    }

    public static class EntryWriterExternalizer
    extends AbstractExternalizer<EntryWriter> {
        public Set<Class<? extends EntryWriter>> getTypeClasses() {
            return Collections.singleton(EntryWriter.class);
        }

        public Integer getId() {
            return ExternalizerIds.ENTRY_WRITER;
        }

        public void writeObject(ObjectOutput output, EntryWriter object) throws IOException {
            output.writeObject(object.newEntry);
        }

        public EntryWriter readObject(ObjectInput input) throws IOException, ClassNotFoundException {
            Object newEntry = input.readObject();
            return new EntryWriter(newEntry);
        }
    }

    public static class Externalizer
    extends AbstractExternalizer<MigrationTask> {
        public Set<Class<? extends MigrationTask>> getTypeClasses() {
            return Collections.singleton(MigrationTask.class);
        }

        public void writeObject(ObjectOutput output, MigrationTask task) throws IOException {
            output.writeObject(task.cacheName);
            UnsignedNumeric.writeUnsignedInt((DataOutput)output, (int)task.readBatch);
            UnsignedNumeric.writeUnsignedInt((DataOutput)output, (int)task.threads);
            BitSet bs = new BitSet();
            for (Integer segment : task.segments) {
                bs.set(segment);
            }
            byte[] bytes = bs.toByteArray();
            UnsignedNumeric.writeUnsignedInt((DataOutput)output, (int)bytes.length);
            output.write(bs.toByteArray());
        }

        public MigrationTask readObject(ObjectInput input) throws IOException, ClassNotFoundException {
            String cacheName = (String)input.readObject();
            int readBatch = UnsignedNumeric.readUnsignedInt((DataInput)input);
            int threads = UnsignedNumeric.readUnsignedInt((DataInput)input);
            int segmentsSize = UnsignedNumeric.readUnsignedInt((DataInput)input);
            byte[] bytes = new byte[segmentsSize];
            input.read(bytes);
            BitSet bitSet = BitSet.valueOf(bytes);
            Set<Integer> segments = bitSet.stream().boxed().collect(Collectors.toSet());
            return new MigrationTask(cacheName, segments, readBatch, threads);
        }
    }
}

