dply
supports the following functions:
- arrange Sorts rows by column values
- count Counts the unique values in the given columns
- config Configures display format options
- csv Reads or writes a dataframe in CSV format
- distinct Retains unique rows
- filter Filters rows that satisfy the given predicates
- glimpse Shows a dataframe overview
- group_by and summarize Perform grouped aggregations
- head Shows the first few dataframe rows in table format
- joins Left, inner, outer, and cross joins
- json Reads or writes a dataframe in JSON format
- mutate Creates or mutates columns
- parquet Reads or writes a dataframe in Parquet format
- relocate Moves columns to new positions
- rename Renames columns
- select Selects columns
- show Shows all dataframe rows
- unnest Expands list columns into rows
More examples can be found in the tests folder.
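As a quick taste, a minimal pipeline combining a few of these (it assumes the
nyctaxi.parquet file used throughout this page; output omitted):
$ dply -c 'parquet("nyctaxi.parquet") |
      filter(total_amount > 20) |
      select(payment_type, total_amount) |
      head()'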
arrange
sorts the rows of its input dataframe according to the values of the
given columns:
$ dply -c 'parquet("nyctaxi.parquet") |
count(payment_type, VendorID) |
arrange(payment_type, n) |
show()'
shape: (8, 3)
┌──────────────┬──────────┬─────┐
│ payment_type ┆ VendorID ┆ n   │
│ ---          ┆ ---      ┆ --- │
│ str          ┆ i64      ┆ u32 │
╞══════════════╪══════════╪═════╡
│ Cash         ┆ 1        ┆ 12  │
│ Cash         ┆ 2        ┆ 41  │
│ Credit card  ┆ 1        ┆ 37  │
│ Credit card  ┆ 2        ┆ 148 │
│ Dispute      ┆ 2        ┆ 2   │
│ No charge    ┆ 1        ┆ 1   │
│ Unknown      ┆ 2        ┆ 4   │
│ Unknown      ┆ 1        ┆ 5   │
└──────────────┴──────────┴─────┘
To invert the ordering of a column use the desc
function:
$ dply -c 'parquet("nyctaxi.parquet") |
count(payment_type, VendorID) |
arrange(desc(payment_type), n) |
show()'
shape: (8, 3)
┌──────────────┬──────────┬─────┐
│ payment_type ┆ VendorID ┆ n   │
│ ---          ┆ ---      ┆ --- │
│ str          ┆ i64      ┆ u32 │
╞══════════════╪══════════╪═════╡
│ Unknown      ┆ 2        ┆ 4   │
│ Unknown      ┆ 1        ┆ 5   │
│ No charge    ┆ 1        ┆ 1   │
│ Dispute      ┆ 2        ┆ 2   │
│ Credit card  ┆ 1        ┆ 37  │
│ Credit card  ┆ 2        ┆ 148 │
│ Cash         ┆ 1        ┆ 12  │
│ Cash         ┆ 2        ┆ 41  │
└──────────────┴──────────┴─────┘
config
configures display options. It supports the following attributes:
- max_columns: The maximum number of columns in a table.
- max_column_width: The maximum number of characters shown in a column value.
- max_table_width: The maximum table width. Pass 0 to use the terminal width.
The following example shows only 2 columns:
$ dply -c 'config(max_columns = 2)
parquet("nyctaxi.parquet") |
count(payment_type, VendorID) |
arrange(desc(payment_type), n) |
show()'
shape: (8, 3)
┌──────────────┬──────────┬─────┐
│ payment_type ┆ VendorID ┆ ... │
│ ---          ┆ ---      ┆     │
│ str          ┆ i64      ┆     │
╞══════════════╪══════════╪═════╡
│ Unknown      ┆ 2        ┆ ... │
│ Unknown      ┆ 1        ┆ ... │
│ No charge    ┆ 1        ┆ ... │
│ Dispute      ┆ 2        ┆ ... │
│ Credit card  ┆ 1        ┆ ... │
│ Credit card  ┆ 2        ┆ ... │
│ Cash         ┆ 1        ┆ ... │
│ Cash         ┆ 2        ┆ ... │
└──────────────┴──────────┴─────┘
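The other attributes work the same way; as a sketch (attribute values chosen
just for illustration; output omitted), truncating long column values and
letting tables use the whole terminal width:
$ dply -c 'config(max_column_width = 10, max_table_width = 0)
      parquet("nyctaxi.parquet") |
      head(5)'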
count
counts the number of unique values in the given columns:
$ dply -c 'parquet("nyctaxi.parquet") |
count(payment_type, VendorID) |
show()'
shape: (8, 3)
┌──────────────┬──────────┬─────┐
│ payment_type ┆ VendorID ┆ n   │
│ ---          ┆ ---      ┆ --- │
│ str          ┆ i64      ┆ u32 │
╞══════════════╪══════════╪═════╡
│ Cash         ┆ 1        ┆ 12  │
│ Cash         ┆ 2        ┆ 41  │
│ Credit card  ┆ 1        ┆ 37  │
│ Credit card  ┆ 2        ┆ 148 │
│ Dispute      ┆ 2        ┆ 2   │
│ No charge    ┆ 1        ┆ 1   │
│ Unknown      ┆ 1        ┆ 5   │
│ Unknown      ┆ 2        ┆ 4   │
└──────────────┴──────────┴─────┘
Passing sort = true sorts the counters in descending order:
$ dply -c 'parquet("nyctaxi.parquet") |
count(payment_type, VendorID, sort=true) |
show()'
shape: (8, 3)
┌──────────────┬──────────┬─────┐
│ payment_type ┆ VendorID ┆ n   │
│ ---          ┆ ---      ┆ --- │
│ str          ┆ i64      ┆ u32 │
╞══════════════╪══════════╪═════╡
│ Credit card  ┆ 2        ┆ 148 │
│ Cash         ┆ 2        ┆ 41  │
│ Credit card  ┆ 1        ┆ 37  │
│ Cash         ┆ 1        ┆ 12  │
│ Unknown      ┆ 1        ┆ 5   │
│ Unknown      ┆ 2        ┆ 4   │
│ Dispute      ┆ 2        ┆ 2   │
│ No charge    ┆ 1        ┆ 1   │
└──────────────┴──────────┴─────┘
When csv
is called as the first step in a pipeline it reads a CSV file from disk:
$ dply -c 'csv("nyctaxi.csv") |
select(passenger_count, trip_distance, total_amount) |
head(5)'
shape: (5, 3)
┌─────────────────┬───────────────┬──────────────┐
│ passenger_count ┆ trip_distance ┆ total_amount │
│ ---             ┆ ---           ┆ ---          │
│ i64             ┆ f64           ┆ f64          │
╞═════════════════╪═══════════════╪══════════════╡
│ 1               ┆ 3.14          ┆ 22.56        │
│ 2               ┆ 1.06          ┆ 9.8          │
│ 1               ┆ 2.36          ┆ 17.76        │
│ 1               ┆ 5.2           ┆ 26.16        │
│ 3               ┆ 0.0           ┆ 19.55        │
└─────────────────┴───────────────┴──────────────┘
When called after the first step it writes the active dataframe to disk:
$ dply -c 'parquet("nyctaxi.parquet") |
select(passenger_count, payment_type, trip_distance, total_amount) |
csv("trips.csv", overwrite = true) |
count(passenger_count, payment_type, sort = true) |
csv("payments.csv")'
$ ls *.csv
nyctaxi.csv payments.csv trips.csv
By default csv generates an error if the file already exists; to overwrite
the file, pass overwrite = true.
distinct
keeps unique rows in the input dataframe:
$ dply -c 'parquet("nyctaxi.parquet") |
distinct(payment_type, VendorID) |
arrange(payment_type, VendorID) |
show()'
shape: (8, 2)
┌──────────────┬──────────┐
│ payment_type ┆ VendorID │
│ ---          ┆ ---      │
│ str          ┆ i64      │
╞══════════════╪══════════╡
│ Cash         ┆ 1        │
│ Cash         ┆ 2        │
│ Credit card  ┆ 1        │
│ Credit card  ┆ 2        │
│ Dispute      ┆ 2        │
│ No charge    ┆ 1        │
│ Unknown      ┆ 1        │
│ Unknown      ┆ 2        │
└──────────────┴──────────┘
When called without any columns it returns the distinct rows of the input
dataframe.
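For example, a sketch that deduplicates whole rows (output omitted):
$ dply -c 'parquet("nyctaxi.parquet") |
      distinct() |
      glimpse()'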
filter
retains all the rows whose column values satisfy the given predicates.
The left hand side of each condition must specify a column; comma-separated
predicates are applied one after the other:
$ dply -c 'parquet("nyctaxi.parquet") |
select(payment_type, trip_distance, total_amount) |
filter(payment_type == "Cash", trip_distance < 2, total_amount < 10) |
show()'
shape: (8, 3)
┌──────────────┬───────────────┬──────────────┐
│ payment_type ┆ trip_distance ┆ total_amount │
│ ---          ┆ ---           ┆ ---          │
│ str          ┆ f64           ┆ f64          │
╞══════════════╪═══════════════╪══════════════╡
│ Cash         ┆ 1.06          ┆ 9.8          │
│ Cash         ┆ 0.0           ┆ 3.3          │
│ Cash         ┆ 1.24          ┆ 7.8          │
│ Cash         ┆ 1.18          ┆ 8.8          │
│ Cash         ┆ 1.18          ┆ 9.8          │
│ Cash         ┆ 0.9           ┆ 8.3          │
│ Cash         ┆ 0.74          ┆ 8.8          │
│ Cash         ┆ 1.2           ┆ 9.8          │
└──────────────┴───────────────┴──────────────┘
filter supports logical & and | in predicates; they are right associative.
The following predicate returns all rows whose payment type is Cash, or rows
whose trip_distance < 2 and total_amount < 10:
$ dply -c 'parquet("nyctaxi.parquet") |
select(payment_type, trip_distance, total_amount) |
filter(payment_type == "Cash" | trip_distance < 2 & total_amount < 10) |
glimpse()'
Rows: 68
Columns: 3
+---------------+--------+----------------------------------------------------+
| payment_type | str | "Cash", "Cash", "Cash", "Credit card", "Cash",... |
| trip_distance | f64 | 1.06, 2.39, 1.52, 0.48, 2.88, 4.67, 1.6, 0.0,... |
| total_amount | f64 | 9.8, 22.3, 11.8, 9.13, 16.3, 21.3, 12.8, 3.3, 7... |
+---------------+--------+----------------------------------------------------+
We can use parentheses to change the precedence:
$ dply -c 'parquet("nyctaxi.parquet") |
select(payment_type, trip_distance, total_amount) |
filter((payment_type == "Cash" | trip_distance < 2) & total_amount < 10) |
glimpse()'
Rows: 23
Columns: 3
+---------------+--------+----------------------------------------------------+
| payment_type | str | "Cash", "Credit card", "Cash", "Dispute", "Cred... |
| trip_distance | f64 | 1.06, 0.48, 0.0, 0.43, 0.42, 0.66, 1.1, 0.49, 0.5 |
| total_amount | f64 | 9.8, 9.13, 3.3, 7.3, 8.5, 9.36, 8.8, 8.76, 9.8 |
+---------------+--------+----------------------------------------------------+
To compare dates use the dt function; it can parse a string with a date-time
YYYY-MM-DD HH:MM:SS or a date YYYY-MM-DD:
$ dply -c 'parquet("nyctaxi.parquet") |
select(ends_with("time")) |
filter(tpep_pickup_datetime < dt("2022-11-01 12:00:00")) |
show()'
shape: (4, 2)
┌──────────────────────┬───────────────────────┐
│ tpep_pickup_datetime ┆ tpep_dropoff_datetime │
│ ---                  ┆ ---                   │
│ datetime[ns]         ┆ datetime[ns]          │
╞══════════════════════╪═══════════════════════╡
│ 2022-11-01 10:45:13  ┆ 2022-11-01 10:53:56   │
│ 2022-11-01 07:31:16  ┆ 2022-11-01 08:19:44   │
│ 2022-11-01 11:33:46  ┆ 2022-11-01 12:03:15   │
│ 2022-11-01 11:17:08  ┆ 2022-11-01 12:08:15   │
└──────────────────────┴───────────────────────┘
The contains
function can be used on string or list columns to find rows that
contain a given value. For finding string values use a regex pattern:
$ dply -c 'parquet("nyctaxi.parquet") |
filter(contains(payment_type, "(?i:no)")) |
distinct(payment_type) |
show()'
shape: (2, 1)
┌──────────────┐
│ payment_type │
│ ---          │
│ str          │
╞══════════════╡
│ Unknown      │
│ No charge    │
└──────────────┘
For dataframes that contain list columns, where each column value is a list:
$ dply -c 'parquet("lists.parquet") | head(5)'
shape: (5, 4)
┌──────────┬─────────────┬────────────────────┬────────────────────────────┐
│ shape_id ┆ ints        ┆ floats             ┆ tags                       │
│ ---      ┆ ---         ┆ ---                ┆ ---                        │
│ u32      ┆ list[u32]   ┆ list[f64]          ┆ list[str]                  │
╞══════════╪═════════════╪════════════════════╪════════════════════════════╡
│ 1        ┆ [3, 88, 94] ┆ [2.5, 3.5, … 23.0] ┆ ["tag2", "tag5", … "tag8"] │
│ 2        ┆ [73]        ┆ [3.5, 15.0, 23.0]  ┆ ["tag9"]                   │
│ 3        ┆ null        ┆ [1.0, 2.5, … 6.0]  ┆ ["tag5"]                   │
│ 4        ┆ [43, 97]    ┆ [2.5, 2.5, … 19.0] ┆ ["tag7"]                   │
│ 5        ┆ null        ┆ [2.5, 2.5, … 23.0] ┆ ["tag2", "tag3", "tag4"]   │
└──────────┴─────────────┴────────────────────┴────────────────────────────┘
We can get all rows that have a tag matching ag5 or ag9:
$ dply -c 'parquet("lists.parquet") |
filter(contains(tags, "ag5|ag9")) |
head(5)'
shape: (5, 4)
┌──────────┬───────────────┬────────────────────┬────────────────────────────┐
│ shape_id ┆ ints          ┆ floats             ┆ tags                       │
│ ---      ┆ ---           ┆ ---                ┆ ---                        │
│ u32      ┆ list[u32]     ┆ list[f64]          ┆ list[str]                  │
╞══════════╪═══════════════╪════════════════════╪════════════════════════════╡
│ 1        ┆ [3, 88, 94]   ┆ [2.5, 3.5, … 23.0] ┆ ["tag2", "tag5", … "tag8"] │
│ 2        ┆ [73]          ┆ [3.5, 15.0, 23.0]  ┆ ["tag9"]                   │
│ 3        ┆ null          ┆ [1.0, 2.5, … 6.0]  ┆ ["tag5"]                   │
│ 7        ┆ [1, 22, … 87] ┆ [3.5]              ┆ ["tag5", "tag6", … "tag7"] │
│ 10       ┆ [6]           ┆ [2.5, 3.5, … 5.0]  ┆ ["tag1", "tag3", … "tag9"] │
└──────────┴───────────────┴────────────────────┴────────────────────────────┘
For numeric values we specify the value we are looking for:
$ dply -c 'parquet("lists.parquet") |
filter(contains(floats, 2.5)) |
head(5)'
shape: (5, 4)
┌──────────┬─────────────┬────────────────────┬────────────────────────────┐
│ shape_id ┆ ints        ┆ floats             ┆ tags                       │
│ ---      ┆ ---         ┆ ---                ┆ ---                        │
│ u32      ┆ list[u32]   ┆ list[f64]          ┆ list[str]                  │
╞══════════╪═════════════╪════════════════════╪════════════════════════════╡
│ 1        ┆ [3, 88, 94] ┆ [2.5, 3.5, … 23.0] ┆ ["tag2", "tag5", … "tag8"] │
│ 3        ┆ null        ┆ [1.0, 2.5, … 6.0]  ┆ ["tag5"]                   │
│ 4        ┆ [43, 97]    ┆ [2.5, 2.5, … 19.0] ┆ ["tag7"]                   │
│ 5        ┆ null        ┆ [2.5, 2.5, … 23.0] ┆ ["tag2", "tag3", "tag4"]   │
│ 10       ┆ [6]         ┆ [2.5, 3.5, … 5.0]  ┆ ["tag1", "tag3", … "tag9"] │
└──────────┴─────────────┴────────────────────┴────────────────────────────┘
Use is_null
or !is_null
to check for null values:
dply -c 'parquet("lists.parquet") |
filter(is_null(ints) & contains(tags, "ag9")) |
head(5)'
shape: (5, 4)
┌──────────┬───────────┬────────────────────┬────────────────────────────┐
│ shape_id ┆ ints      ┆ floats             ┆ tags                       │
│ ---      ┆ ---       ┆ ---                ┆ ---                        │
│ u32      ┆ list[u32] ┆ list[f64]          ┆ list[str]                  │
╞══════════╪═══════════╪════════════════════╪════════════════════════════╡
│ 78       ┆ null      ┆ [1.0, 15.0, 15.0]  ┆ ["tag7", "tag9"]           │
│ 88       ┆ null      ┆ [3.5]              ┆ ["tag3", "tag5", … "tag9"] │
│ 91       ┆ null      ┆ [1.0, 2.5, … 23.0] ┆ ["tag1", "tag9"]           │
│ 141      ┆ null      ┆ [15.0]             ┆ ["tag9"]                   │
│ 193      ┆ null      ┆ [6.0]              ┆ ["tag1", "tag7", … "tag9"] │
└──────────┴───────────┴────────────────────┴────────────────────────────┘
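The negated form works the same way; a sketch that keeps only the rows whose
ints list is present (output omitted):
$ dply -c 'parquet("lists.parquet") |
      filter(!is_null(ints)) |
      head(5)'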
glimpse
displays an overview of the input dataframe by showing each column in a
row with its type and a few values. This format is convenient when a dataframe
has many columns and a table view doesn't fit in the terminal.
$ dply -c 'parquet("nyctaxi.parquet") | glimpse()'
Rows: 250
Columns: 19
+-----------------------+--------------+---------------------------------------------+
| VendorID | i64 | 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 1, 1, 2... |
| tpep_pickup_datetime | datetime[ns] | 2022-11-22 19:27:01, 2022-11-27 16:43:26... |
| tpep_dropoff_datetime | datetime[ns] | 2022-11-22 19:45:53, 2022-11-27 16:50:06... |
| passenger_count | i64 | 1, 2, 1, 1, 3, 1, 2, 1, 1, 2, 2, 1, 1, 1... |
| trip_distance | f64 | 3.14, 1.06, 2.36, 5.2, 0.0, 2.39, 1.52,... |
| rate_code | str | "Standard", "Standard", "Standard",... |
| store_and_fwd_flag | str | "N", "N", "N", "N", "N", "N", "N", "N",... |
| PULocationID | i64 | 234, 48, 142, 79, 237, 137, 107, 229, 16... |
| DOLocationID | i64 | 141, 142, 236, 75, 230, 140, 162, 161, 1... |
| payment_type | str | "Credit card", "Cash", "Credit card",... |
| fare_amount | f64 | 14.5, 6.5, 11.5, 18.0, 12.5, 19.0, 8.5,... |
| extra | f64 | 1.0, 0.0, 0.0, 0.5, 3.0, 0.0, 0.0, 0.0,... |
| mta_tax | f64 | 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5,... |
| tip_amount | f64 | 3.76, 0.0, 2.96, 4.36, 3.25, 0.0, 0.0, 2... |
| tolls_amount | f64 | 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,... |
| improvement_surcharge | f64 | 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 0.3,... |
| total_amount | f64 | 22.56, 9.8, 17.76, 26.16, 19.55, 22.3,... |
| congestion_surcharge | f64 | 2.5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5,... |
| airport_fee | f64 | 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,... |
+-----------------------+--------------+---------------------------------------------+
As glimpse
consumes the input dataframe it must be the last function in a
pipeline.
group_by and summarize
work together to compute aggregations on groups of values: group_by
specifies which columns to use for the groups and summarize specifies which
aggregate operations to compute.
summarize supports the following aggregate functions: list, max, min, mean,
median, sd, sum, var and quantile.
A call to group_by must always be followed by a summarize.
For example, to compute the mean, standard deviation, minimum and maximum
price paid, and the number of rows for each payment type:
$ dply -c 'parquet("nyctaxi.parquet") |
group_by(payment_type) |
summarize(
mean_price = mean(total_amount),
std_price = sd(total_amount),
min_price = min(total_amount),
max_price = max(total_amount),
n = n()
) |
arrange(desc(n)) |
show()'
shape: (5, 6)
┌──────────────┬────────────┬───────────┬───────────┬───────────┬─────┐
│ payment_type ┆ mean_price ┆ std_price ┆ min_price ┆ max_price ┆ n   │
│ ---          ┆ ---        ┆ ---       ┆ ---       ┆ ---       ┆ --- │
│ str          ┆ f64        ┆ f64       ┆ f64       ┆ f64       ┆ u32 │
╞══════════════╪════════════╪═══════════╪═══════════╪═══════════╪═════╡
│ Credit card  ┆ 22.378757  ┆ 16.095337 ┆ 8.5       ┆ 84.36     ┆ 185 │
│ Cash         ┆ 18.458491  ┆ 12.545236 ┆ 3.3       ┆ 63.1      ┆ 53  │
│ Unknown      ┆ 26.847778  ┆ 14.279152 ┆ 9.96      ┆ 54.47     ┆ 9   │
│ Dispute      ┆ -0.5       ┆ 11.030866 ┆ -8.3      ┆ 7.3       ┆ 2   │
│ No charge    ┆ 8.8        ┆ 0.0       ┆ 8.8       ┆ 8.8       ┆ 1   │
└──────────────┴────────────┴───────────┴───────────┴───────────┴─────┘
To compute aggregations on all values in a dataframe call summarize
without
grouping:
$ dply -c 'parquet("nyctaxi.parquet") |
summarize(
mean_price = mean(total_amount),
std_price = sd(total_amount),
var_price = var(total_amount),
n = n()
) |
show()'
shape: (1, 4)
┌────────────┬───────────┬────────────┬─────┐
│ mean_price ┆ std_price ┆ var_price  ┆ n   │
│ ---        ┆ ---       ┆ ---        ┆ --- │
│ f64        ┆ f64       ┆ f64        ┆ u32 │
╞════════════╪═══════════╪════════════╪═════╡
│ 21.4712    ┆ 15.474215 ┆ 239.451342 ┆ 250 │
└────────────┴───────────┴────────────┴─────┘
See tests for more examples.
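As a sketch of quantile (assuming it takes the column and a fraction; the
p95_price name is just for illustration; output omitted):
$ dply -c 'parquet("nyctaxi.parquet") |
      group_by(payment_type) |
      summarize(p95_price = quantile(total_amount, 0.95)) |
      show()'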
head
shows the first few rows from a dataframe; an optional parameter can be used
to change the number of rows that are shown. head must be the last step in a
pipeline as it consumes the input dataframe.
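For example, a sketch that shows only the first three rows (output omitted):
$ dply -c 'parquet("nyctaxi.parquet") | head(3)'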
By using dataframe variables we can join dataframes with inner_join,
left_join, outer_join, or cross_join.
If we join on a dataframe without specifying the join columns, the join uses
all the common columns; here we rename PULocationID to make the join work:
$ dply -c 'csv("zones.csv") | zones_df
parquet("nyctaxi.parquet") |
select(LocationID = PULocationID) |
left_join(zones_df) |
head(5)'
shape: (5, 4)
┌────────────┬───────────┬───────────────────────┬──────────────┐
│ LocationID ┆ Borough   ┆ Zone                  ┆ service_zone │
│ ---        ┆ ---       ┆ ---                   ┆ ---          │
│ i64        ┆ str       ┆ str                   ┆ str          │
╞════════════╪═══════════╪═══════════════════════╪══════════════╡
│ 234        ┆ Manhattan ┆ Union Sq              ┆ Yellow Zone  │
│ 48         ┆ Manhattan ┆ Clinton East          ┆ Yellow Zone  │
│ 142        ┆ Manhattan ┆ Lincoln Square East   ┆ Yellow Zone  │
│ 79         ┆ Manhattan ┆ East Village          ┆ Yellow Zone  │
│ 237        ┆ Manhattan ┆ Upper East Side South ┆ Yellow Zone  │
└────────────┴───────────┴───────────────────────┴──────────────┘
To join on specific columns we can pass them to the join call:
$ dply -c 'csv("zones.csv") | zones_df
parquet("nyctaxi.parquet") |
left_join(zones_df, PULocationID == LocationID) |
select(PULocationID, Zone) |
head(5)'
shape: (5, 2)
┌──────────────┬───────────────────────┐
│ PULocationID ┆ Zone                  │
│ ---          ┆ ---                   │
│ i64          ┆ str                   │
╞══════════════╪═══════════════════════╡
│ 234          ┆ Union Sq              │
│ 48           ┆ Clinton East          │
│ 142          ┆ Lincoln Square East   │
│ 79           ┆ East Village          │
│ 237          ┆ Upper East Side South │
└──────────────┴───────────────────────┘
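The other join functions take the same arguments; for instance, a sketch of
an inner join that keeps only the matching rows (output omitted):
$ dply -c 'csv("zones.csv") | zones_df
      parquet("nyctaxi.parquet") |
      inner_join(zones_df, PULocationID == LocationID) |
      head(5)'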
When json
is called as the first step in a pipeline it reads a JSON file from disk:
$ dply -c 'json("./tests/data/github.json") |
select(created_at, public, repo, type) |
head()'
shape: (4, 4)
┌──────────────────────┬────────┬──────────────────────────────────────────────────┬───────────┐
│ created_at           ┆ public ┆ repo                                             ┆ type      │
│ ---                  ┆ ---    ┆ ---                                              ┆ ---       │
│ str                  ┆ bool   ┆ struct[3]                                        ┆ str       │
╞══════════════════════╪════════╪══════════════════════════════════════════════════╪═══════════╡
│ 2023-07-16T11:00:00Z ┆ true   ┆ {id: 278515889, name: user2134, url:             ┆ PushEvent │
│                      ┆        ┆ https://api.github.com/repos/some_repo}          ┆           │
│ 2023-07-16T11:00:00Z ┆ true   ┆ {id: 21090723, name: User123/tdi-studio-se, url: ┆ PushEvent │
│                      ┆        ┆ https://api.github.com/repos/S...                ┆           │
│ 2023-07-16T11:00:01Z ┆ true   ┆ {id: 26810458, name: User5/user-name, url:       ┆ ForkEvent │
│                      ┆        ┆ https://api.github.com/repos/Some_re...          ┆           │
│ 2023-07-16T11:00:01Z ┆ true   ┆ {id: 940421158, name: the repo name, url:        ┆ PushEvent │
│                      ┆        ┆ https://api.github.com/repos/Some_rep...         ┆           │
└──────────────────────┴────────┴──────────────────────────────────────────────────┴───────────┘
When called after the first step it writes the active dataframe as a JSON
file to disk:
$ dply -c 'parquet("nyctaxi.parquet") | json("nyctaxi.json")'
$ head -1 nyctaxi.json | jq
{
"DOLocationID": 141,
"PULocationID": 234,
"VendorID": 2,
"airport_fee": 0,
"congestion_surcharge": 2.5,
"extra": 1,
"fare_amount": 14.5,
"improvement_surcharge": 0.3,
"mta_tax": 0.5,
"passenger_count": 1,
"payment_type": "Credit card",
"rate_code": "Standard",
"store_and_fwd_flag": "N",
"tip_amount": 3.76,
"tolls_amount": 0,
"total_amount": 22.56,
"tpep_dropoff_datetime": "2022-11-22T19:45:53",
"tpep_pickup_datetime": "2022-11-22T19:27:01",
"trip_distance": 3.14
}
mutate
creates new columns by applying transformations to existing columns. For
example, to add columns for trip duration and average speed in km/h:
$ dply -c 'parquet("nyctaxi.parquet") |
select(ends_with("time"), trip_distance_mi = trip_distance) |
mutate(
travel_time_ns = tpep_dropoff_datetime - tpep_pickup_datetime,
trip_distance_km = trip_distance_mi * 1.60934,
avg_speed_km_h = trip_distance_km / (travel_time_ns / 3.6e12)
) |
select(travel_time_ns, trip_distance_km, avg_speed_km_h) |
arrange(desc(travel_time_ns)) |
head(5)'
shape: (5, 3)
┌────────────────┬──────────────────┬────────────────┐
│ travel_time_ns ┆ trip_distance_km ┆ avg_speed_km_h │
│ ---            ┆ ---              ┆ ---            │
│ duration[ns]   ┆ f64              ┆ f64            │
╞════════════════╪══════════════════╪════════════════╡
│ 1h 6m          ┆ 28.179543        ┆ 25.617767      │
│ 1h 2m 39s      ┆ 28.630159        ┆ 27.419146      │
│ 55m 48s        ┆ 26.763324        ┆ 28.777768      │
│ 53m 45s        ┆ 19.988003        ┆ 22.312189      │
│ 51m 7s         ┆ 14.966862        ┆ 17.567885      │
└────────────────┴──────────────────┴────────────────┘
mutate also supports len for list columns, and mean, max, min, median, and
dt for scalar columns; see the tests for more examples.
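As a sketch of len on a list column (the num_ints name is just for
illustration; output omitted):
$ dply -c 'parquet("lists.parquet") |
      mutate(num_ints = len(ints)) |
      select(shape_id, num_ints) |
      head(5)'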
When parquet
is called as the first step in a pipeline it reads a Parquet file
from disk:
$ dply -c 'parquet("nyctaxi.parquet") |
select(passenger_count, trip_distance, total_amount) |
head(5)'
shape: (5, 3)
┌─────────────────┬───────────────┬──────────────┐
│ passenger_count ┆ trip_distance ┆ total_amount │
│ ---             ┆ ---           ┆ ---          │
│ i64             ┆ f64           ┆ f64          │
╞═════════════════╪═══════════════╪══════════════╡
│ 1               ┆ 3.14          ┆ 22.56        │
│ 2               ┆ 1.06          ┆ 9.8          │
│ 1               ┆ 2.36          ┆ 17.76        │
│ 1               ┆ 5.2           ┆ 26.16        │
│ 3               ┆ 0.0           ┆ 19.55        │
└─────────────────┴───────────────┴──────────────┘
When called after the first step it writes the active dataframe to disk:
$ dply -c 'parquet("nyctaxi.parquet") |
select(passenger_count, payment_type, trip_distance, total_amount) |
parquet("trips.parquet", overwrite = true) |
count(passenger_count, payment_type, sort = true) |
parquet("payments.parquet")'
$ ls *.parquet
nyctaxi.parquet payments.parquet trips.parquet
By default parquet generates an error if the file already exists; to
overwrite the file, pass overwrite = true.
relocate
moves columns in the dataframe; by default the given columns are moved
before the first column:
$ dply -c 'parquet("nyctaxi.parquet") |
relocate(passenger_count, payment_type, total_amount) |
glimpse()'
Rows: 250
Columns: 19
+-----------------------+--------------+----------------------------------------------------+
| passenger_count | i64 | 1, 2, 1, 1, 3, 1, 2, 1, 1, 2, 2, 1, 1, 1, 1, 5,... |
| payment_type | str | "Credit card", "Cash", "Credit card", "Credit... |
| total_amount | f64 | 22.56, 9.8, 17.76, 26.16, 19.55, 22.3, 11.8, 11... |
| VendorID | i64 | 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 1, 1, 2, 2, 2,... |
| tpep_pickup_datetime | datetime[ns] | 2022-11-22 19:27:01, 2022-11-27 16:43:26,... |
| tpep_dropoff_datetime | datetime[ns] | 2022-11-22 19:45:53, 2022-11-27 16:50:06,... |
| trip_distance | f64 | 3.14, 1.06, 2.36, 5.2, 0.0, 2.39, 1.52, 0.51,... |
| rate_code | str | "Standard", "Standard", "Standard", "Standard",... |
| store_and_fwd_flag | str | "N", "N", "N", "N", "N", "N", "N", "N", "N", "N... |
| PULocationID | i64 | 234, 48, 142, 79, 237, 137, 107, 229, 162, 48,... |
| DOLocationID | i64 | 141, 142, 236, 75, 230, 140, 162, 161, 186, 239... |
| fare_amount | f64 | 14.5, 6.5, 11.5, 18.0, 12.5, 19.0, 8.5, 6.0, 12... |
| extra | f64 | 1.0, 0.0, 0.0, 0.5, 3.0, 0.0, 0.0, 0.0, 1.0, 0.... |
| mta_tax | f64 | 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.... |
| tip_amount | f64 | 3.76, 0.0, 2.96, 4.36, 3.25, 0.0, 0.0, 2.0, 3.2... |
| tolls_amount | f64 | 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.... |
| improvement_surcharge | f64 | 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 0.... |
| congestion_surcharge | f64 | 2.5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.... |
| airport_fee | f64 | 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.... |
+-----------------------+--------------+----------------------------------------------------+
relocate also supports the options before = column and after = column to
move columns before or after a specific column; see the tests for examples.
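For instance, a sketch using after = (output omitted):
$ dply -c 'parquet("nyctaxi.parquet") |
      relocate(total_amount, after = payment_type) |
      glimpse()'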
rename
renames columns; each rename has the new_name = old_name format:
$ dply -c 'parquet("nyctaxi.parquet") |
rename(
vendor_id = VendorID,
pu_location_id = PULocationID,
do_location_id = DOLocationID
) |
glimpse()'
Rows: 250
Columns: 19
+-----------------------+--------------+----------------------------------------------------+
| vendor_id | i64 | 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 1, 1, 2, 2, 2,... |
| tpep_pickup_datetime | datetime[ns] | 2022-11-22 19:27:01, 2022-11-27 16:43:26,... |
| tpep_dropoff_datetime | datetime[ns] | 2022-11-22 19:45:53, 2022-11-27 16:50:06,... |
| passenger_count | i64 | 1, 2, 1, 1, 3, 1, 2, 1, 1, 2, 2, 1, 1, 1, 1, 5,... |
| trip_distance | f64 | 3.14, 1.06, 2.36, 5.2, 0.0, 2.39, 1.52, 0.51,... |
| rate_code | str | "Standard", "Standard", "Standard", "Standard",... |
| store_and_fwd_flag | str | "N", "N", "N", "N", "N", "N", "N", "N", "N", "N... |
| pu_location_id | i64 | 234, 48, 142, 79, 237, 137, 107, 229, 162, 48,... |
| do_location_id | i64 | 141, 142, 236, 75, 230, 140, 162, 161, 186, 239... |
| payment_type | str | "Credit card", "Cash", "Credit card", "Credit... |
| fare_amount | f64 | 14.5, 6.5, 11.5, 18.0, 12.5, 19.0, 8.5, 6.0, 12... |
| extra | f64 | 1.0, 0.0, 0.0, 0.5, 3.0, 0.0, 0.0, 0.0, 1.0, 0.... |
| mta_tax | f64 | 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.5, 0.... |
| tip_amount | f64 | 3.76, 0.0, 2.96, 4.36, 3.25, 0.0, 0.0, 2.0, 3.2... |
| tolls_amount | f64 | 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.... |
| improvement_surcharge | f64 | 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 0.3, 0.... |
| total_amount | f64 | 22.56, 9.8, 17.76, 26.16, 19.55, 22.3, 11.8, 11... |
| congestion_surcharge | f64 | 2.5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.... |
| airport_fee | f64 | 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.... |
+-----------------------+--------------+----------------------------------------------------+
select
keeps the columns specified in its arguments and optionally renames them.
It accepts column names and the starts_with, ends_with, and contains
predicates:
$ dply -c 'parquet("nyctaxi.parquet") |
select(
vendor_id = VendorID,
ends_with("time"),
contains("amount")
) |
glimpse()'
Rows: 250
Columns: 7
+-----------------------+--------------+----------------------------------------------------+
| vendor_id | i64 | 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 1, 1, 2, 2, 2,... |
| tpep_pickup_datetime | datetime[ns] | 2022-11-22 19:27:01, 2022-11-27 16:43:26,... |
| tpep_dropoff_datetime | datetime[ns] | 2022-11-22 19:45:53, 2022-11-27 16:50:06,... |
| fare_amount | f64 | 14.5, 6.5, 11.5, 18.0, 12.5, 19.0, 8.5, 6.0, 12... |
| tip_amount | f64 | 3.76, 0.0, 2.96, 4.36, 3.25, 0.0, 0.0, 2.0, 3.2... |
| tolls_amount | f64 | 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.... |
| total_amount | f64 | 22.56, 9.8, 17.76, 26.16, 19.55, 22.3, 11.8, 11... |
+-----------------------+--------------+----------------------------------------------------+
Any of the predicate functions can be negated with !:
$ dply -c 'parquet("nyctaxi.parquet") |
select(!contains("a")) |
head(5)'
shape: (5, 1)
┌──────────┐
│ VendorID │
│ ---      │
│ i64      │
╞══════════╡
│ 2        │
│ 2        │
│ 2        │
│ 2        │
│ 1        │
└──────────┘
show
displays all the rows in the input dataframe in table format. show must be
the last step in a pipeline as it consumes the input dataframe.
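For example, a sketch using the zones.csv file from the join examples
(output omitted):
$ dply -c 'csv("zones.csv") | show()'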
unnest
expands a list column, creating a row for each element in the list:
$ dply -c 'parquet("lists.parquet") |
select(shape_id, ints) |
unnest(ints) |
head()'
shape: (10, 2)
┌──────────┬──────┐
│ shape_id ┆ ints │
│ ---      ┆ ---  │
│ u32      ┆ u32  │
╞══════════╪══════╡
│ 1        ┆ 3    │
│ 1        ┆ 88   │
│ 1        ┆ 94   │
│ 2        ┆ 73   │
│ 3        ┆ null │
│ 4        ┆ 43   │
│ 4        ┆ 97   │
│ 5        ┆ null │
│ 6        ┆ 65   │
│ 7        ┆ 1    │
└──────────┴──────┘
To create a list column from a group we can use the list function in
summarize:
$ dply -c 'parquet("lists.parquet") |
select(shape_id, ints) |
unnest(ints) |
group_by(shape_id) |
summarize(ints = list(ints)) |
head()'
shape: (10, 2)
┌──────────┬────────────────┐
│ shape_id ┆ ints           │
│ ---      ┆ ---            │
│ u32      ┆ list[u32]      │
╞══════════╪════════════════╡
│ 1        ┆ [3, 88, 94]    │
│ 2        ┆ [73]           │
│ 3        ┆ [null]         │
│ 4        ┆ [43, 97]       │
│ 5        ┆ [null]         │
│ 6        ┆ [65]           │
│ 7        ┆ [1, 22, … 87]  │
│ 8        ┆ [null]         │
│ 9        ┆ [36, 37, … 48] │
│ 10       ┆ [6]            │
└──────────┴────────────────┘
If we have a dataframe with columns that contain a list of structs:
$ dply -c 'parquet("structs.parquet") | head(8)'
shape: (8, 2)
┌──────────┬──────────────────────────────────────────────────────────────────────────────────────────┐
│ shape_id ┆ points                                                                                   │
│ ---      ┆ ---                                                                                      │
│ u32      ┆ list[struct[4]]                                                                          │
╞══════════╪══════════════════════════════════════════════════════════════════════════════════════════╡
│ 1        ┆ [{"s1",0,-7.144482,-2.752852}, {"s1",1,-3.377404,-2.862458}, {"s1",2,-4.05302,6.336014}] │
│ 2        ┆ null                                                                                     │
│ 3        ┆ [{"s3",0,-8.744724,-0.039072}]                                                           │
│ 4        ┆ [{"s4",0,-0.807573,-7.81899}]                                                            │
│ 5        ┆ [{"s5",0,-2.831063,5.288568}]                                                            │
│ 6        ┆ [{"s6",0,4.039896,-3.030655}]                                                            │
│ 7        ┆ [{"s7",0,4.160488,9.694407}, {"s7",1,-7.926216,-4.505739}, {"s7",2,8.11179,8.441616}]    │
│ 8        ┆ [{"s8",0,0.737154,0.908487}, {"s8",1,-2.295539,-7.304075}, {"s8",2,-1.40542,-9.652238}]  │
└──────────┴──────────────────────────────────────────────────────────────────────────────────────────┘
We can unnest twice to get all values as columns:
$ dply -c 'parquet("structs.parquet") | unnest(points, points) | head()'
shape: (10, 5)
┌──────────┬──────┬──────┬───────────┬───────────┐
│ shape_id ┆ ptag ┆ pid  ┆ x         ┆ y         │
│ ---      ┆ ---  ┆ ---  ┆ ---       ┆ ---       │
│ u32      ┆ str  ┆ i32  ┆ f32       ┆ f32       │
╞══════════╪══════╪══════╪═══════════╪═══════════╡
│ 1        ┆ s1   ┆ 0    ┆ -7.144482 ┆ -2.752852 │
│ 1        ┆ s1   ┆ 1    ┆ -3.377404 ┆ -2.862458 │
│ 1        ┆ s1   ┆ 2    ┆ -4.05302  ┆ 6.336014  │
│ 2        ┆ null ┆ null ┆ null      ┆ null      │
│ 3        ┆ s3   ┆ 0    ┆ -8.744724 ┆ -0.039072 │
│ 4        ┆ s4   ┆ 0    ┆ -0.807573 ┆ -7.81899  │
│ 5        ┆ s5   ┆ 0    ┆ -2.831063 ┆ 5.288568  │
│ 6        ┆ s6   ┆ 0    ┆ 4.039896  ┆ -3.030655 │
│ 7        ┆ s7   ┆ 0    ┆ 4.160488  ┆ 9.694407  │
│ 7        ┆ s7   ┆ 1    ┆ -7.926216 ┆ -4.505739 │
└──────────┴──────┴──────┴───────────┴───────────┘
Pipeline variables store the progress of a pipeline so that it can be
reused; they are useful for joins or as partial computations shared by
other pipelines.
Pipelines can be separated by a newline or by a semicolon. The following
example has two pipelines: the first reads a CSV file with some zones
mapping and saves the result to the zones_df variable, and the second uses
zones_df for a join (note the semicolon separating the pipelines):
$ dply -c 'csv("zones.csv") | zones_df; parquet("nyctaxi.parquet") |
left_join(zones_df, PULocationID == LocationID) |
select(contains("amount"), Zone) |
head()'
shape: (10, 5)
┌─────────────┬────────────┬──────────────┬──────────────┬───────────────────────────────┐
│ fare_amount ┆ tip_amount ┆ tolls_amount ┆ total_amount ┆ Zone                          │
│ ---         ┆ ---        ┆ ---          ┆ ---          ┆ ---                           │
│ f64         ┆ f64        ┆ f64          ┆ f64          ┆ str                           │
╞═════════════╪════════════╪══════════════╪══════════════╪═══════════════════════════════╡
│ 14.5        ┆ 3.76       ┆ 0.0          ┆ 22.56        ┆ Union Sq                      │
│ 6.5         ┆ 0.0        ┆ 0.0          ┆ 9.8          ┆ Clinton East                  │
│ 11.5        ┆ 2.96       ┆ 0.0          ┆ 17.76        ┆ Lincoln Square East           │
│ 18.0        ┆ 4.36       ┆ 0.0          ┆ 26.16        ┆ East Village                  │
│ 12.5        ┆ 3.25       ┆ 0.0          ┆ 19.55        ┆ Upper East Side South         │
│ 19.0        ┆ 0.0        ┆ 0.0          ┆ 22.3         ┆ Kips Bay                      │
│ 8.5         ┆ 0.0        ┆ 0.0          ┆ 11.8         ┆ Gramercy                      │
│ 6.0         ┆ 2.0        ┆ 0.0          ┆ 11.3         ┆ Sutton Place/Turtle Bay North │
│ 12.0        ┆ 3.26       ┆ 0.0          ┆ 19.56        ┆ Midtown East                  │
│ 9.0         ┆ 2.56       ┆ 0.0          ┆ 15.36        ┆ Clinton East                  │
└─────────────┴────────────┴──────────────┴──────────────┴───────────────────────────────┘
Alternatively we can use variables to produce different computations from a
common starting point (using newlines as separators):
$ dply -c 'parquet("nyctaxi.parquet") |
select(payment_type, contains("amount")) |
fare_amounts |
group_by(payment_type) |
summarize(mean_amount = mean(total_amount)) |
head()
fare_amounts |
group_by(payment_type) |
summarize(mean_tips = mean(tip_amount)) |
head()'
shape: (5, 2)
┌──────────────┬─────────────┐
│ payment_type ┆ mean_amount │
│ ---          ┆ ---         │
│ str          ┆ f64         │
╞══════════════╪═════════════╡
│ Credit card  ┆ 22.378757   │
│ Cash         ┆ 18.458491   │
│ Dispute      ┆ -0.5        │
│ Unknown      ┆ 26.847778   │
│ No charge    ┆ 8.8         │
└──────────────┴─────────────┘
shape: (5, 2)
┌──────────────┬───────────┐
│ payment_type ┆ mean_tips │
│ ---          ┆ ---       │
│ str          ┆ f64       │
╞══════════════╪═══════════╡
│ Credit card  ┆ 3.469784  │
│ Cash         ┆ 0.0       │
│ Dispute      ┆ 0.0       │
│ Unknown      ┆ 3.082222  │
│ No charge    ┆ 0.0       │
└──────────────┴───────────┘
To reference columns whose names contain characters that are not
alphanumeric or underscores, quote the column using backticks. The following
example uses the travel time ns column, whose name contains words separated
by spaces:
$ dply -c 'parquet("nyctaxi.parquet") |
select(ends_with("time")) |
mutate(`travel time ns` = tpep_dropoff_datetime - tpep_pickup_datetime) |
select(`travel time ns`) |
arrange(desc(`travel time ns`)) |
head(2)'
shape: (2, 1)
┌────────────────┐
│ travel time ns │
│ ---            │
│ duration[ns]   │
╞════════════════╡
│ 1h 6m          │
│ 1h 2m 39s      │
└────────────────┘