Dopo aver introdotto la programmazione reattiva nel precedente post Reactive Programming, parliamo più approfonditamente della specifica Reactive Streams ed in particolare della sua implementazione nella JVM. Introdotta dalla versione 1.8 come libreria separata, fu successivamente inclusa nella versione 1.9 sotto il package java.util.concurrent.Flow
. Il package è però la sola differenza tra le due versioni, quindi preferiamo utilizzare, per i nostri esempi, la versione per Java 1.8. Conseguentemente dobbiamo creare un progetto maven ed includere le seguenti due dipendenze:
1 2 3 4 5 6 7 8 9 10 |
<dependency> <groupId>org.reactivestreams</groupId> <artifactId>reactive-streams</artifactId> <version>1.0.2</version> </dependency> <dependency> <groupId>org.reactivestreams</groupId> <artifactId>reactive-streams-tck</artifactId> <version>1.0.2</version> </dependency> |
Publisher<T>
è responsabile della pubblicazione di elementi di tipoT
e fornisce un metodo di sottoscrizione per iSubscriber
a cui connettersi.Subscriber<T>
si connette a unPublisher
, ottenendone conferma tramite il metodoonSubscribe()
, quindi riceve i dati tramite il metodo di callbackonNext()
e segnali aggiuntivi tramite i metodionError()
eonComplete()
.Subscription
rappresenta un collegamento tra unPublisher
ed unSubscriber
. Implementa il meccanismo di backpressure tramite il metodorequest()
e fornisce attraverso il metodocancel()
la possibilità di interrompere la sottoscrizione.Processor
combina in una unica interfaccia le funzionalità di unPublisher
e di unSubscriber
.
Publisher
Avendo a disposizione le interfacce sopra definite proviamo ad implementare un semplice esempio di Publisher
che restituisce una lista di numeri consecutivi:
1 2 3 4 5 6 7 8 9 10 11 12 |
public class SimplePublisher implements Publisher<Integer> { private final Iterator<Integer> iterator; public SimplePublisher(int count) { this.iterator = IntStream.rangeClosed( 1, count ).iterator(); } public void subscribe(Subscriber<? super Integer> subscriber) { iterator.forEachRemaining( subscriber::onNext ); subscriber.onComplete(); } } |
Molto banalmente la classe SimpleBublisher
predispone un Iterator
con numeri interi da 1
al parametro count
(estremi compresi) e li restituisce al Subscriber
invocando il metodo onNext()
per ognuno di essi. Al termine chiude lo stream invocando il metodo onComplete()
.
Processor
La classe SimpleProcessor
, che implementa l’interfaccia Processor
, raddoppia ciascun numero intero che riceve. Tra tutte è la classe meno banale perché deve conservare gli elementi ricevuti in una coda per poi restituirli (raddoppiati) al Subscriber
. Per farlo utilizza un oggetto di tipo ConcurrentLinkedQueue
al fine di gestire l’accesso concorrente alla coda, che viene utilizzata sia nel metodo onNext()
, per popolarla, che nel metodo subscribe()
, per estrarne gli elementi.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
public class SimpleProcessor implements Processor<Integer, Integer> { private ConcurrentLinkedQueue<Integer> numbers = new ConcurrentLinkedQueue<Integer>(); private AtomicBoolean completed = new AtomicBoolean( false ); Subscriber<? super Integer> subscriber; @Override public void onSubscribe(Subscription subscription) { System.out.println( this.getClass().getSimpleName() + ": I'm subscribed now" ); } @Override public void onNext(Integer number) { numbers.add( number * 2 ); } @Override public void onError(Throwable t) { subscriber.onError(t); } @Override public void onComplete() { completed.set( true ); System.out.println( this.getClass().getSimpleName() + ": completed" ); } @Override public void subscribe(Subscriber<? super Integer> subscriber) { while ( !completed.get() || !numbers.isEmpty() ) { if ( !numbers.isEmpty() ) { subscriber.onNext( numbers.poll() ); } else { try { Thread.sleep( 200 ); } catch (InterruptedException e) {} } } subscriber.onComplete(); } } |
Vale la pena spendere qualche riga per descrivere l’implementazione del metodo subscribe()
. All’interno di un ciclo vengono inviati gli elementi al Subscriber
fin tanto che ce ne sono. Se la coda è vuota il metodo rimane in attesa per 200 ms. Il ciclo termina solamente quando la coda è vuota ed il Processor
ha ricevuto il segnale onComplete()
dal Publisher
, condizione rilevata dal fatto che la variabile completed
, di tipo AtomicBoolean
, è a true. Al termine del metodo è invocato l’onComplete()
sul Subscriber
.
Subscriber
Un Subscriber
molto semplice che si limita a stampare gli elementi ricevuti dal Processor
/Publisher
è il seguente:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
public class SimpleSubscriber implements Subscriber<Integer> { @Override public void onSubscribe(Subscription subscription) { System.out.println( this.getClass().getSimpleName() + ": I'm subscribed now" ); } @Override public void onNext(Integer t) { System.out.println( this.getClass().getSimpleName() + ": " + t ); } @Override public void onError(Throwable t) { System.err.println( t ); } @Override public void onComplete() { System.out.println( this.getClass().getSimpleName() + ": completed" ); } } |
Main
Infine riportiamo il main che lega le tre classi ed avvia il processo di produzione della sequenza di numeri interi:
1 2 3 4 5 6 7 8 |
public static void main(String[] args) { SimplePublisher publisher = new SimplePublisher(5); SimpleProcessor processor = new SimpleProcessor(); SimpleSubscriber subscriber = new SimpleSubscriber(); publisher.subscribe( processor ); processor.subscribe( subscriber ); } |
Coerentemente con quanto ci si aspetta il programma stampa in console i numeri inviati dal Publisher
, raddoppiati dal Processor
e ricevuti dal Subscriber
:
1 2 3 4 5 6 7 |
SimpleProcessor: completed SimpleSubscriber: 2 SimpleSubscriber: 4 SimpleSubscriber: 6 SimpleSubscriber: 8 SimpleSubscriber: 10 SimpleSubscriber: completed |
Conclusioni
L’applicazione sviluppata sembra corretta in quanto funziona esattamente come ce lo aspettiamo. Sfortunatamente non è così, innanzitutto perché non vi è alcun meccanismo di backpressure utilizzato. Sta di fatto che esistono tutta una serie di regole che le implementazioni delle 4 interfacce devono rispettare e che sono riepilogate nella sezione specification del repository github del progetto Reatvice Streams.
La particolarità del progetto è che include anche una implementazione di un kit di verifica ti tale specifica, il Reactive Streams TCK (Technology Compatibility Kit). Il TCK non è altro che un test framework che verifica se l’implementazione di un componenti reattivo è corretta in termini dell’interazione che tale componente dovrebbe avere con gli altri. Il suo scopo è quello di garantire che tutte le implementazioni personalizzate dei componenti di Reactive Streams possano interagire senza problemi, eseguendo correttamente tutti i trasferimenti di dati e gestendo tutti i segnali e la backpressure.
Nei prossimi articoli vedremo come eseguire i test e migliorare l’implementazione del codice di esempio presentato in questo post.
Codice Sorgente
Il codice sorgente con l’esempio presentato è scaricabile qui react.