Elaborazione Parallela con Spring Batch

In diversi articoli abbiamo descritto le principali caratteristiche di uno dei framework di Spring a mio avviso tra i più utili per la realizzazione di applicazioni enterprise, Spring Batch:

In questo articolo vogliamo proseguire la trattazione analizzando le modalità con cui il framework consente di eseguire elaborazioni parallele. Come suggerito anche nella documentazione ufficiale, nella maggior parte dei casi non vi è la necessità di una parallelizzazione delle attività, anche perché si tratta pur sempre di attività batch, però è indubbio che esistono situazioni in cui le performance sono importanti o la dimensione dei dati è tale da richiedere un tempo di elaborazione eccessivo.

In questi casi Spring Batch offre diverse opzioni, che ad un livello più alto fanno riferimento ai due “classici” modelli di parallelizzazione: single-process multithreaded e multi-process:

  • Multi-threaded Step (single process)
  • Parallel Steps (single process)
  • Remote Chunking of Step (multi process)
  • Partitioning a Step (single or multi process)

La modalità “Remote Chunking of Step” non è trattata nell’articolo perché richiederebbe una architettura più complessa.

Il Problema

Prima di procedere alla descrizione di tali opzioni descriviamo il problema che ci consentirà di esplorare  le soluzioni offerte dal framework. Per farlo scomodiamo il Teorema dei numeri primi di Gauss dal quale estraiamo il problema della determinazione del numero di numeri primi presenti nell’intervallo da 1 (escluso) ad N, indicata con la funzione π(x). La tabella seguente mostra il risultato per potenze di 10 fino alla quinta, che è sufficiente per i nostri scopi.

Per risolvere il problema usiamo la soluzione “banale” e poco “efficiente” di enumerare tutti i numeri da 2 ad N e verificare se siano primi dividendoli per tutti i precedenti numeri da 2 ad N-1. Ovviamente esistono possibili ottimizzazioni a tale algoritmo, come quello di eliminare i numeri pari ma questo esula dai nostri scopi.

Creiamo quindi un progetto Maven senza alcun artifact, apriamo il pom.xml ed inseriamo le dipendenze che ci sono necessarie:

Procediamo implementando i tre componenti caratteristici di uno step di Spring Batch. Il primo è un NumberGenerator che implementa l’interfaccia ItemReader e che utilizza un contatore per la generazione del successivo intero da valutare. Si noti l’utilizzo del tipo AtomicInteger e del metodo addAndGet() che agisce in modo atomico sul valore incrementandolo e restituendone il risultato. Questo eviterà una eccezione di accesso concorrente quando introdurremo l’elaborazione parallela.

Successivamente implementiamo il NumberProcessor che implementa l’interfaccia ItemProcessor e determina se un dato numero sia primo o meno con l’algoritmo descritto sopra.

Infine implementiamo l’interfaccia ItemWriter con la classe NumberWriter che semplicemente inserisce i numeri primi trovati in una variabile di appoggio.

Il file XML che definisce il processo utilizzando le componenti di SpringBatch ora definite sono:

Eseguiamo il processo con la seguente classe dove vengono rilevate le prestazioni dell’esecuzione misurando il tempo prima e dopo l’avvio del job.

Di seguito è mostrato l’output prodotto per i primi 105 numeri interi, dal quale si evince che il tempo necessario per l’identificazione dei 9.592 numeri primi contenuti nell’intervallo è più circa 2 secondi.

Alzando la posta di un ordine di grandezza, per analizzate i numeri fino a 106 sono necessari più di 100 secondi per contare 78.498 numeri primi.

Nei prossimi paragrafi vediamo come sia possibile migliorare le prestazioni del nostro programma utilizzando le caratteristiche offerte da Spring Batch per l’elaborazione parallela.

Multi-threaded Step

Il modo più semplice per implementare l’elaborazione parallela è quella di assegnare un thread separato a ciascun chunk da analizzare, che nel nostro esempio corrispondono ad un intervallo di 100 numeri interi (commit-intervall = 100). Per farlo è sufficiente definire un bean che implementa l’interfaccia standard di Spring TaskExecutor (equivalente all’interfaccia Executor di java). Naturalmente Spring offre diverse implementazioni di tale interfaccia per realizzare strategie di esecuzione differenti: sincrona, asincrona, con pool, etc. Per i nostri scopi utilizzeremo un ThreadPoolTaskExecutor così configurato:

L’utilizzo di tale bean per la parallelizzazione dell’elaborazione si realizza molto semplicemente definendolo come valore della proprietà task-executor del tasklet. L’XML del job visto sopra diviene quindi:

