Cargamos sparklyr y dplyr.
knitr::opts_chunk$set(include = FALSE)
library(sparklyr)
library(dplyr)
Inicializamos la conexión
sc <- spark_connect(master = "local", version = "2.0.0")
Para conectarlo con un cluster suele ser algo parecido a esto. Dependiendo de si Rstudio está instalado en el master o en otro nodo. Como siempre, esto puede variar dependiendo de la configuración del cluster.
Un ejemplo de cómo utilizar sparklyr
para analizar un gran conjunto de datos se puede encontrar en https://beta.rstudioconnect.com/content/1705/taxiDemo.nb.html
.libPaths( c( "~/R/x86_64-pc-linux-gnu-library/3.3/", .libPaths()) )
# install.packages("sparklyr", lib = "~/R/x86_64-pc-linux-gnu-library/3.3/")
Sys.setenv(SPARK_HOME = "/opt/cloudera/parcels/CDH/lib/spark")
Sys.setenv(HADOOP_CONF_DIR = '/etc/hadoop/conf.cloudera.hdfs')
Sys.setenv(YARN_CONF_DIR = '/etc/hadoop/conf.cloudera.yarn')
library(sparklyr)
library(dplyr)
# sc <- spark_connect(master= "local", version= "1.6.0")
# sc <- spark_connect(master= "yarn-client", version= "1.6.0")
Sparklyr puede leer datos de hive utilizando hive_context(sc)
o de ficheros en local o hdfs. Vamos a leer un fichero en formato parquet
que está en local. Se trata de los datos del censo de 2011 para AndalucÃa, que habÃa bajado previamente utilizando el paquete MicroDatosEs
de Carlos Gil
Utilizamos la función spark_read_parquet
que lee los datos y crea un DataFrame de spark. Al asignarlo a un objeto se crea un tbl_spark
que permite utilizar funciones de dplyr
sobre un dataframe de Spark.
censo1_tbl <- spark_read_parquet(sc, "censo", path = "/home/jose/spark-warehouse/censo2/")
class(censo1_tbl)
[1] "tbl_spark" "tbl_sql" "tbl_lazy" "tbl"
# podemos ver las Dataframes en spark
src_tbls(sc)
[1] "censo"
censo1_tbl no es un dataframe de R sino una conexión al dataframe de spark
censo1_tbl
Source: query [?? x 9]
Database: spark connection master=local[4] app=sparklyr local=TRUE
edad sexo cpro ecivil factor
<dbl> <chr> <chr> <chr> <dbl>
1 77 Mujer Lleida Viudo 1.413253
2 47 Mujer Lleida Casado 1.351084
3 50 Hombre Lleida Casado 1.351084
4 21 Hombre Lleida Soltero 1.351084
5 15 Mujer Lleida Soltero 1.351084
6 47 Mujer Lleida Soltero 28.615887
7 33 Hombre Lleida Divorciado 28.615887
8 34 Hombre Lleida Casado 28.615887
9 46 Mujer Lleida Casado 6.735608
10 61 Hombre Lleida Casado 6.735608
# ... with more rows, and 4 more variables: esreal <chr>, rela <chr>,
# nhijos <dbl>, nocu <dbl>
Población total, según edad y provincia Filtramos por edad, agrupamos por edad y calculamos el total de población, factor
es el factor de elevación del censo. También registramos la tabla resultante con sdf_register
por si queremos utilizarla más tarde
total <- censo1_tbl %>%
filter(edad > 20, edad < 70) %>%
group_by(edad, cpro) %>%
summarise(total = sum(factor)) %>%
arrange(desc(total)) %>%
sdf_register("total")
total
Source: query [?? x 3]
Database: spark connection master=local[4] app=sparklyr local=TRUE
edad cpro total
<dbl> <chr> <dbl>
1 34 Madrid 128186.8
2 36 Madrid 126499.4
3 35 Madrid 123598.4
4 37 Madrid 122807.4
5 38 Madrid 121143.1
6 33 Madrid 119484.8
7 39 Madrid 118050.5
8 32 Madrid 113179.5
9 40 Madrid 112198.5
10 41 Madrid 111313.0
# ... with more rows
En realidad utilizamos dplyr para mandar consultas SQL a la spark sql api. Podemos ver cómo es la consulta con sql_render()
sql_render(censo1_tbl %>%
filter(edad > 20, edad < 70) %>%
group_by(edad, cpro) %>%
summarise(total = sum(factor)) %>%
arrange(desc(total)))
<SQL> SELECT *
FROM (SELECT `edad`, `cpro`, SUM(`factor`) AS `total`
FROM (SELECT *
FROM `censo`
WHERE ((`edad` > 20.0) AND (`edad` < 70.0))) `ioaqfnvifv`
GROUP BY `edad`, `cpro`) `wmrrchzmha`
ORDER BY `total` DESC
Calculamos ahora el número de divorcios
divorces <- censo1_tbl %>%
filter(edad > 20, edad < 70, ecivil == "Divorciado") %>%
group_by(edad, cpro) %>%
summarise(divorces = sum(factor)) %>%
arrange(desc(divorces)) %>%
sdf_register("divorces")
Hacemos ahora un inner_join
de las dos tablas, los join de dplyr no funcionan en spark 1.6, pero sà en spark 2.0 y superiores
divorces.by.age.pro <- total %>%
inner_join(divorces, by = c("edad","cpro")) %>%
mutate(porcentaje = 100 * divorces/total) %>%
sdf_register("divorces_by_age_pro")
divorces.by.age.pro
Source: query [?? x 5]
Database: spark connection master=local[4] app=sparklyr local=TRUE
edad cpro total divorces porcentaje
<dbl> <chr> <dbl> <dbl> <dbl>
1 21 Alicante/Alacant 20232.707 31.285538 0.15462853
2 21 AlmerÃa 9207.686 13.390031 0.14542232
3 21 Badajoz 9217.113 27.175656 0.29483913
4 21 Barcelona 56022.898 157.924870 0.28189343
5 21 Bizkaia 10092.244 32.880117 0.32579591
6 21 Burgos 3633.439 1.154345 0.03177005
7 21 Castellón/Castelló 6204.103 20.862106 0.33626308
8 21 Ciudad Real 6539.515 12.316173 0.18833465
9 21 Coruña, A 10865.143 18.334196 0.16874327
10 21 Ourense 2635.842 13.270185 0.50345149
# ... with more rows
Podemos ahora graficarlo. ggplot2
necesita que los datos estén en un data.frame de R, asà que utilizamos la función collect
. Es importante antes de usar collect
que nuestros datos caben en memoria.
library(ggplot2)
divorces.by.age.pro %>%
collect() %>% # con collect los traigo de spark a R
ggplot(aes(x=edad,y=porcentaje,color=cpro)) +
geom_jitter(size=0.4) +
geom_smooth()+
facet_wrap(~cpro) +
labs(x ="Edad", y = "% Divorcios")