@ -27,8 +27,6 @@ import org.reactivestreams.Publisher;
@@ -27,8 +27,6 @@ import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber ;
import org.reactivestreams.Subscription ;
import org.springframework.core.io.buffer.DataBuffer ;
/ * *
* Abstract base class for { @code Processor } implementations that bridge between
* event - listener APIs and Reactive Streams . Specifically , base class for the
@ -41,11 +39,11 @@ import org.springframework.core.io.buffer.DataBuffer;
@@ -41,11 +39,11 @@ import org.springframework.core.io.buffer.DataBuffer;
* @see UndertowHttpHandlerAdapter
* @see ServerHttpResponse # writeAndFlushWith ( Publisher )
* /
abstract class AbstractResponseBodyFlushProcessor implements Processor < Publisher < ? extends DataBuffer > , Void > {
public abstract class AbstractListenerFlushProcessor < T > implements Processor < Publisher < ? extends T > , Void > {
protected final Log logger = LogFactory . getLog ( getClass ( ) ) ;
private final ResponseBody WriteResultPublisher resultPublisher = new ResponseBody WriteResultPublisher( ) ;
private final WriteResultPublisher resultPublisher = new WriteResultPublisher ( ) ;
private final AtomicReference < State > state = new AtomicReference < > ( State . UNSUBSCRIBED ) ;
@ -65,7 +63,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher
@@ -65,7 +63,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher
}
@Override
public final void onNext ( Publisher < ? extends DataBuffer > publisher ) {
public final void onNext ( Publisher < ? extends T > publisher ) {
if ( logger . isTraceEnabled ( ) ) {
logger . trace ( this . state + " onNext: " + publisher ) ;
}
@ -100,7 +98,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher
@@ -100,7 +98,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher
/ * *
* Creates a new processor for subscribing to a body chunk .
* /
protected abstract Processor < ? super DataBuffer , Void > createBodyProcessor ( ) ;
protected abstract Processor < ? super T , Void > createBodyProcessor ( ) ;
/ * *
* Flushes the output .
@ -130,7 +128,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher
@@ -130,7 +128,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher
UNSUBSCRIBED {
@Override
public void onSubscribe ( AbstractResponseBodyFlushProcessor processor , Subscription subscription ) {
public < T > void onSubscribe ( AbstractListenerFlushProcessor < T > processor , Subscription subscription ) {
Objects . requireNonNull ( subscription , "Subscription cannot be null" ) ;
if ( processor . changeState ( this , REQUESTED ) ) {
processor . subscription = subscription ;
@ -144,16 +142,16 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher
@@ -144,16 +142,16 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher
REQUESTED {
@Override
public void onNext ( AbstractResponseBodyFlushProcessor processor , Publisher < ? extends DataBuffer > chunk ) {
public < T > void onNext ( AbstractListenerFlushProcessor < T > processor , Publisher < ? extends T > chunk ) {
if ( processor . changeState ( this , RECEIVED ) ) {
Processor < ? super DataBuffer , Void > chunkProcessor = processor . createBodyProcessor ( ) ;
Processor < ? super T , Void > chunkProcessor = processor . createBodyProcessor ( ) ;
chunk . subscribe ( chunkProcessor ) ;
chunkProcessor . subscribe ( new WriteSubscriber ( processor ) ) ;
}
}
@Override
public void onComplete ( AbstractResponseBodyFlushProcessor processor ) {
public < T > void onComplete ( AbstractListenerFlushProcessor < T > processor ) {
if ( processor . changeState ( this , COMPLETED ) ) {
processor . resultPublisher . publishComplete ( ) ;
}
@ -162,7 +160,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher
@@ -162,7 +160,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher
RECEIVED {
@Override
public void writeComplete ( AbstractResponseBodyFlushProcessor processor ) {
public < T > void writeComplete ( AbstractListenerFlushProcessor < T > processor ) {
try {
processor . flush ( ) ;
}
@ -184,58 +182,58 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher
@@ -184,58 +182,58 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor<Publisher
}
@Override
public void onComplete ( AbstractResponseBodyFlushProcessor processor ) {
public < T > void onComplete ( AbstractListenerFlushProcessor < T > processor ) {
processor . subscriberCompleted = true ;
}
} ,
COMPLETED {
@Override
public void onNext ( AbstractResponseBodyFlushProcessor processor ,
Publisher < ? extends DataBuffer > publisher ) {
public < T > void onNext ( AbstractListenerFlushProcessor < T > processor ,
Publisher < ? extends T > publisher ) {
// ignore
}
@Override
public void onError ( AbstractResponseBodyFlushProcessor processor , Throwable t ) {
public < T > void onError ( AbstractListenerFlushProcessor < T > processor , Throwable t ) {
// ignore
}
@Override
public void onComplete ( AbstractResponseBodyFlushProcessor processor ) {
public < T > void onComplete ( AbstractListenerFlushProcessor < T > processor ) {
// ignore
}
} ;
public void onSubscribe ( AbstractResponseBodyFlushProcessor processor , Subscription subscription ) {
public < T > void onSubscribe ( AbstractListenerFlushProcessor < T > processor , Subscription subscription ) {
subscription . cancel ( ) ;
}
public void onNext ( AbstractResponseBodyFlushProcessor processor , Publisher < ? extends DataBuffer > publisher ) {
public < T > void onNext ( AbstractListenerFlushProcessor < T > processor , Publisher < ? extends T > publisher ) {
throw new IllegalStateException ( toString ( ) ) ;
}
public void onError ( AbstractResponseBodyFlushProcessor processor , Throwable ex ) {
public < T > void onError ( AbstractListenerFlushProcessor < T > processor , Throwable ex ) {
if ( processor . changeState ( this , COMPLETED ) ) {
processor . resultPublisher . publishError ( ex ) ;
}
}
public void onComplete ( AbstractResponseBodyFlushProcessor processor ) {
public < T > void onComplete ( AbstractListenerFlushProcessor < T > processor ) {
throw new IllegalStateException ( toString ( ) ) ;
}
public void writeComplete ( AbstractResponseBodyFlushProcessor processor ) {
public < T > void writeComplete ( AbstractListenerFlushProcessor < T > processor ) {
// ignore
}
private static class WriteSubscriber implements Subscriber < Void > {
private final AbstractResponseBodyFlushProcessor processor ;
private final AbstractListenerFlushProcessor < ? > processor ;
public WriteSubscriber ( AbstractResponseBodyFlushProcessor processor ) {
public WriteSubscriber ( AbstractListenerFlushProcessor < ? > processor ) {
this . processor = processor ;
}