Quando falamos em utilizar thread em um script pyspark, isso se torna algo bem confuso. Levei tempo para perceber que uma das respostas para meu problema de tuning estava no tempo que gastava para gravar dados no HDFS. Então pensei que seria possível aplicar um tipo de paralelismo onde, dado meu caso de uso, fosse possível gerar resultados simultaneamente, independente do modo como o programa se distribuí.
Como o próprio site da apache diz à respeito do spark:
Spark é uma estrutura rápida e poderosa que fornece uma API para executar o processamento distribuído em massa em conjuntos de dados resilientes.
Mas por que alguém tentaria utilizar threads em um processo distribuído?
Imagine então linhas de código – chamadas de funções para ser mais específico. Criamos uma estrutura onde uma chamada é feita linha após linha, aguardando o resultado da função anterior para iniciar. Agora, imagine que você possui um arquivo e precise realizar dois tipos de processamento, bem conhecidos por sinal: wordCount e charCount.
Pode parecer algo simples, mas, aplicando esse exemplo em um cenário de projeto maior e mais complexo, torna a utilização de threads algo muito trivial, principalmente quando falamos de ações que não possuem dependências entre si, como a gravação de diversos parquet’s distintos, oriundos de um único dataframe, ou da criação de um novo fluxo, além do principal, aplicando regras e tratativas em cima de um rdd até sua etapa final de gravação no HDFS, deixando o fluxo executar normalmente.
O processo está distribuído, porém, não executando paralelamente (a nível de linhas de código). Quando aplicamos threads dependendo do seu caso de uso, o resultado pode ser incrível.
Olhamos para mais uma explicação
Thread, no seu conceito, é:
“Um pequeno programa que trabalha como um subsistema, sendo uma forma de um processo se autodividir em duas ou mais tarefas. É o termo em inglês para Linha ou Encadeamento de Execução. Essas tarefas múltiplas podem ser executadas simultaneamente para rodar mais rápido do que um programa em um único bloco ou praticamente juntas, mas que são tão rápidas que parecem estar trabalhando em conjunto ao mesmo tempo.”
Fonte: O que é Thread.
Em outras palavras, thread faz com que suas tarefas sejam paralelizadas executando-as simultaneamente. Sendo assim, mais rápidas que uma tarefa executada linha após linha. Imagine que, dada uma regra funcional você possa executar tarefas em paralelo, como filtrar e separar dados ruins e bons de um dataframe. Além disso, realizar determinadas atividades, tais como .withColumn()
, .map()
e/ou .write()
. São arquivos gerados a partir de uma única variável, que não possuem nenhum grau de dependência.
O Spark, por si só, já trabalha de maneira distribuída, porém, quando falamos em linhas de código, o processo não é inteligente o suficiente para iniciar a primeira função, e em paralelo as seguintes funções para que, sem interferências, sejam processadas ao mesmo tempo, mas é inteligente o suficiente para alocar recursos em tarefas menores, e assim liberar espaço logo que finalizar um processo.
Por isso, quando utilizamos threads essas funções executam em paralelo utilizando os recursos dinamicamente (principalmente quando alternamos o scheduler.mode
de FIFO para FAIR).
Como aplicar threads em um processo pyspark?
O script abaixo traz um exemplo simples de como aplicar threads em um escopo utilizando pyspark, carregando um arquivo e processando duas funções em paralelo:
import threading
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlContx = SQLContext(sc)
# Carregando arquivo de entrada e quebrando em palavras separadas por espaço.
# Loading input file and breaking into words separated by space.
file = sc.textFile("/inFiles/shakespeare.txt").flatMap(lambda line: line.split(" ")).cache()
# https://www.cloudera.com/documentation/enterprise/5-8-x/topics/spark_develop_run.html
def wordCount(file):
wordCounts = file.map(lambda word: (word, 1)).reduceByKey(lambda v1,v2:v1 +v2).toDF(["Word","Count"])
wordCounts.write.mode("overwrite").parquet("/outFiles/wordcount")
def charCount(file):
wordCounts = file.map(lambda word: (word, 1)).reduceByKey(lambda v1,v2:v1 +v2)
charCounts = wordCounts.flatMap(lambda pair:pair[0]).map(lambda c: c).map(lambda c: (c, 1)).reduceByKey(lambda v1,v2:v1 +v2).toDF()
charCounts.write.mode("overwrite").parquet("/outFiles/charcount")
# Instanciando variáveis threads com funções na memória.
# Instantiating thread variables with functions in memory.
T1 = threading.Thread(target=wordCount, args=(file,))
T2 = threading.Thread(target=charCount, args=(file,))
# Iniciando execução das threads.
# Starting execution of threads.
T1.start()
T2.start()
# Pausando execução das threads para seguir o fluxo principal. Sem eles, a thread rodará em paralelo.
# Pausing thread execution to follow main stream. Without them, the thread will run in parallel.
T1.join()
T2.join()
No primeiro momento, importamos o pacote Threading do python:
import threading
Um ponto valioso na utilização de threads é quando combinamos sua execução logo após de um .cache()
. Quando falamos de ação/transformação em spark, lembramos que, dado um método, as possibilidades da ação chamar as mesmas transformações inúmeras vezes são altas quando não desenhado da melhor forma. Pior então em um processo paralelo, onde acessamos o mesmo registro simultaneamente. Colocando sua variável na memória, as tarefas usarão os dados paralelamente, sem precisar carregá-los de novo.
file = sc.textFile("/inFiles/shakespeare.txt").flatMap(lambda line: line.split(" ")).cache()
Instanciamos variáveis threads com funções passando a variável (no nosso caso um rdd, como exemplo) como parâmetro de entrada.
# Instanciando variáveis threads com funções na memória.
T1 = threading.Thread(target=wordCount, args=(file,))
T2 = threading.Thread(target=charCount, args=(file,))
Iniciamos a execução com .start()
e paramos com .join()
. Caso contrário, podemos deixá-lo executando paralelamente até finalizar, junto ao fluxo principal.
# Iniciando execução das threads.
T1.start()
T2.start()
# Pausando execução das threads para seguir o fluxo principal.
T1.join()
T2.join()
Com o script pronto, é possível visualizar sua execução e as quebras que os processos paralelos realizam dentro do fluxo.
No exemplo abaixo, estou realizando cinco tarefas em paralelo, onde duas estão gravando dados no HDFS e uma está realizando um .count()
:

