Data

1 nov, 2018

Plataforma de Big Data da Uber: mais de 100 Petabytes com Latência de Minuto

Publicidade

A Uber está comprometida em oferecer transporte mais seguro e confiável em nossos mercados globais. Para conseguir isso, ela depende muito de tomar decisões baseadas em dados em todos os níveis, desde a previsão da demanda do motorista durante os eventos de alto tráfego até a identificação e solução de gargalos em nosso processo de inscrição de motorista parceiro.

Com o tempo, a necessidade de mais insights resultou em mais de 100 petabytes de dados analíticos que precisam ser limpos, armazenados e servidos com latência mínima por meio de nossa plataforma Big Data baseada em Hadoop. Desde 2014, trabalhamos para desenvolver uma solução de Big Data que garanta confiabilidade, escalabilidade e facilidade de uso de dados, e agora estamos focados em aumentar a velocidade e a eficiência de nossa plataforma.

Neste artigo, vamos mergulhar na jornada da plataforma Hadoop da Uber e discutir o que estamos construindo para expandir esse ecossistema rico e complexo.

Geração 1: O começo do Big Data na Uber

Antes de 2014, nossa quantidade limitada de dados poderia se encaixar em alguns bancos de dados de processamento de transações on-line (OLTP) tradicionais (no nosso caso, MySQL e PostgreSQL).

Para alavancar esses dados, nossos engenheiros tiveram que acessar cada banco de dados ou tabela individualmente, e foi deixado para os usuários escreverem seus próprios códigos se eles precisassem combinar dados de bancos de dados diferentes.

Naquela época, não tínhamos acesso global nem uma visão global de todos os nossos dados armazenados. Na verdade, nossos dados estavam espalhados em diferentes bancos de dados OLTP, o tamanho total dos dados era da ordem de alguns terabytes, e a latência para acessar esses dados era muito rápida (muitas vezes, sub-minuto). A Figura 1, abaixo, fornece uma visão geral de nossa arquitetura de dados antes de 2014:

Figura 1: Antes de 2014, a quantidade total de dados armazenados na Uber era pequena o suficiente para caber em alguns bancos de dados OLTP tradicionais. Não havia uma visão global dos dados e o acesso aos dados era rápido, pois cada banco de dados era consultado diretamente.

Com os negócios da Uber crescendo exponencialmente (tanto em termos do número de cidades/países em que operamos e o número de passageiros/motoristas que usam o serviço em cada cidade), a quantidade de dados recebidos também aumentou e a necessidade de acessar e analisar todos os dados em um só lugar exigiram que construíssemos a primeira geração de nosso warehouse/depósito de dados analíticos.

Para tornar a Uber o mais orientada a dados possível, precisávamos garantir que os dados analíticos fossem acessíveis aos analistas, tudo em um só lugar. Para atingir esse objetivo, primeiro categorizamos nossos usuários de dados em três categorias principais:

  • Equipes de operações da cidade (milhares de usuários): essas equipes no local gerenciam e dimensionam a rede de transporte da Uber em cada mercado. Com nossos negócios expandindo para novas cidades, há milhares de equipes de operações da cidade acessando esses dados regularmente para responder a problemas específicos de motoristas e passageiros.
  • Cientistas e analistas de dados (centenas de usuários): são analistas e cientistas espalhados por diferentes grupos funcionais que precisam de dados para ajudar a fornecer as melhores experiências de transporte e entrega aos nossos usuários, por exemplo, ao prever a demanda do passageiros de futuros serviços.
  • Equipes de engenharia (centenas de usuários): engenheiros de toda a empresa concentraram-se na criação de aplicativos de dados automatizados, como nossas plataformas de Detecção de Fraudes e Onboarding de Motoristas.

A primeira geração do nosso depósito de dados analíticos concentrou-se em agregar todos os dados da Uber em um único local, além de simplificar o acesso aos dados. Para os primeiros, decidimos usar o Vertica como nosso software de depósito de dados por causa de seu design rápido, escalável e orientado por colunas.

Também desenvolvemos várias tarefas ad hoc ETL (Extrair, Transformar e Carregar) que copiaram dados de diferentes fontes (por exemplo, AWS S3, bancos de dados OLTP, registros de serviços, etc) para a Vertica.

Para alcançar o último, padronizamos o SQL como nossa interface de solução e criamos um serviço de consulta on-line para aceitar as consultas dos usuários e enviá-las ao mecanismo de consulta subjacente.

A Figura 2, abaixo, descreve esse depósito de dados analíticos:

Figura 2: A primeira geração da plataforma de Big Data da Uber nos permitiu agregar todos os dados da Uber em um único local e fornecer uma interface SQL padrão para os usuários acessarem dados.

