In questo quarto post dedicato al framework RxJava descriveremo gli operatori della famiglia dei Mathematical and Aggregate Operators ovvero di quegli operatori che eseguono operazioni matematiche sugli Observable
o sugli item da essi emessi.
Alcuni degli operatori che andremo a descrivere non sono presenti nella versione 2 del framework RxJava. Fortunatamente sono ancora utilizzabili perché inseriti in un pacchetto esterno che richiede l’utilizzo di una ulteriore dipendenza.
1 2 3 4 5 |
<dependency> <groupId>io.reactivex</groupId> <artifactId>rxjava-math</artifactId> <version>1.0.0</version> </dependency> |
Si noti infine che con il rilascio della versione 2 di RxJava il package radice rx
è stato sostituito con io.reactivex
, e tutte le classi sono state rifattorizzate coerentemente. Questo è il motivo per cui in tutti gli esempi presentati che utilizzano la versione 1 del framework si fà riferimento alla classe rx.Observable
e non alla nuova classe io.reactivex.Observable
.
Come ultima osservazione si noti che, poiché alcuni di questi operatori devono attendere che la sorgente osservabile termini l’emissione degli elementi prima di poter costruire l’output (e di solito devono utilizzare dei buffer intermedi per farlo), questi operatori sono pericolosi da utilizzare su Observable
che generano sequenze molto lunghe o infinite di dati.
Mathematical and Aggregate Operators
Average
L’operatore Average opera su un Observable
che emette numeri (o elementi che possono essere valutati come numeri) ed genera un singolo valore, ovvero la media di tutti i numeri emessi dall’Observable
di origine. Di tale operatore esistono quattro differenti possibili implementazioni, in funzione del tipo di dato in input: averageDouble
, averageFloat
, averageInteger
e averageLong
. Un semplice esempio di utilizzo è il seguente:
1 2 3 |
rx.Observable<Integer> source = rx.Observable.just( 1, 12, 13, 24, 54, 61, 7 ); MathObservable.averageInteger( source ).subscribe( System.out::println ); |
Il quale genera come output il valore intero 24. Ulteriori varianti dell’operatore ricevono come input una funzione che è utilizzata per generare le quantità che saranno poi sottoposte all’operazione di media. Il codice necessario per l’utilizzo di tali variante è leggermente più complesso in quanto richiede la generazione di un oggetto di tipo MathObservable
a partire dall’Observable
sorgente:
1 2 3 4 5 6 7 8 9 10 11 |
rx.Observable<Integer> source = rx.Observable.just( 1, 12, 13, 24, 54, 61, 7 ); Func1<Integer, Integer> valueExtractor = new Func1<Integer, Integer>() { public Integer call(Integer t1) { return t1 / 2; } }; MathObservable.from( source ) .averageInteger( valueExtractor ) .subscribe( System.out::println ); |
Min e Max
Gli operatori Max e Min operano su un Observable
che emette numeri (o elementi che possono essere valutati come numeri) ed genera un singolo valore, rispettivamente il numero più grande tra quelli emessi o il più piccolo. Il seguente codice, ad esempio, stampa su console i valori 1 e 61.
1 2 3 4 |
rx.Observable<Integer> source = rx.Observable.just( 1, 12, 13, 24, 54, 61, 7 ); MathObservable.min( source ).subscribe( System.out::println ); MathObservable.max( source ).subscribe( System.out::println ); |
Una implementazione alternativa di tali operatori ammette l’utilizzo di un Comparator
per eseguire il confronto tra gli item emessi dall’Observable
:
1 2 3 4 5 6 7 8 9 10 11 |
rx.Observable<String> source = rx.Observable.just( "1", "12", "13", "24", "54", "61", "7" ); Comparator<String> comparator = new Comparator<String>() { public int compare(String o1, String o2) { return Integer.valueOf( o1 ).compareTo( Integer.valueOf( o2 ) ); } }; MathObservable.from( source ) .min(comparator) .subscribe( System.out::println ); |
Sum
L’operatore Sum opera su un Observable
che emette numeri (o elementi che possono essere valutati come numeri) ed genera un singolo valore, che è la somma di tutti gli elementi emessi. Di tale operatore esistono quattro differenti possibili implementazioni, in funzione del tipo di dato in input: sumDouble
, sumFloat
, sumInteger
e sumLong
. Un semplice esempio di utilizzo è il seguente:
1 2 3 |
rx.Observable<Integer> source = rx.Observable.just( 1, 12, 13, 24, 54, 61, 7 ); MathObservable.sumInteger( source ).subscribe( System.out::println ); |
Il quale genera come output il valore intero 172. Come nel caso dell’operatore Average esistono varianti dell’operatore che ricevono come input una funzione la quale è utilizzata per generare le quantità che saranno poi sottoposte a somma.
1 2 3 4 5 6 7 8 9 10 11 |
rx.Observable<Integer> source = rx.Observable.just( 1, 12, 13, 24, 54, 61, 7 ); Func1<Integer, Integer> valueExtractor = new Func1<Integer, Integer>() { public Integer call(Integer t1) { return t1 / 2; } }; MathObservable.from( source ) .sumInteger( valueExtractor ) .subscribe( System.out::println ); |
Count
L’operatore Count trasforma un Observable
che emette item in un nuovo Observable
che emette un singolo valore, il quale rappresenta il numero di elementi emessi dall’Observable
sorgente. Se la sorgente termina con un errore, l’operatore trasmetterà la notifica di errore senza emettere alcun elemento. Se invece la sorgente osservabile non termina affatto, Count non emetterà alcun item né terminerà.
1 2 3 |
Observable.just( 1, 12, 13, 24, 54, 61, 7 ) .count() .subscribe( System.out::println ); |
Reduce
L’operatore Reduce applica una funzione, detta di accumulazione, al primo elemento emesso dall’Observable
di origine e quindi utilizza il risultato per applicare la medesima funzione al secondo elemento emesso, continuando questo processo fino a quando l’Observable
di origine emette l’elemento finale ed invia il segnale di completamento. A questo punto l’Observable
restituito da Reduce emette il valore finale restituito dalla funzione. L’operatore è implementato in una doppia versione, una che richiede come parametro il valore iniziale della funzione di accumulazione, e una seconda che non lo richiede sottintendendo il valore 0
.
1 2 3 |
Observable.just( 1, 12, 13, 24, 54, 61, 7 ) .reduce( 10, (x,y) -> x + y ) .subscribe( System.out::println ); |
L’inizializzazione della funzione di accumulazione è possibile anche attraverso l’utilizzo di un oggetto di tipo java.util.concurrent.Callable
.
1 2 3 |
Observable.just( 1, 12, 13, 24, 54, 61, 7 ) .reduceWith( new RandomCollable(), (x,y) -> x + y ) .subscribe( System.out::println ); |