Front End

19 set, 2019

Usando Worker Threads no Node.js

Publicidade

Você já ouviu falar de Web Worker? E Service Worker? Bom, estas APIs são velhas conhecidas no mundo web dos navegadores. Todos sabemos que o JavaScript é single-threaded, certo? Isso significa que só temos uma call stack e, portanto, só podemos executar uma tarefa por vez.

Quantas vezes alguém já te disse: ‘Não bloqueie a main-thread do JavaScript’? E isso tem um bom motivo! Ao ser bloqueada, a main-thread (e única thread) não teria a capacidade de processar outras partes do código, essencialmente travando seu programa. Com o passar do tempo e o avanço da tecnologia, fez-se absurdamente necessário a capacidade de uma linguagem tão popular como o JavaScript poder se portar como uma linguagem multi-threaded.

É ai que os Web Workers entram em cena. Basicamente, eles provém a capacidade de criarmos outras threads a parte da thread principal para que possamos executar processamentos mais pesados sem precisar, de fato, travar a main-thread! Isso permite que o JavaScript possa executar processamentos muito mais pesados do que apenas animações e alterações em elementos do site.

Um exemplo clássico é a mineração de bitcoins, o que antes só podia ser realizado em servidores, agora poderia ser feito em um browser comum!

Workers e Node.js

O Node.js possui uma api de worker threads. O grande problema é que ela ainda estava como experimental, ou seja, a API poderia sofrer modificações e breaking changes que poderiam impactar no desenvolvimento de uma aplicação de produção. Por este motivo, muitos desenvolvedores (incluindo eu) preferiam utilizar o pacote child_process, que não era exatamente um worker, mas era um processo filho que poderíamos executar.

Depois tivemos a incrível API de cluster que nos permitia criar um modelo de multi-processamento em vários núcleos do processador (porque o Node.js é single core também) e ai tirar vantagem do multi-processamento utilizando outro pacote.

Porém, com um recente tweet da equipe de desenvolvimento, esta API foi finalmente marcada como estável! Ou seja,
podemos tirar vantagens dessa ferramenta absurdamente poderosa! Vamos criar uma aplicação simples que faz exatamente o exemplo que dei anteriormente: minerar bitcoins.

Definindo regras

O ato de ‘minerar’ um bitcoin basicamente é encontrar um número único que, ao ser adicionado ao payload de um
bloco, produz um hash final que segue uma regra específica. No caso do bitcoin, essa regra diz que o hash final precisa começar com uma quantidade de N zeros.

Isto é um caso excelente para processamento paralelo, porque é um processo que demora um tempo e exige computação mais pesada. Vamos simplificar o modelo de mineração e abstrair as funcionalidades principais para um código mais simples de entender e focar em nossos workers. Nosso ‘minerador’ vai receber um JSON da seguinte forma:

{
  "payloads": [
    "romi3927273277nevraefiass2103342375ohodkotbug",
    "gaze443157697niscipibgiw1642589262efihjejlos",
    "ocug1626862968veswukguwcu2592116772osoreteriu",
    "iwos2708201740eprihvogmos3764344965agadupiote",
    "nemu3084996143vimwudtanek1213613753ofefuecnuf",
    "isaj4116570015biwidaposci4241495684hezenlatdu",
    "huct2915543159amviwihweuv2600946750jeeduzerah",
    "razk2100190742afatmuclolo997646306routaotahe",
    "gihu2518634792boafiehucef2128481441gecrepirse",
    "zoup2063415799niuwictihal1459892629arrogdekin",
    "arsa41137640wetildedimt3297232485uvebascaen",
    "uccu3568686497mucuminulaf2528880605ueduemosob",
    "luro3950339148zogazeivnac3529592063makevtabic",
    "ijoj2400035192akikodpuhet1846390304oesadipkor",
    "zuhe24553937pjicathesej1889936403voskolrowu",
    "ulog2145473488ewkiusalakm3891503344isosfidulw",
    "mevi4172405508maosocicacn218089igamvugusu",
    "vepi1035082870ratewivneln4027185780ugahidiwuz",
    "toze3177065179demjukuzjab3619747835epocilopin",
    "wefj1958553370osnoputrila406622169tonuseotij",
    "jitm4273326712uulfafoafda3083365269donecwaguu",
    "deiz284958975sonalkedrob3724713613ejlumkuvwu",
    "ekoa329701548nopofarejep3799522462ewergilual",
    "wogi4052082081dsimjugovbe1235427965ifvigesuma",
    "vost153242693ocukuwinhad3925312540celudnawhe",
    "luzz1422123026uzpihwausvi3711685905wocbewenub",
    "opup2085811647ugorwadtekj1040859347askiwosuma",
    "pisg3729559558ucucikufava940175451bwalidhuri",
    "diur2228207618koungurzawt1535270119ihullepfil",
    "guew1198852206uerehozvecd1741914790obbesikves",
    "duba1193434831nepwajmajim3638815420ivekemunjo",
    "bolm1143531649etsocopzijc2871885729ijpuptazsu",
    "jazo2509464204resunouduma364075720lepbamwegj"
  ]
}

