From e545b20289df650499aeb8f3a30317b5d678cfa5 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Thu, 30 Jun 2016 11:18:13 +0300 Subject: [PATCH] Make AbstractResponseBodySubscriber.onWritePossible thread-safe When there are simultaneous invocations of onWritePossible, only the first one should succeed. This can happens when AbstractResponseBodySubscriber.onNext and WriteListener.onWritePossible() are called respectively by the application and the web container. --- .../AbstractResponseBodySubscriber.java | 66 ++++++++++++------- 1 file changed, 42 insertions(+), 24 deletions(-) diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodySubscriber.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodySubscriber.java index 8158d525f70..e1b6457453f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodySubscriber.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodySubscriber.java @@ -152,10 +152,13 @@ abstract class AbstractResponseBodySubscriber implements Subscriber * UNSUBSCRIBED * | * v - * REQUESTED <---> RECEIVED - * | | - * v v - * COMPLETED + * REQUESTED -------------------> RECEIVED + * ^ ^ + * | | + * --------- WRITING <----- + * | + * v + * COMPLETED * * Refer to the individual states for more information. */ @@ -206,36 +209,41 @@ abstract class AbstractResponseBodySubscriber implements Subscriber /** * State that gets entered after a buffer has been * {@linkplain Subscriber#onNext(Object) received}. Responds to - * {@code onWritePossible} by writing the current buffer, and if it can be - * written completely, changes state to either {@link #REQUESTED} if the - * subscription has not been completed; or {@link #COMPLETED} if it has. + * {@code onWritePossible} by writing the current buffer and changes + * the state to {@link #WRITING}. If it can be written completely, + * changes the state to either {@link #REQUESTED} if the subscription + * has not been completed; or {@link #COMPLETED} if it has. If it cannot + * be written completely the state will be changed to {@link #RECEIVED}. */ RECEIVED { @Override void onWritePossible(AbstractResponseBodySubscriber subscriber) { - DataBuffer dataBuffer = subscriber.currentBuffer; - try { - boolean writeCompleted = subscriber.write(dataBuffer); - if (writeCompleted) { - if (dataBuffer instanceof FlushingDataBuffer) { - subscriber.flush(); - } - subscriber.releaseBuffer(); - boolean subscriptionCompleted = subscriber.subscriptionCompleted; - if (!subscriptionCompleted) { - if (subscriber.changeState(this, REQUESTED)) { + if (subscriber.changeState(this, WRITING)) { + DataBuffer dataBuffer = subscriber.currentBuffer; + try { + boolean writeCompleted = subscriber.write(dataBuffer); + if (writeCompleted) { + if (dataBuffer instanceof FlushingDataBuffer) { + subscriber.flush(); + } + subscriber.releaseBuffer(); + boolean subscriptionCompleted = subscriber.subscriptionCompleted; + if (!subscriptionCompleted) { + subscriber.changeState(WRITING, REQUESTED); subscriber.subscription.request(1); } - } - else { - if (subscriber.changeState(this, COMPLETED)) { + else { + subscriber.changeState(WRITING, COMPLETED); subscriber.close(); } } + else { + subscriber.changeState(WRITING, RECEIVED); + } + } + catch (IOException ex) { + subscriber.onError(ex); } - } - catch (IOException ex) { - subscriber.onError(ex); } } @@ -244,6 +252,16 @@ abstract class AbstractResponseBodySubscriber implements Subscriber subscriber.subscriptionCompleted = true; } }, + /** + * State that gets entered after a writing of the current buffer has been + * {@code onWritePossible started}. + */ + WRITING { + @Override + void onComplete(AbstractResponseBodySubscriber subscriber) { + subscriber.subscriptionCompleted = true; + } + }, /** * The terminal completed state. Does not respond to any events. */