|
|
|
@ -519,22 +519,8 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati |
|
|
|
session.startTransaction(); |
|
|
|
session.startTransaction(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
return ReactiveMongoTemplate.this.withSession(action, session) //
|
|
|
|
return Flux.usingWhen(Mono.just(session), s -> ReactiveMongoTemplate.this.withSession(action, s), |
|
|
|
.materialize() //
|
|
|
|
ClientSession::commitTransaction, ClientSession::abortTransaction).doFinally(signalType -> { |
|
|
|
.flatMap(signal -> { |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (session.hasActiveTransaction()) { |
|
|
|
|
|
|
|
if (signal.isOnComplete()) { |
|
|
|
|
|
|
|
return Mono.from(session.commitTransaction()).thenReturn(signal); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if (signal.isOnError()) { |
|
|
|
|
|
|
|
return Mono.from(session.abortTransaction()).thenReturn(signal); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
return Mono.just(signal); |
|
|
|
|
|
|
|
}) //
|
|
|
|
|
|
|
|
.<T> dematerialize() //
|
|
|
|
|
|
|
|
.doFinally(signalType -> { |
|
|
|
|
|
|
|
doFinally.accept(session); |
|
|
|
doFinally.accept(session); |
|
|
|
}); |
|
|
|
}); |
|
|
|
}); |
|
|
|
}); |
|
|
|
|