Browse Source

Polish

pull/1256/merge
Rossen Stoyanchev 9 years ago
parent
commit
f32a41933e
  1. 7
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java
  2. 72
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java
  3. 22
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java
  4. 84
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java
  5. 22
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java
  6. 59
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java
  7. 35
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

7
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java

@ -213,7 +213,12 @@ public abstract class AbstractListenerWebSocketSession<T> extends WebSocketSessi
return this.isReady && this.currentData != null; return this.isReady && this.currentData != null;
} }
public void setReady(boolean ready) { /**
* Sub-classes can invoke this before sending a message (false) and
* after receiving the async send callback (true) effective translating
* async completion callback into simple flow control.
*/
public void setReadyToSend(boolean ready) {
this.isReady = ready; this.isReady = ready;
} }
} }

72
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java

@ -30,6 +30,8 @@ import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.OpCode; import org.eclipse.jetty.websocket.common.OpCode;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -50,84 +52,90 @@ public class JettyWebSocketHandlerAdapter {
private static final ByteBuffer EMPTY_PAYLOAD = ByteBuffer.wrap(new byte[0]); private static final ByteBuffer EMPTY_PAYLOAD = ByteBuffer.wrap(new byte[0]);
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(false); private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(false);
private final WebSocketHandler handler; private final WebSocketHandler delegate;
private JettyWebSocketSession session;
private JettyWebSocketSession wsSession;
public JettyWebSocketHandlerAdapter(WebSocketHandler handler) { public JettyWebSocketHandlerAdapter(WebSocketHandler delegate) {
Assert.notNull("'handler' is required"); Assert.notNull("WebSocketHandler is required");
this.handler = handler; this.delegate = delegate;
} }
@OnWebSocketConnect @OnWebSocketConnect
public void onWebSocketConnect(Session session) { public void onWebSocketConnect(Session session) {
this.wsSession = new JettyWebSocketSession(session); this.session = new JettyWebSocketSession(session);
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(); HandlerResultSubscriber subscriber = new HandlerResultSubscriber();
this.handler.handle(this.wsSession).subscribe(resultSubscriber); this.delegate.handle(this.session).subscribe(subscriber);
} }
@OnWebSocketMessage @OnWebSocketMessage
public void onWebSocketText(String message) { public void onWebSocketText(String message) {
if (this.wsSession != null) { if (this.session != null) {
WebSocketMessage wsMessage = toMessage(Type.TEXT, message); WebSocketMessage webSocketMessage = toMessage(Type.TEXT, message);
this.wsSession.handleMessage(wsMessage.getType(), wsMessage); this.session.handleMessage(webSocketMessage.getType(), webSocketMessage);
} }
} }
@OnWebSocketMessage @OnWebSocketMessage
public void onWebSocketBinary(byte[] message, int offset, int length) { public void onWebSocketBinary(byte[] message, int offset, int length) {
if (this.wsSession != null) { if (this.session != null) {
WebSocketMessage wsMessage = toMessage(Type.BINARY, ByteBuffer.wrap(message, offset, length)); ByteBuffer buffer = ByteBuffer.wrap(message, offset, length);
wsSession.handleMessage(wsMessage.getType(), wsMessage); WebSocketMessage webSocketMessage = toMessage(Type.BINARY, buffer);
session.handleMessage(webSocketMessage.getType(), webSocketMessage);
} }
} }
@OnWebSocketFrame @OnWebSocketFrame
public void onWebSocketFrame(Frame frame) { public void onWebSocketFrame(Frame frame) {
if (this.wsSession != null) { if (this.session != null) {
if (OpCode.PONG == frame.getOpCode()) { if (OpCode.PONG == frame.getOpCode()) {
ByteBuffer message = frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD; ByteBuffer buffer = (frame.getPayload() != null ? frame.getPayload() : EMPTY_PAYLOAD);
WebSocketMessage wsMessage = toMessage(Type.PONG, message); WebSocketMessage webSocketMessage = toMessage(Type.PONG, buffer);
wsSession.handleMessage(wsMessage.getType(), wsMessage); session.handleMessage(webSocketMessage.getType(), webSocketMessage);
} }
} }
} }
@OnWebSocketClose @OnWebSocketClose
public void onWebSocketClose(int statusCode, String reason) { public void onWebSocketClose(int statusCode, String reason) {
if (this.wsSession != null) { if (this.session != null) {
this.wsSession.handleClose(new CloseStatus(statusCode, reason)); this.session.handleClose(new CloseStatus(statusCode, reason));
} }
} }
@OnWebSocketError @OnWebSocketError
public void onWebSocketError(Throwable cause) { public void onWebSocketError(Throwable cause) {
if (this.wsSession != null) { if (this.session != null) {
this.wsSession.handleError(cause); this.session.handleError(cause);
} }
} }
private <T> WebSocketMessage toMessage(Type type, T message) { private <T> WebSocketMessage toMessage(Type type, T message) {
if (Type.TEXT.equals(type)) { if (Type.TEXT.equals(type)) {
return WebSocketMessage.create(Type.TEXT, byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
bufferFactory.wrap(((String) message).getBytes(StandardCharsets.UTF_8))); DataBuffer buffer = this.bufferFactory.wrap(bytes);
return WebSocketMessage.create(Type.TEXT, buffer);
} }
else if (Type.BINARY.equals(type)) { else if (Type.BINARY.equals(type)) {
return WebSocketMessage.create(Type.BINARY, DataBuffer buffer = this.bufferFactory.wrap((ByteBuffer) message);
bufferFactory.wrap((ByteBuffer) message)); return WebSocketMessage.create(Type.BINARY, buffer);
} }
else if (Type.PONG.equals(type)) { else if (Type.PONG.equals(type)) {
return WebSocketMessage.create(Type.PONG, DataBuffer buffer = this.bufferFactory.wrap((ByteBuffer) message);
bufferFactory.wrap((ByteBuffer) message)); return WebSocketMessage.create(Type.PONG, buffer);
} }
else { else {
throw new IllegalArgumentException("Unexpected message type: " + message); throw new IllegalArgumentException("Unexpected message type: " + message);
} }
} }
private final class HandlerResultSubscriber implements Subscriber<Void> { private final class HandlerResultSubscriber implements Subscriber<Void> {
@Override @Override
@ -142,15 +150,15 @@ public class JettyWebSocketHandlerAdapter {
@Override @Override
public void onError(Throwable ex) { public void onError(Throwable ex) {
if (wsSession != null) { if (session != null) {
wsSession.close(new CloseStatus(CloseStatus.SERVER_ERROR.getCode(), ex.getMessage())); session.close(new CloseStatus(CloseStatus.SERVER_ERROR.getCode(), ex.getMessage()));
} }
} }
@Override @Override
public void onComplete() { public void onComplete() {
if (wsSession != null) { if (session != null) {
wsSession.close(); session.close();
} }
} }
} }

22
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java

@ -17,6 +17,7 @@
package org.springframework.web.reactive.socket.adapter; package org.springframework.web.reactive.socket.adapter;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.Session;
@ -52,22 +53,21 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess
@Override @Override
protected boolean sendMessage(WebSocketMessage message) throws IOException { protected boolean sendMessage(WebSocketMessage message) throws IOException {
ByteBuffer buffer = message.getPayload().asByteBuffer();
if (WebSocketMessage.Type.TEXT.equals(message.getType())) { if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
getSendProcessor().setReady(false); getSendProcessor().setReadyToSend(false);
getDelegate().getRemote().sendString( String text = new String(buffer.array(), StandardCharsets.UTF_8);
new String(message.getPayload().asByteBuffer().array(), StandardCharsets.UTF_8), getDelegate().getRemote().sendString(text, new SendProcessorCallback());
new WebSocketMessageWriteCallback());
} }
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) { else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
getSendProcessor().setReady(false); getSendProcessor().setReadyToSend(false);
getDelegate().getRemote().sendBytes(message.getPayload().asByteBuffer(), getDelegate().getRemote().sendBytes(buffer, new SendProcessorCallback());
new WebSocketMessageWriteCallback());
} }
else if (WebSocketMessage.Type.PING.equals(message.getType())) { else if (WebSocketMessage.Type.PING.equals(message.getType())) {
getDelegate().getRemote().sendPing(message.getPayload().asByteBuffer()); getDelegate().getRemote().sendPing(buffer);
} }
else if (WebSocketMessage.Type.PONG.equals(message.getType())) { else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
getDelegate().getRemote().sendPong(message.getPayload().asByteBuffer()); getDelegate().getRemote().sendPong(buffer);
} }
else { else {
throw new IllegalArgumentException("Unexpected message type: " + message.getType()); throw new IllegalArgumentException("Unexpected message type: " + message.getType());
@ -91,7 +91,7 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess
} }
private final class WebSocketMessageWriteCallback implements WriteCallback { private final class SendProcessorCallback implements WriteCallback {
@Override @Override
public void writeFailed(Throwable x) { public void writeFailed(Throwable x) {
@ -101,7 +101,7 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess
@Override @Override
public void writeSuccess() { public void writeSuccess() {
getSendProcessor().setReady(true); getSendProcessor().setReadyToSend(true);
getSendProcessor().onWritePossible(); getSendProcessor().onWritePossible();
} }

84
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java

@ -18,16 +18,16 @@ package org.springframework.web.reactive.socket.adapter;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import javax.websocket.CloseReason; import javax.websocket.CloseReason;
import javax.websocket.Endpoint; import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig; import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.PongMessage; import javax.websocket.PongMessage;
import javax.websocket.Session; import javax.websocket.Session;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -47,84 +47,72 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint {
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(false); private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(false);
private final WebSocketHandler handler; private final WebSocketHandler delegate;
private TomcatWebSocketSession wsSession; private TomcatWebSocketSession session;
public TomcatWebSocketHandlerAdapter(WebSocketHandler handler) {
Assert.notNull("'handler' is required"); public TomcatWebSocketHandlerAdapter(WebSocketHandler delegate) {
this.handler = handler; Assert.notNull("WebSocketHandler is required");
this.delegate = delegate;
} }
@Override @Override
public void onOpen(Session session, EndpointConfig config) { public void onOpen(Session session, EndpointConfig config) {
this.wsSession = new TomcatWebSocketSession(session); this.session = new TomcatWebSocketSession(session);
session.addMessageHandler(new MessageHandler.Whole<String>() {
@Override
public void onMessage(String message) {
WebSocketMessage wsMessage = toMessage(message);
wsSession.handleMessage(wsMessage.getType(), wsMessage);
}
session.addMessageHandler(String.class, message -> {
WebSocketMessage webSocketMessage = toMessage(message);
this.session.handleMessage(webSocketMessage.getType(), webSocketMessage);
}); });
session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() { session.addMessageHandler(ByteBuffer.class, message -> {
WebSocketMessage webSocketMessage = toMessage(message);
@Override this.session.handleMessage(webSocketMessage.getType(), webSocketMessage);
public void onMessage(ByteBuffer message) {
WebSocketMessage wsMessage = toMessage(message);
wsSession.handleMessage(wsMessage.getType(), wsMessage);
}
}); });
session.addMessageHandler(new MessageHandler.Whole<PongMessage>() { session.addMessageHandler(PongMessage.class, message -> {
WebSocketMessage webSocketMessage = toMessage(message);
@Override this.session.handleMessage(webSocketMessage.getType(), webSocketMessage);
public void onMessage(PongMessage message) {
WebSocketMessage wsMessage = toMessage(message);
wsSession.handleMessage(wsMessage.getType(), wsMessage);
}
}); });
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(); HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber();
this.handler.handle(this.wsSession).subscribe(resultSubscriber); this.delegate.handle(this.session).subscribe(resultSubscriber);
} }
@Override @Override
public void onClose(Session session, CloseReason reason) { public void onClose(Session session, CloseReason reason) {
if (this.wsSession != null) { if (this.session != null) {
this.wsSession.handleClose( int code = reason.getCloseCode().getCode();
new CloseStatus(reason.getCloseCode().getCode(), reason.getReasonPhrase())); this.session.handleClose(new CloseStatus(code, reason.getReasonPhrase()));
} }
} }
@Override @Override
public void onError(Session session, Throwable exception) { public void onError(Session session, Throwable exception) {
if (this.wsSession != null) { if (this.session != null) {
this.wsSession.handleError(exception); this.session.handleError(exception);
} }
} }
private <T> WebSocketMessage toMessage(T message) { private <T> WebSocketMessage toMessage(T message) {
if (message instanceof String) { if (message instanceof String) {
return WebSocketMessage.create(Type.TEXT, byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
bufferFactory.wrap(((String) message).getBytes(StandardCharsets.UTF_8))); return WebSocketMessage.create(Type.TEXT, this.bufferFactory.wrap(bytes));
} }
else if (message instanceof ByteBuffer) { else if (message instanceof ByteBuffer) {
return WebSocketMessage.create(Type.BINARY, DataBuffer buffer = this.bufferFactory.wrap((ByteBuffer) message);
bufferFactory.wrap((ByteBuffer) message)); return WebSocketMessage.create(Type.BINARY, buffer);
} }
else if (message instanceof PongMessage) { else if (message instanceof PongMessage) {
return WebSocketMessage.create(Type.PONG, DataBuffer buffer = this.bufferFactory.wrap(((PongMessage) message).getApplicationData());
bufferFactory.wrap(((PongMessage) message).getApplicationData())); return WebSocketMessage.create(Type.PONG, buffer);
} }
else { else {
throw new IllegalArgumentException("Unexpected message type: " + message); throw new IllegalArgumentException("Unexpected message type: " + message);
} }
} }
private final class HandlerResultSubscriber implements Subscriber<Void> { private final class HandlerResultSubscriber implements Subscriber<Void> {
@Override @Override
@ -139,15 +127,15 @@ public class TomcatWebSocketHandlerAdapter extends Endpoint {
@Override @Override
public void onError(Throwable ex) { public void onError(Throwable ex) {
if (wsSession != null) { if (session != null) {
wsSession.close(new CloseStatus(CloseStatus.SERVER_ERROR.getCode(), ex.getMessage())); session.close(new CloseStatus(CloseStatus.SERVER_ERROR.getCode(), ex.getMessage()));
} }
} }
@Override @Override
public void onComplete() { public void onComplete() {
if (wsSession != null) { if (session != null) {
wsSession.close(); session.close();
} }
} }
} }

22
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java

@ -17,6 +17,7 @@
package org.springframework.web.reactive.socket.adapter; package org.springframework.web.reactive.socket.adapter;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import javax.websocket.CloseReason; import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCodes; import javax.websocket.CloseReason.CloseCodes;
@ -59,22 +60,21 @@ public class TomcatWebSocketSession extends AbstractListenerWebSocketSession<Ses
@Override @Override
protected boolean sendMessage(WebSocketMessage message) throws IOException { protected boolean sendMessage(WebSocketMessage message) throws IOException {
ByteBuffer buffer = message.getPayload().asByteBuffer();
if (WebSocketMessage.Type.TEXT.equals(message.getType())) { if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
getSendProcessor().setReady(false); getSendProcessor().setReadyToSend(false);
getDelegate().getAsyncRemote().sendText( String text = new String(buffer.array(), StandardCharsets.UTF_8);
new String(message.getPayload().asByteBuffer().array(), StandardCharsets.UTF_8), getDelegate().getAsyncRemote().sendText(text, new SendProcessorCallback());
new WebSocketMessageSendHandler());
} }
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) { else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
getSendProcessor().setReady(false); getSendProcessor().setReadyToSend(false);
getDelegate().getAsyncRemote().sendBinary(message.getPayload().asByteBuffer(), getDelegate().getAsyncRemote().sendBinary(buffer, new SendProcessorCallback());
new WebSocketMessageSendHandler());
} }
else if (WebSocketMessage.Type.PING.equals(message.getType())) { else if (WebSocketMessage.Type.PING.equals(message.getType())) {
getDelegate().getAsyncRemote().sendPing(message.getPayload().asByteBuffer()); getDelegate().getAsyncRemote().sendPing(buffer);
} }
else if (WebSocketMessage.Type.PONG.equals(message.getType())) { else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
getDelegate().getAsyncRemote().sendPong(message.getPayload().asByteBuffer()); getDelegate().getAsyncRemote().sendPong(buffer);
} }
else { else {
throw new IllegalArgumentException("Unexpected message type: " + message.getType()); throw new IllegalArgumentException("Unexpected message type: " + message.getType());
@ -98,12 +98,12 @@ public class TomcatWebSocketSession extends AbstractListenerWebSocketSession<Ses
} }
private final class WebSocketMessageSendHandler implements SendHandler { private final class SendProcessorCallback implements SendHandler {
@Override @Override
public void onResult(SendResult result) { public void onResult(SendResult result) {
if (result.isOK()) { if (result.isOK()) {
getSendProcessor().setReady(true); getSendProcessor().setReadyToSend(true);
getSendProcessor().onWritePossible(); getSendProcessor().onWritePossible();
} }
else { else {

59
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java

@ -16,13 +16,14 @@
package org.springframework.web.reactive.socket.adapter; package org.springframework.web.reactive.socket.adapter;
import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import org.reactivestreams.Subscriber; import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription; import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -50,78 +51,78 @@ public class UndertowWebSocketHandlerAdapter implements WebSocketConnectionCallb
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(false); private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(false);
private final WebSocketHandler handler; private final WebSocketHandler delegate;
private UndertowWebSocketSession session;
private UndertowWebSocketSession wsSession;
public UndertowWebSocketHandlerAdapter(WebSocketHandler handler) { public UndertowWebSocketHandlerAdapter(WebSocketHandler delegate) {
Assert.notNull("'handler' is required"); Assert.notNull("WebSocketHandler is required");
this.handler = handler; this.delegate = delegate;
} }
@Override @Override
public void onConnect(WebSocketHttpExchange exchange, WebSocketChannel channel) { public void onConnect(WebSocketHttpExchange exchange, WebSocketChannel channel) {
try { try {
this.wsSession = new UndertowWebSocketSession(channel); this.session = new UndertowWebSocketSession(channel);
} }
catch (URISyntaxException e) { catch (URISyntaxException e) {
// TODO Auto-generated catch block // TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();
} }
channel.getReceiveSetter().set(new ReceiveListener()); channel.getReceiveSetter().set(new UndertowReceiveListener());
channel.resumeReceives(); channel.resumeReceives();
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(); HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber();
this.handler.handle(this.wsSession).subscribe(resultSubscriber); this.delegate.handle(this.session).subscribe(resultSubscriber);
} }
private final class ReceiveListener extends AbstractReceiveListener {
private final class UndertowReceiveListener extends AbstractReceiveListener {
@Override @Override
protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) { protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) {
wsSession.handleMessage(Type.TEXT, toMessage(Type.TEXT, message.getData())); session.handleMessage(Type.TEXT, toMessage(Type.TEXT, message.getData()));
} }
@Override @Override
protected void onFullBinaryMessage(WebSocketChannel channel, protected void onFullBinaryMessage(WebSocketChannel channel, BufferedBinaryMessage message) {
BufferedBinaryMessage message) throws IOException { session.handleMessage(Type.BINARY, toMessage(Type.BINARY, message.getData().getResource()));
wsSession.handleMessage(Type.BINARY, toMessage(Type.BINARY, message.getData().getResource()));
message.getData().free(); message.getData().free();
} }
@Override @Override
protected void onFullPongMessage(WebSocketChannel channel, protected void onFullPongMessage(WebSocketChannel channel, BufferedBinaryMessage message) {
BufferedBinaryMessage message) throws IOException { session.handleMessage(Type.PONG, toMessage(Type.PONG, message.getData().getResource()));
wsSession.handleMessage(Type.PONG, toMessage(Type.PONG, message.getData().getResource()));
message.getData().free(); message.getData().free();
} }
@Override @Override
protected void onFullCloseMessage(WebSocketChannel channel, protected void onFullCloseMessage(WebSocketChannel channel, BufferedBinaryMessage message) {
BufferedBinaryMessage message) throws IOException {
CloseMessage closeMessage = new CloseMessage(message.getData().getResource()); CloseMessage closeMessage = new CloseMessage(message.getData().getResource());
wsSession.handleClose(new CloseStatus(closeMessage.getCode(), closeMessage.getReason())); session.handleClose(new CloseStatus(closeMessage.getCode(), closeMessage.getReason()));
message.getData().free(); message.getData().free();
} }
@Override @Override
protected void onError(WebSocketChannel channel, Throwable error) { protected void onError(WebSocketChannel channel, Throwable error) {
wsSession.handleError(error); session.handleError(error);
} }
private <T> WebSocketMessage toMessage(Type type, T message) { private <T> WebSocketMessage toMessage(Type type, T message) {
if (Type.TEXT.equals(type)) { if (Type.TEXT.equals(type)) {
return WebSocketMessage.create(Type.TEXT, byte[] bytes = ((String) message).getBytes(StandardCharsets.UTF_8);
bufferFactory.wrap(((String) message).getBytes(StandardCharsets.UTF_8))); return WebSocketMessage.create(Type.TEXT, bufferFactory.wrap(bytes));
} }
else if (Type.BINARY.equals(type)) { else if (Type.BINARY.equals(type)) {
return WebSocketMessage.create(Type.BINARY, DataBuffer buffer = bufferFactory.allocateBuffer().write((ByteBuffer[]) message);
bufferFactory.allocateBuffer().write((ByteBuffer[]) message)); return WebSocketMessage.create(Type.BINARY, buffer);
} }
else if (Type.PONG.equals(type)) { else if (Type.PONG.equals(type)) {
return WebSocketMessage.create(Type.PONG, DataBuffer buffer = bufferFactory.allocateBuffer().write((ByteBuffer[]) message);
bufferFactory.allocateBuffer().write((ByteBuffer[]) message)); return WebSocketMessage.create(Type.PONG, buffer);
} }
else { else {
throw new IllegalArgumentException("Unexpected message type: " + message); throw new IllegalArgumentException("Unexpected message type: " + message);
@ -144,12 +145,12 @@ public class UndertowWebSocketHandlerAdapter implements WebSocketConnectionCallb
@Override @Override
public void onError(Throwable ex) { public void onError(Throwable ex) {
wsSession.close(new CloseStatus(CloseStatus.SERVER_ERROR.getCode(), ex.getMessage())); session.close(new CloseStatus(CloseStatus.SERVER_ERROR.getCode(), ex.getMessage()));
} }
@Override @Override
public void onComplete() { public void onComplete() {
wsSession.close(); session.close();
} }
} }

35
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

@ -19,6 +19,7 @@ package org.springframework.web.reactive.socket.adapter;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import io.undertow.websockets.core.CloseMessage; import io.undertow.websockets.core.CloseMessage;
@ -41,10 +42,12 @@ import org.springframework.web.reactive.socket.WebSocketSession;
*/ */
public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<WebSocketChannel> { public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<WebSocketChannel> {
public UndertowWebSocketSession(WebSocketChannel channel) throws URISyntaxException { public UndertowWebSocketSession(WebSocketChannel channel) throws URISyntaxException {
super(channel, ObjectUtils.getIdentityHexString(channel), new URI(channel.getUrl())); super(channel, ObjectUtils.getIdentityHexString(channel), new URI(channel.getUrl()));
} }
@Override @Override
protected Mono<Void> closeInternal(CloseStatus status) { protected Mono<Void> closeInternal(CloseStatus status) {
CloseMessage cm = new CloseMessage(status.getCode(), status.getReason()); CloseMessage cm = new CloseMessage(status.getCode(), status.getReason());
@ -69,26 +72,23 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<W
@Override @Override
protected boolean sendMessage(WebSocketMessage message) throws IOException { protected boolean sendMessage(WebSocketMessage message) throws IOException {
ByteBuffer buffer = message.getPayload().asByteBuffer();
if (WebSocketMessage.Type.TEXT.equals(message.getType())) { if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
getSendProcessor().setReady(false); getSendProcessor().setReadyToSend(false);
WebSockets.sendText( String text = new String(buffer.array(), StandardCharsets.UTF_8);
new String(message.getPayload().asByteBuffer().array(), StandardCharsets.UTF_8), WebSockets.sendText(text, getDelegate(), new SendProcessorCallback());
getDelegate(), new WebSocketMessageSendHandler());
} }
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) { else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
getSendProcessor().setReady(false); getSendProcessor().setReadyToSend(false);
WebSockets.sendBinary(message.getPayload().asByteBuffer(), WebSockets.sendBinary(buffer, getDelegate(), new SendProcessorCallback());
getDelegate(), new WebSocketMessageSendHandler());
} }
else if (WebSocketMessage.Type.PING.equals(message.getType())) { else if (WebSocketMessage.Type.PING.equals(message.getType())) {
getSendProcessor().setReady(false); getSendProcessor().setReadyToSend(false);
WebSockets.sendPing(message.getPayload().asByteBuffer(), WebSockets.sendPing(buffer, getDelegate(), new SendProcessorCallback());
getDelegate(), new WebSocketMessageSendHandler());
} }
else if (WebSocketMessage.Type.PONG.equals(message.getType())) { else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
getSendProcessor().setReady(false); getSendProcessor().setReadyToSend(false);
WebSockets.sendPong(message.getPayload().asByteBuffer(), WebSockets.sendPong(buffer, getDelegate(), new SendProcessorCallback());
getDelegate(), new WebSocketMessageSendHandler());
} }
else { else {
throw new IllegalArgumentException("Unexpected message type: " + message.getType()); throw new IllegalArgumentException("Unexpected message type: " + message.getType());
@ -96,21 +96,20 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<W
return true; return true;
} }
private final class WebSocketMessageSendHandler implements WebSocketCallback<Void> {
private final class SendProcessorCallback implements WebSocketCallback<Void> {
@Override @Override
public void complete(WebSocketChannel channel, Void context) { public void complete(WebSocketChannel channel, Void context) {
getSendProcessor().setReady(true); getSendProcessor().setReadyToSend(true);
getSendProcessor().onWritePossible(); getSendProcessor().onWritePossible();
} }
@Override @Override
public void onError(WebSocketChannel channel, Void context, public void onError(WebSocketChannel channel, Void context, Throwable throwable) {
Throwable throwable) {
getSendProcessor().cancel(); getSendProcessor().cancel();
getSendProcessor().onError(throwable); getSendProcessor().onError(throwable);
} }
} }
} }

Loading…
Cancel
Save