diff --git a/spring-web-reactive/src/main/java/org/springframework/http/ExtendedHttpHeaders.java b/spring-web-reactive/src/main/java/org/springframework/http/ExtendedHttpHeaders.java
index ade3903d893..f5fa19922bf 100644
--- a/spring-web-reactive/src/main/java/org/springframework/http/ExtendedHttpHeaders.java
+++ b/spring-web-reactive/src/main/java/org/springframework/http/ExtendedHttpHeaders.java
@@ -21,10 +21,11 @@ import java.util.List;
import java.util.Map;
/**
- * Extension of HttpHeaders (to be merged into HttpHeaders) that allows the
- * registration of {@link HeaderChangeListener}. For use with HTTP response
- * implementations that can keep track of changes made headers and keep the
- * underlying server headers always in sync.
+ * Variant of HttpHeaders (to be merged into HttpHeaders) that supports the
+ * registration of {@link HeaderChangeListener}s.
+ *
+ *
For use with HTTP server response implementations that wish to propagate
+ * header header changes to the underlying runtime as they occur.
*
* @author Rossen Stoyanchev
*/
@@ -33,7 +34,10 @@ public class ExtendedHttpHeaders extends HttpHeaders {
private final List listeners = new ArrayList<>(1);
- public void registerChangeListener(HeaderChangeListener listener) {
+ public ExtendedHttpHeaders() {
+ }
+
+ public ExtendedHttpHeaders(HeaderChangeListener listener) {
this.listeners.add(listener);
}
@@ -48,7 +52,7 @@ public class ExtendedHttpHeaders extends HttpHeaders {
@Override
public void set(String name, String value) {
- List values = new LinkedList();
+ List values = new LinkedList<>();
values.add(value);
put(name, values);
}
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpInputMessage.java b/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpInputMessage.java
index 7870571b4ad..3cff8d95530 100644
--- a/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpInputMessage.java
+++ b/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpInputMessage.java
@@ -21,11 +21,9 @@ import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
/**
- * Represents a "reactive" HTTP input message, consisting of
- * {@linkplain #getHeaders() headers} and a readable
- * {@linkplain #getBody() streaming body }.
+ * An "reactive" HTTP input message that exposes the input as {@link Publisher}.
*
- * Typically implemented by an HTTP request on the server-side, or a response
+ *
Typically implemented by an HTTP request on the server-side or a response
* on the client-side.
*
* @author Arjen Poutsma
@@ -33,8 +31,8 @@ import org.reactivestreams.Publisher;
public interface ReactiveHttpInputMessage extends HttpMessage {
/**
- * Return the body of the message as an publisher of {@code ByteBuffer}s.
- * @return the body
+ * Return the body of the message as a {@link Publisher}.
+ * @return the body content publisher
*/
Publisher getBody();
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java b/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java
index 33df2dcdeea..83b3507df4b 100644
--- a/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java
+++ b/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java
@@ -21,11 +21,9 @@ import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
/**
- * Represents a "reactive" HTTP output message, consisting of
- * {@linkplain #getHeaders() headers} and the capability to add a
- * {@linkplain #setBody(Publisher) body}.
+ * A "reactive" HTTP output message that accepts output as a {@link Publisher}.
*
- * Typically implemented by an HTTP request on the client-side, or a response
+ *
Typically implemented by an HTTP request on the client-side or a response
* on the server-side.
*
* @author Arjen Poutsma
@@ -33,12 +31,11 @@ import org.reactivestreams.Publisher;
public interface ReactiveHttpOutputMessage extends HttpMessage {
/**
- * Sets the body of this message to the given publisher of {@link ByteBuffer}s.
- * The publisher will be used to write to the underlying HTTP layer with
- * asynchronously, given pull demand by this layer.
+ * Set the body of the message to the given {@link Publisher} which will be
+ * used to write to the underlying HTTP layer.
*
- * @param body the body to use
- * @return a publisher that indicates completion
+ * @param body the body content publisher
+ * @return a publisher that indicates completion or error.
*/
Publisher setBody(Publisher body);
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java
index 343ec20f24e..493e37df65d 100644
--- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java
+++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java
@@ -26,36 +26,30 @@ import reactor.io.net.http.HttpChannel;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
-import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.util.Assert;
/**
+ * Adapt {@link ServerHttpRequest} to the Reactor Net {@link HttpChannel}.
+ *
* @author Stephane Maldini
*/
public class ReactorServerHttpRequest implements ServerHttpRequest {
private final HttpChannel channel;
+ private URI uri;
+
private HttpHeaders headers;
public ReactorServerHttpRequest(HttpChannel request) {
- Assert.notNull("'request', request must not be null.");
+ Assert.notNull("'request' must not be null.");
this.channel = request;
}
- @Override
- public HttpHeaders getHeaders() {
- if (this.headers == null) {
- this.headers = new HttpHeaders();
- for (String name : this.channel.headers().names()) {
- for (String value : this.channel.headers().getAll(name)) {
- this.headers.add(name, value);
- }
- }
- }
- return this.headers;
+ public HttpChannel getReactorChannel() {
+ return this.channel;
}
@Override
@@ -65,12 +59,26 @@ public class ReactorServerHttpRequest implements ServerHttpRequest {
@Override
public URI getURI() {
- try {
- return new URI(this.channel.uri());
- } catch (URISyntaxException ex) {
- throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex);
+ if (this.uri == null) {
+ try {
+ this.uri = new URI(this.channel.uri());
+ }
+ catch (URISyntaxException ex) {
+ throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex);
+ }
}
+ return this.uri;
+ }
+ @Override
+ public HttpHeaders getHeaders() {
+ if (this.headers == null) {
+ this.headers = new HttpHeaders();
+ for (String name : this.channel.headers().names()) {
+ this.headers.put(name, this.channel.headers().getAll(name));
+ }
+ }
+ return this.headers;
}
@Override
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java
index c774ead81fc..d3fdf10a225 100644
--- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java
+++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java
@@ -30,6 +30,8 @@ import org.springframework.http.HttpStatus;
import org.springframework.util.Assert;
/**
+ * Adapt {@link ServerHttpResponse} to the Reactor Net {@link HttpChannel}.
+ *
* @author Stephane Maldini
* @author Rossen Stoyanchev
*/
@@ -41,21 +43,19 @@ public class ReactorServerHttpResponse implements ServerHttpResponse {
public ReactorServerHttpResponse(HttpChannel, Buffer> response) {
- Assert.notNull("'response', response must not be null.");
+ Assert.notNull("'response' must not be null.");
this.channel = response;
- this.headers = initHttpHeaders();
+ this.headers = new ExtendedHttpHeaders(new ReactorHeaderChangeListener());
}
- private HttpHeaders initHttpHeaders() {
- ExtendedHttpHeaders headers = new ExtendedHttpHeaders();
- headers.registerChangeListener(new ReactorHeaderChangeListener());
- return headers;
- }
+ public HttpChannel, Buffer> getReactorChannel() {
+ return this.channel;
+ }
@Override
public void setStatusCode(HttpStatus status) {
- this.channel.responseStatus(Status.valueOf(status.value()));
+ getReactorChannel().responseStatus(Status.valueOf(status.value()));
}
@Override
@@ -65,8 +65,11 @@ public class ReactorServerHttpResponse implements ServerHttpResponse {
@Override
public Publisher setBody(Publisher publisher) {
- return Publishers.lift(publisher, new WriteWithOperator<>(writePublisher ->
- this.channel.writeWith(Publishers.map(writePublisher, Buffer::new))));
+ return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal));
+ }
+
+ protected Publisher setBodyInternal(Publisher publisher) {
+ return getReactorChannel().writeWith(Publishers.map(publisher, Buffer::new));
}
@@ -74,18 +77,18 @@ public class ReactorServerHttpResponse implements ServerHttpResponse {
@Override
public void headerAdded(String name, String value) {
- channel.responseHeaders().add(name, value);
+ getReactorChannel().responseHeaders().add(name, value);
}
@Override
public void headerPut(String key, List values) {
- channel.responseHeaders().remove(key);
- channel.responseHeaders().add(key, values);
+ getReactorChannel().responseHeaders().remove(key);
+ getReactorChannel().responseHeaders().add(key, values);
}
@Override
public void headerRemoved(String key) {
- channel.responseHeaders().remove(key);
+ getReactorChannel().responseHeaders().remove(key);
}
}
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java
index 2f4ae920c54..6afad378292 100644
--- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java
+++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java
@@ -31,6 +31,8 @@ import org.springframework.http.HttpMethod;
import org.springframework.util.Assert;
/**
+ * Adapt {@link ServerHttpRequest} to the RxNetty {@link HttpServerRequest}.
+ *
* @author Rossen Stoyanchev
* @author Stephane Maldini
*/
@@ -38,6 +40,8 @@ public class RxNettyServerHttpRequest implements ServerHttpRequest {
private final HttpServerRequest request;
+ private URI uri;
+
private HttpHeaders headers;
@@ -47,44 +51,44 @@ public class RxNettyServerHttpRequest implements ServerHttpRequest {
}
- @Override
- public HttpHeaders getHeaders() {
- if (this.headers == null) {
- this.headers = new HttpHeaders();
- for (String name : this.request.getHeaderNames()) {
- for (String value : this.request.getAllHeaderValues(name)) {
- this.headers.add(name, value);
- }
- }
- }
- return this.headers;
+ public HttpServerRequest getRxNettyRequest() {
+ return this.request;
}
@Override
public HttpMethod getMethod() {
- return HttpMethod.valueOf(this.request.getHttpMethod().name());
+ return HttpMethod.valueOf(this.getRxNettyRequest().getHttpMethod().name());
}
@Override
public URI getURI() {
- try {
- return new URI(this.request.getUri());
- }
- catch (URISyntaxException ex) {
- throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex);
+ if (this.uri == null) {
+ try {
+ this.uri = new URI(this.getRxNettyRequest().getUri());
+ }
+ catch (URISyntaxException ex) {
+ throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex);
+ }
}
+ return this.uri;
+ }
+ @Override
+ public HttpHeaders getHeaders() {
+ if (this.headers == null) {
+ this.headers = new HttpHeaders();
+ for (String name : this.getRxNettyRequest().getHeaderNames()) {
+ this.headers.put(name, this.getRxNettyRequest().getAllHeaderValues(name));
+ }
+ }
+ return this.headers;
}
@Override
public Publisher getBody() {
- Observable bytesContent = this.request.getContent()
- .concatWith(Observable.empty())
- .map(ByteBuf::nioBuffer);
- return RxJava1Converter.from(bytesContent);
+ Observable content = this.getRxNettyRequest().getContent().map(ByteBuf::nioBuffer);
+ content = content.concatWith(Observable.empty()); // See GH issue #58
+ return RxJava1Converter.from(content);
}
- public Observable asObservable() {
- return this.request.getContent().map(ByteBuf::nioBuffer);
- }
}
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java
index 2b316492b0d..aec66802888 100644
--- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java
+++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java
@@ -32,6 +32,8 @@ import org.springframework.http.HttpStatus;
import org.springframework.util.Assert;
/**
+ * Adapt {@link ServerHttpResponse} to the RxNetty {@link HttpServerResponse}.
+ *
* @author Rossen Stoyanchev
* @author Stephane Maldini
*/
@@ -45,19 +47,17 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse {
public RxNettyServerHttpResponse(HttpServerResponse> response) {
Assert.notNull("'response', response must not be null.");
this.response = response;
- this.headers = initHttpHeaders();
+ this.headers = new ExtendedHttpHeaders(new RxNettyHeaderChangeListener());
}
- private HttpHeaders initHttpHeaders() {
- ExtendedHttpHeaders headers = new ExtendedHttpHeaders();
- headers.registerChangeListener(new RxNettyHeaderChangeListener());
- return headers;
- }
+ public HttpServerResponse> getRxNettyResponse() {
+ return this.response;
+ }
@Override
public void setStatusCode(HttpStatus status) {
- this.response.setStatus(HttpResponseStatus.valueOf(status.value()));
+ getRxNettyResponse().setStatus(HttpResponseStatus.valueOf(status.value()));
}
@Override
@@ -67,15 +67,19 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse {
@Override
public Publisher setBody(Publisher publisher) {
- return Publishers.lift(publisher, new WriteWithOperator<>(writePublisher -> {
- Observable observable = RxJava1Converter.from(writePublisher)
- .map(buffer -> {
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- return bytes;
- });
- return RxJava1Converter.from(this.response.writeBytes(observable));
- }));
+ return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal));
+ }
+
+ protected Publisher setBodyInternal(Publisher publisher) {
+ Observable content = RxJava1Converter.from(publisher).map(this::toBytes);
+ Observable completion = getRxNettyResponse().writeBytes(content);
+ return RxJava1Converter.from(completion);
+ }
+
+ private byte[] toBytes(ByteBuffer buffer) {
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.get(bytes);
+ return bytes;
}
@@ -83,20 +87,20 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse {
@Override
public void headerAdded(String name, String value) {
- response.addHeader(name, value);
+ getRxNettyResponse().addHeader(name, value);
}
@Override
public void headerPut(String key, List values) {
- response.removeHeader(key);
+ getRxNettyResponse().removeHeader(key);
for (String value : values) {
- response.addHeader(key, value);
+ getRxNettyResponse().addHeader(key, value);
}
}
@Override
public void headerRemoved(String key) {
- response.removeHeader(key);
+ getRxNettyResponse().removeHeader(key);
}
}
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java
index 46158c306e4..477f2b6a860 100644
--- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java
+++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java
@@ -43,6 +43,8 @@ import org.springframework.util.LinkedCaseInsensitiveMap;
import org.springframework.util.StringUtils;
/**
+ * Adapt {@link ServerHttpRequest} to the Servlet {@link HttpServletRequest}.
+ *
* @author Rossen Stoyanchev
*/
public class ServletServerHttpRequest implements ServerHttpRequest {
@@ -52,44 +54,55 @@ public class ServletServerHttpRequest implements ServerHttpRequest {
private static final Log logger = LogFactory.getLog(ServletServerHttpRequest.class);
- private final HttpServletRequest servletRequest;
+ private final HttpServletRequest request;
+
+ private URI uri;
private HttpHeaders headers;
private final RequestBodyPublisher requestBodyPublisher;
- public ServletServerHttpRequest(HttpServletRequest servletRequest, ServletAsyncContextSynchronizer synchronizer) {
- Assert.notNull(servletRequest, "HttpServletRequest must not be null");
- this.servletRequest = servletRequest;
+ public ServletServerHttpRequest(HttpServletRequest request, ServletAsyncContextSynchronizer synchronizer) {
+ Assert.notNull(request, "'request' must not be null.");
+ this.request = request;
this.requestBodyPublisher = new RequestBodyPublisher(synchronizer, BUFFER_SIZE);
}
+ public HttpServletRequest getServletRequest() {
+ return this.request;
+ }
+
@Override
public HttpMethod getMethod() {
- return HttpMethod.valueOf(this.servletRequest.getMethod());
+ return HttpMethod.valueOf(getServletRequest().getMethod());
}
@Override
public URI getURI() {
- try {
- return new URI(this.servletRequest.getScheme(), null, this.servletRequest.getServerName(),
- this.servletRequest.getServerPort(), this.servletRequest.getRequestURI(),
- this.servletRequest.getQueryString(), null);
- }
- catch (URISyntaxException ex) {
- throw new IllegalStateException("Could not get HttpServletRequest URI: " + ex.getMessage(), ex);
+ if (this.uri == null) {
+ try {
+ this.uri = new URI(getServletRequest().getScheme(), null,
+ getServletRequest().getServerName(),
+ getServletRequest().getServerPort(),
+ getServletRequest().getRequestURI(),
+ getServletRequest().getQueryString(), null);
+ }
+ catch (URISyntaxException ex) {
+ throw new IllegalStateException("Could not get HttpServletRequest URI: " + ex.getMessage(), ex);
+ }
}
+ return this.uri;
}
@Override
public HttpHeaders getHeaders() {
if (this.headers == null) {
this.headers = new HttpHeaders();
- for (Enumeration> names = this.servletRequest.getHeaderNames(); names.hasMoreElements(); ) {
+ for (Enumeration> names = getServletRequest().getHeaderNames(); names.hasMoreElements(); ) {
String headerName = (String) names.nextElement();
- for (Enumeration> headerValues = this.servletRequest.getHeaders(headerName);
+ for (Enumeration> headerValues = getServletRequest().getHeaders(headerName);
headerValues.hasMoreElements(); ) {
String headerValue = (String) headerValues.nextElement();
this.headers.add(headerName, headerValue);
@@ -98,14 +111,14 @@ public class ServletServerHttpRequest implements ServerHttpRequest {
// HttpServletRequest exposes some headers as properties: we should include those if not already present
MediaType contentType = this.headers.getContentType();
if (contentType == null) {
- String requestContentType = this.servletRequest.getContentType();
+ String requestContentType = getServletRequest().getContentType();
if (StringUtils.hasLength(requestContentType)) {
contentType = MediaType.parseMediaType(requestContentType);
this.headers.setContentType(contentType);
}
}
if (contentType != null && contentType.getCharSet() == null) {
- String requestEncoding = this.servletRequest.getCharacterEncoding();
+ String requestEncoding = getServletRequest().getCharacterEncoding();
if (StringUtils.hasLength(requestEncoding)) {
Charset charSet = Charset.forName(requestEncoding);
Map params = new LinkedCaseInsensitiveMap<>();
@@ -116,7 +129,7 @@ public class ServletServerHttpRequest implements ServerHttpRequest {
}
}
if (this.headers.getContentLength() == -1) {
- int requestContentLength = this.servletRequest.getContentLength();
+ int requestContentLength = getServletRequest().getContentLength();
if (requestContentLength != -1) {
this.headers.setContentLength(requestContentLength);
}
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java
index 72ec01a3445..55dfee31a88 100644
--- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java
+++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java
@@ -36,6 +36,8 @@ import org.springframework.http.HttpStatus;
import org.springframework.util.Assert;
/**
+ * Adapt {@link ServerHttpResponse} to the Servlet {@link HttpServletResponse}.
+ *
* @author Rossen Stoyanchev
*/
public class ServletServerHttpResponse implements ServerHttpResponse {
@@ -53,20 +55,18 @@ public class ServletServerHttpResponse implements ServerHttpResponse {
public ServletServerHttpResponse(HttpServletResponse response, ServletAsyncContextSynchronizer synchronizer) {
Assert.notNull(response, "'response' must not be null");
this.response = response;
- this.headers = initHttpHeaders();
+ this.headers = new ExtendedHttpHeaders(new ServletHeaderChangeListener());
this.subscriber = new ResponseBodySubscriber(synchronizer);
}
- private HttpHeaders initHttpHeaders() {
- ExtendedHttpHeaders headers = new ExtendedHttpHeaders();
- headers.registerChangeListener(new ServletHeaderChangeListener());
- return headers;
- }
+ public HttpServletResponse getServletResponse() {
+ return this.response;
+ }
@Override
public void setStatusCode(HttpStatus status) {
- this.response.setStatus(status.value());
+ getServletResponse().setStatus(status.value());
}
@Override
@@ -80,8 +80,11 @@ public class ServletServerHttpResponse implements ServerHttpResponse {
@Override
public Publisher setBody(final Publisher publisher) {
- return Publishers.lift(publisher, new WriteWithOperator<>(writePublisher ->
- (s -> writePublisher.subscribe(subscriber))));
+ return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal));
+ }
+
+ protected Publisher setBodyInternal(Publisher publisher) {
+ return s -> publisher.subscribe(subscriber);
}
@@ -89,14 +92,14 @@ public class ServletServerHttpResponse implements ServerHttpResponse {
@Override
public void headerAdded(String name, String value) {
- response.addHeader(name, value);
+ getServletResponse().addHeader(name, value);
}
@Override
public void headerPut(String key, List values) {
// We can only add but not remove
for (String value : values) {
- response.addHeader(key, value);
+ getServletResponse().addHeader(key, value);
}
}
@@ -106,6 +109,7 @@ public class ServletServerHttpResponse implements ServerHttpResponse {
}
}
+
private static class ResponseBodySubscriber implements WriteListener, Subscriber {
private final ServletAsyncContextSynchronizer synchronizer;
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java
index db3e5936d3e..e8133f8bc87 100644
--- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java
+++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java
@@ -36,12 +36,13 @@ import reactor.core.support.BackpressureUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
-import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.util.Assert;
import static org.xnio.IoUtils.safeClose;
/**
+ * Adapt {@link ServerHttpRequest} to the Underow {@link HttpServerExchange}.
+ *
* @author Marek Hawrylczak
* @author Rossen Stoyanchev
*/
@@ -49,10 +50,12 @@ public class UndertowServerHttpRequest implements ServerHttpRequest {
private final HttpServerExchange exchange;
- private final Publisher body = new RequestBodyPublisher();
+ private URI uri;
private HttpHeaders headers;
+ private final Publisher body = new RequestBodyPublisher();
+
public UndertowServerHttpRequest(HttpServerExchange exchange) {
Assert.notNull(exchange, "'exchange' is required.");
@@ -60,31 +63,38 @@ public class UndertowServerHttpRequest implements ServerHttpRequest {
}
+ public HttpServerExchange getUndertowExchange() {
+ return this.exchange;
+ }
+
@Override
public HttpMethod getMethod() {
- return HttpMethod.valueOf(this.exchange.getRequestMethod().toString());
+ return HttpMethod.valueOf(this.getUndertowExchange().getRequestMethod().toString());
}
@Override
public URI getURI() {
- try {
- return new URI(this.exchange.getRequestScheme(), null, this.exchange.getHostName(),
- this.exchange.getHostPort(), this.exchange.getRequestURI(),
- this.exchange.getQueryString(), null);
- }
- catch (URISyntaxException ex) {
- throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex);
+ if (this.uri == null) {
+ try {
+ return new URI(this.getUndertowExchange().getRequestScheme(), null,
+ this.getUndertowExchange().getHostName(),
+ this.getUndertowExchange().getHostPort(),
+ this.getUndertowExchange().getRequestURI(),
+ this.getUndertowExchange().getQueryString(), null);
+ }
+ catch (URISyntaxException ex) {
+ throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex);
+ }
}
+ return this.uri;
}
@Override
public HttpHeaders getHeaders() {
if (this.headers == null) {
this.headers = new HttpHeaders();
- for (HeaderValues headerValues : this.exchange.getRequestHeaders()) {
- for (String value : headerValues) {
- this.headers.add(headerValues.getHeaderName().toString(), value);
- }
+ for (HeaderValues values : this.getUndertowExchange().getRequestHeaders()) {
+ this.headers.put(values.getHeaderName().toString(), values);
}
}
return this.headers;
diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java
index b7fa1edfb61..316257bf8b5 100644
--- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java
+++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java
@@ -46,6 +46,8 @@ import static org.xnio.ChannelListeners.flushingChannelListener;
import static org.xnio.IoUtils.safeClose;
/**
+ * Adapt {@link ServerHttpResponse} to the Undertow {@link HttpServerExchange}.
+ *
* @author Marek Hawrylczak
* @author Rossen Stoyanchev
*/
@@ -56,28 +58,26 @@ public class UndertowServerHttpResponse implements ServerHttpResponse {
private final HttpServerExchange exchange;
- private final ResponseBodySubscriber bodySubscriber = new ResponseBodySubscriber();
-
private final HttpHeaders headers;
+ private final ResponseBodySubscriber bodySubscriber = new ResponseBodySubscriber();
+
public UndertowServerHttpResponse(HttpServerExchange exchange) {
Assert.notNull(exchange, "'exchange' is required.");
this.exchange = exchange;
- this.headers = initHttpHeaders();
+ this.headers = new ExtendedHttpHeaders(new UndertowHeaderChangeListener());
}
- private HttpHeaders initHttpHeaders() {
- ExtendedHttpHeaders headers = new ExtendedHttpHeaders();
- headers.registerChangeListener(new UndertowHeaderChangeListener());
- return headers;
- }
+ public HttpServerExchange getUndertowExchange() {
+ return this.exchange;
+ }
@Override
public void setStatusCode(HttpStatus status) {
Assert.notNull(status);
- this.exchange.setStatusCode(status.value());
+ getUndertowExchange().setStatusCode(status.value());
}
@Override
@@ -87,8 +87,11 @@ public class UndertowServerHttpResponse implements ServerHttpResponse {
@Override
public Publisher setBody(Publisher publisher) {
- return Publishers.lift(publisher, new WriteWithOperator<>(writePublisher ->
- (subscriber -> writePublisher.subscribe(bodySubscriber))));
+ return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal));
+ }
+
+ protected Publisher setBodyInternal(Publisher writePublisher) {
+ return subscriber -> writePublisher.subscribe(bodySubscriber);
}
@@ -96,17 +99,19 @@ public class UndertowServerHttpResponse implements ServerHttpResponse {
@Override
public void headerAdded(String name, String value) {
- exchange.getResponseHeaders().add(HttpString.tryFromString(name), value);
+ HttpString headerName = HttpString.tryFromString(name);
+ getUndertowExchange().getResponseHeaders().add(headerName, value);
}
@Override
public void headerPut(String key, List values) {
- exchange.getResponseHeaders().putAll(HttpString.tryFromString(key), values);
+ HttpString headerName = HttpString.tryFromString(key);
+ getUndertowExchange().getResponseHeaders().putAll(headerName, values);
}
@Override
public void headerRemoved(String key) {
- exchange.getResponseHeaders().remove(key);
+ getUndertowExchange().getResponseHeaders().remove(key);
}
}