Browse Source

Add partial WebSocketMessage support

pull/292/head
Rossen Stoyanchev 13 years ago
parent
commit
fb4e34fce4
  1. 43
      spring-websocket/src/main/java/org/springframework/web/socket/BinaryMessage.java
  2. 18
      spring-websocket/src/main/java/org/springframework/web/socket/TextMessage.java
  3. 5
      spring-websocket/src/main/java/org/springframework/web/socket/WebSocketHandler.java
  4. 22
      spring-websocket/src/main/java/org/springframework/web/socket/WebSocketMessage.java
  5. 2
      spring-websocket/src/main/java/org/springframework/web/socket/adapter/JettyWebSocketListenerAdapter.java
  6. 50
      spring-websocket/src/main/java/org/springframework/web/socket/adapter/StandardEndpointAdapter.java
  7. 4
      spring-websocket/src/main/java/org/springframework/web/socket/adapter/StandardWebSocketSessionAdapter.java
  8. 8
      spring-websocket/src/main/java/org/springframework/web/socket/adapter/WebSocketHandlerAdapter.java
  9. 4
      spring-websocket/src/main/java/org/springframework/web/socket/support/LoggingWebSocketHandlerDecorator.java
  10. 12
      spring-websocket/src/main/java/org/springframework/web/socket/support/PerConnectionWebSocketHandler.java
  11. 6
      spring-websocket/src/main/java/org/springframework/web/socket/support/WebSocketHandlerDecorator.java

43
spring-websocket/src/main/java/org/springframework/web/socket/BinaryMessage.java

@ -15,8 +15,6 @@
*/ */
package org.springframework.web.socket; package org.springframework.web.socket;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -34,11 +32,9 @@ public final class BinaryMessage extends WebSocketMessage<ByteBuffer> {
/** /**
* Create a new {@link BinaryMessage} instance. * Create a new {@link BinaryMessage} instance.
* @param payload a non-null payload * @param payload a non-null payload
* @param isLast if the message is the last of a series of partial messages
*/ */
public BinaryMessage(ByteBuffer payload) { public BinaryMessage(ByteBuffer payload) {
super(payload); this(payload, true);
this.bytes = null;
} }
/** /**
@ -46,8 +42,26 @@ public final class BinaryMessage extends WebSocketMessage<ByteBuffer> {
* @param payload a non-null payload * @param payload a non-null payload
* @param isLast if the message is the last of a series of partial messages * @param isLast if the message is the last of a series of partial messages
*/ */
public BinaryMessage(ByteBuffer payload, boolean isLast) {
super(payload, isLast);
this.bytes = null;
}
/**
* Create a new {@link BinaryMessage} instance.
* @param payload a non-null payload
*/
public BinaryMessage(byte[] payload) { public BinaryMessage(byte[] payload) {
this(payload, 0, (payload == null ? 0 : payload.length)); this(payload, 0, (payload == null ? 0 : payload.length), true);
}
/**
* Create a new {@link BinaryMessage} instance.
* @param payload a non-null payload
* @param isLast if the message is the last of a series of partial messages
*/
public BinaryMessage(byte[] payload, boolean isLast) {
this(payload, 0, (payload == null ? 0 : payload.length), isLast);
} }
/** /**
@ -58,8 +72,8 @@ public final class BinaryMessage extends WebSocketMessage<ByteBuffer> {
* @param len the length of the array considered for the payload * @param len the length of the array considered for the payload
* @param isLast if the message is the last of a series of partial messages * @param isLast if the message is the last of a series of partial messages
*/ */
public BinaryMessage(byte[] payload, int offset, int len) { public BinaryMessage(byte[] payload, int offset, int len, boolean isLast) {
super(payload != null ? ByteBuffer.wrap(payload, offset, len) : null); super(payload != null ? ByteBuffer.wrap(payload, offset, len) : null, isLast);
if(offset == 0 && len == payload.length) { if(offset == 0 && len == payload.length) {
this.bytes = payload; this.bytes = payload;
} }
@ -82,18 +96,9 @@ public final class BinaryMessage extends WebSocketMessage<ByteBuffer> {
return result; return result;
} }
/**
* Returns access to the message payload as an {@link InputStream}.
*/
public InputStream getInputStream() {
byte[] array = getByteArray();
return (array != null) ? new ByteArrayInputStream(array) : null;
}
@Override @Override
public String toString() { protected int getPayloadSize() {
int size = (getPayload() != null) ? getPayload().remaining() : 0; return (getPayload() != null) ? getPayload().remaining() : 0;
return "WebSocket binary message size=" + size;
} }
} }

