diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java index 2e0a7359c0d..19307e24cf5 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java @@ -124,11 +124,6 @@ abstract class AbstractRequestBodyPublisher implements Publisher { */ protected abstract DataBuffer read() throws IOException; - /** - * Closes the input. - */ - protected abstract void close(); - private boolean hasDemand() { return this.demand.get() > 0; } @@ -294,9 +289,7 @@ abstract class AbstractRequestBodyPublisher implements Publisher { } void cancel(AbstractRequestBodyPublisher publisher) { - if (publisher.changeState(this, COMPLETED)) { - publisher.close(); - } + publisher.changeState(this, COMPLETED); } void onDataAvailable(AbstractRequestBodyPublisher publisher) { @@ -305,7 +298,6 @@ abstract class AbstractRequestBodyPublisher implements Publisher { void onAllDataRead(AbstractRequestBodyPublisher publisher) { if (publisher.changeState(this, COMPLETED)) { - publisher.close(); if (publisher.subscriber != null) { publisher.subscriber.onComplete(); } @@ -314,7 +306,6 @@ abstract class AbstractRequestBodyPublisher implements Publisher { void onError(AbstractRequestBodyPublisher publisher, Throwable t) { if (publisher.changeState(this, COMPLETED)) { - publisher.close(); if (publisher.subscriber != null) { publisher.subscriber.onError(t); } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletAsyncContextSynchronizer.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletAsyncContextSynchronizer.java deleted file mode 100644 index ce507c9729b..00000000000 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletAsyncContextSynchronizer.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright 2002-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.http.server.reactive; - -import java.util.concurrent.atomic.AtomicInteger; -import javax.servlet.AsyncContext; -import javax.servlet.ServletRequest; -import javax.servlet.ServletResponse; - -/** - * Utility class for synchronizing between the reading and writing side of an - * {@link AsyncContext}. This class will simply call {@link AsyncContext#complete()} when - * both {@link #readComplete()} and {@link #writeComplete()} have been called. - * - * @author Arjen Poutsma - * @see AsyncContext - */ -final class ServletAsyncContextSynchronizer { - - private static final int NONE_COMPLETE = 0; - - private static final int READ_COMPLETE = 1; - - private static final int WRITE_COMPLETE = 1 << 1; - - private static final int COMPLETE = READ_COMPLETE | WRITE_COMPLETE; - - - private final AsyncContext asyncContext; - - private final AtomicInteger complete = new AtomicInteger(NONE_COMPLETE); - - - /** - * Creates a new {@code AsyncContextSynchronizer} based on the given context. - * @param asyncContext the context to base this synchronizer on - */ - public ServletAsyncContextSynchronizer(AsyncContext asyncContext) { - this.asyncContext = asyncContext; - } - - /** - * Returns the request of this synchronizer. - */ - public ServletRequest getRequest() { - return this.asyncContext.getRequest(); - } - - /** - * Returns the response of this synchronizer. - */ - public ServletResponse getResponse() { - return this.asyncContext.getResponse(); - } - - /** - * Completes the reading side of the asynchronous operation. When both this method and - * {@link #writeComplete()} have been called, the {@code AsyncContext} will be - * {@linkplain AsyncContext#complete() fully completed}. - */ - public void readComplete() { - if (complete.compareAndSet(WRITE_COMPLETE, COMPLETE)) { - this.asyncContext.complete(); - } - else { - this.complete.compareAndSet(NONE_COMPLETE, READ_COMPLETE); - } - } - - /** - * Completes the writing side of the asynchronous operation. When both this method and - * {@link #readComplete()} have been called, the {@code AsyncContext} will be - * {@linkplain AsyncContext#complete() fully completed}. - */ - public void writeComplete() { - if (complete.compareAndSet(READ_COMPLETE, COMPLETE)) { - this.asyncContext.complete(); - } - else { - this.complete.compareAndSet(NONE_COMPLETE, WRITE_COMPLETE); - } - } - - /** - * Completes both the reading and writing side of the asynchronous operation. - */ - public void complete() { - readComplete(); - writeComplete(); - } -} diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java index e95f39afe30..ce6e49a7b55 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java @@ -51,7 +51,6 @@ public class ServletHttpHandlerAdapter extends HttpServlet { private static Log logger = LogFactory.getLog(ServletHttpHandlerAdapter.class); - private HttpHandler handler; // Servlet is based on blocking I/O, hence the usage of non-direct, heap-based buffers @@ -60,7 +59,6 @@ public class ServletHttpHandlerAdapter extends HttpServlet { private int bufferSize = DEFAULT_BUFFER_SIZE; - public void setHandler(HttpHandler handler) { Assert.notNull(handler, "'handler' must not be null"); this.handler = handler; @@ -77,21 +75,21 @@ public class ServletHttpHandlerAdapter extends HttpServlet { } @Override - protected void service(HttpServletRequest servletRequest, HttpServletResponse servletResponse) - throws ServletException, IOException { + protected void service(HttpServletRequest servletRequest, + HttpServletResponse servletResponse) throws ServletException, IOException { - AsyncContext context = servletRequest.startAsync(); - ServletAsyncContextSynchronizer synchronizer = new ServletAsyncContextSynchronizer(context); + AsyncContext asyncContext = servletRequest.startAsync(); RequestBodyPublisher requestBody = - new RequestBodyPublisher(synchronizer, this.dataBufferFactory, - this.bufferSize); + new RequestBodyPublisher(servletRequest.getInputStream(), + this.dataBufferFactory, this.bufferSize); requestBody.registerListener(); ServletServerHttpRequest request = new ServletServerHttpRequest(servletRequest, requestBody); ResponseBodyProcessor responseBody = - new ResponseBodyProcessor(synchronizer, this.bufferSize); + new ResponseBodyProcessor(servletResponse.getOutputStream(), + this.bufferSize); responseBody.registerListener(); ServletServerHttpResponse response = new ServletServerHttpResponse(servletResponse, this.dataBufferFactory, @@ -101,20 +99,19 @@ public class ServletHttpHandlerAdapter extends HttpServlet { })); HandlerResultSubscriber resultSubscriber = - new HandlerResultSubscriber(synchronizer); + new HandlerResultSubscriber(asyncContext); this.handler.handle(request, response).subscribe(resultSubscriber); } private static class HandlerResultSubscriber implements Subscriber { - private final ServletAsyncContextSynchronizer synchronizer; + private final AsyncContext asyncContext; - public HandlerResultSubscriber(ServletAsyncContextSynchronizer synchronizer) { - this.synchronizer = synchronizer; + public HandlerResultSubscriber(AsyncContext asyncContext) { + this.asyncContext = asyncContext; } - @Override public void onSubscribe(Subscription subscription) { subscription.request(Long.MAX_VALUE); @@ -129,14 +126,14 @@ public class ServletHttpHandlerAdapter extends HttpServlet { public void onError(Throwable ex) { logger.error("Error from request handling. Completing the request.", ex); HttpServletResponse response = - (HttpServletResponse) this.synchronizer.getResponse(); + (HttpServletResponse) this.asyncContext.getResponse(); response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); - this.synchronizer.complete(); + this.asyncContext.complete(); } @Override public void onComplete() { - this.synchronizer.complete(); + this.asyncContext.complete(); } } @@ -145,44 +142,34 @@ public class ServletHttpHandlerAdapter extends HttpServlet { private final RequestBodyPublisher.RequestBodyReadListener readListener = new RequestBodyPublisher.RequestBodyReadListener(); - private final ServletAsyncContextSynchronizer synchronizer; + private final ServletInputStream inputStream; private final DataBufferFactory dataBufferFactory; private final byte[] buffer; - public RequestBodyPublisher(ServletAsyncContextSynchronizer synchronizer, + public RequestBodyPublisher(ServletInputStream inputStream, DataBufferFactory dataBufferFactory, int bufferSize) { - this.synchronizer = synchronizer; + this.inputStream = inputStream; this.dataBufferFactory = dataBufferFactory; this.buffer = new byte[bufferSize]; } public void registerListener() throws IOException { - inputStream().setReadListener(this.readListener); - } - - private ServletInputStream inputStream() throws IOException { - return this.synchronizer.getRequest().getInputStream(); + inputStream.setReadListener(this.readListener); } @Override protected void checkOnDataAvailable() { - try { - if (!inputStream().isFinished() && inputStream().isReady()) { - onDataAvailable(); - } - } - catch (IOException ex) { - onError(ex); + if (!inputStream.isFinished() && inputStream.isReady()) { + onDataAvailable(); } } @Override protected DataBuffer read() throws IOException { - ServletInputStream input = inputStream(); - if (input.isReady()) { - int read = input.read(this.buffer); + if (inputStream.isReady()) { + int read = inputStream.read(this.buffer); if (logger.isTraceEnabled()) { logger.trace("read:" + read); } @@ -196,12 +183,6 @@ public class ServletHttpHandlerAdapter extends HttpServlet { return null; } - @Override - protected void close() { - this.synchronizer.readComplete(); - - } - private class RequestBodyReadListener implements ReadListener { @Override @@ -227,46 +208,33 @@ public class ServletHttpHandlerAdapter extends HttpServlet { private final ResponseBodyWriteListener writeListener = new ResponseBodyWriteListener(); - private final ServletAsyncContextSynchronizer synchronizer; + private final ServletOutputStream outputStream; private final int bufferSize; private volatile boolean flushOnNext; - public ResponseBodyProcessor(ServletAsyncContextSynchronizer synchronizer, - int bufferSize) { - this.synchronizer = synchronizer; + public ResponseBodyProcessor(ServletOutputStream outputStream, int bufferSize) { + this.outputStream = outputStream; this.bufferSize = bufferSize; } public void registerListener() throws IOException { - outputStream().setWriteListener(this.writeListener); - } - - private ServletOutputStream outputStream() throws IOException { - return this.synchronizer.getResponse().getOutputStream(); + outputStream.setWriteListener(this.writeListener); } @Override protected boolean isWritePossible() { - try { - return outputStream().isReady(); - } - catch (IOException ex) { - onError(ex); - return false; - } + return outputStream.isReady(); } @Override protected boolean write(DataBuffer dataBuffer) throws IOException { - ServletOutputStream output = outputStream(); - if (this.flushOnNext) { flush(); } - boolean ready = output.isReady(); + boolean ready = outputStream.isReady(); if (this.logger.isTraceEnabled()) { this.logger.trace("write: " + dataBuffer + " ready: " + ready); @@ -288,13 +256,12 @@ public class ServletHttpHandlerAdapter extends HttpServlet { @Override protected void flush() throws IOException { - ServletOutputStream output = outputStream(); - if (output.isReady()) { + if (outputStream.isReady()) { if (logger.isTraceEnabled()) { this.logger.trace("flush"); } try { - output.flush(); + outputStream.flush(); this.flushOnNext = false; } catch (IOException ignored) { @@ -308,14 +275,13 @@ public class ServletHttpHandlerAdapter extends HttpServlet { private int writeDataBuffer(DataBuffer dataBuffer) throws IOException { InputStream input = dataBuffer.asInputStream(); - ServletOutputStream output = outputStream(); int bytesWritten = 0; byte[] buffer = new byte[this.bufferSize]; int bytesRead = -1; - while (output.isReady() && (bytesRead = input.read(buffer)) != -1) { - output.write(buffer, 0, bytesRead); + while (outputStream.isReady() && (bytesRead = input.read(buffer)) != -1) { + outputStream.write(buffer, 0, bytesRead); bytesWritten += bytesRead; } diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java index 1597a3af2c9..1649a25145f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java @@ -26,7 +26,6 @@ import io.undertow.server.HttpServerExchange; import io.undertow.server.handlers.Cookie; import io.undertow.util.HeaderValues; import org.xnio.ChannelListener; -import org.xnio.IoUtils; import org.xnio.channels.StreamSourceChannel; import reactor.core.publisher.Flux; @@ -152,16 +151,6 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { return null; } - @Override - protected void close() { - if (this.pooledByteBuffer != null) { - IoUtils.safeClose(this.pooledByteBuffer); - } - if (this.requestChannel != null) { - IoUtils.safeClose(this.requestChannel); - } - } - private class ReadListener implements ChannelListener { @Override diff --git a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/AsyncContextSynchronizerTests.java b/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/AsyncContextSynchronizerTests.java deleted file mode 100644 index 158c7e022ee..00000000000 --- a/spring-web-reactive/src/test/java/org/springframework/http/server/reactive/AsyncContextSynchronizerTests.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.http.server.reactive; - -import javax.servlet.AsyncContext; - -import org.junit.Before; -import org.junit.Test; - -import static org.mockito.BDDMockito.mock; -import static org.mockito.BDDMockito.verify; - -/** - * @author Arjen Poutsma - */ -public class AsyncContextSynchronizerTests { - - private AsyncContext asyncContext; - - private ServletAsyncContextSynchronizer synchronizer; - - @Before - public void setUp() throws Exception { - asyncContext = mock(AsyncContext.class); - synchronizer = new ServletAsyncContextSynchronizer(asyncContext); - } - - @Test - public void readThenWrite() { - synchronizer.readComplete(); - synchronizer.writeComplete(); - - verify(asyncContext).complete(); - } - - @Test - public void writeThenRead() { - synchronizer.writeComplete(); - synchronizer.readComplete(); - - verify(asyncContext).complete(); - } -} \ No newline at end of file