Introdução ao Spark

Olá, galera!

Para a versão em inglês deste post, olhar: Spark Introduction.

Se você gosta do mundo de Big Data, deve ter ouvido falar do Apache Spark, um arcabouço de Big Data que estende o famoso modelo MapReduce e permite que você faça algumas consultas, processe dados em lotes e em streams. Ele também é usado para aprendizado de máquina e processamento de grafos.

Uma das principais estruturas de dados do Spark é o RDD, cuja sigla vem de Resilient Distributed Datasets (conjuntos de Dados Distribuídos e Resilientes). Você pode pensar nele como uma coleção distribuída de elementos. Como o nome sugere, os dados dentro de RDDs são distribuídos em seu cluster automaticamente, uma vez que cada RDD é dividido em várias partições que podem ser processadas em diferentes nós do cluster. Explicamos a parte Distribuída do RDD, mas e quanto ao Resiliente? Bem, as máquinas podem falhar, então o que acontece se uma máquina que processa dados do RDD falhar? O Spark sabe como recalcular as partições ausentes e é por isso que esses conjuntos de dados são resilientes.

Agora que sabemos o que é um RDD, é hora de mostrarmos alguns exemplos de como usar o Spark. Os exemplos de código ilustrados nesta postagem podem ser executados baixando o Spark no site oficial ou fazendo pull de uma imagem docker. Para este post, os testes foram feitos usando o Spark na versão 2.4.6 a partir de uma imagem docker, codificando em um notebook com Scala kernel.

No Elo7 nós normalmente criamos nossos notebooks no Jupyter Enterprise Gateway. Existem diferentes kernels disponíveis (python, python com Tensorflow, scala, etc.) e essa estrutura é executada no Kubernetes. Antes de usá-lo, os usuários se autenticam e podem configurar uma chave no Github para versionamento do código de seus notebooks. Para mais detalhes, há uma postagem aqui no blog explicando sobre isso: Jupyter no Kubernetes.

Antes de chamar algumas funções, é necessário executar alguns imports, visto que eles são necessários para as configurações do Spark e seu contexto. A função setMaster recebe como parâmetro a URL master de conexão. Para simplificar, especifique "local" como master, para que seja executado em uma máquina local e não se conecte a um cluster. A função setAppName permite que você escolha um nome para seu aplicativo. SparkContext recebe configurações como parâmetro e será usado para criar RDDs.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

val conf = new SparkConf().setMaster("local").setAppName("Tech Blog - Spark App")
val sc = new SparkContext(conf)

Os dados podem ser carregados por meio da leitura de um arquivo ou utilizando a função parallelize, como abaixo:

val store1Products = sc.parallelize(Seq(("ring", 3), ("bracelet", 5), 
                             ("necklace", 4), ("purse", 3), ("earring", 2)))

No exemplo, store1Products é o que chamamos de Pair RDD, uma vez que é um RDD com chaves e valores. A função parallelize() cria o RDD store1Products a partir de uma sequência de tuplas. Visando a obter as chaves e os valores do RDD, pode-se fazer o seguinte:

store1Products.keys.collect()
// Output: Array(ring, bracelet, necklace, purse, earring)

store1Products.values.collect()
// Output: Array(3, 5, 4, 3, 2)

Para aqueles familiarizados com Python, isso se assemelha a um dicionário do qual queremos recuperar suas chaves e valores. Além disso, outra analogia que alguém poderia fazer é comparar um Pandas Dataframe com Spark RDD. No entanto, o Pandas não é tão otimizado para processamento de Big Data quanto os RDDs do Spark.

Observe que as funções keys.collect() e values.collect() foram chamadas para retornar as chaves e os valores, enquanto store1Products.collect() retornou todos os elementos do RDD. Uma vez que nosso conjunto de dados é pequeno, não há problema em usar collect(), porque o conteúdo cabe na memória de uma única máquina. No entanto, se esse requisito não for atendido, use take(n) que retorna n elementos do RDD. É importante notar que collect e take são bem úteis quando estamos fazendo testes ou depurando, mas podem trazer gargalos.

É possível também ordenar o RDD usando as chaves como critério de ordenação:

store1Products.sortByKey().collect()
// Output: 
// Array((bracelet,5), (earring,2), (necklace,4), (purse,3), (ring,3))

Outras operações comuns podem ser aplicadas ao RDD, tais como count() e foreach(function).

store1Products.count()
// Output: 5

store1Products.collect().foreach(println)
/* Output:
   (ring,3)
   (bracelet,5)
   (necklace,4)
   (purse,3)
   (earring,2) */

Operações como join(), leftOuterJoin() (descritas mais abaixo neste post) e sortByKey() são transformações. Por outro lado, funções como count(), collect(), take(n) e foreach(func) são ações. Transformações retornam um novo RDD como resultado (por exemplo, sortByKey() retorna um RDD ordenado por suas chaves), enquanto ações podem escrever dados para um sistema de armazenamento como um arquivo ou retornam um resultado final (count(), por exemplo, retorna o número de itens que um RDD contém).

Alguns conceitos de banco de dados ajudam a entender algumas operações que podem ser usadas para unir Pair RDDs. Em bancos de dados podemos juntar tabelas para reunir dados, usando as chaves primárias comuns a ambas as tabelas. Pode-se juntar duas ou mais tabelas, havendo diferentes tipos de junções (joins), por exemplo: inner join, left outer join e right outer join. Para inner joins a chave deve estar presente em ambos os RDDs, para left outer joins deve estar presente no primeiro RDD, enquanto para right outer joins deve estar no segundo.

A fim de elucidar a explicação sobre os joins, abaixo foi criado outro RDD, mostrando o comportamento para cada cenário.

