Merge pull request #21 from moj-analytical-services/pydbools_dep
Making dbtools dependent on pydbtools
isichei authored Jun 14, 2019
2 parents 14c1500 + bf26ff4 commit f71ccce
Showing 5 changed files with 45 additions and 91 deletions.
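In outline, the R wrappers stop sourcing a bundled Python script and instead import pydbtools through reticulate, so the Python module must be importable at run time. A minimal pre-flight sketch, assuming pydbtools has been installed into the Python environment that reticulate uses (the install step itself is not part of this commit):

# Assumption: pydbtools is pip-installed in the Python environment reticulate points at
if (!reticulate::py_module_available("pydbtools")) {
  stop("pydbtools not found; install it in the Python environment used by reticulate")
}
pydbtools <- reticulate::import("pydbtools")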
2 changes: 1 addition & 1 deletion DESCRIPTION
@@ -1,7 +1,7 @@
Package: dbtools
Type: Package
Title: Uses R wrapper function to send queries to athena.
-Version: 2.0.1
+Version: 2.0.2
Author: Karik Isichei
Maintainer: The package maintainer <[email protected]>
Description: See title.
9 changes: 3 additions & 6 deletions R/get_athena_query_response.R
@@ -32,12 +32,9 @@

get_athena_query_response <- function(sql_query, return_athena_types=FALSE, timeout = NULL){

-# Annoyingly I think you have to pull it in as the source_python function doesn't seem to be exported properly
-# require(reticulate)

-python_script <- system.file("extdata", "get_athena_query_response.py", package = "dbtools")
-reticulate::source_python(python_script)
+pydbtools <- reticulate::import("pydbtools")
s3tools::get_credentials()
-response <- get_athena_query_response(sql_query=sql_query, return_athena_types=return_athena_types, timeout=timeout)
+response <- pydbtools$get_athena_query_response(sql_query=sql_query, return_athena_types=return_athena_types, timeout=timeout, force_ec2=TRUE)
return(response)
}
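For reference, a short usage sketch of the rewritten wrapper; the query string, timeout value and table name are illustrative only, not part of this diff:

# Illustrative call; returns the query response object produced by pydbtools
response <- dbtools::get_athena_query_response(
  sql_query = "SELECT * FROM my_db.my_table LIMIT 10",  # hypothetical database/table
  return_athena_types = FALSE,
  timeout = 60
)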

43 changes: 28 additions & 15 deletions R/read_sql.R
@@ -16,7 +16,7 @@
#'
#'@param timeout Specifies How long you want your sql query to wait before it gives up (in seconds). Default parameter is NULL which will mean SQL query will not timeout and could wait forever if an issue occured.
#'
-#'@return A table as a Dataframe, tibble or data.table
+#'@return A table as a dataframe, tibble or data.table
#'
#'@examples
#'# Read an sql query using readr::read_csv i.e. returning a Tibble
@@ -38,26 +38,39 @@ read_sql <- function(sql_query, return_df_as='tibble', timeout = NULL){
bucket <- unlist(strsplit(s3_path_stripped, '/'))[1]
s3_key <- gsub(paste0(bucket,"/"), "", s3_path_stripped)

-  data_conversion <- dbtools:::get_data_conversion(return_df_as)
-  col_classes = list()
+  # Check if text output
+  if(endsWith(s3_path_stripped, ".txt")){
+    if(return_df_as == 'tibble'){
+      df <- s3tools::read_using(FUN=readr::read_csv, s3_path=s3_path_stripped, col_names=FALSE, col_types=list(readr::col_character()))
+    } else if(return_df_as == 'data.table'){
+      df <- s3tools::read_using(FUN=data.table::fread, s3_path=s3_path_stripped, header=FALSE, colClasses=c("character"))
+    } else {
+      df <- s3tools::read_using(FUN=read.csv, s3_path=s3_path_stripped, header=FALSE, colClasses=c("character"))
+    }
+    colnames(df) <- c("output")
+  } else {
+    # Get meta Ddata conversion
+    data_conversion <- dbtools:::get_data_conversion(return_df_as)
+    col_classes = list()
  for(m in response$meta){
    col_classes[[m$name]] = data_conversion[[m$type]]
  }
-  col_classes_vec = unlist(col_classes)
+    col_classes_vec = unlist(col_classes)

-  if(return_df_as == 'tibble'){
-    #This is the best R work arround I could find to replicate Python's **kwargs...
-    col_types = do.call(readr::cols, col_classes)
-    df <- s3tools::read_using(FUN=readr::read_csv, s3_path=s3_path_stripped, col_names=TRUE, col_types=col_types)
+    if(return_df_as == 'tibble'){
+      #This is the best R work arround I could find to replicate Python's **kwargs...
+      col_types = do.call(readr::cols, col_classes)
+      df <- s3tools::read_using(FUN=readr::read_csv, s3_path=s3_path_stripped, col_names=TRUE, col_types=col_types)

-  } else if(return_df_as == 'data.table'){
-    dt_ver <- packageVersion("data.table")
-    if(dt_ver < '1.11.8'){
-      warning("Your version of data.table must be 1.11.8 or above please install a new version otherwise your outputs of type data.table may not convert data types properly.")
+    } else if(return_df_as == 'data.table'){
+      dt_ver <- packageVersion("data.table")
+      if(dt_ver < '1.11.8'){
+        warning("Your version of data.table must be 1.11.8 or above please install a new version otherwise your outputs of type data.table may not convert data types properly.")
    }
-    df <- s3tools::read_using(FUN=data.table::fread, s3_path=s3_path_stripped, header=TRUE, colClasses=col_classes_vec)
-  } else {
-    df <- s3tools::read_using(FUN=read.csv, s3_path=s3_path_stripped, header=TRUE, colClasses=col_classes_vec)
-  }
+      df <- s3tools::read_using(FUN=data.table::fread, s3_path=s3_path_stripped, header=TRUE, colClasses=col_classes_vec)
+    } else {
+      df <- s3tools::read_using(FUN=read.csv, s3_path=s3_path_stripped, header=TRUE, colClasses=col_classes_vec)
+    }
+  }
dbtools:::delete_object(bucket, s3_key)
dbtools:::delete_object(bucket, paste0(s3_key, ".metadata"))
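A similarly hedged sketch of calling read_sql itself, using the signature shown in the hunk header above; the table name is again hypothetical:

# Illustrative calls; return_df_as defaults to 'tibble'
df_tibble <- dbtools::read_sql("SELECT * FROM my_db.my_table LIMIT 10")
df_dt     <- dbtools::read_sql("SELECT * FROM my_db.my_table LIMIT 10", return_df_as = "data.table")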
14 changes: 13 additions & 1 deletion inst/extdata/boto_utils.py
@@ -1,5 +1,17 @@
import boto3
+from botocore.credentials import InstanceMetadataProvider, InstanceMetadataFetcher

def delete_object(bucket, key):
-    s3_client = boto3.client('s3')
+    rn = "eu-west-1"
+    provider = InstanceMetadataProvider(
+        iam_role_fetcher=InstanceMetadataFetcher(timeout=1000, num_attempts=2)
+    )
+    creds = provider.load().get_frozen_credentials()
+    s3_client = boto3.client(
+        "s3",
+        region_name=rn,
+        aws_access_key_id=creds.access_key,
+        aws_secret_access_key=creds.secret_key,
+        aws_session_token=creds.token,
+    )
    s3_client.delete_object(Bucket=bucket, Key=key)
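read_sql.R calls this helper as dbtools:::delete_object(), so the Python file has to be loaded into the package's R session. A wiring sketch only, reusing the source_python pattern the package used elsewhere; the actual loading code is not shown in this diff, and the bucket and key are hypothetical:

# Assumption: illustrative wiring, not the package's confirmed loading code
boto_utils <- system.file("extdata", "boto_utils.py", package = "dbtools")
reticulate::source_python(boto_utils)
delete_object("alpha-example-bucket", "path/to/query_results.csv")  # hypothetical bucket/key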
68 changes: 0 additions & 68 deletions inst/extdata/get_athena_query_response.py

This file was deleted.
