Browse Source

Polish WebFlux WebSocket docs

Issue: SPR-16820
pull/1831/head
Rossen Stoyanchev 8 years ago
parent
commit
ade2eab169
  1. 61
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java
  2. 31
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java
  3. 78
      src/docs/asciidoc/web/webflux-websocket.adoc

61
spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java

@ -25,12 +25,18 @@ import reactor.core.publisher.Mono;
/** /**
* Handler for a WebSocket session. * Handler for a WebSocket session.
* *
* <p>Use {@link WebSocketSession#receive()} to compose on the stream of * <p>A server {@code WebSocketHandler} is mapped to requests with
* inbound messages and {@link WebSocketSession#send(Publisher)} to write the * {@link org.springframework.web.reactive.handler.SimpleUrlHandlerMapping
* stream of outbound messages. * SimpleUrlHandlerMapping} and
* {@link org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter
* WebSocketHandlerAdapter}. A client {@code WebSocketHandler} is passed to the
* {@link org.springframework.web.reactive.socket.client.WebSocketClient
* WebSocketClient} execute method.
* *
* <p>You can handle inbound and outbound messages as independent streams, and * <p>Use {@link WebSocketSession#receive() session.receive()} to compose on
* then join them: * the inbound message stream, and {@link WebSocketSession#send(Publisher)
* session.send(publisher)} for the outbound message stream. Below is an
* example, combined flow to process inbound and to send outbound messages:
* *
* <pre class="code"> * <pre class="code">
* class ExampleHandler implements WebSocketHandler { * class ExampleHandler implements WebSocketHandler {
@ -38,49 +44,52 @@ import reactor.core.publisher.Mono;
* &#064;Override * &#064;Override
* public Mono&lt;Void&gt; handle(WebSocketSession session) { * public Mono&lt;Void&gt; handle(WebSocketSession session) {
* *
* Mono&lt;Void&gt; input = session.receive() * Flux&lt;WebSocketMessage&gt; input = session.receive()
* .doOnNext(message -> { * .doOnNext(message -> {
* // ... * // ...
* }) * })
* .concatMap(message -> { * .concatMap(message -> {
* // ... * // ...
* }) * })
* .then(); * .map(value -> session.textMessage("Echo " + value));
*
* Flux&lt;String&gt; source = ... ;
* Mono&lt;Void&gt; output = session.send(source.map(session::textMessage));
* *
* return Mono.zip(input, output).then(); * return session.send(output);
* } * }
* } * }
* </pre> * </pre>
* *
* <p>You can also create a single flow including inbound and outbound messages: * <p>If processing inbound and sending outbound messages are independent
* streams, they can be joined together with the "zip" operator:
*
* <pre class="code"> * <pre class="code">
* class ExampleHandler implements WebSocketHandler { * class ExampleHandler implements WebSocketHandler {
* &#064;Override * &#064;Override
* public Mono&lt;Void&gt; handle(WebSocketSession session) { * public Mono&lt;Void&gt; handle(WebSocketSession session) {
* *
* Flux&lt;WebSocketMessage&gt; input = session.receive() * Mono&lt;Void&gt; input = session.receive()
* .doOnNext(message -> { * .doOnNext(message -> {
* // ... * // ...
* }) * })
* .concatMap(message -> { * .concatMap(message -> {
* // ... * // ...
* }) * })
* .map(value -> session.textMessage("Echo " + value)); * .then();
* *
* return session.send(output); * Flux&lt;String&gt; source = ... ;
* Mono&lt;Void&gt; output = session.send(source.map(session::textMessage));
*
* return Mono.zip(input, output).then();
* } * }
* } * }
* </pre> * </pre>
* *
* <p>When the connection is closed, the inbound stream will receive a * <p>A {@code WebSocketHandler} must compose the inbound and outbound streams
* completion/error signal, while the outbound stream will get a cancellation * into a unified flow and return a {@code Mono<Void>} that reflects the
* signal. The above flows are composed in such a way that the * completion of that flow. That means there is no need to check if the
* {@code Mono<Void>} returned from the {@code WebSocketHandler} won't complete * connection is open, since Reactive Streams signals will terminate activity.
* until the connection is closed. * The inbound stream receives a completion/error signal, and the outbound
* stream receives receives a cancellation signal.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 5.0 * @since 5.0
@ -96,13 +105,17 @@ public interface WebSocketHandler {
} }
/** /**
* Handle the WebSocket session. * Invoked when a new WebSocket connection is established, and allows
* * handling of the session.
* *
* <p>See the class-level doc and the reference for more details and
* examples of how to handle the session.
* *
* @param session the session to handle * @param session the session to handle
* @return completion {@code Mono<Void>} to indicate the outcome of the * @return indicates when appilcation handling of the session is complete,
* WebSocket session handling. * which should reflect the completion of the inbound message stream
* (i.e. connection closing) and possibly the completion of the outbound
* message stream and the writing of messages.
*/ */
Mono<Void> handle(WebSocketSession session); Mono<Void> handle(WebSocketSession session);

