Skip to content

Commit

Permalink
Multiple Topics Support on Source
Browse files Browse the repository at this point in the history
  • Loading branch information
anna-cross committed Jul 17, 2024
1 parent 3f129e0 commit ab49ca8
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 71 deletions.
163 changes: 101 additions & 62 deletions source_pubsub/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"time"

rt "github.com/avast/retry-go/v4"
"github.com/conduitio-labs/conduit-connector-salesforce/source_pubsub/position"
"github.com/conduitio-labs/conduit-connector-salesforce/source_pubsub/proto"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/linkedin/goavro/v2"
Expand Down Expand Up @@ -66,23 +67,26 @@ type PubSubClient struct {
unionFields map[string]map[string]struct{}

buffer chan sdk.Record
caches chan []ConnectResponseEvent
ticker *time.Ticker

currReplayID []byte
tomb *tomb.Tomb
topicName string
retryCount int
maxRetries int
tomb *tomb.Tomb
topicNames []string
sdkPos position.Topics
retryCount int
maxRetries int
}

// ConnectResponseEvent is a single event decoded from a Pub/Sub fetch
// response, tagged with the topic it was received on.
type ConnectResponseEvent struct {
	// Data is the decoded event payload after union fields are flattened.
	Data map[string]interface{}
	// EventID is the identifier carried by the event itself.
	EventID string
	// ReplayID is the replay ID delivered with the event; it is stored as the
	// topic's position so a later subscription can resume from it.
	ReplayID []byte
	// Topic is the name of the topic the event was fetched from.
	Topic string
	// ReceivedAt is the time at which the event was decoded by the client.
	ReceivedAt time.Time
}