L’effetto sarà che ciascun thread eseguirà i task numberGenerator, numberProcessor e numberWriter, caratteristici dello step, per l’intero chunk prima di passare al successivo. Ovviamente non esiste alcun ordine prestabilito nell’elaborazione dei chunk che quindi possono essere anche non consecutivi. Va inoltre considerato che i task che partecipano allo step devono essere thread safe e molti dei componenti (reader e writer) forniti dal Spring Batch non lo sono in quanto statefull. Come anticipato precedentemente la classe NumberGenerator è stata resa thread safe utilizzando il tipo AtomicInteger per evitare comportamenti inaspettati.

Eseguendo il job il framework utilizza 10 thread ed i tempi di elaborazione per gli intervalli fino a 105 e 106 diventano rispettivamente di 684ms e 26.199ms, con una riduzione percentuale del tempo di elaborazione del 65% nel primo caso e del 74% nel secondo.

Parallel Steps

Questa seconda modalità di elaborazione è applicabile ogni qualvolta la logica dell’applicazione consenti di parallelizzare l’esecuzione di due o più step, la cui esecuzione è quindi affidata a thread differenti attivi contemporaneamente. La configurazione di step paralleli si realizza molto semplicemente introducendo il concetto di split dell’elaborazione su diversi flussi (flow).

Per utilizzare tali concetti nel nostro problema di esempio supponiamo di suddividere l’intervallo degli interi da elaborare in due sottointervalli, ad esempio (1, 50.000] e (50.000, 100.000]. Quindi assegniamo l’elaborazione di ciascun sottointervallo a step differenti. Per farlo dobbiamo modificare il codice della classe NumberGenerator in modo da ricevere come parametri di configurazione il limite minimo e massimo degli interi da generare e configure due bean numberGeneratorLower e numberGeneratorUpper nel modo seguente:

Quindi configuriamo il job suddividendo l’elaborazione sui due step come mostrato nell’XML seguente. Si noti che la proprietà task-executor del tag <split> specifica quale implementazione di TaskExecutor deve essere utilizzato per la parallelizzazione dei flussi. Per default l’implementazione utilizzata è SyncTaskExecutor che non va bene se vogliamo eseguire i due step in parallelo.

Eseguendo il job questa volta il framework impiega 2 thread, uno per step come ci aspettavamo, elaborando gli interi fino a 105 e 106 rispettivamente in 1.119ms e 73.830ms. Ovviamente vi è un peggioramento rispetto al caso multi-threaded step, descritto al paragrafo precedente, perché i thread utilizzati questa volta sono solamente 2 ma rispetto al caso a singolo thread vi è comunque un miglioramento.

Partitioning

Il partitioning è una metodologia di elaborazione parallela in cui uno step è suddiviso in una componente master ed una slave e i dati di input su cui opera lo step sono partizionati opportunamente e la loro elaborazione è affidata alle componenti slave

Le componenti slave in genere saranno servizi remoti, ma possono anche essere thread eseguiti localmente, come nel nostro esempio. Per prima cosa dobbiamo creare un bean che implementa l’interfaccia Partitioner di Spring a cui è demandato il compito di partizionare i dati di input in intervalli e di inserirli in un ExecutionContext.

Il parametro gridSize specifica il numero di partizioni richieste ed è utilizzato, nel nostro caso, per determinare l’ampiezza del range cui dividere l’intervallo di numeri interi da analizzare. Per ciascun range generato è creato un nuovo contesto di esecuzione a cui sono assegnati i valori minimo e massimo dell’intervallo ed un nome nella forma “(min, max]”.

Una volta implementato il partitioner inseriamo la definizione del relativo bean nell’XML:

Quindi ridefiniamo il bean associato al tipo NumberGenerator in modo da recuperare i valori delle proprietà min e max dal contesto generato dal partitioner identificato dal bean stepExecutionContext.

L’XML che definisce il job diviene quindi:

L’esecuzione del job per l’intervallo di interi fino a 102 genera l’output mostrato di seguito con un tempo di 1.391ms, che è superiore agli altri casi di processamento parallelo ma comunque inferiore al caso base a thread singolo.

Codice Sorgente

Il codice sorgente completo di tutti gli esempi descritti è scaricabile qui spring-prime-counter.

 

How useful was this post?

Click on a star to rate it!

Average rating 5 / 5. Vote count: 2

No votes so far! Be the first to rate this post.

As you found this post useful...

Follow us on social media!