Browse Source

DATAMONGO-2427 - Switch to MongoDB Driver 4.0.0-beta1.

Enable disabled tests. Adapt to ReactiveGridFS changes.

Use WriteConcern.MAJORITY in tests.

Original pull request: #823.
pull/830/head
Mark Paluch 6 years ago
parent
commit
f688cca400
No known key found for this signature in database
GPG Key ID: 51A00FA751B91849
  1. 8
      .travis.yml
  2. 168
      Jenkinsfile
  3. 14
      ci/openjdk11-mongodb-4.0/Dockerfile
  4. 14
      ci/openjdk11-mongodb-4.1/Dockerfile
  5. 2
      ci/openjdk11-mongodb-4.2/Dockerfile
  6. 14
      ci/openjdk13-mongodb-4.0/Dockerfile
  7. 14
      ci/openjdk13-mongodb-4.1/Dockerfile
  8. 2
      ci/openjdk13-mongodb-4.2/Dockerfile
  9. 4
      ci/openjdk8-mongodb-4.0/Dockerfile
  10. 14
      ci/openjdk8-mongodb-4.1/Dockerfile
  11. 2
      ci/openjdk8-mongodb-4.2/Dockerfile
  12. 14
      ci/openjdk8-mongodb-4.3/Dockerfile
  13. 2
      pom.xml
  14. 5
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
  15. 88
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java
  16. 14
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java
  17. 1
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java
  18. 4
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateUpdateTests.java
  19. 3
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUpdateTests.java
  20. 97
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResourceUnitTests.java
  21. 92
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java
  22. 4
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/test/util/MongoTestUtils.java

8
.travis.yml

@ -21,10 +21,10 @@ matrix: @@ -21,10 +21,10 @@ matrix:
env:
matrix:
- MONGO_VERSION=4.1.10
- MONGO_VERSION=4.0.4
- MONGO_VERSION=3.6.12
- MONGO_VERSION=3.4.20
- MONGO_VERSION=4.2.2
- MONGO_VERSION=4.0.14
- MONGO_VERSION=3.6.16
- MONGO_VERSION=3.4.23
global:
- PROFILE=ci

168
Jenkinsfile vendored

