Programmazione concorrente in java (parte 2)

Nel tutorial Programmazione concorrente in java (parte 1) abbiamo visto i concetti base della programmazione concorrente in java. In questo post andiamo invece a descrivere alcuni concetti avanzati, analizzando quanto esistente nel package java.util.concurrent, che fu introdotto in java 5 per semplificare la realizzazione di applicazioni multithread.

Semafori

Un semaforo è un costrutto per la sincronizzazione di thread che può essere utilizzata sia per inviare segnali tra i thread diversi, sia per proteggere una sezione critica di codice (similmente all’uso dei lock). Nell’inizializzare il semaforo è necessario indicare il numero di thread permessi, ovvero che possono acquisire il lock sul semaforo senza essere bloccati:

Il semaforo fornisce poi due metodi: acquire() e release(), il primo acquisisce una risorsa del semaforo, mentre la seconda la rilascia. Quando le N risorse sono esaurite il thread successivo che esegue acquire() sarà bloccato fino al prossimo release().

Utilizzato per proteggere una sezione critica di codice, il programma deve invocare il metodo acquire() prima dell’esecuzione di tale sezione ed il release() al termine di tale sezione:

Utilizzato invece per inviare segnali tra thread, il thread consumatore deve invocare il metodo acquire() ogni volta che è pronto a consumare un dato, mentre il thread produttore deve invocare il metodo release() ogni volta che ha prodotto un dato.

L’esempio seguente mostra una applicazione di tale tecnica. Come si nota ispezionando il main della classe MainProduttoreConsumatore.java il semaforo è generato con un numero di risorse pari a 0 (zero), al fine di evitare che il consumatore, che viene avviato per primo, acceda al canale quando non ci sono dati. Sarà il produttore a portare il semaforo ad 1 al primo release.

Analizzando l’output prodotto notiamo che il tread consumatore entra nel blocco di stampa di quanto ricevuto tante volte quanti sono i dati “rilasciati” e poi si blocca in attesa. Notiamo infatti che la string: Ricevuto: [32, 90, 74] è stampata 3 volte perché il semaforo è al valore 3. Ad ogni stampa si decrementa tale valore ed a zero il consumatore si blocca. Quindi il consumatore riprende il controllo e produce i restanti dati.

Fairness

Si noti nel caso vi siano più  thread in attesa, l’utilizzo del semaforo nel modo presentato non garantisce che il primo thread bloccato a eseguito di un acquire() sia anche il primo ad essere servito a seguito di un release(). Per garantire ciò è necessario istanziare il semaforo nel modo seguente:

Semaphore semaphore = new Semaphore(1, true);

Si consideri comunque che l’effort di gestione è maggiore.

BlockingQueue

Proseguiamo nell’analisi del package java.util.concurrent con una interfaccia che consente di implementare una coda thread safe. Una BlockingQueue è tipicamente utilizzata nella situazione in cui un thread produce dei dati che debbono essere consumati da un altro thread. Come vista nell’esempio dei semafori questa situazione genera diversi problemi:

  1. la necessità di sincronizzare i due tread;
  2. l’accesso concorrente ai dati prodotti/consumati.

L’interfaccia BlockingQueue li risolve entrambi. Il thread produttore infatti può inserire oggetti nella coda fino a quando non ne raggiunge il limite, oltre il quale viene bloccato. Il blocco dura fino a quando il thread consumatore non estrae almeno un oggetto dalla coda. Analogamente l’estrazione può avvenire fino a quando ci sono elementi da estrarre. Il thread consumatore viene bloccato quando non ci sono più elementi e rimane in blocco fino a che il consumatore non inserisce un nuovo oggetto. La classe inoltre è thread-safe, ovvero supporta l’accesso concorrente di più thread che quindi non hanno bisogno di lock o semafori.

Nell’interfaccia sono definiti diversi metodo, in particolare quelli che ci interessano sono put() e take() che eseguono l’inserimento e l’estrazione degli elementi con il meccanismo di blocco descritto sopra. Sono comunque previsti altri due metodi analoghi ma non bloccanti offer() e pool(), i quali tentano di eseguire l’operazione di inserimento o estrazione ma se non è possibile ritornano immediatamente. una loro versione alternativa prevede come parametro un timeout che indica il tempo di attesa, in caso di blocco, prima di ritornare dall’operazione.

Le implementazioni fornite da java dell’interfaccia BlockingQueue sono:

ArrayBlockingQueue Memorizza gli elementi in un array di dimensioni prestabilite, la cui gestione è di tipo FIFO (First In First Out).
DelayQueue Questa implementazione conserva gli elementi al suo interno fino per un periodo di tempo oltre il quale l’elemento è rilasciabile. Gli elementi conservabili devono estendere una specifica interfacci java.util.concurrent.Delayed che attraverso il metodo getDalay indica il tempo di conservazione.
LinkedBlockingQueue Memorizza gli elementi conservati al suo interno cona struttura LinkedList gestita in mofo FIFO. La lista può essere limitata o meno, nel secondo caso è preso come limite superiore il valore Integer.MAX_VALUE
PriorityBlockingQueue  Implementa una coda in cui gli elementi sono rilasciati secondo una certa priorità. A tal fine tali elementi devono implementare l’interfaccia java.lang.Comparable che è utilizzata internamente per stabilire il prioritario tra due elementi. La struttura non ha un limite prestabilito.
SynchronousQueue  Implementa una coda che può conservare al suo interno un solo elemento.

