APIs e Microsserviços

17 abr, 2023

[API Databricks como serviço interno] dbutils — notebook.run, widgets.getArgument, widgets.text e notebook_params

Publicidade

Com o aumento da demanda por maior autonomia em pipelines de dados, a plataforma de análise de dados Databricks tornou-se uma solução popular para empresas que desejam desenvolver pipelines escaláveis e eficientes. Durante um projeto freelancer no qual me foi solicitado uma maior autonomia sobre o core da empresa, que basicamente é definido por uma pipeline de dados que importa arquivos de vários SFTP diferentes, faz a ingestão para o S3, uma série de tratamentos de dados para seguir um layout mandatório e, por fim, o envio para um sistema de fila onde os demais sistemas poderiam usufruir dos dados refinados. Pensei em como poderia atender aos seguintes requisitos com pouco ou quase nenhum desenvolvimento além do que já existe de recurso no Databricks. Por isso, falaremos neste artigo como você pode utilizar a API do Databricks como um serviço interno e dar autonomia para:

  • Reprocessar um arquivo desde sua origem (SFTP).

dbutils — notebook.run e widgets.get

Antes de algumas evoluções do Databricks em relação a “steps” dentro de um job, eu sempre utilizei uma função muito interessante, que é o dbutils.notebook.run. Com ele, eu conseguia montar uma espécie de orquestração de notebooks que gostaria de executar tanto em sequência como em paralelo, bem como atribuir retry e tempo (parâmetros que você pode passar nessa função). Desta forma, eu montei um único notebook mais ou menos como o abaixo, que fazia a minha orquestração de extração, transformação e envio dos dados de centenas de arquivos:

Exemplo do notebook de orquestração:

class PropriedadesNoteBook:
def __init__(self, path, timeout, retry=1):
self.path = path
self.timeout = timeout
self.retry = retrynbmovefiles = PropriedadesNoteBook(“0-move-ftp-to-s3”, 500,3)
nblimpadados = PropriedadesNoteBook(“1-notebook_limpa_dados”, 1000,2)
nbimport = PropriedadesNoteBook(“2-notebook_importa_arquivos”, 1000,2)
nbidentificadores = PropriedadesNoteBook(“3-notebook_processa_identificadores”,1000,2)
nbprocessjson = PropriedadesNoteBook(“4-notebook_processa_json”,2000,2)
nbexcluidados = PropriedadesNoteBook(“6-notebook_envia_json_sqs”,1000,1)

if dbutils.notebook.run(nbmovefiles.path, nbmovefiles.timeout) != “error”:
send_slack_message(“”,“Notebook “+nbmovefiles.path+ ” executado com sucesso!”)
else:
print(“Ocorreu um erro na execução do notebook: “+nbmovefiles.path)
send_slack_message(“”, f”Ocorreu uma falha na execução do notebook: {nbmovefiles.path})
raise

if dbutils.notebook.run(nblimpadados.path, nblimpadados.timeout) != “error”:
send_slack_message(“”,“Notebook “+nblimpadados.path+ ” executado com sucesso!”)
else:
print(“Ocorreu um erro na execução do notebook: “+nblimpadados.path)
send_slack_message(“”, f”Ocorreu uma falha na execução do notebook: {nblimpadados.path})
raise

if dbutils.notebook.run(nbimport.path, nbimport.timeout) != “error”:
send_slack_message(“”,” Notebook “+nbimport.path+ ” executado com sucesso!”)
else:
print(“Ocorreu um erro na execução do notebook: “+nbimport.path)
send_slack_message(“”, f”Ocorreu uma falha na execução do notebook: {nbimport.path})
raise

if dbutils.notebook.run(nbidentificadores.path, nbidentificadores.timeout) != “error”:
send_slack_message(“”,“Notebook “+nbidentificadores.path+ ” executado com sucesso!”)
else:
print(“Ocorreu um erro na execução do notebook: “+nbidentificadores.path)
send_slack_message(“”, f”Ocorreu uma falha na execução do notebook: {nbidentificadores.path})
raise

if dbutils.notebook.run(nbprocessjson.path, nbprocessjson.timeout) != “error”:
send_slack_message(“”,” Notebook “+nbprocessjson.path+ ” executado com sucesso!”)
else:
print(“Ocorreu um erro na execução do notebook: “+nbprocessjson.path)
send_slack_message(“”, f”Ocorreu uma falha na execução do notebook: {nbprocessjson.path})

