diff --git a/go.mod b/go.mod index 0b83d67..bf2389c 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,8 @@ require ( github.com/alingse/asasalint v0.0.11 // indirect github.com/ashanbrown/forbidigo v1.6.0 // indirect github.com/ashanbrown/makezero v1.1.1 // indirect + github.com/avast/retry-go v3.0.0+incompatible // indirect + github.com/avast/retry-go/v4 v4.6.0 github.com/beorn7/perks v1.0.1 // indirect github.com/bkielbasa/cyclop v1.2.1 // indirect github.com/blizzy78/varnamelen v0.8.0 // indirect diff --git a/go.sum b/go.sum index c865001..4386c87 100644 --- a/go.sum +++ b/go.sum @@ -85,6 +85,10 @@ github.com/ashanbrown/forbidigo v1.6.0 h1:D3aewfM37Yb3pxHujIPSpTf6oQk9sc9WZi8ger github.com/ashanbrown/forbidigo v1.6.0/go.mod h1:Y8j9jy9ZYAEHXdu723cUlraTqbzjKF1MUyfOKL+AjcU= github.com/ashanbrown/makezero v1.1.1 h1:iCQ87C0V0vSyO+M9E/FZYbu65auqH0lnsOkf5FcB28s= github.com/ashanbrown/makezero v1.1.1/go.mod h1:i1bJLCRSCHOcOa9Y6MyF2FTfMZMFdHvxKHxgO5Z1axI= +github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0= +github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= +github.com/avast/retry-go/v4 v4.6.0 h1:K9xNA+KeB8HHc2aWFuLb25Offp+0iVRXEvFx8IinRJA= +github.com/avast/retry-go/v4 v4.6.0/go.mod h1:gvWlPhBVsvBbLkVGDg/KwvBv0bEkCOLRRSHKIr2PyOE= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= diff --git a/source_pubsub/client.go b/source_pubsub/client.go index cf9ec48..697f30c 100644 --- a/source_pubsub/client.go +++ b/source_pubsub/client.go @@ -26,6 +26,7 @@ import ( "sync" "time" + rt "github.com/avast/retry-go/v4" "github.com/conduitio-labs/conduit-connector-salesforce/source_pubsub/proto" sdk "github.com/conduitio/conduit-connector-sdk" "github.com/linkedin/goavro/v2" @@ -42,6 +43,7 @@ import ( var ( GRPCDialTimeout = 5 * time.Second GRPCCallTimeout = 5 * time.Second + RetryDelay = 10 * time.Second ) var ErrEndOfRecords = errors.New("end of records from stream") @@ -268,29 +270,29 @@ func (c *PubSubClient) ReplayID() []byte { } func (c *PubSubClient) retryAuth(ctx context.Context, retry bool) (error, bool) { - for retry { - var err error - sdk.Logger(ctx).Info().Msgf("retry connection - retries %d ", c.retryCount) - - if err = c.login(ctx); err != nil && c.retryCount <= 0 { - return fmt.Errorf("failed to refresh auth: %w", err), retry - } else if err != nil { - sdk.Logger(ctx).Info().Msgf("received error on login - retry - %d ", c.retryCount) - c.retryCount-- - } + var err error + sdk.Logger(ctx).Info().Msgf("retry connection - retries %d ", c.retryCount) - if err = c.Initialize(ctx); err != nil && c.retryCount <= 0 { - return fmt.Errorf("failed to reinitialize client: %w", err), retry - } else if err != nil { - sdk.Logger(ctx).Info().Msgf("received error on init - retry - %d ", c.retryCount) - c.retryCount-- - } + if err = c.login(ctx); err != nil && c.retryCount <= 0 { + return fmt.Errorf("failed to refresh auth: %w", err), retry + } else if err != nil { + sdk.Logger(ctx).Info().Msgf("received error on login - retry - %d ", c.retryCount) + c.retryCount-- + retry = true + return nil, retry + } - if err == nil { - retry = false - c.retryCount-- - } + if err = c.Initialize(ctx); err != nil && c.retryCount <= 0 { + return fmt.Errorf("failed to reinitialize client: %w", err), retry + } else if err != nil { + sdk.Logger(ctx).Info().Msgf("received error on init - retry - %d ", c.retryCount) + c.retryCount-- + retry = true + return nil, retry } + + retry = false + c.retryCount-- return nil, retry } @@ -305,7 +307,18 @@ func (c *PubSubClient) startCDC(ctx context.Context) error { for { if retry { - err, retry = c.retryAuth(ctx, retry) + err := rt.Do(func() error { + err, retry = c.retryAuth(ctx, retry) + return err + }, + rt.RetryIf(func(err error) bool { + if retry && c.retryCount > 0 { + return true + } + return false + }), + rt.Delay(RetryDelay), + ) if err != nil { return fmt.Errorf("error retrying (number of retries %d) auth - %s", c.retryCount, err) }