@ -18,6 +18,8 @@ package org.springframework.data.mongodb.core.messaging;
@@ -18,6 +18,8 @@ package org.springframework.data.mongodb.core.messaging;
import java.time.Duration ;
import java.util.concurrent.CountDownLatch ;
import java.util.concurrent.TimeUnit ;
import java.util.concurrent.locks.Lock ;
import java.util.concurrent.locks.ReentrantLock ;
import java.util.function.Supplier ;
import org.springframework.dao.DataAccessResourceFailureException ;
@ -39,7 +41,7 @@ import com.mongodb.client.MongoCursor;
@@ -39,7 +41,7 @@ import com.mongodb.client.MongoCursor;
* /
abstract class CursorReadingTask < T , R > implements Task {
private final Object lifecycleMonitor = new Object ( ) ;
private final Lock lock = new ReentrantLock ( ) ;
private final MongoTemplate template ;
private final SubscriptionRequest < T , R , RequestOptions > request ;
@ -86,19 +88,14 @@ abstract class CursorReadingTask<T, R> implements Task {
@@ -86,19 +88,14 @@ abstract class CursorReadingTask<T, R> implements Task {
}
} catch ( InterruptedException e ) {
synchronized ( lifecycleMonitor ) {
state = State . CANCELLED ;
}
doWhileLocked ( lock , ( ) - > state = State . CANCELLED ) ;
Thread . currentThread ( ) . interrupt ( ) ;
break ;
}
}
} catch ( RuntimeException e ) {
synchronized ( lifecycleMonitor ) {
state = State . CANCELLED ;
}
doWhileLocked ( lock , ( ) - > state = State . CANCELLED ) ;
errorHandler . handleError ( e ) ;
}
}
@ -114,30 +111,32 @@ abstract class CursorReadingTask<T, R> implements Task {
@@ -114,30 +111,32 @@ abstract class CursorReadingTask<T, R> implements Task {
* /
private void start ( ) {
synchronized ( lifecycleMonitor ) {
doWhileLocked ( lock , ( ) - > {
if ( ! State . RUNNING . equals ( state ) ) {
state = State . STARTING ;
}
}
} ) ;
do {
boolean valid = false ;
// boolean valid = false;
synchronized ( lifecycleMonitor ) {
boolean valid = executeWhileLocked ( lock , ( ) - > {
if ( State . STARTING . equals ( state ) ) {
if ( ! State . STARTING . equals ( state ) ) {
return false ;
}
MongoCursor < T > cursor = execute ( ( ) - > initCursor ( template , request . getRequestOptions ( ) , targetType ) ) ;
valid = isValidCursor ( cursor ) ;
if ( valid ) {
this . cursor = cursor ;
state = State . RUNNING ;
} else if ( cursor ! = null ) {
cursor . close ( ) ;
}
MongoCursor < T > cursor = execute ( ( ) - > initCursor ( template , request . getRequestOptions ( ) , targetType ) ) ;
boolean isValid = isValidCursor ( cursor ) ;
if ( isValid ) {
this . cursor = cursor ;
state = State . RUNNING ;
} else if ( cursor ! = null ) {
cursor . close ( ) ;
}
}
return isValid ;
} ) ;
if ( ! valid ) {
@ -145,9 +144,7 @@ abstract class CursorReadingTask<T, R> implements Task {
@@ -145,9 +144,7 @@ abstract class CursorReadingTask<T, R> implements Task {
Thread . sleep ( 100 ) ;
} catch ( InterruptedException e ) {
synchronized ( lifecycleMonitor ) {
state = State . CANCELLED ;
}
doWhileLocked ( lock , ( ) - > state = State . CANCELLED ) ;
Thread . currentThread ( ) . interrupt ( ) ;
}
}
@ -163,7 +160,7 @@ abstract class CursorReadingTask<T, R> implements Task {
@@ -163,7 +160,7 @@ abstract class CursorReadingTask<T, R> implements Task {
@Override
public void cancel ( ) throws DataAccessResourceFailureException {
synchronized ( lifecycleMonitor ) {
doWhileLocked ( lock , ( ) - > {
if ( State . RUNNING . equals ( state ) | | State . STARTING . equals ( state ) ) {
this . state = State . CANCELLED ;
@ -171,7 +168,7 @@ abstract class CursorReadingTask<T, R> implements Task {
@@ -171,7 +168,7 @@ abstract class CursorReadingTask<T, R> implements Task {
cursor . close ( ) ;
}
}
}
} ) ;
}
@Override
@ -181,10 +178,7 @@ abstract class CursorReadingTask<T, R> implements Task {
@@ -181,10 +178,7 @@ abstract class CursorReadingTask<T, R> implements Task {
@Override
public State getState ( ) {
synchronized ( lifecycleMonitor ) {
return state ;
}
return executeWhileLocked ( lock , ( ) - > state ) ;
}
@Override
@ -220,13 +214,12 @@ abstract class CursorReadingTask<T, R> implements Task {
@@ -220,13 +214,12 @@ abstract class CursorReadingTask<T, R> implements Task {
@Nullable
private T getNext ( ) {
synchronized ( lifecycleMonitor ) {
return executeWhileLocked ( lock , ( ) - > {
if ( State . RUNNING . equals ( state ) ) {
return cursor . tryNext ( ) ;
}
}
throw new IllegalStateException ( String . format ( "Cursor %s is not longer open" , cursor ) ) ;
throw new IllegalStateException ( String . format ( "Cursor %s is not longer open" , cursor ) ) ;
} ) ;
}
private static boolean isValidCursor ( @Nullable MongoCursor < ? > cursor ) {
@ -263,4 +256,23 @@ abstract class CursorReadingTask<T, R> implements Task {
@@ -263,4 +256,23 @@ abstract class CursorReadingTask<T, R> implements Task {
throw translated ! = null ? translated : e ;
}
}
private static void doWhileLocked ( Lock lock , Runnable action ) {
executeWhileLocked ( lock , ( ) - > {
action . run ( ) ;
return null ;
} ) ;
}
@Nullable
private static < T > T executeWhileLocked ( Lock lock , Supplier < T > stuff ) {
lock . lock ( ) ;
try {
return stuff . get ( ) ;
} finally {
lock . unlock ( ) ;
}
}
}