diff --git a/src/component/cyfs-bdt-ext/src/cache/reader/reader.rs b/src/component/cyfs-bdt-ext/src/cache/reader/reader.rs index 32256960..b4e82e7f 100644 --- a/src/component/cyfs-bdt-ext/src/cache/reader/reader.rs +++ b/src/component/cyfs-bdt-ext/src/cache/reader/reader.rs @@ -63,11 +63,13 @@ impl ChunkStoreReader { BuckyError::new(BuckyErrorCode::IoError, msg) })?; - let actual_offset = file.seek(SeekFrom::Start(offset)).await.map_err(|e| { + // First verify the length + let chunk_len = chunk.len() as u64; + + let file_meta = file.metadata().await.map_err(|e| { let msg = format!( - "seek file to offset failed! chunk={}, offset={}, path={}, {}", + "get file metadata but failed! chunk={}, path={}, {}", chunk, - offset, path.display(), e ); @@ -76,17 +78,47 @@ impl ChunkStoreReader { BuckyError::new(BuckyErrorCode::IoError, msg) })?; - if actual_offset != offset { + if file_meta.len() < offset + chunk_len { let msg = format!( - "seek file to offset but unmatch! chunk={}, path={}, except offset={}, got={}", + "read chunk from file with offset but len unmatch! chunk={}, path={}, offset={}, chunk_len={}, file_len={}", chunk, path.display(), offset, - actual_offset + chunk_len, + file_meta.len(), ); error!("{}", msg); - return Err(BuckyError::new(BuckyErrorCode::IoError, msg)); + return Err(BuckyError::new(BuckyErrorCode::Unmatch, msg)); + } + + // Try to seek to chunk pos in file + if offset > 0 { + let actual_offset = file.seek(SeekFrom::Start(offset)).await.map_err(|e| { + let msg = format!( + "seek file to offset failed! chunk={}, offset={}, path={}, {}", + chunk, + offset, + path.display(), + e + ); + error!("{}", msg); + + BuckyError::new(BuckyErrorCode::IoError, msg) + })?; + + if actual_offset != offset { + let msg = format!( + "seek file to offset but unmatch! chunk={}, path={}, except offset={}, got={}", + chunk, + path.display(), + offset, + actual_offset + ); + error!("{}", msg); + + return Err(BuckyError::new(BuckyErrorCode::IoError, msg)); + } } // async_std::Take not support seek, so use ReaderWithLimit instead @@ -136,7 +168,8 @@ impl ChunkStoreReader { chunk, fr.path, fr.range_begin, fr.range_end ); let fixer = ChunkTrackerPosFixer::new(self.tracker.clone(), c.pos.clone()); - Self::read_chunk(chunk, Path::new(fr.path.as_str()), fr.range_begin, fixer).await + Self::read_chunk(chunk, Path::new(fr.path.as_str()), fr.range_begin, fixer) + .await } TrackerPostion::ChunkManager => { info!("will read chunk from chunk manager: chunk={}", chunk); @@ -319,15 +352,14 @@ impl ChunkHashErrorHandler for ChunkTrackerPosFixer { } } - #[cfg(test)] mod tests { use super::*; use async_std::io::prelude::*; use cyfs_base::*; use std::io::SeekFrom; - use std::str::FromStr; use std::path::PathBuf; + use std::str::FromStr; async fn test_file() { // let file = "C:\\cyfs\\data\\app\\cyfs-stack-test\\root\\test-chunk-in-bundle"; @@ -335,14 +367,15 @@ mod tests { let file = PathBuf::from("C:\\cyfs\\data\\test\\2KGw87zzn4.txt"); let chunk_id = ChunkId::from_str("7C8WW21osqTTTMyRLhUN8jDbYiRdBDNEMHMiHPdDEdBB").unwrap(); - + let _reader = ChunkStoreReader::read_chunk(&chunk_id, &file, 8388608, None).await; //let buf = std::fs::read(file).unwrap(); //let real_id = ChunkId::calculate_sync(&buf).unwrap(); //assert_eq!(real_id, chunk_id); let reader = async_std::fs::File::open(file).await.unwrap(); - let mut reader = ChunkReaderWithHash::new("test1".to_owned(), chunk_id, Box::new(reader), None); + let mut reader = + ChunkReaderWithHash::new("test1".to_owned(), chunk_id, Box::new(reader), None); let mut buf2 = vec![]; reader.read_to_end(&mut buf2).await.unwrap_err(); @@ -351,8 +384,7 @@ mod tests { #[test] fn test() { async_std::task::block_on(async move { - test1().await; - // test_file().await; + test_file().await; }); } } diff --git a/src/component/cyfs-util/src/util/read_helper.rs b/src/component/cyfs-util/src/util/read_helper.rs index e738e9f9..ff0b0e89 100644 --- a/src/component/cyfs-util/src/util/read_helper.rs +++ b/src/component/cyfs-util/src/util/read_helper.rs @@ -103,6 +103,8 @@ pub struct ChunkReaderWithHash { reader: Box, hash: sha2::Sha256, error_handler: Option>, + seeked: bool, + hashed_len: usize, } impl ChunkReaderWithHash { @@ -118,6 +120,8 @@ impl ChunkReaderWithHash { reader, hash: sha2::Sha256::new(), error_handler, + seeked: false, + hashed_len: 0, } } } @@ -133,31 +137,45 @@ impl async_std::io::Read for ChunkReaderWithHash { Poll::Ready(ret) => match ret { Ok(size) => { if size > 0 { + self.hashed_len += size; self.hash.input(&buf[0..size]); Poll::Ready(Ok(size)) } else { - let hash_value = self.hash.clone().result().into(); - let actual_id = ChunkId::new(&hash_value, self.chunk_id.len() as u32); - - if actual_id.eq(&self.chunk_id) { - debug!( - "read chunk from file complete! chunk={}, file={}", - self.chunk_id, self.path + if self.seeked { + warn!( + "read chunk with hash but seeked already! chunk={}", + self.chunk_id ); Poll::Ready(Ok(0)) + } else if self.hashed_len != self.chunk_id.len() { + error!("read chunk with hash but ended with unmatched length! chunk={}, len={}, read len={}", + self.chunk_id, self.chunk_id.len(), self.hashed_len,); + // FIXME what should we do? + Poll::Ready(Ok(0)) } else { - let msg = format!( - "content in file not match chunk id: chunk={}, file={}, expect hash={}, got={}", - self.chunk_id, self.path, self.chunk_id, actual_id - ); - error!("{}", msg); - - if let Some(error_handler) = self.error_handler.take() { - error_handler.on_hash_error(&self.chunk_id, &self.path); + let hash_value = self.hash.clone().result().into(); + let actual_id = ChunkId::new(&hash_value, self.chunk_id.len() as u32); + + if actual_id.eq(&self.chunk_id) { + debug!( + "read chunk from file complete! chunk={}, file={}", + self.chunk_id, self.path + ); + Poll::Ready(Ok(0)) + } else { + let msg = format!( + "content in file not match chunk id: chunk={}, file={}, expect hash={}, got={}", + self.chunk_id, self.path, self.chunk_id, actual_id + ); + error!("{}", msg); + + if let Some(error_handler) = self.error_handler.take() { + error_handler.on_hash_error(&self.chunk_id, &self.path); + } + + let err = BuckyError::new(BuckyErrorCode::InvalidData, msg); + Poll::Ready(Err(err.into())) } - - let err = BuckyError::new(BuckyErrorCode::InvalidData, msg); - Poll::Ready(Err(err.into())) } } } @@ -174,6 +192,7 @@ impl async_std::io::Seek for ChunkReaderWithHash { cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll> { + self.seeked = true; Pin::new(self.reader.as_mut()).poll_seek(cx, pos) } } @@ -194,13 +213,14 @@ mod tests { let file = "C:\\cyfs\\data\\test\\2JtHrtiW4J"; let chunk_id = ChunkId::from_str("7C8WXUGiYVyag6WXdsFz6B8JgpedMMgkng3MRM4XoPrX").unwrap(); - + //let buf = std::fs::read(file).unwrap(); //let real_id = ChunkId::calculate_sync(&buf).unwrap(); //assert_eq!(real_id, chunk_id); let reader = async_std::fs::File::open(file).await.unwrap(); - let mut reader = ChunkReaderWithHash::new("test1".to_owned(), chunk_id, Box::new(reader), None); + let mut reader = + ChunkReaderWithHash::new("test1".to_owned(), chunk_id, Box::new(reader), None); let mut buf2 = vec![]; reader.read_to_end(&mut buf2).await.unwrap_err();