Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source multiple topics #120

Merged
merged 13 commits into from
Sep 10, 2024
Merged

Source multiple topics #120

merged 13 commits into from
Sep 10, 2024

Conversation

anna-cross
Copy link
Contributor

@anna-cross anna-cross commented Jul 17, 2024

Description

We want to add a new connector feature on salesforce source, allowing client to subscribe to multiple topics at once.
The topics subscribe and pull data parallel to each other, but push to same destination. We don't enforce data structure on the source topics, so the destination should be able to accept data with different record structure (this works for redpanda destination but will not work for something like snowflake destination)

Fixes # (issue)

https://github.com/meroxa/mdpx/issues/2515

Quick checks:

  • I have followed the Code Guidelines.
  • There is no other pull request for the same update/change.
  • I have written unit tests.
  • I have made sure that the PR is of reasonable size and can be easily reviewed.

@anna-cross anna-cross changed the base branch from main to new-dest-new-source July 17, 2024 20:01
Comment on lines 157 to 158
c.startCDC(ctx, topic)
return nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@anna-cross is it intended to not return the result of c.startCDC()? if so, what's the context?

My understanding is that this is anyways within the func() passed in above, so it shouldn't break the for loop

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Woops yes! I'll move it back into return

@@ -45,7 +45,7 @@ func Test_Read(t *testing.T) {
ClientID: "test-client-id",
ClientSecret: "test-client-secret",
OAuthEndpoint: "https://somewhere",
TopicName: "/events/TestEvent__e",
TopicNames: []string{"/events/TestEvent__e"},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice. would be great to also add a test case here with 2 topics - thought, you might already be on this since it's in draft mode

@anna-cross anna-cross marked this pull request as ready for review July 18, 2024 18:32
Comment on lines 106 to 107
// setting topic name as collection
r.Metadata.SetCollection(s.config.TopicName)
// r.Metadata.SetCollection(s.config.TopicName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we remove this?

source_pubsub/client.go Outdated Show resolved Hide resolved
topicEvent := p.Topics[topic]
topicEvent.ReplayID = replayID
topicEvent.ReadTime = time.Now()
p.Topics[topic] = topicEvent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
p.Topics[topic] = topicEvent
p.Topics[topic] = TopicPosition{
ReplayID: replayID,
ReadTime: time.Now().UTC(),
}

Is the equivalent.

p.Topics[topic] = topicEvent
} else {
// should never be even reaching this point, something went wrong if we do
panic(fmt.Errorf("attempting to set replay id - %b on topic %s, topic doesn't exist on position", replayID, topic))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return an error here, panic is reserved for some rare cases.

source_pubsub/source.go Outdated Show resolved Hide resolved
if err != nil {
// if using old config and the position isnt parsable
// assume its in the old position format
if len(s.config.TopicName) > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the alternative is if s.config.TopicName == "" { return err.. }

Str("topics", strings.Join(s.config.TopicNames, ",")).
Msg("Open Source Connector")

parsedPositions, err = position.ParseSDKPosition(sdkPos)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you push this logic in ParseSDKPosition such that position.ParseSDKPosition(sdkPos, s.config.TopicNames) does the conversion internally.

source_pubsub/config.go Outdated Show resolved Hide resolved
source_pubsub/config.go Outdated Show resolved Hide resolved
source_pubsub/client.go Outdated Show resolved Hide resolved
source_pubsub/client.go Outdated Show resolved Hide resolved
@@ -78,18 +78,10 @@ func (s *Source) Open(ctx context.Context, sdkPos sdk.Position) error {
Str("topics", strings.Join(s.config.TopicNames, ",")).
Msg("Open Source Connector")

parsedPositions, err = position.ParseSDKPosition(sdkPos)
parsedPositions, err = position.ParseSDKPosition(sdkPos, s.config.TopicName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to pass all the topics here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only need to pass the old config to process the old config scenario, if multiple topics are being used we parse as normal

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@anna-cross Oh right, but what if you have new config with old position?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean if they are updating the config to use new one? I dont believe we allow that at the moment, if a person is using old config and the pipeline is deployed they cant switch over to new one

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'm referring to this in general. If you get the new config with an old position this may not work as expected?

The position which is passed to Open(..) is the last position committed to Conduit, if you start the connector and the position is upgraded but not committed, next time it will come up with the same position. While this may not happen with mdpx right now, it can turn into an edge case later is what I'm saying ;-)

source_pubsub/client.go Outdated Show resolved Hide resolved
source_pubsub/client.go Outdated Show resolved Hide resolved
@@ -65,24 +66,37 @@ type PubSubClient struct {
codecCache map[string]*goavro.Codec
unionFields map[string]map[string]struct{}

buffer chan sdk.Record
buffer chan ConnectResponseEvent
ticker *time.Ticker
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If each topic gets its own consumer, then each will need its own ticker. Since each ticker provides a channel and the channel is drained once per tick, it is unclear how many of the CDC consumers will actually execute a fetch.

@lyuboxa lyuboxa merged commit 362432a into new-dest-new-source Sep 10, 2024
2 checks passed
@lyuboxa lyuboxa deleted the source_multiple_topics branch September 10, 2024 00:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants