In diversi articoli abbiamo descritto le principali caratteristiche di uno dei framework di Spring a mio avviso tra i più utili per la realizzazione di applicazioni enterprise, Spring Batch:
In questo articolo vogliamo proseguire la trattazione analizzando le modalità con cui il framework consente di eseguire elaborazioni parallele. Come suggerito anche nella documentazione ufficiale, nella maggior parte dei casi non vi è la necessità di una parallelizzazione delle attività, anche perché si tratta pur sempre di attività batch, però è indubbio che esistono situazioni in cui le performance sono importanti o la dimensione dei dati è tale da richiedere un tempo di elaborazione eccessivo.
In questi casi Spring Batch offre diverse opzioni, che ad un livello più alto fanno riferimento ai due “classici” modelli di parallelizzazione: single-process multithreaded e multi-process:
- Multi-threaded Step (single process)
- Parallel Steps (single process)
- Remote Chunking of Step (multi process)
- Partitioning a Step (single or multi process)
La modalità “Remote Chunking of Step” non è trattata nell’articolo perché richiederebbe una architettura più complessa.
Il Problema
Prima di procedere alla descrizione di tali opzioni descriviamo il problema che ci consentirà di esplorare le soluzioni offerte dal framework. Per farlo scomodiamo il Teorema dei numeri primi di Gauss dal quale estraiamo il problema della determinazione del numero di numeri primi presenti nell’intervallo da 1 (escluso) ad N, indicata con la funzione π(x)
. La tabella seguente mostra il risultato per potenze di 10 fino alla quinta, che è sufficiente per i nostri scopi.
Per risolvere il problema usiamo la soluzione “banale” e poco “efficiente” di enumerare tutti i numeri da 2 ad N e verificare se siano primi dividendoli per tutti i precedenti numeri da 2 ad N-1. Ovviamente esistono possibili ottimizzazioni a tale algoritmo, come quello di eliminare i numeri pari ma questo esula dai nostri scopi.
Creiamo quindi un progetto Maven senza alcun artifact, apriamo il pom.xml
ed inseriamo le dipendenze che ci sono necessarie:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
<!-- Spring Core --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> </dependency> <!-- Spting EL --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-expression</artifactId> <version>${spring.version}</version> </dependency> <!-- Spting tx --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId> <version>${spring.version}</version> </dependency> <!-- Spring Batch dependencies --> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-core</artifactId> <version>${spring.batch.version}</version> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-infrastructure</artifactId> <version>${spring.batch.version}</version> </dependency> |
Procediamo implementando i tre componenti caratteristici di uno step di Spring Batch. Il primo è un NumberGenerator
che implementa l’interfaccia ItemReader
e che utilizza un contatore per la generazione del successivo intero da valutare. Si noti l’utilizzo del tipo AtomicInteger
e del metodo addAndGet()
che agisce in modo atomico sul valore incrementandolo e restituendone il risultato. Questo eviterà una eccezione di accesso concorrente quando introdurremo l’elaborazione parallela.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public class NumberGenerator implements ItemReader<Integer> { private static final int LIMIT = 100000; private AtomicInteger number = new AtomicInteger(1); public Integer read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { int next = number.addAndGet(1); if ( next < LIMIT ) { System.out.println( "Thread n. " + Thread.currentThread().getId() + " is reading for step " + stepName + " the number " + next ); return next; } else { return null; // Indica al motore che gli item sono terminati } } } } |
Successivamente implementiamo il NumberProcessor
che implementa l’interfaccia ItemProcessor
e determina se un dato numero sia primo o meno con l’algoritmo descritto sopra.
1 2 3 4 5 6 7 8 9 10 11 12 |
public class NumberProcessor implements ItemProcessor<Integer,Integer> { public Integer process(Integer number) throws Exception { System.out.println( "Thread n. " + Thread.currentThread().getId() + " is processing for step " + stepName + " the number " + number ); for ( int i = 2; i < number; i++ ) { if ( number % i == 0 ) { return null; // not a prime } } return number; } } |
Infine implementiamo l’interfaccia ItemWriter
con la classe NumberWriter
che semplicemente inserisce i numeri primi trovati in una variabile di appoggio.
1 2 3 4 5 6 7 8 9 |
public class NumberWriter implements ItemWriter<Integer> { public void write(List<? extends Integer> items) throws Exception { System.out.println( "Thread n. " + Thread.currentThread().getId() + " is writing " + items.size() + " items for step " + stepName + " values " + items); SharedMemory.addPrimeNumber( items ); } } |
1 2 3 4 5 6 7 8 9 |
<beans ... > <batch:job id="singleStepJob"> <batch:step id="singleStepJob.step1"> <batch:tasklet> <batch:chunk reader="numberGenerator" processor="numberProcessor" writer="numberWriter" commit-interval="100"/> </batch:tasklet> </batch:step> </batch:job> </beans> |
Eseguiamo il processo con la seguente classe dove vengono rilevate le prestazioni dell’esecuzione misurando il tempo prima e dopo l’avvio del job.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
public class App { public static void main(String[] args) { String[] springConfig = { "context.xml", "single-step.xml" }; ApplicationContext context = new ClassPathXmlApplicationContext(springConfig); JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher"); Job job = (Job) context.getBean("singleStepJob"); long start = System.currentTimeMillis(); try { JobExecution execution = jobLauncher.run(job, new JobParameters()); System.out.println("Exit Status : " + execution.getStatus()); } catch (Exception e) { e.printStackTrace(); } Long end = System.currentTimeMillis(); System.out.println( "Threads: " + SharedMemory.getNumberOfUsedThreads() ); System.out.println( "Primes: " + SharedMemory.getNumberOfFoundPrimes()); System.out.println( "Done in " + (end-start) + " ms"); } } |
Di seguito è mostrato l’output prodotto per i primi 105 numeri interi, dal quale si evince che il tempo necessario per l’identificazione dei 9.592 numeri primi contenuti nell’intervallo è più circa 2 secondi.
1 2 3 |
Threads: 1 Primes: 9592 Done in 1975 ms |
Alzando la posta di un ordine di grandezza, per analizzate i numeri fino a 106 sono necessari più di 100 secondi per contare 78.498 numeri primi.
1 2 3 |
Threads: 1 Primes: 78498 Done in 103305 ms |
Nei prossimi paragrafi vediamo come sia possibile migliorare le prestazioni del nostro programma utilizzando le caratteristiche offerte da Spring Batch per l’elaborazione parallela.
Multi-threaded Step
Il modo più semplice per implementare l’elaborazione parallela è quella di assegnare un thread separato a ciascun chunk da analizzare, che nel nostro esempio corrispondono ad un intervallo di 100 numeri interi (commit-intervall = 100
). Per farlo è sufficiente definire un bean che implementa l’interfaccia standard di Spring TaskExecutor
(equivalente all’interfaccia Executor
di java). Naturalmente Spring offre diverse implementazioni di tale interfaccia per realizzare strategie di esecuzione differenti: sincrona, asincrona, con pool, etc. Per i nostri scopi utilizzeremo un ThreadPoolTaskExecutor
così configurato:
1 2 3 4 5 |
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="10" /> <property name="maxPoolSize" value="25" /> <property name="keepAliveSeconds" value="30" /> </bean> |
L’utilizzo di tale bean per la parallelizzazione dell’elaborazione si realizza molto semplicemente definendolo come valore della proprietà task-executor
del tasklet
. L’XML del job visto sopra diviene quindi:
1 2 3 4 5 6 7 8 9 |
<beans ...> <batch:job id="multiThreadedStepJob"> <batch:step id="multiThreadedStepJob.step1"> <batch:tasklet task-executor="taskExecutor"> <batch:chunk reader="numberGenerator" processor="numberProcessor" writer="numberWriter" commit-interval="100"/> </batch:tasklet> </batch:step> </batch:job> </beans> |
L’effetto sarà che ciascun thread eseguirà i task numberGenerator
, numberProcessor
e numberWriter
, caratteristici dello step, per l’intero chunk prima di passare al successivo. Ovviamente non esiste alcun ordine prestabilito nell’elaborazione dei chunk che quindi possono essere anche non consecutivi. Va inoltre considerato che i task che partecipano allo step devono essere thread safe e molti dei componenti (reader e writer) forniti dal Spring Batch non lo sono in quanto statefull. Come anticipato precedentemente la classe NumberGenerator
è stata resa thread safe utilizzando il tipo AtomicInteger
per evitare comportamenti inaspettati.
Eseguendo il job il framework utilizza 10 thread ed i tempi di elaborazione per gli intervalli fino a 105 e 106 diventano rispettivamente di 684ms e 26.199ms, con una riduzione percentuale del tempo di elaborazione del 65% nel primo caso e del 74% nel secondo.
Parallel Steps
Questa seconda modalità di elaborazione è applicabile ogni qualvolta la logica dell’applicazione consenti di parallelizzare l’esecuzione di due o più step, la cui esecuzione è quindi affidata a thread differenti attivi contemporaneamente. La configurazione di step paralleli si realizza molto semplicemente introducendo il concetto di split dell’elaborazione su diversi flussi (flow).
Per utilizzare tali concetti nel nostro problema di esempio supponiamo di suddividere l’intervallo degli interi da elaborare in due sottointervalli, ad esempio (1, 50.000] e (50.000, 100.000]. Quindi assegniamo l’elaborazione di ciascun sottointervallo a step differenti. Per farlo dobbiamo modificare il codice della classe NumberGenerator
in modo da ricevere come parametri di configurazione il limite minimo e massimo degli interi da generare e configure due bean numberGeneratorLower
e numberGeneratorUpper
nel modo seguente:
1 2 3 4 5 6 7 8 |
<bean id="numberGeneratorLower" class="it.javaboss.batch.NumberGenerator" scope="step"> <property name="min" value="${interval.min}"/> <property name="max" value="${interval.half}"/> </bean> <bean id="numberGeneratorUpper" class="it.javaboss.batch.NumberGenerator" scope="step"> <property name="min" value="${interval.half}"/> <property name="max" value="${interval.max}"/> </bean> |
Quindi configuriamo il job suddividendo l’elaborazione sui due step come mostrato nell’XML seguente. Si noti che la proprietà task-executor
del tag <split>
specifica quale implementazione di TaskExecutor
deve essere utilizzato per la parallelizzazione dei flussi. Per default l’implementazione utilizzata è SyncTaskExecutor
che non va bene se vogliamo eseguire i due step in parallelo.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
<beans ...> <batch:job id="parallelStepJob"> <batch:split id="split1" task-executor="taskExecutor"> <batch:flow> <batch:step id="parallelStepJob.step1" > <batch:tasklet> <batch:chunk reader="numberGeneratorLower" processor="numberProcessor" writer="numberWriter" commit-interval="100"/> </batch:tasklet> </batch:step> </batch:flow> <batch:flow> <batch:step id="parallelStepJob.step2"> <batch:tasklet > <batch:chunk reader="numberGeneratorUpper" processor="numberProcessor" writer="numberWriter" commit-interval="100"/> </batch:tasklet> </batch:step> </batch:flow> </batch:split> </batch:job> </beans> |
Eseguendo il job questa volta il framework impiega 2 thread, uno per step come ci aspettavamo, elaborando gli interi fino a 105 e 106 rispettivamente in 1.119ms e 73.830ms. Ovviamente vi è un peggioramento rispetto al caso multi-threaded step, descritto al paragrafo precedente, perché i thread utilizzati questa volta sono solamente 2 ma rispetto al caso a singolo thread vi è comunque un miglioramento.
Partitioning
Il partitioning è una metodologia di elaborazione parallela in cui uno step è suddiviso in una componente master ed una slave e i dati di input su cui opera lo step sono partizionati opportunamente e la loro elaborazione è affidata alle componenti slave.
Le componenti slave in genere saranno servizi remoti, ma possono anche essere thread eseguiti localmente, come nel nostro esempio. Per prima cosa dobbiamo creare un bean che implementa l’interfaccia Partitioner
di Spring a cui è demandato il compito di partizionare i dati di input in intervalli e di inserirli in un ExecutionContext
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
public class NumberPartitioner implements Partitioner { private Integer min; private Integer max; // Getter & Setter public Map<String, ExecutionContext> partition(int gridSize) { Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>(); int range = (max-min+1) / gridSize; String name; int begin = min; int end = 0; while ( begin < max ) { ExecutionContext value = new ExecutionContext(); end = Math.min( begin + range, max); name = "(" + begin + ", " + end + "]"; value.put( "name", name); value.putInt("min", begin); value.putInt("max", end ); result.put( name, value ); begin = end; } return result; } } |
Il parametro gridSize
specifica il numero di partizioni richieste ed è utilizzato, nel nostro caso, per determinare l’ampiezza del range cui dividere l’intervallo di numeri interi da analizzare. Per ciascun range generato è creato un nuovo contesto di esecuzione a cui sono assegnati i valori minimo e massimo dell’intervallo ed un nome nella forma “(min, max]”.
Una volta implementato il partitioner inseriamo la definizione del relativo bean nell’XML:
1 2 3 4 |
<bean id="partitioner" class="it.javaboss.batch.NumberPartitioner" > <property name="min" value="1"/> <property name="max" value="${interval.max}"/> </bean> |
Quindi ridefiniamo il bean associato al tipo NumberGenerator in modo da recuperare i valori delle proprietà min
e max
dal contesto generato dal partitioner identificato dal bean stepExecutionContext
.
1 2 3 4 |
<bean id="partitionedNumberGenerator" class="it.javaboss.batch.NumberGenerator" scope="step"> <property name="min" value="#{stepExecutionContext[min]}"/> <property name="max" value="#{stepExecutionContext[max]}"/> </bean> |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
<beans ...> <batch:job id="partitioningStepJob"> <batch:step id="partitioningStepJob.step1.master"> <batch:partition step="partitioningStepJob.step1" partitioner="partitioner"> <batch:handler grid-size="${grid.size}" task-executor="taskExecutor" /> </batch:partition> </batch:step> </batch:job> <!-- each thread will run this job, with different stepExecutionContext values. --> <batch:step id="partitioningStepJob.step1"> <batch:tasklet> <batch:chunk reader="partitionedNumberGenerator" processor="numberProcessor" writer="numberWriter" commit-interval="10" /> </batch:tasklet> </batch:step> </beans> |
L’esecuzione del job per l’intervallo di interi fino a 102 genera l’output mostrato di seguito con un tempo di 1.391ms, che è superiore agli altri casi di processamento parallelo ma comunque inferiore al caso base a thread singolo.
1 2 3 4 5 6 7 |
JOB: partitioningStepJob Range (1, 100000] splitted in 15 partition of 6666 number each Exit Status: COMPLETED Errors: [] Threads: 10 Primes: 9592 Done in 1391 ms |
Codice Sorgente
Il codice sorgente completo di tutti gli esempi descritti è scaricabile qui spring-prime-counter.