Banco de Dados

11 jan, 2019

O desafio de um bilhão de pontos de dados: criando um mecanismo de consulta para dados de série temporal

Publicidade

A Uber, como a maioria das grandes empresas de tecnologia, depende bastante de métricas para monitorar efetivamente todo o seu stack.

Desde métricas de sistema de nível baixo, como a utilização de memória de um host, até métricas de negócios de nível alto, incluindo o número de pedidos da Uber Eats em uma cidade específica, elas permitem que nossos engenheiros entendam como nossos serviços estão operando diariamente.

À medida que nossa dimensionalidade e uso de métricas aumentam, soluções comuns como Prometheus e Graphite tornam-se difíceis de gerenciar e às vezes deixam de funcionar. Devido à falta de soluções disponíveis, decidimos criar uma plataforma de métricas internas e de código aberto, chamada M3, que pudesse lidar com a escala de nossas métricas.

Um componente importante da plataforma M3 é seu mecanismo de consulta, que construímos a partir do zero e usamos internamente há vários anos.

A partir de novembro de 2018, nosso mecanismo de consulta de métricas processa cerca de 2.500 consultas por segundo (Figura 1), cerca de 8,5 bilhões de pontos de dados por segundo (Figura 2) e aproximadamente 35 Gbps (Figura 3).

Esses números têm aumentado constantemente em uma taxa muito superior ao crescimento orgânico da Uber, devido à maior adoção de métricas em várias partes de nossa pilha.

Figura 1. Nosso mecanismo de consulta de métricas processa cerca de 2.500 consultas por segundo. Acima, mapeamos as consultas em um período de sete dias.
Figura 2. Nosso mecanismo de métricas retorna cerca de 8,5 bilhões de pontos de dados por segundo. Acima, mapeamos os retornos dos pontos de dados ao longo de um período de sete dias.
Figura 3. Nosso mecanismo de consulta de métricas processa aproximadamente 3,5 Gbps. Acima, mapeamos o tráfego de rede em um período de sete dias.

Neste artigo, apresentamos os desafios que enfrentamos ao projetar um mecanismo de consulta para a M3 com desempenho e dimensionável o suficiente para suportar o cálculo de uma grande quantidade de dados a cada segundo, bem como flexível o suficiente para suportar várias linguagens de consulta.

Arquitetura do mecanismo de consulta

Uma consulta M3 passa por três fases: análise, execução e recuperação de dados. Os componentes de análise e execução de consulta funcionam juntos como parte de um serviço de consulta comum, e a recuperação é feita por meio de um wrapper/envoltório fino sobre os nodes de armazenamento, conforme ilustrado na Figura 4, abaixo:

Figura 4. A arquitetura do mecanismo de consulta da M3 é executada nas fases de análise, execução e recuperação de dados.

Para suportar várias linguagens de consulta, criamos um formato de gráfico acíclico direcionado (DAG) que pode ser compilado tanto pela Linguagem de Consulta M3 (M3QL) quanto pela Linguagem de Consulta Prometheus (PromQL). O DAG pode ser entendido como uma representação abstrata da consulta que precisa ser executada.

Esse design facilita a adição de novas linguagens de consulta ao desacoplar a análise de linguagem da execução da consulta.

Antes de programar o DAG para execução, usamos um limitador de taxa e um autorizador para garantir que apenas usuários autorizados possam emitir solicitações, limitando-as em taxa para evitar abusos.

A fase de execução controla todas as consultas em execução e seu consumo de recursos. Ela pode rejeitar ou cancelar uma consulta que resulte em qualquer esgotamento de recursos, como a memória total disponível para o processo.

Depois que uma consulta é iniciada, o mecanismo de execução busca os dados necessários e, em seguida, executa as funções especificadas. A fase de execução é desacoplada do armazenamento, permitindo-nos adicionar facilmente suporte a outros back-ends de armazenamento.

Enquanto o M3DB, o banco de dados de séries temporais de código aberto da Uber, seja atualmente o único back-end de armazenamento suportado pela M3, no futuro planejamos adicionar suporte a mais provedores de armazenamento, como o OpenTSDB e o MetricTank.

