Com o aumento da demanda por maior autonomia em pipelines de dados, a plataforma de análise de dados Databricks tornou-se uma solução popular para empresas que desejam desenvolver pipelines escaláveis e eficientes. Durante um projeto freelancer no qual me foi solicitado uma maior autonomia sobre o core da empresa, que basicamente é definido por uma pipeline de dados que importa arquivos de vários SFTP diferentes, faz a ingestão para o S3, uma série de tratamentos de dados para seguir um layout mandatório e, por fim, o envio para um sistema de fila onde os demais sistemas poderiam usufruir dos dados refinados. Pensei em como poderia atender aos seguintes requisitos com pouco ou quase nenhum desenvolvimento além do que já existe de recurso no Databricks. Por isso, falaremos neste artigo como você pode utilizar a API do Databricks como um serviço interno e dar autonomia para:
- Reprocessar um arquivo desde sua origem (SFTP).
- Gerar apenas o JSON que é enviado para o SQS.
- Processar um arquivo em ambiente de homologação sem enviar para o SQS. Essas são algumas autonomias que foram exigidas, mas o que será demonstrado aqui vale para qualquer regra de negócio.
dbutils — notebook.run e widgets.get
Antes de algumas evoluções do Databricks em relação a “steps” dentro de um job, eu sempre utilizei uma função muito interessante, que é o dbutils.notebook.run. Com ele, eu conseguia montar uma espécie de orquestração de notebooks que gostaria de executar tanto em sequência como em paralelo, bem como atribuir retry e tempo (parâmetros que você pode passar nessa função). Desta forma, eu montei um único notebook mais ou menos como o abaixo, que fazia a minha orquestração de extração, transformação e envio dos dados de centenas de arquivos:
Exemplo do notebook de orquestração:
class PropriedadesNoteBook: def __init__(self, path, timeout, retry=1): self.path = path self.timeout = timeout self.retry = retrynbmovefiles = PropriedadesNoteBook(“0-move-ftp-to-s3”, 500,3) nblimpadados = PropriedadesNoteBook(“1-notebook_limpa_dados”, 1000,2) nbimport = PropriedadesNoteBook(“2-notebook_importa_arquivos”, 1000,2) nbidentificadores = PropriedadesNoteBook(“3-notebook_processa_identificadores”,1000,2) nbprocessjson = PropriedadesNoteBook(“4-notebook_processa_json”,2000,2) nbexcluidados = PropriedadesNoteBook(“6-notebook_envia_json_sqs”,1000,1) if dbutils.notebook.run(nbmovefiles.path, nbmovefiles.timeout) != “error”: if dbutils.notebook.run(nblimpadados.path, nblimpadados.timeout) != “error”: if dbutils.notebook.run(nbimport.path, nbimport.timeout) != “error”: if dbutils.notebook.run(nbidentificadores.path, nbidentificadores.timeout) != “error”: if dbutils.notebook.run(nbprocessjson.path, nbprocessjson.timeout) != “error”: if dbutils.notebook.run(nbexcluidados.path, nbexcluidados.timeout) != “error”: |
No exemplo acima, eu faço toda a minha etapa de “ETL” de forma sequencial, visto que, em meu caso, os arquivos eram disponibilizados no SFTP de 1 a 3 vezes ao dia (às vezes muito mais, que é onde quero chegar com esse artigo). Legal, você tem seu job, sua orquestração, seus notebooks que executam tarefas e seu job é executado em uma janela definida de acordo com o que foi definido com o time de negócios. Mas, depois de desenvolvido, testado e validado, esse processo ao longo que o próprio negócio foi crescendo, principalmente em 3 aspectos:
- Muitos setups de novos clientes, que o time de produtos precisava validar o JSON processado pelo Databricks a nível de homologação ou produção.
- Recorrência de clientes que, por um erro do lado deles, o arquivo foi disponibilizado fora do schedule do seu Job.
- Necessidade de enviar o fluxo completo, mas a um sistema de mensageria diferente.
Bom, chegamos onde será demonstrado como utilizar e a efetividade das funções abaixo:
notebook_params (parametro da API do databricks) dbutils.widgets.getArgument dbutils.widgets.text |
Passando e recebendo dados através do widgets entre notebooks.
Conforme havia comentado anteriormente, eu senti a necessidade de trazer mais liberdade para usuários e sistemas e tirar um pouco a preocupação do engenheiro de dados de modificar, executar e monitorar o fluxo de importação, processamento e envio dos dados para o AWS SQS (sistema de fila de mensagens). Desta forma, comecei a elaborar um notebook responsável totalmente pelo reprocessamento das necessidades acima, relembrando:
- Muitos setups de novos clientes que o time de produtos precisava validar o JSON processado pelo Databricks a nível de homologação ou produção.
- Recorrência de clientes que, por um erro do lado deles, o arquivo foi disponibilizado fora do schedule do seu Job.
- Necessidade de enviar o fluxo completo, mas a um sistema de mensageria diferente.
Continuando, além do notebook que desenvolvi, que denominei de reprocessamento, criei um segundo notebook chamado orquestrador_reprocessamento e, por último, criei um bucket dedicado a esse tipo de necessidade de reprocessamento, obter apenas o penúltimo processo que o JSON, enviar ou não para o SQS e etc… Ótimo, agora como o usuário ou aplicação vai chamar esse notebook e especificar o que ele quer exatamente diante dessas 3 novas funcionalidades? Para isso, utilizei a própria API do Databricks, no qual é composta pelo ID do seu cluster + .cloud.databricks.com/api/2.1/jobs/run-now. Ficando algo do tipo: https://meuidcluster.cloud.databricks.com/api/2.1/jobs/run-now. Além disso, conforme mencionei anteriormente, criei um bucket focado em reprocessamento de dados, no qual dentro desse bucket criei as seguintes: pastas:
– hmle/json
– hmle/sqs
– json_gerados
– prod/json
– prod/sqs
Basicamente, como grande parte das necessidades (exceto o processamento completo que envolve o SFTP) envolve a necessidade de colocar o arquivo de SETUP para processamento, eu criei as pastas acima onde meu notebook de reprocessamento tem a seguinte inteligência:
– hmle/json — Caso exista arquivos dentro desse diretório, eu inicio meu notebook de reprocessamento passando via widgets para meu notebook do fluxo normal (dessa forma, mantenho a regra do fluxo diário sem criar algo separado com essa necessidade de reprocessamento) da seguinte forma:
HMLE_PROCESS = "T"
PROD_PROCESS = "F"
JSON_PROCESS = "T"
dbutils.widgets.text('HMLE_PROCESS', HMLE_PROCESS)
dbutils.widgets.text('PROD_PROCESS', PROD_PROCESS)
dbutils.widgets.text('JSON_PROCESS', JSON_PROCESS)
if dbutils.notebook.run(nbprocessjson.path, nbprocessjson.timeout, {"HMLE_PROCESS": HMLE_PROCESS, "PROD_PROCESS": PROD_PROCESS, "JSON_PROCESS": JSON_PROCESS}) != "error":
print("Notebook "+nbprocessjson.path+ " executado com sucesso!")
else:
print("Ocorreu um erro na execução do notebook: "+nbprocessjson.path)
send_slack_message_reprocessing("1-importa arquivos lake", f"Ocorreu uma falha na execução do notebook: {nbprocessjson.path}")
|
O trecho de código acima é executado no meu notebook de reprocessamento. Repare que eu passo nos widgets tudo de acordo como foi disponibilizado o arquivo pelo usuário ao bucket, ou seja, quero processar o arquivo e obter o JSON em ambiente de homologação. Já no notebook que vai receber os parâmetros, fica da seguinte forma:
try:
JSON_PROCESS = dbutils.widgets.getArgument("JSON_PROCESS")
PROD_PROCESS = dbutils.widgets.getArgument("PROD_PROCESS")
HMLE_PROCESS= dbutils.widgets.getArgument("HMLE_PROCESS")
print(f"JSON - {JSON_PROCESS} PROD - {PROD_PROCESS} - File json name - {HMLE_PROCESS}")
except Exception as e:
print(f"Falha nos widgets - {str(e)}")
print("Processamento diário!")
pass
|
Desta forma, posso chamar meu notebook de processamento diário, adaptar futuras regras de negócio sem me preocupar se isso vai refletir no processamento, pois o reprocessamento já aponta para os notebooks principais.
- hmle/sqs — realizo a mesma lógica do código acima, porém, quando chega no meu notebook de envio ao SQS, valido meus widgets e vejo se ele está preenchido ou não como HMLE ou Prod e faço o apontamento para o SQS correto.
- json_gerados — Quando é solicitado a geração do processamento do arquivo somente até a etapa de JSON (e não enviar ao SQS), eu escrevo o arquivo com ano_mes_dia_minuto_segundo.json neste bucket.
- prod/json — Mesma lógica para o hmle/json.
- prod/sqs — Mesma lógica dos demais, neste caso, envio para o SQS de Prod o arquivo disponibilizado para processamento nesta pasta.
Utilizando a API do databricks para reprocessamento completo
Você pode estar se perguntando: por que você introduziu uma lógica do usuário disponibilizar o arquivo no bucket nas pastas específicas de acordo com sua necessidade, mas o reprocessamento completo (SFTP -> ingestão S3 -> limpeza dados -> processamento layout -> envio SQS) não segue esse padrão e sim via API Databricks? Este caso de reprocessamento completo envolve uma etapa na qual o usuário ou API interna que envia o arquivo já obtido pelo usuário para um dos buckets, que é o acesso ao SFTP de cada cliente (+40 clientes). Além de ser uma informação sensível, eu preciso garantir que, quando manipulo o SFTP do cliente, o arquivo vai chegar até a etapa final que é o AWS SQS. Portanto, é aqui que entra o notebook_params. De forma muito simples, eu passo um atributo dentro desse parâmetro no corpo da requisição, e ela é recebida pelo meu notebook atrelado ao meu job de reprocessamento como um widget também.
Exemplo da requisição:
{ “job_id”: 961722246913259, “pipeline_params”: { “full_refresh”: true }, “notebook_params”: { “ftp”: true } } |
Portanto, no meu notebook de orquestração, eu valido se: dbutils.widgets.get(“ftp”) existe, se está preenchido como true ou false e, caso seja true, eu chamo através do dbutils.notebook.run o meu notebook de comunicação e importação com os SFTP dos clientes.
Conclusão
Neste artigo, exploramos as vantagens de utilizar a API do Databricks e seus recursos em um projeto que envolve a importação, processamento e envio de dados para um sistema de fila. Demonstramos como a plataforma Databricks permite a criação de pipelines de dados escaláveis e eficientes, proporcionando maior autonomia aos usuários e sistemas. Utilizando a função dbutils.notebook.run, foi possível orquestrar notebooks e executar etapas de ETL de forma sequencial e paralela, além de atribuir retry e time. Com a implementação dos widgets e a API do Databricks, conseguimos adaptar o fluxo de trabalho de acordo com as necessidades específicas dos usuários, permitindo o reprocessamento de arquivos, geração de JSONs e processamento em ambiente de homologação ou produção. Ao utilizar a API do Databricks, conseguimos garantir que, mesmo em casos de reprocessamento completo, o arquivo será enviado ao AWS SQS após passar por todas as etapas do pipeline. Isso mostra o quão flexível e eficiente a plataforma Databricks pode ser ao lidar com diferentes regras de negócio e requisitos de processamento de dados. E o mais incrível, criei um fluxo de reprocessamento que segue o fluxo padrão diário da minha pipeline sem ter que deixá-la super complexa.
*O conteúdo deste artigo é de responsabilidade do(a) autor(a) e não reflete necessariamente a opinião do iMasters.