Data

22 mai, 2025

Como criar um fluxo de RAG utilizando 100% os recursos do Databricks

Publicidade

Todo engenheiro de dados (Data Engineer) e desenvolvedor de software (Software Engineer) está ou vai estar muito “Harmonizado” e na minha visão deve estar engajado com o desenvolvimento de soluções de AI. Ok, tudo bem, neste artigo vou mirar mais no meu publico que são os Data Analytics Engineer, Data Architech, Data Engineer, DBRE etc.. mas você, desenvolvedor, vai precisar saber disso também. Mas, por que, então, vou escrever esse artigo mais para o publico de Data? Bom por que aqui no Brasil, o Databricks é massivamente utilizado para questões de Data e não desenvolvimento de software. Contudo, todos de tecnologia devem saber o conceito de RAG e, neste artigo, vou ensinar como fazer um fluxo de RAG completo utilizando 100% o Databricks e todos os recursos que ele já oferece para tornar isso possível.

Vamos nessa introdução clássica para conseguir aprofundar no tema: o que é RAG?

O que é RAG?

RAG é uma arquitetura que combina um componente de recuperação de informação com um modelo generativo. Quando uma pergunta é feita, o sistema primeiro busca documentos ou trechos de texto relevantes de uma base de conhecimento. Em seguida, esses documentos recuperados são fornecidos como contexto adicional ao LLM, que então gera uma resposta mais informada e precisa.

Resumindo, ele alimenta uma AI com informações da sua empresa, o que digamos que é uma coisa MUITO NECESSARIA.

Por que Databricks para RAG?

A Databricks Data Intelligence Platform oferece diversas vantagens para a construção de sistemas RAG:

  • Unified Data Lakehouse: Armazene, processe e gerencie todos os tipos de dados (estruturados, semiestruturados e não estruturados) em um único local com Delta Lake.
  • Processamento Escalável: Utilize Apache Spark para processamento de dados em larga escala, essencial para lidar com grandes volumes de texto.
  • Gerenciamento de Modelos: MLflow e Databricks Model Serving simplificam o rastreamento, versionamento, implantação e monitoramento de modelos de embedding e LLMs.
  • Vector Search (Busca Vetorial): Databricks oferece soluções nativas para busca vetorial, permitindo a criação e consulta eficiente de índices de embeddings.
  • Unity Catalog: Governança de dados, linhagem e descoberta de ativos em todo o seu pipeline de RAG.
  • Workflows: Orquestre pipelines complexos de RAG de forma programática.
  • Notebooks e Colaboração: Ambiente interativo para desenvolvimento, experimentação e colaboração.
  • Model serving Endpoint:Registra seu modelo e provisiona como uma API com varias pequenas features como monitoria, threshold, Guardrails entre outras funcionalidades.

Etapas para Construir um Pipeline RAG no Databricks

Vamos detalhar as etapas envolvidas na criação de um sistema RAG no Databricks:

Bom, aqui eu espero que todos já saibam montar delta tables, tipos de arquitetura de dados, medalion etc… certo? Então, você tem lá várias tabelas gold que representam por exemplo, um sistema de chamados, algumas são comentários dos chamados, outras somente o que é o chamado, etc..

Se você receber o desafio de construir uma AI que compreenda bem melhor os tipos e o que e como estão os chamados abertos para sua empresa, como devemos começar?

Primeiramente, uma Gold no estilo OBT – One big Table, que tenha todo o sistema de chamado em uma única tabela. Legal, a partir disso, é necessário fazer algumas analises exploratórias, principalmente nas colunas valiosas para sua AI, que são: titulo do chamado, tipo do chamado, data de abertura, data de resolução e comentários (iteração entre o solicitante e o atendente).

Mas já não é uma Gold, por quê preciso fazer essa análise como uma espécie de limpeza de dados? Pois é, meu caro, AI não chegou a passos curtos…. Digo isso para empresas que já têm seu Datalake, Data warehouse, Data hub, enfim, para quem está nascendo, já pensa em AI. Como eu li recentemente sobre o Nubank “Somos uma empresa AI-First“.

Então, vamos lá entender os dados para ficar bonitinho para nosso banco vetorial.

Análises Exploratórias (EDA):

Compreensão da Estrutura dos Dados:

  • Identifique os campos relevantes: Título do chamado, descrição, comentários, status (aberto, em andamento, resolvido, fechado), data de criação, data de resolução, prioridade, categoria, tags, logs de sistema, anexos.
  • Analise a distribuição de chamados por status, prioridade, categoria.
  • Verifique a completude dos campos: Quais campos possuem muitos valores ausentes

