@ -23,9 +23,11 @@ import java.util.concurrent.atomic.AtomicInteger;
@@ -23,9 +23,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test ;
import org.springframework.aop.config.AopConfigUtils ;
import org.springframework.aop.framework.AopProxyUtils ;
import org.springframework.aop.framework.ProxyFactory ;
import org.springframework.aop.interceptor.ConcurrencyThrottleInterceptor ;
import org.springframework.aop.support.AopUtils ;
import org.springframework.beans.factory.support.DefaultListableBeanFactory ;
import org.springframework.beans.factory.support.RootBeanDefinition ;
import org.springframework.context.annotation.AnnotationConfigApplicationContext ;
@ -77,6 +79,32 @@ class ConcurrencyLimitTests {
@@ -77,6 +79,32 @@ class ConcurrencyLimitTests {
assertThat ( target . current ) . hasValue ( 0 ) ;
}
@Test
void withPostProcessorForMethodWithInterface ( ) {
AnnotatedInterface proxy = createProxy ( AnnotatedMethodBeanWithInterface . class , AnnotatedInterface . class , false ) ;
AnnotatedMethodBeanWithInterface target = ( AnnotatedMethodBeanWithInterface ) AopProxyUtils . getSingletonTarget ( proxy ) ;
List < CompletableFuture < ? > > futures = new ArrayList < > ( 10 ) ;
for ( int i = 0 ; i < 10 ; i + + ) {
futures . add ( CompletableFuture . runAsync ( proxy : : concurrentOperation ) ) ;
}
CompletableFuture . allOf ( futures . toArray ( new CompletableFuture [ 0 ] ) ) . join ( ) ;
assertThat ( target . current ) . hasValue ( 0 ) ;
}
@Test
void withPostProcessorForMethodWithInterfaceAndDefaultTargetClass ( ) {
AnnotatedInterface proxy = createProxy ( AnnotatedMethodBeanWithInterface . class , AnnotatedInterface . class , true ) ;
AnnotatedMethodBeanWithInterface target = ( AnnotatedMethodBeanWithInterface ) AopProxyUtils . getSingletonTarget ( proxy ) ;
List < CompletableFuture < ? > > futures = new ArrayList < > ( 10 ) ;
for ( int i = 0 ; i < 10 ; i + + ) {
futures . add ( CompletableFuture . runAsync ( proxy : : concurrentOperation ) ) ;
}
CompletableFuture . allOf ( futures . toArray ( new CompletableFuture [ 0 ] ) ) . join ( ) ;
assertThat ( target . current ) . hasValue ( 0 ) ;
}
@Test
void withPostProcessorForMethodWithUnboundedConcurrency ( ) {
AnnotatedMethodBean proxy = createProxy ( AnnotatedMethodBean . class ) ;
@ -201,12 +229,21 @@ class ConcurrencyLimitTests {
@@ -201,12 +229,21 @@ class ConcurrencyLimitTests {
private static < T > T createProxy ( Class < T > beanClass ) {
return createProxy ( beanClass , beanClass , true ) ;
}
private static < T > T createProxy ( Class < ? > beanClass , Class < T > exposedClass , boolean proxyTargetClass ) {
DefaultListableBeanFactory bf = new DefaultListableBeanFactory ( ) ;
if ( exposedClass . isInterface ( ) & & proxyTargetClass ) {
AopConfigUtils . forceAutoProxyCreatorToUseClassProxying ( bf ) ;
}
bf . registerBeanDefinition ( "bean" , new RootBeanDefinition ( beanClass ) ) ;
ConcurrencyLimitBeanPostProcessor bpp = new ConcurrencyLimitBeanPostProcessor ( ) ;
bpp . setBeanFactory ( bf ) ;
bf . addBeanPostProcessor ( bpp ) ;
return bf . getBean ( beanClass ) ;
T proxy = bf . getBean ( exposedClass ) ;
assertThat ( proxyTargetClass ? AopUtils . isCglibProxy ( proxy ) : AopUtils . isJdkDynamicProxy ( proxy ) ) . isTrue ( ) ;
return proxy ;
}
@ -274,6 +311,33 @@ class ConcurrencyLimitTests {
@@ -274,6 +311,33 @@ class ConcurrencyLimitTests {
}
static class AnnotatedMethodBeanWithInterface implements AnnotatedInterface {
final AtomicInteger current = new AtomicInteger ( ) ;
@Override
public void concurrentOperation ( ) {
if ( current . incrementAndGet ( ) > 2 ) {
throw new IllegalStateException ( ) ;
}
try {
Thread . sleep ( 100 ) ;
}
catch ( InterruptedException ex ) {
throw new IllegalStateException ( ex ) ;
}
current . decrementAndGet ( ) ;
}
}
interface AnnotatedInterface {
@ConcurrencyLimit ( 2 )
void concurrentOperation ( ) ;
}
@ConcurrencyLimit ( 2 )
static class AnnotatedClassBean {