Access your data
We analyze the full NYC taxi data set, as described by Todd Schneider, using R and sparklyr. We load the billion-record trips table into Apache Spark and then use sparklyr and dplyr to manipulate the data and run machine learning algorithms at scale.
The data set is about 200 GB as uncompressed CSV. When converted to the compressed Parquet format, it is about 70 GB. The data are stored in HDFS and pre-loaded in a Hive table.
The Hadoop cluster runs on Elastic MapReduce (EMR) in AWS and has 12 worker nodes and one master node. The master node has R, RStudio Server Pro, and sparklyr loaded onto it.
Connect to Spark
Use sparklyr to create a new connection to Apache Spark.
# Load libraries
library(ggplot2)
library(leaflet)
library(geosphere)
library(tidyr)
library(shiny)
library(sparklyr)
library(dplyr)
library(miniUI)
library(DT)
# Point sparklyr at the cluster's Spark installation
Sys.setenv(SPARK_HOME = "/usr/lib/spark")
# Build the Spark configuration: driver cores, executor cores, executor memory
config <- spark_config()
config[["spark.driver.cores"]] <- 32
config[["spark.executor.cores"]] <- 32
config[["spark.executor.memory"]] <- "40g"
# Open the connection through YARN in client mode
sc <- spark_connect(master = "yarn-client", config = config, version = "1.6.1")
Prepare your data
# Lazy references to the pre-loaded Hive tables
trips_tbl <- tbl(sc, "trips_par")
nyct2010_tbl <- tbl(sc, "nyct2010")
# Keep trips that have both a pickup and a dropoff tract id, narrow to the
# columns used later, then attach borough and NTA names for each end of the
# trip by joining the tract lookup table twice
trips_joined_tbl <- trips_tbl %>%
  filter(!is.na(pickup_nyct2010_gid), !is.na(dropoff_nyct2010_gid)) %>%
  select(
    pickup_datetime, dropoff_datetime,
    pickup_latitude, dropoff_latitude,
    pickup_longitude, dropoff_longitude,
    pickup_nyct2010_gid, dropoff_nyct2010_gid,
    tip_amount, fare_amount, vendor_id, passenger_count, trip_distance
  ) %>%
  left_join(
    nyct2010_tbl %>%
      select(pickup_gid = gid, pickup_boro = boroname, pickup_nta = ntaname),
    by = c("pickup_nyct2010_gid" = "pickup_gid")
  ) %>%
  left_join(
    nyct2010_tbl %>%
      select(dropoff_gid = gid, dropoff_boro = boroname, dropoff_nta = ntaname),
    by = c("dropoff_nyct2010_gid" = "dropoff_gid")
  ) %>%
  sdf_register("trips_par_joined")
# Cache the registered table in Spark
tbl_cache(sc, "trips_par_joined")
Understand your data
Spark SQL and dplyr
Use dplyr syntax to write Spark SQL. When you’re ready to visualize your data, use collect to bring data into R memory. Notice the data describe roughly 170 million trips per year.
# Count every row of the joined trips table
count(trips_joined_tbl)
Source: query [?? x 1]
Database: spark connection master=yarn-client app=sparklyr local=FALSE
n
<dbl>
1 1184591756
# Tally trips per pickup year in Spark, then collect the result into R
trip_by_year <- trips_joined_tbl %>%
  mutate(year = year(pickup_datetime)) %>%
  group_by(year) %>%
  summarize(n = n()) %>%
  collect()
# Bar chart of the yearly totals, with comma-formatted counts on the y axis
ggplot(trip_by_year, aes(x = year, y = n)) +
  geom_bar(stat = "Identity") +
  scale_y_continuous(labels = scales::comma) +
  ggtitle("Number of trips by year") +
  xlab("Year") +
  ylab("")

You can measure the time between pickup and dropoff for any two locations. In this example, we measure the trip time between JFK and Midtown. Notice the longest trip times occur around 4 PM.
# Calculate trip time for a specific pickup and dropoff tract pair.
# For each pickup hour this computes the trip count, the mean trip time and
# the 10th/25th/50th/75th/90th percentiles of trip time. trip_time is the
# difference of two unix_timestamp() values, i.e. it is measured in seconds.
pickup_dropoff_tbl <- trips_joined_tbl %>%
  filter(pickup_nyct2010_gid == 1250 & dropoff_nyct2010_gid == 2056) %>%
  mutate(pickup_hour = hour(pickup_datetime)) %>%
  mutate(trip_time = unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime)) %>%
  group_by(pickup_hour) %>%
  summarize(n = n(),
            trip_time_mean = mean(trip_time),
            trip_time_p10 = percentile(trip_time, 0.10),
            trip_time_p25 = percentile(trip_time, 0.25),
            trip_time_p50 = percentile(trip_time, 0.50),
            trip_time_p75 = percentile(trip_time, 0.75),
            trip_time_p90 = percentile(trip_time, 0.90))
