Conexión de sparklyr en local.

Cargamos sparklyr y dplyr.

knitr::opts_chunk$set(include = FALSE)
library(sparklyr)
library(dplyr)

Inicializamos la conexión

sc <- spark_connect(master = "local", version = "2.0.0")

Conexión en un cluster

Para conectarlo con un cluster suele ser algo parecido a esto. Dependiendo de si Rstudio está instalado en el master o en otro nodo. Como siempre, esto puede variar dependiendo de la configuración del cluster.

Un ejemplo de cómo utilizar sparklyr para analizar un gran conjunto de datos se puede encontrar en https://beta.rstudioconnect.com/content/1705/taxiDemo.nb.html

.libPaths( c( "~/R/x86_64-pc-linux-gnu-library/3.3/", .libPaths()) )
# install.packages("sparklyr", lib = "~/R/x86_64-pc-linux-gnu-library/3.3/")

Sys.setenv(SPARK_HOME = "/opt/cloudera/parcels/CDH/lib/spark")
Sys.setenv(HADOOP_CONF_DIR = '/etc/hadoop/conf.cloudera.hdfs')
Sys.setenv(YARN_CONF_DIR = '/etc/hadoop/conf.cloudera.yarn')
library(sparklyr)
library(dplyr)
# sc <- spark_connect(master= "local", version= "1.6.0")
# sc <- spark_connect(master= "yarn-client", version= "1.6.0")

Lectura de datos.

Sparklyr puede leer datos de hive utilizando hive_context(sc) o de ficheros en local o hdfs. Vamos a leer un fichero en formato parquet que está en local. Se trata de los datos del censo de 2011 para Andalucía, que había bajado previamente utilizando el paquete MicroDatosEs de Carlos Gil

Utilizamos la función spark_read_parquet que lee los datos y crea un DataFrame de spark. Al asignarlo a un objeto se crea un tbl_spark que permite utilizar funciones de dplyr sobre un dataframe de Spark.

censo1_tbl <- spark_read_parquet(sc, "censo", path = "/home/jose/spark-warehouse/censo2/")
class(censo1_tbl)
[1] "tbl_spark" "tbl_sql"   "tbl_lazy"  "tbl"      
# podemos ver las Dataframes en spark
src_tbls(sc)
[1] "censo"

censo1_tbl no es un dataframe de R sino una conexión al dataframe de spark

censo1_tbl
Source:   query [?? x 9]
Database: spark connection master=local[4] app=sparklyr local=TRUE

    edad   sexo   cpro     ecivil    factor
   <dbl>  <chr>  <chr>      <chr>     <dbl>
1     77  Mujer Lleida      Viudo  1.413253
2     47  Mujer Lleida     Casado  1.351084
3     50 Hombre Lleida     Casado  1.351084
4     21 Hombre Lleida    Soltero  1.351084
5     15  Mujer Lleida    Soltero  1.351084
6     47  Mujer Lleida    Soltero 28.615887
7     33 Hombre Lleida Divorciado 28.615887
8     34 Hombre Lleida     Casado 28.615887
9     46  Mujer Lleida     Casado  6.735608
10    61 Hombre Lleida     Casado  6.735608
# ... with more rows, and 4 more variables: esreal <chr>, rela <chr>,
#   nhijos <dbl>, nocu <dbl>

Población total, según edad y provincia Filtramos por edad, agrupamos por edad y calculamos el total de población, factor es el factor de elevación del censo. También registramos la tabla resultante con sdf_register por si queremos utilizarla más tarde

total <- censo1_tbl %>%
   filter(edad > 20, edad < 70) %>%
   group_by(edad, cpro) %>%
   summarise(total = sum(factor)) %>%
   arrange(desc(total)) %>% 
   sdf_register("total")
total
Source:   query [?? x 3]
Database: spark connection master=local[4] app=sparklyr local=TRUE

    edad   cpro    total
   <dbl>  <chr>    <dbl>
