class: title-slide, left, top

# Bigger data
## with `arrow` and `duckdb`
### Tom Mock & Edgar Ruiz
### 2021-10-27

<br>
[jthomasmock.github.io/bigger-data](https://jthomasmock.github.io/bigger-data/#1)
[github.com/jthomasmock/bigger-data](https://github.com/jthomasmock/bigger-data) <span style='color:white;'>Slides released under</span> [CC-BY 2.0](https://creativecommons.org/licenses/by/2.0/)
]

<div style = "position: absolute;top: 0px;right: 0;"><img src="https://images.unsplash.com/photo-1579538800945-46d13c694a36?ixid=MnwxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHx8&ixlib=rb-1.2.1&auto=format&fit=crop&w=1470&q=80" alt="Two people walking up a large mountain" width="600"></img></div>

---
layout: true

<div class="my-footer"><span>https://colorado.rstudio.com/rsc/bigger-data-prez/</span></div>

---

### The initial ask

> * Some discussion around in-process analytics
> * The concept and implementations like DuckDB and Arrow
> * What the current landscape of options looks like, what is and isn't stable enough for enterprise use
> * DuckDB backend for dplyr

* [Hannes Mühleisen & Mark Raasveldt slides, 2021](https://homepages.cwi.nl/~hannes/sigmod2021-muehleisen-inprocess-slides.pdf)
* [New directions for Apache Arrow, Wes McKinney 2021-09-13](https://www.slideshare.net/wesm/new-directions-for-apache-arrow)

---

### The TL;DR

--

#### `arrow`

> `arrow` is a software development platform for building high-performance applications that process and transport large datasets

--

* The `arrow` R package is an interface to data via the `arrow` backend, and has upcoming enhanced support for `dplyr`:
  - Existing ungrouped `mutate()`, `filter()`, `select()` in `arrow` 5.0
  - `group_by()` + `summarize()` aggregation coming in 6.0
* `arrow` data can be "handed off" to `duckdb` with `to_duckdb()` for any `dbplyr` commands without data conversion, i.e. no serialization or data-copying costs are incurred.

---

### The TL;DR

#### `duckdb`

> DuckDB is a lightweight, columnar-oriented, on-disk database designed to support **analytical query workloads** and has no external dependencies. It's the '`sqlite` for analytics'

--

* The `duckdb` R package is an interface to `duckdb` from R
* It uses Postgres-'flavored' SQL with rich support for `dbplyr`
* It is up to 10x faster than `sqlite` with the same lightweight structure (just a header and the implementation file)
* It can work with existing on-disk data or interact with `arrow` without any transfer costs

---

### Working with bigger data?

* Relational databases (i.e. SQL) are still around and hugely popular, but...

--

* Data, and specifically _local_ files, are getting bigger

--

* Data Warehouses/Data Lakes often use flat-file storage (`.csv`, `.parquet`, `.json`, etc.)

--

If the team has a Data Lake (i.e. semi-structured raw data in CSV, Parquet, JSON, etc.) _and_ access to a Data Lake query engine (like Dremio, Athena, Presto, Snowflake, etc.), then they should use those tools in R via ODBC

--

BUT for teams that don't, or that have extracts that are still large: how do you work with data that isn't already in a database and is bigger than your memory?
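--

For the "query engine via ODBC" route above, here's a minimal sketch of what that looks like with `DBI` + `dbplyr`; the DSN and table name are hypothetical placeholders, not something from this deck:

```r
library(DBI)
library(dplyr)

# connect through an ODBC DSN configured for your query engine (name is made up)
con <- dbConnect(odbc::odbc(), dsn = "my_lake_engine")

tbl(con, "taxi_trips") %>%   # lazy table reference, nothing is pulled yet
  filter(year == 2015) %>%
  count(passenger_count) %>%
  collect()                  # the engine does the work, only the result comes back
```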
---

### Pause for one second

If it _can_ fit in memory, then try out:

* [`vroom::vroom()`](https://vroom.r-lib.org/) or [`data.table::fread()`](https://rdatatable.gitlab.io/data.table/reference/fread.html) for fast file reads _into_ R
* [`vroom(col_select = c(column_name))`](https://vroom.r-lib.org/reference/vroom.html) also allows for partial reads (i.e. specific columns)

--

* [`data.table`](https://rdatatable.gitlab.io/data.table/index.html) or the `dplyr` front-end to `data.table` via [`dtplyr`](https://dtplyr.tidyverse.org/) for fast and efficient in-memory analysis

--

* Lastly, the [`collapse`](https://sebkrantz.github.io/collapse/) R package for a narrower, but hyper-performant, set of data manipulation functions

---

### Two topics for today

Two relatively lightweight options:

* The [Apache Arrow](https://arrow.apache.org/) project
* [`duckdb`](https://duckdb.org/docs/api/r)

--

* Also, `arrow` can be used to expedite data transfer via [`sparklyr`](https://spark.rstudio.com/guides/arrow/) to great effect, but that brings into play Java, Scala, and `rJava` 🙃

---
class: inverse, middle

## `duckdb`, like sqlite<br>but _really_ fast

<div style = "position: absolute;top: -120px;right: 0;"><img src="https://images.unsplash.com/photo-1465153690352-10c1b29577f8?ixid=MnwxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHx8&ixlib=rb-1.2.1&auto=format&fit=crop&w=715&q=80" alt="Two people walking up a large mountain" width="575"></img></div>

---

### [`duckdb`](https://duckdb.org/docs/why_duckdb.html)

> DuckDB plays well with `dbplyr` / `dplyr` for querying from R. - [duckdb.org/docs/api/r](https://duckdb.org/docs/api/r)

--

#### Fast

> DuckDB is designed to support analytical query workloads, also known as Online analytical processing (OLAP). These workloads are characterized by complex, relatively long-running queries that process significant portions of the stored dataset, for example aggregations over entire tables.

--

#### Simple

> DuckDB has no external dependencies, neither for compilation nor during run-time [basically a 10x faster version of SQLite]

--

#### Feature Rich

> Data can be stored in persistent, single-file databases. DuckDB is deeply integrated into Python and R for efficient interactive data analysis. Use[s] the PostgreSQL parser

---

.pull-left[
### When to use DuckDB

* Processing and storing tabular datasets, e.g. from CSV or Parquet files
* **Interactive data analysis**, e.g. joining & aggregating multiple large tables
* Concurrent large changes to multiple large tables, e.g. appending rows, adding/removing/updating columns
* Large result set transfer to client
]

--

.pull-right[

###
When to not use DuckDB * Non-rectangular data sets, e.g. graphs, plaintext * High-volume transactional use cases (e.g. tracking orders in a webshop) * Large client/server installations for centralized enterprise data warehousing * Writing to a single database from multiple concurrent processes ] Credit: [duckdb.org/](https://duckdb.org/) --- ### `duckdb` + `dbplyr` `duckdb` uses Postgres-flavored SQL, so it has deep integration with `dbplyr` out of the box. -- ```r library("dplyr", warn.conflicts = FALSE) con <- DBI::dbConnect(duckdb::duckdb()) # create a temp database in memory duckdb::duckdb_register(con, "flights", nycflights13::flights) tbl(con, "flights") ``` ``` ## # Source: table<flights> [?? x 19] ## # Database: duckdb_connection ## year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time ## <int> <int> <int> <int> <int> <dbl> <int> <int> ## 1 2013 1 1 517 515 2 830 819 ## 2 2013 1 1 533 529 4 850 830 ## 3 2013 1 1 542 540 2 923 850 ## 4 2013 1 1 544 545 -1 1004 1022 ## 5 2013 1 1 554 600 -6 812 837 ## 6 2013 1 1 554 558 -4 740 728 ## 7 2013 1 1 555 600 -5 913 854 ## 8 2013 1 1 557 600 -3 709 723 ## 9 2013 1 1 557 600 -3 838 846 ## 10 2013 1 1 558 600 -2 753 745 ## # … with more rows, and 11 more variables: arr_delay <dbl>, carrier <chr>, ## # flight <int>, tailnum <chr>, origin <chr>, dest <chr>, air_time <dbl>, ## # distance <dbl>, hour <dbl>, minute <dbl>, time_hour <dttm> ``` --- ### `duckdb` + `dbplyr` `duckdb` uses Postgres-flavored SQL, so it has deep integration with `dbplyr` out of the box. -- ```r tbl(con, "flights") %>% group_by(dest) %>% summarise(delay = mean(dep_time, na.rm = TRUE)) ``` ``` ## # Source: lazy query [?? x 2] ## # Database: duckdb_connection ## dest delay ## <chr> <dbl> ## 1 IAH 1266. ## 2 MIA 1245. ## 3 BQN 1375. ## 4 ATL 1293. ## 5 ORD 1310. ## 6 FLL 1327. ## 7 IAD 1306. ## 8 MCO 1337. ## 9 PBI 1335. ## 10 TPA 1346. ## # … with more rows ``` ### How big is `nycflights13` anyway? ```r lobstr::obj_size(nycflights13::flights) %>% unclass() %>% scales::label_bytes()(.) ``` ``` ## [1] "41 MB" ``` -- ### Save to disk as a `.csv` ```r nycflights13::flights %>% janitor::clean_names() %>% write_delim("flights.csv", delim = ",") ``` -- ```r fs::file_info("flights.csv") %>% pull(size) ``` ``` ## 29.6M ``` --- ### Load into `duckdb` permanently ```r # write to disk as "flightDisk", other defaults to in memory con <- DBI::dbConnect(duckdb::duckdb(), "flightDisk") duckdb::duckdb_read_csv(conn = con, name = "flightsCSV", files = "flights.csv", header = TRUE, delim = ",", na.strings = "NA") ``` -- ```r DBI::dbListTables(con) ``` ``` ## [1] "flightsCSV" ``` --- ### Flying with the `duckdb`s ```r flight_tbl <- tbl(con, "flightsCSV") ``` --- ### Flying with the `duckdb`s ```r flight_tbl <- tbl(con, "flightsCSV") flight_tbl ``` ``` ## # Source: table<flightsCSV> [?? 
x 19] ## # Database: duckdb_connection ## year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time ## <int> <int> <int> <int> <int> <int> <int> <int> ## 1 2013 1 1 517 515 2 830 819 ## 2 2013 1 1 533 529 4 850 830 ## 3 2013 1 1 542 540 2 923 850 ## 4 2013 1 1 544 545 -1 1004 1022 ## 5 2013 1 1 554 600 -6 812 837 ## 6 2013 1 1 554 558 -4 740 728 ## 7 2013 1 1 555 600 -5 913 854 ## 8 2013 1 1 557 600 -3 709 723 ## 9 2013 1 1 557 600 -3 838 846 ## 10 2013 1 1 558 600 -2 753 745 ## # … with more rows, and 11 more variables: arr_delay <int>, carrier <chr>, ## # flight <int>, tailnum <chr>, origin <chr>, dest <chr>, air_time <int>, ## # distance <int>, hour <int>, minute <int>, time_hour <chr> ``` --- ### Flying with the `duckdb`s ```r flight_tbl %>% group_by(month, origin) %>% summarise(avg_delay = mean(dep_delay, na.rm = TRUE), .groups = "drop") %>% arrange(desc(avg_delay)) # all on disk still ``` ``` ## # Source: lazy query [?? x 3] ## # Database: duckdb_connection ## # Ordered by: desc(avg_delay) ## month origin avg_delay ## <int> <chr> <dbl> ## 1 7 JFK 23.8 ## 2 6 EWR 22.5 ## 3 7 EWR 22.0 ## 4 12 EWR 21.0 ## 5 6 JFK 20.5 ## 6 6 LGA 19.3 ## 7 7 LGA 19.0 ## 8 3 EWR 18.1 ## 9 4 EWR 17.4 ## 10 5 EWR 15.4 ## # … with more rows ``` --- ### Landing the `duckdb` in memory .pull-left[ ```r flight_tbl %>% group_by(origin, month, day) %>% summarise( avg_delay = mean(dep_delay, na.rm = T), .groups = "drop" ) %>% arrange(desc(avg_delay)) %>% # collect() to bring into R collect() %>% # and then it's like any other dataframe! ggplot(aes(x = month, y = avg_delay)) + geom_boxplot(aes(group = month)) + geom_jitter( aes(color = origin), alpha = 0.2, width = 0.4) + facet_wrap(~origin, ncol = 1) ``` ] -- .pull-right[ ![](index_files/figure-html/duckdbPlot-1.png)<!-- --> ] --- class: inverse, middle # `Apache`<br>`arrow` <div style = "position: absolute;top: -50px;right: 0;"><img src="https://images.unsplash.com/photo-1517173524746-c8e3c136d4f7?ixid=MnwxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHx8&ixlib=rb-1.2.1&auto=format&fit=crop&w=687&q=80" alt="A white arrow on black tarmac" width="575"></img></div> --- ### `arrow` > [Apache Arrow](https://arrow.apache.org/) is a cross-language development platform for in-memory data. It specifies a standardized language-independent columnar memory format for flat and hierarchical data, organized for efficient analytic operations on modern hardware. It also provides computational libraries and zero-copy streaming messaging and interprocess communication. -- > The `arrow` package exposes an interface to the Arrow C++ library, enabling access to many of its features in R. It provides low-level access to the Arrow C++ library API and higher-level access through a `dplyr` backend and familiar R functions. -- For more info, checkout the slides from Wes McKinney as of [September 10th](https://www.slideshare.net/wesm/new-directions-for-apache-arrow) --- ### `arrow` and `duckdb` integration ```r arrow::InMemoryDataset$create(mtcars) %>% filter(mpg < 30) %>% arrow::to_duckdb() %>% # oneliner to move into duckdb group_by(cyl) %>% summarize(mean_mpg = mean(mpg, na.rm = TRUE)) ``` ``` ## # Source: lazy query [?? x 2] ## # Database: duckdb_connection ## cyl mean_mpg ## <dbl> <dbl> ## 1 6 19.7 ## 2 4 23.7 ## 3 8 15.1 ``` --- ### Big data ok... so `mtcars` is most definitely not big data. -- And ~30 Mb of `nycflights.csv` is not that impressive. 
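--

As an aside, that CSV could itself be rewritten as a partitioned parquet dataset with `arrow`; a minimal sketch, with the `flights_pq/` output path made up for illustration:

```r
library(arrow)
library(dplyr)

# read the CSV once, then write it back out as parquet, partitioned by month
flights <- read_csv_arrow("flights.csv")
write_dataset(flights, "flights_pq", format = "parquet", partitioning = "month")

# later queries only touch the partitions/columns they need
open_dataset("flights_pq") %>%
  filter(month == 6) %>%
  select(dep_delay) %>%
  collect() %>%
  summarise(mean_delay = mean(dep_delay, na.rm = TRUE))
```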
--

Let's use the `arrow` example data, stay in NYC, and look at the `nyc-taxi` fare dataset

--

```r
nyc_fares <- fs::dir_info("nyc-taxi", recurse = TRUE) %>% 
  filter(type == "file") %>% 
  summarise(n = n(), size = sum(size))

glue::glue("There are {nyc_fares$n} files, totaling {nyc_fares$size}!")
```

```
## There are 125 files, totaling 37G!
```

--

Now that we have a full quiver of data to pull from, let's shoot some `arrow`s

--

Also RIP my laptop's hard drive space 💣 💥 💣

---

### Shoot some `arrow`s

Benchmarked with R 4.1.1 on a MacBook Pro (2017, 2.3 GHz Dual-Core Intel Core i5) with 16 GB 2133 MHz LPDDR3 and a 256 GB SSD

```r
library(duckdb)
library(pins)
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)
library(tictoc) # for timing

# arrow::arrow_with_s3()
# setwd("/Volumes/mock-external/")
# arrow::copy_files("s3://ursa-labs-taxi-data", "nyc-taxi")
# warning: that downloads 37 GB of data!

tic()
ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
toc() # just to show that this is a fast process
```

```
## 0.062 sec elapsed
```

---

### How many ar**rows**?

```r
tic()
full_collect <- summarise(ds, n = n()) %>% 
  collect() %>% 
  pull(n)

n_rows <- scales::unit_format(unit = "billion", scale = 1e-9, accuracy = 0.01)(full_collect)

glue::glue("There are approximately {n_rows} rows!")
```

```
## There are approximately 1.55 billion rows!
```

```r
toc() # wow that's fast.
```

```
## 1.213 sec elapsed
```

---
class: center, middle

# Ludicrous speed!

<iframe src="https://giphy.com/embed/izspP6uMbMeti" width="648" height="343" frameBorder="0" class="giphy-embed" allowFullScreen></iframe><p><a href="https://giphy.com/gifs/community-playstation-xo-izspP6uMbMeti"></a></p>

---

### How can you query 1.5 BILLION rows in a few seconds?

> Apache Arrow processes large amounts of data quickly by using Single Instruction Multiple Data (SIMD). Sets of data are broken into batches that fit the cache layers of a CPU. - [Apache Arrow overview from OmniSci](https://www.omnisci.com/technical-glossary/apache-arrow)

--

`arrow` provides an R interface to `feather` and `parquet`, along with more traditional formats (CSV, JSON). It also provides a `dbplyr`-style translation of `dplyr` verbs that runs OUT of memory.

--

`arrow` is a columnar data format, as opposed to more traditional storage that is row-based.

--

With row-based storage, a `filter()` or `select()` means the query engine has to scan every row, parse each column, extract the specified matches, and then perform calculations

--

With columnar formats, you can completely skip unnecessary columns, and even for a relatively "narrow" dataset of six columns, touching only one of them means roughly 1/6th of the processing... essentially a 6x performance gain. You can imagine the gains when you have wide data with dozens of additional columns that are only needed intermittently.

---

### How can you query 1.5 BILLION rows in a few seconds?

---

### How can you query ~~1.5 BILLION rows~~ in a few seconds?

The answer? By **not** reading in 1.5 billion rows.

--

`arrow`'s 'magic' is that `filter()` + `select()` + the columnar data format are "cheat codes" for ludicrously fast subset queries. BUT it's also very optimized, and simply reading in entire files or entire columns is also very fast.
[Neal Richardson, NYR 2020](https://enpiar.com/talks/nyr-2020/#41) -- <img src="https://enpiar.com/talks/nyr-2020/img/taxi-single-1.png" width="50%" /> --- ### One more piece of "magic" `arrow` and `arrow::open_dataset()` allows for the creation and reading of partitioned datasets. The `nyc-taxi` data is _actually_ 126 individual parquet files. Each file is about 300 Mb. -- This allows for "touching" only the necessary subsets, rather than searching the entire combined dataset. -- ```r ds <- open_dataset("nyc-taxi", partitioning = c("year", "month")) ``` -- ```r fs::dir_ls("nyc-taxi/", recurse = TRUE) %>% stringr::str_subset("parquet", negate = TRUE) %>% stringr::str_subset("\\/20[0-9]+\\/") %T>% {cat("There are", length(.), "files\n")} %>% return() ``` ``` ## There are 126 files ``` ``` ## [1] "nyc-taxi/2009/01" "nyc-taxi/2009/02" "nyc-taxi/2009/03" ## [4] "nyc-taxi/2009/04" "nyc-taxi/2009/05" "nyc-taxi/2009/06" ## [7] "nyc-taxi/2009/07" "nyc-taxi/2009/08" "nyc-taxi/2009/09" ## [10] "nyc-taxi/2009/10" "nyc-taxi/2009/11" "nyc-taxi/2009/12" ## [13] "nyc-taxi/2010/01" "nyc-taxi/2010/02" "nyc-taxi/2010/03" ## [16] "nyc-taxi/2010/04" "nyc-taxi/2010/05" "nyc-taxi/2010/06" ## [19] "nyc-taxi/2010/07" "nyc-taxi/2010/08" "nyc-taxi/2010/09" ## [22] "nyc-taxi/2010/10" "nyc-taxi/2010/11" "nyc-taxi/2010/12" ## [25] "nyc-taxi/2011/01" "nyc-taxi/2011/02" "nyc-taxi/2011/03" ## [28] "nyc-taxi/2011/04" "nyc-taxi/2011/05" "nyc-taxi/2011/06" ## [31] "nyc-taxi/2011/07" "nyc-taxi/2011/08" "nyc-taxi/2011/09" ## [34] "nyc-taxi/2011/10" "nyc-taxi/2011/11" "nyc-taxi/2011/12" ## [37] "nyc-taxi/2012/01" "nyc-taxi/2012/02" "nyc-taxi/2012/03" ## [40] "nyc-taxi/2012/04" "nyc-taxi/2012/05" "nyc-taxi/2012/06" ## [43] "nyc-taxi/2012/07" "nyc-taxi/2012/08" "nyc-taxi/2012/09" ## [46] "nyc-taxi/2012/10" "nyc-taxi/2012/11" "nyc-taxi/2012/12" ## [49] "nyc-taxi/2013/01" "nyc-taxi/2013/02" "nyc-taxi/2013/03" ## [52] "nyc-taxi/2013/04" "nyc-taxi/2013/05" "nyc-taxi/2013/06" ## [55] "nyc-taxi/2013/07" "nyc-taxi/2013/08" "nyc-taxi/2013/09" ## [58] "nyc-taxi/2013/10" "nyc-taxi/2013/11" "nyc-taxi/2013/12" ## [61] "nyc-taxi/2014/01" "nyc-taxi/2014/02" "nyc-taxi/2014/03" ## [64] "nyc-taxi/2014/04" "nyc-taxi/2014/05" "nyc-taxi/2014/06" ## [67] "nyc-taxi/2014/07" "nyc-taxi/2014/08" "nyc-taxi/2014/09" ## [70] "nyc-taxi/2014/10" "nyc-taxi/2014/11" "nyc-taxi/2014/12" ## [73] "nyc-taxi/2015/01" "nyc-taxi/2015/02" "nyc-taxi/2015/03" ## [76] "nyc-taxi/2015/04" "nyc-taxi/2015/05" "nyc-taxi/2015/06" ## [79] "nyc-taxi/2015/07" "nyc-taxi/2015/08" "nyc-taxi/2015/09" ## [82] "nyc-taxi/2015/10" "nyc-taxi/2015/11" "nyc-taxi/2015/12" ## [85] "nyc-taxi/2016/01" "nyc-taxi/2016/02" "nyc-taxi/2016/03" ## [88] "nyc-taxi/2016/04" "nyc-taxi/2016/05" "nyc-taxi/2016/06" ## [91] "nyc-taxi/2016/07" "nyc-taxi/2016/08" "nyc-taxi/2016/09" ## [94] "nyc-taxi/2016/10" "nyc-taxi/2016/11" "nyc-taxi/2016/12" ## [97] "nyc-taxi/2017/01" "nyc-taxi/2017/02" "nyc-taxi/2017/03" ## [100] "nyc-taxi/2017/04" "nyc-taxi/2017/05" "nyc-taxi/2017/06" ## [103] "nyc-taxi/2017/07" "nyc-taxi/2017/08" "nyc-taxi/2017/09" ## [106] "nyc-taxi/2017/10" "nyc-taxi/2017/11" "nyc-taxi/2017/12" ## [109] "nyc-taxi/2018/01" "nyc-taxi/2018/02" "nyc-taxi/2018/03" ## [112] "nyc-taxi/2018/04" "nyc-taxi/2018/05" "nyc-taxi/2018/06" ## [115] "nyc-taxi/2018/07" "nyc-taxi/2018/08" "nyc-taxi/2018/09" ## [118] "nyc-taxi/2018/10" "nyc-taxi/2018/11" "nyc-taxi/2018/12" ## [121] "nyc-taxi/2019/01" "nyc-taxi/2019/02" "nyc-taxi/2019/03" ## [124] "nyc-taxi/2019/04" "nyc-taxi/2019/05" "nyc-taxi/2019/06" ``` --- ### Big 
data, new data .left-wide[ ```r tic() ds %>% filter(total_amount > 100, year == 2015) %>% select(tip_amount, total_amount, passenger_count) %>% # calculate a new column, on disk! mutate(tip_pct = 100 * tip_amount / total_amount) %>% group_by(passenger_count) %>% summarise( mean_tip_pct = mean(tip_pct), n = n() ) %>% collect() %>% print() toc() ``` ] -- .right-narrow[ ``` ## # A tibble: 10 × 3 ## passenger_count mean_tip_pct n ## <int> <dbl> <int> ## 1 1 13.5 143087 ## 2 2 12.6 34418 ## 3 6 18.2 3338 ## 4 3 11.9 8922 ## 5 4 11.1 4771 ## 6 5 16.8 5806 ## 7 0 9.78 380 ## 8 8 16.5 32 ## 9 9 12.7 42 ## 10 7 16.3 11 ``` ``` ## 6.106 sec elapsed ``` ] --- ### Big data, all the rows OK not all the rows, but at least the rows with actual passengers (there's some negative values in `passenger_count`... 🤔) -- .left-wide[ ```r tic() ds %>% select(passenger_count, total_amount) %>% filter(between(passenger_count, 0, 6)) %>% group_by(passenger_count) %>% summarise( n = n(), mean_total = mean(total_amount, na.rm = TRUE) ) %>% collect() %>% # pull into memory! arrange(desc(passenger_count)) toc() ``` ] -- .right-narrow[ ``` ## # A tibble: 7 × 3 ## passenger_count n mean_total ## <int> <int> <dbl> ## 1 6 37241244 15.1 ## 2 5 99064441 13.9 ## 3 4 32443710 14.5 ## 4 3 67096194 14.4 ## 5 2 227454966 14.8 ## 6 1 1078624900 14.1 ## 7 0 5809809 13.0 ``` ``` ## 46.32 sec elapsed ``` ] --- ### Big data, elsewhere Same data, but on a cheap 4TB hard drive, 5400 RPM, connected via USB 2.0. .pull-left[ ```r ds_external <- open_dataset( "/Volumes/mock-external/nyc-taxi", partitioning = c("year", "month")) ``` ```r tic() ds_external %>% filter(total_amount > 100, year == 2015) %>% select(tip_amount, total_amount, passenger_count) %>% # calculate a new column, on remote disk mutate(tip_pct = 100 * tip_amount / total_amount) %>% group_by(passenger_count) %>% summarise(mean_tip_pct = mean(tip_pct), n = n()) %>% collect() toc() ``` ] -- .pull-right[ ``` ## # A tibble: 10 × 3 ## passenger_count mean_tip_pct n ## <int> <dbl> <int> ## 1 1 13.5 143087 ## 2 2 12.6 34418 ## 3 5 16.8 5806 ## 4 4 11.1 4771 ## 5 6 18.2 3338 ## 6 3 11.9 8922 ## 7 0 9.78 380 ## 8 7 16.3 11 ## 9 9 12.7 42 ## 10 8 16.5 32 ``` ``` ## 58.802 sec elapsed ``` ] --- ### We can shoot ducks with arrows Going back to local SSD and using `arrow::to_duckdb()`, note the full `group_by()`/`mutate()` support! -- .left-wide[ ```r tic() ds %>% filter(total_amount > 100, year == 2015) %>% select(tip_amount, total_amount, passenger_count) %>% # use arrow to populate directly into a duckdb arrow::to_duckdb() %>% group_by(passenger_count) %>% # group_by mutate! mutate(tip_pct = 100 * tip_amount / total_amount) %>% filter(tip_pct >= 25) %>% summarise(n = n()) %>% collect() toc() ``` ] -- .right-narrow[ ``` ## # A tibble: 9 × 2 ## passenger_count n ## <int> <dbl> ## 1 0 6 ## 2 1 6464 ## 3 2 1276 ## 4 3 324 ## 5 4 205 ## 6 5 345 ## 7 6 303 ## 8 7 1 ## 9 9 1 ``` ``` ## 7.468 sec elapsed ``` ] --- ### `pins` Now, as far as using `arrow`, since they are single files (eg `feather` or `parquet`), you _could_ `pin` them -- Note, `pins` 1.0 now has native support for `feather` which is essentially an `arrow` dataframe on disk. -- `duckdb` is two files, but you should just use `feather` anyway as it's fast to read, relatively efficient to store, and allows you to mix `arrow` native work along with `arrow::to_duckdb()` if you need `duckdb` querying. -- You could also go `parquet` which is very efficient to store and fast to read. 
In `pins` you would switch from `pin_write()` to `pin_upload()`/`pin_download() %>% arrow::read_parquet()` since it's treated as a file rather than one of the native `read` file types. --- ### `pins` comparison .pull-left[ ```r query_feather <- function(){ tic() feather_file <- pin_download( board_rsc, "thomas/big-nyc-feather") feather_file <- arrow::open_dataset( feather_file, format = "feather") summary_feather <- feather_file %>% select(fare_amount, vendor_id) %>% filter(vendor_id == "1") %>% summarise(mean = mean(fare_amount)) %>% collect() toc() } ``` ] -- .pull-right[ ```r query_rds <- function(){ tic() pinned_rds <- pin_download( board_rsc, "thomas/big-nyc-rds") rds_file <- readr::read_rds(pinned_rds) summary_rds <- rds_file %>% select(fare_amount, vendor_id) %>% filter(vendor_id == "1") %>% summarise(mean = mean(fare_amount)) toc() } ``` ] --- ### `pins` comparison `feather` is native to `arrow`, so it allows for rapid file reading AND allows for partial reading, through a combination of `filter()` + `select()` + `summarize()` -- So, `pins` + `feather` + `arrow::open_dataset()` allows for insanely fast queries on relatively large datasets. -- ```r query_feather() #> 10.2 sec elapsed query_rds() #> 14.4 sec elapsed ``` -- I promised you speed, but they're the same! You can't "beat" download times 🤷 --- ### `pins` comparison While you can't beat file transfer, disk space is **infinitely** cheaper than memory... -- And your downloaded file still exists! ```r file.exists("~/Library/Caches/pins/rsc-e62371cfd77db754024f9c5ed3556a73/2f2ee2e3-564a-4791-a7f3-01e22ad2939f/48597/big-nyc-feather.arrow") ``` ``` ## [1] TRUE ``` -- ```r # Windowed query query_feather() #> 0.554 sec elapsed # read ENTIRE file in query_rds() #> 11.8 sec elapsed ``` --- ### `pins` comparison Now, you could get similar query speed ups by loading the data completely into memory 1x, but you'd have to "eat" the initial read time of 10+ seconds. -- Also... now you're lugging around at least 1 Gb of data for each session... and there are _very few times_ when all of your users **needs** 1 Gb of data all at once. -- By selecting "windows" of data rapidly, you get the ability to perform useful queries/summaries across large swaths of data with minimal memory load, and minimal read time. --- ### `pins` comparison So, in your `shiny` apps: -- ```r feather_file <- pin_download(board_rsc, "thomas/big-nyc-feather") feather_ds <- arrow::open_dataset(feather_file, format = "feather") server <- function(input, output, session) { # Lots more code feather_ds %>% some_queries %>% collect() %>% if(type == "plot"){ plot_it() } else if(type == "table"){ table_it() } else { work_it() %>% make_it() %>% do_it() %>% makes_us() %>% harder_better_faster_stronger() } } ``` --- ### `pins` + `duckdb` Cached file still exists, so can be used with `duckdb` as well! -- ```r tic() feather_file <- pin_download(board_rsc, "thomas/big-nyc-feather") feather_file <- arrow::open_dataset(feather_file, format = "feather") summary_duck <- feather_file %>% select(fare_amount, vendor_id) %>% filter(vendor_id == "1") %>% arrow::to_duckdb() %>% summarise(mean = mean(fare_amount, na.rm = TRUE)) %>% collect() toc() #> 1.48 sec ``` --- ### Pause for a second While it's possible to have these work with `shiny` or interactive apps via `pins`, again I think it's worth considering `shiny` apps as **requiring** a proper database if working with that much data. 
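---

### `pins` + `parquet`

Tying together the `pin_upload()`/`pin_download()` idea from a few slides back; a hedged sketch, where the board and pin names are made up for illustration:

```r
library(pins)
library(arrow)
library(dplyr)

board_rsc <- board_rsconnect() # or board_s3(), board_folder(), ...

# publish: parquet is treated as a plain file, so pin_upload() rather than pin_write()
write_parquet(nycflights13::flights, "flights.parquet")
pin_upload(board_rsc, "flights.parquet", name = "thomas/flights-parquet")

# consume: download (and cache) the file, then query it lazily with arrow
pq_file <- pin_download(board_rsc, "thomas/flights-parquet")

open_dataset(pq_file, format = "parquet") %>%
  select(origin, dep_delay) %>%
  filter(origin == "EWR") %>%
  collect()
```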
---

### Keeping your `pins` at a distance

If your team happens to use AWS, you can make use of [native S3 support with `arrow`](https://arrow.apache.org/docs/r/articles/fs.html)

--

Similar to our `pins` workflow, there is unavoidable file-transfer time, BUT you can read the files remotely and you _can_ partition the data in S3.

--

This allows `arrow` to intelligently transfer/read only the portions of the partitions that are necessary, optimizing both the file transfer AND the query/read (a short sketch of this appears just before the Links slide).

---

### Things we didn't talk about

* Snowflake (uses `arrow` under the hood)

--

* Dremio (uses `arrow` under the hood)

--

* Rinse and repeat across many other databases/data warehouse engines, as `arrow` is quickly becoming a standard tool for lots of database/data warehouse/data lake providers

--

The core idea is that if you have a "real" data warehouse or data lake, your team has likely already invested in some way to perform queries (and you can probably just use ODBC or equivalent).

--

BUT if you need to work with large flat files, whether extracted from your data warehouse or produced some other way, `arrow` and/or `duckdb` are very attractive tools.

--

`pins` can be leveraged along with `arrow`, but you lose a lot of the power of partitioning, since a pin is a single file. That being said, with `shiny`-app-sized data, you should be more than fine with a single file.

---

### Should you use `arrow` with `shiny`?

* If you are relying on CSVs today, then sure: `parquet` is an efficient storage format, meaning it will be "cheaper" to move around and faster to read, and it can still be used via `pins`

--

* BUT should you move around 1 GB flat files for `shiny` apps with _many_ users? Probably not.

--

* Could it be useful for large-data `shiny` apps with low user counts? 🤷 Possibly, but not necessarily something I'm keen to suggest as 'ideal'

--

* This still requires passing around a large flat file, and typically a dedicated database table will be a better option

---

### Using a dedicated database, aka 3-tier architecture

* **Presentation Tier:** JavaScript/HTML/CSS, i.e. the front end, processed in the client browser
* **Application/Logic Tier:** R/Python, processed in memory, meaning `shiny`/`dash`/`streamlit` OR connections to `plumber`/`flask`/`FastAPI`, i.e. it can include "microservices" or be a "monolith"
* **Data Tier:** Database/SQL/etc., processed in the database backend

![](https://docs.aws.amazon.com/whitepapers/latest/serverless-multi-tier-architectures-api-gateway-lambda/images/image2.png)

Expanded in greater detail at: [https://db.rstudio.com/best-practices/dashboards/](https://db.rstudio.com/best-practices/dashboards/)

---

### Summary

- For both R and Python, `arrow` and/or `duckdb` can make working with _larger_, out-of-memory data possible and much faster for interactive work

--

- `arrow` alone can push `filter()`/`select()` calls _into_ the data on disk, along with basic `group_by()` + `mutate()` + `summarize()` aggregation

--

- `duckdb` further adds full `dbplyr` support and can be used with native `arrow`, BUT it doesn't _require_ `arrow` in the process; it can be a standalone local database on disk

--

- If you use `pins`, you can _also_ use `arrow` via `parquet`/`feather` for lightweight files and fast full or partial reads

--

- `arrow` + `duckdb` with large data are likely not appropriate for high-concurrency `shiny` apps

--

- Large datasets with potentially high concurrency are where a database (and `dbplyr`) makes sense, and maybe that database is columnar/uses `arrow` behind the scenes!
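---

### `arrow` + S3, a sketch

The sketch promised on the "Keeping your `pins` at a distance" slide. It assumes `arrow` was built with S3 support (check `arrow::arrow_with_s3()`) and that the public `ursa-labs-taxi-data` bucket used earlier is still reachable (credentials/anonymous access not shown):

```r
library(arrow)
library(dplyr)

# point at the partitioned dataset in S3 without downloading all of it
bucket <- s3_bucket("ursa-labs-taxi-data")
ds_s3 <- open_dataset(bucket, partitioning = c("year", "month"))

ds_s3 %>%
  filter(year == 2019, month == 6) %>%   # only this partition gets read
  select(passenger_count, total_amount) %>%
  collect()
```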
--- ### Links - [`arrow` docs](https://arrow.apache.org/docs/r/articles/dataset.html) - [`arrow` 6.0 pre-release news](https://github.com/apache/arrow/commit/62ff6590fc43e83381a8ec722a9c241a49619c78) - [`duckdb` R package](https://duckdb.org/docs/api/r) - [`dbplyr`](https://dbplyr.tidyverse.org/) - [`db.rstudio.com`, Enterprise Ready Dashboards](https://db.rstudio.com/best-practices/dashboards/)