From fb4e34fce486315935a0292ac0dbe7e0b1497f7f Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 16 May 2013 12:16:56 -0400 Subject: [PATCH] Add partial WebSocketMessage support --- .../web/socket/BinaryMessage.java | 43 +++++++++------- .../web/socket/TextMessage.java | 18 ++++++- .../web/socket/WebSocketHandler.java | 5 ++ .../web/socket/WebSocketMessage.java | 22 ++++++-- .../JettyWebSocketListenerAdapter.java | 2 +- .../adapter/StandardEndpointAdapter.java | 50 ++++++++++++------- .../StandardWebSocketSessionAdapter.java | 4 +- .../adapter/WebSocketHandlerAdapter.java | 8 ++- .../LoggingWebSocketHandlerDecorator.java | 4 +- .../PerConnectionWebSocketHandler.java | 12 +++++ .../support/WebSocketHandlerDecorator.java | 6 +++ 11 files changed, 126 insertions(+), 48 deletions(-) diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/BinaryMessage.java b/spring-websocket/src/main/java/org/springframework/web/socket/BinaryMessage.java index efff9b0b623..3f779a0147d 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/BinaryMessage.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/BinaryMessage.java @@ -15,8 +15,6 @@ */ package org.springframework.web.socket; -import java.io.ByteArrayInputStream; -import java.io.InputStream; import java.nio.ByteBuffer; @@ -34,11 +32,9 @@ public final class BinaryMessage extends WebSocketMessage { /** * Create a new {@link BinaryMessage} instance. * @param payload a non-null payload - * @param isLast if the message is the last of a series of partial messages */ public BinaryMessage(ByteBuffer payload) { - super(payload); - this.bytes = null; + this(payload, true); } /** @@ -46,8 +42,26 @@ public final class BinaryMessage extends WebSocketMessage { * @param payload a non-null payload * @param isLast if the message is the last of a series of partial messages */ + public BinaryMessage(ByteBuffer payload, boolean isLast) { + super(payload, isLast); + this.bytes = null; + } + + /** + * Create a new {@link BinaryMessage} instance. + * @param payload a non-null payload + */ public BinaryMessage(byte[] payload) { - this(payload, 0, (payload == null ? 0 : payload.length)); + this(payload, 0, (payload == null ? 0 : payload.length), true); + } + + /** + * Create a new {@link BinaryMessage} instance. + * @param payload a non-null payload + * @param isLast if the message is the last of a series of partial messages + */ + public BinaryMessage(byte[] payload, boolean isLast) { + this(payload, 0, (payload == null ? 0 : payload.length), isLast); } /** @@ -58,8 +72,8 @@ public final class BinaryMessage extends WebSocketMessage { * @param len the length of the array considered for the payload * @param isLast if the message is the last of a series of partial messages */ - public BinaryMessage(byte[] payload, int offset, int len) { - super(payload != null ? ByteBuffer.wrap(payload, offset, len) : null); + public BinaryMessage(byte[] payload, int offset, int len, boolean isLast) { + super(payload != null ? ByteBuffer.wrap(payload, offset, len) : null, isLast); if(offset == 0 && len == payload.length) { this.bytes = payload; } @@ -82,18 +96,9 @@ public final class BinaryMessage extends WebSocketMessage { return result; } - /** - * Returns access to the message payload as an {@link InputStream}. - */ - public InputStream getInputStream() { - byte[] array = getByteArray(); - return (array != null) ? new ByteArrayInputStream(array) : null; - } - @Override - public String toString() { - int size = (getPayload() != null) ? getPayload().remaining() : 0; - return "WebSocket binary message size=" + size; + protected int getPayloadSize() { + return (getPayload() != null) ? getPayload().remaining() : 0; } } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/TextMessage.java b/spring-websocket/src/main/java/org/springframework/web/socket/TextMessage.java index 0bcc9f9b7c8..370f433953d 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/TextMessage.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/TextMessage.java @@ -27,12 +27,23 @@ import java.io.StringReader; */ public final class TextMessage extends WebSocketMessage { + /** * Create a new {@link TextMessage} instance. * @param payload the payload + * @param isLast whether this the last part of a message received or transmitted in parts */ public TextMessage(CharSequence payload) { - super(payload.toString()); + super(payload.toString(), true); + } + + /** + * Create a new {@link TextMessage} instance. + * @param payload the payload + * @param isLast whether this the last part of a message received or transmitted in parts + */ + public TextMessage(CharSequence payload, boolean isLast) { + super(payload.toString(), isLast); } /** @@ -42,4 +53,9 @@ public final class TextMessage extends WebSocketMessage { return new StringReader(getPayload()); } + @Override + protected int getPayloadSize() { + return getPayload().length(); + } + } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/WebSocketHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/WebSocketHandler.java index 29ae6ae46a3..600785bf6bc 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/WebSocketHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/WebSocketHandler.java @@ -73,4 +73,9 @@ public interface WebSocketHandler { */ void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception; + /** + * Whether the WebSocketHandler handles messages in parts. + */ + boolean supportsPartialMessages(); + } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/WebSocketMessage.java b/spring-websocket/src/main/java/org/springframework/web/socket/WebSocketMessage.java index c3b40661308..23a32efcae3 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/WebSocketMessage.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/WebSocketMessage.java @@ -31,14 +31,17 @@ public abstract class WebSocketMessage { private final T payload; + private final boolean last; + /** * Create a new {@link WebSocketMessage} instance with the given payload. * @param payload a non-null payload */ - WebSocketMessage(T payload) { + WebSocketMessage(T payload, boolean isLast) { Assert.notNull(payload, "Payload must not be null"); this.payload = payload; + this.last = isLast; } /** @@ -48,9 +51,13 @@ public abstract class WebSocketMessage { return this.payload; } - @Override - public String toString() { - return getClass().getSimpleName() + " [payload=" + this.payload + "]"; + /** + * Whether this is the last part of a message, when partial message support on a + * {@link WebSocketHandler} is enabled. If partial message support is not enabled the + * returned value is always {@code true}. + */ + public boolean isLast() { + return this.last; } @Override @@ -70,4 +77,11 @@ public abstract class WebSocketMessage { return ObjectUtils.nullSafeEquals(this.payload, otherMessage.payload); } + @Override + public String toString() { + return getClass().getSimpleName() + " [payload length=" + getPayloadSize() + ", last=" + isLast() + "]"; + } + + protected abstract int getPayloadSize(); + } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/JettyWebSocketListenerAdapter.java b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/JettyWebSocketListenerAdapter.java index c0bb077efe7..6022f778b42 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/JettyWebSocketListenerAdapter.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/JettyWebSocketListenerAdapter.java @@ -74,7 +74,7 @@ public class JettyWebSocketListenerAdapter implements WebSocketListener { @Override public void onWebSocketBinary(byte[] payload, int offset, int len) { - BinaryMessage message = new BinaryMessage(payload, offset, len); + BinaryMessage message = new BinaryMessage(payload, offset, len, true); try { this.webSocketHandler.handleMessage(this.wsSession, message); } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/StandardEndpointAdapter.java b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/StandardEndpointAdapter.java index 7cbb631440c..7bf3c3247ba 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/StandardEndpointAdapter.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/StandardEndpointAdapter.java @@ -61,6 +61,35 @@ public class StandardEndpointAdapter extends Endpoint { this.wsSession.initSession(session); + if (this.handler.supportsPartialMessages()) { + session.addMessageHandler(new MessageHandler.Partial() { + @Override + public void onMessage(String message, boolean isLast) { + handleTextMessage(session, message, isLast); + } + }); + session.addMessageHandler(new MessageHandler.Partial() { + @Override + public void onMessage(ByteBuffer message, boolean isLast) { + handleBinaryMessage(session, message, isLast); + } + }); + } + else { + session.addMessageHandler(new MessageHandler.Whole() { + @Override + public void onMessage(String message) { + handleTextMessage(session, message, true); + } + }); + session.addMessageHandler(new MessageHandler.Whole() { + @Override + public void onMessage(ByteBuffer message) { + handleBinaryMessage(session, message, true); + } + }); + } + try { this.handler.afterConnectionEstablished(this.wsSession); } @@ -68,23 +97,10 @@ public class StandardEndpointAdapter extends Endpoint { ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger); return; } - - session.addMessageHandler(new MessageHandler.Whole() { - @Override - public void onMessage(String message) { - handleTextMessage(session, message); - } - }); - session.addMessageHandler(new MessageHandler.Whole() { - @Override - public void onMessage(ByteBuffer message) { - handleBinaryMessage(session, message); - } - }); } - private void handleTextMessage(javax.websocket.Session session, String payload) { - TextMessage textMessage = new TextMessage(payload); + private void handleTextMessage(javax.websocket.Session session, String payload, boolean isLast) { + TextMessage textMessage = new TextMessage(payload, isLast); try { this.handler.handleMessage(this.wsSession, textMessage); } @@ -93,8 +109,8 @@ public class StandardEndpointAdapter extends Endpoint { } } - private void handleBinaryMessage(javax.websocket.Session session, ByteBuffer payload) { - BinaryMessage binaryMessage = new BinaryMessage(payload); + private void handleBinaryMessage(javax.websocket.Session session, ByteBuffer payload, boolean isLast) { + BinaryMessage binaryMessage = new BinaryMessage(payload, isLast); try { this.handler.handleMessage(this.wsSession, binaryMessage); } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/StandardWebSocketSessionAdapter.java b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/StandardWebSocketSessionAdapter.java index 2d533d54860..cff1931a528 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/StandardWebSocketSessionAdapter.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/StandardWebSocketSessionAdapter.java @@ -110,12 +110,12 @@ public class StandardWebSocketSessionAdapter extends AbstractWebSocketSesssionAd @Override protected void sendTextMessage(TextMessage message) throws IOException { - this.session.getBasicRemote().sendText(message.getPayload()); + this.session.getBasicRemote().sendText(message.getPayload(), message.isLast()); } @Override protected void sendBinaryMessage(BinaryMessage message) throws IOException { - this.session.getBasicRemote().sendBinary(message.getPayload()); + this.session.getBasicRemote().sendBinary(message.getPayload(), message.isLast()); } @Override diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/WebSocketHandlerAdapter.java b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/WebSocketHandlerAdapter.java index 5aef456fa69..fefd330cf7d 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/adapter/WebSocketHandlerAdapter.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/adapter/WebSocketHandlerAdapter.java @@ -38,7 +38,7 @@ public class WebSocketHandlerAdapter implements WebSocketHandler { } @Override - public final void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { + public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { if (message instanceof TextMessage) { handleTextMessage(session, (TextMessage) message); } @@ -46,7 +46,6 @@ public class WebSocketHandlerAdapter implements WebSocketHandler { handleBinaryMessage(session, (BinaryMessage) message); } else { - // should not happen throw new IllegalStateException("Unexpected WebSocket message type: " + message); } } @@ -65,4 +64,9 @@ public class WebSocketHandlerAdapter implements WebSocketHandler { public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { } + @Override + public boolean supportsPartialMessages() { + return false; + } + } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/support/LoggingWebSocketHandlerDecorator.java b/spring-websocket/src/main/java/org/springframework/web/socket/support/LoggingWebSocketHandlerDecorator.java index 52b959cb783..1fecb8fd804 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/support/LoggingWebSocketHandlerDecorator.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/support/LoggingWebSocketHandlerDecorator.java @@ -50,8 +50,8 @@ public class LoggingWebSocketHandlerDecorator extends WebSocketHandlerDecorator @Override public void handleMessage(WebSocketSession session, WebSocketMessage message) throws Exception { - if (logger.isTraceEnabled()) { - logger.trace("Received " + message + ", " + session); + if (logger.isDebugEnabled()) { + logger.debug("Received " + message + ", " + session); } super.handleMessage(session, message); } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/support/PerConnectionWebSocketHandler.java b/spring-websocket/src/main/java/org/springframework/web/socket/support/PerConnectionWebSocketHandler.java index c9ad6797dfb..97bd27ed242 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/support/PerConnectionWebSocketHandler.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/support/PerConnectionWebSocketHandler.java @@ -57,9 +57,16 @@ public class PerConnectionWebSocketHandler implements WebSocketHandler, BeanFact private final Map handlers = new ConcurrentHashMap(); + private final boolean supportsPartialMessages; + public PerConnectionWebSocketHandler(Class handlerType) { + this(handlerType, false); + } + + public PerConnectionWebSocketHandler(Class handlerType, boolean supportsPartialMessages) { this.provider = new BeanCreatingHandlerProvider(handlerType); + this.supportsPartialMessages = supportsPartialMessages; } @Override @@ -112,6 +119,11 @@ public class PerConnectionWebSocketHandler implements WebSocketHandler, BeanFact } } + @Override + public boolean supportsPartialMessages() { + return this.supportsPartialMessages; + } + @Override public String toString() { return "PerConnectionWebSocketHandlerProxy [handlerType=" + this.provider.getHandlerType() + "]"; diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/support/WebSocketHandlerDecorator.java b/spring-websocket/src/main/java/org/springframework/web/socket/support/WebSocketHandlerDecorator.java index e9d7db761dd..20fcec11d6b 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/support/WebSocketHandlerDecorator.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/support/WebSocketHandlerDecorator.java @@ -62,6 +62,12 @@ public class WebSocketHandlerDecorator implements WebSocketHandler { this.delegate.afterConnectionClosed(session, closeStatus); } + @Override + public boolean supportsPartialMessages() { + return this.delegate.supportsPartialMessages(); + } + + @Override public String toString() { return getClass().getSimpleName() + " [delegate=" + this.delegate + "]";