From b2f3a9633a8a01c0a2e2fb341321f28f804b582f Mon Sep 17 00:00:00 2001 From: Rui Fu Date: Wed, 6 Sep 2023 11:32:21 +0800 Subject: [PATCH] support very long function/connector name (#687) * support long name * revert webhook changes * remove the label * fix sink & source details not using correct name --- controllers/spec/function.go | 16 ++++++++-------- controllers/spec/sink.go | 8 ++++---- controllers/spec/source.go | 8 ++++---- controllers/spec/utils.go | 36 ++++++++++++++++++++++-------------- 4 files changed, 38 insertions(+), 30 deletions(-) diff --git a/controllers/spec/function.go b/controllers/spec/function.go index d741dac64..30e771fdb 100644 --- a/controllers/spec/function.go +++ b/controllers/spec/function.go @@ -61,7 +61,7 @@ func MakeFunctionStatefulSet(function *v1alpha1.Function) *appsv1.StatefulSet { objectMeta := MakeFunctionObjectMeta(function) return MakeStatefulSet(objectMeta, function.Spec.Replicas, function.Spec.DownloaderImage, makeFunctionContainer(function), makeFilebeatContainer(function.Spec.VolumeMounts, function.Spec.Pod.Env, - function.Name, function.Spec.LogTopic, function.Spec.LogTopicAgent, function.Spec.Pulsar.TLSConfig, + function.Spec.Name, function.Spec.LogTopic, function.Spec.LogTopicAgent, function.Spec.Pulsar.TLSConfig, function.Spec.Pulsar.AuthConfig, function.Spec.Pulsar.PulsarConfig, function.Spec.Pulsar.TLSSecret, function.Spec.Pulsar.AuthSecret, function.Spec.FilebeatImage), makeFunctionVolumes(function, function.Spec.Pulsar.AuthConfig), makeFunctionLabels(function), function.Spec.Pod, @@ -216,25 +216,25 @@ func makeFunctionCommand(function *v1alpha1.Function) []string { if spec.Java.Jar != "" { return MakeJavaFunctionCommand(spec.Java.JarLocation, spec.Java.Jar, spec.Name, spec.ClusterName, - generateJavaLogConfigCommand(function.Spec.Java, function.Spec.LogTopicAgent), - parseJavaLogLevel(function.Spec.Java), + generateJavaLogConfigCommand(spec.Java, spec.LogTopicAgent), + parseJavaLogLevel(spec.Java), generateFunctionDetailsInJSON(function), getDecimalSIMemory(spec.Resources.Requests.Memory()), spec.Java.ExtraDependenciesDir, string(function.UID), spec.Java.JavaOpts, hasPulsarctl, hasWget, spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", - function.Spec.SecretsMap, function.Spec.StateConfig, function.Spec.Pulsar.TLSConfig, - function.Spec.Pulsar.AuthConfig, function.Spec.MaxPendingAsyncRequests, + spec.SecretsMap, spec.StateConfig, spec.Pulsar.TLSConfig, + spec.Pulsar.AuthConfig, spec.MaxPendingAsyncRequests, generateJavaLogConfigFileName(function.Spec.Java)) } } else if spec.Python != nil { if spec.Python.Py != "" { return MakePythonFunctionCommand(spec.Python.PyLocation, spec.Python.Py, spec.Name, spec.ClusterName, - generatePythonLogConfigCommand(function.Name, function.Spec.Python, function.Spec.LogTopicAgent), + generatePythonLogConfigCommand(spec.Name, spec.Python, spec.LogTopicAgent), generateFunctionDetailsInJSON(function), string(function.UID), hasPulsarctl, hasWget, - spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", function.Spec.SecretsMap, - function.Spec.StateConfig, function.Spec.Pulsar.TLSConfig, function.Spec.Pulsar.AuthConfig) + spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", spec.SecretsMap, + spec.StateConfig, spec.Pulsar.TLSConfig, spec.Pulsar.AuthConfig) } } else if spec.Golang != nil { if spec.Golang.Go != "" { diff --git a/controllers/spec/sink.go b/controllers/spec/sink.go index 241b69f95..c729a4c36 100644 --- a/controllers/spec/sink.go +++ b/controllers/spec/sink.go @@ -56,7 +56,7 @@ func MakeSinkService(sink *v1alpha1.Sink) *corev1.Service { func MakeSinkStatefulSet(sink *v1alpha1.Sink) *appsv1.StatefulSet { objectMeta := MakeSinkObjectMeta(sink) return MakeStatefulSet(objectMeta, sink.Spec.Replicas, sink.Spec.DownloaderImage, makeSinkContainer(sink), - makeFilebeatContainer(sink.Spec.VolumeMounts, sink.Spec.Pod.Env, sink.Name, sink.Spec.LogTopic, sink.Spec.LogTopicAgent, + makeFilebeatContainer(sink.Spec.VolumeMounts, sink.Spec.Pod.Env, sink.Spec.Name, sink.Spec.LogTopic, sink.Spec.LogTopicAgent, sink.Spec.Pulsar.TLSConfig, sink.Spec.Pulsar.AuthConfig, sink.Spec.Pulsar.PulsarConfig, sink.Spec.Pulsar.TLSSecret, sink.Spec.Pulsar.AuthSecret, sink.Spec.FilebeatImage), makeSinkVolumes(sink, sink.Spec.Pulsar.AuthConfig), makeSinkLabels(sink), sink.Spec.Pod, *sink.Spec.Pulsar, @@ -209,13 +209,13 @@ func MakeSinkCommand(sink *v1alpha1.Sink) []string { } return MakeJavaFunctionCommand(spec.Java.JarLocation, spec.Java.Jar, spec.Name, spec.ClusterName, - generateJavaLogConfigCommand(sink.Spec.Java, sink.Spec.LogTopicAgent), - parseJavaLogLevel(sink.Spec.Java), + generateJavaLogConfigCommand(spec.Java, spec.LogTopicAgent), + parseJavaLogLevel(spec.Java), generateSinkDetailsInJSON(sink), getDecimalSIMemory(spec.Resources.Requests.Memory()), spec.Java.ExtraDependenciesDir, string(sink.UID), spec.Java.JavaOpts, hasPulsarctl, hasWget, spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", spec.SecretsMap, spec.StateConfig, spec.Pulsar.TLSConfig, spec.Pulsar.AuthConfig, nil, - generateJavaLogConfigFileName(sink.Spec.Java)) + generateJavaLogConfigFileName(spec.Java)) } func generateSinkDetailsInJSON(sink *v1alpha1.Sink) string { diff --git a/controllers/spec/source.go b/controllers/spec/source.go index b62357781..6e26e77bf 100644 --- a/controllers/spec/source.go +++ b/controllers/spec/source.go @@ -57,7 +57,7 @@ func MakeSourceService(source *v1alpha1.Source) *corev1.Service { func MakeSourceStatefulSet(source *v1alpha1.Source) *appsv1.StatefulSet { objectMeta := MakeSourceObjectMeta(source) return MakeStatefulSet(objectMeta, source.Spec.Replicas, source.Spec.DownloaderImage, makeSourceContainer(source), - makeFilebeatContainer(source.Spec.VolumeMounts, source.Spec.Pod.Env, source.Name, source.Spec.LogTopic, source.Spec.LogTopicAgent, + makeFilebeatContainer(source.Spec.VolumeMounts, source.Spec.Pod.Env, source.Spec.Name, source.Spec.LogTopic, source.Spec.LogTopicAgent, source.Spec.Pulsar.TLSConfig, source.Spec.Pulsar.AuthConfig, source.Spec.Pulsar.PulsarConfig, source.Spec.Pulsar.TLSSecret, source.Spec.Pulsar.AuthSecret, source.Spec.FilebeatImage), makeSourceVolumes(source, source.Spec.Pulsar.AuthConfig), makeSourceLabels(source), source.Spec.Pod, *source.Spec.Pulsar, @@ -156,13 +156,13 @@ func makeSourceCommand(source *v1alpha1.Source) []string { } return MakeJavaFunctionCommand(spec.Java.JarLocation, spec.Java.Jar, spec.Name, spec.ClusterName, - generateJavaLogConfigCommand(source.Spec.Java, source.Spec.LogTopicAgent), - parseJavaLogLevel(source.Spec.Java), + generateJavaLogConfigCommand(spec.Java, spec.LogTopicAgent), + parseJavaLogLevel(spec.Java), generateSourceDetailsInJSON(source), getDecimalSIMemory(spec.Resources.Requests.Memory()), spec.Java.ExtraDependenciesDir, string(source.UID), spec.Java.JavaOpts, hasPulsarctl, hasWget, spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", spec.SecretsMap, spec.StateConfig, spec.Pulsar.TLSConfig, spec.Pulsar.AuthConfig, nil, - generateJavaLogConfigFileName(source.Spec.Java)) + generateJavaLogConfigFileName(spec.Java)) } func generateSourceDetailsInJSON(source *v1alpha1.Source) string { diff --git a/controllers/spec/utils.go b/controllers/spec/utils.go index 99d972efd..e8d78fe43 100644 --- a/controllers/spec/utils.go +++ b/controllers/spec/utils.go @@ -41,7 +41,8 @@ func convertFunctionDetails(function *v1alpha1.Function) *proto.FunctionDetails } else if function.Spec.Python != nil { runtime = proto.FunctionDetails_PYTHON } - deadLetterTopic := getDeadLetterTopicOrDefault(function.Spec.DeadLetterTopic, function.Spec.SubscriptionName, function.Spec.Tenant, function.Spec.Namespace, function.Spec.Name, function.Spec.MaxMessageRetry) + deadLetterTopic := getDeadLetterTopicOrDefault(function.Spec.DeadLetterTopic, function.Spec.SubscriptionName, + function.Spec.Tenant, function.Spec.Namespace, function.Spec.Name, function.Spec.MaxMessageRetry) logTopic := function.Spec.LogTopic if function.Spec.LogTopicAgent == v1alpha1.SIDECAR { logTopic = "" @@ -98,7 +99,8 @@ func fetchClassName(function *v1alpha1.Function) string { } func convertGoFunctionConfs(function *v1alpha1.Function) *GoFunctionConf { - deadLetterTopic := getDeadLetterTopicOrDefault(function.Spec.DeadLetterTopic, function.Spec.SubscriptionName, function.Spec.Tenant, function.Spec.Namespace, function.Spec.Name, function.Spec.MaxMessageRetry) + deadLetterTopic := getDeadLetterTopicOrDefault(function.Spec.DeadLetterTopic, function.Spec.SubscriptionName, + function.Spec.Tenant, function.Spec.Namespace, function.Spec.Name, function.Spec.MaxMessageRetry) logTopic := function.Spec.LogTopic if function.Spec.LogTopicAgent == v1alpha1.SIDECAR { logTopic = "" @@ -194,10 +196,11 @@ func generateFunctionInputSpec(function *v1alpha1.Function) *proto.SourceSpec { inputSpecs := generateInputSpec(function.Spec.Input) return &proto.SourceSpec{ - ClassName: "", - Configs: "", - TypeClassName: function.Spec.Input.TypeClassName, - SubscriptionType: getSubscriptionType(function.Spec.RetainOrdering, function.Spec.RetainKeyOrdering, function.Spec.ProcessingGuarantee), + ClassName: "", + Configs: "", + TypeClassName: function.Spec.Input.TypeClassName, + SubscriptionType: getSubscriptionType(function.Spec.RetainOrdering, function.Spec.RetainKeyOrdering, + function.Spec.ProcessingGuarantee), InputSpecs: inputSpecs, TimeoutMs: uint64(function.Spec.Timeout), Builtin: "", @@ -251,7 +254,7 @@ func convertSourceDetails(source *v1alpha1.Source) *proto.FunctionDetails { fd := &proto.FunctionDetails{ Tenant: source.Spec.Tenant, Namespace: source.Spec.Namespace, - Name: source.Name, + Name: source.Spec.Name, ClassName: "org.apache.pulsar.functions.api.utils.IdentityFunction", ProcessingGuarantees: convertProcessingGuarantee(source.Spec.ProcessingGuarantee), UserConfig: getUserConfig(source.Spec.SourceConfig), @@ -320,11 +323,12 @@ func generateSourceOutputSpec(source *v1alpha1.Source) *proto.SinkSpec { } func convertSinkDetails(sink *v1alpha1.Sink) *proto.FunctionDetails { - deadLetterTopic := getDeadLetterTopicOrDefault(sink.Spec.DeadLetterTopic, sink.Spec.SubscriptionName, sink.Spec.Tenant, sink.Spec.Namespace, sink.Spec.Name, sink.Spec.MaxMessageRetry) + deadLetterTopic := getDeadLetterTopicOrDefault(sink.Spec.DeadLetterTopic, sink.Spec.SubscriptionName, + sink.Spec.Tenant, sink.Spec.Namespace, sink.Spec.Name, sink.Spec.MaxMessageRetry) fd := &proto.FunctionDetails{ Tenant: sink.Spec.Tenant, Namespace: sink.Spec.Namespace, - Name: sink.Name, + Name: sink.Spec.Name, ClassName: "org.apache.pulsar.functions.api.utils.IdentityFunction", ProcessingGuarantees: convertProcessingGuarantee(sink.Spec.ProcessingGuarantee), Runtime: proto.FunctionDetails_JAVA, @@ -351,8 +355,9 @@ func generateSinkInputSpec(sink *v1alpha1.Sink) *proto.SourceSpec { inputSpecs := generateInputSpec(sink.Spec.Input) return &proto.SourceSpec{ - TypeClassName: sink.Spec.Input.TypeClassName, - SubscriptionType: getSubscriptionType(sink.Spec.RetainOrdering, sink.Spec.RetainKeyOrdering, sink.Spec.ProcessingGuarantee), + TypeClassName: sink.Spec.Input.TypeClassName, + SubscriptionType: getSubscriptionType(sink.Spec.RetainOrdering, sink.Spec.RetainKeyOrdering, + sink.Spec.ProcessingGuarantee), InputSpecs: inputSpecs, TimeoutMs: uint64(sink.Spec.Timeout), SubscriptionName: sink.Spec.SubscriptionName, @@ -362,7 +367,8 @@ func generateSinkInputSpec(sink *v1alpha1.Sink) *proto.SourceSpec { } } -func getSubscriptionType(retainOrdering bool, retainKeyOrdering bool, processingGuarantee v1alpha1.ProcessGuarantee) proto.SubscriptionType { +func getSubscriptionType(retainOrdering bool, retainKeyOrdering bool, + processingGuarantee v1alpha1.ProcessGuarantee) proto.SubscriptionType { if retainOrdering || processingGuarantee == v1alpha1.EffectivelyOnce { return proto.SubscriptionType_FAILOVER } @@ -503,8 +509,10 @@ func getParallelism(replicas *int32, showPreciseParallelism bool) int32 { return 1 } -func getDeadLetterTopicOrDefault(deadLetterTopic, subscriptionName, tenant, namespace, name string, maxMessageRetry int32) string { - if deadLetterTopic == "" && maxMessageRetry > 0 && (subscriptionName == "" || strings.Contains(subscriptionName, "\\")) { +func getDeadLetterTopicOrDefault(deadLetterTopic, subscriptionName, tenant, namespace, name string, + maxMessageRetry int32) string { + if deadLetterTopic == "" && maxMessageRetry > 0 && (subscriptionName == "" || strings.Contains(subscriptionName, + "\\")) { // otherwise the auto generated DeadLetterTopic($TOPIC-$SUBNAME-DLQ) will be invalid // like: persistent://public/default/input-public/default/test-function-DLQ return fmt.Sprintf("%s-%s-%s-DLQ", tenant, namespace, name)