-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: multiple collection support in source/destination connector #97
base: feat/sdk-updates
Are you sure you want to change the base?
Conversation
acceptance_test.go
Outdated
@@ -36,7 +36,7 @@ const ( | |||
// envNameDSN is a Redshift dsn environment name. | |||
envNameDSN = "REDSHIFT_DSN" | |||
// metadataFieldTable is a name of a record metadata field that stores a Redshift table name. | |||
metadataFieldTable = "redshift.table" | |||
metadataFieldTable = "opencdc.collection" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: we can use opencdc.MetadataCollection from Conduit Commons.
common/error.go
Outdated
@@ -67,3 +67,26 @@ func (e GreaterThanError) Error() string { | |||
func NewGreaterThanError(fieldName string, value int) GreaterThanError { | |||
return GreaterThanError{fieldName: fieldName, value: value} | |||
} | |||
|
|||
type NoTablesOrColumnsError struct{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it's simpler to just use errors.New() for this one?
is.True(err != nil) | ||
is.Equal(err.Error(), | ||
`config invalid: error validating "table": required parameter is not provided`) | ||
`config invalid: error validating "dsn": required parameter is not provided`) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should still validate if the tables
or table
parameter is present.
common/error.go
Outdated
} | ||
|
||
func NewMismatchedTablesAndColumnsError(tablesCount, orderingColumnsCount int) MismatchedTablesAndColumnsError { | ||
return MismatchedTablesAndColumnsError{tablesCount: tablesCount, orderingColumnsCount: orderingColumnsCount} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO, we should prefer using fmt.Errorf()
over creating new error types. The fields in the error structs (e.g. tablesCount
and orderingColumnsCount
) are only ever accessed in the Error()
function. If we had some logic for those fields, then I think it might make sense having an error type.
source/config/config.go
Outdated
Tables []string `json:"tables" validate:"required"` | ||
// OrderingColumns is a list of corresponding ordering columns for the table | ||
// that the connector will use for ordering rows. | ||
OrderingColumns []string `json:"orderingColumns" validate:"required"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good, but there's something in our tool, paramgen, that can simplify the code and make it a bit safer. It has "dynamic configuration parameters", where it basically allows you to have a map like here: https://github.com/ConduitIO/conduit-connector-generator/blob/main/config.go#L50.
This makes it possible to have a config like this: https://github.com/ConduitIO/conduit-connector-generator?tab=readme-ov-file#collections.
In our case that would something like:
orderingColumns.tableNameFoo: columnFoo
orderingColumns.tableNameBar: columnBar
This is more readable (it's clear which column goes to which table).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, We can also format it like
tables.tableNameFoo.orderingColumn: columnFoo
tables.tableNameBar.orderingColumn: columnBar
Does this look okay ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me!
} else { | ||
// set the LastProcessedValue to skip a snapshot of the entire table | ||
iterator.position.LastProcessedValue = latestSnapshotValue | ||
worker, err := NewWorker(ctx, WorkerConfig{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be good to start each worker in a separate goroutine, so that work can be parallelized (we're doing something similar in the Postgres connector). It's also closer to the current behavior: to read data from multiple tables, multiple source connectors are needed (one table per source connector), and source connectors work in parallel. We can tackle that in a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah right, I'll implement it in a separate PR. The source connector will spawn workers (one for each table) in seperate goroutines and read records using a common channel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good to me! Feel free to open an issue for that so we can track it.:)
source/iterator/iterator.go
Outdated
if !ok { | ||
position = TablePosition{ | ||
LastProcessedValue: nil, | ||
LatestSnapshotValue: nil, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is needed, because you'll get the zero value for the TablePosition
struct, which is an "empty" struct and has all zero values in fields too.
source/iterator/worker.go
Outdated
} | ||
|
||
// HasNext returns a bool indicating whether the source has the next record to return or not. | ||
func (worker *Worker) HasNext(ctx context.Context) (bool, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: it's common to name the receiver with one letter, e.g. w
in this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good work! I left a few comments and questions, I believe we can merge this PR pretty soon.:)
// Config is a destination configuration needed to connect to Redshift database. | ||
type Config struct { | ||
common.Configuration | ||
// Table is the configuration of the table name. | ||
Table string `json:"table" default:"{{ index .Metadata \"opencdc.collection\" }}"` | ||
// KeyColumns is the configuration of comma-separated column names to build the sdk.Record.Key. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: sdk.Record.Key
-> "record key", since I think it's pretty clear what that is about (plus, the sdk.Record type is gone, it's opencdc.Record).
// c.BatchSize handling "gte=1" and "lte=100000" validations. | ||
if c.BatchSize < common.MinConfigBatchSize { | ||
return common.NewGreaterThanError(ConfigBatchSize, common.MinConfigBatchSize) | ||
} | ||
if c.BatchSize > common.MaxConfigBatchSize { | ||
return common.NewLessThanError(ConfigBatchSize, common.MaxConfigBatchSize) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Our paramgen tool supports validating min and max for a value: https://github.com/ConduitIO/conduit-commons/tree/main/paramgen#parameter-tags (see no. 3).
} else { | ||
// set the LastProcessedValue to skip a snapshot of the entire table | ||
iterator.position.LastProcessedValue = latestSnapshotValue | ||
worker, err := NewWorker(ctx, WorkerConfig{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good to me! Feel free to open an issue for that so we can track it.:)
name: "success_position_is_nil", | ||
in: Position{}, | ||
want: opencdc.Position(`{"lastProcessedValue":null,"latestSnapshotValue":null}`), | ||
}, | ||
{ | ||
name: "success_integer_fields", | ||
in: Position{ | ||
LastProcessedValue: 10, | ||
LatestSnapshotValue: 30, | ||
}, | ||
want: opencdc.Position(`{"lastProcessedValue":10,"latestSnapshotValue":30}`), | ||
}, | ||
{ | ||
name: "success_string_fields", | ||
in: Position{ | ||
LastProcessedValue: "abc", | ||
LatestSnapshotValue: "def", | ||
}, | ||
want: opencdc.Position(`{"lastProcessedValue":"abc","latestSnapshotValue":"def"}`), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a special reason why these tests are not valid anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some of these are actually valid, but should be done for individual tables. Adding them again against table position.
tableName, ok := metadata[metadataFieldTable] | ||
if !ok { | ||
return w.table | ||
func (w *Writer) preparePayloadAndKey(record opencdc.Record) (map[string]interface{}, map[string]interface{}, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick maybe return opencdc.StructuredData instead of the map?
@@ -94,15 +107,20 @@ func (w *Writer) Insert(ctx context.Context, record opencdc.Record) error { | |||
return ErrNoPayload | |||
} | |||
|
|||
payload, err = columntypes.ConvertStructuredData(w.columnTypes, payload) | |||
columnTypes, err := w.getColumnTypes(ctx, table) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, this executes a query (to get the column types) for each record?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It retrieves the column types if its already fetched and stored for the record's table or executes the query to get the column types if record from a new table is encountered which isn't already stored.
columnTypes map[string]string | ||
|
||
// iterator is an instance of the iterator | ||
iterator *Iterator |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, what's Worker
supposed to do, and what is Iterator
supposed to do? IIUC, Iterator
is a "global" iterator, for the whole source connector, and Worker
is actually an iterator for a single table?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes right
if err != nil { | ||
return fmt.Errorf("close db rows: %w", err) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: this can in the body of the if
statement, i.e.:
if w.rows != nil {
err := w.Rows.Close()
if err != nil {
return fmt.Errorf()
}
}
It's the same result, but the scope of err
is smaller.
Description
This PR includes the following changes:
redshift.table
toopencdc.collection
for compatibility with other connectors.Fixes # (issue)
Quick checks: