Skip to content

Commit

Permalink
Refactor (and rename) Exchange#bindings to remove memory allocations (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
spuun authored Feb 13, 2025
1 parent 66a48f7 commit a0bd78d
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 40 deletions.
6 changes: 2 additions & 4 deletions src/lavinmq/amqp/exchange/consistent_hash.cr
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,10 @@ module LavinMQ
true
end

protected def bindings(routing_key, headers) : Iterator(Destination)
def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->)
key = hash_key(routing_key, headers)
if d = @hasher.get(key)
{d}.each
else
Iterator(Destination).empty
yield d
end
end

Expand Down
6 changes: 2 additions & 4 deletions src/lavinmq/amqp/exchange/default.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@ module LavinMQ
Iterator(BindingDetails).empty
end

protected def bindings(routing_key, headers) : Iterator(LavinMQ::Destination)
protected def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->)
if q = @vhost.queues[routing_key]?
Tuple(LavinMQ::Destination).new(q).each
else
Iterator(LavinMQ::Destination).empty
yield q
end
end

Expand Down
6 changes: 4 additions & 2 deletions src/lavinmq/amqp/exchange/direct.cr
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ module LavinMQ
true
end

protected def bindings(routing_key, headers) : Iterator(Destination)
@bindings[routing_key].each
protected def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->)
@bindings[routing_key].each do |destination|
yield destination
end
end
end
end
Expand Down
3 changes: 2 additions & 1 deletion src/lavinmq/amqp/exchange/exchange.cr
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ module LavinMQ
abstract def bind(destination : AMQP::Destination, routing_key : String, headers : AMQP::Table?)
abstract def unbind(destination : AMQP::Destination, routing_key : String, headers : AMQP::Table?)
abstract def bindings_details : Iterator(BindingDetails)
abstract def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->)

def publish(msg : Message, immediate : Bool,
queues : Set(LavinMQ::Queue) = Set(LavinMQ::Queue).new,
Expand Down Expand Up @@ -232,7 +233,7 @@ module LavinMQ
queues : Set(LavinMQ::Queue) = Set(LavinMQ::Queue).new,
exchanges : Set(LavinMQ::Exchange) = Set(LavinMQ::Exchange).new) : Nil
return unless exchanges.add? self
bindings(routing_key, headers).each do |d|
each_destination(routing_key, headers) do |d|
case d
in LavinMQ::Queue
queues.add(d)
Expand Down
6 changes: 4 additions & 2 deletions src/lavinmq/amqp/exchange/fanout.cr
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ module LavinMQ
true
end

protected def bindings(routing_key, headers) : Iterator(Destination)
@bindings.each
protected def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->)
@bindings.each do |destination|
yield destination
end
end
end
end
Expand Down
29 changes: 16 additions & 13 deletions src/lavinmq/amqp/exchange/headers.cr
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ module LavinMQ
true
end

protected def bindings(routing_key, headers) : Iterator(Destination)
matches(headers).each
end

private def validate!(headers) : Nil
if h = headers
if match = h["x-match"]?
Expand All @@ -65,19 +61,26 @@ module LavinMQ
end
end

private def matches(headers) : Iterator(Destination)
@bindings.each.select do |args, _|
protected def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->)
@bindings.each do |args, destinations|
if headers.nil? || headers.empty?
args.empty?
next unless args.empty?
destinations.each do |destination|
yield destination
end
else
case args["x-match"]?
when "any"
args.any? { |k, v| !k.starts_with?("x-") && (headers.has_key?(k) && headers[k] == v) }
else
args.all? { |k, v| k.starts_with?("x-") || (headers.has_key?(k) && headers[k] == v) }
is_match = case args["x-match"]?
when "any"
args.any? { |k, v| !k.starts_with?("x-") && (headers.has_key?(k) && headers[k] == v) }
else
args.all? { |k, v| k.starts_with?("x-") || (headers.has_key?(k) && headers[k] == v) }
end
next unless is_match
destinations.each do |destination|
yield destination
end
end
end.flat_map { |_, v| v.each }
end
end
end
end
Expand Down
26 changes: 14 additions & 12 deletions src/lavinmq/amqp/exchange/topic.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ require "./exchange"
module LavinMQ
module AMQP
class TopicExchange < Exchange
@bindings = Hash(Array(String), Set(LavinMQ::Destination)).new do |h, k|
h[k] = Set(LavinMQ::Destination).new
@bindings = Hash(Array(String), Set(AMQP::Destination)).new do |h, k|
h[k] = Set(AMQP::Destination).new
end

def type : String
Expand Down Expand Up @@ -42,28 +42,26 @@ module LavinMQ
true
end

protected def bindings(routing_key, headers) : Iterator(LavinMQ::Destination)
select_matches(routing_key).each
end

# ameba:disable Metrics/CyclomaticComplexity
private def select_matches(routing_key) : Iterator(LavinMQ::Destination)
protected def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->)
binding_keys = @bindings

return Iterator(LavinMQ::Destination).empty if binding_keys.empty?
return if binding_keys.empty?

# optimize the case where the only binding key is '#'
if binding_keys.size == 1
bk, qs = binding_keys.first
if bk.size == 1
if bk.first == "#"
return qs.each
qs.each do |q|
yield q
end
end
end
end

rk_parts = routing_key.split(".")
binding_keys.each.select do |bks, _|
binding_keys.each do |bks, destinations|
ok = false
prev_hash = false
size = bks.size # binding keys can max be 256 chars long anyway
Expand Down Expand Up @@ -120,8 +118,12 @@ module LavinMQ
break unless ok
i += 1
end
ok
end.flat_map { |_, v| v.each }
if ok
destinations.each do |destination|
yield destination
end
end
end
end
end
end
Expand Down
3 changes: 1 addition & 2 deletions src/lavinmq/mqtt/exchange.cr
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ module LavinMQ
end

# Only here to make superclass happy
protected def bindings(routing_key, headers) : Iterator(LavinMQ::Destination)
Iterator(LavinMQ::Destination).empty
protected def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->)
end

def bind(destination : MQTT::Session, routing_key : String, headers = nil) : Bool
Expand Down

0 comments on commit a0bd78d

Please sign in to comment.