@ -1,5 +1,5 @@
@@ -1,5 +1,5 @@
/ *
* Copyright 2002 - 2024 the original author or authors .
* Copyright 2002 - 2025 the original author or authors .
*
* Licensed under the Apache License , Version 2 . 0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
@ -19,7 +19,9 @@ package org.springframework.cache.annotation;
@@ -19,7 +19,9 @@ package org.springframework.cache.annotation;
import java.util.List ;
import java.util.concurrent.CompletableFuture ;
import java.util.concurrent.CompletionException ;
import java.util.concurrent.atomic.AtomicBoolean ;
import java.util.concurrent.atomic.AtomicLong ;
import java.util.function.Supplier ;
import org.junit.jupiter.api.Test ;
import org.junit.jupiter.params.ParameterizedTest ;
@ -40,6 +42,7 @@ import org.springframework.context.annotation.Configuration;
@@ -40,6 +42,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable ;
import static org.assertj.core.api.Assertions.assertThat ;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType ;
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable ;
/ * *
@ -58,8 +61,8 @@ class ReactiveCachingTests {
@@ -58,8 +61,8 @@ class ReactiveCachingTests {
LateCacheHitDeterminationWithValueWrapperConfig . class } )
void cacheHitDetermination ( Class < ? > configClass ) {
AnnotationConfigApplicationContext ctx =
new AnnotationConfigApplicationContext ( configClass , ReactiveCacheableService . class ) ;
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext (
configClass , ReactiveCacheableService . class ) ;
ReactiveCacheableService service = ctx . getBean ( ReactiveCacheableService . class ) ;
Object key = new Object ( ) ;
@ -119,58 +122,58 @@ class ReactiveCachingTests {
@@ -119,58 +122,58 @@ class ReactiveCachingTests {
ctx . close ( ) ;
}
@Test
void cacheErrorHandlerWithLoggingCacheErrorHandler ( ) {
AnnotationConfigApplicationContext ctx =
new AnnotationConfigApplicationContext ( ExceptionCacheManager . class , ReactiveCacheableService . class , ErrorHandlerCachingConfiguration . class ) ;
@ParameterizedTest
@ValueSource ( classes = { EarlyCacheHitDeterminationConfig . class ,
EarlyCacheHitDeterminationWithoutNullValuesConfig . class ,
LateCacheHitDeterminationConfig . class ,
LateCacheHitDeterminationWithValueWrapperConfig . class } )
void fluxCacheDoesntDependOnFirstRequest ( Class < ? > configClass ) {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext (
configClass , ReactiveCacheableService . class ) ;
ReactiveCacheableService service = ctx . getBean ( ReactiveCacheableService . class ) ;
Object key = new Object ( ) ;
Long r1 = service . cacheFuture ( key ) . join ( ) ;
assertThat ( r1 ) . isNotNull ( ) ;
assertThat ( r1 ) . as ( "cacheFuture" ) . isEqualTo ( 0L ) ;
key = new Object ( ) ;
r1 = service . cacheMono ( key ) . block ( ) ;
assertThat ( r1 ) . isNotNull ( ) ;
assertThat ( r1 ) . as ( "cacheMono" ) . isEqualTo ( 1L ) ;
List < Long > l1 = service . cacheFlux ( key ) . take ( 1L , true ) . collectList ( ) . block ( ) ;
List < Long > l2 = service . cacheFlux ( key ) . take ( 3L , true ) . collectList ( ) . block ( ) ;
List < Long > l3 = service . cacheFlux ( key ) . collectList ( ) . block ( ) ;
key = new Object ( ) ;
Long first = l1 . get ( 0 ) ;
r1 = service . cacheFlux ( key ) . blockFirst ( ) ;
assertThat ( l1 ) . as ( "l1" ) . containsExactly ( first ) ;
assertThat ( l2 ) . as ( "l2" ) . containsExactly ( first , 0L , - 1L ) ;
assertThat ( l3 ) . as ( "l3" ) . containsExactly ( first , 0L , - 1L , - 2L , - 3L ) ;
assertThat ( r1 ) . isNotNull ( ) ;
assertThat ( r1 ) . as ( "cacheFlux blockFirst" ) . isEqualTo ( 2L ) ;
ctx . close ( ) ;
}
@Test
void cacheErrorHandlerWithLoggingCacheErrorHandlerAndMethodErro r ( ) {
AnnotationConfigApplicationContext ctx =
new AnnotationConfigApplicationContext ( ExceptionCacheManager . class , ReactiveFailure CacheableService . class , ErrorHandlerCachingConfiguration . class ) ;
void cacheErrorHandlerWithSimpleCacheErrorHandle r ( ) {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext (
ExceptionCacheManager . class , ReactiveCacheableService . class ) ;
ReactiveCacheableService service = ctx . getBean ( ReactiveCacheableService . class ) ;
Object key = new Object ( ) ;
StepVerifier . create ( service . cacheMono ( key ) )
. expectErrorMessage ( "mono service error" )
. verify ( ) ;
Throwable completableFutureThrowable = catchThrowable ( ( ) - > service . cacheFuture ( new Object ( ) ) . join ( ) ) ;
assertThat ( completableFutureThrowable ) . isInstanceOf ( CompletionException . class )
. extracting ( Throwable : : getCause )
. isInstanceOf ( UnsupportedOperationException . class ) ;
key = new Object ( ) ;
StepVerifier . create ( service . cacheFlux ( key ) )
. expectErrorMessage ( "flux service error" )
. verify ( ) ;
Throwable monoThrowable = catchThrowable ( ( ) - > service . cacheMono ( new Object ( ) ) . block ( ) ) ;
assertThat ( monoThrowable ) . isInstanceOf ( UnsupportedOperationException . class ) ;
Throwable fluxThrowable = catchThrowable ( ( ) - > service . cacheFlux ( new Object ( ) ) . blockFirst ( ) ) ;
assertThat ( fluxThrowable ) . isInstanceOf ( UnsupportedOperationException . class ) ;
}
@Test
void cacheErrorHandlerWithSimpleCacheErrorHandler ( ) {
AnnotationConfigApplicationContext ctx =
new AnnotationConfigApplicationContext ( ExceptionCacheManager . class , ReactiveCacheableService . class ) ;
ReactiveCacheableService service = ctx . getBean ( ReactiveCacheableService . class ) ;
void cacheErrorHandlerWithSimpleCacheErrorHandlerAndSync ( ) {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext (
ExceptionCacheManager . class , ReactiveSync CacheableService . class ) ;
ReactiveSync CacheableService service = ctx . getBean ( ReactiveSync CacheableService . class ) ;
Throwable completableFuturThrowable = catchThrowable ( ( ) - > service . cacheFuture ( new Object ( ) ) . join ( ) ) ;
assertThat ( completableFuturThrowable ) . isInstanceOf ( CompletionException . class )
Throwable completableFuture Throwable = catchThrowable ( ( ) - > service . cacheFuture ( new Object ( ) ) . join ( ) ) ;
assertThat ( completableFuture Throwable ) . isInstanceOf ( CompletionException . class )
. extracting ( Throwable : : getCause )
. isInstanceOf ( UnsupportedOperationException . class ) ;
@ -181,32 +184,81 @@ class ReactiveCachingTests {
@@ -181,32 +184,81 @@ class ReactiveCachingTests {
assertThat ( fluxThrowable ) . isInstanceOf ( UnsupportedOperationException . class ) ;
}
@ParameterizedTest
@ValueSource ( classes = { EarlyCacheHitDeterminationConfig . class ,
EarlyCacheHitDeterminationWithoutNullValuesConfig . class ,
LateCacheHitDeterminationConfig . class ,
LateCacheHitDeterminationWithValueWrapperConfig . class } )
void fluxCacheDoesntDependOnFirstRequest ( Class < ? > configClass ) {
@Test
void cacheErrorHandlerWithLoggingCacheErrorHandler ( ) {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext (
ExceptionCacheManager . class , ReactiveCacheableService . class , ErrorHandlerCachingConfiguration . class ) ;
ReactiveCacheableService service = ctx . getBean ( ReactiveCacheableService . class ) ;
Long r1 = service . cacheFuture ( new Object ( ) ) . join ( ) ;
assertThat ( r1 ) . isNotNull ( ) ;
assertThat ( r1 ) . as ( "cacheFuture" ) . isEqualTo ( 0L ) ;
r1 = service . cacheMono ( new Object ( ) ) . block ( ) ;
assertThat ( r1 ) . isNotNull ( ) ;
assertThat ( r1 ) . as ( "cacheMono" ) . isEqualTo ( 1L ) ;
r1 = service . cacheFlux ( new Object ( ) ) . blockFirst ( ) ;
assertThat ( r1 ) . isNotNull ( ) ;
assertThat ( r1 ) . as ( "cacheFlux blockFirst" ) . isEqualTo ( 2L ) ;
}
@Test
void cacheErrorHandlerWithLoggingCacheErrorHandlerAndSync ( ) {
AnnotationConfigApplicationContext ctx =
new AnnotationConfigApplicationContext ( configClass , ReactiveCacheableService . class ) ;
ReactiveCacheableService service = ctx . getBean ( ReactiveCacheableService . class ) ;
new AnnotationConfigApplicationContext ( ExceptionCacheManager . c lass, ReactiveSync CacheableService . class , ErrorHandlerCachingConfiguration . class ) ;
ReactiveSync CacheableService service = ctx . getBean ( ReactiveSync CacheableService . class ) ;
Object key = new Object ( ) ;
Long r1 = service . cacheFuture ( new Object ( ) ) . join ( ) ;
assertThat ( r1 ) . isNotNull ( ) ;
assertThat ( r1 ) . as ( "cacheFuture" ) . isEqualTo ( 0L ) ;
List < Long > l1 = service . cacheFlux ( key ) . take ( 1L , true ) . collectList ( ) . block ( ) ;
List < Long > l2 = service . cacheFlux ( key ) . take ( 3L , true ) . collectList ( ) . block ( ) ;
List < Long > l3 = service . cacheFlux ( key ) . collectList ( ) . block ( ) ;
r 1 = service . cacheMono ( new Object ( ) ) . block ( ) ;
assertThat ( r1 ) . isNotNull ( ) ;
assertThat ( r1 ) . as ( "cacheMono" ) . isEqualTo ( 1L ) ;
Long first = l1 . get ( 0 ) ;
r1 = service . cacheFlux ( new Object ( ) ) . blockFirst ( ) ;
assertThat ( r1 ) . isNotNull ( ) ;
assertThat ( r1 ) . as ( "cacheFlux blockFirst" ) . isEqualTo ( 2L ) ;
}
assertThat ( l1 ) . as ( "l1" ) . containsExactly ( first ) ;
assertThat ( l2 ) . as ( "l2" ) . containsExactly ( first , 0L , - 1L ) ;
assertThat ( l3 ) . as ( "l3" ) . containsExactly ( first , 0L , - 1L , - 2L , - 3L ) ;
@Test
void cacheErrorHandlerWithLoggingCacheErrorHandlerAndOperationException ( ) {
AnnotationConfigApplicationContext ctx =
new AnnotationConfigApplicationContext ( EarlyCacheHitDeterminationConfig . class , ReactiveFailureCacheableService . class , ErrorHandlerCachingConfiguration . class ) ;
ReactiveFailureCacheableService service = ctx . getBean ( ReactiveFailureCacheableService . class ) ;
ctx . close ( ) ;
assertThatExceptionOfType ( CompletionException . class ) . isThrownBy ( ( ) - > service . cacheFuture ( new Object ( ) ) . join ( ) )
. withMessage ( IllegalStateException . class . getName ( ) + ": future service error" ) ;
StepVerifier . create ( service . cacheMono ( new Object ( ) ) )
. expectErrorMessage ( "mono service error" )
. verify ( ) ;
StepVerifier . create ( service . cacheFlux ( new Object ( ) ) )
. expectErrorMessage ( "flux service error" )
. verify ( ) ;
}
@Test
void cacheErrorHandlerWithLoggingCacheErrorHandlerAndOperationExceptionAndSync ( ) {
AnnotationConfigApplicationContext ctx =
new AnnotationConfigApplicationContext ( EarlyCacheHitDeterminationConfig . class , ReactiveSyncFailureCacheableService . class , ErrorHandlerCachingConfiguration . class ) ;
ReactiveSyncFailureCacheableService service = ctx . getBean ( ReactiveSyncFailureCacheableService . class ) ;
assertThatExceptionOfType ( CompletionException . class ) . isThrownBy ( ( ) - > service . cacheFuture ( new Object ( ) ) . join ( ) )
. withMessage ( IllegalStateException . class . getName ( ) + ": future service error" ) ;
StepVerifier . create ( service . cacheMono ( new Object ( ) ) )
. expectErrorMessage ( "mono service error" )
. verify ( ) ;
StepVerifier . create ( service . cacheFlux ( new Object ( ) ) )
. expectErrorMessage ( "flux service error" )
. verify ( ) ;
}
@CacheConfig ( cacheNames = "first" )
static class ReactiveCacheableService {
@ -232,16 +284,94 @@ class ReactiveCachingTests {
@@ -232,16 +284,94 @@ class ReactiveCachingTests {
}
}
@CacheConfig ( cacheNames = "first" )
static class ReactiveSyncCacheableService {
private final AtomicLong counter = new AtomicLong ( ) ;
@Cacheable ( sync = true )
CompletableFuture < Long > cacheFuture ( Object arg ) {
return CompletableFuture . completedFuture ( this . counter . getAndIncrement ( ) ) ;
}
@Cacheable ( sync = true )
Mono < Long > cacheMono ( Object arg ) {
return Mono . defer ( ( ) - > Mono . just ( this . counter . getAndIncrement ( ) ) ) ;
}
@Cacheable ( sync = true )
Flux < Long > cacheFlux ( Object arg ) {
return Flux . defer ( ( ) - > Flux . just ( this . counter . getAndIncrement ( ) , 0L , - 1L , - 2L , - 3L ) ) ;
}
}
@CacheConfig ( cacheNames = "first" )
static class ReactiveFailureCacheableService extends ReactiveCacheableService {
static class ReactiveFailureCacheableService {
private final AtomicBoolean cacheFutureInvoked = new AtomicBoolean ( ) ;
private final AtomicBoolean cacheMonoInvoked = new AtomicBoolean ( ) ;
private final AtomicBoolean cacheFluxInvoked = new AtomicBoolean ( ) ;
@Cacheable
CompletableFuture < Long > cacheFuture ( Object arg ) {
if ( ! this . cacheFutureInvoked . compareAndSet ( false , true ) ) {
return CompletableFuture . failedFuture ( new IllegalStateException ( "future service invoked twice" ) ) ;
}
return CompletableFuture . failedFuture ( new IllegalStateException ( "future service error" ) ) ;
}
@Cacheable
Mono < Long > cacheMono ( Object arg ) {
if ( ! this . cacheMonoInvoked . compareAndSet ( false , true ) ) {
return Mono . error ( new IllegalStateException ( "mono service invoked twice" ) ) ;
}
return Mono . error ( new IllegalStateException ( "mono service error" ) ) ;
}
@Cacheable
Flux < Long > cacheFlux ( Object arg ) {
if ( ! this . cacheFluxInvoked . compareAndSet ( false , true ) ) {
return Flux . error ( new IllegalStateException ( "flux service invoked twice" ) ) ;
}
return Flux . error ( new IllegalStateException ( "flux service error" ) ) ;
}
}
@CacheConfig ( cacheNames = "first" )
static class ReactiveSyncFailureCacheableService {
private final AtomicBoolean cacheFutureInvoked = new AtomicBoolean ( ) ;
private final AtomicBoolean cacheMonoInvoked = new AtomicBoolean ( ) ;
private final AtomicBoolean cacheFluxInvoked = new AtomicBoolean ( ) ;
@Cacheable ( sync = true )
CompletableFuture < Long > cacheFuture ( Object arg ) {
if ( ! this . cacheFutureInvoked . compareAndSet ( false , true ) ) {
return CompletableFuture . failedFuture ( new IllegalStateException ( "future service invoked twice" ) ) ;
}
return CompletableFuture . failedFuture ( new IllegalStateException ( "future service error" ) ) ;
}
@Cacheable ( sync = true )
Mono < Long > cacheMono ( Object arg ) {
if ( ! this . cacheMonoInvoked . compareAndSet ( false , true ) ) {
return Mono . error ( new IllegalStateException ( "mono service invoked twice" ) ) ;
}
return Mono . error ( new IllegalStateException ( "mono service error" ) ) ;
}
@Cacheable ( sync = true )
Flux < Long > cacheFlux ( Object arg ) {
if ( ! this . cacheFluxInvoked . compareAndSet ( false , true ) ) {
return Flux . error ( new IllegalStateException ( "flux service invoked twice" ) ) ;
}
return Flux . error ( new IllegalStateException ( "flux service error" ) ) ;
}
}
@ -323,6 +453,7 @@ class ReactiveCachingTests {
@@ -323,6 +453,7 @@ class ReactiveCachingTests {
}
}
@Configuration
static class ErrorHandlerCachingConfiguration implements CachingConfigurer {
@ -333,6 +464,7 @@ class ReactiveCachingTests {
@@ -333,6 +464,7 @@ class ReactiveCachingTests {
}
}
@Configuration ( proxyBeanMethods = false )
@EnableCaching
static class ExceptionCacheManager {
@ -345,11 +477,12 @@ class ReactiveCachingTests {
@@ -345,11 +477,12 @@ class ReactiveCachingTests {
return new ConcurrentMapCache ( name , isAllowNullValues ( ) ) {
@Override
public CompletableFuture < ? > retrieve ( Object key ) {
return CompletableFuture . supplyAsync ( ( ) - > {
throw new UnsupportedOperationException ( "Test exception on retrieve" ) ;
} ) ;
return CompletableFuture . failedFuture ( new UnsupportedOperationException ( "Test exception on retrieve" ) ) ;
}
@Override
public < T > CompletableFuture < T > retrieve ( Object key , Supplier < CompletableFuture < T > > valueLoader ) {
return CompletableFuture . failedFuture ( new UnsupportedOperationException ( "Test exception on retrieve" ) ) ;
}
@Override
public void put ( Object key , @Nullable Object value ) {
throw new UnsupportedOperationException ( "Test exception on put" ) ;