Apache Spark is an open source framework that combines an engine for distributing programs across clusters of machines with an elegant model for writing programs atop it. The code below walks through a machine learning exercise that interfaces Spark with Python through PySpark, the Spark Python API, which exposes the Spark programming model to Python.
Docker images with Spark are available. One such image loads a container with Spark, Mesos, Jupyter, and Python. Apache Mesos is a cluster manager that provides efficient resource isolation and sharing across distributed applications or frameworks. With Mesos clusters, any version of Spark drivers and executors can run in Docker containers. Using a Jupyter Notebook makes it easy to write programs that access the Spark clusters. PySpark, the Spark Python API, has a gentler learning curve and is considered easier to use, less verbose, and more readable than Scala (Spark's native language).
sudo docker run -d -p 8888:8888 --user root -e GRANT_SUDO=yes \
jupyter/pyspark-notebook start-notebook.sh --NotebookApp.token=''
sudo docker ps # to get <container_hash>
sudo docker exec -it <container_hash> bash
pip install pyspark --upgrade
pip install findspark --upgrade
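As an optional sanity check (not part of the original walkthrough), the installed package can be verified from the same container shell:
python -c "import pyspark, findspark; print(pyspark.__version__)"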
Declaring -d runs the container in "detached" mode in the background instead of the default foreground mode, which lets the user keep using the command line. Failing to declare -d will require a Ctrl+C to exit. Not using the --NotebookApp.token='' option will result in a token being assigned automatically and a message similar to the one below:
Copy/paste this URL into your browser when you connect for the first time, to login with a token: http://localhost:8888/?token=…
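Because the container was started in detached mode, it keeps running in the background; when finished, it can be stopped with docker stop using the same <container_hash> reported by docker ps:
sudo docker stop <container_hash>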
wget http://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.tgz
tar xzvf cal_housing.tgz && rm -r cal_housing.tgz
readlink -f cal_housing.data
cd ../../usr/local/spark
./bin/pyspark
rdd1 = spark.sparkContext.parallelize([('a',7),('a',2),('b',2)])
rdd2 = spark.sparkContext.parallelize([("a",["x","y","z"]), ("b",["p", "r"])])
rdd3 = spark.sparkContext.parallelize(range(100))
rdd1.reduce(lambda a,b: a+b)
rdd2.flatMapValues(lambda x: x).collect()
exit()
Open the Jupyter Notebook interface on port 8888. The function findspark.init() makes pyspark importable as a regular library.
# Import findspark
import findspark
# Initialize and provide path
findspark.init("/usr/local/spark")
# Or use this alternative
# findspark.init()
Build the Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").getOrCreate()
Creating RDDs
rdd1 = spark.sparkContext.parallelize([('a',7),('a',2),('b',2)])
rdd2 = spark.sparkContext.parallelize([("a",["x","y","z"]), ("b",["p", "r"])])
rdd3 = spark.sparkContext.parallelize(range(100))
RDD Operations
rdd1.reduce(lambda a,b: a+b)
rdd2.flatMapValues(lambda x: x).collect()
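For reference, the two operations above should return results along the following lines (element order may vary with partitioning):
# rdd1.reduce(...) concatenates the tuples pairwise: ('a', 7, 'a', 2, 'b', 2)
# rdd2.flatMapValues(...).collect() pairs each key with every element of its value list:
# [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]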
Open the Jupyter Notebook interface on port 8888. Build the Spark Session. See the Troubleshooting section if necessary.
# Import SparkSession
from pyspark.sql import SparkSession
# Build the SparkSession
spark = SparkSession.builder \
.master("local") \
.appName("Linear Regression Model") \
.config("spark.executor.memory", "1gb") \
.getOrCreate()
sc = spark.sparkContext
Loading Data.
# Load in the data
rdd = sc.textFile('/home/jovyan/CaliforniaHousing/cal_housing.data')
# Load in the header
header = sc.textFile('/home/jovyan/CaliforniaHousing/cal_housing.domain')
Data Exploration.
header.collect()
rdd.take(2)
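As an extra sanity check (not in the original walkthrough), counting the records should return 20,640 rows, one per California block group:
# Count the records
rdd.count()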
# Split lines on commas
rdd = rdd.map(lambda line: line.split(","))
# Inspect the first 2 lines
rdd.take(2)
# Inspect the first line
rdd.first()
# Take top elements
rdd.top(2)
# Import the necessary modules
from pyspark.sql import Row
# Map the RDD to a DF
df = rdd.map(lambda line: Row(longitude=line[0],
latitude=line[1],
housingMedianAge=line[2],
totalRooms=line[3],
totalBedRooms=line[4],
population=line[5],
households=line[6],
medianIncome=line[7],
medianHouseValue=line[8])).toDF()
# Show the top 20 rows
df.show()
# Print the data types of all `df` columns
# df.dtypes
# Print the schema of `df`
df.printSchema()
from pyspark.sql.types import *
df = df.withColumn("longitude", df["longitude"].cast(FloatType())) \
.withColumn("latitude", df["latitude"].cast(FloatType())) \
.withColumn("housingMedianAge",df["housingMedianAge"].cast(FloatType())) \
.withColumn("totalRooms", df["totalRooms"].cast(FloatType())) \
.withColumn("totalBedRooms", df["totalBedRooms"].cast(FloatType())) \
.withColumn("population", df["population"].cast(FloatType())) \
.withColumn("households", df["households"].cast(FloatType())) \
.withColumn("medianIncome", df["medianIncome"].cast(FloatType())) \
.withColumn("medianHouseValue", df["medianHouseValue"].cast(FloatType()))
# Import all from `sql.types`
from pyspark.sql.types import *
# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names:
        df = df.withColumn(name, df[name].cast(newType))
    return df
# Assign all column names to `columns`
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 'medianIncome', 'population', 'totalBedRooms', 'totalRooms']
# Convert the `df` columns to `FloatType()`
df = convertColumn(df, columns, FloatType())
df.select('population','totalBedRooms').show(10)
df.groupBy("housingMedianAge").count().sort("housingMedianAge",ascending=False).show()
df.describe().show()
Preprocessing The Target Values. The median house values are expressed in US dollars, so rescale medianHouseValue to units of $100,000 to keep the target values small.
# Import all from `sql.functions`
from pyspark.sql.functions import *
# Adjust the values of `medianHouseValue`
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)
# Show the first 2 lines of `df`
df.take(2)
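As a quick check (again, not in the original walkthrough), the rescaled target should now fall roughly between 0 and 5:
# Inspect the range of the rescaled target
df.select(min("medianHouseValue"), max("medianHouseValue")).show()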
Feature Engineering.
# Import all from `sql.functions` if you haven't yet
from pyspark.sql.functions import *
# Divide `totalRooms` by `households`
roomsPerHousehold = df.select(col("totalRooms")/col("households"))
# Divide `population` by `households`
populationPerHousehold = df.select(col("population")/col("households"))
# Divide `totalBedRooms` by `totalRooms`
bedroomsPerRoom = df.select(col("totalBedRooms")/col("totalRooms"))
# Add the new columns to `df`
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households")) \
.withColumn("populationPerHousehold", col("population")/col("households")) \
.withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
# Inspect the result
df.first()
# Re-order and select columns
df = df.select("medianHouseValue",
"totalBedRooms",
"population",
"households",
"medianIncome",
"roomsPerHousehold",
"populationPerHousehold",
"bedroomsPerRoom")
# Import `DenseVector`
from pyspark.ml.linalg import DenseVector
# Define the `input_data`
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
# Replace `df` with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])
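As an aside, a roughly equivalent label/features DataFrame could be built with Spark ML's VectorAssembler, staying in the DataFrame API instead of mapping through the RDD. The sketch below leaves the final reassignment commented out so it does not clash with the df already created above:
# Sketch of an alternative to the DenseVector mapping, using VectorAssembler
from pyspark.ml.feature import VectorAssembler
feature_cols = ["totalBedRooms", "population", "households", "medianIncome",
                "roomsPerHousehold", "populationPerHousehold", "bedroomsPerRoom"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
# Would replace the two steps above (run against the re-ordered `df`, before the RDD mapping):
# df = assembler.transform(df).withColumnRenamed("medianHouseValue", "label").select("label", "features")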
# Import `StandardScaler`
from pyspark.ml.feature import StandardScaler
# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")
# Fit the scaler to the DataFrame
scaler = standardScaler.fit(df)
# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df)
# Inspect the result
scaled_df.take(2)
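One thing to note: the LinearRegression estimator used below reads its input from the default features column, so it trains on the unscaled features rather than on features_scaled. If the scaled features are wanted instead, the estimator can be pointed at that column explicitly, for example (left commented out so it does not override the model definition below):
# Variant (sketch): train on the scaled features instead of the raw ones
# from pyspark.ml.regression import LinearRegression
# lr = LinearRegression(featuresCol="features_scaled", labelCol="label",
#                       maxIter=10, regParam=0.3, elasticNetParam=0.8)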
Building A Machine Learning Model With Spark ML.
# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=1234)
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression
# Initialize `lr`
lr = LinearRegression(labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)
# Fit the data to the model
linearModel = lr.fit(train_data)
# Generate predictions
predicted = linearModel.transform(test_data)
# Extract the predictions and the "known" correct labels
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])
# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()
# Print out first 5 instances of `predictionAndLabel`
predictionAndLabel[:5]
# Coefficients for the model
linearModel.coefficients
# Intercept for the model
linearModel.intercept
# Get the RMSE
linearModel.summary.rootMeanSquaredError
# Get the R2
linearModel.summary.r2
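The summary metrics above are computed on the training data. To score the held-out test set, a RegressionEvaluator can be run over the predicted DataFrame; this is a small sketch using the prediction and label columns already present:
# Evaluate the predictions on the test set
from pyspark.ml.evaluation import RegressionEvaluator
rmse_eval = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
r2_eval = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2")
print(rmse_eval.evaluate(predicted))
print(r2_eval.evaluate(predicted))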
spark.stop()
Troubleshooting. To be executed in the bash shell if necessary.
# If you get a FileNotFoundError error
export SPARK_HOME="/usr/local/spark/"
# Set a fixed value for the hash seed secret
export PYTHONHASHSEED=0
# Set an alternate Python executable
export PYSPARK_PYTHON=/usr/local/ipython/bin/ipython
# Augment the default search path for shared libraries
export LD_LIBRARY_PATH=/usr/local/ipython/bin/ipython
# Augment the default search path for private libraries
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-*-src.zip:$PYTHONPATH:$SPARK_HOME/python/
References.
https://hub.docker.com/r/jupyter/pyspark-notebook/
http://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.html
https://www.datacamp.com/community/tutorials/apache-spark-python
https://github.com/jupyter/docker-stacks/blob/master/base-notebook/start.sh
https://www.datacamp.com/community/tutorials/apache-spark-tutorial-machine-learning
https://medium.com/@mccode/understanding-how-uid-and-gid-work-in-docker-containers-c37a01d01cf