diff --git a/spring-context-support/src/main/java/org/springframework/cache/caffeine/CaffeineCache.java b/spring-context-support/src/main/java/org/springframework/cache/caffeine/CaffeineCache.java
index c47497f85e0..d73b130cbd2 100644
--- a/spring-context-support/src/main/java/org/springframework/cache/caffeine/CaffeineCache.java
+++ b/spring-context-support/src/main/java/org/springframework/cache/caffeine/CaffeineCache.java
@@ -17,8 +17,11 @@
package org.springframework.cache.caffeine;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
+import java.util.function.Supplier;
+import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.springframework.cache.support.AbstractValueAdaptingCache;
@@ -29,7 +32,11 @@ import org.springframework.util.Assert;
* Spring {@link org.springframework.cache.Cache} adapter implementation
* on top of a Caffeine {@link com.github.benmanes.caffeine.cache.Cache} instance.
*
- *
Requires Caffeine 2.1 or higher.
+ *
Supports the {@link #retrieve(Object)} and {@link #retrieve(Object, Supplier)}
+ * operations through Caffeine's {@link AsyncCache}, when provided via the
+ * {@link #CaffeineCache(String, AsyncCache, boolean)} constructor.
+ *
+ *
Requires Caffeine 3.0 or higher, as of Spring Framework 6.1.
*
* @author Ben Manes
* @author Juergen Hoeller
@@ -43,6 +50,9 @@ public class CaffeineCache extends AbstractValueAdaptingCache {
private final com.github.benmanes.caffeine.cache.Cache cache;
+ @Nullable
+ private AsyncCache asyncCache;
+
/**
* Create a {@link CaffeineCache} instance with the specified name and the
@@ -72,17 +82,51 @@ public class CaffeineCache extends AbstractValueAdaptingCache {
this.cache = cache;
}
+ /**
+ * Create a {@link CaffeineCache} instance with the specified name and the
+ * given internal {@link AsyncCache} to use.
+ * @param name the name of the cache
+ * @param cache the backing Caffeine Cache instance
+ * @param allowNullValues whether to accept and convert {@code null}
+ * values for this cache
+ * @since 6.1
+ */
+ public CaffeineCache(String name, AsyncCache cache, boolean allowNullValues) {
+ super(allowNullValues);
+ Assert.notNull(name, "Name must not be null");
+ Assert.notNull(cache, "Cache must not be null");
+ this.name = name;
+ this.cache = cache.synchronous();
+ this.asyncCache = cache;
+ }
+
@Override
public final String getName() {
return this.name;
}
+ /**
+ * Return the internal Caffeine Cache
+ * (possibly an adapter on top of an {@link #getAsyncCache()}).
+ */
@Override
public final com.github.benmanes.caffeine.cache.Cache getNativeCache() {
return this.cache;
}
+ /**
+ * Return the internal Caffeine AsyncCache.
+ * @throws IllegalStateException if no AsyncCache is available
+ * @see #CaffeineCache(String, AsyncCache, boolean)
+ * @see CaffeineCacheManager#setAsyncCacheMode
+ */
+ public final AsyncCache getAsyncCache() {
+ Assert.state(this.asyncCache != null,
+ "No Caffeine AsyncCache available: set CaffeineCacheManager.setAsyncCacheMode(true)");
+ return this.asyncCache;
+ }
+
@SuppressWarnings("unchecked")
@Override
@Nullable
@@ -90,6 +134,22 @@ public class CaffeineCache extends AbstractValueAdaptingCache {
return (T) fromStoreValue(this.cache.get(key, new LoadFunction(valueLoader)));
}
+ @Override
+ @Nullable
+ public CompletableFuture> retrieve(Object key) {
+ CompletableFuture> result = getAsyncCache().getIfPresent(key);
+ if (result != null && isAllowNullValues()) {
+ result = result.handle((value, ex) -> fromStoreValue(value));
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public CompletableFuture retrieve(Object key, Supplier> valueLoader) {
+ return (CompletableFuture) getAsyncCache().get(key, (k, e) -> valueLoader.get());
+ }
+
@Override
@Nullable
protected Object lookup(Object key) {
diff --git a/spring-context-support/src/main/java/org/springframework/cache/caffeine/CaffeineCacheManager.java b/spring-context-support/src/main/java/org/springframework/cache/caffeine/CaffeineCacheManager.java
index 239a7350cdc..bd2c2415c40 100644
--- a/spring-context-support/src/main/java/org/springframework/cache/caffeine/CaffeineCacheManager.java
+++ b/spring-context-support/src/main/java/org/springframework/cache/caffeine/CaffeineCacheManager.java
@@ -22,7 +22,10 @@ import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Supplier;
+import com.github.benmanes.caffeine.cache.AsyncCache;
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.CaffeineSpec;
@@ -45,7 +48,11 @@ import org.springframework.util.ObjectUtils;
* A {@link CaffeineSpec}-compliant expression value can also be applied
* via the {@link #setCacheSpecification "cacheSpecification"} bean property.
*
- * Requires Caffeine 2.1 or higher.
+ *
Supports the {@link Cache#retrieve(Object)} and
+ * {@link Cache#retrieve(Object, Supplier)} operations through Caffeine's
+ * {@link AsyncCache}, when configured via {@link #setAsyncCacheMode}.
+ *
+ *
Requires Caffeine 3.0 or higher, as of Spring Framework 6.1.
*
* @author Ben Manes
* @author Juergen Hoeller
@@ -54,13 +61,18 @@ import org.springframework.util.ObjectUtils;
* @author Brian Clozel
* @since 4.3
* @see CaffeineCache
+ * @see #setCaffeineSpec
+ * @see #setCacheSpecification
+ * @see #setAsyncCacheMode
*/
public class CaffeineCacheManager implements CacheManager {
private Caffeine cacheBuilder = Caffeine.newBuilder();
@Nullable
- private CacheLoader cacheLoader;
+ private AsyncCacheLoader cacheLoader;
+
+ private boolean asyncCacheMode = false;
private boolean allowNullValues = true;
@@ -110,7 +122,7 @@ public class CaffeineCacheManager implements CacheManager {
* Set the Caffeine to use for building each individual
* {@link CaffeineCache} instance.
* @see #createNativeCaffeineCache
- * @see com.github.benmanes.caffeine.cache.Caffeine#build()
+ * @see Caffeine#build()
*/
public void setCaffeine(Caffeine caffeine) {
Assert.notNull(caffeine, "Caffeine must not be null");
@@ -121,7 +133,7 @@ public class CaffeineCacheManager implements CacheManager {
* Set the {@link CaffeineSpec} to use for building each individual
* {@link CaffeineCache} instance.
* @see #createNativeCaffeineCache
- * @see com.github.benmanes.caffeine.cache.Caffeine#from(CaffeineSpec)
+ * @see Caffeine#from(CaffeineSpec)
*/
public void setCaffeineSpec(CaffeineSpec caffeineSpec) {
doSetCaffeine(Caffeine.from(caffeineSpec));
@@ -132,7 +144,7 @@ public class CaffeineCacheManager implements CacheManager {
* individual {@link CaffeineCache} instance. The given value needs to
* comply with Caffeine's {@link CaffeineSpec} (see its javadoc).
* @see #createNativeCaffeineCache
- * @see com.github.benmanes.caffeine.cache.Caffeine#from(String)
+ * @see Caffeine#from(String)
*/
public void setCacheSpecification(String cacheSpecification) {
doSetCaffeine(Caffeine.from(cacheSpecification));
@@ -149,7 +161,7 @@ public class CaffeineCacheManager implements CacheManager {
* Set the Caffeine CacheLoader to use for building each individual
* {@link CaffeineCache} instance, turning it into a LoadingCache.
* @see #createNativeCaffeineCache
- * @see com.github.benmanes.caffeine.cache.Caffeine#build(CacheLoader)
+ * @see Caffeine#build(CacheLoader)
* @see com.github.benmanes.caffeine.cache.LoadingCache
*/
public void setCacheLoader(CacheLoader cacheLoader) {
@@ -159,6 +171,45 @@ public class CaffeineCacheManager implements CacheManager {
}
}
+ /**
+ * Set the Caffeine AsyncCacheLoader to use for building each individual
+ * {@link CaffeineCache} instance, turning it into a LoadingCache.
+ * This implicitly switches the {@link #setAsyncCacheMode "asyncCacheMode"}
+ * flag to {@code true}.
+ * @since 6.1
+ * @see #createAsyncCaffeineCache
+ * @see Caffeine#buildAsync(AsyncCacheLoader)
+ * @see com.github.benmanes.caffeine.cache.LoadingCache
+ */
+ public void setAsyncCacheLoader(AsyncCacheLoader cacheLoader) {
+ if (!ObjectUtils.nullSafeEquals(this.cacheLoader, cacheLoader)) {
+ this.cacheLoader = cacheLoader;
+ this.asyncCacheMode = true;
+ refreshCommonCaches();
+ }
+ }
+
+ /**
+ * Set the common cache type that this cache manager builds to async.
+ * This applies to {@link #setCacheNames} as well as on-demand caches.
+ * Individual cache registrations (such as {@link #registerCustomCache(String, AsyncCache)}
+ * and {@link #registerCustomCache(String, com.github.benmanes.caffeine.cache.Cache)}
+ * are not dependent on this setting.
+ *
By default, this cache manager builds regular native Caffeine caches.
+ * To switch to async caches which can also be used through the synchronous API
+ * but come with support for {@code Cache#retrieve}, set this flag to {@code true}.
+ * @since 6.1
+ * @see Caffeine#buildAsync()
+ * @see Cache#retrieve(Object)
+ * @see Cache#retrieve(Object, Supplier)
+ */
+ public void setAsyncCacheMode(boolean asyncCacheMode) {
+ if (this.asyncCacheMode != asyncCacheMode) {
+ this.asyncCacheMode = asyncCacheMode;
+ refreshCommonCaches();
+ }
+ }
+
/**
* Specify whether to accept and convert {@code null} values for all caches
* in this cache manager.
@@ -211,13 +262,34 @@ public class CaffeineCacheManager implements CacheManager {
* @param name the name of the cache
* @param cache the custom Caffeine Cache instance to register
* @since 5.2.8
- * @see #adaptCaffeineCache
+ * @see #adaptCaffeineCache(String, com.github.benmanes.caffeine.cache.Cache)
*/
public void registerCustomCache(String name, com.github.benmanes.caffeine.cache.Cache cache) {
this.customCacheNames.add(name);
this.cacheMap.put(name, adaptCaffeineCache(name, cache));
}
+ /**
+ * Register the given Caffeine AsyncCache instance with this cache manager,
+ * adapting it to Spring's cache API for exposure through {@link #getCache}.
+ * Any number of such custom caches may be registered side by side.
+ * This allows for custom settings per cache (as opposed to all caches
+ * sharing the common settings in the cache manager's configuration) and
+ * is typically used with the Caffeine builder API:
+ * {@code registerCustomCache("myCache", Caffeine.newBuilder().maximumSize(10).build())}
+ *
Note that any other caches, whether statically specified through
+ * {@link #setCacheNames} or dynamically built on demand, still operate
+ * with the common settings in the cache manager's configuration.
+ * @param name the name of the cache
+ * @param cache the custom Caffeine Cache instance to register
+ * @since 6.1
+ * @see #adaptCaffeineCache(String, AsyncCache)
+ */
+ public void registerCustomCache(String name, AsyncCache cache) {
+ this.customCacheNames.add(name);
+ this.cacheMap.put(name, adaptCaffeineCache(name, cache));
+ }
+
/**
* Adapt the given new native Caffeine Cache instance to Spring's {@link Cache}
* abstraction for the specified cache name.
@@ -225,13 +297,27 @@ public class CaffeineCacheManager implements CacheManager {
* @param cache the native Caffeine Cache instance
* @return the Spring CaffeineCache adapter (or a decorator thereof)
* @since 5.2.8
- * @see CaffeineCache
+ * @see CaffeineCache#CaffeineCache(String, com.github.benmanes.caffeine.cache.Cache, boolean)
* @see #isAllowNullValues()
*/
protected Cache adaptCaffeineCache(String name, com.github.benmanes.caffeine.cache.Cache cache) {
return new CaffeineCache(name, cache, isAllowNullValues());
}
+ /**
+ * Adapt the given new Caffeine AsyncCache instance to Spring's {@link Cache}
+ * abstraction for the specified cache name.
+ * @param name the name of the cache
+ * @param cache the Caffeine AsyncCache instance
+ * @return the Spring CaffeineCache adapter (or a decorator thereof)
+ * @since 6.1
+ * @see CaffeineCache#CaffeineCache(String, AsyncCache, boolean)
+ * @see #isAllowNullValues()
+ */
+ protected Cache adaptCaffeineCache(String name, AsyncCache cache) {
+ return new CaffeineCache(name, cache, isAllowNullValues());
+ }
+
/**
* Build a common {@link CaffeineCache} instance for the specified cache name,
* using the common Caffeine configuration specified on this cache manager.
@@ -244,7 +330,8 @@ public class CaffeineCacheManager implements CacheManager {
* @see #createNativeCaffeineCache
*/
protected Cache createCaffeineCache(String name) {
- return adaptCaffeineCache(name, createNativeCaffeineCache(name));
+ return (this.asyncCacheMode ? adaptCaffeineCache(name, createAsyncCaffeineCache(name)) :
+ adaptCaffeineCache(name, createNativeCaffeineCache(name)));
}
/**
@@ -255,7 +342,29 @@ public class CaffeineCacheManager implements CacheManager {
* @see #createCaffeineCache
*/
protected com.github.benmanes.caffeine.cache.Cache createNativeCaffeineCache(String name) {
- return (this.cacheLoader != null ? this.cacheBuilder.build(this.cacheLoader) : this.cacheBuilder.build());
+ if (this.cacheLoader != null) {
+ if (this.cacheLoader instanceof CacheLoader regularCacheLoader) {
+ return this.cacheBuilder.build(regularCacheLoader);
+ }
+ else {
+ throw new IllegalStateException(
+ "Cannot create regular Caffeine Cache with async-only cache loader: " + this.cacheLoader);
+ }
+ }
+ return this.cacheBuilder.build();
+ }
+
+ /**
+ * Build a common Caffeine AsyncCache instance for the specified cache name,
+ * using the common Caffeine configuration specified on this cache manager.
+ * @param name the name of the cache
+ * @return the Caffeine AsyncCache instance
+ * @since 6.1
+ * @see #createCaffeineCache
+ */
+ protected AsyncCache createAsyncCaffeineCache(String name) {
+ return (this.cacheLoader != null ? this.cacheBuilder.buildAsync(this.cacheLoader) :
+ this.cacheBuilder.buildAsync());
}
/**
diff --git a/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java b/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java
index 33571ffc905..1c14c85a4b8 100644
--- a/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java
+++ b/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2020 the original author or authors.
+ * Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,8 @@
package org.springframework.cache.transaction;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
import org.springframework.cache.Cache;
import org.springframework.lang.Nullable;
@@ -91,6 +93,17 @@ public class TransactionAwareCacheDecorator implements Cache {
return this.targetCache.get(key, valueLoader);
}
+ @Override
+ @Nullable
+ public CompletableFuture> retrieve(Object key) {
+ return this.targetCache.retrieve(key);
+ }
+
+ @Override
+ public CompletableFuture retrieve(Object key, Supplier> valueLoader) {
+ return this.targetCache.retrieve(key, valueLoader);
+ }
+
@Override
public void put(final Object key, @Nullable final Object value) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
diff --git a/spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineCacheManagerTests.java b/spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineCacheManagerTests.java
index fd2943c1919..a9f9858087e 100644
--- a/spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineCacheManagerTests.java
+++ b/spring-context-support/src/test/java/org/springframework/cache/caffeine/CaffeineCacheManagerTests.java
@@ -16,6 +16,8 @@
package org.springframework.cache.caffeine;
+import java.util.concurrent.CompletableFuture;
+
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.CaffeineSpec;
@@ -26,6 +28,7 @@ import org.springframework.cache.CacheManager;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.mockito.Mockito.mock;
/**
@@ -38,22 +41,26 @@ public class CaffeineCacheManagerTests {
@Test
public void testDynamicMode() {
CacheManager cm = new CaffeineCacheManager();
+
Cache cache1 = cm.getCache("c1");
- boolean condition2 = cache1 instanceof CaffeineCache;
- assertThat(condition2).isTrue();
+ assertThat(cache1).isInstanceOf(CaffeineCache.class);
Cache cache1again = cm.getCache("c1");
assertThat(cache1).isSameAs(cache1again);
Cache cache2 = cm.getCache("c2");
- boolean condition1 = cache2 instanceof CaffeineCache;
- assertThat(condition1).isTrue();
+ assertThat(cache2).isInstanceOf(CaffeineCache.class);
Cache cache2again = cm.getCache("c2");
assertThat(cache2).isSameAs(cache2again);
Cache cache3 = cm.getCache("c3");
- boolean condition = cache3 instanceof CaffeineCache;
- assertThat(condition).isTrue();
+ assertThat(cache3).isInstanceOf(CaffeineCache.class);
Cache cache3again = cm.getCache("c3");
assertThat(cache3).isSameAs(cache3again);
+ assertThatIllegalStateException().isThrownBy(() -> cache1.retrieve("key1"));
+ assertThatIllegalStateException().isThrownBy(() -> cache1.retrieve("key2"));
+ assertThatIllegalStateException().isThrownBy(() -> cache1.retrieve("key3"));
+ assertThatIllegalStateException().isThrownBy(() -> cache1.retrieve("key3",
+ () -> CompletableFuture.completedFuture("value3")));
+
cache1.put("key1", "value1");
assertThat(cache1.get("key1").get()).isEqualTo("value1");
cache1.put("key2", 2);
@@ -62,19 +69,23 @@ public class CaffeineCacheManagerTests {
assertThat(cache1.get("key3").get()).isNull();
cache1.evict("key3");
assertThat(cache1.get("key3")).isNull();
+ assertThat(cache1.get("key3", () -> "value3")).isEqualTo("value3");
+ assertThat(cache1.get("key3", () -> "value3")).isEqualTo("value3");
+ cache1.evict("key3");
+ assertThat(cache1.get("key3", () -> (String) null)).isNull();
+ assertThat(cache1.get("key3", () -> (String) null)).isNull();
}
@Test
public void testStaticMode() {
CaffeineCacheManager cm = new CaffeineCacheManager("c1", "c2");
+
Cache cache1 = cm.getCache("c1");
- boolean condition3 = cache1 instanceof CaffeineCache;
- assertThat(condition3).isTrue();
+ assertThat(cache1).isInstanceOf(CaffeineCache.class);
Cache cache1again = cm.getCache("c1");
assertThat(cache1).isSameAs(cache1again);
Cache cache2 = cm.getCache("c2");
- boolean condition2 = cache2 instanceof CaffeineCache;
- assertThat(condition2).isTrue();
+ assertThat(cache2).isInstanceOf(CaffeineCache.class);
Cache cache2again = cm.getCache("c2");
assertThat(cache2).isSameAs(cache2again);
Cache cache3 = cm.getCache("c3");
@@ -88,15 +99,24 @@ public class CaffeineCacheManagerTests {
assertThat(cache1.get("key3").get()).isNull();
cache1.evict("key3");
assertThat(cache1.get("key3")).isNull();
+ assertThat(cache1.get("key3", () -> "value3")).isEqualTo("value3");
+ assertThat(cache1.get("key3", () -> "value3")).isEqualTo("value3");
+ cache1.evict("key3");
+ assertThat(cache1.get("key3", () -> (String) null)).isNull();
+ assertThat(cache1.get("key3", () -> (String) null)).isNull();
+
+ assertThatIllegalStateException().isThrownBy(() -> cache1.retrieve("key1"));
+ assertThatIllegalStateException().isThrownBy(() -> cache1.retrieve("key2"));
+ assertThatIllegalStateException().isThrownBy(() -> cache1.retrieve("key3"));
+ assertThatIllegalStateException().isThrownBy(() -> cache1.retrieve("key3",
+ () -> CompletableFuture.completedFuture("value3")));
cm.setAllowNullValues(false);
Cache cache1x = cm.getCache("c1");
- boolean condition1 = cache1x instanceof CaffeineCache;
- assertThat(condition1).isTrue();
+ assertThat(cache1x).isInstanceOf(CaffeineCache.class);
assertThat(cache1x).isNotSameAs(cache1);
Cache cache2x = cm.getCache("c2");
- boolean condition = cache2x instanceof CaffeineCache;
- assertThat(condition).isTrue();
+ assertThat(cache2x).isInstanceOf(CaffeineCache.class);
assertThat(cache2x).isNotSameAs(cache2);
Cache cache3x = cm.getCache("c3");
assertThat(cache3x).isNull();
@@ -115,6 +135,52 @@ public class CaffeineCacheManagerTests {
assertThat(cache1y.get("key3")).isNull();
}
+ @Test
+ public void testAsyncMode() {
+ CaffeineCacheManager cm = new CaffeineCacheManager();
+ cm.setAsyncCacheMode(true);
+
+ Cache cache1 = cm.getCache("c1");
+ assertThat(cache1).isInstanceOf(CaffeineCache.class);
+ Cache cache1again = cm.getCache("c1");
+ assertThat(cache1).isSameAs(cache1again);
+ Cache cache2 = cm.getCache("c2");
+ assertThat(cache2).isInstanceOf(CaffeineCache.class);
+ Cache cache2again = cm.getCache("c2");
+ assertThat(cache2).isSameAs(cache2again);
+ Cache cache3 = cm.getCache("c3");
+ assertThat(cache3).isInstanceOf(CaffeineCache.class);
+ Cache cache3again = cm.getCache("c3");
+ assertThat(cache3).isSameAs(cache3again);
+
+ cache1.put("key1", "value1");
+ assertThat(cache1.get("key1").get()).isEqualTo("value1");
+ cache1.put("key2", 2);
+ assertThat(cache1.get("key2").get()).isEqualTo(2);
+ cache1.put("key3", null);
+ assertThat(cache1.get("key3").get()).isNull();
+ cache1.evict("key3");
+ assertThat(cache1.get("key3")).isNull();
+ assertThat(cache1.get("key3", () -> "value3")).isEqualTo("value3");
+ assertThat(cache1.get("key3", () -> "value3")).isEqualTo("value3");
+ cache1.evict("key3");
+ assertThat(cache1.get("key3", () -> (String) null)).isNull();
+ assertThat(cache1.get("key3", () -> (String) null)).isNull();
+
+ assertThat(cache1.retrieve("key1").join()).isEqualTo("value1");
+ assertThat(cache1.retrieve("key2").join()).isEqualTo(2);
+ assertThat(cache1.retrieve("key3").join()).isNull();
+ cache1.evict("key3");
+ assertThat(cache1.retrieve("key3")).isNull();
+ assertThat(cache1.retrieve("key3", () -> CompletableFuture.completedFuture("value3")).join())
+ .isEqualTo("value3");
+ assertThat(cache1.retrieve("key3", () -> CompletableFuture.completedFuture("value3")).join())
+ .isEqualTo("value3");
+ cache1.evict("key3");
+ assertThat(cache1.retrieve("key3", () -> CompletableFuture.completedFuture(null)).join()).isNull();
+ assertThat(cache1.retrieve("key3", () -> CompletableFuture.completedFuture(null)).join()).isNull();
+ }
+
@Test
public void changeCaffeineRecreateCache() {
CaffeineCacheManager cm = new CaffeineCacheManager("c1");
@@ -190,7 +256,7 @@ public class CaffeineCacheManagerTests {
assertThat(value.get()).isEqualTo("pong");
assertThatIllegalArgumentException().isThrownBy(() -> assertThat(cache1.get("foo")).isNull())
- .withMessageContaining("I only know ping");
+ .withMessageContaining("I only know ping");
}
@Test
diff --git a/spring-context/src/main/java/org/springframework/cache/Cache.java b/spring-context/src/main/java/org/springframework/cache/Cache.java
index 8a3b904f490..63f93f32ea4 100644
--- a/spring-context/src/main/java/org/springframework/cache/Cache.java
+++ b/spring-context/src/main/java/org/springframework/cache/Cache.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2022 the original author or authors.
+ * Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,20 +17,28 @@
package org.springframework.cache;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
import org.springframework.lang.Nullable;
/**
* Interface that defines common cache operations.
*
- * Note: Due to the generic use of caching, it is recommended that
- * implementations allow storage of {@code null} values (for example to
- * cache methods that return {@code null}).
+ * Serves as an SPI for Spring's annotation-based caching model
+ * ({@link org.springframework.cache.annotation.Cacheable} and co)
+ * as well as an API for direct usage in applications.
+ *
+ *
Note: Due to the generic use of caching, it is recommended
+ * that implementations allow storage of {@code null} values
+ * (for example to cache methods that return {@code null}).
*
* @author Costin Leau
* @author Juergen Hoeller
* @author Stephane Nicoll
* @since 3.1
+ * @see CacheManager
+ * @see org.springframework.cache.annotation.Cacheable
*/
public interface Cache {
@@ -100,6 +108,51 @@ public interface Cache {
@Nullable
T get(Object key, Callable valueLoader);
+ /**
+ * Return the value to which this cache maps the specified key,
+ * wrapped in a {@link CompletableFuture}. This operation must not block
+ * but is allowed to return a completed {@link CompletableFuture} if the
+ * corresponding value is immediately available.
+ * Returns {@code null} if the cache contains no mapping for this key;
+ * otherwise, the cached value (which may be {@code null} itself) will
+ * be returned in the {@link CompletableFuture}.
+ * @param key the key whose associated value is to be returned
+ * @return the value to which this cache maps the specified key,
+ * contained within a {@link CompletableFuture} which may also hold
+ * a cached {@code null} value. A straight {@code null} being
+ * returned means that the cache contains no mapping for this key.
+ * @since 6.1
+ * @see #get(Object)
+ */
+ @Nullable
+ default CompletableFuture> retrieve(Object key) {
+ throw new UnsupportedOperationException(
+ getClass().getName() + " does not support CompletableFuture-based retrieval");
+ }
+
+ /**
+ * Return the value to which this cache maps the specified key, obtaining
+ * that value from {@code valueLoader} if necessary. This method provides
+ * a simple substitute for the conventional "if cached, return; otherwise
+ * create, cache and return" pattern, based on {@link CompletableFuture}.
+ * This operation must not block.
+ *
If possible, implementations should ensure that the loading operation
+ * is synchronized so that the specified {@code valueLoader} is only called
+ * once in case of concurrent access on the same key.
+ *
If the {@code valueLoader} throws an exception, it will be propagated
+ * to the {@code CompletableFuture} handle returned from here.
+ * @param key the key whose associated value is to be returned
+ * @return the value to which this cache maps the specified key,
+ * contained within a {@link CompletableFuture}
+ * @since 6.1
+ * @see #retrieve(Object)
+ * @see #get(Object, Callable)
+ */
+ default CompletableFuture retrieve(Object key, Supplier> valueLoader) {
+ throw new UnsupportedOperationException(
+ getClass().getName() + " does not support CompletableFuture-based retrieval");
+ }
+
/**
* Associate the specified value with the specified key in this cache.
* If the cache previously contained a mapping for this key, the old
@@ -108,6 +161,11 @@ public interface Cache {
* fashion, with subsequent lookups possibly not seeing the entry yet.
* This may for example be the case with transactional cache decorators.
* Use {@link #putIfAbsent} for guaranteed immediate registration.
+ *
If the cache is supposed to be compatible with {@link CompletableFuture}
+ * and reactive interactions, the put operation needs to be effectively
+ * non-blocking, with any backend write-through happening asynchronously.
+ * This goes along with a cache implemented and configured to support
+ * {@link #retrieve(Object)} and {@link #retrieve(Object, Supplier)}.
* @param key the key with which the specified value is to be associated
* @param value the value to be associated with the specified key
* @see #putIfAbsent(Object, Object)
@@ -156,6 +214,11 @@ public interface Cache {
* fashion, with subsequent lookups possibly still seeing the entry.
* This may for example be the case with transactional cache decorators.
* Use {@link #evictIfPresent} for guaranteed immediate removal.
+ *
If the cache is supposed to be compatible with {@link CompletableFuture}
+ * and reactive interactions, the evict operation needs to be effectively
+ * non-blocking, with any backend write-through happening asynchronously.
+ * This goes along with a cache implemented and configured to support
+ * {@link #retrieve(Object)} and {@link #retrieve(Object, Supplier)}.
* @param key the key whose mapping is to be removed from the cache
* @see #evictIfPresent(Object)
*/
@@ -188,6 +251,11 @@ public interface Cache {
* fashion, with subsequent lookups possibly still seeing the entries.
* This may for example be the case with transactional cache decorators.
* Use {@link #invalidate()} for guaranteed immediate removal of entries.
+ *
If the cache is supposed to be compatible with {@link CompletableFuture}
+ * and reactive interactions, the clear operation needs to be effectively
+ * non-blocking, with any backend write-through happening asynchronously.
+ * This goes along with a cache implemented and configured to support
+ * {@link #retrieve(Object)} and {@link #retrieve(Object, Supplier)}.
* @see #invalidate()
*/
void clear();
diff --git a/spring-context/src/main/java/org/springframework/cache/concurrent/ConcurrentMapCache.java b/spring-context/src/main/java/org/springframework/cache/concurrent/ConcurrentMapCache.java
index 1a17605578a..10648155c43 100644
--- a/spring-context/src/main/java/org/springframework/cache/concurrent/ConcurrentMapCache.java
+++ b/spring-context/src/main/java/org/springframework/cache/concurrent/ConcurrentMapCache.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2020 the original author or authors.
+ * Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,8 +17,11 @@
package org.springframework.cache.concurrent;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Supplier;
import org.springframework.cache.support.AbstractValueAdaptingCache;
import org.springframework.core.serializer.support.SerializationDelegate;
@@ -26,13 +29,17 @@ import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
/**
- * Simple {@link org.springframework.cache.Cache} implementation based on the
- * core JDK {@code java.util.concurrent} package.
+ * Simple {@link org.springframework.cache.Cache} implementation based on the core
+ * JDK {@code java.util.concurrent} package.
*
*
Useful for testing or simple caching scenarios, typically in combination
* with {@link org.springframework.cache.support.SimpleCacheManager} or
* dynamically through {@link ConcurrentMapCacheManager}.
*
+ *
Supports the {@link #retrieve(Object)} and {@link #retrieve(Object, Supplier)}
+ * operations in a best-effort fashion, relying on default {@link CompletableFuture}
+ * execution (typically within the JVM's {@link ForkJoinPool#commonPool()}).
+ *
*
Note: As {@link ConcurrentHashMap} (the default implementation used)
* does not allow for {@code null} values to be stored, this class will replace
* them with a predefined internal object. This behavior can be changed through the
@@ -149,6 +156,20 @@ public class ConcurrentMapCache extends AbstractValueAdaptingCache {
}));
}
+ @Override
+ @Nullable
+ public CompletableFuture> retrieve(Object key) {
+ Object value = lookup(key);
+ return (value != null ? CompletableFuture.completedFuture(fromStoreValue(value)) : null);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public CompletableFuture retrieve(Object key, Supplier> valueLoader) {
+ return CompletableFuture.supplyAsync(() ->
+ (T) fromStoreValue(this.store.computeIfAbsent(key, k -> toStoreValue(valueLoader.get().join()))));
+ }
+
@Override
public void put(Object key, @Nullable Object value) {
this.store.put(key, toStoreValue(value));
diff --git a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java
index fe60df9d6b6..695c53471c3 100644
--- a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java
+++ b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java
@@ -24,11 +24,16 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.support.AopUtils;
@@ -43,6 +48,8 @@ import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.context.expression.AnnotatedElementKey;
import org.springframework.core.BridgeMethodResolver;
+import org.springframework.core.ReactiveAdapter;
+import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.expression.EvaluationContext;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@@ -83,12 +90,18 @@ import org.springframework.util.function.SupplierUtils;
public abstract class CacheAspectSupport extends AbstractCacheInvoker
implements BeanFactoryAware, InitializingBean, SmartInitializingSingleton {
+ private static final boolean reactiveStreamsPresent = ClassUtils.isPresent(
+ "org.reactivestreams.Publisher", CacheAspectSupport.class.getClassLoader());
+
protected final Log logger = LogFactory.getLog(getClass());
private final Map metadataCache = new ConcurrentHashMap<>(1024);
private final CacheOperationExpressionEvaluator evaluator = new CacheOperationExpressionEvaluator();
+ @Nullable
+ private final ReactiveCachingHandler reactiveCachingHandler;
+
@Nullable
private CacheOperationSource cacheOperationSource;
@@ -103,6 +116,11 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
private boolean initialized = false;
+ protected CacheAspectSupport() {
+ this.reactiveCachingHandler = (reactiveStreamsPresent ? new ReactiveCachingHandler() : null);
+ }
+
+
/**
* Configure this aspect with the given error handler, key generator and cache resolver/manager
* suppliers, applying the corresponding default if a supplier is not resolvable.
@@ -371,41 +389,25 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
}
@Nullable
- private Object execute(final CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) {
- // Special handling of synchronized invocation
+ private Object execute(CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) {
if (contexts.isSynchronized()) {
- CacheOperationContext context = contexts.get(CacheableOperation.class).iterator().next();
- if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) {
- Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT);
- Cache cache = context.getCaches().iterator().next();
- try {
- return wrapCacheValue(method, handleSynchronizedGet(invoker, key, cache));
- }
- catch (Cache.ValueRetrievalException ex) {
- // Directly propagate ThrowableWrapper from the invoker,
- // or potentially also an IllegalArgumentException etc.
- ReflectionUtils.rethrowRuntimeException(ex.getCause());
- }
- }
- else {
- // No caching required, just call the underlying method
- return invokeOperation(invoker);
- }
+ // Special handling of synchronized invocation
+ return executeSynchronized(invoker, method, contexts);
}
// Process any early evictions
processCacheEvicts(contexts.get(CacheEvictOperation.class), true,
CacheOperationExpressionEvaluator.NO_RESULT);
- // Check if we have a cached item matching the conditions
- Cache.ValueWrapper cacheHit = findCachedItem(contexts.get(CacheableOperation.class));
+ // Check if we have a cached value matching the conditions
+ Object cacheHit = findCachedValue(contexts.get(CacheableOperation.class));
Object cacheValue;
Object returnValue;
if (cacheHit != null && !hasCachePut(contexts)) {
// If there are no put requests, just use the cache hit
- cacheValue = cacheHit.get();
+ cacheValue = (cacheHit instanceof Cache.ValueWrapper wrapper ? wrapper.get() : cacheHit);
returnValue = wrapCacheValue(method, cacheValue);
}
else {
@@ -414,8 +416,8 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
cacheValue = unwrapReturnValue(returnValue);
}
- // Collect puts from any @Cacheable miss, if no cached item is found
- List cachePutRequests = new ArrayList<>();
+ // Collect puts from any @Cacheable miss, if no cached value is found
+ List cachePutRequests = new ArrayList<>(1);
if (cacheHit == null) {
collectPutRequests(contexts.get(CacheableOperation.class), cacheValue, cachePutRequests);
}
@@ -425,29 +427,52 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
// Process any collected put requests, either from @CachePut or a @Cacheable miss
for (CachePutRequest cachePutRequest : cachePutRequests) {
- cachePutRequest.apply(cacheValue);
+ Object returnOverride = cachePutRequest.apply(cacheValue);
+ if (returnOverride != null) {
+ returnValue = returnOverride;
+ }
}
// Process any late evictions
- processCacheEvicts(contexts.get(CacheEvictOperation.class), false, cacheValue);
+ Object returnOverride = processCacheEvicts(
+ contexts.get(CacheEvictOperation.class), false, returnValue);
+ if (returnOverride != null) {
+ returnValue = returnOverride;
+ }
return returnValue;
}
@Nullable
- private Object handleSynchronizedGet(CacheOperationInvoker invoker, Object key, Cache cache) {
- InvocationAwareResult invocationResult = new InvocationAwareResult();
- Object result = cache.get(key, () -> {
- invocationResult.invoked = true;
- if (logger.isTraceEnabled()) {
- logger.trace("No cache entry for key '" + key + "' in cache " + cache.getName());
+ private Object executeSynchronized(CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) {
+ CacheOperationContext context = contexts.get(CacheableOperation.class).iterator().next();
+ if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) {
+ Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT);
+ Cache cache = context.getCaches().iterator().next();
+ if (CompletableFuture.class.isAssignableFrom(method.getReturnType())) {
+ return cache.retrieve(key, () -> (CompletableFuture>) invokeOperation(invoker));
+ }
+ if (this.reactiveCachingHandler != null) {
+ Object returnValue = this.reactiveCachingHandler.executeSynchronized(invoker, method, cache, key);
+ if (returnValue != ReactiveCachingHandler.NOT_HANDLED) {
+ return returnValue;
+ }
}
- return unwrapReturnValue(invokeOperation(invoker));
- });
- if (!invocationResult.invoked && logger.isTraceEnabled()) {
- logger.trace("Cache entry for key '" + key + "' found in cache '" + cache.getName() + "'");
+ try {
+ return wrapCacheValue(method, cache.get(key, () -> unwrapReturnValue(invokeOperation(invoker))));
+ }
+ catch (Cache.ValueRetrievalException ex) {
+ // Directly propagate ThrowableWrapper from the invoker,
+ // or potentially also an IllegalArgumentException etc.
+ ReflectionUtils.rethrowRuntimeException(ex.getCause());
+ // Never reached
+ return null;
+ }
+ }
+ else {
+ // No caching required, just call the underlying method
+ return invokeOperation(invoker);
}
- return result;
}
@Nullable
@@ -467,7 +492,7 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
private boolean hasCachePut(CacheOperationContexts contexts) {
// Evaluate the conditions *without* the result object because we don't have it yet...
Collection cachePutContexts = contexts.get(CachePutOperation.class);
- Collection excluded = new ArrayList<>();
+ Collection excluded = new ArrayList<>(1);
for (CacheOperationContext context : cachePutContexts) {
try {
if (!context.isConditionPassing(CacheOperationExpressionEvaluator.RESULT_UNAVAILABLE)) {
@@ -482,32 +507,55 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
return (cachePutContexts.size() != excluded.size());
}
- private void processCacheEvicts(
- Collection contexts, boolean beforeInvocation, @Nullable Object result) {
+ @Nullable
+ private Object processCacheEvicts(Collection contexts, boolean beforeInvocation,
+ @Nullable Object result) {
- for (CacheOperationContext context : contexts) {
- CacheEvictOperation operation = (CacheEvictOperation) context.metadata.operation;
- if (beforeInvocation == operation.isBeforeInvocation() && isConditionPassing(context, result)) {
- performCacheEvict(context, operation, result);
+ if (contexts.isEmpty()) {
+ return null;
+ }
+ List applicable = contexts.stream()
+ .filter(context -> (context.metadata.operation instanceof CacheEvictOperation evict &&
+ beforeInvocation == evict.isBeforeInvocation())).toList();
+ if (applicable.isEmpty()) {
+ return null;
+ }
+
+ if (result instanceof CompletableFuture> future) {
+ return future.whenComplete((value, ex) -> {
+ if (ex == null) {
+ performCacheEvicts(applicable, result);
+ }
+ });
+ }
+ if (this.reactiveCachingHandler != null) {
+ Object returnValue = this.reactiveCachingHandler.processCacheEvicts(applicable, result);
+ if (returnValue != ReactiveCachingHandler.NOT_HANDLED) {
+ return returnValue;
}
}
+ performCacheEvicts(applicable, result);
+ return null;
}
- private void performCacheEvict(
- CacheOperationContext context, CacheEvictOperation operation, @Nullable Object result) {
-
- Object key = null;
- for (Cache cache : context.getCaches()) {
- if (operation.isCacheWide()) {
- logInvalidating(context, operation, null);
- doClear(cache, operation.isBeforeInvocation());
- }
- else {
- if (key == null) {
- key = generateKey(context, result);
+ private void performCacheEvicts(List contexts, @Nullable Object result) {
+ for (CacheOperationContext context : contexts) {
+ CacheEvictOperation operation = (CacheEvictOperation) context.metadata.operation;
+ if (isConditionPassing(context, result)) {
+ Object key = null;
+ for (Cache cache : context.getCaches()) {
+ if (operation.isCacheWide()) {
+ logInvalidating(context, operation, null);
+ doClear(cache, operation.isBeforeInvocation());
+ }
+ else {
+ if (key == null) {
+ key = generateKey(context, result);
+ }
+ logInvalidating(context, operation, key);
+ doEvict(cache, key, operation.isBeforeInvocation());
+ }
}
- logInvalidating(context, operation, key);
- doEvict(cache, key, operation.isBeforeInvocation());
}
}
}
@@ -520,19 +568,21 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
}
/**
- * Find a cached item only for {@link CacheableOperation} that passes the condition.
+ * Find a cached value only for {@link CacheableOperation} that passes the condition.
* @param contexts the cacheable operations
- * @return a {@link Cache.ValueWrapper} holding the cached item,
+ * @return a {@link Cache.ValueWrapper} holding the cached value,
* or {@code null} if none is found
*/
@Nullable
- private Cache.ValueWrapper findCachedItem(Collection contexts) {
- Object result = CacheOperationExpressionEvaluator.NO_RESULT;
+ private Object findCachedValue(Collection contexts) {
for (CacheOperationContext context : contexts) {
- if (isConditionPassing(context, result)) {
- Object key = generateKey(context, result);
- Cache.ValueWrapper cached = findInCaches(context, key);
+ if (isConditionPassing(context, CacheOperationExpressionEvaluator.NO_RESULT)) {
+ Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT);
+ Object cached = findInCaches(context, key);
if (cached != null) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Cache entry for key '" + key + "' found in cache(s) " + context.getCacheNames());
+ }
return cached;
}
else {
@@ -547,9 +597,9 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
/**
* Collect the {@link CachePutRequest} for all {@link CacheOperation} using
- * the specified result item.
+ * the specified result value.
* @param contexts the contexts to handle
- * @param result the result item (never {@code null})
+ * @param result the result value (never {@code null})
* @param putRequests the collection to update
*/
private void collectPutRequests(Collection contexts,
@@ -564,15 +614,18 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
}
@Nullable
- private Cache.ValueWrapper findInCaches(CacheOperationContext context, Object key) {
+ private Object findInCaches(CacheOperationContext context, Object key) {
for (Cache cache : context.getCaches()) {
- Cache.ValueWrapper wrapper = doGet(cache, key);
- if (wrapper != null) {
- if (logger.isTraceEnabled()) {
- logger.trace("Cache entry for key '" + key + "' found in cache '" + cache.getName() + "'");
+ if (CompletableFuture.class.isAssignableFrom(context.getMethod().getReturnType())) {
+ return cache.retrieve(key);
+ }
+ if (this.reactiveCachingHandler != null) {
+ Object returnValue = this.reactiveCachingHandler.findInCaches(context, cache, key);
+ if (returnValue != ReactiveCachingHandler.NOT_HANDLED) {
+ return returnValue;
}
- return wrapper;
}
+ return doGet(cache, key);
}
return null;
}
@@ -625,13 +678,13 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
}
private boolean determineSyncFlag(Method method) {
- List cacheOperationContexts = this.contexts.get(CacheableOperation.class);
- if (cacheOperationContexts == null) { // no @Cacheable operation at all
+ List cacheableContexts = this.contexts.get(CacheableOperation.class);
+ if (cacheableContexts == null) { // no @Cacheable operation at all
return false;
}
boolean syncEnabled = false;
- for (CacheOperationContext cacheOperationContext : cacheOperationContexts) {
- if (((CacheableOperation) cacheOperationContext.getOperation()).isSync()) {
+ for (CacheOperationContext context : cacheableContexts) {
+ if (context.getOperation() instanceof CacheableOperation cacheable && cacheable.isSync()) {
syncEnabled = true;
break;
}
@@ -641,13 +694,13 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
throw new IllegalStateException(
"A sync=true operation cannot be combined with other cache operations on '" + method + "'");
}
- if (cacheOperationContexts.size() > 1) {
+ if (cacheableContexts.size() > 1) {
throw new IllegalStateException(
"Only one sync=true operation is allowed on '" + method + "'");
}
- CacheOperationContext cacheOperationContext = cacheOperationContexts.iterator().next();
- CacheOperation operation = cacheOperationContext.getOperation();
- if (cacheOperationContext.getCaches().size() > 1) {
+ CacheOperationContext cacheableContext = cacheableContexts.iterator().next();
+ CacheOperation operation = cacheableContext.getOperation();
+ if (cacheableContext.getCaches().size() > 1) {
throw new IllegalStateException(
"A sync=true operation is restricted to a single cache on '" + operation + "'");
}
@@ -720,7 +773,7 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
this.args = extractArgs(metadata.method, args);
this.target = target;
this.caches = CacheAspectSupport.this.getCaches(this, metadata.cacheResolver);
- this.cacheNames = createCacheNames(this.caches);
+ this.cacheNames = prepareCacheNames(this.caches);
}
@Override
@@ -808,8 +861,8 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
return this.cacheNames;
}
- private Collection createCacheNames(Collection extends Cache> caches) {
- Collection names = new ArrayList<>();
+ private Collection prepareCacheNames(Collection extends Cache> caches) {
+ Collection names = new ArrayList<>(caches.size());
for (Cache cache : caches) {
names.add(cache.getName());
}
@@ -818,25 +871,6 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
}
- private class CachePutRequest {
-
- private final CacheOperationContext context;
-
- private final Object key;
-
- public CachePutRequest(CacheOperationContext context, Object key) {
- this.context = context;
- this.key = key;
- }
-
- public void apply(@Nullable Object result) {
- for (Cache cache : this.context.getCaches()) {
- doPut(cache, this.key, result);
- }
- }
- }
-
-
private static final class CacheOperationCacheKey implements Comparable {
private final CacheOperation cacheOperation;
@@ -876,12 +910,168 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker
}
+ private class CachePutRequest {
+
+ private final CacheOperationContext context;
+
+ private final Object key;
+
+ public CachePutRequest(CacheOperationContext context, Object key) {
+ this.context = context;
+ this.key = key;
+ }
+
+ @Nullable
+ public Object apply(@Nullable Object result) {
+ if (result instanceof CompletableFuture> future) {
+ return future.whenComplete((value, ex) -> {
+ if (ex != null) {
+ performEvict(ex);
+ }
+ else {
+ performPut(value);
+ }
+ });
+ }
+ if (reactiveCachingHandler != null) {
+ Object returnValue = reactiveCachingHandler.processPutRequest(this, result);
+ if (returnValue != ReactiveCachingHandler.NOT_HANDLED) {
+ return returnValue;
+ }
+ }
+ performPut(result);
+ return null;
+ }
+
+ void performPut(@Nullable Object value) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Creating cache entry for key '" + this.key + "' in cache(s) " +
+ this.context.getCacheNames());
+ }
+ for (Cache cache : this.context.getCaches()) {
+ doPut(cache, this.key, value);
+ }
+ }
+
+ void performEvict(Throwable cause) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Removing cache entry for key '" + this.key + "' from cache(s) " +
+ this.context.getCacheNames() + " due to exception: " + cause);
+ }
+ for (Cache cache : this.context.getCaches()) {
+ doEvict(cache, this.key, false);
+ }
+ }
+ }
+
+
/**
- * Internal holder class for recording that a cache method was invoked.
+ * Reactive Streams Subscriber collection for collecting a List to cache.
*/
- private static class InvocationAwareResult {
+ private class CachePutListSubscriber implements Subscriber {
+
+ private final CachePutRequest request;
- boolean invoked;
+ private final List cacheValue = new ArrayList<>();
+
+ public CachePutListSubscriber(CachePutRequest request) {
+ this.request = request;
+ }
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ s.request(Integer.MAX_VALUE);
+ }
+ @Override
+ public void onNext(Object o) {
+ this.cacheValue.add(o);
+ }
+ @Override
+ public void onError(Throwable t) {
+ this.request.performEvict(t);
+ }
+ @Override
+ public void onComplete() {
+ this.request.performPut(this.cacheValue);
+ }
+ }
+
+
+ /**
+ * Inner class to avoid a hard dependency on the Reactive Streams API at runtime.
+ */
+ private class ReactiveCachingHandler {
+
+ public static final Object NOT_HANDLED = new Object();
+
+ private final ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
+
+ @Nullable
+ public Object executeSynchronized(CacheOperationInvoker invoker, Method method, Cache cache, Object key) {
+ ReactiveAdapter adapter = this.registry.getAdapter(method.getReturnType());
+ if (adapter != null) {
+ if (adapter.isMultiValue()) {
+ // Flux or similar
+ return adapter.fromPublisher(Flux.from(Mono.fromFuture(
+ cache.retrieve(key,
+ () -> Flux.from(adapter.toPublisher(invokeOperation(invoker))).collectList().toFuture())))
+ .flatMap(Flux::fromIterable));
+ }
+ else {
+ // Mono or similar
+ return adapter.fromPublisher(Mono.fromFuture(
+ cache.retrieve(key,
+ () -> Mono.from(adapter.toPublisher(invokeOperation(invoker))).toFuture())));
+ }
+ }
+ return NOT_HANDLED;
+ }
+
+ @Nullable
+ public Object processCacheEvicts(List contexts, @Nullable Object result) {
+ ReactiveAdapter adapter = (result != null ? this.registry.getAdapter(result.getClass()) : null);
+ if (adapter != null) {
+ return adapter.fromPublisher(Mono.from(adapter.toPublisher(result))
+ .doOnSuccess(value -> performCacheEvicts(contexts, result)));
+ }
+ return NOT_HANDLED;
+ }
+
+ @Nullable
+ public Object findInCaches(CacheOperationContext context, Cache cache, Object key) {
+ ReactiveAdapter adapter = this.registry.getAdapter(context.getMethod().getReturnType());
+ if (adapter != null) {
+ CompletableFuture> cachedFuture = cache.retrieve(key);
+ if (cachedFuture == null) {
+ return null;
+ }
+ if (adapter.isMultiValue()) {
+ return adapter.fromPublisher(Flux.from(Mono.fromFuture(cachedFuture))
+ .flatMap(v -> (v instanceof Iterable> iv ? Flux.fromIterable(iv) : Flux.just(v))));
+ }
+ else {
+ return adapter.fromPublisher(Mono.fromFuture(cachedFuture));
+ }
+ }
+ return NOT_HANDLED;
+ }
+
+ @Nullable
+ public Object processPutRequest(CachePutRequest request, @Nullable Object result) {
+ ReactiveAdapter adapter = (result != null ? this.registry.getAdapter(result.getClass()) : null);
+ if (adapter != null) {
+ if (adapter.isMultiValue()) {
+ Flux> source = Flux.from(adapter.toPublisher(result));
+ source.subscribe(new CachePutListSubscriber(request));
+ return adapter.fromPublisher(source);
+ }
+ else {
+ return adapter.fromPublisher(Mono.from(adapter.toPublisher(result))
+ .doOnSuccess(request::performPut).doOnError(request::performEvict));
+ }
+ }
+ return NOT_HANDLED;
+ }
}
}
diff --git a/spring-context/src/main/java/org/springframework/cache/support/NoOpCache.java b/spring-context/src/main/java/org/springframework/cache/support/NoOpCache.java
index 6c814ff18e0..b8746e97f91 100644
--- a/spring-context/src/main/java/org/springframework/cache/support/NoOpCache.java
+++ b/spring-context/src/main/java/org/springframework/cache/support/NoOpCache.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2020 the original author or authors.
+ * Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,8 @@
package org.springframework.cache.support;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
import org.springframework.cache.Cache;
import org.springframework.lang.Nullable;
@@ -80,6 +82,17 @@ public class NoOpCache implements Cache {
}
}
+ @Override
+ @Nullable
+ public CompletableFuture> retrieve(Object key) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture retrieve(Object key, Supplier> valueLoader) {
+ return valueLoader.get();
+ }
+
@Override
public void put(Object key, @Nullable Object value) {
}
diff --git a/spring-context/src/test/java/org/springframework/cache/CacheReproTests.java b/spring-context/src/test/java/org/springframework/cache/CacheReproTests.java
index 48878ef4e18..e538deedd54 100644
--- a/spring-context/src/test/java/org/springframework/cache/CacheReproTests.java
+++ b/spring-context/src/test/java/org/springframework/cache/CacheReproTests.java
@@ -20,11 +20,15 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import org.springframework.beans.testfixture.beans.TestBean;
+import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.CachePut;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.cache.annotation.Caching;
@@ -118,6 +122,7 @@ class CacheReproTests {
assertThat(cacheResolver.getCache("foo").get("foo")).isNull();
Object result = bean.getSimple("foo"); // cache name = id
assertThat(cacheResolver.getCache("foo").get("foo").get()).isEqualTo(result);
+
context.close();
}
@@ -127,7 +132,7 @@ class CacheReproTests {
Spr13081Service bean = context.getBean(Spr13081Service.class);
assertThatIllegalStateException().isThrownBy(() -> bean.getSimple(null))
- .withMessageContaining(MyCacheResolver.class.getName());
+ .withMessageContaining(MyCacheResolver.class.getName());
context.close();
}
@@ -146,6 +151,7 @@ class CacheReproTests {
TestBean tb2 = bean.findById("tb1").get();
assertThat(tb2).isNotSameAs(tb);
assertThat(cache.get("tb1").get()).isSameAs(tb2);
+
context.close();
}
@@ -164,6 +170,151 @@ class CacheReproTests {
TestBean tb2 = bean.findById("tb1").get();
assertThat(tb2).isNotSameAs(tb);
assertThat(cache.get("tb1").get()).isSameAs(tb2);
+
+ context.close();
+ }
+
+ @Test
+ void spr14235AdaptsToCompletableFuture() {
+ AnnotationConfigApplicationContext context =
+ new AnnotationConfigApplicationContext(Spr14235Config.class, Spr14235FutureService.class);
+ Spr14235FutureService bean = context.getBean(Spr14235FutureService.class);
+ Cache cache = context.getBean(CacheManager.class).getCache("itemCache");
+
+ TestBean tb = bean.findById("tb1").join();
+ assertThat(bean.findById("tb1").join()).isSameAs(tb);
+ assertThat(cache.get("tb1").get()).isSameAs(tb);
+
+ bean.clear().join();
+ TestBean tb2 = bean.findById("tb1").join();
+ assertThat(tb2).isNotSameAs(tb);
+ assertThat(cache.get("tb1").get()).isSameAs(tb2);
+
+ bean.clear().join();
+ bean.insertItem(tb).join();
+ assertThat(bean.findById("tb1").join()).isSameAs(tb);
+ assertThat(cache.get("tb1").get()).isSameAs(tb);
+
+ context.close();
+ }
+
+ @Test
+ void spr14235AdaptsToCompletableFutureWithSync() throws Exception {
+ AnnotationConfigApplicationContext context =
+ new AnnotationConfigApplicationContext(Spr14235Config.class, Spr14235FutureServiceSync.class);
+ Spr14235FutureServiceSync bean = context.getBean(Spr14235FutureServiceSync.class);
+ Cache cache = context.getBean(CacheManager.class).getCache("itemCache");
+
+ TestBean tb = bean.findById("tb1").get();
+ assertThat(bean.findById("tb1").get()).isSameAs(tb);
+ assertThat(cache.get("tb1").get()).isSameAs(tb);
+
+ cache.clear();
+ TestBean tb2 = bean.findById("tb1").get();
+ assertThat(tb2).isNotSameAs(tb);
+ assertThat(cache.get("tb1").get()).isSameAs(tb2);
+
+ cache.clear();
+ bean.insertItem(tb);
+ assertThat(bean.findById("tb1").get()).isSameAs(tb);
+ assertThat(cache.get("tb1").get()).isSameAs(tb);
+
+ context.close();
+ }
+
+ @Test
+ void spr14235AdaptsToReactorMono() {
+ AnnotationConfigApplicationContext context =
+ new AnnotationConfigApplicationContext(Spr14235Config.class, Spr14235MonoService.class);
+ Spr14235MonoService bean = context.getBean(Spr14235MonoService.class);
+ Cache cache = context.getBean(CacheManager.class).getCache("itemCache");
+
+ TestBean tb = bean.findById("tb1").block();
+ assertThat(bean.findById("tb1").block()).isSameAs(tb);
+ assertThat(cache.get("tb1").get()).isSameAs(tb);
+
+ bean.clear().block();
+ TestBean tb2 = bean.findById("tb1").block();
+ assertThat(tb2).isNotSameAs(tb);
+ assertThat(cache.get("tb1").get()).isSameAs(tb2);
+
+ bean.clear().block();
+ bean.insertItem(tb).block();
+ assertThat(bean.findById("tb1").block()).isSameAs(tb);
+ assertThat(cache.get("tb1").get()).isSameAs(tb);
+
+ context.close();
+ }
+
+ @Test
+ void spr14235AdaptsToReactorMonoWithSync() {
+ AnnotationConfigApplicationContext context =
+ new AnnotationConfigApplicationContext(Spr14235Config.class, Spr14235MonoServiceSync.class);
+ Spr14235MonoServiceSync bean = context.getBean(Spr14235MonoServiceSync.class);
+ Cache cache = context.getBean(CacheManager.class).getCache("itemCache");
+
+ TestBean tb = bean.findById("tb1").block();
+ assertThat(bean.findById("tb1").block()).isSameAs(tb);
+ assertThat(cache.get("tb1").get()).isSameAs(tb);
+
+ cache.clear();
+ TestBean tb2 = bean.findById("tb1").block();
+ assertThat(tb2).isNotSameAs(tb);
+ assertThat(cache.get("tb1").get()).isSameAs(tb2);
+
+ cache.clear();
+ bean.insertItem(tb);
+ assertThat(bean.findById("tb1").block()).isSameAs(tb);
+ assertThat(cache.get("tb1").get()).isSameAs(tb);
+
+ context.close();
+ }
+
+ @Test
+ void spr14235AdaptsToReactorFlux() {
+ AnnotationConfigApplicationContext context =
+ new AnnotationConfigApplicationContext(Spr14235Config.class, Spr14235FluxService.class);
+ Spr14235FluxService bean = context.getBean(Spr14235FluxService.class);
+ Cache cache = context.getBean(CacheManager.class).getCache("itemCache");
+
+ List tb = bean.findById("tb1").collectList().block();
+ assertThat(bean.findById("tb1").collectList().block()).isEqualTo(tb);
+ assertThat(cache.get("tb1").get()).isEqualTo(tb);
+
+ bean.clear().blockLast();
+ List tb2 = bean.findById("tb1").collectList().block();
+ assertThat(tb2).isNotEqualTo(tb);
+ assertThat(cache.get("tb1").get()).isEqualTo(tb2);
+
+ bean.clear().blockLast();
+ bean.insertItem("tb1", tb).blockLast();
+ assertThat(bean.findById("tb1").collectList().block()).isEqualTo(tb);
+ assertThat(cache.get("tb1").get()).isEqualTo(tb);
+
+ context.close();
+ }
+
+ @Test
+ void spr14235AdaptsToReactorFluxWithSync() {
+ AnnotationConfigApplicationContext context =
+ new AnnotationConfigApplicationContext(Spr14235Config.class, Spr14235FluxServiceSync.class);
+ Spr14235FluxServiceSync bean = context.getBean(Spr14235FluxServiceSync.class);
+ Cache cache = context.getBean(CacheManager.class).getCache("itemCache");
+
+ List tb = bean.findById("tb1").collectList().block();
+ assertThat(bean.findById("tb1").collectList().block()).isEqualTo(tb);
+ assertThat(cache.get("tb1").get()).isEqualTo(tb);
+
+ cache.clear();
+ List tb2 = bean.findById("tb1").collectList().block();
+ assertThat(tb2).isNotEqualTo(tb);
+ assertThat(cache.get("tb1").get()).isEqualTo(tb2);
+
+ cache.clear();
+ bean.insertItem("tb1", tb);
+ assertThat(bean.findById("tb1").collectList().block()).isEqualTo(tb);
+ assertThat(cache.get("tb1").get()).isEqualTo(tb);
+
context.close();
}
@@ -177,6 +328,7 @@ class CacheReproTests {
bean.insertItem(tb);
assertThat(bean.findById("tb1").get()).isSameAs(tb);
assertThat(cache.get("tb1").get()).isSameAs(tb);
+
context.close();
}
@@ -190,6 +342,7 @@ class CacheReproTests {
bean.insertItem(tb);
assertThat(bean.findById("tb1").get()).isSameAs(tb);
assertThat(cache.get("tb1").get()).isSameAs(tb);
+
context.close();
}
@@ -387,6 +540,120 @@ class CacheReproTests {
}
+ public static class Spr14235FutureService {
+
+ @Cacheable(value = "itemCache")
+ public CompletableFuture findById(String id) {
+ return CompletableFuture.completedFuture(new TestBean(id));
+ }
+
+ @CachePut(cacheNames = "itemCache", key = "#item.name")
+ public CompletableFuture insertItem(TestBean item) {
+ return CompletableFuture.completedFuture(item);
+ }
+
+ @CacheEvict(cacheNames = "itemCache", allEntries = true)
+ public CompletableFuture clear() {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
+
+
+ public static class Spr14235FutureServiceSync {
+
+ @Cacheable(value = "itemCache", sync = true)
+ public CompletableFuture findById(String id) {
+ return CompletableFuture.completedFuture(new TestBean(id));
+ }
+
+ @CachePut(cacheNames = "itemCache", key = "#item.name")
+ public TestBean insertItem(TestBean item) {
+ return item;
+ }
+ }
+
+
+ public static class Spr14235MonoService {
+
+ @Cacheable(value = "itemCache")
+ public Mono findById(String id) {
+ return Mono.just(new TestBean(id));
+ }
+
+ @CachePut(cacheNames = "itemCache", key = "#item.name")
+ public Mono insertItem(TestBean item) {
+ return Mono.just(item);
+ }
+
+ @CacheEvict(cacheNames = "itemCache", allEntries = true)
+ public Mono clear() {
+ return Mono.empty();
+ }
+ }
+
+
+ public static class Spr14235MonoServiceSync {
+
+ @Cacheable(value = "itemCache", sync = true)
+ public Mono findById(String id) {
+ return Mono.just(new TestBean(id));
+ }
+
+ @CachePut(cacheNames = "itemCache", key = "#item.name")
+ public TestBean insertItem(TestBean item) {
+ return item;
+ }
+ }
+
+
+ public static class Spr14235FluxService {
+
+ private int counter = 0;
+
+ @Cacheable(value = "itemCache")
+ public Flux findById(String id) {
+ return Flux.just(new TestBean(id), new TestBean(id + (counter++)));
+ }
+
+ @CachePut(cacheNames = "itemCache", key = "#id")
+ public Flux insertItem(String id, List item) {
+ return Flux.fromIterable(item);
+ }
+
+ @CacheEvict(cacheNames = "itemCache", allEntries = true)
+ public Flux clear() {
+ return Flux.empty();
+ }
+ }
+
+
+ public static class Spr14235FluxServiceSync {
+
+ private int counter = 0;
+
+ @Cacheable(value = "itemCache", sync = true)
+ public Flux findById(String id) {
+ return Flux.just(new TestBean(id), new TestBean(id + (counter++)));
+ }
+
+ @CachePut(cacheNames = "itemCache", key = "#id")
+ public List insertItem(String id, List item) {
+ return item;
+ }
+ }
+
+
+ @Configuration
+ @EnableCaching
+ public static class Spr14235Config {
+
+ @Bean
+ public CacheManager cacheManager() {
+ return new ConcurrentMapCacheManager();
+ }
+ }
+
+
public static class Spr14853Service {
@Cacheable(value = "itemCache", sync = true)