Primi Passi con RxJava (parte 1)

Nell’articolo Reactive Programming abbiamo introdotto la programmazione reattiva per poi mostrare la specifica Reactive Streams, che è il risultato dello sforzo di diversi attori che operano attivamente in tale contesto, al fine di fissare le caratteristiche principali del paradigma. RxJava, implementazione java del progetto ReactiveX, è uno di tali attori anche se ha adottato la specifica solamente nella versione 2, in quanto tale progetto nasce prima del rilascio specifica Reactive Streams.

Questo è anche il motivo per cui i concetti che sono alla base di ReactiveX sono leggermente differenti. Esso infatti estende il design pattern observer (vedi Observer Design Pattern) al fine di supportare l’elaborazione di sequenze di dati/eventi ed aggiungendo operatori che possono essere composti sequenzialmente in modo dichiarativo. Il tutto consentendo allo sviluppatore di disinteressarsi di concetti quali la gestione dei thread, la sincronizzazione, la concorrenza, etc., che sono gestiti direttamente dal framework in modo trasparente.

Building Block

I concetti che sono alla base del framework RxJava sono due: Observable e Observer.

Observable Gli observable sono le sorgenti dei dati ovvero gli analoghi dei Publisher nella specifica reactive streams. Generalmente iniziano a distribuire dati quando un subscriber si mette in ascolto. Un observable può emettere un numero qualsiasi di oggetti (anche nessuno) e può terminare correttamente o con un errore. E’ anche possibile che non terminino mai, ad esempio un observable per il clic sul pulsante del mouse può potenzialmente produrre un flusso infinito di eventi.
Observer Un observable può avere un numero qualsiasi di observer, che sono gli analoghi dei Subscriber nella specifica reactive streams. Se l’observable emette un nuovo elemento, il metodo onNext() viene invocato su ciascuno dei observer. Analogamente, se l’observable termina correttamente il flusso di dati, il metodo onComplete() viene invocato su ciascuno dei observer. Infine, se l’observable termina invece con un errore, il metodo onError() viene invocato su ciascuno dei observer.

A tali concetti vanno poi aggiunti gli operatori che possono essere applicati in successione al flusso dei dati, a partire dalla sorgente (observable) per terminare con il consumatore (observer). Tali operatori giocano il ruolo dei Processor nella specifica reactive streams, ovvero si comportano contemporaneamente come observer e come observable.

observalbe.operator1().operator2().operator3().subscribe(observer);

Molto spesso tali concetti sono descritti attraverso diagrammi denominati marble diagrams, che rappresentano sinteticamente gli observable i subscriber e le trasformazioni realizzate dagli operatori.

A tale proposito il sito https://rxmarbles.com/ offre diversi marble diagrams interattivi con cui l’utente può interagire per comprendere meglio il comportamento dei diversi operatori.

Observable

Il framework mette a disposizione diversi tipi di observable:

  • Flowable: possono emettere 0 o n elementi e terminare con successo o con un errore. Supportano la backpressure consentendo di controllare la velocità con cui una sorgente genera i dati.
  • Observable: possono emettere 0 o n elementi e terminare con successo o con un errore.
  • Single: possono emettere un singolo item o generare un evento di errore. Si tratta della versione reactive dell’invocazione di un metodo java.
  • Maybe:  possono emettere un singolo item o nessun item, oppure possono generare un evento di errore. Si tratta della versione reactive del tipo di dato Optional in java.
  • Completable: terminano sempre generando un evento di completamento o di errore, ma soprattutto non emettono alcun dato. Si tratta della versione reattiva del Runnable in java.

Il framework inoltre fornisce diversi metodi di utilità per generare un observable, alcuni dei quali sono descritti nella tabella seguente.

Classe Metodo Descrizione
Observable
Flowable
Single
Maybe
just()

Consente di creare un observable come wrapper degli oggetti forniti in input.

Observable
Flowable
fromIterable() Accetta un java.lang.Iterable come input ed emette i valori contenuti nell’ordine che hanno all’interno struttura dati.
Observable
Flowable
fromArray() Accetta un array come input ed emette i valori contenuti nell’ordine che hanno nella struttura dati.
Observable
Flowable
Single
Maybe
Completable
fromCallable() Consente di creare un observable a partire da un java.util.concurrent.Callable.
Observable
Flowable
Single
Maybe
Completable
fromFuture() Consente di creare un observable per un oggetto di tipo java.util.concurrent.Future.
Observable
Flowable
interval() Consente di creare un observable che emette valori interi ad un intervallo specificato (frequenza).

Observer

L’interfaccia Observer di RxJava definisce tre metodi, gli stessi che troviamo nella analoga interfaccia Subscriber di reactive streams:

  • onNext() viene invocato sull’observer ogni volta che un nuovo evento è pubblicato dall’observable a cui è sottoscritto.
  • onCompleted() viene chiamato quando la sequenza di eventi associati a un observable è completa, ed indica che non dovremmo più aspettarci altre invocazioni sull’observer.
  • onError() viene invocato quando è generata un’eccezione non gestita durante il codice del framework RxJava o il nostro codice di gestione degli eventi.

Dato un observer la sottoscrizione avviene molto semplicemente invocando il metodo subscribe() sull’oggetto observable:

La particolarità del framework RxJava è che il metodo subscribe() è sovraccaricato, ovvero presenta diverse firme al fine di fornire al metodo esclusivamente le callback necessarie a gestire i tre segnali di onNext()onComplete() ed onError(), piuttosto che l’intero oggetto Observer.

Confronto con Reactive Streams

Come già anticipato RxJava recepisce la specifica reactive streams solamente dalla versione 2 del framework. In particolare è il tipo Flowable che implementa l’interfaccia org.reactivestreams.Publisher e supporta la gestione del backpressure.

In conseguenza di ciò Flowable eredita il metodo subscribe() che riceve in input un tipo org.reactivestreams.Subscriber. Inoltre il RxJava definisce l’interfaccia FlowableSubscriber che estende l’interfaccia Subscriber di reactive streams, rilassando però le regole 1.3 e 3.9 della specifica.

Per completezza segnaliamo inoltre che il framework introduce la classe astratta  FlowableProcessor che implementa l’interfaccia org.reactivestreams.Processor: