-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Salesforce Destination Using Pubsub #146
base: refactor_pubsub_client
Are you sure you want to change the base?
Conversation
5da8a78
to
8a7b2ee
Compare
c326cf2
to
55fea9d
Compare
cmd/connector/main.go
Outdated
@@ -16,14 +16,16 @@ package main | |||
|
|||
import ( | |||
sf "github.com/conduitio-labs/conduit-connector-salesforce" | |||
sfDesinationPubsub "github.com/conduitio-labs/conduit-connector-salesforce/destination" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you don't need to override the imports.
sfDesinationPubsub "github.com/conduitio-labs/conduit-connector-salesforce/destination" | |
"github.com/conduitio-labs/conduit-connector-salesforce/destination" |
cmd/connector/main.go
Outdated
sdk "github.com/conduitio/conduit-connector-sdk" | ||
) | ||
|
||
func main() { | ||
sdk.Serve(sdk.Connector{ | ||
NewSpecification: sf.Specification, | ||
NewSource: sfSourcePubsub.NewSource, | ||
NewDestination: nil, | ||
NewDestination: sfDesinationPubsub.NewDestination, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NewDestination: sfDesinationPubsub.NewDestination, | |
NewDestination: destination.NewDestination, |
config.Config | ||
|
||
// Topic is Salesforce event or topic to write record | ||
TopicName string `json:"topicName"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TopicName string `json:"topicName"` | |
TopicNames string `json:"topicNames"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now we only allow one topic for destination, we can add multi event/topic like we did for source later
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can have the name topicNames
now, and only support one name, so we don't have to deprecate another parameter later.
pubsub/client.go
Outdated
for _, r := range records { | ||
data := r.Payload.After | ||
|
||
var dataMap map[string]interface{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't need to do this, r.Payload.After
needs to be opencdc.StructuredData
which is essentially map[string]any
and already serialized to go types. Doing this JSON unmarshal from bytes all kind of bad things can happen.
pubsub/mock_pub_sub_client_test.go
Outdated
@@ -0,0 +1,480 @@ | |||
// Code generated by mockery. DO NOT EDIT. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This mock and the subscribe mock in this folder are the same as the ones in pubsub/source, so likely you can remove the pubsub/source folder
func (c Config) Validate(_ context.Context) (Config, error) { | ||
var errs []error | ||
|
||
if len(c.TopicName) == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can add the "required" validation to the config instead
if err := sdk.Util.ParseConfig( | ||
ctx, | ||
cfg, | ||
&c, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can use &d.config
here instead, no need for c
@@ -100,25 +101,25 @@ func (a oauth) Login() (*LoginResponse, error) { | |||
strings.NewReader(body.Encode()), | |||
) | |||
if err != nil { | |||
return nil, fmt.Errorf("failed to make login req: %w", err) | |||
return nil, errors.Errorf("failed to make login req: %s", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might be missing something here, but why are we using %s
instead of wrapping the error with %w
?
events = append(events, &event) | ||
} | ||
|
||
publishRequest := eventbusv1.PublishRequest{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible that this method would publish some events and then fail, so some are published and some are not?
Description
Per customers request, we are adding a destination for single event publishing using pubsub api. The code is also refactor so that destination can also use pubsub client we utilized on source.
Fixes # (https://github.com/meroxa/mdpx/issues/2958)
Quick checks: