Nel precedente articolo Primi Passi con RxJava (parte 1) abbiamo solamente accennato al concetto di operatore, dicendo che, nella specifica ReactiveX giocano il ruolo dei Processor
definiti nella specifica reactive streams. Gli operatori consentono di manipolare i dati emessi da un Observable
generando, nella maggior parte dei casi, un nuovo Observable. Ciò consente di applicare gli operatori uno di seguito all’altro, in una catena, in cui ogni operatore della catena modifica l’Observable
che risulta dall’applicazione dell’operatore precedente.
RxJava dispone di una vasta collezione di operatori molto utili, che sono classificati in base allo scopo che hanno. E’ possibile ottenere qualsiasi flusso di dati complesso combinando più operatori insieme. La lista completa degli operatori è disponibile qui, e si dividono in diverse categorie, le più importanti delle quali sono:
- Operatori che generano nuovi
Observable
(Creating Observable). - Operatori che trasformano gli item emessi da un
Observable
(Transforming Observable). - Operatori che emettono selettivamente gli item ottenuti da una fonte
Observable
(Filtering Observable). - Operatori che lavorano con
Observable
di più fonti per creare un unicoObservable
(Combining Observable). - Operatori che aiutano a gestire le notifiche di errore da un
Observable
(Error Handling Operator). - Operatori che eseguono valutazioni condizionali uno o più
Observable
o sugli elementi emessi daObservable
(Conditional and Boolean Operator). - Operatori che operano sull’intera sequenza di elementi emessi da un
Observable
(Mathematical and Aggregate Operator).
Alla prima classe appartengono operatori come create
, just
, fromArray
, etc., che sono stati già introdotti nel precedente articolo Primi Passi con RxJava (parte 1).
Nel seguito dell’articolo ed in quelli che seguiranno vedremo alcuni degli operatori più interessanti, raggruppati per categoria.
Transforming Observable
Buffer
Il buffer è un operatore che raccoglie gli elementi emessi da un Observable
conservandoli in un batch che viene poi emesso in un’unica soluzione. Il metodo è disponibile con diverse firme, per semplicità riportiamo alcuni semplici esempi di utilizzo. Una prima possibilità è di indicare la dimensione del buffer, ovvero il numero di item che può ospitare prima di riemetterli:
1 |
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).buffer(3).subscribe( System.out::println ); |
java.util.List
ognuno di dimensione 3:
1 2 3 |
[1, 2, 3] [4, 5, 6] [7, 8, 9] |
Una seconda applicazione prevede di indicare il numero di item che devono essere trascurati prima di iniziare la generazione di un nuovo buffer:
1 |
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).buffer( 3, 4).subscribe( System.out::println ) |
In questo caso la dimensione del buffer è ancora 3 ma un nuovo buffer deve essere generato ogni 4 item emessi. L’output è quindi il seguente, dove gli elementi 4 ed 8 sono esclusi dal buffer perchè ha dimensione 3 e perchè il buffer successivo inizia con l’emissione del quarto item.
1 2 3 |
[1, 2, 3] [5, 6, 7] [9] |
Si noti che se buffer( n, n )
è equivalente a buffer( n )
, e che se scelgo buffer( n, m )
con n>m
allora i buffer risulteranno sovrapposti. Ad esempio:
1 |
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).buffer( 3, 2).subscribe( System.out::println ); |
1 2 3 4 5 |
[1, 2, 3] [3, 4, 5] [5, 6, 7] [7, 8, 9] [9] |
Consideriamo infine un tipo di buffer che emette i valori collezionati in una nuova lista ad ogni intervallo di tempo specificato:
1 2 3 4 5 |
Observable.interval(1, TimeUnit.SECONDS).buffer( 4, TimeUnit.SECONDS ).subscribe( e -> System.out.println( LocalDateTime.now() + " " + e.toString() ), t -> t.printStackTrace(), () -> System.out.println("DONE"), e -> System.out.println("SUBSCRIBED") ); |
In questo caso l’Observable
emette un nuovo item ad ogni secondo, mentre il buffer conserva gli item ricevuti per 4 secondi prima di riemetterli. L’output generato è il seguente, in cui ogni 4 secondi è stampata una nuova lista:
1 2 3 4 |
2019-09-10T14:25:56.180 [0, 1, 2, 3] 2019-09-10T14:26:00.159 [4, 5, 6, 7] 2019-09-10T14:26:04.160 [8, 9, 10, 11] 2019-09-10T14:26:08.158 [12, 13, 14, 15] |
Map e FlatMap
Sono entrambe operatori che modificano gli elementi emessi da un Observable
, ma mentre il primo trasforma un elemento alla volta restituendolo immediatamente, l’operatore flatMap
restituisce un Observable
per ogni elemento ricevuto. Un semplice esempio di utilizzo dell’operatore map()
è il seguente, in cui ogni intero ricevuto viene raddoppiato:
1 |
Observable.just(1, 2, 3, 4, 5).map( x -> x*2 ).subscribe( System.out::println ); |
1 2 3 4 5 |
2 4 6 8 10 |
Nel caso dell’operatore flatMap()
deve essere restituito un nuovo Observable
per ogni elemento quindi una possibile analoga codifica dello stesso problema visto sopra è la seguente:
1 |
Observable.just(1, 2, 3, 4, 5).flatMap( x -> Observable.just( x * 2 ) ).subscribe(System.out::println); |
Di per se questo esempio non è particolarmente caratteristico del comportamento dell’operatore. Consideriamo invece l’esempio seguente, in cui la println()
è stata introdotta al solo scopo di rendere l’output leggibile:
1 2 3 4 |
Observable.just(1, 2, 3, 4, 5).flatMap( x -> { System.out.println(); return Observable.just( x * 2, x * 3, x * 4 ); }).subscribe(x -> System.out.print(x + " ")); |
In questo caso ad ognuno degli interi emessi dal primo operatore just()
sono applicate tutte le funzioni restituite dal secondo operatore just()
, applicate nell’ordine. L’output prodotto è:
1 2 3 4 5 |
2 3 4 4 6 8 6 9 12 8 12 16 10 15 20 |
dove la prima riga è ottenuta moltiplicando 1 per 2, 3 e 4, la seconda moltiplicando il 2, e così via fino alla quinta riga ottenuta moltiplicando il 5.
GroupBy
Tale operatore dividere unObservable
in un insieme di Observable
che emettono ciascuno un gruppo di elementi diverso dall’Observable
originale ed organizzati per chiave. Consideriamo il seguente esempio:
1 2 3 4 5 |
Observable.just(1, 2, 3, 4, 5) .groupBy( x -> x / 2 ).subscribe( g -> { System.out.println( "Key: " + g.getKey() ); g.subscribe( x -> System.out.println( x ) ); }); |
il metod groupBy()
riceve come input la funzione che sarà utilizzata per estrarre la chiave sulla base della quale gli elementi saranno suddivisi. Nel nostro caso gli interi da 1 a 5 vengono smistati su tre Observable
sulla base del risultato della loro divisione intera per 2. Risultato che potrà essere esclusivamente uno dei valori 0, 1 o 2, che sono appunto le chiavi che distinguono i tre Observable
. L’output prodotto sarà quindi:
1 2 3 4 5 6 7 8 |
Key: 0 1 Key: 1 2 3 Key: 2 4 5 |
Scan
L’operatore scan applica una funzione di accumulazione al primo elemento emesso dall’Observable
e quindi emette il risultato della funzione come prima emissione. Ad ogni emissione successiva il risultato della precedente applicazione della funzione, il valore accumulato, è applicato insieme al nuovo item emesso alla funzione di accumulazione. Consideriamo il seguente codice di esempio:
1 |
Observable.just(1, 2, 3, 4, 5).scan( (x,y) -> x+y ).subscribe( System.out::println ); |
1 2 3 4 5 |
1 3 6 10 15 |
initialValue
dell’operatore scan()
.
1 |
Observable.just(1, 2, 3, 4, 5).scan( 5, (x,y) -> x+y ).subscribe( System.out::println ); |
Window
L’operatore Window è molto simile a Buffer, ma anziché emettere in una unica soluzione un pacchetto di elementi dall’Observable
di origine, emette di volta in volta un nuovo Osservable
, ognuno dei quali emette un sottoinsieme di elementi dall’Observable
di origine.
Come Buffer anche Window ha molte varietà, ognuna con il suo modo di suddividere gli elementi dell’Observable
originale negli Observable
risultanti, ognuno dei quali rappresenta appunto una “finestra” sugli oggetti emessi originariamente. Nella terminologia dell’operatore:
- quando si apre una finestra significa che viene emesso un nuovo
Observable
che inizierà ad emettere gli item emessi dall’Observable
sorgente; - quando invece una finestra si chiude significa che l’
Observable
emesso smette di emettere elementi dall’Observable
di origine e termina con una notifica onCompleted ai suoi osservatori.
Un primo semplice esempio di applicazione dell’operatore è il seguente, in cui la finestra è definita in termini del numero di elementi che può contenere.
1 2 3 4 5 6 |
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9) .window( 3 ) .subscribe( o -> { System.out.println( "---" ); o.subscribe( System.out::println ); }); |
Observable
ognuno dei quali conterrà tre elementi dell’Observable
originale, come mostrato dall’output seguente:
1 2 3 4 5 6 7 8 9 10 11 12 |
--- 1 2 3 --- 4 5 6 --- 7 8 9 |
1 2 3 4 5 6 |
Observable.interval(1, TimeUnit.SECONDS) .window( 5, TimeUnit.SECONDS ) .subscribe( o -> { System.out.println( "---" ); o.subscribe( x -> System.out.println( LocalDateTime.now() + " " + x ) ); }); |
l’operatore definisce una finestra di 5 secondi, al termine dei quali è chiuso l’Observable
corrente ed aperto un nuovo Observable
, come è possibile verificare dall’output prodotto:
1 2 3 4 5 6 7 8 9 10 11 12 |
--- 2019-09-24T14:42:39.607 0 2019-09-24T14:42:40.578 1 2019-09-24T14:42:41.581 2 2019-09-24T14:42:42.579 3 2019-09-24T14:42:43.579 4 --- 2019-09-24T14:42:44.583 5 2019-09-24T14:42:45.583 6 2019-09-24T14:42:46.581 7 2019-09-24T14:42:47.583 8 2019-09-24T14:42:48.582 9 |