Desenvolvimento

11 jul, 2017

Ex Manga Downloadr: como lidar corretamente com grandes coleções

Publicidade

No meu artigo anterior, consegui simplificar muito o código original através do uso do Flow. Mas a desvantagem é que o tempo de execução realmente aumentou muito.

José Valim entrou gentilmente e publicou um comentário valioso, que vou colar aqui:

Você já tentou reduzir o @max_demand ao invés disso? @max_demand é a quantidade de dados que você troca entre os estágios. Se você o configurar para 60, significa que você está enviando 60 itens para um estágio, 60 itens para o outro e assim por diante. Isso dá um equilíbrio fraco para pequenas coleções, pois há uma chance de todos os itens acabarem no mesmo estágio. Você realmente quer reduzir o max_demand para 1 ou 2 para que cada estágio obtenha pequenos lotes e solicite mais do que o necessário. Outro parâmetro que você geralmente ajusta é a opção estágios: …, você deve definir isso para a quantidade de conexões que você teve no poolboy no passado.

No entanto, acredito que você não precisa usar o Flow. Elixir v1.4 tem algo chamado Task.async_stream, que é uma mistura de poolboy + task async, que é definitivamente um melhor substituto para o que você teve. Nós o adicionamos ao Elixir depois de implementar o Flow, pois percebemos que você pode obter muita milhagem de Task.async_stream sem precisar avançar para uma solução completa como o Flow. Se estiver usando Task.async_stream, a opção max_concurrency controla o tamanho do seu pool.

E, obviamente, ele está certo. Eu entendi mal como funciona o Flow. Ele é para ser usado quando você tem muitos itens para processar em paralelo. Particularmente unidades de processamento que podem ter alta variação e, portanto, muita back-pressure, não só porque há muitos itens para processar, mas porque seus tempos de execução podem variar drasticamente. Então, é um desses casos de ter um canhão, mas eu só tenho uma mosca para matar.

O que eu não sabia era a existência de Task.async_stream e de seu companheiro Task.Supervisor.async_stream se eu precisar adicionar mais controle.

Vamos voltar atrás um pouco.

Lidando com coleções em Elixir

Erlang é um monstro. Ele fornece todos os blocos de construção de um sistema operacional em tempo real, altamente concorrente! Realmente, o que ele fornece de extraordinário é muito mais do que qualquer outra linguagem/plataforma/máquina virtual poderá oferecer. Você não tem tanto assim de graça em Java, ou .NET ou qualquer coisa. Você deve montar as peças manualmente, passar centenas de horas ajustando e ainda rezar muito para que tudo funcione perfeitamente.

Então, você tem sistemas distribuídos para construir? Na verdade, não há outra opção. Use Erlang ou sofra no inferno.

Elixir melhora isso um tanto ao criar uma biblioteca padrão bastante razoável e simples de usar que torna a parte de codificação realmente agradável. Este é um combo matador. Você precisa fazer o próximo Whatsapp? Você precisa fazer o próximo Waze? Você precisa reconstruir Cassandra do zero? Você precisa criar coisas como o Apache Spark? Faça com Elixir.

Em Erlang, você precisa resolver tudo usando o GenServer. É uma abstração pura da OTP. Você precisa entender OTP intimamente. Não há atalho aqui. Não há Erlang sem OTP.

Dito isto, você pode começar simples e escalonar sem tanto problema.

Geralmente, tudo começa com Coleções, ou mais corretamente, com algum tipo de Enumeração.

Assim como a minha simples função Workflow.pages/1 que itera através de uma lista de links de capítulo, procure cada link, analise o HTML de retorno e extraia a coleção de links de páginas nesse capítulo, reduzindo as sub-listas para uma lista completa de links de páginas.

Se eu sei que a coleção é pequena (menos de 100 itens, por exemplo), eu simplesmente faria isso:

def pages({chapter_list, source}) do
   pages_list = chapter_list
     |> Enum.map(&Worker.chapter_page([&1, source]))
     |> Enum.reduce([], fn {:ok, list}, acc -> acc ++ list end)
   {pages_list, source}
end

E é isso. Isso é linear. Processará sequencialmente apenas um link por vez. Quanto mais links de capítulo, mais tempo ele levará. Normalmente, eu quero processar isso em paralelo. Mas não consigo disparar um processo paralelo para cada link de capítulo, porque se eu receber mil links de capítulos e dispará-los todos, será uma Negação de Serviço e eu certamente receberei centenas de time outs.

