diff --git a/build.gradle b/build.gradle
index daa83c97533..d35a0dd1a00 100644
--- a/build.gradle
+++ b/build.gradle
@@ -802,6 +802,11 @@ project("spring-web-reactive") {
optional("org.freemarker:freemarker:${freemarkerVersion}")
optional "org.apache.httpcomponents:httpclient:${httpclientVersion}"
optional('org.webjars:webjars-locator:0.32')
+ optional("io.reactivex:rxnetty-http:${rxnettyVersion}") {
+ exclude group: 'io.reactivex', module: 'rxjava'
+ }
+ optional("io.reactivex:rxjava:${rxjavaVersion}")
+ optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}")
testCompile("io.projectreactor.addons:reactor-test:${reactorCoreVersion}")
testCompile("javax.validation:validation-api:${beanvalVersion}")
testCompile("org.hibernate:hibernate-validator:${hibval5Version}")
@@ -810,12 +815,7 @@ project("spring-web-reactive") {
testCompile("org.eclipse.jetty:jetty-server:${jettyVersion}")
testCompile("org.eclipse.jetty:jetty-servlet:${jettyVersion}")
testCompile("io.projectreactor.ipc:reactor-netty:${reactorNettyVersion}")
- testCompile("io.reactivex:rxnetty-http:${rxnettyVersion}") {
- exclude group: 'io.reactivex', module: 'rxjava'
- }
- testCompile("io.reactivex:rxjava:${rxjavaVersion}")
testCompile "io.reactivex.rxjava2:rxjava:${rxjava2Version}"
- testCompile("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}")
testCompile("io.undertow:undertow-core:${undertowVersion}")
testCompile("org.jboss.xnio:xnio-api:${xnioVersion}")
testCompile("com.fasterxml:aalto-xml:1.0.0")
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/CloseStatus.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/CloseStatus.java
new file mode 100644
index 00000000000..edc515e2e3c
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/CloseStatus.java
@@ -0,0 +1,213 @@
+/*
+ * Copyright 2002-2014 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.web.reactive.socket;
+
+import org.springframework.util.Assert;
+import org.springframework.util.ObjectUtils;
+
+/**
+ * Representation of WebSocket "close" status codes and reasons. Status codes
+ * in the 1xxx range are pre-defined by the protocol.
+ *
+ *
See
+ * RFC 6455, Section 7.4.1 "Defined Status Codes".
+ *
+ * @author Rossen Stoyanchev
+ * @since 5.0
+ */
+public final class CloseStatus {
+
+ /**
+ * "1000 indicates a normal closure, meaning that the purpose for which the connection
+ * was established has been fulfilled."
+ */
+ public static final CloseStatus NORMAL = new CloseStatus(1000);
+
+ /**
+ * "1001 indicates that an endpoint is "going away", such as a server going down or a
+ * browser having navigated away from a page."
+ */
+ public static final CloseStatus GOING_AWAY = new CloseStatus(1001);
+
+ /**
+ * "1002 indicates that an endpoint is terminating the connection due to a protocol
+ * error."
+ */
+ public static final CloseStatus PROTOCOL_ERROR = new CloseStatus(1002);
+
+ /**
+ * "1003 indicates that an endpoint is terminating the connection because it has
+ * received a type of data it cannot accept (e.g., an endpoint that understands only
+ * text data MAY send this if it receives a binary message)."
+ */
+ public static final CloseStatus NOT_ACCEPTABLE = new CloseStatus(1003);
+
+ // 10004: Reserved.
+ // The specific meaning might be defined in the future.
+
+ /**
+ * "1005 is a reserved value and MUST NOT be set as a status code in a Close control
+ * frame by an endpoint. It is designated for use in applications expecting a status
+ * code to indicate that no status code was actually present."
+ */
+ public static final CloseStatus NO_STATUS_CODE = new CloseStatus(1005);
+
+ /**
+ * "1006 is a reserved value and MUST NOT be set as a status code in a Close control
+ * frame by an endpoint. It is designated for use in applications expecting a status
+ * code to indicate that the connection was closed abnormally, e.g., without sending
+ * or receiving a Close control frame."
+ */
+ public static final CloseStatus NO_CLOSE_FRAME = new CloseStatus(1006);
+
+ /**
+ * "1007 indicates that an endpoint is terminating the connection because it has
+ * received data within a message that was not consistent with the type of the message
+ * (e.g., non-UTF-8 [RFC3629] data within a text message)."
+ */
+ public static final CloseStatus BAD_DATA = new CloseStatus(1007);
+
+ /**
+ * "1008 indicates that an endpoint is terminating the connection because it has
+ * received a message that violates its policy. This is a generic status code that can
+ * be returned when there is no other more suitable status code (e.g., 1003 or 1009)
+ * or if there is a need to hide specific details about the policy."
+ */
+ public static final CloseStatus POLICY_VIOLATION = new CloseStatus(1008);
+
+ /**
+ * "1009 indicates that an endpoint is terminating the connection because it has
+ * received a message that is too big for it to process."
+ */
+ public static final CloseStatus TOO_BIG_TO_PROCESS = new CloseStatus(1009);
+
+ /**
+ * "1010 indicates that an endpoint (client) is terminating the connection because it
+ * has expected the server to negotiate one or more extension, but the server didn't
+ * return them in the response message of the WebSocket handshake. The list of
+ * extensions that are needed SHOULD appear in the /reason/ part of the Close frame.
+ * Note that this status code is not used by the server, because it can fail the
+ * WebSocket handshake instead."
+ */
+ public static final CloseStatus REQUIRED_EXTENSION = new CloseStatus(1010);
+
+ /**
+ * "1011 indicates that a server is terminating the connection because it encountered
+ * an unexpected condition that prevented it from fulfilling the request."
+ */
+ public static final CloseStatus SERVER_ERROR = new CloseStatus(1011);
+
+ /**
+ * "1012 indicates that the service is restarted. A client may reconnect, and if it
+ * chooses to do, should reconnect using a randomized delay of 5 - 30s."
+ */
+ public static final CloseStatus SERVICE_RESTARTED = new CloseStatus(1012);
+
+ /**
+ * "1013 indicates that the service is experiencing overload. A client should only
+ * connect to a different IP (when there are multiple for the target) or reconnect to
+ * the same IP upon user action."
+ */
+ public static final CloseStatus SERVICE_OVERLOAD = new CloseStatus(1013);
+
+ /**
+ * "1015 is a reserved value and MUST NOT be set as a status code in a Close control
+ * frame by an endpoint. It is designated for use in applications expecting a status
+ * code to indicate that the connection was closed due to a failure to perform a TLS
+ * handshake (e.g., the server certificate can't be verified)."
+ */
+ public static final CloseStatus TLS_HANDSHAKE_FAILURE = new CloseStatus(1015);
+
+
+ private final int code;
+
+ private final String reason;
+
+
+ /**
+ * Create a new {@link CloseStatus} instance.
+ * @param code the status code
+ */
+ public CloseStatus(int code) {
+ this(code, null);
+ }
+
+ /**
+ * Create a new {@link CloseStatus} instance.
+ * @param code the status code
+ * @param reason the reason
+ */
+ public CloseStatus(int code, String reason) {
+ Assert.isTrue((code >= 1000 && code < 5000), "Invalid status code");
+ this.code = code;
+ this.reason = reason;
+ }
+
+
+ /**
+ * Return the status code.
+ */
+ public int getCode() {
+ return this.code;
+ }
+
+ /**
+ * Return the reason, or {@code null} if none.
+ */
+ public String getReason() {
+ return this.reason;
+ }
+
+ /**
+ * Create a new {@link CloseStatus} from this one with the specified reason.
+ * @param reason the reason
+ * @return a new {@link CloseStatus} instance
+ */
+ public CloseStatus withReason(String reason) {
+ Assert.hasText(reason, "Reason must not be empty");
+ return new CloseStatus(this.code, reason);
+ }
+
+
+ public boolean equalsCode(CloseStatus other) {
+ return (this.code == other.code);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof CloseStatus)) {
+ return false;
+ }
+ CloseStatus otherStatus = (CloseStatus) other;
+ return (this.code == otherStatus.code &&
+ ObjectUtils.nullSafeEquals(this.reason, otherStatus.reason));
+ }
+
+ @Override
+ public int hashCode() {
+ return this.code * 29 + ObjectUtils.nullSafeHashCode(this.reason);
+ }
+
+ @Override
+ public String toString() {
+ return "CloseStatus[code=" + this.code + ", reason=" + this.reason + "]";
+ }
+
+}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java
new file mode 100644
index 00000000000..ce4713e3b2e
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketHandler.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.web.reactive.socket;
+
+import java.util.Collections;
+import java.util.List;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * Handler for a WebSocket-style session interaction.
+ *
+ * @author Rossen Stoyanchev
+ * @since 5.0
+ */
+public interface WebSocketHandler {
+
+ /**
+ * Return the list of sub-protocols supported by this handler.
+ *
By default an empty list is returned.
+ */
+ default List getSubProtocols() {
+ return Collections.emptyList();
+ }
+
+ /**
+ * Handle the given WebSocket session.
+ * @param session the session
+ * @return signals completion for session handling
+ */
+ Mono handle(WebSocketSession session);
+
+}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java
new file mode 100644
index 00000000000..66afe78b313
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.web.reactive.socket;
+
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.util.Assert;
+import org.springframework.util.ObjectUtils;
+
+/**
+ * Representation of a WebSocket message.
+ * Use one of the static factory methods in this class to create a message.
+ *
+ * @author Rossen Stoyanchev
+ * @since 5.0
+ */
+public class WebSocketMessage {
+
+ private final Type type;
+
+ private final DataBuffer payload;
+
+
+ /**
+ * Private constructor. See static factory methods.
+ */
+ private WebSocketMessage(Type type, DataBuffer payload) {
+ Assert.notNull(type, "'type' must not be null");
+ Assert.notNull(payload, "'payload' must not be null");
+ this.type = type;
+ this.payload = payload;
+ }
+
+
+ /**
+ * Return the message type (text, binary, etc).
+ */
+ public Type getType() {
+ return this.type;
+ }
+
+ /**
+ * Return the message payload.
+ */
+ public DataBuffer getPayload() {
+ return this.payload;
+ }
+
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof WebSocketMessage)) {
+ return false;
+ }
+ WebSocketMessage otherMessage = (WebSocketMessage) other;
+ return (this.type.equals(otherMessage.type) &&
+ ObjectUtils.nullSafeEquals(this.payload, otherMessage.payload));
+ }
+
+ @Override
+ public int hashCode() {
+ return this.type.hashCode() * 29 + this.payload.hashCode();
+ }
+
+
+ /**
+ * Factory method to create a text WebSocket message.
+ */
+ public static WebSocketMessage text(DataBuffer payload) {
+ return create(Type.TEXT, payload);
+ }
+
+ /**
+ * Factory method to create a binary WebSocket message.
+ */
+ public static WebSocketMessage binary(DataBuffer payload) {
+ return create(Type.BINARY, payload);
+ }
+
+ /**
+ * Factory method to create a ping WebSocket message.
+ */
+ public static WebSocketMessage ping(DataBuffer payload) {
+ return create(Type.PING, payload);
+ }
+
+ /**
+ * Factory method to create a pong WebSocket message.
+ */
+ public static WebSocketMessage pong(DataBuffer payload) {
+ return create(Type.PONG, payload);
+ }
+
+ /**
+ * Factory method to create a WebSocket message of the given type.
+ */
+ public static WebSocketMessage create(Type type, DataBuffer payload) {
+ return new WebSocketMessage(type, payload);
+ }
+
+
+ /**
+ * WebSocket message types.
+ */
+ public enum Type { TEXT, BINARY, PING, PONG }
+
+}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java
new file mode 100644
index 00000000000..f094edafa82
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.web.reactive.socket;
+
+import java.net.URI;
+
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import org.springframework.core.io.buffer.DataBuffer;
+
+/**
+ * Representation for a WebSocket session.
+ *
+ * @author Rossen Stoyanchev
+ * @since 5.0
+ */
+public interface WebSocketSession {
+
+ /**
+ * Return the id for the session.
+ */
+ String getId();
+
+ /**
+ * Return the WebSocket endpoint URI.
+ */
+ URI getUri();
+
+ /**
+ * Get the flux of incoming messages.
+ * Note: the caller of this method is responsible for
+ * releasing the DataBuffer payload of each message after consuming it
+ * on runtimes where a {@code PooledByteBuffer} is used such as Netty.
+ * @see org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer)
+ */
+ Flux receive();
+
+ /**
+ * Write the given messages to the WebSocket connection.
+ * @param messages the messages to write
+ */
+ Mono send(Publisher messages);
+
+ /**
+ * Close the WebSocket session with {@link CloseStatus#NORMAL}.
+ */
+ default Mono close() {
+ return close(CloseStatus.NORMAL);
+ }
+
+ /**
+ * Close the WebSocket session with the given status.
+ * @param status the close status
+ */
+ Mono close(CloseStatus status);
+
+}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java
new file mode 100644
index 00000000000..4c727575e80
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.web.reactive.socket.adapter;
+
+import java.net.URI;
+
+import io.reactivex.netty.protocol.http.ws.WebSocketConnection;
+import reactor.core.publisher.Mono;
+import rx.Observable;
+import rx.RxReactiveStreams;
+
+import org.springframework.core.io.buffer.NettyDataBufferFactory;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.http.server.reactive.ServerHttpResponse;
+import org.springframework.util.Assert;
+import org.springframework.web.reactive.socket.WebSocketHandler;
+
+/**
+ * RxNetty {@code WebSocketHandler} implementation adapting and delegating to a
+ * Spring {@link WebSocketHandler}.
+ *
+ * @author Rossen Stoyanchev
+ * @since 5.0
+ */
+public class RxNettyWebSocketHandlerAdapter
+ implements io.reactivex.netty.protocol.http.ws.server.WebSocketHandler {
+
+ private final URI uri;
+
+ private final NettyDataBufferFactory bufferFactory;
+
+ private final WebSocketHandler handler;
+
+
+ public RxNettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response,
+ WebSocketHandler handler) {
+
+ Assert.notNull("'request' is required");
+ Assert.notNull("'response' is required");
+ Assert.notNull("'handler' handler is required");
+
+ this.uri = request.getURI();
+ this.bufferFactory = (NettyDataBufferFactory) response.bufferFactory();
+ this.handler = handler;
+ }
+
+
+ @Override
+ public Observable handle(WebSocketConnection connection) {
+ Mono result = this.handler.handle(createSession(connection));
+ return RxReactiveStreams.toObservable(result);
+ }
+
+ private RxNettyWebSocketSession createSession(WebSocketConnection conn) {
+ return new RxNettyWebSocketSession(conn, this.uri, this.bufferFactory);
+ }
+
+}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java
new file mode 100644
index 00000000000..455b5d8eb0c
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.web.reactive.socket.adapter;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketFrame;
+import io.reactivex.netty.protocol.http.ws.WebSocketConnection;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import rx.Observable;
+import rx.RxReactiveStreams;
+
+import org.springframework.core.io.buffer.NettyDataBuffer;
+import org.springframework.core.io.buffer.NettyDataBufferFactory;
+import org.springframework.util.Assert;
+import org.springframework.util.ObjectUtils;
+import org.springframework.web.reactive.socket.CloseStatus;
+import org.springframework.web.reactive.socket.WebSocketMessage;
+
+/**
+ *
+ * @author Rossen Stoyanchev
+ * @since 5.0
+ */
+public class RxNettyWebSocketSession extends WebSocketSessionSupport {
+
+ private static final Map, WebSocketMessage.Type> MESSAGE_TYPES;
+
+ static {
+ MESSAGE_TYPES = new HashMap<>(4);
+ MESSAGE_TYPES.put(TextWebSocketFrame.class, WebSocketMessage.Type.TEXT);
+ MESSAGE_TYPES.put(BinaryWebSocketFrame.class, WebSocketMessage.Type.BINARY);
+ MESSAGE_TYPES.put(PingWebSocketFrame.class, WebSocketMessage.Type.PING);
+ MESSAGE_TYPES.put(PongWebSocketFrame.class, WebSocketMessage.Type.PONG);
+ }
+
+
+ private final String id;
+
+ private final URI uri;
+
+ private final NettyDataBufferFactory bufferFactory;
+
+
+ public RxNettyWebSocketSession(WebSocketConnection conn, URI uri, NettyDataBufferFactory factory) {
+ super(conn);
+ Assert.notNull(uri, "'uri' is required.");
+ Assert.notNull(uri, "'bufferFactory' is required.");
+ this.id = ObjectUtils.getIdentityHexString(getDelegate());
+ this.uri = uri;
+ this.bufferFactory = factory;
+ }
+
+
+ @Override
+ public String getId() {
+ return this.id;
+ }
+
+ @Override
+ public URI getUri() {
+ return this.uri;
+ }
+
+ @Override
+ public Flux receive() {
+ return Flux.from(RxReactiveStreams.toPublisher(getDelegate().getInput()))
+ .filter(frame -> !(frame instanceof CloseWebSocketFrame))
+ .window()
+ .concatMap(flux -> flux.takeUntil(WebSocketFrame::isFinalFragment).buffer())
+ .map(this::toMessage);
+ }
+
+ @SuppressWarnings("OptionalGetWithoutIsPresent")
+ private WebSocketMessage toMessage(List frames) {
+ Class> frameType = frames.get(0).getClass();
+ if (frames.size() == 1) {
+ NettyDataBuffer buffer = this.bufferFactory.wrap(frames.get(0).content());
+ return WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer);
+ }
+ return frames.stream()
+ .map(socketFrame -> bufferFactory.wrap(socketFrame.content()))
+ .reduce(NettyDataBuffer::write)
+ .map(buffer -> WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer))
+ .get();
+ }
+
+ @Override
+ public Mono send(Publisher messages) {
+ Observable frames = RxReactiveStreams.toObservable(messages).map(this::toFrame);
+ Observable completion = getDelegate().write(frames);
+ return Mono.from(RxReactiveStreams.toPublisher(completion));
+ }
+
+ private WebSocketFrame toFrame(WebSocketMessage message) {
+ ByteBuf byteBuf = NettyDataBufferFactory.toByteBuf(message.getPayload());
+ if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
+ return new TextWebSocketFrame(byteBuf);
+ }
+ else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
+ return new BinaryWebSocketFrame(byteBuf);
+ }
+ else if (WebSocketMessage.Type.PING.equals(message.getType())) {
+ return new PingWebSocketFrame(byteBuf);
+ }
+ else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
+ return new PongWebSocketFrame(byteBuf);
+ }
+ else {
+ throw new IllegalArgumentException("Unexpected message type: " + message.getType());
+ }
+ }
+
+ @Override
+ protected Mono closeInternal(CloseStatus status) {
+ return Mono.from(RxReactiveStreams.toPublisher(getDelegate().close()));
+ }
+
+}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java
new file mode 100644
index 00000000000..0e5ec0daf71
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.web.reactive.socket.adapter;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import reactor.core.publisher.Mono;
+
+import org.springframework.util.Assert;
+import org.springframework.web.reactive.socket.CloseStatus;
+import org.springframework.web.reactive.socket.WebSocketSession;
+
+/**
+ * Base class for {@link WebSocketSession} implementations wrapping and
+ * delegating to the native WebSocket session (or connection) of the underlying
+ * WebSocket runtime.
+ *
+ * @author Rossen Stoyanchev
+ * @since 5.0
+ */
+public abstract class WebSocketSessionSupport implements WebSocketSession {
+
+ protected final Log logger = LogFactory.getLog(getClass());
+
+
+ private final T delegate;
+
+
+ /**
+ * Create a new instance and associate the given attributes with it.
+ * @param delegate the underlying WebSocket connection
+ */
+ protected WebSocketSessionSupport(T delegate) {
+ Assert.notNull(delegate, "'delegate' session is required.");
+ this.delegate = delegate;
+ }
+
+
+ /**
+ * Return the native session of the underlying runtime.
+ */
+ public T getDelegate() {
+ return this.delegate;
+ }
+
+
+ @Override
+ public final Mono close(CloseStatus status) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Closing " + this);
+ }
+ return closeInternal(status);
+ }
+
+ protected abstract Mono closeInternal(CloseStatus status);
+
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "[id=" + getId() + ", uri=" + getUri() + "]";
+ }
+
+}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/package-info.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/package-info.java
new file mode 100644
index 00000000000..aa675f1b3bc
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Classes adapting Spring's Reactive WebSocket API to and from WebSocket runtimes.
+ */
+package org.springframework.web.reactive.socket.adapter;
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/package-info.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/package-info.java
new file mode 100644
index 00000000000..38e4baf22b7
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Abstractions and support classes for WebSocket interactions.
+ */
+package org.springframework.web.reactive.socket;
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/RequestUpgradeStrategy.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/RequestUpgradeStrategy.java
new file mode 100644
index 00000000000..61c819eafac
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/RequestUpgradeStrategy.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.web.reactive.socket.server;
+
+import java.util.Map;
+
+import reactor.core.publisher.Mono;
+
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.http.server.reactive.ServerHttpResponse;
+import org.springframework.web.reactive.socket.WebSocketHandler;
+import org.springframework.web.server.ServerWebExchange;
+
+/**
+ * A strategy for upgrading an HTTP request to a WebSocket interaction depending
+ * on the underlying HTTP runtime.
+ *
+ * Typically there is one such strategy for every {@link ServerHttpRequest}
+ * and {@link ServerHttpResponse} implementation type except in the case of
+ * Servlet containers for which there is no standard API to upgrade a request.
+ * JSR-356 does have programmatic endpoint registration but that is only
+ * intended for use on startup and not per request.
+ *
+ * @author Rossen Stoyanchev
+ * @since 5.0
+ */
+public interface RequestUpgradeStrategy {
+
+ /**
+ * Upgrade the request to a WebSocket interaction and adapt the given
+ * Spring {@link WebSocketHandler} to the underlying runtime WebSocket API.
+ * @param exchange the current exchange
+ * @param webSocketHandler handler for WebSocket session
+ * @return a completion Mono for the WebSocket session handling
+ */
+ Mono upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler);
+
+}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/WebSocketService.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/WebSocketService.java
new file mode 100644
index 00000000000..27d0f3cf9d4
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/WebSocketService.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.web.reactive.socket.server;
+
+import reactor.core.publisher.Mono;
+
+import org.springframework.web.reactive.socket.WebSocketHandler;
+import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
+import org.springframework.web.server.ServerWebExchange;
+
+/**
+ * A service to delegate WebSocket-related HTTP requests to.
+ *
+ * For a straight-up WebSocket endpoint this means handling the initial
+ * handshake request but for a SockJS endpoint this means handling all HTTP
+ * requests defined in the SockJS protocol.
+ *
+ * @author Rossen Stoyanchev
+ * @since 5.0
+ * @see HandshakeWebSocketService
+ */
+public interface WebSocketService {
+
+ /**
+ * Handle the HTTP request and use the given {@link WebSocketHandler}.
+ * @param exchange the current exchange
+ * @param webSocketHandler handler for WebSocket session
+ * @return a completion Mono for the WebSocket session handling
+ */
+ Mono handleRequest(ServerWebExchange exchange, WebSocketHandler webSocketHandler);
+
+}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/package-info.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/package-info.java
new file mode 100644
index 00000000000..0c0cb9c23c9
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Server support for WebSocket interactions.
+ */
+package org.springframework.web.reactive.socket.server;
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java
new file mode 100644
index 00000000000..b96d813963b
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/HandshakeWebSocketService.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.web.reactive.socket.server.support;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import reactor.core.publisher.Mono;
+
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.server.reactive.ServerHttpRequest;
+import org.springframework.http.server.reactive.ServerHttpResponse;
+import org.springframework.util.Assert;
+import org.springframework.util.ClassUtils;
+import org.springframework.util.ReflectionUtils;
+import org.springframework.web.reactive.socket.WebSocketHandler;
+import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
+import org.springframework.web.reactive.socket.server.WebSocketService;
+import org.springframework.web.server.MethodNotAllowedException;
+import org.springframework.web.server.ServerWebExchange;
+
+/**
+ * A {@code WebSocketService} implementation that handles a WebSocket handshake
+ * and upgrades to a WebSocket interaction through the configured or
+ * auto-detected {@link RequestUpgradeStrategy}.
+ *
+ * @author Rossen Stoyanchev
+ * @since 5.0
+ */
+public class HandshakeWebSocketService implements WebSocketService {
+
+ private static final String SEC_WEBSOCKET_KEY = "Sec-WebSocket-Key";
+
+
+ private static final boolean rxNettyPresent = ClassUtils.isPresent(
+ "io.reactivex.netty.protocol.http.ws.WebSocketConnection",
+ HandshakeWebSocketService.class.getClassLoader());
+
+
+ protected static final Log logger = LogFactory.getLog(HandshakeWebSocketService.class);
+
+
+ private final RequestUpgradeStrategy upgradeStrategy;
+
+
+ /**
+ * Default constructor automatic, classpath detection based discovery of the
+ * {@link RequestUpgradeStrategy} to use.
+ */
+ public HandshakeWebSocketService() {
+ this(initUpgradeStrategy());
+ }
+
+ /**
+ * Alternative constructor with the {@link RequestUpgradeStrategy} to use.
+ * @param upgradeStrategy the strategy to use
+ */
+ public HandshakeWebSocketService(RequestUpgradeStrategy upgradeStrategy) {
+ Assert.notNull(upgradeStrategy, "'upgradeStrategy' is required");
+ this.upgradeStrategy = upgradeStrategy;
+ }
+
+ private static RequestUpgradeStrategy initUpgradeStrategy() {
+ String className;
+ if (rxNettyPresent) {
+ className = "RxNettyRequestUpgradeStrategy";
+ }
+ else {
+ throw new IllegalStateException("No suitable default RequestUpgradeStrategy found");
+ }
+
+ try {
+ className = HandshakeWebSocketService.class.getPackage().getName() + "." + className;
+ Class> clazz = ClassUtils.forName(className, HandshakeWebSocketService.class.getClassLoader());
+ return (RequestUpgradeStrategy) ReflectionUtils.accessibleConstructor(clazz).newInstance();
+ }
+ catch (Throwable ex) {
+ throw new IllegalStateException(
+ "Failed to instantiate RequestUpgradeStrategy: " + className, ex);
+ }
+ }
+
+
+ /**
+ * Return the {@link RequestUpgradeStrategy} for WebSocket requests.
+ */
+ public RequestUpgradeStrategy getUpgradeStrategy() {
+ return this.upgradeStrategy;
+ }
+
+
+ @Override
+ public Mono handleRequest(ServerWebExchange exchange, WebSocketHandler webSocketHandler) {
+
+ ServerHttpRequest request = exchange.getRequest();
+ ServerHttpResponse response = exchange.getResponse();
+
+ if (logger.isTraceEnabled()) {
+ logger.trace("Processing " + request.getMethod() + " " + request.getURI());
+ }
+
+ if (HttpMethod.GET != request.getMethod()) {
+ return Mono.error(new MethodNotAllowedException(
+ request.getMethod().name(), Collections.singleton("GET")));
+ }
+
+ if (!isWebSocketUpgrade(request)) {
+ response.setStatusCode(HttpStatus.BAD_REQUEST);
+ return response.setComplete();
+ }
+
+ return getUpgradeStrategy().upgrade(exchange, webSocketHandler);
+ }
+
+ private boolean isWebSocketUpgrade(ServerHttpRequest request) {
+ if (!"WebSocket".equalsIgnoreCase(request.getHeaders().getUpgrade())) {
+ if (logger.isErrorEnabled()) {
+ logger.error("Invalid 'Upgrade' header: " + request.getHeaders());
+ }
+ return false;
+ }
+ List connectionValue = request.getHeaders().getConnection();
+ if (!connectionValue.contains("Upgrade") && !connectionValue.contains("upgrade")) {
+ if (logger.isErrorEnabled()) {
+ logger.error("Invalid 'Connection' header: " + request.getHeaders());
+ }
+ return false;
+ }
+ String key = request.getHeaders().getFirst(SEC_WEBSOCKET_KEY);
+ if (key == null) {
+ if (logger.isErrorEnabled()) {
+ logger.error("Missing \"Sec-WebSocket-Key\" header");
+ }
+ return false;
+ }
+ return true;
+ }
+
+}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/WebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/WebSocketHandlerAdapter.java
new file mode 100644
index 00000000000..81f5c7a97fd
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/WebSocketHandlerAdapter.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.web.reactive.socket.server.support;
+
+import reactor.core.publisher.Mono;
+
+import org.springframework.util.Assert;
+import org.springframework.web.reactive.DispatcherHandler;
+import org.springframework.web.reactive.HandlerAdapter;
+import org.springframework.web.reactive.HandlerResult;
+import org.springframework.web.reactive.socket.WebSocketHandler;
+import org.springframework.web.reactive.socket.server.WebSocketService;
+import org.springframework.web.server.ServerWebExchange;
+
+/**
+ * {@code HandlerAdapter} that allows using a {@link WebSocketHandler} contract
+ * with the generic {@link DispatcherHandler} mapping URLs directly to such
+ * handlers. Requests are handled through the configured {@link WebSocketService}.
+ *
+ * @author Rossen Stoyanchev
+ * @since 5.0
+ */
+public class WebSocketHandlerAdapter implements HandlerAdapter {
+
+ private final WebSocketService webSocketService;
+
+
+ /**
+ * Default constructor that creates and uses a
+ * {@link HandshakeWebSocketService} for a straight-up WebSocket interaction,
+ * i.e. treating incoming requests as WebSocket handshake requests.
+ */
+ public WebSocketHandlerAdapter() {
+ this(new HandshakeWebSocketService());
+ }
+
+ /**
+ * Alternative constructor with the {@link WebSocketService} to use.
+ */
+ public WebSocketHandlerAdapter(WebSocketService webSocketService) {
+ Assert.notNull(webSocketService, "'webSocketService' is required");
+ this.webSocketService = webSocketService;
+ }
+
+
+ public WebSocketService getWebSocketService() {
+ return this.webSocketService;
+ }
+
+
+ @Override
+ public boolean supports(Object handler) {
+ return WebSocketHandler.class.isAssignableFrom(handler.getClass());
+ }
+
+ @Override
+ public Mono handle(ServerWebExchange exchange, Object handler) {
+ WebSocketHandler webSocketHandler = (WebSocketHandler) handler;
+ return getWebSocketService().handleRequest(exchange, webSocketHandler).then(Mono.empty());
+ }
+
+}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/package-info.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/package-info.java
new file mode 100644
index 00000000000..4f4a56ea33c
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/support/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Server-side support classes for WebSocket requests.
+ */
+package org.springframework.web.reactive.socket.server.support;
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/RxNettyRequestUpgradeStrategy.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/RxNettyRequestUpgradeStrategy.java
new file mode 100644
index 00000000000..b923f381665
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/RxNettyRequestUpgradeStrategy.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.web.reactive.socket.server.upgrade;
+
+import java.util.List;
+import java.util.Map;
+
+import reactor.core.publisher.Mono;
+import rx.Observable;
+import rx.RxReactiveStreams;
+
+import org.springframework.http.server.reactive.RxNettyServerHttpRequest;
+import org.springframework.http.server.reactive.RxNettyServerHttpResponse;
+import org.springframework.web.reactive.socket.WebSocketHandler;
+import org.springframework.web.reactive.socket.adapter.RxNettyWebSocketHandlerAdapter;
+import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
+import org.springframework.web.server.ServerWebExchange;
+
+/**
+ * A {@link RequestUpgradeStrategy} for use with RxNetty.
+ *
+ * @author Rossen Stoyanchev
+ * @since 5.0
+ */
+public class RxNettyRequestUpgradeStrategy implements RequestUpgradeStrategy {
+
+ @Override
+ public Mono upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler) {
+
+ RxNettyServerHttpRequest request = (RxNettyServerHttpRequest) exchange.getRequest();
+ RxNettyServerHttpResponse response = (RxNettyServerHttpResponse) exchange.getResponse();
+
+ RxNettyWebSocketHandlerAdapter rxNettyHandler =
+ new RxNettyWebSocketHandlerAdapter(request, response, webSocketHandler);
+
+ Observable completion = response.getRxNettyResponse()
+ .acceptWebSocketUpgrade(rxNettyHandler)
+ .subprotocol(getSubProtocols(webSocketHandler));
+
+ return Mono.from(RxReactiveStreams.toPublisher(completion));
+ }
+
+ private static String[] getSubProtocols(WebSocketHandler webSocketHandler) {
+ List subProtocols = webSocketHandler.getSubProtocols();
+ return subProtocols.toArray(new String[subProtocols.size()]);
+ }
+
+}
diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/package-info.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/package-info.java
new file mode 100644
index 00000000000..6fdce1fc11b
--- /dev/null
+++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Holds implementations of
+ * {@link org.springframework.web.reactive.socket.server.RequestUpgradeStrategy}.
+ */
+package org.springframework.web.reactive.socket.server.upgrade;
diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java
new file mode 100644
index 00000000000..ec73d6a3316
--- /dev/null
+++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.web.reactive.socket.server;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+import org.springframework.context.annotation.AnnotationConfigApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.http.server.reactive.HttpHandler;
+import org.springframework.http.server.reactive.bootstrap.HttpServer;
+import org.springframework.http.server.reactive.bootstrap.RxNettyHttpServer;
+import org.springframework.util.SocketUtils;
+import org.springframework.web.reactive.DispatcherHandler;
+import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
+import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
+import org.springframework.web.reactive.socket.server.upgrade.RxNettyRequestUpgradeStrategy;
+
+/**
+ * Base class for WebSocket integration tests involving a server-side
+ * {@code WebSocketHandler}. Sub-classes to return a Spring configuration class
+ * via {@link #getWebConfigClass()} containing a SimpleUrlHandlerMapping with
+ * pattern-to-WebSocketHandler mappings.
+ *
+ * @author Rossen Stoyanchev
+ */
+@RunWith(Parameterized.class)
+@SuppressWarnings({"unused", "WeakerAccess"})
+public abstract class AbstractWebSocketHandlerIntegrationTests {
+
+ protected int port;
+
+ @Parameter(0)
+ public HttpServer server;
+
+ @Parameter(1)
+ public Class> handlerAdapterConfigClass;
+
+
+ @Parameters
+ public static Object[][] arguments() {
+ return new Object[][] {
+ {new RxNettyHttpServer(), RxNettyConfig.class}
+ };
+ }
+
+
+ @Before
+ public void setup() throws Exception {
+ this.port = SocketUtils.findAvailableTcpPort();
+ this.server.setPort(this.port);
+ this.server.setHandler(createHttpHandler());
+ this.server.afterPropertiesSet();
+ this.server.start();
+ }
+
+ private HttpHandler createHttpHandler() {
+ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
+ context.register(DispatcherConfig.class, this.handlerAdapterConfigClass);
+ context.register(getWebConfigClass());
+ context.refresh();
+ return DispatcherHandler.toHttpHandler(context);
+ }
+
+ protected abstract Class> getWebConfigClass();
+
+ @After
+ public void tearDown() throws Exception {
+ this.server.stop();
+ }
+
+
+ @Configuration
+ static class DispatcherConfig {
+
+ @Bean
+ public DispatcherHandler webHandler() {
+ return new DispatcherHandler();
+ }
+ }
+
+ static abstract class AbstractHandlerAdapterConfig {
+
+ @Bean
+ public WebSocketHandlerAdapter handlerAdapter() {
+ RequestUpgradeStrategy strategy = createUpgradeStrategy();
+ WebSocketService service = new HandshakeWebSocketService(strategy);
+ return new WebSocketHandlerAdapter(service);
+ }
+
+ protected abstract RequestUpgradeStrategy createUpgradeStrategy();
+
+ }
+
+ @Configuration
+ static class RxNettyConfig extends AbstractHandlerAdapterConfig {
+
+ @Override
+ protected RequestUpgradeStrategy createUpgradeStrategy() {
+ return new RxNettyRequestUpgradeStrategy();
+ }
+ }
+
+}
diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/BasicWebSocketHandlerIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/BasicWebSocketHandlerIntegrationTests.java
new file mode 100644
index 00000000000..cd2dd26b002
--- /dev/null
+++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/BasicWebSocketHandlerIntegrationTests.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2002-2016 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.web.reactive.socket.server;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketFrame;
+import io.reactivex.netty.protocol.http.client.HttpClient;
+import io.reactivex.netty.protocol.http.ws.client.WebSocketResponse;
+import org.junit.Test;
+import reactor.core.publisher.Mono;
+import rx.Observable;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.reactive.HandlerMapping;
+import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
+import org.springframework.web.reactive.socket.WebSocketHandler;
+import org.springframework.web.reactive.socket.WebSocketSession;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Basic WebSocket integration
+ * @author Rossen Stoyanchev
+ */
+@SuppressWarnings({"unused", "WeakerAccess"})
+public class BasicWebSocketHandlerIntegrationTests extends AbstractWebSocketHandlerIntegrationTests {
+
+
+ @Override
+ protected Class> getWebConfigClass() {
+ return WebConfig.class;
+ }
+
+
+ @Test
+ public void echo() throws Exception {
+ Observable messages = Observable.range(1, 10).map(i -> "Interval " + i);
+ List actual = HttpClient.newClient("localhost", this.port)
+ .createGet("/echo")
+ .requestWebSocketUpgrade()
+ .flatMap(WebSocketResponse::getWebSocketConnection)
+ .flatMap(conn -> conn.write(messages
+ .map(TextWebSocketFrame::new)
+ .cast(WebSocketFrame.class)
+ .concatWith(Observable.just(new CloseWebSocketFrame())))
+ .cast(WebSocketFrame.class)
+ .mergeWith(conn.getInput())
+ )
+ .take(10)
+ .map(frame -> frame.content().toString(StandardCharsets.UTF_8))
+ .toList().toBlocking().first();
+ List expected = messages.toList().toBlocking().first();
+ assertEquals(expected, actual);
+ }
+
+
+ @Configuration
+ static class WebConfig {
+
+ @Bean
+ public HandlerMapping handlerMapping() {
+
+ Map map = new HashMap<>();
+ map.put("/echo", new EchoWebSocketHandler());
+
+ SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
+ mapping.setUrlMap(map);
+ return mapping;
+ }
+
+ }
+
+ private static class EchoWebSocketHandler implements WebSocketHandler {
+
+ @Override
+ public Mono handle(WebSocketSession session) {
+ return session.send(session.receive());
+ }
+ }
+
+}