Skip to content

Commit

Permalink
support very long function/connector name (#687)
Browse files Browse the repository at this point in the history
* support long name

* revert webhook changes

* remove the label

* fix sink & source details not using correct name
  • Loading branch information
freeznet authored Sep 6, 2023
1 parent 26f75b5 commit b2f3a96
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 30 deletions.
16 changes: 8 additions & 8 deletions controllers/spec/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func MakeFunctionStatefulSet(function *v1alpha1.Function) *appsv1.StatefulSet {
objectMeta := MakeFunctionObjectMeta(function)
return MakeStatefulSet(objectMeta, function.Spec.Replicas, function.Spec.DownloaderImage,
makeFunctionContainer(function), makeFilebeatContainer(function.Spec.VolumeMounts, function.Spec.Pod.Env,
function.Name, function.Spec.LogTopic, function.Spec.LogTopicAgent, function.Spec.Pulsar.TLSConfig,
function.Spec.Name, function.Spec.LogTopic, function.Spec.LogTopicAgent, function.Spec.Pulsar.TLSConfig,
function.Spec.Pulsar.AuthConfig, function.Spec.Pulsar.PulsarConfig, function.Spec.Pulsar.TLSSecret,
function.Spec.Pulsar.AuthSecret, function.Spec.FilebeatImage),
makeFunctionVolumes(function, function.Spec.Pulsar.AuthConfig), makeFunctionLabels(function), function.Spec.Pod,
Expand Down Expand Up @@ -216,25 +216,25 @@ func makeFunctionCommand(function *v1alpha1.Function) []string {
if spec.Java.Jar != "" {
return MakeJavaFunctionCommand(spec.Java.JarLocation, spec.Java.Jar,
spec.Name, spec.ClusterName,
generateJavaLogConfigCommand(function.Spec.Java, function.Spec.LogTopicAgent),
parseJavaLogLevel(function.Spec.Java),
generateJavaLogConfigCommand(spec.Java, spec.LogTopicAgent),
parseJavaLogLevel(spec.Java),
generateFunctionDetailsInJSON(function),
getDecimalSIMemory(spec.Resources.Requests.Memory()), spec.Java.ExtraDependenciesDir,
string(function.UID),
spec.Java.JavaOpts, hasPulsarctl, hasWget,
spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "",
function.Spec.SecretsMap, function.Spec.StateConfig, function.Spec.Pulsar.TLSConfig,
function.Spec.Pulsar.AuthConfig, function.Spec.MaxPendingAsyncRequests,
spec.SecretsMap, spec.StateConfig, spec.Pulsar.TLSConfig,
spec.Pulsar.AuthConfig, spec.MaxPendingAsyncRequests,
generateJavaLogConfigFileName(function.Spec.Java))
}
} else if spec.Python != nil {
if spec.Python.Py != "" {
return MakePythonFunctionCommand(spec.Python.PyLocation, spec.Python.Py,
spec.Name, spec.ClusterName,
generatePythonLogConfigCommand(function.Name, function.Spec.Python, function.Spec.LogTopicAgent),
generatePythonLogConfigCommand(spec.Name, spec.Python, spec.LogTopicAgent),
generateFunctionDetailsInJSON(function), string(function.UID), hasPulsarctl, hasWget,
spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", function.Spec.SecretsMap,
function.Spec.StateConfig, function.Spec.Pulsar.TLSConfig, function.Spec.Pulsar.AuthConfig)
spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", spec.SecretsMap,
spec.StateConfig, spec.Pulsar.TLSConfig, spec.Pulsar.AuthConfig)
}
} else if spec.Golang != nil {
if spec.Golang.Go != "" {
Expand Down
8 changes: 4 additions & 4 deletions controllers/spec/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func MakeSinkService(sink *v1alpha1.Sink) *corev1.Service {
func MakeSinkStatefulSet(sink *v1alpha1.Sink) *appsv1.StatefulSet {
objectMeta := MakeSinkObjectMeta(sink)
return MakeStatefulSet(objectMeta, sink.Spec.Replicas, sink.Spec.DownloaderImage, makeSinkContainer(sink),
makeFilebeatContainer(sink.Spec.VolumeMounts, sink.Spec.Pod.Env, sink.Name, sink.Spec.LogTopic, sink.Spec.LogTopicAgent,
makeFilebeatContainer(sink.Spec.VolumeMounts, sink.Spec.Pod.Env, sink.Spec.Name, sink.Spec.LogTopic, sink.Spec.LogTopicAgent,
sink.Spec.Pulsar.TLSConfig, sink.Spec.Pulsar.AuthConfig, sink.Spec.Pulsar.PulsarConfig, sink.Spec.Pulsar.TLSSecret,
sink.Spec.Pulsar.AuthSecret, sink.Spec.FilebeatImage),
makeSinkVolumes(sink, sink.Spec.Pulsar.AuthConfig), makeSinkLabels(sink), sink.Spec.Pod, *sink.Spec.Pulsar,
Expand Down Expand Up @@ -209,13 +209,13 @@ func MakeSinkCommand(sink *v1alpha1.Sink) []string {
}
return MakeJavaFunctionCommand(spec.Java.JarLocation, spec.Java.Jar,
spec.Name, spec.ClusterName,
generateJavaLogConfigCommand(sink.Spec.Java, sink.Spec.LogTopicAgent),
parseJavaLogLevel(sink.Spec.Java),
generateJavaLogConfigCommand(spec.Java, spec.LogTopicAgent),
parseJavaLogLevel(spec.Java),
generateSinkDetailsInJSON(sink),
getDecimalSIMemory(spec.Resources.Requests.Memory()), spec.Java.ExtraDependenciesDir, string(sink.UID),
spec.Java.JavaOpts, hasPulsarctl, hasWget, spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "",
spec.SecretsMap, spec.StateConfig, spec.Pulsar.TLSConfig, spec.Pulsar.AuthConfig, nil,
generateJavaLogConfigFileName(sink.Spec.Java))
generateJavaLogConfigFileName(spec.Java))
}

func generateSinkDetailsInJSON(sink *v1alpha1.Sink) string {
Expand Down
8 changes: 4 additions & 4 deletions controllers/spec/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func MakeSourceService(source *v1alpha1.Source) *corev1.Service {
func MakeSourceStatefulSet(source *v1alpha1.Source) *appsv1.StatefulSet {
objectMeta := MakeSourceObjectMeta(source)
return MakeStatefulSet(objectMeta, source.Spec.Replicas, source.Spec.DownloaderImage, makeSourceContainer(source),
makeFilebeatContainer(source.Spec.VolumeMounts, source.Spec.Pod.Env, source.Name, source.Spec.LogTopic, source.Spec.LogTopicAgent,
makeFilebeatContainer(source.Spec.VolumeMounts, source.Spec.Pod.Env, source.Spec.Name, source.Spec.LogTopic, source.Spec.LogTopicAgent,
source.Spec.Pulsar.TLSConfig, source.Spec.Pulsar.AuthConfig, source.Spec.Pulsar.PulsarConfig, source.Spec.Pulsar.TLSSecret,
source.Spec.Pulsar.AuthSecret, source.Spec.FilebeatImage),
makeSourceVolumes(source, source.Spec.Pulsar.AuthConfig), makeSourceLabels(source), source.Spec.Pod, *source.Spec.Pulsar,
Expand Down Expand Up @@ -156,13 +156,13 @@ func makeSourceCommand(source *v1alpha1.Source) []string {
}
return MakeJavaFunctionCommand(spec.Java.JarLocation, spec.Java.Jar,
spec.Name, spec.ClusterName,
generateJavaLogConfigCommand(source.Spec.Java, source.Spec.LogTopicAgent),
parseJavaLogLevel(source.Spec.Java),
generateJavaLogConfigCommand(spec.Java, spec.LogTopicAgent),
parseJavaLogLevel(spec.Java),
generateSourceDetailsInJSON(source),
getDecimalSIMemory(spec.Resources.Requests.Memory()), spec.Java.ExtraDependenciesDir, string(source.UID),
spec.Java.JavaOpts, hasPulsarctl, hasWget, spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "",
spec.SecretsMap, spec.StateConfig, spec.Pulsar.TLSConfig, spec.Pulsar.AuthConfig, nil,
generateJavaLogConfigFileName(source.Spec.Java))
generateJavaLogConfigFileName(spec.Java))
}

func generateSourceDetailsInJSON(source *v1alpha1.Source) string {
Expand Down
36 changes: 22 additions & 14 deletions controllers/spec/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func convertFunctionDetails(function *v1alpha1.Function) *proto.FunctionDetails
} else if function.Spec.Python != nil {
runtime = proto.FunctionDetails_PYTHON
}
deadLetterTopic := getDeadLetterTopicOrDefault(function.Spec.DeadLetterTopic, function.Spec.SubscriptionName, function.Spec.Tenant, function.Spec.Namespace, function.Spec.Name, function.Spec.MaxMessageRetry)
deadLetterTopic := getDeadLetterTopicOrDefault(function.Spec.DeadLetterTopic, function.Spec.SubscriptionName,
function.Spec.Tenant, function.Spec.Namespace, function.Spec.Name, function.Spec.MaxMessageRetry)
logTopic := function.Spec.LogTopic
if function.Spec.LogTopicAgent == v1alpha1.SIDECAR {
logTopic = ""
Expand Down Expand Up @@ -98,7 +99,8 @@ func fetchClassName(function *v1alpha1.Function) string {
}

func convertGoFunctionConfs(function *v1alpha1.Function) *GoFunctionConf {
deadLetterTopic := getDeadLetterTopicOrDefault(function.Spec.DeadLetterTopic, function.Spec.SubscriptionName, function.Spec.Tenant, function.Spec.Namespace, function.Spec.Name, function.Spec.MaxMessageRetry)
deadLetterTopic := getDeadLetterTopicOrDefault(function.Spec.DeadLetterTopic, function.Spec.SubscriptionName,
function.Spec.Tenant, function.Spec.Namespace, function.Spec.Name, function.Spec.MaxMessageRetry)
logTopic := function.Spec.LogTopic
if function.Spec.LogTopicAgent == v1alpha1.SIDECAR {
logTopic = ""
Expand Down Expand Up @@ -194,10 +196,11 @@ func generateFunctionInputSpec(function *v1alpha1.Function) *proto.SourceSpec {
inputSpecs := generateInputSpec(function.Spec.Input)

return &proto.SourceSpec{
ClassName: "",
Configs: "",
TypeClassName: function.Spec.Input.TypeClassName,
SubscriptionType: getSubscriptionType(function.Spec.RetainOrdering, function.Spec.RetainKeyOrdering, function.Spec.ProcessingGuarantee),
ClassName: "",
Configs: "",
TypeClassName: function.Spec.Input.TypeClassName,
SubscriptionType: getSubscriptionType(function.Spec.RetainOrdering, function.Spec.RetainKeyOrdering,
function.Spec.ProcessingGuarantee),
InputSpecs: inputSpecs,
TimeoutMs: uint64(function.Spec.Timeout),
Builtin: "",
Expand Down Expand Up @@ -251,7 +254,7 @@ func convertSourceDetails(source *v1alpha1.Source) *proto.FunctionDetails {
fd := &proto.FunctionDetails{
Tenant: source.Spec.Tenant,
Namespace: source.Spec.Namespace,
Name: source.Name,
Name: source.Spec.Name,
ClassName: "org.apache.pulsar.functions.api.utils.IdentityFunction",
ProcessingGuarantees: convertProcessingGuarantee(source.Spec.ProcessingGuarantee),
UserConfig: getUserConfig(source.Spec.SourceConfig),
Expand Down Expand Up @@ -320,11 +323,12 @@ func generateSourceOutputSpec(source *v1alpha1.Source) *proto.SinkSpec {
}

func convertSinkDetails(sink *v1alpha1.Sink) *proto.FunctionDetails {
deadLetterTopic := getDeadLetterTopicOrDefault(sink.Spec.DeadLetterTopic, sink.Spec.SubscriptionName, sink.Spec.Tenant, sink.Spec.Namespace, sink.Spec.Name, sink.Spec.MaxMessageRetry)
deadLetterTopic := getDeadLetterTopicOrDefault(sink.Spec.DeadLetterTopic, sink.Spec.SubscriptionName,
sink.Spec.Tenant, sink.Spec.Namespace, sink.Spec.Name, sink.Spec.MaxMessageRetry)
fd := &proto.FunctionDetails{
Tenant: sink.Spec.Tenant,
Namespace: sink.Spec.Namespace,
Name: sink.Name,
Name: sink.Spec.Name,
ClassName: "org.apache.pulsar.functions.api.utils.IdentityFunction",
ProcessingGuarantees: convertProcessingGuarantee(sink.Spec.ProcessingGuarantee),
Runtime: proto.FunctionDetails_JAVA,
Expand All @@ -351,8 +355,9 @@ func generateSinkInputSpec(sink *v1alpha1.Sink) *proto.SourceSpec {
inputSpecs := generateInputSpec(sink.Spec.Input)

return &proto.SourceSpec{
TypeClassName: sink.Spec.Input.TypeClassName,
SubscriptionType: getSubscriptionType(sink.Spec.RetainOrdering, sink.Spec.RetainKeyOrdering, sink.Spec.ProcessingGuarantee),
TypeClassName: sink.Spec.Input.TypeClassName,
SubscriptionType: getSubscriptionType(sink.Spec.RetainOrdering, sink.Spec.RetainKeyOrdering,
sink.Spec.ProcessingGuarantee),
InputSpecs: inputSpecs,
TimeoutMs: uint64(sink.Spec.Timeout),
SubscriptionName: sink.Spec.SubscriptionName,
Expand All @@ -362,7 +367,8 @@ func generateSinkInputSpec(sink *v1alpha1.Sink) *proto.SourceSpec {
}
}

func getSubscriptionType(retainOrdering bool, retainKeyOrdering bool, processingGuarantee v1alpha1.ProcessGuarantee) proto.SubscriptionType {
func getSubscriptionType(retainOrdering bool, retainKeyOrdering bool,
processingGuarantee v1alpha1.ProcessGuarantee) proto.SubscriptionType {
if retainOrdering || processingGuarantee == v1alpha1.EffectivelyOnce {
return proto.SubscriptionType_FAILOVER
}
Expand Down Expand Up @@ -503,8 +509,10 @@ func getParallelism(replicas *int32, showPreciseParallelism bool) int32 {
return 1
}

func getDeadLetterTopicOrDefault(deadLetterTopic, subscriptionName, tenant, namespace, name string, maxMessageRetry int32) string {
if deadLetterTopic == "" && maxMessageRetry > 0 && (subscriptionName == "" || strings.Contains(subscriptionName, "\\")) {
func getDeadLetterTopicOrDefault(deadLetterTopic, subscriptionName, tenant, namespace, name string,
maxMessageRetry int32) string {
if deadLetterTopic == "" && maxMessageRetry > 0 && (subscriptionName == "" || strings.Contains(subscriptionName,
"\\")) {
// otherwise the auto generated DeadLetterTopic($TOPIC-$SUBNAME-DLQ) will be invalid
// like: persistent://public/default/input-public/default/test-function-DLQ
return fmt.Sprintf("%s-%s-%s-DLQ", tenant, namespace, name)
Expand Down

0 comments on commit b2f3a96

Please sign in to comment.