/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.java.typeutils.runtime.kryo;

import java.io.IOException;
import java.io.Serializable;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeutils.TypeSerializerMatchers;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoPojosForMigrationTests;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.testutils.ClassLoaderUtils;
import org.hamcrest.MatcherAssert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests how the snapshot of a {@link KryoSerializer} resolves schema compatibility against a
 * snapshot taken from a {@link KryoSerializer} that was built with a different
 * {@link SerializerConfigImpl}.
 */
public class KryoSerializerSnapshotTest {
    /** Configuration used to build the "previous" serializer; recreated before every test. */
    private SerializerConfigImpl oldConfig;

    /** Configuration used to build the "current" serializer; recreated before every test. */
    private SerializerConfigImpl newConfig;

    /** Gives every test a fresh pair of configurations without any registrations. */
    @Before
    public void setup() {
        oldConfig = new SerializerConfigImpl();
        newConfig = new SerializerConfigImpl();
    }

    /** Two configurations without registrations are expected to be compatible as is. */
    @Test
    public void sanityTest() {
        MatcherAssert.assertThat(
                resolveKryoCompatibility(oldConfig, newConfig),
                TypeSerializerMatchers.isCompatibleAsIs());
    }

    /**
     * Registering an additional type (with a Kryo serializer) only in the new configuration is
     * expected to be compatible with a reconfigured serializer.
     */
    @Test
    public void addingTypesIsCompatibleAfterReconfiguration() {
        oldConfig.registerKryoType(KryoPojosForMigrationTests.Animal.class);

        newConfig.registerKryoType(KryoPojosForMigrationTests.Animal.class);
        newConfig.registerTypeWithKryoSerializer(
                KryoPojosForMigrationTests.Dog.class,
                KryoPojosForMigrationTests.DogKryoSerializer.class);

        MatcherAssert.assertThat(
                resolveKryoCompatibility(oldConfig, newConfig),
                TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer());
    }

    /**
     * Registering a different Kryo serializer class for the same type in the new configuration
     * is expected to be compatible as is.
     */
    @Test
    public void replacingKryoSerializersIsCompatibleAsIs() {
        oldConfig.registerKryoType(KryoPojosForMigrationTests.Animal.class);
        oldConfig.registerTypeWithKryoSerializer(
                KryoPojosForMigrationTests.Dog.class,
                KryoPojosForMigrationTests.DogKryoSerializer.class);

        newConfig.registerKryoType(KryoPojosForMigrationTests.Animal.class);
        newConfig.registerTypeWithKryoSerializer(
                KryoPojosForMigrationTests.Dog.class,
                KryoPojosForMigrationTests.DogV2KryoSerializer.class);

        MatcherAssert.assertThat(
                resolveKryoCompatibility(oldConfig, newConfig),
                TypeSerializerMatchers.isCompatibleAsIs());
    }

    /**
     * Registering the same types in a different order is expected to be compatible with a
     * reconfigured serializer.
     */
    @Test
    public void reorderingIsCompatibleAfterReconfiguration() {
        oldConfig.registerKryoType(KryoPojosForMigrationTests.Parrot.class);
        oldConfig.registerKryoType(KryoPojosForMigrationTests.Dog.class);

        newConfig.registerKryoType(KryoPojosForMigrationTests.Dog.class);
        newConfig.registerKryoType(KryoPojosForMigrationTests.Parrot.class);

        MatcherAssert.assertThat(
                resolveKryoCompatibility(oldConfig, newConfig),
                TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer());
    }

    /**
     * A snapshot produced by {@link #kryoSnapshotWithMissingClass()} is expected to be
     * incompatible with a serializer built from a configuration without registrations.
     *
     * @throws IOException if writing or reading the versioned snapshot fails
     */
    @Test
    public void tryingToRestoreWithNonExistingClassShouldBeIncompatible() throws IOException {
        TypeSerializerSnapshot<KryoPojosForMigrationTests.Animal> restoredSnapshot =
                kryoSnapshotWithMissingClass();

        KryoSerializer<KryoPojosForMigrationTests.Animal> currentSerializer =
                new KryoSerializer<>(
                        KryoPojosForMigrationTests.Animal.class, new SerializerConfigImpl());

        // The actual value keeps its static type so that the matcher's type parameter
        // can be inferred by assertThat.
        MatcherAssert.assertThat(
                currentSerializer
                        .snapshotConfiguration()
                        .resolveSchemaCompatibility(restoredSnapshot),
                TypeSerializerMatchers.isIncompatible());
    }

