Primi Passi con RxJava (parte 2)

Nel precedente articolo Primi Passi con RxJava (parte 1) abbiamo solamente accennato al concetto di operatore, dicendo che, nella specifica ReactiveX giocano il ruolo dei Processor definiti nella specifica reactive streams. Gli operatori consentono di manipolare i dati emessi da un Observable generando, nella maggior parte dei casi, un nuovo Observable. Ciò consente di applicare gli operatori uno di seguito all’altro, in una catena, in cui ogni operatore della catena modifica l’Observable che risulta dall’applicazione dell’operatore precedente.

RxJava dispone di una vasta collezione di operatori molto utili, che sono classificati in base allo scopo che hanno. E’ possibile ottenere qualsiasi flusso di dati complesso combinando più operatori insieme. La lista completa degli operatori è disponibile qui, e si dividono in diverse categorie, le più importanti delle quali sono:

  • Operatori che generano nuovi Observable (Creating Observable).
  • Operatori che trasformano gli item emessi da un Observable (Transforming Observable).
  • Operatori che emettono selettivamente gli item ottenuti da una fonte Observable (Filtering Observable).
  • Operatori che lavorano con Observable di più fonti per creare un unico Observable (Combining Observable).
  • Operatori che aiutano a gestire le notifiche di errore da un Observable (Error Handling Operator).
  • Operatori che eseguono valutazioni  condizionali uno o più Observable o sugli elementi emessi da Observable (Conditional and Boolean Operator).
  • Operatori che operano sull’intera sequenza di elementi emessi da un Observable (Mathematical and Aggregate Operator).

Alla prima classe appartengono operatori come create, just, fromArray, etc., che sono stati già introdotti nel precedente articolo Primi Passi con RxJava (parte 1).

Nel seguito dell’articolo ed in quelli che seguiranno vedremo alcuni degli operatori più interessanti, raggruppati per categoria.

Transforming Observable

Buffer

Il buffer è un operatore che raccoglie gli elementi emessi da un Observable conservandoli in un batch che viene poi emesso in un’unica soluzione. Il metodo è disponibile con diverse firme, per semplicità riportiamo alcuni semplici esempi di utilizzo. Una prima possibilità è di indicare la dimensione del buffer, ovvero il numero di item che può ospitare prima di riemetterli:

L’output generato è composto da tre oggetti di tipo java.util.List ognuno di dimensione 3:

Una seconda applicazione prevede di indicare il numero di item che devono essere trascurati prima di iniziare la generazione di un nuovo buffer:

In questo caso la dimensione del buffer è ancora 3 ma un nuovo buffer deve essere generato ogni 4 item emessi. L’output è quindi il seguente, dove gli elementi 4 ed 8 sono esclusi dal buffer perchè ha dimensione 3 e perchè il buffer successivo inizia con l’emissione del quarto item.

Si noti che se buffer( n, n ) è equivalente a buffer( n ), e che se scelgo buffer( n, m ) con n>m allora i buffer risulteranno sovrapposti. Ad esempio:

genera:

Consideriamo infine un tipo di buffer che emette i valori collezionati in una nuova lista ad ogni intervallo di tempo specificato:

In questo caso l’Observable emette un nuovo item ad ogni secondo, mentre il buffer conserva gli item ricevuti per 4 secondi prima di riemetterli. L’output generato è il seguente, in cui ogni 4 secondi è stampata una nuova lista:

Map e FlatMap

Sono entrambe operatori che modificano gli elementi emessi da un Observable, ma mentre il primo trasforma un elemento alla volta restituendolo immediatamente, l’operatore flatMap restituisce un Observable per ogni elemento ricevuto. Un semplice esempio di utilizzo dell’operatore map() è il seguente, in cui ogni intero ricevuto viene raddoppiato:

banalmente l’output prodotto è:

Nel caso dell’operatore flatMap() deve essere restituito un nuovo Observable per ogni elemento quindi una possibile analoga codifica dello stesso problema visto sopra è la seguente:

Di per se questo esempio non è particolarmente caratteristico del comportamento dell’operatore. Consideriamo invece l’esempio seguente, in cui la println() è stata introdotta al solo scopo di rendere l’output leggibile:

In questo caso ad ognuno degli interi emessi dal primo operatore just() sono applicate tutte le funzioni restituite dal secondo operatore just(), applicate nell’ordine. L’output prodotto è:

dove la prima riga è ottenuta moltiplicando 1 per 2, 3 e 4, la seconda moltiplicando il 2, e così via fino alla quinta riga ottenuta moltiplicando il 5.

GroupBy

Tale operatore dividere unObservable in un insieme di Observable che emettono ciascuno un gruppo di elementi diverso dall’Observable originale ed organizzati per chiave. Consideriamo il seguente esempio:

il metod groupBy() riceve come input la funzione che sarà utilizzata per estrarre la chiave sulla base della quale gli elementi saranno suddivisi. Nel nostro caso gli interi da 1 a 5 vengono smistati su tre Observable sulla base del risultato della loro divisione intera per 2. Risultato che potrà essere esclusivamente uno dei valori 0, 1 o 2, che sono appunto le chiavi che distinguono i tre Observable. L’output prodotto sarà quindi:

Scan

L’operatore scan applica una funzione di accumulazione al primo elemento emesso dall’Observable e quindi emette il risultato della funzione come prima emissione. Ad ogni emissione successiva il risultato della precedente applicazione della funzione, il valore accumulato, è applicato insieme al nuovo item emesso alla funzione di accumulazione. Consideriamo il seguente codice di esempio:

Considerando 0 come prima valore accumulato, all’emissione del numero 1 viene emesso 0 + 1 = 1. Successivamente all’emissione del numero 2 viene emesso 1 + 2 = 3. E così via fino a generare l’output seguente:
Eventualmente è possibile definire un valore iniziale per l’accumulatore valorizzando il parametro  initialValue dell’operatore scan().

Window

L’operatore Window è molto simile a Buffer, ma anziché emettere in una unica soluzione un pacchetto di elementi dall’Observable di origine, emette di volta in volta un nuovo Osservable, ognuno dei quali emette un sottoinsieme di elementi dall’Observable di origine.

Come Buffer anche Window ha molte varietà, ognuna con il suo modo di suddividere  gli elementi dell’Observable originale negli Observable risultanti, ognuno dei quali rappresenta appunto una “finestra” sugli oggetti emessi originariamente. Nella terminologia dell’operatore:

  • quando si apre una finestra significa che viene emesso un nuovo Observable che inizierà ad emettere gli item emessi dall’Observable sorgente;
  • quando invece una finestra si chiude significa che l’Observable emesso smette di emettere elementi dall’Observable di origine e termina con una notifica onCompleted ai suoi osservatori.

Un primo semplice esempio di applicazione dell’operatore è il seguente, in cui la finestra è definita in termini del numero di elementi che può contenere.

Come risultato il codice emetterà tre nuovi Observable ognuno dei quali conterrà tre elementi dell’Observable originale, come mostrato dall’output seguente:
Un’applicazione alternativa dell’operatore consiste nel definire una finestra di tipo temporale. Ad esempio nel codice seguente:

l’operatore definisce una finestra di 5 secondi, al termine dei quali è chiuso l’Observable corrente ed aperto un nuovo Observable, come è possibile verificare dall’output prodotto:

 

How useful was this post?

Click on a star to rate it!

Average rating / 5. Vote count:

No votes so far! Be the first to rate this post.

As you found this post useful...

Follow us on social media!