Skip to content

Commit

Permalink
Configure dynamo throughput
Browse files Browse the repository at this point in the history
Default to pay per request
allow configuring provisioned throughput
  • Loading branch information
Patrick Robinson committed May 27, 2019
1 parent 469ad86 commit 379b423
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 14 deletions.
28 changes: 19 additions & 9 deletions checkpointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@ var ErrSequenceIDNotFound = errors.New("SequenceIDNotFoundForShard")

// DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend
type DynamoCheckpoint struct {
TableName string
LeaseDuration int
Retries int
svc dynamodbiface.DynamoDBAPI
skipTableCheck bool
TableName string
LeaseDuration int
Retries int
ReadCapacityUnits *int64
WriteCapacityUnits *int64
BillingMode *string
svc dynamodbiface.DynamoDBAPI
skipTableCheck bool
}

// Init initialises the DynamoDB Checkpoint
Expand Down Expand Up @@ -68,6 +71,10 @@ func (checkpointer *DynamoCheckpoint) Init() error {
checkpointer.LeaseDuration = defaultLeaseDuration
}

if checkpointer.BillingMode == nil {
checkpointer.BillingMode = aws.String("PAY_PER_REQUEST")
}

if !checkpointer.skipTableCheck && !checkpointer.doesTableExist() {
return checkpointer.createTable()
}
Expand Down Expand Up @@ -217,18 +224,21 @@ func (checkpointer *DynamoCheckpoint) createTable() error {
AttributeType: aws.String("S"),
},
},
BillingMode: checkpointer.BillingMode,
KeySchema: []*dynamodb.KeySchemaElement{
{
AttributeName: aws.String("ShardID"),
KeyType: aws.String("HASH"),
},
},
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(1),
WriteCapacityUnits: aws.Int64(1),
},
TableName: aws.String(checkpointer.TableName),
}
if *checkpointer.BillingMode == "PROVISIONED" {
input.ProvisionedThroughput = &dynamodb.ProvisionedThroughput{
ReadCapacityUnits: checkpointer.ReadCapacityUnits,
WriteCapacityUnits: checkpointer.WriteCapacityUnits,
}
}
_, err := checkpointer.svc.CreateTable(input)
return err
}
Expand Down
14 changes: 10 additions & 4 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,15 @@ type KinesisConsumer struct {
StreamName string
ShardIteratorType string
RecordConsumer RecordConsumer
TableName string
EmptyRecordBackoffMs int
LeaseDuration int
Monitoring MonitoringConfiguration
DisableAutomaticCheckpoints bool
Retries *int
TableName string
DynamoReadCapacityUnits *int64
DynamoWriteCapacityUnits *int64
DynamoBillingMode *string
svc kinesisiface.KinesisAPI
checkpointer Checkpointer
stop *chan struct{}
Expand Down Expand Up @@ -106,9 +109,12 @@ func (kc *KinesisConsumer) StartConsumer() error {
}
kc.svc = kinesis.New(session)
kc.checkpointer = &DynamoCheckpoint{
TableName: kc.TableName,
Retries: retries,
LeaseDuration: kc.LeaseDuration,
ReadCapacityUnits: kc.DynamoReadCapacityUnits,
WriteCapacityUnits: kc.DynamoWriteCapacityUnits,
BillingMode: kc.DynamoBillingMode,
TableName: kc.TableName,
Retries: retries,
LeaseDuration: kc.LeaseDuration,
}
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module github.com/patrobinson/gokini

require (
github.com/aws/aws-sdk-go v1.15.78
github.com/aws/aws-sdk-go v1.19.38
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 // indirect
github.com/gogo/protobuf v1.1.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/aws/aws-sdk-go v1.15.78 h1:LaXy6lWR0YK7LKyuU0QWy2ws/LWTPfYV/UgfiBu4tvY=
github.com/aws/aws-sdk-go v1.15.78/go.mod h1:E3/ieXAlvM0XWO57iftYVDLLvQ824smPP3ATZkfNZeM=
github.com/aws/aws-sdk-go v1.19.38 h1:WKjobgPO4Ua1ww2NJJl2/zQNreUZxvqmEzwMlRjjm9g=
github.com/aws/aws-sdk-go v1.19.38/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/cheekybits/is v0.0.0-20150225183255-68e9c0620927 h1:SKI1/fuSdodxmNNyVBR8d7X/HuLnRpvvFO0AgyQk764=
Expand All @@ -14,6 +16,8 @@ github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 h1:12VvqtR6Aowv3l/EQUlocDHW2Cp4G9WJVH7uyH8QFJE=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/matryer/try v0.0.0-20161228173917-9ac251b645a2 h1:JAEbJn3j/FrhdWA9jW8B5ajsLIjeuEHLi8xE4fk997o=
Expand Down

0 comments on commit 379b423

Please sign in to comment.