1     34 Madrid 128186.8
2     36 Madrid 126499.4
3     35 Madrid 123598.4
4     37 Madrid 122807.4
5     38 Madrid 121143.1
6     33 Madrid 119484.8
7     39 Madrid 118050.5
8     32 Madrid 113179.5
9     40 Madrid 112198.5
10    41 Madrid 111313.0
# ... with more rows

En realidad utilizamos dplyr para mandar consultas SQL a la spark sql api. Podemos ver cómo es la consulta con sql_render()

sql_render(censo1_tbl %>%
   filter(edad > 20, edad < 70) %>%
   group_by(edad, cpro) %>%
   summarise(total = sum(factor)) %>%
   arrange(desc(total)))
<SQL> SELECT *
FROM (SELECT `edad`, `cpro`, SUM(`factor`) AS `total`
FROM (SELECT *
FROM `censo`
WHERE ((`edad` > 20.0) AND (`edad` < 70.0))) `ioaqfnvifv`
GROUP BY `edad`, `cpro`) `wmrrchzmha`
ORDER BY `total` DESC

Calculamos ahora el número de divorcios

divorces <- censo1_tbl %>%
   filter(edad > 20, edad < 70, ecivil == "Divorciado") %>%
   group_by(edad, cpro) %>%
   summarise(divorces = sum(factor)) %>%
   arrange(desc(divorces)) %>% 
   sdf_register("divorces")

Hacemos ahora un inner_join de las dos tablas, los join de dplyr no funcionan en spark 1.6, pero sí en spark 2.0 y superiores

divorces.by.age.pro <- total %>% 
   inner_join(divorces, by = c("edad","cpro")) %>%
   mutate(porcentaje = 100 * divorces/total) %>% 
   sdf_register("divorces_by_age_pro")
divorces.by.age.pro
Source:   query [?? x 5]
Database: spark connection master=local[4] app=sparklyr local=TRUE

    edad               cpro     total   divorces porcentaje
   <dbl>              <chr>     <dbl>      <dbl>      <dbl>
1     21   Alicante/Alacant 20232.707  31.285538 0.15462853
2     21            Almería  9207.686  13.390031 0.14542232
3     21            Badajoz  9217.113  27.175656 0.29483913
4     21          Barcelona 56022.898 157.924870 0.28189343
5     21            Bizkaia 10092.244  32.880117 0.32579591
6     21             Burgos  3633.439   1.154345 0.03177005
7     21 Castellón/Castelló  6204.103  20.862106 0.33626308
8     21        Ciudad Real  6539.515  12.316173 0.18833465
9     21          Coruña, A 10865.143  18.334196 0.16874327
10    21            Ourense  2635.842  13.270185 0.50345149
# ... with more rows

Podemos ahora graficarlo. ggplot2 necesita que los datos estén en un data.frame de R, así que utilizamos la función collect. Es importante antes de usar collect que nuestros datos caben en memoria.

library(ggplot2)
divorces.by.age.pro %>% 
   collect() %>%                             # con collect los traigo de spark a R
   ggplot(aes(x=edad,y=porcentaje,color=cpro)) + 
   geom_jitter(size=0.4) + 
   geom_smooth()+
   facet_wrap(~cpro) +
   labs(x ="Edad", y = "% Divorcios")

