feat(dataframe): implement core traits (#6)
* feat(dataframe): implement core traits
- implement more basic traits for DataFrame
- change the interface to leverage enums where possible (JoinType, SaveMode, StorageLevel; a usage sketch follows this message)
- create storage.rs with an enum wrapper around spark::StorageLevel
- update tests
- update docker-compose.yml with delta-spark

* update docs and examples

* update readme
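
As a quick illustration of the enum-based interface, here is a minimal sketch that combines the CSV reader pattern from examples/delta.rs with the new SaveMode enum used in examples/writer.rs (both shown further down). The /tmp/people_csv output path is an assumption chosen for illustration and is not part of this commit.

use spark_connect_rs::dataframe::SaveMode;
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the Spark Connect server started by docker-compose.yml
    let spark: SparkSession = SparkSessionBuilder::default().build().await?;

    // Same CSV that examples/delta.rs reads from the Spark image
    let paths = vec!["/opt/spark/examples/src/main/resources/people.csv".to_string()];

    let df = spark
        .clone()
        .read()
        .format("csv")
        .option("header", "True")
        .option("delimiter", ";")
        .load(paths);

    // mode() now takes the SaveMode enum introduced by this commit
    df.write()
        .format("csv")
        .mode(SaveMode::Overwrite)
        .option("header", "true")
        .save("/tmp/people_csv") // assumed output path, for illustration only
        .await?;

    Ok(())
}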
sjrusso8 authored Mar 16, 2024
1 parent 13e463b commit 5c85ffb
Showing 11 changed files with 1,608 additions and 327 deletions.
166 changes: 131 additions & 35 deletions README.md

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions docker-compose.yml
@@ -2,12 +2,12 @@ version: '3.7'
 services:
   spark:
     image: "apache/spark:3.5.0-scala2.12-java11-r-ubuntu"
-    command:
-      - "/opt/spark/sbin/start-connect-server.sh"
-      - "--conf"
-      - "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp"
-      - "--packages"
-      - "org.apache.spark:spark-connect_2.12:3.5.0"
+    command: >
+      /opt/spark/sbin/start-connect-server.sh
+      --packages "org.apache.spark:spark-connect_2.12:3.5.0,io.delta:delta-spark_2.12:3.0.0"
+      --conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp"
+      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
+      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
     environment:
       - SPARK_NO_DAEMONIZE=true
     ports:
67 changes: 67 additions & 0 deletions examples/delta.rs
@@ -0,0 +1,67 @@
// This example demonstrates creating a Spark DataFrame from a CSV with read options.
// The resulting dataframe is saved in the `delta` format as a `managed` table,
// and `spark.sql` queries are run against the delta table.
//
// The remote spark session must have the spark package `io.delta:delta-spark_2.12:{DELTA_VERSION}` enabled,
// where `DELTA_VERSION` is the desired Delta Lake version.

use spark_connect_rs::{SparkSession, SparkSessionBuilder};

use spark_connect_rs::dataframe::SaveMode;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut spark: SparkSession = SparkSessionBuilder::default().build().await?;

let paths = vec!["/opt/spark/examples/src/main/resources/people.csv".to_string()];

let df = spark
.clone()
.read()
.format("csv")
.option("header", "True")
.option("delimiter", ";")
.option("inferSchema", "True")
.load(paths);

df.write()
.format("delta")
.mode(SaveMode::Overwrite)
.saveAsTable("default.people_delta")
.await
.unwrap();

spark
.sql("DESCRIBE HISTORY default.people_delta")
.await?
.show(Some(1), None, Some(true))
.await
.unwrap();

// print results
// +-------------------------------------------------------------------------------------------------------+
// | show_string |
// +-------------------------------------------------------------------------------------------------------+
// | -RECORD 0-------------------------------------------------------------------------------------------- |
// | version | 3 |
// | timestamp | 2024-03-16 13:46:23.552 |
// | userId | NULL |
// | userName | NULL |
// | operation | CREATE OR REPLACE TABLE AS SELECT |
// | operationParameters | {isManaged -> true, description -> NULL, partitionBy -> [], properties -> {}} |
// | job | NULL |
// | notebook | NULL |
// | clusterId | NULL |
// | readVersion | 2 |
// | isolationLevel | Serializable |
// | isBlindAppend | false |
// | operationMetrics | {numFiles -> 1, numOutputRows -> 2, numOutputBytes -> 988} |
// | userMetadata | NULL |
// | engineInfo | Apache-Spark/3.5.0 Delta-Lake/3.0.0 |
// | only showing top 1 row |
// | |
// +-------------------------------------------------------------------------------------------------------+

Ok(())
}
3 changes: 3 additions & 0 deletions examples/writer.rs
@@ -4,6 +4,8 @@ use spark_connect_rs::{SparkSession, SparkSessionBuilder};

use spark_connect_rs::functions::col;

+use spark_connect_rs::dataframe::SaveMode;
+
// This example demonstrates creating a Spark DataFrame from range()
// alias the column name, writing the results to a CSV
// then reading the csv file back
@@ -20,6 +22,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

df.write()
.format("csv")
+.mode(SaveMode::Overwrite)
.option("header", "true")
.save(path)
.await?;
10 changes: 9 additions & 1 deletion src/column.rs
@@ -253,7 +253,7 @@ impl Column {
///
/// # Arguments:
///
-/// * `cols` a value that implements the [ToLiteralExpr] trait
+/// * `cols`: a column reference that is translated into a [spark::Expression]
///
/// # Example:
/// ```rust
@@ -310,6 +310,14 @@
invoke_func("rlike", vec![self.clone(), value])
}

+/// Equality comparison. The `==` operator cannot be overloaded to return anything
+/// other than a `bool`, so an explicit method is provided instead
+pub fn eq<T: ToLiteralExpr>(&self, other: T) -> Column {
+    let value = lit(other);
+
+    invoke_func("==", vec![self.clone(), value])
+}
+
/// A filter expression that evaluates to true if the expression is null
#[allow(non_snake_case)]
pub fn isNull(&self) -> Column {
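
A minimal usage sketch of the new Column::eq shown above, assuming col is importable from spark_connect_rs::functions (as in examples/writer.rs) and that plain integer literals implement ToLiteralExpr (an assumption here):

use spark_connect_rs::functions::col;

fn main() {
    // `eq` builds a Column expression for `age == 30` rather than returning a bool,
    // so the comparison can be sent to Spark as part of a query plan.
    let _is_thirty = col("age").eq(30);
}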