In questo quarto post dedicato al framework RxJava descriveremo gli operatori della famiglia Combining Observables.
Combining Observables
A questa famiglia appartengono tutti gli operatori che lavorano con più oggetti di tipo Observable
per creare un singolo Observable
.
Zip
L’operatore Zip restituisce un Observable
che applica una funzione definita in input alla sequenza di item emessi da due (o più) Observable
, ed i risultati di tale funzione divengono gli elementi emessi dall’Observable
restituito. La funzione di input è applicata agli elementi dei due Observable
nell’ordine rigoroso in cui sono emessi, ovvero, il primo elemento emesso sarà il risultato della combinazione del primo elemento emesso dall’Observable
numero 1 e del primo elemento emesso dall’Observable
numero 2, e così via per tutti gli elementi successivi.
Come risultato di tale comportamento, l’operatore emetterà un numero di elementi pari al numero di elementi emessi dall’Observable
che emette il minor numero di item. Per chiarezza si consideri il seguente esempio:
1 2 3 4 5 |
Observable.zip( Observable.just("A", "B", "C", "D"), Observable.just("1", "2", "3") , (x,y) -> x+y ) .subscribe( System.out::println ); |
Observable
di input, ovvero: A1, B2, C3
. Il quarto elemento D
emesso dalla prima sorgente è ignorato in quanto la seconda sorgente emette solamente 3 elementi.
Il metodo zip()
che implementa l’operatore è un metodo statico. Ne esiste però anche una versione non statica zipWith()
, che si comporta nello stesso modo. Ad esempio lo stesso risultato visto precedentemente poteva essere ottenuto con il seguente codice:
1 2 3 |
Observable.just("A", "B", "C", "D") .zipWith( Observable.just("1", "2", "3"), (x,y) -> x+y ) .subscribe( System.out::println ); |
CombineLatest e WithLatestFrom
L’operatore CombineLatest si comporta in modo simile a Zip, ma mentre Zip emette elementi solamente quando ciascuno degli Observable
sorgenti ha emesso il successivo elemento da combinare, CombineLatest emette un elemento ogni volta che uno degli Observable
di origine emette un item (purché ciascuno abbia emesso almeno un item). In altre parole quando uno degli Observable
di origine emette un elemento, CombineLatest combina gli elementi emessi più di recente da ciascuno degli altri osservabili di origine, utilizzando la funzione fornita dall’utente ed emettendo il valore restituito da tale funzione. Si consideri il seguente esempio:
1 2 3 4 5 |
Observable.combineLatest( Observable.interval(3, TimeUnit.SECONDS), Observable.interval(2, TimeUnit.SECONDS) , (x,y) -> x + "-" + y ) .subscribe( x -> System.out.println(LocalDateTime.now() + " " + x ) ); |
Il primo Observable
emette elementi ogni 3 secondi mentre il secondo ogni 2. La loro combinazione genera un Observable
che emette il primo item a 3 secondi, il secondo a 4, il terzo e quarto a 6, quindi ad 8, etc, in modo irregolare. L’output prodotto sarà quindi il seguente:
1 2 3 4 5 6 7 |
2019-09-24T11:12:52.812 0-0 2019-09-24T11:12:53.790 0-1 2019-09-24T11:12:55.787 0-2 2019-09-24T11:12:55.787 1-2 2019-09-24T11:12:57.789 1-3 2019-09-24T11:12:58.786 2-3 2019-09-24T11:12:59.787 2-4 |
Una variante dell’operatore è il WithLatestFrom che, a differenza di CombineLatest, emette nuovi elementi solamente quando lo fa l’Observable
a cui è applicato. Ad esempio nel codice:
1 2 3 4 |
Observable.interval(2, TimeUnit.SECONDS) .withLatestFrom( Observable.interval(3, TimeUnit.SECONDS), (x,y) -> x + "-" + y ) .subscribe( x -> System.out.println(LocalDateTime.now() + " " + x ) ); |
gli elementi sono emessi ogni 2 secondi, contestualmente a quelli emessi dal primo Observable
, e concatenati col l’Observable
di input:
1 2 3 4 |
2019-09-24T11:16:15.469 1-0 2019-09-24T11:16:17.448 2-0 2019-09-24T11:16:19.447 3-1 2019-09-24T11:16:21.448 4-2 |
Join
L’operatore Join combina gli elementi emessi da due Observable
e seleziona gli elementi da combinare in base ad una finestre temporale di durata definite in base all’elemento emesso. Si noti che il concetto di finestre temporale è esattamente quello introdotto parlando dell’operatore Windows descritto nel post Primi Passi con RxJava (parte 2).
In termini più semplici l’operatore riceve in input due Observable
(che indicheremo come Observable
di destra e di sinistra) e due funzioni associate. Tali funzioni sono utilizzate per definire la durata della finestra temporale dell’elemento emesso dall’Observable
associato. Ogni volta che un elemento viene emesso da una delle sue sorgenti, viene aperta una finestra per una durata definita dalla funzione.
Supponiamo ad esempio che l’Observable
di sinistra inizi ad emettere elementi, ciascuno dei quali avrà una finestra temporale di validità che eventualmente può essere sovrapposta. Questo implica che tutti gli item (validi) vengono in qualche modo accantonati. Ogni volta che la sorgente di destra emette un nuovo item, questo viene combinato con tutti gli elementi validi della sorgente di sinistra e passati alla funzione di concatenazione. Lo stesso accade nel verso opposto.
Consideriamo ad esempio il seguente codice sorgente:
1 2 3 4 5 6 |
Observable.interval(1, TimeUnit.SECONDS) .join( Observable.interval(3, TimeUnit.SECONDS), i -> Observable.timer(5, TimeUnit.MILLISECONDS), i -> Observable.timer(7, TimeUnit.MILLISECONDS), (l, r) -> l+ " - " + r ) .subscribe( x -> System.out.println(LocalDateTime.now() + " " + x ) ); |
I due Observable emettono elementi ad intervalli rispettivamente di 1 e 3 secondi e rimangono validi rispettivamente per finestre temporali di 5 e 7 secondi. Se osseviamo l’output prodoto, riportato di seguito, noteremo che ci sono istanti di tempo in cui sono emessi dal Join più item, ad esempio quello al secondo 34 o 37:
1 2 3 4 5 6 7 8 |
2019-09-24T15:19:48.060 2 - 0 2019-09-24T15:19:51.034 5 - 1 2019-09-24T15:19:54.034 8 - 2 2019-09-24T15:19:57.037 11 - 3 2019-09-24T15:20:00.037 14 - 4 2019-09-24T15:20:03.034 17 - 5 2019-09-24T15:20:06.032 20 - 6 2019-09-24T15:20:09.037 23 - 7 |
Una variante di tale operatore è l’operatore GroupJoin che è molto simile, ad eccezione del fatto che la funzione definita per combinare gli elementi emessi dalle due Observable
accoppia singoli elementi emessi dall’Observable
sorgente non con un elemento del secondo Observable
, ma con un Observable
che emette elementi dal secondo Observable
che rientrano nella stessa finestra.
Merge e MergeDelayError
È possibile combinare l’output prodotto da più Observable
in modo che si comportino come un unico Observable
, utilizzando l’operatore Merge. Un semplice esempio di applicazione di tale operatore è il seguente:
1 2 3 4 |
Observable.merge( Observable.interval(1,TimeUnit.SECONDS), Observable.interval(3,TimeUnit.SECONDS) ) .subscribe( x -> System.out.println(LocalDateTime.now() + " " + x ) ); |
Di seguito è mostrato l’output prodotto, in cui è evidente che ad ogni secondo sono stampati gli item emessi dal primo Observable, mentre ogni 3 secondo viene stampato anche quello emesso dal secondo (si vedano i secondi 40 e 43).
1 2 3 4 5 6 7 8 9 10 |
2019-09-24T15:39:38.589 0 2019-09-24T15:39:39.567 1 2019-09-24T15:39:40.572 2 2019-09-24T15:39:40.576 0 2019-09-24T15:39:41.570 3 2019-09-24T15:39:42.570 4 2019-09-24T15:39:43.571 5 2019-09-24T15:39:43.571 1 2019-09-24T15:39:44.569 6 2019-09-24T15:39:45.571 7 |
Varianti di tale operatore ammettono come parametri, invece che degli Observable
, oggetti di tipo List
di Observable
, Iterable
di Observable
, Array
di Observable
o addirittura Observable
diObservable
.
Alla stessa famiglia appartiene l’operatore MergeDelayError che, come Merge, unisce gli Observable
sorgenti in un unico Observable
, ma differisce ogni errore sollevato dagli Observable
al momento in cui tutti gli Observable
, che non hanno emesso errori, inviano il segnale di completamento. In altri termini tale operatore consente a tutti gli Observable
di terminare differendo ogni possibile errore alla chiusura di tutti gli Observable
. Ad esempio il codice seguente:
1 2 3 4 |
Observable.mergeDelayError( Observable.intervalRange( 1, 5, 0, 1, TimeUnit.SECONDS), Observable.error( new Exception() ) ) .subscribe( x -> System.out.println(LocalDateTime.now() + " " + x ) ); |
stamperà gli interi da 1 a 5 emessi dal primo Observable
, e solamente dopo 5 secondi restituirà l’eccezione sollevata dal secondo Observable
:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
2019-09-24T16:04:18.179 1 2019-09-24T16:04:19.162 2 2019-09-24T16:04:20.160 3 2019-09-24T16:04:21.159 4 2019-09-24T16:04:22.164 5 io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.lang.Exception ... Caused by: java.lang.Exception at it.javaboss.combine.MergeOperator.main(MergeOperator.java:19) Exception in thread "RxComputationThreadPool-1" io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.lang.Exception ... Caused by: java.lang.Exception at it.javaboss.combine.MergeOperator.main(MergeOperator.java:19) |
StartWith
L’operatore StartWith molto banalmente emette una specifica sequenza di elementi prima di iniziare a emettere gli elementi emessi dall’Observable
sorgente. Esistono tre varianti dell’operatore che ricevono rispettivamente come input un singolo item, in Iterable
o un Observable
. La tabella seguente mostra un esempio di utilizzo tali varianti ed il relativo output prodotto.
|
|
||||
|
|
||||
|
|
SwitchOnNext e SwitchOnNextDelayError
Tale operatore converte un Observable
che emette più Observable
in un unico Observable
che emette gli elementi emessi dall’ultimo emesso di tali Observable
. Ogni volta che osserva uno degli Observable
emessi, l’Observable
restituito da Switch annulla l’osservazione emessa in precedenza ed inizia a emettere elementi dall’ultima Observable
.
In altri termini l’operatore riceve in input un Observable
che emette Observable
. Quando il primo di questi Observable
emette item, l’operatore restituisce tali elementi, fino a quando viene emesso un secondo Observable
. A questo punto chiude la sottoscrizione col primo Observable
e comincia a seguire il secondo, riproponendo in output i suoi elementi.
Si consideri ad esempio il seguente codice sorgente:
1 2 3 4 |
Observable.switchOnNext( Observable.interval(5, TimeUnit.SECONDS) .map( t -> Observable.interval(1, TimeUnit.SECONDS) ) ) .subscribe( System.out::println ); |
L’operatore SwitchOnNext riceve in input un Observable
che emette ogni 5 secondi un nuovo Observable
che emette interi con una frequenza di uno al secondo. Poichè l’ultimo Observable
emesso annulla la sottoscrizione al precedente, l’output generato sarà il seguente, in cui si nota che ogni 5 secondi (circa) la sequenza ricomincia daccapo:
1 2 3 4 5 6 7 8 9 10 11 12 |
0 1 2 3 0 1 2 3 4 0 1 2 |
La variante SwitchOnNextDelayError si comporta in modo analogo a quanto faceva il metodo MergeDelayError, ovvero ritarda l’emissione di qualsiasi errore da parte degli Observable
al momento della chiusura di tutti gli Observable
.