Measuring behaviors of traffic congestion in Lima
Introduction
In this document, I will show and explain the basics of the methodology to measure the impact of investment in some corridors located in Lima, Perú. This is done with R, sparklyr and apache.sedona environments.
Setting up
First, we need to set up the spark connections and load all the dependencies of the project.
library(tidyverse)
library(sf)
library(sparklyr)
library(apache.sedona)
library(osmdata)
library(tmap)
library(tmaptools)
library(mapview)
library(leaflet)
library(highcharter)
library(plotly)
library(h3)The config parameter of the spark session is filled in this way:
conf <- spark_config()
conf$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:3.2.0"
conf$`sparklyr.shell.driver-memory` <- "24G"
conf$`sparklyr.shell.executor-memory` <- "24G"
conf$spark.driver.memory <- "24G"
conf$spark.executor.memory <- "24G"
conf$spark.sql.autoBroadcastJoinThreshold <- -1
conf$spark.network.timeout <- "10000000s"
conf$spark.driver.maxResultSize<- "24G"
Sys.setenv("JAVA_HOME" = "/usr/java/jdk1.8.0_251/")
sc <- spark_connect(master = "local", config = conf, version = "3.0.1")In this case, I used 24 GB of RAM the spark session. The PC used in this project has 32 GB in total, so there is room to do a couple of things like editing code or write this document when the driver is processing all the data. As there is no special definition for the utilized cores, spark uses all the available CPU cores at 100% (always check the temperature of the cores, even if the CPU is not overclocked). For compatibility reasons, I used spark 3.0.1 and hadoop 3.2.0.
Loading and tidying the data
The raw .csv data was compressed and partitioned in several .gz files. With sparklyr, we can import the data automatically indicating the path of all the .gz files. If all of them have the same structure, sparklyr will generate an unique dataset with all the data from every file. To avoid the random name in the spark context, we can simply fill the name parameter with a more proper name.
lima_sc <- spark_read_csv(sc, name = "lima_sc", path = "393lima/jams",
overwrite = T)
head(lima_sc)Once we have all the data in the spark context, it is recommended to have a look of the data and generate some useful variables in the dataset, like weekdays, month and floor hours.
lima_sc <- lima_sc %>%
mutate(ts = as.POSIXct(ts),
ts = from_utc_timestamp(ts, "UTC-5"),
floor_int = sql("hour(`ts`)"), day = sql("day(`ts`)"),
date = as.Date(ts),
year = sql("year(`ts`)"),
weekday = sql("weekday(`ts`)"),
month = sql("month(`ts`)"))To have a better notion of the behavior of traffic congestion, it is suggested to have in hand the geometries of the district you are analyzing. In this case, I used Open Street Maps to have the administrative boundaries of the area and, later, join the jams linestrings with these polygons.
division <- lima_sc %>% distinct(city) %>% collect() %>% na.omit()
boundaries <- opq(bbox = 'Department of Lima') %>%
add_osm_feature(key = 'admin_level', value = '8') %>%
osmdata_sf %>% unique_osmdata
municipalities <- boundaries$osm_multipolygons
division$fixed_names <- division$city %>%
str_replace(" - Chosica", "") %>%
str_replace("Pachacamac", "Pachacámac") %>%
str_replace("Carmen de la Legua Reynoso",
"Carmen de La Legua-Reynoso")
division <- division[-16,] %>% left_join(municipalities,
by = c("fixed_names" = "name")) %>% st_as_sf()
limit <- st_read("LimaCallaoLimit.shp")mapview(division[limit, ] %>%
mutate(population = as.numeric(population)), zcol = "population",
layer.name = 'Population') Now, we need to import the data from the analyzed corridors. After a QGis export, the corridors were saved as several geoJSONs, one for each corridor. The corridor with ID == 7 will not be analyzed in this project.
corridors <- list()
for (i in 1:10) {
corridors[[i]] <- st_read(paste0("~/", i, ".geojson"), quiet = TRUE) %>%
sf::st_zm()
corridors[[i]]$id <- i
}
corridors <- bind_rows(corridors) %>% filter(id != 7)
corridors$geometry <- st_as_text(corridors$geometry)
corridors <- data.frame(id = corridors$id, geometry = corridors$geometry)
corridors_sc <- copy_to(sc, corridors, name = "corridors", overwrite = T)
corridors_sc <- mutate(corridors_sc, geometry = st_geomfromtext(geometry))In order to make a better analysis of the congested zones, I decided to make buffers of around 20 meters around each corridor linestring. I prefer to do this geographical transformation inside the spark context. This is because the spark engine uses all the available CPU cores, which neither R or Python computation processes do by themselves.
corridors_sc <- corridors_sc %>% mutate(buff = st_buffer(geometry, .0008))
corridors_buff <- corridors_sc %>% mutate(geometry = st_astext(buff)) %>%
collect()pal <- colorFactor("Dark2", names_corr$name_corr)
corridors_buff %>% left_join(names_corr) %>%
st_as_sf(wkt = "geometry") %>% select(-buff) %>%
leaflet() %>% addProviderTiles(providers$Stamen.Toner) %>%
addPolygons(color = ~pal(name_corr), label = ~name_corr) %>%
addLegend(pal = pal, values = ~name_corr)Joining, by = "id"
Warning in RColorBrewer::brewer.pal(max(3, n), palette): n too large, allowed maximum for palette Dark2 is 8
Returning the palette you asked for with that many colors
Warning in RColorBrewer::brewer.pal(max(3, n), palette): n too large, allowed maximum for palette Dark2 is 8
Returning the palette you asked for with that many colors
Warning in RColorBrewer::brewer.pal(max(3, n), palette): n too large, allowed maximum for palette Dark2 is 8
Returning the palette you asked for with that many colors
Spark processing
With the parallel capability of processing data, we can execute heavy geospatial joins with relatively less time comparing to a single core process. This enables to take all the traffic jams database and query a spatial join between those linestrings and the corridors polygon database.
The “levels” dataframe is the result of this geospatial join and a dplyr summarize. In this case, I needed to have speed, length and delay results for every corridor, level and date. This allowed me to generate basic indicators for the impact evaluation and comparisons between years and corridors.
levels <- corridors_sc %>%
left_join(lima_sc %>% select(-id),
sql_on = "st_intersects(buff, geo)") %>%
group_by(id, year, level, date) %>%
summarise(speed = mean(speedKMH, na.rm = T),
length = sum(length, na.rm = T),
delay = sum(delay, na.rm = T)) %>% collect()levels %>% left_join(names_corr) %>%
group_by(year, name_corr) %>%
summarize(length = sum(length, na.rm = T)/1000) %>%
hchart("column", hcaes(x = year, y = length, group = name_corr),
stacking = "normal") %>%
hc_yAxis(title = list(text = "Total length")) %>%
hc_xAxis(title = list(text = "Year")) %>%
hc_title(text = "Total length of traffic congestion
in Lima, by corridor and year.") %>%
hc_subtitle(text = "<b> Month</b>: September of every year.") %>%
hc_caption(text = "Own elaboration with <b>Waze</b> data.",
align = "right")Joining, by = "id"
`summarise()` has grouped output by 'year'. You can override using the
`.groups` argument.
p1 <- tm_shape(shape) +
tm_polygons("regularSpeed", style = "cont", palette = "YlGn",
n = 5, title = "Regular speed",
legend.format=list(fun=function(x) paste0(formatC(x,
digits=0, format="f"), " km/h"))) +
tm_layout(title= 'Regular speed by district (2019-2021)',
title.position = c('right', 'top')) +
tm_credits("Own elaboration with Waze data.")
p2 <- tm_shape(shape) +
tm_polygons("ratio", style = "cont", title = "",
legend.format=list(fun=function(x) paste0(formatC(x*100,
digits=0, format="f"), " %"))) +
tm_layout(title= 'Ratio of jams speed / regular speed',
title.position = c('right', 'top'))
tmap_arrange(p1, p2)
# st_crs(accidents) <- st_crs(division)
#
# ggplotly(accidents %>% ggplot() +
# geom_sf(data = division[limit, ], alpha = .1, fill = NA) +
# geom_sf(aes(color = as.factor(year)), size = .1, alpha = .8) +
# labs(color = "Year") + theme_void())
pal2 <- colorNumeric("Reds", 1:1000)
accidents %>% mutate(h3 = h3::geo_to_h3(.,8)) %>% st_drop_geometry() %>% count(h3) %>% mutate(h3_to_geo_boundary_sf(h3)) %>%
st_as_sf() %>% leaflet() %>% addProviderTiles(providers$Stamen.Toner) %>% addPolygons(color = ~pal2(n), stroke = NA, fillOpacity = .8) %>%
addLegend(pal = pal2, values = ~n)accidents %>% st_drop_geometry() %>% count(year, floor_int) %>%
hchart("line", hcaes(floor_int, n, group = year)) %>%
hc_yAxis(title = list(text = "Quantity")) %>%
hc_xAxis(title = list(text = "Year")) %>%
hc_title(text = "Count of reported accidents by hour in Lima.") %>%
hc_subtitle(text = "<b> Month</b>: September of every year.") %>%
hc_caption(text = "Own elaboration with <b>Waze</b> data.", align = "right")levels_by_hour %>% group_by(year, floor_int) %>%
summarise(n = sum(level*length/1000, na.rm = T)) %>%
hchart("line", hcaes(floor_int, n, group = year)) %>%
hc_yAxis(title = list(text = "Length in weighted km.")) %>%
hc_xAxis(title = list(text = "Year")) %>%
hc_title(text = "Length of traffic jams weighted by intensity in Lima.") %>%
hc_subtitle(text = "<b> Month</b>: September of every year.") %>%
hc_caption(text = "Own elaboration with <b>Waze</b> data.", align = "right")`summarise()` has grouped output by 'year'. You can override using the
`.groups` argument.
ggplotly(levels %>% filter(year %in% c(2021,2022)) %>%
left_join(names_corr) %>%
group_by(year, name_corr) %>% summarise(n = sum(length)) %>%
group_by(name_corr) %>% arrange(name_corr) %>% mutate(n = n/n[1]-1) %>%
filter(year == "2022") %>% arrange(-n) %>%
#mutate(name_corr = str_replace_all(name_corr, " - ", " - \n")) %>%
ggplot() + geom_col(aes(reorder(name_corr, n), n, fill = n, text = name_corr)) +
scale_fill_gradient2(high = "darkred", mid = "orange", low = "darkgreen", midpoint = 0,
labels = scales::percent) +
scale_y_continuous(labels = scales::percent) +
scale_x_discrete(labels = scales::label_wrap(15)) +
theme(axis.text.x = element_text(angle=45, size = 7)) +
labs(title = "Percent change of congestion levels between 2021 and 2022, by corridor.",
y = "", x = "Corridor", caption = "Own elaboration with Waze data."),
tooltip = "text")Warning in geom_col(aes(reorder(name_corr, n), n, fill = n, text = name_corr)):
Ignoring unknown aesthetics: text
ggplotly(levels_by_hour %>% left_join(names_corr) %>% group_by(name_corr, floor_int) %>%
summarise(length = mean(length)/1000, speed = mean(speed)*1.6) %>%
ggplot() + geom_col(aes(floor_int, length, fill = speed, text = paste0("Speed: ", round(speed,2), " km/h."))) +
facet_wrap(~name_corr) +
scale_y_continuous(labels = scales::label_number(suffix = " K", scale = 1e-3)) +
scale_fill_viridis_c() +
labs(x = "Hour", y = "Length weighted by intensity (Km.)", fill = "Speed (km/h)",
caption = "Own elaboration with Waze data.",
title = "Behaviour of the congestion, by corridor."), tooltip = "text")Warning in geom_col(aes(floor_int, length, fill = speed, text = paste0("Speed:
", : Ignoring unknown aesthetics: text