LS0tCnRpdGxlOiAiRWplbXBsbyBTcGFya2x5ciIKb3V0cHV0OgogIGh0bWxfbm90ZWJvb2s6IGRlZmF1bHQKICBodG1sX2RvY3VtZW50OiBkZWZhdWx0Ci0tLQoKIyMgQ29uZXhpw7NuIGRlIHNwYXJrbHlyIGVuIGxvY2FsLgoKQ2FyZ2Ftb3Mgc3BhcmtseXIgeSBkcGx5ci4KCmBgYHtyIHNldHVwLCBpbmNsdWRlID0gVFJVRX0Ka25pdHI6Om9wdHNfY2h1bmskc2V0KGluY2x1ZGUgPSBGQUxTRSkKbGlicmFyeShzcGFya2x5cikKbGlicmFyeShkcGx5cikKCmBgYAoKKipJbmljaWFsaXphbW9zIGxhIGNvbmV4acOzbioqCgpgYGB7cn0Kc2MgPC0gc3BhcmtfY29ubmVjdChtYXN0ZXIgPSAibG9jYWwiLCB2ZXJzaW9uID0gIjIuMC4wIikKYGBgCgojIyBDb25leGnDs24gZW4gdW4gY2x1c3RlcgoKIFBhcmEgY29uZWN0YXJsbyBjb24gdW4gY2x1c3RlciBzdWVsZSBzZXIgYWxnbyBwYXJlY2lkbyBhIGVzdG8uIERlcGVuZGllbmRvIGRlIHNpIFJzdHVkaW8gZXN0w6EgaW5zdGFsYWRvIGVuIGVsIG1hc3RlciBvIGVuIG90cm8gbm9kby4gQ29tbyBzaWVtcHJlLCBlc3RvIHB1ZWRlIHZhcmlhciBkZXBlbmRpZW5kbyBkZSBsYSBjb25maWd1cmFjacOzbiBkZWwgY2x1c3Rlci4KClVuIGVqZW1wbG8gZGUgY8OzbW8gdXRpbGl6YXIgYHNwYXJrbHlyYCBwYXJhIGFuYWxpemFyIHVuIGdyYW4gY29uanVudG8gZGUgZGF0b3Mgc2UgcHVlZGUgZW5jb250cmFyIGVuIGh0dHBzOi8vYmV0YS5yc3R1ZGlvY29ubmVjdC5jb20vY29udGVudC8xNzA1L3RheGlEZW1vLm5iLmh0bWwKIAogCmBgYHtyLCBldmFsPUZBTFNFfQoubGliUGF0aHMoIGMoICJ+L1IveDg2XzY0LXBjLWxpbnV4LWdudS1saWJyYXJ5LzMuMy8iLCAubGliUGF0aHMoKSkgKQojIGluc3RhbGwucGFja2FnZXMoInNwYXJrbHlyIiwgbGliID0gIn4vUi94ODZfNjQtcGMtbGludXgtZ251LWxpYnJhcnkvMy4zLyIpCgpTeXMuc2V0ZW52KFNQQVJLX0hPTUUgPSAiL29wdC9jbG91ZGVyYS9wYXJjZWxzL0NESC9saWIvc3BhcmsiKQpTeXMuc2V0ZW52KEhBRE9PUF9DT05GX0RJUiA9ICcvZXRjL2hhZG9vcC9jb25mLmNsb3VkZXJhLmhkZnMnKQpTeXMuc2V0ZW52KFlBUk5fQ09ORl9ESVIgPSAnL2V0Yy9oYWRvb3AvY29uZi5jbG91ZGVyYS55YXJuJykKbGlicmFyeShzcGFya2x5cikKbGlicmFyeShkcGx5cikKIyBzYyA8LSBzcGFya19jb25uZWN0KG1hc3Rlcj0gImxvY2FsIiwgdmVyc2lvbj0gIjEuNi4wIikKIyBzYyA8LSBzcGFya19jb25uZWN0KG1hc3Rlcj0gInlhcm4tY2xpZW50IiwgdmVyc2lvbj0gIjEuNi4wIikKCmBgYAoKIyMgTGVjdHVyYSBkZSBkYXRvcy4gCgpTcGFya2x5ciBwdWVkZSBsZWVyIGRhdG9zIGRlIGhpdmUgIHV0aWxpemFuZG8gYGhpdmVfY29udGV4dChzYylgIG8gZGUgZmljaGVyb3MgZW4gbG9jYWwgbyBoZGZzLiAKVmFtb3MgYSBsZWVyIHVuIGZpY2hlcm8gZW4gZm9ybWF0byBgcGFycXVldGAgIHF1ZSBlc3TDoSBlbiBsb2NhbC4gU2UgdHJhdGEgZGUgbG9zIGRhdG9zIGRlbCBjZW5zbyBkZSAyMDExIHBhcmEgQW5kYWx1Y8OtYSwgcXVlIGhhYsOtYSBiYWphZG8gcHJldmlhbWVudGUgdXRpbGl6YW5kbyBlbCBwYXF1ZXRlIGBNaWNyb0RhdG9zRXNgIGRlIENhcmxvcyBHaWwKClV0aWxpemFtb3MgbGEgZnVuY2nDs24gYHNwYXJrX3JlYWRfcGFycXVldGAgcXVlIGxlZSBsb3MgZGF0b3MgeSBjcmVhIHVuIERhdGFGcmFtZSBkZSBzcGFyay4gQWwgYXNpZ25hcmxvIGEgdW4gb2JqZXRvIHNlIGNyZWEgdW4gYHRibF9zcGFya2AgcXVlIHBlcm1pdGUgdXRpbGl6YXIgZnVuY2lvbmVzIGRlIGBkcGx5cmAgc29icmUgdW4gZGF0YWZyYW1lIGRlIFNwYXJrLiAKCmBgYHtyLGNhY2hlPVRSVUV9CmNlbnNvMV90YmwgPC0gc3BhcmtfcmVhZF9wYXJxdWV0KHNjLCAiY2Vuc28iLCBwYXRoID0gIi9ob21lL2pvc2Uvc3Bhcmstd2FyZWhvdXNlL2NlbnNvMi8iKQpjbGFzcyhjZW5zbzFfdGJsKQojIHBvZGVtb3MgdmVyIGxhcyBEYXRhZnJhbWVzIGVuIHNwYXJrCnNyY190YmxzKHNjKQoKYGBgCgpjZW5zbzFfdGJsIG5vIGVzIHVuIGRhdGFmcmFtZSBkZSBSIHNpbm8gdW5hIGNvbmV4acOzbiBhbCBkYXRhZnJhbWUgZGUgc3BhcmsKYGBge3J9CmNlbnNvMV90YmwKYGBgCgpQb2JsYWNpw7NuIHRvdGFsLCBzZWfDum4gZWRhZCB5IHByb3ZpbmNpYQpGaWx0cmFtb3MgcG9yIGVkYWQsIGFncnVwYW1vcyBwb3IgZWRhZCB5IGNhbGN1bGFtb3MgZWwgdG90YWwgZGUgcG9ibGFjacOzbiwgYGZhY3RvcmAgZXMgZWwgZmFjdG9yIGRlIGVsZXZhY2nDs24gZGVsIGNlbnNvLiBUYW1iacOpbiByZWdpc3RyYW1vcyBsYSB0YWJsYSByZXN1bHRhbnRlIGNvbiBgc2RmX3JlZ2lzdGVyYCBwb3Igc2kgcXVlcmVtb3MgdXRpbGl6YXJsYSBtw6FzIHRhcmRlCgpgYGB7cn0KdG90YWwgPC0gY2Vuc28xX3RibCAlPiUKICAgZmlsdGVyKGVkYWQgPiAyMCwgZWRhZCA8IDcwKSAlPiUKICAgZ3JvdXBfYnkoZWRhZCwgY3BybykgJT4lCiAgIHN1bW1hcmlzZSh0b3RhbCA9IHN1bShmYWN0b3IpKSAlPiUKICAgYXJyYW5nZShkZXNjKHRvdGFsKSkgJT4lIAogICBzZGZfcmVnaXN0ZXIoInRvdGFsIikKdG90YWwKYGBgCkVuIHJlYWxpZGFkIHV0aWxpemFtb3MgZHBseXIgcGFyYSBtYW5kYXIgY29uc3VsdGFzIFNRTCBhIGxhIHNwYXJrIHNxbCBhcGkuIFBvZGVtb3MgdmVyIGPDs21vIGVzIGxhIGNvbnN1bHRhIGNvbiBgc3FsX3JlbmRlcigpYAoKYGBge3J9CnNxbF9yZW5kZXIoY2Vuc28xX3RibCAlPiUKICAgZmlsdGVyKGVkYWQgPiAyMCwgZWRhZCA8IDcwKSAlPiUKICAgZ3JvdXBfYnkoZWRhZCwgY3BybykgJT4lCiAgIHN1bW1hcmlzZSh0b3RhbCA9IHN1bShmYWN0b3IpKSAlPiUKICAgYXJyYW5nZShkZXNjKHRvdGFsKSkpCmBgYAoKQ2FsY3VsYW1vcyBhaG9yYSBlbCBuw7ptZXJvIGRlIGRpdm9yY2lvcwoKYGBge3J9CmRpdm9yY2VzIDwtIGNlbnNvMV90YmwgJT4lCiAgIGZpbHRlcihlZGFkID4gMjAsIGVkYWQgPCA3MCwgZWNpdmlsID09ICJEaXZvcmNpYWRvIikgJT4lCiAgIGdyb3VwX2J5KGVkYWQsIGNwcm8pICU+JQogICBzdW1tYXJpc2UoZGl2b3JjZXMgPSBzdW0oZmFjdG9yKSkgJT4lCiAgIGFycmFuZ2UoZGVzYyhkaXZvcmNlcykpICU+JSAKICAgc2RmX3JlZ2lzdGVyKCJkaXZvcmNlcyIpCmBgYAoKSGFjZW1vcyBhaG9yYSB1biBgaW5uZXJfam9pbmAgZGUgbGFzIGRvcyB0YWJsYXMsIGxvcyBqb2luIGRlIGRwbHlyIG5vIGZ1bmNpb25hbiBlbiBzcGFyayAxLjYsIHBlcm8gc8OtIGVuIHNwYXJrIDIuMCB5IHN1cGVyaW9yZXMKCmBgYHtyfQpkaXZvcmNlcy5ieS5hZ2UucHJvIDwtIHRvdGFsICU+JSAKICAgaW5uZXJfam9pbihkaXZvcmNlcywgYnkgPSBjKCJlZGFkIiwiY3BybyIpKSAlPiUKICAgbXV0YXRlKHBvcmNlbnRhamUgPSAxMDAgKiBkaXZvcmNlcy90b3RhbCkgJT4lIAogICBzZGZfcmVnaXN0ZXIoImRpdm9yY2VzX2J5X2FnZV9wcm8iKQpkaXZvcmNlcy5ieS5hZ2UucHJvCgpgYGAKClBvZGVtb3MgYWhvcmEgZ3JhZmljYXJsby4gYGdncGxvdDJgIG5lY2VzaXRhIHF1ZSBsb3MgZGF0b3MgZXN0w6luIGVuIHVuIGRhdGEuZnJhbWUgZGUgUiwgYXPDrSBxdWUgdXRpbGl6YW1vcyBsYSBmdW5jacOzbiBgY29sbGVjdGAuIEVzIGltcG9ydGFudGUgYW50ZXMgZGUgdXNhciBgY29sbGVjdGAgcXVlIG51ZXN0cm9zIGRhdG9zIGNhYmVuIGVuIG1lbW9yaWEuCgpgYGB7ciwgZmlnLndpZHRoPTEyfQpsaWJyYXJ5KGdncGxvdDIpCmRpdm9yY2VzLmJ5LmFnZS5wcm8gJT4lIAogICBjb2xsZWN0KCkgJT4lICAgICAgICAgICAgICAgICAgICAgICAgICAgICAjIGNvbiBjb2xsZWN0IGxvcyB0cmFpZ28gZGUgc3BhcmsgYSBSCiAgIGdncGxvdChhZXMoeD1lZGFkLHk9cG9yY2VudGFqZSxjb2xvcj1jcHJvKSkgKyAKICAgZ2VvbV9qaXR0ZXIoc2l6ZT0wLjQpICsgCiAgIGdlb21fc21vb3RoKCkrCiAgIGZhY2V0X3dyYXAofmNwcm8pICsKICAgbGFicyh4ID0iRWRhZCIsIHkgPSAiJSBEaXZvcmNpb3MiKQpgYGAKCgoKCg==