Skip to content

Commit

Permalink
perf: More efficient row encoding for pl.List (#19907)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Nov 23, 2024
1 parent adf703b commit 05f2abb
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 245 deletions.
211 changes: 62 additions & 149 deletions crates/polars-row/src/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,25 @@ fn dtype_and_data_to_encoded_item_len(

use ArrowDataType as D;
match dtype {
D::Binary
| D::LargeBinary
| D::Utf8
| D::LargeUtf8
| D::List(_)
| D::LargeList(_)
| D::BinaryView
| D::Utf8View => unsafe {
D::Binary | D::LargeBinary | D::Utf8 | D::LargeUtf8 | D::BinaryView | D::Utf8View => unsafe {
crate::variable::encoded_item_len(data, non_empty_sentinel, continuation_token)
},

D::List(list_field) | D::LargeList(list_field) => {
let mut data = data;
let mut item_len = 0;

let list_continuation_token = field.list_continuation_token();

while data[0] == list_continuation_token {
data = &data[1..];
let len = dtype_and_data_to_encoded_item_len(list_field.dtype(), data, field);
data = &data[len..];
item_len += 1 + len;
}
1 + item_len
},

D::FixedSizeBinary(_) => todo!(),
D::FixedSizeList(fsl_field, width) => {
let mut data = &data[1..];
Expand Down Expand Up @@ -162,130 +170,20 @@ fn rows_for_fixed_size_list<'a>(
return;
}

use ArrowDataType as D;
match dtype {
D::FixedSizeBinary(_) => todo!(),
D::BinaryView
| D::Utf8View
| D::Binary
| D::LargeBinary
| D::Utf8
| D::LargeUtf8
| D::List(_)
| D::LargeList(_) => {
let (non_empty_sentinel, continuation_token) = if field.descending {
(
!variable::NON_EMPTY_SENTINEL,
!variable::BLOCK_CONTINUATION_TOKEN,
)
} else {
(
variable::NON_EMPTY_SENTINEL,
variable::BLOCK_CONTINUATION_TOKEN,
)
};

for row in rows.iter_mut() {
for _ in 0..width {
let length = unsafe {
crate::variable::encoded_item_len(
row,
non_empty_sentinel,
continuation_token,
)
};
let v;
(v, *row) = row.split_at(length);
nested_rows.push(v);
}
}
},
_ => {
// @TODO: This is quite slow since we need to dispatch for possibly every nested type
for row in rows.iter_mut() {
for _ in 0..width {
let length = dtype_and_data_to_encoded_item_len(dtype, row, field);
let v;
(v, *row) = row.split_at(length);
nested_rows.push(v);
}
}
},
}
}

fn offsets_from_dtype_and_data(
dtype: &ArrowDataType,
field: &EncodingField,
data: &[u8],
offsets: &mut Vec<usize>,
) {
offsets.clear();

// Fast path: if the size is fixed, we can just divide.
if let Some(size) = fixed_size(dtype) {
assert!(size == 0 || data.len() % size == 0);
offsets.extend((0..data.len() / size).map(|i| i * size));
return;
}

use ArrowDataType as D;
match dtype {
D::FixedSizeBinary(_) => todo!(),
D::BinaryView
| D::Utf8View
| D::Binary
| D::LargeBinary
| D::Utf8
| D::LargeUtf8
| D::List(_)
| D::LargeList(_) => {
let mut data = data;
let (non_empty_sentinel, continuation_token) = if field.descending {
(
!variable::NON_EMPTY_SENTINEL,
!variable::BLOCK_CONTINUATION_TOKEN,
)
} else {
(
variable::NON_EMPTY_SENTINEL,
variable::BLOCK_CONTINUATION_TOKEN,
)
};
let mut offset = 0;
while !data.is_empty() {
let length = unsafe {
crate::variable::encoded_item_len(data, non_empty_sentinel, continuation_token)
};
offsets.push(offset);
data = &data[length..];
offset += length;
}
},
_ => {
// @TODO: This is quite slow since we need to dispatch for possibly every nested type
let mut data = data;
let mut offset = 0;
while !data.is_empty() {
let length = dtype_and_data_to_encoded_item_len(dtype, data, field);
offsets.push(offset);
data = &data[length..];
offset += length;
}
},
// @TODO: This is quite slow since we need to dispatch for possibly every nested type
for row in rows.iter_mut() {
for _ in 0..width {
let length = dtype_and_data_to_encoded_item_len(dtype, row, field);
let v;
(v, *row) = row.split_at(length);
nested_rows.push(v);
}
}
}

unsafe fn decode(rows: &mut [&[u8]], field: &EncodingField, dtype: &ArrowDataType) -> ArrayRef {
match dtype {
ArrowDataType::Null => {
// Temporary: remove when list encoding is better.
for row in rows.iter_mut() {
*row = &row[1..];
}

NullArray::new(ArrowDataType::Null, rows.len()).to_boxed()
},
ArrowDataType::Null => NullArray::new(ArrowDataType::Null, rows.len()).to_boxed(),
ArrowDataType::Boolean => decode_bool(rows, field).to_boxed(),
ArrowDataType::BinaryView | ArrowDataType::LargeBinary => {
decode_binview(rows, field).to_boxed()
Expand Down Expand Up @@ -323,36 +221,51 @@ unsafe fn decode(rows: &mut [&[u8]], field: &EncodingField, dtype: &ArrowDataTyp
FixedSizeListArray::new(dtype.clone(), rows.len(), values, validity).to_boxed()
},
ArrowDataType::List(list_field) | ArrowDataType::LargeList(list_field) => {
let arr = decode_binary(rows, field);
let mut validity = MutableBitmap::new();

let mut offsets = Vec::with_capacity(rows.len());
// @TODO: we could consider making this into a scratchpad
let mut nested_offsets = Vec::new();
offsets_from_dtype_and_data(
list_field.dtype(),
field,
arr.values().as_ref(),
&mut nested_offsets,
);
// @TODO: This might cause realloc, fix.
nested_offsets.push(arr.values().len());
let mut nested_rows = nested_offsets
.windows(2)
.map(|vs| &arr.values()[vs[0]..vs[1]])
.collect::<Vec<_>>();

let mut i = 0;
for offset in arr.offsets().iter() {
while nested_offsets[i] != offset.as_usize() {
i += 1;
let num_rows = rows.len();
let mut nested_rows = Vec::new();
let mut offsets = Vec::with_capacity(rows.len() + 1);
offsets.push(0);

let list_null_sentinel = field.list_null_sentinel();
let list_continuation_token = field.list_continuation_token();
let list_termination_token = field.list_termination_token();

// @TODO: make a specialized loop for fixed size list_field.dtype()
for (i, row) in rows.iter_mut().enumerate() {
while row[0] == list_continuation_token {
*row = &row[1..];
let len = dtype_and_data_to_encoded_item_len(list_field.dtype(), row, field);
nested_rows.push(&row[..len]);
*row = &row[len..];
}

offsets.push(i as i64);
offsets.push(nested_rows.len() as i64);

// @TODO: Might be better to make this a 2-loop system.
if row[0] == list_null_sentinel {
*row = &row[1..];
validity.reserve(num_rows);
validity.extend_constant(i - validity.len(), true);
validity.push(false);
continue;
}

assert_eq!(row[0], list_termination_token);
*row = &row[1..];
}

let validity = if validity.is_empty() {
None
} else {
validity.extend_constant(num_rows - validity.len(), true);
Some(validity.freeze())
};
assert_eq!(offsets.len(), rows.len() + 1);

let values = decode(&mut nested_rows, field, list_field.dtype());
let (_, _, _, validity) = arr.into_inner();

ListArray::<i64>::new(
dtype.clone(),
Expand Down
Loading

0 comments on commit 05f2abb

Please sign in to comment.