Cargamos un dataset con información de una campaña de marketing portuguesa para la venta de un préstamo bancario. El objetivo es, una vez limpiado el dataset, estudiarlo para extraer el máximo número de insights de los datos.
df = spark.read.csv(PATH_DATA+'/bank-full.csv',sep=';', header=True, inferSchema=True) #complementar con los atributos necesarios (inferSchema, separador, etc.)
df.show(5)
El dataset contiene 17 columnas y 45.211 registros
print(len(df.columns))
print(df.count())
El dataset cuenta con un 11,7% (5.289) de casos en los cuales el cliente tiene contratado un depósito a plazo fijo
from pyspark.sql import functions as F
df.groupBy('y').agg(F.count('y').alias('Cantidad'),F.round((F.count('y')/df.count())*100,2).alias('Share')).show()
1) Notamos que los nombres de las columnas estan bajo buenas practicas, por lo que no hay que hacerles ninguna transformacion
2) Por otro lado, a partir de un PrintSchema() vemos que los formatos de las variables estan bien asignados.
3) Sin embargo, se ha notado la presencia de variables binarias, por lo que mas adelante se transformaran a 1s y 0s.
df.show(3)
Veo formatos de columnas
df.printSchema()
Hacemos un describe de las variables categoricas
import warnings
warnings.filterwarnings("ignore")
df.toPandas().describe(include='O')
Ahora para las numericas
df.toPandas().describe()
No se encuentran registros repetidos teniendo en cuenta la totalidad de las columnas
Creemos que no es necesario eliminar duplicados de otra manera, ya que no hay ninguna variable id, y el resto no serÃa un problema que se repitan.
df.count() - df.dropDuplicates().count()
La variable "default" cuenta con un desbalance demasiado grande como para que la misma aporte informacion a la base. Por lo tanto, esta columna deberia ser desestimada en caso de generar un modelo estadistico.
El resto de las variables no cuentan con un problema de concentracion de valores, por lo que estarian aportando informacion a la base.
qualitative_vars = [c for c,t in df.dtypes if t in ['string']]
df_cat = df.select(qualitative_vars)
for col in df_cat.columns:
df.groupBy(col).agg(F.count(col).alias('Cantidad'),F.round((F.count(col)/df.count())*100,2).alias('Share')).orderBy(F.col('Cantidad').desc()).show()
Para las variables numericas notamos que no hay ninguna columna que tenga valores repetidos en su totalidad, siendo el maximo un 81,7% en pdays y previous
numericas = [c for c,t in df.dtypes if t in ['int','float','double']]
df_num = df.select(numericas)
for col in df_num.columns:
df.groupBy(col).agg(F.count(col).alias('Cantidad'),F.round((F.count(col)/df.count())*100,2).alias('Share')).orderBy(F.col('Cantidad').desc()).show(5)
Separo DF numerico
numericas = [c for c,t in df.dtypes if t in ['int','float','double']]
df_num = df.select(numericas)
Calculo varianza y estadisticos basicos
Se agrego una medida de dispersion (coeficiente de variacion) para poder comparar la variancia, dividiendola sobre la media de cada columna particular.
Notamos que la mayor dispersion se encuentra en las variables Previous (3,97) y pdays (2,49). Esto significa que hay mucha diferencia entre la cantidad de veces que los clientes han sido contactados en campañas previas, y por ende tambien en la cantidad de dias desde el ultimo contacto.
avgs = [F.avg(col).alias('avg_' + col) for col in numericas]
maxs = [F.max(col).alias('max_' + col) for col in numericas]
mins = [F.min(col).alias('min_' + col) for col in numericas]
stds = [F.stddev(col).alias('std_' + col) for col in numericas]
var = [F.variance(col).alias('var_' + col) for col in numericas]
cv = [(F.stddev(col)/F.avg(col)).alias('cv_' + col) for col in numericas]
operations = avgs + stds + maxs + mins +var + cv
results = df.select(operations).first()
for col in numericas:
avg = results['avg_' + col]
std = results['std_' + col]
maxi = results['max_' + col]
mini = results['min_' + col]
vari = results['var_' + col]
cvi= results['cv_'+ col]
print('{}: avg={}, min={}, max={}, var={}, std={}, cv={}'.format(col, round(avg, 2), mini, maxi,round(vari, 2),round(std, 2),round(cvi,2)))
Este tema sera tratado luego de remover los outliers e imputar los nulos
Al reemplazar los valores "unknown" como nulos, vemos que las variables poutcome, contact, education y job tienen valores nulos
df = df.replace('unknown',None)
nulls = [F.round(F.sum(F.col(c).isNull().cast('int')) * 100 / df.count(), 4).alias(c) for c in df.columns]
df.select(nulls).show()
Job, Education, Contact, serán reemplazados por la moda
nulos_columnas = ['job','education','contact']
for col in nulos_columnas:
moda = df.groupBy(F.col(col)).count().alias('count').orderBy(F.col('count').desc()).first()[0]
df = df.fillna(moda,subset=[col])
Para la columna poutcome, los nulos son aquellos que no fueron contactados previamente. Es decir, con valor -1 en pdays.
Por este motivo, no seran reemplazados los nulos, sino que seran identificados como un 0, junto a las opciones origniales "other" y "failure". Aquellos que fueron casos contactados con exito ("success") seran codificados con un 1.
#codifico valores y reemplazo por 1 y 0
df= df.withColumn('poutcome', F.when(F.col('poutcome') =='success', 1)\
.when(F.col('poutcome') =='other', 0)\
.when(F.col('poutcome') =='null', 0)\
.when(F.col('poutcome') =='failure', 0)\
.otherwise(F.col('poutcome')))
df= df.fillna('0',subset=['poutcome'])
df = df.withColumn('poutcome',F.col('poutcome').cast('int'))
df.groupBy('poutcome').count().show()
nulls = [F.round(F.sum(F.col(c).isNull().cast('int')) * 100 / df.count(), 4).alias(c) for c in df.columns]
df.select(nulls).show()
Separamos las variables numericas y primero revisamos si son normales.
# Importamos funcion para chequear normalidad
from scipy.stats import kstest
from scipy import stats
def is_normal(variable, alpha=0.05):
# get std, mean
mean = variable.mean()
std = variable.std()
# run kstest and get pvalue
pvalue = kstest(variable, 'norm', args=(mean, std)).pvalue
# check if pvalue is higher than alpha
return pvalue >= alpha
numericas = [c for c,t in df.dtypes if t in ['int','float','double']]
df_num = df.select(numericas)
for col in df_num.columns:
check = df_num.toPandas()[col]
print(col, is_normal(check,alpha=0.05))
Como ninguna lleva una distribucion normal, procedemos a realizar el test de Tuckey para outliers, pero con parametro = 3 en vez de 1.5, asi elimina los outliers extremos
def remove_tukey_outliers(df, col):
"""
Returns a new dataframe with outliers removed on column 'col' usting Tukey test
"""
q1, q3 = df.approxQuantile(col, [0.25, 0.75], 0.01)
IQR = q3 - q1
min_thresh = q1 - 3 * IQR
max_thresh = q3 + 3 * IQR
df_no_outliers = df.filter(F.col(col).between(min_thresh, max_thresh))
return df_no_outliers
Las columnas con un numero considerable de outliers son balance, pdays y previous
for col in df_num.columns:
nrows= df_num.count()
outliers = remove_tukey_outliers(df_num,col)
nrows_out= outliers.count()
print('La columna ',col,' tiene ',nrows - nrows_out, '(', round((100 * (nrows - nrows_out) / nrows),2),'%)','outliers')
Sin embargo, no borraremos outliers de pdays y previous, ya que esto significaria una perdida de informacion relevante de la muestra, ya que estaria borrando todos los valores que no sean "-1" o 0, respectivamente.
Es decir, borraria a todos los registros que hayan sido contactados previamente.
remove_tukey_outliers(df,'previous').groupBy('previous').agg(F.count(col).alias('Cantidad'),F.round((F.count('previous')/df.count())*100,2).alias('Share')).orderBy(F.col('Cantidad').desc()).show(5)
remove_tukey_outliers(df,'pdays').groupBy('pdays').agg(F.count(col).alias('Cantidad'),F.round((F.count('pdays')/df.count())*100,2).alias('Share')).orderBy(F.col('Cantidad').desc()).show(5)
Habiendo hecho este analisis, definimos como outliers a aquellos registros que sean extremos en las columnas Balance, Age, Duration o Campaign, eliminando 5123 registros
df_out=df
df_col_out = ['balance','age','duration','campaign']
for col in df_col_out:
df_out = remove_tukey_outliers(df_out,col)
(df.count() - df_out.count()) # total borrados
Enseñamos diferencia entre muestra con y sin outliers
import matplotlib.pyplot as plt
import seaborn as sns
df_boxplot= df.select(df_col_out)
def plot_outliers(column):
plt.figure(figsize = (12,5))
plt.subplot(1,2,1)
ax = sns.boxplot(x=df_boxplot.toPandas()[column]) #originales
plt.title('Con outliers')
temp = remove_tukey_outliers(df_boxplot,column) #sin outliers
plt.subplot(1,2,2)
ax2= sns.boxplot(x=temp.toPandas()[column])
plt.title('Sin outliers')
for i in df_boxplot.columns:
plot_outliers(i)
Se han eliminado 5.123 casos, siendo guardados bajo el nombrer df_out, quedando un total de 40.088 casos.
Esto genera que el dataset ahora haya perdido 1.150 casos de plazo fijo, pero la distribucion sigue siendo parecida a la original (10.3% vs 11.7%), como se nota debajo.
df.groupBy('y').agg(F.count('y').alias('Cantidad'),F.round((F.count('y')/df.count())*100,2).alias('Share')).show()
df_out.groupBy('y').agg(F.count('y').alias('Cantidad'),F.round((F.count('y')/df_out.count())*100,2).alias('Share')).show()
Aquellas variables binarias (loan, default, housing, contact e y), no seran consideradas como categoricas, sino como booleanas o numericas.
Asà mismo, la variable Month sera pasada a Numerico.
Por lo tanto, las variables categoricas serán Job, Marital y Education.
Agrupamos columna Job en Trabaja y No Trabaja
df_out = df_out.withColumn('job_v2',F.when(F.col('job') =='retired', 'No trabaja')\
.when(F.col('job') =='student', 'No trabaja')\
.when(F.col('job') =='unemployed', 'No trabaja')\
.otherwise('Trabaja'))
df_out.groupBy('job_v2').count().show()
Estudiamos variables categoricas
df_out.toPandas().describe(include='O')
def plot_cat(col):
plt.figure(figsize = (12,5))
plt.subplot(1,2,1)
plot = sns.countplot(x=df_out.toPandas()[col], palette='Blues_d',order = df_out.toPandas()[col].value_counts().index)
plt.title('Distribucion ' + col)
plt.xticks(rotation=90)
return plot
cat_col=['job','marital','education','job_v2']
for col in cat_col :
plot_cat(col)
Importo librerias
from pyspark.sql.types import StringType, DoubleType, IntegerType, ArrayType, DateType
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import re
Confirmacion que no tenemos datos nulos para evitar errores
nulls = [F.round(F.sum(F.col(c).isNull().cast('int')) * 100 / df_out.count(), 4).alias(c) for c in df_out.columns]
df_out.select(nulls).show()
Previo a convertir a dummies, convertimos Month a numerico
df_out = df_out.withColumn('month',F.when(F.col('month') =='jan', 1)\
.when(F.col('month') =='feb', 2)\
.when(F.col('month') =='mar', 3)\
.when(F.col('month') =='apr', 4)\
.when(F.col('month') =='may', 5)\
.when(F.col('month') =='jun', 6)\
.when(F.col('month') =='jul', 7)\
.when(F.col('month') =='aug', 8)\
.when(F.col('month') =='sep', 9)\
.when(F.col('month') =='oct', 10)\
.when(F.col('month') =='nov', 11)\
.when(F.col('month') =='dec', 12)\
.otherwise('month'))
df_out=df_out.withColumn('month',F.col('month').cast('int'))
Para la conversion a dummies, se utilizaran tanto las funciones de Pandas como de Spark.
Se decidió esto ya que la ventaja de pandas es, ademas de ser una linea unica de codigo, la presencia de la opcion de elimninar el primer dato (drop_first), lo cual achica la dimension de la base final.
Conversion a dummies -> Pandas
import pandas as pd
pandas_dummies = pd.get_dummies(data=df_out.toPandas(),drop_first=True)
pandas_dummies.head()
pd_dummies_spark = spark.createDataFrame(pandas_dummies) #dejamos el mismo df en spark para la matriz de correlacion mas adelante
Conversion a dummies -> Spark
Importo funcion para obtener dummies spark
def remove_nulls(df):
df_no_nulls = df
for element in df_no_nulls.columns:
if df_no_nulls.where(df_no_nulls[element].isNull()).count() != 0:
print('\tThe column "{}" has null values'.format(element))
df_no_nulls = df_no_nulls.where(df_no_nulls[element].isNotNull())
if df_no_nulls.where(df_no_nulls[element].isNull()).count() == 0:
print('The column "{}" does not have null values'.format(element))
return df_no_nulls
def check_nulls(df):
existing_nulls = False
for element in df.columns:
if df.where(df[element].isNull()).count() != 0:
print('\tThe column "{}" has null values'.format(element))
existing_nulls = True
break
if df.where(df[element].isNull()).count() == 0:
print('The column "{}" does not have null values'.format(element))
return existing_nulls
def get_dummies_spark(df, dummy_cols):
# check nulls
existing_nulls = check_nulls(df)
if existing_nulls == False:
# StringIndexer + OneHotEncoder
dictionaries = []
for element in dummy_cols:
print("StringIndexer + OneHotEncoder for column ", element)
string_indexer = StringIndexer(inputCol=element, outputCol=element+'_category')
onehotencoder = OneHotEncoder(dropLast=False, inputCol= string_indexer.getOutputCol(), outputCol=element+'_dummy')
pipeline = Pipeline(stages=[string_indexer, onehotencoder])
pipeline_model = pipeline.fit(df)
dictionaries.append((element, list(enumerate(pipeline_model.stages[0].labels)), pipeline_model.stages[0]))
df = pipeline_model.transform(df)
df = df.drop(string_indexer.getOutputCol())
# divide OneHotEncoder output in different columns
for element in dictionaries:
print("Divide OneHotEncoder output in several columns for original column ", element[0])
df = (df.withColumn('activated_indices'+element[0], F.udf(lambda x: x.toArray().tolist(), ArrayType(DoubleType()))
(F.col(element[0]+'_dummy'))))
vocab = [re.sub(r'\W', '_', value) for value in element[-1].labels]
df = df.select(df.columns + [F.col("activated_indices"+element[0])[i] for i in range(len(vocab))])
dictionary = {"activated_indices"+element[0]+"[{0}]".format(x): element[0]+'_'+vocab[x] for x in range(len(vocab))}
# rename columns
df = df.selectExpr(["{0} as {1}".format(x, x) if x not in dictionary else "{0} as {1}".format(x, dictionary[x])
for x in df.columns])
df = df.drop('activated_indices'+element[0], element[0]+'_dummy')
else:
print("There are nulls in your dataframe, please remove them or fill them before creating dummy features")
return df
Creamos df final con dummies spark
dummy_cols = [c for c,t in df_out.dtypes if t in ['string']]
df_no_nulls = remove_nulls(df_out)
df_no_nulls_dummies = get_dummies_spark(df_no_nulls, dummy_cols)
df_no_nulls_dummies.limit(10).toPandas()
Sumado a la creacion previa de Job_v2, se puede agrupar la edad segun la generacion en la que caiga
df_no_nulls_dummies = df_no_nulls_dummies.withColumn('range_age',F.when(F.col('age') < 18,'[18 <]')\
.when(F.col('age').between(19,30),'[19-30]')\
.when(F.col('age').between(31,45),'[31-45]')\
.when(F.col('age').between(46,60),'[46-60]')\
.otherwise('[60 >]'))
df_no_nulls_dummies.limit(10).toPandas()
# Respuesta aqui
numericas = [c for c,t in pd_dummies_spark.dtypes if t in ['bigint','float','double']]
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.stat import Statistics
import pandas as pd
corr_matrix = Statistics.corr(pd_dummies_spark.select(numericas).rdd.map(lambda v: Vectors.dense(v)),
method='pearson')
corr_matrix = pd.DataFrame(corr_matrix, columns=numericas, index=numericas)
import numpy as np
from matplotlib import pyplot as plt
import seaborn as sns
%matplotlib inline
mask = np.zeros_like(corr_matrix, dtype=np.bool)
mask[np.triu_indices_from(mask)] = True
mask
corr_matrix = corr_matrix.mask(mask)
corr_matrix
plt.figure(figsize=(20,20))
sns.heatmap(corr_matrix, cmap='coolwarm', vmax=1, vmin=-1, square=True, annot=True, fmt='.2f')
numericas = [c for c,t in pd_dummies_spark.dtypes if t in ['bigint','float','double']]
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.stat import Statistics
import pandas as pd
corr_matrix = Statistics.corr(pd_dummies_spark.select(numericas).rdd.map(lambda v: Vectors.dense(v)),
method='pearson')
corr_matrix = pd.DataFrame(corr_matrix, columns=numericas, index=numericas)
# Imported functions
def corrank(X):
import itertools
df_correlaciones = pd.DataFrame([[(i,j),X.corr().loc[i,j]] for i,j in list(itertools.combinations(X.corr(), 2))],columns=['pairs','corr'])
return df_correlaciones.sort_values(by='corr',ascending=False)
La variable Job_management y Educacion terciaria estan altamente correlacionados, como tambien lo esta la cantidad de dias que pasaron desde el ultimo contacto y el resultado de este mismo. Esta ultima se debe a la existencia de la una mayoria que no fue contactada y por ende no tiene resultado alguno.
corrank(corr_matrix).head(10)
Al transformar a dummies las variables categoricas, notamos que como son opciones excluyentes, tiene una alta correlacion negativa.
corrank(corr_matrix).tail(10)
Modelo
Confirmo que no hayan variables string
pandas_dummies.dtypes
Separo en train y testing
X = pandas_dummies.drop(columns = 'y_yes')
y = pandas_dummies['y_yes']
pip install xgboost --user
import sklearn as sk
from sklearn import model_selection, ensemble, metrics
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.ensemble import RandomForestClassifier
import xgboost as xgb
from xgboost import XGBClassifier
from sklearn import metrics
import matplotlib.pyplot as plt
from sklearn import tree
X_train, X_test, y_train, y_test = sk.model_selection.train_test_split(X, y, test_size=0.2, random_state=42)
print(f'''% Positive class in Train = {np.round(y_train.value_counts(normalize=True)[1] * 100, 2)}
% Positive class in Test = {np.round(y_test.value_counts(normalize=True)[1] * 100, 2)}''')
Probando 4 modelos predictivos, notamos que XGBoost classifier es el que tiene mayor exito de prediccion, otorgando un area debajo de la curva de 0.71, lo cual no es mucho.
plt.figure()
# Add the models to the list that you want to view on the ROC plot
models = [
{
'label': 'Logistic Regression',
'model': LogisticRegression(),
},
{
'label': 'Random Forest',
'model': RandomForestClassifier(),
},{
'label': 'XGBClassifier',
'model': XGBClassifier()
},{
'label': 'AdaBoostClassifier',
'model': AdaBoostClassifier()
}
]
# Below for loop iterates through your models list
for m in models:
model = m['model'] # select the model
model.fit(X_train, y_train) # train the model
y_pred=model.predict(X_test) # predict the test data
# Compute False postive rate, and True positive rate
fpr, tpr, thresholds = metrics.roc_curve(y_test, model.predict_proba(X_test)[:,1])
# Calculate Area under the curve to display on the plot
auc = metrics.roc_auc_score(y_test,model.predict(X_test))
# Now, plot the computed values
plt.plot(fpr, tpr, label='%s ROC (area = %0.2f)' % (m['label'], auc))
# Custom settings for the plot
plt.plot([0, 1], [0, 1],'r--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('1-Specificity(False Positive Rate)')
plt.ylabel('Sensitivity(True Positive Rate)')
plt.title('AUC para cada modelo de prediccion de éxito')
plt.legend(loc="lower right")
fig = plt.gcf()
fig.set_size_inches(13, 10)
plt.show() # Display
Vemos la importancia de las variables en el modelo
rf = RandomForestClassifier(n_estimators=1000)
rf.fit(X_train, y_train)
import numpy as np
importances = rf.feature_importances_
sorted_indices = np.argsort(importances)[::-1]
Mayor importancia: Duration, Balance, Age
import matplotlib.pyplot as plt
plt.title('Feature Importance')
plt.bar(range(X_train.shape[1]), importances[sorted_indices], align='center')
plt.xticks(range(pandas_dummies.shape[1]-1), pandas_dummies.columns[sorted_indices], rotation=90)
plt.tight_layout()
from matplotlib.pyplot import figure
fig = plt.gcf()
fig.set_size_inches(10, 15)
plt.title('Importancia de variables, Random Forest')
plt.show()