diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java new file mode 100644 index 00000000000..8095e797494 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/NettyWebSocketSessionSupport.java @@ -0,0 +1,126 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket.adapter; + +import java.net.URI; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; +import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; +import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketFrame; +import reactor.core.publisher.Flux; + +import org.springframework.core.io.buffer.NettyDataBuffer; +import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; +import org.springframework.web.reactive.socket.WebSocketMessage; +import org.springframework.web.reactive.socket.WebSocketSession; + +/** + * Base class for Netty-based {@link WebSocketSession} adapters. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public abstract class NettyWebSocketSessionSupport extends WebSocketSessionSupport { + + private static final Map, WebSocketMessage.Type> MESSAGE_TYPES; + + static { + MESSAGE_TYPES = new HashMap<>(4); + MESSAGE_TYPES.put(TextWebSocketFrame.class, WebSocketMessage.Type.TEXT); + MESSAGE_TYPES.put(BinaryWebSocketFrame.class, WebSocketMessage.Type.BINARY); + MESSAGE_TYPES.put(PingWebSocketFrame.class, WebSocketMessage.Type.PING); + MESSAGE_TYPES.put(PongWebSocketFrame.class, WebSocketMessage.Type.PONG); + } + + + protected final String id; + + protected final URI uri; + + protected final NettyDataBufferFactory bufferFactory; + + + protected NettyWebSocketSessionSupport(T delegate, URI uri, NettyDataBufferFactory factory) { + super(delegate); + Assert.notNull(uri, "'uri' is required."); + Assert.notNull(uri, "'bufferFactory' is required."); + this.uri = uri; + this.bufferFactory = factory; + this.id = ObjectUtils.getIdentityHexString(getDelegate()); + } + + + @Override + public String getId() { + return this.id; + } + + @Override + public URI getUri() { + return this.uri; + } + + protected Flux toMessageFlux(Flux frameFlux) { + return frameFlux + .filter(frame -> !(frame instanceof CloseWebSocketFrame)) + .window() + .concatMap(flux -> flux.takeUntil(WebSocketFrame::isFinalFragment).buffer()) + .map(this::toMessage); + } + + @SuppressWarnings("OptionalGetWithoutIsPresent") + private WebSocketMessage toMessage(List frames) { + Class frameType = frames.get(0).getClass(); + if (frames.size() == 1) { + NettyDataBuffer buffer = this.bufferFactory.wrap(frames.get(0).content()); + return WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer); + } + return frames.stream() + .map(socketFrame -> bufferFactory.wrap(socketFrame.content())) + .reduce(NettyDataBuffer::write) + .map(buffer -> WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer)) + .get(); + } + + protected WebSocketFrame toFrame(WebSocketMessage message) { + ByteBuf byteBuf = NettyDataBufferFactory.toByteBuf(message.getPayload()); + if (WebSocketMessage.Type.TEXT.equals(message.getType())) { + return new TextWebSocketFrame(byteBuf); + } + else if (WebSocketMessage.Type.BINARY.equals(message.getType())) { + return new BinaryWebSocketFrame(byteBuf); + } + else if (WebSocketMessage.Type.PING.equals(message.getType())) { + return new PingWebSocketFrame(byteBuf); + } + else if (WebSocketMessage.Type.PONG.equals(message.getType())) { + return new PongWebSocketFrame(byteBuf); + } + else { + throw new IllegalArgumentException("Unexpected message type: " + message.getType()); + } + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java index 4c727575e80..772de5c71e9 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java @@ -15,8 +15,6 @@ */ package org.springframework.web.reactive.socket.adapter; -import java.net.URI; - import io.reactivex.netty.protocol.http.ws.WebSocketConnection; import reactor.core.publisher.Mono; import rx.Observable; @@ -35,37 +33,30 @@ import org.springframework.web.reactive.socket.WebSocketHandler; * @author Rossen Stoyanchev * @since 5.0 */ -public class RxNettyWebSocketHandlerAdapter +public class RxNettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport implements io.reactivex.netty.protocol.http.ws.server.WebSocketHandler { - private final URI uri; - private final NettyDataBufferFactory bufferFactory; - private final WebSocketHandler handler; - public RxNettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler handler) { - Assert.notNull("'request' is required"); + super(request, handler); Assert.notNull("'response' is required"); - Assert.notNull("'handler' handler is required"); - - this.uri = request.getURI(); this.bufferFactory = (NettyDataBufferFactory) response.bufferFactory(); - this.handler = handler; } - @Override - public Observable handle(WebSocketConnection connection) { - Mono result = this.handler.handle(createSession(connection)); - return RxReactiveStreams.toObservable(result); + public NettyDataBufferFactory getBufferFactory() { + return this.bufferFactory; } - private RxNettyWebSocketSession createSession(WebSocketConnection conn) { - return new RxNettyWebSocketSession(conn, this.uri, this.bufferFactory); + @Override + public Observable handle(WebSocketConnection conn) { + RxNettyWebSocketSession session = new RxNettyWebSocketSession(conn, getUri(), getBufferFactory()); + Mono result = getDelegate().handle(session); + return RxReactiveStreams.toObservable(result); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java index a08d996aa40..5a2cdcf9d2e 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java @@ -16,16 +16,7 @@ package org.springframework.web.reactive.socket.adapter; import java.net.URI; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import io.netty.buffer.ByteBuf; -import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; -import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; -import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; -import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; -import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.reactivex.netty.protocol.http.ws.WebSocketConnection; import org.reactivestreams.Publisher; @@ -34,79 +25,31 @@ import reactor.core.publisher.Mono; import rx.Observable; import rx.RxReactiveStreams; -import org.springframework.core.io.buffer.NettyDataBuffer; import org.springframework.core.io.buffer.NettyDataBufferFactory; -import org.springframework.util.Assert; -import org.springframework.util.ObjectUtils; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketMessage; +import org.springframework.web.reactive.socket.WebSocketSession; /** + * Spring {@link WebSocketSession} adapter for RxNetty's + * {@link io.reactivex.netty.protocol.http.ws.WebSocketConnection}. * * @author Rossen Stoyanchev * @since 5.0 */ -public class RxNettyWebSocketSession extends WebSocketSessionSupport { - - private static final Map, WebSocketMessage.Type> MESSAGE_TYPES; - - static { - MESSAGE_TYPES = new HashMap<>(4); - MESSAGE_TYPES.put(TextWebSocketFrame.class, WebSocketMessage.Type.TEXT); - MESSAGE_TYPES.put(BinaryWebSocketFrame.class, WebSocketMessage.Type.BINARY); - MESSAGE_TYPES.put(PingWebSocketFrame.class, WebSocketMessage.Type.PING); - MESSAGE_TYPES.put(PongWebSocketFrame.class, WebSocketMessage.Type.PONG); - } - - - private final String id; - - private final URI uri; - - private final NettyDataBufferFactory bufferFactory; +public class RxNettyWebSocketSession extends NettyWebSocketSessionSupport { public RxNettyWebSocketSession(WebSocketConnection conn, URI uri, NettyDataBufferFactory factory) { - super(conn); - Assert.notNull(uri, "'uri' is required."); - Assert.notNull(uri, "'bufferFactory' is required."); - this.id = ObjectUtils.getIdentityHexString(getDelegate()); - this.uri = uri; - this.bufferFactory = factory; - } - - - @Override - public String getId() { - return this.id; + super(conn, uri, factory); } - @Override - public URI getUri() { - return this.uri; - } @Override public Flux receive() { - return Flux.from(RxReactiveStreams.toPublisher(getDelegate().getInput())) - .filter(frame -> !(frame instanceof CloseWebSocketFrame)) - .window() - .concatMap(flux -> flux.takeUntil(WebSocketFrame::isFinalFragment).buffer()) - .map(this::toMessage); - } - - @SuppressWarnings("OptionalGetWithoutIsPresent") - private WebSocketMessage toMessage(List frames) { - Class frameType = frames.get(0).getClass(); - if (frames.size() == 1) { - NettyDataBuffer buffer = this.bufferFactory.wrap(frames.get(0).content()); - return WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer); - } - return frames.stream() - .map(socketFrame -> bufferFactory.wrap(socketFrame.content())) - .reduce(NettyDataBuffer::write) - .map(buffer -> WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer)) - .get(); + Observable observable = getDelegate().getInput(); + Flux flux = Flux.from(RxReactiveStreams.toPublisher(observable)); + return toMessageFlux(flux); } @Override @@ -116,28 +59,10 @@ public class RxNettyWebSocketSession extends WebSocketSessionSupport closeInternal(CloseStatus status) { - return Mono.from(RxReactiveStreams.toPublisher(getDelegate().close())); + Observable completion = getDelegate().close(); + return Mono.from(RxReactiveStreams.toPublisher(completion)); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketHandlerAdapterSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketHandlerAdapterSupport.java new file mode 100644 index 00000000000..97218500122 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/WebSocketHandlerAdapterSupport.java @@ -0,0 +1,53 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.web.reactive.socket.adapter; + +import java.net.URI; + +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.util.Assert; +import org.springframework.web.reactive.socket.WebSocketHandler; + +/** + * Base class for {@link WebSocketHandler} implementations. + * + * @author Rossen Stoyanchev + * @since 5.0 + */ +public abstract class WebSocketHandlerAdapterSupport { + + private final URI uri; + + private final WebSocketHandler delegate; + + + protected WebSocketHandlerAdapterSupport(ServerHttpRequest request, WebSocketHandler handler) { + Assert.notNull("'request' is required"); + Assert.notNull("'handler' handler is required"); + this.uri = request.getURI(); + this.delegate = handler; + } + + + public URI getUri() { + return this.uri; + } + + public WebSocketHandler getDelegate() { + return this.delegate; + } + +}