diff --git a/lumni/Cargo.toml b/lumni/Cargo.toml
index 68a67ab4..9a5df617 100644
--- a/lumni/Cargo.toml
+++ b/lumni/Cargo.toml
@@ -49,6 +49,8 @@ syntect = { version = "5.2.0", default-features = false, features = ["parsing",
 crc32fast = { version = "1.4" }
 rusqlite = { version = "0.31" }
 lazy_static = { version = "1.5" }
+rayon = { version = "1.10" }
+crossbeam-channel = { version = "0.5" }
 
 # CLI
 env_logger = { version = "0.9", optional = true }
diff --git a/lumni/src/localfs/list.rs b/lumni/src/localfs/list.rs
index 54b004cf..22df6932 100644
--- a/lumni/src/localfs/list.rs
+++ b/lumni/src/localfs/list.rs
@@ -1,8 +1,10 @@
-use std::collections::HashMap;
 use std::fs;
 use std::path::Path;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::collections::HashMap;
+use rayon::prelude::*;
+use crossbeam_channel::{bounded, Sender};
 
-use super::bucket::{FileSystem, LocalFileSystem};
 use crate::table::{FileObjectTable, TableColumnValue};
 use crate::{FileObject, FileObjectFilter};
 
@@ -14,69 +16,75 @@ pub async fn list_files(
     filter: &Option<FileObjectFilter>,
     table: &mut FileObjectTable,
 ) {
-    list_files_next(path, selected_columns, max_keys, recursive, filter, table)
-        .await;
+    println!("Selected columns: {:?}", selected_columns);
+
+    let max_count = max_keys.map(|m| m as usize).unwrap_or(usize::MAX);
+    let count = AtomicUsize::new(0);
+    let (sender, receiver) = bounded(500);
+
+    let path_buf = path.to_path_buf(); // Clone the path
+
+    // Spawn a thread to process entries
+    std::thread::spawn(move || {
+        process_directory(&path_buf, recursive, &count, max_count, &sender);
+    });
+
+    let rows: Vec<_> = receiver
+        .into_iter()
+        .filter_map(|entry| process_entry(&entry, filter, selected_columns))
+        .take(max_count)
+        .collect();
+
+    // Batch insert all rows at once
+    if !rows.is_empty() {
+        let _ = table.add_rows(rows).await;
+    }
 }
 
-async fn list_files_next(
+fn process_directory(
     path: &Path,
-    selected_columns: &Option<Vec<String>>,
-    max_keys: Option<u32>,
     recursive: bool,
-    filter: &Option<FileObjectFilter>,
-    table: &mut FileObjectTable,
+    count: &AtomicUsize,
+    max_count: usize,
+    sender: &Sender<fs::DirEntry>,
 ) {
-    let fs = &LocalFileSystem;
-    let mut directory_stack = vec![path.to_owned()];
-    let mut object_count = 0usize;
-
-    println!("Selected columns: {:?}", selected_columns);
-    while let Some(current_path) = directory_stack.pop() {
-        let mut temp_rows = Vec::new();
-
-        if let Ok(entries) = fs.read_dir(&current_path) {
-            for entry in entries.flatten() {
-                if max_keys.map_or(false, |max| object_count >= max as usize) {
-                    break; // Stop processing more entries
-                }
+    if count.load(Ordering::Relaxed) >= max_count {
+        return;
+    }
 
-                let metadata = match entry.metadata() {
-                    Ok(md) => md,
-                    Err(_) => continue,
-                };
-
-                if metadata.is_file() {
-                    if let Some(row_data) =
-                        handle_file(&entry, filter, selected_columns)
-                    {
-                        temp_rows.push(row_data);
-                        object_count += 1;
-                    }
-                } else if metadata.is_dir() {
-                    // Only add directory object when no filter is provided
-                    if filter.is_none() {
-                        if let Some(row_data) =
-                            handle_directory(&entry, selected_columns)
-                        {
-                            temp_rows.push(row_data);
-                            object_count += 1;
-                        }
-                    }
+    if let Ok(entries) = fs::read_dir(path) {
+        entries.par_bridge().for_each(|entry| {
+            if count.load(Ordering::Relaxed) >= max_count {
+                return;
+            }
 
-                    if recursive {
-                        directory_stack.push(entry.path());
+            if let Ok(entry) = entry {
+                if let Ok(file_type) = entry.file_type() {
+                    if file_type.is_file() {
+                        count.fetch_add(1, Ordering::Relaxed);
+                        let _ = sender.send(entry);
+                    } else if file_type.is_dir() && recursive {
+                        process_directory(&entry.path(), recursive, count, max_count, sender);
                     }
                 }
             }
-        }
-        if !temp_rows.is_empty() {
-            let _ = table.add_rows(temp_rows).await;
-        }
+        });
+    }
+}
 
-        // Exit the loop early if the max_keys limit has been reached
-        if max_keys.map_or(false, |max| object_count >= max as usize) {
-            break;
-        }
+fn process_entry(
+    entry: &fs::DirEntry,
+    filter: &Option<FileObjectFilter>,
+    selected_columns: &Option<Vec<String>>,
+) -> Option<HashMap<String, TableColumnValue>> {
+    let metadata = entry.metadata().ok()?;
+
+    if metadata.is_file() {
+        handle_file(entry, filter, selected_columns)
+    } else if metadata.is_dir() && filter.is_none() {
+        handle_directory(entry, selected_columns)
+    } else {
+        None
     }
 }
 
@@ -185,3 +193,4 @@ fn handle_file(
         Some(row_data)
     }
 }
+
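For reference, the new `list_files` path is a bounded producer/consumer pipeline: a spawned walker thread fans out over each directory's entries with rayon's `par_bridge` and pushes `DirEntry` values into a `crossbeam_channel::bounded(500)` queue, while the caller drains the receiver, maps entries to rows, and batch-inserts them. The standalone sketch below shows the same shape in isolation; it is not part of the patch, and `walk` plus the `main` driver are illustrative names only (it also always recurses, where the patch checks `recursive`).

```rust
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};

use crossbeam_channel::{bounded, Sender};
use rayon::prelude::*;

// Walk `path`, fanning out over each directory's entries with rayon and
// sending plain files into the bounded channel until `max_count` is hit.
fn walk(path: &Path, count: &AtomicUsize, max_count: usize, tx: &Sender<fs::DirEntry>) {
    if count.load(Ordering::Relaxed) >= max_count {
        return;
    }
    if let Ok(entries) = fs::read_dir(path) {
        entries.par_bridge().for_each(|entry| {
            if let Ok(entry) = entry {
                match entry.file_type() {
                    Ok(ft) if ft.is_file() => {
                        if count.fetch_add(1, Ordering::Relaxed) < max_count {
                            let _ = tx.send(entry);
                        }
                    }
                    Ok(ft) if ft.is_dir() => walk(&entry.path(), count, max_count, tx),
                    _ => {}
                }
            }
        });
    }
}

fn main() {
    let root = PathBuf::from(".");
    let (tx, rx) = bounded::<fs::DirEntry>(500);

    // Producer: the walker owns `tx`; dropping it when the thread finishes
    // closes the channel and ends the consumer loop below.
    std::thread::spawn(move || {
        let count = AtomicUsize::new(0);
        walk(&root, &count, 100, &tx);
    });

    // Consumer: drain entries as they arrive (this is where `process_entry`
    // and the batched `table.add_rows` call sit in the patch).
    for entry in rx {
        println!("{}", entry.path().display());
    }
}
```

The bounded capacity provides backpressure: if the consumer falls behind, the walker blocks on `send` instead of buffering an entire directory tree in memory.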