Browse Source

Flush after each WebSocket message by default

Issue: SPR-14527
pull/1259/head
Rossen Stoyanchev 9 years ago
parent
commit
2c2de82ffb
  1. 3
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java
  2. 2
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java

3
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java

@ -56,8 +56,9 @@ public class ReactorNettyWebSocketSession
@Override @Override
public Mono<Void> send(Publisher<WebSocketMessage> messages) { public Mono<Void> send(Publisher<WebSocketMessage> messages) {
HttpOutbound outbound = getDelegate().getHttpOutbound();
Flux<WebSocketFrame> frameFlux = Flux.from(messages).map(this::toFrame); Flux<WebSocketFrame> frameFlux = Flux.from(messages).map(this::toFrame);
HttpOutbound outbound = getDelegate().getHttpOutbound();
outbound.flushEach();
return outbound.sendObject(frameFlux); return outbound.sendObject(frameFlux);
} }

2
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java

@ -55,7 +55,7 @@ public class RxNettyWebSocketSession extends NettyWebSocketSessionSupport<WebSoc
@Override @Override
public Mono<Void> send(Publisher<WebSocketMessage> messages) { public Mono<Void> send(Publisher<WebSocketMessage> messages) {
Observable<WebSocketFrame> frames = RxReactiveStreams.toObservable(messages).map(this::toFrame); Observable<WebSocketFrame> frames = RxReactiveStreams.toObservable(messages).map(this::toFrame);
Observable<Void> completion = getDelegate().write(frames); Observable<Void> completion = getDelegate().writeAndFlushOnEach(frames);
return Mono.from(RxReactiveStreams.toPublisher(completion)); return Mono.from(RxReactiveStreams.toPublisher(completion));
} }

Loading…
Cancel
Save