Skip to content

Commit

Permalink
parallel version for file-listing
Browse files Browse the repository at this point in the history
  • Loading branch information
aprxi committed Jul 25, 2024
1 parent 6ca4dd3 commit da4a6ef
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 54 deletions.
2 changes: 2 additions & 0 deletions lumni/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ syntect = { version = "5.2.0", default-features = false, features = ["parsing",
crc32fast = { version = "1.4" }
rusqlite = { version = "0.31" }
lazy_static = { version = "1.5" }
rayon = { version = "1.10" }
crossbeam-channel = { version = "0.5" }

# CLI
env_logger = { version = "0.9", optional = true }
Expand Down
117 changes: 63 additions & 54 deletions lumni/src/localfs/list.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::collections::HashMap;
use std::fs;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::collections::HashMap;
use rayon::prelude::*;
use crossbeam_channel::{bounded, Sender};

use super::bucket::{FileSystem, LocalFileSystem};
use crate::table::{FileObjectTable, TableColumnValue};
use crate::{FileObject, FileObjectFilter};

Expand All @@ -14,69 +16,75 @@ pub async fn list_files(
filter: &Option<FileObjectFilter>,
table: &mut FileObjectTable,
) {
list_files_next(path, selected_columns, max_keys, recursive, filter, table)
.await;
println!("Selected columns: {:?}", selected_columns);

let max_count = max_keys.map(|m| m as usize).unwrap_or(usize::MAX);
let count = AtomicUsize::new(0);
let (sender, receiver) = bounded(500);

let path_buf = path.to_path_buf(); // Clone the path

// Spawn a thread to process entries
std::thread::spawn(move || {
process_directory(&path_buf, recursive, &count, max_count, &sender);
});

let rows: Vec<_> = receiver
.into_iter()
.filter_map(|entry| process_entry(&entry, filter, selected_columns))
.take(max_count)
.collect();

// Batch insert all rows at once
if !rows.is_empty() {
let _ = table.add_rows(rows).await;
}
}

async fn list_files_next(
fn process_directory(
path: &Path,
selected_columns: &Option<Vec<&str>>,
max_keys: Option<u32>,
recursive: bool,
filter: &Option<FileObjectFilter>,
table: &mut FileObjectTable,
count: &AtomicUsize,
max_count: usize,
sender: &Sender<fs::DirEntry>,
) {
let fs = &LocalFileSystem;
let mut directory_stack = vec![path.to_owned()];
let mut object_count = 0usize;

println!("Selected columns: {:?}", selected_columns);
while let Some(current_path) = directory_stack.pop() {
let mut temp_rows = Vec::new();

if let Ok(entries) = fs.read_dir(&current_path) {
for entry in entries.flatten() {
if max_keys.map_or(false, |max| object_count >= max as usize) {
break; // Stop processing more entries
}
if count.load(Ordering::Relaxed) >= max_count {
return;
}

let metadata = match entry.metadata() {
Ok(md) => md,
Err(_) => continue,
};

if metadata.is_file() {
if let Some(row_data) =
handle_file(&entry, filter, selected_columns)
{
temp_rows.push(row_data);
object_count += 1;
}
} else if metadata.is_dir() {
// Only add directory object when no filter is provided
if filter.is_none() {
if let Some(row_data) =
handle_directory(&entry, selected_columns)
{
temp_rows.push(row_data);
object_count += 1;
}
}
if let Ok(entries) = fs::read_dir(path) {
entries.par_bridge().for_each(|entry| {
if count.load(Ordering::Relaxed) >= max_count {
return;
}

if recursive {
directory_stack.push(entry.path());
if let Ok(entry) = entry {
if let Ok(file_type) = entry.file_type() {
if file_type.is_file() {
count.fetch_add(1, Ordering::Relaxed);
let _ = sender.send(entry);
} else if file_type.is_dir() && recursive {
process_directory(&entry.path(), recursive, count, max_count, sender);
}
}
}
}
if !temp_rows.is_empty() {
let _ = table.add_rows(temp_rows).await;
}
});
}
}

// Exit the loop early if the max_keys limit has been reached
if max_keys.map_or(false, |max| object_count >= max as usize) {
break;
}
fn process_entry(
entry: &fs::DirEntry,
filter: &Option<FileObjectFilter>,
selected_columns: &Option<Vec<&str>>,
) -> Option<HashMap<String, TableColumnValue>> {
let metadata = entry.metadata().ok()?;

if metadata.is_file() {
handle_file(entry, filter, selected_columns)
} else if metadata.is_dir() && filter.is_none() {
handle_directory(entry, selected_columns)
} else {
None
}
}

Expand Down Expand Up @@ -185,3 +193,4 @@ fn handle_file(
Some(row_data)
}
}

0 comments on commit da4a6ef

Please sign in to comment.