7 changed files with 232 additions and 8 deletions
@ -0,0 +1,64 @@
@@ -0,0 +1,64 @@
|
||||
/* |
||||
* Copyright 2002-2016 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.web.reactive.socket.adapter; |
||||
|
||||
import java.util.function.BiFunction; |
||||
|
||||
import org.reactivestreams.Publisher; |
||||
import reactor.ipc.netty.http.HttpInbound; |
||||
import reactor.ipc.netty.http.HttpOutbound; |
||||
|
||||
import org.springframework.core.io.buffer.NettyDataBufferFactory; |
||||
import org.springframework.http.server.reactive.ServerHttpRequest; |
||||
import org.springframework.http.server.reactive.ServerHttpResponse; |
||||
import org.springframework.util.Assert; |
||||
import org.springframework.web.reactive.socket.WebSocketHandler; |
||||
|
||||
/** |
||||
* Reactor Netty {@code WebSocketHandler} implementation adapting and |
||||
* delegating to a Spring {@link WebSocketHandler}. |
||||
* |
||||
* @author Rossen Stoyanchev |
||||
* @since 5.0 |
||||
*/ |
||||
public class ReactorNettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport |
||||
implements BiFunction<HttpInbound, HttpOutbound, Publisher<Void>> { |
||||
|
||||
|
||||
private final NettyDataBufferFactory bufferFactory; |
||||
|
||||
|
||||
public ReactorNettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response, |
||||
WebSocketHandler handler) { |
||||
|
||||
super(request, handler); |
||||
Assert.notNull("'response' is required"); |
||||
this.bufferFactory = (NettyDataBufferFactory) response.bufferFactory(); |
||||
} |
||||
|
||||
|
||||
public NettyDataBufferFactory getBufferFactory() { |
||||
return this.bufferFactory; |
||||
} |
||||
|
||||
@Override |
||||
public Publisher<Void> apply(HttpInbound inbound, HttpOutbound outbound) { |
||||
ReactorNettyWebSocketSession session = |
||||
new ReactorNettyWebSocketSession(inbound, outbound, getUri(), getBufferFactory()); |
||||
return getDelegate().handle(session); |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,96 @@
@@ -0,0 +1,96 @@
|
||||
/* |
||||
* Copyright 2002-2016 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.web.reactive.socket.adapter; |
||||
|
||||
import java.net.URI; |
||||
|
||||
import io.netty.handler.codec.http.websocketx.WebSocketFrame; |
||||
import org.reactivestreams.Publisher; |
||||
import reactor.core.publisher.Flux; |
||||
import reactor.core.publisher.Mono; |
||||
import reactor.ipc.netty.http.HttpInbound; |
||||
import reactor.ipc.netty.http.HttpOutbound; |
||||
|
||||
import org.springframework.core.io.buffer.NettyDataBufferFactory; |
||||
import org.springframework.web.reactive.socket.CloseStatus; |
||||
import org.springframework.web.reactive.socket.WebSocketMessage; |
||||
import org.springframework.web.reactive.socket.WebSocketSession; |
||||
|
||||
|
||||
/** |
||||
* Spring {@link WebSocketSession} adapter for RxNetty's |
||||
* {@link io.reactivex.netty.protocol.http.ws.WebSocketConnection}. |
||||
* |
||||
* @author Rossen Stoyanchev |
||||
* @since 5.0 |
||||
*/ |
||||
public class ReactorNettyWebSocketSession |
||||
extends NettyWebSocketSessionSupport<ReactorNettyWebSocketSession.WebSocketConnection> { |
||||
|
||||
|
||||
protected ReactorNettyWebSocketSession(HttpInbound inbound, HttpOutbound outbound, |
||||
URI uri, NettyDataBufferFactory factory) { |
||||
|
||||
super(new WebSocketConnection(inbound, outbound), uri, factory); |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public Flux<WebSocketMessage> receive() { |
||||
HttpInbound inbound = getDelegate().getHttpInbound(); |
||||
return toMessageFlux(inbound.receiveObject().cast(WebSocketFrame.class)); |
||||
} |
||||
|
||||
@Override |
||||
public Mono<Void> send(Publisher<WebSocketMessage> messages) { |
||||
HttpOutbound outbound = getDelegate().getHttpOutbound(); |
||||
Flux<WebSocketFrame> frameFlux = Flux.from(messages).map(this::toFrame); |
||||
return outbound.sendObject(frameFlux); |
||||
} |
||||
|
||||
@Override |
||||
protected Mono<Void> closeInternal(CloseStatus status) { |
||||
return Mono.error(new UnsupportedOperationException( |
||||
"Currently in Reactor Netty applications are expected to use the Cancellation" + |
||||
"returned from subscribing to the input Flux to close the WebSocket session.")); |
||||
} |
||||
|
||||
|
||||
/** |
||||
* Simple container for {@link HttpInbound} and {@link HttpOutbound}. |
||||
*/ |
||||
public static class WebSocketConnection { |
||||
|
||||
private final HttpInbound inbound; |
||||
|
||||
private final HttpOutbound outbound; |
||||
|
||||
|
||||
public WebSocketConnection(HttpInbound inbound, HttpOutbound outbound) { |
||||
this.inbound = inbound; |
||||
this.outbound = outbound; |
||||
} |
||||
|
||||
public HttpInbound getHttpInbound() { |
||||
return this.inbound; |
||||
} |
||||
|
||||
public HttpOutbound getHttpOutbound() { |
||||
return this.outbound; |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,58 @@
@@ -0,0 +1,58 @@
|
||||
/* |
||||
* Copyright 2002-2016 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
package org.springframework.web.reactive.socket.server.upgrade; |
||||
|
||||
import java.util.List; |
||||
|
||||
import reactor.core.publisher.Mono; |
||||
|
||||
import org.springframework.http.server.reactive.ReactorServerHttpRequest; |
||||
import org.springframework.http.server.reactive.ReactorServerHttpResponse; |
||||
import org.springframework.util.StringUtils; |
||||
import org.springframework.web.reactive.socket.WebSocketHandler; |
||||
import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketHandlerAdapter; |
||||
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; |
||||
import org.springframework.web.server.ServerWebExchange; |
||||
|
||||
/** |
||||
* A {@link RequestUpgradeStrategy} for use with Reactor Netty. |
||||
* |
||||
* @author Rossen Stoyanchev |
||||
* @since 5.0 |
||||
*/ |
||||
public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrategy { |
||||
|
||||
@Override |
||||
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler) { |
||||
|
||||
ReactorServerHttpRequest request = (ReactorServerHttpRequest) exchange.getRequest(); |
||||
ReactorServerHttpResponse response = (ReactorServerHttpResponse) exchange.getResponse(); |
||||
|
||||
ReactorNettyWebSocketHandlerAdapter reactorHandler = |
||||
new ReactorNettyWebSocketHandlerAdapter(request, response, webSocketHandler); |
||||
|
||||
String protocols = StringUtils.arrayToCommaDelimitedString(getSubProtocols(webSocketHandler)); |
||||
protocols = (StringUtils.hasText(protocols) ? protocols : null); |
||||
|
||||
return response.getReactorResponse().upgradeToWebsocket(protocols, false, reactorHandler); |
||||
} |
||||
|
||||
private static String[] getSubProtocols(WebSocketHandler webSocketHandler) { |
||||
List<String> subProtocols = webSocketHandler.getSubProtocols(); |
||||
return subProtocols.toArray(new String[subProtocols.size()]); |
||||
} |
||||
|
||||
} |
||||
Loading…
Reference in new issue