Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
anna-cross committed Dec 6, 2023
1 parent 4ae3102 commit bab7350
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 61 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ Run `make build` to build the connector.


## Source


The source connector pulls data from the Amazon SQS Queue. As messages come in, the source connector grabs a single message from the Amazon SQS Queue, formats it for the destination connector as a new record, and sends it. The Message Body of the SQS Message is formatted into a record's payload, and the Message Attributes are passed as record metadata. After the record has been acknowledged, the source connector deletes the message from the Amazon SQS Queue.


## Destinaton


The destination connector splits incoming records into batches of 10 and pushes them to the Amazon SQS Queue. Any fields defined in the metadata of the record will be passed as Message Attributes, and the json encoding of the record will be passed as the Message Body.
The destination connector batches incoming records into 10 and pushes them to the Amazon SQS Queue. Any fields defined in the metadata of the record will be passed as Message Attributes, and the json encoding of the record will be passed as the Message Body.


### Configuration
Expand Down
38 changes: 38 additions & 0 deletions common/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package common

//go:generate paramgen -output=config_paramgen.go Config

type Config struct {
// amazon access key id
AWSAccessKeyID string `json:"aws.accessKeyId" validate:"required"`
// amazon secret access key
AWSSecretAccessKey string `json:"aws.secretAccessKey" validate:"required"`
// amazon sqs region
AWSRegion string `json:"aws.region" validate:"required"`
// amazon sqs queue name
AWSQueue string `json:"aws.queue" validate:"required"`
}

const (
ConfigKeyAWSAccessKeyID = "aws.accessKeyId"

ConfigKeyAWSSecretAccessKey = "aws.secretAccessKey"

ConfigKeyAWSRegion = "aws.region"

ConfigKeyAWSQueue = "aws.queue"
)
19 changes: 3 additions & 16 deletions destination/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,16 @@

package destination

import "github.com/conduitio-labs/conduit-connector-sqs/common"

//go:generate paramgen -output=config_paramgen.go Config

type Config struct {
// amazon access key id
AWSAccessKeyID string `json:"aws.accessKeyId" validate:"required"`
// amazon secret access key
AWSSecretAccessKey string `json:"aws.secretAccessKey" validate:"required"`
// amazon sqs region
AWSRegion string `json:"aws.region" validate:"required"`
// amazon sqs queue name
AWSQueue string `json:"aws.queue" validate:"required"`
common.Config
// amazon sqs message delay time
AWSSQSMessageDelay int32 `json:"aws.delayTime" default:"0"`
}

const (
ConfigKeyAWSAccessKeyID = "aws.accessKeyId"

ConfigKeyAWSSecretAccessKey = "aws.secretAccessKey"

ConfigKeyAWSRegion = "aws.region"

ConfigKeyAWSQueue = "aws.queue"

ConfigKeyAWSSQSDelayTime = "aws.delayTime"
)
11 changes: 7 additions & 4 deletions destination/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package destination
import (
"testing"

"github.com/conduitio-labs/conduit-connector-sqs/common"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/matryer/is"
)
Expand All @@ -30,10 +31,12 @@ func TestParseConfig(t *testing.T) {
"aws.delayTime": "10",
}
want := Config{
AWSAccessKeyID: "access-key-123",
AWSSecretAccessKey: "secret-key-321",
AWSRegion: "us-east-1",
AWSQueue: "queue",
Config: common.Config{
AWSAccessKeyID: "access-key-123",
AWSSecretAccessKey: "secret-key-321",
AWSRegion: "us-east-1",
AWSQueue: "queue",
},
AWSSQSMessageDelay: 10,
}

Expand Down
21 changes: 11 additions & 10 deletions destination/destination_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/conduitio-labs/conduit-connector-sqs/common"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/google/uuid"
"github.com/matryer/is"
Expand Down Expand Up @@ -143,7 +144,7 @@ func TestDestination_FailNonExistentQueue(t *testing.T) {
_, _, cfg, err := prepareIntegrationTest(t)
is.NoErr(err)

cfg[ConfigKeyAWSQueue] = ""
cfg[common.ConfigKeyAWSQueue] = ""

err = destination.Configure(ctx, cfg)
is.NoErr(err)
Expand All @@ -155,7 +156,7 @@ func TestDestination_FailNonExistentQueue(t *testing.T) {
func prepareIntegrationTest(t *testing.T) (*sqs.Client, *sqs.GetQueueUrlOutput, map[string]string, error) {
cfg, err := parseIntegrationConfig()
if err != nil {
t.Skip(err)
t.Fatalf("could not parse config: %v", err)
}

sourceQueue := "test-queue-destination-" + uuid.NewString()
Expand Down Expand Up @@ -192,7 +193,7 @@ func prepareIntegrationTest(t *testing.T) (*sqs.Client, *sqs.GetQueueUrlOutput,
}
})

cfg[ConfigKeyAWSQueue] = sourceQueue
cfg[common.ConfigKeyAWSQueue] = sourceQueue

return client, urlResult, cfg, nil
}
Expand All @@ -209,11 +210,11 @@ func deleteSQSQueue(t *testing.T, svc *sqs.Client, url *string) error {

func newAWSClient(cfg map[string]string) (*sqs.Client, error) {
awsConfig, err := config.LoadDefaultConfig(context.Background(),
config.WithRegion(cfg[ConfigKeyAWSRegion]),
config.WithRegion(cfg[common.ConfigKeyAWSRegion]),
config.WithCredentialsProvider(
credentials.NewStaticCredentialsProvider(
cfg[ConfigKeyAWSAccessKeyID],
cfg[ConfigKeyAWSSecretAccessKey], "")),
cfg[common.ConfigKeyAWSAccessKeyID],
cfg[common.ConfigKeyAWSSecretAccessKey], "")),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -244,9 +245,9 @@ func parseIntegrationConfig() (map[string]string, error) {
}

return map[string]string{
ConfigKeyAWSAccessKeyID: awsAccessKeyID,
ConfigKeyAWSSecretAccessKey: awsSecretAccessKey,
ConfigKeyAWSSQSDelayTime: awsMessageDelay,
ConfigKeyAWSRegion: awsRegion,
common.ConfigKeyAWSAccessKeyID: awsAccessKeyID,
common.ConfigKeyAWSSecretAccessKey: awsSecretAccessKey,
ConfigKeyAWSSQSDelayTime: awsMessageDelay,
common.ConfigKeyAWSRegion: awsRegion,
}, nil
}
19 changes: 3 additions & 16 deletions source/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,16 @@