Análise do Conteúdo Textual:

  • Tamanho do Texto: Verifique a distribuição do comprimento das descrições e comentários. Isso ajudará a definir estratégias de chunking.
  • Linguagem: Identifique os idiomas presentes nos chamados. Se houver múltiplos idiomas, pode ser necessário traduzir ou usar modelos de embedding multilíngues.
  • Qualidade do Texto: Procure por erros de digitação, abreviações comuns, jargões específicos do domínio.
  • Presença de Formatação Especial: Tickets frequentemente contêm HTML, Markdown, snippets de código, stack traces ou URLs. Decida como tratar esses elementos (remover, extrair, converter para texto puro).
  • Identificação de PII (Informações de Identificação Pessoal): Busque por nomes, emails, telefones, CPFs, etc., que precisam ser anonimizados ou removidos.

Análise Temporal:

  • Como o volume de chamados varia ao longo do tempo?
  • Existem padrões sazonais ou tendências?

Relações entre Chamados:

  • Identifique chamados duplicados ou relacionados.
  • Analise o fluxo de resolução: Quais são as etapas comuns?

Limpeza de Dados:

  • Remoção/Anonimização de PII: Utilize técnicas como regex, bibliotecas de detecção de PII (ex: presidio) ou serviços cognitivos para identificar e mascarar/remover dados sensíveis.
  • Tratamento de HTML/Markdown: Use bibliotecas como BeautifulSoup (Python) para remover tags HTML e extrair o texto puro, ou converter Markdown para texto.

Confesso que muitos não consigo tirar conclusões, mas sigo a receita. Feito isso, agora seus dados estão prontos para os termos que podem ser novos para você, como: chunking, embeding, vetorização. Mas antes disso é importante você fazer uma última coisa com os dados dessa tabela, unificar as informações importantes em uma única coluna, que normalmente chamamos de content. Ela, em nosso caso, terá na ordem titulo do chamado, descrição do chamado e depois toda iteração do chamado, identificando o que foi uma pergunta e o que foi uma resposta, ou seja, solicitante e atendente.

Agora, sim, vamos para a codificação:

1. Realizar o chunk da coluna content:

Vou explicar rapidinho o que é chunk: “chunk” se refere a um segmento de texto menor e mais gerenciável, extraído de um documento maior, para facilitar a busca e recuperação de informações relevantes. Quer saber mais, vai no perplexity.ai e busca (google não, virou Kodak).

Como fazer o Chunk? Bom, existe um framework, que você vai ouvir falar muito, chamado langchain, que já tem diversos tipos de chunks prontinhos para realizarmos. Neste caso, o código será:

Conteúdo do artigo
Imagem gerada da minha função com langchain para Chunk

Bacana, não? E o mais bacana é que eu só escrevo bacana, mas não falo bacana, enfim. O que estamos fazendo aí, para quem não entendeu, além da enxurrada de comentários:

  1. Validação da Entrada: Primeiro, ele verifica se o texto fornecido (texto_concatenado) é válido (não é nulo, vazio ou de um tipo diferente de string). Se não for, retorna uma lista vazia.
  2. Remoção de Duplicações Iniciais: O código suspeita que o texto de entrada pode ter seções inteiras duplicadas, separadas por ” | “. Ele divide o texto por esse separador. Para cada parte, ele calcula um “hash” (uma espécie de impressão digital única do texto) usando o algoritmo MD5. Ele mantém apenas as partes com hashes únicos, eliminando assim as duplicações em nível de seção. O texto é reconstruído sem essas duplicações.
  3. Divisão Semântica (Chunking): Ele utiliza a biblioteca langchain com RecursiveCharacterTextSplitter. Esta ferramenta é projetada para quebrar textos grandes em pedaços menores, tentando manter o significado. Uma lista de separadores é definida (como “Pergunta:”, “Resposta:”, ” | “, etc.). O RecursiveCharacterTextSplitter tentará quebrar o texto nesses pontos prioritariamente. Parâmetros como chunk_size (tamanho máximo de cada pedaço) e chunk_overlap (quanto de texto se sobrepõe entre pedaços consecutivos para manter o contexto) são configurados. O texto (já sem as duplicações de seção) é então dividido nesses chunks.
  4. Processamento e Filtragem dos Chunks: Cada chunk gerado passa por um processo de limpeza e validação: Espaços em branco no início e no fim são removidos. Chunks vazios são descartados. Chunks muito pequenos (menos de 20 caracteres) são ignorados, a menos que contenham termos chave como “Pergunta:”, “Resposta:” ou “Titulo”, indicando que podem ser importantes apesar do tamanho. Remoção de Duplicações em Nível de Chunk: Semelhante à etapa 2, um hash MD5 é calculado para cada chunk. Se um chunk com o mesmo conteúdo (mesmo hash) já foi processado, ele é descartado para evitar redundância na saída final.
  5. Retorno: A função retorna uma lista de chunks_processados, que são os pedaços de texto finais, limpos, sem duplicações e divididos semanticamente.