18
spring-websocket/src/main/java/org/springframework/web/socket/TextMessage.java

@ -27,12 +27,23 @@ import java.io.StringReader;
*/ */
public final class TextMessage extends WebSocketMessage<String> { public final class TextMessage extends WebSocketMessage<String> {
/** /**
* Create a new {@link TextMessage} instance. * Create a new {@link TextMessage} instance.
* @param payload the payload * @param payload the payload
* @param isLast whether this the last part of a message received or transmitted in parts
*/ */
public TextMessage(CharSequence payload) { public TextMessage(CharSequence payload) {
super(payload.toString()); super(payload.toString(), true);
}
/**
* Create a new {@link TextMessage} instance.
* @param payload the payload
* @param isLast whether this the last part of a message received or transmitted in parts
*/
public TextMessage(CharSequence payload, boolean isLast) {
super(payload.toString(), isLast);
} }
/** /**
@ -42,4 +53,9 @@ public final class TextMessage extends WebSocketMessage<String> {
return new StringReader(getPayload()); return new StringReader(getPayload());
} }
@Override
protected int getPayloadSize() {
return getPayload().length();
}
} }

5
spring-websocket/src/main/java/org/springframework/web/socket/WebSocketHandler.java

@ -73,4 +73,9 @@ public interface WebSocketHandler {
*/ */
void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception; void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception;
/**
* Whether the WebSocketHandler handles messages in parts.
*/
boolean supportsPartialMessages();
} }

22
spring-websocket/src/main/java/org/springframework/web/socket/WebSocketMessage.java

@ -31,14 +31,17 @@ public abstract class WebSocketMessage<T> {
private final T payload; private final T payload;
private final boolean last;
/** /**
* Create a new {@link WebSocketMessage} instance with the given payload. * Create a new {@link WebSocketMessage} instance with the given payload.
* @param payload a non-null payload * @param payload a non-null payload
*/ */
WebSocketMessage(T payload) { WebSocketMessage(T payload, boolean isLast) {
Assert.notNull(payload, "Payload must not be null"); Assert.notNull(payload, "Payload must not be null");
this.payload = payload; this.payload = payload;
this.last = isLast;
} }
/** /**
@ -48,9 +51,13 @@ public abstract class WebSocketMessage<T> {
return this.payload; return this.payload;
} }
@Override /**
public String toString() { * Whether this is the last part of a message, when partial message support on a
return getClass().getSimpleName() + " [payload=" + this.payload + "]"; * {@link WebSocketHandler} is enabled. If partial message support is not enabled the
* returned value is always {@code true}.
*/
public boolean isLast() {
return this.last;
} }
@Override @Override
@ -70,4 +77,11 @@ public abstract class WebSocketMessage<T> {
return ObjectUtils.nullSafeEquals(this.payload, otherMessage.payload); return ObjectUtils.nullSafeEquals(this.payload, otherMessage.payload);
} }
@Override
public String toString() {
return getClass().getSimpleName() + " [payload length=" + getPayloadSize() + ", last=" + isLast() + "]";
}
protected abstract int getPayloadSize();
} }

2
spring-websocket/src/main/java/org/springframework/web/socket/adapter/JettyWebSocketListenerAdapter.java

