Browse Source

Introduce ResourceEncoder and ResourceDecoder

This commit introduces the ResourceEncoder and ResourceDecoder, and uses
these in ResourceHttpMessageConverter as a non-zero-copy fallback
method.
pull/1111/head
Arjen Poutsma 10 years ago
parent
commit
52c9b3b235
  1. 77
      spring-web-reactive/src/main/java/org/springframework/core/codec/support/ResourceDecoder.java
  2. 78
      spring-web-reactive/src/main/java/org/springframework/core/codec/support/ResourceEncoder.java
  3. 2
      spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringEncoder.java
  4. 59
      spring-web-reactive/src/main/java/org/springframework/core/io/support/ResourceUtils2.java
  5. 38
      spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/CodecHttpMessageConverter.java
  6. 158
      spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/ResourceHttpMessageConverter.java
  7. 80
      spring-web-reactive/src/test/java/org/springframework/core/codec/support/ResourceDecoderTests.java
  8. 71
      spring-web-reactive/src/test/java/org/springframework/core/codec/support/ResourceEncoderTests.java

77
spring-web-reactive/src/main/java/org/springframework/core/codec/support/ResourceDecoder.java

@ -0,0 +1,77 @@ @@ -0,0 +1,77 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.codec.support;
import java.io.InputStream;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.support.DataBufferUtils;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
/**
* A decoder for {@link Resource}s.
*
* @author Arjen Poutsma
*/
public class ResourceDecoder extends AbstractDecoder<Resource> {
public ResourceDecoder() {
super(MimeTypeUtils.ALL);
}
@Override
public boolean canDecode(ResolvableType type, MimeType mimeType, Object... hints) {
Class<?> clazz = type.getRawClass();
return (InputStreamResource.class.equals(clazz) ||
clazz.isAssignableFrom(ByteArrayResource.class)) &&
super.canDecode(type, mimeType, hints);
}
@Override
public Flux<Resource> decode(Publisher<DataBuffer> inputStream, ResolvableType type,
MimeType mimeType, Object... hints) {
Class<?> clazz = type.getRawClass();
Flux<DataBuffer> body = Flux.from(inputStream);
if (InputStreamResource.class.equals(clazz)) {
InputStream is = DataBufferUtils.toInputStream(body);
return Flux.just(new InputStreamResource(is));
}
else if (clazz.isAssignableFrom(ByteArrayResource.class)) {
Mono<DataBuffer> singleBuffer = body.reduce(DataBuffer::write);
return Flux.from(singleBuffer.map(buffer -> {
byte[] bytes = new byte[buffer.readableByteCount()];
buffer.read(bytes);
return new ByteArrayResource(bytes);
}));
}
else {
return Flux.error(new IllegalStateException(
"Unsupported resource class: " + clazz));
}
}
}

78
spring-web-reactive/src/main/java/org/springframework/core/codec/support/ResourceEncoder.java

@ -0,0 +1,78 @@ @@ -0,0 +1,78 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.codec.support;
import java.io.IOException;
import java.io.InputStream;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.support.DataBufferUtils;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.StreamUtils;
/**
* An encoder for {@link Resource}s.
* @author Arjen Poutsma
*/
public class ResourceEncoder extends AbstractEncoder<Resource> {
public static final int DEFAULT_BUFFER_SIZE = StreamUtils.BUFFER_SIZE;
private final int bufferSize;
public ResourceEncoder() {
this(DEFAULT_BUFFER_SIZE);
}
public ResourceEncoder(int bufferSize) {
super(MimeTypeUtils.ALL);
Assert.isTrue(bufferSize > 0, "'bufferSize' must be larger than 0");
this.bufferSize = bufferSize;
}
@Override
public boolean canEncode(ResolvableType type, MimeType mimeType, Object... hints) {
Class<?> clazz = type.getRawClass();
return (super.canEncode(type, mimeType, hints) &&
Resource.class.isAssignableFrom(clazz));
}
@Override
public Flux<DataBuffer> encode(Publisher<? extends Resource> inputStream,
DataBufferAllocator allocator, ResolvableType type, MimeType mimeType,
Object... hints) {
return Flux.from(inputStream).
concatMap(resource -> {
try {
InputStream is = resource.getInputStream();
return DataBufferUtils.read(is, allocator, this.bufferSize);
}
catch (IOException ex) {
return Mono.error(ex);
}
});
}
}

