diff --git a/spring-context/src/main/java/org/springframework/cache/interceptor/AbstractCacheInvoker.java b/spring-context/src/main/java/org/springframework/cache/interceptor/AbstractCacheInvoker.java index 84e36757d61..a9cef86ba84 100644 --- a/spring-context/src/main/java/org/springframework/cache/interceptor/AbstractCacheInvoker.java +++ b/spring-context/src/main/java/org/springframework/cache/interceptor/AbstractCacheInvoker.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -105,7 +105,7 @@ public abstract class AbstractCacheInvoker { return valueLoader.call(); } catch (Exception ex2) { - throw new RuntimeException(ex2); + throw new Cache.ValueRetrievalException(key, valueLoader, ex); } } } @@ -124,16 +124,12 @@ public abstract class AbstractCacheInvoker { try { return cache.retrieve(key); } - catch (Cache.ValueRetrievalException ex) { - throw ex; - } catch (RuntimeException ex) { getErrorHandler().handleCacheGetError(ex, cache, key); return null; } } - /** * Execute {@link Cache#retrieve(Object, Supplier)} on the specified * {@link Cache} and invoke the error handler if an exception occurs. @@ -146,9 +142,6 @@ public abstract class AbstractCacheInvoker { try { return cache.retrieve(key, valueLoader); } - catch (Cache.ValueRetrievalException ex) { - throw ex; - } catch (RuntimeException ex) { getErrorHandler().handleCacheGetError(ex, cache, key); return valueLoader.get(); diff --git a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java index 8bf8fc85d3b..65050fea3ac 100644 --- a/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java +++ b/spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import org.apache.commons.logging.Log; @@ -449,6 +450,7 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker return cacheHit; } + @SuppressWarnings("unchecked") @Nullable private Object executeSynchronized(CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { CacheOperationContext context = contexts.get(CacheableOperation.class).iterator().next(); @@ -456,7 +458,33 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker Object key = generateKey(context, CacheOperationExpressionEvaluator.NO_RESULT); Cache cache = context.getCaches().iterator().next(); if (CompletableFuture.class.isAssignableFrom(method.getReturnType())) { - return doRetrieve(cache, key, () -> (CompletableFuture) invokeOperation(invoker)); + AtomicBoolean invokeFailure = new AtomicBoolean(false); + CompletableFuture result = doRetrieve(cache, key, + () -> { + CompletableFuture invokeResult = ((CompletableFuture) invokeOperation(invoker)); + if (invokeResult == null) { + return null; + } + return invokeResult.exceptionallyCompose(ex -> { + invokeFailure.set(true); + return CompletableFuture.failedFuture(ex); + }); + }); + return result.exceptionallyCompose(ex -> { + if (!(ex instanceof RuntimeException rex)) { + return CompletableFuture.failedFuture(ex); + } + try { + getErrorHandler().handleCacheGetError(rex, cache, key); + if (invokeFailure.get()) { + return CompletableFuture.failedFuture(ex); + } + return (CompletableFuture) invokeOperation(invoker); + } + catch (Throwable ex2) { + return CompletableFuture.failedFuture(ex2); + } + }); } if (this.reactiveCachingHandler != null) { Object returnValue = this.reactiveCachingHandler.executeSynchronized(invoker, method, cache, key); @@ -517,9 +545,17 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker if (CompletableFuture.class.isAssignableFrom(context.getMethod().getReturnType())) { CompletableFuture result = doRetrieve(cache, key); if (result != null) { - return result.exceptionally(ex -> { - getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key); - return null; + return result.exceptionallyCompose(ex -> { + if (!(ex instanceof RuntimeException rex)) { + return CompletableFuture.failedFuture(ex); + } + try { + getErrorHandler().handleCacheGetError(rex, cache, key); + return CompletableFuture.completedFuture(null); + } + catch (Throwable ex2) { + return CompletableFuture.failedFuture(ex2); + } }).thenCompose(value -> (CompletableFuture) evaluate( (value != null ? CompletableFuture.completedFuture(unwrapCacheValue(value)) : null), invoker, method, contexts)); @@ -1097,32 +1133,72 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker private final ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance(); + @SuppressWarnings({"rawtypes", "unchecked"}) @Nullable public Object executeSynchronized(CacheOperationInvoker invoker, Method method, Cache cache, Object key) { + AtomicBoolean invokeFailure = new AtomicBoolean(false); ReactiveAdapter adapter = this.registry.getAdapter(method.getReturnType()); if (adapter != null) { if (adapter.isMultiValue()) { // Flux or similar return adapter.fromPublisher(Flux.from(Mono.fromFuture( - cache.retrieve(key, - () -> Flux.from(adapter.toPublisher(invokeOperation(invoker))).collectList().toFuture()))) - .flatMap(Flux::fromIterable)); + doRetrieve(cache, key, + () -> Flux.from(adapter.toPublisher(invokeOperation(invoker))).collectList().doOnError(ex -> invokeFailure.set(true)).toFuture()))) + .flatMap(Flux::fromIterable) + .onErrorResume(RuntimeException.class, ex -> { + try { + getErrorHandler().handleCacheGetError(ex, cache, key); + if (invokeFailure.get()) { + return Flux.error(ex); + } + return Flux.from(adapter.toPublisher(invokeOperation(invoker))); + } + catch (RuntimeException exception) { + return Flux.error(exception); + } + })); } else { // Mono or similar return adapter.fromPublisher(Mono.fromFuture( - cache.retrieve(key, - () -> Mono.from(adapter.toPublisher(invokeOperation(invoker))).toFuture()))); + doRetrieve(cache, key, + () -> Mono.from(adapter.toPublisher(invokeOperation(invoker))).doOnError(ex -> invokeFailure.set(true)).toFuture())) + .onErrorResume(RuntimeException.class, ex -> { + try { + getErrorHandler().handleCacheGetError(ex, cache, key); + if (invokeFailure.get()) { + return Mono.error(ex); + } + return Mono.from(adapter.toPublisher(invokeOperation(invoker))); + } + catch (RuntimeException exception) { + return Mono.error(exception); + } + })); } } if (KotlinDetector.isKotlinReflectPresent() && KotlinDetector.isSuspendingFunction(method)) { - return Mono.fromFuture(cache.retrieve(key, () -> { - Mono mono = ((Mono) invokeOperation(invoker)); - if (mono == null) { + return Mono.fromFuture(doRetrieve(cache, key, () -> { + Mono mono = (Mono) invokeOperation(invoker); + if (mono != null) { + mono = mono.doOnError(ex -> invokeFailure.set(true)); + } + else { mono = Mono.empty(); } return mono.toFuture(); - })); + })).onErrorResume(RuntimeException.class, ex -> { + try { + getErrorHandler().handleCacheGetError(ex, cache, key); + if (invokeFailure.get()) { + return Mono.error(ex); + } + return (Mono) invokeOperation(invoker); + } + catch (RuntimeException exception) { + return Mono.error(exception); + } + }); } return NOT_HANDLED; } @@ -1137,7 +1213,7 @@ public abstract class CacheAspectSupport extends AbstractCacheInvoker return NOT_HANDLED; } - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings({"rawtypes", "unchecked"}) @Nullable public Object findInCaches(CacheOperationContext context, Cache cache, Object key, CacheOperationInvoker invoker, Method method, CacheOperationContexts contexts) { diff --git a/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java b/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java index c43df31de41..15a16625597 100644 --- a/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java +++ b/spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2024 the original author or authors. + * Copyright 2002-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,9 @@ package org.springframework.cache.annotation; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -40,6 +42,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.lang.Nullable; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; /** @@ -58,8 +61,8 @@ class ReactiveCachingTests { LateCacheHitDeterminationWithValueWrapperConfig.class}) void cacheHitDetermination(Class configClass) { - AnnotationConfigApplicationContext ctx = - new AnnotationConfigApplicationContext(configClass, ReactiveCacheableService.class); + AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext( + configClass, ReactiveCacheableService.class); ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class); Object key = new Object(); @@ -119,58 +122,58 @@ class ReactiveCachingTests { ctx.close(); } - @Test - void cacheErrorHandlerWithLoggingCacheErrorHandler() { - AnnotationConfigApplicationContext ctx = - new AnnotationConfigApplicationContext(ExceptionCacheManager.class, ReactiveCacheableService.class, ErrorHandlerCachingConfiguration.class); + @ParameterizedTest + @ValueSource(classes = {EarlyCacheHitDeterminationConfig.class, + EarlyCacheHitDeterminationWithoutNullValuesConfig.class, + LateCacheHitDeterminationConfig.class, + LateCacheHitDeterminationWithValueWrapperConfig.class}) + void fluxCacheDoesntDependOnFirstRequest(Class configClass) { + + AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext( + configClass, ReactiveCacheableService.class); ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class); Object key = new Object(); - Long r1 = service.cacheFuture(key).join(); - - assertThat(r1).isNotNull(); - assertThat(r1).as("cacheFuture").isEqualTo(0L); - key = new Object(); - - r1 = service.cacheMono(key).block(); - - assertThat(r1).isNotNull(); - assertThat(r1).as("cacheMono").isEqualTo(1L); + List l1 = service.cacheFlux(key).take(1L, true).collectList().block(); + List l2 = service.cacheFlux(key).take(3L, true).collectList().block(); + List l3 = service.cacheFlux(key).collectList().block(); - key = new Object(); + Long first = l1.get(0); - r1 = service.cacheFlux(key).blockFirst(); + assertThat(l1).as("l1").containsExactly(first); + assertThat(l2).as("l2").containsExactly(first, 0L, -1L); + assertThat(l3).as("l3").containsExactly(first, 0L, -1L, -2L, -3L); - assertThat(r1).isNotNull(); - assertThat(r1).as("cacheFlux blockFirst").isEqualTo(2L); + ctx.close(); } @Test - void cacheErrorHandlerWithLoggingCacheErrorHandlerAndMethodError() { - AnnotationConfigApplicationContext ctx = - new AnnotationConfigApplicationContext(ExceptionCacheManager.class, ReactiveFailureCacheableService.class, ErrorHandlerCachingConfiguration.class); + void cacheErrorHandlerWithSimpleCacheErrorHandler() { + AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext( + ExceptionCacheManager.class, ReactiveCacheableService.class); ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class); - Object key = new Object(); - StepVerifier.create(service.cacheMono(key)) - .expectErrorMessage("mono service error") - .verify(); + Throwable completableFutureThrowable = catchThrowable(() -> service.cacheFuture(new Object()).join()); + assertThat(completableFutureThrowable).isInstanceOf(CompletionException.class) + .extracting(Throwable::getCause) + .isInstanceOf(UnsupportedOperationException.class); - key = new Object(); - StepVerifier.create(service.cacheFlux(key)) - .expectErrorMessage("flux service error") - .verify(); + Throwable monoThrowable = catchThrowable(() -> service.cacheMono(new Object()).block()); + assertThat(monoThrowable).isInstanceOf(UnsupportedOperationException.class); + + Throwable fluxThrowable = catchThrowable(() -> service.cacheFlux(new Object()).blockFirst()); + assertThat(fluxThrowable).isInstanceOf(UnsupportedOperationException.class); } @Test - void cacheErrorHandlerWithSimpleCacheErrorHandler() { - AnnotationConfigApplicationContext ctx = - new AnnotationConfigApplicationContext(ExceptionCacheManager.class, ReactiveCacheableService.class); - ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class); + void cacheErrorHandlerWithSimpleCacheErrorHandlerAndSync() { + AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext( + ExceptionCacheManager.class, ReactiveSyncCacheableService.class); + ReactiveSyncCacheableService service = ctx.getBean(ReactiveSyncCacheableService.class); - Throwable completableFuturThrowable = catchThrowable(() -> service.cacheFuture(new Object()).join()); - assertThat(completableFuturThrowable).isInstanceOf(CompletionException.class) + Throwable completableFutureThrowable = catchThrowable(() -> service.cacheFuture(new Object()).join()); + assertThat(completableFutureThrowable).isInstanceOf(CompletionException.class) .extracting(Throwable::getCause) .isInstanceOf(UnsupportedOperationException.class); @@ -181,32 +184,81 @@ class ReactiveCachingTests { assertThat(fluxThrowable).isInstanceOf(UnsupportedOperationException.class); } - @ParameterizedTest - @ValueSource(classes = {EarlyCacheHitDeterminationConfig.class, - EarlyCacheHitDeterminationWithoutNullValuesConfig.class, - LateCacheHitDeterminationConfig.class, - LateCacheHitDeterminationWithValueWrapperConfig.class}) - void fluxCacheDoesntDependOnFirstRequest(Class configClass) { + @Test + void cacheErrorHandlerWithLoggingCacheErrorHandler() { + AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext( + ExceptionCacheManager.class, ReactiveCacheableService.class, ErrorHandlerCachingConfiguration.class); + ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class); + Long r1 = service.cacheFuture(new Object()).join(); + assertThat(r1).isNotNull(); + assertThat(r1).as("cacheFuture").isEqualTo(0L); + + r1 = service.cacheMono(new Object()).block(); + assertThat(r1).isNotNull(); + assertThat(r1).as("cacheMono").isEqualTo(1L); + + r1 = service.cacheFlux(new Object()).blockFirst(); + assertThat(r1).isNotNull(); + assertThat(r1).as("cacheFlux blockFirst").isEqualTo(2L); + } + + @Test + void cacheErrorHandlerWithLoggingCacheErrorHandlerAndSync() { AnnotationConfigApplicationContext ctx = - new AnnotationConfigApplicationContext(configClass, ReactiveCacheableService.class); - ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class); + new AnnotationConfigApplicationContext(ExceptionCacheManager.class, ReactiveSyncCacheableService.class, ErrorHandlerCachingConfiguration.class); + ReactiveSyncCacheableService service = ctx.getBean(ReactiveSyncCacheableService.class); - Object key = new Object(); + Long r1 = service.cacheFuture(new Object()).join(); + assertThat(r1).isNotNull(); + assertThat(r1).as("cacheFuture").isEqualTo(0L); - List l1 = service.cacheFlux(key).take(1L, true).collectList().block(); - List l2 = service.cacheFlux(key).take(3L, true).collectList().block(); - List l3 = service.cacheFlux(key).collectList().block(); + r1 = service.cacheMono(new Object()).block(); + assertThat(r1).isNotNull(); + assertThat(r1).as("cacheMono").isEqualTo(1L); - Long first = l1.get(0); + r1 = service.cacheFlux(new Object()).blockFirst(); + assertThat(r1).isNotNull(); + assertThat(r1).as("cacheFlux blockFirst").isEqualTo(2L); + } - assertThat(l1).as("l1").containsExactly(first); - assertThat(l2).as("l2").containsExactly(first, 0L, -1L); - assertThat(l3).as("l3").containsExactly(first, 0L, -1L, -2L, -3L); + @Test + void cacheErrorHandlerWithLoggingCacheErrorHandlerAndOperationException() { + AnnotationConfigApplicationContext ctx = + new AnnotationConfigApplicationContext(EarlyCacheHitDeterminationConfig.class, ReactiveFailureCacheableService.class, ErrorHandlerCachingConfiguration.class); + ReactiveFailureCacheableService service = ctx.getBean(ReactiveFailureCacheableService.class); - ctx.close(); + assertThatExceptionOfType(CompletionException.class).isThrownBy(() -> service.cacheFuture(new Object()).join()) + .withMessage(IllegalStateException.class.getName() + ": future service error"); + + StepVerifier.create(service.cacheMono(new Object())) + .expectErrorMessage("mono service error") + .verify(); + + StepVerifier.create(service.cacheFlux(new Object())) + .expectErrorMessage("flux service error") + .verify(); } + @Test + void cacheErrorHandlerWithLoggingCacheErrorHandlerAndOperationExceptionAndSync() { + AnnotationConfigApplicationContext ctx = + new AnnotationConfigApplicationContext(EarlyCacheHitDeterminationConfig.class, ReactiveSyncFailureCacheableService.class, ErrorHandlerCachingConfiguration.class); + ReactiveSyncFailureCacheableService service = ctx.getBean(ReactiveSyncFailureCacheableService.class); + + assertThatExceptionOfType(CompletionException.class).isThrownBy(() -> service.cacheFuture(new Object()).join()) + .withMessage(IllegalStateException.class.getName() + ": future service error"); + + StepVerifier.create(service.cacheMono(new Object())) + .expectErrorMessage("mono service error") + .verify(); + + StepVerifier.create(service.cacheFlux(new Object())) + .expectErrorMessage("flux service error") + .verify(); + } + + @CacheConfig(cacheNames = "first") static class ReactiveCacheableService { @@ -232,16 +284,94 @@ class ReactiveCachingTests { } } + + @CacheConfig(cacheNames = "first") + static class ReactiveSyncCacheableService { + + private final AtomicLong counter = new AtomicLong(); + + @Cacheable(sync = true) + CompletableFuture cacheFuture(Object arg) { + return CompletableFuture.completedFuture(this.counter.getAndIncrement()); + } + + @Cacheable(sync = true) + Mono cacheMono(Object arg) { + return Mono.defer(() -> Mono.just(this.counter.getAndIncrement())); + } + + @Cacheable(sync = true) + Flux cacheFlux(Object arg) { + return Flux.defer(() -> Flux.just(this.counter.getAndIncrement(), 0L, -1L, -2L, -3L)); + } + } + + @CacheConfig(cacheNames = "first") - static class ReactiveFailureCacheableService extends ReactiveCacheableService { + static class ReactiveFailureCacheableService { + + private final AtomicBoolean cacheFutureInvoked = new AtomicBoolean(); + + private final AtomicBoolean cacheMonoInvoked = new AtomicBoolean(); + + private final AtomicBoolean cacheFluxInvoked = new AtomicBoolean(); + + @Cacheable + CompletableFuture cacheFuture(Object arg) { + if (!this.cacheFutureInvoked.compareAndSet(false, true)) { + return CompletableFuture.failedFuture(new IllegalStateException("future service invoked twice")); + } + return CompletableFuture.failedFuture(new IllegalStateException("future service error")); + } @Cacheable Mono cacheMono(Object arg) { + if (!this.cacheMonoInvoked.compareAndSet(false, true)) { + return Mono.error(new IllegalStateException("mono service invoked twice")); + } return Mono.error(new IllegalStateException("mono service error")); } @Cacheable Flux cacheFlux(Object arg) { + if (!this.cacheFluxInvoked.compareAndSet(false, true)) { + return Flux.error(new IllegalStateException("flux service invoked twice")); + } + return Flux.error(new IllegalStateException("flux service error")); + } + } + + + @CacheConfig(cacheNames = "first") + static class ReactiveSyncFailureCacheableService { + + private final AtomicBoolean cacheFutureInvoked = new AtomicBoolean(); + + private final AtomicBoolean cacheMonoInvoked = new AtomicBoolean(); + + private final AtomicBoolean cacheFluxInvoked = new AtomicBoolean(); + + @Cacheable(sync = true) + CompletableFuture cacheFuture(Object arg) { + if (!this.cacheFutureInvoked.compareAndSet(false, true)) { + return CompletableFuture.failedFuture(new IllegalStateException("future service invoked twice")); + } + return CompletableFuture.failedFuture(new IllegalStateException("future service error")); + } + + @Cacheable(sync = true) + Mono cacheMono(Object arg) { + if (!this.cacheMonoInvoked.compareAndSet(false, true)) { + return Mono.error(new IllegalStateException("mono service invoked twice")); + } + return Mono.error(new IllegalStateException("mono service error")); + } + + @Cacheable(sync = true) + Flux cacheFlux(Object arg) { + if (!this.cacheFluxInvoked.compareAndSet(false, true)) { + return Flux.error(new IllegalStateException("flux service invoked twice")); + } return Flux.error(new IllegalStateException("flux service error")); } } @@ -323,6 +453,7 @@ class ReactiveCachingTests { } } + @Configuration static class ErrorHandlerCachingConfiguration implements CachingConfigurer { @@ -333,6 +464,7 @@ class ReactiveCachingTests { } } + @Configuration(proxyBeanMethods = false) @EnableCaching static class ExceptionCacheManager { @@ -345,11 +477,12 @@ class ReactiveCachingTests { return new ConcurrentMapCache(name, isAllowNullValues()) { @Override public CompletableFuture retrieve(Object key) { - return CompletableFuture.supplyAsync(() -> { - throw new UnsupportedOperationException("Test exception on retrieve"); - }); + return CompletableFuture.failedFuture(new UnsupportedOperationException("Test exception on retrieve")); + } + @Override + public CompletableFuture retrieve(Object key, Supplier> valueLoader) { + return CompletableFuture.failedFuture(new UnsupportedOperationException("Test exception on retrieve")); } - @Override public void put(Object key, @Nullable Object value) { throw new UnsupportedOperationException("Test exception on put");