O lançamento do nosso primeiro serviço de depósito de dados foi um enorme sucesso para os engenheiros da empresa. Pela primeira vez, os usuários tinham uma visão global e podiam acessar todos os dados em um só lugar. Isso resultou em um grande número de novas equipes usando a análise de dados como base para suas decisões sobre tecnologia e produtos. Em poucos meses, o tamanho de nossos dados analíticos cresceu para dezenas de terabytes e o número de usuários aumentou para várias centenas.

O uso do SQL como uma interface padrão simples permitiu que os operadores da cidade interagissem facilmente com os dados sem conhecer as tecnologias subjacentes.

Além disso, diferentes equipes de engenharia começaram a criar serviços e produtos sob medida para as necessidades dos usuários que foram informados por esses dados (por exemplo, uberPool, precificação inicial, etc.) e novas equipes foram formadas para melhor usar e servir esses dados (ou seja, nosso aprendizado de máquina e equipes de experimentação).

Limitações

Por outro lado, o uso generalizado de nosso depósito de dados e dados recebidos revelou algumas limitações. Como os dados foram processados/ingeridos ​​por meio de tarefas ETL ad hoc e faltou um mecanismo formal de comunicação de esquema, a confiabilidade dos dados tornou-se uma preocupação. A maioria de nossos dados de origem estava no formato JSON e os trabalhos de processamento não eram resilientes a alterações no código do produtor.

Conforme nossa empresa crescia, escalar nosso depósito de dados tornou-se cada vez mais caro. Para reduzir os custos, começamos a excluir dados antigos e obsoletos para liberar espaço para novos dados.

Além disso, grande parte da nossa plataforma de Big Data não era dimensionável horizontalmente, pois o objetivo principal era desbloquear a necessidade crítica de negócios de acesso ou visualização centralizada de dados, e simplesmente não havia tempo suficiente para garantir que todas as partes fossem dimensionáveis ​​horizontalmente.

Nosso depósito de dados estava sendo efetivamente usado como um lago de dados, acumulando todos os dados brutos, bem como realizando toda a modelagem de dados e servindo os dados.

Além disso, as tarefas de ETL que ingeriram dados no depósito de dados também eram muito frágeis devido à falta de um contrato formal entre os serviços que produzem os dados e os consumidores de dados downstream (o uso do formato JSON flexível resultou na falta de imposição de esquema para os dados de origem).

Os mesmos dados podem ser ingeridos várias vezes se usuários diferentes fizerem diferentes transformações durante a ingestão. Isso resultou em pressão extra sobre nossas fontes de dados upstream (ou seja, armazenamentos de dados on-line) e afetou sua qualidade de serviço.

Além disso, isso resultou em várias cópias de quase os mesmos dados sendo armazenados em nosso depósito, aumentando ainda mais os custos de armazenamento.

E, no caso de problemas de qualidade de dados, o preenchimento consumia muito tempo e trabalho porque os trabalhos de ETL eram ad hoc e dependentes de origem, e as projeções e transformações de dados eram executadas durante a ingestão.

Também foi difícil ingerir novos conjuntos e tipos de dados devido à falta de padronização em nossas tarefas de ingestão.

Geração 2: A chegada do Hadoop

Para resolver essas limitações, reprojetamos nossa plataforma de Big Data em torno do ecossistema do Hadoop. Mais especificamente, introduzimos um lago de dados do Hadoop onde todos os dados brutos foram ingeridos a partir de diferentes armazenamentos de dados on-line apenas uma vez e sem transformação durante a ingestão.

Essa mudança de design reduziu significativamente a pressão sobre nossos datastores on-line e nos permitiu fazer a transição de tarefas de ingestão ad hoc para uma plataforma de ingestão escalonável.

Para que os usuários acessem dados no Hadoop, apresentamos o Presto para permitir consultas de usuário ad hoc interativas, o Apache Spark para facilitar o acesso programático a dados brutos (em formatos SQL e não SQL) e o Apache Hive para servir como o cavalo de batalha para consultas extremamente grandes.

Esses diferentes mecanismos de consulta permitiram que os usuários usassem as ferramentas que melhor atendiam às suas necessidades, tornando nossa plataforma mais flexível e acessível.

Para manter a plataforma escalável, garantimos que toda a modelagem e transformação de dados acontecesse apenas no Hadoop, permitindo o preenchimento e a recuperação rápida quando surgissem problemas.

Somente as tabelas modeladas mais críticas (ou seja, aquelas aproveitadas pelos operadores de cidade em tempo real para executar consultas SQL rápidas e puras) foram transferidas para o nosso depósito de dados.