Utilização de memória

À medida que mais e mais equipes da Uber começaram a usar a M3, ferramentas foram construídas no topo da plataforma para executar funções como detecção de anomalias, estimativa de recursos e alertas.

Logo percebemos que precisávamos colocar limites em nossos tamanhos de consulta para evitar que consultas caras consumissem toda a memória do nosso serviço.

Por exemplo, nosso limite inicial de memória para uma única consulta era de 3,5 GB, um tamanho que nos permitia atender a uma quantidade razoável de tráfego sem sobrecarregar muito nosso sistema. Ainda assim, se um host de consulta recebesse várias consultas grandes, ele sobrecarregaria e ficaria sem memória.

Além disso, percebemos que uma vez que nosso serviço de consulta começava a degradar, os usuários continuariam atualizando seus painéis porque as consultas estavam retornando muito devagar ou nem retornando. Isso aumentaria o problema gerando consultas adicionais que começariam a se acumular, porque as consultas originais nunca foram canceladas.

Além disso, descobrimos que alguns usuários avançados e plataformas construídas sobre o serviço de consultas pressionavam esses limites de memória, forçando-nos a pensar em maneiras de melhorar a utilização da memória do sistema.

Pooling

Durante a execução da consulta, gastamos muito tempo alocando fatias massivas que poderiam armazenar os resultados da computação. Como resultado, iniciamos o agrupamento de objetos, como séries e tags, um movimento que causou uma redução notável na sobrecarga da coleta de lixo.

Além disso, assumimos que nossa criação de goroutines seria extremamente leve, e foi, mas apenas porque cada nova goroutine começa com uma pequena memória de 2 kibibytes (KiB). Todos os nossos gráficos de chama indicavam que muito tempo estava sendo gasto aumentando nossa pilha de goroutines recém-criadas (runtime.morestack, runtime.newstack).

Percebemos que isso estava ocorrendo porque muitas de nossas chamadas de pilha excederam o limite de 2 KiB, e assim o tempo de execução do Go estava gastando muito tempo alocando novas goroutines de 2 KiB e, em seguida, imediatamente jogando fora e copiando a pilha para um pilha maior de 4 ou 8 de KiB. Decidimos que poderíamos evitar todas essas alocações inteiramente agrupando nossas goroutines e reutilizando-as por meio de um pool de trabalho.

Notificação de fechamento HTTP

Observando as estatísticas de uso do serviço de consulta, identificamos um padrão consistente no qual o serviço de consulta estava gastando muito tempo executando consultas que não eram mais necessárias ou haviam sido substituídas por uma nova consulta. Isso pode acontecer quando um usuário atualiza seu painel no Grafana e ocorre com mais frequência com consultas grandes e lentas.

Em tais situações, o serviço desperdiça recursos para avaliar desnecessariamente uma consulta demorada. Para remediar isso, adicionamos um notificador para detectar quando o cliente desconectou e cancelou as execuções restantes nas diferentes camadas do serviço de consulta por meio da propagação de contexto.

Embora todas as abordagens acima tenham reduzido significativamente o rastro de memória do mecanismo de consulta, a utilização da memória permaneceu como o principal gargalo para a execução de consultas.

Redesenho para a próxima ordem de grandeza

Sabíamos que, se quiséssemos apoiar um aumento de ordem de magnitude na escala de nosso serviço de consulta, precisaríamos de uma mudança de paradigma fundamental. Projetamos um framework de estilo MapReduce completa para o sistema com nodes de consulta e execução separados.

A ideia por trás dessa escolha de projeto era que, para garantir que os nodes de execução pudessem escalonar infinitamente, precisávamos dividir a consulta em unidades de execução menores (fase Mapear). Uma vez processadas as unidades de execução, poderíamos recombinar os resultados (fase Reduzir).

Quando começamos a implementar nosso projeto, percebemos rapidamente a complexidade dessa solução e começamos a procurar soluções mais simples.

