Skip to content

Commit

Permalink
allow old topic name config and support
Browse files Browse the repository at this point in the history
  • Loading branch information
anna-cross committed Jul 22, 2024
1 parent ab49ca8 commit c59c6e2
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 143 deletions.
81 changes: 45 additions & 36 deletions source_pubsub/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,19 @@ type PubSubClient struct {
unionFields map[string]map[string]struct{}

buffer chan sdk.Record
caches chan []ConnectResponseEvent
ticker *time.Ticker

tomb *tomb.Tomb
topicNames []string
sdkPos position.Topics
retryCount int
maxRetries int
}

type Topic struct {
retryCount int
topicName string
}

type ConnectResponseEvent struct {
Data map[string]interface{}
EventID string
Expand All @@ -87,6 +90,11 @@ type ConnectResponseEvent struct {

// Creates a new connection to the gRPC server and returns the wrapper struct.
func NewGRPCClient(ctx context.Context, config Config, sdkPos position.Topics) (*PubSubClient, error) {
sdk.Logger(ctx).Info().
Str("topics", strings.Join(config.TopicNames, ",")).
Str("topic", config.TopicName).
Msgf("Starting GRPC client")

var transportCreds credentials.TransportCredentials
var replayPreset proto.ReplayPreset

Expand Down Expand Up @@ -136,13 +144,14 @@ func NewGRPCClient(ctx context.Context, config Config, sdkPos position.Topics) (
sdkPos: sdkPos,
oauth: &oauth{Credentials: creds},
tomb: t,
retryCount: config.RetryCount,
maxRetries: config.RetryCount,
}, nil
}

// Initializes the pubsub client by authenticating and.
func (c *PubSubClient) Initialize(ctx context.Context) error {
sdk.Logger(ctx).Info().Msgf("Initizalizing PubSub client")

if err := c.login(ctx); err != nil {
return err
}
Expand All @@ -154,8 +163,10 @@ func (c *PubSubClient) Initialize(ctx context.Context) error {
for _, topic := range c.topicNames {
c.tomb.Go(func() error {
ctx := c.tomb.Context(nil) //nolint:staticcheck // SA1012 tomb expects nil
c.startCDC(ctx, topic)
return nil
return c.startCDC(ctx, Topic{
topicName: topic,
retryCount: c.maxRetries,
})
})
}

Expand Down Expand Up @@ -271,39 +282,35 @@ func (c *PubSubClient) Wait(ctx context.Context) error {
}
}

func (c *PubSubClient) ResetRetryCount() {
c.retryCount = c.maxRetries
}

func (c *PubSubClient) retryAuth(ctx context.Context, retry bool) (bool, error) {
func (c *PubSubClient) retryAuth(ctx context.Context, retry bool, topic Topic) (bool, Topic, error) {
var err error
sdk.Logger(ctx).Info().Msgf("retry connection - retries remaining %d ", c.retryCount)
c.retryCount--
sdk.Logger(ctx).Info().Msgf("retry connection on topic %s - retries remaining %d ", topic.topicName, topic.retryCount)
topic.retryCount--

if err = c.login(ctx); err != nil && c.retryCount <= 0 {
return retry, fmt.Errorf("failed to refresh auth: %w", err)
if err = c.login(ctx); err != nil && topic.retryCount <= 0 {
return retry, topic, fmt.Errorf("failed to refresh auth: %w", err)
} else if err != nil {
sdk.Logger(ctx).Info().Msgf("received error on login - retry - %d ", c.retryCount)
sdk.Logger(ctx).Info().Msgf("received error on login for topic %s - retry - %d : %v ", topic.topicName, topic.retryCount, err)
retry = true
return retry, fmt.Errorf("received error on subscribe - retry - %d ", c.retryCount)
return retry, topic, fmt.Errorf("received error on subscribe for topic %s - retry - %d : %w", topic.topicName, topic.retryCount, err)
}

if err := c.canSubscribe(ctx); err != nil && c.retryCount <= 0 {
return retry, fmt.Errorf("failed to subscribe to client topic: %w", err)
if err := c.canSubscribe(ctx); err != nil && topic.retryCount <= 0 {
return retry, topic, fmt.Errorf("failed to subscribe to client topic %s: %w", topic.topicName, err)
} else if err != nil {
sdk.Logger(ctx).Info().Msgf("received error on subscribe - retry - %d ", c.retryCount)
sdk.Logger(ctx).Info().Msgf("received error on subscribe for topic %s - retry - %d : %v", topic.topicName, topic.retryCount, err)
retry = true
return retry, fmt.Errorf("received error on subscribe - retry - %d ", c.retryCount)
return retry, topic, fmt.Errorf("received error on subscribe for topic %s - retry - %d : %w ", topic.topicName, topic.retryCount, err)
}

retry = false
return retry, nil
return retry, topic, nil
}

func (c *PubSubClient) startCDC(ctx context.Context, topic string) error {
func (c *PubSubClient) startCDC(ctx context.Context, topic Topic) error {
sdk.Logger(ctx).Info().
Str("topic", topic).
Str("replayID", string(c.sdkPos.GetTopicReplayID(topic))).
Str("topic", topic.topicName).
Str("replayID", string(c.sdkPos.GetTopicReplayID(topic.topicName))).
Msg("starting CDC processing..")

var (
Expand All @@ -315,46 +322,49 @@ func (c *PubSubClient) startCDC(ctx context.Context, topic string) error {
for {
if retry {
err := rt.Do(func() error {
retry, err = c.retryAuth(ctx, retry)
retry, topic, err = c.retryAuth(ctx, retry, topic)
return err
},
rt.Delay(RetryDelay),
rt.Attempts(uint(c.retryCount)),
rt.Attempts(uint(topic.retryCount)),
)
if err != nil {
return fmt.Errorf("error retrying (number of retries %d) auth - %s", c.retryCount, err)
return fmt.Errorf("error retrying (number of retries %d) for topic %s auth - %s", topic.retryCount, topic.topicName, err)
}

// once we are done with retries, reset the count
topic.retryCount = c.maxRetries
}

select {
case <-ctx.Done():
return ctx.Err()
case <-c.ticker.C: // detect changes every polling period.

replayID := c.sdkPos.GetTopicReplayID(topic)
replayID := c.sdkPos.GetTopicReplayID(topic.topicName)

sdk.Logger(ctx).Debug().
Dur("elapsed", time.Since(lastRecvdAt)).
Str("topic", topic).
Str("topic", topic.topicName).
Str("replayID", base64.StdEncoding.EncodeToString(replayID)).
Msg("attempting to receive new events")

lastRecvdAt = time.Now().UTC()

events, err := c.Recv(ctx, topic, replayID)
events, err := c.Recv(ctx, topic.topicName, replayID)
if err != nil {
if c.invalidReplayIDErr(err) {
sdk.Logger(ctx).Error().Err(err).
Str("topic", topic).
Str("topic", topic.topicName).
Str("replayID", string(replayID)).
Msgf("replay id %s is invalid, retrying", string(replayID))
c.sdkPos.SetTopicReplayID(topic, nil)
c.sdkPos.SetTopicReplayID(topic.topicName, nil)
break
}

if c.retryCount > 0 {
if topic.retryCount > 0 {
sdk.Logger(ctx).Error().Err(err).
Str("topic", topic).
Str("topic", topic.topicName).
Str("replayID", string(replayID)).
Msg("retrying authentication")
retry = true
Expand All @@ -367,7 +377,7 @@ func (c *PubSubClient) startCDC(ctx context.Context, topic string) error {
sdk.Logger(ctx).Debug().
Int("events", len(events)).
Dur("elapsed", time.Since(lastRecvdAt)).
Str("topic", topic).
Str("topic", topic.topicName).
Msg("received events")

for _, e := range events {
Expand Down Expand Up @@ -395,7 +405,6 @@ func (*PubSubClient) connErr(err error) bool {
}

func (c *PubSubClient) buildRecord(event ConnectResponseEvent) sdk.Record {

// TODO - ADD something here to distinguish creates, deletes, updates.

return sdk.Util.Source.NewRecordCreate(
Expand Down
36 changes: 31 additions & 5 deletions source_pubsub/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
package source

import (
"context"
"errors"
"fmt"
"net"
"net/url"
"strings"
"time"

sdk "github.com/conduitio/conduit-connector-sdk"
)

//go:generate paramgen -output=paramgen_config.go Config
Expand All @@ -34,8 +38,10 @@ type Config struct {
// OAuthEndpoint is the OAuthEndpoint from the salesforce app
OAuthEndpoint string `json:"oauthEndpoint" validate:"required"`

// TopicName is the topic the source connector will subscribe to
TopicNames []string `json:"topicNames" validate:"required"`
// TopicName {WARN will be deprecated soon} the TopicName the source connector will subscribe to
TopicName string `json:"topicName"`
// TopicNames are the TopicNames the source connector will subscribe to
TopicNames []string `json:"topicNames"`

// Deprecated: Username is the client secret from the salesforce app.
Username string `json:"username"`
Expand All @@ -55,7 +61,7 @@ type Config struct {
RetryCount int `json:"retryCount" default:"10"`
}

func (c Config) Validate() error {
func (c Config) Validate(ctx context.Context) (Config, error) {
var errs []error

if c.ClientID == "" {
Expand All @@ -76,8 +82,28 @@ func (c Config) Validate() error {
}
}

// validate and set the TopicNames.
if len(c.TopicName) == 0 && len(c.TopicNames) == 0 {
errs = append(errs, fmt.Errorf("required parameter missing: %q", "TopicNames"))
}

if len(c.TopicName) > 0 && len(c.TopicNames) > 0 {
errs = append(errs, fmt.Errorf(`can't provide both "TopicName" and "TopicNames" parameters, "TopicName" is deprecated and will be removed, use the "TopicNames" parameter instead`))
}
if len(c.TopicName) > 0 && len(c.TopicNames) == 0 {
sdk.Logger(ctx).Warn().Msg(`"TopicName" parameter is deprecated and will be removed, please use "TopicNames" instead.`)
// add the TopicName value to the TopicNames slice.
c.TopicNames = make([]string, 1)
c.TopicNames[0] = c.TopicName
sdk.Logger(ctx).Warn().
Str("topics", strings.Join(c.TopicNames, ",")).
Str("topic", c.TopicName).
Msg(`"TopicName" parameter is deprecated and will be removed, please use "TopicNames" instead.`)

}

Check failure on line 103 in source_pubsub/config.go

View workflow job for this annotation

GitHub Actions / golangci-lint

unnecessary trailing newline (whitespace)

if len(c.TopicNames) == 0 {
errs = append(errs, fmt.Errorf("invalid topic name %q", c.TopicNames))
errs = append(errs, fmt.Errorf("invalid TopicName name %q", c.TopicNames))
}

if c.PollingPeriod == 0 {
Expand All @@ -94,5 +120,5 @@ func (c Config) Validate() error {
}
}

return errors.Join(errs...)
return c, errors.Join(errs...)
}
79 changes: 0 additions & 79 deletions source_pubsub/mock_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions source_pubsub/paramgen_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion source_pubsub/position/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ func ParseSDKPosition(sdkPos sdk.Position) (Topics, error) {
return p, nil
}

func NewTopicPosition() Topics {
var p Topics
p.Topics = make(TopicPositions)
return p
}

func (p Topics) SetTopics(topics []string) {
for _, topic := range topics {
if _, ok := p.Topics[topic]; !ok {
Expand Down Expand Up @@ -62,7 +68,7 @@ func (p Topics) SetTopicReplayID(topic string, replayID []byte) {
topicEvent.ReadTime = time.Now()
p.Topics[topic] = topicEvent
} else {
//should never be even reaching this point, something went wrong if we do
// should never be even reaching this point, something went wrong if we do
panic(fmt.Errorf("attempting to set replay id - %b on topic %s, topic doesn't exist on position", replayID, topic))
}
}
Expand Down
Loading

0 comments on commit c59c6e2

Please sign in to comment.