Browse Source

Light refactoring/polish in reactive read/write bridge

Issue: SPR-16207
pull/1598/merge
Rossen Stoyanchev 8 years ago
parent
commit
3c2d1862f1
  1. 165
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java
  2. 124
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java
  3. 156
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java
  4. 4
      spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java
  5. 103
      spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java

165
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java

@ -52,10 +52,10 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -52,10 +52,10 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
private volatile long demand;
private volatile boolean publisherCompleted;
private volatile boolean completionBeforeDemand;
@Nullable
private volatile Throwable publisherError;
private volatile Throwable errorBeforeDemand;
@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<AbstractListenerReadPublisher> DEMAND_FIELD_UPDATER =
@ -76,11 +76,8 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -76,11 +76,8 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
}
// Listener delegation methods...
// Methods for sub-classes to delegate to, when async I/O events occur...
/**
* Listeners can call this to notify when reading is possible.
*/
public final void onDataAvailable() {
if (this.logger.isTraceEnabled()) {
this.logger.trace(this.state + " onDataAvailable");
@ -88,9 +85,6 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -88,9 +85,6 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
this.state.get().onDataAvailable(this);
}
/**
* Listeners can call this to notify when all data has been read.
*/
public void onAllDataRead() {
if (this.logger.isTraceEnabled()) {
this.logger.trace(this.state + " onAllDataRead");
@ -98,9 +92,6 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -98,9 +92,6 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
this.state.get().onAllDataRead(this);
}
/**
* Listeners can call this to notify when a read error has occurred.
*/
public final void onError(Throwable t) {
if (this.logger.isTraceEnabled()) {
this.logger.trace(this.state + " onError: " + t);
@ -109,11 +100,17 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -109,11 +100,17 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
}
// Methods for sub-classes to implement...
/**
* Check if data is available, calling {@link #onDataAvailable()} either
* immediately or later when reading is possible.
*/
protected abstract void checkOnDataAvailable();
/**
* Reads a data from the input, if possible.
* @return the data that was read; or {@code null}
* Read once from the input, if possible.
* @return the item that was read; or {@code null}
*/
@Nullable
protected abstract T read() throws IOException;
@ -125,14 +122,17 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -125,14 +122,17 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
}
// Private methods for use in State...
/**
* Read and publish data from the input. Continue till there is no more
* demand or there is no more data to be read.
* @return {@code true} if there is more demand; {@code false} otherwise
* Read and publish data one at a time until there is no more data, no more
* demand, or perhaps we completed in the mean time.
* @return {@code true} if there is more demand; {@code false} if there is
* no more demand or we have completed.
*/
private boolean readAndPublish() throws IOException {
long r;
while ((r = demand) > 0 && !publisherCompleted) {
while ((r = this.demand) > 0 && !this.state.get().equals(State.COMPLETED)) {
T data = read();
if (data != null) {
if (r != Long.MAX_VALUE) {
@ -152,96 +152,95 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -152,96 +152,95 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
return this.state.compareAndSet(oldState, newState);
}
private Subscription createSubscription() {
return new ReadSubscription();
}
private static final class ReadSubscription implements Subscription {
private final AbstractListenerReadPublisher<?> publisher;
/**
* Subscription that delegates signals to State.
*/
private final class ReadSubscription implements Subscription {
public ReadSubscription(AbstractListenerReadPublisher<?> publisher) {
this.publisher = publisher;
}
@Override
public final void request(long n) {
if (this.publisher.logger.isTraceEnabled()) {
this.publisher.logger.trace(state() + " request: " + n);
if (logger.isTraceEnabled()) {
logger.trace(state + " request: " + n);
}
state().request(this.publisher, n);
state.get().request(AbstractListenerReadPublisher.this, n);
}
@Override
public final void cancel() {
if (this.publisher.logger.isTraceEnabled()) {
this.publisher.logger.trace(state() + " cancel");
if (logger.isTraceEnabled()) {
logger.trace(state + " cancel");
}
state().cancel(this.publisher);
}
private State state() {
return this.publisher.state.get();
state.get().cancel(AbstractListenerReadPublisher.this);
}
}
/**
* Represents a state for the {@link Publisher} to be in. The following figure
* indicate the four different states that exist, and the relationships between them.
*
* <pre>
* UNSUBSCRIBED
* |
* v
* NO_DEMAND -------------------> DEMAND
* | ^ ^ |
* | | | |
* | --------- READING <----- |
* | | |
* | v |
* ------------> COMPLETED <---------
* Represents a state for the {@link Publisher} to be in.
* <p><pre>
* UNSUBSCRIBED
* |
* v
* SUBSCRIBING
* |
* v
* +---- NO_DEMAND ---------------> DEMAND ---+
* | ^ ^ |
* | | | |
* | +------- READING <--------+ |
* | | |
* | v |
* +--------------> COMPLETED <---------------+
* </pre>
* Refer to the individual states for more information.
*/
private enum State {
/**
* The initial unsubscribed state. Will respond to {@link
* #subscribe(AbstractListenerReadPublisher, Subscriber)} by
* changing state to {@link #NO_DEMAND}.
*/
UNSUBSCRIBED {
@Override
<T> void subscribe(AbstractListenerReadPublisher<T> publisher, Subscriber<? super T> subscriber) {
Assert.notNull(publisher, "Publisher must not be null");
Assert.notNull(subscriber, "Subscriber must not be null");
if (publisher.changeState(this, SUBSCRIBING)) {
Subscription subscription = new ReadSubscription(publisher);
Subscription subscription = publisher.createSubscription();
publisher.subscriber = subscriber;
subscriber.onSubscribe(subscription);
publisher.changeState(SUBSCRIBING, NO_DEMAND);
if (publisher.publisherCompleted) {
publisher.onAllDataRead();
// Now safe to check "beforeDemand" flags, they won't change once in NO_DEMAND
if (publisher.completionBeforeDemand) {
publisher.state.get().onAllDataRead(publisher);
}
Throwable publisherError = publisher.publisherError;
if (publisherError != null) {
publisher.onError(publisherError);
Throwable ex = publisher.errorBeforeDemand;
if (ex != null) {
publisher.state.get().onError(publisher, ex);
}
}
else {
throw new IllegalStateException(toString());
throw new IllegalStateException("Failed to transition to SUBSCRIBING, " +
"subscriber: " + subscriber);
}
}
@Override
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
publisher.publisherCompleted = true;
publisher.completionBeforeDemand = true;
}
@Override
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) {
publisher.publisherError = t;
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
publisher.errorBeforeDemand = ex;
}
},
/**
* Very brief state where we know we have a Subscriber but must not
* send onComplete and onError until we after onSubscribe.
*/
SUBSCRIBING {
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
if (Operators.validate(n)) {
@ -254,21 +253,15 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -254,21 +253,15 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@Override
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
publisher.publisherCompleted = true;
publisher.completionBeforeDemand = true;
}
@Override
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) {
publisher.publisherError = t;
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
publisher.errorBeforeDemand = ex;
}
},
/**
* State that gets entered when there is no demand. Responds to {@link
* #request(AbstractListenerReadPublisher, long)} by increasing the demand,
* changing state to {@link #DEMAND} and will check whether there
* is data available for reading.
*/
NO_DEMAND {
@Override
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
@ -277,21 +270,17 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -277,21 +270,17 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
if (publisher.changeState(this, DEMAND)) {
publisher.checkOnDataAvailable();
}
// or else we completed at the same time...
}
}
},
/**
* State that gets entered when there is demand. Responds to
* {@link #onDataAvailable(AbstractListenerReadPublisher)} by
* reading the available data. The state will be changed to
* {@link #NO_DEMAND} if there is no demand.
*/
DEMAND {
@Override
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
if (Operators.validate(n)) {
Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n);
// Did a concurrent read transition to NO_DEMAND just before us?
if (publisher.changeState(NO_DEMAND, DEMAND)) {
publisher.checkOnDataAvailable();
}
@ -304,6 +293,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -304,6 +293,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
if (!read(publisher)) {
return;
}
// Maybe demand arrived between readAndPublish and READING->NO_DEMAND?
long r = publisher.demand;
if (r == 0 || publisher.changeState(NO_DEMAND, this)) {
break;
@ -311,6 +301,10 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -311,6 +301,10 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
}
}
/**
* @return whether to exit the read loop; false means stop trying
* to read, true means check demand one more time.
*/
<T> boolean read(AbstractListenerReadPublisher<T> publisher) {
if (publisher.changeState(this, READING)) {
try {
@ -318,18 +312,19 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -318,18 +312,19 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
if (demandAvailable) {
if (publisher.changeState(READING, DEMAND)) {
publisher.checkOnDataAvailable();
return false;
}
}
else if (publisher.changeState(READING, NO_DEMAND)) {
publisher.suspendReading();
return true;
}
}
catch (IOException ex) {
publisher.onError(ex);
}
return true;
}
// Either competing onDataAvailable calls (via request or container callback)
// Or a concurrent completion
return false;
}
},
@ -339,6 +334,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -339,6 +334,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
if (Operators.validate(n)) {
Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n);
// Did a concurrent read transition to NO_DEMAND just before us?
if (publisher.changeState(NO_DEMAND, DEMAND)) {
publisher.checkOnDataAvailable();
}
@ -346,9 +342,6 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -346,9 +342,6 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
}
},
/**
* The terminal completed state. Does not respond to any events.
*/
COMPLETED {
@Override
<T> void request(AbstractListenerReadPublisher<T> publisher, long n) {
@ -377,10 +370,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -377,10 +370,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
}
<T> void cancel(AbstractListenerReadPublisher<T> publisher) {
if (publisher.changeState(this, COMPLETED)) {
publisher.publisherCompleted = true;
}
else {
if (!publisher.changeState(this, COMPLETED)) {
publisher.state.get().cancel(publisher);
}
}
@ -391,7 +381,6 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -391,7 +381,6 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
if (publisher.changeState(this, COMPLETED)) {
publisher.publisherCompleted = true;
if (publisher.subscriber != null) {
publisher.subscriber.onComplete();
}

124
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java

@ -43,17 +43,17 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -43,17 +43,17 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
protected final Log logger = LogFactory.getLog(getClass());
private final WriteResultPublisher resultPublisher = new WriteResultPublisher();
private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
private volatile boolean subscriberCompleted;
@Nullable
private Subscription subscription;
private volatile boolean subscriberCompleted;
private final WriteResultPublisher resultPublisher = new WriteResultPublisher();
// Subscriber implementation...
// Subscriber methods...
@Override
public final void onSubscribe(Subscription subscription) {
@ -88,7 +88,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -88,7 +88,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
}
// Publisher implementation...
// Publisher method...
@Override
public final void subscribe(Subscriber<? super Void> subscriber) {
@ -96,23 +96,16 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -96,23 +96,16 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
}
/**
* Listeners can call this method to cancel further writing.
*/
// Methods for sub-classes to delegate to, when async I/O events occur...
protected void cancel() {
if (this.subscription != null) {
this.subscription.cancel();
}
}
/**
* Invoked when an error happens while flushing. Defaults to no-op.
* Servlet 3.1 based implementations will receive an
* {@link javax.servlet.AsyncListener#onError} event.
*/
protected void flushingFailed(Throwable t) {
}
// Methods for sub-classes to implement or override...
/**
* Create a new processor for subscribing to the next flush boundary.
@ -120,14 +113,18 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -120,14 +113,18 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
protected abstract Processor<? super T, Void> createWriteProcessor();
/**
* Flush the output.
* Flush the output if ready, or otherwise {@link #isFlushPending()} should
* return true after that.
*/
protected abstract void flush() throws IOException;
/**
* Whether writing is possible.
* Invoked when an error happens while flushing. Defaults to no-op.
* Servlet 3.1 based implementations will receive an
* {@link javax.servlet.AsyncListener#onError} event.
*/
protected abstract boolean isWritePossible();
protected void flushingFailed(Throwable t) {
}
/**
* Whether flushing is pending.
@ -141,25 +138,41 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -141,25 +138,41 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
this.state.get().onFlushPossible(this);
}
private void flushIfPossible() {
if (isWritePossible()) {
onFlushPossible();
}
}
/**
* Whether writing is possible.
*/
protected abstract boolean isWritePossible();
// Private methods for use in State...
private boolean changeState(State oldState, State newState) {
return this.state.compareAndSet(oldState, newState);
}
private void writeComplete() {
if (logger.isTraceEnabled()) {
logger.trace(this.state + " writeComplete");
private void flushIfPossible() {
if (isWritePossible()) {
onFlushPossible();
}
this.state.get().writeComplete(this);
}
/**
* Represents a state for the {@link Processor} to be in.
*
* <p><pre>
* UNSUBSCRIBED
* |
* v
* +--- REQUESTED <--------> RECEIVED ---+
* | | |
* | | |
* | FLUSHING <------+ |
* | | |
* | v |
* +----------> COMPLETED <--------------+
* </pre>
*/
private enum State {
UNSUBSCRIBED {
@ -178,11 +191,13 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -178,11 +191,13 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
REQUESTED {
@Override
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor, Publisher<? extends T> chunk) {
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor,
Publisher<? extends T> currentPublisher) {
if (processor.changeState(this, RECEIVED)) {
Processor<? super T, Void> chunkProcessor = processor.createWriteProcessor();
chunk.subscribe(chunkProcessor);
chunkProcessor.subscribe(new WriteSubscriber(processor));
Processor<? super T, Void> currentProcessor = processor.createWriteProcessor();
currentPublisher.subscribe(currentProcessor);
currentProcessor.subscribe(new WriteResultSubscriber(processor));
}
}
@Override
@ -202,25 +217,25 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -202,25 +217,25 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
try {
processor.flush();
}
catch (IOException ex) {
catch (Throwable ex) {
processor.flushingFailed(ex);
return;
}
if (processor.subscriberCompleted) {
if (processor.isFlushPending()) {
// Ensure the final flush
processor.changeState(this, FLUSHING);
processor.flushIfPossible();
}
else if (processor.changeState(this, COMPLETED)) {
processor.resultPublisher.publishComplete();
if (processor.changeState(this, REQUESTED)) {
if (processor.subscriberCompleted) {
if (processor.isFlushPending()) {
// Ensure the final flush
processor.changeState(REQUESTED, FLUSHING);
processor.flushIfPossible();
}
else if (processor.changeState(REQUESTED, COMPLETED)) {
processor.resultPublisher.publishComplete();
}
else {
processor.state.get().onComplete(processor);
}
}
else {
processor.state.get().onComplete(processor);
}
}
else {
if (processor.changeState(this, REQUESTED)) {
Assert.state(processor.subscription != null, "No subscription");
processor.subscription.request(1);
}
@ -237,7 +252,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -237,7 +252,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
try {
processor.flush();
}
catch (IOException ex) {
catch (Throwable ex) {
processor.flushingFailed(ex);
return;
}
@ -272,6 +287,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -272,6 +287,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
}
};
public <T> void onSubscribe(AbstractListenerWriteFlushProcessor<T> processor, Subscription subscription) {
subscription.cancel();
}
@ -302,11 +318,16 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -302,11 +318,16 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
}
private static class WriteSubscriber implements Subscriber<Void> {
/**
* Subscriber to receive and delegate completion notifications for from
* the current Publisher, i.e. within the current flush boundary.
*/
private static class WriteResultSubscriber implements Subscriber<Void> {
private final AbstractListenerWriteFlushProcessor<?> processor;
public WriteSubscriber(AbstractListenerWriteFlushProcessor<?> processor) {
public WriteResultSubscriber(AbstractListenerWriteFlushProcessor<?> processor) {
this.processor = processor;
}
@ -327,7 +348,10 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -327,7 +348,10 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
@Override
public void onComplete() {
this.processor.writeComplete();
if (this.processor.logger.isTraceEnabled()) {
this.processor.logger.trace(this.processor.state + " writeComplete");
}
this.processor.state.get().writeComplete(this.processor);
}
}
}

156
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java

@ -45,20 +45,20 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -45,20 +45,20 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
protected final Log logger = LogFactory.getLog(getClass());
private final WriteResultPublisher resultPublisher = new WriteResultPublisher();
private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
@Nullable
private Subscription subscription;
@Nullable
protected volatile T currentData;
private volatile boolean subscriberCompleted;
@Nullable
private Subscription subscription;
private final WriteResultPublisher resultPublisher = new WriteResultPublisher();
// Subscriber implementation...
// Subscriber methods...
@Override
public final void onSubscribe(Subscription subscription) {
@ -93,7 +93,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -93,7 +93,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
// Publisher implementation...
// Publisher method...
@Override
public final void subscribe(Subscriber<? super Void> subscriber) {
@ -101,18 +101,12 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -101,18 +101,12 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
// Listener delegation methods...
// Methods for sub-classes to delegate to, when async I/O events occur...
/**
* Listeners can call this to notify when writing is possible.
*/
public final void onWritePossible() {
this.state.get().onWritePossible(this);
}
/**
* Listeners can call this method to cancel further writing.
*/
public void cancel() {
if (this.subscription != null) {
this.subscription.cancel();
@ -120,10 +114,19 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -120,10 +114,19 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
// Methods for sub-classes to implement or override...
/**
* Whether the given data item has any content to write.
* If false the item is not written.
*/
protected abstract boolean isDataEmpty(T data);
/**
* Called when a data item is received via {@link Subscriber#onNext(Object)}
* Called when a data item is received via {@link Subscriber#onNext(Object)}.
* The default implementation saves the data for writing when possible.
*/
protected void receiveData(T data) {
protected void dataReceived(T data) {
if (this.currentData != null) {
throw new IllegalStateException("Current data not processed yet: " + this.currentData);
}
@ -135,19 +138,14 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -135,19 +138,14 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
*/
protected abstract void releaseData();
/**
* Whether the given data item contains any actual data to be processed.
*/
protected abstract boolean isDataEmpty(T data);
/**
* Whether writing is possible.
*/
protected abstract boolean isWritePossible();
/**
* Writes the given data to the output.
* @param data the data to write
* Write the given item.
* @param data the item to write
* @return whether the data was fully written ({@code true})
* and new data can be requested, or otherwise ({@code false})
*/
@ -174,11 +172,22 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -174,11 +172,22 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
// Private methods for use in State...
private boolean changeState(State oldState, State newState) {
return this.state.compareAndSet(oldState, newState);
}
private void changeStateToComplete(State oldState) {
if (changeState(oldState, State.COMPLETED)) {
writingComplete();
this.resultPublisher.publishComplete();
}
else {
this.state.get().onComplete(this);
}
}
private void writeIfPossible() {
if (isWritePossible()) {
onWritePossible();
@ -187,30 +196,23 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -187,30 +196,23 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
/**
* Represents a state for the {@link Subscriber} to be in. The following figure
* indicate the four different states that exist, and the relationships between them.
* Represents a state for the {@link Processor} to be in.
*
* <pre>
* UNSUBSCRIBED
* |
* v
* REQUESTED -------------------> RECEIVED
* ^ ^
* | |
* --------- WRITING <-----
* |
* v
* COMPLETED
* <p><pre>
* UNSUBSCRIBED
* |
* v
* +--- REQUESTED -------------> RECEIVED ---+
* | ^ ^ |
* | | | |
* | + ------ WRITING <------+ |
* | | |
* | v |
* +--------------> COMPLETED <--------------+
* </pre>
* Refer to the individual states for more information.
*/
private enum State {
/**
* The initial unsubscribed state. Will respond to {@code onSubscribe} by
* requesting 1 data from the subscription, and change state to {@link
* #REQUESTED}.
*/
UNSUBSCRIBED {
@Override
public <T> void onSubscribe(AbstractListenerWriteProcessor<T> processor, Subscription subscription) {
@ -225,12 +227,6 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -225,12 +227,6 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
},
/**
* State that gets entered after a data has been
* {@linkplain Subscription#request(long) requested}. Responds to {@code onNext}
* by changing state to {@link #RECEIVED}, and responds to {@code onComplete} by
* changing state to {@link #COMPLETED}.
*/
REQUESTED {
@Override
public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) {
@ -239,7 +235,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -239,7 +235,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
processor.subscription.request(1);
}
else {
processor.receiveData(data);
processor.dataReceived(data);
if (processor.changeState(this, RECEIVED)) {
processor.writeIfPossible();
}
@ -247,25 +243,10 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -247,25 +243,10 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
@Override
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
if (processor.changeState(this, COMPLETED)) {
processor.writingComplete();
processor.resultPublisher.publishComplete();
}
else {
processor.state.get().onComplete(processor);
}
processor.changeStateToComplete(this);
}
},
/**
* State that gets entered after a data has been
* {@linkplain Subscriber#onNext(Object) received}. Responds to
* {@code onWritePossible} by writing the current data and changes
* the state to {@link #WRITING}. If it can be written completely,
* changes the state to either {@link #REQUESTED} if the subscription
* has not been completed; or {@link #COMPLETED} if it has. If it cannot
* be written completely the state will be changed to {@code #RECEIVED}.
*/
RECEIVED {
@Override
public <T> void onWritePossible(AbstractListenerWriteProcessor<T> processor) {
@ -276,35 +257,24 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -276,35 +257,24 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
boolean writeCompleted = processor.write(data);
if (writeCompleted) {
processor.releaseData();
if (!processor.subscriberCompleted) {
if (processor.changeState(WRITING, REQUESTED)) {
if (processor.subscriberCompleted) {
if (processor.changeState(REQUESTED, COMPLETED)) {
processor.writingComplete();
processor.resultPublisher.publishComplete();
} else {
processor.state.get().onComplete(processor);
}
}
else {
processor.suspendWriting();
Assert.state(processor.subscription != null, "No subscription");
processor.subscription.request(1);
}
if (processor.changeState(WRITING, REQUESTED)) {
if (processor.subscriberCompleted) {
processor.changeStateToComplete(REQUESTED);
}
}
else {
if (processor.changeState(WRITING, COMPLETED)) {
processor.writingComplete();
processor.resultPublisher.publishComplete();
} else {
processor.state.get().onComplete(processor);
else {
processor.suspendWriting();
Assert.state(processor.subscription != null, "No subscription");
processor.subscription.request(1);
}
}
}
else {
processor.changeState(WRITING, RECEIVED);
processor.writeIfPossible();
else if (processor.changeState(WRITING, RECEIVED)) {
if (processor.subscriberCompleted) {
processor.changeStateToComplete(RECEIVED);
}
else {
processor.writeIfPossible();
}
}
}
catch (IOException ex) {
@ -312,16 +282,13 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -312,16 +282,13 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
}
}
@Override
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
processor.subscriberCompleted = true;
}
},
/**
* State that gets entered after a writing of the current data has been
* {@code onWritePossible started}.
*/
WRITING {
@Override
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
@ -329,9 +296,6 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -329,9 +296,6 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
},
/**
* The terminal completed state. Does not respond to any events.
*/
COMPLETED {
@Override
public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) {

4
spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java

@ -208,8 +208,8 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl @@ -208,8 +208,8 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
}
@Override
protected void receiveData(DataBuffer dataBuffer) {
super.receiveData(dataBuffer);
protected void dataReceived(DataBuffer dataBuffer) {
super.dataReceived(dataBuffer);
this.byteBuffer = dataBuffer.asByteBuffer();
}

103
spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java

@ -33,6 +33,7 @@ import org.springframework.util.Assert; @@ -33,6 +33,7 @@ import org.springframework.util.Assert;
*
* @author Arjen Poutsma
* @author Violeta Georgieva
* @author Rossen Stoyanchev
* @since 5.0
*/
class WriteResultPublisher implements Publisher<Void> {
@ -44,10 +45,10 @@ class WriteResultPublisher implements Publisher<Void> { @@ -44,10 +45,10 @@ class WriteResultPublisher implements Publisher<Void> {
@Nullable
private Subscriber<? super Void> subscriber;
private volatile boolean publisherCompleted;
private volatile boolean completedBeforeSubscribed;
@Nullable
private volatile Throwable publisherError;
private volatile Throwable errorBeforeSubscribed;
@Override
@ -58,12 +59,8 @@ class WriteResultPublisher implements Publisher<Void> { @@ -58,12 +59,8 @@ class WriteResultPublisher implements Publisher<Void> {
this.state.get().subscribe(this, subscriber);
}
private boolean changeState(State oldState, State newState) {
return this.state.compareAndSet(oldState, newState);
}
/**
* Publishes the complete signal to the subscriber of this publisher.
* Delegate a completion signal to the subscriber.
*/
public void publishComplete() {
if (logger.isTraceEnabled()) {
@ -73,7 +70,7 @@ class WriteResultPublisher implements Publisher<Void> { @@ -73,7 +70,7 @@ class WriteResultPublisher implements Publisher<Void> {
}
/**
* Publishes the given error signal to the subscriber of this publisher.
* Delegate the given error signal to the subscriber.
*/
public void publishError(Throwable t) {
if (logger.isTraceEnabled()) {
@ -82,12 +79,20 @@ class WriteResultPublisher implements Publisher<Void> { @@ -82,12 +79,20 @@ class WriteResultPublisher implements Publisher<Void> {
this.state.get().publishError(this, t);
}
private boolean changeState(State oldState, State newState) {
return this.state.compareAndSet(oldState, newState);
}
private static final class ResponseBodyWriteResultSubscription implements Subscription {
/**
* Subscription to receive and delegate request and cancel signals from the
* suscbriber to this publisher.
*/
private static final class WriteResultSubscription implements Subscription {
private final WriteResultPublisher publisher;
public ResponseBodyWriteResultSubscription(WriteResultPublisher publisher) {
public WriteResultSubscription(WriteResultPublisher publisher) {
this.publisher = publisher;
}
@ -113,6 +118,21 @@ class WriteResultPublisher implements Publisher<Void> { @@ -113,6 +118,21 @@ class WriteResultPublisher implements Publisher<Void> {
}
/**
* Represents a state for the {@link Publisher} to be in.
* <p><pre>
* UNSUBSCRIBED
* |
* v
* SUBSCRIBING
* |
* v
* SUBSCRIBED
* |
* v
* COMPLETED
* </pre>
*/
private enum State {
UNSUBSCRIBED {
@ -120,14 +140,15 @@ class WriteResultPublisher implements Publisher<Void> { @@ -120,14 +140,15 @@ class WriteResultPublisher implements Publisher<Void> {
void subscribe(WriteResultPublisher publisher, Subscriber<? super Void> subscriber) {
Assert.notNull(subscriber, "Subscriber must not be null");
if (publisher.changeState(this, SUBSCRIBING)) {
Subscription subscription = new ResponseBodyWriteResultSubscription(publisher);
Subscription subscription = new WriteResultSubscription(publisher);
publisher.subscriber = subscriber;
subscriber.onSubscribe(subscription);
publisher.changeState(SUBSCRIBING, SUBSCRIBED);
if (publisher.publisherCompleted) {
// Now safe to check "beforeSubscribed" flags, they won't change once in NO_DEMAND
if (publisher.completedBeforeSubscribed) {
publisher.publishComplete();
}
Throwable publisherError = publisher.publisherError;
Throwable publisherError = publisher.errorBeforeSubscribed;
if (publisherError != null) {
publisher.publishError(publisherError);
}
@ -138,11 +159,11 @@ class WriteResultPublisher implements Publisher<Void> { @@ -138,11 +159,11 @@ class WriteResultPublisher implements Publisher<Void> {
}
@Override
void publishComplete(WriteResultPublisher publisher) {
publisher.publisherCompleted = true;
publisher.completedBeforeSubscribed = true;
}
@Override
void publishError(WriteResultPublisher publisher, Throwable t) {
publisher.publisherError = t;
void publishError(WriteResultPublisher publisher, Throwable ex) {
publisher.errorBeforeSubscribed = ex;
}
},
@ -153,11 +174,11 @@ class WriteResultPublisher implements Publisher<Void> { @@ -153,11 +174,11 @@ class WriteResultPublisher implements Publisher<Void> {
}
@Override
void publishComplete(WriteResultPublisher publisher) {
publisher.publisherCompleted = true;
publisher.completedBeforeSubscribed = true;
}
@Override
void publishError(WriteResultPublisher publisher, Throwable t) {
publisher.publisherError = t;
void publishError(WriteResultPublisher publisher, Throwable ex) {
publisher.errorBeforeSubscribed = ex;
}
},
@ -166,26 +187,6 @@ class WriteResultPublisher implements Publisher<Void> { @@ -166,26 +187,6 @@ class WriteResultPublisher implements Publisher<Void> {
void request(WriteResultPublisher publisher, long n) {
Operators.validate(n);
}
@Override
void publishComplete(WriteResultPublisher publisher) {
if (publisher.changeState(this, COMPLETED)) {
Assert.state(publisher.subscriber != null, "No subscriber");
publisher.subscriber.onComplete();
}
else {
publisher.state.get().publishComplete(publisher);
}
}
@Override
void publishError(WriteResultPublisher publisher, Throwable t) {
if (publisher.changeState(this, COMPLETED)) {
Assert.state(publisher.subscriber != null, "No subscriber");
publisher.subscriber.onError(t);
}
else {
publisher.state.get().publishError(publisher, t);
}
}
},
COMPLETED {
@ -197,6 +198,14 @@ class WriteResultPublisher implements Publisher<Void> { @@ -197,6 +198,14 @@ class WriteResultPublisher implements Publisher<Void> {
void cancel(WriteResultPublisher publisher) {
// ignore
}
@Override
void publishComplete(WriteResultPublisher publisher) {
// ignore
}
@Override
void publishError(WriteResultPublisher publisher, Throwable t) {
// ignore
}
};
void subscribe(WriteResultPublisher publisher, Subscriber<? super Void> subscriber) {
@ -214,11 +223,23 @@ class WriteResultPublisher implements Publisher<Void> { @@ -214,11 +223,23 @@ class WriteResultPublisher implements Publisher<Void> {
}
void publishComplete(WriteResultPublisher publisher) {
// ignore
if (publisher.changeState(this, COMPLETED)) {
Assert.state(publisher.subscriber != null, "No subscriber");
publisher.subscriber.onComplete();
}
else {
publisher.state.get().publishComplete(publisher);
}
}
void publishError(WriteResultPublisher publisher, Throwable t) {
// ignore
if (publisher.changeState(this, COMPLETED)) {
Assert.state(publisher.subscriber != null, "No subscriber");
publisher.subscriber.onError(t);
}
else {
publisher.state.get().publishError(publisher, t);
}
}
}

Loading…
Cancel
Save