31
spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java

@ -1,5 +1,5 @@
/* /*
* Copyright 2002-2016 the original author or authors. * Copyright 2002-2018 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -25,15 +25,11 @@ import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
/** /**
* Represents a WebSocket session with Reactive Streams input and output. * Represents a WebSocket session.
* *
* <p>On the server side a WebSocket session can be handled by mapping * <p>Use {@link WebSocketSession#receive() session.receive()} to compose on
* requests to a {@link WebSocketHandler} and ensuring there is a * the inbound message stream, and {@link WebSocketSession#send(Publisher)
* {@link org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter * session.send(publisher)} to provide the outbound message stream.
* WebSocketHandlerAdapter} strategy registered in Spring configuration.
* On the client side a {@link WebSocketHandler} can be provided to a
* {@link org.springframework.web.reactive.socket.client.WebSocketClient
* WebSocketClient}.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 5.0 * @since 5.0
@ -57,13 +53,24 @@ public interface WebSocketSession {
DataBufferFactory bufferFactory(); DataBufferFactory bufferFactory();
/** /**
* Get access to the stream of incoming messages. * Provides access to the stream of inbound messages.
* <p>This stream receives a completion or error signal when the connection
* is closed. In a typical {@link WebSocketHandler} implementation this
* stream is composed into the overall processing flow, so that when the
* connection is closed, handling will end.
*
* <p>See the class-level doc of {@link WebSocketHandler} and the reference
* for more details and examples of how to handle the session.
*/ */
Flux<WebSocketMessage> receive(); Flux<WebSocketMessage> receive();
/** /**
* Write the given messages to the WebSocket connection. * Give a source of outgoing messages, write the messages and return a
* @param messages the messages to write * {@code Mono<Void>} that completes when the source completes and writing
* is done.
*
* <p>See the class-level doc of {@link WebSocketHandler} and the reference
* for more details and examples of how to handle the session.
*/ */
Mono<Void> send(Publisher<WebSocketMessage> messages); Mono<Void> send(Publisher<WebSocketMessage> messages);

78
src/docs/asciidoc/web/webflux-websocket.adoc

