diff --git a/checkpointer.go b/checkpointer.go index a894c59..09bc09b 100644 --- a/checkpointer.go +++ b/checkpointer.go @@ -35,11 +35,14 @@ var ErrSequenceIDNotFound = errors.New("SequenceIDNotFoundForShard") // DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend type DynamoCheckpoint struct { - TableName string - LeaseDuration int - Retries int - svc dynamodbiface.DynamoDBAPI - skipTableCheck bool + TableName string + LeaseDuration int + Retries int + ReadCapacityUnits *int64 + WriteCapacityUnits *int64 + BillingMode *string + svc dynamodbiface.DynamoDBAPI + skipTableCheck bool } // Init initialises the DynamoDB Checkpoint @@ -68,6 +71,10 @@ func (checkpointer *DynamoCheckpoint) Init() error { checkpointer.LeaseDuration = defaultLeaseDuration } + if checkpointer.BillingMode == nil { + checkpointer.BillingMode = aws.String("PAY_PER_REQUEST") + } + if !checkpointer.skipTableCheck && !checkpointer.doesTableExist() { return checkpointer.createTable() } @@ -217,18 +224,21 @@ func (checkpointer *DynamoCheckpoint) createTable() error { AttributeType: aws.String("S"), }, }, + BillingMode: checkpointer.BillingMode, KeySchema: []*dynamodb.KeySchemaElement{ { AttributeName: aws.String("ShardID"), KeyType: aws.String("HASH"), }, }, - ProvisionedThroughput: &dynamodb.ProvisionedThroughput{ - ReadCapacityUnits: aws.Int64(1), - WriteCapacityUnits: aws.Int64(1), - }, TableName: aws.String(checkpointer.TableName), } + if *checkpointer.BillingMode == "PROVISIONED" { + input.ProvisionedThroughput = &dynamodb.ProvisionedThroughput{ + ReadCapacityUnits: checkpointer.ReadCapacityUnits, + WriteCapacityUnits: checkpointer.WriteCapacityUnits, + } + } _, err := checkpointer.svc.CreateTable(input) return err } diff --git a/consumer.go b/consumer.go index 3998ed2..64bc1d6 100644 --- a/consumer.go +++ b/consumer.go @@ -52,12 +52,15 @@ type KinesisConsumer struct { StreamName string ShardIteratorType string RecordConsumer RecordConsumer - TableName string EmptyRecordBackoffMs int LeaseDuration int Monitoring MonitoringConfiguration DisableAutomaticCheckpoints bool Retries *int + TableName string + DynamoReadCapacityUnits *int64 + DynamoWriteCapacityUnits *int64 + DynamoBillingMode *string svc kinesisiface.KinesisAPI checkpointer Checkpointer stop *chan struct{} @@ -106,9 +109,12 @@ func (kc *KinesisConsumer) StartConsumer() error { } kc.svc = kinesis.New(session) kc.checkpointer = &DynamoCheckpoint{ - TableName: kc.TableName, - Retries: retries, - LeaseDuration: kc.LeaseDuration, + ReadCapacityUnits: kc.DynamoReadCapacityUnits, + WriteCapacityUnits: kc.DynamoWriteCapacityUnits, + BillingMode: kc.DynamoBillingMode, + TableName: kc.TableName, + Retries: retries, + LeaseDuration: kc.LeaseDuration, } } diff --git a/go.mod b/go.mod index b8bb9cf..29af7ca 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,7 @@ module github.com/patrobinson/gokini require ( - github.com/aws/aws-sdk-go v1.15.78 + github.com/aws/aws-sdk-go v1.19.38 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 // indirect github.com/gogo/protobuf v1.1.1 // indirect diff --git a/go.sum b/go.sum index 1b04770..e4a8ec5 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/aws/aws-sdk-go v1.15.78 h1:LaXy6lWR0YK7LKyuU0QWy2ws/LWTPfYV/UgfiBu4tvY= github.com/aws/aws-sdk-go v1.15.78/go.mod h1:E3/ieXAlvM0XWO57iftYVDLLvQ824smPP3ATZkfNZeM= +github.com/aws/aws-sdk-go v1.19.38 h1:WKjobgPO4Ua1ww2NJJl2/zQNreUZxvqmEzwMlRjjm9g= +github.com/aws/aws-sdk-go v1.19.38/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 h1:SKI1/fuSdodxmNNyVBR8d7X/HuLnRpvvFO0AgyQk764= @@ -14,6 +16,8 @@ github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 h1:12VvqtR6Aowv3l/EQUlocDHW2Cp4G9WJVH7uyH8QFJE= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/matryer/try v0.0.0-20161228173917-9ac251b645a2 h1:JAEbJn3j/FrhdWA9jW8B5ajsLIjeuEHLi8xE4fk997o=