Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source multiple topics #120

Merged
merged 13 commits into from
Sep 10, 2024
163 changes: 101 additions & 62 deletions source_pubsub/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

rt "github.com/avast/retry-go/v4"
"github.com/conduitio-labs/conduit-connector-salesforce/source_pubsub/position"
"github.com/conduitio-labs/conduit-connector-salesforce/source_pubsub/proto"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/linkedin/goavro/v2"
Expand Down Expand Up @@ -66,23 +67,26 @@ type PubSubClient struct {
unionFields map[string]map[string]struct{}

buffer chan sdk.Record
caches chan []ConnectResponseEvent
ticker *time.Ticker
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If each topic gets its own consumer, then each consumer will need its own ticker. A ticker exposes a single channel that delivers one value per tick, so when several CDC consumers share one ticker, only one of them receives each tick; it is unclear how many of the consumers will actually execute a fetch.


currReplayID []byte
tomb *tomb.Tomb
topicName string
retryCount int
maxRetries int
tomb *tomb.Tomb
topicNames []string
sdkPos position.Topics
retryCount int
maxRetries int
}

type ConnectResponseEvent struct {
Data map[string]interface{}
EventID string
ReplayID []byte
Data map[string]interface{}
EventID string
ReplayID []byte
Topic string
ReceivedAt time.Time
}

