
feat: columns & dtypes
sjrusso8 committed Mar 12, 2024
1 parent 8bc8b54 commit 8b4c4d4
Showing 3 changed files with 67 additions and 2 deletions.
6 changes: 4 additions & 2 deletions README.md
@@ -87,6 +87,8 @@ is not yet working with this Spark Connect implementation.
| sample | ![done] | |
| repartition | ![done] | |
| offset | ![done] | |
| dtypes | ![done] | |
| columns | ![done] | |
| schema | ![done] | The output needs to be handled better |
| explain | ![done] | The output needs to be handled better |
| show | ![done] | |
@@ -122,7 +124,7 @@ required jars
Spark [Column](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/column.html) type object and its implemented traits


- | DataFrame | API | Comment |
+ | Column | API | Comment |
|------------------|---------|------------------------------------------------------------------------------|
| alias | ![done] | |
| asc | ![done] | |
@@ -162,7 +164,7 @@ Spark [Column](https://spark.apache.org/docs/latest/api/python/reference/pyspark

Only a few of the functions are covered by unit tests.

- | DataFrame | API | Comment |
+ | Functions | API | Comment |
|-----------------------------|---------|---------|
| abs | ![done] | |
| acos | ![open] | |
39 changes: 39 additions & 0 deletions src/dataframe.rs
@@ -197,6 +197,45 @@ impl DataFrame {
}
}

/// Retrieves the names of all columns in the DataFrame as a `Vec<String>`.
/// The order of the column names in the list reflects their order in the DataFrame.
pub async fn columns(&mut self) -> Vec<String> {
let schema = self.schema().await.schema.unwrap();

let struct_val = schema.kind.unwrap();

match struct_val {
spark::data_type::Kind::Struct(val) => val
.fields
.iter()
.map(|field| field.name.to_string())
.collect(),
_ => unimplemented!("Unexpected schema response"),
}
}

/// Returns all column names and their data types as a `Vec` of tuples, each holding
/// the field name as a `String` and the optional [spark::data_type::Kind] enum
pub async fn dtypes(&mut self) -> Vec<(String, Option<spark::data_type::Kind>)> {
let schema = self.schema().await.schema.unwrap();

let struct_val = schema.kind.unwrap();

match struct_val {
spark::data_type::Kind::Struct(val) => val
.fields
.iter()
.map(|field| {
(
field.name.to_string(),
field.data_type.clone().unwrap().kind,
)
})
.collect(),
_ => unimplemented!("Unexpected schema response"),
}
}

/// Prints the [spark::Plan] to the console
///
/// # Arguments:
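Both `columns` and `dtypes` above follow the same pattern: unwrap the schema's `kind`, match on the `Struct` variant, and map over its fields. A minimal standalone sketch of that pattern, using simplified stand-ins for the generated `spark::data_type` protobuf types (the real `Kind` and struct messages have many more variants and fields):

```rust
// Simplified stand-ins for the prost-generated `spark::data_type` types.
#[derive(Clone, Debug, PartialEq)]
enum Kind {
    Struct(StructType),
    String,
    Long,
}

#[derive(Clone, Debug, PartialEq)]
struct StructType {
    fields: Vec<Field>,
}

#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: String,
    data_type: Kind,
}

/// Collect just the field names, as `DataFrame::columns` does.
fn columns(schema: &Kind) -> Vec<String> {
    match schema {
        Kind::Struct(s) => s.fields.iter().map(|f| f.name.clone()).collect(),
        _ => unimplemented!("Unexpected schema response"),
    }
}

/// Pair each field name with its type, as `DataFrame::dtypes` does.
fn dtypes(schema: &Kind) -> Vec<(String, Kind)> {
    match schema {
        Kind::Struct(s) => s
            .fields
            .iter()
            .map(|f| (f.name.clone(), f.data_type.clone()))
            .collect(),
        _ => unimplemented!("Unexpected schema response"),
    }
}

fn main() {
    let schema = Kind::Struct(StructType {
        fields: vec![
            Field { name: "name".into(), data_type: Kind::String },
            Field { name: "age".into(), data_type: Kind::Long },
        ],
    });
    assert_eq!(columns(&schema), vec!["name", "age"]);
    assert_eq!(dtypes(&schema)[0], ("name".to_string(), Kind::String));
}
```

In the actual implementation the non-`Struct` arm should be unreachable, since Spark always returns a top-level struct schema; `unimplemented!` makes that assumption explicit rather than silently returning an empty list.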
24 changes: 24 additions & 0 deletions src/lib.rs
@@ -237,4 +237,28 @@ mod tests {

assert_eq!(total, 1000)
}

#[tokio::test]
async fn test_dataframe_columns() {
let spark = setup().await;

let paths = vec!["/opt/spark/examples/src/main/resources/people.csv".to_string()];

let cols = spark
.read()
.format("csv")
.option("header", "True")
.option("delimiter", ";")
.load(paths)
.columns()
.await;

let expected = vec![
String::from("name"),
String::from("age"),
String::from("job"),
];

assert_eq!(cols, expected)
}
}
