Nel precedente post Primi Passi con RxJava (parte 2) abbiamo introdotto gli operatori disponibili nel framework RxJava e trattato i Transforming Observable, una tipologia di operatori che trasformato gli item emessi da un Observable
. Proseguiamo la serie di articoli dedicati al framework descrivendo gli operatori della famiglia Filtering Observable.
Filtering Observable
A questa categoria appartengono tutti gli operatori che filtrano selettivamente tutti gli item emessi da un Observable
.
Filter
Forse il più semplice degli operatori, il Filter emette esclusivamente quegli elementi che superano una determinata condizione di test. Ad esempio nel codice seguente:
1 2 3 |
Observable.just(1, 2, 3, 4, 5, 6) .filter( x -> x > 4 ) .subscribe( System.out::println ); |
5
e 6
della sequenza originale.
Debounce
Il Debounce è un tipo di operatore che emette un oggetto da un Observable
solo se un determinato lasso di tempo è passato senza che esso abbia emesso un altro oggetto. Questo operatore è particolarmente utile quando l’Observable
emette item molto rapidamente ma si è interessati a riceverli in modo cadenzato. Consideriamo il seguente esempio:
1 2 3 |
Observable.interval(3, TimeUnit.SECONDS) .debounce(2500, TimeUnit.MILLISECONDS) .subscribe( x -> System.out.println(LocalDateTime.now() + " " + x.toString() ) ); |
Il metodo interval()
emette gli interi ad intervalli di 3 secondi mentre l’operatore debounce()
li accetta solamente se sono emessi con intervallo di 2.5 secondi. In questo banale caso tutti gli item sono ammessi, ma se aumentassimo l’intervallo di debounce a 3 secondi o più, gli item verrebbero tutti ignorati.
Distinct e DistinctUntilChanged
L’operatore Distinct filtra un Observable
consentendo il transito dei soli item che non sono già stati emessi. Un esempio del suo utilizzo è il seguente:
1 2 3 |
Observable.just(1, 2, 2, 3, 4, 2, 3, 4, 5, 1) .distinct() .subscribe( System.out::println ); |
Sebbene il metodo just() emetta la sequenza [1, 2, 2, 3, 4, 2, 3, 4, 5, 1]
, l’output generato è [1, 2, 3, 4, 5]
dove gli elementi duplicati sono stati eliminati.
Esiste anche una variante dell’operatore che accetta una funzione come parametro, detta key selector function. Questa funzione opera sugli oggetti emessi dall’Osservable
per generare una chiave. Sono le chiavi, quindi, e non gli item stesse, ad essere confrontate per determinare se due elementi sono distinti o meno. Modifichiamo il precedente esempio introducendo come key selector la funzione x % 2
che genera le chiavi 0 ed 1:
1 2 3 |
Observable.just(1, 2, 2, 3, 4, 2, 3, 4, 5, 1) .distinct( x -> x % 2 ) .subscribe( System.out::println ); |
l’output generato sarà la sequenza [1, 2]
, che corrispondono ai primi due elementi nella sequenza di input che hanno resto pari ad 0 ed 1 quando divisi per 2.
Un operatore simile al Distinct è il DistinctUntilChange il quale confronta esclusivamente l’item emesso con il precedente al fine di determinare se sono distinti o meno. Ad esempio il seguente codice:
1 2 3 |
Observable.just(1, 2, 2, 3, 4, 4, 4, 4, 5, 1) .distinctUntilChanged() .subscribe( System.out::println ); |
genera la sequenza di interi [1, 2, 3, 4, 5, 1]
. Anche per tale operatore il framework prevede la variante con key selector function che lavora esattamente come visto per Distinct. Diversamente da quest’ultimo però, DistinctUntilChange ammette anche come parametro un comparator, di tipo io.reactivex.functions.BiPredicate
, utilizzato per confrontare gli elementi. Consideriamo ad esempio il codice seguente:
1 2 3 4 5 6 |
Observable.just(1, 2, 2, 3, 4, 4, 4, 4, 5, 1) .distinctUntilChanged( (x, y) -> { System.out.println( x + " < " + y + " ? " + (x.compareTo(y) < 0) ); return x.compareTo(y) < 0; } ) .subscribe( e -> System.out.println( "Emiting: " + e ) ); |
La funzione di confronto restituisce TRUE
quando l’elemento precedente è minore dell’elemento corrente e FALSE
in caso contrario. Considerando che il risultato TRUE
implica che gli elementi sono considerati uguali, comportando l’esclusione dell’elemento corrente, l’output prodotto sarà il seguente:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
Emiting: 1 1 < 2 ? true 2 < 2 ? false Emiting: 2 2 < 3 ? true 3 < 4 ? true 4 < 4 ? false Emiting: 4 4 < 4 ? false Emiting: 4 4 < 4 ? false Emiting: 4 4 < 5 ? true 5 < 1 ? false Emiting: 1 |
ElementAt
Molto banalmente l’operatore ElementAt emette esclusivamente l’item n-esimo ricevuto dall’Observable
. Considerando che la sequenza di input è indicizzata caratterizzando il primo elemento con l’indice 0
(zero-based indexing), il codice seguente:
1 2 3 |
Observable.just(1, 2, 3, 4, 5, 6) .elementAt( 3 ) .subscribe( System.out::println ); |
restituirà esclusivamente l’elemento 4
. Di fatto l’oggetto emesso dall’operatore elementAt()
è un Observable
di tipo Maybe
, infatti nel caso in cui l’indice sia superiore al numero di elementi emessi, nessun item viene restituito in output. Si provi ad esempio a sostituire nel codice precedente il 3 con 6 od un valore superiore. In alternativa è possibile utilizzare la variante elementAtOrError()
che nel caso in cui l’elemento non esista solleva l’eccezione java.util.NoSuchElementException
.
Infine, una variante dell’operatore elementAt()
ammette un secondo parametro che rappresenta un valore di default emesso nel caso l’elemento richiesto non esista. In questa accezione l’Observable
generato sarà di tipo Single
.
First e Last
Se si è interessati esclusivamente a catturare il primo elemento emesso da un Osservable
è possibile utilizzare l’operatore First. Il framework mette a disposizione diverse varianti dell’operatore:
first(T defaultItem) |
Emette il primo valore della sequenza di input o il valore di default se tale valore non esiste.
|
||
firstElement() |
Emette il primo valore della sequenza di input o nessun valore. Di fatto corrisponde ad un elementAt(0) , quindi restituisce un oggetto di tipo Maybe .
|
||
firstOrError() |
Restituisce il primo elemento della sequenza o solleva una eccezione di tipo java.util.NoSuchElementException se non esiste.
|
In modo del tutto analogo, l’operatore Last restituisce l’ultimo elemento della sequenza di input. Anche per tale operatore esistono tre varianti che sono duali a quelle viste sopra per l’operatore First, ovvero: last(T defaultItem)
, lastElement()
e lastOrError()
.
IgnoreElements
L’operatore IgnoreElements sopprime tutti gli elementi emessi dall’Osservable
di origine, ma consente alle sue notifiche di terminazione, onError
o onCompleted
, di transitare inalterata. Ciò implica che il segnale onNext
del subscriber non verrà mai invocato.
1 2 3 |
Observable.just(1, 2, 3, 4, 5, 6) .ignoreElements() .subscribe(System.out::println); |
Sample, ThrottleFirst e ThrottleLast
L’operatore Sample esamina periodicamente un Observable ed emette qualsiasi elemento emesso più di recente a partire dal campionamento precedente. Consideriamo ad esempio il codice seguente:
1 2 3 |
Observable.interval( 1, TimeUnit.SECONDS) .sample( 2, TimeUnit.SECONDS ) .subscribe(System.out::println); |
L’Observable
emette gli interi al ritmo di 1 al secondo ma l’operatore sample()
campiona con una frequenza di 2 secondi. Questo implica che solamente un elemento su due emessi vengono intercettati, generando in output la sequenza 0, 2, 4, 6, ...
, etc. E’ possibile anche indicare, attraverso un terzo parametro emitLast
, di tipo boolean
, se deve essere emesso l’ultimo item ricevuto quando lo stream termina, ignorando completamente l’intervallo di campionamento.
Un operatore simile è ThrottleFirst che, invece di emettere l’ultimo elemento ricevuto nella finestra dell’intervallo specificato, restituisce il primo emesso in tale intervallo. A fronte di ciò, il seguente codice:
1 2 3 |
Observable.interval( 1, TimeUnit.SECONDS) .throttleFirst( 2500, TimeUnit.MILLISECONDS ) .subscribe(System.out::println); |
0, 3, 6, 9, ...
, etc.
In modo assolutamente analogo funziona l’operatore ThrottleLast, il quale però restituisce l’ultimo elemento ricevuto nella finestra dell’intervallo considerato. Tale comportamente è equivalente al Sample, infatti la sua implementazione nel framework corrisponde all’invocazione di tale operatore.
1 2 3 |
public final Observable<T> throttleLast(long intervalDuration, TimeUnit unit) { return sample(intervalDuration, unit); } |
Skip, SkipLast, SkipWhile e SkipUntil
L’operatore Skip consente di ignorare i primi n
elementi emessi da un Osservable
e trasmettere esclusivamente quelli che seguono. Ad esempio il codice:
1 2 3 |
Observable.just( 0, 1, 2, 3, 4, 5, 6, 7 ) .skip(2) .subscribe(System.out::println); |
0
ed 1
e stampa in output la sequenza 2, 3, 4, 5, 6, 7
.
Alla stessa famiglia appartiene l’operatore SkipLast che invece ritrasmette tutti gli elementi ad eccezione degli ultimi n
, per cui il seguente codice:
1 2 3 |
Observable.just( 0, 1, 2, 3, 4, 5, 6, 7 ) .skipLast(2) .subscribe(System.out::println); |
0, 1, 2, 3, 4, 5
.
Tali operatori possono accettare come parametro anche un intervallo di tempo per indicare il periodo di skip, invece del numero di elementi da ignorare. Ad esempio nel codice seguente:
1 2 3 |
Observable.interval( 1, TimeUnit.SECONDS) .skip( 3, TimeUnit.SECONDS ) .subscribe(System.out::println); |
gli item ricevuti entro i tre secondi vengono completamente ignorati, generando come output la sequenza 2, 3, 4, 5, 6, ...
, etc.
Di rilevante interesse è invece l’operatore SkipWhile, che ignora tutti gli elementi emessi dall’Osservable
fino a quando una determinata condizione risulta essere vera. Non appena la condizione è falsa tutti gli elementi successivi transitano senza problemi, ignorando quindi le successive valutazioni della condizione. Una semplice applicazione dell’operatore è la seguente, in cui gli interi emessi, con un intervallo di un secondo, vengono ignorati sino a quando l’Observeble
emette il valore 4, producendo l’output 4, 5, 6, ...
, etc.
1 2 3 |
Observable.interval( 1, TimeUnit.SECONDS) .skipWhile( x -> x < 4 ) .subscribe(System.out::println); |
Una ulteriore variante è l’operatore SkipUntil che scarta gli item emessi da un Observable
fino a quando un secondo Observable
emette un elemento. Da tale momento in poi l’operatore comincerà ad emettere tutti gli elementi ricevuti dall’Observable
sorgente. Si consideri ad esempio il codice seguente:
1 2 3 |
Observable.interval( 1, TimeUnit.SECONDS) .skipUntil( Observable.just(1).delay( 3, TimeUnit.SECONDS ) ) .subscribe(System.out::println); |
in questo caso l’Observable
originale emette interi ad intervalli di un secondo (partendo da 0) ma l’operatore skipUntil()
ignora tutti gli elementi per 3 secondi. Questo perché l’Observable
di input all’operatore è costruito in modo da emettere il solo valore 1 dopo 3 secondi dall’avvio dell’applicazione.
Take, TakeLast, TakeWhile e TakeUntil
Completamente duali agli operatori della famiglia Skip sono quelli della famiglia Take, descritti in questo paragrafo.
Attraverso l’utilizzo dell’operatore Take è possibile emettere solo i primi n
elementi emessi da un Osservable
e quindi terminare ignorando i restanti. Quindi volendo mantenere esclusivamente i primi due elementi emessi da una sorgente ignorando i rimanenti è possibile utilizzare il seguente codice:
1 2 3 |
Observable.just( 0, 1, 2, 3, 4, 5, 6, 7 ) .take(2) .subscribe(System.out::println); |
che di fatto produce in output la sequenza 0, 1
. Se invece si vuole invertire il comportamento mantenendo gli ultimi n
elementi della sequenza sorgente, è possibile utilizzare l’operatore TakeLast nel modo seguente:
1 2 3 |
Observable.just( 0, 1, 2, 3, 4, 5, 6, 7 ) .takeLast(2) .subscribe(System.out::println); |
6
e 7
, ignorando tutti i precedenti.
Analogamente a Skip e SkipLast anche Take e TakeLast sono operatori che possono ricevere come parametro un intervallo temporale che indica una finestra, a partire dall’inizio o dalla fine della sequenza, entro la quale accettare i valori emessi. Ad esempio nel codice seguente:
1 2 3 |
Observable.interval( 1, TimeUnit.SECONDS) .take( 3, TimeUnit.SECONDS ) .subscribe(System.out::println); |
0
, 1
e 2
, che sono quelli emessi dalla sorgente entro i primi 3 secondi.
L’operatore TakeWhile, accetta tutti gli elementi emessi dall’Osservable
sorgente fino a quando una determinata condizione risulta essere vera, ignorando tutti gli elementi emessi successivamente alla confutazione della condizione. Ad esempio il codice:
1 2 3 |
Observable.interval( 1, TimeUnit.SECONDS) .takeWhile( x -> x < 4 ) .subscribe(System.out::println); |
riproduce in output la sequenza 0, 1, 2, 3
, in quanto all’emissione dell’intero 4
la condizione diviene falsa.
Terminiamo descrivendo l’operatore TakeUntil che accetta tutti gli elementi ricevuti da un Observable
sorgente fino a quando un secondo Observable
inizia ad emettere elementi. Da tale momento in poi tutti i successivi elementi ricevuti sono ignorati. Nel codice seguente ad esempio, gli elementi 0
, 1
e 2
sono ritrasmessi, ma i successivi sono ignorati in quanto dopo 3 secondi l’Observable di input all’operatore emette il suo primo elemento.
1 2 3 |
Observable.interval( 1, TimeUnit.SECONDS) .takeUntil( Observable.just(1).delay( 3, TimeUnit.SECONDS ) ) .subscribe(System.out::println); |