Browse Source

Add configurable throttle policy to @ConcurrencyLimit annotation

Closes gh-36109
pull/36115/head
Juergen Hoeller 3 weeks ago
parent
commit
38f5f4de8e
  1. 55
      spring-context/src/main/java/org/springframework/resilience/InvocationRejectedException.java
  2. 35
      spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimit.java
  3. 28
      spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimitBeanPostProcessor.java
  4. 7
      spring-context/src/main/java/org/springframework/resilience/package-info.java
  5. 117
      spring-context/src/test/java/org/springframework/resilience/ConcurrencyLimitTests.java

55
spring-context/src/main/java/org/springframework/resilience/InvocationRejectedException.java

@ -0,0 +1,55 @@ @@ -0,0 +1,55 @@
/*
* Copyright 2002-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.resilience;
import java.util.concurrent.RejectedExecutionException;
/**
* Exception thrown when a target will not get invoked due to a resilience policy,
* such as the concurrency limit having been reached for a class/method annotated with
* {@link org.springframework.resilience.annotation.ConcurrencyLimit @ConcurrencyLimit}.
*
* @author Juergen Hoeller
* @since 7.0.3
* @see org.springframework.resilience.annotation.ConcurrencyLimit#rejectOnExcess()
*/
@SuppressWarnings("serial")
public class InvocationRejectedException extends RejectedExecutionException {
private final Object target;
/**
* Create a new {@code InvocationRejectedException}
* with the specified detail message and target instance.
* @param msg the detail message
* @param target the target instance that was about to be invoked
*/
public InvocationRejectedException(String msg, Object target) {
super(msg);
this.target = target;
}
/**
* Return the target instance that was about to be invoked.
*/
public Object getTarget() {
return this.target;
}
}

35
spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimit.java

@ -28,7 +28,9 @@ import org.springframework.core.annotation.AliasFor; @@ -28,7 +28,9 @@ import org.springframework.core.annotation.AliasFor;
/**
* A common annotation specifying a concurrency limit for an individual method,
* or for all proxy-invoked methods in a given class hierarchy if annotated at
* the type level.
* the type level. The default behavior is to block further method invocations
* when the limit has been reached. Alternatively, further invocations can be
* rejected through configuring {@link #policy()} as {@code policy = REJECT}.
*
* <p>In the type-level case, all methods inheriting the concurrency limit
* from the type level share a common concurrency throttle, with any mix
@ -95,4 +97,35 @@ public @interface ConcurrencyLimit { @@ -95,4 +97,35 @@ public @interface ConcurrencyLimit {
*/
String limitString() default "";
/**
* The policy for throttling method invocations when the limit has been reached.
* <p>The default behavior is to block further concurrent invocations once the
* specified limit has been reached: {@link ThrottlePolicy#BLOCK}.
* <p>Switch this policy to {@code REJECT} for rejecting further invocations instead,
* throwing {@link org.springframework.resilience.InvocationRejectedException}
* (which extends the common {@link java.util.concurrent.RejectedExecutionException})
* on any further concurrent invocation attempts: {@link ThrottlePolicy#REJECT}.
* @since 7.0.3
*/
ThrottlePolicy policy() default ThrottlePolicy.BLOCK;
/**
* Policy to apply for throttling method invocations when the limit has been reached.
* @since 7.0.3
*/
enum ThrottlePolicy {
/**
* The default: block until we can invoke the method within the configured limit.
*/
BLOCK,
/**
* Alternative: reject further method invocations once the limit has been reached.
* @see org.springframework.resilience.InvocationRejectedException
*/
REJECT
}
}

28
spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimitBeanPostProcessor.java

