Skip to content

Commit

Permalink
Merge branch 'main' of github.com:arangodb/lightning
Browse files Browse the repository at this point in the history
  • Loading branch information
hkernbach committed Aug 9, 2024
2 parents c1617ff + 457ae32 commit fd074de
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 34 deletions.
56 changes: 44 additions & 12 deletions src/graph_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,23 @@ impl GraphLoader {

info!("{:?} Got all data, processing...", SystemTime::now());
for c in consumers {
let _guck = c.join();
match c.join() {
Ok(Ok(())) => {
// The thread completed successfully and returned Ok
}
Ok(Err(e)) => {
// The thread completed but returned an error
eprintln!("Thread returned error: {:?}", e);
return Err(e); // Propagate the error
}
Err(e) => {
// The thread panicked
eprintln!("Thread panicked in do_vertices: {:?}", e);
return Err(GraphLoaderError::from(
"Thread panicked in do_vertices".to_string(),
));
}
}
}
}
Ok(())
Expand Down Expand Up @@ -738,23 +754,23 @@ impl GraphLoader {
edge.as_object_mut().unwrap().remove("_to");
edge_json.push(vec![edge]);
} else {
let id: &Value = &edge["_id"];
let idstr: &String = match id {
Value::String(i) => i,
_ => {
return Err(GraphLoaderError::JsonParseError(format!(
"JSON is no object with a string _id attribute:\n{}",
edge
)));
}
// it is not guaranteed that the _id field is present
let id = &edge["_id"];
let idstr: Option<&String> = match id {
Value::String(i) => Some(i),
_ => None,
};

// If we get here, we have to extract the field
// values in `fields` from the json and store it
// to edge_json:
let get_value = |v: &Value, field: &str| -> Value {
if field == "@collection_name" {
Value::String(collection_name_from_id(idstr))
if let Some(id) = idstr {
Value::String(collection_name_from_id(id))
} else {
Value::String("n/A - _id is missing".to_string())
}
} else {
v[field].clone()
}
Expand Down Expand Up @@ -846,7 +862,23 @@ impl GraphLoader {
std::time::SystemTime::now()
);
for c in consumers {
let _guck = c.join();
match c.join() {
Ok(Ok(())) => {
// The thread completed successfully and returned Ok
}
Ok(Err(e)) => {
// The thread completed but returned an error
eprintln!("Thread returned error: {:?}", e);
return Err(e); // Propagate the error
}
Err(e) => {
// The thread panicked
eprintln!("Thread panicked in do_edges: {:?}", e);
return Err(GraphLoaderError::from(
"Thread panicked in do_edges".to_string(),
));
}
}
}
Ok(())
}
Expand Down
72 changes: 50 additions & 22 deletions tests/graph_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,9 @@ async fn init_named_graph_loader_with_data() {
teardown().await;
}

fn get_attribute_position(field_names: &Vec<String>, attribute: &str) -> usize {
fn get_attribute_position_from_fields(field_names: &Vec<String>, attribute: &str) -> usize {
assert!(!field_names.is_empty());
assert!(field_names.contains(&attribute.to_string()));
field_names.iter().position(|x| x == attribute).unwrap()
}

Expand Down Expand Up @@ -300,13 +302,13 @@ async fn init_named_graph_loader_with_data_all_v_and_e_attributes_manually_set()
assert_eq!(vertex.len(), 3);
assert_eq!(vertex.len(), vertex_field_names.len());

let x = &vertex[get_attribute_position(vertex_field_names, "x")]
let x = &vertex[get_attribute_position_from_fields(vertex_field_names, "x")]
.as_u64()
.unwrap();
let y = &vertex[get_attribute_position(vertex_field_names, "y")]
let y = &vertex[get_attribute_position_from_fields(vertex_field_names, "y")]
.as_u64()
.unwrap();
let z = &vertex[get_attribute_position(vertex_field_names, "z")]
let z = &vertex[get_attribute_position_from_fields(vertex_field_names, "z")]
.as_u64()
.unwrap();
let expected_x_value = (v_index + 1) as u64;
Expand Down Expand Up @@ -358,13 +360,13 @@ async fn init_named_graph_loader_with_data_all_v_and_e_attributes_manually_set()
assert_eq!(edge.len(), 3);
assert_eq!(edge.len(), edge_field_names.len());

let x = &edge[get_attribute_position(edge_field_names, "x")]
let x = &edge[get_attribute_position_from_fields(edge_field_names, "x")]
.as_u64()
.unwrap();
let y = &edge[get_attribute_position(edge_field_names, "y")]
let y = &edge[get_attribute_position_from_fields(edge_field_names, "y")]
.as_u64()
.unwrap();
let z = &edge[get_attribute_position(edge_field_names, "z")]
let z = &edge[get_attribute_position_from_fields(edge_field_names, "z")]
.as_u64()
.unwrap();
let expected_x_value = (e_index + 1) as u64;
Expand Down Expand Up @@ -429,7 +431,7 @@ async fn init_named_graph_loader_with_data_all_v_and_e_collection_name_attribute
assert_eq!(vertex.len(), vertex_field_names.len());

let collection_name = &vertex
[get_attribute_position(vertex_field_names, "@collection_name")]
[get_attribute_position_from_fields(vertex_field_names, "@collection_name")]
.as_str()
.unwrap();
let expected_collection_name = VERTEX_COLLECTION;
Expand Down Expand Up @@ -472,11 +474,11 @@ async fn init_named_graph_loader_with_data_all_v_and_e_collection_name_attribute
}

for (_e_index, edge) in columns.iter().enumerate() {
assert_eq!(edge.len(), 3);
assert_eq!(edge.len(), edge_field_names.len());
assert_eq!(edge.len(), 1);
assert_eq!(edge_field_names.len(), 1);

let collection_name = &edge
[get_attribute_position(edge_field_names, "@collection_name")]
[get_attribute_position_from_fields(edge_field_names, "@collection_name")]
.as_str()
.unwrap();
let expected_collection_name = EDGE_COLLECTION;
Expand Down Expand Up @@ -533,15 +535,28 @@ async fn init_named_graph_loader_with_data_all_v_and_e_attributes_all_by_boolean
for (v_index, vertex_json_arr) in columns.iter().enumerate() {
assert_eq!(vertex_json_arr.len(), 1);
let vertex = &vertex_json_arr[0];
assert_eq!(3, vertex.as_object().unwrap().len());

let x = &vertex[get_attribute_position(vertex_field_names, "x")]
// x, y, y including _key and _rev
assert_eq!(5, vertex.as_object().unwrap().len());

let x = &vertex
.as_object()
.unwrap()
.get("x")
.unwrap()
.as_u64()
.unwrap();
let y = &vertex[get_attribute_position(vertex_field_names, "y")]
let y = &vertex
.as_object()
.unwrap()
.get("y")
.unwrap()
.as_u64()
.unwrap();
let z = &vertex[get_attribute_position(vertex_field_names, "z")]
let z = &vertex
.as_object()
.unwrap()
.get("z")
.unwrap()
.as_u64()
.unwrap();
let expected_x_value = (v_index + 1) as u64;
Expand Down Expand Up @@ -588,17 +603,30 @@ async fn init_named_graph_loader_with_data_all_v_and_e_attributes_all_by_boolean

for (e_index, edge_json_arr) in columns.iter().enumerate() {
assert_eq!(edge_json_arr.len(), 1);
assert_eq!(edge_json_arr.len(), edge_field_names.len());
assert_eq!(edge_field_names.len(), 0);
let edge = &edge_json_arr[0];
assert_eq!(3, edge.as_object().unwrap().len());

let x = &edge[get_attribute_position(edge_field_names, "x")]
assert_eq!(6, edge.as_object().unwrap().len());

// x, y, z and _id, _key, _rev
let x = &edge
.as_object()
.unwrap()
.get("x")
.unwrap()
.as_u64()
.unwrap();
let y = &edge[get_attribute_position(edge_field_names, "y")]
let y = &edge
.as_object()
.unwrap()
.get("y")
.unwrap()
.as_u64()
.unwrap();
let z = &edge[get_attribute_position(edge_field_names, "z")]
let z = &edge
.as_object()
.unwrap()
.get("z")
.unwrap()
.as_u64()
.unwrap();
let expected_x_value = (e_index + 1) as u64;
Expand Down

0 comments on commit fd074de

Please sign in to comment.