Skip to content

Commit

Permalink
Change fmt.errors to errors, modify validate on dest
Browse files Browse the repository at this point in the history
  • Loading branch information
anna-cross committed Oct 23, 2024
1 parent ac17f17 commit c75330f
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 87 deletions.
8 changes: 4 additions & 4 deletions cmd/connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ package main

import (
sf "github.com/conduitio-labs/conduit-connector-salesforce"
sfDesinationPubsub "github.com/conduitio-labs/conduit-connector-salesforce/destination"
sfSourcePubsub "github.com/conduitio-labs/conduit-connector-salesforce/source"
"github.com/conduitio-labs/conduit-connector-salesforce/destination"
"github.com/conduitio-labs/conduit-connector-salesforce/source"

sdk "github.com/conduitio/conduit-connector-sdk"
)

func main() {
sdk.Serve(sdk.Connector{
NewSpecification: sf.Specification,
NewSource: sfSourcePubsub.NewSource,
NewDestination: sfDesinationPubsub.NewDestination,
NewSource: source.NewSource,
NewDestination: destination.NewDestination,
})
}
14 changes: 7 additions & 7 deletions destination/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package destination

import (
"context"
"fmt"

pubsub "github.com/conduitio-labs/conduit-connector-salesforce/pubsub"
"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/pkg/errors"
)

var _ client = (*pubsub.Client)(nil)
Expand Down Expand Up @@ -42,12 +42,12 @@ func (d *Destination) Configure(ctx context.Context, cfg config.Config) error {
&c,
NewDestination().Parameters(),
); err != nil {
return fmt.Errorf("failed to parse config: %w", err)
return errors.Errorf("failed to parse config: %s", err)
}

c, err := c.Validate(ctx)
if err != nil {
return fmt.Errorf("config failed to validate: %w", err)
return errors.Errorf("config failed to validate: %s", err)
}

