RANDOM FOREST - PYSPARK

Initial setup and library imports

Spark session configuration

In [10]:
import findspark
findspark.init()
In [11]:
from pyspark.sql import SparkSession
In [12]:
spark = SparkSession.builder.getOrCreate()
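A bare getOrCreate() works fine locally, but the builder also accepts a name and resource settings if you need more control. A minimal sketch (the app name and memory value below are illustrative, not from this run):

spark = (SparkSession.builder
         .appName('random-forest-bank')         # hypothetical app name
         .config('spark.driver.memory', '4g')   # adjust to your machine
         .getOrCreate())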

Libraries used

In [13]:
#Pyspark libraries/modules
from pyspark.sql import *
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import row_number, monotonically_increasing_id
from pyspark.sql.types import StringType, DoubleType, IntegerType, ArrayType, DateType

#Python libraries/modules
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings("ignore")
from scipy import stats
from scipy.stats import kstest

#Pyspark Machine Learning libraries/modules
from pyspark.mllib.stat import Statistics

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.linalg import Vectors
import re

Case study

This time we pick up the case published earlier, in which I performed an exploratory analysis of a database of prospective bank clients.

If you haven't seen it, I invite you to check out the previous notebook.

Now that the dataset is clean, we proceed to fit a Random Forest classification model.

Initial steps

In [14]:
pd_df = pd.read_excel('bank_full_final.xlsx')
df = spark.createDataFrame(pd_df)
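Going through pandas is convenient for a small Excel file, but for larger data Spark can read the file directly and skip the driver-side round trip. A sketch, assuming the same data exported to CSV (the file name is hypothetical):

df = (spark.read
      .option('header', True)
      .option('inferSchema', True)
      .csv('bank_full_final.csv'))   # hypothetical CSV export of the same data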
In [15]:
df = df.drop('Target_no','Target_yes')
In [16]:
print('rows',df.count(),len(df.columns),"columns")
rows 34383 43 columns
In [17]:
df.printSchema()
root
 |-- age: long (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: long (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: long (nullable = true)
 |-- month: long (nullable = true)
 |-- duration: long (nullable = true)
 |-- campaign: long (nullable = true)
 |-- pdays: long (nullable = true)
 |-- previous: long (nullable = true)
 |-- Target: string (nullable = true)
 |-- trabaja: long (nullable = true)
 |-- duration_cat: string (nullable = true)
 |-- job_blue_collar: long (nullable = true)
 |-- job_management: long (nullable = true)
 |-- job_technician: long (nullable = true)
 |-- job_admin_: long (nullable = true)
 |-- job_services: long (nullable = true)
 |-- job_retired: long (nullable = true)
 |-- job_self_employed: long (nullable = true)
 |-- job_entrepreneur: long (nullable = true)
 |-- job_unemployed: long (nullable = true)
 |-- job_housemaid: long (nullable = true)
 |-- job_student: long (nullable = true)
 |-- marital_married: long (nullable = true)
 |-- marital_single: long (nullable = true)
 |-- marital_divorced: long (nullable = true)
 |-- education_secondary: long (nullable = true)
 |-- education_tertiary: long (nullable = true)
 |-- education_primary: long (nullable = true)
 |-- default_no: long (nullable = true)
 |-- default_yes: long (nullable = true)
 |-- housing_yes: long (nullable = true)
 |-- housing_no: long (nullable = true)
 |-- loan_no: long (nullable = true)
 |-- loan_yes: long (nullable = true)
 |-- contact_cellular: long (nullable = true)
 |-- contact_telephone: long (nullable = true)

Encoding the Target variable

In [18]:
df = df.withColumn('Target',F.when(F.col('Target') == 'no',0)
             .otherwise(1))
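Before splitting, it is worth checking how imbalanced the encoded target is; campaign data like this usually has far more 'no' than 'yes' responses. A quick check (not part of the original run):

df.groupBy('Target').count().show()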

Train/Test split (hold-out)

In [19]:
train, test = df.randomSplit([0.80,0.20])
#test = test.drop('Target')
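Note that randomSplit was called without a seed, so the exact 80/20 split is not reproducible across runs. Passing one fixes that (the seed value itself is arbitrary):

train, test = df.randomSplit([0.80, 0.20], seed=42)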
In [20]:
print('Rows in training set {}'.format(train.count()))
print('Rows in validation set {}'.format(test.count()))
Rows in training set 27456
Rows in validation set 6927
In [21]:
print('Columns in training set {}'.format(len(train.columns)))
print('Columns in validation set {}'.format(len(test.columns)))
Columns in training set 43
Columns in validation set 43

Feature selection

In [22]:
feature = [c for c,t in df.dtypes if t != 'string']
In [23]:
feature.remove('Target')
feature
Out[23]:
['age',
 'balance',
 'day',
 'month',
 'duration',
 'campaign',
 'pdays',
 'previous',
 'trabaja',
 'job_blue_collar',
 'job_management',
 'job_technician',
 'job_admin_',
 'job_services',
 'job_retired',
 'job_self_employed',
 'job_entrepreneur',
 'job_unemployed',
 'job_housemaid',
 'job_student',
 'marital_married',
 'marital_single',
 'marital_divorced',
 'education_secondary',
 'education_tertiary',
 'education_primary',
 'default_no',
 'default_yes',
 'housing_yes',
 'housing_no',
 'loan_no',
 'loan_yes',
 'contact_cellular',
 'contact_telephone']
In [24]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=feature, outputCol='features')
train = assembler.transform(train)
test = assembler.transform(test)
In [25]:
train.select('features').take(1)
Out[25]:
[Row(features=SparseVector(34, {0: 18.0, 1: 5.0, 2: 24.0, 3: 8.0, 4: 143.0, 5: 2.0, 6: -1.0, 19: 1.0, 21: 1.0, 23: 1.0, 26: 1.0, 29: 1.0, 30: 1.0, 32: 1.0}))]
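The assembled column is a SparseVector: only the non-zero entries among the 34 features are stored as index/value pairs. If you ever need the full dense representation, toArray() expands it (a quick check, not in the original run):

row = train.select('features').first()
print(row.features.toArray())   # dense numpy array with all 34 values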
In [26]:
test.show(1,vertical = True)
-RECORD 0-----------------------------------
 age                 | 19                   
 job                 | student              
 marital             | single               
 education           | secondary            
 default             | no                   
 balance             | 526                  
 housing             | no                   
 loan                | no                   
 contact             | cellular             
 day                 | 1                    
 month               | 7                    
 duration            | 174                  
 campaign            | 1                    
 pdays               | 199                  
 previous            | 3                    
 Target              | 0                    
 trabaja             | 0                    
 duration_cat        | normal_call          
 job_blue_collar     | 0                    
 job_management      | 0                    
 job_technician      | 0                    
 job_admin_          | 0                    
 job_services        | 0                    
 job_retired         | 0                    
 job_self_employed   | 0                    
 job_entrepreneur    | 0                    
 job_unemployed      | 0                    
 job_housemaid       | 0                    
 job_student         | 1                    
 marital_married     | 0                    
 marital_single      | 1                    
 marital_divorced    | 0                    
 education_secondary | 1                    
 education_tertiary  | 0                    
 education_primary   | 0                    
 default_no          | 1                    
 default_yes         | 0                    
 housing_yes         | 0                    
 housing_no          | 1                    
 loan_no             | 1                    
 loan_yes            | 0                    
 contact_cellular    | 1                    
 contact_telephone   | 0                    
 features            | (34,[0,1,2,3,4,5,... 
only showing top 1 row

In [27]:
train.select('features').show(2)
+--------------------+
|            features|
+--------------------+
|(34,[0,1,2,3,4,5,...|
|(34,[0,1,2,3,4,5,...|
+--------------------+
only showing top 2 rows

In [28]:
from pyspark.ml.stat import Correlation

matrix = Correlation.corr(train.select('features'), 'features')
matrix_np = matrix.collect()[0]["pearson({})".format('features')].values
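The flat array is hard to read on its own; labeling it with the feature names makes individual columns easy to query. A sketch (the corr_df name and the 'duration' lookup are ours, not from the original run):

corr_df = pd.DataFrame(matrix_np.reshape(len(feature), len(feature)),
                       index=feature, columns=feature)
print(corr_df['duration'].sort_values(ascending=False).head())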
In [41]:
import seaborn as sns

matrix_np = matrix_np.reshape(len(feature),len(feature))

fig, ax = plt.subplots(figsize=(20,20))
ax = sns.heatmap(matrix_np, cmap="YlGnBu")
ax.xaxis.set_ticklabels(feature, rotation=270)
ax.yaxis.set_ticklabels(feature, rotation=0)
ax.set_title("Correlation Matrix")
plt.tight_layout()
plt.show()

RANDOM FOREST

In [30]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol='Target', featuresCol='features', numTrees=50)
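numTrees=50 is used as-is here, but the CrossValidator and ParamGridBuilder imported at the top could tune it together with tree depth. A sketch with illustrative grid values (not run in this notebook; 6 grid points x 3 folds means 18 fits):

grid = (ParamGridBuilder()
        .addGrid(rf.numTrees, [20, 50, 100])
        .addGrid(rf.maxDepth, [5, 10])
        .build())

cv = CrossValidator(estimator=rf,
                    estimatorParamMaps=grid,
                    evaluator=BinaryClassificationEvaluator(labelCol='Target'),
                    numFolds=3)
# cv_model = cv.fit(train)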
In [31]:
rf_model = rf.fit(train)
In [32]:
pred = rf_model.transform(test)
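The fitted model also exposes featureImportances, which is handy for interpreting what drives the predictions. A sketch pairing them with the input column names (not shown in the original run):

importances = sorted(zip(feature, rf_model.featureImportances.toArray()),
                     key=lambda t: -t[1])
for name, score in importances[:10]:
    print('{}: {:.4f}'.format(name, score))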
In [33]:
pred.select('Target', 'prediction', 'probability').show(5)
+------+----------+--------------------+
|Target|prediction|         probability|
+------+----------+--------------------+
|     0|       0.0|[0.69865934831345...|
|     0|       0.0|[0.83775300983458...|
|     1|       0.0|[0.92307653215843...|
|     0|       0.0|[0.91331704901701...|
|     1|       0.0|[0.94449666309508...|
+------+----------+--------------------+
only showing top 5 rows

In [34]:
pred_pd = pred.select(['Target', 'prediction', 'probability']).toPandas()
In [35]:
pred_pd.head()
Out[35]:
   Target  prediction                                probability
0       0         0.0  [0.6986593483134501, 0.30134065168654994]
1       0         0.0  [0.8377530098345801, 0.16224699016541982]
2       1         0.0  [0.9230765321584307, 0.07692346784156945]
3       0         0.0  [0.9133170490170146, 0.08668295098298541]
4       1         0.0  [0.9444966630950828, 0.05550333690491719]
In [36]:
import numpy as np
pred_pd['probability'] = pred_pd['probability'].map(lambda x: list(x))
pred_pd['encoded_Target'] = pred_pd['Target'].map(lambda x: np.eye(2)[int(x)])
In [37]:
y_pred = np.array(pred_pd['probability'].tolist())
y_true = np.array(pred_pd['encoded_Target'].tolist())

Performance Metric

In [38]:
from sklearn.metrics import auc, roc_curve

fpr, tpr, threshold = roc_curve(y_score=y_pred[:,0], y_true=y_true[:,0])
auc = auc(fpr, tpr)

print('AUC: {:.3f}'.format(auc))
AUC: 0.833
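The same metric can be computed without leaving Spark, using the BinaryClassificationEvaluator imported at the top; it reads the rawPrediction column straight from the predictions DataFrame. A sketch (not run here, so no output to compare):

evaluator = BinaryClassificationEvaluator(labelCol='Target',
                                          rawPredictionCol='rawPrediction',
                                          metricName='areaUnderROC')
print('AUC (Spark): {:.3f}'.format(evaluator.evaluate(pred)))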
In [45]:
plt.figure(figsize=(15,15))
plt.plot([0,1], [0,1], '--', color='orange')
plt.plot(fpr, tpr, label='auc = {:.3f}'.format(roc_auc))
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.title('ROC Curve')
plt.legend(loc='lower right')
plt.grid()
plt.show()