
Improve throughput with optional immediate rescan #122

Open — wants to merge 1 commit into base: master

Conversation

@jtackaberry (Contributor) commented Jul 31, 2020

This PR adds an option WithImmediateRescan() which, when enabled (it's disabled by default), causes the scan loop to skip the scan tick and immediately go back to Kinesis for another batch.

Another change is that the first scan is always performed immediately, without waiting for the next scan tick. This new behavior exists regardless of whether immediate rescanning is enabled.

These two control flow changes combined allow for higher scan intervals, which results in less chatty clients while also preserving throughput. In fact, this slightly increases effective throughput per shard even with the default 250ms scan interval by eliminating any residual delay between batches, which improves the bandwidth-delay product.

A second commit in this PR exposes MillisBehindLatest in the Record struct passed to ScanFunc(). My use case is to allow applications to provide metrics for per-shard lag times, but there could be other uses as well.

This second change is unrelated, but since it's in the same area of code as the immediate rescan option they became co-dependent. If this is a problem, I can remove it for now.
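For readers following along, here is a minimal usage sketch of how the option would be enabled. It assumes the option plugs into the library's existing functional-options API; names other than WithImmediateRescan follow the kinesis-consumer usage as of 2020 and may differ slightly.

```go
package main

import (
	"context"
	"fmt"
	"log"

	consumer "github.com/harlow/kinesis-consumer"
)

func main() {
	// WithImmediateRescan is the option proposed in this PR;
	// it is disabled by default.
	c, err := consumer.New("my-stream", consumer.WithImmediateRescan())
	if err != nil {
		log.Fatalf("consumer error: %v", err)
	}

	// Scan invokes the ScanFunc for each record on every shard.
	err = c.Scan(context.Background(), func(r *consumer.Record) error {
		fmt.Println(string(r.Data))
		return nil // returning nil continues scanning
	})
	if err != nil {
		log.Fatalf("scan error: %v", err)
	}
}
```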

@jtackaberry (Contributor, Author):

A variation on this idea I'd like to consider is to also use MillisBehindLatest to aid in the decision of whether an immediate rescan should be done. If we fetched > 0 records and we're > 0 milliseconds behind, then rescan immediately. This could further reduce redundant calls to Kinesis while not impeding the scenario of trying to catch up on older events as quickly as possible.

I'll experiment with this idea.
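For illustration, the decision described above could be factored out roughly as follows. This is a sketch with a hypothetical helper name; the real check would live inline in ScanShard, and MillisBehindLatest comes from the aws-sdk-go GetRecordsOutput.

```go
import "github.com/aws/aws-sdk-go/service/kinesis"

// shouldRescanNow reports whether the scan loop should skip the scan
// ticker and call GetRecords again right away: only when immediate
// rescan is enabled, the last batch was non-empty, and the shard is
// still behind the latest record. Hypothetical helper, for illustration.
func shouldRescanNow(immediateRescan bool, resp *kinesis.GetRecordsOutput) bool {
	if !immediateRescan || len(resp.Records) == 0 {
		return false
	}
	// MillisBehindLatest is a *int64 in the SDK; nil means unknown.
	return resp.MillisBehindLatest != nil && *resp.MillisBehindLatest > 0
}
```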

consumer.go — resolved review thread
@harlow (Owner) commented Aug 1, 2020

> Another change is that the first scan is always performed immediately, without waiting for the next scan tick. This new behavior exists regardless of whether immediate rescanning is enabled.

Love this! Can we split this out into its own PR and get this merged first? Then we can work on the interval delay.

consumer.go (outdated)

@@ -213,6 +209,18 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) error {
	}

	shardIterator = resp.NextShardIterator
	if c.immediateRescan && len(resp.Records) > 0 {
@harlow (Owner):

If c.immediateRescan is true we'll skip the <-ctx.Done() below. I want to make sure the program will gracefully shut down when told to.

@jtackaberry (Contributor, Author):

Hm, wouldn't that get picked up by the <-ctx.Done() in the inner loop above? OK, there's a narrow window between the point where we finish iterating over all records (where <-ctx.Done() would apply) and looping back to c.client.GetRecords(), but I see this as very similar to being asked to shut down while a call to c.client.GetRecords() is in flight: in either case, the Kinesis API call will complete quickly and we will fall through to the inner loop, where <-ctx.Done() would return.
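For readers without the full diff, the control flow under discussion is roughly the following. This is a heavily simplified sketch of ScanShard's loop, not the library's actual code; the client interface and types are from aws-sdk-go v1.

```go
package sketch

import (
	"context"
	"time"

	"github.com/aws/aws-sdk-go/service/kinesis"
	"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
)

// scanShardSketch is a simplified stand-in for (*Consumer).ScanShard,
// shown only to illustrate where ctx.Done() is checked.
func scanShardSketch(ctx context.Context, client kinesisiface.KinesisAPI,
	shardIterator *string, immediateRescan bool,
	scanTicker *time.Ticker, fn func(*kinesis.Record) error) error {

	for {
		resp, err := client.GetRecords(&kinesis.GetRecordsInput{ShardIterator: shardIterator})
		if err != nil {
			return err // the real code retries with a fresh iterator
		}

		// Inner loop: shutdown is honored between records while draining a batch.
		for _, r := range resp.Records {
			select {
			case <-ctx.Done():
				return nil
			default:
				if err := fn(r); err != nil {
					return err
				}
			}
		}

		shardIterator = resp.NextShardIterator

		// The new branch: skip the ticker and go straight back to GetRecords.
		// ctx.Done() is re-checked in the inner loop on the next iteration,
		// so shutdown remains prompt.
		if immediateRescan && len(resp.Records) > 0 {
			continue
		}

		// Default behavior: wait for the next scan tick or shutdown.
		select {
		case <-ctx.Done():
			return nil
		case <-scanTicker.C:
		}
	}
}
```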

@harlow (Owner):

Ahh yep, good point. Didn't see the additional check in the loop above (collapsed in the PR preview).

@jtackaberry (Contributor, Author):

> Love this! Can we split this out into its own PR and get this merged first? Then we can work on the interval delay.

Yep no problem.

@jtackaberry changed the title from "Improve throughput and enable shard lag reporting" to "Improve throughput with optional immediate rescan" on Aug 1, 2020
@jtackaberry (Contributor, Author) commented Aug 1, 2020

> Another change is that the first scan is always performed immediately, without waiting for the next scan tick. This new behavior exists regardless of whether immediate rescanning is enabled.

> Love this! Can we split this out into its own PR and get this merged first? Then we can work on the interval delay.

#123 is now created just with this logic.

I also removed the MillisBehindLatest addition to the Record from this PR so as not to muddy the waters. I'll submit a separate PR for that once #123 shakes out.

@jtackaberry (Contributor, Author):

> A variation on this idea I'd like to consider is to also use MillisBehindLatest to aid in the decision of whether an immediate rescan should be done. If we fetched > 0 records and we're > 0 milliseconds behind, then rescan immediately. This could further reduce redundant calls to Kinesis while not impeding the scenario of trying to catch up on older events as quickly as possible.

Ran this overnight and I'm happy with the result. I've included that logic in this PR.

When immediate rescan is enabled, a poll from Kinesis that returns records and
also indicates we are still behind the latest record will skip waiting for the
next scan interval and rescan immediately.

Related, the initial scan for a shard is performed immediately.  The
scan ticker only applies to subsequent scans.

These changes make higher scan intervals feasible, allowing for less
chatty clients while also improving overall effective throughput.
@jtackaberry (Contributor, Author):

Rebased and squashed.

consumer.go — resolved review thread
@jtackaberry (Contributor, Author):

Thinking on this a little more, I wonder if for certain types of streams this could actually reduce effective throughput.

For low volume streams, this should be innocuous. When there's activity on the stream, we'll quickly catch up and then simply wait for the next scan tick as with the existing behavior.

For high volume streams, each call to Kinesis will give us plenty of work to do, so it makes sense to read that data as quickly as possible. However, there is a 5 TPS rate limit per shard for GetRecords, which we should try to respect lest we get throttled. With the immediate rescan logic, the probability of getting throttled is high for a high volume stream. So this is a gap in this PR.

A more interesting unintended consequence exists for middle-of-the-road data rates. In this scenario, we are more likely to make a higher number of GetRecords calls, each returning fewer records than if we gave the stream a bit of time to queue up a chunk of records to pull at once.

Based on that, I'm thinking it makes sense to:

  • have some threshold for the number of records Kinesis needs to return before we immediately rescan. As the PR stands now, that threshold is simply > 0, but I now think we should return to Kinesis less aggressively, giving it time to queue up some records. The problem is I don't believe a static threshold will work sufficiently well; it should probably adapt based on the number of records typically returned by a GetRecords call for that shard.
  • include some logic to ensure we avoid being throttled.

Thoughts?

@harlow (Owner) commented Aug 2, 2020

@jtackaberry I'd say let's keep it simple for now. If the current PR is working well for you then I'm cool with merging it in its current form.

One thing that could be interesting in this middle case is checking whether *resp.MillisBehindLatest > c.scanInterval: if it's greater, we grab rows; else let it wait in the ticker until it gets to the desired interval.
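Note that MillisBehindLatest is an *int64 of milliseconds while the scan interval is presumably stored as a time.Duration, so the comparison needs a small conversion. A sketch (hypothetical helper, not library code):

```go
import "time"

// behindEnough reports whether the lag reported by GetRecords exceeds
// the configured scan interval, i.e. whether it is worth skipping the
// ticker and grabbing more rows immediately.
func behindEnough(millisBehindLatest *int64, scanInterval time.Duration) bool {
	if millisBehindLatest == nil {
		return false
	}
	lag := time.Duration(*millisBehindLatest) * time.Millisecond
	return lag > scanInterval
}
```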

@jtackaberry (Contributor, Author):

> One thing that could be interesting in this middle case is checking whether *resp.MillisBehindLatest > c.scanInterval: if it's greater, we grab rows; else let it wait in the ticker until it gets to the desired interval.

That sounds like a sensible tweak. So you're not overly concerned about hitting the API frequently enough to trigger throttling?

I think we may want another ticker that fires according to the AWS-specified rate limit, which we fall back to in "immediate" mode. There's another use case driving this, which I'm going to open an issue about so we can talk about it further.
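Something along these lines, perhaps. This is a sketch with hypothetical names, not library code; GetRecords allows 5 calls per second per shard, so the floor works out to roughly one call every 200ms.

```go
import (
	"context"
	"time"
)

// waitBeforeNextScan sketches the proposed fallback: in "immediate"
// mode we still pace GetRecords calls with a ticker matched to the
// per-shard limit of 5 TPS (one call per 200ms); otherwise we wait
// for the regular scan ticker.
func waitBeforeNextScan(ctx context.Context, immediate bool,
	rateLimitTicker, scanTicker *time.Ticker) error {

	tick := scanTicker.C
	if immediate {
		tick = rateLimitTicker.C
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-tick:
		return nil
	}
}
```

The caller would create rateLimitTicker once per shard, e.g. with time.NewTicker(200 * time.Millisecond), so immediate mode can never exceed the documented GetRecords rate.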
