Skip to content

Commit

Permalink
refactor(pkg/config): watcher of mognodb and redis reuse Read function
Browse files Browse the repository at this point in the history
  • Loading branch information
amazing-gao committed May 13, 2021
1 parent 67acc64 commit bb4c842
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 120 deletions.
2 changes: 1 addition & 1 deletion pkg/config/source/mongodb/mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (rs *mongoSource) Watch() (source.Watcher, error) {
return nil, rs.err
}

return newWatcher(rs.db, rs.collection, rs.service, rs.client, rs.opts)
return newWatcher(rs)
}

func (rs *mongoSource) String() string {
Expand Down
73 changes: 13 additions & 60 deletions pkg/config/source/mongodb/wathcer.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,26 @@
package mongodb

import (
"bytes"
"errors"
"log"
"sync"
"time"

"github.com/boxgo/box/pkg/config/source"
"go.mongodb.org/mongo-driver/mongo"
)

type watcher struct {
sync.RWMutex
name string
opts source.Options
cs *source.ChangeSet
rsp chan string
ch chan *source.ChangeSet
exit chan bool
client *mongo.Client
db string
collection string
service string
name string
source *mongoSource
exit chan bool
changeSet chan *source.ChangeSet
}

func newWatcher(db, collection, service string, client *mongo.Client, opts source.Options) (source.Watcher, error) {
func newWatcher(mgo *mongoSource) (source.Watcher, error) {
w := &watcher{
name: "mongodb",
db: db,
collection: collection,
service: service,
opts: opts,
cs: nil,
rsp: make(chan string),
ch: make(chan *source.ChangeSet),
exit: make(chan bool),
client: client,
name: "mongodb",
source: mgo,
changeSet: make(chan *source.ChangeSet),
exit: make(chan bool),
}
go w.watch()

Expand All @@ -45,7 +29,7 @@ func newWatcher(db, collection, service string, client *mongo.Client, opts sourc

func (w *watcher) Next() (*source.ChangeSet, error) {
select {
case cs := <-w.ch:
case cs := <-w.changeSet:
return cs, nil
case <-w.exit:
return nil, errors.New("watcher stopped")
Expand All @@ -70,43 +54,12 @@ func (w *watcher) watch() {
case <-w.exit:
return
default:
cfg, err := loadConfig(w.client, w.db, w.collection, w.service)

data, err := w.source.Read()
if err != nil {
log.Printf("config mongodb watch error: %#v", err)
} else if cfg.Config != "" {
w.handle(cfg)
}
}
}
}

func (w *watcher) handle(cfg *Config) {
w.RLock()
eq := w.cs != nil && bytes.Compare(w.cs.Data, []byte(cfg.Config)) == 0
w.RUnlock()

if eq {
return
}

var val map[string]interface{}
if err := w.opts.Encoder.Decode([]byte(cfg.Config), &val); err != nil {
log.Printf("config mongo watch handler decode error: %#v", err)
return
}

cs := &source.ChangeSet{
Timestamp: time.Now(),
Source: w.name,
Data: []byte(cfg.Config),
Format: w.opts.Encoder.String(),
w.changeSet <- data
}
}
cs.Checksum = cs.Sum()

w.Lock()
w.cs = cs
w.Unlock()

w.ch <- cs
}
2 changes: 1 addition & 1 deletion pkg/config/source/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (rs *redisSource) Watch() (source.Watcher, error) {
return nil, rs.err
}

return newWatcher(rs.prefix, rs.client, rs.opts)
return newWatcher(rs)
}

func (rs *redisSource) String() string {
Expand Down
71 changes: 13 additions & 58 deletions pkg/config/source/redis/wathcer.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,26 @@
package redis

import (
"bytes"
"context"
"errors"
"log"
"sync"
"time"

"github.com/boxgo/box/pkg/config/source"
"github.com/go-redis/redis/v8"
)

type watcher struct {
sync.RWMutex
name string
prefix string
opts source.Options
cs *source.ChangeSet
rsp chan string
ch chan *source.ChangeSet
exit chan bool
client redis.UniversalClient
name string
source *redisSource
exit chan bool
changeSets chan *source.ChangeSet
}

func newWatcher(prefix string, client redis.UniversalClient, opts source.Options) (source.Watcher, error) {
func newWatcher(sour *redisSource) (source.Watcher, error) {
w := &watcher{
name: "redis",
prefix: prefix,
opts: opts,
cs: nil,
rsp: make(chan string),
ch: make(chan *source.ChangeSet),
exit: make(chan bool),
client: client,
name: "redis",
changeSets: make(chan *source.ChangeSet),
exit: make(chan bool),
source: sour,
}
go w.watch()

Expand All @@ -42,7 +29,7 @@ func newWatcher(prefix string, client redis.UniversalClient, opts source.Options

func (w *watcher) Next() (*source.ChangeSet, error) {
select {
case cs := <-w.ch:
case cs := <-w.changeSets:
return cs, nil
case <-w.exit:
return nil, errors.New("watcher stopped")
Expand All @@ -67,44 +54,12 @@ func (w *watcher) watch() {
case <-w.exit:
return
default:
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
if rsp, err := w.client.Get(ctx, w.prefix+".config").Bytes(); err != nil && err != redis.Nil {
data, err := w.source.Read()
if err != nil {
log.Printf("config redis watch error: %#v", err)
} else if len(rsp) != 0 {
w.handle(rsp)
}

cancel()
w.changeSets <- data
}
}
}

func (w *watcher) handle(data []byte) {
w.RLock()
eq := w.cs != nil && bytes.Compare(w.cs.Data, data) == 0
w.RUnlock()

if eq {
return
}

var val map[string]interface{}
if err := w.opts.Encoder.Decode(data, &val); err != nil {
log.Printf("config redis watch handler decode error: %#v", err)
return
}

cs := &source.ChangeSet{
Timestamp: time.Now(),
Source: w.name,
Data: data,
Format: w.opts.Encoder.String(),
}
cs.Checksum = cs.Sum()

w.Lock()
w.cs = cs
w.Unlock()

w.ch <- cs
}

0 comments on commit bb4c842

Please sign in to comment.