From 16939b7bc79cc825bba3bb5184ae0cb0ca73f488 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Mon, 25 Jul 2016 14:57:45 +0300 Subject: [PATCH] AbstractListenerServerHttpResponse improvements This commit changes writeWithInternal(Publisher body). It is implemented as writeAndFlushWith(Mono.just(body)). --- .../AbstractListenerServerHttpResponse.java | 19 +------- .../reactive/ServletServerHttpResponse.java | 43 ++++++++++--------- .../reactive/UndertowServerHttpResponse.java | 3 +- 3 files changed, 24 insertions(+), 41 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java index 3d58304385e..d9126fa0aa0 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java @@ -40,17 +40,7 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH @Override protected final Mono writeWithInternal(Publisher body) { - if (this.writeCalled.compareAndSet(false, true)) { - Processor bodyProcessor = createBodyProcessor(); - return Mono.from(subscriber -> { - body.subscribe(bodyProcessor); - bodyProcessor.subscribe(subscriber); - }); - - } else { - return Mono.error(new IllegalStateException( - "writeWith() or writeAndFlushWith() has already been called")); - } + return writeAndFlushWithInternal(Mono.just(body)); } @Override @@ -68,13 +58,6 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH } } - /** - * Abstract template method to create a {@code Processor} that - * will write the response body to the underlying output. Called from - * {@link #writeWithInternal(Publisher)}. - */ - protected abstract Processor createBodyProcessor(); - /** * Abstract template method to create a {@code Processor, Void>} * that will write the response body with flushes to the underlying output. Called from diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index 6ae0647e532..323f8c0a868 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -22,13 +22,13 @@ import java.io.UncheckedIOException; import java.nio.charset.Charset; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import javax.servlet.ServletOutputStream; import javax.servlet.WriteListener; import javax.servlet.http.Cookie; import javax.servlet.http.HttpServletResponse; import org.reactivestreams.Processor; +import org.reactivestreams.Publisher; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; @@ -44,7 +44,7 @@ import org.springframework.util.Assert; */ public class ServletServerHttpResponse extends AbstractListenerServerHttpResponse { - private final AtomicBoolean listenerRegistered = new AtomicBoolean(); + private final ResponseBodyWriteListener writeListener = new ResponseBodyWriteListener(); private volatile ResponseBodyProcessor bodyProcessor; @@ -112,15 +112,17 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } } - private void registerListener() throws IOException { - if (this.listenerRegistered.compareAndSet(false, true)) { - ResponseBodyWriteListener writeListener = new ResponseBodyWriteListener(); - this.response.getOutputStream().setWriteListener(writeListener); + private void registerListener() { + try { + outputStream().setWriteListener(writeListener); + } + catch (IOException e) { + throw new UncheckedIOException(e); } } private void flush() throws IOException { - ServletOutputStream outputStream = this.response.getOutputStream(); + ServletOutputStream outputStream = outputStream(); if (outputStream.isReady()) { try { outputStream.flush(); @@ -136,22 +138,15 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } } - @Override - protected ResponseBodyProcessor createBodyProcessor() { - try { - registerListener(); - this.bodyProcessor = new ResponseBodyProcessor(this.response.getOutputStream(), - this.bufferSize); - return this.bodyProcessor; - } - catch (IOException ex) { - throw new UncheckedIOException(ex); - } + private ServletOutputStream outputStream() throws IOException { + return this.response.getOutputStream(); } @Override - protected AbstractResponseBodyFlushProcessor createBodyFlushProcessor() { - return new ResponseBodyFlushProcessor(); + protected Processor, Void> createBodyFlushProcessor() { + Processor, Void> processor = new ResponseBodyFlushProcessor(); + registerListener(); + return processor; } private class ResponseBodyProcessor extends AbstractResponseBodyProcessor { @@ -238,7 +233,13 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons @Override protected Processor createBodyProcessor() { - return ServletServerHttpResponse.this.createBodyProcessor(); + try { + bodyProcessor = new ResponseBodyProcessor(outputStream(), bufferSize); + return bodyProcessor; + } + catch (IOException ex) { + throw new UncheckedIOException(ex); + } } @Override diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index b4483aea480..b8df141cb91 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -123,8 +123,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon } } - @Override - protected ResponseBodyProcessor createBodyProcessor() { + private ResponseBodyProcessor createBodyProcessor() { if (this.responseChannel == null) { this.responseChannel = this.exchange.getResponseChannel(); }