Skip to content

Commit

Permalink
Implement channel_max 0 = unlimited
Browse files Browse the repository at this point in the history
  • Loading branch information
baelter committed Feb 13, 2025
1 parent e6af2c0 commit 40b626d
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions src/lavinmq/amqp/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ module LavinMQ

@connected_at = RoughTime.unix_ms
@channels = Hash(UInt16, Client::Channel).new
@actual_channel_max : UInt16
@exclusive_queues = Array(Queue).new
@heartbeat_interval_ms : Int64?
@local_address : Socket::IPAddress
Expand All @@ -45,7 +46,12 @@ module LavinMQ
@local_address = @connection_info.dst

@max_frame_size = tune_ok.frame_max

# keep 0 = unlimited in ui/api for consistency with the spec
@channel_max = tune_ok.channel_max
# use @actual_channel_max for limit check
@actual_channel_max = @channel_max.zero? ? UInt16::MAX : @channel_max

@heartbeat_timeout = tune_ok.heartbeat
@heartbeat_interval_ms = tune_ok.heartbeat.zero? ? nil : ((tune_ok.heartbeat / 2) * 1000).to_i64
@auth_mechanism = start_ok.mechanism
Expand Down Expand Up @@ -318,9 +324,9 @@ module LavinMQ
private def open_channel(frame)
if @channels.has_key? frame.channel
close_connection(frame, ConnectionReplyCode::CHANNEL_ERROR, "second 'channel.open' seen")
elsif @channels.size >= @channel_max
elsif @channels.size >= @actual_channel_max
reply_text = "number of channels opened (#{@channels.size})" \
" has reached the negotiated channel_max (#{@channel_max})"
" has reached the negotiated channel_max (#{@actual_channel_max})"
close_connection(frame, ConnectionReplyCode::NOT_ALLOWED, reply_text)
else
@channels[frame.channel] = AMQP::Channel.new(self, frame.channel)
Expand Down

0 comments on commit 40b626d

Please sign in to comment.