
If big data is your thing, you use R, and you’re headed to Strata + Hadoop World in San Jose March 13 & 14th, you can experience in person how easy and practical it is to analyze big data with R and Spark.

In a beginner-level talk by RStudio’s Edgar Ruiz and an intermediate-level workshop by Win-Vector’s John Mount, we cover the spectrum: what R is, what Spark is, how sparklyr works, and what is required to set up and tune a Spark cluster. You’ll also learn practical applications, including how to quickly set up a local Spark instance, store big data in Spark and then connect to the data with R, use R to apply machine-learning algorithms to big data stored in Spark, and filter and aggregate big data stored in Spark and then import the results into R for analysis and visualization.

2:40pm–3:20pm Wednesday, March 15, 2017
Sparklyr: An R interface for Apache Spark
Edgar Ruiz (RStudio)
Primary topic: Spark & beyond
Location: LL21 C/D
Level: Beginner
Secondary topics: R

1:30pm–5:00pm Tuesday, March 14, 2017
Modeling big data with R, sparklyr, and Apache Spark
John Mount (Win-Vector LLC)
Primary topic: Data science & advanced analytics
Location: LL21 C/D
Level: Intermediate
Secondary topics: R

While you’re at the conference, be sure to look us up in the Innovator’s Pavilion (booth P8) during the Expo Hall hours. We’ll have the latest books from RStudio authors, t-shirts to win, demonstrations of RStudio Connect and RStudio Server Pro and, of course, stickers and cheatsheets. Share with us what you’re doing with RStudio and get your product and company questions answered by RStudio employees.

See you in San Jose! (https://conferences.oreilly.com/strata/strata-ca)

We’re happy to announce that version 0.5 of the sparklyr package is now available on CRAN. The new version comes with many improvements over the first release, including:

  • Extended dplyr support by implementing do() and n_distinct().
  • New functions including sdf_quantile(), ft_tokenizer() and ft_regex_tokenizer().
  • Improved compatibility: sparklyr now respects the value of the ‘na.action’ R option and supports dim(), nrow() and ncol().
  • Experimental support for Livy to enable clients, including RStudio, to connect remotely to Apache Spark.
  • Improved connections by simplifying initialization and providing error diagnostics.
  • Certified sparklyr, RStudio Server Pro and Shiny Server Pro with Cloudera.
  • Updated spark.rstudio.com with new deployment examples and a sparklyr cheatsheet.

Additional changes and improvements can be found in the sparklyr NEWS file.

For questions or feedback, please feel free to open a sparklyr github issue or a sparklyr stackoverflow question.

Extended dplyr support

sparklyr 0.5 adds support for n_distinct() as a faster and more concise equivalent of length(unique(x)), and also adds support for do() as a convenient way to perform multiple serial computations over a group_by() operation:

library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")
mtcars_tbl <- copy_to(sc, mtcars, overwrite = TRUE)

by_cyl <- group_by(mtcars_tbl, cyl)
fit_sparklyr <- by_cyl %>% 
   do(mod = ml_linear_regression(mpg ~ disp, data = .))

# display results
fit_sparklyr$mod

In this case, . represents a Spark DataFrame, which allows us to perform operations at scale (like this linear regression) for a small set of groups. However, since each group operation is performed sequentially, it is not recommended to use do() with a large number of groups. The code above performs multiple linear regressions with the following output:

[[1]]
Call: ml_linear_regression(mpg ~ disp, data = .)

Coefficients:
 (Intercept)         disp 
19.081987419  0.003605119 

[[2]]
Call: ml_linear_regression(mpg ~ disp, data = .)

Coefficients:
(Intercept)        disp 
 40.8719553  -0.1351418 

[[3]]
Call: ml_linear_regression(mpg ~ disp, data = .)

Coefficients:
(Intercept)        disp 
22.03279891 -0.01963409 
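
n_distinct() slots into the usual dplyr verbs against the same Spark table. A small sketch, reusing the mtcars_tbl from above:

# count the distinct carburetor settings within each cylinder group, in Spark
mtcars_tbl %>%
  group_by(cyl) %>%
  summarise(distinct_carbs = n_distinct(carb))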

It’s worth mentioning that while sparklyr provides comprehensive support for dplyr, dplyr is not strictly required when using sparklyr. For instance, one can make use of DBI without dplyr as follows:

library(sparklyr)
library(DBI)

sc <- spark_connect(master = "local")
sdf_copy_to(sc, iris)
dbGetQuery(sc, "SELECT * FROM iris LIMIT 4")
  Sepal_Length Sepal_Width Petal_Length Petal_Width Species
1          5.1         3.5          1.4         0.2  setosa
2          4.9         3.0          1.4         0.2  setosa
3          4.7         3.2          1.3         0.2  setosa
4          4.6         3.1          1.5         0.2  setosa

New functions

The new sdf_quantile() function computes approximate quantiles (to a given relative error), while the new ft_tokenizer() and ft_regex_tokenizer() functions split strings by whitespace or by a regex pattern.
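
For instance, a rough sketch of sdf_quantile() against the mtcars data copied to Spark earlier (the probabilities argument shown here is an assumption about the interface, not something taken from this post):

# approximate quartiles of mpg, computed by Spark
sdf_quantile(mtcars_tbl, "mpg", probabilities = c(0.25, 0.5, 0.75))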

For example, ft_tokenizer() can be used as follows:

library(sparklyr)
library(janeaustenr)
library(dplyr)

sc <- spark_connect(master = "local")

# copy the Jane Austen books into Spark, then tokenize the text column
austen_tbl <- copy_to(sc, austen_books(), "austen")

austen_tbl %>%
  na.omit() %>%
  ft_tokenizer(input.col = "text", output.col = "tokens") %>%
  head(4)

Which produces the following output:

                   text                book     tokens
                  <chr>               <chr>     <list>
1 SENSE AND SENSIBILITY Sense & Sensibility <list [3]>
2                       Sense & Sensibility <list [1]>
3        by Jane Austen Sense & Sensibility <list [3]>
4                       Sense & Sensibility <list [1]>

Tokens can be further processed through, for instance, HashingTF.

Improved compatibility

‘na.action’ is a parameter accepted as part of the ‘ml.options’ argument, and it defaults to getOption("na.action", "na.omit"). This allows sparklyr to match R’s behavior when processing NA records. For instance, the following linear model drops NA records appropriately:

library(sparklyr)
library(dplyr)
library(nycflights13)

sc <- spark_connect(master = "local")
flights_tbl <- na.omit(copy_to(sc, flights))

ml_linear_regression(
  flights_tbl,
  response = "dep_delay",
  features = c("arr_delay", "arr_time"))
* Dropped 9430 rows with 'na.omit' (336776 => 327346)
Call: ml_linear_regression(flights_tbl, response = "dep_delay",
                           features = c("arr_delay", "arr_time"))

Coefficients:
 (Intercept)    arr_delay     arr_time 
6.1001212994 0.8210307947 0.0005284729

In addition, dim(), nrow() and ncol() are now supported against Spark DataFrames.
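
A quick sketch, using the flights_tbl table from the example above:

# dimensions are computed against the Spark DataFrame without collecting it into R
dim(flights_tbl)
nrow(flights_tbl)
ncol(flights_tbl)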

Livy connections

Livy, “An Open Source REST Service for Apache Spark (Apache License)”, is now available in sparklyr 0.5 as an experimental feature. Among many scenarios, this enables connections from the RStudio desktop to Apache Spark when Livy is available and correctly configured in the remote cluster.

Livy running locally

To work with Livy locally, sparklyr provides livy_install(), which installs Livy in your local environment; this is similar to spark_install(). Since Livy is a service that enables remote connections into Apache Spark, the service needs to be started with livy_service_start(). Once the service is running, spark_connect() needs to reference the running service and use method = "livy"; then sparklyr can be used as usual. A short example follows:

livy_install()
livy_service_start()

sc <- spark_connect(master = "http://localhost:8998",
                    method = "livy")
copy_to(sc, iris)

spark_disconnect(sc)
livy_service_stop()

Livy running in HDInsight

Microsoft Azure HDInsight supports Apache Spark clusters configured with Livy and protected with basic authentication. To use sparklyr with an HDInsight cluster through Livy, first create the HDInsight cluster with Spark support:

Creating a Spark cluster in Microsoft Azure HDInsight

Once the cluster is created, you can connect with sparklyr as follows:

library(sparklyr)
library(dplyr)

config <- livy_config(user = "admin", password = "password")
sc <- spark_connect(master = "https://dm.azurehdinsight.net/livy/",
                    method = "livy",
                    config = config)

copy_to(sc, iris)

From a desktop running RStudio, the remote connection looks like this:

(Screenshot: RStudio on the desktop connected to Spark on Azure HDInsight through Livy)

Improved connections

sparklyr 0.5 no longer requires internet connectivity to download additional Apache Spark packages. This enables connections in secure clusters that do not have internet access or while on the go.

Some community members reported a generic “Ports file does not exists” error while connecting with sparklyr 0.4. In 0.5, we’ve deprecated the ports file and improved error reporting. For instance, the following invalid connection attempt produces a descriptive error, the spark-submit parameters, and logging information that helps troubleshoot connection issues.

> library(sparklyr)
> sc <- spark_connect(master = "local",
                      config = list("sparklyr.gateway.port" = "0"))
Error in force(code) : 
  Failed while connecting to sparklyr to port (0) for sessionid (5305): 
  Gateway in port (0) did not respond.
  Path: /spark-1.6.2-bin-hadoop2.6/bin/spark-submit
  Parameters: --class, sparklyr.Backend, 'sparklyr-1.6-2.10.jar', 0, 5305


---- Output Log ----
16/12/12 12:42:35 INFO sparklyr: Session (5305) starting

---- Error Log ----

Additional technical details can be found in the sparklyr gateway socket pull request.

Cloudera certification

sparklyr 0.4, sparklyr 0.5, RStudio Server Pro 1.0 and Shiny Server Pro 1.5 went through Cloudera’s certification process and are now certified with Cloudera. Among other benefits, authentication features like Kerberos have been tested and validated against secured clusters.

For more information see Cloudera’s partner listings.

Today’s guest post is written by Vincent Warmerdam of GoDataDriven and is reposted with Vincent’s permission from blog.godatadriven.com. You can learn more about how to use SparkR with RStudio at the 2015 EARL Conference in Boston November 2-4, where Vincent will be speaking live.

This document contains a tutorial on how to provision a Spark cluster with RStudio. You will need a machine that can run bash scripts and a functioning account on AWS. Note that this tutorial is written for Spark 1.4.0; future versions will most likely be provisioned in another way, but this should be enough to help you get started. At the end of this tutorial you will have a fully provisioned Spark cluster that allows you to handle simple dataframe operations on gigabytes of data within RStudio.

AWS prep

Make sure you have an AWS account with billing enabled. Next, make sure that you have downloaded your .pem files and that you have your keys ready.

Spark Startup

Next, get Spark locally on your machine from the Spark homepage. It’s a pretty big blob. Once it is downloaded, unzip it and go to the ec2 folder inside the Spark folder. Then run the following command from the command line.

./spark-ec2 \
--key-pair=spark-df \
--identity-file=/Users/code/Downloads/spark-df.pem \
--region=eu-west-1 \
-s 1 \
--instance-type c3.2xlarge \
launch mysparkr

This script will use your keys to connect to Amazon and set up a standalone Spark cluster for you. You can specify what type of machines you want to use, as well as how many and in which Amazon region. You will only need to wait until everything is installed, which can take up to 10 minutes. More info can be found here.
When the command signals that it is done, you can ssh into your machine via the command line:
./spark-ec2 -k spark-df -i /Users/code/Downloads/spark-df.pem --region=eu-west-1 login mysparkr
Once you are in your Amazon machine you can immediately run SparkR from the terminal.

chmod u+w /root/spark/
./spark/bin/sparkR 

As a toy example, you should be able to confirm that the following code already works.

ddf <- createDataFrame(sqlContext, faithful) 
head(ddf)
printSchema(ddf)

This ddf dataframe is no ordinary dataframe object. It is a distributed dataframe, one that is spread across a network of workers so that we can query it with parallelized commands through Spark.
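
For instance, here are a couple of operations that Spark evaluates on the workers rather than in local R; a small sketch using the Spark 1.4 SparkR DataFrame API:

# rows with a long waiting time, filtered by the Spark workers
head(filter(ddf, ddf$waiting > 70))
# row count of the whole distributed dataframe
count(ddf)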

Spark UI

The R commands you have just run launch Spark jobs. Spark has a web UI so you can keep track of the cluster. To visit the web UI, first confirm the master node’s IP address with this command:

curl icanhazip.com

You can now visit the web UI in your browser:

<master-node-ip>:4040

From here you can view anything you may want to know about your Spark cluster (like executor status, job progress and even a DAG visualisation).

This is a good moment to pause and realize that this, in its own right, is already very cool. We can start up a Spark cluster in 15 minutes and use R to control it. We can specify how many servers we need just by changing a number on the command line, and without any real developer effort we gain access to all this parallelizing power.
Still, working from a terminal might not be too productive. We’d prefer to work with a GUI, and we would like some basic plotting functionality when working with data. So let’s install RStudio and get some tools connected.

RStudio setup

Get out of the SparkR shell by entering q(). Next, download and install RStudio Server.
wget http://download2.rstudio.org/rstudio-server-rhel-0.99.446-x86_64.rpm
sudo yum install --nogpgcheck -y rstudio-server-rhel-0.99.446-x86_64.rpm
rstudio-server restart
While this is installing, make sure that TCP port 8787 is open in the AWS security group settings for the master node. A recommended setting is to only allow access from your own IP.

Then, add a user that can access RStudio. We make sure that this user can also access all the RStudio files.

adduser analyst
passwd analyst

You also need to run the following commands (the details of why are a bit involved); these edits are needed because the analyst user doesn’t have root permissions.
chmod a+w /mnt/spark
chmod a+w /mnt2/spark
sed -e 's/^ulimit/#ulimit/g' /root/spark/conf/spark-env.sh > /root/spark/conf/spark-env2.sh
mv /root/spark/conf/spark-env2.sh /root/spark/conf/spark-env.sh
ulimit -n 1000000
When this is done, point your browser to <master-ip-adr>:8787 and log in as analyst.

Loading data from S3

Let’s confirm that we can now play with the RStudio stack by downloading some libraries and running them against data that lives on S3.
small_file = "s3n://<AWS-ID>:<AWS-SECRET-KEY>@<bucket_name>/data.json"
dist_df <- read.df(sqlContext, small_file, "json") %>% cache
This dist_df is now a distributed dataframe, which has a different API than a normal R dataframe but feels similar to dplyr.
head(summarize(groupBy(dist_df, dist_df$type), count = n(dist_df$auc)))
The %>% pipe comes from magrittr; loading it lets us write this kind of query much more readably.

local_df <- dist_df %>%
  groupBy(dist_df$type) %>%
  summarize(count = n(dist_df$id)) %>%
  collect

The collect method pulls the distributed dataframe back into a normal dataframe on a single machine so you can use plotting methods on it again and use R as you would normally. A common use case would be to use spark to sample or aggregate a large dataset which can then be further explored in R.
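
Since local_df is a plain R data.frame at this point, ordinary R tooling applies. A small sketch (assuming the type and count columns produced above actually exist in your JSON data):

str(local_df)
barplot(local_df$count, names.arg = local_df$type,
        main = "counts per type, aggregated in Spark")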
Again, if you want to view the Spark UI for these jobs, you can just go to:

<master-node-ip>:4040

A more complete stack

Unfortunately this stack has an old version of R (we need version 3.2 to get the newest version of ggplot2/dplyr). Also, as of right now there isn’t support for the machine learning libraries yet. These are known issues at the moment and version 1.5 should show some fixes. Version 1.5 will also feature RStudio installation as part of the ec2 stack.
Another issue is that the namespace of dplyr currently conflicts with SparkR; time will tell how this gets resolved. The same goes for other data features, like windowing functions and more elaborate data types.
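
In the meantime, one way to side-step the masking is to call the conflicting functions with explicit namespaces; a small sketch using the dist_df dataframe from above:

# be explicit about whose summarize/groupBy/n you mean
SparkR::summarize(SparkR::groupBy(dist_df, dist_df$type), count = SparkR::n(dist_df$id))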

Killing the cluster

When you are done with the cluster, you only need to exit the ssh connection and run the following command:
./spark-ec2 -k spark-df -i /Users/code/Downloads/spark-df.pem --region=eu-west-1 destroy mysparkr

Conclusion

The economics of Spark are very interesting. We only pay Amazon for the time that we are using Spark as a compute engine; all other times we’d only pay for S3. This means that if we analyse for 8 hours, we only pay for 8 hours. Spark is also very flexible in that it allows us to continue coding in R (or Python or Scala) without having to learn multiple domain-specific languages or frameworks like in Hadoop. Spark makes big data really simple again.
This document is meant to help you get started with Spark and RStudio but in a production environment there are a few things you still need to account for:

  • Security: our web connection is not done over https. Even though we are telling Amazon to only allow our IP, we may be at risk if there is a man in the middle listening.
  • Multiple users: this setup works fine for a single user, but if multiple users are working on such a cluster you may need to rethink some steps with regards to user groups, file access and resource management.
  • Privacy: this setup works well for EC2, but if you have sensitive, private user data you may need to do this on premises because the data cannot leave your own datacenter. Most install steps would be the same, but the initial installation of Spark would require the most work. See the docs for more information.

Spark is an amazing tool, expect more features in the future.

Possible Gotcha

Hanging

It can happen that the ec2 script hangs in the "Waiting for cluster to enter 'ssh-ready' state" step. This can happen if you use Amazon a lot. To prevent it, you may want to remove some lines from ~/.ssh/known_hosts. More info here. Another option is to add the following lines to your ~/.ssh/config file.

# AWS EC2 public hostnames (changing IPs)
Host *.compute.amazonaws.com 
  StrictHostKeyChecking no
  UserKnownHostsFile /dev/null

This is a guest post by Vincent Warmerdam of koaning.io.

SparkR preview in RStudio

Apache Spark is the hip new technology on the block. It allows you to write scripts in a functional style, and the technology behind it will let you run iterative tasks very quickly on a cluster of machines. It’s benchmarked to be quicker than Hadoop for most machine learning use cases (by a factor of 10-100), and soon Spark will also have support for the R language. As of April 2015, SparkR has been merged into Apache Spark and will ship with the upcoming 1.4 release, due early summer 2015. In the meantime, you can use this tutorial to go ahead and get familiar with the current version of SparkR.

**Disclaimer**: although you will be able to use this tutorial to write Spark jobs right now with R, the new API due this summer will most likely have breaking changes.

Running Spark Locally

The main feature of Spark is the resilient distributed dataset, which is a dataset that can be queried in memory, in parallel on a cluster of machines. You don’t need a cluster of machines to get started with Spark though. Even on a single machine, Spark is able to efficiently use any configured resources. To keep it simple we will ignore this configuration for now and do a quick one-click install. You can use devtools to download and install Spark with SparkR.

library(devtools)
install_github("amplab-extras/SparkR-pkg", subdir="pkg")

This might take a while. But after the installation, the following R code will run Spark jobs for you:

library(magrittr)
library(SparkR)

sc <- sparkR.init(master="local")

sc %>% 
  parallelize(1:100000) %>%
  count

This small program generates a list, gives it to Spark (which turns it into an RDD, Spark’s Resilient Distributed Dataset structure) and then counts the number of items in it. SparkR exposes the RDD API of Spark as distributed lists in R, which plays very nicely with magrittr. As long as you follow the API, you don’t need to worry much about parallelizing your programs for performance.

A more elaborate example

Spark also allows for grouped operations, which might remind you a bit of dplyr.

nums = runif(100000) * 10

sc %>% 
  parallelize(nums) %>% 
  map(function(x) round(x)) %>%
  filterRDD(function(x) x %% 2) %>% 
  map(function(x) list(x, 1)) %>%
  reduceByKey(function(x,y) x + y, 1L) %>% 
  collect

The Spark API will look very ‘functional’ to programmers used to functional programming languages (which should come as no surprise if you know that Spark is written in Scala). This script will do the following:

  1. it will create an RDD Spark object from the original data
  2. it will map each number to a rounded number
  3. it will filter all even numbers out of the RDD
  4. next it will create key/value pairs that can be counted
  5. it then reduces the key value pairs (the 1L is the number of partitions for the resulting RDD)
  6. and it collects the results

Spark will have started running services locally on your computer, which can be viewed at http://localhost:4040/stages/. You should be able to see all the jobs you’ve run here, and you will also see which jobs have failed, along with the error log.

Bootstrapping with Spark

These examples are nice, but you can also use the power of Spark for more common data science tasks. Let’s sample a dataset to generate a large RDD, which we will then summarise via bootstrapping. Instead of parallelizing numbers, I will now parallelize dataframe samples.

sc <- sparkR.init(master="local")

sample_cw <- function(n, s){
  set.seed(s)
  ChickWeight[sample(nrow(ChickWeight), n), ]
}

data_rdd <- sc %>%
  parallelize(1:200, 20) %>% 
  map(function(s) sample_cw(250, s))

With the parallelize function we can set the number of partitions Spark will use for the resulting RDD. The s argument ensures that each partition uses a different random seed when sampling. This data_rdd is useful because it can be reused for multiple purposes.

You can use it to estimate the mean of the weight.

data_rdd %>% 
  map(function(x) mean(x$weight)) %>% 
  collect %>% 
  as.numeric %>% 
  hist(20, main="mean weight, bootstrap samples")

Or you can use it to perform bootstrapped regressions.

train_lm <- function(data_in){
  lm(data=data_in, weight ~ Time)
}

coef_rdd <- data_rdd %>% 
  map(train_lm) %>% 
  map(function(x) x$coefficients) 

get_coef <- function(k) { 
  coef_rdd %>%
    map(function(x) x[k]) %>% 
    collect %>%
    as.numeric
}

df <- data.frame(intercept = get_coef(1), time_coef = get_coef(2))
df$intercept %>% hist(breaks = 30, main="beta coef for intercept")
df$time_coef %>% hist(breaks = 30, main="beta coef for time")

The slow part of this chain of operations is the creation of the data, because it has to happen locally in R. A more common use case for Spark would be to load a large dataset from S3 into a large EC2 cluster of Spark machines.
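
A sketch of that pattern with the SparkR-pkg RDD API (the S3 path below is a made-up placeholder, not a real dataset):

# read a large text file straight from S3 instead of generating data locally
lines_rdd <- textFile(sc, "s3n://<bucket_name>/chickweight.csv")
count(lines_rdd)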

More power?

Running Spark locally is nice and will already allow for parallelism, but the real profit can be gained by running Spark on a cluster of computers. The nice thing is that Spark automatically comes with a script that will automate the provisioning of a Spark cluster on Amazon AWS.

To get a cluster started, start up an EC2 cluster with the supplied ec2 folder from Apache’s Spark GitHub repo. A more elaborate tutorial can be found here, but if you are already an Amazon user, provisioning a cluster on Amazon is as simple as calling a one-liner:

./spark-ec2 \
--key-pair=spark-df \
--identity-file=/path/spark-df.pem \
--region=eu-west-1 \
-s 3 \
--instance-type c3.2xlarge \
launch my-spark-cluster

If you ssh into the master node that has just been set up, you can run the following code:

cd /root
git clone https://github.com/amplab-extras/SparkR-pkg.git
cd SparkR-pkg
SPARK_VERSION=1.2.1 ./install-dev.sh
cp -a /root/SparkR-pkg/lib/SparkR /usr/share/R/library/
/root/spark-ec2/copy-dir /root/SparkR-pkg
/root/spark/sbin/slaves.sh cp -a /root/SparkR-pkg/lib/SparkR /usr/share/R/library/

Launch SparkR on a Cluster

Finally, to launch SparkR and connect to the Spark EC2 cluster, run the following code on the master machine:

MASTER=spark://<master-hostname>:7077 ./sparkR

The hostname can be retrieved using:

cat /root/spark-ec2/cluster-url

You can check on the status of your cluster via Spark’s web UI at http://<master-hostname>:8080.

The future

Everything described in this document is subject to change with the next Spark release, but it should help you get familiar with how Spark works. There will be R support for Spark: less so for low-level RDD operations, but more so for its distributed machine learning algorithms as well as DataFrame objects.

The support for R in the Spark universe might be a game changer. R has always been great for exploratory and interactive analysis on small to medium datasets. With the addition of Spark, R can become a more viable tool for big datasets.

June is the currently planned release date for Spark 1.4, which will allow R users to run data frame operations in parallel on the distributed memory of a cluster of computers. All of this is completely open source.

It will be interesting to see what possibilities this brings for the R community.