Isso reduziu significativamente o custo operacional da execução de um enorme depósito de dados e, ao mesmo tempo, direcionou os usuários para mecanismos de consulta baseados no Hadoop que foram projetados tendo em mente suas necessidades específicas.

Também aproveitamos o formato de arquivo colunar padrão do Apache Parquet, resultando em economias de armazenamento, devido à taxa de compactação aprimorada e ganhos de recursos de computação, dado o acesso colunar para atender a consultas analíticas.

Além disso, a integração contínua do Parquet com o Apache Spark tornou essa solução uma opção popular para acessar dados do Hadoop. A Figura 3, abaixo, resume a arquitetura da nossa segunda geração de plataforma Big Data:

Figura 3: A segunda geração da nossa plataforma Big Data aproveitou o Hadoop para habilitar o dimensionamento horizontal. Incorporando tecnologias como Parquet, Spark e Hive, dezenas de petabytes de dados foram ingeridos, armazenados e servidos.

Além de incorporar um data lake do Hadoop, também tornamos todos os serviços de dados nesse ecossistema horizontalmente escalonáveis, melhorando assim a eficiência e a estabilidade de nossa plataforma de Big Data.

Em particular, ter essa escalabilidade horizontal universal para atender às necessidades imediatas de negócios nos permitiu concentrar nossa energia na construção da próxima geração da plataforma de dados, em oposição à solução de problemas ad hoc.

Ao contrário da primeira geração de nossa plataforma na qual os pipelines de dados eram vulneráveis ​​a alterações de formato de dados upstream, nossa segunda iteração nos permitiu esquematizar todos os dados, passando de JSON para Parquet para armazenar esquema e dados juntos.

Para conseguir isso, criamos um serviço de esquema central para coletar, armazenar e fornecer esquemas, bem como diferentes bibliotecas de clientes, para integrar diferentes serviços a esse serviço de esquema central.

Frágeis, os trabalhos de ingestão de dados ad hoc foram substituídos por uma plataforma padrão para transferir todos os dados de origem em seu formato aninhado original para o data lake do Hadoop.

Quaisquer operações necessárias e transformação dos dados ocorreram após a ingestão por meio de tarefas em lote escalonáveis ​​horizontalmente no Hadoop.

Com os negócios da Uber continuando a escalar à velocidade da luz, logo tivemos dezenas de petabytes de dados. Diariamente, havia dezenas de terabytes de novos dados adicionados ao nosso data lake, e nossa plataforma Big Data cresceu para mais de 10.000 vcores com mais de 100.000 tarefas em lote em execução em qualquer dia. Isso resultou em nosso data lake do Hadoop se tornando a fonte centralizada da verdade para todos os dados analíticos da Uber.

Limitações

Conforme a empresa continuava a escalonar e com dezenas de petabytes de dados armazenados em nosso ecossistema, enfrentamos um novo conjunto de desafios.

Para começar, a enorme quantidade de pequenos arquivos armazenados em nosso HDFS (resultante de mais dados sendo ingeridos, bem como mais usuários escrevendo trabalhos em lote ad hoc que geraram ainda mais dados de saída) começou a adicionar pressão extra ao HDFS NameNodes.

Além disso, a latência de dados ainda estava longe do que nossos negócios precisavam. Novos dados só eram acessíveis aos usuários a cada 24 horas, o que era muito lento para tomar decisões em tempo real.

Enquanto mover ETL e modelagem para o Hadoop tornou esse processo mais escalável, essas etapas ainda eram gargalos, uma vez que essas tarefas de ETL precisavam recriar toda a tabela modelada em todas as execuções.

Somando-se ao problema, tanto a ingestão dos novos dados quanto a modelagem da tabela derivada relacionada foram baseadas na criação de novas capturas instantâneas de todo o conjunto de dados e na troca de tabelas antigas e novas para fornecer aos usuários acesso a novos dados.

Os trabalhos de ingestão tiveram que retornar ao armazenamento de dados de origem, criar uma nova captura instantânea e ingerir ou converter todo o conjunto de dados em arquivos Parquet colunares consumíveis durante cada execução. Com nossas lojas de dados crescendo, esses trabalhos podem levar mais de vinte horas com mais de 1.000 executores do Spark para serem executados.

Uma grande parte de cada trabalho envolvia a conversão tanto de dados históricos quanto de dados novos da captura instantânea mais recente. Embora apenas mais de 100 gigabytes de novos dados tenham sido adicionados todos os dias para cada tabela, cada execução da ingestão teve que converter todo o conjunto de dados de mais de 100 terabytes para essa tabela específica.

Isso também era verdadeiro para tarefas ETL e de modelagem que recriavam novas tabelas derivadas em cada execução. Essas tarefas dependiam da ingestão baseada em capturas instantâneas dos dados de origem, devido à alta proporção de atualizações em dados históricos.

