From 38f5f4de8e1aafad7331582cefade701cb7a7c16 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Thu, 8 Jan 2026 15:50:57 +0100 Subject: [PATCH] Add configurable throttle policy to @ConcurrencyLimit annotation Closes gh-36109 --- .../InvocationRejectedException.java | 55 ++++++++ .../annotation/ConcurrencyLimit.java | 35 +++++- .../ConcurrencyLimitBeanPostProcessor.java | 28 ++++- .../resilience/package-info.java | 7 ++ .../resilience/ConcurrencyLimitTests.java | 117 +++++++++++++++++- 5 files changed, 234 insertions(+), 8 deletions(-) create mode 100644 spring-context/src/main/java/org/springframework/resilience/InvocationRejectedException.java create mode 100644 spring-context/src/main/java/org/springframework/resilience/package-info.java diff --git a/spring-context/src/main/java/org/springframework/resilience/InvocationRejectedException.java b/spring-context/src/main/java/org/springframework/resilience/InvocationRejectedException.java new file mode 100644 index 00000000000..cd701e83aba --- /dev/null +++ b/spring-context/src/main/java/org/springframework/resilience/InvocationRejectedException.java @@ -0,0 +1,55 @@ +/* + * Copyright 2002-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.resilience; + +import java.util.concurrent.RejectedExecutionException; + +/** + * Exception thrown when a target will not get invoked due to a resilience policy, + * such as the concurrency limit having been reached for a class/method annotated with + * {@link org.springframework.resilience.annotation.ConcurrencyLimit @ConcurrencyLimit}. + * + * @author Juergen Hoeller + * @since 7.0.3 + * @see org.springframework.resilience.annotation.ConcurrencyLimit#rejectOnExcess() + */ +@SuppressWarnings("serial") +public class InvocationRejectedException extends RejectedExecutionException { + + private final Object target; + + + /** + * Create a new {@code InvocationRejectedException} + * with the specified detail message and target instance. + * @param msg the detail message + * @param target the target instance that was about to be invoked + */ + public InvocationRejectedException(String msg, Object target) { + super(msg); + this.target = target; + } + + + /** + * Return the target instance that was about to be invoked. + */ + public Object getTarget() { + return this.target; + } + +} diff --git a/spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimit.java b/spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimit.java index d6a2750cce7..2251b5d85e8 100644 --- a/spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimit.java +++ b/spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimit.java @@ -28,7 +28,9 @@ import org.springframework.core.annotation.AliasFor; /** * A common annotation specifying a concurrency limit for an individual method, * or for all proxy-invoked methods in a given class hierarchy if annotated at - * the type level. + * the type level. The default behavior is to block further method invocations + * when the limit has been reached. Alternatively, further invocations can be + * rejected through configuring {@link #policy()} as {@code policy = REJECT}. * *

In the type-level case, all methods inheriting the concurrency limit * from the type level share a common concurrency throttle, with any mix @@ -95,4 +97,35 @@ public @interface ConcurrencyLimit { */ String limitString() default ""; + /** + * The policy for throttling method invocations when the limit has been reached. + *

The default behavior is to block further concurrent invocations once the + * specified limit has been reached: {@link ThrottlePolicy#BLOCK}. + *

Switch this policy to {@code REJECT} for rejecting further invocations instead, + * throwing {@link org.springframework.resilience.InvocationRejectedException} + * (which extends the common {@link java.util.concurrent.RejectedExecutionException}) + * on any further concurrent invocation attempts: {@link ThrottlePolicy#REJECT}. + * @since 7.0.3 + */ + ThrottlePolicy policy() default ThrottlePolicy.BLOCK; + + + /** + * Policy to apply for throttling method invocations when the limit has been reached. + * @since 7.0.3 + */ + enum ThrottlePolicy { + + /** + * The default: block until we can invoke the method within the configured limit. + */ + BLOCK, + + /** + * Alternative: reject further method invocations once the limit has been reached. + * @see org.springframework.resilience.InvocationRejectedException + */ + REJECT + } + } diff --git a/spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimitBeanPostProcessor.java b/spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimitBeanPostProcessor.java index 2a74ce208d5..b412047a70e 100644 --- a/spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimitBeanPostProcessor.java +++ b/spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimitBeanPostProcessor.java @@ -35,7 +35,9 @@ import org.springframework.aop.support.DefaultPointcutAdvisor; import org.springframework.aop.support.annotation.AnnotationMatchingPointcut; import org.springframework.context.EmbeddedValueResolverAware; import org.springframework.core.annotation.AnnotatedElementUtils; +import org.springframework.resilience.InvocationRejectedException; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.StringUtils; import org.springframework.util.StringValueResolver; @@ -115,7 +117,11 @@ public class ConcurrencyLimitBeanPostProcessor extends AbstractBeanFactoryAwareA if (concurrencyLimit < -1) { throw new IllegalStateException(annotation + " must be configured with a valid limit"); } - interceptor = new ConcurrencyThrottleInterceptor(concurrencyLimit); + interceptor = (annotation.policy() == ConcurrencyLimit.ThrottlePolicy.REJECT ? + new RejectingConcurrencyThrottleInterceptor(concurrencyLimit, + (perMethod ? ClassUtils.getQualifiedMethodName(method) : targetClass.getName()), + instance) : + new ConcurrencyThrottleInterceptor(concurrencyLimit)); if (!perMethod) { holder.classInterceptor = interceptor; } @@ -148,4 +154,24 @@ public class ConcurrencyLimitBeanPostProcessor extends AbstractBeanFactoryAwareA @Nullable MethodInterceptor classInterceptor; } + + private static class RejectingConcurrencyThrottleInterceptor extends ConcurrencyThrottleInterceptor { + + private final String identifier; + + private final Object target; + + public RejectingConcurrencyThrottleInterceptor(int concurrencyLimit, String identifier, Object target) { + super(concurrencyLimit); + this.identifier = identifier; + this.target = target; + } + + @Override + protected void onLimitReached() { + throw new InvocationRejectedException( + "Concurrency limit reached for " + this.identifier + ": " + getConcurrencyLimit(), this.target); + } + } + } diff --git a/spring-context/src/main/java/org/springframework/resilience/package-info.java b/spring-context/src/main/java/org/springframework/resilience/package-info.java new file mode 100644 index 00000000000..7db823ab8e8 --- /dev/null +++ b/spring-context/src/main/java/org/springframework/resilience/package-info.java @@ -0,0 +1,7 @@ +/** + * Common exceptions thrown by Spring's resilience facilities. + */ +@NullMarked +package org.springframework.resilience; + +import org.jspecify.annotations.NullMarked; diff --git a/spring-context/src/test/java/org/springframework/resilience/ConcurrencyLimitTests.java b/spring-context/src/test/java/org/springframework/resilience/ConcurrencyLimitTests.java index 94e22784f05..814514fd0f4 100644 --- a/spring-context/src/test/java/org/springframework/resilience/ConcurrencyLimitTests.java +++ b/spring-context/src/test/java/org/springframework/resilience/ConcurrencyLimitTests.java @@ -37,6 +37,7 @@ import org.springframework.resilience.annotation.EnableResilientMethods; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.Assertions.assertThatIllegalStateException; +import static org.springframework.resilience.annotation.ConcurrencyLimit.ThrottlePolicy.REJECT; /** * @author Juergen Hoeller @@ -89,6 +90,26 @@ class ConcurrencyLimitTests { assertThat(target.current).hasValue(10); } + @Test + void withPostProcessorForMethodWithRejection() throws Exception{ + AnnotatedMethodBean proxy = createProxy(AnnotatedMethodBean.class); + AnnotatedMethodBean target = (AnnotatedMethodBean) AopProxyUtils.getSingletonTarget(proxy); + + List> futures = new ArrayList<>(10); + for (int i = 0; i < 2; i++) { + futures.add(CompletableFuture.runAsync(proxy::rejectingOperation)); + } + Thread.sleep(10); + for (int i = 2; i < 10; i++) { + futures.add(CompletableFuture.runAsync(() -> + assertThatExceptionOfType(InvocationRejectedException.class).isThrownBy(proxy::rejectingOperation) + .withMessageContaining(AnnotatedMethodBean.class.getName() + ".rejectingOperation") + .satisfies(ex -> assertThat(ex.getTarget() == target)))); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + assertThat(target.current).hasValue(2); + } + @Test void withPostProcessorForClass() { AnnotatedClassBean proxy = createProxy(AnnotatedClassBean.class); @@ -108,6 +129,30 @@ class ConcurrencyLimitTests { assertThat(target.current).hasValue(0); } + @Test + void withPostProcessorForClassWithRejection() throws Exception { + AnnotatedClassBeanWithRejection proxy = createProxy(AnnotatedClassBeanWithRejection.class); + AnnotatedClassBeanWithRejection target = (AnnotatedClassBeanWithRejection) AopProxyUtils.getSingletonTarget(proxy); + + List> futures = new ArrayList<>(30); + futures.add(CompletableFuture.runAsync(proxy::concurrentOperation)); + futures.add(CompletableFuture.runAsync(proxy::otherOperation)); + Thread.sleep(10); + futures.add(CompletableFuture.runAsync(() -> + assertThatExceptionOfType(InvocationRejectedException.class).isThrownBy(proxy::concurrentOperation) + .withMessageContaining(AnnotatedClassBeanWithRejection.class.getName()) + .satisfies(ex -> assertThat(ex.getTarget() == target)))); + futures.add(CompletableFuture.runAsync(() -> + assertThatExceptionOfType(InvocationRejectedException.class).isThrownBy(proxy::otherOperation) + .withMessageContaining(AnnotatedClassBeanWithRejection.class.getName()) + .satisfies(ex -> assertThat(ex.getTarget() == target)))); + for (int i = 0; i < 10; i++) { + futures.add(CompletableFuture.runAsync(proxy::overrideOperation)); + } + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + assertThat(target.current).hasValue(0); + } + @Test void withPlaceholderResolution() { MockPropertySource mockPropertySource = new MockPropertySource("test").withProperty("test.concurrency.limit", "3"); @@ -197,7 +242,7 @@ class ConcurrencyLimitTests { throw new IllegalStateException(); } try { - Thread.sleep(10); + Thread.sleep(100); } catch (InterruptedException ex) { throw new IllegalStateException(ex); @@ -209,7 +254,18 @@ class ConcurrencyLimitTests { public void unboundedConcurrency() { current.incrementAndGet(); try { - Thread.sleep(10); + Thread.sleep(100); + } + catch (InterruptedException ex) { + throw new IllegalStateException(ex); + } + } + + @ConcurrencyLimit(limit = 2, policy = REJECT) + public void rejectingOperation() { + current.incrementAndGet(); + try { + Thread.sleep(100); } catch (InterruptedException ex) { throw new IllegalStateException(ex); @@ -230,7 +286,7 @@ class ConcurrencyLimitTests { throw new IllegalStateException(); } try { - Thread.sleep(10); + Thread.sleep(100); } catch (InterruptedException ex) { throw new IllegalStateException(ex); @@ -243,7 +299,7 @@ class ConcurrencyLimitTests { throw new IllegalStateException(); } try { - Thread.sleep(10); + Thread.sleep(100); } catch (InterruptedException ex) { throw new IllegalStateException(ex); @@ -257,7 +313,56 @@ class ConcurrencyLimitTests { throw new IllegalStateException(); } try { - Thread.sleep(10); + Thread.sleep(100); + } + catch (InterruptedException ex) { + throw new IllegalStateException(ex); + } + currentOverride.decrementAndGet(); + } + } + + + @ConcurrencyLimit(limit = 2, policy = REJECT) + static class AnnotatedClassBeanWithRejection { + + final AtomicInteger current = new AtomicInteger(); + + final AtomicInteger currentOverride = new AtomicInteger(); + + public void concurrentOperation() { + if (current.incrementAndGet() > 2) { + throw new IllegalStateException(); + } + try { + Thread.sleep(100); + } + catch (InterruptedException ex) { + throw new IllegalStateException(ex); + } + current.decrementAndGet(); + } + + public void otherOperation() { + if (current.incrementAndGet() > 2) { + throw new IllegalStateException(); + } + try { + Thread.sleep(100); + } + catch (InterruptedException ex) { + throw new IllegalStateException(ex); + } + current.decrementAndGet(); + } + + @ConcurrencyLimit(limit = 1) + public void overrideOperation() { + if (currentOverride.incrementAndGet() > 1) { + throw new IllegalStateException(); + } + try { + Thread.sleep(100); } catch (InterruptedException ex) { throw new IllegalStateException(ex); @@ -282,7 +387,7 @@ class ConcurrencyLimitTests { throw new IllegalStateException(); } try { - Thread.sleep(10); + Thread.sleep(100); } catch (InterruptedException ex) { throw new IllegalStateException(ex);