From 263feda2429f266818e9d0a97679218575029089 Mon Sep 17 00:00:00 2001 From: "Ben Sheldon [he/him]" Date: Sun, 28 May 2023 22:20:34 -0700 Subject: [PATCH] Allow TimerTask to be safely restarted after shutdown and avoid duplicate tasks --- lib/concurrent-ruby/concurrent/timer_task.rb | 8 ++++++-- spec/concurrent/timer_task_spec.rb | 14 ++++++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/lib/concurrent-ruby/concurrent/timer_task.rb b/lib/concurrent-ruby/concurrent/timer_task.rb index dd2037f62..a5a8f6690 100644 --- a/lib/concurrent-ruby/concurrent/timer_task.rb +++ b/lib/concurrent-ruby/concurrent/timer_task.rb @@ -236,6 +236,7 @@ def execute synchronize do if @running.false? @running.make_true + @age.increment schedule_next_task(@run_now ? 0 : @execution_interval) end end @@ -309,6 +310,7 @@ def ns_initialize(opts, &task) @task = Concurrent::SafeTaskExecutor.new(task) @executor = opts[:executor] || Concurrent.global_io_executor @running = Concurrent::AtomicBoolean.new(false) + @age = Concurrent::AtomicFixnum.new(0) @value = nil self.observers = Collection::CopyOnNotifyObserverSet.new @@ -328,13 +330,15 @@ def ns_kill_execution # @!visibility private def schedule_next_task(interval = execution_interval) - ScheduledTask.execute(interval, executor: @executor, args: [Concurrent::Event.new], &method(:execute_task)) + ScheduledTask.execute(interval, executor: @executor, args: [Concurrent::Event.new, @age.value], &method(:execute_task)) nil end # @!visibility private - def execute_task(completion) + def execute_task(completion, age_when_scheduled) return nil unless @running.true? + return nil unless @age.value == age_when_scheduled + start_time = Concurrent.monotonic_time _success, value, reason = @task.execute(self) if completion.try? diff --git a/spec/concurrent/timer_task_spec.rb b/spec/concurrent/timer_task_spec.rb index 10fbf34db..dbb5d43d6 100644 --- a/spec/concurrent/timer_task_spec.rb +++ b/spec/concurrent/timer_task_spec.rb @@ -1,5 +1,6 @@ require_relative 'concern/dereferenceable_shared' require_relative 'concern/observable_shared' +require 'concurrent/atomic/atomic_fixnum' require 'concurrent/timer_task' module Concurrent @@ -116,6 +117,19 @@ def trigger_observable(observable) sleep(0.1) expect(task.shutdown).to be_truthy end + + it 'will cancel pre-shutdown task even if restarted to avoid double-runs' do + counter = Concurrent::AtomicFixnum.new(0) + task = TimerTask.execute(execution_interval: 0.2, run_now: true) { counter.increment } + sleep 0.05 + expect(counter.value).to eq 1 + + task.shutdown + task.execute + + sleep 0.25 + expect(counter.value).to eq 3 + end end end