Skip to content

Commit

Permalink
Support pause rollout (#780)
Browse files Browse the repository at this point in the history
* Support pause rollout

* Observer StatefulSet first

* Update status

* Expose pendingChange

* Update diff

* Fix livenessrobe compare
  • Loading branch information
jiangpengcheng committed Oct 8, 2024
1 parent e416e71 commit 7ff1021
Show file tree
Hide file tree
Showing 21 changed files with 126 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
apiVersion: compute.functionmesh.io/v1alpha1
kind: Function
metadata:
annotations:
compute.functionmesh.io/pause-rollout: "false"
name: function-download-sample
namespace: default
spec:
Expand Down
2 changes: 2 additions & 0 deletions .ci/tests/integration/cases/java-function-vpa/manifests.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
apiVersion: compute.functionmesh.io/v1alpha1
kind: Function
metadata:
annotations:
compute.functionmesh.io/pause-rollout: "true"
name: function-sample-vpa
namespace: default
spec:
Expand Down
2 changes: 2 additions & 0 deletions .ci/tests/integration/cases/java-function/manifests.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
apiVersion: compute.functionmesh.io/v1alpha1
kind: Function
metadata:
annotations:
compute.functionmesh.io/pause-rollout: "true"
name: function-sample
namespace: default
spec:
Expand Down
2 changes: 2 additions & 0 deletions .ci/tests/integration/cases/java-log-config/manifests.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
apiVersion: compute.functionmesh.io/v1alpha1
kind: Function
metadata:
annotations:
compute.functionmesh.io/pause-rollout: "true"
name: java-log-config
namespace: default
spec:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
apiVersion: compute.functionmesh.io/v1alpha1
kind: Function
metadata:
annotations:
compute.functionmesh.io/pause-rollout: "false"
name: java-log-format-json
namespace: default
spec:
Expand Down
1 change: 1 addition & 0 deletions api/compute/v1alpha1/function_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ type FunctionStatus struct {
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
GlobalBackendConfigRevision string `json:"globalBackendConfigRevision,omitempty"`
NamespacedBackendConfigRevision string `json:"namespacedBackendConfigRevision,omitempty"`
PendingChange string `json:"pendingChange,omitempty"`
}

// +genclient
Expand Down
1 change: 1 addition & 0 deletions api/compute/v1alpha1/sink_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ type SinkStatus struct {
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
GlobalBackendConfigRevision string `json:"globalBackendConfigRevision,omitempty"`
NamespacedBackendConfigRevision string `json:"namespacedBackendConfigRevision,omitempty"`
PendingChange string `json:"pendingChange,omitempty"`
}

// +genclient
Expand Down
1 change: 1 addition & 0 deletions api/compute/v1alpha1/source_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ type SourceStatus struct {
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
GlobalBackendConfigRevision string `json:"globalBackendConfigRevision,omitempty"`
NamespacedBackendConfigRevision string `json:"namespacedBackendConfigRevision,omitempty"`
PendingChange string `json:"pendingChange,omitempty"`
}

// +genclient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3846,6 +3846,8 @@ spec:
observedGeneration:
format: int64
type: integer
pendingChange:
type: string
replicas:
format: int32
type: integer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3565,6 +3565,8 @@ spec:
observedGeneration:
format: int64
type: integer
pendingChange:
type: string
replicas:
format: int32
type: integer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3546,6 +3546,8 @@ spec:
observedGeneration:
format: int64
type: integer
pendingChange:
type: string
replicas:
format: int32
type: integer
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/compute.functionmesh.io_functions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3825,6 +3825,8 @@ spec:
observedGeneration:
format: int64
type: integer
pendingChange:
type: string
replicas:
format: int32
type: integer
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/compute.functionmesh.io_sinks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3544,6 +3544,8 @@ spec:
observedGeneration:
format: int64
type: integer
pendingChange:
type: string
replicas:
format: int32
type: integer
Expand Down
2 changes: 2 additions & 0 deletions config/crd/bases/compute.functionmesh.io_sources.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3525,6 +3525,8 @@ spec:
observedGeneration:
format: int64
type: integer
pendingChange:
type: string
replicas:
format: int32
type: integer
Expand Down
10 changes: 9 additions & 1 deletion controllers/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,15 @@ func (r *FunctionReconciler) checkIfStatefulSetNeedUpdate(ctx context.Context, s
if err != nil {
return false, err
}
return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &desiredStatefulSet.Spec), nil
needUpdate := !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &desiredStatefulSet.Spec)
if needUpdate {
diff, err := spec.CreateDiff(statefulSet, desiredStatefulSet)
if err != nil {
return needUpdate, err
}
function.Status.PendingChange = diff
}
return needUpdate, nil
}

