Browse Source

Polish

pull/1605/head
Rossen Stoyanchev 8 years ago
parent
commit
102a0ad792
  1. 20
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java
  2. 86
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java
  3. 44
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java
  4. 16
      spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java

20
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java

@ -73,18 +73,30 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -73,18 +73,30 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
}
// Methods for sub-classes to delegate to, when async I/O events occur...
// Async I/O notification methods...
/**
* Invoked when reading is possible, either in the same thread after a check
* via {@link #checkOnDataAvailable()}, or as a callback from the underlying
* container.
*/
public final void onDataAvailable() {
this.logger.trace("I/O event onDataAvailable");
this.state.get().onDataAvailable(this);
}
/**
* Sub-classes can call this method to delegate a contain notification when
* all data has been read.
*/
public void onAllDataRead() {
this.logger.trace("I/O event onAllDataRead");
this.state.get().onAllDataRead(this);
}
/**
* Sub-classes can call this to delegate container error notifications.
*/
public final void onError(Throwable ex) {
if (this.logger.isTraceEnabled()) {
this.logger.trace("I/O event onError: " + ex);
@ -93,11 +105,11 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -93,11 +105,11 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
}
// Methods for sub-classes to implement...
// Read API methods to be implemented or template methods to override...
/**
* Check if data is available, calling {@link #onDataAvailable()} either
* immediately or later when reading is possible.
* Check if data is available and either call {@link #onDataAvailable()}
* immediately or schedule a notification.
*/
protected abstract void checkOnDataAvailable();

86
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java

@ -53,7 +53,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -53,7 +53,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
private final WriteResultPublisher resultPublisher = new WriteResultPublisher();
// Subscriber methods and methods to notify of async I/O events...
// Subscriber methods and async I/O notification methods...
@Override
public final void onSubscribe(Subscription subscription) {
@ -67,8 +67,8 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -67,8 +67,8 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
}
/**
* Notify of an error. This can come from the upstream write Publisher or
* from sub-classes as a result of an I/O error.
* Error signal from the upstream, write Publisher. This is also used by
* sub-classes to delegate error notifications from the container.
*/
@Override
public final void onError(Throwable ex) {
@ -79,8 +79,8 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -79,8 +79,8 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
}
/**
* Notify of completion. This can come from the upstream write Publisher or
* from sub-classes as a result of an I/O completion event.
* Completion signal from the upstream, write Publisher. This is also used
* by sub-classes to delegate completion notifications from the container.
*/
@Override
public final void onComplete() {
@ -88,6 +88,19 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -88,6 +88,19 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
this.state.get().onComplete(this);
}
/**
* Invoked when flusing is possible, either in the same thread after a check
* via {@link #isWritePossible()}, or as a callback from the underlying
* container.
*/
protected final void onFlushPossible() {
this.state.get().onFlushPossible(this);
}
/**
* Invoked during an error or completion callback from the underlying
* container to cancel the upstream subscription.
*/
protected void cancel() {
this.logger.trace("Received request to cancel");
if (this.subscription != null) {
@ -95,7 +108,8 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -95,7 +108,8 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
}
}
// Publisher method...
// Publisher implementation for result notifications...
@Override
public final void subscribe(Subscriber<? super Void> subscriber) {
@ -103,26 +117,23 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -103,26 +117,23 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
}
// Methods for sub-classes to implement or override...
// Write API methods to be implemented or template methods to override...
/**
* Create a new processor for subscribing to the next flush boundary.
* Create a new processor for the current flush boundary.
*/
protected abstract Processor<? super T, Void> createWriteProcessor();
/**
* Flush the output if ready, or otherwise {@link #isFlushPending()} should
* return true after that.
* Whether writing/flushing is possible.
*/
protected abstract void flush() throws IOException;
protected abstract boolean isWritePossible();
/**
* Invoked when an error happens while flushing. Defaults to no-op.
* Servlet 3.1 based implementations will receive an
* {@link javax.servlet.AsyncListener#onError} event.
* Flush the output if ready, or otherwise {@link #isFlushPending()} should
* return true after.
*/
protected void flushingFailed(Throwable t) {
}
protected abstract void flush() throws IOException;
/**
* Whether flushing is pending.
@ -130,17 +141,14 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -130,17 +141,14 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
protected abstract boolean isFlushPending();
/**
* Listeners can call this to notify when flushing is possible.
* Invoked when an error happens while flushing. Sub-classes may choose
* to ignore this if they know the underlying API will provide an error
* notification in a container thread.
* <p>Defaults to no-op.
*/
protected final void onFlushPossible() {
this.state.get().onFlushPossible(this);
protected void flushingFailed(Throwable t) {
}
/**
* Whether writing is possible.
*/
protected abstract boolean isWritePossible();
// Private methods for use in State...
@ -167,16 +175,16 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -167,16 +175,16 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
* Represents a state for the {@link Processor} to be in.
*
* <p><pre>
* UNSUBSCRIBED
* |
* v
* +--- REQUESTED <--------> RECEIVED ---+
* | | |
* | | |
* | FLUSHING <------+ |
* | | |
* | v |
* +----------> COMPLETED <--------------+
* UNSUBSCRIBED
* |
* v
* REQUESTED <---> RECEIVED ------+
* | | |
* | v |
* | FLUSHING |
* | | |
* | v |
* +--------> COMPLETED <-----+
* </pre>
*/
private enum State {
@ -269,7 +277,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -269,7 +277,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
processor.state.get().onComplete(processor);
}
}
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor, Publisher<? extends T> publisher) {
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> proc, Publisher<? extends T> pub) {
// ignore
}
@Override
@ -280,7 +288,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -280,7 +288,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
COMPLETED {
@Override
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor, Publisher<? extends T> publisher) {
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> proc, Publisher<? extends T> pub) {
// ignore
}
@Override
@ -294,11 +302,11 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -294,11 +302,11 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
};
public <T> void onSubscribe(AbstractListenerWriteFlushProcessor<T> processor, Subscription subscription) {
public <T> void onSubscribe(AbstractListenerWriteFlushProcessor<T> proc, Subscription subscription) {
subscription.cancel();
}
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor, Publisher<? extends T> publisher) {
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> proc, Publisher<? extends T> pub) {
throw new IllegalStateException(toString());
}
@ -326,7 +334,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -326,7 +334,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
/**
* Subscriber to receive and delegate completion notifications for from
* the current Publisher, i.e. within the current flush boundary.
* the current Publisher, i.e. for the current flush boundary.
*/
private static class WriteResultSubscriber implements Subscriber<Void> {

44
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java

@ -58,7 +58,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -58,7 +58,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
private final WriteResultPublisher resultPublisher = new WriteResultPublisher();
// Subscriber methods and methods to notify of async I/O events...
// Subscriber methods and async I/O notification methods...
@Override
public final void onSubscribe(Subscription subscription) {
@ -72,8 +72,8 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -72,8 +72,8 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
/**
* Notify of an error. This can come from the upstream write Publisher or
* from sub-classes as a result of an I/O error.
* Error signal from the upstream, write Publisher. This is also used by
* sub-classes to delegate error notifications from the container.
*/
@Override
public final void onError(Throwable ex) {
@ -84,8 +84,8 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -84,8 +84,8 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
/**
* Notify of completion. This can come from the upstream write Publisher or
* from sub-classes as a result of an I/O completion event.
* Completion signal from the upstream, write Publisher. This is also used
* by sub-classes to delegate completion notifications from the container.
*/
@Override
public final void onComplete() {
@ -93,11 +93,20 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -93,11 +93,20 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
this.state.get().onComplete(this);
}
/**
* Invoked when writing is possible, either in the same thread after a check
* via {@link #isWritePossible()}, or as a callback from the underlying
* container.
*/
public final void onWritePossible() {
this.logger.trace("Received onWritePossible");
this.state.get().onWritePossible(this);
}
/**
* Invoked during an error or completion callback from the underlying
* container to cancel the upstream subscription.
*/
public void cancel() {
this.logger.trace("Received request to cancel");
if (this.subscription != null) {
@ -105,7 +114,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -105,7 +114,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
}
// Publisher method...
// Publisher implementation for result notifications...
@Override
public final void subscribe(Subscriber<? super Void> subscriber) {
@ -113,7 +122,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -113,7 +122,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
// Methods for sub-classes to implement or override...
// Write API methods to be implemented or template methods to override...
/**
* Whether the given data item has any content to write.
@ -122,8 +131,9 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -122,8 +131,9 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
protected abstract boolean isDataEmpty(T data);
/**
* Called when a data item is received via {@link Subscriber#onNext(Object)}.
* The default implementation saves the data for writing when possible.
* Template method invoked after a data item to write is received via
* {@link Subscriber#onNext(Object)}. The default implementation saves the
* data item for writing once that is possible.
*/
protected void dataReceived(T data) {
if (this.currentData != null) {
@ -151,27 +161,31 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -151,27 +161,31 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
protected abstract boolean write(T data) throws IOException;
/**
* Suspend writing. Defaults to no-op.
* Invoked after the current data has been written and before requesting
* the next item from the upstream, write Publisher.
* <p>The default implementation is a no-op.
*/
protected void suspendWriting() {
}
/**
* Invoked when writing is complete. Defaults to no-op.
* Invoked after onComplete or onError notification.
* <p>The default implementation is a no-op.
*/
protected void writingComplete() {
}
/**
* Invoked when an error happens while writing.
* <p>Defaults to no-op. Servlet 3.1 based implementations will receive
* {@code javax.servlet.WriteListener#onError(Throwable)} event.
* Invoked when an I/O error occurs during a write. Sub-classes may choose
* to ignore this if they know the underlying API will provide an error
* notification in a container thread.
* <p>Defaults to no-op.
*/
protected void writingFailed(Throwable ex) {
}
// Private methods for use in State...
// Private methods for use from State's...
private boolean changeState(State oldState, State newState) {
boolean result = this.state.compareAndSet(oldState, newState);

16
spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java

@ -60,7 +60,7 @@ class WriteResultPublisher implements Publisher<Void> { @@ -60,7 +60,7 @@ class WriteResultPublisher implements Publisher<Void> {
}
/**
* Delegate a completion signal to the subscriber.
* Invoke this to delegate a completion signal to the subscriber.
*/
public void publishComplete() {
if (logger.isTraceEnabled()) {
@ -70,7 +70,7 @@ class WriteResultPublisher implements Publisher<Void> { @@ -70,7 +70,7 @@ class WriteResultPublisher implements Publisher<Void> {
}
/**
* Delegate the given error signal to the subscriber.
* Invoke this to delegate an error signal to the subscriber.
*/
public void publishError(Throwable t) {
if (logger.isTraceEnabled()) {
@ -86,7 +86,7 @@ class WriteResultPublisher implements Publisher<Void> { @@ -86,7 +86,7 @@ class WriteResultPublisher implements Publisher<Void> {
/**
* Subscription to receive and delegate request and cancel signals from the
* suscbriber to this publisher.
* subscriber to this publisher.
*/
private static final class WriteResultSubscription implements Subscription {
@ -224,8 +224,9 @@ class WriteResultPublisher implements Publisher<Void> { @@ -224,8 +224,9 @@ class WriteResultPublisher implements Publisher<Void> {
void publishComplete(WriteResultPublisher publisher) {
if (publisher.changeState(this, COMPLETED)) {
Assert.state(publisher.subscriber != null, "No subscriber");
publisher.subscriber.onComplete();
Subscriber<? super Void> s = publisher.subscriber;
Assert.state(s != null, "No subscriber");
s.onComplete();
}
else {
publisher.state.get().publishComplete(publisher);
@ -234,8 +235,9 @@ class WriteResultPublisher implements Publisher<Void> { @@ -234,8 +235,9 @@ class WriteResultPublisher implements Publisher<Void> {
void publishError(WriteResultPublisher publisher, Throwable t) {
if (publisher.changeState(this, COMPLETED)) {
Assert.state(publisher.subscriber != null, "No subscriber");
publisher.subscriber.onError(t);
Subscriber<? super Void> s = publisher.subscriber;
Assert.state(s != null, "No subscriber");
s.onError(t);
}
else {
publisher.state.get().publishError(publisher, t);

Loading…
Cancel
Save