diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/MonoToListenableFutureAdapter.java b/spring-core/src/main/java/org/springframework/util/concurrent/MonoToListenableFutureAdapter.java new file mode 100644 index 00000000000..f86c48cf020 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/util/concurrent/MonoToListenableFutureAdapter.java @@ -0,0 +1,97 @@ +/* + * Copyright 2002-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.util.concurrent; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; + +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * Adapts a {@link Mono} into a {@link ListenableFuture}. + * + * @author Rossen Stoyanchev + * @author Stephane Maldini + * @since 5.1 + * @param the object type + */ +public class MonoToListenableFutureAdapter implements ListenableFuture { + + private final MonoProcessor processor; + + private final ListenableFutureCallbackRegistry registry = new ListenableFutureCallbackRegistry<>(); + + + public MonoToListenableFutureAdapter(Mono mono) { + Assert.notNull(mono, "Mono must not be null"); + this.processor = mono + .doOnSuccess(this.registry::success) + .doOnError(this.registry::failure) + .toProcessor(); + } + + + @Override + @Nullable + public T get() { + return this.processor.block(); + } + + @Override + @Nullable + public T get(long timeout, TimeUnit unit) { + Assert.notNull(unit, "TimeUnit must not be null"); + Duration duration = Duration.ofMillis(TimeUnit.MILLISECONDS.convert(timeout, unit)); + return this.processor.block(duration); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (isCancelled()) { + return false; + } + this.processor.cancel(); + // isCancelled may still return false, if mono completed before the cancel + return this.processor.isCancelled(); + } + + @Override + public boolean isCancelled() { + return this.processor.isCancelled(); + } + + @Override + public boolean isDone() { + return this.processor.isTerminated(); + } + + @Override + public void addCallback(ListenableFutureCallback callback) { + this.registry.addCallback(callback); + } + + @Override + public void addCallback(SuccessCallback success, FailureCallback failure) { + this.registry.addSuccessCallback(success); + this.registry.addFailureCallback(failure); + } + +} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/support/MonoToListenableFutureAdapterTests.java b/spring-core/src/test/java/org/springframework/util/concurrent/MonoToListenableFutureAdapterTests.java similarity index 95% rename from spring-messaging/src/test/java/org/springframework/messaging/support/MonoToListenableFutureAdapterTests.java rename to spring-core/src/test/java/org/springframework/util/concurrent/MonoToListenableFutureAdapterTests.java index e9f3c7a9051..aec8c9854d8 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/support/MonoToListenableFutureAdapterTests.java +++ b/spring-core/src/test/java/org/springframework/util/concurrent/MonoToListenableFutureAdapterTests.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.springframework.messaging.support; +package org.springframework.util.concurrent; import java.time.Duration; import java.util.concurrent.Future; @@ -22,8 +22,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import reactor.core.publisher.Mono; -import org.springframework.util.concurrent.ListenableFuture; - import static org.junit.Assert.*; /** diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/ReactiveReturnValueHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/ReactiveReturnValueHandler.java index ac3ca100196..334457cf701 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/ReactiveReturnValueHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/invocation/ReactiveReturnValueHandler.java @@ -21,9 +21,9 @@ import reactor.core.publisher.Mono; import org.springframework.core.MethodParameter; import org.springframework.core.ReactiveAdapter; import org.springframework.core.ReactiveAdapterRegistry; -import org.springframework.messaging.support.MonoToListenableFutureAdapter; import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.MonoToListenableFutureAdapter; /** * Support for single-value reactive types (like {@code Mono} or {@code Single}) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/MonoToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/support/MonoToListenableFutureAdapter.java deleted file mode 100644 index 712644f55ee..00000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/MonoToListenableFutureAdapter.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2002-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.support; - -import reactor.core.publisher.Mono; - -import org.springframework.lang.Nullable; - -/** - * A Mono-to-ListenableFuture adapter where the source and the target from - * the Promise and the ListenableFuture respectively are of the same type. - * - * @author Rossen Stoyanchev - * @author Stephane Maldini - * @since 5.0 - * @param the object type - */ -public class MonoToListenableFutureAdapter extends AbstractMonoToListenableFutureAdapter { - - public MonoToListenableFutureAdapter(Mono mono) { - super(mono); - } - - @Override - @Nullable - protected T adapt(@Nullable T result) { - return result; - } - -} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 8ad4774b0d2..9168e2b24e4 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -49,13 +49,13 @@ import reactor.netty.tcp.TcpClient; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; -import org.springframework.messaging.support.MonoToListenableFutureAdapter; import org.springframework.messaging.tcp.ReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; import org.springframework.messaging.tcp.TcpOperations; import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.MonoToListenableFutureAdapter; import org.springframework.util.concurrent.SettableListenableFuture; /** diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java index 25ec53d67d4..0df203d1308 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java @@ -23,9 +23,9 @@ import reactor.netty.NettyInbound; import reactor.netty.NettyOutbound; import org.springframework.messaging.Message; -import org.springframework.messaging.support.MonoToListenableFutureAdapter; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.MonoToListenableFutureAdapter; /** * Reactor Netty based implementation of {@link TcpConnection}.