diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java index 6287832c914..860fe3caa02 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java @@ -21,6 +21,8 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.DataBufferFactory; + /** * Representation for a WebSocket session. * @@ -39,6 +41,12 @@ public interface WebSocketSession { */ URI getUri(); + /** + * Return a {@link DataBufferFactory} that can be used for creating message payloads. + * @return a buffer factory + */ + DataBufferFactory bufferFactory(); + /** * Get the flux of incoming messages. */ diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java index 40aad9df42b..30e2576b458 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java @@ -24,9 +24,9 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.http.server.reactive.AbstractListenerReadPublisher; import org.springframework.http.server.reactive.AbstractListenerWriteProcessor; -import org.springframework.util.Assert; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketMessage.Type; @@ -48,10 +48,6 @@ public abstract class AbstractListenerWebSocketSession extends WebSocketSessi private static final int RECEIVE_BUFFER_SIZE = 8192; - private final String id; - - private final URI uri; - private final WebSocketReceivePublisher receivePublisher = new WebSocketReceivePublisher(); private volatile WebSocketSendProcessor sendProcessor; @@ -59,24 +55,10 @@ public abstract class AbstractListenerWebSocketSession extends WebSocketSessi private final AtomicBoolean sendCalled = new AtomicBoolean(); - public AbstractListenerWebSocketSession(T delegate, String id, URI uri) { - super(delegate); - Assert.notNull(id, "'id' is required."); - Assert.notNull(uri, "'uri' is required."); - this.id = id; - this.uri = uri; - } - - - @Override - public String getId() { - return this.id; + public AbstractListenerWebSocketSession(T delegate, String id, URI uri, DataBufferFactory bufferFactory) { + super(delegate, id, uri, bufferFactory); } - @Override - public URI getUri() { - return this.uri; - } protected WebSocketSendProcessor getSendProcessor() { return this.sendProcessor; diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java index a7e9c17c469..58a8f9b2626 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java @@ -63,7 +63,7 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport @OnWebSocketConnect public void onWebSocketConnect(Session session) { - this.session = new JettyWebSocketSession(session); + this.session = new JettyWebSocketSession(session, getUri(), getBufferFactory()); HandlerResultSubscriber subscriber = new HandlerResultSubscriber(); getDelegate().handle(this.session).subscribe(subscriber); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java index 0bd4af2a943..99715f5f9da 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -17,6 +17,7 @@ package org.springframework.web.reactive.socket.adapter; import java.io.IOException; +import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -24,6 +25,7 @@ import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.WriteCallback; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.util.ObjectUtils; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketMessage; @@ -38,9 +40,9 @@ import org.springframework.web.reactive.socket.WebSocketSession; */ public class JettyWebSocketSession extends AbstractListenerWebSocketSession { - public JettyWebSocketSession(Session session) { - super(session, ObjectUtils.getIdentityHexString(session), - session.getUpgradeRequest().getRequestURI()); + + public JettyWebSocketSession(Session session, URI uri, DataBufferFactory bufferFactory) { + super(session, ObjectUtils.getIdentityHexString(session), uri, bufferFactory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java index 8095e797494..031cd34e053 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java @@ -31,7 +31,6 @@ import reactor.core.publisher.Flux; import org.springframework.core.io.buffer.NettyDataBuffer; import org.springframework.core.io.buffer.NettyDataBufferFactory; -import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketSession; @@ -55,32 +54,16 @@ public abstract class NettyWebSocketSessionSupport extends WebSocketSessionSu } - protected final String id; - - protected final URI uri; - - protected final NettyDataBufferFactory bufferFactory; - - - protected NettyWebSocketSessionSupport(T delegate, URI uri, NettyDataBufferFactory factory) { - super(delegate); - Assert.notNull(uri, "'uri' is required."); - Assert.notNull(uri, "'bufferFactory' is required."); - this.uri = uri; - this.bufferFactory = factory; - this.id = ObjectUtils.getIdentityHexString(getDelegate()); + protected NettyWebSocketSessionSupport(T delegate, URI uri, NettyDataBufferFactory bufferFactory) { + super(delegate, ObjectUtils.getIdentityHexString(delegate), uri, bufferFactory); } @Override - public String getId() { - return this.id; + public NettyDataBufferFactory bufferFactory() { + return (NettyDataBufferFactory) super.bufferFactory(); } - @Override - public URI getUri() { - return this.uri; - } protected Flux toMessageFlux(Flux frameFlux) { return frameFlux @@ -94,11 +77,11 @@ public abstract class NettyWebSocketSessionSupport extends WebSocketSessionSu private WebSocketMessage toMessage(List frames) { Class frameType = frames.get(0).getClass(); if (frames.size() == 1) { - NettyDataBuffer buffer = this.bufferFactory.wrap(frames.get(0).content()); + NettyDataBuffer buffer = bufferFactory().wrap(frames.get(0).content()); return WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer); } return frames.stream() - .map(socketFrame -> bufferFactory.wrap(socketFrame.content())) + .map(socketFrame -> bufferFactory().wrap(socketFrame.content())) .reduce(NettyDataBuffer::write) .map(buffer -> WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer)) .get(); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java index 6dca8680718..08b97a0369f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java @@ -41,8 +41,8 @@ import org.springframework.web.reactive.socket.WebSocketSession; public class ReactorNettyWebSocketSession extends NettyWebSocketSessionSupport { - protected ReactorNettyWebSocketSession(WebsocketInbound inbound, - WebsocketOutbound outbound, + + protected ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound, URI uri, NettyDataBufferFactory factory) { super(new WebSocketConnection(inbound, outbound), uri, factory); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java index d4fef38c59d..4b708675165 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java @@ -40,6 +40,7 @@ import org.springframework.web.reactive.socket.WebSocketSession; */ public class RxNettyWebSocketSession extends NettyWebSocketSessionSupport { + public RxNettyWebSocketSession(WebSocketConnection conn, URI uri, NettyDataBufferFactory factory) { super(conn, uri, factory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java index 74b65f7b84b..36ba805a27f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java @@ -67,7 +67,8 @@ public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSuppor @Override public void onOpen(Session session, EndpointConfig config) { - TomcatWebSocketHandlerAdapter.this.session = new TomcatWebSocketSession(session); + TomcatWebSocketHandlerAdapter.this.session = + new TomcatWebSocketSession(session, getUri(), getBufferFactory()); session.addMessageHandler(String.class, message -> { WebSocketMessage webSocketMessage = toMessage(message); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java index aeafea79db0..72fd76d8b40 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java @@ -17,6 +17,7 @@ package org.springframework.web.reactive.socket.adapter; import java.io.IOException; +import java.net.URI; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import javax.websocket.CloseReason; @@ -27,6 +28,7 @@ import javax.websocket.Session; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketSession; @@ -40,8 +42,9 @@ import org.springframework.web.reactive.socket.WebSocketSession; */ public class TomcatWebSocketSession extends AbstractListenerWebSocketSession { - public TomcatWebSocketSession(Session session) { - super(session, session.getId(), session.getRequestURI()); + + public TomcatWebSocketSession(Session session, URI uri, DataBufferFactory bufferFactory) { + super(session, session.getId(), uri, bufferFactory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java index 17169806fb5..c594ccffc13 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java @@ -59,7 +59,7 @@ public class UndertowWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupp @Override public void onConnect(WebSocketHttpExchange exchange, WebSocketChannel channel) { - this.session = new UndertowWebSocketSession(channel, getUri()); + this.session = new UndertowWebSocketSession(channel, getUri(), getBufferFactory()); channel.getReceiveSetter().set(new UndertowReceiveListener()); channel.resumeReceives(); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java index 0b70e89b22b..46d3432033a 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java @@ -27,6 +27,7 @@ import io.undertow.websockets.core.WebSocketChannel; import io.undertow.websockets.core.WebSockets; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.util.ObjectUtils; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketMessage; @@ -41,8 +42,9 @@ import org.springframework.web.reactive.socket.WebSocketSession; */ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession { - public UndertowWebSocketSession(WebSocketChannel channel, URI url) { - super(channel, ObjectUtils.getIdentityHexString(channel), url); + + public UndertowWebSocketSession(WebSocketChannel channel, URI url, DataBufferFactory bufferFactory) { + super(channel, ObjectUtils.getIdentityHexString(channel), url, bufferFactory); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java index 0e5ec0daf71..33a5a3529c2 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java @@ -16,10 +16,13 @@ package org.springframework.web.reactive.socket.adapter; +import java.net.URI; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.util.Assert; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketSession; @@ -39,14 +42,26 @@ public abstract class WebSocketSessionSupport implements WebSocketSession { private final T delegate; + private final String id; + + private final URI uri; + + private final DataBufferFactory bufferFactory; + /** * Create a new instance and associate the given attributes with it. * @param delegate the underlying WebSocket connection */ - protected WebSocketSessionSupport(T delegate) { - Assert.notNull(delegate, "'delegate' session is required."); + protected WebSocketSessionSupport(T delegate, String id, URI uri, DataBufferFactory bufferFactory) { + Assert.notNull(delegate, "Native session is required."); + Assert.notNull(id, "'id' is required."); + Assert.notNull(uri, "URI is required."); + Assert.notNull(bufferFactory, "DataBufferFactory is required."); this.delegate = delegate; + this.id = id; + this.uri = uri; + this.bufferFactory = bufferFactory; } @@ -57,6 +72,21 @@ public abstract class WebSocketSessionSupport implements WebSocketSession { return this.delegate; } + @Override + public String getId() { + return this.id; + } + + @Override + public URI getUri() { + return this.uri; + } + + @Override + public DataBufferFactory bufferFactory() { + return this.bufferFactory; + } + @Override public final Mono close(CloseStatus status) {