Data

18 dez, 2018

Sessionizing de viagens da Uber em tempo real

Publicidade

Em certo sentido, o desafio de combinar eficientemente passageiros e motoristas no mundo real se resume à questão de como coletar, armazenar e organizar logicamente os dados.

Nossos esforços para garantir tempos de espera baixos, prevendo a demanda do passageiro e, ao mesmo tempo, permitindo que os motoristas usem a plataforma da maneira mais eficiente possível, levando em conta o tráfego e outros fatores, apenas ampliam o escopo dos dados envolvidos.

Para melhor focar em como gerenciamos as grandes quantidades de dados em tempo real em vários sistemas que compõem o Uber Marketplace, desenvolvemos a Rider Session State Machine, uma metodologia que modela o fluxo de todos os eventos de dados que compõem uma única viagem.

Nos referimos aos dados subjacentes a cada viagem como uma sessão, que começa quando um usuário abre o aplicativo da Uber. Essa ação aciona uma sequência de eventos de dados, desde quando o passageiro realmente solicita uma corrida até o ponto em que a viagem foi concluída.

Como cada sessão ocorre dentro de um período de tempo finito, podemos organizar mais facilmente os dados relevantes a serem usados em futuras análises para aprimorar ainda mais nossos serviços.

Entre outras funções, categorizar os dados da viagem da Uber em sessões facilita a compreensão e a descoberta de problemas ou a introdução de novos recursos.

Continue lendo para saber como projetamos essa nova session state machine e lições aprendidas ao longo do caminho.

A Rider Session State Machine

Uma das informações críticas que queremos capturar e entender em tempo real é o ciclo de vida completo de uma única viagem da Uber, desde o momento em que o passageiro abre o aplicativo até o momento em que chega ao destino final.

No entanto, dada a complexidade e a escala de nossos sistemas, esses dados são distribuídos em vários fluxos de eventos diferentes.

Por exemplo, quando alguém abre o app da Uber, ele pede que a pessoa escolha um destino e dispara um evento no fluxo de eventos do registro do usuário.

O aplicativo exibe produtos (uberPOOL, uberX, UberBLACK, etc) disponíveis nessa região geográfica, juntamente com preços para cada um, conforme gerado pelo sistema de preços dinâmico, com cada preço aparecendo como um evento discreto no fluxo de eventos de impressão.

Quando o passageiro seleciona um produto, o pedido vai para o nosso sistema de despacho, que combina o passageiro com um motorista-parceiro e atribui o veículo a essa viagem.

Quando o motorista-parceiro pega o passageiro, o aplicativo envia um evento “pickup completed” para o sistema de despacho, efetivamente iniciando a viagem. Quando o motorista chega ao destino e indica que o passageiro foi deixado em seu aplicativo, ele envia um evento “trip completed”.

Um ciclo de vida de viagem típico como esse pode abranger seis fluxos de eventos distintos, com eventos gerados pelo aplicativo do passageiro, pelo app do motorista e pelo servidor de despacho de back-end da Uber. Esses fluxos de eventos distintos são inseridos em uma única viagem da Uber.

Como podemos contextualizar esses fluxos de eventos para que eles possam ser logicamente agrupados e rapidamente exibir informações úteis para aplicativos de dados downstream? A resposta está na definição de uma máquina de estado limitada pelo tempo que modela o fluxo de diferentes eventos gerados pelo usuário e pelo servidor para a conclusão de uma única tarefa. Nós nos referimos a esse tipo de máquina de estado, consistindo em ações brutas, como uma “sessão”.

No contexto de um ciclo de vida da viagem da Uber, uma sessão consiste em uma série de eventos que começam quando um passageiro abre seu app e termina na conclusão bem-sucedida de sua viagem.

Também temos que considerar que nem todas as sessões passam por essa série completa de eventos, já que um passageiro pode cancelar a viagem depois de fazer a solicitação ou apenas abrir o aplicativo para verificar as tarifas. Por causa desses fatores, foi importante impor uma janela de tempo em uma sessão.

