@ -38,13 +38,16 @@ import org.springframework.data.mongodb.core.ChangeStreamOptions;
@@ -38,13 +38,16 @@ import org.springframework.data.mongodb.core.ChangeStreamOptions;
import org.springframework.data.mongodb.core.MongoTemplate ;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions ;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions ;
import org.springframework.data.mongodb.test.util.Client ;
import org.springframework.data.mongodb.test.util.EnableIfMongoServerVersion ;
import org.springframework.data.mongodb.test.util.EnableIfReplicaSetAvailable ;
import org.springframework.data.mongodb.test.util.MongoServerCondition ;
import org.springframework.data.mongodb.test.util.MongoTemplateExtension ;
import org.springframework.data.mongodb.test.util.MongoTestUtils ;
import org.springframework.data.mongodb.test.util.Template ;
import org.springframework.util.ErrorHandler ;
import com.mongodb.client.MongoClient ;
import com.mongodb.client.MongoCollection ;
import com.mongodb.client.model.CreateCollectionOptions ;
import com.mongodb.client.model.changestream.ChangeStreamDocument ;
@ -60,9 +63,12 @@ public class DefaultMessageListenerContainerTests {
@@ -60,9 +63,12 @@ public class DefaultMessageListenerContainerTests {
static final String DATABASE_NAME = "change-stream-events" ;
static final String COLLECTION_NAME = "collection-1" ;
static final String COLLECTION_2_NAME = "collection-2" ;
static final String COLLECTION_3_NAME = "collection-3" ;
static final Duration TIMEOUT = Duration . ofSeconds ( 2 ) ;
@Client static MongoClient client ;
@Template ( database = DATABASE_NAME , initialEntitySet = Person . class ) //
static MongoTemplate template ;
@ -74,10 +80,13 @@ public class DefaultMessageListenerContainerTests {
@@ -74,10 +80,13 @@ public class DefaultMessageListenerContainerTests {
private CollectingMessageListener < Object , Object > messageListener ;
@BeforeEach
void beforeEach ( ) {
void beforeEach ( ) throws InterruptedException {
MongoTestUtils . dropCollectionNow ( DATABASE_NAME , COLLECTION_NAME , client ) ;
MongoTestUtils . dropCollectionNow ( DATABASE_NAME , COLLECTION_2_NAME , client ) ;
MongoTestUtils . dropCollectionNow ( DATABASE_NAME , COLLECTION_3_NAME , client ) ;
template . dropCollection ( COLLECTION_NAME ) ;
template . dropCollection ( COLLECTION_2_NAME ) ;
Thread . sleep ( 100 ) ;
messageListener = new CollectingMessageListener < > ( ) ;
}
@ -281,7 +290,7 @@ public class DefaultMessageListenerContainerTests {
@@ -281,7 +290,7 @@ public class DefaultMessageListenerContainerTests {
@Test // DATAMONGO-1803
public void callsDefaultErrorHandlerOnError ( ) throws InterruptedException {
dbFactory . getMongoDatabase ( ) . createCollection ( COLLECTION_NAME ,
dbFactory . getMongoDatabase ( ) . createCollection ( COLLECTION_3_ NAME ,
new CreateCollectionOptions ( ) . capped ( true ) . maxDocuments ( 10000 ) . sizeInBytes ( 10000 ) ) ;
collection . insertOne ( new Document ( "_id" , "id-1" ) . append ( "value" , "foo" ) ) ;
@ -298,10 +307,7 @@ public class DefaultMessageListenerContainerTests {
@@ -298,10 +307,7 @@ public class DefaultMessageListenerContainerTests {
Document . class ) ;
SubscriptionUtils . awaitSubscription ( subscription ) ;
template . dropCollection ( COLLECTION_NAME ) ;
Thread . sleep ( 20 ) ;
dbFactory . getMongoDatabase ( ) . drop ( ) ;
verify ( errorHandler , atLeast ( 1 ) ) . handleError ( any ( DataAccessException . class ) ) ;
} finally {