Cargamos sparklyr y dplyr
library(sparklyr)
library(dplyr)
library(ggplot2)
Inicializamos la conexión
sc <- spark_connect(master = "local", version = "2.0.0")
Sparklyr puede leer datos de hive utilizando hive_context(sc)
o de ficheros en local o hdfs. Vamos a leer un fichero en formato parquet
que está en local. Se trata de los datos del censo de 2011 para Andalucía, que había bajado previamente utilizando el paquete MicroDatosEs
de Carlos Gil
Utilizamos la función spark_read_parquet
que lee los datos y crea un DataFrame de spark. Al asignarlo a un objeto se crea un tbl_spark
que permite utilizar funciones de dplyr
sobre un dataframe de Spark.
censo1_tbl <- spark_read_parquet(sc, "censo1", path = "/home/jose/spark-warehouse/censo2/")
class(censo1_tbl)
[1] "tbl_spark" "tbl_sql" "tbl_lazy" "tbl"
censo1_tbl no es un dataframe de R sino una conexión al dataframe de spark
censo1_tbl
Source: query [?? x 9]
Database: spark connection master=local[4] app=sparklyr local=TRUE
edad sexo cpro ecivil factor
<dbl> <chr> <chr> <chr> <dbl>
1 77 Mujer Lleida Viudo 1.413253
2 47 Mujer Lleida Casado 1.351084
3 50 Hombre Lleida Casado 1.351084
4 21 Hombre Lleida Soltero 1.351084
5 15 Mujer Lleida Soltero 1.351084
6 47 Mujer Lleida Soltero 28.615887
7 33 Hombre Lleida Divorciado 28.615887
8 34 Hombre Lleida Casado 28.615887
9 46 Mujer Lleida Casado 6.735608
10 61 Hombre Lleida Casado 6.735608
# ... with more rows, and 4 more variables: esreal <chr>, rela <chr>,
# nhijos <dbl>, nocu <dbl>
Creamos dataset para el modelo. Es importante que el dataset no tenga perdidos, porque puede dar error. Se supone que en la versión de desarrollo de sparklyr está solucionado, pero a mi no me ha funcionado
mod_dataset <- censo1_tbl %>%
filter(edad > 20, edad < 70, !is.na(edad),!is.na(rela), !is.na(esreal), !is.na(nhijos), !is.na(nocu)) %>%
mutate(respuesta= as.character(ifelse( ecivil == "Divorciado", 1,0))) %>%
sdf_register("mod_dataset")
summarise(mod_dataset, n())
Source: query [?? x 1]
Database: spark connection master=local[4] app=sparklyr local=TRUE
`count(1)`
<dbl>
1 921239
Con sdf_partion
creamos datos para training y para test
partitions <- mod_dataset %>% sdf_partition(training = 0.7, test = 0.3, seed = 42)
partitions$training %>% group_by(cpro) %>% summarise(n(),prop_divorc = mean(respuesta))
Source: query [?? x 3]
Database: spark connection master=local[4] app=sparklyr local=TRUE
cpro `count(1)` prop_divorc
<chr> <dbl> <dbl>
1 Madrid 75864 0.06858325
2 Huelva 7749 0.05187766
3 Barcelona 66326 0.07633507
4 Zamora 6154 0.02941176
5 Burgos 7647 0.03295410
6 Cantabria 8704 0.05641085
7 Ciudad Real 7849 0.03822143
8 Cuenca 5801 0.02982244
9 Guadalajara 5379 0.04982339
10 Santa Cruz de Tenerife 10034 0.09358182
# ... with more rows
partitions$test %>% group_by(cpro) %>% summarise(n(),prop_divorc = mean(respuesta))
Source: query [?? x 3]
Database: spark connection master=local[4] app=sparklyr local=TRUE
cpro `count(1)` prop_divorc
<chr> <dbl> <dbl>
1 Madrid 32607 0.06780753
2 Huelva 3330 0.05165165
3 Barcelona 28271 0.07742917
4 Zamora 2571 0.03072734
5 Burgos 3202 0.04184884
6 Cantabria 3779 0.05398254
7 Ciudad Real 3409 0.02992080
8 Cuenca 2469 0.03280680
9 Santa Cruz de Tenerife 4211 0.09380195
10 Ávila 2050 0.02878049
# ... with more rows
Aunque incorpora interfaz fórmula, da problemas con variables categóricas según la versión de spark y sparklyr instalada. Lo siguiente no funciona con la versión de desarrollo de sparklyr ni con algunas versiones de spark
mlformula <- formula(respuesta ~ edad + esreal+ rela+ nhijos + nocu)
mlogis <- partitions$training %>% ml_logistic_regression(mlformula)
summary(mlogis)
Call: ml_logistic_regression(., mlformula)
Coefficients:
(Intercept)
-1.555407668
edad
0.006485887
esreal_Diplomatura universitaria, Arquitectura Técnica, Ingeniería Técnica o equivalente
-0.309846995
esreal_Doctorado
-0.190056222
esreal_FP grado medio, FP I, Oficialía industrial o equivalente, Grado Medio de Música y Danza, Certificados de Escuelas Oficiales de Idiomas
-0.018245016
esreal_FP grado superior, FP II, Maestría industrial o equivalente
-0.221566091
esreal_Fue a la escuela 5 o más años pero no llegó al último curso de ESO, EGB o Bachiller Elemental
-0.699136112
esreal_Grado Universitario o equivalente
-0.025673079
esreal_Licenciatura, Arquitectura, Ingeniería o equivalente
-0.302889805
esreal_Llegó al último curso de ESO, EGB o Bachiller Elemental o tiene el Certificado de Escolaridad o de Estudios Primarios
-0.382232763
esreal_Máster oficial universitario (a partir de 2006), Especialidades Médicas o análogos
-0.055593503
esreal_No sabe leer o escribir
-1.092034826
esreal_Sabe leer y escribir pero fue menos de 5 años a la escuela
-1.106870226
rela_Ocupado
1.998901013
rela_Otra situación
-0.475906522
rela_Parado buscando su primer empleo
0.310737238
rela_Parado que ha trabajado antes
0.755599099
rela_Persona con invalidez laboral permanente
1.119436453
nhijos
-0.055198650
nocu
-1.111598930
Con sdf_predict
hacemos predicción sobre el conjunto de test. Hay un pequeño bug cuando el modelo es ml_logistic, no se muestran bien las probabilidades, con ml_generalized si funciona bien, pero ml_generalized no permite parámetros de regularización.
pred <- sdf_predict(mlogis, partitions$test)
Para ver las probabilidade hay que llevarlas a R. Esto es un problema con grandes datos, hay abierta un issue en github, supongo que en próximas versiones lo arreglarán
prueba <- select(pred, probability) %>% head(10) %>% collect
prueba
prueba$probability[1]
[[1]]
<jobj[388]>
class org.apache.spark.ml.linalg.DenseVector
[0.9514041354980373,0.04859586450196277]
Evaluamos el modelo con ml_binary_classification_eval
. Por defecto calcula el AUC
ml_binary_classification_eval(pred, "respuesta", "probability")
[1] 0.716193
Hay diferentes modelos accesibles, ver ayuda del paquete
mgbm <- partitions$training %>% ml_gradient_boosted_trees(mlformula, type="classification")
summary(mgbm)
Length Class Mode
features 19 -none- character
response 1 -none- character
max.bins 1 -none- numeric
max.depth 1 -none- numeric
trees 20 -none- list
data 2 spark_jobj environment
ml.options 5 ml_options list
categorical.transformations 2 -none- environment
model.parameters 5 -none- list
.call 4 -none- call
.model 2 spark_jobj environment
Importancia de las variables
importancia <- ml_tree_feature_importance(sc,mgbm)
importancia$feature <- iconv(importancia$feature, to="ASCII//TRANSLIT")
importancia$importance <- as.numeric(as.character(importancia$importance))
importancia$featurebis <- stringr::str_trunc(importancia$feature,50, side="right")
ggplot(importancia, aes(x=reorder(featurebis, importance), y= importance)) + geom_bar(stat="identity") + coord_flip()
El modelo de gbm no devuelve las probabilidades sino la clase predicha, esto es un problema.
predgbm <- mgbm %>% sdf_predict(partitions$test)
predgbm %>% group_by(cpro) %>% summarise(mean(respuesta), mean(prediction))
Source: query [?? x 3]
Database: spark connection master=local[4] app=sparklyr local=TRUE
cpro `avg(CAST(respuesta AS DOUBLE))` `avg(prediction)`
<chr> <dbl> <dbl>
1 Madrid 0.06780753 0
2 Huelva 0.05165165 0
3 Barcelona 0.07742917 0
4 Zamora 0.03072734 0
5 Burgos 0.04184884 0
6 Cantabria 0.05398254 0
7 Ciudad Real 0.02992080 0
8 Cuenca 0.03280680 0
9 Santa Cruz de Tenerife 0.09380195 0
10 Ávila 0.02878049 0
# ... with more rows
mrf <- partitions$training %>% ml_random_forest(mlformula, type="classification")
summary(mrf)
Length Class Mode
features 19 -none- character
response 1 -none- character
max.bins 1 -none- numeric
max.depth 1 -none- numeric
num.trees 1 -none- numeric
feature.importances 19 -none- numeric
trees 20 -none- list
data 2 spark_jobj environment
ml.options 5 ml_options list
categorical.transformations 2 -none- environment
model.parameters 5 -none- list
.call 4 -none- call
.model 2 spark_jobj environment
importancia <- ml_tree_feature_importance(sc,mrf)
importancia$feature <- iconv(importancia$feature, to="ASCII//TRANSLIT")
importancia$importance <- as.numeric(as.character(importancia$importance))
importancia$featurebis <- stringr::str_trunc(importancia$feature,50, side="right")
ggplot(importancia, aes(x=reorder(featurebis, importance), y= importance)) + geom_bar(stat="identity") + coord_flip()
predrf <- mrf %>% sdf_predict(partitions$test)
ml_random_forest
si calcula las probabilidades, pero tenemos el mismo problema que antes, no lo muestra bien en R. Aunque se puede extraer.
predrf %>% select(respuesta, rawPrediction, probability, prediction)
Source: query [?? x 4]
Database: spark connection master=local[4] app=sparklyr local=TRUE
respuesta rawPrediction probability prediction
<chr> <list> <list> <dbl>
1 0.0 <S3: spark_jobj> <S3: spark_jobj> 0
2 0.0 <S3: spark_jobj> <S3: spark_jobj> 0
3 0.0 <S3: spark_jobj> <S3: spark_jobj> 0
4 0.0 <S3: spark_jobj> <S3: spark_jobj> 0
5 0.0 <S3: spark_jobj> <S3: spark_jobj> 0
6 0.0 <S3: spark_jobj> <S3: spark_jobj> 0
7 0.0 <S3: spark_jobj> <S3: spark_jobj> 0
8 0.0 <S3: spark_jobj> <S3: spark_jobj> 0
9 0.0 <S3: spark_jobj> <S3: spark_jobj> 0
10 0.0 <S3: spark_jobj> <S3: spark_jobj> 0
# ... with more rows
Guardo en R la columna de probabilidades
probs <- predrf %>% select(probability) %>% collect
Veo como es y utilizo invoke para llamar al método “toArray” en scala.
head(probs)
invoke(probs$probability[[1]], "toArray")[2]
[1] 0.0668815
# Es ineficiente extraerlas usando sapply por ejemplo
La función ml_binary_classification_eval
nos da el AUC
ml_binary_classification_eval(predrf, "respuesta", "probability")
[1] 0.7247844
Casi todos los modelos de MLlib están implementados en sparklyr, (naiveBayes, pca, kmeans, lda,survival regression, glm, decission trees, randomforest, gradient boosting machine) y si no hay alguno se puede implementar mediante extensiones en sparklyr
En mi opinión sparklyr es un gran paquete que quiere ser el pyspark para R. Lo mejor es la posibilidad de utilizar dplyr con Spark DataFrames. Se puede utilizar sparklyr junto con SparkR creando dos conexiones distintas. Aún le falta bastante para que sea cómodo trabajar con los algoritmos de MLlib. La mejor opción para realizar análisis utilizando Spark es H2O, afortunadamente, la gente de Rstudio ha sacado el paquete rsparkling
que habilita una conexión entre sparklyr y H2o sparkling water.