Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql-server Source #18

Merged
merged 17 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions .github/validate-generated-files.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
name: validate-generated-files

on:
push:
branches: [ main ]
pull_request:

jobs:
validate-generated-files:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: '1.20'

- name: Install Protoc
uses: arduino/setup-protoc@v2

- name: Set up Buf
uses: bufbuild/buf-setup-action@v1

- name: Check generated files
run: |
export PATH=$PATH:$(go env GOPATH)/bin
make install-tools
make generate
make proto-generate
git diff --exit-code --numstat
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v3.6.0
with:
version: v1.52.2
version: v1.53.3
2 changes: 1 addition & 1 deletion .golangci.goheader.template
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright © 2022 Meroxa, Inc & Yalantis.
Copyright © {{ copyright-year }} Meroxa, Inc & Yalantis.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
10 changes: 7 additions & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,20 @@ linters-settings:
ignore-tests: true
goheader:
template-path: '.golangci.goheader.template'
values:
regexp:
copyright-year: 20[2-9]\d
lll:
line-length: 120


linters:
# please, do not use `enable-all`: it's deprecated and will be removed soon.
# inverted configuration with `enable-all` and `disable` is not scalable during updates of golangci-lint
disable-all: true
enable:
- bodyclose
- deadcode
# - deadcode
# - depguard
- dogsled
- durationcheck
Expand Down Expand Up @@ -69,7 +73,7 @@ linters:
- predeclared
- rowserrcheck
- staticcheck
- structcheck
# - structcheck
- stylecheck
- sqlclosecheck
# - tagliatelle
Expand All @@ -80,7 +84,7 @@ linters:
- unconvert
# - unparam
- unused
- varcheck
# - varcheck
- wastedassign
- whitespace
# - wrapcheck
Expand Down
90 changes: 89 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ It provides both, a source and a destination SQL SERVER connector.

### Prerequisites