@ -35,7 +35,9 @@ import org.springframework.aop.support.DefaultPointcutAdvisor; @@ -35,7 +35,9 @@ import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.aop.support.annotation.AnnotationMatchingPointcut;
import org.springframework.context.EmbeddedValueResolverAware;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.resilience.InvocationRejectedException;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.StringValueResolver;
@ -115,7 +117,11 @@ public class ConcurrencyLimitBeanPostProcessor extends AbstractBeanFactoryAwareA @@ -115,7 +117,11 @@ public class ConcurrencyLimitBeanPostProcessor extends AbstractBeanFactoryAwareA
if (concurrencyLimit < -1) {
throw new IllegalStateException(annotation + " must be configured with a valid limit");
}
interceptor = new ConcurrencyThrottleInterceptor(concurrencyLimit);
interceptor = (annotation.policy() == ConcurrencyLimit.ThrottlePolicy.REJECT ?
new RejectingConcurrencyThrottleInterceptor(concurrencyLimit,
(perMethod ? ClassUtils.getQualifiedMethodName(method) : targetClass.getName()),
instance) :
new ConcurrencyThrottleInterceptor(concurrencyLimit));
if (!perMethod) {
holder.classInterceptor = interceptor;
}
@ -148,4 +154,24 @@ public class ConcurrencyLimitBeanPostProcessor extends AbstractBeanFactoryAwareA @@ -148,4 +154,24 @@ public class ConcurrencyLimitBeanPostProcessor extends AbstractBeanFactoryAwareA
@Nullable MethodInterceptor classInterceptor;
}
private static class RejectingConcurrencyThrottleInterceptor extends ConcurrencyThrottleInterceptor {
private final String identifier;
private final Object target;
public RejectingConcurrencyThrottleInterceptor(int concurrencyLimit, String identifier, Object target) {
super(concurrencyLimit);
this.identifier = identifier;
this.target = target;
}
@Override
protected void onLimitReached() {
throw new InvocationRejectedException(
"Concurrency limit reached for " + this.identifier + ": " + getConcurrencyLimit(), this.target);
}
}
}

7
spring-context/src/main/java/org/springframework/resilience/package-info.java

@ -0,0 +1,7 @@ @@ -0,0 +1,7 @@
/**
* Common exceptions thrown by Spring's resilience facilities.
*/
@NullMarked
package org.springframework.resilience;
import org.jspecify.annotations.NullMarked;

117
spring-context/src/test/java/org/springframework/resilience/ConcurrencyLimitTests.java