# Collect results (one row per pickup hour) into R
pickup_dropoff <- collect(pickup_dropoff_tbl)
# Plot the median with 25-75th and 10-90th percentile bands.
# The collected values are in seconds, so divide by 60 to match the
# "minutes" axis label (the Shiny gadget's plot does the same conversion).
ggplot(pickup_dropoff, aes(x = pickup_hour)) +
  geom_line(aes(y = trip_time_p50 / 60, alpha = "Median")) +
  geom_ribbon(aes(ymin = trip_time_p25 / 60, ymax = trip_time_p75 / 60,
                  alpha = "25–75th percentile")) +
  geom_ribbon(aes(ymin = trip_time_p10 / 60, ymax = trip_time_p90 / 60,
                  alpha = "10–90th percentile")) +
  scale_y_continuous("trip duration in minutes")

Spark ML
With Spark ML you can run machine learning algorithms against all your data in Spark. Here we attempt to understand what factors influence tip amounts.
# Select a model data set: trips for one pickup/dropoff tract pair, with
# derived time features, keeping only rows where every model input is present
model_tbl <- trips_joined_tbl %>%
  filter(pickup_nyct2010_gid == 1250 & dropoff_nyct2010_gid == 2056) %>%
  mutate(pickup_hour = hour(pickup_datetime)) %>%
  mutate(pickup_week = weekofyear(pickup_datetime)) %>%
  mutate(pickup_year = year(pickup_datetime)) %>%
  mutate(trip_time = unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime)) %>%
  filter(!is.na(pickup_nyct2010_gid) & !is.na(dropoff_nyct2010_gid)) %>%
  filter(!is.na(tip_amount)) %>%
  filter(!is.na(fare_amount)) %>%
  filter(!is.na(vendor_id)) %>%
  filter(!is.na(pickup_hour)) %>%
  filter(!is.na(pickup_week)) %>%
  filter(!is.na(passenger_count)) %>%
  filter(!is.na(trip_time)) %>%
  filter(!is.na(trip_distance))
# Partition into train (80%) and test (20%) with a fixed seed
model_partition_tbl <- model_tbl %>%
  sdf_partition(train = 0.8, test = 0.2, seed = 1234)
# Create table references, one per partition.
# The test table must come from the test split; registering the train split
# under "trips_test" would make any evaluation on it an in-sample evaluation.
trips_train_tbl <- sdf_register(model_partition_tbl$train, "trips_train")
trips_test_tbl <- sdf_register(model_partition_tbl$test, "trips_test")
# Cache the training data
tbl_cache(sc, "trips_train")
# Fit a linear regression of tip amount on fare, vendor, time and trip features
model_formula <- formula(tip_amount ~
                           fare_amount + vendor_id + pickup_hour + pickup_week +
                           passenger_count + trip_time + trip_distance)
m1 <- ml_linear_regression(trips_train_tbl, model_formula)
summary(m1)
Call: ml_linear_regression(trips_train_tbl, model_formula)
Deviance Residuals: (approximate):
Min 1Q Median 3Q Max
-25.2997 -4.6801 -0.9295 5.0750 144.4012
Coefficients:
Estimate Std. Error t value Pr(>|t|)
(Intercept) -1.3565e+00 1.6172e-01 -8.3879 < 2.2e-16 ***
fare_amount 1.7752e-01 2.7468e-03 64.6287 < 2.2e-16 ***
vendor_id_2 4.6725e-01 7.6849e-02 6.0800 1.205e-09 ***
vendor_id_CMT -1.5830e+00 6.2922e-02 -25.1581 < 2.2e-16 ***
vendor_id_DDS -3.1478e+00 1.3872e-01 -22.6917 < 2.2e-16 ***
vendor_id_VTS -1.4759e+00 6.2890e-02 -23.4686 < 2.2e-16 ***
pickup_hour -2.1475e-02 3.5354e-03 -6.0743 1.248e-09 ***
pickup_week 1.9910e-03 9.9476e-04 2.0015 0.04535 *
passenger_count -2.5653e-01 1.1200e-02 -22.9058 < 2.2e-16 ***
trip_time -8.0990e-06 8.6924e-06 -0.9317 0.35148
trip_distance -9.4866e-05 7.1767e-05 -1.3219 0.18622
---
Signif. codes: 0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1
R-Squared: 0.05971
Root Mean Squared Error: 5.526
Shiny Gadget
It is often useful to visualize the analyses interactively. Use a Shiny gadget to explore other pickup and dropoff locations.
# Sorted, de-duplicated pickup NTA names, collected into R for the
# gadget's select inputs
geo_group <- trips_joined_tbl %>%
  distinct(pickup_nta) %>%
  arrange(pickup_nta) %>%
  collect()
