/Infra

voltar
/Infra

Desmistificando o Backpressure do RxJava no app Android da Uber

PorUber Engineering em

A transmissão rápida e eficiente de informações sobre a atividade de um aplicativo – por exemplo, solicitando corridas ou a aproximação de um local de embarque de passageiro – é crucial para uma experiência de usuário perfeita em todos os produtos da Uber.

Na Uber, usamos muito o RxJava em nossos aplicativos Android para comunicar claramente eventos entre observers e expressar transformações assíncronas complicadas. Os aplicativos de passageiro e motorista da Uber têm muitos estados assíncronos usando RxJava Observables, como a comunicação de novas saídas para um motorista ou uma nova combinação de uberPOOL para um passageiro atualmente em viagem. Embora RxJava tenha sido incrivelmente útil para nós, o Backpressure nos obrigou a pensar de forma criativa sobre como usamos essa biblioteca.

Neste artigo, oferecemos um exemplo do Backpressure do RxJava e compartilhamos nossas melhores práticas para negá-lo através de operadores de Backpressure RxJava, configurações RxJava 1.x mais tolerantes e RxJava 2.x.

Sob pressão: um caso de backpressure com RxJava 1.x

Backpressure em RxJava ocorre quando um Observable descarrega eventos mais rapidamente do que um operador possa recebe-los; isso causa lags, crashes e outros problemas. Há muitos cenários em que a Backpressure RxJava é comum, particularmente quando se lida com múltiplos tópicos ou eventos assíncronos. Com RxJava 1.x, a Backpressure é problemática independentemente de precisar ou não de suporte; assim, erros MissingBackpressureException e falhas correspondentes geralmente ocorrem com pouco contexto ou visão sobre quais eventos as estão causando.

Em 2015, começamos a migrar de Otto (um event bus melhorado com suporte para Android) e a implementar mais e mais RxJava em nossos aplicativos de passageiro e motorista. Isso causou um aumento nas falhas MissingBackpressureException. Uma vez que a causa inicial da falha é difícil de identificar, nossa equipe da Plataforma Móvel precisava obter uma compreensão mais profunda de como a Backpressure  funciona em ambientes RxJava. Também precisávamos desenvolver soluções para impedir que tornasse os nossos aplicativos mais lentos.

Na prática, a sequência do evento que leva a uma MissingBackpressureException é complicada. Para manter as coisas simples, usaremos um pequeno fragmento de RxJava 1.x para demonstrar como essa incapacidade de consumir eventos causa uma MissingBackpressureException:

// Set RxRingBuffer size to 8, to make this example simple +  deterministic across platforms.
System.setProperty("rx.ring-buffer.size", "8");

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
Scheduler scheduler = Schedulers.from(executorService);

PublishSubject<Integer> subject = PublishSubject.create();
subject.observeOn(scheduler)
        .subscribe(System.out::println);

// onNext 12 times into the subject...
for (int i = 1; i <= 12; i++) {
    subject.onNext(i);
}
subject.onCompleted();

Conforme demonstrado acima, a propriedade rx.ring-buffer.size do RxJava 1.x armazena o tamanho de todos os ring buffers in-memory que RxJava usa quando um Observable não consegue acompanhar a taxa de emissões de eventos. O tamanho do ring buffer está configurado explicitamente para um número baixo (8) porque diferentes plataformas têm padrões diferentes para esse valor. Enquanto o padrão Java Virtual Machine (JVM) é 128 itens por buffer circular, o Android tem um limite muito menor de 16.

No nosso exemplo, criamos um assunto e o emitimos na thread principal, mas tentamos consumir os eventos em um novo agendador. Para manter as coisas deterministas e reproduzíveis, usamos um único agendador, iniciando a thread com uma chamada de suspensão para garantir que não possa consumir eventos enquanto os emite da thread principal.

Em seguida, criamos um PublishSubject com um observador em nosso agendador único para imprimir os valores que recebe. Finalmente, emitimos os itens de 0 a 11 para o assunto, iterando através dos valores no tópico principal e chamando PublishSubject#onNext().

Se você executar o nosso fragmento de exemplo, ele irá falhar depois de lançar esta exceção:

Exception in thread "pool-1-thread-1" java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
	at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:57)
	at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:107)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: rx.exceptions.OnErrorNotImplementedException: PublishSubject: could not emit value due to lack of requests
	at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
	at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
	at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
	at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:153)
	at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
	at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:273)
	at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:216)
	at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
	... 4 more
Caused by: rx.exceptions.MissingBackpressureException: PublishSubject: could not emit value due to lack of requests
	at rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:307)
	at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:219)
	at rx.subjects.PublishSubject.onNext(PublishSubject.java:72)
	at Main.main(Main.java:36)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

Uma vez que o código emitiu 12 itens no tópico principal (4 acima de nosso limite de 8 itens), nosso observador no agendador em background não pode processá-los porque o tópico está dormindo no momento. Emitir mais do que o nosso limite de 8 itens também causa uma MissingBackpressureException, forçando nosso código a falhar.

Na próxima seção, discutiremos como aliviar esta Backpressure.

Aliviar a pressão: estratégias do Buffer

Podemos implementar algumas estratégias do buffer para abordar o exemplo MissingBackpressureException acima e outros como ele. A abordagem mais simples é usar o operador onBackpressureBuffer() da biblioteca ReactiveX, que é efetivamente o que RxJava 2.x faz por padrão agora:

subject

       .onBackpressureBuffer()

       .observeOn(scheduler)

       .map((Integer integer) -> String.format(“%d “, integer))

       .subscribe(System.out::print);

O operador armazena de forma transparente todos os valores até que o consumidor de downstream os ingira. A saída do comando é todos seus valores correspondentes, demonstrados abaixo:

Outra estratégia é descartar eventos usando o operador onBackpressureDrop(). Isso ignora novos eventos quando a fila está cheia, demonstrado abaixo:

A última opção é usar o operador onBackpressureLatest(). Isso garante que o valor mais recente não seja descartado e descartará outros valores quando a fila estiver cheia, demonstrado abaixo:

Além dos exemplos acima, essas estratégias possuem versões parametrizadas para personalizar o comportamento. Por exemplo, se você quiser usar o OnBackpressureBuffer(), mas apenas armazena um número fixo de itens, você pode aplicar o operador onBackpressureBuffer(), que exige um parâmetro de capacidade.

Dentro da Engenharia da Uber, empregamos uma mistura dessas soluções para lidar com a Backpressure, dependendo do caso de uso. Atualmente, a maior parte da base de código do Android da Uber usa o RxJava 2.x (a próxima iteração da biblioteca), que nos permite auto excluir a Backpressure . Para o nosso código RxJava 1.x restante, superamos o tamanho do nosso buffer circular para obter um comportamento semelhante ao do RxJava 2.x. (Nós abordaremos isso em maior detalhe mais tarde.)

Cuidando dos efeitos colaterais do backpressure operator

Sem uma estratégia para aliviar Backpressure , um aplicativo sob alta pressão de memória pode preencher facilmente os ring buffers internos e falhar com uma MissingBackpressureException. Um cenário comum que causa um MissingBackpressureException é observar um valor no tópico principal quando o aplicativo não é responsivo. Se a fila for preenchida antes que estes eventos possam ser consumidos, o aplicativo irá falhar com uma MissingBackpressureException. Como o tamanho padrão da fila no Android é de apenas 16 itens (contra 128 na JVM), isso pode se tornar um problema importante.

Enquanto usar uma estratégia de buffer parece ser uma decisão óbvia, esses antídotos de Backpressure ainda possuem efeitos colaterais. Por exemplo, você deve ter certeza de que o armazenamento em buffer na memória não fará com que seu aplicativo fique sem memória. Particularmente quando desenvolvendo para Android, é muito importante manter um perfil de memória baixo; não é provável que o armazenamento de Plain Old Java Objects (POJOs) cause uma falha. No entanto, os engenheiros devem pensar duas vezes antes de armazenar itens com um grande tamanho de arquivo, como bitmaps ou outros recursos intensivos em memória.

Na verdade, para uma estratégia que descarta objetos como onBackpressureDrop() ou onBackpressureLatest(), você precisa avaliar se os eventos que são descartados causarão reações adversas. Se os eventos estiverem com estado (por exemplo, um evento enviado para iniciar e parar uma determinada funcionalidade do aplicativo), descartar um evento pode resultar em problemas difíceis de ser encontrados no tempo de execução.

Depois de ter uma compreensão básica da Backpressure, a documentação do RxJava torna-se muito mais útil. Todo operador que consome eventos tem sua própria seção no Backpressure que descreve detalhadamente como ela se comportará.

Olhando através dos operadores disponíveis, poderíamos ter evitado completamente a falha no exemplo original, usando o operador range, que atende a Backpressure derivada e transmite valores conforme solicitados.

Melhorias com RxJava 2.x

Agora que dissecamos completamente as causas e as soluções para Backpressure do RxJava 1.x, vale a pena notar que o RxJava 2.x incorpora novos tipos que permitem que a biblioteca atenda melhor a Backpressure .

O tipo Observable no RxJava 2.x não tem conceito de Backpressure. Implementar Observable é efetivamente o mesmo que usar onBackpressureBuffer() por padrão. Os eventos de UI, solicitações de rede únicas e mudanças de estado devem funcionar com essa abordagem. Os tipos Completable, Maybe e Single também podem ditar esse comportamento.

Se você precisar dar suporte a Backpressure, a nova classe do RxJava 2.x, Flowable, é consciente, como o Observable era no RxJava 1.x. No entanto, a biblioteca atualizada agora exige uma escolha explícita de uma estratégia de Backpressure para evitar MissingBackpressureExceptions inesperados.

Alcançando comportamento de RxJava 2.x no RxJava 1.x

Atualmente, usamos o código RxJava 1.x legado e o código RxJava 2.x para os aplicativos Android da Uber, com todo o código novo obtido com o último modelo. Depois de migrar a maior parte do código para o RxJava 2.x para o nosso aplicativo do passageiro, ainda tínhamos uma longa cauda de falhas de Backpressure perdidas no código RxJava 1.x mais antigo. Depois de elevar a propriedade do ring buffer para 128 (combinando com o padrão na JVM), as falhas de Backpressure foram completamente erradicadas sem nenhuma pressão de memória. Oba!

No entanto, definir esta propriedade não deve ser levado com pouca importância. Ela será aplicada globalmente a cada fluxo RxJava 1.x em seu aplicativo, incluindo bibliotecas de terceiros que usam RxJava. Além disso, os engenheiros devem estar cientes de qualquer observables RxJava 1.x usando a Backpressure. Esse cenário pode causar falhas e outras implicações negativas apesar desses ajustes.

Com uma compreensão mais clara de como funciona a Backpressure, a Engenharia da Uber conseguiu re-arquitetar nossos aplicativos de passageiro e motorista com maior estabilidade, tornando a experiência de usuário mais perfeita. Esperamos que esse conhecimento seja útil para seus empreendimentos RxJava.

 

***

Este artigo é do Uber Engineering. Ele foi escrito por Tony Cosentini. A tradução foi feita pela Redação iMasters com autorização. Você pode conferir o original em: https://eng.uber.com/rxjava-backpressure/

Deixe um comentário! 0

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

Comentando como Anônimo

leia mais