//RDD store1Products already created above:
//val store1Products = sc.parallelize(Seq(("ring", 3), ("bracelet", 5), 
//                                       ("necklace", 4), ("purse", 3), ("earring", 2)))*/

val store2Products = sc.parallelize(Seq(("earring", 4), ("ring", 1), 
                                        ("hair clips", 5), ("necklace", 2),
                                        ("sunglasses", 2)))

println("Inner Join")
store1Products.join(store2Products).take(5)
// Output: 
// Inner Join
// Array((earring,(2,4)), (ring,(3,1)), (necklace,(4,2)))

println("Left Outer Join")
store1Products.leftOuterJoin(store2Products).take(10)
// Output: 
// Left Outer Join
// Array((bracelet,(5,None)), (earring,(2,Some(4))), (ring,(3,Some(1))), (necklace,(4,Some(2))), (purse,(3,None)))

println("Right Outer Join")
store1Products.rightOuterJoin(store2Products).take(10)
// Output: 
// Right Outer Join
// Array((earring,(Some(2),4)), (ring,(Some(3),1)), (necklace,(Some(4),2)), (hair clips,(None,5)), (sunglasses,(None,2)))

Explicamos a diferença entre transformações e ações, como criar seus próprios RDDs e como juntar os dados de diferentes RDDs. Antes de finalizarmos este post, vamos explicar como ler dados de um arquivo em um Dataframe e fazer algumas operações com ele, utilizando o arquivo sales.csv como exemplo. Um dataframe contém colunas nomeadas e é semelhante a uma tabela de bancos de dados relacionais. Enquanto os RDDs processam dados estruturados e não estruturados, os dataframes são destinados a dados estruturados e semiestruturados. Ambos os RDDs e Dataframes são lazily evaluated pelo Spark.


sales.csv

product,seller,quantity,price,day
"ring","Joana",3,35.30,20210913
"bracelet","Leonardo",5,15.50,20210914
"necklace","Marcos",4,20.20,20210913
"ring","Ana",1,32.30,20210913
"purse","Michelle",3,49.30,20210913
"earring","Luísa",2,18.40,20210914
"purse","Felipe",4,55.30,20210914
"ring","Ana",4,35.30,20210913
"purse","Michelle",3,51.30,20210914

Os arquivos podem estar em diferentes formatos, como texto, CSV, JSON, etc. Para o formato CSV, por exemplo, existe uma opção para especificar se o arquivo contém um cabeçalho (header) ou não. Se a opção header for verdadeira, a primeira linha não é considerada um registro de dados, mas sim uma linha com os nomes das colunas. Às vezes, os dados podem conter algumas duplicações, as quais podem ser removidas usando uma função chamada dropDuplicates(). Esta recebe como argumentos um subconjunto de colunas utilizadas como chaves para remoção das duplicações. Por exemplo, considere os dados contidos no arquivo sales.csv. Podemos ver que Ana e Michelle aparecem em linhas diferentes. Porém, se considerarmos como chaves o vendedor, o produto e o dia também, vemos que Michelle vende bolsas em dois dias diferentes. Portanto, este registro será mantido após a aplicação da função dropDuplicates(). Ana também aparece duas vezes, mas vende o mesmo tipo de produto (anéis) no mesmo dia. Para as chaves consideradas existe uma duplicação nos registros da Ana, então apenas um deles aparecerá no resultado final. Isso é mostrado no seguinte exemplo:

val sales = spark.read.option("header",true).csv("../sales.csv")

println("Sales with duplications")
sales.show()

/* Output:
Sales with duplications
+--------+--------+--------+-----+--------+
| product|  seller|quantity|price|     day|
+--------+--------+--------+-----+--------+
|    ring|   Joana|       3|35.30|20210913|
|bracelet|Leonardo|       5|15.50|20210914|
|necklace|  Marcos|       4|20.20|20210913|
|    ring|     Ana|       1|32.30|20210913|
|   purse|Michelle|       3|49.30|20210913|
| earring|   Luísa|       2|18.40|20210914|
|   purse|  Felipe|       4|55.30|20210914|
|    ring|     Ana|       4|35.30|20210913|
|   purse|Michelle|       3|51.30|20210914|
+--------+--------+--------+-----+--------+
*/

println("Sales without duplications")
val cleanedSales = sales.dropDuplicates("product", "seller", "day")
cleanedSales.show()

/* Output:
Sales without duplications
+--------+--------+--------+-----+--------+
| product|  seller|quantity|price|     day|
+--------+--------+--------+-----+--------+
|necklace|  Marcos|       4|20.20|20210913|
|    ring|     Ana|       1|32.30|20210913|
|   purse|Michelle|       3|51.30|20210914|
|    ring|   Joana|       3|35.30|20210913|
|   purse|Michelle|       3|49.30|20210913|
|   purse|  Felipe|       4|55.30|20210914|
|bracelet|Leonardo|       5|15.50|20210914|
| earring|   Luísa|       2|18.40|20210914|
+--------+--------+--------+-----+--------+
*/

Conclusão

Este post trouxe alguns conceitos básicos e importantes relacionados ao Apache Spark, um framework muito poderoso usado para processamento de Big Data, limpeza e análise de dados. Existem várias transformações e ações que podem ser aplicadas aos dados, que podem ser lidos de arquivos ou criados usando a função parallelize(). Conhecimentos prévios de bancos de dados e Python podem ajudar a entender melhor algumas das estruturas e funções aqui explicadas.

Até a próxima!

Referências

(1) Apache Spark Website

(2) Karau, Holden, et al. Learning spark: lightning-fast big data analysis." O’Reilly Media, Inc.", 2015.