Skip to content

Commit

Permalink
Refactor AWS integration specs
Browse files Browse the repository at this point in the history
Upgrading our localstack version showed how brittle our integration
tests are, because newer localstack versions react a bit slower (read:
100+ ms to pick up a message instead of 10+ ms). Since we used `sleep`
statements a ton to wait "just the right amount of time" to exceed the
TTL, specs became flaky or consistently failed because the worker was
picking them up later than intended.

Looking deeper into it showed that our approach was flawed from the get
go. While we name the variable in specs `retry_attempt_count` it is
actually a `call_count`. Therefore all `sleeps` were off by one
iteration. In the past this created the following patterns in specs:
* 0.0s Boot worker
* 0.1s `sleep 1` to let worker pick up message
* 0.2s Worker processes message
* 1.1s Check message has been processed
* 1.1s `sleep 1` to wait for retry
* 1.2s Worker processes retry

Sometimes there was just 10 ms between assertion and worker processing
the message. Since the worker is on a thread, this could have failed
with older localstack versions as well. With the newer and slower
localstack version most specs looked like this:
* 0.0s Boot worker
* 0.1s `sleep 1` to let worker pick up message
* 0.8s Worker processes message
* 1.1s Check message has been processed
* 1.1s `sleep 1` to wait for retry
* 2.0s Worker processes retry
* 2.1s `sleep 1` to wait for 2nd retry
* 3.1s Check for 2nd retry fails as worker has not picked up 2nd retry yet

The flaw here is that `visibility_timeout` does not tell when the
message is picked up again, but for how long it is NOT PICKED UP again.
So while we need to _at least_ wait for the `visibility_timeout` in
specs, we cannot know when the worker actually comes around to pick the
message up again. If we were using bigger numbers, like minutes for the
`visibility_timeout` rather than seconds, the 0.1 - 1.9 seconds
sometimes needed to pick up a message would not matter, but also would
slow down specs to a halt.

The second flaw is that we are basically testing libraries here. We
should trust AWS to ensure messages are not picked up for the
`visibility_timeout` we set. Instead we should only test if the right
`visibility_timeout` numbers are calculated for the libraries to use.

As a result we have adjusted the specs to use a `visibility_timeout` of
zero whenever possible to speed up specs. However, we note down the
calculated number to ensure the right one is normally passed on to AWS.
This allows us to use the exponential backoff strategy, without waiting
expontentially long.

In order to remove the need for `sleep` in specs, as the operation to
test is run asynchroneously, we introduced a thread based queue to wait
for results on. This way, as soon as a result is pushed to the queue in
another thread, the thread waiting for results immediately picks it up
and continues running the spec and assertions. By adding a decorator
around a method that is invoked after each time a message was processed,
we can simply say `wait_for_message_processed` to ensure the spec wait
just long enough to continue.

To test `visibility_timeout` we can now use
`aws_visibility_timeout_queue`, which gets pushed the
`visibility_timeout` calculated for each call, e.g.
```
expect(aws_visibility_timeout_queue.pop).to eq(call: 5, visibility_timeout: 5)
```

Using these techniques we sped up specs considerably, made them much
more reliable and still kept the integration part as real AWS libraries
are used. Also we can still integrate with a real AWS account instead of
using `localstack`.
  • Loading branch information
florianpilz committed Jan 10, 2024
1 parent 4bd06ae commit 02087b3
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 171 deletions.
108 changes: 27 additions & 81 deletions spec/eventq_aws/integration/aws_queue_worker_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
require 'spec_helper'

RSpec.describe EventQ::Amazon::QueueWorker, integration: true do
include_context 'mock_aws_visibility_timeout'
include_context 'aws_wait_for_message_processed_helper'

let(:queue_worker) { EventQ::QueueWorker.new }

let(:queue_client) do
Expand Down Expand Up @@ -116,9 +119,6 @@
received = false
context = nil

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, thread_count: 1, block_process: false, client: queue_client, wait: false) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
Expand All @@ -127,12 +127,12 @@
EventQ.logger.debug { "Message Received: #{event}" }
end

sleep(2)
wait_for_message_processed

queue_worker.stop
expect(received).to eq(true)
expect(context).to eq message_context

queue_worker.stop
expect(queue_worker.is_running).to eq(false)
end

