Browse Source

Sync with latest reactor netty changes

pull/1263/merge
Stephane Maldini 9 years ago
parent
commit
6922a1c534
  1. 8
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketHandlerAdapter.java
  2. 31
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java
  3. 2
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java
  4. 4
      spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingMessageConversionIntegrationTests.java
  5. 8
      spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java
  6. 2
      spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java
  7. 6
      spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java

8
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketHandlerAdapter.java

@ -18,8 +18,8 @@ package org.springframework.web.reactive.socket.adapter; @@ -18,8 +18,8 @@ package org.springframework.web.reactive.socket.adapter;
import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import reactor.ipc.netty.http.HttpInbound;
import reactor.ipc.netty.http.HttpOutbound;
import reactor.ipc.netty.http.websocket.WebsocketInbound;
import reactor.ipc.netty.http.websocket.WebsocketOutbound;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
@ -33,7 +33,7 @@ import org.springframework.web.reactive.socket.WebSocketHandler; @@ -33,7 +33,7 @@ import org.springframework.web.reactive.socket.WebSocketHandler;
* @since 5.0
*/
public class ReactorNettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport
implements BiFunction<HttpInbound, HttpOutbound, Publisher<Void>> {
implements BiFunction<WebsocketInbound, WebsocketOutbound, Publisher<Void>> {
public ReactorNettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response,
@ -44,7 +44,7 @@ public class ReactorNettyWebSocketHandlerAdapter extends WebSocketHandlerAdapter @@ -44,7 +44,7 @@ public class ReactorNettyWebSocketHandlerAdapter extends WebSocketHandlerAdapter
@Override
public Publisher<Void> apply(HttpInbound inbound, HttpOutbound outbound) {
public Publisher<Void> apply(WebsocketInbound inbound, WebsocketOutbound outbound) {
ReactorNettyWebSocketSession session =
new ReactorNettyWebSocketSession(inbound, outbound, getUri(), getBufferFactory());
return getDelegate().handle(session);

31
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java

@ -21,8 +21,9 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame; @@ -21,8 +21,9 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.HttpInbound;
import reactor.ipc.netty.http.HttpOutbound;
import reactor.ipc.netty.NettyPipeline;
import reactor.ipc.netty.http.websocket.WebsocketInbound;
import reactor.ipc.netty.http.websocket.WebsocketOutbound;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.web.reactive.socket.CloseStatus;
@ -40,8 +41,8 @@ import org.springframework.web.reactive.socket.WebSocketSession; @@ -40,8 +41,8 @@ import org.springframework.web.reactive.socket.WebSocketSession;
public class ReactorNettyWebSocketSession
extends NettyWebSocketSessionSupport<ReactorNettyWebSocketSession.WebSocketConnection> {
protected ReactorNettyWebSocketSession(HttpInbound inbound, HttpOutbound outbound,
protected ReactorNettyWebSocketSession(WebsocketInbound inbound,
WebsocketOutbound outbound,
URI uri, NettyDataBufferFactory factory) {
super(new WebSocketConnection(inbound, outbound), uri, factory);
@ -50,16 +51,17 @@ public class ReactorNettyWebSocketSession @@ -50,16 +51,17 @@ public class ReactorNettyWebSocketSession
@Override
public Flux<WebSocketMessage> receive() {
HttpInbound inbound = getDelegate().getHttpInbound();
WebsocketInbound inbound = getDelegate().getWebsocketInbound();
return toMessageFlux(inbound.receiveObject().cast(WebSocketFrame.class));
}
@Override
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
Flux<WebSocketFrame> frameFlux = Flux.from(messages).map(this::toFrame);
HttpOutbound outbound = getDelegate().getHttpOutbound();
outbound.flushEach();
return outbound.sendObject(frameFlux);
WebsocketOutbound outbound = getDelegate().getWebsocketOutbound();
return outbound.options(NettyPipeline.SendOptions::flushOnEach)
.sendObject(frameFlux)
.then();
}
@Override
@ -71,25 +73,24 @@ public class ReactorNettyWebSocketSession @@ -71,25 +73,24 @@ public class ReactorNettyWebSocketSession
/**
* Simple container for {@link HttpInbound} and {@link HttpOutbound}.
* Simple container for {@link WebsocketInbound} and {@link WebsocketOutbound}.
*/
public static class WebSocketConnection {
private final HttpInbound inbound;
private final HttpOutbound outbound;
private final WebsocketInbound inbound;
private final WebsocketOutbound outbound;
public WebSocketConnection(HttpInbound inbound, HttpOutbound outbound) {
public WebSocketConnection(WebsocketInbound inbound, WebsocketOutbound outbound) {
this.inbound = inbound;
this.outbound = outbound;
}
public HttpInbound getHttpInbound() {
public WebsocketInbound getWebsocketInbound() {
return this.inbound;
}
public HttpOutbound getHttpOutbound() {
public WebsocketOutbound getWebsocketOutbound() {
return this.outbound;
}
}

2
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ReactorNettyRequestUpgradeStrategy.java

@ -47,7 +47,7 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg @@ -47,7 +47,7 @@ public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrateg
String protocols = StringUtils.arrayToCommaDelimitedString(getSubProtocols(webSocketHandler));
protocols = (StringUtils.hasText(protocols) ? protocols : null);
return response.getReactorResponse().upgradeToWebsocket(protocols, false, reactorHandler);
return response.getReactorResponse().sendWebsocket(protocols, reactorHandler);
}
private static String[] getSubProtocols(WebSocketHandler webSocketHandler) {

4
spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingMessageConversionIntegrationTests.java

@ -177,10 +177,6 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq @@ -177,10 +177,6 @@ public class RequestMappingMessageConversionIntegrationTests extends AbstractReq
@Test
public void resource() throws Exception {
// SPR-14975
assumeFalse(server instanceof ReactorHttpServer);
ResponseEntity<byte[]> response = performGet("/resource", new HttpHeaders(), byte[].class);
assertEquals(HttpStatus.OK, response.getStatusCode());

8
spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java

@ -54,7 +54,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { @@ -54,7 +54,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
this.httpMethod = httpMethod;
this.uri = uri;
this.httpRequest = httpRequest;
this.bufferFactory = new NettyDataBufferFactory(httpRequest.channel().alloc());
this.bufferFactory = new NettyDataBufferFactory(httpRequest.alloc());
}
@ -76,7 +76,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { @@ -76,7 +76,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
return applyBeforeCommit().then(this.httpRequest
.send(Flux.from(body).map(NettyDataBufferFactory::toByteBuf)));
.send(Flux.from(body).map(NettyDataBufferFactory::toByteBuf)).then());
}
@Override
@ -84,7 +84,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { @@ -84,7 +84,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
Publisher<Publisher<ByteBuf>> byteBufs = Flux.from(body).
map(ReactorClientHttpRequest::toByteBufs);
return applyBeforeCommit().then(this.httpRequest
.sendGroups(byteBufs));
.sendGroups(byteBufs).then());
}
private static Publisher<ByteBuf> toByteBufs(Publisher<? extends DataBuffer> dataBuffers) {
@ -94,7 +94,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { @@ -94,7 +94,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
@Override
public Mono<Void> setComplete() {
return applyBeforeCommit().then(httpRequest.sendHeaders());
return applyBeforeCommit().then(httpRequest.sendHeaders().then());
}
@Override

2
spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java

@ -48,7 +48,7 @@ public class ReactorHttpHandlerAdapter extends HttpHandlerAdapterSupport @@ -48,7 +48,7 @@ public class ReactorHttpHandlerAdapter extends HttpHandlerAdapterSupport
@Override
public Mono<Void> apply(HttpServerRequest request, HttpServerResponse response) {
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(request.channel().alloc());
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(response.alloc());
ReactorServerHttpRequest req = new ReactorServerHttpRequest(request, bufferFactory);
ReactorServerHttpResponse resp = new ReactorServerHttpResponse(response, bufferFactory);

6
spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java

@ -72,14 +72,14 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse @@ -72,14 +72,14 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse
@Override
protected Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> publisher) {
Publisher<ByteBuf> body = toByteBufs(publisher);
return this.response.send(body);
return this.response.send(body).then();
}
@Override
protected Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<? extends DataBuffer>> publisher) {
Publisher<Publisher<ByteBuf>> body = Flux.from(publisher)
.map(ReactorServerHttpResponse::toByteBufs);
return this.response.sendGroups(body);
return this.response.sendGroups(body).then();
}
@Override
@ -114,7 +114,7 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse @@ -114,7 +114,7 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse
@Override
public Mono<Void> writeWith(File file, long position, long count) {
return doCommit(() -> this.response.sendFile(file.toPath(), position, count));
return doCommit(() -> this.response.sendFile(file.toPath(), position, count).then());
}
private static Publisher<ByteBuf> toByteBufs(Publisher<? extends DataBuffer> dataBuffers) {

Loading…
Cancel
Save