Le nostre classi produttore e consumatore utilizzando un oggetto ArrayBlockingQueue si semplificano notevolmente:

ConcurrentMap

L’interfaccia java.util.concurrent.ConcurrentMap è utilizzata per rappresentare una java.util.Map capace di gestire l’accesso concorrente ad essa attraverso i metodo get e put.

Le sue implementazioni sono:

ConcurrentHashMap Molto simile alla classe java.util.HashTable ma gestisce la concorrenza eseguendo un lock interno nelle sezioni critiche di codice che accedono agli elementi.
ConcurrentNavigableMap  E’ una java.util.NavigableMap che supporta l’accesso concorrente.

CyclicBarrier

Le cyclic barrier offrono un meccanismo di sincronizzazione tra thread. Si tratta di vere e proprie barriere che tutti i thread sincronizzati devono raggiungere prima di poter proseguire nell’esecuzione del codice.

Cyclic Barrier

Per il loro utilizzo una CyclingBarrier deve essere inizializzata indicando il numero di thread che devono essere sincronizzati.

CyclicBarrier barrier = new CyclicBarrier(2);

Eventualmente è anche possibile indicare una action da eseguire quando tutti i thread hanno raggiunto la barrier, aggiungendo come parametro un oggetto di tipo Runnable:

CyclicBarrier barrier = new CyclicBarrier(2, barrierAction);

Nel momento in cui un thread ha necessità di sincronizzarsi deve invocare il metodo await() che lo metterà in blocco se almeno uno degli altri thread non ha ancora invocato il metodo. E’ anche possibile indicare un timeout di attesa oltre il quale l’esecuzione del thread procede comunque. L’attesa dei thread potrà terminare in uno dei seguenti casi:

  • l’ultimo thread invoca await();
  • il thread è interrotto da un altro thread;
  • un altro thread in attesa è interrotto;
  • un altro thread esaurisce il timeout di attesa;
  • è invocato il metodo reset().

Exchanger

Simile al CyclicBarrier un Exchanger fornisce un punto di rendezvous tra thread in cui è possibile scambiarsi oggetti. Quando un thread raggiunge il punto di scambio invoca il metodo exchange() passando come parametro l’oggetto da scambiare ed ottenendo come risultato l’oggetto scambiato. L’esecuzione sarà sospesa fino a quando l’altro thread eseguirà a sua volta il metodo exchange().

ExecutorService

L’interfaccia java.util.concurrent.ExecutorService fornisce un meccanismo asincrono per la gestione dei task in background. oltre a fornire un maggior controllo sui thread in esecuzione, offre un meccanismo di pooling che limita il numero dei thread in esecuzione contemporanea. Un semplice esempio di utilizzo è il seguente:

dove un pool di 10 thread è creato immediatamente e successivamente un task (di tipo Runnable) è generato e dato in esecuzione all’ExecutorService che, per la sua esecuzione, delegherà uno dei thread disponibili. L’invocazione le metodo execute() non è bloccante.

Esistono due implementazioni di tale interfaccia:

ThreadPoolExecutor  Implementa un pool di thread di dimensione fissa. Ogni volta che viene aggiunto un task se vi sono thread disponibili nel pool l’esecuzione avviene immediatamente, altrimenti il task è posto in attesa fino a quando non si rende disponibile un thread.
ScheduledExecutorService  E’ un ExecutorService che può schedulare l’esecuzione dei task assegnati in modo ritardato o ripetitivo con un certo intervallo tra una esecuzione e la successiva. Si tratta di una interfaccia quindi la sua implementazione concreta è ScheduledThreadPoolExecutor.

La creazione di un ExecutorService dipende quindi dall’implementazione che si intende adottare. La classe Executors fornisce alcuni metodo di utilità a tale scopo:

L’interfaccia ExecutorService definisce alcuni metodi che descriviamo di seguito:

execute(Runnable) Riceve un oggetto java.lang.Runnable e lo esegue in modo asincrono. Non c’è modo di avere un risultato dall’esecuzione del thread.
submit(Runnable) Si comporta in modo simile al precedente ma ritorna un oggetto Future che può essere utilizzato per sapere se il thread ha terminato l’esecuzione.
submit(Callable) Simile a submit(Runnable) ma riceve in input un oggetto Callable. Tale oggetto è simile a Runnable ma attraverso il metodo call() può restituire un risultato dell’esecuzione del thread. Per farlo è necessario utilizzare l’oggetto Future restituito dal submit.

Terminato l’uso dell’ExecutorService è necessario invocare il metodo shutdown(). Questo impedisce all’executor di accettare nuovi task, ma i task ancora in esecuzione proseguono fino al loro completamento. Quando tutti i task terminato l’executor termina. Per terminare immediatamente l’executor e tutti i task running è possibile invocare il metodo shutdownNow().