From 11ed847aca046590e162d631d8a6ef67791d01cb Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Tue, 28 Jun 2016 10:49:15 +0200 Subject: [PATCH] AbstractRequestBodyPublisher improvements Reactored Servlet 3.1 and Undertow request support (AbstractResponseBodySubscriber) to use an internal state machine, making thread-safity a lot easier. --- .../AbstractRequestBodyPublisher.java | 335 ++++++++++++------ .../reactive/ServletHttpHandlerAdapter.java | 82 ++--- .../reactive/UndertowHttpHandlerAdapter.java | 88 ++--- 3 files changed, 293 insertions(+), 212 deletions(-) diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java index 2a29fec9043..5be914a6cb5 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java @@ -16,16 +16,21 @@ package org.springframework.http.server.reactive; +import java.io.IOException; +import java.nio.channels.Channel; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import javax.servlet.ReadListener; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.util.BackpressureUtils; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.util.Assert; /** * Abstract base class for {@code Publisher} implementations that bridge between @@ -38,171 +43,287 @@ import org.springframework.util.Assert; */ abstract class AbstractRequestBodyPublisher implements Publisher { - private ResponseBodySubscription subscription; + protected final Log logger = LogFactory.getLog(getClass()); - private volatile boolean stalled; + private final AtomicReference state = + new AtomicReference<>(State.UNSUBSCRIBED); + + private final AtomicLong demand = new AtomicLong(); + + private Subscriber subscriber; @Override public void subscribe(Subscriber subscriber) { - Objects.requireNonNull(subscriber); - Assert.state(this.subscription == null, "Only a single subscriber allowed"); - - this.subscription = new ResponseBodySubscription(subscriber); - subscriber.onSubscribe(this.subscription); + if (this.logger.isTraceEnabled()) { + this.logger.trace(this.state + " subscribe: " + subscriber); + } + this.state.get().subscribe(this, subscriber); } /** - * Publishes the given signal to the subscriber. - * @param dataBuffer the signal to publish - * @see Subscriber#onNext(Object) + * Called via a listener interface to indicate that reading is possible. + * @see ReadListener#onDataAvailable() + * @see org.xnio.ChannelListener#handleEvent(Channel) */ - protected final void publishOnNext(DataBuffer dataBuffer) { - Assert.state(this.subscription != null); - this.subscription.publishOnNext(dataBuffer); + protected final void onDataAvailable() { + if (this.logger.isTraceEnabled()) { + this.logger.trace(this.state + " onDataAvailable"); + } + this.state.get().onDataAvailable(this); } /** - * Publishes the given error to the subscriber. - * @param t the error to publish - * @see Subscriber#onError(Throwable) + * Called via a listener interface to indicate that all data has been read. + * @see ReadListener#onAllDataRead() + * @see org.xnio.ChannelListener#handleEvent(Channel) */ - protected final void publishOnError(Throwable t) { - if (this.subscription != null) { - this.subscription.publishOnError(t); + protected final void onAllDataRead() { + if (this.logger.isTraceEnabled()) { + this.logger.trace(this.state + " onAllDataRead"); } + this.state.get().onAllDataRead(this); } /** - * Publishes the complete signal to the subscriber. - * @see Subscriber#onComplete() + * Called by a listener interface to indicate that as error has occured. + * @param t the error + * @see ReadListener#onError(Throwable) */ - protected final void publishOnComplete() { - if (this.subscription != null) { - this.subscription.publishOnComplete(); + protected final void onError(Throwable t) { + if (this.logger.isErrorEnabled()) { + this.logger.error(this.state + " onError: " + t, t); } + this.state.get().onError(this, t); } /** - * Returns true if the {@code Subscriber} associated with this {@code Publisher} has - * cancelled its {@code Subscription}. - * @return {@code true} if a subscriber has been registered and its subscription has - * been cancelled; {@code false} otherwise - * @see ResponseBodySubscription#isCancelled() - * @see Subscription#cancel() + * Reads and publishes data buffers from the input. Continues till either there is no + * more demand, or till there is no more data to be read. + * @return {@code true} if there is more data to be read; {@code false} otherwise */ - protected final boolean isSubscriptionCancelled() { - return (this.subscription != null && this.subscription.isCancelled()); + private boolean readAndPublish() { + try { + while (hasDemand()) { + DataBuffer dataBuffer = read(); + if (dataBuffer != null) { + BackpressureUtils.getAndSub(this.demand, 1L); + this.subscriber.onNext(dataBuffer); + } + else { + return false; + } + } + return true; + } + catch (IOException ex) { + onError(ex); + return false; + } } /** - * Checks the subscription for demand, and marks this publisher as "stalled" if there - * is none. The next time the subscriber {@linkplain Subscription#request(long) - * requests} more events, the {@link #noLongerStalled()} method is called. - * @return {@code true} if there is demand; {@code false} otherwise + * Reads a data buffer from the input, if possible. Returns {@code null} if a buffer + * could not be read. + * @return the data buffer that was read; or {@code null} */ - protected final boolean checkSubscriptionForDemand() { - if (this.subscription == null || !this.subscription.hasDemand()) { - this.stalled = true; - return false; - } - else { - return true; - } - } + protected abstract DataBuffer read() throws IOException; /** - * Abstract template method called when this publisher is no longer "stalled". Used in - * sub-classes to resume reading from the request. + * Closes the input. */ - protected abstract void noLongerStalled(); + protected abstract void close(); - private final class ResponseBodySubscription implements Subscription { + private boolean hasDemand() { + return this.demand.get() > 0; + } - private final Subscriber subscriber; + private boolean changeState(AbstractRequestBodyPublisher.State oldState, + AbstractRequestBodyPublisher.State newState) { + return this.state.compareAndSet(oldState, newState); + } - private final AtomicLong demand = new AtomicLong(); + private static final class RequestBodySubscription implements Subscription { - private boolean cancelled; + private final AbstractRequestBodyPublisher publisher; - public ResponseBodySubscription(Subscriber subscriber) { - Assert.notNull(subscriber, "'subscriber' must not be null"); + public RequestBodySubscription(AbstractRequestBodyPublisher publisher) { + this.publisher = publisher; + } - this.subscriber = subscriber; + @Override + public final void request(long n) { + if (this.publisher.logger.isTraceEnabled()) { + this.publisher.logger.trace(state() + " request: " + n); + } + state().request(this.publisher, n); } @Override public final void cancel() { - this.cancelled = true; + if (this.publisher.logger.isTraceEnabled()) { + this.publisher.logger.trace(state() + " cancel"); + } + state().cancel(this.publisher); } - /** - * Indicates whether this subscription has been cancelled. - * @see #cancel() - */ - protected final boolean isCancelled() { - return this.cancelled; + private AbstractRequestBodyPublisher.State state() { + return this.publisher.state.get(); } - @Override - public final void request(long n) { - if (!isCancelled() && BackpressureUtils.checkRequest(n, this.subscriber)) { - long demand = BackpressureUtils.addAndGet(this.demand, n); + } - if (stalled && demand > 0) { - stalled = false; - noLongerStalled(); - } - } - } + /** + * Represents a state for the {@link Publisher} to be in. The following figure + * indicate the four different states that exist, and the relationships between them. + * + *
+	 *       UNSUBSCRIBED
+	 *        |
+	 *        v
+	 * DATA_UNAVAILABLE <---> DATA_AVAILABLE
+	 *                |       |
+	 *                v       v
+	 *                COMPLETED
+	 * 
+ * Refer to the individual states for more information. + */ + private enum State { /** - * Indicates whether this subscription has demand. - * @see #request(long) + * The initial unsubscribed state. Will respond to {@link + * #subscribe(AbstractRequestBodyPublisher, Subscriber)} by + * changing state to {@link #DATA_UNAVAILABLE}. */ - protected final boolean hasDemand() { - return this.demand.get() > 0; - } - + UNSUBSCRIBED { + @Override + void subscribe(AbstractRequestBodyPublisher publisher, + Subscriber subscriber) { + Objects.requireNonNull(subscriber); + if (publisher.changeState(this, DATA_UNAVAILABLE)) { + Subscription subscription = new RequestBodySubscription( + publisher); + publisher.subscriber = subscriber; + subscriber.onSubscribe(subscription); + } + else { + throw new IllegalStateException(toString()); + } + } + }, /** - * Publishes the given signal to the subscriber wrapped by this subscription, if - * it has not been cancelled. If there is {@linkplain #hasDemand() no demand} for - * the signal, an exception will be thrown. - * @param dataBuffer the signal to publish - * @see Subscriber#onNext(Object) + * State that gets entered when there is no data to be read. Responds to {@link + * #request(AbstractRequestBodyPublisher, long)} by increasing the demand, and + * responds to {@link #onDataAvailable(AbstractRequestBodyPublisher)} by + * reading the available data and changing state to {@link #DATA_AVAILABLE} if + * there continues to be more data available after the demand has been satisfied. */ - protected final void publishOnNext(DataBuffer dataBuffer) { - if (!isCancelled()) { - if (hasDemand()) { - BackpressureUtils.getAndSub(this.demand, 1L); - this.subscriber.onNext(dataBuffer); + DATA_UNAVAILABLE { + @Override + void request(AbstractRequestBodyPublisher publisher, long n) { + if (BackpressureUtils.checkRequest(n, publisher.subscriber)) { + BackpressureUtils.addAndGet(publisher.demand, n); } - else { - throw new IllegalStateException("No demand for: " + dataBuffer); + } + + @Override + void onDataAvailable(AbstractRequestBodyPublisher publisher) { + boolean dataAvailable = publisher.readAndPublish(); + if (dataAvailable) { + publisher.changeState(this, DATA_AVAILABLE); } } - } + }, /** - * Publishes the given error to the subscriber wrapped by this subscription, if it - * has not been cancelled. - * @param t the error to publish - * @see Subscriber#onError(Throwable) + * State that gets entered when there is data to be read. Responds to {@link + * #request(AbstractRequestBodyPublisher, long)} by increasing the demand, and + * by reading the available data and changing state to {@link #DATA_UNAVAILABLE} + * if there is no more data available. */ - protected final void publishOnError(Throwable t) { - if (!isCancelled()) { - this.subscriber.onError(t); + DATA_AVAILABLE { + @Override + void request(AbstractRequestBodyPublisher publisher, long n) { + if (BackpressureUtils.checkRequest(n, publisher.subscriber)) { + BackpressureUtils.addAndGet(publisher.demand, n); + boolean dataAvailable = publisher.readAndPublish(); + if (!dataAvailable) { + publisher.changeState(this, DATA_UNAVAILABLE); + } + } } - } - + }, /** - * Publishes the complete signal to the subscriber wrapped by this subscription, - * if it has not been cancelled. - * @see Subscriber#onComplete() + * The terminal completed state. Does not respond to any events. */ - protected final void publishOnComplete() { - if (!isCancelled()) { - this.subscriber.onComplete(); + COMPLETED { + @Override + void subscribe(AbstractRequestBodyPublisher publisher, + Subscriber subscriber) { + // ignore + } + + @Override + void request(AbstractRequestBodyPublisher publisher, long n) { + // ignore + } + + @Override + void cancel(AbstractRequestBodyPublisher publisher) { + // ignore + } + + @Override + void onDataAvailable(AbstractRequestBodyPublisher publisher) { + // ignore + } + + @Override + void onAllDataRead(AbstractRequestBodyPublisher publisher) { + // ignore } + + @Override + void onError(AbstractRequestBodyPublisher publisher, Throwable t) { + // ignore + } + }; + + void subscribe(AbstractRequestBodyPublisher publisher, + Subscriber subscriber) { + throw new IllegalStateException(toString()); + } + + void request(AbstractRequestBodyPublisher publisher, long n) { + throw new IllegalStateException(toString()); + } + + void cancel(AbstractRequestBodyPublisher publisher) { + if (publisher.changeState(this, COMPLETED)) { + publisher.close(); + } + } + + void onDataAvailable(AbstractRequestBodyPublisher publisher) { + throw new IllegalStateException(toString()); } + + void onAllDataRead(AbstractRequestBodyPublisher publisher) { + if (publisher.changeState(this, COMPLETED)) { + publisher.close(); + if (publisher.subscriber != null) { + publisher.subscriber.onComplete(); + } + } + } + + void onError(AbstractRequestBodyPublisher publisher, Throwable t) { + if (publisher.changeState(this, COMPLETED)) { + publisher.close(); + if (publisher.subscriber != null) { + publisher.subscriber.onError(t); + } + } + } + } } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java index 3f0f4304240..4314ec221e3 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java @@ -141,10 +141,8 @@ public class ServletHttpHandlerAdapter extends HttpServlet { private static class RequestBodyPublisher extends AbstractRequestBodyPublisher { - private static final Log logger = LogFactory.getLog(RequestBodyPublisher.class); - - private final RequestBodyReadListener readListener = - new RequestBodyReadListener(); + private final RequestBodyPublisher.RequestBodyReadListener readListener = + new RequestBodyPublisher.RequestBodyReadListener(); private final ServletAsyncContextSynchronizer synchronizer; @@ -165,76 +163,50 @@ public class ServletHttpHandlerAdapter extends HttpServlet { } @Override - protected void noLongerStalled() { - try { - this.readListener.onDataAvailable(); - } - catch (IOException ex) { - this.readListener.onError(ex); + protected DataBuffer read() throws IOException { + ServletInputStream input = this.synchronizer.getRequest().getInputStream(); + if (input.isReady()) { + int read = input.read(this.buffer); + if (logger.isTraceEnabled()) { + logger.trace("read:" + read); + } + + if (read > 0) { + DataBuffer dataBuffer = this.dataBufferFactory.allocateBuffer(read); + dataBuffer.write(this.buffer, 0, read); + return dataBuffer; + } } + return null; + } + + @Override + protected void close() { + this.synchronizer.readComplete(); + } private class RequestBodyReadListener implements ReadListener { @Override public void onDataAvailable() throws IOException { - if (isSubscriptionCancelled()) { - return; - } - logger.trace("onDataAvailable"); - ServletInputStream input = - RequestBodyPublisher.this.synchronizer.getRequest() - .getInputStream(); - - while (true) { - if (!checkSubscriptionForDemand()) { - break; - } - - boolean ready = input.isReady(); - logger.trace( - "Input ready: " + ready + " finished: " + input.isFinished()); - - if (!ready) { - break; - } - - int read = input.read(RequestBodyPublisher.this.buffer); - logger.trace("Input read:" + read); - - if (read == -1) { - break; - } - else if (read > 0) { - DataBuffer dataBuffer = - RequestBodyPublisher.this.dataBufferFactory - .allocateBuffer(read); - dataBuffer.write(RequestBodyPublisher.this.buffer, 0, read); - - publishOnNext(dataBuffer); - } - } + RequestBodyPublisher.this.onDataAvailable(); } @Override public void onAllDataRead() throws IOException { - logger.trace("All data read"); - RequestBodyPublisher.this.synchronizer.readComplete(); - - publishOnComplete(); + RequestBodyPublisher.this.onAllDataRead(); } @Override - public void onError(Throwable t) { - logger.trace("RequestBodyReadListener Error", t); - RequestBodyPublisher.this.synchronizer.readComplete(); + public void onError(Throwable throwable) { + RequestBodyPublisher.this.onError(throwable); - publishOnError(t); } } - } + private static class ResponseBodySubscriber extends AbstractResponseBodySubscriber { private final ResponseBodyWriteListener writeListener = diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java index 4d3f4ac7aed..dfc9d5837bd 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java @@ -57,7 +57,6 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle this.dataBufferFactory = dataBufferFactory; } - @Override public void handleRequest(HttpServerExchange exchange) throws Exception { @@ -72,7 +71,8 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle responseBody.registerListener(); ServerHttpResponse response = new UndertowServerHttpResponse(exchange, responseChannel, - publisher -> Mono.from(subscriber -> publisher.subscribe(responseBody)), + publisher -> Mono + .from(subscriber -> publisher.subscribe(responseBody)), this.dataBufferFactory); this.delegate.handle(request, response).subscribe(new Subscriber() { @@ -90,7 +90,8 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle @Override public void onError(Throwable ex) { if (exchange.isResponseStarted() || exchange.getStatusCode() > 500) { - logger.error("Error from request handling. Completing the request.", ex); + logger.error("Error from request handling. Completing the request.", + ex); } else { exchange.setStatusCode(500); @@ -107,10 +108,11 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle private static class RequestBodyPublisher extends AbstractRequestBodyPublisher { - private static final Log logger = LogFactory.getLog(RequestBodyPublisher.class); + private final ChannelListener readListener = + new ReadListener(); - private final ChannelListener listener = - new RequestBodyListener(); + private final ChannelListener closeListener = + new CloseListener(); private final StreamSourceChannel requestChannel; @@ -127,11 +129,31 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle } public void registerListener() { - this.requestChannel.getReadSetter().set(this.listener); + this.requestChannel.getReadSetter().set(this.readListener); + this.requestChannel.getCloseSetter().set(this.closeListener); this.requestChannel.resumeReads(); } - private void close() { + @Override + protected DataBuffer read() throws IOException { + ByteBuffer byteBuffer = this.pooledByteBuffer.getBuffer(); + int read = this.requestChannel.read(byteBuffer); + if (logger.isTraceEnabled()) { + logger.trace("read:" + read); + } + + if (read > 0) { + byteBuffer.flip(); + return this.dataBufferFactory.wrap(byteBuffer); + } + else if (read == -1) { + onAllDataRead(); + } + return null; + } + + @Override + protected void close() { if (this.pooledByteBuffer != null) { IoUtils.safeClose(this.pooledByteBuffer); } @@ -140,54 +162,21 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle } } - @Override - protected void noLongerStalled() { - this.listener.handleEvent(this.requestChannel); + private class ReadListener implements ChannelListener { + + @Override + public void handleEvent(StreamSourceChannel channel) { + onDataAvailable(); + } } - private class RequestBodyListener - implements ChannelListener { + private class CloseListener implements ChannelListener { @Override public void handleEvent(StreamSourceChannel channel) { - if (isSubscriptionCancelled()) { - return; - } - logger.trace("handleEvent"); - ByteBuffer byteBuffer = - RequestBodyPublisher.this.pooledByteBuffer.getBuffer(); - try { - while (true) { - if (!checkSubscriptionForDemand()) { - break; - } - int read = channel.read(byteBuffer); - logger.trace("Input read:" + read); - - if (read == -1) { - publishOnComplete(); - close(); - break; - } - else if (read == 0) { - // input not ready, wait until we are invoked again - break; - } - else { - byteBuffer.flip(); - DataBuffer dataBuffer = - RequestBodyPublisher.this.dataBufferFactory - .wrap(byteBuffer); - publishOnNext(dataBuffer); - } - } - } - catch (IOException ex) { - publishOnError(ex); - } + onAllDataRead(); } } - } private static class ResponseBodySubscriber extends AbstractResponseBodySubscriber { @@ -296,5 +285,4 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle } - } \ No newline at end of file