Em resumo, o código é uma ferramenta de pré-processamento de texto que limpa e organiza comentários concatenados, tornando-os mais fáceis de serem processados por outras ferramentas ou modelos de linguagem, ao dividi-los em unidades menores e relevantes e removendo repetições.

E agora com o resultado adivinha? Você vai criar uma nova coluna chamada chunk_content, ou chunk_text enfim. E por que fizemos o chunk? para seguirmos para o mais mirabolante que eu vou explicar bem pouco que não estudei afundo o embedding. Ah mas entender o embedding é simples, sim é simples o tipo mais comum sem se aprofundar no seus parametros ou nos outros tipos de embedding como

  • Dense Embeddings
  • Sparse Embeddings
  • Hybrid Embeddings/Search
Conteúdo do artigo

Enfim, vamos no arroz com feijão que é bom e todo Brasileiro sabe disso. Portanto no databricks como posso fazer o embedding? Bom tem 2 formas:

1.Usar modelos open-source do Hugging Face (via transformers ou MLflow) ou modelos proprietários.

2.O Databricks Model Serving hospedar diversos modelos para embedding e conversação (Chat).

Como hoje estou com preguiça vamos de Model Serving da databricks para facilitar a vida. E para ser mais fã da databricks vou escolher um modelo LLM de embedding que tem o nome databricks nele: databricks-bge-large-en. Agora como é feito o embedding, bom lembra que falei que estou com preguiça hoje? pois bem eu criei uma função que já retorna os embedding a partir de uma lista que neste caso nós passamos a coluna de chunk lembra?

Conteúdo do artigo
Função para realizar o Embedding da coluna Chu

E agora só fazer o mesmo que fizemos para criar a coluna de chunk mas dessa vez essa coluna deve ser OBRIGATÓRIAMENTE chamada de embedding:

Conteúdo do artigo

Claro que no código acima eu já suponho né que sua variável df seja sua tabela completa com o chunk.

Ok, recapitulando o que foi feito até aqui:

  1. Analise exploratória dos dados.
  2. Limpeza e normalização.
  3. Junção de colunas importantes em uma unica coluna.
  4. Criamos a coluna de chunk com base na coluna unica que chamamos de content.
  5. Criamos a coluna de embedding com base na coluna de chunk chamada de chunk_text.

Agora estamos prontos para uma das facilidades do databricks que é tornar nossa tabela delta contendo os embeddings uma tabela com índice vetorial. Você pode configurar o tipo de índice (direto ou Delta Sync) e a métrica de similaridade (ex: cosseno, produto escalar).

Primeiramente você precisa habilitar o Change Data Feed da sua tabela:

Conteúdo do artigo

Este código verifica se esta habilitado a tabela, do contrario habilita a propriedade, agora precisamos criar um Vector Search, registrar o index da sua tabela e um endpoint para receber as predições. Vamos entender então mais sobre o Vector Search:

Conteúdo do artigo

O Databricks oferece o Vector Search, uma solução de banco de dados vetorial sem servidor e de baixa latência. Ele se integra perfeitamente com o Unity Catalog e o Model Serving.

  1. Criação do Endpoint: Crie um endpoint do Vector Search através da UI do Databricks ou da API.
  2. Criação do Índice: Crie um índice vetorial a partir da sua tabela Delta contendo os embeddings. Você pode configurar o tipo de índice (direto ou Delta Sync) e a métrica de similaridade (ex: cosseno, produto escalar).

