diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index a88a59f..3e29fb0 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1,3 +1,3 @@ # Define code owners (individuals or teams that are responsible for code in this repository) # More about code owners at https://docs.github.com/en/repositories/managing-your-repositorys-settings-and-features/customizing-your-repository/about-code-owners -* @ConduitIO/conduit-core +* @conduitio-labs/conduit-core diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 1ed0687..9741108 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -8,6 +8,6 @@ Fixes # (issue) ### Quick checks: -- [ ] There is no other [pull request](https://github.com/conduitio/conduit-connector-connectorname/pulls) for the same update/change. +- [ ] There is no other [pull request](https://github.com/conduitio-labs/conduit-connector-sqs/pulls) for the same update/change. - [ ] I have written unit tests. - [ ] I have made sure that the PR is of reasonable size and can be easily reviewed. \ No newline at end of file diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index f1b66e7..d4251d3 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -10,11 +10,9 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 - - name: Set up Go uses: actions/setup-go@v4 with: - go-version: '1.21' - + go-version-file: 'go.mod' - name: Test - run: make test" + run: make test diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index dc2e737..811b107 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -12,8 +12,8 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-go@v4 with: - go-version: '1.21' - + go-version-file: 'go.mod' - name: golangci-lint uses: golangci/golangci-lint-action@v3 - + with: + version: v1.52.2 diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 01403e7..84d3a08 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -21,7 +21,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v4 with: - go-version: '1.20' + go-version-file: 'go.mod' - name: Run GoReleaser uses: goreleaser/goreleaser-action@v5 diff --git a/.goreleaser.yml b/.goreleaser.yml index 5b068e0..fc9de23 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -7,7 +7,7 @@ builds: env: - CGO_ENABLED=0 ldflags: - - "-s -w -X 'github.com/conduitio/conduit-connector-connectorname.version={{ .Tag }}'" + - "-s -w -X 'github.com/conduitio-labs/conduit-connector-sqs.version={{ .Tag }}'" checksum: name_template: checksums.txt archives: diff --git a/Makefile b/Makefile index ca87c0b..2cf5b7b 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ VERSION=$(shell git describe --tags --dirty --always) .PHONY: build: - go build -ldflags "-X 'github.com/meroxa/conduit-connector-amazon-sqs.version=${VERSION}'" -o conduit-connector-amazon-sqs cmd/sqs/main.go + go build -ldflags "-X 'github.com/conduitio-labs/conduit-connector-sqs.version=${VERSION}'" -o conduit-connector-amazon-sqs cmd/sqs/main.go .PHONY: test: @@ -16,4 +16,4 @@ lint: golangci-lint run -c .golangci.yml install-paramgen: - go install github.com/conduitio/conduit-connector-sdk/cmd/paramgen@latest \ No newline at end of file + go install github.com/conduitio/conduit-connector-sdk/cmd/paramgen@latest diff --git a/cmd/sqs/main.go b/cmd/connector/main.go similarity index 100% rename from cmd/sqs/main.go rename to cmd/connector/main.go diff --git a/destination/config.go b/destination/config.go index 2d21201..974656c 100644 --- a/destination/config.go +++ b/destination/config.go @@ -7,10 +7,12 @@ type Config struct { AWSAccessKeyID string `json:"aws.accessKeyId" validate:"required"` // amazon secret access key AWSSecretAccessKey string `json:"aws.secretAccessKey" validate:"required"` - // amazon access token - AWSToken string `json:"aws.token"` + // amazon sqs region + AWSRegion string `json:"aws.region" validate:"required"` // amazon sqs queue name AWSQueue string `json:"aws.queue" validate:"required"` + // amazon sqs message delay time + AWSSQSMessageDelay int32 `json:"aws.delayTime"` } const ( @@ -18,7 +20,9 @@ const ( ConfigKeyAWSSecretAccessKey = "aws.secretAccessKey" - ConfigKeyAWSToken = "aws.token" + ConfigKeyAWSRegion = "aws.region" ConfigKeyAWSQueue = "aws.queue" + + ConfigKeyAWSSQSDelayTime = "aws.delayTime" ) diff --git a/destination/config_test.go b/destination/config_test.go index c0f7aa1..91b8f74 100644 --- a/destination/config_test.go +++ b/destination/config_test.go @@ -10,7 +10,9 @@ import ( var exampleConfig = map[string]string{ "aws.accessKeyId": "access-key-123", "aws.secretAccessKey": "secret-key-321", - "aws.token": "token_token", + "aws.region": "us-east-1", + "aws.queue": "queue", + "aws.delayTime": "10", } func TestParseConfig(t *testing.T) { @@ -20,7 +22,9 @@ func TestParseConfig(t *testing.T) { want := Config{ AWSAccessKeyID: "access-key-123", AWSSecretAccessKey: "secret-key-321", - AWSToken: "token_token", + AWSRegion: "us-east-1", + AWSQueue: "queue", + AWSSQSMessageDelay: 10, } is.NoErr(err) diff --git a/destination/destination.go b/destination/destination.go index fb35fa2..4fee2c7 100644 --- a/destination/destination.go +++ b/destination/destination.go @@ -16,6 +16,7 @@ package destination import ( "context" + "fmt" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" @@ -45,7 +46,7 @@ func (d *Destination) Configure(ctx context.Context, cfg map[string]string) erro err := sdk.Util.ParseConfig(cfg, &d.config) if err != nil { - return err + return fmt.Errorf("failed to parse destination config : %w", err) } return nil @@ -53,14 +54,15 @@ func (d *Destination) Configure(ctx context.Context, cfg map[string]string) erro func (d *Destination) Open(ctx context.Context) error { cfg, err := config.LoadDefaultConfig(ctx, + config.WithRegion(d.config.AWSRegion), config.WithCredentialsProvider( credentials.NewStaticCredentialsProvider( d.config.AWSAccessKeyID, d.config.AWSSecretAccessKey, - d.config.AWSToken)), + "")), ) if err != nil { - return err + return fmt.Errorf("failed to load amazon config with given credentials : %w", err) } // Create a SQS client from just a session. d.svc = sqs.NewFromConfig(cfg) @@ -72,7 +74,7 @@ func (d *Destination) Open(ctx context.Context) error { // Get URL of queue urlResult, err := d.svc.GetQueueUrl(ctx, queueInput) if err != nil { - return err + return fmt.Errorf("failed to get sqs queue url : %w", err) } d.queueURL = *urlResult.QueueUrl @@ -93,15 +95,15 @@ func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, err } // construct record to send to destination sendMessageInput := &sqs.SendMessageInput{ - DelaySeconds: 10, MessageAttributes: messageAttributes, MessageBody: &messageBody, QueueUrl: &d.queueURL, + DelaySeconds: d.config.AWSSQSMessageDelay, } _, err := d.svc.SendMessage(ctx, sendMessageInput) if err != nil { - return i, err + return i, fmt.Errorf("failed to write sqs message : %w", err) } } diff --git a/destination/destination_integration_test.go b/destination/destination_integration_test.go index 1a77ba9..83633bf 100644 --- a/destination/destination_integration_test.go +++ b/destination/destination_integration_test.go @@ -133,11 +133,11 @@ func deleteSQSQueue(t *testing.T, svc *sqs.Client, url *string) error { func newAWSClient(cfg map[string]string) (*sqs.Client, error) { awsConfig, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion(cfg[ConfigKeyAWSRegion]), config.WithCredentialsProvider( credentials.NewStaticCredentialsProvider( cfg[ConfigKeyAWSAccessKeyID], - cfg[ConfigKeyAWSSecretAccessKey], - cfg[ConfigKeyAWSToken])), + cfg[ConfigKeyAWSSecretAccessKey], "")), ) if err != nil { return nil, err @@ -160,11 +160,11 @@ func parseIntegrationConfig() (map[string]string, error) { return map[string]string{}, errors.New("AWS_SECRET_ACCESS_KEY env var must be set") } - awsToken := os.Getenv("AWS_TOKEN") + awsMessageDelay := os.Getenv("AWS_MESSAGE_DELAY") return map[string]string{ ConfigKeyAWSAccessKeyID: awsAccessKeyID, ConfigKeyAWSSecretAccessKey: awsSecretAccessKey, - ConfigKeyAWSToken: awsToken, + ConfigKeyAWSSQSDelayTime: awsMessageDelay, }, nil } diff --git a/destination/paramgen.go b/destination/paramgen.go index 6e727b6..8d1c75a 100644 --- a/destination/paramgen.go +++ b/destination/paramgen.go @@ -17,6 +17,12 @@ func (Config) Parameters() map[string]sdk.Parameter { sdk.ValidationRequired{}, }, }, + "aws.delayTime": { + Default: "", + Description: "amazon sqs message delay time", + Type: sdk.ParameterTypeInt, + Validations: []sdk.Validation{}, + }, "aws.queue": { Default: "", Description: "amazon sqs queue name", @@ -33,11 +39,5 @@ func (Config) Parameters() map[string]sdk.Parameter { sdk.ValidationRequired{}, }, }, - "aws.token": { - Default: "", - Description: "amazon access token", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, - }, } } diff --git a/setup.sh b/setup.sh deleted file mode 100755 index 226b9dd..0000000 --- a/setup.sh +++ /dev/null @@ -1,40 +0,0 @@ -#!/bin/bash -if [ $# -eq 0 ] -then - echo "Module name not provided." - exit 1 -fi - -MODULE_NAME=$1 -if ! [[ "$MODULE_NAME" =~ ^github.com\/.*\/conduit-connector-(.*)$ ]] -then - echo "Module name ${MODULE_NAME} not in recommended format \"github.com/repository/conduit-connector-connectorname\"." - echo - echo "Certain things (such as pull request templates) will not work correctly." - while true; do - read -n1 -p "Are you sure you want to continue? [y/n] " yn - echo - case $yn in - [Yy]* ) break;; - [Nn]* ) exit;; - * ) echo "Please answer yes or no.";; - esac - done -fi - -CONNECTOR_NAME=${BASH_REMATCH[1]} - -if [[ "$OSTYPE" == "darwin"* ]]; then - LC_ALL=C find . -type f ! -name "setup.sh" -exec sed -i "" "s~github.com/conduitio/conduit-connector-connectorname~$MODULE_NAME~g" {} + - LC_ALL=C find . -type f ! -name "setup.sh" -exec sed -i "" "s~connectorname~$CONNECTOR_NAME~g" {} + - LC_ALL=C sed -i "" "s~* @ConduitIO/conduit-core~ ~g" .github/CODEOWNERS -else - find . -type f ! -name "setup.sh" -exec sed -i "s~github.com/conduitio/conduit-connector-connectorname~$MODULE_NAME~g" {} + - find . -type f ! -name "setup.sh" -exec sed -i "s~connectorname~$CONNECTOR_NAME~g" {} + - sed -i "s~* @ConduitIO/conduit-core~ ~g" .github/CODEOWNERS -fi - -# Remove this script -rm "$0" -rm README.md -mv README_TEMPLATE.md README.md diff --git a/source/config.go b/source/config.go index 0be8903..f45b0d1 100644 --- a/source/config.go +++ b/source/config.go @@ -7,8 +7,8 @@ type Config struct { AWSAccessKeyID string `json:"aws.accessKeyId" validate:"required"` // amazon secret access key AWSSecretAccessKey string `json:"aws.secretAccessKey" validate:"required"` - // amazon access token - AWSToken string `json:"aws.token"` + // amazon sqs region + AWSRegion string `json:"aws.region" validate:"required"` // amazon sqs queue name AWSQueue string `json:"aws.queue" validate:"required"` // visibility timeout @@ -20,9 +20,9 @@ const ( ConfigKeyAWSSecretAccessKey = "aws.secretAccessKey" - ConfigKeyAWSToken = "aws.token" + ConfigKeyAWSRegion = "aws.region" ConfigKeyAWSQueue = "aws.queue" - ConfigSQSVisibilityTimeout = "aws.visibilityTimeout" + ConfigKeySQSVisibilityTimeout = "aws.visibilityTimeout" ) diff --git a/source/config_test.go b/source/config_test.go index fbe4682..62a0439 100644 --- a/source/config_test.go +++ b/source/config_test.go @@ -24,8 +24,9 @@ import ( var exampleConfig = map[string]string{ "aws.accessKeyId": "access-key-123", "aws.secretAccessKey": "secret-key-321", - "aws.token": "token_token", + "aws.region": "us-east-1", "aws.visibilityTimeout": "60", + "aws.queue": "queue", } func TestParseConfig(t *testing.T) { @@ -35,7 +36,8 @@ func TestParseConfig(t *testing.T) { want := Config{ AWSAccessKeyID: "access-key-123", AWSSecretAccessKey: "secret-key-321", - AWSToken: "token_token", + AWSRegion: "us-east-1", + AWSQueue: "queue", AWSSQSVisibilityTimeout: 60, } diff --git a/source/paramgen.go b/source/paramgen.go index ff8e820..f85f6fd 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -25,19 +25,21 @@ func (Config) Parameters() map[string]sdk.Parameter { sdk.ValidationRequired{}, }, }, - "aws.secretAccessKey": { + "aws.region": { Default: "", - Description: "amazon secret access key", + Description: "amazon sqs region", Type: sdk.ParameterTypeString, Validations: []sdk.Validation{ sdk.ValidationRequired{}, }, }, - "aws.token": { + "aws.secretAccessKey": { Default: "", - Description: "amazon access token", + Description: "amazon secret access key", Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Validations: []sdk.Validation{ + sdk.ValidationRequired{}, + }, }, "aws.visibilityTimeout": { Default: "", diff --git a/source/source.go b/source/source.go index 335eed0..087285d 100644 --- a/source/source.go +++ b/source/source.go @@ -31,7 +31,7 @@ func (s *Source) Configure(ctx context.Context, cfg map[string]string) error { err := sdk.Util.ParseConfig(cfg, &s.config) if err != nil { - return fmt.Errorf(" failed to parse source config : %w", err) + return fmt.Errorf("failed to parse source config : %w", err) } return nil @@ -39,14 +39,15 @@ func (s *Source) Configure(ctx context.Context, cfg map[string]string) error { func (s *Source) Open(ctx context.Context, _ sdk.Position) error { cfg, err := config.LoadDefaultConfig(ctx, + config.WithRegion(s.config.AWSRegion), config.WithCredentialsProvider( credentials.NewStaticCredentialsProvider( s.config.AWSAccessKeyID, s.config.AWSSecretAccessKey, - s.config.AWSToken)), + "")), ) if err != nil { - return err + return fmt.Errorf("failed to load aws config with given credentials : %w", err) } // Create a SQS client from just a session. s.svc = sqs.NewFromConfig(cfg) @@ -57,7 +58,7 @@ func (s *Source) Open(ctx context.Context, _ sdk.Position) error { // Get URL of queue urlResult, err := s.svc.GetQueueUrl(ctx, queueInput) if err != nil { - return fmt.Errorf(" failed to get queue amazon sqs URL: %w", err) + return fmt.Errorf("failed to get queue amazon sqs URL: %w", err) } s.queueURL = *urlResult.QueueUrl @@ -79,7 +80,7 @@ func (s *Source) Read(ctx context.Context) (sdk.Record, error) { // grab a message from queue sqsMessages, err := s.svc.ReceiveMessage(ctx, receiveMessage) if err != nil { - return sdk.Record{}, fmt.Errorf(" error retrieving amazon sqs messages: %w", err) + return sdk.Record{}, fmt.Errorf("error retrieving amazon sqs messages: %w", err) } if len(sqsMessages.Messages) != 0 { diff --git a/source/source_integration_test.go b/source/source_integration_test.go index ae23e4e..1967593 100644 --- a/source/source_integration_test.go +++ b/source/source_integration_test.go @@ -142,11 +142,12 @@ func deleteSQSQueue(t *testing.T, svc *sqs.Client, url *string) error { func newAWSClient(cfg map[string]string) (*sqs.Client, error) { awsConfig, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion(cfg[ConfigKeyAWSRegion]), config.WithCredentialsProvider( credentials.NewStaticCredentialsProvider( cfg[ConfigKeyAWSAccessKeyID], cfg[ConfigKeyAWSSecretAccessKey], - cfg[ConfigKeyAWSToken])), + "")), ) if err != nil { return nil, err @@ -169,13 +170,17 @@ func parseIntegrationConfig() (map[string]string, error) { return map[string]string{}, errors.New("AWS_SECRET_ACCESS_KEY env var must be set") } - awsToken := os.Getenv("AWS_TOKEN") + awsRegion := os.Getenv("AWS_REGION") + if awsRegion == "" { + return map[string]string{}, errors.New("AWS_REGION env var must be set") + } + awsVisibility := os.Getenv("AWS_VISIBILITY") return map[string]string{ - ConfigKeyAWSAccessKeyID: awsAccessKeyID, - ConfigKeyAWSSecretAccessKey: awsSecretAccessKey, - ConfigKeyAWSToken: awsToken, - ConfigSQSVisibilityTimeout: awsVisibility, + ConfigKeyAWSAccessKeyID: awsAccessKeyID, + ConfigKeyAWSSecretAccessKey: awsSecretAccessKey, + ConfigKeySQSVisibilityTimeout: awsVisibility, + ConfigKeyAWSRegion: awsRegion, }, nil }