Skip to content

Commit

Permalink
Cleanup code
Browse files Browse the repository at this point in the history
  • Loading branch information
anna-cross committed Oct 22, 2024
1 parent 8a7b2ee commit 55fea9d
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 57 deletions.
4 changes: 2 additions & 2 deletions destination/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var _ client = (*pubsub.PubSubClient)(nil)
type client interface {
Stop(context.Context)
Close(context.Context) error
InitializeWrite(context.Context, string) error
Initialize(context.Context, []string) error
Publish(context.Context, []opencdc.Record) error
}

Expand Down Expand Up @@ -64,7 +64,7 @@ func (d *Destination) Open(ctx context.Context) error {
return fmt.Errorf("could not create GRPCClient: %w", err)
}

if err := client.InitializeWrite(ctx, d.config.TopicName); err != nil {
if err := client.Initialize(ctx, []string{d.config.TopicName}); err != nil {
return fmt.Errorf("could not initialize pubsub client: %w", err)
}

Expand Down
54 changes: 23 additions & 31 deletions pubsub/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ func NewGRPCClient(ctx context.Context, config config.Config) (*PubSubClient, er
}, nil
}

func (c *PubSubClient) InitializeWrite(ctx context.Context, topic string) error {
c.topicNames = append(c.topicNames, topic)
// Initializes the pubsub client by authenticating for source and destination.
func (c *PubSubClient) Initialize(ctx context.Context, topics []string) error {
c.topicNames = topics

if err := c.login(ctx); err != nil {
return err
Expand All @@ -144,8 +145,8 @@ func (c *PubSubClient) InitializeWrite(ctx context.Context, topic string) error
return nil
}

// Initializes the pubsub client by authenticating and starting cdc routine.
func (c *PubSubClient) InitializeCDC(ctx context.Context, replay string, currentPos position.Topics, topics []string, fetch time.Duration) error {
// Start CDC Routine for Source.
func (c *PubSubClient) StartCDC(ctx context.Context, replay string, currentPos position.Topics, topics []string, fetch time.Duration) error {
sdk.Logger(ctx).Info().Msgf("Initizalizing PubSub client for source cdc")

if replay == "latest" {
Expand All @@ -154,22 +155,12 @@ func (c *PubSubClient) InitializeCDC(ctx context.Context, replay string, current
c.replayPreset = eventbusv1.ReplayPreset_EARLIEST
}

// set topics on source
// set topics and position on source
currentPos.SetTopics(topics)
c.currentPos = currentPos
c.topicNames = topics

// set fetch for ticket
c.fetchInterval = fetch

if err := c.login(ctx); err != nil {
return err
}

if err := c.canAccessTopic(ctx, subscribe); err != nil {
return err
}

stopCtx, cancel := context.WithCancel(ctx)
c.stop = cancel

Expand Down Expand Up @@ -297,12 +288,16 @@ func (c *PubSubClient) GetSchema(schemaID string) (*eventbusv1.SchemaInfo, error

func (c *PubSubClient) Publish(ctx context.Context, records []opencdc.Record) error {
var events []*eventbusv1.ProducerEvent
//TODO - refactor the way we access the topic
// TODO - refactor the way we access the topic
topic, err := c.GetTopic(c.topicNames[0])
if err != nil {
return fmt.Errorf("error on publish, cannot retrieve topic %s : %s", c.topicNames[0], err)
}

sdk.Logger(ctx).Info().
Str("topic", topic.TopicName).
Msg("Started publishing event")

codec, err := c.fetchCodec(ctx, topic.SchemaId)
if err != nil {
return err
Expand All @@ -316,7 +311,7 @@ func (c *PubSubClient) Publish(ctx context.Context, records []opencdc.Record) er
return fmt.Errorf("error marshaling data: %s", err)
}

avroPrepared, err := validateAndPreparePayload(dataMap, codec.Schema())
avroPrepared, err := validateAndPreparePayload(dataMap, codec.Schema(), c.unionFields[topic.SchemaId])
if err != nil {
return fmt.Errorf("error validating and preparing avro data:%s", err)
}
Expand All @@ -338,12 +333,21 @@ func (c *PubSubClient) Publish(ctx context.Context, records []opencdc.Record) er
Events: events,
}

sdk.Logger(ctx).Info().
Str("topic", topic.TopicName).
Str("number of events", string(len(events))).

Check failure on line 338 in pubsub/client.go

View workflow job for this annotation

GitHub Actions / build

conversion from int to string yields a string of one rune, not a string of digits

Check failure on line 338 in pubsub/client.go

View workflow job for this annotation

GitHub Actions / golangci-lint

stringintconv: conversion from int to string yields a string of one rune, not a string of digits (govet)
Msg("Events created, attempting to publish")

resp, err := c.pubSubClient.Publish(c.getAuthContext(), &publishRequest)
if err != nil {
return fmt.Errorf("error on publishing event: %s", err)
}
fmt.Println("Publish request sent!")
fmt.Println(resp)

sdk.Logger(ctx).Info().
Str("topic", topic.TopicName).
Str("resp", resp.String()).
Str("resp rpc id", resp.RpcId).
Msg("Events published")

return nil
}
Expand Down Expand Up @@ -484,25 +488,16 @@ func (c *PubSubClient) Recv(ctx context.Context, topic string, replayID []byte)
Str("event_id", e.Event.Id).
Msg("decoding event")

fmt.Println("Event")
fmt.Println(string(e.Event.Payload))

codec, err := c.fetchCodec(ctx, e.Event.SchemaId)
if err != nil {
return events, err
}

fmt.Println("Code fetched")
fmt.Println(codec)

parsed, _, err := codec.NativeFromBinary(e.Event.Payload)
if err != nil {
return events, err
}

fmt.Println("Parsed event")
fmt.Println(parsed)

payload, ok := parsed.(map[string]interface{})
if !ok {
return events, fmt.Errorf("invalid payload type %T", payload)
Expand Down Expand Up @@ -773,9 +768,6 @@ func (c *PubSubClient) canAccessTopic(ctx context.Context, accessLevel string) e
return fmt.Errorf("user %q not allowed to subscribe to %q", c.userID, resp.TopicName)
}

fmt.Println("CAN PUBLISH?")
fmt.Println(resp.CanPublish)

if accessLevel == publish && !resp.CanPublish {
return fmt.Errorf("user %q not allowed to publish to %q", c.userID, resp.TopicName)
}
Expand Down
33 changes: 11 additions & 22 deletions pubsub/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func invalidReplayIDErr(err error) bool {
return strings.Contains(strings.ToLower(err.Error()), "replay id validation failed")
}

func validateAndPreparePayload(dataMap map[string]interface{}, avroSchema string) (map[string]interface{}, error) {
func validateAndPreparePayload(dataMap map[string]interface{}, avroSchema string, unionFields map[string]struct{}) (map[string]interface{}, error) {
var schema map[string]interface{}
if err := json.Unmarshal([]byte(avroSchema), &schema); err != nil {
return nil, err
Expand All @@ -86,32 +86,21 @@ func validateAndPreparePayload(dataMap map[string]interface{}, avroSchema string
avroRecord[fieldName] = nil
continue
}
if isUnionType(fieldType) {
if value == nil {
avroRecord[fieldName] = nil
} else {
avroRecord[fieldName] = map[string]interface{}{
"string": value,
if _, ok := unionFields[fieldName]; ok && value != nil {
switch t := fieldType.(type) {

Check failure on line 90 in pubsub/utils.go

View workflow job for this annotation

GitHub Actions / golangci-lint

singleCaseSwitch: should rewrite switch statement to if statement (gocritic)
case []interface{}:
for _, unionType := range t {
if unionType.(string) != "null" {
avroRecord[fieldName] = map[string]interface{}{
unionType.(string): value,
}
}
}
}
} else {
switch fieldType.(type) {
case string:
avroRecord[fieldName] = value
case []interface{}:
avroRecord[fieldName] = value
default:
avroRecord[fieldName] = value
}
avroRecord[fieldName] = value
}
}

return avroRecord, nil
}

func isUnionType(fieldType interface{}) bool {
if types, ok := fieldType.([]interface{}); ok {
return len(types) > 1
}
return false
}
9 changes: 7 additions & 2 deletions source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import (

type client interface {
Next(context.Context) (opencdc.Record, error)
InitializeCDC(context.Context, string, position.Topics, []string, time.Duration) error
Initialize(context.Context, []string) error
StartCDC(context.Context, string, position.Topics, []string, time.Duration) error
Stop(context.Context)
Close(context.Context) error
Wait(context.Context) error
Expand Down Expand Up @@ -95,7 +96,11 @@ func (s *Source) Open(ctx context.Context, sdkPos opencdc.Position) error {
return fmt.Errorf("could not create GRPCClient: %w", err)
}

if err := client.InitializeCDC(ctx, s.config.ReplayPreset, parsedPositions, s.config.TopicNames, s.config.PollingPeriod); err != nil {
if err := client.Initialize(ctx, s.config.TopicNames); err != nil {
return fmt.Errorf("could not initialize pubsub client: %w", err)
}

if err := client.StartCDC(ctx, s.config.ReplayPreset, parsedPositions, s.config.TopicNames, s.config.PollingPeriod); err != nil {
return fmt.Errorf("could not initialize pubsub client: %w", err)
}

Expand Down

0 comments on commit 55fea9d

Please sign in to comment.