Skip to content

Commit

Permalink
[add] Добавлены Middleware для ConsumerHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
kEERill committed Dec 10, 2024
1 parent d4f1285 commit 535ff79
Show file tree
Hide file tree
Showing 29 changed files with 309 additions and 182 deletions.
70 changes: 70 additions & 0 deletions examples/bus.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<?php

use Micromus\KafkaBus\Bus;
use Micromus\KafkaBus\Connections\Registry\ConnectionRegistry;
use Micromus\KafkaBus\Connections\Registry\DriverRegistry;
use Micromus\KafkaBus\Consumers\ConsumerStreamFactory;
use Micromus\KafkaBus\Consumers\Messages\ConsumerMessageHandlerFactory;
use Micromus\KafkaBus\Consumers\Router\ConsumerRouterFactory;
use Micromus\KafkaBus\Pipelines\PipelineFactory;
use Micromus\KafkaBus\Producers\ProducerStreamFactory;
use Micromus\KafkaBus\Support\Resolvers\NativeResolver;
use Micromus\KafkaBus\Testing\Messages\ConsumerHandlerFaker;
use Micromus\KafkaBus\Testing\Messages\ProducerMessageFaker;
use Micromus\KafkaBus\Topics\Topic;
use Micromus\KafkaBus\Topics\TopicRegistry;

$topicRegistry = (new TopicRegistry())
->add(new Topic('production.fact.products.1', 'products'));

$worker = new Bus\Listeners\Workers\Worker(
'default-listener',
(new Bus\Listeners\Workers\WorkerRoutes())
->add(new Bus\Listeners\Workers\Route('products', ConsumerHandlerFaker::class)),
new Bus\Listeners\Workers\Options(additionalOptions: ['group.id' => 'products-microservice'])
);

$listenerRegistry = (new Bus\Listeners\Workers\WorkerRegistry())
->add($worker);

$routes = (new Bus\Publishers\Router\PublisherRoutes())
->add(new Bus\Publishers\Router\Route(ProducerMessageFaker::class, 'products'));

$connectionRegistry = new ConnectionRegistry(
new DriverRegistry(),
[
'default' => [
'driver' => 'kafka',
'options' => [
'metadata.broker.list' => '127.0.0.1:29092',
'log_level' => LOG_DEBUG,
// 'debug' => 'all',
],
],
]
);

return new Bus(
new Bus\ThreadRegistry(
$connectionRegistry,
new Bus\Publishers\PublisherFactory(
new ProducerStreamFactory(new PipelineFactory(new NativeResolver())),
$topicRegistry,
$routes
),
new Bus\Listeners\ListenerFactory(
new ConsumerStreamFactory(
new ConsumerMessageHandlerFactory(
new PipelineFactory(new NativeResolver()),
new ConsumerRouterFactory(
new NativeResolver(),
new PipelineFactory(new NativeResolver()),
$topicRegistry
)
)
),
$listenerRegistry
)
),
'default'
);
1 change: 0 additions & 1 deletion examples/compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ services:

kafka-ui:
image: provectuslabs/kafka-ui
container_name: kafka-ui
ports:
- 8090:8080
restart: always
Expand Down
63 changes: 7 additions & 56 deletions examples/consumer.php
Original file line number Diff line number Diff line change
@@ -1,66 +1,17 @@
<?php

use Micromus\KafkaBus\Bus;
use Micromus\KafkaBus\Connections\Registry\ConnectionRegistry;
use Micromus\KafkaBus\Connections\Registry\DriverRegistry;
use Micromus\KafkaBus\Consumers\ConsumerStreamFactory;
use Micromus\KafkaBus\Consumers\Messages\ConsumerMessageHandlerFactory;
use Micromus\KafkaBus\Consumers\Router\ConsumerRouterFactory;
use Micromus\KafkaBus\Messages\MessagePipelineFactory;
use Micromus\KafkaBus\Producers\ProducerStreamFactory;
use Micromus\KafkaBus\Support\Resolvers\NativeResolver;
use Micromus\KafkaBus\Testing\Messages\ConsumerHandlerFaker;
use Micromus\KafkaBus\Topics\Topic;
use Micromus\KafkaBus\Topics\TopicRegistry;
use Micromus\KafkaBus\Interfaces\Bus\BusInterface;

require '../vendor/autoload.php';

$topicRegistry = (new TopicRegistry())
->add(new Topic('production.fact.products.1', 'products'));
/** @var BusInterface $bus */
$bus = require 'bus.php';

$worker = new Bus\Listeners\Workers\Worker(
'default-listener',
(new Bus\Listeners\Workers\WorkerRoutes())
->add(new Bus\Listeners\Workers\Route('products', ConsumerHandlerFaker::class))
);
pcntl_async_signals(true);

