|
|
|
@ -1,5 +1,5 @@ |
|
|
|
/* |
|
|
|
/* |
|
|
|
* Copyright 2002-2015 the original author or authors. |
|
|
|
* Copyright 2002-2016 the original author or authors. |
|
|
|
* |
|
|
|
* |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
* you may not use this file except in compliance with the License. |
|
|
|
@ -18,14 +18,14 @@ package org.springframework.messaging.simp.stomp; |
|
|
|
|
|
|
|
|
|
|
|
import java.nio.ByteBuffer; |
|
|
|
import java.nio.ByteBuffer; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import org.springframework.messaging.Message; |
|
|
|
|
|
|
|
import org.springframework.util.Assert; |
|
|
|
|
|
|
|
import reactor.fn.Consumer; |
|
|
|
import reactor.fn.Consumer; |
|
|
|
import reactor.fn.Function; |
|
|
|
import reactor.fn.Function; |
|
|
|
import reactor.io.buffer.Buffer; |
|
|
|
import reactor.io.buffer.Buffer; |
|
|
|
import reactor.io.codec.Codec; |
|
|
|
import reactor.io.codec.Codec; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import org.springframework.messaging.Message; |
|
|
|
|
|
|
|
import org.springframework.util.Assert; |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* A Reactor TCP {@link Codec} for sending and receiving STOMP messages. |
|
|
|
* A Reactor TCP {@link Codec} for sending and receiving STOMP messages. |
|
|
|
* |
|
|
|
* |
|
|
|
@ -35,25 +35,23 @@ import reactor.io.codec.Codec; |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<byte[]>> { |
|
|
|
public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<byte[]>> { |
|
|
|
|
|
|
|
|
|
|
|
private final StompDecoder stompDecoder; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final StompEncoder stompEncoder; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final Function<Message<byte[]>, Buffer> encodingFunction; |
|
|
|
private final Function<Message<byte[]>, Buffer> encodingFunction; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final StompDecoder stompDecoder; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public Reactor2StompCodec() { |
|
|
|
public Reactor2StompCodec() { |
|
|
|
this(new StompEncoder(), new StompDecoder()); |
|
|
|
this(new StompEncoder(), new StompDecoder()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public Reactor2StompCodec(StompEncoder encoder, StompDecoder decoder) { |
|
|
|
public Reactor2StompCodec(StompEncoder encoder, StompDecoder decoder) { |
|
|
|
Assert.notNull(encoder, "'encoder' is required"); |
|
|
|
Assert.notNull(encoder, "StompEncoder is required"); |
|
|
|
Assert.notNull(decoder, "'decoder' is required"); |
|
|
|
Assert.notNull(decoder, "StompDecoder is required"); |
|
|
|
this.stompEncoder = encoder; |
|
|
|
this.encodingFunction = new EncodingFunction(encoder); |
|
|
|
this.stompDecoder = decoder; |
|
|
|
this.stompDecoder = decoder; |
|
|
|
this.encodingFunction = new EncodingFunction(this.stompEncoder); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public Function<Buffer, Message<byte[]>> decoder(final Consumer<Message<byte[]>> messageConsumer) { |
|
|
|
public Function<Buffer, Message<byte[]>> decoder(final Consumer<Message<byte[]>> messageConsumer) { |
|
|
|
return new DecodingFunction(this.stompDecoder, messageConsumer); |
|
|
|
return new DecodingFunction(this.stompDecoder, messageConsumer); |
|
|
|
@ -66,14 +64,15 @@ public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<b |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public Buffer apply(Message<byte[]> message) { |
|
|
|
public Buffer apply(Message<byte[]> message) { |
|
|
|
return encodingFunction.apply(message); |
|
|
|
return this.encodingFunction.apply(message); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static class EncodingFunction implements Function<Message<byte[]>, Buffer> { |
|
|
|
private static class EncodingFunction implements Function<Message<byte[]>, Buffer> { |
|
|
|
|
|
|
|
|
|
|
|
private final StompEncoder encoder; |
|
|
|
private final StompEncoder encoder; |
|
|
|
|
|
|
|
|
|
|
|
private EncodingFunction(StompEncoder encoder) { |
|
|
|
public EncodingFunction(StompEncoder encoder) { |
|
|
|
this.encoder = encoder; |
|
|
|
this.encoder = encoder; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -84,6 +83,7 @@ public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<b |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static class DecodingFunction implements Function<Buffer, Message<byte[]>> { |
|
|
|
private static class DecodingFunction implements Function<Buffer, Message<byte[]>> { |
|
|
|
|
|
|
|
|
|
|
|
private final StompDecoder decoder; |
|
|
|
private final StompDecoder decoder; |
|
|
|
@ -103,4 +103,5 @@ public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<b |
|
|
|
return null; |
|
|
|
return null; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|