Por natureza, nossos dados contêm muitas operações de atualização (ou seja, classificações de passageiro e motorista ou ajustes de tarifa de suporte algumas horas ou mesmo dias após uma viagem concluída). Como o HDFS e o Parquet não suportam atualizações de dados, todos os trabalhos de ingestão precisaram para criar novas capturas instantâneas a partir dos dados de origem atualizados, ingerir a nova captura instantânea no Hadoop, convertê-la no formato Parquet e trocar as tabelas de saída/resultado para exibir os novos dados.

A Figura 4, abaixo, resume como essas ingestões de dados baseados em captura instantânea foram movidas por meio de nossa plataforma Big Data:

Figura 4: Embora o Hadoop permitisse o armazenamento de vários petabytes de dados em nossa plataforma Big Data, a latência de novos dados ainda era de um dia, um atraso devido à ingestão baseada em capturas instantâneas de tabelas de origem grandes e de upstream que levam várias horas processo.

Geração 3: Reconstruindo nossa plataforma de Big Data para o longo prazo

No início de 2017, nossa plataforma de Big Data foi usada por equipes de engenharia e operações em toda a empresa, permitindo que eles acessassem dados novos e históricos em um só lugar. Os usuários podiam facilmente acessar dados no Hive, Presto, Spark, Vertica, Notebook e mais opções de depósito por meio de um único portal de interface do usuário adaptado às suas necessidades.

Com mais de 100 petabytes de dados no HDFS, 100.000 vcores em nosso cluster computacional, 100.000 consultas Presto por dia, 10.000 trabalhos Spark por dia e 20.000 consultas Hive por dia, nossa arquitetura de análise Hadoop estava atingindo limitações de escalabilidade e muitos serviços foram afetados por altas latência de dados.

Felizmente, como nossa infraestrutura subjacente era dimensionável horizontalmente para atender às necessidades comerciais imediatas, tínhamos tempo suficiente para estudar nosso conteúdo de dados, padrões de acesso a dados e requisitos específicos do usuário para identificar as preocupações mais prementes antes de construir a próxima geração. Nossa pesquisa revelou quatro principais pontos problemáticos:

  • Limitação de escalabilidade do HDFS: Esse problema é enfrentado por muitas empresas que dependem do HDFS para dimensionar suas infraestruturas de big data. Por design, o HDFS é limitado por sua capacidade NameNode, portanto, armazenar grandes quantidades de arquivos pequenos pode afetar significativamente o desempenho. Essa limitação geralmente ocorre quando o tamanho dos dados aumenta além de dez petabytes e se torna um problema real além dos 50-100 petabytes. Felizmente, existem soluções relativamente diretas para dimensionar o HDFS de algumas dezenas para algumas centenas de petabytes, por exemplo, aproveitando o ViewFS e usando o HDFS NameNode Federation. Controlando o número de arquivos pequenos e movendo partes diferentes de nossos dados para clusters separados (por exemplo, logs de aplicativos HBase e Yarn movidos para um cluster HDFS separado), conseguimos atenuar essa limitação do HDFS.
  • Dados mais rápidos no Hadoop: Os negócios da Uber operam em tempo real e, como tal, nossos serviços exigem acesso a dados o mais recentes possível. Como resultado, a latência de dados de 24 horas era muito lenta para muitos casos de uso e havia uma grande demanda por uma entrega de dados mais rápida. O método de ingestão baseado na captura instantânea da nossa segunda geração da plataforma Big Data foi ineficiente e nos impediu de ingerir dados com menor latência. Para agilizar a entrega de dados, tivemos que redesenhar nosso pipeline para a ingestão incremental apenas de dados atualizados e novos.
  • Suporte a atualizações e exclusões no Hadoop e Parquet: os dados da Uber contêm muitas atualizações, com idades que variam desde os últimos dias (por exemplo, um passageiro ou motorista parceiro que ajusta uma tarifa de viagem recente) a algumas semanas (por exemplo, um passageiro classificando sua última viagem na próxima vez que fizer uma nova viagem) ou mesmo alguns meses (por exemplo, preenchimento ou ajuste de dados anteriores devido a uma necessidade comercial). Com a ingestão de dados baseada em capturas instantâneas, nós ingerimos uma nova cópia dos dados de origem a cada 24 horas. Em outras palavras, nós ingerimos todas as atualizações de uma só vez, uma vez por dia. No entanto, com a necessidade de novos dados e ingestão incremental, nossa solução deve ser capaz de suportar operações de atualização e exclusão de dados existentes. No entanto, como nosso Big Data é armazenado no HDFS e no Parquet, não é possível suportar diretamente as operações de atualização nos dados existentes. Por outro lado, nossos dados contêm tabelas extremamente amplas (cerca de 1.000 colunas por tabela) com cinco ou mais níveis de aninhamento, enquanto as consultas de usuários geralmente só tocam em algumas dessas colunas, impedindo que utilizemos formatos não-colunares em um custo eficiente. Para preparar nossa plataforma de Big Data para crescimento a longo prazo, tivemos que encontrar uma maneira de resolver essa limitação dentro do nosso sistema de arquivos HDFS, para que também pudéssemos dar suporte também às operações de atualização/exclusão.
  • ETL e modelagem mais rápida: Semelhante à ingestão de dados brutos, os trabalhos de ETL e modelagem eram baseados em capturas instantâneas, exigindo que nossa plataforma reconstruísse tabelas derivadas em todas as execuções. Para reduzir a latência de dados para tabelas modeladas, as tarefas de ETL também precisavam se tornar incrementais. Isso exigia que as tarefas de ETL extraíssem apenas os dados alterados da tabela de origem bruta e atualizassem a tabela de saída derivada anterior, em vez de recriar toda a tabela de saída a cada poucas horas.

