Skip to content

Commit

Permalink
rm ConsumeClaim
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryRomanov committed Sep 16, 2024
1 parent aadf4b6 commit 5877a35
Showing 1 changed file with 0 additions and 21 deletions.
21 changes: 0 additions & 21 deletions plugin/input/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,27 +322,6 @@ func (p *Plugin) Commit(event *pipeline.Event) {
p.client.MarkCommitOffsets(offsets)
}

func (p *Plugin) ConsumeClaim(fetches kgo.Fetches) {
fetches.EachRecord(func(message *kgo.Record) {
sourceID := assembleSourceID(
p.idByTopic[message.Topic],
message.Partition,
)

offset := assembleOffset(message)
var metadataInfo metadata.MetaData
var err error
if len(p.config.Meta) > 0 {
metadataInfo, err = p.metaTemplater.Render(newMetaInformation(message))
if err != nil {
p.logger.Errorf("can't render meta data: %s", err.Error())
}
}

_ = p.controller.In(sourceID, "kafka", offset, message.Value, true, metadataInfo)
})
}

func assembleSourceID(index int, partition int32) pipeline.SourceID {
return pipeline.SourceID(index<<16 + int(partition))
}
Expand Down

0 comments on commit 5877a35

Please sign in to comment.