diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/PayloadMethodArgumentResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/PayloadMethodArgumentResolver.java index b057201230b..53004446d1c 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/PayloadMethodArgumentResolver.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/reactive/PayloadMethodArgumentResolver.java @@ -231,7 +231,7 @@ public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResol Flux> flux = content .filter(this::nonEmptyDataBuffer) .map(buffer -> decoder.decode(buffer, elementType, mimeType, hints)) - .onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex))); + .onErrorMap(ex -> handleReadError(parameter, message, ex)); if (isContentRequired) { flux = flux.switchIfEmpty(Flux.error(() -> handleMissingBody(parameter, message))); } @@ -245,7 +245,7 @@ public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResol Mono> mono = content.next() .filter(this::nonEmptyDataBuffer) .map(buffer -> decoder.decode(buffer, elementType, mimeType, hints)) - .onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex))); + .onErrorMap(ex -> handleReadError(parameter, message, ex)); if (isContentRequired) { mono = mono.switchIfEmpty(Mono.error(() -> handleMissingBody(parameter, message))); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNetty2TcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNetty2TcpClient.java index a333e08e26c..61622a35e9c 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNetty2TcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNetty2TcpClient.java @@ -256,12 +256,12 @@ public class ReactorNetty2TcpClient
implements TcpOperations
{ this.channelGroup.close().addListener(future -> channnelGroupCloseSink.tryEmitEmpty()); result = channnelGroupCloseSink.asMono(); if (this.loopResources != null) { - result = result.onErrorResume(ex -> Mono.empty()).then(this.loopResources.disposeLater()); + result = result.onErrorComplete().then(this.loopResources.disposeLater()); } if (this.poolResources != null) { - result = result.onErrorResume(ex -> Mono.empty()).then(this.poolResources.disposeLater()); + result = result.onErrorComplete().then(this.poolResources.disposeLater()); } - result = result.onErrorResume(ex -> Mono.empty()).then(stopScheduler()); + result = result.onErrorComplete().then(stopScheduler()); } else { result = stopScheduler(); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 5b56c35f7f9..337b7b31135 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -254,12 +254,12 @@ public class ReactorNettyTcpClient
implements TcpOperations
{
if (this.channelGroup != null) {
result = FutureMono.from(this.channelGroup.close());
if (this.loopResources != null) {
- result = result.onErrorResume(ex -> Mono.empty()).then(this.loopResources.disposeLater());
+ result = result.onErrorComplete().then(this.loopResources.disposeLater());
}
if (this.poolResources != null) {
- result = result.onErrorResume(ex -> Mono.empty()).then(this.poolResources.disposeLater());
+ result = result.onErrorComplete().then(this.poolResources.disposeLater());
}
- result = result.onErrorResume(ex -> Mono.empty()).then(stopScheduler());
+ result = result.onErrorComplete().then(stopScheduler());
}
else {
result = stopScheduler();
diff --git a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java
index 38ef488cae5..8537aa4d8d1 100644
--- a/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java
+++ b/spring-r2dbc/src/main/java/org/springframework/r2dbc/connection/R2dbcTransactionManager.java
@@ -228,8 +228,8 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
.then(Mono.error(ex));
}
return Mono.error(ex);
- })).onErrorResume(ex -> Mono.error(new CannotCreateTransactionException(
- "Could not open R2DBC Connection for transaction", ex)));
+ })).onErrorMap(ex -> new CannotCreateTransactionException(
+ "Could not open R2DBC Connection for transaction", ex));
}).then();
}
diff --git a/spring-r2dbc/src/test/java/org/springframework/r2dbc/core/AbstractDatabaseClientIntegrationTests.java b/spring-r2dbc/src/test/java/org/springframework/r2dbc/core/AbstractDatabaseClientIntegrationTests.java
index d25e21cb8b4..7fdde3c3bea 100644
--- a/spring-r2dbc/src/test/java/org/springframework/r2dbc/core/AbstractDatabaseClientIntegrationTests.java
+++ b/spring-r2dbc/src/test/java/org/springframework/r2dbc/core/AbstractDatabaseClientIntegrationTests.java
@@ -50,7 +50,7 @@ abstract class AbstractDatabaseClientIntegrationTests {
Mono.from(connectionFactory.create())
.flatMapMany(connection -> Flux.from(connection.createStatement("DROP TABLE legoset").execute())
.flatMap(Result::getRowsUpdated)
- .onErrorResume(e -> Mono.empty())
+ .onErrorComplete()
.thenMany(connection.createStatement(getCreateTableStatement()).execute())
.flatMap(Result::getRowsUpdated).thenMany(connection.close())).as(StepVerifier::create)
.verifyComplete();
diff --git a/spring-r2dbc/src/test/java/org/springframework/r2dbc/core/AbstractTransactionalDatabaseClientIntegrationTests.java b/spring-r2dbc/src/test/java/org/springframework/r2dbc/core/AbstractTransactionalDatabaseClientIntegrationTests.java
index 8bcb4c3f49c..8973f553a2c 100644
--- a/spring-r2dbc/src/test/java/org/springframework/r2dbc/core/AbstractTransactionalDatabaseClientIntegrationTests.java
+++ b/spring-r2dbc/src/test/java/org/springframework/r2dbc/core/AbstractTransactionalDatabaseClientIntegrationTests.java
@@ -69,7 +69,7 @@ abstract class AbstractTransactionalDatabaseClientIntegrationTests {
Mono.from(connectionFactory.create())
.flatMapMany(connection -> Flux.from(connection.createStatement("DROP TABLE legoset").execute())
.flatMap(Result::getRowsUpdated)
- .onErrorResume(e -> Mono.empty())
+ .onErrorComplete()
.thenMany(connection.createStatement(getCreateTableStatement()).execute())
.flatMap(Result::getRowsUpdated).thenMany(connection.close())).as(StepVerifier::create).verifyComplete();
diff --git a/spring-web/src/main/java/org/springframework/http/codec/ResourceHttpMessageWriter.java b/spring-web/src/main/java/org/springframework/http/codec/ResourceHttpMessageWriter.java
index 5ab892c8d93..f1517acfda0 100644
--- a/spring-web/src/main/java/org/springframework/http/codec/ResourceHttpMessageWriter.java
+++ b/spring-web/src/main/java/org/springframework/http/codec/ResourceHttpMessageWriter.java
@@ -191,7 +191,7 @@ public class ResourceHttpMessageWriter implements HttpMessageWriter