Spark Introduction

Hello everybody!

For the Portuguese version of this post, take a look at: Spark Introduction.

If you enjoy the Big Data world, you may have heard about Apache Spark, a Big Data framework that extends the famous MapReduce model and lets you run queries and process data both in batches and in streams. It is also used for machine learning and graph processing.

One of the main Spark data structures is the RDD, which stands for Resilient Distributed Dataset. You can think of it as a distributed collection of elements. As the name suggests, the data inside an RDD is automatically distributed across your cluster, since each RDD is split into several partitions which can be processed on different cluster nodes. That explains the Distributed part of RDD, but what about Resilient? Well, machines can fail, so what happens if a machine processing RDD data fails? Spark knows how to recompute the missing partitions from the lineage of transformations that created them, and that is why these datasets are resilient.

Now that we know what an RDD is, it is time to show some examples of how to use Spark. The code examples shown in this post can be executed by downloading Spark from the official site or by using a Docker image. For this post, the tests were made using Spark version 2.4.6 from a Docker container image, coding in a Scala kernel notebook.

At Elo7 we normally create our notebooks in Jupyter Enterprise Gateway. There are different kernels available (Python, Python with TensorFlow, Scala, etc.) and this infrastructure runs on Kubernetes. Before using it, users authenticate themselves and can configure a GitHub key to version their notebook code there. For more details, there is a post here on the blog about it: Jupyter no Kubernetes.

Before calling any Spark functions, it is necessary to run a couple of imports needed for the Spark configuration and context. The setMaster function receives the master URL to connect to; for simplicity, specify "local" as the master, so Spark runs on the local machine and does not connect to a cluster. The setAppName function lets you choose a name for your application. SparkContext receives the configuration as a parameter and will be used to create RDDs.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

val conf = new SparkConf().setMaster("local").setAppName("Tech Blog - Spark App")
val sc = new SparkContext(conf)

Data can be loaded into an RDD by reading it from a file or by using the parallelize function, as shown below:

val store1Products = sc.parallelize(Seq(("ring", 3), ("bracelet", 5), 
                             ("necklace", 4), ("purse", 3), ("earring", 2)))

In the parallelize example, store1Products is a pair RDD, since it is a key-value RDD: the parallelize() function creates it from a given sequence of tuples. To retrieve its keys and values, one can do something like this:

store1Products.keys.collect()
// Output: Array(ring, bracelet, necklace, purse, earring)

store1Products.values.collect()
// Output: Array(3, 5, 4, 3, 2)

For those familiar with Python, this resembles retrieving the keys and values of a dictionary. Another common analogy is to compare a Pandas DataFrame with a Spark RDD. However, Pandas runs on a single machine and is not built to process Big Data at the scale that Spark RDDs are.

Note that keys.collect() and values.collect() were called to return the keys and the values, while store1Products.collect() returns all the RDD elements. Since our dataset is small, it is fine to use collect(), because the content fits in the memory of a single machine. If that requirement is not fulfilled, use take(n), which returns only n elements of the RDD. It is important to note that collect() and take() are really useful for testing and debugging, but they bring the returned data to the driver and may become a bottleneck.
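As a quick sketch of take(n) on our small RDD (the exact order may depend on partitioning, but with a single local partition the elements come back in the order they were created):

store1Products.take(2)
// Output: Array((ring,3), (bracelet,5))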

It is also possible to sort the RDD using the keys as sorting criteria:

store1Products.sortByKey().collect()
// Output: 
// Array((bracelet,5), (earring,2), (necklace,4), (purse,3), (ring,3))

Other common operations can be applied to the RDD, for example count() and foreach(func). In the example below, foreach(println) is called on the array returned by collect(), so the elements are printed on the driver.

store1Products.count()
// Output: 5

store1Products.collect().foreach(println)
/* Output:
   (ring,3)
   (bracelet,5)
   (necklace,4)
   (purse,3)
   (earring,2) */

Operations like join(), leftOuterJoin() (described below in this post) and sortByKey() are transformations. On the other hand, functions like count(), collect(), take(n) and foreach(func) are actions. Transformations return a new RDD as a result (for example, sortByKey() returns an RDD sorted by its keys), while actions can write data to a storage system or return a final value to the driver (for instance, count() returns the number of items an RDD contains).
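A small sketch illustrates this difference in practice, using mapValues, a transformation that is not part of the original examples. Since Spark evaluates RDDs lazily (as noted later in this post), the transformation only describes the computation, and it is the action that triggers it:

// mapValues is a transformation: it only describes the computation,
// nothing is executed yet
val doubledStock = store1Products.mapValues(quantity => quantity * 2)

// collect() is an action: only now Spark actually computes the result
doubledStock.collect()
// Expected output: Array((ring,6), (bracelet,10), (necklace,8), (purse,6), (earring,4))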

Some database concepts may help in understanding the operations used to join pair RDDs. In databases we join two or more tables to bring their data together, using key(s) that are common to them. There are different types of joins, for instance inner, left outer and right outer joins. An inner join keeps only the keys present in both RDDs; a left outer join keeps every key of the first RDD, whether or not it appears in the second; a right outer join keeps every key of the second RDD.

To make it clearer, another RDD is created below to show the expected behavior for each case.

//RDD store1Products already created above:
//val store1Products = sc.parallelize(Seq(("ring", 3), ("bracelet", 5), 
//                                        ("necklace", 4), ("purse", 3), ("earring", 2)))

val store2Products = sc.parallelize(Seq(("earring", 4), ("ring", 1), 
                                        ("hair clips", 5), ("necklace", 2),
                                        ("sunglasses", 2)))

println("Inner Join")
store1Products.join(store2Products).take(5)
// Output: 
// Inner Join
// Array((earring,(2,4)), (ring,(3,1)), (necklace,(4,2)))

println("Left Outer Join")
store1Products.leftOuterJoin(store2Products).take(10)
// Output: 
// Left Outer Join
// Array((bracelet,(5,None)), (earring,(2,Some(4))), (ring,(3,Some(1))), (necklace,(4,Some(2))), (purse,(3,None)))

println("Right Outer Join")
store1Products.rightOuterJoin(store2Products).take(10)
// Output: 
// Right Outer Join
// Array((earring,(Some(2),4)), (ring,(Some(3),1)), (necklace,(Some(4),2)), (hair clips,(None,5)), (sunglasses,(None,2)))

We have already explained the difference between transformations and actions, how to create your own RDDs and how to join data between them. So, to finish this post, let's see how to read data from a file into a DataFrame and perform some operations on it, using the file sales.csv as input. A DataFrame contains named columns and is similar to a table, for those familiar with relational databases. While RDDs can handle both structured and unstructured data, DataFrames are intended for structured and semi-structured data. Both RDDs and DataFrames are lazily evaluated by Spark.


sales.csv

product,seller,quantity,price,day
"ring","Joana",3,35.30,20210913
"bracelet","Leonardo",5,15.50,20210914
"necklace","Marcos",4,20.20,20210913
"ring","Ana",1,32.30,20210913
"purse","Michelle",3,49.30,20210913
"earring","Luísa",2,18.40,20210914
"purse","Felipe",4,55.30,20210914
"ring","Ana",4,35.30,20210913
"purse","Michelle",3,51.30,20210914

Files can be in different formats, such as text, CSV, JSON, and so on. For the CSV format, for example, there is an option to specify whether the file contains a header or not. If the header option is true, the first line is not considered a data record, but a line with the column names. Sometimes data contains duplications, and those can be removed with the dropDuplicates() function, which receives as arguments the subset of columns to consider when removing duplications. For example, consider the data in the sales.csv file. Both Ana and Michelle appear in more than one line. However, if we take the product, the seller and the day as keys, we see that Michelle sells purses on two different days, so both of her records are kept after applying dropDuplicates(). Ana appears twice as well, but she sells the same type of product (rings) on the same day; for the chosen keys her records are duplicates, so only one of them will appear in the final result, as shown in the following example. Note that the code below uses spark, the SparkSession, which is already available in the spark-shell and in most notebook kernels (it can also be created with SparkSession.builder()).

val sales = spark.read.option("header",true).csv("../sales.csv")

println("Sales with duplications")
sales.show()

/* Output:
Sales with duplications
+--------+--------+--------+-----+--------+
| product|  seller|quantity|price|     day|
+--------+--------+--------+-----+--------+
|    ring|   Joana|       3|35.30|20210913|
|bracelet|Leonardo|       5|15.50|20210914|
|necklace|  Marcos|       4|20.20|20210913|
|    ring|     Ana|       1|32.30|20210913|
|   purse|Michelle|       3|49.30|20210913|
| earring|   Luísa|       2|18.40|20210914|
|   purse|  Felipe|       4|55.30|20210914|
|    ring|     Ana|       4|35.30|20210913|
|   purse|Michelle|       3|51.30|20210914|
+--------+--------+--------+-----+--------+
*/

println("Sales without duplications")
val cleanedSales = sales.dropDuplicates("product", "seller", "day")
cleanedSales.show()

/* Output:
Sales without duplications
+--------+--------+--------+-----+--------+
| product|  seller|quantity|price|     day|
+--------+--------+--------+-----+--------+
|necklace|  Marcos|       4|20.20|20210913|
|    ring|     Ana|       1|32.30|20210913|
|   purse|Michelle|       3|51.30|20210914|
|    ring|   Joana|       3|35.30|20210913|
|   purse|Michelle|       3|49.30|20210913|
|   purse|  Felipe|       4|55.30|20210914|
|bracelet|Leonardo|       5|15.50|20210914|
| earring|   Luísa|       2|18.40|20210914|
+--------+--------+--------+-----+--------+
*/
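To close the DataFrame example, here is one more small sketch (not part of the original example) combining the groupBy() and count() operations, which are lazily evaluated like the other transformations, with the show() action. It counts how many records each seller keeps after the deduplication, so Michelle should appear with 2 records (purses sold on two different days) and every other seller with 1:

// Number of records per seller after removing the duplications
cleanedSales.groupBy("seller").count().show()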

Conclusion

This post presented some basic but important concepts related to Apache Spark, a very powerful framework for Big Data processing, data cleaning and analysis. There are several transformations and actions that can be applied to data, which can be read from files or created with the parallelize() function. Some previous knowledge of databases and Python helps in understanding the structures and functions explained here.

See you next time!

References

(1) Apache Spark Website

(2) Karau, Holden, et al. Learning Spark: Lightning-Fast Big Data Analysis. O'Reilly Media, Inc., 2015.