From 410826e0065c00063c60aa28192392c5fed57c5c Mon Sep 17 00:00:00 2001 From: "H@di" Date: Tue, 17 Sep 2024 10:15:53 +0330 Subject: [PATCH] FE: Handle previousCursor in first page (#550) --- .../io/kafbat/ui/service/MessagesService.java | 32 ++++++------------- .../ui/service/PollingCursorsStorage.java | 19 +++++------ 2 files changed, 18 insertions(+), 33 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/service/MessagesService.java b/api/src/main/java/io/kafbat/ui/service/MessagesService.java index ab80cf675..7ecf987d7 100644 --- a/api/src/main/java/io/kafbat/ui/service/MessagesService.java +++ b/api/src/main/java/io/kafbat/ui/service/MessagesService.java @@ -46,6 +46,7 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.jetbrains.annotations.NotNull; import org.springframework.stereotype.Service; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -216,42 +217,29 @@ public Flux loadMessages(KafkaCluster cluster, @Nullable Integer limit, @Nullable String keySerde, @Nullable String valueSerde) { - return loadMessages( - cluster, - topic, + Cursor cursor = new Cursor( deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde), consumerPosition, getMsgFilter(containsStringFilter, filterId), - fixPageSize(limit), - null + fixPageSize(limit) ); + String cursorId = cursorsStorage.register(cursor, null); + return loadMessages(cluster, topic, cursorId, cursor); } public Flux loadMessages(KafkaCluster cluster, String topic, String cursorId) { Cursor cursor = cursorsStorage.getCursor(cursorId) .orElseThrow(() -> new ValidationException("Next page cursor not found. Maybe it was evicted from cache.")); - return loadMessages( - cluster, - topic, - cursor.deserializer(), - cursor.consumerPosition(), - cursor.filter(), - cursor.limit(), - cursorId - ); + return loadMessages(cluster, topic, cursorId, cursor); } - private Flux loadMessages(KafkaCluster cluster, - String topic, - ConsumerRecordDeserializer deserializer, - ConsumerPosition consumerPosition, - Predicate filter, - int limit, - String cursorId) { + private @NotNull Flux loadMessages(KafkaCluster cluster, String topic, + String cursorId, Cursor cursor) { return withExistingTopic(cluster, topic) .flux() .publishOn(Schedulers.boundedElastic()) - .flatMap(td -> loadMessagesImpl(cluster, deserializer, consumerPosition, filter, limit, cursorId)); + .flatMap(td -> loadMessagesImpl(cluster, + cursor.deserializer(), cursor.consumerPosition(), cursor.filter(), cursor.limit(), cursorId)); } private Flux loadMessagesImpl(KafkaCluster cluster, diff --git a/api/src/main/java/io/kafbat/ui/service/PollingCursorsStorage.java b/api/src/main/java/io/kafbat/ui/service/PollingCursorsStorage.java index 0737b8a5a..88fd26275 100644 --- a/api/src/main/java/io/kafbat/ui/service/PollingCursorsStorage.java +++ b/api/src/main/java/io/kafbat/ui/service/PollingCursorsStorage.java @@ -29,28 +29,25 @@ public Cursor.Tracking createNewCursor(ConsumerRecordDeserializer deserializer, ConsumerPosition originalPosition, Predicate filter, int limit, - @Nullable String cursorId) { - return new Cursor.Tracking(deserializer, originalPosition, filter, limit, cursorId, this::register, - this::getPreviousCursorId); + String cursorId) { + return new Cursor.Tracking(deserializer, originalPosition, filter, limit, cursorId, + this::register, this::getPreviousCursorId); } public Optional getCursor(String id) { return Optional.ofNullable(cursorsCache.getIfPresent(id)); } - public String register(Cursor nextCursor, @Nullable String currentCursorId) { + public String register(Cursor cursor, @Nullable String previousCursorId) { var id = RandomStringUtils.random(8, true, true); - cursorsCache.put(id, nextCursor); - if (currentCursorId != null) { - previousCursorsMap.put(id, currentCursorId); + cursorsCache.put(id, cursor); + if (previousCursorId != null) { + previousCursorsMap.put(id, previousCursorId); } return id; } - public Optional getPreviousCursorId(@Nullable String cursorId) { - if (cursorId == null) { - return Optional.empty(); - } + public Optional getPreviousCursorId(String cursorId) { return Optional.ofNullable(previousCursorsMap.getIfPresent(cursorId)); }