Skip to content

Commit

Permalink
ADD TEST
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangying committed Oct 29, 2024
1 parent a2014b6 commit 08c2853
Showing 1 changed file with 50 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,21 @@
*/
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.*;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -230,4 +227,47 @@ public void testRetentionPolicyByProducingMessages() throws Exception {
assertEquals(internalStats.ledgers.size(), 1);
});
}


@Test
public void testProducerCompressionMinMsgBodySize() throws PulsarClientException {
byte[] msg1022 = new byte[1022];
byte[] msg1025 = new byte[1025];
final String topicName = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
@Cleanup
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
.topic(topicName)
.producerName("producer")
.compressionType(CompressionType.LZ4)
.create();
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName("sub")
.subscribe();

producer.conf.setCompressMinMsgBodySize(1024);
producer.conf.setCompressionType(CompressionType.LZ4);
// disable batch
producer.conf.setBatchingEnabled(false);
producer.newMessage().value(msg1022).send();
MessageImpl<byte[]> message = (MessageImpl<byte[]>) consumer.receive();
CompressionType compressionType = message.getCompressionType();
assertEquals(compressionType, CompressionType.NONE);
producer.newMessage().value(msg1025).send();
message = (MessageImpl<byte[]>) consumer.receive();
compressionType = message.getCompressionType();
assertEquals(compressionType, CompressionType.LZ4);

// enable batch
producer.conf.setBatchingEnabled(true);
producer.newMessage().value(msg1022).send();
message = (MessageImpl<byte[]>) consumer.receive();
compressionType = message.getCompressionType();
assertEquals(compressionType, CompressionType.NONE);
producer.newMessage().value(msg1025).send();
message = (MessageImpl<byte[]>) consumer.receive();
compressionType = message.getCompressionType();
assertEquals(compressionType, CompressionType.LZ4);
}
}

0 comments on commit 08c2853

Please sign in to comment.