@ -1,5 +1,5 @@
@@ -1,5 +1,5 @@
/ *
* Copyright 2002 - 2015 the original author or authors .
* Copyright 2002 - 2016 the original author or authors .
*
* Licensed under the Apache License , Version 2 . 0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
@ -18,14 +18,14 @@ package org.springframework.messaging.simp.stomp;
@@ -18,14 +18,14 @@ package org.springframework.messaging.simp.stomp;
import java.nio.ByteBuffer ;
import org.springframework.messaging.Message ;
import org.springframework.util.Assert ;
import reactor.fn.Consumer ;
import reactor.fn.Function ;
import reactor.io.buffer.Buffer ;
import reactor.io.codec.Codec ;
import org.springframework.messaging.Message ;
import org.springframework.util.Assert ;
/ * *
* A Reactor TCP { @link Codec } for sending and receiving STOMP messages .
*
@ -35,25 +35,23 @@ import reactor.io.codec.Codec;
@@ -35,25 +35,23 @@ import reactor.io.codec.Codec;
* /
public class Reactor2StompCodec extends Codec < Buffer , Message < byte [ ] > , Message < byte [ ] > > {
private final StompDecoder stompDecoder ;
private final StompEncoder stompEncoder ;
private final Function < Message < byte [ ] > , Buffer > encodingFunction ;
private final StompDecoder stompDecoder ;
public Reactor2StompCodec ( ) {
this ( new StompEncoder ( ) , new StompDecoder ( ) ) ;
}
public Reactor2StompCodec ( StompEncoder encoder , StompDecoder decoder ) {
Assert . notNull ( encoder , "'encoder' is required" ) ;
Assert . notNull ( decoder , "'decoder' is required" ) ;
this . stompEncoder = encoder ;
Assert . notNull ( encoder , "StompEncoder is required" ) ;
Assert . notNull ( decoder , "StompDecoder is required" ) ;
this . encodingFunction = new EncodingFunction ( encoder ) ;
this . stompDecoder = decoder ;
this . encodingFunction = new EncodingFunction ( this . stompEncoder ) ;
}
@Override
public Function < Buffer , Message < byte [ ] > > decoder ( final Consumer < Message < byte [ ] > > messageConsumer ) {
return new DecodingFunction ( this . stompDecoder , messageConsumer ) ;
@ -66,14 +64,15 @@ public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<b
@@ -66,14 +64,15 @@ public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<b
@Override
public Buffer apply ( Message < byte [ ] > message ) {
return encodingFunction . apply ( message ) ;
return this . encodingFunction . apply ( message ) ;
}
private static class EncodingFunction implements Function < Message < byte [ ] > , Buffer > {
private final StompEncoder encoder ;
private EncodingFunction ( StompEncoder encoder ) {
public EncodingFunction ( StompEncoder encoder ) {
this . encoder = encoder ;
}
@ -84,6 +83,7 @@ public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<b
@@ -84,6 +83,7 @@ public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<b
}
}
private static class DecodingFunction implements Function < Buffer , Message < byte [ ] > > {
private final StompDecoder decoder ;
@ -103,4 +103,5 @@ public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<b
@@ -103,4 +103,5 @@ public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<b
return null ;
}
}
}