Browse Source

Removed DataBufferUtils.toInputStream

Replaced all usages of toInputStream with non-blocking alternatives.
pull/1111/head
Arjen Poutsma 10 years ago
parent
commit
cc8b2109a9
  1. 26
      spring-web-reactive/src/main/java/org/springframework/core/codec/support/ResourceDecoder.java
  2. 33
      spring-web-reactive/src/main/java/org/springframework/core/codec/support/XmlEventDecoder.java
  3. 21
      spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java
  4. 60
      spring-web-reactive/src/main/java/org/springframework/util/CollectionUtils2.java
  5. 24
      spring-web-reactive/src/test/java/org/springframework/core/codec/support/XmlEventDecoderTests.java
  6. 170
      spring-web-reactive/src/test/java/org/springframework/http/server/reactive/XmlHandlerIntegrationTests.java

26
spring-web-reactive/src/main/java/org/springframework/core/codec/support/ResourceDecoder.java

@ -16,7 +16,7 @@ @@ -16,7 +16,7 @@
package org.springframework.core.codec.support;
import java.io.InputStream;
import java.io.ByteArrayInputStream;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@ -55,20 +55,24 @@ public class ResourceDecoder extends AbstractDecoder<Resource> { @@ -55,20 +55,24 @@ public class ResourceDecoder extends AbstractDecoder<Resource> {
MimeType mimeType, Object... hints) {
Class<?> clazz = type.getRawClass();
Flux<DataBuffer> body = Flux.from(inputStream);
Mono<byte[]> byteArray = Flux.from(inputStream).
reduce(DataBuffer::write).
map(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
return bytes;
});
if (InputStreamResource.class.equals(clazz)) {
InputStream is = DataBufferUtils.toInputStream(body);
return Flux.just(new InputStreamResource(is));
return Flux.from(byteArray.
map(ByteArrayInputStream::new).
map(InputStreamResource::new));
}
else if (clazz.isAssignableFrom(ByteArrayResource.class)) {
Mono<DataBuffer> singleBuffer = body.reduce(DataBuffer::write);
return Flux.from(singleBuffer.map(buffer -> {
byte[] bytes = new byte[buffer.readableByteCount()];
buffer.read(bytes);
DataBufferUtils.release(buffer);
return new ByteArrayResource(bytes);
}));
return Flux.from(byteArray.
map(ByteArrayResource::new));
}
else {
return Flux.error(new IllegalStateException(

33
spring-web-reactive/src/main/java/org/springframework/core/codec/support/XmlEventDecoder.java

@ -16,7 +16,6 @@ @@ -16,7 +16,6 @@
package org.springframework.core.codec.support;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
@ -76,6 +75,8 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> { @@ -76,6 +75,8 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
private static final XMLInputFactory inputFactory = XMLInputFactory.newFactory();
boolean useAalto = true;
public XmlEventDecoder() {
super(MimeTypeUtils.APPLICATION_XML, MimeTypeUtils.TEXT_XML);
}
@ -83,21 +84,25 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> { @@ -83,21 +84,25 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
@Override
public Flux<XMLEvent> decode(Publisher<DataBuffer> inputStream, ResolvableType type,
MimeType mimeType, Object... hints) {
if (aaltoPresent) {
return Flux.from(inputStream).flatMap(new AaltoDataBufferToXmlEvent());
Flux<DataBuffer> flux = Flux.from(inputStream);
if (useAalto && aaltoPresent) {
return flux.flatMap(new AaltoDataBufferToXmlEvent());
}
else {
try {
InputStream blockingStream = DataBufferUtils.toInputStream(inputStream);
XMLEventReader eventReader =
inputFactory.createXMLEventReader(blockingStream);
return Flux.fromIterable((Iterable<XMLEvent>) () -> eventReader);
}
catch (XMLStreamException ex) {
return Flux.error(ex);
}
Mono<DataBuffer> singleBuffer = flux.reduce(DataBuffer::write);
return singleBuffer.
map(DataBuffer::asInputStream).
flatMap(is -> {
try {
XMLEventReader eventReader =
inputFactory.createXMLEventReader(is);
return Flux
.fromIterable((Iterable<XMLEvent>) () -> eventReader);
}
catch (XMLStreamException ex) {
return Mono.error(ex);
}
});
}
}

21
spring-web-reactive/src/main/java/org/springframework/core/io/buffer/support/DataBufferUtils.java

@ -19,11 +19,9 @@ package org.springframework.core.io.buffer.support; @@ -19,11 +19,9 @@ package org.springframework.core.io.buffer.support;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.Enumeration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@ -37,7 +35,6 @@ import org.springframework.core.io.buffer.DataBuffer; @@ -37,7 +35,6 @@ import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferAllocator;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils2;
/**i
* Utility class for working with {@link DataBuffer}s.
@ -56,24 +53,6 @@ public abstract class DataBufferUtils { @@ -56,24 +53,6 @@ public abstract class DataBufferUtils {
}
};
/**
* Returns the given data buffer publisher as a blocking input stream, streaming over
* all underlying buffers when available.
* @param publisher the publisher to create the input stream for
* @return the input stream
*/
public static InputStream toInputStream(Publisher<DataBuffer> publisher) {
Iterable<InputStream> streams = Flux.from(publisher).
map(DataBuffer::asInputStream).
toIterable(1);
Enumeration<InputStream> enumeration =
CollectionUtils2.toEnumeration(streams.iterator());
return new SequenceInputStream(enumeration);
}
/**
* Reads the given {@code ReadableByteChannel} into a {@code Flux} of
* {@code DataBuffer}s. Closes the channel when the flux is terminated.

60
spring-web-reactive/src/main/java/org/springframework/util/CollectionUtils2.java

@ -1,60 +0,0 @@ @@ -1,60 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.util;
import java.util.Enumeration;
import java.util.Iterator;
/**
* TODO: to be merged with {@link CollectionUtils}
* @author Arjen Poutsma
*/
public abstract class CollectionUtils2 {
/**
* Adapt an iterator to an enumeration.
* @param iterator the iterator
* @return the enumeration
*/
public static <E> Enumeration<E> toEnumeration(Iterator<E> iterator) {
return new IteratorEnumeration<E>(iterator);
}
/**
* Enumeration wrapping an Iterator.
*/
private static class IteratorEnumeration<T> implements Enumeration<T> {
private final Iterator<T> iterator;
public IteratorEnumeration(Iterator<T> iterator) {
this.iterator = iterator;
}
@Override
public boolean hasMoreElements() {
return this.iterator.hasNext();
}
@Override
public T nextElement() {
return this.iterator.next();
}
}
}

24
spring-web-reactive/src/test/java/org/springframework/core/codec/support/XmlEventDecoderTests.java

@ -41,7 +41,7 @@ public class XmlEventDecoderTests extends AbstractDataBufferAllocatingTestCase { @@ -41,7 +41,7 @@ public class XmlEventDecoderTests extends AbstractDataBufferAllocatingTestCase {
private XmlEventDecoder decoder = new XmlEventDecoder();
@Test
public void toXMLEvents() {
public void toXMLEventsAalto() {
Flux<XMLEvent> events =
this.decoder.decode(Flux.just(stringBuffer(XML)), null, null);
@ -61,6 +61,28 @@ public class XmlEventDecoderTests extends AbstractDataBufferAllocatingTestCase { @@ -61,6 +61,28 @@ public class XmlEventDecoderTests extends AbstractDataBufferAllocatingTestCase {
e -> assertEndElement(e, "pojo"));
}
@Test
public void toXMLEventsNonAalto() {
decoder.useAalto = false;
Flux<XMLEvent> events =
this.decoder.decode(Flux.just(stringBuffer(XML)), null, null);
TestSubscriber<XMLEvent> testSubscriber = new TestSubscriber<>();
testSubscriber.bindTo(events).
assertNoError().
assertComplete().
assertValuesWith(e -> assertTrue(e.isStartDocument()),
e -> assertStartElement(e, "pojo"),
e -> assertStartElement(e, "foo"),
e -> assertCharacters(e, "foofoo"),
e -> assertEndElement(e, "foo"),
e -> assertStartElement(e, "bar"),
e -> assertCharacters(e, "barbar"),
e -> assertEndElement(e, "bar"), e -> assertEndElement(e, "pojo"),
e -> assertTrue(e.isEndDocument()));
}
private static void assertStartElement(XMLEvent event, String expectedLocalName) {
assertTrue(event.isStartElement());
assertEquals(expectedLocalName, event.asStartElement().getName().getLocalPart());

170
spring-web-reactive/src/test/java/org/springframework/http/server/reactive/XmlHandlerIntegrationTests.java

@ -1,170 +0,0 @@ @@ -1,170 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;
import javax.xml.bind.annotation.XmlRootElement;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferAllocator;
import org.springframework.core.io.buffer.support.DataBufferUtils;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.http.server.reactive.boot.ReactorHttpServer;
import org.springframework.http.server.reactive.boot.RxNettyHttpServer;
import org.springframework.web.client.RestTemplate;
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeFalse;
/**
* @author Arjen Poutsma
*/
public class XmlHandlerIntegrationTests extends AbstractHttpHandlerIntegrationTests {
private final XmlHandler handler = new XmlHandler();
@Override
protected HttpHandler createHttpHandler() {
return handler;
}
@Test
public void xml() throws Exception {
// TODO: fix Reactor and RxNetty support
assumeFalse(server instanceof ReactorHttpServer ||
server instanceof RxNettyHttpServer);
RestTemplate restTemplate = new RestTemplate();
Person johnDoe = new Person("John Doe");
Person janeDoe = new Person("Jane Doe");
RequestEntity<Person> request = RequestEntity.post(new URI("http://localhost:" + port)).body(
johnDoe);
ResponseEntity<Person> response = restTemplate.exchange(request, Person.class);
assertEquals(janeDoe, response.getBody());
handler.requestComplete.await(10, TimeUnit.SECONDS);
if (handler.requestError != null) {
throw handler.requestError;
}
assertEquals(johnDoe, handler.requestPerson);
}
private static class XmlHandler implements HttpHandler {
private CountDownLatch requestComplete = new CountDownLatch(1);
private Person requestPerson;
private Exception requestError;
@Override
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
requestError = null;
try {
JAXBContext jaxbContext = JAXBContext.newInstance(Person.class);
Marshaller marshaller = jaxbContext.createMarshaller();
Runnable r = () -> {
try {
InputStream bis =
DataBufferUtils.toInputStream(request.getBody());
Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();
requestPerson = (Person) unmarshaller.unmarshal(bis);
}
catch (Exception ex) {
requestError = ex;
}
finally {
requestComplete.countDown();
}
};
Thread t = new Thread(r);
t.start();
response.getHeaders().setContentType(MediaType.APPLICATION_XML);
Person janeDoe = new Person("Jane Doe");
DataBuffer buffer = new DefaultDataBufferAllocator().allocateBuffer();
OutputStream bos = buffer.asOutputStream();
marshaller.marshal(janeDoe, bos);
bos.close();
return response.setBody(Flux.just(buffer));
}
catch (Exception ex) {
return Mono.error(ex);
}
}
}
@XmlRootElement
private static class Person {
private String name;
public Person() {
}
public Person(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public boolean equals(Object o) {
return name.equals(((Person) o).name);
}
@Override
public int hashCode() {
return name.hashCode();
}
@Override
public String toString() {
return name;
}
}
}
Loading…
Cancel
Save