diff --git a/source_pubsub/client.go b/source_pubsub/client.go index de3a46c..d12bed7 100644 --- a/source_pubsub/client.go +++ b/source_pubsub/client.go @@ -67,16 +67,19 @@ type PubSubClient struct { unionFields map[string]map[string]struct{} buffer chan sdk.Record - caches chan []ConnectResponseEvent ticker *time.Ticker tomb *tomb.Tomb topicNames []string sdkPos position.Topics - retryCount int maxRetries int } +type Topic struct { + retryCount int + topicName string +} + type ConnectResponseEvent struct { Data map[string]interface{} EventID string @@ -87,6 +90,11 @@ type ConnectResponseEvent struct { // Creates a new connection to the gRPC server and returns the wrapper struct. func NewGRPCClient(ctx context.Context, config Config, sdkPos position.Topics) (*PubSubClient, error) { + sdk.Logger(ctx).Info(). + Str("topics", strings.Join(config.TopicNames, ",")). + Str("topic", config.TopicName). + Msgf("Starting GRPC client") + var transportCreds credentials.TransportCredentials var replayPreset proto.ReplayPreset @@ -136,13 +144,14 @@ func NewGRPCClient(ctx context.Context, config Config, sdkPos position.Topics) ( sdkPos: sdkPos, oauth: &oauth{Credentials: creds}, tomb: t, - retryCount: config.RetryCount, maxRetries: config.RetryCount, }, nil } // Initializes the pubsub client by authenticating and. func (c *PubSubClient) Initialize(ctx context.Context) error { + sdk.Logger(ctx).Info().Msgf("Initizalizing PubSub client") + if err := c.login(ctx); err != nil { return err } @@ -154,8 +163,10 @@ func (c *PubSubClient) Initialize(ctx context.Context) error { for _, topic := range c.topicNames { c.tomb.Go(func() error { ctx := c.tomb.Context(nil) //nolint:staticcheck // SA1012 tomb expects nil - c.startCDC(ctx, topic) - return nil + return c.startCDC(ctx, Topic{ + topicName: topic, + retryCount: c.maxRetries, + }) }) } @@ -271,39 +282,35 @@ func (c *PubSubClient) Wait(ctx context.Context) error { } } -func (c *PubSubClient) ResetRetryCount() { - c.retryCount = c.maxRetries -} - -func (c *PubSubClient) retryAuth(ctx context.Context, retry bool) (bool, error) { +func (c *PubSubClient) retryAuth(ctx context.Context, retry bool, topic Topic) (bool, Topic, error) { var err error - sdk.Logger(ctx).Info().Msgf("retry connection - retries remaining %d ", c.retryCount) - c.retryCount-- + sdk.Logger(ctx).Info().Msgf("retry connection on topic %s - retries remaining %d ", topic.topicName, topic.retryCount) + topic.retryCount-- - if err = c.login(ctx); err != nil && c.retryCount <= 0 { - return retry, fmt.Errorf("failed to refresh auth: %w", err) + if err = c.login(ctx); err != nil && topic.retryCount <= 0 { + return retry, topic, fmt.Errorf("failed to refresh auth: %w", err) } else if err != nil { - sdk.Logger(ctx).Info().Msgf("received error on login - retry - %d ", c.retryCount) + sdk.Logger(ctx).Info().Msgf("received error on login for topic %s - retry - %d : %v ", topic.topicName, topic.retryCount, err) retry = true - return retry, fmt.Errorf("received error on subscribe - retry - %d ", c.retryCount) + return retry, topic, fmt.Errorf("received error on subscribe for topic %s - retry - %d : %w", topic.topicName, topic.retryCount, err) } - if err := c.canSubscribe(ctx); err != nil && c.retryCount <= 0 { - return retry, fmt.Errorf("failed to subscribe to client topic: %w", err) + if err := c.canSubscribe(ctx); err != nil && topic.retryCount <= 0 { + return retry, topic, fmt.Errorf("failed to subscribe to client topic %s: %w", topic.topicName, err) } else if err != nil { - sdk.Logger(ctx).Info().Msgf("received error on subscribe - retry - %d ", c.retryCount) + sdk.Logger(ctx).Info().Msgf("received error on subscribe for topic %s - retry - %d : %v", topic.topicName, topic.retryCount, err) retry = true - return retry, fmt.Errorf("received error on subscribe - retry - %d ", c.retryCount) + return retry, topic, fmt.Errorf("received error on subscribe for topic %s - retry - %d : %w ", topic.topicName, topic.retryCount, err) } retry = false - return retry, nil + return retry, topic, nil } -func (c *PubSubClient) startCDC(ctx context.Context, topic string) error { +func (c *PubSubClient) startCDC(ctx context.Context, topic Topic) error { sdk.Logger(ctx).Info(). - Str("topic", topic). - Str("replayID", string(c.sdkPos.GetTopicReplayID(topic))). + Str("topic", topic.topicName). + Str("replayID", string(c.sdkPos.GetTopicReplayID(topic.topicName))). Msg("starting CDC processing..") var ( @@ -315,15 +322,18 @@ func (c *PubSubClient) startCDC(ctx context.Context, topic string) error { for { if retry { err := rt.Do(func() error { - retry, err = c.retryAuth(ctx, retry) + retry, topic, err = c.retryAuth(ctx, retry, topic) return err }, rt.Delay(RetryDelay), - rt.Attempts(uint(c.retryCount)), + rt.Attempts(uint(topic.retryCount)), ) if err != nil { - return fmt.Errorf("error retrying (number of retries %d) auth - %s", c.retryCount, err) + return fmt.Errorf("error retrying (number of retries %d) for topic %s auth - %s", topic.retryCount, topic.topicName, err) } + + // once we are done with retries, reset the count + topic.retryCount = c.maxRetries } select { @@ -331,30 +341,30 @@ func (c *PubSubClient) startCDC(ctx context.Context, topic string) error { return ctx.Err() case <-c.ticker.C: // detect changes every polling period. - replayID := c.sdkPos.GetTopicReplayID(topic) + replayID := c.sdkPos.GetTopicReplayID(topic.topicName) sdk.Logger(ctx).Debug(). Dur("elapsed", time.Since(lastRecvdAt)). - Str("topic", topic). + Str("topic", topic.topicName). Str("replayID", base64.StdEncoding.EncodeToString(replayID)). Msg("attempting to receive new events") lastRecvdAt = time.Now().UTC() - events, err := c.Recv(ctx, topic, replayID) + events, err := c.Recv(ctx, topic.topicName, replayID) if err != nil { if c.invalidReplayIDErr(err) { sdk.Logger(ctx).Error().Err(err). - Str("topic", topic). + Str("topic", topic.topicName). Str("replayID", string(replayID)). Msgf("replay id %s is invalid, retrying", string(replayID)) - c.sdkPos.SetTopicReplayID(topic, nil) + c.sdkPos.SetTopicReplayID(topic.topicName, nil) break } - if c.retryCount > 0 { + if topic.retryCount > 0 { sdk.Logger(ctx).Error().Err(err). - Str("topic", topic). + Str("topic", topic.topicName). Str("replayID", string(replayID)). Msg("retrying authentication") retry = true @@ -367,7 +377,7 @@ func (c *PubSubClient) startCDC(ctx context.Context, topic string) error { sdk.Logger(ctx).Debug(). Int("events", len(events)). Dur("elapsed", time.Since(lastRecvdAt)). - Str("topic", topic). + Str("topic", topic.topicName). Msg("received events") for _, e := range events { @@ -395,7 +405,6 @@ func (*PubSubClient) connErr(err error) bool { } func (c *PubSubClient) buildRecord(event ConnectResponseEvent) sdk.Record { - // TODO - ADD something here to distinguish creates, deletes, updates. return sdk.Util.Source.NewRecordCreate( diff --git a/source_pubsub/config.go b/source_pubsub/config.go index 457508b..edb3738 100644 --- a/source_pubsub/config.go +++ b/source_pubsub/config.go @@ -15,11 +15,15 @@ package source import ( + "context" "errors" "fmt" "net" "net/url" + "strings" "time" + + sdk "github.com/conduitio/conduit-connector-sdk" ) //go:generate paramgen -output=paramgen_config.go Config @@ -34,8 +38,10 @@ type Config struct { // OAuthEndpoint is the OAuthEndpoint from the salesforce app OAuthEndpoint string `json:"oauthEndpoint" validate:"required"` - // TopicName is the topic the source connector will subscribe to - TopicNames []string `json:"topicNames" validate:"required"` + // TopicName {WARN will be deprecated soon} the TopicName the source connector will subscribe to + TopicName string `json:"topicName"` + // TopicNames are the TopicNames the source connector will subscribe to + TopicNames []string `json:"topicNames"` // Deprecated: Username is the client secret from the salesforce app. Username string `json:"username"` @@ -55,7 +61,7 @@ type Config struct { RetryCount int `json:"retryCount" default:"10"` } -func (c Config) Validate() error { +func (c Config) Validate(ctx context.Context) (Config, error) { var errs []error if c.ClientID == "" { @@ -76,8 +82,28 @@ func (c Config) Validate() error { } } + // validate and set the TopicNames. + if len(c.TopicName) == 0 && len(c.TopicNames) == 0 { + errs = append(errs, fmt.Errorf("required parameter missing: %q", "TopicNames")) + } + + if len(c.TopicName) > 0 && len(c.TopicNames) > 0 { + errs = append(errs, fmt.Errorf(`can't provide both "TopicName" and "TopicNames" parameters, "TopicName" is deprecated and will be removed, use the "TopicNames" parameter instead`)) + } + if len(c.TopicName) > 0 && len(c.TopicNames) == 0 { + sdk.Logger(ctx).Warn().Msg(`"TopicName" parameter is deprecated and will be removed, please use "TopicNames" instead.`) + // add the TopicName value to the TopicNames slice. + c.TopicNames = make([]string, 1) + c.TopicNames[0] = c.TopicName + sdk.Logger(ctx).Warn(). + Str("topics", strings.Join(c.TopicNames, ",")). + Str("topic", c.TopicName). + Msg(`"TopicName" parameter is deprecated and will be removed, please use "TopicNames" instead.`) + + } + if len(c.TopicNames) == 0 { - errs = append(errs, fmt.Errorf("invalid topic name %q", c.TopicNames)) + errs = append(errs, fmt.Errorf("invalid TopicName name %q", c.TopicNames)) } if c.PollingPeriod == 0 { @@ -94,5 +120,5 @@ func (c Config) Validate() error { } } - return errors.Join(errs...) + return c, errors.Join(errs...) } diff --git a/source_pubsub/mock_client.go b/source_pubsub/mock_client.go index cf75803..3fba072 100644 --- a/source_pubsub/mock_client.go +++ b/source_pubsub/mock_client.go @@ -170,85 +170,6 @@ func (_c *mockClient_Next_Call) RunAndReturn(run func(context.Context) (sdk.Reco return _c } -// ReplayID provides a mock function with given fields: -func (_m *mockClient) ReplayID() []byte { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for ReplayID") - } - - var r0 []byte - if rf, ok := ret.Get(0).(func() []byte); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]byte) - } - } - - return r0 -} - -// mockClient_ReplayID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReplayID' -type mockClient_ReplayID_Call struct { - *mock.Call -} - -// ReplayID is a helper method to define mock.On call -func (_e *mockClient_Expecter) ReplayID() *mockClient_ReplayID_Call { - return &mockClient_ReplayID_Call{Call: _e.mock.On("ReplayID")} -} - -func (_c *mockClient_ReplayID_Call) Run(run func()) *mockClient_ReplayID_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *mockClient_ReplayID_Call) Return(_a0 []byte) *mockClient_ReplayID_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *mockClient_ReplayID_Call) RunAndReturn(run func() []byte) *mockClient_ReplayID_Call { - _c.Call.Return(run) - return _c -} - -// ResetRetryCount provides a mock function with given fields: -func (_m *mockClient) ResetRetryCount() { - _m.Called() -} - -// mockClient_ResetRetryCount_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ResetRetryCount' -type mockClient_ResetRetryCount_Call struct { - *mock.Call -} - -// ResetRetryCount is a helper method to define mock.On call -func (_e *mockClient_Expecter) ResetRetryCount() *mockClient_ResetRetryCount_Call { - return &mockClient_ResetRetryCount_Call{Call: _e.mock.On("ResetRetryCount")} -} - -func (_c *mockClient_ResetRetryCount_Call) Run(run func()) *mockClient_ResetRetryCount_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *mockClient_ResetRetryCount_Call) Return() *mockClient_ResetRetryCount_Call { - _c.Call.Return() - return _c -} - -func (_c *mockClient_ResetRetryCount_Call) RunAndReturn(run func()) *mockClient_ResetRetryCount_Call { - _c.Call.Return(run) - return _c -} - // Stop provides a mock function with given fields: _a0 func (_m *mockClient) Stop(_a0 context.Context) { _m.Called(_a0) diff --git a/source_pubsub/paramgen_config.go b/source_pubsub/paramgen_config.go index aaa6707..68a60b6 100644 --- a/source_pubsub/paramgen_config.go +++ b/source_pubsub/paramgen_config.go @@ -63,13 +63,17 @@ func (Config) Parameters() map[string]sdk.Parameter { Type: sdk.ParameterTypeInt, Validations: []sdk.Validation{}, }, + "topicName": { + Default: "", + Description: "topicName {WARN will be deprecated soon} the topicName the source connector will subscribe to", + Type: sdk.ParameterTypeString, + Validations: []sdk.Validation{}, + }, "topicNames": { Default: "", - Description: "TopicName is the topic the source connector will subscribe to", + Description: "topicNames are the topicNames the source connector will subscribe to", Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{ - sdk.ValidationRequired{}, - }, + Validations: []sdk.Validation{}, }, "username": { Default: "", diff --git a/source_pubsub/position/position.go b/source_pubsub/position/position.go index 5d1949a..5824ecc 100644 --- a/source_pubsub/position/position.go +++ b/source_pubsub/position/position.go @@ -33,6 +33,12 @@ func ParseSDKPosition(sdkPos sdk.Position) (Topics, error) { return p, nil } +func NewTopicPosition() Topics { + var p Topics + p.Topics = make(TopicPositions) + return p +} + func (p Topics) SetTopics(topics []string) { for _, topic := range topics { if _, ok := p.Topics[topic]; !ok { @@ -62,7 +68,7 @@ func (p Topics) SetTopicReplayID(topic string, replayID []byte) { topicEvent.ReadTime = time.Now() p.Topics[topic] = topicEvent } else { - //should never be even reaching this point, something went wrong if we do + // should never be even reaching this point, something went wrong if we do panic(fmt.Errorf("attempting to set replay id - %b on topic %s, topic doesn't exist on position", replayID, topic)) } } diff --git a/source_pubsub/source.go b/source_pubsub/source.go index 84c2985..a8550b4 100644 --- a/source_pubsub/source.go +++ b/source_pubsub/source.go @@ -18,6 +18,7 @@ import ( "context" "encoding/base64" "fmt" + "strings" "github.com/conduitio-labs/conduit-connector-salesforce/source_pubsub/position" sdk "github.com/conduitio/conduit-connector-sdk" @@ -30,7 +31,6 @@ type client interface { Stop(context.Context) Close(context.Context) error Wait(context.Context) error - ResetRetryCount() } var _ client = (*PubSubClient)(nil) @@ -49,29 +49,45 @@ func (s *Source) Parameters() map[string]sdk.Parameter { return s.config.Parameters() } -func (s *Source) Configure(_ context.Context, cfg map[string]string) error { - if err := sdk.Util.ParseConfig(cfg, &s.config); err != nil { +func (s *Source) Configure(ctx context.Context, cfg map[string]string) error { + var config Config + + if err := sdk.Util.ParseConfig(cfg, &config); err != nil { return fmt.Errorf("failed to parse config - %s", err) } - if err := s.config.Validate(); err != nil { + config, err := config.Validate(ctx) + if err != nil { return fmt.Errorf("config failed to validate: %w", err) } + s.config = config + return nil } func (s *Source) Open(ctx context.Context, sdkPos sdk.Position) error { logger := sdk.Logger(ctx) + var parsedPositions position.Topics + var err error logger.Debug(). Str("at", "source.open"). Str("position", base64.StdEncoding.EncodeToString(sdkPos)). - Msg("OPEN START AT ") + Str("topic", s.config.TopicName). + Str("topics", strings.Join(s.config.TopicNames, ",")). + Msg("Open Source Connector") + + if len(s.config.TopicName) > 0 { + parsedPositions = position.NewTopicPosition() + parsedPositions.SetTopics(s.config.TopicNames) + parsedPositions.SetTopicReplayID(s.config.TopicName, sdkPos) + } else { + parsedPositions, err = position.ParseSDKPosition(sdkPos) + } - parsedPositions, err := position.ParseSDKPosition(sdkPos) if err != nil { - return fmt.Errorf("could not parsed sdk position %w", sdkPos) + return fmt.Errorf("could not parsed sdk position %v", sdkPos) } client, err := NewGRPCClient(ctx, s.config, parsedPositions) if err != nil { @@ -84,12 +100,25 @@ func (s *Source) Open(ctx context.Context, sdkPos sdk.Position) error { s.client = client + for _, t := range s.config.TopicNames { + p := parsedPositions.GetTopicReplayID(t) + logger.Debug(). + Str("at", "source.open"). + Str("position", string(p)). + Str("position encoded", base64.StdEncoding.EncodeToString(p)). + Str("topic", t). + Msgf("Grpc Client has been set. Will begin read for topic: %s", t) + } + return nil } func (s *Source) Read(ctx context.Context) (rec sdk.Record, err error) { logger := sdk.Logger(ctx) - s.client.ResetRetryCount() + logger.Debug(). + Str("topic", s.config.TopicName). + Str("topics", strings.Join(s.config.TopicNames, ",")). + Msg("begin read") r, err := s.client.Next(ctx) if err != nil { @@ -105,9 +134,6 @@ func (s *Source) Read(ctx context.Context) (rec sdk.Record, err error) { return sdk.Record{}, sdk.ErrBackoffRetry } - // setting topic name as collection - //r.Metadata.SetCollection(s.config.TopicName) - topic, err := r.Metadata.GetCollection() if err != nil { return sdk.Record{}, err @@ -115,7 +141,8 @@ func (s *Source) Read(ctx context.Context) (rec sdk.Record, err error) { logger.Debug(). Str("at", "source.read"). - Str("position", base64.StdEncoding.EncodeToString(r.Position)). + Str("position encoded", base64.StdEncoding.EncodeToString(r.Position)). + Str("position", string(r.Position)). Str("record on topic", topic). Msg("sending record") diff --git a/source_pubsub/source_test.go b/source_pubsub/source_test.go index 25fa9f4..2e5e72f 100644 --- a/source_pubsub/source_test.go +++ b/source_pubsub/source_test.go @@ -29,7 +29,8 @@ func Test_Read(t *testing.T) { Position: []byte("test1"), Operation: sdk.OperationCreate, Metadata: sdk.Metadata{ - "test1": "test", + "test1": "test", + "opencdc.collection": "test", }, Key: sdk.StructuredData{ "test1": "test", @@ -45,7 +46,7 @@ func Test_Read(t *testing.T) { ClientID: "test-client-id", ClientSecret: "test-client-secret", OAuthEndpoint: "https://somewhere", - TopicNames: []string{"/events/TestEvent__e"}, + TopicNames: []string{"/events/TestEvent__e", "/events/TestEvent2__e"}, } testCases := []struct { @@ -60,7 +61,6 @@ func Test_Read(t *testing.T) { config: testConfig, mockClient: func() *mockClient { m := newMockClient(t) - m.On("ResetRetryCount").Return(nil) m.On("Next", mock.Anything).Return(testRecord, nil) return m @@ -73,7 +73,6 @@ func Test_Read(t *testing.T) { mockClient: func() *mockClient { m := newMockClient(t) m.On("Next", mock.Anything).Return(sdk.Record{}, nil).Times(1) - m.On("ResetRetryCount").Return(nil) return m }, expectedErr: sdk.ErrBackoffRetry, @@ -84,7 +83,6 @@ func Test_Read(t *testing.T) { config: testConfig, mockClient: func() *mockClient { m := newMockClient(t) - m.On("ResetRetryCount").Return(nil) m.On("Next", mock.Anything).Return(sdk.Record{}, errors.New("error receiving new events - test error")).Times(1) return m }, @@ -95,7 +93,6 @@ func Test_Read(t *testing.T) { config: testConfig, mockClient: func() *mockClient { m := newMockClient(t) - m.On("ResetRetryCount").Return(nil) m.On("Next", mock.Anything).Return(sdk.Record{Payload: sdk.Change{Before: nil, After: nil}}, nil).Times(1) return m },