Figura 1: Essa ilustração mostra o fluxo de eventos na Rider Session state machine.

A sessão de viagem começa quando um usuário abre o aplicativo, gerando um evento discreto no log do app. Quando um usuário navega pelos produtos da Uber disponíveis em sua localização, nosso sistema de back-end de preços de viagem entrega várias impressões ao aplicativo, mostrando o preço de cada uma, iniciando o estado Shopping na sessão.

Podemos obter o estado Request Ride a partir do fluxo de eventos para dispositivos móveis do aplicativo para solicitar eventos, bem como do fluxo de eventos gerado pelo sistema de despacho, que registra todas as solicitações recebidas.

Quando um motorista pressiona o botão “Pickup Completed” em seu aplicativo, a sessão entra em seu estado On Trip. E, claro, a sessão termina quando o motorista pressiona o botão ” Trip Completed ” em seu aplicativo.

À medida que cada sessão modela os eventos que acontecem no mundo físico, nossa Rider Session state machine precisa ser resiliente, projetada para lidar com eventos fora do esperado. Por exemplo, um passageiro pode cancelar sua viagem após fazer uma solicitação, ou o carro de um motorista pode quebrar ou ficar preso no trânsito, forçando o motorista a cancelar a viagem. Nós modelamos esses cenários permitindo uma transição do estado Request Ride de volta para o estado Shopping.

Colocar todos os eventos relevantes para o ciclo de vida da nossa sessão em um único local desbloqueia uma ampla variedade de casos de uso, como:

  • Nossa equipe de Demand Modeling pode comparar impressões de aplicativos, quantas pessoas abriram o app, com dados da sessão em tempo real, ajudando a entender a probabilidade de um passageiro encomendar um produto específico depois de visualizá-lo no aplicativo.
  • Nossa equipe de Forecasting pode ver quantas sessões estão no estado Shopping em uma determinada área durante uma janela de tempo específica, usando essas informações para prever a demanda para essa região, o que ajuda os motoristas a entenderem onde é mais provável que peguem passageiros no futuro.

Fazendo sessões em produção

Usamos o Spark Streaming para implementar a Rider Session State Machine na produção porque:

  • 1 – Muitos de nossos pipelines de extração, transformação e carregamento (ETL) foram criados no Spark, já que a Samza, plataforma de streaming anterior escolhida pelao Uber, não tinha suporte suficiente para aplicativos de streaming baseados em estado, como a “sessionalização” (deixar assim mesmo?).
  • 2 – A função mapWithState do Spark Streaming para aplicativos de streaming stateful provou ser muito versátil, por exemplo, oferecendo tratamento de expiração de estado automático.

O pipeline ETL opera uma janela de micro-batching de um minuto e processa alguns bilhões de eventos por dia. O pipeline é executado em nosso cluster YARN (sugestão: verificar se tem esse artigo traduzido no iMasters e linkar se tiver) e usa 64 containers de núcleo único com 8 GB de memória. A saída vem na forma de transições de estado que contêm os dados relevantes de eventos brutos compactados. A saída é publicada no Gairos, nosso sistema de dados de séries temporais geoespaciais in-house.

Lições aprendidas