A Databricks oferece dois modos principais de indexação: o Delta Sync Index, que sincroniza automaticamente com uma tabela Delta e pode calcular embeddings ou usar embeddings pré-existentes, e o Direct Vector Access Index, que permite inserção e atualização manual de vetores e metadados via API ou SDK. Por baixo dos panos ouvi dizer que é pgvector (https://github.com/pgvector/pgvector), ou seja um PostgreSQL Vetorial.

Ok, agora vamos criar nosso Vector Search, o index, referenciar nossa tabela e o endpoint via python, tem como fazer via interface? sim tem, mas ai ficar tirando print e colocando aqui eu iria demorar muito e via código é mais rápido não é mesmo.

Conteúdo do artigo

Grande o código não, mas ele já faz varias validações esta como Classe para incluir mais métodos como atualizar o index, realizar buscas etc.. Explicando melhor o código:

Este código define uma classe Python chamada AiVectorSearch, projetada para gerenciar um processo de ETL (Extração, Transformação e Carga) focado na criação e sincronização de um índice de pesquisa vetorial. A pesquisa vetorial é comumente usada em aplicações de IA para encontrar itens semelhantes com base em embeddings (representações vetoriais de dados).

A seguir, uma análise hierárquica da classe e seus métodos:

1.Classe: AiVectorSearch Esta é a classe principal que encapsula toda a lógica para o processo de ETL da pesquisa vetorial.

1.1. Método Construtor: __init__(self, …) Objetivo: Inicializar uma nova instância da classe AISVectorSearchETL. Parâmetros Principais: source_table_name: Nome da tabela de origem dos dados. source_table_type: Tipo da tabela de origem (ex: “TABLE”, “VIEW”). vector_search_endpoint_name: Nome do endpoint do serviço de pesquisa vetorial. embedding_column: Nome da coluna que contém os embeddings vetoriais. text_column: Nome da coluna que contém o texto original (opcional, mas útil para referência). primary_key: Nome da coluna da chave primária na tabela de origem. id_column: Nome da coluna que será usada como ID no índice vetorial. Ações: Armazena os parâmetros recebidos como atributos da instância (ex: self.source_table_name). Inicializa outros atributos internos, como self.ws (provavelmente um cliente do workspace, ex: Databricks), self.vs_endpoint, self.index_name, self.index_info, self.pipeline_type. Define o nome do índice (self.index_name) combinando o nome do endpoint e o nome da tabela de origem. Define o tipo de pipeline (self.pipeline_type), que parece ser “DELTA_SYNC” com base no código.

1.2. Método Privado: _get_vs_endpoint(self) Objetivo: Obter os detalhes do endpoint da pesquisa vetorial especificado. Ações: Utiliza self.ws.vector_search.get_endpoint() para buscar as informações do endpoint. Armazena o resultado em self.vs_endpoint. Imprime uma mensagem indicando se o endpoint foi encontrado ou não.

1.3. Método Privado: _get_index_info(self) Objetivo: Obter informações sobre o índice de pesquisa vetorial, como seu status e tipo. Ações: Chama self._get_vs_endpoint() para garantir que as informações do endpoint estejam disponíveis. Tenta obter o índice usando self.ws.vector_search.get_index(endpoint_name=self.vector_search_endpoint_name, index_name=self.index_name). Se o índice existir: Armazena as informações do índice em self.index_info. Imprime o nome, tipo e status do índice. Se o índice não existir (captura uma exceção, provavelmente HttpError com status 404): Define self.index_info como None. Imprime uma mensagem indicando que o índice não foi encontrado.

1.4. Método Privado: _create_index_if_not_exists(self) Objetivo: Criar o índice de pesquisa vetorial se ele ainda não existir. Ações: Chama self._get_index_info() para verificar o estado atual do índice. Se self.index_info for None (índice não existe): Imprime uma mensagem indicando que o índice será criado. Chama self.ws.vector_search.create_delta_sync_index(…) para criar um novo índice de sincronização Delta. Parâmetros para criação: endpoint_name: Nome do endpoint. index_name: Nome do índice. source_table_name: Tabela de origem dos dados. pipeline_type: Tipo de pipeline (ex: “DELTA_SYNC”). primary_key: Chave primária da tabela de origem. embedding_vector_column (ou similar, o nome exato pode variar na imagem): Coluna de embedding. embedding_dimension (ou similar): Dimensão dos vetores de embedding. text_column (ou similar, se aplicável): Coluna de texto. Atualiza self.index_info chamando self._get_index_info() novamente. Se o índice já existir: Imprime uma mensagem indicando que o índice já existe e seu status.

1.5. Método Privado: _wait_for_index_online(self) Objetivo: Pausar a execução até que o índice de pesquisa vetorial esteja no estado “ONLINE” e pronto para uso. Ações: Chama self._get_index_info() para obter o status mais recente do índice. Entra em um loop while que continua enquanto o status do índice (self.index_info.status.detailed_state) não for “ONLINE”. Dentro do loop: Imprime o status atual do índice. Aguarda por um período fixo (ex: 30 segundos) usando time.sleep(30). Atualiza as informações do índice chamando self._get_index_info() novamente. Quando o loop terminar (índice está “ONLINE”): Imprime uma mensagem confirmando que o índice está online.

1.6. Método Privado: _sync_data(self) Objetivo: Orquestrar o processo de criação (se necessário), espera e sincronização de dados para o índice. Ações: Chama self._create_index_if_not_exists() para garantir que o índice exista. Chama self._wait_for_index_online() para garantir que o índice esteja pronto. Verifica se o tipo de índice é “DELTA_SYNC”. Se for, e se houver um self.index_info válido, ele tenta sincronizar os dados. A imagem mostra idx.sync(), onde idx seria self.index_info ou um objeto de índice obtido a partir dele. Isso sugere que o objeto index_info (ou o objeto retornado por get_index) tem um método sync(). Imprime mensagens sobre o início e a conclusão da sincronização. Se o tipo de índice não for “DELTA_SYNC” ou se self.index_info for inválido, imprime uma mensagem apropriada.

1.7. Método Público: run(self) Objetivo: Ponto de entrada principal para executar todo o processo de ETL da pesquisa vetorial. Ações: Imprime uma mensagem indicando o início do processo de ETL. Chama o método self._sync_data() para realizar a sincronização. Imprime uma mensagem indicando a conclusão do processo de ETL.

Resumo da Hierarquia e Fluxo:

  1. Um objeto AiVectorSearch é instanciado com as configurações necessárias.
  2. O método run() é chamado para iniciar o processo.
  3. run() chama _sync_data().
  4. _sync_data() primeiro chama _create_index_if_not_exists(): _create_index_if_not_exists() chama _get_index_info() para verificar se o índice existe. _get_index_info() chama _get_vs_endpoint() para obter detalhes do endpoint. Se o índice não existir, _create_index_if_not_exists() o cria.
  5. _sync_data() então chama _wait_for_index_online(): _wait_for_index_online() chama repetidamente _get_index_info() para monitorar o status do índice até que ele esteja online.
  6. Finalmente, _sync_data() (se o índice for do tipo correto e estiver online) executa a operação de sincronização de dados (ex: idx.sync()).

Agora com toda parte do Vector Search realizada você terá visualmente no databricks:

Conteúdo do artigo

Repare que agora você tem um banco de dados vetorial criado, com 1 index que é da sua tabela e o creator que ficara o seu e-mail.

The end: Registrando seu modelo com esse fluxo de RAG como uma API através do Model Serving endpoint da Databricks.

Se você se aventurar e ir na seção de Serving no databricks verá vários modelos da databricks que são de modelos proprietários e grande parte opensource:

Conteúdo do artigo

O que queremos é registrar seu modelo como um modelo Chat e que apareça aqui como disponível para uso como API para receber perguntas passar pelo banco vetorial, receber mais contexto, passar pelo pré-prompt e por ultimo enviar para a LLM de chat que vai nos devolver a resposta.

Primeiro ponto é criar o código que vai realizar toda essa orquestração que neste caso eu denominei de RagChatModel onde ela é responsável por fazer busca vetoriais, obter retorno do LLM de chat, realizar o processo de embedding da pergunta do usuário, definir temperatura do modelo do chat que basicamente define o quanto ele pode ser criativo ou não, qual é meu endpoint do vector search, qual o LLM de chat, que neste caso vamos utilizar o databricks-meta-llama-3-1-8b-instruct e qual a tabela de origem.

Soltar a bomba:

Conteúdo do artigo

 

Legal e como registro essa classe no Model Serving Endpoint no databricks?

Conteúdo do artigo

A saída será mais ou menos desta forma ao rodar no seu notebook Databricks:

Conteúdo do artigo

Agora sim se olharmos no Serving endpoints veremos nosso modelo RAG com API registrado e gerenciado pela Databricks:

Conteúdo do artigo
Conteúdo do artigo

E por ultimo se você estiver construindo com mlflow e utilizara classe mlflow.pyfunc.ChatModel

Você testar no Playground do databricks:

Conteúdo do artigo

Conclusão: o que criamos se resume nesta imagem abaixo:

Conteúdo do artigo

Espero que tenham curtido. Se sim, me sigam no Linkedin para ver meus post, artigos.

Meus demais artigos: https://imasters.com.br/perfil/airtonlirajr

Github: https://github.com/AirtonLira

Valeu, abrs!!