Skip to content

Commit

Permalink
add comments
Browse files Browse the repository at this point in the history
Signed-off-by: Jiangnan Jia <jnan0806@gmail.com>
  • Loading branch information
jnan806 committed Nov 10, 2022
1 parent 263a15d commit e2ea96f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 8 deletions.
40 changes: 32 additions & 8 deletions control_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ func (c *ControlPlane) handleSubscribeRequest(clientIdentifier model.ClientIdent
return nil
}

// handleUnSubscribeRequest handle the UnSubscribeRequest request from OpenSergo SDK.
//
// 1.remove cache of SubscribeTarget in Connection
// 2.remove watcher if there is no SubscribeTarget for the same kind in Connection cache.
func (c *ControlPlane) handleUnSubscribeRequest(clientIdentifier model.ClientIdentifier, request *trpb.SubscribeRequest, stream model.OpenSergoTransportStream) error {

if trpb.SubscribeOpType_UNSUBSCRIBE != request.OpType {
Expand All @@ -193,6 +197,7 @@ func (c *ControlPlane) handleUnSubscribeRequest(clientIdentifier model.ClientIde
Namespace: request.Target.Namespace,
App: request.Target.App,
}
// remove the relation of Connection and SubscribeTarget from local cache
err := c.server.ConnectionManager().RemoveWithIdentifier(namespacedApp, kind, clientIdentifier)

if err != nil {
Expand All @@ -209,21 +214,27 @@ func (c *ControlPlane) handleUnSubscribeRequest(clientIdentifier model.ClientIde
continue
}

targetConnections, _ := c.server.ConnectionManager().Get(request.Target.Namespace, request.Target.App, kind)
if len(targetConnections) < 1 {
// handle the SubscribeTarget-cache and crdCache in CRDWatcher, and remove watcher. only push into a chan named delSubscribeConnChan waiting for delete.
// 1. if there is no kind cached in current Connection, push subscribeConnInfo with an empty NamespaceApp into delSubscribeConnChan,
// then, will delete the watcher for CRD.
// 2. if the number of relation between Connection and SubscribeTarget < 1, then push subscribeConnInfo with current NamespaceApp into delSubscribeConnChan,
// then 1st, will delete the SubscribeTarget which is cached in current Connection
// 2nd, will delte the watcher for CRD if there's no kind cached in current Connection
existConnection := c.server.ConnectionManager().ExistConnection(kind)
if !existConnection {
delSubscribeConnChan <- delSubscribeConnInfo{
stream: stream,
request: request,
namespaceApp: namespacedApp,
namespaceApp: model.NamespacedApp{},
kind: kind,
}
} else {
existConnection := c.server.ConnectionManager().ExistConnection(kind)
if !existConnection {
targetConnections, _ := c.server.ConnectionManager().Get(request.Target.Namespace, request.Target.App, kind)
if len(targetConnections) < 1 {
delSubscribeConnChan <- delSubscribeConnInfo{
stream: stream,
request: request,
namespaceApp: model.NamespacedApp{},
namespaceApp: namespacedApp,
kind: kind,
}
}
Expand All @@ -233,6 +244,7 @@ func (c *ControlPlane) handleUnSubscribeRequest(clientIdentifier model.ClientIde
return nil
}

// delSubscribeConnChan a chan for delete the SubscribeTarget-cache and crdCache in CRDWatcher, and remove watcher to stop watching.
var delSubscribeConnChan chan delSubscribeConnInfo

type delSubscribeConnInfo struct {
Expand All @@ -242,24 +254,36 @@ type delSubscribeConnInfo struct {
kind string
}

// delConn a goroutine contains the logic of delete the SubscribeTarget-cache and crdCache in CRDWatcher, and remove watcher.
//
// 1. at the beginning of current goroutine, should wait for the status of server is started.
//
// 2. when receive a delSubscribeConnInfo from delSubscribeConnChan, waiting a silence time to prevent inaccurate data statistics caused by network jitter.
//
// after the silence time, is the actually logic of deleting local cache and removing watcher.
func (c *ControlPlane) delConn() {
go func() {
// waiting for server is started.
for !c.server.IsStarted() {
time.Sleep(time.Duration(1) * time.Second)
}

// receive from delSubscribeConnChan
currDelConnInfo := <-delSubscribeConnChan
namespaceApp := currDelConnInfo.namespaceApp
kind := currDelConnInfo.kind
request := currDelConnInfo.request
stream := currDelConnInfo.stream

// wait a silence for network jitter
// TODO make time of sleep is configurable
time.Sleep(time.Duration(5) * time.Second)
var err error

// RemoveSubscribeTarget from CRDWatcher
// if namespaceApp is not an empty struct, means that need to delete SubscribeTarget cache in CRDWatcher
if !reflect.DeepEqual(namespaceApp, model.NamespacedApp{}) {
// re-count the number of SubscribeTarget
targetConnections, _ := c.server.ConnectionManager().Get(namespaceApp.Namespace, namespaceApp.App, kind)
if len(targetConnections) < 1 {
if crdWatcher, existed := c.operator.GetWatcher(kind); existed {
Expand All @@ -272,7 +296,7 @@ func (c *ControlPlane) delConn() {
}
}

// delete Connection and CRDWatch
// remove the CRDWatch from KubernetesOperator, to stop watching the kind.
existConnection := c.server.ConnectionManager().ExistConnection(kind)
if !existConnection {
c.operator.RemoveWatcher(model.SubscribeTarget{
Expand All @@ -282,7 +306,7 @@ func (c *ControlPlane) delConn() {
})
}

// send ackMessage
// send ackMessage for UnSubscribeConfig request.
status := &trpb.Status{
Code: transport.Success,
Message: "unSubscribe success",
Expand Down
2 changes: 2 additions & 0 deletions pkg/controller/crd_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,12 @@ func (r *CRDWatcher) RemoveSubscribeTarget(target model.SubscribeTarget) error {
r.updateMux.Lock()
defer r.updateMux.Unlock()

// remove the subscribe-cache in this CRDWatcher
delete(r.subscribedList, target)
delete(r.subscribedNamespaces, target.Namespace)
delete(r.subscribedApps, target.AppName)

// delete the matched crdCache which comes from k8s
// TODO the 2nd param need fix to correct.
r.crdCache.DeleteByNamespaceApp(model.NamespacedApp{
Namespace: target.Namespace,
Expand Down

0 comments on commit e2ea96f

Please sign in to comment.