Back-End

30 jul, 2018

Streams no Node.js: o que são streams, afinal? – Parte 03

Publicidade

Chegamos ao final da nossa sequência sobre streams no Node.js. Se você ainda não leu o último artigo da série, dê um pulo lá para podermos ficar na mesma página!

Nesta parte, vamos aprender a criar a nossa própria stream utilizando as interfaces que o Node nos provê.

Implementando uma stream de leitura

Vamos tentar implementar uma stream de leitura simples. Para isso, vamos utilizar o construtor chamado Writable que o Node nos proporciona através do pacote Stream que vimos antes:

const {Writable} = require('stream')
A partir de agora, temos uma série de maneiras de implementar essa interface. Uma delas, por exemplo, é extender o próprio construtor em uma classe separada:
class MinhaStream extends Writable {
    write (chunk, encoding, callback) {
        console.log(chunk.toString())
        callback()
    }
}

Esse método é bacana quando estamos trabalhando literalmente com objetos que serão instanciados em outras classes, assim encapsulamos a lógica dentro dele e abstraímos a complexidade.

Porém, uma outra forma mais simples de fazer a criação de um objeto deste tipo (e muito mais poderosa) é utilizando objetos inicializadores. Isso significa que podemos criar uma nova instância de Writable e passar para esta instância um objeto de configurações com várias chaves.

const {Writable} = require('stream')

const minhaStream = new Writable({
    write (chunk, encoding, callback) {
        console.log(chunk.toString())
        callback()
    }
})

// OU DO JEITO MAIS EXPLÍCITO

const minhaStream = new Writable({
    write: (chunk, encoding, callback) => {
        console.log(chunk.toString())
        callback()
    }
})

// OU AINDA

const minhaStream = new Writable({
    write: function write(chunk, encoding, callback) {
        console.log(chunk.toString())
        callback()
    }
})

Estamos criando uma stream que escreve no process.stdout, assim como o console.log faz. Podemos agora utilizar métodos como o pipe na nossa stream criada:

process.stdin.pipe(minhaStream)

A chave write é a única chave obrigatória que temos que passar para o inicializador. Ela leva três argumentos:

  • Chunk: que é, geralmente, um Buffer que representa os dados que estão chegando
  • Encoding: encoding dos dados chegando pelo Buffer, por exemplo, se recebermos uma string plana, este valor será utf-8
  • Callback: vai ser a função que será chamada quando terminarmos o processamento. É aqui que sinalizamos que o que fizemos foi terminado com sucesso ou não. Quando queremos sinalizar uma falha, precisamos chamar a função Callback com um objeto Error.

Quando digo que este método é mais poderoso, é somente pelo fato de que, com ele, podemos passar um objeto dinamicamente, criando assim, uma stream dinâmica que pode responder aos eventos de acordo, por exemplo, com um hash map que podemos usar. Assim, cada chave enviada para o Writable importaria uma função diferente na stream.

Esse exemplo não é o melhor jeito de implementar um Echo, até porque o que fizemos é, basicamente, uma cópia de:

process.stdin.pipe(process.stdout)

Se você quiser saber mais detalhes da implementação, veja a documentação de implementação oficial.

Implementando uma stream de leitura

Para fazer o oposto, vamos usar a interface Readable, também do mesmo pacote:

const {Readable} = require('stream')

Da mesma forma, temos várias maneiras de criar uma stream de leitura. Uma delas é estender o objeto, como fizemos na anterior:

class MinhaLeitura extends Readable {
    read (tamanho) {
        
    }
}

E também podemos implementar um objeto. A diferença aqui, é que o método que vamos criar dentro dele se chama read, e ele pode ou não ter um argumento size, que indica o tamanho que será lido.

const streamLeitura = new Readable({
    read () {}
})

Mas, agora temos uma nova forma de implementar essa stream, podemos simplesmente escrever nela, o que é chamado de pushing.

const {Readable} = require('stream')

const streamLeitura = new Readable({
    read(){}
})

streamLeitura.push('Uma chunk')
streamLeitura.push('outra chunk')
streamLeitura.push(null) // Os dados acabaram

streamLeitura.pipe(process.stdout)

Veja que aqui mandamos os valores um a um para a stream, o que é uma boa para termos controle dos dados que queremos enviar. Depois simplesmente mandamos um null, sinalizando que não há mais dados a serem enviados.

O problema que enfrentamos aqui é o seguinte: estamos criando um pipe para a saída padrão, mas estamos colocando todos os dados na stream antes de enviarmos estes dados para outro lugar. O que não é muito eficiente, se você recordar, streams foram justamente criadas para não termos que armazenar dados anteriores e somente ter que tratar dados On Demand.

Para isso, vamos implementar de fato o método read, que está no nosso objeto de configuração:

const streamLeitura = new Readable({
    read (tamanho) {
        this.push(String.fromCharCode(this.charAtual++))
        if (this.charAtual > 90) this.push(null)
    }
})
streamLeitura.charAtual = 65

streamLeitura.pipe(process.stdout) //ABCDEFGHIJKLMNOPQRSTUVWXYZ

Estamos chamando o método read várias vezes desta forma, mas estamos recebendo os dados sob demanda, não ocupando memória. Mas note que temos que criar uma implementação de charAtual como propriedade. Isto seria melhor executado no formato de classe:

class Alfabeto extends Readable {
    constructor () {
        super()
        this.charAtual = 65
    }
    
