diff --git a/spring-test/src/test/java/org/springframework/test/web/reactive/server/samples/ResponseEntityTests.java b/spring-test/src/test/java/org/springframework/test/web/reactive/server/samples/ResponseEntityTests.java index 7d39d9817a2..1177be557e2 100644 --- a/spring-test/src/test/java/org/springframework/test/web/reactive/server/samples/ResponseEntityTests.java +++ b/spring-test/src/test/java/org/springframework/test/web/reactive/server/samples/ResponseEntityTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -59,7 +59,7 @@ public class ResponseEntityTests { @Test - public void entity() throws Exception { + public void entity() { this.client.get().uri("/John") .exchange() .expectStatus().isOk() @@ -68,7 +68,7 @@ public class ResponseEntityTests { } @Test - public void entityWithConsumer() throws Exception { + public void entityWithConsumer() { this.client.get().uri("/John") .exchange() .expectStatus().isOk() @@ -78,7 +78,7 @@ public class ResponseEntityTests { } @Test - public void entityList() throws Exception { + public void entityList() { List expected = Arrays.asList( new Person("Jane"), new Person("Jason"), new Person("John")); @@ -91,7 +91,7 @@ public class ResponseEntityTests { } @Test - public void entityMap() throws Exception { + public void entityMap() { Map map = new LinkedHashMap<>(); map.put("Jane", new Person("Jane")); @@ -105,7 +105,7 @@ public class ResponseEntityTests { } @Test - public void entityStream() throws Exception { + public void entityStream() { FluxExchangeResult result = this.client.get() .accept(TEXT_EVENT_STREAM) @@ -123,7 +123,7 @@ public class ResponseEntityTests { } @Test - public void postEntity() throws Exception { + public void postEntity() { this.client.post() .syncBody(new Person("John")) .exchange() @@ -158,7 +158,8 @@ public class ResponseEntityTests { @GetMapping(produces = "text/event-stream") Flux getPersonStream() { - return Flux.interval(ofMillis(100)).onBackpressureBuffer(10).map(index -> new Person("N" + index)); + return Flux.interval(ofMillis(100)).take(50).onBackpressureBuffer(50) + .map(index -> new Person("N" + index)); } @PostMapping diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/AbstractHttpHandlerIntegrationTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/AbstractHttpHandlerIntegrationTests.java index 9ebf31f7776..2ad80fb7721 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/AbstractHttpHandlerIntegrationTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/AbstractHttpHandlerIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.http.server.reactive; import java.io.File; +import java.time.Duration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -24,6 +25,7 @@ import org.junit.After; import org.junit.Before; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import reactor.core.publisher.Flux; import org.springframework.http.server.reactive.bootstrap.HttpServer; import org.springframework.http.server.reactive.bootstrap.JettyHttpServer; @@ -73,4 +75,13 @@ public abstract class AbstractHttpHandlerIntegrationTests { protected abstract HttpHandler createHttpHandler(); + + /** + * Return an interval stream of with n number of ticks and buffer the + * emissions to avoid back pressure failures (e.g. on slow CI server). + */ + public static Flux interval(Duration period, int count) { + return Flux.interval(period).take(count).onBackpressureBuffer(2); + } + } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/FlushingIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/FlushingIntegrationTests.java index 8d30429729f..9f73c560fd3 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/FlushingIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/FlushingIntegrationTests.java @@ -53,7 +53,7 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest @Test - public void writeAndFlushWith() throws Exception { + public void writeAndFlushWith() { Mono result = this.webClient.get() .uri("/write-and-flush") .retrieve() @@ -64,7 +64,7 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest StepVerifier.create(result) .expectNext("data0data1") .expectComplete() - .verify(Duration.ofSeconds(5L)); + .verify(Duration.ofSeconds(10L)); } @Test // SPR-14991 @@ -79,7 +79,7 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest StepVerifier.create(result) .consumeNextWith(value -> assertTrue(value.length() == 20000 * "0123456789".length())) .expectComplete() - .verify(Duration.ofSeconds(5L)); + .verify(Duration.ofSeconds(10L)); } catch (AssertionError err) { String os = System.getProperty("os.name").toLowerCase(); @@ -103,7 +103,7 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest StepVerifier.create(result) .expectNextMatches(s -> s.startsWith("0123456789")) .expectComplete() - .verify(Duration.ofSeconds(5L)); + .verify(Duration.ofSeconds(10L)); } @@ -119,13 +119,10 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { String path = request.getURI().getPath(); if (path.endsWith("write-and-flush")) { - Flux> responseBody = Flux - .interval(Duration.ofMillis(50)) + Flux> responseBody = interval(Duration.ofMillis(50), 2) .map(l -> toDataBuffer("data" + l + "\n", response.bufferFactory())) - .take(2) .map(Flux::just); - responseBody = responseBody.concatWith(Flux.never()); - return response.writeAndFlushWith(responseBody); + return response.writeAndFlushWith(responseBody.concatWith(Flux.never())); } else if (path.endsWith("write-and-complete")) { Flux responseBody = Flux @@ -138,9 +135,8 @@ public class FlushingIntegrationTests extends AbstractHttpHandlerIntegrationTest Flux responseBody = Flux .just("0123456789") .repeat(20000) - .map(value -> toDataBuffer(value + "\n", response.bufferFactory())) - .mergeWith(Flux.never()); - return response.writeWith(responseBody); + .map(value -> toDataBuffer(value + "\n", response.bufferFactory())); + return response.writeWith(responseBody.mergeWith(Flux.never())); } return response.writeWith(Flux.empty()); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/SseHandlerFunctionIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/SseHandlerFunctionIntegrationTests.java index 356ac87c654..24f5a5abf74 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/SseHandlerFunctionIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/server/SseHandlerFunctionIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,10 +30,10 @@ import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.reactive.function.client.WebClient; import static org.junit.Assert.*; -import static org.springframework.http.MediaType.TEXT_EVENT_STREAM; -import static org.springframework.web.reactive.function.BodyExtractors.toFlux; -import static org.springframework.web.reactive.function.BodyInserters.fromServerSentEvents; -import static org.springframework.web.reactive.function.server.RouterFunctions.route; +import static org.springframework.http.MediaType.*; +import static org.springframework.web.reactive.function.BodyExtractors.*; +import static org.springframework.web.reactive.function.BodyInserters.*; +import static org.springframework.web.reactive.function.server.RouterFunctions.*; /** * @author Arjen Poutsma @@ -59,7 +59,7 @@ public class SseHandlerFunctionIntegrationTests extends AbstractRouterFunctionIn @Test - public void sseAsString() throws Exception { + public void sseAsString() { Flux result = this.webClient.get() .uri("/string") .accept(TEXT_EVENT_STREAM) @@ -74,7 +74,7 @@ public class SseHandlerFunctionIntegrationTests extends AbstractRouterFunctionIn } @Test - public void sseAsPerson() throws Exception { + public void sseAsPerson() { Flux result = this.webClient.get() .uri("/person") .accept(TEXT_EVENT_STREAM) @@ -89,7 +89,7 @@ public class SseHandlerFunctionIntegrationTests extends AbstractRouterFunctionIn } @Test - public void sseAsEvent() throws Exception { + public void sseAsEvent() { Flux> result = this.webClient.get() .uri("/event") .accept(TEXT_EVENT_STREAM) @@ -119,28 +119,25 @@ public class SseHandlerFunctionIntegrationTests extends AbstractRouterFunctionIn private static class SseHandler { - public Mono string(ServerRequest request) { - Flux flux = Flux.interval(Duration.ofMillis(100)).map(l -> "foo " + l).take(2); + private static final Flux INTERVAL = interval(Duration.ofMillis(100), 2); + + + Mono string(ServerRequest request) { return ServerResponse.ok() .contentType(MediaType.TEXT_EVENT_STREAM) - .body(flux, String.class); + .body(INTERVAL.map(aLong -> "foo " + aLong), String.class); } - public Mono person(ServerRequest request) { - Flux flux = Flux.interval(Duration.ofMillis(100)) - .map(l -> new Person("foo " + l)).take(2); + Mono person(ServerRequest request) { return ServerResponse.ok() .contentType(MediaType.TEXT_EVENT_STREAM) - .body(flux, Person.class); + .body(INTERVAL.map(aLong -> new Person("foo " + aLong)), Person.class); } - public Mono sse(ServerRequest request) { - Flux> flux = Flux.interval(Duration.ofMillis(100)) - .map(l -> ServerSentEvent.builder().data("foo") - .id(Long.toString(l)) - .comment("bar") - .build()).take(2); - return ServerResponse.ok().body(fromServerSentEvents(flux)); + Mono sse(ServerRequest request) { + Flux> body = INTERVAL + .map(aLong -> ServerSentEvent.builder("foo").id("" + aLong).comment("bar").build()); + return ServerResponse.ok().body(fromServerSentEvents(body)); } } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JacksonStreamingIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JacksonStreamingIntegrationTests.java index 8a8c0ca410b..f729b8145ef 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JacksonStreamingIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/JacksonStreamingIntegrationTests.java @@ -29,15 +29,14 @@ import org.springframework.context.annotation.Configuration; import org.springframework.http.MediaType; import org.springframework.http.server.reactive.AbstractHttpHandlerIntegrationTests; import org.springframework.http.server.reactive.HttpHandler; -import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.reactive.DispatcherHandler; import org.springframework.web.reactive.config.EnableWebFlux; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.server.adapter.WebHttpHandlerBuilder; -import static org.springframework.http.MediaType.APPLICATION_STREAM_JSON; -import static org.springframework.http.MediaType.APPLICATION_STREAM_JSON_VALUE; +import static org.springframework.http.MediaType.*; /** * @author Sebastien Deleuze @@ -67,7 +66,7 @@ public class JacksonStreamingIntegrationTests extends AbstractHttpHandlerIntegra } @Test - public void jsonStreaming() throws Exception { + public void jsonStreaming() { Flux result = this.webClient.get() .uri("/stream") .accept(APPLICATION_STREAM_JSON) @@ -82,7 +81,7 @@ public class JacksonStreamingIntegrationTests extends AbstractHttpHandlerIntegra } @Test - public void smileStreaming() throws Exception { + public void smileStreaming() { Flux result = this.webClient.get() .uri("/stream") .accept(new MediaType("application", "stream+x-jackson-smile")) @@ -100,9 +99,10 @@ public class JacksonStreamingIntegrationTests extends AbstractHttpHandlerIntegra @SuppressWarnings("unused") static class JacksonStreamingController { - @RequestMapping(value = "/stream", produces = { APPLICATION_STREAM_JSON_VALUE, "application/stream+x-jackson-smile" }) + @GetMapping(value = "/stream", + produces = { APPLICATION_STREAM_JSON_VALUE, "application/stream+x-jackson-smile" }) Flux person() { - return Flux.interval(Duration.ofMillis(100)).map(l -> new Person("foo " + l)); + return interval(Duration.ofMillis(100), 50).map(l -> new Person("foo " + l)); } } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingIntegrationTests.java index 5eee687983b..b126b32d11a 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/RequestMappingIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,6 @@ import java.time.Duration; import org.junit.Test; import org.reactivestreams.Publisher; -import reactor.core.publisher.Flux; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; @@ -30,9 +29,7 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.reactive.config.EnableWebFlux; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.*; /** * Integration tests with {@code @RequestMapping} handler methods. @@ -56,7 +53,7 @@ public class RequestMappingIntegrationTests extends AbstractRequestMappingIntegr @Test - public void httpHead() throws Exception { + public void httpHead() { String url = "http://localhost:" + this.port + "/text"; HttpHeaders headers = getRestTemplate().headForHeaders(url); String contentType = headers.getFirst("Content-Type"); @@ -89,7 +86,7 @@ public class RequestMappingIntegrationTests extends AbstractRequestMappingIntegr @GetMapping("/stream") public Publisher stream() { - return Flux.interval(Duration.ofMillis(50)).take(5); + return interval(Duration.ofMillis(50), 5); } } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java index 06c17b34085..287609df3f7 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -71,7 +71,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { } @Test - public void sseAsString() throws Exception { + public void sseAsString() { Flux result = this.webClient.get() .uri("/string") .accept(TEXT_EVENT_STREAM) @@ -86,7 +86,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { } @Test - public void sseAsPerson() throws Exception { + public void sseAsPerson() { Flux result = this.webClient.get() .uri("/person") .accept(TEXT_EVENT_STREAM) @@ -101,13 +101,14 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { } @Test - public void sseAsEvent() throws Exception { + public void sseAsEvent() { ResolvableType type = forClassWithGenerics(ServerSentEvent.class, String.class); Flux> result = this.webClient.get() .uri("/event") .accept(TEXT_EVENT_STREAM) .exchange() - .flatMapMany(response -> response.body(toFlux(new ParameterizedTypeReference>() {}))); + .flatMapMany(response -> response.body( + toFlux(new ParameterizedTypeReference>() {}))); StepVerifier.create(result) .consumeNextWith( event -> { @@ -129,12 +130,13 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { } @Test - public void sseAsEventWithoutAcceptHeader() throws Exception { + public void sseAsEventWithoutAcceptHeader() { Flux> result = this.webClient.get() .uri("/event") .accept(TEXT_EVENT_STREAM) .exchange() - .flatMapMany(response -> response.body(toFlux(new ParameterizedTypeReference>() {}))); + .flatMapMany(response -> response.body( + toFlux(new ParameterizedTypeReference>() {}))); StepVerifier.create(result) .consumeNextWith( event -> { @@ -155,23 +157,27 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { .verify(Duration.ofSeconds(5L)); } + @RestController @SuppressWarnings("unused") static class SseController { + private static final Flux INTERVAL = interval(Duration.ofMillis(100), 50); + + @RequestMapping("/sse/string") Flux string() { - return Flux.interval(Duration.ofMillis(100)).map(l -> "foo " + l); + return INTERVAL.map(l -> "foo " + l); } @RequestMapping("/sse/person") Flux person() { - return Flux.interval(Duration.ofMillis(100)).map(l -> new Person("foo " + l)); + return INTERVAL.map(l -> new Person("foo " + l)); } @RequestMapping("/sse/event") Flux> sse() { - return Flux.interval(Duration.ofMillis(100)).map(l -> ServerSentEvent.builder("foo") + return INTERVAL.map(l -> ServerSentEvent.builder("foo") .id(Long.toString(l)) .comment("bar") .build()); @@ -179,6 +185,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { } + @Configuration @EnableWebFlux @SuppressWarnings("unused") @@ -190,6 +197,7 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { } } + @SuppressWarnings("unused") private static class Person {