From 67773f43136fd021cdd0ba6933dd74c6f0ab03ce Mon Sep 17 00:00:00 2001 From: Jiangnan Jia Date: Fri, 4 Nov 2022 00:50:20 +0800 Subject: [PATCH] support unsubscribe Signed-off-by: Jiangnan Jia --- control_plane.go | 70 +++++++++++++++++++++++++++++++- pkg/controller/crd_watcher.go | 51 ++++++++++++++++++++++- pkg/controller/k8s_operator.go | 42 +++++++++++++++++++ pkg/transport/grpc/connection.go | 30 ++++++++++++++ pkg/transport/grpc/server.go | 4 ++ 5 files changed, 195 insertions(+), 2 deletions(-) diff --git a/control_plane.go b/control_plane.go index 8e0686e..b54715c 100644 --- a/control_plane.go +++ b/control_plane.go @@ -43,7 +43,11 @@ func NewControlPlane() (*ControlPlane, error) { return nil, err } - cp.server = transport.NewServer(uint32(10246), []model.SubscribeRequestHandler{cp.handleSubscribeRequest}) + handlers := []model.SubscribeRequestHandler{ + cp.handleSubscribeRequest, + cp.handleUnSubscribeRequest, + } + cp.server = transport.NewServer(uint32(10246), handlers) cp.operator = operator hostname, herr := os.Hostname() @@ -106,7 +110,24 @@ func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream }) } +func (c *ControlPlane) sendAckToStream(stream model.OpenSergoTransportStream, ack string, status *trpb.Status, respId string) error { + if stream == nil { + return nil + } + return stream.SendMsg(&trpb.SubscribeResponse{ + Status: status, + Ack: ack, + ControlPlane: c.protoDesc, + ResponseId: respId, + }) +} + func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error { + + if trpb.SubscribeOpType_SUBSCRIBE != request.OpType { + return nil + } + //var labels []model.LabelKV //if request.Target.Labels != nil { // for _, label := range request.Target.Labels { @@ -160,3 +181,50 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent } return nil } + +// handleUnSubscribeRequest handle the UnSubscribeRequest request from OpenSergo SDK. +// +// 1.use ConnectionManager to remove from connectionMap for SubscribeTarget +// 2.use operator to UnRegisterWatcher for SubscribeTarget which will remove SubscribeTarget, delete crdCache, and remove CrdWatcher +func (c *ControlPlane) handleUnSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error { + + if trpb.SubscribeOpType_UNSUBSCRIBE != request.OpType { + return nil + } + + for _, kind := range request.Target.Kinds { + namespacedApp := model.NamespacedApp{ + Namespace: request.Target.Namespace, + App: request.Target.App, + } + // remove the relation of Connection and SubscribeTarget from local cache + err := c.server.ConnectionManager().RemoveWithIdentifier(namespacedApp, kind, clientIdentifier) + if err != nil { + log.Printf("Remove map of Connection-SubscribeTarget failed, err=%s\n", err.Error()) + status := &trpb.Status{ + // TODO: defined a new errorCode + Code: transport.RegisterWatcherError, + Message: "Remove from watcher error", + Details: nil, + } + err = c.sendMessageToStream(stream, request.Target.Namespace, request.Target.App, kind, nil, status, request.RequestId) + if err != nil { + // TODO: log here + log.Printf("sendMessageToStream failed, err=%s\n", err.Error()) + } + continue + } + + // UnRegisterWatcher for SubscribeTarget + err = c.operator.UnRegisterWatcher(model.SubscribeTarget{ + Namespace: request.Target.Namespace, + AppName: request.Target.App, + Kind: kind, + }) + if err != nil { + log.Printf("UnRegisterWatcher failed, err=%s\n", err.Error()) + } + } + + return nil +} diff --git a/pkg/controller/crd_watcher.go b/pkg/controller/crd_watcher.go index bcc42ad..8b273d4 100644 --- a/pkg/controller/crd_watcher.go +++ b/pkg/controller/crd_watcher.go @@ -21,6 +21,8 @@ import ( "strconv" "sync" + "go.uber.org/atomic" + "github.com/go-logr/logr" crdv1alpha1 "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1" crdv1alpha1traffic "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1/traffic" @@ -56,6 +58,7 @@ type CRDWatcher struct { crdGenerator func() client.Object sendDataHandler model.DataEntirePushHandler + deleted *atomic.Bool updateMux sync.RWMutex } @@ -97,7 +100,36 @@ func (r *CRDWatcher) AddSubscribeTarget(target model.SubscribeTarget) error { } func (r *CRDWatcher) RemoveSubscribeTarget(target model.SubscribeTarget) error { - // TODO: implement me + // TODO: validate the target + if target.Kind != r.kind { + return errors.New("target kind mismatch, expected: " + target.Kind + ", r.kind: " + r.kind) + } + r.updateMux.Lock() + defer r.updateMux.Unlock() + + // remove from subscribedList + delete(r.subscribedList, target) + + // if len(r.subscribedList) < 1 means there's no matched subscribeTarget in NamespacedApp, + // then delete subscribeTarget and crdCache which matched NamespacedApp + if len(r.subscribedList) < 1 { + delete(r.subscribedApps, target.NamespacedApp()) + // TODO delete crdCache + //r.crdCache.DeleteByNamespaceApp(model.NamespacedApp{ + // Namespace: target.Namespace, + // App: target.AppName, + //}, "") + + // if len(r.subscribedApps) < 1 means there's no matched subscribeTarget in Namespace, + // then delete subscribeTarget and crdCache which matched Namespace + if len(r.subscribedApps) < 1 { + delete(r.subscribedNamespaces, target.Namespace) + // r.crdCache.DeleteByNamespacedName(model.NamespacedApp{ + // Namespace: target.Namespace, + // App: target.AppName, + // }, "") + } + } return nil } @@ -119,6 +151,14 @@ func (r *CRDWatcher) HasAnySubscribedOfApp(app model.NamespacedApp) bool { } func (r *CRDWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + // TODO optimize the logic of destroy the current controller + // r.deleted.Load() is a flag marked the deleted status of controller + // can not destroy the current controller + // + if r.deleted.Load() { + return ctrl.Result{}, nil + } + if !r.HasAnySubscribedOfNamespace(req.Namespace) { // Ignore unmatched namespace return ctrl.Result{Requeue: false, RequeueAfter: 0}, nil @@ -216,9 +256,17 @@ func (r *CRDWatcher) GetRules(n model.NamespacedApp) ([]*anypb.Any, int64) { } func (r *CRDWatcher) SetupWithManager(mgr ctrl.Manager) error { + // TODO optimized delete logic here + r.deleted.Store(false) return ctrl.NewControllerManagedBy(mgr).For(r.crdGenerator()).Complete(r) } +func (r *CRDWatcher) ShutdownWithManager(mgr ctrl.Manager) error { + // TODO optimized delete logic here + r.deleted.Store(true) + return nil +} + func (r *CRDWatcher) translateCrdToProto(object client.Object) (*anypb.Any, error) { var packRule *anypb.Any var err error @@ -338,6 +386,7 @@ func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerat subscribedNamespaces: make(map[string]bool), subscribedApps: make(map[model.NamespacedApp]bool), crdGenerator: crdGenerator, + deleted: atomic.NewBool(false), crdCache: NewCRDCache(kind), sendDataHandler: sendDataHandler, } diff --git a/pkg/controller/k8s_operator.go b/pkg/controller/k8s_operator.go index 8ae15ad..535ec26 100644 --- a/pkg/controller/k8s_operator.go +++ b/pkg/controller/k8s_operator.go @@ -129,6 +129,8 @@ func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRD existingWatcher, exists := k.controllers[target.Kind] if exists { + // TODO optimized delete logic here + existingWatcher.deleted.Store(false) if existingWatcher.HasSubscribed(target) { // Target has been subscribed return existingWatcher, nil @@ -168,6 +170,8 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error { existingWatcher, exists := k.controllers[target.Kind] if exists && !existingWatcher.HasSubscribed(target) { + // TODO optimized delete logic here + existingWatcher.deleted.Store(false) // TODO: think more about here err = existingWatcher.AddSubscribeTarget(target) if err != nil { @@ -200,6 +204,44 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error { return nil } +// UnRegisterWatcher unRegisters given SubscribeTarget. +func (k *KubernetesOperator) UnRegisterWatcher(target model.SubscribeTarget) error { + k.controllerMux.Lock() + defer k.controllerMux.Unlock() + + existingWatcher, exists := k.controllers[target.Kind] + if !exists { + return nil + } + + if existingWatcher.HasAnySubscribedOfNamespace(target.Namespace) { + err := existingWatcher.RemoveSubscribeTarget(target) + if err != nil { + return err + } + if !existingWatcher.HasAnySubscribedOfNamespace(target.Namespace) { + if err = k.removeWatcher(existingWatcher, target); err != nil { + return err + } + } + } else { + if err := k.removeWatcher(existingWatcher, target); err != nil { + return err + } + } + + return nil +} + +func (k *KubernetesOperator) removeWatcher(crdWatcher *CRDWatcher, target model.SubscribeTarget) error { + delete(k.controllers, target.Kind) + // TODO add Shutdown logic + if err := crdWatcher.ShutdownWithManager(k.crdManager); err != nil { + return err + } + return nil +} + // Close exit the K8S KubernetesOperator func (k *KubernetesOperator) Close() error { k.ctxCancel() diff --git a/pkg/transport/grpc/connection.go b/pkg/transport/grpc/connection.go index f8f9d56..45d97a0 100644 --- a/pkg/transport/grpc/connection.go +++ b/pkg/transport/grpc/connection.go @@ -132,9 +132,26 @@ func (c *ConnectionManager) removeInternal(n model.NamespacedApp, kind string, i return nil } delete(streams, identifier) + + streams, exists = kindMap[kind] + // !exists || streams == nil || len(streams) < 1 + // means after delete, there is no elements in kindMap[kind] + // then delete kind from kindMap + if !exists || streams == nil || len(streams) < 1 { + delete(kindMap, kind) + } + kindMap, exists = c.connectionMap[n] + // !exists || kindMap == nil || len(kindMap) < 1 + // means after delete, there is no elements in c.connectionMap[n] + // then delete n from c.connectionMap[n] + if !exists || kindMap == nil || len(kindMap) < 1 { + delete(c.connectionMap, n) + } + return nil } +// RemoveByIdentifier with a sync.RWMutex func (c *ConnectionManager) RemoveByIdentifier(identifier model.ClientIdentifier) error { c.updateMux.Lock() defer c.updateMux.Unlock() @@ -143,6 +160,7 @@ func (c *ConnectionManager) RemoveByIdentifier(identifier model.ClientIdentifier if !exists { return nil } + // remove from connectionMap for n, kinds := range NamespaceAppKinds { for _, kind := range kinds { err := c.removeInternal(n, kind, identifier) @@ -154,6 +172,18 @@ func (c *ConnectionManager) RemoveByIdentifier(identifier model.ClientIdentifier return nil } +// RemoveWithIdentifier with a sync.RWMutex +func (c *ConnectionManager) RemoveWithIdentifier(namespacedApp model.NamespacedApp, kind string, identifier model.ClientIdentifier) error { + c.updateMux.Lock() + defer c.updateMux.Unlock() + // remove from connectionMap + err := c.removeInternal(namespacedApp, kind, identifier) + if err != nil { + return err + } + return nil +} + func NewConnectionManager() *ConnectionManager { return &ConnectionManager{ connectionMap: make(map[model.NamespacedApp]map[string]ConnectionMap), diff --git a/pkg/transport/grpc/server.go b/pkg/transport/grpc/server.go index fc43adc..b01b836 100644 --- a/pkg/transport/grpc/server.go +++ b/pkg/transport/grpc/server.go @@ -61,6 +61,10 @@ func (s *Server) ComponentName() string { return "OpenSergoUniversalTransportServer" } +func (s *Server) IsStarted() bool { + return s.started.Load() +} + func (s *Server) Run() error { if s.started.CAS(false, true) { listener, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port))