Browse Source

Polish server request & response implementations

pull/1111/head
Rossen Stoyanchev 10 years ago
parent
commit
869f6bef40
  1. 16
      spring-web-reactive/src/main/java/org/springframework/http/ExtendedHttpHeaders.java
  2. 10
      spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpInputMessage.java
  3. 15
      spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java
  4. 42
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java
  5. 31
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java
  6. 52
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java
  7. 44
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java
  8. 47
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java
  9. 26
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java
  10. 38
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java
  11. 33
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java

16
spring-web-reactive/src/main/java/org/springframework/http/ExtendedHttpHeaders.java

@ -21,10 +21,11 @@ import java.util.List; @@ -21,10 +21,11 @@ import java.util.List;
import java.util.Map;
/**
* Extension of HttpHeaders (to be merged into HttpHeaders) that allows the
* registration of {@link HeaderChangeListener}. For use with HTTP response
* implementations that can keep track of changes made headers and keep the
* underlying server headers always in sync.
* Variant of HttpHeaders (to be merged into HttpHeaders) that supports the
* registration of {@link HeaderChangeListener}s.
*
* <p>For use with HTTP server response implementations that wish to propagate
* header header changes to the underlying runtime as they occur.
*
* @author Rossen Stoyanchev
*/
@ -33,7 +34,10 @@ public class ExtendedHttpHeaders extends HttpHeaders { @@ -33,7 +34,10 @@ public class ExtendedHttpHeaders extends HttpHeaders {
private final List<HeaderChangeListener> listeners = new ArrayList<>(1);
public void registerChangeListener(HeaderChangeListener listener) {
public ExtendedHttpHeaders() {
}
public ExtendedHttpHeaders(HeaderChangeListener listener) {
this.listeners.add(listener);
}
@ -48,7 +52,7 @@ public class ExtendedHttpHeaders extends HttpHeaders { @@ -48,7 +52,7 @@ public class ExtendedHttpHeaders extends HttpHeaders {
@Override
public void set(String name, String value) {
List<String> values = new LinkedList<String>();
List<String> values = new LinkedList<>();
values.add(value);
put(name, values);
}

10
spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpInputMessage.java

@ -21,11 +21,9 @@ import java.nio.ByteBuffer; @@ -21,11 +21,9 @@ import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
/**
* Represents a "reactive" HTTP input message, consisting of
* {@linkplain #getHeaders() headers} and a readable
* {@linkplain #getBody() streaming body }.
* An "reactive" HTTP input message that exposes the input as {@link Publisher}.
*
* <p>Typically implemented by an HTTP request on the server-side, or a response
* <p>Typically implemented by an HTTP request on the server-side or a response
* on the client-side.
*
* @author Arjen Poutsma
@ -33,8 +31,8 @@ import org.reactivestreams.Publisher; @@ -33,8 +31,8 @@ import org.reactivestreams.Publisher;
public interface ReactiveHttpInputMessage extends HttpMessage {
/**
* Return the body of the message as an publisher of {@code ByteBuffer}s.
* @return the body
* Return the body of the message as a {@link Publisher}.
* @return the body content publisher
*/
Publisher<ByteBuffer> getBody();

15
spring-web-reactive/src/main/java/org/springframework/http/ReactiveHttpOutputMessage.java

@ -21,11 +21,9 @@ import java.nio.ByteBuffer; @@ -21,11 +21,9 @@ import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
/**
* Represents a "reactive" HTTP output message, consisting of
* {@linkplain #getHeaders() headers} and the capability to add a
* {@linkplain #setBody(Publisher) body}.
* A "reactive" HTTP output message that accepts output as a {@link Publisher}.
*
* <p>Typically implemented by an HTTP request on the client-side, or a response
* <p>Typically implemented by an HTTP request on the client-side or a response
* on the server-side.
*
* @author Arjen Poutsma
@ -33,12 +31,11 @@ import org.reactivestreams.Publisher; @@ -33,12 +31,11 @@ import org.reactivestreams.Publisher;
public interface ReactiveHttpOutputMessage extends HttpMessage {
/**
* Sets the body of this message to the given publisher of {@link ByteBuffer}s.
* The publisher will be used to write to the underlying HTTP layer with
* asynchronously, given pull demand by this layer.
* Set the body of the message to the given {@link Publisher} which will be
* used to write to the underlying HTTP layer.
*
* @param body the body to use
* @return a publisher that indicates completion
* @param body the body content publisher
* @return a publisher that indicates completion or error.
*/
Publisher<Void> setBody(Publisher<ByteBuffer> body);

42
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java

@ -26,36 +26,30 @@ import reactor.io.net.http.HttpChannel; @@ -26,36 +26,30 @@ import reactor.io.net.http.HttpChannel;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.util.Assert;
/**
* Adapt {@link ServerHttpRequest} to the Reactor Net {@link HttpChannel}.
*
* @author Stephane Maldini
*/
public class ReactorServerHttpRequest implements ServerHttpRequest {
private final HttpChannel<Buffer, ?> channel;
private URI uri;
private HttpHeaders headers;
public ReactorServerHttpRequest(HttpChannel<Buffer, ?> request) {
Assert.notNull("'request', request must not be null.");
Assert.notNull("'request' must not be null.");
this.channel = request;
}
@Override
public HttpHeaders getHeaders() {
if (this.headers == null) {
this.headers = new HttpHeaders();
for (String name : this.channel.headers().names()) {
for (String value : this.channel.headers().getAll(name)) {
this.headers.add(name, value);
}
}
}
return this.headers;
public HttpChannel<Buffer, ?> getReactorChannel() {
return this.channel;
}
@Override
@ -65,12 +59,26 @@ public class ReactorServerHttpRequest implements ServerHttpRequest { @@ -65,12 +59,26 @@ public class ReactorServerHttpRequest implements ServerHttpRequest {
@Override
public URI getURI() {
try {
return new URI(this.channel.uri());
} catch (URISyntaxException ex) {
throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex);
if (this.uri == null) {
try {
this.uri = new URI(this.channel.uri());
}
catch (URISyntaxException ex) {
throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex);
}
}
return this.uri;
}
@Override
public HttpHeaders getHeaders() {
if (this.headers == null) {
this.headers = new HttpHeaders();
for (String name : this.channel.headers().names()) {
this.headers.put(name, this.channel.headers().getAll(name));
}
}
return this.headers;
}
@Override

31
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java

@ -30,6 +30,8 @@ import org.springframework.http.HttpStatus; @@ -30,6 +30,8 @@ import org.springframework.http.HttpStatus;
import org.springframework.util.Assert;
/**
* Adapt {@link ServerHttpResponse} to the Reactor Net {@link HttpChannel}.
*
* @author Stephane Maldini
* @author Rossen Stoyanchev
*/
@ -41,21 +43,19 @@ public class ReactorServerHttpResponse implements ServerHttpResponse { @@ -41,21 +43,19 @@ public class ReactorServerHttpResponse implements ServerHttpResponse {
public ReactorServerHttpResponse(HttpChannel<?, Buffer> response) {
Assert.notNull("'response', response must not be null.");
Assert.notNull("'response' must not be null.");
this.channel = response;
this.headers = initHttpHeaders();
this.headers = new ExtendedHttpHeaders(new ReactorHeaderChangeListener());
}
private HttpHeaders initHttpHeaders() {
ExtendedHttpHeaders headers = new ExtendedHttpHeaders();
headers.registerChangeListener(new ReactorHeaderChangeListener());
return headers;
}
public HttpChannel<?, Buffer> getReactorChannel() {
return this.channel;
}
@Override
public void setStatusCode(HttpStatus status) {
this.channel.responseStatus(Status.valueOf(status.value()));
getReactorChannel().responseStatus(Status.valueOf(status.value()));
}
@Override
@ -65,8 +65,11 @@ public class ReactorServerHttpResponse implements ServerHttpResponse { @@ -65,8 +65,11 @@ public class ReactorServerHttpResponse implements ServerHttpResponse {
@Override
public Publisher<Void> setBody(Publisher<ByteBuffer> publisher) {
return Publishers.lift(publisher, new WriteWithOperator<>(writePublisher ->
this.channel.writeWith(Publishers.map(writePublisher, Buffer::new))));
return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal));
}
protected Publisher<Void> setBodyInternal(Publisher<ByteBuffer> publisher) {
return getReactorChannel().writeWith(Publishers.map(publisher, Buffer::new));
}
@ -74,18 +77,18 @@ public class ReactorServerHttpResponse implements ServerHttpResponse { @@ -74,18 +77,18 @@ public class ReactorServerHttpResponse implements ServerHttpResponse {
@Override
public void headerAdded(String name, String value) {
channel.responseHeaders().add(name, value);
getReactorChannel().responseHeaders().add(name, value);
}
@Override
public void headerPut(String key, List<String> values) {
channel.responseHeaders().remove(key);
channel.responseHeaders().add(key, values);
getReactorChannel().responseHeaders().remove(key);
getReactorChannel().responseHeaders().add(key, values);
}
@Override
public void headerRemoved(String key) {
channel.responseHeaders().remove(key);
getReactorChannel().responseHeaders().remove(key);
}
}

52
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpRequest.java

@ -31,6 +31,8 @@ import org.springframework.http.HttpMethod; @@ -31,6 +31,8 @@ import org.springframework.http.HttpMethod;
import org.springframework.util.Assert;
/**
* Adapt {@link ServerHttpRequest} to the RxNetty {@link HttpServerRequest}.
*
* @author Rossen Stoyanchev
* @author Stephane Maldini
*/
@ -38,6 +40,8 @@ public class RxNettyServerHttpRequest implements ServerHttpRequest { @@ -38,6 +40,8 @@ public class RxNettyServerHttpRequest implements ServerHttpRequest {
private final HttpServerRequest<ByteBuf> request;
private URI uri;
private HttpHeaders headers;
@ -47,44 +51,44 @@ public class RxNettyServerHttpRequest implements ServerHttpRequest { @@ -47,44 +51,44 @@ public class RxNettyServerHttpRequest implements ServerHttpRequest {
}
@Override
public HttpHeaders getHeaders() {
if (this.headers == null) {
this.headers = new HttpHeaders();
for (String name : this.request.getHeaderNames()) {
for (String value : this.request.getAllHeaderValues(name)) {
this.headers.add(name, value);
}
}
}
return this.headers;
public HttpServerRequest<ByteBuf> getRxNettyRequest() {
return this.request;
}
@Override
public HttpMethod getMethod() {
return HttpMethod.valueOf(this.request.getHttpMethod().name());
return HttpMethod.valueOf(this.getRxNettyRequest().getHttpMethod().name());
}
@Override
public URI getURI() {
try {
return new URI(this.request.getUri());
}
catch (URISyntaxException ex) {
throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex);
if (this.uri == null) {
try {
this.uri = new URI(this.getRxNettyRequest().getUri());
}
catch (URISyntaxException ex) {
throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex);
}
}
return this.uri;
}
@Override
public HttpHeaders getHeaders() {
if (this.headers == null) {
this.headers = new HttpHeaders();
for (String name : this.getRxNettyRequest().getHeaderNames()) {
this.headers.put(name, this.getRxNettyRequest().getAllHeaderValues(name));
}
}
return this.headers;
}
@Override
public Publisher<ByteBuffer> getBody() {
Observable<ByteBuffer> bytesContent = this.request.getContent()
.concatWith(Observable.empty())
.map(ByteBuf::nioBuffer);
return RxJava1Converter.from(bytesContent);
Observable<ByteBuffer> content = this.getRxNettyRequest().getContent().map(ByteBuf::nioBuffer);
content = content.concatWith(Observable.empty()); // See GH issue #58
return RxJava1Converter.from(content);
}
public Observable<ByteBuffer> asObservable() {
return this.request.getContent().map(ByteBuf::nioBuffer);
}
}

44
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java

@ -32,6 +32,8 @@ import org.springframework.http.HttpStatus; @@ -32,6 +32,8 @@ import org.springframework.http.HttpStatus;
import org.springframework.util.Assert;
/**
* Adapt {@link ServerHttpResponse} to the RxNetty {@link HttpServerResponse}.
*
* @author Rossen Stoyanchev
* @author Stephane Maldini
*/
@ -45,19 +47,17 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse { @@ -45,19 +47,17 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse {
public RxNettyServerHttpResponse(HttpServerResponse<?> response) {
Assert.notNull("'response', response must not be null.");
this.response = response;
this.headers = initHttpHeaders();
this.headers = new ExtendedHttpHeaders(new RxNettyHeaderChangeListener());
}
private HttpHeaders initHttpHeaders() {
ExtendedHttpHeaders headers = new ExtendedHttpHeaders();
headers.registerChangeListener(new RxNettyHeaderChangeListener());
return headers;
}
public HttpServerResponse<?> getRxNettyResponse() {
return this.response;
}
@Override
public void setStatusCode(HttpStatus status) {
this.response.setStatus(HttpResponseStatus.valueOf(status.value()));
getRxNettyResponse().setStatus(HttpResponseStatus.valueOf(status.value()));
}
@Override
@ -67,15 +67,19 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse { @@ -67,15 +67,19 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse {
@Override
public Publisher<Void> setBody(Publisher<ByteBuffer> publisher) {
return Publishers.lift(publisher, new WriteWithOperator<>(writePublisher -> {
Observable<byte[]> observable = RxJava1Converter.from(writePublisher)
.map(buffer -> {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return bytes;
});
return RxJava1Converter.from(this.response.writeBytes(observable));
}));
return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal));
}
protected Publisher<Void> setBodyInternal(Publisher<ByteBuffer> publisher) {
Observable<byte[]> content = RxJava1Converter.from(publisher).map(this::toBytes);
Observable<Void> completion = getRxNettyResponse().writeBytes(content);
return RxJava1Converter.from(completion);
}
private byte[] toBytes(ByteBuffer buffer) {
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return bytes;
}
@ -83,20 +87,20 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse { @@ -83,20 +87,20 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse {
@Override
public void headerAdded(String name, String value) {
response.addHeader(name, value);
getRxNettyResponse().addHeader(name, value);
}
@Override
public void headerPut(String key, List<String> values) {
response.removeHeader(key);
getRxNettyResponse().removeHeader(key);
for (String value : values) {
response.addHeader(key, value);
getRxNettyResponse().addHeader(key, value);
}
}
@Override
public void headerRemoved(String key) {
response.removeHeader(key);
getRxNettyResponse().removeHeader(key);
}
}

47
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java

@ -43,6 +43,8 @@ import org.springframework.util.LinkedCaseInsensitiveMap; @@ -43,6 +43,8 @@ import org.springframework.util.LinkedCaseInsensitiveMap;
import org.springframework.util.StringUtils;
/**
* Adapt {@link ServerHttpRequest} to the Servlet {@link HttpServletRequest}.
*
* @author Rossen Stoyanchev
*/
public class ServletServerHttpRequest implements ServerHttpRequest {
@ -52,44 +54,55 @@ public class ServletServerHttpRequest implements ServerHttpRequest { @@ -52,44 +54,55 @@ public class ServletServerHttpRequest implements ServerHttpRequest {
private static final Log logger = LogFactory.getLog(ServletServerHttpRequest.class);
private final HttpServletRequest servletRequest;
private final HttpServletRequest request;
private URI uri;
private HttpHeaders headers;
private final RequestBodyPublisher requestBodyPublisher;
public ServletServerHttpRequest(HttpServletRequest servletRequest, ServletAsyncContextSynchronizer synchronizer) {
Assert.notNull(servletRequest, "HttpServletRequest must not be null");
this.servletRequest = servletRequest;
public ServletServerHttpRequest(HttpServletRequest request, ServletAsyncContextSynchronizer synchronizer) {
Assert.notNull(request, "'request' must not be null.");
this.request = request;
this.requestBodyPublisher = new RequestBodyPublisher(synchronizer, BUFFER_SIZE);
}
public HttpServletRequest getServletRequest() {
return this.request;
}
@Override
public HttpMethod getMethod() {
return HttpMethod.valueOf(this.servletRequest.getMethod());
return HttpMethod.valueOf(getServletRequest().getMethod());
}
@Override
public URI getURI() {
try {
return new URI(this.servletRequest.getScheme(), null, this.servletRequest.getServerName(),
this.servletRequest.getServerPort(), this.servletRequest.getRequestURI(),
this.servletRequest.getQueryString(), null);
}
catch (URISyntaxException ex) {
throw new IllegalStateException("Could not get HttpServletRequest URI: " + ex.getMessage(), ex);
if (this.uri == null) {
try {
this.uri = new URI(getServletRequest().getScheme(), null,
getServletRequest().getServerName(),
getServletRequest().getServerPort(),
getServletRequest().getRequestURI(),
getServletRequest().getQueryString(), null);
}
catch (URISyntaxException ex) {
throw new IllegalStateException("Could not get HttpServletRequest URI: " + ex.getMessage(), ex);
}
}
return this.uri;
}
@Override
public HttpHeaders getHeaders() {
if (this.headers == null) {
this.headers = new HttpHeaders();
for (Enumeration<?> names = this.servletRequest.getHeaderNames(); names.hasMoreElements(); ) {
for (Enumeration<?> names = getServletRequest().getHeaderNames(); names.hasMoreElements(); ) {
String headerName = (String) names.nextElement();
for (Enumeration<?> headerValues = this.servletRequest.getHeaders(headerName);
for (Enumeration<?> headerValues = getServletRequest().getHeaders(headerName);
headerValues.hasMoreElements(); ) {
String headerValue = (String) headerValues.nextElement();
this.headers.add(headerName, headerValue);
@ -98,14 +111,14 @@ public class ServletServerHttpRequest implements ServerHttpRequest { @@ -98,14 +111,14 @@ public class ServletServerHttpRequest implements ServerHttpRequest {
// HttpServletRequest exposes some headers as properties: we should include those if not already present
MediaType contentType = this.headers.getContentType();
if (contentType == null) {
String requestContentType = this.servletRequest.getContentType();
String requestContentType = getServletRequest().getContentType();
if (StringUtils.hasLength(requestContentType)) {
contentType = MediaType.parseMediaType(requestContentType);
this.headers.setContentType(contentType);
}
}
if (contentType != null && contentType.getCharSet() == null) {
String requestEncoding = this.servletRequest.getCharacterEncoding();
String requestEncoding = getServletRequest().getCharacterEncoding();
if (StringUtils.hasLength(requestEncoding)) {
Charset charSet = Charset.forName(requestEncoding);
Map<String, String> params = new LinkedCaseInsensitiveMap<>();
@ -116,7 +129,7 @@ public class ServletServerHttpRequest implements ServerHttpRequest { @@ -116,7 +129,7 @@ public class ServletServerHttpRequest implements ServerHttpRequest {
}
}
if (this.headers.getContentLength() == -1) {
int requestContentLength = this.servletRequest.getContentLength();
int requestContentLength = getServletRequest().getContentLength();
if (requestContentLength != -1) {
this.headers.setContentLength(requestContentLength);
}

26
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java

@ -36,6 +36,8 @@ import org.springframework.http.HttpStatus; @@ -36,6 +36,8 @@ import org.springframework.http.HttpStatus;
import org.springframework.util.Assert;
/**
* Adapt {@link ServerHttpResponse} to the Servlet {@link HttpServletResponse}.
*
* @author Rossen Stoyanchev
*/
public class ServletServerHttpResponse implements ServerHttpResponse {
@ -53,20 +55,18 @@ public class ServletServerHttpResponse implements ServerHttpResponse { @@ -53,20 +55,18 @@ public class ServletServerHttpResponse implements ServerHttpResponse {
public ServletServerHttpResponse(HttpServletResponse response, ServletAsyncContextSynchronizer synchronizer) {
Assert.notNull(response, "'response' must not be null");
this.response = response;
this.headers = initHttpHeaders();
this.headers = new ExtendedHttpHeaders(new ServletHeaderChangeListener());
this.subscriber = new ResponseBodySubscriber(synchronizer);
}
private HttpHeaders initHttpHeaders() {
ExtendedHttpHeaders headers = new ExtendedHttpHeaders();
headers.registerChangeListener(new ServletHeaderChangeListener());
return headers;
}
public HttpServletResponse getServletResponse() {
return this.response;
}
@Override
public void setStatusCode(HttpStatus status) {
this.response.setStatus(status.value());
getServletResponse().setStatus(status.value());
}
@Override
@ -80,8 +80,11 @@ public class ServletServerHttpResponse implements ServerHttpResponse { @@ -80,8 +80,11 @@ public class ServletServerHttpResponse implements ServerHttpResponse {
@Override
public Publisher<Void> setBody(final Publisher<ByteBuffer> publisher) {
return Publishers.lift(publisher, new WriteWithOperator<>(writePublisher ->
(s -> writePublisher.subscribe(subscriber))));
return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal));
}
protected Publisher<Void> setBodyInternal(Publisher<ByteBuffer> publisher) {
return s -> publisher.subscribe(subscriber);
}
@ -89,14 +92,14 @@ public class ServletServerHttpResponse implements ServerHttpResponse { @@ -89,14 +92,14 @@ public class ServletServerHttpResponse implements ServerHttpResponse {
@Override
public void headerAdded(String name, String value) {
response.addHeader(name, value);
getServletResponse().addHeader(name, value);
}
@Override
public void headerPut(String key, List<String> values) {
// We can only add but not remove
for (String value : values) {
response.addHeader(key, value);
getServletResponse().addHeader(key, value);
}
}
@ -106,6 +109,7 @@ public class ServletServerHttpResponse implements ServerHttpResponse { @@ -106,6 +109,7 @@ public class ServletServerHttpResponse implements ServerHttpResponse {
}
}
private static class ResponseBodySubscriber implements WriteListener, Subscriber<ByteBuffer> {
private final ServletAsyncContextSynchronizer synchronizer;

38
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java

@ -36,12 +36,13 @@ import reactor.core.support.BackpressureUtils; @@ -36,12 +36,13 @@ import reactor.core.support.BackpressureUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.util.Assert;
import static org.xnio.IoUtils.safeClose;
/**
* Adapt {@link ServerHttpRequest} to the Underow {@link HttpServerExchange}.
*
* @author Marek Hawrylczak
* @author Rossen Stoyanchev
*/
@ -49,10 +50,12 @@ public class UndertowServerHttpRequest implements ServerHttpRequest { @@ -49,10 +50,12 @@ public class UndertowServerHttpRequest implements ServerHttpRequest {
private final HttpServerExchange exchange;
private final Publisher<ByteBuffer> body = new RequestBodyPublisher();
private URI uri;
private HttpHeaders headers;
private final Publisher<ByteBuffer> body = new RequestBodyPublisher();
public UndertowServerHttpRequest(HttpServerExchange exchange) {
Assert.notNull(exchange, "'exchange' is required.");
@ -60,31 +63,38 @@ public class UndertowServerHttpRequest implements ServerHttpRequest { @@ -60,31 +63,38 @@ public class UndertowServerHttpRequest implements ServerHttpRequest {
}
public HttpServerExchange getUndertowExchange() {
return this.exchange;
}
@Override
public HttpMethod getMethod() {
return HttpMethod.valueOf(this.exchange.getRequestMethod().toString());
return HttpMethod.valueOf(this.getUndertowExchange().getRequestMethod().toString());
}
@Override
public URI getURI() {
try {
return new URI(this.exchange.getRequestScheme(), null, this.exchange.getHostName(),
this.exchange.getHostPort(), this.exchange.getRequestURI(),
this.exchange.getQueryString(), null);
}
catch (URISyntaxException ex) {
throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex);
if (this.uri == null) {
try {
return new URI(this.getUndertowExchange().getRequestScheme(), null,
this.getUndertowExchange().getHostName(),
this.getUndertowExchange().getHostPort(),
this.getUndertowExchange().getRequestURI(),
this.getUndertowExchange().getQueryString(), null);
}
catch (URISyntaxException ex) {
throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex);
}
}
return this.uri;
}
@Override
public HttpHeaders getHeaders() {
if (this.headers == null) {
this.headers = new HttpHeaders();
for (HeaderValues headerValues : this.exchange.getRequestHeaders()) {
for (String value : headerValues) {
this.headers.add(headerValues.getHeaderName().toString(), value);
}
for (HeaderValues values : this.getUndertowExchange().getRequestHeaders()) {
this.headers.put(values.getHeaderName().toString(), values);
}
}
return this.headers;

33
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java

@ -46,6 +46,8 @@ import static org.xnio.ChannelListeners.flushingChannelListener; @@ -46,6 +46,8 @@ import static org.xnio.ChannelListeners.flushingChannelListener;
import static org.xnio.IoUtils.safeClose;
/**
* Adapt {@link ServerHttpResponse} to the Undertow {@link HttpServerExchange}.
*
* @author Marek Hawrylczak
* @author Rossen Stoyanchev
*/
@ -56,28 +58,26 @@ public class UndertowServerHttpResponse implements ServerHttpResponse { @@ -56,28 +58,26 @@ public class UndertowServerHttpResponse implements ServerHttpResponse {
private final HttpServerExchange exchange;
private final ResponseBodySubscriber bodySubscriber = new ResponseBodySubscriber();
private final HttpHeaders headers;
private final ResponseBodySubscriber bodySubscriber = new ResponseBodySubscriber();
public UndertowServerHttpResponse(HttpServerExchange exchange) {
Assert.notNull(exchange, "'exchange' is required.");
this.exchange = exchange;
this.headers = initHttpHeaders();
this.headers = new ExtendedHttpHeaders(new UndertowHeaderChangeListener());
}
private HttpHeaders initHttpHeaders() {
ExtendedHttpHeaders headers = new ExtendedHttpHeaders();
headers.registerChangeListener(new UndertowHeaderChangeListener());
return headers;
}
public HttpServerExchange getUndertowExchange() {
return this.exchange;
}
@Override
public void setStatusCode(HttpStatus status) {
Assert.notNull(status);
this.exchange.setStatusCode(status.value());
getUndertowExchange().setStatusCode(status.value());
}
@Override
@ -87,8 +87,11 @@ public class UndertowServerHttpResponse implements ServerHttpResponse { @@ -87,8 +87,11 @@ public class UndertowServerHttpResponse implements ServerHttpResponse {
@Override
public Publisher<Void> setBody(Publisher<ByteBuffer> publisher) {
return Publishers.lift(publisher, new WriteWithOperator<>(writePublisher ->
(subscriber -> writePublisher.subscribe(bodySubscriber))));
return Publishers.lift(publisher, new WriteWithOperator<>(this::setBodyInternal));
}
protected Publisher<Void> setBodyInternal(Publisher<ByteBuffer> writePublisher) {
return subscriber -> writePublisher.subscribe(bodySubscriber);
}
@ -96,17 +99,19 @@ public class UndertowServerHttpResponse implements ServerHttpResponse { @@ -96,17 +99,19 @@ public class UndertowServerHttpResponse implements ServerHttpResponse {
@Override
public void headerAdded(String name, String value) {
exchange.getResponseHeaders().add(HttpString.tryFromString(name), value);
HttpString headerName = HttpString.tryFromString(name);
getUndertowExchange().getResponseHeaders().add(headerName, value);
}
@Override
public void headerPut(String key, List<String> values) {
exchange.getResponseHeaders().putAll(HttpString.tryFromString(key), values);
HttpString headerName = HttpString.tryFromString(key);
getUndertowExchange().getResponseHeaders().putAll(headerName, values);
}
@Override
public void headerRemoved(String key) {
exchange.getResponseHeaders().remove(key);
getUndertowExchange().getResponseHeaders().remove(key);
}
}

Loading…
Cancel
Save