Skip to content

Commit

Permalink
[improve][client] Add a producer config to improve compaction perform…
Browse files Browse the repository at this point in the history
…ance
  • Loading branch information
xiangying committed Oct 29, 2024
1 parent fca9c5c commit a2014b6
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public ByteBuf toByteBuf() {
}
}

ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload());
ByteBuf encryptedPayload = encrypt(getCompressedBatchMetadataAndPayload(true));
updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
messageMetadata, encryptedPayload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public boolean add(MessageImpl<?> msg, SendCallback callback) {
return isBatchFull();
}

protected ByteBuf getCompressedBatchMetadataAndPayload() {
protected ByteBuf getCompressedBatchMetadataAndPayload(boolean isBrokerTwoPhaseCompactor) {
int batchWriteIndex = batchedMessageMetadataAndPayload.writerIndex();
int batchReadIndex = batchedMessageMetadataAndPayload.readerIndex();

Expand Down Expand Up @@ -169,9 +169,20 @@ protected ByteBuf getCompressedBatchMetadataAndPayload() {
}

int uncompressedSize = batchedMessageMetadataAndPayload.readableBytes();
ByteBuf compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
batchedMessageMetadataAndPayload.release();
if (compressionType != CompressionType.NONE) {
ByteBuf compressedPayload;
boolean isCompressed = false;
if (!isBrokerTwoPhaseCompactor && producer != null){
if (uncompressedSize > producer.conf.getCompressMinMsgBodySize()) {
compressedPayload = producer.applyCompression(batchedMessageMetadataAndPayload);
isCompressed = true;
} else {
compressedPayload = batchedMessageMetadataAndPayload;
}
} else {
compressedPayload = compressor.encode(batchedMessageMetadataAndPayload);
batchedMessageMetadataAndPayload.release();
}
if (compressionType != CompressionType.NONE && isCompressed) {
messageMetadata.setCompression(compressionType);
messageMetadata.setUncompressedSize(uncompressedSize);
}
Expand Down Expand Up @@ -252,7 +263,8 @@ public OpSendMsg createOpSendMsg() throws IOException {
if (messages.size() == 1) {
messageMetadata.clear();
messageMetadata.copyFrom(messages.get(0).getMessageBuilder());
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata,
getCompressedBatchMetadataAndPayload(false));
updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(),
1, null, messageMetadata, encryptedPayload);
Expand Down Expand Up @@ -283,7 +295,8 @@ public OpSendMsg createOpSendMsg() throws IOException {
lowestSequenceId = -1L;
return op;
}
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata, getCompressedBatchMetadataAndPayload());
ByteBuf encryptedPayload = producer.encryptMessage(messageMetadata,
getCompressedBatchMetadataAndPayload(false));
updateAndReserveBatchAllocatedSize(encryptedPayload.capacity());
if (encryptedPayload.readableBytes() > getMaxMessageSize()) {
producer.semaphoreRelease(messages.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Getter;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.schema.AbstractSchema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
Expand Down Expand Up @@ -773,6 +769,10 @@ int getUncompressedSize() {
return uncompressedSize;
}

CompressionType getCompressionType() {
return CompressionType.valueOf(msgMetadata.getCompression().name());
}

SchemaState getSchemaState() {
return schemaState;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,8 @@ CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transa
* @param payload
* @return a new payload
*/
private ByteBuf applyCompression(ByteBuf payload) {
@VisibleForTesting
public ByteBuf applyCompression(ByteBuf payload) {
ByteBuf compressedPayload = compressor.encode(payload);
payload.release();
return compressedPayload;
Expand All @@ -505,22 +506,27 @@ public void sendAsync(Message<?> message, SendCallback callback) {
boolean compressed = false;
// Batch will be compressed when closed
// If a message has a delayed delivery time, we'll always send it individually
if (!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) {
compressedPayload = applyCompression(payload);
compressed = true;
if (((!isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()))) {
if (payload.readableBytes() < conf.getCompressMinMsgBodySize()) {

// validate msg-size (For batching this will be check at the batch completion size)
int compressedSize = compressedPayload.readableBytes();
if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
compressedPayload.release();
String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";
PulsarClientException.InvalidMessageException invalidMessageException =
new PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds"
+ " %d bytes",
producerName, topic, compressedStr, compressedSize, getMaxMessageSize()));
completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
return;
} else {
compressedPayload = applyCompression(payload);
compressed = true;

// validate msg-size (For batching this will be check at the batch completion size)
int compressedSize = compressedPayload.readableBytes();
if (compressedSize > getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
compressedPayload.release();
String compressedStr = conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";
PulsarClientException.InvalidMessageException invalidMessageException =
new PulsarClientException.InvalidMessageException(
format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds"
+ " %d bytes",
producerName, topic, compressedStr, compressedSize,
getMaxMessageSize()));
completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
return;
}
}
}

Expand All @@ -542,7 +548,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {

// Update the message metadata before computing the payload chunk size to avoid a large message cannot be split
// into chunks.
updateMessageMetadata(msgMetadata, uncompressedSize);
updateMessageMetadata(msgMetadata, uncompressedSize, compressed);

// send in chunks
int totalChunks;
Expand Down Expand Up @@ -636,7 +642,7 @@ public void sendAsync(Message<?> message, SendCallback callback) {
* @param uncompressedSize
* @return the sequence id
*/
private void updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize) {
private void updateMessageMetadata(final MessageMetadata msgMetadata, final int uncompressedSize, boolean isCompressed) {
if (!msgMetadata.hasPublishTime()) {
msgMetadata.setPublishTime(client.getClientClock().millis());

Expand All @@ -646,7 +652,7 @@ private void updateMessageMetadata(final MessageMetadata msgMetadata, final int

// The field "uncompressedSize" is zero means the compression info were not set yet.
if (msgMetadata.getUncompressedSize() <= 0) {
if (conf.getCompressionType() != CompressionType.NONE) {
if (conf.getCompressionType() != CompressionType.NONE && isCompressed) {
msgMetadata
.setCompression(CompressionCodecProvider.convertToWireProtocol(conf.getCompressionType()));
}
Expand Down Expand Up @@ -737,7 +743,7 @@ private void serializeAndSendMessage(MessageImpl<?> msg,
} else {
// in this case compression has not been applied by the caller
// but we have to compress the payload if compression is configured
if (!compressed) {
if (!compressed && chunkPayload.readableBytes() > conf.getCompressMinMsgBodySize()) {
chunkPayload = applyCompression(chunkPayload);
}
ByteBuf encryptedPayload = encryptMessage(msgMetadata, chunkPayload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ public class ProducerConfigurationData implements Serializable, Cloneable {
)
private CompressionType compressionType = CompressionType.NONE;

private int compressMinMsgBodySize = 4 * 1024; // 4kb

// Cannot use Optional<Long> since it's not serializable
private Long initialSequenceId = null;

Expand Down

0 comments on commit a2014b6

Please sign in to comment.