feat(dataframe): additional methods & updated docs
sjrusso8 committed Sep 20, 2023
1 parent d379b09 commit 85f4671
Showing 7 changed files with 309 additions and 14 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock


3 changes: 2 additions & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "spark-connect-rs"
version = "0.0.1-alpha"
version = "0.0.1-alpha.1"
edition = "2021"
license = "Apache-2.0"
description = "Apache Spark Connect Client for Rust"
@@ -32,6 +32,7 @@ prost-types = "0.12.0"
uuid = { version = "1.4.1", features = ["v4"] }
url = "2.2"


[build-dependencies]
tonic-build = "0.10.0"

37 changes: 34 additions & 3 deletions README.md
@@ -33,6 +33,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut df = spark.sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`");

df.filter("salary > 3000").show(Some(5), None, None).await?;

Ok(())
}
```

@@ -50,7 +52,10 @@ cargo build && cargo test
## Features

The following section outlines some of the implemented functions that
work with the Spark Connect session.

- ![open] Better error handling
- ![open] Reduce the use of `clone()`

### SparkSession

@@ -71,15 +76,41 @@ are working with the Spark Connect session
| select | ![done] | |
| selectExpr | ![done] | Does not include the new Spark Connect 3.5 feature with "position arguments" |
| filter | ![done] | |
| limit | ![done] | |
| dropDuplicates | ![done] | |
| withColumnsRenamed | ![done] | |
| drop | ![done] | |
| sample | ![done] | |
| repartition | ![done] | |
| offset | ![done] | |
| schema | ![done] | The output needs to be handled better |
| explain | ![done] | The output needs to be handled better |
| show | ![done] | |
| tail | ![done] | |
| collect | ![done] | |
| withColumns | ![open] | |
| sort | ![open] | |
| groupBy | ![open] | |
| createTempView | ![open] | There is an error right now, and the functions are private till it's fixed |
| many more! | ![open] | |
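
A short sketch of a few of the newly added methods chained together (a hedged example: `df` comes from `spark.sql` as in the quickstart above, and the column name is hypothetical):

```rust
// Deduplicate on the `name` column, skip the first 5 rows, then display 10.
// Signatures follow src/dataframe/dataframe.rs from this commit.
df.drop_duplicates(Some(vec!["name".to_string()]))
    .offset(5)
    .show(Some(10), None, None)
    .await?;
```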

### DataFrameWriter

Spark Connect *should* respect the format as long as your cluster supports the specified
type and has the required JARs.

| DataFrame | API | Comment |
|-----------------|---------|------------------------------------------------------------------------------|
| format | ![done] | |
| option | ![done] | |
| options | ![done] | |
| mode | ![done] | |
| bucketBy | ![done] | |
| sortBy | ![done] | |
| partitionBy | ![done] | |
| save | ![done] | |
| saveAsTable | ![done] | |
| insertInto | ![done] | |
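
A hedged sketch of the writer flow (the `write()` entry point and the exact argument types of `mode` and `save` are assumptions based on the table above):

```rust
// Write out as parquet, overwriting any existing data at the target path.
// Method names mirror the table; check the source for exact signatures.
df.write()
    .format("parquet")
    .mode("overwrite")
    .option("compression", "snappy")
    .save("/tmp/employees_parquet")
    .await?;
```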

[open]: https://cdn.jsdelivr.net/gh/Readme-Workflows/Readme-Icons@main/icons/octicons/IssueNeutral.svg
[done]: https://cdn.jsdelivr.net/gh/Readme-Workflows/Readme-Icons@main/icons/octicons/ApprovedChanges.svg
214 changes: 207 additions & 7 deletions src/dataframe/dataframe.rs
@@ -66,7 +66,7 @@ impl DataFrame {

let rel_type = RelType::Project(Box::new(spark::Project {
expressions,
input: self.logical_plan.clone().relation_input(),
}));

let logical_plan = self.logical_plan.from(rel_type);
@@ -99,7 +99,7 @@ impl DataFrame {

let rel_type = RelType::Project(Box::new(spark::Project {
expressions,
input: self.logical_plan.clone().relation_input(),
}));

let logical_plan = self.logical_plan.from(rel_type);
@@ -121,11 +121,12 @@ impl DataFrame {
});

let rel_type = RelType::Filter(Box::new(spark::Filter {
input: self.logical_plan.clone().relation_input(),
condition: Some(Expression {
expr_type: Some(filter_expr),
}),
}));

let logical_plan = self.logical_plan.from(rel_type);

DataFrame::new(self.spark_session.clone(), logical_plan)
@@ -141,7 +142,7 @@ impl DataFrame {
/// ```
pub fn limit(&mut self, limit: i32) -> DataFrame {
let limit_expr = RelType::Limit(Box::new(spark::Limit {
input: self.logical_plan.clone().relation_input(),
limit,
}));

@@ -150,6 +151,90 @@ impl DataFrame {
DataFrame::new(self.spark_session.clone(), logical_plan)
}

/// Return a new [DataFrame] with duplicate rows removed,
/// optionally only considering certain columns from a `Vec<String>`
///
/// If no columns are supplied then all columns are used
///
/// Alias for `dropDuplicates`
///
pub fn drop_duplicates(&mut self, cols: Option<Vec<String>>) -> DataFrame {
let drop_expr = match cols {
Some(cols) => RelType::Deduplicate(Box::new(spark::Deduplicate {
input: self.logical_plan.clone().relation_input(),
column_names: cols,
all_columns_as_keys: Some(false),
within_watermark: Some(false),
})),
None => RelType::Deduplicate(Box::new(spark::Deduplicate {
input: self.logical_plan.clone().relation_input(),
column_names: vec![],
all_columns_as_keys: Some(true),
within_watermark: Some(false),
})),
};

let logical_plan = self.logical_plan.from(drop_expr);

DataFrame::new(self.spark_session.clone(), logical_plan)
}
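
// Usage sketch (hypothetical column name; assumes a mutable `df: DataFrame`
// in an async context):
//
//     let mut deduped = df.drop_duplicates(Some(vec!["name".to_string()]));
//     deduped.show(Some(10), None, None).await?;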

#[allow(non_snake_case)]
pub fn dropDuplicates(&mut self, cols: Option<Vec<String>>) -> DataFrame {
self.drop_duplicates(cols)
}

/// Returns a new [DataFrame] by renaming multiple columns from a
/// `HashMap<String, String>` containing the `existing` as the key
/// and the `new` as the value.
///
#[allow(non_snake_case)]
pub fn withColumnsRenamed(&mut self, cols: HashMap<String, String>) -> DataFrame {
let rename_expr = RelType::WithColumnsRenamed(Box::new(spark::WithColumnsRenamed {
input: self.logical_plan.clone().relation_input(),
rename_columns_map: cols,
}));

let logical_plan = self.logical_plan.from(rename_expr);

DataFrame::new(self.spark_session.clone(), logical_plan)
}
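
// Usage sketch (hypothetical names): rename the column `name` to `employee_name`.
//
//     use std::collections::HashMap;
//
//     let mut renames = HashMap::new();
//     renames.insert("name".to_string(), "employee_name".to_string());
//     let renamed = df.withColumnsRenamed(renames);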

/// Returns a new [DataFrame] without the specified columns
pub fn drop(&mut self, cols: Vec<String>) -> DataFrame {
let drop_expr = RelType::Drop(Box::new(spark::Drop {
input: self.logical_plan.clone().relation_input(),
columns: vec![],
column_names: cols,
}));

let logical_plan = self.logical_plan.from(drop_expr);

DataFrame::new(self.spark_session.clone(), logical_plan)
}

/// Returns a sampled subset of this [DataFrame]
pub fn sample(
&mut self,
lower_bound: f64,
upper_bound: f64,
with_replacement: Option<bool>,
seed: Option<i64>,
) -> DataFrame {
let sample_expr = RelType::Sample(Box::new(spark::Sample {
input: self.logical_plan.clone().relation_input(),
lower_bound,
upper_bound,
with_replacement,
seed,
deterministic_order: false,
}));

let logical_plan = self.logical_plan.from(sample_expr);

DataFrame::new(self.spark_session.clone(), logical_plan)
}
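
// Usage sketch (hypothetical values): sample roughly 10% of rows without
// replacement, with a fixed seed for repeatability.
//
//     let sampled = df.sample(0.0, 0.1, Some(false), Some(42));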

/// Returns a new [DataFrame] partitioned by the given partition number and shuffle
/// option
///
@@ -160,7 +245,7 @@ impl DataFrame {
///
pub fn repartition(&mut self, num_partitions: i32, shuffle: Option<bool>) -> DataFrame {
let repart_expr = RelType::Repartition(Box::new(spark::Repartition {
input: self.logical_plan.clone().relation_input(),
num_partitions,
shuffle,
}));
@@ -170,6 +255,71 @@ impl DataFrame {
DataFrame::new(self.spark_session.clone(), logical_plan)
}

/// Returns a new [DataFrame] by skipping the first n rows
pub fn offset(&mut self, num: i32) -> DataFrame {
let offset_expr = RelType::Offset(Box::new(spark::Offset {
input: self.logical_plan.clone().relation_input(),
offset: num,
}));

let logical_plan = self.logical_plan.from(offset_expr);

DataFrame::new(self.spark_session.clone(), logical_plan)
}

/// Returns the schema of this [DataFrame] as a [spark::analyze_plan_response::Schema]
pub async fn schema(&mut self) -> spark::analyze_plan_response::Schema {
let analyze = Some(spark::analyze_plan_request::Analyze::Schema(
spark::analyze_plan_request::Schema {
plan: Some(self.logical_plan.clone().build_plan_root()),
},
));

let schema = self.spark_session.analyze_plan(analyze).await;

match schema {
spark::analyze_plan_response::Result::Schema(schema) => schema,
_ => panic!("Unexpected result"),
}
}
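
// Usage sketch: prost-generated messages derive `Debug`, so the returned
// schema can be printed directly (assumes an async context).
//
//     let schema = df.schema().await;
//     println!("{:?}", schema);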

/// Prints the plans to the console
///
/// # Arguments:
/// * `mode`: &str. Defaults to `unspecified`
/// - `simple`
/// - `extended`
/// - `codegen`
/// - `cost`
/// - `formatted`
/// - `unspecified`
///
pub async fn explain(&mut self, mode: &str) -> spark::analyze_plan_response::Explain {
let explain_mode = match mode {
"simple" => spark::analyze_plan_request::explain::ExplainMode::Simple,
"extended" => spark::analyze_plan_request::explain::ExplainMode::Extended,
"codegen" => spark::analyze_plan_request::explain::ExplainMode::Codegen,
"cost" => spark::analyze_plan_request::explain::ExplainMode::Cost,
"formatted" => spark::analyze_plan_request::explain::ExplainMode::Formatted,
_ => spark::analyze_plan_request::explain::ExplainMode::Unspecified,
};

let analyze = Some(spark::analyze_plan_request::Analyze::Explain(
spark::analyze_plan_request::Explain {
plan: Some(self.logical_plan.clone().build_plan_root()),
explain_mode: explain_mode.into(),
},
));

let result = self.spark_session.analyze_plan(analyze).await;

match result {
spark::analyze_plan_response::Result::Explain(explain) => explain,
_ => panic!("Unexpected result"),
}
}
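
// Usage sketch: the returned message carries the plan text in its
// `explain_string` field (per the Spark Connect protobuf definition).
//
//     let explained = df.explain("extended").await;
//     println!("{}", explained.explain_string);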

#[allow(non_snake_case, dead_code)]
async fn createTempView(&mut self, name: &str) {
self.create_view_cmd(name.to_string(), false, false)
@@ -234,7 +384,7 @@ impl DataFrame {
vertical: Option<bool>,
) -> Result<(), ArrowError> {
let show_expr = RelType::ShowString(Box::new(spark::ShowString {
input: self.logical_plan.clone().relation_input(),
num_rows: num_rows.unwrap_or(10),
truncate: truncate.unwrap_or(0),
vertical: vertical.unwrap_or(false),
@@ -254,7 +404,7 @@ impl DataFrame {
///
pub async fn tail(&mut self, limit: i32) -> Result<Vec<RecordBatch>, ArrowError> {
let limit_expr = RelType::Tail(Box::new(spark::Tail {
input: self.logical_plan.clone().relation_input(),
limit,
}));

@@ -519,4 +669,54 @@ impl DataFrameWriter {

Ok(())
}

async fn save_table(&mut self, table_name: &str, save_method: i32) -> Result<(), ArrowError> {
let write_command = spark::command::CommandType::WriteOperation(spark::WriteOperation {
input: Some(self.dataframe.logical_plan.relation.clone()),
source: self.format.clone(),
mode: self.mode.into(),
sort_column_names: self.sort_by.clone(),
partitioning_columns: self.partition_by.clone(),
bucket_by: self.bucket_by.clone(),
options: self.write_options.clone(),
save_type: Some(spark::write_operation::SaveType::Table(
spark::write_operation::SaveTable {
table_name: table_name.to_string(),
save_method,
},
)),
});

let plan = self
.dataframe
.logical_plan
.clone()
.build_plan_cmd(write_command);

self.dataframe
.spark_session
.consume_plan(Some(plan))
.await
.unwrap();

Ok(())
}

/// Saves the content of the [DataFrame] as the specified table.
#[allow(non_snake_case)]
pub async fn saveAsTable(&mut self, table_name: &str) -> Result<(), ArrowError> {
self.save_table(table_name, 1).await
}

/// Inserts the content of the [DataFrame] to the specified table.
///
/// It requires that the schema of the [DataFrame] is the same as the
/// schema of the target table.
///
/// Unlike `saveAsTable()`, this method ignores the column names and just uses
/// position-based resolution.
#[allow(non_snake_case)]
pub async fn insertInto(&mut self, table_name: &str) -> Result<(), ArrowError> {
self.save_table(table_name, 2).await
}
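
// Usage sketch for the table writers (hypothetical table name; assumes the
// writer is obtained from a [DataFrame] via this crate's builder pattern):
//
//     df.write().saveAsTable("employees_copy").await?;
//     df.write().insertInto("employees_copy").await?;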
}