Browse Source

Consistently throw Task/InvocationRejectedException on access rejected

Closes gh-36167
pull/36174/head
Juergen Hoeller 2 weeks ago
parent
commit
2d2809f45c
  1. 27
      spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimitBeanPostProcessor.java
  2. 7
      spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java
  3. 7
      spring-core/src/main/java/org/springframework/core/task/SyncTaskExecutor.java
  4. 26
      spring-core/src/main/java/org/springframework/util/ConcurrencyThrottleSupport.java
  5. 7
      spring-core/src/test/java/org/springframework/core/task/SimpleAsyncTaskExecutorTests.java

27
spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimitBeanPostProcessor.java

@ -117,11 +117,10 @@ public class ConcurrencyLimitBeanPostProcessor extends AbstractBeanFactoryAwareA @@ -117,11 +117,10 @@ public class ConcurrencyLimitBeanPostProcessor extends AbstractBeanFactoryAwareA
if (concurrencyLimit < -1) {
throw new IllegalStateException(annotation + " must be configured with a valid limit");
}
String name = (perMethod ? ClassUtils.getQualifiedMethodName(method) : targetClass.getName());
interceptor = (annotation.policy() == ConcurrencyLimit.ThrottlePolicy.REJECT ?
new RejectingConcurrencyThrottleInterceptor(concurrencyLimit,
(perMethod ? ClassUtils.getQualifiedMethodName(method) : targetClass.getName()),
instance) :
new ConcurrencyThrottleInterceptor(concurrencyLimit));
new RejectingConcurrencyThrottleInterceptor(concurrencyLimit, name, instance) :
new ResilienceConcurrencyThrottleInterceptor(concurrencyLimit, name, instance));
if (!perMethod) {
holder.classInterceptor = interceptor;
}
@ -155,22 +154,34 @@ public class ConcurrencyLimitBeanPostProcessor extends AbstractBeanFactoryAwareA @@ -155,22 +154,34 @@ public class ConcurrencyLimitBeanPostProcessor extends AbstractBeanFactoryAwareA
}
private static class RejectingConcurrencyThrottleInterceptor extends ConcurrencyThrottleInterceptor {
private static class ResilienceConcurrencyThrottleInterceptor extends ConcurrencyThrottleInterceptor {
private final String identifier;
private final Object target;
public RejectingConcurrencyThrottleInterceptor(int concurrencyLimit, String identifier, Object target) {
public ResilienceConcurrencyThrottleInterceptor(int concurrencyLimit, String identifier, Object target) {
super(concurrencyLimit);
this.identifier = identifier;
this.target = target;
}
@Override
protected void onAccessRejected(String msg) {
throw new InvocationRejectedException(msg + " " + this.identifier, this.target);
}
}
private static class RejectingConcurrencyThrottleInterceptor extends ResilienceConcurrencyThrottleInterceptor {
public RejectingConcurrencyThrottleInterceptor(int concurrencyLimit, String identifier, Object target) {
super(concurrencyLimit, identifier, target);
}
@Override
protected void onLimitReached() {
throw new InvocationRejectedException(
"Concurrency limit reached for " + this.identifier + ": " + getConcurrencyLimit(), this.target);
onAccessRejected("Concurrency limit reached: " + getConcurrencyLimit() + " - not allowed to enter");
}
}

7
spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java

@ -425,11 +425,16 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator @@ -425,11 +425,16 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
@Override
protected void onLimitReached() {
if (rejectTasksWhenLimitReached) {
throw new TaskRejectedException("Concurrency limit reached: " + getConcurrencyLimit());
onAccessRejected("Concurrency limit reached: " + getConcurrencyLimit());
}
super.onLimitReached();
}
@Override
protected void onAccessRejected(String msg) {
throw new TaskRejectedException(msg);
}
@Override
protected void afterAccess() {
super.afterAccess();

7
spring-core/src/main/java/org/springframework/core/task/SyncTaskExecutor.java

@ -109,9 +109,14 @@ public class SyncTaskExecutor extends ConcurrencyThrottleSupport implements Task @@ -109,9 +109,14 @@ public class SyncTaskExecutor extends ConcurrencyThrottleSupport implements Task
@Override
protected void onLimitReached() {
if (this.rejectTasksWhenLimitReached) {
throw new TaskRejectedException("Concurrency limit reached: " + getConcurrencyLimit());
onAccessRejected("Concurrency limit reached: " + getConcurrencyLimit());
}
super.onLimitReached();
}
@Override
protected void onAccessRejected(String msg) {
throw new TaskRejectedException(msg);
}
}

26
spring-core/src/main/java/org/springframework/util/ConcurrencyThrottleSupport.java

@ -51,8 +51,8 @@ public abstract class ConcurrencyThrottleSupport implements Serializable { @@ -51,8 +51,8 @@ public abstract class ConcurrencyThrottleSupport implements Serializable {
/**
* Concurrency limit which signals unbounded concurrency: {@value}.
* <p>Setting the limit to this value permits any number of concurrent
* invocations: that is, concurrency will not be throttled.
* <p>Setting the limit to this value permits any number of concurrent access:
* that is, concurrency will not be throttled.
* @see #NO_CONCURRENCY
*/
public static final int UNBOUNDED_CONCURRENCY = -1;
@ -60,7 +60,7 @@ public abstract class ConcurrencyThrottleSupport implements Serializable { @@ -60,7 +60,7 @@ public abstract class ConcurrencyThrottleSupport implements Serializable {
/**
* Concurrency limit which signals that concurrency throttling has been
* disabled: {@value}.
* <p>Setting the limit to this value prevents all invocations.
* <p>Setting the limit to this value prevents all access.
* @see #beforeAccess()
* @see #UNBOUNDED_CONCURRENCY
*/
@ -117,8 +117,7 @@ public abstract class ConcurrencyThrottleSupport implements Serializable { @@ -117,8 +117,7 @@ public abstract class ConcurrencyThrottleSupport implements Serializable {
*/
protected void beforeAccess() {
if (this.concurrencyLimit == NO_CONCURRENCY) {
throw new IllegalStateException(
"Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
onAccessRejected("Concurrency limit set to NO_CONCURRENCY - not allowed to enter");
}
if (this.concurrencyLimit > 0) {
this.concurrencyLock.lock();
@ -146,7 +145,7 @@ public abstract class ConcurrencyThrottleSupport implements Serializable { @@ -146,7 +145,7 @@ public abstract class ConcurrencyThrottleSupport implements Serializable {
boolean interrupted = false;
while (this.concurrencyCount >= this.concurrencyLimit) {
if (interrupted) {
throw new IllegalStateException("Thread was interrupted while waiting for invocation access, " +
onAccessRejected("Thread was interrupted while waiting for access " +
"but concurrency limit still does not allow for entering");
}
if (logger.isDebugEnabled()) {
@ -164,6 +163,21 @@ public abstract class ConcurrencyThrottleSupport implements Serializable { @@ -164,6 +163,21 @@ public abstract class ConcurrencyThrottleSupport implements Serializable {
}
}
/**
* Triggered when access has been rejected due to the concurrency policy
* or due to interruption.
* <p>Implementations will typically throw a corresponding exception.
* When returning normally, regular access will still be attempted.
* <p>The default implementation throws an {@link IllegalStateException}.
* @param msg the rejection message (common exception messages are designed
* so that they can be appended with an identifier separated by a space
* in custom subclasses)
* @since 7.0.4
*/
protected void onAccessRejected(String msg) {
throw new IllegalStateException(msg);
}
/**
* To be invoked after the main execution logic of concrete subclasses.
* @see #beforeAccess()

7
spring-core/src/test/java/org/springframework/core/task/SimpleAsyncTaskExecutorTests.java

@ -21,12 +21,9 @@ import java.util.concurrent.TimeUnit; @@ -21,12 +21,9 @@ import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.springframework.util.ConcurrencyThrottleSupport;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.willCallRealMethod;
@ -61,9 +58,9 @@ class SimpleAsyncTaskExecutorTests { @@ -61,9 +58,9 @@ class SimpleAsyncTaskExecutorTests {
@Test
void cannotExecuteWhenConcurrencyIsSwitchedOff() {
try (SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor()) {
executor.setConcurrencyLimit(ConcurrencyThrottleSupport.NO_CONCURRENCY);
executor.setConcurrencyLimit(SimpleAsyncTaskExecutor.NO_CONCURRENCY);
assertThat(executor.isThrottleActive()).isTrue();
assertThatIllegalStateException().isThrownBy(() -> executor.execute(new NoOpRunnable()));
assertThatExceptionOfType(TaskRejectedException.class).isThrownBy(() -> executor.execute(new NoOpRunnable()));
}
}

Loading…
Cancel
Save