feat: multiple collection support in source/destination connector #97

Open · wants to merge 6 commits into feat/sdk-updates
Changes from 3 commits
20 changes: 9 additions & 11 deletions README.md
@@ -21,15 +21,15 @@ to the environment variables as a `REDSHIFT_DSN`.

## Source

Conduit's Redshift source connector allows you to move data from a Redshift table with the specified `dsn` and `table`
configuration parameters. Upon starting, the source connector takes a snapshot of a given table in the database, then
Conduit's Redshift source connector allows you to move data from multiple Redshift tables with the specified `dsn` and `tables`
configuration parameters. Upon starting, the source connector takes a snapshot of the tables in the database, then
switches into change data capture (CDC) mode. In CDC mode, the connector will only detect new rows.

### Snapshots

At the first launch of the connector, the snapshot mode is enabled and the last value of the `orderingColumn` is stored
At the first launch of the connector, snapshot mode is enabled and, for each table, the last value of its `orderingColumn` is stored
to the position, to know the boundary of this mode. The connector reads all rows of a table in batches, using
keyset pagination, limiting the rows by `batchSize` and ordering by `orderingColumn`. The connector stores the
keyset pagination, limiting the rows by `batchSize` and ordering by `orderingColumn`, and then does the same for the other tables. The connector stores the
last processed element value of an `orderingColumn` in a position, so the snapshot process can be paused and resumed
without losing data. Once all rows in that initial snapshot are read the connector switches into CDC mode.
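For illustration, one snapshot batch could be fetched with a query along these lines (a minimal sketch, not the connector's actual code; it assumes a `database/sql` connection through a Postgres-compatible driver and identifiers that have already been validated):

func selectBatch(ctx context.Context, db *sql.DB, table, orderingColumn string, batchSize int, lastValue any) (*sql.Rows, error) {
	// Keyset pagination: resume strictly after the last processed value,
	// ordered by the ordering column and capped at batchSize rows.
	query := fmt.Sprintf(
		"SELECT * FROM %s WHERE %s > $1 ORDER BY %s LIMIT %d",
		table, orderingColumn, orderingColumn, batchSize,
	)
	return db.QueryContext(ctx, query, lastValue)
}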

@@ -45,10 +45,9 @@ pagination, limiting by `batchSize` and ordering by `orderingColumn`.
| name | description | required | example | default value |
|------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------------------------------------------------------|---------------|
| `dsn` | [DSN](https://en.wikipedia.org/wiki/Data_source_name) to connect to Redshift. | **true** | `postgres://username:password@endpoint:5439/database` | |
| `table` | Name of the table from which the connector reads from. | **true** | `table_name` | |
| `orderingColumn` | Column used to order the rows. <br /> Keep in mind that the data will be sorted by this column, so the column must contain unique, consistent values suitable for sorting. | **true** | `id` | |
| `tables` | Comma-separated names of the tables the connector reads from. | **true** | `table1,table2` | |
| `orderingColumns` | Comma-separated columns used to order the rows, one per table in `tables`. <br /> Keep in mind that the data will be sorted by these columns, so each column must contain unique, consistent values suitable for sorting. | **true** | `id,created_at` | |
| `snapshot` | Whether the connector will take a snapshot of each table before starting CDC mode. | false | `false` | "true" |
| `keyColumns` | Comma-separated list of column names to build the `sdk.Record.Key`. See more: [Key handling](#key-handling). | false | `id,name` | |
| `batchSize` | Size of rows batch. Min is 1 and max is 100000. | false | `100` | "1000" |
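For example, a source configuration for two tables might look like this (hypothetical values; the n-th entry of `orderingColumns` applies to the n-th entry of `tables`):

cfg := map[string]string{
	"dsn":             "postgres://username:password@endpoint:5439/database",
	"tables":          "table1,table2",
	"orderingColumns": "id,created_at",
}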

### Key handling
@@ -60,7 +59,7 @@ of `sdk.Record.Key` field are taken from `sdk.Payload.After` by the keys of this

### Table Name

For each record, the connector adds a `redshift.table` property to the metadata that contains the table name.
For each record, the connector adds an `opencdc.collection` property to the metadata that contains the table name.
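Downstream code can read it back with the conduit-commons helper (a sketch; it assumes the `opencdc` package from Conduit Commons, which exposes `GetCollection` for this key):

// Sketch: read the source table name from a record's metadata.
tableName, err := record.Metadata.GetCollection()
if err != nil {
	// no collection set on this record; fall back as needed
}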

## Destination

@@ -74,8 +73,7 @@ configuration parameters. It takes an `sdk.Record` and parses it into a valid SQ
| name | description | required | example | default |
|--------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------------------------------------------------------|---------|
| `dsn` | [DSN](https://en.wikipedia.org/wiki/Data_source_name) to connect to Redshift. | **true** | `postgres://username:password@endpoint:5439/database` | "" |
| `table` | Name of the table the connector writes to. | **true** | `table_name` | "" |
| `keyColumns` | Comma-separated list of column names to build the where clause in case if `sdk.Record.Key` is empty.<br /> See more: [Key handling](#key-handling-1). | false | `id,name` | "" |
| `table` | Name of the table the connector writes to. | false | `table_name` | "{{ index .Metadata "opencdc.collection" }}" |
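The default value is a Go template evaluated per record, so by default the destination writes each record to its source table. A minimal sketch of how such a template resolves, assuming `text/template` and an `opencdc.Record` with populated metadata (the connector's actual template evaluation may differ):

func resolveTable(record opencdc.Record) (string, error) {
	tmpl := template.Must(template.New("table").Parse(
		`{{ index .Metadata "opencdc.collection" }}`))
	var sb strings.Builder
	// .Metadata is the record's metadata map, so this yields the value
	// stored under "opencdc.collection".
	if err := tmpl.Execute(&sb, record); err != nil {
		return "", err
	}
	return sb.String(), nil // e.g. "table1"
}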

### Key handling

@@ -88,7 +86,7 @@ configuration parameters. It takes an `sdk.Record` and parses it into a valid SQ

### Table Name

Records are written to the table specified by the `redshift.table` property in the metadata, if it exists.
Records are written to the table specified by the `opencdc.collection` property in the metadata, if it exists.
If not, the connector will fall back to the `table` configured in the connector.
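Put together, target-table resolution behaves like this hypothetical helper (a sketch, not the connector's actual code):

// Sketch: prefer the record's collection; fall back to the configured table.
func (d *Destination) tableName(record opencdc.Record) string {
	if table, err := record.Metadata.GetCollection(); err == nil && table != "" {
		return table
	}
	return d.config.Table
}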

## Known limitations
10 changes: 5 additions & 5 deletions acceptance_test.go
@@ -36,7 +36,7 @@ const (
// envNameDSN is a Redshift dsn environment name.
envNameDSN = "REDSHIFT_DSN"
// metadataFieldTable is a name of a record metadata field that stores a Redshift table name.
metadataFieldTable = "redshift.table"
metadataFieldTable = "opencdc.collection"
)

Nitpick: we can use opencdc.MetadataCollection from Conduit Commons.
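A sketch of that nitpick, assuming the constant is exported by the conduit-commons `opencdc` package:

import "github.com/conduitio/conduit-commons/opencdc"

// metadataFieldTable is a name of a record metadata field that stores a
// Redshift table name; opencdc.MetadataCollection is "opencdc.collection".
const metadataFieldTable = opencdc.MetadataCollection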

type driver struct {
Expand All @@ -53,7 +53,7 @@ func (d *driver) GenerateRecord(_ *testing.T, operation opencdc.Operation) openc
Position: nil,
Operation: operation,
Metadata: map[string]string{
metadataFieldTable: d.Config.SourceConfig[srcConfig.ConfigTable],
metadataFieldTable: d.Config.SourceConfig[srcConfig.ConfigTables],
},
Key: opencdc.StructuredData{
"col1": d.id,
@@ -71,9 +71,9 @@ func TestAcceptance(t *testing.T) {
}

cfg := map[string]string{
srcConfig.ConfigDsn: dsn,
srcConfig.ConfigTable: fmt.Sprintf("conduit_test_%d", time.Now().UnixNano()),
srcConfig.ConfigOrderingColumn: "col1",
srcConfig.ConfigDsn: dsn,
srcConfig.ConfigTables: fmt.Sprintf("conduit_test_%d", time.Now().UnixNano()),
srcConfig.ConfigOrderingColumns: "col1",
}

sdk.AcceptanceTest(t, &driver{
40 changes: 0 additions & 40 deletions common/configuration.go
@@ -14,10 +14,6 @@

package common

import (
"strings"
)

const (
MaxConfigStringLength = 127
ConfigTable = "table"
@@ -28,40 +24,4 @@ const (
type Configuration struct {
// DSN is the configuration of the data source name to connect to the Amazon Redshift.
DSN string `json:"dsn" validate:"required"`
// Table is the configuration of the table name.
Table string `json:"table" validate:"required"`
// KeyColumns is the configuration list of column names to build the opencdc.Record.Key (for Source).
KeyColumns []string `json:"keyColumns"`
}

// Validate executes manual validations beyond what is defined in struct tags.
func (c *Configuration) Validate() error {
// c.DSN has required validation handled in struct tag

// c.Table required validation is handled in struct tag
// handling "lowercase", "excludesall= " and "lte=127" validations
if c.Table != strings.ToLower(c.Table) {
return NewLowercaseError(ConfigTable)
}
if strings.Contains(c.Table, " ") {
return NewExcludesSpacesError(ConfigTable)
}
if len(c.Table) > MaxConfigStringLength {
return NewLessThanError(ConfigTable, MaxConfigStringLength)
}

// c.KeyColumns handling "lowercase", "excludesall= " and "lte=127" validations
for _, v := range c.KeyColumns {
if v != strings.ToLower(v) {
return NewLowercaseError(ConfigKeyColumns)
}
if strings.Contains(v, " ") {
return NewExcludesSpacesError(ConfigKeyColumns)
}
if len(v) > MaxConfigStringLength {
return NewLessThanError(ConfigKeyColumns, MaxConfigStringLength)
}
}

return nil
}
23 changes: 23 additions & 0 deletions common/error.go
@@ -67,3 +67,26 @@ func (e GreaterThanError) Error() string {
func NewGreaterThanError(fieldName string, value int) GreaterThanError {
return GreaterThanError{fieldName: fieldName, value: value}
}

type NoTablesOrColumnsError struct{}
Maybe it's simpler to just use errors.New() for this one?


func (e NoTablesOrColumnsError) Error() string {
return "at least one table and one corresponding ordering column must be provided"
}

func NewNoTablesOrColumnsError() NoTablesOrColumnsError {
return NoTablesOrColumnsError{}
}

type MismatchedTablesAndColumnsError struct {
tablesCount int
orderingColumnsCount int
}

func (e MismatchedTablesAndColumnsError) Error() string {
return fmt.Sprintf("the number of tables (%d) and ordering columns (%d) must match", e.tablesCount, e.orderingColumnsCount)
}

func NewMismatchedTablesAndColumnsError(tablesCount, orderingColumnsCount int) MismatchedTablesAndColumnsError {
return MismatchedTablesAndColumnsError{tablesCount: tablesCount, orderingColumnsCount: orderingColumnsCount}
}

IMHO, we should prefer using fmt.Errorf() over creating new error types. The fields in the error structs (e.g. tablesCount and orderingColumnsCount) are only ever accessed in the Error() function. If we had some logic for those fields, then I think it might make sense having an error type.
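A sketch of the two suggestions combined (a sentinel via errors.New for the fixed message, fmt.Errorf where the counts only ever end up in the message; validateTables is a hypothetical caller):

var ErrNoTablesOrColumns = errors.New(
	"at least one table and one corresponding ordering column must be provided")

func validateTables(tables, orderingColumns []string) error {
	if len(tables) == 0 || len(orderingColumns) == 0 {
		return ErrNoTablesOrColumns
	}
	if len(tables) != len(orderingColumns) {
		// No logic ever inspects the counts, so formatting them into the
		// message is enough.
		return fmt.Errorf("the number of tables (%d) and ordering columns (%d) must match",
			len(tables), len(orderingColumns))
	}
	return nil
}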
20 changes: 19 additions & 1 deletion destination/config/config.go
@@ -17,15 +17,33 @@
package config

import (
"strings"

"github.com/conduitio-labs/conduit-connector-redshift/common"
)

// Config is a destination configuration needed to connect to Redshift database.
type Config struct {
common.Configuration

// Table is used as the target table into which records are inserted.
Table string `json:"table"`
}

// Validate executes manual validations beyond what is defined in struct tags.
func (c *Config) Validate() error {
return c.Configuration.Validate() //nolint:wrapcheck // not needed here
// c.DSN has required validation handled in struct tag

// handling "lowercase", "excludesall= " and "lte=127" validations for c.Table
Hmm, I'm wondering if we should be strict about lower-case. We can lower-case it ourselves, WDYT?

Author: Yes, we can do it.

if c.Table != strings.ToLower(c.Table) {
return common.NewLowercaseError(ConfigTable)
}
if strings.Contains(c.Table, " ") {
return common.NewExcludesSpacesError(ConfigTable)
}
if len(c.Table) > common.MaxConfigStringLength {
return common.NewLessThanError(ConfigTable, common.MaxConfigStringLength)
}

return nil
}
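A sketch of the lower-casing alternative discussed above (Redshift folds unquoted identifiers to lower case, so normalizing should be safe for unquoted table names; normalize is a hypothetical helper called before validation):

// Sketch: fold the configured table name to lower case instead of
// rejecting it with NewLowercaseError.
func (c *Config) normalize() {
	c.Table = strings.ToLower(strings.TrimSpace(c.Table))
}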
96 changes: 9 additions & 87 deletions destination/config/config_test.go
@@ -36,110 +36,32 @@ func TestValidateConfig(t *testing.T) {
in *Config
wantErr error
}{
{
name: "success_keyColumns_has_one_key",
in: &Config{
common.Configuration{
DSN: testValueDSN,
Table: testValueTable,
KeyColumns: []string{"id"},
},
},
wantErr: nil,
},
{
name: "success_keyColumns_has_two_keys",
in: &Config{
common.Configuration{
DSN: testValueDSN,
Table: testValueTable,
KeyColumns: []string{"id", "name"},
},
},
wantErr: nil,
},
{
name: "success_keyColumns_ends_with_space",
in: &Config{
common.Configuration{
DSN: testValueDSN,
Table: testValueTable,
KeyColumns: []string{"id", "name "},
},
},
wantErr: common.NewExcludesSpacesError(ConfigKeyColumns),
},
{
name: "success_keyColumns_starts_with_space",
in: &Config{
common.Configuration{
DSN: testValueDSN,
Table: testValueTable,
KeyColumns: []string{"id", "name "},
},
},
wantErr: common.NewExcludesSpacesError(ConfigKeyColumns),
},
{
name: "success_keyColumns_has_two_spaces",
in: &Config{
common.Configuration{
DSN: testValueDSN,
Table: testValueTable,
KeyColumns: []string{"id", " name"},
},
},
wantErr: common.NewExcludesSpacesError(ConfigKeyColumns),
},
{
name: "failure_table_has_space",
in: &Config{
common.Configuration{
DSN: testValueDSN,
Table: "test table",
Table: "test_table ",
Configuration: common.Configuration{
DSN: testValueDSN,
},
},
wantErr: common.NewExcludesSpacesError(ConfigTable),
},
{
name: "failure_table_has_uppercase_letter",
in: &Config{
common.Configuration{
DSN: testValueDSN,
Table: "Test_table",
Table: "Test_table",
Configuration: common.Configuration{
DSN: testValueDSN,
},
},
wantErr: common.NewLowercaseError(ConfigTable),
},
{
name: "failure_keyColumns_has_uppercase_letter",
in: &Config{
common.Configuration{
DSN: testValueDSN,
Table: testValueTable,
KeyColumns: []string{"ID"},
},
},
wantErr: common.NewLowercaseError(ConfigKeyColumns),
},
{
name: "failure_keyColumns_exceeds_max_length",
in: &Config{
common.Configuration{
DSN: testValueDSN,
Table: testValueTable,
KeyColumns: []string{testLongString},
},
},
wantErr: common.NewLessThanError(ConfigKeyColumns, common.MaxConfigStringLength),
},
{
name: "failure_table_exceeds_max_length",
in: &Config{
common.Configuration{
DSN: testValueDSN,
Table: testLongString,
KeyColumns: []string{"id"},
Table: testLongString,
Configuration: common.Configuration{
DSN: testValueDSN,
},
},
wantErr: common.NewLessThanError(ConfigTable, common.MaxConfigStringLength),
Expand Down
17 changes: 4 additions & 13 deletions destination/config/paramgen.go

Generated file; diff not rendered by default.

6 changes: 0 additions & 6 deletions destination/destination_integration_test.go
@@ -258,9 +258,6 @@ func TestDestination_Write_successKeyColumns(t *testing.T) {
_, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2);", cfg[config.ConfigTable]))
is.NoErr(err)

// set a KeyColumns field to the config
cfg[config.ConfigKeyColumns] = "col1"
Why do we need to remove these?

Author (@grvsahil), Oct 17, 2024:
We could keep it and accept the keys from config, now that I know this. Previously the connector accepted multiple key columns for a single table; since we now take multiple tables from config and the field was not required (the connector can also fetch the keys itself), I removed it.

Anyway, I will take this from the config like:

tables.tableNameFoo.keyColumns:  columnFoo,columnFoo1
tables.tableNameBar.keyColumns:  columnBar
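A sketch of parsing that proposed format (the tables.<table>.keyColumns convention and parseTableKeyColumns are hypothetical, not an existing SDK mechanism):

// Sketch: collect per-table key columns from flat config keys such as
// "tables.tableNameFoo.keyColumns".
func parseTableKeyColumns(cfg map[string]string) map[string][]string {
	keyColumns := make(map[string][]string)
	for key, value := range cfg {
		if !strings.HasPrefix(key, "tables.") || !strings.HasSuffix(key, ".keyColumns") {
			continue
		}
		table := strings.TrimSuffix(strings.TrimPrefix(key, "tables."), ".keyColumns")
		keyColumns[table] = strings.Split(value, ",")
	}
	return keyColumns
}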


dest := NewDestination()

err = dest.Configure(ctx, cfg)
@@ -352,9 +349,6 @@ func TestDestination_Write_failedWrongKeyColumnsField(t *testing.T) {
_, err = db.Exec(fmt.Sprintf("INSERT INTO %s VALUES (1, 2);", cfg[config.ConfigTable]))
is.NoErr(err)

// set a wrong KeyColumns field to the config
cfg[config.ConfigKeyColumns] = "wrong_column"

dest := NewDestination()

err = dest.Configure(ctx, cfg)
6 changes: 2 additions & 4 deletions destination/destination_test.go
@@ -53,12 +53,10 @@ func TestDestination_Configure_Fail(t *testing.T) {

d := NewDestination()

err := d.Configure(context.Background(), map[string]string{
config.ConfigDsn: testDSN,
})
err := d.Configure(context.Background(), map[string]string{})
is.True(err != nil)
is.Equal(err.Error(),
`config invalid: error validating "table": required parameter is not provided`)
`config invalid: error validating "dsn": required parameter is not provided`)
We should still validate that the `tables` or `table` parameter is present.

}

func TestDestination_Write_Success(t *testing.T) {