Skip to content

Commit

Permalink
support unsubscribe
Browse files Browse the repository at this point in the history
Signed-off-by: Jiangnan Jia <jnan0806@gmail.com>
  • Loading branch information
jnan806 committed Feb 9, 2023
1 parent c8e75a4 commit 67773f4
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 2 deletions.
70 changes: 69 additions & 1 deletion control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ func NewControlPlane() (*ControlPlane, error) {
return nil, err
}

cp.server = transport.NewServer(uint32(10246), []model.SubscribeRequestHandler{cp.handleSubscribeRequest})
handlers := []model.SubscribeRequestHandler{
cp.handleSubscribeRequest,
cp.handleUnSubscribeRequest,
}
cp.server = transport.NewServer(uint32(10246), handlers)
cp.operator = operator

hostname, herr := os.Hostname()
Expand Down Expand Up @@ -106,7 +110,24 @@ func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream
})
}

func (c *ControlPlane) sendAckToStream(stream model.OpenSergoTransportStream, ack string, status *trpb.Status, respId string) error {
if stream == nil {
return nil
}
return stream.SendMsg(&trpb.SubscribeResponse{
Status: status,
Ack: ack,
ControlPlane: c.protoDesc,
ResponseId: respId,
})
}

func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error {

if trpb.SubscribeOpType_SUBSCRIBE != request.OpType {
return nil
}

//var labels []model.LabelKV
//if request.Target.Labels != nil {
// for _, label := range request.Target.Labels {
Expand Down Expand Up @@ -160,3 +181,50 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent
}
return nil
}

// handleUnSubscribeRequest handle the UnSubscribeRequest request from OpenSergo SDK.
//
// 1.use ConnectionManager to remove from connectionMap for SubscribeTarget
// 2.use operator to UnRegisterWatcher for SubscribeTarget which will remove SubscribeTarget, delete crdCache, and remove CrdWatcher
func (c *ControlPlane) handleUnSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error {

if trpb.SubscribeOpType_UNSUBSCRIBE != request.OpType {
return nil
}

for _, kind := range request.Target.Kinds {
namespacedApp := model.NamespacedApp{
Namespace: request.Target.Namespace,
App: request.Target.App,
}
// remove the relation of Connection and SubscribeTarget from local cache
err := c.server.ConnectionManager().RemoveWithIdentifier(namespacedApp, kind, clientIdentifier)
if err != nil {
log.Printf("Remove map of Connection-SubscribeTarget failed, err=%s\n", err.Error())
status := &trpb.Status{
// TODO: defined a new errorCode
Code: transport.RegisterWatcherError,
Message: "Remove from watcher error",
Details: nil,
}
err = c.sendMessageToStream(stream, request.Target.Namespace, request.Target.App, kind, nil, status, request.RequestId)
if err != nil {
// TODO: log here
log.Printf("sendMessageToStream failed, err=%s\n", err.Error())
}
continue
}

// UnRegisterWatcher for SubscribeTarget
err = c.operator.UnRegisterWatcher(model.SubscribeTarget{
Namespace: request.Target.Namespace,
AppName: request.Target.App,
Kind: kind,
})
if err != nil {
log.Printf("UnRegisterWatcher failed, err=%s\n", err.Error())
}
}

return nil
}
51 changes: 50 additions & 1 deletion pkg/controller/crd_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"strconv"
"sync"

"go.uber.org/atomic"

"github.com/go-logr/logr"
crdv1alpha1 "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1"
crdv1alpha1traffic "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1/traffic"
Expand Down Expand Up @@ -56,6 +58,7 @@ type CRDWatcher struct {
crdGenerator func() client.Object
sendDataHandler model.DataEntirePushHandler

deleted *atomic.Bool
updateMux sync.RWMutex
}

Expand Down Expand Up @@ -97,7 +100,36 @@ func (r *CRDWatcher) AddSubscribeTarget(target model.SubscribeTarget) error {
}

func (r *CRDWatcher) RemoveSubscribeTarget(target model.SubscribeTarget) error {
// TODO: implement me
// TODO: validate the target
if target.Kind != r.kind {
return errors.New("target kind mismatch, expected: " + target.Kind + ", r.kind: " + r.kind)
}
r.updateMux.Lock()
defer r.updateMux.Unlock()

// remove from subscribedList
delete(r.subscribedList, target)

// if len(r.subscribedList) < 1 means there's no matched subscribeTarget in NamespacedApp,
// then delete subscribeTarget and crdCache which matched NamespacedApp
if len(r.subscribedList) < 1 {
delete(r.subscribedApps, target.NamespacedApp())
// TODO delete crdCache
//r.crdCache.DeleteByNamespaceApp(model.NamespacedApp{
// Namespace: target.Namespace,
// App: target.AppName,
//}, "")

// if len(r.subscribedApps) < 1 means there's no matched subscribeTarget in Namespace,
// then delete subscribeTarget and crdCache which matched Namespace
if len(r.subscribedApps) < 1 {
delete(r.subscribedNamespaces, target.Namespace)
// r.crdCache.DeleteByNamespacedName(model.NamespacedApp{
// Namespace: target.Namespace,
// App: target.AppName,
// }, "")
}
}

return nil
}
Expand All @@ -119,6 +151,14 @@ func (r *CRDWatcher) HasAnySubscribedOfApp(app model.NamespacedApp) bool {
}

