Skip to content

Commit

Permalink
[improve][cli] Support additional msg metadata for V1 topic on peek m…
Browse files Browse the repository at this point in the history
…essage cmd
  • Loading branch information
rdhabalia committed Feb 13, 2025
1 parent 0a95976 commit 7dfaa03
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.admin.cli;

import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.admin.cli.CmdTopics.printMessages;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -589,26 +590,7 @@ private class PeekMessages extends CliCommand {
void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(topicName);
List<Message<byte[]>> messages = getPersistentTopics().peekMessages(persistentTopic, subName, numMessages);
int position = 0;
for (Message<byte[]> msg : messages) {
if (++position != 1) {
System.out.println("-------------------------------------------------------------------------\n");
}
if (msg.getMessageId() instanceof BatchMessageIdImpl) {
BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId();
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
+ msgId.getBatchIndex());
} else {
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
}
if (msg.getProperties().size() > 0) {
System.out.println("Properties:");
print(msg.getProperties());
}
ByteBuf data = Unpooled.wrappedBuffer(msg.getData());
System.out.println(ByteBufUtil.prettyHexDump(data));
}
printMessages(messages, false, this);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1117,50 +1117,7 @@ void run() throws PulsarAdminException {
String persistentTopic = validatePersistentTopic(topicName);
List<Message<byte[]>> messages = getTopics().peekMessages(persistentTopic, subName, numMessages,
showServerMarker, transactionIsolationLevel);
int position = 0;
for (Message<byte[]> msg : messages) {
MessageImpl message = (MessageImpl) msg;
if (++position != 1) {
System.out.println("-------------------------------------------------------------------------\n");
}
if (message.getMessageId() instanceof BatchMessageIdImpl) {
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
+ msgId.getBatchIndex());
} else {
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
}

System.out.println("Publish time: " + message.getPublishTime());
System.out.println("Event time: " + message.getEventTime());

if (message.getDeliverAtTime() != 0) {
System.out.println("Deliver at time: " + message.getDeliverAtTime());
}
MessageMetadata msgMetaData = message.getMessageBuilder();
if (showServerMarker && msgMetaData.hasMarkerType()) {
System.out.println("Marker Type: " + MarkerType.valueOf(msgMetaData.getMarkerType()));
}

if (message.getBrokerEntryMetadata() != null) {
if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) {
System.out.println("Broker entry metadata timestamp: "
+ message.getBrokerEntryMetadata().getBrokerTimestamp());
}
if (message.getBrokerEntryMetadata().hasIndex()) {
System.out.println("Broker entry metadata index: "
+ message.getBrokerEntryMetadata().getIndex());
}
}

if (message.getProperties().size() > 0) {
System.out.println("Properties:");
print(msg.getProperties());
}
ByteBuf data = Unpooled.wrappedBuffer(msg.getData());
System.out.println(ByteBufUtil.prettyHexDump(data));
}
printMessages(messages, showServerMarker, this);
}
}

Expand Down Expand Up @@ -1379,6 +1336,55 @@ static MessageId findFirstLedgerWithinThreshold(List<PersistentTopicInternalStat
return null;
}

public static void printMessages(List<Message<byte[]>> messages, boolean showServerMarker, CliCommand cli) {
if (messages == null) {
return;
}
int position = 0;
for (Message<byte[]> msg : messages) {
MessageImpl message = (MessageImpl) msg;
if (++position != 1) {
System.out.println("-------------------------------------------------------------------------\n");
}
if (message.getMessageId() instanceof BatchMessageIdImpl) {
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":"
+ msgId.getBatchIndex());
} else {
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId());
}

System.out.println("Publish time: " + message.getPublishTime());
System.out.println("Event time: " + message.getEventTime());

if (message.getDeliverAtTime() != 0) {
System.out.println("Deliver at time: " + message.getDeliverAtTime());
}
MessageMetadata msgMetaData = message.getMessageBuilder();
if (showServerMarker && msgMetaData.hasMarkerType()) {
System.out.println("Marker Type: " + MarkerType.valueOf(msgMetaData.getMarkerType()));
}

if (message.getBrokerEntryMetadata() != null) {
if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) {
System.out.println("Broker entry metadata timestamp: "
+ message.getBrokerEntryMetadata().getBrokerTimestamp());
}
if (message.getBrokerEntryMetadata().hasIndex()) {
System.out.println("Broker entry metadata index: " + message.getBrokerEntryMetadata().getIndex());
}
}

if (message.getProperties().size() > 0) {
System.out.println("Properties:");
cli.print(msg.getProperties());
}
ByteBuf data = Unpooled.wrappedBuffer(msg.getData());
System.out.println(ByteBufUtil.prettyHexDump(data));
}
}

@Command(description = "Trigger offload of data from a topic to long-term storage (e.g. Amazon S3)")
private class Offload extends CliCommand {
@Option(names = { "-s", "--size-threshold" },
Expand Down

0 comments on commit 7dfaa03

Please sign in to comment.