@ -30,64 +30,32 @@ pipeline { @@ -30,64 +30,32 @@ pipeline {
}
}
}
stage('Publish JDK 8 + MongoDB 4.1') {
when {
changeset "ci/openjdk8-mongodb-4.1/**"
}
agent { label 'data' }
options { timeout(time: 30, unit: 'MINUTES') }
steps {
script {
def image = docker.build("springci/spring-data-openjdk8-with-mongodb-4.1", "ci/openjdk8-mongodb-4.1/")
docker.withRegistry('', 'hub.docker.com-springbuildmaster') {
image.push()
}
}
}
}
stage('Publish JDK 8 + MongoDB 4.2') {
when {
changeset "ci/openjdk8-mongodb-4.2/**"
}
agent { label 'data' }
options { timeout(time: 30, unit: 'MINUTES') }
steps {
script {
def image = docker.build("springci/spring-data-openjdk8-with-mongodb-4.2", "ci/openjdk8-mongodb-4.2/")
docker.withRegistry('', 'hub.docker.com-springbuildmaster') {
image.push()
}
}
}
}
stage('Publish JDK 11 + MongoDB 4.0') {
when {
changeset "ci/openjdk11-mongodb-4.0/**"
changeset "ci/openjdk8-mongodb-4.2/**"
}
agent { label 'data' }
options { timeout(time: 30, unit: 'MINUTES') }
steps {
script {
def image = docker.build("springci/spring-data-openjdk11-with-mongodb-4.0", "ci/openjdk11-mongodb-4.0/")
def image = docker.build("springci/spring-data-openjdk8-with-mongodb-4.2", "ci/openjdk8-mongodb-4.2/")
docker.withRegistry('', 'hub.docker.com-springbuildmaster') {
image.push()
}
}
}
}
stage('Publish JDK 11 + MongoDB 4.1') {
stage('Publish JDK 8 + MongoDB 4.3') {
when {
changeset "ci/openjdk11-mongodb-4.1/**"
changeset "ci/openjdk8-mongodb-4.3/**"
}
agent { label 'data' }
options { timeout(time: 30, unit: 'MINUTES') }
steps {
script {
def image = docker.build("springci/spring-data-openjdk11-with-mongodb-4.1", "ci/openjdk11-mongodb-4.1/")
def image = docker.build("springci/spring-data-openjdk8-with-mongodb-4.3", "ci/openjdk8-mongodb-4.3/")
docker.withRegistry('', 'hub.docker.com-springbuildmaster') {
image.push()
}
@ -110,38 +78,6 @@ pipeline { @@ -110,38 +78,6 @@ pipeline {
}
}
}
stage('Publish JDK 13 + MongoDB 4.0') {
when {
changeset "ci/openjdk13-mongodb-4.0/**"
}
agent { label 'data' }
options { timeout(time: 30, unit: 'MINUTES') }
steps {
script {
def image = docker.build("springci/spring-data-openjdk13-with-mongodb-4.0", "ci/openjdk13-mongodb-4.0/")
docker.withRegistry('', 'hub.docker.com-springbuildmaster') {
image.push()
}
}
}
}
stage('Publish JDK 13 + MongoDB 4.1') {
when {
changeset "ci/openjdk13-mongodb-4.1/**"
}
agent { label 'data' }
options { timeout(time: 30, unit: 'MINUTES') }
steps {
script {
def image = docker.build("springci/spring-data-openjdk13-with-mongodb-4.1", "ci/openjdk13-mongodb-4.1/")
docker.withRegistry('', 'hub.docker.com-springbuildmaster') {
image.push()
}
}
}
}
stage('Publish JDK 13 + MongoDB 4.2') {
when {
changeset "ci/openjdk13-mongodb-4.2/**"
@ -170,7 +106,7 @@ pipeline { @@ -170,7 +106,7 @@ pipeline {
}
agent {
docker {
image 'springci/spring-data-openjdk8-with-mongodb-4.2:latest'
image 'springci/spring-data-openjdk8-with-mongodb-4.3:latest'
label 'data'
args '-v $HOME:/tmp/jenkins-home'
}
@ -195,46 +131,6 @@ pipeline { @@ -195,46 +131,6 @@ pipeline {
}
}
parallel {
stage("test: baseline (jdk11)") {
agent {
docker {
image 'springci/spring-data-openjdk11-with-mongodb-4.2:latest'
label 'data'
args '-v $HOME:/tmp/jenkins-home'
}
}
options { timeout(time: 30, unit: 'MINUTES') }
steps {
sh 'rm -rf ?'
sh 'mkdir -p /tmp/mongodb/db /tmp/mongodb/log'
sh 'mongod --dbpath /tmp/mongodb/db --replSet rs0 --fork --logpath /tmp/mongodb/log/mongod.log &'
sh 'sleep 10'
sh 'mongo --eval "rs.initiate({_id: \'rs0\', members:[{_id: 0, host: \'127.0.0.1:27017\'}]});"'
sh 'sleep 15'
sh 'MAVEN_OPTS="-Duser.name=jenkins -Duser.home=/tmp/jenkins-home" ./mvnw -Pjava11 clean dependency:list test -Dsort -U -B'
}
}
stage("test: baseline (jdk13)") {
agent {
docker {
image 'springci/spring-data-openjdk13-with-mongodb-4.2:latest'
label 'data'
args '-v $HOME:/tmp/jenkins-home'
}
}
options { timeout(time: 30, unit: 'MINUTES') }
steps {
sh 'rm -rf ?'
sh 'mkdir -p /tmp/mongodb/db /tmp/mongodb/log'
sh 'mongod --dbpath /tmp/mongodb/db --replSet rs0 --fork --logpath /tmp/mongodb/log/mongod.log &'
sh 'sleep 10'
sh 'mongo --eval "rs.initiate({_id: \'rs0\', members:[{_id: 0, host: \'127.0.0.1:27017\'}]});"'
sh 'sleep 15'
sh 'MAVEN_OPTS="-Duser.name=jenkins -Duser.home=/tmp/jenkins-home" ./mvnw -Pjava11 clean dependency:list test -Dsort -U -B'
}
}
stage("test: mongodb 4.0 (jdk8)") {
agent {
docker {
@ -255,50 +151,10 @@ pipeline { @@ -255,50 +151,10 @@ pipeline {
}
}
stage("test: mongodb 4.0 (jdk11)") {
agent {
docker {
image 'springci/spring-data-openjdk11-with-mongodb-4.0:latest'
label 'data'
args '-v $HOME:/tmp/jenkins-home'
}
}
options { timeout(time: 30, unit: 'MINUTES') }
steps {
sh 'rm -rf ?'
sh 'mkdir -p /tmp/mongodb/db /tmp/mongodb/log'
sh 'mongod --dbpath /tmp/mongodb/db --replSet rs0 --fork --logpath /tmp/mongodb/log/mongod.log &'
sh 'sleep 10'
sh 'mongo --eval "rs.initiate({_id: \'rs0\', members:[{_id: 0, host: \'127.0.0.1:27017\'}]});"'
sh 'sleep 15'
sh 'MAVEN_OPTS="-Duser.name=jenkins -Duser.home=/tmp/jenkins-home" ./mvnw -Pjava11 clean dependency:list test -Dsort -U -B'
}
}
stage("test: mongodb 4.0 (jdk13)") {
agent {
docker {
image 'springci/spring-data-openjdk13-with-mongodb-4.0:latest'
label 'data'
args '-v $HOME:/tmp/jenkins-home'
}
}
options { timeout(time: 30, unit: 'MINUTES') }
steps {
sh 'rm -rf ?'
sh 'mkdir -p /tmp/mongodb/db /tmp/mongodb/log'
sh 'mongod --dbpath /tmp/mongodb/db --replSet rs0 --fork --logpath /tmp/mongodb/log/mongod.log &'
sh 'sleep 10'
sh 'mongo --eval "rs.initiate({_id: \'rs0\', members:[{_id: 0, host: \'127.0.0.1:27017\'}]});"'
sh 'sleep 15'
sh 'MAVEN_OPTS="-Duser.name=jenkins -Duser.home=/tmp/jenkins-home" ./mvnw -Pjava11 clean dependency:list test -Dsort -U -B'
}
}
stage("test: mongodb 4.1 (jdk8)") {
stage("test: mongodb 4.2 (jdk8)") {
agent {
docker {
image 'springci/spring-data-openjdk8-with-mongodb-4.1:latest'
image 'springci/spring-data-openjdk8-with-mongodb-4.2:latest'
label 'data'
args '-v $HOME:/tmp/jenkins-home'
}
@ -315,10 +171,10 @@ pipeline { @@ -315,10 +171,10 @@ pipeline {
}
}
stage("test: mongodb 4.1 (jdk11)") {
stage("test: baseline (jdk11)") {
agent {
docker {
image 'springci/spring-data-openjdk11-with-mongodb-4.1:latest'
image 'springci/spring-data-openjdk11-with-mongodb-4.2:latest'
label 'data'
args '-v $HOME:/tmp/jenkins-home'
}
@ -335,10 +191,10 @@ pipeline { @@ -335,10 +191,10 @@ pipeline {
}
}
stage("test: mongodb 4.1 (jdk13)") {
stage("test: baseline (jdk13)") {
agent {
docker {
image 'springci/spring-data-openjdk13-with-mongodb-4.1:latest'
image 'springci/spring-data-openjdk13-with-mongodb-4.2:latest'
label 'data'
args '-v $HOME:/tmp/jenkins-home'
}

14
ci/openjdk11-mongodb-4.0/Dockerfile

@ -1,14 +0,0 @@ @@ -1,14 +0,0 @@
FROM adoptopenjdk/openjdk11:latest
RUN apt-get update && apt-get install -y apt-transport-https apt-utils gnupg2
RUN apt-key adv --keyserver hkps://keyserver.ubuntu.com:443 --recv 9DA31620334BD75D9DCB49F368818C72E52529D4
RUN echo "deb [ arch=amd64 ] https://repo.mongodb.org/apt/ubuntu bionic/mongodb-org/4.0 multiverse" | tee /etc/apt/sources.list.d/mongodb-org-4.0.list
RUN apt-get update
RUN apt-get install -y mongodb-org=4.0.9 mongodb-org-server=4.0.9 mongodb-org-shell=4.0.9 mongodb-org-mongos=4.0.9 mongodb-org-tools=4.0.9
RUN apt-get clean \
&& rm -rf /var/lib/apt/lists/*

14
ci/openjdk11-mongodb-4.1/Dockerfile

@ -1,14 +0,0 @@ @@ -1,14 +0,0 @@
FROM adoptopenjdk/openjdk11:latest
RUN apt-get update && apt-get install -y apt-transport-https apt-utils gnupg2
RUN apt-key adv --keyserver hkps://keyserver.ubuntu.com:443 --recv 4B7C549A058F8B6B
RUN echo "deb [ arch=amd64 ] https://repo.mongodb.org/apt/ubuntu bionic/mongodb-org/4.1 multiverse" | tee /etc/apt/sources.list.d/mongodb-org-4.1.list
RUN apt-get update
RUN apt-get install -y mongodb-org-unstable=4.1.13 mongodb-org-unstable-server=4.1.13 mongodb-org-unstable-shell=4.1.13 mongodb-org-unstable-mongos=4.1.13 mongodb-org-unstable-tools=4.1.13
RUN apt-get clean \
&& rm -rf /var/lib/apt/lists/*

2
ci/openjdk11-mongodb-4.2/Dockerfile

@ -8,7 +8,7 @@ RUN echo "deb [ arch=amd64 ] https://repo.mongodb.org/apt/ubuntu bionic/mongodb- @@ -8,7 +8,7 @@ RUN echo "deb [ arch=amd64 ] https://repo.mongodb.org/apt/ubuntu bionic/mongodb-
RUN apt-get update
RUN apt-get install -y mongodb-org=4.2.0 mongodb-org-server=4.2.0 mongodb-org-shell=4.2.0 mongodb-org-mongos=4.2.0 mongodb-org-tools=4.2.0
RUN apt-get install -y mongodb-org=4.2.2 mongodb-org-server=4.2.2 mongodb-org-shell=4.2.2 mongodb-org-mongos=4.2.2 mongodb-org-tools=4.2.2
RUN apt-get clean \
&& rm -rf /var/lib/apt/lists/*

14
ci/openjdk13-mongodb-4.0/Dockerfile

@ -1,14 +0,0 @@ @@ -1,14 +0,0 @@
FROM adoptopenjdk/openjdk13:latest
RUN apt-get update && apt-get install -y apt-transport-https apt-utils gnupg2
RUN apt-key adv --keyserver hkps://keyserver.ubuntu.com:443 --recv 9DA31620334BD75D9DCB49F368818C72E52529D4
RUN echo "deb [ arch=amd64 ] https://repo.mongodb.org/apt/ubuntu bionic/mongodb-org/4.0 multiverse" | tee /etc/apt/sources.list.d/mongodb-org-4.0.list
RUN apt-get update
RUN apt-get install -y mongodb-org=4.0.9 mongodb-org-server=4.0.9 mongodb-org-shell=4.0.9 mongodb-org-mongos=4.0.9 mongodb-org-tools=4.0.9
RUN apt-get clean \
&& rm -rf /var/lib/apt/lists/*

14
ci/openjdk13-mongodb-4.1/Dockerfile

@ -1,14 +0,0 @@ @@ -1,14 +0,0 @@
FROM adoptopenjdk/openjdk13:latest
RUN apt-get update && apt-get install -y apt-transport-https apt-utils gnupg2
RUN apt-key adv --keyserver hkps://keyserver.ubuntu.com:443 --recv 4B7C549A058F8B6B
RUN echo "deb [ arch=amd64 ] https://repo.mongodb.org/apt/ubuntu bionic/mongodb-org/4.1 multiverse" | tee /etc/apt/sources.list.d/mongodb-org-4.1.list
RUN apt-get update
RUN apt-get install -y mongodb-org-unstable=4.1.13 mongodb-org-unstable-server=4.1.13 mongodb-org-unstable-shell=4.1.13 mongodb-org-unstable-mongos=4.1.13 mongodb-org-unstable-tools=4.1.13
RUN apt-get clean \
&& rm -rf /var/lib/apt/lists/*

2
ci/openjdk13-mongodb-4.2/Dockerfile

@ -8,7 +8,7 @@ RUN echo "deb [ arch=amd64 ] https://repo.mongodb.org/apt/ubuntu bionic/mongodb- @@ -8,7 +8,7 @@ RUN echo "deb [ arch=amd64 ] https://repo.mongodb.org/apt/ubuntu bionic/mongodb-
RUN apt-get update
RUN apt-get install -y mongodb-org=4.2.0 mongodb-org-server=4.2.0 mongodb-org-shell=4.2.0 mongodb-org-mongos=4.2.0 mongodb-org-tools=4.2.0
RUN apt-get install -y mongodb-org=4.2.2 mongodb-org-server=4.2.2 mongodb-org-shell=4.2.2 mongodb-org-mongos=4.2.2 mongodb-org-tools=4.2.2
RUN apt-get clean \
&& rm -rf /var/lib/apt/lists/*

4
ci/openjdk8-mongodb-4.0/Dockerfile

@ -8,7 +8,7 @@ RUN echo "deb [ arch=amd64 ] https://repo.mongodb.org/apt/ubuntu bionic/mongodb- @@ -8,7 +8,7 @@ RUN echo "deb [ arch=amd64 ] https://repo.mongodb.org/apt/ubuntu bionic/mongodb-
RUN apt-get update
RUN apt-get install -y mongodb-org=4.0.9 mongodb-org-server=4.0.9 mongodb-org-shell=4.0.9 mongodb-org-mongos=4.0.9 mongodb-org-tools=4.0.9
RUN apt-get install -y mongodb-org=4.0.14 mongodb-org-server=4.0.14 mongodb-org-shell=4.0.14 mongodb-org-mongos=4.0.14 mongodb-org-tools=4.0.14
RUN apt-get clean \
&& rm -rf /var/lib/apt/lists/*
&& rm -rf /var/lib/apt/lists/*

14
ci/openjdk8-mongodb-4.1/Dockerfile

@ -1,14 +0,0 @@ @@ -1,14 +0,0 @@
FROM adoptopenjdk/openjdk8:latest
RUN apt-get update && apt-get install -y apt-transport-https apt-utils gnupg2
RUN apt-key adv --keyserver hkps://keyserver.ubuntu.com:443 --recv 4B7C549A058F8B6B
RUN echo "deb [ arch=amd64 ] https://repo.mongodb.org/apt/ubuntu bionic/mongodb-org/4.1 multiverse" | tee /etc/apt/sources.list.d/mongodb-org-4.1.list
RUN apt-get update
RUN apt-get install -y mongodb-org-unstable=4.1.13 mongodb-org-unstable-server=4.1.13 mongodb-org-unstable-shell=4.1.13 mongodb-org-unstable-mongos=4.1.13 mongodb-org-unstable-tools=4.1.13
RUN apt-get clean \
&& rm -rf /var/lib/apt/lists/*

2
ci/openjdk8-mongodb-4.2/Dockerfile

@ -8,7 +8,7 @@ RUN echo "deb [ arch=amd64 ] https://repo.mongodb.org/apt/ubuntu bionic/mongodb- @@ -8,7 +8,7 @@ RUN echo "deb [ arch=amd64 ] https://repo.mongodb.org/apt/ubuntu bionic/mongodb-
RUN apt-get update
RUN apt-get install -y mongodb-org=4.2.0 mongodb-org-server=4.2.0 mongodb-org-shell=4.2.0 mongodb-org-mongos=4.2.0 mongodb-org-tools=4.2.0
RUN apt-get install -y mongodb-org=4.2.2 mongodb-org-server=4.2.2 mongodb-org-shell=4.2.2 mongodb-org-mongos=4.2.2 mongodb-org-tools=4.2.2
RUN apt-get clean \
&& rm -rf /var/lib/apt/lists/*

14
ci/openjdk8-mongodb-4.3/Dockerfile

@ -0,0 +1,14 @@ @@ -0,0 +1,14 @@
FROM adoptopenjdk/openjdk8:latest
RUN apt-get update && apt-get install -y apt-transport-https apt-utils gnupg2
RUN apt-key adv --keyserver hkps://keyserver.ubuntu.com:443 --recv e162f504a20cdf15827f718d4b7c549a058f8b6b
RUN echo "deb [ arch=amd64 ] https://repo.mongodb.org/apt/ubuntu bionic/mongodb-org/development multiverse" | tee /etc/apt/sources.list.d/mongodb-org-4.3.list
RUN apt-get update
RUN apt-get install -y mongodb-org=4.3.2 mongodb-org-server=4.3.2 mongodb-org-shell=4.3.2 mongodb-org-mongos=4.3.2 mongodb-org-tools=4.3.2
RUN apt-get clean \
&& rm -rf /var/lib/apt/lists/*

2
pom.xml

@ -27,7 +27,7 @@ @@ -27,7 +27,7 @@
<project.type>multi</project.type>
<dist.id>spring-data-mongodb</dist.id>
<springdata.commons>2.3.0.BUILD-SNAPSHOT</springdata.commons>
<mongo>4.0.0-SNAPSHOT</mongo>
<mongo>4.0.0-beta1</mongo>
<mongo.reactivestreams>${mongo}</mongo.reactivestreams>
<jmh.version>1.19</jmh.version>
</properties>

5
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

@ -3014,7 +3014,10 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -3014,7 +3014,10 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
}
result = options.getCollation().map(Collation::toMongoCollation).map(result::collation).orElse(result);
result.arrayFilters(arrayFilters);
if(!CollectionUtils.isEmpty(arrayFilters)) {
result.arrayFilters(arrayFilters);
}
return result;
}

88
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java

@ -18,9 +18,14 @@ package org.springframework.data.mongodb.gridfs; @@ -18,9 +18,14 @@ package org.springframework.data.mongodb.gridfs;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@ -37,19 +42,34 @@ import com.mongodb.reactivestreams.client.gridfs.GridFSDownloadPublisher; @@ -37,19 +42,34 @@ import com.mongodb.reactivestreams.client.gridfs.GridFSDownloadPublisher;
*/
public class ReactiveGridFsResource {
private final GridFSDownloadPublisher content;
private final AtomicBoolean consumed = new AtomicBoolean(false);
private final String filename;
private final @Nullable GridFSDownloadPublisher downloadPublisher;
private final DataBufferFactory dataBufferFactory;
/**
* Creates a new, absent {@link ReactiveGridFsResource}.
*
* @param filename filename of the absent resource.
* @param content
* @param downloadPublisher
*/
public ReactiveGridFsResource(String filename, @Nullable GridFSDownloadPublisher content) {
public ReactiveGridFsResource(String filename, @Nullable GridFSDownloadPublisher downloadPublisher) {
this(filename, downloadPublisher, new DefaultDataBufferFactory());
}
/**
* Creates a new, absent {@link ReactiveGridFsResource}.
*
* @param filename filename of the absent resource.
* @param downloadPublisher
*/
ReactiveGridFsResource(String filename, @Nullable GridFSDownloadPublisher downloadPublisher,
DataBufferFactory dataBufferFactory) {
this.content = content;
this.filename = filename;
this.downloadPublisher = downloadPublisher;
this.dataBufferFactory = dataBufferFactory;
}
/**
@ -77,39 +97,81 @@ public class ReactiveGridFsResource { @@ -77,39 +97,81 @@ public class ReactiveGridFsResource {
* @since 2.2
*/
public Mono<GridFSFile> getGridFSFile() {
return content != null ? Mono.from(content.getGridFSFile()) : Mono.empty();
return downloadPublisher != null ? Mono.from(downloadPublisher.getGridFSFile()) : Mono.empty();
}
/**
* Obtain the data as {@link InputStream}. <br />
* <strong>NOTE</strong> Buffers data in memory. Use {@link #getDownloadStream()} for large files.
*
* @throws IllegalStateException if the underlying {@link Publisher} has already been consumed.
* @see org.springframework.core.io.InputStreamResource#getInputStream()
* @see #getDownloadStream()
* @see DataBufferUtils#join(Publisher)
* @since 3.0
*/
public Mono<InputStream> getInputStream() throws IllegalStateException {
return getDownloadStream() //
.transform(DataBufferUtils::join) //
.as(Mono::from) //
.map(DataBuffer::asInputStream);
}
/**
* Obtain the download stream emitting chunks of data as they come in. <br />
*
* @return {@link Flux#empty()} if the file does not exist.
* @throws IllegalStateException if the underlying {@link Publisher} has already been consumed.
* @see org.springframework.core.io.InputStreamResource#getInputStream()
* @see #getDownloadStream()
* @see DataBufferUtils#join(Publisher)
* @since 3.0
*/
public Flux<DataBuffer> getDownloadStream() {
if (content == null) {
if (downloadPublisher == null) {
return Flux.empty();
}
return createDownloadStream(content);
return createDownloadStream(downloadPublisher);
}
/**
* Obtain the download stream emitting chunks of data with given {@code chunkSize} as they come in.
*
* @param chunkSize the preferred number of bytes per emitted {@link DataBuffer}.
* @return {@link Flux#empty()} if the file does not exist.
* @throws IllegalStateException if the underlying {@link Publisher} has already been consumed.
* @see org.springframework.core.io.InputStreamResource#getInputStream()
* @see #getDownloadStream()
* @see DataBufferUtils#join(Publisher)
* @since 3.0
*/
public Flux<DataBuffer> getDownloadStream(int chunkSize) {
if (content == null) {
if (downloadPublisher == null) {
return Flux.empty();
}
return createDownloadStream(content.bufferSizeBytes(chunkSize));
return createDownloadStream(downloadPublisher.bufferSizeBytes(chunkSize));
}
private Flux<DataBuffer> createDownloadStream(GridFSDownloadPublisher publisher) {
DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
return Flux.from(publisher).map(bufferFactory::wrap);
return Flux.from(publisher) //
.map(dataBufferFactory::wrap) //
.doOnSubscribe(it -> this.verifyStreamStillAvailable());
}
public boolean exists() {
return content != null;
return downloadPublisher != null;
}
private void verifyStreamStillAvailable() {
if (!consumed.compareAndSet(false, true)) {
throw new IllegalStateException("Stream already consumed.");
}
}
}

14
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java

@ -132,7 +132,7 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R @@ -132,7 +132,7 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
uploadOptions.metadata(metadata);
GridFSUploadPublisher<ObjectId> publisher = getGridFs().uploadFromPublisher(filename,
Flux.from(content).map(this::dataBufferToByteBuffer), uploadOptions);
Flux.from(content).map(DataBuffer::asByteBuffer), uploadOptions);
return Mono.from(publisher);
}
@ -209,7 +209,7 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R @@ -209,7 +209,7 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
Assert.notNull(file, "GridFSFile must not be null!");
return Mono.fromSupplier(() -> {
return new ReactiveGridFsResource(file.getFilename(), getGridFs().downloadToPublisher(file.getId()));
return new ReactiveGridFsResource(file.getFilename(), getGridFs().downloadToPublisher(file.getId()), dataBufferFactory);
});
}
@ -265,14 +265,4 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R @@ -265,14 +265,4 @@ public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements R
MongoDatabase db = dbFactory.getMongoDatabase();
return bucket == null ? GridFSBuckets.create(db) : GridFSBuckets.create(db, bucket);
}
private ByteBuffer dataBufferToByteBuffer(DataBuffer buffer) {
ByteBuffer byteBuffer = buffer.asByteBuffer();
ByteBuffer copy = ByteBuffer.allocate(byteBuffer.remaining());
byteBuffer.put(copy);
copy.flip();
return copy;
}
}

1
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java

@ -50,7 +50,6 @@ import org.springframework.transaction.reactive.TransactionalOperator; @@ -50,7 +50,6 @@ import org.springframework.transaction.reactive.TransactionalOperator;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoClients;
/**
* Integration tests for reactive transaction management.

4
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateUpdateTests.java

@ -22,6 +22,7 @@ import java.util.Arrays; @@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import lombok.EqualsAndHashCode;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@ -257,7 +258,7 @@ class MongoTemplateUpdateTests { @@ -257,7 +258,7 @@ class MongoTemplateUpdateTests {
}
@Test // DATAMONGO-2331
@Disabled("https://jira.mongodb.org/browse/JAVA-3432")
@EnableIfMongoServerVersion(isGreaterThanEqual = "4.2")
void findAndModifyAppliesAggregationUpdateCorrectly() {
Book one = new Book();
@ -336,6 +337,7 @@ class MongoTemplateUpdateTests { @@ -336,6 +337,7 @@ class MongoTemplateUpdateTests {
}
}
@EqualsAndHashCode
static class Book {
@Id Integer id;

3
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUpdateTests.java

@ -17,6 +17,7 @@ package org.springframework.data.mongodb.core; @@ -17,6 +17,7 @@ package org.springframework.data.mongodb.core;
import static org.assertj.core.api.Assertions.*;
import lombok.EqualsAndHashCode;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
@ -240,7 +241,6 @@ public class ReactiveMongoTemplateUpdateTests { @@ -240,7 +241,6 @@ public class ReactiveMongoTemplateUpdateTests {
}
@Test // DATAMONGO-2331
@Disabled("https://jira.mongodb.org/browse/JAVA-3432")
public void findAndModifyAppliesAggregationUpdateCorrectly() {
Book one = new Book();
@ -310,6 +310,7 @@ public class ReactiveMongoTemplateUpdateTests { @@ -310,6 +310,7 @@ public class ReactiveMongoTemplateUpdateTests {
}
}
@EqualsAndHashCode
static class Book {
@Id Integer id;

97
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResourceUnitTests.java

@ -0,0 +1,97 @@ @@ -0,0 +1,97 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.gridfs;
import static org.assertj.core.api.Assertions.*;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.nio.ByteBuffer;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.reactivestreams.client.gridfs.GridFSDownloadPublisher;
/**
* @author Christoph Strobl
*/
class ReactiveGridFsResourceUnitTests {
@Test // DATAMONGO-2427
void streamCanOnlyBeConsumedOnce() {
ReactiveGridFsResource resource = new ReactiveGridFsResource("file.name", new StubGridFSDownloadPublisher());
assertThat(resource.exists()).isTrue();
resource.getInputStream().as(StepVerifier::create).verifyComplete();
resource.getInputStream().as(StepVerifier::create).verifyError(IllegalStateException.class);
resource.getDownloadStream().as(StepVerifier::create).verifyError(IllegalStateException.class);
}
@Test // DATAMONGO-2427
void existReturnsFalseForNullPublisher() {
ReactiveGridFsResource resource = new ReactiveGridFsResource("file.name", null);
assertThat(resource.exists()).isFalse();
}
@Test // DATAMONGO-2427
void nonExistingResourceProducesEmptyDownloadStream() {
ReactiveGridFsResource resource = new ReactiveGridFsResource("file.name", null);
resource.getInputStream().as(StepVerifier::create).verifyComplete();
resource.getInputStream().as(StepVerifier::create).verifyComplete();
resource.getDownloadStream().as(StepVerifier::create).verifyComplete();
}
private static class StubGridFSDownloadPublisher implements GridFSDownloadPublisher {
@Override
public Publisher<GridFSFile> getGridFSFile() {
return Mono.empty();
}
@Override
public GridFSDownloadPublisher bufferSizeBytes(int bufferSizeBytes) {
return null;
}
@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
s.onSubscribe(new Subscription() {
@Override
public void request(long n) {
s.onComplete();
}
@Override
public void cancel() {
}
});
}
}
}

92
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java

@ -25,6 +25,7 @@ import reactor.core.publisher.Mono; @@ -25,6 +25,7 @@ import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.util.Arrays;
@ -32,7 +33,6 @@ import org.bson.BsonObjectId; @@ -32,7 +33,6 @@ import org.bson.BsonObjectId;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.reactivestreams.Publisher;
@ -53,14 +53,11 @@ import org.springframework.data.mongodb.core.convert.MongoConverter; @@ -53,14 +53,11 @@ import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.FileCopyUtils;
import org.springframework.util.StreamUtils;
import com.mongodb.client.gridfs.model.GridFSFile;
import com.mongodb.internal.HexUtils;
import com.mongodb.internal.connection.tlschannel.impl.ByteBufferUtil;
import com.mongodb.reactivestreams.client.gridfs.GridFSBucket;
import com.mongodb.reactivestreams.client.gridfs.GridFSBuckets;
import com.mongodb.reactivestreams.client.gridfs.GridFSUploadPublisher;
import com.mongodb.reactivestreams.client.internal.Publishers;
/**
@ -91,7 +88,6 @@ public class ReactiveGridFsTemplateTests { @@ -91,7 +88,6 @@ public class ReactiveGridFsTemplateTests {
}
@Test // DATAMONGO-1855
@Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void storesAndFindsSimpleDocument() {
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
@ -109,7 +105,6 @@ public class ReactiveGridFsTemplateTests { @@ -109,7 +105,6 @@ public class ReactiveGridFsTemplateTests {
}
@Test // DATAMONGO-1855
@Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void storesAndLoadsLargeFileCorrectly() {
ByteBuffer buffer = ByteBuffer.allocate(1000 * 1000); // 1 mb
@ -148,34 +143,12 @@ public class ReactiveGridFsTemplateTests { @@ -148,34 +143,12 @@ public class ReactiveGridFsTemplateTests {
}).verifyComplete();
}
// @Test // DATAMONGO-2392
// public void storesAndFindsByUUID() throws IOException {
//
// UUID uuid = UUID.randomUUID();
//
// GridFS fs = new GridFS(mongoClient.getLegacyDb());
// GridFSInputFile in = fs.createFile(resource.getInputStream(), "gridfs.xml");
//
// in.put("_id", uuid);
// in.put("contentType", "application/octet-stream");
// in.save();
//
// operations.findOne(query(where("_id").is(uuid))).flatMap(operations::getResource)
// .flatMapMany(ReactiveGridFsResource::getDownloadStream) //
// .transform(DataBufferUtils::join) //
// .doOnNext(DataBufferUtils::release).as(StepVerifier::create) //
// .expectNextCount(1).verifyComplete();
// }
@Test // DATAMONGO-1855
@Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void writesMetadataCorrectly() throws IOException {
Document metadata = new Document("key", "value");
Flux<DataBuffer> source = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 256);
// AsyncInputStream stream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
ObjectId reference = operations.store(source, "foo.xml", "binary/octet-stream", metadata).block();
operations.find(query(whereMetaData("key").is("value"))) //
@ -187,8 +160,7 @@ public class ReactiveGridFsTemplateTests { @@ -187,8 +160,7 @@ public class ReactiveGridFsTemplateTests {
}
@Test // DATAMONGO-1855
@Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void marshalsComplexMetadata() throws IOException {
public void marshalsComplexMetadata() {
Metadata metadata = new Metadata();
metadata.version = "1.0";
@ -206,7 +178,6 @@ public class ReactiveGridFsTemplateTests { @@ -206,7 +178,6 @@ public class ReactiveGridFsTemplateTests {
}
@Test // DATAMONGO-1855
@Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void getResourceShouldRetrieveContentByIdentity() throws IOException {
byte[] content = StreamUtils.copyToByteArray(resource.getInputStream());
@ -228,7 +199,6 @@ public class ReactiveGridFsTemplateTests { @@ -228,7 +199,6 @@ public class ReactiveGridFsTemplateTests {
}
@Test // DATAMONGO-1855, DATAMONGO-2240
@Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void shouldEmitFirstEntryWhenFindFirstRetrievesMoreThanOneResult() throws IOException {
Flux<DataBuffer> upload1 = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 256);
@ -260,7 +230,6 @@ public class ReactiveGridFsTemplateTests { @@ -260,7 +230,6 @@ public class ReactiveGridFsTemplateTests {
}
@Test // DATAMONGO-1855
@Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void shouldEmitErrorWhenFindOneRetrievesMoreThanOneResult() throws IOException {
Flux<DataBuffer> upload1 = DataBufferUtils.read(resource, new DefaultDataBufferFactory(), 256);
@ -277,7 +246,6 @@ public class ReactiveGridFsTemplateTests { @@ -277,7 +246,6 @@ public class ReactiveGridFsTemplateTests {
}
@Test // DATAMONGO-1855
@Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void getResourcesByPattern() throws IOException {
byte[] content = StreamUtils.copyToByteArray(resource.getInputStream());
@ -300,7 +268,6 @@ public class ReactiveGridFsTemplateTests { @@ -300,7 +268,6 @@ public class ReactiveGridFsTemplateTests {
}
@Test // DATAMONGO-765
@Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void considersSkipLimitWhenQueryingFiles() {
DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
@ -326,56 +293,15 @@ public class ReactiveGridFsTemplateTests { @@ -326,56 +293,15 @@ public class ReactiveGridFsTemplateTests {
String version;
}
@Test //
@Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void xxx() {
GridFSBucket buckets = GridFSBuckets.create(dbFactory.getMongoDatabase());
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
DefaultDataBuffer first = factory.wrap("first".getBytes());
// DefaultDataBuffer second = factory.wrap("second".getBytes());
Flux<DefaultDataBuffer> source = Flux.just(first);
// GridFSUploadPublisher<ObjectId> objectIdGridFSUploadPublisher = buckets.uploadFromPublisher("foo.xml",
// Mono.just(ByteBuffer.wrap("hello".getBytes())));
GridFSUploadPublisher<ObjectId> objectIdGridFSUploadPublisher = buckets.uploadFromPublisher("foo.xml",
source.map(DataBuffer::asByteBuffer));
Mono<ObjectId> idPublisher = Mono.from(objectIdGridFSUploadPublisher);
idPublisher.as(StepVerifier::create).expectNextCount(1).verifyComplete();
}
@Test
@Ignore("https://jira.mongodb.org/browse/JAVARS-224")
public void xxx2() {
GridFSBucket buckets = GridFSBuckets.create(dbFactory.getMongoDatabase());
Flux<ByteBuffer> source = Flux.just(ByteBuffer.wrap("first".getBytes()), ByteBuffer.wrap("second".getBytes()));
Publisher<ByteBuffer> rawSource = toPublisher(ByteBuffer.wrap("first".getBytes()),
ByteBuffer.wrap("second".getBytes()));
// GridFSUploadPublisher<ObjectId> objectIdGridFSUploadPublisher = buckets.uploadFromPublisher("foo.xml",
// Mono.just(ByteBuffer.wrap("hello".getBytes())));
// GridFSUploadPublisher<ObjectId> objectIdGridFSUploadPublisher = buckets.uploadFromPublisher("foo.xml", source);
GridFSUploadPublisher<ObjectId> objectIdGridFSUploadPublisher = buckets.uploadFromPublisher("foo.xml", rawSource);
Mono.from(objectIdGridFSUploadPublisher).as(StepVerifier::create).expectNextCount(1).verifyComplete();
// idPublisher;
}
private static Publisher<ByteBuffer> toPublisher(final ByteBuffer... byteBuffers) {
return Publishers.publishAndFlatten(callback -> callback.onResult(Arrays.asList(byteBuffers), null));
}
private ByteBuffer hack(DataBuffer buffer) {
ByteBuffer byteBuffer = buffer.asByteBuffer();
ByteBuffer copy = ByteBuffer.allocate(byteBuffer.remaining());
ByteBufferUtil.copy(byteBuffer, copy, byteBuffer.arrayOffset());
copy.flip();
return copy;
public static String readToString(DataBuffer dataBuffer) {
try {
return FileCopyUtils.copyToString(new InputStreamReader(dataBuffer.asInputStream()));
} catch (IOException e) {
return e.getMessage();
}
}
}

4
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/test/util/MongoTestUtils.java

@ -41,9 +41,9 @@ import com.mongodb.reactivestreams.client.MongoClients; @@ -41,9 +41,9 @@ import com.mongodb.reactivestreams.client.MongoClients;
*/
public class MongoTestUtils {
public static final String CONNECTION_STRING = "mongodb://localhost:27017/?replicaSet=rs0"; // &readPreference=primary&w=majority
public static final String CONNECTION_STRING = "mongodb://localhost:27017/?replicaSet=rs0&w=majority";
private static final String CONNECTION_STRING_PATTERN = "mongodb://%s:%s/";
private static final String CONNECTION_STRING_PATTERN = "mongodb://%s:%s/?w=majority";
private static final Version ANY = new Version(9999, 9999, 9999);

Loading…
Cancel
Save