@ -44,7 +44,7 @@ import org.springframework.util.StringUtils;
@@ -44,7 +44,7 @@ import org.springframework.util.StringUtils;
* cases without a dependency on optional classes .
*
* @author Simon Baslé
* @since 6 . 1 . 0
* @since 6 . 1
* /
abstract class ScheduledAnnotationReactiveSupport {
@ -54,7 +54,8 @@ abstract class ScheduledAnnotationReactiveSupport {
@@ -54,7 +54,8 @@ abstract class ScheduledAnnotationReactiveSupport {
static final boolean coroutinesReactorPresent = ClassUtils . isPresent (
"kotlinx.coroutines.reactor.MonoKt" , ScheduledAnnotationReactiveSupport . class . getClassLoader ( ) ) ;
private static final Log LOGGER = LogFactory . getLog ( ScheduledAnnotationReactiveSupport . class ) ;
private static final Log logger = LogFactory . getLog ( ScheduledAnnotationReactiveSupport . class ) ;
/ * *
* Checks that if the method is reactive , it can be scheduled . Methods are considered
@ -72,10 +73,10 @@ abstract class ScheduledAnnotationReactiveSupport {
@@ -72,10 +73,10 @@ abstract class ScheduledAnnotationReactiveSupport {
if ( KotlinDetector . isKotlinPresent ( ) & & KotlinDetector . isSuspendingFunction ( method ) ) {
// Note that suspending functions declared without args have a single Continuation
// parameter in reflective inspection
Assert . isTrue ( method . getParameterCount ( ) = = 1 , "Kotlin suspending functions may only be "
+ " annotated with @Scheduled if declared without arguments") ;
Assert . isTrue ( coroutinesReactorPresent , "Kotlin suspending functions may only be annotated with "
+ "@Scheduled if the Coroutine-Reactor bridge (kotlinx.coroutines.reactor) is present at runtime" ) ;
Assert . isTrue ( method . getParameterCount ( ) = = 1 ,
"Kotlin suspending functions may only be annotated with @Scheduled if declared without arguments") ;
Assert . isTrue ( coroutinesReactorPresent , "Kotlin suspending functions may only be annotated with " +
"@Scheduled if the Coroutine-Reactor bridge (kotlinx.coroutines.reactor) is present at runtime" ) ;
return true ;
}
ReactiveAdapterRegistry registry = ReactiveAdapterRegistry . getSharedInstance ( ) ;
@ -87,10 +88,10 @@ abstract class ScheduledAnnotationReactiveSupport {
@@ -87,10 +88,10 @@ abstract class ScheduledAnnotationReactiveSupport {
if ( candidateAdapter = = null ) {
return false ;
}
Assert . isTrue ( method . getParameterCount ( ) = = 0 , "Reactive methods may only be annotated with "
+ " @Scheduled if declared without arguments") ;
Assert . isTrue ( candidateAdapter . getDescriptor ( ) . isDeferred ( ) , "Reactive methods may only be annotated with "
+ " @Scheduled if the return type supports deferred execution") ;
Assert . isTrue ( method . getParameterCount ( ) = = 0 ,
"Reactive methods may only be annotated with @Scheduled if declared without arguments") ;
Assert . isTrue ( candidateAdapter . getDescriptor ( ) . isDeferred ( ) ,
"Reactive methods may only be annotated with @Scheduled if the return type supports deferred execution") ;
return true ;
}
@ -112,23 +113,23 @@ abstract class ScheduledAnnotationReactiveSupport {
@@ -112,23 +113,23 @@ abstract class ScheduledAnnotationReactiveSupport {
Class < ? > returnType = method . getReturnType ( ) ;
ReactiveAdapter adapter = registry . getAdapter ( returnType ) ;
if ( adapter = = null ) {
throw new IllegalArgumentException ( "Cannot convert the @Scheduled reactive method return type to Publisher" ) ;
throw new IllegalArgumentException ( "Cannot convert @Scheduled reactive method return type to Publisher" ) ;
}
if ( ! adapter . getDescriptor ( ) . isDeferred ( ) ) {
throw new IllegalArgumentException ( "Cannot convert the @Scheduled reactive method return type to Publisher: "
+ returnType . getSimpleName ( ) + " is not a deferred reactive type" ) ;
throw new IllegalArgumentException ( "Cannot convert @Scheduled reactive method return type to Publisher: " +
returnType . getSimpleName ( ) + " is not a deferred reactive type" ) ;
}
Method invocableMethod = AopUtils . selectInvocableMethod ( method , bean . getClass ( ) ) ;
try {
ReflectionUtils . makeAccessible ( invocableMethod ) ;
Object r = invocableMethod . invoke ( bean ) ;
Object returnValue = invocableMethod . invoke ( bean ) ;
Publisher < ? > publisher = adapter . toPublisher ( r ) ;
Publisher < ? > publisher = adapter . toPublisher ( returnValue ) ;
// If Reactor is on the classpath, we could benefit from having a checkpoint for debuggability
if ( reactorPresent ) {
final String checkpoint = "@Scheduled '" + method . getName ( ) + "()' in bean '"
+ method . getDeclaringClass ( ) . getName ( ) + "'" ;
return Flux . from ( publisher ) . checkpoint ( checkpoint ) ;
return Flux . from ( publisher ) . checkpoint (
"@Scheduled '" + method . getName ( ) + "()' in '" + method . getDeclaringClass ( ) . getName ( ) + "'" ) ;
}
else {
return publisher ;
@ -136,12 +137,12 @@ abstract class ScheduledAnnotationReactiveSupport {
@@ -136,12 +137,12 @@ abstract class ScheduledAnnotationReactiveSupport {
}
catch ( InvocationTargetException ex ) {
throw new IllegalArgumentException (
"Cannot obtain a Publisher-convertible value from the @Scheduled reactive method" ,
ex . getTargetException ( ) ) ;
"Cannot obtain a Publisher-convertible value from the @Scheduled reactive method" ,
ex . getTargetException ( ) ) ;
}
catch ( IllegalAccessException ex ) {
throw new IllegalArgumentException (
"Cannot obtain a Publisher-convertible value from the @Scheduled reactive method" , ex ) ;
"Cannot obtain a Publisher-convertible value from the @Scheduled reactive method" , ex ) ;
}
}
@ -157,20 +158,24 @@ abstract class ScheduledAnnotationReactiveSupport {
@@ -157,20 +158,24 @@ abstract class ScheduledAnnotationReactiveSupport {
* /
static Runnable createSubscriptionRunnable ( Method method , Object targetBean , Scheduled scheduled ,
List < Runnable > subscriptionTrackerRegistry ) {
boolean shouldBlock = scheduled . fixedDelay ( ) > 0 | | StringUtils . hasText ( scheduled . fixedDelayString ( ) ) ;
final Publisher < ? > publisher = getPublisherFor ( method , targetBean ) ;
boolean shouldBlock = ( scheduled . fixedDelay ( ) > 0 | | StringUtils . hasText ( scheduled . fixedDelayString ( ) ) ) ;
Publisher < ? > publisher = getPublisherFor ( method , targetBean ) ;
return new SubscribingRunnable ( publisher , shouldBlock , subscriptionTrackerRegistry ) ;
}
/ * *
* Utility implementation of { @code Runnable } that subscribes to a { @code Publisher }
* or subscribes - then - blocks if { @code shouldBlock } is set to { @code true } .
* /
static final class SubscribingRunnable implements Runnable {
final Publisher < ? > publisher ;
private final Publisher < ? > publisher ;
final boolean shouldBlock ;
final List < Runnable > subscriptionTrackerRegistry ;
private final List < Runnable > subscriptionTrackerRegistry ;
SubscribingRunnable ( Publisher < ? > publisher , boolean shouldBlock , List < Runnable > subscriptionTrackerRegistry ) {
this . publisher = publisher ;
@ -181,7 +186,7 @@ abstract class ScheduledAnnotationReactiveSupport {
@@ -181,7 +186,7 @@ abstract class ScheduledAnnotationReactiveSupport {
@Override
public void run ( ) {
if ( this . shouldBlock ) {
final CountDownLatch latch = new CountDownLatch ( 1 ) ;
CountDownLatch latch = new CountDownLatch ( 1 ) ;
TrackingSubscriber subscriber = new TrackingSubscriber ( this . subscriptionTrackerRegistry , latch ) ;
this . subscriptionTrackerRegistry . add ( subscriber ) ;
this . publisher . subscribe ( subscriber ) ;
@ -193,13 +198,14 @@ abstract class ScheduledAnnotationReactiveSupport {
@@ -193,13 +198,14 @@ abstract class ScheduledAnnotationReactiveSupport {
}
}
else {
final TrackingSubscriber subscriber = new TrackingSubscriber ( this . subscriptionTrackerRegistry ) ;
TrackingSubscriber subscriber = new TrackingSubscriber ( this . subscriptionTrackerRegistry ) ;
this . subscriptionTrackerRegistry . add ( subscriber ) ;
this . publisher . subscribe ( subscriber ) ;
}
}
}
/ * *
* A { @code Subscriber } which keeps track of its { @code Subscription } and exposes the
* capacity to cancel the subscription as a { @code Runnable } . Can optionally support
@ -246,13 +252,13 @@ abstract class ScheduledAnnotationReactiveSupport {
@@ -246,13 +252,13 @@ abstract class ScheduledAnnotationReactiveSupport {
@Override
public void onNext ( Object obj ) {
// NO-OP
// no-op
}
@Override
public void onError ( Throwable ex ) {
this . subscriptionTrackerRegistry . remove ( this ) ;
LOGGER . warn ( "Unexpected error occurred in scheduled reactive task" , ex ) ;
logger . warn ( "Unexpected error occurred in scheduled reactive task" , ex ) ;
if ( this . blockingLatch ! = null ) {
this . blockingLatch . countDown ( ) ;
}