Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the initial synchronization of a sharing faster #4147

Merged
merged 4 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions model/sharing/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ const MaxRetries = 5
// (each next retry will wait 4 times longer than its previous retry)
const InitialBackoffPeriod = 1 * time.Minute

// BatchSize is the maximal number of documents mainpulated at once by the
// BatchSize is the maximal number of documents manipulated at once by the
// replicator
const BatchSize = 100
const BatchSize = 400

// ReplicateMsg is used for jobs on the share-replicate worker.
type ReplicateMsg struct {
Expand Down Expand Up @@ -143,6 +143,19 @@ func (s *Sharing) retryWorker(inst *instance.Instance, worker string, errors int
}
}

// InitialReplication repeatedly calls ReplicateTo for the given member until
// it reports that nothing is pending anymore. It returns the first error
// returned by ReplicateTo, nil as soon as a call reports no pending work, and
// ErrInternalServerError if work is still pending after 1000 calls.
func (s *Sharing) InitialReplication(inst *instance.Instance, m *Member) error {
	// Upper bound on the number of ReplicateTo calls, so that the loop always
	// terminates even if the replication keeps reporting pending work.
	const maxRounds = 1000

	for round := 0; round < maxRounds; round++ {
		pending, err := s.ReplicateTo(inst, m, true)
		switch {
		case err != nil:
			return err
		case !pending:
			return nil
		}
	}
	return ErrInternalServerError
}

// ReplicateTo starts a replicator on this sharing to the given member.
// http://docs.couchdb.org/en/stable/replication/protocol.html
// https://github.com/pouchdb/pouchdb/blob/master/packages/node_modules/pouchdb-replication/src/replicate.js
Expand Down
16 changes: 8 additions & 8 deletions model/sharing/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestReplicator(t *testing.T) {
assert.Equal(t, feed.Seq, seq3)
})

