From ab80ccb01fecd682dd13e35502edbeae355ff606 Mon Sep 17 00:00:00 2001 From: Gaurav Sahil Date: Tue, 8 Oct 2024 20:47:57 +0530 Subject: [PATCH 1/4] feat: multiple collection support in source/destination connector --- README.md | 20 +- acceptance_test.go | 10 +- common/configuration.go | 4 - common/error.go | 23 ++ destination/config/config.go | 19 +- destination/config/config_test.go | 96 +----- destination/config/paramgen.go | 17 +- destination/destination_integration_test.go | 6 - destination/destination_test.go | 6 +- destination/writer/writer.go | 5 +- go.mod | 2 +- source/config/config.go | 69 ++-- source/config/config_test.go | 173 ++++------ source/config/paramgen.go | 25 +- source/iterator/iterator.go | 326 +++++------------- source/iterator/position.go | 4 + source/iterator/position_test.go | 116 ++++--- source/iterator/worker.go | 351 ++++++++++++++++++++ source/source_integration_test.go | 45 ++- source/source_test.go | 44 ++- 20 files changed, 715 insertions(+), 646 deletions(-) create mode 100644 source/iterator/worker.go diff --git a/README.md b/README.md index c54926a..3dc1375 100644 --- a/README.md +++ b/README.md @@ -21,15 +21,15 @@ to the environment variables as an `REDSHIFT_DSN`. ## Source -Conduit's Redshift source connector allows you to move data from a Redshift table with the specified `dsn` and `table` -configuration parameters. Upon starting, the source connector takes a snapshot of a given table in the database, then +Conduit's Redshift source connector allows you to move data from multiple Redshift tables with the specified `dsn` and `tables` +configuration parameters. Upon starting, the source connector takes a snapshot the tables in the database, then switches into change data capture (CDC) mode. In CDC mode, the connector will only detect new rows. 
### Snapshots -At the first launch of the connector, the snapshot mode is enabled and the last value of the `orderingColumn` is stored +At the first launch of the connector, the snapshot mode is enabled and the last value of the `orderingColumn` corresponding to a table is stored to the position, to know the boundary of this mode. The connector reads all rows of a table in batches, using a -keyset pagination, limiting the rows by `batchSize` and ordering by `orderingColumn`. The connector stores the +keyset pagination, limiting the rows by `batchSize` and ordering by `orderingColumn` and then does the same for other tables. The connector stores the last processed element value of an `orderingColumn` in a position, so the snapshot process can be paused and resumed without losing data. Once all rows in that initial snapshot are read the connector switches into CDC mode. @@ -45,10 +45,9 @@ pagination, limiting by `batchSize` and ordering by `orderingColumn`. | name | description | required | example | default value | |------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------------------------------------------------------|---------------| | `dsn` | [DSN](https://en.wikipedia.org/wiki/Data_source_name) to connect to Redshift. | **true** | `postgres://username:password@endpoint:5439/database` | | -| `table` | Name of the table from which the connector reads from. | **true** | `table_name` | | -| `orderingColumn` | Column used to order the rows.
Keep in mind that the data will be sorted by this column, so the column must contain unique, consistent values suitable for sorting. | **true** | `id` | | +| `tables` | Names of the tables which the connector reads from. | **true** | `table1,table2` | | +| `orderingColumns` | Corresponding columns for the tables used to order the rows.
Keep in mind that the data will be sorted by this column, so the column must contain unique, consistent values suitable for sorting. | **true** | `id,created_at` | | | `snapshot` | Whether the connector will take a snapshot of the entire table before starting cdc mode. | false | `false` | "true" | -| `keyColumns` | Comma-separated list of column names to build the `sdk.Record.Key`. See more: [Key handling](#key-handling). | false | `id,name` | | | `batchSize` | Size of rows batch. Min is 1 and max is 100000. | false | `100` | "1000" | ### Key handling @@ -60,7 +59,7 @@ of `sdk.Record.Key` field are taken from `sdk.Payload.After` by the keys of this ### Table Name -For each record, the connector adds a `redshift.table` property to the metadata that contains the table name. +For each record, the connector adds a `opencdc.collection` property to the metadata that contains the table name. ## Destination @@ -74,8 +73,7 @@ configuration parameters. It takes an `sdk.Record` and parses it into a valid SQ | name | description | required | example | default | |--------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------------------------------------------------------|---------| | `dsn` | [DSN](https://en.wikipedia.org/wiki/Data_source_name) to connect to Redshift. | **true** | `postgres://username:password@endpoint:5439/database` | "" | -| `table` | Name of the table the connector writes to. | **true** | `table_name` | "" | -| `keyColumns` | Comma-separated list of column names to build the where clause in case if `sdk.Record.Key` is empty.
See more: [Key handling](#key-handling-1). | false | `id,name` | "" | +| `table` | Name of the table the connector writes to. | **false** | `table_name` | "{{ index .Metadata "opencdc.collection" }}" | ### Key handling @@ -88,7 +86,7 @@ configuration parameters. It takes an `sdk.Record` and parses it into a valid SQ ### Table Name -Records are written to the table specified by the `redshift.table` property in the metadata, if it exists. +Records are written to the table specified by the `opencdc.collection` property in the metadata, if it exists. If not, the connector will fall back to the `table` configured in the connector. ## Known limitations diff --git a/acceptance_test.go b/acceptance_test.go index d554239..9facfbb 100644 --- a/acceptance_test.go +++ b/acceptance_test.go @@ -36,7 +36,7 @@ const ( // envNameDSN is a Redshift dsn environment name. envNameDSN = "REDSHIFT_DSN" // metadataFieldTable is a name of a record metadata field that stores a Redshift table name. - metadataFieldTable = "redshift.table" + metadataFieldTable = "opencdc.collection" ) type driver struct { @@ -53,7 +53,7 @@ func (d *driver) GenerateRecord(_ *testing.T, operation opencdc.Operation) openc Position: nil, Operation: operation, Metadata: map[string]string{ - metadataFieldTable: d.Config.SourceConfig[srcConfig.ConfigTable], + metadataFieldTable: d.Config.SourceConfig[srcConfig.ConfigTables], }, Key: opencdc.StructuredData{ "col1": d.id, @@ -71,9 +71,9 @@ func TestAcceptance(t *testing.T) { } cfg := map[string]string{ - srcConfig.ConfigDsn: dsn, - srcConfig.ConfigTable: fmt.Sprintf("conduit_test_%d", time.Now().UnixNano()), - srcConfig.ConfigOrderingColumn: "col1", + srcConfig.ConfigDsn: dsn, + srcConfig.ConfigTables: fmt.Sprintf("conduit_test_%d", time.Now().UnixNano()), + srcConfig.ConfigOrderingColumns: "col1", } sdk.AcceptanceTest(t, &driver{ diff --git a/common/configuration.go b/common/configuration.go index 62154ed..5e5732e 100644 --- a/common/configuration.go +++ 
b/common/configuration.go @@ -24,8 +24,4 @@ const ( type Configuration struct { // DSN is the configuration of the data source name to connect to the Amazon Redshift. DSN string `json:"dsn" validate:"required"` - // Table is the configuration of the table name. - Table string `json:"table" validate:"required"` - // KeyColumns is the configuration list of column names to build the opencdc.Record.Key (for Source). - KeyColumns []string `json:"keyColumns"` } diff --git a/common/error.go b/common/error.go index c7edb9d..67347cd 100644 --- a/common/error.go +++ b/common/error.go @@ -67,3 +67,26 @@ func (e GreaterThanError) Error() string { func NewGreaterThanError(fieldName string, value int) GreaterThanError { return GreaterThanError{fieldName: fieldName, value: value} } + +type NoTablesOrColumnsError struct{} + +func (e NoTablesOrColumnsError) Error() string { + return "at least one table and one corresponding ordering column must be provided" +} + +func NewNoTablesOrColumnsError() NoTablesOrColumnsError { + return NoTablesOrColumnsError{} +} + +type MismatchedTablesAndColumnsError struct { + tablesCount int + orderingColumnsCount int +} + +func (e MismatchedTablesAndColumnsError) Error() string { + return fmt.Sprintf("the number of tables (%d) and ordering columns (%d) must match", e.tablesCount, e.orderingColumnsCount) +} + +func NewMismatchedTablesAndColumnsError(tablesCount, orderingColumnsCount int) MismatchedTablesAndColumnsError { + return MismatchedTablesAndColumnsError{tablesCount: tablesCount, orderingColumnsCount: orderingColumnsCount} +} diff --git a/destination/config/config.go b/destination/config/config.go index 5ce71e0..a6fb875 100644 --- a/destination/config/config.go +++ b/destination/config/config.go @@ -25,14 +25,16 @@ import ( // Config is a destination configuration needed to connect to Redshift database. type Config struct { common.Configuration + + // Table is used as the target table into which records are inserted. 
+ Table string `json:"table"` } // Validate executes manual validations beyond what is defined in struct tags. func (c *Config) Validate() error { // c.DSN has required validation handled in struct tag - // c.Table required validation is handled in stuct tag - // handling "lowercase", "excludesall= " and "lte=127" validations + // handling "lowercase", "excludesall= " and "lte=127" validations for c.Table if c.Table != strings.ToLower(c.Table) { return common.NewLowercaseError(ConfigTable) } @@ -43,18 +45,5 @@ func (c *Config) Validate() error { return common.NewLessThanError(ConfigTable, common.MaxConfigStringLength) } - // c.KeyColumns handling "lowercase", "excludesall= " and "lte=127" validations - for _, v := range c.KeyColumns { - if v != strings.ToLower(v) { - return common.NewLowercaseError(ConfigKeyColumns) - } - if strings.Contains(v, " ") { - return common.NewExcludesSpacesError(ConfigKeyColumns) - } - if len(v) > common.MaxConfigStringLength { - return common.NewLessThanError(ConfigKeyColumns, common.MaxConfigStringLength) - } - } - return nil } diff --git a/destination/config/config_test.go b/destination/config/config_test.go index da46641..1f8977a 100644 --- a/destination/config/config_test.go +++ b/destination/config/config_test.go @@ -36,67 +36,12 @@ func TestValidateConfig(t *testing.T) { in *Config wantErr error }{ - { - name: "success_keyColumns_has_one_key", - in: &Config{ - common.Configuration{ - DSN: testValueDSN, - Table: testValueTable, - KeyColumns: []string{"id"}, - }, - }, - wantErr: nil, - }, - { - name: "success_keyColumns_has_two_keys", - in: &Config{ - common.Configuration{ - DSN: testValueDSN, - Table: testValueTable, - KeyColumns: []string{"id", "name"}, - }, - }, - wantErr: nil, - }, - { - name: "success_keyColumns_ends_with_space", - in: &Config{ - common.Configuration{ - DSN: testValueDSN, - Table: testValueTable, - KeyColumns: []string{"id", "name "}, - }, - }, - wantErr: common.NewExcludesSpacesError(ConfigKeyColumns), - }, - 
{ - name: "success_keyColumns_starts_with_space", - in: &Config{ - common.Configuration{ - DSN: testValueDSN, - Table: testValueTable, - KeyColumns: []string{"id", "name "}, - }, - }, - wantErr: common.NewExcludesSpacesError(ConfigKeyColumns), - }, - { - name: "success_keyColumns_has_two_spaces", - in: &Config{ - common.Configuration{ - DSN: testValueDSN, - Table: testValueTable, - KeyColumns: []string{"id", " name"}, - }, - }, - wantErr: common.NewExcludesSpacesError(ConfigKeyColumns), - }, { name: "failure_table_has_space", in: &Config{ - common.Configuration{ - DSN: testValueDSN, - Table: "test table", + Table: "test_table ", + Configuration: common.Configuration{ + DSN: testValueDSN, }, }, wantErr: common.NewExcludesSpacesError(ConfigTable), @@ -104,42 +49,19 @@ func TestValidateConfig(t *testing.T) { { name: "failure_table_has_uppercase_letter", in: &Config{ - common.Configuration{ - DSN: testValueDSN, - Table: "Test_table", + Table: "Test_table", + Configuration: common.Configuration{ + DSN: testValueDSN, }, }, wantErr: common.NewLowercaseError(ConfigTable), }, - { - name: "failure_keyColumns_has_uppercase_letter", - in: &Config{ - common.Configuration{ - DSN: testValueDSN, - Table: testValueTable, - KeyColumns: []string{"ID"}, - }, - }, - wantErr: common.NewLowercaseError(ConfigKeyColumns), - }, - { - name: "failure_keyColumns_exceeds_max_length", - in: &Config{ - common.Configuration{ - DSN: testValueDSN, - Table: testValueTable, - KeyColumns: []string{testLongString}, - }, - }, - wantErr: common.NewLessThanError(ConfigKeyColumns, common.MaxConfigStringLength), - }, { name: "failure_table_exceeds_max_length", in: &Config{ - common.Configuration{ - DSN: testValueDSN, - Table: testLongString, - KeyColumns: []string{"id"}, + Table: testLongString, + Configuration: common.Configuration{ + DSN: testValueDSN, }, }, wantErr: common.NewLessThanError(ConfigTable, common.MaxConfigStringLength), diff --git a/destination/config/paramgen.go 
b/destination/config/paramgen.go index 692e497..1546780 100644 --- a/destination/config/paramgen.go +++ b/destination/config/paramgen.go @@ -8,9 +8,8 @@ import ( ) const ( - ConfigDsn = "dsn" - ConfigKeyColumns = "keyColumns" - ConfigTable = "table" + ConfigDsn = "dsn" + ConfigTable = "table" ) func (Config) Parameters() map[string]config.Parameter { @@ -23,19 +22,11 @@ func (Config) Parameters() map[string]config.Parameter { config.ValidationRequired{}, }, }, - ConfigKeyColumns: { - Default: "", - Description: "KeyColumns is the configuration list of column names to build the opencdc.Record.Key (for Source).", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, ConfigTable: { Default: "", - Description: "Table is the configuration of the table name.", + Description: "Table is used as the target table into which records are inserted.", Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationRequired{}, - }, + Validations: []config.Validation{}, }, } } diff --git a/destination/destination_integration_test.go b/destination/destination_integration_test.go index 9ff10f3..810b927 100644 --- a/destination/destination_integration_test.go +++ b/destination/destination_integration_test.go @@ -258,9 +258,6 @@ func TestDestination_Write_successKeyColumns(t *testing.T) { _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2);", cfg[config.ConfigTable])) is.NoErr(err) - // set a KeyColumns field to the config - cfg[config.ConfigKeyColumns] = "col1" - dest := NewDestination() err = dest.Configure(ctx, cfg) @@ -352,9 +349,6 @@ func TestDestination_Write_failedWrongKeyColumnsField(t *testing.T) { _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2);", cfg[config.ConfigTable])) is.NoErr(err) - // set a wrong KeyColumns field to the config - cfg[config.ConfigKeyColumns] = "wrong_column" - dest := NewDestination() err = dest.Configure(ctx, cfg) diff --git a/destination/destination_test.go 
b/destination/destination_test.go index f8aff91..7f79acb 100644 --- a/destination/destination_test.go +++ b/destination/destination_test.go @@ -53,12 +53,10 @@ func TestDestination_Configure_Fail(t *testing.T) { d := NewDestination() - err := d.Configure(context.Background(), map[string]string{ - config.ConfigDsn: testDSN, - }) + err := d.Configure(context.Background(), map[string]string{}) is.True(err != nil) is.Equal(err.Error(), - `config invalid: error validating "table": required parameter is not provided`) + `config invalid: error validating "dsn": required parameter is not provided`) } func TestDestination_Write_Success(t *testing.T) { diff --git a/destination/writer/writer.go b/destination/writer/writer.go index 9bb90b6..2e595c0 100644 --- a/destination/writer/writer.go +++ b/destination/writer/writer.go @@ -30,7 +30,7 @@ import ( const ( // metadataFieldTable is a name of a record metadata field that stores a Redshift table name. - metadataFieldTable = "redshift.table" + metadataFieldTable = "opencdc.collection" // keySearchPath is a key of get parameter of a datatable's schema name. keySearchPath = "search_path" // pingTimeout is a database ping timeout. 
@@ -50,8 +50,7 @@ func NewWriter(ctx context.Context, driverName string, config config.Config) (*W var err error writer := &Writer{ - table: config.Table, - keyColumns: config.KeyColumns, + table: config.Table, } writer.db, err = sqlx.Open(driverName, config.DSN) diff --git a/go.mod b/go.mod index 20efed7..67cac40 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,6 @@ require ( github.com/jmoiron/sqlx v1.4.0 github.com/matryer/is v1.4.1 go.uber.org/mock v0.4.0 - go.uber.org/multierr v1.11.0 ) require ( @@ -206,6 +205,7 @@ require ( go-simpler.org/sloglint v0.7.2 // indirect go.uber.org/automaxprocs v1.5.3 // indirect go.uber.org/goleak v1.3.0 // indirect + go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.27.0 // indirect golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e // indirect diff --git a/source/config/config.go b/source/config/config.go index 501f219..df828e8 100644 --- a/source/config/config.go +++ b/source/config/config.go @@ -17,6 +17,7 @@ package config import ( + "fmt" "strings" "github.com/conduitio-labs/conduit-connector-redshift/common" @@ -25,8 +26,11 @@ import ( // Config contains source-specific configurable values. type Config struct { common.Configuration - // OrderingColumn is a name of a column that the connector will use for ordering rows. - OrderingColumn string `json:"orderingColumn" validate:"required"` + // Tables is a list of table names to pull data from. + Tables []string `json:"tables" validate:"required"` + // OrderingColumns is a list of corresponding ordering columns for the table + // that the connector will use for ordering rows. + OrderingColumns []string `json:"orderingColumns" validate:"required"` // Snapshot is the configuration that determines whether the connector // will take a snapshot of the entire table before starting cdc mode. 
Snapshot bool `json:"snapshot" default:"true"` @@ -38,40 +42,41 @@ type Config struct { func (c *Config) Validate() error { // c.DSN has required validation handled in struct tag. - // c.Table required validation is handled in stuct tag - // handling "lowercase", "excludesall= " and "lte=127" validations. - if c.Table != strings.ToLower(c.Table) { - return common.NewLowercaseError(ConfigTable) - } - if strings.Contains(c.Table, " ") { - return common.NewExcludesSpacesError(ConfigTable) + // Ensure there is at least one table and one corresponding ordering column. + if len(c.Tables) == 0 || len(c.OrderingColumns) == 0 { + return common.NewNoTablesOrColumnsError() } - if len(c.Table) > common.MaxConfigStringLength { - return common.NewLessThanError(ConfigTable, common.MaxConfigStringLength) + + // Ensure that the number of tables and ordering columns match. + if len(c.Tables) != len(c.OrderingColumns) { + return common.NewMismatchedTablesAndColumnsError(len(c.Tables), len(c.OrderingColumns)) } - // c.KeyColumns handling "lowercase", "excludesall= " and "lte=127" validations. - for _, v := range c.KeyColumns { - if v != strings.ToLower(v) { - return common.NewLowercaseError(ConfigKeyColumns) + // c.Tables required validation is handled in stuct tag + // handling "lowercase", "excludesall= " and "lte=127" validations. 
+ for i, table := range c.Tables { + if table != strings.ToLower(table) { + return common.NewLowercaseError(fmt.Sprintf("table[%d]", i)) } - if strings.Contains(v, " ") { - return common.NewExcludesSpacesError(ConfigKeyColumns) + if strings.Contains(table, " ") { + return common.NewExcludesSpacesError(fmt.Sprintf("table[%d]", i)) } - if len(v) > common.MaxConfigStringLength { - return common.NewLessThanError(ConfigKeyColumns, common.MaxConfigStringLength) + if len(table) > common.MaxConfigStringLength { + return common.NewLessThanError(fmt.Sprintf("table[%d]", i), common.MaxConfigStringLength) } } // c.OrderingColumn handling "lowercase", "excludesall= " and "lte=127" validations. - if c.OrderingColumn != strings.ToLower(c.OrderingColumn) { - return common.NewLowercaseError(ConfigOrderingColumn) - } - if strings.Contains(c.OrderingColumn, " ") { - return common.NewExcludesSpacesError(ConfigOrderingColumn) - } - if len(c.OrderingColumn) > common.MaxConfigStringLength { - return common.NewLessThanError(ConfigOrderingColumn, common.MaxConfigStringLength) + for i, col := range c.OrderingColumns { + if col != strings.ToLower(col) { + return common.NewLowercaseError(fmt.Sprintf("orderingColumn[%d]", i)) + } + if strings.Contains(col, " ") { + return common.NewExcludesSpacesError(fmt.Sprintf("orderingColumn[%d]", i)) + } + if len(col) > common.MaxConfigStringLength { + return common.NewLessThanError(fmt.Sprintf("orderingColumn[%d]", i), common.MaxConfigStringLength) + } } // c.BatchSize handling "gte=1" and "lte=100000" validations. 
@@ -84,3 +89,13 @@ func (c *Config) Validate() error { return nil } + +func (c *Config) GetTableOrderingMap() map[string]string { + tableOrderingMap := make(map[string]string) + + for i := range c.Tables { + tableOrderingMap[c.Tables[i]] = c.OrderingColumns[i] + } + + return tableOrderingMap +} diff --git a/source/config/config_test.go b/source/config/config_test.go index 1fc311b..c92dd7e 100644 --- a/source/config/config_test.go +++ b/source/config/config_test.go @@ -23,7 +23,6 @@ import ( const ( testValueDSN = "postgres://username:password@endpoint:5439/database" - testValueTable = "test_table" testLongString = `this_is_a_very_long_string_which_exceeds_max_config_string_limit_ abcdefghijklmnopqrstuvwxyz_zyxwvutsrqponmlkjihgfedcba_xxxxxxxx` ) @@ -37,182 +36,133 @@ func TestValidateConfig(t *testing.T) { wantErr error }{ { - name: "success_keyColumns_has_one_key", + name: "success", in: &Config{ Configuration: common.Configuration{ - DSN: testValueDSN, - Table: testValueTable, - KeyColumns: []string{"id"}, + DSN: testValueDSN, }, - OrderingColumn: "id", - BatchSize: 1, + Tables: []string{"test_table1", "test_table2"}, + OrderingColumns: []string{"id1", "id2"}, + Snapshot: true, + BatchSize: 1000, }, wantErr: nil, }, { - name: "success_keyColumns_has_two_keys", + name: "failure_no_tables", in: &Config{ Configuration: common.Configuration{ - DSN: testValueDSN, - Table: testValueTable, - KeyColumns: []string{"id", "name"}, + DSN: testValueDSN, }, - OrderingColumn: "id", - BatchSize: 1, + OrderingColumns: []string{"id"}, + BatchSize: 1000, }, - wantErr: nil, + wantErr: common.NewNoTablesOrColumnsError(), }, { - name: "success_keyColumns_ends_with_space", + name: "failure_no_ordering_columns", in: &Config{ Configuration: common.Configuration{ - DSN: testValueDSN, - Table: testValueTable, - KeyColumns: []string{"id", "name "}, + DSN: testValueDSN, }, - OrderingColumn: "id", - BatchSize: 1, + Tables: []string{"test_table"}, + BatchSize: 1000, }, - wantErr: 
common.NewExcludesSpacesError(ConfigKeyColumns), + wantErr: common.NewNoTablesOrColumnsError(), }, { - name: "success_keyColumns_starts_with_space", + name: "failure_mismatched_tables_and_columns", in: &Config{ Configuration: common.Configuration{ - DSN: testValueDSN, - Table: testValueTable, - KeyColumns: []string{"id", "name "}, + DSN: testValueDSN, }, - OrderingColumn: "id", - BatchSize: 1, + Tables: []string{"test_table1", "test_table2"}, + OrderingColumns: []string{"id"}, + BatchSize: 1000, }, - wantErr: common.NewExcludesSpacesError(ConfigKeyColumns), - }, - { - name: "success_keyColumns_has_two_spaces", - in: &Config{ - Configuration: common.Configuration{ - DSN: testValueDSN, - Table: testValueTable, - KeyColumns: []string{"id", " name"}, - }, - OrderingColumn: "id", - BatchSize: 1, - }, - wantErr: common.NewExcludesSpacesError(ConfigKeyColumns), - }, - { - name: "failure_table_has_space", - in: &Config{ - Configuration: common.Configuration{ - DSN: testValueDSN, - Table: "test table", - }, - OrderingColumn: "id", - BatchSize: 1, - }, - wantErr: common.NewExcludesSpacesError(ConfigTable), + wantErr: common.NewMismatchedTablesAndColumnsError(2, 1), }, { name: "failure_table_has_uppercase_letter", in: &Config{ Configuration: common.Configuration{ - DSN: testValueDSN, - Table: "Test_table", - }, - OrderingColumn: "id", - BatchSize: 1, - }, - wantErr: common.NewLowercaseError(ConfigTable), - }, - { - name: "failure_keyColumns_has_uppercase_letter", - in: &Config{ - Configuration: common.Configuration{ - DSN: testValueDSN, - Table: testValueTable, - KeyColumns: []string{"ID"}, + DSN: testValueDSN, }, - OrderingColumn: "id", - BatchSize: 1, + Tables: []string{"Test_table"}, + OrderingColumns: []string{"id"}, + BatchSize: 1000, }, - wantErr: common.NewLowercaseError(ConfigKeyColumns), + wantErr: common.NewLowercaseError("table[0]"), }, { - name: "failure_keyColumns_exceeds_max_length", + name: "failure_table_has_space", in: &Config{ Configuration: 
common.Configuration{ - DSN: testValueDSN, - Table: testValueTable, - KeyColumns: []string{testLongString}, + DSN: testValueDSN, }, - OrderingColumn: "id", - BatchSize: 1, + Tables: []string{"test table"}, + OrderingColumns: []string{"id"}, + BatchSize: 1000, }, - wantErr: common.NewLessThanError(ConfigKeyColumns, common.MaxConfigStringLength), + wantErr: common.NewExcludesSpacesError("table[0]"), }, { name: "failure_table_exceeds_max_length", in: &Config{ Configuration: common.Configuration{ - DSN: testValueDSN, - Table: testLongString, - KeyColumns: []string{"id"}, + DSN: testValueDSN, }, - OrderingColumn: "id", - BatchSize: 1, + Tables: []string{testLongString}, + OrderingColumns: []string{"id"}, + BatchSize: 1000, }, - wantErr: common.NewLessThanError(ConfigTable, common.MaxConfigStringLength), + wantErr: common.NewLessThanError("table[0]", common.MaxConfigStringLength), }, { name: "failure_ordering_column_has_uppercase_letter", in: &Config{ Configuration: common.Configuration{ - DSN: testValueDSN, - Table: testValueTable, - KeyColumns: []string{"id"}, + DSN: testValueDSN, }, - OrderingColumn: "ID", - BatchSize: 1, + Tables: []string{"test_table"}, + OrderingColumns: []string{"ID"}, + BatchSize: 1000, }, - wantErr: common.NewLowercaseError(ConfigOrderingColumn), + wantErr: common.NewLowercaseError("orderingColumn[0]"), }, { name: "failure_ordering_column_has_space", in: &Config{ Configuration: common.Configuration{ - DSN: testValueDSN, - Table: testValueTable, - KeyColumns: []string{"id"}, + DSN: testValueDSN, }, - OrderingColumn: " id", - BatchSize: 1, + Tables: []string{"test_table"}, + OrderingColumns: []string{" id"}, + BatchSize: 1000, }, - wantErr: common.NewExcludesSpacesError(ConfigOrderingColumn), + wantErr: common.NewExcludesSpacesError("orderingColumn[0]"), }, { name: "failure_ordering_column_exceeds_max_length", in: &Config{ Configuration: common.Configuration{ - DSN: testValueDSN, - Table: testValueTable, - KeyColumns: []string{"id"}, + DSN: 
testValueDSN, }, - OrderingColumn: testLongString, - BatchSize: 1, + Tables: []string{"test_table"}, + OrderingColumns: []string{testLongString}, + BatchSize: 1000, }, - wantErr: common.NewLessThanError(ConfigOrderingColumn, common.MaxConfigStringLength), + wantErr: common.NewLessThanError("orderingColumn[0]", common.MaxConfigStringLength), }, { name: "failure_batch_size_less_than_min_value", in: &Config{ Configuration: common.Configuration{ - DSN: testValueDSN, - Table: testValueTable, - KeyColumns: []string{"id"}, + DSN: testValueDSN, }, - OrderingColumn: "id", - BatchSize: 0, + Tables: []string{"test_table"}, + OrderingColumns: []string{"id"}, + BatchSize: 0, }, wantErr: common.NewGreaterThanError(ConfigBatchSize, common.MinConfigBatchSize), }, @@ -220,12 +170,11 @@ func TestValidateConfig(t *testing.T) { name: "failure_batch_size_greater_than_max_value", in: &Config{ Configuration: common.Configuration{ - DSN: testValueDSN, - Table: testValueTable, - KeyColumns: []string{"id"}, + DSN: testValueDSN, }, - OrderingColumn: "id", - BatchSize: 100001, + Tables: []string{"test_table"}, + OrderingColumns: []string{"id"}, + BatchSize: 100001, }, wantErr: common.NewLessThanError(ConfigBatchSize, common.MaxConfigBatchSize), }, diff --git a/source/config/paramgen.go b/source/config/paramgen.go index a3d10fa..e6e8409 100644 --- a/source/config/paramgen.go +++ b/source/config/paramgen.go @@ -8,12 +8,11 @@ import ( ) const ( - ConfigBatchSize = "batchSize" - ConfigDsn = "dsn" - ConfigKeyColumns = "keyColumns" - ConfigOrderingColumn = "orderingColumn" - ConfigSnapshot = "snapshot" - ConfigTable = "table" + ConfigBatchSize = "batchSize" + ConfigDsn = "dsn" + ConfigOrderingColumns = "orderingColumns" + ConfigSnapshot = "snapshot" + ConfigTables = "tables" ) func (Config) Parameters() map[string]config.Parameter { @@ -32,15 +31,9 @@ func (Config) Parameters() map[string]config.Parameter { config.ValidationRequired{}, }, }, - ConfigKeyColumns: { + ConfigOrderingColumns: { Default: 
"", - Description: "KeyColumns is the configuration list of column names to build the opencdc.Record.Key (for Source).", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigOrderingColumn: { - Default: "", - Description: "OrderingColumn is a name of a column that the connector will use for ordering rows.", + Description: "OrderingColumns is a list of corresponding ordering columns for the table\nthat the connector will use for ordering rows.", Type: config.ParameterTypeString, Validations: []config.Validation{ config.ValidationRequired{}, @@ -52,9 +45,9 @@ func (Config) Parameters() map[string]config.Parameter { Type: config.ParameterTypeBool, Validations: []config.Validation{}, }, - ConfigTable: { + ConfigTables: { Default: "", - Description: "Table is the configuration of the table name.", + Description: "Tables is a list of table names.", Type: config.ParameterTypeString, Validations: []config.Validation{ config.ValidationRequired{}, diff --git a/source/iterator/iterator.go b/source/iterator/iterator.go index 34fc0de..3b142af 100644 --- a/source/iterator/iterator.go +++ b/source/iterator/iterator.go @@ -16,55 +16,45 @@ package iterator import ( "context" - "encoding/json" "fmt" - "net/url" - "time" + "sync" - "github.com/conduitio-labs/conduit-connector-redshift/columntypes" "github.com/conduitio-labs/conduit-connector-redshift/source/config" "github.com/conduitio/conduit-commons/opencdc" - sdk "github.com/conduitio/conduit-connector-sdk" - "github.com/huandu/go-sqlbuilder" "github.com/jmoiron/sqlx" - "go.uber.org/multierr" ) -const ( - // metadataFieldTable is a name of a record metadata field that stores a Redshift table name. - metadataFieldTable = "redshift.table" - // keySearchPath is a key of get parameter of a datatable's schema name. - keySearchPath = "search_path" - // pingTimeout is a database ping timeout. - pingTimeout = 10 * time.Second -) - -// Iterator is an implementation of an iterator for Amazon Redshift. 
type Iterator struct { db *sqlx.DB - rows *sqlx.Rows position *Position - // table is a table name - table string - // keyColumns are table column which the iterator will use to create a record's key - keyColumns []string - // orderingColumn is the name of the column that iterator will use for sorting data - orderingColumn string + // list of tables to fetch data from + tables []string + // orderingColumn is map of tables to its ordering column + orderingColumn map[string]string // batchSize is the size of a batch retrieved from Redshift batchSize int - // columnTypes is a mapping from column names to their respective types - columnTypes map[string]string + + // index of the current table being iterated over + currentTable int + // store individual workers for each table + workers []*Worker + + mu sync.Mutex } // New creates a new instance of the iterator. func New(ctx context.Context, driverName string, pos *Position, config config.Config) (*Iterator, error) { iterator := &Iterator{ - position: pos, - table: config.Table, - keyColumns: config.KeyColumns, - orderingColumn: config.OrderingColumn, + tables: config.Tables, + orderingColumn: config.GetTableOrderingMap(), batchSize: config.BatchSize, + currentTable: 0, + position: pos, + } + + if len(pos.TablePositions) == 0 { + iterator.position = &Position{TablePositions: make(map[string]TablePosition)} } err := iterator.openDB(ctx, driverName, config.DSN) @@ -72,44 +62,28 @@ func New(ctx context.Context, driverName string, pos *Position, config config.Co return nil, fmt.Errorf("open db: %w", err) } - if iterator.position.LastProcessedValue == nil { - latestSnapshotValue, latestValErr := iterator.latestSnapshotValue(ctx) - if latestValErr != nil { - return nil, fmt.Errorf("get latest snapshot value: %w", latestValErr) + for _, table := range iterator.tables { + position, ok := iterator.position.TablePositions[table] + if !ok { + position = TablePosition{ + LastProcessedValue: nil, + LatestSnapshotValue: nil, + } } - 
if config.Snapshot { - // set the LatestSnapshotValue to specify which record the snapshot iterator will work to - iterator.position.LatestSnapshotValue = latestSnapshotValue - } else { - // set the LastProcessedValue to skip a snapshot of the entire table - iterator.position.LastProcessedValue = latestSnapshotValue + worker, err := NewWorker(ctx, WorkerConfig{ + db: iterator.db, + position: position, + table: table, + orderingColumn: iterator.orderingColumn[table], + batchSize: iterator.batchSize, + snapshot: config.Snapshot, + dsn: config.DSN, + }, iterator) + if err != nil { + return nil, fmt.Errorf("create worker for table %s: %w", table, err) } - } - - uri, err := url.Parse(config.DSN) - if err != nil { - return nil, fmt.Errorf("parse dsn: %w", err) - } - - err = iterator.populateKeyColumns(ctx, uri.Query().Get(keySearchPath)) - if err != nil { - return nil, fmt.Errorf("populate key columns: %w", err) - } - - iterator.columnTypes, err = columntypes.GetColumnTypes( - ctx, - iterator.db, - iterator.table, - uri.Query().Get(keySearchPath), - ) - if err != nil { - return nil, fmt.Errorf("get column types: %w", err) - } - - err = iterator.loadRows(ctx) - if err != nil { - return nil, fmt.Errorf("load rows: %w", err) + iterator.workers = append(iterator.workers, worker) } return iterator, nil @@ -117,213 +91,63 @@ func New(ctx context.Context, driverName string, pos *Position, config config.Co // HasNext returns a bool indicating whether the source has the next record to return or not. 
func (iter *Iterator) HasNext(ctx context.Context) (bool, error) { - if iter.rows != nil && iter.rows.Next() { - return true, nil - } - - if err := iter.loadRows(ctx); err != nil { - return false, fmt.Errorf("load rows: %w", err) + if iter.currentTable == len(iter.tables) { + iter.currentTable = 0 } - if iter.rows.Next() { - return true, nil - } + for iter.currentTable < len(iter.tables) { + worker := iter.workers[iter.currentTable] - // At this point, there are no more rows to load - // so if we are in snapshot mode, we can switch to CDC - if iter.position.LatestSnapshotValue != nil { - // switch to CDC mode - iter.position.LastProcessedValue = iter.position.LatestSnapshotValue - iter.position.LatestSnapshotValue = nil + hasNext, err := worker.HasNext(ctx) + if err != nil { + return false, fmt.Errorf("error checking next for table %s: %w", worker.table, err) + } - // and load new rows - if err := iter.loadRows(ctx); err != nil { - return false, fmt.Errorf("load rows: %w", err) + if hasNext { + return true, nil } - return iter.rows.Next(), nil + iter.currentTable++ } return false, nil } // Next returns the next record. 
-func (iter *Iterator) Next(_ context.Context) (opencdc.Record, error) { - row := make(map[string]any) - if err := iter.rows.MapScan(row); err != nil { - return opencdc.Record{}, fmt.Errorf("scan rows: %w", err) - } - - transformedRow, err := columntypes.TransformRow(row, iter.columnTypes) - if err != nil { - return opencdc.Record{}, fmt.Errorf("transform row column types: %w", err) - } +func (iter *Iterator) Next(ctx context.Context) (opencdc.Record, error) { + if iter.currentTable < len(iter.tables) { + worker := iter.workers[iter.currentTable] - if _, ok := transformedRow[iter.orderingColumn]; !ok { - return opencdc.Record{}, fmt.Errorf("ordering column %q not found", iter.orderingColumn) - } - - key := make(opencdc.StructuredData) - for i := range iter.keyColumns { - val, ok := transformedRow[iter.keyColumns[i]] - if !ok { - return opencdc.Record{}, fmt.Errorf("key column %q not found", iter.keyColumns[i]) + record, err := worker.Next(ctx) + if err != nil { + return opencdc.Record{}, fmt.Errorf("error getting next record from table %s: %w", worker.table, err) } - key[iter.keyColumns[i]] = val + return record, nil } - rowBytes, err := json.Marshal(transformedRow) - if err != nil { - return opencdc.Record{}, fmt.Errorf("marshal row: %w", err) - } - - // set a new position into the variable, - // to avoid saving position into the struct until we marshal the position - position := *iter.position - // set the value from iter.orderingColumn column you chose - position.LastProcessedValue = transformedRow[iter.orderingColumn] - - sdkPosition, err := position.marshal() - if err != nil { - return opencdc.Record{}, fmt.Errorf("failed converting to SDK position :%w", err) - } - - iter.position = &position - - metadata := opencdc.Metadata{ - metadataFieldTable: iter.table, - } - metadata.SetCreatedAt(time.Now().UTC()) - - if position.LatestSnapshotValue != nil { - return sdk.Util.Source.NewRecordSnapshot(sdkPosition, metadata, key, opencdc.RawData(rowBytes)), nil - } - - 
return sdk.Util.Source.NewRecordCreate(sdkPosition, metadata, key, opencdc.RawData(rowBytes)), nil + return opencdc.Record{}, fmt.Errorf("no more records") } -// Stop stops iterators and closes database connection. +// Stop stops iterator and closes database connection. func (iter *Iterator) Stop() error { - var err error - - if iter.rows != nil { - err = iter.rows.Close() + for _, worker := range iter.workers { + err := worker.Stop() + if err != nil { + return fmt.Errorf("stop worker: %w", err) + } } if iter.db != nil { - err = multierr.Append(err, iter.db.Close()) - } - - if err != nil { - return fmt.Errorf("close db rows and db: %w", err) - } - - return nil -} - -// loadRows selects a batch of rows from a database, based on the -// table, columns, orderingColumn, batchSize and current position. -func (iter *Iterator) loadRows(ctx context.Context) error { - var err error - - sb := sqlbuilder.PostgreSQL.NewSelectBuilder(). - Select("*"). - From(iter.table). - OrderBy(iter.orderingColumn). - Limit(iter.batchSize) - - if iter.position.LastProcessedValue != nil { - sb.Where(sb.GreaterThan(iter.orderingColumn, iter.position.LastProcessedValue)) - } - - if iter.position.LatestSnapshotValue != nil { - sb.Where(sb.LessEqualThan(iter.orderingColumn, iter.position.LatestSnapshotValue)) - } - - query, args := sb.Build() - - iter.rows, err = iter.db.QueryxContext(ctx, query, args...) - if err != nil { - return fmt.Errorf("execute select data query %q, %v: %w", query, args, err) - } - - return nil -} - -// populateKeyColumns populates keyColumn from the database metadata -// or from the orderingColumn configuration field in the described order if it's empty. -func (iter *Iterator) populateKeyColumns(ctx context.Context, schema string) error { - if len(iter.keyColumns) != 0 { - return nil - } - - sb := sqlbuilder.PostgreSQL.NewSelectBuilder(). - Select("kcu.column_name"). - From("information_schema.table_constraints tco"). 
- Join( - "information_schema.key_column_usage kcu", - "kcu.constraint_name = tco.constraint_name "+ - "AND kcu.constraint_schema = tco.constraint_schema AND kcu.constraint_name = tco.constraint_name", - ). - Where("tco.constraint_type = 'PRIMARY KEY'") - - sb.Where(sb.Equal("kcu.table_name", iter.table)) - - if schema != "" { - sb.Where(sb.Equal("kcu.table_schema", schema)) - } - - query, args := sb.Build() - - rows, err := iter.db.QueryxContext(ctx, query, args...) - if err != nil { - return fmt.Errorf("execute query select primary keys %q, %v: %w", query, args, err) - } - defer rows.Close() - - columnName := "" - for rows.Next() { - if err = rows.Scan(&columnName); err != nil { - return fmt.Errorf("scan primary key column name: %w", err) + err := iter.db.Close() + if err != nil { + return fmt.Errorf("close db: %w", err) } - - iter.keyColumns = append(iter.keyColumns, columnName) - } - - if len(iter.keyColumns) == 0 { - iter.keyColumns = []string{iter.orderingColumn} } return nil } -// latestSnapshotValue returns most recent value of orderingColumn column. -func (iter *Iterator) latestSnapshotValue(ctx context.Context) (any, error) { - var latestSnapshotValue any - - query := sqlbuilder.PostgreSQL.NewSelectBuilder(). - Select(iter.orderingColumn). - From(iter.table). - OrderBy(iter.orderingColumn).Desc(). - Limit(1). 
- String() - - rows, err := iter.db.QueryxContext(ctx, query) - if err != nil { - return nil, fmt.Errorf("execute select latest snapshot value query %q: %w", query, err) - } - defer rows.Close() - - for rows.Next() { - if err = rows.Scan(&latestSnapshotValue); err != nil { - return nil, fmt.Errorf("scan latest snapshot value: %w", err) - } - } - - return latestSnapshotValue, nil -} - func (iter *Iterator) openDB(ctx context.Context, driverName string, dsn string) error { var err error iter.db, err = sqlx.Open(driverName, dsn) @@ -341,3 +165,19 @@ func (iter *Iterator) openDB(ctx context.Context, driverName string, dsn string) return nil } + +func (iter *Iterator) updateTablePosition(table string, newPosition TablePosition) { + iter.mu.Lock() + defer iter.mu.Unlock() + + iter.position.TablePositions[table] = newPosition +} + +func (iter *Iterator) getPosition() *Position { + iter.mu.Lock() + defer iter.mu.Unlock() + + position := iter.position + + return position +} diff --git a/source/iterator/position.go b/source/iterator/position.go index 74deae5..6deff81 100644 --- a/source/iterator/position.go +++ b/source/iterator/position.go @@ -23,6 +23,10 @@ import ( // Position represents Redshift's position. type Position struct { + TablePositions map[string]TablePosition `json:"tablePositions"` +} + +type TablePosition struct { // LastProcessedValue represents the last processed value from ordering column. LastProcessedValue any `json:"lastProcessedValue"` // LatestSnapshotValue represents the most recent value of ordering column. 
diff --git a/source/iterator/position_test.go b/source/iterator/position_test.go index 2321b73..b373fb2 100644 --- a/source/iterator/position_test.go +++ b/source/iterator/position_test.go @@ -37,50 +37,62 @@ func TestParseSDKPosition(t *testing.T) { wantPos: Position{}, }, { - name: "success_float64_fields", + name: "success_single_table", in: opencdc.Position(`{ - "lastProcessedValue":10, - "latestSnapshotValue":30 + "tablePositions": { + "table1": { + "lastProcessedValue": 10, + "latestSnapshotValue": 30 + } + } }`), wantPos: Position{ - LastProcessedValue: float64(10), - LatestSnapshotValue: float64(30), + TablePositions: map[string]TablePosition{ + "table1": { + LastProcessedValue: float64(10), + LatestSnapshotValue: float64(30), + }, + }, }, }, { - name: "success_string_fields", + name: "success_multiple_tables", in: opencdc.Position(`{ - "lastProcessedValue":"abc", - "latestSnapshotValue":"def" + "tablePositions": { + "table1": { + "lastProcessedValue": "abc", + "latestSnapshotValue": "def" + }, + "table2": { + "lastProcessedValue": 20, + "latestSnapshotValue": 40 + } + } }`), wantPos: Position{ - LastProcessedValue: "abc", - LatestSnapshotValue: "def", + TablePositions: map[string]TablePosition{ + "table1": { + LastProcessedValue: "abc", + LatestSnapshotValue: "def", + }, + "table2": { + LastProcessedValue: float64(20), + LatestSnapshotValue: float64(40), + }, + }, }, }, { - name: "success_lastProcessedValue_only", - in: opencdc.Position(`{ - "lastProcessedValue":10 - }`), - wantPos: Position{ - LastProcessedValue: float64(10), - }, - }, - { - name: "success_latestSnapshotValue_only", - in: opencdc.Position(`{ - "latestSnapshotValue":30 - }`), + name: "success_empty_table_positions", + in: opencdc.Position(`{"tablePositions": {}}`), wantPos: Position{ - LatestSnapshotValue: float64(30), + TablePositions: map[string]TablePosition{}, }, }, { - name: "failure_required_dsn_and_table", - in: opencdc.Position("invalid"), - wantErr: errors.New("unmarshal 
opencdc.Position into Position: " + - "invalid character 'i' looking for beginning of value"), + name: "failure_invalid_json", + in: opencdc.Position("invalid"), + wantErr: errors.New("unmarshal opencdc.Position into Position: invalid character 'i' looking for beginning of value"), }, } @@ -110,39 +122,37 @@ func TestMarshal(t *testing.T) { want opencdc.Position }{ { - name: "success_position_is_nil", - in: Position{}, - want: opencdc.Position(`{"lastProcessedValue":null,"latestSnapshotValue":null}`), - }, - { - name: "success_integer_fields", - in: Position{ - LastProcessedValue: 10, - LatestSnapshotValue: 30, - }, - want: opencdc.Position(`{"lastProcessedValue":10,"latestSnapshotValue":30}`), - }, - { - name: "success_string_fields", - in: Position{ - LastProcessedValue: "abc", - LatestSnapshotValue: "def", - }, - want: opencdc.Position(`{"lastProcessedValue":"abc","latestSnapshotValue":"def"}`), + name: "success_empty_table_positions", + in: Position{TablePositions: map[string]TablePosition{}}, + want: opencdc.Position(`{"tablePositions":{}}`), }, { - name: "success_lastProcessedValue_only", + name: "success_single_table", in: Position{ - LastProcessedValue: float64(10), + TablePositions: map[string]TablePosition{ + "table1": { + LastProcessedValue: 10, + LatestSnapshotValue: 30, + }, + }, }, - want: opencdc.Position(`{"lastProcessedValue":10,"latestSnapshotValue":null}`), + want: opencdc.Position(`{"tablePositions":{"table1":{"lastProcessedValue":10,"latestSnapshotValue":30}}}`), }, { - name: "success_latestSnapshotValue_only", + name: "success_multiple_tables", in: Position{ - LatestSnapshotValue: float64(30), + TablePositions: map[string]TablePosition{ + "table1": { + LastProcessedValue: "abc", + LatestSnapshotValue: "def", + }, + "table2": { + LastProcessedValue: 20, + LatestSnapshotValue: 40, + }, + }, }, - want: opencdc.Position(`{"lastProcessedValue":null,"latestSnapshotValue":30}`), + want: 
opencdc.Position(`{"tablePositions":{"table1":{"lastProcessedValue":"abc","latestSnapshotValue":"def"},"table2":{"lastProcessedValue":20,"latestSnapshotValue":40}}}`), }, } for _, tt := range tests { diff --git a/source/iterator/worker.go b/source/iterator/worker.go new file mode 100644 index 0000000..bee1798 --- /dev/null +++ b/source/iterator/worker.go @@ -0,0 +1,351 @@ +// Copyright © 2022 Meroxa, Inc. & Yalantis +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package iterator + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "time" + + "github.com/conduitio-labs/conduit-connector-redshift/columntypes" + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/huandu/go-sqlbuilder" + "github.com/jmoiron/sqlx" +) + +const ( + // metadataFieldTable is a name of a record metadata field that stores a Redshift table name. + metadataFieldTable = "opencdc.collection" + // keySearchPath is a key of get parameter of a datatable's schema name. + keySearchPath = "search_path" + // pingTimeout is a database ping timeout. 
+ pingTimeout = 10 * time.Second +) + +type Worker struct { + db *sqlx.DB + rows *sqlx.Rows + position TablePosition + + // table is a table name + table string + // keyColumns are table column which the worker will use to create a record's key + keyColumns []string + // orderingColumn is the name of the column that worker will use for sorting data + orderingColumn string + // batchSize is the size of a batch retrieved from Redshift + batchSize int + // columnTypes is a mapping from column names to their respective types + columnTypes map[string]string + + // iterator is an instance of the iterator + iterator *Iterator +} + +type WorkerConfig struct { + db *sqlx.DB + position TablePosition + + table string + orderingColumn string + batchSize int + // snapshot is the configuration that determines whether the connector + // will take a snapshot of the entire table before starting cdc mode. + snapshot bool + dsn string +} + +// NewWorker creates a new instance of the worker. +func NewWorker(ctx context.Context, config WorkerConfig, iterator *Iterator) (*Worker, error) { + worker := &Worker{ + db: config.db, + position: config.position, + table: config.table, + keyColumns: []string{}, + orderingColumn: config.orderingColumn, + batchSize: config.batchSize, + iterator: iterator, + } + + if worker.position.LastProcessedValue == nil { + latestSnapshotValue, latestValErr := worker.latestSnapshotValue(ctx) + if latestValErr != nil { + return nil, fmt.Errorf("get latest snapshot value: %w", latestValErr) + } + + if config.snapshot { + // set the LatestSnapshotValue to specify which record the snapshot worker will work to + worker.position.LatestSnapshotValue = latestSnapshotValue + } else { + // set the LastProcessedValue to skip a snapshot of the entire table + worker.position.LastProcessedValue = latestSnapshotValue + } + } + + uri, err := url.Parse(config.dsn) + if err != nil { + return nil, fmt.Errorf("parse dsn: %w", err) + } + + err = worker.populateKeyColumns(ctx, 
uri.Query().Get(keySearchPath)) + if err != nil { + return nil, fmt.Errorf("populate key columns: %w", err) + } + + worker.columnTypes, err = columntypes.GetColumnTypes( + ctx, + worker.db, + worker.table, + uri.Query().Get(keySearchPath), + ) + if err != nil { + return nil, fmt.Errorf("get column types: %w", err) + } + + err = worker.loadRows(ctx) + if err != nil { + return nil, fmt.Errorf("load rows: %w", err) + } + + return worker, nil +} + +// HasNext returns a bool indicating whether the source has the next record to return or not. +func (worker *Worker) HasNext(ctx context.Context) (bool, error) { + if worker.rows != nil && worker.rows.Next() { + return true, nil + } + + if err := worker.loadRows(ctx); err != nil { + return false, fmt.Errorf("load rows: %w", err) + } + + if worker.rows.Next() { + return true, nil + } + + // At this point, there are no more rows to load + // so if we are in snapshot mode, we can switch to CDC + if worker.position.LatestSnapshotValue != nil { + // switch to CDC mode + worker.position.LastProcessedValue = worker.position.LatestSnapshotValue + worker.position.LatestSnapshotValue = nil + + worker.iterator.updateTablePosition(worker.table, worker.position) + + // and load new rows + if err := worker.loadRows(ctx); err != nil { + return false, fmt.Errorf("load rows: %w", err) + } + + return worker.rows.Next(), nil + } + + return false, nil +} + +// Next returns the next record. 
+func (worker *Worker) Next(_ context.Context) (opencdc.Record, error) { + row := make(map[string]any) + if err := worker.rows.MapScan(row); err != nil { + return opencdc.Record{}, fmt.Errorf("scan rows: %w", err) + } + + transformedRow, err := columntypes.TransformRow(row, worker.columnTypes) + if err != nil { + return opencdc.Record{}, fmt.Errorf("transform row column types: %w", err) + } + + if _, ok := transformedRow[worker.orderingColumn]; !ok { + return opencdc.Record{}, fmt.Errorf("ordering column %q not found", worker.orderingColumn) + } + + key := make(opencdc.StructuredData) + for i := range worker.keyColumns { + val, ok := transformedRow[worker.keyColumns[i]] + if !ok { + return opencdc.Record{}, fmt.Errorf("key column %q not found", worker.keyColumns[i]) + } + + key[worker.keyColumns[i]] = val + } + + rowBytes, err := json.Marshal(transformedRow) + if err != nil { + return opencdc.Record{}, fmt.Errorf("marshal row: %w", err) + } + + // set a new position into the variable, + // to avoid saving position into the struct until we marshal the position + position := worker.iterator.getPosition() + + tablePos := worker.getTablePosition(position, transformedRow) + + // set the value from iter.orderingColumn column you chose + position.TablePositions[worker.table] = tablePos + + sdkPosition, err := position.marshal() + if err != nil { + return opencdc.Record{}, fmt.Errorf("failed converting to SDK position :%w", err) + } + + worker.position = position.TablePositions[worker.table] + worker.iterator.updateTablePosition(worker.table, tablePos) + + metadata := opencdc.Metadata{ + metadataFieldTable: worker.table, + } + metadata.SetCreatedAt(time.Now().UTC()) + + if worker.position.LatestSnapshotValue != nil { + return sdk.Util.Source.NewRecordSnapshot(sdkPosition, metadata, key, opencdc.RawData(rowBytes)), nil + } + + return sdk.Util.Source.NewRecordCreate(sdkPosition, metadata, key, opencdc.RawData(rowBytes)), nil +} + +// Stop stops worker. 
+func (worker *Worker) Stop() error { + var err error + + if worker.rows != nil { + err = worker.rows.Close() + } + + if err != nil { + return fmt.Errorf("close db rows: %w", err) + } + + return nil +} + +// loadRows selects a batch of rows from a database, based on the +// table, columns, orderingColumn, batchSize and current position. +func (worker *Worker) loadRows(ctx context.Context) error { + var err error + + sb := sqlbuilder.PostgreSQL.NewSelectBuilder(). + Select("*"). + From(worker.table). + OrderBy(worker.orderingColumn). + Limit(worker.batchSize) + + if worker.position.LastProcessedValue != nil { + sb.Where(sb.GreaterThan(worker.orderingColumn, worker.position.LastProcessedValue)) + } + + if worker.position.LatestSnapshotValue != nil { + sb.Where(sb.LessEqualThan(worker.orderingColumn, worker.position.LatestSnapshotValue)) + } + + query, args := sb.Build() + + worker.rows, err = worker.db.QueryxContext(ctx, query, args...) + if err != nil { + return fmt.Errorf("execute select data query %q, %v: %w", query, args, err) + } + + return nil +} + +// populateKeyColumns populates keyColumn from the database metadata +// or from the orderingColumn configuration field in the described order if it's empty. +func (worker *Worker) populateKeyColumns(ctx context.Context, schema string) error { + if len(worker.keyColumns) != 0 { + return nil + } + + sb := sqlbuilder.PostgreSQL.NewSelectBuilder(). + Select("kcu.column_name"). + From("information_schema.table_constraints tco"). + Join( + "information_schema.key_column_usage kcu", + "kcu.constraint_name = tco.constraint_name "+ + "AND kcu.constraint_schema = tco.constraint_schema AND kcu.constraint_name = tco.constraint_name", + ). + Where("tco.constraint_type = 'PRIMARY KEY'") + + sb.Where(sb.Equal("kcu.table_name", worker.table)) + + if schema != "" { + sb.Where(sb.Equal("kcu.table_schema", schema)) + } + + query, args := sb.Build() + + rows, err := worker.db.QueryxContext(ctx, query, args...) 
+ if err != nil { + return fmt.Errorf("execute query select primary keys %q, %v: %w", query, args, err) + } + defer rows.Close() + + columnName := "" + for rows.Next() { + if err = rows.Scan(&columnName); err != nil { + return fmt.Errorf("scan primary key column name: %w", err) + } + + worker.keyColumns = append(worker.keyColumns, columnName) + } + + if len(worker.keyColumns) == 0 { + worker.keyColumns = []string{worker.orderingColumn} + } + + return nil +} + +// latestSnapshotValue returns most recent value of orderingColumn column. +func (worker *Worker) latestSnapshotValue(ctx context.Context) (any, error) { + var latestSnapshotValue any + + query := sqlbuilder.PostgreSQL.NewSelectBuilder(). + Select(worker.orderingColumn). + From(worker.table). + OrderBy(worker.orderingColumn).Desc(). + Limit(1). + String() + + rows, err := worker.db.QueryxContext(ctx, query) + if err != nil { + return nil, fmt.Errorf("execute select latest snapshot value query %q: %w", query, err) + } + defer rows.Close() + + for rows.Next() { + if err = rows.Scan(&latestSnapshotValue); err != nil { + return nil, fmt.Errorf("scan latest snapshot value: %w", err) + } + } + + return latestSnapshotValue, nil +} + +func (worker *Worker) getTablePosition(position *Position, transformedRow map[string]any) TablePosition { + tablePos, ok := position.TablePositions[worker.table] + if !ok { + tablePos = TablePosition{ + LastProcessedValue: transformedRow[worker.orderingColumn], + LatestSnapshotValue: worker.position.LatestSnapshotValue, + } + } else { + tablePos.LastProcessedValue = transformedRow[worker.orderingColumn] + } + + return tablePos +} diff --git a/source/source_integration_test.go b/source/source_integration_test.go index fe21c76..7d4a66e 100644 --- a/source/source_integration_test.go +++ b/source/source_integration_test.go @@ -88,11 +88,11 @@ func TestSource_Read_tableHasNoData(t *testing.T) { err = db.PingContext(ctxTimeout) is.NoErr(err) - _, err = db.Exec(fmt.Sprintf("CREATE TABLE %s 
(col INTEGER, PRIMARY KEY (col));", cfg[config.ConfigTable])) + _, err = db.Exec(fmt.Sprintf("CREATE TABLE %s (col INTEGER, PRIMARY KEY (col));", cfg[config.ConfigTables])) is.NoErr(err) defer func() { - _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTable])) + _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTables])) is.NoErr(err) }() @@ -132,15 +132,15 @@ func TestSource_Read_keyColumnsFromConfig(t *testing.T) { err = db.PingContext(ctxTimeout) is.NoErr(err) - _, err = db.Exec(fmt.Sprintf("CREATE TABLE %s (col1 INTEGER, col2 INTEGER);", cfg[config.ConfigTable])) + _, err = db.Exec(fmt.Sprintf("CREATE TABLE %s (col1 INTEGER, col2 INTEGER);", cfg[config.ConfigTables])) is.NoErr(err) defer func() { - _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTable])) + _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTables])) is.NoErr(err) }() - _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2);", cfg[config.ConfigTable])) + _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2);", cfg[config.ConfigTables])) is.NoErr(err) src := NewSource() @@ -185,15 +185,15 @@ func TestSource_Read_keyColumnsFromTableMetadata(t *testing.T) { _, err = db.Exec(fmt.Sprintf( "CREATE TABLE %s (col1 INTEGER, col2 INTEGER, col3 INTEGER, PRIMARY KEY (col1, col2, col3));", - cfg[config.ConfigTable])) + cfg[config.ConfigTables])) is.NoErr(err) defer func() { - _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTable])) + _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTables])) is.NoErr(err) }() - _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2, 3);", cfg[config.ConfigTable])) + _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2, 3);", cfg[config.ConfigTables])) is.NoErr(err) src := NewSource() @@ -237,15 +237,15 @@ func TestSource_Read_keyColumnsFromOrderingColumn(t *testing.T) { err = db.PingContext(ctxTimeout) is.NoErr(err) - _, err = db.Exec(fmt.Sprintf("CREATE TABLE %s (col1 
INTEGER, col2 INTEGER);", cfg[config.ConfigTable])) + _, err = db.Exec(fmt.Sprintf("CREATE TABLE %s (col1 INTEGER, col2 INTEGER);", cfg[config.ConfigTables])) is.NoErr(err) defer func() { - _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTable])) + _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTables])) is.NoErr(err) }() - _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2);", cfg[config.ConfigTable])) + _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2);", cfg[config.ConfigTables])) is.NoErr(err) src := NewSource() @@ -331,11 +331,11 @@ func TestSource_Read_checkTypes(t *testing.T) { time_type time, time_tz_type timetz, varbyte_type varbyte - );`, cfg[config.ConfigTable])) + );`, cfg[config.ConfigTables])) is.NoErr(err) defer func() { - _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTable])) + _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTables])) is.NoErr(err) }() @@ -366,7 +366,7 @@ func TestSource_Read_checkTypes(t *testing.T) { } _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16);", - cfg[config.ConfigTable]), + cfg[config.ConfigTables]), want.SmallIntType, want.IntegerType, want.BigIntType, @@ -446,16 +446,16 @@ func TestSource_Read_snapshotIsFalse(t *testing.T) { err = db.PingContext(ctxTimeout) is.NoErr(err) - _, err = db.Exec(fmt.Sprintf("CREATE TABLE %s (col1 INTEGER, col2 INTEGER);", cfg[config.ConfigTable])) + _, err = db.Exec(fmt.Sprintf("CREATE TABLE %s (col1 INTEGER, col2 INTEGER);", cfg[config.ConfigTables])) is.NoErr(err) defer func() { - _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTable])) + _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTables])) is.NoErr(err) }() // insert a row to be sure that this data will not be transferred to the destination - _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2);", cfg[config.ConfigTable])) + _, err = 
db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2);", cfg[config.ConfigTables])) is.NoErr(err) src := NewSource() @@ -470,7 +470,7 @@ func TestSource_Read_snapshotIsFalse(t *testing.T) { is.Equal(err, sdk.ErrBackoffRetry) // insert an additional row - _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (3, 4);", cfg[config.ConfigTable])) + _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (3, 4);", cfg[config.ConfigTables])) is.NoErr(err) record, err := src.Read(ctx) @@ -491,7 +491,7 @@ func TestSource_Read_snapshotIsFalse(t *testing.T) { // prepareConfig retrieves the value of the environment variable named by envNameDSN, // generates a name of database's table and returns a configuration map. -func prepareConfig(t *testing.T, orderingColumn string, keyColumns ...string) map[string]string { +func prepareConfig(t *testing.T, orderingColumn string, _ ...string) map[string]string { t.Helper() dsn := os.Getenv(envNameDSN) @@ -502,9 +502,8 @@ func prepareConfig(t *testing.T, orderingColumn string, keyColumns ...string) ma } return map[string]string{ - config.ConfigDsn: dsn, - config.ConfigTable: fmt.Sprintf("conduit_src_test_%d", time.Now().UnixNano()), - config.ConfigOrderingColumn: orderingColumn, - config.ConfigKeyColumns: strings.Join(keyColumns, ","), + config.ConfigDsn: dsn, + config.ConfigTables: fmt.Sprintf("conduit_src_test_%d", time.Now().UnixNano()), + config.ConfigOrderingColumns: orderingColumn, } } diff --git a/source/source_test.go b/source/source_test.go index b175110..5ddfb5d 100644 --- a/source/source_test.go +++ b/source/source_test.go @@ -40,19 +40,19 @@ func TestSource_Configure_requiredFieldsSuccess(t *testing.T) { s := Source{} err := s.Configure(context.Background(), map[string]string{ - config.ConfigDsn: testDSN, - config.ConfigTable: testTable, - config.ConfigOrderingColumn: "created_at", + config.ConfigDsn: testDSN, + config.ConfigTables: testTable, + config.ConfigOrderingColumns: "created_at", }) is.NoErr(err) is.Equal(s.config, 
config.Config{ Configuration: common.Configuration{ - DSN: testDSN, - Table: testTable, + DSN: testDSN, }, - OrderingColumn: "created_at", - Snapshot: true, - BatchSize: 1000, + Tables: []string{testTable}, + OrderingColumns: []string{"created_at"}, + Snapshot: true, + BatchSize: 1000, }) } @@ -64,23 +64,21 @@ func TestSource_Configure_allFieldsSuccess(t *testing.T) { s := Source{} err := s.Configure(context.Background(), map[string]string{ - config.ConfigDsn: testDSN, - config.ConfigTable: testTable, - config.ConfigOrderingColumn: "created_at", - config.ConfigSnapshot: "false", - config.ConfigKeyColumns: "id,name", - config.ConfigBatchSize: "10000", + config.ConfigDsn: testDSN, + config.ConfigTables: testTable, + config.ConfigOrderingColumns: "created_at", + config.ConfigSnapshot: "false", + config.ConfigBatchSize: "10000", }) is.NoErr(err) is.Equal(s.config, config.Config{ Configuration: common.Configuration{ - DSN: testDSN, - Table: testTable, - KeyColumns: []string{"id", "name"}, + DSN: testDSN, }, - OrderingColumn: "created_at", - Snapshot: false, - BatchSize: 10000, + Tables: []string{testTable}, + OrderingColumns: []string{"created_at"}, + Snapshot: false, + BatchSize: 10000, }) } @@ -92,11 +90,11 @@ func TestSource_Configure_failure(t *testing.T) { s := Source{} err := s.Configure(context.Background(), map[string]string{ - config.ConfigDsn: testDSN, - config.ConfigTable: testTable, + config.ConfigDsn: testDSN, + config.ConfigTables: testTable, }) is.True(err != nil) - is.Equal(err.Error(), `config invalid: error validating "orderingColumn": required parameter is not provided`) + is.Equal(err.Error(), `config invalid: error validating "orderingColumns": required parameter is not provided`) } func TestSource_Read_success(t *testing.T) { From 422aa47363bf2f61c513cebe33a7c4b9d6a4dbde Mon Sep 17 00:00:00 2001 From: Gaurav Sahil Date: Tue, 8 Oct 2024 21:16:21 +0530 Subject: [PATCH 2/4] updated source paramgen --- source/config/paramgen.go | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/source/config/paramgen.go b/source/config/paramgen.go index e6e8409..a5c4f70 100644 --- a/source/config/paramgen.go +++ b/source/config/paramgen.go @@ -47,7 +47,7 @@ func (Config) Parameters() map[string]config.Parameter { }, ConfigTables: { Default: "", - Description: "Tables is a list of table names.", + Description: "Tables is a list of table names to pull data from.", Type: config.ParameterTypeString, Validations: []config.Validation{ config.ValidationRequired{}, From b86475d1480ad54facc73b7b64b0203ff26e3d87 Mon Sep 17 00:00:00 2001 From: Gaurav Sahil Date: Mon, 21 Oct 2024 17:32:01 +0530 Subject: [PATCH 3/4] fix: pr comments --- README.md | 52 +++- acceptance_test.go | 10 +- common/configuration.go | 2 + common/error.go | 23 -- destination/config/config.go | 79 +++++- destination/config/config_test.go | 120 +++++++- destination/config/paramgen.go | 15 +- destination/destination.go | 2 + destination/destination_integration_test.go | 6 + destination/writer/writer.go | 122 ++++++--- go.mod | 2 +- source/config/config.go | 141 +++++++--- source/config/config_test.go | 286 +++++++++++++++----- source/config/paramgen.go | 52 ++-- source/iterator/iterator.go | 32 +-- source/iterator/worker.go | 115 ++++---- source/source.go | 2 + source/source_integration_test.go | 45 +-- source/source_test.go | 51 ++-- 19 files changed, 814 insertions(+), 343 deletions(-) diff --git a/README.md b/README.md index d56dc33..e5a5c1c 100644 --- a/README.md +++ b/README.md @@ -21,15 +21,15 @@ to the environment variables as an `REDSHIFT_DSN`. ## Source -Conduit's Redshift source connector allows you to move data from multiple Redshift tables with the specified `dsn` and `tables` -configuration parameters. Upon starting, the source connector takes a snapshot the tables in the database, then +Conduit's Redshift source connector allows you to move data from Redshift tables with the specified `dsn` and `tables` +configuration parameters. 
Upon starting, the source connector takes a snapshot of given tables in the database, then switches into change data capture (CDC) mode. In CDC mode, the connector will only detect new rows. ### Snapshots -At the first launch of the connector, the snapshot mode is enabled and the last value of the `orderingColumn` corresponding to a table is stored +At the first launch of the connector, the snapshot mode is enabled and the last value of the `orderingColumn` is stored to the position, to know the boundary of this mode. The connector reads all rows of a table in batches, using a -keyset pagination, limiting the rows by `batchSize` and ordering by `orderingColumn` and then does the same for other tables. The connector stores the +keyset pagination, limiting the rows by `batchSize` and ordering by `orderingColumn`. The connector stores the last processed element value of an `orderingColumn` in a position, so the snapshot process can be paused and resumed without losing data. Once all rows in that initial snapshot are read the connector switches into CDC mode. @@ -42,19 +42,52 @@ pagination, limiting by `batchSize` and ordering by `orderingColumn`. ### Configuration +> [!IMPORTANT] +> Parameters starting with `tables.*` are used to configure the orderingColumn and +> keyColumns for a specific table. The `*` in the parameter name should be +> replaced with the table name. + | name | description | required | example | default value | |------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------------------------------------------------------|---------------| | `dsn` | [DSN](https://en.wikipedia.org/wiki/Data_source_name) to connect to Redshift. | **true** | `postgres://username:password@endpoint:5439/database` | | -| `tables` | Name of the tables from which the connector reads from. 
| **true** | `table1,table2` | | -| `orderingColumns` | Corresponding Columns for the tables used to order the rows.
Keep in mind that the data will be sorted by this column, so the column must contain unique, consistent values suitable for sorting. | **true** | `id,created_at` | | +| `tables.*.orderingColumn` | Column used to order the rows.
Keep in mind that the data will be sorted by this column, so the column must contain unique, consistent values suitable for sorting. | true | | +| `tables.*.keyColumns` | Comma-separated list of column names to build the sdk.Record.Key. See more: [Key handling](#key-handling). | false | | +| ~~`table`~~ | Name of the table from which the connector reads from. **Deprecated: use `tables` instead.** | **false** | `table_name` | | +| ~~`orderingColumn`~~ | Column used to order the rows.
Keep in mind that the data will be sorted by this column, so the column must contain unique, consistent values suitable for sorting. **Deprecated: use `tables.*.orderingColumn` instead.** | **false** | `id` | | | `snapshot` | Whether the connector will take a snapshot of the entire table before starting cdc mode. | false | `false` | "true" | +| ~~`keyColumns`~~ | Comma-separated list of column names to build the `sdk.Record.Key`. **Deprecated: use `tables.*.keyColumns` instead.** | false | `id,name` | | | `batchSize` | Size of rows batch. Min is 1 and max is 100000. | false | `100` | "1000" | +### Example + +#### Collections + +The following configuration reads records from `users` and `orders` table + +```yaml +version: 2.2 +pipelines: + - id: example + status: running + connectors: + - id: example + type: source + plugin: redshift + settings: + dsn: "sample_dsn" + # table "users" + tables.users.orderingColumn: "foo" + tables.users.keyColumns: "foo,bar" + # table "orders" + tables.orders.orderingColumn: "id" +``` + + ### Key handling The connector builds `sdk.Record.Key` as `sdk.StructuredData`. The keys of this field consist of elements of -the `keyColumns` configuration field. If `keyColumns` is empty, the connector uses the primary keys of the specified -table; otherwise, if the table has no primary keys, it uses the value of the `orderingColumn` field. The values +the `keyColumns` of a specific table. If `keyColumns` is empty, the connector uses the primary keys of the +table; otherwise, if the table has no primary keys, it uses the value of the `orderingColumn` field of that table. The values of `sdk.Record.Key` field are taken from `sdk.Payload.After` by the keys of this field. ### Table Name @@ -73,7 +106,8 @@ configuration parameters. 
It takes an `sdk.Record` and parses it into a valid SQ | name | description | required | example | default | |--------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------------------------------------------------------|---------| | `dsn` | [DSN](https://en.wikipedia.org/wiki/Data_source_name) to connect to Redshift. | **true** | `postgres://username:password@endpoint:5439/database` | "" | -| `table` | Name of the table the connector writes to. | **false** | `table_name` | "{{ index .Metadata "opencdc.collection" }}" | +| `table` | Name of the table the connector writes to. It can contain a Go template that will be executed for each record to determine the table. By default, the table is the value of the opencdc.collection metadata field. | **false** | `table_name` | {{ index .Metadata \"opencdc.collection\" }} | +| `keyColumns` | Comma-separated list of column names to build the where clause in case if `sdk.Record.Key` is empty.
See more: [Key handling](#key-handling-1). | false | `id,name` | "" | ### Key handling diff --git a/acceptance_test.go b/acceptance_test.go index 9facfbb..03a07b6 100644 --- a/acceptance_test.go +++ b/acceptance_test.go @@ -35,8 +35,6 @@ const ( driverName = "pgx" // envNameDSN is a Redshift dsn environment name. envNameDSN = "REDSHIFT_DSN" - // metadataFieldTable is a name of a record metadata field that stores a Redshift table name. - metadataFieldTable = "opencdc.collection" ) type driver struct { @@ -53,7 +51,7 @@ func (d *driver) GenerateRecord(_ *testing.T, operation opencdc.Operation) openc Position: nil, Operation: operation, Metadata: map[string]string{ - metadataFieldTable: d.Config.SourceConfig[srcConfig.ConfigTables], + opencdc.MetadataCollection: d.Config.SourceConfig[srcConfig.ConfigTable], }, Key: opencdc.StructuredData{ "col1": d.id, @@ -71,9 +69,9 @@ func TestAcceptance(t *testing.T) { } cfg := map[string]string{ - srcConfig.ConfigDsn: dsn, - srcConfig.ConfigTables: fmt.Sprintf("conduit_test_%d", time.Now().UnixNano()), - srcConfig.ConfigOrderingColumns: "col1", + srcConfig.ConfigDsn: dsn, + srcConfig.ConfigTable: fmt.Sprintf("conduit_test_%d", time.Now().UnixNano()), + srcConfig.ConfigOrderingColumn: "col1", } sdk.AcceptanceTest(t, &driver{ diff --git a/common/configuration.go b/common/configuration.go index 395fe28..5d1d5a1 100644 --- a/common/configuration.go +++ b/common/configuration.go @@ -18,6 +18,8 @@ const ( MaxConfigStringLength = 127 ConfigTable = "table" ConfigKeyColumns = "keyColumns" + MinConfigBatchSize = 1 + MaxConfigBatchSize = 100000 ) // Configuration contains common for source and destination configurable values. 
diff --git a/common/error.go b/common/error.go index aaff629..a7b16bc 100644 --- a/common/error.go +++ b/common/error.go @@ -67,26 +67,3 @@ func (e GreaterThanError) Error() string { func NewGreaterThanError(fieldName string, value int) GreaterThanError { return GreaterThanError{fieldName: fieldName, value: value} } - -type NoTablesOrColumnsError struct{} - -func (e NoTablesOrColumnsError) Error() string { - return "at least one table and one corresponding ordering column must be provided" -} - -func NewNoTablesOrColumnsError() NoTablesOrColumnsError { - return NoTablesOrColumnsError{} -} - -type MismatchedTablesAndColumnsError struct { - tablesCount int - orderingColumnsCount int -} - -func (e MismatchedTablesAndColumnsError) Error() string { - return fmt.Sprintf("the number of tables (%d) and ordering columns (%d) must match", e.tablesCount, e.orderingColumnsCount) -} - -func NewMismatchedTablesAndColumnsError(tablesCount, orderingColumnsCount int) MismatchedTablesAndColumnsError { - return MismatchedTablesAndColumnsError{tablesCount: tablesCount, orderingColumnsCount: orderingColumnsCount} -} diff --git a/destination/config/config.go b/destination/config/config.go index 82d9bd4..3674ea8 100644 --- a/destination/config/config.go +++ b/destination/config/config.go @@ -17,33 +17,96 @@ package config import ( + "bytes" + "fmt" "strings" + "text/template" + "github.com/Masterminds/sprig/v3" "github.com/conduitio-labs/conduit-connector-redshift/common" + "github.com/conduitio/conduit-commons/opencdc" ) +type TableFn func(opencdc.Record) (string, error) + // Config is a destination configuration needed to connect to Redshift database. type Config struct { common.Configuration + // Table is the configuration of the table name. + Table string `json:"table" default:"{{ index .Metadata \"opencdc.collection\" }}"` + // KeyColumns is the configuration of comma-separated column names to build the sdk.Record.Key. 
+ KeyColumns []string `json:"keyColumns"` +} - // Table is used as the target table into which records are inserted. - Table string `json:"table"` +// Init sets lowercase "table" name if not a template. +func (c Config) Init() Config { + if !c.isTableTemplate() { + c.Table = strings.ToLower(c.Table) + } + + return c } // Validate executes manual validations beyond what is defined in struct tags. -func (c *Config) Validate() error { +func (c Config) Validate() error { // c.DSN has required validation handled in struct tag - // handling "lowercase", "excludesall= " and "lte=127" validations for c.Table - if c.Table != strings.ToLower(c.Table) { - return common.NewLowercaseError(ConfigTable) - } - if strings.Contains(c.Table, " ") { + // handling "excludesall= " and "lte=127" validations for c.Table + if !c.isTableTemplate() && strings.Contains(c.Table, " ") { return common.NewExcludesSpacesError(ConfigTable) } if len(c.Table) > common.MaxConfigStringLength { return common.NewLessThanError(ConfigTable, common.MaxConfigStringLength) } + // c.KeyColumns handling "lowercase", "excludesall= " and "lte=127" validations + for _, v := range c.KeyColumns { + if v != strings.ToLower(v) { + return common.NewLowercaseError(ConfigKeyColumns) + } + if strings.Contains(v, " ") { + return common.NewExcludesSpacesError(ConfigKeyColumns) + } + if len(v) > common.MaxConfigStringLength { + return common.NewLessThanError(ConfigKeyColumns, common.MaxConfigStringLength) + } + } + return nil } + +// TableFunction returns a function that determines the table for each record individually. +// The function might be returning a static table name. +// If the table is neither static nor a template, an error is returned. +func (c Config) TableFunction() (f TableFn, err error) { + // Not a template, i.e. 
it's a static table name + if !c.isTableTemplate() { + return func(_ opencdc.Record) (string, error) { + return c.Table, nil + }, nil + } + + // Try to parse the table + t, err := template.New("table").Funcs(sprig.FuncMap()).Parse(c.Table) + if err != nil { + // The table is not a valid Go template. + return nil, fmt.Errorf("table is neither a valid static table nor a valid Go template: %w", err) + } + + // The table is a valid template, return TableFn. + var buf bytes.Buffer + + return func(r opencdc.Record) (string, error) { + buf.Reset() + if err := t.Execute(&buf, r); err != nil { + return "", fmt.Errorf("failed to execute table template: %w", err) + } + + return buf.String(), nil + }, nil +} + +// isTableTemplate returns true if "table" contains a template placeholder. +func (c Config) isTableTemplate() bool { + return strings.Contains(c.Table, "{{") && strings.Contains(c.Table, "}}") +} diff --git a/destination/config/config_test.go b/destination/config/config_test.go index e553f30..3e9b3c6 100644 --- a/destination/config/config_test.go +++ b/destination/config/config_test.go @@ -28,6 +28,48 @@ const ( abcdefghijklmnopqrstuvwxyz_zyxwvutsrqponmlkjihgfedcba_xxxxxxxx` ) +func TestConfig_Init(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + in Config + expected string + }{ + { + name: "table already lowercase", + in: Config{ + Table: testValueTable, + }, + expected: testValueTable, + }, + { + name: "table in uppercase", + in: Config{ + Table: "TEST_TABLE", + }, + expected: testValueTable, + }, + { + name: "table mixed case", + in: Config{ + Table: "Test_Table", + }, + expected: testValueTable, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + is := is.New(t) + result := tt.in.Init() + is.Equal(result.Table, tt.expected) + }) + } +} + func TestValidateConfig(t *testing.T) { t.Parallel() @@ -36,33 +78,101 @@ func TestValidateConfig(t *testing.T) { in *Config wantErr error }{ + { + name: 
"success_keyColumns_has_one_key", + in: &Config{ + Configuration: common.Configuration{ + DSN: testValueDSN, + }, + Table: testValueTable, + KeyColumns: []string{"id"}, + }, + wantErr: nil, + }, + { + name: "success_keyColumns_has_two_keys", + in: &Config{ + Configuration: common.Configuration{ + DSN: testValueDSN, + }, + Table: testValueTable, + KeyColumns: []string{"id", "name"}, + }, + wantErr: nil, + }, + { + name: "success_keyColumns_ends_with_space", + in: &Config{ + Configuration: common.Configuration{ + DSN: testValueDSN, + }, + Table: testValueTable, + KeyColumns: []string{"id", "name "}, + }, + wantErr: common.NewExcludesSpacesError(ConfigKeyColumns), + }, + { + name: "success_keyColumns_starts_with_space", + in: &Config{ + Configuration: common.Configuration{ + DSN: testValueDSN, + }, + Table: testValueTable, + KeyColumns: []string{"id", "name "}, + }, + wantErr: common.NewExcludesSpacesError(ConfigKeyColumns), + }, + { + name: "success_keyColumns_has_two_spaces", + in: &Config{ + Configuration: common.Configuration{ + DSN: testValueDSN, + }, + Table: testValueTable, + KeyColumns: []string{"id", " name"}, + }, + wantErr: common.NewExcludesSpacesError(ConfigKeyColumns), + }, { name: "failure_table_has_space", in: &Config{ - Table: "test_table ", Configuration: common.Configuration{ DSN: testValueDSN, }, + Table: "test table", }, wantErr: common.NewExcludesSpacesError(ConfigTable), }, { - name: "failure_table_has_uppercase_letter", + name: "failure_keyColumns_has_uppercase_letter", + in: &Config{ + Configuration: common.Configuration{ + DSN: testValueDSN, + }, + Table: testValueTable, + KeyColumns: []string{"ID"}, + }, + wantErr: common.NewLowercaseError(ConfigKeyColumns), + }, + { + name: "failure_keyColumns_exceeds_max_length", in: &Config{ - Table: "Test_table", Configuration: common.Configuration{ DSN: testValueDSN, }, + Table: testValueTable, + KeyColumns: []string{testLongString}, }, - wantErr: common.NewLowercaseError(ConfigTable), + wantErr: 
common.NewLessThanError(ConfigKeyColumns, common.MaxConfigStringLength), }, { name: "failure_table_exceeds_max_length", in: &Config{ - Table: testLongString, Configuration: common.Configuration{ DSN: testValueDSN, }, + Table: testLongString, + KeyColumns: []string{"id"}, }, wantErr: common.NewLessThanError(ConfigTable, common.MaxConfigStringLength), }, diff --git a/destination/config/paramgen.go b/destination/config/paramgen.go index 1546780..0f6613e 100644 --- a/destination/config/paramgen.go +++ b/destination/config/paramgen.go @@ -8,8 +8,9 @@ import ( ) const ( - ConfigDsn = "dsn" - ConfigTable = "table" + ConfigDsn = "dsn" + ConfigKeyColumns = "keyColumns" + ConfigTable = "table" ) func (Config) Parameters() map[string]config.Parameter { @@ -22,9 +23,15 @@ func (Config) Parameters() map[string]config.Parameter { config.ValidationRequired{}, }, }, - ConfigTable: { + ConfigKeyColumns: { Default: "", - Description: "Table is used as the target table into which records are inserted.", + Description: "KeyColumns is the configuration of comma-separated column names to build the sdk.Record.Key.", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, + }, + ConfigTable: { + Default: "{{ index .Metadata \"opencdc.collection\" }}", + Description: "Table is the configuration of the table name.", Type: config.ParameterTypeString, Validations: []config.Validation{}, }, diff --git a/destination/destination.go b/destination/destination.go index f5ba5d1..d29a6c1 100644 --- a/destination/destination.go +++ b/destination/destination.go @@ -66,6 +66,8 @@ func (d *Destination) Configure(ctx context.Context, cfg commonsConfig.Config) e return err //nolint: wrapcheck // not needed here } + d.config = d.config.Init() + err = d.config.Validate() if err != nil { return fmt.Errorf("error validating configuration: %w", err) diff --git a/destination/destination_integration_test.go b/destination/destination_integration_test.go index 810b927..9ff10f3 100644 --- 
a/destination/destination_integration_test.go +++ b/destination/destination_integration_test.go @@ -258,6 +258,9 @@ func TestDestination_Write_successKeyColumns(t *testing.T) { _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2);", cfg[config.ConfigTable])) is.NoErr(err) + // set a KeyColumns field to the config + cfg[config.ConfigKeyColumns] = "col1" + dest := NewDestination() err = dest.Configure(ctx, cfg) @@ -349,6 +352,9 @@ func TestDestination_Write_failedWrongKeyColumnsField(t *testing.T) { _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2);", cfg[config.ConfigTable])) is.NoErr(err) + // set a wrong KeyColumns field to the config + cfg[config.ConfigKeyColumns] = "wrong_column" + dest := NewDestination() err = dest.Configure(ctx, cfg) diff --git a/destination/writer/writer.go b/destination/writer/writer.go index 2e595c0..1b58670 100644 --- a/destination/writer/writer.go +++ b/destination/writer/writer.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "net/url" + "strings" "time" "github.com/conduitio-labs/conduit-connector-redshift/columntypes" @@ -28,9 +29,9 @@ import ( "github.com/jmoiron/sqlx" ) +type ColumnTypes map[string]string + const ( - // metadataFieldTable is a name of a record metadata field that stores a Redshift table name. - metadataFieldTable = "opencdc.collection" // keySearchPath is a key of get parameter of a datatable's schema name. keySearchPath = "search_path" // pingTimeout is a database ping timeout. @@ -39,18 +40,31 @@ const ( // Writer implements a writer logic for Redshift destination. type Writer struct { - db *sqlx.DB - table string - keyColumns []string - columnTypes map[string]string + db *sqlx.DB + // Schema name to write data into. + schema string + // Table name provided through config. + tableName string + // Key columns provided through config. + keyColumns []string + // Function to dynamically get table name for each record. + tableNameFunc config.TableFn + // Maps table names to their column types. 
+ columnTypes map[string]ColumnTypes } // NewWriter creates new instance of the Writer. func NewWriter(ctx context.Context, driverName string, config config.Config) (*Writer, error) { - var err error + tableFn, err := config.TableFunction() + if err != nil { + return nil, fmt.Errorf("invalid table name or table function: %w", err) + } writer := &Writer{ - table: config.Table, + tableName: config.Table, + keyColumns: config.KeyColumns, + tableNameFunc: tableFn, + columnTypes: make(map[string]ColumnTypes), } writer.db, err = sqlx.Open(driverName, config.DSN) @@ -71,17 +85,17 @@ func NewWriter(ctx context.Context, driverName string, config config.Config) (*W return nil, fmt.Errorf("parse dsn: %w", err) } - writer.columnTypes, err = columntypes.GetColumnTypes(ctx, writer.db, writer.table, u.Query().Get(keySearchPath)) - if err != nil { - return nil, fmt.Errorf("get column types: %w", err) - } + writer.schema = u.Query().Get(keySearchPath) return writer, nil } // Insert inserts a record. func (w *Writer) Insert(ctx context.Context, record opencdc.Record) error { - tableName := w.getTableName(record.Metadata) + table, err := w.tableNameFunc(record) + if err != nil { + return err + } payload, err := w.structurizeData(record.Payload.After) if err != nil { @@ -93,7 +107,12 @@ func (w *Writer) Insert(ctx context.Context, record opencdc.Record) error { return ErrNoPayload } - payload, err = columntypes.ConvertStructuredData(w.columnTypes, payload) + columnTypes, err := w.getColumnTypes(ctx, table) + if err != nil { + return fmt.Errorf("get column types: %w", err) + } + + payload, err = columntypes.ConvertStructuredData(columnTypes, payload) if err != nil { return fmt.Errorf("convert structure data: %w", err) } @@ -101,7 +120,7 @@ func (w *Writer) Insert(ctx context.Context, record opencdc.Record) error { columns, values := w.extractColumnsAndValues(payload) query, args := sqlbuilder.PostgreSQL.NewInsertBuilder(). - InsertInto(tableName). + InsertInto(table). Cols(columns...). 
Values(values...). Build() @@ -116,26 +135,24 @@ func (w *Writer) Insert(ctx context.Context, record opencdc.Record) error { // Update updates a record. func (w *Writer) Update(ctx context.Context, record opencdc.Record) error { - tableName := w.getTableName(record.Metadata) - - payload, err := w.structurizeData(record.Payload.After) + table, err := w.tableNameFunc(record) if err != nil { - return fmt.Errorf("structurize payload: %w", err) + return err } - // if payload is empty return empty payload error - if payload == nil { - return ErrNoPayload + payload, key, err := w.preparePayloadAndKey(record) + if err != nil { + return err } - payload, err = columntypes.ConvertStructuredData(w.columnTypes, payload) + columnTypes, err := w.getColumnTypes(ctx, table) if err != nil { - return fmt.Errorf("convert structure data: %w", err) + return err } - key, err := w.structurizeData(record.Key) + payload, err = columntypes.ConvertStructuredData(columnTypes, payload) if err != nil { - return fmt.Errorf("structurize key: %w", err) + return fmt.Errorf("convert structure data: %w", err) } key, err = w.populateKey(key, payload) @@ -156,7 +173,7 @@ func (w *Writer) Update(ctx context.Context, record opencdc.Record) error { columns, values := w.extractColumnsAndValues(payload) ub := sqlbuilder.PostgreSQL.NewUpdateBuilder(). - Update(tableName) + Update(table) assignments := make([]string, len(columns)) for i := range columns { @@ -180,7 +197,10 @@ func (w *Writer) Update(ctx context.Context, record opencdc.Record) error { // Delete deletes a record. func (w *Writer) Delete(ctx context.Context, record opencdc.Record) error { - tableName := w.getTableName(record.Metadata) + table, err := w.tableNameFunc(record) + if err != nil { + return err + } key, err := w.structurizeData(record.Key) if err != nil { @@ -193,7 +213,7 @@ func (w *Writer) Delete(ctx context.Context, record opencdc.Record) error { } db := sqlbuilder.PostgreSQL.NewDeleteBuilder(). 
- DeleteFrom(tableName) + DeleteFrom(table) for i := range keyColumns { db.Where(db.Equal(keyColumns[i], key[keyColumns[i]])) @@ -220,15 +240,22 @@ func (w *Writer) Stop() error { return nil } -// getTableName returns either the record metadata value for the table -// or the default configured value for the table. -func (w *Writer) getTableName(metadata map[string]string) string { - tableName, ok := metadata[metadataFieldTable] - if !ok { - return w.table +func (w *Writer) preparePayloadAndKey(record opencdc.Record) (map[string]interface{}, map[string]interface{}, error) { + payload, err := w.structurizeData(record.Payload.After) + if err != nil { + return nil, nil, fmt.Errorf("structurize payload: %w", err) + } + + if payload == nil { + return nil, nil, ErrNoPayload } - return tableName + key, err := w.structurizeData(record.Key) + if err != nil { + return nil, nil, fmt.Errorf("structurize key: %w", err) + } + + return payload, key, nil } // getKeyColumns returns either all the keys of the opencdc.Record's Key field. @@ -275,14 +302,18 @@ func (w *Writer) extractColumnsAndValues(payload opencdc.StructuredData) ([]stri return columns, values } -// populateKey populates the key from the payload by keyColumns keys if it's empty. +// populateKey populates the key from the payload by provided keyColumns keys if it's empty and a static table is provided. 
func (w *Writer) populateKey(key opencdc.StructuredData, payload opencdc.StructuredData) (opencdc.StructuredData, error) { if key != nil { return key, nil } - key = make(opencdc.StructuredData, len(w.keyColumns)) + // keyColumns not available for non-static tables + if strings.Contains(w.tableName, "{{") && strings.Contains(w.tableName, "}}") { + return nil, ErrNoKey + } + key = make(opencdc.StructuredData, len(w.keyColumns)) for i := range w.keyColumns { val, ok := payload[w.keyColumns[i]] if !ok { @@ -294,3 +325,18 @@ func (w *Writer) populateKey(key opencdc.StructuredData, payload opencdc.Structu return key, nil } + +// getColumnTypes returns column types from w.tableColumnTypes, or queries the database if not exists. +func (w *Writer) getColumnTypes(ctx context.Context, table string) (map[string]string, error) { + if columnTypes, ok := w.columnTypes[table]; ok { + return columnTypes, nil + } + + columnTypes, err := columntypes.GetColumnTypes(ctx, w.db, table, w.schema) + if err != nil { + return nil, fmt.Errorf("get column types: %w", err) + } + w.columnTypes[table] = columnTypes + + return columnTypes, nil +} diff --git a/go.mod b/go.mod index 67cac40..78c8f68 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.22.5 toolchain go1.22.7 require ( + github.com/Masterminds/sprig/v3 v3.3.0 github.com/conduitio/conduit-commons v0.3.0 github.com/conduitio/conduit-connector-sdk v0.10.1 github.com/golangci/golangci-lint v1.61.0 @@ -31,7 +32,6 @@ require ( github.com/GaijinEntertainment/go-exhaustruct/v3 v3.3.0 // indirect github.com/Masterminds/goutils v1.1.1 // indirect github.com/Masterminds/semver/v3 v3.3.0 // indirect - github.com/Masterminds/sprig/v3 v3.3.0 // indirect github.com/OpenPeeDeeP/depguard/v2 v2.2.0 // indirect github.com/alecthomas/go-check-sumtype v0.1.4 // indirect github.com/alexkohler/nakedret/v2 v2.0.4 // indirect diff --git a/source/config/config.go b/source/config/config.go index 254236d..585e632 100644 --- a/source/config/config.go +++ 
b/source/config/config.go @@ -26,69 +26,130 @@ import ( // Config contains source-specific configurable values. type Config struct { common.Configuration - // Tables is a list of table names to pull data from. - Tables []string `json:"tables" validate:"required"` - // OrderingColumns is a list of corresponding ordering columns for the table - // that the connector will use for ordering rows. - OrderingColumns []string `json:"orderingColumns" validate:"required"` + + // Deprecated: use `tables` instead + Table string `json:"table"` + // Deprecated: use `tables.*.keyColumns` instead + KeyColumns []string `json:"keyColumns"` + // Deprecated: use `tables.*.orderingColumn` instead + OrderingColumn string `json:"orderingColumn"` + + // Tables is the table names and their configs to pull data from + Tables map[string]TableConfig `json:"tables"` // Snapshot is the configuration that determines whether the connector // will take a snapshot of the entire table before starting cdc mode. Snapshot bool `json:"snapshot" default:"true"` // BatchSize is a size of rows batch. - BatchSize int `json:"batchSize" default:"1000" validate:"gt=0,lt=100001"` + BatchSize int `json:"batchSize" default:"1000"` +} + +type TableConfig struct { + // KeyColumns is the configuration list of column names to build the opencdc.Record.Key. + KeyColumns []string `json:"keyColumns"` + // OrderingColumn is a name of a column that the connector will use for ordering rows. + OrderingColumn string `json:"orderingColumn"` +} + +// Init sets the desired value on Tables while Table, OrderingColumn & KeyColumns are being deprecated +// and converts all the table names into lowercase. 
+func (c Config) Init() Config { + if c.Table != "" && c.OrderingColumn != "" && len(c.Tables) == 0 { + tables := make(map[string]TableConfig, 1) + + tables[c.Table] = TableConfig{ + KeyColumns: c.KeyColumns, + OrderingColumn: c.OrderingColumn, + } + + c.Tables = tables + c.Table = "" + } + + tables := make(map[string]TableConfig) + for tableName, tableConfig := range c.Tables { + tables[strings.ToLower(tableName)] = tableConfig + } + c.Tables = tables + + return c } // Validate executes manual validations beyond what is defined in struct tags. -func (c *Config) Validate() error { - // c.DSN has required validation handled in struct tag. +func (c Config) Validate() error { + // ensure that both `table` and `tables` are not provided + if len(c.Tables) > 0 && c.Table != "" { + return fmt.Errorf(`error validating "tables": cannot provide both "table" and "tables", use "tables" only`) + } - // Ensure there is at least one table and one corresponding ordering column. - if len(c.Tables) == 0 || len(c.OrderingColumns) == 0 { - return common.NewNoTablesOrColumnsError() + // ensure at least one table is configured. + if len(c.Tables) == 0 { + return fmt.Errorf(`error validating "tables": required parameter is not provided`) } - // Ensure that the number of tables and ordering columns match. - if len(c.Tables) != len(c.OrderingColumns) { - return common.NewMismatchedTablesAndColumnsError(len(c.Tables), len(c.OrderingColumns)) + // ensure `orderingColumn` is provided if `table` is being used. + if c.OrderingColumn == "" && c.Table != "" { + return fmt.Errorf(`error validating "orderingColumn": required if "table" is being used`) } - // c.Tables required validation is handled in stuct tag - // handling "lowercase", "excludesall= " and "lte=127" validations. - for i, table := range c.Tables { - if table != strings.ToLower(table) { - return common.NewLowercaseError(fmt.Sprintf("table[%d]", i)) + // validate each table and table config. 
+ for tableName, tableConfig := range c.Tables {
+ // handling "excludesall= " and "lte=127" validations for table name
+ if strings.Contains(tableName, " ") {
+ return common.NewExcludesSpacesError(ConfigTable)
 }
- if strings.Contains(table, " ") {
- return common.NewExcludesSpacesError(fmt.Sprintf("table[%d]", i))
+ if len(tableName) > common.MaxConfigStringLength {
+ return common.NewLessThanError(ConfigTable, common.MaxConfigStringLength)
 }
- if len(table) > common.MaxConfigStringLength {
- return common.NewLessThanError(fmt.Sprintf("table[%d]", i), common.MaxConfigStringLength)
+
+ // validate table config
+ if err := tableConfig.validate(); err != nil {
+ return err
 }
 }
 
- // c.OrderingColumns required validation is handled in stuct tag
- // handling "lowercase", "excludesall= " and "lte=127" validations.
- for i, col := range c.OrderingColumns {
- if col != strings.ToLower(col) {
- return common.NewLowercaseError(fmt.Sprintf("orderingColumn[%d]", i))
- }
- if strings.Contains(col, " ") {
- return common.NewExcludesSpacesError(fmt.Sprintf("orderingColumn[%d]", i))
- }
- if len(col) > common.MaxConfigStringLength {
- return common.NewLessThanError(fmt.Sprintf("orderingColumn[%d]", i), common.MaxConfigStringLength)
- }
+ // c.BatchSize handling "gte=1" and "lte=100000" validations.
+ if c.BatchSize < common.MinConfigBatchSize {
+ return common.NewGreaterThanError(ConfigBatchSize, common.MinConfigBatchSize)
+ }
+ if c.BatchSize > common.MaxConfigBatchSize {
+ return common.NewLessThanError(ConfigBatchSize, common.MaxConfigBatchSize)
 }
 
 return nil
 }
 
-func (c *Config) GetTableOrderingMap() map[string]string {
- tableOrderingMap := make(map[string]string)
+// Validate checks the individual table configuration.
+func (tc TableConfig) validate() error {
+ // tc.OrderingColumn required validation is handled in struct tag
+ // handling "required", "lowercase", "excludesall= " and "lte=127" validations. 
+ if tc.OrderingColumn == "" { + return fmt.Errorf(`error validating %s: required parameter is not provided`, ConfigOrderingColumn) + } + if tc.OrderingColumn != strings.ToLower(tc.OrderingColumn) { + return common.NewLowercaseError(ConfigOrderingColumn) + } + if strings.Contains(tc.OrderingColumn, " ") { + return common.NewExcludesSpacesError(ConfigOrderingColumn) + } + if len(tc.OrderingColumn) > common.MaxConfigStringLength { + return common.NewLessThanError(ConfigOrderingColumn, common.MaxConfigStringLength) + } - for i := range c.Tables { - tableOrderingMap[c.Tables[i]] = c.OrderingColumns[i] + if len(tc.KeyColumns) == 0 { + return nil + } + // handling "lowercase", "excludesall= " and "lte=127" validations on each key column from tc.KeyColumns + for _, keyColumn := range tc.KeyColumns { + if keyColumn != strings.ToLower(keyColumn) { + return common.NewLowercaseError(ConfigKeyColumns) + } + if strings.Contains(keyColumn, " ") { + return common.NewExcludesSpacesError(ConfigKeyColumns) + } + if len(keyColumn) > common.MaxConfigStringLength { + return common.NewLessThanError(ConfigKeyColumns, common.MaxConfigStringLength) + } } - return tableOrderingMap + return nil } diff --git a/source/config/config_test.go b/source/config/config_test.go index e092f8e..6f4e645 100644 --- a/source/config/config_test.go +++ b/source/config/config_test.go @@ -15,6 +15,7 @@ package config import ( + "fmt" "testing" "github.com/conduitio-labs/conduit-connector-redshift/common" @@ -23,6 +24,7 @@ import ( const ( testValueDSN = "postgres://username:password@endpoint:5439/database" + testValueTable = "test_table" testLongString = `this_is_a_very_long_string_which_exceeds_max_config_string_limit_ abcdefghijklmnopqrstuvwxyz_zyxwvutsrqponmlkjihgfedcba_xxxxxxxx` ) @@ -32,127 +34,201 @@ func TestValidateConfig(t *testing.T) { tests := []struct { name string - in *Config + in Config wantErr error }{ { - name: "success", - in: &Config{ + name: "success_single_table_config", + in: Config{ 
Configuration: common.Configuration{ DSN: testValueDSN, }, - Tables: []string{"test_table1", "test_table2"}, - OrderingColumns: []string{"id1", "id2"}, - Snapshot: true, - BatchSize: 1000, + Tables: map[string]TableConfig{ + "test_table": { + KeyColumns: []string{"id"}, + OrderingColumn: "id", + }, + }, + BatchSize: 1000, + Snapshot: true, + }, + }, + { + name: "success_multiple_tables_config", + in: Config{ + Configuration: common.Configuration{ + DSN: testValueDSN, + }, + Tables: map[string]TableConfig{ + "users": { + KeyColumns: []string{"id", "email"}, + OrderingColumn: "updated_at", + }, + "orders": { + KeyColumns: []string{"order_id"}, + OrderingColumn: "created_at", + }, + }, + BatchSize: 1000, + Snapshot: true, + }, + }, + { + name: "failure_no_tables_configured", + in: Config{ + Configuration: common.Configuration{ + DSN: testValueDSN, + }, + Tables: map[string]TableConfig{}, + Snapshot: true, + BatchSize: 1000, }, - wantErr: nil, + wantErr: fmt.Errorf(`error validating "tables": required parameter is not provided`), }, { - name: "failure_no_tables", - in: &Config{ + name: "failure_both_table_and_tables_provided", + in: Config{ Configuration: common.Configuration{ DSN: testValueDSN, }, - OrderingColumns: []string{"id"}, - BatchSize: 1000, + Table: "legacy_table", + Tables: map[string]TableConfig{ + "new_table": { + KeyColumns: []string{"id"}, + OrderingColumn: "updated_at", + }, + }, + OrderingColumn: "id", + BatchSize: 1000, }, - wantErr: common.NewNoTablesOrColumnsError(), + wantErr: fmt.Errorf(`error validating "tables": cannot provide both "table" and "tables", use "tables" only`), }, { - name: "failure_no_ordering_columns", - in: &Config{ + name: "failure_table_name_has_space", + in: Config{ Configuration: common.Configuration{ DSN: testValueDSN, }, - Tables: []string{"test_table"}, + Tables: map[string]TableConfig{ + "test table": { + KeyColumns: []string{"id"}, + OrderingColumn: "updated_at", + }, + }, BatchSize: 1000, }, - wantErr: 
common.NewNoTablesOrColumnsError(), + wantErr: common.NewExcludesSpacesError(ConfigTable), }, { - name: "failure_mismatched_tables_and_columns", - in: &Config{ + name: "failure_table_name_too_long", + in: Config{ Configuration: common.Configuration{ DSN: testValueDSN, }, - Tables: []string{"test_table1", "test_table2"}, - OrderingColumns: []string{"id"}, - BatchSize: 1000, + Tables: map[string]TableConfig{ + testLongString: { + KeyColumns: []string{"id"}, + OrderingColumn: "updated_at", + }, + }, + BatchSize: 1000, }, - wantErr: common.NewMismatchedTablesAndColumnsError(2, 1), + wantErr: common.NewLessThanError(ConfigTable, common.MaxConfigStringLength), }, { - name: "failure_table_has_uppercase_letter", - in: &Config{ + name: "failure_ordering_column_missing", + in: Config{ Configuration: common.Configuration{ DSN: testValueDSN, }, - Tables: []string{"Test_table"}, - OrderingColumns: []string{"id"}, - BatchSize: 1000, + Tables: map[string]TableConfig{ + "test_table": { + KeyColumns: []string{"id"}, + }, + }, + BatchSize: 1000, }, - wantErr: common.NewLowercaseError("table[0]"), + wantErr: fmt.Errorf(`error validating %s: required parameter is not provided`, ConfigOrderingColumn), }, { - name: "failure_table_has_space", - in: &Config{ + name: "failure_ordering_column_has_uppercase", + in: Config{ Configuration: common.Configuration{ DSN: testValueDSN, }, - Tables: []string{"test table"}, - OrderingColumns: []string{"id"}, - BatchSize: 1000, + Tables: map[string]TableConfig{ + "test_table": { + KeyColumns: []string{"id"}, + OrderingColumn: "Updated_At", + }, + }, + BatchSize: 1000, }, - wantErr: common.NewExcludesSpacesError("table[0]"), + wantErr: common.NewLowercaseError(ConfigOrderingColumn), }, { - name: "failure_table_exceeds_max_length", - in: &Config{ + name: "failure_key_column_has_space", + in: Config{ Configuration: common.Configuration{ DSN: testValueDSN, }, - Tables: []string{testLongString}, - OrderingColumns: []string{"id"}, - BatchSize: 1000, + 
Tables: map[string]TableConfig{ + "test_table": { + KeyColumns: []string{"user id"}, + OrderingColumn: "updated_at", + }, + }, + BatchSize: 1000, }, - wantErr: common.NewLessThanError("table[0]", common.MaxConfigStringLength), + wantErr: common.NewExcludesSpacesError(ConfigKeyColumns), }, { - name: "failure_ordering_column_has_uppercase_letter", - in: &Config{ + name: "failure_key_column_has_uppercase", + in: Config{ Configuration: common.Configuration{ DSN: testValueDSN, }, - Tables: []string{"test_table"}, - OrderingColumns: []string{"ID"}, - BatchSize: 1000, + Tables: map[string]TableConfig{ + "test_table": { + KeyColumns: []string{"ID"}, + OrderingColumn: "updated_at", + }, + }, + BatchSize: 1000, }, - wantErr: common.NewLowercaseError("orderingColumn[0]"), + wantErr: common.NewLowercaseError(ConfigKeyColumns), }, { - name: "failure_ordering_column_has_space", - in: &Config{ + name: "failure_batch_size_too_small", + in: Config{ Configuration: common.Configuration{ DSN: testValueDSN, }, - Tables: []string{"test_table"}, - OrderingColumns: []string{" id"}, - BatchSize: 1000, + Tables: map[string]TableConfig{ + "test_table": { + KeyColumns: []string{"id"}, + OrderingColumn: "updated_at", + }, + }, + BatchSize: 0, }, - wantErr: common.NewExcludesSpacesError("orderingColumn[0]"), + wantErr: common.NewGreaterThanError(ConfigBatchSize, common.MinConfigBatchSize), }, { - name: "failure_ordering_column_exceeds_max_length", - in: &Config{ + name: "failure_batch_size_too_large", + in: Config{ Configuration: common.Configuration{ DSN: testValueDSN, }, - Tables: []string{"test_table"}, - OrderingColumns: []string{testLongString}, - BatchSize: 1000, + Tables: map[string]TableConfig{ + "test_table": { + KeyColumns: []string{"id"}, + OrderingColumn: "updated_at", + }, + }, + BatchSize: 100001, }, - wantErr: common.NewLessThanError("orderingColumn[0]", common.MaxConfigStringLength), + wantErr: common.NewLessThanError(ConfigBatchSize, common.MaxConfigBatchSize), }, } @@ -172,22 
+248,92 @@ func TestValidateConfig(t *testing.T) { } } -func TestGetTableOrderingMap(t *testing.T) { +func TestConfigInit(t *testing.T) { t.Parallel() - is := is.New(t) - config := &Config{ - Tables: []string{"table1", "table2"}, - OrderingColumns: []string{"id1", "id2"}, + tests := []struct { + name string + input Config + expected Config + }{ + { + name: "convert_legacy_config", + input: Config{ + Configuration: common.Configuration{ + DSN: testValueDSN, + }, + Table: "Legacy_Table", + KeyColumns: []string{"id", "email"}, + OrderingColumn: "updated_at", + BatchSize: 1000, + }, + expected: Config{ + Configuration: common.Configuration{ + DSN: testValueDSN, + }, + Tables: map[string]TableConfig{ + "legacy_table": { + KeyColumns: []string{"id", "email"}, + OrderingColumn: "updated_at", + }, + }, + BatchSize: 1000, + Table: "", + }, + }, + { + name: "convert_table_names_to_lowercase", + input: Config{ + Configuration: common.Configuration{ + DSN: testValueDSN, + }, + Tables: map[string]TableConfig{ + "Users": { + KeyColumns: []string{"id"}, + OrderingColumn: "updated_at", + }, + "Orders": { + KeyColumns: []string{"order_id"}, + OrderingColumn: "created_at", + }, + }, + BatchSize: 1000, + }, + expected: Config{ + Configuration: common.Configuration{ + DSN: testValueDSN, + }, + Tables: map[string]TableConfig{ + "users": { + KeyColumns: []string{"id"}, + OrderingColumn: "updated_at", + }, + "orders": { + KeyColumns: []string{"order_id"}, + OrderingColumn: "created_at", + }, + }, + BatchSize: 1000, + }, + }, } - expectedMap := map[string]string{ - "table1": "id1", - "table2": "id2", - } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + result := tt.input.Init() - result := config.GetTableOrderingMap() + is.Equal(len(result.Tables), len(tt.expected.Tables)) + for tableName, tableConfig := range result.Tables { + expectedConfig, exists := tt.expected.Tables[tableName] + is.True(exists) + is.Equal(tableConfig, expectedConfig) + } - 
is.Equal(result, expectedMap) + is.Equal(result.Table, tt.expected.Table) + is.Equal(result.BatchSize, tt.expected.BatchSize) + is.Equal(result.DSN, tt.expected.DSN) + }) + } } diff --git a/source/config/paramgen.go b/source/config/paramgen.go index b35c638..afe38e7 100644 --- a/source/config/paramgen.go +++ b/source/config/paramgen.go @@ -8,11 +8,14 @@ import ( ) const ( - ConfigBatchSize = "batchSize" - ConfigDsn = "dsn" - ConfigOrderingColumns = "orderingColumns" - ConfigSnapshot = "snapshot" - ConfigTables = "tables" + ConfigBatchSize = "batchSize" + ConfigDsn = "dsn" + ConfigKeyColumns = "keyColumns" + ConfigOrderingColumn = "orderingColumn" + ConfigSnapshot = "snapshot" + ConfigTable = "table" + ConfigTablesKeyColumns = "tables.*.keyColumns" + ConfigTablesOrderingColumn = "tables.*.orderingColumn" ) func (Config) Parameters() map[string]config.Parameter { @@ -21,10 +24,7 @@ func (Config) Parameters() map[string]config.Parameter { Default: "1000", Description: "BatchSize is a size of rows batch.", Type: config.ParameterTypeInt, - Validations: []config.Validation{ - config.ValidationGreaterThan{V: 0}, - config.ValidationLessThan{V: 100001}, - }, + Validations: []config.Validation{}, }, ConfigDsn: { Default: "", @@ -34,13 +34,17 @@ func (Config) Parameters() map[string]config.Parameter { config.ValidationRequired{}, }, }, - ConfigOrderingColumns: { + ConfigKeyColumns: { Default: "", - Description: "OrderingColumns is a list of corresponding ordering columns for the table\nthat the connector will use for ordering rows.", + Description: "Deprecated: use `tables.*.keyColumns` instead", Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationRequired{}, - }, + Validations: []config.Validation{}, + }, + ConfigOrderingColumn: { + Default: "", + Description: "Deprecated: use `tables.*.orderingColumn` instead", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, ConfigSnapshot: { Default: "true", @@ -48,13 +52,23 
@@ func (Config) Parameters() map[string]config.Parameter { Type: config.ParameterTypeBool, Validations: []config.Validation{}, }, - ConfigTables: { + ConfigTable: { Default: "", - Description: "Tables is a list of table names to pull data from.", + Description: "Deprecated: use `tables` instead", Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationRequired{}, - }, + Validations: []config.Validation{}, + }, + ConfigTablesKeyColumns: { + Default: "", + Description: "KeyColumns is the configuration list of column names to build the opencdc.Record.Key.", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, + }, + ConfigTablesOrderingColumn: { + Default: "", + Description: "OrderingColumn is a name of a column that the connector will use for ordering rows.", + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, } } diff --git a/source/iterator/iterator.go b/source/iterator/iterator.go index 3b142af..1d5be46 100644 --- a/source/iterator/iterator.go +++ b/source/iterator/iterator.go @@ -28,10 +28,8 @@ type Iterator struct { db *sqlx.DB position *Position - // list of tables to fetch data from - tables []string - // orderingColumn is map of tables to its ordering column - orderingColumn map[string]string + // tables to pull data from and their config + tables map[string]config.TableConfig // batchSize is the size of a batch retrieved from Redshift batchSize int @@ -46,11 +44,10 @@ type Iterator struct { // New creates a new instance of the iterator. 
func New(ctx context.Context, driverName string, pos *Position, config config.Config) (*Iterator, error) { iterator := &Iterator{ - tables: config.Tables, - orderingColumn: config.GetTableOrderingMap(), - batchSize: config.BatchSize, - currentTable: 0, - position: pos, + tables: config.Tables, + batchSize: config.BatchSize, + currentTable: 0, + position: pos, } if len(pos.TablePositions) == 0 { @@ -62,26 +59,21 @@ func New(ctx context.Context, driverName string, pos *Position, config config.Co return nil, fmt.Errorf("open db: %w", err) } - for _, table := range iterator.tables { - position, ok := iterator.position.TablePositions[table] - if !ok { - position = TablePosition{ - LastProcessedValue: nil, - LatestSnapshotValue: nil, - } - } + for tableName, tableConfig := range iterator.tables { + position := iterator.position.TablePositions[tableName] worker, err := NewWorker(ctx, WorkerConfig{ db: iterator.db, position: position, - table: table, - orderingColumn: iterator.orderingColumn[table], + table: tableName, + orderingColumn: tableConfig.OrderingColumn, + keyColumns: tableConfig.KeyColumns, batchSize: iterator.batchSize, snapshot: config.Snapshot, dsn: config.DSN, }, iterator) if err != nil { - return nil, fmt.Errorf("create worker for table %s: %w", table, err) + return nil, fmt.Errorf("create worker for table %s: %w", tableName, err) } iterator.workers = append(iterator.workers, worker) } diff --git a/source/iterator/worker.go b/source/iterator/worker.go index bee1798..7eac841 100644 --- a/source/iterator/worker.go +++ b/source/iterator/worker.go @@ -29,8 +29,6 @@ import ( ) const ( - // metadataFieldTable is a name of a record metadata field that stores a Redshift table name. - metadataFieldTable = "opencdc.collection" // keySearchPath is a key of get parameter of a datatable's schema name. keySearchPath = "search_path" // pingTimeout is a database ping timeout. 
@@ -63,6 +61,7 @@ type WorkerConfig struct { table string orderingColumn string + keyColumns []string batchSize int // snapshot is the configuration that determines whether the connector // will take a snapshot of the entire table before starting cdc mode. @@ -76,7 +75,7 @@ func NewWorker(ctx context.Context, config WorkerConfig, iterator *Iterator) (*W db: config.db, position: config.position, table: config.table, - keyColumns: []string{}, + keyColumns: config.keyColumns, orderingColumn: config.orderingColumn, batchSize: config.batchSize, iterator: iterator, @@ -126,63 +125,63 @@ func NewWorker(ctx context.Context, config WorkerConfig, iterator *Iterator) (*W } // HasNext returns a bool indicating whether the source has the next record to return or not. -func (worker *Worker) HasNext(ctx context.Context) (bool, error) { - if worker.rows != nil && worker.rows.Next() { +func (w *Worker) HasNext(ctx context.Context) (bool, error) { + if w.rows != nil && w.rows.Next() { return true, nil } - if err := worker.loadRows(ctx); err != nil { + if err := w.loadRows(ctx); err != nil { return false, fmt.Errorf("load rows: %w", err) } - if worker.rows.Next() { + if w.rows.Next() { return true, nil } // At this point, there are no more rows to load // so if we are in snapshot mode, we can switch to CDC - if worker.position.LatestSnapshotValue != nil { + if w.position.LatestSnapshotValue != nil { // switch to CDC mode - worker.position.LastProcessedValue = worker.position.LatestSnapshotValue - worker.position.LatestSnapshotValue = nil + w.position.LastProcessedValue = w.position.LatestSnapshotValue + w.position.LatestSnapshotValue = nil - worker.iterator.updateTablePosition(worker.table, worker.position) + w.iterator.updateTablePosition(w.table, w.position) // and load new rows - if err := worker.loadRows(ctx); err != nil { + if err := w.loadRows(ctx); err != nil { return false, fmt.Errorf("load rows: %w", err) } - return worker.rows.Next(), nil + return w.rows.Next(), nil } 
return false, nil } // Next returns the next record. -func (worker *Worker) Next(_ context.Context) (opencdc.Record, error) { +func (w *Worker) Next(_ context.Context) (opencdc.Record, error) { row := make(map[string]any) - if err := worker.rows.MapScan(row); err != nil { + if err := w.rows.MapScan(row); err != nil { return opencdc.Record{}, fmt.Errorf("scan rows: %w", err) } - transformedRow, err := columntypes.TransformRow(row, worker.columnTypes) + transformedRow, err := columntypes.TransformRow(row, w.columnTypes) if err != nil { return opencdc.Record{}, fmt.Errorf("transform row column types: %w", err) } - if _, ok := transformedRow[worker.orderingColumn]; !ok { - return opencdc.Record{}, fmt.Errorf("ordering column %q not found", worker.orderingColumn) + if _, ok := transformedRow[w.orderingColumn]; !ok { + return opencdc.Record{}, fmt.Errorf("ordering column %q not found", w.orderingColumn) } key := make(opencdc.StructuredData) - for i := range worker.keyColumns { - val, ok := transformedRow[worker.keyColumns[i]] + for i := range w.keyColumns { + val, ok := transformedRow[w.keyColumns[i]] if !ok { - return opencdc.Record{}, fmt.Errorf("key column %q not found", worker.keyColumns[i]) + return opencdc.Record{}, fmt.Errorf("key column %q not found", w.keyColumns[i]) } - key[worker.keyColumns[i]] = val + key[w.keyColumns[i]] = val } rowBytes, err := json.Marshal(transformedRow) @@ -192,27 +191,27 @@ func (worker *Worker) Next(_ context.Context) (opencdc.Record, error) { // set a new position into the variable, // to avoid saving position into the struct until we marshal the position - position := worker.iterator.getPosition() + position := w.iterator.getPosition() - tablePos := worker.getTablePosition(position, transformedRow) + tablePos := w.getTablePosition(position, transformedRow) // set the value from iter.orderingColumn column you chose - position.TablePositions[worker.table] = tablePos + position.TablePositions[w.table] = tablePos sdkPosition, err := 
position.marshal() if err != nil { return opencdc.Record{}, fmt.Errorf("failed converting to SDK position :%w", err) } - worker.position = position.TablePositions[worker.table] - worker.iterator.updateTablePosition(worker.table, tablePos) + w.position = position.TablePositions[w.table] + w.iterator.updateTablePosition(w.table, tablePos) metadata := opencdc.Metadata{ - metadataFieldTable: worker.table, + opencdc.MetadataCollection: w.table, } metadata.SetCreatedAt(time.Now().UTC()) - if worker.position.LatestSnapshotValue != nil { + if w.position.LatestSnapshotValue != nil { return sdk.Util.Source.NewRecordSnapshot(sdkPosition, metadata, key, opencdc.RawData(rowBytes)), nil } @@ -220,11 +219,11 @@ func (worker *Worker) Next(_ context.Context) (opencdc.Record, error) { } // Stop stops worker. -func (worker *Worker) Stop() error { +func (w *Worker) Stop() error { var err error - if worker.rows != nil { - err = worker.rows.Close() + if w.rows != nil { + err = w.rows.Close() } if err != nil { @@ -236,26 +235,26 @@ func (worker *Worker) Stop() error { // loadRows selects a batch of rows from a database, based on the // table, columns, orderingColumn, batchSize and current position. -func (worker *Worker) loadRows(ctx context.Context) error { +func (w *Worker) loadRows(ctx context.Context) error { var err error sb := sqlbuilder.PostgreSQL.NewSelectBuilder(). Select("*"). - From(worker.table). - OrderBy(worker.orderingColumn). - Limit(worker.batchSize) + From(w.table). + OrderBy(w.orderingColumn). 
+ Limit(w.batchSize) - if worker.position.LastProcessedValue != nil { - sb.Where(sb.GreaterThan(worker.orderingColumn, worker.position.LastProcessedValue)) + if w.position.LastProcessedValue != nil { + sb.Where(sb.GreaterThan(w.orderingColumn, w.position.LastProcessedValue)) } - if worker.position.LatestSnapshotValue != nil { - sb.Where(sb.LessEqualThan(worker.orderingColumn, worker.position.LatestSnapshotValue)) + if w.position.LatestSnapshotValue != nil { + sb.Where(sb.LessEqualThan(w.orderingColumn, w.position.LatestSnapshotValue)) } query, args := sb.Build() - worker.rows, err = worker.db.QueryxContext(ctx, query, args...) + w.rows, err = w.db.QueryxContext(ctx, query, args...) if err != nil { return fmt.Errorf("execute select data query %q, %v: %w", query, args, err) } @@ -265,8 +264,8 @@ func (worker *Worker) loadRows(ctx context.Context) error { // populateKeyColumns populates keyColumn from the database metadata // or from the orderingColumn configuration field in the described order if it's empty. -func (worker *Worker) populateKeyColumns(ctx context.Context, schema string) error { - if len(worker.keyColumns) != 0 { +func (w *Worker) populateKeyColumns(ctx context.Context, schema string) error { + if len(w.keyColumns) != 0 { return nil } @@ -280,7 +279,7 @@ func (worker *Worker) populateKeyColumns(ctx context.Context, schema string) err ). Where("tco.constraint_type = 'PRIMARY KEY'") - sb.Where(sb.Equal("kcu.table_name", worker.table)) + sb.Where(sb.Equal("kcu.table_name", w.table)) if schema != "" { sb.Where(sb.Equal("kcu.table_schema", schema)) @@ -288,7 +287,7 @@ func (worker *Worker) populateKeyColumns(ctx context.Context, schema string) err query, args := sb.Build() - rows, err := worker.db.QueryxContext(ctx, query, args...) + rows, err := w.db.QueryxContext(ctx, query, args...) 
if err != nil { return fmt.Errorf("execute query select primary keys %q, %v: %w", query, args, err) } @@ -300,28 +299,28 @@ func (worker *Worker) populateKeyColumns(ctx context.Context, schema string) err return fmt.Errorf("scan primary key column name: %w", err) } - worker.keyColumns = append(worker.keyColumns, columnName) + w.keyColumns = append(w.keyColumns, columnName) } - if len(worker.keyColumns) == 0 { - worker.keyColumns = []string{worker.orderingColumn} + if len(w.keyColumns) == 0 { + w.keyColumns = []string{w.orderingColumn} } return nil } // latestSnapshotValue returns most recent value of orderingColumn column. -func (worker *Worker) latestSnapshotValue(ctx context.Context) (any, error) { +func (w *Worker) latestSnapshotValue(ctx context.Context) (any, error) { var latestSnapshotValue any query := sqlbuilder.PostgreSQL.NewSelectBuilder(). - Select(worker.orderingColumn). - From(worker.table). - OrderBy(worker.orderingColumn).Desc(). + Select(w.orderingColumn). + From(w.table). + OrderBy(w.orderingColumn).Desc(). Limit(1). 
String() - rows, err := worker.db.QueryxContext(ctx, query) + rows, err := w.db.QueryxContext(ctx, query) if err != nil { return nil, fmt.Errorf("execute select latest snapshot value query %q: %w", query, err) } @@ -336,15 +335,15 @@ func (worker *Worker) latestSnapshotValue(ctx context.Context) (any, error) { return latestSnapshotValue, nil } -func (worker *Worker) getTablePosition(position *Position, transformedRow map[string]any) TablePosition { - tablePos, ok := position.TablePositions[worker.table] +func (w *Worker) getTablePosition(position *Position, transformedRow map[string]any) TablePosition { + tablePos, ok := position.TablePositions[w.table] if !ok { tablePos = TablePosition{ - LastProcessedValue: transformedRow[worker.orderingColumn], - LatestSnapshotValue: worker.position.LatestSnapshotValue, + LastProcessedValue: transformedRow[w.orderingColumn], + LatestSnapshotValue: w.position.LatestSnapshotValue, } } else { - tablePos.LastProcessedValue = transformedRow[worker.orderingColumn] + tablePos.LastProcessedValue = transformedRow[w.orderingColumn] } return tablePos diff --git a/source/source.go b/source/source.go index b074b49..042a920 100644 --- a/source/source.go +++ b/source/source.go @@ -66,6 +66,8 @@ func (s *Source) Configure(ctx context.Context, cfgRaw commonsConfig.Config) err return err //nolint: wrapcheck // not needed here } + s.config = s.config.Init() + err = s.config.Validate() if err != nil { return fmt.Errorf("error validating configuration: %w", err) diff --git a/source/source_integration_test.go b/source/source_integration_test.go index 7d4a66e..fe21c76 100644 --- a/source/source_integration_test.go +++ b/source/source_integration_test.go @@ -88,11 +88,11 @@ func TestSource_Read_tableHasNoData(t *testing.T) { err = db.PingContext(ctxTimeout) is.NoErr(err) - _, err = db.Exec(fmt.Sprintf("CREATE TABLE %s (col INTEGER, PRIMARY KEY (col));", cfg[config.ConfigTables])) + _, err = db.Exec(fmt.Sprintf("CREATE TABLE %s (col INTEGER, PRIMARY KEY 
(col));", cfg[config.ConfigTable])) is.NoErr(err) defer func() { - _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTables])) + _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTable])) is.NoErr(err) }() @@ -132,15 +132,15 @@ func TestSource_Read_keyColumnsFromConfig(t *testing.T) { err = db.PingContext(ctxTimeout) is.NoErr(err) - _, err = db.Exec(fmt.Sprintf("CREATE TABLE %s (col1 INTEGER, col2 INTEGER);", cfg[config.ConfigTables])) + _, err = db.Exec(fmt.Sprintf("CREATE TABLE %s (col1 INTEGER, col2 INTEGER);", cfg[config.ConfigTable])) is.NoErr(err) defer func() { - _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTables])) + _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTable])) is.NoErr(err) }() - _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2);", cfg[config.ConfigTables])) + _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2);", cfg[config.ConfigTable])) is.NoErr(err) src := NewSource() @@ -185,15 +185,15 @@ func TestSource_Read_keyColumnsFromTableMetadata(t *testing.T) { _, err = db.Exec(fmt.Sprintf( "CREATE TABLE %s (col1 INTEGER, col2 INTEGER, col3 INTEGER, PRIMARY KEY (col1, col2, col3));", - cfg[config.ConfigTables])) + cfg[config.ConfigTable])) is.NoErr(err) defer func() { - _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTables])) + _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTable])) is.NoErr(err) }() - _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2, 3);", cfg[config.ConfigTables])) + _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2, 3);", cfg[config.ConfigTable])) is.NoErr(err) src := NewSource() @@ -237,15 +237,15 @@ func TestSource_Read_keyColumnsFromOrderingColumn(t *testing.T) { err = db.PingContext(ctxTimeout) is.NoErr(err) - _, err = db.Exec(fmt.Sprintf("CREATE TABLE %s (col1 INTEGER, col2 INTEGER);", cfg[config.ConfigTables])) + _, err = db.Exec(fmt.Sprintf("CREATE TABLE %s (col1 INTEGER, col2 INTEGER);", 
cfg[config.ConfigTable])) is.NoErr(err) defer func() { - _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTables])) + _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTable])) is.NoErr(err) }() - _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2);", cfg[config.ConfigTables])) + _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2);", cfg[config.ConfigTable])) is.NoErr(err) src := NewSource() @@ -331,11 +331,11 @@ func TestSource_Read_checkTypes(t *testing.T) { time_type time, time_tz_type timetz, varbyte_type varbyte - );`, cfg[config.ConfigTables])) + );`, cfg[config.ConfigTable])) is.NoErr(err) defer func() { - _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTables])) + _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTable])) is.NoErr(err) }() @@ -366,7 +366,7 @@ func TestSource_Read_checkTypes(t *testing.T) { } _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16);", - cfg[config.ConfigTables]), + cfg[config.ConfigTable]), want.SmallIntType, want.IntegerType, want.BigIntType, @@ -446,16 +446,16 @@ func TestSource_Read_snapshotIsFalse(t *testing.T) { err = db.PingContext(ctxTimeout) is.NoErr(err) - _, err = db.Exec(fmt.Sprintf("CREATE TABLE %s (col1 INTEGER, col2 INTEGER);", cfg[config.ConfigTables])) + _, err = db.Exec(fmt.Sprintf("CREATE TABLE %s (col1 INTEGER, col2 INTEGER);", cfg[config.ConfigTable])) is.NoErr(err) defer func() { - _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTables])) + _, err = db.Exec(fmt.Sprintf("DROP TABLE %s;", cfg[config.ConfigTable])) is.NoErr(err) }() // insert a row to be sure that this data will not be transferred to the destination - _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2);", cfg[config.ConfigTables])) + _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2);", cfg[config.ConfigTable])) is.NoErr(err) src := NewSource() @@ -470,7 +470,7 @@ func 
TestSource_Read_snapshotIsFalse(t *testing.T) { is.Equal(err, sdk.ErrBackoffRetry) // insert an additional row - _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (3, 4);", cfg[config.ConfigTables])) + _, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (3, 4);", cfg[config.ConfigTable])) is.NoErr(err) record, err := src.Read(ctx) @@ -491,7 +491,7 @@ func TestSource_Read_snapshotIsFalse(t *testing.T) { // prepareConfig retrieves the value of the environment variable named by envNameDSN, // generates a name of database's table and returns a configuration map. -func prepareConfig(t *testing.T, orderingColumn string, _ ...string) map[string]string { +func prepareConfig(t *testing.T, orderingColumn string, keyColumns ...string) map[string]string { t.Helper() dsn := os.Getenv(envNameDSN) @@ -502,8 +502,9 @@ func prepareConfig(t *testing.T, orderingColumn string, _ ...string) map[string] } return map[string]string{ - config.ConfigDsn: dsn, - config.ConfigTables: fmt.Sprintf("conduit_src_test_%d", time.Now().UnixNano()), - config.ConfigOrderingColumns: orderingColumn, + config.ConfigDsn: dsn, + config.ConfigTable: fmt.Sprintf("conduit_src_test_%d", time.Now().UnixNano()), + config.ConfigOrderingColumn: orderingColumn, + config.ConfigKeyColumns: strings.Join(keyColumns, ","), } } diff --git a/source/source_test.go b/source/source_test.go index 5ddfb5d..406417f 100644 --- a/source/source_test.go +++ b/source/source_test.go @@ -40,19 +40,24 @@ func TestSource_Configure_requiredFieldsSuccess(t *testing.T) { s := Source{} err := s.Configure(context.Background(), map[string]string{ - config.ConfigDsn: testDSN, - config.ConfigTables: testTable, - config.ConfigOrderingColumns: "created_at", + config.ConfigDsn: testDSN, + config.ConfigTable: testTable, + config.ConfigOrderingColumn: "created_at", }) is.NoErr(err) is.Equal(s.config, config.Config{ Configuration: common.Configuration{ DSN: testDSN, }, - Tables: []string{testTable}, - OrderingColumns: []string{"created_at"}, - 
Snapshot: true, - BatchSize: 1000, + Tables: func() map[string]config.TableConfig { + tables := make(map[string]config.TableConfig) + tables[testTable] = config.TableConfig{OrderingColumn: "created_at"} + + return tables + }(), + OrderingColumn: "created_at", + Snapshot: true, + BatchSize: 1000, }) } @@ -64,21 +69,28 @@ func TestSource_Configure_allFieldsSuccess(t *testing.T) { s := Source{} err := s.Configure(context.Background(), map[string]string{ - config.ConfigDsn: testDSN, - config.ConfigTables: testTable, - config.ConfigOrderingColumns: "created_at", - config.ConfigSnapshot: "false", - config.ConfigBatchSize: "10000", + config.ConfigDsn: testDSN, + config.ConfigTable: testTable, + config.ConfigOrderingColumn: "created_at", + config.ConfigSnapshot: "false", + config.ConfigKeyColumns: "id,name", + config.ConfigBatchSize: "10000", }) is.NoErr(err) is.Equal(s.config, config.Config{ Configuration: common.Configuration{ DSN: testDSN, }, - Tables: []string{testTable}, - OrderingColumns: []string{"created_at"}, - Snapshot: false, - BatchSize: 10000, + Tables: func() map[string]config.TableConfig { + tables := make(map[string]config.TableConfig) + tables[testTable] = config.TableConfig{OrderingColumn: "created_at", KeyColumns: []string{"id", "name"}} + + return tables + }(), + KeyColumns: []string{"id", "name"}, + OrderingColumn: "created_at", + Snapshot: false, + BatchSize: 10000, }) } @@ -90,11 +102,11 @@ func TestSource_Configure_failure(t *testing.T) { s := Source{} err := s.Configure(context.Background(), map[string]string{ - config.ConfigDsn: testDSN, - config.ConfigTables: testTable, + config.ConfigDsn: testDSN, + config.ConfigTable: testTable, }) is.True(err != nil) - is.Equal(err.Error(), `config invalid: error validating "orderingColumns": required parameter is not provided`) + is.Equal(err.Error(), "error validating configuration: error validating \"tables\": required parameter is not provided") } func TestSource_Read_success(t *testing.T) { @@ -125,7 
+137,6 @@ func TestSource_Read_success(t *testing.T) { r, err := s.Read(ctx) is.NoErr(err) - is.Equal(r, record) } From 032c8d14969c79c7a6986fd1bce83732ce19d777 Mon Sep 17 00:00:00 2001 From: Gaurav Sahil Date: Fri, 25 Oct 2024 13:16:28 +0530 Subject: [PATCH 4/4] fix: test cases and minor refactor --- common/error.go | 13 ---- destination/config/config.go | 2 +- destination/config/paramgen.go | 2 +- destination/writer/writer.go | 2 +- source/config/config.go | 10 +-- source/config/config_test.go | 32 --------- source/config/paramgen.go | 5 +- source/iterator/position_test.go | 118 +++++++++++++++++++++++++++++++ source/iterator/worker.go | 11 ++- 9 files changed, 130 insertions(+), 65 deletions(-) diff --git a/common/error.go b/common/error.go index a7b16bc..7e82c13 100644 --- a/common/error.go +++ b/common/error.go @@ -54,16 +54,3 @@ func (e LessThanError) Error() string { func NewLessThanError(fieldName string, value int) LessThanError { return LessThanError{fieldName: fieldName, value: value} } - -type GreaterThanError struct { - fieldName string - value int -} - -func (e GreaterThanError) Error() string { - return fmt.Sprintf("%q value must be greater than or equal to %d", e.fieldName, e.value) -} - -func NewGreaterThanError(fieldName string, value int) GreaterThanError { - return GreaterThanError{fieldName: fieldName, value: value} -} diff --git a/destination/config/config.go b/destination/config/config.go index 3674ea8..4591345 100644 --- a/destination/config/config.go +++ b/destination/config/config.go @@ -34,7 +34,7 @@ type Config struct { common.Configuration // Table is the configuration of the table name. Table string `json:"table" default:"{{ index .Metadata \"opencdc.collection\" }}"` - // KeyColumns is the configuration of comma-separated column names to build the sdk.Record.Key. + // KeyColumns is the configuration of comma-separated column names to build the record key. 
KeyColumns []string `json:"keyColumns"` } diff --git a/destination/config/paramgen.go b/destination/config/paramgen.go index 0f6613e..3f8a377 100644 --- a/destination/config/paramgen.go +++ b/destination/config/paramgen.go @@ -25,7 +25,7 @@ func (Config) Parameters() map[string]config.Parameter { }, ConfigKeyColumns: { Default: "", - Description: "KeyColumns is the configuration of comma-separated column names to build the sdk.Record.Key.", + Description: "KeyColumns is the configuration of comma-separated column names to build the record key.", Type: config.ParameterTypeString, Validations: []config.Validation{}, }, diff --git a/destination/writer/writer.go b/destination/writer/writer.go index 1b58670..25946fe 100644 --- a/destination/writer/writer.go +++ b/destination/writer/writer.go @@ -240,7 +240,7 @@ func (w *Writer) Stop() error { return nil } -func (w *Writer) preparePayloadAndKey(record opencdc.Record) (map[string]interface{}, map[string]interface{}, error) { +func (w *Writer) preparePayloadAndKey(record opencdc.Record) (opencdc.StructuredData, opencdc.StructuredData, error) { payload, err := w.structurizeData(record.Payload.After) if err != nil { return nil, nil, fmt.Errorf("structurize payload: %w", err) diff --git a/source/config/config.go b/source/config/config.go index 585e632..50b5994 100644 --- a/source/config/config.go +++ b/source/config/config.go @@ -40,7 +40,7 @@ type Config struct { // will take a snapshot of the entire table before starting cdc mode. Snapshot bool `json:"snapshot" default:"true"` // BatchSize is a size of rows batch. - BatchSize int `json:"batchSize" default:"1000"` + BatchSize int `json:"batchSize" default:"1000" validate:"gt=0,lt=100001"` } type TableConfig struct { @@ -107,14 +107,6 @@ func (c Config) Validate() error { } } - // c.BatchSize handling "gte=1" and "lte=100000" validations. 
- if c.BatchSize < common.MinConfigBatchSize { - return common.NewGreaterThanError(ConfigBatchSize, common.MinConfigBatchSize) - } - if c.BatchSize > common.MaxConfigBatchSize { - return common.NewLessThanError(ConfigBatchSize, common.MaxConfigBatchSize) - } - return nil } diff --git a/source/config/config_test.go b/source/config/config_test.go index 6f4e645..6faecef 100644 --- a/source/config/config_test.go +++ b/source/config/config_test.go @@ -198,38 +198,6 @@ func TestValidateConfig(t *testing.T) { }, wantErr: common.NewLowercaseError(ConfigKeyColumns), }, - { - name: "failure_batch_size_too_small", - in: Config{ - Configuration: common.Configuration{ - DSN: testValueDSN, - }, - Tables: map[string]TableConfig{ - "test_table": { - KeyColumns: []string{"id"}, - OrderingColumn: "updated_at", - }, - }, - BatchSize: 0, - }, - wantErr: common.NewGreaterThanError(ConfigBatchSize, common.MinConfigBatchSize), - }, - { - name: "failure_batch_size_too_large", - in: Config{ - Configuration: common.Configuration{ - DSN: testValueDSN, - }, - Tables: map[string]TableConfig{ - "test_table": { - KeyColumns: []string{"id"}, - OrderingColumn: "updated_at", - }, - }, - BatchSize: 100001, - }, - wantErr: common.NewLessThanError(ConfigBatchSize, common.MaxConfigBatchSize), - }, } for _, tt := range tests { diff --git a/source/config/paramgen.go b/source/config/paramgen.go index afe38e7..80d9fb7 100644 --- a/source/config/paramgen.go +++ b/source/config/paramgen.go @@ -24,7 +24,10 @@ func (Config) Parameters() map[string]config.Parameter { Default: "1000", Description: "BatchSize is a size of rows batch.", Type: config.ParameterTypeInt, - Validations: []config.Validation{}, + Validations: []config.Validation{ + config.ValidationGreaterThan{V: 0}, + config.ValidationLessThan{V: 100001}, + }, }, ConfigDsn: { Default: "", diff --git a/source/iterator/position_test.go b/source/iterator/position_test.go index b373fb2..6ee4b44 100644 --- a/source/iterator/position_test.go +++ 
b/source/iterator/position_test.go @@ -89,6 +89,78 @@ func TestParseSDKPosition(t *testing.T) { TablePositions: map[string]TablePosition{}, }, }, + { + name: "success_single_table_float64_fields", + in: opencdc.Position(`{ + "tablePositions": { + "table1": { + "lastProcessedValue": 10, + "latestSnapshotValue": 30 + } + } + }`), + wantPos: Position{ + TablePositions: map[string]TablePosition{ + "table1": { + LastProcessedValue: float64(10), + LatestSnapshotValue: float64(30), + }, + }, + }, + }, + { + name: "success_single_table_string_fields", + in: opencdc.Position(`{ + "tablePositions": { + "table1": { + "lastProcessedValue": "abc", + "latestSnapshotValue": "def" + } + } + }`), + wantPos: Position{ + TablePositions: map[string]TablePosition{ + "table1": { + LastProcessedValue: "abc", + LatestSnapshotValue: "def", + }, + }, + }, + }, + { + name: "success_single_table_lastProcessedValue_only", + in: opencdc.Position(`{ + "tablePositions": { + "table1": { + "lastProcessedValue": 10 + } + } + }`), + wantPos: Position{ + TablePositions: map[string]TablePosition{ + "table1": { + LastProcessedValue: float64(10), + }, + }, + }, + }, + { + name: "success_single_table_latestSnapshotValue_only", + in: opencdc.Position(`{ + "tablePositions": { + "table1": { + "latestSnapshotValue": 30 + } + } + }`), + wantPos: Position{ + TablePositions: map[string]TablePosition{ + "table1": { + LatestSnapshotValue: float64(30), + }, + }, + }, + }, { name: "failure_invalid_json", in: opencdc.Position("invalid"), @@ -154,6 +226,52 @@ func TestMarshal(t *testing.T) { }, want: opencdc.Position(`{"tablePositions":{"table1":{"lastProcessedValue":"abc","latestSnapshotValue":"def"},"table2":{"lastProcessedValue":20,"latestSnapshotValue":40}}}`), }, + { + name: "success_single_table_integer_fields", + in: Position{ + TablePositions: map[string]TablePosition{ + "table1": { + LastProcessedValue: 10, + LatestSnapshotValue: 30, + }, + }, + }, + want: 
opencdc.Position(`{"tablePositions":{"table1":{"lastProcessedValue":10,"latestSnapshotValue":30}}}`), + }, + { + name: "success_single_table_string_fields", + in: Position{ + TablePositions: map[string]TablePosition{ + "table1": { + LastProcessedValue: "abc", + LatestSnapshotValue: "def", + }, + }, + }, + want: opencdc.Position(`{"tablePositions":{"table1":{"lastProcessedValue":"abc","latestSnapshotValue":"def"}}}`), + }, + { + name: "success_single_table_lastProcessedValue_only", + in: Position{ + TablePositions: map[string]TablePosition{ + "table1": { + LastProcessedValue: float64(10), + }, + }, + }, + want: opencdc.Position(`{"tablePositions":{"table1":{"lastProcessedValue":10,"latestSnapshotValue":null}}}`), + }, + { + name: "success_single_table_latestSnapshotValue_only", + in: Position{ + TablePositions: map[string]TablePosition{ + "table1": { + LatestSnapshotValue: float64(30), + }, + }, + }, + want: opencdc.Position(`{"tablePositions":{"table1":{"lastProcessedValue":null,"latestSnapshotValue":30}}}`), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/source/iterator/worker.go b/source/iterator/worker.go index 7eac841..89799c9 100644 --- a/source/iterator/worker.go +++ b/source/iterator/worker.go @@ -220,14 +220,11 @@ func (w *Worker) Next(_ context.Context) (opencdc.Record, error) { // Stop stops worker. func (w *Worker) Stop() error { - var err error - if w.rows != nil { - err = w.rows.Close() - } - - if err != nil { - return fmt.Errorf("close db rows: %w", err) + err := w.rows.Close() + if err != nil { + return fmt.Errorf("close db rows: %w", err) + } } return nil