Skip to content

Commit

Permalink
DBZ-8708 Mongo read events shoudl produce source info with ts_ms fiel…
Browse files Browse the repository at this point in the history
…d set to current timestamp instead of zero
  • Loading branch information
jcechace authored and jpechane committed Feb 24, 2025
1 parent 56a2759 commit 5ad65c4
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void event(DataCollectionId collectionId, Instant timestamp) {
}

public void readEvent(CollectionId collectionId, Instant timestamp) {
sourceInfo.collectionEvent(collectionId, 0L);
sourceInfo.readEvent(collectionId, 0L);
}

public void initEvent(MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> cursor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public BsonTimestamp getTimestamp() {
}

public int getTime() {
return (this.ts != null) ? this.ts.getTime() : 0;
return (this.ts != null) ? this.ts.getTime() : -1;
}

public int getInc() {
Expand Down Expand Up @@ -171,7 +171,7 @@ public BsonTimestamp lastTimestamp() {
* @param collectionId the event's collection identifier; may not be null
* @see #schema()
*/
public void collectionEvent(CollectionId collectionId, long wallTime) {
public void readEvent(CollectionId collectionId, long wallTime) {
onEvent(collectionId, position, wallTime);
}

Expand Down Expand Up @@ -285,7 +285,8 @@ public boolean isSnapshotRunning() {

@Override
protected Instant timestamp() {
return Instant.ofEpochSecond(position().getTime());
var time = position().getTime();
return (time == -1) ? null : Instant.ofEpochSecond(time);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.time.Instant;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
Expand Down Expand Up @@ -142,15 +143,24 @@ public void assertSourceInfoContents(SourceInfo source,
assertThat(source.hasPosition()).isEqualTo(hasOffset);

Map<String, ?> offset = context.getOffset();
assertThat(offset.get(SourceInfo.TIMESTAMP)).isEqualTo((timestamp != null) ? timestamp.getTime() : 0);
assertThat(offset.get(SourceInfo.TIMESTAMP)).isEqualTo((timestamp != null) ? timestamp.getTime() : -1);
assertThat(offset.get(SourceInfo.ORDER)).isEqualTo((timestamp != null) ? timestamp.getInc() : -1);

String resumeToken = source.lastResumeToken();
assertThat(resumeToken).isEqualTo(resumeTokenData);

source.collectionEvent(new CollectionId("test", "names"), 0L);
source.readEvent(new CollectionId("test", "names"), 0L);

var structPreMakeTime = Instant.now().toEpochMilli();
Struct struct = source.struct();
assertThat(struct.getInt64(SourceInfo.TIMESTAMP_KEY)).isEqualTo((timestamp != null) ? timestamp.getTime() * 1000L : 0L);
var structPostMakeTime = Instant.now().toEpochMilli();

if (timestamp != null) {
assertThat(struct.getInt64(SourceInfo.TIMESTAMP_KEY)).isEqualTo(timestamp.getTime() * 1000L);
}
else {
assertThat(struct.getInt64(SourceInfo.TIMESTAMP_KEY)).isBetween(structPreMakeTime, structPostMakeTime);
}
assertThat(struct.getInt32(SourceInfo.ORDER)).isEqualTo((timestamp != null) ? timestamp.getInc() : -1);
assertThat(struct.getString(SourceInfo.DATABASE_NAME_KEY)).isEqualTo("test");
assertThat(struct.getString(SourceInfo.COLLECTION)).isEqualTo("names");
Expand Down Expand Up @@ -230,7 +240,7 @@ public void wallTimeIsPresent() {
var cursor = mockEventChangeStreamCursor();
source.initEvent(cursor);
assertThat(source.struct().getInt64(SourceInfo.WALL_TIME)).isNull();
source.collectionEvent(new CollectionId("test", "names"), 10L);
source.readEvent(new CollectionId("test", "names"), 10L);
assertThat(source.struct().getInt64(SourceInfo.WALL_TIME)).isEqualTo(10L);
}

Expand Down

0 comments on commit 5ad65c4

Please sign in to comment.