if dbutils.notebook.run(nbexcluidados.path, nbexcluidados.timeout) != “error”:
send_slack_message(“”,“Notebook “+nbexcluidados.path+ ” executado com sucesso!”)
else:
print(“Ocorreu um erro na execução do notebook: “+nbexcluidados.path)
send_slack_message(“”, f”Ocorreu uma falha na execução do notebook: {nbexcluidados.path})

No exemplo acima, eu faço toda a minha etapa de “ETL” de forma sequencial, visto que, em meu caso, os arquivos eram disponibilizados no SFTP de 1 a 3 vezes ao dia (às vezes muito mais, que é onde quero chegar com esse artigo). Legal, você tem seu job, sua orquestração, seus notebooks que executam tarefas e seu job é executado em uma janela definida de acordo com o que foi definido com o time de negócios. Mas, depois de desenvolvido, testado e validado, esse processo ao longo que o próprio negócio foi crescendo, principalmente em 3 aspectos:

  1. Muitos setups de novos clientes, que o time de produtos precisava validar o JSON processado pelo Databricks a nível de homologação ou produção.

Bom, chegamos onde será demonstrado como utilizar e a efetividade das funções abaixo:

notebook_params (parametro da API do databricks)
dbutils.widgets.getArgument
dbutils.widgets.text

Passando e recebendo dados através do widgets entre notebooks.

Conforme havia comentado anteriormente, eu senti a necessidade de trazer mais liberdade para usuários e sistemas e tirar um pouco a preocupação do engenheiro de dados de modificar, executar e monitorar o fluxo de importação, processamento e envio dos dados para o AWS SQS (sistema de fila de mensagens). Desta forma, comecei a elaborar um notebook responsável totalmente pelo reprocessamento das necessidades acima, relembrando:

  1. Muitos setups de novos clientes que o time de produtos precisava validar o JSON processado pelo Databricks a nível de homologação ou produção.

Continuando, além do notebook que desenvolvi, que denominei de reprocessamento, criei um segundo notebook chamado orquestrador_reprocessamento e, por último, criei um bucket dedicado a esse tipo de necessidade de reprocessamento, obter apenas o penúltimo processo que o JSON, enviar ou não para o SQS e etc… Ótimo, agora como o usuário ou aplicação vai chamar esse notebook e especificar o que ele quer exatamente diante dessas 3 novas funcionalidades? Para isso, utilizei a própria API do Databricks, no qual é composta pelo ID do seu cluster + .cloud.databricks.com/api/2.1/jobs/run-now. Ficando algo do tipo: https://meuidcluster.cloud.databricks.com/api/2.1/jobs/run-now. Além disso, conforme mencionei anteriormente, criei um bucket focado em reprocessamento de dados, no qual dentro desse bucket criei as seguintes: pastas:
– hmle/json
– hmle/sqs
– json_gerados
– prod/json
– prod/sqs

Basicamente, como grande parte das necessidades (exceto o processamento completo que envolve o SFTP) envolve a necessidade de colocar o arquivo de SETUP para processamento, eu criei as pastas acima onde meu notebook de reprocessamento tem a seguinte inteligência:

– hmle/json — Caso exista arquivos dentro desse diretório, eu inicio meu notebook de reprocessamento passando via widgets para meu notebook do fluxo normal (dessa forma, mantenho a regra do fluxo diário sem criar algo separado com essa necessidade de reprocessamento) da seguinte forma:

HMLE_PROCESS = "T"
PROD_PROCESS = "F"
JSON_PROCESS = "T"
dbutils.widgets.text('HMLE_PROCESS', HMLE_PROCESS)
dbutils.widgets.text('PROD_PROCESS', PROD_PROCESS)
dbutils.widgets.text('JSON_PROCESS', JSON_PROCESS)
if dbutils.notebook.run(nbprocessjson.path, nbprocessjson.timeout, {"HMLE_PROCESS": HMLE_PROCESS, "PROD_PROCESS": PROD_PROCESS, "JSON_PROCESS": JSON_PROCESS}) != "error":
  print("Notebook "+nbprocessjson.path+ " executado com sucesso!")
