Nel tutorial Programmazione concorrente in java (parte 1) abbiamo visto i concetti base della programmazione concorrente in java. In questo post andiamo invece a descrivere alcuni concetti avanzati, analizzando quanto esistente nel package java.util.concurrent
, che fu introdotto in java 5 per semplificare la realizzazione di applicazioni multithread.
Semafori
Un semaforo è un costrutto per la sincronizzazione di thread che può essere utilizzata sia per inviare segnali tra i thread diversi, sia per proteggere una sezione critica di codice (similmente all’uso dei lock). Nell’inizializzare il semaforo è necessario indicare il numero di thread permessi, ovvero che possono acquisire il lock sul semaforo senza essere bloccati:
1 |
Semaphore semaphore = new Semaphore(N); |
Il semaforo fornisce poi due metodi: acquire()
e release()
, il primo acquisisce una risorsa del semaforo, mentre la seconda la rilascia. Quando le N risorse sono esaurite il thread successivo che esegue acquire()
sarà bloccato fino al prossimo release()
.
Utilizzato per proteggere una sezione critica di codice, il programma deve invocare il metodo acquire()
prima dell’esecuzione di tale sezione ed il release()
al termine di tale sezione:
1 2 3 4 5 6 7 8 9 10 |
Semaphore semaphore = new Semaphore(1); ... //critical section semaphore.acquire(); ... try { .... } finally { semaphore.release(); } |
Utilizzato invece per inviare segnali tra thread, il thread consumatore deve invocare il metodo acquire()
ogni volta che è pronto a consumare un dato, mentre il thread produttore deve invocare il metodo release()
ogni volta che ha prodotto un dato.
L’esempio seguente mostra una applicazione di tale tecnica. Come si nota ispezionando il main
della classe MainProduttoreConsumatore.java
il semaforo è generato con un numero di risorse pari a 0 (zero), al fine di evitare che il consumatore, che viene avviato per primo, acceda al canale quando non ci sono dati. Sarà il produttore a portare il semaforo ad 1 al primo release
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
public class Produttore extends Thread { private Semaphore semaforo; private List<Integer> canale; public Produttore(Semaphore semaforo, List<Integer> canale) { this.semaforo = semaforo; this.canale = canale; } @Override public void run() { Random random = new Random(); while ( !isInterrupted() ) { synchronized (canale) { Integer dato = random.nextInt( 99 ); System.out.println( "Prodotto: " + dato ); canale.add( dato ); } semaforo.release(); } } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
public class Consumatore extends Thread { private Semaphore semaforo; private List<Integer> canale; public Consumatore(Semaphore semaforo, List<Integer> canale) { this.semaforo = semaforo; this.canale = canale; } @Override public void run() { while ( !isInterrupted() ) { try { semaforo.acquire(); } catch (InterruptedException e) { return; } synchronized (canale) { System.out.println( "Ricevuto: " + canale ); } } } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
public class MainProduttoreConsumatore { public static void main(String[] args) { Semaphore semaforo = new Semaphore(0); List<Integer> canale = new ArrayList<Integer>(); Consumatore consumatore = new Consumatore( semaforo, canale ); consumatore.start(); Produttore produttore = new Produttore( semaforo, canale ); produttore.start(); try { Thread.sleep( 10 ); } catch (InterruptedException e) {} produttore.interrupt(); consumatore.interrupt(); } } |
Analizzando l’output prodotto notiamo che il tread consumatore entra nel blocco di stampa di quanto ricevuto tante volte quanti sono i dati “rilasciati” e poi si blocca in attesa. Notiamo infatti che la string: Ricevuto: [32, 90, 74]
è stampata 3 volte perché il semaforo è al valore 3. Ad ogni stampa si decrementa tale valore ed a zero il consumatore si blocca. Quindi il consumatore riprende il controllo e produce i restanti dati.
1 2 3 4 5 6 7 8 9 10 11 12 |
Prodotto: 32 Prodotto: 32 Prodotto: 90 Prodotto: 74 Ricevuto: [32, 90, 74] Ricevuto: [32, 90, 74] Ricevuto: [32, 90, 74] Prodotto: 22 Prodotto: 59 Ricevuto: [32, 90, 74, 22, 59] Ricevuto: [32, 90, 74, 22, 59] ... |
Fairness
Si noti nel caso vi siano più thread in attesa, l’utilizzo del semaforo nel modo presentato non garantisce che il primo thread bloccato a eseguito di un acquire()
sia anche il primo ad essere servito a seguito di un release()
. Per garantire ciò è necessario istanziare il semaforo nel modo seguente:
Semaphore semaphore = new Semaphore(1, true);
Si consideri comunque che l’effort di gestione è maggiore.
BlockingQueue
Proseguiamo nell’analisi del package java.util.concurrent
con una interfaccia che consente di implementare una coda thread safe. Una BlockingQueue
è tipicamente utilizzata nella situazione in cui un thread produce dei dati che debbono essere consumati da un altro thread. Come vista nell’esempio dei semafori questa situazione genera diversi problemi:
- la necessità di sincronizzare i due tread;
- l’accesso concorrente ai dati prodotti/consumati.
L’interfaccia BlockingQueue
li risolve entrambi. Il thread produttore infatti può inserire oggetti nella coda fino a quando non ne raggiunge il limite, oltre il quale viene bloccato. Il blocco dura fino a quando il thread consumatore non estrae almeno un oggetto dalla coda. Analogamente l’estrazione può avvenire fino a quando ci sono elementi da estrarre. Il thread consumatore viene bloccato quando non ci sono più elementi e rimane in blocco fino a che il consumatore non inserisce un nuovo oggetto. La classe inoltre è thread-safe, ovvero supporta l’accesso concorrente di più thread che quindi non hanno bisogno di lock o semafori.
Nell’interfaccia sono definiti diversi metodo, in particolare quelli che ci interessano sono put()
e take()
che eseguono l’inserimento e l’estrazione degli elementi con il meccanismo di blocco descritto sopra. Sono comunque previsti altri due metodi analoghi ma non bloccanti offer()
e pool()
, i quali tentano di eseguire l’operazione di inserimento o estrazione ma se non è possibile ritornano immediatamente. una loro versione alternativa prevede come parametro un timeout
che indica il tempo di attesa, in caso di blocco, prima di ritornare dall’operazione.
Le implementazioni fornite da java dell’interfaccia BlockingQueue
sono:
ArrayBlockingQueue |
Memorizza gli elementi in un array di dimensioni prestabilite, la cui gestione è di tipo FIFO (First In First Out). |
DelayQueue |
Questa implementazione conserva gli elementi al suo interno fino per un periodo di tempo oltre il quale l’elemento è rilasciabile. Gli elementi conservabili devono estendere una specifica interfacci java.util.concurrent.Delayed che attraverso il metodo getDalay indica il tempo di conservazione. |
LinkedBlockingQueue |
Memorizza gli elementi conservati al suo interno cona struttura LinkedList gestita in mofo FIFO . La lista può essere limitata o meno, nel secondo caso è preso come limite superiore il valore Integer.MAX_VALUE |
PriorityBlockingQueue |
Implementa una coda in cui gli elementi sono rilasciati secondo una certa priorità. A tal fine tali elementi devono implementare l’interfaccia java.lang.Comparable che è utilizzata internamente per stabilire il prioritario tra due elementi. La struttura non ha un limite prestabilito. |
SynchronousQueue |
Implementa una coda che può conservare al suo interno un solo elemento. |
Le nostre classi produttore e consumatore utilizzando un oggetto ArrayBlockingQueue
si semplificano notevolmente:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
public class Produttore extends Thread { private BlockingQueue<Integer> canale; public Produttore(BlockingQueue<Integer> canale) { this.canale = canale; } @Override public void run() { Random random = new Random(); while ( !isInterrupted() ) { Integer dato = random.nextInt( 99 ); System.out.println( "Prodotto: " + dato ); try { canale.put( dato ); } catch (InterruptedException e) { return; } } } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public class Consumatore extends Thread { private BlockingQueue<Integer> canale; public Consumatore(BlockingQueue<Integer> canale) { this.canale = canale; } @Override public void run() { while ( !isInterrupted() ) { try { System.out.println( "Ricevuto: " + canale.take() ); } catch (InterruptedException e) { return; } } } } |
ConcurrentMap
L’interfaccia java.util.concurrent.ConcurrentMap
è utilizzata per rappresentare una java.util.Map
capace di gestire l’accesso concorrente ad essa attraverso i metodo get
e put
.
Le sue implementazioni sono:
ConcurrentHashMap |
Molto simile alla classe java.util.HashTable ma gestisce la concorrenza eseguendo un lock interno nelle sezioni critiche di codice che accedono agli elementi. |
ConcurrentNavigableMap |
E’ una java.util.NavigableMap che supporta l’accesso concorrente. |
CyclicBarrier
Le cyclic barrier offrono un meccanismo di sincronizzazione tra thread. Si tratta di vere e proprie barriere che tutti i thread sincronizzati devono raggiungere prima di poter proseguire nell’esecuzione del codice.
Per il loro utilizzo una CyclingBarrier
deve essere inizializzata indicando il numero di thread che devono essere sincronizzati.
CyclicBarrier barrier = new CyclicBarrier(2);
Eventualmente è anche possibile indicare una action da eseguire quando tutti i thread hanno raggiunto la barrier, aggiungendo come parametro un oggetto di tipo Runnable
:
CyclicBarrier barrier = new CyclicBarrier(2, barrierAction);
Nel momento in cui un thread ha necessità di sincronizzarsi deve invocare il metodo await()
che lo metterà in blocco se almeno uno degli altri thread non ha ancora invocato il metodo. E’ anche possibile indicare un timeout
di attesa oltre il quale l’esecuzione del thread procede comunque. L’attesa dei thread potrà terminare in uno dei seguenti casi:
- l’ultimo thread invoca
await()
; - il thread è interrotto da un altro thread;
- un altro thread in attesa è interrotto;
- un altro thread esaurisce il timeout di attesa;
- è invocato il metodo
reset()
.
Exchanger
Simile al CyclicBarrier un Exchanger fornisce un punto di rendezvous tra thread in cui è possibile scambiarsi oggetti. Quando un thread raggiunge il punto di scambio invoca il metodo exchange()
passando come parametro l’oggetto da scambiare ed ottenendo come risultato l’oggetto scambiato. L’esecuzione sarà sospesa fino a quando l’altro thread eseguirà a sua volta il metodo exchange()
.
ExecutorService
L’interfaccia java.util.concurrent.ExecutorService
fornisce un meccanismo asincrono per la gestione dei task in background. oltre a fornire un maggior controllo sui thread in esecuzione, offre un meccanismo di pooling che limita il numero dei thread in esecuzione contemporanea. Un semplice esempio di utilizzo è il seguente:
1 2 3 4 5 6 7 8 9 |
ExecutorService executorService = Executors.newFixedThreadPool(10); executorService.execute(new Runnable() { public void run() { System.out.println("Asynchronous task"); } }); executorService.shutdown(); |
dove un pool di 10 thread è creato immediatamente e successivamente un task (di tipo Runnable
) è generato e dato in esecuzione all’ExecutorService
che, per la sua esecuzione, delegherà uno dei thread disponibili. L’invocazione le metodo execute()
non è bloccante.
Esistono due implementazioni di tale interfaccia:
ThreadPoolExecutor |
Implementa un pool di thread di dimensione fissa. Ogni volta che viene aggiunto un task se vi sono thread disponibili nel pool l’esecuzione avviene immediatamente, altrimenti il task è posto in attesa fino a quando non si rende disponibile un thread. |
ScheduledExecutorService |
E’ un ExecutorService che può schedulare l’esecuzione dei task assegnati in modo ritardato o ripetitivo con un certo intervallo tra una esecuzione e la successiva. Si tratta di una interfaccia quindi la sua implementazione concreta è ScheduledThreadPoolExecutor. |
La creazione di un ExecutorService dipende quindi dall’implementazione che si intende adottare. La classe Executors fornisce alcuni metodo di utilità a tale scopo:
1 2 3 |
ExecutorService executorService1 = Executors.newSingleThreadExecutor(); ExecutorService executorService2 = Executors.newFixedThreadPool(10); ExecutorService executorService3 = Executors.newScheduledThreadPool(10); |
execute(Runnable) |
Riceve un oggetto java.lang.Runnable e lo esegue in modo asincrono. Non c’è modo di avere un risultato dall’esecuzione del thread. |
||
submit(Runnable) |
Si comporta in modo simile al precedente ma ritorna un oggetto Future che può essere utilizzato per sapere se il thread ha terminato l’esecuzione.
|
||
submit(Callable) |
Simile a submit(Runnable) ma riceve in input un oggetto Callable . Tale oggetto è simile a Runnable ma attraverso il metodo call() può restituire un risultato dell’esecuzione del thread. Per farlo è necessario utilizzare l’oggetto Future restituito dal submit .
|
Terminato l’uso dell’ExecutorService
è necessario invocare il metodo shutdown()
. Questo impedisce all’executor di accettare nuovi task, ma i task ancora in esecuzione proseguono fino al loro completamento. Quando tutti i task terminato l’executor termina. Per terminare immediatamente l’executor e tutti i task running è possibile invocare il metodo shutdownNow()
.