Browse Source

Extract Netty WebSocket session + adapter base classes

Issue: SPR-14527
pull/1259/head
Rossen Stoyanchev 9 years ago
parent
commit
46f2aafdc0
  1. 126
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java
  2. 27
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java
  3. 95
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java
  4. 53
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketHandlerAdapterSupport.java

126
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java

@ -0,0 +1,126 @@ @@ -0,0 +1,126 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import reactor.core.publisher.Flux;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
/**
* Base class for Netty-based {@link WebSocketSession} adapters.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public abstract class NettyWebSocketSessionSupport<T> extends WebSocketSessionSupport<T> {
private static final Map<Class<?>, WebSocketMessage.Type> MESSAGE_TYPES;
static {
MESSAGE_TYPES = new HashMap<>(4);
MESSAGE_TYPES.put(TextWebSocketFrame.class, WebSocketMessage.Type.TEXT);
MESSAGE_TYPES.put(BinaryWebSocketFrame.class, WebSocketMessage.Type.BINARY);
MESSAGE_TYPES.put(PingWebSocketFrame.class, WebSocketMessage.Type.PING);
MESSAGE_TYPES.put(PongWebSocketFrame.class, WebSocketMessage.Type.PONG);
}
protected final String id;
protected final URI uri;
protected final NettyDataBufferFactory bufferFactory;
protected NettyWebSocketSessionSupport(T delegate, URI uri, NettyDataBufferFactory factory) {
super(delegate);
Assert.notNull(uri, "'uri' is required.");
Assert.notNull(uri, "'bufferFactory' is required.");
this.uri = uri;
this.bufferFactory = factory;
this.id = ObjectUtils.getIdentityHexString(getDelegate());
}
@Override
public String getId() {
return this.id;
}
@Override
public URI getUri() {
return this.uri;
}
protected Flux<WebSocketMessage> toMessageFlux(Flux<WebSocketFrame> frameFlux) {
return frameFlux
.filter(frame -> !(frame instanceof CloseWebSocketFrame))
.window()
.concatMap(flux -> flux.takeUntil(WebSocketFrame::isFinalFragment).buffer())
.map(this::toMessage);
}
@SuppressWarnings("OptionalGetWithoutIsPresent")
private WebSocketMessage toMessage(List<WebSocketFrame> frames) {
Class<?> frameType = frames.get(0).getClass();
if (frames.size() == 1) {
NettyDataBuffer buffer = this.bufferFactory.wrap(frames.get(0).content());
return WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer);
}
return frames.stream()
.map(socketFrame -> bufferFactory.wrap(socketFrame.content()))
.reduce(NettyDataBuffer::write)
.map(buffer -> WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer))
.get();
}
protected WebSocketFrame toFrame(WebSocketMessage message) {
ByteBuf byteBuf = NettyDataBufferFactory.toByteBuf(message.getPayload());
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
return new TextWebSocketFrame(byteBuf);
}
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
return new BinaryWebSocketFrame(byteBuf);
}
else if (WebSocketMessage.Type.PING.equals(message.getType())) {
return new PingWebSocketFrame(byteBuf);
}
else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
return new PongWebSocketFrame(byteBuf);
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message.getType());
}
}
}

27
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java

@ -15,8 +15,6 @@ @@ -15,8 +15,6 @@
*/
package org.springframework.web.reactive.socket.adapter;
import java.net.URI;
import io.reactivex.netty.protocol.http.ws.WebSocketConnection;
import reactor.core.publisher.Mono;
import rx.Observable;
@ -35,37 +33,30 @@ import org.springframework.web.reactive.socket.WebSocketHandler; @@ -35,37 +33,30 @@ import org.springframework.web.reactive.socket.WebSocketHandler;
* @author Rossen Stoyanchev
* @since 5.0
*/
public class RxNettyWebSocketHandlerAdapter
public class RxNettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport
implements io.reactivex.netty.protocol.http.ws.server.WebSocketHandler {
private final URI uri;
private final NettyDataBufferFactory bufferFactory;
private final WebSocketHandler handler;
public RxNettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler handler) {
Assert.notNull("'request' is required");
super(request, handler);
Assert.notNull("'response' is required");
Assert.notNull("'handler' handler is required");
this.uri = request.getURI();
this.bufferFactory = (NettyDataBufferFactory) response.bufferFactory();
this.handler = handler;
}
@Override
public Observable<Void> handle(WebSocketConnection connection) {
Mono<Void> result = this.handler.handle(createSession(connection));
return RxReactiveStreams.toObservable(result);
public NettyDataBufferFactory getBufferFactory() {
return this.bufferFactory;
}
private RxNettyWebSocketSession createSession(WebSocketConnection conn) {
return new RxNettyWebSocketSession(conn, this.uri, this.bufferFactory);
@Override
public Observable<Void> handle(WebSocketConnection conn) {
RxNettyWebSocketSession session = new RxNettyWebSocketSession(conn, getUri(), getBufferFactory());
Mono<Void> result = getDelegate().handle(session);
return RxReactiveStreams.toObservable(result);
}
}

95
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java

@ -16,16 +16,7 @@ @@ -16,16 +16,7 @@
package org.springframework.web.reactive.socket.adapter;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.reactivex.netty.protocol.http.ws.WebSocketConnection;
import org.reactivestreams.Publisher;
@ -34,79 +25,31 @@ import reactor.core.publisher.Mono; @@ -34,79 +25,31 @@ import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
/**
* Spring {@link WebSocketSession} adapter for RxNetty's
* {@link io.reactivex.netty.protocol.http.ws.WebSocketConnection}.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public class RxNettyWebSocketSession extends WebSocketSessionSupport<WebSocketConnection> {
private static final Map<Class<?>, WebSocketMessage.Type> MESSAGE_TYPES;
static {
MESSAGE_TYPES = new HashMap<>(4);
MESSAGE_TYPES.put(TextWebSocketFrame.class, WebSocketMessage.Type.TEXT);
MESSAGE_TYPES.put(BinaryWebSocketFrame.class, WebSocketMessage.Type.BINARY);
MESSAGE_TYPES.put(PingWebSocketFrame.class, WebSocketMessage.Type.PING);
MESSAGE_TYPES.put(PongWebSocketFrame.class, WebSocketMessage.Type.PONG);
}
private final String id;
private final URI uri;
private final NettyDataBufferFactory bufferFactory;
public class RxNettyWebSocketSession extends NettyWebSocketSessionSupport<WebSocketConnection> {
public RxNettyWebSocketSession(WebSocketConnection conn, URI uri, NettyDataBufferFactory factory) {
super(conn);
Assert.notNull(uri, "'uri' is required.");
Assert.notNull(uri, "'bufferFactory' is required.");
this.id = ObjectUtils.getIdentityHexString(getDelegate());
this.uri = uri;
this.bufferFactory = factory;
}
@Override
public String getId() {
return this.id;
super(conn, uri, factory);
}
@Override
public URI getUri() {
return this.uri;
}
@Override
public Flux<WebSocketMessage> receive() {
return Flux.from(RxReactiveStreams.toPublisher(getDelegate().getInput()))
.filter(frame -> !(frame instanceof CloseWebSocketFrame))
.window()
.concatMap(flux -> flux.takeUntil(WebSocketFrame::isFinalFragment).buffer())
.map(this::toMessage);
}
@SuppressWarnings("OptionalGetWithoutIsPresent")
private WebSocketMessage toMessage(List<WebSocketFrame> frames) {
Class<?> frameType = frames.get(0).getClass();
if (frames.size() == 1) {
NettyDataBuffer buffer = this.bufferFactory.wrap(frames.get(0).content());
return WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer);
}
return frames.stream()
.map(socketFrame -> bufferFactory.wrap(socketFrame.content()))
.reduce(NettyDataBuffer::write)
.map(buffer -> WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer))
.get();
Observable<WebSocketFrame> observable = getDelegate().getInput();
Flux<WebSocketFrame> flux = Flux.from(RxReactiveStreams.toPublisher(observable));
return toMessageFlux(flux);
}
@Override
@ -116,28 +59,10 @@ public class RxNettyWebSocketSession extends WebSocketSessionSupport<WebSocketCo @@ -116,28 +59,10 @@ public class RxNettyWebSocketSession extends WebSocketSessionSupport<WebSocketCo
return Mono.from(RxReactiveStreams.toPublisher(completion));
}
private WebSocketFrame toFrame(WebSocketMessage message) {
ByteBuf byteBuf = NettyDataBufferFactory.toByteBuf(message.getPayload());
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
return new TextWebSocketFrame(byteBuf);
}
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
return new BinaryWebSocketFrame(byteBuf);
}
else if (WebSocketMessage.Type.PING.equals(message.getType())) {
return new PingWebSocketFrame(byteBuf);
}
else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
return new PongWebSocketFrame(byteBuf);
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message.getType());
}
}
@Override
protected Mono<Void> closeInternal(CloseStatus status) {
return Mono.from(RxReactiveStreams.toPublisher(getDelegate().close()));
Observable<Void> completion = getDelegate().close();
return Mono.from(RxReactiveStreams.toPublisher(completion));
}
}

53
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketHandlerAdapterSupport.java

@ -0,0 +1,53 @@ @@ -0,0 +1,53 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import java.net.URI;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.WebSocketHandler;
/**
* Base class for {@link WebSocketHandler} implementations.
*
* @author Rossen Stoyanchev
* @since 5.0
*/
public abstract class WebSocketHandlerAdapterSupport {
private final URI uri;
private final WebSocketHandler delegate;
protected WebSocketHandlerAdapterSupport(ServerHttpRequest request, WebSocketHandler handler) {
Assert.notNull("'request' is required");
Assert.notNull("'handler' handler is required");
this.uri = request.getURI();
this.delegate = handler;
}
public URI getUri() {
return this.uri;
}
public WebSocketHandler getDelegate() {
return this.delegate;
}
}
Loading…
Cancel
Save