Skip to content

Commit

Permalink
WIP remove sleep from specs
Browse files Browse the repository at this point in the history
  • Loading branch information
florianpilz committed Jan 9, 2024
1 parent 1446038 commit 1c385a1
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 171 deletions.
108 changes: 27 additions & 81 deletions spec/eventq_aws/integration/aws_queue_worker_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
require 'spec_helper'

RSpec.describe EventQ::Amazon::QueueWorker, integration: true do
include_context 'mock_aws_visibility_timeout'
include_context 'aws_wait_for_message_processed_helper'

let(:queue_worker) { EventQ::QueueWorker.new }

let(:queue_client) do
Expand Down Expand Up @@ -116,9 +119,6 @@
received = false
context = nil

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, thread_count: 1, block_process: false, client: queue_client, wait: false) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
Expand All @@ -127,12 +127,12 @@
EventQ.logger.debug { "Message Received: #{event}" }
end

sleep(2)
wait_for_message_processed

queue_worker.stop
expect(received).to eq(true)
expect(context).to eq message_context

queue_worker.stop
expect(queue_worker.is_running).to eq(false)
end

Expand All @@ -143,9 +143,6 @@
received = false
context = nil

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(
subscriber_queue,
worker_adapter: subject,
Expand All @@ -161,12 +158,12 @@
EventQ.logger.debug { "Message Received: #{event}" }
end

sleep(2)
wait_for_message_processed

queue_worker.stop
expect(received).to eq(true)
expect(context).to eq message_context

queue_worker.stop
expect(queue_worker.is_running).to eq(false)
end

Expand All @@ -185,22 +182,18 @@

received = false

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
received = true
EventQ.logger.debug { "Message Received: #{event}" }
end

sleep(2)

queue_worker.stop
wait_for_message_processed

expect(received).to eq(true)

queue_worker.stop
expect(queue_worker.is_running).to eq(false)
end
end
Expand All @@ -216,22 +209,18 @@

received = false

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
received = true
EventQ.logger.debug { "Message Received: #{event}" }
end

sleep(2)

queue_worker.stop
wait_for_message_processed

expect(received).to eq(true)

queue_worker.stop
expect(queue_worker.is_running).to eq(false)
end
end
Expand All @@ -248,9 +237,6 @@
received_count = 0
received_attribute = 0

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
Expand All @@ -261,13 +247,13 @@
args.abort = true if received_count != 2
end

sleep(4)

queue_worker.stop
2.times { wait_for_message_processed }

expect(received).to eq(true)
expect(received_count).to eq(2)
expect(received_attribute).to eq(1)

queue_worker.stop
expect(queue_worker.is_running).to eq(false)
end

Expand Down Expand Up @@ -296,7 +282,7 @@
end
end

sleep(5)
10.times { wait_for_message_processed }

expect(message_count).to eq(10)
expect(received_messages.length).to eq(5)
Expand All @@ -307,7 +293,6 @@
expect(received_messages[4][:events]).to be >= 1

queue_worker.stop

expect(queue_worker.is_running).to eq(false)
end

Expand All @@ -333,71 +318,40 @@
end

it 'should receive an event from the subscriber queue and retry it' do
retry_attempt_count = 0

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
retry_attempt_count = args.retry_attempts + 1
raise 'Fail on purpose to send event to retry queue.'
end

sleep(1)

expect(retry_attempt_count).to eq(1)

sleep(2)

expect(retry_attempt_count).to eq(2)

sleep(3)

expect(retry_attempt_count).to eq(3)

sleep(4)

expect(retry_attempt_count).to eq(4)
expect(aws_visibility_timeout_queue.pop).to eq(call: 1, visibility_timeout: 1)
expect(aws_visibility_timeout_queue.pop).to eq(call: 2, visibility_timeout: 2)
expect(aws_visibility_timeout_queue.pop).to eq(call: 3, visibility_timeout: 3)
expect(aws_visibility_timeout_queue.pop).to eq(call: 4, visibility_timeout: 4)
expect(aws_visibility_timeout_queue.pop).to eq(call: 5, visibility_timeout: 5)

queue_worker.stop

expect(queue_worker.is_running).to eq(false)
end

context 'queue.allow_exponential_back_off = true' do
let(:max_retry_delay) { 10_000 }
let(:max_retry_delay) { 20_000 }
let(:allow_exponential_back_off) { true }

it 'retries received event with an exponential waiting period' do
retry_attempt_count = 0

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
retry_attempt_count = args.retry_attempts + 1
raise 'Fail on purpose to send event to retry queue.'
end

sleep(1)

expect(retry_attempt_count).to eq(1)

sleep(2)

expect(retry_attempt_count).to eq(2)

sleep(4)

expect(retry_attempt_count).to eq(3)

sleep(8)

expect(retry_attempt_count).to eq(4)
expect(aws_visibility_timeout_queue.pop).to eq(call: 1, visibility_timeout: 1)
expect(aws_visibility_timeout_queue.pop).to eq(call: 2, visibility_timeout: 2)
expect(aws_visibility_timeout_queue.pop).to eq(call: 3, visibility_timeout: 4)
expect(aws_visibility_timeout_queue.pop).to eq(call: 4, visibility_timeout: 8)
expect(aws_visibility_timeout_queue.pop).to eq(call: 5, visibility_timeout: 16)

queue_worker.stop

Expand All @@ -416,21 +370,13 @@
end

it 'retries after half the retry delay has passed' do
retry_attempt_count = 0

# wait 1 second to allow the message to be sent and broadcast to the queue
sleep(1)

queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 0.5, thread_count: 1, client: queue_client) do |event, args|
expect(event).to eq(message)
expect(args).to be_a(EventQ::MessageArgs)
retry_attempt_count = args.retry_attempts + 1
raise 'Fail on purpose to send event to retry queue.'
end

sleep(3)

expect(retry_attempt_count).to eq(2)
expect(aws_visibility_timeout_queue.pop).to eq(call: 1, visibility_timeout: 2)

queue_worker.stop

Expand Down
Loading

0 comments on commit 1c385a1

Please sign in to comment.