diff --git a/destination/destination.go b/destination/destination.go index 2f741b3..4b24fbd 100644 --- a/destination/destination.go +++ b/destination/destination.go @@ -17,7 +17,7 @@ var _ client = (*pubsub.PubSubClient)(nil) type client interface { Stop(context.Context) Close(context.Context) error - InitializeWrite(context.Context, string) error + Initialize(context.Context, []string) error Publish(context.Context, []opencdc.Record) error } @@ -64,7 +64,7 @@ func (d *Destination) Open(ctx context.Context) error { return fmt.Errorf("could not create GRPCClient: %w", err) } - if err := client.InitializeWrite(ctx, d.config.TopicName); err != nil { + if err := client.Initialize(ctx, []string{d.config.TopicName}); err != nil { return fmt.Errorf("could not initialize pubsub client: %w", err) } diff --git a/pubsub/client.go b/pubsub/client.go index 2422964..d869674 100644 --- a/pubsub/client.go +++ b/pubsub/client.go @@ -130,8 +130,9 @@ func NewGRPCClient(ctx context.Context, config config.Config) (*PubSubClient, er }, nil } -func (c *PubSubClient) InitializeWrite(ctx context.Context, topic string) error { - c.topicNames = append(c.topicNames, topic) +// Initializes the pubsub client by authenticating for source and destination. +func (c *PubSubClient) Initialize(ctx context.Context, topics []string) error { + c.topicNames = topics if err := c.login(ctx); err != nil { return err @@ -144,8 +145,8 @@ func (c *PubSubClient) InitializeWrite(ctx context.Context, topic string) error return nil } -// Initializes the pubsub client by authenticating and starting cdc routine. -func (c *PubSubClient) InitializeCDC(ctx context.Context, replay string, currentPos position.Topics, topics []string, fetch time.Duration) error { +// Start CDC Routine for Source. +func (c *PubSubClient) StartCDC(ctx context.Context, replay string, currentPos position.Topics, topics []string, fetch time.Duration) error { sdk.Logger(ctx).Info().Msgf("Initizalizing PubSub client for source cdc") if replay == "latest" { @@ -154,22 +155,12 @@ func (c *PubSubClient) InitializeCDC(ctx context.Context, replay string, current c.replayPreset = eventbusv1.ReplayPreset_EARLIEST } - // set topics on source + // set topics and position on source currentPos.SetTopics(topics) c.currentPos = currentPos - c.topicNames = topics - // set fetch for ticket c.fetchInterval = fetch - if err := c.login(ctx); err != nil { - return err - } - - if err := c.canAccessTopic(ctx, subscribe); err != nil { - return err - } - stopCtx, cancel := context.WithCancel(ctx) c.stop = cancel @@ -297,12 +288,16 @@ func (c *PubSubClient) GetSchema(schemaID string) (*eventbusv1.SchemaInfo, error func (c *PubSubClient) Publish(ctx context.Context, records []opencdc.Record) error { var events []*eventbusv1.ProducerEvent - //TODO - refactor the way we access the topic + // TODO - refactor the way we access the topic topic, err := c.GetTopic(c.topicNames[0]) if err != nil { return fmt.Errorf("error on publish, cannot retrieve topic %s : %s", c.topicNames[0], err) } + sdk.Logger(ctx).Info(). + Str("topic", topic.TopicName). + Msg("Started publishing event") + codec, err := c.fetchCodec(ctx, topic.SchemaId) if err != nil { return err @@ -316,7 +311,7 @@ func (c *PubSubClient) Publish(ctx context.Context, records []opencdc.Record) er return fmt.Errorf("error marshaling data: %s", err) } - avroPrepared, err := validateAndPreparePayload(dataMap, codec.Schema()) + avroPrepared, err := validateAndPreparePayload(dataMap, codec.Schema(), c.unionFields[topic.SchemaId]) if err != nil { return fmt.Errorf("error validating and preparing avro data:%s", err) } @@ -338,12 +333,21 @@ func (c *PubSubClient) Publish(ctx context.Context, records []opencdc.Record) er Events: events, } + sdk.Logger(ctx).Info(). + Str("topic", topic.TopicName). + Str("number of events", string(len(events))). + Msg("Events created, attempting to publish") + resp, err := c.pubSubClient.Publish(c.getAuthContext(), &publishRequest) if err != nil { return fmt.Errorf("error on publishing event: %s", err) } - fmt.Println("Publish request sent!") - fmt.Println(resp) + + sdk.Logger(ctx).Info(). + Str("topic", topic.TopicName). + Str("resp", resp.String()). + Str("resp rpc id", resp.RpcId). + Msg("Events published") return nil } @@ -484,25 +488,16 @@ func (c *PubSubClient) Recv(ctx context.Context, topic string, replayID []byte) Str("event_id", e.Event.Id). Msg("decoding event") - fmt.Println("Event") - fmt.Println(string(e.Event.Payload)) - codec, err := c.fetchCodec(ctx, e.Event.SchemaId) if err != nil { return events, err } - fmt.Println("Code fetched") - fmt.Println(codec) - parsed, _, err := codec.NativeFromBinary(e.Event.Payload) if err != nil { return events, err } - fmt.Println("Parsed event") - fmt.Println(parsed) - payload, ok := parsed.(map[string]interface{}) if !ok { return events, fmt.Errorf("invalid payload type %T", payload) @@ -773,9 +768,6 @@ func (c *PubSubClient) canAccessTopic(ctx context.Context, accessLevel string) e return fmt.Errorf("user %q not allowed to subscribe to %q", c.userID, resp.TopicName) } - fmt.Println("CAN PUBLISH?") - fmt.Println(resp.CanPublish) - if accessLevel == publish && !resp.CanPublish { return fmt.Errorf("user %q not allowed to publish to %q", c.userID, resp.TopicName) } diff --git a/pubsub/utils.go b/pubsub/utils.go index 4e5f383..40208ca 100644 --- a/pubsub/utils.go +++ b/pubsub/utils.go @@ -68,7 +68,7 @@ func invalidReplayIDErr(err error) bool { return strings.Contains(strings.ToLower(err.Error()), "replay id validation failed") } -func validateAndPreparePayload(dataMap map[string]interface{}, avroSchema string) (map[string]interface{}, error) { +func validateAndPreparePayload(dataMap map[string]interface{}, avroSchema string, unionFields map[string]struct{}) (map[string]interface{}, error) { var schema map[string]interface{} if err := json.Unmarshal([]byte(avroSchema), &schema); err != nil { return nil, err @@ -86,32 +86,21 @@ func validateAndPreparePayload(dataMap map[string]interface{}, avroSchema string avroRecord[fieldName] = nil continue } - if isUnionType(fieldType) { - if value == nil { - avroRecord[fieldName] = nil - } else { - avroRecord[fieldName] = map[string]interface{}{ - "string": value, + if _, ok := unionFields[fieldName]; ok && value != nil { + switch t := fieldType.(type) { + case []interface{}: + for _, unionType := range t { + if unionType.(string) != "null" { + avroRecord[fieldName] = map[string]interface{}{ + unionType.(string): value, + } + } } } } else { - switch fieldType.(type) { - case string: - avroRecord[fieldName] = value - case []interface{}: - avroRecord[fieldName] = value - default: - avroRecord[fieldName] = value - } + avroRecord[fieldName] = value } } return avroRecord, nil } - -func isUnionType(fieldType interface{}) bool { - if types, ok := fieldType.([]interface{}); ok { - return len(types) > 1 - } - return false -} diff --git a/source/source.go b/source/source.go index 24cfd1c..3f4c728 100644 --- a/source/source.go +++ b/source/source.go @@ -30,7 +30,8 @@ import ( type client interface { Next(context.Context) (opencdc.Record, error) - InitializeCDC(context.Context, string, position.Topics, []string, time.Duration) error + Initialize(context.Context, []string) error + StartCDC(context.Context, string, position.Topics, []string, time.Duration) error Stop(context.Context) Close(context.Context) error Wait(context.Context) error @@ -95,7 +96,11 @@ func (s *Source) Open(ctx context.Context, sdkPos opencdc.Position) error { return fmt.Errorf("could not create GRPCClient: %w", err) } - if err := client.InitializeCDC(ctx, s.config.ReplayPreset, parsedPositions, s.config.TopicNames, s.config.PollingPeriod); err != nil { + if err := client.Initialize(ctx, s.config.TopicNames); err != nil { + return fmt.Errorf("could not initialize pubsub client: %w", err) + } + + if err := client.StartCDC(ctx, s.config.ReplayPreset, parsedPositions, s.config.TopicNames, s.config.PollingPeriod); err != nil { return fmt.Errorf("could not initialize pubsub client: %w", err) }