Skip to content

Commit

Permalink
Add option for immediate rescan
Browse files Browse the repository at this point in the history
When immediate rescan is enabled, a poll from Kinesis that returns records will
skip waiting for the next scan interval and rescan immediately.

Related, the initial scan for a shard is performed immediately.  The
scan ticker only applies to subsequent scans.

These changes make higher scan intervals feasible, allowing for less
chatty clients while also improving overall effective throughput.
  • Loading branch information
jtackaberry committed Jul 31, 2020
1 parent 97ffabe commit 0a399d6
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 24 deletions.
55 changes: 31 additions & 24 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ func New(streamName string, opts ...Option) (*Consumer, error) {
logger: &noopLogger{
logger: log.New(ioutil.Discard, "", log.LstdFlags),
},
scanInterval: 250 * time.Millisecond,
maxRecords: 10000,
scanInterval: 250 * time.Millisecond,
immediateRescan: false,
maxRecords: 10000,
}

// override defaults
Expand Down Expand Up @@ -76,6 +77,7 @@ type Consumer struct {
logger Logger
store Store
scanInterval time.Duration
immediateRescan bool
maxRecords int64
}

Expand Down Expand Up @@ -158,33 +160,26 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
defer scanTicker.Stop()

for {
select {
case <-ctx.Done():
return nil
case <-scanTicker.C:
resp, err := c.client.GetRecords(&kinesis.GetRecordsInput{
Limit: aws.Int64(c.maxRecords),
ShardIterator: shardIterator,
})

// attempt to recover from GetRecords error when expired iterator
if err != nil {
c.logger.Log("[CONSUMER] get records error:", err.Error())
resp, err := c.client.GetRecords(&kinesis.GetRecordsInput{
Limit: aws.Int64(c.maxRecords),
ShardIterator: shardIterator,
})

if awserr, ok := err.(awserr.Error); ok {
if _, ok := retriableErrors[awserr.Code()]; !ok {
return fmt.Errorf("get records error: %v", awserr.Message())
}
}
// attempt to recover from GetRecords error when expired iterator
if err != nil {
c.logger.Log("[CONSUMER] get records error:", err.Error())

shardIterator, err = c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
if err != nil {
return fmt.Errorf("get shard iterator error: %v", err)
if awserr, ok := err.(awserr.Error); ok {
if _, ok := retriableErrors[awserr.Code()]; !ok {
return fmt.Errorf("get records error: %v", awserr.Message())
}

continue
}

shardIterator, err = c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
if err != nil {
return fmt.Errorf("get shard iterator error: %v", err)
}
} else {
// loop over records, call callback func
for _, r := range resp.Records {
select {
Expand Down Expand Up @@ -213,6 +208,18 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
}

shardIterator = resp.NextShardIterator
if c.immediateRescan && len(resp.Records) > 0 {
// We have records with immediate rescan enabled, so skip waiting for next tick
continue
}
}

// Wait for next scan
select {
case <-ctx.Done():
return nil
case <-scanTicker.C:
continue
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ func WithScanInterval(d time.Duration) Option {
}
}

// WithImmediateRescan overrides whether we wait for the next
// scan interval if records were fetched during a poll
func WithImmediateRescan(r bool) Option {
return func(c *Consumer) {
c.immediateRescan = r
}
}

// WithMaxRecords overrides the maximum number of records to be
// returned in a single GetRecords call for the consumer (specify a
// value of up to 10,000)
Expand Down

0 comments on commit 0a399d6

Please sign in to comment.