func (r *FunctionReconciler) checkIfHPANeedUpdate(hpa *autov2.HorizontalPodAutoscaler,
Expand Down
16 changes: 14 additions & 2 deletions controllers/function_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,24 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
function.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition)
}

isNewGeneration := r.checkIfFunctionGenerationsIsIncreased(function)

err = r.ObserveFunctionStatefulSet(ctx, function)
if err != nil {
return reconcile.Result{}, err
}
// skip reconcile if pauseRollout is set to true and the generation is not increased
if spec.IsPauseRollout(function) && !isNewGeneration {
err = r.Status().Update(ctx, function)
if err != nil {
r.Log.Error(err, "failed to update function status after observing statefulset")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
} else {
function.Status.PendingChange = ""
}

err = r.ObserveFunctionService(ctx, function)
if err != nil {
return reconcile.Result{}, err
Expand Down Expand Up @@ -130,8 +144,6 @@ func (r *FunctionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, err
}

isNewGeneration := r.checkIfFunctionGenerationsIsIncreased(function)

err = r.ApplyFunctionStatefulSet(ctx, function, isNewGeneration)
if err != nil {
return reconcile.Result{}, err
Expand Down
10 changes: 9 additions & 1 deletion controllers/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,15 @@ func (r *SinkReconciler) checkIfStatefulSetNeedUpdate(ctx context.Context, state
if err != nil {
return false, err
}
return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &desiredStatefulSet.Spec), nil
needUpdate := !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &desiredStatefulSet.Spec)
if needUpdate {
diff, err := spec.CreateDiff(statefulSet, desiredStatefulSet)
if err != nil {
return needUpdate, err
}
sink.Status.PendingChange = diff
}
return needUpdate, nil
}

func (r *SinkReconciler) checkIfHPANeedUpdate(hpa *autov2.HorizontalPodAutoscaler, sink *v1alpha1.Sink) bool {
Expand Down
16 changes: 14 additions & 2 deletions controllers/sink_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,24 @@ func (r *SinkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
sink.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition)
}

isNewGeneration := r.checkIfSinkGenerationsIsIncreased(sink)

err = r.ObserveSinkStatefulSet(ctx, sink)
if err != nil {
return reconcile.Result{}, err
}
// skip reconcile if pauseRollout is set to true and the generation is not increased
if spec.IsPauseRollout(sink) && !isNewGeneration {
err = r.Status().Update(ctx, sink)
if err != nil {
r.Log.Error(err, "failed to update sink status after observing statefulset")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
} else {
sink.Status.PendingChange = ""
}

err = r.ObserveSinkService(ctx, sink)
if err != nil {
return reconcile.Result{}, err
Expand Down Expand Up @@ -129,8 +143,6 @@ func (r *SinkReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
return ctrl.Result{}, err
}

isNewGeneration := r.checkIfSinkGenerationsIsIncreased(sink)

err = r.ApplySinkStatefulSet(ctx, sink, isNewGeneration)
if err != nil {
return reconcile.Result{}, err
Expand Down
10 changes: 9 additions & 1 deletion controllers/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,15 @@ func (r *SourceReconciler) checkIfStatefulSetNeedUpdate(ctx context.Context, sta
if err != nil {
return false, err
}
return !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &desiredStatefulSet.Spec), nil
needUpdate := !spec.CheckIfStatefulSetSpecIsEqual(&statefulSet.Spec, &desiredStatefulSet.Spec)
if needUpdate {
diff, err := spec.CreateDiff(statefulSet, desiredStatefulSet)
if err != nil {
return needUpdate, err
}
source.Status.PendingChange = diff
}
return needUpdate, nil
}

func (r *SourceReconciler) checkIfHPANeedUpdate(hpa *autov2.HorizontalPodAutoscaler, source *v1alpha1.Source) bool {
Expand Down
16 changes: 14 additions & 2 deletions controllers/source_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,24 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
source.Status.Conditions = make(map[v1alpha1.Component]v1alpha1.ResourceCondition)
}

isNewGeneration := r.checkIfSourceGenerationsIsIncreased(source)

err = r.ObserveSourceStatefulSet(ctx, source)
if err != nil {
return reconcile.Result{}, err
}
// skip reconcile if pauseRollout is set to true and the generation is not increased
if spec.IsPauseRollout(source) && !isNewGeneration {
err = r.Status().Update(ctx, source)
if err != nil {
r.Log.Error(err, "failed to update source status after observing statefulset")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
} else {
source.Status.PendingChange = ""
}

err = r.ObserveSourceService(ctx, source)
if err != nil {
return reconcile.Result{}, err
Expand Down Expand Up @@ -129,8 +143,6 @@ func (r *SourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return ctrl.Result{}, err
}

isNewGeneration := r.checkIfSourceGenerationsIsIncreased(source)

err = r.ApplySourceStatefulSet(ctx, source, isNewGeneration)
if err != nil {
return reconcile.Result{}, err
Expand Down
35 changes: 32 additions & 3 deletions controllers/spec/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
Expand Down Expand Up @@ -99,6 +100,7 @@ const (
AnnotationPrometheusScrape = "prometheus.io/scrape"
AnnotationPrometheusPort = "prometheus.io/port"
AnnotationManaged = "compute.functionmesh.io/managed"
AnnotationPauseRollout = "compute.functionmesh.io/pause-rollout"
AnnotationNeedCleanup = "compute.functionmesh.io/need-cleanup"

// if labels contains below, we think it comes from function-mesh-worker-service
Expand Down Expand Up @@ -171,6 +173,11 @@ func IsManaged(object metav1.Object) bool {
return !exists || managed != "false"
}

func IsPauseRollout(object metav1.Object) bool {
pauseRollout, exists := object.GetAnnotations()[AnnotationPauseRollout]
return exists && pauseRollout == "true"
}

func NeedCleanup(object metav1.Object) bool {
// don't cleanup if it's managed by function-mesh-worker-service
_, exists := object.GetLabels()[LabelPulsarCluster]
Expand Down Expand Up @@ -509,8 +516,9 @@ func MakeLivenessProbe(liveness *v1alpha1.Liveness) *corev1.Probe {
return &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/",
Port: intstr.FromInt32(MetricsPort.ContainerPort),
Path: "/",
Port: intstr.FromInt32(MetricsPort.ContainerPort),
Scheme: corev1.URISchemeHTTP,
},
},
InitialDelaySeconds: initialDelay,
Expand Down Expand Up @@ -2080,7 +2088,7 @@ func CheckIfStatefulSetSpecIsEqual(spec *appsv1.StatefulSetSpec, desiredSpec *ap
if !reflect.DeepEqual(container.Command, desiredContainer.Command) ||
container.Image != desiredContainer.Image ||
container.ImagePullPolicy != desiredContainer.ImagePullPolicy ||
container.LivenessProbe != desiredContainer.LivenessProbe ||
!reflect.DeepEqual(container.LivenessProbe, desiredContainer.LivenessProbe) ||
!reflect.DeepEqual(ports, desiredPorts) ||
!reflect.DeepEqual(containerEnvFrom, desiredContainerEnvFrom) ||
!reflect.DeepEqual(container.Resources, desiredContainer.Resources) {
Expand Down Expand Up @@ -2368,3 +2376,24 @@ func makeFilebeatContainer(volumeMounts []corev1.VolumeMount, envVar []corev1.En
},
}
}

func CreateDiff(orj, modified *appsv1.StatefulSet) (string, error) {
orjCopy := orj.DeepCopyObject().(*appsv1.StatefulSet)
modifiedCopy := modified.DeepCopyObject().(*appsv1.StatefulSet)
modifiedCopy.Status = orjCopy.Status
modifiedCopy.ObjectMeta = orjCopy.ObjectMeta

orjData, err := json.Marshal(orjCopy)
if err != nil {
return "", fmt.Errorf("marshal origin %w", err)
}
modifiedData, err := json.Marshal(modifiedCopy)
if err != nil {
return "", fmt.Errorf("marshal modified %w", err)
}
patch, err := strategicpatch.CreateTwoWayMergePatch(orjData, modifiedData, orjCopy)
if err != nil {
return "", fmt.Errorf("create diff %w", err)
}
return string(patch), nil
}

0 comments on commit 7ff1021

Please sign in to comment.