|
|
|
@ -92,12 +92,12 @@ public class SimpleReactiveMongoRepository<T, ID extends Serializable> implement |
|
|
|
q.limit(2); |
|
|
|
q.limit(2); |
|
|
|
|
|
|
|
|
|
|
|
return mongoOperations.find(q, example.getProbeType(), entityInformation.getCollectionName()).buffer(2) |
|
|
|
return mongoOperations.find(q, example.getProbeType(), entityInformation.getCollectionName()).buffer(2) |
|
|
|
.flatMap(vals -> { |
|
|
|
.map(vals -> { |
|
|
|
|
|
|
|
|
|
|
|
if (vals.size() > 1) { |
|
|
|
if (vals.size() > 1) { |
|
|
|
return Mono.error(new IncorrectResultSizeDataAccessException(1)); |
|
|
|
throw new IncorrectResultSizeDataAccessException(1); |
|
|
|
} |
|
|
|
} |
|
|
|
return Mono.just(vals.iterator().next()); |
|
|
|
return vals.iterator().next(); |
|
|
|
}).next(); |
|
|
|
}).next(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -315,8 +315,7 @@ public class SimpleReactiveMongoRepository<T, ID extends Serializable> implement |
|
|
|
|
|
|
|
|
|
|
|
Assert.notNull(entityStream, "The given Publisher of entities must not be null!"); |
|
|
|
Assert.notNull(entityStream, "The given Publisher of entities must not be null!"); |
|
|
|
|
|
|
|
|
|
|
|
return Flux.from(entityStream) |
|
|
|
return Flux.from(entityStream).flatMap(entity -> entityInformation.isNew(entity) ? //
|
|
|
|
.flatMap(entity -> entityInformation.isNew(entity) ? //
|
|
|
|
|
|
|
|
mongoOperations.insert(entity, entityInformation.getCollectionName()).then(Mono.just(entity)) : //
|
|
|
|
mongoOperations.insert(entity, entityInformation.getCollectionName()).then(Mono.just(entity)) : //
|
|
|
|
mongoOperations.save(entity, entityInformation.getCollectionName()).then(Mono.just(entity))); |
|
|
|
mongoOperations.save(entity, entityInformation.getCollectionName()).then(Mono.just(entity))); |
|
|
|
} |
|
|
|
} |
|
|
|
|