    _read (tamanho) {
        this.push(String.fromCharCode(this.charAtual++))
        if (this.charAtual > 90) return this.push(null)
    }
}

Veja que temos algumas mudanças, quando estamos implementando em classes, precisamos sobrescrever o método que está em stream.Readable.prototype._read, pois estamos estendendo a classe pai, e também precisamos retornar o comando null para que o código não seja mais executado.

Depois podemos usar desta forma:

const alfabeto = new Alfabeto()
alfabeto.pipe(process.stdout)

A implementação é mais verbosa, mas o uso é mais limpo. Veja a documentação desta implementação aqui e se atente para esta parte.

Implementando streams duplex

Para esta implementação, vamos basicamente implementar ambas as interfaces acima. Porém, temos um objeto especial chamado Duplex, que permite que façamos isso sem muitos problemas:

const {Duplex} = require('stream')

const duplexStream = new Duplex({
    write(chunk, encoding, callback) {
        console.log(chunk.toString())
        callback()
    },
    read(tamanho) {
        this.push(String.fromCharCode(this.charAtual++))
        if(this.charAtual > 90) this.push(null)
    }
})
duplexStream.charAtual = 65
process.stdin.pipe(duplexStream).pipe(process.stdout)

O que fizemos aqui foi simplesmente implementar as duas streams que tínhamos anteriormente em uma só. Então, agora, além de enviarmos as letras, também vamos imprimi-las na tela.

É importante perceber aqui que uma Duplex tem o seu lado Readable e Writable operando de forma completamente independente, este é só um construto da linguagem para podermos declarar os dois juntos.

Da mesma forma, se quisermos fazer essa stream em forma de classe, temos que implementar os métodos _read e _write independentemente. Veja mais na documentação.

Implementando streams de transformação

Para uma stream de transformação, não temos que implementar nenhum dos métodos descritos anteriormente. Basta que implementemos um único método chamado transform do objeto Transform do pacote stream.

Este método tem a assinatura do método write da stream de escrita, mas também tem acesso ao método push da stream de leitura:

const {Transform} = require('stream')

const caixaAlta = new Transform({
    transform (chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase())
        callback()
    }
})

require('fs').createReadStream('./umarquivo.txt').pipe(caixaAlta).pipe(process.stdout)

Aqui as coisas ficam mais interessantes porque estamos realizando uma transformação e passando para frente, como falamos nas demais partes do artigo, este tipo de stream é também conhecido como “stream de passagem”, já que ela simplesmente modifica os dados internos e os passa a diante.

Veja a documentação oficial para implementações.

Object Mode

Por padrão, streams vão aceitar somente Buffers ou Strings como valores de entrada, mas podemos passar uma flag chamada objectMode quando criarmos as mesmas. Desta forma, a stream vai aceitar qualquer objeto JavaScript.

Vamos usar um exemplo simples e poderoso que vai transformar uma sequencia de valores separados por vírgula, como: a,b,c,d em um objeto do tipo {a: b, c: d}:

const {Transform} = require('stream')

const separaVirgulas = new Transform({
    readableObjectMode: true,
    transform (chunk, encoding, cb) {
        this.push(chunk.toString().trim().split(','))
        cb()
    }
})

const paraObjeto = new Transform({
    readableObjectMode: true,
    writableObjectMode: true,
    transform (chunk, encoding, cb) {
        const outObj = {}
        for(let i=0; i<chunk.length; i+=2) {
            outObj[chunk[i]] = chunk[i+1]
        }
        this.push(outObj)
        cb()
    }
})

const paraString = new Transform({
    writableObjectMode: true,
    transform (chunk, encoding, cb) {
        this.push(JSON.stringify(chunk) + '\n')
        cb()
    }
})

process.stdin
	.pipe(separaVirgulas)
	.pipe(paraObjeto)
	.pipe(paraString)
	.pipe(process.stdout)

Perceba que adicionamos readableObjectMode ou writableObjectMode em diferentes casos das streams. No primeiro caso, adicionamos readableObjectMode como true, porque vamos receber uma string no formato a,b,c,d e nossa stream separaVirgulas vai dar um push de um array, não de um buffer, como resultado do método split Por isso que nosso objectMode para Readables deve ser true neste caso.

Já na próxima stream, estamos recebendo um objeto (que é o array anterior), portanto, nosso writableObjectMode deve ser true, e realizando push em outro objeto (que é o objeto formatado), e é ai que o readableObjectMode deve ser true.

Na última stream, estamos apenas recebendo um objeto, mas escrevemos uma string (que é o resultado do JSON.strigify), então só precisamos que o writableObjectMode seja true, pois estamos recebendo um objeto, e não escrevendo um.

Pode ser um pouco confuso a princípio, mas pense assim:

  • Sempre que seu método push possuir um objeto qualquer, a flag readableObjectMode deve ser true
  • Sempre que você estiver recebendo um objeto como entrada na chunk, seu writableObjectMode deve ser true

Conclusão

No final da sequência, podemos ver o quanto streams podem ser úteis para várias coisas. Os exemplos dados aqui são meramente didáticos e pouco práticos, mas com um pouco de experiência e criatividade, você poderá implementar streams em diversos casos de uso reais.

Se você quiser saber ainda mais sobre streams, confira este artigo que serviu de base para toda a sequência, e também este outro artigo que dá uma ideia bem bacana de como podemos implementar streams.