Considerações finais
A primeira aparição de thread em meu script spark foi quando precisamos aplicar tuning, além da documentação, e diminuir o tempo de execução. Algumas considerações foram levantadas após esse trabalho:
- É necessário conhecer bem seu fluxo para avaliar de fato a utilização de threads, o que pode ou não tornar seu processo mais rápido. Em cenários pequenos de teste, o tempo de execução de um fluxo em paralelo era o mesmo que rodando linha após linha:
- Caso não esteja utilizando classes e objetos, ou uma variável global, não será possível utilizar o resultado dentro e fora de uma thread, principalmente se a mesma está sendo executada em paralelo junto a seu fluxo principal. Isso porque a variável pode estar sendo criado no mesmo momento do uso.
- Se possível, utilize
.cache()
! Threads que utilizam uma variável já carregada na memória – elas podem reduzir ainda mais seu tempo de processamento, isso porque o tempo de ação/transformação é menor para um dataframe estando em memória. - Crie Pool’s! Utilizando job scheduling do spark, é possível visualizar suas tarefas rodando em paralelo no applicationMaster alternando a maneira como o spark trabalha,
spark.scheduler.mode=FAIR
(Default:FIFO) no spark-submit ou dentro do script com o SparkContext, além de outras possibilidades. Como mostra na imagem a seguir:

Conclusão
Por fim, após um ano trabalhando com spark, pude perceber o quão poderoso ele é. Não somente isso, como pude perceber que ainda é novo na mão de muitos e que poucos de fato compreendem como ele funciona.
Com muitas horas de pesquisa e testes árduos, senti que as informações podem ser explicadas de maneira simples e sucintas demais para que qualquer leigo no assunto possa entender e aplicar (a documentação não vai te ensinar como fazer um projeto do zero).
O mundo de big data nunca esteve tão fácil de colocá-lo em prática e, após esse tempo de experiencia, fui motivado a compartilhar temas difíceis de difundir no começo dessa caminhada.
Por isso, espero daqui para frente, compartilhar com vocês uma série de artigos onde trabalharemos com detalhes, de maneira clara, simples e completa e espero que seja uma via de mão dupla para que juntos possamos compartilhar ideias e experiências. Por isso, faça seus comentários.
Nos vemos por ai!