library(raster)
library(sparklyr)
library(dplyr)
library(ggplot2)
We’ll first set up a connection to the Spark cluster and reference the table we want.
sc <- spark_connect(master = "local", version = "2.0.1", hadoop_version = "2.7")
nypd <- spark_read_csv(sc, "nypd", "/home/rspark/myR/github/spark/NYPD_Motor_Vehicle_Collisions.csv", overwrite=TRUE)
A bit of data cleaning ensures that we only keep the rows that include the fields we're interested in.
cleanNY <- nypd %>%
filter(!is.na(LATITUDE), !is.na(LONGITUDE), LATITUDE != 0, LONGITUDE != 0, BOROUGH != "") %>%
arrange(desc(UNIQUE_KEY))
dplyr::explain(cleanNY)
<SQL>
SELECT *
FROM (SELECT *
FROM `nypd`
WHERE ((NOT((`LATITUDE`) IS NULL)) AND (NOT((`LONGITUDE`) IS NULL)) AND (`LATITUDE` != 0.0) AND (`LONGITUDE` != 0.0) AND (`BOROUGH` != ''))) `echumsfbch`
ORDER BY `UNIQUE_KEY` DESC
<PLAN>
== Physical Plan ==
*Sort [UNIQUE_KEY#92 DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(UNIQUE_KEY#92 DESC NULLS LAST, 1)
+- *Filter (((((((isnotnull(LONGITUDE#74) && isnotnull(LATITUDE#73)) && isnotnull(BOROUGH#71)) && NOT isnull(LATITUDE#73)) && NOT isnull(LONGITUDE#74)) && NOT (LATITUDE#73 = 0.0)) && NOT (LONGITUDE#74 = 0.0)) && NOT (BOROUGH#71 = ))
+- InMemoryTableScan [DATE#69, TIME#70, BOROUGH#71, ZIP_CODE#72, LATITUDE#73, LONGITUDE#74, LOCATION#75, ON_STREET_NAME#76, CROSS_STREET_NAME#77, OFF_STREET_NAME#78, NUMBER_OF_PERSONS_INJURED#79, NUMBER_OF_PERSONS_KILLED#80, NUMBER_OF_PEDESTRIANS_INJURED#81, NUMBER_OF_PEDESTRIANS_KILLED#82, NUMBER_OF_CYCLIST_INJURED#83, NUMBER_OF_CYCLIST_KILLED#84, NUMBER_OF_MOTORIST_INJURED#85, NUMBER_OF_MOTORIST_KILLED#86, CONTRIBUTING_FACTOR_VEHICLE_1#87, CONTRIBUTING_FACTOR_VEHICLE_2#88, CONTRIBUTING_FACTOR_VEHICLE_3#89, CONTRIBUTING_FACTOR_VEHICLE_4#90, CONTRIBUTING_FACTOR_VEHICLE_5#91, UNIQUE_KEY#92, ... 5 more fields], [isnotnull(LONGITUDE#74), isnotnull(LATITUDE#73), isnotnull(BOROUGH#71), NOT isnull(LATITUDE#73), NOT isnull(LONGITUDE#74), NOT (LATITUDE#73 = 0.0), NOT (LONGITUDE#74 = 0.0), NOT (BOROUGH#71 = )]
+- InMemoryRelation [DATE#69, TIME#70, BOROUGH#71, ZIP_CODE#72, LATITUDE#73, LONGITUDE#74, LOCATION#75, ON_STREET_NAME#76, CROSS_STREET_NAME#77, OFF_STREET_NAME#78, NUMBER_OF_PERSONS_INJURED#79, NUMBER_OF_PERSONS_KILLED#80, NUMBER_OF_PEDESTRIANS_INJURED#81, NUMBER_OF_PEDESTRIANS_KILLED#82, NUMBER_OF_CYCLIST_INJURED#83, NUMBER_OF_CYCLIST_KILLED#84, NUMBER_OF_MOTORIST_INJURED#85, NUMBER_OF_MOTORIST_KILLED#86, CONTRIBUTING_FACTOR_VEHICLE_1#87, CONTRIBUTING_FACTOR_VEHICLE_2#88, CONTRIBUTING_FACTOR_VEHICLE_3#89, CONTRIBUTING_FACTOR_VEHICLE_4#90, CONTRIBUTING_FACTOR_VEHICLE_5#91, UNIQUE_KEY#92, ... 5 more fields], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `nypd`
+- *Project [DATE#10 AS DATE#69, TIME#11 AS TIME#70, BOROUGH#12 AS BOROUGH#71, ZIP CODE#13 AS ZIP_CODE#72, LATITUDE#14 AS LATITUDE#73, LONGITUDE#15 AS LONGITUDE#74, LOCATION#16 AS LOCATION#75, ON STREET NAME#17 AS ON_STREET_NAME#76, CROSS STREET NAME#18 AS CROSS_STREET_NAME#77, OFF STREET NAME#19 AS OFF_STREET_NAME#78, NUMBER OF PERSONS INJURED#20 AS NUMBER_OF_PERSONS_INJURED#79, NUMBER OF PERSONS KILLED#21 AS NUMBER_OF_PERSONS_KILLED#80, NUMBER OF PEDESTRIANS INJURED#22 AS NUMBER_OF_PEDESTRIANS_INJURED#81, NUMBER OF PEDESTRIANS KILLED#23 AS NUMBER_OF_PEDESTRIANS_KILLED#82, NUMBER OF CYCLIST INJURED#24 AS NUMBER_OF_CYCLIST_INJURED#83, NUMBER OF CYCLIST KILLED#25 AS NUMBER_OF_CYCLIST_KILLED#84, NUMBER OF MOTORIST INJURED#26 AS NUMBER_OF_MOTORIST_INJURED#85, NUMBER OF MOTORIST KILLED#27 AS NUMBER_OF_MOTORIST_KILLED#86, CONTRIBUTING FACTOR VEHICLE 1#28 AS CONTRIBUTING_FACTOR_VEHICLE_1#87, CONTRIBUTING FACTOR VEHICLE 2#29 AS CONTRIBUTING_FACTOR_VEHICLE_2#88, CONTRIBUTING FACTOR VEHICLE 3#30 AS CONTRIBUTING_FACTOR_VEHICLE_3#89, CONTRIBUTING FACTOR VEHICLE 4#31 AS CONTRIBUTING_FACTOR_VEHICLE_4#90, CONTRIBUTING FACTOR VEHICLE 5#32 AS CONTRIBUTING_FACTOR_VEHICLE_5#91, UNIQUE KEY#33 AS UNIQUE_KEY#92, ... 5 more fields]
+- *FileScan csv [DATE#10,TIME#11,BOROUGH#12,ZIP CODE#13,LATITUDE#14,LONGITUDE#15,LOCATION#16,ON STREET NAME#17,CROSS STREET NAME#18,OFF STREET NAME#19,NUMBER OF PERSONS INJURED#20,NUMBER OF PERSONS KILLED#21,NUMBER OF PEDESTRIANS INJURED#22,NUMBER OF PEDESTRIANS KILLED#23,NUMBER OF CYCLIST INJURED#24,NUMBER OF CYCLIST KILLED#25,NUMBER OF MOTORIST INJURED#26,NUMBER OF MOTORIST KILLED#27,CONTRIBUTING FACTOR VEHICLE 1#28,CONTRIBUTING FACTOR VEHICLE 2#29,CONTRIBUTING FACTOR VEHICLE 3#30,CONTRIBUTING FACTOR VEHICLE 4#31,CONTRIBUTING FACTOR VEHICLE 5#32,UNIQUE KEY#33,... 5 more fields] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/rspark/myR/github/spark/NYPD_Motor_Vehicle_Collisions.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DATE:string,TIME:string,BOROUGH:string,ZIP CODE:int,LATITUDE:double,LONGITUDE:double,LOCAT...
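Note that cleanNY is still only a lazy reference: the SQL and plan above describe what Spark will run once the data is actually needed. If you would rather materialize and cache the filtered table up front, one option is sparklyr's sdf_register() plus tbl_cache(). A minimal sketch (the table name clean_nypd is just an arbitrary choice for illustration):
# optional: register the filtered result as a Spark table and cache it in memory
cleanNY_cached <- sdf_register(cleanNY, "clean_nypd")
tbl_cache(sc, "clean_nypd")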
Now that we have (the reference to) the table that we want, we can send it through an MLlib function to do some learning.
sets <- sparklyr::sdf_partition(cleanNY, training=0.9, test = 0.1)
train <- sets$training
test <- sets$test
dt <- ml_decision_tree(train, "BOROUGH", c("LATITUDE", "LONGITUDE"), max.bins = 200L, max.depth = 10L, seed = 123L)
* No rows dropped by 'na.omit' call
# numeric to string mapping
lbls <- dt$model.parameters$labels
boroughs <- data.frame(BID=0:(length(lbls)-1), BOROUGH=lbls)
btbl <- copy_to(sc, boroughs, overwrite = TRUE)
preds <- sdf_predict(dt, test) %>%
left_join(btbl) %>%
mutate(correct = prediction == BID) %>%
left_join(btbl, by = c("prediction" = "BID")) %>%
mutate(BOROUGH = BOROUGH.x, PREDICTION_NAME = BOROUGH.y)
Joining, by = "BOROUGH"
preds %>% group_by(correct) %>% summarize(count = n())
Since we want to plot the data locally, let's collect some of it so we can get a decent plot to visualize our results. We'll just grab 100k rows. To be clear, this is not a limitation of Spark; it's a limit on (a) how much data we want to pull into R and (b) how many points we can sensibly plot.
results <- preds %>%
select(BOROUGH, LATITUDE, LONGITUDE, PREDICTION_NAME, correct) %>%
head(100000) %>%  # keep only 100k rows for local plotting
collect()
We now have a local table of 100k rows with their predictions alongside them. We can plot the predicted borough for each of these points:
results %>%
ggplot(aes(LONGITUDE, LATITUDE, color=as.factor(PREDICTION_NAME))) +
geom_point(alpha=0.2, size=1) +
guides(colour = guide_legend(override.aes = list(alpha = 1)))
Or we can plot where the MLlib prediction was correct and where it was incorrect:
results %>%
ggplot(aes(LONGITUDE, LATITUDE, color=as.factor(correct))) +
geom_point(alpha=0.2, size=1) +
guides(colour = guide_legend(override.aes = list(alpha = 1)))
Pretty close! In total, we got 99.9065864% correct.
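For reference, an overall accuracy figure like this can be computed on the Spark side directly from the preds table, before anything is collected. A minimal sketch, reusing the correct column defined above:
# sketch: share of correct predictions, computed in Spark
preds %>%
summarize(accuracy = sum(as.numeric(correct), na.rm = TRUE) / n()) %>%
collect()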
# spark_disconnect(sc)