Expand All @@ -143,9 +143,6 @@
received = false
context = nil

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(
subscriber_queue,
worker_adapter: subject,
Expand All @@ -161,12 +158,12 @@
EventQ.logger.debug { "Message Received: #{event}" }
end

sleep(2)
wait_for_message_processed

queue_worker.stop
expect(received).to eq(true)
expect(context).to eq message_context

queue_worker.stop
expect(queue_worker.is_running).to eq(false)
end

Expand All @@ -185,22 +182,18 @@

received = false

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
received = true
EventQ.logger.debug { "Message Received: #{event}" }
end

sleep(2)

queue_worker.stop
wait_for_message_processed

expect(received).to eq(true)

queue_worker.stop
expect(queue_worker.is_running).to eq(false)
end
end
Expand All @@ -216,22 +209,18 @@

received = false

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
received = true
EventQ.logger.debug { "Message Received: #{event}" }
end

sleep(2)

queue_worker.stop
wait_for_message_processed

expect(received).to eq(true)

queue_worker.stop
expect(queue_worker.is_running).to eq(false)
end
end
Expand All @@ -248,9 +237,6 @@
received_count = 0
received_attribute = 0

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
Expand All @@ -261,13 +247,13 @@
args.abort = true if received_count != 2
end

sleep(4)

queue_worker.stop
2.times { wait_for_message_processed }

expect(received).to eq(true)
expect(received_count).to eq(2)
expect(received_attribute).to eq(1)

queue_worker.stop
expect(queue_worker.is_running).to eq(false)
end

Expand Down Expand Up @@ -296,7 +282,7 @@
end
end

sleep(5)
10.times { wait_for_message_processed }

expect(message_count).to eq(10)
expect(received_messages.length).to eq(5)
Expand All @@ -307,7 +293,6 @@
expect(received_messages[4][:events]).to be >= 1

queue_worker.stop

expect(queue_worker.is_running).to eq(false)
end

Expand All @@ -333,71 +318,40 @@
end

it 'should receive an event from the subscriber queue and retry it' do
retry_attempt_count = 0

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
retry_attempt_count = args.retry_attempts + 1
raise 'Fail on purpose to send event to retry queue.'
end

sleep(1)

expect(retry_attempt_count).to eq(1)

sleep(2)

expect(retry_attempt_count).to eq(2)

sleep(3)

expect(retry_attempt_count).to eq(3)

sleep(4)

expect(retry_attempt_count).to eq(4)
expect(aws_visibility_timeout_queue.pop).to eq(call: 1, visibility_timeout: 1)
expect(aws_visibility_timeout_queue.pop).to eq(call: 2, visibility_timeout: 2)
expect(aws_visibility_timeout_queue.pop).to eq(call: 3, visibility_timeout: 3)
expect(aws_visibility_timeout_queue.pop).to eq(call: 4, visibility_timeout: 4)
expect(aws_visibility_timeout_queue.pop).to eq(call: 5, visibility_timeout: 5)

queue_worker.stop

expect(queue_worker.is_running).to eq(false)
end

context 'queue.allow_exponential_back_off = true' do
let(:max_retry_delay) { 10_000 }
let(:max_retry_delay) { 20_000 }
let(:allow_exponential_back_off) { true }

it 'retries received event with an exponential waiting period' do
retry_attempt_count = 0

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
retry_attempt_count = args.retry_attempts + 1
raise 'Fail on purpose to send event to retry queue.'
end

sleep(1)

expect(retry_attempt_count).to eq(1)

sleep(2)

expect(retry_attempt_count).to eq(2)

sleep(4)

expect(retry_attempt_count).to eq(3)

sleep(8)

expect(retry_attempt_count).to eq(4)
expect(aws_visibility_timeout_queue.pop).to eq(call: 1, visibility_timeout: 1)
expect(aws_visibility_timeout_queue.pop).to eq(call: 2, visibility_timeout: 2)
expect(aws_visibility_timeout_queue.pop).to eq(call: 3, visibility_timeout: 4)
expect(aws_visibility_timeout_queue.pop).to eq(call: 4, visibility_timeout: 8)
expect(aws_visibility_timeout_queue.pop).to eq(call: 5, visibility_timeout: 16)

queue_worker.stop

Expand All @@ -416,21 +370,13 @@
end

it 'retries after half the retry delay has passed' do
retry_attempt_count = 0

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 0.5, thread_count: 1, client: queue_client) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
retry_attempt_count = args.retry_attempts + 1
raise 'Fail on purpose to send event to retry queue.'
end

sleep(3)

expect(retry_attempt_count).to eq(2)
expect(aws_visibility_timeout_queue.pop).to eq(call: 1, visibility_timeout: 2)

queue_worker.stop

Expand Down
Loading

0 comments on commit 02087b3

Please sign in to comment.