Skip to content

Commit

Permalink
add source snapshot mode
Browse files Browse the repository at this point in the history
  • Loading branch information
maksenius committed Nov 29, 2022
1 parent f3e0486 commit 2115493
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 15 deletions.
21 changes: 13 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,19 @@ and each detected change.

### Configuration options

| Name | Description | Required | Example |
|-------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|------------------------------------------------|
| `connection` | String line for connection to SQL SERVER. More information about it [Connection](https://github.com/denisenkom/go-mssqldb#the-connection-string-can-be-specified-in-one-of-three-formats) | **true** | sqlserver://sa:password@0.0.0.0?database=mydb |
| `table` | The name of a table in the database that the connector should write to, by default. | **true** | users |
| `primaryKey` | Column name that records should use for their `Key` fields. | **true** | id |
| `orderingColumn` | The name of a column that the connector will use for ordering rows. Its values must be unique and suitable for sorting, otherwise, the snapshot won't work correctly. | **true** | id |
| `column` | Comma separated list of column names that should be included in each Record's payload. If the field is not empty it must contain values of the `primaryKey` and `orderingColumn` fields. By default: all rows | false | id,name,age |
| `batchSize` | Size of rows batch. By default is 1000 | false | 100 |
| Name | Description | Required | Example |
|---------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-----------------------------------------------|
| `connection` | String line for connection to SQL SERVER. More information about it [Connection](https://github.com/denisenkom/go-mssqldb#the-connection-string-can-be-specified-in-one-of-three-formats) | **true** | sqlserver://sa:password@0.0.0.0?database=mydb |
| `table` | The name of a table in the database that the connector should write to, by default. | **true** | users |
| `primaryKey` | Column name that records should use for their `Key` fields. | **true** | id |
| `orderingColumn` | The name of a column that the connector will use for ordering rows. Its values must be unique and suitable for sorting, otherwise, the snapshot won't work correctly. | **true** | id |
| `column` | Comma separated list of column names that should be included in each Record's payload. If the field is not empty it must contain values of the `primaryKey` and `orderingColumn` fields. By default: all rows | false | id,name,age |
| `snapshot` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode, by default true. | false | false |
| `batchSize` | Size of rows batch. By default is 1000 | false | 100 |

### Snapshot
When the connector first starts, snapshot mode is enabled.

First time when the snapshot iterator starts work, it is get max value from `orderingColumn` and saves this value to position.
The snapshot iterator reads all rows, where `orderingColumn` values less or equal maxValue, from the table in batches.

Expand All @@ -66,6 +69,8 @@ If snapshot stops it will parse position from last record and will try gets row

When all records are returned, the connector switches to the CDC iterator.

This behavior is enabled by default, but can be turned off by adding "snapshot":"false" to the Source configuration.

### Change Data Capture (CDC)

This connector implements CDC features for DB2 by adding a tracking table and triggers to populate it. The tracking
Expand Down
6 changes: 2 additions & 4 deletions destination/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ func (w *Writer) Close(ctx context.Context) error {
return w.db.Close()
}

// Delete deletes records by a key. First it looks in the sdk.Record.Key,
// if it doesn't find a key there it will use the default configured value for a key.
// Delete deletes records by a key.
func (w *Writer) Delete(ctx context.Context, record sdk.Record) error {
tableName := w.getTableName(record.Metadata)

Expand All @@ -94,8 +93,7 @@ func (w *Writer) Delete(ctx context.Context, record sdk.Record) error {
return nil
}

// Update updates records by a key. First it looks in the sdk.Record.Key,
// if it doesn't find a key there it will use the default configured value for a key.
// Update updates records by a key.
func (w *Writer) Update(ctx context.Context, record sdk.Record) error {
tableName := w.getTableName(record.Metadata)

Expand Down
18 changes: 18 additions & 0 deletions source/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ const (
KeyBatchSize = "batchSize"
// KeyPrimaryKey is column name that records should use for their `Key` fields.
KeyPrimaryKey string = "primaryKey"
// KeySnapshot is a config name for snapshotMode.
KeySnapshot = "snapshot"

// snapshotDefault is a default value for the Snapshot field.
snapshotDefault = true

// defaultBatchSize is a default value for a BatchSize field.
defaultBatchSize = 1000
)
Expand All @@ -48,6 +54,8 @@ type Config struct {
BatchSize int `key:"batchSize" validate:"gte=1,lte=100000"`
// Key - Column name that records should use for their `Key` fields.
Key string `validate:"max=128"`
// Snapshot whether or not the plugin will take a snapshot of the entire table before starting cdc.
Snapshot bool
}

// Parse maps the incoming map to the Config and validates it.
Expand All @@ -62,6 +70,7 @@ func Parse(cfg map[string]string) (Config, error) {
OrderingColumn: cfg[KeyOrderingColumn],
BatchSize: defaultBatchSize,
Key: cfg[KeyPrimaryKey],
Snapshot: snapshotDefault,
}

if columns := cfg[KeyColumns]; columns != "" {
Expand All @@ -75,6 +84,15 @@ func Parse(cfg map[string]string) (Config, error) {
}
}

if cfg[KeySnapshot] != "" {
snapshot, err := strconv.ParseBool(cfg[KeySnapshot])
if err != nil {
return Config{}, fmt.Errorf("parse %q: %w", KeySnapshot, err)
}

sourceConfig.Snapshot = snapshot
}

if err = validator.Validate(&sourceConfig); err != nil {
return Config{}, fmt.Errorf("validate source config: %w", err)
}
Expand Down
44 changes: 44 additions & 0 deletions source/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestParse(t *testing.T) {
OrderingColumn: "ID",
BatchSize: defaultBatchSize,
Key: "ID",
Snapshot: true,
},
wantErr: false,
},
Expand All @@ -76,6 +77,7 @@ func TestParse(t *testing.T) {
OrderingColumn: "ID",
BatchSize: 50,
Key: "ID",
Snapshot: true,
},
wantErr: false,
},
Expand All @@ -100,6 +102,33 @@ func TestParse(t *testing.T) {
BatchSize: 50,
Columns: []string{"ID", "NAME"},
Key: "ID",
Snapshot: true,
},
wantErr: false,
},
{
name: "success, custom snapshot",
args: args{
cfg: map[string]string{
config.KeyConnection: "sqlserver://sa:password@0.0.0.0?database=mydb&connection+timeout=30",
config.KeyTable: "CLIENTS",
KeyPrimaryKey: "ID",
KeyColumns: "ID,NAME",
KeyOrderingColumn: "ID",
KeyBatchSize: "50",
KeySnapshot: "false",
},
},
want: Config{
Config: config.Config{
Connection: "sqlserver://sa:password@0.0.0.0?database=mydb&connection+timeout=30",
Table: "CLIENTS",
},
OrderingColumn: "ID",
BatchSize: 50,
Columns: []string{"ID", "NAME"},
Key: "ID",
Snapshot: false,
},
wantErr: false,
},
Expand Down Expand Up @@ -130,6 +159,21 @@ func TestParse(t *testing.T) {
},
wantErr: true,
},
{
name: "success, snapshot error",
args: args{
cfg: map[string]string{
config.KeyConnection: "sqlserver://sa:password@0.0.0.0?database=mydb&connection+timeout=30",
config.KeyTable: "CLIENTS",
KeyPrimaryKey: "ID",
KeyColumns: "ID,NAME",
KeyOrderingColumn: "ID",
KeyBatchSize: "50",
KeySnapshot: "mode",
},
},
wantErr: true,
},
}

