Configuración de sesión de Spark
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
Librerías a usar
#Pyspark libraries/modules
from pyspark.sql import *
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql.types import StringType, DoubleType, IntegerType, ArrayType, DateType
#Python libraries/modules
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings("ignore")
from scipy import stats
from scipy.stats import kstest
#Pyspark Machine Learning libraries/modules
from pyspark.mllib.stat import Statistics
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.stat import Statistics
import re
Cargamos un dataset público con información de una campaña de marketing portuguesa para la venta de un préstamo bancario. El objetivo es, una vez limpiado el dataset, estudiarlo para extraer el máximo número de insights de los datos.
Información sobre las variables del dataset
Importación del dataset
df = spark.read.csv('bank-full.csv',sep=",",header = True,inferSchema = True)
df.toPandas().head()
Dimensiones del dataset
print("El dataset tiene",df.count(),"filas y ",len(df.columns),"columnas")
Balanceo de la variable Target
Frecuencias = (df.select('Target').groupBy('Target').count()
.withColumn('Porcentaje',F.round(F.col('count')*100/df.count(),3)))
Frecuencias.show()
df_frec = Frecuencias.toPandas()
plt.figure(figsize=(10,4))
plt.title("Frecuencia porcentual de variable Target")
sns.barplot(x=df_frec['Target'],y= df_frec['Porcentaje'])
plt.show()
Por medio de la tabla de frecuencias y el gráfico vemos que la variable Target se encuentra desbalanceada. la categoría no representa un 88% de las filas y la categoría yes sólo un 11,69% (redondeado 12%).
Formateo de las variables (strings y numéricas)
df.printSchema()
De forma preliminar vimos que variables como age, balance y duration tenían formato string pero agregando el argumento inferSchema en la función read.csv() detecta automaticamente los data types de estas columnas, sin embargo vemos que hay algunos casos que no fueron bien inferidos tales como month.
df.select('month').distinct().show()
Como podemos ver, month tiene un formato tipo string, por lo tanto por medio de un condicional vamos a convertir las etiquetas a número de meses, de esta forma la variable pasa de string a integer.
df = df.withColumn('month',F.when(F.col('month') == "jan", 1)
.when(F.col('month') == "feb", 2)
.when(F.col('month') == "mar", 3)
.when(F.col('month') == "apr", 4)
.when(F.col('month') == "may", 5)
.when(F.col('month') == "jun", 6)
.when(F.col('month') == "jul", 7)
.when(F.col('month') == "aug", 8)
.when(F.col('month') == "sep", 9)
.when(F.col('month') == "oct", 10)
.when(F.col('month') == "nov", 11)
.otherwise(12))
Finalmente, volvemos a ejecutar el printSchema() para verificar que todas las variables tengan su correspondiente data type.
df.printSchema()
data_types = list(df.dtypes)
data_types_df = spark.createDataFrame(data_types, ['variable', 'data_type']).select('data_type').groupBy('data_type').count().toPandas()
plt.figure(figsize=(10,4))
plt.title( "Cantidad de variables por Data type")
sns.barplot(x = data_types_df['data_type'],y = data_types_df['count'])
plt.show()
Hasta el momento tenemos mayot cantidad de variables string que integer, luego cuando se creen las variables dummies esta proporción va a cambiar.
antes = df.count()
df = df.dropDuplicates()
despues = df.count()
print(' Filas antes de chequear duplicados:',antes,'\n','Filas después de chequear duplicados:',despues,'\n','Total filas duplicadas:',antes-despues)
Por medio de dropDuplicates() vemos que no hay filas completas duplicadas en el dataset. Sin embargo tampoco contamos con un ID único ni nombre que permita realizar un chequeo a fondo.
Comprobación de columnas que no aportan información
1 - Comprobación para variables Cualitativas
Primero creamos un dataframe con variables cualitativas
cualitativas = [c for c,t in df.dtypes if t in ['string']]
cualitativas = df.select(cualitativas)
cualitativas.show(1)
Loop para generar tablas de frecuencias de las variables cualitativas
for col in cualitativas.columns:
cualitativas.select(col).groupBy(col).count()\
.withColumn('Porcentaje',F.round(F.col('count')*100/cualitativas.count(),3))\
.orderBy(F.col('Porcentaje').desc())\
.drop('count')\
.show()
2 - Comprobación para variables Cuantitativas
Primero creamos un dataframe con variables cuantitativas
numericas = [c for c,t in df.dtypes if t in ['int']]
numericas = df.select(numericas)
numericas.show(1)
Loop para generar tablas de frecuencias de las variables cualitativas
for col in numericas.columns:
numericas.select(col).groupBy(col).count()\
.withColumn('Porcentaje',F.round(F.col('count')*100/numericas.count(),3))\
.orderBy(F.col('Porcentaje').desc())\
.drop('count')\
.show(5)
Resumen de variables que no aportan mucha información:
Estableciendo un umbral de 80%, si una de sus categorías concentra este porcentaje decimos que la variable a priori no aporta mucha información ya que esta categoría se repite mayoritariamente a lo largo de toda la columna.
Estudio de la varianza para las variables numéricas
Método 1
numericas_pd = numericas.toPandas()
avgs = [F.avg(col).alias('avg_' + col) for col in numericas_pd]
maxs = [F.max(col).alias('max_' + col) for col in numericas_pd]
mins = [F.min(col).alias('min_' + col) for col in numericas_pd]
stds = [F.stddev(col).alias('std_' + col) for col in numericas_pd]
operations = avgs + stds + maxs + mins
resultados = df.select(operations).first()
for col in numericas_pd:
avg = resultados['avg_' + col]
std = resultados['std_' + col]
maxi = resultados['max_' + col]
mini = resultados['min_' + col]
print('{}: avg={}, std={}, min={}, max={}'\
.format(col, round(avg, 2), round(std, 2), mini, maxi))
Método 2: más automatizado
numericas_pd.describe()
Análisis gráfico de media y varianza:
Por medio de histogramas de las variables cuantitativas, vamos a observar la media y varianza de cada una de ellas para ver cuales son las variables que tienen mayor desvio.
for col in numericas_pd.columns:
plt.figure(figsize=(20, 4))
plt.title("Histograma de "+col+" con media y desvio")
sns.histplot(numericas_pd[col])
mean = numericas_pd[col].mean()
std = numericas_pd[col].std()
plt.axvline(std, color='red', linewidth=2, linestyle = 'dashed')
plt.axvline(mean, color='blue', linewidth=2, linestyle = 'dashed')
En conclusión las variables que presentan mayor desvío respecto de su media, son las variables : age, month y day
Como primer paso vamos a revisar los valores únicos de las variables cualitativas para ver si hay alguna etiqueta que no corresponda y que este ocultando valores perdidos.
for col in cualitativas.columns:
cualitativas.select(col).distinct().show()
Por medio de un distinct vemos que hay una categoría presente en varias variables que es "unknown". Al no haber dato en la variable esta categoría lo que hace es rellenar la celda, por lo tanto vamos a remover esta etiqueta por un valor vacío para desenmascarar a los valores perdidos ocultos.
df = df.replace('unknown',None)
Una vez reemplazada la etiqueta procedo a revisar el % de nulos por variables
for col in df.columns:
n_missing = df.filter(F.col(col).isNull()).count()
perc_missing = 100 * n_missing / df.count()
print(col, round(perc_missing, 2))
Realizando una revisión del porcentaje de valores nulos, vemos que la variable job, education, contact y poutcome tienen valores nulos, por lo tanto para tratar estos casos vamos a determinar algunos criterios:
Eliminación de variable poutcome
len(df.columns)
df = df.drop('poutcome')
Chequeamos que las variable hayan quedado eliminadas del dataset:
len(df.columns)
Una vez eliminada la variable poutcome, nos quedan 3 variables más con nulos: job, education y contact de las cuales vamos a tomar el criterio de imputación por la cateogoría más frecuente de cada una de ellas:
Categorías con mayor frecuencia:
max_job = df.select('job').groupBy('job').count().orderBy(F.col('count').desc()).select('job').first()[0]
max_educ = df.select('education').groupBy('education').count().orderBy(F.col('count').desc()).select('education').first()[0]
max_cont = df.select('contact').groupBy('contact').count().orderBy(F.col('count').desc()).select('contact').first()[0]
Categoría más frecuentes:
max_job + ', '+ max_educ + ', ' + max_cont
Imputación por categoría más frecuente:
df = df.fillna(max_job, subset=['job'])
df = df.fillna(max_educ, subset=['education'])
df = df.fillna(max_cont, subset=['contact'])
Luego de la imputación verificamos nuevamente si quedaron valores nulos:
for col in df.columns:
n_missing = df.filter(F.col(col).isNull()).count()
perc_missing = 100 * n_missing / df.count()
print(col, round(perc_missing, 2))
Finalmente hemos descubierto a los valores nulos ocultos y se han inputado siguiendo determinados criterios.
Como primer paso vamos a crear un dataframe con variables numéricas
var_num = df.select([c for c,t in df.dtypes if t in['int']])
var_num = var_num.toPandas()
A continuación vamos a comprobar por medio de una función si las variables tienen distribución normal o no, para evaluar que método es conveniente aplicar.
El alpha seteado es de 0.05
def is_normal(variable, alpha=0.05):
mean = variable.mean()
std = variable.std()
pvalue = kstest(variable, 'norm', args=(mean, std)).pvalue
return pvalue >= alpha
Loop para chequear normalidad por cada columna del dataset de variables numéricas
normal_cols = []
non_normal_cols = []
for col in var_num.columns:
normal = is_normal(var_num[col])
if normal:
normal_cols.append(col)
else:
non_normal_cols.append(col)
normal_cols
Niguna de las variables analizadas tienen distribución normal
non_normal_cols
Concluimos que las variables no tienen una distribución normal, por lo tanto aplicaremos el test de Tukey que se maneja con cuartiles:
Chequeo de outliers por variable
def tukey_outliers(df,column,extreme=False):
q1, q3 = np.percentile(df[column],[25,75])
iqr = q3 - q1
constant = 1.5 if not extreme else 3
return df[~((df[column]>(q3+constant*iqr)) | (df[column]<(q1-constant*iqr)))]
for columna in var_num.columns: #loop para outliers
outliers = round((1 - len(tukey_outliers(var_num,columna,extreme=False))/len(var_num))*100,2)
print('Outliers en columna',columna,':',outliers,'%')
Removiendo Outliers
def remove_tukey_outliers(df, col):
q1, q3 = df.approxQuantile(col, [0.25, 0.75], 0.01)
IQR = q3 - q1
min_thresh = q1 - 1.5 * IQR
max_thresh = q3 + 1.5 * IQR
df_no_outliers = df.filter(F.col(col).between(min_thresh, max_thresh))
return df_no_outliers
df_final = df
df_col_out = ['age','balance','campaign','duration']
for col in df_col_out:
df_final = remove_tukey_outliers(df_final,col)
print("Filas eliminadas:",df.count()-df_final.count())
Dimensiones del dataset sin outliers
print("El dataset luego de la limpieza contiene: ", df_final.count(), "filas y ",len(df_final.columns),"columnas")
cualitativas_final = df_final.select([c for c,t in df.dtypes if t in['string']])
df_final.toPandas().describe(include = 'O')
for col in cualitativas_final.columns:
tablas = cualitativas_final.select(col).groupBy(col).count().orderBy(F.col('count').desc()).toPandas()
plt.figure(figsize=(10, 4))
plt.title("Gráfico de barras: "+col)
sns.barplot(x=tablas['count'], y = tablas[col],orient = 'h')
plt.show()
Vemos que la mayoria de las variables tienen 2 o 3 categorías salvo job que tiene varias categorías.
Creamos variable job_recod que indica si la persona tiene trabajo o no en base a las categorías disponibles en job
df_final = df_final.withColumn('trabaja',F.when(F.col('job')=='retired',0)
.when(F.col('job')=='unemployed',0)
.when(F.col('job')=='student',0)
.otherwise(1))
df_final.groupBy('trabaja').count().show()
Luego creamos la variable duration_cat que indica si el llamado es mayor igual al promedio o no.
mean_duration = df_final.select(F.mean('duration')).first()[0]
print("Promedio de llamada: ",round(mean_duration,2),"Segs.")
df_final = df_final.withColumn('duration_cat',F.when(F.col('duration')>=mean_duration,'long_call')
.otherwise('normal_call'))
df_final.groupBy('duration_cat').count().show()
Variables Dummy
vars_dummy = [c for c,t in df.dtypes if t in['string']]
from pyspark.ml.feature import StringIndexer
df_ind = df_final
for element in vars_dummy:
feature_indexer = StringIndexer(inputCol=element, outputCol=element+'_indexed')
feature_indexer_model = feature_indexer.fit(df_final)
cualitativas_final_ind = feature_indexer_model.transform(df_ind)
df_ind.show(1,vertical = True)
dictionaries = []
for element in vars_dummy:
string_indexer = StringIndexer(inputCol=element, outputCol=element+'_category')
onehotencoder = OneHotEncoder(dropLast=False, inputCol= string_indexer.getOutputCol(), outputCol=element+'_dummy')
pipeline = Pipeline(stages=[string_indexer, onehotencoder])
pipeline_model = pipeline.fit(df_final)
dictionaries.append((element, list(enumerate(pipeline_model.stages[0].labels)), pipeline_model.stages[0]))
df_final = pipeline_model.transform(df_final)
df_final = df_final.drop(string_indexer.getOutputCol())
for element in dictionaries:
df_final = (df_final.withColumn('activated_indices'+element[0], F.udf(lambda x: x.toArray().tolist(), ArrayType(DoubleType()))
(F.col(element[0]+'_dummy'))))
vocab = [re.sub(r'\W', '_', value) for value in element[-1].labels]
df_final = df_final.select(df_final.columns + [F.col("activated_indices"+element[0])[i] for i in range(len(vocab))])
dictionary = {"activated_indices"+element[0]+"[{0}]".format(x): element[0]+'_'+vocab[x] for x in range(len(vocab))}
# Renombramos las columnas con el elemento que sea
df_final = df_final.selectExpr(["{0} as {1}".format(x, x) if x not in dictionary else "{0} as {1}".format(x, dictionary[x])
for x in df_final.columns])
df_final = df_final.drop('activated_indices'+element[0], element[0]+'_dummy')
df_final.show(1,vertical = True)
df_final.printSchema()
var_num_corr=[c for c,t in df_final.dtypes if t in['int','double']]
corr_matrix = Statistics.corr(df_final.select(var_num_corr).rdd.map(lambda v: Vectors.dense(v)),method='pearson')
corr_matrix=pd.DataFrame(corr_matrix,columns=var_num_corr, index=var_num_corr)
corr_matrix
Gráfico Matriz de correlaciones
plt.figure(figsize=(18,18))
sns.heatmap(corr_matrix,cmap='coolwarm', vmax=1, vmin=-1, square=True, annot=True, fmt='.2f')
Comprobación de pares de variables correlacionadas
Por medio de la función incluida abajo, se chequean los pares de variables con sus respectivas correlaciones.
def corrank(X):
import itertools
df_correlaciones = pd.DataFrame([[(i,j),X.corr().loc[i,j]] for i,j in list(itertools.combinations(X.corr(), 2))],columns=['pares','corr'])
return df_correlaciones.sort_values(by='corr',ascending=False)
corrank(corr_matrix).head(5)
En la tabla generada arriba se pueden observar los pares de variables con fuerte correlación positiva.
corrank(corr_matrix).tail(5)
En la tabla generada arriba se pueden observar los pares de variables con fuerte correlación negativa.
Próximamente:
Creación de modelos con spark
Autor:
Marcelo G Gonzalez