--- title: "Extracting Weather/Climate Geospatial Data with `chopin`" output: rmarkdown::html_vignette vignette: > %\VignetteIndexEntry{Extracting Weather/Climate Geospatial Data with `chopin`} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- # Introduction This document demonstrates to parallelize weather/climate geospatial data processing with `chopin` and what cases users may take advantage of parallel processing or not. We will cover the following formats: We consider TerraClimate and PRISM data which have its own data format each. [Parameter-elevation Regressions on Independent Slopes Model (PRISM)](https://prism.oregonstate.edu) is a high-resolution (800-1,000 meters) gridded climate dataset available in the BIL (band interleaved by line) format which is readable with GDAL. TerraClimate is a high-resolution (5 km) gridded climate dataset in the NetCDF format which is readable with GDAL. | Data | Source | Resolution | File format | |:------------:|:------------------------|:---------------|:------------| | TerraClimate | University of Idaho | 0.0417 degrees | NetCDF | | PRISM | Oregon State University | 0.0083 degrees | BIL | The spatial resolution of TerraClimate data commensurates 5 km in the equator, whereas that of PRISM data is approximately 1 km. The data are available in the NetCDF format which is readable with GDAL. We will use the `terra` package to read the data. ## Prepare target datasets We will consider the populated places centroids in the mainland United States (i.e., excluding Alaska, Hawaii, and the territories). We will use the [`tigris`](https://cran.r-project.org/web/packages/tigris/index.html) package to download the data. | Data | Number of features | Source | Type | |:-------------------:|:----------------|:----------------|:----------------| | Census places | 31,377 | US Census Bureau | Polygon | | Block groups | 238,193 | US Census Bureau | Polygon | | Grid points in the southeastern US | 1,204,904 | Author, US Census Bureau (state polygons) | Point | ```r library(chopin) library(terra) library(sf) library(stars) library(future) library(future.mirai) library(parallelly) library(dplyr) library(tigris) library(tictoc) options(tigris_use_cache = TRUE, sf_use_s2 = FALSE) ``` ## Hardware specification We used a machine with 112 physical CPU cores with 750 GB of memory, but we used only a portion of the cores (up to 20) for the demonstration. No maximum possible memory usage was set. However, in typical HPC management systems, users are required to request the number of cores and memory for their jobs. The number of cores and memory capacity should be considered when users parallelize the extraction process. ## Download and preprocess {.tabset} ### Download ```r # populated places state <- tigris::states(year = 2022) statemain <- state[!state$STUSPS %in% c("AK", "HI", "PR", "VI", "GU", "MP", "AS"), ] target_states <- statemain$GEOID popplace <- lapply(target_states, function(x) tigris::places(x, year = 2022)) %>% do.call(rbind, .) saveRDS(popplace, "./input/populated_place_2022.rds", compress = "xz") ``` ### Read ```r state <- tigris::states(year = 2022) statemain <- state[!state$STUSPS %in% c("AK", "HI", "PR", "VI", "GU", "MP", "AS"), ] target_states <- statemain$GEOID # download populated places options(tigris_use_cache = TRUE) popplace <- lapply(target_states, function(x) { tigris::places(year = 2022, state = x) }) popplace <- do.call(rbind, popplace) # generate circular buffers with 10 km radius popplacep <- sf::st_centroid(popplace, of_largest_polygon = TRUE) %>% sf::st_transform("EPSG:5070") popplacep2 <- sf::st_centroid(popplace, of_largest_polygon = TRUE) popplaceb <- sf::st_buffer(popplacep, dist = units::set_units(10, "km")) ``` # TerraClimate TerraClimate data are provided in yearly NetCDF files, each of which contains monthly layers. We will read the data with the [`terra`](https://cran.r-project.org/web/packages/terra/index.html) package and preprocess the data to extract the annual mean and sum of the bands by types of columns. For reproducibility, run the code below with our companion package `amadeus` to download terraClimate data. Please note that it will take a while depending on the internet speed. ```r rlang::check_installed("amadeus") tcli_variables <- c( "aet", "def", "pet", "ppt", "q", "soil", "swe", "PDSI", "srad", "tmax", "tmin", "vap", "vpd", "ws" ) amadeus::download_terraclimate( variables = tcli_variables, year = c(2000, 2022), directory_to_save = "./input/terraClimate", acknowledgement = TRUE, download = TRUE, remove_command = TRUE ) ``` ```r # wbd ext_mainland <- c(xmin = -126, xmax = -64, ymin = 22, ymax = 51) ext_mainland <- terra::ext(ext_mainland) path_tc <- file.path("input", "terraClimate") path_tc_files <- list.files( path = path_tc, pattern = "*.nc$", full.names = TRUE ) path_tc_files ``` ``` #> [1] "input/terraClimate/TerraClimate_aet_2000.nc" #> [2] "input/terraClimate/TerraClimate_aet_2001.nc" #> [truncated] #> [321] "input/terraClimate/TerraClimate_ws_2021.nc" #> [322] "input/terraClimate/TerraClimate_ws_2022.nc" ``` ## Preprocessing Fourteen variables are available in TerraClimate data. The table below is from [the TerraClimate website](https://www.climatologylab.org/terraclimate-variables.html). | Variable | Description | Units | |:--------:|:------------|:------| | aet | Actual Evapotranspiration, monthly total | mm | | def | Climate Water Deficit, monthly total | mm | | PDSI | Palmer Drought Severity Index, at end of month | unitless | | pet | Potential evapotranspiration, monthly total | mm | | ppt | Precipitation, monthly total | mm | | q | Runoff, monthly total | mm | | soil | Soil Moisture, total column - at end of month | mm | | srad | Downward surface shortwave radiation | W/m2 | | swe | Snow water equivalent - at end of month | mm | | tmax | Max Temperature, average for month | C | | tmin | Min Temperature, average for month | C | | vap | Vapor pressure, average for month | kPa | | vpd | Vapor Pressure Deficit, average for month | kpa | | ws | Wind speed, average for month | m/s | The variables can be categorized into two types: those that will be summed and those that will be averaged. - Sum: `aet`, `def`, `pet`, `ppt`, `q`, `soil`, and `swe`. - Mean: `PDSI`, `srad`, `tmax`, `tmin`, `vap`, `vpd`, and `ws`. Following that rationale, we will preprocess the data by summing the first seven layers and averaging the rest of the layers. The code blocks below follow "split-apply-combine" strategy. Note that `terra::tapp` aggregates layers by its attributes such as time or custom indices. ```r options(future.globals = FALSE) # some bands should be summed bandnames <- c( "aet", "def", "PDSI", "pet", "ppt", "q", "soil", "srad", "swe", "tmax", "tmin", "vap", "vpd", "ws" ) bandnames_sorted <- sort(bandnames) # single nc file, yearly aggregation by fun value # band for summation bandnames_sum <- c("aet", "def", "pet", "ppt", "q", "soil", "swe") # band for averaging bandnames_avg <- c("PDSI", "srad", "tmax", "tmin", "vap", "vpd", "ws") # mean: temporally marginal pixel mean (i.e., monthly -> yearly) # sum: temporally marginal pixel sum (i.e., monthly -> yearly) # Preprocessed data are stored in tictoc::tic("sum: 7 layers") netcdf_read_sum <- split(bandnames_sum, bandnames_sum) |> lapply(function(x) { grep(paste0("(", x, ")"), path_tc_files, value = TRUE) }) |> lapply(function(x) { terra::tapp(terra::rast(x, win = ext_mainland, snap = "out"), index = "years", fun = "sum") }) netcdf_read_sum <- Reduce(c, netcdf_read_sum) tictoc::toc() ``` ``` #> sum: 7 layers: 177.974 sec elapsed ``` ```r names(netcdf_read_sum) <- paste0(rep(bandnames_sum, each = 23), "_", rep(2000:2022, 7)) netcdf_read_sum ``` ``` #> class : SpatRaster #> dimensions : 696, 1488, 161 (nrow, ncol, nlyr) #> resolution : 0.04166667, 0.04166667 (x, y) #> extent : -126, -64, 22, 51 (xmin, xmax, ymin, ymax) #> coord. ref. : +proj=longlat +ellps=WGS84 +no_defs #> source(s) : memory #> names : aet_2000, aet_2001, aet_2002, aet_2003, ... #> min values : 22.9, 27.3, 11, 30.8, ... #> max values : 1270.9, 1366.6, 1399, 1411.1, ... #> time (years): 2000 to 2022 ``` ```r # tictoc::tic("mean: 7 layers") # netcdf_read_mean <- # split(bandnames_avg, bandnames_avg) |> # lapply(function(x) { # grep(paste0("(", x, ")"), path_tc_files, value = TRUE) # }) |> # lapply(function(x) { # terra::tapp(terra::rast(x, win = ext_mainland, snap = "out"), index = "years", fun = "mean") # }) |> # Reduce(f = c, x = _) # tictoc::toc() # names(netcdf_read_mean) <- # sprintf("%s_%d", rep(bandnames_avg, each = 23), rep(2000:2022, 7)) # netcdf_read_mean ``` > \[!WARNING\] Stacking raster data may take a long time and consume a large amount of memory depending on users' area of interest and data resolution. Users need to consider the memory capacity of the system before stacking rasters. We have 14 data elements for 23 years with 12 months each. Below demonstrates the summary of the data layers that were averaged at circular buffer polygons with 10 kilometers (10,000 meters) radius. To leverage multiple cores in your machine, run `future::availableCores()` to check the number of cores and set the number of workers in `future::plan` accordingly. Typically, there are two logical cores in each physical core in modern central processing units. The number of workers should be set to up to the number of physical cores in the machine for optimal performance. The example below uses `future::multicore` which shares the memory across the workers. ```r tic("multi threads (grid)") future::plan(future::multicore, workers = 8L) grid_init <- chopin::par_pad_grid( popplacep2, mode = "grid", padding = 1e4, nx = 4L, ny = 2L ) multi_grid <- chopin::par_grid( grids = grid_init, fun_dist = extract_at, y = popplacep2, x = netcdf_read_sum, id = "GEOID", func = "mean", radius = 1e4 ) toc() ``` ``` #> multi threads (grid): 16.668 sec elapsed ``` ```r system.time( multi <- grep( paste0("(", paste(bandnames_sum, collapse = "|"), ")"), path_tc_files, value = TRUE ) %>% chopin::par_multirasters( filenames = ., fun_dist = extract_at, y = popplacep2, x = rast(), # ignored id = "GEOID", func = "mean", radius = 1e4 ) ) ``` ``` #> user system elapsed #> 5377.495 231.654 762.147 ``` ```r future::plan(future::sequential) # single thread tic("single thread") single <- exactextractr::exact_extract( netcdf_read_sum, sf::st_as_sf(popplaceb), fun = "mean", stack_apply = TRUE, force_df = TRUE, append_cols = c("GEOID"), progress = FALSE ) toc() ``` ``` #> single thread: 21.161 sec elapsed ``` > \[!CAUTION\] All Windows users and RStudio users (all platforms) will not be able to use `future::multicore` due to the restriction in `future` package. Please `future::multisession` instead and note that this option will runs slower than `future::multicore` case. # PRISM dataset PRISM data are provided in monthly 30-year normal BIL files. Assuming that a user wants to summarize 30-year normal precipitation at 10 kilometers circular buffers of the geogrpahic centroids of [US Census Places](https://www2.census.gov/geo/pdfs/maps-data/data/tiger/tgrshp2023/TGRSHP2023_TechDoc_Ch4.pdf) (from p.26), we demonstrate the extraction process with the `chopin` package. ## Download and preprocess {.tabset} ### Download ```r # populated places # mainland states state <- tigris::states(year = 2022) statemain <- state[!state$STUSPS %in% c("AK", "HI", "PR", "VI", "GU", "MP", "AS"), ] target_states <- statemain$GEOID popplace <- lapply(target_states, function(x) tigris::places(x, year = 2022)) %>% do.call(rbind, .) saveRDS(popplace, "./input/populated_place_2022.rds", compress = "xz") ``` ### Read ```r bils <- list.files("input", "bil$", recursive = TRUE, full.names = TRUE) bilssds <- terra::rast(bils[-13]) popplace2 <- sf::st_transform(popplace, crs = terra::crs(bilssds)) popplaceb2 <- sf::st_transform(popplaceb, crs = terra::crs(bilssds)) ``` > \[!IMPORTANT\] `chopin::par_pad_grid` works the best with point datasets since each grid will clip the input features when parallelized. Polygon inputs will result in duplicate values in the output and lead to take longer to complete. ## Grid parallelization In the same vein as the TerraClimate data, we will use the `chopin` package to parallelize the extraction process with grid strategy. ```r exgrid <- chopin::par_pad_grid( popplacep2, mode = "grid", padding = 1e4, nx = 4L, ny = 2L ) future::plan(future::multicore, workers = 8L) system.time( exmulti <- chopin::par_grid( exgrid, fun_dist = extract_at, y = popplacep2, x = bilssds, radius = units::set_units(1e4, "meter"), id = "GEOID", func = "mean" ) ) ``` ``` #> Your input function was successfully run at CGRIDID: 1 #> Your input function was successfully run at CGRIDID: 2 #> Your input function was successfully run at CGRIDID: 3 #> Your input function was successfully run at CGRIDID: 4 #> Your input function was successfully run at CGRIDID: 5 #> Your input function was successfully run at CGRIDID: 6 #> Your input function was successfully run at CGRIDID: 7 #> Your input function was successfully run at CGRIDID: 8 ``` ``` #> user system elapsed #> 33.090 13.254 14.571 ``` ```r system.time( exsingle <- exactextractr::exact_extract( bilssds, popplaceb2, fun = "mean", stack_apply = TRUE, force_df = TRUE, append_cols = "GEOID", max_cells_in_memory = 2.14e9, progress = FALSE ) ) ``` ``` #> user system elapsed #> 19.347 1.927 21.716 ``` ```r future::plan(future::sequential) ``` # Scaled up examples ## Larger buffer sizes Examples above showed that the difference between the parallelized and single-threaded extraction process is not significant. We will increase the buffer size to 50 kilometers and compare the performance of the parallelized and single-threaded extraction process. ```r # make buffers popplaceb5 <- sf::st_buffer(popplacep, dist = units::set_units(50, "km")) %>% sf::st_transform(terra::crs(bilssds)) system.time( exsingle5 <- exactextractr::exact_extract( bilssds, popplaceb5, fun = "mean", stack_apply = TRUE, force_df = TRUE, append_cols = "GEOID", max_cells_in_memory = 2.14e9, progress = FALSE ) ) ``` ``` #> user system elapsed #> 140.373 2.302 144.552 ``` ```r exgrid5k <- chopin::par_pad_grid( popplacep2, mode = "grid", padding = 5e4, nx = 4L, ny = 2L ) future::plan(future::multicore, workers = 8L) system.time( exmulti5k <- chopin::par_grid( exgrid5k, fun_dist = extract_at, y = popplacep2, x = bilssds, radius = 5e4, id = "GEOID", func = "mean" ) ) ``` ``` #> Your input function was successfully run at CGRIDID: 1 #> Your input function was successfully run at CGRIDID: 2 #> Your input function was successfully run at CGRIDID: 3 #> Your input function was successfully run at CGRIDID: 4 #> Your input function was successfully run at CGRIDID: 5 #> Your input function was successfully run at CGRIDID: 6 #> Your input function was successfully run at CGRIDID: 7 #> Your input function was successfully run at CGRIDID: 8 #> user system elapsed #> 152.344 11.003 60.409 ``` ```r future::plan(future::sequential) ``` > \[!NOTE\] The example above used strings of raster file paths for `surf` argument in `chopin::extract_at_buffer`. `terra::rast` at multiple file paths will return a `SpatRaster` with multiple layers **only** if the rasters have the same extent and resolution. ## Larger number of features This example uses 1,204,934 1-km grid points in the southeastern United States to summarize seven layers of TerraClimate. ```r ## generate 1km grid points in the southeastern US States stt <- tigris::states(year = 2020) targ_states <- c("NC", "SC", "GA", "FL", "AL", "MS", "TN", "LA", "AR") stt_targ <- stt[stt$STUSPS %in% targ_states, ] ``` ```r stt_t <- sf::st_transform(stt_targ, "EPSG:5070") stt_g <- sf::st_sample(stt_t, type = "regular", 1204934) stt_g <- sf::st_as_sf(stt_g) sf::st_geometry(stt_g) <- "geometry" stt_g$pid <- seq(1, nrow(stt_g)) ``` ```r stt_gb <- sf::st_buffer(stt_g, units::set_units(10, "km")) tic() single_2m <- exactextractr::exact_extract( netcdf_read_sum, stt_gb, fun = "mean", stack_apply = TRUE, force_df = TRUE, max_cells_in_memory = 2.14e9, progress = FALSE ) toc() ``` ``` #> 855.908 sec elapsed ``` ```r stt_gbg <- chopin::par_pad_grid( stt_g, mode = "grid", padding = 5e3, nx = 5L, ny = 5L ) future::plan(future::multicore, workers = 8L) system.time( stt5k <- chopin::par_grid( stt_gbg, fun_dist = extract_at, y = stt_g, x = netcdf_read_sum, id = "pid", radius = 1e4, func = "mean", max_cells = 2e7 ) ) ``` ``` #> user system elapsed #> 6.745 4.102 434.041 ``` ```r future::plan(future::sequential) ``` ## Finely resolved vector {.tabset} Using PRISM data, the example below summarizes the mean values of each data element at census block groups. ### Download ```r # set state=NULL and cb=TRUE will download the block groups for the entire US bg <- tigris::block_groups(state = NULL, cb = TRUE, year = 2020) sf::write_sf(bg, file.path("input", "Blockgroups_2020.gpkg")) ``` ### Extract ```r ## extract prism by par_hierarchy bgsf <- sf::st_read("input/Blockgroups_2020.gpkg") ``` ``` #> Reading layer `Blockgroups_2020' from data source #> `/ddn/gs1/home/songi2/projects/chopin/input/Blockgroups_2020.gpkg' #> using driver `GPKG' #> Simple feature collection with 242298 features and 11 fields #> Geometry type: MULTIPOLYGON #> Dimension: XY #> Bounding box: xmin: -179.1467 ymin: -14.5487 xmax: 179.7785 ymax: 71.38782 #> Geodetic CRS: NAD83 ``` ```r bgsf_main <- bgsf %>% dplyr::filter(!STATEFP %in% c("02", "15", "72", "66", "78", "60", "69")) ## extract prism at bg system.time( exsingle <- exactextractr::exact_extract( bilssds, bgsf_main, fun = "mean", stack_apply = TRUE, force_df = TRUE, append_cols = "GEOID", max_cells_in_memory = 2.14e9, progress = FALSE ) ) ``` ``` #> user system elapsed #> 60.387 2.059 64.147 ``` ```r nmain <- c("AS", "AK", "HI", "PR", "VI", "GU", "MP") stt_nmain <- stt[!stt$STUSPS %in% nmain, ] future::plan(future.mirai::mirai_multisession, workers = 20L) system.time( exhierarchy <- chopin::par_hierarchy( bgsf_main, regions_id = "STATEFP", fun_dist = extract_at, y = bgsf_main, x = bilssds, id = "GEOID" ) ) ``` ``` #> user system elapsed #> 2.205 0.558 158.905 ``` ```r future::plan(future::sequential) ``` ## Finely resolved raster We demonstrate the extraction process with the [CropScape](https://croplandcros.scinet.usda.gov) dataset which has a resolution of 30 meters. In this case, we use `frac` option of `exact_extract` which tabulates the fraction of the area of the cell category that is covered by the polygon after accounting for partial coverage of polygon segments over raster cells. ```r cdl <- terra::rast("input/2022_cdls/2022_30m_cdls.tif") system.time( bgsf_cdl_single <- exactextractr::exact_extract( cdl, bgsf_main, fun = "frac", stack_apply = TRUE, force_df = TRUE, append_cols = "GEOID", max_cells_in_memory = 2.14e9, progress = FALSE ) ) ``` ``` #> user system elapsed #> 1013.411 44.322 1369.053 ``` For balancing computation time, we will split the block groups into nine subsets to parallelize. Note that `mode = "grid_quantile"` is used in `par_pad_grid` to balance the number of block groups per grid. When `input` argument of `par_pad_grid` is polygons, a few polygons will have duplicate rows in the output data.frame since block groups overlapping each grid will be selected. ```r cdl <- terra::rast("input/2022_cdls/2022_30m_cdls.tif") bgsf <- sf::st_read("input/Blockgroups_2020.gpkg") bgsf_main <- bgsf %>% dplyr::filter(!STATEFP %in% c("02", "15", "72", "66", "78", "60", "69")) # balancing the number of features assigned per workers # by splitting block groups by splitting centroids bgsf_9grids_plain <- bgsf_main %>% chopin::par_pad_grid( mode = "grid", nx = 3L, ny = 3L, padding = 1e4 ) bgsf_9grids <- bgsf_main %>% chopin::par_pad_grid( mode = "grid_quantile", padding = 1e4, quantiles = chopin::par_def_q(3L) ) future::plan(future.mirai::mirai_multisession, workers = 9L) # Note that bgsf_9grids were converted to sf # for exporting the objects to the parallel workers system.time( bgsf_cdl_par <- chopin::par_grid( lapply(bgsf_9grids, sf::st_as_sf), fun_dist = extract_at, y = bgsf_main, x = cdl, id = "GEOID", func = "frac", max_cells = 2e7 ) ) ``` ``` #> Your input function was successfully run at CGRIDID: 1 #> Your input function was successfully run at CGRIDID: 2 #> Your input function was successfully run at CGRIDID: 3 #> Your input function was successfully run at CGRIDID: 4 #> Your input function was successfully run at CGRIDID: 5 #> Your input function was successfully run at CGRIDID: 6 #> Your input function was successfully run at CGRIDID: 7 #> Your input function was successfully run at CGRIDID: 8 #> Your input function was successfully run at CGRIDID: 9 #> user system elapsed #> 6.632 1.821 347.638 ``` ## See also - [Garnett, R. (2023). Geospatial distributed processing with furrr](https://posit.co/blog/geospatial-distributed-processing-with-furrr/) - [Dyba, K. (n.d.). Parallel raster processing in *stars*](https://kadyb.github.io/stars-parallel/Tutorial.html)