Uma das principais informações de nosso processo de avaliação foi que não deveríamos descompactar dados em busca se estivéssemos lidando com back-ends de armazenamento que mantêm os dados compactados internamente, que é exatamente como o M3DB armazena dados.

Se atrasássemos a descompressão o maior tempo possível, poderíamos reduzir nosso rastro de memória. Para conseguir isso, decidimos tirar uma página do livro de programação funcional e reprojetar nosso mecanismo de consulta para avaliar funções de maneira preguiçosa, atrasando alocações de representações intermediárias (e, às vezes, eliminando-as totalmente) pelo maior tempo possível.

Figura 5. O rastro de memória da execução sequencial (Abordagem 1) é muito maior do que o da execução lenta (Abordagem 2).

A figura 5, acima, demonstra uma operação linear – clamp_min seguida de sum. Durante a avaliação, determinamos que havia duas abordagens para executar nesta descompressão: aplicar ambas as funções sequencialmente (Abordagem 1) ou aplicar clamp_min e sum em uma coluna de cada vez (Abordagem 2). O diagrama acima descreve as etapas executadas por cada abordagem.

Para tentar a Abordagem 1, precisaríamos de cerca de 1,7 GB de memória (supondo que cada ponto de dados tenha oito bytes).

Neste caso, teríamos que gerar um único bloco com todas as séries 10K, o que exigiria a maior parte da memória. Portanto, ela dimensiona com o número de séries * o número de pontos de dados por série.

Na Abordagem 2, no entanto, precisamos apenas manter uma fatia para uma determinada coluna em todas as séries e, em seguida, chamar sum. A fatia ocupa 80 KB de memória (assumindo pontos de dados de oito bytes) e o bloco final requer 161 KB de memória.

Essa abordagem dimensiona a utilização da memória linearmente com o número de séries e, portanto, diminui significativamente o rastro/espaço ocupado pela memória.

Armazenamento de dados

Com base no padrão de uso de nosso serviço, a maioria das transformações em uma consulta foi aplicada em diferentes séries para cada intervalo de tempo. Portanto, ter dados armazenados em um formato colunar ajuda na localização da memória dos dados.

Além disso, se dividirmos nossos dados ao longo do tempo em blocos, a maioria das transformações pode funcionar em paralelo em blocos diferentes, aumentando assim a nossa velocidade de computação, conforme ilustrado na Figura 6, abaixo:

Figura 6. Uma estrutura de bloco nos permite trabalhar em paralelo em diferentes blocos de armazenamento, o que melhora muito nossa velocidade de computação.

Uma ressalva para essa decisão é que algumas funções, como movingAverage, funcionam/trabalham na mesma série ao longo do tempo. Para essas funções, não podemos processar os blocos independentemente, pois um bloco pode depender de vários blocos anteriores. Para esses casos, a função calcula os blocos de entrada dependentes para cada bloco de saída e armazena em cache os blocos de entrada na memória até que a função tenha todos os blocos dependentes para um bloco de saída. Uma vez que um bloco de entrada não é mais necessário, ele é despejado desse cache.

É importante observar que outros sistemas de monitoramento, como o Prometheus, defendem o uso de regras de registro para pré-agregar métricas para que esses tipos de problemas possam ser evitados.

Embora tenhamos a capacidade de pré-agregar, descobrimos que os desenvolvedores ainda precisam executar consultas ad hoc que acabarão buscando dezenas de milhares e algumas vezes centenas de milhares de séries temporais para gerar o resultado final.

Redução da latência de métricas

A Uber é executada em uma configuração de cluster ativo-ativo que atende simultaneamente o tráfego de vários centros de dados, bem como várias zonas de nuvem, o que significa que as métricas são constantemente geradas em muitos locais geográficos diferentes.

Muitos de nossos casos de uso de métricas exigem uma visão global das métricas em nossos centros de dados. Por exemplo, quando realizamos um tráfego falha de um centro de dados para outro, podemos querer confirmar que o número total de motoristas parceiros online em todos os centros de dados não mudou, mesmo que o número por centro de dados individual tenha mudado drasticamente.

