Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect checkpoint seqnum pointing to expired data #126

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

jtackaberry
Copy link
Contributor

@jtackaberry jtackaberry commented Aug 5, 2020

Implementation for #125 for consideration/discussion.

CI will fail but I can add a mock for DescribeStreamSummaryWithContext if you think this is worthy of merging.

This also adds an extra ticker fastTicker which is based on the 5TPS rate limit, and which is used for those cases where we want a fast return back to Kinesis. I can amend #122 to use this as well, if/when the time comes. Alternatively, to squeeze more throughput, we could use the behavior from #122 (continue immediately) and only start backing off if we hit ErrCodeProvisionedThroughputExceededException.

},
)
if err != nil {
if err.(awserr.Error).Code() != "AccessDeniedException" {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no constant for this that I could find. kinesis.ErrCodeAccessDeniedException isn't a thing, and while this constant is defined in other packages, I didn't think it made sense to import one of them just for that.

if err.(awserr.Error).Code() != "AccessDeniedException" {
return err
}
c.logger.Log("[CONSUMER] IAM entity lacks kinesis:DescribeStreamSummary permissions, skipping extra sanity checks")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Existing users may have IAM policies crafted to work with kinesis-consumer that don't allow this permission, so it's optional.

@@ -228,7 +270,7 @@ func isShardClosed(nextShardIterator, currentShardIterator *string) bool {
return nextShardIterator == nil || currentShardIterator == nextShardIterator
}

func (c *Consumer) getShardIterator(ctx context.Context, streamName, shardID, seqNum string) (*string, error) {
func (c *Consumer) getShardIterator(ctx context.Context, streamName, shardID, seqNum string, oldest bool) (*string, error) {
params := &kinesis.GetShardIteratorInput{
ShardId: aws.String(shardID),
StreamName: aws.String(streamName),
Copy link
Contributor Author

@jtackaberry jtackaberry Aug 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we reference c.streamName here instead and get rid of the streamName argument to this function?


for {
nextTicker := scanTicker
resp, err := c.client.GetRecords(&kinesis.GetRecordsInput{
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, should we be using GetRecordsWithContext here?

@lepek
Copy link

lepek commented Jan 5, 2023

Hi! What's the status of this? It hasn't been merge right? I am running into the same issue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants