Secondo le intenzioni di Oracle dietro la scelta architetturali che hanno portato all’introduzione degli Stream (vedi Collection e Stream in Java) e delle espressioni Lambda (vedi Programmazione Funzionale in Java) in Java 8 c’è la possibilità di utilizzare l’elaborazione parallela. In questo breve articolo vediamo come si realizza e quali ne sono le limitazioni.
Parallelismo
Il calcolo parallelo comporta la suddivisione di un problema in sottoproblemi. Risolvendo contemporaneamente questi problemi (in parallelo, con ogni sottoproblema in esecuzione in un thread separato) e combinandone i risultati si ottiene la soluzione del problema principale. Java fornisce le primitive fork e join, che consente di implementare l’elaborazione parallela nelle applicazioni, tuttavia è compito del programmatore specificare come il problema è partizionato in sottoproblemi.
Una difficoltà ulteriore che si ha nell’implementare l’elaborazione parallela nelle Collection
è che non sono strutture dati thread-safe, il che significa che più thread non possono manipolare una collezione senza introdurre interferenze o errori di coerenza di memoria. Per tale motivo java fornisce dei wrapper di sincronizzazione, che gestiscono l’accesso concorrente ad una Collection
generica rendendola sicura.
1 |
List<Type> list = Collections.synchronizedList(new ArrayList<Type>()); |
Tuttavia, la sincronizzazione introduce il problema della contesa tra thread, che limita la loro esecuzione in parallelo. Le operazioni aggregate e gli stream paralleli consentono di implementare il parallelismo con le collezioni non thread-safe, a condizione che non si modifichi la collezione mentre si opera su di esso. Inoltre con gli operatori di aggregazione la scomposizione del problema e la ricomposizione dei risultati è eseguita dal runtime di java.
Implementazione
Il modo più semplice per parallelizzare l’elaborazione di uno Stream
è quello di utilizzare la direttiva parallel()
. Vediamo il semplice esempio della stampa degli elementi in una Collection
:
1 2 |
List<Integer> list = Arrays.asList( 1, 2, 3, 4, 5 ); list.stream().forEach( a -> System.out.println(a + ": " + Thread.currentThread().getName()) ); |
1 2 3 4 5 |
1: main 2: main 3: main 4: main 5: main |
Le cose che notiamo immediatamente sono due. Innanzitutto gli elementi della lista sono elaborati nell’ordine originale, inoltre tutta l’elaborazione è eseguita nel thread principale main
. Ora modifichiamo il codice introducendo la direttiva parallel()
:
1 2 |
List<Integer> list = Arrays.asList( 1, 2, 3, 4, 5 ); list.stream().parallel().forEach( a -> System.out.println(a + ": " + Thread.currentThread().getName()) ); |
1 2 3 4 5 |
3: main 2: ForkJoinPool.commonPool-worker-1 5: ForkJoinPool.commonPool-worker-2 1: ForkJoinPool.commonPool-worker-3 4: ForkJoinPool.commonPool-worker-4 |
In questo caso gli elementi non vengono più processati nell’ordine originale, perché ciascuno è gestito in un thread differente. Inoltre ciascun thread è recuperato da un pool ForkJoinPool
, un’implementazione di ExecutorService
introdotta in java 8 e che oltre alla gestione dei thread fornisce strumenti per recuperare informazioni sullo stato e le prestazioni del pool.
Sostanzialmente i thread sono recuperati dal common pool disponibile attraverso il metodo statico ForkJoinPool.commonPool()
e che gestisce un numero di thread dipendente dal numero di core fisici disponibili nella CPU della macchina. La dimensione del pool può essere ottenuti con la seguente riga di codice che, ad esempio, sul mio Macbook Pro con Intel Core i7 stampa 7:
1 |
System.out.println("Pool size: " + ForkJoinPool.commonPool().getParallelism()); |
Problemi
Sebbene a prima vista la soluzione offerta con gli stream paralleli risulti ottima, esistono situazioni in cui il suo utilizzo non è raccomandabile. Questo perché l’utilizzo del common fork-join thread pool implica che nel momento in cui uno solo dei thread è, per qualsiasi motivo, rallentato, tutta l’elaborazione ne risente.
Vediamo ad esempio questa nuova implementazione del nostro codice in cui rallentiamo l’elaborazione del primo item introducendo uno sleep arbitrario.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
System.out.println( "START: " + LocalTime.now() ); List<Integer> list = Arrays.asList( 1, 2, 3, 4, 5 ); list.stream().parallel().map( s -> { if ( s == 1 ) { try { Thread.sleep( 5000 ); } catch (Exception e) {} } return s; } ).forEach( a -> System.out.println(a + ": " + LocalTime.now() + " - " + Thread.currentThread().getName()) ); System.out.println( "END: " + LocalTime.now() ); |
L’esito della sua esecuzione sarà la produzione in console di un output simile a quello mostrato sotto, in cui si evidenzia che non solo l’ultimo item è eseguito 5 secondi dopo rispetto agli altri, ma che tutta l’elaborazione dello stream termina solo al completamento dell’elaborazione di tutti gli item.
1 2 3 4 5 6 7 |
START: 11:31:42.218 3: 11:31:42.221 - main 2: 11:31:42.221 - ForkJoinPool.commonPool-worker-5 5: 11:31:42.221 - ForkJoinPool.commonPool-worker-1 4: 11:31:42.221 - ForkJoinPool.commonPool-worker-2 1: 11:31:47.223 - ForkJoinPool.commonPool-worker-4 END: 11:31:47.224 |
Il problema è ancora più grave nel caso in cui nella nostra applicazione esistano più stream che vengono elaborati parallelamente, perché tutti faranno riferimento allo stesso pool di thread. In altre parole l’elaborazione di uno stream potrebbe essere bloccata a causa della elaborazione di un altro stream “lento” che nulla ha a che vedere col primo.
In queste considerazioni non si è poi considerato il caso in cui una delle elaborazione genera errori o va in eccezione. Se non gestita un errore sulla singola elaborazioni può inficiare l’elaborazione dell’intero stream, cosa che potrebbe non essere corretta.
In conclusione nell’attesa che Oracle rilasci una versione del framework che consenta di specificare il pool da utilizzare, il consiglio è quello di utilizzare i parallel stream solamente per task di cui si è certi non richiedano parecchie risorse di tempo e non producano eccezioni. Oppure, in alternativa, utilizzare il framework RxJava per la programmazione reattiva.
Codice Sorgente
Il codice sorgente contenente tutti gli esempi presentati è scaricabile qui parallel-stream.