Agendando jobs Spark no k8s através do Airflow

Apache Spark é uma ferramenta para processamento de dados em larga escala, já consolidada no mercado. O Apache Airflow é uma ferramenta para agendamento e monitoração de workflows, criado pelo Airbnb, vem ganhando destaque nos ultimos anos. O Kubernetes é uma ferramenta para orquestração de containers extremamente consolidada no mercado e com muitos recursos que facilitam a administração, diminuindo o overhead operacional de se manter ambientes com containers.

Como utilizamos o Apache Spark no Elo7

Atualmente utilizamos o Apache Spark para realizar ETL de dados produzidos por aplicações. Os dados são consumidos do nosso datalake e utilizados em diversos casos, como por exemplo analises do time de Data Science.

Vale mencionar que possuímos apenas processos batches utilizando Apache Spark.

Utilizamos duas plataformas para agendar estes processos, o EMR e o Kubernetes, e estamos movendo todos os processos que utilizam o EMR para o Kubernetes, com auxilio do Apache Airflow. Para que este post não fique muito longo deixarei os motivos referentes a esta movimentação para um post futuro.

Como utilizamos o Apache Airflow

O Apache Airflow roda dentro do cluster de Kubernetes, utilizamos o Celery como executor do airflow, estes executors também rodam dentro do cluster de Kubernetes.

Temos como premissa, não executar nenhum processamento pesado nestes executors, desta forma, o workers do Celery não ficam bloqueados. O que fazemos é a criação de pods no Kubernetes através do Kubernetes Operator. Uma alternativa seria o uso do Kubernetes Executor como executor do Apache Airflow, porém ainda não o testamos e também gostamos da liberdade de executar qualquer container através do Kubernetes Operator.

Agendando jobs Spark no Airflow

O Apache Airflow possuí o Spark Submit Operator, porém este requer a instalação do Apache Spark no container do Apache Airflow. O container que criamos do Apache Airflow possuí um tamanho de aproximadamente 767Mb, a instalação do Apache Spark acrescentaria cerca de 1.5Gb, isto somente para conseguir utilizar o Spark Submit Operator.

Devido ao fatos acima mencionados optamos por utilizar o Kubernetes Operator para criar um pod utilizando o container do Apache Spark, o qual realiza o submit do job spark, que é executado em modo cluster (https://spark.apache.org/docs/latest/running-on-kubernetes.html#cluster-mode) e com a flag spark.kubernetes.submission.waitAppCompletion-false.

Porém para utilizar o Kubernetes Operator são necessárias algumas alteraçoes no entrypoint do container do Apache Spark:

function airflow-submit() {
	# disable randomized hash for string in Python 3.3+
	export PYTHONHASHSEED=0

	"${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@" 2>&1 | tee /tmp/out.log
	tac /tmp/out.log \
		| grep -m1 -B12 LoggingPodStatusWatcherImpl \
		| head -n12 \
		| sed 's/\t //g' \
		| awk -F": " '{gsub(" ", "_", $1); printf("\"%s\":\"%s\", ", $1, $2);}' \
		| awk '{printf("{%s}", substr($0, 1, length($0)-2));}' > /airflow/xcom/return.json
	echo "Gererated /airflow/xcom/return.json: $(cat /airflow/xcom/return.json)"
}

O trecho de código acima realiza o submit de um job Apache Spark, captura a resposta deste submit, extraí o json que o Apache Spark gera como resposta e salva ele no arquivo /airflow/xcom/return.json. Isto é necessário devido o Apache Airflow esperar que o retorno de um operator seja escrito neste arquivo.

Além disso, criamos nosso próprio operator para realizar o submit de jobs Apache Spark, o qual captura o retorno do submit e o salva no xcom da task. Isto é necessário para a monitoração do estado do pod, que será descrito a seguir.

Com estas alterações já é possível realizar submit de jobs Apache Spark, mas, o Apache Airflow possui um bug na biblioteca que ele utiliza para comunicação com o Kubernetes, o qual não efetua o tratamento correto na captura do stdout e/ou stderr.

Para correção do bug copiamos todo o conteúdo do arquivo pod_laucher.py alterando a função abaixo, e o incluímos como um novo arquivo, o qual é utilizado pelo nosso operator:

    def _exec_pod_command(self, resp, command):
        if resp.is_open():
            self.log.info('Running command... %s\n', command)
            resp.write_stdin(command + '\n')
            executed = False
            while resp.is_open():
                resp.update(timeout=1)
                if resp.peek_stdout():
                    return resp.read_stdout()
                if resp.peek_stderr():
                    self.log.info(resp.read_stderr())
                    break
                else:
                    if executed:
                        self.log.info('Command executed, breaking loop')
                        break
                    executed = True
        return None

Com o nosso operator e as alterações realizadas no entrypoint do container do Apache Spark já conseguimos realizar o submit de jobs spark, porém ainda é necessário monitorar sua execução, para tal, criamos um Sensor, o qual lê o json retornado pelo submit para extrair o pod_name e verificar o status deste pod no Kubernetes.

Conclusão

O Apache Spark é uma excelente ferramenta e tem nos atendido bem. Para utilizar o Apache Spark através do airflow dentro da nossa infra-estrutura de Kubernetes foram necessárias algumas alterações:

  • A criação de um operator para efetuar o submit de jobs Apache Spark através da criação de um pod
  • A correção do bug no tratamento das saídas de um pod
  • A criação de um sensor para monitorar o estado em que um pod se encontra

Para estas alterações são necessários bons conhecimentos do funcionamento do Apache Spark e Apache Airflow.