d.config = c
Expand All @@ -59,11 +59,11 @@ func (d *Destination) Open(ctx context.Context) error {

client, err := pubsub.NewGRPCClient(ctx, d.config.Config)
if err != nil {
return fmt.Errorf("could not create GRPCClient: %w", err)
return errors.Errorf("could not create GRPCClient: %s", err)
}

if err := client.Initialize(ctx, []string{d.config.TopicName}); err != nil {
return fmt.Errorf("could not initialize pubsub client: %w", err)
return errors.Errorf("could not initialize pubsub client: %s", err)
}

d.client = client
Expand All @@ -78,7 +78,7 @@ func (d *Destination) Open(ctx context.Context) error {

func (d *Destination) Write(ctx context.Context, rr []opencdc.Record) (int, error) {
if err := d.client.Publish(ctx, rr); err != nil {
return 0, fmt.Errorf("failed to publish records : %w", err)
return 0, errors.Errorf("failed to publish records : %s", err)
}

return len(rr), nil
Expand All @@ -92,7 +92,7 @@ func (d *Destination) Teardown(ctx context.Context) error {
d.client.Stop(ctx)

if err := d.client.Close(ctx); err != nil {
return fmt.Errorf("error when closing subscriber conn: %w", err)
return errors.Errorf("error when closing subscriber conn: %s", err)
}

return nil
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/sagikazarmark/locafero v0.6.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ github.com/otiai10/mint v1.3.0/go.mod h1:F5AjcsTsWUqX+Na9fpHb52P8pcRX2CI6A3ctIT9
github.com/otiai10/mint v1.3.1/go.mod h1:/yxELlJQ0ufhjUwhshSj+wFjZ78CnZ48/1wtmBH1OTc=
github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M=
github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
Expand Down
80 changes: 38 additions & 42 deletions pubsub/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@ package pubsub
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"sync"
"time"

"github.com/pkg/errors"

rt "github.com/avast/retry-go/v4"
config "github.com/conduitio-labs/conduit-connector-salesforce/config"
eventbusv1 "github.com/conduitio-labs/conduit-connector-salesforce/proto/eventbus/v1"
Expand Down Expand Up @@ -110,7 +109,7 @@ func NewGRPCClient(ctx context.Context, config config.Config) (*Client, error) {

conn, err := grpc.NewClient(config.PubsubAddress, dialOpts...)
if err != nil {
return nil, fmt.Errorf("gRPC dial: %w", err)
return nil, errors.Errorf("gRPC dial: %s", err)
}

creds, err := NewCredentials(config.ClientID, config.ClientSecret, config.OAuthEndpoint)
Expand All @@ -125,7 +124,7 @@ func NewGRPCClient(ctx context.Context, config config.Config) (*Client, error) {
unionFields: make(map[string]map[string]struct{}),
replayPreset: replayPreset,
buffer: make(chan ConnectResponseEvent),
oauth: &OAuth{Credentials: creds},
oauth: &oauth{Credentials: creds},
maxRetries: config.RetryCount,
}, nil
}
Expand Down Expand Up @@ -191,11 +190,11 @@ func (c *Client) StartCDC(ctx context.Context, replay string, currentPos positio
func (c *Client) Next(ctx context.Context) (opencdc.Record, error) {
select {
case <-ctx.Done():
return opencdc.Record{}, fmt.Errorf("next: context done: %w", ctx.Err())
return opencdc.Record{}, errors.Errorf("next: context done: %s", ctx.Err())
case event, ok := <-c.buffer:
if !ok {
if err := c.tomb.Err(); err != nil {
return opencdc.Record{}, fmt.Errorf("tomb exited: %w", err)
return opencdc.Record{}, errors.Errorf("tomb exited: %s", err)
}
return opencdc.Record{}, ErrEndOfRecords
}
Expand Down Expand Up @@ -256,12 +255,12 @@ func (c *Client) GetTopic(topic string) (*eventbusv1.TopicInfo, error) {

resp, err := c.pubSubClient.GetTopic(ctx, req, grpc.Trailer(&trailer))
if err != nil {
return nil, fmt.Errorf("failed to retrieve topic %q: %w", topic, err)
return nil, errors.Errorf("failed to retrieve topic %q: %s", topic, err)
}
return resp, nil
}

// Wrapper function around the GetSchema RPC. This will add the OAuth credentials and make a call to fetch data about a specific schema.
// Wrapper function around the GetSchema RPC. This will add the oauth credentials and make a call to fetch data about a specific schema.
func (c *Client) GetSchema(schemaID string) (*eventbusv1.SchemaInfo, error) {
var trailer metadata.MD

Expand All @@ -280,7 +279,7 @@ func (c *Client) GetSchema(schemaID string) (*eventbusv1.SchemaInfo, error) {
Fields(trailer).
Msg("retrieved schema")
if err != nil {
return nil, fmt.Errorf("error getting schema from salesforce api - %s", err)
return nil, errors.Errorf("error getting schema from salesforce api - %s", err)
}

return resp, nil
Expand All @@ -291,7 +290,7 @@ func (c *Client) Publish(ctx context.Context, records []opencdc.Record) error {
// TODO - refactor the way we access the topic
topic, err := c.GetTopic(c.topicNames[0])
if err != nil {
return fmt.Errorf("error on publish, cannot retrieve topic %s : %s", c.topicNames[0], err)
return errors.Errorf("error on publish, cannot retrieve topic %s : %s", c.topicNames[0], err)
}

sdk.Logger(ctx).Info().
Expand All @@ -304,21 +303,18 @@ func (c *Client) Publish(ctx context.Context, records []opencdc.Record) error {
}

for _, r := range records {
data := r.Payload.After

var dataMap map[string]interface{}
if err := json.Unmarshal(data.Bytes(), &dataMap); err != nil {
return fmt.Errorf("error marshaling data: %s", err)
data, err := extractPayload(r.Operation, r.Payload)
if err != nil {
return errors.Errorf("failed to extract payload data: %s", err)
}

avroPrepared, err := validateAndPreparePayload(dataMap, codec.Schema())
avroPrepared, err := validateAndPreparePayload(data, codec.Schema())
if err != nil {
return fmt.Errorf("error validating and preparing avro data:%s", err)
return errors.Errorf("error validating and preparing avro data:%s", err)
}

avroEncoded, err := codec.BinaryFromNative(nil, avroPrepared)
if err != nil {
return fmt.Errorf("error encoding data to avro: %s", err)
return errors.Errorf("error encoding data to avro: %s", err)
}

event := eventbusv1.ProducerEvent{
Expand All @@ -340,7 +336,7 @@ func (c *Client) Publish(ctx context.Context, records []opencdc.Record) error {

resp, err := c.pubSubClient.Publish(c.getAuthContext(), &publishRequest)
if err != nil {
return fmt.Errorf("error on publishing event: %s", err)
return errors.Errorf("error on publishing event: %s", err)
}

sdk.Logger(ctx).Info().
Expand All @@ -352,7 +348,7 @@ func (c *Client) Publish(ctx context.Context, records []opencdc.Record) error {
return nil
}

// Wrapper function around the Subscribe RPC. This will add the OAuth credentials and create a separate streaming client that will be used to,
// Wrapper function around the Subscribe RPC. This will add the oauth credentials and create a separate streaming client that will be used to,
// fetch data from the topic. This method will continuously consume messages unless an error occurs; if an error does occur then this method will,
// return the last successfully consumed replayID as well as the error message. If no messages were successfully consumed then this method will return,
// the same replayID that it originally received as a parameter.
Expand All @@ -370,7 +366,7 @@ func (c *Client) Subscribe(
Str("topic", topic).
Str("replayID", string(replayID)).
Msg("failed to subscribe to topic")
return nil, fmt.Errorf("failed to subscribe to topic %q: %w", topic, err)
return nil, errors.Errorf("failed to subscribe to topic %q: %s", topic, err)
}

sdk.Logger(ctx).Debug().
Expand All @@ -392,10 +388,10 @@ func (c *Client) Subscribe(

if err := subscribeClient.Send(initialFetchRequest); err != nil {
if errors.Is(err, io.EOF) {
return nil, fmt.Errorf("received EOF on initial fetch: %w", err)
return nil, errors.Errorf("received EOF on initial fetch: %s", err)
}

return nil, fmt.Errorf("initial fetch request failed: %w", err)
return nil, errors.Errorf("initial fetch request failed: %s", err)
}

sdk.Logger(ctx).Debug().
Expand Down Expand Up @@ -428,7 +424,7 @@ func (c *Client) Recv(ctx context.Context, topic string, replayID []byte) ([]Con

subClient, err := c.Subscribe(ctx, preset, replayID, topic)
if err != nil {
return nil, fmt.Errorf("error subscribing to topic on custom replay id %q: %w",
return nil, errors.Errorf("error subscribing to topic on custom replay id %q: %s",
base64.StdEncoding.EncodeToString(replayID),
err,
)
Expand All @@ -451,7 +447,7 @@ func (c *Client) Recv(ctx context.Context, topic string, replayID []byte) ([]Con
logger.Error().Fields(subClient.Trailer()).Msg("recv error")

if errors.Is(err, io.EOF) {
return nil, fmt.Errorf("pubsub: stream closed when receiving events: %w", err)
return nil, errors.Errorf("pubsub: stream closed when receiving events: %s", err)
}
if connErr(err) {
logger.Warn().
Expand Down Expand Up @@ -500,7 +496,7 @@ func (c *Client) Recv(ctx context.Context, topic string, replayID []byte) ([]Con

payload, ok := parsed.(map[string]interface{})
if !ok {
return events, fmt.Errorf("invalid payload type %T", payload)
return events, errors.Errorf("invalid payload type %T", payload)
}

logger.Trace().Fields(payload).Msg("decoded event")
Expand Down Expand Up @@ -533,7 +529,7 @@ func (c *Client) fetchCodec(ctx context.Context, schemaID string) (*goavro.Codec

schema, err := c.GetSchema(schemaID)
if err != nil {
return nil, fmt.Errorf("error making getschema request for uncached schema - %s", err)
return nil, errors.Errorf("error making getschema request for uncached schema - %s", err)
}

schemaJSON := schema.GetSchemaJson()
Expand All @@ -542,14 +538,14 @@ func (c *Client) fetchCodec(ctx context.Context, schemaID string) (*goavro.Codec

codec, err = goavro.NewCodec(schemaJSON)
if err != nil {
return nil, fmt.Errorf("error creating codec from uncached schema - %s", err)
return nil, errors.Errorf("error creating codec from uncached schema - %s", err)
}

c.codecCache[schemaID] = codec

unionFields, err := parseUnionFields(ctx, schemaJSON)
if err != nil {
return nil, fmt.Errorf("error parsing union fields from schema: %w", err)
return nil, errors.Errorf("error parsing union fields from schema: %s", err)
}

c.unionFields[schemaID] = unionFields
Expand Down Expand Up @@ -580,19 +576,19 @@ func (c *Client) retryAuth(ctx context.Context, retry bool, topic Topic) (bool,
topic.retryCount--

if err = c.login(ctx); err != nil && topic.retryCount <= 0 {
return retry, topic, fmt.Errorf("failed to refresh auth: %w", err)
return retry, topic, errors.Errorf("failed to refresh auth: %s", err)
} else if err != nil {
sdk.Logger(ctx).Info().Msgf("received error on login for topic %s - retry - %d : %v ", topic.topicName, topic.retryCount, err)
retry = true
return retry, topic, fmt.Errorf("received error on subscribe for topic %s - retry - %d : %w", topic.topicName, topic.retryCount, err)
return retry, topic, errors.Errorf("received error on subscribe for topic %s - retry - %d : %s", topic.topicName, topic.retryCount, err)
}

if err := c.canAccessTopic(ctx, subscribe); err != nil && topic.retryCount <= 0 {
return retry, topic, fmt.Errorf("failed to subscribe to client topic %s: %w", topic.topicName, err)
return retry, topic, errors.Errorf("failed to subscribe to client topic %s: %s", topic.topicName, err)
} else if err != nil {
sdk.Logger(ctx).Info().Msgf("received error on subscribe for topic %s - retry - %d : %v", topic.topicName, topic.retryCount, err)
retry = true
return retry, topic, fmt.Errorf("received error on subscribe for topic %s - retry - %d : %w ", topic.topicName, topic.retryCount, err)
return retry, topic, errors.Errorf("received error on subscribe for topic %s - retry - %d : %s ", topic.topicName, topic.retryCount, err)
}

retry = false
Expand Down Expand Up @@ -632,7 +628,7 @@ func (c *Client) startCDC(ctx context.Context, topic Topic) error {
rt.Attempts(topic.retryCount),
)
if err != nil {
return fmt.Errorf("error retrying (number of retries %d) for topic %s auth - %s", topic.retryCount, topic.topicName, err)
return errors.Errorf("error retrying (number of retries %d) for topic %s auth - %s", topic.retryCount, topic.topicName, err)
}
// once we are done with retries, reset the count
topic.retryCount = c.maxRetries
Expand Down Expand Up @@ -670,7 +666,7 @@ func (c *Client) startCDC(ctx context.Context, topic Topic) error {
break
}

return fmt.Errorf("error recv events: %w", err)
return errors.Errorf("error recv events: %s", err)
}

if len(events) == 0 {
Expand Down Expand Up @@ -701,7 +697,7 @@ func (c *Client) buildRecord(event ConnectResponseEvent) (opencdc.Record, error)
// TODO - ADD something here to distinguish creates, deletes, updates.
err := c.currentPos.SetTopicReplayID(event.Topic, event.ReplayID)
if err != nil {
return opencdc.Record{}, fmt.Errorf("err setting replay id %s on an event for topic %s : %w", event.ReplayID, event.Topic, err)
return opencdc.Record{}, errors.Errorf("err setting replay id %s on an event for topic %s : %s", event.ReplayID, event.Topic, err)
}

sdk.Logger(context.Background()).Debug().
Expand Down Expand Up @@ -753,22 +749,22 @@ func (c *Client) login(ctx context.Context) error {
return nil
}

// Wrapper function around the GetTopic RPC. This will add the OAuth credentials and make a call to fetch data about a specific topic.
// Wrapper function around the GetTopic RPC. This will add the oauth credentials and make a call to fetch data about a specific topic.
func (c *Client) canAccessTopic(ctx context.Context, accessLevel string) error {
logger := sdk.Logger(ctx).With().Str("at", "client.canAccessTopic").Logger()

for _, topic := range c.topicNames {
resp, err := c.GetTopic(topic)
if err != nil {
return fmt.Errorf(" error retrieving info on topic %s : %s ", topic, err)
return errors.Errorf(" error retrieving info on topic %s : %s ", topic, err)
}

if accessLevel == subscribe && !resp.CanSubscribe {
return fmt.Errorf("user %q not allowed to subscribe to %q", c.userID, resp.TopicName)
return errors.Errorf("user %q not allowed to subscribe to %q", c.userID, resp.TopicName)
}

if accessLevel == publish && !resp.CanPublish {
return fmt.Errorf("user %q not allowed to publish to %q", c.userID, resp.TopicName)
return errors.Errorf("user %q not allowed to publish to %q", c.userID, resp.TopicName)
}

logger.Debug().
Expand Down
Loading

0 comments on commit c75330f

Please sign in to comment.