Browse Source

Support showExpandedEvents in change streams.

Expose showExpandedEvents in ChangeStreamOptions and propagate to sync/reactive change streams so expanded events can be consumed.

Closes #5069
Signed-off-by: Kyuhong Han <roy0424@naver.com>
pull/5112/head
Kyuhong Han 2 weeks ago
parent
commit
3eb3ad1ae8
  1. 27
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java
  2. 1
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java
  3. 15
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java
  4. 9
      spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java
  5. 8
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ChangeStreamOptionsUnitTests.java
  6. 15
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java
  7. 16
      spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java

27
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamOptions.java

@ -53,6 +53,7 @@ public class ChangeStreamOptions { @@ -53,6 +53,7 @@ public class ChangeStreamOptions {
private @Nullable FullDocumentBeforeChange fullDocumentBeforeChangeLookup;
private @Nullable Collation collation;
private @Nullable Object resumeTimestamp;
private @Nullable Boolean showExpandedEvents;
private Resume resume = Resume.UNDEFINED;
protected ChangeStreamOptions() {}
@ -108,6 +109,13 @@ public class ChangeStreamOptions { @@ -108,6 +109,13 @@ public class ChangeStreamOptions {
return Optional.ofNullable(resumeTimestamp).map(timestamp -> asTimestampOfType(timestamp, BsonTimestamp.class));
}
/**
* @return {@link Optional#empty()} if not set.
*/
public Optional<Boolean> getShowExpandedEvents() {
return Optional.ofNullable(showExpandedEvents);
}
/**
* @return {@literal true} if the change stream should be started after the {@link #getResumeToken() token}.
* @since 2.2
@ -191,6 +199,9 @@ public class ChangeStreamOptions { @@ -191,6 +199,9 @@ public class ChangeStreamOptions {
if (!ObjectUtils.nullSafeEquals(this.resumeTimestamp, that.resumeTimestamp)) {
return false;
}
if (!ObjectUtils.nullSafeEquals(this.showExpandedEvents, that.showExpandedEvents)) {
return false;
}
return resume == that.resume;
}
@ -202,6 +213,7 @@ public class ChangeStreamOptions { @@ -202,6 +213,7 @@ public class ChangeStreamOptions {
result = 31 * result + ObjectUtils.nullSafeHashCode(fullDocumentBeforeChangeLookup);
result = 31 * result + ObjectUtils.nullSafeHashCode(collation);
result = 31 * result + ObjectUtils.nullSafeHashCode(resumeTimestamp);
result = 31 * result + ObjectUtils.nullSafeHashCode(showExpandedEvents);
result = 31 * result + ObjectUtils.nullSafeHashCode(resume);
return result;
}
@ -239,6 +251,7 @@ public class ChangeStreamOptions { @@ -239,6 +251,7 @@ public class ChangeStreamOptions {
private @Nullable FullDocumentBeforeChange fullDocumentBeforeChangeLookup;
private @Nullable Collation collation;
private @Nullable Object resumeTimestamp;
private @Nullable Boolean showExpandedEvents;
private Resume resume = Resume.UNDEFINED;
private ChangeStreamOptionsBuilder() {}
@ -432,6 +445,19 @@ public class ChangeStreamOptions { @@ -432,6 +445,19 @@ public class ChangeStreamOptions {
return this;
}
/**
* Set whether expanded change events (e.g. createIndexes, shardCollection) should be emitted.
*
* @param showExpandedEvents {@code true} to include expanded events.
* @return this.
*/
@Contract("_ -> this")
public ChangeStreamOptionsBuilder showExpandedEvents(boolean showExpandedEvents) {
this.showExpandedEvents = showExpandedEvents;
return this;
}
/**
* @return the built {@link ChangeStreamOptions}
*/
@ -446,6 +472,7 @@ public class ChangeStreamOptions { @@ -446,6 +472,7 @@ public class ChangeStreamOptions {
options.fullDocumentBeforeChangeLookup = this.fullDocumentBeforeChangeLookup;
options.collation = this.collation;
options.resumeTimestamp = this.resumeTimestamp;
options.showExpandedEvents = this.showExpandedEvents;
options.resume = this.resume;
return options;

1
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

@ -2148,6 +2148,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati @@ -2148,6 +2148,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation)
.orElse(publisher);
publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher);
publisher = options.getShowExpandedEvents().map(publisher::showExpandedEvents).orElse(publisher);
if (options.getFullDocumentBeforeChangeLookup().isPresent()) {
publisher = publisher.fullDocumentBeforeChange(options.getFullDocumentBeforeChangeLookup().get());

15
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamRequest.java

@ -243,6 +243,7 @@ public class ChangeStreamRequest<T> @@ -243,6 +243,7 @@ public class ChangeStreamRequest<T>
private @Nullable String databaseName;
private @Nullable String collectionName;
private @Nullable Duration maxAwaitTime;
private @Nullable Boolean showExpandedEvents;
private @Nullable MessageListener<ChangeStreamDocument<Document>, ? super T> listener;
private final ChangeStreamOptionsBuilder delegate = ChangeStreamOptions.builder();
@ -470,6 +471,20 @@ public class ChangeStreamRequest<T> @@ -470,6 +471,20 @@ public class ChangeStreamRequest<T>
return this;
}
/**
* Set whether expanded change events (e.g. createIndexes, shardCollection) should be emitted.
*
* @param showExpandedEvents {@code true} to include expanded events.
* @return this.
*/
@Contract("_ -> this")
public ChangeStreamRequestBuilder<T> showExpandedEvents(boolean showExpandedEvents) {
this.showExpandedEvents = showExpandedEvents;
this.delegate.showExpandedEvents(showExpandedEvents);
return this;
}
/**
* @return the build {@link ChangeStreamRequest}.
*/

9
spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java

@ -90,6 +90,7 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>, @@ -90,6 +90,7 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>,
FullDocumentBeforeChange fullDocumentBeforeChange = null;
BsonTimestamp startAt = null;
boolean resumeAfter = true;
boolean showExpandedEvents = false;
if (options instanceof ChangeStreamRequest.ChangeStreamRequestOptions requestOptions) {
@ -105,6 +106,10 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>, @@ -105,6 +106,10 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>,
}
}
if (changeStreamOptions.getShowExpandedEvents().isPresent()) {
showExpandedEvents = changeStreamOptions.getShowExpandedEvents().get();
}
if (changeStreamOptions.getResumeToken().isPresent()) {
resumeToken = changeStreamOptions.getResumeToken().get().asDocument();
@ -155,6 +160,10 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>, @@ -155,6 +160,10 @@ class ChangeStreamTask extends CursorReadingTask<ChangeStreamDocument<Document>,
iterable = iterable.collation(collation);
}
if (showExpandedEvents) {
iterable = iterable.showExpandedEvents(showExpandedEvents);
}
iterable = iterable.fullDocument(fullDocument);
if(fullDocumentBeforeChange != null) {
iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange);

8
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ChangeStreamOptionsUnitTests.java

@ -53,4 +53,12 @@ public class ChangeStreamOptionsUnitTests { @@ -53,4 +53,12 @@ public class ChangeStreamOptionsUnitTests {
assertThat(options.isResumeAfter()).isFalse();
assertThat(options.isStartAfter()).isFalse();
}
@Test // GH-5069
void shouldStoreShowExpandedEvents() {
ChangeStreamOptions options = ChangeStreamOptions.builder().showExpandedEvents(true).build();
assertThat(options.getShowExpandedEvents()).isPresent().hasValue(true);
}
}

15
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java

@ -1668,6 +1668,21 @@ public class ReactiveMongoTemplateUnitTests { @@ -1668,6 +1668,21 @@ public class ReactiveMongoTemplateUnitTests {
}
@Test // GH-5069
void changeStreamOptionShowExpandedEventsShouldBeApplied() {
when(factory.getMongoDatabase(anyString())).thenReturn(Mono.just(db));
when(collection.watch(any(Class.class))).thenReturn(changeStreamPublisher);
when(changeStreamPublisher.showExpandedEvents(anyBoolean())).thenReturn(changeStreamPublisher);
when(changeStreamPublisher.fullDocument(any())).thenReturn(changeStreamPublisher);
ChangeStreamOptions options = ChangeStreamOptions.builder()
.showExpandedEvents(true).build();
template.changeStream("database", "collection", options, Object.class).subscribe();
verify(changeStreamPublisher).showExpandedEvents(true);
}
@Test // GH-4462
void replaceShouldUseCollationWhenPresent() {

16
spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTaskUnitTests.java

@ -140,6 +140,22 @@ class ChangeStreamTaskUnitTests { @@ -140,6 +140,22 @@ class ChangeStreamTaskUnitTests {
verify(changeStreamIterable).fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
}
@Test // GH-5069
void shouldApplyShowExpandedEventsToChangeStream() {
when(changeStreamIterable.showExpandedEvents(true)).thenReturn(changeStreamIterable);
ChangeStreamRequest request = ChangeStreamRequest.builder() //
.collection("start-wars") //
.showExpandedEvents(true) //
.publishTo(message -> {}) //
.build();
initTask(request, Document.class);
verify(changeStreamIterable).showExpandedEvents(true);
}
private MongoCursor<ChangeStreamDocument<Document>> initTask(ChangeStreamRequest request, Class<?> targetType) {
ChangeStreamTask task = new ChangeStreamTask(template, request, targetType, er -> {});

Loading…
Cancel
Save