From e92db2215524b4643e5e58d3b70d9fc69b0755ef Mon Sep 17 00:00:00 2001 From: jiangpengcheng Date: Mon, 26 Feb 2024 08:47:42 +0800 Subject: [PATCH] Use env instead of envFrom --- controllers/common.go | 14 ------- controllers/function.go | 18 ++------- controllers/sink.go | 18 ++------- controllers/sink_controller_test.go | 2 +- controllers/source.go | 18 ++------- controllers/source_controller_test.go | 2 +- controllers/spec/common.go | 53 ++++++++++++++++++++------- controllers/spec/function.go | 6 ++- controllers/spec/sink.go | 6 ++- controllers/spec/source.go | 6 ++- 10 files changed, 65 insertions(+), 78 deletions(-) diff --git a/controllers/common.go b/controllers/common.go index beb5634e8..33ab9f11a 100644 --- a/controllers/common.go +++ b/controllers/common.go @@ -24,7 +24,6 @@ import ( "github.com/streamnative/function-mesh/utils" autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" - corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -343,16 +342,3 @@ func getBackgroundDeletionPolicy() client.DeleteOption { } return deleteOptions } - -func mergeGlobalEnv(globalConfigMap *corev1.ConfigMap, statefulSet *appsv1.StatefulSet) { - var globalEnvs []corev1.EnvVar - for key, val := range globalConfigMap.Data { - globalEnvs = append(globalEnvs, corev1.EnvVar{ - Name: key, - Value: val, - }) - } - for i := range statefulSet.Spec.Template.Spec.Containers { - statefulSet.Spec.Template.Spec.Containers[i].Env = append(statefulSet.Spec.Template.Spec.Containers[i].Env, globalEnvs...) - } -} diff --git a/controllers/function.go b/controllers/function.go index d6576fa6f..7c926d051 100644 --- a/controllers/function.go +++ b/controllers/function.go @@ -20,7 +20,6 @@ package controllers import ( "context" - "github.com/streamnative/function-mesh/utils" autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" "github.com/streamnative/function-mesh/api/compute/v1alpha1" @@ -70,7 +69,7 @@ func (r *FunctionReconciler) ObserveFunctionStatefulSet(ctx context.Context, fun } function.Status.Selector = selector.String() - if r.checkIfStatefulSetNeedUpdate(statefulSet, function) { + if r.checkIfStatefulSetNeedUpdate(ctx, statefulSet, function) { condition.Status = metav1.ConditionFalse condition.Action = v1alpha1.Update function.Status.Conditions[v1alpha1.StatefulSet] = condition @@ -94,17 +93,8 @@ func (r *FunctionReconciler) ApplyFunctionStatefulSet(ctx context.Context, funct if condition.Status == metav1.ConditionTrue && !newGeneration { return nil } - desiredStatefulSet := spec.MakeFunctionStatefulSet(function) + desiredStatefulSet := spec.MakeFunctionStatefulSet(ctx, r, function) keepStatefulSetUnchangeableFields(ctx, r, r.Log, desiredStatefulSet) - if utils.GlobalConfigMap != "" { - globalCM := &corev1.ConfigMap{} - err := r.Get(ctx, types.NamespacedName{Namespace: utils.GlobalConfigMapNamespace, Name: utils.GlobalConfigMap}, globalCM) - if err != nil { - r.Log.Error(err, "error getting global configmap") - return err - } - mergeGlobalEnv(globalCM, desiredStatefulSet) - } desiredStatefulSetSpec := desiredStatefulSet.Spec if _, err := ctrl.CreateOrUpdate(ctx, r.Client, desiredStatefulSet, func() error { // function statefulSet mutate logic @@ -413,9 +403,9 @@ func (r *FunctionReconciler) ApplyFunctionCleanUpJob(ctx context.Context, functi return nil } -func (r *FunctionReconciler) checkIfStatefulSetNeedUpdate(statefulSet *appsv1.StatefulSet, +func (r *FunctionReconciler) checkIfStatefulSetNeedUpdate(ctx context.Context, statefulSet *appsv1.StatefulSet, function *v1alpha1.Function) bool { - return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeFunctionStatefulSet(function).Spec) + return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeFunctionStatefulSet(ctx, r, function).Spec) } func (r *FunctionReconciler) checkIfHPANeedUpdate(hpa *autov2.HorizontalPodAutoscaler, diff --git a/controllers/sink.go b/controllers/sink.go index 616db77a7..254e877bd 100644 --- a/controllers/sink.go +++ b/controllers/sink.go @@ -20,7 +20,6 @@ package controllers import ( "context" - "github.com/streamnative/function-mesh/utils" autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" "github.com/streamnative/function-mesh/api/compute/v1alpha1" @@ -70,7 +69,7 @@ func (r *SinkReconciler) ObserveSinkStatefulSet(ctx context.Context, sink *v1alp } sink.Status.Selector = selector.String() - if r.checkIfStatefulSetNeedUpdate(statefulSet, sink) { + if r.checkIfStatefulSetNeedUpdate(ctx, statefulSet, sink) { condition.Status = metav1.ConditionFalse condition.Action = v1alpha1.Update sink.Status.Conditions[v1alpha1.StatefulSet] = condition @@ -93,17 +92,8 @@ func (r *SinkReconciler) ApplySinkStatefulSet(ctx context.Context, sink *v1alpha if condition.Status == metav1.ConditionTrue && !newGeneration { return nil } - desiredStatefulSet := spec.MakeSinkStatefulSet(sink) + desiredStatefulSet := spec.MakeSinkStatefulSet(ctx, r, sink) keepStatefulSetUnchangeableFields(ctx, r, r.Log, desiredStatefulSet) - if utils.GlobalConfigMap != "" { - globalCM := &corev1.ConfigMap{} - err := r.Get(ctx, types.NamespacedName{Namespace: utils.GlobalConfigMapNamespace, Name: utils.GlobalConfigMap}, globalCM) - if err != nil { - r.Log.Error(err, "error getting global configmap") - return err - } - mergeGlobalEnv(globalCM, desiredStatefulSet) - } desiredStatefulSetSpec := desiredStatefulSet.Spec if _, err := ctrl.CreateOrUpdate(ctx, r.Client, desiredStatefulSet, func() error { // sink statefulSet mutate logic @@ -410,8 +400,8 @@ func (r *SinkReconciler) ApplySinkCleanUpJob(ctx context.Context, sink *v1alpha1 return nil } -func (r *SinkReconciler) checkIfStatefulSetNeedUpdate(statefulSet *appsv1.StatefulSet, sink *v1alpha1.Sink) bool { - return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeSinkStatefulSet(sink).Spec) +func (r *SinkReconciler) checkIfStatefulSetNeedUpdate(ctx context.Context, statefulSet *appsv1.StatefulSet, sink *v1alpha1.Sink) bool { + return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeSinkStatefulSet(ctx, r, sink).Spec) } func (r *SinkReconciler) checkIfHPANeedUpdate(hpa *autov2.HorizontalPodAutoscaler, sink *v1alpha1.Sink) bool { diff --git a/controllers/sink_controller_test.go b/controllers/sink_controller_test.go index 38373fd2b..e2ec15695 100644 --- a/controllers/sink_controller_test.go +++ b/controllers/sink_controller_test.go @@ -41,7 +41,7 @@ var _ = Describe("Sink Controller", func() { if sink.Status.Conditions == nil { sink.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition) } - statefulSet := spec.MakeSinkStatefulSet(sink) + statefulSet := spec.MakeSinkStatefulSet(context.Background(), k8sClient, sink) It("Should create pulsar configmap successfully", func() { Expect(k8sClient.Create(context.Background(), pulsarConfig)).Should(Succeed()) diff --git a/controllers/source.go b/controllers/source.go index 4f49d97e4..9fa5f060d 100644 --- a/controllers/source.go +++ b/controllers/source.go @@ -20,7 +20,6 @@ package controllers import ( "context" - "github.com/streamnative/function-mesh/utils" autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" "github.com/streamnative/function-mesh/api/compute/v1alpha1" @@ -70,7 +69,7 @@ func (r *SourceReconciler) ObserveSourceStatefulSet(ctx context.Context, source } source.Status.Selector = selector.String() - if r.checkIfStatefulSetNeedUpdate(statefulSet, source) { + if r.checkIfStatefulSetNeedUpdate(ctx, statefulSet, source) { condition.Status = metav1.ConditionFalse condition.Action = v1alpha1.Update source.Status.Conditions[v1alpha1.StatefulSet] = condition @@ -94,17 +93,8 @@ func (r *SourceReconciler) ApplySourceStatefulSet(ctx context.Context, source *v if condition.Status == metav1.ConditionTrue && !newGeneration { return nil } - desiredStatefulSet := spec.MakeSourceStatefulSet(source) + desiredStatefulSet := spec.MakeSourceStatefulSet(ctx, r, source) keepStatefulSetUnchangeableFields(ctx, r, r.Log, desiredStatefulSet) - if utils.GlobalConfigMap != "" { - globalCM := &corev1.ConfigMap{} - err := r.Get(ctx, types.NamespacedName{Namespace: utils.GlobalConfigMapNamespace, Name: utils.GlobalConfigMap}, globalCM) - if err != nil { - r.Log.Error(err, "error getting global configmap") - return err - } - mergeGlobalEnv(globalCM, desiredStatefulSet) - } desiredStatefulSetSpec := desiredStatefulSet.Spec if _, err := ctrl.CreateOrUpdate(ctx, r.Client, desiredStatefulSet, func() error { // source statefulSet mutate logic @@ -412,8 +402,8 @@ func (r *SourceReconciler) ApplySourceCleanUpJob(ctx context.Context, source *v1 return nil } -func (r *SourceReconciler) checkIfStatefulSetNeedUpdate(statefulSet *appsv1.StatefulSet, source *v1alpha1.Source) bool { - return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeSourceStatefulSet(source).Spec) +func (r *SourceReconciler) checkIfStatefulSetNeedUpdate(ctx context.Context, statefulSet *appsv1.StatefulSet, source *v1alpha1.Source) bool { + return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeSourceStatefulSet(ctx, r, source).Spec) } func (r *SourceReconciler) checkIfHPANeedUpdate(hpa *autov2.HorizontalPodAutoscaler, source *v1alpha1.Source) bool { diff --git a/controllers/source_controller_test.go b/controllers/source_controller_test.go index e2b20f1e5..cf86e6ee5 100644 --- a/controllers/source_controller_test.go +++ b/controllers/source_controller_test.go @@ -40,7 +40,7 @@ var _ = Describe("Source Controller", func() { if source.Status.Conditions == nil { source.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition) } - statefulSet := spec.MakeSourceStatefulSet(source) + statefulSet := spec.MakeSourceStatefulSet(context.Background(), k8sClient, source) It("Should create pulsar configmap successfully", func() { Expect(k8sClient.Create(context.Background(), pulsarConfig)).Should(Succeed()) diff --git a/controllers/spec/common.go b/controllers/spec/common.go index 7b27c5ae0..acfc34b5a 100644 --- a/controllers/spec/common.go +++ b/controllers/spec/common.go @@ -20,7 +20,6 @@ package spec import ( "bytes" "context" - // used for template _ "embed" "encoding/json" @@ -40,6 +39,7 @@ import ( v1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -203,7 +203,7 @@ func MakeHeadlessServiceName(serviceName string) string { return fmt.Sprintf("%s-headless", serviceName) } -func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderImage string, +func MakeStatefulSet(ctx context.Context, r client.Reader, objectMeta *metav1.ObjectMeta, replicas *int32, downloaderImage string, container *corev1.Container, filebeatContainer *corev1.Container, volumes []corev1.Volume, labels map[string]string, policy v1alpha1.PodPolicy, pulsar v1alpha1.PulsarMessaging, javaRuntime *v1alpha1.JavaRuntime, pythonRuntime *v1alpha1.PythonRuntime, @@ -265,7 +265,7 @@ func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderI Name: DownloaderVolume, }) } - return &appsv1.StatefulSet{ + desiredStatefulSet := &appsv1.StatefulSet{ TypeMeta: metav1.TypeMeta{ Kind: "StatefulSet", APIVersion: "apps/v1", @@ -275,6 +275,8 @@ func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderI MakeHeadlessServiceName(objectMeta.Name), downloaderContainer, volumeClaimTemplates, persistentVolumeClaimRetentionPolicy), } + MergeGlobalAndNamespacedEnv(ctx, r, objectMeta.Namespace, desiredStatefulSet) + return desiredStatefulSet } func MakeStatefulSetSpec(replicas *int32, container *corev1.Container, filebeatContainer *corev1.Container, @@ -1480,17 +1482,6 @@ func generateContainerEnvFrom(messagingConfig string, authSecret string, tlsSecr }) } - // add env from namespaces config map with optional=true - optional := true - if utils.NamespacedConfigMap != "" { - envs = append(envs, corev1.EnvFromSource{ - ConfigMapRef: &corev1.ConfigMapEnvSource{ - LocalObjectReference: corev1.LocalObjectReference{Name: utils.NamespacedConfigMap}, - Optional: &optional, - }, - }) - } - return envs } @@ -2287,3 +2278,37 @@ func makeFilebeatContainer(volumeMounts []corev1.VolumeMount, envVar []corev1.En }, } } + +func MergeGlobalAndNamespacedEnv(ctx context.Context, r client.Reader, namespace string, statefulSet *appsv1.StatefulSet) { + var globalEnvs []corev1.EnvVar + if utils.GlobalConfigMap != "" { + globalCM := &corev1.ConfigMap{} + err := r.Get(ctx, types.NamespacedName{Namespace: utils.GlobalConfigMapNamespace, Name: utils.GlobalConfigMap}, globalCM) + if err != nil { + return + } + for key, val := range globalCM.Data { + globalEnvs = append(globalEnvs, corev1.EnvVar{ + Name: key, + Value: val, + }) + } + } + if utils.NamespacedConfigMap != "" { + namespacedCM := &corev1.ConfigMap{} + err := r.Get(ctx, types.NamespacedName{Namespace: namespace, Name: utils.NamespacedConfigMap}, namespacedCM) + if err != nil { + return + } + for key, val := range namespacedCM.Data { + globalEnvs = append(globalEnvs, corev1.EnvVar{ + Name: key, + Value: val, + }) + } + } + + for i := range statefulSet.Spec.Template.Spec.Containers { + statefulSet.Spec.Template.Spec.Containers[i].Env = append(statefulSet.Spec.Template.Spec.Containers[i].Env, globalEnvs...) + } +} diff --git a/controllers/spec/function.go b/controllers/spec/function.go index 0392746b6..18b48f727 100644 --- a/controllers/spec/function.go +++ b/controllers/spec/function.go @@ -18,6 +18,7 @@ package spec import ( + "context" "regexp" "github.com/streamnative/function-mesh/utils" @@ -27,6 +28,7 @@ import ( v1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" logf "sigs.k8s.io/controller-runtime/pkg/log" "github.com/streamnative/function-mesh/api/compute/v1alpha1" @@ -57,9 +59,9 @@ func MakeFunctionService(function *v1alpha1.Function) *corev1.Service { return MakeService(objectMeta, labels) } -func MakeFunctionStatefulSet(function *v1alpha1.Function) *appsv1.StatefulSet { +func MakeFunctionStatefulSet(ctx context.Context, r client.Reader, function *v1alpha1.Function) *appsv1.StatefulSet { objectMeta := MakeFunctionObjectMeta(function) - return MakeStatefulSet(objectMeta, function.Spec.Replicas, function.Spec.DownloaderImage, + return MakeStatefulSet(ctx, r, objectMeta, function.Spec.Replicas, function.Spec.DownloaderImage, makeFunctionContainer(function), makeFilebeatContainer(function.Spec.VolumeMounts, function.Spec.Pod.Env, function.Spec.Name, function.Spec.LogTopic, function.Spec.LogTopicAgent, function.Spec.Pulsar.TLSConfig, function.Spec.Pulsar.AuthConfig, function.Spec.Pulsar.PulsarConfig, function.Spec.Pulsar.TLSSecret, diff --git a/controllers/spec/sink.go b/controllers/spec/sink.go index d88c27f89..e84aa7482 100644 --- a/controllers/spec/sink.go +++ b/controllers/spec/sink.go @@ -18,6 +18,7 @@ package spec import ( + "context" "regexp" "github.com/streamnative/function-mesh/utils" @@ -27,6 +28,7 @@ import ( v1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/streamnative/function-mesh/api/compute/v1alpha1" ) @@ -53,9 +55,9 @@ func MakeSinkService(sink *v1alpha1.Sink) *corev1.Service { return MakeService(objectMeta, labels) } -func MakeSinkStatefulSet(sink *v1alpha1.Sink) *appsv1.StatefulSet { +func MakeSinkStatefulSet(ctx context.Context, r client.Reader, sink *v1alpha1.Sink) *appsv1.StatefulSet { objectMeta := MakeSinkObjectMeta(sink) - return MakeStatefulSet(objectMeta, sink.Spec.Replicas, sink.Spec.DownloaderImage, makeSinkContainer(sink), + return MakeStatefulSet(ctx, r, objectMeta, sink.Spec.Replicas, sink.Spec.DownloaderImage, makeSinkContainer(sink), makeFilebeatContainer(sink.Spec.VolumeMounts, sink.Spec.Pod.Env, sink.Spec.Name, sink.Spec.LogTopic, sink.Spec.LogTopicAgent, sink.Spec.Pulsar.TLSConfig, sink.Spec.Pulsar.AuthConfig, sink.Spec.Pulsar.PulsarConfig, sink.Spec.Pulsar.TLSSecret, sink.Spec.Pulsar.AuthSecret, sink.Spec.FilebeatImage), diff --git a/controllers/spec/source.go b/controllers/spec/source.go index 3fb249426..b551d4d2a 100644 --- a/controllers/spec/source.go +++ b/controllers/spec/source.go @@ -18,6 +18,7 @@ package spec import ( + "context" "fmt" "regexp" @@ -28,6 +29,7 @@ import ( v1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/streamnative/function-mesh/api/compute/v1alpha1" ) @@ -54,9 +56,9 @@ func MakeSourceService(source *v1alpha1.Source) *corev1.Service { return MakeService(objectMeta, labels) } -func MakeSourceStatefulSet(source *v1alpha1.Source) *appsv1.StatefulSet { +func MakeSourceStatefulSet(ctx context.Context, r client.Reader, source *v1alpha1.Source) *appsv1.StatefulSet { objectMeta := MakeSourceObjectMeta(source) - return MakeStatefulSet(objectMeta, source.Spec.Replicas, source.Spec.DownloaderImage, makeSourceContainer(source), + return MakeStatefulSet(ctx, r, objectMeta, source.Spec.Replicas, source.Spec.DownloaderImage, makeSourceContainer(source), makeFilebeatContainer(source.Spec.VolumeMounts, source.Spec.Pod.Env, source.Spec.Name, source.Spec.LogTopic, source.Spec.LogTopicAgent, source.Spec.Pulsar.TLSConfig, source.Spec.Pulsar.AuthConfig, source.Spec.Pulsar.PulsarConfig, source.Spec.Pulsar.TLSSecret, source.Spec.Pulsar.AuthSecret, source.Spec.FilebeatImage),