- [Go](https://go.dev/) '1.20'
- [Go](https://go.dev/) 1.20
- (optional) [golangci-lint](https://github.com/golangci/golangci-lint) 1.48.0
- (optional) [mock](https://github.com/golang/mock) 1.6.0

Expand Down Expand Up @@ -37,3 +37,91 @@ The SQL Server Destination takes a `sdk.Record` and parses it into a valid SQL q
If a record contains a `sqlserver.table` property in its metadata it will be inserted in that table, otherwise it will fall back
to use the table configured in the connector. Thus, a Destination can support multiple tables in a single connector,
as long as the user has proper access to those tables.

## Source

The source connects to the database using the provided connection and starts creating records for each table row
and each detected change.

### Configuration options

| Name | Description | Required | Example |
|---------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-----------------------------------------------|
| `connection` | String line for connection to SQL SERVER. More information about it [Connection](https://github.com/denisenkom/go-mssqldb#the-connection-string-can-be-specified-in-one-of-three-formats) | **true** | sqlserver://sa:password@0.0.0.0?database=mydb |
| `table` | The name of a table in the database that the connector should write to, by default. | **true** | users |
| `primaryKey` | Column name that records should use for their `Key` fields. | **true** | id |
| `orderingColumn` | The name of a column that the connector will use for ordering rows. Its values must be unique and suitable for sorting, otherwise, the snapshot won't work correctly. | **true** | id |
| `column` | Comma separated list of column names that should be included in each Record's payload. If the field is not empty it must contain values of the `primaryKey` and `orderingColumn` fields. By default: all rows | false | id,name,age |
| `snapshot` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode, by default true. | false | false |
| `batchSize` | Size of rows batch. By default is 1000 | false | 100 |

### Snapshot
When the connector first starts, snapshot mode is enabled.

First time when the snapshot iterator starts work, it is get max value from `orderingColumn` and saves this value to position.
The snapshot iterator reads all rows, where `orderingColumn` values less or equal maxValue, from the table in batches.


Values in the ordering column must be unique and suitable for sorting, otherwise, the snapshot won't work correctly.
Iterators saves last processed value from `orderingColumn` column to position to field `SnapshotLastProcessedVal`.
If snapshot stops it will parse position from last record and will try gets row where `{{orderingColumn}} > {{position.SnapshotLastProcessedVal}}`


When all records are returned, the connector switches to the CDC iterator.

This behavior is enabled by default, but can be turned off by adding "snapshot":"false" to the Source configuration.

### Change Data Capture (CDC)

This connector implements CDC features for DB2 by adding a tracking table and triggers to populate it. The tracking
table has the same name as a target table with the prefix `CONDUIT_TRACKING_`. The tracking table has all the
same columns as the target table plus three additional columns:

| name | description |
|---------------------------------|------------------------------------------------------|
| `CONDUIT_TRACKING_ID` | Autoincrement index for the position. |
| `CONDUIT_OPERATION_TYPE` | Operation type: `insert`, `update`, or `delete`. |
| `CONDUIT_TRACKING_CREATED_DATE` | Date when the event was added to the tacking table. |


Triggers have name pattern `CONDUIT_TRIGGER_{{operation_type}}_{{table}}`.


Queries to retrieve change data from a tracking table are very similar to queries in a Snapshot iterator, but with
`CONDUIT_TRACKING_ID` ordering column.

CDC iterator periodically clears rows which were successfully applied from tracking table.
It collects `CONDUIT_TRACKING_ID` inside the `Ack` method into a batch and clears the tracking table every 5 seconds.

Iterator saves the last `CONDUIT_TRACKING_ID` to the position from the last successfully recorded row.

If connector stops, it will parse position from the last record and will try
to get row where `{{CONDUIT_TRACKING_ID}}` > `{{position.CDCLastID}}`.


### CDC FAQ

#### Is it possible to add/remove/rename column to table?

Yes. You have to stop the pipeline and do the same with conduit tracking table.
For example:
```sql
ALTER TABLE CLIENTS
ADD COLUMN address VARCHAR(18);

ALTER TABLE CONDUIT_TRACKING_CLIENTS
ADD COLUMN address VARCHAR(18);
```

#### I accidentally removed tracking table.

You have to restart pipeline, tracking table will be recreated by connector.

#### I accidentally removed table.

You have to stop the pipeline, remove the conduit tracking table, and then start the pipeline.

#### Is it possible to change table name?

Yes. Stop the pipeline, change the value of the `table` in the Source configuration,
change the name of the tracking table using a pattern `CONDUIT_TRACKING_{{TABLE}}`
187 changes: 187 additions & 0 deletions acceptance_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Copyright © 2022 Meroxa, Inc & Yalantis.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sqlserver

import (
"database/sql"
"fmt"
"os"
"strings"
"sync/atomic"
"testing"
"time"

sdk "github.com/conduitio/conduit-connector-sdk"

"github.com/conduitio-labs/conduit-connector-sql-server/config"

"github.com/brianvoe/gofakeit"

s "github.com/conduitio-labs/conduit-connector-sql-server/source"
)

const (
queryCreateTestTable = `CREATE TABLE %s (ID INT, NAME VARCHAR(100))`
queryDropTestTable = `DROP TABLE %s`
queryDropTestTrackingTable = `DROP TABLE CONDUIT_TRACKING_%s`
queryIfExistTable = `SELECT count(*) as ct
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE = 'BASE TABLE'
AND TABLE_NAME = ?`
)

type driver struct {
sdk.ConfigurableAcceptanceTestDriver

counter int64
}

// GenerateRecord generates a random sdk.Record.
func (d *driver) GenerateRecord(_ *testing.T, operation sdk.Operation) sdk.Record {
atomic.AddInt64(&d.counter, 1)

return sdk.Record{
Position: nil,
Operation: operation,
Metadata: map[string]string{
config.KeyTable: d.Config.DestinationConfig[config.KeyTable],
},
Key: sdk.StructuredData{
"ID": d.counter,
},
Payload: sdk.Change{After: sdk.RawData(
fmt.Sprintf(
`{"ID":%d,"NAME":"%s"}`, d.counter, gofakeit.Name(),
),
),
},
}
}

//nolint:paralleltest // we don't need paralleltest for the Acceptance tests.
func TestAcceptance(t *testing.T) {
cfg := prepareConfig(t)

sdk.AcceptanceTest(t, &driver{
ConfigurableAcceptanceTestDriver: sdk.ConfigurableAcceptanceTestDriver{
Config: sdk.ConfigurableAcceptanceTestDriverConfig{
Connector: Connector,
SourceConfig: cfg,
DestinationConfig: cfg,
BeforeTest: beforeTest(t, cfg),
},
},
})
}

// beforeTest creates new table before each test.
func beforeTest(_ *testing.T, cfg map[string]string) func(t *testing.T) {
return func(t *testing.T) {
table := randomIdentifier(t)
t.Logf("table under test: %v", table)

cfg[config.KeyTable] = table

err := prepareData(t, cfg)
if err != nil {
t.Fatal(err)
}
}
}

func prepareConfig(t *testing.T) map[string]string {
conn := os.Getenv("SQL_SERVER_CONNECTION")
if conn == "" {
t.Skip("SQL_SERVER_CONNECTION env var must be set")

return nil
}

return map[string]string{
config.KeyConnection: conn,
s.KeyOrderingColumn: "ID",
s.KeyPrimaryKey: "ID",
}
}

func prepareData(t *testing.T, cfg map[string]string) error {
db, err := sql.Open("mssql", cfg[config.KeyConnection])
if err != nil {
return err
}

_, err = db.Exec(fmt.Sprintf(queryCreateTestTable, cfg[config.KeyTable]))
if err != nil {
return err
}

db.Close()

// drop table
t.Cleanup(func() {
db, err = sql.Open("mssql", cfg[config.KeyConnection])
if err != nil {
t.Fatal(err)
}

queryDropTable := fmt.Sprintf(queryDropTestTable, cfg[config.KeyTable])

_, err = db.Exec(queryDropTable)
if err != nil {
t.Errorf("drop test table: %v", err)
}

queryDropTrackingTable := fmt.Sprintf(queryDropTestTrackingTable, cfg[config.KeyTable])

// check if table exist.
rows, er := db.Query(queryIfExistTable, cfg[config.KeyTable])
if rows.Err() != nil {
t.Error(rows.Err())
}
if er != nil {
t.Error(er)
}

defer rows.Close() //nolint:staticcheck,nolintlint

for rows.Next() {
var count int
err = rows.Scan(&count)
if err != nil {
t.Error(er)
}

if count == 1 {
// table exist, setup not needed.
_, err = db.Exec(queryDropTrackingTable)
if err != nil {
t.Errorf("drop test tracking table: %v", err)
}
}
}

if err = db.Close(); err != nil {
t.Errorf("close database: %v", err)
}
})

return nil
}

func randomIdentifier(t *testing.T) string {
return strings.ToUpper(fmt.Sprintf("%v_%d",
strings.ReplaceAll(strings.ToLower(t.Name()), "/", "_"),
time.Now().UnixMicro()%1000))
}
Loading