# Gadget user interface: a title bar and four tabs
# (pickup/dropoff inputs, trip-time plot, map, data table)
ui <- miniPage(
  gadgetTitleBar(title = "NYC Taxi Trips"),
  miniTabstripPanel(
    miniTabPanel(
      title = "Inputs", icon = icon(name = "sliders"),
      miniContentPanel(
        selectInput(inputId = "pickup", label = "Taxi Pickup",
                    choices = geo_group, selected = "Lincoln Square"),
        selectInput(inputId = "dropoff", label = "Taxi Dropoff",
                    choices = geo_group, selected = "Upper West Side")
      )
    ),
    miniTabPanel(
      title = "Plot", icon = icon(name = "area-chart"),
      miniContentPanel(
        plotOutput(outputId = "tripTimePlot")
      )
    ),
    miniTabPanel(
      title = "Map", icon = icon(name = "map-o"),
      miniContentPanel(
        leafletOutput(outputId = "tripLeaflet")
      )
    ),
    miniTabPanel(
      title = "Data", icon = icon(name = "table"),
      miniContentPanel(
        dataTableOutput(outputId = "table", height = "100%")
      )
    )
  )
)
# Create the shiny gadget server function.
# Reads input$pickup and input$dropoff (selected NTA names) and input$done;
# fills output$tripTimePlot, output$tripLeaflet and output$table.
server <- function(input, output) {
  # Per-pickup-hour statistics for the selected pickup/dropoff pair, computed
  # in Spark and collected into R. trip_time is a difference of
  # unix_timestamp() values, so all trip_time_* columns are in seconds.
  shiny_pickup_dropoff_hour <- reactive({
    trips_joined_tbl %>%
      filter(pickup_nta == input$pickup & dropoff_nta == input$dropoff) %>%
      mutate(pickup_hour = hour(pickup_datetime)) %>%
      mutate(trip_time = unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime)) %>%
      group_by(pickup_hour) %>%
      summarize(n = n(),
                pickup_latitude = mean(pickup_latitude),
                pickup_longitude = mean(pickup_longitude),
                dropoff_latitude = mean(dropoff_latitude),
                dropoff_longitude = mean(dropoff_longitude),
                trip_time_mean = mean(trip_time),
                trip_time_p10 = percentile(trip_time, 0.10),
                trip_time_p25 = percentile(trip_time, 0.25),
                trip_time_p50 = percentile(trip_time, 0.50),
                trip_time_p75 = percentile(trip_time, 0.75),
                trip_time_p90 = percentile(trip_time, 0.90)) %>%
      collect
  })
  # One-row summary across all hours, used to place the map markers.
  # The hourly rows are already averages, so they are combined weighted by
  # each hour's trip count, and n is the total number of trips rather than
  # the number of hourly rows.
  shiny_pickup_dropoff <- reactive({
    hourly <- shiny_pickup_dropoff_hour()
    data.frame(
      n = sum(hourly$n),
      pickup_latitude = weighted.mean(hourly$pickup_latitude, hourly$n),
      pickup_longitude = weighted.mean(hourly$pickup_longitude, hourly$n),
      dropoff_latitude = weighted.mean(hourly$dropoff_latitude, hourly$n),
      dropoff_longitude = weighted.mean(hourly$dropoff_longitude, hourly$n)
    )
  })
  # Median trip time by pickup hour with percentile bands, seconds -> minutes
  output$tripTimePlot <- renderPlot({
    ggplot(shiny_pickup_dropoff_hour(), aes(x = pickup_hour)) +
      geom_line(aes(y = trip_time_p50 / 60, alpha = "Median")) +
      geom_ribbon(aes(ymin = trip_time_p25 / 60,
                      ymax = trip_time_p75 / 60,
                      alpha = "25–75th percentile")) +
      geom_ribbon(aes(ymin = trip_time_p10 / 60,
                      ymax = trip_time_p90 / 60,
                      alpha = "10–90th percentile")) +
      scale_y_continuous("trip duration in minutes") +
      ggtitle("Trip time in minutes")
  })
  # Map with the average pickup (green) and dropoff (red) locations
  output$tripLeaflet <- renderLeaflet({
    leaflet(shiny_pickup_dropoff()) %>%
      addProviderTiles("CartoDB.Positron") %>%
      addCircleMarkers(~pickup_longitude, ~pickup_latitude, fill = FALSE, color = "green") %>%
      addCircleMarkers(~dropoff_longitude, ~dropoff_latitude, stroke = FALSE, color = "red")
  })
  # Hourly table: trip count plus mean and median trip time in whole minutes
  output$table <- renderDataTable({
    shiny_pickup_dropoff_hour() %>%
      mutate(trip_time_mean = round(trip_time_mean / 60)) %>%
      mutate(trip_time_p50 = round(trip_time_p50 / 60)) %>%
      select(pickup_hour, n, trip_time_mean, trip_time_p50)
  })
  # Close the gadget and return TRUE when the Done button is pressed
  observeEvent(input$done, {
    stopApp(TRUE)
  })
}
# Launch the gadget with the UI and server defined above
runGadget(app = ui, server = server)
[1] TRUE
