Skip to content

Commit

Permalink
[add] Добавлен интерфейс ConsumerMessageInterface
Browse files Browse the repository at this point in the history
  • Loading branch information
kEERill committed Dec 10, 2024
1 parent 866cab4 commit d4f1285
Show file tree
Hide file tree
Showing 37 changed files with 143 additions and 115 deletions.
1 change: 0 additions & 1 deletion phpstan.neon.dist
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,3 @@ parameters:
level: 5
paths:
- src
tmpDir: build/phpstan
4 changes: 2 additions & 2 deletions src/Bus.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use Micromus\KafkaBus\Bus\ThreadRegistry;
use Micromus\KafkaBus\Interfaces\Bus\BusInterface;
use Micromus\KafkaBus\Interfaces\Bus\ThreadInterface;
use Micromus\KafkaBus\Interfaces\Messages\MessageInterface;
use Micromus\KafkaBus\Interfaces\Producers\Messages\ProducerMessageInterface;

class Bus implements BusInterface
{
Expand All @@ -25,7 +25,7 @@ public function onConnection(string $connectionName): ThreadInterface
->thread($connectionName);
}

public function publish(MessageInterface $message): void
public function publish(ProducerMessageInterface $message): void
{
$this->thread->publish($message);
}
Expand Down
2 changes: 0 additions & 2 deletions src/Bus/Listeners/Workers/Route.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

namespace Micromus\KafkaBus\Bus\Listeners\Workers;

use Micromus\KafkaBus\Messages\NativeMessageFactory;

