Here is an attempt at implementing a parallelized version of the aggregate function in sf. This function expresses a simple features object as a graph, breaks this graph down into sub-graphs using their spatial proximity and grouping data, calls the aggregate function on each sub-graph in parallel, and finally merges the binds outputs all together. I’ve also included a quick benchmark—it doesn’t look very inspiring, but perhaps this function might be more beneficial for larger data sets.
# load packages
library(raster)## Loading required package: sp
library(dplyr)##
## Attaching package: 'dplyr'
## The following objects are masked from 'package:raster':
##
## intersect, select, union
## The following objects are masked from 'package:stats':
##
## filter, lag
## The following objects are masked from 'package:base':
##
## intersect, setdiff, setequal, union
library(sf)## Linking to GEOS 3.5.1, GDAL 2.2.2, proj.4 4.9.2
library(lwgeom)## Linking to liblwgeom 2.5.0dev r16016, GEOS 3.5.1, proj.4 4.9.2
library(microbenchmark)
library(rworldxtra)
# set number of threads for benchmark
n_threads <- 4
# define parallel aggregate function
parallel_aggregate <- function(x, by, FUN, ..., do_union = TRUE,
simplify = TRUE, join = sf::st_intersects,
threads = 1) {
## capture args
pargs <- list(FUN = FUN, ..., do_union = do_union, simplify = simplify,
join = join)
## build list over overlapping features
neighbor_list <- rgeos::gUnarySTRtreeQuery(as(x, "Spatial"))
## convert list to symmetric matrix
rows <- unlist(lapply(seq_along(neighbor_list),
function(i) rep(i + 1, length(neighbor_list[[i]]))),
recursive = TRUE, use.names = FALSE)
cols <- unlist(neighbor_list, recursive = TRUE, use.names = FALSE)
neighbor_matrix <- Matrix::sparseMatrix(i = rows, j = cols,
x = rep(1, length(rows)),
dims = c(nrow(x), nrow(x)))
neighbor_matrix <- Matrix::forceSymmetric(neighbor_matrix)
## create matrix indicating which features are in the same group
group_ids <- do.call(paste, append(by, list(sep = ".")))
group_onehot_matrix <- model.matrix(~ id - 1, data.frame(id = group_ids))
group_matrix <- as(matrix(0, nrow = nrow(x), ncol = nrow(x)), "Matrix")
for (i in seq_len(ncol(group_onehot_matrix))) {
pos <- which(group_onehot_matrix[, i] == 1)
group_matrix[as.matrix(expand.grid(pos, pos))] <- 1
}
## remove cells that correspond to overlaps between geometries that
## belong to different groups in by
neighbor_matrix <- neighbor_matrix * group_matrix
## create graph from neighborhood matrix
neighbor_graph <- igraph::graph_from_adjacency_matrix(neighbor_matrix,
diag = FALSE,
mode = "undirected",
weighted = NULL)
## find clusters in the graph
graph_clusters <- igraph::clusters(neighbor_graph)
## create cluster for parallel processing
cl <- parallel::makeCluster(threads, "PSOCK")
## run dissolve in parallel
### note that we use the load-balancing implementation
### because we expect different clusters to take different
### amounts of time to process
tryCatch(
{parallel::clusterEvalQ(cl, library(sf))
parallel::clusterExport(cl, c("x", "by", "pargs", "graph_clusters"),
envir = environment())
result <- parallel::parLapplyLB(cl, unique(graph_clusters$membership),
function(i) {
# extract indices of features in sub-graph
pos <- graph_clusters$membership == i
# subset group ids in by
by_subset <- lapply(by, `[`, pos)
# run aggregate
do.call(aggregate, append(list(x = x[pos, ], by = by_subset), pargs))
})
},
finally = {parallel::stopCluster(cl)})
## compile output
if (length(result) > 1) {
result <- do.call(rbind, result)
} else {
result <- result[[1]]
}
## return result
return(result)
}# load data
data(countriesHigh)
# create spatial data
s <- lwgeom::st_make_valid(as(countriesHigh, "sf"))
# assign grouping variable
s$GROUP <- s$REGION# plot data by group
plot(s[, "GROUP"])# verify that parallel_aggregate works as expected
## run aggregate and parallel aggregate
out1 <- aggregate(s, by = list(s$GROUP), FUN = dplyr::first)
out2 <- parallel_aggregate(s, by = list(s$GROUP), FUN = dplyr::first,
threads = n_threads)
## plot both outputs
par(mfrow = c(1, 2))
plot(out1[, "GROUP"], main = "aggregate")plot(out2[, "GROUP"], main = "parallel_aggregate")# benchmark experiment
## run benchmark
result <- microbenchmark(
aggregate = aggregate(s, by = list(s$GROUP), FUN = dplyr::first),
parallel_aggregate = parallel_aggregate(s, by = list(s$GROUP),
FUN = dplyr::first,
threads = n_threads),
times = 10L, unit = "s")
## print results
print(result)## Unit: seconds
## expr min lq mean median uq max
## aggregate 211.1410 212.2627 214.4997 213.8748 216.1232 222.5055
## parallel_aggregate 121.7825 122.4164 122.8886 122.8362 123.2444 124.0313
## neval cld
## 10 b
## 10 a
## plot result
plot(result)