@ -74,7 +74,7 @@ public class JettyWebSocketListenerAdapter implements WebSocketListener {
@Override @Override
public void onWebSocketBinary(byte[] payload, int offset, int len) { public void onWebSocketBinary(byte[] payload, int offset, int len) {
BinaryMessage message = new BinaryMessage(payload, offset, len); BinaryMessage message = new BinaryMessage(payload, offset, len, true);
try { try {
this.webSocketHandler.handleMessage(this.wsSession, message); this.webSocketHandler.handleMessage(this.wsSession, message);
} }

50
spring-websocket/src/main/java/org/springframework/web/socket/adapter/StandardEndpointAdapter.java

@ -61,6 +61,35 @@ public class StandardEndpointAdapter extends Endpoint {
this.wsSession.initSession(session); this.wsSession.initSession(session);
if (this.handler.supportsPartialMessages()) {
session.addMessageHandler(new MessageHandler.Partial<String>() {
@Override
public void onMessage(String message, boolean isLast) {
handleTextMessage(session, message, isLast);
}
});
session.addMessageHandler(new MessageHandler.Partial<ByteBuffer>() {
@Override
public void onMessage(ByteBuffer message, boolean isLast) {
handleBinaryMessage(session, message, isLast);
}
});
}
else {
session.addMessageHandler(new MessageHandler.Whole<String>() {
@Override
public void onMessage(String message) {
handleTextMessage(session, message, true);
}
});
session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
@Override
public void onMessage(ByteBuffer message) {
handleBinaryMessage(session, message, true);
}
});
}
try { try {
this.handler.afterConnectionEstablished(this.wsSession); this.handler.afterConnectionEstablished(this.wsSession);
} }
@ -68,23 +97,10 @@ public class StandardEndpointAdapter extends Endpoint {
ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger); ExceptionWebSocketHandlerDecorator.tryCloseWithError(this.wsSession, t, logger);
return; return;
} }
session.addMessageHandler(new MessageHandler.Whole<String>() {
@Override
public void onMessage(String message) {
handleTextMessage(session, message);
}
});
session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
@Override
public void onMessage(ByteBuffer message) {
handleBinaryMessage(session, message);
}
});
} }
private void handleTextMessage(javax.websocket.Session session, String payload) { private void handleTextMessage(javax.websocket.Session session, String payload, boolean isLast) {
TextMessage textMessage = new TextMessage(payload); TextMessage textMessage = new TextMessage(payload, isLast);
try { try {
this.handler.handleMessage(this.wsSession, textMessage); this.handler.handleMessage(this.wsSession, textMessage);
} }
@ -93,8 +109,8 @@ public class StandardEndpointAdapter extends Endpoint {
} }
} }
private void handleBinaryMessage(javax.websocket.Session session, ByteBuffer payload) { private void handleBinaryMessage(javax.websocket.Session session, ByteBuffer payload, boolean isLast) {
BinaryMessage binaryMessage = new BinaryMessage(payload); BinaryMessage binaryMessage = new BinaryMessage(payload, isLast);
try { try {
this.handler.handleMessage(this.wsSession, binaryMessage); this.handler.handleMessage(this.wsSession, binaryMessage);
} }

4
spring-websocket/src/main/java/org/springframework/web/socket/adapter/StandardWebSocketSessionAdapter.java

@ -110,12 +110,12 @@ public class StandardWebSocketSessionAdapter extends AbstractWebSocketSesssionAd
@Override @Override
protected void sendTextMessage(TextMessage message) throws IOException { protected void sendTextMessage(TextMessage message) throws IOException {
this.session.getBasicRemote().sendText(message.getPayload()); this.session.getBasicRemote().sendText(message.getPayload(), message.isLast());
} }
@Override @Override
protected void sendBinaryMessage(BinaryMessage message) throws IOException { protected void sendBinaryMessage(BinaryMessage message) throws IOException {
this.session.getBasicRemote().sendBinary(message.getPayload()); this.session.getBasicRemote().sendBinary(message.getPayload(), message.isLast());
} }
@Override @Override

8
spring-websocket/src/main/java/org/springframework/web/socket/adapter/WebSocketHandlerAdapter.java

@ -38,7 +38,7 @@ public class WebSocketHandlerAdapter implements WebSocketHandler {
} }
@Override @Override
public final void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
if (message instanceof TextMessage) { if (message instanceof TextMessage) {
handleTextMessage(session, (TextMessage) message); handleTextMessage(session, (TextMessage) message);
} }
@ -46,7 +46,6 @@ public class WebSocketHandlerAdapter implements WebSocketHandler {
handleBinaryMessage(session, (BinaryMessage) message); handleBinaryMessage(session, (BinaryMessage) message);
} }
else { else {
// should not happen
throw new IllegalStateException("Unexpected WebSocket message type: " + message); throw new IllegalStateException("Unexpected WebSocket message type: " + message);
} }
} }
@ -65,4 +64,9 @@ public class WebSocketHandlerAdapter implements WebSocketHandler {
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
} }
@Override
public boolean supportsPartialMessages() {
return false;
}
} }

