Primi Passi con RxJava (parte 5)

In questo quarto post dedicato al framework RxJava descriveremo gli operatori della famiglia degli Utility Operators, ovvero di quegli operatori di utilità generale che consentono di operare sugli Observable.

Observable Utility Operators

Delay

L’operatore Delay modifica l’Observable a cui è applicato ritardando l’emissione degli elementi emessi, introducendo una pausa per un periodo di tempo specificata dall’utente. Ciò ha l’effetto di spostare l’intera sequenza di elementi emessi dall’Observable in avanti nel tempo.

La variante più semplice dell’operatore richiede come parametro esclusivamente la durata della pausa, definita in termini di quantità di tempo ed unità di misura applicata:

In questo caso ogni volta che l’Observable sorgente emette un item, il Delay avvia un timer e quando il timer raggiunge la durata specificata, l’Observable restituito dal Delay emette lo stesso item. L’effetto del delay si manifesta anche sull’evento onCompleted, che risulta quindi ritardato. Diversamente il ritardo applicato non ha effetto su una notifica onError, che viene immediatamente inoltrata causando la perdita di tutti gli eventuali item in sospeso. Esiste comunque una variante che riceve come ulteriore parametro un boolean delayError che se impostato a TRUE applica il ritardo anche ad un eventuale errore.

Una variante dell’operatore consente di definire un ritardo differente per ogni item emesso. Tale variante richiede in input una funzione che genera un nuovo Observable che si comporta da timer associato all’item emesso. Non appena tale Observable emette un item, l’item associato è emesso dal Delay. Si consideri il seguente esempio:

La funzione di input all’operatore genera un Observable che emette il suo unico item 0 dopo un intervallo di tempo in secondi pari all’item emesso dall’Observable sorgente. In conseguenza di ciò l’item 3, ad esempio, subisce un ritardo di 3 secondi, come mostrato dall’output generato.

Do

Tale operatore consente di registrare un’azione da eseguire a seguito di diversi eventi che possono caratterizzare il ciclo di vita di un Observable. Esistono diverse implementazioni dell’operatore in funzione dell’evento che si intende osservare:

doAfterNext Invoca un Consumer di input dopo aver emesso ogni item.
doAfterTerminate Incoca l’Action di input quando l’Observable sorgente emette un evento onComplete o onError.
doFinally Incoca l’Action di input quando l’Observable sorgente emette un evento onComplete o onError, oppure quando un osservatore chiude la sottoscrizione. Si noti che l’action deve essere thread-safe.
doOnDispose Incoca l’Action di input quando osservatore chiude la sottoscrizione. Si noti che l’action deve essere thread-safe.
doOnComplete Incoca l’Action di input quando l’Observable sorgente emette un evento onComplete.
doOnEach Esistono due variante dell’operatore. Una invoca un Consumer ad ogni item emesso, mentre l’altra notifica l’evento ad un Observer.
doOnError Incoca l’Action di input quando l’Observable sorgente emette un evento onError.
doOnLifecycle Invoca un Consumer di input ad ogni richiesta di sottoscrizione e l’Action di input ad ogni annullamento della sottoscrizione.
doOnNext Incoca l’Action di input quando l’Observable sorgente emette un evento onNext.
doOnSubscribe Invoca il Consumer di input ad ogni richiesta di sottoscrizione.
doOnTerminate Incoca l’Action di input quando l’Observable sorgente emette un evento onComplete.

Il codice seguente utilizza diversi dei metodi descritti, generando l’output riportato di fianco:

Materialize/Dematerialize

L’operatore Materialize traduce la serie di eventi emessi dall’Observable sorgente onNext, onCompleted e onError, in una serie di oggetti di tipo io.reactivex.Notification che sono emessi dall’Observable risultante. L’operatore Dematerialize esegue l’operazione inversa. L’operatore Materialize è utile sostanzialmente quando si preferisce gestire tutti gli eventi emessi dallo stream in modo uniforme, senza dover implementare un handler diverso per ciascuno di essi. Si consideri ad esempio il seguente codice sorgente:

l’output prodotto sarà:

SubscribeOn e ObserveOn

Questi operatori sono utili quando si intende utilizzare RxJava in un progetto multithreading. Per default un Observable, e la catena di operatori applicati, svolgeranno il loro lavoro e notificano ai loro osservatori, nello stesso thread su cui viene invocato il metodo subscribe(). L’operatore SubscribeOn modifica tale comportamento specificando un altro Scheduler su cui l’Osservable andrà ad operare. Analogamente l’operatore ObserveOn specifica un diverso Scheduler che l’Observable utilizzerà per inviare notifiche ai propri osservatori.

Consideriamo ad esempio il codice seguente:

L’output prodotto, mostrato di seguito, dimostra che lo stream di eventi viene prodotto e processato all’interno del thread del metodo main().

Spesso però può avere senso delegare le operazioni ad un thread differente. Si pensi ad esempio ad operazioni di I/O eseguite sugli item emessi, che comporterebbero il blocco del thread principale ad ogni accesso I/O. A tale scopo l’operatore SubscribeOn consente di indicare, all’Observable sorgente, quale thread si occuperà di gestire l’emissione ed il processamento degli item. Modifichiamo quindi il codice nel seguente modo:

La classe Scheduler contiene diversi factory method per oggetti di tipo Thread. Nel nostro esempio viene generato un nuovo thread che si occuperà esclusivamente di gestire le emissioni ed il loro processamento. Conseguentemente l’output prodotto sarà:

In cui si nota che è utilizzato il thread  RxNewThreadScheduler-1 differente da quello associato al main.

L’operatore ObserveOn consente, invece, di svincolare le operazioni di emissione e processamento degli item emessi, in modo che ciascuna di tali operazioni sia gestita da thread differente. Modifichiamo il nostro codice di esempio nel seguente modo:

L’output prodotto mostra che i thread sono diventati due: il thread RxNewThreadScheduler-1 che gestisce le emissioni, ed il thread RxNewThreadScheduler-2 che invece gestisce il processamento lato Subscriber.

Ulteriori approfondimenti sull’utilizzo di tali operatori potete trovarlo nell’articolo: Understanding RxJava subscribeOn and observeOn.

TimeInterval

L’operatore TimeInterval intercetta gli elementi emessi dall’Observable sorgente ed emette, al loro posto, oggetti di tipo io.reactivex.schedulers.Timed, che indicano la quantità di tempo che è trascorso tra le successive emissioni. Consideriamo il seguente codice:

l’output prodotto sarà:

Una variante dell’operatore consente di indicare l’unità di misura con cui si intende campionare gli intervalli (e.g. secondi, minuti, etc.). Si noti comunque che l’oggetto Timed attraverso il metodo time() consente di eseguire la conversione ad una qualsiasi altra unità.

Timeout

L’operatore Timeout consente di interrompere un Observable con una segnalazione onError se tale Observable non emette alcun elemento durante un periodo di tempo specificato. In altri termini l’operatore riemette gli item ricevuti, ma per ognuno di essi avvia un timer, e nel caso in cui tale timer scada senza che la sorgente abbia emesso il successivo item, l’operatore solleva una eccezione di tipo java.util.concurrent.TimeoutException.

Una seconda variante dell’operatore Timeout differisce dalla prima in quanto invece di emettere una notifica di errore in caso di una condizione di timeout, passa invece immediatamente a un osservabile di backup specificato.

Timestamp

L’operatore Timestamp allega un timestamp a ciascun elemento emesso dalla sorgente Observable prima di riemetterlo nella sequenza. Il timestamp indica a che ora è stato emesso l’elemento. Si noti che l’oggetto emesso è di tipo io.reactivex.schedulers.Timed come per l’operatore TimeInterval, ma ovviamente rappresenta un concetto differente. Ad esempio il seguente codice:

genera l’output:

Using

L’operatore Using consente di creare un Observable a partire da una risorsa (usa e getta) che esiste solamente per la durata della vita dell’Observable e che viene eliminata al termine dello stesso. Tale operatore riceve come input tre parametri:

  • resourceSupplier: una factory function utilizzata per istanziare la risorsa;
  • sourceSupplier: una factory function utilizzata per generare l’Observable a partire dalla risorsa data;
  • disposer: una funzione utilizzata del distruggere la risorsa.

Quando un osservatore si iscrive all’Observable restituito dall’operatore Using, l’operatore utilizzerà la funzione sourceSupplier per creare l’Observable che l’osservatore osserverà, mentre allo stesso tempo utilizzerà la funzione resourceSupplier per creare qualsiasi risorsa di cui si ha bisogno. Quando l’osservatore annulla l’iscrizione all’Observable o quando l’Observable termina (normalmente o con un errore), l’operatore Using chiamerà la terza funzione, disposer, per distruggere la risorsa che ha creato.

Consideriamo il seguente codice sorgente:

La funzione resourceSupplier genera la stringa "10,13,54,23,15", che è poi passata alla funzione sourceSupplier che esegue uno split e restituisce un Observable che emette la lista "10", "13", "54", "23", "15". La funzione disposer è infine invocata al termine della sequenza emessa, a seguito dell’evento onComplete. L’output prodotto sarà quindi:

 

How useful was this post?

Click on a star to rate it!

Average rating 0 / 5. Vote count: 0

No votes so far! Be the first to rate this post.

As you found this post useful...

Follow us on social media!