func (r *CRDWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// TODO optimize the logic of destroy the current controller
// r.deleted.Load() is a flag marked the deleted status of controller
// can not destroy the current controller
//
if r.deleted.Load() {
return ctrl.Result{}, nil
}

if !r.HasAnySubscribedOfNamespace(req.Namespace) {
// Ignore unmatched namespace
return ctrl.Result{Requeue: false, RequeueAfter: 0}, nil
Expand Down Expand Up @@ -216,9 +256,17 @@ func (r *CRDWatcher) GetRules(n model.NamespacedApp) ([]*anypb.Any, int64) {
}

func (r *CRDWatcher) SetupWithManager(mgr ctrl.Manager) error {
// TODO optimized delete logic here
r.deleted.Store(false)
return ctrl.NewControllerManagedBy(mgr).For(r.crdGenerator()).Complete(r)
}

func (r *CRDWatcher) ShutdownWithManager(mgr ctrl.Manager) error {
// TODO optimized delete logic here
r.deleted.Store(true)
return nil
}

func (r *CRDWatcher) translateCrdToProto(object client.Object) (*anypb.Any, error) {
var packRule *anypb.Any
var err error
Expand Down Expand Up @@ -338,6 +386,7 @@ func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerat
subscribedNamespaces: make(map[string]bool),
subscribedApps: make(map[model.NamespacedApp]bool),
crdGenerator: crdGenerator,
deleted: atomic.NewBool(false),
crdCache: NewCRDCache(kind),
sendDataHandler: sendDataHandler,
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/controller/k8s_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRD

existingWatcher, exists := k.controllers[target.Kind]
if exists {
// TODO optimized delete logic here
existingWatcher.deleted.Store(false)
if existingWatcher.HasSubscribed(target) {
// Target has been subscribed
return existingWatcher, nil
Expand Down Expand Up @@ -168,6 +170,8 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error {

existingWatcher, exists := k.controllers[target.Kind]
if exists && !existingWatcher.HasSubscribed(target) {
// TODO optimized delete logic here
existingWatcher.deleted.Store(false)
// TODO: think more about here
err = existingWatcher.AddSubscribeTarget(target)
if err != nil {
Expand Down Expand Up @@ -200,6 +204,44 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error {
return nil
}

// UnRegisterWatcher unRegisters given SubscribeTarget.
func (k *KubernetesOperator) UnRegisterWatcher(target model.SubscribeTarget) error {
k.controllerMux.Lock()
defer k.controllerMux.Unlock()

existingWatcher, exists := k.controllers[target.Kind]
if !exists {
return nil
}

if existingWatcher.HasAnySubscribedOfNamespace(target.Namespace) {
err := existingWatcher.RemoveSubscribeTarget(target)
if err != nil {
return err
}
if !existingWatcher.HasAnySubscribedOfNamespace(target.Namespace) {
if err = k.removeWatcher(existingWatcher, target); err != nil {
return err
}
}
} else {
if err := k.removeWatcher(existingWatcher, target); err != nil {
return err
}
}

return nil
}

func (k *KubernetesOperator) removeWatcher(crdWatcher *CRDWatcher, target model.SubscribeTarget) error {
delete(k.controllers, target.Kind)
// TODO add Shutdown logic
if err := crdWatcher.ShutdownWithManager(k.crdManager); err != nil {
return err
}
return nil
}

// Close exit the K8S KubernetesOperator
func (k *KubernetesOperator) Close() error {
k.ctxCancel()
Expand Down
30 changes: 30 additions & 0 deletions pkg/transport/grpc/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,26 @@ func (c *ConnectionManager) removeInternal(n model.NamespacedApp, kind string, i
return nil
}
delete(streams, identifier)

streams, exists = kindMap[kind]
// !exists || streams == nil || len(streams) < 1
// means after delete, there is no elements in kindMap[kind]
// then delete kind from kindMap
if !exists || streams == nil || len(streams) < 1 {
delete(kindMap, kind)
}
kindMap, exists = c.connectionMap[n]
// !exists || kindMap == nil || len(kindMap) < 1
// means after delete, there is no elements in c.connectionMap[n]
// then delete n from c.connectionMap[n]
if !exists || kindMap == nil || len(kindMap) < 1 {
delete(c.connectionMap, n)
}

return nil
}

// RemoveByIdentifier with a sync.RWMutex
func (c *ConnectionManager) RemoveByIdentifier(identifier model.ClientIdentifier) error {
c.updateMux.Lock()
defer c.updateMux.Unlock()
Expand All @@ -143,6 +160,7 @@ func (c *ConnectionManager) RemoveByIdentifier(identifier model.ClientIdentifier
if !exists {
return nil
}
// remove from connectionMap
for n, kinds := range NamespaceAppKinds {
for _, kind := range kinds {
err := c.removeInternal(n, kind, identifier)
Expand All @@ -154,6 +172,18 @@ func (c *ConnectionManager) RemoveByIdentifier(identifier model.ClientIdentifier
return nil
}

// RemoveWithIdentifier with a sync.RWMutex
func (c *ConnectionManager) RemoveWithIdentifier(namespacedApp model.NamespacedApp, kind string, identifier model.ClientIdentifier) error {
c.updateMux.Lock()
defer c.updateMux.Unlock()
// remove from connectionMap
err := c.removeInternal(namespacedApp, kind, identifier)
if err != nil {
return err
}
return nil
}

func NewConnectionManager() *ConnectionManager {
return &ConnectionManager{
connectionMap: make(map[model.NamespacedApp]map[string]ConnectionMap),
Expand Down
4 changes: 4 additions & 0 deletions pkg/transport/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func (s *Server) ComponentName() string {
return "OpenSergoUniversalTransportServer"
}

func (s *Server) IsStarted() bool {
return s.started.Load()
}

func (s *Server) Run() error {
if s.started.CAS(false, true) {
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port))
Expand Down

0 comments on commit 67773f4

Please sign in to comment.