Skip to content

Commit

Permalink
fix recluster endless loop
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Nov 13, 2024
1 parent 3023243 commit c5dda15
Showing 1 changed file with 9 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,42 +94,24 @@ impl AccumulatingTransform for BlockCompactNoSplitBuilder {
res.push(Self::create_output_data(&mut self.staged_blocks));
}

if !self.check_for_compact() && !self.pending_blocks.is_empty() {
// blocks > 2N
res.push(Self::create_output_data(&mut self.pending_blocks));
} else {
if self.check_for_compact() || self.pending_blocks.is_empty() {
// N <= blocks < 2N
std::mem::swap(&mut self.staged_blocks, &mut self.pending_blocks);
} else {
// blocks > 2N
res.push(Self::create_output_data(&mut self.pending_blocks));
}
self.staged_blocks.push(data);
self.reset_accumulated();
Ok(res)
}

fn on_finish(&mut self, _output: bool) -> Result<Vec<DataBlock>> {
match (
self.pending_blocks.is_empty(),
self.staged_blocks.is_empty(),
) {
(true, true) => Ok(vec![]),
(true, false) => Ok(vec![Self::create_output_data(&mut self.staged_blocks)]),
(false, true) => Ok(vec![Self::create_output_data(&mut self.pending_blocks)]),
(false, false) => {
for block in &self.staged_blocks {
self.accumulated_rows += block.num_rows();
self.accumulated_bytes += block.memory_size();
}
if self.check_for_compact() {
self.staged_blocks.append(&mut self.pending_blocks);
Ok(vec![Self::create_output_data(&mut self.staged_blocks)])
} else {
// blocks > 2N
Ok(vec![
Self::create_output_data(&mut self.staged_blocks),
Self::create_output_data(&mut self.pending_blocks),
])
}
}
self.staged_blocks.append(&mut self.pending_blocks);
if self.staged_blocks.is_empty() {
Ok(vec![])
} else {
Ok(vec![Self::create_output_data(&mut self.staged_blocks)])
}
}
}

0 comments on commit c5dda15

Please sign in to comment.