Reactive Streams Publisher Test

Nel precedente articolo Reactive Streams abbiamo introdotto il Reactive Streams Technology Compatibility Kit (TCK nel seguito), il cui scopo è quello di guidare e aiutare gli sviluppatori di librerie, conformi alla specifica Reactive Streams, a convalidare le loro implementazioni rispetto alle regole che fanno parte delle specifica.

Nelle intenzioni il TCK vorrebbe coprire tutte le regole, tuttavia per alcune di queste non è possibile (o fattibile) costruire test automatizzati, quindi non si può pretendere di verificare completamente al 100% tutta l’implementazione. Ciò nonostante si tratta di uno strumento molto utile che è in grado di validare almeno le norme più importanti della specifica.

Il TCK è distribuito come libreria nel repository Maven Central, e va quindi aggiunto nel progetto come dipendenza:

Implementazione dei Casi di Test

Il TCK è suddiviso in 4 classi di test, realizzate con il framework TestNG, che devono essere estese dagli sviluppatori, fornendo le rispettive implementazioni dei componenti Publisher, Subscriber o Processor che intendono sottoporre a validazione. Tali classi sono:

  • PublisherVerification
  • SubscriberWhiteboxVerification
  • SubscriberBlackboxVerification
  • IdentityProcessorVerification

In questo post utilizzeremo la classe PublisherVerifier per verificare l’implementazione della classe di esempio SimplePublisher sviluppata nel corso del precedente articolo Reactive Streams, che riportiamo per comodità:

L’implementazione del test case per tale classe si realizza in modo molto semplice estendendo la classe astratta PublisherVerifier nel modo seguente:

La classe prevede l’implementazione di due metodo. Il primo, createPublisher(), utilizzato per istanziare l’oggetto Publisher da testare, il secondo, createFailedPublisher(), per istanziare un oggetto che restituisca un errore a seguito di una sottoscrizione. Questo secondo metodo è abilitante all’esecuzione di ulteriori test case, che per il momento possiamo tralasciare e per questo restituiamo null.

Tutti i test implementati nel TCK devono essere eseguiti in modo isolato al fine di consentirne l’esecuzione parallela. Questo è garantito dall’oggetto TestEnvironment nel quale sono anche definiti i timeout di attesa delle diverse callback.

Esecuzione dei Test

Per eseguire il test è sufficiente, in Eclipse, aprire il menù contestuale (tasto destro del mouse) sulla classe SimplePublisherTest e selezionare la voce Run As -> TestNG test. Al termine dell’esecuzione otterremo 19 test falliti, 9 ignorati ed 1 passato.

Il solo caso di successo è quello relativo alla specifica 1.9 che prevede la restituzione dell’eccezione java.lang.NullPointerException nel caso in cui il Subscriber che si sottoscrive sia nullo. L’eccezione è infatti sollevata nel metodo onSubscribe() quando si esegue il metodo onNext() sul Subscriber.

Procediamo analizzando i 19 casi di test falliti uno alla volta e modifichiamo conseguentemente la classe SimplePublisher al fine di risolvere tutti i fallimenti.

Must Issue On Subscribe For Non Null Subscriber

Il metodo required_spec109_mustIssueOnSubscribeForNonNullSubscriber, intende verificare che l’implementazione invochi la callback onSubscribe() a seguito di una sottoscrizione. Tradotto in codice significa che nel metodo subscribe() della classe SimplePublisher è necessario invocare il metodo onSubscribe() sull’oggetto Subscriber ricevuto in input. Tale metodo richiede come parametro un oggetto di tipo Subscription, che ricordiamo rappresenta un link tra Publisher e Subscriber, ed al quale deve essere demandata la gestione della comunicazione tra i due componenti. Il metodo subscribe() diviene quindi:

In particolare l’interfaccia Subscription definisce il metodo request() con il quale è gestita la backpressure e che è invocato dal Subscriber per richiedere ulteriori elementi al Publisher. Il funzionamento è molto semplice, dato in input il numero n di elementi richiesti, il metodo li restituisce prelevandoli dall’iterator,  fino a quando ve ne sono. Se gli elementi sono terminati è anche inviato al Subscriber il segnale di completamento attraverso il metodo onComplete().

Modifichiamo anche la classe SimpleSubscriber in modo da riflettere il nuovo comportamento della classe SimplePublisher, utilizzando l’oggetto Subscription per richiedere un elemento alla volta dal Publisher.

Eseguendo nuovamente i test otteniamo questa volta 11 fallimenti, 13 test ignorati e 14 passati con successo. Gestendo quindi le comunicazioni con l’oggetto Subscription la situazione migliora sensibilmente, molti test ora sono passati con successo, ma proseguiamo analizzando i restanti 11 fallimenti.

Illegal Argument Exception

La specifica 3.9 richiede che venga segnalata una eccezione di tipo IllegalArgumentException nel caso in qui il Subscriber richieda un numero di elementi minore o uguale a zero. Specifica che è verificata attraverso i due metodi di test:

  • required_spec309_requestNegativeNumberMustSignalIllegalArgumentException
  • required_spec309_requestZeroMustSignalIllegalArgumentException.

