diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java index 201079038..8c6c03466 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/CursorReadingTask.java @@ -18,6 +18,7 @@ package org.springframework.data.mongodb.core.messaging; import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.springframework.dao.DataAccessResourceFailureException; import org.springframework.data.mongodb.core.MongoTemplate; @@ -75,8 +76,11 @@ abstract class CursorReadingTask implements Task { start(); while (isRunning()) { + try { - T next = getNext(); + + T next = execute(this::getNext); + if (next != null) { emitMessage(createMessage(next, targetType, request.getRequestOptions())); } else { @@ -88,12 +92,6 @@ abstract class CursorReadingTask implements Task { state = State.CANCELLED; } Thread.interrupted(); - } catch (RuntimeException e) { - - Exception translated = template.getExceptionTranslator().translateExceptionIfPossible(e); - Exception toHandle = translated != null ? translated : e; - - errorHandler.handleError(toHandle); } } } @@ -101,11 +99,11 @@ abstract class CursorReadingTask implements Task { /** * Initialize the Task by 1st setting the current state to {@link State#STARTING starting} indicating the * initialization procedure.
- * Moving on the underlying {@link MongoCursor} gets {@link #initCursor(MongoTemplate, RequestOptions) created} and is - * {@link #isValidCursor(MongoCursor) health checked}. Once a valid {@link MongoCursor} is created the {@link #state} - * is set to {@link State#RUNNING running}. If the health check is not passed the {@link MongoCursor} is immediately - * {@link MongoCursor#close() closed} and a new {@link MongoCursor} is requested until a valid one is retrieved or the - * {@link #state} changes. + * Moving on the underlying {@link MongoCursor} gets {@link #initCursor(MongoTemplate, RequestOptions, Class) created} + * and is {@link #isValidCursor(MongoCursor) health checked}. Once a valid {@link MongoCursor} is created the + * {@link #state} is set to {@link State#RUNNING running}. If the health check is not passed the {@link MongoCursor} + * is immediately {@link MongoCursor#close() closed} and a new {@link MongoCursor} is requested until a valid one is + * retrieved or the {@link #state} changes. */ private void start() { @@ -123,7 +121,7 @@ abstract class CursorReadingTask implements Task { if (State.STARTING.equals(state)) { - MongoCursor cursor = initCursor(template, request.getRequestOptions(), targetType); + MongoCursor cursor = execute(() -> initCursor(template, request.getRequestOptions(), targetType)); valid = isValidCursor(cursor); if (valid) { this.cursor = cursor; @@ -248,4 +246,29 @@ abstract class CursorReadingTask implements Task { return true; } + + /** + * Execute an operation and take care of translating exceptions using the {@link MongoTemplate templates} + * {@link org.springframework.data.mongodb.core.MongoExceptionTranslator} and passing those on to the + * {@link #errorHandler}. + * + * @param callback must not be {@literal null}. + * @param + * @return can be {@literal null}. + * @throws RuntimeException The potentially translated exception. + */ + @Nullable + private T execute(Supplier callback) { + + try { + return callback.get(); + } catch (RuntimeException e) { + + RuntimeException translated = template.getExceptionTranslator().translateExceptionIfPossible(e); + RuntimeException toHandle = translated != null ? translated : e; + + errorHandler.handleError(toHandle); + throw toHandle; + } + } } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/CursorReadingTaskUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/CursorReadingTaskUnitTests.java index 9af142771..6ed6350ef 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/CursorReadingTaskUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/CursorReadingTaskUnitTests.java @@ -27,8 +27,10 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import org.springframework.data.mongodb.core.MongoExceptionTranslator; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions; import org.springframework.data.mongodb.core.messaging.Task.State; @@ -64,6 +66,7 @@ public class CursorReadingTaskUnitTests { when(request.getMessageListener()).thenReturn(listener); when(options.getCollectionName()).thenReturn("collection-name"); when(template.getDb()).thenReturn(db); + when(template.getExceptionTranslator()).thenReturn(new MongoExceptionTranslator()); when(db.getName()).thenReturn("mock-db"); task = new ValueCapturingTaskStub(template, request, Object.class, cursor, errorHandler); @@ -93,6 +96,17 @@ public class CursorReadingTaskUnitTests { verify(listener, times(task.getValues().size())).onMessage(any()); } + @Test // DATAMONGO-2173 + public void writesErrorOnStartToErrorHandler() { + + ArgumentCaptor errorCaptor = ArgumentCaptor.forClass(Throwable.class); + Task task = new ErrorOnInitCursorTaskStub(template, request, Object.class, errorHandler); + + assertThatExceptionOfType(RuntimeException.class).isThrownBy(task::run); + verify(errorHandler).handleError(errorCaptor.capture()); + assertThat(errorCaptor.getValue()).hasMessageStartingWith("let's get it started (ha)"); + } + private static class MultithreadedStopRunningWhileEmittingMessages extends MultithreadedTestCase { CursorReadingTask task; @@ -222,4 +236,17 @@ public class CursorReadingTaskUnitTests { return values; } } + + static class ErrorOnInitCursorTaskStub extends CursorReadingTask { + + public ErrorOnInitCursorTaskStub(MongoTemplate template, SubscriptionRequest request, Class targetType, + ErrorHandler errorHandler) { + super(template, request, targetType, errorHandler); + } + + @Override + protected MongoCursor initCursor(MongoTemplate template, RequestOptions options, Class targetType) { + throw new RuntimeException("let's get it started (ha), let's get it started in here..."); + } + } }