One of the primary advantages of piggyback is the ability to store fairly large files. This is also a potential source of frustration: piggyback assets can be quite large (sometimes too large to fit in RAM) and unwieldy to work with once they have been uploaded to the release.
A substantial and rapidly growing number of packages can work with data on-disk without reading the whole file into memory, including terra, stars, and sf for large spatial assets, as well as arrow and duckdb for tabular data.
Going a step further, such packages make it possible not only to skip the ‘read twice’ pattern of downloading to disk and then reading into memory, but to avoid ever reading the whole data file into R at all - for instance, spatial packages can use GDAL’s virtual file system. arrow and duckdb can do similar tricks with parquet and CSV files, allowing users to apply functions like dplyr::select() and dplyr::filter() directly to the remote data source and access only the subset of rows/columns they need. Subsetting data directly from a URL in this manner has the performance benefit of reading directly into memory while also allowing more efficient and bigger-than-RAM workflows. This is sometimes referred to as a cloud-native workflow.
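As a quick illustration of the spatial side, here is a minimal sketch of the GDAL virtual file system pattern with terra and sf; the URLs and layer name are placeholders, not real assets:

library(terra)
library(sf)

# GDAL's /vsicurl/ prefix streams byte ranges of a remote file on demand,
# so only the pieces needed for the operation are downloaded.
# These URLs are hypothetical placeholders:
elev  <- terra::rast("/vsicurl/https://example.com/elevation.tif")
roads <- sf::read_sf("/vsicurl/https://example.com/roads.gpkg")

# an OGR SQL query can push the subset down to the remote source as well
major <- sf::read_sf(
  "/vsicurl/https://example.com/roads.gpkg",
  query = "SELECT * FROM roads WHERE highway = 'motorway'"
)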
This vignette shows some examples of using duckdb to query larger datasets, using example data from the nflverse project for NFL football analytics. (Consult the nflverse’s nflreadr package if you want to work with NFL data beyond this example.)
The nflverse/nflverse-data repository is organized into one release per dataframe, with each release typically sharded into multiple files (and file formats) by season. Here’s a brief glimpse at how this looks through the piggyback lens:
pb_releases("nflverse/nflverse-data")
#> # A data.frame: 20 × 10
#> release_name release_id release_body tag_name draft created_at published_at
#> <chr> <int> <chr> <chr> <lgl> <chr> <chr>
#> 1 rosters 58152863 "Roster data, acce… rosters FALSE 2022-01-2… 2022-01-28T…
#> 2 player_stats 58152881 "Play by play data… player_… FALSE 2022-01-2… 2022-01-28T…
#> 3 pbp 58152862 "Play by play data… pbp FALSE 2022-01-2… 2022-01-28T…
#> 4 pfr_advstats 58152981 "PFR Adv Stats dat… pfr_adv… FALSE 2022-01-2… 2022-01-28T…
#> 5 depth_charts 58152948 "Depth chart data,… depth_c… FALSE 2022-01-2… 2022-01-28T…
#> # ℹ 15 more rows
#> # ℹ 3 more variables: html_url <chr>, upload_url <chr>, n_assets <int>
#> # ℹ Use `print(n = ...)` to see more rows
pb_list(repo = "nflverse/nflverse-data", tag = "pbp")
#> # A data.frame: 148 × 6
#> file_name size timestamp tag owner repo
#> <chr> <int> <dttm> <chr> <chr> <chr>
#> 1 play_by_play_2023.rds 12308832 2023-12-26 17:10:52 pbp nflv… nflv…
#> 2 play_by_play_2023.parquet 17469950 2023-12-26 17:11:02 pbp nflv… nflv…
#> 3 play_by_play_2023.csv 84490319 2023-12-26 17:10:58 pbp nflv… nflv…
#> 4 play_by_play_2022.rds 14387514 2023-02-28 09:25:26 pbp nflv… nflv…
#> 5 play_by_play_2022.parquet 20003378 2023-02-28 09:25:35 pbp nflv… nflv…
#> 6 play_by_play_2022.csv 97205016 2023-02-28 09:25:31 pbp nflv… nflv…
#> # ℹ 143 more rows
#> # ℹ Use `print(n = ...)` to see more rows
pb_download_url(
"play_by_play_2023.csv",
repo = "nflverse/nflverse-data",
tag = "pbp"
) |>
read.csv() |>
dplyr::glimpse()
#> Rows: 42,066
#> Columns: 372
#> $ play_id <int> 1, 39, 55, 77, 102, 124, 147…
#> $ game_id <chr> "2023_01_ARI_WAS", "2023_01_…
#> $ home_team <chr> "WAS", "WAS", "WAS", "WAS", …
#> $ away_team <chr> "ARI", "ARI", "ARI", "ARI", …
#> $ season_type <chr> "REG", "REG", "REG", "REG", …
#> $ week <int> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1…
#> $ posteam <chr> "", "WAS", "WAS", "WAS", "WA…
#> $ posteam_type <chr> "", "home", "home", "home", …
#> $ defteam <chr> "", "ARI", "ARI", "ARI", "AR…
#> $ yardline_100 <int> NA, 35, 75, 72, 66, 64, 64, …
#> $ down <int> NA, NA, 1, 2, 3, 1, 2, 1, 2,…
#> $ play_type <chr> "", "kickoff", "run", "pass"…
We’ll look at the play-by-play release data and try to calculate some summary statistics, without downloading it or reading it all into RAM.
Packages used in this section:
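Inferring from the calls used below, the list is roughly the following (most are called with explicit namespaces, so attaching them is optional):

library(piggyback)  # pb_releases(), pb_list(), pb_download_url()
library(DBI)        # generic database interface
library(duckdb)     # in-process analytical database engine
library(glue)       # glue_sql() for building the queries
library(dplyr)      # verbs used against the duckdb view
library(tictoc)     # lightweight timing of each query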
First, initialize a duckdb connection and install/load the httpfs extension (short for HTTP file system), which lets duckdb read files over HTTP(S).
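That setup might look roughly like the following sketch, assuming the connection object is named conn as in the queries below:

# open an in-memory duckdb connection
conn <- DBI::dbConnect(duckdb::duckdb())

# install and load the httpfs extension so duckdb can read remote files over HTTP(S)
DBI::dbExecute(conn, "INSTALL httpfs;")
DBI::dbExecute(conn, "LOAD httpfs;")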
Next, we’ll need to get all of the relevant play-by-play URLs from the release, which we can do with pb_download_url(), and pass them into duckdb’s read_parquet function:
tictoc::tic()
pbp_urls <- pb_download_url(repo = "nflverse/nflverse-data", tag = "pbp")
# keep only the assets matching the desired regex pattern, "play_by_play_####.parquet"
pbp_urls <- pbp_urls[grepl("play_by_play_\\d+\\.parquet", pbp_urls)]
query <- glue::glue_sql("SELECT COUNT(*) as row_count FROM read_parquet([{pbp_urls *}])", .con = conn)
DBI::dbGetQuery(conn = conn, query)
#> row_count
#> 1 1190783
tictoc::toc()
#> 2.845 sec elapsed
Now, we can construct a SQL query that summarizes the data:
tictoc::tic()
query <- glue::glue_sql(
"
SELECT
season,
posteam,
play_type,
COUNT(play_id) AS n_plays,
AVG(epa) AS epa_per_play
FROM read_parquet([{pbp_urls *}], filename = true)
WHERE filename SIMILAR TO '.*(2021|2022|2023).*'
AND (pass = 1 OR rush = 1)
GROUP BY season, posteam, play_type
ORDER BY season DESC, posteam ASC, n_plays DESC
",
.con = conn
)
DBI::dbGetQuery(conn = conn, query)
#> # A data.frame: 288 × 5
#> season posteam play_type n_plays epa_per_play
#> <int> <chr> <chr> <dbl> <dbl>
#> 1 2023 ARI pass 539 -0.231
#> 2 2023 ARI run 391 0.0351
#> 3 2023 ARI no_play 48 0.191
#> 4 2023 ATL pass 499 -0.0738
#> 5 2023 ATL run 465 -0.103
#> # ℹ 283 more rows
#> # ℹ Use `print(n = ...)` to see more rows
tictoc::toc()
#> 3.343 sec elapsed
You can also turn this into a view and query it with dbplyr/dplyr instead:
query <- glue::glue_sql(
"
CREATE VIEW pbp AS
SELECT
*
FROM read_parquet([{pbp_urls *}], filename = true)
",
.con = conn
)
DBI::dbExecute(conn, query)
pbp <- dplyr::tbl(conn, "pbp")
tictoc::tic()
pbp |>
dplyr::filter(grepl("2021|2022|2023", filename), pass == 1 | rush == 1) |>
dplyr::summarise(
n_plays = dplyr::n(),
epa_per_play = mean(epa, na.rm = TRUE),
.by = c(season, posteam, play_type)
) |>
dplyr::arrange(
desc(season), posteam, desc(n_plays)
) |>
dplyr::collect()
#> # A tibble: 288 × 5
#> season posteam play_type n_plays epa_per_play
#> <int> <chr> <chr> <dbl> <dbl>
#> 1 2023 ARI pass 539 -0.231
#> 2 2023 ARI run 391 0.0351
#> 3 2023 ARI no_play 48 0.191
#> 4 2023 ATL pass 499 -0.0738
#> 5 2023 ATL run 465 -0.103
#> # ℹ 283 more rows
#> # ℹ Use `print(n = ...)` to see more rows
tictoc::toc()
#> 3.491 sec elapsed
Using duckdb certainly adds a little verbosity; in exchange, we’ve managed to query and summarize 20+ parquet files totaling over a million rows without having to load them all into memory!
duckdbfs was developed to wrap this latter workflow into a single function call that accepts a vector of URLs:
library(duckdbfs)
pbp <- duckdbfs::open_dataset(pbp_urls, filename = TRUE)
tictoc::tic()
pbp |>
dplyr::filter(grepl("2021|2022|2023", filename), pass == 1 | rush == 1) |>
dplyr::summarise(
n_plays = dplyr::n(),
epa_per_play = mean(epa, na.rm = TRUE),
.by = c(season, posteam, play_type)
) |>
dplyr::arrange(
desc(season), posteam, desc(n_plays)
) |>
dplyr::collect()
#> # A tibble: 288 × 5
#> season posteam play_type n_plays epa_per_play
#> <int> <chr> <chr> <dbl> <dbl>
#> 1 2023 ARI pass 539 -0.231
#> 2 2023 ARI run 391 0.0351
#> 3 2023 ARI no_play 48 0.191
#> 4 2023 ATL pass 499 -0.0738
#> 5 2023 ATL run 465 -0.103
#> # ℹ 283 more rows
#> # ℹ Use `print(n = ...)` to see more rows
tictoc::toc()
#> 3.492 sec elapsed