Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect checkpoint seqnum pointing to expired data #126

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 48 additions & 4 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type Consumer struct {
store Store
scanInterval time.Duration
maxRecords int64
retentionPeriod int64
}

// ScanFunc is the type of the function called for each message read
Expand All @@ -104,6 +105,23 @@ func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error {
shardc = make(chan *kinesis.Shard, 1)
)

// Discover the retention period on the stream, which is used later in ScanShard()
// for failure mode handling.
summary, err := c.client.DescribeStreamSummaryWithContext(
aws.Context(ctx),
&kinesis.DescribeStreamSummaryInput{
StreamName: aws.String(c.streamName),
},
)
if err != nil {
if err.(awserr.Error).Code() != "AccessDeniedException" {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no constant for this that I could find. kinesis.ErrCodeAccessDeniedException isn't a thing, and while this constant is defined in other packages, I didn't think it made sense to import one of them just for that.

return err
}
c.logger.Log("[CONSUMER] IAM entity lacks kinesis:DescribeStreamSummary permissions, skipping extra sanity checks")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Existing users may have IAM policies crafted to work with kinesis-consumer that don't allow this permission, so it's optional.

} else {
c.retentionPeriod = *summary.StreamDescriptionSummary.RetentionPeriodHours * 3600 * 1000
}

go func() {
c.group.Start(ctx, shardc)
<-ctx.Done()
Expand Down Expand Up @@ -146,7 +164,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
}

// get shard iterator
shardIterator, err := c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
shardIterator, err := c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum, false)
if err != nil {
return fmt.Errorf("get shard iterator error: %v", err)
}
Expand All @@ -155,10 +173,19 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
defer func() {
c.logger.Log("[CONSUMER] stop scan:", shardID)
}()

// True if we should detect expired sequence number and fetch iterator for oldest
// record if necessary.
checkExpiredSeq := true
// User requested polling interval
scanTicker := time.NewTicker(c.scanInterval)
defer scanTicker.Stop()
// Limit before throttling based on the published Kinesis API limit of 5 TPS per shard
fastTicker := time.NewTicker(200 * time.Millisecond)
defer fastTicker.Stop()

for {
nextTicker := scanTicker
resp, err := c.client.GetRecords(&kinesis.GetRecordsInput{
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, should we be using GetRecordsWithContext here?

Limit: aws.Int64(c.maxRecords),
ShardIterator: shardIterator,
Expand All @@ -174,7 +201,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
}
}

shardIterator, err = c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
shardIterator, err = c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum, false)
if err != nil {
return fmt.Errorf("get shard iterator error: %v", err)
}
Expand Down Expand Up @@ -207,13 +234,28 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
}

shardIterator = resp.NextShardIterator
if len(resp.Records) == 0 && c.retentionPeriod > 0 && *resp.MillisBehindLatest >= c.retentionPeriod {
// No records were returned and we are behind at least the retention
// period of the stream which means the last sequence number refers to
// expired data. If we haven't done so already, fetch an iterator for the
// shard's trim horizon.
if checkExpiredSeq {
shardIterator, err = c.getShardIterator(ctx, c.streamName, shardID, "", true)
if err != nil {
return fmt.Errorf("get shard iterator error: %v", err)
}
// Only fetch the trim horizon once.
checkExpiredSeq = false
}
nextTicker = fastTicker
}
}

// Wait for next scan
select {
case <-ctx.Done():
return nil
case <-scanTicker.C:
case <-nextTicker.C:
continue
}
}
Expand All @@ -228,7 +270,7 @@ func isShardClosed(nextShardIterator, currentShardIterator *string) bool {
return nextShardIterator == nil || currentShardIterator == nextShardIterator
}

func (c *Consumer) getShardIterator(ctx context.Context, streamName, shardID, seqNum string) (*string, error) {
func (c *Consumer) getShardIterator(ctx context.Context, streamName, shardID, seqNum string, oldest bool) (*string, error) {
params := &kinesis.GetShardIteratorInput{
ShardId: aws.String(shardID),
StreamName: aws.String(streamName),
Copy link
Contributor Author

@jtackaberry jtackaberry Aug 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we reference c.streamName here instead and get rid of the streamName argument to this function?

Expand All @@ -237,6 +279,8 @@ func (c *Consumer) getShardIterator(ctx context.Context, streamName, shardID, se
if seqNum != "" {
params.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeAfterSequenceNumber)
params.StartingSequenceNumber = aws.String(seqNum)
} else if oldest {
params.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeTrimHorizon)
} else if c.initialTimestamp != nil {
params.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeAtTimestamp)
params.Timestamp = c.initialTimestamp
Expand Down