Browse Source

AbstractListenerWriteProcessor/AbstractListenerWriteFlushProcessor error handling

When an exception happens while writing/flushing, the exception handling
for Servlet 3.1 based implementation will happen when
WriteListener#onError and AsyncListener#onError events are received
pull/1124/merge
Violeta Georgieva 9 years ago committed by Rossen Stoyanchev
parent
commit
e8d2c6c74b
  1. 12
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java
  2. 12
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java
  3. 28
      spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java
  4. 12
      spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java

12
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java

@ -103,6 +103,14 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -103,6 +103,14 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
}
}
/**
* Invoked when an error happens while flushing. Defaults to no-op.
* Servlet 3.1 based implementations will receive
* {@link AsyncListener#onError(Throwable)} event.
*/
protected void flushingFailed(Throwable t) {
}
/**
* Create a new processor for subscribing to the next flush boundary.
@ -167,8 +175,8 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -167,8 +175,8 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
processor.flush();
}
catch (IOException ex) {
processor.cancel();
processor.onError(ex);
processor.flushingFailed(ex);
return;
}
if (processor.subscriberCompleted) {
if (processor.changeState(this, COMPLETED)) {

12
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java

@ -162,6 +162,15 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -162,6 +162,15 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
protected void writingComplete() {
}
/**
* Invoked when an error happens while writing. Defaults to no-op.
* Servlet 3.1 based implementations will receive
* {@link WriteListener#onError(Throwable)} event.
*/
protected void writingFailed(Throwable t) {
}
private boolean changeState(State oldState, State newState) {
return this.state.compareAndSet(oldState, newState);
@ -276,8 +285,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -276,8 +285,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
}
catch (IOException ex) {
processor.cancel();
processor.onError(ex);
processor.writingFailed(ex);
}
}
}

28
spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java

@ -196,9 +196,11 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons @@ -196,9 +196,11 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
bodyFlushProcessor.cancel();
bodyFlushProcessor.onError(ex);
}
if (bodyProcessor != null) {
bodyProcessor.cancel();
bodyProcessor.onError(ex);
ResponseBodyProcessor processor = bodyProcessor;
if (processor != null) {
processor.cancel();
processor.onError(ex);
}
}
@ -208,9 +210,11 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons @@ -208,9 +210,11 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
bodyFlushProcessor.cancel();
bodyFlushProcessor.onComplete();
}
if (bodyProcessor != null) {
bodyProcessor.cancel();
bodyProcessor.onComplete();
ResponseBodyProcessor processor = bodyProcessor;
if (processor != null) {
processor.cancel();
processor.onComplete();
}
}
}
@ -220,16 +224,18 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons @@ -220,16 +224,18 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
@Override
public void onWritePossible() throws IOException {
if (bodyProcessor != null) {
bodyProcessor.onWritePossible();
ResponseBodyProcessor processor = bodyProcessor;
if (processor != null) {
processor.onWritePossible();
}
}
@Override
public void onError(Throwable ex) {
if (bodyProcessor != null) {
bodyProcessor.cancel();
bodyProcessor.onError(ex);
ResponseBodyProcessor processor = bodyProcessor;
if (processor != null) {
processor.cancel();
processor.onError(ex);
}
}
}

12
spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java

@ -229,6 +229,12 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon @@ -229,6 +229,12 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
this.channel.getWriteSetter().set(null);
this.channel.resumeWrites();
}
@Override
protected void writingFailed(Throwable t) {
cancel();
onError(t);
}
}
@ -248,6 +254,12 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon @@ -248,6 +254,12 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
UndertowServerHttpResponse.this.responseChannel.flush();
}
}
@Override
protected void flushingFailed(Throwable t) {
cancel();
onError(t);
}
}
}

Loading…
Cancel
Save