Browse Source

ReactorNettyWebSocketSession implements close properly

Issue: SPR-16774
pull/1800/head
Rossen Stoyanchev 8 years ago
parent
commit
417bb302c3
  1. 13
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java
  2. 26
      spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java

13
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2016 the original author or authors. * Copyright 2002-2018 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -15,10 +15,12 @@
*/ */
package org.springframework.web.reactive.socket.adapter; package org.springframework.web.reactive.socket.adapter;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.ipc.netty.NettyInbound; import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound; import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.NettyPipeline; import reactor.ipc.netty.NettyPipeline;
@ -42,6 +44,8 @@ import org.springframework.web.reactive.socket.WebSocketSession;
public class ReactorNettyWebSocketSession public class ReactorNettyWebSocketSession
extends NettyWebSocketSessionSupport<ReactorNettyWebSocketSession.WebSocketConnection> { extends NettyWebSocketSessionSupport<ReactorNettyWebSocketSession.WebSocketConnection> {
private final MonoProcessor<WebSocketFrame> closeMono = MonoProcessor.create();
public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound, public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
HandshakeInfo info, NettyDataBufferFactory bufferFactory) { HandshakeInfo info, NettyDataBufferFactory bufferFactory) {
@ -69,11 +73,8 @@ public class ReactorNettyWebSocketSession
@Override @Override
public Mono<Void> close(CloseStatus status) { public Mono<Void> close(CloseStatus status) {
return Mono.error(new UnsupportedOperationException( WebSocketFrame closeFrame = new CloseWebSocketFrame(status.getCode(), status.getReason());
"Reactor Netty does not support closing the session from anywhere. " + return getDelegate().getOutbound().sendObject(closeFrame).then();
"You will need to work with the Flux returned from receive() method, " +
"either subscribing to it and using the returned Disposable, " +
"or using an operator that cancels (e.g. take)."));
} }

26
spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2017 the original author or authors. * Copyright 2002-2018 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -127,6 +127,21 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
assertEquals("my-header:my-value", output.block(Duration.ofMillis(5000))); assertEquals("my-header:my-value", output.block(Duration.ofMillis(5000)));
} }
@Test
public void sessionClosing() throws Exception {
this.client.execute(getUrl("/close"),
session -> {
logger.debug("Starting..");
return session.receive()
.doOnNext(s -> logger.debug("inbound " + s))
.then()
.doFinally(signalType -> {
logger.debug("Completed with: " + signalType);
});
})
.block(Duration.ofMillis(5000));
}
@Configuration @Configuration
static class WebConfig { static class WebConfig {
@ -137,6 +152,7 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
map.put("/echo", new EchoWebSocketHandler()); map.put("/echo", new EchoWebSocketHandler());
map.put("/sub-protocol", new SubProtocolWebSocketHandler()); map.put("/sub-protocol", new SubProtocolWebSocketHandler());
map.put("/custom-header", new CustomHeaderHandler()); map.put("/custom-header", new CustomHeaderHandler());
map.put("/close", new SessionClosingHandler());
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping(); SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map); mapping.setUrlMap(map);
@ -183,4 +199,12 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
} }
} }
private static class SessionClosingHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return Flux.never().mergeWith(session.close(CloseStatus.GOING_AWAY)).then();
}
}
} }

Loading…
Cancel
Save