readonly class Route
{
public function __construct(
Expand Down
4 changes: 2 additions & 2 deletions src/Bus/Thread.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use Micromus\KafkaBus\Bus\Publishers\PublisherFactory;
use Micromus\KafkaBus\Interfaces\Bus\ThreadInterface;
use Micromus\KafkaBus\Interfaces\Connections\ConnectionInterface;
use Micromus\KafkaBus\Interfaces\Messages\MessageInterface;
use Micromus\KafkaBus\Interfaces\Producers\Messages\ProducerMessageInterface;

class Thread implements ThreadInterface
{
Expand All @@ -31,7 +31,7 @@ private function getPublisher(): Publisher
return $this->publisher;
}

public function publish(MessageInterface $message): void
public function publish(ProducerMessageInterface $message): void
{
$this->publishMany([$message]);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Consumers/Commiters/CommiterInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

namespace Micromus\KafkaBus\Consumers\Commiters;

use Micromus\KafkaBus\Consumers\Messages\ConsumerMessage;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageInterface;

interface CommiterInterface
{
public function commit(ConsumerMessage $consumerMessage): void;
public function commit(ConsumerMessageInterface $consumerMessage): void;
}
6 changes: 3 additions & 3 deletions src/Consumers/Commiters/DefaultCommiter.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Micromus\KafkaBus\Consumers\Commiters;

use Micromus\KafkaBus\Consumers\Messages\ConsumerMessage;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageInterface;
use RdKafka\KafkaConsumer;

class DefaultCommiter implements CommiterInterface
Expand All @@ -12,9 +12,9 @@ public function __construct(
) {
}

public function commit(ConsumerMessage $consumerMessage): void
public function commit(ConsumerMessageInterface $consumerMessage): void
{
$this->consumer
->commitAsync($consumerMessage->meta->message);
->commitAsync($consumerMessage->original());
}
}
4 changes: 2 additions & 2 deletions src/Consumers/Commiters/VoidCommiter.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

namespace Micromus\KafkaBus\Consumers\Commiters;

use Micromus\KafkaBus\Consumers\Messages\ConsumerMessage;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageInterface;

class VoidCommiter implements CommiterInterface
{
public function commit(ConsumerMessage $consumerMessage): void
public function commit(ConsumerMessageInterface $consumerMessage): void
{
}
}
6 changes: 3 additions & 3 deletions src/Consumers/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
namespace Micromus\KafkaBus\Consumers;

use Micromus\KafkaBus\Consumers\Commiters\CommiterInterface;
use Micromus\KafkaBus\Consumers\Messages\ConsumerMessage;
use Micromus\KafkaBus\Consumers\Messages\ConsumerMessageConverter;
use Micromus\KafkaBus\Interfaces\Consumers\ConsumerInterface;
use Micromus\KafkaBus\Exceptions\Consumers\ConsumerException;
use Micromus\KafkaBus\Exceptions\Consumers\MessageConsumerException;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageInterface;
use Micromus\KafkaBus\Support\RetryRepeater;
use RdKafka\Exception;
use RdKafka\KafkaConsumer;
Expand All @@ -33,7 +33,7 @@ public function __destruct()
$this->consumer->close();
}

public function getMessage(): ConsumerMessage
public function getMessage(): ConsumerMessageInterface
{
try {
$message = $this->consumer
Expand All @@ -51,7 +51,7 @@ public function getMessage(): ConsumerMessage
}
}

public function commit(ConsumerMessage $consumerMessage): void
public function commit(ConsumerMessageInterface $consumerMessage): void
{
$this->retryRepeater
->execute(fn () => $this->commiter->commit($consumerMessage));
Expand Down
6 changes: 3 additions & 3 deletions src/Consumers/ConsumerStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

namespace Micromus\KafkaBus\Consumers;

use Micromus\KafkaBus\Consumers\Messages\ConsumerMessage;
use Micromus\KafkaBus\Exceptions\Consumers\MessageConsumerNotHandledException;
use Micromus\KafkaBus\Interfaces\Consumers\ConsumerInterface;
use Micromus\KafkaBus\Interfaces\Consumers\ConsumerStreamInterface;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageHandlerInterface;
use Micromus\KafkaBus\Exceptions\Consumers\MessageConsumerException;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageInterface;
use Micromus\KafkaBus\Testing\Exceptions\KafkaMessagesEndedException;

class ConsumerStream implements ConsumerStreamInterface
Expand Down Expand Up @@ -49,12 +49,12 @@ public function listen(): void
}

/**
* @param ConsumerMessage $message
* @param ConsumerMessageInterface $message
* @return void
*
* @throws MessageConsumerNotHandledException
*/
private function handleMessage(ConsumerMessage $message): void
private function handleMessage(ConsumerMessageInterface $message): void
{
$this->consumerMessageHandler->handle($message);
$this->consumer->commit($message);
Expand Down
32 changes: 24 additions & 8 deletions src/Consumers/Messages/ConsumerMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,43 @@

namespace Micromus\KafkaBus\Consumers\Messages;

class ConsumerMessage
use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageInterface;
use RdKafka\Message;

final class ConsumerMessage implements ConsumerMessageInterface
{
public function __construct(
public string $payload,
public array $headers,
public readonly ConsumerMeta $meta
protected Message $message
) {
}

public function msgId(): string
{
return "{$this->message->partition}-{$this->message->offset}";
}

public function topicName(): string
{
return $this->meta->message->topic_name;
return $this->message->topic_name;
}

public function key(): ?string
{
return $this->meta->message->key;
return $this->message->key;
}

public function msgId(): string
public function payload(): string
{
return $this->message->payload;
}

public function headers(): array
{
return $this->message->headers;
}

public function original(): Message
{
return "{$this->meta->message->partition}-{$this->meta->message->offset}";
return $this->message;
}
}
9 changes: 3 additions & 6 deletions src/Consumers/Messages/ConsumerMessageConverter.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@

namespace Micromus\KafkaBus\Consumers\Messages;

use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageInterface;
use RdKafka\Message;

class ConsumerMessageConverter
{
public function fromKafka(Message $message): ConsumerMessage
public function fromKafka(Message $message): ConsumerMessageInterface
{
return new ConsumerMessage(
$message->payload,
$message->headers,
new ConsumerMeta($message)
);
return new ConsumerMessage($message);
}
}
5 changes: 3 additions & 2 deletions src/Consumers/Messages/ConsumerMessageHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Micromus\KafkaBus\Consumers\Router\ConsumerRouter;
use Micromus\KafkaBus\Exceptions\Consumers\MessageConsumerNotHandledException;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageHandlerInterface;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageInterface;
use Micromus\KafkaBus\Interfaces\Messages\MessagePipelineInterface;
use Throwable;

Expand All @@ -21,7 +22,7 @@ public function topics(): array
return $this->consumerRouter->topics();
}

public function handle(ConsumerMessage $message): void
public function handle(ConsumerMessageInterface $message): void
{
$this->messagePipeline
->then($message, $this->handleMessage(...));
Expand All @@ -30,7 +31,7 @@ public function handle(ConsumerMessage $message): void
/**
* @throws MessageConsumerNotHandledException
*/
protected function handleMessage(ConsumerMessage $message): ConsumerMessage
protected function handleMessage(ConsumerMessageInterface $message): ConsumerMessageInterface
{
try {
$this->consumerRouter
Expand Down
14 changes: 14 additions & 0 deletions src/Consumers/Messages/NativeMessageFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

namespace Micromus\KafkaBus\Consumers\Messages;

use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageInterface;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\MessageFactoryInterface;

class NativeMessageFactory implements MessageFactoryInterface
{
public function fromKafka(ConsumerMessageInterface $message): ConsumerMessageInterface
{
return $message;
}
}
4 changes: 2 additions & 2 deletions src/Consumers/Router/ConsumerRouter.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Micromus\KafkaBus\Consumers\Router;

use Micromus\KafkaBus\Consumers\Messages\ConsumerMessage;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageInterface;
use Micromus\KafkaBus\Interfaces\ResolverInterface;
use Micromus\KafkaBus\Exceptions\Consumers\RouteConsumerException;

Expand All @@ -24,7 +24,7 @@ public function topics(): array
return $this->routes->topics();
}

public function handle(ConsumerMessage $consumerMessage): void
public function handle(ConsumerMessageInterface $consumerMessage): void
{
$executor = $this->getOrCreateExecutor($consumerMessage->topicName());
$executor->execute($consumerMessage);
Expand Down
8 changes: 4 additions & 4 deletions src/Consumers/Router/Executor.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

namespace Micromus\KafkaBus\Consumers\Router;

use Micromus\KafkaBus\Consumers\Messages\ConsumerMessage;
use Micromus\KafkaBus\Interfaces\Messages\MessageFactoryInterface;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageInterface;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\MessageFactoryInterface;

class Executor
{
Expand All @@ -13,13 +13,13 @@ public function __construct(
) {
}

public function execute(ConsumerMessage $message): void
public function execute(ConsumerMessageInterface $message): void
{
$this->handler
->execute($this->map($message));
}

private function map(ConsumerMessage $message): mixed
private function map(ConsumerMessageInterface $message): mixed
{
return $this->factory
->fromKafka($message);
Expand Down
2 changes: 1 addition & 1 deletion src/Consumers/Router/MessageFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace Micromus\KafkaBus\Consumers\Router;

use Micromus\KafkaBus\Messages\NativeMessageFactory;
use Micromus\KafkaBus\Consumers\Messages\NativeMessageFactory;

#[\Attribute]
final readonly class MessageFactory
Expand Down
4 changes: 2 additions & 2 deletions src/Consumers/Router/MessageFactoryClassExtractor.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

namespace Micromus\KafkaBus\Consumers\Router;

use Micromus\KafkaBus\Interfaces\Messages\MessageFactoryInterface;
use Micromus\KafkaBus\Consumers\Messages\NativeMessageFactory;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\MessageFactoryInterface;
use Micromus\KafkaBus\Interfaces\ResolverInterface;
use Micromus\KafkaBus\Messages\NativeMessageFactory;

final class MessageFactoryClassExtractor
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
namespace Micromus\KafkaBus\Exceptions\Consumers;

use Exception;
use Micromus\KafkaBus\Consumers\Messages\ConsumerMessage;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageInterface;
use Throwable;

class MessageConsumerNotHandledException extends Exception
{
public function __construct(public readonly ConsumerMessage $consumerMessage, ?Throwable $previous = null)
public function __construct(public readonly ConsumerMessageInterface $consumerMessage, ?Throwable $previous = null)
{
$message = "Message #{$this->consumerMessage->msgId()} from ".
"{$this->consumerMessage->topicName()} not handled";
Expand Down
6 changes: 3 additions & 3 deletions src/Interfaces/Bus/ThreadInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
namespace Micromus\KafkaBus\Interfaces\Bus;

use Micromus\KafkaBus\Bus\Listeners\Listener;
use Micromus\KafkaBus\Interfaces\Messages\MessageInterface;
use Micromus\KafkaBus\Exceptions\Producers\RouteProducerException;
use Micromus\KafkaBus\Interfaces\Producers\Messages\ProducerMessageInterface;

interface ThreadInterface
{
/**
* @throws RouteProducerException
*/
public function publish(MessageInterface $message): void;
public function publish(ProducerMessageInterface $message): void;

/**
* @param MessageInterface[] $messages
* @param ProducerMessageInterface[] $messages
*
* @throws RouteProducerException
*/
Expand Down
6 changes: 3 additions & 3 deletions src/Interfaces/Consumers/ConsumerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

namespace Micromus\KafkaBus\Interfaces\Consumers;

use Micromus\KafkaBus\Consumers\Messages\ConsumerMessage;
use Micromus\KafkaBus\Exceptions\Consumers\ConsumerException;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageInterface;
use Micromus\KafkaBus\Testing\Exceptions\KafkaMessagesEndedException;

interface ConsumerInterface
Expand All @@ -12,7 +12,7 @@ interface ConsumerInterface
* @throws KafkaMessagesEndedException
* @throws ConsumerException
*/
public function getMessage(): ConsumerMessage;
public function getMessage(): ConsumerMessageInterface;

public function commit(ConsumerMessage $consumerMessage): void;
public function commit(ConsumerMessageInterface $consumerMessage): void;
}
Loading

0 comments on commit d4f1285

Please sign in to comment.