From 2c2de82ffba51f102bea67b5dbdb695ca547239e Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 6 Dec 2016 17:07:07 -0500 Subject: [PATCH] Flush after each WebSocket message by default Issue: SPR-14527 --- .../reactive/socket/adapter/ReactorNettyWebSocketSession.java | 3 ++- .../web/reactive/socket/adapter/RxNettyWebSocketSession.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java index b532a3be0f1..5abb6a223ef 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java @@ -56,8 +56,9 @@ public class ReactorNettyWebSocketSession @Override public Mono send(Publisher messages) { - HttpOutbound outbound = getDelegate().getHttpOutbound(); Flux frameFlux = Flux.from(messages).map(this::toFrame); + HttpOutbound outbound = getDelegate().getHttpOutbound(); + outbound.flushEach(); return outbound.sendObject(frameFlux); } diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java index 5a2cdcf9d2e..4fa38c0fd8e 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java @@ -55,7 +55,7 @@ public class RxNettyWebSocketSession extends NettyWebSocketSessionSupport send(Publisher messages) { Observable frames = RxReactiveStreams.toObservable(messages).map(this::toFrame); - Observable completion = getDelegate().write(frames); + Observable completion = getDelegate().writeAndFlushOnEach(frames); return Mono.from(RxReactiveStreams.toPublisher(completion)); }