Skip to content

Commit

Permalink
FE: Handle previousCursor in first page (kafbat#550)
Browse files Browse the repository at this point in the history
  • Loading branch information
hadisfr committed Sep 17, 2024
1 parent 6493775 commit 410826e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 33 deletions.
32 changes: 10 additions & 22 deletions api/src/main/java/io/kafbat/ui/service/MessagesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -216,42 +217,29 @@ public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
@Nullable Integer limit,
@Nullable String keySerde,
@Nullable String valueSerde) {
return loadMessages(
cluster,
topic,
Cursor cursor = new Cursor(
deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde),
consumerPosition,
getMsgFilter(containsStringFilter, filterId),
fixPageSize(limit),
null
fixPageSize(limit)
);
String cursorId = cursorsStorage.register(cursor, null);
return loadMessages(cluster, topic, cursorId, cursor);
}

public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic, String cursorId) {
Cursor cursor = cursorsStorage.getCursor(cursorId)
.orElseThrow(() -> new ValidationException("Next page cursor not found. Maybe it was evicted from cache."));
return loadMessages(
cluster,
topic,
cursor.deserializer(),
cursor.consumerPosition(),
cursor.filter(),
cursor.limit(),
cursorId
);
return loadMessages(cluster, topic, cursorId, cursor);
}

private Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
String topic,
ConsumerRecordDeserializer deserializer,
ConsumerPosition consumerPosition,
Predicate<TopicMessageDTO> filter,
int limit,
String cursorId) {
private @NotNull Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic,
String cursorId, Cursor cursor) {
return withExistingTopic(cluster, topic)
.flux()
.publishOn(Schedulers.boundedElastic())
.flatMap(td -> loadMessagesImpl(cluster, deserializer, consumerPosition, filter, limit, cursorId));
.flatMap(td -> loadMessagesImpl(cluster,
cursor.deserializer(), cursor.consumerPosition(), cursor.filter(), cursor.limit(), cursorId));
}

private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
Expand Down
19 changes: 8 additions & 11 deletions api/src/main/java/io/kafbat/ui/service/PollingCursorsStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,25 @@ public Cursor.Tracking createNewCursor(ConsumerRecordDeserializer deserializer,
ConsumerPosition originalPosition,
Predicate<TopicMessageDTO> filter,
int limit,
@Nullable String cursorId) {
return new Cursor.Tracking(deserializer, originalPosition, filter, limit, cursorId, this::register,
this::getPreviousCursorId);
String cursorId) {
return new Cursor.Tracking(deserializer, originalPosition, filter, limit, cursorId,
this::register, this::getPreviousCursorId);
}

public Optional<Cursor> getCursor(String id) {
return Optional.ofNullable(cursorsCache.getIfPresent(id));
}

public String register(Cursor nextCursor, @Nullable String currentCursorId) {
public String register(Cursor cursor, @Nullable String previousCursorId) {
var id = RandomStringUtils.random(8, true, true);
cursorsCache.put(id, nextCursor);
if (currentCursorId != null) {
previousCursorsMap.put(id, currentCursorId);
cursorsCache.put(id, cursor);
if (previousCursorId != null) {
previousCursorsMap.put(id, previousCursorId);
}
return id;
}

public Optional<String> getPreviousCursorId(@Nullable String cursorId) {
if (cursorId == null) {
return Optional.empty();
}
public Optional<String> getPreviousCursorId(String cursorId) {
return Optional.ofNullable(previousCursorsMap.getIfPresent(cursorId));
}

Expand Down

0 comments on commit 410826e

Please sign in to comment.