Skip to content

Commit

Permalink
nit
Browse files Browse the repository at this point in the history
  • Loading branch information
anna-cross committed Oct 22, 2024
1 parent 8a7b2ee commit c326cf2
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 12 deletions.
10 changes: 9 additions & 1 deletion pubsub/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,15 @@ func (c *PubSubClient) Publish(ctx context.Context, records []opencdc.Record) er
return fmt.Errorf("error marshaling data: %s", err)
}

avroPrepared, err := validateAndPreparePayload(dataMap, codec.Schema())
fmt.Println("Union fields")
fmt.Println(c.unionFields[topic.SchemaId])
fmt.Println("-------")
fmt.Println("-------")
fmt.Println(codec.Schema())
fmt.Println("-------")
fmt.Println("-------")
fmt.Println(string(data.Bytes()))
avroPrepared, err := validateAndPreparePayload(dataMap, codec.Schema(), c.unionFields[topic.SchemaId])
if err != nil {
return fmt.Errorf("error validating and preparing avro data:%s", err)
}
Expand Down
18 changes: 7 additions & 11 deletions pubsub/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func invalidReplayIDErr(err error) bool {
return strings.Contains(strings.ToLower(err.Error()), "replay id validation failed")
}

func validateAndPreparePayload(dataMap map[string]interface{}, avroSchema string) (map[string]interface{}, error) {
func validateAndPreparePayload(dataMap map[string]interface{}, avroSchema string, unionFields map[string]struct{}) (map[string]interface{}, error) {
var schema map[string]interface{}
if err := json.Unmarshal([]byte(avroSchema), &schema); err != nil {
return nil, err
Expand All @@ -80,29 +80,25 @@ func validateAndPreparePayload(dataMap map[string]interface{}, avroSchema string
for _, field := range fields {
fieldMap := field.(map[string]interface{})
fieldName := fieldMap["name"].(string)
fieldType := fieldMap["type"]
//fieldType := fieldMap["type"]

Check failure on line 83 in pubsub/utils.go

View workflow job for this annotation

GitHub Actions / golangci-lint

commentFormatting: put a space between `//` and comment text (gocritic)
value, exists := dataMap[fieldName]
if !exists {
avroRecord[fieldName] = nil
continue
}
if isUnionType(fieldType) {
if _, ok := unionFields[fieldName]; ok {
if value == nil {
avroRecord[fieldName] = nil
} else {
avroRecord[fieldName] = map[string]interface{}{
"string": value,
}

}
} else {
switch fieldType.(type) {
case string:
avroRecord[fieldName] = value
case []interface{}:
avroRecord[fieldName] = value
default:
avroRecord[fieldName] = value
}

avroRecord[fieldName] = value

}
}

Expand Down

0 comments on commit c326cf2

Please sign in to comment.