[[webflux-websocket]]
= WebSockets
[.small]#<<web.adoc#websocket,Same as in the Servlet stack>>#

This part of the reference documentation covers support for reactive-stack WebSocket
messaging.

include::websocket-intro.adoc[leveloffset=+1]
|
[[webflux-websocket-server]]
== WebSocket API
[.small]#<<web.adoc#websocket-server,Same as in the Servlet stack>>#

The Spring Framework provides a WebSocket API that you can use to write client- and
server-side applications that handle WebSocket messages.
|
[[webflux-websocket-server-handler]]
=== Server
[.small]#<<web.adoc#websocket-server-handler,Same as in the Servlet stack>>#

To create a WebSocket server, you can first create a `WebSocketHandler`.
The following example shows how to do so:

====
[source,java,indent=0]
[subs="verbatim,quotes"]
----
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;

public class MyWebSocketHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // ...
    }
}
----
====
|
Then you can map it to a URL and add a `WebSocketHandlerAdapter`, as the following example shows:

====
[source,java,indent=0]
[subs="verbatim,quotes"]
----
@Configuration
static class WebConfig {

    @Bean
    public HandlerMapping handlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/path", new MyWebSocketHandler());

        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        mapping.setOrder(-1); // before annotated controllers
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}
----
====
|
[[webflux-websockethandler]]
=== `WebSocketHandler`

The `handle` method of `WebSocketHandler` takes a `WebSocketSession` and returns a `Mono<Void>`
to indicate when application handling of the session is complete. The session is handled
through two streams, one for inbound and one for outbound messages. The following table
describes the two methods that handle the streams:

[options="header"]
|===
| `WebSocketSession` method | Description

| `Flux<WebSocketMessage> receive()`
| Provides access to the inbound message stream and completes when the connection is closed.

| `Mono<Void> send(Publisher<WebSocketMessage>)`
| Takes a source for outgoing messages, writes the messages, and returns a `Mono<Void>` that
  completes when the source completes and writing is done.

|===

A `WebSocketHandler` must compose the inbound and outbound streams into a unified flow and
return a `Mono<Void>` that reflects the completion of that flow. Depending on application
requirements, the unified flow completes when:

* Either the inbound or the outbound message stream completes.
* The inbound stream completes (that is, the connection is closed), while the outbound stream is infinite.
* At a chosen point, through the `close` method of `WebSocketSession`.
|
When inbound and outbound message streams are composed together, there is no need to
check if the connection is open, since Reactive Streams signals terminate activity.
The inbound stream receives a completion or error signal, and the outbound stream
receives a cancellation signal.

The most basic implementation of a handler is one that handles the inbound stream. The
following example shows such an implementation:

====
[source,java,indent=0]
[subs="verbatim,quotes"]
----
class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.receive()            <1>
                .doOnNext(message -> {
                    // ...                  <2>
                })
                .concatMap(message -> {
                    // ...                  <3>
                })
                .then();                    <4>
    }
}
----
<1> Access the stream of inbound messages.
<2> Do something with each message.
<3> Perform nested asynchronous operations that use the message content.
<4> Return a `Mono<Void>` that completes when receiving completes.
====
|
TIP: For nested, asynchronous operations, you may need to call `message.retain()` on underlying
servers that use pooled data buffers (for example, Netty). Otherwise, the data buffer may be
released before you have had a chance to read the data. For more background, see
<<core.adoc#databuffers,Data Buffers and Codecs>>.
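
One way to follow this advice, sketched under the assumption that a short `Mono.delay` stands in for a real asynchronous step, is to retain each message as it arrives and release its buffer once the nested operation finishes. The class name `RetainingHandler` is hypothetical:

```java
import java.time.Duration;

import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;

// Hypothetical handler name; the delay stands in for any asynchronous work.
class RetainingHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.receive()
                // Retain the message so a pooled buffer outlives this callback.
                .map(WebSocketMessage::retain)
                .concatMap(message -> Mono.delay(Duration.ofMillis(10))
                        // The payload is read later, after the delay has elapsed.
                        .map(tick -> message.getPayloadAsText())
                        .doOnNext(text -> System.out.println("Received: " + text))
                        // Release the retained buffer on completion, error, or cancellation.
                        .doFinally(signal -> DataBufferUtils.release(message.getPayload())))
                .then();
    }
}
```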
|
The following implementation combines the inbound and outbound streams:

====
[source,java,indent=0]
[subs="verbatim,quotes"]
----
class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {

        Flux<WebSocketMessage> output = session.receive()               <1>
                .doOnNext(message -> {
                    // ...
                })
                .concatMap(message -> {
                    // ...
                })
                .map(value -> session.textMessage("Echo " + value));    <2>

        return session.send(output);                                    <3>
    }
}
----
<1> Handle the inbound message stream.
<2> Create the outbound message, producing a combined flow.
<3> Return a `Mono<Void>` that does not complete while we continue to receive.
====
|
Inbound and outbound streams can be independent and be joined only for completion,
as the following example shows:

====
[source,java,indent=0]
[subs="verbatim,quotes"]
----
class ExampleHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {

        Mono<Void> input = session.receive()                                <1>
                .doOnNext(message -> {
                    // ...
                })
                .concatMap(message -> {
                    // ...
                })
                .then();

        Flux<String> source = ... ;
        Mono<Void> output = session.send(source.map(session::textMessage)); <2>

        return Mono.zip(input, output).then();                              <3>
    }
}
----
<1> Handle the inbound message stream.
<2> Send outgoing messages.
<3> Join the streams and return a `Mono<Void>` that completes when either stream ends.
====
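
Closing the session at a chosen point, the third completion option listed earlier, can be sketched as follows. The `quit` text command and the class name `CloseOnQuitHandler` are assumptions made for illustration, and the sketch further assumes that the underlying server supports closing the session from application code:

```java
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;

// Hypothetical handler that ends the session when the client sends "quit".
class CloseOnQuitHandler implements WebSocketHandler {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.receive()
                // Read the text right away, while the message buffer is still valid.
                .map(WebSocketMessage::getPayloadAsText)
                // Stop consuming inbound messages once "quit" has been seen.
                .takeUntil("quit"::equals)
                // Close lazily, only after the inbound flow has finished.
                .then(Mono.defer(() -> session.close(CloseStatus.NORMAL)));
    }
}
```

The returned `Mono<Void>` completes once the close operation completes, which in turn signals that handling of the session is done.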
|
[[webflux-websocket-server-handshake]]
=== Handshake
[.small]#<<web.adoc#websocket-server-handshake,Same as in the Servlet stack>>#

`WebSocketHandlerAdapter` delegates to a `WebSocketService`. By default, that is an instance
of `HandshakeWebSocketService`, which performs basic checks on the WebSocket request and
then uses `RequestUpgradeStrategy` for the server in use. Currently, there is built-in
support for Reactor Netty, Tomcat, Jetty, and Undertow.

`HandshakeWebSocketService` exposes a `sessionAttributePredicate` property that allows
setting a `Predicate<String>` to extract attributes from the `WebSession` and insert them
into the attributes of the `WebSocketSession`.
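
The selection step can be pictured with the following plain-Java model, which assumes that the predicate is tested against attribute names. It is an illustration only, not Spring's implementation, and the `SessionAttributeFilter` class and the `ws.` name prefix used below are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java model of the selection step; not Spring's implementation.
final class SessionAttributeFilter {

    private SessionAttributeFilter() {
    }

    // Copies only the entries whose attribute name satisfies the predicate.
    static Map<String, Object> select(Map<String, Object> webSessionAttributes,
                                      Predicate<String> namePredicate) {
        Map<String, Object> selected = new LinkedHashMap<>();
        webSessionAttributes.forEach((name, value) -> {
            if (namePredicate.test(name)) {
                selected.put(name, value);
            }
        });
        return selected;
    }
}
```

With a predicate such as `name -> name.startsWith("ws.")`, an attribute named `ws.user` would be copied, while one named `csrfToken` would be left out.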
|
[[webflux-websocket-server-config]]
=== Server Configuration
[.small]#<<web.adoc#websocket-server-runtime-configuration,Same as in the Servlet stack>>#

The `RequestUpgradeStrategy` for each server exposes WebSocket-related configuration
options available for the underlying WebSocket engine. The following example sets
WebSocket options when running on Tomcat:

====
[source,java,indent=0]
[subs="verbatim,quotes"]
----
@Configuration
static class WebConfig {

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter(webSocketService());
    }

    @Bean
    public WebSocketService webSocketService() {
        TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
        strategy.setMaxSessionIdleTimeout(0L);
        return new HandshakeWebSocketService(strategy);
    }
}
----
====
|
Check the upgrade strategy for your server to see what options are available. Currently,
only Tomcat and Jetty expose such options.
|
[[webflux-websocket-server-cors]]
=== CORS
[.small]#<<web.adoc#websocket-server-allowed-origins,Same as in the Servlet stack>>#

The easiest way to configure CORS and restrict access to a WebSocket endpoint is to
have your `WebSocketHandler` implement `CorsConfigurationSource` and return a
`CorsConfiguration` with allowed origins, headers, and other details. If you cannot do
that, you can also set the `corsConfigurations` property on the `SimpleUrlHandlerMapping` to
specify CORS settings by URL pattern. If both are specified, they are combined by using the
`combine` method on `CorsConfiguration`.
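
A minimal sketch of the first approach might look like the following. The class name `CorsAwareHandler` and the origin `https://example.com` are placeholders chosen for illustration:

```java
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.reactive.CorsConfigurationSource;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

// Hypothetical handler; the allowed origin is a placeholder.
class CorsAwareHandler implements WebSocketHandler, CorsConfigurationSource {

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Consume inbound messages until the connection is closed.
        return session.receive().then();
    }

    @Override
    public CorsConfiguration getCorsConfiguration(ServerWebExchange exchange) {
        // Allow handshake requests from a single origin only.
        CorsConfiguration config = new CorsConfiguration();
        config.addAllowedOrigin("https://example.com");
        return config;
    }
}
```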
|
[[webflux-websocket-client]]
=== Client

Spring WebFlux provides a `WebSocketClient` abstraction with implementations for
Reactor Netty, Tomcat, Jetty, Undertow, and standard Java (that is, JSR-356).

NOTE: The Tomcat client is effectively an extension of the standard Java one with some extra
functionality in the `WebSocketSession` handling to take advantage of the Tomcat-specific
API to suspend receiving messages for back pressure.

To start a WebSocket session, you can create an instance of the client and use its `execute`
methods:

====
[source,java,indent=0]
[subs="verbatim,quotes"]
----
WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://localhost:8080/path");
// Nothing happens until the returned Mono<Void> is subscribed to (for example, with block()).
client.execute(url, session ->
        session.receive()
                .doOnNext(System.out::println)
                .then());
----
====
|
Some clients, such as Jetty, implement `Lifecycle` and need to be started before you can
use them and stopped when they are no longer needed. All clients have constructor options
related to configuration of the underlying WebSocket client.
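
For the Jetty client, the start and stop calls might be arranged as in the following sketch. The URL, the 10-second timeout, and the class name `JettyClientExample` are assumptions made for illustration:

```java
import java.net.URI;
import java.time.Duration;

import org.springframework.web.reactive.socket.client.JettyWebSocketClient;

// Hypothetical example class; the URL and timeout are placeholders.
class JettyClientExample {

    public static void main(String[] args) {
        JettyWebSocketClient client = new JettyWebSocketClient();
        client.start(); // Lifecycle: start before the first call to execute
        try {
            client.execute(URI.create("ws://localhost:8080/path"),
                    session -> session.receive()
                            .doOnNext(message -> System.out.println(message.getPayloadAsText()))
                            .then())
                    // Subscribe and wait until the session ends or the timeout elapses.
                    .block(Duration.ofSeconds(10));
        }
        finally {
            client.stop(); // Lifecycle: release the underlying Jetty resources
        }
    }
}
```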