From 9d03b77cdc0dffa716b3f945814b48142992f4f0 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Fri, 9 Dec 2016 20:57:51 +0200 Subject: [PATCH] Rename "ResponseBody" flush processor + use generics Issue: SPR-14527 --- ...va => AbstractListenerFlushProcessor.java} | 44 +++++++++---------- .../AbstractListenerWriteProcessor.java | 2 +- .../reactive/ServletServerHttpResponse.java | 2 +- .../reactive/UndertowServerHttpResponse.java | 2 +- ...blisher.java => WriteResultPublisher.java} | 39 ++++++++-------- 5 files changed, 44 insertions(+), 45 deletions(-) rename spring-web/src/main/java/org/springframework/http/server/reactive/{AbstractResponseBodyFlushProcessor.java => AbstractListenerFlushProcessor.java} (74%) rename spring-web/src/main/java/org/springframework/http/server/reactive/{ResponseBodyWriteResultPublisher.java => WriteResultPublisher.java} (75%) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerFlushProcessor.java similarity index 74% rename from spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java rename to spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerFlushProcessor.java index 56215d668a7..b4d259e943e 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractResponseBodyFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerFlushProcessor.java @@ -27,8 +27,6 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import org.springframework.core.io.buffer.DataBuffer; - /** * Abstract base class for {@code Processor} implementations that bridge between * event-listener APIs and Reactive Streams. Specifically, base class for the @@ -41,11 +39,11 @@ import org.springframework.core.io.buffer.DataBuffer; * @see UndertowHttpHandlerAdapter * @see ServerHttpResponse#writeAndFlushWith(Publisher) */ -abstract class AbstractResponseBodyFlushProcessor implements Processor, Void> { +public abstract class AbstractListenerFlushProcessor implements Processor, Void> { protected final Log logger = LogFactory.getLog(getClass()); - private final ResponseBodyWriteResultPublisher resultPublisher = new ResponseBodyWriteResultPublisher(); + private final WriteResultPublisher resultPublisher = new WriteResultPublisher(); private final AtomicReference state = new AtomicReference<>(State.UNSUBSCRIBED); @@ -65,7 +63,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor publisher) { + public final void onNext(Publisher publisher) { if (logger.isTraceEnabled()) { logger.trace(this.state + " onNext: " + publisher); } @@ -100,7 +98,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor createBodyProcessor(); + protected abstract Processor createBodyProcessor(); /** * Flushes the output. @@ -130,7 +128,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor void onSubscribe(AbstractListenerFlushProcessor processor, Subscription subscription) { Objects.requireNonNull(subscription, "Subscription cannot be null"); if (processor.changeState(this, REQUESTED)) { processor.subscription = subscription; @@ -144,16 +142,16 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor chunk) { + public void onNext(AbstractListenerFlushProcessor processor, Publisher chunk) { if (processor.changeState(this, RECEIVED)) { - Processor chunkProcessor = processor.createBodyProcessor(); + Processor chunkProcessor = processor.createBodyProcessor(); chunk.subscribe(chunkProcessor); chunkProcessor.subscribe(new WriteSubscriber(processor)); } } @Override - public void onComplete(AbstractResponseBodyFlushProcessor processor) { + public void onComplete(AbstractListenerFlushProcessor processor) { if (processor.changeState(this, COMPLETED)) { processor.resultPublisher.publishComplete(); } @@ -162,7 +160,7 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor void writeComplete(AbstractListenerFlushProcessor processor) { try { processor.flush(); } @@ -184,58 +182,58 @@ abstract class AbstractResponseBodyFlushProcessor implements Processor void onComplete(AbstractListenerFlushProcessor processor) { processor.subscriberCompleted = true; } }, COMPLETED { @Override - public void onNext(AbstractResponseBodyFlushProcessor processor, - Publisher publisher) { + public void onNext(AbstractListenerFlushProcessor processor, + Publisher publisher) { // ignore } @Override - public void onError(AbstractResponseBodyFlushProcessor processor, Throwable t) { + public void onError(AbstractListenerFlushProcessor processor, Throwable t) { // ignore } @Override - public void onComplete(AbstractResponseBodyFlushProcessor processor) { + public void onComplete(AbstractListenerFlushProcessor processor) { // ignore } }; - public void onSubscribe(AbstractResponseBodyFlushProcessor processor, Subscription subscription) { + public void onSubscribe(AbstractListenerFlushProcessor processor, Subscription subscription) { subscription.cancel(); } - public void onNext(AbstractResponseBodyFlushProcessor processor, Publisher publisher) { + public void onNext(AbstractListenerFlushProcessor processor, Publisher publisher) { throw new IllegalStateException(toString()); } - public void onError(AbstractResponseBodyFlushProcessor processor, Throwable ex) { + public void onError(AbstractListenerFlushProcessor processor, Throwable ex) { if (processor.changeState(this, COMPLETED)) { processor.resultPublisher.publishError(ex); } } - public void onComplete(AbstractResponseBodyFlushProcessor processor) { + public void onComplete(AbstractListenerFlushProcessor processor) { throw new IllegalStateException(toString()); } - public void writeComplete(AbstractResponseBodyFlushProcessor processor) { + public void writeComplete(AbstractListenerFlushProcessor processor) { // ignore } private static class WriteSubscriber implements Subscriber { - private final AbstractResponseBodyFlushProcessor processor; + private final AbstractListenerFlushProcessor processor; - public WriteSubscriber(AbstractResponseBodyFlushProcessor processor) { + public WriteSubscriber(AbstractListenerFlushProcessor processor) { this.processor = processor; } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java index e4a97040c09..fb3b12a5fca 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java @@ -48,7 +48,7 @@ public abstract class AbstractListenerWriteProcessor implements Processor state = new AtomicReference<>(State.UNSUBSCRIBED); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index 753f8bb45b8..2d403add8f3 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -273,7 +273,7 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons } - private class ResponseBodyFlushProcessor extends AbstractResponseBodyFlushProcessor { + private class ResponseBodyFlushProcessor extends AbstractListenerFlushProcessor { @Override protected Processor createBodyProcessor() { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index 2f975503718..0e1c950c21d 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -218,7 +218,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon } } - private class ResponseBodyFlushProcessor extends AbstractResponseBodyFlushProcessor { + private class ResponseBodyFlushProcessor extends AbstractListenerFlushProcessor { @Override protected Processor createBodyProcessor() { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ResponseBodyWriteResultPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java similarity index 75% rename from spring-web/src/main/java/org/springframework/http/server/reactive/ResponseBodyWriteResultPublisher.java rename to spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java index 4a59fdcee9e..f239b9426b2 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ResponseBodyWriteResultPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java @@ -30,11 +30,12 @@ import reactor.core.publisher.Operators; * Publisher returned from {@link ServerHttpResponse#writeWith(Publisher)}. * * @author Arjen Poutsma + * @author Violeta Georgieva * @since 5.0 */ -class ResponseBodyWriteResultPublisher implements Publisher { +class WriteResultPublisher implements Publisher { - private static final Log logger = LogFactory.getLog(ResponseBodyWriteResultPublisher.class); + private static final Log logger = LogFactory.getLog(WriteResultPublisher.class); private final AtomicReference state = new AtomicReference<>(State.UNSUBSCRIBED); @@ -80,9 +81,9 @@ class ResponseBodyWriteResultPublisher implements Publisher { private static final class ResponseBodyWriteResultSubscription implements Subscription { - private final ResponseBodyWriteResultPublisher publisher; + private final WriteResultPublisher publisher; - public ResponseBodyWriteResultSubscription(ResponseBodyWriteResultPublisher publisher) { + public ResponseBodyWriteResultSubscription(WriteResultPublisher publisher) { this.publisher = publisher; } @@ -112,7 +113,7 @@ class ResponseBodyWriteResultPublisher implements Publisher { UNSUBSCRIBED { @Override - void subscribe(ResponseBodyWriteResultPublisher publisher, + void subscribe(WriteResultPublisher publisher, Subscriber subscriber) { Objects.requireNonNull(subscriber); if (publisher.changeState(this, SUBSCRIBED)) { @@ -132,28 +133,28 @@ class ResponseBodyWriteResultPublisher implements Publisher { } } @Override - void publishComplete(ResponseBodyWriteResultPublisher publisher) { + void publishComplete(WriteResultPublisher publisher) { publisher.publisherCompleted = true; } @Override - void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) { + void publishError(WriteResultPublisher publisher, Throwable t) { publisher.publisherError = t; } }, SUBSCRIBED { @Override - void request(ResponseBodyWriteResultPublisher publisher, long n) { + void request(WriteResultPublisher publisher, long n) { Operators.checkRequest(n, publisher.subscriber); } @Override - void publishComplete(ResponseBodyWriteResultPublisher publisher) { + void publishComplete(WriteResultPublisher publisher) { if (publisher.changeState(this, COMPLETED)) { publisher.subscriber.onComplete(); } } @Override - void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) { + void publishError(WriteResultPublisher publisher, Throwable t) { if (publisher.changeState(this, COMPLETED)) { publisher.subscriber.onError(t); } @@ -162,40 +163,40 @@ class ResponseBodyWriteResultPublisher implements Publisher { COMPLETED { @Override - void request(ResponseBodyWriteResultPublisher publisher, long n) { + void request(WriteResultPublisher publisher, long n) { // ignore } @Override - void cancel(ResponseBodyWriteResultPublisher publisher) { + void cancel(WriteResultPublisher publisher) { // ignore } @Override - void publishComplete(ResponseBodyWriteResultPublisher publisher) { + void publishComplete(WriteResultPublisher publisher) { // ignore } @Override - void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) { + void publishError(WriteResultPublisher publisher, Throwable t) { // ignore } }; - void subscribe(ResponseBodyWriteResultPublisher publisher, Subscriber subscriber) { + void subscribe(WriteResultPublisher publisher, Subscriber subscriber) { throw new IllegalStateException(toString()); } - void request(ResponseBodyWriteResultPublisher publisher, long n) { + void request(WriteResultPublisher publisher, long n) { throw new IllegalStateException(toString()); } - void cancel(ResponseBodyWriteResultPublisher publisher) { + void cancel(WriteResultPublisher publisher) { publisher.changeState(this, COMPLETED); } - void publishComplete(ResponseBodyWriteResultPublisher publisher) { + void publishComplete(WriteResultPublisher publisher) { throw new IllegalStateException(toString()); } - void publishError(ResponseBodyWriteResultPublisher publisher, Throwable t) { + void publishError(WriteResultPublisher publisher, Throwable t) { throw new IllegalStateException(toString()); } }