diff --git a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/offset/KafkaOffsetManager.java b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/offset/KafkaOffsetManager.java index fbb4329b..341978b2 100644 --- a/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/offset/KafkaOffsetManager.java +++ b/kafka-connect-oura-source/src/main/java/org/radarbase/connect/rest/oura/offset/KafkaOffsetManager.java @@ -36,7 +36,7 @@ public void initialize(List> partitions) { .filter(e -> e.getValue() != null && e.getValue().containsKey(TIMESTAMP_OFFSET_KEY)) .collect(Collectors.toMap( e -> (String) e.getKey().get("user") + "-" + e.getKey().get("route"), - e -> Instant.ofEpochMilli(((Number) e.getValue().get(TIMESTAMP_OFFSET_KEY)).longValue()))); + e -> Instant.ofEpochSecond(((Number) e.getValue().get(TIMESTAMP_OFFSET_KEY)).longValue()))); } else { logger.warn("Offset storage reader is null, will resume from an empty state."); } diff --git a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt index c1d89c01..d948d874 100644 --- a/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt +++ b/oura-library/src/main/kotlin/org/radarbase/oura/request/OuraRequestGenerator.kt @@ -90,7 +90,7 @@ constructor( offsetTime.coerceAtLeast(startDate) } val endDate = if (user.endDate >= Instant.now()) Instant.now() else user.endDate - if (Duration.between(startOffset, endDate).toDays() <= ONE_DAY) { + if (Duration.between(startOffset, endDate) <= ONE_DAY) { logger.info("Interval between dates is too short. Backing off..") userNextRequest[user.versionedId] = Instant.now().plus(USER_BACK_OFF_TIME) return emptySequence() @@ -145,11 +145,13 @@ constructor( ?: nextRequestTime } else { if (request.startDate.plus(TIME_AFTER_REQUEST).isBefore(Instant.now())) { + logger.info("No records found, updating offsets to end date..") ouraOffsetManager.updateOffsets( request.route, request.user, request.endDate, ) + userNextRequest[request.user.versionedId] = Instant.now().plus(BACK_OFF_TIME) } } return records @@ -240,7 +242,7 @@ constructor( companion object { private val logger = LoggerFactory.getLogger(OuraRequestGenerator::class.java) private val BACK_OFF_TIME = Duration.ofMinutes(10L) - private val ONE_DAY = 1L + private val ONE_DAY = Duration.ofDays(1L) private val TIME_AFTER_REQUEST = Duration.ofDays(30) private val USER_BACK_OFF_TIME = Duration.ofHours(12L) private val SUCCESS_BACK_OFF_TIME = Duration.ofMinutes(1L)