From 869f6bef4086fe66d642a7e4f841c5716e6e45a3 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 31 Dec 2015 12:27:40 -0500 Subject: [PATCH] Polish server request & response implementations --- .../http/ExtendedHttpHeaders.java | 16 +++--- .../http/ReactiveHttpInputMessage.java | 10 ++-- .../http/ReactiveHttpOutputMessage.java | 15 +++--- .../reactive/ReactorServerHttpRequest.java | 42 +++++++++------ .../reactive/ReactorServerHttpResponse.java | 31 ++++++----- .../reactive/RxNettyServerHttpRequest.java | 52 ++++++++++--------- .../reactive/RxNettyServerHttpResponse.java | 44 +++++++++------- .../reactive/ServletServerHttpRequest.java | 47 +++++++++++------ .../reactive/ServletServerHttpResponse.java | 26 ++++++---- .../reactive/UndertowServerHttpRequest.java | 38 +++++++++----- .../reactive/UndertowServerHttpResponse.java | 33 +++++++----- 11 files changed, 202 insertions(+), 152 deletions(-) diff --git a/spring-web-reactive/src/main/java/org/springframework/http/ExtendedHttpHeaders.java b/spring-web-reactive/src/main/java/org/springframework/http/ExtendedHttpHeaders.java index ade3903d893..f5fa19922bf 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/ExtendedHttpHeaders.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/ExtendedHttpHeaders.java @@ -21,10 +21,11 @@ import java.util.List; import java.util.Map; /** - * Extension of HttpHeaders (to be merged into HttpHeaders) that allows the - * registration of {@link HeaderChangeListener}. For use with HTTP response - * implementations that can keep track of changes made headers and keep the - * underlying server headers always in sync. + * Variant of HttpHeaders (to be merged into HttpHeaders) that supports the + * registration of {@link HeaderChangeListener}s. + * + *

For use with HTTP server response implementations that wish to propagate + * header header changes to the underlying runtime as they occur. * * @author Rossen Stoyanchev */ @@ -33,7 +34,10 @@ public class ExtendedHttpHeaders extends HttpHeaders { private final List listeners = new ArrayList<>(1); - public void registerChangeListener(HeaderChangeListener listener) { + public ExtendedHttpHeaders() { + } + + public ExtendedHttpHeaders(HeaderChangeListener listener) { this.listeners.add(listener); } @@ -48,7 +52,7 @@ public class ExtendedHttpHeaders extends HttpHeaders { @Override public void set(String name, String value) { - List values = new LinkedList(); + List values = new LinkedList<>(); values.add(value); put(name, values); } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpInputMessage.java b/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpInputMessage.java index 7870571b4ad..3cff8d95530 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpInputMessage.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpInputMessage.java @@ -21,11 +21,9 @@ import java.nio.ByteBuffer; import org.reactivestreams.Publisher; /** - * Represents a "reactive" HTTP input message, consisting of - * {@linkplain #getHeaders() headers} and a readable - * {@linkplain #getBody() streaming body }. + * An "reactive" HTTP input message that exposes the input as {@link Publisher}. * - *

Typically implemented by an HTTP request on the server-side, or a response + *

Typically implemented by an HTTP request on the server-side or a response * on the client-side. * * @author Arjen Poutsma @@ -33,8 +31,8 @@ import org.reactivestreams.Publisher; public interface ReactiveHttpInputMessage extends HttpMessage { /** - * Return the body of the message as an publisher of {@code ByteBuffer}s. - * @return the body + * Return the body of the message as a {@link Publisher}. + * @return the body content publisher */ Publisher getBody(); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java b/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java index 33df2dcdeea..83b3507df4b 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java @@ -21,11 +21,9 @@ import java.nio.ByteBuffer; import org.reactivestreams.Publisher; /** - * Represents a "reactive" HTTP output message, consisting of - * {@linkplain #getHeaders() headers} and the capability to add a - * {@linkplain #setBody(Publisher) body}. + * A "reactive" HTTP output message that accepts output as a {@link Publisher}. * - *

Typically implemented by an HTTP request on the client-side, or a response + *

Typically implemented by an HTTP request on the client-side or a response * on the server-side. * * @author Arjen Poutsma @@ -33,12 +31,11 @@ import org.reactivestreams.Publisher; public interface ReactiveHttpOutputMessage extends HttpMessage { /** - * Sets the body of this message to the given publisher of {@link ByteBuffer}s. - * The publisher will be used to write to the underlying HTTP layer with - * asynchronously, given pull demand by this layer. + * Set the body of the message to the given {@link Publisher} which will be + * used to write to the underlying HTTP layer. * - * @param body the body to use - * @return a publisher that indicates completion + * @param body the body content publisher + * @return a publisher that indicates completion or error. */ Publisher setBody(Publisher body); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java index 343ec20f24e..493e37df65d 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java @@ -26,36 +26,30 @@ import reactor.io.net.http.HttpChannel; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; -import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.util.Assert; /** + * Adapt {@link ServerHttpRequest} to the Reactor Net {@link HttpChannel}. + * * @author Stephane Maldini */ public class ReactorServerHttpRequest implements ServerHttpRequest { private final HttpChannel channel; + private URI uri; + private HttpHeaders headers; public ReactorServerHttpRequest(HttpChannel request) { - Assert.notNull("'request', request must not be null."); + Assert.notNull("'request' must not be null."); this.channel = request; } - @Override - public HttpHeaders getHeaders() { - if (this.headers == null) { - this.headers = new HttpHeaders(); - for (String name : this.channel.headers().names()) { - for (String value : this.channel.headers().getAll(name)) { - this.headers.add(name, value); - } - } - } - return this.headers; + public HttpChannel getReactorChannel() { + return this.channel; } @Override @@ -65,12 +59,26 @@ public class ReactorServerHttpRequest implements ServerHttpRequest { @Override public URI getURI() { - try { - return new URI(this.channel.uri()); - } catch (URISyntaxException ex) { - throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex); + if (this.uri == null) { + try { + this.uri = new URI(this.channel.uri()); + } + catch (URISyntaxException ex) { + throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex); + } } + return this.uri; + } + @Override + public HttpHeaders getHeaders() { + if (this.headers == null) { + this.headers = new HttpHeaders(); + for (String name : this.channel.headers().names()) { + this.headers.put(name, this.channel.headers().getAll(name)); + } + } + return this.headers; } @Override diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java index c774ead81fc..d3fdf10a225 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java @@ -30,6 +30,8 @@ import org.springframework.http.HttpStatus; import org.springframework.util.Assert; /** + * Adapt {@link ServerHttpResponse} to the Reactor Net {@link HttpChannel}. + * * @author Stephane Maldini * @author Rossen Stoyanchev */ @@ -41,21 +43,19 @@ public class ReactorServerHttpResponse implements ServerHttpResponse { public ReactorServerHttpResponse(HttpChannel response) { - Assert.notNull("'response', response must not be null."); + Assert.notNull("'response' must not be null."); this.channel = response; - this.headers = initHttpHeaders(); + this.headers = new ExtendedHttpHeaders(new ReactorHeaderChangeListener()); } - private HttpHeaders initHttpHeaders() { - ExtendedHttpHeaders headers = new ExtendedHttpHeaders(); - headers.registerChangeListener(new ReactorHeaderChangeListener()); - return headers; - } + public HttpChannel getReactorChannel() { + return this.channel; + } @Override public void setStatusCode(HttpStatus status) { - this.channel.responseStatus(Status.valueOf(status.value())); + getReactorChannel().responseStatus(Status.valueOf(status.value())); } @Override @@ -65,8 +65,11 @@ public class ReactorServerHttpResponse implements ServerHttpResponse { @Override public Publisher setBody(Publisher publisher) { - return Publishers.lift(publisher, new WriteWithOperator<>(writePublisher -> - this.channel.writeWith(Publishers.map(writePublisher, Buffer::new)))); + return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal)); + } + + protected Publisher setBodyInternal(Publisher publisher) { + return getReactorChannel().writeWith(Publishers.map(publisher, Buffer::new)); } @@ -74,18 +77,18 @@ public class ReactorServerHttpResponse implements ServerHttpResponse { @Override public void headerAdded(String name, String value) { - channel.responseHeaders().add(name, value); + getReactorChannel().responseHeaders().add(name, value); } @Override public void headerPut(String key, List values) { - channel.responseHeaders().remove(key); - channel.responseHeaders().add(key, values); + getReactorChannel().responseHeaders().remove(key); + getReactorChannel().responseHeaders().add(key, values); } @Override public void headerRemoved(String key) { - channel.responseHeaders().remove(key); + getReactorChannel().responseHeaders().remove(key); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java index 2f4ae920c54..6afad378292 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java @@ -31,6 +31,8 @@ import org.springframework.http.HttpMethod; import org.springframework.util.Assert; /** + * Adapt {@link ServerHttpRequest} to the RxNetty {@link HttpServerRequest}. + * * @author Rossen Stoyanchev * @author Stephane Maldini */ @@ -38,6 +40,8 @@ public class RxNettyServerHttpRequest implements ServerHttpRequest { private final HttpServerRequest request; + private URI uri; + private HttpHeaders headers; @@ -47,44 +51,44 @@ public class RxNettyServerHttpRequest implements ServerHttpRequest { } - @Override - public HttpHeaders getHeaders() { - if (this.headers == null) { - this.headers = new HttpHeaders(); - for (String name : this.request.getHeaderNames()) { - for (String value : this.request.getAllHeaderValues(name)) { - this.headers.add(name, value); - } - } - } - return this.headers; + public HttpServerRequest getRxNettyRequest() { + return this.request; } @Override public HttpMethod getMethod() { - return HttpMethod.valueOf(this.request.getHttpMethod().name()); + return HttpMethod.valueOf(this.getRxNettyRequest().getHttpMethod().name()); } @Override public URI getURI() { - try { - return new URI(this.request.getUri()); - } - catch (URISyntaxException ex) { - throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex); + if (this.uri == null) { + try { + this.uri = new URI(this.getRxNettyRequest().getUri()); + } + catch (URISyntaxException ex) { + throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex); + } } + return this.uri; + } + @Override + public HttpHeaders getHeaders() { + if (this.headers == null) { + this.headers = new HttpHeaders(); + for (String name : this.getRxNettyRequest().getHeaderNames()) { + this.headers.put(name, this.getRxNettyRequest().getAllHeaderValues(name)); + } + } + return this.headers; } @Override public Publisher getBody() { - Observable bytesContent = this.request.getContent() - .concatWith(Observable.empty()) - .map(ByteBuf::nioBuffer); - return RxJava1Converter.from(bytesContent); + Observable content = this.getRxNettyRequest().getContent().map(ByteBuf::nioBuffer); + content = content.concatWith(Observable.empty()); // See GH issue #58 + return RxJava1Converter.from(content); } - public Observable asObservable() { - return this.request.getContent().map(ByteBuf::nioBuffer); - } } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java index 2b316492b0d..aec66802888 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java @@ -32,6 +32,8 @@ import org.springframework.http.HttpStatus; import org.springframework.util.Assert; /** + * Adapt {@link ServerHttpResponse} to the RxNetty {@link HttpServerResponse}. + * * @author Rossen Stoyanchev * @author Stephane Maldini */ @@ -45,19 +47,17 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse { public RxNettyServerHttpResponse(HttpServerResponse response) { Assert.notNull("'response', response must not be null."); this.response = response; - this.headers = initHttpHeaders(); + this.headers = new ExtendedHttpHeaders(new RxNettyHeaderChangeListener()); } - private HttpHeaders initHttpHeaders() { - ExtendedHttpHeaders headers = new ExtendedHttpHeaders(); - headers.registerChangeListener(new RxNettyHeaderChangeListener()); - return headers; - } + public HttpServerResponse getRxNettyResponse() { + return this.response; + } @Override public void setStatusCode(HttpStatus status) { - this.response.setStatus(HttpResponseStatus.valueOf(status.value())); + getRxNettyResponse().setStatus(HttpResponseStatus.valueOf(status.value())); } @Override @@ -67,15 +67,19 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse { @Override public Publisher setBody(Publisher publisher) { - return Publishers.lift(publisher, new WriteWithOperator<>(writePublisher -> { - Observable observable = RxJava1Converter.from(writePublisher) - .map(buffer -> { - byte[] bytes = new byte[buffer.remaining()]; - buffer.get(bytes); - return bytes; - }); - return RxJava1Converter.from(this.response.writeBytes(observable)); - })); + return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal)); + } + + protected Publisher setBodyInternal(Publisher publisher) { + Observable content = RxJava1Converter.from(publisher).map(this::toBytes); + Observable completion = getRxNettyResponse().writeBytes(content); + return RxJava1Converter.from(completion); + } + + private byte[] toBytes(ByteBuffer buffer) { + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + return bytes; } @@ -83,20 +87,20 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse { @Override public void headerAdded(String name, String value) { - response.addHeader(name, value); + getRxNettyResponse().addHeader(name, value); } @Override public void headerPut(String key, List values) { - response.removeHeader(key); + getRxNettyResponse().removeHeader(key); for (String value : values) { - response.addHeader(key, value); + getRxNettyResponse().addHeader(key, value); } } @Override public void headerRemoved(String key) { - response.removeHeader(key); + getRxNettyResponse().removeHeader(key); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java index 46158c306e4..477f2b6a860 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java @@ -43,6 +43,8 @@ import org.springframework.util.LinkedCaseInsensitiveMap; import org.springframework.util.StringUtils; /** + * Adapt {@link ServerHttpRequest} to the Servlet {@link HttpServletRequest}. + * * @author Rossen Stoyanchev */ public class ServletServerHttpRequest implements ServerHttpRequest { @@ -52,44 +54,55 @@ public class ServletServerHttpRequest implements ServerHttpRequest { private static final Log logger = LogFactory.getLog(ServletServerHttpRequest.class); - private final HttpServletRequest servletRequest; + private final HttpServletRequest request; + + private URI uri; private HttpHeaders headers; private final RequestBodyPublisher requestBodyPublisher; - public ServletServerHttpRequest(HttpServletRequest servletRequest, ServletAsyncContextSynchronizer synchronizer) { - Assert.notNull(servletRequest, "HttpServletRequest must not be null"); - this.servletRequest = servletRequest; + public ServletServerHttpRequest(HttpServletRequest request, ServletAsyncContextSynchronizer synchronizer) { + Assert.notNull(request, "'request' must not be null."); + this.request = request; this.requestBodyPublisher = new RequestBodyPublisher(synchronizer, BUFFER_SIZE); } + public HttpServletRequest getServletRequest() { + return this.request; + } + @Override public HttpMethod getMethod() { - return HttpMethod.valueOf(this.servletRequest.getMethod()); + return HttpMethod.valueOf(getServletRequest().getMethod()); } @Override public URI getURI() { - try { - return new URI(this.servletRequest.getScheme(), null, this.servletRequest.getServerName(), - this.servletRequest.getServerPort(), this.servletRequest.getRequestURI(), - this.servletRequest.getQueryString(), null); - } - catch (URISyntaxException ex) { - throw new IllegalStateException("Could not get HttpServletRequest URI: " + ex.getMessage(), ex); + if (this.uri == null) { + try { + this.uri = new URI(getServletRequest().getScheme(), null, + getServletRequest().getServerName(), + getServletRequest().getServerPort(), + getServletRequest().getRequestURI(), + getServletRequest().getQueryString(), null); + } + catch (URISyntaxException ex) { + throw new IllegalStateException("Could not get HttpServletRequest URI: " + ex.getMessage(), ex); + } } + return this.uri; } @Override public HttpHeaders getHeaders() { if (this.headers == null) { this.headers = new HttpHeaders(); - for (Enumeration names = this.servletRequest.getHeaderNames(); names.hasMoreElements(); ) { + for (Enumeration names = getServletRequest().getHeaderNames(); names.hasMoreElements(); ) { String headerName = (String) names.nextElement(); - for (Enumeration headerValues = this.servletRequest.getHeaders(headerName); + for (Enumeration headerValues = getServletRequest().getHeaders(headerName); headerValues.hasMoreElements(); ) { String headerValue = (String) headerValues.nextElement(); this.headers.add(headerName, headerValue); @@ -98,14 +111,14 @@ public class ServletServerHttpRequest implements ServerHttpRequest { // HttpServletRequest exposes some headers as properties: we should include those if not already present MediaType contentType = this.headers.getContentType(); if (contentType == null) { - String requestContentType = this.servletRequest.getContentType(); + String requestContentType = getServletRequest().getContentType(); if (StringUtils.hasLength(requestContentType)) { contentType = MediaType.parseMediaType(requestContentType); this.headers.setContentType(contentType); } } if (contentType != null && contentType.getCharSet() == null) { - String requestEncoding = this.servletRequest.getCharacterEncoding(); + String requestEncoding = getServletRequest().getCharacterEncoding(); if (StringUtils.hasLength(requestEncoding)) { Charset charSet = Charset.forName(requestEncoding); Map params = new LinkedCaseInsensitiveMap<>(); @@ -116,7 +129,7 @@ public class ServletServerHttpRequest implements ServerHttpRequest { } } if (this.headers.getContentLength() == -1) { - int requestContentLength = this.servletRequest.getContentLength(); + int requestContentLength = getServletRequest().getContentLength(); if (requestContentLength != -1) { this.headers.setContentLength(requestContentLength); } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index 72ec01a3445..55dfee31a88 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -36,6 +36,8 @@ import org.springframework.http.HttpStatus; import org.springframework.util.Assert; /** + * Adapt {@link ServerHttpResponse} to the Servlet {@link HttpServletResponse}. + * * @author Rossen Stoyanchev */ public class ServletServerHttpResponse implements ServerHttpResponse { @@ -53,20 +55,18 @@ public class ServletServerHttpResponse implements ServerHttpResponse { public ServletServerHttpResponse(HttpServletResponse response, ServletAsyncContextSynchronizer synchronizer) { Assert.notNull(response, "'response' must not be null"); this.response = response; - this.headers = initHttpHeaders(); + this.headers = new ExtendedHttpHeaders(new ServletHeaderChangeListener()); this.subscriber = new ResponseBodySubscriber(synchronizer); } - private HttpHeaders initHttpHeaders() { - ExtendedHttpHeaders headers = new ExtendedHttpHeaders(); - headers.registerChangeListener(new ServletHeaderChangeListener()); - return headers; - } + public HttpServletResponse getServletResponse() { + return this.response; + } @Override public void setStatusCode(HttpStatus status) { - this.response.setStatus(status.value()); + getServletResponse().setStatus(status.value()); } @Override @@ -80,8 +80,11 @@ public class ServletServerHttpResponse implements ServerHttpResponse { @Override public Publisher setBody(final Publisher publisher) { - return Publishers.lift(publisher, new WriteWithOperator<>(writePublisher -> - (s -> writePublisher.subscribe(subscriber)))); + return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal)); + } + + protected Publisher setBodyInternal(Publisher publisher) { + return s -> publisher.subscribe(subscriber); } @@ -89,14 +92,14 @@ public class ServletServerHttpResponse implements ServerHttpResponse { @Override public void headerAdded(String name, String value) { - response.addHeader(name, value); + getServletResponse().addHeader(name, value); } @Override public void headerPut(String key, List values) { // We can only add but not remove for (String value : values) { - response.addHeader(key, value); + getServletResponse().addHeader(key, value); } } @@ -106,6 +109,7 @@ public class ServletServerHttpResponse implements ServerHttpResponse { } } + private static class ResponseBodySubscriber implements WriteListener, Subscriber { private final ServletAsyncContextSynchronizer synchronizer; diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java index db3e5936d3e..e8133f8bc87 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java @@ -36,12 +36,13 @@ import reactor.core.support.BackpressureUtils; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; -import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.util.Assert; import static org.xnio.IoUtils.safeClose; /** + * Adapt {@link ServerHttpRequest} to the Underow {@link HttpServerExchange}. + * * @author Marek Hawrylczak * @author Rossen Stoyanchev */ @@ -49,10 +50,12 @@ public class UndertowServerHttpRequest implements ServerHttpRequest { private final HttpServerExchange exchange; - private final Publisher body = new RequestBodyPublisher(); + private URI uri; private HttpHeaders headers; + private final Publisher body = new RequestBodyPublisher(); + public UndertowServerHttpRequest(HttpServerExchange exchange) { Assert.notNull(exchange, "'exchange' is required."); @@ -60,31 +63,38 @@ public class UndertowServerHttpRequest implements ServerHttpRequest { } + public HttpServerExchange getUndertowExchange() { + return this.exchange; + } + @Override public HttpMethod getMethod() { - return HttpMethod.valueOf(this.exchange.getRequestMethod().toString()); + return HttpMethod.valueOf(this.getUndertowExchange().getRequestMethod().toString()); } @Override public URI getURI() { - try { - return new URI(this.exchange.getRequestScheme(), null, this.exchange.getHostName(), - this.exchange.getHostPort(), this.exchange.getRequestURI(), - this.exchange.getQueryString(), null); - } - catch (URISyntaxException ex) { - throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex); + if (this.uri == null) { + try { + return new URI(this.getUndertowExchange().getRequestScheme(), null, + this.getUndertowExchange().getHostName(), + this.getUndertowExchange().getHostPort(), + this.getUndertowExchange().getRequestURI(), + this.getUndertowExchange().getQueryString(), null); + } + catch (URISyntaxException ex) { + throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex); + } } + return this.uri; } @Override public HttpHeaders getHeaders() { if (this.headers == null) { this.headers = new HttpHeaders(); - for (HeaderValues headerValues : this.exchange.getRequestHeaders()) { - for (String value : headerValues) { - this.headers.add(headerValues.getHeaderName().toString(), value); - } + for (HeaderValues values : this.getUndertowExchange().getRequestHeaders()) { + this.headers.put(values.getHeaderName().toString(), values); } } return this.headers; diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index b7fa1edfb61..316257bf8b5 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -46,6 +46,8 @@ import static org.xnio.ChannelListeners.flushingChannelListener; import static org.xnio.IoUtils.safeClose; /** + * Adapt {@link ServerHttpResponse} to the Undertow {@link HttpServerExchange}. + * * @author Marek Hawrylczak * @author Rossen Stoyanchev */ @@ -56,28 +58,26 @@ public class UndertowServerHttpResponse implements ServerHttpResponse { private final HttpServerExchange exchange; - private final ResponseBodySubscriber bodySubscriber = new ResponseBodySubscriber(); - private final HttpHeaders headers; + private final ResponseBodySubscriber bodySubscriber = new ResponseBodySubscriber(); + public UndertowServerHttpResponse(HttpServerExchange exchange) { Assert.notNull(exchange, "'exchange' is required."); this.exchange = exchange; - this.headers = initHttpHeaders(); + this.headers = new ExtendedHttpHeaders(new UndertowHeaderChangeListener()); } - private HttpHeaders initHttpHeaders() { - ExtendedHttpHeaders headers = new ExtendedHttpHeaders(); - headers.registerChangeListener(new UndertowHeaderChangeListener()); - return headers; - } + public HttpServerExchange getUndertowExchange() { + return this.exchange; + } @Override public void setStatusCode(HttpStatus status) { Assert.notNull(status); - this.exchange.setStatusCode(status.value()); + getUndertowExchange().setStatusCode(status.value()); } @Override @@ -87,8 +87,11 @@ public class UndertowServerHttpResponse implements ServerHttpResponse { @Override public Publisher setBody(Publisher publisher) { - return Publishers.lift(publisher, new WriteWithOperator<>(writePublisher -> - (subscriber -> writePublisher.subscribe(bodySubscriber)))); + return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal)); + } + + protected Publisher setBodyInternal(Publisher writePublisher) { + return subscriber -> writePublisher.subscribe(bodySubscriber); } @@ -96,17 +99,19 @@ public class UndertowServerHttpResponse implements ServerHttpResponse { @Override public void headerAdded(String name, String value) { - exchange.getResponseHeaders().add(HttpString.tryFromString(name), value); + HttpString headerName = HttpString.tryFromString(name); + getUndertowExchange().getResponseHeaders().add(headerName, value); } @Override public void headerPut(String key, List values) { - exchange.getResponseHeaders().putAll(HttpString.tryFromString(key), values); + HttpString headerName = HttpString.tryFromString(key); + getUndertowExchange().getResponseHeaders().putAll(headerName, values); } @Override public void headerRemoved(String key) { - exchange.getResponseHeaders().remove(key); + getUndertowExchange().getResponseHeaders().remove(key); } }