diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java index 75ed51c4569..32ae15599fe 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java @@ -25,12 +25,18 @@ import reactor.core.publisher.Mono; /** * Handler for a WebSocket session. * - *

Use {@link WebSocketSession#receive()} to compose on the stream of - * inbound messages and {@link WebSocketSession#send(Publisher)} to write the - * stream of outbound messages. + *

A server {@code WebSocketHandler} is mapped to requests with + * {@link org.springframework.web.reactive.handler.SimpleUrlHandlerMapping + * SimpleUrlHandlerMapping} and + * {@link org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter + * WebSocketHandlerAdapter}. A client {@code WebSocketHandler} is passed to the + * {@link org.springframework.web.reactive.socket.client.WebSocketClient + * WebSocketClient} execute method. * - *

You can handle inbound and outbound messages as independent streams, and - * then join them: + *

Use {@link WebSocketSession#receive() session.receive()} to compose on + * the inbound message stream, and {@link WebSocketSession#send(Publisher) + * session.send(publisher)} for the outbound message stream. Below is an + * example, combined flow to process inbound and to send outbound messages: * *

  * class ExampleHandler implements WebSocketHandler {
@@ -38,49 +44,52 @@ import reactor.core.publisher.Mono;
  * 	@Override
  * 	public Mono<Void> handle(WebSocketSession session) {
  *
- * 		Mono<Void> input = session.receive()
+ * 		Flux<WebSocketMessage> input = session.receive()
  *			.doOnNext(message -> {
  * 				// ...
  * 			})
  * 			.concatMap(message -> {
  * 				// ...
  * 			})
- * 			.then();
- *
- *		Flux<String> source = ... ;
- * 		Mono<Void> output = session.send(source.map(session::textMessage));
+ * 			.map(value -> session.textMessage("Echo " + value));
  *
- * 		return Mono.zip(input, output).then();
+ * 		return session.send(output);
  * 	}
  * }
  * 
* - *

You can also create a single flow including inbound and outbound messages: + *

If processing inbound and sending outbound messages are independent + * streams, they can be joined together with the "zip" operator: + * *

  * class ExampleHandler implements WebSocketHandler {
 
  * 	@Override
  * 	public Mono<Void> handle(WebSocketSession session) {
  *
- * 		Flux<WebSocketMessage> input = session.receive()
+ * 		Mono<Void> input = session.receive()
  *			.doOnNext(message -> {
  * 				// ...
  * 			})
  * 			.concatMap(message -> {
  * 				// ...
  * 			})
- * 			.map(value -> session.textMessage("Echo " + value));
+ * 			.then();
  *
- * 		return session.send(output);
+ *		Flux<String> source = ... ;
+ * 		Mono<Void> output = session.send(source.map(session::textMessage));
+ *
+ * 		return Mono.zip(input, output).then();
  * 	}
  * }
  * 
* - *

When the connection is closed, the inbound stream will receive a - * completion/error signal, while the outbound stream will get a cancellation - * signal. The above flows are composed in such a way that the - * {@code Mono} returned from the {@code WebSocketHandler} won't complete - * until the connection is closed. + *

A {@code WebSocketHandler} must compose the inbound and outbound streams + * into a unified flow and return a {@code Mono} that reflects the + * completion of that flow. That means there is no need to check if the + * connection is open, since Reactive Streams signals will terminate activity. + * The inbound stream receives a completion/error signal, and the outbound + * stream receives receives a cancellation signal. * * @author Rossen Stoyanchev * @since 5.0 @@ -96,13 +105,17 @@ public interface WebSocketHandler { } /** - * Handle the WebSocket session. - * + * Invoked when a new WebSocket connection is established, and allows + * handling of the session. * + *

See the class-level doc and the reference for more details and + * examples of how to handle the session. * * @param session the session to handle - * @return completion {@code Mono} to indicate the outcome of the - * WebSocket session handling. + * @return indicates when appilcation handling of the session is complete, + * which should reflect the completion of the inbound message stream + * (i.e. connection closing) and possibly the completion of the outbound + * message stream and the writing of messages. */ Mono handle(WebSocketSession session); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java index bca6047a2fc..10fa28176d9 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,15 +25,11 @@ import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; /** - * Represents a WebSocket session with Reactive Streams input and output. + * Represents a WebSocket session. * - *

On the server side a WebSocket session can be handled by mapping - * requests to a {@link WebSocketHandler} and ensuring there is a - * {@link org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter - * WebSocketHandlerAdapter} strategy registered in Spring configuration. - * On the client side a {@link WebSocketHandler} can be provided to a - * {@link org.springframework.web.reactive.socket.client.WebSocketClient - * WebSocketClient}. + *

Use {@link WebSocketSession#receive() session.receive()} to compose on + * the inbound message stream, and {@link WebSocketSession#send(Publisher) + * session.send(publisher)} to provide the outbound message stream. * * @author Rossen Stoyanchev * @since 5.0 @@ -57,13 +53,24 @@ public interface WebSocketSession { DataBufferFactory bufferFactory(); /** - * Get access to the stream of incoming messages. + * Provides access to the stream of inbound messages. + *

This stream receives a completion or error signal when the connection + * is closed. In a typical {@link WebSocketHandler} implementation this + * stream is composed into the overall processing flow, so that when the + * connection is closed, handling will end. + * + *

See the class-level doc of {@link WebSocketHandler} and the reference + * for more details and examples of how to handle the session. */ Flux receive(); /** - * Write the given messages to the WebSocket connection. - * @param messages the messages to write + * Give a source of outgoing messages, write the messages and return a + * {@code Mono} that completes when the source completes and writing + * is done. + * + *

See the class-level doc of {@link WebSocketHandler} and the reference + * for more details and examples of how to handle the session. */ Mono send(Publisher messages); diff --git a/src/docs/asciidoc/web/webflux-websocket.adoc b/src/docs/asciidoc/web/webflux-websocket.adoc index cdd9e432b54..17a36d9b487 100644 --- a/src/docs/asciidoc/web/webflux-websocket.adoc +++ b/src/docs/asciidoc/web/webflux-websocket.adoc @@ -71,7 +71,37 @@ Then map it to a URL and add a `WebSocketHandlerAdapter`: [[webflux-websockethandler]] === WebSocketHandler -The most basic implementation of a handler is one that handles inbound messages: +The `handle` method of `WebSocketHandler` takes `WebSocketSession` and returns `Mono` +to indicate when application handling of the session is complete. The session is handled +through two streams, one for inbound and one for outbound messages: + +[options="header"] +|=== +| WebSocketSession method | Description + +| `Flux receive()` +| Provides access to the inbound message stream, and completes when the connection is closed. + +| `Mono send(Publisher)` +| Takes a source for outgoing messages, writes the messages, and returns a `Mono` that + completes when the source completes and writing is done. + +|=== + +A `WebSocketHandler` must compose the inbound and outbound streams into a unified flow, and +return a `Mono` that reflects the completion of that flow. Depending on application +requirements, the unified flow completes when: + +* Either inbound or outbound message streams complete. +* Inbound stream completes (i.e. connection closed), while outbound is infinite. +* At a chosen point through the `close` method of `WebSocketSession`. + +When inbound and outbound message streams are composed together, there is no need to +check if the connection is open, since Reactive Streams signals will terminate activity. +The inbound stream receives a completion/error signal, and the outbound stream receives +receives a cancellation signal. + +The most basic implementation of a handler is one that handles the inbound stream: [source,java,indent=0] [subs="verbatim,quotes"] @@ -94,17 +124,17 @@ class ExampleHandler implements WebSocketHandler { <1> Access stream of inbound messages. <2> Do something with each message. <3> Perform nested async operation using message content. -<4> Return `Mono` that doesn't complete while we continue to receive. +<4> Return `Mono` that completes when receiving completes. -[NOTE] +[TIP] ==== -If performing a nested, asynchronous operation, you'll need to call -`message.retain()` if the underlying server uses pooled data buffers (e.g. Netty), or -otherwise the data buffer may be released before you've had a chance to read the data. -For more on this see <>. +For nested, asynchronous operations, you may need to call `message.retain()` on underlying +servers that use pooled data buffers (e.g. Netty), or otherwise the data buffer may be +released before you've had a chance to read the data. For more background see +<>. ==== -A handler can work with inbound and outbound messages as independent streams: +The below implementation combines the inbound with the outbound streams: [source,java,indent=0] [subs="verbatim,quotes"] @@ -114,28 +144,25 @@ class ExampleHandler implements WebSocketHandler { @Override public Mono handle(WebSocketSession session) { - Mono input = session.receive() <1> + Flux output = session.receive() <1> .doOnNext(message -> { // ... }) .concatMap(message -> { // ... }) - .then(); - - Flux source = ... ; - Mono output = session.send(source.map(session::textMessage)); <2> + .map(value -> session.textMessage("Echo " + value)); <2> - return Mono.zip(input, output).then(); <3> + return session.send(output); <3> } } ---- <1> Handle inbound message stream. -<2> Send outgoing messages. -<3> Join the streams and return `Mono` that completes when _either_ stream ends. +<2> Create outbound message, producing a combined flow. +<3> Return `Mono` that doesn't complete while we continue to receive. + +Inbound and outbound streams can be independent, and joined only for completion: -A handler can compose a connected flow of inbound and outbound messages: -4 [source,java,indent=0] [subs="verbatim,quotes"] ---- @@ -144,22 +171,25 @@ class ExampleHandler implements WebSocketHandler { @Override public Mono handle(WebSocketSession session) { - Flux output = session.receive() <1> + Mono input = session.receive() <1> .doOnNext(message -> { // ... }) .concatMap(message -> { // ... }) - .map(value -> session.textMessage("Echo " + value)); <2> + .then(); - return session.send(output); <3> + Flux source = ... ; + Mono output = session.send(source.map(session::textMessage)); <2> + + return Mono.zip(input, output).then(); <3> } } ---- <1> Handle inbound message stream. -<2> Create outbound message, producing a combined flow. -<3> Return `Mono` that doesn't complete while we continue to receive. +<2> Send outgoing messages. +<3> Join the streams and return `Mono` that completes when _either_ stream ends. @@ -172,6 +202,8 @@ of `HandshakeWebSocketService`, which performs basic checks on the WebSocket req then uses `RequestUpgradeStrategy` for the server in use. Currently there is built-in support for Reactor Netty, Tomcat, Jetty, and Undertow. +The above are just 3 examples to serve as a starting point. + [[webflux-websocket-server-config]]