// NewGRPCClient creates a new connection to the gRPC server and returns the wrapper struct.
func NewGRPCClient(ctx context.Context, config Config, sdkPos sdk.Position) (*PubSubClient, error) {
func NewGRPCClient(ctx context.Context, config Config, sdkPos position.Topics) (*PubSubClient, error) {
var transportCreds credentials.TransportCredentials
var replayPreset proto.ReplayPreset

Expand Down Expand Up @@ -118,16 +122,18 @@ func NewGRPCClient(ctx context.Context, config Config, sdkPos sdk.Position) (*Pu
replayPreset = proto.ReplayPreset_EARLIEST
}

sdkPos.SetTopics(config.TopicNames)

return &PubSubClient{
conn: conn,
pubSubClient: proto.NewPubSubClient(conn),
codecCache: make(map[string]*goavro.Codec),
unionFields: make(map[string]map[string]struct{}),
currReplayID: sdkPos,
replayPreset: replayPreset,
buffer: make(chan sdk.Record),
ticker: time.NewTicker(config.PollingPeriod),
topicName: config.TopicName,
topicNames: config.TopicNames,
sdkPos: sdkPos,
oauth: &oauth{Credentials: creds},
tomb: t,
retryCount: config.RetryCount,
Expand All @@ -145,10 +151,13 @@ func (c *PubSubClient) Initialize(ctx context.Context) error {
return err
}

c.tomb.Go(func() error {
ctx := c.tomb.Context(nil) //nolint:staticcheck // SA1012 tomb expects nil
return c.startCDC(ctx)
})
for _, topic := range c.topicNames {
c.tomb.Go(func() error {
ctx := c.tomb.Context(nil) //nolint:staticcheck // SA1012 tomb expects nil
c.startCDC(ctx, topic)
return nil
})
}

go func() {
<-c.tomb.Dead()
Expand Down Expand Up @@ -183,7 +192,7 @@ func (c *PubSubClient) login(ctx context.Context) error {
Str("instance_url", c.instanceURL).
Str("user_id", c.userID).
Str("org_id", c.orgID).
Str("topic", c.topicName).
Str("topic", strings.Join(c.topicNames, ",")).
Msg("successfully authenticated")

return nil
Expand All @@ -193,26 +202,28 @@ func (c *PubSubClient) login(ctx context.Context) error {
// canSubscribe checks every configured topic and returns an error as soon as
// one of them cannot be retrieved or the authenticated user is not allowed to
// subscribe to it. It returns nil when all topics pass (or none are
// configured). The incoming context is ignored; each lookup runs on the
// client's auth context bounded by GRPCCallTimeout.
func (c *PubSubClient) canSubscribe(_ context.Context) error {
	for _, topic := range c.topicNames {
		if err := c.canSubscribeToTopic(topic); err != nil {
			return err
		}
	}

	return nil
}

// canSubscribeToTopic performs the GetTopic lookup for a single topic. It is a
// separate method so the deferred cancel releases the timeout context after
// each topic, instead of piling up until the whole loop has finished.
func (c *PubSubClient) canSubscribeToTopic(topic string) error {
	var trailer metadata.MD

	ctx, cancel := context.WithTimeout(c.getAuthContext(), GRPCCallTimeout)
	defer cancel()

	req := &proto.TopicRequest{
		TopicName: topic,
	}

	resp, err := c.pubSubClient.GetTopic(ctx, req, grpc.Trailer(&trailer))
	if err != nil {
		return fmt.Errorf("failed to retrieve topic %q: %w", topic, err)
	}

	if !resp.CanSubscribe {
		return fmt.Errorf("user %q not allowed to subscribe to %q", c.userID, resp.TopicName)
	}

	sdk.Logger(ctx).Debug().
		Bool("can_subscribe", resp.CanSubscribe).
		Str("topic_name", resp.TopicName).
		Msgf("client allowed to subscribe to events on %q", topic)

	return nil
}
Expand All @@ -239,7 +250,9 @@ func (c *PubSubClient) Stop(ctx context.Context) {
_ = c.tomb.Killf("cdc iterator is stopping")

sdk.Logger(ctx).Debug().
Msgf("stopping pubsub client on topic %q", c.topicName)
Msgf("stopping pubsub client on topics %q", strings.Join(c.topicNames, ","))

c.topicNames = nil
}

func (c *PubSubClient) Wait(ctx context.Context) error {
Expand All @@ -262,10 +275,6 @@ func (c *PubSubClient) ResetRetryCount() {
c.retryCount = c.maxRetries
}

// ReplayID returns the replay ID currently held by the client (currReplayID).
func (c *PubSubClient) ReplayID() []byte {
return c.currReplayID
}

func (c *PubSubClient) retryAuth(ctx context.Context, retry bool) (bool, error) {
var err error
sdk.Logger(ctx).Info().Msgf("retry connection - retries remaining %d ", c.retryCount)
Expand All @@ -291,8 +300,11 @@ func (c *PubSubClient) retryAuth(ctx context.Context, retry bool) (bool, error)
return retry, nil
}

func (c *PubSubClient) startCDC(ctx context.Context) error {
sdk.Logger(ctx).Info().Msg("starting CDC processing..")
func (c *PubSubClient) startCDC(ctx context.Context, topic string) error {
sdk.Logger(ctx).Info().
Str("topic", topic).
Str("replayID", string(c.sdkPos.GetTopicReplayID(topic))).
Msg("starting CDC processing..")

var (
retry bool
Expand All @@ -318,22 +330,33 @@ func (c *PubSubClient) startCDC(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()
case <-c.ticker.C: // detect changes every polling period.

replayID := c.sdkPos.GetTopicReplayID(topic)

sdk.Logger(ctx).Debug().
Dur("elapsed", time.Since(lastRecvdAt)).
Str("topic", topic).
Str("replayID", base64.StdEncoding.EncodeToString(replayID)).
Msg("attempting to receive new events")

lastRecvdAt = time.Now().UTC()

events, err := c.Recv(ctx)
events, err := c.Recv(ctx, topic, replayID)
if err != nil {
if c.invalidReplayIDErr(err) {
sdk.Logger(ctx).Error().Err(err).Msgf("replay id %s is invalid, retrying", string(c.currReplayID))
c.currReplayID = nil
sdk.Logger(ctx).Error().Err(err).
Str("topic", topic).
Str("replayID", string(replayID)).
Msgf("replay id %s is invalid, retrying", string(replayID))
c.sdkPos.SetTopicReplayID(topic, nil)
break
}

if c.retryCount > 0 {
sdk.Logger(ctx).Error().Err(err).Msg("retrying authentication")
sdk.Logger(ctx).Error().Err(err).
Str("topic", topic).
Str("replayID", string(replayID)).
Msg("retrying authentication")
retry = true
break
}
Expand All @@ -344,11 +367,18 @@ func (c *PubSubClient) startCDC(ctx context.Context) error {
sdk.Logger(ctx).Debug().
Int("events", len(events)).
Dur("elapsed", time.Since(lastRecvdAt)).
Str("topic", topic).
Msg("received events")

for _, e := range events {
c.sdkPos.SetTopicReplayID(e.Topic, e.ReplayID)
c.buffer <- c.buildRecord(e)
c.currReplayID = e.ReplayID
sdk.Logger(ctx).Debug().
Int("events", len(events)).
Dur("elapsed", time.Since(lastRecvdAt)).
Str("topic", e.Topic).
Str("replayID", base64.StdEncoding.EncodeToString(e.ReplayID)).
Msg("record sent to buffer")
}
}
}
Expand All @@ -365,10 +395,14 @@ func (*PubSubClient) connErr(err error) bool {
}

func (c *PubSubClient) buildRecord(event ConnectResponseEvent) sdk.Record {

// TODO: distinguish creates, updates, and deletes; every event is currently emitted as a create record.

return sdk.Util.Source.NewRecordCreate(
sdk.Position(event.ReplayID),
sdk.Metadata{},
c.sdkPos.ToSDKPosition(),
sdk.Metadata{
"opencdc.collection": event.Topic,
},
sdk.StructuredData{
"replayId": event.ReplayID,
"id": event.EventID,
Expand Down Expand Up @@ -417,23 +451,24 @@ func (c *PubSubClient) Subscribe(
ctx context.Context,
replayPreset proto.ReplayPreset,
replayID []byte,
topic string,
) (proto.PubSub_SubscribeClient, error) {
start := time.Now().UTC()

subscribeClient, err := c.pubSubClient.Subscribe(c.getAuthContext())
if err != nil {
return nil, fmt.Errorf("failed to subscribe to topic %q: %w", c.topicName, err)
return nil, fmt.Errorf("failed to subscribe to topic %q: %w", topic, err)
}

sdk.Logger(ctx).Debug().
Str("replay_id", base64.StdEncoding.EncodeToString(replayID)).
Str("replay_preset", proto.ReplayPreset_name[int32(replayPreset)]).
Str("topic_name", c.topicName).
Str("topic_name", topic).
Dur("elapsed", time.Since(start)).
Msgf("subscribed to %q", c.topicName)
Msgf("subscribed to %q", topic)

initialFetchRequest := &proto.FetchRequest{
TopicName: c.topicName,
TopicName: topic,
ReplayPreset: replayPreset,
NumRequested: 1,
}
Expand All @@ -453,41 +488,41 @@ func (c *PubSubClient) Subscribe(
sdk.Logger(ctx).Debug().
Str("replay_id", base64.StdEncoding.EncodeToString(replayID)).
Str("replay_preset", proto.ReplayPreset_name[int32(replayPreset)]).
Str("topic_name", c.topicName).
Str("topic_name", topic).
Dur("elapsed", time.Since(start)).
Msg("first request sent")

return subscribeClient, nil
}

func (c *PubSubClient) Recv(ctx context.Context) ([]ConnectResponseEvent, error) {
func (c *PubSubClient) Recv(ctx context.Context, topic string, replayID []byte) ([]ConnectResponseEvent, error) {
var (
replayID []byte
preset = c.replayPreset
start = time.Now().UTC()
preset = c.replayPreset
start = time.Now().UTC()
)

if len(c.currReplayID) > 0 {
if len(replayID) > 0 {
preset = proto.ReplayPreset_CUSTOM
replayID = c.currReplayID
}

sdk.Logger(ctx).Info().
Str("preset", preset.String()).
Str("replay_id", base64.StdEncoding.EncodeToString(replayID)).
Str("topic", topic).
Msg("preparing to subscribe")

subClient, err := c.Subscribe(ctx, preset, replayID)
subClient, err := c.Subscribe(ctx, preset, replayID, topic)
if err != nil {
return nil, fmt.Errorf("error subscribing to topic on custom replay id %q: %w",
base64.StdEncoding.EncodeToString(c.currReplayID),
base64.StdEncoding.EncodeToString(replayID),
err,
)
}

sdk.Logger(ctx).Info().
Str("preset", preset.String()).
Str("replay_id", base64.StdEncoding.EncodeToString(replayID)).
Str("topic", topic).
Dur("elapsed", time.Since(start)).
Msg("preparing to receive events")

Expand All @@ -500,6 +535,7 @@ func (c *PubSubClient) Recv(ctx context.Context) ([]ConnectResponseEvent, error)
sdk.Logger(ctx).Warn().
Str("preset", preset.String()).
Str("replay_id", base64.StdEncoding.EncodeToString(replayID)).
Str("topic", topic).
Dur("elapsed", time.Since(start)).
Err(err).
Msg("error while receiving events - retrying to connect")
Expand All @@ -511,6 +547,7 @@ func (c *PubSubClient) Recv(ctx context.Context) ([]ConnectResponseEvent, error)
sdk.Logger(ctx).Info().
Str("preset", preset.String()).
Str("replay_id", base64.StdEncoding.EncodeToString(replayID)).
Str("topic", topic).
Int("events", len(resp.Events)).
Dur("elapsed", time.Since(start)).
Msg("subscriber received events")
Expand All @@ -534,9 +571,11 @@ func (c *PubSubClient) Recv(ctx context.Context) ([]ConnectResponseEvent, error)
}

events = append(events, ConnectResponseEvent{
ReplayID: e.ReplayId,
EventID: e.Event.Id,
Data: flattenUnionFields(ctx, payload, c.unionFields[e.Event.SchemaId]),
ReplayID: e.ReplayId,
EventID: e.Event.Id,
Data: flattenUnionFields(ctx, payload, c.unionFields[e.Event.SchemaId]),
Topic: topic,
ReceivedAt: time.Now(),
})
}

Expand Down
6 changes: 3 additions & 3 deletions source_pubsub/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Config struct {
OAuthEndpoint string `json:"oauthEndpoint" validate:"required"`

// TopicNames are the topics the source connector will subscribe to
TopicName string `json:"topicName" validate:"required"`
TopicNames []string `json:"topicNames" validate:"required"`

// Deprecated: Username is the client secret from the salesforce app.
Username string `json:"username"`
Expand Down Expand Up @@ -76,8 +76,8 @@ func (c Config) Validate() error {
}
}

if c.TopicName == "" {
errs = append(errs, fmt.Errorf("invalid topic name %q", c.TopicName))
if len(c.TopicNames) == 0 {
errs = append(errs, fmt.Errorf("invalid topic name %q", c.TopicNames))
}

if c.PollingPeriod == 0 {
Expand Down
4 changes: 2 additions & 2 deletions source_pubsub/paramgen_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ab49ca8

Please sign in to comment.