|
|
|
@ -1,5 +1,5 @@ |
|
|
|
/* |
|
|
|
/* |
|
|
|
* Copyright 2002-2016 the original author or authors. |
|
|
|
* Copyright 2002-2017 the original author or authors. |
|
|
|
* |
|
|
|
* |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
@ -115,6 +115,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) { |
|
|
|
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) { |
|
|
|
Assert.notNull(handler, "TcpConnectionHandler is required"); |
|
|
|
Assert.notNull(handler, "TcpConnectionHandler is required"); |
|
|
|
|
|
|
|
|
|
|
|
if (this.stopping) { |
|
|
|
if (this.stopping) { |
|
|
|
return handleShuttingDownConnectFailure(handler); |
|
|
|
return handleShuttingDownConnectFailure(handler); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -131,6 +132,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> { |
|
|
|
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) { |
|
|
|
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) { |
|
|
|
Assert.notNull(handler, "TcpConnectionHandler is required"); |
|
|
|
Assert.notNull(handler, "TcpConnectionHandler is required"); |
|
|
|
Assert.notNull(strategy, "ReconnectStrategy is required"); |
|
|
|
Assert.notNull(strategy, "ReconnectStrategy is required"); |
|
|
|
|
|
|
|
|
|
|
|
if (this.stopping) { |
|
|
|
if (this.stopping) { |
|
|
|
return handleShuttingDownConnectFailure(handler); |
|
|
|
return handleShuttingDownConnectFailure(handler); |
|
|
|
} |
|
|
|
} |
|
|
|
@ -189,7 +191,6 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> { |
|
|
|
ChannelGroupFuture close = this.channelGroup.close(); |
|
|
|
ChannelGroupFuture close = this.channelGroup.close(); |
|
|
|
Mono<Void> completion = FutureMono.from(close) |
|
|
|
Mono<Void> completion = FutureMono.from(close) |
|
|
|
.doAfterTerminate((x, e) -> { |
|
|
|
.doAfterTerminate((x, e) -> { |
|
|
|
|
|
|
|
|
|
|
|
// TODO: https://github.com/reactor/reactor-netty/issues/24
|
|
|
|
// TODO: https://github.com/reactor/reactor-netty/issues/24
|
|
|
|
shutdownGlobalResources(); |
|
|
|
shutdownGlobalResources(); |
|
|
|
|
|
|
|
|
|
|
|
@ -211,14 +212,14 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> { |
|
|
|
return new MonoToListenableFutureAdapter<>(completion); |
|
|
|
return new MonoToListenableFutureAdapter<>(completion); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private static void shutdownGlobalResources() { |
|
|
|
private void shutdownGlobalResources() { |
|
|
|
try { |
|
|
|
try { |
|
|
|
Method method = TcpResources.class.getDeclaredMethod("_dispose"); |
|
|
|
Method method = TcpResources.class.getDeclaredMethod("_dispose"); |
|
|
|
ReflectionUtils.makeAccessible(method); |
|
|
|
ReflectionUtils.makeAccessible(method); |
|
|
|
ReflectionUtils.invokeMethod(method, TcpResources.get()); |
|
|
|
ReflectionUtils.invokeMethod(method, TcpResources.get()); |
|
|
|
} |
|
|
|
} |
|
|
|
catch (NoSuchMethodException ex) { |
|
|
|
catch (NoSuchMethodException ex) { |
|
|
|
ex.printStackTrace(); |
|
|
|
// ignore
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -227,15 +228,13 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> { |
|
|
|
|
|
|
|
|
|
|
|
private final TcpConnectionHandler<P> connectionHandler; |
|
|
|
private final TcpConnectionHandler<P> connectionHandler; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ReactorNettyHandler(TcpConnectionHandler<P> handler) { |
|
|
|
ReactorNettyHandler(TcpConnectionHandler<P> handler) { |
|
|
|
this.connectionHandler = handler; |
|
|
|
this.connectionHandler = handler; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
|
|
|
|
@SuppressWarnings("unchecked") |
|
|
|
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) { |
|
|
|
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) { |
|
|
|
|
|
|
|
|
|
|
|
DirectProcessor<Void> completion = DirectProcessor.create(); |
|
|
|
DirectProcessor<Void> completion = DirectProcessor.create(); |
|
|
|
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion); |
|
|
|
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion); |
|
|
|
scheduler.schedule(() -> connectionHandler.afterConnected(connection)); |
|
|
|
scheduler.schedule(() -> connectionHandler.afterConnected(connection)); |
|
|
|
@ -254,6 +253,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static class StompMessageDecoder<P> extends ByteToMessageDecoder { |
|
|
|
private static class StompMessageDecoder<P> extends ByteToMessageDecoder { |
|
|
|
|
|
|
|
|
|
|
|
private final ReactorNettyCodec<P> codec; |
|
|
|
private final ReactorNettyCodec<P> codec; |
|
|
|
|