From 8525ddbeced60fb96ff0ebc40fdcdf0ee0071b31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Fri, 18 Oct 2024 11:57:10 +0200 Subject: [PATCH] update to use conduit commons --- config/config_test.go | 2 +- go.mod | 5 ++- source/config.go | 2 +- source/config_test.go | 2 +- source/iterator.go | 27 +++++++------ source/mock/source.go | 6 +-- source/position.go | 12 +++--- source/source.go | 26 ++++++------ source/source_integration_test.go | 25 ++++++------ source/source_params.go | 66 +++++++++++++++---------------- source/source_test.go | 14 +++---- tools.go | 2 +- 12 files changed, 96 insertions(+), 93 deletions(-) diff --git a/config/config_test.go b/config/config_test.go index 796df38..fe57279 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -42,7 +42,7 @@ func TestConfig_Parse_success(t *testing.T) { ) var got Config - err := sdk.Util.ParseConfig(raw, &got) + err := sdk.Util.ParseConfig(ctx, raw, &got) if err != nil { t.Errorf("unexpected error: %s", err.Error()) diff --git a/go.mod b/go.mod index b1a3ced..e7d272b 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,10 @@ module github.com/conduitio-labs/conduit-connector-cosmos-nosql -go 1.22.2 +go 1.23.2 + require ( github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v1.1.0 + github.com/conduitio/conduit-commons v0.4.0 github.com/conduitio/conduit-connector-sdk v0.11.0 github.com/golang/mock v1.6.0 github.com/golangci/golangci-lint v1.61.0 @@ -50,7 +52,6 @@ require ( github.com/charithe/durationcheck v0.0.10 // indirect github.com/chavacava/garif v0.1.0 // indirect github.com/ckaznocha/intrange v0.2.0 // indirect - github.com/conduitio/conduit-commons v0.4.0 // indirect github.com/conduitio/conduit-connector-protocol v0.8.0 // indirect github.com/curioswitch/go-reassign v0.2.0 // indirect github.com/daixiang0/gci v0.13.5 // indirect diff --git a/source/config.go b/source/config.go index 0c5d539..e8fc5ba 100644 --- a/source/config.go +++ b/source/config.go @@ -35,7 +35,7 @@ type Config struct { // The name of a key that is used for ordering items. OrderingKey string `json:"orderingKey" validate:"required"` - // Comma-separated list of key names to build the sdk.Record.Key. + // Comma-separated list of key names to build the opencdc.Record.Key. Keys []string `json:"keys"` // Determines whether the connector takes a snapshot // of all items before starting CDC mode. diff --git a/source/config_test.go b/source/config_test.go index f103792..2832db6 100644 --- a/source/config_test.go +++ b/source/config_test.go @@ -42,7 +42,7 @@ func TestConfig_ParseSource_success(t *testing.T) { ) var got Config - err := sdk.Util.ParseConfig(raw, &got) + err := sdk.Util.ParseConfig(ctx, raw, &got) if err != nil { t.Errorf("unexpected error: %s", err.Error()) diff --git a/source/iterator.go b/source/iterator.go index bb91bb9..b1186d4 100644 --- a/source/iterator.go +++ b/source/iterator.go @@ -22,6 +22,7 @@ import ( "time" "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos" + "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-connector-sdk" "github.com/huandu/go-sqlbuilder" ) @@ -53,11 +54,11 @@ type iterator struct { metaProperties bool batchSize uint - items []sdk.StructuredData + items []opencdc.StructuredData } // newIterator creates a new instance of the iterator. -func newIterator(ctx context.Context, config Config, sdkPosition sdk.Position) (*iterator, error) { +func newIterator(ctx context.Context, config Config, sdkPosition opencdc.Position) (*iterator, error) { iter := &iterator{ partitionKey: azcosmos.NewPartitionKeyString(config.PartitionValue), container: config.Container, @@ -99,7 +100,7 @@ func newIterator(ctx context.Context, config Config, sdkPosition sdk.Position) ( } if config.Snapshot { - // set this value to know where to stop returning records with [sdk.OperationSnapshot] operation + // set this value to know where to stop returning records with [opencdc.OperationSnapshot] operation iter.position.LatestSnapshotValue = latestSnapshotValue return iter, nil @@ -145,12 +146,12 @@ func (iter *iterator) HasNext(ctx context.Context) (bool, error) { } // Next returns the next record. -func (iter *iterator) Next() (sdk.Record, error) { - key := make(sdk.StructuredData) +func (iter *iterator) Next() (opencdc.Record, error) { + key := make(opencdc.StructuredData) for i := range iter.keys { val, ok := iter.items[0][iter.keys[i]] if !ok { - return sdk.Record{}, fmt.Errorf("key %q not found", iter.keys[i]) + return opencdc.Record{}, fmt.Errorf("key %q not found", iter.keys[i]) } key[iter.keys[i]] = val @@ -165,7 +166,7 @@ func (iter *iterator) Next() (sdk.Record, error) { rowBytes, err := json.Marshal(iter.items[0]) if err != nil { - return sdk.Record{}, fmt.Errorf("marshal item: %w", err) + return opencdc.Record{}, fmt.Errorf("marshal item: %w", err) } // set a new position into the variable, @@ -176,12 +177,12 @@ func (iter *iterator) Next() (sdk.Record, error) { convertedPosition, err := pos.marshal() if err != nil { - return sdk.Record{}, fmt.Errorf("convert position :%w", err) + return opencdc.Record{}, fmt.Errorf("convert position :%w", err) } iter.position = &pos - metadata := sdk.Metadata{ + metadata := opencdc.Metadata{ metadataFieldContainer: iter.container, } metadata.SetCreatedAt(time.Now().UTC()) @@ -195,10 +196,10 @@ func (iter *iterator) Next() (sdk.Record, error) { } if pos.LatestSnapshotValue != nil { - return sdk.Util.Source.NewRecordSnapshot(convertedPosition, metadata, key, sdk.RawData(rowBytes)), nil + return sdk.Util.Source.NewRecordSnapshot(convertedPosition, metadata, key, opencdc.RawData(rowBytes)), nil } - return sdk.Util.Source.NewRecordCreate(convertedPosition, metadata, key, sdk.RawData(rowBytes)), nil + return sdk.Util.Source.NewRecordCreate(convertedPosition, metadata, key, opencdc.RawData(rowBytes)), nil } // latestSnapshotValue returns the value of the orderingKey key @@ -265,7 +266,7 @@ func (iter *iterator) loadItems(ctx context.Context) error { // selectItems selects a batch of items by a query from a database. func (iter *iterator) selectItems( ctx context.Context, query string, queryParameters ...azcosmos.QueryParameter, -) ([]sdk.StructuredData, error) { +) ([]opencdc.StructuredData, error) { queryPager := iter.containerClient.NewQueryItemsPager(query, iter.partitionKey, &azcosmos.QueryOptions{ QueryParameters: queryParameters, }) @@ -283,7 +284,7 @@ func (iter *iterator) selectItems( return nil, nil } - items := make([]sdk.StructuredData, len(queryResponse.Items)) + items := make([]opencdc.StructuredData, len(queryResponse.Items)) for i := range queryResponse.Items { if err = json.Unmarshal(queryResponse.Items[i], &items[i]); err != nil { return nil, fmt.Errorf("unmarshal item: %w", err) diff --git a/source/mock/source.go b/source/mock/source.go index e3572d2..8a594f7 100644 --- a/source/mock/source.go +++ b/source/mock/source.go @@ -8,7 +8,7 @@ import ( context "context" reflect "reflect" - sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/conduitio/conduit-commons/opencdc" gomock "github.com/golang/mock/gomock" ) @@ -51,10 +51,10 @@ func (mr *MockIteratorMockRecorder) HasNext(arg0 interface{}) *gomock.Call { } // Next mocks base method. -func (m *MockIterator) Next() (sdk.Record, error) { +func (m *MockIterator) Next() (opencdc.Record, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Next") - ret0, _ := ret[0].(sdk.Record) + ret0, _ := ret[0].(opencdc.Record) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/source/position.go b/source/position.go index 46db85d..40f563d 100644 --- a/source/position.go +++ b/source/position.go @@ -18,7 +18,7 @@ import ( "encoding/json" "fmt" - sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/conduitio/conduit-commons/opencdc" ) // position is an iterator position. @@ -31,8 +31,8 @@ type position struct { LatestSnapshotValue any `json:"latestSnapshotValue"` } -// parsePosition converts an [sdk.Position] into a [position]. -func parsePosition(sdkPosition sdk.Position) (*position, error) { +// parsePosition converts an [opencdc.Position] into a [position]. +func parsePosition(sdkPosition opencdc.Position) (*position, error) { pos := new(position) if sdkPosition == nil { @@ -40,14 +40,14 @@ func parsePosition(sdkPosition sdk.Position) (*position, error) { } if err := json.Unmarshal(sdkPosition, pos); err != nil { - return nil, fmt.Errorf("unmarshal sdk.Position into position: %w", err) + return nil, fmt.Errorf("unmarshal opencdc.Position into position: %w", err) } return pos, nil } -// marshal marshals the underlying [position] into a [sdk.Position] as JSON bytes. -func (p position) marshal() (sdk.Position, error) { +// marshal marshals the underlying [position] into a [opencdc.Position] as JSON bytes. +func (p position) marshal() (opencdc.Position, error) { positionBytes, err := json.Marshal(p) if err != nil { return nil, fmt.Errorf("marshal position: %w", err) diff --git a/source/source.go b/source/source.go index 6668a9d..52283eb 100644 --- a/source/source.go +++ b/source/source.go @@ -19,13 +19,15 @@ import ( "fmt" "github.com/conduitio-labs/conduit-connector-cosmos-nosql/common" + "github.com/conduitio/conduit-commons/config" + "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-connector-sdk" ) // Iterator is an interface needed for the [Source]. type Iterator interface { HasNext(context.Context) (bool, error) - Next() (sdk.Record, error) + Next() (opencdc.Record, error) } // Source is an Azure Cosmos DB for NoSQL source connector. @@ -41,16 +43,16 @@ func New() sdk.Source { return sdk.SourceWithMiddleware(&Source{}, sdk.DefaultSourceMiddleware()...) } -// Parameters is a map of named [sdk.Parameter] that describe how to configure the [Source]. -func (s *Source) Parameters() map[string]sdk.Parameter { +// Parameters is a map of named [config.Parameter] that describe how to configure the [Source]. +func (s *Source) Parameters() config.Parameters { return s.config.Parameters() } // Configure parses and initializes the [Source] config. -func (s *Source) Configure(ctx context.Context, raw map[string]string) error { +func (s *Source) Configure(ctx context.Context, raw config.Config) error { sdk.Logger(ctx).Info().Msg("Configuring an Azure Cosmos DB for NoSQL Source...") - if err := sdk.Util.ParseConfig(raw, &s.config); err != nil { + if err := sdk.Util.ParseConfig(ctx, raw, &s.config, New().Parameters()); err != nil { return fmt.Errorf("parse source config: %w", err) } @@ -63,7 +65,7 @@ func (s *Source) Configure(ctx context.Context, raw map[string]string) error { } // Open parses the position and initializes the iterator. -func (s *Source) Open(ctx context.Context, sdkPosition sdk.Position) error { +func (s *Source) Open(ctx context.Context, sdkPosition opencdc.Position) error { sdk.Logger(ctx).Info().Msg("Opening an Azure Cosmos DB for NoSQL Source...") var err error @@ -76,29 +78,29 @@ func (s *Source) Open(ctx context.Context, sdkPosition sdk.Position) error { return nil } -// Read returns the next [sdk.Record]. -func (s *Source) Read(ctx context.Context) (sdk.Record, error) { +// Read returns the next [opencdc.Record]. +func (s *Source) Read(ctx context.Context) (opencdc.Record, error) { sdk.Logger(ctx).Debug().Msg("Reading a record from an Azure Cosmos DB for NoSQL Source...") hasNext, err := s.iterator.HasNext(ctx) if err != nil { - return sdk.Record{}, fmt.Errorf("has next: %w", err) + return opencdc.Record{}, fmt.Errorf("has next: %w", err) } if !hasNext { - return sdk.Record{}, sdk.ErrBackoffRetry + return opencdc.Record{}, sdk.ErrBackoffRetry } record, err := s.iterator.Next() if err != nil { - return sdk.Record{}, fmt.Errorf("next: %w", err) + return opencdc.Record{}, fmt.Errorf("next: %w", err) } return record, nil } // Ack just logs the debug event with the position. -func (s *Source) Ack(ctx context.Context, sdkPosition sdk.Position) error { +func (s *Source) Ack(ctx context.Context, sdkPosition opencdc.Position) error { sdk.Logger(ctx).Debug().Str("position", string(sdkPosition)).Msg("got ack") return nil diff --git a/source/source_integration_test.go b/source/source_integration_test.go index 8f28b24..792fcf5 100644 --- a/source/source_integration_test.go +++ b/source/source_integration_test.go @@ -25,6 +25,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos" "github.com/conduitio-labs/conduit-connector-cosmos-nosql/common" "github.com/conduitio-labs/conduit-connector-cosmos-nosql/config" + "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-connector-sdk" "github.com/matryer/is" ) @@ -187,11 +188,11 @@ func TestSource_Read_combinedIterator(t *testing.T) { record, err := src.Read(cctx) is.NoErr(err) - is.Equal(record.Key, sdk.StructuredData(map[string]any{ + is.Equal(record.Key, opencdc.StructuredData(map[string]any{ "key1": "1", "key2": float64(1), })) - is.Equal(record.Position, sdk.Position(`{"lastProcessedValue":1,"latestSnapshotValue":2}`)) + is.Equal(record.Position, opencdc.Position(`{"lastProcessedValue":1,"latestSnapshotValue":2}`)) is.Equal(record.Payload.After.Bytes(), []byte(`{"id":"3794cb1a","key1":"1","key2":1,"partKey":"partVal"}`)) cancel() @@ -221,20 +222,20 @@ func TestSource_Read_combinedIterator(t *testing.T) { record, err = src.Read(cctx) is.NoErr(err) - is.Equal(record.Key, sdk.StructuredData(map[string]any{ + is.Equal(record.Key, opencdc.StructuredData(map[string]any{ "key1": "2", "key2": float64(2), })) - is.Equal(record.Position, sdk.Position(`{"lastProcessedValue":2,"latestSnapshotValue":2}`)) + is.Equal(record.Position, opencdc.Position(`{"lastProcessedValue":2,"latestSnapshotValue":2}`)) is.Equal(record.Payload.After.Bytes(), []byte(`{"id":"ed053fb6","key1":"2","key2":2,"partKey":"partVal"}`)) record, err = src.Read(cctx) is.NoErr(err) - is.Equal(record.Key, sdk.StructuredData(map[string]any{ + is.Equal(record.Key, opencdc.StructuredData(map[string]any{ "key1": "3", "key2": float64(3), })) - is.Equal(record.Position, sdk.Position(`{"lastProcessedValue":3,"latestSnapshotValue":null}`)) + is.Equal(record.Position, opencdc.Position(`{"lastProcessedValue":3,"latestSnapshotValue":null}`)) is.Equal(record.Payload.After.Bytes(), []byte(`{"id":"2452d9a6","key1":"3","key2":3,"partKey":"partVal"}`)) _, err = src.Read(cctx) @@ -251,11 +252,11 @@ func TestSource_Read_combinedIterator(t *testing.T) { record, err = src.Read(cctx) is.NoErr(err) - is.Equal(record.Key, sdk.StructuredData(map[string]any{ + is.Equal(record.Key, opencdc.StructuredData(map[string]any{ "key1": "4", "key2": float64(4), })) - is.Equal(record.Position, sdk.Position(`{"lastProcessedValue":4,"latestSnapshotValue":null}`)) + is.Equal(record.Position, opencdc.Position(`{"lastProcessedValue":4,"latestSnapshotValue":null}`)) is.Equal(record.Payload.After.Bytes(), []byte(`{"id":"d0e7c1af","key1":"4","key2":4,"partKey":"partVal"}`)) cancel() @@ -340,11 +341,11 @@ func TestSource_Read_snapshotIsFalse(t *testing.T) { record, err := src.Read(cctx) is.NoErr(err) - is.Equal(record.Key, sdk.StructuredData(map[string]any{ + is.Equal(record.Key, opencdc.StructuredData(map[string]any{ "key1": "3", "key2": float64(3), })) - is.Equal(record.Position, sdk.Position(`{"lastProcessedValue":3,"latestSnapshotValue":null}`)) + is.Equal(record.Position, opencdc.Position(`{"lastProcessedValue":3,"latestSnapshotValue":null}`)) _, err = src.Read(cctx) is.Equal(err, sdk.ErrBackoffRetry) @@ -360,11 +361,11 @@ func TestSource_Read_snapshotIsFalse(t *testing.T) { record, err = src.Read(cctx) is.NoErr(err) - is.Equal(record.Key, sdk.StructuredData(map[string]any{ + is.Equal(record.Key, opencdc.StructuredData(map[string]any{ "key1": "4", "key2": float64(4), })) - is.Equal(record.Position, sdk.Position(`{"lastProcessedValue":4,"latestSnapshotValue":null}`)) + is.Equal(record.Position, opencdc.Position(`{"lastProcessedValue":4,"latestSnapshotValue":null}`)) cancel() diff --git a/source/source_params.go b/source/source_params.go index 3655b0f..16d45ff 100644 --- a/source/source_params.go +++ b/source/source_params.go @@ -3,85 +3,83 @@ package source -import ( - sdk "github.com/conduitio/conduit-connector-sdk" -) +import "github.com/conduitio/conduit-commons/config" -func (Config) Parameters() map[string]sdk.Parameter { - return map[string]sdk.Parameter{ +func (Config) Parameters() config.Parameters { + return map[string]config.Parameter{ "batchSize": { Default: "1000", Description: "The size of an element batch.", - Type: sdk.ParameterTypeInt, - Validations: []sdk.Validation{ - sdk.ValidationGreaterThan{Value: 0}, - sdk.ValidationLessThan{Value: 100001}, + Type: config.ParameterTypeInt, + Validations: []config.Validation{ + config.ValidationGreaterThan{Value: 0}, + config.ValidationLessThan{Value: 100001}, }, }, "container": { Default: "", Description: "The name of a container the connector should work with.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{ - sdk.ValidationRequired{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, }, }, "database": { Default: "", Description: "The name of a database the connector should work with.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{ - sdk.ValidationRequired{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, }, }, "keys": { Default: "", - Description: "Comma-separated list of key names to build the sdk.Record.Key.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Description: "Comma-separated list of key names to build the opencdc.Record.Key.", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, "metaProperties": { Default: "false", Description: "metaProperties whether the connector takes the next automatically generated meta-properties: \"_rid\", \"_ts\", \"_self\", \"_etag\", \"_attachments\".", - Type: sdk.ParameterTypeBool, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeBool, + Validations: []config.Validation{}, }, "orderingKey": { Default: "", Description: "The name of a key that is used for ordering items.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{ - sdk.ValidationRequired{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, }, }, "partitionValue": { Default: "", Description: "The logical partition key value.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{ - sdk.ValidationRequired{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, }, }, "primaryKey": { Default: "", Description: "The key for authentication with Azure Cosmos DB.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{ - sdk.ValidationRequired{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, }, }, "snapshot": { Default: "true", Description: "Determines whether the connector takes a snapshot of all items before starting CDC mode.", - Type: sdk.ParameterTypeBool, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeBool, + Validations: []config.Validation{}, }, "uri": { Default: "", Description: "The connection uri pointed to an Azure Cosmos DB for NoSQL instance.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{ - sdk.ValidationRequired{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, }, }, } diff --git a/source/source_test.go b/source/source_test.go index e4e459c..1526122 100644 --- a/source/source_test.go +++ b/source/source_test.go @@ -22,7 +22,7 @@ import ( "github.com/conduitio-labs/conduit-connector-cosmos-nosql/config" "github.com/conduitio-labs/conduit-connector-cosmos-nosql/source/mock" - sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/conduitio/conduit-commons/opencdc" "github.com/golang/mock/gomock" "github.com/matryer/is" ) @@ -77,17 +77,17 @@ func TestSource_Read_success(t *testing.T) { ctrl := gomock.NewController(t) ctx := context.Background() - key := make(sdk.StructuredData) + key := make(opencdc.StructuredData) key["field1"] = 1 - metadata := make(sdk.Metadata) + metadata := make(opencdc.Metadata) metadata.SetCreatedAt(time.Time{}) - record := sdk.Record{ - Position: sdk.Position(`{"lastProcessedValue": 1}`), + record := opencdc.Record{ + Position: opencdc.Position(`{"lastProcessedValue": 1}`), Metadata: metadata, Key: key, - Payload: sdk.Change{ + Payload: opencdc.Change{ After: key, }, } @@ -131,7 +131,7 @@ func TestSource_Read_failNext(t *testing.T) { it := mock.NewMockIterator(ctrl) it.EXPECT().HasNext(ctx).Return(true, nil) - it.EXPECT().Next().Return(sdk.Record{}, errors.New("marshal item: some error")) + it.EXPECT().Next().Return(opencdc.Record{}, errors.New("marshal item: some error")) s := Source{iterator: it} diff --git a/tools.go b/tools.go index 7214e0b..0f9b3cc 100644 --- a/tools.go +++ b/tools.go @@ -17,6 +17,6 @@ package main import ( - _ "github.com/conduitio/conduit-connector-sdk/cmd/paramgen" + _ "github.com/conduitio/conduit-commons/paramgen" _ "github.com/golangci/golangci-lint/cmd/golangci-lint" )