diff --git a/src/lavinmqperf.cr b/src/lavinmqperf.cr index 79b7de42bc..758fac3fd9 100644 --- a/src/lavinmqperf.cr +++ b/src/lavinmqperf.cr @@ -28,6 +28,23 @@ class Perf def run(args = ARGV) @parser.parse(args) end + + protected def rss + if File.exists?("/proc/self/statm") + File.read("/proc/self/statm").split[1].to_i64 * 4096 + elsif ps_rss = `ps -o rss= -p $PPID`.to_i64? + ps_rss * 1024 + else + 0 + end + end + + protected def maximize_fd_limit + _, fd_limit_max = System.file_descriptor_limit + System.file_descriptor_limit = fd_limit_max + fd_limit_current, _ = System.file_descriptor_limit + puts "FD limit: #{fd_limit_current}" + end end class Throughput < Perf @@ -492,22 +509,117 @@ class ConnectionCount < Perf client.host = "127.0.#{Random.rand(UInt8)}.#{Random.rand(UInt8)}" if @random_localhost client end +end - private def rss - if File.exists?("/proc/self/statm") - File.read("/proc/self/statm").split[1].to_i64 * 4096 - elsif ps_rss = `ps -o rss= -p $PPID`.to_i64? - ps_rss * 1024 - else - 0 +class Chaos < Perf + def initialize + super + end + + def run + super + spawn churn_connections_and_pubs + spawn churn_connections_and_acks + sleep + # churn_queues + # churn_exchanges + # churn_vhosts + # churn_bindings + # random_ack_rejects + end + + @i = Atomic(Int64).new(0u64) + + private def churn_connections_and_pubs + done = Channel(Nil).new + loop do + 100.times do + spawn do + AMQP::Client.start(@uri) do |conn| + ch = conn.channel + ch.confirm_select if rand(3).zero? + rand(10_000).times do + sleep rand(0.1) + body = uninitialized UInt8[8] + IO::ByteFormat::SystemEndian.encode(@i.add(1), body.to_slice) + ch.basic_publish(body.to_slice, rand_exchange, rand_routing_key) + end + done.send nil + end + end + end + 100.times do + done.receive + end end end - private def maximize_fd_limit - _, fd_limit_max = System.file_descriptor_limit - System.file_descriptor_limit = fd_limit_max - fd_limit_current, _ = System.file_descriptor_limit - puts "FD limit: #{fd_limit_current}" + private def rand_routing_key : String + Random::DEFAULT.hex(1) + end + + private def rand_queue : String + Random::DEFAULT.hex(1) + end + + private def rand_exchange : String + case rand(5) + when 1 then "amq.direct" + when 2 then "amq.topic" + when 3 then "amq.fanout" + when 4 then "amq.headers" + else "" + end + end + + private def churn_connections_and_acks + done = Channel(Nil).new + loop do + 100.times do + spawn do + AMQP::Client.start(@uri) do |conn| + ch = conn.channel + q = begin + ch.queue(rand_queue, args: AMQP::Client::Arguments.new({"x-expires": rand(1000), + "x-message-ttl": rand(1000), + "x-max-length": rand(1000), + "x-dead-letter-exchange": rand_exchange, + "x-max-priority": rand(255)})) + rescue + ch = conn.channel + ch.queue(rand_queue, passive: true) + end + x = rand_exchange + binding_key = rand_routing_key + q.bind(x, binding_key) unless x.empty? + prefetch = rand(100) + 1 + ch.prefetch(prefetch) + last = 0i64 + q.subscribe(no_ack: false, work_pool: rand(prefetch) + 1) do |m| + ns = IO::ByteFormat::SystemEndian.decode(Int64, m.body_io) + # puts "messages out of order #{ns} < #{last}" if ns < last + last = ns + sleep rand(0.1) + case rand(3) + when 1 then m.reject(requeue: true) + when 2 then m.reject(requeue: false) + else m.ack + end + end + sleep rand(30.0) + case rand(3) + when 0 then q.delete + when 1 then q.unbind(x, binding_key) unless x.empty? + end + end + ensure + done.send nil + end + end + 100.times do + done.receive + end + end end end @@ -524,6 +636,7 @@ when "connection-churn" then ConnectionChurn.new.run when "channel-churn" then ChannelChurn.new.run when "consumer-churn" then ConsumerChurn.new.run when "connection-count" then ConnectionCount.new.run +when "chaos" then Chaos.new.run when /^.+$/ then Perf.new.run([mode.not_nil!]) else abort Perf.new.banner end