Skip to content

Commit

Permalink
feat: updated sdk and paramgen
Browse files Browse the repository at this point in the history
  • Loading branch information
parikshitg committed Sep 20, 2024
1 parent 63e8b10 commit 6463a90
Show file tree
Hide file tree
Showing 27 changed files with 1,226 additions and 474 deletions.
10 changes: 5 additions & 5 deletions columntypes/columntypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"strings"
"time"

sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/huandu/go-sqlbuilder"
"github.com/jmoiron/sqlx"
)
Expand Down Expand Up @@ -145,13 +145,13 @@ func TransformRow(row map[string]any, columnTypes map[string]string) (map[string
return result, nil
}

// ConvertStructuredData converts a sdk.StructuredData value to another StructuredData value
// ConvertStructuredData converts a opencdc.StructuredData value to another StructuredData value
// but with proper database types.
func ConvertStructuredData(
columnTypes map[string]string,
data sdk.StructuredData,
) (sdk.StructuredData, error) {
result := make(sdk.StructuredData, len(data))
data opencdc.StructuredData,
) (opencdc.StructuredData, error) {
result := make(opencdc.StructuredData, len(data))

for key, value := range data {
if value == nil {
Expand Down
59 changes: 59 additions & 0 deletions common/configuration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright © 2022 Meroxa, Inc. & Yalantis
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

// const (
// // DSN is the configuration name of the data source name to connect to the Amazon Redshift.
// DSN = "dsn"
// // Table is the configuration name of the table name.
// Table = "table"
// // KeyColumns is the configuration name of comma-separated column names to build the opencdc.Record.Key.
// KeyColumns = "keyColumns"
// )

// Configuration contains common for source and destination configurable values.
type Configuration struct {
// DSN is the configuration of the data source name to connect to the Amazon Redshift.
DSN string `json:"dsn" validate:"required"`
// Table is the configuration of the table name.
Table string `json:"table" validate:"required"` //,lowercase,excludesall= ,lte=127"`

Check failure on line 31 in common/configuration.go

View workflow job for this annotation

GitHub Actions / golangci-lint

commentFormatting: put a space between `//` and comment text (gocritic)
// KeyColumns is the configuration of comma-separated column names to build the opencdc.Record.Key (for Source).
KeyColumns []string `json:"keyColumns"` // validate:"omitempty,dive,lowercase,excludesall= ,lte=127"`
}

// // parseCommon parses a common configuration.
// func parseCommon(cfg map[string]string) (Configuration, error) {
// commonConfig := Configuration{
// DSN: cfg[DSN],
// Table: cfg[Table],
// }

// if cfg[KeyColumns] != "" {
// keyColumns := strings.Split(cfg[KeyColumns], ",")
// for i := range keyColumns {
// if keyColumns[i] == "" {
// return Configuration{}, fmt.Errorf("invalid %q", KeyColumns)
// }

// commonConfig.KeyColumns = append(commonConfig.KeyColumns, strings.TrimSpace(keyColumns[i]))
// }
// }

// if err := validateStruct(commonConfig); err != nil {
// return Configuration{}, fmt.Errorf("validate common configuration: %w", err)
// }

// return commonConfig, nil
// }
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package config
package common

// import (
// "fmt"
Expand Down
2 changes: 1 addition & 1 deletion config/destination.go → common/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package config
package common

// import (
// "fmt"
Expand Down
2 changes: 1 addition & 1 deletion config/source.go → common/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package config
package common

const (
// OrderingColumn is a config name for the orderingColumn field.
Expand Down
2 changes: 1 addition & 1 deletion config/source_test.go → common/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package config
package common

// import (
// "fmt"
Expand Down
2 changes: 1 addition & 1 deletion config/validator.go → common/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package config
package common

import (
"errors"
Expand Down
64 changes: 0 additions & 64 deletions config/configuration.go

This file was deleted.

4 changes: 2 additions & 2 deletions destination/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package config

import (
"github.com/conduitio-labs/conduit-connector-redshift/config"
"github.com/conduitio-labs/conduit-connector-redshift/common"
)

// Config is a destination configuration needed to connect to Redshift database.
type Config struct {
config.Configuration
common.Configuration
}
41 changes: 41 additions & 0 deletions destination/config/paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 28 additions & 25 deletions destination/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (
"context"
"fmt"

"github.com/conduitio-labs/conduit-connector-redshift/config"
destConfig "github.com/conduitio-labs/conduit-connector-redshift/destination/config"
"github.com/conduitio-labs/conduit-connector-redshift/destination/writer"
commonsConfig "github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
_ "github.com/jackc/pgx/v5/stdlib" // sql driver
)
Expand All @@ -30,9 +31,9 @@ const driverName = "pgx"

// Writer defines a writer interface needed for the Destination.
type Writer interface {
Insert(context.Context, sdk.Record) error
Update(context.Context, sdk.Record) error
Delete(context.Context, sdk.Record) error
Insert(context.Context, opencdc.Record) error
Update(context.Context, opencdc.Record) error
Delete(context.Context, opencdc.Record) error
Stop() error
}

Expand All @@ -50,29 +51,31 @@ func NewDestination() sdk.Destination {
}

// Parameters returns a map of named Parameters that describe how to configure the Destination.
func (d *Destination) Parameters() map[string]sdk.Parameter {
return map[string]sdk.Parameter{
config.DSN: {
Default: "",
Required: true,
Description: "Data source name to connect to the Amazon Redshift.",
},
config.Table: {
Default: "",
Required: true,
Description: "Name of the table that the connector should read.",
},
config.KeyColumns: {
Default: "",
Required: false,
Description: "Comma-separated list of column names to build the where clause " +
"in case if sdk.Record.Key is empty.",
},
}
func (d *Destination) Parameters() commonsConfig.Parameters {
return commonsConfig.Parameters{}
// return d.config.Parameters()
// return map[string]config.Parameter{
// config.DSN: {
// Default: "",
// Required: true,
// Description: "Data source name to connect to the Amazon Redshift.",
// },
// config.Table: {
// Default: "",
// Required: true,
// Description: "Name of the table that the connector should read.",
// },
// config.KeyColumns: {
// Default: "",
// Required: false,
// Description: "Comma-separated list of column names to build the where clause " +
// "in case if opencdc.Record.Key is empty.",
// },
// }
}

// Configure parses and stores configurations, returns an error in case of invalid configuration.
func (d *Destination) Configure(ctx context.Context, cfg map[string]string) error {
func (d *Destination) Configure(ctx context.Context, cfg commonsConfig.Config) error {

Check failure on line 78 in destination/destination.go

View workflow job for this annotation

GitHub Actions / golangci-lint

unused-parameter: parameter 'cfg' seems to be unused, consider removing or renaming it as _ (revive)
sdk.Logger(ctx).Info().Msg("Configuring Redshift Destination...")

// var err error
Expand Down Expand Up @@ -100,7 +103,7 @@ func (d *Destination) Open(ctx context.Context) error {
}

// Write writes records into a Destination.
func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error) {
func (d *Destination) Write(ctx context.Context, records []opencdc.Record) (int, error) {
for i := range records {
sdk.Logger(ctx).Debug().Bytes("record", records[i].Bytes()).
Msg("Writing a record into Redshift Destination...")
Expand Down
Loading

0 comments on commit 6463a90

Please sign in to comment.