diff --git a/.Rbuildignore b/.Rbuildignore index 91114bf..cbdc128 100644 --- a/.Rbuildignore +++ b/.Rbuildignore @@ -1,2 +1,8 @@ +^requirements\.txt$ +^renv$ +^renv\.lock$ ^.*\.Rproj$ ^\.Rproj\.user$ +^LICENSE\.md$ +^doc$ +^Meta$ diff --git a/.circleci/config.yml b/.circleci/config.yml deleted file mode 100644 index fb25e1f..0000000 --- a/.circleci/config.yml +++ /dev/null @@ -1,53 +0,0 @@ -version: 2 -jobs: - build: - docker: - - image: continuumio/miniconda3 - steps: - - run: mkdir -p ~/.ssh/ && ssh-keyscan github.com > ~/.ssh/known_hosts 2>/dev/null - - run: conda install conda-build CacheControl lockfile - - run: conda skeleton cran $CIRCLE_REPOSITORY_URL --git-tag $CIRCLE_SHA1 - - run: | - for channel in r conda-forge bioconda moj-analytical-services; do - conda config --add channels "$channel" - done - - run: conda build r-$CIRCLE_PROJECT_REPONAME --R 3.5.1 - - persist_to_workspace: - root: /opt/conda/conda-bld/linux-64/ - paths: - - "*.tar.bz2" - publish: - docker: - - image: continuumio/miniconda3 - steps: - - attach_workspace: - at: /opt/conda/conda-bld/linux-64/ - - run: conda install anaconda-client - - run: - name: "Publish to Conda" - command: anaconda -t $CONDA_UPLOAD_TOKEN upload -u moj-analytical-services /opt/conda/conda-bld/linux-64/*.tar.bz2 - -workflows: - version: 2 - build-only: - jobs: - - build: - filters: - tags: - ignore: /^v.*/ - build-and-publish: - jobs: - - build: - filters: - tags: - only: /^v.*/ - branches: - ignore: /.*/ - - publish: - filters: - tags: - only: /^v.*/ - branches: - ignore: /.*/ - requires: - - build diff --git a/.gitignore b/.gitignore index 5b6a065..1f0272a 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,6 @@ .Rhistory .RData .Ruserdata +renv +.Rprofile +/Meta/ diff --git a/DESCRIPTION b/DESCRIPTION index 701792d..da8ae4f 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,20 +1,20 @@ Package: dbtools Type: Package Title: Uses R wrapper function to send queries to athena. -Version: 2.0.3 +Version: 3.0.0 Author: Karik Isichei -Maintainer: The package maintainer Description: See title. -License: What license is it under? +License: MIT + file LICENSE Encoding: UTF-8 LazyData: true -RoxygenNote: 6.0.1 +RoxygenNote: 7.1.1 Imports: - reticulate (>= 1.10), - s3tools, - readr + magrittr, + reticulate, + arrow Suggests: - data.table (>= 1.11.8) -Remotes: - moj-analytical-services/s3tools - + knitr, + data.table (>= 1.11.8), + rmarkdown, + tibble +VignetteBuilder: knitr diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..6f9d947 --- /dev/null +++ b/LICENSE @@ -0,0 +1,2 @@ +YEAR: 2022 +COPYRIGHT HOLDER: Ministry of Justice diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..b20a52f --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,21 @@ +# MIT License + +Copyright (c) 2022 Ministry of Justice + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. 
+ +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/NAMESPACE b/NAMESPACE index 87c0573..12be57e 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -1,7 +1,21 @@ # Generated by roxygen2: do not edit by hand -export(get_athena_query_response) +export("%>%") +export(create_temp_table) +export(delete_database_and_data) +export(delete_partitions_and_data) +export(delete_table_and_data) +export(describe_table) +export(get_query_columns_types) +export(get_query_execution) +export(get_sql_from_file) export(read_sql) -import(readr) -import(reticulate) -import(s3tools) +export(read_sql_query) +export(render_sql_template) +export(repair_table) +export(show_create_table) +export(start_query_execution) +export(start_query_execution_and_wait) +export(stop_query_execution) +export(wait_query) +importFrom(magrittr,"%>%") diff --git a/R/dbtools.R b/R/dbtools.R new file mode 100644 index 0000000..7337dc0 --- /dev/null +++ b/R/dbtools.R @@ -0,0 +1,22 @@ +#' dbtools: A package for accessing AWS Athena from the Analytical Platform. +#' +#' @section About: +#' The dbtools package is used to run SQL queries configured for the +#' Analytical Platform. This package is a reticulated +#' wrapper around the Python library pydbtools +#' which uses AWS Wrangler's Athena module but adds additional functionality +#' (like Jinja templating, creating temporary tables) and alters some configuration +#' to our specification. +#' +#' Alternatively you might want to use +#' Rdbtools, which has the +#' advantages of being R-native, so no messing with `reticulate` and Python, and +#' supporting `dbplyr`. Please note the caveat about support, though. +#' +#' @seealso \url{https://github.com/moj-analytical-services/pydbtools} +#' @seealso \url{https://github.com/moj-analytical-services/Rdbtools} +#' +#' @docType package +#' @name dbtools +NULL +#> NULL diff --git a/R/get_athena_query_response.R b/R/get_athena_query_response.R deleted file mode 100644 index 6547ac1..0000000 --- a/R/get_athena_query_response.R +++ /dev/null @@ -1,40 +0,0 @@ -#' get_athena_query_response -#' -#'@description uses boto3 (in python) to send an sql query to athena and return the resulting data's path in s3 and meta data -#' -#'@import reticulate s3tools -#' -#'@export -#' -#'@details Will send an SQL query to athena and wait for it to complete. Once the query has completed the funtion will return a list containing the s3 path to your athena query and meta data about the output data -#' -#'@param sql_query A string specifying the SQL query you want to send to athena. See packages github readme for info on the flavour of SQL Athena uses. -#' -#'@param return_athena_types Specifies if the list describing the data's meta data types should be defined using athena datatypes (TRUE) or using the data engineering team's generic metadata types (FALSE). If not specified the default value of this input parameter is set to FALSE. -#' -#'@param timeout Specifies How long you want your sql query to wait before it gives up (in seconds). Default parameter is NULL which will mean SQL query will not timeout and could wait forever if an issue occured. 
-#'
-#'@return A list with two keys. [1] s3_path : a string pointing to the s3 path of the athena query. [2] meta : a list that has the name and type of each column in of the data in the s3_path. Can be used to get correct data types of your output when read in to R (note the order of the columns matches the order they appear in the data).
-#'
-#'@examples
-#'# Read an sql query using readr::read_csv
-#'response <- dbtools::get_athena_query_response("SELECT * from crest_v1.flatfile limit 10000")
-#'
-#'# print out path to athena query output (as a csv)
-#'print(response$s3_path)
-#'
-#'# print out meta data
-#'print(response$meta)
-#'
-#'# Read in data using whatever csv reader you want
-#'s3_path_stripped = gsub("s3://", "", response$s3_path)
-#'df <- s3tools::read_using(FUN = read.csv, s3_path=s3_path_stripped)
-
-get_athena_query_response <- function(sql_query, return_athena_types=FALSE, timeout = NULL){
-
-  pydbtools <- reticulate::import("pydbtools")
-  s3tools::get_credentials()
-  response <- pydbtools$get_athena_query_response(sql_query=sql_query, return_athena_types=return_athena_types, timeout=timeout, force_ec2=TRUE)
-  return(response)
-}
-
diff --git a/R/get_data_conversion.R b/R/get_data_conversion.R
deleted file mode 100644
index a3d3eef..0000000
--- a/R/get_data_conversion.R
+++ /dev/null
@@ -1,41 +0,0 @@
-get_data_conversion <- function(df_type){
-
-  if(df_type == 'tibble'){
-    # Note how 64 bit integers (long) are read in as double
-    conversion <- list(
-      "character" = readr::col_character(),
-      "int" = readr::col_integer(),
-      "long" = readr::col_double(),
-      "date" = readr::col_date(),
-      "datetime" = readr::col_datetime(),
-      "boolean" = readr::col_logical(),
-      "float" = readr::col_double(),
-      "double" = readr::col_double()
-    )
-  } else if(df_type == 'data.table'){
-    # Note that dates/datetimes are read in as characters
-    conversion <- list(
-      "character" = "character",
-      "int" = "integer",
-      "long" = "integer64",
-      "date" = "character",
-      "datetime" = "character",
-      "boolean" = "logical",
-      "float" = "double",
-      "double" = "double"
-    )
-  } else {
-    # have to read in everything as character see readme
-    conversion <- list(
-      "character" = "character",
-      "int" = "character",
-      "long" = "character",
-      "date" = "character",
-      "datetime" = "character",
-      "boolean" = "character",
-      "float" = "character",
-      "double" = "character"
-    )
-  }
-  return(conversion)
-}
diff --git a/R/read.R b/R/read.R
new file mode 100644
index 0000000..e21cb4d
--- /dev/null
+++ b/R/read.R
@@ -0,0 +1,50 @@
+#' Send an SQL query to Athena and receive a dataframe.
+#'
+#' @param sql An SQL query
+#'
+#' @return Dataframe, or tibble if the tibble library is loaded.
+#' @export
+#'
+#' @examples
+#' `df <- dbtools::read_sql_query('select * from my_db.my_table')`
+read_sql_query <- function(sql) {
+  # Download the query result to a temporary parquet file, as pandas and
+  # reticulate are frequently incompatible, and load the data into R using
+  # arrow.
+  tmp_location <- tempfile(fileext=".parquet")
+  dbtools.env$pydb$save_query_to_parquet(sql, tmp_location)
+  df <- arrow::read_parquet(tmp_location)
+  unlink(tmp_location)
+  return(df)
+}
+
+#' Send an SQL query to Athena and return an R dataframe, tibble or data.table based on user preference.
+#'
+#' @export
+#'
+#' @details Will send an SQL query to Athena and wait for it to complete. Once the query has completed the query results will be read using arrow.
+#' Returns a dataframe. If a more bespoke or self-defined data reading function and arguments are needed, use dbtools::start_query_execution_and_wait to send an SQL query and get the s3 path to the data in csv format.
+#'
+#' @param sql_query A string specifying the SQL query you want to send to athena. See the package's GitHub README for info on the flavour of SQL Athena uses.
+#' @param return_df_as String specifying what the table should be returned as i.e. 'dataframe', 'tibble' (converts data using tibble::as_tibble) or 'data.table' (converts data using data.table::as.data.table). Default is 'tibble'. Note that not all returned tables are of the data.frame class.
+#'
+#' @return A table as a dataframe, tibble or data.table
+#'
+#' @examples
+#' # Read an sql query returning a tibble
+#' ```
+#' df <- dbtools::read_sql(
+#'   "SELECT * from crest_v1.flatfile limit 10000",
+#'   return_df_as="tibble"
+#' )
+#' ```
+read_sql <- function(sql_query, return_df_as="tibble") {
+  df <- read_sql_query(sql_query)
+  if (return_df_as == "dataframe") {
+    return(as.data.frame(df))
+  } else if (return_df_as == "data.table") {
+    return(data.table::as.data.table(df))
+  } else {
+    return(tibble::as_tibble(df))
+  }
+}
diff --git a/R/read_sql.R b/R/read_sql.R
deleted file mode 100644
index adc314b..0000000
--- a/R/read_sql.R
+++ /dev/null
@@ -1,78 +0,0 @@
-#' read_sql
-#'
-#'@description uses boto3 (in python) to send an sql query to athena and return an R dataframe, tibble or data.table based on user preference.
-#'
-#'@import reticulate s3tools readr
-#'
-#'@export
-#'
-#'@details Will send an SQL query to athena and wait for it to complete. Once the query has completed the resulting sql query will be read using read.csv (base R), read_csv (readr) or fread (data.table).
-#' Function returns dataframe. If needing more a more bespoke or self defined data reading function and arguments use dbtools::get_athena_query_response to send an SQL query and return the s3 path to data in csv format.
-#'
-#'@param sql_query A string specifying the SQL query you want to send to athena. See packages github readme for info on the flavour of SQL Athena uses.
-#'
-#'@param return_df_as String specifying what the table should be returned as i.e. 'dataframe' (reads data using read.csv), 'tibble' (reads data using readr::read_csv) or 'data.table' (reads data using data.table::fread). Default is 'tibble'. Not all tables returned are a DataFrame class.
-#' Only return_df_as set to 'tibble' maintains date and datetime formats. dataframe and data.table will convert date and datetimes to characters.
-#'
-#'@param timeout Specifies How long you want your sql query to wait before it gives up (in seconds). Default parameter is NULL which will mean SQL query will not timeout and could wait forever if an issue occured.
-#'
-#'@return A table as a dataframe, tibble or data.table
-#'
-#'@examples
-#'# Read an sql query using readr::read_csv i.e.
returning a Tibble -#'df <- dbtools::read_sql("SELECT * from crest_v1.flatfile limit 10000") -#'df - -read_sql <- function(sql_query, return_df_as='tibble', timeout = NULL){ - - # Annoyingly I think you have to pull it in as the source_python function doesn't seem to be exported properly - # require(reticulate) - - return_df_as <- tolower(return_df_as) - if(!return_df_as %in% c('dataframe', 'tibble', 'data.table')){ - stop("input var return_df_as must be one of the following 'dataframe', 'tibble' or 'data.table'") - } - - response <- dbtools::get_athena_query_response(sql_query=sql_query, return_athena_types=FALSE, timeout=timeout) - s3_path_stripped <- gsub("s3://", "", response$s3_path) - bucket <- unlist(strsplit(s3_path_stripped, '/'))[1] - s3_key <- gsub(paste0(bucket,"/"), "", s3_path_stripped) - - # Check if text output - if(endsWith(s3_path_stripped, ".txt")){ - if(return_df_as == 'tibble'){ - df <- s3tools::read_using(FUN=readr::read_csv, s3_path=s3_path_stripped, col_names=FALSE, col_types=list(readr::col_character())) - } else if(return_df_as == 'data.table'){ - df <- s3tools::read_using(FUN=data.table::fread, s3_path=s3_path_stripped, header=FALSE, colClasses=c("character")) - } else { - df <- s3tools::read_using(FUN=read.csv, s3_path=s3_path_stripped, header=FALSE, colClasses=c("character")) - } - colnames(df) <- c("output") - } else { - # Get meta Ddata conversion - data_conversion <- dbtools:::get_data_conversion(return_df_as) - col_classes = list() - for(m in response$meta){ - col_classes[[m$name]] = data_conversion[[m$type]] - } - col_classes_vec = unlist(col_classes) - - if(return_df_as == 'tibble'){ - #This is the best R work arround I could find to replicate Python's **kwargs... - col_types = do.call(readr::cols, col_classes) - df <- s3tools::read_using(FUN=readr::read_csv, s3_path=s3_path_stripped, col_names=TRUE, col_types=col_types) - - } else if(return_df_as == 'data.table'){ - dt_ver <- packageVersion("data.table") - if(dt_ver < '1.11.8'){ - warning("Your version of data.table must be 1.11.8 or above please install a new version otherwise your outputs of type data.table may not convert data types properly.") - } - df <- s3tools::read_using(FUN=data.table::fread, s3_path=s3_path_stripped, header=TRUE, colClasses=col_classes_vec) - } else { - df <- s3tools::read_using(FUN=read.csv, s3_path=s3_path_stripped, header=TRUE, colClasses=col_classes_vec) - } - } - dbtools:::delete_object(bucket, s3_key) - dbtools:::delete_object(bucket, paste0(s3_key, ".metadata")) - return(df) -} diff --git a/R/utils-pipe.R b/R/utils-pipe.R new file mode 100644 index 0000000..fd0b1d1 --- /dev/null +++ b/R/utils-pipe.R @@ -0,0 +1,14 @@ +#' Pipe operator +#' +#' See \code{magrittr::\link[magrittr:pipe]{\%>\%}} for details. +#' +#' @name %>% +#' @rdname pipe +#' @keywords internal +#' @export +#' @importFrom magrittr %>% +#' @usage lhs \%>\% rhs +#' @param lhs A value or the magrittr placeholder. +#' @param rhs A function call using the magrittr semantics. +#' @return The result of calling `rhs(lhs)`. +NULL diff --git a/R/utils.R b/R/utils.R deleted file mode 100644 index 2829e88..0000000 --- a/R/utils.R +++ /dev/null @@ -1,34 +0,0 @@ -#' delete_object -#' -#'@description uses boto3 (in python) to delete an S3 object (used by read_sql function to clean up after itself) -#' -#'@import reticulate -#' -#'@details Will send an SQL query to athena and wait for it to complete. 
Once the query has completed the resulting sql query will be read using read.csv (base R), read_csv (readr) or fread (data.table).
-#' Function returns dataframe. If needing more a more bespoke or self defined data reading function and arguments use dbtools::get_athena_query_response to send an SQL query and return the s3 path to data in csv format.
-#'
-#'@param bucket A string specifying the s3 bucket name
-#'
-#'@param key File path to the s3 object

-#'@examples
-#'# delete a file from S3
-#'# (note this is not exported so have to use tripple colon)
-#'dbtools:::delete_object('my_bucket', 'path/to/file.csv')
-
-delete_object <- function(bucket, key){
-  python_script <- system.file("extdata", "boto_utils.py", package = "dbtools")
-  reticulate::source_python(python_script)
-  s3tools::get_credentials()
-  delete_object(bucket=bucket, key=key)
-}
-
-get_iam_role <- function(){
-  user <- Sys.getenv("USER")
-  if(user==""){
-    stop("Error could not find username in env vars. Please raise an issue on the Github repo for dbtools.")
-  }
-  iam_role <- paste0("alpha_user_", user)
-  return(iam_role)
-}
-
diff --git a/R/wrap.R b/R/wrap.R
new file mode 100644
index 0000000..c3cd938
--- /dev/null
+++ b/R/wrap.R
@@ -0,0 +1,199 @@
+#' Create a temporary table
+#'
+#' @param sql The SQL query you want to create a temp table out of. Should
+#' be a query that starts with a WITH or SELECT clause.
+#' @param table_name The name of the temp table you wish to create
+#'
+#' @export
+#'
+#' @examples
+#' `dbtools::create_temp_table("SELECT a_col, count(*) as n FROM a_database.table GROUP BY a_col", table_name="temp_table_1")`
+create_temp_table <- function(sql, table_name) {
+  dbtools.env$pydb$create_temp_table(sql, table_name)
+}
+
+#' Show the list of columns, including partition columns: 'DESCRIBE table;'.
+#'
+#' @param table Table name
+#' @param database AWS Glue/Athena database name
+#'
+#' @return DataFrame filled by formatted infos.
+#' @export
+#'
+#' @examples
+#' `df_table = dbtools::describe_table(table='my_table', database='my_db')`
+describe_table <- function(table, database) {
+  dbtools.env$pydb$describe_table(table, database)
+}
+
+#' Get the data type of all columns queried.
+#'
+#' @param query_id Athena query execution ID
+#'
+#' @return List with all data types
+#' @export
+#'
+#' @examples
+#' `dbtools::get_query_columns_types('query-execution-id')`
+get_query_columns_types <- function(query_id) {
+  dbtools.env$pydb$get_query_columns_types(query_id)
+}
+
+#' Fetch query execution details.
+#'
+#' @param query_id Athena query execution ID
+#'
+#' @return List with the get_query_execution response.
+#' @seealso https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html#Athena.Client.get_query_execution
+#' @export
+#'
+#' @examples
+#' `res <- dbtools::get_query_execution(query_id='query-execution-id')`
+get_query_execution <- function(query_id) {
+  dbtools.env$pydb$get_query_execution(query_id)
+}
+
+
+#' Run the Hive's metastore consistency check: 'MSCK REPAIR TABLE table;'.
+#'
+#' Recovers partitions and data associated with partitions.
+#' Use this statement when you add partitions to the catalog.
+#' It is possible it will take some time to add all partitions.
+#' If this operation times out, it will be in an incomplete state
+#' where only a few partitions are added to the catalog.
+#' +#' @param table Table name +#' @param database AWS Glue/Athena database name +#' +#' @return Query final state ('SUCCEEDED', 'FAILED', 'CANCELLED') +#' @export +#' +#' @examples +#' `query_final_state = dbtools::repair_table(table='...', database='...')` +repair_table <- function(table, database) { + dbtools.env$pydb$repair_table(table, database) +} + +#' Generate the query that created a table: 'SHOW CREATE TABLE table;'. +#' +#' @param table Table name +#' @param database AWS Glue/Athena database name +#' +#' @return The query that created the table +#' @export +#' +#' @examples +#' `df_table = dbtools::show_create_table(table='my_table', database='my_db')` +show_create_table <- function(table, database) { + dbtools.env$pydb$show_create_table(table, database) +} + +#' Start a SQL Query against AWS Athena +#' +#' @param sql SQL query +#' @param wait Default FALSE, indicates whether to wait for the query to finish and return a dictionary with the query execution response. +#' +#' @return Query execution ID if `wait` is set to `False`, list with the get_query_execution response otherwise. +#' @seealso https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html#Athena.Client.get_query_execution +#' @export +start_query_execution <- function(sql, wait=FALSE) { + dbtools.env$pydb$start_query_execution(sql, wait) +} + +#' Stop a query execution +#' +#' @param query_id Athena query execution ID +#' +#' @export +stop_query_execution <- function(query_id) { + dbtools.env$pydb$stop_query_execution(query_id) +} + +#' Wait for a query to end. +#' +#' @param query_id Athena query execution ID +#' +#' @return List with the get_query_execution response +#' @seealso https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html#Athena.Client.get_query_execution +#' @export +#' +#' @examples +#' `res <- dbtools::wait_query(query_id)` +wait_query <- function(query_id) { + dbtools.env$pydb$wait_query(query_id) +} + +#' Calls start_query_execution followed by wait_query. +#' +#' @param sql An SQL string. Which works with __TEMP__ references. +#' +#' @return List with the get_query_execution response. +#' @seealso https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html#Athena.Client.get_query_execution +#' @export +#' +#' @examples +#' `res <- dbtools::start_query_execution_and_wait('select * from __temp__.my_table')` +start_query_execution_and_wait <- function(sql) { + dbtools.env$pydb$start_query_execution_and_wait(sql) +} + +#' Deletes partitions and the underlying data on S3 from an Athena +#' database table matching an expression. +#' +#' @param database The database name. +#' @param table The table name. +#' @param expression The expression to match. +#' +#' @seealso https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_partitions +#' @export +#' +#' @examples +#' `dbtools::delete_partitions_and_data("my_database", "my_table", "year = 2020 and month = 5")` +delete_partitions_and_data <- function(database, table, expression) { + dbtools.env$pydb$delete_partitions_and_data(database, table, expression) +} + +#' Deletes both a table from an Athena database and the underlying data on S3. +#' +#' @param database The database name. +#' @param table The table name. 
+#' +#' @export +#' +#' @examples +#' `dbtools::delete_table_and_data("__temp__", "my_table")` +delete_table_and_data <- function(database, table) { + dbtools.env$pydb$delete_table_and_data(table, database) +} + +#' Deletes both an Athena database and the underlying data on S3. +#' +#' @param database Database name +#' +#' @export +delete_database_and_data <- function(database) { + dbtools.env$pydb$delete_database_and_data(database) +} + +#' Read in an SQL file and inject arguments with Jinja (if given params). +#' +#' @param filepath A filepath to your SQL file. +#' @param jinja_args If not NULL, will pass the read +#' in SQL file through a jinja template to render the template. +#' Otherwise will just return the SQL file as is. Defaults to NULL. +#' +#' @return +#' @export +get_sql_from_file <- function(filepath, jinja_args=NULL) { + dbtools.env$pydb$get_sql_from_file(filepath, jinja_args) +} + +#' Takes an SQL query and injects arguments with Jinja. +#' +#' @param sql An SQL query +#' @param jinja_args Arguments that are referenced in the SQL file +#' +#' @export +render_sql_template <- function(sql, jinja_args) { + dbtools.env$pydb$render_sql_template(sql, jinja_args) +} diff --git a/R/zzz.R b/R/zzz.R index 8b88e50..fa4b998 100644 --- a/R/zzz.R +++ b/R/zzz.R @@ -1,6 +1,5 @@ +dbtools.env <- new.env(parent=emptyenv()) .onLoad <- function(libname, pkgname) { - # Construct Python path and pass it to reticulate - base_path <- strsplit(Sys.getenv("PATH"), ":")[[1]][1] - python_path <- paste(base_path, "/python", sep = "") - reticulate::use_python(python_path) + # import Python package on package load + assign('pydb', reticulate::import('pydbtools'), dbtools.env) } diff --git a/README.md b/README.md index 6d114b1..9a0ea67 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,102 @@ # dbtools +Users who have yet to migrate to the newer version of the Analytical +Platform should refer to the [Legacy](#legacy) section below. + +## About + +A package that is used to run SQL queries configured for the +Analytical Platform. This package is a [reticulated](https://rstudio.github.io/reticulate/) +wrapper around [pydbtools](https://github.com/moj-analytical-services/pydbtools) +which uses AWS Wrangler's Athena module but adds additional functionality +(like Jinja templating, creating temporary tables) and alters some configuration +to our specification. + +Alternatively you might want to use +[Rdbtools](https://github.com/moj-analytical-services/Rdbtools), which has the +advantages of being R-native, so no messing with `reticulate` and Python, and +supporting `dbplyr`. Please note the caveat about support, though. + +## Installation + +Run the following commands in the R console. + +```R +# Set up the project to use renv, if not already done +renv::init() +# Tell renv that Python will be used +renv::use_python() +# Install the reticulate library to interface with Python +renv::install("reticulate") +# Install the Python library pydbtools +reticulate::py_install("pydbtools") +# Install dbtools +renv::install("moj-analytical-services/dbtools") +``` + +## Quickstart guide + +There is a [vignette](doc/dbtools.pdf) with more details but the following +describes the basics of the package. 
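+
+To confirm the Python side is wired up before sending any queries, you can ask
+`reticulate` directly. A minimal, optional sanity check (not part of the
+dbtools API itself):
+
+```r
+library(reticulate)
+py_config()                      # shows which Python environment is in use
+py_module_available("pydbtools") # should return TRUE
+```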
+
+### Read an SQL Athena query into an R dataframe
+
+```r
+library(dbtools)
+
+df <- read_sql_query("SELECT * from a_database.table LIMIT 10")
+```
+
+### Run a query in Athena
+
+```r
+response <- dbtools::start_query_execution_and_wait(
+  "CREATE DATABASE IF NOT EXISTS my_test_database"
+)
+```
+
+### Create temporary tables to do further separate SQL queries on later
+
+```r
+dbtools::create_temp_table(
+  "SELECT a_col, count(*) as n FROM a_database.table GROUP BY a_col",
+  table_name="temp_table_1"
+)
+df <- dbtools::read_sql_query("SELECT * FROM __temp__.temp_table_1 WHERE n < 10")
+```
+
+### Delete databases, tables and partitions together with the data on S3
+
+```r
+dbtools::delete_partitions_and_data(
+  database='my_database',
+  table='my_table',
+  expression='year = 2020 or year = 2021'
+)
+dbtools::delete_table_and_data(database='my_database', table='my_table')
+dbtools::delete_database_and_data('my_database')
+
+# These can be used for temporary databases and tables.
+dbtools::delete_table_and_data(database='__temp__', table='my_temp_table')
+```
+
+### Use Jinja templating to inject arguments into your SQL
+
+```r
+db_name <- "a_database"
+sql_template <- "SELECT * FROM {{ db_name }}.{{ table }}"
+sql <- dbtools::render_sql_template(sql_template, list(db_name=db_name, table="department"))
+df <- dbtools::read_sql_query(sql)
+
+cat("SELECT * FROM {{ db_name }}.{{ table_name }}", file="tempfile.sql")
+sql <- dbtools::get_sql_from_file("tempfile.sql", jinja_args=list(db_name=db_name, table_name="department"))
+df <- dbtools::read_sql_query(sql)
+```
+
+# Legacy
+
+The information below applies to versions <3.0.0, and should be used by anyone
+on the older version of the Analytical Platform, i.e. anyone using R 3.*.
+
 This is a simple package that lets you query databases using Amazon Athena and get the s3 path to the athena output (as a csv). This is significantly faster than using database drivers provided by Amazon, so might be a good option when pulling in large data. Note: this package works alongside user IAM policies on the Analytical-Platform and requires you to be added to be given a standard database access. If in our github organisation you will be able to access the repo to request standard database access [here](https://github.com/moj-analytical-services/data-engineering-database-access).
@@ -141,6 +238,11 @@ When you run a query in SQL against our databases you are using Athena.
When Ath #### Changelog: +## 3.0.0 - 2022-02-03 + +- No longer dependent on s3tools +- Wraps `pydbtools` functions + ## 2.0.3 - 2020-04-29 - Fixes prompts to install miniconda - now automatically uses main Analytical Platform Conda Python, based on sys path diff --git a/doc/dbtools.R b/doc/dbtools.R new file mode 100644 index 0000000..f8710e6 --- /dev/null +++ b/doc/dbtools.R @@ -0,0 +1,151 @@ +## ---- include = FALSE--------------------------------------------------------- +knitr::opts_chunk$set( + collapse = TRUE, + comment = "#>" +) +devtools::load_all() + +## ----setup-------------------------------------------------------------------- +library(dbtools) + +## ----------------------------------------------------------------------------- +read_sql_query("select * from aws_example_dbtools.employees limit 5") + +## ----------------------------------------------------------------------------- +read_sql("select * from aws_example_dbtools.department limit 5", + return_df_as="tibble") + +## ----------------------------------------------------------------------------- +read_sql("select * from aws_example_dbtools.sales limit 5", + return_df_as="data.table") + +## ----------------------------------------------------------------------------- +sql <- " +SELECT employee_id, sum(sales) as total_sales +FROM aws_example_dbtools.sales +GROUP BY employee_id +" +create_temp_table(sql, table_name="total_sales") + +## ----------------------------------------------------------------------------- +sql <- " +SELECT e.employee_id, e.forename, e.surname, d.department_name +FROM aws_example_dbtools.employees AS e +LEFT JOIN aws_example_dbtools.department AS d +ON e.department_id = d.department_id +WHERE e.department_id = 1 +" +create_temp_table(sql, table_name="sales_employees") + +## ----------------------------------------------------------------------------- +sql <- " +SELECT se.*, ts.total_sales +FROM __temp__.sales_employees AS se +INNER JOIN __temp__.total_sales AS ts +ON se.employee_id = ts.employee_id +" +read_sql_query(sql) + +## ----------------------------------------------------------------------------- +sql_template = "select * from {{ db_name }}.{{ table }} limit 10" +sql <- render_sql_template(sql_template, + list(db_name="aws_example_dbtools", + table="department")) +sql + +## ----------------------------------------------------------------------------- +read_sql_query(sql) + +## ----------------------------------------------------------------------------- +sql <- render_sql_template(sql_template, + list(db_name="aws_example_dbtools", + table="sales")) +read_sql_query(sql) + +## ----------------------------------------------------------------------------- +cat("SELECT * FROM {{ db_name }}.{{ table_name }}", file="tempfile.sql") + +sql <- get_sql_from_file("tempfile.sql", + jinja_args=list(db_name="aws_example_dbtools", + table_name="department")) +read_sql_query(sql) + +## ----------------------------------------------------------------------------- +sql <- " +CREATE DATABASE IF NOT EXISTS new_db_dbtools +COMMENT 'Example of running queries and insert into' +LOCATION 's3://alpha-everyone/dbtools/new_db/' +" + +response <- start_query_execution_and_wait(sql) +response$Status$State + +## ----------------------------------------------------------------------------- +sql <- " +CREATE TABLE new_db_dbtools.sales_report WITH +( + external_location='s3://alpha-everyone/dbtools/new_db/sales_report' +) AS +SELECT qtr as sales_quarter, sum(sales) AS total_sales +FROM aws_example_dbtools.sales +WHERE qtr IN 
(1,2) +GROUP BY qtr +" + +response <- start_query_execution_and_wait(sql) +response$Status$State + +## ----------------------------------------------------------------------------- +sql <- " +INSERT INTO new_db_dbtools.sales_report +SELECT qtr as sales_quarter, sum(sales) AS total_sales +FROM aws_example_dbtools.sales +WHERE qtr IN (3,4) +GROUP BY qtr +" + +response <- start_query_execution_and_wait(sql) +read_sql_query("select * from new_db_dbtools.sales_report") + +## ----------------------------------------------------------------------------- +sql <- " +CREATE TABLE new_db_dbtools.daily_sales_report WITH +( + external_location='s3://alpha-everyone/dbtools/new_db/daily_sales_report', + partitioned_by = ARRAY['report_date'] +) AS +SELECT qtr as sales_quarter, sum(sales) AS total_sales, +date '2021-01-01' AS report_date +FROM aws_example_dbtools.sales +GROUP BY qtr, date '2021-01-01' +" + +response <- start_query_execution_and_wait(sql) +response$Status$State + +## ----------------------------------------------------------------------------- +sql <- " +INSERT INTO new_db_dbtools.daily_sales_report +SELECT qtr as sales_quarter, sum(sales) AS total_sales, +date '2021-01-02' AS report_date +FROM aws_example_dbtools.sales +GROUP BY qtr, date '2021-01-02' +" + +response <- start_query_execution_and_wait(sql) +read_sql_query("select * from new_db_dbtools.daily_sales_report") + +## ----------------------------------------------------------------------------- +delete_partitions_and_data("new_db_dbtools", "daily_sales_report", + "report_date = '2021-01-02'") +read_sql_query("select * from new_db_dbtools.daily_sales_report") + +## ----------------------------------------------------------------------------- +delete_table_and_data("new_db_dbtools", "daily_sales_report") + +## ----------------------------------------------------------------------------- +delete_database_and_data("new_db_dbtools") + +## ---- include=FALSE----------------------------------------------------------- +delete_database_and_data("aws_example_dbtools") + diff --git a/doc/dbtools.Rmd b/doc/dbtools.Rmd new file mode 100644 index 0000000..440fe09 --- /dev/null +++ b/doc/dbtools.Rmd @@ -0,0 +1,303 @@ +--- +title: "Introduction to dbtools" +output: rmarkdown::html_vignette +vignette: > + %\VignetteIndexEntry{dbtools} + %\VignetteEngine{knitr::rmarkdown} + %\VignetteEncoding{UTF-8} +--- + +```{r, include = FALSE} +knitr::opts_chunk$set( + collapse = TRUE, + comment = "#>" +) +devtools::load_all() +``` + +```{python, include=FALSE} +import os +import pandas as pd +import awswrangler as wr +import pydbtools as pydb + +# setup your own testing area (set foldername = GH username) +foldername = "mratford" # GH username +foldername = foldername.lower().replace("-","_") + +bucketname = "alpha-everyone" +s3_base_path = f"s3://{bucketname}/{foldername}/" + +db_name = f"aws_example_dbtools" +source_db_base_path = f"s3://{bucketname}/{foldername}/source_db/" + +# Delete all the s3 files in a given path +if wr.s3.list_objects(s3_base_path): + print("deleting objs") + wr.s3.delete_objects(s3_base_path) + +# Delete the database if it exists +df_dbs = wr.catalog.databases(None) +if db_name in df_dbs["Database"].to_list(): + print(f"{db_name} found deleting") + wr.catalog.delete_database( + name=db_name + ) + +# Setup source database +# Create the database +wr.catalog.create_database(db_name) + +# Iterate through the tables in data/ and write them to our db using awswrangler +for table_name in ["department", "employees", "sales"]: + df = 
pd.read_csv(f"data/{table_name}.csv") + table_path = os.path.join(source_db_base_path, table_name) + wr.s3.to_parquet( + df=df, + path=table_path, + index=False, + dataset=True, # True allows the other params below i.e. overwriting to db.table + database=db_name, + table=table_name, + mode="overwrite", + ) +``` + +`dbtools` is a library used to query AWS Athena databases from R on the +Ministry of Justice's Analytical Platform. It uses the Python library +`pydbtools` and inherits much of its functionality, including creating +and querying temporary tables and injecting SQL queries +with template arguments. + +```{r setup} +library(dbtools) +``` + +## Reading SQL queries + +The `read_sql_query` function is used to obtain R dataframes from SQL queries +sent to Athena. + +```{r} +read_sql_query("select * from aws_example_dbtools.employees limit 5") +``` + +If a tibble is preferred the `read_sql` function is provided + +```{r} +read_sql("select * from aws_example_dbtools.department limit 5", + return_df_as="tibble") +``` + +or for a `data.table` + +```{r} +read_sql("select * from aws_example_dbtools.sales limit 5", + return_df_as="data.table") +``` + + +## Creating temporary SQL tables + +The `create_temp_table` function allows you to create tables which can be +referred to in subsequent queries from the `__temp__` database. For example, +to create a table showing total sales per employee from the tables above create +a temporary total sales table. + +```{r} +sql <- " +SELECT employee_id, sum(sales) as total_sales +FROM aws_example_dbtools.sales +GROUP BY employee_id +" +create_temp_table(sql, table_name="total_sales") +``` + +Then create a table of employees from the sales department. + +```{r} +sql <- " +SELECT e.employee_id, e.forename, e.surname, d.department_name +FROM aws_example_dbtools.employees AS e +LEFT JOIN aws_example_dbtools.department AS d +ON e.department_id = d.department_id +WHERE e.department_id = 1 +" +create_temp_table(sql, table_name="sales_employees") +``` + +The two temporary tables can then be joined to provide the final table. + +```{r} +sql <- " +SELECT se.*, ts.total_sales +FROM __temp__.sales_employees AS se +INNER JOIN __temp__.total_sales AS ts +ON se.employee_id = ts.employee_id +" +read_sql_query(sql) +``` + +## SQL templating + +Sometimes you will want to run similar SQL queries which differ only by, +for example, table or column names. In these cases +SQL templates can be created to SQL queries populated by templated variables, +using Jinja2 templating (https://jinja2docs.readthedocs.io/en/stable/index.html). +For example, + +```{r} +sql_template = "select * from {{ db_name }}.{{ table }} limit 10" +sql <- render_sql_template(sql_template, + list(db_name="aws_example_dbtools", + table="department")) +sql +``` + +The rendered SQL can then be used to query Athena as usual. + +```{r} +read_sql_query(sql) +``` + +The same template can be used to read a different table. + +```{r} +sql <- render_sql_template(sql_template, + list(db_name="aws_example_dbtools", + table="sales")) +read_sql_query(sql) +``` + +Perhaps more usefully we can use SQL templates saved as a file, which means we +can make use of our editors' and IDEs' SQL capabilities. 
+ +```{r} +cat("SELECT * FROM {{ db_name }}.{{ table_name }}", file="tempfile.sql") + +sql <- get_sql_from_file("tempfile.sql", + jinja_args=list(db_name="aws_example_dbtools", + table_name="department")) +read_sql_query(sql) +``` + +## Advanced usage +### Creating and maintaining database tables in Athena + +In this section we will create a new database from our existing database in +Athena. Use the `start_query_execution_and_wait` function +to run the SQL creating the database. + +```{r} +sql <- " +CREATE DATABASE IF NOT EXISTS new_db_dbtools +COMMENT 'Example of running queries and insert into' +LOCATION 's3://alpha-everyone/dbtools/new_db/' +" + +response <- start_query_execution_and_wait(sql) +response$Status$State +``` + +Create a derived table in the new database with a CTAS query that both +generates the output into S3 and creates the schema of the table. Note that +this only inserts the data from quarters 1 and 2. + +```{r} +sql <- " +CREATE TABLE new_db_dbtools.sales_report WITH +( + external_location='s3://alpha-everyone/dbtools/new_db/sales_report' +) AS +SELECT qtr as sales_quarter, sum(sales) AS total_sales +FROM aws_example_dbtools.sales +WHERE qtr IN (1,2) +GROUP BY qtr +" + +response <- start_query_execution_and_wait(sql) +response$Status$State +``` + +We can now use an insert into query to add the data from quarters 3 and 4 as +the schema has already been created. + +```{r} +sql <- " +INSERT INTO new_db_dbtools.sales_report +SELECT qtr as sales_quarter, sum(sales) AS total_sales +FROM aws_example_dbtools.sales +WHERE qtr IN (3,4) +GROUP BY qtr +" + +response <- start_query_execution_and_wait(sql) +read_sql_query("select * from new_db_dbtools.sales_report") +``` + +### Creating a table with partitions + +Do the same as before but partition the data based on when the report was run. +This can make queries more efficient as filtering on the partition columns +reduces the amount of data scanned, plus makes incrementally adding data +easier. + +```{r} +sql <- " +CREATE TABLE new_db_dbtools.daily_sales_report WITH +( + external_location='s3://alpha-everyone/dbtools/new_db/daily_sales_report', + partitioned_by = ARRAY['report_date'] +) AS +SELECT qtr as sales_quarter, sum(sales) AS total_sales, +date '2021-01-01' AS report_date +FROM aws_example_dbtools.sales +GROUP BY qtr, date '2021-01-01' +" + +response <- start_query_execution_and_wait(sql) +response$Status$State +``` + +Then, simulating a source database that is updated daily, add more partitioned +data. + +```{r} +sql <- " +INSERT INTO new_db_dbtools.daily_sales_report +SELECT qtr as sales_quarter, sum(sales) AS total_sales, +date '2021-01-02' AS report_date +FROM aws_example_dbtools.sales +GROUP BY qtr, date '2021-01-02' +" + +response <- start_query_execution_and_wait(sql) +read_sql_query("select * from new_db_dbtools.daily_sales_report") +``` + +We can remove a partition and its underlying data using +`delete_partitions_and_data` which uses an expression to match partitions - +see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_partitions +for more details. + +```{r} +delete_partitions_and_data("new_db_dbtools", "daily_sales_report", + "report_date = '2021-01-02'") +read_sql_query("select * from new_db_dbtools.daily_sales_report") +``` + +Similarly we can remove a table and its data, + +```{r} +delete_table_and_data("new_db_dbtools", "daily_sales_report") +``` + +or the whole database. 
+
+```{r}
+delete_database_and_data("new_db_dbtools")
+```
+
+```{r, include=FALSE}
+delete_database_and_data("aws_example_dbtools")
+```
diff --git a/doc/dbtools.html b/doc/dbtools.html
new file mode 100644
index 0000000..b764281
--- /dev/null
+++ b/doc/dbtools.html
@@ -0,0 +1,397 @@
[... 397 added lines: knitted HTML of the "Introduction to dbtools" vignette (doc/dbtools.Rmd above), including rendered example output ...]
diff --git a/doc/dbtools.pdf b/doc/dbtools.pdf
new file mode 100644
index 0000000..320d036
Binary files /dev/null and b/doc/dbtools.pdf differ
diff --git a/inst/extdata/boto_utils.py b/inst/extdata/boto_utils.py
deleted file mode 100644
index 2527bbd..0000000
--- a/inst/extdata/boto_utils.py
+++ /dev/null
@@ -1,17 +0,0 @@
-import boto3
-from botocore.credentials import InstanceMetadataProvider, InstanceMetadataFetcher
-
-def delete_object(bucket, key):
-    rn = "eu-west-1"
-    provider = InstanceMetadataProvider(
-        iam_role_fetcher=InstanceMetadataFetcher(timeout=1000, num_attempts=2)
-    )
-    creds = provider.load().get_frozen_credentials()
-    s3_client = boto3.client(
-        "s3",
-        region_name=rn,
-        aws_access_key_id=creds.access_key,
-        aws_secret_access_key=creds.secret_key,
-        aws_session_token=creds.token,
-    )
-    s3_client.delete_object(Bucket=bucket, Key=key)
diff --git a/man/create_temp_table.Rd b/man/create_temp_table.Rd
new file mode 100644
index 0000000..298679c
--- /dev/null
+++ b/man/create_temp_table.Rd
@@ -0,0 +1,20 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/wrap.R
+\name{create_temp_table}
+\alias{create_temp_table}
+\title{Create a temporary table}
+\usage{
+create_temp_table(sql, table_name)
+}
+\arguments{
+\item{table_name}{The name of the temp table you wish to create}
+
+\item{sql}{The SQL query you want to create a temp table out of. Should
+be a query that starts with a WITH or SELECT clause.}
+}
+\description{
+Create a temporary table
+}
+\examples{
+`dbtools::create_temp_table("SELECT a_col, count(*) as n FROM a_database.table GROUP BY a_col", table_name="temp_table_1")`
+}
diff --git a/man/dbtools.Rd b/man/dbtools.Rd
new file mode 100644
index 0000000..c45cab0
--- /dev/null
+++ b/man/dbtools.Rd
@@ -0,0 +1,26 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/dbtools.R
+\docType{package}
+\name{dbtools}
+\alias{dbtools}
+\title{dbtools: A package for accessing AWS Athena from the Analytical Platform.}
+\section{About}{
+
+The dbtools package is used to run SQL queries configured for the
+Analytical Platform. This package is a reticulated
+wrapper around the Python library pydbtools
+which uses AWS Wrangler's Athena module but adds additional functionality
+(like Jinja templating, creating temporary tables) and alters some configuration
+to our specification.
+
+Alternatively you might want to use
+Rdbtools, which has the
+advantages of being R-native, so no messing with `reticulate` and Python, and
+supporting `dbplyr`. Please note the caveat about support, though.
+}
+
+\seealso{
+\url{https://github.com/moj-analytical-services/pydbtools}
+
+\url{https://github.com/moj-analytical-services/Rdbtools}
+}
diff --git a/man/delete_database_and_data.Rd b/man/delete_database_and_data.Rd
new file mode 100644
index 0000000..447fa89
--- /dev/null
+++ b/man/delete_database_and_data.Rd
@@ -0,0 +1,14 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/wrap.R
+\name{delete_database_and_data}
+\alias{delete_database_and_data}
+\title{Deletes both an Athena database and the underlying data on S3.}
+\usage{
+delete_database_and_data(database)
+}
+\arguments{
+\item{database}{Database name}
+}
+\description{
+Deletes both an Athena database and the underlying data on S3.
+} diff --git a/man/delete_object.Rd b/man/delete_object.Rd deleted file mode 100644 index d35c13e..0000000 --- a/man/delete_object.Rd +++ /dev/null @@ -1,25 +0,0 @@ -% Generated by roxygen2: do not edit by hand -% Please edit documentation in R/utils.R -\name{delete_object} -\alias{delete_object} -\title{delete_object} -\usage{ -delete_object(bucket, key) -} -\arguments{ -\item{bucket}{A string specifying the s3 bucket name} - -\item{key}{File path to the s3 object} -} -\description{ -uses boto3 (in python) to delete an S3 object (used by read_sql function to clean up after itself) -} -\details{ -Will send an SQL query to athena and wait for it to complete. Once the query has completed the resulting sql query will be read using read.csv (base R), read_csv (readr) or fread (data.table). -Function returns dataframe. If needing more a more bespoke or self defined data reading function and arguments use dbtools::get_athena_query_response to send an SQL query and return the s3 path to data in csv format. -} -\examples{ -# delete a file from S3 -# (note this is not exported so have to use tripple colon) -dbtools:::delete_object('my_bucket', 'path/to/file.csv') -} diff --git a/man/delete_partitions_and_data.Rd b/man/delete_partitions_and_data.Rd new file mode 100644 index 0000000..4f16f48 --- /dev/null +++ b/man/delete_partitions_and_data.Rd @@ -0,0 +1,26 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/wrap.R +\name{delete_partitions_and_data} +\alias{delete_partitions_and_data} +\title{Deletes partitions and the underlying data on S3 from an Athena +database table matching an expression.} +\usage{ +delete_partitions_and_data(database, table, expression) +} +\arguments{ +\item{database}{The database name.} + +\item{table}{The table name.} + +\item{expression}{The expression to match.} +} +\description{ +Deletes partitions and the underlying data on S3 from an Athena +database table matching an expression. +} +\examples{ +`dbtools::delete_partitions_and_data("my_database", "my_table", "year = 2020 and month = 5")` +} +\seealso{ +https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_partitions +} diff --git a/man/delete_table_and_data.Rd b/man/delete_table_and_data.Rd new file mode 100644 index 0000000..4e82a67 --- /dev/null +++ b/man/delete_table_and_data.Rd @@ -0,0 +1,19 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/wrap.R +\name{delete_table_and_data} +\alias{delete_table_and_data} +\title{Deletes both a table from an Athena database and the underlying data on S3.} +\usage{ +delete_table_and_data(database, table) +} +\arguments{ +\item{database}{The database name.} + +\item{table}{The table name.} +} +\description{ +Deletes both a table from an Athena database and the underlying data on S3. +} +\examples{ +`dbtools::delete_table_and_data("__temp__", "my_table")` +} diff --git a/man/describe_table.Rd b/man/describe_table.Rd new file mode 100644 index 0000000..9bbd20f --- /dev/null +++ b/man/describe_table.Rd @@ -0,0 +1,22 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/wrap.R +\name{describe_table} +\alias{describe_table} +\title{Show the list of columns, including partition columns: 'DESCRIBE table;'.} +\usage{ +describe_table(table, database) +} +\arguments{ +\item{table}{Table name} + +\item{database}{AWS Glue/Athena database name} +} +\value{ +DataFrame filled by formatted infos. 
+} +\description{ +Show the list of columns, including partition columns: 'DESCRIBE table;'. +} +\examples{ +`df_table = dbtools::describe_table(table='my_table', database='my_db')` +} diff --git a/man/get_athena_query_response.Rd b/man/get_athena_query_response.Rd deleted file mode 100644 index 436fb77..0000000 --- a/man/get_athena_query_response.Rd +++ /dev/null @@ -1,39 +0,0 @@ -% Generated by roxygen2: do not edit by hand -% Please edit documentation in R/get_athena_query_response.R -\name{get_athena_query_response} -\alias{get_athena_query_response} -\title{get_athena_query_response} -\usage{ -get_athena_query_response(sql_query, return_athena_types = FALSE, - timeout = NULL) -} -\arguments{ -\item{sql_query}{A string specifying the SQL query you want to send to athena. See packages github readme for info on the flavour of SQL Athena uses.} - -\item{return_athena_types}{Specifies if the list describing the data's meta data types should be defined using athena datatypes (TRUE) or using the data engineering team's generic metadata types (FALSE). If not specified the default value of this input parameter is set to FALSE.} - -\item{timeout}{Specifies How long you want your sql query to wait before it gives up (in seconds). Default parameter is NULL which will mean SQL query will not timeout and could wait forever if an issue occured.} -} -\value{ -A list with two keys. [1] s3_path : a string pointing to the s3 path of the athena query. [2] meta : a list that has the name and type of each column in of the data in the s3_path. Can be used to get correct data types of your output when read in to R (note the order of the columns matches the order they appear in the data). -} -\description{ -uses boto3 (in python) to send an sql query to athena and return the resulting data's path in s3 and meta data -} -\details{ -Will send an SQL query to athena and wait for it to complete. Once the query has completed the funtion will return a list containing the s3 path to your athena query and meta data about the output data -} -\examples{ -# Read an sql query using readr::read_csv -response <- dbtools::get_athena_query_response("SELECT * from crest_v1.flatfile limit 10000") - -# print out path to athena query output (as a csv) -print(response$s3_path) - -# print out meta data -print(response$meta) - -# Read in data using whatever csv reader you want -s3_path_stripped = gsub("s3://", "", response$s3_path) -df <- s3tools::read_using(FUN = read.csv, s3_path=s3_path_stripped) -} diff --git a/man/get_query_columns_types.Rd b/man/get_query_columns_types.Rd new file mode 100644 index 0000000..6e3697e --- /dev/null +++ b/man/get_query_columns_types.Rd @@ -0,0 +1,20 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/wrap.R +\name{get_query_columns_types} +\alias{get_query_columns_types} +\title{Get the data type of all columns queried.} +\usage{ +get_query_columns_types(query_id) +} +\arguments{ +\item{query_id}{Athena query execution ID} +} +\value{ +List with all data types +} +\description{ +Get the data type of all columns queried. 
+} +\examples{ +`dbtools::get_query_columns_types('query-execution-id')` +} diff --git a/man/get_query_execution.Rd b/man/get_query_execution.Rd new file mode 100644 index 0000000..189e754 --- /dev/null +++ b/man/get_query_execution.Rd @@ -0,0 +1,23 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/wrap.R +\name{get_query_execution} +\alias{get_query_execution} +\title{Fetch query execution details.} +\usage{ +get_query_execution(query_id) +} +\arguments{ +\item{query_id}{Athena query execution ID} +} +\value{ +List with the get_query_execution response. +} +\description{ +Fetch query execution details. +} +\examples{ +`res <- dbtools::get_query_execution(query_id='query-execution-id')` +} +\seealso{ +https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html#Athena.Client.get_query_execution +} diff --git a/man/get_sql_from_file.Rd b/man/get_sql_from_file.Rd new file mode 100644 index 0000000..9836ee1 --- /dev/null +++ b/man/get_sql_from_file.Rd @@ -0,0 +1,21 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/wrap.R +\name{get_sql_from_file} +\alias{get_sql_from_file} +\title{Read in an SQL file and inject arguments with Jinja (if given params).} +\usage{ +get_sql_from_file(filepath, jinja_args = NULL) +} +\arguments{ +\item{filepath}{A filepath to your SQL file.} + +\item{jinja_args}{If not NULL, will pass the read +in SQL file through a jinja template to render the template. +Otherwise will just return the SQL file as is. Defaults to NULL.} +} +\value{ + +} +\description{ +Read in an SQL file and inject arguments with Jinja (if given params). +} diff --git a/man/pipe.Rd b/man/pipe.Rd new file mode 100644 index 0000000..1f8f237 --- /dev/null +++ b/man/pipe.Rd @@ -0,0 +1,20 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/utils-pipe.R +\name{\%>\%} +\alias{\%>\%} +\title{Pipe operator} +\usage{ +lhs \%>\% rhs +} +\arguments{ +\item{lhs}{A value or the magrittr placeholder.} + +\item{rhs}{A function call using the magrittr semantics.} +} +\value{ +The result of calling `rhs(lhs)`. +} +\description{ +See \code{magrittr::\link[magrittr:pipe]{\%>\%}} for details. +} +\keyword{internal} diff --git a/man/read_sql.Rd b/man/read_sql.Rd index 73d8feb..0313d74 100644 --- a/man/read_sql.Rd +++ b/man/read_sql.Rd @@ -1,31 +1,32 @@ % Generated by roxygen2: do not edit by hand -% Please edit documentation in R/read_sql.R +% Please edit documentation in R/read.R \name{read_sql} \alias{read_sql} -\title{read_sql} +\title{Uses boto3 (in python) to send an sql query to athena and return an R dataframe, tibble or data.table based on user preference.} \usage{ -read_sql(sql_query, return_df_as = "tibble", timeout = NULL) +read_sql(sql_query, return_df_as = "tibble") } \arguments{ \item{sql_query}{A string specifying the SQL query you want to send to athena. See packages github readme for info on the flavour of SQL Athena uses.} -\item{return_df_as}{String specifying what the table should be returned as i.e. 'dataframe' (reads data using read.csv), 'tibble' (reads data using readr::read_csv) or 'data.table' (reads data using data.table::fread). Default is 'tibble'. Not all tables returned are a DataFrame class. -Only return_df_as set to 'tibble' maintains date and datetime formats. dataframe and data.table will convert date and datetimes to characters.} - -\item{timeout}{Specifies How long you want your sql query to wait before it gives up (in seconds). 
Default parameter is NULL which will mean SQL query will not timeout and could wait forever if an issue occured.}
+\item{return_df_as}{String specifying what the table should be returned as i.e. 'dataframe', 'tibble' (converts data using tibble::as_tibble) or 'data.table' (converts data using data.table::as.data.table). Default is 'tibble'. Not all tables returned are a DataFrame class.}
}
\value{
-A table as a Dataframe, tibble or data.table
+A table as a dataframe, tibble or data.table
}
\description{
-uses boto3 (in python) to send an sql query to athena and return an R dataframe, tibble or data.table based on user preference.
+Uses boto3 (in python) to send an sql query to athena and return an R dataframe, tibble or data.table based on user preference.
}
\details{
-Will send an SQL query to athena and wait for it to complete. Once the query has completed the resulting sql query will be read using read.csv (base R), read_csv (readr) or fread (data.table).
-Function returns dataframe. If needing more a more bespoke or self defined data reading function and arguments use dbtools::get_athena_query_response to send an SQL query and return the s3 path to data in csv format.
+Will send an SQL query to Athena and wait for it to complete. Once the query has completed, the resulting output will be read using arrow.
+The function returns a dataframe. If you need a more bespoke or self-defined data reading function, use dbtools::start_query_execution_and_wait to run an SQL query and obtain the s3 path to the resulting data in csv format.
}
\examples{
-# Read an sql query using readr::read_csv i.e. returning a Tibble
-df <- dbtools::read_sql("SELECT * from crest_v1.flatfile limit 10000")
-df
+# Read an sql query returning a tibble
+```
+df <- dbtools::read_sql(
+  "SELECT * from crest_v1.flatfile limit 10000",
+  return_df_as="tibble"
+)
+```
}
diff --git a/man/read_sql_query.Rd b/man/read_sql_query.Rd
new file mode 100644
index 0000000..0f3937e
--- /dev/null
+++ b/man/read_sql_query.Rd
@@ -0,0 +1,20 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/read.R
+\name{read_sql_query}
+\alias{read_sql_query}
+\title{Send an SQL query to Athena and receive a dataframe.}
+\usage{
+read_sql_query(sql)
+}
+\arguments{
+\item{sql}{An SQL query}
+}
+\value{
+Dataframe or tibble if the tibble library is loaded.
+}
+\description{
+Send an SQL query to Athena and receive a dataframe.
+}
+\examples{
+`df <- dbtools::read_sql_query('select * from my_db.my_table')`
+}
diff --git a/man/render_sql_template.Rd b/man/render_sql_template.Rd
new file mode 100644
index 0000000..4983c84
--- /dev/null
+++ b/man/render_sql_template.Rd
@@ -0,0 +1,16 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/wrap.R
+\name{render_sql_template}
+\alias{render_sql_template}
+\title{Takes an SQL query and injects arguments with Jinja.}
+\usage{
+render_sql_template(sql, jinja_args)
+}
+\arguments{
+\item{sql}{An SQL query}
+
+\item{jinja_args}{Arguments that are referenced in the SQL file}
+}
+\description{
+Takes an SQL query and injects arguments with Jinja.
+} diff --git a/man/repair_table.Rd b/man/repair_table.Rd new file mode 100644 index 0000000..6df1f42 --- /dev/null +++ b/man/repair_table.Rd @@ -0,0 +1,26 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/wrap.R +\name{repair_table} +\alias{repair_table} +\title{Run the Hive's metastore consistency check: 'MSCK REPAIR TABLE table;'.} +\usage{ +repair_table(table, database) +} +\arguments{ +\item{table}{Table name} + +\item{database}{AWS Glue/Athena database name} +} +\value{ +Query final state ('SUCCEEDED', 'FAILED', 'CANCELLED') +} +\description{ +Recovers partitions and data associated with partitions. +Use this statement when you add partitions to the catalog. +It is possible it will take some time to add all partitions. +If this operation times out, it will be in an incomplete state +where only a few partitions are added to the catalog. +} +\examples{ +`query_final_state = dbtools::repair_table(table='...', database='...')` +} diff --git a/man/show_create_table.Rd b/man/show_create_table.Rd new file mode 100644 index 0000000..e98640f --- /dev/null +++ b/man/show_create_table.Rd @@ -0,0 +1,22 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/wrap.R +\name{show_create_table} +\alias{show_create_table} +\title{Generate the query that created a table: 'SHOW CREATE TABLE table;'.} +\usage{ +show_create_table(table, database) +} +\arguments{ +\item{table}{Table name} + +\item{database}{AWS Glue/Athena database name} +} +\value{ +The query that created the table +} +\description{ +Generate the query that created a table: 'SHOW CREATE TABLE table;'. +} +\examples{ +`df_table = dbtools::show_create_table(table='my_table', database='my_db')` +} diff --git a/man/start_query_execution.Rd b/man/start_query_execution.Rd new file mode 100644 index 0000000..ab494ba --- /dev/null +++ b/man/start_query_execution.Rd @@ -0,0 +1,22 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/wrap.R +\name{start_query_execution} +\alias{start_query_execution} +\title{Start a SQL Query against AWS Athena} +\usage{ +start_query_execution(sql, wait = FALSE) +} +\arguments{ +\item{sql}{SQL query} + +\item{wait}{Default FALSE, indicates whether to wait for the query to finish and return a dictionary with the query execution response.} +} +\value{ +Query execution ID if `wait` is set to `False`, list with the get_query_execution response otherwise. +} +\description{ +Start a SQL Query against AWS Athena +} +\seealso{ +https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html#Athena.Client.get_query_execution +} diff --git a/man/start_query_execution_and_wait.Rd b/man/start_query_execution_and_wait.Rd new file mode 100644 index 0000000..1631eb2 --- /dev/null +++ b/man/start_query_execution_and_wait.Rd @@ -0,0 +1,23 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/wrap.R +\name{start_query_execution_and_wait} +\alias{start_query_execution_and_wait} +\title{Calls start_query_execution followed by wait_query.} +\usage{ +start_query_execution_and_wait(sql) +} +\arguments{ +\item{sql}{An SQL string. Which works with __TEMP__ references.} +} +\value{ +List with the get_query_execution response. +} +\description{ +Calls start_query_execution followed by wait_query. 
+} +\examples{ +`res <- dbtools::start_query_execution_and_wait('select * from __temp__.my_table')` +} +\seealso{ +https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html#Athena.Client.get_query_execution +} diff --git a/man/stop_query_execution.Rd b/man/stop_query_execution.Rd new file mode 100644 index 0000000..da0af78 --- /dev/null +++ b/man/stop_query_execution.Rd @@ -0,0 +1,14 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/wrap.R +\name{stop_query_execution} +\alias{stop_query_execution} +\title{Stop a query execution} +\usage{ +stop_query_execution(query_id) +} +\arguments{ +\item{query_id}{Athena query execution ID} +} +\description{ +Stop a query execution +} diff --git a/man/wait_query.Rd b/man/wait_query.Rd new file mode 100644 index 0000000..af3c311 --- /dev/null +++ b/man/wait_query.Rd @@ -0,0 +1,23 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/wrap.R +\name{wait_query} +\alias{wait_query} +\title{Wait for a query to end.} +\usage{ +wait_query(query_id) +} +\arguments{ +\item{query_id}{Athena query execution ID} +} +\value{ +List with the get_query_execution response +} +\description{ +Wait for a query to end. +} +\examples{ +`res <- dbtools::wait_query(query_id)` +} +\seealso{ +https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html#Athena.Client.get_query_execution +} diff --git a/renv.lock b/renv.lock new file mode 100644 index 0000000..a37a940 --- /dev/null +++ b/renv.lock @@ -0,0 +1,354 @@ +{ + "R": { + "Version": "4.1.2", + "Repositories": [ + { + "Name": "CRAN", + "URL": "https://packagemanager.rstudio.com/cran/2021-05-17" + } + ] + }, + "Python": { + "Version": "3.8.10", + "Type": "virtualenv", + "Name": null + }, + "Packages": { + "Matrix": { + "Package": "Matrix", + "Version": "1.3-4", + "Source": "Repository", + "Repository": "CRAN", + "Hash": "4ed05e9c9726267e4a5872e09c04587c" + }, + "R6": { + "Package": "R6", + "Version": "2.5.0", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "b203113193e70978a696b2809525649d" + }, + "Rcpp": { + "Package": "Rcpp", + "Version": "1.0.7", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "dab19adae4440ae55aa8a9d238b246bb" + }, + "arrow": { + "Package": "arrow", + "Version": "4.0.0.1", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "366f5c066a920a1dc8d52d8bac1f33f7" + }, + "assertthat": { + "Package": "assertthat", + "Version": "0.2.1", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "50c838a310445e954bc13f26f26a6ecf" + }, + "bit": { + "Package": "bit", + "Version": "4.0.4", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "f36715f14d94678eea9933af927bc15d" + }, + "bit64": { + "Package": "bit64", + "Version": "4.0.5", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "9fe98599ca456d6552421db0d6772d8f" + }, + "brio": { + "Package": "brio", + "Version": "1.1.2", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "2f01e16ff9571fe70381c7b9ae560dc4" + }, + "callr": { + "Package": "callr", + "Version": "3.7.0", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "461aa75a11ce2400245190ef5d3995df" + }, + "cli": { + "Package": "cli", + "Version": "2.5.0", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "a94ba44cee3ea571e813721e64184172" + }, + "cpp11": { + "Package": "cpp11", + "Version": "0.2.7", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "730eebcc741a5c36761f7d4d0f5e37b8" + }, + "crayon": { + 
"Package": "crayon", + "Version": "1.4.1", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "e75525c55c70e5f4f78c9960a4b402e9" + }, + "data.table": { + "Package": "data.table", + "Version": "1.14.0", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "d1b8b1a821ee564a3515fa6c6d5c52dc" + }, + "desc": { + "Package": "desc", + "Version": "1.3.0", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "b6963166f7f10b970af1006c462ce6cd" + }, + "diffobj": { + "Package": "diffobj", + "Version": "0.3.4", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "feb5b7455eba422a2c110bb89852e6a3" + }, + "digest": { + "Package": "digest", + "Version": "0.6.27", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "a0cbe758a531d054b537d16dff4d58a1" + }, + "ellipsis": { + "Package": "ellipsis", + "Version": "0.3.2", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "bb0eec2fe32e88d9e2836c2f73ea2077" + }, + "evaluate": { + "Package": "evaluate", + "Version": "0.14", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "ec8ca05cffcc70569eaaad8469d2a3a7" + }, + "fansi": { + "Package": "fansi", + "Version": "0.4.2", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "fea074fb67fe4c25d47ad09087da847d" + }, + "glue": { + "Package": "glue", + "Version": "1.4.2", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "6efd734b14c6471cfe443345f3e35e29" + }, + "jsonlite": { + "Package": "jsonlite", + "Version": "1.7.2", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "98138e0994d41508c7a6b84a0600cfcb" + }, + "lattice": { + "Package": "lattice", + "Version": "0.20-45", + "Source": "Repository", + "Repository": "CRAN", + "Hash": "b64cdbb2b340437c4ee047a1f4c4377b" + }, + "lifecycle": { + "Package": "lifecycle", + "Version": "1.0.0", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "3471fb65971f1a7b2d4ae7848cf2db8d" + }, + "magrittr": { + "Package": "magrittr", + "Version": "2.0.1", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "41287f1ac7d28a92f0a286ed507928d3" + }, + "pillar": { + "Package": "pillar", + "Version": "1.6.1", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "8672ae02bd20f6479bce2d06c7ff1401" + }, + "pkgconfig": { + "Package": "pkgconfig", + "Version": "2.0.3", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "01f28d4278f15c76cddbea05899c5d6f" + }, + "pkgload": { + "Package": "pkgload", + "Version": "1.2.1", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "463642747f81879e6752485aefb831cf" + }, + "png": { + "Package": "png", + "Version": "0.1-7", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "03b7076c234cb3331288919983326c55" + }, + "praise": { + "Package": "praise", + "Version": "1.0.0", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "a555924add98c99d2f411e37e7d25e9f" + }, + "processx": { + "Package": "processx", + "Version": "3.5.2", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "0cbca2bc4d16525d009c4dbba156b37c" + }, + "ps": { + "Package": "ps", + "Version": "1.6.0", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "32620e2001c1dce1af49c49dccbb9420" + }, + "purrr": { + "Package": "purrr", + "Version": "0.3.4", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "97def703420c8ab10d8f0e6c72101e02" + }, + "rappdirs": { + "Package": "rappdirs", + "Version": "0.3.3", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "5e3c5dc0b071b21fa128676560dbe94d" + }, + "rematch2": { + "Package": "rematch2", + 
"Version": "2.1.2", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "76c9e04c712a05848ae7a23d2f170a40" + }, + "renv": { + "Package": "renv", + "Version": "0.13.2", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "079cb1f03ff972b30401ed05623cbe92" + }, + "reticulate": { + "Package": "reticulate", + "Version": "1.20", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "30ab0ea8c8d3dd16a3fa06903449bbfb" + }, + "rlang": { + "Package": "rlang", + "Version": "0.4.11", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "515f341d3affe0de9e4a7f762efb0456" + }, + "rprojroot": { + "Package": "rprojroot", + "Version": "2.0.2", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "249d8cd1e74a8f6a26194a91b47f21d1" + }, + "rstudioapi": { + "Package": "rstudioapi", + "Version": "0.13", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "06c85365a03fdaf699966cc1d3cf53ea" + }, + "stringi": { + "Package": "stringi", + "Version": "1.6.1", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "3b9a9c9454b864b08713715396689b9e" + }, + "stringr": { + "Package": "stringr", + "Version": "1.4.0", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "0759e6b6c0957edb1311028a49a35e76" + }, + "testthat": { + "Package": "testthat", + "Version": "3.0.2", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "495e0434d9305716b6a87031570ce109" + }, + "tibble": { + "Package": "tibble", + "Version": "3.1.2", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "349b40a9f144516d537c875e786ec8b8" + }, + "tidyselect": { + "Package": "tidyselect", + "Version": "1.1.1", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "7243004a708d06d4716717fa1ff5b2fe" + }, + "utf8": { + "Package": "utf8", + "Version": "1.2.1", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "c3ad47dc6da0751f18ed53c4613e3ac7" + }, + "vctrs": { + "Package": "vctrs", + "Version": "0.3.8", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "ecf749a1b39ea72bd9b51b76292261f1" + }, + "waldo": { + "Package": "waldo", + "Version": "0.2.5", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "20c45f1d511a3f730b7b469f4d11e104" + }, + "withr": { + "Package": "withr", + "Version": "2.4.2", + "Source": "Repository", + "Repository": "RSPM", + "Hash": "ad03909b44677f930fa156d47d7a3aeb" + } + } +} diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..bd83923 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,40 @@ +asn1crypto==1.4.0 +awswrangler==2.13.0 +beautifulsoup4==4.10.0 +boto3==1.20.39 +botocore==1.23.39 +certifi==2021.10.8 +charset-normalizer==2.0.10 +decorator==5.1.1 +et-xmlfile==1.1.0 +idna==3.3 +Jinja2==3.0.3 +jmespath==0.10.0 +jsonpath-ng==1.5.3 +lxml==4.7.1 +MarkupSafe==2.0.1 +numpy==1.22.1 +openpyxl==3.0.9 +opensearch-py==1.0.0 +packaging==21.3 +pandas==1.2.5 +pg8000==1.22.1 +ply==3.11 +progressbar2==3.55.0 +pyarrow==6.0.1 +pydbtools==5.1.0 +PyMySQL==1.0.2 +pyparsing==3.0.6 +python-dateutil==2.8.2 +python-utils==3.1.0 +pytz==2021.3 +redshift-connector==2.0.903 +requests==2.27.1 +requests-aws4auth==1.1.1 +s3transfer==0.5.0 +scramp==1.4.1 +six==1.16.0 +soupsieve==2.3.1 +sql-metadata==2.3.0 +sqlparse==0.4.2 +urllib3==1.26.8 diff --git a/vignettes/.gitignore b/vignettes/.gitignore new file mode 100644 index 0000000..097b241 --- /dev/null +++ b/vignettes/.gitignore @@ -0,0 +1,2 @@ +*.html +*.R diff --git a/vignettes/dbtools.Rmd b/vignettes/dbtools.Rmd new file mode 100644 index 0000000..440fe09 --- /dev/null +++ 
b/vignettes/dbtools.Rmd @@ -0,0 +1,303 @@ +--- +title: "Introduction to dbtools" +output: rmarkdown::html_vignette +vignette: > + %\VignetteIndexEntry{dbtools} + %\VignetteEngine{knitr::rmarkdown} + %\VignetteEncoding{UTF-8} +--- + +```{r, include = FALSE} +knitr::opts_chunk$set( + collapse = TRUE, + comment = "#>" +) +devtools::load_all() +``` + +```{python, include=FALSE} +import os +import pandas as pd +import awswrangler as wr +import pydbtools as pydb + +# setup your own testing area (set foldername = GH username) +foldername = "mratford" # GH username +foldername = foldername.lower().replace("-","_") + +bucketname = "alpha-everyone" +s3_base_path = f"s3://{bucketname}/{foldername}/" + +db_name = f"aws_example_dbtools" +source_db_base_path = f"s3://{bucketname}/{foldername}/source_db/" + +# Delete all the s3 files in a given path +if wr.s3.list_objects(s3_base_path): + print("deleting objs") + wr.s3.delete_objects(s3_base_path) + +# Delete the database if it exists +df_dbs = wr.catalog.databases(None) +if db_name in df_dbs["Database"].to_list(): + print(f"{db_name} found deleting") + wr.catalog.delete_database( + name=db_name + ) + +# Setup source database +# Create the database +wr.catalog.create_database(db_name) + +# Iterate through the tables in data/ and write them to our db using awswrangler +for table_name in ["department", "employees", "sales"]: + df = pd.read_csv(f"data/{table_name}.csv") + table_path = os.path.join(source_db_base_path, table_name) + wr.s3.to_parquet( + df=df, + path=table_path, + index=False, + dataset=True, # True allows the other params below i.e. overwriting to db.table + database=db_name, + table=table_name, + mode="overwrite", + ) +``` + +`dbtools` is a library used to query AWS Athena databases from R on the +Ministry of Justice's Analytical Platform. It uses the Python library +`pydbtools` and inherits much of its functionality, including creating +and querying temporary tables and injecting SQL queries +with template arguments. + +```{r setup} +library(dbtools) +``` + +## Reading SQL queries + +The `read_sql_query` function is used to obtain R dataframes from SQL queries +sent to Athena. + +```{r} +read_sql_query("select * from aws_example_dbtools.employees limit 5") +``` + +If a tibble is preferred the `read_sql` function is provided + +```{r} +read_sql("select * from aws_example_dbtools.department limit 5", + return_df_as="tibble") +``` + +or for a `data.table` + +```{r} +read_sql("select * from aws_example_dbtools.sales limit 5", + return_df_as="data.table") +``` + + +## Creating temporary SQL tables + +The `create_temp_table` function allows you to create tables which can be +referred to in subsequent queries from the `__temp__` database. For example, +to create a table showing total sales per employee from the tables above create +a temporary total sales table. + +```{r} +sql <- " +SELECT employee_id, sum(sales) as total_sales +FROM aws_example_dbtools.sales +GROUP BY employee_id +" +create_temp_table(sql, table_name="total_sales") +``` + +Then create a table of employees from the sales department. + +```{r} +sql <- " +SELECT e.employee_id, e.forename, e.surname, d.department_name +FROM aws_example_dbtools.employees AS e +LEFT JOIN aws_example_dbtools.department AS d +ON e.department_id = d.department_id +WHERE e.department_id = 1 +" +create_temp_table(sql, table_name="sales_employees") +``` + +The two temporary tables can then be joined to provide the final table. 
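+
+Before running the join it can be worth a quick check that both temporary
+tables exist and contain what you expect. This is a minimal sketch, not
+evaluated here, using `read_sql_query` and the `__temp__` database described
+above.
+
+```{r, eval=FALSE}
+# Peek at the temporary tables created above
+read_sql_query("SELECT * FROM __temp__.total_sales LIMIT 5")
+read_sql_query("SELECT * FROM __temp__.sales_employees LIMIT 5")
+```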
+ +```{r} +sql <- " +SELECT se.*, ts.total_sales +FROM __temp__.sales_employees AS se +INNER JOIN __temp__.total_sales AS ts +ON se.employee_id = ts.employee_id +" +read_sql_query(sql) +``` + +## SQL templating + +Sometimes you will want to run similar SQL queries which differ only by, +for example, table or column names. In these cases +SQL templates can be created to SQL queries populated by templated variables, +using Jinja2 templating (https://jinja2docs.readthedocs.io/en/stable/index.html). +For example, + +```{r} +sql_template = "select * from {{ db_name }}.{{ table }} limit 10" +sql <- render_sql_template(sql_template, + list(db_name="aws_example_dbtools", + table="department")) +sql +``` + +The rendered SQL can then be used to query Athena as usual. + +```{r} +read_sql_query(sql) +``` + +The same template can be used to read a different table. + +```{r} +sql <- render_sql_template(sql_template, + list(db_name="aws_example_dbtools", + table="sales")) +read_sql_query(sql) +``` + +Perhaps more usefully we can use SQL templates saved as a file, which means we +can make use of our editors' and IDEs' SQL capabilities. + +```{r} +cat("SELECT * FROM {{ db_name }}.{{ table_name }}", file="tempfile.sql") + +sql <- get_sql_from_file("tempfile.sql", + jinja_args=list(db_name="aws_example_dbtools", + table_name="department")) +read_sql_query(sql) +``` + +## Advanced usage +### Creating and maintaining database tables in Athena + +In this section we will create a new database from our existing database in +Athena. Use the `start_query_execution_and_wait` function +to run the SQL creating the database. + +```{r} +sql <- " +CREATE DATABASE IF NOT EXISTS new_db_dbtools +COMMENT 'Example of running queries and insert into' +LOCATION 's3://alpha-everyone/dbtools/new_db/' +" + +response <- start_query_execution_and_wait(sql) +response$Status$State +``` + +Create a derived table in the new database with a CTAS query that both +generates the output into S3 and creates the schema of the table. Note that +this only inserts the data from quarters 1 and 2. + +```{r} +sql <- " +CREATE TABLE new_db_dbtools.sales_report WITH +( + external_location='s3://alpha-everyone/dbtools/new_db/sales_report' +) AS +SELECT qtr as sales_quarter, sum(sales) AS total_sales +FROM aws_example_dbtools.sales +WHERE qtr IN (1,2) +GROUP BY qtr +" + +response <- start_query_execution_and_wait(sql) +response$Status$State +``` + +We can now use an insert into query to add the data from quarters 3 and 4 as +the schema has already been created. + +```{r} +sql <- " +INSERT INTO new_db_dbtools.sales_report +SELECT qtr as sales_quarter, sum(sales) AS total_sales +FROM aws_example_dbtools.sales +WHERE qtr IN (3,4) +GROUP BY qtr +" + +response <- start_query_execution_and_wait(sql) +read_sql_query("select * from new_db_dbtools.sales_report") +``` + +### Creating a table with partitions + +Do the same as before but partition the data based on when the report was run. +This can make queries more efficient as filtering on the partition columns +reduces the amount of data scanned, plus makes incrementally adding data +easier. 
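+
+A related housekeeping function is `repair_table`, which runs Athena's
+'MSCK REPAIR TABLE' statement to register partitions that were written to S3
+outside of Athena. It is not needed for the CTAS and INSERT INTO queries
+below, which update the catalog themselves, so this is only a sketch of the
+call.
+
+```{r, eval=FALSE}
+# Recover partitions added outside of Athena; returns the query's
+# final state, e.g. 'SUCCEEDED'
+repair_table(table="daily_sales_report", database="new_db_dbtools")
+```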
+ +```{r} +sql <- " +CREATE TABLE new_db_dbtools.daily_sales_report WITH +( + external_location='s3://alpha-everyone/dbtools/new_db/daily_sales_report', + partitioned_by = ARRAY['report_date'] +) AS +SELECT qtr as sales_quarter, sum(sales) AS total_sales, +date '2021-01-01' AS report_date +FROM aws_example_dbtools.sales +GROUP BY qtr, date '2021-01-01' +" + +response <- start_query_execution_and_wait(sql) +response$Status$State +``` + +Then, simulating a source database that is updated daily, add more partitioned +data. + +```{r} +sql <- " +INSERT INTO new_db_dbtools.daily_sales_report +SELECT qtr as sales_quarter, sum(sales) AS total_sales, +date '2021-01-02' AS report_date +FROM aws_example_dbtools.sales +GROUP BY qtr, date '2021-01-02' +" + +response <- start_query_execution_and_wait(sql) +read_sql_query("select * from new_db_dbtools.daily_sales_report") +``` + +We can remove a partition and its underlying data using +`delete_partitions_and_data` which uses an expression to match partitions - +see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html#Glue.Client.get_partitions +for more details. + +```{r} +delete_partitions_and_data("new_db_dbtools", "daily_sales_report", + "report_date = '2021-01-02'") +read_sql_query("select * from new_db_dbtools.daily_sales_report") +``` + +Similarly we can remove a table and its data, + +```{r} +delete_table_and_data("new_db_dbtools", "daily_sales_report") +``` + +or the whole database. + +```{r} +delete_database_and_data("new_db_dbtools") +``` + +```{r, include=FALSE} +delete_database_and_data("aws_example_dbtools") +```
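+
+## Lower-level query control
+
+The functions above cover most use cases, but the package also exports
+lower-level wrappers around the Athena query execution API. The following
+sketch, not evaluated here and assuming a long-running query, shows how a
+query can be started without blocking, polled, and cancelled if necessary.
+
+```{r, eval=FALSE}
+# Start a query without waiting; returns the query execution ID
+query_id <- start_query_execution("SELECT * FROM aws_example_dbtools.sales")
+
+# Fetch the execution details, including the current state
+res <- get_query_execution(query_id)
+res$Status$State
+
+# Either block until the query has finished...
+res <- wait_query(query_id)
+
+# ...or cancel it
+stop_query_execution(query_id)
+```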