Skip to content

Commit

Permalink
update to use conduit commons
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon committed Oct 18, 2024
1 parent 6577ba1 commit 8525ddb
Show file tree
Hide file tree
Showing 12 changed files with 96 additions and 93 deletions.
2 changes: 1 addition & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestConfig_Parse_success(t *testing.T) {
)

var got Config
err := sdk.Util.ParseConfig(raw, &got)
err := sdk.Util.ParseConfig(ctx, raw, &got)

Check failure on line 45 in config/config_test.go

View workflow job for this annotation

GitHub Actions / test

undefined: ctx

Check failure on line 45 in config/config_test.go

View workflow job for this annotation

GitHub Actions / test

not enough arguments in call to sdk.Util.ParseConfig

Check failure on line 45 in config/config_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: ctx

Check failure on line 45 in config/config_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

not enough arguments in call to sdk.Util.ParseConfig
if err != nil {
t.Errorf("unexpected error: %s", err.Error())

Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
module github.com/conduitio-labs/conduit-connector-cosmos-nosql

go 1.22.2
go 1.23.2

require (
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v1.1.0
github.com/conduitio/conduit-commons v0.4.0
github.com/conduitio/conduit-connector-sdk v0.11.0
github.com/golang/mock v1.6.0
github.com/golangci/golangci-lint v1.61.0
Expand Down Expand Up @@ -50,7 +52,6 @@ require (
github.com/charithe/durationcheck v0.0.10 // indirect
github.com/chavacava/garif v0.1.0 // indirect
github.com/ckaznocha/intrange v0.2.0 // indirect
github.com/conduitio/conduit-commons v0.4.0 // indirect
github.com/conduitio/conduit-connector-protocol v0.8.0 // indirect
github.com/curioswitch/go-reassign v0.2.0 // indirect
github.com/daixiang0/gci v0.13.5 // indirect
Expand Down
2 changes: 1 addition & 1 deletion source/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Config struct {

// The name of a key that is used for ordering items.
OrderingKey string `json:"orderingKey" validate:"required"`
// Comma-separated list of key names to build the sdk.Record.Key.
// Comma-separated list of key names to build the opencdc.Record.Key.
Keys []string `json:"keys"`
// Determines whether the connector takes a snapshot
// of all items before starting CDC mode.
Expand Down
2 changes: 1 addition & 1 deletion source/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestConfig_ParseSource_success(t *testing.T) {
)

var got Config
err := sdk.Util.ParseConfig(raw, &got)
err := sdk.Util.ParseConfig(ctx, raw, &got)

Check failure on line 45 in source/config_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: ctx

Check failure on line 45 in source/config_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

not enough arguments in call to sdk.Util.ParseConfig
if err != nil {
t.Errorf("unexpected error: %s", err.Error())

Expand Down
27 changes: 14 additions & 13 deletions source/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/huandu/go-sqlbuilder"
)
Expand Down Expand Up @@ -53,11 +54,11 @@ type iterator struct {
metaProperties bool
batchSize uint

items []sdk.StructuredData
items []opencdc.StructuredData
}

// newIterator creates a new instance of the iterator.
func newIterator(ctx context.Context, config Config, sdkPosition sdk.Position) (*iterator, error) {
func newIterator(ctx context.Context, config Config, sdkPosition opencdc.Position) (*iterator, error) {
iter := &iterator{
partitionKey: azcosmos.NewPartitionKeyString(config.PartitionValue),
container: config.Container,
Expand Down Expand Up @@ -99,7 +100,7 @@ func newIterator(ctx context.Context, config Config, sdkPosition sdk.Position) (
}

if config.Snapshot {
// set this value to know where to stop returning records with [sdk.OperationSnapshot] operation
// set this value to know where to stop returning records with [opencdc.OperationSnapshot] operation
iter.position.LatestSnapshotValue = latestSnapshotValue

return iter, nil
Expand Down Expand Up @@ -145,12 +146,12 @@ func (iter *iterator) HasNext(ctx context.Context) (bool, error) {
}

// Next returns the next record.
func (iter *iterator) Next() (sdk.Record, error) {
key := make(sdk.StructuredData)
func (iter *iterator) Next() (opencdc.Record, error) {
key := make(opencdc.StructuredData)
for i := range iter.keys {
val, ok := iter.items[0][iter.keys[i]]
if !ok {
return sdk.Record{}, fmt.Errorf("key %q not found", iter.keys[i])
return opencdc.Record{}, fmt.Errorf("key %q not found", iter.keys[i])
}

key[iter.keys[i]] = val
Expand All @@ -165,7 +166,7 @@ func (iter *iterator) Next() (sdk.Record, error) {

rowBytes, err := json.Marshal(iter.items[0])
if err != nil {
return sdk.Record{}, fmt.Errorf("marshal item: %w", err)
return opencdc.Record{}, fmt.Errorf("marshal item: %w", err)
}

// set a new position into the variable,
Expand All @@ -176,12 +177,12 @@ func (iter *iterator) Next() (sdk.Record, error) {

convertedPosition, err := pos.marshal()
if err != nil {
return sdk.Record{}, fmt.Errorf("convert position :%w", err)
return opencdc.Record{}, fmt.Errorf("convert position :%w", err)
}

iter.position = &pos

metadata := sdk.Metadata{
metadata := opencdc.Metadata{
metadataFieldContainer: iter.container,
}
metadata.SetCreatedAt(time.Now().UTC())
Expand All @@ -195,10 +196,10 @@ func (iter *iterator) Next() (sdk.Record, error) {
}

if pos.LatestSnapshotValue != nil {
return sdk.Util.Source.NewRecordSnapshot(convertedPosition, metadata, key, sdk.RawData(rowBytes)), nil
return sdk.Util.Source.NewRecordSnapshot(convertedPosition, metadata, key, opencdc.RawData(rowBytes)), nil
}

return sdk.Util.Source.NewRecordCreate(convertedPosition, metadata, key, sdk.RawData(rowBytes)), nil
return sdk.Util.Source.NewRecordCreate(convertedPosition, metadata, key, opencdc.RawData(rowBytes)), nil
}

// latestSnapshotValue returns the value of the orderingKey key
Expand Down Expand Up @@ -265,7 +266,7 @@ func (iter *iterator) loadItems(ctx context.Context) error {
// selectItems selects a batch of items by a query from a database.
func (iter *iterator) selectItems(
ctx context.Context, query string, queryParameters ...azcosmos.QueryParameter,
) ([]sdk.StructuredData, error) {
) ([]opencdc.StructuredData, error) {
queryPager := iter.containerClient.NewQueryItemsPager(query, iter.partitionKey, &azcosmos.QueryOptions{
QueryParameters: queryParameters,
})
Expand All @@ -283,7 +284,7 @@ func (iter *iterator) selectItems(
return nil, nil
}

items := make([]sdk.StructuredData, len(queryResponse.Items))
items := make([]opencdc.StructuredData, len(queryResponse.Items))
for i := range queryResponse.Items {
if err = json.Unmarshal(queryResponse.Items[i], &items[i]); err != nil {
return nil, fmt.Errorf("unmarshal item: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions source/mock/source.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions source/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"encoding/json"
"fmt"

sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/conduitio/conduit-commons/opencdc"
)

// position is an iterator position.
Expand All @@ -31,23 +31,23 @@ type position struct {
LatestSnapshotValue any `json:"latestSnapshotValue"`
}

// parsePosition converts an [sdk.Position] into a [position].
func parsePosition(sdkPosition sdk.Position) (*position, error) {
// parsePosition converts an [opencdc.Position] into a [position].
func parsePosition(sdkPosition opencdc.Position) (*position, error) {
pos := new(position)

if sdkPosition == nil {
return pos, nil
}

if err := json.Unmarshal(sdkPosition, pos); err != nil {
return nil, fmt.Errorf("unmarshal sdk.Position into position: %w", err)
return nil, fmt.Errorf("unmarshal opencdc.Position into position: %w", err)
}

return pos, nil
}

// marshal marshals the underlying [position] into a [sdk.Position] as JSON bytes.
func (p position) marshal() (sdk.Position, error) {
// marshal marshals the underlying [position] into a [opencdc.Position] as JSON bytes.
func (p position) marshal() (opencdc.Position, error) {
positionBytes, err := json.Marshal(p)
if err != nil {
return nil, fmt.Errorf("marshal position: %w", err)
Expand Down
26 changes: 14 additions & 12 deletions source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ import (
"fmt"

"github.com/conduitio-labs/conduit-connector-cosmos-nosql/common"
"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
)

// Iterator is an interface needed for the [Source].
type Iterator interface {
HasNext(context.Context) (bool, error)
Next() (sdk.Record, error)
Next() (opencdc.Record, error)
}

// Source is an Azure Cosmos DB for NoSQL source connector.
Expand All @@ -41,16 +43,16 @@ func New() sdk.Source {
return sdk.SourceWithMiddleware(&Source{}, sdk.DefaultSourceMiddleware()...)
}

// Parameters is a map of named [sdk.Parameter] that describe how to configure the [Source].
func (s *Source) Parameters() map[string]sdk.Parameter {
// Parameters is a map of named [config.Parameter] that describe how to configure the [Source].
func (s *Source) Parameters() config.Parameters {
return s.config.Parameters()
}

// Configure parses and initializes the [Source] config.
func (s *Source) Configure(ctx context.Context, raw map[string]string) error {
func (s *Source) Configure(ctx context.Context, raw config.Config) error {
sdk.Logger(ctx).Info().Msg("Configuring an Azure Cosmos DB for NoSQL Source...")

if err := sdk.Util.ParseConfig(raw, &s.config); err != nil {
if err := sdk.Util.ParseConfig(ctx, raw, &s.config, New().Parameters()); err != nil {
return fmt.Errorf("parse source config: %w", err)
}

Expand All @@ -63,7 +65,7 @@ func (s *Source) Configure(ctx context.Context, raw map[string]string) error {
}

// Open parses the position and initializes the iterator.
func (s *Source) Open(ctx context.Context, sdkPosition sdk.Position) error {
func (s *Source) Open(ctx context.Context, sdkPosition opencdc.Position) error {
sdk.Logger(ctx).Info().Msg("Opening an Azure Cosmos DB for NoSQL Source...")

var err error
Expand All @@ -76,29 +78,29 @@ func (s *Source) Open(ctx context.Context, sdkPosition sdk.Position) error {
return nil
}

// Read returns the next [sdk.Record].
func (s *Source) Read(ctx context.Context) (sdk.Record, error) {
// Read returns the next [opencdc.Record].
func (s *Source) Read(ctx context.Context) (opencdc.Record, error) {
sdk.Logger(ctx).Debug().Msg("Reading a record from an Azure Cosmos DB for NoSQL Source...")

hasNext, err := s.iterator.HasNext(ctx)
if err != nil {
return sdk.Record{}, fmt.Errorf("has next: %w", err)
return opencdc.Record{}, fmt.Errorf("has next: %w", err)
}

if !hasNext {
return sdk.Record{}, sdk.ErrBackoffRetry
return opencdc.Record{}, sdk.ErrBackoffRetry
}

record, err := s.iterator.Next()
if err != nil {
return sdk.Record{}, fmt.Errorf("next: %w", err)
return opencdc.Record{}, fmt.Errorf("next: %w", err)
}

return record, nil
}

// Ack just logs the debug event with the position.
func (s *Source) Ack(ctx context.Context, sdkPosition sdk.Position) error {
func (s *Source) Ack(ctx context.Context, sdkPosition opencdc.Position) error {
sdk.Logger(ctx).Debug().Str("position", string(sdkPosition)).Msg("got ack")

return nil
Expand Down
25 changes: 13 additions & 12 deletions source/source_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
"github.com/conduitio-labs/conduit-connector-cosmos-nosql/common"
"github.com/conduitio-labs/conduit-connector-cosmos-nosql/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/matryer/is"
)
Expand Down Expand Up @@ -187,11 +188,11 @@ func TestSource_Read_combinedIterator(t *testing.T) {

record, err := src.Read(cctx)
is.NoErr(err)
is.Equal(record.Key, sdk.StructuredData(map[string]any{
is.Equal(record.Key, opencdc.StructuredData(map[string]any{
"key1": "1",
"key2": float64(1),
}))
is.Equal(record.Position, sdk.Position(`{"lastProcessedValue":1,"latestSnapshotValue":2}`))
is.Equal(record.Position, opencdc.Position(`{"lastProcessedValue":1,"latestSnapshotValue":2}`))
is.Equal(record.Payload.After.Bytes(), []byte(`{"id":"3794cb1a","key1":"1","key2":1,"partKey":"partVal"}`))

cancel()
Expand Down Expand Up @@ -221,20 +222,20 @@ func TestSource_Read_combinedIterator(t *testing.T) {

record, err = src.Read(cctx)
is.NoErr(err)
is.Equal(record.Key, sdk.StructuredData(map[string]any{
is.Equal(record.Key, opencdc.StructuredData(map[string]any{
"key1": "2",
"key2": float64(2),
}))
is.Equal(record.Position, sdk.Position(`{"lastProcessedValue":2,"latestSnapshotValue":2}`))
is.Equal(record.Position, opencdc.Position(`{"lastProcessedValue":2,"latestSnapshotValue":2}`))
is.Equal(record.Payload.After.Bytes(), []byte(`{"id":"ed053fb6","key1":"2","key2":2,"partKey":"partVal"}`))

record, err = src.Read(cctx)
is.NoErr(err)
is.Equal(record.Key, sdk.StructuredData(map[string]any{
is.Equal(record.Key, opencdc.StructuredData(map[string]any{
"key1": "3",
"key2": float64(3),
}))
is.Equal(record.Position, sdk.Position(`{"lastProcessedValue":3,"latestSnapshotValue":null}`))
is.Equal(record.Position, opencdc.Position(`{"lastProcessedValue":3,"latestSnapshotValue":null}`))
is.Equal(record.Payload.After.Bytes(), []byte(`{"id":"2452d9a6","key1":"3","key2":3,"partKey":"partVal"}`))

_, err = src.Read(cctx)
Expand All @@ -251,11 +252,11 @@ func TestSource_Read_combinedIterator(t *testing.T) {

record, err = src.Read(cctx)
is.NoErr(err)
is.Equal(record.Key, sdk.StructuredData(map[string]any{
is.Equal(record.Key, opencdc.StructuredData(map[string]any{
"key1": "4",
"key2": float64(4),
}))
is.Equal(record.Position, sdk.Position(`{"lastProcessedValue":4,"latestSnapshotValue":null}`))
is.Equal(record.Position, opencdc.Position(`{"lastProcessedValue":4,"latestSnapshotValue":null}`))
is.Equal(record.Payload.After.Bytes(), []byte(`{"id":"d0e7c1af","key1":"4","key2":4,"partKey":"partVal"}`))

cancel()
Expand Down Expand Up @@ -340,11 +341,11 @@ func TestSource_Read_snapshotIsFalse(t *testing.T) {

record, err := src.Read(cctx)
is.NoErr(err)
is.Equal(record.Key, sdk.StructuredData(map[string]any{
is.Equal(record.Key, opencdc.StructuredData(map[string]any{
"key1": "3",
"key2": float64(3),
}))
is.Equal(record.Position, sdk.Position(`{"lastProcessedValue":3,"latestSnapshotValue":null}`))
is.Equal(record.Position, opencdc.Position(`{"lastProcessedValue":3,"latestSnapshotValue":null}`))

_, err = src.Read(cctx)
is.Equal(err, sdk.ErrBackoffRetry)
Expand All @@ -360,11 +361,11 @@ func TestSource_Read_snapshotIsFalse(t *testing.T) {

record, err = src.Read(cctx)
is.NoErr(err)
is.Equal(record.Key, sdk.StructuredData(map[string]any{
is.Equal(record.Key, opencdc.StructuredData(map[string]any{
"key1": "4",
"key2": float64(4),
}))
is.Equal(record.Position, sdk.Position(`{"lastProcessedValue":4,"latestSnapshotValue":null}`))
is.Equal(record.Position, opencdc.Position(`{"lastProcessedValue":4,"latestSnapshotValue":null}`))

cancel()

Expand Down
Loading

0 comments on commit 8525ddb

Please sign in to comment.