diff --git a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java index 11c65287059..adacee55f6d 100644 --- a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java +++ b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java @@ -503,6 +503,11 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker private Object evaluate(@Nullable Object cacheHit, CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { + // Re-invocation in reactive pipeline after late cache hit determination? + if (contexts.processed) { + return cacheHit; + } + Object cacheValue; Object returnValue; @@ -541,6 +546,9 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker returnValue = returnOverride; } + // Mark as processed for re-invocation after late cache hit determination + contexts.processed = true; + return returnValue; } @@ -688,6 +696,8 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker private final boolean sync; + boolean processed; + public CacheOperationContexts(Collection operations, Method method, Object[] args, Object target, Class targetClass) { @@ -1082,21 +1092,25 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker return null; } if (adapter.isMultiValue()) { - return adapter.fromPublisher(Flux.from( - Mono.fromFuture(cachedFuture) - .flatMap(value -> (Mono) evaluate(Mono.justOrEmpty(unwrapCacheValue(value)), invoker, method, contexts))) - .flatMap(v -> (v instanceof Iterable iv ? Flux.fromIterable(iv) : Flux.just(v))) - .switchIfEmpty(Flux.defer(() -> (Flux) evaluate(null, invoker, method, contexts)))); + return adapter.fromPublisher(Flux.from(Mono.fromFuture(cachedFuture)) + .switchIfEmpty(Flux.defer(() -> (Flux) evaluate(null, invoker, method, contexts))) + .flatMap(v -> evaluate(valueToFlux(v, contexts), invoker, method, contexts))); } else { return adapter.fromPublisher(Mono.fromFuture(cachedFuture) - .flatMap(value -> (Mono) evaluate(Mono.justOrEmpty(unwrapCacheValue(value)), invoker, method, contexts)) - .switchIfEmpty(Mono.defer(() -> (Mono) evaluate(null, invoker, method, contexts)))); + .switchIfEmpty(Mono.defer(() -> (Mono) evaluate(null, invoker, method, contexts))) + .flatMap(v -> evaluate(Mono.justOrEmpty(unwrapCacheValue(v)), invoker, method, contexts))); } } return NOT_HANDLED; } + private Flux valueToFlux(Object value, CacheOperationContexts contexts) { + Object data = unwrapCacheValue(value); + return (!contexts.processed && data instanceof Iterable iterable ? Flux.fromIterable(iterable) : + (data != null ? Flux.just(data) : Flux.empty())); + } + @Nullable public Object processPutRequest(CachePutRequest request, @Nullable Object result) { ReactiveAdapter adapter = (result != null ? this.registry.getAdapter(result.getClass()) : null); diff --git a/spring-context/src/test/java/org/springframework/cache/CacheReproTests.java b/spring-context/src/test/java/org/springframework/cache/CacheReproTests.java index c97bc9db67e..c67102f7da8 100644 --- a/spring-context/src/test/java/org/springframework/cache/CacheReproTests.java +++ b/spring-context/src/test/java/org/springframework/cache/CacheReproTests.java @@ -202,9 +202,9 @@ class CacheReproTests { assertThat(bean.findById("tb2").join()).isNotSameAs(tb); assertThat(cache.get("tb2")).isNull(); - assertThat(bean.findByIdEmpty("").join()).isNull(); assertThat(bean.findByIdEmpty("").join()).isNull(); assertThat(cache.get("").get()).isNull(); + assertThat(bean.findByIdEmpty("").join()).isNull(); context.close(); } @@ -230,9 +230,9 @@ class CacheReproTests { assertThat(bean.findById("tb1").get()).isSameAs(tb); assertThat(cache.get("tb1").get()).isSameAs(tb); - assertThat(bean.findById("").join()).isNull(); assertThat(bean.findById("").join()).isNull(); assertThat(cache.get("").get()).isNull(); + assertThat(bean.findById("").join()).isNull(); context.close(); } @@ -265,9 +265,9 @@ class CacheReproTests { assertThat(bean.findById("tb2").block()).isNotSameAs(tb); assertThat(cache.get("tb2")).isNull(); - assertThat(bean.findByIdEmpty("").block()).isNull(); assertThat(bean.findByIdEmpty("").block()).isNull(); assertThat(cache.get("").get()).isNull(); + assertThat(bean.findByIdEmpty("").block()).isNull(); context.close(); } @@ -293,9 +293,9 @@ class CacheReproTests { assertThat(bean.findById("tb1").block()).isSameAs(tb); assertThat(cache.get("tb1").get()).isSameAs(tb); - assertThat(bean.findById("").block()).isNull(); assertThat(bean.findById("").block()).isNull(); assertThat(cache.get("").get()).isNull(); + assertThat(bean.findById("").block()).isNull(); context.close(); } @@ -328,9 +328,9 @@ class CacheReproTests { assertThat(bean.findById("tb2").collectList().block()).isNotEqualTo(tb); assertThat(cache.get("tb2")).isNull(); - assertThat(bean.findByIdEmpty("").collectList().block()).isEmpty(); assertThat(bean.findByIdEmpty("").collectList().block()).isEmpty(); assertThat(cache.get("").get()).isEqualTo(Collections.emptyList()); + assertThat(bean.findByIdEmpty("").collectList().block()).isEmpty(); context.close(); } @@ -356,9 +356,9 @@ class CacheReproTests { assertThat(bean.findById("tb1").collectList().block()).isEqualTo(tb); assertThat(cache.get("tb1").get()).isEqualTo(tb); - assertThat(bean.findById("").collectList().block()).isEmpty(); assertThat(bean.findById("").collectList().block()).isEmpty(); assertThat(cache.get("").get()).isEqualTo(Collections.emptyList()); + assertThat(bean.findById("").collectList().block()).isEmpty(); context.close(); } @@ -587,6 +587,8 @@ class CacheReproTests { public static class Spr14235FutureService { + private boolean emptyCalled; + @Cacheable(value = "itemCache", unless = "#result.name == 'tb2'") public CompletableFuture findById(String id) { return CompletableFuture.completedFuture(new TestBean(id)); @@ -594,6 +596,8 @@ class CacheReproTests { @Cacheable(value = "itemCache") public CompletableFuture findByIdEmpty(String id) { + assertThat(emptyCalled).isFalse(); + emptyCalled = true; return CompletableFuture.completedFuture(null); } @@ -611,9 +615,16 @@ class CacheReproTests { public static class Spr14235FutureServiceSync { + private boolean emptyCalled; + @Cacheable(value = "itemCache", sync = true) public CompletableFuture findById(String id) { - return CompletableFuture.completedFuture(id.isEmpty() ? null : new TestBean(id)); + if (id.isEmpty()) { + assertThat(emptyCalled).isFalse(); + emptyCalled = true; + return CompletableFuture.completedFuture(null); + } + return CompletableFuture.completedFuture(new TestBean(id)); } @CachePut(cacheNames = "itemCache", key = "#item.name") @@ -625,6 +636,8 @@ class CacheReproTests { public static class Spr14235MonoService { + private boolean emptyCalled; + @Cacheable(value = "itemCache", unless = "#result.name == 'tb2'") public Mono findById(String id) { return Mono.just(new TestBean(id)); @@ -632,6 +645,8 @@ class CacheReproTests { @Cacheable(value = "itemCache") public Mono findByIdEmpty(String id) { + assertThat(emptyCalled).isFalse(); + emptyCalled = true; return Mono.empty(); } @@ -649,9 +664,16 @@ class CacheReproTests { public static class Spr14235MonoServiceSync { + private boolean emptyCalled; + @Cacheable(value = "itemCache", sync = true) public Mono findById(String id) { - return (id.isEmpty() ? Mono.empty() : Mono.just(new TestBean(id))); + if (id.isEmpty()) { + assertThat(emptyCalled).isFalse(); + emptyCalled = true; + return Mono.empty(); + } + return Mono.just(new TestBean(id)); } @CachePut(cacheNames = "itemCache", key = "#item.name") @@ -665,6 +687,8 @@ class CacheReproTests { private int counter = 0; + private boolean emptyCalled; + @Cacheable(value = "itemCache", unless = "#result[0].name == 'tb2'") public Flux findById(String id) { return Flux.just(new TestBean(id), new TestBean(id + (counter++))); @@ -672,6 +696,8 @@ class CacheReproTests { @Cacheable(value = "itemCache") public Flux findByIdEmpty(String id) { + assertThat(emptyCalled).isFalse(); + emptyCalled = true; return Flux.empty(); } @@ -691,9 +717,16 @@ class CacheReproTests { private int counter = 0; + private boolean emptyCalled; + @Cacheable(value = "itemCache", sync = true) public Flux findById(String id) { - return (id.isEmpty() ? Flux.empty() : Flux.just(new TestBean(id), new TestBean(id + (counter++)))); + if (id.isEmpty()) { + assertThat(emptyCalled).isFalse(); + emptyCalled = true; + return Flux.empty(); + } + return Flux.just(new TestBean(id), new TestBean(id + (counter++))); } @CachePut(cacheNames = "itemCache", key = "#id") diff --git a/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java b/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java index dcc680d7930..74b716add76 100644 --- a/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java +++ b/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java @@ -32,6 +32,7 @@ import org.springframework.cache.concurrent.ConcurrentMapCacheManager; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.lang.Nullable; import static org.assertj.core.api.Assertions.assertThat; @@ -171,6 +172,11 @@ public class ReactiveCachingTests { public CompletableFuture retrieve(Object key) { return CompletableFuture.completedFuture(lookup(key)); } + @Override + public void put(Object key, @Nullable Object value) { + assertThat(get(key) == null).as("Double put"); + super.put(key, value); + } }; } }; @@ -193,6 +199,11 @@ public class ReactiveCachingTests { Object value = lookup(key); return CompletableFuture.completedFuture(value != null ? toValueWrapper(value) : null); } + @Override + public void put(Object key, @Nullable Object value) { + assertThat(get(key) == null).as("Double put"); + super.put(key, value); + } }; } };