diff --git a/spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimit.java b/spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimit.java
index bacc60a5756..3e6ec382bb0 100644
--- a/spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimit.java
+++ b/spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimit.java
@@ -64,25 +64,30 @@ public @interface ConcurrencyLimit {
* @see #limitString()
*/
@AliasFor("limit")
- int value() default 1;
+ int value() default Integer.MIN_VALUE;
/**
- * The applicable concurrency limit: 1 by default,
- * effectively locking the target instance for each method invocation.
- *
Specify a limit higher than 1 for pool-like throttling, constraining
+ * The concurrency limit.
+ *
Specify {@code 1} to effectively lock the target instance for each method
+ * invocation.
+ *
Specify a limit greater than {@code 1} for pool-like throttling, constraining
* the number of concurrent invocations similar to the upper bound of a pool.
+ *
Specify {@code -1} for unbounded concurrency.
* @see #value()
* @see #limitString()
+ * @see org.springframework.util.ConcurrencyThrottleSupport#UNBOUNDED_CONCURRENCY
*/
@AliasFor("value")
- int limit() default 1;
+ int limit() default Integer.MIN_VALUE;
/**
* The concurrency limit, as a configurable String.
- *
A non-empty value specified here overrides the {@link #limit()} (or
- * {@link #value()}) attribute.
+ *
A non-empty value specified here overrides the {@link #limit()} and
+ * {@link #value()} attributes.
*
This supports Spring-style "${...}" placeholders as well as SpEL expressions.
+ *
See the Javadoc for {@link #limit()} for details on supported values.
* @see #limit()
+ * @see org.springframework.util.ConcurrencyThrottleSupport#UNBOUNDED_CONCURRENCY
*/
String limitString() default "";
diff --git a/spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimitBeanPostProcessor.java b/spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimitBeanPostProcessor.java
index e195bd8d6f3..f43e0a55226 100644
--- a/spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimitBeanPostProcessor.java
+++ b/spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimitBeanPostProcessor.java
@@ -108,6 +108,9 @@ public class ConcurrencyLimitBeanPostProcessor extends AbstractBeanFactoryAwareA
if (interceptor == null) {
Assert.state(annotation != null, "No @ConcurrencyLimit annotation found");
int concurrencyLimit = parseInt(annotation.limit(), annotation.limitString());
+ if (concurrencyLimit < -1) {
+ throw new IllegalStateException(annotation + " must be configured with a valid limit");
+ }
interceptor = new ConcurrencyThrottleInterceptor(concurrencyLimit);
if (!perMethod) {
cache.classInterceptor = interceptor;
diff --git a/spring-context/src/test/java/org/springframework/resilience/ConcurrencyLimitTests.java b/spring-context/src/test/java/org/springframework/resilience/ConcurrencyLimitTests.java
index d8c4c7c3488..a2fbf769574 100644
--- a/spring-context/src/test/java/org/springframework/resilience/ConcurrencyLimitTests.java
+++ b/spring-context/src/test/java/org/springframework/resilience/ConcurrencyLimitTests.java
@@ -35,10 +35,13 @@ import org.springframework.resilience.annotation.ConcurrencyLimitBeanPostProcess
import org.springframework.resilience.annotation.EnableResilientMethods;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
/**
* @author Juergen Hoeller
* @author Hyunsang Han
+ * @author Sam Brannen
* @since 7.0
*/
class ConcurrencyLimitTests {
@@ -61,12 +64,7 @@ class ConcurrencyLimitTests {
@Test
void withPostProcessorForMethod() {
- DefaultListableBeanFactory bf = new DefaultListableBeanFactory();
- bf.registerBeanDefinition("bean", new RootBeanDefinition(AnnotatedMethodBean.class));
- ConcurrencyLimitBeanPostProcessor bpp = new ConcurrencyLimitBeanPostProcessor();
- bpp.setBeanFactory(bf);
- bf.addBeanPostProcessor(bpp);
- AnnotatedMethodBean proxy = bf.getBean(AnnotatedMethodBean.class);
+ AnnotatedMethodBean proxy = createProxy(AnnotatedMethodBean.class);
AnnotatedMethodBean target = (AnnotatedMethodBean) AopProxyUtils.getSingletonTarget(proxy);
List> futures = new ArrayList<>(10);
@@ -77,14 +75,22 @@ class ConcurrencyLimitTests {
assertThat(target.current).hasValue(0);
}
+ @Test
+ void withPostProcessorForMethodWithUnboundedConcurrency() {
+ AnnotatedMethodBean proxy = createProxy(AnnotatedMethodBean.class);
+ AnnotatedMethodBean target = (AnnotatedMethodBean) AopProxyUtils.getSingletonTarget(proxy);
+
+ List> futures = new ArrayList<>(10);
+ for (int i = 0; i < 10; i++) {
+ futures.add(CompletableFuture.runAsync(proxy::unboundedConcurrency));
+ }
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+ assertThat(target.current).hasValue(10);
+ }
+
@Test
void withPostProcessorForClass() {
- DefaultListableBeanFactory bf = new DefaultListableBeanFactory();
- bf.registerBeanDefinition("bean", new RootBeanDefinition(AnnotatedClassBean.class));
- ConcurrencyLimitBeanPostProcessor bpp = new ConcurrencyLimitBeanPostProcessor();
- bpp.setBeanFactory(bf);
- bf.addBeanPostProcessor(bpp);
- AnnotatedClassBean proxy = bf.getBean(AnnotatedClassBean.class);
+ AnnotatedClassBean proxy = createProxy(AnnotatedClassBean.class);
AnnotatedClassBean target = (AnnotatedClassBean) AopProxyUtils.getSingletonTarget(proxy);
List> futures = new ArrayList<>(30);
@@ -122,17 +128,52 @@ class ConcurrencyLimitTests {
ctx.close();
}
+ @Test
+ void configurationErrors() {
+ ConfigurationErrorsBean proxy = createProxy(ConfigurationErrorsBean.class);
+
+ assertThatIllegalStateException()
+ .isThrownBy(proxy::emptyDeclaration)
+ .withMessageMatching("@.+?ConcurrencyLimit(.+?) must be configured with a valid limit")
+ .withMessageContaining("\"\"")
+ .withMessageContaining(String.valueOf(Integer.MIN_VALUE));
+
+ assertThatIllegalStateException()
+ .isThrownBy(proxy::negative42Int)
+ .withMessageMatching("@.+?ConcurrencyLimit(.+?) must be configured with a valid limit")
+ .withMessageContaining("-42");
+
+ assertThatIllegalStateException()
+ .isThrownBy(proxy::negative42String)
+ .withMessageMatching("@.+?ConcurrencyLimit(.+?) must be configured with a valid limit")
+ .withMessageContaining("-42");
+
+ assertThatExceptionOfType(NumberFormatException.class)
+ .isThrownBy(proxy::alphanumericString)
+ .withMessageContaining("B2");
+ }
+
+
+ private static T createProxy(Class beanClass) {
+ DefaultListableBeanFactory bf = new DefaultListableBeanFactory();
+ bf.registerBeanDefinition("bean", new RootBeanDefinition(beanClass));
+ ConcurrencyLimitBeanPostProcessor bpp = new ConcurrencyLimitBeanPostProcessor();
+ bpp.setBeanFactory(bf);
+ bf.addBeanPostProcessor(bpp);
+ return bf.getBean(beanClass);
+ }
+
static class NonAnnotatedBean {
- AtomicInteger counter = new AtomicInteger();
+ final AtomicInteger counter = new AtomicInteger();
public void concurrentOperation() {
if (counter.incrementAndGet() > 2) {
throw new IllegalStateException();
}
try {
- Thread.sleep(100);
+ Thread.sleep(10);
}
catch (InterruptedException ex) {
throw new IllegalStateException(ex);
@@ -144,7 +185,7 @@ class ConcurrencyLimitTests {
static class AnnotatedMethodBean {
- AtomicInteger current = new AtomicInteger();
+ final AtomicInteger current = new AtomicInteger();
@ConcurrencyLimit(2)
public void concurrentOperation() {
@@ -152,29 +193,40 @@ class ConcurrencyLimitTests {
throw new IllegalStateException();
}
try {
- Thread.sleep(100);
+ Thread.sleep(10);
}
catch (InterruptedException ex) {
throw new IllegalStateException(ex);
}
current.decrementAndGet();
}
+
+ @ConcurrencyLimit(limit = -1)
+ public void unboundedConcurrency() {
+ current.incrementAndGet();
+ try {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException ex) {
+ throw new IllegalStateException(ex);
+ }
+ }
}
@ConcurrencyLimit(2)
static class AnnotatedClassBean {
- AtomicInteger current = new AtomicInteger();
+ final AtomicInteger current = new AtomicInteger();
- AtomicInteger currentOverride = new AtomicInteger();
+ final AtomicInteger currentOverride = new AtomicInteger();
public void concurrentOperation() {
if (current.incrementAndGet() > 2) {
throw new IllegalStateException();
}
try {
- Thread.sleep(100);
+ Thread.sleep(10);
}
catch (InterruptedException ex) {
throw new IllegalStateException(ex);
@@ -187,7 +239,7 @@ class ConcurrencyLimitTests {
throw new IllegalStateException();
}
try {
- Thread.sleep(100);
+ Thread.sleep(10);
}
catch (InterruptedException ex) {
throw new IllegalStateException(ex);
@@ -201,7 +253,7 @@ class ConcurrencyLimitTests {
throw new IllegalStateException();
}
try {
- Thread.sleep(100);
+ Thread.sleep(10);
}
catch (InterruptedException ex) {
throw new IllegalStateException(ex);
@@ -218,7 +270,7 @@ class ConcurrencyLimitTests {
static class PlaceholderBean {
- AtomicInteger current = new AtomicInteger();
+ final AtomicInteger current = new AtomicInteger();
@ConcurrencyLimit(limitString = "${test.concurrency.limit}")
public void concurrentOperation() {
@@ -226,7 +278,7 @@ class ConcurrencyLimitTests {
throw new IllegalStateException();
}
try {
- Thread.sleep(100);
+ Thread.sleep(10);
}
catch (InterruptedException ex) {
throw new IllegalStateException(ex);
@@ -235,4 +287,24 @@ class ConcurrencyLimitTests {
}
}
+
+ static class ConfigurationErrorsBean {
+
+ @ConcurrencyLimit
+ public void emptyDeclaration() {
+ }
+
+ @ConcurrencyLimit(-42)
+ public void negative42Int() {
+ }
+
+ @ConcurrencyLimit(limitString = "-42")
+ public void negative42String() {
+ }
+
+ @ConcurrencyLimit(limitString = "B2")
+ public void alphanumericString() {
+ }
+ }
+
}