|
|
|
@ -163,7 +163,8 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> { |
|
|
|
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> connectionHandler) { |
|
|
|
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> connectionHandler) { |
|
|
|
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); |
|
|
|
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); |
|
|
|
|
|
|
|
|
|
|
|
TcpClient<Message<P>, Message<P>> tcpClient; |
|
|
|
final TcpClient<Message<P>, Message<P>> tcpClient; |
|
|
|
|
|
|
|
Runnable cleanupTask; |
|
|
|
synchronized (this.tcpClients) { |
|
|
|
synchronized (this.tcpClients) { |
|
|
|
if (this.stopping) { |
|
|
|
if (this.stopping) { |
|
|
|
IllegalStateException ex = new IllegalStateException("Shutting down."); |
|
|
|
IllegalStateException ex = new IllegalStateException("Shutting down."); |
|
|
|
@ -172,9 +173,18 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> { |
|
|
|
} |
|
|
|
} |
|
|
|
tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory); |
|
|
|
tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory); |
|
|
|
this.tcpClients.add(tcpClient); |
|
|
|
this.tcpClients.add(tcpClient); |
|
|
|
|
|
|
|
cleanupTask = new Runnable() { |
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
|
|
public void run() { |
|
|
|
|
|
|
|
synchronized (tcpClients) { |
|
|
|
|
|
|
|
tcpClients.remove(tcpClient); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
Promise<Void> promise = tcpClient.start(new MessageChannelStreamHandler<P>(connectionHandler)); |
|
|
|
Promise<Void> promise = tcpClient.start( |
|
|
|
|
|
|
|
new MessageChannelStreamHandler<P>(connectionHandler, cleanupTask)); |
|
|
|
|
|
|
|
|
|
|
|
return new PassThroughPromiseToListenableFutureAdapter<Void>( |
|
|
|
return new PassThroughPromiseToListenableFutureAdapter<Void>( |
|
|
|
promise.onError(new Consumer<Throwable>() { |
|
|
|
promise.onError(new Consumer<Throwable>() { |
|
|
|
@ -191,7 +201,8 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> { |
|
|
|
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); |
|
|
|
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null"); |
|
|
|
Assert.notNull(strategy, "ReconnectStrategy must not be null"); |
|
|
|
Assert.notNull(strategy, "ReconnectStrategy must not be null"); |
|
|
|
|
|
|
|
|
|
|
|
TcpClient<Message<P>, Message<P>> tcpClient; |
|
|
|
final TcpClient<Message<P>, Message<P>> tcpClient; |
|
|
|
|
|
|
|
Runnable cleanupTask; |
|
|
|
synchronized (this.tcpClients) { |
|
|
|
synchronized (this.tcpClients) { |
|
|
|
if (this.stopping) { |
|
|
|
if (this.stopping) { |
|
|
|
IllegalStateException ex = new IllegalStateException("Shutting down."); |
|
|
|
IllegalStateException ex = new IllegalStateException("Shutting down."); |
|
|
|
@ -200,10 +211,18 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> { |
|
|
|
} |
|
|
|
} |
|
|
|
tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory); |
|
|
|
tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory); |
|
|
|
this.tcpClients.add(tcpClient); |
|
|
|
this.tcpClients.add(tcpClient); |
|
|
|
|
|
|
|
cleanupTask = new Runnable() { |
|
|
|
|
|
|
|
@Override |
|
|
|
|
|
|
|
public void run() { |
|
|
|
|
|
|
|
synchronized (tcpClients) { |
|
|
|
|
|
|
|
tcpClients.remove(tcpClient); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
Stream<Tuple2<InetSocketAddress, Integer>> stream = tcpClient.start( |
|
|
|
Stream<Tuple2<InetSocketAddress, Integer>> stream = tcpClient.start( |
|
|
|
new MessageChannelStreamHandler<P>(connectionHandler), |
|
|
|
new MessageChannelStreamHandler<P>(connectionHandler, cleanupTask), |
|
|
|
new ReactorReconnectAdapter(strategy)); |
|
|
|
new ReactorReconnectAdapter(strategy)); |
|
|
|
|
|
|
|
|
|
|
|
return new PassThroughPromiseToListenableFutureAdapter<Void>(stream.next().after()); |
|
|
|
return new PassThroughPromiseToListenableFutureAdapter<Void>(stream.next().after()); |
|
|
|
@ -249,6 +268,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> { |
|
|
|
}); |
|
|
|
}); |
|
|
|
promise = eventLoopPromise; |
|
|
|
promise = eventLoopPromise; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return new PassThroughPromiseToListenableFutureAdapter<Void>(promise); |
|
|
|
return new PassThroughPromiseToListenableFutureAdapter<Void>(promise); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -278,8 +298,11 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> { |
|
|
|
|
|
|
|
|
|
|
|
private final TcpConnectionHandler<P> connectionHandler; |
|
|
|
private final TcpConnectionHandler<P> connectionHandler; |
|
|
|
|
|
|
|
|
|
|
|
public MessageChannelStreamHandler(TcpConnectionHandler<P> connectionHandler) { |
|
|
|
private final Runnable cleanupTask; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public MessageChannelStreamHandler(TcpConnectionHandler<P> connectionHandler, Runnable cleanupTask) { |
|
|
|
this.connectionHandler = connectionHandler; |
|
|
|
this.connectionHandler = connectionHandler; |
|
|
|
|
|
|
|
this.cleanupTask = cleanupTask; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
@ -290,6 +313,7 @@ public class Reactor2TcpClient<P> implements TcpOperations<P> { |
|
|
|
.finallyDo(new Consumer<Signal<Message<P>>>() { |
|
|
|
.finallyDo(new Consumer<Signal<Message<P>>>() { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void accept(Signal<Message<P>> signal) { |
|
|
|
public void accept(Signal<Message<P>> signal) { |
|
|
|
|
|
|
|
cleanupTask.run(); |
|
|
|
if (signal.isOnError()) { |
|
|
|
if (signal.isOnError()) { |
|
|
|
connectionHandler.handleFailure(signal.getThrowable()); |
|
|
|
connectionHandler.handleFailure(signal.getThrowable()); |
|
|
|
} |
|
|
|
} |
|
|
|
|