diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000..3e29fb0 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,3 @@ +# Define code owners (individuals or teams that are responsible for code in this repository) +# More about code owners at https://docs.github.com/en/repositories/managing-your-repositorys-settings-and-features/customizing-your-repository/about-code-owners +* @conduitio-labs/conduit-core diff --git a/.github/ISSUE_TEMPLATE/1-feature-request.yml b/.github/ISSUE_TEMPLATE/1-feature-request.yml new file mode 100644 index 0000000..c7adf98 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/1-feature-request.yml @@ -0,0 +1,11 @@ +name: 🚀 Feature Request +description: Request a new feature. +title: "Feature: " +labels: [feature, triage] +body: +- type: textarea + attributes: + label: Feature description + description: A clear and concise description of what you want to happen and what problem will this solve. + validations: + required: true diff --git a/.github/ISSUE_TEMPLATE/2-bug.yml b/.github/ISSUE_TEMPLATE/2-bug.yml new file mode 100644 index 0000000..4916f64 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/2-bug.yml @@ -0,0 +1,29 @@ +name: 🐛 Bug +description: Report a bug. +title: "Bug: <title>" +labels: [bug, triage] +body: +- type: textarea + attributes: + label: Bug description + description: A concise description of what you're experiencing and what you expected to happen instead. + validations: + required: true +- type: textarea + attributes: + label: Steps to reproduce + description: Steps to reproduce the behavior. + placeholder: | + 1. In this environment... + 2. With this config... + 3. Run '...' + 4. See error... + validations: + required: true +- type: input + attributes: + label: Version + description: "Version of connector" + placeholder: v0.1.0 darwin/amd64 + validations: + required: true diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml new file mode 100644 index 0000000..0627651 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/config.yml @@ -0,0 +1,5 @@ +blank_issues_enabled: true +contact_links: + - name: ❓ Ask a Question + url: https://github.com/ConduitIO/conduit/discussions + about: Please ask and answer questions here. \ No newline at end of file diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..ed7a994 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,19 @@ +# Docs: https://docs.github.com/en/code-security/supply-chain-security/keeping-your-dependencies-updated-automatically/configuration-options-for-dependency-updates +version: 2 +updates: + + # Maintain dependencies for GitHub Actions + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "daily" + commit-message: + prefix: ".github:" + + # Maintain dependencies for Go + - package-ecosystem: "gomod" + directory: "/" + schedule: + interval: "daily" + commit-message: + prefix: "go.mod:" diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 0000000..9741108 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,13 @@ +### Description + +Please include a summary of the change and what type of change it is (new feature, bug fix, refactoring, documentation). +Please also include relevant motivation and context. +List any dependencies that are required for this change. + +Fixes # (issue) + +### Quick checks: + +- [ ] There is no other [pull request](https://github.com/conduitio-labs/conduit-connector-sqs/pulls) for the same update/change. +- [ ] I have written unit tests. +- [ ] I have made sure that the PR is of reasonable size and can be easily reviewed. \ No newline at end of file diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..3ac415b --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,24 @@ +name: build + +on: + push: + branches: [ main ] + pull_request: + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version-file: 'go.mod' + - name: Test + env: + AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + AWS_REGION: ${{ secrets.AWS_REGION }} + AWS_MESSAGE_DELAY: ${{ secrets.AWS_MESSAGE_DELAY }} + AWS_VISIBILITY: ${{ secrets.AWS_VISIBILITY }} + run: make test diff --git a/.github/workflows/dependabot-auto-merge-go.yml b/.github/workflows/dependabot-auto-merge-go.yml new file mode 100644 index 0000000..cc371ad --- /dev/null +++ b/.github/workflows/dependabot-auto-merge-go.yml @@ -0,0 +1,39 @@ +# This action automatically merges dependabot PRs that update go dependencies (only patch and minor updates). +# Based on: https://docs.github.com/en/code-security/supply-chain-security/keeping-your-dependencies-updated-automatically/automating-dependabot-with-github-actions#enable-auto-merge-on-a-pull-request + +name: Dependabot auto-merge +on: + pull_request: + # Run this action when dependabot labels the PR, we care about the 'go' label. + types: [labeled] + +permissions: + pull-requests: write + contents: write + +jobs: + dependabot-go: + runs-on: ubuntu-latest + if: ${{ github.actor == 'dependabot[bot]' && contains(github.event.pull_request.labels.*.name, 'go') }} + steps: + - name: Dependabot metadata + id: metadata + uses: dependabot/fetch-metadata@v1.6.0 + with: + github-token: "${{ secrets.GITHUB_TOKEN }}" + + - name: Approve PR + # Approve only patch and minor updates + if: ${{ steps.metadata.outputs.update-type != 'version-update:semver-major' }} + run: gh pr review --approve "$PR_URL" + env: + PR_URL: ${{ github.event.pull_request.html_url }} + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + - name: Enable auto-merge for Dependabot PRs + # Enable auto-merging only for patch and minor updates + if: ${{ steps.metadata.outputs.update-type != 'version-update:semver-major' }} + run: gh pr merge --auto --squash "$PR_URL" + env: + PR_URL: ${{ github.event.pull_request.html_url }} + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..cf2f7d6 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,20 @@ +name: lint + +on: + push: + branches: [ main ] + pull_request: + +jobs: + golangci-lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v4 + with: + go-version-file: 'go.mod' + - name: golangci-lint + uses: golangci/golangci-lint-action@v3.7.0 + with: + version: v1.55.2 + args: --timeout=2m \ No newline at end of file diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..f0228df --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,33 @@ +name: release + +on: + push: + tags: + - v* + +permissions: + contents: write + +jobs: + release: + name: Release + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version-file: 'go.mod' + + - name: Run GoReleaser + uses: goreleaser/goreleaser-action@v5 + with: + distribution: goreleaser + version: latest + args: release --rm-dist + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/.gitignore b/.gitignore index 3b735ec..2347acf 100644 --- a/.gitignore +++ b/.gitignore @@ -15,7 +15,10 @@ *.out # Dependency directories (remove the comment below to include it) -# vendor/ +vendor/ # Go workspace file go.work + +/connectors/conduit-connector-sqs +conduit-connector-sqs \ No newline at end of file diff --git a/.golangci.goheader.template b/.golangci.goheader.template new file mode 100644 index 0000000..947c941 --- /dev/null +++ b/.golangci.goheader.template @@ -0,0 +1,13 @@ +Copyright © {{ copyright-year }} Meroxa, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. \ No newline at end of file diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..f723f8d --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,106 @@ +linters-settings: + gofmt: + simplify: false + govet: + check-shadowing: false + nolintlint: + allow-unused: false # report any unused nolint directives + require-explanation: true # require an explanation for nolint directives + require-specific: true # require nolint directives to mention the specific linter being suppressed + gocyclo: + min-complexity: 20 + goconst: + ignore-tests: true + goheader: + template-path: '.golangci.goheader.template' + values: + regexp: + copyright-year: 20[2-9]\d + +issues: + exclude-rules: + - path: _test\.go + linters: + - dogsled + - gosec + - revive + +linters: + # please, do not use `enable-all`: it's deprecated and will be removed soon. + # inverted configuration with `enable-all` and `disable` is not scalable during updates of golangci-lint + disable-all: true + enable: + - bodyclose + # - depguard + - dogsled + - durationcheck + - errcheck + - errname + # - errorlint + # - exhaustive + # - exhaustivestruct + - exportloopref + # - forbidigo + # - forcetypeassert + # - funlen + # - gochecknoinits + - goconst + - gocritic + - gocyclo + # - cyclop # not interested in package complexities at the moment + # - godot + - gofmt + # - gofumpt + - goheader + - goimports + - revive + # - gomnd + - gomoddirectives + - gomodguard + - goprintffuncname + - gosec + - gosimple + - govet + # - ifshort + - ineffassign + # - importas + # - lll + # - misspell + - makezero + # - nakedret + # - nilerr + # - nilnil + # - nlreturn + - noctx + - nolintlint + # - paralleltest + - predeclared + - rowserrcheck + - staticcheck + - stylecheck + - sqlclosecheck + # - tagliatelle + # - tenv + # - thelper + # - tparallel + - typecheck + - unconvert + # - unparam + - unused + - wastedassign + - whitespace + # - wrapcheck + # - wsl + + # don't enable: + # - asciicheck + # - dupl + # - gochecknoglobals + # - gocognit + # - godox + # - goerr113 + # - maligned + # - nestif + # - prealloc + # - testpackage + # - wsl \ No newline at end of file diff --git a/.goreleaser.yml b/.goreleaser.yml new file mode 100644 index 0000000..fc9de23 --- /dev/null +++ b/.goreleaser.yml @@ -0,0 +1,30 @@ +builds: + - main: ./cmd/connector/main.go + goos: + - darwin + - linux + - windows + env: + - CGO_ENABLED=0 + ldflags: + - "-s -w -X 'github.com/conduitio-labs/conduit-connector-sqs.version={{ .Tag }}'" +checksum: + name_template: checksums.txt +archives: + - name_template: >- + {{ .ProjectName }}_ + {{- .Version }}_ + {{- title .Os }}_ + {{- if eq .Arch "amd64" }}x86_64 + {{- else if eq .Arch "386" }}i386 + {{- else }}{{ .Arch }}{{ end }} +changelog: + sort: asc + use: github + filters: + exclude: + - '^docs:' + - '^test:' + - '^go.mod:' + - '^.github:' + - Merge branch \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..5318aa1 --- /dev/null +++ b/Makefile @@ -0,0 +1,19 @@ +VERSION=$(shell git describe --tags --dirty --always) +.PHONY: +build: + go build -ldflags "-X 'github.com/conduitio-labs/conduit-connector-sqs.version=${VERSION}'" -o conduit-connector-sqs cmd/connector/main.go + +.PHONY: +test: + go test $(GOTEST_FLAGS) -race -v ./... + +.PHONY: golangci-lint-install +golangci-lint-install: + go install github.com/golangci/golangci-lint/cmd/golangci-lint + +.PHONY: +lint: + golangci-lint run + +install-paramgen: + go install github.com/conduitio/conduit-connector-sdk/cmd/paramgen@latest diff --git a/README.md b/README.md index 58a20b6..0308937 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,43 @@ -# conduit-connector-amazon-sqs -Conduit connector for Amazon SQS +# Conduit Connector for Amazon SQS +[Conduit](https://conduit.io) for [Amazon SQS](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html). + +## How to build? +Run `make build` to build the connector. + + +## Source + + +The source connector pulls data from the Amazon SQS Queue. As messages come in, the source connector grabs a single message from the Amazon SQS Queue, formats it for the destination connector as a new record, and sends it. The Message Body of the SQS Message is formatted into a record's payload, and the Message Attributes are passed as record metadata. After the record has been acknowledged, the source connector deletes the message from the Amazon SQS Queue. + + +## Destinaton + + +The destination connector batches incoming records into 10 and pushes them to the Amazon SQS Queue. Any fields defined in the metadata of the record will be passed as Message Attributes, and the json encoding of the record will be passed as the Message Body. + + +### Configuration + + +The configuration passed to `Configure` can contain the following fields: + +#### Source + +| name | description | required | example | +| --------------------- | ------------------------------------------------------------------------------------- | -------- | ------------------- | +| `aws.accessKeyId` | AWS Access Key ID | yes | "THE_ACCESS_KEY_ID" | +| `aws.secretAccessKey` | AWS Secret Access Key | yes | "SECRET_ACCESS_KEY" | +| `aws.region` | AWS SQS Region | yes | "us-east-1" | +| `aws.queue` | AWS SQS Queue Name | yes | "QUEUE_NAME" | +| `aws.delayTime` | AWS SQS Message Delay | yes | "5" | + +#### Destination + +| name | description | required | example | +| --------------------------------- | ------------------------------------------------------------------------------------- | -------- | ------------------- | +| `aws.accessKeyId` | AWS Access Key ID | yes | "THE_ACCESS_KEY_ID" | +| `aws.secretAccessKey` | AWS Secret Access Key | yes | "SECRET_ACCESS_KEY" | +| `aws.region` | AWS SQS Region | yes | "us-east-1" | +| `aws.queue` | AWS SQS Queue Name | yes | "QUEUE_NAME" | +| `aws.visibilityTimeout` | AWS SQS Message Visibility Timeout | yes | "5" | diff --git a/cmd/connector/main.go b/cmd/connector/main.go new file mode 100644 index 0000000..f65f1f0 --- /dev/null +++ b/cmd/connector/main.go @@ -0,0 +1,24 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + sqs "github.com/conduitio-labs/conduit-connector-sqs" + sdk "github.com/conduitio/conduit-connector-sdk" +) + +func main() { + sdk.Serve(sqs.Connector) +} diff --git a/common/config.go b/common/config.go new file mode 100644 index 0000000..b1a5487 --- /dev/null +++ b/common/config.go @@ -0,0 +1,36 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +type Config struct { + // amazon access key id + AWSAccessKeyID string `json:"aws.accessKeyId" validate:"required"` + // amazon secret access key + AWSSecretAccessKey string `json:"aws.secretAccessKey" validate:"required"` + // amazon sqs region + AWSRegion string `json:"aws.region" validate:"required"` + // amazon sqs queue name + AWSQueue string `json:"aws.queue" validate:"required"` +} + +const ( + ConfigKeyAWSAccessKeyID = "aws.accessKeyId" + + ConfigKeyAWSSecretAccessKey = "aws.secretAccessKey" + + ConfigKeyAWSRegion = "aws.region" + + ConfigKeyAWSQueue = "aws.queue" +) diff --git a/connector.go b/connector.go new file mode 100644 index 0000000..14a9998 --- /dev/null +++ b/connector.go @@ -0,0 +1,27 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqs + +import ( + "github.com/conduitio-labs/conduit-connector-sqs/destination" + "github.com/conduitio-labs/conduit-connector-sqs/source" + sdk "github.com/conduitio/conduit-connector-sdk" +) + +var Connector = sdk.Connector{ + NewSpecification: Specification, + NewSource: source.NewSource, + NewDestination: destination.NewDestination, +} diff --git a/destination/config.go b/destination/config.go new file mode 100644 index 0000000..1b8ce28 --- /dev/null +++ b/destination/config.go @@ -0,0 +1,29 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package destination + +import "github.com/conduitio-labs/conduit-connector-sqs/common" + +//go:generate paramgen -output=config_paramgen.go Config + +type Config struct { + common.Config + // amazon sqs message delay time + AWSSQSMessageDelay int32 `json:"aws.delayTime" default:"0"` +} + +const ( + ConfigKeyAWSSQSDelayTime = "aws.delayTime" +) diff --git a/destination/config_paramgen.go b/destination/config_paramgen.go new file mode 100644 index 0000000..e858953 --- /dev/null +++ b/destination/config_paramgen.go @@ -0,0 +1,51 @@ +// Code generated by paramgen. DO NOT EDIT. +// Source: github.com/ConduitIO/conduit-connector-sdk/tree/main/cmd/paramgen + +package destination + +import ( + sdk "github.com/conduitio/conduit-connector-sdk" +) + +func (Config) Parameters() map[string]sdk.Parameter { + return map[string]sdk.Parameter{ + "aws.accessKeyId": { + Default: "", + Description: "amazon access key id", + Type: sdk.ParameterTypeString, + Validations: []sdk.Validation{ + sdk.ValidationRequired{}, + }, + }, + "aws.delayTime": { + Default: "0", + Description: "amazon sqs message delay time", + Type: sdk.ParameterTypeInt, + Validations: []sdk.Validation{}, + }, + "aws.queue": { + Default: "", + Description: "amazon sqs queue name", + Type: sdk.ParameterTypeString, + Validations: []sdk.Validation{ + sdk.ValidationRequired{}, + }, + }, + "aws.region": { + Default: "", + Description: "amazon sqs region", + Type: sdk.ParameterTypeString, + Validations: []sdk.Validation{ + sdk.ValidationRequired{}, + }, + }, + "aws.secretAccessKey": { + Default: "", + Description: "amazon secret access key", + Type: sdk.ParameterTypeString, + Validations: []sdk.Validation{ + sdk.ValidationRequired{}, + }, + }, + } +} diff --git a/destination/config_test.go b/destination/config_test.go new file mode 100644 index 0000000..4662bf7 --- /dev/null +++ b/destination/config_test.go @@ -0,0 +1,49 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package destination + +import ( + "testing" + + "github.com/conduitio-labs/conduit-connector-sqs/common" + sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/matryer/is" +) + +func TestParseConfig(t *testing.T) { + exampleConfig := map[string]string{ + "aws.accessKeyId": "access-key-123", + "aws.secretAccessKey": "secret-key-321", + "aws.region": "us-east-1", + "aws.queue": "queue", + "aws.delayTime": "10", + } + want := Config{ + Config: common.Config{ + AWSAccessKeyID: "access-key-123", + AWSSecretAccessKey: "secret-key-321", + AWSRegion: "us-east-1", + AWSQueue: "queue", + }, + AWSSQSMessageDelay: 10, + } + + is := is.New(t) + var got Config + err := sdk.Util.ParseConfig(exampleConfig, &got) + + is.NoErr(err) + is.Equal(want, got) +} diff --git a/destination/destination.go b/destination/destination.go new file mode 100644 index 0000000..15a3bd9 --- /dev/null +++ b/destination/destination.go @@ -0,0 +1,128 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package destination + +import ( + "context" + "fmt" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + sdk "github.com/conduitio/conduit-connector-sdk" +) + +type Destination struct { + sdk.UnimplementedDestination + config Config + svc *sqs.Client + queueURL string +} + +func NewDestination() sdk.Destination { + return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...) +} + +func (d *Destination) Parameters() map[string]sdk.Parameter { + return Config{}.Parameters() +} + +func (d *Destination) Configure(ctx context.Context, cfg map[string]string) error { + sdk.Logger(ctx).Debug().Msg("Configuring Destination Connector.") + + err := sdk.Util.ParseConfig(cfg, &d.config) + if err != nil { + return fmt.Errorf("failed to parse destination config : %w", err) + } + + return nil +} + +func (d *Destination) Open(ctx context.Context) error { + cfg, err := config.LoadDefaultConfig(ctx, + config.WithRegion(d.config.AWSRegion), + config.WithCredentialsProvider( + credentials.NewStaticCredentialsProvider( + d.config.AWSAccessKeyID, + d.config.AWSSecretAccessKey, + "")), + ) + if err != nil { + return fmt.Errorf("failed to load amazon config with given credentials : %w", err) + } + // Create a SQS client from just a session. + d.svc = sqs.NewFromConfig(cfg) + + queueInput := &sqs.GetQueueUrlInput{ + QueueName: &d.config.AWSQueue, + } + + // Get URL of queue + urlResult, err := d.svc.GetQueueUrl(ctx, queueInput) + if err != nil { + return fmt.Errorf("failed to get sqs queue url : %w", err) + } + + d.queueURL = *urlResult.QueueUrl + + return nil +} + +func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, error) { + for i := 0; i < len(records); i += 10 { + end := i + 10 + if end > len(records) { + end = len(records) + } + + recordsChunk := records[i:end] + sqsRecords := sqs.SendMessageBatchInput{ + QueueUrl: &d.queueURL, + } + + for _, record := range recordsChunk { + messageBody := string(record.Bytes()) + + messageAttributes := map[string]types.MessageAttributeValue{} + for key, value := range record.Metadata { + messageAttributes[key] = types.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String(value), + } + } + id := string(record.Key.Bytes()) + // construct record to send to destination + sqsRecords.Entries = append(sqsRecords.Entries, types.SendMessageBatchRequestEntry{ + MessageAttributes: messageAttributes, + MessageBody: &messageBody, + DelaySeconds: d.config.AWSSQSMessageDelay, + Id: &id, + }) + } + + _, err := d.svc.SendMessageBatch(ctx, &sqsRecords) + if err != nil { + return 0, fmt.Errorf("failed to write sqs message : %w", err) + } + } + + return len(records), nil +} + +func (d *Destination) Teardown(_ context.Context) error { + return nil // nothing to do +} diff --git a/destination/destination_integration_test.go b/destination/destination_integration_test.go new file mode 100644 index 0000000..9d72f52 --- /dev/null +++ b/destination/destination_integration_test.go @@ -0,0 +1,253 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package destination + +import ( + "context" + "encoding/base64" + "encoding/json" + "errors" + "os" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/conduitio-labs/conduit-connector-sqs/common" + sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/google/uuid" + "github.com/matryer/is" +) + +type ResultConfig struct { + Payload Data `json:"payload"` + MetaData string `json:"meta_data"` + Position string `json:"position"` + Key string `json:"key"` + Operation string `json:"operation"` +} + +type Data struct { + Before string `json:"before"` + After string `json:"after"` +} + +func TestDestination_SuccessfulMessageSend(t *testing.T) { + is := is.New(t) + ctx := context.Background() + + metadata := sdk.Metadata{} + destination := NewDestination() + defer func() { + err := destination.Teardown(ctx) + is.NoErr(err) + }() + + messageBody := "Test message body" + record := sdk.Util.Source.NewRecordCreate( + sdk.Position("111111"), + metadata, + sdk.RawData("1111111"), + sdk.RawData(messageBody), + ) + + client, url, cfg, err := prepareIntegrationTest(t) + is.NoErr(err) + + err = destination.Configure(ctx, cfg) + is.NoErr(err) + + err = destination.Open(ctx) + is.NoErr(err) + + ret, err := destination.Write(ctx, []sdk.Record{record}) + is.NoErr(err) + + is.Equal(ret, 1) + time.Sleep(5 * time.Second) + + message, err := client.ReceiveMessage( + context.Background(), + &sqs.ReceiveMessageInput{ + QueueUrl: url.QueueUrl, + }, + ) + + is.NoErr(err) + is.Equal(len(message.Messages), 1) + + var result ResultConfig + err = json.Unmarshal([]byte(*message.Messages[0].Body), &result) + is.NoErr(err) + bodyDecoded, err := base64.StdEncoding.DecodeString(result.Payload.After) + + is.NoErr(err) + is.Equal(string(bodyDecoded), messageBody) +} + +func TestDestination_FailBadRecord(t *testing.T) { + is := is.New(t) + ctx := context.Background() + + metadata := sdk.Metadata{} + destination := NewDestination() + defer func() { + err := destination.Teardown(ctx) + is.NoErr(err) + }() + + messageBody := "Test message body" + record := sdk.Util.Source.NewRecordCreate( + sdk.Position(""), + metadata, + sdk.RawData(""), + sdk.RawData(messageBody), + ) + + _, _, cfg, err := prepareIntegrationTest(t) + is.NoErr(err) + + err = destination.Configure(ctx, cfg) + is.NoErr(err) + + err = destination.Open(ctx) + is.NoErr(err) + + _, err = destination.Write(ctx, []sdk.Record{record}) + is.True(strings.Contains(err.Error(), "AWS.SimpleQueueService.InvalidBatchEntryId")) +} + +func TestDestination_FailNonExistentQueue(t *testing.T) { + is := is.New(t) + ctx := context.Background() + + destination := NewDestination() + defer func() { + err := destination.Teardown(ctx) + is.NoErr(err) + }() + + _, _, cfg, err := prepareIntegrationTest(t) + is.NoErr(err) + + cfg[common.ConfigKeyAWSQueue] = "" + + err = destination.Configure(ctx, cfg) + is.NoErr(err) + + err = destination.Open(ctx) + is.True(strings.Contains(err.Error(), "AWS.SimpleQueueService.NonExistentQueue")) +} + +func prepareIntegrationTest(t *testing.T) (*sqs.Client, *sqs.GetQueueUrlOutput, map[string]string, error) { + cfg, err := parseIntegrationConfig() + if err != nil { + t.Fatalf("could not parse config: %v", err) + } + + sourceQueue := "test-queue-destination-" + uuid.NewString() + + client, err := newAWSClient(cfg) + if err != nil { + t.Fatalf("could not create S3 client: %v", err) + } + + _, err = client.CreateQueue(context.Background(), &sqs.CreateQueueInput{ + QueueName: &sourceQueue, + }) + if err != nil { + return nil, nil, nil, err + } + + queueInput := &sqs.GetQueueUrlInput{ + QueueName: &sourceQueue, + } + // Get URL of queue + urlResult, err := client.GetQueueUrl(context.Background(), queueInput) + if err != nil { + return nil, nil, nil, err + } + + if err != nil { + return nil, nil, nil, err + } + + t.Cleanup(func() { + err := deleteSQSQueue(t, client, urlResult.QueueUrl) + if err != nil { + t.Fatal(err) + } + }) + + cfg[common.ConfigKeyAWSQueue] = sourceQueue + + return client, urlResult, cfg, nil +} + +func deleteSQSQueue(t *testing.T, svc *sqs.Client, url *string) error { + _, err := svc.DeleteQueue(context.Background(), &sqs.DeleteQueueInput{ + QueueUrl: url, + }) + if err != nil { + return err + } + return nil +} + +func newAWSClient(cfg map[string]string) (*sqs.Client, error) { + awsConfig, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion(cfg[common.ConfigKeyAWSRegion]), + config.WithCredentialsProvider( + credentials.NewStaticCredentialsProvider( + cfg[common.ConfigKeyAWSAccessKeyID], + cfg[common.ConfigKeyAWSSecretAccessKey], "")), + ) + if err != nil { + return nil, err + } + // Create a SQS client from just a session. + sqsClient := sqs.NewFromConfig(awsConfig) + + return sqsClient, nil +} + +func parseIntegrationConfig() (map[string]string, error) { + awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID") + + if awsAccessKeyID == "" { + return nil, errors.New("AWS_ACCESS_KEY_ID env var must be set") + } + + awsSecretAccessKey := os.Getenv("AWS_SECRET_ACCESS_KEY") + if awsSecretAccessKey == "" { + return nil, errors.New("AWS_SECRET_ACCESS_KEY env var must be set") + } + + awsMessageDelay := os.Getenv("AWS_MESSAGE_DELAY") + + awsRegion := os.Getenv("AWS_REGION") + if awsRegion == "" { + return nil, errors.New("AWS_REGION env var must be set") + } + + return map[string]string{ + common.ConfigKeyAWSAccessKeyID: awsAccessKeyID, + common.ConfigKeyAWSSecretAccessKey: awsSecretAccessKey, + ConfigKeyAWSSQSDelayTime: awsMessageDelay, + common.ConfigKeyAWSRegion: awsRegion, + }, nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..6e4fe82 --- /dev/null +++ b/go.mod @@ -0,0 +1,62 @@ +module github.com/conduitio-labs/conduit-connector-sqs + +go 1.21 + +require ( + github.com/aws/aws-sdk-go-v2 v1.22.1 + github.com/aws/aws-sdk-go-v2/config v1.22.1 + github.com/aws/aws-sdk-go-v2/credentials v1.15.1 + github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0 + github.com/conduitio/conduit-connector-sdk v0.8.0 + github.com/google/uuid v1.4.0 + github.com/matryer/is v1.4.1 +) + +require ( + github.com/Masterminds/goutils v1.1.1 // indirect + github.com/Masterminds/semver/v3 v3.2.0 // indirect + github.com/Masterminds/sprig/v3 v3.2.3 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.2 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.5.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.1 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.17.0 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.19.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.25.0 // indirect + github.com/aws/smithy-go v1.16.0 // indirect + github.com/conduitio/conduit-connector-protocol v0.5.0 // indirect + github.com/fatih/color v1.14.1 // indirect + github.com/goccy/go-json v0.10.2 // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/google/go-cmp v0.6.0 // indirect + github.com/hashicorp/go-hclog v1.4.0 // indirect + github.com/hashicorp/go-plugin v1.4.8 // indirect + github.com/hashicorp/yamux v0.1.1 // indirect + github.com/huandu/xstrings v1.4.0 // indirect + github.com/imdario/mergo v0.3.13 // indirect + github.com/jhump/protoreflect v1.10.2-0.20220118162304-602a8db873e3 // indirect + github.com/jpillora/backoff v1.0.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mitchellh/copystructure v1.2.0 // indirect + github.com/mitchellh/go-testing-interface v1.14.1 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/mitchellh/reflectwalk v1.0.2 // indirect + github.com/oklog/run v1.1.0 // indirect + github.com/rs/zerolog v1.31.0 // indirect + github.com/shopspring/decimal v1.3.1 // indirect + github.com/spf13/cast v1.5.0 // indirect + go.uber.org/goleak v1.3.0 // indirect + go.uber.org/multierr v1.11.0 // indirect + golang.org/x/crypto v0.14.0 // indirect + golang.org/x/exp v0.0.0-20230131160201-f062dba9d201 // indirect + golang.org/x/net v0.17.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/text v0.13.0 // indirect + golang.org/x/time v0.4.0 // indirect + google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect + google.golang.org/grpc v1.56.3 // indirect + google.golang.org/protobuf v1.30.0 // indirect + gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..404e0aa --- /dev/null +++ b/go.sum @@ -0,0 +1,279 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI= +github.com/Masterminds/goutils v1.1.1/go.mod h1:8cTjp+g8YejhMuvIA5y2vz3BpJxksy863GQaJW2MFNU= +github.com/Masterminds/semver/v3 v3.2.0 h1:3MEsd0SM6jqZojhjLWWeBY+Kcjy9i6MQAeY7YgDP83g= +github.com/Masterminds/semver/v3 v3.2.0/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ= +github.com/Masterminds/sprig/v3 v3.2.3 h1:eL2fZNezLomi0uOLqjQoN6BfsDD+fyLtgbJMAj9n6YA= +github.com/Masterminds/sprig/v3 v3.2.3/go.mod h1:rXcFaZ2zZbLRJv/xSysmlgIM1u11eBaRMhvYXJNkGuM= +github.com/aws/aws-sdk-go-v2 v1.22.1 h1:sjnni/AuoTXxHitsIdT0FwmqUuNUuHtufcVDErVFT9U= +github.com/aws/aws-sdk-go-v2 v1.22.1/go.mod h1:Kd0OJtkW3Q0M0lUWGszapWjEvrXDzRW+D21JNsroB+c= +github.com/aws/aws-sdk-go-v2/config v1.22.1 h1:UrRYnF7mXCGuKmZWlczOXeH0WUbQpi/gseQIPtrhme8= +github.com/aws/aws-sdk-go-v2/config v1.22.1/go.mod h1:2eWgw5lps8fKI7LZVTrRTYP6HE6k/uEFUuTSHfXwqP0= +github.com/aws/aws-sdk-go-v2/credentials v1.15.1 h1:hmf6lAm9hk7uLCfapZn/jL05lm6Uwdbn1B0fgjyuf4M= +github.com/aws/aws-sdk-go-v2/credentials v1.15.1/go.mod h1:QTcHga3ZbQOneJuxmGBOCxiClxmp+TlvmjFexAnJ790= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.2 h1:gIeH4+o1MN/caGBWjoGQTUTIu94xD6fI5B2+TcwBf70= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.2/go.mod h1:wLyMIo/zPOhQhPXTddpfdkSleyigtFi8iMnC+2m/SK4= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1 h1:fi1ga6WysOyYb5PAf3Exd6B5GiSNpnZim4h1rhlBqx0= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.1/go.mod h1:V5CY8wNurvPUibTi9mwqUqpiFZ5LnioKWIFUDtIzdI8= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1 h1:ZpaV/j48RlPc4AmOZuPv22pJliXjXq8/reL63YzyFnw= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.1/go.mod h1:R8aXraabD2e3qv1csxM14/X9WF4wFMIY0kH4YEtYD5M= +github.com/aws/aws-sdk-go-v2/internal/ini v1.5.0 h1:DqOQvIfmGkXZUVJnl9VRk0AnxyS59tCtX9k1Pyss4Ak= +github.com/aws/aws-sdk-go-v2/internal/ini v1.5.0/go.mod h1:VV/Kbw9Mg1GWJOT9WK+oTL3cWZiXtapnNvDSRqTZLsg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.1 h1:2OXw3ppu1XsB6rqKEMV4tnecTjIY3PRV2U6IP6KPJQo= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.1/go.mod h1:FZB4AdakIqW/yERVdGJA6Z9jraax1beXfhBBnK2wwR8= +github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0 h1:21QmEZkOnaJ4SPRFhhN+8MV5ewb0j1lxTg+RPp0mUeE= +github.com/aws/aws-sdk-go-v2/service/sqs v1.26.0/go.mod h1:E02a07/HTyJEHFpp+WMRh33xuNVdsd8WCbLlODeT4lU= +github.com/aws/aws-sdk-go-v2/service/sso v1.17.0 h1:I/Oh3IxGPfHXiGnwM54TD6hNr/8TlUrBXAtTyGhR+zw= +github.com/aws/aws-sdk-go-v2/service/sso v1.17.0/go.mod h1:H6NCMvDBqA+CvIaXzaSqM6LWtzv9BzZrqBOqz+PzRF8= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.19.0 h1:irbXQkfVYIRaewYSXcu4yVk0m2T+JzZd0dkop7FjmO0= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.19.0/go.mod h1:4wPNCkM22+oRe71oydP66K50ojDUC33XutSMi2pEF/M= +github.com/aws/aws-sdk-go-v2/service/sts v1.25.0 h1:sYIFy8tm1xQwRvVQ4CRuBGXKIg9sHNuG6+3UAQuoujk= +github.com/aws/aws-sdk-go-v2/service/sts v1.25.0/go.mod h1:S/LOQUeYDfJeJpFCIJDMjy7dwL4aA33HUdVi+i7uH8k= +github.com/aws/smithy-go v1.16.0 h1:gJZEH/Fqh+RsvlJ1Zt4tVAtV6bKkp3cC+R6FCZMNzik= +github.com/aws/smithy-go v1.16.0/go.mod h1:NukqUGpCZIILqqiV0NIjeFh24kd/FAa4beRb6nbIUPE= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/conduitio/conduit-connector-protocol v0.5.0 h1:Rr2SsDAvWDryQArvonwPoXBELQA2wRXr49xBLrAtBaM= +github.com/conduitio/conduit-connector-protocol v0.5.0/go.mod h1:UIhHWxq52hvwwbkvQDaRgZRHfbpDDmU7tZaw0mwLdd4= +github.com/conduitio/conduit-connector-sdk v0.8.0 h1:gvchqoj5d3AQsBoIosx4i32L8Ex9+5BuAyHi/IM9VD4= +github.com/conduitio/conduit-connector-sdk v0.8.0/go.mod h1:nOz4K3X6fD8YMe5CPbULwSEE18Eu02ZrpT6o6KwQfxs= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= +github.com/fatih/color v1.14.1 h1:qfhVLaG5s+nCROl1zJsZRxFeYrHLqWroPOQ8BWiNb4w= +github.com/fatih/color v1.14.1/go.mod h1:2oHN61fhTpgcxD3TSWCgKDiH1+x4OiDVVGH8WlgGZGg= +github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= +github.com/frankban/quicktest v1.14.3/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= +github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gordonklaus/ineffassign v0.0.0-20200309095847-7953dde2c7bf/go.mod h1:cuNKsD1zp2v6XfE/orVX2QE1LC+i254ceGcVeDT3pTU= +github.com/hashicorp/go-hclog v1.4.0 h1:ctuWFGrhFha8BnnzxqeRGidlEcQkDyL5u8J8t5eA11I= +github.com/hashicorp/go-hclog v1.4.0/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= +github.com/hashicorp/go-plugin v1.4.8 h1:CHGwpxYDOttQOY7HOWgETU9dyVjOXzniXDqJcYJE1zM= +github.com/hashicorp/go-plugin v1.4.8/go.mod h1:viDMjcLJuDui6pXb8U4HVfb8AamCWhHGUjr2IrTF67s= +github.com/hashicorp/yamux v0.1.1 h1:yrQxtgseBDrq9Y652vSRDvsKCJKOUD+GzTS4Y0Y8pvE= +github.com/hashicorp/yamux v0.1.1/go.mod h1:CtWFDAQgb7dxtzFs4tWbplKIe2jSi3+5vKbgIO0SLnQ= +github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= +github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU= +github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= +github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= +github.com/imdario/mergo v0.3.13 h1:lFzP57bqS/wsqKssCGmtLAb8A0wKjLGrve2q3PPVcBk= +github.com/imdario/mergo v0.3.13/go.mod h1:4lJ1jqUDcsbIECGy0RUJAXNIhg+6ocWgb1ALK2O4oXg= +github.com/jhump/protoreflect v1.10.2-0.20220118162304-602a8db873e3 h1:crcHucf8SsCh17stXXZBx5HW/Q0kfaWVUIdzimNejk8= +github.com/jhump/protoreflect v1.10.2-0.20220118162304-602a8db873e3/go.mod h1:7GcYQDdMU/O/BBrl/cX6PNHpXh6cenjd8pneu5yW7Tg= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/matryer/is v1.4.1 h1:55ehd8zaGABKLXQUe2awZ99BD/PTc2ls+KV/dXphgEQ= +github.com/matryer/is v1.4.1/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU= +github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw= +github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= +github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= +github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU= +github.com/mitchellh/go-testing-interface v1.14.1/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= +github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/nishanths/predeclared v0.0.0-20200524104333-86fad755b4d3/go.mod h1:nt3d53pc1VYcphSCIaYAJtnPYnr3Zyn8fMq2wvPGPso= +github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA= +github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.31.0 h1:FcTR3NnLWW+NnTwwhFWiJSZr4ECLpqCm6QsEnyvbV4A= +github.com/rs/zerolog v1.31.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= +github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= +github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= +github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= +github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w= +github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= +golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20230131160201-f062dba9d201 h1:BEABXpNXLEz0WxtA+6CQIz2xkg80e+1zrhWyMcq8VzE= +golang.org/x/exp v0.0.0-20230131160201-f062dba9d201/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= +golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/time v0.4.0 h1:Z81tqI5ddIoXDPvVQ7/7CC9TnLM7ubaFG2qXYd5BbYY= +golang.org/x/time v0.4.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200522201501-cb1345f3a375/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20200717024301-6ddee64345a6/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A= +google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.56.3 h1:8I4C0Yq1EjstUzUJzpcRVbuYA2mODtEmpWiQoN/b2nc= +google.golang.org/grpc v1.56.3/go.mod h1:I9bI3vqKfayGqPUAwGdOSu7kt6oIJLixfffKrpXqQ9s= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.1-0.20200805231151-a709e31e5d12/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 h1:yiW+nvdHb9LVqSHQBXfZCieqV4fzYhNBql77zY0ykqs= +gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637/go.mod h1:BHsqpu/nsuzkT5BpiH1EMZPLyqSMM8JbIavyFACoFNk= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= diff --git a/source/config.go b/source/config.go new file mode 100644 index 0000000..8c51056 --- /dev/null +++ b/source/config.go @@ -0,0 +1,29 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package source + +import "github.com/conduitio-labs/conduit-connector-sqs/common" + +//go:generate paramgen -output=config_paramgen.go Config + +type Config struct { + common.Config + // visibility timeout + AWSSQSVisibilityTimeout int32 `json:"aws.visibilityTimeout" default:"0"` +} + +const ( + ConfigKeySQSVisibilityTimeout = "aws.visibilityTimeout" +) diff --git a/source/config_paramgen.go b/source/config_paramgen.go new file mode 100644 index 0000000..8b7b4ea --- /dev/null +++ b/source/config_paramgen.go @@ -0,0 +1,51 @@ +// Code generated by paramgen. DO NOT EDIT. +// Source: github.com/ConduitIO/conduit-connector-sdk/tree/main/cmd/paramgen + +package source + +import ( + sdk "github.com/conduitio/conduit-connector-sdk" +) + +func (Config) Parameters() map[string]sdk.Parameter { + return map[string]sdk.Parameter{ + "aws.accessKeyId": { + Default: "", + Description: "amazon access key id", + Type: sdk.ParameterTypeString, + Validations: []sdk.Validation{ + sdk.ValidationRequired{}, + }, + }, + "aws.queue": { + Default: "", + Description: "amazon sqs queue name", + Type: sdk.ParameterTypeString, + Validations: []sdk.Validation{ + sdk.ValidationRequired{}, + }, + }, + "aws.region": { + Default: "", + Description: "amazon sqs region", + Type: sdk.ParameterTypeString, + Validations: []sdk.Validation{ + sdk.ValidationRequired{}, + }, + }, + "aws.secretAccessKey": { + Default: "", + Description: "amazon secret access key", + Type: sdk.ParameterTypeString, + Validations: []sdk.Validation{ + sdk.ValidationRequired{}, + }, + }, + "aws.visibilityTimeout": { + Default: "0", + Description: "visibility timeout", + Type: sdk.ParameterTypeInt, + Validations: []sdk.Validation{}, + }, + } +} diff --git a/source/config_test.go b/source/config_test.go new file mode 100644 index 0000000..9407f67 --- /dev/null +++ b/source/config_test.go @@ -0,0 +1,50 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package source + +import ( + "testing" + + "github.com/conduitio-labs/conduit-connector-sqs/common" + sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/matryer/is" +) + +func TestParseConfig(t *testing.T) { + exampleConfig := map[string]string{ + "aws.accessKeyId": "access-key-123", + "aws.secretAccessKey": "secret-key-321", + "aws.region": "us-east-1", + "aws.visibilityTimeout": "60", + "aws.queue": "queue", + } + + want := Config{ + Config: common.Config{ + AWSAccessKeyID: "access-key-123", + AWSSecretAccessKey: "secret-key-321", + AWSRegion: "us-east-1", + AWSQueue: "queue", + }, + AWSSQSVisibilityTimeout: 60, + } + + is := is.New(t) + var got Config + err := sdk.Util.ParseConfig(exampleConfig, &got) + + is.NoErr(err) + is.Equal(want, got) +} diff --git a/source/source.go b/source/source.go new file mode 100644 index 0000000..6217280 --- /dev/null +++ b/source/source.go @@ -0,0 +1,138 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package source + +import ( + "context" + "fmt" + + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + sdk "github.com/conduitio/conduit-connector-sdk" +) + +type Source struct { + sdk.UnimplementedSource + config Config + svc *sqs.Client + queueURL string +} + +func NewSource() sdk.Source { + return sdk.SourceWithMiddleware(&Source{}, sdk.DefaultSourceMiddleware()...) +} + +func (s *Source) Parameters() map[string]sdk.Parameter { + return Config{}.Parameters() +} + +func (s *Source) Configure(ctx context.Context, cfg map[string]string) error { + sdk.Logger(ctx).Debug().Msg("Configuring Source Connector.") + + err := sdk.Util.ParseConfig(cfg, &s.config) + if err != nil { + return fmt.Errorf("failed to parse source config : %w", err) + } + + return nil +} + +func (s *Source) Open(ctx context.Context, _ sdk.Position) error { + cfg, err := config.LoadDefaultConfig(ctx, + config.WithRegion(s.config.AWSRegion), + config.WithCredentialsProvider( + credentials.NewStaticCredentialsProvider( + s.config.AWSAccessKeyID, + s.config.AWSSecretAccessKey, + "")), + ) + if err != nil { + return fmt.Errorf("failed to load aws config with given credentials : %w", err) + } + // Create a SQS client from just a session. + s.svc = sqs.NewFromConfig(cfg) + + queueInput := &sqs.GetQueueUrlInput{ + QueueName: &s.config.AWSQueue, + } + // Get URL of queue + urlResult, err := s.svc.GetQueueUrl(ctx, queueInput) + if err != nil { + return fmt.Errorf("failed to get queue amazon sqs URL: %w", err) + } + + s.queueURL = *urlResult.QueueUrl + + return nil +} + +func (s *Source) Read(ctx context.Context) (sdk.Record, error) { + var err error + receiveMessage := &sqs.ReceiveMessageInput{ + MessageAttributeNames: []string{ + string(types.QueueAttributeNameAll), + }, + QueueUrl: &s.queueURL, + MaxNumberOfMessages: 1, + VisibilityTimeout: s.config.AWSSQSVisibilityTimeout, + } + + // grab a message from queue + sqsMessages, err := s.svc.ReceiveMessage(ctx, receiveMessage) + if err != nil { + return sdk.Record{}, fmt.Errorf("error retrieving amazon sqs messages: %w", err) + } + + // if there are no messages in queue, backoff + if len(sqsMessages.Messages) == 0 { + return sdk.Record{}, sdk.ErrBackoffRetry + } + + attributes := sqsMessages.Messages[0].MessageAttributes + mt := sdk.Metadata{} + for key, value := range attributes { + mt[key] = *value.StringValue + } + + rec := sdk.Util.Source.NewRecordCreate( + sdk.Position(*sqsMessages.Messages[0].ReceiptHandle), + mt, + sdk.RawData(*sqsMessages.Messages[0].MessageId), + sdk.RawData(*sqsMessages.Messages[0].Body), + ) + return rec, nil +} + +func (s *Source) Ack(ctx context.Context, position sdk.Position) error { + // once message received in queue, remove it + receiptHandle := string(position) + deleteMessage := &sqs.DeleteMessageInput{ + QueueUrl: &s.queueURL, + ReceiptHandle: &receiptHandle, + } + + _, err := s.svc.DeleteMessage(ctx, deleteMessage) + if err != nil { + return fmt.Errorf("failed to delete sqs message with receipt handle %s : %w", string(position), err) + } + + return nil +} + +func (s *Source) Teardown(_ context.Context) error { + return nil +} diff --git a/source/source_integration_test.go b/source/source_integration_test.go new file mode 100644 index 0000000..66c12a4 --- /dev/null +++ b/source/source_integration_test.go @@ -0,0 +1,225 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package source + +import ( + "context" + "errors" + "os" + "strings" + "testing" + + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/conduitio-labs/conduit-connector-sqs/common" + sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/google/uuid" + "github.com/matryer/is" +) + +func TestSource_SuccessfulMessageReceive(t *testing.T) { + is := is.New(t) + ctx := context.Background() + source := NewSource() + defer func() { + err := source.Teardown(ctx) + is.NoErr(err) + }() + sourceQueue := "test-queue-source-" + uuid.NewString() + client, url, cfg, err := prepareIntegrationTest(t, sourceQueue) + is.NoErr(err) + + messageBody := "Test message body" + _, err = client.SendMessage( + context.Background(), + &sqs.SendMessageInput{ + MessageBody: &messageBody, + QueueUrl: url.QueueUrl, + }, + ) + is.NoErr(err) + + err = source.Configure(ctx, cfg) + is.NoErr(err) + + err = source.Open(ctx, nil) + is.NoErr(err) + + record, err := source.Read(ctx) + is.NoErr(err) + + err = source.Ack(ctx, record.Position) + is.NoErr(err) + + is.Equal(string(record.Payload.After.Bytes()), messageBody) + + record, err = source.Read(ctx) + if err != sdk.ErrBackoffRetry || record.Metadata != nil { + t.Fatalf("expected no records and a signal that there are no more records, got %v %v", record, err) + } + + _ = source.Teardown(ctx) +} +func TestSource_FailBadCreds(t *testing.T) { + is := is.New(t) + ctx := context.Background() + source := NewSource() + defer func() { + err := source.Teardown(ctx) + is.NoErr(err) + }() + sourceQueue := "test-queue-source-" + uuid.NewString() + _, _, cfg, err := prepareIntegrationTest(t, sourceQueue) + is.NoErr(err) + + cfg[common.ConfigKeyAWSAccessKeyID] = "" + + err = source.Configure(ctx, cfg) + is.NoErr(err) + + err = source.Open(ctx, nil) + is.True(strings.Contains(err.Error(), "failed to refresh cached credentials, static credentials are empty")) +} + +func TestSource_EmptyQueue(t *testing.T) { + is := is.New(t) + sourceQueue := "test-queue-source-" + uuid.NewString() + _, _, cfg, err := prepareIntegrationTest(t, sourceQueue) + ctx := context.Background() + is.NoErr(err) + + source := NewSource() + defer func() { + err := source.Teardown(ctx) + is.NoErr(err) + }() + err = source.Configure(ctx, cfg) + is.NoErr(err) + + err = source.Open(ctx, nil) + is.NoErr(err) + + record, err := source.Read(ctx) + + if err != sdk.ErrBackoffRetry || record.Metadata != nil { + t.Fatalf("expected no records and a signal that there are no more records, got %v %v", record, err) + } +} + +func TestSource_FailEmptyQueueName(t *testing.T) { + is := is.New(t) + sourceQueue := "" + _, _, _, err := prepareIntegrationTest(t, sourceQueue) + is.True(strings.Contains(err.Error(), "Queue name cannot be empty")) +} + +func prepareIntegrationTest(t *testing.T, sourceQueue string) (*sqs.Client, *sqs.GetQueueUrlOutput, map[string]string, error) { + cfg, err := parseIntegrationConfig() + if err != nil { + t.Fatalf("could not parse config: %v", err) + } + + client, err := newAWSClient(cfg) + if err != nil { + t.Fatalf("could not create S3 client: %v", err) + } + + _, err = client.CreateQueue(context.Background(), &sqs.CreateQueueInput{ + QueueName: &sourceQueue, + }) + if err != nil { + return nil, nil, nil, err + } + + queueInput := &sqs.GetQueueUrlInput{ + QueueName: &sourceQueue, + } + // Get URL of queue + urlResult, err := client.GetQueueUrl(context.Background(), queueInput) + if err != nil { + return nil, nil, nil, err + } + + if err != nil { + return nil, nil, nil, err + } + + t.Cleanup(func() { + err := deleteSQSQueue(t, client, urlResult.QueueUrl) + if err != nil { + t.Fatal(err) + } + }) + + cfg[common.ConfigKeyAWSQueue] = sourceQueue + + return client, urlResult, cfg, nil +} + +func deleteSQSQueue(t *testing.T, svc *sqs.Client, url *string) error { + _, err := svc.DeleteQueue(context.Background(), &sqs.DeleteQueueInput{ + QueueUrl: url, + }) + if err != nil { + return err + } + return nil +} + +func newAWSClient(cfg map[string]string) (*sqs.Client, error) { + awsConfig, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion(cfg[common.ConfigKeyAWSRegion]), + config.WithCredentialsProvider( + credentials.NewStaticCredentialsProvider( + cfg[common.ConfigKeyAWSAccessKeyID], + cfg[common.ConfigKeyAWSSecretAccessKey], + "")), + ) + if err != nil { + return nil, err + } + // Create a SQS client from just a session. + sqsClient := sqs.NewFromConfig(awsConfig) + + return sqsClient, nil +} + +func parseIntegrationConfig() (map[string]string, error) { + awsAccessKeyID := os.Getenv("AWS_ACCESS_KEY_ID") + + if awsAccessKeyID == "" { + return nil, errors.New("AWS_ACCESS_KEY_ID env var must be set") + } + + awsSecretAccessKey := os.Getenv("AWS_SECRET_ACCESS_KEY") + if awsSecretAccessKey == "" { + return nil, errors.New("AWS_SECRET_ACCESS_KEY env var must be set") + } + + awsRegion := os.Getenv("AWS_REGION") + if awsRegion == "" { + return nil, errors.New("AWS_REGION env var must be set") + } + + awsVisibility := os.Getenv("AWS_VISIBILITY") + + return map[string]string{ + common.ConfigKeyAWSAccessKeyID: awsAccessKeyID, + common.ConfigKeyAWSSecretAccessKey: awsSecretAccessKey, + ConfigKeySQSVisibilityTimeout: awsVisibility, + common.ConfigKeyAWSRegion: awsRegion, + }, nil +} diff --git a/spec.go b/spec.go new file mode 100644 index 0000000..8f777f9 --- /dev/null +++ b/spec.go @@ -0,0 +1,35 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqs + +import ( + sdk "github.com/conduitio/conduit-connector-sdk" +) + +// version is set during the build process with ldflags (see Makefile). +// Default version matches default from runtime/debug. +var version = "(devel)" + +// Specification returns the Plugin's Specification. +func Specification() sdk.Specification { + return sdk.Specification{ + Name: "sqs", + Summary: "The Amazon SQS plugin for Conduit, written in Go.", + Description: "The Amazon SQS connector is one of Conduit plugins." + + "It provides the source and destination connector.", + Version: version, + Author: "Meroxa, Inc.", + } +}