1 Sparklyr

Data is everywhere and keeps growing; such massive datasets can no longer be processed and analyzed with traditional tools. Apache Spark is the perfect tool for dealing with big data. It is a data processing framework with APIs in many programming languages, including R and Python.

sparklyr is the R interface to Apache Spark.
To learn more about sparklyr, I suggest:
- spark.rstudio.com
- Mastering Spark with R

1.1 Spark Settings

First, note that Spark runs on Java 8; you need to install Java on your machine to get it working. A quick way to check your Java installation from R is a system call (a minimal sketch):
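
system("java -version")  # prints the installed Java version, if any

To list all available Spark versions: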

library(sparklyr)
## 
## Attaching package: 'sparklyr'
## The following object is masked from 'package:stats':
## 
##     filter
spark_available_versions()
##    spark
## 1    1.6
## 2    2.0
## 3    2.1
## 4    2.2
## 5    2.3
## 6    2.4
## 7    3.0
## 8    3.1
## 9    3.2
## 10   3.3

For this tutorial, the configuration is:
- Version 3.3.0
- 8 CPU cores
- 16GB RAM
- 0.6 for the memory fraction
- Local mode; other options are available to connect to YARN (the most common), Livy, Mesos, Kubernetes, …

if (!exists("sc")) {
  spark_install(version = "3.3.0")
  # Spark configuration
  conf <- spark_config()
  conf$`sparklyr.cores.local` <- 8
  conf$`sparklyr.shell.driver-memory` <- "16G"
  conf$spark.memory.fraction <- 0.6
  # Connect to Spark in local mode
  sc <- spark_connect(master = "local", version = "3.3.0", config = conf)
}
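
Assuming the connection above succeeded, a quick optional sanity check:

spark_version(sc)       # should report 3.3.0
connection_is_open(sc)  # TRUE while the session is alive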

1.2 AWS

Spark is ready; the first step is to make Spark read the data.
For that, I will download the “NYC trip files” from AWS (Amazon Web Services). No AWS account is required to access them, as it is open data.
Load the aws.s3 library:

library(aws.s3)
library(dplyr)

We’ll also use dplyr, a very powerful library for data manipulation. To access the data, we’ll need the bucket address (AWS S3 stores data in buckets) and the region.

1.2.1 Connect to AWS bucket

To test if the bucket exists:

bucket_exists(
  bucket = "s3://nyc-tlc-trip-records-pds/",
  region = "us-east-1"
)
## [1] TRUE
## attr(,"x-amz-id-2")
## [1] "+G3fPG3BekUtF5IHFicC8byo5PgGOxEohSFZe2tHpTUFQOo/SlTBqChjomuxb/8OPbTyjWQmx6Q="
## attr(,"x-amz-request-id")
## [1] "CAV079RG789W3PSE"
## attr(,"date")
## [1] "Wed, 24 Aug 2022 16:03:29 GMT"
## attr(,"x-amz-bucket-region")
## [1] "us-east-1"
## attr(,"x-amz-access-point-alias")
## [1] "false"
## attr(,"content-type")
## [1] "application/xml"
## attr(,"server")
## [1] "AmazonS3"

Great, R can access the bucket.

1.2.2 Read AWS bucket

A bucket can contain many files in many formats.
To list all the available files:

nyc_trips_files <- get_bucket_df(
  bucket = "s3://nyc-tlc-trip-records-pds/", 
  region = "us-east-1",
  max=Inf
) %>% 
  as_tibble()
str(nyc_trips_files)
## tibble [33,476 × 8] (S3: tbl_df/tbl/data.frame)
##  $ Key              : chr [1:33476] "csv/" "csv/year=2010/month=01/color=yellow/yellow_tripdata_2010-01.csv.bz2" "csv/year=2010/month=02/color=yellow/yellow_tripdata_2010-02.csv.bz2" "csv/year=2010/month=03/color=yellow/yellow_tripdata_2010-03.csv.bz2" ...
##  $ LastModified     : chr [1:33476] "2016-06-27T19:51:49.000Z" "2016-06-27T20:13:15.000Z" "2016-06-27T20:13:18.000Z" "2016-06-27T20:13:20.000Z" ...
##  $ ETag             : chr [1:33476] "\"d41d8cd98f00b204e9800998ecf8427e\"" "\"da4bb371413beec08d07f4bacb0c5f82-43\"" "\"0c397d3f331b2c53060c46b491848b1e-30\"" "\"1769594b697cb1058142368f661dc836-34\"" ...
##  $ Size             : chr [1:33476] "0" "357133781" "245672528" "282214396" ...
##  $ Owner_ID         : chr [1:33476] "b1e110da4d570a880e67e7282633fec434de50998cd4dcfe5b876a497dea7bc6" NA NA NA ...
##  $ Owner_DisplayName: chr [1:33476] "amazonec2publicdatasets" NA NA NA ...
##  $ StorageClass     : chr [1:33476] "STANDARD" "STANDARD" "STANDARD" "STANDARD" ...
##  $ Bucket           : chr [1:33476] "nyc-tlc-trip-records-pds" "nyc-tlc-trip-records-pds" "nyc-tlc-trip-records-pds" "nyc-tlc-trip-records-pds" ...
##  - attr(*, "Marker")= list()
##  - attr(*, "IsTruncated")= chr "false"
##  - attr(*, "MaxKeys")= chr "34000"

As you can see, there are nearly 34,000 files available in this bucket.

nyc_trips_files %>%
  select(Key,Size) %>%
  head(n=10)
## # A tibble: 10 × 2
##    Key                                                                 Size     
##    <chr>                                                               <chr>    
##  1 csv/                                                                0        
##  2 csv/year=2010/month=01/color=yellow/yellow_tripdata_2010-01.csv.bz2 357133781
##  3 csv/year=2010/month=02/color=yellow/yellow_tripdata_2010-02.csv.bz2 245672528
##  4 csv/year=2010/month=03/color=yellow/yellow_tripdata_2010-03.csv.bz2 282214396
##  5 csv/year=2010/month=04/color=yellow/yellow_tripdata_2010-04.csv.bz2 365123261
##  6 csv/year=2010/month=05/color=yellow/yellow_tripdata_2010-05.csv.bz2 374050413
##  7 csv/year=2010/month=06/color=yellow/yellow_tripdata_2010-06.csv.bz2 357705836
##  8 csv/year=2010/month=07/color=yellow/yellow_tripdata_2010-07.csv.bz2 353737675
##  9 csv/year=2010/month=08/color=yellow/yellow_tripdata_2010-08.csv.bz2 275115140
## 10 csv/year=2010/month=09/color=yellow/yellow_tripdata_2010-09.csv.bz2 349001203

Each CSV file contains the data for a specific year and month; the files are compressed in bz2 format.

nyc_trips_files %>%
  select(Key,Size) %>%
  filter(grepl('csv.bz2', Key) & grepl('yellow', Key)) %>%
  mutate(Year=substr(Key,10,13)) %>%
  mutate(Month=substr(Key,21,22)) %>%
  group_by(Year) %>%
  summarise(
    CountFiles=n(),
    SizeFiles=sum(as.numeric(Size), na.rm=TRUE)
  ) %>%
  arrange(Year)
## # A tibble: 6 × 3
##   Year  CountFiles  SizeFiles
##   <chr>      <int>      <dbl>
## 1 2010          12 3896750008
## 2 2011          12 3890375209
## 3 2012          12 3867830667
## 4 2013          12 3586433454
## 5 2014          12 3397516149
## 6 2015          12 3618440670

The bucket contains data from 2010 to 2015. In this tutorial, we’ll focus on the year 2015.

nyc_trips_CSVfiles<- nyc_trips_files %>%
  filter(grepl('csv/year=2015', Key) & grepl('yellow', Key))


1.2.3 Download files from AWS

To download the bz2 files, you simply specify the bucket coordinates, the key name, and the path where you want to save the file.

for (key in nyc_trips_CSVfiles$Key) {
  # The file name is the last 31 characters of the key, e.g. "yellow_tripdata_2015-01.csv.bz2"
  CSV_filename <- substr(key, nchar(key) - 30, nchar(key))
  
  # Download only if the file is not already present locally
  if (!file.exists(paste0("zip/", CSV_filename))) {
    save_object(
      object = key,
      bucket = "s3://nyc-tlc-trip-records-pds/", 
      region = "us-east-1",
      file = paste0(getwd(), "/zip/", CSV_filename)
    )
  }
}
list.files(path="zip/", pattern="*.bz2", full.names=TRUE, recursive=FALSE)
##  [1] "zip//yellow_tripdata_2015-01.csv.bz2"
##  [2] "zip//yellow_tripdata_2015-02.csv.bz2"
##  [3] "zip//yellow_tripdata_2015-03.csv.bz2"
##  [4] "zip//yellow_tripdata_2015-04.csv.bz2"
##  [5] "zip//yellow_tripdata_2015-05.csv.bz2"
##  [6] "zip//yellow_tripdata_2015-06.csv.bz2"
##  [7] "zip//yellow_tripdata_2015-07.csv.bz2"
##  [8] "zip//yellow_tripdata_2015-08.csv.bz2"
##  [9] "zip//yellow_tripdata_2015-09.csv.bz2"
## [10] "zip//yellow_tripdata_2015-10.csv.bz2"
## [11] "zip//yellow_tripdata_2015-11.csv.bz2"
## [12] "zip//yellow_tripdata_2015-12.csv.bz2"

1.2.4 Read data

Spark can read many data formats, each through a dedicated spark_read_*() function (a sketch follows this list):
- CSV files
- JSON files
- AVRO files
- PARQUET files
- Binary files
- Images
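
Two of the corresponding readers, sketched with hypothetical paths (only spark_read_csv() is used in this tutorial):

df_json    <- spark_read_json(sc,    name = "df_json",    path = "data/*.json")
df_parquet <- spark_read_parquet(sc, name = "df_parquet", path = "data/*.parquet")

For our yellow-trip files, spark_read_csv() with a wildcard pattern reads all twelve bz2 files into a single Spark table: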

spark_read_csv(sc, name = "df",  path = "zip/yellow_tripdata_2015*.bz2")
## # Source: spark<df> [?? x 22]
##    vendorid lpep_pickup_datetime lpep_dropoff_datet… store_and_fwd_f… ratecodeid
##       <int> <dttm>               <dttm>              <chr>                 <int>
##  1        2 2015-08-01 00:00:15  2015-08-01 00:36:21 N                         1
##  2        1 2015-08-01 00:00:16  2015-08-01 00:14:52 N                         1
##  3        1 2015-08-01 00:00:16  2015-08-01 00:06:30 N                         1
##  4        1 2015-08-01 00:00:16  2015-08-01 00:06:18 N                         1
##  5        2 2015-08-01 00:00:16  2015-08-01 00:16:28 N                         1
##  6        2 2015-08-01 00:00:16  2015-08-01 00:13:17 N                         1
##  7        2 2015-08-01 00:00:16  2015-08-01 00:14:00 N                         1
##  8        2 2015-08-01 00:00:16  2015-08-01 00:25:25 N                         1
##  9        1 2015-08-01 00:00:17  2015-08-01 00:26:59 N                         1
## 10        1 2015-08-01 00:00:17  2015-08-01 00:08:26 N                         1
## # … with more rows, and 17 more variables: pickup_longitude <dbl>,
## #   pickup_latitude <dbl>, dropoff_longitude <dbl>, dropoff_latitude <dbl>,
## #   passenger_count <int>, trip_distance <dbl>, fare_amount <dbl>, extra <dbl>,
## #   mta_tax <dbl>, tip_amount <dbl>, tolls_amount <dbl>,
## #   improvement_surcharge <dbl>, ehail_fee <chr>, total_amount <dbl>,
## #   payment_type <int>, trip_type <chr>, tcolor <chr>

df has about 146 million rows (1.4611299 × 10^8) and 22 columns. To display the first 10 rows:

head(tbl(sc,"df"),n=10)
## # Source: spark<?> [?? x 22]
##    vendorid lpep_pickup_datetime lpep_dropoff_datet… store_and_fwd_f… ratecodeid
##       <int> <dttm>               <dttm>              <chr>                 <int>
##  1        2 2015-08-01 00:00:15  2015-08-01 00:36:21 N                         1
##  2        1 2015-08-01 00:00:16  2015-08-01 00:14:52 N                         1
##  3        1 2015-08-01 00:00:16  2015-08-01 00:06:30 N                         1
##  4        1 2015-08-01 00:00:16  2015-08-01 00:06:18 N                         1
##  5        2 2015-08-01 00:00:16  2015-08-01 00:16:28 N                         1
##  6        2 2015-08-01 00:00:16  2015-08-01 00:13:17 N                         1
##  7        2 2015-08-01 00:00:16  2015-08-01 00:14:00 N                         1
##  8        2 2015-08-01 00:00:16  2015-08-01 00:25:25 N                         1
##  9        1 2015-08-01 00:00:17  2015-08-01 00:26:59 N                         1
## 10        1 2015-08-01 00:00:17  2015-08-01 00:08:26 N                         1
## # … with 17 more variables: pickup_longitude <dbl>, pickup_latitude <dbl>,
## #   dropoff_longitude <dbl>, dropoff_latitude <dbl>, passenger_count <int>,
## #   trip_distance <dbl>, fare_amount <dbl>, extra <dbl>, mta_tax <dbl>,
## #   tip_amount <dbl>, tolls_amount <dbl>, improvement_surcharge <dbl>,
## #   ehail_fee <chr>, total_amount <dbl>, payment_type <int>, trip_type <chr>,
## #   tcolor <chr>
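
The row and column counts quoted above can be recomputed on the Spark side; a minimal sketch using sparklyr’s sdf_* helpers:

sdf_nrow(tbl(sc, "df"))  # row count, computed by Spark
sdf_ncol(tbl(sc, "df"))  # column count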


1.3 Memory usage

df is a Spark table: it is stored as a collection of RDD (Resilient Distributed Dataset) partitions distributed across nodes. To see Spark’s memory usage:

sc %>% 
  spark_context() %>% 
  # Invoke the Java method getRDDStorageInfo on the SparkContext
  invoke("getRDDStorageInfo")
## [[1]]
## <jobj[47]>
##   org.apache.spark.storage.RDDInfo
##   RDD "In-memory table df" (16) StorageLevel: StorageLevel(disk, memory, deserialized, 1 replicas); CachedPartitions: 29; TotalPartitions: 29; MemorySize: 8.7 GiB; DiskSize: 6.4 GiB

Table ‘df’ has 29 partitions, cached both in RAM and on disk.
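
spark_read_csv() cached the table on load (its memory argument defaults to TRUE). If the cached partitions ever need to be released and rebuilt, a minimal sketch:

tbl_uncache(sc, "df")  # release the cached partitions
tbl_cache(sc, "df")    # cache the table again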

1.4 Web interface

Spark offers a web interface where you can monitor all the memory resources.
To open it:

spark_web(sc)


2 SQL

Once cached, a Spark table can be queried like any database table using SQL.

2.1 Trips count by month

To extract the trip count for each of the first six months of 2015, we can use plain SQL syntax (SELECT … FROM … WHERE):

SELECT COUNT(*) AS trip_count,
       DATE_FORMAT(lpep_pickup_datetime, 'M-y') AS trip_month
FROM (
  SELECT lpep_pickup_datetime FROM df WHERE lpep_pickup_datetime < '2015-07-01'
)
GROUP BY trip_month
ORDER BY trip_month ASC
6 records

trip_count  trip_month
  12748986      1-2015
  12450521      2-2015
  13351609      3-2015
  13071789      4-2015
  13158262      5-2015
  12324935      6-2015
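
The same query can also be sent from R through the DBI interface, which sparklyr connections implement; a minimal sketch:

library(DBI)
dbGetQuery(sc, "
  SELECT COUNT(*) AS trip_count,
         DATE_FORMAT(lpep_pickup_datetime, 'M-y') AS trip_month
  FROM df
  WHERE lpep_pickup_datetime < '2015-07-01'
  GROUP BY trip_month
  ORDER BY trip_month ASC
")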


2.2 Payment type

The metadata tells us more about the variable payment_type:
- 1 = Credit card
- 2 = Cash
- 3 = No charge
- 4 = Dispute
- 5 = Unknown
- 6 = Voided trip

SELECT COUNT(*) AS count_payment, SUM(total_amount) AS payment_total, payment_type
FROM df
GROUP BY payment_type
ORDER BY count_payment DESC
5 records

count_payment  payment_total  payment_type
     91574644   1.621943e+09             1
     53864648   7.192573e+08             2
       503070   7.626925e+06             3
       170599   3.042620e+06             4
           28   5.311900e+02             5

SELECT DISTINCT payment_type,
  ROUND(SUM(total_amount) OVER (PARTITION BY payment_type) / SUM(total_amount) OVER (), 2) AS payment_perc
FROM df
ORDER BY payment_perc DESC
5 records

payment_type  payment_perc
           1          0.69
           2          0.31
           3          0.00
           4          0.00
           5          0.00

Most payments in 2015 were made by credit card (payment_type = 1), representing 69% of the total amount; cash represented only 31%.

3 Dplyr

dplyr is one of the core packages of the tidyverse and a very powerful tool for data manipulation. The most common verbs are:
- select(): pick the columns you need for your analysis
- mutate(): create new columns as functions of existing ones
- filter(): keep only the rows you need (for example, a certain range of your data)
- group_by(): group your data by one or more variables
- summarise(): compute aggregations like mean, sum, etc.
- arrange(): sort your data
And some others (see the dplyr documentation to learn more). A sketch of how these verbs translate to Spark SQL follows below.
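
When applied to a Spark table, these verbs are lazily translated to Spark SQL by dbplyr; show_query() reveals the generated SQL without running it. A minimal sketch:

tbl(sc, "df") %>%
  group_by(payment_type) %>%
  summarise(trip_count = n()) %>%
  show_query()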

# Transform data
daily_stats <- tbl(sc, "df") %>%

  # Select columns
  select(lpep_pickup_datetime, total_amount, passenger_count, lpep_dropoff_datetime,
         payment_type, vendorid, trip_distance, tip_amount) %>%

  # New column: trip date
  mutate(pickup_date = as.Date(lpep_pickup_datetime)) %>%

  # Filter by date; keep non-negative amounts and passenger counts
  filter(pickup_date >= "2015-01-01", pickup_date <= "2015-12-31",
         total_amount >= 0, passenger_count >= 0) %>%

  # New column: trip duration in seconds
  mutate(duration_sec = unix_timestamp(lpep_dropoff_datetime) - unix_timestamp(lpep_pickup_datetime)) %>%

  # Keep only payment_type values with nchar <= 3
  filter(nchar(payment_type) <= 3) %>%

  # Recode (Cas, Cre, Dis, 1, 2, 3, 4) to (CAS, CRE, DIS, CRE, CAS, NOC, DIS)
  mutate(payment_type = regexp_replace(payment_type, "Cas", "CAS")) %>%
  mutate(payment_type = regexp_replace(payment_type, "Cre", "CRE")) %>%
  mutate(payment_type = regexp_replace(payment_type, "Dis", "DIS")) %>%
  mutate(payment_type = regexp_replace(payment_type, "1", "CRE")) %>%
  mutate(payment_type = regexp_replace(payment_type, "2", "CAS")) %>%
  mutate(payment_type = regexp_replace(payment_type, "3", "NOC")) %>%
  mutate(payment_type = regexp_replace(payment_type, "4", "DIS")) %>%

  # Group by date, payment type and vendor
  group_by(pickup_date, payment_type, vendorid) %>%

  # Aggregations
  summarise(
    # Trip count
    trip_count = n(),
    # Total passengers
    passengers_count = sum(passenger_count, na.rm = TRUE),
    # Average passengers per trip
    passengers_mean = mean(passenger_count, na.rm = TRUE),
    # Total distance
    distance_total = sum(trip_distance, na.rm = TRUE),
    # Average distance
    distance_mean = mean(trip_distance, na.rm = TRUE),
    # Total payment
    payment_total = sum(total_amount, na.rm = TRUE),
    # Average payment
    payment_mean = mean(total_amount, na.rm = TRUE),
    # Total tips
    tip_amount = sum(tip_amount, na.rm = TRUE),
    # Total trip duration in seconds
    duration_total = sum(duration_sec, na.rm = TRUE),
    # Average trip duration in seconds
    duration_mean = mean(duration_sec, na.rm = TRUE)
  ) %>%

  # Sort
  arrange(pickup_date, payment_type, vendorid) %>%

  # Pull the aggregated result into R memory
  collect()
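
Note that collect() pulls the aggregated result into R memory. When an intermediate result should stay distributed instead, compute() materializes it as a new Spark table; a minimal sketch (the table name “daily_counts” is hypothetical):

daily_counts <- tbl(sc, "df") %>%
  mutate(pickup_date = as.Date(lpep_pickup_datetime)) %>%
  group_by(pickup_date) %>%
  summarise(trip_count = n()) %>%
  compute("daily_counts")  # materialized on the Spark side, not in R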

After all these manipulations, I display the structure of the new data:

str(daily_stats)
## tibble [2,221 × 13] (S3: tbl_df/tbl/data.frame)
##  $ pickup_date     : Date[1:2221], format: "2015-01-01" "2015-01-01" ...
##  $ payment_type    : chr [1:2221] "CAS" "CAS" "CRE" "CRE" ...
##  $ vendorid        : int [1:2221] 1 2 1 2 1 1 1 2 1 2 ...
##  $ trip_count      : num [1:2221] 86369 105185 84892 103586 430 ...
##  $ passengers_count: num [1:2221] 132913 218074 121352 215287 630 ...
##  $ passengers_mean : num [1:2221] 1.54 2.07 1.43 2.08 1.47 ...
##  $ distance_total  : num [1:2221] 257769 323956 289229 359247 1515 ...
##  $ distance_mean   : num [1:2221] 2.98 3.08 3.41 3.47 3.52 ...
##  $ payment_total   : num [1:2221] 1138371 1415751 1465804 1799491 6494 ...
##  $ payment_mean    : num [1:2221] 13.2 13.5 17.3 17.4 15.1 ...
##  $ tip_amount      : num [1:2221] 16.3 0 226129.9 266849.2 2.2 ...
##  $ duration_total  : num [1:2221] 58280220 98219869 61830430 88712814 301134 ...
##  $ duration_mean   : num [1:2221] 675 934 728 856 700 ...

I print all the levels to check whether there are still inconsistencies in column ‘payment_type’:

levels(as.factor(daily_stats$payment_type))
## [1] "5"   "CAS" "CRE" "DIS" "NOC"

It is easier to work with labels (CAS, CRE) than with numeric codes (1, 2). Note that only the rare code 5 (Unknown) remains unmapped.

4 Plots

There are many libraries in R for making plots; the most famous is ggplot2. Like dplyr, it is part of the tidyverse.
Particularities of ggplot2:
- the data for a plot is always a data frame, with variables as columns
- all plot types share the same grammar
- use ‘+’ to customize and add layers to a plot
- a large variety of chart types

library(ggplot2)
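
The same grammar in a minimal sketch (d is a hypothetical toy data frame, not part of the taxi data):

d <- data.frame(x = 1:10, y = (1:10)^2)
ggplot(d, aes(x = x, y = y)) +  # data and aesthetic mapping
  geom_point() +                # a layer
  ggtitle("Toy example")        # customization added with '+'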


4.1 Total passengers each day

There were, on average, 600 thousand taxi passengers per day in 2015. New York is a big city!

# Passengers per day
daily_stats %>%
  filter(distance_total>=0, distance_total<500000, payment_type %in% c("CAS", "CRE")) %>%
  group_by(pickup_date) %>%
  summarise(
    passenger_by_day=sum(passengers_count, na.rm=TRUE)
  ) %>%
  ggplot(aes(x=pickup_date, y=passenger_by_day))+geom_point()+ylim(c(0,1100000))+
  geom_smooth(se=TRUE, colour="red", span=0.3, linetype="11",method="loess")+
  ggtitle("NYC Passengers number",subtitle="(2015)")+
  xlab("Date")+ylab("Passengers number")
## `geom_smooth()` using formula 'y ~ x'


4.2 Payment type

In Section 2.2, we saw that the total amount paid by credit card is higher than the amount paid in cash. The next boxplot confirms that.

daily_stats %>%
  filter(distance_total>=0, distance_total<500000) %>%
  ggplot(aes(x=payment_type, y=payment_total,fill=payment_type))+geom_boxplot(varwidth=T)+
  ggtitle("NYC Payment method ",subtitle="(2015)")+
  xlab("Payment method")+ylab("Total amount")


4.3 Payment by weekday

# Payment by weekday
# as.POSIXlt()$wday codes days 0-6 starting with Sunday, hence "7-Sunday" comes first
dayofweek <- list(day=c("7-Sunday","1-Monday","2-Tuesday","3-Wednesday","4-Thursday","5-Friday","6-Saturday"))
daily_stats %>%
  filter(payment_type %in% c("CRE","CAS"),payment_total<4000000) %>%
  mutate(week_day=dayofweek$day[as.POSIXlt(pickup_date)$wday+1]) %>%
  arrange(week_day) %>%
  ggplot(aes(x=week_day,y=payment_total,fill=week_day))+geom_boxplot(varwidth=T)+
  ggtitle("NYC Total of payment",subtitle="(2015)")+
  xlab("Weekday")+ylab("Payment")


4.4 Tips by weekday

I am not a taxi driver, but I was wondering whether there is a favorite day for tips.

# Tip per day
dayofweek <- list(day=c("7-Sunday","1-Monday","2-Tuesday","3-Wednesday","4-Thursday","5-Friday","6-Saturday"))
daily_stats %>%
  filter(payment_type=="CRE") %>%
  mutate(week_day=dayofweek$day[as.POSIXlt(pickup_date)$wday+1]) %>%
  arrange(week_day) %>%
  ggplot(aes(x=week_day,y=tip_amount,fill=week_day))+geom_boxplot(varwidth=T)+
  ggtitle("Tips - payment with credit card",subtitle="(2015)")+
  xlab("Weekday")+ylab("Tips Total amount")


I see an outlier on Sunday; let’s look at the plot with it removed.

# Tip per day
dayofweek <- list(day=c("7-Sunday","1-Monday","2-Tuesday","3-Wednesday","4-Thursday","5-Friday","6-Saturday"))
daily_stats %>%
  filter(payment_type=="CRE") %>%
  filter(tip_amount<4000000) %>%
  mutate(week_day=dayofweek$day[as.POSIXlt(pickup_date)$wday+1]) %>%
  arrange(week_day) %>%
  ggplot(aes(x=week_day,y=tip_amount,fill=week_day))+geom_boxplot(varwidth=T)+
  ggtitle("Tips - payment with credit card",subtitle="(2015)")+
  xlab("Weekday")+ylab("Tips Total amount")


Much better! If there is a taxi driver among the readers: don’t refuse to work on Thursdays; according to the 2015 data, it’s the day for tips.

4.5 Total passengers by weekday

I assume that the number of passengers is larger on weekends than during the rest of the week. The summary table and the boxplot below largely confirm this: Saturday has the highest counts.

# Weekday influence
daily_stats %>%
  mutate(week_day=as.POSIXlt(pickup_date)$wday) %>%
  group_by(week_day) %>%
  summarise(
    Totalpassengers_by_day=sum(passengers_count,na.rm=TRUE),
    Meanpassengers_by_day=mean(passengers_count,na.rm=TRUE),
    Stdpassengers_by_day=sd(passengers_count,na.rm=TRUE)
  ) %>%
  mutate(frepassengers_by_day=round(Totalpassengers_by_day/sum(Totalpassengers_by_day),2)) %>%
  mutate(week_day=dayofweek$day[week_day+1]) %>%
  arrange(Meanpassengers_by_day)
## # A tibble: 7 × 5
##   week_day   Totalpassengers… Meanpassengers_… Stdpassengers_b… frepassengers_b…
##   <chr>                 <dbl>            <dbl>            <dbl>            <dbl>
## 1 1-Monday           30889925           97445.           84912.             0.13
## 2 2-Tuesday          33163899          105282.           94105.             0.14
## 3 3-Wednesd…         34763789          108298.           98708.             0.14
## 4 7-Sunday           34228282          109007.           90997.             0.14
## 5 4-Thursday         36213921          112117.          100944.             0.15
## 6 5-Friday           36814486          116871.          102050.             0.15
## 7 6-Saturday         39392019          124658.          104751.             0.16
# Boxplot
daily_stats %>%
  filter(payment_type %in% c("CAS","CRE"), passengers_count>0, duration_total>0, distance_total>0) %>%
  mutate(Weekday=dayofweek$day[as.POSIXlt(pickup_date)$wday+1]) %>%
  arrange(Weekday) %>%
  ggplot(aes(x=Weekday,y=passengers_count,fill=Weekday))+geom_boxplot(varwidth = T)+
  ggtitle("NYC Passengers number", subtitle="(2015)")+
  xlab("Weekday")+ylab("Passengers number")


4.6 Trip duration

The next plot displays a linear relationship between total trip duration and total payment.

# Payment vs duration
daily_stats %>%
  filter(distance_total>=0, distance_total<500000,duration_total<200000000,duration_total>=0,payment_total<4000000) %>%
  ggplot(aes(duration_total, y=payment_total, color=payment_type))+geom_point()+geom_smooth(method="lm")+
  ggtitle("NYC-Total of payment vs Total trip duration",subtitle="(2015)")+
  xlab("Duration")+ylab("Payment")
## `geom_smooth()` using formula 'y ~ x'


4.7 Trip distance

There is also a linear relationship between trip distance and payment.

# Payment vs distance
daily_stats %>%
  filter(distance_total>=0, distance_total<500000,duration_total<200000000,duration_total>=0,payment_total<4000000) %>%
  ggplot(aes(distance_total, y=payment_total, color=payment_type))+geom_point()+geom_smooth(method="lm")+
  ggtitle("NYC-Total payment vs Total distance",subtitle = "(2015)")+
  xlab("Total distance")+ylab("Total payment")
## `geom_smooth()` using formula 'y ~ x'


4.8 Map

In NYC dataset, we can use these columns to plot the pickup and dropoff points on the map:
- pickup_latitude
- pickup_longitude
- dropoff_latitude
- dropoff_longitude
With ´leaflet´, we can create plots directly integrated in Shiny applications or Markdown document.

map_data_12_2015 <- tbl(sc,"df") %>%
  # New column trip date
  mutate(pickup_date = as.Date(lpep_pickup_datetime)) %>%
  # Filters
  filter(pickup_date>="2015-12-01",pickup_date<="2015-12-31",
         pickup_longitude>=-74,pickup_longitude<=-72,pickup_latitude>=39,pickup_latitude<=42,
         dropoff_longitude>=-74,dropoff_longitude<=-72,dropoff_latitude>=39,dropoff_latitude<=42) %>%
  # Select columns
  select(pickup_date, pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude) %>%
  sample_n(size=50000) %>%
  collect()
library(leaflet)
leaflet(map_data_12_2015) %>% addTiles() %>%
  addCircles(lng = ~pickup_longitude, lat = ~pickup_latitude,weight=1,opacity=0.5,fill=TRUE,color = 'red') %>%
  addCircles(lng = ~dropoff_longitude, lat = ~dropoff_latitude,weight=1,opacity=0.5,fill=TRUE,color = 'red')

5 Modeling

Theoretically, the total payment depends on:
- trip duration
- trip distance
- and probably the payment type (cash or credit card)


5.1 Linear/Quadratic regression

I split the data into:
- a training set (90%, df_Test below): to fit the models
- a validation set (10%, df_Validation): to check their accuracy
For both regressions, I force the models to pass through the origin (intercept = 0).

df <- daily_stats %>%
  filter(distance_total >= 0, distance_total < 500000, duration_total >= 0,
         duration_total < 200000000, payment_total < 4000000, payment_type %in% c("CAS", "CRE"))
# Split the data in two: training (90%) and validation (10%)
Samples <- sample(seq(1, 2), size = nrow(df), replace = TRUE, prob = c(0.9, 0.1))
df_Test <- df[Samples == 1, ]
df_Validation <- df[Samples == 2, ]
# Linear regression through the origin
model <- lm(payment_total ~ 0 + distance_total + duration_total + payment_type, data = df_Test)
# Quadratic regression through the origin
model_2 <- lm(payment_total ~ 0 + poly(distance_total, 2) + poly(duration_total, 2) + payment_type, data = df_Test)
summary(model)
## 
## Call:
## lm(formula = payment_total ~ 0 + distance_total + duration_total + 
##     payment_type, data = df_Test)
## 
## Residuals:
##     Min      1Q  Median      3Q     Max 
## -729496  -44964    8618   61635 1690665 
## 
## Coefficients:
##                   Estimate Std. Error t value Pr(>|t|)    
## distance_total   2.712e+00  1.012e-01  26.799   <2e-16 ***
## duration_total   6.545e-03  2.258e-04  28.987   <2e-16 ***
## payment_typeCAS -1.923e+03  1.368e+04  -0.141    0.888    
## payment_typeCRE  3.648e+05  2.591e+04  14.083   <2e-16 ***
## ---
## Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
## 
## Residual standard error: 106300 on 1124 degrees of freedom
## Multiple R-squared:  0.9962, Adjusted R-squared:  0.9962 
## F-statistic: 7.44e+04 on 4 and 1124 DF,  p-value: < 2.2e-16
summary(model_2)
## 
## Call:
## lm(formula = payment_total ~ 0 + poly(distance_total, 2) + poly(duration_total, 
##     2) + payment_type, data = df_Test)
## 
## Residuals:
##     Min      1Q  Median      3Q     Max 
## -760047  -40589   11686   50108 1673385 
## 
## Coefficients:
##                          Estimate Std. Error t value Pr(>|t|)    
## poly(distance_total, 2)1  9910784     360422  27.498  < 2e-16 ***
## poly(distance_total, 2)2  1077207     147281   7.314 4.92e-13 ***
## poly(duration_total, 2)1  7224838     256842  28.129  < 2e-16 ***
## poly(duration_total, 2)2  -322154     145060  -2.221   0.0266 *  
## payment_typeCAS           1403930       7443 188.625  < 2e-16 ***
## payment_typeCRE           1787921       7805 229.069  < 2e-16 ***
## ---
## Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
## 
## Residual standard error: 103200 on 1122 degrees of freedom
## Multiple R-squared:  0.9965, Adjusted R-squared:  0.9964 
## F-statistic: 5.256e+04 on 6 and 1122 DF,  p-value: < 2.2e-16

The R-squared is very good (0.9962 for the linear regression and 0.9964 for the quadratic one), so there is no big difference between the two models.
If you look at the displayed coefficients, there is no intercept, as expected.

5.2 Residual analysis

Residuals are the differences between the observed values and the values predicted by the model. Residual analysis is a good technique for judging the goodness of fit of a model.

5.2.1 Linear regression

# residuals
res <- resid(model)
# residual vs. fitted plot
plot(fitted(model), res)
abline(0,0)

# Q-Q plot for residuals
qqnorm(res)
qqline(res) 

# density plot
plot(density(res))

5.2.2 Quadratic regression

# residuals
res <- resid(model_2)
# residual vs. fitted plot
plot(fitted(model_2), res)
abline(0,0)

# Q-Q plot for residuals
qqnorm(res)
qqline(res) 

# density plot
plot(density(res))


The residuals are approximately normally distributed, as the previous plots show. The density plots follow a bell shape (slightly better for the quadratic model).

5.3 Prediction

To test my models further, I apply them to the validation set (10%) to predict the payment.

payment_pred<- predict(model, newdata=df_Validation) %>%
  as_tibble()
payment_pred$value2 <- predict(model_2,newdata=df_Validation)
payment_pred$Valid<-df_Validation$payment_total
payment_pred$payment_type <- df_Validation$payment_type
payment_pred %>%
  ggplot(aes(x=Valid, y=value, color=payment_type))+geom_point()+geom_smooth(method="lm")+
  ggtitle("NYC Total payment predicted",subtitle="Linear model")+
  xlab("Observed values")+ylab("Predicted Values")
## `geom_smooth()` using formula 'y ~ x'

payment_pred %>%
  ggplot(aes(x=Valid, y=value2, color=payment_type))+geom_point()+geom_smooth(method="lm")+
  ggtitle("NYC Total payment predicted",subtitle="Quadratic model")+
  xlab("Observed values")+ylab("Predicted Values")
## `geom_smooth()` using formula 'y ~ x'

# Prediction errors of both models
payment_pred <- payment_pred %>%
  mutate(error_1 = value - Valid) %>%
  mutate(error_2 = value2 - Valid)
# Stack the two error columns into long format for the boxplot
df_boxplot <- as_tibble(c(rep("Linear model", nrow(payment_pred)), rep("Quadratic model", nrow(payment_pred))))
df_boxplot$error <- dplyr::union_all(payment_pred$error_1, payment_pred$error_2)
df_boxplot %>%
  ggplot(aes(x = value, y = error, fill = value)) + geom_boxplot() +
  ggtitle("Comparison of errors between the linear and quadratic models") +
  xlab("Regression model") + ylab("Error")


The quadratic model gives a smaller error on average. The points on the boxplot represent outliers.
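
To put a number on this comparison, an error metric such as the RMSE can be computed on the validation set; a minimal sketch using the error columns defined above:

sqrt(mean(payment_pred$error_1^2, na.rm = TRUE))  # RMSE of the linear model
sqrt(mean(payment_pred$error_2^2, na.rm = TRUE))  # RMSE of the quadratic model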

6 Conclusion

In this tutorial, we saw how to use Spark locally to deal with “big data”, and how easily it combines with the other R libraries to filter and analyze the data.

Jamel Belgacem LinkedIn
R & Python developer, data analyst.
Version 3.0