From 322cbca0dc4de35f7aeee56c8b781d80113a93c6 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Fri, 2 Jun 2023 20:40:55 +0200 Subject: [PATCH] Support for async/reactive close methods (e.g. R2DBC) Closes gh-26991 --- spring-beans/spring-beans.gradle | 1 + .../support/DisposableBeanAdapter.java | 129 +++++++++++++++--- .../DestroyMethodInferenceTests.java | 54 +++++++- ...DatabaseClientContextIntegrationTests.java | 64 +++++++++ 4 files changed, 230 insertions(+), 18 deletions(-) create mode 100644 spring-r2dbc/src/test/java/org/springframework/r2dbc/core/H2DatabaseClientContextIntegrationTests.java diff --git a/spring-beans/spring-beans.gradle b/spring-beans/spring-beans.gradle index 877d28852dc..7b1bc889153 100644 --- a/spring-beans/spring-beans.gradle +++ b/spring-beans/spring-beans.gradle @@ -9,6 +9,7 @@ dependencies { optional("org.apache.groovy:groovy-xml") optional("org.jetbrains.kotlin:kotlin-reflect") optional("org.jetbrains.kotlin:kotlin-stdlib") + optional("org.reactivestreams:reactive-streams") testImplementation(testFixtures(project(":spring-core"))) testImplementation(project(":spring-core-test")) testImplementation("jakarta.annotation:jakarta.annotation-api") diff --git a/spring-beans/src/main/java/org/springframework/beans/factory/support/DisposableBeanAdapter.java b/spring-beans/src/main/java/org/springframework/beans/factory/support/DisposableBeanAdapter.java index c13d5c9600c..cf04f0e8f55 100644 --- a/spring-beans/src/main/java/org/springframework/beans/factory/support/DisposableBeanAdapter.java +++ b/spring-beans/src/main/java/org/springframework/beans/factory/support/DisposableBeanAdapter.java @@ -21,13 +21,20 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.config.DestructionAwareBeanPostProcessor; +import org.springframework.core.ReactiveAdapter; +import org.springframework.core.ReactiveAdapterRegistry; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; @@ -65,8 +72,12 @@ class DisposableBeanAdapter implements DisposableBean, Runnable, Serializable { private static final String SHUTDOWN_METHOD_NAME = "shutdown"; + private static final Log logger = LogFactory.getLog(DisposableBeanAdapter.class); + private static final boolean reactiveStreamsPresent = ClassUtils.isPresent( + "org.reactivestreams.Publisher", DisposableBeanAdapter.class.getClassLoader()); + private final Object bean; @@ -240,7 +251,7 @@ class DisposableBeanAdapter implements DisposableBean, Runnable, Serializable { } } else if (this.destroyMethodNames != null) { - for (String destroyMethodName: this.destroyMethodNames) { + for (String destroyMethodName : this.destroyMethodNames) { Method destroyMethod = determineDestroyMethod(destroyMethodName); if (destroyMethod != null) { invokeCustomDestroyMethod( @@ -287,32 +298,40 @@ class DisposableBeanAdapter implements DisposableBean, Runnable, Serializable { * assuming a "force" parameter), else logging an error. */ private void invokeCustomDestroyMethod(Method destroyMethod) { + if (logger.isTraceEnabled()) { + logger.trace("Invoking custom destroy method '" + destroyMethod.getName() + + "' on bean with name '" + this.beanName + "': " + destroyMethod); + } + int paramCount = destroyMethod.getParameterCount(); - final Object[] args = new Object[paramCount]; + Object[] args = new Object[paramCount]; if (paramCount == 1) { args[0] = Boolean.TRUE; } - if (logger.isTraceEnabled()) { - logger.trace("Invoking custom destroy method '" + destroyMethod.getName() + - "' on bean with name '" + this.beanName + "'"); - } + try { ReflectionUtils.makeAccessible(destroyMethod); - destroyMethod.invoke(this.bean, args); - } - catch (InvocationTargetException ex) { - if (logger.isWarnEnabled()) { - String msg = "Custom destroy method '" + destroyMethod.getName() + "' on bean with name '" + - this.beanName + "' threw an exception"; + Object returnValue = destroyMethod.invoke(this.bean, args); + + if (returnValue == null) { + // Regular case: a void method + logDestroyMethodCompletion(destroyMethod, false); + } + else if (returnValue instanceof Future future) { + // An async task: await its completion. + future.get(); + logDestroyMethodCompletion(destroyMethod, true); + } + else if (!reactiveStreamsPresent || !new ReactiveDestroyMethodHandler().await(destroyMethod, returnValue)) { if (logger.isDebugEnabled()) { - // Log at warn level like below but add the exception stacktrace only with debug level - logger.warn(msg, ex.getTargetException()); - } - else { - logger.warn(msg + ": " + ex.getTargetException()); + logger.debug("Unknown return value type from custom destroy method '" + destroyMethod.getName() + + "' on bean with name '" + this.beanName + "': " + returnValue.getClass()); } } } + catch (InvocationTargetException | ExecutionException ex) { + logDestroyMethodException(destroyMethod, ex.getCause()); + } catch (Throwable ex) { if (logger.isWarnEnabled()) { logger.warn("Failed to invoke custom destroy method '" + destroyMethod.getName() + @@ -321,6 +340,27 @@ class DisposableBeanAdapter implements DisposableBean, Runnable, Serializable { } } + void logDestroyMethodException(Method destroyMethod, Throwable ex) { + if (logger.isWarnEnabled()) { + String msg = "Custom destroy method '" + destroyMethod.getName() + "' on bean with name '" + + this.beanName + "' propagated an exception"; + if (logger.isDebugEnabled()) { + // Log at warn level like below but add the exception stacktrace only with debug level + logger.warn(msg, ex); + } + else { + logger.warn(msg + ": " + ex); + } + } + } + + void logDestroyMethodCompletion(Method destroyMethod, boolean async) { + if (logger.isDebugEnabled()) { + logger.debug("Custom destroy method '" + destroyMethod.getName() + + "' on bean with name '" + this.beanName + "' completed" + (async ? " asynchronously" : "")); + } + } + /** * Serializes a copy of the state of this class, @@ -443,4 +483,59 @@ class DisposableBeanAdapter implements DisposableBean, Runnable, Serializable { return filteredPostProcessors; } + + /** + * Inner class to avoid a hard dependency on the Reactive Streams API at runtime. + */ + private class ReactiveDestroyMethodHandler { + + public boolean await(Method destroyMethod, Object returnValue) throws InterruptedException { + ReactiveAdapter adapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(returnValue.getClass()); + if (adapter != null) { + CountDownLatch latch = new CountDownLatch(1); + adapter.toPublisher(returnValue).subscribe(new DestroyMethodSubscriber(destroyMethod, latch)); + latch.await(); + return true; + } + return false; + } + } + + + /** + * Reactive Streams Subscriber for destroy method completion. + */ + private class DestroyMethodSubscriber implements Subscriber { + + private final Method destroyMethod; + + private final CountDownLatch latch; + + public DestroyMethodSubscriber(Method destroyMethod, CountDownLatch latch) { + this.destroyMethod = destroyMethod; + this.latch = latch; + } + + @Override + public void onSubscribe(Subscription s) { + s.request(Integer.MAX_VALUE); + } + + @Override + public void onNext(Object o) { + } + + @Override + public void onError(Throwable t) { + this.latch.countDown(); + logDestroyMethodException(this.destroyMethod, t); + } + + @Override + public void onComplete() { + this.latch.countDown(); + logDestroyMethodCompletion(this.destroyMethod, true); + } + } + } diff --git a/spring-context/src/test/java/org/springframework/context/annotation/DestroyMethodInferenceTests.java b/spring-context/src/test/java/org/springframework/context/annotation/DestroyMethodInferenceTests.java index c15abc241db..3fa5b05c6bf 100644 --- a/spring-context/src/test/java/org/springframework/context/annotation/DestroyMethodInferenceTests.java +++ b/spring-context/src/test/java/org/springframework/context/annotation/DestroyMethodInferenceTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,8 +17,10 @@ package org.springframework.context.annotation; import java.io.Closeable; +import java.util.concurrent.CompletableFuture; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; import org.springframework.beans.factory.DisposableBean; import org.springframework.context.ConfigurableApplicationContext; @@ -47,6 +49,8 @@ public class DestroyMethodInferenceTests { WithInheritedCloseMethod c8 = ctx.getBean("c8", WithInheritedCloseMethod.class); WithDisposableBean c9 = ctx.getBean("c9", WithDisposableBean.class); WithAutoCloseable c10 = ctx.getBean("c10", WithAutoCloseable.class); + WithCompletableFutureMethod c11 = ctx.getBean("c11", WithCompletableFutureMethod.class); + WithReactorMonoMethod c12 = ctx.getBean("c12", WithReactorMonoMethod.class); assertThat(c0.closed).as("c0").isFalse(); assertThat(c1.closed).as("c1").isFalse(); @@ -59,6 +63,8 @@ public class DestroyMethodInferenceTests { assertThat(c8.closed).as("c8").isFalse(); assertThat(c9.closed).as("c9").isFalse(); assertThat(c10.closed).as("c10").isFalse(); + assertThat(c11.closed).as("c11").isFalse(); + assertThat(c12.closed).as("c12").isFalse(); ctx.close(); assertThat(c0.closed).as("c0").isTrue(); @@ -72,6 +78,8 @@ public class DestroyMethodInferenceTests { assertThat(c8.closed).as("c8").isFalse(); assertThat(c9.closed).as("c9").isTrue(); assertThat(c10.closed).as("c10").isTrue(); + assertThat(c11.closed).as("c11").isTrue(); + assertThat(c12.closed).as("c12").isTrue(); } @Test @@ -171,6 +179,16 @@ public class DestroyMethodInferenceTests { public WithAutoCloseable c10() { return new WithAutoCloseable(); } + + @Bean + public WithCompletableFutureMethod c11() { + return new WithCompletableFutureMethod(); + } + + @Bean + public WithReactorMonoMethod c12() { + return new WithReactorMonoMethod(); + } } @@ -242,4 +260,38 @@ public class DestroyMethodInferenceTests { } } + + static class WithCompletableFutureMethod { + + boolean closed = false; + + public CompletableFuture close() { + return CompletableFuture.runAsync(() -> { + try { + Thread.sleep(100); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + closed = true; + }); + } + } + + + static class WithReactorMonoMethod { + + boolean closed = false; + + public Mono close() { + try { + Thread.sleep(100); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return Mono.fromRunnable(() -> closed = true); + } + } + } diff --git a/spring-r2dbc/src/test/java/org/springframework/r2dbc/core/H2DatabaseClientContextIntegrationTests.java b/spring-r2dbc/src/test/java/org/springframework/r2dbc/core/H2DatabaseClientContextIntegrationTests.java new file mode 100644 index 00000000000..133c15204cf --- /dev/null +++ b/spring-r2dbc/src/test/java/org/springframework/r2dbc/core/H2DatabaseClientContextIntegrationTests.java @@ -0,0 +1,64 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.r2dbc.core; + +import io.r2dbc.h2.CloseableConnectionFactory; +import io.r2dbc.h2.H2ConnectionFactory; +import io.r2dbc.spi.ConnectionFactory; +import io.r2dbc.spi.R2dbcNonTransientResourceException; +import org.junit.jupiter.api.AfterEach; + +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + +/** + * @author Juergen Hoeller + * @since 6.1 + */ +public class H2DatabaseClientContextIntegrationTests extends H2DatabaseClientIntegrationTests { + + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class); + + CloseableConnectionFactory connectionFactory = context.getBean(CloseableConnectionFactory.class); + + + @Override + protected ConnectionFactory createConnectionFactory() { + return connectionFactory; + } + + @AfterEach + public void tearDown() { + context.close(); + assertThatExceptionOfType(R2dbcNonTransientResourceException.class).isThrownBy( + () -> connectionFactory.create().block()); + } + + + @Configuration + static class Config { + + @Bean + ConnectionFactory connectionFactory() { + return H2ConnectionFactory.inMemory("r2dbc-context"); + } + } + +}