$listenerRegistry = (new Bus\Listeners\Workers\WorkerRegistry())
->add($worker);
$listener = $bus->listener('default-listener');

$connectionRegistry = new ConnectionRegistry(
new DriverRegistry(),
[
'default' => [
'driver' => 'kafka',
'options' => [
'metadata.broker.list' => '127.0.0.1:29092',
'group.id' => 'products-microservice',
'log_level' => LOG_DEBUG,
// 'debug' => 'all',
],
],
]
);
pcntl_signal(SIGINT, fn () => $listener->forceStop());

$bus = new Bus(
new Bus\ThreadRegistry(
$connectionRegistry,
new Bus\Publishers\PublisherFactory(
new ProducerStreamFactory(new MessagePipelineFactory(new NativeResolver())),
$topicRegistry
),
new Bus\Listeners\ListenerFactory(
new ConsumerStreamFactory(
new ConsumerMessageHandlerFactory(
new MessagePipelineFactory(new NativeResolver()),
new ConsumerRouterFactory(new NativeResolver(), $topicRegistry)
)
),
$listenerRegistry
)
),
'default'
);
$listener->listen();

$bus->listener('default-listener')
->listen();
54 changes: 3 additions & 51 deletions examples/producer.php
Original file line number Diff line number Diff line change
@@ -1,59 +1,11 @@
<?php

use Micromus\KafkaBus\Bus;
use Micromus\KafkaBus\Bus\Publishers\Router\PublisherRoutes;
use Micromus\KafkaBus\Connections\Registry\ConnectionRegistry;
use Micromus\KafkaBus\Connections\Registry\DriverRegistry;
use Micromus\KafkaBus\Consumers\ConsumerStreamFactory;
use Micromus\KafkaBus\Consumers\Messages\ConsumerMessageHandlerFactory;
use Micromus\KafkaBus\Consumers\Router\ConsumerRouterFactory;
use Micromus\KafkaBus\Messages\MessagePipelineFactory;
use Micromus\KafkaBus\Producers\ProducerStreamFactory;
use Micromus\KafkaBus\Support\Resolvers\NativeResolver;
use Micromus\KafkaBus\Interfaces\Bus\BusInterface;
use Micromus\KafkaBus\Testing\Messages\ProducerMessageFaker;
use Micromus\KafkaBus\Topics\Topic;
use Micromus\KafkaBus\Topics\TopicRegistry;

require '../vendor/autoload.php';

$topicRegistry = (new TopicRegistry())
->add(new Topic('production.fact.products.1', 'products'));

$routes = (new PublisherRoutes())
->add(new Bus\Publishers\Router\Route(ProducerMessageFaker::class, 'products'));

$connectionRegistry = new ConnectionRegistry(
new DriverRegistry(),
[
'default' => [
'driver' => 'kafka',
'options' => [
'metadata.broker.list' => '127.0.0.1:29092',
'log_level' => LOG_DEBUG,
// 'debug' => 'all',
],
],
]
);

$bus = new Bus(
new Bus\ThreadRegistry(
$connectionRegistry,
new Bus\Publishers\PublisherFactory(
new ProducerStreamFactory(new MessagePipelineFactory(new NativeResolver())),
$topicRegistry,
$routes
),
new Bus\Listeners\ListenerFactory(
new ConsumerStreamFactory(
new ConsumerMessageHandlerFactory(
new MessagePipelineFactory(new NativeResolver()),
new ConsumerRouterFactory(new NativeResolver(), $topicRegistry)
)
)
)
),
'default'
);
/** @var BusInterface $bus */
$bus = require 'bus.php';

$bus->publish(new ProducerMessageFaker('test-message', ['foo' => 'bar']));
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
<?php

namespace Micromus\KafkaBus\Consumers\Router;
namespace Micromus\KafkaBus\Consumers\Attributes;

use Attribute;
use Micromus\KafkaBus\Consumers\Messages\NativeMessageFactory;

