In this post we will answer two questions: which JavaScript libraries are most used across the web, and which keywords do pages use to describe themselves?
If we assume there are more than a trillion pages on the web, at about 10KB per page, that's 10PB in total. With 1TB disks we would need 10K machines just to store this, and hard disks would be too slow for interactive analysis. One approach is to sample down to 0.01%, or about 100 million pages. With 10GB of memory per machine, we would need about 100 machines to cache this.
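Those figures are rough assumptions rather than measurements, but the arithmetic is easy to write down in R as a sanity check:
pages <- 1e12                 # assume roughly one trillion pages
page_kb <- 10                 # assume about 10KB per page
pages * page_kb / 1e9         # ~10,000 TB (10PB) in total: ~10K machines with 1TB disks
sampled <- pages * 1e-4       # sample down to 0.01%: ~100 million pages
sampled * page_kb / 1e6       # ~1,000 GB sampled: ~100 machines with ~10GB of memory each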
Now that we've sketched this approach, we can introduce Apache Spark, a fast and general engine for large-scale data processing with support for in-memory datasets. To work from R, we can use sparklyr to install, connect to and analyze data in Apache Spark.
Before attempting to run on a 100-node cluster, it's sensible to start with a local run. We can accomplish this from the R console or RStudio as follows:
library(sparkwarc) # Load extension to read warc files
library(sparklyr) # Load sparklyr to use Spark from R
library(dplyr) # Load dplyr to perform analysis
spark_install() # Install Apache Spark
config <- spark_config() # Create a config to tune memory
config[["sparklyr.shell.driver-memory"]] <- "10G" # Set driver memory to 10GB
sc <- spark_connect(master = "local",           # Connect to local cluster
                    config = config)            # using custom configs
download.file(                                  # Download the warc file
  gsub("s3n://commoncrawl/",                    # mapping the S3 bucket url
       "http://commoncrawl.amazonaws.com/",     # into a downloadable url
       sparkwarc::cc_warc(1)),                  # from the first archive file
  "warc.gz")                                    # into a local warc.gz file
spark_read_warc(                                # Read the warc file
  sc,                                           # into the sc Spark connection
  "warc",                                       # save into 'warc' table
  "warc.gz",                                    # loading from the local gz file
  repartition = 8,                              # partition into 8 to maximize MBP cores
  parse = TRUE                                  # parse tags and attributes
)
tbl(sc, "warc") %>% summarize(count = n()) # Count tags and attributes
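Optionally, before scaling up, we can also peek at a few rows of the parsed table; this is just a quick sketch using the tag, attribute, value and original columns that the queries later in this post rely on:
tbl(sc, "warc") %>%                             # Preview the parsed table
  select(tag, attribute, value, original) %>%   # keeping the columns queried later
  head(10)                                      # and printing the first few rows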
Now that we have a local run working we can focus on running this at scale.
First things first: ideally, we need to find a cluster with 100 machines. While there are multiple on-demand providers of Spark clusters (IBM Bluemix, Databricks, Google Dataproc, Microsoft HDInsight, etc.), in this post we will use Amazon EMR.
To set up this cluster you can read Using sparklyr with an Apache Spark EMR cluster or, from the Amazon EMR team, Running sparklyr in EMR.
While following the EMR walkthroughs, there are a couple of suggestions worth mentioning:
With a new EMR account, cluster instances are limited; therefore, starting with 50 m3.xlarge instances is easier. Otherwise, you can request a limit increase.
Specify the following maximizeResourceAllocation config to maximize memory usage in Spark:
[
{
"Classification": "spark",
"Properties": {
"maximizeResourceAllocation": "true"
}
}
]
Add a bootstrap action that runs the following script with the arguments below, which installs RStudio and sparklyr on the cluster:
s3://aws-bigdata-blog/artifacts/aws-blog-emr-rstudio-sparklyr/rstudio_sparklyr_emr5.sh
--sparklyr --rstudio
Alright, at this point you should be all set to run the following R code in EMR by connecting through SSH or opening RStudio from http://
devtools::install_github("javierluraschi/sparkwarc") # Install sparkwarc from GitHub
library(sparkwarc) # Load extension to read warc files
library(sparklyr) # Load sparklyr to use Spark from R
library(dplyr) # Load dplyr to perform analysis
config <- spark_config() # Create a config to tune memory
config[["spark.memory.fraction"]] <- "0.9" # Increase memory allocated to storage
sc <- spark_connect( # Connect to Apache Spark
master = "yarn-client", # as yarn-client (EMR default)
config = config) # using custom config settings
spark_read_warc( # Read the warc file
sc, # into the sc Spark connection
"warc", # save into 'warc' table
paste(cc_warc(1, 50), collapse = ","), # load 50 ~5GB archive files
parse = TRUE, # parse tags and attributes into a table
repartition = 400) # repartition to use all available cores
To warm up, let's count how many tags and attributes we loaded:
tbl(sc, "warc") %>%
summarize(count = n())
That's 4,020,411,053 in total, which verifies that the cluster is working appropriately and is ready to answer our original questions.
We can answer the first question, which JavaScript libraries are most used, by applying a regular expression to the src attribute of the <script> tag:
tbl(sc, "warc") %>%
filter(tag == "script", attribute == "src") %>%
transmute(js = regexp_extract(value, "[^/]+[.]js", 0)) %>%
group_by(js) %>%
summarize(count = n()) %>%
arrange(desc(count)) %>%
transmute(
js = substr(js, 1, 30),
count = count) %>%
filter(js != "", !js %like% "%�%")
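To chart these results locally, one option (a condensed sketch of the query above, assuming ggplot2 is installed where R runs) is to collect the top rows back into R:
top_js <- tbl(sc, "warc") %>%                  # Rerun the JavaScript query
  filter(tag == "script", attribute == "src") %>%
  transmute(js = regexp_extract(value, "[^/]+[.]js", 0)) %>%
  group_by(js) %>%
  summarize(count = n()) %>%
  arrange(desc(count)) %>%
  filter(js != "") %>%
  head(20) %>%                                 # keep only the top 20 libraries
  collect()                                    # bring this small result set into R
library(ggplot2)                               # assumes ggplot2 is available locally
ggplot(top_js, aes(reorder(js, count), count)) +
  geom_col() +                                 # bar chart of library counts
  coord_flip()                                 # flip for readable library names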
For the second question we can use the <meta> tag, but we also need to explode the comma-separated keyword values using Spark's explode function, as follows:
tbl(sc, "warc") %>%
filter(tag == "meta", attribute == "content", original %like% "%keywords%") %>%
transmute(keyword = explode(split(
value, ","
))) %>%
transmute(keyword = trim(substr(keyword, 1, 30))) %>%
group_by(keyword) %>%
summarize(count = n()) %>%
arrange(desc(count)) %>%
filter(keyword != "", !keyword %like% "%�%")
Last but not least, disconnect and terminate your cluster!
spark_disconnect(sc)
Thanks for reading this far. For more information on sparklyr, take a look at spark.rstudio.com.