Para alcançar essa visão abrangente, precisávamos que métricas de um centro de dados estivessem disponíveis em outros centros de dados. Poderíamos realizar isso em tempo de gravação, replicando todos os nossos dados para vários centros de dados ou durante o tempo de leitura, distribuindo/dispersando quando os dados fossem consultados.

No entanto, as gravações em todos os centros de dados aumentariam nossos custos de armazenamento proporcionalmente ao número de centros de dados. Para reduzir os custos, tomamos a decisão de fazer uma distribuição/dispersão de leitura no tempo/momento da consulta para recuperar os dados das métricas.
Nossa implementação original do serviço de consulta fez com que a instância que recebia a solicitação buscasse os dados de vários centros de dados.

Depois de buscar os dados correspondentes, o node avaliaria localmente a consulta e retornaria o resultado. Muito rapidamente, no entanto, isso se tornou um gargalo de latência, já que buscas cruzadas através de centros de dados eram lentas e a largura de banda de nossa conexão através de centros de dados era muito estreita.

Além disso, essa implementação coloca ainda mais pressão de memória na instância que recebeu a solicitação de consulta inicial.

Com a nossa primeira versão, a latência de p95 para métricas coletadas através de centros de dados era dez vezes maior do que a latência de p95 para métricas buscadas em um único centro de dados. Como esse problema se agravou, começamos a procurar soluções melhores.

Durante nossas chamadas através de centros de dados, começamos a devolver as métricas compactadas diretamente, em vez de enviá-las sem compactação.

Além disso, começamos a transmitir essas métricas em vez de enviá-las em um lote gigante. Esses esforços combinados levaram a uma redução de 3x na latência de consultas através de centros de dados.

Downsampling/Redução da amostragem

Além de reduzir a latência ao recuperar dados de nossos centros de dados, também desejamos melhorar o desempenho das várias ferramentas de interface do usuário, incluindo o Grafana, que nosso sistema de monitoramento usa.

Para consultas que retornam um grande número de pontos de dados, não necessariamente faz sentido devolver tudo ao usuário. Na verdade, às vezes, o Grafana e outras ferramentas fazem com que o navegador bloqueie ao exibir muitos dados em um determinado momento.

Além disso, o nível de granularidade que um usuário precisa visualizar normalmente pode ser alcançado sem mostrar todos os pontos de dados.

Enquanto o Grafana limita o número de pontos de dados processados ​​com base no número de pixels na tela por meio de sua configuração maxDatapoints, os usuários ainda experimentam atraso ao exibir gráficos maiores.

Para reduzir essa carga, implementamos a redução da amostragem, o que reduz significativamente o número de pontos de dados recuperados ao consultar métricas.

Inicialmente, implementamos um algoritmo de média ingênua, o que significa que geraríamos um ponto de dados calculando a média de vários, mas isso tinha o efeito negativo de ocultar picos e baixos nos dados, além de modificar os pontos de dados reais e torná-los imprecisos.

Após avaliar vários algoritmos de downsampling/redução da amostragem, decidimos aproveitar o algoritmo Largest Triangle Three Bucket (LTTB) para nosso caso de uso. Esse algoritmo faz um bom trabalho de manter a forma original dos dados, incluindo a exibição de outliers, que geralmente são perdidos com métodos mais ingênuos, além de possuir a propriedade útil de apenas retornar pontos de dados existentes no conjunto de dados original.

Nas Figuras 7, 8 e 9, abaixo, demonstramos as diferenças entre não reduzir a amostragem (Figura 7), usar um algoritmo de média ingênuo (Figura 8) e usando o algoritmo LTTB (Figura 9):

Figura 7. Ao impulsionar algoritmo sem redução de amostragem, os resultados são os mais precisos, mas mais longos para serem carregados.
Figura 8. O algoritmo de média é rápido para carregar, mas oculta anomalias, como o ponto de dados circulado no gráfico.
Figura 9. O algoritmo LTTB captura anomalias como a mostrada no gráfico e carrega rapidamente.

