In questo quarto post dedicato al framework RxJava descriveremo gli operatori della famiglia degli Utility Operators, ovvero di quegli operatori di utilità generale che consentono di operare sugli Observable
.
Observable Utility Operators
Delay
L’operatore Delay modifica l’Observable
a cui è applicato ritardando l’emissione degli elementi emessi, introducendo una pausa per un periodo di tempo specificata dall’utente. Ciò ha l’effetto di spostare l’intera sequenza di elementi emessi dall’Observable
in avanti nel tempo.
La variante più semplice dell’operatore richiede come parametro esclusivamente la durata della pausa, definita in termini di quantità di tempo ed unità di misura applicata:
1 2 3 |
Observable.interval(1, TimeUnit.SECONDS) .delay( 5, TimeUnit.SECONDS ) .subscribe( System.out::println ); |
In questo caso ogni volta che l’Observable
sorgente emette un item, il Delay avvia un timer e quando il timer raggiunge la durata specificata, l’Observable
restituito dal Delay emette lo stesso item. L’effetto del delay si manifesta anche sull’evento onCompleted
, che risulta quindi ritardato. Diversamente il ritardo applicato non ha effetto su una notifica onError
, che viene immediatamente inoltrata causando la perdita di tutti gli eventuali item in sospeso. Esiste comunque una variante che riceve come ulteriore parametro un boolean delayError
che se impostato a TRUE
applica il ritardo anche ad un eventuale errore.
Una variante dell’operatore consente di definire un ritardo differente per ogni item emesso. Tale variante richiede in input una funzione che genera un nuovo Observable
che si comporta da timer associato all’item emesso. Non appena tale Observable
emette un item, l’item associato è emesso dal Delay. Si consideri il seguente esempio:
1 2 3 4 5 |
Observable.interval(1, TimeUnit.SECONDS) .delay( x -> { System.out.println( "Delaying " + x + " for " + x + " sec." ); return Observable.timer( x, TimeUnit.SECONDS );} ) .subscribe( x -> System.out.println( "Emitting " + x ) ); |
La funzione di input all’operatore genera un Observable
che emette il suo unico item 0
dopo un intervallo di tempo in secondi pari all’item emesso dall’Observable
sorgente. In conseguenza di ciò l’item 3, ad esempio, subisce un ritardo di 3 secondi, come mostrato dall’output generato.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
Delaying 0 for 0 sec. Emitting 0 Delaying 1 for 1 sec. Emitting 1 Delaying 2 for 2 sec. Delaying 3 for 3 sec. Delaying 4 for 4 sec. Emitting 2 Delaying 5 for 5 sec. Delaying 6 for 6 sec. Emitting 3 Delaying 7 for 7 sec. Delaying 8 for 8 sec. Emitting 4 |
Do
Tale operatore consente di registrare un’azione da eseguire a seguito di diversi eventi che possono caratterizzare il ciclo di vita di un Observable
. Esistono diverse implementazioni dell’operatore in funzione dell’evento che si intende osservare:
doAfterNext |
Invoca un Consumer di input dopo aver emesso ogni item. |
doAfterTerminate |
Incoca l’Action di input quando l’Observable sorgente emette un evento onComplete o onError . |
doFinally |
Incoca l’Action di input quando l’Observable sorgente emette un evento onComplete o onError , oppure quando un osservatore chiude la sottoscrizione. Si noti che l’action deve essere thread-safe. |
doOnDispose |
Incoca l’Action di input quando osservatore chiude la sottoscrizione. Si noti che l’action deve essere thread-safe. |
doOnComplete |
Incoca l’Action di input quando l’Observable sorgente emette un evento onComplete . |
doOnEach |
Esistono due variante dell’operatore. Una invoca un Consumer ad ogni item emesso, mentre l’altra notifica l’evento ad un Observer . |
doOnError |
Incoca l’Action di input quando l’Observable sorgente emette un evento onError . |
doOnLifecycle |
Invoca un Consumer di input ad ogni richiesta di sottoscrizione e l’Action di input ad ogni annullamento della sottoscrizione. |
doOnNext |
Incoca l’Action di input quando l’Observable sorgente emette un evento onNext . |
doOnSubscribe |
Invoca il Consumer di input ad ogni richiesta di sottoscrizione. |
doOnTerminate |
Incoca l’Action di input quando l’Observable sorgente emette un evento onComplete . |
Il codice seguente utilizza diversi dei metodi descritti, generando l’output riportato di fianco:
|
|
Materialize/Dematerialize
L’operatore Materialize traduce la serie di eventi emessi dall’Observable
sorgente onNext
, onCompleted
e onError
, in una serie di oggetti di tipo io.reactivex.Notification
che sono emessi dall’Observable
risultante. L’operatore Dematerialize esegue l’operazione inversa. L’operatore Materialize è utile sostanzialmente quando si preferisce gestire tutti gli eventi emessi dallo stream in modo uniforme, senza dover implementare un handler diverso per ciascuno di essi. Si consideri ad esempio il seguente codice sorgente:
1 2 3 4 5 6 7 8 9 10 11 12 |
Observable.range(1, 3) .materialize() .subscribe( x -> { System.out.print( x.getClass() + " " ); if ( x.isOnNext() ) { System.out.println( x.getValue() ); } else if ( x.isOnComplete() ) { System.out.println( "Completed" ); } else if ( x.isOnError() ) { System.out.println( x.getError() ); } }); |
1 2 3 4 |
class io.reactivex.Notification 1 class io.reactivex.Notification 2 class io.reactivex.Notification 3 class io.reactivex.Notification Completed |
SubscribeOn e ObserveOn
Questi operatori sono utili quando si intende utilizzare RxJava in un progetto multithreading. Per default un Observable
, e la catena di operatori applicati, svolgeranno il loro lavoro e notificano ai loro osservatori, nello stesso thread su cui viene invocato il metodo subscribe()
. L’operatore SubscribeOn modifica tale comportamento specificando un altro Scheduler
su cui l’Osservable
andrà ad operare. Analogamente l’operatore ObserveOn specifica un diverso Scheduler
che l’Observable
utilizzerà per inviare notifiche ai propri osservatori.
Consideriamo ad esempio il codice seguente:
1 2 3 |
Observable.range(1, 3) .doOnNext( c -> System.out.println("Item processed on thread name " + Thread.currentThread().getName() + ": " + c ) ) .subscribe( c -> System.out.println("Item received on thread name " + Thread.currentThread().getName() + ": " + c ) ); |
L’output prodotto, mostrato di seguito, dimostra che lo stream di eventi viene prodotto e processato all’interno del thread del metodo main()
.
1 2 3 4 5 6 |
Item processed on thread name main: 1 Item received on thread name main: 1 Item processed on thread name main: 2 Item received on thread name main: 2 Item processed on thread name main: 3 Item received on thread name main: 3 |
Spesso però può avere senso delegare le operazioni ad un thread differente. Si pensi ad esempio ad operazioni di I/O eseguite sugli item emessi, che comporterebbero il blocco del thread principale ad ogni accesso I/O. A tale scopo l’operatore SubscribeOn consente di indicare, all’Observable
sorgente, quale thread si occuperà di gestire l’emissione ed il processamento degli item. Modifichiamo quindi il codice nel seguente modo:
1 2 3 4 |
Observable.range(1, 3) .doOnNext( c -> System.out.println("Item processed on thread name " + Thread.currentThread().getName() + ": " + c ) ) .subscribeOn( Schedulers.newThread() ) .subscribe( c -> System.out.println("Item received on thread name " + Thread.currentThread().getName() + ": " + c ) ); |
La classe Scheduler
contiene diversi factory method per oggetti di tipo Thread
. Nel nostro esempio viene generato un nuovo thread che si occuperà esclusivamente di gestire le emissioni ed il loro processamento. Conseguentemente l’output prodotto sarà:
1 2 3 4 5 6 |
Item processed on thread name RxNewThreadScheduler-1: 1 Item received on thread name RxNewThreadScheduler-1: 1 Item processed on thread name RxNewThreadScheduler-1: 2 Item received on thread name RxNewThreadScheduler-1: 2 Item processed on thread name RxNewThreadScheduler-1: 3 Item received on thread name RxNewThreadScheduler-1: 3 |
RxNewThreadScheduler-1
differente da quello associato al main
.
L’operatore ObserveOn consente, invece, di svincolare le operazioni di emissione e processamento degli item emessi, in modo che ciascuna di tali operazioni sia gestita da thread differente. Modifichiamo il nostro codice di esempio nel seguente modo:
1 2 3 4 5 |
Observable.range(1, 3) .doOnNext( c -> System.out.println("Item processed on thread name " + Thread.currentThread().getName() + ": " + c ) ) .subscribeOn( Schedulers.newThread() ) .observeOn( Schedulers.newThread() ) .subscribe( c -> System.out.println("Item received on thread name " + Thread.currentThread().getName() + ": " + c ) ); |
L’output prodotto mostra che i thread sono diventati due: il thread RxNewThreadScheduler-1
che gestisce le emissioni, ed il thread RxNewThreadScheduler-2
che invece gestisce il processamento lato Subscriber
.
1 2 3 4 5 6 |
Item processed on thread name RxNewThreadScheduler-1: 1 Item processed on thread name RxNewThreadScheduler-1: 2 Item processed on thread name RxNewThreadScheduler-1: 3 Item received on thread name RxNewThreadScheduler-2: 1 Item received on thread name RxNewThreadScheduler-2: 2 Item received on thread name RxNewThreadScheduler-2: 3 |
Ulteriori approfondimenti sull’utilizzo di tali operatori potete trovarlo nell’articolo: Understanding RxJava subscribeOn and observeOn.
TimeInterval
L’operatore TimeInterval intercetta gli elementi emessi dall’Observable
sorgente ed emette, al loro posto, oggetti di tipo io.reactivex.schedulers.Timed
, che indicano la quantità di tempo che è trascorso tra le successive emissioni. Consideriamo il seguente codice:
1 2 3 |
Observable.interval(1, TimeUnit.SECONDS) .timeInterval() .subscribe( t -> System.out.println( t.time() + " " + t.unit().name() + " -> " + t.value() ) ); |
1 2 3 4 5 |
1001 MILLISECONDS -> 0 1001 MILLISECONDS -> 1 999 MILLISECONDS -> 2 1001 MILLISECONDS -> 3 999 MILLISECONDS -> 4 |
Una variante dell’operatore consente di indicare l’unità di misura con cui si intende campionare gli intervalli (e.g. secondi, minuti, etc.). Si noti comunque che l’oggetto Timed
attraverso il metodo time()
consente di eseguire la conversione ad una qualsiasi altra unità.
Timeout
L’operatore Timeout consente di interrompere un Observable
con una segnalazione onError
se tale Observable
non emette alcun elemento durante un periodo di tempo specificato. In altri termini l’operatore riemette gli item ricevuti, ma per ognuno di essi avvia un timer, e nel caso in cui tale timer scada senza che la sorgente abbia emesso il successivo item, l’operatore solleva una eccezione di tipo java.util.concurrent.TimeoutException
.
1 2 3 |
Observable.interval(2, TimeUnit.SECONDS) .timeout( 1, TimeUnit.SECONDS ) .subscribe( t -> System.out.println( t ) ); |
Una seconda variante dell’operatore Timeout differisce dalla prima in quanto invece di emettere una notifica di errore in caso di una condizione di timeout, passa invece immediatamente a un osservabile di backup specificato.
1 2 3 |
Observable.interval(2, TimeUnit.SECONDS) .timeout( 1, TimeUnit.SECONDS, Observable.just( 100L ) ) .subscribe( t -> System.out.println( t ) ); |
Timestamp
L’operatore Timestamp allega un timestamp a ciascun elemento emesso dalla sorgente Observable
prima di riemetterlo nella sequenza. Il timestamp indica a che ora è stato emesso l’elemento. Si noti che l’oggetto emesso è di tipo io.reactivex.schedulers.Timed
come per l’operatore TimeInterval, ma ovviamente rappresenta un concetto differente. Ad esempio il seguente codice:
1 2 3 |
Observable.interval(1, TimeUnit.SECONDS) .timestamp() .subscribe( t -> System.out.println( t.time() + " " + t.unit().name() + " -> " + t.value() ) ); |
1 2 3 |
1569939088595 MILLISECONDS -> 0 1569939089595 MILLISECONDS -> 1 1569939090593 MILLISECONDS -> 2 |
Using
L’operatore Using consente di creare un Observable
a partire da una risorsa (usa e getta) che esiste solamente per la durata della vita dell’Observable
e che viene eliminata al termine dello stesso. Tale operatore riceve come input tre parametri:
resourceSupplier
: una factory function utilizzata per istanziare la risorsa;sourceSupplier
: una factory function utilizzata per generare l’Observable
a partire dalla risorsa data;disposer
: una funzione utilizzata del distruggere la risorsa.
Quando un osservatore si iscrive all’Observable
restituito dall’operatore Using, l’operatore utilizzerà la funzione sourceSupplier
per creare l’Observable
che l’osservatore osserverà, mentre allo stesso tempo utilizzerà la funzione resourceSupplier
per creare qualsiasi risorsa di cui si ha bisogno. Quando l’osservatore annulla l’iscrizione all’Observable
o quando l’Observable
termina (normalmente o con un errore), l’operatore Using chiamerà la terza funzione, disposer
, per distruggere la risorsa che ha creato.
Consideriamo il seguente codice sorgente:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
Callable<String> resourceSupplier = new Callable<String>() { String resource = "10,13,54,23,15"; public String call() throws Exception { System.out.println( "Returning resource: " + resource ); return resource; } }; Function<String, Observable<String>> sourceSupplier = new Function<String, Observable<String>>() { public Observable<String> apply(String resource) throws Exception { return Observable.fromArray( resource.split(",") ); } }; Consumer<String> disposer = new Consumer<String>() { public void accept(String resource) throws Exception { System.out.println( "Disposing resource: " + resource ); resource = null; } }; Observable .using(resourceSupplier, sourceSupplier, disposer) .subscribe( System.out::println ); |
La funzione resourceSupplier
genera la stringa "10,13,54,23,15"
, che è poi passata alla funzione sourceSupplier
che esegue uno split e restituisce un Observable
che emette la lista "10"
, "13"
, "54"
, "23"
, "15"
. La funzione disposer
è infine invocata al termine della sequenza emessa, a seguito dell’evento onComplete
. L’output prodotto sarà quindi:
1 2 3 4 5 6 7 |
Returning resource: 10,13,54,23,15 10 13 54 23 15 Disposing resource: 10,13,54,23,15 |