/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Generated;
import net.bytebuddy.pool.TypePool;
import org.apache.commons.lang.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.auth.FunctionAuthUtils;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionsManager;
import org.apache.pulsar.functions.worker.PackageUrlValidator;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FunctionActioner {
    // Class-wide SLF4J logger.
    @Generated
    private static final Logger log = LoggerFactory.getLogger(FunctionActioner.class);
    // Worker settings read by this class (download directory, authentication flag, cluster name, ...).
    private final WorkerConfig workerConfig;
    // Decides whether packages are externally managed and is handed to every RuntimeSpawner created here.
    private final RuntimeFactory runtimeFactory;
    // Namespace passed to WorkerUtils.downloadFromBookkeeper when a package is fetched from there.
    private final Namespace dlogNamespace;
    // Looked up for built-in source/sink connector archives.
    private final ConnectorsManager connectorsManager;
    // Looked up for built-in function archives.
    private final FunctionsManager functionsManager;
    // Admin client used for package downloads and for subscription/topic setup and cleanup.
    private final PulsarAdmin pulsarAdmin;
    // Consulted before a file:// or http package URL is used.
    private final PackageUrlValidator packageUrlValidator;

    /**
     * Creates an actioner from its collaborators. Each argument is stored as given; nothing is validated
     * or copied.
     */
    public FunctionActioner(
            WorkerConfig workerConfig,
            RuntimeFactory runtimeFactory,
            Namespace dlogNamespace,
            ConnectorsManager connectorsManager,
            FunctionsManager functionsManager,
            PulsarAdmin pulsarAdmin,
            PackageUrlValidator packageUrlValidator) {
        // Configuration and runtime plumbing.
        this.workerConfig = workerConfig;
        this.runtimeFactory = runtimeFactory;

        // Package sources.
        this.dlogNamespace = dlogNamespace;
        this.connectorsManager = connectorsManager;
        this.functionsManager = functionsManager;
        this.packageUrlValidator = packageUrlValidator;

        // Broker administration.
        this.pulsarAdmin = pulsarAdmin;
    }

    /**
     * Resolves the package file(s) of a function instance, prepares its batch-source subscription if it
     * needs one, then creates and starts a {@link RuntimeSpawner} for it.
     *
     * <p>Any exception raised on the way is logged and stored on {@code functionRuntimeInfo} through
     * {@code setStartupException}; it is not rethrown.
     *
     * @param functionRuntimeInfo bookkeeping object of the instance to start; receives the new spawner
     */
    public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) {
        try {
            Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
            Function.FunctionMetaData metaData = instance.getFunctionMetaData();
            Function.FunctionDetails functionDetails = metaData.getFunctionDetails();
            int instanceId = instance.getInstanceId();
            log.info("{}/{}/{}-{} Starting function ...",
                    functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName(), instanceId);

            Function.PackageLocationMetaData primaryLocation = metaData.getPackageLocation();
            Function.PackageLocationMetaData transformLocation = metaData.getTransformFunctionPackageLocation();

            String packageFile;
            String transformFunctionPackageFile;
            if (!this.runtimeFactory.externallyManaged()) {
                // The worker has to materialize the package(s) locally itself.
                packageFile = this.getPackageFile(metaData, functionDetails, instanceId, primaryLocation,
                        InstanceUtils.calculateSubjectType(functionDetails));
                boolean hasTransformPackage =
                        !org.apache.commons.lang3.StringUtils.isEmpty(transformLocation.getPackagePath());
                transformFunctionPackageFile = hasTransformPackage
                        ? this.getPackageFile(metaData, functionDetails, instanceId, transformLocation,
                                Function.FunctionDetails.ComponentType.FUNCTION)
                        : null;
            } else {
                // Externally managed runtimes receive the stored package paths untouched.
                packageFile = primaryLocation.getPackagePath();
                transformFunctionPackageFile = transformLocation.getPackagePath();
            }

            this.setupBatchSource(functionDetails);

            RuntimeSpawner spawner = this.getRuntimeSpawner(instance, packageFile, transformFunctionPackageFile);
            functionRuntimeInfo.setRuntimeSpawner(spawner);
            spawner.start();
        } catch (Exception ex) {
            Function.FunctionDetails failed =
                    functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
            // The trailing exception has no placeholder, so the logger treats it as the cause.
            log.error("{}/{}/{} Error starting function",
                    failed.getTenant(), failed.getNamespace(), failed.getName(), ex);
            functionRuntimeInfo.setStartupException(ex);
        }
    }

    /**
     * Returns the absolute path of the local file holding the package described by {@code pkgLocation}.
     *
     * <p>Three cases are handled, in this order:
     * <ol>
     *   <li>a supported package URL starting with {@code "file"}: validated, then used in place;</li>
     *   <li>built-in code: the archive is looked up through {@code getBuiltinArchive};</li>
     *   <li>anything else: downloaded into the per-instance directory under the worker download directory.</li>
     * </ol>
     *
     * @throws IllegalArgumentException if a {@code file} URL is rejected by the package URL validator
     */
    private String getPackageFile(Function.FunctionMetaData functionMetaData, Function.FunctionDetails functionDetails, int instanceId, Function.PackageLocationMetaData pkgLocation, Function.FunctionDetails.ComponentType componentType) throws URISyntaxException, IOException, ClassNotFoundException, PulsarAdminException {
        String packagePath = pkgLocation.getPackagePath();
        boolean isPkgUrlProvided = Utils.isFunctionPackageUrlSupported(packagePath);

        if (isPkgUrlProvided && packagePath.startsWith("file")) {
            if (!this.packageUrlValidator.isValidPackageUrl(componentType, packagePath)) {
                throw new IllegalArgumentException("Package URL " + packagePath + " is not valid");
            }
            return new File(new URL(packagePath).toURI()).getAbsolutePath();
        }

        if (FunctionCommon.isFunctionCodeBuiltin(functionDetails, componentType)) {
            Function.FunctionDetails.Builder detailsBuilder =
                    Function.FunctionDetails.newBuilder(functionMetaData.getFunctionDetails());
            return this.getBuiltinArchive(componentType, detailsBuilder).getAbsolutePath();
        }

        File instanceDir = new File(this.workerConfig.getDownloadDirectory(),
                this.getDownloadPackagePath(functionMetaData, instanceId));
        instanceDir.mkdirs();
        // Only the last path component of the computed file name is used.
        String leafName = new File(
                FunctionActioner.getDownloadFileName(functionMetaData.getFunctionDetails(), pkgLocation)).getName();
        File target = new File(instanceDir, leafName);
        this.downloadFile(target, isPkgUrlProvided, functionMetaData, instanceId, pkgLocation, componentType);
        return target.getAbsolutePath();
    }

    /**
     * Builds (but does not start) the {@link RuntimeSpawner} for one function instance.
     *
     * <p>The function's authentication spec is forwarded only when authentication is enabled on the worker
     * and the metadata carries one; otherwise {@code null} is passed on.
     *
     * @param instance                     the instance to spawn
     * @param packageFile                  path of the main package
     * @param transformFunctionPackageFile path of the transform-function package, may be {@code null}
     */
    RuntimeSpawner getRuntimeSpawner(Function.Instance instance, String packageFile, String transformFunctionPackageFile) {
        Function.FunctionMetaData metaData = instance.getFunctionMetaData();

        Function.FunctionAuthenticationSpec authSpec =
                this.workerConfig.isAuthenticationEnabled() && metaData.hasFunctionAuthSpec()
                        ? metaData.getFunctionAuthSpec()
                        : null;

        Function.FunctionDetails detailsCopy =
                Function.FunctionDetails.newBuilder(metaData.getFunctionDetails()).build();
        InstanceConfig instanceConfig = this.createInstanceConfig(
                detailsCopy, authSpec, instance.getInstanceId(), this.workerConfig.getPulsarFunctionsCluster());

        return new RuntimeSpawner(
                instanceConfig,
                packageFile,
                metaData.getPackageLocation().getOriginalFileName(),
                transformFunctionPackageFile,
                metaData.getTransformFunctionPackageLocation().getOriginalFileName(),
                this.runtimeFactory,
                this.workerConfig.getInstanceLivenessCheckFreqMs());
    }

    /**
     * Assembles the {@link InstanceConfig} for one function instance.
     *
     * <p>The function id, transform-function id and function version are each a fresh random UUID; the
     * port and the metrics port are each obtained from {@code FunctionCommon.findAvailablePort()}. The
     * remaining values come from the arguments and from the worker configuration.
     *
     * @param functionDetails  details of the function to run
     * @param functionAuthSpec authentication spec to hand to the instance, may be {@code null}
     * @param instanceId       id of the instance
     * @param clusterName      name of the cluster the instance belongs to
     */
    InstanceConfig createInstanceConfig(Function.FunctionDetails functionDetails, Function.FunctionAuthenticationSpec functionAuthSpec, int instanceId, String clusterName) {
        InstanceConfig config = new InstanceConfig();

        // Values supplied by the caller.
        config.setFunctionDetails(functionDetails);
        config.setInstanceId(instanceId);
        config.setClusterName(clusterName);
        config.setFunctionAuthenticationSpec(functionAuthSpec);

        // Freshly generated identifiers.
        config.setFunctionId(UUID.randomUUID().toString());
        config.setTransformFunctionId(UUID.randomUUID().toString());
        config.setFunctionVersion(UUID.randomUUID().toString());

        // Ports: the instance port is picked before the metrics port.
        config.setPort(FunctionCommon.findAvailablePort());
        config.setMetricsPort(FunctionCommon.findAvailablePort());

        // Fixed and worker-level settings.
        config.setMaxBufferedTuples(1024);
        config.setMaxPendingAsyncRequests(this.workerConfig.getMaxPendingAsyncRequests());
        config.setExposePulsarAdminClientEnabled(this.workerConfig.isExposeAdminClientEnabled());
        config.setIgnoreUnknownConfigFields(this.workerConfig.isIgnoreUnknownConfigFields());
        if (this.workerConfig.getAdditionalJavaRuntimeArguments() != null) {
            config.setAdditionalJavaRuntimeArguments(this.workerConfig.getAdditionalJavaRuntimeArguments());
        }
        return config;
    }

    /**
     * Downloads the package described by {@code pkgLocation} into {@code pkgFile}.
     *
     * <p>The content is first written to a uniquely named temporary file next to {@code pkgFile}
     * ({@code <name>.<instanceId>.<uuid>}); {@code pkgFile} is then created as a link to that temporary
     * file and the temporary file is removed. The temporary file is removed on every exit path,
     * including failed downloads.
     *
     * <p>The source is chosen as follows: an {@code http} package URL is validated and fetched over HTTP;
     * a URL with a package-type prefix is fetched through the admin client's packages API; everything
     * else is read from BookKeeper.
     *
     * @param pkgFile           final location of the package; an existing file is deleted first
     * @param isPkgUrlProvided  whether the package path is a supported package URL
     * @param functionMetaData  metadata of the function the package belongs to
     * @param instanceId        instance id, used in the temporary file name
     * @param pkgLocation       where the package is stored
     * @param componentType     component type handed to the package URL validator
     * @throws IllegalArgumentException if an {@code http} URL is rejected by the package URL validator
     * @throws IOException              on local file or download I/O failures
     * @throws PulsarAdminException     if the packages API download fails
     */
    private void downloadFile(File pkgFile, boolean isPkgUrlProvided, Function.FunctionMetaData functionMetaData, int instanceId, Function.PackageLocationMetaData pkgLocation, Function.FunctionDetails.ComponentType componentType) throws IOException, PulsarAdminException {
        Function.FunctionDetails details = functionMetaData.getFunctionDetails();
        File pkgDir = pkgFile.getParentFile();
        if (pkgFile.exists()) {
            log.warn("Function package exists already {} deleting it", pkgFile);
            if (!pkgFile.delete()) {
                // Not fatal here: the link step below reports an already existing target.
                log.warn("Failed to delete existing function package {}", pkgFile);
            }
        }

        // Keep drawing random names until one is free and could be created by us.
        File tempPkgFile;
        do {
            tempPkgFile = new File(pkgDir, pkgFile.getName() + "." + instanceId + "." + UUID.randomUUID());
        } while (tempPkgFile.exists() || !tempPkgFile.createNewFile());

        String pkgLocationPath = pkgLocation.getPackagePath();
        boolean downloadFromHttp = isPkgUrlProvided && pkgLocationPath.startsWith("http");
        boolean downloadFromPackageManagementService =
                isPkgUrlProvided && Utils.hasPackageTypePrefix(pkgLocationPath);
        // Arguments follow the placeholder order: tenant/namespace/name, file, source.
        log.info("{}/{}/{} Function package file {} will be downloaded from {}",
                details.getTenant(), details.getNamespace(), details.getName(), tempPkgFile,
                downloadFromHttp ? pkgLocationPath : pkgLocation);

        try {
            if (downloadFromHttp) {
                if (!this.packageUrlValidator.isValidPackageUrl(componentType, pkgLocationPath)) {
                    throw new IllegalArgumentException("Package URL " + pkgLocationPath + " is not valid");
                }
                FunctionCommon.downloadFromHttpUrl(pkgLocationPath, tempPkgFile);
            } else if (downloadFromPackageManagementService) {
                this.getPulsarAdmin().packages().download(pkgLocationPath, tempPkgFile.getPath());
            } else {
                // try-with-resources closes the stream even when the download throws.
                try (FileOutputStream tempPkgFos = new FileOutputStream(tempPkgFile)) {
                    WorkerUtils.downloadFromBookkeeper(this.dlogNamespace, tempPkgFos, pkgLocationPath);
                }
            }

            try {
                Files.createLink(Paths.get(pkgFile.toURI()), Paths.get(tempPkgFile.toURI()));
                log.info("Function package file is linked from {} to {}", tempPkgFile, pkgFile);
            } catch (FileAlreadyExistsException faee) {
                // The target is already present; keep it and only report.
                log.warn("Function package has been downloaded from {} and saved at {}", pkgLocation, pkgFile);
            }
        } finally {
            // Covers failed validation and failed downloads as well as the normal path.
            tempPkgFile.delete();
        }

        if (details.getRuntime() == Function.FunctionDetails.Runtime.GO && !pkgFile.canExecute()) {
            if (pkgFile.setExecutable(true)) {
                log.info("Golang function package file {} is set to executable", pkgFile);
            } else {
                log.warn("Failed to set Golang function package file {} to executable", pkgFile);
            }
        }
    }

    /**
     * Recursively deletes the per-instance download directory of the given function instance, if it
     * exists. A failure to delete is logged as a warning and otherwise ignored.
     */
    private void cleanupFunctionFiles(FunctionRuntimeInfo functionRuntimeInfo) {
        Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
        Function.FunctionMetaData metaData = instance.getFunctionMetaData();
        String relativeDir = this.getDownloadPackagePath(metaData, instance.getInstanceId());
        File instanceDir = new File(this.workerConfig.getDownloadDirectory(), relativeDir);

        if (!instanceDir.exists()) {
            return;
        }
        try {
            MoreFiles.deleteRecursively(Paths.get(instanceDir.toURI()), RecursiveDeleteOption.ALLOW_INSECURE);
        } catch (IOException e) {
            log.warn("Failed to delete package for function: {}",
                    FunctionCommon.getFullyQualifiedName(metaData.getFunctionDetails()), e);
        }
    }

    /**
     * Stops a function instance: closes and detaches its runtime spawner, if any, and removes the
     * instance's downloaded files.
     *
     * @param functionRuntimeInfo bookkeeping object of the instance to stop
     */
    public void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
        Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
        Function.FunctionDetails details = instance.getFunctionMetaData().getFunctionDetails();
        log.info("{}/{}/{}-{} Stopping function...",
                details.getTenant(), details.getNamespace(), details.getName(), instance.getInstanceId());

        RuntimeSpawner spawner = functionRuntimeInfo.getRuntimeSpawner();
        if (spawner != null) {
            spawner.close();
            functionRuntimeInfo.setRuntimeSpawner(null);
        }

        this.cleanupFunctionFiles(functionRuntimeInfo);
    }

    /**
     * Terminates a function instance for good.
     *
     * <p>In order: closes the runtime spawner (and, when authentication is enabled, asks the runtime
     * factory's auth provider to clean up the function's auth data), removes the downloaded files, deletes
     * the function's input subscriptions when the source requests subscription cleanup, and finally
     * cleans up batch-source resources.
     *
     * @param functionRuntimeInfo bookkeeping object of the instance to terminate
     */
    public void terminateFunction(final FunctionRuntimeInfo functionRuntimeInfo) {
        Function.FunctionDetails details =
                functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
        final String fqfn = FunctionCommon.getFullyQualifiedName(details);
        log.info("{}-{} Terminating function...", fqfn, functionRuntimeInfo.getFunctionInstance().getInstanceId());

        RuntimeSpawner spawner = functionRuntimeInfo.getRuntimeSpawner();
        if (spawner != null) {
            spawner.close();
            if (this.workerConfig.isAuthenticationEnabled()) {
                spawner.getRuntimeFactory().getAuthProvider().ifPresent(authProvider -> {
                    try {
                        log.info("{}-{} Cleaning up authentication data for function...",
                                fqfn, functionRuntimeInfo.getFunctionInstance().getInstanceId());
                        Function.FunctionAuthenticationSpec authSpec =
                                spawner.getInstanceConfig().getFunctionAuthenticationSpec();
                        authProvider.cleanUpAuthData(details,
                                Optional.ofNullable(FunctionAuthUtils.getFunctionAuthData(Optional.ofNullable(authSpec))));
                    } catch (Exception e) {
                        // Auth cleanup is best effort; termination continues.
                        log.error("Failed to cleanup auth data for function: {}", fqfn, e);
                    }
                });
            }
            functionRuntimeInfo.setRuntimeSpawner(null);
        }

        this.cleanupFunctionFiles(functionRuntimeInfo);

        if (details.getSource().getCleanupSubscription()) {
            Map<String, Function.ConsumerSpec> inputSpecs = details.getSource().getInputSpecsMap();
            for (Map.Entry<String, Function.ConsumerSpec> input : inputSpecs.entrySet()) {
                // A blank configured name means the default subscription name was used.
                String configuredName = details.getSource().getSubscriptionName();
                String subscriptionName = org.apache.commons.lang3.StringUtils.isBlank(configuredName)
                        ? InstanceUtils.getDefaultSubscriptionName(details)
                        : configuredName;
                this.deleteSubscription(input.getKey(), input.getValue(), subscriptionName,
                        String.format("Cleaning up subscriptions for function %s", fqfn));
            }
        }

        this.cleanupBatchSource(details);
    }

    /**
     * Deletes one subscription, retrying up to 10 times with a 1000 ms pause between attempts.
     *
     * @param topic            topic (or topic pattern) the subscription belongs to
     * @param consumerSpec     consumer spec of the input; only its regex-pattern flag is read
     * @param subscriptionName name of the subscription to delete
     * @param msg              action name used for the retried action
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is interrupted;
     *                          the thread's interrupt flag is restored before throwing
     */
    private void deleteSubscription(String topic, Function.ConsumerSpec consumerSpec, String subscriptionName, String msg) {
        try {
            Actions.newBuilder()
                    .addAction(Actions.Action.builder()
                            .actionName(msg)
                            .numRetries(10)
                            .sleepBetweenInvocationsMs(1000L)
                            .supplier(this.getDeleteSubscriptionSupplier(topic, consumerSpec.getIsRegexPattern(), subscriptionName))
                            .build())
                    .run();
        } catch (InterruptedException e) {
            // Wrapping alone would hide the interruption from code further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns a supplier that performs one attempt at deleting a subscription.
     *
     * <p>For a regex input the subscription is removed from the whole namespace of {@code topic};
     * otherwise it is deleted from {@code topic} itself. A "not found" answer counts as success. On any
     * other admin failure the result is unsuccessful and carries the HTTP error (or, if absent, the
     * exception message), extended with the JSON-serialized subscription stats when they can be fetched
     * and serialized.
     *
     * @param topic            topic or topic pattern
     * @param isRegex          whether {@code topic} is a regex pattern
     * @param subscriptionName subscription to delete
     */
    private Supplier<Actions.ActionResult> getDeleteSubscriptionSupplier(String topic, boolean isRegex, String subscriptionName) {
        return () -> {
            try {
                if (isRegex) {
                    this.pulsarAdmin.namespaces().unsubscribeNamespace(TopicName.get(topic).getNamespace(), subscriptionName);
                } else {
                    this.pulsarAdmin.topics().deleteSubscription(topic, subscriptionName);
                }
                return Actions.ActionResult.builder().success(true).build();
            } catch (PulsarAdminException.NotFoundException e) {
                // Nothing left to delete.
                return Actions.ActionResult.builder().success(true).build();
            } catch (PulsarAdminException e) {
                // Subscription stats are fetched only to enrich the error message.
                SubscriptionStats sub = null;
                try {
                    TopicStats stats = this.pulsarAdmin.topics().getStats(topic);
                    sub = (SubscriptionStats) stats.getSubscriptions().get(subscriptionName);
                } catch (PulsarAdminException ignored) {
                    // Deliberately ignored: the plain error message is reported instead.
                }

                String errorMsg = e.getHttpError() != null ? e.getHttpError() : e.getMessage();
                String finalErrorMsg = errorMsg;
                if (sub != null) {
                    try {
                        finalErrorMsg = String.format("%s - existing consumers: %s", errorMsg,
                                ObjectMapperFactory.getMapper().writer().writeValueAsString(sub));
                    } catch (JsonProcessingException ignored) {
                        // Deliberately ignored: keep the plain error message.
                    }
                }
                return Actions.ActionResult.builder().success(false).errorMsg(finalErrorMsg).build();
            }
        };
    }

    /**
     * Returns a supplier that performs one attempt at force-deleting {@code topic}.
     *
     * <p>A "not found" answer counts as success. On any other admin failure the result is unsuccessful and
     * carries the HTTP error (or, if absent, the exception message), extended with the JSON-serialized
     * topic stats when they can be fetched and serialized.
     */
    private Supplier<Actions.ActionResult> getDeleteTopicSupplier(String topic) {
        return () -> {
            try {
                this.pulsarAdmin.topics().delete(topic, true);
                return Actions.ActionResult.builder().success(true).build();
            } catch (PulsarAdminException.NotFoundException e) {
                // The topic is already gone.
                return Actions.ActionResult.builder().success(true).build();
            } catch (PulsarAdminException e) {
                // Topic stats are fetched only to enrich the error message.
                TopicStats topicStats = null;
                try {
                    topicStats = this.pulsarAdmin.topics().getStats(topic);
                } catch (PulsarAdminException ignored) {
                    // Deliberately ignored: the plain error message is reported instead.
                }

                String baseMsg = e.getHttpError() != null ? e.getHttpError() : e.getMessage();
                String reportedMsg = baseMsg;
                if (topicStats != null) {
                    try {
                        reportedMsg = String.format("%s - topic stats: %s", baseMsg,
                                ObjectMapperFactory.getMapper().writer().writeValueAsString(topicStats));
                    } catch (JsonProcessingException ignored) {
                        // Deliberately ignored: keep the plain error message.
                    }
                }
                return Actions.ActionResult.builder().success(false).errorMsg(reportedMsg).build();
            }
        };
    }

    /**
     * Builds the relative directory of an instance's downloads:
     * {@code <tenant>/<namespace>/<name>/<instanceId>}, joined with the platform file separator.
     */
    private String getDownloadPackagePath(Function.FunctionMetaData functionMetaData, int instanceId) {
        Function.FunctionDetails details = functionMetaData.getFunctionDetails();
        return String.join(File.separator,
                details.getTenant(),
                details.getNamespace(),
                details.getName(),
                Integer.toString(instanceId));
    }

    /**
     * Locates the archive of a built-in source, sink or function.
     *
     * <p>For a built-in source or sink, the connector's source/sink class is also written into
     * {@code functionDetails}, and the matching type class names are filled in.
     *
     * @param componentType   which kind of component is being resolved
     * @param functionDetails builder that is updated in place for sources and sinks
     * @return the archive file of the built-in component
     * @throws IOException if no built-in definition matches {@code componentType}
     */
    private File getBuiltinArchive(Function.FunctionDetails.ComponentType componentType, Function.FunctionDetails.Builder functionDetails) throws IOException, ClassNotFoundException {
        if (componentType == Function.FunctionDetails.ComponentType.SOURCE && functionDetails.hasSource()) {
            Function.SourceSpec sourceSpec = functionDetails.getSource();
            if (!org.apache.commons.lang3.StringUtils.isEmpty(sourceSpec.getBuiltin())) {
                Connector connector = this.connectorsManager.getConnector(sourceSpec.getBuiltin());
                File archive = connector.getArchivePath().toFile();
                String sourceClass = connector.getConnectorDefinition().getSourceClass();
                functionDetails.setSource(
                        Function.SourceSpec.newBuilder(functionDetails.getSource()).setClassName(sourceClass));
                this.fillSourceTypeClass(functionDetails, connector.getConnectorFunctionPackage(), sourceClass);
                return archive;
            }
        }

        if (componentType == Function.FunctionDetails.ComponentType.SINK && functionDetails.hasSink()) {
            Function.SinkSpec sinkSpec = functionDetails.getSink();
            if (!org.apache.commons.lang3.StringUtils.isEmpty(sinkSpec.getBuiltin())) {
                Connector connector = this.connectorsManager.getConnector(sinkSpec.getBuiltin());
                File archive = connector.getArchivePath().toFile();
                String sinkClass = connector.getConnectorDefinition().getSinkClass();
                functionDetails.setSink(
                        Function.SinkSpec.newBuilder(functionDetails.getSink()).setClassName(sinkClass));
                this.fillSinkTypeClass(functionDetails, connector.getConnectorFunctionPackage(), sinkClass);
                return archive;
            }
        }

        boolean builtinFunction = componentType == Function.FunctionDetails.ComponentType.FUNCTION
                && !org.apache.commons.lang3.StringUtils.isEmpty(functionDetails.getBuiltin());
        if (builtinFunction) {
            return this.functionsManager.getFunctionArchive(functionDetails.getBuiltin()).toFile();
        }

        throw new IOException("Could not find built in archive definition");
    }

    /**
     * Sets the source's type class name to the erased source type of {@code className}, and uses the same
     * name for the sink when the sink has no type class name yet.
     *
     * @param functionDetails builder updated in place
     * @param functionPackage package whose type pool is used to resolve {@code className}
     * @param className       name of the source class
     */
    private void fillSourceTypeClass(Function.FunctionDetails.Builder functionDetails, ValidatableFunctionPackage functionPackage, String className) {
        String typeArg = FunctionCommon.getSourceType(className, functionPackage.getTypePool()).asErasure().getName();

        functionDetails.setSource(
                Function.SourceSpec.newBuilder(functionDetails.getSource()).setTypeClassName(typeArg));

        Function.SinkSpec sinkSpec = functionDetails.getSink();
        boolean sinkTypeMissing =
                sinkSpec == null || org.apache.commons.lang3.StringUtils.isEmpty(sinkSpec.getTypeClassName());
        if (sinkTypeMissing) {
            functionDetails.setSink(Function.SinkSpec.newBuilder(sinkSpec).setTypeClassName(typeArg));
        }
    }

    /**
     * Sets the sink's type class name to the erased sink type of {@code className}, and uses the same
     * name for the source when the source has no type class name yet.
     *
     * @param functionDetails builder updated in place
     * @param functionPackage package whose type pool is used to resolve {@code className}
     * @param className       name of the sink class
     */
    private void fillSinkTypeClass(Function.FunctionDetails.Builder functionDetails, ValidatableFunctionPackage functionPackage, String className) {
        String typeArg = FunctionCommon.getSinkType(className, functionPackage.getTypePool()).asErasure().getName();

        functionDetails.setSink(
                Function.SinkSpec.newBuilder(functionDetails.getSink()).setTypeClassName(typeArg));

        Function.SourceSpec sourceSpec = functionDetails.getSource();
        boolean sourceTypeMissing =
                sourceSpec == null || org.apache.commons.lang3.StringUtils.isEmpty(sourceSpec.getTypeClassName());
        if (sourceTypeMissing) {
            functionDetails.setSource(Function.SourceSpec.newBuilder(sourceSpec).setTypeClassName(typeArg));
        }
    }

    /**
     * Chooses the file name under which a package is stored locally.
     *
     * <p>The original file name recorded in {@code packageLocation} wins when present. Otherwise the name
     * is derived from the dot-separated class name (the whole name if splitting yields nothing, the only
     * segment if there is one, else the second-to-last segment) plus an extension chosen by runtime:
     * {@code .jar}, {@code .py} or {@code .go}.
     *
     * @throws RuntimeException if the runtime is none of JAVA, PYTHON, GO
     */
    private static String getDownloadFileName(Function.FunctionDetails functionDetails, Function.PackageLocationMetaData packageLocation) {
        String originalFileName = packageLocation.getOriginalFileName();
        if (!StringUtils.isEmpty(originalFileName)) {
            return originalFileName;
        }

        String className = functionDetails.getClassName();
        String[] segments = className.split("\\.");
        String baseName;
        if (segments.length == 0) {
            baseName = className;
        } else if (segments.length == 1) {
            baseName = segments[0];
        } else {
            baseName = segments[segments.length - 2];
        }

        switch (functionDetails.getRuntime()) {
            case JAVA:
                return baseName + ".jar";
            case PYTHON:
                return baseName + ".py";
            case GO:
                return baseName + ".go";
            default:
                throw new RuntimeException("Unknown runtime " + functionDetails.getRuntime());
        }
    }

    /**
     * For a batch source, creates the instance subscription on the source's intermediate topic, positioned
     * at the latest message. Does nothing for any other component.
     *
     * <p>The creation is retried up to 10 times with a 1000 ms pause; a conflict answer from the broker
     * counts as success.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is interrupted;
     *                          the thread's interrupt flag is restored before throwing
     */
    private void setupBatchSource(Function.FunctionDetails functionDetails) {
        if (!FunctionActioner.isBatchSource(functionDetails)) {
            return;
        }

        String tenant = functionDetails.getTenant();
        String namespace = functionDetails.getNamespace();
        String name = functionDetails.getName();
        String intermediateTopicName =
                SourceConfigUtils.computeBatchSourceIntermediateTopicName(tenant, namespace, name).toString();
        String intermediateTopicSubscription =
                SourceConfigUtils.computeBatchSourceInstanceSubscriptionName(tenant, namespace, name);
        String fqfn = FunctionCommon.getFullyQualifiedName(tenant, namespace, name);
        // The original pattern read "topic%s" and glued the topic name onto the preceding word.
        String actionName = String.format("Creating intermediate topic %s with subscription %s for Batch Source %s",
                intermediateTopicName, intermediateTopicSubscription, fqfn);

        try {
            Actions.newBuilder()
                    .addAction(Actions.Action.builder()
                            .actionName(actionName)
                            .numRetries(10)
                            .sleepBetweenInvocationsMs(1000L)
                            .supplier(() -> {
                                try {
                                    this.pulsarAdmin.topics().createSubscription(
                                            intermediateTopicName, intermediateTopicSubscription, MessageId.latest);
                                    return Actions.ActionResult.builder().success(true).build();
                                } catch (PulsarAdminException.ConflictException e) {
                                    // A conflict is treated as "nothing to do".
                                    return Actions.ActionResult.builder().success(true).build();
                                } catch (Exception e) {
                                    return Actions.ActionResult.builder().errorMsg(e.getMessage()).success(false).build();
                                }
                            })
                            .build())
                    .run();
        } catch (InterruptedException e) {
            // Wrapping alone would hide the interruption from code further up the stack.
            Thread.currentThread().interrupt();
            log.error("Error setting up instance subscription for intermediate topic", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * For a batch source, removes the instance subscription from the intermediate topic and then deletes
     * the intermediate topic. Does nothing for any other component.
     *
     * <p>Each of the two steps is retried up to 10 times with a 1000 ms pause.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is interrupted;
     *                          the thread's interrupt flag is restored before throwing
     */
    private void cleanupBatchSource(Function.FunctionDetails functionDetails) {
        if (!FunctionActioner.isBatchSource(functionDetails)) {
            return;
        }

        String tenant = functionDetails.getTenant();
        String namespace = functionDetails.getNamespace();
        String name = functionDetails.getName();
        String intermediateTopicName =
                SourceConfigUtils.computeBatchSourceIntermediateTopicName(tenant, namespace, name).toString();
        String intermediateTopicSubscription =
                SourceConfigUtils.computeBatchSourceInstanceSubscriptionName(tenant, namespace, name);
        String fqfn = FunctionCommon.getFullyQualifiedName(functionDetails);

        try {
            Actions.newBuilder()
                    .addAction(Actions.Action.builder()
                            .actionName(String.format("Removing intermediate topic subscription %s for Batch Source %s",
                                    intermediateTopicSubscription, fqfn))
                            .numRetries(10)
                            .sleepBetweenInvocationsMs(1000L)
                            .supplier(this.getDeleteSubscriptionSupplier(intermediateTopicName, false, intermediateTopicSubscription))
                            .build())
                    .addAction(Actions.Action.builder()
                            .actionName(String.format("Deleting intermediate topic %s for Batch Source %s",
                                    intermediateTopicName, fqfn))
                            .numRetries(10)
                            .sleepBetweenInvocationsMs(1000L)
                            .supplier(this.getDeleteTopicSupplier(intermediateTopicName))
                            .build())
                    .run();
        } catch (InterruptedException e) {
            // Wrapping alone would hide the interruption from code further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Tells whether the function is a batch source: its subject type is SOURCE, a source config can be
     * extracted from its source spec, and a batch source config can be extracted from that.
     */
    private static boolean isBatchSource(Function.FunctionDetails functionDetails) {
        if (InstanceUtils.calculateSubjectType(functionDetails) != Function.FunctionDetails.ComponentType.SOURCE) {
            return false;
        }
        String fqfn = FunctionCommon.getFullyQualifiedName(functionDetails);
        // Raw type kept: the generic signatures of the SourceConfigUtils helpers are not visible here.
        Map sourceConfig = SourceConfigUtils.extractSourceConfig(functionDetails.getSource(), fqfn);
        return sourceConfig != null && SourceConfigUtils.extractBatchSourceConfig(sourceConfig) != null;
    }

    /** Returns the worker configuration this actioner was constructed with. */
    @Generated
    public WorkerConfig getWorkerConfig() {
        return workerConfig;
    }

    /** Returns the runtime factory this actioner was constructed with. */
    @Generated
    public RuntimeFactory getRuntimeFactory() {
        return runtimeFactory;
    }

    /**
     * Returns the DistributedLog {@link Namespace} held by this actioner.
     *
     * @return the value of the {@code dlogNamespace} field
     */
    @Generated
    public Namespace getDlogNamespace() {
        return dlogNamespace;
    }

    /**
     * Returns the connectors manager held by this actioner.
     *
     * @return the value of the {@code connectorsManager} field
     */
    @Generated
    public ConnectorsManager getConnectorsManager() {
        return connectorsManager;
    }

    /**
     * Returns the functions manager held by this actioner.
     *
     * @return the value of the {@code functionsManager} field
     */
    @Generated
    public FunctionsManager getFunctionsManager() {
        return functionsManager;
    }

    /**
     * Returns the Pulsar admin client held by this actioner.
     *
     * @return the value of the {@code pulsarAdmin} field
     */
    @Generated
    public PulsarAdmin getPulsarAdmin() {
        return pulsarAdmin;
    }

    /**
     * Returns the package URL validator held by this actioner.
     *
     * @return the value of the {@code packageUrlValidator} field
     */
    @Generated
    public PackageUrlValidator getPackageUrlValidator() {
        return packageUrlValidator;
    }

    /**
     * Compares this actioner with another object field by field.
     *
     * <p>Two actioners are equal when the other object is a
     * {@code FunctionActioner} that accepts this instance through
     * {@link #canEqual(Object)} and every compared property (worker config,
     * runtime factory, dlog namespace, connectors manager, functions manager,
     * Pulsar admin, package URL validator) is either {@code null} on both sides
     * or equal according to its own {@code equals}.
     *
     * @param o the object to compare against; may be {@code null}
     * @return {@code true} if {@code o} is an equal {@code FunctionActioner}
     */
    @Generated
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof FunctionActioner)) {
            return false;
        }
        FunctionActioner that = (FunctionActioner)o;
        if (!that.canEqual(this)) {
            return false;
        }
        // Properties are listed in the order in which they are compared.
        Object[] mine = {
            this.getWorkerConfig(),
            this.getRuntimeFactory(),
            this.getDlogNamespace(),
            this.getConnectorsManager(),
            this.getFunctionsManager(),
            this.getPulsarAdmin(),
            this.getPackageUrlValidator()
        };
        Object[] theirs = {
            that.getWorkerConfig(),
            that.getRuntimeFactory(),
            that.getDlogNamespace(),
            that.getConnectorsManager(),
            that.getFunctionsManager(),
            that.getPulsarAdmin(),
            that.getPackageUrlValidator()
        };
        for (int i = 0; i < mine.length; ++i) {
            Object left = mine[i];
            Object right = theirs[i];
            // Null-safe comparison: a null only matches another null.
            boolean same = left == null ? right == null : left.equals(right);
            if (!same) {
                return false;
            }
        }
        return true;
    }

    /**
     * Reports whether the given object may be compared with this class in
     * {@link #equals(Object)}.
     *
     * @param other the candidate object; may be {@code null}
     * @return {@code true} if {@code other} is an instance of {@code FunctionActioner}
     */
    @Generated
    protected boolean canEqual(Object other) {
        return FunctionActioner.class.isInstance(other);
    }

    /**
     * Computes a hash code from the same properties that {@link #equals(Object)}
     * compares, in the same order.
     *
     * <p>Starting from 1, each property folds in as
     * {@code hash * 59 + propertyHash}, where a {@code null} property
     * contributes 43.
     *
     * @return the combined hash code
     */
    @Generated
    public int hashCode() {
        final int multiplier = 59;
        final int nullHash = 43;
        Object[] properties = {
            this.getWorkerConfig(),
            this.getRuntimeFactory(),
            this.getDlogNamespace(),
            this.getConnectorsManager(),
            this.getFunctionsManager(),
            this.getPulsarAdmin(),
            this.getPackageUrlValidator()
        };
        int hash = 1;
        for (Object property : properties) {
            hash = hash * multiplier + (property == null ? nullHash : property.hashCode());
        }
        return hash;
    }

    /**
     * Builds a textual representation of the form
     * {@code FunctionActioner(name=value, ...)}, listing each property
     * exposed by the getters of this class.
     *
     * @return the string representation of this actioner
     */
    @Generated
    public String toString() {
        // StringBuilder.append(Object) renders null as "null", like String.valueOf.
        StringBuilder text = new StringBuilder();
        text.append("FunctionActioner(workerConfig=").append(this.getWorkerConfig());
        text.append(", runtimeFactory=").append(this.getRuntimeFactory());
        text.append(", dlogNamespace=").append(this.getDlogNamespace());
        text.append(", connectorsManager=").append(this.getConnectorsManager());
        text.append(", functionsManager=").append(this.getFunctionsManager());
        text.append(", pulsarAdmin=").append(this.getPulsarAdmin());
        text.append(", packageUrlValidator=").append(this.getPackageUrlValidator());
        text.append(")");
        return text.toString();
    }
}

