January, 2017

Quick Recap!

Apache Spark

Use case: What is the most used keyword on the web? (a back-of-the-envelope sizing sketch follows the list below)

  • 1 trillion pages
  • Sampling of 0.01%
  • 100 machines with 10GB RAM
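
A minimal sizing sketch in plain R, using only the figures from the list above:

pages    <- 1e12                            # 1 trillion pages on the web
sampled  <- pages * 0.0001                  # 0.01% sample, ~100 million pages
machines <- 100                             # machines with 10GB of RAM each
sampled / machines                          # roughly 1 million pages per machine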

Apache Spark is a fast and general engine for large-scale data processing, with support for in-memory datasets.

sparklyr - R interface for Spark

library(sparklyr)                           # R interface for Apache Spark
spark_install()                             # Install Apache Spark
sc <- spark_connect(master="local")         # Connect to local instance
library(dplyr)                              # Data Manipulation Grammar
mtcars_tbl <- copy_to(sc, mtcars)           # Copy mtcars into Spark
mtcars_tbl %>% summarize(n = n())           # Count records
mtcars_tbl %>% ml_linear_regression(        # Perform linear regression
  response = "mpg",                         # Response vector
  features = c("wt", "cyl"))                # Features for the model fit
library(DBI)                                # R Database Interface
dbGetQuery(sc, "SELECT * FROM mtcars")      # Run SQL query in Spark
invoke(spark_context(sc), "version")        # Run sc.version in Scala
compile_package_jars()                      # Compile Scala code
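
To illustrate the in-memory support mentioned earlier, a minimal sketch (reusing the sc connection and mtcars_tbl from the recap above) that pins the table in Spark's memory:

tbl_cache(sc, "mtcars")                     # Cache the mtcars table in Spark memory
mtcars_tbl %>% summarize(avg = mean(mpg))   # Subsequent queries read from the cache
tbl_uncache(sc, "mtcars")                   # Release the cached table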

sparklyr - 0.5 on CRAN!

  • New functions: sdf_quantile(), ft_tokenizer() and ft_regex_tokenizer() (see the sketch after this list).
  • Improved compatibility: na.action, dim(), nrow() and ncol().
  • Extended dplyr: do() and n_distinct().
  • Experimental Livy support.
  • Improved connections.
  • Certified with Cloudera.
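
A minimal sketch of a few of the 0.5 additions, reusing sc and mtcars_tbl from the recap above; the quotes table is a made-up toy example:

sdf_quantile(mtcars_tbl, "mpg")                     # Approximate quartiles of mpg (new in 0.5)
mtcars_tbl %>%
  summarize(gears = n_distinct(gear))               # n_distinct() support added in 0.5
quotes_tbl <- copy_to(sc, data.frame(               # Toy table with a text column
  text = c("big data with r", "spark and dplyr"),
  stringsAsFactors = FALSE), "quotes")
quotes_tbl %>% ft_tokenizer("text", "words")        # Split text into arrays of words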

Analysis

Analysis - Connect

CommonCrawl.org - A nonprofit that crawls the web and provides its archives.

library(sparkwarc)
library(sparklyr)
library(dplyr)
library(DBI)

config <- spark_config()                            # Create a config to tune memory
config[["sparklyr.shell.driver-memory"]] <- "10G"   # Set driver memory to 10GB

sc <- spark_connect(
  master = "local",
  version = "2.0.1",
  config = config)

Analysis - Load

if (!file.exists("cc.warc")) {
  url <- sparkwarc::cc_warc(1) %>%                  # Get the first archive file
    gsub("s3n://commoncrawl/",                      # Map the S3 bucket url
         "http://commoncrawl.amazonaws.com/", .)    # into a downloadable url

  download.file(url, "cc.warc.gz")                  # Download the gzipped archive
  R.utils::gunzip("cc.warc.gz", "cc.warc")          # Decompress (gzip, so unzip() won't work)
}

spark_read_warc(                                    # Read the warc file
  sc,                                               # into the sc Spark connection
  "warc",                                           # save into 'warc' table
  "cc.warc",                                        # loading from remote gz file
  repartition = 8                                   # maximize MBP cores
)

Analysis - Aggregate

cc_stats <- function(expr) {
  tbl(sc, "warc") %>%
    transmute(
      regval = explode(split(                       # Split comma-separated matches into rows
        regexp_extract(                             # Extract the first capture group
          value,
          expr,
          1),
        ",[ ]?"))) %>%
    filter(regval != "") %>%                        # Drop lines without a match
    group_by(regval) %>%
    summarize(count = n()) %>%                      # Count occurrences of each value
    arrange(desc(count)) %>%                        # Most frequent first
    transmute(
      value = substr(regval, 1, 30),                # Truncate long values for display
      count = count)
}
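
The cc_stats() queries on the next slides are evaluated lazily by Spark; a minimal usage sketch, assuming the top rows are wanted back in R as a local data frame:

cc_stats("<(html).*") %>% head(10) %>% collect()    # Collect the top 10 rows into R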

Analysis - Pages

cc_stats("<(html).*")

Analysis - Tags

cc_stats("<([a-zA-Z]+).*>") %>% summarize(count = "tags", total = sum(count))

Analysis - Domains

cc_stats("WARC-Target-URI: (http://[^/]+)/.*")

Analysis - JavaScript Libraries

cc_stats("<script .*src=\\\".*/([^/]+.js)\\\".*")

Analysis - Keywords

cc_stats("<meta .*keywords.*content=\"([a-zA-Z1-9, ]+)\".*")

Questions?