From 69b69442bcbd1b7da6f360e33c7ccc7785e43729 Mon Sep 17 00:00:00 2001 From: Sebastien Deleuze Date: Wed, 21 Dec 2016 17:07:41 +0100 Subject: [PATCH] Add ServerSentEventHttpMessageReader This HTTP message reader parse incoming Server-Sent Events and turn them into Flux, Flux or Flux. Issue: SPR-14539 --- .../ServerSentEventHttpMessageReader.java | 169 ++++++++++++++++++ ...ServerSentEventHttpMessageReaderTests.java | 149 +++++++++++++++ 2 files changed, 318 insertions(+) create mode 100644 spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java create mode 100644 spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageReaderTests.java diff --git a/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java new file mode 100644 index 00000000000..8be6964dfb1 --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java @@ -0,0 +1,169 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.codec; + +import java.nio.CharBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.IntPredicate; +import java.util.stream.Collectors; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuples; + +import org.springframework.core.ResolvableType; +import org.springframework.core.codec.CodecException; +import org.springframework.core.codec.Decoder; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.http.MediaType; +import org.springframework.http.ReactiveHttpInputMessage; +import org.springframework.util.Assert; +import org.springframework.util.MimeTypeUtils; + +/** + * Reader that supports a stream of {@link ServerSentEvent}s and also plain + * {@link Object}s which is the same as an {@link ServerSentEvent} with data + * only. + * + * @author Sebastien Deleuze + * @since 5.0 + */ +public class ServerSentEventHttpMessageReader implements HttpMessageReader { + + private static final IntPredicate NEWLINE_DELIMITER = b -> b == '\n' || b == '\r'; + + private final List> dataDecoders; + + + public ServerSentEventHttpMessageReader() { + this.dataDecoders = Collections.emptyList(); + } + + public ServerSentEventHttpMessageReader(List> dataDecoders) { + Assert.notNull(dataDecoders, "'dataDecoders' must not be null"); + this.dataDecoders = new ArrayList<>(dataDecoders); + } + + + @Override + public boolean canRead(ResolvableType elementType, MediaType mediaType) { + return MediaType.TEXT_EVENT_STREAM.isCompatibleWith(mediaType) || + ServerSentEvent.class.isAssignableFrom(elementType.getRawClass()); + } + + @Override + public Flux read(ResolvableType elementType, ReactiveHttpInputMessage inputMessage, Map hints) { + boolean isSseElementType = ServerSentEvent.class.isAssignableFrom(elementType.getRawClass()); + ResolvableType dataType = (isSseElementType ? elementType.getGeneric(0) : elementType); + return Flux.from(inputMessage.getBody()) + .concatMap(ServerSentEventHttpMessageReader::splitOnNewline) + .map(buffer -> Tuples.of(decodeDataBuffer(buffer), buffer.factory())) + .bufferUntil(data -> data.getT1().equals("\n")) + .concatMap(list -> { + ServerSentEvent.Builder sseBuilder = ServerSentEvent.builder(); + StringBuilder dataBuilder = new StringBuilder(); + StringBuilder commentBuilder = new StringBuilder(); + DataBufferFactory bufferFactory = list.stream().findFirst().get().getT2(); + String[] lines = list.stream().map(t -> t.getT1()).collect(Collectors.joining()).split("\\r?\\n"); + for (String line : lines) { + if (line.startsWith("id:")) { + sseBuilder.id(line.substring(3)); + } + else if (line.startsWith("event:")) { + sseBuilder.event(line.substring(6)); + } + else if (line.startsWith("data:")) { + dataBuilder.append(line.substring(5)).append("\n"); + } + else if (line.startsWith("retry:")) { + sseBuilder.retry(Duration.ofMillis(Long.valueOf(line.substring(6)))); + } + else if (line.startsWith(":")) { + commentBuilder.append(line.substring(1)).append("\n"); + } + } + if (dataBuilder.length() > 0) { + String data = dataBuilder.toString(); + if (String.class.isAssignableFrom(dataType.getRawClass())) { + sseBuilder.data(data.substring(0, data.length() - 1)); + } + else { + sseBuilder.data(decode(data, bufferFactory, dataType, hints)); + } + } + if (commentBuilder.length() > 0) { + String comment = commentBuilder.toString(); + sseBuilder.comment(comment.substring(0, comment.length() - 1)); + } + ServerSentEvent sse = sseBuilder.build(); + return (isSseElementType ? Mono.just(sse) : Mono.justOrEmpty(sse.data())); + }) + .cast(Object.class); + } + + private static Flux splitOnNewline(DataBuffer dataBuffer) { + List results = new ArrayList<>(); + int startIdx = 0; + int endIdx; + final int limit = dataBuffer.readableByteCount(); + do { + endIdx = dataBuffer.indexOf(NEWLINE_DELIMITER, startIdx); + int length = endIdx != -1 ? endIdx - startIdx + 1 : limit - startIdx; + DataBuffer token = dataBuffer.slice(startIdx, length); + results.add(DataBufferUtils.retain(token)); + startIdx = endIdx + 1; + } + while (startIdx < limit && endIdx != -1); + DataBufferUtils.release(dataBuffer); + return Flux.fromIterable(results); + } + + private String decodeDataBuffer(DataBuffer dataBuffer) { + CharBuffer charBuffer = StandardCharsets.UTF_8.decode(dataBuffer.asByteBuffer()); + DataBufferUtils.release(dataBuffer); + return charBuffer.toString(); + } + + @SuppressWarnings("unchecked") + private T decode(String data, DataBufferFactory bufferFactory, ResolvableType elementType, Map hints) { + Optional> decoder = dataDecoders + .stream() + .filter(e -> e.canDecode(elementType, MimeTypeUtils.APPLICATION_JSON)) + .findFirst(); + return ((Decoder) decoder.orElseThrow(() -> new CodecException("No suitable decoder found!"))) + .decodeToMono(Mono.just(bufferFactory.wrap(data.getBytes(StandardCharsets.UTF_8))), elementType, MimeTypeUtils.APPLICATION_JSON, hints).block(); + } + + @Override + public Mono readMono(ResolvableType elementType, ReactiveHttpInputMessage inputMessage, Map hints) { + return Mono.error(new UnsupportedOperationException("ServerSentEventHttpMessageReader only supports reading stream of events as a Flux")); + } + + @Override + public List getReadableMediaTypes() { + return Collections.singletonList(MediaType.TEXT_EVENT_STREAM); + } +} diff --git a/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageReaderTests.java b/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageReaderTests.java new file mode 100644 index 00000000000..84af432c412 --- /dev/null +++ b/spring-web/src/test/java/org/springframework/http/codec/ServerSentEventHttpMessageReaderTests.java @@ -0,0 +1,149 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.codec; + +import java.time.Duration; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase; +import org.springframework.http.MediaType; +import org.springframework.http.codec.json.Jackson2JsonDecoder; +import org.springframework.mock.http.server.reactive.test.MockServerHttpRequest; + +/** + * @author Sebastien Deleuze + */ +public class ServerSentEventHttpMessageReaderTests extends AbstractDataBufferAllocatingTestCase { + + private ServerSentEventHttpMessageReader messageReader = new ServerSentEventHttpMessageReader( + Collections.singletonList(new Jackson2JsonDecoder())); + + @Test + public void cantRead() { + assertFalse(messageReader.canRead(ResolvableType.forClass(Object.class), + new MediaType("foo", "bar"))); + assertFalse(messageReader.canRead(ResolvableType.forClass(Object.class), null)); + } + + @Test + public void canRead() { + assertTrue(messageReader.canRead(ResolvableType.forClass(Object.class), + new MediaType("text", "event-stream"))); + assertTrue(messageReader.canRead(ResolvableType.forClass(ServerSentEvent.class), + new MediaType("foo", "bar"))); + } + + @Test + public void readServerSentEvents() { + MockServerHttpRequest request = new MockServerHttpRequest(); + request.setBody("id:c42\nevent:foo\nretry:123\n:bla\n:bla bla\n:bla bla bla\ndata:bar\n\n" + + "id:c43\nevent:bar\nretry:456\ndata:baz\n\n"); + Flux events = messageReader + .read(ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class), + request, Collections.emptyMap()).cast(ServerSentEvent.class); + + StepVerifier.create(events) + .consumeNextWith(event -> { + assertEquals("c42", event.id().get()); + assertEquals("foo", event.event().get()); + assertEquals(Duration.ofMillis(123), event.retry().get()); + assertEquals("bla\nbla bla\nbla bla bla", event.comment().get()); + assertEquals("bar", event.data().get()); + }) + .consumeNextWith(event -> { + assertEquals("c43", event.id().get()); + assertEquals("bar", event.event().get()); + assertEquals(Duration.ofMillis(456), event.retry().get()); + assertFalse(event.comment().isPresent()); + assertEquals("baz", event.data().get()); + }) + .expectComplete() + .verify(); + } + + @Test + public void readServerSentEventsWithMultipleChunks() { + MockServerHttpRequest request = new MockServerHttpRequest(); + request.setBody(Flux.just(stringBuffer("id:c42\nev"), stringBuffer("ent:foo\nretry:123\n:bla\n:bla bla\n:bla bla bla\ndata:"), + stringBuffer("bar\n\nid:c43\nevent:bar\nretry:456\ndata:baz\n\n"))); + Flux events = messageReader + .read(ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class), + request, Collections.emptyMap()).cast(ServerSentEvent.class); + + StepVerifier.create(events) + .consumeNextWith(event -> { + assertEquals("c42", event.id().get()); + assertEquals("foo", event.event().get()); + assertEquals(Duration.ofMillis(123), event.retry().get()); + assertEquals("bla\nbla bla\nbla bla bla", event.comment().get()); + assertEquals("bar", event.data().get()); + }) + .consumeNextWith(event -> { + assertEquals("c43", event.id().get()); + assertEquals("bar", event.event().get()); + assertEquals(Duration.ofMillis(456), event.retry().get()); + assertFalse(event.comment().isPresent()); + assertEquals("baz", event.data().get()); + }) + .expectComplete() + .verify(); + } + + @Test + public void readString() { + MockServerHttpRequest request = new MockServerHttpRequest(); + request.setBody("data:foo\ndata:bar\n\ndata:baz\n\n"); + Flux data = messageReader.read(ResolvableType.forClass(String.class), + request, Collections.emptyMap()).cast(String.class); + + StepVerifier.create(data) + .expectNextMatches(elem -> elem.equals("foo\nbar")) + .expectNextMatches(elem -> elem.equals("baz")) + .expectComplete() + .verify(); + } + + @Test + public void readPojo() { + MockServerHttpRequest request = new MockServerHttpRequest(); + request.setBody("data:{\"foo\": \"foofoo\", \"bar\": \"barbar\"}\n\n" + + "data:{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}\n\n"); + Flux data = messageReader.read(ResolvableType.forClass(Pojo.class), request, + Collections.emptyMap()).cast(Pojo.class); + + StepVerifier.create(data) + .consumeNextWith(pojo -> { + assertEquals("foofoo", pojo.getFoo()); + assertEquals("barbar", pojo.getBar()); + }) + .consumeNextWith(pojo -> { + assertEquals("foofoofoo", pojo.getFoo()); + assertEquals("barbarbar", pojo.getBar()); + }) + .expectComplete() + .verify(); + } + +}