Browse Source

Removed ServletAsyncContextSynchronizer

pull/1111/head
Arjen Poutsma 10 years ago
parent
commit
e64907eed8
  1. 11
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java
  2. 105
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletAsyncContextSynchronizer.java
  3. 98
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java
  4. 11
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java
  5. 57
      spring-web-reactive/src/test/java/org/springframework/http/server/reactive/AsyncContextSynchronizerTests.java

11
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java

@ -124,11 +124,6 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> { @@ -124,11 +124,6 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
*/
protected abstract DataBuffer read() throws IOException;
/**
* Closes the input.
*/
protected abstract void close();
private boolean hasDemand() {
return this.demand.get() > 0;
}
@ -294,9 +289,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> { @@ -294,9 +289,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
}
void cancel(AbstractRequestBodyPublisher publisher) {
if (publisher.changeState(this, COMPLETED)) {
publisher.close();
}
publisher.changeState(this, COMPLETED);
}
void onDataAvailable(AbstractRequestBodyPublisher publisher) {
@ -305,7 +298,6 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> { @@ -305,7 +298,6 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
void onAllDataRead(AbstractRequestBodyPublisher publisher) {
if (publisher.changeState(this, COMPLETED)) {
publisher.close();
if (publisher.subscriber != null) {
publisher.subscriber.onComplete();
}
@ -314,7 +306,6 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> { @@ -314,7 +306,6 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
void onError(AbstractRequestBodyPublisher publisher, Throwable t) {
if (publisher.changeState(this, COMPLETED)) {
publisher.close();
if (publisher.subscriber != null) {
publisher.subscriber.onError(t);
}

105
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletAsyncContextSynchronizer.java

@ -1,105 +0,0 @@ @@ -1,105 +0,0 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
/**
* Utility class for synchronizing between the reading and writing side of an
* {@link AsyncContext}. This class will simply call {@link AsyncContext#complete()} when
* both {@link #readComplete()} and {@link #writeComplete()} have been called.
*
* @author Arjen Poutsma
* @see AsyncContext
*/
final class ServletAsyncContextSynchronizer {
private static final int NONE_COMPLETE = 0;
private static final int READ_COMPLETE = 1;
private static final int WRITE_COMPLETE = 1 << 1;
private static final int COMPLETE = READ_COMPLETE | WRITE_COMPLETE;
private final AsyncContext asyncContext;
private final AtomicInteger complete = new AtomicInteger(NONE_COMPLETE);
/**
* Creates a new {@code AsyncContextSynchronizer} based on the given context.
* @param asyncContext the context to base this synchronizer on
*/
public ServletAsyncContextSynchronizer(AsyncContext asyncContext) {
this.asyncContext = asyncContext;
}
/**
* Returns the request of this synchronizer.
*/
public ServletRequest getRequest() {
return this.asyncContext.getRequest();
}
/**
* Returns the response of this synchronizer.
*/
public ServletResponse getResponse() {
return this.asyncContext.getResponse();
}
/**
* Completes the reading side of the asynchronous operation. When both this method and
* {@link #writeComplete()} have been called, the {@code AsyncContext} will be
* {@linkplain AsyncContext#complete() fully completed}.
*/
public void readComplete() {
if (complete.compareAndSet(WRITE_COMPLETE, COMPLETE)) {
this.asyncContext.complete();
}
else {
this.complete.compareAndSet(NONE_COMPLETE, READ_COMPLETE);
}
}
/**
* Completes the writing side of the asynchronous operation. When both this method and
* {@link #readComplete()} have been called, the {@code AsyncContext} will be
* {@linkplain AsyncContext#complete() fully completed}.
*/
public void writeComplete() {
if (complete.compareAndSet(READ_COMPLETE, COMPLETE)) {
this.asyncContext.complete();
}
else {
this.complete.compareAndSet(NONE_COMPLETE, WRITE_COMPLETE);
}
}
/**
* Completes both the reading and writing side of the asynchronous operation.
*/
public void complete() {
readComplete();
writeComplete();
}
}

98
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java

@ -51,7 +51,6 @@ public class ServletHttpHandlerAdapter extends HttpServlet { @@ -51,7 +51,6 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
private static Log logger = LogFactory.getLog(ServletHttpHandlerAdapter.class);
private HttpHandler handler;
// Servlet is based on blocking I/O, hence the usage of non-direct, heap-based buffers
@ -60,7 +59,6 @@ public class ServletHttpHandlerAdapter extends HttpServlet { @@ -60,7 +59,6 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
private int bufferSize = DEFAULT_BUFFER_SIZE;
public void setHandler(HttpHandler handler) {
Assert.notNull(handler, "'handler' must not be null");
this.handler = handler;
@ -77,21 +75,21 @@ public class ServletHttpHandlerAdapter extends HttpServlet { @@ -77,21 +75,21 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
}
@Override
protected void service(HttpServletRequest servletRequest, HttpServletResponse servletResponse)
throws ServletException, IOException {
protected void service(HttpServletRequest servletRequest,
HttpServletResponse servletResponse) throws ServletException, IOException {
AsyncContext context = servletRequest.startAsync();
ServletAsyncContextSynchronizer synchronizer = new ServletAsyncContextSynchronizer(context);
AsyncContext asyncContext = servletRequest.startAsync();
RequestBodyPublisher requestBody =
new RequestBodyPublisher(synchronizer, this.dataBufferFactory,
this.bufferSize);
new RequestBodyPublisher(servletRequest.getInputStream(),
this.dataBufferFactory, this.bufferSize);
requestBody.registerListener();
ServletServerHttpRequest request =
new ServletServerHttpRequest(servletRequest, requestBody);
ResponseBodyProcessor responseBody =
new ResponseBodyProcessor(synchronizer, this.bufferSize);
new ResponseBodyProcessor(servletResponse.getOutputStream(),
this.bufferSize);
responseBody.registerListener();
ServletServerHttpResponse response =
new ServletServerHttpResponse(servletResponse, this.dataBufferFactory,
@ -101,20 +99,19 @@ public class ServletHttpHandlerAdapter extends HttpServlet { @@ -101,20 +99,19 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
}));
HandlerResultSubscriber resultSubscriber =
new HandlerResultSubscriber(synchronizer);
new HandlerResultSubscriber(asyncContext);
this.handler.handle(request, response).subscribe(resultSubscriber);
}
private static class HandlerResultSubscriber implements Subscriber<Void> {
private final ServletAsyncContextSynchronizer synchronizer;
private final AsyncContext asyncContext;
public HandlerResultSubscriber(ServletAsyncContextSynchronizer synchronizer) {
this.synchronizer = synchronizer;
public HandlerResultSubscriber(AsyncContext asyncContext) {
this.asyncContext = asyncContext;
}
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
@ -129,14 +126,14 @@ public class ServletHttpHandlerAdapter extends HttpServlet { @@ -129,14 +126,14 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
public void onError(Throwable ex) {
logger.error("Error from request handling. Completing the request.", ex);
HttpServletResponse response =
(HttpServletResponse) this.synchronizer.getResponse();
(HttpServletResponse) this.asyncContext.getResponse();
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
this.synchronizer.complete();
this.asyncContext.complete();
}
@Override
public void onComplete() {
this.synchronizer.complete();
this.asyncContext.complete();
}
}
@ -145,44 +142,34 @@ public class ServletHttpHandlerAdapter extends HttpServlet { @@ -145,44 +142,34 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
private final RequestBodyPublisher.RequestBodyReadListener readListener =
new RequestBodyPublisher.RequestBodyReadListener();
private final ServletAsyncContextSynchronizer synchronizer;
private final ServletInputStream inputStream;
private final DataBufferFactory dataBufferFactory;
private final byte[] buffer;
public RequestBodyPublisher(ServletAsyncContextSynchronizer synchronizer,
public RequestBodyPublisher(ServletInputStream inputStream,
DataBufferFactory dataBufferFactory, int bufferSize) {
this.synchronizer = synchronizer;
this.inputStream = inputStream;
this.dataBufferFactory = dataBufferFactory;
this.buffer = new byte[bufferSize];
}
public void registerListener() throws IOException {
inputStream().setReadListener(this.readListener);
}
private ServletInputStream inputStream() throws IOException {
return this.synchronizer.getRequest().getInputStream();
inputStream.setReadListener(this.readListener);
}
@Override
protected void checkOnDataAvailable() {
try {
if (!inputStream().isFinished() && inputStream().isReady()) {
onDataAvailable();
}
}
catch (IOException ex) {
onError(ex);
if (!inputStream.isFinished() && inputStream.isReady()) {
onDataAvailable();
}
}
@Override
protected DataBuffer read() throws IOException {
ServletInputStream input = inputStream();
if (input.isReady()) {
int read = input.read(this.buffer);
if (inputStream.isReady()) {
int read = inputStream.read(this.buffer);
if (logger.isTraceEnabled()) {
logger.trace("read:" + read);
}
@ -196,12 +183,6 @@ public class ServletHttpHandlerAdapter extends HttpServlet { @@ -196,12 +183,6 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
return null;
}
@Override
protected void close() {
this.synchronizer.readComplete();
}
private class RequestBodyReadListener implements ReadListener {
@Override
@ -227,46 +208,33 @@ public class ServletHttpHandlerAdapter extends HttpServlet { @@ -227,46 +208,33 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
private final ResponseBodyWriteListener writeListener =
new ResponseBodyWriteListener();
private final ServletAsyncContextSynchronizer synchronizer;
private final ServletOutputStream outputStream;
private final int bufferSize;
private volatile boolean flushOnNext;
public ResponseBodyProcessor(ServletAsyncContextSynchronizer synchronizer,
int bufferSize) {
this.synchronizer = synchronizer;
public ResponseBodyProcessor(ServletOutputStream outputStream, int bufferSize) {
this.outputStream = outputStream;
this.bufferSize = bufferSize;
}
public void registerListener() throws IOException {
outputStream().setWriteListener(this.writeListener);
}
private ServletOutputStream outputStream() throws IOException {
return this.synchronizer.getResponse().getOutputStream();
outputStream.setWriteListener(this.writeListener);
}
@Override
protected boolean isWritePossible() {
try {
return outputStream().isReady();
}
catch (IOException ex) {
onError(ex);
return false;
}
return outputStream.isReady();
}
@Override
protected boolean write(DataBuffer dataBuffer) throws IOException {
ServletOutputStream output = outputStream();
if (this.flushOnNext) {
flush();
}
boolean ready = output.isReady();
boolean ready = outputStream.isReady();
if (this.logger.isTraceEnabled()) {
this.logger.trace("write: " + dataBuffer + " ready: " + ready);
@ -288,13 +256,12 @@ public class ServletHttpHandlerAdapter extends HttpServlet { @@ -288,13 +256,12 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
@Override
protected void flush() throws IOException {
ServletOutputStream output = outputStream();
if (output.isReady()) {
if (outputStream.isReady()) {
if (logger.isTraceEnabled()) {
this.logger.trace("flush");
}
try {
output.flush();
outputStream.flush();
this.flushOnNext = false;
}
catch (IOException ignored) {
@ -308,14 +275,13 @@ public class ServletHttpHandlerAdapter extends HttpServlet { @@ -308,14 +275,13 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
private int writeDataBuffer(DataBuffer dataBuffer) throws IOException {
InputStream input = dataBuffer.asInputStream();
ServletOutputStream output = outputStream();
int bytesWritten = 0;
byte[] buffer = new byte[this.bufferSize];
int bytesRead = -1;
while (output.isReady() && (bytesRead = input.read(buffer)) != -1) {
output.write(buffer, 0, bytesRead);
while (outputStream.isReady() && (bytesRead = input.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
bytesWritten += bytesRead;
}

11
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java

@ -26,7 +26,6 @@ import io.undertow.server.HttpServerExchange; @@ -26,7 +26,6 @@ import io.undertow.server.HttpServerExchange;
import io.undertow.server.handlers.Cookie;
import io.undertow.util.HeaderValues;
import org.xnio.ChannelListener;
import org.xnio.IoUtils;
import org.xnio.channels.StreamSourceChannel;
import reactor.core.publisher.Flux;
@ -152,16 +151,6 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { @@ -152,16 +151,6 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest {
return null;
}
@Override
protected void close() {
if (this.pooledByteBuffer != null) {
IoUtils.safeClose(this.pooledByteBuffer);
}
if (this.requestChannel != null) {
IoUtils.safeClose(this.requestChannel);
}
}
private class ReadListener implements ChannelListener<StreamSourceChannel> {
@Override

57
spring-web-reactive/src/test/java/org/springframework/http/server/reactive/AsyncContextSynchronizerTests.java

@ -1,57 +0,0 @@ @@ -1,57 +0,0 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.server.reactive;
import javax.servlet.AsyncContext;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.BDDMockito.mock;
import static org.mockito.BDDMockito.verify;
/**
* @author Arjen Poutsma
*/
public class AsyncContextSynchronizerTests {
private AsyncContext asyncContext;
private ServletAsyncContextSynchronizer synchronizer;
@Before
public void setUp() throws Exception {
asyncContext = mock(AsyncContext.class);
synchronizer = new ServletAsyncContextSynchronizer(asyncContext);
}
@Test
public void readThenWrite() {
synchronizer.readComplete();
synchronizer.writeComplete();
verify(asyncContext).complete();
}
@Test
public void writeThenRead() {
synchronizer.writeComplete();
synchronizer.readComplete();
verify(asyncContext).complete();
}
}
Loading…
Cancel
Save