|
|
|
@ -43,7 +43,6 @@ import org.springframework.web.server.adapter.WebHttpHandlerBuilder; |
|
|
|
import static org.junit.Assert.*; |
|
|
|
import static org.junit.Assert.*; |
|
|
|
import static org.junit.Assume.*; |
|
|
|
import static org.junit.Assume.*; |
|
|
|
import static org.springframework.http.MediaType.*; |
|
|
|
import static org.springframework.http.MediaType.*; |
|
|
|
import static org.springframework.web.reactive.function.BodyExtractors.*; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* @author Sebastien Deleuze |
|
|
|
* @author Sebastien Deleuze |
|
|
|
@ -77,8 +76,8 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { |
|
|
|
Flux<String> result = this.webClient.get() |
|
|
|
Flux<String> result = this.webClient.get() |
|
|
|
.uri("/string") |
|
|
|
.uri("/string") |
|
|
|
.accept(TEXT_EVENT_STREAM) |
|
|
|
.accept(TEXT_EVENT_STREAM) |
|
|
|
.exchange() |
|
|
|
.retrieve() |
|
|
|
.flatMapMany(response -> response.bodyToFlux(String.class)); |
|
|
|
.bodyToFlux(String.class); |
|
|
|
|
|
|
|
|
|
|
|
StepVerifier.create(result) |
|
|
|
StepVerifier.create(result) |
|
|
|
.expectNext("foo 0") |
|
|
|
.expectNext("foo 0") |
|
|
|
@ -92,8 +91,8 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { |
|
|
|
Flux<Person> result = this.webClient.get() |
|
|
|
Flux<Person> result = this.webClient.get() |
|
|
|
.uri("/person") |
|
|
|
.uri("/person") |
|
|
|
.accept(TEXT_EVENT_STREAM) |
|
|
|
.accept(TEXT_EVENT_STREAM) |
|
|
|
.exchange() |
|
|
|
.retrieve() |
|
|
|
.flatMapMany(response -> response.bodyToFlux(Person.class)); |
|
|
|
.bodyToFlux(Person.class); |
|
|
|
|
|
|
|
|
|
|
|
StepVerifier.create(result) |
|
|
|
StepVerifier.create(result) |
|
|
|
.expectNext(new Person("foo 0")) |
|
|
|
.expectNext(new Person("foo 0")) |
|
|
|
@ -107,9 +106,8 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { |
|
|
|
Flux<ServerSentEvent<String>> result = this.webClient.get() |
|
|
|
Flux<ServerSentEvent<String>> result = this.webClient.get() |
|
|
|
.uri("/event") |
|
|
|
.uri("/event") |
|
|
|
.accept(TEXT_EVENT_STREAM) |
|
|
|
.accept(TEXT_EVENT_STREAM) |
|
|
|
.exchange() |
|
|
|
.retrieve() |
|
|
|
.flatMapMany(response -> response.body( |
|
|
|
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {}); |
|
|
|
toFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {}))); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
StepVerifier.create(result) |
|
|
|
StepVerifier.create(result) |
|
|
|
.consumeNextWith( event -> { |
|
|
|
.consumeNextWith( event -> { |
|
|
|
@ -135,9 +133,8 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { |
|
|
|
Flux<ServerSentEvent<String>> result = this.webClient.get() |
|
|
|
Flux<ServerSentEvent<String>> result = this.webClient.get() |
|
|
|
.uri("/event") |
|
|
|
.uri("/event") |
|
|
|
.accept(TEXT_EVENT_STREAM) |
|
|
|
.accept(TEXT_EVENT_STREAM) |
|
|
|
.exchange() |
|
|
|
.retrieve() |
|
|
|
.flatMapMany(response -> response.body( |
|
|
|
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {}); |
|
|
|
toFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {}))); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
StepVerifier.create(result) |
|
|
|
StepVerifier.create(result) |
|
|
|
.consumeNextWith( event -> { |
|
|
|
.consumeNextWith( event -> { |
|
|
|
@ -167,8 +164,8 @@ public class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { |
|
|
|
Flux<String> result = this.webClient.get() |
|
|
|
Flux<String> result = this.webClient.get() |
|
|
|
.uri("/infinite") |
|
|
|
.uri("/infinite") |
|
|
|
.accept(TEXT_EVENT_STREAM) |
|
|
|
.accept(TEXT_EVENT_STREAM) |
|
|
|
.exchange() |
|
|
|
.retrieve() |
|
|
|
.flatMapMany(response -> response.bodyToFlux(String.class)); |
|
|
|
.bodyToFlux(String.class); |
|
|
|
|
|
|
|
|
|
|
|
StepVerifier.create(result) |
|
|
|
StepVerifier.create(result) |
|
|
|
.expectNext("foo 0") |
|
|
|
.expectNext("foo 0") |
|
|
|
|