From d6f4ec8c33147f30e34280e048f15ad7c4da1530 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 25 Feb 2019 12:56:32 -0500 Subject: [PATCH] MessagingAcceptor/RSocket refinements + upgrade to 0.11.17 See gh-21987 --- spring-messaging/spring-messaging.gradle | 2 +- .../rsocket/DefaultRSocketRequester.java | 21 +++-- .../messaging/rsocket/MessagingAcceptor.java | 23 ++--- .../messaging/rsocket/MessagingRSocket.java | 83 ++++++++++--------- .../messaging/rsocket/PayloadUtils.java | 29 +++---- .../rsocket/RSocketMessageHandler.java | 13 --- .../RSocketPayloadReturnValueHandler.java | 2 +- .../rsocket/DefaultRSocketRequesterTests.java | 2 +- ...RSocketClientToServerIntegrationTests.java | 7 ++ ...RSocketServerToClientIntegrationTests.java | 39 ++++----- 10 files changed, 105 insertions(+), 116 deletions(-) diff --git a/spring-messaging/spring-messaging.gradle b/spring-messaging/spring-messaging.gradle index 9a4771279c2..fd469281c6d 100644 --- a/spring-messaging/spring-messaging.gradle +++ b/spring-messaging/spring-messaging.gradle @@ -7,7 +7,7 @@ dependencyManagement { } } -def rsocketVersion = "0.11.15" +def rsocketVersion = "0.11.17" dependencies { compile(project(":spring-beans")) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java index 978c353d3e2..94ed888f256 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java @@ -149,9 +149,14 @@ final class DefaultRSocketRequester implements RSocketRequester { .concatMap(value -> encodeValue(value, dataType, encoder)) .switchOnFirst((signal, inner) -> { DataBuffer data = signal.get(); - return data != null ? - Flux.concat(Mono.just(firstPayload(data)), inner.skip(1).map(PayloadUtils::asPayload)) : - inner.map(PayloadUtils::asPayload); + if (data != null) { + return Flux.concat( + Mono.just(firstPayload(data)), + inner.skip(1).map(PayloadUtils::createPayload)); + } + else { + return inner.map(PayloadUtils::createPayload); + } }) .switchIfEmpty(emptyPayload()); return new DefaultResponseSpec(payloadFlux); @@ -167,7 +172,7 @@ final class DefaultRSocketRequester implements RSocketRequester { } private Payload firstPayload(DataBuffer data) { - return PayloadUtils.asPayload(getMetadata(), data); + return PayloadUtils.createPayload(getMetadata(), data); } private Mono emptyPayload() { @@ -239,7 +244,7 @@ final class DefaultRSocketRequester implements RSocketRequester { Decoder decoder = strategies.decoder(elementType, dataMimeType); return (Mono) decoder.decodeToMono( - payloadMono.map(this::asDataBuffer), elementType, dataMimeType, EMPTY_HINTS); + payloadMono.map(this::wrapPayloadData), elementType, dataMimeType, EMPTY_HINTS); } @SuppressWarnings("unchecked") @@ -255,12 +260,12 @@ final class DefaultRSocketRequester implements RSocketRequester { Decoder decoder = strategies.decoder(elementType, dataMimeType); - return payloadFlux.map(this::asDataBuffer).concatMap(dataBuffer -> + return payloadFlux.map(this::wrapPayloadData).concatMap(dataBuffer -> (Mono) decoder.decodeToMono(Mono.just(dataBuffer), elementType, dataMimeType, EMPTY_HINTS)); } - private DataBuffer asDataBuffer(Payload payload) { - return PayloadUtils.asDataBuffer(payload, strategies.dataBufferFactory()); + private DataBuffer wrapPayloadData(Payload payload) { + return PayloadUtils.wrapPayloadData(payload, strategies.dataBufferFactory()); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingAcceptor.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingAcceptor.java index 2cc7212834e..e4b7e44ad76 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingAcceptor.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingAcceptor.java @@ -28,7 +28,6 @@ import org.springframework.messaging.Message; import org.springframework.messaging.ReactiveMessageChannel; import org.springframework.util.Assert; import org.springframework.util.MimeType; -import org.springframework.util.MimeTypeUtils; /** * RSocket acceptor for @@ -79,10 +78,9 @@ public final class MessagingAcceptor implements SocketAcceptor, FunctionBy default this is not set. + * Configure the default content type to use for data payloads. + *

By default this is not set. However a server acceptor will use the + * content type from the {@link ConnectionSetupPayload}. * @param defaultDataMimeType the MimeType to use */ public void setDefaultDataMimeType(@Nullable MimeType defaultDataMimeType) { @@ -92,21 +90,18 @@ public final class MessagingAcceptor implements SocketAcceptor, Function accept(ConnectionSetupPayload setupPayload, RSocket sendingRSocket) { - - MimeType mimeType = setupPayload.dataMimeType() != null ? - MimeTypeUtils.parseMimeType(setupPayload.dataMimeType()) : this.defaultDataMimeType; - - MessagingRSocket rsocket = createRSocket(sendingRSocket, mimeType); - return rsocket.afterConnectionEstablished(setupPayload).then(Mono.just(rsocket)); + MessagingRSocket rsocket = createRSocket(sendingRSocket); + rsocket.handleConnectionSetupPayload(setupPayload).subscribe(); + return Mono.just(rsocket); } @Override public RSocket apply(RSocket sendingRSocket) { - return createRSocket(sendingRSocket, this.defaultDataMimeType); + return createRSocket(sendingRSocket); } - private MessagingRSocket createRSocket(RSocket sendingRSocket, @Nullable MimeType dataMimeType) { - return new MessagingRSocket(this.messageChannel, sendingRSocket, dataMimeType, this.rsocketStrategies); + private MessagingRSocket createRSocket(RSocket rsocket) { + return new MessagingRSocket(this.messageChannel, rsocket, this.defaultDataMimeType, this.rsocketStrategies); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java index e1c983df1c4..824ff3901d0 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java @@ -17,6 +17,7 @@ package org.springframework.messaging.rsocket; import java.util.function.Function; +import io.rsocket.AbstractRSocket; import io.rsocket.ConnectionSetupPayload; import io.rsocket.Payload; import io.rsocket.RSocket; @@ -40,6 +41,8 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageHeaderAccessor; import org.springframework.util.Assert; import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; +import org.springframework.util.StringUtils; /** * Package private implementation of {@link RSocket} that is is hooked into an @@ -49,90 +52,96 @@ import org.springframework.util.MimeType; * @author Rossen Stoyanchev * @since 5.2 */ -class MessagingRSocket implements RSocket { +class MessagingRSocket extends AbstractRSocket { private final ReactiveMessageChannel messageChannel; private final RSocketRequester requester; @Nullable - private final MimeType dataMimeType; + private MimeType dataMimeType; private final RSocketStrategies strategies; MessagingRSocket(ReactiveMessageChannel messageChannel, - RSocket sendingRSocket, @Nullable MimeType dataMimeType, RSocketStrategies strategies) { + RSocket sendingRSocket, @Nullable MimeType defaultDataMimeType, RSocketStrategies strategies) { Assert.notNull(messageChannel, "'messageChannel' is required"); Assert.notNull(sendingRSocket, "'sendingRSocket' is required"); this.messageChannel = messageChannel; - this.requester = RSocketRequester.create(sendingRSocket, dataMimeType, strategies); - this.dataMimeType = dataMimeType; + this.requester = RSocketRequester.create(sendingRSocket, defaultDataMimeType, strategies); + this.dataMimeType = defaultDataMimeType; this.strategies = strategies; } - public Mono afterConnectionEstablished(ConnectionSetupPayload payload) { - return execute(payload).flatMap(flux -> flux.take(0).then()); + + public Mono handleConnectionSetupPayload(ConnectionSetupPayload payload) { + if (StringUtils.hasText(payload.dataMimeType())) { + this.dataMimeType = MimeTypeUtils.parseMimeType(payload.dataMimeType()); + } + return handle(payload); } @Override public Mono fireAndForget(Payload payload) { - return execute(payload).flatMap(flux -> flux.take(0).then()); + return handle(payload); } @Override public Mono requestResponse(Payload payload) { - return execute(payload).flatMap(Flux::next); + return handleAndReply(payload, Flux.just(payload)).next(); } @Override public Flux requestStream(Payload payload) { - return execute(payload).flatMapMany(Function.identity()); + return handleAndReply(payload, Flux.just(payload)); } @Override public Flux requestChannel(Publisher payloads) { return Flux.from(payloads) - .switchOnFirst((signal, inner) -> { - Payload first = signal.get(); - return first != null ? execute(first, inner).flatMapMany(Function.identity()) : inner; + .switchOnFirst((signal, innerFlux) -> { + Payload firstPayload = signal.get(); + return firstPayload == null ? innerFlux : handleAndReply(firstPayload, innerFlux); }); } @Override public Mono metadataPush(Payload payload) { - return null; + // This won't be very useful until createHeaders starting doing something more with metadata.. + return handle(payload); } - private Mono> execute(Payload payload) { - return execute(payload, Flux.just(payload)); - } - private Mono> execute(Payload firstPayload, Flux payloads) { + private Mono handle(Payload payload) { - // TODO: - // Since we do retain(), we need to ensure buffers are released if not consumed, - // e.g. error before Flux subscribed to, no handler found, @MessageMapping ignores payload, etc. + Message message = MessageBuilder.createMessage( + Mono.fromCallable(() -> wrapPayloadData(payload)), + createHeaders(payload, null)); - Flux payloadDataBuffers = payloads - .map(payload -> PayloadUtils.asDataBuffer(payload, this.strategies.dataBufferFactory())) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + return this.messageChannel.send(message).flatMap(result -> result ? + Mono.empty() : Mono.error(new MessageDeliveryException("RSocket request not handled"))); + } + + private Flux handleAndReply(Payload firstPayload, Flux payloads) { MonoProcessor> replyMono = MonoProcessor.create(); - MessageHeaders headers = createHeaders(firstPayload, replyMono); - Message message = MessageBuilder.createMessage(payloadDataBuffers, headers); + Message message = MessageBuilder.createMessage( + payloads.map(this::wrapPayloadData).doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release), + createHeaders(firstPayload, replyMono)); - return this.messageChannel.send(message).flatMap(result -> result ? - replyMono.isTerminated() ? replyMono : Mono.empty() : - Mono.error(new MessageDeliveryException("RSocket interaction not handled"))); + return this.messageChannel.send(message).flatMapMany(result -> + result && replyMono.isTerminated() ? replyMono.flatMapMany(Function.identity()) : + Mono.error(new MessageDeliveryException("RSocket request not handled"))); } - private MessageHeaders createHeaders(Payload payload, MonoProcessor replyMono) { + private MessageHeaders createHeaders(Payload payload, @Nullable MonoProcessor replyMono) { + // TODO: // For now treat the metadata as a simple string with routing information. // We'll have to get more sophisticated once the routing extension is completed. // https://github.com/rsocket/rsocket-java/issues/568 @@ -147,7 +156,10 @@ class MessagingRSocket implements RSocket { } headers.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, this.requester); - headers.setHeader(RSocketPayloadReturnValueHandler.RESPONSE_HEADER, replyMono); + + if (replyMono != null) { + headers.setHeader(RSocketPayloadReturnValueHandler.RESPONSE_HEADER, replyMono); + } DataBufferFactory bufferFactory = this.strategies.dataBufferFactory(); headers.setHeader(HandlerMethodReturnValueHandler.DATA_BUFFER_FACTORY_HEADER, bufferFactory); @@ -155,13 +167,8 @@ class MessagingRSocket implements RSocket { return headers.getMessageHeaders(); } - @Override - public Mono onClose() { - return null; - } - - @Override - public void dispose() { + private DataBuffer wrapPayloadData(Payload payload) { + return PayloadUtils.wrapPayloadData(payload, this.strategies.dataBufferFactory()); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java index 98fd9ae8c1b..8e3e87c6e17 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/PayloadUtils.java @@ -15,9 +15,6 @@ */ package org.springframework.messaging.rsocket; -import java.nio.ByteBuffer; - -import io.netty.buffer.ByteBuf; import io.rsocket.Payload; import io.rsocket.util.ByteBufPayload; import io.rsocket.util.DefaultPayload; @@ -44,7 +41,7 @@ abstract class PayloadUtils { * @param bufferFactory the BufferFactory to use to wrap * @return the DataBuffer wrapper */ - public static DataBuffer asDataBuffer(Payload payload, DataBufferFactory bufferFactory) { + public static DataBuffer wrapPayloadData(Payload payload, DataBufferFactory bufferFactory) { if (bufferFactory instanceof NettyDataBufferFactory) { return ((NettyDataBufferFactory) bufferFactory).wrap(payload.retain().sliceData()); } @@ -59,12 +56,16 @@ abstract class PayloadUtils { * @param data the data part for the payload * @return the created Payload */ - public static Payload asPayload(DataBuffer metadata, DataBuffer data) { + public static Payload createPayload(DataBuffer metadata, DataBuffer data) { if (metadata instanceof NettyDataBuffer && data instanceof NettyDataBuffer) { - return ByteBufPayload.create(getByteBuf(data), getByteBuf(metadata)); + return ByteBufPayload.create( + ((NettyDataBuffer) data).getNativeBuffer(), + ((NettyDataBuffer) metadata).getNativeBuffer()); } else if (metadata instanceof DefaultDataBuffer && data instanceof DefaultDataBuffer) { - return DefaultPayload.create(getByteBuffer(data), getByteBuffer(metadata)); + return DefaultPayload.create( + ((DefaultDataBuffer) data).getNativeBuffer(), + ((DefaultDataBuffer) metadata).getNativeBuffer()); } else { return DefaultPayload.create(data.asByteBuffer(), metadata.asByteBuffer()); @@ -76,24 +77,16 @@ abstract class PayloadUtils { * @param data the data part for the payload * @return the created Payload */ - public static Payload asPayload(DataBuffer data) { + public static Payload createPayload(DataBuffer data) { if (data instanceof NettyDataBuffer) { - return ByteBufPayload.create(getByteBuf(data)); + return ByteBufPayload.create(((NettyDataBuffer) data).getNativeBuffer()); } else if (data instanceof DefaultDataBuffer) { - return DefaultPayload.create(getByteBuffer(data)); + return DefaultPayload.create(((DefaultDataBuffer) data).getNativeBuffer()); } else { return DefaultPayload.create(data.asByteBuffer()); } } - private static ByteBuf getByteBuf(DataBuffer dataBuffer) { - return ((NettyDataBuffer) dataBuffer).getNativeBuffer(); - } - - private static - ByteBuffer getByteBuffer(DataBuffer dataBuffer) { - return ((DefaultDataBuffer) dataBuffer).getNativeBuffer(); - } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketMessageHandler.java index a6f00303299..93d5fb43a63 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketMessageHandler.java @@ -21,13 +21,10 @@ import java.util.List; import org.springframework.core.codec.Decoder; import org.springframework.core.codec.Encoder; import org.springframework.lang.Nullable; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageDeliveryException; import org.springframework.messaging.ReactiveSubscribableChannel; import org.springframework.messaging.handler.annotation.support.reactive.MessageMappingMessageHandler; import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler; import org.springframework.util.Assert; -import org.springframework.util.StringUtils; /** * RSocket-specific extension of {@link MessageMappingMessageHandler}. @@ -124,14 +121,4 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { return handlers; } - - @Override - protected void handleNoMatch(@Nullable String destination, Message message) { - // Ignore empty destination, probably the ConnectionSetupPayload - if (!StringUtils.isEmpty(destination)) { - super.handleNoMatch(destination, message); - throw new MessageDeliveryException("No handler for '" + destination + "'"); - } - } - } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketPayloadReturnValueHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketPayloadReturnValueHandler.java index 83521683daa..c841736a7c8 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketPayloadReturnValueHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketPayloadReturnValueHandler.java @@ -63,7 +63,7 @@ public class RSocketPayloadReturnValueHandler extends AbstractEncoderMethodRetur Assert.isInstanceOf(MonoProcessor.class, headerValue, "Expected MonoProcessor"); MonoProcessor> monoProcessor = (MonoProcessor>) headerValue; - monoProcessor.onNext(encodedContent.map(PayloadUtils::asPayload)); + monoProcessor.onNext(encodedContent.map(PayloadUtils::createPayload)); monoProcessor.onComplete(); return Mono.empty(); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java index a11ff902370..13c3693f72d 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java @@ -199,7 +199,7 @@ public class DefaultRSocketRequesterTests { } private Payload toPayload(String value) { - return PayloadUtils.asPayload(bufferFactory.wrap(value.getBytes(StandardCharsets.UTF_8))); + return PayloadUtils.createPayload(bufferFactory.wrap(value.getBytes(StandardCharsets.UTF_8))); } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java index 39c27d8cc2e..b6e0f8f38c6 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java @@ -35,6 +35,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.codec.CharSequenceEncoder; import org.springframework.core.codec.StringDecoder; +import org.springframework.messaging.MessageDeliveryException; import org.springframework.messaging.ReactiveMessageChannel; import org.springframework.messaging.ReactiveSubscribableChannel; import org.springframework.messaging.handler.annotation.MessageMapping; @@ -169,6 +170,12 @@ public class RSocketClientToServerIntegrationTests { .verifyComplete(); } + @Test + public void noMatchingRoute() { + Mono result = requester.route("invalid").data("anything").retrieveMono(String.class); + StepVerifier.create(result).verifyErrorMessage("RSocket request not handled"); + } + @Controller static class ServerController { diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java index 9da66896106..6a31f906dc9 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java @@ -20,7 +20,6 @@ import java.util.Collections; import java.util.List; import io.rsocket.Closeable; -import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; import io.rsocket.transport.netty.client.TcpClientTransport; @@ -140,13 +139,22 @@ public class RSocketServerToClientIntegrationTests { volatile MonoProcessor result; + public void reset() { + this.result = MonoProcessor.create(); + } + + public void await(Duration duration) { + this.result.block(duration); + } + + @MessageMapping("connect.echo") void echo(RSocketRequester requester) { runTest(() -> { - Flux result = Flux.range(1, 3).concatMap(i -> + Flux flux = Flux.range(1, 3).concatMap(i -> requester.route("echo").data("Hello " + i).retrieveMono(String.class)); - StepVerifier.create(result) + StepVerifier.create(flux) .expectNext("Hello 1") .expectNext("Hello 2") .expectNext("Hello 3") @@ -157,10 +165,10 @@ public class RSocketServerToClientIntegrationTests { @MessageMapping("connect.echo-async") void echoAsync(RSocketRequester requester) { runTest(() -> { - Flux result = Flux.range(1, 3).concatMap(i -> + Flux flux = Flux.range(1, 3).concatMap(i -> requester.route("echo-async").data("Hello " + i).retrieveMono(String.class)); - StepVerifier.create(result) + StepVerifier.create(flux) .expectNext("Hello 1 async") .expectNext("Hello 2 async") .expectNext("Hello 3 async") @@ -171,9 +179,9 @@ public class RSocketServerToClientIntegrationTests { @MessageMapping("connect.echo-stream") void echoStream(RSocketRequester requester) { runTest(() -> { - Flux result = requester.route("echo-stream").data("Hello").retrieveFlux(String.class); + Flux flux = requester.route("echo-stream").data("Hello").retrieveFlux(String.class); - StepVerifier.create(result) + StepVerifier.create(flux) .expectNext("Hello 0") .expectNextCount(5) .expectNext("Hello 6") @@ -186,11 +194,11 @@ public class RSocketServerToClientIntegrationTests { @MessageMapping("connect.echo-channel") void echoChannel(RSocketRequester requester) { runTest(() -> { - Flux result = requester.route("echo-channel") + Flux flux = requester.route("echo-channel") .data(Flux.range(1, 10).map(i -> "Hello " + i), String.class) .retrieveFlux(String.class); - StepVerifier.create(result) + StepVerifier.create(flux) .expectNext("Hello 1 async") .expectNextCount(7) .expectNext("Hello 9 async") @@ -207,19 +215,6 @@ public class RSocketServerToClientIntegrationTests { .subscribeOn(Schedulers.elastic()) .subscribe(); } - - private static Payload payload(String destination, String data) { - return DefaultPayload.create(data, destination); - } - - - public void reset() { - this.result = MonoProcessor.create(); - } - - public void await(Duration duration) { - this.result.block(duration); - } }