Skip to content

Commit

Permalink
Merge pull request #3149 from Dispell-AI/calini/add-amqp09-queue-exch…
Browse files Browse the repository at this point in the history
…ange-arguments

Feat: Add optional queue and exchange arguments in amqp09 connector
  • Loading branch information
rockwotj authored Jan 28, 2025
2 parents 31f11c3 + 80e74c5 commit 8638c31
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 31 deletions.
66 changes: 66 additions & 0 deletions docs/modules/components/pages/inputs/amqp_0_9.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ input:
enabled: false
durable: true
auto_delete: false
arguments: {} # No default (optional)
bindings_declare: [] # No default (optional)
consumer_tag: ""
auto_ack: false
Expand Down Expand Up @@ -172,6 +173,71 @@ Whether the declared queue will auto-delete.
*Default*: `false`
=== `queue_declare.arguments`
Optional arguments specific to the server's implementation of the queue that can be sent for queue types which require extra parameters.
== Arguments
- x-queue-type
Is used to declare quorum and stream queues. Accepted values are: 'classic' (default), 'quorum', 'stream', 'drop-head', 'reject-publish' and 'reject-publish-dlx'.
- x-max-length
Maximum number of messages, is a non-negative integer value.
- x-max-length-bytes
Maximum number of messages, is a non-negative integer value.
- x-overflow
Sets overflow behaviour. Possible values are: 'drop-head' (default), 'reject-publish', 'reject-publish-dlx'.
- x-message-ttl
TTL period in milliseconds. Must be a string representation of the number.
- x-expires
Expiration policy, describes the expiration period in milliseconds. Must be a positive integer.
- x-max-age
Controls the retention of a stream. Must be a strin, valid units: (Y, M, D, h, m, s) e.g. '7D' for a week.
- x-stream-max-segment-size-bytes
Controls the size of the segment files on disk (default 500000000). Must be a positive integer.
- x-queue-version
declares the Classic Queue version to use. Expects an integer, either 1 or 2.
- x-consumer-timeout
Integer specified in milliseconds.
- x-single-active-consumer
Enables Single Active Consumer, Expects a Boolean.
See https://github.com/rabbitmq/amqp091-go/blob/b3d409fe92c34bea04d8123a136384c85e8dc431/types.go#L282-L362 for more information on available arguments.
*Type*: `object`
```yml
# Examples
arguments:
x-max-length: 1000
x-max-length-bytes: 4096
x-queue-type: quorum
```
=== `bindings_declare`
Allows you to passively declare bindings for the target queue.
Expand Down
16 changes: 16 additions & 0 deletions docs/modules/components/pages/outputs/amqp_0_9.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ output:
enabled: false
type: direct
durable: true
arguments: {} # No default (optional)
key: ""
type: ""
content_type: application/octet-stream
Expand Down Expand Up @@ -176,6 +177,21 @@ Whether the exchange should be durable.
*Default*: `true`
=== `exchange_declare.arguments`
Optional arguments specific to the server's implementation of the exchange that can be sent for exchange types which require extra parameters.
*Type*: `object`
```yml
# Examples
arguments:
alternate-exchange: my-ae
```
=== `key`
The binding key to set for each message.
Expand Down
44 changes: 23 additions & 21 deletions internal/impl/amqp09/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
queueDeclareEnabledField = "enabled"
queueDeclareDurableField = "durable"
queueDeclareAutoDeleteField = "auto_delete"
queueDeclareArgumentsField = "arguments"
bindingsDeclareField = "bindings_declare"
bindingsDeclareExchangeField = "exchange"
bindingsDeclareKeyField = "key"
Expand All @@ -35,25 +36,26 @@ const (
prefetchSizeField = "prefetch_size"

// Output
exchangeField = "exchange"
exchangeDeclareField = "exchange_declare"
exchangeDeclareEnabledField = "enabled"
exchangeDeclareTypeField = "type"
exchangeDeclareDurableField = "durable"
keyField = "key"
typeField = "type"
contentTypeField = "content_type"
contentEncodingField = "content_encoding"
metadataFilterField = "metadata"
priorityField = "priority"
persistentField = "persistent"
mandatoryField = "mandatory"
immediateField = "immediate"
timeoutField = "timeout"
correlationIDField = "correlation_id"
replyToField = "reply_to"
expirationField = "expiration"
messageIDField = "message_id"
userIDField = "user_id"
appIDField = "app_id"
exchangeField = "exchange"
exchangeDeclareField = "exchange_declare"
exchangeDeclareEnabledField = "enabled"
exchangeDeclareTypeField = "type"
exchangeDeclareDurableField = "durable"
exchangeDeclareArgumentsField = "arguments"
keyField = "key"
typeField = "type"
contentTypeField = "content_type"
contentEncodingField = "content_encoding"
metadataFilterField = "metadata"
priorityField = "priority"
persistentField = "persistent"
mandatoryField = "mandatory"
immediateField = "immediate"
timeoutField = "timeout"
correlationIDField = "correlation_id"
replyToField = "reply_to"
expirationField = "expiration"
messageIDField = "message_id"
userIDField = "user_id"
appIDField = "app_id"
)
89 changes: 80 additions & 9 deletions internal/impl/amqp09/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,64 @@ You can access these metadata fields using xref:configuration:interpolation.adoc
service.NewBoolField(queueDeclareAutoDeleteField).
Description("Whether the declared queue will auto-delete.").
Default(false),
service.NewStringMapField(queueDeclareArgumentsField).
Description(`
Optional arguments specific to the server's implementation of the queue that can be sent for queue types which require extra parameters.
== Arguments
- x-queue-type
Is used to declare quorum and stream queues. Accepted values are: 'classic' (default), 'quorum', 'stream', 'drop-head', 'reject-publish' and 'reject-publish-dlx'.
- x-max-length
Maximum number of messages, is a non-negative integer value.
- x-max-length-bytes
Maximum number of messages, is a non-negative integer value.
- x-overflow
Sets overflow behaviour. Possible values are: 'drop-head' (default), 'reject-publish', 'reject-publish-dlx'.
- x-message-ttl
TTL period in milliseconds. Must be a string representation of the number.
- x-expires
Expiration policy, describes the expiration period in milliseconds. Must be a positive integer.
- x-max-age
Controls the retention of a stream. Must be a strin, valid units: (Y, M, D, h, m, s) e.g. '7D' for a week.
- x-stream-max-segment-size-bytes
Controls the size of the segment files on disk (default 500000000). Must be a positive integer.
- x-queue-version
declares the Classic Queue version to use. Expects an integer, either 1 or 2.
- x-consumer-timeout
Integer specified in milliseconds.
- x-single-active-consumer
Enables Single Active Consumer, Expects a Boolean.
See https://github.com/rabbitmq/amqp091-go/blob/b3d409fe92c34bea04d8123a136384c85e8dc431/types.go#L282-L362 for more information on available arguments.`).
Advanced().
Optional().
Example(map[string]any{
"x-queue-type": "quorum",
"x-max-length": 1000,
"x-max-length-bytes": 4096,
}),
).
Description(`Allows you to passively declare the target queue. If the queue already exists then the declaration passively verifies that they match the target fields.`).
Advanced().
Expand Down Expand Up @@ -161,9 +219,10 @@ type amqp09Reader struct {

nackRejectPattens []*regexp.Regexp

queueDeclare bool
queueDurable bool
queueAutoDelete bool
queueDeclare bool
queueDurable bool
queueAutoDelete bool
queueDeclareArgs amqp.Table

bindingDeclare []amqp09BindingDeclare

Expand Down Expand Up @@ -231,6 +290,18 @@ func amqp09ReaderFromParsed(conf *service.ParsedConfig, mgr *service.Resources)
a.queueDeclare, _ = qdConf.FieldBool(queueDeclareEnabledField)
a.queueDurable, _ = qdConf.FieldBool(queueDeclareDurableField)
a.queueAutoDelete, _ = qdConf.FieldBool(queueDeclareAutoDeleteField)

a.queueDeclareArgs = amqp.Table{}

if qdConf.Contains(queueDeclareArgumentsField) {
args, err := qdConf.FieldStringMap(queueDeclareArgumentsField)
if err != nil {
return nil, err
}
for key, value := range args {
a.queueDeclareArgs[key] = value
}
}
}