t.Run("InitialCopy", func(t *testing.T) {
t.Run("InitialIndex", func(t *testing.T) {
// Start with an empty io.cozy.shared database
_ = couchdb.DeleteDB(inst, consts.Shared)
if err := couchdb.CreateDB(inst, consts.Shared); err != nil {
Expand All @@ -102,7 +102,7 @@ func TestReplicator(t *testing.T) {
Values: []string{settingsDocID},
Local: true,
})
assert.NoError(t, s.InitialCopy(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1))
assert.NoError(t, s.InitialIndex(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1))
nbShared := 0
assertNbSharedRef(t, inst, nbShared)

Expand All @@ -114,7 +114,7 @@ func TestReplicator(t *testing.T) {
DocType: testDoctype,
Values: []string{oneID},
})
assert.NoError(t, s.InitialCopy(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1))
assert.NoError(t, s.InitialIndex(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1))
nbShared++
assertNbSharedRef(t, inst, nbShared)
oneRef := getSharedRef(t, inst, testDoctype, oneID)
Expand All @@ -135,7 +135,7 @@ func TestReplicator(t *testing.T) {
Selector: "foo",
Values: []string{"bar"},
})
assert.NoError(t, s.InitialCopy(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1))
assert.NoError(t, s.InitialIndex(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1))
nbShared += len(twoIDs)
assertNbSharedRef(t, inst, nbShared)
for _, id := range twoIDs {
Expand All @@ -160,7 +160,7 @@ func TestReplicator(t *testing.T) {
Selector: "foo",
Values: []string{"qux", "quux", "quuux"},
})
assert.NoError(t, s.InitialCopy(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1))
assert.NoError(t, s.InitialIndex(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1))
nbShared += len(threeIDs)
assertNbSharedRef(t, inst, nbShared)
for _, id := range threeIDs {
Expand All @@ -172,7 +172,7 @@ func TestReplicator(t *testing.T) {

// Another member accepts the sharing
for r, rule := range s.Rules {
assert.NoError(t, s.InitialCopy(inst, rule, r))
assert.NoError(t, s.InitialIndex(inst, rule, r))
}
assertNbSharedRef(t, inst, nbShared)

Expand All @@ -189,7 +189,7 @@ func TestReplicator(t *testing.T) {

// A third member accepts the sharing
for r, rule := range s.Rules {
assert.NoError(t, s.InitialCopy(inst, rule, r))
assert.NoError(t, s.InitialIndex(inst, rule, r))
}
nbShared++
assertNbSharedRef(t, inst, nbShared)
Expand All @@ -212,7 +212,7 @@ func TestReplicator(t *testing.T) {
Selector: "foo",
Values: []string{"qux", "quux", "quuux"},
})
assert.NoError(t, s2.InitialCopy(inst, s2.Rules[len(s2.Rules)-1], len(s2.Rules)-1))
assert.NoError(t, s2.InitialIndex(inst, s2.Rules[len(s2.Rules)-1], len(s2.Rules)-1))
assertNbSharedRef(t, inst, nbShared)
for _, id := range threeIDs {
threeRef := getSharedRef(t, inst, testDoctype, id)
Expand Down
16 changes: 8 additions & 8 deletions model/sharing/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *Sharing) Setup(inst *instance.Instance, m *Member) {
}
if s.Triggers.ReplicateID == "" {
for i, rule := range s.Rules {
if err := s.InitialCopy(inst, rule, i); err != nil {
if err := s.InitialIndex(inst, rule, i); err != nil {
inst.Logger().Warnf("Error on initial copy for %s (%s): %s", rule.Title, s.SID, err)
}
}
Expand All @@ -119,14 +119,14 @@ func (s *Sharing) Setup(inst *instance.Instance, m *Member) {
inst.Logger().WithNamespace("sharing").
Warnf("Error on setup replicate trigger (%s): %s", s.SID, err)
}
if pending, err := s.ReplicateTo(inst, m, true); err != nil {
if err := s.InitialReplication(inst, m); err != nil {
inst.Logger().WithNamespace("sharing").
Warnf("Error on initial replication (%s): %s", s.SID, err)
s.retryWorker(inst, "share-replicate", 0)
} else {
if pending {
s.pushJob(inst, "share-replicate")
if s.FirstFilesRule() != nil {
s.retryWorker(inst, "share-upload", 1) // 1, so that it will start after share-replicate
}
} else {
if s.FirstFilesRule() == nil {
return
}
Expand All @@ -141,7 +141,7 @@ func (s *Sharing) Setup(inst *instance.Instance, m *Member) {
}
}

go s.NotifyRecipients(inst, m)
s.NotifyRecipients(inst, m)
}

// AddTrackTriggers creates the share-track triggers for each rule of the
Expand Down Expand Up @@ -208,9 +208,9 @@ func (s *Sharing) AddReplicateTrigger(inst *instance.Instance) error {
return couchdb.UpdateDoc(inst, s)
}

// InitialCopy lists the shared documents and put a reference in the
// InitialIndex lists the shared documents and puts a reference in the
// io.cozy.shared database
func (s *Sharing) InitialCopy(inst *instance.Instance, rule Rule, r int) error {
func (s *Sharing) InitialIndex(inst *instance.Instance, rule Rule, r int) error {
if rule.Local || len(rule.Values) == 0 {
return nil
}
Expand Down
164 changes: 89 additions & 75 deletions model/sharing/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sharing

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -18,8 +19,8 @@ import (
"github.com/cozy/cozy-stack/pkg/consts"
"github.com/cozy/cozy-stack/pkg/couchdb"
"github.com/cozy/cozy-stack/pkg/realtime"
multierror "github.com/hashicorp/go-multierror"
"github.com/labstack/echo/v4"
"golang.org/x/sync/errgroup"
)

// UploadMsg is used for jobs on the share-upload worker.
Expand Down Expand Up @@ -52,25 +53,27 @@ func (s *Sharing) Upload(inst *instance.Instance, errors int) error {
}

lastTry := errors+1 == MaxRetries
for i := 0; i < BatchSize; i++ {
if len(members) == 0 {
break
}
m := members[0]
members = members[1:]
more, err := s.UploadTo(inst, m, lastTry)
if err != nil {
errm = multierror.Append(errm, err)
}
if more {
members = append(members, m)
}
done := true
g, _ := errgroup.WithContext(context.Background())
for i := range members {
m := members[i]
g.Go(func() error {
Comment on lines +59 to +60
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to pass m as an argument to the goroutine to create a closure?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errgroup doesn't allow passing arguments to the goroutine. That's why I have written `for i := range members { m := members[i]` instead of the shorter `for _, m := range members`. It forces m to have the correct scope: each iteration gets its own m, so each goroutine captures the right member.

In the future, I hope that https://github.com/golang/go/wiki/LoopvarExperiment will make things easier.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation.

more, err := s.UploadBatchTo(inst, m, lastTry)
if err != nil {
return err
}
if more {
done = false
}
return nil
})
}
err := g.Wait()

if errm != nil {
if err != nil {
s.retryWorker(inst, "share-upload", errors)
inst.Logger().WithNamespace("upload").Infof("errm=%s\n", errm)
} else if len(members) > 0 {
inst.Logger().WithNamespace("upload").Infof("err=%s\n", err)
} else if !done {
s.pushJob(inst, "share-upload")
}
return errm
Expand All @@ -84,14 +87,12 @@ func (s *Sharing) InitialUpload(inst *instance.Instance, m *Member) error {
}
defer mu.Unlock()

for i := 0; i < BatchSize; i++ {
more, err := s.UploadTo(inst, m, false)
if err != nil {
return err
}
if !more {
return s.sendInitialEndNotif(inst, m)
}
more, err := s.UploadBatchTo(inst, m, false)
if err != nil {
return err
}
if !more {
return s.sendInitialEndNotif(inst, m)
}

s.pushJob(inst, "share-upload")
Expand Down Expand Up @@ -126,9 +127,9 @@ func (s *Sharing) sendInitialEndNotif(inst *instance.Instance, m *Member) error
return nil
}

// UploadTo uploads one file to the given member. It returns false if there
// are no more files to upload to this member currently.
func (s *Sharing) UploadTo(inst *instance.Instance, m *Member, lastTry bool) (bool, error) {
// UploadBatchTo uploads a batch of files to the given member. It returns false
// if there are no more files to upload to this member currently.
func (s *Sharing) UploadBatchTo(inst *instance.Instance, m *Member, lastTry bool) (bool, error) {
if m.Instance == "" {
return false, ErrInvalidURL
}
Expand All @@ -143,58 +144,71 @@ func (s *Sharing) UploadTo(inst *instance.Instance, m *Member, lastTry bool) (bo
}
inst.Logger().WithNamespace("upload").Debugf("lastSeq = %s", lastSeq)

file, ruleIndex, seq, err := s.findNextFileToUpload(inst, lastSeq)
if errors.Is(err, ErrInternalServerError) {
// Retrying is useless in this case, let's skip this file
if seq != lastSeq {
_ = s.UpdateLastSequenceNumber(inst, m, "upload", seq)
}
return false, nil
batch := &batchUpload{
Sharing: s,
Instance: inst,
Seq: lastSeq,
}
if err != nil {
return false, err
}
if file == nil {
if seq != lastSeq {
err = s.UpdateLastSequenceNumber(inst, m, "upload", seq)
defer func() {
if batch.Seq != lastSeq {
_ = s.UpdateLastSequenceNumber(inst, m, "upload", batch.Seq)
}
return false, err
}
}()

if err = s.uploadFile(inst, m, file, ruleIndex); err != nil {
if lastTry {
_ = s.UpdateLastSequenceNumber(inst, m, "upload", seq)
for i := 0; i < BatchSize; i++ {
file, ruleIndex, err := batch.findNextFileToUpload()
if err != nil {
return false, err
}
if file == nil {
return false, nil
}
if err = s.uploadFile(inst, m, file, ruleIndex); err != nil {
return false, err
}
return false, err
}
return true, nil
}

type batchUpload struct {
Sharing *Sharing
Instance *instance.Instance
Seq string

return true, s.UpdateLastSequenceNumber(inst, m, "upload", seq)
// changes is used to batch calls to the changes feed and improve
// performance.
changes []couchdb.Change
}

// findNextFileToUpload uses the changes feed to find the next file that needs
// to be uploaded. It returns a file document if there is one file to upload,
// and the sequence number where it is in the changes feed.
func (s *Sharing) findNextFileToUpload(inst *instance.Instance, since string) (map[string]interface{}, int, string, error) {
// and the index of the sharing rule that applies to this file.
func (b *batchUpload) findNextFileToUpload() (map[string]interface{}, int, error) {
for {
response, err := couchdb.GetChanges(inst, &couchdb.ChangesRequest{
DocType: consts.Shared,
IncludeDocs: true,
Since: since,
Limit: 1,
})
if err != nil {
return nil, 0, since, err
}
since = response.LastSeq
if len(response.Results) == 0 {
break
seq := b.Seq
if len(b.changes) == 0 {
response, err := couchdb.GetChanges(b.Instance, &couchdb.ChangesRequest{
DocType: consts.Shared,
IncludeDocs: true,
Since: seq,
Limit: BatchSize,
})
if err != nil {
return nil, 0, err
}
if len(response.Results) == 0 {
return nil, 0, nil
}
b.changes = response.Results
}
r := response.Results[0]
infos, ok := r.Doc.Get("infos").(map[string]interface{})
change := b.changes[0]
b.changes = b.changes[1:]
b.Seq = change.Seq
infos, ok := change.Doc.Get("infos").(map[string]interface{})
if !ok {
continue
}
info, ok := infos[s.SID].(map[string]interface{})
info, ok := infos[b.Sharing.SID].(map[string]interface{})
if !ok {
continue
}
Expand All @@ -208,30 +222,30 @@ func (s *Sharing) findNextFileToUpload(inst *instance.Instance, since string) (m
if !ok {
continue
}
rev := extractLastRevision(r.Doc)
rev := extractLastRevision(change.Doc)
if rev == "" {
continue
}
docID := strings.SplitN(r.DocID, "/", 2)[1]
docID := strings.SplitN(change.DocID, "/", 2)[1]
ir := couchdb.IDRev{ID: docID, Rev: rev}
query := []couchdb.IDRev{ir}
results, err := couchdb.BulkGetDocs(inst, consts.Files, query)
results, err := couchdb.BulkGetDocs(b.Instance, consts.Files, query)
if err != nil {
return nil, 0, since, err
b.Seq = seq
return nil, 0, err
}
if len(results) == 0 {
inst.Logger().WithNamespace("upload").
b.Instance.Logger().WithNamespace("upload").
Warnf("missing results for bulk get %v", query)
return nil, 0, since, ErrInternalServerError
return nil, 0, ErrInternalServerError
}
if results[0]["_deleted"] == true {
inst.Logger().WithNamespace("upload").
b.Instance.Logger().WithNamespace("upload").
Warnf("cannot upload _deleted file %v", results[0])
return nil, 0, since, ErrInternalServerError
return nil, 0, ErrInternalServerError
}
return results[0], int(idx), since, nil
return results[0], int(idx), nil
}
return nil, 0, since, nil
}

// uploadFile uploads one file to the given member. It first tries to just send
Expand Down
Loading