NYC Motor Vehicle Accidents

library(raster)
library(sparklyr)
library(dplyr)
library(ggplot2)

We’ll first set up a connection to the Spark cluster and reference the table we want.

sc <- spark_connect(master = "local", version = "2.0.1", hadoop_version = "2.7")
nypd <- spark_read_csv(sc, "nypd", "/home/rspark/myR/github/spark/NYPD_Motor_Vehicle_Collisions.csv", overwrite=TRUE)
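
If you want a quick sanity check before going further, sparklyr can count the rows and peek at the first few without pulling the full table into R. This is an optional sketch, not part of the original workflow; sdf_nrow() and head()/collect() are standard sparklyr/dplyr calls:

# optional: confirm the load by counting rows on the Spark side and peeking at a few of them
sparklyr::sdf_nrow(nypd)          # count runs in Spark
nypd %>% head(5) %>% collect()    # brings only 5 rows into R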

A bit of data cleaning ensures that we only keep rows that include the fields we're interested in.

cleanNY <- nypd  %>% 
  filter(!is.na(LATITUDE), !is.na(LONGITUDE), LATITUDE != 0, LONGITUDE != 0, BOROUGH != "") %>% 
  arrange(desc(UNIQUE_KEY))
dplyr::explain(cleanNY)
<SQL>
SELECT *
FROM (SELECT *
FROM `nypd`
WHERE ((NOT((`LATITUDE`) IS NULL)) AND (NOT((`LONGITUDE`) IS NULL)) AND (`LATITUDE` != 0.0) AND (`LONGITUDE` != 0.0) AND (`BOROUGH` != ''))) `echumsfbch`
ORDER BY `UNIQUE_KEY` DESC


<PLAN>
== Physical Plan ==
*Sort [UNIQUE_KEY DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(UNIQUE_KEY DESC NULLS LAST, 1)
   +- *Filter (((((((isnotnull(LONGITUDE) && isnotnull(LATITUDE)) && isnotnull(BOROUGH)) && NOT isnull(LATITUDE)) && NOT isnull(LONGITUDE)) && NOT (LATITUDE = 0.0)) && NOT (LONGITUDE = 0.0)) && NOT (BOROUGH = ))
      +- InMemoryTableScan [DATE, TIME, BOROUGH, ZIP_CODE, LATITUDE, LONGITUDE, LOCATION, …, UNIQUE_KEY, …]
         +- InMemoryRelation [DATE, TIME, BOROUGH, ZIP_CODE, LATITUDE, LONGITUDE, LOCATION, …, UNIQUE_KEY, …], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `nypd`
            +- *Project [DATE, TIME, BOROUGH, ZIP CODE AS ZIP_CODE, LATITUDE, LONGITUDE, LOCATION, ON STREET NAME AS ON_STREET_NAME, …, UNIQUE KEY AS UNIQUE_KEY, …]
               +- *FileScan csv [DATE, TIME, BOROUGH, ZIP CODE, LATITUDE, LONGITUDE, LOCATION, …, UNIQUE KEY, …] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/rspark/myR/github/spark/NYPD_Motor_Vehicle_Collisions.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DATE:string,TIME:string,BOROUGH:string,ZIP CODE:int,LATITUDE:double,LONGITUDE:double,LOCAT...
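
Nothing has been computed yet; cleanNY is still just a lazy reference to the Spark-side query. As an optional check (not in the original notebook), a small aggregation like the one below would run entirely in Spark and return only per-borough row counts:

# optional: rows remaining per borough after cleaning, computed in Spark
cleanNY %>% 
  group_by(BOROUGH) %>% 
  summarize(count = n()) %>% 
  collect()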

Now that we have (the reference to) the table that we want, we can send it through an MLlib function to do some learning.

sets <- sparklyr::sdf_partition(cleanNY, training=0.9, test = 0.1)
train <- sets$training
test <- sets$test
dt <- ml_decision_tree(train, "BOROUGH", c("LATITUDE", "LONGITUDE"), max.bins = 200L, max.depth = 10L, seed = 123L)
* No rows dropped by 'na.omit' call
# numeric to string mapping
lbls <- dt$model.parameters$labels
boroughs <- data.frame(BID=0:(length(lbls)-1), BOROUGH=lbls)
btbl <- copy_to(sc, boroughs, overwrite = TRUE)
preds <- sdf_predict(dt, test) %>% 
  left_join(btbl) %>% 
  mutate(correct = prediction == BID) %>% 
  left_join(btbl, by = c("prediction" = "BID")) %>% 
  mutate(BOROUGH = BOROUGH.x, PREDICTION_NAME = BOROUGH.y)
Joining, by = "BOROUGH"
preds %>% group_by(correct) %>% summarize(count = n())
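
Beyond the overall hit/miss counts, a per-borough breakdown makes it easier to see which boroughs the tree confuses. This is a hypothetical follow-up, still computed in Spark before anything is collected:

# confusion-matrix-style counts of true vs. predicted borough (hypothetical follow-up)
preds %>% 
  group_by(BOROUGH, PREDICTION_NAME) %>% 
  summarize(count = n()) %>% 
  arrange(desc(count)) %>% 
  collect()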

Since we want to plot the data locally, let's collect some of the data so we can get a decent plot to visualize our results. We'll just grab the test partition, roughly 100k rows, and only the columns we need. To be clear, this is not a limitation of Spark but rather a limitation of (a) how much data we want to import into R and (b) how many points we want to try to plot.

results <- preds %>% 
  select(BOROUGH, LATITUDE, LONGITUDE, PREDICTION_NAME, correct) %>% 
  collect()
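
The test partition here is small enough to collect whole. If it were much larger, we could cap what we bring into R before collecting; a sketch using head(), which translates to a LIMIT on the Spark side:

# hypothetical variant: cap the collected data at 100k rows before pulling it into R
results <- preds %>% 
  select(BOROUGH, LATITUDE, LONGITUDE, PREDICTION_NAME, correct) %>% 
  head(100000) %>% 
  collect()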

We now have a local table of roughly 100k test rows with their predictions alongside them. We can plot the predicted borough for each of these points:

results %>% 
  ggplot(aes(LONGITUDE, LATITUDE, color=as.factor(PREDICTION_NAME))) + 
    geom_point(alpha=0.2, size=1) + 
    guides(colour = guide_legend(override.aes = list(alpha = 1)))

Or we can plot where our MLlib prediction was correct or incorrect:

results %>% 
  ggplot(aes(LONGITUDE, LATITUDE, color=as.factor(correct))) + 
    geom_point(alpha=0.2, size=1) + 
    guides(colour = guide_legend(override.aes = list(alpha = 1)))

Pretty close! In total, we got 99.9065864% correct.
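
That percentage is computed from the local results table as the share of rows where the predicted borough matches the true one:

# accuracy on the collected test rows
(1 - sum(!results$correct) / nrow(results)) * 100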

# spark_disconnect(sc)