From b49924ba37d588afc0c5232290f5a6726115c10b Mon Sep 17 00:00:00 2001 From: rstoyanchev Date: Wed, 16 Apr 2025 11:41:00 +0100 Subject: [PATCH 1/2] Revert "Fix handling of timeout in SseEmitter" This reverts commit f92f9c1d5b04aefb467355576e63cc2cc6d78d92. See gh-34762 --- .../annotation/ResponseBodyEmitter.java | 92 +++++-------------- 1 file changed, 22 insertions(+), 70 deletions(-) diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java index afa3008cdc1..e4e5d0e6b7c 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2025 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,7 +21,7 @@ import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import org.springframework.http.MediaType; @@ -73,20 +73,21 @@ public class ResponseBodyEmitter { @Nullable private Handler handler; - private final AtomicReference state = new AtomicReference<>(State.START); - /** Store send data before handler is initialized. */ private final Set earlySendAttempts = new LinkedHashSet<>(8); + /** Store successful completion before the handler is initialized. */ + private final AtomicBoolean complete = new AtomicBoolean(); + /** Store an error before the handler is initialized. */ @Nullable private Throwable failure; - private final TimeoutCallback timeoutCallback = new TimeoutCallback(); + private final DefaultCallback timeoutCallback = new DefaultCallback(); private final ErrorCallback errorCallback = new ErrorCallback(); - private final CompletionCallback completionCallback = new CompletionCallback(); + private final DefaultCallback completionCallback = new DefaultCallback(); /** @@ -127,7 +128,7 @@ public class ResponseBodyEmitter { this.earlySendAttempts.clear(); } - if (this.state.get() == State.COMPLETE) { + if (this.complete.get()) { if (this.failure != null) { this.handler.completeWithError(this.failure); } @@ -143,7 +144,7 @@ public class ResponseBodyEmitter { } void initializeWithError(Throwable ex) { - if (this.state.compareAndSet(State.START, State.COMPLETE)) { + if (this.complete.compareAndSet(false, true)) { this.failure = ex; this.earlySendAttempts.clear(); this.errorCallback.accept(ex); @@ -185,7 +186,8 @@ public class ResponseBodyEmitter { * @throws java.lang.IllegalStateException wraps any other errors */ public synchronized void send(Object object, @Nullable MediaType mediaType) throws IOException { - assertNotComplete(); + Assert.state(!this.complete.get(), () -> "ResponseBodyEmitter has already completed" + + (this.failure != null ? " with error: " + this.failure : "")); if (this.handler != null) { try { this.handler.send(object, mediaType); @@ -212,13 +214,9 @@ public class ResponseBodyEmitter { * @since 6.0.12 */ public synchronized void send(Set items) throws IOException { - assertNotComplete(); - sendInternal(items); - } - - private void assertNotComplete() { - Assert.state(this.state.get() == State.START, () -> "ResponseBodyEmitter has already completed" + + Assert.state(!this.complete.get(), () -> "ResponseBodyEmitter has already completed" + (this.failure != null ? " with error: " + this.failure : "")); + sendInternal(items); } private void sendInternal(Set items) throws IOException { @@ -250,7 +248,7 @@ public class ResponseBodyEmitter { * related events such as an error while {@link #send(Object) sending}. */ public void complete() { - if (trySetComplete() && this.handler != null) { + if (this.complete.compareAndSet(false, true) && this.handler != null) { this.handler.complete(); } } @@ -267,7 +265,7 @@ public class ResponseBodyEmitter { * {@link #send(Object) sending}. */ public void completeWithError(Throwable ex) { - if (trySetComplete()) { + if (this.complete.compareAndSet(false, true)) { this.failure = ex; if (this.handler != null) { this.handler.completeWithError(ex); @@ -275,11 +273,6 @@ public class ResponseBodyEmitter { } } - private boolean trySetComplete() { - return (this.state.compareAndSet(State.START, State.COMPLETE) || - (this.state.compareAndSet(State.TIMEOUT, State.COMPLETE))); - } - /** * Register code to invoke when the async request times out. This method is * called from a container thread when an async request times out. @@ -376,7 +369,7 @@ public class ResponseBodyEmitter { } - private class TimeoutCallback implements Runnable { + private class DefaultCallback implements Runnable { private final List delegates = new ArrayList<>(1); @@ -386,10 +379,9 @@ public class ResponseBodyEmitter { @Override public void run() { - if (ResponseBodyEmitter.this.state.compareAndSet(State.START, State.TIMEOUT)) { - for (Runnable delegate : this.delegates) { - delegate.run(); - } + ResponseBodyEmitter.this.complete.compareAndSet(false, true); + for (Runnable delegate : this.delegates) { + delegate.run(); } } } @@ -405,51 +397,11 @@ public class ResponseBodyEmitter { @Override public void accept(Throwable t) { - if (ResponseBodyEmitter.this.state.compareAndSet(State.START, State.COMPLETE)) { - for (Consumer delegate : this.delegates) { - delegate.accept(t); - } - } - } - } - - - private class CompletionCallback implements Runnable { - - private final List delegates = new ArrayList<>(1); - - public synchronized void addDelegate(Runnable delegate) { - this.delegates.add(delegate); - } - - @Override - public void run() { - if (ResponseBodyEmitter.this.state.compareAndSet(State.START, State.COMPLETE)) { - for (Runnable delegate : this.delegates) { - delegate.run(); - } + ResponseBodyEmitter.this.complete.compareAndSet(false, true); + for(Consumer delegate : this.delegates) { + delegate.accept(t); } } } - - /** - * Represents a state for {@link ResponseBodyEmitter}. - *

-	 *     START ----+
-	 *       |       |
-	 *       v       |
-	 *    TIMEOUT    |
-	 *       |       |
-	 *       v       |
-	 *   COMPLETE <--+
-	 * 
- * @since 6.2.4 - */ - private enum State { - START, - TIMEOUT, // handling a timeout - COMPLETE - } - } From 9c13c6b695ac70cd4288016815105d0a694b62fb Mon Sep 17 00:00:00 2001 From: rstoyanchev Date: Wed, 16 Apr 2025 11:53:22 +0100 Subject: [PATCH 2/2] Revert "Use optimistic locking where possible in `ResponseBodyEmitter`" This reverts commit e67f892e44bab285ed7e2848f888ff897b0e6d0e. Closes gh-34762 --- .../annotation/ResponseBodyEmitter.java | 54 +++++++++---------- 1 file changed, 26 insertions(+), 28 deletions(-) diff --git a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java index e4e5d0e6b7c..e78b416d3df 100644 --- a/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java +++ b/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import org.springframework.http.MediaType; @@ -77,7 +76,7 @@ public class ResponseBodyEmitter { private final Set earlySendAttempts = new LinkedHashSet<>(8); /** Store successful completion before the handler is initialized. */ - private final AtomicBoolean complete = new AtomicBoolean(); + private boolean complete; /** Store an error before the handler is initialized. */ @Nullable @@ -128,7 +127,7 @@ public class ResponseBodyEmitter { this.earlySendAttempts.clear(); } - if (this.complete.get()) { + if (this.complete) { if (this.failure != null) { this.handler.completeWithError(this.failure); } @@ -143,12 +142,11 @@ public class ResponseBodyEmitter { } } - void initializeWithError(Throwable ex) { - if (this.complete.compareAndSet(false, true)) { - this.failure = ex; - this.earlySendAttempts.clear(); - this.errorCallback.accept(ex); - } + synchronized void initializeWithError(Throwable ex) { + this.complete = true; + this.failure = ex; + this.earlySendAttempts.clear(); + this.errorCallback.accept(ex); } /** @@ -186,7 +184,7 @@ public class ResponseBodyEmitter { * @throws java.lang.IllegalStateException wraps any other errors */ public synchronized void send(Object object, @Nullable MediaType mediaType) throws IOException { - Assert.state(!this.complete.get(), () -> "ResponseBodyEmitter has already completed" + + Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" + (this.failure != null ? " with error: " + this.failure : "")); if (this.handler != null) { try { @@ -214,7 +212,7 @@ public class ResponseBodyEmitter { * @since 6.0.12 */ public synchronized void send(Set items) throws IOException { - Assert.state(!this.complete.get(), () -> "ResponseBodyEmitter has already completed" + + Assert.state(!this.complete, () -> "ResponseBodyEmitter has already completed" + (this.failure != null ? " with error: " + this.failure : "")); sendInternal(items); } @@ -247,8 +245,9 @@ public class ResponseBodyEmitter { * to complete request processing. It should not be used after container * related events such as an error while {@link #send(Object) sending}. */ - public void complete() { - if (this.complete.compareAndSet(false, true) && this.handler != null) { + public synchronized void complete() { + this.complete = true; + if (this.handler != null) { this.handler.complete(); } } @@ -264,12 +263,11 @@ public class ResponseBodyEmitter { * container related events such as an error while * {@link #send(Object) sending}. */ - public void completeWithError(Throwable ex) { - if (this.complete.compareAndSet(false, true)) { - this.failure = ex; - if (this.handler != null) { - this.handler.completeWithError(ex); - } + public synchronized void completeWithError(Throwable ex) { + this.complete = true; + this.failure = ex; + if (this.handler != null) { + this.handler.completeWithError(ex); } } @@ -278,7 +276,7 @@ public class ResponseBodyEmitter { * called from a container thread when an async request times out. *

As of 6.2, one can register multiple callbacks for this event. */ - public void onTimeout(Runnable callback) { + public synchronized void onTimeout(Runnable callback) { this.timeoutCallback.addDelegate(callback); } @@ -289,7 +287,7 @@ public class ResponseBodyEmitter { *

As of 6.2, one can register multiple callbacks for this event. * @since 5.0 */ - public void onError(Consumer callback) { + public synchronized void onError(Consumer callback) { this.errorCallback.addDelegate(callback); } @@ -300,7 +298,7 @@ public class ResponseBodyEmitter { * detecting that a {@code ResponseBodyEmitter} instance is no longer usable. *

As of 6.2, one can register multiple callbacks for this event. */ - public void onCompletion(Runnable callback) { + public synchronized void onCompletion(Runnable callback) { this.completionCallback.addDelegate(callback); } @@ -371,15 +369,15 @@ public class ResponseBodyEmitter { private class DefaultCallback implements Runnable { - private final List delegates = new ArrayList<>(1); + private List delegates = new ArrayList<>(1); - public synchronized void addDelegate(Runnable delegate) { + public void addDelegate(Runnable delegate) { this.delegates.add(delegate); } @Override public void run() { - ResponseBodyEmitter.this.complete.compareAndSet(false, true); + ResponseBodyEmitter.this.complete = true; for (Runnable delegate : this.delegates) { delegate.run(); } @@ -389,15 +387,15 @@ public class ResponseBodyEmitter { private class ErrorCallback implements Consumer { - private final List> delegates = new ArrayList<>(1); + private List> delegates = new ArrayList<>(1); - public synchronized void addDelegate(Consumer callback) { + public void addDelegate(Consumer callback) { this.delegates.add(callback); } @Override public void accept(Throwable t) { - ResponseBodyEmitter.this.complete.compareAndSet(false, true); + ResponseBodyEmitter.this.complete = true; for(Consumer delegate : this.delegates) { delegate.accept(t); }