From 18dfa2ff9654288d5646e82aedc7085c0e17c80c Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Tue, 4 Jun 2024 09:07:48 +0800 Subject: [PATCH] Adjust some code to avoid duplicate (#760) --- api/compute/v1alpha1/common.go | 6 +- api/compute/v1alpha1/zz_generated.deepcopy.go | 16 -- controllers/common.go | 10 +- controllers/spec/common.go | 68 ++++---- controllers/spec/common_test.go | 154 ++++++++---------- controllers/spec/function.go | 36 ++-- controllers/spec/hpa.go | 9 + controllers/spec/sink.go | 31 ++-- controllers/spec/source.go | 31 ++-- 9 files changed, 157 insertions(+), 204 deletions(-) diff --git a/api/compute/v1alpha1/common.go b/api/compute/v1alpha1/common.go index 3a8d90769..c09bbb3c6 100644 --- a/api/compute/v1alpha1/common.go +++ b/api/compute/v1alpha1/common.go @@ -66,7 +66,7 @@ type PulsarMessaging struct { CleanupAuthConfig *AuthConfig `json:"cleanupAuthConfig,omitempty"` } -type TLSConfig struct { +type PulsarTLSConfig struct { Enabled bool `json:"enabled,omitempty"` AllowInsecure bool `json:"allowInsecure,omitempty"` HostnameVerification bool `json:"hostnameVerification,omitempty"` @@ -74,10 +74,6 @@ type TLSConfig struct { CertSecretKey string `json:"certSecretKey,omitempty"` } -type PulsarTLSConfig struct { - TLSConfig `json:",inline"` -} - func (c *PulsarTLSConfig) IsEnabled() bool { return c.Enabled } diff --git a/api/compute/v1alpha1/zz_generated.deepcopy.go b/api/compute/v1alpha1/zz_generated.deepcopy.go index 81b672c7b..688e68184 100644 --- a/api/compute/v1alpha1/zz_generated.deepcopy.go +++ b/api/compute/v1alpha1/zz_generated.deepcopy.go @@ -986,7 +986,6 @@ func (in *PulsarStateStoreJavaProvider) DeepCopy() *PulsarStateStoreJavaProvider // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PulsarTLSConfig) DeepCopyInto(out *PulsarTLSConfig) { *out = *in - out.TLSConfig = in.TLSConfig } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PulsarTLSConfig. @@ -1432,21 +1431,6 @@ func (in *Stateful) DeepCopy() *Stateful { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *TLSConfig) DeepCopyInto(out *TLSConfig) { - *out = *in -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TLSConfig. -func (in *TLSConfig) DeepCopy() *TLSConfig { - if in == nil { - return nil - } - out := new(TLSConfig) - in.DeepCopyInto(out) - return out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *VPASpec) DeepCopyInto(out *VPASpec) { *out = *in diff --git a/controllers/common.go b/controllers/common.go index eaa41ab51..f43d0a7c4 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -248,9 +248,15 @@ func ConvertHPAV2ToV2beta2(hpa *autov2.HorizontalPodAutoscaler) *autoscalingv2be } result := &autoscalingv2beta2.HorizontalPodAutoscaler{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "autoscaling/v2beta2", + Kind: "HorizontalPodAutoscaler", + }, ObjectMeta: metav1.ObjectMeta{ - Namespace: hpa.Namespace, - Name: hpa.Name, + Namespace: hpa.Namespace, + Name: hpa.Name, + Labels: hpa.Labels, + OwnerReferences: hpa.OwnerReferences, }, Spec: autoscalingv2beta2.HorizontalPodAutoscalerSpec{ ScaleTargetRef: autoscalingv2beta2.CrossVersionObjectReference{ diff --git a/controllers/spec/common.go b/controllers/spec/common.go index bf093420c..2fb71a393 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -205,14 +205,15 @@ func MakeHeadlessServiceName(serviceName string) string { return fmt.Sprintf("%s-headless", serviceName) } -func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderImage string, - container *corev1.Container, filebeatContainer *corev1.Container, - volumes []corev1.Volume, labels map[string]string, policy v1alpha1.PodPolicy, pulsar v1alpha1.PulsarMessaging, - javaRuntime *v1alpha1.JavaRuntime, pythonRuntime *v1alpha1.PythonRuntime, - goRuntime *v1alpha1.GoRuntime, definedVolumeMounts []corev1.VolumeMount, - volumeClaimTemplates []corev1.PersistentVolumeClaim, +func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderImage string, container *corev1.Container, + volumes []corev1.Volume, labels map[string]string, policy v1alpha1.PodPolicy, authConfig *v1alpha1.AuthConfig, + tlsConfig TLSConfig, pulsarConfig, authSecret, tlsSecret string, javaRuntime *v1alpha1.JavaRuntime, + pythonRuntime *v1alpha1.PythonRuntime, goRuntime *v1alpha1.GoRuntime, env []corev1.EnvVar, name, logTopic, filebeatImage string, + logTopicAgent v1alpha1.LogTopicAgent, definedVolumeMounts []corev1.VolumeMount, volumeClaimTemplates []corev1.PersistentVolumeClaim, persistentVolumeClaimRetentionPolicy *appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy) *appsv1.StatefulSet { + filebeatContainer := makeFilebeatContainer(definedVolumeMounts, env, name, logTopic, logTopicAgent, tlsConfig, authConfig, pulsarConfig, tlsSecret, authSecret, filebeatImage) + volumeMounts := generateDownloaderVolumeMountsForDownloader(javaRuntime, pythonRuntime, goRuntime) var downloaderContainer *corev1.Container var podVolumes = volumes @@ -232,12 +233,12 @@ func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderI // mount auth and tls related VolumeMounts when download package from pulsar if !hasHTTPPrefix(downloadPath) { - if pulsar.AuthConfig != nil && pulsar.AuthConfig.OAuth2Config != nil { - volumeMounts = append(volumeMounts, generateVolumeMountFromOAuth2Config(pulsar.AuthConfig.OAuth2Config)) + if authConfig != nil && authConfig.OAuth2Config != nil { + volumeMounts = append(volumeMounts, generateVolumeMountFromOAuth2Config(authConfig.OAuth2Config)) } - if !reflect.ValueOf(pulsar.TLSConfig).IsNil() && pulsar.TLSConfig.HasSecretVolume() { - volumeMounts = append(volumeMounts, generateVolumeMountFromTLSConfig(pulsar.TLSConfig)) + if !reflect.ValueOf(tlsConfig).IsNil() && tlsConfig.HasSecretVolume() { + volumeMounts = append(volumeMounts, generateVolumeMountFromTLSConfig(tlsConfig)) } } volumeMounts = append(volumeMounts, definedVolumeMounts...) @@ -253,15 +254,15 @@ func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderI Name: DownloaderName, Image: image, Command: []string{"sh", "-c", - strings.Join(getDownloadCommand(downloadPath, componentPackage, true, true, - pulsar.AuthSecret != "", pulsar.TLSSecret != "", pulsar.TLSConfig, pulsar.AuthConfig), " ")}, + strings.Join(GetDownloadCommand(downloadPath, componentPackage, true, true, + authSecret != "", tlsSecret != "", tlsConfig, authConfig), " ")}, VolumeMounts: volumeMounts, ImagePullPolicy: corev1.PullIfNotPresent, Env: []corev1.EnvVar{{ Name: "HOME", Value: "/tmp", }}, - EnvFrom: generateContainerEnvFrom(pulsar.PulsarConfig, pulsar.AuthSecret, pulsar.TLSSecret), + EnvFrom: GenerateContainerEnvFrom(pulsarConfig, authSecret, tlsSecret), } podVolumes = append(podVolumes, corev1.Volume{ Name: DownloaderVolume, @@ -415,7 +416,7 @@ func MakeJavaFunctionCommand(downloadPath, packageFile, name, clusterName, gener authConfig, maxPendingAsyncRequests, logConfigFileName), " ") if downloadPath != "" && !utils.EnableInitContainers { // prepend download command if the downPath is provided - downloadCommand := strings.Join(getDownloadCommand(downloadPath, packageFile, hasPulsarctl, hasWget, + downloadCommand := strings.Join(GetDownloadCommand(downloadPath, packageFile, hasPulsarctl, hasWget, authProvided, tlsProvided, tlsConfig, authConfig), " ") processCommand = downloadCommand + " && " + processCommand } @@ -431,7 +432,7 @@ func MakePythonFunctionCommand(downloadPath, packageFile, name, clusterName, gen details, uid, authProvided, tlsProvided, secretMaps, state, tlsConfig, authConfig), " ") if downloadPath != "" && !utils.EnableInitContainers { // prepend download command if the downPath is provided - downloadCommand := strings.Join(getDownloadCommand(downloadPath, packageFile, hasPulsarctl, hasWget, + downloadCommand := strings.Join(GetDownloadCommand(downloadPath, packageFile, hasPulsarctl, hasWget, authProvided, tlsProvided, tlsConfig, authConfig), " ") processCommand = downloadCommand + " && " + processCommand @@ -450,7 +451,7 @@ func MakeGoFunctionCommand(downloadPath, goExecFilePath string, function *v1alph hasPulsarctl = true hasWget = true } - downloadCommand := strings.Join(getDownloadCommand(downloadPath, goExecFilePath, + downloadCommand := strings.Join(GetDownloadCommand(downloadPath, goExecFilePath, hasPulsarctl, hasWget, function.Spec.Pulsar.AuthSecret != "", function.Spec.Pulsar.TLSSecret != "", function.Spec.Pulsar.TLSConfig, function.Spec.Pulsar.AuthConfig), " ") processCommand = downloadCommand + " && ls -al && pwd &&" + processCommand @@ -466,7 +467,7 @@ func MakeGenericFunctionCommand(downloadPath, functionFile, language, clusterNam details, uid, authProvided, tlsProvided, secretMaps, state, tlsConfig, authConfig), " ") if downloadPath != "" && !utils.EnableInitContainers { // prepend download command if the downPath is provided - downloadCommand := strings.Join(getDownloadCommand(downloadPath, functionFile, true, true, + downloadCommand := strings.Join(GetDownloadCommand(downloadPath, functionFile, true, true, authProvided, tlsProvided, tlsConfig, authConfig), " ") processCommand = downloadCommand + " && " + processCommand @@ -486,7 +487,7 @@ func MakeLivenessProbe(liveness *v1alpha1.Liveness) *corev1.Probe { ProbeHandler: corev1.ProbeHandler{ HTTPGet: &corev1.HTTPGetAction{ Path: "/", - Port: intstr.FromInt(int(MetricsPort.ContainerPort)), + Port: intstr.FromInt32(MetricsPort.ContainerPort), }, }, InitialDelaySeconds: initialDelay, @@ -788,7 +789,7 @@ func getCleanUpCommand(hasPulsarctl, authProvided, tlsProvided bool, tlsConfig T " ")} } -func getDownloadCommand(downloadPath, componentPackage string, hasPulsarctl, hasWget, authProvided, tlsProvided bool, +func GetDownloadCommand(downloadPath, componentPackage string, hasPulsarctl, hasWget, authProvided, tlsProvided bool, tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig) []string { var args []string @@ -822,7 +823,7 @@ func getDownloadCommand(downloadPath, componentPackage string, hasPulsarctl, has return args } -func generateJavaLogConfigCommand(runtime *v1alpha1.JavaRuntime, agent v1alpha1.LogTopicAgent) string { +func GenerateJavaLogConfigCommand(runtime *v1alpha1.JavaRuntime, agent v1alpha1.LogTopicAgent) string { if runtime == nil || (runtime.Log != nil && runtime.Log.LogConfig != nil) { return "" } @@ -857,7 +858,7 @@ func generateJavaLogConfigCommand(runtime *v1alpha1.JavaRuntime, agent v1alpha1. return "" } -func generateJavaLogConfigFileName(runtime *v1alpha1.JavaRuntime) string { +func GenerateJavaLogConfigFileName(runtime *v1alpha1.JavaRuntime) string { if runtime == nil || (runtime.Log != nil && runtime.Log.LogConfig != nil) { return DefaultJavaLogConfigPath } @@ -1524,12 +1525,15 @@ func generateBasicContainerEnv(secrets map[string]v1alpha1.SecretRef, env []core return vars } -func generateContainerEnvFrom(messagingConfig string, authSecret string, tlsSecret string) []corev1.EnvFromSource { - envs := []corev1.EnvFromSource{{ - ConfigMapRef: &corev1.ConfigMapEnvSource{ - LocalObjectReference: corev1.LocalObjectReference{Name: messagingConfig}, - }, - }} +func GenerateContainerEnvFrom(messagingConfig string, authSecret string, tlsSecret string) []corev1.EnvFromSource { + var envs []corev1.EnvFromSource + if messagingConfig != "" { + envs = append(envs, corev1.EnvFromSource{ + ConfigMapRef: &corev1.ConfigMapEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{Name: messagingConfig}, + }, + }) + } if authSecret != "" { envs = append(envs, corev1.EnvFromSource{ @@ -1814,7 +1818,7 @@ func generateContainerVolumeMountsFromProducerConf(conf *v1alpha1.ProducerConfig return mounts } -func generateContainerVolumeMounts(volumeMounts []corev1.VolumeMount, producerConf *v1alpha1.ProducerConfig, +func GenerateContainerVolumeMounts(volumeMounts []corev1.VolumeMount, producerConf *v1alpha1.ProducerConfig, consumerConfs map[string]v1alpha1.ConsumerConfig, tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig, logConfigs map[int32]*v1alpha1.RuntimeLogConfig, agent v1alpha1.LogTopicAgent) []corev1.VolumeMount { mounts := []corev1.VolumeMount{} @@ -1839,7 +1843,7 @@ func generateContainerVolumeMounts(volumeMounts []corev1.VolumeMount, producerCo return mounts } -func generatePodVolumes(podVolumes []corev1.Volume, producerConf *v1alpha1.ProducerConfig, +func GeneratePodVolumes(podVolumes []corev1.Volume, producerConf *v1alpha1.ProducerConfig, consumerConfs map[string]v1alpha1.ConsumerConfig, tlsConfig TLSConfig, authConfig *v1alpha1.AuthConfig, logConfigs map[int32]*v1alpha1.RuntimeLogConfig, agent v1alpha1.LogTopicAgent) []corev1.Volume { @@ -1999,7 +2003,7 @@ const ( golangRuntimeLog ) -func getRuntimeLogConfigNames(java *v1alpha1.JavaRuntime, python *v1alpha1.PythonRuntime, +func GetRuntimeLogConfigNames(java *v1alpha1.JavaRuntime, python *v1alpha1.PythonRuntime, golang *v1alpha1.GoRuntime) map[int32]*v1alpha1.RuntimeLogConfig { var configs = map[int32]*v1alpha1.RuntimeLogConfig{} @@ -2249,7 +2253,7 @@ func makeFilebeatContainer(volumeMounts []corev1.VolumeMount, envVar []corev1.En } imagePullPolicy := corev1.PullIfNotPresent allowPrivilegeEscalation := false - mounts := generateContainerVolumeMounts(volumeMounts, nil, nil, tlsConfig, authConfig, nil, agent) + mounts := GenerateContainerVolumeMounts(volumeMounts, nil, nil, tlsConfig, authConfig, nil, agent) var uid int64 = 1000 @@ -2332,7 +2336,7 @@ func makeFilebeatContainer(volumeMounts []corev1.VolumeMount, envVar []corev1.En Command: []string{"/bin/sh", "-c", "--", "echo " + fmt.Sprintf("\"%s\"", tpl.String()) + " > " + DefaultFilebeatConfig + " && /usr/share/filebeat/filebeat -e -c " + DefaultFilebeatConfig}, Env: envs, ImagePullPolicy: imagePullPolicy, - EnvFrom: generateContainerEnvFrom(pulsarConfig, authSecret, tlsSecret), + EnvFrom: GenerateContainerEnvFrom(pulsarConfig, authSecret, tlsSecret), VolumeMounts: mounts, SecurityContext: &corev1.SecurityContext{ Capabilities: &corev1.Capabilities{ diff --git a/controllers/spec/common_test.go b/controllers/spec/common_test.go index f9a168a1d..8518dc343 100644 --- a/controllers/spec/common_test.go +++ b/controllers/spec/common_test.go @@ -136,13 +136,11 @@ func TestGetDownloadCommand(t *testing.T) { // test get the download command with a tls config {"function://public/default/test@v1", "function-package.jar", &v1alpha1.PulsarTLSConfig{ - TLSConfig: v1alpha1.TLSConfig{ - Enabled: true, - AllowInsecure: false, - HostnameVerification: true, - CertSecretName: "test-secret", - CertSecretKey: "test-key", - }, + Enabled: true, + AllowInsecure: false, + HostnameVerification: true, + CertSecretName: "test-secret", + CertSecretKey: "test-key", }, nil, nil, []string{ "export PATH=$PATH:/pulsar/bin && ", @@ -157,13 +155,11 @@ func TestGetDownloadCommand(t *testing.T) { }, {"function://public/default/test@v1", "function-package.jar", &v1alpha1.PulsarTLSConfig{ - TLSConfig: v1alpha1.TLSConfig{ - Enabled: false, - AllowInsecure: false, - HostnameVerification: true, - CertSecretName: "test-secret", - CertSecretKey: "test-key", - }, + Enabled: false, + AllowInsecure: false, + HostnameVerification: true, + CertSecretName: "test-secret", + CertSecretKey: "test-key", }, nil, nil, []string{ "export PATH=$PATH:/pulsar/bin && ", @@ -175,13 +171,11 @@ func TestGetDownloadCommand(t *testing.T) { }, {"function://public/default/test@v1", "function-package.jar", &v1alpha1.PulsarTLSConfig{ - TLSConfig: v1alpha1.TLSConfig{ - Enabled: true, - AllowInsecure: true, - HostnameVerification: false, - CertSecretName: "test-secret", - CertSecretKey: "test-key", - }, + Enabled: true, + AllowInsecure: true, + HostnameVerification: false, + CertSecretName: "test-secret", + CertSecretKey: "test-key", }, nil, nil, []string{ "export PATH=$PATH:/pulsar/bin && ", @@ -196,13 +190,11 @@ func TestGetDownloadCommand(t *testing.T) { }, {"function://public/default/test@v1", "function-package.jar", &v1alpha1.PulsarTLSConfig{ - TLSConfig: v1alpha1.TLSConfig{ - Enabled: true, - AllowInsecure: true, - HostnameVerification: false, - CertSecretName: "test-secret", - CertSecretKey: "test-key", - }, + Enabled: true, + AllowInsecure: true, + HostnameVerification: false, + CertSecretName: "test-secret", + CertSecretKey: "test-key", }, nil, &v1alpha1.GenericAuth{ ClientAuthenticationParameters: "auth-params", ClientAuthenticationPlugin: "auth-plugin", @@ -228,13 +220,11 @@ func TestGetDownloadCommand(t *testing.T) { }, {"http://aaa.bbb.ccc/test.jar", "function-package.jar", &v1alpha1.PulsarTLSConfig{ - TLSConfig: v1alpha1.TLSConfig{ - Enabled: true, - AllowInsecure: true, - HostnameVerification: false, - CertSecretName: "test-secret", - CertSecretKey: "test-key", - }, + Enabled: true, + AllowInsecure: true, + HostnameVerification: false, + CertSecretName: "test-secret", + CertSecretKey: "test-key", }, nil, nil, []string{ "wget", @@ -268,13 +258,11 @@ func TestGetDownloadCommand(t *testing.T) { false, }, {"source://public/default/test@v1", "source-package.jar", &v1alpha1.PulsarTLSConfig{ - TLSConfig: v1alpha1.TLSConfig{ - Enabled: true, - AllowInsecure: false, - HostnameVerification: true, - CertSecretName: "test-secret", - CertSecretKey: "test-key", - }, + Enabled: true, + AllowInsecure: false, + HostnameVerification: true, + CertSecretName: "test-secret", + CertSecretKey: "test-key", }, nil, nil, []string{ PulsarAdminExecutableFile, @@ -287,13 +275,11 @@ func TestGetDownloadCommand(t *testing.T) { }, // test get the download command with normal name {"/test", "test.jar", &v1alpha1.PulsarTLSConfig{ - TLSConfig: v1alpha1.TLSConfig{ - Enabled: true, - AllowInsecure: true, - HostnameVerification: true, - CertSecretName: "test-secret", - CertSecretKey: "test-key", - }, + Enabled: true, + AllowInsecure: true, + HostnameVerification: true, + CertSecretName: "test-secret", + CertSecretKey: "test-key", }, nil, nil, []string{ PulsarAdminExecutableFile, @@ -306,13 +292,11 @@ func TestGetDownloadCommand(t *testing.T) { }, // test get the download command with a wrong package name {"source/public/default/test@v1", "source-package.jar", &v1alpha1.PulsarTLSConfig{ - TLSConfig: v1alpha1.TLSConfig{ - Enabled: true, - AllowInsecure: true, - HostnameVerification: false, - CertSecretName: "test-secret", - CertSecretKey: "test-key", - }, + Enabled: true, + AllowInsecure: true, + HostnameVerification: false, + CertSecretName: "test-secret", + CertSecretKey: "test-key", }, nil, nil, []string{ PulsarAdminExecutableFile, @@ -324,13 +308,11 @@ func TestGetDownloadCommand(t *testing.T) { false, }, {"source:/public/default/test@v1", "source-package.jar", &v1alpha1.PulsarTLSConfig{ - TLSConfig: v1alpha1.TLSConfig{ - Enabled: false, - AllowInsecure: true, - HostnameVerification: false, - CertSecretName: "test-secret", - CertSecretKey: "test-key", - }, + Enabled: false, + AllowInsecure: true, + HostnameVerification: false, + CertSecretName: "test-secret", + CertSecretKey: "test-key", }, nil, nil, []string{ PulsarAdminExecutableFile, @@ -348,7 +330,7 @@ func TestGetDownloadCommand(t *testing.T) { } else if v.genericAuth != nil { authConfig.GenericAuth = v.genericAuth } - actualResult := getDownloadCommand(v.downloadPath, v.componentPackage, v.hasPulsarctl, v.hasPulsarctl, false, false, v.tlsConfig, &authConfig) + actualResult := GetDownloadCommand(v.downloadPath, v.componentPackage, v.hasPulsarctl, v.hasPulsarctl, false, false, v.tlsConfig, &authConfig) assert.Equal(t, v.expectedCommand, actualResult) } } @@ -641,13 +623,11 @@ func TestGeneratePodVolumes(t *testing.T) { }, }, trustCert: &v1alpha1.PulsarTLSConfig{ - TLSConfig: v1alpha1.TLSConfig{ - Enabled: true, - AllowInsecure: true, - HostnameVerification: true, - CertSecretName: "test-trust-secret", - CertSecretKey: "test-trust-key", - }, + Enabled: true, + AllowInsecure: true, + HostnameVerification: true, + CertSecretName: "test-trust-secret", + CertSecretKey: "test-trust-key", }, }, want: []corev1.Volume{ @@ -835,7 +815,7 @@ func TestGeneratePodVolumes(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { assert.Equalf(t, tt.want, - generatePodVolumes( + GeneratePodVolumes( tt.args.podVolumes, tt.args.producerConf, tt.args.consumerConfs, @@ -843,7 +823,7 @@ func TestGeneratePodVolumes(t *testing.T) { tt.args.authConfig, tt.args.logConf, v1alpha1.RUNTIME, - ), "generatePodVolumes(%v, %v, %v, %v)", tt.args.podVolumes, tt.args.producerConf, tt.args.consumerConfs, tt.args.trustCert) + ), "GeneratePodVolumes(%v, %v, %v, %v)", tt.args.podVolumes, tt.args.producerConf, tt.args.consumerConfs, tt.args.trustCert) }) } } @@ -957,13 +937,11 @@ func TestGenerateContainerVolumeMounts(t *testing.T) { }, }, trustCert: &v1alpha1.PulsarTLSConfig{ - TLSConfig: v1alpha1.TLSConfig{ - Enabled: true, - AllowInsecure: true, - HostnameVerification: true, - CertSecretName: "test-trust-secret", - CertSecretKey: "test-trust-key", - }, + Enabled: true, + AllowInsecure: true, + HostnameVerification: true, + CertSecretName: "test-trust-secret", + CertSecretKey: "test-trust-key", }, }, want: []corev1.VolumeMount{ @@ -1005,13 +983,11 @@ func TestGenerateContainerVolumeMounts(t *testing.T) { }, }, trustCert: &v1alpha1.PulsarTLSConfig{ - TLSConfig: v1alpha1.TLSConfig{ - Enabled: true, - AllowInsecure: true, - HostnameVerification: true, - CertSecretName: "test-trust-secret", - CertSecretKey: "test-trust-key", - }, + Enabled: true, + AllowInsecure: true, + HostnameVerification: true, + CertSecretName: "test-trust-secret", + CertSecretKey: "test-trust-key", }, javaRuntime: &v1alpha1.JavaRuntime{ Jar: "test.jar", @@ -1115,14 +1091,14 @@ func TestGenerateContainerVolumeMounts(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { assert.Equalf(t, tt.want, - generateContainerVolumeMounts( + GenerateContainerVolumeMounts( tt.args.volumeMounts, tt.args.producerConf, tt.args.consumerConfs, tt.args.trustCert, tt.args.authConfig, tt.args.logConf, - v1alpha1.RUNTIME), "generateContainerVolumeMounts(%v, %v, %v, %v)", tt.args.volumeMounts, tt.args.producerConf, tt.args.consumerConfs, tt.args.trustCert) + v1alpha1.RUNTIME), "GenerateContainerVolumeMounts(%v, %v, %v, %v)", tt.args.volumeMounts, tt.args.producerConf, tt.args.consumerConfs, tt.args.trustCert) }) } } diff --git a/controllers/spec/function.go b/controllers/spec/function.go index 65bfded84..17ab2540a 100644 --- a/controllers/spec/function.go +++ b/controllers/spec/function.go @@ -43,13 +43,7 @@ func MakeFunctionHPA(function *v1alpha1.Function) *autov2.HorizontalPodAutoscale Name: function.Name, APIVersion: function.APIVersion, } - if isBuiltinHPAEnabled(function.Spec.MinReplicas, function.Spec.MaxReplicas, function.Spec.Pod) { - return makeBuiltinHPA(objectMeta, *function.Spec.MinReplicas, *function.Spec.MaxReplicas, targetRef, - function.Spec.Pod.BuiltinAutoscaler) - } else if !isDefaultHPAEnabled(function.Spec.MinReplicas, function.Spec.MaxReplicas, function.Spec.Pod) { - return makeHPA(objectMeta, *function.Spec.MinReplicas, *function.Spec.MaxReplicas, function.Spec.Pod, targetRef) - } - return makeDefaultHPA(objectMeta, *function.Spec.MinReplicas, *function.Spec.MaxReplicas, targetRef) + return MakeHPA(objectMeta, targetRef, function.Spec.MinReplicas, function.Spec.MaxReplicas, function.Spec.Pod) } func MakeFunctionService(function *v1alpha1.Function) *corev1.Service { @@ -60,15 +54,13 @@ func MakeFunctionService(function *v1alpha1.Function) *corev1.Service { func MakeFunctionStatefulSet(ctx context.Context, cli client.Client, function *v1alpha1.Function) (*appsv1.StatefulSet, error) { objectMeta := MakeFunctionObjectMeta(function) + labels := makeFunctionLabels(function) statefulSet := MakeStatefulSet(objectMeta, function.Spec.Replicas, function.Spec.DownloaderImage, - makeFunctionContainer(function), makeFilebeatContainer(function.Spec.VolumeMounts, function.Spec.Pod.Env, - function.Spec.Name, function.Spec.LogTopic, function.Spec.LogTopicAgent, function.Spec.Pulsar.TLSConfig, - function.Spec.Pulsar.AuthConfig, function.Spec.Pulsar.PulsarConfig, function.Spec.Pulsar.TLSSecret, - function.Spec.Pulsar.AuthSecret, function.Spec.FilebeatImage), - makeFunctionVolumes(function, function.Spec.Pulsar.AuthConfig), makeFunctionLabels(function), function.Spec.Pod, - *function.Spec.Pulsar, function.Spec.Java, function.Spec.Python, function.Spec.Golang, - function.Spec.VolumeMounts, function.Spec.VolumeClaimTemplates, - function.Spec.PersistentVolumeClaimRetentionPolicy) + makeFunctionContainer(function), makeFunctionVolumes(function, function.Spec.Pulsar.AuthConfig), labels, function.Spec.Pod, + function.Spec.Pulsar.AuthConfig, function.Spec.Pulsar.TLSConfig, function.Spec.Pulsar.PulsarConfig, function.Spec.Pulsar.AuthSecret, + function.Spec.Pulsar.TLSSecret, function.Spec.Java, function.Spec.Python, function.Spec.Golang, function.Spec.Pod.Env, function.Name, + function.Spec.LogTopic, function.Spec.FilebeatImage, function.Spec.LogTopicAgent, function.Spec.VolumeMounts, + function.Spec.VolumeClaimTemplates, function.Spec.PersistentVolumeClaimRetentionPolicy) globalBackendConfigVersion, namespacedBackendConfigVersion, err := PatchStatefulSet(ctx, cli, function.Namespace, statefulSet) if err != nil { @@ -146,22 +138,22 @@ func MakeFunctionCleanUpJob(function *v1alpha1.Function) *v1.Job { } func makeFunctionVolumes(function *v1alpha1.Function, authConfig *v1alpha1.AuthConfig) []corev1.Volume { - return generatePodVolumes(function.Spec.Pod.Volumes, + return GeneratePodVolumes(function.Spec.Pod.Volumes, function.Spec.Output.ProducerConf, function.Spec.Input.SourceSpecs, function.Spec.Pulsar.TLSConfig, authConfig, - getRuntimeLogConfigNames(function.Spec.Java, function.Spec.Python, function.Spec.Golang), + GetRuntimeLogConfigNames(function.Spec.Java, function.Spec.Python, function.Spec.Golang), function.Spec.LogTopicAgent) } func makeFunctionVolumeMounts(function *v1alpha1.Function, authConfig *v1alpha1.AuthConfig) []corev1.VolumeMount { - return generateContainerVolumeMounts(function.Spec.VolumeMounts, + return GenerateContainerVolumeMounts(function.Spec.VolumeMounts, function.Spec.Output.ProducerConf, function.Spec.Input.SourceSpecs, function.Spec.Pulsar.TLSConfig, authConfig, - getRuntimeLogConfigNames(function.Spec.Java, function.Spec.Python, function.Spec.Golang), + GetRuntimeLogConfigNames(function.Spec.Java, function.Spec.Python, function.Spec.Golang), function.Spec.LogTopicAgent) } @@ -186,7 +178,7 @@ func makeFunctionContainer(function *v1alpha1.Function) *corev1.Container { Env: generateContainerEnv(function), Resources: function.Spec.Resources, ImagePullPolicy: imagePullPolicy, - EnvFrom: generateContainerEnvFrom(function.Spec.Pulsar.PulsarConfig, function.Spec.Pulsar.AuthSecret, + EnvFrom: GenerateContainerEnvFrom(function.Spec.Pulsar.PulsarConfig, function.Spec.Pulsar.AuthSecret, function.Spec.Pulsar.TLSSecret), VolumeMounts: mounts, LivenessProbe: probe, @@ -230,7 +222,7 @@ func makeFunctionCommand(function *v1alpha1.Function) []string { if spec.Java.Jar != "" { return MakeJavaFunctionCommand(spec.Java.JarLocation, spec.Java.Jar, spec.Name, spec.ClusterName, - generateJavaLogConfigCommand(spec.Java, spec.LogTopicAgent), + GenerateJavaLogConfigCommand(spec.Java, spec.LogTopicAgent), parseJavaLogLevel(spec.Java), generateFunctionDetailsInJSON(function), spec.Java.ExtraDependenciesDir, @@ -239,7 +231,7 @@ func makeFunctionCommand(function *v1alpha1.Function) []string { spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", spec.SecretsMap, spec.StateConfig, spec.Pulsar.TLSConfig, spec.Pulsar.AuthConfig, spec.MaxPendingAsyncRequests, - generateJavaLogConfigFileName(function.Spec.Java)) + GenerateJavaLogConfigFileName(function.Spec.Java)) } } else if spec.Python != nil { if spec.Python.Py != "" { diff --git a/controllers/spec/hpa.go b/controllers/spec/hpa.go index bd5d2d7e9..a3144cead 100644 --- a/controllers/spec/hpa.go +++ b/controllers/spec/hpa.go @@ -181,3 +181,12 @@ func makeHPA(objectMeta *metav1.ObjectMeta, minReplicas, maxReplicas int32, podP Spec: spec, } } + +func MakeHPA(objectMeta *metav1.ObjectMeta, targetRef autov2.CrossVersionObjectReference, minReplicas, maxReplicas *int32, policy v1alpha1.PodPolicy) *autov2.HorizontalPodAutoscaler { + if isBuiltinHPAEnabled(minReplicas, maxReplicas, policy) { + return makeBuiltinHPA(objectMeta, *minReplicas, *maxReplicas, targetRef, policy.BuiltinAutoscaler) + } else if !isDefaultHPAEnabled(minReplicas, maxReplicas, policy) { + return makeHPA(objectMeta, *minReplicas, *maxReplicas, policy, targetRef) + } + return makeDefaultHPA(objectMeta, *minReplicas, *maxReplicas, targetRef) +} diff --git a/controllers/spec/sink.go b/controllers/spec/sink.go index 321f9ecd1..7014622ec 100644 --- a/controllers/spec/sink.go +++ b/controllers/spec/sink.go @@ -40,13 +40,7 @@ func MakeSinkHPA(sink *v1alpha1.Sink) *autov2.HorizontalPodAutoscaler { Name: sink.Name, APIVersion: sink.APIVersion, } - if isBuiltinHPAEnabled(sink.Spec.MinReplicas, sink.Spec.MaxReplicas, sink.Spec.Pod) { - return makeBuiltinHPA(objectMeta, *sink.Spec.MinReplicas, *sink.Spec.MaxReplicas, targetRef, - sink.Spec.Pod.BuiltinAutoscaler) - } else if !isDefaultHPAEnabled(sink.Spec.MinReplicas, sink.Spec.MaxReplicas, sink.Spec.Pod) { - return makeHPA(objectMeta, *sink.Spec.MinReplicas, *sink.Spec.MaxReplicas, sink.Spec.Pod, targetRef) - } - return makeDefaultHPA(objectMeta, *sink.Spec.MinReplicas, *sink.Spec.MaxReplicas, targetRef) + return MakeHPA(objectMeta, targetRef, sink.Spec.MinReplicas, sink.Spec.MaxReplicas, sink.Spec.Pod) } func MakeSinkService(sink *v1alpha1.Sink) *corev1.Service { @@ -58,11 +52,10 @@ func MakeSinkService(sink *v1alpha1.Sink) *corev1.Service { func MakeSinkStatefulSet(ctx context.Context, cli client.Client, sink *v1alpha1.Sink) (*appsv1.StatefulSet, error) { objectMeta := MakeSinkObjectMeta(sink) statefulSet := MakeStatefulSet(objectMeta, sink.Spec.Replicas, sink.Spec.DownloaderImage, makeSinkContainer(sink), - makeFilebeatContainer(sink.Spec.VolumeMounts, sink.Spec.Pod.Env, sink.Spec.Name, sink.Spec.LogTopic, sink.Spec.LogTopicAgent, - sink.Spec.Pulsar.TLSConfig, sink.Spec.Pulsar.AuthConfig, sink.Spec.Pulsar.PulsarConfig, sink.Spec.Pulsar.TLSSecret, - sink.Spec.Pulsar.AuthSecret, sink.Spec.FilebeatImage), - makeSinkVolumes(sink, sink.Spec.Pulsar.AuthConfig), makeSinkLabels(sink), sink.Spec.Pod, *sink.Spec.Pulsar, - sink.Spec.Java, sink.Spec.Python, sink.Spec.Golang, sink.Spec.VolumeMounts, nil, nil) + makeSinkVolumes(sink, sink.Spec.Pulsar.AuthConfig), makeSinkLabels(sink), sink.Spec.Pod, sink.Spec.Pulsar.AuthConfig, + sink.Spec.Pulsar.TLSConfig, sink.Spec.Pulsar.PulsarConfig, sink.Spec.Pulsar.AuthSecret, sink.Spec.Pulsar.TLSSecret, + sink.Spec.Java, sink.Spec.Python, sink.Spec.Golang, sink.Spec.Pod.Env, sink.Name, sink.Spec.LogTopic, sink.Spec.FilebeatImage, + sink.Spec.LogTopicAgent, sink.Spec.VolumeMounts, nil, nil) globalBackendConfigVersion, namespacedBackendConfigVersion, err := PatchStatefulSet(ctx, cli, sink.Namespace, statefulSet) if err != nil { @@ -114,7 +107,7 @@ func makeSinkContainer(sink *v1alpha1.Sink) *corev1.Container { Env: generateBasicContainerEnv(sink.Spec.SecretsMap, sink.Spec.Pod.Env), Resources: sink.Spec.Resources, ImagePullPolicy: imagePullPolicy, - EnvFrom: generateContainerEnvFrom(sink.Spec.Pulsar.PulsarConfig, sink.Spec.Pulsar.AuthSecret, + EnvFrom: GenerateContainerEnvFrom(sink.Spec.Pulsar.PulsarConfig, sink.Spec.Pulsar.AuthSecret, sink.Spec.Pulsar.TLSSecret), VolumeMounts: mounts, LivenessProbe: probe, @@ -193,24 +186,24 @@ func MakeSinkCleanUpJob(sink *v1alpha1.Sink) *v1.Job { } func makeSinkVolumes(sink *v1alpha1.Sink, authConfig *v1alpha1.AuthConfig) []corev1.Volume { - return generatePodVolumes( + return GeneratePodVolumes( sink.Spec.Pod.Volumes, nil, sink.Spec.Input.SourceSpecs, sink.Spec.Pulsar.TLSConfig, authConfig, - getRuntimeLogConfigNames(sink.Spec.Java, sink.Spec.Python, sink.Spec.Golang), + GetRuntimeLogConfigNames(sink.Spec.Java, sink.Spec.Python, sink.Spec.Golang), sink.Spec.LogTopicAgent) } func makeSinkVolumeMounts(sink *v1alpha1.Sink, authConfig *v1alpha1.AuthConfig) []corev1.VolumeMount { - return generateContainerVolumeMounts( + return GenerateContainerVolumeMounts( sink.Spec.VolumeMounts, nil, sink.Spec.Input.SourceSpecs, sink.Spec.Pulsar.TLSConfig, authConfig, - getRuntimeLogConfigNames(sink.Spec.Java, sink.Spec.Python, sink.Spec.Golang), + GetRuntimeLogConfigNames(sink.Spec.Java, sink.Spec.Python, sink.Spec.Golang), sink.Spec.LogTopicAgent) } @@ -224,13 +217,13 @@ func MakeSinkCommand(sink *v1alpha1.Sink) []string { } return MakeJavaFunctionCommand(spec.Java.JarLocation, spec.Java.Jar, spec.Name, spec.ClusterName, - generateJavaLogConfigCommand(spec.Java, spec.LogTopicAgent), + GenerateJavaLogConfigCommand(spec.Java, spec.LogTopicAgent), parseJavaLogLevel(spec.Java), generateSinkDetailsInJSON(sink), spec.Java.ExtraDependenciesDir, string(sink.UID), spec.Java.JavaOpts, hasPulsarctl, hasWget, spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", spec.SecretsMap, spec.StateConfig, spec.Pulsar.TLSConfig, spec.Pulsar.AuthConfig, nil, - generateJavaLogConfigFileName(spec.Java)) + GenerateJavaLogConfigFileName(spec.Java)) } func generateSinkDetailsInJSON(sink *v1alpha1.Sink) string { diff --git a/controllers/spec/source.go b/controllers/spec/source.go index d077cb2aa..bfc6a8817 100644 --- a/controllers/spec/source.go +++ b/controllers/spec/source.go @@ -41,13 +41,7 @@ func MakeSourceHPA(source *v1alpha1.Source) *autov2.HorizontalPodAutoscaler { Name: source.Name, APIVersion: source.APIVersion, } - if isBuiltinHPAEnabled(source.Spec.MinReplicas, source.Spec.MaxReplicas, source.Spec.Pod) { - return makeBuiltinHPA(objectMeta, *source.Spec.MinReplicas, *source.Spec.MaxReplicas, targetRef, - source.Spec.Pod.BuiltinAutoscaler) - } else if !isDefaultHPAEnabled(source.Spec.MinReplicas, source.Spec.MaxReplicas, source.Spec.Pod) { - return makeHPA(objectMeta, *source.Spec.MinReplicas, *source.Spec.MaxReplicas, source.Spec.Pod, targetRef) - } - return makeDefaultHPA(objectMeta, *source.Spec.MinReplicas, *source.Spec.MaxReplicas, targetRef) + return MakeHPA(objectMeta, targetRef, source.Spec.MinReplicas, source.Spec.MaxReplicas, source.Spec.Pod) } func MakeSourceService(source *v1alpha1.Source) *corev1.Service { @@ -59,11 +53,10 @@ func MakeSourceService(source *v1alpha1.Source) *corev1.Service { func MakeSourceStatefulSet(ctx context.Context, cli client.Client, source *v1alpha1.Source) (*appsv1.StatefulSet, error) { objectMeta := MakeSourceObjectMeta(source) statefulSet := MakeStatefulSet(objectMeta, source.Spec.Replicas, source.Spec.DownloaderImage, makeSourceContainer(source), - makeFilebeatContainer(source.Spec.VolumeMounts, source.Spec.Pod.Env, source.Spec.Name, source.Spec.LogTopic, source.Spec.LogTopicAgent, - source.Spec.Pulsar.TLSConfig, source.Spec.Pulsar.AuthConfig, source.Spec.Pulsar.PulsarConfig, source.Spec.Pulsar.TLSSecret, - source.Spec.Pulsar.AuthSecret, source.Spec.FilebeatImage), - makeSourceVolumes(source, source.Spec.Pulsar.AuthConfig), makeSourceLabels(source), source.Spec.Pod, *source.Spec.Pulsar, - source.Spec.Java, source.Spec.Python, source.Spec.Golang, source.Spec.VolumeMounts, nil, nil) + makeSourceVolumes(source, source.Spec.Pulsar.AuthConfig), makeSourceLabels(source), source.Spec.Pod, source.Spec.Pulsar.AuthConfig, + source.Spec.Pulsar.TLSConfig, source.Spec.Pulsar.PulsarConfig, source.Spec.Pulsar.AuthSecret, source.Spec.Pulsar.TLSSecret, + source.Spec.Java, source.Spec.Python, source.Spec.Golang, source.Spec.Pod.Env, source.Name, source.Spec.LogTopic, source.Spec.FilebeatImage, + source.Spec.LogTopicAgent, source.Spec.VolumeMounts, nil, nil) globalBackendConfigVersion, namespacedBackendConfigVersion, err := PatchStatefulSet(ctx, cli, source.Namespace, statefulSet) if err != nil { @@ -110,7 +103,7 @@ func makeSourceContainer(source *v1alpha1.Source) *corev1.Container { Env: generateBasicContainerEnv(source.Spec.SecretsMap, source.Spec.Pod.Env), Resources: source.Spec.Resources, ImagePullPolicy: imagePullPolicy, - EnvFrom: generateContainerEnvFrom(source.Spec.Pulsar.PulsarConfig, source.Spec.Pulsar.AuthSecret, + EnvFrom: GenerateContainerEnvFrom(source.Spec.Pulsar.PulsarConfig, source.Spec.Pulsar.AuthSecret, source.Spec.Pulsar.TLSSecret), VolumeMounts: mounts, LivenessProbe: probe, @@ -140,24 +133,24 @@ func makeSourceLabels(source *v1alpha1.Source) map[string]string { } func makeSourceVolumes(source *v1alpha1.Source, authConfig *v1alpha1.AuthConfig) []corev1.Volume { - return generatePodVolumes( + return GeneratePodVolumes( source.Spec.Pod.Volumes, source.Spec.Output.ProducerConf, nil, source.Spec.Pulsar.TLSConfig, authConfig, - getRuntimeLogConfigNames(source.Spec.Java, source.Spec.Python, source.Spec.Golang), + GetRuntimeLogConfigNames(source.Spec.Java, source.Spec.Python, source.Spec.Golang), source.Spec.LogTopicAgent) } func makeSourceVolumeMounts(source *v1alpha1.Source, authConfig *v1alpha1.AuthConfig) []corev1.VolumeMount { - return generateContainerVolumeMounts( + return GenerateContainerVolumeMounts( source.Spec.VolumeMounts, source.Spec.Output.ProducerConf, nil, source.Spec.Pulsar.TLSConfig, authConfig, - getRuntimeLogConfigNames(source.Spec.Java, source.Spec.Python, source.Spec.Golang), + GetRuntimeLogConfigNames(source.Spec.Java, source.Spec.Python, source.Spec.Golang), source.Spec.LogTopicAgent) } @@ -171,13 +164,13 @@ func makeSourceCommand(source *v1alpha1.Source) []string { } return MakeJavaFunctionCommand(spec.Java.JarLocation, spec.Java.Jar, spec.Name, spec.ClusterName, - generateJavaLogConfigCommand(spec.Java, spec.LogTopicAgent), + GenerateJavaLogConfigCommand(spec.Java, spec.LogTopicAgent), parseJavaLogLevel(spec.Java), generateSourceDetailsInJSON(source), spec.Java.ExtraDependenciesDir, string(source.UID), spec.Java.JavaOpts, hasPulsarctl, hasWget, spec.Pulsar.AuthSecret != "", spec.Pulsar.TLSSecret != "", spec.SecretsMap, spec.StateConfig, spec.Pulsar.TLSConfig, spec.Pulsar.AuthConfig, nil, - generateJavaLogConfigFileName(spec.Java)) + GenerateJavaLogConfigFileName(spec.Java)) } func generateSourceDetailsInJSON(source *v1alpha1.Source) string {