diff --git a/libs/core/coroutines/include/hpx/coroutines/detail/combined_tagged_state.hpp b/libs/core/coroutines/include/hpx/coroutines/detail/combined_tagged_state.hpp
index 35361b622dca..3cf6b90e3261 100644
--- a/libs/core/coroutines/include/hpx/coroutines/detail/combined_tagged_state.hpp
+++ b/libs/core/coroutines/include/hpx/coroutines/detail/combined_tagged_state.hpp
@@ -1,4 +1,4 @@
-// Copyright (c) 2007-2016 Hartmut Kaiser
+// Copyright (c) 2007-2022 Hartmut Kaiser
 //
 // SPDX-License-Identifier: BSL-1.0
 // Distributed under the Boost Software License, Version 1.0. (See accompanying
@@ -13,44 +13,48 @@
 #include <cstdint>
 
 ///////////////////////////////////////////////////////////////////////////////
-namespace hpx { namespace threads { namespace detail {
+namespace hpx::threads::detail {
+
     ///////////////////////////////////////////////////////////////////////////
     template <typename T1, typename T2>
     class combined_tagged_state
     {
     private:
-        typedef std::int64_t tagged_state_type;
+        using tagged_state_type = std::int64_t;
 
-        typedef std::int8_t thread_state_type;
-        typedef std::int8_t thread_state_ex_type;
-        typedef std::int64_t tag_type;
+        using thread_state_type = std::int8_t;
+        using thread_state_ex_type = std::int8_t;
+        using tag_type = std::int64_t;
 
-        static const std::size_t state_shift = 56;       // 8th byte
-        static const std::size_t state_ex_shift = 48;    // 7th byte
+        static constexpr std::size_t state_shift = 56;       // 8th byte
+        static constexpr std::size_t state_ex_shift = 48;    // 7th byte
 
-        static const tagged_state_type state_mask = 0xffull;
-        static const tagged_state_type state_ex_mask = 0xffull;
+        static constexpr tagged_state_type state_mask = 0xffull;
+        static constexpr tagged_state_type state_ex_mask = 0xffull;
 
         // (1L << 48L) - 1;
-        static const tagged_state_type tag_mask = 0x0000ffffffffffffull;
+        static constexpr tagged_state_type tag_mask = 0x0000ffffffffffffull;
 
-        static tag_type extract_tag(tagged_state_type const& i)
+        static constexpr tag_type extract_tag(
+            tagged_state_type const& i) noexcept
         {
             return i & tag_mask;
         }
 
-        static thread_state_type extract_state(tagged_state_type const& i)
+        static constexpr thread_state_type extract_state(
+            tagged_state_type const& i) noexcept
         {
             return (i >> state_shift) & state_mask;
         }
 
-        static thread_state_ex_type extract_state_ex(tagged_state_type const& i)
+        static constexpr thread_state_ex_type extract_state_ex(
+            tagged_state_type const& i) noexcept
         {
             return (i >> state_ex_shift) & state_ex_mask;
         }
 
         static tagged_state_type pack_state(
-            T1 state_, T2 state_ex_, tag_type tag)
+            T1 state_, T2 state_ex_, tag_type tag) noexcept
         {
             tagged_state_type state = static_cast<tagged_state_type>(state_);
             tagged_state_type state_ex =
@@ -65,68 +69,70 @@ namespace hpx { namespace threads { namespace detail {
 
     public:
         ///////////////////////////////////////////////////////////////////////
-        combined_tagged_state() noexcept
+        constexpr combined_tagged_state() noexcept
           : state_(0)
         {
         }
 
-        combined_tagged_state(T1 state, T2 state_ex, tag_type t = 0)
+        constexpr combined_tagged_state(
+            T1 state, T2 state_ex, tag_type t = 0) noexcept
          : state_(pack_state(state, state_ex, t))
         {
        }
 
-        combined_tagged_state(combined_tagged_state state, tag_type t)
+        constexpr combined_tagged_state(
+            combined_tagged_state state, tag_type t) noexcept
          : state_(pack_state(state.state(), state.state_ex(), t))
        {
        }
 
         ///////////////////////////////////////////////////////////////////////
-        void set(T1 state, T2 state_ex, tag_type t)
+        void set(T1 state, T2 state_ex, tag_type t) noexcept
        {
            state_ = pack_state(state, state_ex, t);
        }
 
         ///////////////////////////////////////////////////////////////////////
-        bool operator==(combined_tagged_state const& p) const
+        constexpr bool operator==(combined_tagged_state const& p) const noexcept
        {
            return state_ == p.state_;
        }
 
-        bool operator!=(combined_tagged_state const& p) const
+        constexpr bool operator!=(combined_tagged_state const& p) const noexcept
        {
            return !operator==(p);
        }
 
         ///////////////////////////////////////////////////////////////////////
         // state access
-        T1 state() const
+        constexpr T1 state() const noexcept
        {
            return static_cast<T1>(extract_state(state_));
        }
 
-        void set_state(T1 state)
+        void set_state(T1 state) noexcept
        {
            state_ = pack_state(state, state_ex(), tag());
        }
 
-        T2 state_ex() const
+        T2 state_ex() const noexcept
        {
            return static_cast<T2>(extract_state_ex(state_));
        }
 
-        void set_state_ex(T2 state_ex)
+        void set_state_ex(T2 state_ex) noexcept
        {
            state_ = pack_state(state(), state_ex, tag());
        }
 
         ///////////////////////////////////////////////////////////////////////
         // tag access
-        tag_type tag() const
+        constexpr tag_type tag() const noexcept
        {
            return extract_tag(state_);
        }
 
-        void set_tag(tag_type t)
+        void set_tag(tag_type t) noexcept
        {
            state_ = pack_state(state(), state_ex(), t);
        }
@@ -134,4 +140,4 @@ namespace hpx { namespace threads { namespace detail {
     protected:
         tagged_state_type state_;
     };
-}}}    // namespace hpx::threads::detail
+}    // namespace hpx::threads::detail
diff --git a/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp b/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp
index af0de91ccfca..5bbf5867aa94 100644
--- a/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp
+++ b/libs/core/schedulers/include/hpx/schedulers/thread_queue.hpp
@@ -1,4 +1,4 @@
-// Copyright (c) 2007-2019 Hartmut Kaiser
+// Copyright (c) 2007-2022 Hartmut Kaiser
 // Copyright (c) 2011 Bryce Lelbach
 //
 // SPDX-License-Identifier: BSL-1.0
@@ -46,7 +46,8 @@
 #include
 
 ///////////////////////////////////////////////////////////////////////////////
-namespace hpx { namespace threads { namespace policies {
+namespace hpx::threads::policies {
+
     ///////////////////////////////////////////////////////////////////////////
     //
     // Queue back-end interface:
     //
@@ -95,14 +96,6 @@ namespace hpx { namespace threads { namespace policies {
         using thread_heap_type = std::vector>;
 
-        struct task_description
-        {
-            thread_init_data data;
-#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
-            std::uint64_t waittime;
-#endif
-        };
-
 #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
         struct thread_description
         {
@@ -118,9 +111,6 @@ namespace hpx { namespace threads { namespace policies {
         using work_items_type = typename PendingQueuing::template apply<
             thread_description_ptr>::type;
 
-        using task_items_type =
-            typename StagedQueuing::template apply<task_description*>::type;
-
         using terminated_items_type =
             typename TerminatedQueuing::template apply<thread_data*>::type;
 
@@ -165,9 +155,8 @@ namespace hpx { namespace threads { namespace policies {
                 data.initial_state = thread_schedule_state::pending;
             }
 
-            // ASAN gets confused by reusing threads/stacks
 #if !defined(HPX_HAVE_ADDRESS_SANITIZER)
-
+            // ASAN gets confused by reusing threads/stacks
             // Check for an unused thread object.
             if (!heap->empty())
             {
@@ -197,132 +186,7 @@ namespace hpx { namespace threads { namespace policies {
             }
         }
 
-        static util::internal_allocator<task_description>
-            task_description_alloc_;
 
         ///////////////////////////////////////////////////////////////////////
-        // add new threads if there is some amount of work available
-        std::size_t add_new(std::int64_t add_count, thread_queue* addfrom,
-            std::unique_lock<mutex_type>& lk, bool steal = false)
-        {
-            HPX_ASSERT(lk.owns_lock());
-
-            if (HPX_UNLIKELY(0 == add_count))
-                return 0;
-
-            std::size_t added = 0;
-            task_description* task = nullptr;
-            while (add_count-- && addfrom->new_tasks_.pop(task, steal))
-            {
-#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
-                if (get_maintain_queue_wait_times_enabled())
-                {
-                    addfrom->new_tasks_wait_ +=
-                        hpx::chrono::high_resolution_clock::now() -
-                        task->waittime;
-                    ++addfrom->new_tasks_wait_count_;
-                }
-#endif
-                // create the new thread
-                threads::thread_init_data& data = task->data;
-
-                bool schedule_now =
-                    data.initial_state == thread_schedule_state::pending;
-                (void) schedule_now;
-
-                threads::thread_id_ref_type thrd;
-                create_thread_object(thrd, data, lk);
-
-                task->~task_description();
-                task_description_alloc_.deallocate(task, 1);
-
-                // add the new entry to the map of all threads
-                std::pair<thread_map_type::iterator, bool> p =
-                    thread_map_.insert(thrd.noref());
-
-                if (HPX_UNLIKELY(!p.second))
-                {
-                    --addfrom->new_tasks_count_.data_;
-                    lk.unlock();
-                    HPX_THROW_EXCEPTION(hpx::out_of_memory,
-                        "thread_queue::add_new",
-                        "Couldn't add new thread to the thread map");
-                    return 0;
-                }
-
-                ++thread_map_count_;
-
-                // Decrement only after thread_map_count_ has been incremented
-                --addfrom->new_tasks_count_.data_;
-
-                // insert the thread into the work-items queue assuming it is
-                // in pending state, thread would go out of scope otherwise
-                HPX_ASSERT(schedule_now);
-
-                // pushing the new thread into the pending queue of the
-                // specified thread_queue
-                ++added;
-                schedule_thread(HPX_MOVE(thrd));
-            }
-
-            if (added)
-            {
-                LTM_(debug).format("add_new: added {} tasks to queues", added);
-            }
-            return added;
-        }
-
-        ///////////////////////////////////////////////////////////////////////
-        bool add_new_always(std::size_t& added, thread_queue* addfrom,
-            std::unique_lock<mutex_type>& lk, bool steal = false)
-        {
-            HPX_ASSERT(lk.owns_lock());
-
-#ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
-            util::tick_counter tc(add_new_time_);
-#endif
-
-            // create new threads from pending tasks (if appropriate)
-            std::int64_t add_count = -1;    // default is no constraint
-
-            // if we are desperate (no work in the queues), add some even if the
-            // map holds more than max_thread_count
-            if (HPX_LIKELY(parameters_.max_thread_count_))
-            {
-                std::int64_t count =
-                    static_cast<std::int64_t>(thread_map_.size());
-                if (parameters_.max_thread_count_ >=
-                    count + parameters_.min_add_new_count_)
-                {    //-V104
-                    HPX_ASSERT(parameters_.max_thread_count_ - count <
-                        (std::numeric_limits<std::int64_t>::max)());
-                    add_count = static_cast<std::int64_t>(
-                        parameters_.max_thread_count_ - count);
-                    if (add_count < parameters_.min_add_new_count_)
-                        add_count = parameters_.min_add_new_count_;
-                    if (add_count > parameters_.max_add_new_count_)
-                        add_count = parameters_.max_add_new_count_;
-                }
-                else if (work_items_.empty())
-                {
-                    // add this number of threads
-                    add_count = parameters_.min_add_new_count_;
-
-                    // increase max_thread_count
-                    parameters_.max_thread_count_ +=
-                        parameters_.min_add_new_count_;    //-V101
-                }
-                else
-                {
-                    return false;
-                }
-            }
-
-            std::size_t addednew = add_new(add_count, addfrom, lk, steal);
-            added += addednew;
-            return addednew != 0;
-        }
-
         void recycle_thread(thread_id_type thrd)
         {
             std::ptrdiff_t stacksize =
@@ -356,11 +220,11 @@ namespace hpx { namespace threads { namespace policies {
         }
 
     public:
-        /// This function makes sure all threads which are marked for deletion
-        /// (state is terminated) are properly destroyed.
-        ///
-        /// This returns 'true' if there are no more terminated threads waiting
-        /// to be deleted.
+        // This function makes sure all threads which are marked for deletion
+        // (state is terminated) are properly destroyed.
+        //
+        // This returns 'true' if there are no more terminated threads waiting
+        // to be deleted.
         bool cleanup_terminated_locked(bool delete_all = false)
         {
 #ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
@@ -434,7 +298,10 @@ namespace hpx { namespace threads { namespace policies {
                 // do not lock mutex while deleting all threads, do it piece-wise
                 while (true)
                 {
-                    std::lock_guard<mutex_type> lk(mtx_);
+                    std::unique_lock<mutex_type> lk(mtx_, std::try_to_lock);
+                    if (!lk.owns_lock())
+                        return false;    // avoid long wait on lock
+
                     if (cleanup_terminated_locked(false))
                     {
                         return true;
@@ -443,7 +310,10 @@ namespace hpx { namespace threads { namespace policies {
                 return false;
             }
 
-            std::lock_guard<mutex_type> lk(mtx_);
+            std::unique_lock<mutex_type> lk(mtx_, std::try_to_lock);
+            if (!lk.owns_lock())
+                return false;    // avoid long wait on lock
+
             return cleanup_terminated_locked(false);
         }
 
@@ -458,30 +328,21 @@ namespace hpx { namespace threads { namespace policies {
 #endif
           , terminated_items_(128)
          , terminated_items_count_(0)
-          , new_tasks_(128)
-#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
-          , new_tasks_wait_(0)
-          , new_tasks_wait_count_(0)
-#endif
          , thread_heap_small_()
          , thread_heap_medium_()
          , thread_heap_large_()
          , thread_heap_huge_()
          , thread_heap_nostack_()
 #ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
-          , add_new_time_(0)
          , cleanup_terminated_time_(0)
 #endif
 #ifdef HPX_HAVE_THREAD_STEALING_COUNTS
          , pending_misses_(0)
          , pending_accesses_(0)
          , stolen_from_pending_(0)
-          , stolen_from_staged_(0)
          , stolen_to_pending_(0)
-          , stolen_to_staged_(0)
 #endif
        {
-            new_tasks_count_.data_ = 0;
            work_items_count_.data_ = 0;
        }
 
@@ -509,12 +370,12 @@ namespace hpx { namespace threads { namespace policies {
         }
 
 #ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
-        std::uint64_t get_creation_time(bool reset)
+        constexpr std::uint64_t get_creation_time(bool) const noexcept
        {
-            return util::get_and_reset_value(add_new_time_, reset);
+            return 0;
        }
 
-        std::uint64_t get_cleanup_time(bool reset)
+        std::uint64_t get_cleanup_time(bool reset) noexcept
        {
            return util::get_and_reset_value(cleanup_terminated_time_, reset);
        }
@@ -523,36 +384,32 @@ namespace hpx { namespace threads { namespace policies {
         ///////////////////////////////////////////////////////////////////////
         // This returns the current length of the queues (work items and new items)
         std::int64_t get_queue_length(
-            std::memory_order order = std::memory_order_acquire) const
+            std::memory_order order = std::memory_order_acquire) const noexcept
        {
-            return work_items_count_.data_.load(order) +
-                new_tasks_count_.data_.load(order);
+            return work_items_count_.data_.load(order);
        }
 
         // This returns the current length of the pending queue
         std::int64_t get_pending_queue_length(
-            std::memory_order order = std::memory_order_acquire) const
+            std::memory_order order = std::memory_order_acquire) const noexcept
        {
            return work_items_count_.data_.load(order);
        }
 
         // This returns the current length of the staged queue
         std::int64_t get_staged_queue_length(
-            std::memory_order order = std::memory_order_acquire) const
+            std::memory_order = std::memory_order_acquire) const noexcept
        {
-            return new_tasks_count_.data_.load(order);
+            return 0;
        }
 
 #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
-        std::uint64_t get_average_task_wait_time() const
+        std::uint64_t get_average_task_wait_time() const noexcept
        {
-            std::uint64_t count = new_tasks_wait_count_;
-            if (count == 0)
-                return 0;
-            return new_tasks_wait_ / count;
+            return 0;
        }
 
-        std::uint64_t get_average_thread_wait_time() const
+        std::uint64_t get_average_thread_wait_time() const noexcept
        {
            std::uint64_t count = work_items_wait_count_;
            if (count == 0)
@@ -562,85 +419,88 @@ namespace hpx { namespace threads { namespace policies {
 #endif
 
 #ifdef HPX_HAVE_THREAD_STEALING_COUNTS
-        std::int64_t get_num_pending_misses(bool reset)
+        std::int64_t get_num_pending_misses(bool reset) noexcept
        {
            return util::get_and_reset_value(pending_misses_, reset);
        }
 
-        void increment_num_pending_misses(std::size_t num = 1)
+        void increment_num_pending_misses(std::size_t num = 1) noexcept
        {
            pending_misses_.fetch_add(num, std::memory_order_relaxed);
        }
 
-        std::int64_t get_num_pending_accesses(bool reset)
+        std::int64_t get_num_pending_accesses(bool reset) noexcept
        {
            return util::get_and_reset_value(pending_accesses_, reset);
        }
 
-        void increment_num_pending_accesses(std::size_t num = 1)
+        void increment_num_pending_accesses(std::size_t num = 1) noexcept
        {
            pending_accesses_.fetch_add(num, std::memory_order_relaxed);
        }
 
-        std::int64_t get_num_stolen_from_pending(bool reset)
+        std::int64_t get_num_stolen_from_pending(bool reset) noexcept
        {
            return util::get_and_reset_value(stolen_from_pending_, reset);
        }
 
-        void increment_num_stolen_from_pending(std::size_t num = 1)
+        void increment_num_stolen_from_pending(std::size_t num = 1) noexcept
        {
            stolen_from_pending_.fetch_add(num, std::memory_order_relaxed);
        }
 
-        std::int64_t get_num_stolen_from_staged(bool reset)
+        constexpr std::int64_t get_num_stolen_from_staged(bool) const noexcept
        {
-            return util::get_and_reset_value(stolen_from_staged_, reset);
+            return 0;
        }
 
-        void increment_num_stolen_from_staged(std::size_t num = 1)
+        constexpr void increment_num_stolen_from_staged(
+            std::size_t = 1) const noexcept
        {
-            stolen_from_staged_.fetch_add(num, std::memory_order_relaxed);
        }
 
-        std::int64_t get_num_stolen_to_pending(bool reset)
+        std::int64_t get_num_stolen_to_pending(bool reset) noexcept
        {
            return util::get_and_reset_value(stolen_to_pending_, reset);
        }
 
-        void increment_num_stolen_to_pending(std::size_t num = 1)
+        void increment_num_stolen_to_pending(std::size_t num = 1) noexcept
        {
            stolen_to_pending_.fetch_add(num, std::memory_order_relaxed);
        }
 
-        std::int64_t get_num_stolen_to_staged(bool reset)
+        constexpr std::int64_t get_num_stolen_to_staged(bool) const noexcept
        {
-            return util::get_and_reset_value(stolen_to_staged_, reset);
+            return 0;
        }
 
-        void increment_num_stolen_to_staged(std::size_t num = 1)
+        constexpr void increment_num_stolen_to_staged(
+            std::size_t = 1) const noexcept
        {
-            stolen_to_staged_.fetch_add(num, std::memory_order_relaxed);
        }
 #else
-        constexpr void increment_num_pending_misses(std::size_t /* num */ = 1)
+        constexpr void increment_num_pending_misses(
+            std::size_t = 1) const noexcept
        {
        }
 
-        constexpr void increment_num_pending_accesses(std::size_t /* num */ = 1)
+        constexpr void increment_num_pending_accesses(
+            std::size_t = 1) const noexcept
        {
        }
 
         constexpr void increment_num_stolen_from_pending(
-            std::size_t /* num */ = 1)
+            std::size_t = 1) const noexcept
        {
        }
 
         constexpr void increment_num_stolen_from_staged(
-            std::size_t /* num */ = 1)
+            std::size_t = 1) const noexcept
        {
        }
 
         constexpr void increment_num_stolen_to_pending(
-            std::size_t /* num */ = 1)
+            std::size_t = 1) const noexcept
        {
        }
 
-        constexpr void increment_num_stolen_to_staged(std::size_t /* num */ = 1)
+        constexpr void increment_num_stolen_to_staged(
+            std::size_t = 1) const noexcept
        {
        }
 #endif
@@ -662,91 +522,59 @@ namespace hpx { namespace threads { namespace policies {
 
             HPX_ASSERT(data.stacksize != threads::thread_stacksize::current);
 
-            if (data.run_now)
-            {
-                threads::thread_id_ref_type thrd;
+            threads::thread_id_ref_type thrd;
 
-                // The mutex can not be locked while a new thread is getting
-                // created, as it might have that the current HPX thread gets
-                // suspended.
-                {
-                    std::unique_lock<mutex_type> lk(mtx_);
-
-                    bool schedule_now =
-                        data.initial_state == thread_schedule_state::pending;
+            // The mutex can not be locked while a new thread is getting
+            // created, as it might have that the current HPX thread gets
+            // suspended.
+            std::unique_lock<mutex_type> lk(mtx_);
 
-                    create_thread_object(thrd, data, lk);
+            bool schedule_now =
+                data.initial_state == thread_schedule_state::pending;
 
-                    // add a new entry in the map for this thread
-                    std::pair<thread_map_type::iterator, bool> p =
-                        thread_map_.insert(thrd.noref());
+            create_thread_object(thrd, data, lk);
 
-                    if (HPX_UNLIKELY(!p.second))
-                    {
-                        lk.unlock();
-                        HPX_THROWS_IF(ec, hpx::out_of_memory,
-                            "thread_queue::create_thread",
-                            "Couldn't add new thread to the map of threads");
-                        return;
-                    }
-                    ++thread_map_count_;
+            // add a new entry in the map for this thread
+            std::pair<thread_map_type::iterator, bool> p =
+                thread_map_.insert(thrd.noref());
 
-                    // this thread has to be in the map now
-                    HPX_ASSERT(
-                        thread_map_.find(thrd.noref()) != thread_map_.end());
-                    HPX_ASSERT(
-                        &get_thread_id_data(thrd)->get_queue<thread_queue>() ==
-                        this);
+            if (HPX_UNLIKELY(!p.second))
+            {
+                lk.unlock();
+                HPX_THROWS_IF(ec, hpx::out_of_memory,
+                    "thread_queue::create_thread",
+                    "Couldn't add new thread to the map of threads");
+                return;
+            }
+            ++thread_map_count_;
 
-                    // push the new thread in the pending thread queue
-                    if (schedule_now)
-                    {
-                        // return the thread_id_ref of the newly created thread
-                        if (id)
-                        {
-                            *id = thrd;
-                        }
-                        schedule_thread(HPX_MOVE(thrd));
-                    }
-                    else
-                    {
-                        // if the thread should not be scheduled the id must be
-                        // returned to the caller as otherwise the thread would
-                        // go out of scope right away.
-                        HPX_ASSERT(id != nullptr);
-                        *id = HPX_MOVE(thrd);
-                    }
+            // this thread has to be in the map now
+            HPX_ASSERT(thread_map_.find(thrd.noref()) != thread_map_.end());
+            HPX_ASSERT(
+                &get_thread_id_data(thrd)->get_queue<thread_queue>() == this);
 
-                    if (&ec != &throws)
-                        ec = make_success_code();
-                    return;
+            // push the new thread in the pending thread queue
+            if (schedule_now)
+            {
+                // return the thread_id_ref of the newly created thread
+                if (id)
+                {
+                    *id = thrd;
                }
+                schedule_thread(HPX_MOVE(thrd));
            }
-
-            // if the initial state is not pending, delayed creation will
-            // fail as the newly created thread would go out of scope right
-            // away (can't be scheduled).
-            if (data.initial_state != thread_schedule_state::pending)
+            else
            {
-                HPX_THROW_EXCEPTION(bad_parameter,
-                    "thread_queue::create_thread",
-                    "staged tasks must have 'pending' as their initial state");
+                // if the thread should not be scheduled the id must be returned
+                // to the caller as otherwise the thread would go out of scope
+                // right away.
+                HPX_ASSERT(id != nullptr);
+                *id = HPX_MOVE(thrd);
            }
 
-            // do not execute the work, but register a task description for
-            // later thread creation
-            ++new_tasks_count_.data_;
-
-            task_description* td = task_description_alloc_.allocate(1);
-#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
-            new (td) task_description{
-                HPX_MOVE(data), hpx::chrono::high_resolution_clock::now()};
-#else
-            new (td) task_description{HPX_MOVE(data)};    //-V106
-#endif
-            new_tasks_.push(td);
 
             if (&ec != &throws)
                 ec = make_success_code();
+            return;
         }
 
         void move_work_items_from(thread_queue* src, std::int64_t count)
         {
@@ -767,49 +595,15 @@ namespace hpx { namespace threads { namespace policies {
                 }
 #endif
 
-                bool finished = count == ++work_items_count_.data_;
+                ++work_items_count_.data_;
                 work_items_.push(trd);
-                if (finished)
+                if (--count == 0)
                     break;
             }
         }
 
-        void move_task_items_from(thread_queue* src, std::int64_t count)
-        {
-            task_description* task = nullptr;
-            while (src->new_tasks_.pop(task))
-            {
-#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
-                if (get_maintain_queue_wait_times_enabled())
-                {
-                    std::int64_t now =
-                        hpx::chrono::high_resolution_clock::now();
-                    src->new_tasks_wait_ += now - task->waittime;
-                    ++src->new_tasks_wait_count_;
-                    task->waittime = now;
-                }
-#endif
-
-                bool finish = count == ++new_tasks_count_.data_;
-
-                // Decrement only after the local new_tasks_count_ has
-                // been incremented
-                --src->new_tasks_count_.data_;
-
-                if (new_tasks_.push(task))
-                {
-                    if (finish)
-                        break;
-                }
-                else
-                {
-                    --new_tasks_count_.data_;
-                }
-            }
-        }
-
-        /// Return the next thread to be executed, return false if none is
-        /// available
+        // Return the next thread to be executed, return false if none is
+        // available
         bool get_next_thread(threads::thread_id_ref_type& thrd,
             bool allow_stealing = false, bool steal = false) HPX_HOT
         {
@@ -853,7 +647,7 @@ namespace hpx { namespace threads { namespace policies {
             return false;
         }
 
-        /// Schedule the passed thread
+        // Schedule the passed thread
         void schedule_thread(
             threads::thread_id_ref_type thrd, bool other_end = false)
         {
@@ -863,13 +657,13 @@ namespace hpx { namespace threads { namespace policies {
                     hpx::chrono::high_resolution_clock::now()}, other_end);
 #else
-            // detach the thread from the id_ref without decrementing
-            // the reference count
+            // detach the thread from the id_ref without decrementing the
+            // reference count
             work_items_.push(thrd.detach(), other_end);
 #endif
         }
 
-        /// Destroy the passed thread as it has been terminated
+        // Destroy the passed thread as it has been terminated
         void destroy_thread(threads::thread_data* thrd)
         {
             HPX_ASSERT(&thrd->get_queue<thread_queue>() == this);
@@ -884,7 +678,7 @@ namespace hpx { namespace threads { namespace policies {
         }
 
         ///////////////////////////////////////////////////////////////////////
-        /// Return the number of existing threads with the given state.
+        // Return the number of existing threads with the given state.
         std::int64_t get_thread_count(
             thread_schedule_state state = thread_schedule_state::unknown) const
         {
@@ -892,12 +686,11 @@ namespace hpx { namespace threads { namespace policies {
                 return terminated_items_count_;
 
             if (thread_schedule_state::staged == state)
-                return new_tasks_count_.data_;
+                return 0;
 
             if (thread_schedule_state::unknown == state)
             {
-                return thread_map_count_ + new_tasks_count_.data_ -
-                    terminated_items_count_;
+                return thread_map_count_ - terminated_items_count_;
             }
 
             // acquire lock only if absolutely necessary
@@ -987,110 +780,25 @@ namespace hpx { namespace threads { namespace policies {
             return true;
         }
 
-        /// This is a function which gets called periodically by the thread
-        /// manager to allow for maintenance tasks to be executed in the
-        /// scheduler. Returns true if the OS thread calling this function
-        /// has to be terminated (i.e. no more work has to be done).
-        inline bool wait_or_add_new(
-            bool, std::size_t& added, bool steal = false) HPX_HOT
+        constexpr bool wait_or_add_new(
+            bool, std::size_t&, bool = false) noexcept
        {
-            if (0 == new_tasks_count_.data_.load(std::memory_order_relaxed))
-            {
-                return true;
-            }
-
-            // No obvious work has to be done, so a lock won't hurt too much.
-            //
-            // We prefer to exit this function (some kind of very short
-            // busy waiting) to blocking on this lock. Locking fails either
-            // when a thread is currently doing thread maintenance, which
-            // means there might be new work, or the thread owning the lock
-            // just falls through to the cleanup work below (no work is available)
-            // in which case the current thread (which failed to acquire
-            // the lock) will just retry to enter this loop.
-            std::unique_lock<mutex_type> lk(mtx_, std::try_to_lock);
-            if (!lk.owns_lock())
-                return false;    // avoid long wait on lock
-
-            // stop running after all HPX threads have been terminated
-            return !add_new_always(added, this, lk, steal);
+            return false;
        }
 
-        inline bool wait_or_add_new(bool running, std::size_t& added,
-            thread_queue* addfrom, bool steal = false) HPX_HOT
+        // This is a function which gets called periodically by the thread
+        // manager to allow for maintenance tasks to be executed in the
+        // scheduler. Returns true if the OS thread calling this function has to
+        // be terminated (i.e. no more work has to be done).
+        inline bool wait_or_add_new(
+            bool running, std::size_t&, thread_queue*, bool = false) HPX_HOT
        {
-            // try to generate new threads from task lists, but only if our
-            // own list of threads is empty
-            if (0 == work_items_count_.data_.load(std::memory_order_relaxed))
-            {
-                // see if we can avoid grabbing the lock below
-
-                // don't try to steal if there are only a few tasks left on
-                // this queue
-                std::int64_t new_tasks_count =
-                    addfrom->new_tasks_count_.data_.load(
-                        std::memory_order_relaxed);
-                bool enough_threads =
-                    new_tasks_count >= parameters_.min_tasks_to_steal_staged_;
-
-                if (running && !enough_threads)
-                {
-                    if (new_tasks_count != 0)
-                    {
-                        LTM_(debug).format(
-                            "thread_queue::wait_or_add_new: not enough threads "
-                            "to steal from queue {} to queue {}, have {} but "
-                            "need at least {}",
-                            addfrom, this, new_tasks_count,
-                            parameters_.min_tasks_to_steal_staged_);
-                    }
-
-                    return false;
-                }
-
-                // No obvious work has to be done, so a lock won't hurt too much.
-                //
-                // We prefer to exit this function (some kind of very short
-                // busy waiting) to blocking on this lock. Locking fails either
-                // when a thread is currently doing thread maintenance, which
-                // means there might be new work, or the thread owning the lock
-                // just falls through to the cleanup work below (no work is available)
-                // in which case the current thread (which failed to acquire
-                // the lock) will just retry to enter this loop.
-                std::unique_lock<mutex_type> lk(mtx_, std::try_to_lock);
-                if (!lk.owns_lock())
-                    return false;    // avoid long wait on lock
-
-                // stop running after all HPX threads have been terminated
-                bool added_new = add_new_always(added, addfrom, lk, steal);
-                if (!added_new)
-                {
-                    // Before exiting each of the OS threads deletes the
-                    // remaining terminated HPX threads
-                    // REVIEW: Should we be doing this if we are stealing?
-                    bool canexit = cleanup_terminated_locked(true);
-                    if (!running && canexit)
-                    {
-                        // we don't have any registered work items anymore
-                        //do_some_work();    // notify possibly waiting threads
-                        return true;    // terminate scheduling loop
-                    }
-                    return false;
-                }
-                else
-                {
-                    cleanup_terminated_locked();
-                    return false;
-                }
-            }
-
             bool canexit = cleanup_terminated(true);
             if (!running && canexit)
             {
                 // we don't have any registered work items anymore
                 return true;    // terminate scheduling loop
             }
-
             return false;
         }
 
@@ -1152,9 +860,9 @@ namespace hpx { namespace threads { namespace policies {
                     thread_heap_small_.emplace_back(p);
                 }
             }
-        void on_stop_thread(std::size_t /* num_thread */) {}
-        void on_error(
-            std::size_t /* num_thread */, std::exception_ptr const& /* e */)
+
+        constexpr void on_stop_thread(std::size_t) noexcept {}
+        constexpr void on_error(std::size_t, std::exception_ptr const&) noexcept
        {
        }
 
@@ -1173,6 +881,7 @@ namespace hpx { namespace threads { namespace policies {
 #ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
         // overall wait time of work items
         std::atomic<std::uint64_t> work_items_wait_;
+        // overall number of work items in queue
         std::atomic<std::uint64_t> work_items_wait_count_;
 #endif
 
@@ -1181,15 +890,6 @@ namespace hpx { namespace threads { namespace policies {
         // count of terminated items
         std::atomic<std::int64_t> terminated_items_count_;
 
-        task_items_type new_tasks_;    // list of new tasks to run
-
-#ifdef HPX_HAVE_THREAD_QUEUE_WAITTIME
-        // overall wait time of new tasks
-        std::atomic<std::uint64_t> new_tasks_wait_;
-        // overall number tasks waited
-        std::atomic<std::uint64_t> new_tasks_wait_count_;
-#endif
-
         thread_heap_type thread_heap_small_;
         thread_heap_type thread_heap_medium_;
         thread_heap_type thread_heap_large_;
@@ -1197,7 +897,6 @@ namespace hpx { namespace threads { namespace policies {
         thread_heap_type thread_heap_nostack_;
 
 #ifdef HPX_HAVE_THREAD_CREATION_AND_CLEANUP_RATES
-        std::uint64_t add_new_time_;
         std::uint64_t cleanup_terminated_time_;
 #endif
 
@@ -1210,12 +909,9 @@ namespace hpx { namespace threads { namespace policies {
         // count of work_items stolen from this queue
         std::atomic<std::int64_t> stolen_from_pending_;
-        // count of new_tasks stolen from this queue
-        std::atomic<std::int64_t> stolen_from_staged_;
+
         // count of work_items stolen to this queue from other queues
         std::atomic<std::int64_t> stolen_to_pending_;
-        // count of new_tasks stolen to this queue from other queues
-        std::atomic<std::int64_t> stolen_to_staged_;
 #endif
 
         // count of new tasks to run, separate to new cache line to avoid false
         // sharing
@@ -1224,12 +920,4 @@ namespace hpx { namespace threads { namespace policies {
         // count of active work items
         util::cache_line_data<std::atomic<std::int64_t>> work_items_count_;
     };
-
-    ///////////////////////////////////////////////////////////////////////////
-    template <typename Mutex, typename PendingQueuing, typename StagedQueuing,
-        typename TerminatedQueuing>
-    util::internal_allocator<typename thread_queue<Mutex, PendingQueuing,
-        StagedQueuing, TerminatedQueuing>::task_description>
-        thread_queue<Mutex, PendingQueuing, StagedQueuing,
-            TerminatedQueuing>::task_description_alloc_;
-}}}    // namespace hpx::threads::policies
+}    // namespace hpx::threads::policies
diff --git a/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp b/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp
index a45d827f4fb3..ef55e8b37d31 100644
--- a/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp
+++ b/libs/core/threading_base/include/hpx/threading_base/thread_data.hpp
@@ -665,12 +665,13 @@ namespace hpx { namespace threads {
     };
 
     HPX_FORCEINLINE thread_data* get_thread_id_data(
-        thread_id_ref_type const& tid)
+        thread_id_ref_type const& tid) noexcept
     {
         return static_cast<thread_data*>(tid.get().get());
     }
 
-    HPX_FORCEINLINE thread_data* get_thread_id_data(thread_id_type const& tid)
+    HPX_FORCEINLINE thread_data* get_thread_id_data(
+        thread_id_type const& tid) noexcept
     {
         return static_cast<thread_data*>(tid.get());
     }
diff --git a/libs/core/util/include/hpx/util/get_and_reset_value.hpp b/libs/core/util/include/hpx/util/get_and_reset_value.hpp
index e592d12127fa..2a2e4ed221c5 100644
--- a/libs/core/util/include/hpx/util/get_and_reset_value.hpp
+++ b/libs/core/util/include/hpx/util/get_and_reset_value.hpp
@@ -1,4 +1,4 @@
-// Copyright (c) 2007-2013 Hartmut Kaiser
+// Copyright (c) 2007-2022 Hartmut Kaiser
 //
 // SPDX-License-Identifier: BSL-1.0
 // Distributed under the Boost Software License, Version 1.0. (See accompanying
@@ -8,6 +8,7 @@
 #include <hpx/config.hpp>
 
 #include <cstdint>
+#include <utility>
 #include <vector>
 
 namespace hpx { namespace util {
@@ -39,12 +40,14 @@ namespace hpx { namespace util {
     }
 
     inline std::vector<std::int64_t> get_and_reset_value(
-        std::vector<std::int64_t>& value, bool reset) noexcept
+        std::vector<std::int64_t>& value, bool reset)
     {
-        std::vector<std::int64_t> result = value;
         if (reset)
-            value.clear();
-
-        return result;
+        {
+            std::vector<std::int64_t> result;
+            std::swap(result, value);
+            return result;
+        }
+        return value;
     }
 }}    // namespace hpx::util
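
For reference, the reworked vector overload of get_and_reset_value now swaps the stored counters out when a reset is requested and returns a plain copy otherwise. Below is a minimal, self-contained sketch of that behaviour; the standalone function simply mirrors the patched body and is illustrative, not part of the patch itself.

#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Mirrors the patched overload: on reset the counters are moved out and the
// source vector is left empty, otherwise the caller receives a copy.
std::vector<std::int64_t> get_and_reset_value(
    std::vector<std::int64_t>& value, bool reset)
{
    if (reset)
    {
        std::vector<std::int64_t> result;
        std::swap(result, value);
        return result;
    }
    return value;
}

int main()
{
    std::vector<std::int64_t> counters{1, 2, 3};

    auto copy = get_and_reset_value(counters, false);
    assert(copy.size() == 3 && counters.size() == 3);    // plain read

    auto taken = get_and_reset_value(counters, true);
    assert(taken.size() == 3 && counters.empty());    // read resets the source
    return 0;
}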
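
Similarly, a small stand-in for the packing scheme used by hpx::threads::detail::combined_tagged_state in the first file of this patch: the thread state occupies the 8th byte, the extended state the 7th byte, and the low 48 bits carry the tag. The sketch uses unsigned types for brevity and only assumes the shifts and masks visible in the header above.

#include <cstdint>

// Stand-in for the combined_tagged_state layout: byte 8 holds the thread
// state, byte 7 the extended state, and the low 48 bits hold the tag.
constexpr std::uint64_t pack(
    std::uint8_t state, std::uint8_t state_ex, std::uint64_t tag) noexcept
{
    return (static_cast<std::uint64_t>(state) << 56) |
        (static_cast<std::uint64_t>(state_ex) << 48) |
        (tag & 0x0000ffffffffffffull);
}

int main()
{
    constexpr std::uint64_t packed = pack(2, 5, 0x1234);
    static_assert(((packed >> 56) & 0xff) == 2, "state lives in the 8th byte");
    static_assert(((packed >> 48) & 0xff) == 5, "state_ex lives in the 7th byte");
    static_assert((packed & 0x0000ffffffffffffull) == 0x1234, "48-bit tag");
    return 0;
}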