Skip to content

Commit

Permalink
Add multiple bindings support
Browse files Browse the repository at this point in the history
  • Loading branch information
Oleg Namaka committed Oct 30, 2024
1 parent 10a813f commit 906b9de
Showing 1 changed file with 42 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,21 @@ class Connection
];

private const AVAILABLE_QUEUE_OPTIONS = [
'flags',
'arguments',
];

private const NEW_QUEUE_OPTIONS = [
'bindings',
];

private const DEPRECATED_BINDING_KEYS = [
'binding_keys',
'binding_arguments',
'flags',
];

private const AVAILABLE_BINDINGS_OPTIONS = [
'key',
'arguments',
];

Expand Down Expand Up @@ -145,8 +157,11 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
* * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional.
* * confirm_timeout: Timeout in seconds for confirmation, if none specified transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional.
* * queues[name]: An array of queues, keyed by the name
* * binding_keys: The binding keys (if any) to bind to this queue
* * binding_arguments: Arguments to be used while binding the queue.
* * binding_keys: The binding keys (if any) to bind to this queue (Usage is deprecated. See 'bindings')
* * binding_arguments: Arguments to be used while binding the queue. (Usage is deprecated. See 'bindings')
* * bindings[name]: An array of bindings for this queue, keyed by the name
* * key: The binding key (if any) to bind to this queue
* * arguments: An array of arguments to be used while binding the queue.
* * flags: Queue flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * exchange:
Expand Down Expand Up @@ -261,9 +276,24 @@ private static function validateOptions(array $options): void
continue;
}

if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) {
if (0 < \count($deprecatedQueueOptions = array_intersect(array_keys($queue), self::DEPRECATED_BINDING_KEYS))) {
trigger_deprecation('symfony/messenger', '6.3', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "%s" option(s) should be used rather than "%s".', implode('", "', $deprecatedQueueOptions), implode('", ', self::NEW_QUEUE_OPTIONS), implode('", ', self::DEPRECATED_BINDING_KEYS));
if (0 < \count($newQueueOptions = array_intersect(array_keys($queue), self::NEW_QUEUE_OPTIONS))) {
throw new LogicException(sprintf('New "%s" and deprecated "%s" option(s) passed to the AMQP Messenger transport', implode('", "', $newQueueOptions), implode('", "', $deprecatedQueueOptions)));
}
}

if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS, self::NEW_QUEUE_OPTIONS, self::DEPRECATED_BINDING_KEYS))) {
trigger_deprecation('symfony/messenger', '5.1', 'Invalid queue option(s) "%s" passed to the AMQP Messenger transport. Passing invalid queue options is deprecated.', implode('", "', $invalidQueueOptions));
}

if (\is_array($queue['bindings'] ?? false)) {
foreach ($queue['bindings'] as $individualBinding) {
if (0 < \count(array_diff(array_keys($individualBinding), self::AVAILABLE_BINDINGS_OPTIONS))) {
throw new LogicException(sprintf("Valid options for each 'bindings' are: %s", implode(', ', self::AVAILABLE_BINDINGS_OPTIONS)));
}
}
}
}
}

Expand Down Expand Up @@ -478,9 +508,9 @@ public function pull(string $queueName, ?callable $callback): void
$this->queue($queueName)->consume($callback);
}

public function ack(\AMQPEnvelope $message, string $queueName): bool
public function ack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM): bool
{
return $this->queue($queueName)->ack($message->getDeliveryTag()) ?? true;
return $this->queue($queueName)->ack($message->getDeliveryTag(), $flags);
}

public function nack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM): bool
Expand All @@ -500,6 +530,12 @@ private function setupExchangeAndQueues(): void

foreach ($this->queuesOptions as $queueName => $queueConfig) {
$this->queue($queueName)->declareQueue();
foreach ($queueConfig['bindings'] ?? [] as $binding) {
$this->queue($queueName)->bind($this->exchangeOptions['name'], $binding['key'] ?? null, $binding['arguments'] ?? []);
}
if (isset($queueConfig['bindings']) && empty($queueConfig['binding_keys'])) {
continue;
}
foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) {
$this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []);
}
Expand Down

0 comments on commit 906b9de

Please sign in to comment.