Skip to content

Commit

Permalink
add new message per file storage format (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
0xcaff authored Jul 23, 2024
1 parent 334dcce commit f14005b
Showing 1 changed file with 41 additions and 17 deletions.
58 changes: 41 additions & 17 deletions packages/duckdb_protobuf/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::error::Error;
use std::fs::File;
use std::io;

use byteorder::{BigEndian, ReadBytesExt};
use ouroboros::self_referencing;
use protobuf::CodedInputStream;
use std::error::Error;
use std::fs::File;
use std::io;
use strum::{AsRefStr, EnumIter, EnumString, IntoEnumIterator};

use crate::vtab::Parameters;
Expand Down Expand Up @@ -33,13 +32,31 @@ impl RecordsReader {
};

let next_file_path = next_file_path?;
let next_file = File::open(&next_file_path)?;
self.current_file = Some(LengthDelimitedRecordsReader::create(
next_file,
self.length_kind,
));

self.current_file.as_mut().unwrap()
let mut next_file = File::open(&next_file_path)?;

match self.length_kind {
LengthKind::BigEndianFixed => {
self.current_file = Some(LengthDelimitedRecordsReader::create(
next_file,
DelimitedLengthKind::BigEndianFixed,
));

self.current_file.as_mut().unwrap()
}
LengthKind::Varint => {
self.current_file = Some(LengthDelimitedRecordsReader::create(
next_file,
DelimitedLengthKind::Varint,
));

self.current_file.as_mut().unwrap()
}
LengthKind::SingleMessagePerFile => {
let mut bytes = Vec::new();
<File as io::Read>::read_to_end(&mut next_file, &mut bytes)?;
return Ok(Some(bytes));
}
}
};

let Some(next_message) = file_reader.try_get_next()? else {
Expand All @@ -55,6 +72,7 @@ impl RecordsReader {
/// How message boundaries are determined when reading records from an input
/// file.
///
/// The two length-prefixed variants are read through
/// `LengthDelimitedRecordsReader`; `SingleMessagePerFile` bypasses it and
/// consumes the file directly.
pub enum LengthKind {
/// Each message is preceded by its length, read as a big-endian `u32`.
BigEndianFixed,
/// Each message is preceded by its length, read as a raw varint32.
Varint,
/// The file carries no length prefix: its entire contents are read to the
/// end and returned as one message.
SingleMessagePerFile,
}

pub fn parse<T: std::str::FromStr<Err = impl Error> + IntoEnumIterator + AsRef<str>>(
Expand All @@ -73,9 +91,15 @@ pub fn parse<T: std::str::FromStr<Err = impl Error> + IntoEnumIterator + AsRef<s
})?)
}

/// Length-prefix encodings understood by `LengthDelimitedRecordsReader`.
///
/// This is the subset of `LengthKind` that actually uses a length prefix;
/// `LengthKind::SingleMessagePerFile` has no counterpart here because that
/// format is handled without a delimited reader.
#[derive(Copy, Clone)]
enum DelimitedLengthKind {
/// Length prefix is read with `read_u32::<BigEndian>()`.
BigEndianFixed,
/// Length prefix is read with `read_raw_varint32()`.
Varint,
}

#[self_referencing]
pub struct LengthDelimitedRecordsReader {
length_kind: LengthKind,
length_kind: DelimitedLengthKind,
inner: File,

#[borrows(mut inner)]
Expand All @@ -84,7 +108,7 @@ pub struct LengthDelimitedRecordsReader {
}

impl LengthDelimitedRecordsReader {
pub fn create(inner: File, length_kind: LengthKind) -> Self {
fn create(inner: File, length_kind: DelimitedLengthKind) -> Self {
LengthDelimitedRecordsReaderBuilder {
length_kind,
inner,
Expand All @@ -93,12 +117,12 @@ impl LengthDelimitedRecordsReader {
.build()
}

pub fn get_next(&mut self) -> Result<Vec<u8>, io::Error> {
fn get_next(&mut self) -> Result<Vec<u8>, io::Error> {
let length_kind = *self.borrow_length_kind();
Ok(self.with_reader_mut(move |reader| {
let len = match length_kind {
LengthKind::BigEndianFixed => reader.read_u32::<BigEndian>()?,
LengthKind::Varint => reader.read_raw_varint32()?,
DelimitedLengthKind::BigEndianFixed => reader.read_u32::<BigEndian>()?,
DelimitedLengthKind::Varint => reader.read_raw_varint32()?,
};

let mut buf = vec![0; len as usize];
Expand All @@ -108,7 +132,7 @@ impl LengthDelimitedRecordsReader {
})?)
}

pub fn try_get_next(&mut self) -> Result<Option<Vec<u8>>, io::Error> {
fn try_get_next(&mut self) -> Result<Option<Vec<u8>>, io::Error> {
match self.get_next() {
Ok(it) => Ok(Some(it)),
Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => Ok(None),
Expand Down

0 comments on commit f14005b

Please sign in to comment.