|
|
|
@ -17,10 +17,12 @@ |
|
|
|
package org.springframework.messaging.tcp.reactor; |
|
|
|
package org.springframework.messaging.tcp.reactor; |
|
|
|
|
|
|
|
|
|
|
|
import io.netty.buffer.ByteBuf; |
|
|
|
import io.netty.buffer.ByteBuf; |
|
|
|
|
|
|
|
import io.netty.channel.ChannelPipeline; |
|
|
|
import reactor.core.publisher.DirectProcessor; |
|
|
|
import reactor.core.publisher.DirectProcessor; |
|
|
|
import reactor.core.publisher.Mono; |
|
|
|
import reactor.core.publisher.Mono; |
|
|
|
import reactor.ipc.netty.NettyInbound; |
|
|
|
import reactor.ipc.netty.NettyInbound; |
|
|
|
import reactor.ipc.netty.NettyOutbound; |
|
|
|
import reactor.ipc.netty.NettyOutbound; |
|
|
|
|
|
|
|
import reactor.ipc.netty.NettyPipeline; |
|
|
|
|
|
|
|
|
|
|
|
import org.springframework.messaging.Message; |
|
|
|
import org.springframework.messaging.Message; |
|
|
|
import org.springframework.messaging.tcp.TcpConnection; |
|
|
|
import org.springframework.messaging.tcp.TcpConnection; |
|
|
|
@ -64,6 +66,14 @@ public class ReactorNettyTcpConnection<P> implements TcpConnection<P> { |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
@SuppressWarnings("deprecation") |
|
|
|
@SuppressWarnings("deprecation") |
|
|
|
public void onReadInactivity(Runnable runnable, long inactivityDuration) { |
|
|
|
public void onReadInactivity(Runnable runnable, long inactivityDuration) { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// TODO: workaround for https://github.com/reactor/reactor-netty/issues/22
|
|
|
|
|
|
|
|
ChannelPipeline pipeline = this.inbound.context().channel().pipeline(); |
|
|
|
|
|
|
|
String name = NettyPipeline.OnChannelReadIdle; |
|
|
|
|
|
|
|
if (pipeline.context(name) != null) { |
|
|
|
|
|
|
|
pipeline.remove(name); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
this.inbound.onReadIdle(inactivityDuration, runnable); |
|
|
|
this.inbound.onReadIdle(inactivityDuration, runnable); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|