redshift Destination (#17)
maksenius authored Sep 5, 2023
1 parent 977bc41 commit 195c033
Showing 22 changed files with 1,550 additions and 86 deletions.
11 changes: 7 additions & 4 deletions .github/workflows/build.yml → .github/workflows/test.yml
@@ -1,20 +1,23 @@
-name: build
+name: Test

 on:
   push:
     branches: [ main ]
   pull_request:

 jobs:
-  build:
+  test:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v4
+      - name: Check out the code
+        uses: actions/checkout@v4

       - name: Set up Go
         uses: actions/setup-go@v4
         with:
           go-version: '1.21'

-      - name: Test
+      - name: Run tests
+        env:
+          REDSHIFT_DSN: ${{ secrets.REDSHIFT_DSN }}
         run: make test GOTEST_FLAGS="-v -count=1"
3 changes: 3 additions & 0 deletions .golangci.yml
@@ -130,3 +130,6 @@ issues:
         - goerr113
         - maintidx
         - paralleltest # we don't want to run the integration tests in parallel because we want deterministic results
+    - path: acceptance_test
+      linters:
+        - paralleltest
11 changes: 9 additions & 2 deletions Makefile
@@ -1,3 +1,5 @@
+GOLANG_CI_LINT_VER=v1.54.2
+
 .PHONY: build
 build:
 	go build -o conduit-connector-redshift cmd/connector/main.go
@@ -14,9 +16,13 @@ gofumpt:
 fmt: gofumpt
 	gofumpt -l -w .

+.PHONY: golangci-lint-install
+golangci-lint-install:
+	go install github.com/golangci/golangci-lint/cmd/golangci-lint@$(GOLANG_CI_LINT_VER)
+
 .PHONY: lint
-lint:
-	golangci-lint run --config .golangci.yml
+lint: golangci-lint-install
+	golangci-lint run -v

 .PHONY: dep
 dep:
@@ -26,3 +32,4 @@ dep:
 .PHONY: mockgen
 mockgen:
 	mockgen -package mock -source source/source.go -destination source/mock/source.go
+	mockgen -package mock -source destination/destination.go -destination destination/mock/destination.go
33 changes: 30 additions & 3 deletions README.md
@@ -62,14 +62,41 @@ of `sdk.Record.Key` field are taken from `sdk.Payload.After` by the keys of this

For each record, the connector adds a `redshift.table` property to the metadata that contains the table name.

-## Known limitations
+## Destination

Conduit's Redshift destination connector allows you to move data into a Redshift table, specified by the `dsn` and `table`
configuration parameters. It takes an `sdk.Record` and converts it into a valid SQL query.

**Note**: Redshift does not support map or slice types, so they are stored as JSON-marshaled strings.

### Configuration Options

| name | description | required | example | default |
|--------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------------------------------------------------------|---------|
| `dsn` | [DSN](https://en.wikipedia.org/wiki/Data_source_name) to connect to Redshift. | **true** | `postgres://username:password@endpoint:5439/database` | "" |
| `table` | Name of the table the connector writes to. | **true** | `table_name` | "" |
| `keyColumns` | Comma-separated list of column names used to build the WHERE clause when `sdk.Record.Key` is empty.<br /> See more: [Key handling](#key-handling-1). | false | `id,name` | "" |
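
A minimal destination configuration, sketched as the `map[string]string` the connector receives; the DSN is a placeholder and `orders` is a hypothetical table name:

```go
cfg := map[string]string{
	"dsn":        "postgres://username:password@endpoint:5439/database",
	"table":      "orders", // hypothetical table name
	"keyColumns": "id",     // optional; used only when the record key is empty
}
```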

### Key handling

The connector handles record keys as follows:
* When inserting records (i.e. when the operation is `create` or `snapshot`), the key is ignored.
* When updating records (i.e. when the operation is `update`):
  * if the record key exists, it is expected to be structured;
  * if the record key doesn't exist, it is built from the `keyColumns` in the payload's `After` field (see the sketch below).
* When deleting records (i.e. when the operation is `delete`), the key is required to be present.
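
The following is a rough sketch, not the connector's actual code, of how a structured key becomes an update's `WHERE` clause; the table and column names are hypothetical:

```go
package main

import (
	"fmt"
	"strings"

	sdk "github.com/conduitio/conduit-connector-sdk"
)

func main() {
	// A structured key, as the destination would see it on an update.
	key := sdk.StructuredData{"id": int64(42)}

	// Each key column becomes one equality predicate.
	clauses := make([]string, 0, len(key))
	for column := range key {
		clauses = append(clauses, fmt.Sprintf("%s = ?", column))
	}

	// Prints: UPDATE users SET ... WHERE id = ?
	fmt.Println("UPDATE users SET ... WHERE " + strings.Join(clauses, " AND "))
}
```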


### Table Name

Records are written to the table specified by the `redshift.table` property in the metadata, if it exists.
If not, the connector will fall back to the `table` configured in the connector.
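
A sketch of that fallback rule, with a hypothetical helper name (the actual logic lives in the connector's write path):

```go
package example

import sdk "github.com/conduitio/conduit-connector-sdk"

// tableForRecord returns the table a record should be written to:
// the record's "redshift.table" metadata value if set, otherwise
// the table configured for the connector.
func tableForRecord(rec sdk.Record, configuredTable string) string {
	if table, ok := rec.Metadata["redshift.table"]; ok && table != "" {
		return table
	}
	return configuredTable
}
```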

## Known limitations

Creating a source or destination connector will fail in any of the following cases:
- the connector does not have access to Redshift;
- the user does not have the required permissions;
- the table does not exist.

## Useful resources
* [Quotas and limits in Amazon Redshift](https://docs.aws.amazon.com/redshift/latest/mgmt/amazon-redshift-limits.html)

120 changes: 120 additions & 0 deletions acceptance_test.go
@@ -0,0 +1,120 @@
// Copyright © 2022 Meroxa, Inc. & Yalantis
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package redshift

import (
	"fmt"
	"os"
	"sync/atomic"
	"testing"
	"time"

	"github.com/conduitio-labs/conduit-connector-redshift/config"
	sdk "github.com/conduitio/conduit-connector-sdk"
	"github.com/google/uuid"
	"github.com/jmoiron/sqlx"
	"github.com/matryer/is"
)

const (
	// driverName is the database driver name.
	driverName = "pgx"
	// envNameDSN is the name of the environment variable holding the Redshift DSN.
	envNameDSN = "REDSHIFT_DSN"
	// metadataFieldTable is the name of the record metadata field that stores the Redshift table name.
	metadataFieldTable = "redshift.table"
)

type driver struct {
	sdk.ConfigurableAcceptanceTestDriver

	id int64
}

// GenerateRecord generates a random sdk.Record.
func (d *driver) GenerateRecord(_ *testing.T, operation sdk.Operation) sdk.Record {
	atomic.AddInt64(&d.id, 1)

	return sdk.Record{
		Position:  nil,
		Operation: operation,
		Metadata: map[string]string{
			metadataFieldTable: d.Config.SourceConfig[config.Table],
		},
		Key: sdk.StructuredData{
			"col1": d.id,
		},
		Payload: sdk.Change{After: sdk.RawData(
			fmt.Sprintf(`{"col1":%d,"col2":"%s"}`, d.id, uuid.NewString()),
		)},
	}
}

func TestAcceptance(t *testing.T) {
	dsn := os.Getenv(envNameDSN)
	if dsn == "" {
		t.Skipf("%s env var must be set", envNameDSN)
	}

	cfg := map[string]string{
		config.DSN:            dsn,
		config.Table:          fmt.Sprintf("conduit_test_%d", time.Now().UnixNano()),
		config.OrderingColumn: "col1",
	}

	sdk.AcceptanceTest(t, &driver{
		ConfigurableAcceptanceTestDriver: sdk.ConfigurableAcceptanceTestDriver{
			Config: sdk.ConfigurableAcceptanceTestDriverConfig{
				Connector:         Connector,
				SourceConfig:      cfg,
				DestinationConfig: cfg,
				BeforeTest:        beforeTest(cfg),
				AfterTest:         afterTest(cfg),
			},
		},
	})
}

// beforeTest creates the test table.
func beforeTest(cfg map[string]string) func(*testing.T) {
	return func(t *testing.T) {
		t.Helper()

		is := is.New(t)

		db, err := sqlx.Open(driverName, cfg[config.DSN])
		is.NoErr(err)
		defer db.Close()

		_, err = db.Exec(fmt.Sprintf("CREATE TABLE %s (col1 INTEGER, col2 VARCHAR(36));", cfg[config.Table]))
		is.NoErr(err)
	}
}

// afterTest drops the test table.
func afterTest(cfg map[string]string) func(*testing.T) {
	return func(t *testing.T) {
		t.Helper()

		is := is.New(t)

		db, err := sqlx.Open(driverName, cfg[config.DSN])
		is.NoErr(err)
		defer db.Close()

		_, err = db.Exec(fmt.Sprintf("DROP TABLE %s", cfg[config.Table]))
		is.NoErr(err)
	}
}
89 changes: 84 additions & 5 deletions columntypes/columntypes.go
@@ -16,29 +16,42 @@ package columntypes

import (
	"context"
	"encoding/json"
	"fmt"
	"reflect"
	"strconv"
	"strings"
	"time"

	sdk "github.com/conduitio/conduit-connector-sdk"
	"github.com/huandu/go-sqlbuilder"
	"github.com/jmoiron/sqlx"
)

const (
	// TimeTypeLayout is the time layout of Redshift's TIME data type.
	TimeTypeLayout = "15:04:05"
	// TimeTzTypeLayout is the time layout of Redshift's TIMETZ (time with time zone) data type.
	TimeTzTypeLayout = "15:04:05Z07"

	// Redshift types.
	numericType = "numeric"
	timeType    = "time without time zone"
	timeTzType  = "time with time zone"

-	timeTypeLayout   = "15:04:05"
-	timeTzTypeLayout = "15:04:05Z07"

	// constants for sql-builder to get column types.
	selectColumns = `"column", type`
	colTable      = "tablename"
	colSchema     = "schemaname"
	tableDef      = "pg_table_def"
)

var timeLayouts = []string{
	time.RFC3339, time.RFC3339Nano, time.Layout, time.ANSIC, time.UnixDate, time.RubyDate, time.RFC822,
	time.RFC822Z, time.RFC850, time.RFC1123, time.RFC1123Z, time.Kitchen, time.Stamp, time.StampMilli,
	time.StampMicro, time.StampNano,
}

// GetColumnTypes returns a map containing all of the table's columns and their database types.
func GetColumnTypes(ctx context.Context, db *sqlx.DB, table, schema string) (map[string]string, error) {
	sb := sqlbuilder.PostgreSQL.NewSelectBuilder().
@@ -104,7 +117,7 @@ func TransformRow(row map[string]any, columnTypes map[string]string) (map[string
			return nil, fmt.Errorf("convert %q value to string", value)
		}

-		val, err := time.Parse(timeTypeLayout, valueStr)
+		val, err := time.Parse(TimeTypeLayout, valueStr)
		if err != nil {
			return nil, fmt.Errorf("convert time type from string to time: %w", err)
		}
@@ -117,7 +130,7 @@ func TransformRow(row map[string]any, columnTypes map[string]string) (map[string
			return nil, fmt.Errorf("convert %q value to string", value)
		}

-		val, err := time.Parse(timeTzTypeLayout, strings.TrimSpace(valueStr))
+		val, err := time.Parse(TimeTzTypeLayout, strings.TrimSpace(valueStr))
		if err != nil {
			return nil, fmt.Errorf("convert time with timezone type from string to time: %w", err)
		}
@@ -131,3 +144,69 @@ func TransformRow(row map[string]any, columnTypes map[string]string) (map[string

	return result, nil
}

// ConvertStructuredData converts a sdk.StructuredData value to another StructuredData value
// but with proper database types.
func ConvertStructuredData(
	columnTypes map[string]string,
	data sdk.StructuredData,
) (sdk.StructuredData, error) {
	result := make(sdk.StructuredData, len(data))

	for key, value := range data {
		if value == nil {
			result[key] = nil

			continue
		}

		// Redshift does not support map or slice types,
		// so they'll be stored as marshaled strings.
		//nolint:exhaustive // there is no need to check all types
		switch reflect.TypeOf(value).Kind() {
		case reflect.Map, reflect.Slice:
			bs, err := json.Marshal(value)
			if err != nil {
				return nil, fmt.Errorf("marshal map or slice type: %w", err)
			}

			result[key] = string(bs)

			continue
		}

		switch columnTypes[key] {
		case timeType:
			valueStr, ok := value.(string)
			if !ok {
				return nil, fmt.Errorf("convert %q value to string", value)
			}

			t, err := parseTime(valueStr)
			if err != nil {
				return nil, fmt.Errorf("parse time: %w", err)
			}

			result[key] = t.Format(TimeTypeLayout)
		case timeTzType:
			valueStr, ok := value.(string)
			if !ok {
				return nil, fmt.Errorf("convert %q value to string", value)
			}

			t, err := parseTime(valueStr)
			if err != nil {
				return nil, fmt.Errorf("parse time: %w", err)
			}

			result[key] = t.Format(TimeTzTypeLayout)
		default:
			result[key] = value
		}
	}

	return result, nil
}

// parseTime tries each known layout in turn and returns the first successful parse.
func parseTime(val string) (time.Time, error) {
	for i := range timeLayouts {
		timeValue, err := time.Parse(timeLayouts[i], val)
		if err != nil {
			continue
		}

		return timeValue, nil
	}

	return time.Time{}, fmt.Errorf("cannot parse time: %s", val)
}
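
A usage sketch of the helpers above (the column names and values are hypothetical): a map value is JSON-marshaled into a string, and a value for a `time with time zone` column is parsed with one of the known layouts and reformatted with TimeTzTypeLayout.

package main

import (
	"fmt"

	"github.com/conduitio-labs/conduit-connector-redshift/columntypes"
	sdk "github.com/conduitio/conduit-connector-sdk"
)

func main() {
	// Hypothetical column-type map, shaped like GetColumnTypes' result.
	colTypes := map[string]string{
		"created_at": "time with time zone",
	}

	data := sdk.StructuredData{
		"attrs":      map[string]any{"plan": "pro"}, // becomes the string {"plan":"pro"}
		"created_at": "2023-09-05T10:11:12+02:00",   // becomes "10:11:12+02"
	}

	out, err := columntypes.ConvertStructuredData(colTypes, data)
	if err != nil {
		panic(err)
	}

	fmt.Println(out)
}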