diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodySubscriber.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodySubscriber.java index 8158d525f70..e1b6457453f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodySubscriber.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodySubscriber.java @@ -152,10 +152,13 @@ abstract class AbstractResponseBodySubscriber implements Subscriber * UNSUBSCRIBED * | * v - * REQUESTED <---> RECEIVED - * | | - * v v - * COMPLETED + * REQUESTED -------------------> RECEIVED + * ^ ^ + * | | + * --------- WRITING <----- + * | + * v + * COMPLETED * * Refer to the individual states for more information. */ @@ -206,36 +209,41 @@ abstract class AbstractResponseBodySubscriber implements Subscriber /** * State that gets entered after a buffer has been * {@linkplain Subscriber#onNext(Object) received}. Responds to - * {@code onWritePossible} by writing the current buffer, and if it can be - * written completely, changes state to either {@link #REQUESTED} if the - * subscription has not been completed; or {@link #COMPLETED} if it has. + * {@code onWritePossible} by writing the current buffer and changes + * the state to {@link #WRITING}. If it can be written completely, + * changes the state to either {@link #REQUESTED} if the subscription + * has not been completed; or {@link #COMPLETED} if it has. If it cannot + * be written completely the state will be changed to {@link #RECEIVED}. */ RECEIVED { @Override void onWritePossible(AbstractResponseBodySubscriber subscriber) { - DataBuffer dataBuffer = subscriber.currentBuffer; - try { - boolean writeCompleted = subscriber.write(dataBuffer); - if (writeCompleted) { - if (dataBuffer instanceof FlushingDataBuffer) { - subscriber.flush(); - } - subscriber.releaseBuffer(); - boolean subscriptionCompleted = subscriber.subscriptionCompleted; - if (!subscriptionCompleted) { - if (subscriber.changeState(this, REQUESTED)) { + if (subscriber.changeState(this, WRITING)) { + DataBuffer dataBuffer = subscriber.currentBuffer; + try { + boolean writeCompleted = subscriber.write(dataBuffer); + if (writeCompleted) { + if (dataBuffer instanceof FlushingDataBuffer) { + subscriber.flush(); + } + subscriber.releaseBuffer(); + boolean subscriptionCompleted = subscriber.subscriptionCompleted; + if (!subscriptionCompleted) { + subscriber.changeState(WRITING, REQUESTED); subscriber.subscription.request(1); } - } - else { - if (subscriber.changeState(this, COMPLETED)) { + else { + subscriber.changeState(WRITING, COMPLETED); subscriber.close(); } } + else { + subscriber.changeState(WRITING, RECEIVED); + } + } + catch (IOException ex) { + subscriber.onError(ex); } - } - catch (IOException ex) { - subscriber.onError(ex); } } @@ -244,6 +252,16 @@ abstract class AbstractResponseBodySubscriber implements Subscriber subscriber.subscriptionCompleted = true; } }, + /** + * State that gets entered after a writing of the current buffer has been + * {@code onWritePossible started}. + */ + WRITING { + @Override + void onComplete(AbstractResponseBodySubscriber subscriber) { + subscriber.subscriptionCompleted = true; + } + }, /** * The terminal completed state. Does not respond to any events. */