This article is fully available at the following address
http://blog.revolutionanalytics.com/2016/11/data-manipulation-with-sparklyr-on-azure-hdinsight.html

by Ali Zaidi, Data Scientist at Microsoft

Apache Spark and a Tale of APIs

Spark is an exceptionally popular processing engine for distributed data. Dealing with data in distributed storage and programming with concurrent systems often requires learning complicated new paradigms and techniques. Statisticians and data scientists familiar with R are unlikely to have much experience with such systems. Fortunately, Spark is a very intuitive distributed computing platform, and there are APIs available in R and Python that are more familiar to the average data scientist than Scala.

One of the most intriguing Spark APIs for R users is the sparkapi package developed by RStudio and the associated sparklyr project, which provides a dplyr-like syntax for manipulating Spark DataFrames.

Azure HDInsight

Azure HDInsight is the Microsoft-developed Apache Hadoop distribution for the cloud. The Hadoop ecosystem is a wild zoo of components, and configuring them to work together can be a difficult job. Microsoft’s Azure HDInsight service gives data scientists and analysts the most exciting capabilities of distributed computing on the cloud without the hassle of configuration and management. In this post, I will show how we can use premium Spark clusters with a complete installation of Microsoft R’s stack to do data manipulation at scale.

Provisioning

To provision Azure HDInsight Spark clusters with Microsoft R Server, follow the instructions on the Azure Documentation Page, or my Spark course repository. To create a cluster, simply navigate to your portal and select a new HDInsight cluster:

[Screenshot: Azure portal blade for a new HDInsight cluster]

On the Azure portal page, you will be asked to select a subscription, and also provide some security credentials to access your cluster via a secure terminal connection (SSH).

Make sure you select the premium option to get the awesome features of Microsoft R Server:

[Screenshot: premium Spark cluster option]

A follow-up blog post will show you how to develop scalable machine learning models using the Microsoft R Server machine learning libraries.

Spark DataFrames and Spark SQL

In order to accommodate the variety of workloads a data scientist encounters during her work, Spark contains four core components, each aimed at a specific use case: Spark SQL, Spark Streaming, MLlib, and GraphX.

[Figure: Spark core components]

The component aimed at data manipulation workloads is the Spark SQL component. Inside the Spark SQL context, Spark developers create DataFrames, which are schema-aware versions of RDDs (Resilient Distributed Datasets). RDDs are the standard primitive of data in the Spark ecosystem, and can be thought of as distributed lists. However, just as lists are somewhat too general for the workloads necessitated by data manipulation in R, RDDs are also too general for data analysis workloads in Spark. By injecting a schema of columns (typed columns, with every row sharing the same structure), DataFrames are born.

Functional Programming and the Art of Being Lazy

The majority of Spark is written in Scala (~80% of Spark core), which is a functional programming language. Functional programming languages emphasize functional purity (the output only depends on the inputs) and strive to avoid side effects. One feature that many functional programming languages support is lazy evaluation. While it might seem odd that we would appreciate laziness from our computing tools, lazy evaluation is an effective way of ensuring that only the necessary computations are run, and that they are run efficiently.
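R itself evaluates function arguments lazily, so the idea can be seen in a few lines without a cluster. The sketch below only illustrates the concept in base R (the function name `double_first` is made up for this example); it says nothing about how Spark schedules its work.

```r
# An argument that is never used inside the function is never evaluated.
double_first <- function(x, y) {
  x * 2   # y is not touched, so the expression passed as y never runs
}

# The second argument would raise an error if it were evaluated.
result <- double_first(21, stop("this expression is never evaluated"))
result
```

The call returns a value instead of failing, because the erroring expression is only a promise that nobody forces.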

Lazy evaluation allows Spark SQL to heavily optimize queries. When a user submits a query to Spark SQL, Spark composes the components of the SQL query into a logical plan. The logical plan is basically a recipe Spark SQL creates in order to evaluate the desired query. Spark SQL then submits the logical plan to its optimizer, called Catalyst, which turns this plan into a physical plan of action that is executed inside Spark's computation engine (a series of coordinating JVMs).

Spark SQL and dplyr

Some of the more R-enthusiastic readers might already see where this is going. Since Spark SQL is Spark’s core component for doing data analysis, why can’t we just use our favorite SQL-like paradigm in R as an interface to Spark? For many an R user, the only package that brings up memories of love and joy for data analysis is dplyr.

More than just being an awesome package with a simple-to-understand grammar, dplyr is highly extensible and works hand-in-hand with SQL. It supports working with many SQL databases by using the database engine as its backend and translating all of its verbs into SQL statements. For more information on working with databases using dplyr, see the package’s vignette on databases. Take a look at dplyr’s translate_sql function to see how it converts dplyr verbs into SQL statements.
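To see the translation without a database or a cluster, one option is a simulated connection. This is a sketch under some assumptions: it uses the dbplyr package (where dplyr's SQL translation code now lives), a generic simulated backend rather than Spark SQL, and a made-up table with `state` and `credit_score` columns.

```r
library(dplyr)
library(dbplyr)

# A zero-row "lazy" table backed by a simulated, generic SQL connection.
loans <- lazy_frame(
  state = character(),
  credit_score = numeric(),
  con = simulate_dbi()
)

# Building the pipeline runs nothing; it only records the operations.
query <- loans %>%
  group_by(state) %>%
  summarise(ave_fico = mean(credit_score, na.rm = TRUE))

# Render the SQL that dplyr would send to the backend.
sql_text <- as.character(sql_render(query))
cat(sql_text, "\n")
```

The rendered statement should contain an `AVG` aggregate and a `GROUP BY` clause; the exact quoting and layout depend on the backend and the dbplyr version.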

The sparklyr package takes advantage of dplyr’s SQL connectivity and R’s lazy evaluation procedures by sending all the dplyr operations to Spark SQL. Therefore, as an R user, you don’t need to write anything but dplyr to get the awesomeness of Spark SQL and Catalyst!

Spark Context

In order to start using Spark as our computation engine, we need to define a Spark context (or Spark Session). We will use the sparklyr function spark_connect to create a Spark SQL context. There are a slew of arguments you can use to modify the parameters of your Spark SQL context, but we will stick to the defaults. By specifying yarn-client as our master, we are telling Spark we want a YARN client application. For an overview of the environments on which you can run Spark, take a look at the documentation page here.

library(sparklyr)
sc <- spark_connect("yarn-client")
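If the defaults are not enough, sparklyr accepts a configuration object. The following is a minimal sketch, assuming sparklyr is installed; the memory and core values are arbitrary placeholders rather than tuned recommendations, and `connect_to_yarn` is a hypothetical helper introduced only for this example.

```r
library(sparklyr)

# Start from sparklyr's default configuration and override a few Spark properties.
conf <- spark_config()
conf$spark.executor.memory <- "4G"   # placeholder value
conf$spark.executor.cores <- 2       # placeholder value

# Hypothetical helper: connect in yarn-client mode with the settings above.
connect_to_yarn <- function(config = conf) {
  spark_connect(master = "yarn-client", config = config)
}

# sc <- connect_to_yarn()    # run this on the cluster itself
# spark_disconnect(sc)       # release the resources when done
```

Wrapping the connection in a function keeps the configuration in one place, so nothing is requested from YARN until the helper is actually called.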

Import Into Spark DataFrames

In my cluster’s storage account on Azure HDInsight, I have loaded data from Freddie Mac’s single family loan database. I then use the spark_read_csv function to read my folder of mortgage originations data into a Spark DataFrame.

origins <- file.path("wasb://mrs-spark@alizaidi.blob.core.windows.net",
                     "user/RevoShare/alizaidi/Freddie/Acquisition")
freddie_origins <- spark_read_csv(sc,
                                  path = origins,
                                  name = 'freddie_origins',
                                  header = FALSE,
                                  delimiter = "|"
                                  )

Using dplyr to Query our Spark DataFrame

The resulting object from the above operation is a Spark DataFrame. If we peek at its class, we notice that in addition to being a tbl_spark object, it is also of class tbl_sql, so all dplyr methods are converted to Spark SQL statements and run on the Spark application defined through the Spark context sc.

class(freddie_origins)
 ## [1] "tbl_spark" "tbl_sql"   "tbl_lazy"  "tbl"
library(dplyr)
freddie_origins %>% head
 ## Source:   query [?? x 25]
 ## Database: spark connection master=yarn-client app=sparklyr local=FALSE
 ## 
 ##      V1     V2    V3     V4    V5    V6    V7    V8    V9   V10    V11
 ##   <chr>  <int> <chr>  <int> <int> <chr> <int> <chr> <dbl> <chr>  <int>
 ## 1   751 199910     N 202909    NA   000     1     O    71    20 180000
 ## 2   733 199909     N 202908 29540   000     1     O    51       116000
 ## 3   755 199905     N 202904 29540    30     1     O    95    38 138000
 ## 4   669 200206     N 202901    NA   000     1     O    80    33 162000
 ## 5   732 199904     N 202903 17140   000     1     O    25    10  53000
 ## 6   715 199904     N 202903 17140   000     1     O    67    35  91000
 ## # ... with 14 more variables: V12 <int>, V13 <dbl>, V14 <chr>, V15 <chr>,
 ## #   V16 <chr>, V17 <chr>, V18 <chr>, V19 <int>, V20 <chr>, V21 <chr>,
 ## #   V22 <int>, V23 <int>, V24 <chr>, V25 <chr>

Therefore, we can do any of our common dplyr operations directly on this Spark DataFrame, and through the magic of method dispatch in R, the operations are converted to SQL and sent to Catalyst for evaluation in Spark SQL.

freddie_rename <- freddie_origins %>% rename(
                          credit_score = V1,
                          first_payment = V2,
                          first_home = V3,
                          maturity = V4,
                          msa = V5,
                          mi_perc = V6,
                          num_units = V7,
                          occ_status = V8,
                          cltv = V9,
                          dti = V10,
                          upb = V11,
                          ltv = V12,
                          orig_rate = V13,
                          channel = V14,
                          ppm = V15,
                          prod_type = V16,
                          state = V17,
                          prop_type = V18,
                          post_code = V19,
                          loan_number = V20,
                          loan_purpose = V21,
                          orig_term = V22,
                          num_borrowers = V23,
                          seller = V24,
                          servicer = V25
                          )
freddie_rename %>% head
 ## Source:   query [?? x 25]
 ## Database: spark connection master=yarn-client app=sparklyr local=FALSE
 ## 
 ##   credit_score first_payment first_home maturity   msa mi_perc num_units
 ##          <chr>         <int>      <chr>    <int> <int>   <chr>     <int>
 ## 1          751        199910          N   202909    NA     000         1
 ## 2          733        199909          N   202908 29540     000         1
 ## 3          755        199905          N   202904 29540      30         1
 ## 4          669        200206          N   202901    NA     000         1
 ## 5          732        199904          N   202903 17140     000         1
 ## 6          715        199904          N   202903 17140     000         1
 ## # ... with 18 more variables: occ_status <chr>, cltv <dbl>, dti <chr>,
 ## #   upb <int>, ltv <int>, orig_rate <dbl>, channel <chr>, ppm <chr>,
 ## #   prod_type <chr>, state <chr>, prop_type <chr>, post_code <int>,
 ## #   loan_number <chr>, loan_purpose <chr>, orig_term <int>,
 ## #   num_borrowers <int>, seller <chr>, servicer <chr>
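Spelling out 25 renames by hand is error-prone. One alternative is a named lookup vector, sketched here on a small local data frame with made-up values and only three of the columns. Whether the same call works unchanged on a Spark DataFrame depends on the installed dplyr and sparklyr versions, so treat that as an assumption to verify.

```r
library(dplyr)

# A tiny local stand-in for the first three columns of the raw data.
raw <- data.frame(
  V1 = c("751", "733"),
  V2 = c(199910L, 199909L),
  V3 = c("N", "N"),
  stringsAsFactors = FALSE
)

# Named vector: names are the new column names, values are the old ones.
lookup <- c(credit_score = "V1", first_payment = "V2", first_home = "V3")

renamed <- raw %>% rename(all_of(lookup))
names(renamed)
```

Keeping the mapping in a single vector also makes it easy to reuse for other files with the same layout.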

The origination date is buried inside the loan number field. We will pick it out by indexing the loan number substring:

freddie_rename %>% select(loan_number)
 ## Source:   query [?? x 1]
 ## Database: spark connection master=yarn-client app=sparklyr local=FALSE
 ## 
 ##     loan_number
 ##           <chr>
 ## 1  F199Q1000001
 ## 2  F199Q1000002
 ## 3  F199Q1000003
 ## 4  F199Q1000004
 ## 5  F199Q1000005
 ## 6  F199Q1000006
 ## 7  F199Q1000007
 ## 8  F199Q1000008
 ## 9  F199Q1000009
 ## 10 F199Q1000010
 ## # ... with more rows

Observe that the two-digit year field is followed by the quarter field and then some additional loan identification numbers. We will extract the year and quarter using some very simple string operations:

freddie_rename <- freddie_rename %>% 
  mutate(orig_date = substr(loan_number, 3, 4),
         year = as.numeric(substr(loan_number, 3, 2)))
freddie <- freddie_rename %>% 
  mutate(orig_year = paste0(ifelse(year < 10, "200", 
                                   ifelse(year > 16, "19",
                                          "20")), year))
freddie <- freddie %>% 
  mutate(orig_year = substr(orig_year, 1, 4))
freddie %>% head
 ## Source:   query [?? x 28]
 ## Database: spark connection master=yarn-client app=sparklyr local=FALSE
 ## 
 ##   credit_score first_payment first_home maturity   msa mi_perc num_units
 ##          <chr>         <int>      <chr>    <int> <int>   <chr>     <int>
 ## 1          751        199910          N   202909    NA     000         1
 ## 2          733        199909          N   202908 29540     000         1
 ## 3          755        199905          N   202904 29540      30         1
 ## 4          669        200206          N   202901    NA     000         1
 ## 5          732        199904          N   202903 17140     000         1
 ## 6          715        199904          N   202903 17140     000         1
 ## # ... with 21 more variables: occ_status <chr>, cltv <dbl>, dti <chr>,
 ## #   upb <int>, ltv <int>, orig_rate <dbl>, channel <chr>, ppm <chr>,
 ## #   prod_type <chr>, state <chr>, prop_type <chr>, post_code <int>,
 ## #   loan_number <chr>, loan_purpose <chr>, orig_term <int>,
 ## #   num_borrowers <int>, seller <chr>, servicer <chr>, orig_date <chr>,
 ## #   year <dbl>, orig_year <chr>
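The pipeline above appears to rely on substr being handed to Spark SQL as written, where the arguments are a start position and a length. Base R's substr instead takes start and stop positions, and newer dbplyr releases translate substr using R's convention, so the indices may need adjusting on a current installation. The following base R sketch reproduces the same year logic locally; the second and third loan numbers are made up for illustration.

```r
# First value is taken from the output above; the other two are hypothetical.
loan_number <- c("F199Q1000001", "F105Q2000417", "F112Q4001234")

# Base R: substr(x, start, stop)
orig_date <- substr(loan_number, 3, 6)             # two-digit year plus quarter
year <- as.numeric(substr(loan_number, 3, 4))      # two-digit year as a number

# Same century rule as in the Spark pipeline:
# single-digit years get "200", years above 16 get "19", the rest get "20".
orig_year <- paste0(
  ifelse(year < 10, "200", ifelse(year > 16, "19", "20")),
  year
)

data.frame(loan_number, orig_date, orig_year)
```

Checking the rule on a handful of values like this is a cheap way to catch an off-by-one in the indices before running the query on the full dataset.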

Calculate Average Credit Score by Year

Now that we have created features defining the quarter and year of the mortgage origination, we can create a summary of average credit attributes by year. For example, here’s the average credit score by year for each state:

fico_year <- freddie %>% group_by(orig_year, state) %>% 
  summarise(ave_fico = mean(credit_score)) %>% collect
fico_year %>% head
 ## Source: local data frame [6 x 3]
 ## Groups: orig_year [6]
 ## 
 ##   orig_year state ave_fico
 ##       <chr> <chr>    <dbl>
 ## 1      2008    NM 730.7395
 ## 2      2012    WI 769.2008
 ## 3      2011    KS 763.1039
 ## 4      2006    WI 728.6378
 ## 5      2014    AR 750.9074
 ## 6      2015    MI 752.5942
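Note that `credit_score` was read as a character column, and the query above leaves the conversion to Spark SQL. In local R, `mean` of a character vector returns NA with a warning, so an explicit cast is the safer habit. A small local sketch with made-up rows:

```r
library(dplyr)

# Hypothetical rows that mimic the column types of the Spark DataFrame.
loans <- data.frame(
  orig_year = c("1999", "1999", "2005"),
  state = c("WI", "WI", "NM"),
  credit_score = c("751", "733", "669"),
  stringsAsFactors = FALSE
)

# Cast explicitly before averaging, and drop the grouping afterwards.
fico_local <- loans %>%
  group_by(orig_year, state) %>%
  summarise(ave_fico = mean(as.numeric(credit_score), na.rm = TRUE),
            .groups = "drop")
fico_local
```

The same `as.numeric` call can be placed inside the Spark query, which makes the intended type visible rather than implicit.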

Even though we are using Spark to do most of the computation, we can still proceed as though we were working entirely in R. For example, we could create a function for evaluating the average of a credit attribute, which will make it easier to obtain different summarizations:

year_state_sum <- function(val = "credit_score") {
  library(lazyeval)
  year_state <- freddie %>% group_by(orig_year, state) %>%
    summarise_(sum_val = interp(~mean(var), var = as.name(val)))
  year_state <- year_state %>% collect
  names(year_state)[3] <- paste0("ave_", val)
  return(year_state)
}
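`summarise_` and lazyeval have since been deprecated in favor of tidy evaluation. Below is a hedged sketch of an equivalent helper using the `.data` pronoun, tried here only on a small local data frame with made-up values. The name `year_state_mean` is hypothetical, and support on a Spark DataFrame depends on the installed dplyr, dbplyr and sparklyr versions.

```r
library(dplyr)

# Average a column, chosen by name, within each year and state.
year_state_mean <- function(data, val = "credit_score") {
  data %>%
    group_by(orig_year, state) %>%
    summarise("ave_{val}" := mean(as.numeric(.data[[val]]), na.rm = TRUE),
              .groups = "drop")
}

# Hypothetical local rows for a quick check.
loans <- data.frame(
  orig_year = c("1999", "1999", "2005"),
  state = c("WI", "WI", "NM"),
  dti = c("20", "38", "33"),
  stringsAsFactors = FALSE
)

dti_summary <- year_state_mean(loans, "dti")
dti_summary
```

Passing the data as an argument, instead of reading `freddie` from the global environment, makes the helper easier to test on a local sample first.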

Summarize, Plot, …?, Profit!

While our original dataset was very large, our aggregated result is pretty small. The number of rows in the output is just the number of years of data we have times the number of states:

$$
\text{number of rows} = (\text{number of years}) \times (\text{number of states})
$$
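As a quick sanity check of that count, here is a base R sketch with a hypothetical range of years and only three states. It treats the product as an upper bound, since a year and state pair with no loans produces no row in the summary.

```r
# Hypothetical inputs: 18 origination years and three states.
years <- as.character(1999:2016)
states <- c("KS", "NM", "WI")

# Every possible (year, state) combination.
grid <- expand.grid(orig_year = years, state = states,
                    stringsAsFactors = FALSE)

max_rows <- length(years) * length(states)
nrow(grid) == max_rows
```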

Therefore, we can collect that object into a local R data.frame and plot it using the plethora of plotting libraries in R. In this case, we have data at the state level, so I will use the awesome rMaps to make an htmlwidget containing average debt-to-income by state and year:

library(rMaps)
year_state_sum("dti") %>% 
  mutate(year = as.numeric(orig_year)) %>% 
  rMaps::ichoropleth(ave_dti ~ state, data = .,
                     animate = "year",
                     # popup shown on hover, returned as one plain-text string
                     geographyConfig = list(popupTemplate = "#!function(geo, data) {
 return data.state + ' - ' + 'Average DTI in ' + data.year + ': ' +
 data.ave_dti.toFixed(2);}!#"
)) -> state_dti
state_dti$save("StateMapDTI.html", cdn = TRUE)
htmltools::includeHTML("StateMapDTI.html")


The End

Thank you for reading! If this post was interesting, I welcome you to check out my course on R and Spark here and my in-development booklet on Spark and R. I also spoke about Spark and R at the recent Spark Summit in Brussels, and you can find the recording of that talk here. Thanks!