Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chaos mode in lavinmqperf #443

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 125 additions & 12 deletions src/lavinmqperf.cr
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,23 @@ class Perf
def run(args = ARGV)
@parser.parse(args)
end

protected def rss
if File.exists?("/proc/self/statm")
File.read("/proc/self/statm").split[1].to_i64 * 4096
elsif ps_rss = `ps -o rss= -p $PPID`.to_i64?
ps_rss * 1024
else
0
end
end

protected def maximize_fd_limit
_, fd_limit_max = System.file_descriptor_limit
System.file_descriptor_limit = fd_limit_max
fd_limit_current, _ = System.file_descriptor_limit
puts "FD limit: #{fd_limit_current}"
end
end

class Throughput < Perf
Expand Down Expand Up @@ -492,22 +509,117 @@ class ConnectionCount < Perf
client.host = "127.0.#{Random.rand(UInt8)}.#{Random.rand(UInt8)}" if @random_localhost
client
end
end

private def rss
if File.exists?("/proc/self/statm")
File.read("/proc/self/statm").split[1].to_i64 * 4096
elsif ps_rss = `ps -o rss= -p $PPID`.to_i64?
ps_rss * 1024
else
0
class Chaos < Perf
def initialize
super
end

def run
super
spawn churn_connections_and_pubs
spawn churn_connections_and_acks
sleep
# churn_queues
# churn_exchanges
# churn_vhosts
# churn_bindings
# random_ack_rejects
end

@i = Atomic(Int64).new(0u64)

private def churn_connections_and_pubs
done = Channel(Nil).new
loop do
100.times do
spawn do
AMQP::Client.start(@uri) do |conn|
ch = conn.channel
ch.confirm_select if rand(3).zero?
rand(10_000).times do
sleep rand(0.1)
body = uninitialized UInt8[8]
IO::ByteFormat::SystemEndian.encode(@i.add(1), body.to_slice)
ch.basic_publish(body.to_slice, rand_exchange, rand_routing_key)
end
done.send nil
end
end
end
100.times do
done.receive
end
end
end

private def maximize_fd_limit
_, fd_limit_max = System.file_descriptor_limit
System.file_descriptor_limit = fd_limit_max
fd_limit_current, _ = System.file_descriptor_limit
puts "FD limit: #{fd_limit_current}"
private def rand_routing_key : String
Random::DEFAULT.hex(1)
end

private def rand_queue : String
Random::DEFAULT.hex(1)
end

private def rand_exchange : String
case rand(5)
when 1 then "amq.direct"
when 2 then "amq.topic"
when 3 then "amq.fanout"
when 4 then "amq.headers"
else ""
end
end

private def churn_connections_and_acks
done = Channel(Nil).new
loop do
100.times do
spawn do
AMQP::Client.start(@uri) do |conn|
ch = conn.channel
q = begin
ch.queue(rand_queue, args: AMQP::Client::Arguments.new({"x-expires": rand(1000),
"x-message-ttl": rand(1000),
"x-max-length": rand(1000),
"x-dead-letter-exchange": rand_exchange,
"x-max-priority": rand(255)}))
rescue
ch = conn.channel
ch.queue(rand_queue, passive: true)
end
x = rand_exchange
binding_key = rand_routing_key
q.bind(x, binding_key) unless x.empty?
prefetch = rand(100) + 1
ch.prefetch(prefetch)
last = 0i64
q.subscribe(no_ack: false, work_pool: rand(prefetch) + 1) do |m|
ns = IO::ByteFormat::SystemEndian.decode(Int64, m.body_io)
# puts "messages out of order #{ns} < #{last}" if ns < last
last = ns
sleep rand(0.1)
case rand(3)
when 1 then m.reject(requeue: true)
when 2 then m.reject(requeue: false)
else m.ack
end
end
sleep rand(30.0)
case rand(3)
when 0 then q.delete
when 1 then q.unbind(x, binding_key) unless x.empty?
end
end
ensure
done.send nil
end
end
100.times do
done.receive
end
end
end
end

Expand All @@ -524,6 +636,7 @@ when "connection-churn" then ConnectionChurn.new.run
when "channel-churn" then ChannelChurn.new.run
when "consumer-churn" then ConsumerChurn.new.run
when "connection-count" then ConnectionCount.new.run
when "chaos" then Chaos.new.run
when /^.+$/ then Perf.new.run([mode.not_nil!])
else abort Perf.new.banner
end