Browse Source

Polishing.

Use Lock utility for easier lock handling.

Original Pull Request: #4431
See also: spring-projects/spring-data-commmons#2944
pull/4512/head
Mark Paluch 2 years ago committed by Christoph Strobl
parent
commit
661607a603
No known key found for this signature in database
GPG Key ID: 8CC1AB53391458C8
  1. 73
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java
  2. 23
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java
  3. 40
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java
  4. 46
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java

73
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java

@ -18,12 +18,9 @@ package org.springframework.data.mongodb.core; @@ -18,12 +18,9 @@ package org.springframework.data.mongodb.core;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;
import org.bson.Document;
@ -50,6 +47,7 @@ import org.springframework.data.mongodb.core.query.NearQuery; @@ -50,6 +47,7 @@ import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.mongodb.core.query.UpdateDefinition;
import org.springframework.data.util.Lock;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
@ -192,20 +190,18 @@ public interface MongoOperations extends FluentMongoOperations { @@ -192,20 +190,18 @@ public interface MongoOperations extends FluentMongoOperations {
return new SessionScoped() {
private final Lock lock = new ReentrantLock();
private final Lock lock = Lock.of(new ReentrantLock());
private @Nullable ClientSession session;
@Override
public <T> T execute(SessionCallback<T> action, Consumer<ClientSession> onComplete) {
lock.lock();
try {
lock.executeWithoutResult(() -> {
if (session == null) {
session = sessionProvider.get();
}
} finally {
lock.unlock();
}
});
try {
return action.doInSession(MongoOperations.this.withSession(session));
@ -950,8 +946,8 @@ public interface MongoOperations extends FluentMongoOperations { @@ -950,8 +946,8 @@ public interface MongoOperations extends FluentMongoOperations {
* Triggers <a href="https://docs.mongodb.org/manual/reference/method/db.collection.findAndModify/">findAndModify </a>
* to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query}.
*
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional
* fields specification. Must not be {@literal null}.
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an
* optional fields specification. Must not be {@literal null}.
* @param update the {@link UpdateDefinition} to apply on matching documents. Must not be {@literal null}.
* @param entityClass the parametrized type. Must not be {@literal null}.
* @return the converted object that was updated before it was updated or {@literal null}, if not found.
@ -966,8 +962,8 @@ public interface MongoOperations extends FluentMongoOperations { @@ -966,8 +962,8 @@ public interface MongoOperations extends FluentMongoOperations {
* Triggers <a href="https://docs.mongodb.org/manual/reference/method/db.collection.findAndModify/">findAndModify </a>
* to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query}.
*
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional
* fields specification. Must not be {@literal null}.
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an
* optional fields specification. Must not be {@literal null}.
* @param update the {@link UpdateDefinition} to apply on matching documents. Must not be {@literal null}.
* @param entityClass the parametrized type. Must not be {@literal null}.
* @param collectionName the collection to query. Must not be {@literal null}.
@ -984,8 +980,8 @@ public interface MongoOperations extends FluentMongoOperations { @@ -984,8 +980,8 @@ public interface MongoOperations extends FluentMongoOperations {
* to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query} taking
* {@link FindAndModifyOptions} into account.
*
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional
* fields specification.
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an
* optional fields specification.
* @param update the {@link UpdateDefinition} to apply on matching documents.
* @param options the {@link FindAndModifyOptions} holding additional information.
* @param entityClass the parametrized type.
@ -1004,8 +1000,8 @@ public interface MongoOperations extends FluentMongoOperations { @@ -1004,8 +1000,8 @@ public interface MongoOperations extends FluentMongoOperations {
* to apply provided {@link Update} on documents matching {@link Criteria} of given {@link Query} taking
* {@link FindAndModifyOptions} into account.
*
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional
* fields specification. Must not be {@literal null}.
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an
* optional fields specification. Must not be {@literal null}.
* @param update the {@link UpdateDefinition} to apply on matching documents. Must not be {@literal null}.
* @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}.
* @param entityClass the parametrized type. Must not be {@literal null}.
@ -1030,8 +1026,8 @@ public interface MongoOperations extends FluentMongoOperations { @@ -1030,8 +1026,8 @@ public interface MongoOperations extends FluentMongoOperations {
* Options are defaulted to {@link FindAndReplaceOptions#empty()}. <br />
* <strong>NOTE:</strong> The replacement entity must not hold an {@literal id}.
*
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional
* fields specification. Must not be {@literal null}.
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an
* optional fields specification. Must not be {@literal null}.
* @param replacement the replacement document. Must not be {@literal null}.
* @return the converted object that was updated or {@literal null}, if not found.
* @throws org.springframework.data.mapping.MappingException if the collection name cannot be
@ -1051,8 +1047,8 @@ public interface MongoOperations extends FluentMongoOperations { @@ -1051,8 +1047,8 @@ public interface MongoOperations extends FluentMongoOperations {
* Options are defaulted to {@link FindAndReplaceOptions#empty()}. <br />
* <strong>NOTE:</strong> The replacement entity must not hold an {@literal id}.
*
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional
* fields specification. Must not be {@literal null}.
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an
* optional fields specification. Must not be {@literal null}.
* @param replacement the replacement document. Must not be {@literal null}.
* @param collectionName the collection to query. Must not be {@literal null}.
* @return the converted object that was updated or {@literal null}, if not found.
@ -1070,8 +1066,8 @@ public interface MongoOperations extends FluentMongoOperations { @@ -1070,8 +1066,8 @@ public interface MongoOperations extends FluentMongoOperations {
* taking {@link FindAndReplaceOptions} into account.<br />
* <strong>NOTE:</strong> The replacement entity must not hold an {@literal id}.
*
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional
* fields specification. Must not be {@literal null}.
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an
* optional fields specification. Must not be {@literal null}.
* @param replacement the replacement document. Must not be {@literal null}.
* @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}.
* @return the converted object that was updated or {@literal null}, if not found. Depending on the value of
@ -1093,8 +1089,8 @@ public interface MongoOperations extends FluentMongoOperations { @@ -1093,8 +1089,8 @@ public interface MongoOperations extends FluentMongoOperations {
* taking {@link FindAndReplaceOptions} into account.<br />
* <strong>NOTE:</strong> The replacement entity must not hold an {@literal id}.
*
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional
* fields specification. Must not be {@literal null}.
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an
* optional fields specification. Must not be {@literal null}.
* @param replacement the replacement document. Must not be {@literal null}.
* @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}.
* @return the converted object that was updated or {@literal null}, if not found. Depending on the value of
@ -1116,8 +1112,8 @@ public interface MongoOperations extends FluentMongoOperations { @@ -1116,8 +1112,8 @@ public interface MongoOperations extends FluentMongoOperations {
* taking {@link FindAndReplaceOptions} into account.<br />
* <strong>NOTE:</strong> The replacement entity must not hold an {@literal id}.
*
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional
* fields specification. Must not be {@literal null}.
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an
* optional fields specification. Must not be {@literal null}.
* @param replacement the replacement document. Must not be {@literal null}.
* @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}.
* @param entityType the parametrized type. Must not be {@literal null}.
@ -1141,8 +1137,8 @@ public interface MongoOperations extends FluentMongoOperations { @@ -1141,8 +1137,8 @@ public interface MongoOperations extends FluentMongoOperations {
* taking {@link FindAndReplaceOptions} into account.<br />
* <strong>NOTE:</strong> The replacement entity must not hold an {@literal id}.
*
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional
* fields specification. Must not be {@literal null}.
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an
* optional fields specification. Must not be {@literal null}.
* @param replacement the replacement document. Must not be {@literal null}.
* @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}.
* @param entityType the type used for mapping the {@link Query} to domain type fields and deriving the collection
@ -1171,8 +1167,8 @@ public interface MongoOperations extends FluentMongoOperations { @@ -1171,8 +1167,8 @@ public interface MongoOperations extends FluentMongoOperations {
* taking {@link FindAndReplaceOptions} into account.<br />
* <strong>NOTE:</strong> The replacement entity must not hold an {@literal id}.
*
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an optional
* fields specification. Must not be {@literal null}.
* @param query the {@link Query} class that specifies the {@link Criteria} used to find a document and also an
* optional fields specification. Must not be {@literal null}.
* @param replacement the replacement document. Must not be {@literal null}.
* @param options the {@link FindAndModifyOptions} holding additional information. Must not be {@literal null}.
* @param entityType the type used for mapping the {@link Query} to domain type fields. Must not be {@literal null}.
@ -1680,7 +1676,8 @@ public interface MongoOperations extends FluentMongoOperations { @@ -1680,7 +1676,8 @@ public interface MongoOperations extends FluentMongoOperations {
* acknowledged} remove operation was successful or not.
*
* @param object must not be {@literal null}.
* @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} or empty.
* @param collectionName name of the collection where the documents will be removed from, must not be {@literal null}
* or empty.
* @return the {@link DeleteResult} which lets you access the results of the previous delete.
*/
DeleteResult remove(Object object, String collectionName);
@ -1704,7 +1701,8 @@ public interface MongoOperations extends FluentMongoOperations { @@ -1704,7 +1701,8 @@ public interface MongoOperations extends FluentMongoOperations {
*
* @param query the query document that specifies the criteria used to remove a document.
* @param entityClass class of the pojo to be operated on. Can be {@literal null}.
* @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} or empty.
* @param collectionName name of the collection where the documents will be removed from, must not be {@literal null}
* or empty.
* @return the {@link DeleteResult} which lets you access the results of the previous delete.
* @throws IllegalArgumentException when {@literal query}, {@literal entityClass} or {@literal collectionName} is
* {@literal null}.
@ -1718,7 +1716,8 @@ public interface MongoOperations extends FluentMongoOperations { @@ -1718,7 +1716,8 @@ public interface MongoOperations extends FluentMongoOperations {
* information. Use {@link #remove(Query, Class, String)} to get full type specific support.
*
* @param query the query document that specifies the criteria used to remove a document.
* @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} or empty.
* @param collectionName name of the collection where the documents will be removed from, must not be {@literal null}
* or empty.
* @return the {@link DeleteResult} which lets you access the results of the previous delete.
* @throws IllegalArgumentException when {@literal query} or {@literal collectionName} is {@literal null}.
*/
@ -1730,7 +1729,8 @@ public interface MongoOperations extends FluentMongoOperations { @@ -1730,7 +1729,8 @@ public interface MongoOperations extends FluentMongoOperations {
* information. Use {@link #findAllAndRemove(Query, Class, String)} to get full type specific support.
*
* @param query the query document that specifies the criteria used to find and remove documents.
* @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} or empty.
* @param collectionName name of the collection where the documents will be removed from, must not be {@literal null}
* or empty.
* @return the {@link List} converted objects deleted by this operation.
* @since 1.5
*/
@ -1755,7 +1755,8 @@ public interface MongoOperations extends FluentMongoOperations { @@ -1755,7 +1755,8 @@ public interface MongoOperations extends FluentMongoOperations {
*
* @param query the query document that specifies the criteria used to find and remove documents.
* @param entityClass class of the pojo to be operated on.
* @param collectionName name of the collection where the documents will be removed from, must not be {@literal null} or empty.
* @param collectionName name of the collection where the documents will be removed from, must not be {@literal null}
* or empty.
* @return the {@link List} converted objects deleted by this operation.
* @since 1.5
*/

23
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/LazyLoadingProxyFactory.java

@ -22,7 +22,6 @@ import java.io.ObjectInputStream; @@ -22,7 +22,6 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
@ -42,6 +41,8 @@ import org.springframework.dao.support.PersistenceExceptionTranslator; @@ -42,6 +41,8 @@ import org.springframework.dao.support.PersistenceExceptionTranslator;
import org.springframework.data.mongodb.ClientSessionException;
import org.springframework.data.mongodb.LazyLoadingException;
import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty;
import org.springframework.data.util.Lock;
import org.springframework.data.util.Lock.AcquiredLock;
import org.springframework.lang.Nullable;
import org.springframework.objenesis.SpringObjenesis;
import org.springframework.util.ReflectionUtils;
@ -175,7 +176,9 @@ public final class LazyLoadingProxyFactory { @@ -175,7 +176,9 @@ public final class LazyLoadingProxyFactory {
}
}
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = Lock.of(rwLock.readLock());
private final Lock writeLock = Lock.of(rwLock.writeLock());
private final MongoPersistentProperty property;
private final DbRefResolverCallback callback;
@ -347,8 +350,7 @@ public final class LazyLoadingProxyFactory { @@ -347,8 +350,7 @@ public final class LazyLoadingProxyFactory {
@Nullable
private Object resolve() {
lock.readLock().lock();
try {
try (AcquiredLock l = readLock.lock()) {
if (resolved) {
if (LOGGER.isTraceEnabled()) {
@ -357,8 +359,6 @@ public final class LazyLoadingProxyFactory { @@ -357,8 +359,6 @@ public final class LazyLoadingProxyFactory {
}
return result;
}
} finally {
lock.readLock().unlock();
}
if (LOGGER.isTraceEnabled()) {
@ -367,7 +367,7 @@ public final class LazyLoadingProxyFactory { @@ -367,7 +367,7 @@ public final class LazyLoadingProxyFactory {
}
try {
return executeWhileLocked(lock.writeLock(), () -> callback.resolve(property));
return writeLock.execute(() -> callback.resolve(property));
} catch (RuntimeException ex) {
DataAccessException translatedException = exceptionTranslator.translateExceptionIfPossible(ex);
@ -381,15 +381,6 @@ public final class LazyLoadingProxyFactory { @@ -381,15 +381,6 @@ public final class LazyLoadingProxyFactory {
}
}
private static <T> T executeWhileLocked(Lock lock, Supplier<T> stuff) {
lock.lock();
try {
return stuff.get();
} finally {
lock.unlock();
}
}
}
}

40
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java

@ -18,7 +18,6 @@ package org.springframework.data.mongodb.core.messaging; @@ -18,7 +18,6 @@ package org.springframework.data.mongodb.core.messaging;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
@ -26,6 +25,7 @@ import org.springframework.dao.DataAccessResourceFailureException; @@ -26,6 +25,7 @@ import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.Message.MessageProperties;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.data.util.Lock;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
@ -41,7 +41,7 @@ import com.mongodb.client.MongoCursor; @@ -41,7 +41,7 @@ import com.mongodb.client.MongoCursor;
*/
abstract class CursorReadingTask<T, R> implements Task {
private final Lock lock = new ReentrantLock();
private final Lock lock = Lock.of(new ReentrantLock());
private final MongoTemplate template;
private final SubscriptionRequest<T, R, RequestOptions> request;
@ -88,14 +88,14 @@ abstract class CursorReadingTask<T, R> implements Task { @@ -88,14 +88,14 @@ abstract class CursorReadingTask<T, R> implements Task {
}
} catch (InterruptedException e) {
doWhileLocked(lock, () -> state = State.CANCELLED);
lock.executeWithoutResult(() -> state = State.CANCELLED);
Thread.currentThread().interrupt();
break;
}
}
} catch (RuntimeException e) {
doWhileLocked(lock, () -> state = State.CANCELLED);
lock.executeWithoutResult(() -> state = State.CANCELLED);
errorHandler.handleError(e);
}
}
@ -111,7 +111,7 @@ abstract class CursorReadingTask<T, R> implements Task { @@ -111,7 +111,7 @@ abstract class CursorReadingTask<T, R> implements Task {
*/
private void start() {
doWhileLocked(lock, () -> {
lock.executeWithoutResult(() -> {
if (!State.RUNNING.equals(state)) {
state = State.STARTING;
}
@ -119,9 +119,7 @@ abstract class CursorReadingTask<T, R> implements Task { @@ -119,9 +119,7 @@ abstract class CursorReadingTask<T, R> implements Task {
do {
// boolean valid = false;
boolean valid = executeWhileLocked(lock, () -> {
boolean valid = lock.execute(() -> {
if (!State.STARTING.equals(state)) {
return false;
@ -144,7 +142,7 @@ abstract class CursorReadingTask<T, R> implements Task { @@ -144,7 +142,7 @@ abstract class CursorReadingTask<T, R> implements Task {
Thread.sleep(100);
} catch (InterruptedException e) {
doWhileLocked(lock, () -> state = State.CANCELLED);
lock.executeWithoutResult(() -> state = State.CANCELLED);
Thread.currentThread().interrupt();
}
}
@ -160,7 +158,7 @@ abstract class CursorReadingTask<T, R> implements Task { @@ -160,7 +158,7 @@ abstract class CursorReadingTask<T, R> implements Task {
@Override
public void cancel() throws DataAccessResourceFailureException {
doWhileLocked(lock, () -> {
lock.executeWithoutResult(() -> {
if (State.RUNNING.equals(state) || State.STARTING.equals(state)) {
this.state = State.CANCELLED;
@ -178,7 +176,7 @@ abstract class CursorReadingTask<T, R> implements Task { @@ -178,7 +176,7 @@ abstract class CursorReadingTask<T, R> implements Task {
@Override
public State getState() {
return executeWhileLocked(lock, () -> state);
return lock.execute(() -> state);
}
@Override
@ -214,7 +212,7 @@ abstract class CursorReadingTask<T, R> implements Task { @@ -214,7 +212,7 @@ abstract class CursorReadingTask<T, R> implements Task {
@Nullable
private T getNext() {
return executeWhileLocked(lock, () -> {
return lock.execute(() -> {
if (State.RUNNING.equals(state)) {
return cursor.tryNext();
}
@ -257,22 +255,4 @@ abstract class CursorReadingTask<T, R> implements Task { @@ -257,22 +255,4 @@ abstract class CursorReadingTask<T, R> implements Task {
}
}
private static void doWhileLocked(Lock lock, Runnable action) {
executeWhileLocked(lock, () -> {
action.run();
return null;
});
}
@Nullable
private static <T> T executeWhileLocked(Lock lock, Supplier<T> stuff) {
lock.lock();
try {
return stuff.get();
} finally {
lock.unlock();
}
}
}

46
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/DefaultMessageListenerContainer.java

@ -20,10 +20,8 @@ import java.util.LinkedHashMap; @@ -20,10 +20,8 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -31,6 +29,7 @@ import org.springframework.core.task.SimpleAsyncTaskExecutor; @@ -31,6 +29,7 @@ import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.data.util.Lock;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;
@ -54,8 +53,13 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer @@ -54,8 +53,13 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
private final Map<SubscriptionRequest, Subscription> subscriptions = new LinkedHashMap<>();
ReadWriteLock lifecycleMonitor = new ReentrantReadWriteLock();
ReadWriteLock subscriptionMonitor = new ReentrantReadWriteLock();
private final ReadWriteLock lifecycleMonitor = new ReentrantReadWriteLock();
private final Lock lifecycleRead = Lock.of(lifecycleMonitor.readLock());
private final Lock lifecycleWrite = Lock.of(lifecycleMonitor.readLock());
private final ReadWriteLock subscriptionMonitor = new ReentrantReadWriteLock();
private final Lock subscriptionRead = Lock.of(subscriptionMonitor.readLock());
private final Lock subscriptionWrite = Lock.of(subscriptionMonitor.readLock());
private boolean running = false;
@ -114,7 +118,7 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer @@ -114,7 +118,7 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
@Override
public void start() {
doWhileLocked(lifecycleMonitor.writeLock(), () -> {
lifecycleWrite.executeWithoutResult(() -> {
if (!this.running) {
subscriptions.values().stream() //
.filter(it -> !it.isActive()) //
@ -130,8 +134,7 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer @@ -130,8 +134,7 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
@Override
public void stop() {
doWhileLocked(lifecycleMonitor.writeLock(), () -> {
lifecycleWrite.executeWithoutResult(() -> {
if (this.running) {
subscriptions.values().forEach(Cancelable::cancel);
running = false;
@ -141,7 +144,7 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer @@ -141,7 +144,7 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
@Override
public boolean isRunning() {
return executeWhileLocked(lifecycleMonitor.readLock(), () -> running);
return lifecycleRead.execute(() -> running);
}
@Override
@ -166,13 +169,12 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer @@ -166,13 +169,12 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
@Override
public Optional<Subscription> lookup(SubscriptionRequest<?, ?, ?> request) {
return executeWhileLocked(subscriptionMonitor.readLock(), () -> Optional.ofNullable(subscriptions.get(request)));
return subscriptionRead.execute(() -> Optional.ofNullable(subscriptions.get(request)));
}
public Subscription register(SubscriptionRequest request, Task task) {
return executeWhileLocked(this.subscriptionMonitor.writeLock(), () ->
{
return subscriptionWrite.execute(() -> {
if (subscriptions.containsKey(request)) {
return subscriptions.get(request);
}
@ -190,8 +192,7 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer @@ -190,8 +192,7 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
@Override
public void remove(Subscription subscription) {
doWhileLocked(this.subscriptionMonitor.writeLock(), () -> {
subscriptionWrite.executeWithoutResult(() -> {
if (subscriptions.containsValue(subscription)) {
@ -203,25 +204,6 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer @@ -203,25 +204,6 @@ public class DefaultMessageListenerContainer implements MessageListenerContainer
}
});
}
private static void doWhileLocked(Lock lock, Runnable action) {
executeWhileLocked(lock, () -> {
action.run();
return null;
});
}
@Nullable
private static <T> T executeWhileLocked(Lock lock, Supplier<T> stuff) {
lock.lock();
try {
return stuff.get();
} finally {
lock.unlock();
}
}
/**
* @author Christoph Strobl

Loading…
Cancel
Save