2
spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringEncoder.java

@ -45,7 +45,7 @@ public class StringEncoder extends AbstractEncoder<String> { @@ -45,7 +45,7 @@ public class StringEncoder extends AbstractEncoder<String> {
@Override
public boolean canEncode(ResolvableType type, MimeType mimeType, Object... hints) {
Class<?> clazz = type.getRawClass();
return (super.canEncode(type, mimeType, hints) && String.class.isAssignableFrom(clazz));
return (super.canEncode(type, mimeType, hints) && String.class.equals(clazz));
}
@Override

59
spring-web-reactive/src/main/java/org/springframework/core/io/support/ResourceUtils2.java

@ -0,0 +1,59 @@ @@ -0,0 +1,59 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.io.support;
import java.io.IOException;
import java.net.URI;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.DescriptiveResource;
import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.Resource;
import org.springframework.util.Assert;
import org.springframework.util.ResourceUtils;
/**
* @author Arjen Poutsma
*/
public abstract class ResourceUtils2 {
/**
* Indicates whether the given resource has a file, so that {@link
* Resource#getFile()}
* can be called without an {@link java.io.IOException}.
* @param resource the resource to check
* @return {@code true} if the given resource has a file; {@code false} otherwise
*/
// TODO: refactor into Resource.hasFile() method
public static boolean hasFile(Resource resource) {
Assert.notNull(resource, "'resource' must not be null");
// the following Resource implementations do not support getURI/getFile
if (resource instanceof ByteArrayResource ||
resource instanceof DescriptiveResource ||
resource instanceof InputStreamResource) {
return false;
}
try {
URI resourceUri = resource.getURI();
return ResourceUtils.URL_PROTOCOL_FILE.equals(resourceUri.getScheme());
}
catch (IOException ignored) {
}
return false;
}
}

38
spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/CodecHttpMessageConverter.java

@ -29,12 +29,16 @@ import org.springframework.core.codec.Decoder; @@ -29,12 +29,16 @@ import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.support.MediaTypeUtils;
/**
* Implementation of the {@link HttpMessageConverter} interface that delegates to
* {@link Encoder} and {@link Decoder}.
*
* @author Arjen Poutsma
*/
public class CodecHttpMessageConverter<T> implements HttpMessageConverter<T> {
@ -43,6 +47,32 @@ public class CodecHttpMessageConverter<T> implements HttpMessageConverter<T> { @@ -43,6 +47,32 @@ public class CodecHttpMessageConverter<T> implements HttpMessageConverter<T> {
private final Decoder<T> decoder;
/**
* Create a {@code CodecHttpMessageConverter} with the given {@link Encoder}. When
* using this constructor, all read-related methods will in {@code false} or an
* {@link IllegalStateException}.
* @param encoder the encoder to use
*/
public CodecHttpMessageConverter(Encoder<T> encoder) {
this(encoder, null);
}
/**
* Create a {@code CodecHttpMessageConverter} with the given {@link Decoder}. When
* using this constructor, all write-related methods will in {@code false} or an
* {@link IllegalStateException}.
* @param decoder the decoder to use
*/
public CodecHttpMessageConverter(Decoder<T> decoder) {
this(null, decoder);
}
/**
* Create a {@code CodecHttpMessageConverter} with the given {@link Encoder} and
* {@link Decoder}.
* @param encoder the encoder to use, can be {@code null}
* @param decoder the decoder to use, can be {@code null}
*/
public CodecHttpMessageConverter(Encoder<T> encoder, Decoder<T> decoder) {
this.encoder = encoder;
this.decoder = decoder;
@ -94,9 +124,13 @@ public class CodecHttpMessageConverter<T> implements HttpMessageConverter<T> { @@ -94,9 +124,13 @@ public class CodecHttpMessageConverter<T> implements HttpMessageConverter<T> {
if (this.encoder == null) {
return Mono.error(new IllegalStateException("No decoder set"));
}
outputMessage.getHeaders().setContentType(contentType);
HttpHeaders headers = outputMessage.getHeaders();
if (headers.getContentType() == null) {
headers.setContentType(contentType);
}
DataBufferAllocator allocator = outputMessage.allocator();
Flux<DataBuffer> body = encoder.encode(inputStream, allocator, type, contentType);
Flux<DataBuffer> body =
this.encoder.encode(inputStream, allocator, type, contentType);
return outputMessage.setBody(body);
}
}

158
spring-web-reactive/src/main/java/org/springframework/http/converter/reactive/ResourceHttpMessageConverter.java

@ -18,10 +18,6 @@ package org.springframework.http.converter.reactive; @@ -18,10 +18,6 @@ package org.springframework.http.converter.reactive;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.reactivestreams.Publisher;
@ -29,106 +25,46 @@ import reactor.core.publisher.Flux; @@ -29,106 +25,46 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.DescriptiveResource;
import org.springframework.core.codec.support.ResourceDecoder;
import org.springframework.core.codec.support.ResourceEncoder;
import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.support.DataBufferUtils;
import org.springframework.core.io.support.ResourceUtils2;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpRangeResource;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.ZeroCopyHttpOutputMessage;
import org.springframework.http.support.MediaTypeUtils;
import org.springframework.util.MimeTypeUtils2;
import org.springframework.util.ResourceUtils;
import org.springframework.util.StreamUtils;
/**
* Implementation of {@link HttpMessageConverter} that can read and write
* {@link Resource Resources} and supports byte range requests.
**
* @author Arjen Poutsma
*/
public class ResourceHttpMessageConverter implements HttpMessageConverter<Resource> {
public class ResourceHttpMessageConverter extends CodecHttpMessageConverter<Resource> {
private static final int BUFFER_SIZE = StreamUtils.BUFFER_SIZE;
private static final List<MediaType> SUPPORTED_MEDIA_TYPES =
Collections.singletonList(MediaType.ALL);
@Override
public boolean canRead(ResolvableType type, MediaType mediaType) {
return Resource.class.isAssignableFrom(type.getRawClass());
}
@Override
public boolean canWrite(ResolvableType type, MediaType mediaType) {
return Resource.class.isAssignableFrom(type.getRawClass());
}
@Override
public List<MediaType> getReadableMediaTypes() {
return SUPPORTED_MEDIA_TYPES;
public ResourceHttpMessageConverter() {
super(new ResourceEncoder(), new ResourceDecoder());
}
@Override
public List<MediaType> getWritableMediaTypes() {
return SUPPORTED_MEDIA_TYPES;
}
@Override
public Flux<Resource> read(ResolvableType type,
ReactiveHttpInputMessage inputMessage) {
Class<?> clazz = type.getRawClass();
Flux<DataBuffer> body = inputMessage.getBody();
if (InputStreamResource.class.equals(clazz)) {
InputStream is = DataBufferUtils.toInputStream(body);
return Flux.just(new InputStreamResource(is));
}
else if (clazz.isAssignableFrom(ByteArrayResource.class)) {
Mono<DataBuffer> singleBuffer = body.reduce(DataBuffer::write);
return Flux.from(singleBuffer.map(buffer -> {
byte[] bytes = new byte[buffer.readableByteCount()];
buffer.read(bytes);
return new ByteArrayResource(bytes);
}));
}
else {
return Flux.error(new IllegalStateException(
"Unsupported resource class: " + clazz));
}
public ResourceHttpMessageConverter(int bufferSize) {
super(new ResourceEncoder(bufferSize), new ResourceDecoder());
}
@Override
public Mono<Void> write(Publisher<? extends Resource> inputStream,
ResolvableType type, MediaType contentType,
ReactiveHttpOutputMessage outputMessage) {
if (inputStream instanceof Mono) {
// single resource
return Mono.from(Flux.from(inputStream).
flatMap(resource -> {
HttpHeaders headers = outputMessage.getHeaders();
addHeaders(headers, resource, contentType);
if (resource instanceof HttpRangeResource) {
return writePartialContent((HttpRangeResource) resource,
outputMessage);
}
else {
return writeContent(resource, outputMessage, 0, -1);
}
}));
}
else {
// multiple resources, not supported!
return Mono.error(new IllegalArgumentException(
"Multiple resources not yet supported"));
}
return Mono.from(Flux.from(inputStream).
take(1).
concatMap(resource -> {
HttpHeaders headers = outputMessage.getHeaders();
addHeaders(headers, resource, contentType);
return writeContent(resource, type, contentType, outputMessage);
}));
}
protected void addHeaders(HttpHeaders headers, Resource resource,
@ -146,54 +82,24 @@ public class ResourceHttpMessageConverter implements HttpMessageConverter<Resour @@ -146,54 +82,24 @@ public class ResourceHttpMessageConverter implements HttpMessageConverter<Resour
if (headers.getContentLength() < 0) {
contentLength(resource).ifPresent(headers::setContentLength);
}
headers.add(HttpHeaders.ACCEPT_RANGES, "bytes");
}
private Mono<Void> writeContent(Resource resource,
ReactiveHttpOutputMessage outputMessage, long position, long count) {
private Mono<Void> writeContent(Resource resource, ResolvableType type,
MediaType contentType, ReactiveHttpOutputMessage outputMessage) {
if (outputMessage instanceof ZeroCopyHttpOutputMessage) {
Optional<File> file = getFile(resource);
if (file.isPresent()) {
ZeroCopyHttpOutputMessage zeroCopyResponse =
(ZeroCopyHttpOutputMessage) outputMessage;
if (count < 0) {
count = file.get().length();
}
return zeroCopyResponse.setBody(file.get(), position, count);
}
}
// non-zero copy fallback
try {
InputStream is = resource.getInputStream();
long skipped = is.skip(position);
if (skipped < position) {
return Mono.error(new IOException(
"Skipped only " + skipped + " bytes out of " + count +
" required."));
}
Flux<DataBuffer> responseBody =
DataBufferUtils.read(is, outputMessage.allocator(), BUFFER_SIZE);
if (count > 0) {
responseBody = DataBufferUtils.takeUntilByteCount(responseBody, count);
return zeroCopyResponse
.setBody(file.get(), (long) 0, file.get().length());
}
return outputMessage.setBody(responseBody);
}
catch (IOException ex) {
return Mono.error(ex);
}
}
protected Mono<Void> writePartialContent(HttpRangeResource resource,
ReactiveHttpOutputMessage outputMessage) {
// TODO: implement
return Mono.empty();
// non-zero copy fallback, using ResourceEncoder
return super.write(Mono.just(resource), type,
outputMessage.getHeaders().getContentType(), outputMessage);
}
private static Optional<Long> contentLength(Resource resource) {
@ -210,18 +116,12 @@ public class ResourceHttpMessageConverter implements HttpMessageConverter<Resour @@ -210,18 +116,12 @@ public class ResourceHttpMessageConverter implements HttpMessageConverter<Resour
}
private static Optional<File> getFile(Resource resource) {
// TODO: introduce Resource.hasFile() property to bypass the potential IOException thrown in Resource.getFile()
// the following Resource implementations do not support getURI/getFile
if (!(resource instanceof ByteArrayResource ||
resource instanceof DescriptiveResource ||
resource instanceof InputStreamResource)) {
if (ResourceUtils2.hasFile(resource)) {
try {
URI resourceUri = resource.getURI();
if (ResourceUtils.URL_PROTOCOL_FILE.equals(resourceUri.getScheme())) {
return Optional.of(ResourceUtils.getFile(resourceUri));
}
return Optional.of(resource.getFile());
}
catch (IOException ignored) {
// should not happen
}
}
return Optional.empty();

80
spring-web-reactive/src/test/java/org/springframework/core/codec/support/ResourceDecoderTests.java

@ -0,0 +1,80 @@ @@ -0,0 +1,80 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.codec.support;
import java.io.IOException;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.test.TestSubscriber;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.MediaType;
import org.springframework.util.StreamUtils;
import static org.junit.Assert.*;
/**
* @author Arjen Poutsma
*/
public class ResourceDecoderTests extends AbstractAllocatingTestCase {
private final ResourceDecoder decoder = new ResourceDecoder();
@Test
public void canDecode() throws Exception {
assertTrue(decoder.canDecode(ResolvableType.forClass(InputStreamResource.class),
MediaType.TEXT_PLAIN));
assertTrue(decoder.canDecode(ResolvableType.forClass(ByteArrayResource.class),
MediaType.TEXT_PLAIN));
assertTrue(decoder.canDecode(ResolvableType.forClass(Resource.class),
MediaType.TEXT_PLAIN));
assertTrue(decoder.canDecode(ResolvableType.forClass(InputStreamResource.class),
MediaType.APPLICATION_JSON));
}
@Test
public void decode() throws Exception {
DataBuffer fooBuffer = stringBuffer("foo");
DataBuffer barBuffer = stringBuffer("bar");
Flux<DataBuffer> source = Flux.just(fooBuffer, barBuffer);
Flux<Resource> result =
decoder.decode(source, ResolvableType.forClass(Resource.class), null);
TestSubscriber<Resource> testSubscriber = new TestSubscriber<>();
testSubscriber.bindTo(result).
assertNoError().
assertComplete().
assertValuesWith(resource -> {
try {
byte[] bytes =
StreamUtils.copyToByteArray(resource.getInputStream());
assertEquals("foobar", new String(bytes));
}
catch (IOException e) {
fail(e.getMessage());
}
});
}
}

71
spring-web-reactive/src/test/java/org/springframework/core/codec/support/ResourceEncoderTests.java

@ -0,0 +1,71 @@ @@ -0,0 +1,71 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.codec.support;
import java.nio.charset.StandardCharsets;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.MediaType;
import static org.junit.Assert.assertTrue;
/**
* @author Arjen Poutsma
*/
public class ResourceEncoderTests extends AbstractAllocatingTestCase {
private final ResourceEncoder encoder = new ResourceEncoder();
@Test
public void canEncode() throws Exception {
assertTrue(encoder.canEncode(ResolvableType.forClass(InputStreamResource.class),
MediaType.TEXT_PLAIN));
assertTrue(encoder.canEncode(ResolvableType.forClass(ByteArrayResource.class),
MediaType.TEXT_PLAIN));
assertTrue(encoder.canEncode(ResolvableType.forClass(Resource.class),
MediaType.TEXT_PLAIN));
assertTrue(encoder.canEncode(ResolvableType.forClass(InputStreamResource.class),
MediaType.APPLICATION_JSON));
}
@Test
public void encode() throws Exception {
String s = "foo";
Resource resource = new ByteArrayResource(s.getBytes(StandardCharsets.UTF_8));
Mono<Resource> source = Mono.just(resource);
Flux<DataBuffer> output =
encoder.encode(source, allocator, ResolvableType.forClass(Resource.class),
null);
TestSubscriber<DataBuffer> testSubscriber = new TestSubscriber<>();
testSubscriber.bindTo(output).assertNoError().assertComplete()
.assertValues(stringBuffer(s));
}
}
Loading…
Cancel
Save