Skip to content

Commit

Permalink
chore: make fmt happy
Browse files Browse the repository at this point in the history
  • Loading branch information
iamazy committed Mar 22, 2024
1 parent 44df033 commit dcdb185
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 25 deletions.
37 changes: 23 additions & 14 deletions src/coordinator/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use crate::{

const CONSUMER_PROTOCOL_TYPE: &str = "consumer";

macro_rules! offset_fetch_block {
macro_rules! fetch_offsets_block {
($self:ident, $source:ident) => {
for topic in $source.topics {
for partition in topic.partitions {
Expand All @@ -63,13 +63,22 @@ macro_rules! offset_fetch_block {
};
if partition.error_code.is_ok() {
if let Some(partition_state) = $self.subscriptions.assignments.get_mut(&tp) {
partition_state.position.offset = partition.committed_offset;
partition_state.position.offset_epoch =
Some(partition.committed_leader_epoch);
info!(
"Fetch {tp} offset success, offset: {}",
partition.committed_offset
);
// record the position with the offset (-1 indicates no committed offset to
// fetch)
if partition.committed_offset >= 0 {
partition_state.position.offset = partition.committed_offset;
partition_state.position.offset_epoch =
Some(partition.committed_leader_epoch);
info!(
"Fetch {tp} offset success, offset: {}",
partition.committed_offset
);
} else {
debug!(
"Found no committed offset for partition {}",
partition.partition_index
);
}
}
} else {
error!(
Expand All @@ -86,7 +95,7 @@ macro_rules! offset_fetch_block {
if let Some(tp_state) = $self.subscriptions.assignments.get_mut(&tp) {
tp_state.position.offset = offset;
tp_state.position.offset_epoch = None;
tp_state.position.current_leader = $self.client.cluster_meta.current_leader(&tp);
tp_state.position.current_leader = $self.client.cluster.current_leader(&tp);
info!("Seek {tp} with offset: {offset}",);
}
}
Expand Down Expand Up @@ -370,7 +379,7 @@ impl<Exe: Executor> CoordinatorInner<Exe> {

self.join_group().await?;
self.sync_group().await?;
self.offset_fetch().await?;
self.fetch_offsets().await?;

// resume fetch thread.
self.fetcher.resume();
Expand Down Expand Up @@ -590,7 +599,7 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
}
}

pub async fn offset_fetch(&mut self) -> Result<()> {
pub async fn fetch_offsets(&mut self) -> Result<()> {
match self.client.version_range(ApiKey::OffsetFetchKey) {
Some(version_range) => {
let mut offset_fetch_response = self
Expand All @@ -603,9 +612,9 @@ impl<Exe: Executor> CoordinatorInner<Exe> {
match offset_fetch_response.error_code.err() {
None => {
if let Some(group) = offset_fetch_response.groups.pop() {
offset_fetch_block!(self, group);
fetch_offsets_block!(self, group);
} else {
offset_fetch_block!(self, offset_fetch_response);
fetch_offsets_block!(self, offset_fetch_response);
}
Ok(())
}
Expand Down Expand Up @@ -992,7 +1001,7 @@ async fn coordinator_loop<Exe: Executor>(
CoordinatorEvent::JoinGroup => coordinator.join_group().await,
CoordinatorEvent::SyncGroup => coordinator.sync_group().await,
CoordinatorEvent::LeaveGroup(reason) => coordinator.maybe_leave_group(reason).await,
CoordinatorEvent::OffsetFetch => coordinator.offset_fetch().await,
CoordinatorEvent::OffsetFetch => coordinator.fetch_offsets().await,
CoordinatorEvent::PauseFetch => {
coordinator.fetcher.pause();
Ok(())
Expand Down
13 changes: 2 additions & 11 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ impl KafkaCodec {
}
Ok(())
}

fn encode_request<Req: Request>(
&mut self,
mut header: RequestHeader,
Expand Down Expand Up @@ -192,18 +193,8 @@ impl KafkaCodec {
Ok(())
}

fn response_header_version(&self, api_key: i16, api_version: i16) -> i16 {
if let Some(version_range) = self.support_versions.get(&api_key) {
if api_version >= version_range.max {
return 1;
}
}
0
}

fn decode_response(&mut self, src: &mut BytesMut) -> Result<Option<Command>, ConnectionError> {
let mut correlation_id_bytes = src.try_peek_bytes(0..4)?;
let correlation_id = correlation_id_bytes.get_i32();
let correlation_id = src.peek_bytes(0..4).get_i32();

let request_header = self
.active_requests
Expand Down

0 comments on commit dcdb185

Please sign in to comment.