Browse Source

Make AbstractResponseBodySubscriber.onWritePossible thread-safe

When there are simultaneous invocations of onWritePossible, only the
first one should succeed. This can happens when
AbstractResponseBodySubscriber.onNext and
WriteListener.onWritePossible() are called respectively by the
application and the web container.
pull/1111/head
Violeta Georgieva 10 years ago
parent
commit
e545b20289
  1. 66
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodySubscriber.java

66
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodySubscriber.java

@ -152,10 +152,13 @@ abstract class AbstractResponseBodySubscriber implements Subscriber<DataBuffer> @@ -152,10 +152,13 @@ abstract class AbstractResponseBodySubscriber implements Subscriber<DataBuffer>
* UNSUBSCRIBED
* |
* v
* REQUESTED <---> RECEIVED
* | |
* v v
* COMPLETED
* REQUESTED -------------------> RECEIVED
* ^ ^
* | |
* --------- WRITING <-----
* |
* v
* COMPLETED
* </pre>
* Refer to the individual states for more information.
*/
@ -206,36 +209,41 @@ abstract class AbstractResponseBodySubscriber implements Subscriber<DataBuffer> @@ -206,36 +209,41 @@ abstract class AbstractResponseBodySubscriber implements Subscriber<DataBuffer>
/**
* State that gets entered after a buffer has been
* {@linkplain Subscriber#onNext(Object) received}. Responds to
* {@code onWritePossible} by writing the current buffer, and if it can be
* written completely, changes state to either {@link #REQUESTED} if the
* subscription has not been completed; or {@link #COMPLETED} if it has.
* {@code onWritePossible} by writing the current buffer and changes
* the state to {@link #WRITING}. If it can be written completely,
* changes the state to either {@link #REQUESTED} if the subscription
* has not been completed; or {@link #COMPLETED} if it has. If it cannot
* be written completely the state will be changed to {@link #RECEIVED}.
*/
RECEIVED {
@Override
void onWritePossible(AbstractResponseBodySubscriber subscriber) {
DataBuffer dataBuffer = subscriber.currentBuffer;
try {
boolean writeCompleted = subscriber.write(dataBuffer);
if (writeCompleted) {
if (dataBuffer instanceof FlushingDataBuffer) {
subscriber.flush();
}
subscriber.releaseBuffer();
boolean subscriptionCompleted = subscriber.subscriptionCompleted;
if (!subscriptionCompleted) {
if (subscriber.changeState(this, REQUESTED)) {
if (subscriber.changeState(this, WRITING)) {
DataBuffer dataBuffer = subscriber.currentBuffer;
try {
boolean writeCompleted = subscriber.write(dataBuffer);
if (writeCompleted) {
if (dataBuffer instanceof FlushingDataBuffer) {
subscriber.flush();
}
subscriber.releaseBuffer();
boolean subscriptionCompleted = subscriber.subscriptionCompleted;
if (!subscriptionCompleted) {
subscriber.changeState(WRITING, REQUESTED);
subscriber.subscription.request(1);
}
}
else {
if (subscriber.changeState(this, COMPLETED)) {
else {
subscriber.changeState(WRITING, COMPLETED);
subscriber.close();
}
}
else {
subscriber.changeState(WRITING, RECEIVED);
}
}
catch (IOException ex) {
subscriber.onError(ex);
}
}
catch (IOException ex) {
subscriber.onError(ex);
}
}
@ -244,6 +252,16 @@ abstract class AbstractResponseBodySubscriber implements Subscriber<DataBuffer> @@ -244,6 +252,16 @@ abstract class AbstractResponseBodySubscriber implements Subscriber<DataBuffer>
subscriber.subscriptionCompleted = true;
}
},
/**
* State that gets entered after a writing of the current buffer has been
* {@code onWritePossible started}.
*/
WRITING {
@Override
void onComplete(AbstractResponseBodySubscriber subscriber) {
subscriber.subscriptionCompleted = true;
}
},
/**
* The terminal completed state. Does not respond to any events.
*/

Loading…
Cancel
Save