Nel precedente articolo Reactive Streams abbiamo introdotto il Reactive Streams Technology Compatibility Kit (TCK nel seguito), il cui scopo è quello di guidare e aiutare gli sviluppatori di librerie, conformi alla specifica Reactive Streams, a convalidare le loro implementazioni rispetto alle regole che fanno parte delle specifica.
Nelle intenzioni il TCK vorrebbe coprire tutte le regole, tuttavia per alcune di queste non è possibile (o fattibile) costruire test automatizzati, quindi non si può pretendere di verificare completamente al 100% tutta l’implementazione. Ciò nonostante si tratta di uno strumento molto utile che è in grado di validare almeno le norme più importanti della specifica.
Il TCK è distribuito come libreria nel repository Maven Central, e va quindi aggiunto nel progetto come dipendenza:
1 2 3 4 5 6 |
<dependency> <groupId>org.reactivestreams</groupId> <artifactId>reactive-streams-tck</artifactId> <version>1.0.2</version> <scope>test</scope> </dependency> |
Implementazione dei Casi di Test
Il TCK è suddiviso in 4 classi di test, realizzate con il framework TestNG, che devono essere estese dagli sviluppatori, fornendo le rispettive implementazioni dei componenti Publisher
, Subscriber
o Processor
che intendono sottoporre a validazione. Tali classi sono:
PublisherVerification
SubscriberWhiteboxVerification
SubscriberBlackboxVerification
IdentityProcessorVerification
In questo post utilizzeremo la classe PublisherVerifier
per verificare l’implementazione della classe di esempio SimplePublisher
sviluppata nel corso del precedente articolo Reactive Streams, che riportiamo per comodità:
1 2 3 4 5 6 7 8 9 10 11 12 |
public class SimplePublisher implements Publisher<Integer> { private final Iterator<Integer> iterator; public SimplePublisher(int count) { this.iterator = IntStream.rangeClosed( 1, count ).iterator(); } public void subscribe(Subscriber<? super Integer> subscriber) { iterator.forEachRemaining( subscriber::onNext ); subscriber.onComplete(); } } |
L’implementazione del test case per tale classe si realizza in modo molto semplice estendendo la classe astratta PublisherVerifier
nel modo seguente:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@Test public class SimplePublisherTest extends PublisherVerification<Integer> { public SimplePublisherTest() { super(new TestEnvironment()); } public Publisher<Integer> createPublisher(long elements) { return new SimplePublisher((int)elements); } public Publisher<Integer> createFailedPublisher() { return null; } } |
La classe prevede l’implementazione di due metodo. Il primo, createPublisher()
, utilizzato per istanziare l’oggetto Publisher
da testare, il secondo, createFailedPublisher()
, per istanziare un oggetto che restituisca un errore a seguito di una sottoscrizione. Questo secondo metodo è abilitante all’esecuzione di ulteriori test case, che per il momento possiamo tralasciare e per questo restituiamo null
.
Tutti i test implementati nel TCK devono essere eseguiti in modo isolato al fine di consentirne l’esecuzione parallela. Questo è garantito dall’oggetto TestEnvironment
nel quale sono anche definiti i timeout di attesa delle diverse callback.
Esecuzione dei Test
Per eseguire il test è sufficiente, in Eclipse, aprire il menù contestuale (tasto destro del mouse) sulla classe SimplePublisherTest
e selezionare la voce Run As -> TestNG test. Al termine dell’esecuzione otterremo 19 test falliti, 9 ignorati ed 1 passato.
Il solo caso di successo è quello relativo alla specifica 1.9 che prevede la restituzione dell’eccezione java.lang.NullPointerException
nel caso in cui il Subscriber
che si sottoscrive sia nullo. L’eccezione è infatti sollevata nel metodo onSubscribe()
quando si esegue il metodo onNext()
sul Subscriber
.
Procediamo analizzando i 19 casi di test falliti uno alla volta e modifichiamo conseguentemente la classe SimplePublisher
al fine di risolvere tutti i fallimenti.
Must Issue On Subscribe For Non Null Subscriber
Il metodo required_spec109_mustIssueOnSubscribeForNonNullSubscriber
, intende verificare che l’implementazione invochi la callback onSubscribe()
a seguito di una sottoscrizione. Tradotto in codice significa che nel metodo subscribe()
della classe SimplePublisher
è necessario invocare il metodo onSubscribe()
sull’oggetto Subscriber
ricevuto in input. Tale metodo richiede come parametro un oggetto di tipo Subscription
, che ricordiamo rappresenta un link tra Publisher
e Subscriber
, ed al quale deve essere demandata la gestione della comunicazione tra i due componenti. Il metodo subscribe()
diviene quindi:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
public void subscribe(Subscriber<? super Integer> subscriber) { subscriber.onSubscribe( new Subscription() { public void request(long n) { for ( long demand = n; demand > 0 && iterator.hasNext(); demand-- ) { subscriber.onNext( iterator.next() ); } if ( !iterator.hasNext() ) { subscriber.onComplete(); } } public void cancel() {} }); } |
In particolare l’interfaccia Subscription
definisce il metodo request()
con il quale è gestita la backpressure e che è invocato dal Subscriber
per richiedere ulteriori elementi al Publisher
. Il funzionamento è molto semplice, dato in input il numero n
di elementi richiesti, il metodo li restituisce prelevandoli dall’iterator
, fino a quando ve ne sono. Se gli elementi sono terminati è anche inviato al Subscriber
il segnale di completamento attraverso il metodo onComplete()
.
Modifichiamo anche la classe SimpleSubscriber
in modo da riflettere il nuovo comportamento della classe SimplePublisher
, utilizzando l’oggetto Subscription
per richiedere un elemento alla volta dal Publisher
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
public class SimpleSubscriber implements Subscriber<Integer> { private Subscription subscription; public void onSubscribe(Subscription subscription) { System.out.println( this.getClass().getSimpleName() + ": I'm subscribed now" ); this.subscription = subscription; subscription.request(1); } public void onNext(Integer t) { System.out.println( this.getClass().getSimpleName() + ": " + t ); subscription.request(1); } public void onError(Throwable t) { System.err.println( t ); } public void onComplete() { System.out.println( this.getClass().getSimpleName() + ": completed" ); } } |
Eseguendo nuovamente i test otteniamo questa volta 11 fallimenti, 13 test ignorati e 14 passati con successo. Gestendo quindi le comunicazioni con l’oggetto Subscription
la situazione migliora sensibilmente, molti test ora sono passati con successo, ma proseguiamo analizzando i restanti 11 fallimenti.
Illegal Argument Exception
La specifica 3.9 richiede che venga segnalata una eccezione di tipo IllegalArgumentException
nel caso in qui il Subscriber
richieda un numero di elementi minore o uguale a zero. Specifica che è verificata attraverso i due metodi di test:
required_spec309_requestNegativeNumberMustSignalIllegalArgumentException
required_spec309_requestZeroMustSignalIllegalArgumentException
.
La soluzione è abbastanza immediata perché è sufficiente introdurre il controllo nel metodo request()
. Naturalmente il metodo non deve sollevare l’eccezione ma segnalarla al Subscriber
attraverso il metodo di callback onError()
:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
public void request(long n) { if ( n <= 0 ) { subscriber.onError( new IllegalArgumentException() ); } for ( long demand = n; demand > 0 && iterator.hasNext(); demand-- ) { subscriber.onNext( iterator.next() ); } if ( !iterator.hasNext() ) { subscriber.onComplete(); } } |
Eseguendo nuovamente i test questa volta otteniamo 16 test passati e 9 falliti, di fatto comprendo completamente la regola 3.9 della specifica.
Signaling Termination
Prendiamo ora in esame i due requisiti 1.5 e 1.7. Il primo richiede che il Publisher
segnali correttamente la fine dello stream attraverso il metodo onComplete()
(testato dal metodo required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates
). Il secondo, invece, prevede che quando è inviato un segnale di terminazione, ovvero onError()
o onComplete()
, nessun altro segnale deve essere inviato al Subscriber
(testato dal metodo required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled
). Nella sostanza quello che accade è evidente analizzando lo stack trace relativo all’eccezione sollevata dai due metodi:
1 2 3 4 5 6 7 8 9 |
FAILED: required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates java.lang.AssertionError: Async error during test execution: Unexpected additional complete signal received! at org.testng.Assert.fail(Assert.java:85) ... FAILED: required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled java.lang.AssertionError: Async error during test execution: Unexpected additional complete signal received! at org.testng.Assert.fail(Assert.java:85) |
Entrambe stampano il messaggio Unexpected additional complete signal received
, il che significa che il Publisher
sta inviando più volte il segnale onComplete()
. Effettivamente ispezionando il codice si nota che, ad ogni invocazione del metodo request()
, se lo stream di interi è terminato, il segnale di completamento viene inviato sempre ad ogni richiesta, in quanto la condizione !iterator.hasNext()
risulta sempre vera.
Anche in questo caso il problema è facilmente risolvibile utilizzando un AtomicBoolean
da impostare a true
non appena il primo segnale di completamento è inviato. Il metodo request()
si modifica quindi nel seguente modo:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
private final AtomicBoolean completed = new AtomicBoolean( false ); public void request(long n) { if ( n <= 0 ) { subscriber.onError( new IllegalArgumentException() ); } for ( long demand = n; demand > 0 && iterator.hasNext(); demand-- ) { subscriber.onNext( iterator.next() ); } if ( !iterator.hasNext() ) { if ( completed.compareAndSet(false, true) ) { subscriber.onComplete(); } } } |
La proprietà completed
è inizializzata a false
ed impostata a true
non appena terminano gli elementi nell’iteratore. Il metodo compareAndSet()
esegue in modo atomico sia il test per verificare che la proprietà sia a false
, che il set a true
che è eseguito se e solo se la condizione è verificata. In questo modo il segnale di completamento è inviato una sola volta, indipendentemente da quante volte è invocato il metodo request()
.
Eseguendo nuovamente i test questa volte si ottengono solamente 3 fallimenti e 22 test passati correttamente.
Subscription Cancellation
Analizziamo ora il requisito 3.6 che coerentemente richiede che, alla cancellazione della sottoscrizione da parte del Subscriber
, il Publisher
non invii più dati, neppure se il sottoscrittore continua ad invocare il metodo request()
(test required_spec306_afterSubscriptionIsCancelledRequestMustBeNops
).
Effettivamente il metodo cancel()
dell’oggetto Subscription
da noi implementato non fa assolutamente nulla, quindi la cancellazione effettivamente non è gestita dal nostro codice. Per farlo introduciamo una nuova proprietà di tipo AtomicBoolean
inizializzata a false
, che impostiamo a true
quando è ricevuto il segnale di cancellazione. Inoltre dobbiamo impedire che una eventuale invocazione del metodo request()
continui ad inviare i dati al Subscriber
, e per farlo introduciamo una ulteriore condizione del ciclo for
. Il codice del metodo subscribe
()
si modifica quindi nel seguente modo:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
public void subscribe(Subscriber<? super Integer> subscriber) { subscriber.onSubscribe( new Subscription() { private final AtomicBoolean completed = new AtomicBoolean( false ); private final AtomicBoolean terminated = new AtomicBoolean( false ); public void request(long n) { if ( n <= 0 ) { subscriber.onError( new IllegalArgumentException() ); } for ( long demand = n; demand > 0 && iterator.hasNext() && !terminated.get(); demand-- ) { subscriber.onNext( iterator.next() ); } if ( !iterator.hasNext() ) { if ( completed.compareAndSet(false, true) ) { subscriber.onComplete(); } } } public void cancel() { terminated.set( true ); } }); } |
Unbounded Recursion
La condizione di unbounded recursion si verifica quando si ha a che fare con una implementazione sincrona dell’interfaccia Subscription
. Per comprendere il problema consideriamo la classe SimpleSubscriber
, riportata sopra, che effettivamente è una implementazione sincrona perchè non fa uso di alcun thread. Quello che accade è che si verifica una sequenza di chiamata del tipo:
request() -> onNext() -> request() -> onNext() -> ...
Questo perché nel metodo onNext()
di SimpleSubscriber
viene invocato il metodo request()
della Subscription
, all’interno del quale viene ancora invocato il metodo onNext()
, generando un loop potenzialmente infinito.
Tale problema è indirizzato dalla specifica 3.3 e verificato dal test eseguito nel metodo required_spec303_mustNotAllowUnboundedRecursion
. Si noti che il comportamento appena descritto è anche responsabile del fallimento del test espresso dal metodo required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue
, associato alla specifica 3.17, e che fallisce a causa di una eccezione di StackOverflowError
generata dal loop infinito descritto sopra.
Ancora una volta risolviamo i due problemi utilizzando una proprietà di tipo AtomicBoolean
inizializzata a false
e che viene impostata a true
prima di invocare il metodo onNext()
sul Subscriber
, per poi essere ripristinata a false
all’uscita di tale metodo. La classe SimplePublisher
si modifica quindi nel seguente modo:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
public class SimplePublisher implements Publisher<Integer> { private final Iterator<Integer> iterator; public SimplePublisher(int count) { this.iterator = IntStream.rangeClosed( 1, count ).iterator(); } public void subscribe(Subscriber<? super Integer> subscriber) { subscriber.onSubscribe( new Subscription() { private final AtomicBoolean completed = new AtomicBoolean( false ); private final AtomicBoolean terminated = new AtomicBoolean( false ); private final AtomicBoolean recursion = new AtomicBoolean( false ); public void request(long n) { if ( n <= 0 ) { subscriber.onError( new IllegalArgumentException() ); } for ( long demand = n; demand > 0 && iterator.hasNext() && !terminated.get() && !recursion.get(); demand-- ) { recursion.set( true ); subscriber.onNext( iterator.next() ); recursion.set( false ); } if ( !iterator.hasNext() || recursion.get() ) { if ( completed.compareAndSet(false, true) ) { subscriber.onComplete(); } } } public void cancel() { terminated.set( true ); } }); } } |
La proprietà recursion
è anche utilizzata nel ciclo for
per impedire l’invio dei dati al Subscriber
e nella condizione dell’if
per inviare il segnale di onComplete()
nel caso di unbounded recursion.
Eseguendo i test per l’ultima volta otteniamo infine 0 errori e 25 test passati con successo.
Codice Sorgente
Il codice sorgente con l’esempio presentato è scaricabile qui react.