From d68232c880f770466993b4de7e66ebe958df9e7a Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Thu, 7 Jul 2016 15:21:58 +0300 Subject: [PATCH] Refactor AbstractRequestBodyPublisher states The state machine is refactored in order to solve various concurrency issues. --- .../AbstractRequestBodyPublisher.java | 106 +++++++++--------- .../reactive/ServletHttpHandlerAdapter.java | 21 +++- .../reactive/UndertowServerHttpRequest.java | 5 + 3 files changed, 74 insertions(+), 58 deletions(-) diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java index 049b3c837bc..2e0a7359c0d 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java @@ -52,8 +52,6 @@ abstract class AbstractRequestBodyPublisher implements Publisher { private Subscriber subscriber; - private volatile boolean dataAvailable; - @Override public void subscribe(Subscriber subscriber) { if (this.logger.isTraceEnabled()) { @@ -101,28 +99,24 @@ abstract class AbstractRequestBodyPublisher implements Publisher { /** * Reads and publishes data buffers from the input. Continues till either there is no * more demand, or till there is no more data to be read. - * @return {@code true} if there is more data to be read; {@code false} otherwise + * @return {@code true} if there is more demand; {@code false} otherwise */ - private boolean readAndPublish() { - try { - while (hasDemand()) { - DataBuffer dataBuffer = read(); - if (dataBuffer != null) { - BackpressureUtils.getAndSub(this.demand, 1L); - this.subscriber.onNext(dataBuffer); - } - else { - return false; - } + private boolean readAndPublish() throws IOException { + while (hasDemand()) { + DataBuffer dataBuffer = read(); + if (dataBuffer != null) { + BackpressureUtils.getAndSub(this.demand, 1L); + this.subscriber.onNext(dataBuffer); + } + else { + return true; } - return true; - } - catch (IOException ex) { - onError(ex); - return false; } + return false; } + protected abstract void checkOnDataAvailable(); + /** * Reads a data buffer from the input, if possible. Returns {@code null} if a buffer * could not be read. @@ -182,10 +176,13 @@ abstract class AbstractRequestBodyPublisher implements Publisher { * UNSUBSCRIBED * | * v - * DATA_UNAVAILABLE <---> DATA_AVAILABLE - * | | - * v v - * COMPLETED + * NO_DEMAND -------------------> DEMAND + * | ^ ^ | + * | | | | + * | --------- READING <----- | + * | | | + * | v | + * ------------> COMPLETED <--------- * * Refer to the individual states for more information. */ @@ -194,16 +191,14 @@ abstract class AbstractRequestBodyPublisher implements Publisher { /** * The initial unsubscribed state. Will respond to {@link * #subscribe(AbstractRequestBodyPublisher, Subscriber)} by - * changing state to {@link #DATA_UNAVAILABLE}. + * changing state to {@link #NO_DEMAND}. */ UNSUBSCRIBED { @Override void subscribe(AbstractRequestBodyPublisher publisher, Subscriber subscriber) { Objects.requireNonNull(subscriber); - State newState = - publisher.dataAvailable ? DATA_AVAILABLE : DATA_UNAVAILABLE; - if (publisher.changeState(this, newState)) { + if (publisher.changeState(this, NO_DEMAND)) { Subscription subscription = new RequestBodySubscription( publisher); publisher.subscriber = subscriber; @@ -213,54 +208,55 @@ abstract class AbstractRequestBodyPublisher implements Publisher { throw new IllegalStateException(toString()); } } - - @Override - void onDataAvailable(AbstractRequestBodyPublisher publisher) { - publisher.dataAvailable = true; - } }, /** - * State that gets entered when there is no data to be read. Responds to {@link - * #request(AbstractRequestBodyPublisher, long)} by increasing the demand, and - * responds to {@link #onDataAvailable(AbstractRequestBodyPublisher)} by - * reading the available data and changing state to {@link #DATA_AVAILABLE} if - * there continues to be more data available after the demand has been satisfied. + * State that gets entered when there is no demand. Responds to {@link + * #request(AbstractRequestBodyPublisher, long)} by increasing the demand, + * changing state to {@link #DEMAND} and will check whether there + * is data available for reading. */ - DATA_UNAVAILABLE { + NO_DEMAND { @Override void request(AbstractRequestBodyPublisher publisher, long n) { if (BackpressureUtils.checkRequest(n, publisher.subscriber)) { BackpressureUtils.addAndGet(publisher.demand, n); + if (publisher.changeState(this, DEMAND)) { + publisher.checkOnDataAvailable(); + } } } - + }, + /** + * State that gets entered when there is demand. Responds to + * {@link #onDataAvailable(AbstractRequestBodyPublisher)} by + * reading the available data. The state will be changed to + * {@link #NO_DEMAND} if there is no demand. + */ + DEMAND { @Override void onDataAvailable(AbstractRequestBodyPublisher publisher) { - boolean dataAvailable = publisher.readAndPublish(); - if (dataAvailable) { - publisher.changeState(this, DATA_AVAILABLE); + if (publisher.changeState(this, READING)) { + try { + boolean demandAvailable = publisher.readAndPublish(); + if (demandAvailable) { + publisher.changeState(READING, DEMAND); + publisher.checkOnDataAvailable(); + } else { + publisher.changeState(READING, NO_DEMAND); + } + } catch (IOException ex) { + publisher.onError(ex); + } } } - }, - /** - * State that gets entered when there is data to be read. Responds to {@link - * #request(AbstractRequestBodyPublisher, long)} by increasing the demand, and - * by reading the available data and changing state to {@link #DATA_UNAVAILABLE} - * if there is no more data available. - */ - DATA_AVAILABLE { + READING { @Override void request(AbstractRequestBodyPublisher publisher, long n) { if (BackpressureUtils.checkRequest(n, publisher.subscriber)) { BackpressureUtils.addAndGet(publisher.demand, n); - boolean dataAvailable = publisher.readAndPublish(); - if (!dataAvailable) { - publisher.changeState(this, DATA_UNAVAILABLE); - } } } - }, /** * The terminal completed state. Does not respond to any events. diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java index 61ff7409e3e..c8b03c4c50e 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java @@ -159,13 +159,28 @@ public class ServletHttpHandlerAdapter extends HttpServlet { } public void registerListener() throws IOException { - this.synchronizer.getRequest().getInputStream() - .setReadListener(this.readListener); + inputStream().setReadListener(this.readListener); + } + + private ServletInputStream inputStream() throws IOException { + return this.synchronizer.getRequest().getInputStream(); + } + + @Override + protected void checkOnDataAvailable() { + try { + if (!inputStream().isFinished() && inputStream().isReady()) { + onDataAvailable(); + } + } + catch (IOException ex) { + onError(ex); + } } @Override protected DataBuffer read() throws IOException { - ServletInputStream input = this.synchronizer.getRequest().getInputStream(); + ServletInputStream input = inputStream(); if (input.isReady()) { int read = input.read(this.buffer); if (logger.isTraceEnabled()) { diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java index 19f56d596dc..1597a3af2c9 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java @@ -129,6 +129,11 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { this.requestChannel.resumeReads(); } + @Override + protected void checkOnDataAvailable() { + onDataAvailable(); + } + @Override protected DataBuffer read() throws IOException { ByteBuffer byteBuffer = this.pooledByteBuffer.getBuffer();