Apache Spark is one of the hottest products in data science.
Spark 1.4.0 formally adopted the SparkR package, which lets you work with Spark DataFrames from R. (See this article)
SparkR is very useful and powerful.
One of the reasons is that SparkR DataFrames present an API similar to dplyr.
For example:
df <- createDataFrame(sqlContext, iris)
df %>%
select("Sepal_Length", "Species") %>%
filter(df$Sepal_Length >= 5.5) %>%
group_by(df$Species) %>%
summarize(count = n(df$Sepal_Length), mean = mean(df$Sepal_Length)) %>%
collect
## Species count mean
## 1 versicolor 44 6.050000
## 2 setosa 5 5.640000
## 3 virginica 49 6.622449
This is very cool, but I have one small complaint: SparkR makes you quote column names or prefix them with df$.
One of the reasons dplyr is so popular is that its functions adopt NSE (non-standard evaluation).
library(dplyr)
iris %>%
select(Sepal.Length, Species) %>%
filter(Sepal.Length >= 5.5) %>%
group_by(Species) %>%
summarize(count = n(), mean = mean(Sepal.Length))
## Source: local data frame [3 x 3]
##
## Species count mean
## 1 setosa 5 5.640000
## 2 versicolor 44 6.050000
## 3 virginica 49 6.622449
It’s very smart.
With NSE, you don’t need to quote column names or repeat the name of the DataFrame they belong to.
The SparkRext package was created to bring SparkR closer to dplyr.
library(SparkRext)
df <- createDataFrame(sqlContext, iris)
df %>%
select(Sepal_Length, Species) %>%
filter(Sepal_Length >= 5.5) %>%
group_by(Species) %>%
summarize(count = n(Sepal_Length), mean = mean(Sepal_Length)) %>%
collect
## Species count mean
## 1 versicolor 44 6.050000
## 2 setosa 5 5.640000
## 3 virginica 49 6.622449
SparkRext redefines the functions of SparkR to enable NSE inputs.
As a result, they can be used in the same way as their dplyr counterparts.
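To make the idea concrete, here is a minimal base R sketch of how a bare column name could be rewritten into the df$column form that SparkR expects. This is not the actual SparkRext source. The helper name qualify_columns() and its arguments are hypothetical and exist only for illustration, and the sketch handles simple expressions only.

```r
# Hypothetical helper (not part of SparkR or SparkRext): walk an unevaluated
# expression and replace every bare column name with `df$<column>`.
qualify_columns <- function(expr, col_names, df_symbol = quote(df)) {
  # A bare symbol that matches a column name becomes df$<name>
  if (is.name(expr) && as.character(expr) %in% col_names) {
    return(call("$", df_symbol, expr))
  }
  # For a call, keep the function itself and rewrite each argument recursively
  if (is.call(expr)) {
    for (i in seq_along(expr)[-1]) {
      expr[[i]] <- qualify_columns(expr[[i]], col_names, df_symbol)
    }
  }
  expr
}

cols <- c("Sepal_Length", "Species")
qualified <- qualify_columns(quote(Sepal_Length >= 5.5), cols)
print(qualified)
```

The rewritten expression can then be evaluated wherever a DataFrame named df is visible, which is roughly what you would have typed by hand in plain SparkR.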
The source code for the SparkRext package is available on GitHub.
You can install the package from there.
install.packages("devtools") # if you have not installed "devtools" package
devtools::install_github("hoxo-m/SparkRext")
SparkRext redefines six functions on SparkR.
filter()
select()
mutate()
arrange()
summarize()
group_by()
In this section, these functions are explained.
For illustration, let’s prepare data.
library(dplyr)
library(nycflights13)
set.seed(123)
data <- sample_n(flights, 10000)
library(SparkRext)
prior_library(SparkRext)
df <- createDataFrame(sqlContext, data.frame(data))
df %>% head
## year month day dep_time dep_delay arr_time arr_delay carrier tailnum
## 1 2013 12 15 2124 -4 2322 1 UA N801UA
## 2 2013 7 17 651 -9 936 -28 DL N194DN
## 3 2013 3 2 1636 1 1800 0 WN N475WN
## 4 2013 8 19 1058 -2 1203 -32 WN N765SW
## 5 2013 9 9 1251 -9 1412 3 US N963UW
## 6 2013 1 18 1259 -1 1556 -14 WN N654SW
## flight origin dest air_time distance hour minute
## 1 289 EWR DTW 88 488 21 24
## 2 763 JFK LAX 306 2475 6 51
## 3 1501 LGA MKE 103 738 16 36
## 4 51 LGA MDW 107 725 10 58
## 5 2148 LGA BOS 38 184 12 51
## 6 2239 EWR HOU 222 1411 12 59
filter()
filter() is used to extract the rows that satisfy the specified conditions.
df %>% filter(month == 12, day == 31) %>% head
## year month day dep_time dep_delay arr_time arr_delay carrier tailnum
## 1 2013 12 31 1155 -5 1257 -11 B6 N216JB
## 2 2013 12 31 2211 12 100 15 B6 N715JB
## 3 2013 12 31 1504 9 1620 -5 MQ N501MQ
## 4 2013 12 31 2328 -2 412 3 B6 N651JB
## 5 2013 12 31 1922 -8 2116 1 MQ N501MQ
## 6 2013 12 31 849 -1 1225 -3 B6 N834JB
## flight origin dest air_time distance hour minute
## 1 316 JFK SYR 50 209 11 55
## 2 1183 JFK MCO 148 944 22 11
## 3 3425 JFK DCA 52 213 15 4
## 4 1389 EWR SJU 198 1608 23 28
## 5 3535 JFK CMH 82 483 19 22
## 6 15 JFK SFO 371 2586 8 49
df %>% filter(month == 12 | day == 31) %>% head
## year month day dep_time dep_delay arr_time arr_delay carrier tailnum
## 1 2013 12 15 2124 -4 2322 1 UA N801UA
## 2 2013 12 30 2031 -4 2351 4 DL N3743H
## 3 2013 12 16 1248 -4 1407 -4 EV N11548
## 4 2013 12 27 928 28 1307 48 DL N901DE
## 5 2013 12 7 1719 -10 2008 8 F9 N209FR
## 6 2013 12 10 NA NaN NA NaN EV N717EV
## flight origin dest air_time distance hour minute
## 1 289 EWR DTW 88 488 21 24
## 2 2065 JFK FLL 173 1069 20 31
## 3 6054 EWR IAD 49 212 12 48
## 4 2446 LGA FLL 186 1076 9 28
## 5 507 LGA DEN 269 1620 17 19
## 6 5245 LGA PIT NaN 335 NaN NaN
Note that the original filter() of SparkR cannot accept multiple comma-separated conditions at once.
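One way to support several conditions is to join them into a single expression with & before handing them on. The base R sketch below shows that idea on a small local data frame. The helper combine_conditions() is hypothetical and is not taken from SparkRext.

```r
# Hypothetical helper: capture several unevaluated conditions and join them
# into one expression with `&`, so a single-condition filter can consume it.
combine_conditions <- function(...) {
  conditions <- eval(substitute(alist(...)))
  Reduce(function(lhs, rhs) call("&", lhs, rhs), conditions)
}

combined <- combine_conditions(month == 12, day == 31)

# Check the combined condition against a small local data frame
flights_sample <- data.frame(month = c(12, 12, 7), day = c(31, 15, 31))
keep <- eval(combined, flights_sample)
flights_sample[keep, ]
```

Only the first row of flights_sample satisfies both conditions, so it is the only row kept.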
select()
select() is used to extract the specified columns.
df %>% select(year, month, day) %>% head
## year month day
## 1 2013 12 15
## 2 2013 7 17
## 3 2013 3 2
## 4 2013 8 19
## 5 2013 9 9
## 6 2013 1 18
Consecutive columns can be extracted using a colon (:).
df %>% select(year:day) %>% head
## year month day
## 1 2013 12 15
## 2 2013 7 17
## 3 2013 3 2
## 4 2013 8 19
## 5 2013 9 9
## 6 2013 1 18
You can use the minus sign (-) to extract all columns except the specified ones.
df %>% select(-year, -month, -day) %>% head
## dep_time dep_delay arr_time arr_delay carrier tailnum flight origin dest
## 1 2124 -4 2322 1 UA N801UA 289 EWR DTW
## 2 651 -9 936 -28 DL N194DN 763 JFK LAX
## 3 1636 1 1800 0 WN N475WN 1501 LGA MKE
## 4 1058 -2 1203 -32 WN N765SW 51 LGA MDW
## 5 1251 -9 1412 3 US N963UW 2148 LGA BOS
## 6 1259 -1 1556 -14 WN N654SW 2239 EWR HOU
## air_time distance hour minute
## 1 88 488 21 24
## 2 306 2475 6 51
## 3 103 738 16 36
## 4 107 725 10 58
## 5 38 184 12 51
## 6 222 1411 12 59
You can also extract columns by using column numbers.
df %>% select(1, 2, 3) %>% head
## year month day
## 1 2013 12 15
## 2 2013 7 17
## 3 2013 3 2
## 4 2013 8 19
## 5 2013 9 9
## 6 2013 1 18
You can use the select utility functions of dplyr, such as starts_with().
df %>% select(starts_with("arr")) %>% head
## arr_time arr_delay
## 1 2322 1
## 2 936 -28
## 3 1800 0
## 4 1203 -32
## 5 1412 3
## 6 1556 -14
All of the select utility functions are listed below.
starts_with(match, ignore.case = TRUE)
ends_with(match, ignore.case = TRUE)
contains(match, ignore.case = TRUE)
matches(match, ignore.case = TRUE)
num_range(prefix, range, width = NULL)
one_of(...)
everything()
Note that the original select() of SparkR cannot accept this variety of inputs.
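Selectors such as year:day, -year, or plain column numbers can all be resolved with one trick: bind each column name to its position and evaluate the selector in that context. Base R's subset() resolves its select argument the same way. The sketch below is only an illustration of the technique. resolve_columns() is a hypothetical name, and I am not claiming SparkRext is implemented like this.

```r
# Hypothetical helper: turn dplyr-style selectors into a vector of column names.
resolve_columns <- function(col_names, ...) {
  selectors <- eval(substitute(alist(...)))
  caller <- parent.frame()
  # Bind every column name to its position, e.g. year = 1, month = 2, ...
  positions <- as.list(seq_along(col_names))
  names(positions) <- col_names
  # Evaluating `year:day` now yields 1:3 and `-year` yields -1
  index <- unlist(lapply(selectors, function(selector) {
    eval(selector, envir = positions, enclos = caller)
  }))
  col_names[index]
}

cols <- c("year", "month", "day", "dep_time", "dep_delay")
range_selection    <- resolve_columns(cols, year:day)
negative_selection <- resolve_columns(cols, -year, -month)
numeric_selection  <- resolve_columns(cols, 1, 2, 3)
```

The resulting character vector can be passed to a select function that only understands column names.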
mutate()
mutate() is used to add new columns.
df %>% mutate(gain = arr_delay - dep_delay, speed = distance / air_time * 60) %>% head
## year month day dep_time dep_delay arr_time arr_delay carrier tailnum
## 1 2013 12 15 2124 -4 2322 1 UA N801UA
## 2 2013 7 17 651 -9 936 -28 DL N194DN
## 3 2013 3 2 1636 1 1800 0 WN N475WN
## 4 2013 8 19 1058 -2 1203 -32 WN N765SW
## 5 2013 9 9 1251 -9 1412 3 US N963UW
## 6 2013 1 18 1259 -1 1556 -14 WN N654SW
## flight origin dest air_time distance hour minute gain speed
## 1 289 EWR DTW 88 488 21 24 5 332.7273
## 2 763 JFK LAX 306 2475 6 51 -19 485.2941
## 3 1501 LGA MKE 103 738 16 36 -1 429.9029
## 4 51 LGA MDW 107 725 10 58 -30 406.5421
## 5 2148 LGA BOS 38 184 12 51 12 290.5263
## 6 2239 EWR HOU 222 1411 12 59 -13 381.3514
Note that the original mutate() of SparkR cannot accept multiple inputs at once.
Furthermore, the original mutate() of SparkR cannot refer to a newly added column within the same call, which the following example does.
df %>% mutate(gain = arr_delay - dep_delay, gain_per_hour = gain/(air_time/60)) %>% head
## year month day dep_time dep_delay arr_time arr_delay carrier tailnum
## 1 2013 12 15 2124 -4 2322 1 UA N801UA
## 2 2013 7 17 651 -9 936 -28 DL N194DN
## 3 2013 3 2 1636 1 1800 0 WN N475WN
## 4 2013 8 19 1058 -2 1203 -32 WN N765SW
## 5 2013 9 9 1251 -9 1412 3 US N963UW
## 6 2013 1 18 1259 -1 1556 -14 WN N654SW
## flight origin dest air_time distance hour minute gain gain_per_hour
## 1 289 EWR DTW 88 488 21 24 5 3.4090909
## 2 763 JFK LAX 306 2475 6 51 -19 -3.7254902
## 3 1501 LGA MKE 103 738 16 36 -1 -0.5825243
## 4 51 LGA MDW 107 725 10 58 -30 -16.8224299
## 5 2148 LGA BOS 38 184 12 51 12 18.9473684
## 6 2239 EWR HOU 222 1411 12 59 -13 -3.5135135
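Reusing a freshly added column works if the new columns are added one at a time, so that each expression sees the result of the previous one. Here is a minimal base R sketch of that sequential evaluation on a local data frame. sequential_mutate() is a hypothetical helper written for this illustration, not code from SparkRext.

```r
# Hypothetical helper: add columns one by one so that later expressions
# can refer to columns created earlier in the same call.
sequential_mutate <- function(data, ...) {
  new_columns <- eval(substitute(alist(...)))
  caller <- parent.frame()
  for (column_name in names(new_columns)) {
    data[[column_name]] <- eval(new_columns[[column_name]],
                                envir = data, enclos = caller)
  }
  data
}

# The first two rows of the flights sample shown above
local_flights <- data.frame(
  arr_delay = c(1, -28),
  dep_delay = c(-4, -9),
  air_time  = c(88, 306)
)

mutated <- sequential_mutate(
  local_flights,
  gain = arr_delay - dep_delay,
  gain_per_hour = gain / (air_time / 60)
)
print(mutated)
```

The gain and gain_per_hour values should agree with the first two rows of the SparkRext output above.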
arrange()
arrange() is used to sort rows by the specified columns.
df %>% arrange(month, day) %>% head
## year month day dep_time dep_delay arr_time arr_delay carrier tailnum
## 1 2013 1 1 1353 -4 1549 24 EV N14105
## 2 2013 1 1 1832 4 2144 0 UA N18220
## 3 2013 1 1 602 -3 821 16 MQ N730MQ
## 4 2013 1 1 1416 5 1603 14 UA N456UA
## 5 2013 1 1 1127 -2 1303 -6 EV N14180
## 6 2013 1 1 2323 83 22 69 EV N13538
## flight origin dest air_time distance hour minute
## 1 4171 EWR MSN 152 799 13 53
## 2 1075 EWR SNA 342 2434 18 32
## 3 4401 LGA DTW 105 502 6 2
## 4 683 EWR ORD 136 719 14 16
## 5 4294 EWR RDU 73 416 11 27
## 6 4257 EWR BTV 44 266 23 23
Rows are sorted in ascending order if you write just the column names.
To sort in descending order, use desc().
df %>% arrange(month, desc(day)) %>% head
## year month day dep_time dep_delay arr_time arr_delay carrier tailnum
## 1 2013 1 31 1424 -5 1752 -2 UA N512UA
## 2 2013 1 31 1853 -2 2149 7 DL N175DZ
## 3 2013 1 31 958 -2 1251 -30 UA N464UA
## 4 2013 1 31 1448 105 1635 131 B6 N292JB
## 5 2013 1 31 NA NaN NA NaN US
## 6 2013 1 31 1358 -7 1717 12 B6 N554JB
## flight origin dest air_time distance hour minute
## 1 257 JFK SFO 355 2586 14 24
## 2 951 JFK ATL 129 760 18 53
## 3 499 EWR SEA 324 2402 9 58
## 4 32 JFK ROC 54 264 14 48
## 5 1625 LGA CLT NaN 544 NaN NaN
## 6 63 JFK TPA 164 1005 13 58
You can also sort by values computed from columns.
df %>% arrange(abs(dep_delay)) %>% head
## year month day dep_time dep_delay arr_time arr_delay carrier tailnum
## 1 2013 2 18 605 0 844 -23 B6 N629JB
## 2 2013 8 22 640 0 935 11 UA N36207
## 3 2013 3 13 1738 0 2002 -2 FL N944AT
## 4 2013 3 5 1840 0 2142 -23 DL N3772H
## 5 2013 10 4 1710 0 1821 -14 MQ N724MQ
## 6 2013 2 19 2130 0 2255 0 B6 N228JB
## flight origin dest air_time distance hour minute
## 1 501 JFK FLL 138 1069 6 5
## 2 1162 EWR TPA 138 997 6 40
## 3 806 LGA ATL 113 762 17 38
## 4 1643 JFK SEA 343 2422 18 40
## 5 3365 JFK DCA 43 213 17 10
## 6 104 JFK BUF 60 301 21 30
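If you want to check the sorting semantics without a Spark cluster, the same three cases (plain columns, descending order, transformed values) can be reproduced on a local data frame with order(). This is a base R sketch under my own assumptions. local_arrange() and desc_key() are hypothetical names and are unrelated to the SparkRext internals.

```r
# Hypothetical stand-in for desc(): negate the sort key to reverse its order
desc_key <- function(x) -xtfrm(x)

# Hypothetical helper: evaluate each sort key inside the data frame,
# then order the rows by those keys from left to right.
local_arrange <- function(data, ...) {
  sort_keys <- eval(substitute(alist(...)))
  caller <- parent.frame()
  key_values <- lapply(sort_keys, function(key) {
    eval(key, envir = data, enclos = caller)
  })
  data[do.call(order, unname(key_values)), , drop = FALSE]
}

local_flights <- data.frame(
  month     = c(2, 1, 1),
  day       = c(5, 1, 31),
  dep_delay = c(-3, 10, 0)
)

by_month_desc_day <- local_arrange(local_flights, month, desc_key(day))
by_abs_delay      <- local_arrange(local_flights, abs(dep_delay))
```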
summarize()
summarize() is used to collapse a DataFrame to a single row.
df %>% summarize(count = n(year)) %>% collect
## count
## 1 10000
Typically, summarize() is used with group_by() to collapse each group to a single row.
As far as I know, you can use the following functions in summarize().
n()
n_distinct()
approxCountDistinct()
mean()
first()
last()
It seems that other aggregate functions are available in Scala (See docs).
Like dplyr, you can use summarise() instead of summarize().
group_by()
group_by() is used to describe how to break a DataFrame down into groups of rows.
Usually it is used with summarize() to collapse each group to a single row.
df %>%
group_by(tailnum) %>%
summarize(mean_distance = mean(distance)) %>%
head
## tailnum mean_distance
## 1 N600LR 695.0
## 2 N3HAAA 1075.0
## 3 N77518 642.5
## 4 N66051 1400.0
## 5 N5DCAA 1089.0
## 6 N947DL 1016.5
You can specify multiple columns.
df %>%
group_by(year, month, day) %>%
summarize(count = n(year)) %>%
arrange(year, month, day) %>%
head
## year month day count
## 1 2013 1 1 25
## 2 2013 1 2 29
## 3 2013 1 3 24
## 4 2013 1 4 30
## 5 2013 1 5 16
## 6 2013 1 6 35
Unlike dplyr, only summarize() can receive the results of group_by().
To install SparkR 1.4.0, the next articles may be useful.
Once you can load the SparkR package, you will also be able to use the SparkRext package.
# Preparation of data
library(dplyr)
library(nycflights13)
set.seed(123)
data <- sample_n(flights, 10000)
# Load library
library(SparkRext)
# Create Spark context and SQL context
sc <- sparkR.init(master="local")
sqlContext <- sparkRSQL.init(sc)
# Create DataFrame
df <- createDataFrame(sqlContext, data.frame(data))
# Force the SparkRext functions to take priority
prior_library(SparkRext)
# Play with DataFrame
result <- df %>%
select(year:day, flight, distance) %>%
group_by(year, month, day) %>%
summarize(flight_mean = mean(flight), distance_mean = mean(distance)) %>%
filter(flight_mean >= 2000, distance_mean >= 1000) %>%
arrange(year, month, day) %>%
collect
# Print result
head(result)
## year month day flight_mean distance_mean
## 1 2013 1 13 2271.043 1025.261
## 2 2013 1 24 2331.600 1092.850
## 3 2013 2 25 2055.471 1074.529
## 4 2013 3 5 2003.000 1003.448
## 5 2013 3 8 2167.138 1048.448
## 6 2013 3 9 2023.280 1168.400
prior_library()
SparkRext has functions with the same names as functions in SparkR.
Therefore, SparkRext is sensitive to the order in which libraries are loaded.
To avoid confusion, SparkRext provides the prior_library() function.
prior_library(SparkRext)
By doing this, the functions of SparkRext will be called with the highest priority.
You can confirm this by checking the search path:
head(search())
## [1] ".GlobalEnv" "package:SparkRext" "package:SparkR"
## [4] "package:nycflights13" "package:dplyr" "package:stats"
If you want to switch back to SparkR, you can do that as well.
prior_library(SparkR)
head(search())
## [1] ".GlobalEnv" "package:SparkR" "package:SparkRext"
## [4] "package:nycflights13" "package:dplyr" "package:stats"
You can also switch to dplyr.
prior_library(dplyr)
head(search())
## [1] ".GlobalEnv" "package:dplyr" "package:SparkR"
## [4] "package:SparkRext" "package:nycflights13" "package:stats"
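The reason the search path order matters is that R resolves a function name by walking the search path from front to back and using the first match. The base R sketch below reproduces that with two stand-in "packages" that define the same function name. The names demo_a, demo_b, and greet_demo() are made up for this illustration, and whether prior_library() works by re-attaching internally is my assumption, not something taken from its source.

```r
# Two stand-in "packages" that both define a function called greet_demo()
attach(list(greet_demo = function() "from A"), name = "demo_a",
       warn.conflicts = FALSE)
attach(list(greet_demo = function() "from B"), name = "demo_b",
       warn.conflicts = FALSE)

# demo_b was attached last, so it sits earlier in the search path and wins
first_call <- greet_demo()

# Detaching and re-attaching demo_a moves it in front of demo_b
demo_a_contents <- as.list(as.environment("demo_a"))
detach("demo_a", character.only = TRUE)
attach(demo_a_contents, name = "demo_a", warn.conflicts = FALSE)
second_call <- greet_demo()

# Clean up the search path again
detach("demo_a", character.only = TRUE)
detach("demo_b", character.only = TRUE)

print(c(first_call, second_call))
```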
%>%
You can use the pipe operator %>% without loading magrittr or dplyr.
The pipe operator is imported from the pipeR package. (See pipeR)
The reason is that the pipe operator of pipeR is faster than that of magrittr, as the benchmark below shows.
library(dplyr)
library(pipeR)
library(microbenchmark)
dplyr_pipe <- function() {
iris %>%
select(Sepal.Length, Species) %>%
filter(Sepal.Length >= 5.5) %>%
group_by(Species) %>%
summarize(count = n(), mean = mean(Sepal.Length))
}
pipeR_pipe <- function() {
iris %>>%
select(Sepal.Length, Species) %>>%
filter(Sepal.Length >= 5.5) %>>%
group_by(Species) %>>%
summarize(count = n(), mean = mean(Sepal.Length))
}
microbenchmark(
dplyr_pipe(),
pipeR_pipe()
)
## Unit: milliseconds
## expr min lq mean median uq max neval
## dplyr_pipe() 2.084006 2.243104 2.425786 2.315232 2.434530 3.972770 100
## pipeR_pipe() 1.827246 2.058515 2.173975 2.113719 2.179361 3.506113 100
If you want to use the pipe operator of another package instead, please overwrite it.
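Overwriting the operator is a plain assignment in your workspace, because a definition in the global environment is found before anything on the rest of the search path. A minimal sketch, assuming you prefer the magrittr pipe and have that package installed (the guard simply skips the example otherwise):

```r
# Assumption: magrittr is the pipe you want to switch to
if (requireNamespace("magrittr", quietly = TRUE)) {
  # A global definition of %>% takes precedence over attached packages
  `%>%` <- magrittr::`%>%`

  piped_total <- c(1, 4, 9) %>% sqrt() %>% sum()
  print(piped_total)
}
```

To go back to the pipe that SparkRext provides, remove the global definition again with rm("%>%").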