Skip to content
This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin/increase-batch-size' into 1.0-release
Browse files Browse the repository at this point in the history
  • Loading branch information
anthonyalberto committed Nov 18, 2021
2 parents f1bfe79 + 7d7c6c8 commit 96f7ade
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 21 deletions.
18 changes: 10 additions & 8 deletions adaptor/mongodb/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ import (
)

const (
maxObjSize int = 1000
maxBSONObjSize int = 4800000
maxBSONObjSize int = 16000000
)

var (
Expand Down Expand Up @@ -53,7 +52,7 @@ func (b *Bulk) Write(msg message.Msg) func(client.Session) (message.Msg, error)
b.confirmChan = msg.Confirms()
bOp, ok := b.bulkMap[coll]
if !ok {
s := s.(*Session).mgoSession.Copy()
s := s.(*Session).mgoSession.Clone()
bOp = &bulkOperation{
s: s,
bulk: s.DB("").C(coll).Bulk(),
Expand All @@ -69,12 +68,15 @@ func (b *Bulk) Write(msg message.Msg) func(client.Session) (message.Msg, error)
msgSize := len(bs) + 4

// if the next op is going to put us over, flush and recreate bOp
if bOp.opCounter >= maxObjSize || bOp.bsonOpSize+msgSize >= maxBSONObjSize {
if bOp.opCounter >= s.(*Session).maxWriteBatchSize || bOp.bsonOpSize+msgSize >= maxBSONObjSize {
err = b.flush(coll, bOp)
if err != nil {
log.With("collection", coll).Infof("error flushing collection that has reached its size capacity: %s\n", err.Error())
}
if err == nil && b.confirmChan != nil {
b.confirmChan <- struct{}{}
}
s := s.(*Session).mgoSession.Copy()
s := s.(*Session).mgoSession.Clone()
bOp = &bulkOperation{
s: s,
bulk: s.DB("").C(coll).Bulk(),
Expand Down Expand Up @@ -107,7 +109,7 @@ func (b *Bulk) run(done chan struct{}, wg *sync.WaitGroup) {
return
}
case <-done:
log.Debugln("received done channel")
log.Infoln("received done channel")
if err := b.flushAll(); err != nil {
log.Errorf("flush error, %s", err)
}
Expand All @@ -131,7 +133,7 @@ func (b *Bulk) flushAll() error {
}

func (b *Bulk) flush(c string, bOp *bulkOperation) error {
log.With("collection", c).With("opCounter", bOp.opCounter).With("bsonOpSize", bOp.bsonOpSize).Debugln("flushing bulk messages")
log.With("collection", c).With("opCounter", bOp.opCounter).With("bsonOpSize", bOp.bsonOpSize).Infoln("flushing bulk messages")
_, err := bOp.bulk.Run()
if err != nil && !mgo.IsDup(err) {
log.With("collection", c).Errorf("flush error, %s\n", err)
Expand All @@ -144,7 +146,7 @@ func (b *Bulk) flush(c string, bOp *bulkOperation) error {
}
}
bOp.s.Close()
log.With("collection", c).Debugln("flush complete")
log.With("collection", c).Infoln("flush complete")
delete(b.bulkMap, c)
return nil
}
39 changes: 26 additions & 13 deletions adaptor/mongodb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const (

// DefaultReadPreference when connecting to a mongo replica set.
DefaultReadPreference = mgo.Primary

// DefaultMaxWriteBatchSize when using the bulk interface
DefaultMaxWriteBatchSize = 1000
)

var (
Expand Down Expand Up @@ -65,11 +68,12 @@ type ClientOptionFunc func(*Client) error
type Client struct {
uri string

safety mgo.Safe
tlsConfig *tls.Config
sessionTimeout time.Duration
tail bool
readPreference mgo.Mode
safety mgo.Safe
tlsConfig *tls.Config
sessionTimeout time.Duration
tail bool
readPreference mgo.Mode
maxWriteBatchSize int

mgoSession *mgo.Session
}
Expand All @@ -91,12 +95,13 @@ type Client struct {
func NewClient(options ...ClientOptionFunc) (*Client, error) {
// Set up the client
c := &Client{
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: nil,
tail: false,
readPreference: DefaultReadPreference,
uri: DefaultURI,
sessionTimeout: DefaultSessionTimeout,
safety: DefaultSafety,
tlsConfig: nil,
tail: false,
readPreference: DefaultReadPreference,
maxWriteBatchSize: DefaultMaxWriteBatchSize,
}

// Run the options on it
Expand Down Expand Up @@ -270,11 +275,19 @@ func (c *Client) initConnection() error {
// mgo logger _may_ be a bit too noisy but it'll be good to have for diagnosis
mgo.SetLogger(log.Base())
mgoSession.EnsureSafe(&c.safety)
mgoSession.SetBatch(1000)
mgoSession.SetBatch(100000)
mgoSession.SetPrefetch(0.5)
mgoSession.SetSocketTimeout(time.Hour)
mgoSession.SetMode(c.readPreference, true)

// Let's set the max write batch size from the server's isMaster response
var results bson.M
err = mgoSession.DB("").Run("isMaster", &results)
if err != nil {
return client.ConnectError{Reason: err.Error()}
}
c.maxWriteBatchSize = results["maxWriteBatchSize"].(int)

if c.tail {
log.With("uri", c.uri).Infoln("testing oplog access")
localColls, err := mgoSession.DB("local").CollectionNames()
Expand Down Expand Up @@ -303,5 +316,5 @@ func (c *Client) initConnection() error {
// Session fulfills the client.Client interface by providing a copy of the main mgoSession
func (c *Client) session() client.Session {
sess := c.mgoSession.Copy()
return &Session{sess}
return &Session{sess, c.maxWriteBatchSize}
}
1 change: 1 addition & 0 deletions adaptor/mongodb/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
// Session serves as a wrapper for the underlying mgo.Session
type Session struct {
// mgoSession is the wrapped mgo session that callers clone or copy to do work.
mgoSession *mgo.Session
// maxWriteBatchSize is the operation-count limit for a single bulk write;
// the bulk writer flushes once its op counter reaches this value. It is
// supplied by the Client when the Session is constructed.
maxWriteBatchSize int
}

var _ client.Session = &Session{}
Expand Down

0 comments on commit 96f7ade

Please sign in to comment.