From 1410c466b7c18b5cba2fd349091c0ad2751d1669 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Tue, 21 Nov 2023 23:11:34 +0100 Subject: [PATCH] Support for late-determined cache misses from retrieve(key) Closes gh-31637 --- .../spring-context-support.gradle | 11 +- .../CaffeineReactiveCachingTests.java | 141 +++++++++++++ .../java/org/springframework/cache/Cache.java | 32 ++- .../cache/interceptor/CacheAspectSupport.java | 199 ++++++++++-------- .../cache/CacheReproTests.java | 6 + .../annotation/ReactiveCachingTests.java | 141 +++++++++++-- 6 files changed, 411 insertions(+), 119 deletions(-) create mode 100644 spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineReactiveCachingTests.java diff --git a/spring-context-support/spring-context-support.gradle b/spring-context-support/spring-context-support.gradle index 715b922b680..a2f0c083e73 100644 --- a/spring-context-support/spring-context-support.gradle +++ b/spring-context-support/spring-context-support.gradle @@ -6,12 +6,12 @@ dependencies { api(project(":spring-core")) optional(project(":spring-jdbc")) // for Quartz support optional(project(":spring-tx")) // for Quartz support + optional("com.github.ben-manes.caffeine:caffeine") optional("jakarta.activation:jakarta.activation-api") optional("jakarta.mail:jakarta.mail-api") optional("javax.cache:cache-api") - optional("com.github.ben-manes.caffeine:caffeine") - optional("org.quartz-scheduler:quartz") optional("org.freemarker:freemarker") + optional("org.quartz-scheduler:quartz") testFixturesApi("org.junit.jupiter:junit-jupiter-api") testFixturesImplementation("org.assertj:assertj-core") testFixturesImplementation("org.mockito:mockito-core") @@ -20,10 +20,11 @@ dependencies { testImplementation(testFixtures(project(":spring-context"))) testImplementation(testFixtures(project(":spring-core"))) testImplementation(testFixtures(project(":spring-tx"))) - testImplementation("org.hsqldb:hsqldb") + testImplementation("io.projectreactor:reactor-core") testImplementation("jakarta.annotation:jakarta.annotation-api") - testRuntimeOnly("org.ehcache:jcache") + testImplementation("org.hsqldb:hsqldb") + testRuntimeOnly("com.sun.mail:jakarta.mail") testRuntimeOnly("org.ehcache:ehcache") + testRuntimeOnly("org.ehcache:jcache") testRuntimeOnly("org.glassfish:jakarta.el") - testRuntimeOnly("com.sun.mail:jakarta.mail") } diff --git a/spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineReactiveCachingTests.java b/spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineReactiveCachingTests.java new file mode 100644 index 00000000000..aa35e9aa5ba --- /dev/null +++ b/spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineReactiveCachingTests.java @@ -0,0 +1,141 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cache.caffeine; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.cache.CacheManager; +import org.springframework.cache.annotation.CacheConfig; +import org.springframework.cache.annotation.Cacheable; +import org.springframework.cache.annotation.EnableCaching; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for annotation-based caching methods that use reactive operators. + * + * @author Juergen Hoeller + * @since 6.1 + */ +public class CaffeineReactiveCachingTests { + + @Test + void withCaffeineAsyncCache() { + ApplicationContext ctx = new AnnotationConfigApplicationContext(Config.class, ReactiveCacheableService.class); + ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class); + + Object key = new Object(); + + Long r1 = service.cacheFuture(key).join(); + Long r2 = service.cacheFuture(key).join(); + Long r3 = service.cacheFuture(key).join(); + + assertThat(r1).isNotNull(); + assertThat(r1).isSameAs(r2).isSameAs(r3); + + key = new Object(); + + r1 = service.cacheMono(key).block(); + r2 = service.cacheMono(key).block(); + r3 = service.cacheMono(key).block(); + + assertThat(r1).isNotNull(); + assertThat(r1).isSameAs(r2).isSameAs(r3); + + key = new Object(); + + r1 = service.cacheFlux(key).blockFirst(); + r2 = service.cacheFlux(key).blockFirst(); + r3 = service.cacheFlux(key).blockFirst(); + + assertThat(r1).isNotNull(); + assertThat(r1).isSameAs(r2).isSameAs(r3); + + key = new Object(); + + List l1 = service.cacheFlux(key).collectList().block(); + List l2 = service.cacheFlux(key).collectList().block(); + List l3 = service.cacheFlux(key).collectList().block(); + + assertThat(l1).isNotNull(); + assertThat(l1).isEqualTo(l2).isEqualTo(l3); + + key = new Object(); + + r1 = service.cacheMono(key).block(); + r2 = service.cacheMono(key).block(); + r3 = service.cacheMono(key).block(); + + assertThat(r1).isNotNull(); + assertThat(r1).isSameAs(r2).isSameAs(r3); + + // Same key as for Mono, reusing its cached value + + r1 = service.cacheFlux(key).blockFirst(); + r2 = service.cacheFlux(key).blockFirst(); + r3 = service.cacheFlux(key).blockFirst(); + + assertThat(r1).isNotNull(); + assertThat(r1).isSameAs(r2).isSameAs(r3); + } + + + @CacheConfig(cacheNames = "first") + static class ReactiveCacheableService { + + private final AtomicLong counter = new AtomicLong(); + + @Cacheable + CompletableFuture cacheFuture(Object arg) { + return CompletableFuture.completedFuture(this.counter.getAndIncrement()); + } + + @Cacheable + Mono cacheMono(Object arg) { + return Mono.just(this.counter.getAndIncrement()); + } + + @Cacheable + Flux cacheFlux(Object arg) { + return Flux.just(this.counter.getAndIncrement(), 0L); + } + } + + + @Configuration(proxyBeanMethods = false) + @EnableCaching + static class Config { + + @Bean + CacheManager cacheManager() { + CaffeineCacheManager ccm = new CaffeineCacheManager("first"); + ccm.setAsyncCacheMode(true); + return ccm; + } + } + +} diff --git a/spring-context/src/main/java/org/springframework/cache/Cache.java b/spring-context/src/main/java/org/springframework/cache/Cache.java index 63f93f32ea4..73556aca0cb 100644 --- a/spring-context/src/main/java/org/springframework/cache/Cache.java +++ b/spring-context/src/main/java/org/springframework/cache/Cache.java @@ -25,9 +25,9 @@ import org.springframework.lang.Nullable; /** * Interface that defines common cache operations. * - *

Serves as an SPI for Spring's annotation-based caching model - * ({@link org.springframework.cache.annotation.Cacheable} and co) - * as well as an API for direct usage in applications. + *

Serves primarily as an SPI for Spring's annotation-based caching + * model ({@link org.springframework.cache.annotation.Cacheable} and co) + * and secondarily as an API for direct usage in applications. * *

Note: Due to the generic use of caching, it is recommended * that implementations allow storage of {@code null} values @@ -113,16 +113,26 @@ public interface Cache { * wrapped in a {@link CompletableFuture}. This operation must not block * but is allowed to return a completed {@link CompletableFuture} if the * corresponding value is immediately available. - *

Returns {@code null} if the cache contains no mapping for this key; - * otherwise, the cached value (which may be {@code null} itself) will - * be returned in the {@link CompletableFuture}. + *

Can return {@code null} if the cache can immediately determine that + * it contains no mapping for this key (e.g. through an in-memory key map). + * Otherwise, the cached value will be returned in the {@link CompletableFuture}, + * with {@code null} indicating a late-determined cache miss (and a nested + * {@link ValueWrapper} potentially indicating a nullable cached value). * @param key the key whose associated value is to be returned - * @return the value to which this cache maps the specified key, - * contained within a {@link CompletableFuture} which may also hold - * a cached {@code null} value. A straight {@code null} being - * returned means that the cache contains no mapping for this key. + * @return the value to which this cache maps the specified key, contained + * within a {@link CompletableFuture} which may also be empty when a cache + * miss has been late-determined. A straight {@code null} being returned + * means that the cache immediately determined that it contains no mapping + * for this key. A {@link ValueWrapper} contained within the + * {@code CompletableFuture} can indicate a cached value that is potentially + * {@code null}; this is sensible in a late-determined scenario where a regular + * CompletableFuture-contained {@code null} indicates a cache miss. However, + * an early-determined cache will usually return the plain cached value here, + * and a late-determined cache may also return a plain value if it does not + * support the actual caching of {@code null} values. Spring's common cache + * processing can deal with all variants of these implementation strategies. * @since 6.1 - * @see #get(Object) + * @see #retrieve(Object, Supplier) */ @Nullable default CompletableFuture retrieve(Object key) { diff --git a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java index 1df57244f3d..ae8b1c6befc 100644 --- a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java +++ b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java @@ -363,7 +363,7 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker protected Object execute(CacheOperationInvoker invoker, Object target, Method method, Object[] args) { // Check whether aspect is enabled (to cope with cases where the AJ is pulled in automatically) if (this.initialized) { - Class targetClass = getTargetClass(target); + Class targetClass = AopProxyUtils.ultimateTargetClass(target); CacheOperationSource cacheOperationSource = getCacheOperationSource(); if (cacheOperationSource != null) { Collection operations = cacheOperationSource.getCacheOperations(method, targetClass); @@ -374,7 +374,7 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker } } - return invoker.invoke(); + return invokeOperation(invoker); } /** @@ -392,10 +392,6 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker return invoker.invoke(); } - private Class getTargetClass(Object target) { - return AopProxyUtils.ultimateTargetClass(target); - } - @Nullable private Object execute(CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { if (contexts.isSynchronized()) { @@ -408,7 +404,104 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker CacheOperationExpressionEvaluator.NO_RESULT); // Check if we have a cached value matching the conditions - Object cacheHit = findCachedValue(contexts.get(CacheableOperation.class)); + Object cacheHit = findCachedValue(invoker, method, contexts); + if (cacheHit == null || cacheHit instanceof Cache.ValueWrapper) { + return evaluate(cacheHit, invoker, method, contexts); + } + return cacheHit; + } + + @Nullable + private Object executeSynchronized(CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { + CacheOperationContext context = contexts.get(CacheableOperation.class).iterator().next(); + if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) { + Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT); + Cache cache = context.getCaches().iterator().next(); + if (CompletableFuture.class.isAssignableFrom(method.getReturnType())) { + return cache.retrieve(key, () -> (CompletableFuture) invokeOperation(invoker)); + } + if (this.reactiveCachingHandler != null) { + Object returnValue = this.reactiveCachingHandler.executeSynchronized(invoker, method, cache, key); + if (returnValue != ReactiveCachingHandler.NOT_HANDLED) { + return returnValue; + } + } + try { + return wrapCacheValue(method, cache.get(key, () -> unwrapReturnValue(invokeOperation(invoker)))); + } + catch (Cache.ValueRetrievalException ex) { + // Directly propagate ThrowableWrapper from the invoker, + // or potentially also an IllegalArgumentException etc. + ReflectionUtils.rethrowRuntimeException(ex.getCause()); + // Never reached + return null; + } + } + else { + // No caching required, just call the underlying method + return invokeOperation(invoker); + } + } + + /** + * Find a cached value only for {@link CacheableOperation} that passes the condition. + * @param contexts the cacheable operations + * @return a {@link Cache.ValueWrapper} holding the cached value, + * or {@code null} if none is found + */ + @Nullable + private Object findCachedValue(CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { + for (CacheOperationContext context : contexts.get(CacheableOperation.class)) { + if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) { + Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT); + Object cached = findInCaches(context, key, invoker, method, contexts); + if (cached != null) { + if (logger.isTraceEnabled()) { + logger.trace("Cache entry for key '" + key + "' found in cache(s) " + context.getCacheNames()); + } + return cached; + } + else { + if (logger.isTraceEnabled()) { + logger.trace("No cache entry for key '" + key + "' in cache(s) " + context.getCacheNames()); + } + } + } + } + return null; + } + + @Nullable + private Object findInCaches(CacheOperationContext context, Object key, + CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { + + for (Cache cache : context.getCaches()) { + if (CompletableFuture.class.isAssignableFrom(context.getMethod().getReturnType())) { + CompletableFuture result = cache.retrieve(key); + if (result != null) { + return result.thenCompose(value -> (CompletableFuture) evaluate( + (value != null ? CompletableFuture.completedFuture(unwrapCacheValue(value)) : null), + invoker, method, contexts)); + } + } + if (this.reactiveCachingHandler != null) { + Object returnValue = this.reactiveCachingHandler.findInCaches( + context, cache, key, invoker, method, contexts); + if (returnValue != ReactiveCachingHandler.NOT_HANDLED) { + return returnValue; + } + } + Cache.ValueWrapper result = doGet(cache, key); + if (result != null) { + return result; + } + } + return null; + } + + @Nullable + private Object evaluate(@Nullable Object cacheHit, CacheOperationInvoker invoker, Method method, + CacheOperationContexts contexts) { Object cacheValue; Object returnValue; @@ -452,35 +545,8 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker } @Nullable - private Object executeSynchronized(CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { - CacheOperationContext context = contexts.get(CacheableOperation.class).iterator().next(); - if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) { - Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT); - Cache cache = context.getCaches().iterator().next(); - if (CompletableFuture.class.isAssignableFrom(method.getReturnType())) { - return cache.retrieve(key, () -> (CompletableFuture) invokeOperation(invoker)); - } - if (this.reactiveCachingHandler != null) { - Object returnValue = this.reactiveCachingHandler.executeSynchronized(invoker, method, cache, key); - if (returnValue != ReactiveCachingHandler.NOT_HANDLED) { - return returnValue; - } - } - try { - return wrapCacheValue(method, cache.get(key, () -> unwrapReturnValue(invokeOperation(invoker)))); - } - catch (Cache.ValueRetrievalException ex) { - // Directly propagate ThrowableWrapper from the invoker, - // or potentially also an IllegalArgumentException etc. - ReflectionUtils.rethrowRuntimeException(ex.getCause()); - // Never reached - return null; - } - } - else { - // No caching required, just call the underlying method - return invokeOperation(invoker); - } + private Object unwrapCacheValue(@Nullable Object cacheValue) { + return (cacheValue instanceof Cache.ValueWrapper wrapper ? wrapper.get() : cacheValue); } @Nullable @@ -575,34 +641,6 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker } } - /** - * Find a cached value only for {@link CacheableOperation} that passes the condition. - * @param contexts the cacheable operations - * @return a {@link Cache.ValueWrapper} holding the cached value, - * or {@code null} if none is found - */ - @Nullable - private Object findCachedValue(Collection contexts) { - for (CacheOperationContext context : contexts) { - if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) { - Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT); - Object cached = findInCaches(context, key); - if (cached != null) { - if (logger.isTraceEnabled()) { - logger.trace("Cache entry for key '" + key + "' found in cache(s) " + context.getCacheNames()); - } - return cached; - } - else { - if (logger.isTraceEnabled()) { - logger.trace("No cache entry for key '" + key + "' in cache(s) " + context.getCacheNames()); - } - } - } - } - return null; - } - /** * Collect the {@link CachePutRequest} for all {@link CacheOperation} using * the specified result value. @@ -621,23 +659,6 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker } } - @Nullable - private Object findInCaches(CacheOperationContext context, Object key) { - for (Cache cache : context.getCaches()) { - if (CompletableFuture.class.isAssignableFrom(context.getMethod().getReturnType())) { - return cache.retrieve(key); - } - if (this.reactiveCachingHandler != null) { - Object returnValue = this.reactiveCachingHandler.findInCaches(context, cache, key); - if (returnValue != ReactiveCachingHandler.NOT_HANDLED) { - return returnValue; - } - } - return doGet(cache, key); - } - return null; - } - private boolean isConditionPassing(CacheOperationContext context, @Nullable Object result) { boolean passing = context.isConditionPassing(result); if (!passing && logger.isTraceEnabled()) { @@ -1048,8 +1069,11 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker return NOT_HANDLED; } + @SuppressWarnings("unchecked") @Nullable - public Object findInCaches(CacheOperationContext context, Cache cache, Object key) { + public Object findInCaches(CacheOperationContext context, Cache cache, Object key, + CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { + ReactiveAdapter adapter = this.registry.getAdapter(context.getMethod().getReturnType()); if (adapter != null) { CompletableFuture cachedFuture = cache.retrieve(key); @@ -1057,11 +1081,16 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker return null; } if (adapter.isMultiValue()) { - return adapter.fromPublisher(Flux.from(Mono.fromFuture(cachedFuture)) - .flatMap(v -> (v instanceof Iterable iv ? Flux.fromIterable(iv) : Flux.just(v)))); + return adapter.fromPublisher(Flux.from( + Mono.fromFuture(cachedFuture) + .flatMap(value -> (Mono) evaluate(Mono.just(unwrapCacheValue(value)), invoker, method, contexts))) + .flatMap(v -> (v instanceof Iterable iv ? Flux.fromIterable(iv) : Flux.just(v))) + .switchIfEmpty(Flux.defer(() -> (Flux) evaluate(null, invoker, method, contexts)))); } else { - return adapter.fromPublisher(Mono.fromFuture(cachedFuture)); + return adapter.fromPublisher(Mono.fromFuture(cachedFuture) + .flatMap(value -> (Mono) evaluate(Mono.just(unwrapCacheValue(value)), invoker, method, contexts)) + .switchIfEmpty(Mono.defer(() -> (Mono) evaluate(null, invoker, method, contexts)))); } } return NOT_HANDLED; diff --git a/spring-context/src/test/java/org/springframework/cache/CacheReproTests.java b/spring-context/src/test/java/org/springframework/cache/CacheReproTests.java index e538deedd54..d1c8ff0bbb1 100644 --- a/spring-context/src/test/java/org/springframework/cache/CacheReproTests.java +++ b/spring-context/src/test/java/org/springframework/cache/CacheReproTests.java @@ -182,11 +182,13 @@ class CacheReproTests { Cache cache = context.getBean(CacheManager.class).getCache("itemCache"); TestBean tb = bean.findById("tb1").join(); + assertThat(tb).isNotNull(); assertThat(bean.findById("tb1").join()).isSameAs(tb); assertThat(cache.get("tb1").get()).isSameAs(tb); bean.clear().join(); TestBean tb2 = bean.findById("tb1").join(); + assertThat(tb2).isNotNull(); assertThat(tb2).isNotSameAs(tb); assertThat(cache.get("tb1").get()).isSameAs(tb2); @@ -230,11 +232,13 @@ class CacheReproTests { Cache cache = context.getBean(CacheManager.class).getCache("itemCache"); TestBean tb = bean.findById("tb1").block(); + assertThat(tb).isNotNull(); assertThat(bean.findById("tb1").block()).isSameAs(tb); assertThat(cache.get("tb1").get()).isSameAs(tb); bean.clear().block(); TestBean tb2 = bean.findById("tb1").block(); + assertThat(tb2).isNotNull(); assertThat(tb2).isNotSameAs(tb); assertThat(cache.get("tb1").get()).isSameAs(tb2); @@ -278,11 +282,13 @@ class CacheReproTests { Cache cache = context.getBean(CacheManager.class).getCache("itemCache"); List tb = bean.findById("tb1").collectList().block(); + assertThat(tb).isNotEmpty(); assertThat(bean.findById("tb1").collectList().block()).isEqualTo(tb); assertThat(cache.get("tb1").get()).isEqualTo(tb); bean.clear().blockLast(); List tb2 = bean.findById("tb1").collectList().block(); + assertThat(tb2).isNotEmpty(); assertThat(tb2).isNotEqualTo(tb); assertThat(cache.get("tb1").get()).isEqualTo(tb2); diff --git a/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java b/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java index 81dcd185d68..1b7aee81e29 100644 --- a/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java +++ b/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java @@ -16,14 +16,21 @@ package org.springframework.cache.annotation; +import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; +import org.springframework.cache.concurrent.ConcurrentMapCache; import org.springframework.cache.concurrent.ConcurrentMapCacheManager; -import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.cache.support.SimpleValueWrapper; +import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -34,24 +41,71 @@ import static org.assertj.core.api.Assertions.assertThat; * Tests for annotation-based caching methods that use reactive operators. * * @author Stephane Nicoll + * @author Juergen Hoeller + * @since 6.1 */ public class ReactiveCachingTests { - private final ConfigurableApplicationContext ctx; + @ParameterizedTest + @ValueSource(classes = {EarlyCacheHitDeterminationConfig.class, + LateCacheHitDeterminationConfig.class, + LateCacheHitDeterminationWithValueWrapperConfig.class}) + void cacheHitDetermination(Class configClass) { + ApplicationContext ctx = new AnnotationConfigApplicationContext(configClass, ReactiveCacheableService.class); + ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class); - private final ReactiveCacheableService service; + Object key = new Object(); - public ReactiveCachingTests() { - this.ctx = new AnnotationConfigApplicationContext(TestConfig.class); - this.service = this.ctx.getBean(ReactiveCacheableService.class); - } + Long r1 = service.cacheFuture(key).join(); + Long r2 = service.cacheFuture(key).join(); + Long r3 = service.cacheFuture(key).join(); - @Test - void cache() { - Object key = new Object(); - Long r1 = this.service.cache(key).block(); - Long r2 = this.service.cache(key).block(); - Long r3 = this.service.cache(key).block(); + assertThat(r1).isNotNull(); + assertThat(r1).isSameAs(r2).isSameAs(r3); + + key = new Object(); + + r1 = service.cacheMono(key).block(); + r2 = service.cacheMono(key).block(); + r3 = service.cacheMono(key).block(); + + assertThat(r1).isNotNull(); + assertThat(r1).isSameAs(r2).isSameAs(r3); + + key = new Object(); + + r1 = service.cacheFlux(key).blockFirst(); + r2 = service.cacheFlux(key).blockFirst(); + r3 = service.cacheFlux(key).blockFirst(); + + assertThat(r1).isNotNull(); + assertThat(r1).isSameAs(r2).isSameAs(r3); + + key = new Object(); + + List l1 = service.cacheFlux(key).collectList().block(); + List l2 = service.cacheFlux(key).collectList().block(); + List l3 = service.cacheFlux(key).collectList().block(); + + assertThat(l1).isNotNull(); + assertThat(l1).isEqualTo(l2).isEqualTo(l3); + + key = new Object(); + + r1 = service.cacheMono(key).block(); + r2 = service.cacheMono(key).block(); + r3 = service.cacheMono(key).block(); + + assertThat(r1).isNotNull(); + assertThat(r1).isSameAs(r2).isSameAs(r3); + + // Same key as for Mono, reusing its cached value + + r1 = service.cacheFlux(key).blockFirst(); + r2 = service.cacheFlux(key).blockFirst(); + r3 = service.cacheFlux(key).blockFirst(); + + assertThat(r1).isNotNull(); assertThat(r1).isSameAs(r2).isSameAs(r3); } @@ -62,27 +116,78 @@ public class ReactiveCachingTests { private final AtomicLong counter = new AtomicLong(); @Cacheable - Mono cache(Object arg1) { + CompletableFuture cacheFuture(Object arg) { + return CompletableFuture.completedFuture(this.counter.getAndIncrement()); + } + + @Cacheable + Mono cacheMono(Object arg) { return Mono.just(this.counter.getAndIncrement()); } + @Cacheable + Flux cacheFlux(Object arg) { + return Flux.just(this.counter.getAndIncrement(), 0L); + } } @Configuration(proxyBeanMethods = false) @EnableCaching - static class TestConfig { + static class EarlyCacheHitDeterminationConfig { @Bean CacheManager cacheManager() { return new ConcurrentMapCacheManager("first"); } + } + + + @Configuration(proxyBeanMethods = false) + @EnableCaching + static class LateCacheHitDeterminationConfig { @Bean - ReactiveCacheableService reactiveCacheableService() { - return new ReactiveCacheableService(); + CacheManager cacheManager() { + return new ConcurrentMapCacheManager("first") { + @Override + protected Cache createConcurrentMapCache(String name) { + return new ConcurrentMapCache(name, isAllowNullValues()) { + @Override + public CompletableFuture retrieve(Object key) { + return CompletableFuture.completedFuture(lookup(key)); + } + }; + } + }; } + } + + + @Configuration(proxyBeanMethods = false) + @EnableCaching + static class LateCacheHitDeterminationWithValueWrapperConfig { + @Bean + CacheManager cacheManager() { + return new ConcurrentMapCacheManager("first") { + @Override + protected Cache createConcurrentMapCache(String name) { + return new ConcurrentMapCache(name, isAllowNullValues()) { + @Override + public CompletableFuture retrieve(Object key) { + Object value = lookup(key); + if (value != null) { + return CompletableFuture.completedFuture(new SimpleValueWrapper(fromStoreValue(value))); + } + else { + return CompletableFuture.completedFuture(null); + } + } + }; + } + }; + } } }