Embora nossa Rider Session State Machine possa parecer simples na teoria, aplicá-la ao caso de uso da Uber provou ser uma besta completamente diferente. Aqui estão algumas das principais lições que aprendemos ao implementar essa nova metodologia em nosso fluxo de dados existente:

  • Sincronização do relógio: Dada a grande variedade de aparelhos e variações de sistemas operacionais móveis, para não mencionar as configurações do usuário, você nunca pode realmente confiar nos timestamps enviados de clientes móveis. Vimos o relógio se desviar desde alguns segundos a alguns anos em nossos dados de produção. Para contornar esse problema, decidimos usar o timestamp Kafka, ou seja, a hora em que o Kafka recebeu a mensagem de log. No entanto, nossos clientes móveis armazenam em buffer várias mensagens de log e as enviam em uma única carga útil, de modo que muitas mensagens exibem o mesmo registro de data e hora do Kafka. Acabamos realizando uma classificação secundária usando o timestamp Kafka e o timestamp de evento de cada mensagem.
  • Robustez do ponto de verificação: Os trabalhos de fluxo baseados no estado exigem pontos de verificação periódicos do estado para um sistema de arquivos replicado, como o HDFS. A latência desse sistema de arquivos pode afetar diretamente o desempenho do trabalho, especialmente se ele for verificado com frequência. Uma única falha no ponto de verificação pode causar falhas catastróficas, como a queda total do pipeline.
  • Recuperação e preenchimento do ponto de verificação: qualquer sistema distribuído, especialmente um projetado para funcionar 24/7 na produção, está fadado a falhar em algum momento; por exemplo, os nós desaparecerão, os containers poderão ser afetados pelo YARN, ou as falhas do sistema upstream poderão impactar os jobs downstream. Portanto, planejar a recuperação de pontos de verificação e o preenchimento é essencial. O comportamento padrão do Spark Streaming para a recuperação de pontos de verificação é consumir todos os eventos acumulados em um único lote durante a tentativa de recuperação de um ponto de verificação. Descobrimos que isso sobrecarregava nossos sistemas nos casos em que o tempo entre a falha e a recuperação do trabalho era muito longo. Acabamos modificando o DirectKafkaInputDStream para podermos dividir os eventos do backlog em lotes apropriados na recuperação de pontos de verificação.
  • Pressão de retorno e limite de taxa: a taxa de entrada para tópicos do Kafka nunca é constante. Na plataforma Uber, por exemplo, muitas vezes há maior atividade durante os horários de deslocamento e noites de fim de semana. A contrapressão (pressão de retorno?) é essencial para aliviar a carga em um trabalho sobrecarregado. A contrapressão do Spark Streaming entra em ação quando o tempo total gasto pelo lote excede a duração da janela de micro-batching. Ele usa um estimador de taxa PID para controlar a taxa de entrada dos lotes subsequentes. Observamos que os parâmetros padrão incorporados para o estimador produziram oscilações selvagens e taxas de entrada artificialmente baixas durante os períodos de contrapressão, afetando a atualização dos dados. A introdução de um piso substancial para o estimador de taxa se provou ser consequente para a recuperação mais rápida da contrapressão.
  • Fidelidade dos logs móveis: os eventos enviados pelos clientes móveis podem variar muito em sua fidelidade. Em locais de baixa largura de banda ou sinal fraco, as mensagens são geralmente perdidas ou repetidas e enviadas várias vezes. Os clientes podem ficar offline devido à baixa capacidade de mid-session, portanto, a máquina de estado deve contabilizar isso. Percebemos que ouvir outros fluxos de eventos gerados por nossos sistemas de back-end associados ajudava a determinar se tínhamos dados com perdas de clientes móveis. Essa experiência mostra que é necessário que os sistemas do lado do servidor mantenham seus próprios fluxos de eventos.

Próximos passos

O processamento de pedidos de eventos é um desafio difícil. Embora as primitivas de streaming estruturado no Spark 2.2 pareçam promissoras no tratamento de eventos fora de ordem, estamos pensando em migrar para o Flink devido ao seu suporte mais amplo para o processamento de eventos instantâneos e para o suporte mais amplo da Uber.

Além disso, alguns de nossos casos de uso poderiam usar granularidade para dados em sessão, tornando inviáveis os micro-batches do Spark, outro ponto a favor do Flink.

Se você estiver interessado em criar sistemas projetados para lidar com dados em grande escala, visite a página de carreiras da Uber.

***

Este artigo é do Uber Engineering. Ele foi escrito por Amey Chaugule. A tradução foi feita pela Redação iMasters com autorização. Você pode conferir o original em: https://eng.uber.com/sessionizing-data/