Skip to content

Commit

Permalink
chore: fix cs
Browse files Browse the repository at this point in the history
  • Loading branch information
dirx committed Jan 25, 2025
1 parent b93d3fd commit 9354ea4
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 37 deletions.
2 changes: 1 addition & 1 deletion src/RdKafka/TopicConf.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public function dump(): array
{
$count = Library::new('size_t');
$dump = Library::rd_kafka_topic_conf_dump($this->topicConf, FFI::addr($count));
$count = (int)$count->cdata;
$count = (int) $count->cdata;

$result = [];
for ($i = 0; $i < $count; $i += 2) {
Expand Down
68 changes: 34 additions & 34 deletions tests/RdKafka/Test/MockClusterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@ protected function setUp(): void
public function testCreateWithProducingAndConsuming(): void
{
$clusterConfig = new Conf();
$clusterConfig->set('log_level', (string)LOG_EMERG);
$clusterConfig->set('log_level', (string) LOG_EMERG);
$cluster = MockCluster::create(1, $clusterConfig);

$producerConfig = new Conf();
$producerConfig->set('log_level', (string)LOG_EMERG);
$producerConfig->set('log_level', (string) LOG_EMERG);
$producerConfig->set('bootstrap.servers', $cluster->getBootstraps());
$producer = new Producer($producerConfig);
$producerTopic = $producer->newTopic(KAFKA_TEST_TOPIC);
$producerTopic->produce(0, 0, __METHOD__, __METHOD__);
$producer->flush(KAFKA_TEST_LONG_TIMEOUT_MS);

$consumerConfig = new Conf();
$consumerConfig->set('log_level', (string)LOG_EMERG);
$consumerConfig->set('log_level', (string) LOG_EMERG);
$consumerConfig->set('bootstrap.servers', $cluster->getBootstraps());
$consumerConfig->set('group.id', __METHOD__);
$consumer = new KafkaConsumer($consumerConfig);
Expand All @@ -65,8 +65,8 @@ public function testFromProducer(): void
$this->requiresLibrdkafkaVersion('>=', '1.4.0');

$producerConfig = new Conf();
$producerConfig->set('log_level', (string)LOG_EMERG);
$producerConfig->set('test.mock.num.brokers', (string)3);
$producerConfig->set('log_level', (string) LOG_EMERG);
$producerConfig->set('test.mock.num.brokers', (string) 3);
$producer = new Producer($producerConfig);

$cluster = MockCluster::fromProducer($producer);
Expand All @@ -79,7 +79,7 @@ public function testFromProducerWithMissingConfigShouldFail(): void
$this->requiresLibrdkafkaVersion('>=', '1.4.0');

$producerConfig = new Conf();
$producerConfig->set('log_level', (string)LOG_EMERG);
$producerConfig->set('log_level', (string) LOG_EMERG);
$producer = new Producer($producerConfig);

$this->expectException(Exception::class);
Expand All @@ -90,7 +90,7 @@ public function testFromProducerWithMissingConfigShouldFail(): void
public function testGetBootstraps(): void
{
$clusterConfig = new Conf();
$clusterConfig->set('log_level', (string)LOG_EMERG);
$clusterConfig->set('log_level', (string) LOG_EMERG);
$cluster = MockCluster::create(3, $clusterConfig);

$bootstrap = $cluster->getBootstraps();
Expand All @@ -105,7 +105,7 @@ public function testSetApiVersion(): void
$logStack = [];

$clusterConfig = new Conf();
$clusterConfig->set('log_level', (string)LOG_EMERG);
$clusterConfig->set('log_level', (string) LOG_EMERG);
$cluster = MockCluster::create(1, $clusterConfig);

$cluster->setApiVersion(ApiKey::Produce, 4, 6);
Expand Down Expand Up @@ -138,13 +138,13 @@ function (Producer $producer, int $level, string $fac, string $buf) use (&$logSt
public function testSetPartitionFollowerAndLeader(): void
{
$clusterConfig = new Conf();
$clusterConfig->set('log_level', (string)LOG_EMERG);
$clusterConfig->set('log_level', (string) LOG_EMERG);
$cluster = MockCluster::create(2, $clusterConfig);

$producerConfig = new Conf();
$producerConfig->set('log_level', (string)LOG_EMERG);
$producerConfig->set('log_level', (string) LOG_EMERG);
$producerConfig->set('bootstrap.servers', $cluster->getBootstraps());
$producerConfig->set('topic.metadata.refresh.interval.ms', (string)50);
$producerConfig->set('topic.metadata.refresh.interval.ms', (string) 50);
$producer = new Producer($producerConfig);
$producerTopic = $producer->newTopic(KAFKA_TEST_TOPIC);

Expand All @@ -168,17 +168,17 @@ public function testSetPartitionFollowerAndLeader(): void
public function testSetPartitionFollowerWatermarks(): void
{
$clusterConfig = new Conf();
$clusterConfig->set('log_level', (string)LOG_EMERG);
$clusterConfig->set('log_level', (string) LOG_EMERG);
$cluster = MockCluster::create(3, $clusterConfig);

$cluster->setPartitionLeader(KAFKA_TEST_TOPIC, 0, 1);
$cluster->setPartitionFollower(KAFKA_TEST_TOPIC, 0, 2);

// produce 10 msgs
$producerConfig = new Conf();
$producerConfig->set('log_level', (string)LOG_EMERG);
$producerConfig->set('log_level', (string) LOG_EMERG);
$producerConfig->set('bootstrap.servers', $cluster->getBootstraps());
$producerConfig->set('batch.num.messages', (string)1);
$producerConfig->set('batch.num.messages', (string) 1);
$producer = new Producer($producerConfig);
$producerTopic = $producer->newTopic(KAFKA_TEST_TOPIC);
for ($i = 0; $i < 10; $i++) {
Expand All @@ -188,11 +188,11 @@ public function testSetPartitionFollowerWatermarks(): void

// prepare consumer
$consumerConfig = new Conf();
$consumerConfig->set('log_level', (string)LOG_EMERG);
$consumerConfig->set('log_level', (string) LOG_EMERG);
$consumerConfig->set('group.id', __METHOD__);
$consumerConfig->set('auto.offset.reset', 'earliest');
$consumerConfig->set('fetch.min.bytes', (string)100);
$consumerConfig->set('fetch.message.max.bytes', (string)1000);
$consumerConfig->set('fetch.min.bytes', (string) 100);
$consumerConfig->set('fetch.message.max.bytes', (string) 1000);
$consumerConfig->set('bootstrap.servers', $cluster->getBootstraps());
$consumer = new KafkaConsumer($consumerConfig);
$consumer->assign([new TopicPartition(KAFKA_TEST_TOPIC, 0, RD_KAFKA_OFFSET_INVALID)]);
Expand Down Expand Up @@ -261,13 +261,13 @@ public function testSetBrokerDownAndUp(): void
$this->requiresLibrdkafkaVersion('>=', '1.4.0');

$clusterConfig = new Conf();
$clusterConfig->set('log_level', (string)LOG_EMERG);
$clusterConfig->set('log_level', (string) LOG_EMERG);
$cluster = MockCluster::create(1, $clusterConfig);

$producerConfig = new Conf();
$producerConfig->set('log_level', (string)LOG_EMERG);
$producerConfig->set('log_level', (string) LOG_EMERG);
$producerConfig->set('bootstrap.servers', $cluster->getBootstraps());
$producerConfig->set('reconnect.backoff.max.ms', (string)1000);
$producerConfig->set('reconnect.backoff.max.ms', (string) 1000);
$producer = new Producer($producerConfig);

$cluster->setBrokerDown(1);
Expand All @@ -291,12 +291,12 @@ public function testSetBrokerDownAndUp(): void
public function testPushRequestErrors(): void
{
$clusterConfig = new Conf();
$clusterConfig->set('log_level', (string)LOG_EMERG);
$clusterConfig->set('log_level', (string) LOG_EMERG);
$cluster = MockCluster::create(1, $clusterConfig);

// produce msg
$producerConfig = new Conf();
$producerConfig->set('log_level', (string)LOG_EMERG);
$producerConfig->set('log_level', (string) LOG_EMERG);
$producerConfig->set('bootstrap.servers', $cluster->getBootstraps());
$producer = new Producer($producerConfig);
$producerTopic = $producer->newTopic(KAFKA_TEST_TOPIC);
Expand All @@ -313,7 +313,7 @@ public function testPushRequestErrors(): void

// try to consume msg
$consumerConfig = new Conf();
$consumerConfig->set('log_level', (string)LOG_EMERG);
$consumerConfig->set('log_level', (string) LOG_EMERG);
$consumerConfig->set('group.id', __METHOD__);
// $consumerConfig->set('debug', 'fetch');
$consumerConfig->set('bootstrap.servers', $cluster->getBootstraps());
Expand All @@ -333,12 +333,12 @@ public function testPushRequestErrorsArray(): void
$this->requiresLibrdkafkaVersion('>=', '1.7.0');

$clusterConfig = new Conf();
$clusterConfig->set('log_level', (string)LOG_EMERG);
$clusterConfig->set('log_level', (string) LOG_EMERG);
$cluster = MockCluster::create(1, $clusterConfig);

// produce msg
$producerConfig = new Conf();
$producerConfig->set('log_level', (string)LOG_EMERG);
$producerConfig->set('log_level', (string) LOG_EMERG);
$producerConfig->set('bootstrap.servers', $cluster->getBootstraps());
$producer = new Producer($producerConfig);
$producerTopic = $producer->newTopic(KAFKA_TEST_TOPIC);
Expand All @@ -357,7 +357,7 @@ public function testPushRequestErrorsArray(): void

// try to consume msg
$consumerConfig = new Conf();
$consumerConfig->set('log_level', (string)LOG_EMERG);
$consumerConfig->set('log_level', (string) LOG_EMERG);
$consumerConfig->set('group.id', __METHOD__);
// $consumerConfig->set('debug', 'fetch');
$consumerConfig->set('bootstrap.servers', $cluster->getBootstraps());
Expand All @@ -377,12 +377,12 @@ public function testCreateTopic(): void
$this->requiresLibrdkafkaVersion('>=', '1.4.0');

$clusterConfig = new Conf();
$clusterConfig->set('log_level', (string)LOG_EMERG);
$clusterConfig->set('log_level', (string) LOG_EMERG);
$cluster = MockCluster::create(1, $clusterConfig);
$cluster->createTopic(KAFKA_TEST_TOPIC, 12, 1);

$producerConfig = new Conf();
$producerConfig->set('log_level', (string)LOG_EMERG);
$producerConfig->set('log_level', (string) LOG_EMERG);
$producerConfig->set('bootstrap.servers', $cluster->getBootstraps());
$producer = new Producer($producerConfig);
$producerTopic = $producer->newTopic(KAFKA_TEST_TOPIC);
Expand All @@ -401,12 +401,12 @@ public function testPushBrokerRequestErrors(): void
$this->requiresLibrdkafkaVersion('<', '1.7.0');

$clusterConfig = new Conf();
$clusterConfig->set('log_level', (string)LOG_EMERG);
$clusterConfig->set('log_level', (string) LOG_EMERG);
$cluster = MockCluster::create(1, $clusterConfig);

// produce msg
$producerConfig = new Conf();
$producerConfig->set('log_level', (string)LOG_EMERG);
$producerConfig->set('log_level', (string) LOG_EMERG);
$producerConfig->set('bootstrap.servers', $cluster->getBootstraps());
$producer = new Producer($producerConfig);
$producerTopic = $producer->newTopic(KAFKA_TEST_TOPIC);
Expand All @@ -424,7 +424,7 @@ public function testPushBrokerRequestErrors(): void

// try to consume msg
$consumerConfig = new Conf();
$consumerConfig->set('log_level', (string)LOG_EMERG);
$consumerConfig->set('log_level', (string) LOG_EMERG);
$consumerConfig->set('group.id', __METHOD__);
// $consumerConfig->set('debug', 'fetch');
$consumerConfig->set('bootstrap.servers', $cluster->getBootstraps());
Expand All @@ -444,12 +444,12 @@ public function testPushBrokerRequestErrorRtts(): void
$this->requiresLibrdkafkaVersion('>=', '1.7.0');

$clusterConfig = new Conf();
$clusterConfig->set('log_level', (string)LOG_EMERG);
$clusterConfig->set('log_level', (string) LOG_EMERG);
$cluster = MockCluster::create(1, $clusterConfig);

// produce msg
$producerConfig = new Conf();
$producerConfig->set('log_level', (string)LOG_EMERG);
$producerConfig->set('log_level', (string) LOG_EMERG);
$producerConfig->set('bootstrap.servers', $cluster->getBootstraps());
$producer = new Producer($producerConfig);
$producerTopic = $producer->newTopic(KAFKA_TEST_TOPIC);
Expand All @@ -469,7 +469,7 @@ public function testPushBrokerRequestErrorRtts(): void

// try to consume msg
$consumerConfig = new Conf();
$consumerConfig->set('log_level', (string)LOG_EMERG);
$consumerConfig->set('log_level', (string) LOG_EMERG);
$consumerConfig->set('group.id', __METHOD__);
// $consumerConfig->set('debug', 'fetch');
$consumerConfig->set('bootstrap.servers', $cluster->getBootstraps());
Expand Down
3 changes: 1 addition & 2 deletions tests/RdKafka/TopicConfTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,7 @@ public function testSetPartitionerCbWithCallback(): void
$topicConf->setOpaque($expectedTopicOpaque);
$topicConf->setPartitionerCb(
function (?string $key, int $partitionCount, ?object $topic_opaque = null, ?object $message_opaque = null) use (
&
$callbackTopicOpaque,
&$callbackTopicOpaque,
&$callbackMessageOpaque
) {
$callbackTopicOpaque = $topic_opaque;
Expand Down

0 comments on commit 9354ea4

Please sign in to comment.