Browse Source

Take in account Rossen and Arjen feedbacks

pull/1111/head
Sebastien Deleuze 10 years ago
parent
commit
3c80c19c19
  1. 7
      spring-web-reactive/src/main/java/org/springframework/core/io/buffer/FlushingDataBuffer.java
  2. 41
      spring-web-reactive/src/main/java/org/springframework/http/codec/SseEventEncoder.java
  3. 16
      spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/SseHttpMessageConverter.java
  4. 23
      spring-web-reactive/src/test/java/org/springframework/core/codec/support/SseEventEncoderTests.java
  5. 2
      spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java
  6. 52
      spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java

7
spring-web-reactive/src/main/java/org/springframework/core/io/buffer/FlushingDataBuffer.java

@ -33,6 +33,10 @@ public class FlushingDataBuffer implements DataBuffer {
private final DataBuffer buffer; private final DataBuffer buffer;
public FlushingDataBuffer() {
this.buffer = new DefaultDataBufferFactory().allocateBuffer(0);
}
public FlushingDataBuffer(DataBuffer buffer) { public FlushingDataBuffer(DataBuffer buffer) {
Assert.notNull(buffer); Assert.notNull(buffer);
this.buffer = buffer; this.buffer = buffer;
@ -85,7 +89,7 @@ public class FlushingDataBuffer implements DataBuffer {
@Override @Override
public DataBuffer write(byte[] source, int offset, int length) { public DataBuffer write(byte[] source, int offset, int length) {
return this.write(source, offset, length); return this.buffer.write(source, offset, length);
} }
@Override @Override
@ -117,4 +121,5 @@ public class FlushingDataBuffer implements DataBuffer {
public OutputStream asOutputStream() { public OutputStream asOutputStream() {
return this.buffer.asOutputStream(); return this.buffer.asOutputStream();
} }
} }

41
spring-web-reactive/src/main/java/org/springframework/core/codec/support/SseEventEncoder.java → spring-web-reactive/src/main/java/org/springframework/http/codec/SseEventEncoder.java

@ -14,8 +14,9 @@
* limitations under the License. * limitations under the License.
*/ */
package org.springframework.core.codec.support; package org.springframework.http.codec;
import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
@ -26,6 +27,7 @@ import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType; import org.springframework.core.ResolvableType;
import org.springframework.core.codec.CodecException; import org.springframework.core.codec.CodecException;
import org.springframework.core.codec.Encoder; import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.support.AbstractEncoder;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.FlushingDataBuffer; import org.springframework.core.io.buffer.FlushingDataBuffer;
@ -40,24 +42,22 @@ import org.springframework.web.reactive.sse.SseEvent;
*/ */
public class SseEventEncoder extends AbstractEncoder<Object> { public class SseEventEncoder extends AbstractEncoder<Object> {
private final Encoder<String> stringEncoder;
private final List<Encoder<?>> dataEncoders; private final List<Encoder<?>> dataEncoders;
public SseEventEncoder(Encoder<String> stringEncoder, List<Encoder<?>> dataEncoders) { public SseEventEncoder(List<Encoder<?>> dataEncoders) {
super(new MimeType("text", "event-stream")); super(new MimeType("text", "event-stream"));
Assert.notNull(stringEncoder, "'stringEncoder' must not be null");
Assert.notNull(dataEncoders, "'dataEncoders' must not be null"); Assert.notNull(dataEncoders, "'dataEncoders' must not be null");
this.stringEncoder = stringEncoder;
this.dataEncoders = dataEncoders; this.dataEncoders = dataEncoders;
} }
@Override @Override
public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory bufferFactory, ResolvableType type, MimeType sseMimeType, Object... hints) { public Flux<DataBuffer> encode(Publisher<?> inputStream, DataBufferFactory bufferFactory,
ResolvableType type, MimeType sseMimeType, Object... hints) {
return Flux.from(inputStream).flatMap(input -> { return Flux.from(inputStream).flatMap(input -> {
SseEvent event = (SseEvent.class.equals(type.getRawClass()) ? (SseEvent)input : new SseEvent(input)); SseEvent event = (SseEvent.class.equals(type.getRawClass()) ?
(SseEvent)input : new SseEvent(input));
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
@ -87,12 +87,11 @@ public class SseEventEncoder extends AbstractEncoder<Object> {
Object data = event.getData(); Object data = event.getData();
Flux<DataBuffer> dataBuffer = Flux.empty(); Flux<DataBuffer> dataBuffer = Flux.empty();
MimeType stringMimeType = this.stringEncoder.getEncodableMimeTypes().get(0);
MimeType mimeType = (event.getMimeType() == null ? MimeType mimeType = (event.getMimeType() == null ?
(data instanceof String ? stringMimeType : new MimeType("*")) : event.getMimeType()); new MimeType("*") : event.getMimeType());
if (data != null) { if (data != null) {
sb.append("data:"); sb.append("data:");
if (data instanceof String && mimeType.isCompatibleWith(stringMimeType)) { if (data instanceof String) {
sb.append(((String)data).replaceAll("\\n", "\ndata:")).append("\n"); sb.append(((String)data).replaceAll("\\n", "\ndata:")).append("\n");
} }
else { else {
@ -103,8 +102,9 @@ public class SseEventEncoder extends AbstractEncoder<Object> {
if (encoder.isPresent()) { if (encoder.isPresent()) {
dataBuffer = ((Encoder<Object>)encoder.get()) dataBuffer = ((Encoder<Object>)encoder.get())
.encode(Mono.just(data), bufferFactory, ResolvableType.forClass(data.getClass()), mimeType) .encode(Mono.just(data), bufferFactory,
.concatWith(encodeString("\n", bufferFactory, stringMimeType)); ResolvableType.forClass(data.getClass()), mimeType)
.concatWith(encodeString("\n", bufferFactory));
} }
else { else {
throw new CodecException("No suitable encoder found!"); throw new CodecException("No suitable encoder found!");
@ -112,16 +112,19 @@ public class SseEventEncoder extends AbstractEncoder<Object> {
} }
} }
return Flux return Flux.concat(
.concat(encodeString(sb.toString(), bufferFactory, stringMimeType), dataBuffer) encodeString(sb.toString(), bufferFactory),
.reduce((buf1, buf2) -> buf1.write(buf2)) dataBuffer,
.concatWith(encodeString("\n", bufferFactory, stringMimeType).map(b -> new FlushingDataBuffer(b))); encodeString("\n", bufferFactory).map(b -> new FlushingDataBuffer(b))
);
}); });
} }
private Flux<DataBuffer> encodeString(String str, DataBufferFactory bufferFactory, MimeType mimeType) { private Mono<DataBuffer> encodeString(String str, DataBufferFactory bufferFactory) {
return stringEncoder.encode(Mono.just(str), bufferFactory, ResolvableType.forClass(String.class), mimeType); byte[] bytes = str.getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = bufferFactory.allocateBuffer(bytes.length).write(bytes);
return Mono.just(buffer);
} }
} }

16
spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/SseHttpMessageConverter.java

@ -26,9 +26,7 @@ import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType; import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Encoder; import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.support.JacksonJsonEncoder; import org.springframework.http.codec.SseEventEncoder;
import org.springframework.core.codec.support.SseEventEncoder;
import org.springframework.core.codec.support.StringEncoder;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage; import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.web.reactive.sse.SseEvent; import org.springframework.web.reactive.sse.SseEvent;
@ -51,16 +49,10 @@ import org.springframework.web.reactive.sse.SseEvent;
public class SseHttpMessageConverter extends CodecHttpMessageConverter<Object> { public class SseHttpMessageConverter extends CodecHttpMessageConverter<Object> {
/** /**
* Default constructor that creates a new instance configured with {@link StringEncoder} * Constructor that creates a new instance configured with the specified data encoders.
* and {@link JacksonJsonEncoder} encoders.
*/ */
public SseHttpMessageConverter() { public SseHttpMessageConverter(List<Encoder<?>> dataEncoders) {
this(new StringEncoder(), Arrays.asList(new JacksonJsonEncoder())); super(new SseEventEncoder(dataEncoders), null);
}
public SseHttpMessageConverter(Encoder<String> stringEncoder, List<Encoder<?>> dataEncoders) {
// 1 SseEvent element = 1 DataBuffer element so flush after each element
super(new SseEventEncoder(stringEncoder, dataEncoders), null);
} }
@Override @Override

23
spring-web-reactive/src/test/java/org/springframework/core/codec/support/SseEventEncoderTests.java

@ -26,6 +26,7 @@ import reactor.core.test.TestSubscriber;
import org.springframework.core.ResolvableType; import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase; import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.codec.SseEventEncoder;
import org.springframework.util.MimeType; import org.springframework.util.MimeType;
import org.springframework.web.reactive.sse.SseEvent; import org.springframework.web.reactive.sse.SseEvent;
@ -40,25 +41,25 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase {
@Test @Test
public void nullMimeType() { public void nullMimeType() {
SseEventEncoder encoder = new SseEventEncoder(new StringEncoder(), Arrays.asList(new JacksonJsonEncoder())); SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder()));
assertTrue(encoder.canEncode(ResolvableType.forClass(Object.class), null)); assertTrue(encoder.canEncode(ResolvableType.forClass(Object.class), null));
} }
@Test @Test
public void unsupportedMimeType() { public void unsupportedMimeType() {
SseEventEncoder encoder = new SseEventEncoder(new StringEncoder(), Arrays.asList(new JacksonJsonEncoder())); SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder()));
assertFalse(encoder.canEncode(ResolvableType.forClass(Object.class), new MimeType("foo", "bar"))); assertFalse(encoder.canEncode(ResolvableType.forClass(Object.class), new MimeType("foo", "bar")));
} }
@Test @Test
public void supportedMimeType() { public void supportedMimeType() {
SseEventEncoder encoder = new SseEventEncoder(new StringEncoder(), Arrays.asList(new JacksonJsonEncoder())); SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder()));
assertTrue(encoder.canEncode(ResolvableType.forClass(Object.class), new MimeType("text", "event-stream"))); assertTrue(encoder.canEncode(ResolvableType.forClass(Object.class), new MimeType("text", "event-stream")));
} }
@Test @Test
public void encodeServerSentEvent() { public void encodeServerSentEvent() {
SseEventEncoder encoder = new SseEventEncoder(new StringEncoder(), Arrays.asList(new JacksonJsonEncoder())); SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder()));
SseEvent event = new SseEvent(); SseEvent event = new SseEvent();
event.setId("c42"); event.setId("c42");
event.setName("foo"); event.setName("foo");
@ -82,7 +83,7 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase {
@Test @Test
public void encodeString() { public void encodeString() {
SseEventEncoder encoder = new SseEventEncoder(new StringEncoder(), Arrays.asList(new JacksonJsonEncoder())); SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder()));
Flux<String> source = Flux.just("foo", "bar"); Flux<String> source = Flux.just("foo", "bar");
Flux<DataBuffer> output = encoder.encode(source, this.dataBufferFactory, Flux<DataBuffer> output = encoder.encode(source, this.dataBufferFactory,
ResolvableType.forClass(String.class), new MimeType("text", "event-stream")); ResolvableType.forClass(String.class), new MimeType("text", "event-stream"));
@ -99,7 +100,7 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase {
@Test @Test
public void encodeMultilineString() { public void encodeMultilineString() {
SseEventEncoder encoder = new SseEventEncoder(new StringEncoder(), Arrays.asList(new JacksonJsonEncoder())); SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder()));
Flux<String> source = Flux.just("foo\nbar", "foo\nbaz"); Flux<String> source = Flux.just("foo\nbar", "foo\nbaz");
Flux<DataBuffer> output = encoder.encode(source, this.dataBufferFactory, Flux<DataBuffer> output = encoder.encode(source, this.dataBufferFactory,
ResolvableType.forClass(String.class), new MimeType("text", "event-stream")); ResolvableType.forClass(String.class), new MimeType("text", "event-stream"));
@ -117,7 +118,7 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase {
@Test @Test
public void encodePojo() { public void encodePojo() {
SseEventEncoder encoder = new SseEventEncoder(new StringEncoder(), Arrays.asList(new JacksonJsonEncoder())); SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder()));
Flux<Pojo> source = Flux.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar")); Flux<Pojo> source = Flux.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar"));
Flux<DataBuffer> output = encoder.encode(source, this.dataBufferFactory, Flux<DataBuffer> output = encoder.encode(source, this.dataBufferFactory,
ResolvableType.forClass(Pojo.class), new MimeType("text", "event-stream")); ResolvableType.forClass(Pojo.class), new MimeType("text", "event-stream"));
@ -125,9 +126,13 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase {
.subscribe(output) .subscribe(output)
.assertNoError() .assertNoError()
.assertValuesWith( .assertValuesWith(
stringConsumer("data:{\"foo\":\"foofoo\",\"bar\":\"barbar\"}\n"), stringConsumer("data:"),
stringConsumer("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}"),
stringConsumer("\n"),
stringConsumer("\n"),
stringConsumer("data:"),
stringConsumer("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}"),
stringConsumer("\n"), stringConsumer("\n"),
stringConsumer("data:{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}\n"),
stringConsumer("\n") stringConsumer("\n")
); );
} }

2
spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java

@ -62,6 +62,8 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest
return new FlushingHandler(); return new FlushingHandler();
} }
// Handler that never completes designed to test if flushing is perform correctly when
// a FlushingDataBuffer is written
private static class FlushingHandler implements HttpHandler { private static class FlushingHandler implements HttpHandler {
@Override @Override

52
spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java

@ -34,14 +34,9 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.core.codec.support.ByteBufferDecoder; import org.springframework.core.codec.support.ByteBufferDecoder;
import org.springframework.core.codec.support.JacksonJsonDecoder; import org.springframework.core.codec.support.JacksonJsonDecoder;
import org.springframework.core.codec.support.JacksonJsonEncoder;
import org.springframework.core.codec.support.JsonObjectDecoder; import org.springframework.core.codec.support.JsonObjectDecoder;
import org.springframework.core.codec.support.StringDecoder; import org.springframework.core.codec.support.StringDecoder;
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.support.GenericConversionService;
import org.springframework.core.convert.support.ReactiveStreamsToCompletableFutureConverter;
import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorHttpClientRequestFactory; import org.springframework.http.client.reactive.ReactorHttpClientRequestFactory;
import org.springframework.http.converter.reactive.HttpMessageConverter; import org.springframework.http.converter.reactive.HttpMessageConverter;
@ -56,7 +51,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.reactive.WebClient; import org.springframework.web.client.reactive.WebClient;
import org.springframework.web.reactive.DispatcherHandler; import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.reactive.result.SimpleResultHandler; import org.springframework.web.reactive.config.WebReactiveConfiguration;
import org.springframework.web.reactive.sse.SseEvent; import org.springframework.web.reactive.sse.SseEvent;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder; import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
@ -108,7 +103,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
.perform(get("http://localhost:" + port + "/sse/string") .perform(get("http://localhost:" + port + "/sse/string")
.accept(new MediaType("text", "event-stream"))) .accept(new MediaType("text", "event-stream")))
.extract(bodyStream(String.class)) .extract(bodyStream(String.class))
.take(Duration.ofMillis(500)) .take(Duration.ofMillis(1000))
.reduce((s1, s2) -> s1 + s2); .reduce((s1, s2) -> s1 + s2);
TestSubscriber TestSubscriber
@ -123,7 +118,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
.perform(get("http://localhost:" + port + "/sse/person") .perform(get("http://localhost:" + port + "/sse/person")
.accept(new MediaType("text", "event-stream"))) .accept(new MediaType("text", "event-stream")))
.extract(bodyStream(String.class)) .extract(bodyStream(String.class))
.take(Duration.ofMillis(500)) .take(Duration.ofMillis(1000))
.reduce((s1, s2) -> s1 + s2); .reduce((s1, s2) -> s1 + s2);
TestSubscriber TestSubscriber
@ -138,7 +133,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
.perform(get("http://localhost:" + port + "/sse/event") .perform(get("http://localhost:" + port + "/sse/event")
.accept(new MediaType("text", "event-stream"))) .accept(new MediaType("text", "event-stream")))
.extract(bodyStream(String.class)) .extract(bodyStream(String.class))
.take(Duration.ofMillis(500)) .take(Duration.ofMillis(1000))
.reduce((s1, s2) -> s1 + s2); .reduce((s1, s2) -> s1 + s2);
TestSubscriber TestSubscriber
@ -176,46 +171,17 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@Configuration @Configuration
@SuppressWarnings("unused") @SuppressWarnings("unused")
static class TestConfiguration { static class TestConfiguration extends WebReactiveConfiguration {
private DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
@Bean @Bean
public SseController sseController() { public SseController sseController() {
return new SseController(); return new SseController();
} }
@Bean @Override
public RequestMappingHandlerMapping handlerMapping() { protected void extendMessageConverters(List<HttpMessageConverter<?>> converters) {
return new RequestMappingHandlerMapping(); converters.add(new SseHttpMessageConverter(Arrays.asList(new JacksonJsonEncoder())));
}
@Bean
public RequestMappingHandlerAdapter handlerAdapter() {
RequestMappingHandlerAdapter handlerAdapter = new RequestMappingHandlerAdapter();
handlerAdapter.setConversionService(conversionService());
return handlerAdapter;
}
@Bean
public ConversionService conversionService() {
GenericConversionService service = new GenericConversionService();
service.addConverter(new ReactiveStreamsToCompletableFutureConverter());
service.addConverter(new ReactiveStreamsToRxJava1Converter());
return service;
}
@Bean
public ResponseBodyResultHandler responseBodyResultHandler() {
List<HttpMessageConverter<?>> converters = Arrays.asList(new SseHttpMessageConverter());
return new ResponseBodyResultHandler(converters, conversionService());
}
@Bean
public SimpleResultHandler simpleHandlerResultHandler() {
return new SimpleResultHandler(conversionService());
} }
} }
private static class Person { private static class Person {

Loading…
Cancel
Save