Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

go.mod: bump github.com/conduitio/conduit-connector-sdk from 0.9.1 to 0.10.0 #130

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 2 additions & 12 deletions common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package common

//go:generate paramgen -output=config_paramgen.go Config

type Config struct {
// AWSAccessKeyID is the amazon access key id
AWSAccessKeyID string `json:"aws.accessKeyId" validate:"required"`
Expand All @@ -26,15 +28,3 @@ type Config struct {
// AWSURL is the URL for AWS (internal use only).
AWSURL string `json:"aws.url"`
}

const (
ConfigKeyAWSAccessKeyID = "aws.accessKeyId"

ConfigKeyAWSSecretAccessKey = "aws.secretAccessKey"

ConfigKeyAWSRegion = "aws.region"

ConfigKeyAWSQueue = "aws.queue"

ConfigKeyAWSURL = "aws.url"
)
59 changes: 59 additions & 0 deletions common/config_paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions common/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,24 @@ package common
import (
"encoding/json"

sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/conduitio/conduit-commons/opencdc"
)

type Position struct {
ReceiptHandle string `json:"receipt_handle"`
QueueName string `json:"queue_name"`
}

func ParsePosition(sdkPosition sdk.Position) (Position, error) {
func ParsePosition(sdkPosition opencdc.Position) (Position, error) {
var position Position
err := json.Unmarshal(sdkPosition, &position)
return position, err
}

func (p Position) ToSdkPosition() sdk.Position {
func (p Position) ToSdkPosition() opencdc.Position {
bs, err := json.Marshal(p)
if err != nil {
panic(err)
}
return sdk.Position(bs)
return opencdc.Position(bs)
}
4 changes: 0 additions & 4 deletions destination/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,3 @@ type Config struct {
// amazon sqs message delay time
AWSSQSMessageDelay int32 `json:"aws.delayTime" default:"0"`
}

const (
ConfigKeyAWSSQSDelayTime = "aws.delayTime"
)
71 changes: 40 additions & 31 deletions destination/config_paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion destination/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package destination

import (
"context"
"testing"

"github.com/conduitio-labs/conduit-connector-sqs/common"
Expand Down Expand Up @@ -42,7 +43,7 @@ func TestParseConfig(t *testing.T) {

is := is.New(t)
var got Config
err := sdk.Util.ParseConfig(exampleConfig, &got)
err := sdk.Util.ParseConfig(context.Background(), exampleConfig, &got, NewDestination().Parameters())

is.NoErr(err)
is.Equal(want, got)
Expand Down
10 changes: 6 additions & 4 deletions destination/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/conduitio-labs/conduit-connector-sqs/common"
"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
)

Expand All @@ -36,14 +38,14 @@ func NewDestination() sdk.Destination {
return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...)
}

func (d *Destination) Parameters() map[string]sdk.Parameter {
func (d *Destination) Parameters() config.Parameters {
return Config{}.Parameters()
}

func (d *Destination) Configure(ctx context.Context, cfg map[string]string) error {
func (d *Destination) Configure(ctx context.Context, cfg config.Config) error {
sdk.Logger(ctx).Debug().Msg("Configuring Destination Connector.")

err := sdk.Util.ParseConfig(cfg, &d.config)
err := sdk.Util.ParseConfig(ctx, cfg, &d.config, NewDestination().Parameters())
if err != nil {
return fmt.Errorf("failed to parse destination config : %w", err)
}
Expand Down Expand Up @@ -72,7 +74,7 @@ func (d *Destination) Open(ctx context.Context) (err error) {
return nil
}

func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error) {
func (d *Destination) Write(ctx context.Context, records []opencdc.Record) (int, error) {
for i := 0; i < len(records); i += 10 {
end := i + 10
if end > len(records) {
Expand Down
24 changes: 11 additions & 13 deletions destination/destination_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"time"

"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/conduitio-labs/conduit-connector-sqs/common"
testutils "github.com/conduitio-labs/conduit-connector-sqs/test"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/matryer/is"
)
Expand All @@ -45,16 +45,16 @@ func TestDestination_SuccessfulMessageSend(t *testing.T) {
is := is.New(t)
ctx := testutils.TestContext(t)

metadata := sdk.Metadata{}
metadata := opencdc.Metadata{}
destination := NewDestination()
defer func() { is.NoErr(destination.Teardown(ctx)) }()

messageBody := "Test message body"
record := sdk.Util.Source.NewRecordCreate(
sdk.Position("111111"),
opencdc.Position("111111"),
metadata,
sdk.RawData("1111111"),
sdk.RawData(messageBody),
opencdc.RawData("1111111"),
opencdc.RawData(messageBody),
)

testClient := testutils.NewSQSClient(ctx, is)
Expand All @@ -67,7 +67,7 @@ func TestDestination_SuccessfulMessageSend(t *testing.T) {
err = destination.Open(ctx)
is.NoErr(err)

ret, err := destination.Write(ctx, []sdk.Record{record})
ret, err := destination.Write(ctx, []opencdc.Record{record})
is.NoErr(err)

is.Equal(ret, 1)
Expand Down Expand Up @@ -100,16 +100,16 @@ func TestDestination_FailBadRecord(t *testing.T) {
queueName := testutils.CreateTestQueue(ctx, t, is, testClient)
cfg := testutils.IntegrationConfig(queueName.Name)

metadata := sdk.Metadata{}
metadata := opencdc.Metadata{}
destination := NewDestination()
defer func() { is.NoErr(destination.Teardown(ctx)) }()

messageBody := "Test message body"
record := sdk.Util.Source.NewRecordCreate(
sdk.Position(""),
opencdc.Position(""),
metadata,
sdk.RawData(""),
sdk.RawData(messageBody),
opencdc.RawData(""),
opencdc.RawData(messageBody),
)

err := destination.Configure(ctx, cfg)
Expand All @@ -118,7 +118,7 @@ func TestDestination_FailBadRecord(t *testing.T) {
err = destination.Open(ctx)
is.NoErr(err)

_, err = destination.Write(ctx, []sdk.Record{record})
_, err = destination.Write(ctx, []opencdc.Record{record})
is.True(strings.Contains(err.Error(), "AWS.SimpleQueueService.InvalidBatchEntryId"))
}

Expand All @@ -131,8 +131,6 @@ func TestDestination_FailNonExistentQueue(t *testing.T) {

cfg := testutils.IntegrationConfig("nonexistent-testqueue")

cfg[common.ConfigKeyAWSQueue] = ""

err := destination.Configure(ctx, cfg)
is.NoErr(err)

Expand Down
Loading
Loading