else:
  print("Ocorreu um erro na execução do notebook: "+nbprocessjson.path)
  send_slack_message_reprocessing("1-importa arquivos lake", f"Ocorreu uma falha na execução do notebook: {nbprocessjson.path}")

O trecho de código acima é executado no meu notebook de reprocessamento. Repare que eu passo nos widgets tudo de acordo como foi disponibilizado o arquivo pelo usuário ao bucket, ou seja, quero processar o arquivo e obter o JSON em ambiente de homologação. Já no notebook que vai receber os parâmetros, fica da seguinte forma:

try:
    JSON_PROCESS = dbutils.widgets.getArgument("JSON_PROCESS")
    PROD_PROCESS = dbutils.widgets.getArgument("PROD_PROCESS")
    HMLE_PROCESS= dbutils.widgets.getArgument("HMLE_PROCESS")
    
    print(f"JSON - {JSON_PROCESS} PROD - {PROD_PROCESS} - File json name - {HMLE_PROCESS}") 
except Exception as e:
    print(f"Falha nos widgets - {str(e)}")
    print("Processamento diário!")
    pass

Desta forma, posso chamar meu notebook de processamento diário, adaptar futuras regras de negócio sem me preocupar se isso vai refletir no processamento, pois o reprocessamento já aponta para os notebooks principais.
  • hmle/sqs — realizo a mesma lógica do código acima, porém, quando chega no meu notebook de envio ao SQS, valido meus widgets e vejo se ele está preenchido ou não como HMLE ou Prod e faço o apontamento para o SQS correto.

Utilizando a API do databricks para reprocessamento completo

Você pode estar se perguntando: por que você introduziu uma lógica do usuário disponibilizar o arquivo no bucket nas pastas específicas de acordo com sua necessidade, mas o reprocessamento completo (SFTP -> ingestão S3 -> limpeza dados -> processamento layout -> envio SQS) não segue esse padrão e sim via API Databricks? Este caso de reprocessamento completo envolve uma etapa na qual o usuário ou API interna que envia o arquivo já obtido pelo usuário para um dos buckets, que é o acesso ao SFTP de cada cliente (+40 clientes). Além de ser uma informação sensível, eu preciso garantir que, quando manipulo o SFTP do cliente, o arquivo vai chegar até a etapa final que é o AWS SQS. Portanto, é aqui que entra o notebook_params. De forma muito simples, eu passo um atributo dentro desse parâmetro no corpo da requisição, e ela é recebida pelo meu notebook atrelado ao meu job de reprocessamento como um widget também.

Exemplo da requisição:

{
“job_id”: 961722246913259,
“pipeline_params”: {
“full_refresh”: true
},
“notebook_params”: {
“ftp”: true
}
}

Portanto, no meu notebook de orquestração, eu valido se: dbutils.widgets.get(“ftp”) existe, se está preenchido como true ou false e, caso seja true, eu chamo através do dbutils.notebook.run o meu notebook de comunicação e importação com os SFTP dos clientes.

Conclusão

Neste artigo, exploramos as vantagens de utilizar a API do Databricks e seus recursos em um projeto que envolve a importação, processamento e envio de dados para um sistema de fila. Demonstramos como a plataforma Databricks permite a criação de pipelines de dados escaláveis e eficientes, proporcionando maior autonomia aos usuários e sistemas. Utilizando a função dbutils.notebook.run, foi possível orquestrar notebooks e executar etapas de ETL de forma sequencial e paralela, além de atribuir retry e time. Com a implementação dos widgets e a API do Databricks, conseguimos adaptar o fluxo de trabalho de acordo com as necessidades específicas dos usuários, permitindo o reprocessamento de arquivos, geração de JSONs e processamento em ambiente de homologação ou produção. Ao utilizar a API do Databricks, conseguimos garantir que, mesmo em casos de reprocessamento completo, o arquivo será enviado ao AWS SQS após passar por todas as etapas do pipeline. Isso mostra o quão flexível e eficiente a plataforma Databricks pode ser ao lidar com diferentes regras de negócio e requisitos de processamento de dados. E o mais incrível, criei um fluxo de reprocessamento que segue o fluxo padrão diário da minha pipeline sem ter que deixá-la super complexa.

*O conteúdo deste artigo é de responsabilidade do(a) autor(a) e não reflete necessariamente a opinião do iMasters.