Apresentando Hudi

Com os requisitos acima em mente, criamos o Hadoop Upserts and Incremental (Hudi), uma biblioteca Spark de código aberto que fornece uma camada de abstração sobre o HDFS e o Parquet para suportar as operações de atualização e exclusão necessárias. O Hudi pode ser usado em qualquer trabalho do Spark, é dimensionável horizontalmente e depende apenas do HDFS para operar.

Como resultado, qualquer plataforma de Big Data que precise dar suporte a operações de atualização/exclusão para os dados históricos pode alavancar o Hudi.

O Hudi nos permite atualizar, inserir e excluir dados existentes do Parquet no Hadoop. Além disso, o Hudi permite que os usuários de dados extraiam apenas dados alterados de forma incremental, melhorando significativamente a eficiência da consulta e permitindo atualizações incrementais de tabelas modeladas derivadas.

Os dados brutos em nosso ecossistema Hadoop são particionados com base no tempo e qualquer uma das partições antigas pode potencialmente receber atualizações em um momento posterior. Assim, para um usuário de dados ou um trabalho ETL que depende dessas tabelas de dados de origem bruta, a única maneira de saber qual partição de data contém dados atualizados é verificar toda a tabela de origem e filtrar registros com base em alguma noção conhecida de tempo.

Isso resulta em uma consulta computacionalmente dispendiosa que requer uma verificação completa da tabela de origem e impede que as tarefas de ETL sejam executadas com muita frequência.

Com o Hudi, os usuários podem simplesmente passar seu último carimbo de data e hora e recuperar todos os registros que foram atualizados desde então, independentemente de essas atualizações serem novos registros adicionados a partições de data recente ou atualizações de dados antigos (por exemplo, uma nova viagem acontecendo hoje versus uma viagem atualizada de 6 meses atrás), sem executar uma consulta dispendiosa que verifica toda a tabela de origem.

Usando a biblioteca Hudi, conseguimos nos afastar da ingestão baseada em capturas instantâneas de dados brutos para um modelo de ingestão incremental que nos permite reduzir a latência de dados de 24 horas para menos de uma hora.

A Figura 5, abaixo, mostra nossa plataforma Big Data após a incorporação de Hudi:

Figura 5: A terceira geração da nossa plataforma Big Data incorpora ingestão de dados incremental e mais rápida (usando nossa framework Marmaray de código aberto), bem como armazenamento e serviço de dados mais eficientes através da nossa biblioteca Hudi de código aberto.

Ingestão de dados genéricos

O Hudi não é o único acréscimo à terceira geração da nossa plataforma de Big Data. Também formalizamos a transferência de alterações do armazenamento de dados upstream entre as equipes de armazenamento e big data por meio do Apache Kafka.

Os eventos de armazenamento de dados upstream (bem como mensagens de registro clássicas de diferentes aplicativos e serviços) fluem para o Kafka com uma codificação Avro unificada, incluindo cabeçalhos de metadados globais padrão anexados (isto é, timestamp, chave de linha, versão, informações do centro de dados e host de origem). Ambas as equipes Streaming e Big Data usam esses eventos de changelog de armazenamento como seus dados de entrada de origem para processamento adicional.

Nossa plataforma de ingestão de dados, Marmaray, roda em mini-lotes e seleciona os changelogs de armazenamento de upstream a partir do Kafka, aplicando-os sobre os dados existentes no Hadoop usando a biblioteca Hudi. Como mencionado anteriormente, o Hudi suporta operações upsert, permitindo aos usuários adicionar novos registros e atualizar ou excluir dados históricos.

