diff --git a/src/coordinator/consumer.rs b/src/coordinator/consumer.rs index e3a1d5d..b4dbf8c 100644 --- a/src/coordinator/consumer.rs +++ b/src/coordinator/consumer.rs @@ -53,7 +53,7 @@ use crate::{ const CONSUMER_PROTOCOL_TYPE: &str = "consumer"; -macro_rules! offset_fetch_block { +macro_rules! fetch_offsets_block { ($self:ident, $source:ident) => { for topic in $source.topics { for partition in topic.partitions { @@ -63,13 +63,22 @@ macro_rules! offset_fetch_block { }; if partition.error_code.is_ok() { if let Some(partition_state) = $self.subscriptions.assignments.get_mut(&tp) { - partition_state.position.offset = partition.committed_offset; - partition_state.position.offset_epoch = - Some(partition.committed_leader_epoch); - info!( - "Fetch {tp} offset success, offset: {}", - partition.committed_offset - ); + // record the position with the offset (-1 indicates no committed offset to + // fetch) + if partition.committed_offset >= 0 { + partition_state.position.offset = partition.committed_offset; + partition_state.position.offset_epoch = + Some(partition.committed_leader_epoch); + info!( + "Fetch {tp} offset success, offset: {}", + partition.committed_offset + ); + } else { + debug!( + "Found no committed offset for partition {}", + partition.partition_index + ); + } } } else { error!( @@ -86,7 +95,7 @@ macro_rules! offset_fetch_block { if let Some(tp_state) = $self.subscriptions.assignments.get_mut(&tp) { tp_state.position.offset = offset; tp_state.position.offset_epoch = None; - tp_state.position.current_leader = $self.client.cluster_meta.current_leader(&tp); + tp_state.position.current_leader = $self.client.cluster.current_leader(&tp); info!("Seek {tp} with offset: {offset}",); } } @@ -370,7 +379,7 @@ impl CoordinatorInner { self.join_group().await?; self.sync_group().await?; - self.offset_fetch().await?; + self.fetch_offsets().await?; // resume fetch thread. self.fetcher.resume(); @@ -590,7 +599,7 @@ impl CoordinatorInner { } } - pub async fn offset_fetch(&mut self) -> Result<()> { + pub async fn fetch_offsets(&mut self) -> Result<()> { match self.client.version_range(ApiKey::OffsetFetchKey) { Some(version_range) => { let mut offset_fetch_response = self @@ -603,9 +612,9 @@ impl CoordinatorInner { match offset_fetch_response.error_code.err() { None => { if let Some(group) = offset_fetch_response.groups.pop() { - offset_fetch_block!(self, group); + fetch_offsets_block!(self, group); } else { - offset_fetch_block!(self, offset_fetch_response); + fetch_offsets_block!(self, offset_fetch_response); } Ok(()) } @@ -992,7 +1001,7 @@ async fn coordinator_loop( CoordinatorEvent::JoinGroup => coordinator.join_group().await, CoordinatorEvent::SyncGroup => coordinator.sync_group().await, CoordinatorEvent::LeaveGroup(reason) => coordinator.maybe_leave_group(reason).await, - CoordinatorEvent::OffsetFetch => coordinator.offset_fetch().await, + CoordinatorEvent::OffsetFetch => coordinator.fetch_offsets().await, CoordinatorEvent::PauseFetch => { coordinator.fetcher.pause(); Ok(()) diff --git a/src/protocol.rs b/src/protocol.rs index 9971d08..1b93557 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -160,6 +160,7 @@ impl KafkaCodec { } Ok(()) } + fn encode_request( &mut self, mut header: RequestHeader, @@ -192,18 +193,8 @@ impl KafkaCodec { Ok(()) } - fn response_header_version(&self, api_key: i16, api_version: i16) -> i16 { - if let Some(version_range) = self.support_versions.get(&api_key) { - if api_version >= version_range.max { - return 1; - } - } - 0 - } - fn decode_response(&mut self, src: &mut BytesMut) -> Result, ConnectionError> { - let mut correlation_id_bytes = src.try_peek_bytes(0..4)?; - let correlation_id = correlation_id_bytes.get_i32(); + let correlation_id = src.peek_bytes(0..4).get_i32(); let request_header = self .active_requests