From 6922a1c5346a0f9730990986e3cbb984b3f3cb2e Mon Sep 17 00:00:00 2001 From: Stephane Maldini Date: Tue, 13 Dec 2016 17:22:11 +0000 Subject: [PATCH] Sync with latest reactor netty changes --- .../ReactorNettyWebSocketHandlerAdapter.java | 8 ++--- .../adapter/ReactorNettyWebSocketSession.java | 31 ++++++++++--------- .../ReactorNettyRequestUpgradeStrategy.java | 2 +- ...pingMessageConversionIntegrationTests.java | 4 --- .../reactive/ReactorClientHttpRequest.java | 8 ++--- .../reactive/ReactorHttpHandlerAdapter.java | 2 +- .../reactive/ReactorServerHttpResponse.java | 6 ++-- 7 files changed, 29 insertions(+), 32 deletions(-) diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketHandlerAdapter.java index 56b6e779941..7c5f90e299e 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketHandlerAdapter.java @@ -18,8 +18,8 @@ package org.springframework.web.reactive.socket.adapter; import java.util.function.BiFunction; import org.reactivestreams.Publisher; -import reactor.ipc.netty.http.HttpInbound; -import reactor.ipc.netty.http.HttpOutbound; +import reactor.ipc.netty.http.websocket.WebsocketInbound; +import reactor.ipc.netty.http.websocket.WebsocketOutbound; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; @@ -33,7 +33,7 @@ import org.springframework.web.reactive.socket.WebSocketHandler; * @since 5.0 */ public class ReactorNettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport - implements BiFunction> { + implements BiFunction> { public ReactorNettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response, @@ -44,7 +44,7 @@ public class ReactorNettyWebSocketHandlerAdapter extends WebSocketHandlerAdapter @Override - public Publisher apply(HttpInbound inbound, HttpOutbound outbound) { + public Publisher apply(WebsocketInbound inbound, WebsocketOutbound outbound) { ReactorNettyWebSocketSession session = new ReactorNettyWebSocketSession(inbound, outbound, getUri(), getBufferFactory()); return getDelegate().handle(session); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java index 5abb6a223ef..6dca8680718 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java @@ -21,8 +21,9 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.ipc.netty.http.HttpInbound; -import reactor.ipc.netty.http.HttpOutbound; +import reactor.ipc.netty.NettyPipeline; +import reactor.ipc.netty.http.websocket.WebsocketInbound; +import reactor.ipc.netty.http.websocket.WebsocketOutbound; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.web.reactive.socket.CloseStatus; @@ -40,8 +41,8 @@ import org.springframework.web.reactive.socket.WebSocketSession; public class ReactorNettyWebSocketSession extends NettyWebSocketSessionSupport { - - protected ReactorNettyWebSocketSession(HttpInbound inbound, HttpOutbound outbound, + protected ReactorNettyWebSocketSession(WebsocketInbound inbound, + WebsocketOutbound outbound, URI uri, NettyDataBufferFactory factory) { super(new WebSocketConnection(inbound, outbound), uri, factory); @@ -50,16 +51,17 @@ public class ReactorNettyWebSocketSession @Override public Flux receive() { - HttpInbound inbound = getDelegate().getHttpInbound(); + WebsocketInbound inbound = getDelegate().getWebsocketInbound(); return toMessageFlux(inbound.receiveObject().cast(WebSocketFrame.class)); } @Override public Mono send(Publisher messages) { Flux frameFlux = Flux.from(messages).map(this::toFrame); - HttpOutbound outbound = getDelegate().getHttpOutbound(); - outbound.flushEach(); - return outbound.sendObject(frameFlux); + WebsocketOutbound outbound = getDelegate().getWebsocketOutbound(); + return outbound.options(NettyPipeline.SendOptions::flushOnEach) + .sendObject(frameFlux) + .then(); } @Override @@ -71,25 +73,24 @@ public class ReactorNettyWebSocketSession /** - * Simple container for {@link HttpInbound} and {@link HttpOutbound}. + * Simple container for {@link WebsocketInbound} and {@link WebsocketOutbound}. */ public static class WebSocketConnection { - private final HttpInbound inbound; - - private final HttpOutbound outbound; + private final WebsocketInbound inbound; + private final WebsocketOutbound outbound; - public WebSocketConnection(HttpInbound inbound, HttpOutbound outbound) { + public WebSocketConnection(WebsocketInbound inbound, WebsocketOutbound outbound) { this.inbound = inbound; this.outbound = outbound; } - public HttpInbound getHttpInbound() { + public WebsocketInbound getWebsocketInbound() { return this.inbound; } - public HttpOutbound getHttpOutbound() { + public WebsocketOutbound getWebsocketOutbound() { return this.outbound; } } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java index 7cd3fa03227..9f8531862df 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java @@ -47,7 +47,7 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg String protocols = StringUtils.arrayToCommaDelimitedString(getSubProtocols(webSocketHandler)); protocols = (StringUtils.hasText(protocols) ? protocols : null); - return response.getReactorResponse().upgradeToWebsocket(protocols, false, reactorHandler); + return response.getReactorResponse().sendWebsocket(protocols, reactorHandler); } private static String[] getSubProtocols(WebSocketHandler webSocketHandler) { diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingMessageConversionIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingMessageConversionIntegrationTests.java index 32631b27876..f61a7ab535a 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingMessageConversionIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingMessageConversionIntegrationTests.java @@ -177,10 +177,6 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq @Test public void resource() throws Exception { - - // SPR-14975 - assumeFalse(server instanceof ReactorHttpServer); - ResponseEntity response = performGet("/resource", new HttpHeaders(), byte[].class); assertEquals(HttpStatus.OK, response.getStatusCode()); diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java index 56a9cfb036a..46c9dd6715a 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java @@ -54,7 +54,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { this.httpMethod = httpMethod; this.uri = uri; this.httpRequest = httpRequest; - this.bufferFactory = new NettyDataBufferFactory(httpRequest.channel().alloc()); + this.bufferFactory = new NettyDataBufferFactory(httpRequest.alloc()); } @@ -76,7 +76,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { @Override public Mono writeWith(Publisher body) { return applyBeforeCommit().then(this.httpRequest - .send(Flux.from(body).map(NettyDataBufferFactory::toByteBuf))); + .send(Flux.from(body).map(NettyDataBufferFactory::toByteBuf)).then()); } @Override @@ -84,7 +84,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { Publisher> byteBufs = Flux.from(body). map(ReactorClientHttpRequest::toByteBufs); return applyBeforeCommit().then(this.httpRequest - .sendGroups(byteBufs)); + .sendGroups(byteBufs).then()); } private static Publisher toByteBufs(Publisher dataBuffers) { @@ -94,7 +94,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { @Override public Mono setComplete() { - return applyBeforeCommit().then(httpRequest.sendHeaders()); + return applyBeforeCommit().then(httpRequest.sendHeaders().then()); } @Override diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java index aa9f55328e3..66bf702d7cc 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java @@ -48,7 +48,7 @@ public class ReactorHttpHandlerAdapter extends HttpHandlerAdapterSupport @Override public Mono apply(HttpServerRequest request, HttpServerResponse response) { - NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(request.channel().alloc()); + NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(response.alloc()); ReactorServerHttpRequest req = new ReactorServerHttpRequest(request, bufferFactory); ReactorServerHttpResponse resp = new ReactorServerHttpResponse(response, bufferFactory); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java index d128184078f..0f88bf4a400 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java @@ -72,14 +72,14 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse @Override protected Mono writeWithInternal(Publisher publisher) { Publisher body = toByteBufs(publisher); - return this.response.send(body); + return this.response.send(body).then(); } @Override protected Mono writeAndFlushWithInternal(Publisher> publisher) { Publisher> body = Flux.from(publisher) .map(ReactorServerHttpResponse::toByteBufs); - return this.response.sendGroups(body); + return this.response.sendGroups(body).then(); } @Override @@ -114,7 +114,7 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse @Override public Mono writeWith(File file, long position, long count) { - return doCommit(() -> this.response.sendFile(file.toPath(), position, count)); + return doCommit(() -> this.response.sendFile(file.toPath(), position, count).then()); } private static Publisher toByteBufs(Publisher dataBuffers) {