Browse Source

Configurable TcpClient for ReactorNettyTcpClient

Issue: SPR-17523
pull/2023/head
Rossen Stoyanchev 7 years ago
parent
commit
24848ec1bc
  1. 24
      spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java
  2. 17
      src/docs/asciidoc/web/websocket.adoc

24
spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java

@ -110,19 +110,41 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> { @@ -110,19 +110,41 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.loopResources = LoopResources.create("tcp-client-loop");
this.poolResources = ConnectionProvider.elastic("tcp-client-pool");
this.codec = codec;
this.tcpClient = TcpClient.create(this.poolResources)
.host(host).port(port)
.runOn(this.loopResources, false)
.doOnConnected(conn -> this.channelGroup.add(conn.channel()));
}
/**
* A variant of {@link #ReactorNettyTcpClient(String, int, ReactorNettyCodec)}
* that still manages the lifecycle of the {@link TcpClient} and underlying
* resources, but allows for direct configuration of other properties of the
* client through a {@code Function<TcpClient, TcpClient>}.
* @param clientConfigurer the configurer function
* @param codec for encoding and decoding the input/output byte streams
* @since 5.1.3
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
*/
public ReactorNettyTcpClient(Function<TcpClient, TcpClient> clientConfigurer, ReactorNettyCodec<P> codec) {
Assert.notNull(codec, "ReactorNettyCodec is required");
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.loopResources = LoopResources.create("tcp-client-loop");
this.poolResources = ConnectionProvider.elastic("tcp-client-pool");
this.codec = codec;
this.tcpClient = clientConfigurer.apply(TcpClient
.create(this.poolResources)
.runOn(this.loopResources, false)
.doOnConnected(conn -> this.channelGroup.add(conn.channel())));
}
/**
* Constructor with an externally created {@link TcpClient} instance whose
* lifecycle is expected to be managed externally.
*
* @param tcpClient the TcpClient instance to use
* @param codec for encoding and decoding the input/output byte streams
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec

17
src/docs/asciidoc/web/websocket.adoc

@ -1641,10 +1641,10 @@ to receive notifications when the "`system`" connection to the broker is lost an @@ -1641,10 +1641,10 @@ to receive notifications when the "`system`" connection to the broker is lost an
re-established. For example, a Stock Quote service that broadcasts stock quotes can
stop trying to send messages when there is no active "`system`" connection.
By default, the STOMP broker relay always connects (and reconnects as needed if
connectivity is lost) to the same host and port. If you wish to supply multiple addresses,
By default, the STOMP broker relay always connects, and reconnects as needed if
connectivity is lost, to the same host and port. If you wish to supply multiple addresses,
on each attempt to connect, you can configure a supplier of addresses, instead of a
fixed host and port. The following example shows how to do so:
fixed host and port. The following example shows how to do that:
====
[source,java,indent=0]
@ -1663,14 +1663,9 @@ public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { @@ -1663,14 +1663,9 @@ public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
}
private ReactorNettyTcpClient<byte[]> createTcpClient() {
Consumer<ClientOptions.Builder<?>> builderConsumer = builder -> {
builder.connectAddress(()-> {
// Select address to connect to ...
});
};
return new ReactorNettyTcpClient<>(builderConsumer, new StompReactorNettyCodec());
return new ReactorNettyTcpClient<>(
client -> client.addressSupplier(() -> ... ),
new StompReactorNettyCodec());
}
}
----

Loading…
Cancel
Save