From 5a37697f53ca56e30a7d6e4a95a7fd7bfc7ce32e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Tue, 28 Jan 2025 10:50:28 +0100 Subject: [PATCH] Queue should expire TTL milliseconds after last consumer left (#924) This fixes a bug where if a queue was declared with x-expires X and a consumer disconnected after X+1 the queue was deleted immediately. --- spec/policies_spec.cr | 13 ++++++------- spec/queue_spec.cr | 17 +++++++++++++++++ src/lavinmq/amqp/queue/queue.cr | 27 +++++---------------------- 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/spec/policies_spec.cr b/spec/policies_spec.cr index 1c8e80be35..d526373e78 100644 --- a/spec/policies_spec.cr +++ b/spec/policies_spec.cr @@ -124,18 +124,17 @@ describe LavinMQ::VHost do end end - it "should refresh queue last_get_time when expire policy applied" do + it "should update queue expiration" do with_amqp_server do |s| - defs = {"expires" => JSON::Any.new(50_i64)} of String => JSON::Any with_channel(s) do |ch| - ch.queue("qttl") + ch.queue("qttl", args: AMQP::Client::Arguments.new({"x-expires" => 100})) queue = s.vhosts["/"].queues["qttl"] - first = queue.last_get_time sleep 0.1.seconds - s.vhosts["/"].add_policy("qttl", "^.*$", "all", defs, 12_i8) + s.vhosts["/"].add_policy("qttl", "^.*$", "all", {"expires" => JSON::Any.new(200)}, 2_i8) sleep 0.1.seconds - last = queue.last_get_time - last.should be > first + queue.closed?.should be_false + sleep 0.2.seconds + queue.closed?.should be_true end end end diff --git a/spec/queue_spec.cr b/spec/queue_spec.cr index 96fdea40d0..0f2487f2a7 100644 --- a/spec/queue_spec.cr +++ b/spec/queue_spec.cr @@ -2,6 +2,23 @@ require "./spec_helper" require "./../src/lavinmq/amqp/queue" describe LavinMQ::AMQP::Queue do + it "should expire it self after last consumer disconnects" do + with_amqp_server do |s| + with_channel(s) do |ch| + q = ch.queue("qexpires", args: AMQP::Client::Arguments.new({"x-expires" => 100})) + queue = s.vhosts["/"].queues["qexpires"] + tag = q.subscribe { } + sleep 100.milliseconds + queue.closed?.should be_false + ch.basic_cancel(tag) + sleep 100.milliseconds + queue.closed?.should be_false + sleep 100.milliseconds + queue.closed?.should be_true + end + end + end + it "Should dead letter expired messages" do with_amqp_server do |s| with_channel(s) do |ch| diff --git a/src/lavinmq/amqp/queue/queue.cr b/src/lavinmq/amqp/queue/queue.cr index 2f826d0e2e..0a989de779 100644 --- a/src/lavinmq/amqp/queue/queue.cr +++ b/src/lavinmq/amqp/queue/queue.cr @@ -50,16 +50,17 @@ module LavinMQ::AMQP getter consumer_timeout : UInt64? = Config.instance.consumer_timeout @consumers_empty_change = ::Channel(Bool).new + @queue_expiration_ttl_change = ::Channel(Nil).new private def queue_expire_loop loop do - break unless @expires - if @consumers.empty? && (ttl = queue_expiration_ttl) + break unless ttl = @expires + if @consumers.empty? @log.debug { "Queue expires in #{ttl}" } select when @queue_expiration_ttl_change.receive when @consumers_empty_change.receive - when timeout ttl + when timeout ttl.milliseconds expire_queue close break @@ -115,7 +116,7 @@ module LavinMQ::AMQP {"ack", "deliver", "deliver_get", "confirm", "get", "get_no_ack", "publish", "redeliver", "reject", "return_unroutable", "dedup"}, {"message_count", "unacked_count"}) - getter name, arguments, vhost, consumers, last_get_time + getter name, arguments, vhost, consumers getter? auto_delete, exclusive getter policy : Policy? getter operator_policy : OperatorPolicy? @@ -133,7 +134,6 @@ module LavinMQ::AMQP def initialize(@vhost : VHost, @name : String, @exclusive = false, @auto_delete = false, @arguments = AMQP::Table.new) - @last_get_time = RoughTime.monotonic @data_dir = make_data_dir @metadata = ::Log::Metadata.new(nil, {queue: @name, vhost: @vhost.name}) @log = Logger.new(Log, @metadata) @@ -186,7 +186,6 @@ module LavinMQ::AMQP end def redeclare - @last_get_time = RoughTime.monotonic @queue_expiration_ttl_change.try_send? nil end @@ -217,7 +216,6 @@ module LavinMQ::AMQP when "expires" unless @expires.try &.< v.as_i64 @expires = v.as_i64 - @last_get_time = RoughTime.monotonic spawn queue_expire_loop, name: "Queue#queue_expire_loop #{@vhost.name}/#{@name}" @queue_expiration_ttl_change.try_send? nil end @@ -339,19 +337,6 @@ module LavinMQ::AMQP File.delete(File.join(@data_dir, ".paused")) end - @queue_expiration_ttl_change = ::Channel(Nil).new - - private def queue_expiration_ttl : Time::Span? - if e = @expires - expires_in = @last_get_time + e.milliseconds - RoughTime.monotonic - if expires_in > Time::Span.zero - expires_in - else - Time::Span.zero - end - end - end - def close : Bool return false if @closed @closed = true @@ -688,7 +673,6 @@ module LavinMQ::AMQP def basic_get(no_ack, force = false, & : Envelope -> Nil) : Bool return false if !@state.running? && (@state.paused? && !force) - @last_get_time = RoughTime.monotonic @queue_expiration_ttl_change.try_send? nil @get_count += 1 @deliver_get_count += 1 @@ -833,7 +817,6 @@ module LavinMQ::AMQP def add_consumer(consumer : Client::Channel::Consumer) return if @closed - @last_get_time = RoughTime.monotonic @consumers_lock.synchronize do was_empty = @consumers.empty? @consumers << consumer