From 66bd2776717f9916aed9b1d1680682db677cca08 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 19 Apr 2018 11:29:12 -0400 Subject: [PATCH] Use StringDecoder to split SSE stream ServerSentEventHttpMessageReader had logic to split on new lines and buffer until an empty new line (start of a new event). To account for random data chunking, it later re-assembled the lines for each event and split again on new lines. However bufferUntil was still unreliable a chunk may contain nothing but a newline, which doesn't necessarily mean an empty newline in the overall SSE stream. This commit simplifies the above by delegating the splitting of the stream along newlines to StringDecoder. Issue: SPR-16744 --- .../ServerSentEventHttpMessageReader.java | 93 +++++++------------ .../annotation/SseIntegrationTests.java | 64 ++++++------- 2 files changed, 62 insertions(+), 95 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java index b41910f103f..1d17ba88363 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java +++ b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java @@ -16,15 +16,11 @@ package org.springframework.http.codec; -import java.nio.CharBuffer; import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.function.IntPredicate; -import java.util.stream.Collectors; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -35,7 +31,6 @@ import org.springframework.core.codec.Decoder; import org.springframework.core.codec.StringDecoder; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpInputMessage; @@ -51,12 +46,12 @@ import org.springframework.lang.Nullable; */ public class ServerSentEventHttpMessageReader implements HttpMessageReader { - private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r'; - private static final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); private static final StringDecoder stringDecoder = StringDecoder.textPlainOnly(); + private static final ResolvableType STRING_TYPE = ResolvableType.forClass(String.class); + @Nullable private final Decoder decoder; @@ -110,77 +105,53 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader { - CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer()); - DataBufferUtils.release(buffer); - return charBuffer.toString(); - }) - .bufferUntil(line -> line.equals("\n")) - .concatMap(rawLines -> { - String[] lines = rawLines.stream().collect(Collectors.joining()).split("\\r?\\n"); - return buildEvent(lines, valueType, hints) - .filter(event -> shouldWrap || event.data() != null) - .map(event -> shouldWrap ? event : event.data()); - }) - .cast(Object.class); - } - - private static Flux splitOnNewline(DataBuffer dataBuffer) { - List results = new ArrayList<>(); - int startIdx = 0; - int endIdx; - final int limit = dataBuffer.readableByteCount(); - do { - endIdx = dataBuffer.indexOf(NEWLINE_DELIMITER, startIdx); - int length = endIdx != -1 ? endIdx - startIdx + 1 : limit - startIdx; - DataBuffer token = dataBuffer.slice(startIdx, length); - results.add(DataBufferUtils.retain(token)); - startIdx = endIdx + 1; - } - while (startIdx < limit && endIdx != -1); - DataBufferUtils.release(dataBuffer); - return Flux.fromIterable(results); + return stringDecoder.decode(message.getBody(), STRING_TYPE, null, Collections.emptyMap()) + .bufferUntil(line -> line.equals("")) + .concatMap(lines -> buildEvent(lines, valueType, shouldWrap, hints)); } - private Mono> buildEvent(String[] lines, ResolvableType valueType, + private Mono buildEvent(List lines, ResolvableType valueType, boolean shouldWrap, Map hints) { - ServerSentEvent.Builder sseBuilder = ServerSentEvent.builder(); + ServerSentEvent.Builder sseBuilder = shouldWrap ? ServerSentEvent.builder() : null; StringBuilder data = null; StringBuilder comment = null; for (String line : lines) { - if (line.startsWith("id:")) { - sseBuilder.id(line.substring(3)); - } - else if (line.startsWith("event:")) { - sseBuilder.event(line.substring(6)); - } - else if (line.startsWith("data:")) { + if (line.startsWith("data:")) { data = (data != null ? data : new StringBuilder()); data.append(line.substring(5)).append("\n"); } - else if (line.startsWith("retry:")) { - sseBuilder.retry(Duration.ofMillis(Long.valueOf(line.substring(6)))); - } - else if (line.startsWith(":")) { - comment = (comment != null ? comment : new StringBuilder()); - comment.append(line.substring(1)).append("\n"); + if (shouldWrap) { + if (line.startsWith("id:")) { + sseBuilder.id(line.substring(3)); + } + else if (line.startsWith("event:")) { + sseBuilder.event(line.substring(6)); + } + else if (line.startsWith("retry:")) { + sseBuilder.retry(Duration.ofMillis(Long.valueOf(line.substring(6)))); + } + else if (line.startsWith(":")) { + comment = (comment != null ? comment : new StringBuilder()); + comment.append(line.substring(1)).append("\n"); + } } } - if (comment != null) { - sseBuilder.comment(comment.toString().substring(0, comment.length() - 1)); - } - if (data != null) { - return decodeData(data.toString(), valueType, hints).map(decodedData -> { - sseBuilder.data(decodedData); + + Mono decodedData = (data != null ? decodeData(data.toString(), valueType, hints) : Mono.empty()); + + if (shouldWrap) { + if (comment != null) { + sseBuilder.comment(comment.toString().substring(0, comment.length() - 1)); + } + return decodedData.map(o -> { + sseBuilder.data(o); return sseBuilder.build(); }); } else { - return Mono.just(sseBuilder.build()); + return decodedData; } } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java index e32a8c1e79b..932566a7f48 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java @@ -18,6 +18,7 @@ package org.springframework.web.reactive.result.method.annotation; import java.time.Duration; +import org.junit.Assume; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -32,8 +33,10 @@ import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.codec.ServerSentEvent; import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests; import org.springframework.http.server.reactive.HttpHandler; +import org.springframework.http.server.reactive.bootstrap.JettyHttpServer; import org.springframework.http.server.reactive.bootstrap.ReactorHttpServer; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.reactive.DispatcherHandler; import org.springframework.web.reactive.config.EnableWebFlux; @@ -103,51 +106,42 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { @Test public void sseAsEvent() { - Flux> result = this.webClient.get() + + Assume.assumeTrue(server instanceof JettyHttpServer); + + Flux> result = this.webClient.get() .uri("/event") .accept(TEXT_EVENT_STREAM) .retrieve() - .bodyToFlux(new ParameterizedTypeReference>() {}); + .bodyToFlux(new ParameterizedTypeReference>() {}); - StepVerifier.create(result) - .consumeNextWith( event -> { - assertEquals("0", event.id()); - assertEquals("foo", event.data()); - assertEquals("bar", event.comment()); - assertNull(event.event()); - assertNull(event.retry()); - }) - .consumeNextWith( event -> { - assertEquals("1", event.id()); - assertEquals("foo", event.data()); - assertEquals("bar", event.comment()); - assertNull(event.event()); - assertNull(event.retry()); - }) - .thenCancel() - .verify(Duration.ofSeconds(5L)); + verifyPersonEvents(result); } @Test public void sseAsEventWithoutAcceptHeader() { - Flux> result = this.webClient.get() + Flux> result = this.webClient.get() .uri("/event") .accept(TEXT_EVENT_STREAM) .retrieve() - .bodyToFlux(new ParameterizedTypeReference>() {}); + .bodyToFlux(new ParameterizedTypeReference>() {}); + + verifyPersonEvents(result); + } + private void verifyPersonEvents(Flux> result) { StepVerifier.create(result) .consumeNextWith( event -> { assertEquals("0", event.id()); - assertEquals("foo", event.data()); - assertEquals("bar", event.comment()); + assertEquals(new Person("foo 0"), event.data()); + assertEquals("bar 0", event.comment()); assertNull(event.event()); assertNull(event.retry()); }) .consumeNextWith( event -> { assertEquals("1", event.id()); - assertEquals("foo", event.data()); - assertEquals("bar", event.comment()); + assertEquals(new Person("foo 1"), event.data()); + assertEquals("bar 1", event.comment()); assertNull(event.event()); assertNull(event.retry()); }) @@ -180,6 +174,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { @RestController @SuppressWarnings("unused") + @RequestMapping("/sse") static class SseController { private static final Flux INTERVAL = interval(Duration.ofMillis(100), 50); @@ -187,25 +182,26 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { private MonoProcessor cancellation = MonoProcessor.create(); - @GetMapping("/sse/string") + @GetMapping("/string") Flux string() { return INTERVAL.map(l -> "foo " + l); } - @GetMapping("/sse/person") + @GetMapping("/person") Flux person() { return INTERVAL.map(l -> new Person("foo " + l)); } - @GetMapping("/sse/event") - Flux> sse() { - return INTERVAL.map(l -> ServerSentEvent.builder("foo") - .id(Long.toString(l)) - .comment("bar") - .build()); + @GetMapping("/event") + Flux> sse() { + return INTERVAL.take(2).map(l -> + ServerSentEvent.builder(new Person("foo " + l)) + .id(Long.toString(l)) + .comment("bar " + l) + .build()); } - @GetMapping("/sse/infinite") + @GetMapping("/infinite") Flux infinite() { return Flux.just(0, 1).map(l -> "foo " + l) .mergeWith(Flux.never())