From edcf04911f4ca3a045194c0a7e90b2d48603929b Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Sat, 17 Dec 2016 15:46:37 -0500 Subject: [PATCH] Refactoring in reactive WebSocketMessage Move WebSocketMessage factory methods to the WebSocketSession which has the bufferFactory() needed to create message payloads. WebSocketMessage is left with one public constructor. WebSocketMessage exposes convenience retain/releasePayload methods. --- .../web/reactive/socket/WebSocketMessage.java | 87 +++++++++++-------- .../web/reactive/socket/WebSocketSession.java | 26 ++++++ .../adapter/JettyWebSocketHandlerAdapter.java | 6 +- .../adapter/NettyWebSocketSessionSupport.java | 4 +- .../TomcatWebSocketHandlerAdapter.java | 6 +- .../UndertowWebSocketHandlerAdapter.java | 6 +- .../adapter/WebSocketSessionSupport.java | 28 ++++++ 7 files changed, 114 insertions(+), 49 deletions(-) diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java index 66afe78b313..128e5d5484d 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketMessage.java @@ -15,7 +15,10 @@ */ package org.springframework.web.reactive.socket; +import java.nio.charset.StandardCharsets; + import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; @@ -34,9 +37,17 @@ public class WebSocketMessage { /** - * Private constructor. See static factory methods. + * Constructor for a WebSocketMessage. To create, see factory methods: + * + *

Alternatively use {@link WebSocketSession#bufferFactory()} to create + * the payload and then invoke this constructor. */ - private WebSocketMessage(Type type, DataBuffer payload) { + public WebSocketMessage(Type type, DataBuffer payload) { Assert.notNull(type, "'type' must not be null"); Assert.notNull(payload, "'payload' must not be null"); this.type = type; @@ -58,6 +69,42 @@ public class WebSocketMessage { return this.payload; } + /** + * Return the message payload as UTF-8 text. This is a useful for text + * WebSocket messages. + */ + public String getPayloadAsText() { + byte[] bytes = new byte[this.payload.readableByteCount()]; + this.payload.read(bytes); + return new String(bytes, StandardCharsets.UTF_8); + } + + /** + * Retain the data buffer for the message payload, which is useful on + * runtimes with pooled buffers, e.g. Netty. A shortcut for: + *

+	 * DataBuffer payload = message.getPayload();
+	 * DataBufferUtils.retain(payload);
+	 * 
+ * @see DataBufferUtils#retain(DataBuffer) + */ + public void retainPayload() { + DataBufferUtils.retain(this.payload); + } + + /** + * Release the data buffer for the message payload, which is useful on + * runtimes with pooled buffers, e.g. Netty. This is a shortcut for: + *
+	 * DataBuffer payload = message.getPayload();
+	 * DataBufferUtils.release(payload);
+	 * 
+ * @see DataBufferUtils#release(DataBuffer) + */ + public void releasePayload() { + DataBufferUtils.release(this.payload); + } + @Override public boolean equals(Object other) { @@ -78,42 +125,6 @@ public class WebSocketMessage { } - /** - * Factory method to create a text WebSocket message. - */ - public static WebSocketMessage text(DataBuffer payload) { - return create(Type.TEXT, payload); - } - - /** - * Factory method to create a binary WebSocket message. - */ - public static WebSocketMessage binary(DataBuffer payload) { - return create(Type.BINARY, payload); - } - - /** - * Factory method to create a ping WebSocket message. - */ - public static WebSocketMessage ping(DataBuffer payload) { - return create(Type.PING, payload); - } - - /** - * Factory method to create a pong WebSocket message. - */ - public static WebSocketMessage pong(DataBuffer payload) { - return create(Type.PONG, payload); - } - - /** - * Factory method to create a WebSocket message of the given type. - */ - public static WebSocketMessage create(Type type, DataBuffer payload) { - return new WebSocketMessage(type, payload); - } - - /** * WebSocket message types. */ diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java index 860fe3caa02..5f8720cb183 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java @@ -16,11 +16,13 @@ package org.springframework.web.reactive.socket; import java.net.URI; +import java.util.function.Function; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; /** @@ -58,6 +60,30 @@ public interface WebSocketSession { */ Mono send(Publisher messages); + /** + * Factory method to create a text {@link WebSocketMessage} using the + * {@link #bufferFactory()} for the session. + */ + WebSocketMessage textMessage(String payload); + + /** + * Factory method to create a binary WebSocketMessage using the + * {@link #bufferFactory()} for the session. + */ + WebSocketMessage binaryMessage(Function payloadFactory); + + /** + * Factory method to create a ping WebSocketMessage using the + * {@link #bufferFactory()} for the session. + */ + WebSocketMessage pingMessage(Function payloadFactory); + + /** + * Factory method to create a pong WebSocketMessage using the + * {@link #bufferFactory()} for the session. + */ + WebSocketMessage pongMessage(Function payloadFactory); + /** * Close the WebSocket session with {@link CloseStatus#NORMAL}. */ diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java index 58a8f9b2626..540ec8eac7b 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java @@ -101,15 +101,15 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport if (Type.TEXT.equals(type)) { byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8); DataBuffer buffer = getBufferFactory().wrap(bytes); - return WebSocketMessage.create(Type.TEXT, buffer); + return new WebSocketMessage(Type.TEXT, buffer); } else if (Type.BINARY.equals(type)) { DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message); - return WebSocketMessage.create(Type.BINARY, buffer); + return new WebSocketMessage(Type.BINARY, buffer); } else if (Type.PONG.equals(type)) { DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message); - return WebSocketMessage.create(Type.PONG, buffer); + return new WebSocketMessage(Type.PONG, buffer); } else { throw new IllegalArgumentException("Unexpected message type: " + message); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java index 031cd34e053..b4a75823389 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java @@ -78,12 +78,12 @@ public abstract class NettyWebSocketSessionSupport extends WebSocketSessionSu Class frameType = frames.get(0).getClass(); if (frames.size() == 1) { NettyDataBuffer buffer = bufferFactory().wrap(frames.get(0).content()); - return WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer); + return new WebSocketMessage(MESSAGE_TYPES.get(frameType), buffer); } return frames.stream() .map(socketFrame -> bufferFactory().wrap(socketFrame.content())) .reduce(NettyDataBuffer::write) - .map(buffer -> WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer)) + .map(buffer -> new WebSocketMessage(MESSAGE_TYPES.get(frameType), buffer)) .get(); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java index 36ba805a27f..cd1af4728ad 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java @@ -90,15 +90,15 @@ public class TomcatWebSocketHandlerAdapter extends WebSocketHandlerAdapterSuppor private WebSocketMessage toMessage(T message) { if (message instanceof String) { byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8); - return WebSocketMessage.create(Type.TEXT, getBufferFactory().wrap(bytes)); + return new WebSocketMessage(Type.TEXT, getBufferFactory().wrap(bytes)); } else if (message instanceof ByteBuffer) { DataBuffer buffer = getBufferFactory().wrap((ByteBuffer) message); - return WebSocketMessage.create(Type.BINARY, buffer); + return new WebSocketMessage(Type.BINARY, buffer); } else if (message instanceof PongMessage) { DataBuffer buffer = getBufferFactory().wrap(((PongMessage) message).getApplicationData()); - return WebSocketMessage.create(Type.PONG, buffer); + return new WebSocketMessage(Type.PONG, buffer); } else { throw new IllegalArgumentException("Unexpected message type: " + message); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java index c594ccffc13..691c4395b0b 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java @@ -102,15 +102,15 @@ public class UndertowWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupp private WebSocketMessage toMessage(Type type, T message) { if (Type.TEXT.equals(type)) { byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8); - return WebSocketMessage.create(Type.TEXT, getBufferFactory().wrap(bytes)); + return new WebSocketMessage(Type.TEXT, getBufferFactory().wrap(bytes)); } else if (Type.BINARY.equals(type)) { DataBuffer buffer = getBufferFactory().allocateBuffer().write((ByteBuffer[]) message); - return WebSocketMessage.create(Type.BINARY, buffer); + return new WebSocketMessage(Type.BINARY, buffer); } else if (Type.PONG.equals(type)) { DataBuffer buffer = getBufferFactory().allocateBuffer().write((ByteBuffer[]) message); - return WebSocketMessage.create(Type.PONG, buffer); + return new WebSocketMessage(Type.PONG, buffer); } else { throw new IllegalArgumentException("Unexpected message type: " + message); diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java index 33a5a3529c2..b50dc7257c7 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketSessionSupport.java @@ -17,14 +17,18 @@ package org.springframework.web.reactive.socket.adapter; import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.function.Function; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.util.Assert; import org.springframework.web.reactive.socket.CloseStatus; +import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketSession; /** @@ -87,6 +91,30 @@ public abstract class WebSocketSessionSupport implements WebSocketSession { return this.bufferFactory; } + @Override + public WebSocketMessage textMessage(String payload) { + byte[] bytes = payload.getBytes(StandardCharsets.UTF_8); + DataBuffer buffer = bufferFactory().wrap(bytes); + return new WebSocketMessage(WebSocketMessage.Type.TEXT, buffer); + } + + @Override + public WebSocketMessage binaryMessage(Function payloadFactory) { + DataBuffer payload = payloadFactory.apply(bufferFactory()); + return new WebSocketMessage(WebSocketMessage.Type.BINARY, payload); + } + + @Override + public WebSocketMessage pingMessage(Function payloadFactory) { + DataBuffer payload = payloadFactory.apply(bufferFactory()); + return new WebSocketMessage(WebSocketMessage.Type.PING, payload); + } + + @Override + public WebSocketMessage pongMessage(Function payloadFactory) { + DataBuffer payload = payloadFactory.apply(bufferFactory()); + return new WebSocketMessage(WebSocketMessage.Type.PONG, payload); + } @Override public final Mono close(CloseStatus status) {