A ingestão de tarefas do Spark é executada a cada 10-15 minutos, fornecendo uma latência de dados brutos de 30 minutos no Hadoop (com espaço para 1 a 2 falhas ou tentativas de tarefa de ingestão).

Para evitar ineficiências resultantes da ingestão dos mesmos dados de origem no Hadoop mais de uma vez, nossa configuração não permite transformações durante a ingestão de dados brutos, resultando na nossa decisão de tornar nosso framework de ingestão de dados brutos uma plataforma EL em oposição a uma plataforma ETL tradicional.

Sob esse modelo, os usuários são encorajados a executar as operações de transformação desejadas no Hadoop e no modo em lote depois que os dados do upstream chegam em seu formato aninhado bruto.

Desde a implementação dessas alterações em nossa plataforma de Big Data, salvamos uma quantidade significativa de recursos computacionais evitando operações de ingestão desnecessárias ou ineficientes. A confiabilidade de nossos dados brutos também melhorou significativamente, já que agora podemos evitar transformações propensas a erros durante a ingestão.

Agora, os usuários podem executar suas transformações sobre os dados de origem brutos usando qualquer mecanismo de processamento de Big Data. Além disso, em caso de problemas, os usuários podem reexecutar novamente suas transformações e ainda atender aos seus SLAs usando mais recursos de computação e um maior grau de paralelismo para concluir os trabalhos de transformação em lote mais rapidamente.

Modelagem de dados incrementais

Considerando o grande número de repositórios de dados upstream que precisam ser ingeridos no Hadoop (mais de 3.000 tabelas Hadoop brutas a partir de 2017), também construímos uma plataforma de ingestão genérica que facilita a ingestão de dados brutos no Hadoop de maneira unificada e configurável.

Agora, nossa plataforma Big Data atualiza as tabelas brutas do Hadoop incrementalmente com uma latência de dados de 10 a 15 minutos, permitindo acesso rápido aos dados de origem.

No entanto, para garantir que as tabelas modeladas também estejam disponíveis com baixa latência, devemos evitar ineficiências (ou seja, recriação de tabela derivada completa ou varreduras de tabela bruta de origem completa) em nossas tarefas ETL de modelagem também. Na verdade, Hudi permite que as tarefas ETL busquem apenas os dados alterados da tabela de origem.

As tarefas de modelagem só precisam passar um registro de data e hora do ponto de verificação durante cada execução iterativa para o leitor Hudi para receber um fluxo de registros novos ou atualizados da tabela de origem bruta (independentemente da partição de data na qual o registro real está armazenado).

O uso de um gravador Hudi durante uma tarefa ETL nos permite atualizar partições antigas nas tabelas modeladas derivadas sem recriar toda a partição ou tabela. Assim, nossas tarefas ETL de modelagem usam leitores Hudi para buscar incrementalmente somente os dados alterados da tabela de origem e usar os gravadores Hudi para atualizar incrementalmente a tabela de saída derivada.

Agora, as tarefas ETL também são concluídas em menos de 30 minutos, fornecendo latência de ponta a ponta de menos de uma hora para todas as tabelas derivadas no Hadoop.

Para fornecer aos usuários de dados das tabelas do Hadoop opções diferentes para acessar todos os dados ou somente dados novos ou atualizados, as tabelas brutas do Hadoop usando o formato de armazenamento Hudi fornecem dois modos de leitura diferentes:

  • Modo de exibição mais recente. Fornece uma visão holística de toda a tabela do Hadoop naquele momento. Essa visualização inclui os valores mesclados mais recentes para todos os registros, bem como todos os registros existentes em uma tabela.
  • Visualização do modo incremental. Busca apenas os registros novos e atualizados de uma tabela do Hadoop específica com base em um determinado registro de data e hora. Essa visualização retorna apenas as linhas que foram inseridas recentemente ou foram atualizadas desde o último ponto de verificação. Além disso, se uma linha específica for atualizada mais de uma vez desde o último ponto de verificação, esse modo retornará todos esses valores intermediários alterados (em vez de apenas retornar o último mesclado)

A Figura 6, abaixo, descreve essas duas visualizações de leitura para todas as tabelas do Hadoop armazenadas no formato de arquivo Hudi:

Figura 6: Uma tabela bruta que está sendo atualizada através do gravador Hudi pode ser lida em dois modos diferentes: a última exibição de modo retornando o último valor para todos os registros e a exibição de modo incremental retornando somente os registros atualizados desde a última leitura.

Os usuários geralmente alternam entre essas duas visualizações de tabela com base em suas necessidades. Quando eles executam uma consulta ad hoc para analisar dados com base no estado mais recente, eles usam a visualização de modo mais recente da tabela (por exemplo, para buscar o número total semanal de viagens por cidade nos EUA).

