58 changed files with 1204 additions and 1308 deletions
@ -0,0 +1,321 @@
@@ -0,0 +1,321 @@
|
||||
/* |
||||
* Copyright 2002-2015 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.server.reactive; |
||||
|
||||
import java.io.IOException; |
||||
import java.net.URI; |
||||
import java.net.URISyntaxException; |
||||
import java.nio.ByteBuffer; |
||||
import java.nio.charset.Charset; |
||||
import java.util.Arrays; |
||||
import java.util.Enumeration; |
||||
import java.util.Map; |
||||
import java.util.concurrent.atomic.AtomicLong; |
||||
import javax.servlet.ReadListener; |
||||
import javax.servlet.ServletInputStream; |
||||
import javax.servlet.http.HttpServletRequest; |
||||
|
||||
import org.apache.commons.logging.Log; |
||||
import org.apache.commons.logging.LogFactory; |
||||
import org.reactivestreams.Publisher; |
||||
import org.reactivestreams.Subscriber; |
||||
import org.reactivestreams.Subscription; |
||||
|
||||
import org.springframework.http.HttpHeaders; |
||||
import org.springframework.http.HttpMethod; |
||||
import org.springframework.http.MediaType; |
||||
import org.springframework.util.Assert; |
||||
import org.springframework.util.LinkedCaseInsensitiveMap; |
||||
import org.springframework.util.StringUtils; |
||||
|
||||
/** |
||||
* @author Rossen Stoyanchev |
||||
*/ |
||||
public class ServletServerHttpRequest implements ServerHttpRequest { |
||||
|
||||
private static final int BUFFER_SIZE = 8192; |
||||
|
||||
private static final Log logger = LogFactory.getLog(ServletServerHttpRequest.class); |
||||
|
||||
|
||||
private final HttpServletRequest servletRequest; |
||||
|
||||
private HttpHeaders headers; |
||||
|
||||
private final RequestBodyPublisher requestBodyPublisher; |
||||
|
||||
|
||||
public ServletServerHttpRequest(HttpServletRequest servletRequest, ServletAsyncContextSynchronizer synchronizer) { |
||||
Assert.notNull(servletRequest, "HttpServletRequest must not be null"); |
||||
this.servletRequest = servletRequest; |
||||
this.requestBodyPublisher = new RequestBodyPublisher(synchronizer, BUFFER_SIZE); |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public HttpMethod getMethod() { |
||||
return HttpMethod.valueOf(this.servletRequest.getMethod()); |
||||
} |
||||
|
||||
@Override |
||||
public URI getURI() { |
||||
try { |
||||
return new URI(this.servletRequest.getScheme(), null, this.servletRequest.getServerName(), |
||||
this.servletRequest.getServerPort(), this.servletRequest.getRequestURI(), |
||||
this.servletRequest.getQueryString(), null); |
||||
} |
||||
catch (URISyntaxException ex) { |
||||
throw new IllegalStateException("Could not get HttpServletRequest URI: " + ex.getMessage(), ex); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public HttpHeaders getHeaders() { |
||||
if (this.headers == null) { |
||||
this.headers = new HttpHeaders(); |
||||
for (Enumeration<?> names = this.servletRequest.getHeaderNames(); names.hasMoreElements(); ) { |
||||
String headerName = (String) names.nextElement(); |
||||
for (Enumeration<?> headerValues = this.servletRequest.getHeaders(headerName); |
||||
headerValues.hasMoreElements(); ) { |
||||
String headerValue = (String) headerValues.nextElement(); |
||||
this.headers.add(headerName, headerValue); |
||||
} |
||||
} |
||||
// HttpServletRequest exposes some headers as properties: we should include those if not already present
|
||||
MediaType contentType = this.headers.getContentType(); |
||||
if (contentType == null) { |
||||
String requestContentType = this.servletRequest.getContentType(); |
||||
if (StringUtils.hasLength(requestContentType)) { |
||||
contentType = MediaType.parseMediaType(requestContentType); |
||||
this.headers.setContentType(contentType); |
||||
} |
||||
} |
||||
if (contentType != null && contentType.getCharSet() == null) { |
||||
String requestEncoding = this.servletRequest.getCharacterEncoding(); |
||||
if (StringUtils.hasLength(requestEncoding)) { |
||||
Charset charSet = Charset.forName(requestEncoding); |
||||
Map<String, String> params = new LinkedCaseInsensitiveMap<>(); |
||||
params.putAll(contentType.getParameters()); |
||||
params.put("charset", charSet.toString()); |
||||
MediaType newContentType = new MediaType(contentType.getType(), contentType.getSubtype(), params); |
||||
this.headers.setContentType(newContentType); |
||||
} |
||||
} |
||||
if (this.headers.getContentLength() == -1) { |
||||
int requestContentLength = this.servletRequest.getContentLength(); |
||||
if (requestContentLength != -1) { |
||||
this.headers.setContentLength(requestContentLength); |
||||
} |
||||
} |
||||
} |
||||
return this.headers; |
||||
} |
||||
|
||||
@Override |
||||
public Publisher<ByteBuffer> getBody() { |
||||
return this.requestBodyPublisher; |
||||
} |
||||
|
||||
ReadListener getReadListener() { |
||||
return this.requestBodyPublisher; |
||||
} |
||||
|
||||
|
||||
private static class RequestBodyPublisher implements ReadListener, Publisher<ByteBuffer> { |
||||
|
||||
private final ServletAsyncContextSynchronizer synchronizer; |
||||
|
||||
private final byte[] buffer; |
||||
|
||||
private final DemandCounter demand = new DemandCounter(); |
||||
|
||||
private Subscriber<? super ByteBuffer> subscriber; |
||||
|
||||
private boolean stalled; |
||||
|
||||
private boolean cancelled; |
||||
|
||||
|
||||
public RequestBodyPublisher(ServletAsyncContextSynchronizer synchronizer, int bufferSize) { |
||||
this.synchronizer = synchronizer; |
||||
this.buffer = new byte[bufferSize]; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public void subscribe(Subscriber<? super ByteBuffer> subscriber) { |
||||
if (subscriber == null) { |
||||
throw new NullPointerException(); |
||||
} |
||||
else if (this.subscriber != null) { |
||||
subscriber.onError(new IllegalStateException("Only one subscriber allowed")); |
||||
} |
||||
this.subscriber = subscriber; |
||||
this.subscriber.onSubscribe(new RequestBodySubscription()); |
||||
} |
||||
|
||||
@Override |
||||
public void onDataAvailable() throws IOException { |
||||
if (cancelled) { |
||||
return; |
||||
} |
||||
ServletInputStream input = this.synchronizer.getInputStream(); |
||||
logger.debug("onDataAvailable: " + input); |
||||
|
||||
while (true) { |
||||
logger.debug("Demand: " + this.demand); |
||||
|
||||
if (!demand.hasDemand()) { |
||||
stalled = true; |
||||
break; |
||||
} |
||||
|
||||
boolean ready = input.isReady(); |
||||
logger.debug("Input ready: " + ready + " finished: " + input.isFinished()); |
||||
|
||||
if (!ready) { |
||||
break; |
||||
} |
||||
|
||||
int read = input.read(buffer); |
||||
logger.debug("Input read:" + read); |
||||
|
||||
if (read == -1) { |
||||
break; |
||||
} |
||||
else if (read > 0) { |
||||
this.demand.decrement(); |
||||
byte[] copy = Arrays.copyOf(this.buffer, read); |
||||
|
||||
// logger.debug("Next: " + new String(copy, UTF_8));
|
||||
|
||||
this.subscriber.onNext(ByteBuffer.wrap(copy)); |
||||
|
||||
} |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void onAllDataRead() throws IOException { |
||||
if (cancelled) { |
||||
return; |
||||
} |
||||
logger.debug("All data read"); |
||||
this.synchronizer.readComplete(); |
||||
if (this.subscriber != null) { |
||||
this.subscriber.onComplete(); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void onError(Throwable t) { |
||||
if (cancelled) { |
||||
return; |
||||
} |
||||
logger.error("RequestBodyPublisher Error", t); |
||||
this.synchronizer.readComplete(); |
||||
if (this.subscriber != null) { |
||||
this.subscriber.onError(t); |
||||
} |
||||
} |
||||
|
||||
private class RequestBodySubscription implements Subscription { |
||||
|
||||
@Override |
||||
public void request(long n) { |
||||
if (cancelled) { |
||||
return; |
||||
} |
||||
logger.debug("Updating demand " + demand + " by " + n); |
||||
|
||||
demand.increase(n); |
||||
|
||||
logger.debug("Stalled: " + stalled); |
||||
|
||||
if (stalled) { |
||||
stalled = false; |
||||
try { |
||||
onDataAvailable(); |
||||
} |
||||
catch (IOException ex) { |
||||
onError(ex); |
||||
} |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void cancel() { |
||||
if (cancelled) { |
||||
return; |
||||
} |
||||
cancelled = true; |
||||
synchronizer.readComplete(); |
||||
demand.reset(); |
||||
} |
||||
} |
||||
|
||||
|
||||
/** |
||||
* Small utility class for keeping track of Reactive Streams demand. |
||||
*/ |
||||
private static final class DemandCounter { |
||||
|
||||
private final AtomicLong demand = new AtomicLong(); |
||||
|
||||
/** |
||||
* Increases the demand by the given number |
||||
* @param n the positive number to increase demand by |
||||
* @return the increased demand |
||||
* @see org.reactivestreams.Subscription#request(long) |
||||
*/ |
||||
public long increase(long n) { |
||||
Assert.isTrue(n > 0, "'n' must be higher than 0"); |
||||
return demand.updateAndGet(d -> d != Long.MAX_VALUE ? d + n : Long.MAX_VALUE); |
||||
} |
||||
|
||||
/** |
||||
* Decreases the demand by one. |
||||
* @return the decremented demand |
||||
*/ |
||||
public long decrement() { |
||||
return demand.updateAndGet(d -> d != Long.MAX_VALUE ? d - 1 : Long.MAX_VALUE); |
||||
} |
||||
|
||||
/** |
||||
* Indicates whether this counter has demand, i.e. whether it is higher than 0. |
||||
* @return {@code true} if this counter has demand; {@code false} otherwise |
||||
*/ |
||||
public boolean hasDemand() { |
||||
return this.demand.get() > 0; |
||||
} |
||||
|
||||
/** |
||||
* Resets this counter to 0. |
||||
* @see org.reactivestreams.Subscription#cancel() |
||||
*/ |
||||
public void reset() { |
||||
this.demand.set(0); |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return demand.toString(); |
||||
} |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,298 @@
@@ -0,0 +1,298 @@
|
||||
/* |
||||
* Copyright 2002-2015 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.server.reactive; |
||||
|
||||
import java.io.IOException; |
||||
import java.net.URI; |
||||
import java.net.URISyntaxException; |
||||
import java.nio.ByteBuffer; |
||||
import java.util.concurrent.atomic.AtomicLongFieldUpdater; |
||||
|
||||
import io.undertow.connector.PooledByteBuffer; |
||||
import io.undertow.server.HttpServerExchange; |
||||
import io.undertow.util.HeaderValues; |
||||
import io.undertow.util.SameThreadExecutor; |
||||
import org.reactivestreams.Publisher; |
||||
import org.reactivestreams.Subscriber; |
||||
import org.reactivestreams.Subscription; |
||||
import org.xnio.ChannelListener; |
||||
import org.xnio.channels.StreamSourceChannel; |
||||
import reactor.core.error.SpecificationExceptions; |
||||
import reactor.core.support.BackpressureUtils; |
||||
|
||||
import org.springframework.http.HttpHeaders; |
||||
import org.springframework.http.HttpMethod; |
||||
import org.springframework.http.server.reactive.ServerHttpRequest; |
||||
import org.springframework.util.Assert; |
||||
|
||||
import static org.xnio.IoUtils.safeClose; |
||||
|
||||
/** |
||||
* @author Marek Hawrylczak |
||||
* @author Rossen Stoyanchev |
||||
*/ |
||||
public class UndertowServerHttpRequest implements ServerHttpRequest { |
||||
|
||||
private final HttpServerExchange exchange; |
||||
|
||||
private final Publisher<ByteBuffer> body = new RequestBodyPublisher(); |
||||
|
||||
private HttpHeaders headers; |
||||
|
||||
|
||||
public UndertowServerHttpRequest(HttpServerExchange exchange) { |
||||
Assert.notNull(exchange, "'exchange' is required."); |
||||
this.exchange = exchange; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public HttpMethod getMethod() { |
||||
return HttpMethod.valueOf(this.exchange.getRequestMethod().toString()); |
||||
} |
||||
|
||||
@Override |
||||
public URI getURI() { |
||||
try { |
||||
return new URI(this.exchange.getRequestScheme(), null, this.exchange.getHostName(), |
||||
this.exchange.getHostPort(), this.exchange.getRequestURI(), |
||||
this.exchange.getQueryString(), null); |
||||
} |
||||
catch (URISyntaxException ex) { |
||||
throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public HttpHeaders getHeaders() { |
||||
if (this.headers == null) { |
||||
this.headers = new HttpHeaders(); |
||||
for (HeaderValues headerValues : this.exchange.getRequestHeaders()) { |
||||
for (String value : headerValues) { |
||||
this.headers.add(headerValues.getHeaderName().toString(), value); |
||||
} |
||||
} |
||||
} |
||||
return this.headers; |
||||
} |
||||
|
||||
@Override |
||||
public Publisher<ByteBuffer> getBody() { |
||||
return this.body; |
||||
} |
||||
|
||||
|
||||
private static final AtomicLongFieldUpdater<RequestBodyPublisher.RequestBodySubscription> DEMAND = |
||||
AtomicLongFieldUpdater.newUpdater(RequestBodyPublisher.RequestBodySubscription.class, "demand"); |
||||
|
||||
private class RequestBodyPublisher implements Publisher<ByteBuffer> { |
||||
|
||||
private Subscriber<? super ByteBuffer> subscriber; |
||||
|
||||
|
||||
@Override |
||||
public void subscribe(Subscriber<? super ByteBuffer> subscriber) { |
||||
if (subscriber == null) { |
||||
throw SpecificationExceptions.spec_2_13_exception(); |
||||
} |
||||
if (this.subscriber != null) { |
||||
subscriber.onError(new IllegalStateException("Only one subscriber allowed")); |
||||
} |
||||
|
||||
this.subscriber = subscriber; |
||||
this.subscriber.onSubscribe(new RequestBodySubscription()); |
||||
} |
||||
|
||||
|
||||
private class RequestBodySubscription implements Subscription, Runnable, |
||||
ChannelListener<StreamSourceChannel> { |
||||
|
||||
volatile long demand; |
||||
|
||||
private PooledByteBuffer pooledBuffer; |
||||
|
||||
private StreamSourceChannel channel; |
||||
|
||||
private boolean subscriptionClosed; |
||||
|
||||
private boolean draining; |
||||
|
||||
|
||||
@Override |
||||
public void request(long n) { |
||||
BackpressureUtils.checkRequest(n, subscriber); |
||||
if (this.subscriptionClosed) { |
||||
return; |
||||
} |
||||
BackpressureUtils.getAndAdd(DEMAND, this, n); |
||||
scheduleNextMessage(); |
||||
} |
||||
|
||||
private void scheduleNextMessage() { |
||||
exchange.dispatch(exchange.isInIoThread() ? SameThreadExecutor.INSTANCE : |
||||
exchange.getIoThread(), this); |
||||
} |
||||
|
||||
@Override |
||||
public void cancel() { |
||||
this.subscriptionClosed = true; |
||||
close(); |
||||
} |
||||
|
||||
private void close() { |
||||
if (this.pooledBuffer != null) { |
||||
safeClose(this.pooledBuffer); |
||||
this.pooledBuffer = null; |
||||
} |
||||
if (this.channel != null) { |
||||
safeClose(this.channel); |
||||
this.channel = null; |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void run() { |
||||
if (this.subscriptionClosed || this.draining) { |
||||
return; |
||||
} |
||||
if (0 == BackpressureUtils.getAndSub(DEMAND, this, 1)) { |
||||
return; |
||||
} |
||||
|
||||
this.draining = true; |
||||
|
||||
if (this.channel == null) { |
||||
this.channel = exchange.getRequestChannel(); |
||||
|
||||
if (this.channel == null) { |
||||
if (exchange.isRequestComplete()) { |
||||
return; |
||||
} |
||||
else { |
||||
throw new IllegalStateException("Failed to acquire channel!"); |
||||
} |
||||
} |
||||
} |
||||
if (this.pooledBuffer == null) { |
||||
this.pooledBuffer = exchange.getConnection().getByteBufferPool().allocate(); |
||||
} |
||||
else { |
||||
this.pooledBuffer.getBuffer().clear(); |
||||
} |
||||
|
||||
try { |
||||
ByteBuffer buffer = this.pooledBuffer.getBuffer(); |
||||
int count; |
||||
do { |
||||
count = this.channel.read(buffer); |
||||
if (count == 0) { |
||||
this.channel.getReadSetter().set(this); |
||||
this.channel.resumeReads(); |
||||
} |
||||
else if (count == -1) { |
||||
if (buffer.position() > 0) { |
||||
doOnNext(buffer); |
||||
} |
||||
doOnComplete(); |
||||
} |
||||
else { |
||||
if (buffer.remaining() == 0) { |
||||
if (this.demand == 0) { |
||||
this.channel.suspendReads(); |
||||
} |
||||
doOnNext(buffer); |
||||
if (this.demand > 0) { |
||||
scheduleNextMessage(); |
||||
} |
||||
break; |
||||
} |
||||
} |
||||
} while (count > 0); |
||||
} |
||||
catch (IOException e) { |
||||
doOnError(e); |
||||
} |
||||
} |
||||
|
||||
private void doOnNext(ByteBuffer buffer) { |
||||
this.draining = false; |
||||
buffer.flip(); |
||||
subscriber.onNext(buffer); |
||||
} |
||||
|
||||
private void doOnComplete() { |
||||
this.subscriptionClosed = true; |
||||
try { |
||||
subscriber.onComplete(); |
||||
} |
||||
finally { |
||||
close(); |
||||
} |
||||
} |
||||
|
||||
private void doOnError(Throwable t) { |
||||
this.subscriptionClosed = true; |
||||
try { |
||||
subscriber.onError(t); |
||||
} |
||||
finally { |
||||
close(); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void handleEvent(StreamSourceChannel channel) { |
||||
if (this.subscriptionClosed) { |
||||
return; |
||||
} |
||||
|
||||
try { |
||||
ByteBuffer buffer = this.pooledBuffer.getBuffer(); |
||||
int count; |
||||
do { |
||||
count = channel.read(buffer); |
||||
if (count == 0) { |
||||
return; |
||||
} |
||||
else if (count == -1) { |
||||
if (buffer.position() > 0) { |
||||
doOnNext(buffer); |
||||
} |
||||
doOnComplete(); |
||||
} |
||||
else { |
||||
if (buffer.remaining() == 0) { |
||||
if (this.demand == 0) { |
||||
channel.suspendReads(); |
||||
} |
||||
doOnNext(buffer); |
||||
if (this.demand > 0) { |
||||
scheduleNextMessage(); |
||||
} |
||||
break; |
||||
} |
||||
} |
||||
} while (count > 0); |
||||
} |
||||
catch (IOException e) { |
||||
doOnError(e); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -0,0 +1,267 @@
@@ -0,0 +1,267 @@
|
||||
/* |
||||
* Copyright 2002-2015 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.server.reactive; |
||||
|
||||
import java.io.IOException; |
||||
import java.nio.ByteBuffer; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.Queue; |
||||
import java.util.concurrent.ConcurrentLinkedQueue; |
||||
import java.util.concurrent.atomic.AtomicBoolean; |
||||
import java.util.concurrent.atomic.AtomicInteger; |
||||
|
||||
import org.springframework.http.HttpHeaders; |
||||
import org.springframework.http.HttpStatus; |
||||
import org.springframework.util.Assert; |
||||
|
||||
import io.undertow.connector.PooledByteBuffer; |
||||
import io.undertow.server.HttpServerExchange; |
||||
import io.undertow.util.HttpString; |
||||
import org.apache.commons.logging.Log; |
||||
import org.apache.commons.logging.LogFactory; |
||||
import org.reactivestreams.Publisher; |
||||
import org.reactivestreams.Subscription; |
||||
import org.xnio.ChannelListener; |
||||
import org.xnio.channels.StreamSinkChannel; |
||||
import reactor.core.subscriber.BaseSubscriber; |
||||
|
||||
import static org.xnio.ChannelListeners.closingChannelExceptionHandler; |
||||
import static org.xnio.ChannelListeners.flushingChannelListener; |
||||
import static org.xnio.IoUtils.safeClose; |
||||
|
||||
/** |
||||
* @author Marek Hawrylczak |
||||
* @author Rossen Stoyanchev |
||||
*/ |
||||
public class UndertowServerHttpResponse implements ServerHttpResponse { |
||||
|
||||
private static final Log logger = LogFactory.getLog(UndertowServerHttpResponse.class); |
||||
|
||||
|
||||
private final HttpServerExchange exchange; |
||||
|
||||
private final ResponseBodySubscriber bodySubscriber = new ResponseBodySubscriber(); |
||||
|
||||
private final HttpHeaders headers = new HttpHeaders(); |
||||
|
||||
private boolean headersWritten = false; |
||||
|
||||
|
||||
public UndertowServerHttpResponse(HttpServerExchange exchange) { |
||||
Assert.notNull(exchange, "'exchange' is required."); |
||||
this.exchange = exchange; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public void setStatusCode(HttpStatus status) { |
||||
Assert.notNull(status); |
||||
this.exchange.setStatusCode(status.value()); |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public Publisher<Void> setBody(Publisher<ByteBuffer> bodyPublisher) { |
||||
applyHeaders(); |
||||
return (subscriber -> bodyPublisher.subscribe(bodySubscriber)); |
||||
} |
||||
|
||||
@Override |
||||
public HttpHeaders getHeaders() { |
||||
return (this.headersWritten ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers); |
||||
} |
||||
|
||||
@Override |
||||
public Publisher<Void> writeHeaders() { |
||||
applyHeaders(); |
||||
return s -> s.onSubscribe(new Subscription() { |
||||
@Override |
||||
public void request(long n) { |
||||
s.onComplete(); |
||||
} |
||||
|
||||
@Override |
||||
public void cancel() { |
||||
} |
||||
}); |
||||
} |
||||
|
||||
private void applyHeaders() { |
||||
if (!this.headersWritten) { |
||||
for (Map.Entry<String, List<String>> entry : this.headers.entrySet()) { |
||||
HttpString headerName = HttpString.tryFromString(entry.getKey()); |
||||
this.exchange.getResponseHeaders().addAll(headerName, entry.getValue()); |
||||
|
||||
} |
||||
this.headersWritten = true; |
||||
} |
||||
} |
||||
|
||||
|
||||
private class ResponseBodySubscriber extends BaseSubscriber<ByteBuffer> |
||||
implements ChannelListener<StreamSinkChannel> { |
||||
|
||||
private Subscription subscription; |
||||
|
||||
private final Queue<PooledByteBuffer> buffers = new ConcurrentLinkedQueue<>(); |
||||
|
||||
private final AtomicInteger writing = new AtomicInteger(); |
||||
|
||||
private final AtomicBoolean closing = new AtomicBoolean(); |
||||
|
||||
private StreamSinkChannel responseChannel; |
||||
|
||||
|
||||
@Override |
||||
public void onSubscribe(Subscription subscription) { |
||||
super.onSubscribe(subscription); |
||||
this.subscription = subscription; |
||||
this.subscription.request(1); |
||||
} |
||||
|
||||
@Override |
||||
public void onNext(ByteBuffer buffer) { |
||||
super.onNext(buffer); |
||||
|
||||
if (this.responseChannel == null) { |
||||
this.responseChannel = exchange.getResponseChannel(); |
||||
} |
||||
|
||||
this.writing.incrementAndGet(); |
||||
try { |
||||
int c; |
||||
do { |
||||
c = this.responseChannel.write(buffer); |
||||
} while (buffer.hasRemaining() && c > 0); |
||||
|
||||
if (buffer.hasRemaining()) { |
||||
this.writing.incrementAndGet(); |
||||
enqueue(buffer); |
||||
this.responseChannel.getWriteSetter().set(this); |
||||
this.responseChannel.resumeWrites(); |
||||
} |
||||
else { |
||||
this.subscription.request(1); |
||||
} |
||||
|
||||
} |
||||
catch (IOException ex) { |
||||
onError(ex); |
||||
} |
||||
finally { |
||||
this.writing.decrementAndGet(); |
||||
if (this.closing.get()) { |
||||
closeIfDone(); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private void enqueue(ByteBuffer src) { |
||||
do { |
||||
PooledByteBuffer buffer = exchange.getConnection().getByteBufferPool().allocate(); |
||||
ByteBuffer dst = buffer.getBuffer(); |
||||
copy(dst, src); |
||||
dst.flip(); |
||||
this.buffers.add(buffer); |
||||
} while (src.remaining() > 0); |
||||
} |
||||
|
||||
private void copy(ByteBuffer dst, ByteBuffer src) { |
||||
int n = Math.min(dst.capacity(), src.remaining()); |
||||
for (int i = 0; i < n; i++) { |
||||
dst.put(src.get()); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void handleEvent(StreamSinkChannel channel) { |
||||
try { |
||||
int c; |
||||
do { |
||||
ByteBuffer buffer = this.buffers.peek().getBuffer(); |
||||
do { |
||||
c = channel.write(buffer); |
||||
} while (buffer.hasRemaining() && c > 0); |
||||
|
||||
if (!buffer.hasRemaining()) { |
||||
safeClose(this.buffers.remove()); |
||||
} |
||||
} while (!this.buffers.isEmpty() && c > 0); |
||||
|
||||
if (!this.buffers.isEmpty()) { |
||||
channel.resumeWrites(); |
||||
} |
||||
else { |
||||
this.writing.decrementAndGet(); |
||||
|
||||
if (this.closing.get()) { |
||||
closeIfDone(); |
||||
} |
||||
else { |
||||
this.subscription.request(1); |
||||
} |
||||
} |
||||
} |
||||
catch (IOException ex) { |
||||
onError(ex); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void onError(Throwable ex) { |
||||
super.onError(ex); |
||||
logger.error("ResponseBodySubscriber error", ex); |
||||
if (!exchange.isResponseStarted() && exchange.getStatusCode() < 500) { |
||||
exchange.setStatusCode(500); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void onComplete() { |
||||
super.onComplete(); |
||||
if (this.responseChannel != null) { |
||||
this.closing.set(true); |
||||
closeIfDone(); |
||||
} |
||||
} |
||||
|
||||
private void closeIfDone() { |
||||
if (this.writing.get() == 0) { |
||||
if (this.closing.compareAndSet(true, false)) { |
||||
closeChannel(); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private void closeChannel() { |
||||
try { |
||||
this.responseChannel.shutdownWrites(); |
||||
|
||||
if (!this.responseChannel.flush()) { |
||||
this.responseChannel.getWriteSetter().set(flushingChannelListener( |
||||
o -> safeClose(this.responseChannel), closingChannelExceptionHandler())); |
||||
this.responseChannel.resumeWrites(); |
||||
} |
||||
this.responseChannel = null; |
||||
} |
||||
catch (IOException ex) { |
||||
onError(ex); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
@ -1,218 +0,0 @@
@@ -1,218 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2015 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.server.servlet31; |
||||
|
||||
import java.io.IOException; |
||||
import java.nio.ByteBuffer; |
||||
import java.util.Arrays; |
||||
import java.util.concurrent.atomic.AtomicLong; |
||||
import javax.servlet.ReadListener; |
||||
import javax.servlet.ServletInputStream; |
||||
|
||||
import org.apache.commons.logging.Log; |
||||
import org.apache.commons.logging.LogFactory; |
||||
import org.reactivestreams.Publisher; |
||||
import org.reactivestreams.Subscriber; |
||||
import org.reactivestreams.Subscription; |
||||
|
||||
import org.springframework.util.Assert; |
||||
|
||||
/** |
||||
* @author Arjen Poutsma |
||||
*/ |
||||
public class RequestBodyPublisher implements ReadListener, Publisher<ByteBuffer> { |
||||
|
||||
private static final Log logger = LogFactory.getLog(RequestBodyPublisher.class); |
||||
|
||||
private final AsyncContextSynchronizer synchronizer; |
||||
|
||||
private final byte[] buffer; |
||||
|
||||
private final DemandCounter demand = new DemandCounter(); |
||||
|
||||
private Subscriber<? super ByteBuffer> subscriber; |
||||
|
||||
private boolean stalled; |
||||
|
||||
private boolean cancelled; |
||||
|
||||
public RequestBodyPublisher(AsyncContextSynchronizer synchronizer, int bufferSize) { |
||||
this.synchronizer = synchronizer; |
||||
this.buffer = new byte[bufferSize]; |
||||
} |
||||
|
||||
@Override |
||||
public void subscribe(Subscriber<? super ByteBuffer> subscriber) { |
||||
if (subscriber == null) { |
||||
throw new NullPointerException(); |
||||
} |
||||
else if (this.subscriber != null) { |
||||
subscriber.onError(new IllegalStateException("Only one subscriber allowed")); |
||||
} |
||||
this.subscriber = subscriber; |
||||
this.subscriber.onSubscribe(new RequestBodySubscription()); |
||||
} |
||||
|
||||
@Override |
||||
public void onDataAvailable() throws IOException { |
||||
if (cancelled) { |
||||
return; |
||||
} |
||||
ServletInputStream input = this.synchronizer.getInputStream(); |
||||
logger.debug("onDataAvailable: " + input); |
||||
|
||||
while (true) { |
||||
logger.debug("Demand: " + this.demand); |
||||
|
||||
if (!demand.hasDemand()) { |
||||
stalled = true; |
||||
break; |
||||
} |
||||
|
||||
boolean ready = input.isReady(); |
||||
logger.debug("Input ready: " + ready + " finished: " + input.isFinished()); |
||||
|
||||
if (!ready) { |
||||
break; |
||||
} |
||||
|
||||
int read = input.read(buffer); |
||||
logger.debug("Input read:" + read); |
||||
|
||||
if (read == -1) { |
||||
break; |
||||
} |
||||
else if (read > 0) { |
||||
this.demand.decrement(); |
||||
byte[] copy = Arrays.copyOf(this.buffer, read); |
||||
|
||||
// logger.debug("Next: " + new String(copy, UTF_8));
|
||||
|
||||
this.subscriber.onNext(ByteBuffer.wrap(copy)); |
||||
|
||||
} |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void onAllDataRead() throws IOException { |
||||
if (cancelled) { |
||||
return; |
||||
} |
||||
logger.debug("All data read"); |
||||
this.synchronizer.readComplete(); |
||||
if (this.subscriber != null) { |
||||
this.subscriber.onComplete(); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void onError(Throwable t) { |
||||
if (cancelled) { |
||||
return; |
||||
} |
||||
logger.error("RequestBodyPublisher Error", t); |
||||
this.synchronizer.readComplete(); |
||||
if (this.subscriber != null) { |
||||
this.subscriber.onError(t); |
||||
} |
||||
} |
||||
|
||||
private class RequestBodySubscription implements Subscription { |
||||
|
||||
@Override |
||||
public void request(long n) { |
||||
if (cancelled) { |
||||
return; |
||||
} |
||||
logger.debug("Updating demand " + demand + " by " + n); |
||||
|
||||
demand.increase(n); |
||||
|
||||
logger.debug("Stalled: " + stalled); |
||||
|
||||
if (stalled) { |
||||
stalled = false; |
||||
try { |
||||
onDataAvailable(); |
||||
} |
||||
catch (IOException ex) { |
||||
onError(ex); |
||||
} |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void cancel() { |
||||
if (cancelled) { |
||||
return; |
||||
} |
||||
cancelled = true; |
||||
synchronizer.readComplete(); |
||||
demand.reset(); |
||||
} |
||||
} |
||||
|
||||
|
||||
/** |
||||
* Small utility class for keeping track of Reactive Streams demand. |
||||
*/ |
||||
private static final class DemandCounter { |
||||
|
||||
private final AtomicLong demand = new AtomicLong(); |
||||
|
||||
/** |
||||
* Increases the demand by the given number |
||||
* @param n the positive number to increase demand by |
||||
* @return the increased demand |
||||
* @see org.reactivestreams.Subscription#request(long) |
||||
*/ |
||||
public long increase(long n) { |
||||
Assert.isTrue(n > 0, "'n' must be higher than 0"); |
||||
return demand.updateAndGet(d -> d != Long.MAX_VALUE ? d + n : Long.MAX_VALUE); |
||||
} |
||||
|
||||
/** |
||||
* Decreases the demand by one. |
||||
* @return the decremented demand |
||||
*/ |
||||
public long decrement() { |
||||
return demand.updateAndGet(d -> d != Long.MAX_VALUE ? d - 1 : Long.MAX_VALUE); |
||||
} |
||||
|
||||
/** |
||||
* Indicates whether this counter has demand, i.e. whether it is higher than 0. |
||||
* @return {@code true} if this counter has demand; {@code false} otherwise |
||||
*/ |
||||
public boolean hasDemand() { |
||||
return this.demand.get() > 0; |
||||
} |
||||
|
||||
/** |
||||
* Resets this counter to 0. |
||||
* @see org.reactivestreams.Subscription#cancel() |
||||
*/ |
||||
public void reset() { |
||||
this.demand.set(0); |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
return demand.toString(); |
||||
} |
||||
} |
||||
} |
||||
@ -1,112 +0,0 @@
@@ -1,112 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2015 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.server.servlet31; |
||||
|
||||
import java.io.IOException; |
||||
import java.nio.ByteBuffer; |
||||
import javax.servlet.ServletOutputStream; |
||||
import javax.servlet.WriteListener; |
||||
|
||||
import org.apache.commons.logging.Log; |
||||
import org.apache.commons.logging.LogFactory; |
||||
import org.reactivestreams.Subscriber; |
||||
import org.reactivestreams.Subscription; |
||||
|
||||
import org.springframework.util.Assert; |
||||
|
||||
/** |
||||
* @author Arjen Poutsma |
||||
*/ |
||||
public class ResponseBodySubscriber implements WriteListener, Subscriber<ByteBuffer> { |
||||
|
||||
private static final Log logger = LogFactory.getLog(ResponseBodySubscriber.class); |
||||
|
||||
private final AsyncContextSynchronizer synchronizer; |
||||
|
||||
private Subscription subscription; |
||||
|
||||
private ByteBuffer buffer; |
||||
|
||||
private volatile boolean subscriberComplete = false; |
||||
|
||||
public ResponseBodySubscriber(AsyncContextSynchronizer synchronizer) { |
||||
this.synchronizer = synchronizer; |
||||
} |
||||
|
||||
@Override |
||||
public void onSubscribe(Subscription subscription) { |
||||
this.subscription = subscription; |
||||
this.subscription.request(1); |
||||
} |
||||
|
||||
@Override |
||||
public void onNext(ByteBuffer bytes) { |
||||
|
||||
Assert.isNull(buffer); |
||||
|
||||
this.buffer = bytes; |
||||
try { |
||||
onWritePossible(); |
||||
} |
||||
catch (IOException e) { |
||||
onError(e); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void onComplete() { |
||||
logger.debug("Complete buffer: " + (buffer == null)); |
||||
|
||||
this.subscriberComplete = true; |
||||
|
||||
if (buffer == null) { |
||||
this.synchronizer.writeComplete(); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void onWritePossible() throws IOException { |
||||
ServletOutputStream output = this.synchronizer.getOutputStream(); |
||||
|
||||
boolean ready = output.isReady(); |
||||
logger.debug("Output: " + ready + " buffer: " + (buffer == null)); |
||||
|
||||
if (ready) { |
||||
if (this.buffer != null) { |
||||
byte[] bytes = new byte[this.buffer.remaining()]; |
||||
this.buffer.get(bytes); |
||||
this.buffer = null; |
||||
output.write(bytes); |
||||
if (!subscriberComplete) { |
||||
this.subscription.request(1); |
||||
} |
||||
else { |
||||
this.synchronizer.writeComplete(); |
||||
} |
||||
} |
||||
else { |
||||
this.subscription.request(1); |
||||
} |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void onError(Throwable t) { |
||||
logger.error("ResponseBodySubscriber error", t); |
||||
} |
||||
|
||||
} |
||||
@ -1,122 +0,0 @@
@@ -1,122 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2015 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.server.servlet31; |
||||
|
||||
import java.net.URI; |
||||
import java.net.URISyntaxException; |
||||
import java.nio.ByteBuffer; |
||||
import java.nio.charset.Charset; |
||||
import java.util.Enumeration; |
||||
import java.util.Map; |
||||
import javax.servlet.http.HttpServletRequest; |
||||
|
||||
import org.reactivestreams.Publisher; |
||||
|
||||
import org.springframework.http.HttpHeaders; |
||||
import org.springframework.http.HttpMethod; |
||||
import org.springframework.http.MediaType; |
||||
import org.springframework.http.server.ReactiveServerHttpRequest; |
||||
import org.springframework.util.Assert; |
||||
import org.springframework.util.LinkedCaseInsensitiveMap; |
||||
import org.springframework.util.StringUtils; |
||||
|
||||
/** |
||||
* @author Rossen Stoyanchev |
||||
*/ |
||||
public class Servlet31ServerHttpRequest implements ReactiveServerHttpRequest { |
||||
|
||||
private final HttpServletRequest servletRequest; |
||||
|
||||
private final Publisher<ByteBuffer> requestBodyPublisher; |
||||
|
||||
private HttpHeaders headers; |
||||
|
||||
|
||||
public Servlet31ServerHttpRequest(HttpServletRequest servletRequest, |
||||
Publisher<ByteBuffer> requestBodyPublisher) { |
||||
|
||||
Assert.notNull(servletRequest, "HttpServletRequest must not be null"); |
||||
this.servletRequest = servletRequest; |
||||
this.requestBodyPublisher = requestBodyPublisher; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public HttpMethod getMethod() { |
||||
return HttpMethod.valueOf(this.servletRequest.getMethod()); |
||||
} |
||||
|
||||
@Override |
||||
public URI getURI() { |
||||
try { |
||||
return new URI(this.servletRequest.getScheme(), null, this.servletRequest.getServerName(), |
||||
this.servletRequest.getServerPort(), this.servletRequest.getRequestURI(), |
||||
this.servletRequest.getQueryString(), null); |
||||
} |
||||
catch (URISyntaxException ex) { |
||||
throw new IllegalStateException("Could not get HttpServletRequest URI: " + ex.getMessage(), ex); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public HttpHeaders getHeaders() { |
||||
if (this.headers == null) { |
||||
this.headers = new HttpHeaders(); |
||||
for (Enumeration<?> names = this.servletRequest.getHeaderNames(); names.hasMoreElements(); ) { |
||||
String headerName = (String) names.nextElement(); |
||||
for (Enumeration<?> headerValues = this.servletRequest.getHeaders(headerName); |
||||
headerValues.hasMoreElements(); ) { |
||||
String headerValue = (String) headerValues.nextElement(); |
||||
this.headers.add(headerName, headerValue); |
||||
} |
||||
} |
||||
// HttpServletRequest exposes some headers as properties: we should include those if not already present
|
||||
MediaType contentType = this.headers.getContentType(); |
||||
if (contentType == null) { |
||||
String requestContentType = this.servletRequest.getContentType(); |
||||
if (StringUtils.hasLength(requestContentType)) { |
||||
contentType = MediaType.parseMediaType(requestContentType); |
||||
this.headers.setContentType(contentType); |
||||
} |
||||
} |
||||
if (contentType != null && contentType.getCharSet() == null) { |
||||
String requestEncoding = this.servletRequest.getCharacterEncoding(); |
||||
if (StringUtils.hasLength(requestEncoding)) { |
||||
Charset charSet = Charset.forName(requestEncoding); |
||||
Map<String, String> params = new LinkedCaseInsensitiveMap<>(); |
||||
params.putAll(contentType.getParameters()); |
||||
params.put("charset", charSet.toString()); |
||||
MediaType newContentType = new MediaType(contentType.getType(), contentType.getSubtype(), params); |
||||
this.headers.setContentType(newContentType); |
||||
} |
||||
} |
||||
if (this.headers.getContentLength() == -1) { |
||||
int requestContentLength = this.servletRequest.getContentLength(); |
||||
if (requestContentLength != -1) { |
||||
this.headers.setContentLength(requestContentLength); |
||||
} |
||||
} |
||||
} |
||||
return this.headers; |
||||
} |
||||
|
||||
@Override |
||||
public Publisher<ByteBuffer> getBody() { |
||||
return this.requestBodyPublisher; |
||||
} |
||||
|
||||
} |
||||
@ -1,248 +0,0 @@
@@ -1,248 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2015 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.server.undertow; |
||||
|
||||
import static org.xnio.IoUtils.safeClose; |
||||
|
||||
import java.io.IOException; |
||||
import java.nio.ByteBuffer; |
||||
import java.util.concurrent.atomic.AtomicLongFieldUpdater; |
||||
|
||||
import org.springframework.util.Assert; |
||||
|
||||
import io.undertow.connector.PooledByteBuffer; |
||||
import io.undertow.server.HttpServerExchange; |
||||
import io.undertow.util.SameThreadExecutor; |
||||
import org.reactivestreams.Publisher; |
||||
import org.reactivestreams.Subscriber; |
||||
import org.reactivestreams.Subscription; |
||||
import org.xnio.ChannelListener; |
||||
import org.xnio.channels.StreamSourceChannel; |
||||
import reactor.core.error.SpecificationExceptions; |
||||
import reactor.core.support.BackpressureUtils; |
||||
|
||||
/** |
||||
* @author Marek Hawrylczak |
||||
*/ |
||||
class RequestBodyPublisher implements Publisher<ByteBuffer> { |
||||
|
||||
private static final AtomicLongFieldUpdater<RequestBodySubscription> DEMAND = |
||||
AtomicLongFieldUpdater.newUpdater(RequestBodySubscription.class, "demand"); |
||||
|
||||
|
||||
private final HttpServerExchange exchange; |
||||
|
||||
private Subscriber<? super ByteBuffer> subscriber; |
||||
|
||||
|
||||
public RequestBodyPublisher(HttpServerExchange exchange) { |
||||
Assert.notNull(exchange, "'exchange' is required."); |
||||
this.exchange = exchange; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public void subscribe(Subscriber<? super ByteBuffer> subscriber) { |
||||
if (subscriber == null) { |
||||
throw SpecificationExceptions.spec_2_13_exception(); |
||||
} |
||||
if (this.subscriber != null) { |
||||
subscriber.onError(new IllegalStateException("Only one subscriber allowed")); |
||||
} |
||||
|
||||
this.subscriber = subscriber; |
||||
this.subscriber.onSubscribe(new RequestBodySubscription()); |
||||
} |
||||
|
||||
|
||||
private class RequestBodySubscription implements Subscription, Runnable, |
||||
ChannelListener<StreamSourceChannel> { |
||||
|
||||
volatile long demand; |
||||
|
||||
private PooledByteBuffer pooledBuffer; |
||||
|
||||
private StreamSourceChannel channel; |
||||
|
||||
private boolean subscriptionClosed; |
||||
|
||||
private boolean draining; |
||||
|
||||
|
||||
@Override |
||||
public void request(long n) { |
||||
BackpressureUtils.checkRequest(n, subscriber); |
||||
if (this.subscriptionClosed) { |
||||
return; |
||||
} |
||||
BackpressureUtils.getAndAdd(DEMAND, this, n); |
||||
scheduleNextMessage(); |
||||
} |
||||
|
||||
private void scheduleNextMessage() { |
||||
exchange.dispatch(exchange.isInIoThread() ? SameThreadExecutor.INSTANCE : |
||||
exchange.getIoThread(), this); |
||||
} |
||||
|
||||
@Override |
||||
public void cancel() { |
||||
this.subscriptionClosed = true; |
||||
close(); |
||||
} |
||||
|
||||
private void close() { |
||||
if (this.pooledBuffer != null) { |
||||
safeClose(this.pooledBuffer); |
||||
this.pooledBuffer = null; |
||||
} |
||||
if (this.channel != null) { |
||||
safeClose(this.channel); |
||||
this.channel = null; |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void run() { |
||||
if (this.subscriptionClosed || this.draining) { |
||||
return; |
||||
} |
||||
if (0 == BackpressureUtils.getAndSub(DEMAND, this, 1)) { |
||||
return; |
||||
} |
||||
|
||||
this.draining = true; |
||||
|
||||
if (this.channel == null) { |
||||
this.channel = exchange.getRequestChannel(); |
||||
|
||||
if (this.channel == null) { |
||||
if (exchange.isRequestComplete()) { |
||||
return; |
||||
} |
||||
else { |
||||
throw new IllegalStateException("Failed to acquire channel!"); |
||||
} |
||||
} |
||||
} |
||||
if (this.pooledBuffer == null) { |
||||
this.pooledBuffer = exchange.getConnection().getByteBufferPool().allocate(); |
||||
} |
||||
else { |
||||
this.pooledBuffer.getBuffer().clear(); |
||||
} |
||||
|
||||
try { |
||||
ByteBuffer buffer = this.pooledBuffer.getBuffer(); |
||||
int count; |
||||
do { |
||||
count = this.channel.read(buffer); |
||||
if (count == 0) { |
||||
this.channel.getReadSetter().set(this); |
||||
this.channel.resumeReads(); |
||||
} |
||||
else if (count == -1) { |
||||
if (buffer.position() > 0) { |
||||
doOnNext(buffer); |
||||
} |
||||
doOnComplete(); |
||||
} |
||||
else { |
||||
if (buffer.remaining() == 0) { |
||||
if (this.demand == 0) { |
||||
this.channel.suspendReads(); |
||||
} |
||||
doOnNext(buffer); |
||||
if (this.demand > 0) { |
||||
scheduleNextMessage(); |
||||
} |
||||
break; |
||||
} |
||||
} |
||||
} while (count > 0); |
||||
} |
||||
catch (IOException e) { |
||||
doOnError(e); |
||||
} |
||||
} |
||||
|
||||
private void doOnNext(ByteBuffer buffer) { |
||||
this.draining = false; |
||||
buffer.flip(); |
||||
subscriber.onNext(buffer); |
||||
} |
||||
|
||||
private void doOnComplete() { |
||||
this.subscriptionClosed = true; |
||||
try { |
||||
subscriber.onComplete(); |
||||
} |
||||
finally { |
||||
close(); |
||||
} |
||||
} |
||||
|
||||
private void doOnError(Throwable t) { |
||||
this.subscriptionClosed = true; |
||||
try { |
||||
subscriber.onError(t); |
||||
} |
||||
finally { |
||||
close(); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void handleEvent(StreamSourceChannel channel) { |
||||
if (this.subscriptionClosed) { |
||||
return; |
||||
} |
||||
|
||||
try { |
||||
ByteBuffer buffer = this.pooledBuffer.getBuffer(); |
||||
int count; |
||||
do { |
||||
count = channel.read(buffer); |
||||
if (count == 0) { |
||||
return; |
||||
} |
||||
else if (count == -1) { |
||||
if (buffer.position() > 0) { |
||||
doOnNext(buffer); |
||||
} |
||||
doOnComplete(); |
||||
} |
||||
else { |
||||
if (buffer.remaining() == 0) { |
||||
if (this.demand == 0) { |
||||
channel.suspendReads(); |
||||
} |
||||
doOnNext(buffer); |
||||
if (this.demand > 0) { |
||||
scheduleNextMessage(); |
||||
} |
||||
break; |
||||
} |
||||
} |
||||
} while (count > 0); |
||||
} |
||||
catch (IOException e) { |
||||
doOnError(e); |
||||
} |
||||
} |
||||
} |
||||
|
||||
} |
||||
@ -1,204 +0,0 @@
@@ -1,204 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2015 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.server.undertow; |
||||
|
||||
import java.io.IOException; |
||||
import java.nio.ByteBuffer; |
||||
import java.util.Queue; |
||||
import java.util.concurrent.ConcurrentLinkedQueue; |
||||
import java.util.concurrent.atomic.AtomicBoolean; |
||||
import java.util.concurrent.atomic.AtomicInteger; |
||||
|
||||
import io.undertow.connector.PooledByteBuffer; |
||||
import io.undertow.server.HttpServerExchange; |
||||
import org.apache.commons.logging.Log; |
||||
import org.apache.commons.logging.LogFactory; |
||||
import org.reactivestreams.Subscription; |
||||
import org.xnio.ChannelListener; |
||||
import org.xnio.channels.StreamSinkChannel; |
||||
import reactor.core.subscriber.BaseSubscriber; |
||||
|
||||
import static org.xnio.ChannelListeners.closingChannelExceptionHandler; |
||||
import static org.xnio.ChannelListeners.flushingChannelListener; |
||||
import static org.xnio.IoUtils.safeClose; |
||||
|
||||
/** |
||||
* @author Marek Hawrylczak |
||||
* @author Rossen Stoyanchev |
||||
*/ |
||||
class ResponseBodySubscriber extends BaseSubscriber<ByteBuffer> |
||||
implements ChannelListener<StreamSinkChannel> { |
||||
|
||||
private static final Log logger = LogFactory.getLog(ResponseBodySubscriber.class); |
||||
|
||||
|
||||
private final HttpServerExchange exchange; |
||||
|
||||
private Subscription subscription; |
||||
|
||||
private final Queue<PooledByteBuffer> buffers; |
||||
|
||||
private final AtomicInteger writing = new AtomicInteger(); |
||||
|
||||
private final AtomicBoolean closing = new AtomicBoolean(); |
||||
|
||||
private StreamSinkChannel responseChannel; |
||||
|
||||
|
||||
public ResponseBodySubscriber(HttpServerExchange exchange) { |
||||
this.exchange = exchange; |
||||
this.buffers = new ConcurrentLinkedQueue<>(); |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public void onSubscribe(Subscription subscription) { |
||||
super.onSubscribe(subscription); |
||||
this.subscription = subscription; |
||||
this.subscription.request(1); |
||||
} |
||||
|
||||
@Override |
||||
public void onNext(ByteBuffer buffer) { |
||||
super.onNext(buffer); |
||||
|
||||
if (this.responseChannel == null) { |
||||
this.responseChannel = this.exchange.getResponseChannel(); |
||||
} |
||||
|
||||
this.writing.incrementAndGet(); |
||||
try { |
||||
int c; |
||||
do { |
||||
c = this.responseChannel.write(buffer); |
||||
} while (buffer.hasRemaining() && c > 0); |
||||
|
||||
if (buffer.hasRemaining()) { |
||||
this.writing.incrementAndGet(); |
||||
enqueue(buffer); |
||||
this.responseChannel.getWriteSetter().set(this); |
||||
this.responseChannel.resumeWrites(); |
||||
} |
||||
else { |
||||
this.subscription.request(1); |
||||
} |
||||
|
||||
} |
||||
catch (IOException ex) { |
||||
onError(ex); |
||||
} |
||||
finally { |
||||
this.writing.decrementAndGet(); |
||||
if (this.closing.get()) { |
||||
closeIfDone(); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private void enqueue(ByteBuffer src) { |
||||
do { |
||||
PooledByteBuffer buffer = this.exchange.getConnection().getByteBufferPool().allocate(); |
||||
ByteBuffer dst = buffer.getBuffer(); |
||||
copy(dst, src); |
||||
dst.flip(); |
||||
this.buffers.add(buffer); |
||||
} while (src.remaining() > 0); |
||||
} |
||||
|
||||
private void copy(ByteBuffer dst, ByteBuffer src) { |
||||
int n = Math.min(dst.capacity(), src.remaining()); |
||||
for (int i = 0; i < n; i++) { |
||||
dst.put(src.get()); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void handleEvent(StreamSinkChannel channel) { |
||||
try { |
||||
int c; |
||||
do { |
||||
ByteBuffer buffer = this.buffers.peek().getBuffer(); |
||||
do { |
||||
c = channel.write(buffer); |
||||
} while (buffer.hasRemaining() && c > 0); |
||||
|
||||
if (!buffer.hasRemaining()) { |
||||
safeClose(this.buffers.remove()); |
||||
} |
||||
} while (!this.buffers.isEmpty() && c > 0); |
||||
|
||||
if (!this.buffers.isEmpty()) { |
||||
channel.resumeWrites(); |
||||
} |
||||
else { |
||||
this.writing.decrementAndGet(); |
||||
|
||||
if (this.closing.get()) { |
||||
closeIfDone(); |
||||
} |
||||
else { |
||||
this.subscription.request(1); |
||||
} |
||||
} |
||||
} |
||||
catch (IOException ex) { |
||||
onError(ex); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void onError(Throwable ex) { |
||||
super.onError(ex); |
||||
logger.error("ResponseBodySubscriber error", ex); |
||||
if (!this.exchange.isResponseStarted() && this.exchange.getStatusCode() < 500) { |
||||
this.exchange.setStatusCode(500); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void onComplete() { |
||||
super.onComplete(); |
||||
if (this.responseChannel != null) { |
||||
this.closing.set(true); |
||||
closeIfDone(); |
||||
} |
||||
} |
||||
|
||||
private void closeIfDone() { |
||||
if (this.writing.get() == 0) { |
||||
if (this.closing.compareAndSet(true, false)) { |
||||
closeChannel(); |
||||
} |
||||
} |
||||
} |
||||
|
||||
private void closeChannel() { |
||||
try { |
||||
this.responseChannel.shutdownWrites(); |
||||
|
||||
if (!this.responseChannel.flush()) { |
||||
this.responseChannel.getWriteSetter().set(flushingChannelListener( |
||||
o -> safeClose(this.responseChannel), closingChannelExceptionHandler())); |
||||
this.responseChannel.resumeWrites(); |
||||
} |
||||
this.responseChannel = null; |
||||
} |
||||
catch (IOException ex) { |
||||
onError(ex); |
||||
} |
||||
} |
||||
} |
||||
@ -1,85 +0,0 @@
@@ -1,85 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2015 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.server.undertow; |
||||
|
||||
import java.net.URI; |
||||
import java.net.URISyntaxException; |
||||
import java.nio.ByteBuffer; |
||||
|
||||
import io.undertow.server.HttpServerExchange; |
||||
import io.undertow.util.HeaderValues; |
||||
import org.reactivestreams.Publisher; |
||||
|
||||
import org.springframework.http.HttpHeaders; |
||||
import org.springframework.http.HttpMethod; |
||||
import org.springframework.http.server.ReactiveServerHttpRequest; |
||||
|
||||
/** |
||||
* @author Marek Hawrylczak |
||||
* @author Rossen Stoyanchev |
||||
*/ |
||||
class UndertowServerHttpRequest implements ReactiveServerHttpRequest { |
||||
|
||||
private final HttpServerExchange exchange; |
||||
|
||||
private final Publisher<ByteBuffer> body; |
||||
|
||||
private HttpHeaders headers; |
||||
|
||||
|
||||
public UndertowServerHttpRequest(HttpServerExchange exchange, Publisher<ByteBuffer> body) { |
||||
this.exchange = exchange; |
||||
this.body = body; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public HttpMethod getMethod() { |
||||
return HttpMethod.valueOf(this.exchange.getRequestMethod().toString()); |
||||
} |
||||
|
||||
@Override |
||||
public URI getURI() { |
||||
try { |
||||
return new URI(this.exchange.getRequestScheme(), null, this.exchange.getHostName(), |
||||
this.exchange.getHostPort(), this.exchange.getRequestURI(), |
||||
this.exchange.getQueryString(), null); |
||||
} |
||||
catch (URISyntaxException ex) { |
||||
throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public HttpHeaders getHeaders() { |
||||
if (this.headers == null) { |
||||
this.headers = new HttpHeaders(); |
||||
for (HeaderValues headerValues : this.exchange.getRequestHeaders()) { |
||||
for (String value : headerValues) { |
||||
this.headers.add(headerValues.getHeaderName().toString(), value); |
||||
} |
||||
} |
||||
} |
||||
return this.headers; |
||||
} |
||||
|
||||
@Override |
||||
public Publisher<ByteBuffer> getBody() { |
||||
return this.body; |
||||
} |
||||
|
||||
} |
||||
@ -1,98 +0,0 @@
@@ -1,98 +0,0 @@
|
||||
/* |
||||
* Copyright 2002-2015 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.http.server.undertow; |
||||
|
||||
import java.nio.ByteBuffer; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
|
||||
import org.springframework.http.HttpHeaders; |
||||
import org.springframework.http.HttpStatus; |
||||
import org.springframework.http.server.ReactiveServerHttpResponse; |
||||
import org.springframework.util.Assert; |
||||
|
||||
import io.undertow.server.HttpServerExchange; |
||||
import io.undertow.util.HttpString; |
||||
import org.reactivestreams.Publisher; |
||||
import org.reactivestreams.Subscription; |
||||
|
||||
/** |
||||
* @author Marek Hawrylczak |
||||
* @author Rossen Stoyanchev |
||||
*/ |
||||
class UndertowServerHttpResponse implements ReactiveServerHttpResponse { |
||||
|
||||
private final HttpServerExchange exchange; |
||||
|
||||
private final ResponseBodySubscriber bodySubscriber; |
||||
|
||||
private final HttpHeaders headers = new HttpHeaders(); |
||||
|
||||
private boolean headersWritten = false; |
||||
|
||||
|
||||
public UndertowServerHttpResponse(HttpServerExchange exchange, ResponseBodySubscriber body) { |
||||
this.exchange = exchange; |
||||
this.bodySubscriber = body; |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public void setStatusCode(HttpStatus status) { |
||||
Assert.notNull(status); |
||||
this.exchange.setStatusCode(status.value()); |
||||
} |
||||
|
||||
|
||||
@Override |
||||
public Publisher<Void> setBody(Publisher<ByteBuffer> bodyPublisher) { |
||||
applyHeaders(); |
||||
return (subscriber -> bodyPublisher.subscribe(bodySubscriber)); |
||||
} |
||||
|
||||
@Override |
||||
public HttpHeaders getHeaders() { |
||||
return (this.headersWritten ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers); |
||||
} |
||||
|
||||
@Override |
||||
public Publisher<Void> writeHeaders() { |
||||
applyHeaders(); |
||||
return s -> s.onSubscribe(new Subscription() { |
||||
@Override |
||||
public void request(long n) { |
||||
s.onComplete(); |
||||
} |
||||
|
||||
@Override |
||||
public void cancel() { |
||||
} |
||||
}); |
||||
} |
||||
|
||||
private void applyHeaders() { |
||||
if (!this.headersWritten) { |
||||
for (Map.Entry<String, List<String>> entry : this.headers.entrySet()) { |
||||
HttpString headerName = HttpString.tryFromString(entry.getKey()); |
||||
this.exchange.getResponseHeaders().addAll(headerName, entry.getValue()); |
||||
|
||||
} |
||||
this.headersWritten = true; |
||||
} |
||||
} |
||||
|
||||
} |
||||
Loading…
Reference in new issue