feat(client/handler): reimplement client & response handlers
sjrusso8 committed Mar 24, 2024
1 parent 2b8995f commit e1447b6
Showing 8 changed files with 481 additions and 450 deletions.
src/catalog.rs: 60 changes (16 additions, 44 deletions)
@@ -27,11 +27,7 @@ impl Catalog {
 
         let plan = LogicalPlanBuilder::from(rel_type).clone().build_plan_root();
 
-        self.spark_session
-            .clone()
-            .consume_plan_and_fetch(Some(plan))
-            .await
-            .unwrap()
+        self.spark_session.client.to_first_value(plan).await
     }
 
     /// Returns the current default database in this session
@@ -45,16 +41,12 @@ impl Catalog {
 
         let plan = LogicalPlanBuilder::from(rel_type).clone().build_plan_root();
 
-        self.spark_session
-            .clone()
-            .consume_plan_and_fetch(Some(plan))
-            .await
-            .unwrap()
+        self.spark_session.client.to_first_value(plan).await
     }
 
     /// Returns a list of catalogs in this session
     #[allow(non_snake_case)]
-    pub async fn listCatalogs(&mut self, pattern: Option<&str>) -> Vec<RecordBatch> {
+    pub async fn listCatalogs(&mut self, pattern: Option<&str>) -> RecordBatch {
         let pattern = pattern.map(|val| val.to_owned());
 
         let cat_type = Some(spark::catalog::CatType::ListCatalogs(spark::ListCatalogs {
@@ -65,16 +57,12 @@ impl Catalog {
 
         let plan = LogicalPlanBuilder::from(rel_type).clone().build_plan_root();
 
-        self.spark_session
-            .clone()
-            .consume_plan(Some(plan))
-            .await
-            .unwrap()
+        self.spark_session.client.to_arrow(plan).await.unwrap()
     }
 
     /// Returns a list of databases in this session
     #[allow(non_snake_case)]
-    pub async fn listDatabases(&mut self, pattern: Option<&str>) -> Vec<RecordBatch> {
+    pub async fn listDatabases(&mut self, pattern: Option<&str>) -> RecordBatch {
         let pattern = pattern.map(|val| val.to_owned());
 
         let cat_type = Some(spark::catalog::CatType::ListDatabases(
@@ -85,20 +73,12 @@ impl Catalog {
 
         let plan = LogicalPlanBuilder::from(rel_type).clone().build_plan_root();
 
-        self.spark_session
-            .clone()
-            .consume_plan(Some(plan))
-            .await
-            .unwrap()
+        self.spark_session.client.to_arrow(plan).await.unwrap()
     }
 
     /// Returns a list of tables/views in the specific database
     #[allow(non_snake_case)]
-    pub async fn listTables(
-        &mut self,
-        dbName: Option<&str>,
-        pattern: Option<&str>,
-    ) -> Vec<RecordBatch> {
+    pub async fn listTables(&mut self, dbName: Option<&str>, pattern: Option<&str>) -> RecordBatch {
         let cat_type = Some(spark::catalog::CatType::ListTables(spark::ListTables {
             db_name: dbName.map(|db| db.to_owned()),
             pattern: pattern.map(|val| val.to_owned()),
@@ -108,16 +88,12 @@ impl Catalog {
 
         let plan = LogicalPlanBuilder::from(rel_type).clone().build_plan_root();
 
-        self.spark_session
-            .clone()
-            .consume_plan(Some(plan))
-            .await
-            .unwrap()
+        self.spark_session.client.to_arrow(plan).await.unwrap()
     }
 
     /// Returns a list of columns for the given tables/views in the specific database
     #[allow(non_snake_case)]
-    pub async fn listColumns(&mut self, tableName: &str, dbName: Option<&str>) -> Vec<RecordBatch> {
+    pub async fn listColumns(&mut self, tableName: &str, dbName: Option<&str>) -> RecordBatch {
         let cat_type = Some(spark::catalog::CatType::ListColumns(spark::ListColumns {
             table_name: tableName.to_owned(),
             db_name: dbName.map(|val| val.to_owned()),
@@ -127,11 +103,7 @@ impl Catalog {
 
         let plan = LogicalPlanBuilder::from(rel_type).clone().build_plan_root();
 
-        self.spark_session
-            .clone()
-            .consume_plan(Some(plan))
-            .await
-            .unwrap()
+        self.spark_session.client.to_arrow(plan).await.unwrap()
     }
 }
 
@@ -177,8 +149,8 @@ mod tests {
 
         let value = spark.catalog().listCatalogs(None).await;
 
-        assert_eq!(2, value[0].num_columns());
-        assert_eq!(1, value[0].num_rows());
+        assert_eq!(2, value.num_columns());
+        assert_eq!(1, value.num_rows());
     }
 
     #[tokio::test]
@@ -193,8 +165,8 @@ mod tests {
 
         let value = spark.catalog().listDatabases(None).await;
 
-        assert_eq!(4, value[0].num_columns());
-        assert_eq!(2, value[0].num_rows());
+        assert_eq!(4, value.num_columns());
+        assert_eq!(2, value.num_rows());
     }
 
     #[tokio::test]
@@ -209,7 +181,7 @@ mod tests {
 
         let value = spark.catalog().listDatabases(Some("*rust")).await;
 
-        assert_eq!(4, value[0].num_columns());
-        assert_eq!(1, value[0].num_rows());
+        assert_eq!(4, value.num_columns());
+        assert_eq!(1, value.num_rows());
     }
 }
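
Note on the new API surface: after this commit the catalog listing methods resolve to a single arrow RecordBatch (fetched through the session's client handler via to_arrow / to_first_value) instead of a Vec<RecordBatch>, so callers read num_columns() and num_rows() off the batch directly, as the updated tests above do. A minimal caller-side sketch follows; the SparkSessionBuilder connection flow and the sc:// URL are assumptions drawn from the crate's examples, not from this diff, and may differ in signature:

use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connection string is illustrative; point it at your Spark Connect server.
    let mut spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    // listDatabases now yields one RecordBatch rather than Vec<RecordBatch>.
    let databases = spark.catalog().listDatabases(None).await;

    println!(
        "databases: {} columns x {} rows",
        databases.num_columns(),
        databases.num_rows()
    );

    Ok(())
}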