Por outro lado, quando um usuário possui uma tarefa ou consulta iterativa que precisa buscar somente registros alterados ou novos desde sua última execução, eles usam a visualização de modo incremental. Ambas as visualizações estão disponíveis para todas as tabelas do Hadoop em todos os momentos, e os usuários podem alternar entre diferentes modos com base em suas necessidades.

Modelo de dados padronizado

Além de fornecer exibições diferentes da mesma tabela, também padronizamos nosso modelo de dados para fornecer dois tipos de tabelas para todos os dados brutos do Hadoop:

  • Tabela de histórico do registro de alterações. Contém o histórico de todos os changelogs/registros de alterações recebidos para uma tabela upstream específica. Essa tabela permite que os usuários examinem o histórico de alterações de uma determinada tabela e possam ser mesclados por chave para fornecer o valor mais recente para cada linha.
  • Tabela de captura instantânea mesclado. Abriga a última exibição mesclada das tabelas upstream. Esta tabela contém a exibição mesclada compactada de todos os changelogs históricos recebidos por chave.

A Figura 7, abaixo, descreve como diferentes tabelas brutas do Hive são geradas para um armazenamento de dados de origem upstream específico usando o fluxo de registros de alterações fornecidos:

Figura 7: Padronizando a qualidade de dados aprimorada do nosso modelo de dados Hive para todo o nosso ecossistema Big Data. Este modelo incorpora uma tabela de captura instantânea mesclada contendo os valores mais recentes para cada row_key, bem como uma tabela de histórico de registro de alterações contendo o histórico de todas as alterações de valor para cada row_key.

No entanto, o fluxo de registro de alterações pode ou não conter a linha inteira (todas as colunas) para uma determinada chave. Embora as tabelas de captura instantânea mescladas sempre forneçam todas as colunas para uma chave específica, a tabela de histórico de registro de alterações pode ser esparsa se o fluxo upstream de registro de alterações só fornecer registros de alterações de linha parcial, uma funcionalidade que melhora a eficiência evitando reenviar a linha inteira quando apenas um ou alguns valores de coluna limitados estão alterados.

Se os usuários desejarem buscar os valores alterados da tabela de históricos do registro de alterações e associá-los à tabela de capturas instantâneas mesclada para criar a linha completa de dados, também incluiremos a partição de datas da mesma chave da tabela de capturas instantâneas mesclada na tabela de histórico de registro alterações.

Isso permite que as duas tabelas ingressem de maneira mais eficiente em uma partição específica, evitando uma varredura de tabela completa da tabela de captura instantânea mesclada ao unir as duas.

A Figura 8, abaixo, resume a relação entre os diferentes componentes da nossa plataforma Big Data:

Figura 8: A criação de uma plataforma de transferência de dados mais extensível nos permitiu agregar facilmente todos os pipelines de dados de uma maneira padrão em um único serviço, além de oferecer suporte à conectividade do tipo “qualquer a qualquer” entre qualquer fonte de dados e data sink.

Geração 4: O que vem a seguir?

Desde o lançamento da terceira geração da nossa plataforma de Big Data em 2017, os usuários da empresa podem acessar os dados no Hadoop de maneira rápida e confiável, mas sempre há espaço para crescer. Abaixo resumimos nossos esforços contínuos para melhorar a plataforma Big Data da Uber para aprimoradas qualidade dos dados, latência de dados, eficiência, escalabilidade e confiabilidade.

Qualidade dos dados

Para melhorar a qualidade dos dados, identificamos duas áreas-chave para melhoria. Primeiro, queremos evitar dados que não estão em conformidade com o esquema quando alguns dos repositórios de dados upstream não impõem obrigatoriamente ou verificam o esquema de dados antes do armazenamento (por exemplo, armazenando um valor-chave em que o valor é um blob JSON).

Isso resulta em dados incorretos que entram no ecossistema do Hadoop, afetando todos os usuários a jusante que também dependem desses dados.

Para evitar um influxo de dados incorretos, estamos fazendo a transição de todos os repositórios de dados upstream para executar verificações de esquema obrigatórias no conteúdo de dados e rejeitando entradas de dados se houver algum problema (por exemplo, não confirmação com o esquema) com os dados.

A segunda área que achamos problemática foi a qualidade do conteúdo real dos dados. Embora usar esquemas garanta que os dados contenham tipos de dados corretos, elas não verificam os reais valores dos dados (por exemplo, um número inteiro em oposição a um número positivo entre [0,150]).

Para melhorar a qualidade dos dados, estamos expandindo nosso serviço de esquema para suportar verificações semânticas. Essas verificações semânticas (em outras palavras, tipos de dados específicos da Uber) nos permitem adicionar restrições extras ao conteúdo real dos dados além da verificação básica do tipo estrutural.

