From adc50bbfb9e95e742c36ee3e99c1550e6638bcfa Mon Sep 17 00:00:00 2001 From: Sebastien Deleuze Date: Mon, 19 Oct 2015 17:00:52 +0200 Subject: [PATCH] Add handler method parameter and result converters This commit introduces the following changes: - Publisher -> Observable/Stream/etc. conversion is now managed in a dedicated ConversionService instead of directly in RequestBodyArgumentResolver and ResponseBodyResultHandler - More isolated logic that decides if the stream should be serialized as a JSON array or not - Publisher are now handled by regular ByteBufferEncoder and ByteBufferDecoder - Handle Publisher return value properly - Ensure that the headers are properly written even for response without body - Improve JsonObjectEncoder to autodetect JSON arrays --- spring-web-reactive/build.gradle | 16 +-- ...veStreamsToCompletableFutureConverter.java | 53 ++++++++++ .../ReactiveStreamsToReactorConverter.java | 62 ++++++++++++ .../ReactiveStreamsToRxJava1Converter.java | 65 +++++++++++++ .../codec/decoder/ByteBufferDecoder.java | 40 ++++++++ .../codec/decoder/ByteToMessageDecoder.java | 6 +- .../codec/decoder/JsonObjectDecoder.java | 11 +-- .../reactive/codec/decoder/StringDecoder.java | 3 +- .../codec/encoder/ByteBufferEncoder.java | 41 ++++++++ .../codec/encoder/JsonObjectEncoder.java | 75 ++++++-------- .../codec/encoder/MessageToByteEncoder.java | 6 +- .../reactive/codec/encoder/StringEncoder.java | 3 +- .../web/dispatch/DispatcherHandler.java | 1 + .../dispatch/SimpleHandlerResultHandler.java | 6 +- .../annotation/DefaultConversionService.java | 47 +++++++++ .../RequestBodyArgumentResolver.java | 97 ++++++------------- .../RequestMappingHandlerAdapter.java | 9 +- .../annotation/ResponseBodyResultHandler.java | 67 ++++++------- .../reactive/web/http/ServerHttpResponse.java | 11 ++- .../reactor/ReactorServerHttpRequest.java | 4 +- .../reactor/ReactorServerHttpResponse.java | 17 +++- .../rxnetty/RxNettyServerHttpResponse.java | 15 ++- .../servlet/ServletServerHttpResponse.java | 11 ++- .../src/main/resources/log4j.properties | 1 + .../codec/decoder/ByteBufferDecoderTests.java | 58 +++++++++++ .../codec/decoder/JsonObjectDecoderTests.java | 30 +++++- .../codec/decoder/StringDecoderTests.java | 11 ++- .../encoder/ByteBufferDecoderEncoder.java | 58 +++++++++++ .../codec/encoder/JsonObjectEncoderTests.java | 36 ++++++- .../codec/encoder/StringEncoderTests.java | 7 +- .../RequestMappingIntegrationTests.java | 93 +++++++++++++++++- .../ResponseBodyResultHandlerTests.java | 2 +- 32 files changed, 759 insertions(+), 203 deletions(-) create mode 100644 spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToCompletableFutureConverter.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToReactorConverter.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToRxJava1Converter.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/ByteBufferDecoder.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/ByteBufferEncoder.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/DefaultConversionService.java create mode 100644 spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/ByteBufferDecoderTests.java create mode 100644 spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/ByteBufferDecoderEncoder.java diff --git a/spring-web-reactive/build.gradle b/spring-web-reactive/build.gradle index f3682d0902b..51509778de8 100644 --- a/spring-web-reactive/build.gradle +++ b/spring-web-reactive/build.gradle @@ -31,28 +31,28 @@ configurations.all { } dependencies { - compile "org.springframework:spring-core:4.2.0.RELEASE" - compile "org.springframework:spring-web:4.2.0.RELEASE" + compile "org.springframework:spring-core:4.2.2.RELEASE" + compile "org.springframework:spring-web:4.2.2.RELEASE" compile "org.reactivestreams:reactive-streams:1.0.0" compile "io.projectreactor:reactor-core:2.1.0.BUILD-SNAPSHOT" compile "commons-logging:commons-logging:1.2" - optional "com.fasterxml.jackson.core:jackson-databind:2.6.1" + optional "com.fasterxml.jackson.core:jackson-databind:2.6.2" optional "io.reactivex:rxnetty:0.5.0-SNAPSHOT" optional "io.projectreactor:reactor-net:2.1.0.BUILD-SNAPSHOT" - optional 'org.apache.tomcat:tomcat-util:8.0.24' - optional 'org.apache.tomcat.embed:tomcat-embed-core:8.0.24' + optional 'org.apache.tomcat:tomcat-util:8.0.28' + optional 'org.apache.tomcat.embed:tomcat-embed-core:8.0.28' - optional 'org.eclipse.jetty:jetty-server:9.3.2.v20150730' - optional 'org.eclipse.jetty:jetty-servlet:9.3.2.v20150730' + optional 'org.eclipse.jetty:jetty-server:9.3.5.v20151012' + optional 'org.eclipse.jetty:jetty-servlet:9.3.5.v20151012' provided "javax.servlet:javax.servlet-api:3.1.0" testCompile "junit:junit:4.12" - testCompile "org.springframework:spring-test:4.2.0.RELEASE" + testCompile "org.springframework:spring-test:4.2.2.RELEASE" testCompile "org.slf4j:slf4j-jcl:1.7.12" testCompile "org.slf4j:jul-to-slf4j:1.7.12" diff --git a/spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToCompletableFutureConverter.java b/spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToCompletableFutureConverter.java new file mode 100644 index 00000000000..046ab43a741 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToCompletableFutureConverter.java @@ -0,0 +1,53 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.convert.support; + +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import org.reactivestreams.Publisher; + +import org.springframework.core.convert.TypeDescriptor; +import org.springframework.core.convert.converter.GenericConverter; + +/** + * @author Sebastien Deleuze + */ +public class ReactiveStreamsToCompletableFutureConverter implements GenericConverter { + + @Override + public Set getConvertibleTypes() { + Set convertibleTypes = new LinkedHashSet<>(); + convertibleTypes.add(new GenericConverter.ConvertiblePair(Publisher.class, CompletableFuture.class)); + convertibleTypes.add(new GenericConverter.ConvertiblePair(CompletableFuture.class, Publisher.class)); + return convertibleTypes; + } + + @Override + public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) { + if (source != null) { + if (CompletableFuture.class.isAssignableFrom(source.getClass())) { + return reactor.core.publisher.convert.CompletableFutureConverter.from((CompletableFuture)source); + } else if (CompletableFuture.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) { + return reactor.core.publisher.convert.CompletableFutureConverter.fromSingle((Publisher)source); + } + } + return null; + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToReactorConverter.java b/spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToReactorConverter.java new file mode 100644 index 00000000000..52d894048c1 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToReactorConverter.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2011-2015 Pivotal Software Inc, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.convert.support; + +import java.util.LinkedHashSet; +import java.util.Set; + +import org.reactivestreams.Publisher; +import reactor.rx.Promise; +import reactor.rx.Stream; +import reactor.rx.Streams; + +import org.springframework.core.convert.TypeDescriptor; +import org.springframework.core.convert.converter.GenericConverter; + +/** + * @author Stephane Maldini + * @author Sebastien Deleuze + */ +public final class ReactiveStreamsToReactorConverter implements GenericConverter { + + @Override + public Set getConvertibleTypes() { + Set convertibleTypes = new LinkedHashSet<>(); + convertibleTypes.add(new GenericConverter.ConvertiblePair(Publisher.class, Stream.class)); + convertibleTypes.add(new GenericConverter.ConvertiblePair(Stream.class, Publisher.class)); + convertibleTypes.add(new GenericConverter.ConvertiblePair(Publisher.class, Promise.class)); + convertibleTypes.add(new GenericConverter.ConvertiblePair(Promise.class, Publisher.class)); + return convertibleTypes; + } + + @Override + public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) { + if (source != null) { + if (Stream.class.isAssignableFrom(source.getClass())) { + return source; + } else if (Stream.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) { + return Streams.wrap((Publisher)source); + } else if (Promise.class.isAssignableFrom(source.getClass())) { + return ((Promise)source); + } else if (Promise.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) { + return Streams.wrap((Publisher)source).next(); + } + } + return null; + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToRxJava1Converter.java b/spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToRxJava1Converter.java new file mode 100644 index 00000000000..b4b2043dcb5 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/core/convert/support/ReactiveStreamsToRxJava1Converter.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2011-2015 Pivotal Software Inc, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.convert.support; + +import java.util.LinkedHashSet; +import java.util.Set; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.convert.RxJava1Converter; +import rx.Observable; +import rx.Single; + +import org.springframework.core.convert.TypeDescriptor; +import org.springframework.core.convert.converter.GenericConverter; + +/** + * TODO Avoid classpath exception for older RxJava1 version without Single type + * @author Stephane Maldini + * @author Sebastien Deleuze + */ +public final class ReactiveStreamsToRxJava1Converter implements GenericConverter { + + @Override + public Set getConvertibleTypes() { + Set convertibleTypes = new LinkedHashSet<>(); + convertibleTypes.add(new GenericConverter.ConvertiblePair(Publisher.class, Observable.class)); + convertibleTypes.add(new GenericConverter.ConvertiblePair(Observable.class, Publisher.class)); + convertibleTypes.add(new GenericConverter.ConvertiblePair(Publisher.class, Single.class)); + convertibleTypes.add(new GenericConverter.ConvertiblePair(Single.class, Publisher.class)); + return convertibleTypes; + } + + @Override + public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) { + if (source != null) { + if (Observable.class.isAssignableFrom(source.getClass())) { + return RxJava1Converter.from((Observable) source); + } + else if (Observable.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) { + return RxJava1Converter.from((Publisher)source); + } + else if (Single.class.isAssignableFrom(source.getClass())) { + return reactor.core.publisher.convert.RxJava1SingleConverter.from((Single) source); + } else if (Single.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) { + return reactor.core.publisher.convert.RxJava1SingleConverter.from((Publisher)source); + } + } + return null; + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/ByteBufferDecoder.java b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/ByteBufferDecoder.java new file mode 100644 index 00000000000..b740dab95ba --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/ByteBufferDecoder.java @@ -0,0 +1,40 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.reactive.codec.decoder; + +import java.nio.ByteBuffer; + +import org.reactivestreams.Publisher; + +import org.springframework.core.ResolvableType; +import org.springframework.http.MediaType; + +/** + * @author Sebastien Deleuze + */ +public class ByteBufferDecoder implements ByteToMessageDecoder { + + @Override + public boolean canDecode(ResolvableType type, MediaType mediaType, Object... hints) { + return ByteBuffer.class.isAssignableFrom(type.getRawClass()); + } + + @Override + public Publisher decode(Publisher inputStream, ResolvableType type, MediaType mediaType, Object... hints) { + return inputStream; + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/ByteToMessageDecoder.java b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/ByteToMessageDecoder.java index 888bb695718..3be8b83daea 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/ByteToMessageDecoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/ByteToMessageDecoder.java @@ -34,8 +34,7 @@ public interface ByteToMessageDecoder { /** * Indicate whether the given type and media type can be processed by this decoder. - * @param type the (potentially generic) type to ultimately decode to. - * Could be different from {@code T} type. + * @param type the stream element type to ultimately decode to. * @param mediaType the media type to decode from. * Typically the value of a {@code Content-Type} header for HTTP request. * @param hints Additional information about how to do decode, optional. @@ -46,8 +45,7 @@ public interface ByteToMessageDecoder { /** * Decode a bytes stream to a message stream. * @param inputStream the input stream that represent the whole object to decode. - * @param type the (potentially generic) type to ultimately decode to. - * Could be different from {@code T} type. + * @param type the stream element type to ultimately decode to. * @param hints Additional information about how to do decode, optional. * @return the decoded message stream */ diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/JsonObjectDecoder.java b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/JsonObjectDecoder.java index c1f9c359268..9831b4035f4 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/JsonObjectDecoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/JsonObjectDecoder.java @@ -23,22 +23,18 @@ import org.reactivestreams.Publisher; import org.springframework.core.ResolvableType; import org.springframework.http.MediaType; import org.springframework.reactive.codec.encoder.JsonObjectEncoder; + import reactor.Publishers; import reactor.fn.Function; -import reactor.rx.Promise; -import rx.Observable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; /** - * Decode an arbitrary split byte stream representing JSON objects to a bye stream + * Decode an arbitrary split byte stream representing JSON objects to a byte stream * where each chunk is a well-formed JSON object. * - * If {@code Hints.STREAM_ARRAY_ELEMENTS} is enabled, each element of top level JSON array - * will be streamed as an individual JSON object. - * * This class does not do any real parsing or validation. A sequence of bytes is considered a JSON object/array * if it contains a matching number of opening and closing braces/brackets. * @@ -90,8 +86,7 @@ public class JsonObjectDecoder implements ByteToMessageDecoder { @Override public boolean canDecode(ResolvableType type, MediaType mediaType, Object... hints) { - return mediaType.isCompatibleWith(MediaType.APPLICATION_JSON) && !Promise.class.isAssignableFrom(type.getRawClass()) && - (Observable.class.isAssignableFrom(type.getRawClass()) || Publisher.class.isAssignableFrom(type.getRawClass())); + return mediaType.isCompatibleWith(MediaType.APPLICATION_JSON); } @Override diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/StringDecoder.java b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/StringDecoder.java index b64bd81edb6..91742a84b7f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/StringDecoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/decoder/StringDecoder.java @@ -41,7 +41,8 @@ public class StringDecoder implements ByteToMessageDecoder { @Override public boolean canDecode(ResolvableType type, MediaType mediaType, Object... hints) { - return mediaType.isCompatibleWith(MediaType.TEXT_PLAIN); + return mediaType.isCompatibleWith(MediaType.TEXT_PLAIN) + && String.class.isAssignableFrom(type.getRawClass()); } @Override diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/ByteBufferEncoder.java b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/ByteBufferEncoder.java new file mode 100644 index 00000000000..9b829f3d099 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/ByteBufferEncoder.java @@ -0,0 +1,41 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.reactive.codec.encoder; + +import java.nio.ByteBuffer; + +import org.reactivestreams.Publisher; + +import org.springframework.core.ResolvableType; +import org.springframework.http.MediaType; + +/** + * @author Sebastien Deleuze + */ +public class ByteBufferEncoder implements MessageToByteEncoder { + + @Override + public boolean canEncode(ResolvableType type, MediaType mediaType, Object... hints) { + return ByteBuffer.class.isAssignableFrom(type.getRawClass()); + } + + @Override + public Publisher encode(Publisher messageStream, ResolvableType type, MediaType mediaType, Object... hints) { + return (Publisher)messageStream; + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/JsonObjectEncoder.java b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/JsonObjectEncoder.java index dd324a3c6f2..da7856a571a 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/JsonObjectEncoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/JsonObjectEncoder.java @@ -21,21 +21,17 @@ import org.reactivestreams.Subscriber; import org.springframework.core.ResolvableType; import org.springframework.http.MediaType; import org.springframework.reactive.codec.decoder.JsonObjectDecoder; -import org.springframework.util.ClassUtils; import reactor.core.subscriber.SubscriberBarrier; -import reactor.io.buffer.Buffer; -import reactor.rx.Promise; -import rx.Observable; import java.nio.ByteBuffer; -import java.util.Arrays; import static reactor.Publishers.*; +import reactor.io.buffer.Buffer; /** - * Encode a bye stream of individual JSON element to a byte stream representing a single - * JSON array when {@code Hints.ENCODE_AS_ARRAY} is enabled. + * Encode a byte stream of individual JSON element to a byte stream representing a single + * JSON array when if it contains more than one element. * * @author Sebastien Deleuze * @author Stephane Maldini @@ -44,57 +40,31 @@ import static reactor.Publishers.*; */ public class JsonObjectEncoder implements MessageToByteEncoder { - private static final boolean rxJava1Present = - ClassUtils.isPresent("rx.Observable", JsonObjectEncoder.class.getClassLoader()); - - private static final boolean reactorPresent = - ClassUtils.isPresent("reactor.rx.Promise", JsonObjectEncoder.class.getClassLoader()); - - final ByteBuffer START_ARRAY = ByteBuffer.wrap("[".getBytes()); - - final ByteBuffer END_ARRAY = ByteBuffer.wrap("]".getBytes()); - - final ByteBuffer COMMA = ByteBuffer.wrap(",".getBytes()); - @Override public boolean canEncode(ResolvableType type, MediaType mediaType, Object... hints) { - return mediaType.isCompatibleWith(MediaType.APPLICATION_JSON) && - !(reactorPresent && Promise.class.isAssignableFrom(type.getRawClass())) && - (rxJava1Present && Observable.class.isAssignableFrom(type.getRawClass()) - || Publisher.class.isAssignableFrom(type.getRawClass())); + return mediaType.isCompatibleWith(MediaType.APPLICATION_JSON); } @Override - public Publisher encode(Publisher messageStream, ResolvableType type, MediaType - mediaType, Object... hints) { - //TODO Merge some chunks, there is no need to have chunks with only '[', ']' or ',' characters - return - concat( - from( - Arrays.>asList( - just(START_ARRAY), - lift( - flatMap(messageStream, (ByteBuffer b) -> from(Arrays.asList(b, COMMA))), - sub -> new SkipLastBarrier(sub) - ), - just(END_ARRAY) - ) - ) - ); + public Publisher encode(Publisher messageStream, + ResolvableType type, MediaType mediaType, Object... hints) { + return lift(messageStream, sub -> new JsonEncoderBarrier(sub)); } - private static class SkipLastBarrier extends SubscriberBarrier { + private static class JsonEncoderBarrier extends SubscriberBarrier { - public SkipLastBarrier(Subscriber subscriber) { + public JsonEncoderBarrier(Subscriber subscriber) { super(subscriber); } ByteBuffer prev = null; + long count = 0; @Override protected void doNext(ByteBuffer next) { - if (prev == null) { + count++; + if (count == 1) { prev = next; doRequest(1); return; @@ -102,8 +72,27 @@ public class JsonObjectEncoder implements MessageToByteEncoder { ByteBuffer tmp = prev; prev = next; - subscriber.onNext(tmp); + Buffer buffer = new Buffer(); + if (count == 2) { + buffer.append("["); + } + buffer.append(tmp); + buffer.append(","); + buffer.flip(); + subscriber.onNext(buffer.byteBuffer()); } + @Override + protected void doComplete() { + Buffer buffer = new Buffer(); + buffer.append(prev); + if (count > 1) { + buffer.append("]"); + } + buffer.flip(); + subscriber.onNext(buffer.byteBuffer()); + subscriber.onComplete(); + } } + } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/MessageToByteEncoder.java b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/MessageToByteEncoder.java index 9a1b67162fa..0dea2db8bbc 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/MessageToByteEncoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/MessageToByteEncoder.java @@ -34,8 +34,7 @@ public interface MessageToByteEncoder { /** * Indicate whether the given type and media type can be processed by this encoder. - * @param type the (potentially generic) type to ultimately encode from. - * Could be different from {@code T} type. + * @param type the stream element type to encode. * @param mediaType the media type to encode. * Typically the value of an {@code Accept} header for HTTP request. * @param hints Additional information about how to encode, optional. @@ -46,8 +45,7 @@ public interface MessageToByteEncoder { /** * Encode a given message stream to the given output byte stream. * @param messageStream the message stream to encode. - * @param type the (potentially generic) type to ultimately encode from. - * Could be different from {@code T} type. + * @param type the stream element type to encode. * @param mediaType the media type to encode. * Typically the value of an {@code Accept} header for HTTP request. * @param hints Additional information about how to encode, optional. diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/StringEncoder.java b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/StringEncoder.java index 487007f7d3d..ac267de1c1a 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/StringEncoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/codec/encoder/StringEncoder.java @@ -40,7 +40,8 @@ public class StringEncoder implements MessageToByteEncoder { @Override public boolean canEncode(ResolvableType type, MediaType mediaType, Object... hints) { - return mediaType.isCompatibleWith(MediaType.TEXT_PLAIN); + return mediaType.isCompatibleWith(MediaType.TEXT_PLAIN) + && String.class.isAssignableFrom(type.getRawClass()); } @Override diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/DispatcherHandler.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/DispatcherHandler.java index 8d1fe02ab9a..b50aebff180 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/DispatcherHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/DispatcherHandler.java @@ -101,6 +101,7 @@ public class DispatcherHandler implements HttpHandler, ApplicationContextAware { if (handler == null) { // No exception handling mechanism yet response.setStatusCode(HttpStatus.NOT_FOUND); + response.writeHeaders(); return Publishers.empty(); } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/SimpleHandlerResultHandler.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/SimpleHandlerResultHandler.java index c72ad8f5dd1..0e9556eb30f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/SimpleHandlerResultHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/SimpleHandlerResultHandler.java @@ -16,6 +16,8 @@ package org.springframework.reactive.web.dispatch; +import java.util.Arrays; + import org.reactivestreams.Publisher; import reactor.Publishers; @@ -45,6 +47,8 @@ public class SimpleHandlerResultHandler implements Ordered, HandlerResultHandler @Override public Publisher handleResult(ServerHttpRequest request, ServerHttpResponse response, HandlerResult result) { - return Publishers.completable((Publisher)result.getValue()); + Publisher handleComplete = Publishers.completable((Publisher)result.getValue()); + return Publishers.concat(Publishers.from(Arrays.asList(handleComplete, response.writeHeaders()))); } + } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/DefaultConversionService.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/DefaultConversionService.java new file mode 100644 index 00000000000..327bd8d8f8c --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/DefaultConversionService.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2011-2015 Pivotal Software Inc, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.reactive.web.dispatch.method.annotation; + +import reactor.core.publisher.convert.DependencyUtils; + +import org.springframework.core.convert.converter.ConverterRegistry; +import org.springframework.core.convert.support.GenericConversionService; +import org.springframework.core.convert.support.ReactiveStreamsToCompletableFutureConverter; +import org.springframework.core.convert.support.ReactiveStreamsToReactorConverter; +import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter; + +/** + * TODO temporary class designed to be replaced by org.springframework.core.convert.support.DefaultConversionService when it will contain Reactive Streams converter + * @author Sebastien Deleuze + */ +class DefaultConversionService extends GenericConversionService { + + public DefaultConversionService() { + addDefaultConverters(this); + } + + public static void addDefaultConverters(ConverterRegistry converterRegistry) { + converterRegistry.addConverter(new ReactiveStreamsToCompletableFutureConverter()); + if (DependencyUtils.hasReactorStream()) { + converterRegistry.addConverter(new ReactiveStreamsToReactorConverter()); + } + if (DependencyUtils.hasRxJava1()) { + converterRegistry.addConverter(new ReactiveStreamsToRxJava1Converter()); + } + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/RequestBodyArgumentResolver.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/RequestBodyArgumentResolver.java index 8a77f0b298e..daf58211687 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/RequestBodyArgumentResolver.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/RequestBodyArgumentResolver.java @@ -16,27 +16,10 @@ package org.springframework.reactive.web.dispatch.method.annotation; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - import org.reactivestreams.Publisher; -import reactor.Publishers; -import reactor.core.publisher.convert.CompletableFutureConverter; -import reactor.core.publisher.convert.RxJava1Converter; -import reactor.core.publisher.convert.RxJava1SingleConverter; -import reactor.rx.Promise; -import reactor.rx.Stream; -import reactor.rx.Streams; -import rx.Observable; -import rx.Single; - import org.springframework.core.MethodParameter; import org.springframework.core.ResolvableType; +import org.springframework.core.convert.ConversionService; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.reactive.codec.decoder.ByteToMessageDecoder; @@ -44,8 +27,15 @@ import org.springframework.reactive.web.dispatch.method.HandlerMethodArgumentRes import org.springframework.reactive.web.http.ServerHttpRequest; import org.springframework.web.bind.annotation.RequestBody; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + /** * @author Sebastien Deleuze + * @author Stephane Maldini */ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolver { @@ -53,14 +43,19 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve private final List> deserializers; private final List> preProcessors; + private final ConversionService conversionService; - public RequestBodyArgumentResolver(List> deserializers) { - this(deserializers, Collections.EMPTY_LIST); + public RequestBodyArgumentResolver(List> deserializers, + ConversionService conversionService) { + this(deserializers, conversionService, Collections.EMPTY_LIST); } - public RequestBodyArgumentResolver(List> deserializers, List> preProcessors) { + public RequestBodyArgumentResolver(List> deserializers, + ConversionService conversionService, + List> preProcessors) { this.deserializers = deserializers; + this.conversionService = conversionService; this.preProcessors = preProcessors; } @@ -70,61 +65,31 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve } @Override + @SuppressWarnings("unchecked") public Object resolveArgument(MethodParameter parameter, ServerHttpRequest request) { MediaType mediaType = resolveMediaType(request); ResolvableType type = ResolvableType.forMethodParameter(parameter); List hints = new ArrayList<>(); hints.add(UTF_8); - - // TODO: Refactor type conversion - ResolvableType readType = type; - if (Observable.class.isAssignableFrom(type.getRawClass()) || - Single.class.isAssignableFrom(type.getRawClass()) || - Promise.class.isAssignableFrom(type.getRawClass()) || - Publisher.class.isAssignableFrom(type.getRawClass()) || - CompletableFuture.class.isAssignableFrom(type.getRawClass())) { - readType = type.getGeneric(0); - } - - ByteToMessageDecoder deserializer = resolveDeserializers(request, type, mediaType, hints.toArray()); + Publisher inputStream = request.getBody(); + Publisher elementStream = inputStream; + ResolvableType elementType = type.hasGenerics() ? type.getGeneric(0) : type; + ByteToMessageDecoder deserializer = resolveDeserializers(request, elementType, mediaType, hints.toArray()); if (deserializer != null) { - - Publisher inputStream = request.getBody(); - List> preProcessors = resolvePreProcessors(request, type, mediaType, hints.toArray()); + List> preProcessors = + resolvePreProcessors(request, elementType, mediaType,hints.toArray()); for (ByteToMessageDecoder preProcessor : preProcessors) { - inputStream = preProcessor.decode(inputStream, type, mediaType, hints.toArray()); - } - Publisher elementStream = deserializer.decode(inputStream, readType, mediaType, UTF_8); - - // TODO: Refactor type conversion - if (Stream.class.isAssignableFrom(type.getRawClass())) { - return Streams.wrap(elementStream); - } - else if (Promise.class.isAssignableFrom(type.getRawClass())) { - return Streams.wrap(elementStream).take(1).next(); - } - else if (Observable.class.isAssignableFrom(type.getRawClass())) { - return RxJava1Converter.from(elementStream); - } - else if (Single.class.isAssignableFrom(type.getRawClass())) { - return RxJava1SingleConverter.from(elementStream); - } - else if (CompletableFuture.class.isAssignableFrom(type.getRawClass())) { - return CompletableFutureConverter.fromSingle(elementStream); - } - else if (Publisher.class.isAssignableFrom(type.getRawClass())) { - return elementStream; - } - else { - try { - return Publishers.toReadQueue(elementStream, 1, true).poll(30, TimeUnit.SECONDS); - } catch(InterruptedException ex) { - return Publishers.error(new IllegalStateException("Timeout before getter the value")); - } + inputStream = preProcessor.decode(inputStream, elementType, mediaType, hints.toArray()); } + elementStream = deserializer.decode(inputStream, elementType, mediaType, hints.toArray()); + } + if (conversionService.canConvert(Publisher.class, type.getRawClass())) { + return conversionService.convert(elementStream, type.getRawClass()); + } + else { + return elementStream; } - return Publishers.error(new IllegalStateException("Argument type not supported: " + type)); } private MediaType resolveMediaType(ServerHttpRequest request) { diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/RequestMappingHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/RequestMappingHandlerAdapter.java index e2d17472002..6afcb3d44d9 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/RequestMappingHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/RequestMappingHandlerAdapter.java @@ -15,11 +15,14 @@ */ package org.springframework.reactive.web.dispatch.method.annotation; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.springframework.beans.factory.InitializingBean; +import org.springframework.reactive.codec.decoder.ByteBufferDecoder; +import org.springframework.reactive.codec.decoder.ByteToMessageDecoder; import org.springframework.reactive.codec.decoder.JacksonJsonDecoder; import org.springframework.reactive.codec.decoder.JsonObjectDecoder; import org.springframework.reactive.codec.decoder.StringDecoder; @@ -51,7 +54,11 @@ public class RequestMappingHandlerAdapter implements HandlerAdapter, Initializin if (this.argumentResolvers == null) { this.argumentResolvers = new ArrayList<>(); this.argumentResolvers.add(new RequestParamArgumentResolver()); - this.argumentResolvers.add(new RequestBodyArgumentResolver(Arrays.asList(new StringDecoder(), new JacksonJsonDecoder()), Arrays.asList(new JsonObjectDecoder(true)))); + List> deserializers = Arrays.asList(new ByteBufferDecoder(), + new StringDecoder(), new JacksonJsonDecoder()); + List> preProcessors = Arrays.asList(new JsonObjectDecoder()); + this.argumentResolvers.add(new RequestBodyArgumentResolver(deserializers, + new DefaultConversionService(), preProcessors)); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/ResponseBodyResultHandler.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/ResponseBodyResultHandler.java index eee28f723e3..bfd39cdbfff 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/ResponseBodyResultHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/ResponseBodyResultHandler.java @@ -18,9 +18,9 @@ package org.springframework.reactive.web.dispatch.method.annotation; import org.reactivestreams.Publisher; import org.springframework.core.MethodParameter; import org.springframework.core.Ordered; -import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.ResolvableType; import org.springframework.core.annotation.AnnotatedElementUtils; +import org.springframework.core.convert.ConversionService; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.reactive.codec.encoder.MessageToByteEncoder; @@ -31,26 +31,20 @@ import org.springframework.reactive.web.http.ServerHttpResponse; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.method.HandlerMethod; import reactor.Publishers; -import reactor.core.publisher.convert.CompletableFutureConverter; -import reactor.core.publisher.convert.RxJava1Converter; -import reactor.core.publisher.convert.RxJava1SingleConverter; -import reactor.rx.Promise; -import rx.Observable; -import rx.Single; - -import java.lang.reflect.Type; + import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletableFuture; /** * First version using {@link MessageToByteEncoder}s * * @author Rossen Stoyanchev + * @author Stephane Maldini + * @author Sebastien Deleuze */ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered { @@ -59,6 +53,7 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered private final List> serializers; private final List> postProcessors; + private final ConversionService conversionService; private int order = 0; @@ -68,8 +63,14 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered } public ResponseBodyResultHandler(List> serializers, List> postProcessors) { + this(serializers, postProcessors, new DefaultConversionService()); + } + + public ResponseBodyResultHandler(List> serializers, List> + postProcessors, ConversionService conversionService) { this.serializers = serializers; this.postProcessors = postProcessors; + this.conversionService = conversionService; } public void setOrder(int order) { @@ -87,14 +88,13 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered Object handler = result.getHandler(); if (handler instanceof HandlerMethod) { HandlerMethod handlerMethod = (HandlerMethod) handler; - Type publisherVoidType = new ParameterizedTypeReference>(){}.getType(); - return AnnotatedElementUtils.isAnnotated(handlerMethod.getMethod(), ResponseBody.class.getName()) && - !handlerMethod.getReturnType().getGenericParameterType().equals(publisherVoidType); + return AnnotatedElementUtils.isAnnotated(handlerMethod.getMethod(), ResponseBody.class.getName()); } return false; } @Override + @SuppressWarnings("unchecked") public Publisher handleResult(ServerHttpRequest request, ServerHttpResponse response, HandlerResult result) { @@ -106,38 +106,27 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered return Publishers.empty(); } - MediaType mediaType = resolveMediaType(request); ResolvableType type = ResolvableType.forMethodParameter(returnType); + MediaType mediaType = resolveMediaType(request); List hints = new ArrayList<>(); hints.add(UTF_8); - MessageToByteEncoder serializer = (MessageToByteEncoder)resolveSerializer(request, type, mediaType, hints.toArray()); - if (serializer != null) { - Publisher elementStream; - - // TODO: Refactor type conversion - if (Promise.class.isAssignableFrom(type.getRawClass())) { - elementStream = ((Promise)value).stream(); - } - else if (Observable.class.isAssignableFrom(type.getRawClass())) { - elementStream = RxJava1Converter.from((Observable) value); - } - else if (Single.class.isAssignableFrom(type.getRawClass())) { - elementStream = RxJava1SingleConverter.from((Single)value); - } - else if (CompletableFuture.class.isAssignableFrom(type.getRawClass())) { - elementStream = CompletableFutureConverter.from((CompletableFuture) value); - } - else if (Publisher.class.isAssignableFrom(type.getRawClass())) { - elementStream = (Publisher)value; - } - else { - elementStream = Publishers.just(value); - } + Publisher elementStream; + ResolvableType elementType; + if (conversionService.canConvert(type.getRawClass(), Publisher.class)) { + elementStream = conversionService.convert(value, Publisher.class); + elementType = type.getGeneric(0); + } + else { + elementStream = Publishers.just(value); + elementType = type; + } + MessageToByteEncoder serializer = (MessageToByteEncoder) resolveSerializer(request, elementType, mediaType, hints.toArray()); + if (serializer != null) { Publisher outputStream = serializer.encode(elementStream, type, mediaType, hints.toArray()); - List> postProcessors = resolvePostProcessors(request, type, mediaType, hints.toArray()); + List> postProcessors = resolvePostProcessors(request, elementType, mediaType, hints.toArray()); for (MessageToByteEncoder postProcessor : postProcessors) { - outputStream = postProcessor.encode(outputStream, type, mediaType, hints.toArray()); + outputStream = postProcessor.encode(outputStream, elementType, mediaType, hints.toArray()); } response.getHeaders().setContentType(mediaType); return response.writeWith(outputStream); diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/ServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/ServerHttpResponse.java index ba7cc6f5db0..1f64f2dd6e0 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/ServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/ServerHttpResponse.java @@ -30,9 +30,18 @@ public interface ServerHttpResponse extends HttpMessage { void setStatusCode(HttpStatus status); + /** + * Write the response headers. This method must be invoked to send responses without body. + * @return A {@code Publisher} used to signal the demand, and receive a notification + * when the handling is complete (success or error) including the flush of the data on the + * network. + */ + Publisher writeHeaders(); + /** * Write the provided reactive stream of bytes to the response body. Most servers - * support multiple {@code writeWith} calls. + * support multiple {@code writeWith} calls. Headers are written automatically + * before the body, so not need to call {@link #writeHeaders()} explicitly. * @param contentPublisher the stream to write in the response body. * @return A {@code Publisher} used to signal the demand, and receive a notification * when the handling is complete (success or error) including the flush of the data on the diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/ReactorServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/ReactorServerHttpRequest.java index 6a9d0ed44d1..5f727d9cd94 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/ReactorServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/ReactorServerHttpRequest.java @@ -15,13 +15,13 @@ */ package org.springframework.reactive.web.http.reactor; -import org.reactivestreams.Publisher; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.reactive.web.http.ServerHttpRequest; import org.springframework.util.Assert; import reactor.io.buffer.Buffer; import reactor.io.net.http.HttpChannel; +import reactor.rx.Stream; import java.net.URI; import java.net.URISyntaxException; @@ -72,7 +72,7 @@ public class ReactorServerHttpRequest implements ServerHttpRequest { } @Override - public Publisher getBody() { + public Stream getBody() { return this.channel.map(Buffer::byteBuffer); } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/ReactorServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/ReactorServerHttpResponse.java index 66761cabd55..7bd45b2786f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/ReactorServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/ReactorServerHttpResponse.java @@ -24,6 +24,7 @@ import reactor.Publishers; import reactor.io.buffer.Buffer; import reactor.io.net.http.HttpChannel; import reactor.io.net.http.model.Status; +import reactor.rx.Stream; import java.nio.ByteBuffer; @@ -57,18 +58,28 @@ public class ReactorServerHttpResponse implements ServerHttpResponse { } @Override - public Publisher writeWith(Publisher contentPublisher) { - writeHeaders(); + public Publisher writeHeaders() { + if (this.headersWritten) { + return Publishers.empty(); + } + applyHeaders(); + return this.channel.writeHeaders(); + } + + @Override + public Stream writeWith(Publisher contentPublisher) { + applyHeaders(); return this.channel.writeWith(Publishers.map(contentPublisher, Buffer::new)); } - private void writeHeaders() { + private void applyHeaders() { if (!this.headersWritten) { for (String name : this.headers.keySet()) { for (String value : this.headers.get(name)) { this.channel.responseHeaders().add(name, value); } } + this.headersWritten = true; } } } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpResponse.java index 161589cdcde..1143663e66f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpResponse.java @@ -23,6 +23,7 @@ import org.springframework.http.HttpStatus; import org.springframework.reactive.web.http.ServerHttpResponse; import org.springframework.util.Assert; +import reactor.Publishers; import reactor.core.publisher.convert.RxJava1Converter; import reactor.io.buffer.Buffer; import rx.Observable; @@ -59,24 +60,30 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse { return (this.headersWritten ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers); } - public Observable writeWith(Observable contentPublisher) { - return this.response.writeBytes(contentPublisher.map(content -> new Buffer(content).asBytes())); + @Override + public Publisher writeHeaders() { + if (this.headersWritten) { + return Publishers.empty(); + } + applyHeaders(); + return RxJava1Converter.from(this.response.sendHeaders()); } @Override public Publisher writeWith(Publisher contentPublisher) { - writeHeaders(); + applyHeaders(); Observable contentObservable = RxJava1Converter.from(contentPublisher).map(content -> new Buffer(content).asBytes()); return RxJava1Converter.from(this.response.writeBytes(contentObservable)); } - private void writeHeaders() { + private void applyHeaders() { if (!this.headersWritten) { for (String name : this.headers.keySet()) { for (String value : this.headers.get(name)) { this.response.addHeader(name, value); } } + this.headersWritten = true; } } } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/ServletServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/ServletServerHttpResponse.java index 37987c36ee4..ff462ec1583 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/ServletServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/servlet/ServletServerHttpResponse.java @@ -22,6 +22,7 @@ import java.util.Map; import javax.servlet.http.HttpServletResponse; import org.reactivestreams.Publisher; +import reactor.Publishers; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; @@ -61,13 +62,19 @@ public class ServletServerHttpResponse implements ServerHttpResponse { return (this.headersWritten ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers); } + @Override + public Publisher writeHeaders() { + applyHeaders(); + return Publishers.empty(); + } + @Override public Publisher writeWith(final Publisher contentPublisher) { - writeHeaders(); + applyHeaders(); return (s -> contentPublisher.subscribe(responseSubscriber)); } - private void writeHeaders() { + private void applyHeaders() { if (!this.headersWritten) { for (Map.Entry> entry : this.headers.entrySet()) { String headerName = entry.getKey(); diff --git a/spring-web-reactive/src/main/resources/log4j.properties b/spring-web-reactive/src/main/resources/log4j.properties index 8e3253db661..b5bf0d4dfb1 100644 --- a/spring-web-reactive/src/main/resources/log4j.properties +++ b/spring-web-reactive/src/main/resources/log4j.properties @@ -2,6 +2,7 @@ log4j.rootCategory=WARN, stdout log4j.logger.org.springframework.reactive=DEBUG log4j.logger.org.springframework.web=DEBUG +log4j.logger.reactor=INFO log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/ByteBufferDecoderTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/ByteBufferDecoderTests.java new file mode 100644 index 00000000000..96fa5097458 --- /dev/null +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/ByteBufferDecoderTests.java @@ -0,0 +1,58 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.reactive.codec.decoder; + +import java.nio.ByteBuffer; +import java.util.List; + +import static org.junit.Assert.*; +import org.junit.Test; +import org.reactivestreams.Publisher; +import reactor.io.buffer.Buffer; +import reactor.rx.Stream; +import reactor.rx.Streams; + +import org.springframework.core.ResolvableType; +import org.springframework.http.MediaType; + +/** + * @author Sebastien Deleuze + */ +public class ByteBufferDecoderTests { + + private final ByteBufferDecoder decoder = new ByteBufferDecoder(); + + @Test + public void canDecode() { + assertTrue(decoder.canDecode(ResolvableType.forClass(ByteBuffer.class), MediaType.TEXT_PLAIN)); + assertFalse(decoder.canDecode(ResolvableType.forClass(Integer.class), MediaType.TEXT_PLAIN)); + assertTrue(decoder.canDecode(ResolvableType.forClass(ByteBuffer.class), MediaType.APPLICATION_JSON)); + } + + @Test + public void decode() throws InterruptedException { + ByteBuffer fooBuffer = Buffer.wrap("foo").byteBuffer(); + ByteBuffer barBuffer = Buffer.wrap("bar").byteBuffer(); + Stream source = Streams.just(fooBuffer, barBuffer); + List results = Streams.wrap(decoder.decode(source, + ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class), null)).toList().await(); + assertEquals(2, results.size()); + assertEquals(fooBuffer, results.get(0)); + assertEquals(barBuffer, results.get(1)); + } + +} diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/JsonObjectDecoderTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/JsonObjectDecoderTests.java index d683796e6d1..52bc2253c69 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/JsonObjectDecoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/JsonObjectDecoderTests.java @@ -31,9 +31,35 @@ import reactor.rx.Streams; */ public class JsonObjectDecoderTests { + @Test + public void decodeSingleChunkToJsonObject() throws InterruptedException { + JsonObjectDecoder decoder = new JsonObjectDecoder(); + Stream source = Streams.just(Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer()); + List results = Streams.wrap(decoder.decode(source, null, null)).map(chunk -> { + byte[] b = new byte[chunk.remaining()]; + chunk.get(b); + return new String(b, StandardCharsets.UTF_8); + }).toList().await(); + assertEquals(1, results.size()); + assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", results.get(0)); + } + + @Test + public void decodeMultipleChunksToJsonObject() throws InterruptedException { + JsonObjectDecoder decoder = new JsonObjectDecoder(); + Stream source = Streams.just(Buffer.wrap("{\"foo\": \"foofoo\"").byteBuffer(), Buffer.wrap(", \"bar\": \"barbar\"}").byteBuffer()); + List results = Streams.wrap(decoder.decode(source, null, null)).map(chunk -> { + byte[] b = new byte[chunk.remaining()]; + chunk.get(b); + return new String(b, StandardCharsets.UTF_8); + }).toList().await(); + assertEquals(1, results.size()); + assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", results.get(0)); + } + @Test public void decodeSingleChunkToArray() throws InterruptedException { - JsonObjectDecoder decoder = new JsonObjectDecoder(true); + JsonObjectDecoder decoder = new JsonObjectDecoder(); Stream source = Streams.just(Buffer.wrap("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]").byteBuffer()); List results = Streams.wrap(decoder.decode(source, null, null)).map(chunk -> { byte[] b = new byte[chunk.remaining()]; @@ -47,7 +73,7 @@ public class JsonObjectDecoderTests { @Test public void decodeMultipleChunksToArray() throws InterruptedException { - JsonObjectDecoder decoder = new JsonObjectDecoder(true); + JsonObjectDecoder decoder = new JsonObjectDecoder(); Stream source = Streams.just(Buffer.wrap("[{\"foo\": \"foofoo\", \"bar\"").byteBuffer(), Buffer.wrap(": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]").byteBuffer()); List results = Streams.wrap(decoder.decode(source, null, null)).map(chunk -> { byte[] b = new byte[chunk.remaining()]; diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/StringDecoderTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/StringDecoderTests.java index ce707c40c9c..cc9dc6c14c5 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/StringDecoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/decoder/StringDecoderTests.java @@ -23,13 +23,13 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import org.junit.Test; +import org.reactivestreams.Publisher; import reactor.io.buffer.Buffer; import reactor.rx.Stream; import reactor.rx.Streams; import org.springframework.core.ResolvableType; import org.springframework.http.MediaType; -import org.springframework.reactive.codec.Pojo; /** * @author Sebastien Deleuze @@ -40,15 +40,16 @@ public class StringDecoderTests { @Test public void canDecode() { - assertTrue(decoder.canDecode(null, MediaType.TEXT_PLAIN)); - assertFalse(decoder.canDecode(null, MediaType.APPLICATION_JSON)); + assertTrue(decoder.canDecode(ResolvableType.forClass(String.class), MediaType.TEXT_PLAIN)); + assertFalse(decoder.canDecode(ResolvableType.forClass(Integer.class), MediaType.TEXT_PLAIN)); + assertFalse(decoder.canDecode(ResolvableType.forClass(String.class), MediaType.APPLICATION_JSON)); } @Test public void decode() throws InterruptedException { Stream source = Streams.just(Buffer.wrap("foo").byteBuffer(), Buffer.wrap("bar").byteBuffer()); - List results = Streams.wrap(decoder.decode(source, ResolvableType.forClass(Pojo.class), null)) - .toList().await(); + List results = Streams.wrap(decoder.decode(source, + ResolvableType.forClassWithGenerics(Publisher.class, String.class), null)).toList().await(); assertEquals(2, results.size()); assertEquals("foo", results.get(0)); assertEquals("bar", results.get(1)); diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/ByteBufferDecoderEncoder.java b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/ByteBufferDecoderEncoder.java new file mode 100644 index 00000000000..3fac6e58a95 --- /dev/null +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/ByteBufferDecoderEncoder.java @@ -0,0 +1,58 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.reactive.codec.encoder; + +import java.nio.ByteBuffer; +import java.util.List; + +import static org.junit.Assert.*; +import org.junit.Test; +import org.reactivestreams.Publisher; +import reactor.io.buffer.Buffer; +import reactor.rx.Stream; +import reactor.rx.Streams; + +import org.springframework.core.ResolvableType; +import org.springframework.http.MediaType; + +/** + * @author Sebastien Deleuze + */ +public class ByteBufferDecoderEncoder { + + private final ByteBufferEncoder encoder = new ByteBufferEncoder(); + + @Test + public void canDecode() { + assertTrue(encoder.canEncode(ResolvableType.forClass(ByteBuffer.class), MediaType.TEXT_PLAIN)); + assertFalse(encoder.canEncode(ResolvableType.forClass(Integer.class), MediaType.TEXT_PLAIN)); + assertTrue(encoder.canEncode(ResolvableType.forClass(ByteBuffer.class), MediaType.APPLICATION_JSON)); + } + + @Test + public void decode() throws InterruptedException { + ByteBuffer fooBuffer = Buffer.wrap("foo").byteBuffer(); + ByteBuffer barBuffer = Buffer.wrap("bar").byteBuffer(); + Stream source = Streams.just(fooBuffer, barBuffer); + List results = Streams.wrap(encoder.encode(source, + ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class), null)).toList().await(); + assertEquals(2, results.size()); + assertEquals(fooBuffer, results.get(0)); + assertEquals(barBuffer, results.get(1)); + } + +} diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/JsonObjectEncoderTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/JsonObjectEncoderTests.java index 47726014f05..9f4eed32b9d 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/JsonObjectEncoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/JsonObjectEncoderTests.java @@ -32,9 +32,24 @@ import reactor.rx.Streams; public class JsonObjectEncoderTests { @Test - public void encodeToArray() throws InterruptedException { + public void encodeSingleElement() throws InterruptedException { JsonObjectEncoder encoder = new JsonObjectEncoder(); - Stream source = Streams.just(Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer(), Buffer.wrap("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}").byteBuffer()); + Stream source = Streams.just(Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer()); + List results = Streams.wrap(encoder.encode(source, null, null)).map(chunk -> { + byte[] b = new byte[chunk.remaining()]; + chunk.get(b); + return new String(b, StandardCharsets.UTF_8); + }).toList().await(); + String result = String.join("", results); + assertEquals("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}", result); + } + + @Test + public void encodeTwoElements() throws InterruptedException { + JsonObjectEncoder encoder = new JsonObjectEncoder(); + Stream source = Streams.just( + Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer(), + Buffer.wrap("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}").byteBuffer()); List results = Streams.wrap(encoder.encode(source, null, null)).map(chunk -> { byte[] b = new byte[chunk.remaining()]; chunk.get(b); @@ -44,4 +59,21 @@ public class JsonObjectEncoderTests { assertEquals("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}]", result); } + @Test + public void encodeThreeElements() throws InterruptedException { + JsonObjectEncoder encoder = new JsonObjectEncoder(); + Stream source = Streams.just( + Buffer.wrap("{\"foo\": \"foofoo\", \"bar\": \"barbar\"}").byteBuffer(), + Buffer.wrap("{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"}").byteBuffer(), + Buffer.wrap("{\"foo\": \"foofoofoofoo\", \"bar\": \"barbarbarbar\"}").byteBuffer() + ); + List results = Streams.wrap(encoder.encode(source, null, null)).map(chunk -> { + byte[] b = new byte[chunk.remaining()]; + chunk.get(b); + return new String(b, StandardCharsets.UTF_8); + }).toList().await(); + String result = String.join("", results); + assertEquals("[{\"foo\": \"foofoo\", \"bar\": \"barbar\"},{\"foo\": \"foofoofoo\", \"bar\": \"barbarbar\"},{\"foo\": \"foofoofoofoo\", \"bar\": \"barbarbarbar\"}]", result); + } + } diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/StringEncoderTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/StringEncoderTests.java index ce67e5e29a2..151c4f85626 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/StringEncoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/codec/encoder/StringEncoderTests.java @@ -23,8 +23,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import org.junit.Test; +import org.reactivestreams.Publisher; import reactor.rx.Streams; +import org.springframework.core.ResolvableType; import org.springframework.http.MediaType; /** @@ -36,8 +38,9 @@ public class StringEncoderTests { @Test public void canWrite() { - assertTrue(encoder.canEncode(null, MediaType.TEXT_PLAIN)); - assertFalse(encoder.canEncode(null, MediaType.APPLICATION_JSON)); + assertTrue(encoder.canEncode(ResolvableType.forClass(String.class), MediaType.TEXT_PLAIN)); + assertFalse(encoder.canEncode(ResolvableType.forClass(Integer.class), MediaType.TEXT_PLAIN)); + assertFalse(encoder.canEncode(ResolvableType.forClass(String.class), MediaType.APPLICATION_JSON)); } @Test diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/web/dispatch/method/annotation/RequestMappingIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/web/dispatch/method/annotation/RequestMappingIntegrationTests.java index 8bf6234fbfc..6aa048e2597 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/web/dispatch/method/annotation/RequestMappingIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/web/dispatch/method/annotation/RequestMappingIntegrationTests.java @@ -17,13 +17,20 @@ package org.springframework.reactive.web.dispatch.method.annotation; import java.net.URI; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import static org.junit.Assert.assertEquals; + import org.junit.Test; import org.reactivestreams.Publisher; + +import org.springframework.beans.factory.support.DefaultListableBeanFactory; +import org.springframework.core.ResolvableType; +import reactor.io.buffer.Buffer; import reactor.rx.Promise; import reactor.rx.Promises; import reactor.rx.Stream; @@ -32,13 +39,16 @@ import rx.Observable; import rx.Single; import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.RequestEntity; import org.springframework.http.ResponseEntity; +import org.springframework.reactive.codec.encoder.ByteBufferEncoder; import org.springframework.reactive.codec.encoder.JacksonJsonEncoder; import org.springframework.reactive.codec.encoder.JsonObjectEncoder; import org.springframework.reactive.codec.encoder.StringEncoder; import org.springframework.reactive.web.dispatch.DispatcherHandler; +import org.springframework.reactive.web.dispatch.SimpleHandlerResultHandler; import org.springframework.reactive.web.http.AbstractHttpHandlerIntegrationTests; import org.springframework.reactive.web.http.HttpHandler; import org.springframework.stereotype.Controller; @@ -52,18 +62,25 @@ import org.springframework.web.context.support.StaticWebApplicationContext; /** * @author Rossen Stoyanchev * @author Sebastien Deleuze + * @author Stephane Maldini */ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrationTests { + private TestController controller; + @Override protected HttpHandler createHttpHandler() { StaticWebApplicationContext wac = new StaticWebApplicationContext(); + DefaultListableBeanFactory factory = wac.getDefaultListableBeanFactory(); wac.registerSingleton("handlerMapping", RequestMappingHandlerMapping.class); wac.registerSingleton("handlerAdapter", RequestMappingHandlerAdapter.class); - wac.getDefaultListableBeanFactory().registerSingleton("responseBodyResultHandler", - new ResponseBodyResultHandler(Arrays.asList(new StringEncoder(), new JacksonJsonEncoder()), Arrays.asList(new JsonObjectEncoder()))); - wac.registerSingleton("controller", TestController.class); + factory.registerSingleton("responseBodyResultHandler", + new ResponseBodyResultHandler(Arrays.asList(new ByteBufferEncoder(), new StringEncoder(), new JacksonJsonEncoder()), Arrays.asList + (new JsonObjectEncoder()))); + wac.registerSingleton("simpleResultHandler", SimpleHandlerResultHandler.class); + this.controller = new TestController(); + factory.registerSingleton("controller", this.controller); wac.refresh(); DispatcherHandler dispatcherHandler = new DispatcherHandler(); @@ -83,6 +100,30 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati assertEquals("Hello George!", response.getBody()); } + @Test + public void rawPojoResponse() throws Exception { + + RestTemplate restTemplate = new RestTemplate(); + + URI url = new URI("http://localhost:" + port + "/raw"); + RequestEntity request = RequestEntity.get(url).build(); + Person person = restTemplate.exchange(request, Person.class).getBody(); + + assertEquals(new Person("Robert"), person); + } + + @Test + public void rawHelloResponse() throws Exception { + + RestTemplate restTemplate = new RestTemplate(); + + URI url = new URI("http://localhost:" + port + "/raw-observable"); + RequestEntity request = RequestEntity.get(url).build(); + ResponseEntity response = restTemplate.exchange(request, String.class); + + assertEquals("Hello!", response.getBody()); + } + @Test public void serializeAsPojo() throws Exception { serializeAsPojo("http://localhost:" + port + "/person"); @@ -153,6 +194,19 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati capitalizePojo("http://localhost:" + port + "/promise-capitalize"); } + @Test + public void create() throws Exception { + RestTemplate restTemplate = new RestTemplate(); + + URI url = new URI("http://localhost:" + port + "/create"); + List persons = Arrays.asList(new Person("Robert"), new Person("Marie")); + RequestEntity> request = RequestEntity.post(url).contentType(MediaType.APPLICATION_JSON).body(persons); + ResponseEntity response = restTemplate.exchange(request, Void.class); + + assertEquals(HttpStatus.OK, response.getStatusCode()); + assertEquals(2, this.controller.persons.size()); + } + public void serializeAsPojo(String requestUrl) throws Exception { RestTemplate restTemplate = new RestTemplate(); @@ -164,6 +218,17 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati assertEquals(new Person("Robert"), response.getBody()); } + public void postAsPojo(String requestUrl) throws Exception { + RestTemplate restTemplate = new RestTemplate(); + + URI url = new URI(requestUrl); + RequestEntity request = RequestEntity.post(url).accept(MediaType.APPLICATION_JSON).body(new Person + ("Robert")); + ResponseEntity response = restTemplate.exchange(request, Person.class); + + assertEquals(new Person("Robert"), response.getBody()); + } + public void serializeAsCollection(String requestUrl) throws Exception { RestTemplate restTemplate = new RestTemplate(); @@ -214,6 +279,8 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati @SuppressWarnings("unused") private static class TestController { + final List persons = new ArrayList<>(); + @RequestMapping("/param") @ResponseBody public Publisher handleWithParam(@RequestParam String name) { @@ -232,6 +299,19 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati return CompletableFuture.completedFuture(new Person("Robert")); } + @RequestMapping("/raw") + @ResponseBody + public Publisher rawResponseBody() { + JacksonJsonEncoder encoder = new JacksonJsonEncoder(); + return encoder.encode(Streams.just(new Person("Robert")), ResolvableType.forClass(Person.class), MediaType.APPLICATION_JSON); + } + + @RequestMapping("/raw-observable") + @ResponseBody + public Observable rawObservableResponseBody() { + return Observable.just(Buffer.wrap("Hello!").byteBuffer()); + } + @RequestMapping("/single") @ResponseBody public Single singleResponseBody() { @@ -322,6 +402,13 @@ public class RequestMappingIntegrationTests extends AbstractHttpHandlerIntegrati }); } + @RequestMapping("/create") + public Publisher create(@RequestBody Stream personStream) { + return personStream.toList().onSuccess(personList -> persons.addAll(personList)).after(); + } + + //TODO add mixed and T request mappings tests + } private static class Person { diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/web/dispatch/method/annotation/ResponseBodyResultHandlerTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/web/dispatch/method/annotation/ResponseBodyResultHandlerTests.java index b4500ceed65..4cdd6204322 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/web/dispatch/method/annotation/ResponseBodyResultHandlerTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/web/dispatch/method/annotation/ResponseBodyResultHandlerTests.java @@ -44,7 +44,7 @@ public class ResponseBodyResultHandlerTests { assertTrue(resultHandler.supports(new HandlerResult(publisherStringMethod, null))); HandlerMethod publisherVoidMethod = new HandlerMethod(controller, TestController.class.getMethod("publisherVoid")); - assertFalse(resultHandler.supports(new HandlerResult(publisherVoidMethod, null))); + assertTrue(resultHandler.supports(new HandlerResult(publisherVoidMethod, null))); }