Skip to content

Commit

Permalink
Use env instead of envFrom
Browse files Browse the repository at this point in the history
  • Loading branch information
jiangpengcheng committed Feb 26, 2024
1 parent d09cb33 commit e92db22
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 78 deletions.
14 changes: 0 additions & 14 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/streamnative/function-mesh/utils"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/predicate"

Expand Down Expand Up @@ -343,16 +342,3 @@ func getBackgroundDeletionPolicy() client.DeleteOption {
}
return deleteOptions
}

func mergeGlobalEnv(globalConfigMap *corev1.ConfigMap, statefulSet *appsv1.StatefulSet) {
var globalEnvs []corev1.EnvVar
for key, val := range globalConfigMap.Data {
globalEnvs = append(globalEnvs, corev1.EnvVar{
Name: key,
Value: val,
})
}
for i := range statefulSet.Spec.Template.Spec.Containers {
statefulSet.Spec.Template.Spec.Containers[i].Env = append(statefulSet.Spec.Template.Spec.Containers[i].Env, globalEnvs...)
}
}
18 changes: 4 additions & 14 deletions controllers/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package controllers
import (
"context"

"github.com/streamnative/function-mesh/utils"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"

"github.com/streamnative/function-mesh/api/compute/v1alpha1"
Expand Down Expand Up @@ -70,7 +69,7 @@ func (r *FunctionReconciler) ObserveFunctionStatefulSet(ctx context.Context, fun
}
function.Status.Selector = selector.String()

if r.checkIfStatefulSetNeedUpdate(statefulSet, function) {
if r.checkIfStatefulSetNeedUpdate(ctx, statefulSet, function) {
condition.Status = metav1.ConditionFalse
condition.Action = v1alpha1.Update
function.Status.Conditions[v1alpha1.StatefulSet] = condition
Expand All @@ -94,17 +93,8 @@ func (r *FunctionReconciler) ApplyFunctionStatefulSet(ctx context.Context, funct
if condition.Status == metav1.ConditionTrue && !newGeneration {
return nil
}
desiredStatefulSet := spec.MakeFunctionStatefulSet(function)
desiredStatefulSet := spec.MakeFunctionStatefulSet(ctx, r, function)
keepStatefulSetUnchangeableFields(ctx, r, r.Log, desiredStatefulSet)
if utils.GlobalConfigMap != "" {
globalCM := &corev1.ConfigMap{}
err := r.Get(ctx, types.NamespacedName{Namespace: utils.GlobalConfigMapNamespace, Name: utils.GlobalConfigMap}, globalCM)
if err != nil {
r.Log.Error(err, "error getting global configmap")
return err
}
mergeGlobalEnv(globalCM, desiredStatefulSet)
}
desiredStatefulSetSpec := desiredStatefulSet.Spec
if _, err := ctrl.CreateOrUpdate(ctx, r.Client, desiredStatefulSet, func() error {
// function statefulSet mutate logic
Expand Down Expand Up @@ -413,9 +403,9 @@ func (r *FunctionReconciler) ApplyFunctionCleanUpJob(ctx context.Context, functi
return nil
}

func (r *FunctionReconciler) checkIfStatefulSetNeedUpdate(statefulSet *appsv1.StatefulSet,
func (r *FunctionReconciler) checkIfStatefulSetNeedUpdate(ctx context.Context, statefulSet *appsv1.StatefulSet,
function *v1alpha1.Function) bool {
return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeFunctionStatefulSet(function).Spec)
return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeFunctionStatefulSet(ctx, r, function).Spec)
}

func (r *FunctionReconciler) checkIfHPANeedUpdate(hpa *autov2.HorizontalPodAutoscaler,
Expand Down
18 changes: 4 additions & 14 deletions controllers/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package controllers
import (
"context"

"github.com/streamnative/function-mesh/utils"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"

"github.com/streamnative/function-mesh/api/compute/v1alpha1"
Expand Down Expand Up @@ -70,7 +69,7 @@ func (r *SinkReconciler) ObserveSinkStatefulSet(ctx context.Context, sink *v1alp
}
sink.Status.Selector = selector.String()

if r.checkIfStatefulSetNeedUpdate(statefulSet, sink) {
if r.checkIfStatefulSetNeedUpdate(ctx, statefulSet, sink) {
condition.Status = metav1.ConditionFalse
condition.Action = v1alpha1.Update
sink.Status.Conditions[v1alpha1.StatefulSet] = condition
Expand All @@ -93,17 +92,8 @@ func (r *SinkReconciler) ApplySinkStatefulSet(ctx context.Context, sink *v1alpha
if condition.Status == metav1.ConditionTrue && !newGeneration {
return nil
}
desiredStatefulSet := spec.MakeSinkStatefulSet(sink)
desiredStatefulSet := spec.MakeSinkStatefulSet(ctx, r, sink)
keepStatefulSetUnchangeableFields(ctx, r, r.Log, desiredStatefulSet)
if utils.GlobalConfigMap != "" {
globalCM := &corev1.ConfigMap{}
err := r.Get(ctx, types.NamespacedName{Namespace: utils.GlobalConfigMapNamespace, Name: utils.GlobalConfigMap}, globalCM)
if err != nil {
r.Log.Error(err, "error getting global configmap")
return err
}
mergeGlobalEnv(globalCM, desiredStatefulSet)
}
desiredStatefulSetSpec := desiredStatefulSet.Spec
if _, err := ctrl.CreateOrUpdate(ctx, r.Client, desiredStatefulSet, func() error {
// sink statefulSet mutate logic
Expand Down Expand Up @@ -410,8 +400,8 @@ func (r *SinkReconciler) ApplySinkCleanUpJob(ctx context.Context, sink *v1alpha1
return nil
}

func (r *SinkReconciler) checkIfStatefulSetNeedUpdate(statefulSet *appsv1.StatefulSet, sink *v1alpha1.Sink) bool {
return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeSinkStatefulSet(sink).Spec)
func (r *SinkReconciler) checkIfStatefulSetNeedUpdate(ctx context.Context, statefulSet *appsv1.StatefulSet, sink *v1alpha1.Sink) bool {
return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeSinkStatefulSet(ctx, r, sink).Spec)
}

func (r *SinkReconciler) checkIfHPANeedUpdate(hpa *autov2.HorizontalPodAutoscaler, sink *v1alpha1.Sink) bool {
Expand Down
2 changes: 1 addition & 1 deletion controllers/sink_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var _ = Describe("Sink Controller", func() {
if sink.Status.Conditions == nil {
sink.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition)
}
statefulSet := spec.MakeSinkStatefulSet(sink)
statefulSet := spec.MakeSinkStatefulSet(context.Background(), k8sClient, sink)

It("Should create pulsar configmap successfully", func() {
Expect(k8sClient.Create(context.Background(), pulsarConfig)).Should(Succeed())
Expand Down
18 changes: 4 additions & 14 deletions controllers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package controllers
import (
"context"

"github.com/streamnative/function-mesh/utils"
autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2"

"github.com/streamnative/function-mesh/api/compute/v1alpha1"
Expand Down Expand Up @@ -70,7 +69,7 @@ func (r *SourceReconciler) ObserveSourceStatefulSet(ctx context.Context, source
}
source.Status.Selector = selector.String()

if r.checkIfStatefulSetNeedUpdate(statefulSet, source) {
if r.checkIfStatefulSetNeedUpdate(ctx, statefulSet, source) {
condition.Status = metav1.ConditionFalse
condition.Action = v1alpha1.Update
source.Status.Conditions[v1alpha1.StatefulSet] = condition
Expand All @@ -94,17 +93,8 @@ func (r *SourceReconciler) ApplySourceStatefulSet(ctx context.Context, source *v
if condition.Status == metav1.ConditionTrue && !newGeneration {
return nil
}
desiredStatefulSet := spec.MakeSourceStatefulSet(source)
desiredStatefulSet := spec.MakeSourceStatefulSet(ctx, r, source)
keepStatefulSetUnchangeableFields(ctx, r, r.Log, desiredStatefulSet)
if utils.GlobalConfigMap != "" {
globalCM := &corev1.ConfigMap{}
err := r.Get(ctx, types.NamespacedName{Namespace: utils.GlobalConfigMapNamespace, Name: utils.GlobalConfigMap}, globalCM)
if err != nil {
r.Log.Error(err, "error getting global configmap")
return err
}
mergeGlobalEnv(globalCM, desiredStatefulSet)
}
desiredStatefulSetSpec := desiredStatefulSet.Spec
if _, err := ctrl.CreateOrUpdate(ctx, r.Client, desiredStatefulSet, func() error {
// source statefulSet mutate logic
Expand Down Expand Up @@ -412,8 +402,8 @@ func (r *SourceReconciler) ApplySourceCleanUpJob(ctx context.Context, source *v1
return nil
}

func (r *SourceReconciler) checkIfStatefulSetNeedUpdate(statefulSet *appsv1.StatefulSet, source *v1alpha1.Source) bool {
return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeSourceStatefulSet(source).Spec)
func (r *SourceReconciler) checkIfStatefulSetNeedUpdate(ctx context.Context, statefulSet *appsv1.StatefulSet, source *v1alpha1.Source) bool {
return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &spec.MakeSourceStatefulSet(ctx, r, source).Spec)
}

func (r *SourceReconciler) checkIfHPANeedUpdate(hpa *autov2.HorizontalPodAutoscaler, source *v1alpha1.Source) bool {
Expand Down
2 changes: 1 addition & 1 deletion controllers/source_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var _ = Describe("Source Controller", func() {
if source.Status.Conditions == nil {
source.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition)
}
statefulSet := spec.MakeSourceStatefulSet(source)
statefulSet := spec.MakeSourceStatefulSet(context.Background(), k8sClient, source)

It("Should create pulsar configmap successfully", func() {
Expect(k8sClient.Create(context.Background(), pulsarConfig)).Should(Succeed())
Expand Down
53 changes: 39 additions & 14 deletions controllers/spec/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package spec
import (
"bytes"
"context"

Check failure on line 22 in controllers/spec/common.go

View workflow job for this annotation

GitHub Actions / unit-tests (1.19)

File is not `goimports`-ed (goimports)

Check failure on line 22 in controllers/spec/common.go

View workflow job for this annotation

GitHub Actions / unit-tests (1.20.4)

File is not `goimports`-ed (goimports)

// used for template
_ "embed"
"encoding/json"
Expand All @@ -40,6 +39,7 @@ import (
v1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -203,7 +203,7 @@ func MakeHeadlessServiceName(serviceName string) string {
return fmt.Sprintf("%s-headless", serviceName)
}

func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderImage string,
func MakeStatefulSet(ctx context.Context, r client.Reader, objectMeta *metav1.ObjectMeta, replicas *int32, downloaderImage string,
container *corev1.Container, filebeatContainer *corev1.Container,
volumes []corev1.Volume, labels map[string]string, policy v1alpha1.PodPolicy, pulsar v1alpha1.PulsarMessaging,
javaRuntime *v1alpha1.JavaRuntime, pythonRuntime *v1alpha1.PythonRuntime,
Expand Down Expand Up @@ -265,7 +265,7 @@ func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderI
Name: DownloaderVolume,
})
}
return &appsv1.StatefulSet{
desiredStatefulSet := &appsv1.StatefulSet{
TypeMeta: metav1.TypeMeta{
Kind: "StatefulSet",
APIVersion: "apps/v1",
Expand All @@ -275,6 +275,8 @@ func MakeStatefulSet(objectMeta *metav1.ObjectMeta, replicas *int32, downloaderI
MakeHeadlessServiceName(objectMeta.Name), downloaderContainer, volumeClaimTemplates,
persistentVolumeClaimRetentionPolicy),
}
MergeGlobalAndNamespacedEnv(ctx, r, objectMeta.Namespace, desiredStatefulSet)
return desiredStatefulSet
}

func MakeStatefulSetSpec(replicas *int32, container *corev1.Container, filebeatContainer *corev1.Container,
Expand Down Expand Up @@ -1480,17 +1482,6 @@ func generateContainerEnvFrom(messagingConfig string, authSecret string, tlsSecr
})
}

// add env from namespaces config map with optional=true
optional := true
if utils.NamespacedConfigMap != "" {
envs = append(envs, corev1.EnvFromSource{
ConfigMapRef: &corev1.ConfigMapEnvSource{
LocalObjectReference: corev1.LocalObjectReference{Name: utils.NamespacedConfigMap},
Optional: &optional,
},
})
}

return envs
}

Expand Down Expand Up @@ -2287,3 +2278,37 @@ func makeFilebeatContainer(volumeMounts []corev1.VolumeMount, envVar []corev1.En
},
}
}

func MergeGlobalAndNamespacedEnv(ctx context.Context, r client.Reader, namespace string, statefulSet *appsv1.StatefulSet) {
var globalEnvs []corev1.EnvVar
if utils.GlobalConfigMap != "" {
globalCM := &corev1.ConfigMap{}
err := r.Get(ctx, types.NamespacedName{Namespace: utils.GlobalConfigMapNamespace, Name: utils.GlobalConfigMap}, globalCM)
if err != nil {
return
}
for key, val := range globalCM.Data {
globalEnvs = append(globalEnvs, corev1.EnvVar{
Name: key,
Value: val,
})
}
}
if utils.NamespacedConfigMap != "" {
namespacedCM := &corev1.ConfigMap{}
err := r.Get(ctx, types.NamespacedName{Namespace: namespace, Name: utils.NamespacedConfigMap}, namespacedCM)
if err != nil {
return
}
for key, val := range namespacedCM.Data {
globalEnvs = append(globalEnvs, corev1.EnvVar{
Name: key,
Value: val,
})
}
}

for i := range statefulSet.Spec.Template.Spec.Containers {
statefulSet.Spec.Template.Spec.Containers[i].Env = append(statefulSet.Spec.Template.Spec.Containers[i].Env, globalEnvs...)
}
}
6 changes: 4 additions & 2 deletions controllers/spec/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package spec

import (
"context"
"regexp"

"github.com/streamnative/function-mesh/utils"
Expand All @@ -27,6 +28,7 @@ import (
v1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"

"github.com/streamnative/function-mesh/api/compute/v1alpha1"
Expand Down Expand Up @@ -57,9 +59,9 @@ func MakeFunctionService(function *v1alpha1.Function) *corev1.Service {
return MakeService(objectMeta, labels)
}

func MakeFunctionStatefulSet(function *v1alpha1.Function) *appsv1.StatefulSet {
func MakeFunctionStatefulSet(ctx context.Context, r client.Reader, function *v1alpha1.Function) *appsv1.StatefulSet {
objectMeta := MakeFunctionObjectMeta(function)
return MakeStatefulSet(objectMeta, function.Spec.Replicas, function.Spec.DownloaderImage,
return MakeStatefulSet(ctx, r, objectMeta, function.Spec.Replicas, function.Spec.DownloaderImage,
makeFunctionContainer(function), makeFilebeatContainer(function.Spec.VolumeMounts, function.Spec.Pod.Env,
function.Spec.Name, function.Spec.LogTopic, function.Spec.LogTopicAgent, function.Spec.Pulsar.TLSConfig,
function.Spec.Pulsar.AuthConfig, function.Spec.Pulsar.PulsarConfig, function.Spec.Pulsar.TLSSecret,
Expand Down
6 changes: 4 additions & 2 deletions controllers/spec/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package spec

import (
"context"
"regexp"

"github.com/streamnative/function-mesh/utils"
Expand All @@ -27,6 +28,7 @@ import (
v1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/streamnative/function-mesh/api/compute/v1alpha1"
)
Expand All @@ -53,9 +55,9 @@ func MakeSinkService(sink *v1alpha1.Sink) *corev1.Service {
return MakeService(objectMeta, labels)
}

func MakeSinkStatefulSet(sink *v1alpha1.Sink) *appsv1.StatefulSet {
func MakeSinkStatefulSet(ctx context.Context, r client.Reader, sink *v1alpha1.Sink) *appsv1.StatefulSet {
objectMeta := MakeSinkObjectMeta(sink)
return MakeStatefulSet(objectMeta, sink.Spec.Replicas, sink.Spec.DownloaderImage, makeSinkContainer(sink),
return MakeStatefulSet(ctx, r, objectMeta, sink.Spec.Replicas, sink.Spec.DownloaderImage, makeSinkContainer(sink),
makeFilebeatContainer(sink.Spec.VolumeMounts, sink.Spec.Pod.Env, sink.Spec.Name, sink.Spec.LogTopic, sink.Spec.LogTopicAgent,
sink.Spec.Pulsar.TLSConfig, sink.Spec.Pulsar.AuthConfig, sink.Spec.Pulsar.PulsarConfig, sink.Spec.Pulsar.TLSSecret,
sink.Spec.Pulsar.AuthSecret, sink.Spec.FilebeatImage),
Expand Down
6 changes: 4 additions & 2 deletions controllers/spec/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package spec

import (
"context"
"fmt"
"regexp"

Expand All @@ -28,6 +29,7 @@ import (
v1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/streamnative/function-mesh/api/compute/v1alpha1"
)
Expand All @@ -54,9 +56,9 @@ func MakeSourceService(source *v1alpha1.Source) *corev1.Service {
return MakeService(objectMeta, labels)
}

func MakeSourceStatefulSet(source *v1alpha1.Source) *appsv1.StatefulSet {
func MakeSourceStatefulSet(ctx context.Context, r client.Reader, source *v1alpha1.Source) *appsv1.StatefulSet {
objectMeta := MakeSourceObjectMeta(source)
return MakeStatefulSet(objectMeta, source.Spec.Replicas, source.Spec.DownloaderImage, makeSourceContainer(source),
return MakeStatefulSet(ctx, r, objectMeta, source.Spec.Replicas, source.Spec.DownloaderImage, makeSourceContainer(source),
makeFilebeatContainer(source.Spec.VolumeMounts, source.Spec.Pod.Env, source.Spec.Name, source.Spec.LogTopic, source.Spec.LogTopicAgent,
source.Spec.Pulsar.TLSConfig, source.Spec.Pulsar.AuthConfig, source.Spec.Pulsar.PulsarConfig, source.Spec.Pulsar.TLSSecret,
source.Spec.Pulsar.AuthSecret, source.Spec.FilebeatImage),
Expand Down

0 comments on commit e92db22

Please sign in to comment.