Parallel aggregate

Here is an attempt at implementing a parallelized version of the aggregate function in sf. This function expresses a simple features object as a graph, breaks this graph down into sub-graphs using their spatial proximity and grouping data, calls the aggregate function on each sub-graph in parallel, and finally merges the binds outputs all together. I’ve also included a quick benchmark—it doesn’t look very inspiring, but perhaps this function might be more beneficial for larger data sets.

# load packages
library(raster)
## Loading required package: sp
library(dplyr)
## 
## Attaching package: 'dplyr'
## The following objects are masked from 'package:raster':
## 
##     intersect, select, union
## The following objects are masked from 'package:stats':
## 
##     filter, lag
## The following objects are masked from 'package:base':
## 
##     intersect, setdiff, setequal, union
library(sf)
## Linking to GEOS 3.5.1, GDAL 2.2.2, proj.4 4.9.2
library(lwgeom)
## Linking to liblwgeom 2.5.0dev r16016, GEOS 3.5.1, proj.4 4.9.2
library(microbenchmark)
library(rworldxtra)

# set number of threads for benchmark
n_threads <- 4

# define parallel aggregate function
parallel_aggregate <- function(x, by, FUN, ..., do_union = TRUE,
                               simplify = TRUE, join = sf::st_intersects,
                               threads = 1) {
  ## capture args
  pargs <- list(FUN = FUN, ..., do_union = do_union, simplify = simplify,
               join = join)
  ## build list over overlapping features
  neighbor_list <- rgeos::gUnarySTRtreeQuery(as(x, "Spatial"))
  ## convert list to symmetric matrix
  rows <- unlist(lapply(seq_along(neighbor_list),
                        function(i) rep(i + 1, length(neighbor_list[[i]]))),
                 recursive = TRUE, use.names = FALSE)
  cols <- unlist(neighbor_list, recursive = TRUE, use.names = FALSE)
  neighbor_matrix <- Matrix::sparseMatrix(i = rows, j = cols,
                                          x = rep(1, length(rows)),
                                          dims = c(nrow(x), nrow(x)))
  neighbor_matrix <- Matrix::forceSymmetric(neighbor_matrix)
  ## create matrix indicating which features are in the same group
  group_ids <- do.call(paste, append(by, list(sep = ".")))
  group_onehot_matrix <- model.matrix(~ id - 1, data.frame(id = group_ids))
  group_matrix <- as(matrix(0, nrow = nrow(x), ncol = nrow(x)), "Matrix")
  for (i in seq_len(ncol(group_onehot_matrix))) {
    pos <- which(group_onehot_matrix[, i] == 1)
    group_matrix[as.matrix(expand.grid(pos, pos))] <- 1
  }
  ## remove cells that correspond to overlaps between geometries that
  ## belong to different groups in by
  neighbor_matrix <- neighbor_matrix * group_matrix
  ## create graph from neighborhood matrix
  neighbor_graph <- igraph::graph_from_adjacency_matrix(neighbor_matrix,
                                                        diag = FALSE,
                                                        mode = "undirected",
                                                        weighted = NULL)
  ## find clusters in the graph
  graph_clusters <- igraph::clusters(neighbor_graph)
  ## create cluster for parallel processing
  cl <- parallel::makeCluster(threads, "PSOCK")
  ## run dissolve in parallel
  ### note that we use the load-balancing implementation
  ### because we expect different clusters to take different
  ### amounts of time to process
  tryCatch(
    {parallel::clusterEvalQ(cl, library(sf))
     parallel::clusterExport(cl, c("x", "by", "pargs", "graph_clusters"),
                             envir = environment())
     result <- parallel::parLapplyLB(cl, unique(graph_clusters$membership),
       function(i) {
         # extract indices of features in sub-graph
         pos <- graph_clusters$membership == i
         # subset group ids in by
         by_subset <- lapply(by, `[`, pos)
         # run aggregate
         do.call(aggregate, append(list(x = x[pos, ], by = by_subset), pargs))
     })
    },
    finally = {parallel::stopCluster(cl)})
  ## compile output
  if (length(result) > 1) {
    result <- do.call(rbind, result)
  } else {
    result <- result[[1]]
  }
  ## return result
  return(result)
}
# load data
data(countriesHigh)

# create spatial data
s <- lwgeom::st_make_valid(as(countriesHigh, "sf"))

# assign grouping variable
s$GROUP <- s$REGION
# plot data by group
plot(s[, "GROUP"])

# verify that parallel_aggregate works as expected
## run aggregate and parallel aggregate
out1 <- aggregate(s, by = list(s$GROUP), FUN = dplyr::first)
out2 <- parallel_aggregate(s, by = list(s$GROUP), FUN = dplyr::first,
                           threads = n_threads)

## plot both outputs
par(mfrow = c(1, 2))
plot(out1[, "GROUP"], main = "aggregate")

plot(out2[, "GROUP"], main = "parallel_aggregate")

# benchmark experiment
## run benchmark
result <- microbenchmark(
  aggregate = aggregate(s, by = list(s$GROUP), FUN = dplyr::first),
  parallel_aggregate = parallel_aggregate(s, by = list(s$GROUP),
                                          FUN = dplyr::first,
                                          threads = n_threads),
  times = 10L, unit = "s")

## print results
print(result)
## Unit: seconds
##                expr      min       lq     mean   median       uq      max
##           aggregate 211.1410 212.2627 214.4997 213.8748 216.1232 222.5055
##  parallel_aggregate 121.7825 122.4164 122.8886 122.8362 123.2444 124.0313
##  neval cld
##     10   b
##     10  a
## plot result
plot(result)