Você pode encontrar dois problemas principais quando precisar fazer uma iteração em uma grande coleção.

  • Se a sua coleção for gigantesca (imagine um arquivo de texto com GB de tamanho e que você precisa iterar linha a linha). Para isso, você usa Stream em vez de Enum. Todas as funções parecem quase exatamente as mesmas, mas você não terá que carregar toda a coleção na memória e você não continuará duplicando.
  • Se a sua unidade de processamento demorar muito. Agora que você resolveu não explodir seu uso de memória, o que acontece se você tiver trabalhos lentos durante a iteração em cada item da coleção? Esse é o nosso caso, onde a coleção é bem pequena, mas o tempo de processamento é longo, pois ele está sendo buscado a partir de uma fonte externa na internet. Pode demorar milissegundos, pode demorar alguns segundos.

Uma maneira de controlar isso é através do uso de “batches”, algo assim:

def pages({chapter_list, source}) do
  pages_list = chapter_list
        |> Enum.chunk(60)
        |> Enum.map(&pages_download(&1, source))
        |> Enum.concat
  {pages_list, source}
end

def pages_download(chapter_list, source)
   results = chapter_list
     |> Enum.map(&Task.async(fn -> Worker.chapter_page([&1, source]) end))
     |> Enum.map(&Task.await/1)
     |> Enum.reduce([], fn {:ok, list}, acc -> acc ++ list end)
   {pages_list, source}, results        
end

Isto é apenas para o exemplo, não compilei esse trecho para ver se ele funciona, mas você pode ter uma ideia de dividir a grande lista e processar cada pedaço através de Task.async e Task.await.

Novamente, para pequenas listas, isso deve estar certo (milhares) e cada item não demora muito para processar.

Agora, isso não é muito bom. Porque cada pedaço deve terminar antes que o próximo pedaço comece. Razão pela qual a solução ideal é manter uma quantidade constante de trabalhos sendo executados em qualquer momento. Para esse fim, precisamos de um Pool, que é o que expliquei no artigo Poolboy: ao resgate!.

Mas implementar a maneira correta de manter o pool inteiramente preenchido requer algum malabarismo chato entre as transações Poolboy e Task.Supervisor.async. É por isso que eu estava interessado no novo uso do Flow.

O código está limpo, mas, como expliquei anteriormente, este não é o caso de uso adequado para o Flow. É melhor você ter que iterar em dezenas de milhares de itens ou mesmo infinitos (você tem um tráfego recebido de pedidos que precisam de processamento paralelo, por exemplo).

Então, finalmente, há um meio-termo. A solução entre o simples Task.async e Flow é Task.async_stream, que funciona como uma implementação de pool, onde ele mantém uma max_concurrency de trabalhos executados em uma stream. O código final torna-se mais elegante assim:

def pages({chapter_list, source}) do
  pages_list = chapter_list
    |> Task.async_stream(MangaWrapper, :chapter_page, [source], max_concurrency: @max_demand)
    |> Enum.to_list()
    |> Enum.reduce([], fn {:ok, {:ok, list}}, acc -> acc ++ list end)
  {pages_list, source}
end

E este é o commit final, com as mudanças mencionadas.

Conclusão

A implementação com Task.async_stream é super simples e os tempos finalmente se tornaram os mesmos que antes.

84,16s user 20,80s system 138% cpu 1:15,94 total

Muito melhor do que os mais de 3 minutos que estava levando com o Flow. E isso não é porque o Flow é lento, é porque eu não estava usando corretamente, provavelmente disparando um grande pedaço em um único GenStage e criando um gargalo. Mais uma vez, use apenas o Flow se você tiver itens suficientes para colocar centenas deles em vários GenStages paralelos. Estamos falando de coleções com dezenas de milhares de itens, e não minhas poucas listas de páginas.

Ainda assim, há um pequeno truque. Para buscar todos os links de capítulo e página, estou usando uma max_concurrency de 100. O time out é padrão em 5.000 (5 segundos). Isso funciona porque o HTML de retorno não é tão grande e podemos paralelizar esse tanto em uma conexão de alta largura de banda.

Mas o procedimento de download de imagens em Workflow.process_downloads eu tive que cortar max_concurrency ao meio e aumentar o timeout até 30 segundos para garantir que ele não falharia.

Como esta é uma implementação simples, não há recuperação de falha e nenhuma rotina de repetição. Eu teria que substituir esta implementação com Task.Supervisor.async_stream para recuperar algum controle. Minha implementação original foi mais complicada, mas eu tive lugares para adicionar mecanismos de retry. Então, novamente, é um meio-termo entre facilidade de uso e controle, sempre. Quanto mais controle você tem, pior o código se torna.

Este é um simples exercício de exemplo, então vou manter isso por enquanto.

 

***

Artigo traduzido com autorização do autor. Publicado originalmente em http://www.akitaonrails.com/2017/06/16/ex-manga-downloadr-part-7-properly-dealing-with-large-collections