diff --git a/spring-web-reactive/build.gradle b/spring-web-reactive/build.gradle index bf58b483b69..3cb153e31c8 100644 --- a/spring-web-reactive/build.gradle +++ b/spring-web-reactive/build.gradle @@ -22,6 +22,7 @@ dependencies { compile "org.springframework:spring-core:4.2.0.RELEASE" compile "org.springframework:spring-web:4.2.0.RELEASE" compile "org.reactivestreams:reactive-streams:1.0.0" + compile "io.projectreactor:reactor-core:2.0.5.RELEASE" compile "org.slf4j:slf4j-api:1.7.6" compile "ch.qos.logback:logback-classic:1.1.2" diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/DispatcherHttpHandler.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/DispatcherHttpHandler.java index 9f6f4aad6d5..5093586451e 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/DispatcherHttpHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/DispatcherHttpHandler.java @@ -21,6 +21,8 @@ import java.util.List; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import reactor.core.reactivestreams.PublisherFactory; +import reactor.core.reactivestreams.SubscriberWithContext; import org.springframework.beans.factory.BeanFactoryUtils; import org.springframework.context.ApplicationContext; @@ -58,7 +60,7 @@ public class DispatcherHttpHandler implements HttpHandler { if (handler == null) { // No exception handling mechanism yet response.setStatusCode(HttpStatus.NOT_FOUND); - return Publishers.complete(); + return PublisherFactory.forEach(SubscriberWithContext::onComplete); } HandlerAdapter handlerAdapter = getHandlerAdapter(handler); @@ -143,27 +145,4 @@ public class DispatcherHttpHandler implements HttpHandler { throw new IllegalStateException("No HandlerAdapter for " + handler); } - - private static class Publishers { - - - public static Publisher complete() { - return subscriber -> { - subscriber.onSubscribe(new NoopSubscription()); - subscriber.onComplete(); - }; - } - } - - private static class NoopSubscription implements Subscription { - - @Override - public void request(long n) { - } - - @Override - public void cancel() { - } - } - } diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/web/DispatcherApp.java b/spring-web-reactive/src/test/java/org/springframework/reactive/web/DispatcherApp.java index 7b3eb7c5d7f..1707908b17c 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/web/DispatcherApp.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/web/DispatcherApp.java @@ -24,6 +24,7 @@ import io.reactivex.netty.protocol.http.server.HttpServer; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import reactor.core.reactivestreams.PublisherFactory; import org.springframework.http.MediaType; import org.springframework.reactive.web.rxnetty.RequestHandlerAdapter; @@ -80,21 +81,10 @@ public class DispatcherApp { @Override public Publisher handle(ServerHttpRequest request, ServerHttpResponse response) { - - return new Publisher() { - - @Override - public void subscribe(Subscriber subscriber) { - subscriber.onSubscribe(new AbstractSubscription(subscriber) { - - @Override - protected void requestInternal(long n) { - invokeOnNext("Hello world."); - invokeOnComplete(); - } - }); - } - }; + return PublisherFactory.forEach((subscriber) -> { + subscriber.onNext("Hello world."); + subscriber.onComplete(); + }); } } @@ -113,40 +103,30 @@ public class DispatcherApp { PlainTextHandler textHandler = (PlainTextHandler) handler; final Publisher resultPublisher = textHandler.handle(request, response); - return new Publisher() { - - @Override - public void subscribe(Subscriber handlerResultSubscriber) { - handlerResultSubscriber.onSubscribe(new AbstractSubscription(handlerResultSubscriber) { - - @Override - protected void requestInternal(long n) { - resultPublisher.subscribe(new Subscriber() { - - @Override - public void onSubscribe(Subscription subscription) { - subscription.request(Long.MAX_VALUE); - } - - @Override - public void onNext(Object result) { - invokeOnNext(new HandlerResult(result)); - } - - @Override - public void onError(Throwable error) { - invokeOnError(error); - } - - @Override - public void onComplete() { - invokeOnComplete(); - } - }); - } - }); - } - }; + return PublisherFactory.forEach((subscriber) -> { + resultPublisher.subscribe(new Subscriber() { + + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Object result) { + subscriber.onNext(new HandlerResult(result)); + } + + @Override + public void onError(Throwable error) { + subscriber.onError(error); + } + + @Override + public void onComplete() { + subscriber.onComplete(); + } + }); + }); } } @@ -159,74 +139,13 @@ public class DispatcherApp { } @Override - public Publisher handleResult(ServerHttpRequest request, ServerHttpResponse response, - HandlerResult result) { - + public Publisher handleResult(ServerHttpRequest request, ServerHttpResponse response, HandlerResult result) { response.getHeaders().setContentType(MediaType.TEXT_PLAIN); - - return response.writeWith(new Publisher() { - - @Override - public void subscribe(Subscriber writeSubscriber) { - writeSubscriber.onSubscribe(new AbstractSubscription(writeSubscriber) { - - @Override - protected void requestInternal(long n) { - Charset charset = Charset.forName("UTF-8"); - invokeOnNext(((String) result.getReturnValue()).getBytes(charset)); - invokeOnComplete(); - } - }); - } - }); - } - } - - - private static abstract class AbstractSubscription implements Subscription { - - private final Subscriber subscriber; - - private volatile boolean terminated; - - - public AbstractSubscription(Subscriber subscriber) { - this.subscriber = subscriber; - } - - protected boolean isTerminated() { - return this.terminated; - } - - @Override - public void request(long n) { - if (isTerminated()) { - return; - } - if (n > 0) { - requestInternal(n); - } - } - - protected abstract void requestInternal(long n); - - @Override - public void cancel() { - this.terminated = true; - } - - protected void invokeOnNext(T data) { - this.subscriber.onNext(data); - } - - protected void invokeOnError(Throwable error) { - this.terminated = true; - this.subscriber.onError(error); - } - - protected void invokeOnComplete() { - this.terminated = true; - this.subscriber.onComplete(); + return response.writeWith(PublisherFactory.forEach((writeSubscriber) -> { + Charset charset = Charset.forName("UTF-8"); + writeSubscriber.onNext(((String) result.getReturnValue()).getBytes(charset)); + writeSubscriber.onComplete(); + })); } }