package source

import "github.com/conduitio-labs/conduit-connector-sqs/common"

//go:generate paramgen -output=config_paramgen.go Config

type Config struct {
// amazon access key id
AWSAccessKeyID string `json:"aws.accessKeyId" validate:"required"`
// amazon secret access key
AWSSecretAccessKey string `json:"aws.secretAccessKey" validate:"required"`
// amazon sqs region
AWSRegion string `json:"aws.region" validate:"required"`
// amazon sqs queue name
AWSQueue string `json:"aws.queue" validate:"required"`
common.Config
// visibility timeout
AWSSQSVisibilityTimeout int32 `json:"aws.visibilityTimeout" default:"0"`
}

const (
ConfigKeyAWSAccessKeyID = "aws.accessKeyId"

ConfigKeyAWSSecretAccessKey = "aws.secretAccessKey"

ConfigKeyAWSRegion = "aws.region"

ConfigKeyAWSQueue = "aws.queue"

ConfigKeySQSVisibilityTimeout = "aws.visibilityTimeout"
)
11 changes: 7 additions & 4 deletions source/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package source
import (
"testing"

"github.com/conduitio-labs/conduit-connector-sqs/common"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/matryer/is"
)
Expand All @@ -31,10 +32,12 @@ func TestParseConfig(t *testing.T) {
}

want := Config{
AWSAccessKeyID: "access-key-123",
AWSSecretAccessKey: "secret-key-321",
AWSRegion: "us-east-1",
AWSQueue: "queue",
Config: common.Config{
AWSAccessKeyID: "access-key-123",
AWSSecretAccessKey: "secret-key-321",
AWSRegion: "us-east-1",
AWSQueue: "queue",
},
AWSSQSVisibilityTimeout: 60,
}

Expand Down
21 changes: 11 additions & 10 deletions source/source_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/conduitio-labs/conduit-connector-sqs/common"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/google/uuid"
"github.com/matryer/is"
Expand Down Expand Up @@ -84,7 +85,7 @@ func TestSource_FailBadCreds(t *testing.T) {
_, _, cfg, err := prepareIntegrationTest(t, sourceQueue)
is.NoErr(err)

cfg[ConfigKeyAWSAccessKeyID] = ""
cfg[common.ConfigKeyAWSAccessKeyID] = ""

err = source.Configure(ctx, cfg)
is.NoErr(err)
Expand Down Expand Up @@ -128,7 +129,7 @@ func TestSource_FailEmptyQueueName(t *testing.T) {
func prepareIntegrationTest(t *testing.T, sourceQueue string) (*sqs.Client, *sqs.GetQueueUrlOutput, map[string]string, error) {
cfg, err := parseIntegrationConfig()
if err != nil {
t.Skip(err)
t.Fatalf("could not parse config: %v", err)
}

client, err := newAWSClient(cfg)
Expand Down Expand Up @@ -163,7 +164,7 @@ func prepareIntegrationTest(t *testing.T, sourceQueue string) (*sqs.Client, *sqs
}
})

cfg[ConfigKeyAWSQueue] = sourceQueue
cfg[common.ConfigKeyAWSQueue] = sourceQueue

return client, urlResult, cfg, nil
}
Expand All @@ -180,11 +181,11 @@ func deleteSQSQueue(t *testing.T, svc *sqs.Client, url *string) error {

func newAWSClient(cfg map[string]string) (*sqs.Client, error) {
awsConfig, err := config.LoadDefaultConfig(context.Background(),
config.WithRegion(cfg[ConfigKeyAWSRegion]),
config.WithRegion(cfg[common.ConfigKeyAWSRegion]),
config.WithCredentialsProvider(
credentials.NewStaticCredentialsProvider(
cfg[ConfigKeyAWSAccessKeyID],
cfg[ConfigKeyAWSSecretAccessKey],
cfg[common.ConfigKeyAWSAccessKeyID],
cfg[common.ConfigKeyAWSSecretAccessKey],
"")),
)
if err != nil {
Expand Down Expand Up @@ -216,9 +217,9 @@ func parseIntegrationConfig() (map[string]string, error) {
awsVisibility := os.Getenv("AWS_VISIBILITY")

return map[string]string{
ConfigKeyAWSAccessKeyID: awsAccessKeyID,
ConfigKeyAWSSecretAccessKey: awsSecretAccessKey,
ConfigKeySQSVisibilityTimeout: awsVisibility,
ConfigKeyAWSRegion: awsRegion,
common.ConfigKeyAWSAccessKeyID: awsAccessKeyID,
common.ConfigKeyAWSSecretAccessKey: awsSecretAccessKey,
ConfigKeySQSVisibilityTimeout: awsVisibility,
common.ConfigKeyAWSRegion: awsRegion,
}, nil
}

0 comments on commit bab7350

Please sign in to comment.