From fe7ee5ff33af4ed494c09263d1a6764568601d9f Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 9 Dec 2016 18:22:19 +0200 Subject: [PATCH] Rename "Request/ResponseBody" publisher/processor AbstractRequestBodyPublisher and AbstractResponseBodyProcessor are now used for WebSocket messages too and have been renamed more generally to AbstractListenerReadPublisher and AbstractListenerWriteProcessor. Issue: SPR-14527 --- ...stractListenerWebSocketSessionSupport.java | 8 +-- ...ava => AbstractListenerReadPublisher.java} | 52 ++++++++++--------- ...va => AbstractListenerWriteProcessor.java} | 36 +++++++------ .../reactive/ServletServerHttpRequest.java | 2 +- .../reactive/ServletServerHttpResponse.java | 2 +- .../reactive/UndertowServerHttpRequest.java | 2 +- .../reactive/UndertowServerHttpResponse.java | 2 +- ...s.java => ListenerReadPublisherTests.java} | 8 +-- 8 files changed, 58 insertions(+), 54 deletions(-) rename spring-web/src/main/java/org/springframework/http/server/reactive/{AbstractRequestBodyPublisher.java => AbstractListenerReadPublisher.java} (80%) rename spring-web/src/main/java/org/springframework/http/server/reactive/{AbstractResponseBodyProcessor.java => AbstractListenerWriteProcessor.java} (84%) rename spring-web/src/test/java/org/springframework/http/server/reactive/{AbstractRequestBodyPublisherTests.java => ListenerReadPublisherTests.java} (88%) diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSessionSupport.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSessionSupport.java index 44fbe4bb849..dab993a0bc2 100644 --- a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSessionSupport.java +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSessionSupport.java @@ -22,8 +22,8 @@ import java.net.URI; import java.util.concurrent.atomic.AtomicBoolean; import org.reactivestreams.Publisher; -import org.springframework.http.server.reactive.AbstractRequestBodyPublisher; -import org.springframework.http.server.reactive.AbstractResponseBodyProcessor; +import org.springframework.http.server.reactive.AbstractListenerReadPublisher; +import org.springframework.http.server.reactive.AbstractListenerWriteProcessor; import org.springframework.util.Assert; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.WebSocketMessage; @@ -122,7 +122,7 @@ public abstract class AbstractListenerWebSocketSessionSupport extends WebSock } } - final class WebSocketMessagePublisher extends AbstractRequestBodyPublisher { + final class WebSocketMessagePublisher extends AbstractListenerReadPublisher { private volatile WebSocketMessage webSocketMessage; @Override @@ -155,7 +155,7 @@ public abstract class AbstractListenerWebSocketSessionSupport extends WebSock } } - final class WebSocketMessageProcessor extends AbstractResponseBodyProcessor { + final class WebSocketMessageProcessor extends AbstractListenerWriteProcessor { private volatile boolean isReady = true; @Override diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java similarity index 80% rename from spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java rename to spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index cfc6fc61270..51146b4ee95 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -32,15 +32,17 @@ import reactor.core.publisher.Operators; /** * Abstract base class for {@code Publisher} implementations that bridge between - * event-listener APIs and Reactive Streams. Specifically, base class for the - * Servlet 3.1 and Undertow support. + * event-listener read APIs and Reactive Streams. Specifically, a base class for + * reading from the HTTP request body with Servlet 3.1 and Undertow as well as + * handling incoming WebSocket messages with JSR-356, Jetty, and Undertow. * * @author Arjen Poutsma + * @author Violeta Georgieva * @since 5.0 * @see ServletServerHttpRequest * @see UndertowHttpHandlerAdapter */ -public abstract class AbstractRequestBodyPublisher implements Publisher { +public abstract class AbstractListenerReadPublisher implements Publisher { protected final Log logger = LogFactory.getLog(getClass()); @@ -155,11 +157,11 @@ public abstract class AbstractRequestBodyPublisher implements Publisher { } - private static final class RequestBodySubscription implements Subscription { + private static final class ReadSubscription implements Subscription { - private final AbstractRequestBodyPublisher publisher; + private final AbstractListenerReadPublisher publisher; - public RequestBodySubscription(AbstractRequestBodyPublisher publisher) { + public ReadSubscription(AbstractListenerReadPublisher publisher) { this.publisher = publisher; } @@ -207,15 +209,15 @@ public abstract class AbstractRequestBodyPublisher implements Publisher { /** * The initial unsubscribed state. Will respond to {@link - * #subscribe(AbstractRequestBodyPublisher, Subscriber)} by + * #subscribe(AbstractListenerReadPublisher, Subscriber)} by * changing state to {@link #NO_DEMAND}. */ UNSUBSCRIBED { @Override - void subscribe(AbstractRequestBodyPublisher publisher, Subscriber subscriber) { + void subscribe(AbstractListenerReadPublisher publisher, Subscriber subscriber) { Objects.requireNonNull(subscriber); if (publisher.changeState(this, NO_DEMAND)) { - Subscription subscription = new RequestBodySubscription(publisher); + Subscription subscription = new ReadSubscription(publisher); publisher.subscriber = subscriber; subscriber.onSubscribe(subscription); } @@ -227,13 +229,13 @@ public abstract class AbstractRequestBodyPublisher implements Publisher { /** * State that gets entered when there is no demand. Responds to {@link - * #request(AbstractRequestBodyPublisher, long)} by increasing the demand, + * #request(AbstractListenerReadPublisher, long)} by increasing the demand, * changing state to {@link #DEMAND} and will check whether there * is data available for reading. */ NO_DEMAND { @Override - void request(AbstractRequestBodyPublisher publisher, long n) { + void request(AbstractListenerReadPublisher publisher, long n) { if (Operators.checkRequest(n, publisher.subscriber)) { Operators.addAndGet(publisher.demand, n); if (publisher.changeState(this, DEMAND)) { @@ -245,20 +247,20 @@ public abstract class AbstractRequestBodyPublisher implements Publisher { /** * State that gets entered when there is demand. Responds to - * {@link #onDataAvailable(AbstractRequestBodyPublisher)} by + * {@link #onDataAvailable(AbstractListenerReadPublisher)} by * reading the available data. The state will be changed to * {@link #NO_DEMAND} if there is no demand. */ DEMAND { @Override - void request(AbstractRequestBodyPublisher publisher, long n) { + void request(AbstractListenerReadPublisher publisher, long n) { if (Operators.checkRequest(n, publisher.subscriber)) { Operators.addAndGet(publisher.demand, n); } } @Override - void onDataAvailable(AbstractRequestBodyPublisher publisher) { + void onDataAvailable(AbstractListenerReadPublisher publisher) { if (publisher.changeState(this, READING)) { try { boolean demandAvailable = publisher.readAndPublish(); @@ -279,7 +281,7 @@ public abstract class AbstractRequestBodyPublisher implements Publisher { READING { @Override - void request(AbstractRequestBodyPublisher publisher, long n) { + void request(AbstractListenerReadPublisher publisher, long n) { if (Operators.checkRequest(n, publisher.subscriber)) { Operators.addAndGet(publisher.demand, n); } @@ -291,40 +293,40 @@ public abstract class AbstractRequestBodyPublisher implements Publisher { */ COMPLETED { @Override - void request(AbstractRequestBodyPublisher publisher, long n) { + void request(AbstractListenerReadPublisher publisher, long n) { // ignore } @Override - void cancel(AbstractRequestBodyPublisher publisher) { + void cancel(AbstractListenerReadPublisher publisher) { // ignore } @Override - void onAllDataRead(AbstractRequestBodyPublisher publisher) { + void onAllDataRead(AbstractListenerReadPublisher publisher) { // ignore } @Override - void onError(AbstractRequestBodyPublisher publisher, Throwable t) { + void onError(AbstractListenerReadPublisher publisher, Throwable t) { // ignore } }; - void subscribe(AbstractRequestBodyPublisher publisher, Subscriber subscriber) { + void subscribe(AbstractListenerReadPublisher publisher, Subscriber subscriber) { throw new IllegalStateException(toString()); } - void request(AbstractRequestBodyPublisher publisher, long n) { + void request(AbstractListenerReadPublisher publisher, long n) { throw new IllegalStateException(toString()); } - void cancel(AbstractRequestBodyPublisher publisher) { + void cancel(AbstractListenerReadPublisher publisher) { publisher.changeState(this, COMPLETED); } - void onDataAvailable(AbstractRequestBodyPublisher publisher) { + void onDataAvailable(AbstractListenerReadPublisher publisher) { // ignore } - void onAllDataRead(AbstractRequestBodyPublisher publisher) { + void onAllDataRead(AbstractListenerReadPublisher publisher) { if (publisher.changeState(this, COMPLETED)) { if (publisher.subscriber != null) { publisher.subscriber.onComplete(); @@ -332,7 +334,7 @@ public abstract class AbstractRequestBodyPublisher implements Publisher { } } - void onError(AbstractRequestBodyPublisher publisher, Throwable t) { + void onError(AbstractListenerReadPublisher publisher, Throwable t) { if (publisher.changeState(this, COMPLETED)) { if (publisher.subscriber != null) { publisher.subscriber.onError(t); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java similarity index 84% rename from spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java rename to spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java index a272c62a36e..e4a97040c09 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java @@ -33,16 +33,18 @@ import org.springframework.util.Assert; /** * Abstract base class for {@code Processor} implementations that bridge between - * event-listener APIs and Reactive Streams. Specifically, base class for the - * Servlet 3.1 and Undertow support. + * event-listener write APIs and Reactive Streams. Specifically, base class for + * writing to the HTTP response body with Servlet 3.1 and Undertow support as + * well for writing WebSocket messages with JSR-356, Jetty, and Undertow. * * @author Arjen Poutsma + * @author Violeta Georgieva * @since 5.0 * @see ServletServerHttpRequest * @see UndertowHttpHandlerAdapter * @see ServerHttpResponse#writeWith(Publisher) */ -public abstract class AbstractResponseBodyProcessor implements Processor { +public abstract class AbstractListenerWriteProcessor implements Processor { protected final Log logger = LogFactory.getLog(getClass()); @@ -190,7 +192,7 @@ public abstract class AbstractResponseBodyProcessor implements Processor void onSubscribe(AbstractResponseBodyProcessor processor, Subscription subscription) { + public void onSubscribe(AbstractListenerWriteProcessor processor, Subscription subscription) { Objects.requireNonNull(subscription, "Subscription cannot be null"); if (processor.changeState(this, REQUESTED)) { processor.subscription = subscription; @@ -210,7 +212,7 @@ public abstract class AbstractResponseBodyProcessor implements Processor void onNext(AbstractResponseBodyProcessor processor, T data) { + public void onNext(AbstractListenerWriteProcessor processor, T data) { if (processor.isDataEmpty(data)) { processor.subscription.request(1); } @@ -223,7 +225,7 @@ public abstract class AbstractResponseBodyProcessor implements Processor void onComplete(AbstractResponseBodyProcessor processor) { + public void onComplete(AbstractListenerWriteProcessor processor) { if (processor.changeState(this, COMPLETED)) { processor.resultPublisher.publishComplete(); } @@ -241,7 +243,7 @@ public abstract class AbstractResponseBodyProcessor implements Processor void onWritePossible(AbstractResponseBodyProcessor processor) { + public void onWritePossible(AbstractListenerWriteProcessor processor) { if (processor.changeState(this, WRITING)) { T data = processor.currentData; try { @@ -270,7 +272,7 @@ public abstract class AbstractResponseBodyProcessor implements Processor void onComplete(AbstractResponseBodyProcessor processor) { + public void onComplete(AbstractListenerWriteProcessor processor) { processor.subscriberCompleted = true; } }, @@ -281,7 +283,7 @@ public abstract class AbstractResponseBodyProcessor implements Processor void onComplete(AbstractResponseBodyProcessor processor) { + public void onComplete(AbstractListenerWriteProcessor processor) { processor.subscriberCompleted = true; } }, @@ -291,40 +293,40 @@ public abstract class AbstractResponseBodyProcessor implements Processor void onNext(AbstractResponseBodyProcessor processor, T data) { + public void onNext(AbstractListenerWriteProcessor processor, T data) { // ignore } @Override - public void onError(AbstractResponseBodyProcessor processor, Throwable ex) { + public void onError(AbstractListenerWriteProcessor processor, Throwable ex) { // ignore } @Override - public void onComplete(AbstractResponseBodyProcessor processor) { + public void onComplete(AbstractListenerWriteProcessor processor) { // ignore } }; - public void onSubscribe(AbstractResponseBodyProcessor processor, Subscription subscription) { + public void onSubscribe(AbstractListenerWriteProcessor processor, Subscription subscription) { subscription.cancel(); } - public void onNext(AbstractResponseBodyProcessor processor, T data) { + public void onNext(AbstractListenerWriteProcessor processor, T data) { throw new IllegalStateException(toString()); } - public void onError(AbstractResponseBodyProcessor processor, Throwable ex) { + public void onError(AbstractListenerWriteProcessor processor, Throwable ex) { if (processor.changeState(this, COMPLETED)) { processor.resultPublisher.publishError(ex); } } - public void onComplete(AbstractResponseBodyProcessor processor) { + public void onComplete(AbstractListenerWriteProcessor processor) { throw new IllegalStateException(toString()); } - public void onWritePossible(AbstractResponseBodyProcessor processor) { + public void onWritePossible(AbstractListenerWriteProcessor processor) { // ignore } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java index 58ee2f718bb..08d204fed1a 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java @@ -203,7 +203,7 @@ public class ServletServerHttpRequest extends AbstractServerHttpRequest { } - private static class RequestBodyPublisher extends AbstractRequestBodyPublisher { + private static class RequestBodyPublisher extends AbstractListenerReadPublisher { private final RequestBodyPublisher.RequestBodyReadListener readListener = new RequestBodyPublisher.RequestBodyReadListener(); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index 20cc5a6019c..753f8bb45b8 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -184,7 +184,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } - private class ResponseBodyProcessor extends AbstractResponseBodyProcessor { + private class ResponseBodyProcessor extends AbstractListenerWriteProcessor { private final ServletOutputStream outputStream; diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java index 54dd95af8e3..9065bae6a35 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java @@ -106,7 +106,7 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { return Flux.from(this.body); } - private static class RequestBodyPublisher extends AbstractRequestBodyPublisher { + private static class RequestBodyPublisher extends AbstractListenerReadPublisher { private final ChannelListener readListener = new ReadListener(); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index aaefc898eb2..2f975503718 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -139,7 +139,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon } - private static class ResponseBodyProcessor extends AbstractResponseBodyProcessor { + private static class ResponseBodyProcessor extends AbstractListenerWriteProcessor { private final ChannelListener listener = new WriteListener(); diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisherTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java similarity index 88% rename from spring-web/src/test/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisherTests.java rename to spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java index cda831cc08c..15a663f65f6 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisherTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java @@ -32,12 +32,12 @@ import org.springframework.core.io.buffer.DataBuffer; import static org.junit.Assert.assertTrue; /** - * Unit tests for {@link AbstractRequestBodyPublisher} + * Unit tests for {@link AbstractListenerReadPublisher} * * @author Violeta Georgieva * @since 5.0 */ -public class AbstractRequestBodyPublisherTests { +public class ListenerReadPublisherTests { @Test public void testReceiveTwoRequestCallsWhenOnSubscribe() { @@ -45,14 +45,14 @@ public class AbstractRequestBodyPublisherTests { Subscriber subscriber = mock(Subscriber.class); doAnswer(new SubscriptionAnswer()).when(subscriber).onSubscribe(isA(Subscription.class)); - TestRequestBodyPublisher publisher = new TestRequestBodyPublisher(); + TestListenerReadPublisher publisher = new TestListenerReadPublisher(); publisher.subscribe(subscriber); publisher.onDataAvailable(); assertTrue(publisher.getReadCalls() == 2); } - private static final class TestRequestBodyPublisher extends AbstractRequestBodyPublisher { + private static final class TestListenerReadPublisher extends AbstractListenerReadPublisher { private int readCalls = 0;