@ -71,7 +71,37 @@ Then map it to a URL and add a `WebSocketHandlerAdapter`:
[[webflux-websockethandler]] [[webflux-websockethandler]]
=== WebSocketHandler === WebSocketHandler
The most basic implementation of a handler is one that handles inbound messages: The `handle` method of `WebSocketHandler` takes `WebSocketSession` and returns `Mono<Void>`
to indicate when application handling of the session is complete. The session is handled
through two streams, one for inbound and one for outbound messages:
[options="header"]
|===
| WebSocketSession method | Description
| `Flux<WebSocketMessage> receive()`
| Provides access to the inbound message stream, and completes when the connection is closed.
| `Mono<Void> send(Publisher<WebSocketMessage>)`
| Takes a source for outgoing messages, writes the messages, and returns a `Mono<Void>` that
completes when the source completes and writing is done.
|===
A `WebSocketHandler` must compose the inbound and outbound streams into a unified flow, and
return a `Mono<Void>` that reflects the completion of that flow. Depending on application
requirements, the unified flow completes when:
* Either inbound or outbound message streams complete.
* Inbound stream completes (i.e. connection closed), while outbound is infinite.
* At a chosen point through the `close` method of `WebSocketSession`.
When inbound and outbound message streams are composed together, there is no need to
check if the connection is open, since Reactive Streams signals will terminate activity.
The inbound stream receives a completion/error signal, and the outbound stream receives
receives a cancellation signal.
The most basic implementation of a handler is one that handles the inbound stream:
[source,java,indent=0] [source,java,indent=0]
[subs="verbatim,quotes"] [subs="verbatim,quotes"]
@ -94,17 +124,17 @@ class ExampleHandler implements WebSocketHandler {
<1> Access stream of inbound messages. <1> Access stream of inbound messages.
<2> Do something with each message. <2> Do something with each message.
<3> Perform nested async operation using message content. <3> Perform nested async operation using message content.
<4> Return `Mono<Void>` that doesn't complete while we continue to receive. <4> Return `Mono<Void>` that completes when receiving completes.
[NOTE] [TIP]
==== ====
If performing a nested, asynchronous operation, you'll need to call For nested, asynchronous operations, you may need to call `message.retain()` on underlying
`message.retain()` if the underlying server uses pooled data buffers (e.g. Netty), or servers that use pooled data buffers (e.g. Netty), or otherwise the data buffer may be
otherwise the data buffer may be released before you've had a chance to read the data. released before you've had a chance to read the data. For more background see
For more on this see <<core.adoc#databuffers,Data Buffers and Codecs>>. <<core.adoc#databuffers,Data Buffers and Codecs>>.
==== ====
A handler can work with inbound and outbound messages as independent streams: The below implementation combines the inbound with the outbound streams:
[source,java,indent=0] [source,java,indent=0]
[subs="verbatim,quotes"] [subs="verbatim,quotes"]
@ -114,28 +144,25 @@ class ExampleHandler implements WebSocketHandler {
@Override @Override
public Mono<Void> handle(WebSocketSession session) { public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive() <1> Flux<WebSocketMessage> output = session.receive() <1>
.doOnNext(message -> { .doOnNext(message -> {
// ... // ...
}) })
.concatMap(message -> { .concatMap(message -> {
// ... // ...
}) })
.then(); .map(value -> session.textMessage("Echo " + value)); <2>
Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage)); <2>
return Mono.zip(input, output).then(); <3> return session.send(output); <3>
} }
} }
---- ----
<1> Handle inbound message stream. <1> Handle inbound message stream.
<2> Send outgoing messages. <2> Create outbound message, producing a combined flow.
<3> Join the streams and return `Mono<Void>` that completes when _either_ stream ends. <3> Return `Mono<Void>` that doesn't complete while we continue to receive.
Inbound and outbound streams can be independent, and joined only for completion:
A handler can compose a connected flow of inbound and outbound messages:
4
[source,java,indent=0] [source,java,indent=0]
[subs="verbatim,quotes"] [subs="verbatim,quotes"]
---- ----
@ -144,22 +171,25 @@ class ExampleHandler implements WebSocketHandler {
@Override @Override
public Mono<Void> handle(WebSocketSession session) { public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive() <1> Mono<Void> input = session.receive() <1>
.doOnNext(message -> { .doOnNext(message -> {
// ... // ...
}) })
.concatMap(message -> { .concatMap(message -> {
// ... // ...
}) })
.map(value -> session.textMessage("Echo " + value)); <2> .then();
return session.send(output); <3> Flux<String> source = ... ;
Mono<Void> output = session.send(source.map(session::textMessage)); <2>
return Mono.zip(input, output).then(); <3>
} }
} }
---- ----
<1> Handle inbound message stream. <1> Handle inbound message stream.
<2> Create outbound message, producing a combined flow. <2> Send outgoing messages.
<3> Return `Mono<Void>` that doesn't complete while we continue to receive. <3> Join the streams and return `Mono<Void>` that completes when _either_ stream ends.
@ -172,6 +202,8 @@ of `HandshakeWebSocketService`, which performs basic checks on the WebSocket req
then uses `RequestUpgradeStrategy` for the server in use. Currently there is built-in then uses `RequestUpgradeStrategy` for the server in use. Currently there is built-in
support for Reactor Netty, Tomcat, Jetty, and Undertow. support for Reactor Netty, Tomcat, Jetty, and Undertow.
The above are just 3 examples to serve as a starting point.
[[webflux-websocket-server-config]] [[webflux-websocket-server-config]]

Loading…
Cancel
Save