Skip to content

Commit

Permalink
chore: rollback catching Throwable in callback proxies
Browse files Browse the repository at this point in the history
  • Loading branch information
dirx committed Jan 25, 2025
1 parent 57d26fe commit 1070e00
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 93 deletions.
12 changes: 4 additions & 8 deletions src/RdKafka/FFI/ConsumeCallbackProxy.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,9 @@ class ConsumeCallbackProxy extends CallbackProxy
{
public function __invoke(CData $nativeMessage, ?CData $opaque = null): void
{
try {
($this->callback)(
new Message($nativeMessage),
OpaqueMap::get($opaque)
);
} catch (\Throwable $exception) {
error_log($exception->getMessage(), E_ERROR);
}
($this->callback)(
new Message($nativeMessage),
OpaqueMap::get($opaque)
);
}
}
14 changes: 5 additions & 9 deletions src/RdKafka/FFI/DrMsgCallbackProxy.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,10 @@ class DrMsgCallbackProxy extends CallbackProxy
{
public function __invoke(CData $producer, CData $nativeMessage, ?CData $opaque = null): void
{
try {
($this->callback)(
RdKafka::resolveFromCData($producer),
new Message($nativeMessage),
OpaqueMap::get($opaque)
);
} catch (\Throwable $exception) {
error_log($exception->getMessage(), E_ERROR);
}
($this->callback)(
RdKafka::resolveFromCData($producer),
new Message($nativeMessage),
OpaqueMap::get($opaque)
);
}
}
16 changes: 6 additions & 10 deletions src/RdKafka/FFI/ErrorCallbackProxy.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,11 @@ class ErrorCallbackProxy extends CallbackProxy
{
public function __invoke(CData $consumerOrProducer, int $err, string $reason, ?CData $opaque = null): void
{
try {
($this->callback)(
RdKafka::resolveFromCData($consumerOrProducer),
$err,
$reason,
OpaqueMap::get($opaque)
);
} catch (\Throwable $exception) {
error_log($exception->getMessage(), E_ERROR);
}
($this->callback)(
RdKafka::resolveFromCData($consumerOrProducer),
$err,
$reason,
OpaqueMap::get($opaque)
);
}
}
16 changes: 6 additions & 10 deletions src/RdKafka/FFI/LogCallbackProxy.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,11 @@ class LogCallbackProxy extends CallbackProxy
{
public function __invoke(CData $rdkafka, int $level, string $facility, string $message): void
{
try {
($this->callback)(
RdKafka::resolveFromCData($rdkafka),
$level,
$facility,
$message
);
} catch (\Throwable $exception) {
error_log($exception->getMessage(), E_ERROR);
}
($this->callback)(
RdKafka::resolveFromCData($rdkafka),
$level,
$facility,
$message
);
}
}
22 changes: 8 additions & 14 deletions src/RdKafka/FFI/NativePartitionerCallbackProxy.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,14 @@ public function __invoke(
?CData $topic_opaque = null,
?CData $msg_opaque = null
): int {
try {
return (int) Library::{$this->partitionerMethod}(
$topic,
$keydata,
$keylen,
$partition_cnt,
OpaqueMap::get($topic_opaque),
OpaqueMap::get($msg_opaque)
);
} catch (\Throwable $exception) {
error_log($exception->getMessage(), E_ERROR);
}

return RD_KAFKA_PARTITION_UA;
return (int) Library::{$this->partitionerMethod}(
$topic,
$keydata,
$keylen,
$partition_cnt,
OpaqueMap::get($topic_opaque),
OpaqueMap::get($msg_opaque)
);
}

public static function create(string $partitionerMethod): Closure
Expand Down
16 changes: 6 additions & 10 deletions src/RdKafka/FFI/OffsetCommitCallbackProxy.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@ class OffsetCommitCallbackProxy extends CallbackProxy
{
public function __invoke(CData $consumer, int $err, CData $nativeTopicPartitionList, ?CData $opaque = null): void
{
try {
($this->callback)(
RdKafka::resolveFromCData($consumer),
$err,
TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(),
OpaqueMap::get($opaque)
);
} catch (\Throwable $exception) {
error_log($exception->getMessage(), E_ERROR);
}
($this->callback)(
RdKafka::resolveFromCData($consumer),
$err,
TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(),
OpaqueMap::get($opaque)
);
}
}
18 changes: 6 additions & 12 deletions src/RdKafka/FFI/PartitionerCallbackProxy.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,11 @@ public function __invoke(
?CData $topic_opaque = null,
?CData $msg_opaque = null
): int {
try {
return (int) ($this->callback)(
$keydata === null ? null : FFI::string($keydata, $keylen),
$partition_cnt,
OpaqueMap::get($topic_opaque),
OpaqueMap::get($msg_opaque)
);
} catch (\Throwable $exception) {
error_log($exception->getMessage(), E_ERROR);
}

return RD_KAFKA_PARTITION_UA;
return (int) ($this->callback)(
$keydata === null ? null : FFI::string($keydata, $keylen),
$partition_cnt,
OpaqueMap::get($topic_opaque),
OpaqueMap::get($msg_opaque)
);
}
}
16 changes: 6 additions & 10 deletions src/RdKafka/FFI/RebalanceCallbackProxy.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@ class RebalanceCallbackProxy extends CallbackProxy
{
public function __invoke(CData $consumer, int $err, CData $nativeTopicPartitionList, ?CData $opaque = null): void
{
try {
($this->callback)(
RdKafka::resolveFromCData($consumer),
$err,
TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(),
OpaqueMap::get($opaque)
);
} catch (\Throwable $exception) {
error_log($exception->getMessage(), E_ERROR);
}
($this->callback)(
RdKafka::resolveFromCData($consumer),
$err,
TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(),
OpaqueMap::get($opaque)
);
}
}
16 changes: 6 additions & 10 deletions src/RdKafka/FFI/StatsCallbackProxy.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,12 @@ class StatsCallbackProxy extends CallbackProxy
{
public function __invoke(CData $consumerOrProducer, CData $json, int $json_len, ?CData $opaque = null): int
{
try {
($this->callback)(
RdKafka::resolveFromCData($consumerOrProducer),
FFI::string($json, $json_len),
$json_len,
OpaqueMap::get($opaque)
);
} catch (\Throwable $exception) {
error_log($exception->getMessage(), E_ERROR);
}
($this->callback)(
RdKafka::resolveFromCData($consumerOrProducer),
FFI::string($json, $json_len),
$json_len,
OpaqueMap::get($opaque)
);

return 0;
}
Expand Down

0 comments on commit 1070e00

Please sign in to comment.