In this article we build on our introduction to MapReduce by implementing KNN on the airline data using the MapReduce paradigm.

You can get the source code here

A Quick Review


Let us recall some of the topics we touched on previously. The Hadoop framework we work with bundles three components: HDFS, MapReduce and YARN.

HDFS

HDFS stands for Hadoop Distributed File System. HDFS allows us to take our big data (data that will not fit on a single computer) and “distribute” it across several connected computers, which we call a cluster. HDFS was written so that correctly implemented operations run efficiently across all the computers in the cluster.

MapReduce

To take advantage of HDFS and its efficiency, we must express our computations in a paradigm called MapReduce. MapReduce is made up of two steps, Map and Reduce. The Map step takes a function and applies it to every element of a list.

Suppose we have a list x = [1,2,3,4,5]. We feed the mapper a function \(f\) we wish to map, and it performs

\[x \rightarrow Mapper(f) \rightarrow [f(1), f(2), f(3), f(4), f(5)]\]

Later on we will define this function \(f\) to be a similarity measure between two flights, and the mapper will compute the similarity of each training point to a given test point.
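
As a minimal sketch of the Map step in plain R (no Hadoop involved), the snippet below applies a function to each element of x. The function f here is a placeholder chosen purely for illustration (it squares its input); it is not the function used later in the article.

```r
# The list from the text.
x <- c(1, 2, 3, 4, 5)

# Placeholder function for illustration only: squares its input.
f <- function(v) v^2

# The "map" step: apply f to every element of x.
mapped <- sapply(x, f)
print(mapped)
```

Each element is processed independently of the others, which is what makes it possible to spread the work over several machines.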

The reducer will take this output and “reduce” it down to a single object, in most cases a single number or list. We must also provide the reducer with an operation to reduce on, as well as a starting point. Knowing this, it is easy to picture reducing on \(+\) or \(\times\), but we can reduce on a number of other operations.

\[[f(1), f(2), f(3), f(4), f(5)] \rightarrow Reducer(+) \rightarrow f(1) + f(2) + f(3) + f(4) + f(5)\]

\[[f(1), f(2), f(3), f(4), f(5)] \rightarrow Reducer(\times) \rightarrow f(1) \times f(2) \times f(3) \times f(4) \times f(5)\]
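
The two reductions above can be sketched in plain R with the built-in Reduce function. The input values are made up for illustration (they stand in for f(1), ..., f(5)), and the init argument plays the role of the starting point mentioned earlier.

```r
# Stand-in values for f(1), ..., f(5); made up for illustration.
mapped <- c(1, 4, 9, 16, 25)

# Reduce on "+" starting from 0, and on "*" starting from 1.
total   <- Reduce(`+`, mapped, init = 0)
product <- Reduce(`*`, mapped, init = 1)

# Any binary operation works, for example keeping the larger of two values.
largest <- Reduce(max, mapped)

cat(total, product, largest, "\n")
```

The starting point is usually the identity of the operation (0 for addition, 1 for multiplication), so that it does not change the result.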

YARN

YARN stands for Yet Another Resource Negotiator. YARN is a piece of functionality that matters most when Hadoop is shared by multiple users: it efficiently schedules the jobs that those users submit to Hadoop. YARN is the least important of the three to us at the moment, so if you would like more information there are plenty of articles on the internet.

The Airline Data


To get a copy of the airline data, click here. This page lists all the years available; we are only going to use the 1988 data set.

We advise the following:

  1. Create a directory (folder) called Mapreduce-Airline

  2. Download the zip file into that folder

  3. Extract the file into that folder. (Windows users may need to download 7zip.)

With this in place, one can simply set the working directory in R to this path with the following:

setwd("path/to/Mapreduce-Airline")

You can also use the menus if using RStudio. On the top bar do:

\[\text{Session} \rightarrow \text{ Set Working Directory } \rightarrow \text{ Choose Directory }\]

and pick the folder created above.

Writing the Mapper


We will create two scripts, one for the mapper and another for the reducer. First create a file called airline_mapper.R; we will write to this file now.

Reading in One Line at a Time

The first thing we want to do is open a connection to the file containing the data. In our case this is 1988.csv.

con <- file("1988.csv", open = "r")

With this open connection we now want to read one line at a time and compute on it. Let's implement the bare bones of this:

while(length(line <- readLines(con, n = 1)) > 0) {
  cat(line, "\n")
}

Here we read the next line as long as one exists. We test for the existence of a line by checking the length of what readLines returns, which drops to zero once the file is exhausted. We then use cat to print the result to the screen.
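
To see the loop condition in action without the real data, here is a small sketch that reads from an in-memory text connection instead of a file. The three input lines are made up for illustration; the loop itself is the same as above, with a counter added.

```r
# Made-up three-line input standing in for a file on disk.
con <- textConnection(c("a,b,c", "1,2,3", "4,5,6"))

n_read <- 0
# readLines returns character(0) at the end of the input, so its length
# becomes 0 and the loop stops.
while (length(line <- readLines(con, n = 1)) > 0) {
  n_read <- n_read + 1
  cat(n_read, ":", line, "\n")
}

close(con)
```

Only one line is held in memory at a time, which is why this pattern still works for files too large to load whole.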

Processing the Data

Next we want to parse each line so that we can do computations on these observations. Much like we did last time, we need to separate the line by commas. The fields are kept as strings at this point and converted to numbers where needed when we compare them. In this step we will also pick which variables we want to compare.

We implement all of this in a function called parse_line:

# @ line: is the raw line from file
# @ to_keep: is a vector of indices which we wish to keep for computation, default is all 29
parse_line <- function(line, to_keep = 1:29) {
  line <- unlist(strsplit(line, ","))
  return (line[to_keep])
}

Adding this function to the while loop above results in the following:

select_features <- c(2,3,4,6,9,10,13,17,18,15)
while(length(line <- readLines(con, n = 1)) > 0) {
  clean_line <- parse_line(line, select_features)
  cat(clean_line, "\n")
}
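
To make the effect of parse_line concrete, here is a sketch that runs it on a single raw line. The line below is hypothetical: it was written by hand to have 29 comma-separated fields in the column order the article relies on (Month in column 2, ArrDelay in column 15, and so on), and is not copied from 1988.csv.

```r
# Same function as in the article.
parse_line <- function(line, to_keep = 1:29) {
  line <- unlist(strsplit(line, ","))
  return(line[to_keep])
}

select_features <- c(2, 3, 4, 6, 9, 10, 13, 17, 18, 15)

# Hypothetical raw line with 29 fields, made up for illustration.
raw_line <- paste0(
  "1988,1,28,4,2035,2040,2145,2153,PS,1842,NA,70,73,NA,-8,-5,",
  "SFO,BUR,326,NA,NA,0,NA,0,NA,NA,NA,NA,NA"
)

clean_line <- parse_line(raw_line, select_features)
print(clean_line)
```

The result is a character vector of length 10 whose last element is the arrival delay, which is the value later turned into the class label.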

A Distance Metric

The KNN algorithm assigns classes by measuring the similarity between a new observation and those in the training set. So we need to define a distance metric, or rather a way to measure similarity. We will use what is called the Jaccard coefficient: essentially, we count the number of matching features to determine how similar two items are. Along the way we also add weights to give more power to the features we believe to be more important.

This part is a bit more involved in R, but still straightforward. For each feature we check whether there is a match; if there is we record a 1, otherwise a 0, and store the result in a vector. We then multiply this binary vector by the weights and return the sum. In the end, highly similar items will have a high score and dissimilar items a low score.

sim_distance_unbound <- function(test_obs, train_obs, feature_weights) {
  comparison_result <- numeric(length(train_obs) - 1)
  comparison_result[1] <- as.numeric(test_obs[1]) == as.numeric(train_obs[1]) # Month
  comparison_result[2] <- as.numeric(test_obs[2]) == as.numeric(train_obs[2]) # DayofMonth
  comparison_result[3] <- as.numeric(test_obs[3]) == as.numeric(train_obs[3]) # DayofWeek
  comparison_result[4] <- abs(as.numeric(test_obs[4]) - as.numeric(train_obs[4])) < 50 # CRSDepTime
  comparison_result[5] <- as.character(test_obs[5]) == as.character(train_obs[5]) # UniqueCarrier
  comparison_result[6] <- as.character(test_obs[6]) == as.character(train_obs[6]) #FlightNum
  comparison_result[7] <- abs(as.numeric(test_obs[7]) - as.numeric(train_obs[7])) < 50 # CRSElapsedTime
  comparison_result[8] <- as.character(test_obs[8]) == as.character(train_obs[8]) # Origin
  comparison_result[9] <- as.character(test_obs[9]) == as.character(train_obs[9]) # Dest
  return(sum(comparison_result * feature_weights))
}

So in the above we are comparing the features we think determine whether a plane is late or not. Along with comparing, we also apply weights so that some features count for more in the overall similarity score.

Let's take a look at an implementation that looks at the first 10 rows only:
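
As a worked example of the arithmetic, the sketch below scores one pair of observations. The helper match_vector is a hypothetical, condensed restatement of the comparisons in the function above (numeric equality for the three date fields, a tolerance of 50 for the two time fields, string equality for the rest), and the training observation is made up.

```r
# Hypothetical helper: one TRUE/FALSE per feature, mirroring the comparisons above.
match_vector <- function(test_obs, train_obs, tol = 50) {
  c(
    as.numeric(test_obs[1:3]) == as.numeric(train_obs[1:3]),        # Month, DayofMonth, DayofWeek
    abs(as.numeric(test_obs[4]) - as.numeric(train_obs[4])) < tol,  # CRSDepTime
    test_obs[5:6] == train_obs[5:6],                                # UniqueCarrier, FlightNum
    abs(as.numeric(test_obs[7]) - as.numeric(train_obs[7])) < tol,  # CRSElapsedTime
    test_obs[8:9] == train_obs[8:9]                                 # Origin, Dest
  )
}

w <- c(2.3, 1.3, 1, 1, 1, 1, 1, 1, 1)
test_obs  <- c("1", "28", "4", "2040", "PS", "1842", "73", "SFO", "BUR")
train_obs <- c("1", "14", "4", "2055", "PS", "1842", "73", "SFO", "LAX")  # made up

m <- match_vector(test_obs, train_obs)
score <- sum(m * w)  # TRUE/FALSE are treated as 1/0 in the product
cat(as.integer(m), "\n")
cat(score, "\n")
```

Here DayofMonth and Dest do not match, so the score is the sum of all weights (10.6) minus 1.3 and 1, which is 8.3. A perfect match would score 10.6.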

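# the values we want to compare 
select_features <- c(2,3,4,6,9,10,13,17,18,15) # include 15 as last index for the target variable 
w <- c(2.3,1.3,1,1,1,1,1,1,1)

# a simple function to convert to numeric without warnings
float_this <- function(c) {
  suppressWarnings(as.numeric(c))
}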

# for producing a test_point to work with
test_sample <- c("1","28","4","2040","PS","1842","73","SFO","BUR","-8")

con <- file("../data/1988.csv", open = "r")
header_line <- readLines(con, n=1, warn = FALSE) # capture header line as to not use in comps

i <- 1
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0 && i <= 10) {
  clean_line <- parse_line(line, select_features)
  dist <- sim_distance_unbound(test_obs = test_sample, train_obs = clean_line, feature_weights = w)
  # output the score followed by the class of the training point 
  cat(dist, "\t",(float_this(clean_line[length(clean_line)]) > 15)*1,"\n")
  i <- i + 1
}

close(con)

The output should be something like this:

10.6     1 
8.3      0 
8.3      1 
8.3      0 
8.3      1 
8.3      0 
8.3      0 
9.3      0 
8.3      0 
8.3      1 

With this the mapper is nearly complete. Putting all of the above together, the file will look like this:

airline_mapper.R

# @ line: is the raw line from file
# @ to_keep: is a vector of indices which we wish to keep for computation, default is all 29
parse_line <- function(line, to_keep = 1:29) {
  line <- unlist(strsplit(line, ","))
  return (line[to_keep])
}

# @ test_obs: the new point we wish to classify, must be parsed
# @ train_obs: the observation from the training set, must be parsed
# @ feature_weights: a vector of weights 
sim_distance <- function(test_obs, train_obs, feature_weights) {
  comparison_result <- numeric(length(train_obs) - 1)
  comparison_result[1] <- as.numeric(test_obs[1]) == as.numeric(train_obs[1]) # Month
  comparison_result[2] <- as.numeric(test_obs[2]) == as.numeric(train_obs[2]) # DayofMonth
  comparison_result[3] <- as.numeric(test_obs[3]) == as.numeric(train_obs[3]) # DayofWeek
  comparison_result[4] <- abs(as.numeric(test_obs[4]) - as.numeric(train_obs[4])) < 50 # CRSDepTime
  comparison_result[5] <- as.character(test_obs[5]) == as.character(train_obs[5]) # UniqueCarrier
  comparison_result[6] <- as.character(test_obs[6]) == as.character(train_obs[6]) #FlightNum
  comparison_result[7] <- abs(as.numeric(test_obs[7]) - as.numeric(train_obs[7])) < 50 # CRSElapsedTime
  comparison_result[8] <- as.character(test_obs[8]) == as.character(train_obs[8]) # Origin
  comparison_result[9] <- as.character(test_obs[9]) == as.character(train_obs[9]) # Dest
  return(sum(comparison_result * feature_weights))
}

# a simple function to convert to numeric without warnings 
float_this <- function(c) {
  suppressWarnings(as.numeric(c))
}

# the values we want to compare 
select_features <- c(2,3,4,6,9,10,13,17,18,15) # include 15 as last index for the target variable 
w <- c(2.3,1.3,1,1,1,1,1,1,1)

# test data
test_sample <- c("1","28","4","2040","PS","1842","73","SFO","BUR","-8")

con <- file("../data/1988.csv", open = "r")
header_line <- readLines(con, n=1, warn = FALSE) # capture header line as to not use in comps

# process every line of the file (the 10-row limit was only for the test run above)
while (length(line <- readLines(con, n = 1, warn = FALSE)) > 0) {
  clean_line <- parse_line(line, select_features)
  dist <- sim_distance(test_obs = test_sample, train_obs = clean_line, feature_weights = w)
  # output the score followed by the class of the training point 
  cat(dist, "\t",(float_this(clean_line[length(clean_line)]) > 15)*1,"\n")
}

close(con)

The Shuffle Stage


In this stage, Hadoop will perform some magic for us. While handing the mapper results off to the reducer, Hadoop performs a shuffle, which in our case amounts to a sort. As a result, the reducer's work is simply to collect the first \(k\) or last \(k\) results, depending on whether the sort is ascending or descending.

We can implement a mock version of this within the terminal.
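
The same idea can also be mocked in R itself. The sketch below is only an illustration of the sort, not what Hadoop does internally: it takes a few made-up mapper output lines (score, tab, class), extracts the score, and orders the lines by it in ascending order.

```r
# Made-up mapper output lines: score, a tab, then the class.
mapper_output <- c("8.3 \t 0", "10.6 \t 1", "9.3 \t 0", "8.3 \t 1")

# Pull out the score (the part before the tab) as a number.
fields <- strsplit(mapper_output, "\t", fixed = TRUE)
scores <- as.numeric(trimws(sapply(fields, `[`, 1)))

# The mock "shuffle": order the lines by numeric score, ascending.
shuffled <- mapper_output[order(scores)]
cat(shuffled, sep = "\n")
```

Note that the sort here is numeric. Sorting the same lines as plain text would place "10.6" before "8.3", so it is worth checking how the scores are being compared whenever the order matters.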

Writing a Reducer


The reducer will receive data of the form

17   0 
17   0 
18   0 
18   0 
18   0 
18   0 
18   1 
18   1 
19   0 
19   0 

The above has been sorted, and so all we have to do is collect the desired \(k\) results and vote on the class. The output above is in fact the 10 most similar from the test run in the previous section. If this were the real output, then with \(k = 10\) we would classify the point as not late, or rather class 0.
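
A minimal sketch of that voting step is shown below. The function knn_vote and its arguments are hypothetical names introduced here for illustration; this is not the article's reducer script. It assumes each line holds a similarity score followed by a 0/1 class, keeps the \(k\) highest scores, and returns the most common class among them.

```r
# Hypothetical helper: majority vote over the k most similar lines.
knn_vote <- function(lines, k) {
  fields <- strsplit(trimws(lines), "[[:space:]]+")
  scores <- as.numeric(sapply(fields, `[`, 1))
  labels <- as.integer(sapply(fields, `[`, 2))
  # Indices of the k highest similarity scores.
  top <- order(scores, decreasing = TRUE)[seq_len(min(k, length(scores)))]
  votes <- table(labels[top])
  # which.max returns the first maximum, so a tie goes to the smaller label.
  as.integer(names(votes)[which.max(votes)])
}

# The ten sorted lines shown above.
sorted_lines <- c("17   0", "17   0", "18   0", "18   0", "18   0",
                  "18   0", "18   1", "18   1", "19   0", "19   0")

predicted <- knn_vote(sorted_lines, k = 10)
cat("predicted class:", predicted, "\n")
```

With these ten lines, eight votes are for class 0 and two for class 1, so the predicted class is 0, matching the conclusion above. Choosing an odd \(k\) avoids ties when there are only two classes.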

Working with a Hadoop Cluster


Set Up Your Own Cluster


Setting up your own mock cluster is not too involved. Take a look at a guide I created if you would like to do so. You can find the guide at Install A Hadoop Single Node Cluster