4
spring-websocket/src/main/java/org/springframework/web/socket/support/LoggingWebSocketHandlerDecorator.java

@ -50,8 +50,8 @@ public class LoggingWebSocketHandlerDecorator extends WebSocketHandlerDecorator
@Override @Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
if (logger.isTraceEnabled()) { if (logger.isDebugEnabled()) {
logger.trace("Received " + message + ", " + session); logger.debug("Received " + message + ", " + session);
} }
super.handleMessage(session, message); super.handleMessage(session, message);
} }

12
spring-websocket/src/main/java/org/springframework/web/socket/support/PerConnectionWebSocketHandler.java

@ -57,9 +57,16 @@ public class PerConnectionWebSocketHandler implements WebSocketHandler, BeanFact
private final Map<WebSocketSession, WebSocketHandler> handlers = private final Map<WebSocketSession, WebSocketHandler> handlers =
new ConcurrentHashMap<WebSocketSession, WebSocketHandler>(); new ConcurrentHashMap<WebSocketSession, WebSocketHandler>();
private final boolean supportsPartialMessages;
public PerConnectionWebSocketHandler(Class<? extends WebSocketHandler> handlerType) { public PerConnectionWebSocketHandler(Class<? extends WebSocketHandler> handlerType) {
this(handlerType, false);
}
public PerConnectionWebSocketHandler(Class<? extends WebSocketHandler> handlerType, boolean supportsPartialMessages) {
this.provider = new BeanCreatingHandlerProvider<WebSocketHandler>(handlerType); this.provider = new BeanCreatingHandlerProvider<WebSocketHandler>(handlerType);
this.supportsPartialMessages = supportsPartialMessages;
} }
@Override @Override
@ -112,6 +119,11 @@ public class PerConnectionWebSocketHandler implements WebSocketHandler, BeanFact
} }
} }
@Override
public boolean supportsPartialMessages() {
return this.supportsPartialMessages;
}
@Override @Override
public String toString() { public String toString() {
return "PerConnectionWebSocketHandlerProxy [handlerType=" + this.provider.getHandlerType() + "]"; return "PerConnectionWebSocketHandlerProxy [handlerType=" + this.provider.getHandlerType() + "]";

6
spring-websocket/src/main/java/org/springframework/web/socket/support/WebSocketHandlerDecorator.java

@ -62,6 +62,12 @@ public class WebSocketHandlerDecorator implements WebSocketHandler {
this.delegate.afterConnectionClosed(session, closeStatus); this.delegate.afterConnectionClosed(session, closeStatus);
} }
@Override
public boolean supportsPartialMessages() {
return this.delegate.supportsPartialMessages();
}
@Override @Override
public String toString() { public String toString() {
return getClass().getSimpleName() + " [delegate=" + this.delegate + "]"; return getClass().getSimpleName() + " [delegate=" + this.delegate + "]";

Loading…
Cancel
Save