La soluzione è abbastanza immediata perché è sufficiente introdurre il controllo nel metodo request(). Naturalmente il metodo non deve sollevare l’eccezione ma segnalarla al Subscriber attraverso il metodo di callback onError():

Eseguendo nuovamente i test questa volta otteniamo 16 test passati e 9 falliti, di fatto comprendo completamente la regola 3.9 della specifica.

Signaling Termination

Prendiamo ora in esame i due requisiti 1.5 e 1.7. Il primo richiede che il Publisher segnali correttamente la fine dello stream attraverso il metodo onComplete() (testato dal metodo required_spec105_mustSignalOnCompleteWhenFiniteStreamTerminates). Il secondo, invece, prevede che quando è inviato un segnale di terminazione, ovvero onError() o onComplete(), nessun altro segnale deve essere inviato al Subscriber (testato dal metodo required_spec107_mustNotEmitFurtherSignalsOnceOnCompleteHasBeenSignalled). Nella sostanza quello che accade è evidente analizzando lo stack trace relativo all’eccezione sollevata dai due metodi:

Entrambe stampano il messaggio Unexpected additional complete signal received, il che significa che il Publisher sta inviando più volte il segnale onComplete(). Effettivamente ispezionando il codice si nota che, ad ogni invocazione del metodo request(), se lo stream di interi è terminato, il segnale di completamento viene inviato sempre ad ogni richiesta, in quanto la condizione !iterator.hasNext() risulta sempre vera.

Anche in questo caso il problema è facilmente risolvibile utilizzando un AtomicBoolean da impostare a true non appena il primo segnale di completamento è inviato. Il metodo request() si modifica quindi nel seguente modo:

La proprietà completed è inizializzata a false ed impostata a true non appena terminano gli elementi nell’iteratore.  Il metodo compareAndSet() esegue in modo atomico sia il test per verificare che la proprietà sia a false, che il set a true che è eseguito se e solo se la condizione è verificata. In questo modo il segnale di completamento è inviato una sola volta, indipendentemente da quante volte è invocato il metodo request().

Eseguendo nuovamente i test questa volte si ottengono solamente 3 fallimenti e 22 test passati correttamente.

Subscription Cancellation

Analizziamo ora il requisito 3.6 che coerentemente richiede che, alla cancellazione della sottoscrizione da parte del Subscriber, il Publisher non invii più dati,  neppure se il sottoscrittore continua ad invocare il metodo request() (test required_spec306_afterSubscriptionIsCancelledRequestMustBeNops).

Effettivamente il metodo cancel() dell’oggetto Subscription da noi implementato non fa assolutamente nulla, quindi la cancellazione effettivamente non è gestita dal nostro codice. Per farlo introduciamo una nuova proprietà di tipo AtomicBoolean inizializzata a false, che impostiamo a true quando è ricevuto il segnale di cancellazione. Inoltre dobbiamo impedire che una eventuale invocazione del metodo request() continui ad inviare i dati al Subscriber, e per farlo introduciamo una ulteriore condizione del ciclo for. Il codice del metodo subscribe() si modifica quindi nel seguente modo:

Proseguendo con l’esecuzione dei test otteniamo 2 fallimenti e 23 test passati correttamente.

Unbounded Recursion

La condizione di unbounded recursion si verifica quando si ha a che fare con una implementazione sincrona dell’interfaccia Subscription. Per comprendere il problema consideriamo la classe SimpleSubscriber, riportata sopra, che effettivamente è una implementazione sincrona perchè non fa uso di alcun thread. Quello che accade è che si verifica una sequenza di chiamata del tipo:

request() -> onNext() -> request() -> onNext() -> ...

Questo perché nel metodo onNext() di SimpleSubscriber viene invocato il metodo request() della Subscription, all’interno del quale viene ancora invocato il metodo onNext(), generando un loop potenzialmente infinito.

Tale problema è indirizzato dalla specifica 3.3 e verificato dal test eseguito nel metodo required_spec303_mustNotAllowUnboundedRecursion. Si noti che il comportamento appena descritto è anche responsabile del fallimento del test espresso dal metodo required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue, associato alla specifica 3.17, e che fallisce a causa di una eccezione di StackOverflowError generata dal loop infinito descritto sopra.

Ancora una volta risolviamo i due problemi utilizzando una proprietà di tipo AtomicBoolean inizializzata a false e che viene impostata a true prima di invocare il metodo onNext() sul Subscriber, per poi essere ripristinata a false all’uscita di tale metodo. La classe SimplePublisher si modifica quindi nel seguente modo:

La proprietà recursion è anche utilizzata nel ciclo for per impedire l’invio dei dati al Subscriber e nella condizione dell’if per inviare il segnale di onComplete() nel caso di unbounded recursion.

Eseguendo i test per l’ultima volta otteniamo infine 0 errori e 25 test passati con successo.

Codice Sorgente

Il codice sorgente con l’esempio presentato è scaricabile qui react.