for _, tt := range tests {
Expand Down
2 changes: 1 addition & 1 deletion source/iterator/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (i *CDCIterator) HasNext(ctx context.Context) (bool, error) {
return false, fmt.Errorf("load rows: %w", err)
}

return false, nil
return i.rows.Next(), nil
}

// Next get new record.
Expand Down
3 changes: 2 additions & 1 deletion source/iterator/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func NewCombinedIterator(
conn, table, key, orderingColumn string,
columns []string,
batchSize int,
snapshot bool,
sdkPosition sdk.Position,
) (*CombinedIterator, error) {
var err error
Expand Down Expand Up @@ -98,7 +99,7 @@ func NewCombinedIterator(
return nil, fmt.Errorf("parse position: %w", err)
}

if pos == nil || pos.IteratorType == position.TypeSnapshot {
if snapshot && (pos == nil || pos.IteratorType == position.TypeSnapshot) {
it.snapshot, err = NewSnapshotIterator(ctx, db, it.table, it.orderingColumn, it.key, it.columns,
it.batchSize, pos, it.columnTypes)
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func (s *Source) Parameters() map[string]sdk.Parameter {
Required: false,
Default: "1000",
},
KeySnapshot: {
Description: "Whether or not the plugin will take a snapshot of the entire table before starting ",
Required: false,
Default: "true",
},
}
}

Expand Down Expand Up @@ -100,7 +105,7 @@ func (s *Source) Open(ctx context.Context, rp sdk.Position) error {
}

s.iterator, err = iterator.NewCombinedIterator(ctx, db, s.config.Connection, s.config.Table, s.config.Key,
s.config.OrderingColumn, s.config.Columns, s.config.BatchSize, rp)
s.config.OrderingColumn, s.config.Columns, s.config.BatchSize, s.config.Snapshot, rp)
if err != nil {
return fmt.Errorf("create combined iterator: %w", err)
}
Expand Down
80 changes: 80 additions & 0 deletions source/source_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"os"
"reflect"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -555,6 +556,85 @@ func TestSource_CDC_Success(t *testing.T) {
}
}

func TestSource_Snapshot_Off(t *testing.T) {
t.Parallel()

ctx := context.Background()

tableName := randomIdentifier(t)

cfg, err := prepareConfigMap(tableName)
if err != nil {
t.Log(err)
t.Skip()
}

db, err := sql.Open("mssql", cfg[config.KeyConnection])
if err != nil {
t.Fatal(err)
}

if err = db.PingContext(ctx); err != nil {
t.Fatal(err)
}

t.Cleanup(func() {
_, err = db.ExecContext(ctx, fmt.Sprintf(queryDropTable, tableName))
if err != nil {
t.Log(err)
}

_, err = db.ExecContext(ctx, fmt.Sprintf(queryDropTrackingTable, tableName))
if err != nil {
t.Log(err)
}

db.Close()
})

err = prepareData(ctx, db, tableName)
if err != nil {
t.Fatal(err)
}

// turn off snapshot
cfg[KeySnapshot] = "false"

s := new(Source)

err = s.Configure(ctx, cfg)
if err != nil {
t.Fatal(err)
}

// Start first time with nil position.
err = s.Open(ctx, nil)
if err != nil {
t.Fatal(err)
}

// update.
_, err = db.ExecContext(ctx, fmt.Sprintf(queryUpdate, tableName))
if err != nil {
t.Fatal(err)
}

// Check read. Snapshot data must be missed.
r, err := s.Read(ctx)
if err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(r.Operation, sdk.OperationUpdate) {
t.Fatal(errors.New("not wanted type"))
}

err = s.Teardown(ctx)
if err != nil {
t.Fatal(err)
}
}

func prepareConfigMap(table string) (map[string]string, error) {
connection := os.Getenv("SQL_SERVER_CONNECTION")

Expand Down

0 comments on commit 2115493

Please sign in to comment.