Este arquivo simboliza uma série de payloads de blocos diferentes. Cada item do array significa um bloco. O que temos que fazer é minerar cada um destes blocos, encontrar o número correspondente, seu hash e adicioná-lo em um objeto desta forma:

const blocosMinerados = [
  { payload: 'payload original do bloco', nonce: 3456, hash: 'hash final do bloco' }
]

O nonce é o nosso número único que vai gerar o hash que queremos. Para este exemplo, vamos dizer que nossa regra é que o hash final precisa começar com 4 zeros (você pode aumentar este limite).

Criando nosso minerador

A primeira coisa que vamos fazer é importar os nossos payloads em um novo arquivo que chamarei de index.js,
definir nele o número de zeros que queremos, o número de workers finalizados (já vamos entender por que) e também o array que vai receber nosso resultado final:

const { payloads } = require('./payloads.json')
const final = []
let finishedWorkers = 0
const LEADING_ZEROES = 4

Em seguida, vamos criar nossa lógica. Temos que iterar por todos os payloads, para cada payload temos que instanciar uma nova worker thread e enviar para esta worker thread o payload do bloco:

const { payloads } = require('./payloads.json')
const LEADING_ZEROES = 4
const { Worker, isMainThread } = require('worker_threads')

if (isMainThread) {
  for (let payload of payloads) {
    const worker = new Worker(__filename, { env: { LEADING_ZEROES } })
    worker.once('message', (message) => {
      final.push(message)
      finishedWorkers++
      if (finishedWorkers === payloads.length) console.log(final)
    })
    worker.on('error', console.error)

    console.log(`Iniciando worker de ID ${worker.threadId} e enviando o payload "${payload}"`)
    worker.postMessage(payload)
  }
}

Vamos passar linha a linha aqui. O que estamos fazendo é, inicialmente verificar se esta é a thread principal, ou seja, é o processo que inicia os workers, se for, então vamos fazer um loop em nosso array de payloads e criar para cada payload um novo worker.

Veja que, para criar um novo worker, nós precisamos passar um caminho de arquivo, isto porque o Worker executa um código JavaScript. Então, podemos passar o caminho do próprio arquivo – como fizemos agora – e, neste caso, devemos sempre verificar se esta thread é a thread principal usando isMainThread, podemos também enviar o caminho de outro arquivo JavaScript ou então podemos enviar uma string de um código JavaScript e pedir para que o Worker faça um eval deste código.

O segundo parâmetro são as opções de inicialização. Existem várias, porém, a que estamos utilizando é a env, que seta o objeto global process.env com o objeto que passarmos para ela. Então, dentro do nosso worker, nossas variáveis de ambiente vão conter uma chave LEADING_ZEROES.

Existe a opção de utilizarmos { env: worker.SHARE_ENV } para que o worker compartilhe do mesmo objeto process.env que o processo pai que o criou.

Por fim, o que fazemos é definir os tratamentos de erros e mensagens. Um worker, por definição, é um emissor de
eventos, então temos que ouvir os eventos que ele emite, que podem ser:

  • online: Disparado quando o worker é iniciado
  • error: Disparado quando há um erro no worker
  • exit: Disparado quando o worker terminou seu processamento
  • message: Disparado quando o worker envia uma mensagem para o processo pai que o criou

Os mais importantes aqui são o message, que vai ser por onde vamos buscar nosso resultado, e o error, que vai ser por onde vamos tratar nossos erros. Para o erro, vamos apenas logar o erro no console. Agora, para a mensagem, o que temos que fazer é o seguinte:

  • 1. Adicionamos o objeto de resultado no nosso array de resultados
  • 2. Aumentamos a quantidade de workers que foram finalizados
  • 3. Se a quantidade for igual ao número de payloads, significa que todos os workers foram finalizados e podemos exibir os resultados

Fazemos esta checagem de número de workers finalizados porque este é um processamento assíncrono. Se somente
colocássemos um console.log(final) após o nosso laço for, o programa iria iniciar e fechar logo em seguida, porque
o processo pai iria inicializar todos os workers e seguir executando seu código normalmente. Então, precisamos
sincronizar o final de todos os Workers com o final da execução.

Fazendo a mineração

Agora que criamos o processo pai, vamos criar o processo filho! O que ele precisa fazer é o seguinte:

  • 1. Receber a mensagem do pai
  • 2. Inicializar um número com o valor 0
  • 3. Para cada incremento de 1 neste número temos que verificar se os N primeiros zeros do hash gerado pelo payload+ numero existem
  • 4. Se existir, postamos uma mensagem com o objeto de resultado
  • 5. Se não, incrementamos o número e tentamos de novo

Fazemos isto da seguinte forma:

