In this post we will answer two questions: what is the most used JavaScript library, and what is the most used keyword on the web?

If we assume there are more than a trillion pages on the web, at about 10KB per page, that’s 10PB in total. With 1TB disks we would need 10K machines just to store this, and hard disks would be too slow for interactive analysis. One approach is to sample down to 0.01%, or about 100 million pages; at 10GB of memory per machine, we would need about 100 machines to cache this.
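
As a quick sanity check, here is the back-of-the-envelope arithmetic in R (all figures are rough assumptions, not measurements):

pages     <- 1e12                                   # assume ~1 trillion pages
page_size <- 10 * 1024                              # assume ~10KB per page, in bytes
pages * page_size / 1e15                            # ~10PB of raw pages in total
pages * page_size / 1e12                            # ~10K 1TB disks just to store them

sampled <- pages * 0.0001                           # 0.01% sample: ~100 million pages
sampled * page_size / (10 * 1e9)                    # ~100 machines with 10GB of memory each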

Now that we’ve sketched this approach, we can introduce Apache Spark, a fast and general engine for large-scale data processing with support for in-memory datasets. To work from R, we can use sparklyr to install Apache Spark, connect to it, and analyze data in it.

Analysis (dry-run)

Before attempting to run on a 100-node cluster, it’s sensible to start with a local run. We can accomplish this from the R console or RStudio as follows:

library(sparkwarc)                                  # Load extension to read warc files
library(sparklyr)                                   # Load sparklyr to use Spark from R
library(dplyr)                                      # Load dplyr to perform analysis

spark_install()                                     # Install Apache Spark

config <- spark_config()                            # Create a config to tune memory
config[["sparklyr.shell.driver-memory"]] <- "10G"   # Set driver memory to 10GB

sc <- spark_connect(master = "local",               # Connect to local cluster
                    config = config)                # using custom configs

file <- gsub("s3n://commoncrawl/",                  # Map the S3 bucket url
             "http://commoncrawl.amazonaws.com/",   # into a downloadable url
             sparkwarc::cc_warc(1))                 # from the first archive file
download.file(file, "warc.gz")                      # Download into local warc.gz

spark_read_warc(                                    # Read the warc file
  sc,                                               # into the sc Spark connection
  "warc",                                           # save into 'warc' table
  "warc.gz",                                        # loading from remote gz file
  repartition = 8,                                  # partition into 8 to maximize MBP cores
  parse = TRUE)                                     # parse tags and attributes

tbl(sc, "warc") %>% summarize(count = n())          # Count tags and attributes

Analysis (full-run)

Now that we have a local run working, we can focus on running this at scale.

First things first, we need to find a cluster with, ideally, 100 machines. While there are multiple on-demand providers of Spark clusters (IBM Bluemix, Databricks, Google Dataproc, Microsoft HDInsight, etc.), in this post we will use Amazon EMR.

To set up this cluster, you can read Using sparklyr with an Apache Spark EMR cluster or, from the Amazon EMR team, Running sparklyr in EMR.

While following the EMR walkthroughs, there are a couple of suggestions worth mentioning:

  1. With a new EMR account, the number of cluster instances is limited; therefore, starting with 50 m3.xlarge instances is easier. Otherwise, you can request a limit increase.

  2. Specify the following maximizeResourceAllocation config to maximize memory usage in Spark:

[
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "true"
    }
  }
]

  3. Set the following parameters for the bootstrap action:
s3://aws-bigdata-blog/artifacts/aws-blog-emr-rstudio-sparklyr/rstudio_sparklyr_emr5.sh
--sparklyr --rstudio

Alright, at this point you should be all set to run the following R code in EMR, either by connecting through SSH or by opening RStudio at http://<master-node-dns>:8787.

devtools::install_github("javierluraschi/sparkwarc")   # Install sparkwarc from GitHub

library(sparkwarc)                                     # Load extension to read warc files
library(sparklyr)                                      # Load sparklyr to use Spark from R
library(dplyr)                                         # Load dplyr to perform analysis

config <- spark_config()                               # Create a config to tune memory
config[["spark.memory.fraction"]] <- "0.9"             # Increase memory allocated to storage

sc <- spark_connect(                                   # Connect to Apache Spark
  master = "yarn-client",                              # as yarn-client (EMR default)
  config = config)                                     # using custom config settings

spark_read_warc(                                       # Read the warc file
  sc,                                                  # into the sc Spark connection
  "warc",                                              # save into 'warc' table
  paste(cc_warc(1, 50), collapse = ","),               # load the first 50 ~5GB files
  parse = TRUE,                                        # parse tags and attributes
  repartition = 400)                                   # partition to maximize cores

To warm up, let’s count how many tags and attributes we have:

tbl(sc, "warc") %>%
  summarize(count = n())

That’s 4,020,411,053 in total, which verifies that the cluster is working appropriately and is ready to answer our original questions.

What is the most used JavaScript library?

We can find this out by applying a regular expression to the src attribute of <script> tags:

tbl(sc, "warc") %>%
  filter(tag == "script", attribute == "src") %>%
  transmute(js = regexp_extract(value, "[^/]+[.]js", 0)) %>%
  group_by(js) %>%
  summarize(count = n()) %>%
  arrange(desc(count)) %>%
  transmute(
      js = substr(js, 1, 30),
      count = count) %>%
  filter(js != "", !js %like% "%�%") 

What is the most used keyword on the web?

This time, we can use the <meta> tag, but we also need to explode its comma-separated values using the explode function as follows:

tbl(sc, "warc") %>%
  filter(tag == "meta", attribute == "content", original %like% "%keywords%") %>%
  transmute(keyword = explode(split(
    value, ","
  ))) %>%
  transmute(keyword = trim(substr(keyword, 1, 30))) %>%
  group_by(keyword) %>%
  summarize(count = n()) %>%
  arrange(desc(count)) %>%
  filter(keyword != "", !keyword %like% "%�%") 
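
Here split(value, ",") turns each content string into an array of keywords, and explode emits one row per element. A rough base R analogue of that transformation, on a made-up keywords string, looks like this:

value <- "news, sports, weather,travel"             # hypothetical meta keywords content
trimws(unlist(strsplit(value, ",")))                # split, flatten, then trim
# [1] "news"    "sports"  "weather" "travel"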

Cleanup

Last but not least, disconnect and terminate your cluster!

spark_disconnect(sc)

Thanks for reading this far. For more information on sparklyr, take a look at spark.rstudio.com.