@ -88,8 +88,7 @@ class R2dbcTransactionManagerUnitTests {
@@ -88,8 +88,7 @@ class R2dbcTransactionManagerUnitTests {
ConnectionFactoryUtils . getConnection ( connectionFactoryMock )
. flatMap ( connection - > TransactionSynchronizationManager . forCurrentTransaction ( )
. doOnNext ( synchronizationManager - > synchronizationManager . registerSynchronization (
sync ) ) )
. doOnNext ( synchronizationManager - > synchronizationManager . registerSynchronization ( sync ) ) )
. as ( operator : : transactional )
. as ( StepVerifier : : create )
. expectNextCount ( 1 )
@ -120,12 +119,11 @@ class R2dbcTransactionManagerUnitTests {
@@ -120,12 +119,11 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
ConnectionFactoryUtils . getConnection ( connectionFactoryMock ) . as (
operator : : transactional )
ConnectionFactoryUtils . getConnection ( connectionFactoryMock )
. as ( operator : : transactional )
. as ( StepVerifier : : create )
. expectErrorSatisfies ( actual - > assertThat ( actual ) . isInstanceOf (
CannotCreateTransactionException . class ) . hasCauseInstanceOf (
R2dbcBadGrammarException . class ) )
CannotCreateTransactionException . class ) . hasCauseInstanceOf ( R2dbcBadGrammarException . class ) )
. verify ( ) ;
}
@ -141,8 +139,8 @@ class R2dbcTransactionManagerUnitTests {
@@ -141,8 +139,8 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
ConnectionFactoryUtils . getConnection ( connectionFactoryMock ) . as (
operator : : transactional )
ConnectionFactoryUtils . getConnection ( connectionFactoryMock )
. as ( operator : : transactional )
. as ( StepVerifier : : create )
. expectNextCount ( 1 )
. verifyComplete ( ) ;
@ -171,8 +169,8 @@ class R2dbcTransactionManagerUnitTests {
@@ -171,8 +169,8 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
ConnectionFactoryUtils . getConnection ( connectionFactoryMock ) . as (
operator : : transactional )
ConnectionFactoryUtils . getConnection ( connectionFactoryMock )
. as ( operator : : transactional )
. as ( StepVerifier : : create )
. expectNextCount ( 1 )
. verifyComplete ( ) ;
@ -190,8 +188,8 @@ class R2dbcTransactionManagerUnitTests {
@@ -190,8 +188,8 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
ConnectionFactoryUtils . getConnection ( connectionFactoryMock ) . as (
operator : : transactional )
ConnectionFactoryUtils . getConnection ( connectionFactoryMock )
. as ( operator : : transactional )
. as ( StepVerifier : : create )
. expectNextCount ( 1 )
. verifyComplete ( ) ;
@ -215,8 +213,8 @@ class R2dbcTransactionManagerUnitTests {
@@ -215,8 +213,8 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
ConnectionFactoryUtils . getConnection ( connectionFactoryMock ) . as (
operator : : transactional )
ConnectionFactoryUtils . getConnection ( connectionFactoryMock )
. as ( operator : : transactional )
. as ( StepVerifier : : create )
. expectNextCount ( 1 )
. verifyComplete ( ) ;
@ -262,11 +260,9 @@ class R2dbcTransactionManagerUnitTests {
@@ -262,11 +260,9 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator . create ( tm ) ;
ConnectionFactoryUtils . getConnection ( connectionFactoryMock )
. doOnNext ( connection - > {
throw new IllegalStateException ( ) ;
} ) . as ( operator : : transactional )
. as ( StepVerifier : : create )
. verifyError ( IllegalStateException . class ) ;
. doOnNext ( connection - > { throw new IllegalStateException ( ) ; } )
. as ( operator : : transactional )
. as ( StepVerifier : : create ) . verifyError ( IllegalStateException . class ) ;
assertThat ( commits ) . hasValue ( 0 ) ;
assertThat ( rollbacks ) . hasValue ( 1 ) ;
@ -286,8 +282,7 @@ class R2dbcTransactionManagerUnitTests {
@@ -286,8 +282,7 @@ class R2dbcTransactionManagerUnitTests {
reactiveTransaction . setRollbackOnly ( ) ;
return ConnectionFactoryUtils . getConnection ( connectionFactoryMock )
. doOnNext ( connection - > connection . createStatement ( "foo" ) ) . then ( ) ;
} ) . as ( StepVerifier : : create )
. verifyError ( BadSqlGrammarException . class ) ;
} ) . as ( StepVerifier : : create ) . verifyError ( BadSqlGrammarException . class ) ;
verify ( connectionMock ) . beginTransaction ( any ( io . r2dbc . spi . TransactionDefinition . class ) ) ;
verify ( connectionMock ) . createStatement ( "foo" ) ;
@ -308,7 +303,7 @@ class R2dbcTransactionManagerUnitTests {
@@ -308,7 +303,7 @@ class R2dbcTransactionManagerUnitTests {
. doOnNext ( connection - > {
throw new IllegalStateException ( "Intentional error to trigger rollback" ) ;
} ) . then ( ) ) . as ( StepVerifier : : create )
. verifyErrorSatisfies ( e - > assertThat ( e )
. verifyErrorSatisfies ( ex - > assertThat ( ex )
. isInstanceOf ( BadSqlGrammarException . class )
. hasCause ( new R2dbcBadGrammarException ( "Rollback should fail" ) )
) ;
@ -334,8 +329,7 @@ class R2dbcTransactionManagerUnitTests {
@@ -334,8 +329,7 @@ class R2dbcTransactionManagerUnitTests {
assertThat ( synchronizationManager . hasResource ( connectionFactoryMock ) ) . isTrue ( ) ;
synchronizationManager . registerSynchronization ( sync ) ;
} ) . then ( ) ;
} ) . as ( StepVerifier : : create )
. verifyComplete ( ) ;
} ) . as ( StepVerifier : : create ) . verifyComplete ( ) ;
verify ( connectionMock ) . beginTransaction ( any ( io . r2dbc . spi . TransactionDefinition . class ) ) ;
verify ( connectionMock ) . rollbackTransaction ( ) ;
@ -363,8 +357,7 @@ class R2dbcTransactionManagerUnitTests {
@@ -363,8 +357,7 @@ class R2dbcTransactionManagerUnitTests {
fail ( "Should have thrown IllegalTransactionStateException" ) ;
return Mono . empty ( ) ;
} ) ;
} ) . as ( StepVerifier : : create )
. verifyError ( IllegalTransactionStateException . class ) ;
} ) . as ( StepVerifier : : create ) . verifyError ( IllegalTransactionStateException . class ) ;
verify ( connectionMock ) . rollbackTransaction ( ) ;
verify ( connectionMock ) . close ( ) ;
@ -381,14 +374,13 @@ class R2dbcTransactionManagerUnitTests {
@@ -381,14 +374,13 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
operator . execute ( tx1 - > {
assertThat ( tx1 . isNewTransaction ( ) ) . isTrue ( ) ;
definition . setPropagationBehavior ( TransactionDefinition . PROPAGATION_NESTED ) ;
return operator . execute ( tx2 - > {
assertThat ( tx2 . isNewTransaction ( ) ) . isTrue ( ) ;
return Mono . empty ( ) ;
} ) ;
} ) . as ( StepVerifier : : create )
. verifyComplete ( ) ;
assertThat ( tx1 . isNewTransaction ( ) ) . isTrue ( ) ;
definition . setPropagationBehavior ( TransactionDefinition . PROPAGATION_NESTED ) ;
return operator . execute ( tx2 - > {
assertThat ( tx2 . isNewTransaction ( ) ) . isTrue ( ) ;
return Mono . empty ( ) ;
} ) ;
} ) . as ( StepVerifier : : create ) . verifyComplete ( ) ;
verify ( connectionMock ) . createSavepoint ( "SAVEPOINT_1" ) ;
verify ( connectionMock ) . releaseSavepoint ( "SAVEPOINT_1" ) ;
@ -407,15 +399,14 @@ class R2dbcTransactionManagerUnitTests {
@@ -407,15 +399,14 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
operator . execute ( tx1 - > {
assertThat ( tx1 . isNewTransaction ( ) ) . isTrue ( ) ;
definition . setPropagationBehavior ( TransactionDefinition . PROPAGATION_NESTED ) ;
return operator . execute ( tx2 - > {
assertThat ( tx2 . isNewTransaction ( ) ) . isTrue ( ) ;
tx2 . setRollbackOnly ( ) ;
return Mono . empty ( ) ;
} ) ;
} ) . as ( StepVerifier : : create )
. verifyComplete ( ) ;
assertThat ( tx1 . isNewTransaction ( ) ) . isTrue ( ) ;
definition . setPropagationBehavior ( TransactionDefinition . PROPAGATION_NESTED ) ;
return operator . execute ( tx2 - > {
assertThat ( tx2 . isNewTransaction ( ) ) . isTrue ( ) ;
tx2 . setRollbackOnly ( ) ;
return Mono . empty ( ) ;
} ) ;
} ) . as ( StepVerifier : : create ) . verifyComplete ( ) ;
verify ( connectionMock ) . createSavepoint ( "SAVEPOINT_1" ) ;
verify ( connectionMock ) . rollbackTransactionToSavepoint ( "SAVEPOINT_1" ) ;
@ -432,16 +423,15 @@ class R2dbcTransactionManagerUnitTests {
@@ -432,16 +423,15 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
operator . execute ( tx1 - > {
assertThat ( tx1 . isNewTransaction ( ) ) . isFalse ( ) ;
DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition ( ) ;
innerDef . setPropagationBehavior ( TransactionDefinition . PROPAGATION_NESTED ) ;
TransactionalOperator inner = TransactionalOperator . create ( tm , innerDef ) ;
return inner . execute ( tx2 - > {
assertThat ( tx2 . isNewTransaction ( ) ) . isTrue ( ) ;
return Mono . empty ( ) ;
} ) ;
} ) . as ( StepVerifier : : create )
. verifyComplete ( ) ;
assertThat ( tx1 . isNewTransaction ( ) ) . isFalse ( ) ;
DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition ( ) ;
innerDef . setPropagationBehavior ( TransactionDefinition . PROPAGATION_NESTED ) ;
TransactionalOperator inner = TransactionalOperator . create ( tm , innerDef ) ;
return inner . execute ( tx2 - > {
assertThat ( tx2 . isNewTransaction ( ) ) . isTrue ( ) ;
return Mono . empty ( ) ;
} ) ;
} ) . as ( StepVerifier : : create ) . verifyComplete ( ) ;
verify ( connectionMock ) . commitTransaction ( ) ;
verify ( connectionMock ) . close ( ) ;
@ -456,17 +446,16 @@ class R2dbcTransactionManagerUnitTests {
@@ -456,17 +446,16 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
operator . execute ( tx1 - > {
assertThat ( tx1 . isNewTransaction ( ) ) . isFalse ( ) ;
DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition ( ) ;
innerDef . setPropagationBehavior ( TransactionDefinition . PROPAGATION_NESTED ) ;
TransactionalOperator inner = TransactionalOperator . create ( tm , innerDef ) ;
return inner . execute ( tx2 - > {
assertThat ( tx2 . isNewTransaction ( ) ) . isTrue ( ) ;
tx2 . setRollbackOnly ( ) ;
return Mono . empty ( ) ;
} ) ;
} ) . as ( StepVerifier : : create )
. verifyComplete ( ) ;
assertThat ( tx1 . isNewTransaction ( ) ) . isFalse ( ) ;
DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition ( ) ;
innerDef . setPropagationBehavior ( TransactionDefinition . PROPAGATION_NESTED ) ;
TransactionalOperator inner = TransactionalOperator . create ( tm , innerDef ) ;
return inner . execute ( tx2 - > {
assertThat ( tx2 . isNewTransaction ( ) ) . isTrue ( ) ;
tx2 . setRollbackOnly ( ) ;
return Mono . empty ( ) ;
} ) ;
} ) . as ( StepVerifier : : create ) . verifyComplete ( ) ;
verify ( connectionMock ) . rollbackTransaction ( ) ;
verify ( connectionMock ) . close ( ) ;
@ -489,8 +478,7 @@ class R2dbcTransactionManagerUnitTests {
@@ -489,8 +478,7 @@ class R2dbcTransactionManagerUnitTests {
assertThat ( tx2 . isNewTransaction ( ) ) . isTrue ( ) ;
return Mono . empty ( ) ;
} ) ;
} ) . as ( StepVerifier : : create )
. verifyComplete ( ) ;
} ) . as ( StepVerifier : : create ) . verifyComplete ( ) ;
verify ( connectionMock ) . commitTransaction ( ) ;
verify ( connectionMock ) . close ( ) ;
@ -505,17 +493,16 @@ class R2dbcTransactionManagerUnitTests {
@@ -505,17 +493,16 @@ class R2dbcTransactionManagerUnitTests {
TransactionalOperator operator = TransactionalOperator . create ( tm , definition ) ;
operator . execute ( tx1 - > {
assertThat ( tx1 . isNewTransaction ( ) ) . isFalse ( ) ;
DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition ( ) ;
innerDef . setPropagationBehavior ( TransactionDefinition . PROPAGATION_REQUIRES_NEW ) ;
TransactionalOperator inner = TransactionalOperator . create ( tm , innerDef ) ;
return inner . execute ( tx2 - > {
assertThat ( tx2 . isNewTransaction ( ) ) . isTrue ( ) ;
tx2 . setRollbackOnly ( ) ;
return Mono . empty ( ) ;
} ) ;
} ) . as ( StepVerifier : : create )
. verifyComplete ( ) ;
assertThat ( tx1 . isNewTransaction ( ) ) . isFalse ( ) ;
DefaultTransactionDefinition innerDef = new DefaultTransactionDefinition ( ) ;
innerDef . setPropagationBehavior ( TransactionDefinition . PROPAGATION_REQUIRES_NEW ) ;
TransactionalOperator inner = TransactionalOperator . create ( tm , innerDef ) ;
return inner . execute ( tx2 - > {
assertThat ( tx2 . isNewTransaction ( ) ) . isTrue ( ) ;
tx2 . setRollbackOnly ( ) ;
return Mono . empty ( ) ;
} ) ;
} ) . as ( StepVerifier : : create ) . verifyComplete ( ) ;
verify ( connectionMock ) . rollbackTransaction ( ) ;
verify ( connectionMock ) . close ( ) ;