Reactive Streams

Dopo aver introdotto la programmazione reattiva nel precedente post Reactive Programming, parliamo più approfonditamente della specifica Reactive Streams ed in particolare della sua implementazione nella JVM. Introdotta dalla versione 1.8 come libreria separata, fu successivamente  inclusa nella versione 1.9 sotto il package java.util.concurrent.Flow. Il package è però la sola differenza tra le due versioni, quindi preferiamo utilizzare, per i nostri esempi, la versione per Java 1.8. Conseguentemente dobbiamo  creare un progetto maven ed includere le seguenti due dipendenze:

La specifica definisce le seguenti quattro interfacce, il cui scopo è intuibile dai loro nomi:

  • Publisher<T> è responsabile della pubblicazione di elementi di tipo T e fornisce un metodo di sottoscrizione per i Subscriber a cui connettersi.
  • Subscriber<T> si connette a un Publisher, ottenendone conferma tramite il metodo onSubscribe(), quindi riceve i dati tramite il metodo di callback onNext() e segnali aggiuntivi tramite i metodi onError() e onComplete().
  • Subscription rappresenta un collegamento tra un Publisher ed un Subscriber. Implementa il meccanismo di backpressure tramite il metodo request() e fornisce attraverso il metodo cancel() la possibilità di interrompere la sottoscrizione.
  • Processor combina in una unica interfaccia le funzionalità di un Publisher e di un Subscriber.

Publisher

Avendo a disposizione le interfacce sopra definite proviamo ad implementare un semplice esempio di Publisher che restituisce una lista di numeri consecutivi:

Molto banalmente la classe SimpleBublisher predispone un Iterator con numeri interi da 1 al parametro count (estremi compresi) e li restituisce al Subscriber invocando il metodo onNext() per ognuno di essi. Al termine chiude lo stream invocando il metodo onComplete().

Processor

La classe SimpleProcessor, che implementa l’interfaccia Processor, raddoppia ciascun numero intero che riceve. Tra tutte è la classe meno banale perché deve conservare gli elementi ricevuti in una coda per poi restituirli (raddoppiati) al Subscriber. Per farlo utilizza un oggetto di tipo ConcurrentLinkedQueue al fine di gestire l’accesso concorrente alla coda, che viene utilizzata sia nel metodo onNext(), per popolarla, che nel metodo subscribe(), per estrarne gli elementi.

Vale la pena spendere qualche riga per descrivere l’implementazione del metodo subscribe(). All’interno di un ciclo vengono inviati gli elementi al Subscriber fin tanto che ce ne sono. Se la coda è vuota il metodo rimane in attesa per 200 ms. Il ciclo termina solamente quando la coda è vuota ed il Processor ha ricevuto il segnale onComplete() dal Publisher, condizione rilevata dal fatto che la variabile completed, di tipo AtomicBoolean, è a true. Al termine del metodo è invocato l’onComplete() sul Subscriber.

Subscriber

Un Subscriber molto semplice che si limita a stampare gli elementi ricevuti dal Processor/Publisher è il seguente:

Main

Infine riportiamo il main che lega le tre classi ed avvia il processo di produzione della sequenza di numeri interi:

Coerentemente con quanto ci si aspetta il programma stampa in console i numeri inviati dal Publisher, raddoppiati dal Processor e ricevuti dal Subscriber:

Conclusioni

L’applicazione sviluppata sembra corretta in quanto funziona esattamente come ce lo aspettiamo. Sfortunatamente non è così, innanzitutto perché non vi è alcun meccanismo di backpressure utilizzato. Sta di fatto che esistono tutta una serie di regole che le implementazioni delle 4 interfacce devono rispettare e che sono riepilogate nella sezione specification del repository github del progetto Reatvice Streams.

La particolarità del progetto è che include anche una implementazione di un kit di verifica ti tale specifica, il Reactive Streams TCK (Technology Compatibility Kit). Il TCK non è altro che un test framework che verifica se l’implementazione di un componenti reattivo è corretta in termini dell’interazione che tale componente dovrebbe avere con gli altri. Il suo scopo è quello di garantire che tutte le implementazioni personalizzate dei componenti di Reactive Streams possano interagire senza problemi, eseguendo correttamente tutti i trasferimenti di dati e gestendo tutti i segnali e la backpressure.

Nei prossimi articoli vedremo come eseguire i test e migliorare l’implementazione del codice di esempio presentato in questo post.

Codice Sorgente

Il codice sorgente con l’esempio presentato è scaricabile qui react.

How useful was this post?

Click on a star to rate it!

Average rating 0 / 5. Vote count: 0

No votes so far! Be the first to rate this post.

As you found this post useful...

Follow us on social media!