Desenvolvimento

30 jun, 2017

Ex Manga Downloadr: a ascensão do Flow

Publicidade

Faz mais de um ano desde que escrevi sobre meu projeto favorito, Ex Manga Downloadr. Desde então eu fiz pequenas atualizações para mantê-lo com as versões e bibliotecas atuais do Elixir.

Para um lembrete rápido, o exercício é que eu quero fazer um webscrapping no MangaReader.net, um punhado de imagens organizadas em páginas e capítulos, e, no fim, tudo deve ser compilado e organizado em um PDF para que eu possa carregá-los no Kindle.

O webscrapping é um loop simples de HTML GETs em uma grande quantidade de URLs, vasculhando o HTML e buscado novas URLs para baixar.

Em muitas linguagens simples, as pessoas geralmente resolvem isso muito simplesmente, de duas maneiras:

– Um loop aninhado simples. Uma thread simples, fetch sequencial. Então, se você tiver 5.000 links e a busca em cada link levar 10 segundos, basicamente serão 10*5.000 = 50.000 segundos, o que é um tempo estupidamente longo.

– Uma geração simples de processos, fibras e threads ou I/O paralelos, tudo ao mesmo tempo. Uma tentativa de paralelizar todos os fetches de uma vez.

Todos provavelmente concordam que a primeira opção é estúpida. Agora, a segunda é delicada.

A parte delicada é o controle.

Qualquer um utilizando GO diria “oh, isso é fácil, basta colocar um loop e criar algumas rotinas GO”. Ou qualquer um utilizando o Node.js diria “oh, isso é fácil, basta colocar um loop, fazer a busca – elas vão ser executadas assincronamente – e adicionar os retornos, um processo simples de async/await”.

Eles não estão errados, mas essa é uma implementação muito ingênua. É comum disparar centenas ou milhares de solicitações paralelas. Agora, o que acontece se uma falhar e você precisar tentar novamente? O que acontece se o MangaReader tiver um sistema de controle que comece a derrubar as conexões ou limitar seu tempo? Ou se sua velocidade de Internet não for suficiente, e depois de certa quantidade de solicitações você começar a receber menos retornos e exceder o limite de tempo?

A mensagem é: é muito simples disparar coisas paralelas. É muito complicado controlar coisas paralelas.

É por isso que em minha primeira implementação em Elixir, eu apresentei uma implementação complicada utilizando uma combinação de um GenServer personalizada, a infraestrutura Task própria do Elixir para o padrão de async/await, e o Poolboy para controlar a taxa de paralelismo. É assim que você controla o gargalo para reduzir recursos: utilizar pools e filas (por isso que todos os bancos de dados tem um pool de conexão, lembra do C3PO?)

Esse é um fragmento da minha implementação anterior:

def chapter_page([chapter_link, source]) do
  Task.Supervisor.async(Fetcher.TaskSupervisor, fn ->
    :poolboy.transaction :worker_pool, fn(server) ->
      GenServer.call(server, {:chapter_page, chapter_link, source}, @genserver_call_timeout)
    end, @task_async_timeout
  end)
end

Sim, está muito feio, e existem muitos boilerplates para o GenServer, o Supervisor padrão para inicializar o Poolboy e etc. E o workflow do código de mais alto nível fica assim:

def pages({chapter_list, source}) do
   pages_list = chapter_list
     |> Enum.map(&Worker.chapter_page([&1, source]))
     |> Enum.map(&Task.await(&1, @await_timeout_ms))
     |> Enum.reduce([], fn {:ok, list}, acc -> acc ++ list end)
   {pages_list, source}
end

Então, dentro do módulo Worker cada método público encapsula as chamadas internas do GenServer em um Task async e na iteração da coleção nós adicionamos Task.await para realmente aguardar por todas as chamadas paralelas serem finalizadas, então nós podemos finalmente reduzir os resultados.

O Elixir agora vem com essa infraestrutura GenStage interessante que promete substituir GenEvent, e o caso de uso é quando você tem uma situação produtor-consumidor com back pressure. Basicamente quando você tem pontos de lentidão e você acabaria precisando controlar os gargalos.

Então, o Flow é uma abstração alta e mais fácil que você pode utilizar quase da mesma maneira que você utilizaria o Enum em suas coleções, mas, ao invés de um mapeamento sequencial, ele trata os deslocamentos paralelos e controla os batches. Então o código é muito similar, mas sem a necessidade de você controlar a paralelização manualmente.

Este é a commit completo onde eu consegui remover o Poolboy, meu GenServer padrão, reimplementar o Worker como um módulo simples de funções, e então o fluxo pode ficar sem o padrão de async/await e ao invés disso, utilizar o Flow:

def pages({chapter_list, source}) do
   pages_list = chapter_list
     |> Flow.from_enumerable(max_demand: @max_demand)
     |> Flow.map(&MangaWrapper.chapter_page([&1, source]))
     |> Flow.partition()
     |> Flow.reduce(fn -> [] end, fn {:ok, list}, acc -> acc ++ list end)
     |> Enum.to_list()
   {pages_list, source}
end

O único boilerplate que sobrou é Flow.from_enumerable() e Flow.partition() encapsulando o Flow.map, e é isso!

Note que eu configurei @max_demand como 60. Você deve ajustá-lo para ser maior ou menor, dependendo da sua conexão de Internet, você deve testar. Como padrão, o Flow vai disparar 500 processos em paralelo, o que é muito para o webscrapping em uma conexão doméstica comum e você vai sofrer com a queda dos retornos. Isso é o que eu tinha que fazer antes com o Poolboy, iniciando o pool com no máximo 60 transações.

Infelizmente nem tudo é tão simples quanto parece. Executando essa nova versão no modo de teste eu recebi o seguinte resultado:

58,85s user 13,93s system 37% cpu 3:13,78 total

Então, o tempo total de mais de 3 minutos, utilizando aproximadamente 37% da capacidade de processamento.

Minha versão imediatamente anterior utilizando as coisas do Poolboy, Task.Supervisor, GenServer, etc., ainda me retornou isso:

100,67s user 20,83s system 152% cpu 1:19,92 total

Menos da metade do tempo, embora utilizando todos os núcleos do meu processador. Então, minha implementação padrão ainda utiliza o máximo dos meus recursos. Ainda existem algumas coisas na implementação do Flow que eu não entendi bem. Eu já tentei aumentar o max_demand de 60 para 100, mas isso não melhorou nada. Deixar com o padrão de 500 aumenta o tempo para mais de 7 minutos.

De forma geral, ao menos ele torna a implementação mais simples (por isso, mais fácil de manter), mas até a implementação do Flow tem gargalos, ou eu estou utilizando ela errado nesse momento. Se você souber o que é, me avise nos comentários abaixo.

 

***

Artigo traduzido com autorização do autor. Publicado originalmente em http://www.akitaonrails.com/2017/06/13/ex-manga-downloadr-part-6-the-rise-of-flow