diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java index 3dc0ba7b6f24af..242bb9f1035e72 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java @@ -19,6 +19,7 @@ package org.apache.pulsar.admin.cli; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.admin.cli.CmdTopics.printMessages; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import io.netty.buffer.ByteBuf; @@ -589,26 +590,7 @@ private class PeekMessages extends CliCommand { void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(topicName); List> messages = getPersistentTopics().peekMessages(persistentTopic, subName, numMessages); - int position = 0; - for (Message msg : messages) { - if (++position != 1) { - System.out.println("-------------------------------------------------------------------------\n"); - } - if (msg.getMessageId() instanceof BatchMessageIdImpl) { - BatchMessageIdImpl msgId = (BatchMessageIdImpl) msg.getMessageId(); - System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" - + msgId.getBatchIndex()); - } else { - MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); - System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); - } - if (msg.getProperties().size() > 0) { - System.out.println("Properties:"); - print(msg.getProperties()); - } - ByteBuf data = Unpooled.wrappedBuffer(msg.getData()); - System.out.println(ByteBufUtil.prettyHexDump(data)); - } + printMessages(messages, false, this); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 22073b1a89dc9f..ca15e111390463 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -1117,50 +1117,7 @@ void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(topicName); List> messages = getTopics().peekMessages(persistentTopic, subName, numMessages, showServerMarker, transactionIsolationLevel); - int position = 0; - for (Message msg : messages) { - MessageImpl message = (MessageImpl) msg; - if (++position != 1) { - System.out.println("-------------------------------------------------------------------------\n"); - } - if (message.getMessageId() instanceof BatchMessageIdImpl) { - BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); - System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" - + msgId.getBatchIndex()); - } else { - MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); - System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); - } - - System.out.println("Publish time: " + message.getPublishTime()); - System.out.println("Event time: " + message.getEventTime()); - - if (message.getDeliverAtTime() != 0) { - System.out.println("Deliver at time: " + message.getDeliverAtTime()); - } - MessageMetadata msgMetaData = message.getMessageBuilder(); - if (showServerMarker && msgMetaData.hasMarkerType()) { - System.out.println("Marker Type: " + MarkerType.valueOf(msgMetaData.getMarkerType())); - } - - if (message.getBrokerEntryMetadata() != null) { - if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) { - System.out.println("Broker entry metadata timestamp: " - + message.getBrokerEntryMetadata().getBrokerTimestamp()); - } - if (message.getBrokerEntryMetadata().hasIndex()) { - System.out.println("Broker entry metadata index: " - + message.getBrokerEntryMetadata().getIndex()); - } - } - - if (message.getProperties().size() > 0) { - System.out.println("Properties:"); - print(msg.getProperties()); - } - ByteBuf data = Unpooled.wrappedBuffer(msg.getData()); - System.out.println(ByteBufUtil.prettyHexDump(data)); - } + printMessages(messages, showServerMarker, this); } } @@ -1379,6 +1336,55 @@ static MessageId findFirstLedgerWithinThreshold(List> messages, boolean showServerMarker, CliCommand cli) { + if (messages == null) { + return; + } + int position = 0; + for (Message msg : messages) { + MessageImpl message = (MessageImpl) msg; + if (++position != 1) { + System.out.println("-------------------------------------------------------------------------\n"); + } + if (message.getMessageId() instanceof BatchMessageIdImpl) { + BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId(); + System.out.println("Batch Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId() + ":" + + msgId.getBatchIndex()); + } else { + MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); + System.out.println("Message ID: " + msgId.getLedgerId() + ":" + msgId.getEntryId()); + } + + System.out.println("Publish time: " + message.getPublishTime()); + System.out.println("Event time: " + message.getEventTime()); + + if (message.getDeliverAtTime() != 0) { + System.out.println("Deliver at time: " + message.getDeliverAtTime()); + } + MessageMetadata msgMetaData = message.getMessageBuilder(); + if (showServerMarker && msgMetaData.hasMarkerType()) { + System.out.println("Marker Type: " + MarkerType.valueOf(msgMetaData.getMarkerType())); + } + + if (message.getBrokerEntryMetadata() != null) { + if (message.getBrokerEntryMetadata().hasBrokerTimestamp()) { + System.out.println("Broker entry metadata timestamp: " + + message.getBrokerEntryMetadata().getBrokerTimestamp()); + } + if (message.getBrokerEntryMetadata().hasIndex()) { + System.out.println("Broker entry metadata index: " + message.getBrokerEntryMetadata().getIndex()); + } + } + + if (message.getProperties().size() > 0) { + System.out.println("Properties:"); + cli.print(msg.getProperties()); + } + ByteBuf data = Unpooled.wrappedBuffer(msg.getData()); + System.out.println(ByteBufUtil.prettyHexDump(data)); + } + } + @Command(description = "Trigger offload of data from a topic to long-term storage (e.g. Amazon S3)") private class Offload extends CliCommand { @Option(names = { "-s", "--size-threshold" },