From 673f83e388e5ee992340a623de5a248e95b94c34 Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Tue, 4 Aug 2020 11:41:30 +0200 Subject: [PATCH] Adapt to API changes in the latest reactor snapshot --- ...ractRoutingConnectionFactoryUnitTests.java | 20 +++++++++++-------- .../interceptor/TransactionAspectSupport.java | 8 ++++---- .../reactive/TransactionContextManager.java | 10 ++++------ .../reactive/TransactionalOperatorImpl.java | 8 ++++---- .../ServerWebExchangeContextFilter.java | 2 +- 5 files changed, 25 insertions(+), 23 deletions(-) diff --git a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/lookup/AbstractRoutingConnectionFactoryUnitTests.java b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/lookup/AbstractRoutingConnectionFactoryUnitTests.java index 233e22dbc5b..43ea3141a14 100644 --- a/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/lookup/AbstractRoutingConnectionFactoryUnitTests.java +++ b/spring-r2dbc/src/test/java/org/springframework/r2dbc/connection/lookup/AbstractRoutingConnectionFactoryUnitTests.java @@ -63,7 +63,7 @@ public class AbstractRoutingConnectionFactoryUnitTests { connectionFactory.afterPropertiesSet(); connectionFactory.determineTargetConnectionFactory() - .subscriberContext(Context.of(ROUTING_KEY, "key")) + .contextWrite(Context.of(ROUTING_KEY, "key")) .as(StepVerifier::create) .expectNext(routedConnectionFactory) .verifyComplete(); @@ -109,7 +109,7 @@ public class AbstractRoutingConnectionFactoryUnitTests { connectionFactory.afterPropertiesSet(); connectionFactory.determineTargetConnectionFactory() - .subscriberContext(Context.of(ROUTING_KEY, "unknown")) + .contextWrite(Context.of(ROUTING_KEY, "unknown")) .as(StepVerifier::create) .verifyError(IllegalStateException.class); } @@ -122,7 +122,7 @@ public class AbstractRoutingConnectionFactoryUnitTests { connectionFactory.afterPropertiesSet(); connectionFactory.determineTargetConnectionFactory() - .subscriberContext(Context.of(ROUTING_KEY, "unknown")) + .contextWrite(Context.of(ROUTING_KEY, "unknown")) .as(StepVerifier::create) .expectNext(defaultConnectionFactory) .verifyComplete(); @@ -153,7 +153,7 @@ public class AbstractRoutingConnectionFactoryUnitTests { connectionFactory.afterPropertiesSet(); connectionFactory.determineTargetConnectionFactory() - .subscriberContext(Context.of(ROUTING_KEY, "my-key")) + .contextWrite(Context.of(ROUTING_KEY, "my-key")) .as(StepVerifier::create) .expectNext(routedConnectionFactory) .verifyComplete(); @@ -168,7 +168,7 @@ public class AbstractRoutingConnectionFactoryUnitTests { connectionFactory.afterPropertiesSet(); connectionFactory.determineTargetConnectionFactory() - .subscriberContext(Context.of(ROUTING_KEY, "lookup-key")) + .contextWrite(Context.of(ROUTING_KEY, "lookup-key")) .as(StepVerifier::create) .expectNext(defaultConnectionFactory) .verifyComplete(); @@ -177,7 +177,7 @@ public class AbstractRoutingConnectionFactoryUnitTests { connectionFactory.afterPropertiesSet(); connectionFactory.determineTargetConnectionFactory() - .subscriberContext(Context.of(ROUTING_KEY, "lookup-key")) + .contextWrite(Context.of(ROUTING_KEY, "lookup-key")) .as(StepVerifier::create) .expectNext(routedConnectionFactory) .verifyComplete(); @@ -187,8 +187,12 @@ public class AbstractRoutingConnectionFactoryUnitTests { @Override protected Mono determineCurrentLookupKey() { - return Mono.subscriberContext().filter(context -> context.hasKey(ROUTING_KEY)) - .map(context -> context.get(ROUTING_KEY)); + return Mono.deferContextual(context -> { + if (context.hasKey(ROUTING_KEY)) { + return Mono.just(context.get(ROUTING_KEY)); + } + return Mono.empty(); + }); } } diff --git a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java index cbcfb6cc85b..64f15f234c3 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java +++ b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java @@ -887,8 +887,8 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init // target invocation exception return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)); } - })).subscriberContext(TransactionContextManager.getOrCreateContext()) - .subscriberContext(TransactionContextManager.getOrCreateContextHolder()); + })).contextWrite(TransactionContextManager.getOrCreateContext()) + .contextWrite(TransactionContextManager.getOrCreateContextHolder()); } // Any other reactive type, typically a Flux @@ -917,8 +917,8 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init // target invocation exception return completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)); } - })).subscriberContext(TransactionContextManager.getOrCreateContext()) - .subscriberContext(TransactionContextManager.getOrCreateContextHolder())); + })).contextWrite(TransactionContextManager.getOrCreateContext()) + .contextWrite(TransactionContextManager.getOrCreateContextHolder())); } @SuppressWarnings("serial") diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java index e3df7dbdb9f..0a2cdda01a8 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java @@ -50,19 +50,17 @@ public abstract class TransactionContextManager { * or no context found in a holder */ public static Mono currentContext() throws NoTransactionException { - return Mono.subscriberContext().handle((ctx, sink) -> { + return Mono.deferContextual(ctx -> { if (ctx.hasKey(TransactionContext.class)) { - sink.next(ctx.get(TransactionContext.class)); - return; + return Mono.just(ctx.get(TransactionContext.class)); } if (ctx.hasKey(TransactionContextHolder.class)) { TransactionContextHolder holder = ctx.get(TransactionContextHolder.class); if (holder.hasContext()) { - sink.next(holder.currentContext()); - return; + return Mono.just(holder.currentContext()); } } - sink.error(new NoTransactionInContextException()); + return Mono.error(new NoTransactionInContextException()); }); } diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java index 7dc3de8842e..ece11c5ec1f 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java @@ -82,8 +82,8 @@ final class TransactionalOperatorImpl implements TransactionalOperator { this.transactionManager::commit, (res, err) -> Mono.empty(), this.transactionManager::rollback) .onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex)))); }) - .subscriberContext(TransactionContextManager.getOrCreateContext()) - .subscriberContext(TransactionContextManager.getOrCreateContextHolder()); + .contextWrite(TransactionContextManager.getOrCreateContext()) + .contextWrite(TransactionContextManager.getOrCreateContextHolder()); } @Override @@ -104,8 +104,8 @@ final class TransactionalOperatorImpl implements TransactionalOperator { .onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex)))); }) - .subscriberContext(TransactionContextManager.getOrCreateContext()) - .subscriberContext(TransactionContextManager.getOrCreateContextHolder()); + .contextWrite(TransactionContextManager.getOrCreateContext()) + .contextWrite(TransactionContextManager.getOrCreateContextHolder()); } /** diff --git a/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerWebExchangeContextFilter.java b/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerWebExchangeContextFilter.java index 094a6f65294..8d47f89efe6 100644 --- a/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerWebExchangeContextFilter.java +++ b/spring-web/src/main/java/org/springframework/web/filter/reactive/ServerWebExchangeContextFilter.java @@ -47,7 +47,7 @@ public class ServerWebExchangeContextFilter implements WebFilter { @Override public Mono filter(ServerWebExchange exchange, WebFilterChain chain) { return chain.filter(exchange) - .subscriberContext(cxt -> cxt.put(EXCHANGE_CONTEXT_ATTRIBUTE, exchange)); + .contextWrite(cxt -> cxt.put(EXCHANGE_CONTEXT_ATTRIBUTE, exchange)); }