M3QL

Antes de desenvolver o M3QL, nosso sistema de métricas suportava apenas o Graphite, uma linguagem baseada em caminhos (por exemplo, foo.bar.baz).

No entanto, o Graphite não era expressivo o suficiente para as nossas necessidades, e queríamos uma solução baseada em tags (por exemplo, foo: bar biz: baz) para tornar a consulta muito mais simples.

Mais especificamente, queríamos uma solução que melhorasse a capacidade de descoberta e a facilidade de uso, dada a escala maciça de nosso armazenamento de métricas.

Em nossa experiência, é muito mais difícil recuperar métricas usando uma solução baseada em caminhos, porque o usuário deve saber exatamente o que cada node representa na métrica. Por outro lado, com uma abordagem baseada em tags, não há adivinhações e o preenchimento automático funciona muito melhor.

Além de criar uma linguagem de consulta baseada em tag, também decidimos fazer do M3QL uma linguagem baseada em pipe (como UNIX), em oposição a uma linguagem semelhante a SQL, como IFQL, ou uma linguagem semelhante à expressão C, como PromQL.

As linguagens baseadas em pipe permitem que os usuários leiam as consultas da esquerda para a direita, em vez de de dentro para fora. Nós detalhamos uma consulta M3QL típica mostrando a taxa de falhas de um determinado endpoint na Figura 10, abaixo:

Figura 10. Uma consulta típica do M3QL mostrando a taxa de falhas de um determinado endpoint.

Suporte para Prometheus

Foi uma decisão clara para nós integrar nosso mecanismo de consulta com o Prometheus, um sistema de monitoramento de código aberto amplamente utilizado. Os exportadores da Prometheus, como o seu exportador de nodes, tornam incrivelmente fácil recuperar hardware e métricas relacionadas ao kernel.

Além disso, descobrimos que muitos engenheiros têm experiência anterior no uso de PromQL, facilitando a integração de equipes no M3.

Como o nosso mecanismo de consulta agora suporta originalmente o PromQL, os usuários não precisam aprender linguagens adicionais, nem precisarão recriar painéis para exportadores de terceiros.

Também criamos a API do mecanismo de consulta para que os usuários possam se conectar diretamente ao Grafana, selecionando o tipo de fonte de dados Prometheus.

Próximos passos

À medida que continuamos a construir o ecossistema M3, estamos comprometidos em melhorar nosso mecanismo de consulta adicionando novas funcionalidades e lidando com casos de uso adicionais.

O alerta e as métricas andam de mãos dadas, e é por isso que o alerta tem sido uma das principais prioridades para nós. Atualmente, suportamos a execução do Grafana em nosso mecanismo de consulta.

Internamente, executamos um aplicativo de alerta de código fechado mais avançado, que pretendemos abrir o código-fonte nos próximos meses.

Devido ao enorme crescimento da Uber, precisamos levar em consideração as consultas curtas e cotidianas e as mais longas para o planejamento de capacidade e outros casos de uso de resumo/relatório que geralmente precisam de dados de várias fontes de dados.

Para suportar isso, planejamos criar um conector Presto, permitindo que os usuários executem consultas mais longas, além de possibilitar a combinação de dados de outras fontes de dados, como SQL ou HDFS.

Comece com o M3

Experimente o mecanismo de consulta M3 para você mesmo! Por favor, envie pedidos de pull, questões e propostas para o repositório de propostas do M3.

Esperamos que você comece com o M3 e nos dê sua opinião!

Se você estiver interessado em enfrentar os desafios de infraestrutura em escala, considere se candidatar a um cargo na Uber.

Visite a página oficial de código aberto da Uber para obter mais informações sobre o M3 e outros projetos.

***

Este artigo é do Uber Engineering. Ele foi escrito por Benjamin Raskin e Nikunj Aggarwal. A tradução foi feita pela Redação iMasters com autorização. Você pode conferir o original em: https://eng.uber.com/billion-data-point-challenge/