// Creates a new connection to the gRPC server and returns the wrapper struct.
func NewGRPCClient(ctx context.Context, config Config, sdkPos sdk.Position) (*PubSubClient, error) {
func NewGRPCClient(ctx context.Context, config Config, sdkPos position.Topics) (*PubSubClient, error) {
var transportCreds credentials.TransportCredentials
var replayPreset proto.ReplayPreset

Expand Down Expand Up @@ -118,16 +122,18 @@ func NewGRPCClient(ctx context.Context, config Config, sdkPos sdk.Position) (*Pu
replayPreset = proto.ReplayPreset_EARLIEST
}

sdkPos.SetTopics(config.TopicNames)
lyuboxa marked this conversation as resolved.
Show resolved Hide resolved

return &PubSubClient{
conn: conn,
pubSubClient: proto.NewPubSubClient(conn),
codecCache: make(map[string]*goavro.Codec),
unionFields: make(map[string]map[string]struct{}),
currReplayID: sdkPos,
replayPreset: replayPreset,
buffer: make(chan sdk.Record),
ticker: time.NewTicker(config.PollingPeriod),
topicName: config.TopicName,
topicNames: config.TopicNames,
sdkPos: sdkPos,
lyuboxa marked this conversation as resolved.
Show resolved Hide resolved
oauth: &oauth{Credentials: creds},
tomb: t,
retryCount: config.RetryCount,
Expand All @@ -145,10 +151,13 @@ func (c *PubSubClient) Initialize(ctx context.Context) error {
return err
}

c.tomb.Go(func() error {
ctx := c.tomb.Context(nil) //nolint:staticcheck // SA1012 tomb expects nil
return c.startCDC(ctx)
})
for _, topic := range c.topicNames {
c.tomb.Go(func() error {
ctx := c.tomb.Context(nil) //nolint:staticcheck // SA1012 tomb expects nil
c.startCDC(ctx, topic)
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@anna-cross, is it intended not to return the result of c.startCDC()? If so, what's the context?

My understanding is that this is within the func() passed in above anyway, so it shouldn't break the for loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, yes! I'll move it back into the return statement.

})
}

go func() {
<-c.tomb.Dead()
Expand Down Expand Up @@ -183,7 +192,7 @@ func (c *PubSubClient) login(ctx context.Context) error {
Str("instance_url", c.instanceURL).
Str("user_id", c.userID).
Str("org_id", c.orgID).
Str("topic", c.topicName).
Str("topic", strings.Join(c.topicNames, ",")).
lyuboxa marked this conversation as resolved.
Show resolved Hide resolved
Msg("successfully authenticated")

return nil
Expand All @@ -193,26 +202,28 @@ func (c *PubSubClient) login(ctx context.Context) error {
func (c *PubSubClient) canSubscribe(_ context.Context) error {
var trailer metadata.MD

req := &proto.TopicRequest{
TopicName: c.topicName,
}
for _, topic := range c.topicNames {
req := &proto.TopicRequest{
TopicName: topic,
}

ctx, cancel := context.WithTimeout(c.getAuthContext(), GRPCCallTimeout)
defer cancel()
ctx, cancel := context.WithTimeout(c.getAuthContext(), GRPCCallTimeout)
defer cancel()

resp, err := c.pubSubClient.GetTopic(ctx, req, grpc.Trailer(&trailer))
if err != nil {
return fmt.Errorf("failed to retrieve topic %q: %w", c.topicName, err)
}
resp, err := c.pubSubClient.GetTopic(ctx, req, grpc.Trailer(&trailer))
if err != nil {
return fmt.Errorf("failed to retrieve topic %q: %w", topic, err)
}

if !resp.CanSubscribe {
return fmt.Errorf("user %q not allowed to subscribe to %q", c.userID, resp.TopicName)
}
if !resp.CanSubscribe {
return fmt.Errorf("user %q not allowed to subscribe to %q", c.userID, resp.TopicName)
}

sdk.Logger(ctx).Debug().
Bool("can_subscribe", resp.CanSubscribe).
Str("topic_name", resp.TopicName).
Msgf("client allowed to subscribe to events on %q", c.topicName)
sdk.Logger(ctx).Debug().
Bool("can_subscribe", resp.CanSubscribe).
Str("topic_name", resp.TopicName).
lyuboxa marked this conversation as resolved.
Show resolved Hide resolved
Msgf("client allowed to subscribe to events on %q", topic)
}

return nil
}
Expand All @@ -239,7 +250,9 @@ func (c *PubSubClient) Stop(ctx context.Context) {
_ = c.tomb.Killf("cdc iterator is stopping")

sdk.Logger(ctx).Debug().
Msgf("stopping pubsub client on topic %q", c.topicName)
Msgf("stopping pubsub client on topics %q", strings.Join(c.topicNames, ","))

c.topicNames = nil
}

func (c *PubSubClient) Wait(ctx context.Context) error {
Expand All @@ -262,10 +275,6 @@ func (c *PubSubClient) ResetRetryCount() {
c.retryCount = c.maxRetries
}

func (c *PubSubClient) ReplayID() []byte {
return c.currReplayID
}

func (c *PubSubClient) retryAuth(ctx context.Context, retry bool) (bool, error) {
var err error
sdk.Logger(ctx).Info().Msgf("retry connection - retries remaining %d ", c.retryCount)
Expand All @@ -291,8 +300,11 @@ func (c *PubSubClient) retryAuth(ctx context.Context, retry bool) (bool, error)
return retry, nil
}

func (c *PubSubClient) startCDC(ctx context.Context) error {
sdk.Logger(ctx).Info().Msg("starting CDC processing..")
func (c *PubSubClient) startCDC(ctx context.Context, topic string) error {
sdk.Logger(ctx).Info().
Str("topic", topic).
Str("replayID", string(c.sdkPos.GetTopicReplayID(topic))).
Msg("starting CDC processing..")

var (
retry bool
Expand All @@ -318,22 +330,33 @@ func (c *PubSubClient) startCDC(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-c.ticker.C: // detect changes every polling period.

replayID := c.sdkPos.GetTopicReplayID(topic)

sdk.Logger(ctx).Debug().
Dur("elapsed", time.Since(lastRecvdAt)).
Str("topic", topic).
Str("replayID", base64.StdEncoding.EncodeToString(replayID)).
Msg("attempting to receive new events")

lastRecvdAt = time.Now().UTC()

events, err := c.Recv(ctx)
events, err := c.Recv(ctx, topic, replayID)
if err != nil {
if c.invalidReplayIDErr(err) {
sdk.Logger(ctx).Error().Err(err).Msgf("replay id %s is invalid, retrying", string(c.currReplayID))
c.currReplayID = nil
sdk.Logger(ctx).Error().Err(err).
Str("topic", topic).
Str("replayID", string(replayID)).
Msgf("replay id %s is invalid, retrying", string(replayID))
c.sdkPos.SetTopicReplayID(topic, nil)
break
}

if c.retryCount > 0 {
sdk.Logger(ctx).Error().Err(err).Msg("retrying authentication")
sdk.Logger(ctx).Error().Err(err).
Str("topic", topic).
Str("replayID", string(replayID)).
Msg("retrying authentication")
retry = true
break
}
Expand All @@ -344,11 +367,18 @@ func (c *PubSubClient) startCDC(ctx context.Context) error {
sdk.Logger(ctx).Debug().
Int("events", len(events)).
Dur("elapsed", time.Since(lastRecvdAt)).
Str("topic", topic).
Msg("received events")

for _, e := range events {
c.sdkPos.SetTopicReplayID(e.Topic, e.ReplayID)
c.buffer <- c.buildRecord(e)
c.currReplayID = e.ReplayID
sdk.Logger(ctx).Debug().
Int("events", len(events)).
Dur("elapsed", time.Since(lastRecvdAt)).
Str("topic", e.Topic).
Str("replayID", base64.StdEncoding.EncodeToString(e.ReplayID)).
Msg("record sent to buffer")
}
}
}
Expand All @@ -365,10 +395,14 @@ func (*PubSubClient) connErr(err error) bool {
}

func (c *PubSubClient) buildRecord(event ConnectResponseEvent) sdk.Record {

// TODO - ADD something here to distinguish creates, deletes, updates.

return sdk.Util.Source.NewRecordCreate(
sdk.Position(event.ReplayID),
sdk.Metadata{},
c.sdkPos.ToSDKPosition(),
sdk.Metadata{
"opencdc.collection": event.Topic,
},
sdk.StructuredData{
"replayId": event.ReplayID,
"id": event.EventID,
Expand Down Expand Up @@ -417,23 +451,24 @@ func (c *PubSubClient) Subscribe(
ctx context.Context,
replayPreset proto.ReplayPreset,
replayID []byte,
topic string,
) (proto.PubSub_SubscribeClient, error) {
start := time.Now().UTC()

subscribeClient, err := c.pubSubClient.Subscribe(c.getAuthContext())
if err != nil {
return nil, fmt.Errorf("failed to subscribe to topic %q: %w", c.topicName, err)
return nil, fmt.Errorf("failed to subscribe to topic %q: %w", topic, err)
}

sdk.Logger(ctx).Debug().
Str("replay_id", base64.StdEncoding.EncodeToString(replayID)).
Str("replay_preset", proto.ReplayPreset_name[int32(replayPreset)]).
Str("topic_name", c.topicName).
Str("topic_name", topic).
Dur("elapsed", time.Since(start)).
Msgf("subscribed to %q", c.topicName)
Msgf("subscribed to %q", topic)

initialFetchRequest := &proto.FetchRequest{
TopicName: c.topicName,
TopicName: topic,
ReplayPreset: replayPreset,
NumRequested: 1,
}
Expand All @@ -453,41 +488,41 @@ func (c *PubSubClient) Subscribe(
sdk.Logger(ctx).Debug().
Str("replay_id", base64.StdEncoding.EncodeToString(replayID)).
Str("replay_preset", proto.ReplayPreset_name[int32(replayPreset)]).
Str("topic_name", c.topicName).
Str("topic_name", topic).
Dur("elapsed", time.Since(start)).
Msg("first request sent")

return subscribeClient, nil
}

func (c *PubSubClient) Recv(ctx context.Context) ([]ConnectResponseEvent, error) {
func (c *PubSubClient) Recv(ctx context.Context, topic string, replayID []byte) ([]ConnectResponseEvent, error) {
var (
replayID []byte
preset = c.replayPreset
start = time.Now().UTC()
preset = c.replayPreset
start = time.Now().UTC()
)

if len(c.currReplayID) > 0 {
if len(replayID) > 0 {
preset = proto.ReplayPreset_CUSTOM
replayID = c.currReplayID
}

sdk.Logger(ctx).Info().
Str("preset", preset.String()).
Str("replay_id", base64.StdEncoding.EncodeToString(replayID)).
Str("topic", topic).
Msg("preparing to subscribe")

subClient, err := c.Subscribe(ctx, preset, replayID)
subClient, err := c.Subscribe(ctx, preset, replayID, topic)
if err != nil {
return nil, fmt.Errorf("error subscribing to topic on custom replay id %q: %w",
base64.StdEncoding.EncodeToString(c.currReplayID),
base64.StdEncoding.EncodeToString(replayID),
err,
)
}

sdk.Logger(ctx).Info().
Str("preset", preset.String()).
Str("replay_id", base64.StdEncoding.EncodeToString(replayID)).
Str("topic", topic).
Dur("elapsed", time.Since(start)).
Msg("preparing to receive events")

Expand All @@ -500,6 +535,7 @@ func (c *PubSubClient) Recv(ctx context.Context) ([]ConnectResponseEvent, error)
sdk.Logger(ctx).Warn().
Str("preset", preset.String()).
Str("replay_id", base64.StdEncoding.EncodeToString(replayID)).
Str("topic", topic).
Dur("elapsed", time.Since(start)).
Err(err).
Msg("error while receiving events - retrying to connect")
Expand All @@ -511,6 +547,7 @@ func (c *PubSubClient) Recv(ctx context.Context) ([]ConnectResponseEvent, error)
sdk.Logger(ctx).Info().
Str("preset", preset.String()).
Str("replay_id", base64.StdEncoding.EncodeToString(replayID)).
Str("topic", topic).
Int("events", len(resp.Events)).
Dur("elapsed", time.Since(start)).
Msg("subscriber received events")
Expand All @@ -534,9 +571,11 @@ func (c *PubSubClient) Recv(ctx context.Context) ([]ConnectResponseEvent, error)
}

events = append(events, ConnectResponseEvent{
ReplayID: e.ReplayId,
EventID: e.Event.Id,
Data: flattenUnionFields(ctx, payload, c.unionFields[e.Event.SchemaId]),
ReplayID: e.ReplayId,
EventID: e.Event.Id,
Data: flattenUnionFields(ctx, payload, c.unionFields[e.Event.SchemaId]),
Topic: topic,
ReceivedAt: time.Now(),
})
}

Expand Down
6 changes: 3 additions & 3 deletions source_pubsub/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Config struct {
OAuthEndpoint string `json:"oauthEndpoint" validate:"required"`

// TopicName is the topic the source connector will subscribe to
TopicName string `json:"topicName" validate:"required"`
TopicNames []string `json:"topicNames" validate:"required"`

// Deprecated: Username is the client secret from the salesforce app.
Username string `json:"username"`
Expand Down Expand Up @@ -76,8 +76,8 @@ func (c Config) Validate() error {
}
}

if c.TopicName == "" {
errs = append(errs, fmt.Errorf("invalid topic name %q", c.TopicName))
if len(c.TopicNames) == 0 {
errs = append(errs, fmt.Errorf("invalid topic name %q", c.TopicNames))
}

if c.PollingPeriod == 0 {
Expand Down
4 changes: 2 additions & 2 deletions source_pubsub/paramgen_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading