diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 2a5ce83d..88861ed3 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -66,6 +66,12 @@ jobs: - uses: Swatinem/rust-cache@v1 with: sharedKey: ${{ matrix.os }}-stable + - name: Format + run: | + make format + git diff --exit-code + env: + WITH_STABLE_TOOLCHAIN: 'force' - name: Clippy run: make clippy env: diff --git a/ctl/src/lib.rs b/ctl/src/lib.rs index 3f48c55a..50fba27f 100644 --- a/ctl/src/lib.rs +++ b/ctl/src/lib.rs @@ -13,13 +13,14 @@ //! # Raft Engine Control -use std::path::Path; -use std::sync::Arc; +use std::{path::Path, sync::Arc}; use clap::{crate_authors, crate_version, Parser}; -use raft_engine::env::{DefaultFileSystem, FileSystem}; -use raft_engine::internals::LogQueue; -use raft_engine::{Engine, Error, Result as EngineResult}; +use raft_engine::{ + env::{DefaultFileSystem, FileSystem}, + internals::LogQueue, + Engine, Error, Result as EngineResult, +}; #[derive(Debug, clap::Parser)] #[clap( diff --git a/examples/fork.rs b/examples/fork.rs index 6fb6b216..ce0c9475 100644 --- a/examples/fork.rs +++ b/examples/fork.rs @@ -1,9 +1,6 @@ -use std::path::Path; -use std::sync::Arc; +use std::{path::Path, sync::Arc}; -use raft_engine::env::DefaultFileSystem; -use raft_engine::Config; -use raft_engine::Engine; +use raft_engine::{env::DefaultFileSystem, Config, Engine}; fn main() { let mut args = std::env::args(); diff --git a/rustfmt.toml b/rustfmt.toml index b2715b26..3de3c63c 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1 +1,16 @@ +version = "Two" +unstable_features = true + +comment_width = 80 wrap_comments = true +format_code_in_doc_comments = true +format_macro_bodies = true +format_macro_matchers = true +normalize_comments = true +normalize_doc_attributes = true +condense_wildcard_suffixes = true +newline_style = "Unix" +use_field_init_shorthand = true +use_try_shorthand = true +imports_granularity = "Crate" +group_imports = "StdExternalCrate" diff --git a/src/codec.rs b/src/codec.rs index b8b34b18..883e75bf 100644 --- a/src/codec.rs +++ b/src/codec.rs @@ -2,8 +2,10 @@ #![allow(dead_code)] -use std::io::{self, ErrorKind, Write}; -use std::mem; +use std::{ + io::{self, ErrorKind, Write}, + mem, +}; use byteorder::{BigEndian, ByteOrder, LittleEndian, WriteBytesExt}; use thiserror::Error; @@ -357,10 +359,11 @@ pub fn read_u8(data: &mut BytesSlice<'_>) -> Result { #[cfg(test)] mod tests { - use super::*; + use std::{f32, f64, i16, i32, i64, io::ErrorKind, u16, u32, u64}; + use protobuf::CodedOutputStream; - use std::io::ErrorKind; - use std::{f32, f64, i16, i32, i64, u16, u32, u64}; + + use super::*; const U16_TESTS: &[u16] = &[ i16::MIN as u16, diff --git a/src/config.rs b/src/config.rs index 4283737f..f1a9532b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,8 +3,7 @@ use log::{info, warn}; use serde::{Deserialize, Serialize}; -use crate::pipe_log::Version; -use crate::{util::ReadableSize, Result}; +use crate::{pipe_log::Version, util::ReadableSize, Result}; const MIN_RECOVERY_READ_BLOCK_SIZE: usize = 512; const MIN_RECOVERY_THREADS: usize = 1; @@ -343,9 +342,11 @@ mod tests { let mut load: Config = toml::from_str(old).unwrap(); load.sanitize().unwrap(); // Downgrade to older version. - assert!(toml::to_string(&load) - .unwrap() - .contains("tolerate-corrupted-tail-records")); + assert!( + toml::to_string(&load) + .unwrap() + .contains("tolerate-corrupted-tail-records") + ); } #[test] diff --git a/src/consistency.rs b/src/consistency.rs index c400a664..da03243e 100644 --- a/src/consistency.rs +++ b/src/consistency.rs @@ -2,10 +2,12 @@ use hashbrown::HashMap; -use crate::file_pipe_log::ReplayMachine; -use crate::log_batch::{LogItemBatch, LogItemContent}; -use crate::pipe_log::{FileId, LogQueue}; -use crate::Result; +use crate::{ + file_pipe_log::ReplayMachine, + log_batch::{LogItemBatch, LogItemContent}, + pipe_log::{FileId, LogQueue}, + Result, +}; /// A `ConsistencyChecker` scans for log entry holes in a log queue. It will /// return a list of corrupted raft groups along with their last valid log diff --git a/src/engine.rs b/src/engine.rs index 0d055296..dd20c771 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1,28 +1,32 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use std::cell::{Cell, RefCell}; -use std::marker::PhantomData; -use std::path::Path; -use std::sync::{mpsc, Arc, Mutex}; -use std::thread::{Builder as ThreadBuilder, JoinHandle}; -use std::time::{Duration, Instant}; +use std::{ + cell::{Cell, RefCell}, + marker::PhantomData, + path::Path, + sync::{mpsc, Arc, Mutex}, + thread::{Builder as ThreadBuilder, JoinHandle}, + time::{Duration, Instant}, +}; use log::{error, info}; use protobuf::{parse_from_bytes, Message}; -use crate::config::{Config, RecoveryMode}; -use crate::consistency::ConsistencyChecker; -use crate::env::{DefaultFileSystem, FileSystem}; -use crate::event_listener::EventListener; -use crate::file_pipe_log::debug::LogItemReader; -use crate::file_pipe_log::{DefaultMachineFactory, FilePipeLog, FilePipeLogBuilder}; -use crate::log_batch::{Command, LogBatch, MessageExt}; -use crate::memtable::{EntryIndex, MemTableRecoverContextFactory, MemTables}; -use crate::metrics::*; -use crate::pipe_log::{FileBlockHandle, FileId, LogQueue, PipeLog}; -use crate::purge::{PurgeHook, PurgeManager}; -use crate::write_barrier::{WriteBarrier, Writer}; -use crate::{perf_context, Error, GlobalStats, Result}; +use crate::{ + config::{Config, RecoveryMode}, + consistency::ConsistencyChecker, + env::{DefaultFileSystem, FileSystem}, + event_listener::EventListener, + file_pipe_log::{debug::LogItemReader, DefaultMachineFactory, FilePipeLog, FilePipeLogBuilder}, + log_batch::{Command, LogBatch, MessageExt}, + memtable::{EntryIndex, MemTableRecoverContextFactory, MemTables}, + metrics::*, + perf_context, + pipe_log::{FileBlockHandle, FileId, LogQueue, PipeLog}, + purge::{PurgeHook, PurgeManager}, + write_barrier::{WriteBarrier, Writer}, + Error, GlobalStats, Result, +}; const METRICS_FLUSH_INTERVAL: Duration = Duration::from_secs(30); /// Max times for `write`. @@ -106,11 +110,13 @@ where let memtables_clone = memtables.clone(); let metrics_flusher = ThreadBuilder::new() .name("re-metrics".into()) - .spawn(move || loop { - stats_clone.flush_metrics(); - memtables_clone.flush_metrics(); - if rx.recv_timeout(METRICS_FLUSH_INTERVAL).is_ok() { - break; + .spawn(move || { + loop { + stats_clone.flush_metrics(); + memtables_clone.flush_metrics(); + if rx.recv_timeout(METRICS_FLUSH_INTERVAL).is_ok() { + break; + } } })?; @@ -625,18 +631,24 @@ where #[cfg(test)] pub(crate) mod tests { - use super::*; - use crate::env::{ObfuscatedFileSystem, Permission}; - use crate::file_pipe_log::{parse_reserved_file_name, FileNameExt}; - use crate::log_batch::AtomicGroupBuilder; - use crate::pipe_log::Version; - use crate::test_util::{generate_entries, PanicGuard}; - use crate::util::ReadableSize; + use std::{ + collections::{BTreeSet, HashSet}, + fs::OpenOptions, + path::PathBuf, + }; + use kvproto::raft_serverpb::RaftLocalState; use raft::eraftpb::Entry; - use std::collections::{BTreeSet, HashSet}; - use std::fs::OpenOptions; - use std::path::PathBuf; + + use super::*; + use crate::{ + env::{ObfuscatedFileSystem, Permission}, + file_pipe_log::{parse_reserved_file_name, FileNameExt}, + log_batch::AtomicGroupBuilder, + pipe_log::Version, + test_util::{generate_entries, PanicGuard}, + util::ReadableSize, + }; pub(crate) type RaftLogEngine = Engine; impl RaftLogEngine { @@ -1208,9 +1220,11 @@ pub(crate) mod tests { // GC all log entries. Won't trigger purge because total size is not enough. let count = engine.compact_to(1, 100); assert_eq!(count, 100); - assert!(!engine - .purge_manager - .needs_rewrite_log_files(LogQueue::Append)); + assert!( + !engine + .purge_manager + .needs_rewrite_log_files(LogQueue::Append) + ); // Append more logs to make total size greater than `purge_threshold`. for index in 100..250 { @@ -1220,9 +1234,11 @@ pub(crate) mod tests { // GC first 101 log entries. assert_eq!(engine.compact_to(1, 101), 1); // Needs to purge because the total size is greater than `purge_threshold`. - assert!(engine - .purge_manager - .needs_rewrite_log_files(LogQueue::Append)); + assert!( + engine + .purge_manager + .needs_rewrite_log_files(LogQueue::Append) + ); let old_min_file_seq = engine.file_span(LogQueue::Append).0; let will_force_compact = engine.purge_expired_files().unwrap(); @@ -1236,9 +1252,11 @@ pub(crate) mod tests { assert_eq!(engine.compact_to(1, 102), 1); // Needs to purge because the total size is greater than `purge_threshold`. - assert!(engine - .purge_manager - .needs_rewrite_log_files(LogQueue::Append)); + assert!( + engine + .purge_manager + .needs_rewrite_log_files(LogQueue::Append) + ); let will_force_compact = engine.purge_expired_files().unwrap(); // The region needs to be force compacted because the threshold is reached. assert!(!will_force_compact.is_empty()); @@ -1327,9 +1345,11 @@ pub(crate) mod tests { engine.append(11, 1, 11, Some(&data)); // The engine needs purge, and all old entries should be rewritten. - assert!(engine - .purge_manager - .needs_rewrite_log_files(LogQueue::Append)); + assert!( + engine + .purge_manager + .needs_rewrite_log_files(LogQueue::Append) + ); assert!(engine.purge_expired_files().unwrap().is_empty()); assert!(engine.file_span(LogQueue::Append).0 > 1); @@ -1363,9 +1383,11 @@ pub(crate) mod tests { } } - assert!(engine - .purge_manager - .needs_rewrite_log_files(LogQueue::Append)); + assert!( + engine + .purge_manager + .needs_rewrite_log_files(LogQueue::Append) + ); assert!(engine.purge_expired_files().unwrap().is_empty()); } @@ -1670,7 +1692,7 @@ pub(crate) mod tests { } drop(engine); - //dump dir with raft groups. 8 element in raft groups 7 and 2 elements in raft + // dump dir with raft groups. 8 element in raft groups 7 and 2 elements in raft // groups 8 let dump_it = Engine::dump_with_file_system(dir.path(), fs.clone()).unwrap(); let total = dump_it @@ -1680,7 +1702,7 @@ pub(crate) mod tests { .count(); assert!(total == 10); - //dump file + // dump file let file_id = FileId { queue: LogQueue::Rewrite, seq: 1, @@ -1697,10 +1719,10 @@ pub(crate) mod tests { .count(); assert!(0 == total); - //dump dir that does not exists + // dump dir that does not exists assert!(Engine::dump_with_file_system(Path::new("/not_exists_dir"), fs.clone()).is_err()); - //dump file that does not exists + // dump file that does not exists let mut not_exists_file = PathBuf::from(dir.as_ref()); not_exists_file.push("not_exists_file"); assert!(Engine::dump_with_file_system(not_exists_file.as_path(), fs).is_err()); @@ -1733,7 +1755,7 @@ pub(crate) mod tests { let script1 = "".to_owned(); RaftLogEngine::unsafe_repair_with_file_system( dir.path(), - None, /* queue */ + None, // queue script1, fs.clone(), ) @@ -1752,7 +1774,7 @@ pub(crate) mod tests { .to_owned(); RaftLogEngine::unsafe_repair_with_file_system( dir.path(), - None, /* queue */ + None, // queue script2, fs.clone(), ) @@ -1814,7 +1836,7 @@ pub(crate) mod tests { .to_owned(); RaftLogEngine::unsafe_repair_with_file_system( dir.path(), - None, /* queue */ + None, // queue script, fs.clone(), ) diff --git a/src/env/default.rs b/src/env/default.rs index 44f4fa18..1ddbe98f 100644 --- a/src/env/default.rs +++ b/src/env/default.rs @@ -2,14 +2,15 @@ #[cfg(feature = "failpoints")] use std::io::{Error, ErrorKind}; -use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; -use std::path::Path; -use std::sync::Arc; +use std::{ + io::{Read, Result as IoResult, Seek, SeekFrom, Write}, + path::Path, + sync::Arc, +}; use fail::fail_point; -use crate::env::log_fd::LogFd; -use crate::env::{FileSystem, Handle, Permission, WriteExt}; +use crate::env::{log_fd::LogFd, FileSystem, Handle, Permission, WriteExt}; /// A low-level file adapted for standard interfaces including [`Seek`], /// [`Write`] and [`Read`]. diff --git a/src/env/log_fd/plain.rs b/src/env/log_fd/plain.rs index 03328e91..999ee61f 100644 --- a/src/env/log_fd/plain.rs +++ b/src/env/log_fd/plain.rs @@ -3,15 +3,17 @@ //! A naive file handle implementation based on standard `File`. All I/O //! operations need to synchronize under a `RwLock`. -use crate::env::{Handle, Permission}; +use std::{ + fs::{File, OpenOptions}, + io::{Error, ErrorKind, Read, Result, Seek, SeekFrom, Write}, + path::Path, + sync::Arc, +}; use fail::fail_point; use parking_lot::RwLock; -use std::fs::{File, OpenOptions}; -use std::io::{Error, ErrorKind, Read, Result, Seek, SeekFrom, Write}; -use std::path::Path; -use std::sync::Arc; +use crate::env::{Handle, Permission}; pub struct LogFd(Arc>); diff --git a/src/env/log_fd/unix.rs b/src/env/log_fd/unix.rs index 608cca70..1f49e2f8 100644 --- a/src/env/log_fd/unix.rs +++ b/src/env/log_fd/unix.rs @@ -1,19 +1,21 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use crate::env::{Handle, Permission}; +use std::{io::Result as IoResult, os::unix::io::RawFd}; use fail::fail_point; use log::error; +use nix::{ + errno::Errno, + fcntl::{self, OFlag}, + sys::{ + stat::Mode, + uio::{pread, pwrite}, + }, + unistd::{close, ftruncate, lseek, Whence}, + NixPath, +}; -use std::io::Result as IoResult; -use std::os::unix::io::RawFd; - -use nix::errno::Errno; -use nix::fcntl::{self, OFlag}; -use nix::sys::stat::Mode; -use nix::sys::uio::{pread, pwrite}; -use nix::unistd::{close, ftruncate, lseek, Whence}; -use nix::NixPath; +use crate::env::{Handle, Permission}; fn from_nix_error(e: nix::Error, custom: &'static str) -> std::io::Error { let kind = std::io::Error::from(e).kind(); diff --git a/src/env/mod.rs b/src/env/mod.rs index 06de495a..f15802a3 100644 --- a/src/env/mod.rs +++ b/src/env/mod.rs @@ -1,8 +1,10 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use std::io::{Read, Result, Seek, Write}; -use std::path::Path; -use std::sync::Arc; +use std::{ + io::{Read, Result, Seek, Write}, + path::Path, + sync::Arc, +}; mod default; mod log_fd; diff --git a/src/env/obfuscated.rs b/src/env/obfuscated.rs index 6adaf277..9364d973 100644 --- a/src/env/obfuscated.rs +++ b/src/env/obfuscated.rs @@ -1,9 +1,13 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; -use std::path::Path; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::{ + io::{Read, Result as IoResult, Seek, SeekFrom, Write}, + path::Path, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; use crate::env::{DefaultFileSystem, FileSystem, Permission, WriteExt}; diff --git a/src/errors.rs b/src/errors.rs index fe35e6d8..f4d5c828 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,7 +1,6 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use std::error; -use std::io::Error as IoError; +use std::{error, io::Error as IoError}; use thiserror::Error; diff --git a/src/file_pipe_log/format.rs b/src/file_pipe_log/format.rs index 58b6afc3..d21c250d 100644 --- a/src/file_pipe_log/format.rs +++ b/src/file_pipe_log/format.rs @@ -2,14 +2,18 @@ //! Representations of objects in filesystem. -use std::io::BufRead; -use std::path::{Path, PathBuf}; +use std::{ + io::BufRead, + path::{Path, PathBuf}, +}; use num_traits::{FromPrimitive, ToPrimitive}; -use crate::codec::{self, NumberEncoder}; -use crate::pipe_log::{FileId, FileSeq, LogQueue, Version}; -use crate::{Error, Result}; +use crate::{ + codec::{self, NumberEncoder}, + pipe_log::{FileId, FileSeq, LogQueue, Version}, + Error, Result, +}; /// Width to format log sequence number. const LOG_SEQ_WIDTH: usize = 16; @@ -210,8 +214,7 @@ impl LogFileFormat { #[cfg(test)] mod tests { use super::*; - use crate::pipe_log::LogFileContext; - use crate::test_util::catch_unwind_silent; + use crate::{pipe_log::LogFileContext, test_util::catch_unwind_silent}; #[test] fn test_check_paddings_is_valid() { diff --git a/src/file_pipe_log/log_file.rs b/src/file_pipe_log/log_file.rs index 8ba92592..618d0778 100644 --- a/src/file_pipe_log/log_file.rs +++ b/src/file_pipe_log/log_file.rs @@ -2,18 +2,21 @@ //! Log file types. -use std::io::{Read, Result as IoResult, Seek, SeekFrom, Write}; -use std::sync::Arc; +use std::{ + io::{Read, Result as IoResult, Seek, SeekFrom, Write}, + sync::Arc, +}; use fail::fail_point; use log::warn; -use crate::env::{FileSystem, Handle, WriteExt}; -use crate::metrics::*; -use crate::pipe_log::FileBlockHandle; -use crate::{Error, Result}; - use super::format::LogFileFormat; +use crate::{ + env::{FileSystem, Handle, WriteExt}, + metrics::*, + pipe_log::FileBlockHandle, + Error, Result, +}; /// Maximum number of bytes to allocate ahead. const FILE_ALLOCATE_SIZE: usize = 2 * 1024 * 1024; diff --git a/src/file_pipe_log/mod.rs b/src/file_pipe_log/mod.rs index 64042e01..ca585e4c 100644 --- a/src/file_pipe_log/mod.rs +++ b/src/file_pipe_log/mod.rs @@ -19,18 +19,23 @@ pub use pipe_builder::{ pub mod debug { //! A set of public utilities used for interacting with log files. - use std::collections::VecDeque; - use std::path::{Path, PathBuf}; - use std::sync::Arc; + use std::{ + collections::VecDeque, + path::{Path, PathBuf}, + sync::Arc, + }; - use crate::env::{FileSystem, Permission}; - use crate::log_batch::LogItem; - use crate::pipe_log::FileId; - use crate::{Error, Result}; - - use super::format::{FileNameExt, LogFileFormat}; - use super::log_file::{LogFileReader, LogFileWriter}; - use super::reader::LogItemBatchFileReader; + use super::{ + format::{FileNameExt, LogFileFormat}, + log_file::{LogFileReader, LogFileWriter}, + reader::LogItemBatchFileReader, + }; + use crate::{ + env::{FileSystem, Permission}, + log_batch::LogItem, + pipe_log::FileId, + Error, Result, + }; /// Opens a log file for write. When `create` is true, the specified file /// will be created first if not exists. @@ -169,13 +174,16 @@ pub mod debug { #[cfg(test)] mod tests { - use super::*; - use crate::env::DefaultFileSystem; - use crate::log_batch::{Command, LogBatch}; - use crate::pipe_log::{FileBlockHandle, LogFileContext, LogQueue, Version}; - use crate::test_util::{generate_entries, PanicGuard}; use raft::eraftpb::Entry; + use super::*; + use crate::{ + env::DefaultFileSystem, + log_batch::{Command, LogBatch}, + pipe_log::{FileBlockHandle, LogFileContext, LogQueue, Version}, + test_util::{generate_entries, PanicGuard}, + }; + #[test] fn test_debug_file_basic() { let dir = tempfile::Builder::new() @@ -212,7 +220,7 @@ pub mod debug { file_system.as_ref(), &file_path, LogFileFormat::default(), - true, /* create */ + true, // create ) .unwrap(); let log_file_format = LogFileContext::new(file_id, Version::default()); @@ -277,7 +285,7 @@ pub mod debug { file_system.as_ref(), &empty_file_path, LogFileFormat::default(), - true, /* create */ + true, // create ) .unwrap(); writer.close().unwrap(); @@ -325,7 +333,7 @@ pub mod debug { file_system.as_ref(), &path, from, - true, /* create */ + true, // create ) .unwrap(); let f = std::fs::OpenOptions::new().write(true).open(&path).unwrap(); @@ -338,7 +346,7 @@ pub mod debug { file_system.as_ref(), &path, to, - false, /* create */ + false, // create ) .unwrap(); writer.close().unwrap(); diff --git a/src/file_pipe_log/pipe.rs b/src/file_pipe_log/pipe.rs index 09bc1f42..25fcba9f 100644 --- a/src/file_pipe_log/pipe.rs +++ b/src/file_pipe_log/pipe.rs @@ -1,28 +1,28 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use std::collections::VecDeque; -use std::fs::File as StdFile; -use std::path::PathBuf; -use std::sync::Arc; +use std::{collections::VecDeque, fs::File as StdFile, path::PathBuf, sync::Arc}; use crossbeam::utils::CachePadded; use fail::fail_point; use log::error; use parking_lot::{Mutex, MutexGuard, RwLock}; -use crate::config::Config; -use crate::env::{FileSystem, Permission}; -use crate::errors::is_no_space_err; -use crate::event_listener::EventListener; -use crate::metrics::*; -use crate::pipe_log::{ - FileBlockHandle, FileId, FileSeq, LogFileContext, LogQueue, PipeLog, ReactiveBytes, +use super::{ + format::{build_reserved_file_name, FileNameExt, LogFileFormat}, + log_file::{build_file_reader, build_file_writer, LogFileWriter}, +}; +use crate::{ + config::Config, + env::{FileSystem, Permission}, + errors::is_no_space_err, + event_listener::EventListener, + metrics::*, + perf_context, + pipe_log::{ + FileBlockHandle, FileId, FileSeq, LogFileContext, LogQueue, PipeLog, ReactiveBytes, + }, + Error, Result, }; -use crate::{perf_context, Error, Result}; - -use super::format::{build_reserved_file_name, FileNameExt, LogFileFormat}; -use super::log_file::build_file_reader; -use super::log_file::{build_file_writer, LogFileWriter}; pub type PathId = usize; pub type Paths = Vec; @@ -137,7 +137,7 @@ impl SinglePipe { file_system.as_ref(), f.handle.clone(), f.format, - no_active_files, /* force_reset */ + no_active_files, // force_reset )?, format: f.format, }; @@ -266,7 +266,7 @@ impl SinglePipe { self.file_system.as_ref(), f.handle.clone(), f.format, - true, /* force_reset */ + true, // force_reset )?, format: f.format, }; @@ -566,14 +566,18 @@ pub(crate) fn find_available_dir(paths: &Paths, target_size: usize) -> PathId { #[cfg(test)] mod tests { use std::path::Path; + use tempfile::Builder; - use super::super::format::LogFileFormat; - use super::super::pipe_builder::lock_dir; - use super::*; - use crate::env::{DefaultFileSystem, ObfuscatedFileSystem}; - use crate::pipe_log::Version; - use crate::util::ReadableSize; + use super::{ + super::{format::LogFileFormat, pipe_builder::lock_dir}, + *, + }; + use crate::{ + env::{DefaultFileSystem, ObfuscatedFileSystem}, + pipe_log::Version, + util::ReadableSize, + }; fn new_test_pipe( cfg: &Config, @@ -616,8 +620,10 @@ mod tests { // Only one thread can hold file lock let r2 = new_test_pipes(&cfg); - assert!(format!("{}", r2.err().unwrap()) - .contains("maybe another instance is using this directory")); + assert!( + format!("{}", r2.err().unwrap()) + .contains("maybe another instance is using this directory") + ); } #[test] diff --git a/src/file_pipe_log/pipe_builder.rs b/src/file_pipe_log/pipe_builder.rs index 25ca433c..391cda70 100644 --- a/src/file_pipe_log/pipe_builder.rs +++ b/src/file_pipe_log/pipe_builder.rs @@ -2,34 +2,40 @@ //! Helper types to recover in-memory states from log files. -use std::fs::{self, File as StdFile}; -use std::io::Write; -use std::marker::PhantomData; -use std::path::{Path, PathBuf}; -use std::sync::Arc; -use std::time::Instant; +use std::{ + fs::{self, File as StdFile}, + io::Write, + marker::PhantomData, + path::{Path, PathBuf}, + sync::Arc, + time::Instant, +}; use fs2::FileExt; use log::{error, info, warn}; use rayon::prelude::*; -use crate::config::{Config, RecoveryMode}; -use crate::env::{FileSystem, Handle, Permission}; -use crate::errors::is_no_space_err; -use crate::event_listener::EventListener; -use crate::log_batch::{LogItemBatch, LOG_BATCH_HEADER_LEN}; -use crate::pipe_log::{FileId, FileSeq, LogQueue}; -use crate::util::{Factory, ReadableSize}; -use crate::{Error, Result}; - -use super::format::{ - build_reserved_file_name, lock_file_path, parse_reserved_file_name, FileNameExt, LogFileFormat, +use super::{ + format::{ + build_reserved_file_name, lock_file_path, parse_reserved_file_name, FileNameExt, + LogFileFormat, + }, + log_file::build_file_reader, + pipe::{ + find_available_dir, DualPipes, File, PathId, Paths, SinglePipe, DEFAULT_FIRST_FILE_SEQ, + }, + reader::LogItemBatchFileReader, }; -use super::log_file::build_file_reader; -use super::pipe::{ - find_available_dir, DualPipes, File, PathId, Paths, SinglePipe, DEFAULT_FIRST_FILE_SEQ, +use crate::{ + config::{Config, RecoveryMode}, + env::{FileSystem, Handle, Permission}, + errors::is_no_space_err, + event_listener::EventListener, + log_batch::{LogItemBatch, LOG_BATCH_HEADER_LEN}, + pipe_log::{FileId, FileSeq, LogQueue}, + util::{Factory, ReadableSize}, + Error, Result, }; -use super::reader::LogItemBatchFileReader; /// Maximum size for the buffer for prefilling. const PREFILL_BUFFER_SIZE: usize = ReadableSize::mb(16).0 as usize; diff --git a/src/file_pipe_log/reader.rs b/src/file_pipe_log/reader.rs index 106ba72f..b6cac7d6 100644 --- a/src/file_pipe_log/reader.rs +++ b/src/file_pipe_log/reader.rs @@ -1,13 +1,16 @@ // Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0. -use crate::env::FileSystem; -use crate::log_batch::{LogBatch, LogItemBatch, LOG_BATCH_HEADER_LEN}; -use crate::pipe_log::{FileBlockHandle, FileId, LogFileContext}; -use crate::util::round_up; -use crate::{Error, Result}; - -use super::format::{is_zero_padded, LogFileFormat}; -use super::log_file::LogFileReader; +use super::{ + format::{is_zero_padded, LogFileFormat}, + log_file::LogFileReader, +}; +use crate::{ + env::FileSystem, + log_batch::{LogBatch, LogItemBatch, LOG_BATCH_HEADER_LEN}, + pipe_log::{FileBlockHandle, FileId, LogFileContext}, + util::round_up, + Error, Result, +}; /// A reusable reader over [`LogItemBatch`]s in a log file. pub(super) struct LogItemBatchFileReader { diff --git a/src/filter.rs b/src/filter.rs index f992d788..a159b45d 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -1,21 +1,24 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use std::path::Path; -use std::sync::Arc; +use std::{path::Path, sync::Arc}; use hashbrown::HashMap; use rhai::{Engine, Scope, AST}; use scopeguard::{guard, ScopeGuard}; -use crate::env::FileSystem; -use crate::file_pipe_log::debug::{build_file_reader, build_file_writer}; -use crate::file_pipe_log::{FileNameExt, ReplayMachine}; -use crate::log_batch::{ - Command, EntryIndexes, KeyValue, LogBatch, LogItem, LogItemBatch, LogItemContent, OpType, +use crate::{ + env::FileSystem, + file_pipe_log::{ + debug::{build_file_reader, build_file_writer}, + FileNameExt, ReplayMachine, + }, + log_batch::{ + Command, EntryIndexes, KeyValue, LogBatch, LogItem, LogItemBatch, LogItemContent, OpType, + }, + pipe_log::{FileId, LogFileContext, LogQueue}, + util::Factory, + Error, Result, }; -use crate::pipe_log::{FileId, LogFileContext, LogQueue}; -use crate::util::Factory; -use crate::{Error, Result}; /// `FilterResult` determines how to alter the existing log items in /// `RhaiFilterMachine`. @@ -319,7 +322,7 @@ impl RhaiFilterMachine { log_batch.prepare_write(&log_file_context)?; writer.write( log_batch.encoded_bytes(), - usize::MAX, /* target_size_hint */ + usize::MAX, // target_size_hint )?; log_batch.drain(); } @@ -329,7 +332,7 @@ impl RhaiFilterMachine { log_batch.prepare_write(&log_file_context)?; writer.write( log_batch.encoded_bytes(), - usize::MAX, /* target_size_hint */ + usize::MAX, // target_size_hint )?; log_batch.drain(); } diff --git a/src/fork.rs b/src/fork.rs index cab65a92..f2f85e59 100644 --- a/src/fork.rs +++ b/src/fork.rs @@ -1,19 +1,22 @@ // Copyright (c) 2023-present, PingCAP, Inc. Licensed under Apache-2.0. -use std::fs::{copy, create_dir_all}; -use std::path::Path; -use std::sync::Arc; - #[cfg(not(windows))] use std::os::unix::fs::symlink; #[cfg(windows)] use std::os::windows::fs::symlink_file as symlink; - -use crate::config::{Config, RecoveryMode}; -use crate::env::FileSystem; -use crate::file_pipe_log::{FileNameExt, FilePipeLog, FilePipeLogBuilder}; -use crate::pipe_log::{FileId, LogQueue}; -use crate::Engine; +use std::{ + fs::{copy, create_dir_all}, + path::Path, + sync::Arc, +}; + +use crate::{ + config::{Config, RecoveryMode}, + env::FileSystem, + file_pipe_log::{FileNameExt, FilePipeLog, FilePipeLogBuilder}, + pipe_log::{FileId, LogQueue}, + Engine, +}; /// Returned by `Engine::fork`. #[derive(Default)] @@ -105,12 +108,11 @@ where #[cfg(test)] mod tests { - use super::*; - use crate::engine::tests::RaftLogEngine; - use crate::env::DefaultFileSystem; - use crate::{LogBatch, ReadableSize}; use std::path::PathBuf; + use super::*; + use crate::{engine::tests::RaftLogEngine, env::DefaultFileSystem, LogBatch, ReadableSize}; + #[test] fn test_fork() { let dir = tempfile::Builder::new() diff --git a/src/lib.rs b/src/lib.rs index f282f6cc..40306ddb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -77,14 +77,11 @@ pub use util::ReadableSize; pub mod internals { //! A selective view of key components in Raft Engine. Exported under the //! `internals` feature only. - pub use crate::event_listener::*; - pub use crate::file_pipe_log::*; - pub use crate::memtable::*; - pub use crate::pipe_log::*; - pub use crate::purge::*; #[cfg(feature = "swap")] pub use crate::swappy_allocator::*; - pub use crate::write_barrier::*; + pub use crate::{ + event_listener::*, file_pipe_log::*, memtable::*, pipe_log::*, purge::*, write_barrier::*, + }; } use std::sync::atomic::{AtomicUsize, Ordering}; @@ -219,9 +216,10 @@ pub(crate) fn is_internal_key(s: &[u8], ext: Option<&[u8]>) -> bool { #[cfg(test)] mod tests { - use crate::log_batch::MessageExt; use raft::eraftpb::Entry; + use crate::log_batch::MessageExt; + #[ctor::ctor] fn init() { env_logger::init(); diff --git a/src/log_batch.rs b/src/log_batch.rs index c6ce147c..4e09a163 100644 --- a/src/log_batch.rs +++ b/src/log_batch.rs @@ -1,10 +1,15 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use std::fmt::Debug; -use std::io::BufRead; -use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::Arc; -use std::{mem, u64}; +use std::{ + fmt::Debug, + io::BufRead, + mem, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + u64, +}; use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt}; use log::error; @@ -12,12 +17,15 @@ use num_derive::FromPrimitive; use num_traits::FromPrimitive; use protobuf::Message; -use crate::codec::{self, NumberEncoder}; -use crate::memtable::EntryIndex; -use crate::metrics::StopWatch; -use crate::pipe_log::{FileBlockHandle, FileId, LogFileContext, ReactiveBytes}; -use crate::util::{crc32, lz4}; -use crate::{perf_context, Error, Result}; +use crate::{ + codec::{self, NumberEncoder}, + memtable::EntryIndex, + metrics::StopWatch, + perf_context, + pipe_log::{FileBlockHandle, FileId, LogFileContext, ReactiveBytes}, + util::{crc32, lz4}, + Error, Result, +}; pub(crate) const LOG_BATCH_HEADER_LEN: usize = 16; pub(crate) const LOG_BATCH_CHECKSUM_LEN: usize = 4; @@ -152,8 +160,8 @@ impl Command { fn approximate_size(&self) -> usize { match &self { - Command::Clean => 1, /* type */ - Command::Compact { .. } => 1 + 8, /* type + index */ + Command::Clean => 1, // type + Command::Compact { .. } => 1 + 8, // type + index } } } @@ -1109,13 +1117,16 @@ impl AtomicGroupBuilder { #[cfg(test)] mod tests { - use super::*; - use crate::pipe_log::{LogQueue, Version}; - use crate::test_util::{catch_unwind_silent, generate_entries, generate_entry_indexes_opt}; use protobuf::parse_from_bytes; use raft::eraftpb::Entry; use strum::IntoEnumIterator; + use super::*; + use crate::{ + pipe_log::{LogQueue, Version}, + test_util::{catch_unwind_silent, generate_entries, generate_entry_indexes_opt}, + }; + fn decode_entries_from_bytes( buf: &[u8], entry_indexes: &[EntryIndex], @@ -1595,28 +1606,34 @@ mod tests { let mut copy = encoded.to_owned(); copy.truncate(LOG_BATCH_HEADER_LEN - 1); - assert!(LogBatch::decode_header(&mut copy.as_slice()) - .unwrap_err() - .to_string() - .contains("Log batch header too short")); + assert!( + LogBatch::decode_header(&mut copy.as_slice()) + .unwrap_err() + .to_string() + .contains("Log batch header too short") + ); let mut copy = encoded.to_owned(); (&mut copy[LOG_BATCH_HEADER_LEN - 8..LOG_BATCH_HEADER_LEN]) .write_u64::(encoded.len() as u64 + 1) .unwrap(); - assert!(LogBatch::decode_header(&mut copy.as_slice()) - .unwrap_err() - .to_string() - .contains("Log item offset exceeds log batch length")); + assert!( + LogBatch::decode_header(&mut copy.as_slice()) + .unwrap_err() + .to_string() + .contains("Log item offset exceeds log batch length") + ); let mut copy = encoded.to_owned(); (&mut copy[LOG_BATCH_HEADER_LEN - 8..LOG_BATCH_HEADER_LEN]) .write_u64::(LOG_BATCH_HEADER_LEN as u64 - 1) .unwrap(); - assert!(LogBatch::decode_header(&mut copy.as_slice()) - .unwrap_err() - .to_string() - .contains("Log item offset is smaller than log batch header length")); + assert!( + LogBatch::decode_header(&mut copy.as_slice()) + .unwrap_err() + .to_string() + .contains("Log item offset is smaller than log batch header length") + ); } #[cfg(feature = "nightly")] @@ -1687,7 +1704,7 @@ mod tests { let encoded = batch.encoded_bytes(); assert_eq!(encoded.len(), len); let mut bytes_slice = encoded; - let (offset, _, _) = LogBatch::decode_header(&mut bytes_slice).unwrap(); + let (offset, ..) = LogBatch::decode_header(&mut bytes_slice).unwrap(); let expected = verify_checksum_with_signature(&encoded[offset..], file_context.get_signature()) .unwrap(); diff --git a/src/memtable.rs b/src/memtable.rs index 7a0ea41b..4d726eb5 100644 --- a/src/memtable.rs +++ b/src/memtable.rs @@ -1,32 +1,36 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use std::borrow::BorrowMut; -use std::collections::{BTreeMap, HashSet, VecDeque}; -use std::marker::PhantomData; -use std::ops::Bound; -use std::sync::Arc; +use std::{ + borrow::BorrowMut, + collections::{BTreeMap, HashSet, VecDeque}, + marker::PhantomData, + ops::Bound, + sync::Arc, +}; use fail::fail_point; use hashbrown::HashMap; use log::{error, warn}; use parking_lot::{Mutex, RwLock}; -use crate::config::Config; -use crate::file_pipe_log::ReplayMachine; -use crate::log_batch::{ - AtomicGroupStatus, Command, CompressionType, KeyValue, LogBatch, LogItem, LogItemBatch, - LogItemContent, OpType, +use crate::{ + config::Config, + file_pipe_log::ReplayMachine, + log_batch::{ + AtomicGroupStatus, Command, CompressionType, KeyValue, LogBatch, LogItem, LogItemBatch, + LogItemContent, OpType, + }, + metrics::MEMORY_USAGE, + pipe_log::{FileBlockHandle, FileId, FileSeq, LogQueue}, + util::{hash_u64, Factory}, + Error, GlobalStats, Result, }; -use crate::metrics::MEMORY_USAGE; -use crate::pipe_log::{FileBlockHandle, FileId, FileSeq, LogQueue}; -use crate::util::{hash_u64, Factory}; -use crate::{Error, GlobalStats, Result}; #[cfg(feature = "swap")] mod swap_conditional_imports { + use std::{convert::TryFrom, path::Path}; + use crate::swappy_allocator::SwappyAllocator; - use std::convert::TryFrom; - use std::path::Path; pub trait AllocatorTrait: std::alloc::Allocator + Clone + Send + Sync {} impl AllocatorTrait for T {} @@ -209,10 +213,10 @@ impl MemTable { rhs_first, // Rewrite -> Compact Append -> Rewrite. // TODO: add test case. - rhs.rewrite_count > 0, /* allow_hole */ + rhs.rewrite_count > 0, // allow_hole // Always true, because `self` might not have all entries in // history. - true, /* allow_overwrite */ + true, // allow_overwrite ); self.global_stats.add( rhs.entry_indexes[0].entries.unwrap().id.queue, @@ -252,9 +256,9 @@ impl MemTable { // FIXME: It's possibly okay to set it to false. Any compact // command applied to append queue will also be applied to // rewrite queue. - true, /* allow_hole */ + true, // allow_hole // Compact -> Rewrite -> Data loss of the compact command. - true, /* allow_overwrite */ + true, // allow_overwrite ); self.global_stats.add( rhs.entry_indexes[0].entries.unwrap().id.queue, @@ -384,8 +388,8 @@ impl MemTable { if len > 0 { self.prepare_append( entry_indexes[0].index, - false, /* allow_hole */ - false, /* allow_overwrite */ + false, // allow_hole + false, // allow_overwrite ); self.global_stats.add(LogQueue::Append, len); for ei in &entry_indexes { @@ -404,9 +408,9 @@ impl MemTable { debug_assert_eq!(self.rewrite_count, 0); self.prepare_append( entry_indexes[0].index, - false, /* allow_hole */ + false, // allow_hole // Refer to case in `merge_newer_neighbor`. - true, /* allow_overwrite */ + true, // allow_overwrite ); self.global_stats.add(LogQueue::Append, len); for ei in &entry_indexes { @@ -506,11 +510,11 @@ impl MemTable { self.prepare_append( entry_indexes[0].index, // Rewrite -> Compact Append -> Rewrite. - true, /* allow_hole */ + true, // allow_hole // Refer to case in `merge_append_table`. They can be adapted // to attack this path via a global rewrite without deleting // obsolete rewrite files. - true, /* allow_overwrite */ + true, // allow_overwrite ); self.global_stats.add(LogQueue::Rewrite, len); for ei in &entry_indexes { @@ -1855,21 +1859,27 @@ mod tests { memtable.consistency_check(); let mut ents_idx = vec![]; - assert!(memtable - .fetch_entry_indexes_before(2, &mut ents_idx) - .is_ok()); + assert!( + memtable + .fetch_entry_indexes_before(2, &mut ents_idx) + .is_ok() + ); assert_eq!(ents_idx.len(), 10); assert_eq!(ents_idx.last().unwrap().index, 19); ents_idx.clear(); - assert!(memtable - .fetch_entry_indexes_before(1, &mut ents_idx) - .is_ok()); + assert!( + memtable + .fetch_entry_indexes_before(1, &mut ents_idx) + .is_ok() + ); assert!(ents_idx.is_empty()); ents_idx.clear(); - assert!(memtable - .fetch_rewritten_entry_indexes(&mut ents_idx) - .is_ok()); + assert!( + memtable + .fetch_rewritten_entry_indexes(&mut ents_idx) + .is_ok() + ); assert_eq!(ents_idx.len(), 10); assert_eq!(ents_idx.first().unwrap().index, 0); assert_eq!(ents_idx.last().unwrap().index, 9); diff --git a/src/metrics.rs b/src/metrics.rs index 6ca10940..f7300680 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -107,7 +107,7 @@ where #[macro_export] macro_rules! perf_context { - ($field: ident) => { + ($field:ident) => { $crate::metrics::PerfContextField::new(|perf_context| &mut perf_context.$field) }; } diff --git a/src/pipe_log.rs b/src/pipe_log.rs index 33ca4071..58995a32 100644 --- a/src/pipe_log.rs +++ b/src/pipe_log.rs @@ -2,8 +2,10 @@ //! A generic log storage. -use std::cmp::Ordering; -use std::fmt::{self, Display}; +use std::{ + cmp::Ordering, + fmt::{self, Display}, +}; use fail::fail_point; use num_derive::{FromPrimitive, ToPrimitive}; diff --git a/src/purge.rs b/src/purge.rs index b1183438..62d67daf 100644 --- a/src/purge.rs +++ b/src/purge.rs @@ -1,23 +1,28 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use std::collections::VecDeque; -use std::collections::{HashMap, HashSet}; -use std::mem; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; +use std::{ + collections::{HashMap, HashSet, VecDeque}, + mem, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, +}; use fail::fail_point; use log::{info, warn}; use parking_lot::{Mutex, RwLock}; -use crate::config::Config; -use crate::engine::read_entry_bytes_from_file; -use crate::event_listener::EventListener; -use crate::log_batch::{AtomicGroupBuilder, LogBatch}; -use crate::memtable::{MemTableHandle, MemTables}; -use crate::metrics::*; -use crate::pipe_log::{FileBlockHandle, FileId, FileSeq, LogQueue, PipeLog}; -use crate::{GlobalStats, Result}; +use crate::{ + config::Config, + engine::read_entry_bytes_from_file, + event_listener::EventListener, + log_batch::{AtomicGroupBuilder, LogBatch}, + memtable::{MemTableHandle, MemTables}, + metrics::*, + pipe_log::{FileBlockHandle, FileId, FileSeq, LogQueue, PipeLog}, + GlobalStats, Result, +}; // Force compact region with oldest 20% logs. const FORCE_COMPACT_RATIO: f64 = 0.2; @@ -293,8 +298,8 @@ where let mut log_batch = self.memtables.take_cleaned_region_logs(); self.rewrite_impl( &mut log_batch, - None, /* rewrite_watermark */ - true, /* sync */ + None, // rewrite_watermark + true, // sync )?; Ok(()) } diff --git a/src/swappy_allocator.rs b/src/swappy_allocator.rs index 8baa4835..e20f62ce 100644 --- a/src/swappy_allocator.rs +++ b/src/swappy_allocator.rs @@ -2,13 +2,17 @@ //! # Swappy Allocator -use std::alloc::{AllocError, Allocator, Global, Layout}; -use std::fs::{File, OpenOptions}; -use std::path::{Path, PathBuf}; -use std::ptr::{self, NonNull}; -use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; -use std::sync::Arc; -use std::vec::Vec; +use std::{ + alloc::{AllocError, Allocator, Global, Layout}, + fs::{File, OpenOptions}, + path::{Path, PathBuf}, + ptr::{self, NonNull}, + sync::{ + atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}, + Arc, + }, + vec::Vec, +}; use log::{error, warn}; use memmap2::MmapMut; @@ -653,10 +657,12 @@ mod tests { { let mut vec: Vec = Vec::new_in(allocator.clone()); global.set_err_mode(true); - assert!(catch_unwind_silent(|| { - vec.resize(16, 0); - }) - .is_err()); + assert!( + catch_unwind_silent(|| { + vec.resize(16, 0); + }) + .is_err() + ); assert_eq!(allocator.memory_usage(), 0); global.set_err_mode(false); vec.resize(16, 0); @@ -668,10 +674,12 @@ mod tests { vec.resize(16, 0); assert_eq!(allocator.memory_usage(), 16); global.set_err_mode(true); - assert!(catch_unwind_silent(|| { - vec.resize(32, 0); - }) - .is_err()); + assert!( + catch_unwind_silent(|| { + vec.resize(32, 0); + }) + .is_err() + ); assert_eq!(allocator.memory_usage(), 16); global.set_err_mode(false); vec.resize(32, 0); @@ -684,10 +692,12 @@ mod tests { assert_eq!(allocator.memory_usage(), 32); global.set_err_mode(true); vec.resize(16, 0); - assert!(catch_unwind_silent(|| { - vec.shrink_to_fit(); - }) - .is_err()); + assert!( + catch_unwind_silent(|| { + vec.shrink_to_fit(); + }) + .is_err() + ); assert_eq!(allocator.memory_usage(), 32); global.set_err_mode(false); vec.shrink_to_fit(); @@ -1097,10 +1107,12 @@ mod tests { v.push_front(D(1, false)); v.push_front(D(0, false)); - assert!(catch_unwind_silent(|| { - v.drain(1..=4); - }) - .is_err()); + assert!( + catch_unwind_silent(|| { + v.drain(1..=4); + }) + .is_err() + ); assert_eq!(unsafe { DROPS }, 4); assert_eq!(v.len(), 3); diff --git a/src/util.rs b/src/util.rs index 2e35a83e..3784e895 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,13 +1,17 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use std::fmt::{self, Display, Write}; -use std::ops::{Div, Mul}; -use std::str::FromStr; -use std::time::{Duration, Instant}; +use std::{ + fmt::{self, Display, Write}, + ops::{Div, Mul}, + str::FromStr, + time::{Duration, Instant}, +}; use crc32fast::Hasher; -use serde::de::{self, Unexpected, Visitor}; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use serde::{ + de::{self, Unexpected, Visitor}, + Deserialize, Deserializer, Serialize, Serializer, +}; const UNIT: u64 = 1; @@ -220,9 +224,10 @@ pub fn unhash_u64(mut i: u64) -> u64 { } pub mod lz4 { - use crate::{Error, Result}; use std::{i32, ptr}; + use crate::{Error, Result}; + pub const DEFAULT_LZ4_COMPRESSION_LEVEL: usize = 1; /// Compress content in `buf[skip..]`, and append output to `buf`. diff --git a/src/write_barrier.rs b/src/write_barrier.rs index 3d365456..fad43c12 100644 --- a/src/write_barrier.rs +++ b/src/write_barrier.rs @@ -5,10 +5,7 @@ //! This module relies heavily on unsafe codes. Extra call site constraints are //! required to maintain memory safety. Use it with great caution. -use std::cell::Cell; -use std::marker::PhantomData; -use std::ptr::NonNull; -use std::time::Instant; +use std::{cell::Cell, marker::PhantomData, ptr::NonNull, time::Instant}; use fail::fail_point; use parking_lot::{Condvar, Mutex}; @@ -228,11 +225,13 @@ impl WriteBarrier { #[cfg(test)] mod tests { + use std::{ + sync::{mpsc, Arc, Barrier}, + thread::{self, Builder as ThreadBuilder}, + time::Duration, + }; + use super::*; - use std::sync::mpsc; - use std::sync::{Arc, Barrier}; - use std::thread::{self, Builder as ThreadBuilder}; - use std::time::Duration; #[test] fn test_sequential_groups() { diff --git a/stress/src/main.rs b/stress/src/main.rs index 381fad3e..aa32552c 100644 --- a/stress/src/main.rs +++ b/stress/src/main.rs @@ -2,11 +2,15 @@ extern crate hdrhistogram; -use std::str::FromStr; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::Arc; -use std::thread::{sleep, Builder as ThreadBuilder, JoinHandle}; -use std::time::{Duration, Instant}; +use std::{ + str::FromStr, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, + }, + thread::{sleep, Builder as ThreadBuilder, JoinHandle}, + time::{Duration, Instant}, +}; use clap::{crate_authors, crate_version, Parser}; use const_format::formatcp; @@ -14,8 +18,10 @@ use hdrhistogram::Histogram; use num_traits::FromPrimitive; use parking_lot_core::SpinWait; use raft::eraftpb::Entry; -use raft_engine::internals::{EventListener, FileBlockHandle}; -use raft_engine::{Command, Config, Engine, LogBatch, MessageExt, ReadableSize, Version}; +use raft_engine::{ + internals::{EventListener, FileBlockHandle}, + Command, Config, Engine, LogBatch, MessageExt, ReadableSize, Version, +}; use rand::{thread_rng, Rng, RngCore}; type WriteBatch = LogBatch; diff --git a/tests/benches/bench_recovery.rs b/tests/benches/bench_recovery.rs index 55b42c22..258ddf1e 100644 --- a/tests/benches/bench_recovery.rs +++ b/tests/benches/bench_recovery.rs @@ -1,13 +1,11 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. +use std::{collections::HashMap, fmt, path::PathBuf}; + use criterion::{criterion_group, BenchmarkId, Criterion}; use raft::eraftpb::Entry; -use raft_engine::ReadableSize; -use raft_engine::{Config as EngineConfig, Engine, LogBatch, MessageExt, Result}; +use raft_engine::{Config as EngineConfig, Engine, LogBatch, MessageExt, ReadableSize, Result}; use rand::{Rng, SeedableRng}; -use std::collections::HashMap; -use std::fmt; -use std::path::PathBuf; use tempfile::TempDir; #[derive(Clone)] @@ -44,7 +42,9 @@ impl Default for Config { impl fmt::Display for Config { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{} [region-count: {}][batch-size: {}][item-size: {}][entry-size: {}][batch-compression-threshold: {}]", + write!( + f, + "{} [region-count: {}][batch-size: {}][item-size: {}][entry-size: {}][batch-compression-threshold: {}]", self.total_size, self.region_count, self.batch_size, diff --git a/tests/failpoints/test_engine.rs b/tests/failpoints/test_engine.rs index 8d74b70b..3ac0b5d2 100644 --- a/tests/failpoints/test_engine.rs +++ b/tests/failpoints/test_engine.rs @@ -1,15 +1,21 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; -use std::sync::{Arc, Barrier}; -use std::time::Duration; +use std::{ + sync::{ + atomic::{AtomicU64, AtomicUsize, Ordering}, + Arc, Barrier, + }, + time::Duration, +}; use fail::FailGuard; use kvproto::raft_serverpb::RaftLocalState; use raft::eraftpb::Entry; -use raft_engine::env::{FileSystem, ObfuscatedFileSystem}; -use raft_engine::internals::*; -use raft_engine::*; +use raft_engine::{ + env::{FileSystem, ObfuscatedFileSystem}, + internals::*, + *, +}; use crate::util::*; @@ -258,10 +264,10 @@ fn test_concurrent_write_empty_log_batch() { let mut entries = Vec::new(); engine .fetch_entries_to::( - 1, /* region */ - 0, /* begin */ - 2, /* end */ - None, /* max_size */ + 1, // region + 0, // begin + 2, // end + None, // max_size &mut entries, ) .unwrap(); @@ -269,10 +275,10 @@ fn test_concurrent_write_empty_log_batch() { entries.clear(); engine .fetch_entries_to::( - 2, /* region */ - 0, /* begin */ - 2, /* end */ - None, /* max_size */ + 2, // region + 0, // begin + 2, // end + None, // max_size &mut entries, ) .unwrap(); @@ -678,14 +684,16 @@ fn test_recycle_with_stale_logbatch_at_tail() { // Causing the final log file is a recycled file, containing rewritten // LogBatchs and end with stale LogBatchs, `Engine::open(...)` should // `panic` when recovering the relate `Memtable`. - assert!(catch_unwind_silent(|| { - let cfg_v2 = Config { - format_version: Version::V2, - ..cfg_err - }; - Engine::open(cfg_v2) - }) - .is_err()); + assert!( + catch_unwind_silent(|| { + let cfg_v2 = Config { + format_version: Version::V2, + ..cfg_err + }; + Engine::open(cfg_v2) + }) + .is_err() + ); } #[test] diff --git a/tests/failpoints/test_io_error.rs b/tests/failpoints/test_io_error.rs index d24ab049..c43e8f66 100644 --- a/tests/failpoints/test_io_error.rs +++ b/tests/failpoints/test_io_error.rs @@ -4,9 +4,7 @@ use std::sync::Arc; use fail::FailGuard; use raft::eraftpb::Entry; -use raft_engine::env::ObfuscatedFileSystem; -use raft_engine::internals::*; -use raft_engine::*; +use raft_engine::{env::ObfuscatedFileSystem, internals::*, *}; use crate::util::*; @@ -105,10 +103,12 @@ fn test_file_write_error() { engine .write(&mut generate_batch(1, 2, 3, Some(&entry)), false) .unwrap(); - assert!(catch_unwind_silent(|| { - let _ = engine.write(&mut generate_batch(1, 3, 4, Some(&entry)), true); - }) - .is_err()); + assert!( + catch_unwind_silent(|| { + let _ = engine.write(&mut generate_batch(1, 3, 4, Some(&entry)), true); + }) + .is_err() + ); } // Internal states are consistent after panics. But outstanding writes are not @@ -156,37 +156,45 @@ fn test_file_rotate_error() { { // Fail to sync old log file. let _f = FailGuard::new("log_fd::sync::err", "return"); - assert!(catch_unwind_silent(|| { - let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false); - }) - .is_err()); + assert!( + catch_unwind_silent(|| { + let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false); + }) + .is_err() + ); assert_eq!(engine.file_span(LogQueue::Append).1, 1); } { // Fail to create new log file. let _f = FailGuard::new("default_fs::create::err", "return"); - assert!(catch_unwind_silent(|| { - let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false); - }) - .is_err()); + assert!( + catch_unwind_silent(|| { + let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false); + }) + .is_err() + ); assert_eq!(engine.file_span(LogQueue::Append).1, 1); } { // Fail to write header of new log file. let _f = FailGuard::new("log_file::write::err", "1*off->return"); - assert!(catch_unwind_silent(|| { - let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false); - }) - .is_err()); + assert!( + catch_unwind_silent(|| { + let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false); + }) + .is_err() + ); assert_eq!(engine.file_span(LogQueue::Append).1, 1); } { // Fail to sync new log file. The old log file is already sync-ed at this point. let _f = FailGuard::new("log_fd::sync::err", "return"); - assert!(catch_unwind_silent(|| { - let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false); - }) - .is_err()); + assert!( + catch_unwind_silent(|| { + let _ = engine.write(&mut generate_batch(1, 4, 5, Some(&entry)), false); + }) + .is_err() + ); assert_eq!(engine.file_span(LogQueue::Append).1, 1); } @@ -335,12 +343,14 @@ fn test_non_atomic_write_error() { let engine = Engine::open_with_file_system(cfg.clone(), fs.clone()).unwrap(); let _f1 = FailGuard::new("log_file::write::err", "return"); let _f2 = FailGuard::new("log_file::seek::err", "return"); - assert!(catch_unwind_silent(|| { - engine - .write(&mut generate_batch(rid, 6, 7, Some(&entry)), true) - .unwrap_err(); - }) - .is_err()); + assert!( + catch_unwind_silent(|| { + engine + .write(&mut generate_batch(rid, 6, 7, Some(&entry)), true) + .unwrap_err(); + }) + .is_err() + ); } { let engine = Engine::open_with_file_system(cfg, fs).unwrap(); @@ -535,12 +545,14 @@ fn test_no_space_write_error() { }; let engine = Engine::open(cfg_err).unwrap(); let _f = FailGuard::new("log_fd::write::no_space_err", "return"); - assert!(catch_unwind_silent(|| { - engine - .write(&mut generate_batch(2, 11, 21, Some(&entry)), true) - .unwrap_err(); - }) - .is_err()); + assert!( + catch_unwind_silent(|| { + engine + .write(&mut generate_batch(2, 11, 21, Some(&entry)), true) + .unwrap_err(); + }) + .is_err() + ); assert_eq!( 0, engine @@ -554,12 +566,14 @@ fn test_no_space_write_error() { let _f1 = FailGuard::new("log_fd::write::no_space_err", "2*return->off"); let _f2 = FailGuard::new("file_pipe_log::force_choose_dir", "return"); // The first write should fail, because all dirs run out of space for writing. - assert!(catch_unwind_silent(|| { - engine - .write(&mut generate_batch(2, 11, 21, Some(&entry)), true) - .unwrap_err(); - }) - .is_err()); + assert!( + catch_unwind_silent(|| { + engine + .write(&mut generate_batch(2, 11, 21, Some(&entry)), true) + .unwrap_err(); + }) + .is_err() + ); assert_eq!( 0, engine @@ -612,9 +626,11 @@ fn test_no_space_write_error() { "log_fd::write::no_space_err", "1*return->1*off->1*return->1*off", ); - assert!(engine - .write(&mut generate_batch(7, 11, 21, Some(&entry)), true) - .is_err()); + assert!( + engine + .write(&mut generate_batch(7, 11, 21, Some(&entry)), true) + .is_err() + ); assert_eq!( 0, engine diff --git a/tests/failpoints/util.rs b/tests/failpoints/util.rs index 188f66d7..ebb10814 100644 --- a/tests/failpoints/util.rs +++ b/tests/failpoints/util.rs @@ -1,11 +1,12 @@ // Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. -use std::panic::{self, AssertUnwindSafe}; -use std::sync::{mpsc, Arc}; +use std::{ + panic::{self, AssertUnwindSafe}, + sync::{mpsc, Arc}, +}; use raft::eraftpb::Entry; -use raft_engine::env::FileSystem; -use raft_engine::{Engine, LogBatch, MessageExt}; +use raft_engine::{env::FileSystem, Engine, LogBatch, MessageExt}; #[derive(Clone)] pub struct MessageExtTyped;