    /**
     * Reads back the bytes from {@link #unLoadableSnapshotBytes()} as a versioned snapshot,
     * resolving classes through this test class' class loader.
     *
     * @return the restored snapshot
     * @throws IOException if writing or reading the versioned snapshot fails
     */
    private static TypeSerializerSnapshot<KryoPojosForMigrationTests.Animal>
            kryoSnapshotWithMissingClass() throws IOException {
        DataInputDeserializer in = new DataInputDeserializer(unLoadableSnapshotBytes());
        return TypeSerializerSnapshot.readVersionedSnapshot(
                in, KryoSerializerSnapshotTest.class.getClassLoader());
    }

    /**
     * Serializes the snapshot of a {@link KryoSerializer} whose configuration registers the
     * class of an object obtained from
     * {@link ClassLoaderUtils#createSerializableObjectFromNewClassLoader()}.
     *
     * <p>While the snapshot is created and written, the thread's context class loader is set to
     * the class loader that came with that object; the original context class loader is always
     * restored afterwards. Judging by the helper's name, the registered class is presumably not
     * loadable through the test's own class loader.
     *
     * @return a copy of the bytes of the written versioned snapshot
     * @throws IOException if writing the versioned snapshot fails
     */
    private static byte[] unLoadableSnapshotBytes() throws IOException {
        final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();

        final ClassLoaderUtils.ObjectAndClassLoader<Serializable> outsideClassLoading =
                ClassLoaderUtils.createSerializableObjectFromNewClassLoader();

        try {
            Thread.currentThread().setContextClassLoader(outsideClassLoading.getClassLoader());

            SerializerConfigImpl conf = new SerializerConfigImpl();
            conf.registerKryoType(outsideClassLoading.getObject().getClass());

            KryoSerializer<KryoPojosForMigrationTests.Animal> previousSerializer =
                    new KryoSerializer<>(KryoPojosForMigrationTests.Animal.class, conf);
            TypeSerializerSnapshot<KryoPojosForMigrationTests.Animal> previousSnapshot =
                    previousSerializer.snapshotConfiguration();

            DataOutputSerializer out = new DataOutputSerializer(4096);
            TypeSerializerSnapshot.writeVersionedSnapshot(out, previousSnapshot);
            return out.getCopyOfBuffer();
        } finally {
            // Restore the context class loader even if snapshotting or writing fails.
            Thread.currentThread().setContextClassLoader(originalClassLoader);
        }
    }

    /**
     * Builds one {@link KryoSerializer} per configuration and resolves the compatibility of the
     * current serializer's snapshot against the previous serializer's snapshot.
     *
     * @param previous configuration for the serializer whose snapshot plays the "old" role
     * @param current configuration for the serializer whose snapshot plays the "new" role
     * @return the result of resolving the current snapshot against the previous snapshot
     */
    private static TypeSerializerSchemaCompatibility<KryoPojosForMigrationTests.Animal>
            resolveKryoCompatibility(SerializerConfigImpl previous, SerializerConfigImpl current) {
        KryoSerializer<KryoPojosForMigrationTests.Animal> previousSerializer =
                new KryoSerializer<>(KryoPojosForMigrationTests.Animal.class, previous);
        TypeSerializerSnapshot<KryoPojosForMigrationTests.Animal> previousSnapshot =
                previousSerializer.snapshotConfiguration();

        KryoSerializer<KryoPojosForMigrationTests.Animal> currentSerializer =
                new KryoSerializer<>(KryoPojosForMigrationTests.Animal.class, current);
        TypeSerializerSnapshot<KryoPojosForMigrationTests.Animal> currentSnapshot =
                currentSerializer.snapshotConfiguration();

        return currentSnapshot.resolveSchemaCompatibility(previousSnapshot);
    }
}

