Nell’articolo Reactive Programming abbiamo introdotto la programmazione reattiva per poi mostrare la specifica Reactive Streams, che è il risultato dello sforzo di diversi attori che operano attivamente in tale contesto, al fine di fissare le caratteristiche principali del paradigma. RxJava, implementazione java del progetto ReactiveX, è uno di tali attori anche se ha adottato la specifica solamente nella versione 2, in quanto tale progetto nasce prima del rilascio specifica Reactive Streams.
Questo è anche il motivo per cui i concetti che sono alla base di ReactiveX sono leggermente differenti. Esso infatti estende il design pattern observer (vedi Observer Design Pattern) al fine di supportare l’elaborazione di sequenze di dati/eventi ed aggiungendo operatori che possono essere composti sequenzialmente in modo dichiarativo. Il tutto consentendo allo sviluppatore di disinteressarsi di concetti quali la gestione dei thread, la sincronizzazione, la concorrenza, etc., che sono gestiti direttamente dal framework in modo trasparente.
Building Block
I concetti che sono alla base del framework RxJava sono due: Observable e Observer.
Observable | Gli observable sono le sorgenti dei dati ovvero gli analoghi dei Publisher nella specifica reactive streams. Generalmente iniziano a distribuire dati quando un subscriber si mette in ascolto. Un observable può emettere un numero qualsiasi di oggetti (anche nessuno) e può terminare correttamente o con un errore. E’ anche possibile che non terminino mai, ad esempio un observable per il clic sul pulsante del mouse può potenzialmente produrre un flusso infinito di eventi. |
Observer | Un observable può avere un numero qualsiasi di observer, che sono gli analoghi dei Subscriber nella specifica reactive streams. Se l’observable emette un nuovo elemento, il metodo onNext() viene invocato su ciascuno dei observer. Analogamente, se l’observable termina correttamente il flusso di dati, il metodo onComplete() viene invocato su ciascuno dei observer. Infine, se l’observable termina invece con un errore, il metodo onError() viene invocato su ciascuno dei observer. |
A tali concetti vanno poi aggiunti gli operatori che possono essere applicati in successione al flusso dei dati, a partire dalla sorgente (observable) per terminare con il consumatore (observer). Tali operatori giocano il ruolo dei Processor
nella specifica reactive streams, ovvero si comportano contemporaneamente come observer e come observable.
observalbe.operator1().operator2().operator3().subscribe(observer);
Molto spesso tali concetti sono descritti attraverso diagrammi denominati marble diagrams, che rappresentano sinteticamente gli observable i subscriber e le trasformazioni realizzate dagli operatori.
A tale proposito il sito https://rxmarbles.com/ offre diversi marble diagrams interattivi con cui l’utente può interagire per comprendere meglio il comportamento dei diversi operatori.
Observable
Il framework mette a disposizione diversi tipi di observable:
Flowable
: possono emettere 0 o n elementi e terminare con successo o con un errore. Supportano la backpressure consentendo di controllare la velocità con cui una sorgente genera i dati.Observable
: possono emettere 0 o n elementi e terminare con successo o con un errore.Single
: possono emettere un singolo item o generare un evento di errore. Si tratta della versione reactive dell’invocazione di un metodo java.Maybe
: possono emettere un singolo item o nessun item, oppure possono generare un evento di errore. Si tratta della versione reactive del tipo di datoOptional
in java.Completable
: terminano sempre generando un evento di completamento o di errore, ma soprattutto non emettono alcun dato. Si tratta della versione reattiva delRunnable
in java.
Il framework inoltre fornisce diversi metodi di utilità per generare un observable, alcuni dei quali sono descritti nella tabella seguente.
Classe | Metodo | Descrizione | ||
Observable Flowable Single Maybe |
just() |
Consente di creare un observable come wrapper degli oggetti forniti in input.
|
||
Observable Flowable |
fromIterable() |
Accetta un java.lang.Iterable come input ed emette i valori contenuti nell’ordine che hanno all’interno struttura dati.
|
||
Observable Flowable |
fromArray() |
Accetta un array come input ed emette i valori contenuti nell’ordine che hanno nella struttura dati.
|
||
Observable Flowable Single Maybe Completable |
fromCallable() |
Consente di creare un observable a partire da un java.util.concurrent.Callable .
|
||
Observable Flowable Single Maybe Completable |
fromFuture() |
Consente di creare un observable per un oggetto di tipo java.util.concurrent.Future .
|
||
Observable Flowable |
interval() |
Consente di creare un observable che emette valori interi ad un intervallo specificato (frequenza).
|
Observer
L’interfaccia Observer
di RxJava definisce tre metodi, gli stessi che troviamo nella analoga interfaccia Subscriber
di reactive streams:
onNext()
viene invocato sull’observer ogni volta che un nuovo evento è pubblicato dall’observable a cui è sottoscritto.onCompleted()
viene chiamato quando la sequenza di eventi associati a un observable è completa, ed indica che non dovremmo più aspettarci altre invocazioni sull’observer.onError()
viene invocato quando è generata un’eccezione non gestita durante il codice del framework RxJava o il nostro codice di gestione degli eventi.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public class SystemOutObserver<T> implements Observer<T> { public void onSubscribe(Disposable d) { System.out.println( "SUBSCRIBED" ); } public void onNext(T t) { System.out.println( t ); } public void onError(Throwable e) { System.out.println( e ); } public void onComplete() { System.out.println( "DONE" ); } } |
Dato un observer la sottoscrizione avviene molto semplicemente invocando il metodo subscribe()
sull’oggetto observable:
1 2 |
Observer<Long> observer = new SystemOutObserver<Long>(); Observable.interval(1, TimeUnit.SECONDS).subscribeWith(observer); |
La particolarità del framework RxJava è che il metodo subscribe()
è sovraccaricato, ovvero presenta diverse firme al fine di fornire al metodo esclusivamente le callback necessarie a gestire i tre segnali di onNext()
, onComplete()
ed onError()
, piuttosto che l’intero oggetto Observer
.
1 2 3 4 5 6 |
Observable.interval(1, TimeUnit.SECONDS).subscribe( e -> System.out.println( e ), t -> System.out.println( t ), () -> System.out.println("DONE"), e -> System.out.println("SUBSCRIBED") ); |
Confronto con Reactive Streams
Come già anticipato RxJava recepisce la specifica reactive streams solamente dalla versione 2 del framework. In particolare è il tipo Flowable che implementa l’interfaccia org.reactivestreams.Publisher
e supporta la gestione del backpressure.
1 2 3 |
public abstract class Flowable<T> implements Publisher<T> { .... } |
In conseguenza di ciò Flowable
eredita il metodo subscribe()
che riceve in input un tipo org.reactivestreams.Subscriber
. Inoltre il RxJava definisce l’interfaccia FlowableSubscriber
che estende l’interfaccia Subscriber
di reactive streams, rilassando però le regole 1.3 e 3.9 della specifica.
Per completezza segnaliamo inoltre che il framework introduce la classe astratta FlowableProcessor che implementa l’interfaccia org.reactivestreams.Processor
:
1 2 3 |
public abstract class FlowableProcessor<T> extends Flowable<T> implements Processor<T, T>, FlowableSubscriber<T> { .... } |