Browse Source

Use StringDecoder to split SSE stream

ServerSentEventHttpMessageReader had logic to split on new lines
and buffer until an empty new line (start of a new event). To account
for random data chunking, it later re-assembled the lines for each
event and split again on new lines. However bufferUntil was still
unreliable a chunk may contain nothing but a newline, which doesn't
necessarily mean an empty newline in the overall SSE stream.

This commit simplifies the above by delegating the splitting of the
stream along newlines to StringDecoder.

Issue: SPR-16744
pull/1800/head
Rossen Stoyanchev 8 years ago
parent
commit
66bd277671
  1. 93
      spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java
  2. 64
      spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java

93
spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java

@ -16,15 +16,11 @@ @@ -16,15 +16,11 @@
package org.springframework.http.codec;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -35,7 +31,6 @@ import org.springframework.core.codec.Decoder; @@ -35,7 +31,6 @@ import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpInputMessage;
@ -51,12 +46,12 @@ import org.springframework.lang.Nullable; @@ -51,12 +46,12 @@ import org.springframework.lang.Nullable;
*/
public class ServerSentEventHttpMessageReader implements HttpMessageReader<Object> {
private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r';
private static final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
private static final StringDecoder stringDecoder = StringDecoder.textPlainOnly();
private static final ResolvableType STRING_TYPE = ResolvableType.forClass(String.class);
@Nullable
private final Decoder<?> decoder;
@ -110,77 +105,53 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader<Objec @@ -110,77 +105,53 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader<Objec
boolean shouldWrap = isServerSentEvent(elementType);
ResolvableType valueType = (shouldWrap ? elementType.getGeneric(0) : elementType);
return Flux.from(message.getBody())
.concatMap(ServerSentEventHttpMessageReader::splitOnNewline)
.map(buffer -> {
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());
DataBufferUtils.release(buffer);
return charBuffer.toString();
})
.bufferUntil(line -> line.equals("\n"))
.concatMap(rawLines -> {
String[] lines = rawLines.stream().collect(Collectors.joining()).split("\\r?\\n");
return buildEvent(lines, valueType, hints)
.filter(event -> shouldWrap || event.data() != null)
.map(event -> shouldWrap ? event : event.data());
})
.cast(Object.class);
}
private static Flux<DataBuffer> splitOnNewline(DataBuffer dataBuffer) {
List<DataBuffer> results = new ArrayList<>();
int startIdx = 0;
int endIdx;
final int limit = dataBuffer.readableByteCount();
do {
endIdx = dataBuffer.indexOf(NEWLINE_DELIMITER, startIdx);
int length = endIdx != -1 ? endIdx - startIdx + 1 : limit - startIdx;
DataBuffer token = dataBuffer.slice(startIdx, length);
results.add(DataBufferUtils.retain(token));
startIdx = endIdx + 1;
}
while (startIdx < limit && endIdx != -1);
DataBufferUtils.release(dataBuffer);
return Flux.fromIterable(results);
return stringDecoder.decode(message.getBody(), STRING_TYPE, null, Collections.emptyMap())
.bufferUntil(line -> line.equals(""))
.concatMap(lines -> buildEvent(lines, valueType, shouldWrap, hints));
}
private Mono<ServerSentEvent<Object>> buildEvent(String[] lines, ResolvableType valueType,
private Mono<?> buildEvent(List<String> lines, ResolvableType valueType, boolean shouldWrap,
Map<String, Object> hints) {
ServerSentEvent.Builder<Object> sseBuilder = ServerSentEvent.builder();
ServerSentEvent.Builder<Object> sseBuilder = shouldWrap ? ServerSentEvent.builder() : null;
StringBuilder data = null;
StringBuilder comment = null;
for (String line : lines) {
if (line.startsWith("id:")) {
sseBuilder.id(line.substring(3));
}
else if (line.startsWith("event:")) {
sseBuilder.event(line.substring(6));
}
else if (line.startsWith("data:")) {
if (line.startsWith("data:")) {
data = (data != null ? data : new StringBuilder());
data.append(line.substring(5)).append("\n");
}
else if (line.startsWith("retry:")) {
sseBuilder.retry(Duration.ofMillis(Long.valueOf(line.substring(6))));
}
else if (line.startsWith(":")) {
comment = (comment != null ? comment : new StringBuilder());
comment.append(line.substring(1)).append("\n");
if (shouldWrap) {
if (line.startsWith("id:")) {
sseBuilder.id(line.substring(3));
}
else if (line.startsWith("event:")) {
sseBuilder.event(line.substring(6));
}
else if (line.startsWith("retry:")) {
sseBuilder.retry(Duration.ofMillis(Long.valueOf(line.substring(6))));
}
else if (line.startsWith(":")) {
comment = (comment != null ? comment : new StringBuilder());
comment.append(line.substring(1)).append("\n");
}
}
}
if (comment != null) {
sseBuilder.comment(comment.toString().substring(0, comment.length() - 1));
}
if (data != null) {
return decodeData(data.toString(), valueType, hints).map(decodedData -> {
sseBuilder.data(decodedData);
Mono<?> decodedData = (data != null ? decodeData(data.toString(), valueType, hints) : Mono.empty());
if (shouldWrap) {
if (comment != null) {
sseBuilder.comment(comment.toString().substring(0, comment.length() - 1));
}
return decodedData.map(o -> {
sseBuilder.data(o);
return sseBuilder.build();
});
}
else {
return Mono.just(sseBuilder.build());
return decodedData;
}
}

64
spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java

@ -18,6 +18,7 @@ package org.springframework.web.reactive.result.method.annotation; @@ -18,6 +18,7 @@ package org.springframework.web.reactive.result.method.annotation;
import java.time.Duration;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@ -32,8 +33,10 @@ import org.springframework.core.ParameterizedTypeReference; @@ -32,8 +33,10 @@ import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.bootstrap.JettyHttpServer;
import org.springframework.http.server.reactive.bootstrap.ReactorHttpServer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.reactive.config.EnableWebFlux;
@ -103,51 +106,42 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { @@ -103,51 +106,42 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@Test
public void sseAsEvent() {
Flux<ServerSentEvent<String>> result = this.webClient.get()
Assume.assumeTrue(server instanceof JettyHttpServer);
Flux<ServerSentEvent<Person>> result = this.webClient.get()
.uri("/event")
.accept(TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {});
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<Person>>() {});
StepVerifier.create(result)
.consumeNextWith( event -> {
assertEquals("0", event.id());
assertEquals("foo", event.data());
assertEquals("bar", event.comment());
assertNull(event.event());
assertNull(event.retry());
})
.consumeNextWith( event -> {
assertEquals("1", event.id());
assertEquals("foo", event.data());
assertEquals("bar", event.comment());
assertNull(event.event());
assertNull(event.retry());
})
.thenCancel()
.verify(Duration.ofSeconds(5L));
verifyPersonEvents(result);
}
@Test
public void sseAsEventWithoutAcceptHeader() {
Flux<ServerSentEvent<String>> result = this.webClient.get()
Flux<ServerSentEvent<Person>> result = this.webClient.get()
.uri("/event")
.accept(TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {});
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<Person>>() {});
verifyPersonEvents(result);
}
private void verifyPersonEvents(Flux<ServerSentEvent<Person>> result) {
StepVerifier.create(result)
.consumeNextWith( event -> {
assertEquals("0", event.id());
assertEquals("foo", event.data());
assertEquals("bar", event.comment());
assertEquals(new Person("foo 0"), event.data());
assertEquals("bar 0", event.comment());
assertNull(event.event());
assertNull(event.retry());
})
.consumeNextWith( event -> {
assertEquals("1", event.id());
assertEquals("foo", event.data());
assertEquals("bar", event.comment());
assertEquals(new Person("foo 1"), event.data());
assertEquals("bar 1", event.comment());
assertNull(event.event());
assertNull(event.retry());
})
@ -180,6 +174,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { @@ -180,6 +174,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
@RestController
@SuppressWarnings("unused")
@RequestMapping("/sse")
static class SseController {
private static final Flux<Long> INTERVAL = interval(Duration.ofMillis(100), 50);
@ -187,25 +182,26 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { @@ -187,25 +182,26 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
private MonoProcessor<Void> cancellation = MonoProcessor.create();
@GetMapping("/sse/string")
@GetMapping("/string")
Flux<String> string() {
return INTERVAL.map(l -> "foo " + l);
}
@GetMapping("/sse/person")
@GetMapping("/person")
Flux<Person> person() {
return INTERVAL.map(l -> new Person("foo " + l));
}
@GetMapping("/sse/event")
Flux<ServerSentEvent<String>> sse() {
return INTERVAL.map(l -> ServerSentEvent.builder("foo")
.id(Long.toString(l))
.comment("bar")
.build());
@GetMapping("/event")
Flux<ServerSentEvent<Person>> sse() {
return INTERVAL.take(2).map(l ->
ServerSentEvent.builder(new Person("foo " + l))
.id(Long.toString(l))
.comment("bar " + l)
.build());
}
@GetMapping("/sse/infinite")
@GetMapping("/infinite")
Flux<String> infinite() {
return Flux.just(0, 1).map(l -> "foo " + l)
.mergeWith(Flux.never())

Loading…
Cancel
Save