From 3303a68436c23fa4928c402bbec52d2da88818b0 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Fri, 2 Oct 2020 11:39:31 +0200 Subject: [PATCH] Use registry to convert to CompletableFuture Use the ReactiveAdapterRegistry to convert to CompletableFuture, and remove the ToFutureSubscriber. --- .../servlet/function/AsyncServerResponse.java | 67 +++---------------- 1 file changed, 10 insertions(+), 57 deletions(-) diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/AsyncServerResponse.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/AsyncServerResponse.java index cf347405221..f099a3699d3 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/function/AsyncServerResponse.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/function/AsyncServerResponse.java @@ -33,8 +33,6 @@ import javax.servlet.http.HttpServletRequestWrapper; import javax.servlet.http.HttpServletResponse; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; import org.springframework.core.ReactiveAdapter; import org.springframework.core.ReactiveAdapterRegistry; @@ -138,67 +136,22 @@ final class AsyncServerResponse extends ErrorHandlingServerResponse { return new AsyncServerResponse(futureResponse); } else if (reactiveStreamsPresent) { - ReactiveAdapter adapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(o.getClass()); - if (adapter != null) { - Publisher publisher = adapter.toPublisher(o); - CompletableFuture futureResponse = new CompletableFuture<>(); - publisher.subscribe(new ToFutureSubscriber(futureResponse)); - return new AsyncServerResponse(futureResponse); + ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance(); + ReactiveAdapter publisherAdapter = registry.getAdapter(o.getClass()); + if (publisherAdapter != null) { + Publisher publisher = publisherAdapter.toPublisher(o); + ReactiveAdapter futureAdapter = registry.getAdapter(CompletableFuture.class); + if (futureAdapter != null) { + CompletableFuture futureResponse = + (CompletableFuture) futureAdapter.fromPublisher(publisher); + return new AsyncServerResponse(futureResponse); + } } } throw new IllegalArgumentException("Asynchronous type not supported: " + o.getClass()); } - /** - * Subscriber that exposes the first result it receives via a CompletableFuture. - */ - private static final class ToFutureSubscriber implements Subscriber { - - private final CompletableFuture future; - - @Nullable - private Subscription subscription; - - - public ToFutureSubscriber(CompletableFuture future) { - this.future = future; - } - - @Override - public void onSubscribe(Subscription s) { - if (this.subscription == null) { - this.subscription = s; - s.request(1); - } - else { - s.cancel(); - } - } - - @Override - public void onNext(ServerResponse serverResponse) { - if (!this.future.isDone()) { - this.future.complete(serverResponse); - } - } - - @Override - public void onError(Throwable t) { - if (!this.future.isDone()) { - this.future.completeExceptionally(t); - } - } - - @Override - public void onComplete() { - if (!this.future.isDone()) { - this.future.completeExceptionally(new IllegalStateException("Did not receive ServerResponse")); - } - } - } - - /** * HttpServletRequestWrapper that shares its AsyncContext between this * AsyncServerResponse class and other, subsequent ServerResponse