From 63210cc305254a2a452012e610e1d7071995bae0 Mon Sep 17 00:00:00 2001 From: Maksym Sitarchuk Date: Wed, 21 Dec 2022 13:21:46 +0200 Subject: [PATCH] add source --- README.md | 2 +- columntypes/columntypes.go | 4 +- connector.go | 3 +- destination/destination.go | 118 ----- destination/destination_integration_test.go | 532 -------------------- destination/destination_test.go | 273 ---------- destination/interface.go | 29 -- destination/mock/destination.go | 92 ---- destination/writer/errors.go | 24 - destination/writer/writer.go | 253 ---------- 10 files changed, 4 insertions(+), 1326 deletions(-) delete mode 100644 destination/destination.go delete mode 100644 destination/destination_integration_test.go delete mode 100644 destination/destination_test.go delete mode 100644 destination/interface.go delete mode 100644 destination/mock/destination.go delete mode 100644 destination/writer/errors.go delete mode 100644 destination/writer/writer.go diff --git a/README.md b/README.md index 2b26c2a..035b069 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ If a record contains a `sqlserver.table` property in its metadata it will be ins to use the table configured in the connector. Thus, a Destination can support multiple tables in a single connector, as long as the user has proper access to those tables. -### Source +## Source The source connects to the database using the provided connection and starts creating records for each table row and each detected change. diff --git a/columntypes/columntypes.go b/columntypes/columntypes.go index 0860eab..e4f00c1 100644 --- a/columntypes/columntypes.go +++ b/columntypes/columntypes.go @@ -29,8 +29,8 @@ import ( const ( // sql server date, time column types. dateType = "date" - datetime2Type = "datetime2Type" - datetimeType = "datetimeType" + datetime2Type = "datetime2" + datetimeType = "datetime" datetimeOffsetType = "datetimeoffset" smallDateTimeType = "smalldatetime" timeType = "time" diff --git a/connector.go b/connector.go index bb7fc97..2238464 100644 --- a/connector.go +++ b/connector.go @@ -17,12 +17,11 @@ package sqlserver import ( sdk "github.com/conduitio/conduit-connector-sdk" - "github.com/conduitio-labs/conduit-connector-sql-server/destination" "github.com/conduitio-labs/conduit-connector-sql-server/source" ) var Connector = sdk.Connector{ NewSpecification: Specification, NewSource: source.New, - NewDestination: destination.New, + NewDestination: nil, } diff --git a/destination/destination.go b/destination/destination.go deleted file mode 100644 index 61e8193..0000000 --- a/destination/destination.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright © 2022 Meroxa, Inc & Yalantis. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package destination - -import ( - "context" - "database/sql" - "fmt" - - sdk "github.com/conduitio/conduit-connector-sdk" - - _ "github.com/denisenkom/go-mssqldb" //nolint:revive,nolintlint - - "github.com/conduitio-labs/conduit-connector-sql-server/config" - "github.com/conduitio-labs/conduit-connector-sql-server/destination/writer" -) - -// Destination SQL Server Connector persists records to a sql server database. -type Destination struct { - sdk.UnimplementedDestination - - writer Writer - config config.Config -} - -// New creates new instance of the Destination. -func New() sdk.Destination { - return &Destination{} -} - -// Parameters returns a map of named sdk.Parameters that describe how to configure the Destination. -func (d *Destination) Parameters() map[string]sdk.Parameter { - return map[string]sdk.Parameter{ - config.KeyConnection: { - Description: "Connection string to SQL Server", - Required: true, - Default: "", - }, - config.KeyTable: { - Description: "A name of the table that the connector should write to.", - Required: true, - Default: "", - }, - } -} - -// Configure parses and initializes the config. -func (d *Destination) Configure(ctx context.Context, cfg map[string]string) error { - configuration, err := config.Parse(cfg) - if err != nil { - return fmt.Errorf("parse config: %w", err) - } - - d.config = configuration - - return nil -} - -// Open makes sure everything is prepared to receive records. -func (d *Destination) Open(ctx context.Context) error { - db, err := sql.Open("mssql", d.config.Connection) - if err != nil { - return fmt.Errorf("connect to sql server: %w", err) - } - - if err = db.PingContext(ctx); err != nil { - return fmt.Errorf("ping sql server: %w", err) - } - - d.writer, err = writer.NewWriter(ctx, writer.Params{ - DB: db, - Table: d.config.Table, - }) - - if err != nil { - return fmt.Errorf("new writer: %w", err) - } - - return nil -} - -// Write writes a record into a Destination. -func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error) { - for i, record := range records { - err := sdk.Util.Destination.Route(ctx, record, - d.writer.Insert, - d.writer.Update, - d.writer.Delete, - d.writer.Insert, - ) - if err != nil { - return i, fmt.Errorf("route %s: %w", record.Operation.String(), err) - } - } - - return len(records), nil -} - -// Teardown gracefully closes connections. -func (d *Destination) Teardown(ctx context.Context) error { - if d.writer != nil { - return d.writer.Close(ctx) - } - - return nil -} diff --git a/destination/destination_integration_test.go b/destination/destination_integration_test.go deleted file mode 100644 index d58e2d8..0000000 --- a/destination/destination_integration_test.go +++ /dev/null @@ -1,532 +0,0 @@ -// Copyright © 2022 Meroxa, Inc & Yalantis. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package destination - -import ( - "context" - "database/sql" - "errors" - "fmt" - "os" - "testing" - "time" - - sdk "github.com/conduitio/conduit-connector-sdk" - - "github.com/conduitio-labs/conduit-connector-sql-server/config" -) - -const ( - integrationTable = "conduit_integration_test_table" - - // queries. - queryCreateTable = ` -CREATE TABLE %s ( - id int NOT NULL PRIMARY KEY, - cl_bigint BIGINT, - cl_tinyint TINYINT, - cl_varchar VARCHAR(40), - cl_text TEXT, - cl_date DATE, - cl_numeric NUMERIC(10,5), - cl_decimal DECIMAL(10,5), - cl_varbinary VARBINARY(100), - cl_float FLOAT - ) -` - queryDropTable = ` - IF EXISTS(SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '%s') - DROP TABLE %s; -` -) - -func TestIntegrationDestination_Write_Insert_Success(t *testing.T) { - var preparedID = 1 - - ctx := context.Background() - - cfg, err := prepareConfig() - if err != nil { - t.Log(err) - t.Skip(err) - } - - db, err := sql.Open("mssql", cfg[config.KeyConnection]) - if err != nil { - t.Fatal(err) - } - - defer db.Close() - - if err = db.PingContext(ctx); err != nil { - t.Fatal(err) - } - - err = prepareTable(ctx, db) - if err != nil { - t.Fatal(err) - } - - defer clearData(ctx, cfg[config.KeyConnection]) //nolint:errcheck,nolintlint - - dest := New() - - err = dest.Configure(ctx, cfg) - if err != nil { - t.Error(err) - } - - err = dest.Open(ctx) - if err != nil { - t.Error(err) - } - - preparedData := map[string]any{ - "id": preparedID, - "cl_bigint": 321765482, - "cl_tinyint": 2, - "cl_varchar": "test", - "cl_text": "text_test", - "cl_date": time.Date( - 2009, 11, 17, 20, 34, 58, 651387237, time.UTC), - "cl_numeric": 1234.1234, - "cl_decimal": 1234.1234, - "cl_varbinary": []byte("some test"), - "cl_float": 1234.1234, - } - - count, err := dest.Write(ctx, []sdk.Record{ - { - Payload: sdk.Change{After: sdk.StructuredData(preparedData)}, - Operation: sdk.OperationSnapshot, - Key: sdk.StructuredData{"id": "1"}, - }, - }, - ) - - if err != nil { - t.Error(err) - } - - if count != 1 { - t.Error(errors.New("count mismatched")) - } - - // check if row exist by id - rows, err := db.QueryContext(ctx, fmt.Sprintf("SELECT id FROM %s", integrationTable)) - if err != nil { - t.Error(err) - } - - defer rows.Close() - - var id int - for rows.Next() { - err = rows.Scan(&id) - if err != nil { - t.Error(err) - } - } - - if id != preparedID { - t.Error(errors.New("id and prepared id not equal")) - } - - err = dest.Teardown(ctx) - if err != nil { - t.Error(err) - } -} - -func TestIntegrationDestination_Write_Update_Success(t *testing.T) { - var preparedVarchar = "updated_test" - - ctx := context.Background() - - cfg, err := prepareConfig() - if err != nil { - t.Log(err) - t.Skip(err) - } - - db, err := sql.Open("mssql", cfg[config.KeyConnection]) - if err != nil { - t.Fatal(err) - } - - defer db.Close() - - if err = db.PingContext(ctx); err != nil { - t.Fatal(err) - } - - err = prepareTable(ctx, db) - if err != nil { - t.Fatal(err) - } - - defer clearData(ctx, cfg[config.KeyConnection]) //nolint:errcheck,nolintlint - - dest := New() - - err = dest.Configure(ctx, cfg) - if err != nil { - t.Error(err) - } - - err = dest.Open(ctx) - if err != nil { - t.Error(err) - } - - preparedData := map[string]any{ - "id": 1, - "cl_bigint": 321765482, - "cl_tinyint": 2, - "cl_varchar": "test", - "cl_text": "text_test", - "cl_date": time.Date( - 2009, 11, 17, 20, 34, 58, 651387237, time.UTC), - "cl_numeric": 1234.1234, - "cl_decimal": 1234.1234, - "cl_varbinary": []byte("some test"), - "cl_float": 1234.1234, - } - - count, err := dest.Write(ctx, []sdk.Record{ - { - Payload: sdk.Change{After: sdk.StructuredData(preparedData)}, - Operation: sdk.OperationSnapshot, - Key: sdk.StructuredData{"id": "1"}, - }, - }, - ) - - if err != nil { - t.Error(err) - } - - if count != 1 { - t.Error(errors.New("count mismatched")) - } - - preparedData["cl_varchar"] = preparedVarchar - - _, err = dest.Write(ctx, []sdk.Record{ - { - Payload: sdk.Change{After: sdk.StructuredData(preparedData)}, - Operation: sdk.OperationUpdate, - Key: sdk.StructuredData{"id": "1"}, - }, - }, - ) - if err != nil { - t.Error(err) - } - - // check if value was updated - rows, err := db.QueryContext(ctx, fmt.Sprintf("SELECT cl_varchar FROM %s", integrationTable)) - if err != nil { - t.Error(err) - } - - defer rows.Close() - - var clVarchar string - for rows.Next() { - err = rows.Scan(&clVarchar) - if err != nil { - t.Error(err) - } - } - - if clVarchar != preparedVarchar { - t.Error(errors.New("clVarchar and preparedVarchar not equal")) - } - - err = dest.Teardown(ctx) - if err != nil { - t.Error(err) - } -} - -func TestIntegrationDestination_Write_Update_Composite_Keys_Success(t *testing.T) { - var preparedVarchar = "updated_test" - - ctx := context.Background() - - cfg, err := prepareConfig() - if err != nil { - t.Log(err) - t.Skip(err) - } - - db, err := sql.Open("mssql", cfg[config.KeyConnection]) - if err != nil { - t.Fatal(err) - } - - defer db.Close() - - if err = db.PingContext(ctx); err != nil { - t.Fatal(err) - } - - err = prepareTable(ctx, db) - if err != nil { - t.Fatal(err) - } - - defer clearData(ctx, cfg[config.KeyConnection]) //nolint:errcheck,nolintlint - - dest := New() - - err = dest.Configure(ctx, cfg) - if err != nil { - t.Error(err) - } - - err = dest.Open(ctx) - if err != nil { - t.Error(err) - } - - preparedData := map[string]any{ - "id": 1, - "cl_bigint": 321765482, - "cl_tinyint": 2, - "cl_varchar": "test", - "cl_text": "text_test", - "cl_date": time.Date( - 2009, 11, 17, 20, 34, 58, 651387237, time.UTC), - "cl_numeric": 1234.1234, - "cl_decimal": 1234.1234, - "cl_varbinary": []byte("some test"), - "cl_float": 1234.1234, - } - - count, err := dest.Write(ctx, []sdk.Record{ - { - Payload: sdk.Change{After: sdk.StructuredData(preparedData)}, - Operation: sdk.OperationSnapshot, - Key: sdk.StructuredData{"id": "1"}, - }, - }, - ) - - if err != nil { - t.Error(err) - } - - if count != 1 { - t.Error(errors.New("count mismatched")) - } - - preparedData["cl_varchar"] = preparedVarchar - - _, err = dest.Write(ctx, []sdk.Record{ - { - Payload: sdk.Change{After: sdk.StructuredData(preparedData)}, - Operation: sdk.OperationUpdate, - Key: sdk.StructuredData{"id": "1", "cl_tinyint": 2}, - }, - }, - ) - if err != nil { - t.Error(err) - } - - // check if value was updated - rows, err := db.QueryContext(ctx, fmt.Sprintf("SELECT cl_varchar FROM %s", integrationTable)) - if err != nil { - t.Error(err) - } - - defer rows.Close() - - var clVarchar string - for rows.Next() { - err = rows.Scan(&clVarchar) - if err != nil { - t.Error(err) - } - } - - if clVarchar != preparedVarchar { - t.Error(errors.New("clVarchar and preparedVarchar not equal")) - } - - err = dest.Teardown(ctx) - if err != nil { - t.Error(err) - } -} - -func TestIntegrationDestination_Write_Delete_Success(t *testing.T) { - ctx := context.Background() - - cfg, err := prepareConfig() - if err != nil { - t.Log(err) - t.Skip(err) - } - - db, err := sql.Open("mssql", cfg[config.KeyConnection]) - if err != nil { - t.Fatal(err) - } - - defer db.Close() - - if err = db.PingContext(ctx); err != nil { - t.Fatal(err) - } - - err = prepareTable(ctx, db) - if err != nil { - t.Fatal(err) - } - - defer clearData(ctx, cfg[config.KeyConnection]) //nolint:errcheck,nolintlint - - dest := New() - - err = dest.Configure(ctx, cfg) - if err != nil { - t.Error(err) - } - - err = dest.Open(ctx) - if err != nil { - t.Error(err) - } - - preparedData := map[string]any{ - "id": 1, - "cl_bigint": 321765482, - "cl_tinyint": 2, - "cl_varchar": "test", - "cl_text": "text_test", - "cl_date": time.Date( - 2009, 11, 17, 20, 34, 58, 651387237, time.UTC), - "cl_numeric": 1234.1234, - "cl_decimal": 1234.1234, - "cl_varbinary": []byte("some test"), - "cl_float": 1234.1234, - } - - count, err := dest.Write(ctx, []sdk.Record{ - { - Payload: sdk.Change{After: sdk.StructuredData(preparedData)}, - Operation: sdk.OperationSnapshot, - Key: sdk.StructuredData{"id": "1"}, - }, - }, - ) - if err != nil { - t.Error(err) - } - - if count != 1 { - t.Error(errors.New("count mismatched")) - } - - count, err = dest.Write(ctx, []sdk.Record{ - { - Payload: sdk.Change{After: sdk.StructuredData(preparedData)}, - Operation: sdk.OperationDelete, - Key: sdk.StructuredData{"id": "1"}, - }, - }, - ) - if err != nil { - t.Error(err) - } - - if count != 1 { - t.Error(errors.New("count mismatched")) - } - - err = dest.Teardown(ctx) - if err != nil { - t.Error(err) - } - - // check if row exist by id - rows, err := db.QueryContext(ctx, fmt.Sprintf("SELECT count(*) FROM %s", integrationTable)) - if err != nil { - t.Error(err) - } - - defer rows.Close() - - for rows.Next() { - err = rows.Scan(&count) - if err != nil { - t.Error(err) - } - } - - if count != 0 { - t.Error(errors.New("count not zero")) - } -} - -func prepareConfig() (map[string]string, error) { - conn := os.Getenv("SQL_SERVER_CONNECTION") - if conn == "" { - return nil, errors.New("missed env variable 'SQL_SERVER_CONNECTION'") - } - - return map[string]string{ - config.KeyConnection: conn, - config.KeyTable: integrationTable, - }, nil -} - -func prepareTable(ctx context.Context, db *sql.DB) error { - // for case if table exist - _, err := db.ExecContext(ctx, fmt.Sprintf(queryDropTable, integrationTable, integrationTable)) - if err != nil { - return err - } - - _, err = db.ExecContext(ctx, fmt.Sprintf(queryCreateTable, integrationTable)) - if err != nil { - return err - } - - return nil -} - -func clearData(ctx context.Context, connection string) error { - db, err := sql.Open("mssql", connection) - if err != nil { - return fmt.Errorf("connect to sql server: %w", err) - } - - defer db.Close() - - if err = db.PingContext(ctx); err != nil { - return fmt.Errorf("ping sql server: %w", err) - } - - _, err = db.ExecContext(ctx, fmt.Sprintf(queryDropTable, integrationTable, integrationTable)) - if err != nil { - return err - } - - return nil -} diff --git a/destination/destination_test.go b/destination/destination_test.go deleted file mode 100644 index 63bf4b4..0000000 --- a/destination/destination_test.go +++ /dev/null @@ -1,273 +0,0 @@ -// Copyright © 2022 Meroxa, Inc & Yalantis. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package destination - -import ( - "context" - "errors" - "testing" - - sdk "github.com/conduitio/conduit-connector-sdk" - "github.com/golang/mock/gomock" - - "github.com/matryer/is" - - "github.com/conduitio-labs/conduit-connector-sql-server/config" - "github.com/conduitio-labs/conduit-connector-sql-server/destination/mock" - "github.com/conduitio-labs/conduit-connector-sql-server/destination/writer" -) - -func TestDestination_Configure(t *testing.T) { - t.Parallel() - - type args struct { - cfg map[string]string - } - - tests := []struct { - name string - args args - wantErr bool - }{ - { - name: "success", - args: args{ - cfg: map[string]string{ - config.KeyConnection: "sqlserver://sa:passwordx@0.0.0.0?database=mydb&connection+timeout=30", - config.KeyTable: "CLIENTS", - }, - }, - wantErr: false, - }, - { - name: "fail, missing connection", - args: args{ - cfg: map[string]string{ - config.KeyTable: "CLIENTS", - }, - }, - wantErr: true, - }, - { - name: "fail, missing table", - args: args{ - cfg: map[string]string{ - config.KeyConnection: "sqlserver://sa:passwordx@0.0.0.0?database=mydb&connection+timeout=30", - }, - }, - wantErr: true, - }, - } - - for _, tt := range tests { - tt := tt - - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - d := &Destination{} - if err := d.Configure(context.Background(), tt.args.cfg); (err != nil) != tt.wantErr { - t.Errorf("Destination.Configure() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } -} - -func TestDestination_Write(t *testing.T) { - t.Parallel() - - t.Run("success", func(t *testing.T) { - t.Parallel() - - is := is.New(t) - - ctrl := gomock.NewController(t) - ctx := context.Background() - - record := sdk.Record{ - Operation: sdk.OperationCreate, - Key: sdk.StructuredData{ - "ID": 1, - }, - Payload: sdk.Change{After: sdk.StructuredData{ - "ID": 1, - "name": "test", - }, - }, - } - - w := mock.NewMockWriter(ctrl) - w.EXPECT().Insert(ctx, record).Return(nil) - - d := Destination{ - writer: w, - } - - c, err := d.Write(ctx, []sdk.Record{record}) - is.NoErr(err) - - is.Equal(c, 1) - }) - - t.Run("success_update", func(t *testing.T) { - t.Parallel() - - is := is.New(t) - - ctrl := gomock.NewController(t) - ctx := context.Background() - - record := sdk.Record{ - Operation: sdk.OperationUpdate, - Key: sdk.StructuredData{ - "ID": 1, - }, - Payload: sdk.Change{After: sdk.StructuredData{ - "ID": 1, - "name": "test", - }, - }, - } - - w := mock.NewMockWriter(ctrl) - w.EXPECT().Update(ctx, record).Return(nil) - - d := Destination{ - writer: w, - } - - c, err := d.Write(ctx, []sdk.Record{record}) - is.NoErr(err) - - is.Equal(c, 1) - }) - - t.Run("success_delete", func(t *testing.T) { - t.Parallel() - - is := is.New(t) - - ctrl := gomock.NewController(t) - ctx := context.Background() - - record := sdk.Record{ - Operation: sdk.OperationDelete, - Key: sdk.StructuredData{ - "ID": 1, - }, - Payload: sdk.Change{After: sdk.StructuredData{ - "ID": 1, - "name": "test", - }, - }, - } - - w := mock.NewMockWriter(ctrl) - w.EXPECT().Delete(ctx, record).Return(nil) - - d := Destination{ - writer: w, - } - - c, err := d.Write(ctx, []sdk.Record{record}) - is.NoErr(err) - - is.Equal(c, 1) - }) - - t.Run("fail, empty payload", func(t *testing.T) { - t.Parallel() - - is := is.New(t) - - ctrl := gomock.NewController(t) - ctx := context.Background() - - record := sdk.Record{ - Operation: sdk.OperationSnapshot, - Position: sdk.Position("1.0"), - Key: sdk.StructuredData{ - "ID": 1, - }, - } - - w := mock.NewMockWriter(ctrl) - w.EXPECT().Insert(ctx, record).Return(writer.ErrEmptyPayload) - - d := Destination{ - writer: w, - } - - _, err := d.Write(ctx, []sdk.Record{record}) - is.Equal(err != nil, true) - }) -} - -func TestDestination_Teardown(t *testing.T) { - t.Parallel() - - t.Run("success", func(t *testing.T) { - t.Parallel() - - is := is.New(t) - - ctrl := gomock.NewController(t) - ctx := context.Background() - - w := mock.NewMockWriter(ctrl) - w.EXPECT().Close(ctx).Return(nil) - - d := Destination{ - writer: w, - } - - err := d.Teardown(ctx) - is.NoErr(err) - }) - - t.Run("success, writer is nil", func(t *testing.T) { - t.Parallel() - - is := is.New(t) - - ctx := context.Background() - - d := Destination{ - writer: nil, - } - - err := d.Teardown(ctx) - is.NoErr(err) - }) - - t.Run("fail, unexpected error", func(t *testing.T) { - t.Parallel() - - is := is.New(t) - - ctrl := gomock.NewController(t) - ctx := context.Background() - - w := mock.NewMockWriter(ctrl) - w.EXPECT().Close(ctx).Return(errors.New("some error")) - - d := Destination{ - writer: w, - } - - err := d.Teardown(ctx) - is.Equal(err != nil, true) - }) -} diff --git a/destination/interface.go b/destination/interface.go deleted file mode 100644 index 574cb7a..0000000 --- a/destination/interface.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright © 2022 Meroxa, Inc & Yalantis. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package destination - -import ( - "context" - - sdk "github.com/conduitio/conduit-connector-sdk" -) - -// Writer defines a writer interface needed for the Destination. -type Writer interface { - Delete(ctx context.Context, record sdk.Record) error - Insert(ctx context.Context, record sdk.Record) error - Update(ctx context.Context, record sdk.Record) error - Close(ctx context.Context) error -} diff --git a/destination/mock/destination.go b/destination/mock/destination.go deleted file mode 100644 index d8aa349..0000000 --- a/destination/mock/destination.go +++ /dev/null @@ -1,92 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: destination/interface.go - -// Package mock is a generated GoMock package. -package mock - -import ( - context "context" - reflect "reflect" - - sdk "github.com/conduitio/conduit-connector-sdk" - gomock "github.com/golang/mock/gomock" -) - -// MockWriter is a mock of Writer interface. -type MockWriter struct { - ctrl *gomock.Controller - recorder *MockWriterMockRecorder -} - -// MockWriterMockRecorder is the mock recorder for MockWriter. -type MockWriterMockRecorder struct { - mock *MockWriter -} - -// NewMockWriter creates a new mock instance. -func NewMockWriter(ctrl *gomock.Controller) *MockWriter { - mock := &MockWriter{ctrl: ctrl} - mock.recorder = &MockWriterMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockWriter) EXPECT() *MockWriterMockRecorder { - return m.recorder -} - -// Close mocks base method. -func (m *MockWriter) Close(ctx context.Context) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Close", ctx) - ret0, _ := ret[0].(error) - return ret0 -} - -// Close indicates an expected call of Close. -func (mr *MockWriterMockRecorder) Close(ctx interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockWriter)(nil).Close), ctx) -} - -// Delete mocks base method. -func (m *MockWriter) Delete(ctx context.Context, record sdk.Record) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Delete", ctx, record) - ret0, _ := ret[0].(error) - return ret0 -} - -// Delete indicates an expected call of Delete. -func (mr *MockWriterMockRecorder) Delete(ctx, record interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockWriter)(nil).Delete), ctx, record) -} - -// Insert mocks base method. -func (m *MockWriter) Insert(ctx context.Context, record sdk.Record) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Insert", ctx, record) - ret0, _ := ret[0].(error) - return ret0 -} - -// Insert indicates an expected call of Insert. -func (mr *MockWriterMockRecorder) Insert(ctx, record interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Insert", reflect.TypeOf((*MockWriter)(nil).Insert), ctx, record) -} - -// Update mocks base method. -func (m *MockWriter) Update(ctx context.Context, record sdk.Record) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Update", ctx, record) - ret0, _ := ret[0].(error) - return ret0 -} - -// Update indicates an expected call of Update. -func (mr *MockWriterMockRecorder) Update(ctx, record interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockWriter)(nil).Update), ctx, record) -} diff --git a/destination/writer/errors.go b/destination/writer/errors.go deleted file mode 100644 index 4d56463..0000000 --- a/destination/writer/errors.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright © 2022 Meroxa, Inc & Yalantis. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package writer - -import "errors" - -var ( - // ErrEmptyPayload occurs when there's no payload to insert. - ErrEmptyPayload = errors.New("payload is empty") - // ErrEmptyKey occurs when there is no value for key. - ErrEmptyKey = errors.New("key value must be provided") -) diff --git a/destination/writer/writer.go b/destination/writer/writer.go deleted file mode 100644 index 7c988de..0000000 --- a/destination/writer/writer.go +++ /dev/null @@ -1,253 +0,0 @@ -// Copyright © 2022 Meroxa, Inc & Yalantis. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package writer - -import ( - "context" - "database/sql" - "encoding/json" - "fmt" - - "github.com/huandu/go-sqlbuilder" - - sdk "github.com/conduitio/conduit-connector-sdk" - - "github.com/conduitio-labs/conduit-connector-sql-server/columntypes" -) - -const ( - // metadata related. - metadataTable = "sqlserver.table" -) - -// Writer implements a writer logic for db2 destination. -type Writer struct { - db *sql.DB - table string - keyColumn string - columnTypes map[string]string -} - -// Params is an incoming params for the NewWriter function. -type Params struct { - DB *sql.DB - Table string - KeyColumn string -} - -// NewWriter creates new instance of the Writer. -func NewWriter(ctx context.Context, params Params) (*Writer, error) { - writer := &Writer{ - db: params.DB, - table: params.Table, - keyColumn: params.KeyColumn, - } - - columnTypes, err := columntypes.GetColumnTypes(ctx, writer.db, writer.table) - if err != nil { - return nil, fmt.Errorf("get column types: %w", err) - } - - writer.columnTypes = columnTypes - - return writer, nil -} - -// Close closes the underlying db connection. -func (w *Writer) Close(ctx context.Context) error { - return w.db.Close() -} - -// Delete deletes records by a key. -func (w *Writer) Delete(ctx context.Context, record sdk.Record) error { - tableName := w.getTableName(record.Metadata) - - keys, err := w.structurizeData(record.Key) - if err != nil { - return fmt.Errorf("structurize key: %w", err) - } - - if len(keys) == 0 { - return ErrEmptyKey - } - - query, args := w.buildDeleteQuery(tableName, keys) - - _, err = w.db.ExecContext(ctx, query, args...) - if err != nil { - return fmt.Errorf("exec delete: %w", err) - } - - return nil -} - -// Update updates records by a key. -func (w *Writer) Update(ctx context.Context, record sdk.Record) error { - tableName := w.getTableName(record.Metadata) - - payload, err := w.structurizeData(record.Payload.After) - if err != nil { - return fmt.Errorf("structurize payload: %w", err) - } - - // if payload is empty return empty payload error - if payload == nil { - return ErrEmptyPayload - } - - payload, err = columntypes.ConvertStructureData(ctx, w.columnTypes, payload) - if err != nil { - return fmt.Errorf("convert structure data: %w", err) - } - - keys, err := w.structurizeData(record.Key) - if err != nil { - return fmt.Errorf("structurize key: %w", err) - } - - if len(keys) == 0 { - return ErrEmptyKey - } - - query, args := w.buildUpdateQuery(tableName, keys, payload) - - _, err = w.db.ExecContext(ctx, query, args...) - if err != nil { - return fmt.Errorf("exec update: %w", err) - } - - return nil -} - -// getTableName returns either the records metadata value for table -// or the default configured value for table. -func (w *Writer) getTableName(metadata map[string]string) string { - tableName, ok := metadata[metadataTable] - if !ok { - return w.table - } - - return tableName -} - -// Insert row to sql server db. -func (w *Writer) Insert(ctx context.Context, record sdk.Record) error { - tableName := w.getTableName(record.Metadata) - - payload, err := w.structurizeData(record.Payload.After) - if err != nil { - return fmt.Errorf("structurize payload: %w", err) - } - - // if payload is empty return empty payload error - if payload == nil { - return ErrEmptyPayload - } - - payload, err = columntypes.ConvertStructureData(ctx, w.columnTypes, payload) - if err != nil { - return fmt.Errorf("convert structure data: %w", err) - } - - columns, values := w.extractColumnsAndValues(payload) - - query, args := w.buildInsertQuery(tableName, columns, values) - - _, err = w.db.ExecContext(ctx, query, args...) - if err != nil { - return fmt.Errorf("exec upsert: %w", err) - } - - return nil -} - -// buildDeleteQuery generates an SQL DELETE statement query, -// based on the provided table, and keys. -func (w *Writer) buildDeleteQuery(table string, keys map[string]any) (string, []any) { - db := sqlbuilder.NewDeleteBuilder() - - db.DeleteFrom(table) - - for key, val := range keys { - db.Where( - db.Equal(key, val), - ) - } - - query, args := db.Build() - - return query, args -} - -// structurizeData converts sdk.Data to sdk.StructuredData. -func (w *Writer) structurizeData(data sdk.Data) (sdk.StructuredData, error) { - if data == nil || len(data.Bytes()) == 0 { - return nil, nil - } - - structuredData := make(sdk.StructuredData) - if err := json.Unmarshal(data.Bytes(), &structuredData); err != nil { - return nil, fmt.Errorf("unmarshal data into structured data: %w", err) - } - - return structuredData, nil -} - -// extractColumnsAndValues turns the payload into slices of -// columns and values for inserting into db2. -func (w *Writer) extractColumnsAndValues(payload sdk.StructuredData) ([]string, []any) { - var ( - columns []string - values []any - ) - - for key, value := range payload { - columns = append(columns, key) - values = append(values, value) - } - - return columns, values -} - -func (w *Writer) buildInsertQuery(table string, columns []string, values []any) (string, []any) { - sb := sqlbuilder.NewInsertBuilder() - - sb.InsertInto(table) - sb.Cols(columns...) - sb.Values(values...) - - return sb.Build() -} - -func (w *Writer) buildUpdateQuery(table string, keys, payload map[string]any) (string, []any) { - up := sqlbuilder.NewUpdateBuilder() - - up.Update(table) - - setVal := make([]string, 0) - for key, val := range payload { - setVal = append(setVal, up.Assign(key, val)) - } - - up.Set(setVal...) - - for key, val := range keys { - up.Where( - up.Equal(key, val), - ) - } - - return up.Build() -}