/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker.rest.api;

import com.google.common.base.Utf8;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.Base64;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriBuilder;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationParameters;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl;
import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.api.state.StateValue;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.instance.state.DefaultStateStore;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.rest.RestUtils;
import org.apache.pulsar.functions.worker.service.api.Component;
import org.apache.pulsar.packages.management.core.common.PackageMetadata;
import org.apache.pulsar.packages.management.core.common.PackageName;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ComponentImpl
implements Component<PulsarWorkerService> {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(ComponentImpl.class);
    /** Source of the worker service; it may yield {@code null}, which {@code isWorkerServiceAvailable()} checks for. */
    protected final Supplier<PulsarWorkerService> workerServiceSupplier;
    /** Component type served by this instance; used for type checks and in log and error messages. */
    protected final Function.FunctionDetails.ComponentType componentType;

    /**
     * Binds this component handler to a worker-service supplier and a component type.
     *
     * @param workerServiceSupplier supplier queried each time the worker service is needed
     * @param componentType component type handled by this instance
     */
    public ComponentImpl(Supplier<PulsarWorkerService> workerServiceSupplier, Function.FunctionDetails.ComponentType componentType) {
        this.componentType = componentType;
        this.workerServiceSupplier = workerServiceSupplier;
    }

    /**
     * Resolves the worker service from the configured supplier.
     *
     * <p>Any failure (including a {@code null} result, which surfaces as a
     * {@link NullPointerException}) is logged at info level and rethrown unchanged.
     *
     * @return the non-null worker service
     */
    @Override
    public PulsarWorkerService worker() {
        PulsarWorkerService service;
        try {
            service = Objects.requireNonNull(workerServiceSupplier.get());
        } catch (Throwable failure) {
            log.info("Failed to get worker service", failure);
            throw failure;
        }
        return service;
    }

    /**
     * Reports whether the supplier currently yields a worker service that is initialized.
     *
     * @return {@code false} when the supplier yields {@code null}; otherwise the service's
     *         {@code isInitialized()} result
     */
    boolean isWorkerServiceAvailable() {
        WorkerService candidate = workerServiceSupplier.get();
        return candidate != null && candidate.isInitialized();
    }

    /**
     * Convenience overload that derives the component name and built-in code reference from the
     * metadata's function details and uses this instance's component type.
     *
     * @param functionMetaData metadata of the component being registered or updated
     * @param functionPkgUrl package URL supplied by the caller, may be blank
     * @param fileDetail multipart details of an uploaded file
     * @param uploadedInputStreamAsFile local file holding the package contents
     * @return builder describing where the package lives
     * @throws Exception if the delegate fails
     */
    Function.PackageLocationMetaData.Builder getFunctionPackageLocation(Function.FunctionMetaData functionMetaData, String functionPkgUrl, FormDataContentDisposition fileDetail, File uploadedInputStreamAsFile) throws Exception {
        Function.FunctionDetails details = functionMetaData.getFunctionDetails();
        String name = details.getName();
        String builtinCode = getFunctionCodeBuiltin(details, componentType);
        return getFunctionPackageLocation(functionMetaData, functionPkgUrl, fileDetail, uploadedInputStreamAsFile, name, componentType, builtinCode);
    }

    /**
     * Decides where a component's package lives and, when required, uploads it.
     *
     * <p>When the runtime factory is externally managed, the package is uploaded (to package
     * management if enabled, otherwise to BookKeeper) for built-ins with upload enabled, for
     * caller-supplied URLs, for existing {@code http}/{@code file} locations and for multipart
     * uploads. Otherwise built-ins, caller-supplied URLs and existing {@code http}/{@code file}
     * locations are recorded as-is and only multipart uploads are stored.
     *
     * @param functionMetaData metadata of the component; supplies tenant, namespace, version,
     *        create time and the currently stored package location
     * @param functionPkgUrl package URL supplied by the caller, may be blank
     * @param fileDetail multipart details; its file name is used for direct uploads
     * @param uploadedInputStreamAsFile local file holding the package contents to upload
     * @param componentName name of the component
     * @param componentType type of the component
     * @param builtin name of the built-in archive, or empty when not a built-in
     * @return builder populated with the package path and, where known, the original file name
     * @throws Exception if URL parsing or any upload fails
     */
    Function.PackageLocationMetaData.Builder getFunctionPackageLocation(Function.FunctionMetaData functionMetaData, String functionPkgUrl, FormDataContentDisposition fileDetail, File uploadedInputStreamAsFile, String componentName, Function.FunctionDetails.ComponentType componentType, String builtin) throws Exception {
        Function.FunctionDetails functionDetails = functionMetaData.getFunctionDetails();
        String tenant = functionDetails.getTenant();
        String namespace = functionDetails.getNamespace();
        Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder = Function.PackageLocationMetaData.newBuilder();
        boolean isPkgUrlProvided = StringUtils.isNotBlank(functionPkgUrl);
        boolean isPackageManagementEnabled = worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement();
        PackageName packageName = PackageName.get(componentType.name(), tenant, namespace, componentName, String.valueOf(functionMetaData.getVersion()));
        PackageMetadata metadata = PackageMetadata.builder().createTime(functionMetaData.getCreateTime()).build();
        if (worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
            if (!StringUtils.isEmpty(builtin)) {
                // Unqualified enum labels compile on every Java version that has switch expressions.
                File component = switch (componentType) {
                    case SOURCE -> worker().getConnectorsManager().getSourceArchive(builtin).toFile();
                    case SINK -> worker().getConnectorsManager().getSinkArchive(builtin).toFile();
                    default -> worker().getFunctionsManager().getFunctionArchive(builtin).toFile();
                };
                packageLocationMetaDataBuilder.setOriginalFileName(component.getName());
                if (worker().getWorkerConfig().getUploadBuiltinSinksSources()) {
                    // The package path is set exactly once per branch; the earlier unconditional
                    // assignment was always overwritten before being read.
                    if (isPackageManagementEnabled) {
                        packageLocationMetaDataBuilder.setPackagePath(packageName.toString());
                        worker().getBrokerAdmin().packages().upload(metadata, packageName.toString(), component.getAbsolutePath());
                    } else {
                        packageLocationMetaDataBuilder.setPackagePath(ComponentImpl.createPackagePath(tenant, namespace, componentName, component.getName()));
                        WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), component, worker().getDlogNamespace());
                    }
                    log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath());
                } else {
                    log.info("Skipping upload for the built-in package {}", ComponentTypeUtils.toString(componentType));
                    packageLocationMetaDataBuilder.setPackagePath("builtin://" + builtin);
                }
            } else if (isPkgUrlProvided) {
                packageLocationMetaDataBuilder.setOriginalFileName(uploadedInputStreamAsFile.getName());
                if (isPackageManagementEnabled) {
                    // A URL that already carries a package-type prefix is referenced as-is; anything
                    // else is uploaded under the generated package name.
                    packageLocationMetaDataBuilder.setPackagePath(functionPkgUrl);
                    if (!Utils.hasPackageTypePrefix(functionPkgUrl)) {
                        packageLocationMetaDataBuilder.setPackagePath(packageName.toString());
                        worker().getBrokerAdmin().packages().upload(metadata, packageName.toString(), uploadedInputStreamAsFile.getAbsolutePath());
                    }
                } else {
                    packageLocationMetaDataBuilder.setPackagePath(ComponentImpl.createPackagePath(tenant, namespace, componentName, uploadedInputStreamAsFile.getName()));
                    WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
                }
                log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath());
            } else if (functionMetaData.getPackageLocation().getPackagePath().startsWith("http") || functionMetaData.getPackageLocation().getPackagePath().startsWith("file")) {
                String existingPackagePath = functionMetaData.getPackageLocation().getPackagePath();
                URI existingPackageUri = new URL(existingPackagePath).toURI();
                String fileName;
                if ("file".equalsIgnoreCase(existingPackageUri.getScheme())) {
                    fileName = new File(existingPackageUri).getName();
                } else {
                    // File(URI) rejects every scheme except "file", so for http(s) locations the
                    // file name is taken from the URL's path component instead.
                    String urlPath = existingPackageUri.getPath();
                    if (urlPath == null) {
                        throw new IllegalArgumentException("Package URL has no path component: " + existingPackagePath);
                    }
                    fileName = new File(urlPath).getName();
                }
                packageLocationMetaDataBuilder.setOriginalFileName(fileName);
                if (isPackageManagementEnabled) {
                    packageLocationMetaDataBuilder.setPackagePath(packageName.toString());
                    worker().getBrokerAdmin().packages().upload(metadata, packageName.toString(), uploadedInputStreamAsFile.getAbsolutePath());
                } else {
                    packageLocationMetaDataBuilder.setPackagePath(ComponentImpl.createPackagePath(tenant, namespace, componentName, fileName));
                    WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
                }
                log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath());
            } else {
                packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
                if (isPackageManagementEnabled) {
                    packageLocationMetaDataBuilder.setPackagePath(packageName.toString());
                    worker().getBrokerAdmin().packages().upload(metadata, packageName.toString(), uploadedInputStreamAsFile.getAbsolutePath());
                } else {
                    packageLocationMetaDataBuilder.setPackagePath(ComponentImpl.createPackagePath(tenant, namespace, componentName, fileDetail.getFileName()));
                    WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
                }
                log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath());
            }
        } else if (!StringUtils.isEmpty(builtin)) {
            packageLocationMetaDataBuilder.setPackagePath("builtin://" + builtin);
        } else if (isPkgUrlProvided) {
            packageLocationMetaDataBuilder.setPackagePath(functionPkgUrl);
        } else if (functionMetaData.getPackageLocation().getPackagePath().startsWith("http") || functionMetaData.getPackageLocation().getPackagePath().startsWith("file")) {
            packageLocationMetaDataBuilder.setPackagePath(functionMetaData.getPackageLocation().getPackagePath());
        } else {
            packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
            if (isPackageManagementEnabled) {
                packageLocationMetaDataBuilder.setPackagePath(packageName.toString());
                worker().getBrokerAdmin().packages().upload(metadata, packageName.toString(), uploadedInputStreamAsFile.getAbsolutePath());
            } else {
                packageLocationMetaDataBuilder.setPackagePath(ComponentImpl.createPackagePath(tenant, namespace, componentName, fileDetail.getFileName()));
                WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), uploadedInputStreamAsFile, worker().getDlogNamespace());
            }
            log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), packageLocationMetaDataBuilder.getPackagePath());
        }
        return packageLocationMetaDataBuilder;
    }

    /**
     * Deregisters a component: validates the request, submits a version-bumped metadata delete,
     * removes stored packages and finally cleans up the component's state store.
     *
     * <p>Responds with {@code BAD_REQUEST} for invalid parameters and {@code NOT_FOUND} when the
     * component is missing or is of a different component type. State-store cleanup failures are
     * logged and not propagated.
     *
     * @param tenant tenant of the component
     * @param namespace namespace of the component
     * @param componentName name of the component
     * @param authParams authentication parameters of the caller
     */
    @Override
    public void deregisterFunction(String tenant, String namespace, String componentName, AuthenticationParameters authParams) {
        if (!isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "deregister", authParams);
        try {
            validateDeregisterRequestParams(tenant, namespace, componentName, componentType);
        } catch (IllegalArgumentException invalid) {
            log.error("Invalid deregister {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, invalid);
            throw new RestException(Response.Status.BAD_REQUEST, invalid.getMessage());
        }

        FunctionMetaDataManager metaDataManager = worker().getFunctionMetaDataManager();
        if (!metaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.error("{} to deregister does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
        }

        Function.FunctionMetaData existing = metaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!InstanceUtils.calculateSubjectType(existing.getFunctionDetails()).equals(componentType)) {
            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
        }

        // Submit the delete using metadata whose version has been incremented.
        Function.FunctionMetaData bumped = FunctionMetaDataUtils.incrMetadataVersion(existing, existing);
        Function.FunctionDetails bumpedDetails = bumped.getFunctionDetails();
        internalProcessFunctionRequest(
                bumpedDetails.getTenant(),
                bumpedDetails.getNamespace(),
                bumpedDetails.getName(),
                bumped,
                true,
                String.format("Error deleting %s @ /%s/%s/%s", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName));

        deleteComponentFromStorage(tenant, namespace, componentName, existing.getPackageLocation().getPackagePath());
        String transformPackagePath = existing.getTransformFunctionPackageLocation().getPackagePath();
        if (!StringUtils.isEmpty(transformPackagePath)) {
            deleteComponentFromStorage(tenant, namespace, componentName, transformPackagePath);
        }

        if (worker().getStateStoreProvider() == null) {
            return;
        }
        try {
            worker().getStateStoreProvider().cleanUp(tenant, namespace, componentName);
        } catch (Throwable cleanupFailure) {
            // Best effort: the component is already deregistered at this point.
            log.error("failed to clean up the state store for {}/{}/{}", tenant, namespace, componentName, cleanupFailure);
        }
    }

    /**
     * Deletes a package from BookKeeper unless its path starts with {@code http}, {@code file} or
     * {@code builtin}, or package management is enabled. I/O failures are logged, not propagated.
     *
     * @param tenant tenant of the component (used for logging)
     * @param namespace namespace of the component (used for logging)
     * @param componentName name of the component (used for logging)
     * @param functionPackagePath stored package path to delete
     */
    private void deleteComponentFromStorage(String tenant, String namespace, String componentName, String functionPackagePath) {
        boolean storedOutsideBookkeeper = functionPackagePath.startsWith("http")
                || functionPackagePath.startsWith("file")
                || functionPackagePath.startsWith("builtin");
        // The worker config is consulted only when the path prefix did not already rule BookKeeper out.
        if (storedOutsideBookkeeper || worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) {
            return;
        }
        try {
            WorkerUtils.deleteFromBookkeeper(worker().getDlogNamespace(), functionPackagePath);
        } catch (IOException ioFailure) {
            log.error("{}/{}/{} Failed to cleanup package in BK with path {}", tenant, namespace, componentName, functionPackagePath, ioFailure);
        }
    }

    /**
     * Returns the configuration of a registered component.
     *
     * <p>Responds with {@code BAD_REQUEST} for invalid parameters and {@code NOT_FOUND} when the
     * component is missing or is of a different component type.
     *
     * @param tenant tenant of the component
     * @param namespace namespace of the component
     * @param componentName name of the component
     * @param authParams authentication parameters of the caller
     * @return configuration converted from the stored function details
     */
    @Override
    public FunctionConfig getFunctionInfo(String tenant, String namespace, String componentName, AuthenticationParameters authParams) {
        if (!isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "get", authParams);
        try {
            validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
        } catch (IllegalArgumentException invalid) {
            log.error("Invalid get {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, invalid);
            throw new RestException(Response.Status.BAD_REQUEST, invalid.getMessage());
        }

        FunctionMetaDataManager metaDataManager = worker().getFunctionMetaDataManager();
        if (!metaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
            throw new RestException(Response.Status.NOT_FOUND, String.format(ComponentTypeUtils.toString(componentType) + " %s doesn't exist", componentName));
        }

        Function.FunctionDetails storedDetails = metaDataManager.getFunctionMetaData(tenant, namespace, componentName).getFunctionDetails();
        if (!InstanceUtils.calculateSubjectType(storedDetails).equals(componentType)) {
            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
            throw new RestException(Response.Status.NOT_FOUND, String.format(ComponentTypeUtils.toString(componentType) + " %s doesn't exist", componentName));
        }
        return FunctionConfigUtils.convertFromDetails(storedDetails);
    }

    /**
     * Stops a single instance by delegating to {@code changeFunctionInstanceStatus} with
     * {@code start = false}.
     */
    @Override
    public void stopFunctionInstance(String tenant, String namespace, String componentName, String instanceId, URI uri, AuthenticationParameters authParams) {
        final boolean start = false;
        changeFunctionInstanceStatus(tenant, namespace, componentName, instanceId, start, uri, authParams);
    }

    /**
     * Starts a single instance by delegating to {@code changeFunctionInstanceStatus} with
     * {@code start = true}.
     */
    @Override
    public void startFunctionInstance(String tenant, String namespace, String componentName, String instanceId, URI uri, AuthenticationParameters authParams) {
        final boolean start = true;
        changeFunctionInstanceStatus(tenant, namespace, componentName, instanceId, start, uri, authParams);
    }

    /**
     * Legacy overload that wraps the role and authentication data into
     * {@link AuthenticationParameters} and delegates to the overload accepting them.
     *
     * @deprecated use the overload that takes {@link AuthenticationParameters}
     */
    @Deprecated
    public void changeFunctionInstanceStatus(String tenant, String namespace, String componentName, String instanceId, boolean start, URI uri, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        changeFunctionInstanceStatus(
                tenant,
                namespace,
                componentName,
                instanceId,
                start,
                uri,
                AuthenticationParameters.builder()
                        .clientRole(clientRole)
                        .clientAuthenticationDataSource(clientAuthenticationDataHttps)
                        .build());
    }

    /**
     * Starts or stops one instance of a component by submitting updated metadata.
     *
     * <p>Responds with {@code BAD_REQUEST} for invalid parameters, for an instance id that is not
     * an integer, or when the state change is not permitted; and with {@code NOT_FOUND} when the
     * component is missing or is of a different component type.
     *
     * @param tenant tenant of the component
     * @param namespace namespace of the component
     * @param componentName name of the component
     * @param instanceId decimal instance id
     * @param start {@code true} to start the instance, {@code false} to stop it
     * @param uri request URI (not used by this method)
     * @param authParams authentication parameters of the caller
     */
    public void changeFunctionInstanceStatus(String tenant, String namespace, String componentName, String instanceId, boolean start, URI uri, AuthenticationParameters authParams) {
        if (!isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "start/stop", authParams);
        try {
            validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
        } catch (IllegalArgumentException e) {
            log.error("Invalid start/stop {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
        }
        // Parse the caller-supplied id once, and report malformed input as a client error rather
        // than letting NumberFormatException escape the handler.
        final int parsedInstanceId;
        try {
            parsedInstanceId = Integer.parseInt(instanceId);
        } catch (NumberFormatException e) {
            log.error("Invalid start/stop {} request @ /{}/{}/{}: instance id {} is not an integer", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, instanceId, e);
            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s instance id must be an integer: %s", ComponentTypeUtils.toString(componentType), instanceId));
        }
        Function.FunctionState targetState = start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED;
        if (!FunctionMetaDataUtils.canChangeState(functionMetaData, parsedInstanceId, targetState)) {
            log.error("Operation not permitted on {}/{}/{}", tenant, namespace, componentName);
            throw new RestException(Response.Status.BAD_REQUEST, "Operation not permitted");
        }
        Function.FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, parsedInstanceId, start);
        internalProcessFunctionRequest(tenant, namespace, componentName, newFunctionMetaData, false, String.format("Failed to start/stop %s: %s/%s/%s/%s", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, instanceId));
    }

    /**
     * Restarts one instance of a component through the function runtime manager.
     *
     * <p>Responds with {@code BAD_REQUEST} for invalid parameters and {@code NOT_FOUND} when the
     * component is missing or is of a different component type. A
     * {@link WebApplicationException} from the restart is rethrown unchanged; any other exception
     * is logged and reported as {@code INTERNAL_SERVER_ERROR}.
     *
     * @param tenant tenant of the component
     * @param namespace namespace of the component
     * @param componentName name of the component
     * @param instanceId decimal instance id
     * @param uri request URI, forwarded to the runtime manager
     * @param authParams authentication parameters of the caller
     */
    @Override
    public void restartFunctionInstance(String tenant, String namespace, String componentName, String instanceId, URI uri, AuthenticationParameters authParams) {
        if (!isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "restart", authParams);
        try {
            validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
        } catch (IllegalArgumentException invalid) {
            log.error("Invalid restart {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, invalid);
            throw new RestException(Response.Status.BAD_REQUEST, invalid.getMessage());
        }

        FunctionMetaDataManager metaDataManager = worker().getFunctionMetaDataManager();
        if (!metaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
        }

        Function.FunctionMetaData stored = metaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!InstanceUtils.calculateSubjectType(stored.getFunctionDetails()).equals(componentType)) {
            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
        }

        FunctionRuntimeManager runtimeManager = worker().getFunctionRuntimeManager();
        try {
            runtimeManager.restartFunctionInstance(tenant, namespace, componentName, Integer.parseInt(instanceId), uri);
        } catch (Exception failure) {
            // Web application exceptions already carry their HTTP status; pass them through untouched.
            if (failure instanceof WebApplicationException webFailure) {
                throw webFailure;
            }
            log.error("Failed to restart {}: {}/{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, instanceId, failure);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, failure.getMessage());
        }
    }

    /**
     * Stops all instances by delegating to {@code changeFunctionStatusAllInstances} with
     * {@code start = false}.
     */
    @Override
    public void stopFunctionInstances(String tenant, String namespace, String componentName, AuthenticationParameters authParams) {
        final boolean start = false;
        changeFunctionStatusAllInstances(tenant, namespace, componentName, start, authParams);
    }

    /**
     * Starts all instances by delegating to {@code changeFunctionStatusAllInstances} with
     * {@code start = true}.
     */
    @Override
    public void startFunctionInstances(String tenant, String namespace, String componentName, AuthenticationParameters authParams) {
        final boolean start = true;
        changeFunctionStatusAllInstances(tenant, namespace, componentName, start, authParams);
    }

    /**
     * Legacy overload that wraps the role and authentication data into
     * {@link AuthenticationParameters} and delegates to the overload accepting them.
     *
     * @deprecated use the overload that takes {@link AuthenticationParameters}
     */
    @Deprecated
    public void changeFunctionStatusAllInstances(String tenant, String namespace, String componentName, boolean start, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        changeFunctionStatusAllInstances(
                tenant,
                namespace,
                componentName,
                start,
                AuthenticationParameters.builder()
                        .clientRole(clientRole)
                        .clientAuthenticationDataSource(clientAuthenticationDataHttps)
                        .build());
    }

    /**
     * Starts or stops every instance of a component by submitting updated metadata; the instance
     * id {@code -1} is passed to the metadata utilities for this all-instances request.
     *
     * <p>Responds with {@code BAD_REQUEST} for invalid parameters or when the state change is not
     * permitted, and with {@code NOT_FOUND} when the component is missing or is of a different
     * component type.
     *
     * @param tenant tenant of the component
     * @param namespace namespace of the component
     * @param componentName name of the component
     * @param start {@code true} to start the instances, {@code false} to stop them
     * @param authParams authentication parameters of the caller
     */
    public void changeFunctionStatusAllInstances(String tenant, String namespace, String componentName, boolean start, AuthenticationParameters authParams) {
        if (!isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "start/stop", authParams);
        try {
            validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
        } catch (IllegalArgumentException e) {
            log.error("Invalid start/stop {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            // Name the operation actually requested; the message used to say "stopFunctionInstances"
            // even for start requests.
            String operation = start ? "startFunctionInstances" : "stopFunctionInstances";
            log.warn("{} in {} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), operation, tenant, namespace, componentName);
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) {
            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
        }
        Function.FunctionState targetState = start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED;
        if (!FunctionMetaDataUtils.canChangeState(functionMetaData, -1, targetState)) {
            log.error("Operation not permitted on {}/{}/{}", tenant, namespace, componentName);
            throw new RestException(Response.Status.BAD_REQUEST, "Operation not permitted");
        }
        Function.FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, -1, start);
        internalProcessFunctionRequest(tenant, namespace, componentName, newFunctionMetaData, false, String.format("Failed to start/stop %s: %s/%s/%s", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName));
    }

    /**
     * Restarts all instances of the named component.
     *
     * <p>The request is rejected through {@code RestUtils.throwUnavailableException()} when the worker
     * service is not available, and through {@code throwRestExceptionIfUnauthorizedForNamespace} when the
     * caller may not "restart" the component.
     *
     * @param tenant tenant that owns the component
     * @param namespace namespace that owns the component
     * @param componentName name of the component to restart
     * @param authParams authentication parameters of the caller
     * @throws RestException with {@code BAD_REQUEST} when parameter validation fails, {@code NOT_FOUND}
     *     when the component is not registered or is registered under a different component type, and
     *     {@code INTERNAL_SERVER_ERROR} when the runtime manager fails with anything other than a
     *     {@link WebApplicationException} (which is rethrown unchanged)
     */
    @Override
    public void restartFunctionInstances(String tenant, String namespace, String componentName, AuthenticationParameters authParams) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        this.throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "restart", authParams);
        try {
            this.validateGetFunctionRequestParams(tenant, namespace, componentName, this.componentType);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid restart {} request @ /{}/{}/{}", ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName, e);
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            // The log used to name stopFunctionInstances; this is the restart path.
            log.warn("{} in restartFunctionInstances does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName);
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        // A component registered under another type is reported as missing for this endpoint.
        if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(this.componentType)) {
            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(this.componentType));
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        FunctionRuntimeManager functionRuntimeManager = this.worker().getFunctionRuntimeManager();
        try {
            functionRuntimeManager.restartFunctionInstances(tenant, namespace, componentName);
        }
        catch (WebApplicationException we) {
            throw we;
        }
        catch (Exception e) {
            log.error("Failed to restart {}: {}/{}/{}", ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName, e);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    /**
     * Returns the aggregated stats of the named component.
     *
     * <p>Checks, in order: worker availability, namespace authorization ("get stats for"), request
     * parameters, existence of the component, and that the registered component matches this resource's
     * component type. The stats themselves come from the function runtime manager.
     *
     * @param tenant tenant that owns the component
     * @param namespace namespace that owns the component
     * @param componentName name of the component
     * @param uri request URI, passed through to the runtime manager
     * @param authParams authentication parameters of the caller
     * @return the stats reported by the runtime manager
     * @throws RestException with {@code BAD_REQUEST}, {@code NOT_FOUND} or {@code INTERNAL_SERVER_ERROR}
     *     depending on which step fails
     */
    @Override
    public FunctionStatsImpl getFunctionStats(String tenant, String namespace, String componentName, URI uri, AuthenticationParameters authParams) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        this.throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "get stats for", authParams);
        try {
            this.validateGetFunctionRequestParams(tenant, namespace, componentName, this.componentType);
        }
        catch (IllegalArgumentException invalid) {
            log.error("Invalid get {} Stats request @ /{}/{}/{}", ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName, invalid);
            throw new RestException(Response.Status.BAD_REQUEST, invalid.getMessage());
        }

        FunctionMetaDataManager metaDataManager = this.worker().getFunctionMetaDataManager();
        boolean registered = metaDataManager.containsFunction(tenant, namespace, componentName);
        if (!registered) {
            log.warn("{} in get {} Stats does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(this.componentType), this.componentType, tenant, namespace, componentName);
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }

        Function.FunctionMetaData metaData = metaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        Function.FunctionDetails.ComponentType registeredType = InstanceUtils.calculateSubjectType(metaData.getFunctionDetails());
        if (!registeredType.equals(this.componentType)) {
            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(this.componentType));
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }

        // Resolved outside the try so that only the stats call itself is mapped to a 500.
        FunctionRuntimeManager runtimeManager = this.worker().getFunctionRuntimeManager();
        try {
            return runtimeManager.getFunctionStats(tenant, namespace, componentName, uri);
        }
        catch (WebApplicationException passThrough) {
            throw passThrough;
        }
        catch (Exception failure) {
            log.error("{}/{}/{} Got Exception Getting Stats", tenant, namespace, componentName, failure);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, failure.getMessage());
        }
    }

    /**
     * Returns the stats of a single instance of the named component.
     *
     * <p>Checks, in order: worker availability, namespace authorization ("get stats for"), request
     * parameters, existence of the component, that the registered component matches this resource's
     * component type, and that {@code instanceId} is an integer in {@code [0, parallelism)}.
     *
     * @param tenant tenant that owns the component
     * @param namespace namespace that owns the component
     * @param componentName name of the component
     * @param instanceId decimal index of the instance
     * @param uri request URI, passed through to the runtime manager
     * @param authParams authentication parameters of the caller
     * @return the instance stats reported by the runtime manager
     * @throws RestException with {@code BAD_REQUEST} for invalid parameters or a non-numeric or
     *     out-of-range instance id, {@code NOT_FOUND} when the component is missing or of another type,
     *     and {@code INTERNAL_SERVER_ERROR} when the runtime manager fails unexpectedly
     */
    @Override
    public FunctionInstanceStatsDataImpl getFunctionsInstanceStats(String tenant, String namespace, String componentName, String instanceId, URI uri, AuthenticationParameters authParams) {
        FunctionInstanceStatsDataImpl functionInstanceStatsData;
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        this.throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "get stats for", authParams);
        try {
            this.validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, this.componentType, instanceId);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid get {} Stats request @ /{}/{}/{}", ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName, e);
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.warn("{} in get {} Stats does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(this.componentType), this.componentType, tenant, namespace, componentName);
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(this.componentType)) {
            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(this.componentType));
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        // The id arrives as a raw string; a non-numeric value is a client error, not a server failure.
        final int instanceIdInt;
        try {
            instanceIdInt = Integer.parseInt(instanceId);
        }
        catch (NumberFormatException e) {
            log.error("instanceId in get {} Stats is not a number @ /{}/{}/{}", ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName);
            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't have instance with id %s", ComponentTypeUtils.toString(this.componentType), componentName, instanceId));
        }
        if (instanceIdInt < 0 || instanceIdInt >= functionMetaData.getFunctionDetails().getParallelism()) {
            log.error("instanceId in get {} Stats out of bounds @ /{}/{}/{}", ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName);
            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't have instance with id %s", ComponentTypeUtils.toString(this.componentType), componentName, instanceId));
        }
        FunctionRuntimeManager functionRuntimeManager = this.worker().getFunctionRuntimeManager();
        try {
            // Reuse the already validated value instead of parsing the string a second time.
            functionInstanceStatsData = functionRuntimeManager.getFunctionInstanceStats(tenant, namespace, componentName, instanceIdInt, uri);
        }
        catch (WebApplicationException we) {
            throw we;
        }
        catch (Exception e) {
            log.error("{}/{}/{} Got Exception Getting Stats", tenant, namespace, componentName, e);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        return functionInstanceStatsData;
    }

    /**
     * Lists the names of the components in a namespace that match this resource's component type.
     *
     * @param tenant tenant to list
     * @param namespace namespace to list
     * @param authParams authentication parameters of the caller
     * @return names of the matching components, in the iteration order of the metadata manager's result
     * @throws RestException with {@code BAD_REQUEST} when tenant or namespace fails validation
     */
    @Override
    public List<String> listFunctions(String tenant, String namespace, AuthenticationParameters authParams) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        this.throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, null, "list", authParams);
        try {
            this.validateListFunctionRequestParams(tenant, namespace);
        }
        catch (IllegalArgumentException invalid) {
            log.error("Invalid list {} request @ /{}/{}", ComponentTypeUtils.toString(this.componentType), tenant, namespace, invalid);
            throw new RestException(Response.Status.BAD_REQUEST, invalid.getMessage());
        }

        FunctionMetaDataManager metaDataManager = this.worker().getFunctionMetaDataManager();
        Collection<Function.FunctionMetaData> registered = metaDataManager.listFunctions(tenant, namespace);
        LinkedList<String> names = new LinkedList<String>();
        for (Function.FunctionMetaData candidate : registered) {
            Function.FunctionDetails details = candidate.getFunctionDetails();
            // Keep only components of the type served by this resource.
            if (InstanceUtils.calculateSubjectType(details).equals(this.componentType)) {
                names.add(details.getName());
            }
        }
        return names;
    }

    /**
     * Submits an update for an existing component.
     *
     * <p>The metadata to submit is produced by {@code FunctionMetaDataUtils.incrMetadataVersion} from the
     * existing and the new metadata; tenant, namespace and name are read from that result.
     *
     * @param existingFunctionMetaData metadata currently registered
     * @param functionMetaData metadata describing the requested update
     */
    void updateRequest(Function.FunctionMetaData existingFunctionMetaData, Function.FunctionMetaData functionMetaData) {
        Function.FunctionMetaData versioned = FunctionMetaDataUtils.incrMetadataVersion(existingFunctionMetaData, functionMetaData);
        Function.FunctionDetails details = versioned.getFunctionDetails();
        this.internalProcessFunctionRequest(
                details.getTenant(),
                details.getNamespace(),
                details.getName(),
                versioned,
                false,
                "Update Failed");
    }

    /**
     * Returns the connector definitions known to the worker's connectors manager.
     *
     * @return the connector definitions
     */
    @Override
    public List<ConnectorDefinition> getListOfConnectors() {
        boolean available = this.isWorkerServiceAvailable();
        if (!available) {
            RestUtils.throwUnavailableException();
        }
        List<ConnectorDefinition> definitions = this.worker().getConnectorsManager().getConnectorDefinitions();
        return definitions;
    }

    /**
     * Asks the connectors manager to reload its connectors using the current worker configuration.
     *
     * @param authParams authentication parameters of the caller
     * @throws RestException with {@code UNAUTHORIZED} when authorization is enabled and the caller is
     *     not a super-user, or {@code INTERNAL_SERVER_ERROR} when reloading fails with an
     *     {@link IOException}
     */
    @Override
    public void reloadConnectors(AuthenticationParameters authParams) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        boolean authorizationEnabled = this.worker().getWorkerConfig().isAuthorizationEnabled();
        // The super-user check only runs when authorization is switched on.
        if (authorizationEnabled && !this.isSuperUser(authParams)) {
            throw new RestException(Response.Status.UNAUTHORIZED, "This operation requires super-user access");
        }
        try {
            this.worker().getConnectorsManager().reloadConnectors(this.worker().getWorkerConfig());
        }
        catch (IOException reloadFailure) {
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, reloadFailure.getMessage());
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Triggers a function by publishing one message to one of its input topics and, when the function
     * has an output topic, waiting for the matching result.
     *
     * <p>The payload is the full content of {@code uploadedInputStream} when it is non-null, otherwise
     * the UTF-8 bytes of {@code input}. The result is matched on the {@code __pfn_input_msg_id__} and
     * {@code __pfn_input_topic__} message properties.
     *
     * @param tenant tenant that owns the function
     * @param namespace namespace that owns the function
     * @param functionName name of the function
     * @param input payload as a string; used only when {@code uploadedInputStream} is null
     * @param uploadedInputStream payload as a stream; takes precedence over {@code input}
     * @param topic input topic to publish to, or null to use the function's single input topic
     * @param authParams authentication parameters of the caller
     * @return the output message data decoded as UTF-8, or null when the function has no output topic
     * @throws RestException with {@code BAD_REQUEST} for invalid parameters, an ambiguous or unknown
     *     input topic, a topic whose subscriptions cannot be listed, or a payload that fails schema
     *     serialization; {@code NOT_FOUND} when the function is not registered;
     *     {@code REQUEST_TIMEOUT} when no matching output arrives; {@code INTERNAL_SERVER_ERROR} on
     *     I/O failures
     */
    @Override
    public String triggerFunction(String tenant, String namespace, String functionName, String input, InputStream uploadedInputStream, String topic, AuthenticationParameters authParams) {
        String inputTopicToWrite;
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        this.throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, functionName, "trigger", authParams);
        try {
            this.validateTriggerRequestParams(tenant, namespace, functionName, topic, input, uploadedInputStream);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid trigger function request @ /{}/{}/{}", tenant, namespace, functionName, e);
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
            log.warn("Function in trigger function does not exist @ /{}/{}/{}", tenant, namespace, functionName);
            throw new RestException(Response.Status.NOT_FOUND, String.format("Function %s doesn't exist", functionName));
        }
        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
        if (topic != null) {
            inputTopicToWrite = topic;
        } else {
            // Without an explicit topic the function must have exactly one input to choose from.
            if (functionMetaData.getFunctionDetails().getSource().getInputSpecsCount() != 1) {
                log.error("Function in trigger function has more than 1 input topics @ /{}/{}/{}", tenant, namespace, functionName);
                throw new RestException(Response.Status.BAD_REQUEST, "Function in trigger function has more than 1 input topics");
            }
            inputTopicToWrite = (String)functionMetaData.getFunctionDetails().getSource().getInputSpecsMap().keySet().iterator().next();
        }
        if (functionMetaData.getFunctionDetails().getSource().getInputSpecsCount() == 0 || !functionMetaData.getFunctionDetails().getSource().getInputSpecsMap().containsKey(inputTopicToWrite)) {
            log.error("Function in trigger function has unidentified topic @ /{}/{}/{} {}", tenant, namespace, functionName, inputTopicToWrite);
            throw new RestException(Response.Status.BAD_REQUEST, "Function in trigger function has unidentified topic");
        }
        try {
            // Only used as a readiness probe; the returned subscriptions are ignored.
            this.worker().getBrokerAdmin().topics().getSubscriptions(inputTopicToWrite);
        }
        catch (PulsarAdminException e) {
            log.error("Function in trigger function is not ready @ /{}/{}/{}", tenant, namespace, functionName, e);
            throw new RestException(Response.Status.BAD_REQUEST, "Function in trigger function is not ready");
        }
        String outputTopic = functionMetaData.getFunctionDetails().getSink().getTopic();
        String clientName = this.worker().getWorkerConfig().getWorkerId() + "-trigger-" + FunctionCommon.getFullyQualifiedName(tenant, namespace, functionName);
        Reader<byte[]> reader = null;
        Producer<byte[]> producer = null;
        try {
            if (!StringUtils.isEmpty(outputTopic)) {
                // Start at the latest position so that only output produced after this call is seen.
                reader = this.worker().getClient().newReader().topic(outputTopic).startMessageId(MessageId.latest).readerName(clientName).create();
            }
            producer = this.worker().getClient().newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(inputTopicToWrite).producerName(clientName).create();
            // Drain the stream completely: available() is only an estimate and a single read() may
            // return fewer bytes than requested, which would truncate the payload.
            byte[] targetArray = uploadedInputStream != null
                    ? IOUtils.toByteArray(uploadedInputStream)
                    : input.getBytes(StandardCharsets.UTF_8);
            MessageId msgId = producer.send(targetArray);
            if (reader == null) {
                return null;
            }
            // The loop is bounded by a one second window, but each readNext call may itself block
            // for up to ten seconds before returning null.
            long curTime = System.currentTimeMillis();
            long maxTime = curTime + 1000L;
            while (curTime < maxTime) {
                Message<byte[]> msg = reader.readNext(10000, TimeUnit.MILLISECONDS);
                if (msg == null) {
                    throw new RestException(Response.Status.REQUEST_TIMEOUT, "Request Timed Out");
                }
                if (msg.getProperties().containsKey("__pfn_input_msg_id__") && msg.getProperties().containsKey("__pfn_input_topic__")) {
                    MessageId newMsgId = MessageId.fromByteArray(Base64.getDecoder().decode(msg.getProperties().get("__pfn_input_msg_id__")));
                    if (msgId.equals(newMsgId) && msg.getProperties().get("__pfn_input_topic__").equals(TopicName.get(inputTopicToWrite).toString())) {
                        return new String(msg.getData(), StandardCharsets.UTF_8);
                    }
                }
                curTime = System.currentTimeMillis();
            }
            throw new RestException(Response.Status.REQUEST_TIMEOUT, "Request Timed Out");
        }
        catch (SchemaSerializationException e) {
            throw new RestException(Response.Status.BAD_REQUEST, String.format("Failed to serialize input with error: %s. Please check if input data conforms with the schema of the input topic.", e.getMessage()));
        }
        catch (IOException e) {
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        finally {
            // Closed asynchronously so the response is not delayed by client teardown.
            if (reader != null) {
                reader.closeAsync();
            }
            if (producer != null) {
                producer.closeAsync();
            }
        }
    }

    /**
     * Reads one state entry of a function from the worker's state store.
     *
     * <p>The returned {@link FunctionState} carries the stored bytes in one of three shapes: neither a
     * string nor a byte value when the store marks the entry as a number, a UTF-8 string when the bytes
     * are well-formed UTF-8, and the raw bytes otherwise. The number field is filled whenever the stored
     * value is exactly eight bytes long.
     *
     * @param tenant tenant that owns the function
     * @param namespace namespace that owns the function
     * @param functionName name of the function
     * @param key state key to read
     * @param authParams authentication parameters of the caller
     * @return the state entry for {@code key}
     * @throws RestException with {@code BAD_REQUEST} for invalid parameters, {@code NOT_FOUND} when the
     *     function or the key does not exist, and {@code INTERNAL_SERVER_ERROR} for any other failure
     */
    @Override
    public FunctionState getFunctionState(String tenant, String namespace, String functionName, String key, AuthenticationParameters authParams) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        this.throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, functionName, "get state for", authParams);
        if (this.worker().getStateStoreProvider() == null) {
            this.throwStateStoreUnvailableResponse();
        }
        try {
            this.validateFunctionStateParams(tenant, namespace, functionName, key);
        }
        catch (IllegalArgumentException invalid) {
            log.error("Invalid getFunctionState request @ /{}/{}/{}/{}", tenant, namespace, functionName, key, invalid);
            throw new RestException(Response.Status.BAD_REQUEST, invalid.getMessage());
        }
        boolean registered = this.worker().getFunctionMetaDataManager().containsFunction(tenant, namespace, functionName);
        if (!registered) {
            log.warn("getFunctionState does not exist @ /{}/{}/{}", tenant, namespace, functionName);
            throw new RestException(Response.Status.NOT_FOUND, String.format("'%s' is not found", functionName));
        }
        try {
            DefaultStateStore stateStore = (DefaultStateStore)this.worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName);
            StateValue stored = stateStore.getStateValue(key);
            // A missing entry and an entry without bytes are reported the same way.
            byte[] raw = stored == null ? null : stored.getValue();
            if (raw == null) {
                throw new RestException(Response.Status.NOT_FOUND, "key '" + key + "' doesn't exist.");
            }
            // Exactly eight bytes are additionally exposed as a long.
            Long numeric = null;
            if (raw.length == 8) {
                numeric = ByteBuffer.wrap(raw).getLong();
            }
            String textValue = null;
            byte[] binaryValue = null;
            if (!Boolean.TRUE.equals(stored.getIsNumber())) {
                if (Utf8.isWellFormed(raw)) {
                    textValue = new String(raw, StandardCharsets.UTF_8);
                } else {
                    binaryValue = raw;
                }
            }
            return new FunctionState(key, textValue, binaryValue, numeric, stored.getVersion());
        }
        catch (RestException alreadyMapped) {
            throw alreadyMapped;
        }
        catch (Throwable failure) {
            log.error("Error while getFunctionState request @ /{}/{}/{}/{}", tenant, namespace, functionName, key, failure);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, failure.getMessage());
        }
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    /**
     * Writes one state entry of a function to the worker's state store.
     *
     * <p>The stored bytes are taken from the first value present in {@code state}, in this order: a
     * non-empty byte value, the string value encoded as UTF-8, the number value as eight bytes. A state
     * that carries none of these is rejected as a bad request.
     *
     * @param tenant tenant that owns the function
     * @param namespace namespace that owns the function
     * @param functionName name of the function
     * @param key state key taken from the request path; must equal {@code state.getKey()}
     * @param state state entry to store
     * @param authParams authentication parameters of the caller
     * @throws RestException with {@code BAD_REQUEST} when {@code state} is null, its key does not match
     *     {@code key}, parameter validation fails or no value is present; {@code NOT_FOUND} when the
     *     function is not registered; {@code INTERNAL_SERVER_ERROR} when the store fails
     */
    @Override
    public void putFunctionState(String tenant, String namespace, String functionName, String key, FunctionState state, AuthenticationParameters authParams) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        if (null == this.worker().getStateStoreProvider()) {
            this.throwStateStoreUnvailableResponse();
        }
        this.throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, functionName, "put state for", authParams);
        // A missing request body used to surface as a NullPointerException on state.getKey().
        if (state == null) {
            log.error("{}/{}/{} Bad putFunction Request, state is not provided", tenant, namespace, functionName);
            throw new RestException(Response.Status.BAD_REQUEST, "State is not provided");
        }
        if (!key.equals(state.getKey())) {
            log.error("{}/{}/{} Bad putFunction Request, path key doesn't match key in json", tenant, namespace, functionName);
            throw new RestException(Response.Status.BAD_REQUEST, "Path key doesn't match key in json");
        }
        try {
            this.validateFunctionStateParams(tenant, namespace, functionName, key);
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid putFunctionState request @ /{}/{}/{}/{}", tenant, namespace, functionName, key, e);
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        FunctionMetaDataManager functionMetaDataManager = this.worker().getFunctionMetaDataManager();
        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
            log.warn("putFunctionState does not exist @ /{}/{}/{}", tenant, namespace, functionName);
            throw new RestException(Response.Status.NOT_FOUND, String.format("'%s' is not found", functionName));
        }
        // Build the payload before touching the store so that a state without any value is reported
        // as a client error instead of being caught below and turned into a 500.
        ByteBuffer data;
        byte[] byteValue = state.getByteValue();
        if (byteValue != null && byteValue.length > 0) {
            data = ByteBuffer.wrap(byteValue);
        } else if (state.getStringValue() != null) {
            data = ByteBuffer.wrap(state.getStringValue().getBytes(StandardCharsets.UTF_8));
        } else if (state.getNumberValue() != null) {
            data = ByteBuffer.allocate(Long.BYTES);
            data.putLong(state.getNumberValue());
            // putLong leaves the position at the end of the buffer; flip so the eight bytes are
            // readable from position 0, exactly like the wrapped buffers in the other branches.
            data.flip();
        } else {
            log.error("Invalid putFunctionState request @ /{}/{}/{}/{}: no state value provided", tenant, namespace, functionName, key);
            throw new RestException(Response.Status.BAD_REQUEST, "Invalid state value");
        }
        try {
            DefaultStateStore store = (DefaultStateStore)this.worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName);
            store.put(key, data);
        }
        catch (Throwable e) {
            log.error("Error while putFunctionState request @ /{}/{}/{}/{}", tenant, namespace, functionName, key, e);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    /**
     * Stores an uploaded function package under {@code path}.
     *
     * <p>When package management is enabled in the worker configuration, the stream is first spooled to
     * a temporary file which is then uploaded through the broker admin client; otherwise the stream is
     * handed to {@code WorkerUtils.uploadToBookKeeper}.
     *
     * @param uploadedInputStream content of the package
     * @param path destination of the package
     * @param authParams authentication parameters of the caller
     * @throws RestException with {@code UNAUTHORIZED} when authorization is enabled and the caller is
     *     not a super-user, {@code BAD_REQUEST} when the stream or the path is missing, and
     *     {@code INTERNAL_SERVER_ERROR} when the upload fails
     */
    @Override
    public void uploadFunction(InputStream uploadedInputStream, String path, AuthenticationParameters authParams) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        if (this.worker().getWorkerConfig().isAuthorizationEnabled() && !this.isSuperUser(authParams)) {
            throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
        }
        try {
            if (uploadedInputStream == null || path == null) {
                throw new IllegalArgumentException("Function Package is not provided " + path);
            }
        }
        catch (IllegalArgumentException e) {
            log.error("Invalid upload function request @ /{}", path, e);
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
        try {
            log.info("Uploading function package to {}", path);
            if (this.worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) {
                File tempFile = FunctionCommon.createPkgTempFile();
                tempFile.deleteOnExit();
                // Close the spool file before uploading it; the stream was previously never closed,
                // leaking a file handle per upload.
                try (OutputStream out = new FileOutputStream(tempFile)) {
                    IOUtils.copy(uploadedInputStream, out);
                }
                PackageMetadata metadata = PackageMetadata.builder().createTime(System.currentTimeMillis()).build();
                this.worker().getBrokerAdmin().packages().upload(metadata, path, tempFile.getAbsolutePath());
            } else {
                WorkerUtils.uploadToBookKeeper(this.worker().getDlogNamespace(), uploadedInputStream, path);
            }
        }
        catch (IOException | PulsarAdminException e) {
            log.error("Error uploading file {}", path, e);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    /**
     * Streams the package of a registered component.
     *
     * @param tenant tenant that owns the component
     * @param namespace namespace that owns the component
     * @param componentName name of the component
     * @param authParams authentication parameters of the caller
     * @param transformFunction when true, the transform-function package is streamed and treated as a
     *     {@code FUNCTION}; otherwise the component's own package and its calculated type are used
     * @return a streaming output that writes the package content
     * @throws RestException with {@code NOT_FOUND} when the component is not registered
     */
    @Override
    public StreamingOutput downloadFunction(String tenant, String namespace, String componentName, AuthenticationParameters authParams, boolean transformFunction) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        this.throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "download package for", authParams);

        FunctionMetaDataManager metaDataManager = this.worker().getFunctionMetaDataManager();
        if (!metaDataManager.containsFunction(tenant, namespace, componentName)) {
            log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName);
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }

        Function.FunctionMetaData metaData = metaDataManager.getFunctionMetaData(tenant, namespace, componentName);
        final String packagePath;
        final Function.FunctionDetails.ComponentType packageType;
        if (transformFunction) {
            packagePath = metaData.getTransformFunctionPackageLocation().getPackagePath();
            packageType = Function.FunctionDetails.ComponentType.FUNCTION;
        } else {
            packagePath = metaData.getPackageLocation().getPackagePath();
            packageType = InstanceUtils.calculateSubjectType(metaData.getFunctionDetails());
        }
        return this.getStreamingOutput(packagePath, packageType);
    }

    /**
     * Streams the package at {@code pkgPath} without a known component type.
     *
     * @param pkgPath location of the package
     * @return a streaming output that writes the package content
     */
    private StreamingOutput getStreamingOutput(String pkgPath) {
        // No component type is known on this path; the two-argument overload receives null.
        Function.FunctionDetails.ComponentType unknownType = null;
        return this.getStreamingOutput(pkgPath, unknownType);
    }

    /**
     * Builds a streaming output that copies the package at {@code pkgPath} to the response.
     *
     * <p>Exactly one source is used, chosen from the path:
     * <ul>
     *   <li>{@code http...}: validated by the package URL validator, then read from the URL;</li>
     *   <li>{@code file...}: validated by the package URL validator, then read from the local file;</li>
     *   <li>{@code builtin...} while built-in sinks/sources are not uploaded: read from the built-in
     *       archive resolved by {@code getBuiltinArchivePath};</li>
     *   <li>otherwise: the package management service when enabled, else BookKeeper.</li>
     * </ul>
     * The sources used to be checked by independent {@code if} statements, so after an {@code http} or
     * {@code builtin} copy the code continued into the package-management/BookKeeper download for the
     * same path. They are now a single else-if chain.
     *
     * @param pkgPath location of the package
     * @param componentType component type used for URL validation and built-in lookup; may be null
     * @return a streaming output that writes the package content
     */
    private StreamingOutput getStreamingOutput(String pkgPath, Function.FunctionDetails.ComponentType componentType) {
        return output -> {
            if (pkgPath.startsWith("http")) {
                if (!this.worker().getPackageUrlValidator().isValidPackageUrl(componentType, pkgPath)) {
                    throw new IllegalArgumentException("Invalid package url: " + pkgPath);
                }
                URL url = URI.create(pkgPath).toURL();
                try (InputStream inputStream = url.openStream()) {
                    IOUtils.copy(inputStream, output);
                }
            } else if (pkgPath.startsWith("file")) {
                if (!this.worker().getPackageUrlValidator().isValidPackageUrl(componentType, pkgPath)) {
                    throw new IllegalArgumentException("Invalid package url: " + pkgPath);
                }
                URI fileUri = URI.create(pkgPath);
                File file = new File(fileUri.getPath());
                Files.copy(file.toPath(), output);
            } else if (pkgPath.startsWith("builtin") && !this.worker().getWorkerConfig().getUploadBuiltinSinksSources().booleanValue()) {
                Path narPath = this.getBuiltinArchivePath(pkgPath, componentType);
                log.info("Loading {} from {}", pkgPath, narPath);
                try (FileInputStream in = new FileInputStream(narPath.toString())) {
                    IOUtils.copy(in, output, 1024);
                    output.flush();
                }
            } else if (this.worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) {
                try {
                    File file = ComponentImpl.downloadPackageFile(this.worker(), pkgPath);
                    try (FileInputStream in = new FileInputStream(file)) {
                        IOUtils.copy(in, output, 1024);
                        output.flush();
                    }
                }
                catch (Exception e) {
                    // NOTE(review): the failure is only logged, so the client receives whatever was
                    // written so far. Kept as-is because callers may rely on this best-effort behaviour.
                    log.error("Failed download package {} from packageManagement Service", pkgPath, e);
                }
            } else {
                WorkerUtils.downloadFromBookkeeper(this.worker().getDlogNamespace(), output, pkgPath);
            }
        };
    }

    /**
     * Resolves the archive path of a built-in connector or function.
     *
     * <p>The name is {@code pkgPath} with a leading {@code builtin://} removed. Unless the component
     * type is {@code FUNCTION}, connectors are searched first; functions are searched when the type is
     * {@code FUNCTION} or when the type is null and no connector matched.
     *
     * @param pkgPath built-in package path
     * @param componentType expected component type, or null when unknown
     * @return path of the matching archive
     * @throws IllegalStateException when no matching built-in archive is found
     */
    private Path getBuiltinArchivePath(String pkgPath, Function.FunctionDetails.ComponentType componentType) {
        String archiveName = pkgPath.replaceFirst("^builtin://", "");
        boolean typeKnown = componentType != null;
        boolean searchConnectors = !Function.FunctionDetails.ComponentType.FUNCTION.equals(componentType);

        if (searchConnectors) {
            Connector connector = this.worker().getConnectorsManager().getConnector(archiveName);
            if (connector != null) {
                return connector.getArchivePath();
            }
            // A known non-function type never falls back to the function catalogue.
            if (typeKnown) {
                throw new IllegalStateException("Didn't find " + archiveName + " in built-in connectors");
            }
        }

        FunctionArchive archive = this.worker().getFunctionsManager().getFunction(archiveName);
        if (archive == null) {
            String where = typeKnown ? " in built-in functions" : " in built-in connectors or functions";
            throw new IllegalStateException("Didn't find " + archiveName + where);
        }
        return archive.getArchivePath();
    }

    /**
     * Streams the package stored at {@code path}.
     *
     * <p>When authorization is enabled, a path made of exactly four {@code /}-separated segments is
     * authorized against the namespace using its first three segments as tenant, namespace and
     * component name; any other path requires a super-user.
     *
     * @param path location of the package
     * @param authParams authentication parameters of the caller
     * @return a streaming output that writes the package content
     * @throws RestException with {@code UNAUTHORIZED} when a super-user is required and the caller is
     *     not one
     */
    @Override
    public StreamingOutput downloadFunction(String path, AuthenticationParameters authParams) {
        if (!this.isWorkerServiceAvailable()) {
            RestUtils.throwUnavailableException();
        }
        boolean authorizationEnabled = this.worker().getWorkerConfig().isAuthorizationEnabled();
        if (authorizationEnabled) {
            String[] segments = path.split("/");
            if (segments.length != 4) {
                if (!this.isSuperUser(authParams)) {
                    throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
                }
            } else {
                this.throwRestExceptionIfUnauthorizedForNamespace(segments[0], segments[1], segments[2], "download package for", authParams);
            }
        }
        return this.getStreamingOutput(path);
    }

    /**
     * Verifies that both tenant and namespace were supplied for a list request.
     *
     * @param tenant tenant from the request
     * @param namespace namespace from the request
     * @throws IllegalArgumentException when either value is null; tenant is checked first
     */
    private void validateListFunctionRequestParams(String tenant, String namespace) throws IllegalArgumentException {
        final String problem;
        if (tenant == null) {
            problem = "Tenant is not provided";
        } else if (namespace == null) {
            problem = "Namespace is not provided";
        } else {
            return;
        }
        throw new IllegalArgumentException(problem);
    }

    /**
     * Verifies the parameters of a request that addresses a single instance of a component.
     *
     * <p>Runs {@code validateGetFunctionRequestParams} first, then requires a non-null instance id.
     *
     * @param tenant tenant from the request
     * @param namespace namespace from the request
     * @param componentName component name from the request
     * @param componentType component type used in error messages
     * @param instanceId instance id from the request
     * @throws IllegalArgumentException when any required value is null
     */
    protected void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String componentName, Function.FunctionDetails.ComponentType componentType, String instanceId) throws IllegalArgumentException {
        this.validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
        if (instanceId != null) {
            return;
        }
        String message = String.format("%s Instance Id is not provided", componentType);
        throw new IllegalArgumentException(message);
    }

    /**
     * Verifies that a get request identifies a tenant, a namespace and a component name.
     *
     * @param tenant tenant from the request
     * @param namespace namespace from the request
     * @param subject name of the component being requested
     * @param componentType component kind, used only to build the "name is not provided" message
     * @throws IllegalArgumentException naming the first missing value, checked in the order
     *         tenant, namespace, name
     */
    protected void validateGetFunctionRequestParams(String tenant, String namespace, String subject, Function.FunctionDetails.ComponentType componentType) throws IllegalArgumentException {
        String missing = null;
        if (tenant == null) {
            missing = "Tenant is not provided";
        } else if (namespace == null) {
            missing = "Namespace is not provided";
        } else if (subject == null) {
            missing = ComponentTypeUtils.toString(componentType) + " name is not provided";
        }
        if (missing != null) {
            throw new IllegalArgumentException(missing);
        }
    }

    /**
     * Verifies that a deregister request identifies a tenant, a namespace and a component name.
     *
     * @param tenant tenant from the request
     * @param namespace namespace from the request
     * @param subject name of the component to deregister
     * @param componentType component kind, used only to build the "name is not provided" message
     * @throws IllegalArgumentException naming the first missing value, checked in the order
     *         tenant, namespace, name
     */
    private void validateDeregisterRequestParams(String tenant, String namespace, String subject, Function.FunctionDetails.ComponentType componentType) throws IllegalArgumentException {
        String missing = null;
        if (tenant == null) {
            missing = "Tenant is not provided";
        } else if (namespace == null) {
            missing = "Namespace is not provided";
        } else if (subject == null) {
            missing = ComponentTypeUtils.toString(componentType) + " name is not provided";
        }
        if (missing != null) {
            throw new IllegalArgumentException(missing);
        }
    }

    /**
     * Verifies that a state request identifies a tenant, a namespace, a component name and a key.
     *
     * @param tenant tenant from the request
     * @param namespace namespace from the request
     * @param functionName name of the component whose state is addressed
     * @param key state key
     * @throws IllegalArgumentException naming the first missing value, checked in the order
     *         tenant, namespace, name, key
     */
    private void validateFunctionStateParams(String tenant, String namespace, String functionName, String key) throws IllegalArgumentException {
        String missing = null;
        if (tenant == null) {
            missing = "Tenant is not provided";
        } else if (namespace == null) {
            missing = "Namespace is not provided";
        } else if (functionName == null) {
            missing = ComponentTypeUtils.toString(this.componentType) + " name is not provided";
        } else if (key == null) {
            missing = "Key is not provided";
        }
        if (missing != null) {
            throw new IllegalArgumentException(missing);
        }
    }

    /**
     * Extracts the built-in identifier that matches the given component type.
     *
     * <p>For {@code SOURCE} and {@code SINK} the value is read from the source/sink spec (when one
     * is present); for {@code FUNCTION} it is read from the function details themselves.
     *
     * @param functionDetails details to inspect
     * @param componentType which kind of built-in to look for
     * @return the non-empty built-in value, or {@code null} when there is none for that type
     */
    private String getFunctionCodeBuiltin(Function.FunctionDetails functionDetails, Function.FunctionDetails.ComponentType componentType) {
        if (componentType == Function.FunctionDetails.ComponentType.SOURCE) {
            if (functionDetails.hasSource()) {
                final Function.SourceSpec source = functionDetails.getSource();
                if (!StringUtils.isEmpty(source.getBuiltin())) {
                    return source.getBuiltin();
                }
            }
            return null;
        }
        if (componentType == Function.FunctionDetails.ComponentType.SINK) {
            if (functionDetails.hasSink()) {
                final Function.SinkSpec sink = functionDetails.getSink();
                if (!StringUtils.isEmpty(sink.getBuiltin())) {
                    return sink.getBuiltin();
                }
            }
            return null;
        }
        if (componentType == Function.FunctionDetails.ComponentType.FUNCTION) {
            if (!StringUtils.isEmpty(functionDetails.getBuiltin())) {
                return functionDetails.getBuiltin();
            }
        }
        return null;
    }

    /**
     * Verifies that a trigger request names its target and carries some payload.
     *
     * @param tenant tenant from the request
     * @param namespace namespace from the request
     * @param functionName name of the function to trigger
     * @param topic input topic; not inspected by this check
     * @param input inline trigger data, may be {@code null} if a stream is supplied
     * @param uploadedInputStream uploaded trigger data, may be {@code null} if {@code input} is supplied
     * @throws IllegalArgumentException if tenant, namespace or function name is {@code null}, or if
     *         both {@code input} and {@code uploadedInputStream} are {@code null}
     */
    private void validateTriggerRequestParams(String tenant, String namespace, String functionName, String topic, String input, InputStream uploadedInputStream) {
        String missing = null;
        if (tenant == null) {
            missing = "Tenant is not provided";
        } else if (namespace == null) {
            missing = "Namespace is not provided";
        } else if (functionName == null) {
            missing = "Function name is not provided";
        } else if (uploadedInputStream == null && input == null) {
            missing = "Trigger Data is not provided";
        }
        if (missing != null) {
            throw new IllegalArgumentException(missing);
        }
    }

    /**
     * Always throws a {@code SERVICE_UNAVAILABLE} {@link RestException} telling the client that the
     * state storage client is still initializing.
     */
    private void throwStateStoreUnvailableResponse() {
        final String reason = "State storage client is not done initializing. Please try again in a little while.";
        throw new RestException(Response.Status.SERVICE_UNAVAILABLE, reason);
    }

    /**
     * Builds the storage path of a package as {@code tenant/namespace/function/package}.
     *
     * <p>The function name is passed through {@code Codec.encode}; the file name is encoded the same
     * way and then handed to {@code FunctionCommon.getUniquePackageName}.
     *
     * @param tenant tenant owning the package
     * @param namespace namespace owning the package
     * @param functionName component name
     * @param fileName original file name of the package
     * @return the four-segment package path
     */
    public static String createPackagePath(String tenant, String namespace, String functionName, String fileName) {
        final String encodedFunctionName = Codec.encode(functionName);
        final String uniquePackageName = FunctionCommon.getUniquePackageName(Codec.encode(fileName));
        return String.format("%s/%s/%s/%s", tenant, namespace, encodedFunctionName, uniquePackageName);
    }

    /**
     * Legacy overload that wraps the role and authentication data in {@link AuthenticationParameters}.
     *
     * @param tenant tenant of the namespace to check
     * @param namespace namespace to check
     * @param clientRole role of the calling client
     * @param authenticationData authentication data of the calling client
     * @return whether the client may operate on the namespace
     * @throws PulsarAdminException propagated from the delegate
     * @deprecated use {@link #isAuthorizedRole(String, String, AuthenticationParameters)} instead
     */
    @Deprecated
    public boolean isAuthorizedRole(String tenant, String namespace, String clientRole, AuthenticationDataSource authenticationData) throws PulsarAdminException {
        return this.isAuthorizedRole(
                tenant,
                namespace,
                AuthenticationParameters.builder()
                        .clientRole(clientRole)
                        .clientAuthenticationDataSource(authenticationData)
                        .build());
    }

    /**
     * Tells whether the caller may operate on components of the given namespace.
     *
     * @param tenant tenant of the namespace to check
     * @param namespace namespace to check
     * @param authParams credentials of the calling client
     * @return {@code true} when authorization is disabled in the worker config, otherwise the
     *         outcome of {@link #allowFunctionOps(NamespaceName, AuthenticationParameters)}
     * @throws PulsarAdminException declared for callers; see the delegate for failure handling
     */
    public boolean isAuthorizedRole(String tenant, String namespace, AuthenticationParameters authParams) throws PulsarAdminException {
        if (!this.worker().getWorkerConfig().isAuthorizationEnabled()) {
            return true;
        }
        final NamespaceName target = NamespaceName.get(tenant, namespace);
        return this.allowFunctionOps(target, authParams);
    }

    /**
     * Rejects the request unless the caller is authorized for the given namespace.
     *
     * @param tenant tenant of the namespace to check
     * @param namespace namespace to check
     * @param componentName component the request targets; used for logging only
     * @param action human-readable description of the attempted action; used for logging only
     * @param authParams credentials of the calling client
     * @throws RestException with {@code UNAUTHORIZED} when the caller is not authorized, or with
     *         {@code INTERNAL_SERVER_ERROR} when the authorization check itself fails
     */
    public void throwRestExceptionIfUnauthorizedForNamespace(String tenant, String namespace, String componentName, String action, AuthenticationParameters authParams) {
        final boolean authorized;
        try {
            authorized = this.isAuthorizedRole(tenant, namespace, authParams);
        }
        catch (PulsarAdminException e) {
            // Fill the "[{}]" placeholder with the message and pass the exception as the trailing
            // throwable so the stack trace is logged regardless of the logging backend.
            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e.getMessage(), e);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        if (!authorized) {
            log.warn("{}/{}/{} Client with role [{}] and originalPrincipal [{}] is not authorized to {} {}", tenant, namespace, componentName, authParams.getClientRole(), authParams.getOriginalPrincipal(), action, ComponentTypeUtils.toString(this.componentType));
            throw new RestException(Response.Status.UNAUTHORIZED, "Client is not authorized to perform operation");
        }
    }

    /**
     * Legacy overload that wraps the role and authentication data in {@link AuthenticationParameters}.
     *
     * @param tenant tenant of the component
     * @param namespace namespace of the component
     * @param componentName name of the component
     * @param clientRole role of the calling client
     * @param clientAuthenticationDataHttps authentication data of the calling client
     * @deprecated use
     *     {@link #componentStatusRequestValidate(String, String, String, AuthenticationParameters)}
     */
    @Deprecated
    protected void componentStatusRequestValidate(String tenant, String namespace, String componentName, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        this.componentStatusRequestValidate(
                tenant,
                namespace,
                componentName,
                AuthenticationParameters.builder()
                        .clientRole(clientRole)
                        .clientAuthenticationDataSource(clientAuthenticationDataHttps)
                        .build());
    }

    /**
     * Common pre-flight checks for a status request.
     *
     * <p>In order: the worker service must be available, the caller must be authorized for the
     * namespace, the request parameters must be present, the component must exist, and it must be
     * of this endpoint's component type.
     *
     * @param tenant tenant of the component
     * @param namespace namespace of the component
     * @param componentName name of the component
     * @param authParams credentials of the calling client
     * @throws RestException {@code SERVICE_UNAVAILABLE}, {@code UNAUTHORIZED}, {@code BAD_REQUEST}
     *         or {@code NOT_FOUND} depending on which check fails
     */
    protected void componentStatusRequestValidate(String tenant, String namespace, String componentName, AuthenticationParameters authParams) {
        if (!this.isWorkerServiceAvailable()) {
            throw new RestException(Response.Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
        }
        this.throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "get status for", authParams);
        try {
            this.validateGetFunctionRequestParams(tenant, namespace, componentName, this.componentType);
        }
        catch (IllegalArgumentException invalid) {
            log.error("Invalid get {} Status request @ /{}/{}/{}", ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName, invalid);
            throw new RestException(Response.Status.BAD_REQUEST, invalid.getMessage());
        }
        final FunctionMetaDataManager metaDataManager = this.worker().getFunctionMetaDataManager();
        if (metaDataManager.containsFunction(tenant, namespace, componentName)) {
            final Function.FunctionMetaData metaData = metaDataManager.getFunctionMetaData(tenant, namespace, componentName);
            if (InstanceUtils.calculateSubjectType(metaData.getFunctionDetails()).equals(this.componentType)) {
                return;
            }
            // The name exists but belongs to a different kind of component: report it as missing.
            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(this.componentType));
            throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
        }
        log.warn("{} in get {} Status does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(this.componentType), this.componentType, tenant, namespace, componentName);
        throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(this.componentType), componentName));
    }

    /**
     * Legacy overload that wraps the role and authentication data in {@link AuthenticationParameters}.
     *
     * @param tenant tenant of the component
     * @param namespace namespace of the component
     * @param componentName name of the component
     * @param instanceId instance whose status is requested
     * @param clientRole role of the calling client
     * @param clientAuthenticationDataHttps authentication data of the calling client
     * @deprecated use
     *     {@link #componentInstanceStatusRequestValidate(String, String, String, int, AuthenticationParameters)}
     */
    @Deprecated
    protected void componentInstanceStatusRequestValidate(String tenant, String namespace, String componentName, int instanceId, String clientRole, AuthenticationDataSource clientAuthenticationDataHttps) {
        this.componentInstanceStatusRequestValidate(
                tenant,
                namespace,
                componentName,
                instanceId,
                AuthenticationParameters.builder()
                        .clientRole(clientRole)
                        .clientAuthenticationDataSource(clientAuthenticationDataHttps)
                        .build());
    }

    /**
     * Performs the component-level status checks and then verifies that {@code instanceId} lies in
     * {@code [0, parallelism)} for the stored component.
     *
     * @param tenant tenant of the component
     * @param namespace namespace of the component
     * @param componentName name of the component
     * @param instanceId instance whose status is requested
     * @param authParams credentials of the calling client
     * @throws RestException {@code BAD_REQUEST} when the instance id is out of range, or whatever
     *         the component-level validation throws
     */
    protected void componentInstanceStatusRequestValidate(String tenant, String namespace, String componentName, int instanceId, AuthenticationParameters authParams) {
        this.componentStatusRequestValidate(tenant, namespace, componentName, authParams);
        final Function.FunctionMetaData metaData = this.worker().getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace, componentName);
        final int instanceCount = metaData.getFunctionDetails().getParallelism();
        if (instanceId >= 0 && instanceId < instanceCount) {
            return;
        }
        log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", ComponentTypeUtils.toString(this.componentType), tenant, namespace, componentName);
        throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't have instance with id %s", ComponentTypeUtils.toString(this.componentType), componentName, instanceId));
    }

    /**
     * Asks the authorization service whether the caller is a super user.
     *
     * <p>The check is bounded by the worker's metadata-store operation timeout.
     *
     * @param authParams credentials of the calling client
     * @return {@code false} when no client role is present, otherwise the authorization service's answer
     * @throws RestException with {@code INTERNAL_SERVER_ERROR} if the check is interrupted, times
     *         out or fails
     */
    public boolean isSuperUser(AuthenticationParameters authParams) {
        if (authParams.getClientRole() == null) {
            return false;
        }
        try {
            return (Boolean)this.worker().getAuthorizationService().isSuperUser(authParams).get(this.worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while checking the role {} originalPrincipal {} is a super user role", authParams.getClientRole(), authParams.getOriginalPrincipal());
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        catch (java.util.concurrent.TimeoutException e) {
            // A real time-out surfaces as TimeoutException, not InterruptedException.
            log.warn("Time-out {} sec while checking the role {} originalPrincipal {} is a super user role ", this.worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), authParams.getClientRole(), authParams.getOriginalPrincipal());
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        catch (Exception e) {
            log.warn("Failed verifying role {} originalPrincipal {} is a super user role", authParams.getClientRole(), authParams.getOriginalPrincipal(), e);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    /**
     * Legacy overload that wraps the role and authentication data in {@link AuthenticationParameters}.
     *
     * @param clientRole role of the calling client
     * @param authenticationData authentication data of the calling client
     * @return whether the client is a super user
     * @deprecated use {@link #isSuperUser(AuthenticationParameters)} instead
     */
    @Deprecated
    public boolean isSuperUser(String clientRole, AuthenticationDataSource authenticationData) {
        return this.isSuperUser(
                AuthenticationParameters.builder()
                        .clientRole(clientRole)
                        .clientAuthenticationDataSource(authenticationData)
                        .build());
    }

    /**
     * Legacy overload that wraps the role and authentication data in {@link AuthenticationParameters}.
     *
     * @param namespaceName namespace to check
     * @param role role of the calling client
     * @param authenticationData authentication data of the calling client
     * @return whether the client may operate on components in the namespace
     * @deprecated use {@link #allowFunctionOps(NamespaceName, AuthenticationParameters)} instead
     */
    @Deprecated
    public boolean allowFunctionOps(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
        return this.allowFunctionOps(
                namespaceName,
                AuthenticationParameters.builder()
                        .clientRole(role)
                        .clientAuthenticationDataSource(authenticationData)
                        .build());
    }

    /**
     * Asks the authorization service whether the caller may operate on components of this
     * endpoint's type in the given namespace.
     *
     * <p>Sinks and sources use their dedicated checks; every other component type uses the function
     * check. Each check is bounded by the worker's metadata-store operation timeout.
     *
     * @param namespaceName namespace to check
     * @param authParams credentials of the calling client
     * @return the authorization service's answer
     * @throws RestException with {@code INTERNAL_SERVER_ERROR} if the check is interrupted, times
     *         out or fails
     */
    public boolean allowFunctionOps(NamespaceName namespaceName, AuthenticationParameters authParams) {
        try {
            switch (this.componentType) {
                case SINK: {
                    return (Boolean)this.worker().getAuthorizationService().allowSinkOpsAsync(namespaceName, authParams).get(this.worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
                }
                case SOURCE: {
                    return (Boolean)this.worker().getAuthorizationService().allowSourceOpsAsync(namespaceName, authParams).get(this.worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
                }
            }
            return (Boolean)this.worker().getAuthorizationService().allowFunctionOpsAsync(namespaceName, authParams).get(this.worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("Interrupted while checking function authorization on {}", (Object)namespaceName);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        catch (java.util.concurrent.TimeoutException e) {
            // A real time-out surfaces as TimeoutException, not InterruptedException.
            log.warn("Time-out {} sec while checking function authorization on {} ", (Object)this.worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), (Object)namespaceName);
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
        catch (Exception e) {
            log.warn("Admin-client with Role [{}] originalPrincipal [{}] failed to get function permissions for namespace - {}. {}", new Object[]{authParams.getClientRole(), authParams.getOriginalPrincipal(), namespaceName, e.getMessage(), e});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
        }
    }

    /**
     * Applies a metadata change, either locally when this worker is the leader or by forwarding it
     * to the leader through the function admin client.
     *
     * @param tenant tenant of the component
     * @param namespace namespace of the component
     * @param functionName name of the component
     * @param functionMetadata metadata to apply
     * @param delete whether the change is a deletion
     * @param errorMsg message logged when the admin call fails
     * @throws RestException carrying the admin status code for {@link PulsarAdminException},
     *         {@code INTERNAL_SERVER_ERROR} for {@link IllegalStateException} and
     *         {@code BAD_REQUEST} for {@link IllegalArgumentException}
     */
    private void internalProcessFunctionRequest(String tenant, String namespace, String functionName, Function.FunctionMetaData functionMetadata, boolean delete, String errorMsg) {
        try {
            if (!this.worker().getLeaderService().isLeader()) {
                // Not the leader: hand the serialized metadata over to the leader worker.
                ((FunctionsImpl)this.worker().getFunctionAdmin().functions())
                        .updateOnWorkerLeader(tenant, namespace, functionName, functionMetadata.toByteArray(), delete);
            } else {
                this.worker().getFunctionMetaDataManager().updateFunctionOnLeader(functionMetadata, delete);
            }
        }
        catch (PulsarAdminException adminFailure) {
            log.error(errorMsg, adminFailure);
            throw new RestException(adminFailure.getStatusCode(), adminFailure.getMessage());
        }
        catch (IllegalStateException badState) {
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, badState.getMessage());
        }
        catch (IllegalArgumentException badArgument) {
            throw new RestException(Response.Status.BAD_REQUEST, badArgument.getMessage());
        }
    }

    /**
     * Downloads a package through the broker admin client into a fresh temporary file.
     *
     * <p>The file is created in the configured download directory or, when that is not set, in the
     * NAR extraction directory; the directory is created if needed. If the download fails, the
     * temporary file is removed before the failure propagates, because the caller never receives a
     * reference it could use to clean up.
     *
     * @param worker worker service providing configuration and the broker admin client
     * @param packageName name of the package to download
     * @return the temporary file holding the downloaded package
     * @throws IOException if the directory or temporary file cannot be created
     * @throws PulsarAdminException if the download fails
     */
    static File downloadPackageFile(PulsarWorkerService worker, String packageName) throws IOException, PulsarAdminException {
        String downloadDirectory = worker.getWorkerConfig().getDownloadDirectory();
        Path tempDirectory = Paths.get(downloadDirectory != null ? downloadDirectory : worker.getWorkerConfig().getNarExtractionDirectory(), new String[0]);
        Files.createDirectories(tempDirectory, new FileAttribute[0]);
        File file = Files.createTempFile(tempDirectory, "function", ".tmp", new FileAttribute[0]).toFile();
        boolean downloaded = false;
        try {
            worker.getBrokerAdmin().packages().download(packageName, file.toString());
            downloaded = true;
            return file;
        }
        finally {
            if (!downloaded) {
                // Best effort: a failed delete must not mask the download failure being thrown.
                file.delete();
            }
        }
    }

    /**
     * Resolves the package archive to use for a component.
     *
     * <p>Sources are tried in this order:
     * <ol>
     *   <li>a non-blank {@code functionPkgUrl}, resolved by the two-argument overload;</li>
     *   <li>an {@code existingPackagePath} starting with {@code file} or {@code http}, validated
     *       and then extracted;</li>
     *   <li>an {@code existingPackagePath} with a package-type prefix, resolved by the two-argument
     *       overload;</li>
     *   <li>the uploaded stream, dumped to a temporary file;</li>
     *   <li>any other non-{@code builtin} {@code existingPackagePath}, downloaded through package
     *       management when enabled and from BookKeeper otherwise.</li>
     * </ol>
     *
     * @param componentType kind of component the package belongs to
     * @param functionPkgUrl newly supplied package URL, may be blank
     * @param existingPackagePath package location already stored for the component
     * @param uploadedInputStream uploaded package content, may be {@code null}
     * @return the package file, or {@code null} when the existing package is a built-in and nothing
     *         else was supplied
     * @throws IllegalArgumentException if the stored URL is rejected or cannot be fetched
     * @throws IOException if a temporary file cannot be written
     * @throws PulsarAdminException if a package-management download fails
     */
    protected File getPackageFile(Function.FunctionDetails.ComponentType componentType, String functionPkgUrl, String existingPackagePath, InputStream uploadedInputStream) throws IOException, PulsarAdminException {
        if (StringUtils.isNotBlank((CharSequence)functionPkgUrl)) {
            return this.getPackageFile(componentType, functionPkgUrl);
        }
        if (existingPackagePath.startsWith("file") || existingPackagePath.startsWith("http")) {
            // functionPkgUrl is blank on this path, so the stored location is the URL that has to
            // be validated and named in error messages.
            if (!this.worker().getPackageUrlValidator().isValidPackageUrl(componentType, existingPackagePath)) {
                throw new IllegalArgumentException("Function Package url is not valid.supported url (http/https/file)");
            }
            try {
                return FunctionCommon.extractFileFromPkgURL((String)existingPackagePath);
            }
            catch (Exception e) {
                throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString((Function.FunctionDetails.ComponentType)componentType), existingPackagePath), e);
            }
        }
        if (Utils.hasPackageTypePrefix((String)existingPackagePath)) {
            return this.getPackageFile(componentType, existingPackagePath);
        }
        if (uploadedInputStream != null) {
            return WorkerUtils.dumpToTmpFile(uploadedInputStream);
        }
        if (existingPackagePath.startsWith("builtin")) {
            // Built-in archives are resolved elsewhere; there is no file to hand back.
            return null;
        }
        File componentPackageFile = FunctionCommon.createPkgTempFile();
        componentPackageFile.deleteOnExit();
        if (this.worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) {
            this.worker().getBrokerAdmin().packages().download(existingPackagePath, componentPackageFile.getAbsolutePath());
        } else {
            WorkerUtils.downloadFromBookkeeper(this.worker().getDlogNamespace(), componentPackageFile, existingPackagePath);
        }
        return componentPackageFile;
    }

    /**
     * Fetches a package identified by URL.
     *
     * <p>URLs carrying a package-type prefix are downloaded through package management, which must
     * be enabled; every other URL is first checked by the worker's package URL validator and then
     * extracted with {@code FunctionCommon.extractFileFromPkgURL}.
     *
     * @param componentType kind of component the package belongs to
     * @param functionPkgUrl URL of the package
     * @return the local file holding the package
     * @throws IllegalStateException if a package-management URL is used while the service is disabled
     * @throws IllegalArgumentException if the URL is rejected or cannot be fetched
     * @throws IOException if the download cannot write its temporary file
     * @throws PulsarAdminException if the package-management download fails
     */
    protected File getPackageFile(Function.FunctionDetails.ComponentType componentType, String functionPkgUrl) throws IOException, PulsarAdminException {
        if (!Utils.hasPackageTypePrefix(functionPkgUrl)) {
            if (!this.worker().getPackageUrlValidator().isValidPackageUrl(componentType, functionPkgUrl)) {
                throw new IllegalArgumentException("Function Package url is not valid.supported url (http/https/file)");
            }
            try {
                return FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
            }
            catch (Exception failure) {
                final String description = String.format("Encountered error \"%s\" when getting %s package from %s", failure.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl);
                throw new IllegalArgumentException(description, failure);
            }
        }
        if (!this.worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) {
            throw new IllegalStateException("Function Package management service is disabled. Please enable it to use " + functionPkgUrl);
        }
        return ComponentImpl.downloadPackageFile(this.worker(), functionPkgUrl);
    }

    /**
     * Looks up the package of a built-in function referenced by a {@code builtin} archive string.
     *
     * @param archive archive reference; a leading {@code builtin://} is stripped before the lookup
     * @return the built-in's function package, or {@code null} when {@code archive} is empty or
     *         does not start with {@code builtin}
     * @throws IllegalArgumentException if no built-in with that name is registered
     */
    protected ValidatableFunctionPackage getBuiltinFunctionPackage(String archive) {
        if (StringUtils.isEmpty(archive) || !archive.startsWith("builtin")) {
            return null;
        }
        final String builtinName = archive.replaceFirst("^builtin://", "");
        final FunctionArchive builtin = this.worker().getFunctionsManager().getFunction(builtinName);
        if (builtin == null) {
            throw new IllegalArgumentException("Built-in " + this.componentType + " is not available");
        }
        return builtin.getFunctionPackage();
    }

    /**
     * Template for answering status requests.
     *
     * <p>The two concrete entry points locate the worker that owns the component. If it is this
     * worker, the status is built locally through the abstract factory methods; if it is another
     * known worker, the request is redirected there.
     *
     * @param <X> status type describing a whole component
     * @param <T> status type describing a single instance
     */
    protected abstract class GetStatus<X, T> {
        protected GetStatus() {
        }

        /** Creates the instance status returned when no assignment or owning worker is found. */
        public abstract T notScheduledInstance();

        /**
         * Converts a runtime status into the instance status type.
         *
         * @param var1 status reported by the runtime spawner
         * @param var2 id of the worker the instance is assigned to
         */
        public abstract T fromFunctionStatusProto(InstanceCommunication.FunctionStatus var1, String var2);

        /**
         * Creates the instance status for an instance that is assigned but not running.
         *
         * @param var1 id of the worker the instance is assigned to
         * @param var2 startup exception message, or an empty string when none is available
         */
        public abstract T notRunning(String var1, String var2);

        /**
         * Returns the status of one instance, redirecting when another worker owns it.
         *
         * @param tenant tenant of the component
         * @param namespace namespace of the component
         * @param name name of the component
         * @param instanceId instance whose status is requested
         * @param uri request URI used to build a redirect; may be {@code null}
         * @return the instance status
         * @throws WebApplicationException a temporary redirect to the owning worker, or an internal
         *         server error when a redirect is needed but {@code uri} is {@code null}
         * @throws RuntimeException if fetching the status from the local runtime fails or is interrupted
         */
        public T getComponentInstanceStatus(String tenant, String namespace, String name, int instanceId, URI uri) {
            // Externally managed runtimes are looked up with instance id -1 instead of the requested id.
            int lookupInstanceId = ComponentImpl.this.worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged() ? -1 : instanceId;
            Function.Assignment assignment = ComponentImpl.this.worker().getFunctionRuntimeManager().findFunctionAssignment(tenant, namespace, name, lookupInstanceId);
            if (assignment == null) {
                return this.notScheduledInstance();
            }
            String assignedWorkerId = assignment.getWorkerId();
            String workerId = ComponentImpl.this.worker().getWorkerConfig().getWorkerId();
            if (assignedWorkerId.equals(workerId)) {
                FunctionRuntimeInfo functionRuntimeInfo = ComponentImpl.this.worker().getFunctionRuntimeManager().getFunctionRuntimeInfo(FunctionCommon.getFullyQualifiedInstanceId((Function.Instance)assignment.getInstance()));
                if (functionRuntimeInfo == null) {
                    return this.notRunning(assignedWorkerId, "");
                }
                RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
                if (runtimeSpawner != null) {
                    try {
                        return this.fromFunctionStatusProto((InstanceCommunication.FunctionStatus)runtimeSpawner.getFunctionStatus(instanceId).get(), assignedWorkerId);
                    }
                    catch (InterruptedException e) {
                        // Restore the interrupt flag before surfacing the failure.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                    catch (ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }
                String message = functionRuntimeInfo.getStartupException() != null ? functionRuntimeInfo.getStartupException().getMessage() : "";
                return this.notRunning(assignedWorkerId, message);
            }
            WorkerInfo workerInfo = this.findWorkerInMembership(assignment.getWorkerId());
            if (workerInfo == null) {
                return this.notScheduledInstance();
            }
            throw this.redirectToWorker(uri, workerInfo);
        }

        /**
         * Builds the component status from its assignments.
         *
         * @param var1 tenant of the component
         * @param var2 namespace of the component
         * @param var3 name of the component
         * @param var4 assignments found for the component
         * @param var5 request URI, may be {@code null}
         */
        public abstract X getStatus(String var1, String var2, String var3, Collection<Function.Assignment> var4, URI var5) throws PulsarAdminException;

        /**
         * Builds the component status for an externally managed runtime owned by this worker.
         *
         * @param var1 tenant of the component
         * @param var2 namespace of the component
         * @param var3 name of the component
         * @param var4 parallelism of the component
         */
        public abstract X getStatusExternal(String var1, String var2, String var3, int var4);

        /**
         * Builds a placeholder component status.
         *
         * @param var1 parallelism of the component
         */
        public abstract X emptyStatus(int var1);

        /**
         * Returns the status of a whole component.
         *
         * <p>For externally managed runtimes the first assignment decides the owner: this worker
         * answers through {@link #getStatusExternal}, another known worker receives a redirect, and
         * an unknown owner or a component without assignments yields {@link #emptyStatus}.
         * Otherwise the assignments are passed to {@link #getStatus}.
         *
         * @param tenant tenant of the component
         * @param namespace namespace of the component
         * @param name name of the component
         * @param uri request URI used to build a redirect; may be {@code null}
         * @return the component status
         * @throws WebApplicationException a temporary redirect to the owning worker, or an internal
         *         server error when a redirect is needed but {@code uri} is {@code null}
         * @throws RuntimeException wrapping a {@link PulsarAdminException} from {@link #getStatus}
         */
        public X getComponentStatus(String tenant, String namespace, String name, URI uri) {
            Function.FunctionMetaData functionMetaData = ComponentImpl.this.worker().getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace, name);
            Collection<Function.Assignment> assignments = ComponentImpl.this.worker().getFunctionRuntimeManager().findFunctionAssignments(tenant, namespace, name);
            if (ComponentImpl.this.worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) {
                if (assignments == null || assignments.isEmpty()) {
                    // Nothing is assigned yet: answer with a placeholder rather than failing on
                    // iterator().next().
                    return this.emptyStatus(functionMetaData.getFunctionDetails().getParallelism());
                }
                Function.Assignment assignment = assignments.iterator().next();
                boolean isOwner = ComponentImpl.this.worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId());
                if (isOwner) {
                    return this.getStatusExternal(tenant, namespace, name, functionMetaData.getFunctionDetails().getParallelism());
                }
                WorkerInfo workerInfo = this.findWorkerInMembership(assignment.getWorkerId());
                if (workerInfo == null) {
                    return this.emptyStatus(functionMetaData.getFunctionDetails().getParallelism());
                }
                throw this.redirectToWorker(uri, workerInfo);
            }
            try {
                return this.getStatus(tenant, namespace, name, assignments, uri);
            }
            catch (PulsarAdminException e) {
                throw new RuntimeException(e);
            }
        }

        /** Finds the member with the given worker id; the last match wins. Returns {@code null} if none. */
        private WorkerInfo findWorkerInMembership(String workerId) {
            WorkerInfo match = null;
            for (WorkerInfo entry : ComponentImpl.this.worker().getMembershipManager().getCurrentMembership()) {
                if (workerId.equals(entry.getWorkerId())) {
                    match = entry;
                }
            }
            return match;
        }

        /** Builds the exception that redirects to {@code workerInfo}, or a server error when {@code uri} is null. */
        private WebApplicationException redirectToWorker(URI uri, WorkerInfo workerInfo) {
            if (uri == null) {
                return new WebApplicationException(Response.serverError().status(Response.Status.INTERNAL_SERVER_ERROR).build());
            }
            URI redirect = UriBuilder.fromUri((URI)uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build(new Object[0]);
            return new WebApplicationException(Response.temporaryRedirect((URI)redirect).build());
        }
    }
}

