@ -18,6 +18,7 @@ package org.springframework.data.mongodb.core.messaging;
@@ -18,6 +18,7 @@ package org.springframework.data.mongodb.core.messaging;
import java.time.Duration ;
import java.util.concurrent.CountDownLatch ;
import java.util.concurrent.TimeUnit ;
import java.util.function.Supplier ;
import org.springframework.dao.DataAccessResourceFailureException ;
import org.springframework.data.mongodb.core.MongoTemplate ;
@ -75,8 +76,11 @@ abstract class CursorReadingTask<T, R> implements Task {
@@ -75,8 +76,11 @@ abstract class CursorReadingTask<T, R> implements Task {
start ( ) ;
while ( isRunning ( ) ) {
try {
T next = getNext ( ) ;
T next = execute ( this : : getNext ) ;
if ( next ! = null ) {
emitMessage ( createMessage ( next , targetType , request . getRequestOptions ( ) ) ) ;
} else {
@ -88,12 +92,6 @@ abstract class CursorReadingTask<T, R> implements Task {
@@ -88,12 +92,6 @@ abstract class CursorReadingTask<T, R> implements Task {
state = State . CANCELLED ;
}
Thread . interrupted ( ) ;
} catch ( RuntimeException e ) {
Exception translated = template . getExceptionTranslator ( ) . translateExceptionIfPossible ( e ) ;
Exception toHandle = translated ! = null ? translated : e ;
errorHandler . handleError ( toHandle ) ;
}
}
}
@ -101,11 +99,11 @@ abstract class CursorReadingTask<T, R> implements Task {
@@ -101,11 +99,11 @@ abstract class CursorReadingTask<T, R> implements Task {
/ * *
* Initialize the Task by 1st setting the current state to { @link State # STARTING starting } indicating the
* initialization procedure . < br / >
* Moving on the underlying { @link MongoCursor } gets { @link # initCursor ( MongoTemplate , RequestOptions ) created } and is
* { @link # isValidCursor ( MongoCursor ) health checked } . Once a valid { @link MongoCursor } is created the { @link # state }
* is set to { @link State # RUNNING running } . If the health check is not passed the { @link MongoCursor } is immediately
* { @link MongoCursor # close ( ) closed } and a new { @link MongoCursor } is requested until a valid one is retrieved or the
* { @link # state } changes .
* Moving on the underlying { @link MongoCursor } gets { @link # initCursor ( MongoTemplate , RequestOptions , Class ) created }
* and is { @link # isValidCursor ( MongoCursor ) health checked } . Once a valid { @link MongoCursor } is created the
* { @link # state } is set to { @link State # RUNNING running } . If the health check is not passed the { @link MongoCursor }
* is immediately { @link MongoCursor # close ( ) closed } and a new { @link MongoCursor } is requested until a valid one is
* retrieved or the { @link # state } changes .
* /
private void start ( ) {
@ -123,7 +121,7 @@ abstract class CursorReadingTask<T, R> implements Task {
@@ -123,7 +121,7 @@ abstract class CursorReadingTask<T, R> implements Task {
if ( State . STARTING . equals ( state ) ) {
MongoCursor < T > cursor = initCursor ( template , request . getRequestOptions ( ) , targetType ) ;
MongoCursor < T > cursor = execute ( ( ) - > initCursor ( template , request . getRequestOptions ( ) , targetType ) ) ;
valid = isValidCursor ( cursor ) ;
if ( valid ) {
this . cursor = cursor ;
@ -248,4 +246,29 @@ abstract class CursorReadingTask<T, R> implements Task {
@@ -248,4 +246,29 @@ abstract class CursorReadingTask<T, R> implements Task {
return true ;
}
/ * *
* Execute an operation and take care of translating exceptions using the { @link MongoTemplate templates }
* { @link org . springframework . data . mongodb . core . MongoExceptionTranslator } and passing those on to the
* { @link # errorHandler } .
*
* @param callback must not be { @literal null } .
* @param < T >
* @return can be { @literal null } .
* @throws RuntimeException The potentially translated exception .
* /
@Nullable
private < T > T execute ( Supplier < T > callback ) {
try {
return callback . get ( ) ;
} catch ( RuntimeException e ) {
RuntimeException translated = template . getExceptionTranslator ( ) . translateExceptionIfPossible ( e ) ;
RuntimeException toHandle = translated ! = null ? translated : e ;
errorHandler . handleError ( toHandle ) ;
throw toHandle ;
}
}
}