Browse Source
This commit introduces the following changes:
- Publisher -> Observable/Stream/etc. conversion is now managed
in a dedicated ConversionService instead of directly in
RequestBodyArgumentResolver and ResponseBodyResultHandler
- More isolated logic that decides if the stream should be
serialized as a JSON array or not
- Publisher<ByteBuffer> are now handled by regular
ByteBufferEncoder and ByteBufferDecoder
- Handle Publisher<Void> return value properly
- Ensure that the headers are properly written even for response
without body
- Improve JsonObjectEncoder to autodetect JSON arrays
pull/1111/head
32 changed files with 759 additions and 203 deletions
@ -0,0 +1,53 @@
@@ -0,0 +1,53 @@
|
||||
/* |
||||
* Copyright 2002-2015 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.core.convert.support; |
||||
|
||||
import java.util.LinkedHashSet; |
||||
import java.util.Set; |
||||
import java.util.concurrent.CompletableFuture; |
||||
|
||||
import org.reactivestreams.Publisher; |
||||
|
||||
import org.springframework.core.convert.TypeDescriptor; |
||||
import org.springframework.core.convert.converter.GenericConverter; |
||||
|
||||
/** |
||||
* @author Sebastien Deleuze |
||||
*/ |
||||
public class ReactiveStreamsToCompletableFutureConverter implements GenericConverter { |
||||
|
||||
@Override |
||||
public Set<ConvertiblePair> getConvertibleTypes() { |
||||
Set<GenericConverter.ConvertiblePair> convertibleTypes = new LinkedHashSet<>(); |
||||
convertibleTypes.add(new GenericConverter.ConvertiblePair(Publisher.class, CompletableFuture.class)); |
||||
convertibleTypes.add(new GenericConverter.ConvertiblePair(CompletableFuture.class, Publisher.class)); |
||||
return convertibleTypes; |
||||
} |
||||
|
||||
@Override |
||||
public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) { |
||||
if (source != null) { |
||||
if (CompletableFuture.class.isAssignableFrom(source.getClass())) { |
||||
return reactor.core.publisher.convert.CompletableFutureConverter.from((CompletableFuture)source); |
||||
} else if (CompletableFuture.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) { |
||||
return reactor.core.publisher.convert.CompletableFutureConverter.fromSingle((Publisher)source); |
||||
} |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,62 @@
@@ -0,0 +1,62 @@
|
||||
/* |
||||
* Copyright (c) 2011-2015 Pivotal Software Inc, All Rights Reserved. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.core.convert.support; |
||||
|
||||
import java.util.LinkedHashSet; |
||||
import java.util.Set; |
||||
|
||||
import org.reactivestreams.Publisher; |
||||
import reactor.rx.Promise; |
||||
import reactor.rx.Stream; |
||||
import reactor.rx.Streams; |
||||
|
||||
import org.springframework.core.convert.TypeDescriptor; |
||||
import org.springframework.core.convert.converter.GenericConverter; |
||||
|
||||
/** |
||||
* @author Stephane Maldini |
||||
* @author Sebastien Deleuze |
||||
*/ |
||||
public final class ReactiveStreamsToReactorConverter implements GenericConverter { |
||||
|
||||
@Override |
||||
public Set<GenericConverter.ConvertiblePair> getConvertibleTypes() { |
||||
Set<GenericConverter.ConvertiblePair> convertibleTypes = new LinkedHashSet<>(); |
||||
convertibleTypes.add(new GenericConverter.ConvertiblePair(Publisher.class, Stream.class)); |
||||
convertibleTypes.add(new GenericConverter.ConvertiblePair(Stream.class, Publisher.class)); |
||||
convertibleTypes.add(new GenericConverter.ConvertiblePair(Publisher.class, Promise.class)); |
||||
convertibleTypes.add(new GenericConverter.ConvertiblePair(Promise.class, Publisher.class)); |
||||
return convertibleTypes; |
||||
} |
||||
|
||||
@Override |
||||
public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) { |
||||
if (source != null) { |
||||
if (Stream.class.isAssignableFrom(source.getClass())) { |
||||
return source; |
||||
} else if (Stream.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) { |
||||
return Streams.wrap((Publisher)source); |
||||
} else if (Promise.class.isAssignableFrom(source.getClass())) { |
||||
return ((Promise<?>)source); |
||||
} else if (Promise.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) { |
||||
return Streams.wrap((Publisher)source).next(); |
||||
} |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,65 @@
@@ -0,0 +1,65 @@
|
||||
/* |
||||
* Copyright (c) 2011-2015 Pivotal Software Inc, All Rights Reserved. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.core.convert.support; |
||||
|
||||
import java.util.LinkedHashSet; |
||||
import java.util.Set; |
||||
|
||||
import org.reactivestreams.Publisher; |
||||
import reactor.core.publisher.convert.RxJava1Converter; |
||||
import rx.Observable; |
||||
import rx.Single; |
||||
|
||||
import org.springframework.core.convert.TypeDescriptor; |
||||
import org.springframework.core.convert.converter.GenericConverter; |
||||
|
||||
/** |
||||
* TODO Avoid classpath exception for older RxJava1 version without Single type |
||||
* @author Stephane Maldini |
||||
* @author Sebastien Deleuze |
||||
*/ |
||||
public final class ReactiveStreamsToRxJava1Converter implements GenericConverter { |
||||
|
||||
@Override |
||||
public Set<GenericConverter.ConvertiblePair> getConvertibleTypes() { |
||||
Set<GenericConverter.ConvertiblePair> convertibleTypes = new LinkedHashSet<>(); |
||||
convertibleTypes.add(new GenericConverter.ConvertiblePair(Publisher.class, Observable.class)); |
||||
convertibleTypes.add(new GenericConverter.ConvertiblePair(Observable.class, Publisher.class)); |
||||
convertibleTypes.add(new GenericConverter.ConvertiblePair(Publisher.class, Single.class)); |
||||
convertibleTypes.add(new GenericConverter.ConvertiblePair(Single.class, Publisher.class)); |
||||
return convertibleTypes; |
||||
} |
||||
|
||||
@Override |
||||
public Object convert(Object source, TypeDescriptor sourceType, TypeDescriptor targetType) { |
||||
if (source != null) { |
||||
if (Observable.class.isAssignableFrom(source.getClass())) { |
||||
return RxJava1Converter.from((Observable) source); |
||||
} |
||||
else if (Observable.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) { |
||||
return RxJava1Converter.from((Publisher)source); |
||||
} |
||||
else if (Single.class.isAssignableFrom(source.getClass())) { |
||||
return reactor.core.publisher.convert.RxJava1SingleConverter.from((Single) source); |
||||
} else if (Single.class.isAssignableFrom(targetType.getResolvableType().getRawClass())) { |
||||
return reactor.core.publisher.convert.RxJava1SingleConverter.from((Publisher)source); |
||||
} |
||||
} |
||||
return null; |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,40 @@
@@ -0,0 +1,40 @@
|
||||
/* |
||||
* Copyright 2002-2015 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.reactive.codec.decoder; |
||||
|
||||
import java.nio.ByteBuffer; |
||||
|
||||
import org.reactivestreams.Publisher; |
||||
|
||||
import org.springframework.core.ResolvableType; |
||||
import org.springframework.http.MediaType; |
||||
|
||||
/** |
||||
* @author Sebastien Deleuze |
||||
*/ |
||||
public class ByteBufferDecoder implements ByteToMessageDecoder<ByteBuffer> { |
||||
|
||||
@Override |
||||
public boolean canDecode(ResolvableType type, MediaType mediaType, Object... hints) { |
||||
return ByteBuffer.class.isAssignableFrom(type.getRawClass()); |
||||
} |
||||
|
||||
@Override |
||||
public Publisher<ByteBuffer> decode(Publisher<ByteBuffer> inputStream, ResolvableType type, MediaType mediaType, Object... hints) { |
||||
return inputStream; |
||||
} |
||||
} |
||||
@ -0,0 +1,41 @@
@@ -0,0 +1,41 @@
|
||||
/* |
||||
* Copyright 2002-2015 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.reactive.codec.encoder; |
||||
|
||||
import java.nio.ByteBuffer; |
||||
|
||||
import org.reactivestreams.Publisher; |
||||
|
||||
import org.springframework.core.ResolvableType; |
||||
import org.springframework.http.MediaType; |
||||
|
||||
/** |
||||
* @author Sebastien Deleuze |
||||
*/ |
||||
public class ByteBufferEncoder implements MessageToByteEncoder<ByteBuffer> { |
||||
|
||||
@Override |
||||
public boolean canEncode(ResolvableType type, MediaType mediaType, Object... hints) { |
||||
return ByteBuffer.class.isAssignableFrom(type.getRawClass()); |
||||
} |
||||
|
||||
@Override |
||||
public Publisher<ByteBuffer> encode(Publisher<? extends ByteBuffer> messageStream, ResolvableType type, MediaType mediaType, Object... hints) { |
||||
return (Publisher<ByteBuffer>)messageStream; |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,47 @@
@@ -0,0 +1,47 @@
|
||||
/* |
||||
* Copyright (c) 2011-2015 Pivotal Software Inc, All Rights Reserved. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.reactive.web.dispatch.method.annotation; |
||||
|
||||
import reactor.core.publisher.convert.DependencyUtils; |
||||
|
||||
import org.springframework.core.convert.converter.ConverterRegistry; |
||||
import org.springframework.core.convert.support.GenericConversionService; |
||||
import org.springframework.core.convert.support.ReactiveStreamsToCompletableFutureConverter; |
||||
import org.springframework.core.convert.support.ReactiveStreamsToReactorConverter; |
||||
import org.springframework.core.convert.support.ReactiveStreamsToRxJava1Converter; |
||||
|
||||
/** |
||||
* TODO temporary class designed to be replaced by org.springframework.core.convert.support.DefaultConversionService when it will contain Reactive Streams converter |
||||
* @author Sebastien Deleuze |
||||
*/ |
||||
class DefaultConversionService extends GenericConversionService { |
||||
|
||||
public DefaultConversionService() { |
||||
addDefaultConverters(this); |
||||
} |
||||
|
||||
public static void addDefaultConverters(ConverterRegistry converterRegistry) { |
||||
converterRegistry.addConverter(new ReactiveStreamsToCompletableFutureConverter()); |
||||
if (DependencyUtils.hasReactorStream()) { |
||||
converterRegistry.addConverter(new ReactiveStreamsToReactorConverter()); |
||||
} |
||||
if (DependencyUtils.hasRxJava1()) { |
||||
converterRegistry.addConverter(new ReactiveStreamsToRxJava1Converter()); |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,58 @@
@@ -0,0 +1,58 @@
|
||||
/* |
||||
* Copyright 2002-2015 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.reactive.codec.decoder; |
||||
|
||||
import java.nio.ByteBuffer; |
||||
import java.util.List; |
||||
|
||||
import static org.junit.Assert.*; |
||||
import org.junit.Test; |
||||
import org.reactivestreams.Publisher; |
||||
import reactor.io.buffer.Buffer; |
||||
import reactor.rx.Stream; |
||||
import reactor.rx.Streams; |
||||
|
||||
import org.springframework.core.ResolvableType; |
||||
import org.springframework.http.MediaType; |
||||
|
||||
/** |
||||
* @author Sebastien Deleuze |
||||
*/ |
||||
public class ByteBufferDecoderTests { |
||||
|
||||
private final ByteBufferDecoder decoder = new ByteBufferDecoder(); |
||||
|
||||
@Test |
||||
public void canDecode() { |
||||
assertTrue(decoder.canDecode(ResolvableType.forClass(ByteBuffer.class), MediaType.TEXT_PLAIN)); |
||||
assertFalse(decoder.canDecode(ResolvableType.forClass(Integer.class), MediaType.TEXT_PLAIN)); |
||||
assertTrue(decoder.canDecode(ResolvableType.forClass(ByteBuffer.class), MediaType.APPLICATION_JSON)); |
||||
} |
||||
|
||||
@Test |
||||
public void decode() throws InterruptedException { |
||||
ByteBuffer fooBuffer = Buffer.wrap("foo").byteBuffer(); |
||||
ByteBuffer barBuffer = Buffer.wrap("bar").byteBuffer(); |
||||
Stream<ByteBuffer> source = Streams.just(fooBuffer, barBuffer); |
||||
List<ByteBuffer> results = Streams.wrap(decoder.decode(source, |
||||
ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class), null)).toList().await(); |
||||
assertEquals(2, results.size()); |
||||
assertEquals(fooBuffer, results.get(0)); |
||||
assertEquals(barBuffer, results.get(1)); |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,58 @@
@@ -0,0 +1,58 @@
|
||||
/* |
||||
* Copyright 2002-2015 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.reactive.codec.encoder; |
||||
|
||||
import java.nio.ByteBuffer; |
||||
import java.util.List; |
||||
|
||||
import static org.junit.Assert.*; |
||||
import org.junit.Test; |
||||
import org.reactivestreams.Publisher; |
||||
import reactor.io.buffer.Buffer; |
||||
import reactor.rx.Stream; |
||||
import reactor.rx.Streams; |
||||
|
||||
import org.springframework.core.ResolvableType; |
||||
import org.springframework.http.MediaType; |
||||
|
||||
/** |
||||
* @author Sebastien Deleuze |
||||
*/ |
||||
public class ByteBufferDecoderEncoder { |
||||
|
||||
private final ByteBufferEncoder encoder = new ByteBufferEncoder(); |
||||
|
||||
@Test |
||||
public void canDecode() { |
||||
assertTrue(encoder.canEncode(ResolvableType.forClass(ByteBuffer.class), MediaType.TEXT_PLAIN)); |
||||
assertFalse(encoder.canEncode(ResolvableType.forClass(Integer.class), MediaType.TEXT_PLAIN)); |
||||
assertTrue(encoder.canEncode(ResolvableType.forClass(ByteBuffer.class), MediaType.APPLICATION_JSON)); |
||||
} |
||||
|
||||
@Test |
||||
public void decode() throws InterruptedException { |
||||
ByteBuffer fooBuffer = Buffer.wrap("foo").byteBuffer(); |
||||
ByteBuffer barBuffer = Buffer.wrap("bar").byteBuffer(); |
||||
Stream<ByteBuffer> source = Streams.just(fooBuffer, barBuffer); |
||||
List<ByteBuffer> results = Streams.wrap(encoder.encode(source, |
||||
ResolvableType.forClassWithGenerics(Publisher.class, ByteBuffer.class), null)).toList().await(); |
||||
assertEquals(2, results.size()); |
||||
assertEquals(fooBuffer, results.get(0)); |
||||
assertEquals(barBuffer, results.get(1)); |
||||
} |
||||
|
||||
} |
||||
Loading…
Reference in new issue