@ -37,6 +37,7 @@ import org.springframework.resilience.annotation.EnableResilientMethods; @@ -37,6 +37,7 @@ import org.springframework.resilience.annotation.EnableResilientMethods;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.springframework.resilience.annotation.ConcurrencyLimit.ThrottlePolicy.REJECT;
/**
* @author Juergen Hoeller
@ -89,6 +90,26 @@ class ConcurrencyLimitTests { @@ -89,6 +90,26 @@ class ConcurrencyLimitTests {
assertThat(target.current).hasValue(10);
}
@Test
void withPostProcessorForMethodWithRejection() throws Exception{
AnnotatedMethodBean proxy = createProxy(AnnotatedMethodBean.class);
AnnotatedMethodBean target = (AnnotatedMethodBean) AopProxyUtils.getSingletonTarget(proxy);
List<CompletableFuture<?>> futures = new ArrayList<>(10);
for (int i = 0; i < 2; i++) {
futures.add(CompletableFuture.runAsync(proxy::rejectingOperation));
}
Thread.sleep(10);
for (int i = 2; i < 10; i++) {
futures.add(CompletableFuture.runAsync(() ->
assertThatExceptionOfType(InvocationRejectedException.class).isThrownBy(proxy::rejectingOperation)
.withMessageContaining(AnnotatedMethodBean.class.getName() + ".rejectingOperation")
.satisfies(ex -> assertThat(ex.getTarget() == target))));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
assertThat(target.current).hasValue(2);
}
@Test
void withPostProcessorForClass() {
AnnotatedClassBean proxy = createProxy(AnnotatedClassBean.class);
@ -108,6 +129,30 @@ class ConcurrencyLimitTests { @@ -108,6 +129,30 @@ class ConcurrencyLimitTests {
assertThat(target.current).hasValue(0);
}
@Test
void withPostProcessorForClassWithRejection() throws Exception {
AnnotatedClassBeanWithRejection proxy = createProxy(AnnotatedClassBeanWithRejection.class);
AnnotatedClassBeanWithRejection target = (AnnotatedClassBeanWithRejection) AopProxyUtils.getSingletonTarget(proxy);
List<CompletableFuture<?>> futures = new ArrayList<>(30);
futures.add(CompletableFuture.runAsync(proxy::concurrentOperation));
futures.add(CompletableFuture.runAsync(proxy::otherOperation));
Thread.sleep(10);
futures.add(CompletableFuture.runAsync(() ->
assertThatExceptionOfType(InvocationRejectedException.class).isThrownBy(proxy::concurrentOperation)
.withMessageContaining(AnnotatedClassBeanWithRejection.class.getName())
.satisfies(ex -> assertThat(ex.getTarget() == target))));
futures.add(CompletableFuture.runAsync(() ->
assertThatExceptionOfType(InvocationRejectedException.class).isThrownBy(proxy::otherOperation)
.withMessageContaining(AnnotatedClassBeanWithRejection.class.getName())
.satisfies(ex -> assertThat(ex.getTarget() == target))));
for (int i = 0; i < 10; i++) {
futures.add(CompletableFuture.runAsync(proxy::overrideOperation));
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
assertThat(target.current).hasValue(0);
}
@Test
void withPlaceholderResolution() {
MockPropertySource mockPropertySource = new MockPropertySource("test").withProperty("test.concurrency.limit", "3");
@ -197,7 +242,7 @@ class ConcurrencyLimitTests { @@ -197,7 +242,7 @@ class ConcurrencyLimitTests {
throw new IllegalStateException();
}
try {
Thread.sleep(10);
Thread.sleep(100);
}
catch (InterruptedException ex) {
throw new IllegalStateException(ex);
@ -209,7 +254,18 @@ class ConcurrencyLimitTests { @@ -209,7 +254,18 @@ class ConcurrencyLimitTests {
public void unboundedConcurrency() {
current.incrementAndGet();
try {
Thread.sleep(10);
Thread.sleep(100);
}
catch (InterruptedException ex) {
throw new IllegalStateException(ex);
}
}
@ConcurrencyLimit(limit = 2, policy = REJECT)
public void rejectingOperation() {
current.incrementAndGet();
try {
Thread.sleep(100);
}
catch (InterruptedException ex) {
throw new IllegalStateException(ex);
@ -230,7 +286,7 @@ class ConcurrencyLimitTests { @@ -230,7 +286,7 @@ class ConcurrencyLimitTests {
throw new IllegalStateException();
}
try {
Thread.sleep(10);
Thread.sleep(100);
}
catch (InterruptedException ex) {
throw new IllegalStateException(ex);
@ -243,7 +299,7 @@ class ConcurrencyLimitTests { @@ -243,7 +299,7 @@ class ConcurrencyLimitTests {
throw new IllegalStateException();
}
try {
Thread.sleep(10);
Thread.sleep(100);
}
catch (InterruptedException ex) {
throw new IllegalStateException(ex);
@ -257,7 +313,56 @@ class ConcurrencyLimitTests { @@ -257,7 +313,56 @@ class ConcurrencyLimitTests {
throw new IllegalStateException();
}
try {
Thread.sleep(10);
Thread.sleep(100);
}
catch (InterruptedException ex) {
throw new IllegalStateException(ex);
}
currentOverride.decrementAndGet();
}
}
@ConcurrencyLimit(limit = 2, policy = REJECT)
static class AnnotatedClassBeanWithRejection {
final AtomicInteger current = new AtomicInteger();
final AtomicInteger currentOverride = new AtomicInteger();
public void concurrentOperation() {
if (current.incrementAndGet() > 2) {
throw new IllegalStateException();
}
try {
Thread.sleep(100);
}
catch (InterruptedException ex) {
throw new IllegalStateException(ex);
}
current.decrementAndGet();
}
public void otherOperation() {
if (current.incrementAndGet() > 2) {
throw new IllegalStateException();
}
try {
Thread.sleep(100);
}
catch (InterruptedException ex) {
throw new IllegalStateException(ex);
}
current.decrementAndGet();
}
@ConcurrencyLimit(limit = 1)
public void overrideOperation() {
if (currentOverride.incrementAndGet() > 1) {
throw new IllegalStateException();
}
try {
Thread.sleep(100);
}
catch (InterruptedException ex) {
throw new IllegalStateException(ex);
@ -282,7 +387,7 @@ class ConcurrencyLimitTests { @@ -282,7 +387,7 @@ class ConcurrencyLimitTests {
throw new IllegalStateException();
}
try {
Thread.sleep(10);
Thread.sleep(100);
}
catch (InterruptedException ex) {
throw new IllegalStateException(ex);

Loading…
Cancel
Save