From 16939b7bc79cc825bba3bb5184ae0cb0ca73f488 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Mon, 25 Jul 2016 14:57:45 +0300 Subject: [PATCH 1/4] AbstractListenerServerHttpResponse improvements This commit changes writeWithInternal(Publisher body). It is implemented as writeAndFlushWith(Mono.just(body)). --- .../AbstractListenerServerHttpResponse.java | 19 +------- .../reactive/ServletServerHttpResponse.java | 43 ++++++++++--------- .../reactive/UndertowServerHttpResponse.java | 3 +- 3 files changed, 24 insertions(+), 41 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java index 3d58304385e..d9126fa0aa0 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java @@ -40,17 +40,7 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH @Override protected final Mono writeWithInternal(Publisher body) { - if (this.writeCalled.compareAndSet(false, true)) { - Processor bodyProcessor = createBodyProcessor(); - return Mono.from(subscriber -> { - body.subscribe(bodyProcessor); - bodyProcessor.subscribe(subscriber); - }); - - } else { - return Mono.error(new IllegalStateException( - "writeWith() or writeAndFlushWith() has already been called")); - } + return writeAndFlushWithInternal(Mono.just(body)); } @Override @@ -68,13 +58,6 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH } } - /** - * Abstract template method to create a {@code Processor} that - * will write the response body to the underlying output. Called from - * {@link #writeWithInternal(Publisher)}. - */ - protected abstract Processor createBodyProcessor(); - /** * Abstract template method to create a {@code Processor, Void>} * that will write the response body with flushes to the underlying output. Called from diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index 6ae0647e532..323f8c0a868 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -22,13 +22,13 @@ import java.io.UncheckedIOException; import java.nio.charset.Charset; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import javax.servlet.ServletOutputStream; import javax.servlet.WriteListener; import javax.servlet.http.Cookie; import javax.servlet.http.HttpServletResponse; import org.reactivestreams.Processor; +import org.reactivestreams.Publisher; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; @@ -44,7 +44,7 @@ import org.springframework.util.Assert; */ public class ServletServerHttpResponse extends AbstractListenerServerHttpResponse { - private final AtomicBoolean listenerRegistered = new AtomicBoolean(); + private final ResponseBodyWriteListener writeListener = new ResponseBodyWriteListener(); private volatile ResponseBodyProcessor bodyProcessor; @@ -112,15 +112,17 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } } - private void registerListener() throws IOException { - if (this.listenerRegistered.compareAndSet(false, true)) { - ResponseBodyWriteListener writeListener = new ResponseBodyWriteListener(); - this.response.getOutputStream().setWriteListener(writeListener); + private void registerListener() { + try { + outputStream().setWriteListener(writeListener); + } + catch (IOException e) { + throw new UncheckedIOException(e); } } private void flush() throws IOException { - ServletOutputStream outputStream = this.response.getOutputStream(); + ServletOutputStream outputStream = outputStream(); if (outputStream.isReady()) { try { outputStream.flush(); @@ -136,22 +138,15 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } } - @Override - protected ResponseBodyProcessor createBodyProcessor() { - try { - registerListener(); - this.bodyProcessor = new ResponseBodyProcessor(this.response.getOutputStream(), - this.bufferSize); - return this.bodyProcessor; - } - catch (IOException ex) { - throw new UncheckedIOException(ex); - } + private ServletOutputStream outputStream() throws IOException { + return this.response.getOutputStream(); } @Override - protected AbstractResponseBodyFlushProcessor createBodyFlushProcessor() { - return new ResponseBodyFlushProcessor(); + protected Processor, Void> createBodyFlushProcessor() { + Processor, Void> processor = new ResponseBodyFlushProcessor(); + registerListener(); + return processor; } private class ResponseBodyProcessor extends AbstractResponseBodyProcessor { @@ -238,7 +233,13 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons @Override protected Processor createBodyProcessor() { - return ServletServerHttpResponse.this.createBodyProcessor(); + try { + bodyProcessor = new ResponseBodyProcessor(outputStream(), bufferSize); + return bodyProcessor; + } + catch (IOException ex) { + throw new UncheckedIOException(ex); + } } @Override diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index b4483aea480..b8df141cb91 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -123,8 +123,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon } } - @Override - protected ResponseBodyProcessor createBodyProcessor() { + private ResponseBodyProcessor createBodyProcessor() { if (this.responseChannel == null) { this.responseChannel = this.exchange.getResponseChannel(); } From 4798a1eb026d6b51bb15bcd64de1d349bf2183e6 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Tue, 26 Jul 2016 16:20:56 +0300 Subject: [PATCH 2/4] Cancel Subscription when onError is invoked internally AbstractResponseBodyProcessor.onError and AbstractResponseBodyFlushProcessor.onError will be invoked when: - The Publisher wants to signal with onError that there are failures. Once onError is invoked the Subscription should be considered canceled. - The internal implementation wants to signal with onError that there are failures. In this use case the implementation should invoke Subscription.cancel() --- .../reactive/AbstractResponseBodyFlushProcessor.java | 8 +++++++- .../server/reactive/AbstractResponseBodyProcessor.java | 6 +++++- .../http/server/reactive/ServletServerHttpResponse.java | 1 + 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java index 71adf24dd16..811cb78a96c 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java @@ -106,6 +106,10 @@ abstract class AbstractResponseBodyFlushProcessor */ protected abstract void flush() throws IOException; + private void cancel() { + this.subscription.cancel(); + } + private void writeComplete() { if (logger.isTraceEnabled()) { logger.trace(this.state + " writeComplete"); @@ -157,11 +161,12 @@ abstract class AbstractResponseBodyFlushProcessor else { try { processor.flush(); + processor.subscription.request(1); } catch (IOException ex) { + processor.cancel(); processor.onError(ex); } - processor.subscription.request(1); } } }, COMPLETED { @@ -231,6 +236,7 @@ abstract class AbstractResponseBodyFlushProcessor @Override public void onError(Throwable t) { + processor.cancel(); processor.onError(t); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java index 1a0268cd07d..2463fb94143 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java @@ -159,6 +159,10 @@ abstract class AbstractResponseBodyProcessor implements Processor Date: Wed, 27 Jul 2016 17:44:24 +0300 Subject: [PATCH 3/4] Refactor AbstractResponseBodyFlushProcessor states With the current state machine - the implementation can hang after the last element when executing on Jetty. - in some cases there will be no flush after the last Publisher. --- .../AbstractResponseBodyFlushProcessor.java | 52 ++++++----- .../WriteOnlyHandlerIntegrationTests.java | 86 +++++++++++++++++++ 2 files changed, 117 insertions(+), 21 deletions(-) create mode 100644 spring-web/src/test/java/org/springframework/http/server/reactive/WriteOnlyHandlerIntegrationTests.java diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java index 811cb78a96c..207d9b1c2bc 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java @@ -128,7 +128,7 @@ abstract class AbstractResponseBodyFlushProcessor public void onSubscribe(AbstractResponseBodyFlushProcessor processor, Subscription subscription) { Objects.requireNonNull(subscription, "Subscription cannot be null"); - if (processor.changeState(this, SUBSCRIBED)) { + if (processor.changeState(this, REQUESTED)) { processor.subscription = subscription; subscription.request(1); } @@ -136,40 +136,55 @@ abstract class AbstractResponseBodyFlushProcessor super.onSubscribe(processor, subscription); } } - }, SUBSCRIBED { + }, + REQUESTED { @Override public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher chunk) { - Processor chunkProcessor = - processor.createBodyProcessor(); - chunk.subscribe(chunkProcessor); - chunkProcessor.subscribe(new WriteSubscriber(processor)); + if (processor.changeState(this, RECEIVED)) { + Processor chunkProcessor = + processor.createBodyProcessor(); + chunk.subscribe(chunkProcessor); + chunkProcessor.subscribe(new WriteSubscriber(processor)); + } } @Override void onComplete(AbstractResponseBodyFlushProcessor processor) { - processor.subscriberCompleted = true; + if (processor.changeState(this, COMPLETED)) { + processor.publisherDelegate.publishComplete(); + } } - + }, + RECEIVED { @Override public void writeComplete(AbstractResponseBodyFlushProcessor processor) { + try { + processor.flush(); + } + catch (IOException ex) { + processor.cancel(); + processor.onError(ex); + } + if (processor.subscriberCompleted) { if (processor.changeState(this, COMPLETED)) { processor.publisherDelegate.publishComplete(); } } else { - try { - processor.flush(); + if (processor.changeState(this, REQUESTED)) { processor.subscription.request(1); } - catch (IOException ex) { - processor.cancel(); - processor.onError(ex); - } } } - }, COMPLETED { + + @Override + void onComplete(AbstractResponseBodyFlushProcessor processor) { + processor.subscriberCompleted = true; + } + }, + COMPLETED { @Override public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher publisher) { @@ -186,11 +201,6 @@ abstract class AbstractResponseBodyFlushProcessor void onComplete(AbstractResponseBodyFlushProcessor processor) { // ignore } - - @Override - public void writeComplete(AbstractResponseBodyFlushProcessor processor) { - // ignore - } }; public void onSubscribe(AbstractResponseBodyFlushProcessor processor, @@ -214,7 +224,7 @@ abstract class AbstractResponseBodyFlushProcessor } public void writeComplete(AbstractResponseBodyFlushProcessor processor) { - throw new IllegalStateException(toString()); + // ignore } private static class WriteSubscriber implements Subscriber { diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/WriteOnlyHandlerIntegrationTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/WriteOnlyHandlerIntegrationTests.java new file mode 100644 index 00000000000..b7bafdfe420 --- /dev/null +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/WriteOnlyHandlerIntegrationTests.java @@ -0,0 +1,86 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.server.reactive; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.Random; + +import org.junit.Test; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.http.RequestEntity; +import org.springframework.http.ResponseEntity; +import org.springframework.http.server.reactive.HttpHandler; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.web.client.RestTemplate; + +import static org.junit.Assert.*; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * @author Violeta Georgieva + * @since 5.0 + */ +public class WriteOnlyHandlerIntegrationTests extends AbstractHttpHandlerIntegrationTests { + + private static final int REQUEST_SIZE = 4096 * 3; + + private Random rnd = new Random(); + + private byte[] body; + + + @Override + protected WriteOnlyHandler createHttpHandler() { + return new WriteOnlyHandler(); + } + + + @Test + public void writeOnly() throws Exception { + RestTemplate restTemplate = new RestTemplate(); + + this.body = randomBytes(); + RequestEntity request = RequestEntity.post( + new URI("http://localhost:" + port)).body( + "".getBytes(StandardCharsets.UTF_8)); + ResponseEntity response = restTemplate.exchange(request, byte[].class); + + assertArrayEquals(body, response.getBody()); + } + + + private byte[] randomBytes() { + byte[] buffer = new byte[REQUEST_SIZE]; + rnd.nextBytes(buffer); + return buffer; + } + + + public class WriteOnlyHandler implements HttpHandler { + + @Override + public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { + DataBuffer buffer = response.bufferFactory().allocateBuffer(body.length); + buffer.write(body); + return response.writeAndFlushWith(Flux.just(Flux.just(buffer))); + } + } +} From d219054b0d3ac1eed0e95c893fb9fd20a4c80862 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 27 Jul 2016 12:34:31 -0400 Subject: [PATCH 4/4] Polish --- .../AbstractListenerServerHttpResponse.java | 13 +++- .../AbstractResponseBodyFlushProcessor.java | 76 ++++++++++--------- .../AbstractResponseBodyProcessor.java | 56 ++++++++------ .../reactive/ServletServerHttpResponse.java | 29 ++++--- .../reactive/UndertowServerHttpResponse.java | 29 ++++--- 5 files changed, 111 insertions(+), 92 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java index d9126fa0aa0..b14f95b30e7 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerServerHttpResponse.java @@ -26,7 +26,9 @@ import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; /** - * Abstract base class for listener-based server responses, i.e. Servlet 3.1 and Undertow. + * Abstract base class for listener-based server responses, e.g. Servlet 3.1 + * and Undertow. + * * @author Arjen Poutsma * @since 5.0 */ @@ -34,10 +36,12 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH private final AtomicBoolean writeCalled = new AtomicBoolean(); + public AbstractListenerServerHttpResponse(DataBufferFactory dataBufferFactory) { super(dataBufferFactory); } + @Override protected final Mono writeWithInternal(Publisher body) { return writeAndFlushWithInternal(Mono.just(body)); @@ -46,13 +50,13 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH @Override protected final Mono writeAndFlushWithInternal(Publisher> body) { if (this.writeCalled.compareAndSet(false, true)) { - Processor, Void> bodyProcessor = - createBodyFlushProcessor(); + Processor, Void> bodyProcessor = createBodyFlushProcessor(); return Mono.from(subscriber -> { body.subscribe(bodyProcessor); bodyProcessor.subscribe(subscriber); }); - } else { + } + else { return Mono.error(new IllegalStateException( "writeWith() or writeAndFlushWith() has already been called")); } @@ -64,4 +68,5 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH * {@link #writeAndFlushWithInternal(Publisher)}. */ protected abstract Processor, Void> createBodyFlushProcessor(); + } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java index 207d9b1c2bc..ef63b70377e 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java @@ -35,26 +35,25 @@ import org.springframework.core.io.buffer.DataBuffer; * Servlet 3.1 and Undertow support. * * @author Arjen Poutsma + * @author Violeta Georgieva * @since 5.0 * @see ServletServerHttpRequest * @see UndertowHttpHandlerAdapter * @see ServerHttpResponse#writeAndFlushWith(Publisher) */ -abstract class AbstractResponseBodyFlushProcessor - implements Processor, Void> { +abstract class AbstractResponseBodyFlushProcessor implements Processor, Void> { protected final Log logger = LogFactory.getLog(getClass()); - private final ResponseBodyWriteResultPublisher publisherDelegate = - new ResponseBodyWriteResultPublisher(); + private final ResponseBodyWriteResultPublisher resultPublisher = new ResponseBodyWriteResultPublisher(); - private final AtomicReference state = - new AtomicReference<>(State.UNSUBSCRIBED); + private final AtomicReference state = new AtomicReference<>(State.UNSUBSCRIBED); private volatile boolean subscriberCompleted; private Subscription subscription; + // Subscriber @Override @@ -89,13 +88,15 @@ abstract class AbstractResponseBodyFlushProcessor this.state.get().onComplete(this); } + // Publisher @Override public final void subscribe(Subscriber subscriber) { - this.publisherDelegate.subscribe(subscriber); + this.resultPublisher.subscribe(subscriber); } + /** * Creates a new processor for subscribing to a body chunk. */ @@ -106,8 +107,9 @@ abstract class AbstractResponseBodyFlushProcessor */ protected abstract void flush() throws IOException; - private void cancel() { - this.subscription.cancel(); + + private boolean changeState(State oldState, State newState) { + return this.state.compareAndSet(oldState, newState); } private void writeComplete() { @@ -118,15 +120,17 @@ abstract class AbstractResponseBodyFlushProcessor } - private boolean changeState(State oldState, State newState) { - return this.state.compareAndSet(oldState, newState); + private void cancel() { + this.subscription.cancel(); } + private enum State { + UNSUBSCRIBED { + @Override - public void onSubscribe(AbstractResponseBodyFlushProcessor processor, - Subscription subscription) { + public void onSubscribe(AbstractResponseBodyFlushProcessor processor, Subscription subscription) { Objects.requireNonNull(subscription, "Subscription cannot be null"); if (processor.changeState(this, REQUESTED)) { processor.subscription = subscription; @@ -138,25 +142,25 @@ abstract class AbstractResponseBodyFlushProcessor } }, REQUESTED { + @Override - public void onNext(AbstractResponseBodyFlushProcessor processor, - Publisher chunk) { + public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher chunk) { if (processor.changeState(this, RECEIVED)) { - Processor chunkProcessor = - processor.createBodyProcessor(); + Processor chunkProcessor = processor.createBodyProcessor(); chunk.subscribe(chunkProcessor); chunkProcessor.subscribe(new WriteSubscriber(processor)); } } @Override - void onComplete(AbstractResponseBodyFlushProcessor processor) { + public void onComplete(AbstractResponseBodyFlushProcessor processor) { if (processor.changeState(this, COMPLETED)) { - processor.publisherDelegate.publishComplete(); + processor.resultPublisher.publishComplete(); } } }, RECEIVED { + @Override public void writeComplete(AbstractResponseBodyFlushProcessor processor) { try { @@ -169,7 +173,7 @@ abstract class AbstractResponseBodyFlushProcessor if (processor.subscriberCompleted) { if (processor.changeState(this, COMPLETED)) { - processor.publisherDelegate.publishComplete(); + processor.resultPublisher.publishComplete(); } } else { @@ -180,11 +184,12 @@ abstract class AbstractResponseBodyFlushProcessor } @Override - void onComplete(AbstractResponseBodyFlushProcessor processor) { + public void onComplete(AbstractResponseBodyFlushProcessor processor) { processor.subscriberCompleted = true; } }, COMPLETED { + @Override public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher publisher) { @@ -193,33 +198,31 @@ abstract class AbstractResponseBodyFlushProcessor } @Override - void onError(AbstractResponseBodyFlushProcessor processor, Throwable t) { + public void onError(AbstractResponseBodyFlushProcessor processor, Throwable t) { // ignore } @Override - void onComplete(AbstractResponseBodyFlushProcessor processor) { + public void onComplete(AbstractResponseBodyFlushProcessor processor) { // ignore } }; - public void onSubscribe(AbstractResponseBodyFlushProcessor processor, - Subscription subscription) { + public void onSubscribe(AbstractResponseBodyFlushProcessor processor, Subscription subscription) { subscription.cancel(); } - public void onNext(AbstractResponseBodyFlushProcessor processor, - Publisher publisher) { + public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher publisher) { throw new IllegalStateException(toString()); } - void onError(AbstractResponseBodyFlushProcessor processor, Throwable t) { + public void onError(AbstractResponseBodyFlushProcessor processor, Throwable ex) { if (processor.changeState(this, COMPLETED)) { - processor.publisherDelegate.publishError(t); + processor.resultPublisher.publishError(ex); } } - void onComplete(AbstractResponseBodyFlushProcessor processor) { + public void onComplete(AbstractResponseBodyFlushProcessor processor) { throw new IllegalStateException(toString()); } @@ -227,6 +230,7 @@ abstract class AbstractResponseBodyFlushProcessor // ignore } + private static class WriteSubscriber implements Subscriber { private final AbstractResponseBodyFlushProcessor processor; @@ -236,8 +240,8 @@ abstract class AbstractResponseBodyFlushProcessor } @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); + public void onSubscribe(Subscription subscription) { + subscription.request(Long.MAX_VALUE); } @Override @@ -245,14 +249,14 @@ abstract class AbstractResponseBodyFlushProcessor } @Override - public void onError(Throwable t) { - processor.cancel(); - processor.onError(t); + public void onError(Throwable ex) { + this.processor.cancel(); + this.processor.onError(ex); } @Override public void onComplete() { - processor.writeComplete(); + this.processor.writeComplete(); } } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java index 2463fb94143..3e62264dd13 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java @@ -48,11 +48,9 @@ abstract class AbstractResponseBodyProcessor implements Processor state = - new AtomicReference<>(State.UNSUBSCRIBED); + private final AtomicReference state = new AtomicReference<>(State.UNSUBSCRIBED); private volatile DataBuffer currentBuffer; @@ -60,6 +58,7 @@ abstract class AbstractResponseBodyProcessor implements Processor subscriber) { - this.publisherDelegate.subscribe(subscriber); + this.resultPublisher.subscribe(subscriber); } + // listener methods /** @@ -167,6 +168,7 @@ abstract class AbstractResponseBodyProcessor implements Processor, Void> createBodyFlushProcessor() { + Processor, Void> processor = new ResponseBodyFlushProcessor(); + registerListener(); + return processor; + } + private void registerListener() { try { outputStream().setWriteListener(writeListener); @@ -121,6 +131,10 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } } + private ServletOutputStream outputStream() throws IOException { + return this.response.getOutputStream(); + } + private void flush() throws IOException { ServletOutputStream outputStream = outputStream(); if (outputStream.isReady()) { @@ -138,16 +152,6 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } } - private ServletOutputStream outputStream() throws IOException { - return this.response.getOutputStream(); - } - - @Override - protected Processor, Void> createBodyFlushProcessor() { - Processor, Void> processor = new ResponseBodyFlushProcessor(); - registerListener(); - return processor; - } private class ResponseBodyProcessor extends AbstractResponseBodyProcessor { @@ -155,11 +159,13 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons private final int bufferSize; + public ResponseBodyProcessor(ServletOutputStream outputStream, int bufferSize) { this.outputStream = outputStream; this.bufferSize = bufferSize; } + @Override protected boolean isWritePossible() { return this.outputStream.isReady(); @@ -201,8 +207,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons byte[] buffer = new byte[this.bufferSize]; int bytesRead = -1; - while (this.outputStream.isReady() && - (bytesRead = input.read(buffer)) != -1) { + while (this.outputStream.isReady() && (bytesRead = input.read(buffer)) != -1) { this.outputStream.write(buffer, 0, bytesRead); bytesWritten += bytesRead; } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index b8df141cb91..30102b6e2d5 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -55,13 +55,14 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon private StreamSinkChannel responseChannel; - public UndertowServerHttpResponse(HttpServerExchange exchange, - DataBufferFactory dataBufferFactory) { - super(dataBufferFactory); + + public UndertowServerHttpResponse(HttpServerExchange exchange, DataBufferFactory bufferFactory) { + super(bufferFactory); Assert.notNull(exchange, "'exchange' is required."); this.exchange = exchange; } + public HttpServerExchange getUndertowExchange() { return this.exchange; } @@ -78,10 +79,8 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon public Mono writeWith(File file, long position, long count) { writeHeaders(); writeCookies(); - try { - StreamSinkChannel responseChannel = - getUndertowExchange().getResponseChannel(); + StreamSinkChannel responseChannel = getUndertowExchange().getResponseChannel(); @SuppressWarnings("resource") FileChannel in = new FileInputStream(file).getChannel(); long result = responseChannel.transferFrom(in, position, count); @@ -123,20 +122,20 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon } } + @Override + protected AbstractResponseBodyFlushProcessor createBodyFlushProcessor() { + return new ResponseBodyFlushProcessor(); + } + private ResponseBodyProcessor createBodyProcessor() { if (this.responseChannel == null) { this.responseChannel = this.exchange.getResponseChannel(); } - ResponseBodyProcessor bodyProcessor = - new ResponseBodyProcessor( this.responseChannel); + ResponseBodyProcessor bodyProcessor = new ResponseBodyProcessor( this.responseChannel); bodyProcessor.registerListener(); return bodyProcessor; } - @Override - protected AbstractResponseBodyFlushProcessor createBodyFlushProcessor() { - return new ResponseBodyFlushProcessor(); - } private static class ResponseBodyProcessor extends AbstractResponseBodyProcessor { @@ -146,11 +145,13 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon private volatile ByteBuffer byteBuffer; + public ResponseBodyProcessor(StreamSinkChannel responseChannel) { Assert.notNull(responseChannel, "'responseChannel' must not be null"); this.responseChannel = responseChannel; } + public void registerListener() { this.responseChannel.getWriteSetter().set(this.listener); this.responseChannel.resumeWrites(); @@ -202,9 +203,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon public void handleEvent(StreamSinkChannel channel) { onWritePossible(); } - } - } private class ResponseBodyFlushProcessor extends AbstractResponseBodyFlushProcessor { @@ -223,6 +222,6 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon UndertowServerHttpResponse.this.responseChannel.flush(); } } - } + }