Vert.x è un framework per la realizzazione di applicazioni reactive in java per molti aspetti simile a Node.js per javascript. I modello di programmazione che offre è di tipo event drive, questo significa che tutte le sue API sono progettate per lavorare principalmente in modo asincrono e se si è interessati al risultato di una operazione è necessario registrare un handler che Vert.x invocherà per comunicarne l’esito.
Il framework è implementato seguendo il design pattern Reactor, che offre un modello per la gestione di richieste che vengono consegnate concorrentemente ad un Service Handler da vari canali di input, il quale poi le smista in modo sincrono ai gestori delle richieste associati. Si tratta di un pattern molto utilizzato in informatica, si pensi ad esempio ad un Web Server, in cui le richieste provenienti dai client vengono servite senza bloccare il server, indipendentemente dal fatto che la singola richiesta si onerosa o meno.
In una implementazione standard del pattern Reactor, un singolo thread, chiamato event loop, è utilizzato per la gestione di tutti gli eventi. Esso esegue un loop infinito, dove riceve e smista gli eventi a tutti gli handler registrati. Poiché un thread è eseguito nell’ambito di un singolo core, per poter sfruttare al meglio la CPU è necessario avviare contemporaneamente più processi Reactor.
Vert.x utilizza invece una implementazione diversa del pattern. Invece di un singolo event loop, ogni istanza Vert.x ne avvia e gestisce più di una, almeno una per ogni core disponibile, ma tale valore piò essere sovrascritto.
L’Architettura
L’architettura del framework è molto semplice ed intuitiva. La logica di business dell’applicazione è eseguita all’interno di un o più componenti chiamati Vericle (vertice). Ogni verticle viene eseguito in maniera concorrente rispetto agli altri senza alcuna condivisione di uno stato. In tal modo è possibile creare applicazioni multi-threaded senza curarsi delle problematiche tipiche nella realizzazione di applicazioni concorrenti, come la sincronizzazione o i lock tra thread.
Ogni verticle vive all’interno di una istanza Vert.x ed è gestito da un event loop. A sua volta una istanza Vert.x è eseguita all’interno di una JVM. Diverse istanze Vert.x possono essere in esecuzione su diversi host in un network e la comunicazione tra le diverse componenti è garantita per mezzo di un Event Bus. Tutte comunicazioni avvengono tramite questo componente, semplicemente attraverso lo scambio di messaggi. I verticle possono mandare o ascoltare eventi attraverso il bus con diverse modalità: punto-punto, publish and subscribe o request-replay.
Un Esempio Pratico
Per comprendere i concetti appena descritti consideriamo un classico problema della programmazione concorrente, quello del produttore-consumatore, in cui due processi devono sincronizzarsi per scambiarsi delle informazioni. Creiamo quindi un progetto Maven, senza specificare archetype, ed inseriamo nel pom.xml
la dipendenza a Vert.x:
1 2 3 4 5 |
<dependency> <groupId>io.vertx</groupId> <artifactId>vertx-core</artifactId> <version>3.5.4</version> </dependency> |
I processi produttore e consumatore saranno naturalmente due componenti verticle che si realizzano in java implementando l’interfaccia Verticle
o meglio estendendo la classe AbstractVerticle
. Tale classe astratta definisce due metodi start()
e stop()
che vengono invocati al momento del deploy e dell’un-deploy del componente. Generalmente nell’implementare un verticle si ridefinisce il metodo start()
e solo al completamento di questo i componente è considerato started. La classe astratta AbstractVerticle
definisce inoltre alcuni metodi di utilità quali:
deploymentID()
: che fornisce l’identificatore univoco associato al componente all’atto dl deploy;-
config()
: che consente di accedere ai parametri di configurazione passati al componente durante il deploy; -
getVertx()
: che consente di recuperare l’istanza Vert.x che ha avviato il componente.
Processo Produttore
Consideriamo inizialmente il semplice caso di un processo produttore che genera una sequenza di numeri casuali interi. La classe RandomNumberProducerVerticle
è così definita:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
public class RandomNumberProducerVerticle extends AbstractVerticle { Random random = new Random(); public void start() throws Exception { EventBus eb = getVertx().eventBus(); getVertx().setPeriodic( 1000,id -> { eb.<Integer>send( "javaboss.channel", random.nextInt(100), ar -> { if ( ar.succeeded() ) { System.out.println( "PRODUCER: Received reply: " + ar.result().body()); } else { System.out.println( "PRODUCER: Received exception: " + ar.cause().getMessage() ); } }); }); } } |
Nel dettaglio l’oggetto EventBus
, recuperato dall’istanza Vert.x, fornisce l’interfaccia verso il sistema di messaggistica distribuita che consente alle diverse componenti di una applicazione Vert.x di comunicare tra loro in modo scarsamente accoppiato (loosely coupled). La trasmissione dei messaggi può avvenire invocando i metodi send()
o publish()
su tale oggetto, con la seguente differenza:
send()
: il messaggio è inviato sul canale specificato (detto address) ed è letto al massimo da uno degli handler registrato sul canale.publish()
: il messaggio è inviato sul canale specificato ed è letto da tutti gli handler registrati sul canale.
Di tali metodi esistono diverse implementazioni con signature differenti. La più semplice riceve in input l’address del canale ed il messaggio e non si cura di eventuali opzioni o risposte dai riceventi. Nel nostro esempio abbiamo preferito utilizzare una implementazione che ci consente di registrare un reply handler che sarà invocato nel caso in cui il ricevente risponde al messaggio:
1 |
<T> EventBus send(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler); |
All’handler è restituito un oggetto AsyncResult
il quale fornisce diversi metodi di utilità. I metodi succeeded()
e failed()
, per verificare che la trasmissione sia andata a buon fine o meno, il metodo cause()
, che restituisce l’eventuale causa di fallimento, ed infine il metodo result()
, che restituisce l’eventuale risposta prodotta dal processo consumatore.
Infine l’istanza Vert.x fornisce un metodo per implementare un timer, che può essere sia di tipo one-shot, che periodico, che consente di eseguire una azione allo scadere dell’intervallo specificato. Nell’esempio presentato il producer utilizza un timer periodico per avviare la procedura di generazione ed invio sul canale di un numero casuale.
Processo Consumatore
Il processo consumatore è notevolmente più semplice e si limita a registrare un handler per “consumare” messaggi provenienti dal canale:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
public class RandomNumberConsumerVerticle extends AbstractVerticle { public void start() throws Exception { EventBus eb = getVertx().eventBus(); eb.<Integer>consumer("javaboss.channel", message -> { System.out.println("CONSUMER: I have received the message: " + message.body()); if ( message.body() >= 50 ) { message.fail( 0, "I don't like it!"); } else { message.reply( "ok!" ); } }); } } |
L’oggetto Message
restituito all’handler receiver consente non solo di recuperare il messaggio attraverso il metodo body()
, ma anche di inviare una risposta mediante il metodo reply()
o di restituire un fallimento col metodo fail()
.
Deploy
Il deploy dei componenti verticle può essere fatto invocando il metodo deployVerticle()
sull’istanza Vert.x specifica. Come di consueto anche per tale metodo è eseguito in modo asincrono, quindi se si intende ricevere informazioni sull’esito del deploy è necessario registrare un handler. Il metodo per l’avvio del producer (quello del consumer ovviamente simile) sarà quindi:
1 2 3 4 5 6 7 8 9 10 11 |
public void startProducer() { DeploymentOptions options = new DeploymentOptions(); vertx.deployVerticle( RandomNumberProducerVerticle.class, options, res -> { if ( res.succeeded() ) { producerId = res.result(); System.out.println("Producer deployed with id: " + producerId ); } else { System.out.println("Producer deployment failed!"); } }); } |
Per completezza registriamo anche un timer che esegue l’undeploy dei due componenti dopo 10 secondi dall’avvio:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public void stopAll() { vertx.setTimer( 10000, id -> { vertx.undeploy(producerId, res -> { if (res.succeeded()) { System.out.println("Undeployed ok"); } else { System.out.println("Undeploy failed!"); } }); vertx.undeploy(consumerId, res -> { if (res.succeeded()) { System.out.println("Undeployed ok"); } else { System.out.println("Undeploy failed!"); } }); }); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
Producer deployed with id: e7805ad5-b673-4223-a5de-b225d6937167 Consumer deployed with id: 045ef2a6-1277-43e0-8bb4-dc44c4bfd007 CONSUMER: I have received the message: 0 PRODUCER: Received reply: ok! CONSUMER: I have received the message: 10 PRODUCER: Received reply: ok! CONSUMER: I have received the message: 12 PRODUCER: Received reply: ok! CONSUMER: I have received the message: 76 PRODUCER: Received exception: I don't like it! CONSUMER: I have received the message: 67 PRODUCER: Received exception: I don't like it! CONSUMER: I have received the message: 90 PRODUCER: Received exception: I don't like it! CONSUMER: I have received the message: 20 PRODUCER: Received reply: ok! CONSUMER: I have received the message: 21 PRODUCER: Received reply: ok! CONSUMER: I have received the message: 62 PRODUCER: Received exception: I don't like it! CONSUMER: I have received the message: 32 PRODUCER: Received reply: ok! Undeployed ok Undeployed ok |
Codice Sorgente
Il codice sorgente con tutti gli esempi mostrati è scaricabile qui vertx