#[\Attribute]
#[Attribute(Attribute::TARGET_METHOD)]
final readonly class MessageFactory
{
public function __construct(
Expand Down
14 changes: 14 additions & 0 deletions src/Consumers/Attributes/MessageMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

namespace Micromus\KafkaBus\Consumers\Attributes;

use Attribute;

#[Attribute(Attribute::TARGET_METHOD | Attribute::IS_REPEATABLE)]
final readonly class MessageMiddleware
{
public function __construct(
public string $middlewareClass
) {
}
}
6 changes: 3 additions & 3 deletions src/Consumers/Messages/ConsumerMessageHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
use Micromus\KafkaBus\Exceptions\Consumers\MessageConsumerNotHandledException;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageHandlerInterface;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageInterface;
use Micromus\KafkaBus\Interfaces\Messages\MessagePipelineInterface;
use Micromus\KafkaBus\Interfaces\Pipelines\PipelineInterface;
use Throwable;

class ConsumerMessageHandler implements ConsumerMessageHandlerInterface
{
public function __construct(
protected ConsumerRouter $consumerRouter,
protected MessagePipelineInterface $messagePipeline,
protected PipelineInterface $pipeline,
) {
}

Expand All @@ -24,7 +24,7 @@ public function topics(): array

public function handle(ConsumerMessageInterface $message): void
{
$this->messagePipeline
$this->pipeline
->then($message, $this->handleMessage(...));
}

Expand Down
6 changes: 3 additions & 3 deletions src/Consumers/Messages/ConsumerMessageHandlerFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
use Micromus\KafkaBus\Consumers\Router\ConsumerRouterFactory;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageHandlerFactoryInterface;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageHandlerInterface;
use Micromus\KafkaBus\Interfaces\Messages\MessagePipelineFactoryInterface;
use Micromus\KafkaBus\Interfaces\Pipelines\PipelineFactoryInterface;

class ConsumerMessageHandlerFactory implements ConsumerMessageHandlerFactoryInterface
{
public function __construct(
protected MessagePipelineFactoryInterface $messagePipelineFactory,
protected PipelineFactoryInterface $pipelineFactory,
protected ConsumerRouterFactory $consumerRouterFactory,
) {
}
Expand All @@ -20,7 +20,7 @@ public function create(Worker $worker): ConsumerMessageHandlerInterface
{
return new ConsumerMessageHandler(
$this->consumerRouterFactory->create($worker->routes),
$this->messagePipelineFactory->create($worker->options->middlewares)
$this->pipelineFactory->create($worker->options->middlewares)
);
}
}
39 changes: 35 additions & 4 deletions src/Consumers/Router/ConsumerRouter.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,54 @@

namespace Micromus\KafkaBus\Consumers\Router;

use Micromus\KafkaBus\Consumers\Router\Extractors\MessageFactoryClassExtractor;
use Micromus\KafkaBus\Consumers\Router\Extractors\MiddlewareClassExtractor;
use Micromus\KafkaBus\Exceptions\Consumers\RouteConsumerException;
use Micromus\KafkaBus\Interfaces\Consumers\Messages\ConsumerMessageInterface;
use Micromus\KafkaBus\Interfaces\Pipelines\PipelineFactoryInterface;
use Micromus\KafkaBus\Interfaces\ResolverInterface;
use Micromus\KafkaBus\Exceptions\Consumers\RouteConsumerException;
use ReflectionException;

class ConsumerRouter
{
protected array $executors = [];

protected MessageFactoryClassExtractor $extractor;
protected MessageFactoryClassExtractor $messageFactoryClassExtractor;

protected MiddlewareClassExtractor $middlewareClassExtractor;

public function __construct(
protected ResolverInterface $resolver,
protected PipelineFactoryInterface $pipelineFactory,
protected ConsumerRoutes $routes
) {
$this->extractor = new MessageFactoryClassExtractor($this->resolver);
$this->messageFactoryClassExtractor = new MessageFactoryClassExtractor($this->resolver);
$this->middlewareClassExtractor = new MiddlewareClassExtractor();
}

public function topics(): array
{
return $this->routes->topics();
}

/**
* @param ConsumerMessageInterface $consumerMessage
* @return void
*
* @throws ReflectionException
*/
public function handle(ConsumerMessageInterface $consumerMessage): void
{
$executor = $this->getOrCreateExecutor($consumerMessage->topicName());
$executor->execute($consumerMessage);
}

/**
* @param string $topicName
* @return Executor
*
* @throws ReflectionException
*/
private function getOrCreateExecutor(string $topicName): Executor
{
if (! isset($this->executors[$topicName])) {
Expand All @@ -39,13 +59,24 @@ private function getOrCreateExecutor(string $topicName): Executor
return $this->executors[$topicName];
}

/**
* @param string $topicName
* @return Executor
*
* @throws ReflectionException
*/
protected function makeExecutor(string $topicName): Executor
{
$route = $this->routes->get($topicName)
?? throw new RouteConsumerException("Route for topic [$topicName] not found.");

$handler = $this->resolver->resolve($route->handlerClass);
$middlewares = $this->middlewareClassExtractor->extract($handler);

return new Executor($handler, $this->extractor->extract($handler));
return new Executor(
$handler,
$this->pipelineFactory->create($middlewares),
$this->messageFactoryClassExtractor->extract($handler)
);
}
}
Loading

0 comments on commit 535ff79

Please sign in to comment.