chopin
This document demonstrates to parallelize weather/climate geospatial
data processing with chopin
and what cases users may take
advantage of parallel processing or not. We will cover the following
formats:
We consider TerraClimate and PRISM data which have its own data format each. Parameter-elevation Regressions on Independent Slopes Model (PRISM) is a high-resolution (800-1,000 meters) gridded climate dataset available in the BIL (band interleaved by line) format which is readable with GDAL. TerraClimate is a high-resolution (5 km) gridded climate dataset in the NetCDF format which is readable with GDAL.
Data | Source | Resolution | File format |
---|---|---|---|
TerraClimate | University of Idaho | 0.0417 degrees | NetCDF |
PRISM | Oregon State University | 0.0083 degrees | BIL |
The spatial resolution of TerraClimate data commensurates 5 km in the
equator, whereas that of PRISM data is approximately 1 km. The data are
available in the NetCDF format which is readable with GDAL. We will use
the terra
package to read the data.
We will consider the populated places centroids in the mainland
United States (i.e., excluding Alaska, Hawaii, and the territories). We
will use the tigris
package to download the data.
Data | Number of features | Source | Type |
---|---|---|---|
Census places | 31,377 | US Census Bureau | Polygon |
Block groups | 238,193 | US Census Bureau | Polygon |
Grid points in the southeastern US | 1,204,904 | Author, US Census Bureau (state polygons) | Point |
We used a machine with 112 physical CPU cores with 750 GB of memory, but we used only a portion of the cores (up to 20) for the demonstration. No maximum possible memory usage was set. However, in typical HPC management systems, users are required to request the number of cores and memory for their jobs. The number of cores and memory capacity should be considered when users parallelize the extraction process.
# populated places
state <- tigris::states(year = 2022)
statemain <-
state[!state$STUSPS %in% c("AK", "HI", "PR", "VI", "GU", "MP", "AS"), ]
target_states <- statemain$GEOID
popplace <-
lapply(target_states, function(x) tigris::places(x, year = 2022)) %>%
do.call(rbind, .)
saveRDS(popplace, "./input/populated_place_2022.rds", compress = "xz")
state <- tigris::states(year = 2022)
statemain <-
state[!state$STUSPS %in% c("AK", "HI", "PR", "VI", "GU", "MP", "AS"), ]
target_states <- statemain$GEOID
# download populated places
options(tigris_use_cache = TRUE)
popplace <-
lapply(target_states, function(x) {
tigris::places(year = 2022, state = x)
})
popplace <- do.call(rbind, popplace)
# generate circular buffers with 10 km radius
popplacep <- sf::st_centroid(popplace, of_largest_polygon = TRUE) %>%
sf::st_transform("EPSG:5070")
popplacep2 <- sf::st_centroid(popplace, of_largest_polygon = TRUE)
popplaceb <- sf::st_buffer(popplacep, dist = units::set_units(10, "km"))
TerraClimate data are provided in yearly NetCDF files, each of which
contains monthly layers. We will read the data with the terra
package and preprocess the data to extract the annual mean and sum of
the bands by types of columns.
For reproducibility, run the code below with our companion package
amadeus
to download terraClimate data. Please note that it
will take a while depending on the internet speed.
rlang::check_installed("amadeus")
tcli_variables <- c(
"aet", "def", "pet", "ppt", "q", "soil", "swe",
"PDSI", "srad", "tmax", "tmin", "vap", "vpd", "ws"
)
amadeus::download_terraclimate(
variables = tcli_variables,
year = c(2000, 2022),
directory_to_save = "./input/terraClimate",
acknowledgement = TRUE,
download = TRUE,
remove_command = TRUE
)
# wbd
ext_mainland <- c(xmin = -126, xmax = -64, ymin = 22, ymax = 51)
ext_mainland <- terra::ext(ext_mainland)
path_tc <- file.path("input", "terraClimate")
path_tc_files <- list.files(
path = path_tc, pattern = "*.nc$",
full.names = TRUE
)
path_tc_files
#> [1] "input/terraClimate/TerraClimate_aet_2000.nc"
#> [2] "input/terraClimate/TerraClimate_aet_2001.nc"
#> [truncated]
#> [321] "input/terraClimate/TerraClimate_ws_2021.nc"
#> [322] "input/terraClimate/TerraClimate_ws_2022.nc"
Fourteen variables are available in TerraClimate data. The table below is from the TerraClimate website.
Variable | Description | Units |
---|---|---|
aet | Actual Evapotranspiration, monthly total | mm |
def | Climate Water Deficit, monthly total | mm |
PDSI | Palmer Drought Severity Index, at end of month | unitless |
pet | Potential evapotranspiration, monthly total | mm |
ppt | Precipitation, monthly total | mm |
q | Runoff, monthly total | mm |
soil | Soil Moisture, total column - at end of month | mm |
srad | Downward surface shortwave radiation | W/m2 |
swe | Snow water equivalent - at end of month | mm |
tmax | Max Temperature, average for month | C |
tmin | Min Temperature, average for month | C |
vap | Vapor pressure, average for month | kPa |
vpd | Vapor Pressure Deficit, average for month | kpa |
ws | Wind speed, average for month | m/s |
The variables can be categorized into two types: those that will be summed and those that will be averaged.
aet
, def
, pet
,
ppt
, q
, soil
, and
swe
.PDSI
, srad
, tmax
,
tmin
, vap
, vpd
, and
ws
.Following that rationale, we will preprocess the data by summing the
first seven layers and averaging the rest of the layers. The code blocks
below follow “split-apply-combine” strategy. Note that
terra::tapp
aggregates layers by its attributes such as
time or custom indices.
options(future.globals = FALSE)
# some bands should be summed
bandnames <- c(
"aet", "def", "PDSI", "pet", "ppt", "q", "soil", "srad",
"swe", "tmax", "tmin", "vap", "vpd", "ws"
)
bandnames_sorted <- sort(bandnames)
# single nc file, yearly aggregation by fun value
# band for summation
bandnames_sum <- c("aet", "def", "pet", "ppt", "q", "soil", "swe")
# band for averaging
bandnames_avg <- c("PDSI", "srad", "tmax", "tmin", "vap", "vpd", "ws")
# mean: temporally marginal pixel mean (i.e., monthly -> yearly)
# sum: temporally marginal pixel sum (i.e., monthly -> yearly)
# Preprocessed data are stored in
tictoc::tic("sum: 7 layers")
netcdf_read_sum <-
split(bandnames_sum, bandnames_sum) |>
lapply(function(x) {
grep(paste0("(", x, ")"), path_tc_files, value = TRUE)
}) |>
lapply(function(x) {
terra::tapp(terra::rast(x, win = ext_mainland, snap = "out"), index = "years", fun = "sum")
})
netcdf_read_sum <- Reduce(c, netcdf_read_sum)
tictoc::toc()
#> sum: 7 layers: 177.974 sec elapsed
names(netcdf_read_sum) <- paste0(rep(bandnames_sum, each = 23), "_", rep(2000:2022, 7))
netcdf_read_sum
#> class : SpatRaster
#> dimensions : 696, 1488, 161 (nrow, ncol, nlyr)
#> resolution : 0.04166667, 0.04166667 (x, y)
#> extent : -126, -64, 22, 51 (xmin, xmax, ymin, ymax)
#> coord. ref. : +proj=longlat +ellps=WGS84 +no_defs
#> source(s) : memory
#> names : aet_2000, aet_2001, aet_2002, aet_2003, ...
#> min values : 22.9, 27.3, 11, 30.8, ...
#> max values : 1270.9, 1366.6, 1399, 1411.1, ...
#> time (years): 2000 to 2022
# tictoc::tic("mean: 7 layers")
# netcdf_read_mean <-
# split(bandnames_avg, bandnames_avg) |>
# lapply(function(x) {
# grep(paste0("(", x, ")"), path_tc_files, value = TRUE)
# }) |>
# lapply(function(x) {
# terra::tapp(terra::rast(x, win = ext_mainland, snap = "out"), index = "years", fun = "mean")
# }) |>
# Reduce(f = c, x = _)
# tictoc::toc()
# names(netcdf_read_mean) <-
# sprintf("%s_%d", rep(bandnames_avg, each = 23), rep(2000:2022, 7))
# netcdf_read_mean
!WARNING Stacking raster data may take a long time and consume a large amount of memory depending on users’ area of interest and data resolution. Users need to consider the memory capacity of the system before stacking rasters.
We have 14 data elements for 23 years with 12 months each. Below
demonstrates the summary of the data layers that were averaged at
circular buffer polygons with 10 kilometers (10,000 meters) radius. To
leverage multiple cores in your machine, run
future::availableCores()
to check the number of cores and
set the number of workers in future::plan
accordingly.
Typically, there are two logical cores in each physical core in modern
central processing units. The number of workers should be set to up to
the number of physical cores in the machine for optimal performance. The
example below uses future::multicore
which shares the
memory across the workers.
tic("multi threads (grid)")
future::plan(future::multicore, workers = 8L)
grid_init <-
chopin::par_pad_grid(
popplacep2,
mode = "grid",
padding = 1e4,
nx = 4L,
ny = 2L
)
multi_grid <-
chopin::par_grid(
grids = grid_init,
fun_dist = extract_at,
y = popplacep2,
x = netcdf_read_sum,
id = "GEOID",
func = "mean",
radius = 1e4
)
toc()
#> multi threads (grid): 16.668 sec elapsed
system.time(
multi <-
grep(
paste0("(", paste(bandnames_sum, collapse = "|"), ")"),
path_tc_files,
value = TRUE
) %>%
chopin::par_multirasters(
filenames = .,
fun_dist = extract_at,
y = popplacep2,
x = rast(), # ignored
id = "GEOID",
func = "mean",
radius = 1e4
)
)
#> user system elapsed
#> 5377.495 231.654 762.147
future::plan(future::sequential)
# single thread
tic("single thread")
single <-
exactextractr::exact_extract(
netcdf_read_sum,
sf::st_as_sf(popplaceb),
fun = "mean",
stack_apply = TRUE,
force_df = TRUE,
append_cols = c("GEOID"),
progress = FALSE
)
toc()
#> single thread: 21.161 sec elapsed
!CAUTION All Windows users and RStudio users (all platforms) will not be able to use
future::multicore
due to the restriction infuture
package. Pleasefuture::multisession
instead and note that this option will runs slower thanfuture::multicore
case.
PRISM data are provided in monthly 30-year normal BIL files. Assuming
that a user wants to summarize 30-year normal precipitation at 10
kilometers circular buffers of the geogrpahic centroids of US
Census Places (from p.26), we demonstrate the extraction process
with the chopin
package.
# populated places
# mainland states
state <- tigris::states(year = 2022)
statemain <-
state[!state$STUSPS %in% c("AK", "HI", "PR", "VI", "GU", "MP", "AS"), ]
target_states <- statemain$GEOID
popplace <-
lapply(target_states, function(x) tigris::places(x, year = 2022)) %>%
do.call(rbind, .)
saveRDS(popplace, "./input/populated_place_2022.rds", compress = "xz")
bils <- list.files("input", "bil$", recursive = TRUE, full.names = TRUE)
bilssds <- terra::rast(bils[-13])
popplace2 <- sf::st_transform(popplace, crs = terra::crs(bilssds))
popplaceb2 <- sf::st_transform(popplaceb, crs = terra::crs(bilssds))
!IMPORTANT
chopin::par_pad_grid
works the best with point datasets since each grid will clip the input features when parallelized. Polygon inputs will result in duplicate values in the output and lead to take longer to complete.
In the same vein as the TerraClimate data, we will use the
chopin
package to parallelize the extraction process with
grid strategy.
exgrid <-
chopin::par_pad_grid(
popplacep2,
mode = "grid",
padding = 1e4,
nx = 4L,
ny = 2L
)
future::plan(future::multicore, workers = 8L)
system.time(
exmulti <-
chopin::par_grid(
exgrid,
fun_dist = extract_at,
y = popplacep2,
x = bilssds,
radius = units::set_units(1e4, "meter"),
id = "GEOID",
func = "mean"
)
)
#> Your input function was successfully run at CGRIDID: 1
#> Your input function was successfully run at CGRIDID: 2
#> Your input function was successfully run at CGRIDID: 3
#> Your input function was successfully run at CGRIDID: 4
#> Your input function was successfully run at CGRIDID: 5
#> Your input function was successfully run at CGRIDID: 6
#> Your input function was successfully run at CGRIDID: 7
#> Your input function was successfully run at CGRIDID: 8
#> user system elapsed
#> 33.090 13.254 14.571
system.time(
exsingle <-
exactextractr::exact_extract(
bilssds,
popplaceb2,
fun = "mean",
stack_apply = TRUE,
force_df = TRUE,
append_cols = "GEOID",
max_cells_in_memory = 2.14e9,
progress = FALSE
)
)
#> user system elapsed
#> 19.347 1.927 21.716
Examples above showed that the difference between the parallelized and single-threaded extraction process is not significant. We will increase the buffer size to 50 kilometers and compare the performance of the parallelized and single-threaded extraction process.
# make buffers
popplaceb5 <- sf::st_buffer(popplacep, dist = units::set_units(50, "km")) %>%
sf::st_transform(terra::crs(bilssds))
system.time(
exsingle5 <-
exactextractr::exact_extract(
bilssds,
popplaceb5,
fun = "mean",
stack_apply = TRUE,
force_df = TRUE,
append_cols = "GEOID",
max_cells_in_memory = 2.14e9,
progress = FALSE
)
)
#> user system elapsed
#> 140.373 2.302 144.552
exgrid5k <-
chopin::par_pad_grid(
popplacep2,
mode = "grid",
padding = 5e4,
nx = 4L,
ny = 2L
)
future::plan(future::multicore, workers = 8L)
system.time(
exmulti5k <-
chopin::par_grid(
exgrid5k,
fun_dist = extract_at,
y = popplacep2,
x = bilssds,
radius = 5e4,
id = "GEOID",
func = "mean"
)
)
#> Your input function was successfully run at CGRIDID: 1
#> Your input function was successfully run at CGRIDID: 2
#> Your input function was successfully run at CGRIDID: 3
#> Your input function was successfully run at CGRIDID: 4
#> Your input function was successfully run at CGRIDID: 5
#> Your input function was successfully run at CGRIDID: 6
#> Your input function was successfully run at CGRIDID: 7
#> Your input function was successfully run at CGRIDID: 8
#> user system elapsed
#> 152.344 11.003 60.409
!NOTE The example above used strings of raster file paths for
surf
argument inchopin::extract_at_buffer
.terra::rast
at multiple file paths will return aSpatRaster
with multiple layers only if the rasters have the same extent and resolution.
This example uses 1,204,934 1-km grid points in the southeastern United States to summarize seven layers of TerraClimate.
## generate 1km grid points in the southeastern US States
stt <- tigris::states(year = 2020)
targ_states <- c("NC", "SC", "GA", "FL", "AL", "MS", "TN", "LA", "AR")
stt_targ <- stt[stt$STUSPS %in% targ_states, ]
stt_t <- sf::st_transform(stt_targ, "EPSG:5070")
stt_g <- sf::st_sample(stt_t, type = "regular", 1204934)
stt_g <- sf::st_as_sf(stt_g)
sf::st_geometry(stt_g) <- "geometry"
stt_g$pid <- seq(1, nrow(stt_g))
stt_gb <- sf::st_buffer(stt_g, units::set_units(10, "km"))
tic()
single_2m <-
exactextractr::exact_extract(
netcdf_read_sum,
stt_gb,
fun = "mean",
stack_apply = TRUE,
force_df = TRUE,
max_cells_in_memory = 2.14e9,
progress = FALSE
)
toc()
#> 855.908 sec elapsed
stt_gbg <-
chopin::par_pad_grid(
stt_g,
mode = "grid",
padding = 5e3,
nx = 5L,
ny = 5L
)
future::plan(future::multicore, workers = 8L)
system.time(
stt5k <-
chopin::par_grid(
stt_gbg,
fun_dist = extract_at,
y = stt_g,
x = netcdf_read_sum,
id = "pid",
radius = 1e4,
func = "mean",
max_cells = 2e7
)
)
#> user system elapsed
#> 6.745 4.102 434.041
Using PRISM data, the example below summarizes the mean values of each data element at census block groups.
#> Reading layer `Blockgroups_2020' from data source
#> `/ddn/gs1/home/songi2/projects/chopin/input/Blockgroups_2020.gpkg'
#> using driver `GPKG'
#> Simple feature collection with 242298 features and 11 fields
#> Geometry type: MULTIPOLYGON
#> Dimension: XY
#> Bounding box: xmin: -179.1467 ymin: -14.5487 xmax: 179.7785 ymax: 71.38782
#> Geodetic CRS: NAD83
bgsf_main <- bgsf %>%
dplyr::filter(!STATEFP %in% c("02", "15", "72", "66", "78", "60", "69"))
## extract prism at bg
system.time(
exsingle <-
exactextractr::exact_extract(
bilssds,
bgsf_main,
fun = "mean",
stack_apply = TRUE,
force_df = TRUE,
append_cols = "GEOID",
max_cells_in_memory = 2.14e9,
progress = FALSE
)
)
#> user system elapsed
#> 60.387 2.059 64.147
nmain <- c("AS", "AK", "HI", "PR", "VI", "GU", "MP")
stt_nmain <- stt[!stt$STUSPS %in% nmain, ]
future::plan(future.mirai::mirai_multisession, workers = 20L)
system.time(
exhierarchy <-
chopin::par_hierarchy(
bgsf_main,
regions_id = "STATEFP",
fun_dist = extract_at,
y = bgsf_main,
x = bilssds,
id = "GEOID"
)
)
#> user system elapsed
#> 2.205 0.558 158.905
We demonstrate the extraction process with the CropScape dataset which
has a resolution of 30 meters. In this case, we use frac
option of exact_extract
which tabulates the fraction of the
area of the cell category that is covered by the polygon after
accounting for partial coverage of polygon segments over raster
cells.
cdl <- terra::rast("input/2022_cdls/2022_30m_cdls.tif")
system.time(
bgsf_cdl_single <-
exactextractr::exact_extract(
cdl,
bgsf_main,
fun = "frac",
stack_apply = TRUE,
force_df = TRUE,
append_cols = "GEOID",
max_cells_in_memory = 2.14e9,
progress = FALSE
)
)
#> user system elapsed
#> 1013.411 44.322 1369.053
For balancing computation time, we will split the block groups into
nine subsets to parallelize. Note that
mode = "grid_quantile"
is used in par_pad_grid
to balance the number of block groups per grid. When input
argument of par_pad_grid
is polygons, a few polygons will
have duplicate rows in the output data.frame since block groups
overlapping each grid will be selected.
cdl <- terra::rast("input/2022_cdls/2022_30m_cdls.tif")
bgsf <- sf::st_read("input/Blockgroups_2020.gpkg")
bgsf_main <- bgsf %>%
dplyr::filter(!STATEFP %in% c("02", "15", "72", "66", "78", "60", "69"))
# balancing the number of features assigned per workers
# by splitting block groups by splitting centroids
bgsf_9grids_plain <- bgsf_main %>%
chopin::par_pad_grid(
mode = "grid",
nx = 3L, ny = 3L, padding = 1e4
)
bgsf_9grids <- bgsf_main %>%
chopin::par_pad_grid(
mode = "grid_quantile",
padding = 1e4,
quantiles = chopin::par_def_q(3L)
)
future::plan(future.mirai::mirai_multisession, workers = 9L)
# Note that bgsf_9grids were converted to sf
# for exporting the objects to the parallel workers
system.time(
bgsf_cdl_par <-
chopin::par_grid(
lapply(bgsf_9grids, sf::st_as_sf),
fun_dist = extract_at,
y = bgsf_main,
x = cdl,
id = "GEOID",
func = "frac",
max_cells = 2e7
)
)
#> Your input function was successfully run at CGRIDID: 1
#> Your input function was successfully run at CGRIDID: 2
#> Your input function was successfully run at CGRIDID: 3
#> Your input function was successfully run at CGRIDID: 4
#> Your input function was successfully run at CGRIDID: 5
#> Your input function was successfully run at CGRIDID: 6
#> Your input function was successfully run at CGRIDID: 7
#> Your input function was successfully run at CGRIDID: 8
#> Your input function was successfully run at CGRIDID: 9
#> user system elapsed
#> 6.632 1.821 347.638