Primi Passi con RxJava (parte 4)

In questo quarto post dedicato al framework RxJava descriveremo gli operatori della famiglia Combining Observables.

Combining Observables

A questa famiglia appartengono tutti gli operatori che lavorano con più oggetti di tipo Observable per creare un singolo Observable.

Zip

L’operatore Zip restituisce un Observable che applica una funzione definita in input alla sequenza di item emessi da due (o più) Observable, ed i risultati di tale funzione divengono gli elementi emessi dall’Observable restituito. La funzione di input è applicata agli elementi dei due Observable nell’ordine rigoroso in cui sono emessi, ovvero, il primo elemento emesso sarà il risultato della combinazione del primo elemento emesso dall’Observable numero 1 e del primo elemento emesso dall’Observable numero 2, e così via per tutti gli elementi successivi.

Come risultato di tale comportamento, l’operatore emetterà un numero di elementi pari al numero di elementi emessi dall’Observable che emette il minor numero di item. Per chiarezza si consideri il seguente esempio:

La sequenza emessa come output è il risultato della concatenazione delle stringhe emesse dai due Observable di input, ovvero: A1, B2, C3. Il quarto elemento D emesso dalla prima sorgente è ignorato in quanto la seconda sorgente emette solamente 3 elementi.

Il metodo zip() che implementa l’operatore è un metodo statico. Ne esiste però anche una versione non statica zipWith(), che si comporta nello stesso modo. Ad esempio lo stesso risultato visto precedentemente poteva essere ottenuto con il seguente codice:

CombineLatest e WithLatestFrom

L’operatore CombineLatest si comporta in modo simile a Zip, ma mentre Zip emette elementi solamente quando ciascuno degli Observable sorgenti ha emesso il successivo elemento da combinare, CombineLatest emette un elemento ogni volta che uno degli Observable di origine emette un item (purché ciascuno abbia emesso almeno un item). In altre parole quando uno degli Observable di origine emette un elemento, CombineLatest combina gli elementi emessi più di recente da ciascuno degli altri osservabili di origine, utilizzando la funzione fornita dall’utente ed emettendo il valore restituito da tale funzione. Si consideri il seguente esempio:

Il primo Observable emette elementi ogni 3 secondi mentre il secondo ogni 2. La loro combinazione genera un Observable che emette il primo item a 3 secondi, il secondo a 4, il terzo e quarto a 6, quindi ad 8, etc, in modo irregolare. L’output prodotto sarà quindi il seguente:

Una variante dell’operatore è il WithLatestFrom che, a differenza di CombineLatest, emette nuovi elementi solamente quando lo fa l’Observable a cui è applicato. Ad esempio nel codice:

gli elementi sono emessi ogni 2 secondi, contestualmente a quelli emessi dal primo Observable, e concatenati col l’Observable di input:

Join

L’operatore Join combina gli elementi emessi da due Observable e seleziona gli elementi da combinare in base ad una finestre temporale di durata definite in base all’elemento emesso. Si noti che il concetto di finestre temporale è esattamente quello introdotto parlando dell’operatore Windows descritto nel post Primi Passi con RxJava (parte 2).

In termini più semplici l’operatore riceve in input due Observable (che indicheremo come Observable di destra e di sinistra) e due funzioni associate. Tali funzioni sono utilizzate per definire la durata della finestra temporale dell’elemento emesso dall’Observable associato. Ogni volta che un elemento viene emesso da una delle sue sorgenti, viene aperta una finestra per una durata definita dalla funzione.

Supponiamo ad esempio che l’Observable di sinistra inizi ad emettere elementi, ciascuno dei quali avrà una finestra temporale di validità che eventualmente può essere sovrapposta. Questo implica che tutti gli item (validi) vengono in qualche modo accantonati. Ogni volta che la sorgente di destra emette un nuovo item, questo viene combinato con tutti gli elementi validi della sorgente di sinistra e passati alla funzione di concatenazione. Lo stesso accade nel verso opposto.