if conf.Contains(bindingsDeclareField) {
Expand Down Expand Up @@ -279,12 +350,12 @@ func (a *amqp09Reader) Connect(ctx context.Context) (err error) {

if a.queueDeclare {
if _, err = amqpChan.QueueDeclare(
a.queue, // name of the queue
a.queueDurable, // durable
a.queueAutoDelete, // delete when unused
false, // exclusive
false, // noWait
nil, // arguments
a.queue, // name of the queue
a.queueDurable, // durable
a.queueAutoDelete, // delete when unused
false, // exclusive
false, // noWait
a.queueDeclareArgs, // arguments
); err != nil {
_ = amqpChan.Close()
_ = conn.Close()
Expand Down
20 changes: 19 additions & 1 deletion internal/impl/amqp09/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@ The fields 'key', 'exchange' and 'type' can be dynamically set using xref:config
service.NewBoolField(exchangeDeclareDurableField).
Description("Whether the exchange should be durable.").
Default(true),
service.NewStringMapField(exchangeDeclareArgumentsField).
Description("Optional arguments specific to the server's implementation of the exchange that can be sent for exchange types which require extra parameters.").
Advanced().
Optional().
Example(map[string]any{
"alternate-exchange": "my-ae",
}),
).
Description(`Optionally declare the target exchange (passive).`).
Advanced().
Expand Down Expand Up @@ -176,6 +183,7 @@ type amqp09Writer struct {
exchangeDeclare bool
exchangeDeclareType string
exchangeDeclareDurable bool
exchangeDeclareArgs amqp.Table

log *service.Logger

Expand Down Expand Up @@ -240,6 +248,16 @@ func amqp09WriterFromParsed(conf *service.ParsedConfig, mgr *service.Resources)
if a.exchangeDeclareDurable, err = edConf.FieldBool(exchangeDeclareDurableField); err != nil {
return nil, err
}

if edConf.Contains(exchangeDeclareArgumentsField) {
args, err := edConf.FieldStringMap(exchangeDeclareArgumentsField)
if err != nil {
return nil, err
}
for key, value := range args {
a.exchangeDeclareArgs[key] = value
}
}
}

if a.key, err = conf.FieldInterpolatedString(keyField); err != nil {
Expand Down Expand Up @@ -360,7 +378,7 @@ func (a *amqp09Writer) declareExchange(exchange string) error {
false, // delete when complete
false, // internal
false, // noWait
nil, // arguments
a.exchangeDeclareArgs, // arguments
); err != nil {
return fmt.Errorf("amqp failed to declare exchange: %w", err)
}
Expand Down

0 comments on commit 8638c31

Please sign in to comment.