From 4673525462d15a7a19ea49f0725cd55353ae58c5 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Thu, 25 Oct 2018 10:52:06 +0200 Subject: [PATCH] DATAMONGO-2113 - Fix resumeTimestamp conversion for change streams. We now use the first 32 bits of the timestamp to create the instant and ignore the ordinal value. Original pull request: #615. --- .../data/mongodb/core/ChangeStreamEvent.java | 4 +++- .../mongodb/core/ReactiveMongoTemplate.java | 2 +- .../mongodb/core/convert/MongoConverters.java | 21 +++++++++++++++++++ .../core/messaging/ChangeStreamTask.java | 2 +- .../core/ReactiveMongoTemplateTests.java | 5 +++-- .../convert/MongoConvertersUnitTests.java | 17 +++++++++++++-- .../core/messaging/ChangeStreamTests.java | 5 ++++- 7 files changed, 48 insertions(+), 8 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java index b25b3eb4f..8f661c5af 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ChangeStreamEvent.java @@ -83,7 +83,9 @@ public class ChangeStreamEvent { */ @Nullable public Instant getTimestamp() { - return raw != null && raw.getClusterTime() != null ? Instant.ofEpochMilli(raw.getClusterTime().getValue()) : null; + + return raw != null && raw.getClusterTime() != null + ? converter.getConversionService().convert(raw.getClusterTime(), Instant.class) : null; } /** diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java index 67d41f04d..81e7d821d 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java @@ -1915,7 +1915,7 @@ public class ReactiveMongoTemplate implements ReactiveMongoOperations, Applicati publisher = options.getResumeToken().map(BsonValue::asDocument).map(publisher::resumeAfter).orElse(publisher); publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher); - publisher = options.getResumeTimestamp().map(it -> new BsonTimestamp(it.toEpochMilli())) + publisher = options.getResumeTimestamp().map(it -> new BsonTimestamp((int) it.getEpochSecond(), 0)) .map(publisher::startAtOperationTime).orElse(publisher); publisher = publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument)); diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/MongoConverters.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/MongoConverters.java index 83d27510a..018eba49c 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/MongoConverters.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/MongoConverters.java @@ -19,6 +19,7 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.net.MalformedURLException; import java.net.URL; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.Currency; @@ -26,6 +27,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.bson.BsonTimestamp; import org.bson.Document; import org.bson.types.Binary; import org.bson.types.Code; @@ -86,6 +88,7 @@ abstract class MongoConverters { converters.add(LongToAtomicLongConverter.INSTANCE); converters.add(IntegerToAtomicIntegerConverter.INSTANCE); converters.add(BinaryToByteArrayConverter.INSTANCE); + converters.add(BsonTimestampToInstantConverter.INSTANCE); return converters; } @@ -465,4 +468,22 @@ abstract class MongoConverters { return source.getData(); } } + + /** + * {@link Converter} implementation converting {@link BsonTimestamp} into {@link Instant}. + * + * @author Christoph Strobl + * @since 2.1.2 + */ + @ReadingConverter + enum BsonTimestampToInstantConverter implements Converter { + + INSTANCE; + + @Nullable + @Override + public Instant convert(BsonTimestamp source) { + return Instant.ofEpochSecond(source.getTime(), 0); + } + } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java index 6e887548a..5d0ff694d 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTask.java @@ -115,7 +115,7 @@ class ChangeStreamTask extends CursorReadingTask, .orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT : FullDocument.UPDATE_LOOKUP); - startAt = changeStreamOptions.getResumeTimestamp().map(Instant::toEpochMilli).map(BsonTimestamp::new) + startAt = changeStreamOptions.getResumeTimestamp().map(it -> new BsonTimestamp((int) it.getEpochSecond(), 0)) .orElse(null); } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java index 0324628db..22fc49317 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateTests.java @@ -29,6 +29,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import java.time.Duration; import java.time.Instant; import java.util.Arrays; import java.util.Collections; @@ -1355,7 +1356,7 @@ public class ReactiveMongoTemplateTests { } } - @Test // DATAMONGO-2012 + @Test // DATAMONGO-2012, DATAMONGO-2113 public void resumesAtTimestampCorrectly() throws InterruptedException { Assumptions.assumeThat(ReplicaSet.required().runsAsReplicaSet()).isTrue(); @@ -1372,7 +1373,7 @@ public class ReactiveMongoTemplateTests { Person person2 = new Person("Data", 37); Person person3 = new Person("MongoDB", 39); - StepVerifier.create(template.save(person1)).expectNextCount(1).verifyComplete(); + StepVerifier.create(template.save(person1).delayElement(Duration.ofSeconds(1))).expectNextCount(1).verifyComplete(); StepVerifier.create(template.save(person2)).expectNextCount(1).verifyComplete(); Thread.sleep(500); // just give it some time to link receive all events diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/convert/MongoConvertersUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/convert/MongoConvertersUnitTests.java index 753f033ca..8e5d0bb15 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/convert/MongoConvertersUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/convert/MongoConvertersUnitTests.java @@ -19,10 +19,15 @@ import static org.hamcrest.Matchers.*; import static org.junit.Assert.*; import java.math.BigDecimal; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Currency; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.assertj.core.data.TemporalUnitLessThanOffset; +import org.bson.BsonTimestamp; +import org.bson.Document; import org.junit.Test; import org.springframework.data.geo.Box; import org.springframework.data.geo.Circle; @@ -32,14 +37,14 @@ import org.springframework.data.geo.Shape; import org.springframework.data.mongodb.core.convert.MongoConverters.AtomicIntegerToIntegerConverter; import org.springframework.data.mongodb.core.convert.MongoConverters.AtomicLongToLongConverter; import org.springframework.data.mongodb.core.convert.MongoConverters.BigDecimalToStringConverter; +import org.springframework.data.mongodb.core.convert.MongoConverters.BsonTimestampToInstantConverter; import org.springframework.data.mongodb.core.convert.MongoConverters.CurrencyToStringConverter; import org.springframework.data.mongodb.core.convert.MongoConverters.IntegerToAtomicIntegerConverter; import org.springframework.data.mongodb.core.convert.MongoConverters.LongToAtomicLongConverter; import org.springframework.data.mongodb.core.convert.MongoConverters.StringToBigDecimalConverter; import org.springframework.data.mongodb.core.convert.MongoConverters.StringToCurrencyConverter; import org.springframework.data.mongodb.core.geo.Sphere; - -import org.bson.Document; +import org.springframework.data.mongodb.test.util.Assertions; /** * Unit tests for {@link MongoConverters}. @@ -145,4 +150,12 @@ public class MongoConvertersUnitTests { public void convertsIntegerToAtomicIntegerCorrectly() { assertThat(IntegerToAtomicIntegerConverter.INSTANCE.convert(100), is(instanceOf(AtomicInteger.class))); } + + @Test // DATAMONGO-2113 + public void convertsBsonTimestampToInstantCorrectly() { + + Assertions.assertThat(BsonTimestampToInstantConverter.INSTANCE.convert(new BsonTimestamp(6615900307735969796L))) + .isCloseTo(Instant.ofEpochSecond(1540384327), new TemporalUnitLessThanOffset(100, ChronoUnit.MILLIS)); + } + } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java index c76db2b59..a1d3361c0 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/messaging/ChangeStreamTests.java @@ -405,7 +405,7 @@ public class ChangeStreamTests { .append("user_name", "jellyBelly").append("age", 8).append("_class", User.class.getName())); } - @Test // DATAMONGO-2012 + @Test // DATAMONGO-2012, DATAMONGO-2113 public void resumeAtTimestampCorrectly() throws InterruptedException { CollectingMessageListener, User> messageListener1 = new CollectingMessageListener<>(); @@ -415,6 +415,9 @@ public class ChangeStreamTests { awaitSubscription(subscription1); template.save(jellyBelly); + + Thread.sleep(1000); // cluster timestamp is in seconds, so we need to wait at least one. + template.save(sugarSplashy); awaitMessages(messageListener1, 12);