From 1070e0093aa0d6a41781622c7dcc44a032e35e51 Mon Sep 17 00:00:00 2001 From: Dirk Adler Date: Sat, 25 Jan 2025 17:18:43 +0100 Subject: [PATCH] chore: rollback catching Throwable in callback proxies --- src/RdKafka/FFI/ConsumeCallbackProxy.php | 12 ++++------ src/RdKafka/FFI/DrMsgCallbackProxy.php | 14 +++++------- src/RdKafka/FFI/ErrorCallbackProxy.php | 16 +++++--------- src/RdKafka/FFI/LogCallbackProxy.php | 16 +++++--------- .../FFI/NativePartitionerCallbackProxy.php | 22 +++++++------------ src/RdKafka/FFI/OffsetCommitCallbackProxy.php | 16 +++++--------- src/RdKafka/FFI/PartitionerCallbackProxy.php | 18 +++++---------- src/RdKafka/FFI/RebalanceCallbackProxy.php | 16 +++++--------- src/RdKafka/FFI/StatsCallbackProxy.php | 16 +++++--------- 9 files changed, 53 insertions(+), 93 deletions(-) diff --git a/src/RdKafka/FFI/ConsumeCallbackProxy.php b/src/RdKafka/FFI/ConsumeCallbackProxy.php index 65400917..cb063c9c 100644 --- a/src/RdKafka/FFI/ConsumeCallbackProxy.php +++ b/src/RdKafka/FFI/ConsumeCallbackProxy.php @@ -11,13 +11,9 @@ class ConsumeCallbackProxy extends CallbackProxy { public function __invoke(CData $nativeMessage, ?CData $opaque = null): void { - try { - ($this->callback)( - new Message($nativeMessage), - OpaqueMap::get($opaque) - ); - } catch (\Throwable $exception) { - error_log($exception->getMessage(), E_ERROR); - } + ($this->callback)( + new Message($nativeMessage), + OpaqueMap::get($opaque) + ); } } diff --git a/src/RdKafka/FFI/DrMsgCallbackProxy.php b/src/RdKafka/FFI/DrMsgCallbackProxy.php index 108b3d4c..017f3433 100644 --- a/src/RdKafka/FFI/DrMsgCallbackProxy.php +++ b/src/RdKafka/FFI/DrMsgCallbackProxy.php @@ -12,14 +12,10 @@ class DrMsgCallbackProxy extends CallbackProxy { public function __invoke(CData $producer, CData $nativeMessage, ?CData $opaque = null): void { - try { - ($this->callback)( - RdKafka::resolveFromCData($producer), - new Message($nativeMessage), - OpaqueMap::get($opaque) - ); - } catch (\Throwable $exception) { - error_log($exception->getMessage(), E_ERROR); - } + ($this->callback)( + RdKafka::resolveFromCData($producer), + new Message($nativeMessage), + OpaqueMap::get($opaque) + ); } } diff --git a/src/RdKafka/FFI/ErrorCallbackProxy.php b/src/RdKafka/FFI/ErrorCallbackProxy.php index df1a181f..770a0888 100644 --- a/src/RdKafka/FFI/ErrorCallbackProxy.php +++ b/src/RdKafka/FFI/ErrorCallbackProxy.php @@ -11,15 +11,11 @@ class ErrorCallbackProxy extends CallbackProxy { public function __invoke(CData $consumerOrProducer, int $err, string $reason, ?CData $opaque = null): void { - try { - ($this->callback)( - RdKafka::resolveFromCData($consumerOrProducer), - $err, - $reason, - OpaqueMap::get($opaque) - ); - } catch (\Throwable $exception) { - error_log($exception->getMessage(), E_ERROR); - } + ($this->callback)( + RdKafka::resolveFromCData($consumerOrProducer), + $err, + $reason, + OpaqueMap::get($opaque) + ); } } diff --git a/src/RdKafka/FFI/LogCallbackProxy.php b/src/RdKafka/FFI/LogCallbackProxy.php index 8f0c24a1..dd8da7e0 100644 --- a/src/RdKafka/FFI/LogCallbackProxy.php +++ b/src/RdKafka/FFI/LogCallbackProxy.php @@ -11,15 +11,11 @@ class LogCallbackProxy extends CallbackProxy { public function __invoke(CData $rdkafka, int $level, string $facility, string $message): void { - try { - ($this->callback)( - RdKafka::resolveFromCData($rdkafka), - $level, - $facility, - $message - ); - } catch (\Throwable $exception) { - error_log($exception->getMessage(), E_ERROR); - } + ($this->callback)( + RdKafka::resolveFromCData($rdkafka), + $level, + $facility, + $message + ); } } diff --git a/src/RdKafka/FFI/NativePartitionerCallbackProxy.php b/src/RdKafka/FFI/NativePartitionerCallbackProxy.php index 50669f16..c6b18421 100644 --- a/src/RdKafka/FFI/NativePartitionerCallbackProxy.php +++ b/src/RdKafka/FFI/NativePartitionerCallbackProxy.php @@ -25,20 +25,14 @@ public function __invoke( ?CData $topic_opaque = null, ?CData $msg_opaque = null ): int { - try { - return (int) Library::{$this->partitionerMethod}( - $topic, - $keydata, - $keylen, - $partition_cnt, - OpaqueMap::get($topic_opaque), - OpaqueMap::get($msg_opaque) - ); - } catch (\Throwable $exception) { - error_log($exception->getMessage(), E_ERROR); - } - - return RD_KAFKA_PARTITION_UA; + return (int) Library::{$this->partitionerMethod}( + $topic, + $keydata, + $keylen, + $partition_cnt, + OpaqueMap::get($topic_opaque), + OpaqueMap::get($msg_opaque) + ); } public static function create(string $partitionerMethod): Closure diff --git a/src/RdKafka/FFI/OffsetCommitCallbackProxy.php b/src/RdKafka/FFI/OffsetCommitCallbackProxy.php index 5dde527c..fc44b3b6 100644 --- a/src/RdKafka/FFI/OffsetCommitCallbackProxy.php +++ b/src/RdKafka/FFI/OffsetCommitCallbackProxy.php @@ -12,15 +12,11 @@ class OffsetCommitCallbackProxy extends CallbackProxy { public function __invoke(CData $consumer, int $err, CData $nativeTopicPartitionList, ?CData $opaque = null): void { - try { - ($this->callback)( - RdKafka::resolveFromCData($consumer), - $err, - TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(), - OpaqueMap::get($opaque) - ); - } catch (\Throwable $exception) { - error_log($exception->getMessage(), E_ERROR); - } + ($this->callback)( + RdKafka::resolveFromCData($consumer), + $err, + TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(), + OpaqueMap::get($opaque) + ); } } diff --git a/src/RdKafka/FFI/PartitionerCallbackProxy.php b/src/RdKafka/FFI/PartitionerCallbackProxy.php index 27c89b0a..0a28b7bc 100644 --- a/src/RdKafka/FFI/PartitionerCallbackProxy.php +++ b/src/RdKafka/FFI/PartitionerCallbackProxy.php @@ -17,17 +17,11 @@ public function __invoke( ?CData $topic_opaque = null, ?CData $msg_opaque = null ): int { - try { - return (int) ($this->callback)( - $keydata === null ? null : FFI::string($keydata, $keylen), - $partition_cnt, - OpaqueMap::get($topic_opaque), - OpaqueMap::get($msg_opaque) - ); - } catch (\Throwable $exception) { - error_log($exception->getMessage(), E_ERROR); - } - - return RD_KAFKA_PARTITION_UA; + return (int) ($this->callback)( + $keydata === null ? null : FFI::string($keydata, $keylen), + $partition_cnt, + OpaqueMap::get($topic_opaque), + OpaqueMap::get($msg_opaque) + ); } } diff --git a/src/RdKafka/FFI/RebalanceCallbackProxy.php b/src/RdKafka/FFI/RebalanceCallbackProxy.php index cbb43204..24e3dc56 100644 --- a/src/RdKafka/FFI/RebalanceCallbackProxy.php +++ b/src/RdKafka/FFI/RebalanceCallbackProxy.php @@ -12,15 +12,11 @@ class RebalanceCallbackProxy extends CallbackProxy { public function __invoke(CData $consumer, int $err, CData $nativeTopicPartitionList, ?CData $opaque = null): void { - try { - ($this->callback)( - RdKafka::resolveFromCData($consumer), - $err, - TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(), - OpaqueMap::get($opaque) - ); - } catch (\Throwable $exception) { - error_log($exception->getMessage(), E_ERROR); - } + ($this->callback)( + RdKafka::resolveFromCData($consumer), + $err, + TopicPartitionList::fromCData($nativeTopicPartitionList)->asArray(), + OpaqueMap::get($opaque) + ); } } diff --git a/src/RdKafka/FFI/StatsCallbackProxy.php b/src/RdKafka/FFI/StatsCallbackProxy.php index deec22ef..0e6e2122 100644 --- a/src/RdKafka/FFI/StatsCallbackProxy.php +++ b/src/RdKafka/FFI/StatsCallbackProxy.php @@ -12,16 +12,12 @@ class StatsCallbackProxy extends CallbackProxy { public function __invoke(CData $consumerOrProducer, CData $json, int $json_len, ?CData $opaque = null): int { - try { - ($this->callback)( - RdKafka::resolveFromCData($consumerOrProducer), - FFI::string($json, $json_len), - $json_len, - OpaqueMap::get($opaque) - ); - } catch (\Throwable $exception) { - error_log($exception->getMessage(), E_ERROR); - } + ($this->callback)( + RdKafka::resolveFromCData($consumerOrProducer), + FFI::string($json, $json_len), + $json_len, + OpaqueMap::get($opaque) + ); return 0; }