Agendando jobs Spark no k8s através do Airflow
Apache Spark é uma ferramenta para processamento de dados em larga escala, já consolidada no mercado. O Apache Airflow é uma ferramenta para agendamento e monitoração de workflows, criado pelo Airbnb, vem ganhando destaque nos ultimos anos. O Kubernetes é uma ferramenta para orquestração de containers extremamente consolidada no mercado e com muitos recursos que facilitam a administração, diminuindo o overhead operacional de se manter ambientes com containers.
Como utilizamos o Apache Spark no Elo7
Atualmente utilizamos o Apache Spark para realizar ETL de dados produzidos por aplicações. Os dados são consumidos do nosso datalake e utilizados em diversos casos, como por exemplo analises do time de Data Science.
Vale mencionar que possuímos apenas processos batches utilizando Apache Spark.
Utilizamos duas plataformas para agendar estes processos, o EMR e o Kubernetes, e estamos movendo todos os processos que utilizam o EMR para o Kubernetes, com auxilio do Apache Airflow. Para que este post não fique muito longo deixarei os motivos referentes a esta movimentação para um post futuro.
Como utilizamos o Apache Airflow
O Apache Airflow roda dentro do cluster de Kubernetes, utilizamos o Celery como executor do airflow, estes executors também rodam dentro do cluster de Kubernetes.
Temos como premissa, não executar nenhum processamento pesado nestes executors, desta forma, o workers do Celery não ficam bloqueados. O que fazemos é a criação de pods no Kubernetes através do Kubernetes Operator. Uma alternativa seria o uso do Kubernetes Executor como executor do Apache Airflow, porém ainda não o testamos e também gostamos da liberdade de executar qualquer container através do Kubernetes Operator.
Agendando jobs Spark no Airflow
O Apache Airflow possuí o Spark Submit Operator, porém este requer a instalação do Apache Spark no container do Apache Airflow. O container que criamos do Apache Airflow possuí um tamanho de aproximadamente 767Mb, a instalação do Apache Spark acrescentaria cerca de 1.5Gb, isto somente para conseguir utilizar o Spark Submit Operator.
Devido ao fatos acima mencionados optamos por utilizar o Kubernetes Operator para criar um pod utilizando o container do Apache Spark, o qual realiza o submit do job spark, que é executado em modo cluster (https://spark.apache.org/docs/latest/running-on-kubernetes.html#cluster-mode) e com a flag spark.kubernetes.submission.waitAppCompletion-false
.
Porém para utilizar o Kubernetes Operator são necessárias algumas alteraçoes no entrypoint do container do Apache Spark:
function airflow-submit() {
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
"${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@" 2>&1 | tee /tmp/out.log
tac /tmp/out.log \
| grep -m1 -B12 LoggingPodStatusWatcherImpl \
| head -n12 \
| sed 's/\t //g' \
| awk -F": " '{gsub(" ", "_", $1); printf("\"%s\":\"%s\", ", $1, $2);}' \
| awk '{printf("{%s}", substr($0, 1, length($0)-2));}' > /airflow/xcom/return.json
echo "Gererated /airflow/xcom/return.json: $(cat /airflow/xcom/return.json)"
}
O trecho de código acima realiza o submit de um job Apache Spark, captura a resposta deste submit, extraí o json que o Apache Spark gera como resposta e salva ele no arquivo /airflow/xcom/return.json
. Isto é necessário devido o Apache Airflow esperar que o retorno de um operator seja escrito neste arquivo.
Além disso, criamos nosso próprio operator para realizar o submit de jobs Apache Spark, o qual captura o retorno do submit e o salva no xcom da task. Isto é necessário para a monitoração do estado do pod, que será descrito a seguir.
Com estas alterações já é possível realizar submit de jobs Apache Spark, mas, o Apache Airflow possui um bug na biblioteca que ele utiliza para comunicação com o Kubernetes, o qual não efetua o tratamento correto na captura do stdout e/ou stderr.
Para correção do bug copiamos todo o conteúdo do arquivo pod_laucher.py alterando a função abaixo, e o incluímos como um novo arquivo, o qual é utilizado pelo nosso operator:
def _exec_pod_command(self, resp, command):
if resp.is_open():
self.log.info('Running command... %s\n', command)
resp.write_stdin(command + '\n')
executed = False
while resp.is_open():
resp.update(timeout=1)
if resp.peek_stdout():
return resp.read_stdout()
if resp.peek_stderr():
self.log.info(resp.read_stderr())
break
else:
if executed:
self.log.info('Command executed, breaking loop')
break
executed = True
return None
Com o nosso operator e as alterações realizadas no entrypoint do container do Apache Spark já conseguimos realizar o submit de jobs spark, porém ainda é necessário monitorar sua execução, para tal, criamos um Sensor, o qual lê o json retornado pelo submit para extrair o pod_name e verificar o status deste pod no Kubernetes.
Conclusão
O Apache Spark é uma excelente ferramenta e tem nos atendido bem. Para utilizar o Apache Spark através do airflow dentro da nossa infra-estrutura de Kubernetes foram necessárias algumas alterações:
- A criação de um operator para efetuar o submit de jobs Apache Spark através da criação de um pod
- A correção do bug no tratamento das saídas de um pod
- A criação de um sensor para monitorar o estado em que um pod se encontra
Para estas alterações são necessários bons conhecimentos do funcionamento do Apache Spark e Apache Airflow.