Consideriamo ad esempio il seguente codice sorgente:

I due Observable emettono elementi ad intervalli rispettivamente di 1 e 3 secondi e rimangono validi rispettivamente per finestre temporali di 5 e 7 secondi. Se osseviamo l’output prodoto, riportato di seguito, noteremo che ci sono istanti di tempo in cui sono emessi dal Join più item, ad esempio quello al secondo 34 o 37:

Una variante di tale operatore è l’operatore GroupJoin che è molto simile, ad eccezione del fatto che la funzione definita per combinare gli elementi emessi dalle due Observable accoppia singoli elementi emessi dall’Observable sorgente non con un elemento del secondo Observable, ma con un Observable che emette elementi dal secondo Observable che rientrano nella stessa finestra.

Merge e MergeDelayError

È possibile combinare l’output prodotto da più Observable in modo che si comportino come un unico Observable, utilizzando l’operatore Merge. Un semplice esempio di applicazione di tale operatore è il seguente:

Di seguito è mostrato l’output prodotto, in cui è evidente che ad ogni secondo sono stampati gli item emessi dal primo Observable, mentre ogni 3 secondo viene stampato anche quello emesso dal secondo (si vedano i secondi 40 e 43).

Varianti di tale operatore ammettono come parametri, invece che degli Observable, oggetti di tipo List di Observable, Iterable  di ObservableArray di Observable o addirittura Observable diObservable.

Alla stessa famiglia appartiene l’operatore MergeDelayError che, come Merge, unisce gli Observable sorgenti in un unico Observable, ma differisce ogni errore sollevato dagli Observable al momento in cui tutti gli Observable, che non hanno emesso errori, inviano il segnale di completamento. In altri termini tale operatore consente a tutti gli Observable di terminare differendo ogni possibile errore alla chiusura di tutti gli Observable. Ad esempio il codice seguente:

stamperà gli interi da 1 a 5 emessi dal primo Observable, e solamente dopo 5 secondi restituirà l’eccezione sollevata dal secondo Observable:

StartWith

L’operatore StartWith molto banalmente emette una specifica sequenza di elementi prima di iniziare a emettere gli elementi emessi dall’Observable sorgente. Esistono tre varianti dell’operatore che ricevono rispettivamente come input un singolo item, in Iterable o un Observable. La tabella seguente mostra un esempio di utilizzo tali varianti ed il relativo output prodotto.

 
 

SwitchOnNext e SwitchOnNextDelayError

Tale operatore converte un Observable che emette più Observable in un unico Observable che emette gli elementi emessi dall’ultimo emesso di tali Observable. Ogni volta che osserva uno degli Observable emessi, l’Observable restituito da Switch annulla l’osservazione emessa in precedenza ed inizia a emettere elementi dall’ultima Observable.

In altri termini l’operatore riceve in input un Observable che emette Observable. Quando il primo di questi Observable emette item, l’operatore restituisce tali elementi, fino a quando viene emesso un secondo Observable. A questo punto chiude la sottoscrizione col primo Observable e comincia a seguire il secondo, riproponendo in output i suoi elementi.

Si consideri ad esempio il seguente codice sorgente:

L’operatore SwitchOnNext riceve in input un Observable che emette ogni 5 secondi un nuovo Observable che emette interi con una frequenza di uno al secondo. Poichè l’ultimo Observable emesso annulla la sottoscrizione al precedente, l’output generato sarà il seguente, in cui si nota che ogni 5 secondi (circa) la sequenza ricomincia daccapo:

La variante SwitchOnNextDelayError si comporta in modo analogo a quanto faceva il metodo MergeDelayError, ovvero ritarda l’emissione di qualsiasi errore da parte degli Observable al momento della chiusura di tutti gli Observable.

Lascia un commento

Il tuo indirizzo email non sarà pubblicato.

*