Skip to content

Commit

Permalink
Log worker name in steno logger (#4170)
Browse files Browse the repository at this point in the history
  • Loading branch information
johha authored Jan 21, 2025
1 parent cd7e670 commit 4d63ed2
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 2 deletions.
1 change: 1 addition & 0 deletions lib/delayed_job/delayed_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def get_initialized_delayed_worker(config, logger)

worker = Delayed::Worker.new(@queue_options)
worker.name = @queue_options[:worker_name]
Steno.config.context.data[:worker_name] = worker.name
worker
end

Expand Down
1 change: 1 addition & 0 deletions lib/delayed_job/threaded_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def start
@num_threads.times do |thread_index|
thread = Thread.new do
Thread.current[:thread_index] = thread_index
Steno.config.context.data[:worker_name] = name # override logged worker name with thread specific name
threaded_start
rescue Exception => e # rubocop:disable Lint/RescueException
say "Unexpected error: #{e.message}\n#{e.backtrace.join("\n")}", 'error'
Expand Down
12 changes: 10 additions & 2 deletions spec/unit/lib/delayed_job/delayed_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
describe '#start_working' do
let(:cc_delayed_worker) { CloudController::DelayedWorker.new(options) }

before { allow(delayed_worker).to receive(:name).and_return(options[:name]) }

it 'sets up the environment and starts the worker' do
expect(environment).to receive(:setup_environment).with(nil)
expect(Delayed::Worker).to receive(:new).with(anything).and_return(delayed_worker)
Expand All @@ -86,10 +88,16 @@
expect(Delayed::Worker.sleep_delay).to eq(5)
end

it 'sets the worker name in the Steno context' do
cc_delayed_worker.start_working
expect(Steno.config.context.data[:worker_name]).to eq(options[:name])
end

context 'when the number of threads is specified' do
before do
allow(Delayed).to receive(:remove_const).with(:Worker)
allow(Delayed).to receive(:const_set).with(:Worker, Delayed::ThreadedWorker)
allow(threaded_worker).to receive(:name)
options[:num_threads] = 7
end

Expand Down Expand Up @@ -122,7 +130,7 @@
expect(environment).to receive(:setup_environment).with(nil)
expect(Delayed::Worker).to receive(:new).with(anything).and_return(delayed_worker).once
expect(delayed_worker).to receive(:name=).with(options[:name]).once
expect(delayed_worker).to receive(:name).and_return(options[:name]).once
expect(delayed_worker).to receive(:name).and_return(options[:name]).twice
expect(Delayed::Job).to receive(:clear_locks!).with(options[:name]).once

cc_delayed_worker.clear_locks!
Expand All @@ -134,7 +142,7 @@
expect(environment).to receive(:setup_environment).with(nil)
expect(Delayed::Worker).to receive(:new).with(anything).and_return(threaded_worker).once
expect(threaded_worker).to receive(:name=).with(options[:name]).once
expect(threaded_worker).to receive(:name).and_return(options[:name]).once
expect(threaded_worker).to receive(:name).and_return(options[:name]).twice
expect(threaded_worker).to receive(:names_with_threads).and_return(["#{options[:name]} thread:1", "#{options[:name]} thread:2"]).once
expect(Delayed::Job).to receive(:clear_locks!).with(options[:name]).once
expect(Delayed::Job).to receive(:clear_locks!).with("#{options[:name]} thread:1").once
Expand Down
10 changes: 10 additions & 0 deletions spec/unit/lib/delayed_job/threaded_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@
expect { worker.start }.to raise_error('Unexpected error occurred in one of the worker threads')
expect(worker.instance_variable_get(:@unexpected_error)).to be true
end

it 'sets the worker name in the Steno context' do
steno_data_spy = spy('data')
allow(Steno.config.context).to receive(:data).and_return(steno_data_spy)

worker.start
worker.instance_variable_get(:@threads).each_with_index do |_, index|
expect(steno_data_spy).to have_received(:[]=).with(:worker_name, "#{worker_name} thread:#{index + 1}")
end
end
end

describe '#names_with_threads' do
Expand Down

0 comments on commit 4d63ed2

Please sign in to comment.