diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java index 79feb3e0331..cec67543ed9 100644 --- a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java +++ b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.Function; import io.reactivex.BackpressureStrategy; @@ -221,12 +222,8 @@ public class ReactiveAdapterRegistry { source -> source); registry.registerReactiveType( - ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class, () -> { - CompletableFuture empty = new CompletableFuture<>(); - empty.complete(null); - return empty; - }), - source -> Mono.fromFuture((CompletableFuture) source), + ReactiveTypeDescriptor.singleOptionalValue(CompletionStage.class, EmptyCompletableFuture::new), + source -> Mono.fromCompletionStage((CompletionStage) source), source -> Mono.from(source).toFuture() ); } @@ -338,17 +335,28 @@ public class ReactiveAdapterRegistry { } } + + private static class EmptyCompletableFuture extends CompletableFuture { + + EmptyCompletableFuture() { + complete(null); + } + } + + private static class CoroutinesRegistrar { @SuppressWarnings("KotlinInternalInJava") void registerAdapters(ReactiveAdapterRegistry registry) { registry.registerReactiveType( - ReactiveTypeDescriptor.singleOptionalValue(Deferred.class, () -> CompletableDeferredKt.CompletableDeferred(null)), + ReactiveTypeDescriptor.singleOptionalValue(Deferred.class, + () -> CompletableDeferredKt.CompletableDeferred(null)), source -> CoroutinesUtils.deferredToMono((Deferred) source), source -> CoroutinesUtils.monoToDeferred(Mono.from(source))); } } + private static class CoroutinesFlowRegistrar { void registerAdapters(ReactiveAdapterRegistry registry) { diff --git a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java index 712f470c10e..f387418e144 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java +++ b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java @@ -834,16 +834,13 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init return TransactionContextManager.currentContext().flatMap(context -> createTransactionIfNecessary(tm, txAttr, joinpointIdentification).flatMap(it -> { try { - // This is an around advice: Invoke the next interceptor in the chain. - // This will normally result in a target object being invoked. - // Need re-wrapping of ReactiveTransaction until we get hold of the exception - // through usingWhen. + // Need re-wrapping until we get hold of the exception through usingWhen. return Mono.usingWhen(Mono.just(it), txInfo -> { try { return (Mono) invocation.proceedWithInvocation(); } - catch (Throwable throwable) { - return Mono.error(throwable); + catch (Throwable ex) { + return Mono.error(ex); } }, this::commitTransactionAfterReturning, txInfo -> Mono.empty()) .onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex))); @@ -856,19 +853,17 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init .subscriberContext(TransactionContextManager.getOrCreateContextHolder()); } - return TransactionContextManager.currentContext().flatMapMany(context -> + // Any other reactive type, typically a Flux + return this.adapter.fromPublisher(TransactionContextManager.currentContext().flatMapMany(context -> createTransactionIfNecessary(tm, txAttr, joinpointIdentification).flatMapMany(it -> { try { - // This is an around advice: Invoke the next interceptor in the chain. - // This will normally result in a target object being invoked. - // Need re-wrapping of ReactiveTransaction until we get hold of the exception - // through usingWhen. + // Need re-wrapping until we get hold of the exception through usingWhen. return Flux.usingWhen(Mono.just(it), txInfo -> { try { return this.adapter.toPublisher(invocation.proceedWithInvocation()); } - catch (Throwable throwable) { - return Mono.error(throwable); + catch (Throwable ex) { + return Mono.error(ex); } }, this::commitTransactionAfterReturning, txInfo -> Mono.empty()) .onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex))); @@ -878,7 +873,7 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)); } })).subscriberContext(TransactionContextManager.getOrCreateContext()) - .subscriberContext(TransactionContextManager.getOrCreateContextHolder()); + .subscriberContext(TransactionContextManager.getOrCreateContextHolder())); } @Nullable