@ -33,8 +33,6 @@ import javax.servlet.http.HttpServletRequestWrapper;
@@ -33,8 +33,6 @@ import javax.servlet.http.HttpServletRequestWrapper;
import javax.servlet.http.HttpServletResponse ;
import org.reactivestreams.Publisher ;
import org.reactivestreams.Subscriber ;
import org.reactivestreams.Subscription ;
import org.springframework.core.ReactiveAdapter ;
import org.springframework.core.ReactiveAdapterRegistry ;
@ -138,67 +136,22 @@ final class AsyncServerResponse extends ErrorHandlingServerResponse {
@@ -138,67 +136,22 @@ final class AsyncServerResponse extends ErrorHandlingServerResponse {
return new AsyncServerResponse ( futureResponse ) ;
}
else if ( reactiveStreamsPresent ) {
ReactiveAdapter adapter = ReactiveAdapterRegistry . getSharedInstance ( ) . getAdapter ( o . getClass ( ) ) ;
if ( adapter ! = null ) {
Publisher < ServerResponse > publisher = adapter . toPublisher ( o ) ;
CompletableFuture < ServerResponse > futureResponse = new CompletableFuture < > ( ) ;
publisher . subscribe ( new ToFutureSubscriber ( futureResponse ) ) ;
return new AsyncServerResponse ( futureResponse ) ;
ReactiveAdapterRegistry registry = ReactiveAdapterRegistry . getSharedInstance ( ) ;
ReactiveAdapter publisherAdapter = registry . getAdapter ( o . getClass ( ) ) ;
if ( publisherAdapter ! = null ) {
Publisher < ServerResponse > publisher = publisherAdapter . toPublisher ( o ) ;
ReactiveAdapter futureAdapter = registry . getAdapter ( CompletableFuture . class ) ;
if ( futureAdapter ! = null ) {
CompletableFuture < ServerResponse > futureResponse =
( CompletableFuture < ServerResponse > ) futureAdapter . fromPublisher ( publisher ) ;
return new AsyncServerResponse ( futureResponse ) ;
}
}
}
throw new IllegalArgumentException ( "Asynchronous type not supported: " + o . getClass ( ) ) ;
}
/ * *
* Subscriber that exposes the first result it receives via a CompletableFuture .
* /
private static final class ToFutureSubscriber implements Subscriber < ServerResponse > {
private final CompletableFuture < ServerResponse > future ;
@Nullable
private Subscription subscription ;
public ToFutureSubscriber ( CompletableFuture < ServerResponse > future ) {
this . future = future ;
}
@Override
public void onSubscribe ( Subscription s ) {
if ( this . subscription = = null ) {
this . subscription = s ;
s . request ( 1 ) ;
}
else {
s . cancel ( ) ;
}
}
@Override
public void onNext ( ServerResponse serverResponse ) {
if ( ! this . future . isDone ( ) ) {
this . future . complete ( serverResponse ) ;
}
}
@Override
public void onError ( Throwable t ) {
if ( ! this . future . isDone ( ) ) {
this . future . completeExceptionally ( t ) ;
}
}
@Override
public void onComplete ( ) {
if ( ! this . future . isDone ( ) ) {
this . future . completeExceptionally ( new IllegalStateException ( "Did not receive ServerResponse" ) ) ;
}
}
}
/ * *
* HttpServletRequestWrapper that shares its AsyncContext between this
* AsyncServerResponse class and other , subsequent ServerResponse