Latência de dados

Nosso objetivo é reduzir a latência de dados brutos no Hadoop para cinco minutos e a latência de dados das tabelas modeladas para dez minutos. Isso permitirá que mais casos de uso saiam do processamento de fluxo para um processamento de mini-lote mais eficiente que usa os pulls de dados incrementais de Hudi.

Também estamos expandindo nosso projeto Hudi para suportar um modo de visualização adicional, que incluirá a visualização otimizada de leitura existente, bem como uma nova visualização em tempo real que mostra dados com latência de apenas alguns minutos. Essa visão em tempo real depende de uma solução de código aberto (e parte do Hudi) que chamamos de Merge-On-Read ou Hudi 2.0.

Eficiência de dados

Para melhorar a eficiência dos dados, estamos nos afastando da dependência/confiança em hardware dedicado para qualquer um de nossos serviços e em relação à dockerização de serviços.

Além disso, estamos unificando todos os nossos agendadores/planejadores de recursos dentro e através do nosso ecossistema Hadoop para preencher a lacuna entre nossos serviços Hadoop e serviços sem dados em toda a empresa.

Isso permite que todos os trabalhos e serviços sejam agendados de forma unificada, independentemente do meio em que serão executados. À medida que a Uber cresce, a localidade de dados será uma grande preocupação para aplicativos Hadoop e um gerenciador de recursos unificado bem-sucedido pode reunir todos os agendadores existentes (ou seja, Yarn, Mesos e Myriad).

Escalabilidade e confiabilidade

Como parte de nosso esforço para melhorar a escalabilidade e a confiabilidade de nossa plataforma, identificamos vários problemas relacionados a possíveis casos de limites. Embora nossa plataforma de ingestão tenha sido desenvolvida como um modelo genérico conectável, a ingestão real de dados upstream ainda inclui muitas configurações de pipeline dependentes de origem, tornando o pipeline de ingestão frágil e aumentando os custos de manutenção de operar vários milhares desses pipelines.

Para garantir a ingestão unificada de dados, independentemente da fonte de dados, iniciamos um projeto em colaboração com a equipe de Armazenamento de Dados da Uber para unificar o conteúdo, o formato e os metadados dos registros de alterações de todas as fontes de dados upstream, independentemente de sua composição tecnológica.

Esse projeto garantirá que as informações sobre essas tecnologias upstream específicas sejam apenas um metadado adicional adicionado ao valor real do registro de alteração (em oposição a ter conteúdo de registro de alteração e metadados totalmente diferentes para diferentes origens de dados) e a ingestão de dados ocorrerá independentemente da origem do upstream.

Finalmente, nossa próxima versão do Hudi nos permitirá gerar arquivos de Parquet muito maiores (mais de um gigabyte em comparação com nossos 128 megabytes atuais) por padrão dentro de alguns minutos para todas as nossas fontes de dados.

Também removerá quaisquer sensibilidades em torno da proporção de atualizações versus inserções. O Hudi 1.0 depende de uma técnica chamada copy-on-write que reescreve todo o arquivo de origem do Parquet sempre que houver um registro atualizado.

Isso aumenta significativamente a amplificação de gravação, especialmente quando a proporção de atualização para inserção aumenta, e impede a criação de arquivos Parquet maiores em HDFs.

A nova versão do Hudi é projetada para superar essa limitação armazenando o registro atualizado em um arquivo delta separado e de maneira assíncrona mesclando-o com o arquivo base do Parquet com base em uma determinada política (por exemplo, quando há quantidade suficiente de dados atualizados para amortizar o custo de reescrever um arquivo Parquet grande de base).

Com os dados do Hadoop armazenados em arquivos maiores do Parquet, além de uma plataforma de ingestão de dados independente de fontes mais confiável, nossa plataforma de dados analíticos continuará a crescer nos próximos anos à medida que os negócios prosperarem.

Avançando

A organização de dados da Uber é uma colaboração multifuncional entre as equipes Data Platform, Data Foundation, Streaming e Real-time Platform e Big Data para criar as bibliotecas necessárias e serviços distribuídos que suportam a infraestrutura de dados analíticos da Uber.

Se trabalhar em desafios de Big Data que surpreendem os limites de escala lhe interessa, considere se candidatar a uma função em nossas equipes sediadas em São Francisco e Palo Alto.

Por favor, envie seu currículo para hadoop-platform-jobs@uber.com se estiver interessado em trabalhar conosco!

***

Este artigo é do Uber Engineering. Ele foi escrito por Reza Shiftehfar. A tradução foi feita pela Redação iMasters com autorização. Você pode conferir o original em: https://eng.uber.com/uber-big-data-platform/