parentPort.once('message', (message) => {
  const payload = message
  let nonce = 0
  let generatedHash = ''

  do {
    generatedHash = crypto.createHash('sha256').update(payload + nonce).digest('hex')
    nonce++
  } while (generatedHash.slice(0, process.env.LEADING_ZEROES) !== '0'.repeat(process.env.LEADING_ZEROES))

    parentPort.postMessage({ payload: message, nonce, hash: generatedHash })
})

Da mesma forma que o Worker é um emissor de eventos, o pai também é, então vamos ouvir o evento message uma
única vez. Ao recebermos a mensagem, vamos gerar nossas variáveis e fazer o loop do/while para começar desde o 0
verificando se o hash é igual à nossa regra. Vamos colocar tudo isso dentro de um else, pois não é o processo principal.

Veja que estamos usando o módulo crypto, não esqueça de adicionar const crypto = require(‘crypto’) no topo de seu arquivo. 

O arquivo final ficaria assim:

const { payloads } = require('./payloads.json')
const crypto = require('crypto')
const { Worker, isMainThread, parentPort } = require('worker_threads')

const LEADING_ZEROES = 4
const final = []
let finishedWorkers = 0

if (isMainThread) {
  for (let payload of payloads) {
    const worker = new Worker(__filename, { env: { LEADING_ZEROES } })
    worker.once('message', (message) => {
      final.push(message)
      finishedWorkers++
      if (finishedWorkers === payloads.length) console.log(final)
    })
    worker.on('error', console.error)

    console.log(`Iniciando worker de ID ${worker.threadId} e enviando o payload "${payload}"`)
    worker.postMessage(payload)
  }
} else {
  parentPort.once('message', (message) => {
    const payload = message
    let nonce = 0
    let generatedHash = ''

    do {
      generatedHash = crypto.createHash('sha256').update(payload + nonce).digest('hex')
      nonce++
    } while (generatedHash.slice(0, process.env.LEADING_ZEROES) !== '0'.repeat(process.env.LEADING_ZEROES))

    parentPort.postMessage({ payload: message, nonce, hash: generatedHash })
  })
}

Simplificando

Podemos simplificar um pouco mais nosso minerador. Vamos deixar ele mais simples e mais legível separando as
responsabilidades. Vamos criar um arquivo worker.js e colocar o código do nosso worker lá dentro:

// worker.js
const { parentPort } = require('worker_threads')
const crypto = require('crypto')

parentPort.once('message', (message) => {
  const payload = message
  let nonce = 0
  let generatedHash = ''

  do {
    generatedHash = crypto.createHash('sha256').update(payload + nonce).digest('hex')
    nonce++
  } while (generatedHash.slice(0, process.env.LEADING_ZEROES) !== '0'.repeat(process.env.LEADING_ZEROES))

  parentPort.postMessage({ payload: message, nonce, hash: generatedHash })
})

Agora, no nosso arquivo index.js, vamos remover o if/else que verifica se este é o processo pai ou filho, porque o
index.js sempre vai ser o processo pai que instanciará um Worker que lerá do arquivo worker.js como processo
filho:

// index.js
const { payloads } = require('./payloads.json')
const { Worker } = require('worker_threads')

const LEADING_ZEROES = 4
const final = []
let finishedWorkers = 0

for (let payload of payloads) {
  const worker = new Worker('./worker.js', { env: { LEADING_ZEROES } })
  worker.once('message', (message) => {
    final.push(message)
    finishedWorkers++
    if (finishedWorkers === payloads.length) console.log(final)
  })
  worker.on('error', console.error)

  console.log(`Iniciando worker de ID ${worker.threadId} e enviando o payload "${payload}"`)
  worker.postMessage(payload)
}

Ao executarmos node index.js no nosso console vamos ter uma resposta parecia com esta:

E, logo em seguida, printamos o resultado:

Conclusão

A API de Workers é extensa e bem completa! Você pode checar mais opções na documentação oficial. Uma das APIs
mais interessantes é a chamada MessageChannel que permite que criemos um canal de comunicação de duas portas
entre quaisquer processos que forem criados dentro do mesmo programa, permitindo que enviemos mensagens de um local a outro de forma simples e rápida.

Além disso, os Workers possuem outras opções de inicialização, como: habilitar/desabilitar as saídas e entradas de
dados, fazer um eval direto em uma string JavaScript, passar parâmetros de inicialização para o Node e, uma das coisas mais interessantes, o workerData, que é uma instancia de um valor que é replicada em todos os workers. Poderíamos ter passado nossa variável LEADING_ZEROES para este workerData que ele seria replicado em todos os workers igualmente e poderíamos utiliza-la como workerData.LEADING_ZEROES.

Em suma, a API de Workers é definitivamente uma opção para se estudar quando fazendo processamento pesado no
Node.js e ela certamente não pode ser desconsiderada como uma das APIs mais poderosas que existem até hoje.

Neste link você vai achar o código completo e a implementação do que acabamos de fazer aqui!
Até mais!