diff --git a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/FlushingDataBuffer.java b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/FlushingDataBuffer.java index f11f201959f..c1a16ac1b37 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/FlushingDataBuffer.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/io/buffer/FlushingDataBuffer.java @@ -33,6 +33,10 @@ public class FlushingDataBuffer implements DataBuffer { private final DataBuffer buffer; + public FlushingDataBuffer() { + this.buffer = new DefaultDataBufferFactory().allocateBuffer(0); + } + public FlushingDataBuffer(DataBuffer buffer) { Assert.notNull(buffer); this.buffer = buffer; @@ -85,7 +89,7 @@ public class FlushingDataBuffer implements DataBuffer { @Override public DataBuffer write(byte[] source, int offset, int length) { - return this.write(source, offset, length); + return this.buffer.write(source, offset, length); } @Override @@ -117,4 +121,5 @@ public class FlushingDataBuffer implements DataBuffer { public OutputStream asOutputStream() { return this.buffer.asOutputStream(); } + } diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/SseEventEncoder.java b/spring-web-reactive/src/main/java/org/springframework/http/codec/SseEventEncoder.java similarity index 70% rename from spring-web-reactive/src/main/java/org/springframework/core/codec/support/SseEventEncoder.java rename to spring-web-reactive/src/main/java/org/springframework/http/codec/SseEventEncoder.java index 8d95cc8cdac..ffeae04ab84 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/SseEventEncoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/codec/SseEventEncoder.java @@ -14,8 +14,9 @@ * limitations under the License. */ -package org.springframework.core.codec.support; +package org.springframework.http.codec; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Optional; @@ -26,6 +27,7 @@ import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; import org.springframework.core.codec.CodecException; import org.springframework.core.codec.Encoder; +import org.springframework.core.codec.support.AbstractEncoder; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.FlushingDataBuffer; @@ -40,24 +42,22 @@ import org.springframework.web.reactive.sse.SseEvent; */ public class SseEventEncoder extends AbstractEncoder { - private final Encoder stringEncoder; - private final List> dataEncoders; - public SseEventEncoder(Encoder stringEncoder, List> dataEncoders) { + public SseEventEncoder(List> dataEncoders) { super(new MimeType("text", "event-stream")); - Assert.notNull(stringEncoder, "'stringEncoder' must not be null"); Assert.notNull(dataEncoders, "'dataEncoders' must not be null"); - this.stringEncoder = stringEncoder; this.dataEncoders = dataEncoders; } @Override - public Flux encode(Publisher inputStream, DataBufferFactory bufferFactory, ResolvableType type, MimeType sseMimeType, Object... hints) { + public Flux encode(Publisher inputStream, DataBufferFactory bufferFactory, + ResolvableType type, MimeType sseMimeType, Object... hints) { return Flux.from(inputStream).flatMap(input -> { - SseEvent event = (SseEvent.class.equals(type.getRawClass()) ? (SseEvent)input : new SseEvent(input)); + SseEvent event = (SseEvent.class.equals(type.getRawClass()) ? + (SseEvent)input : new SseEvent(input)); StringBuilder sb = new StringBuilder(); @@ -87,12 +87,11 @@ public class SseEventEncoder extends AbstractEncoder { Object data = event.getData(); Flux dataBuffer = Flux.empty(); - MimeType stringMimeType = this.stringEncoder.getEncodableMimeTypes().get(0); MimeType mimeType = (event.getMimeType() == null ? - (data instanceof String ? stringMimeType : new MimeType("*")) : event.getMimeType()); + new MimeType("*") : event.getMimeType()); if (data != null) { sb.append("data:"); - if (data instanceof String && mimeType.isCompatibleWith(stringMimeType)) { + if (data instanceof String) { sb.append(((String)data).replaceAll("\\n", "\ndata:")).append("\n"); } else { @@ -103,8 +102,9 @@ public class SseEventEncoder extends AbstractEncoder { if (encoder.isPresent()) { dataBuffer = ((Encoder)encoder.get()) - .encode(Mono.just(data), bufferFactory, ResolvableType.forClass(data.getClass()), mimeType) - .concatWith(encodeString("\n", bufferFactory, stringMimeType)); + .encode(Mono.just(data), bufferFactory, + ResolvableType.forClass(data.getClass()), mimeType) + .concatWith(encodeString("\n", bufferFactory)); } else { throw new CodecException("No suitable encoder found!"); @@ -112,16 +112,19 @@ public class SseEventEncoder extends AbstractEncoder { } } - return Flux - .concat(encodeString(sb.toString(), bufferFactory, stringMimeType), dataBuffer) - .reduce((buf1, buf2) -> buf1.write(buf2)) - .concatWith(encodeString("\n", bufferFactory, stringMimeType).map(b -> new FlushingDataBuffer(b))); + return Flux.concat( + encodeString(sb.toString(), bufferFactory), + dataBuffer, + encodeString("\n", bufferFactory).map(b -> new FlushingDataBuffer(b)) + ); }); } - private Flux encodeString(String str, DataBufferFactory bufferFactory, MimeType mimeType) { - return stringEncoder.encode(Mono.just(str), bufferFactory, ResolvableType.forClass(String.class), mimeType); + private Mono encodeString(String str, DataBufferFactory bufferFactory) { + byte[] bytes = str.getBytes(StandardCharsets.UTF_8); + DataBuffer buffer = bufferFactory.allocateBuffer(bytes.length).write(bytes); + return Mono.just(buffer); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/SseHttpMessageConverter.java b/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/SseHttpMessageConverter.java index 7f1100a9967..705bed7d753 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/SseHttpMessageConverter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/SseHttpMessageConverter.java @@ -26,9 +26,7 @@ import reactor.core.publisher.Mono; import org.springframework.core.ResolvableType; import org.springframework.core.codec.Encoder; -import org.springframework.core.codec.support.JacksonJsonEncoder; -import org.springframework.core.codec.support.SseEventEncoder; -import org.springframework.core.codec.support.StringEncoder; +import org.springframework.http.codec.SseEventEncoder; import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpOutputMessage; import org.springframework.web.reactive.sse.SseEvent; @@ -51,16 +49,10 @@ import org.springframework.web.reactive.sse.SseEvent; public class SseHttpMessageConverter extends CodecHttpMessageConverter { /** - * Default constructor that creates a new instance configured with {@link StringEncoder} - * and {@link JacksonJsonEncoder} encoders. + * Constructor that creates a new instance configured with the specified data encoders. */ - public SseHttpMessageConverter() { - this(new StringEncoder(), Arrays.asList(new JacksonJsonEncoder())); - } - - public SseHttpMessageConverter(Encoder stringEncoder, List> dataEncoders) { - // 1 SseEvent element = 1 DataBuffer element so flush after each element - super(new SseEventEncoder(stringEncoder, dataEncoders), null); + public SseHttpMessageConverter(List> dataEncoders) { + super(new SseEventEncoder(dataEncoders), null); } @Override diff --git a/spring-web-reactive/src/test/java/org/springframework/core/codec/support/SseEventEncoderTests.java b/spring-web-reactive/src/test/java/org/springframework/core/codec/support/SseEventEncoderTests.java index b4d86264e7c..3d6807829c9 100644 --- a/spring-web-reactive/src/test/java/org/springframework/core/codec/support/SseEventEncoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/core/codec/support/SseEventEncoderTests.java @@ -26,6 +26,7 @@ import reactor.core.test.TestSubscriber; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.http.codec.SseEventEncoder; import org.springframework.util.MimeType; import org.springframework.web.reactive.sse.SseEvent; @@ -40,25 +41,25 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase { @Test public void nullMimeType() { - SseEventEncoder encoder = new SseEventEncoder(new StringEncoder(), Arrays.asList(new JacksonJsonEncoder())); + SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder())); assertTrue(encoder.canEncode(ResolvableType.forClass(Object.class), null)); } @Test public void unsupportedMimeType() { - SseEventEncoder encoder = new SseEventEncoder(new StringEncoder(), Arrays.asList(new JacksonJsonEncoder())); + SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder())); assertFalse(encoder.canEncode(ResolvableType.forClass(Object.class), new MimeType("foo", "bar"))); } @Test public void supportedMimeType() { - SseEventEncoder encoder = new SseEventEncoder(new StringEncoder(), Arrays.asList(new JacksonJsonEncoder())); + SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder())); assertTrue(encoder.canEncode(ResolvableType.forClass(Object.class), new MimeType("text", "event-stream"))); } @Test public void encodeServerSentEvent() { - SseEventEncoder encoder = new SseEventEncoder(new StringEncoder(), Arrays.asList(new JacksonJsonEncoder())); + SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder())); SseEvent event = new SseEvent(); event.setId("c42"); event.setName("foo"); @@ -82,7 +83,7 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase { @Test public void encodeString() { - SseEventEncoder encoder = new SseEventEncoder(new StringEncoder(), Arrays.asList(new JacksonJsonEncoder())); + SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder())); Flux source = Flux.just("foo", "bar"); Flux output = encoder.encode(source, this.dataBufferFactory, ResolvableType.forClass(String.class), new MimeType("text", "event-stream")); @@ -99,7 +100,7 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase { @Test public void encodeMultilineString() { - SseEventEncoder encoder = new SseEventEncoder(new StringEncoder(), Arrays.asList(new JacksonJsonEncoder())); + SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder())); Flux source = Flux.just("foo\nbar", "foo\nbaz"); Flux output = encoder.encode(source, this.dataBufferFactory, ResolvableType.forClass(String.class), new MimeType("text", "event-stream")); @@ -117,7 +118,7 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase { @Test public void encodePojo() { - SseEventEncoder encoder = new SseEventEncoder(new StringEncoder(), Arrays.asList(new JacksonJsonEncoder())); + SseEventEncoder encoder = new SseEventEncoder(Arrays.asList(new JacksonJsonEncoder())); Flux source = Flux.just(new Pojo("foofoo", "barbar"), new Pojo("foofoofoo", "barbarbar")); Flux output = encoder.encode(source, this.dataBufferFactory, ResolvableType.forClass(Pojo.class), new MimeType("text", "event-stream")); @@ -125,9 +126,13 @@ public class SseEventEncoderTests extends AbstractDataBufferAllocatingTestCase { .subscribe(output) .assertNoError() .assertValuesWith( - stringConsumer("data:{\"foo\":\"foofoo\",\"bar\":\"barbar\"}\n"), + stringConsumer("data:"), + stringConsumer("{\"foo\":\"foofoo\",\"bar\":\"barbar\"}"), + stringConsumer("\n"), + stringConsumer("\n"), + stringConsumer("data:"), + stringConsumer("{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}"), stringConsumer("\n"), - stringConsumer("data:{\"foo\":\"foofoofoo\",\"bar\":\"barbarbar\"}\n"), stringConsumer("\n") ); } diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java index 344a49283c8..8dfc741a5e5 100644 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/FlushingIntegrationTests.java @@ -62,6 +62,8 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest return new FlushingHandler(); } + // Handler that never completes designed to test if flushing is perform correctly when + // a FlushingDataBuffer is written private static class FlushingHandler implements HttpHandler { @Override diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java index 23397d340fc..2c98530d8ce 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java @@ -34,14 +34,9 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.codec.support.ByteBufferDecoder; import org.springframework.core.codec.support.JacksonJsonDecoder; +import org.springframework.core.codec.support.JacksonJsonEncoder; import org.springframework.core.codec.support.JsonObjectDecoder; import org.springframework.core.codec.support.StringDecoder; -import org.springframework.core.convert.ConversionService; -import org.springframework.core.convert.support.GenericConversionService; -import org.springframework.core.convert.support.ReactiveStreamsToCompletableFutureConverter; -import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter; -import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.http.MediaType; import org.springframework.http.client.reactive.ReactorHttpClientRequestFactory; import org.springframework.http.converter.reactive.HttpMessageConverter; @@ -56,7 +51,7 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.client.reactive.WebClient; import org.springframework.web.reactive.DispatcherHandler; -import org.springframework.web.reactive.result.SimpleResultHandler; +import org.springframework.web.reactive.config.WebReactiveConfiguration; import org.springframework.web.reactive.sse.SseEvent; import org.springframework.web.server.adapter.WebHttpHandlerBuilder; @@ -108,7 +103,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { .perform(get("http://localhost:" + port + "/sse/string") .accept(new MediaType("text", "event-stream"))) .extract(bodyStream(String.class)) - .take(Duration.ofMillis(500)) + .take(Duration.ofMillis(1000)) .reduce((s1, s2) -> s1 + s2); TestSubscriber @@ -123,7 +118,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { .perform(get("http://localhost:" + port + "/sse/person") .accept(new MediaType("text", "event-stream"))) .extract(bodyStream(String.class)) - .take(Duration.ofMillis(500)) + .take(Duration.ofMillis(1000)) .reduce((s1, s2) -> s1 + s2); TestSubscriber @@ -138,7 +133,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { .perform(get("http://localhost:" + port + "/sse/event") .accept(new MediaType("text", "event-stream"))) .extract(bodyStream(String.class)) - .take(Duration.ofMillis(500)) + .take(Duration.ofMillis(1000)) .reduce((s1, s2) -> s1 + s2); TestSubscriber @@ -176,46 +171,17 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { @Configuration @SuppressWarnings("unused") - static class TestConfiguration { - - private DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory(); + static class TestConfiguration extends WebReactiveConfiguration { @Bean public SseController sseController() { return new SseController(); } - @Bean - public RequestMappingHandlerMapping handlerMapping() { - return new RequestMappingHandlerMapping(); - } - - @Bean - public RequestMappingHandlerAdapter handlerAdapter() { - RequestMappingHandlerAdapter handlerAdapter = new RequestMappingHandlerAdapter(); - handlerAdapter.setConversionService(conversionService()); - return handlerAdapter; - } - - @Bean - public ConversionService conversionService() { - GenericConversionService service = new GenericConversionService(); - service.addConverter(new ReactiveStreamsToCompletableFutureConverter()); - service.addConverter(new ReactiveStreamsToRxJava1Converter()); - return service; - } - - @Bean - public ResponseBodyResultHandler responseBodyResultHandler() { - List> converters = Arrays.asList(new SseHttpMessageConverter()); - return new ResponseBodyResultHandler(converters, conversionService()); - } - - @Bean - public SimpleResultHandler simpleHandlerResultHandler() { - return new SimpleResultHandler(conversionService()); + @Override + protected void extendMessageConverters(List> converters) { + converters.add(new SseHttpMessageConverter(Arrays.asList(new JacksonJsonEncoder()))); } - } private static class Person {