Skip to content

Commit

Permalink
Adding taskflow w/o coloring
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanMoulday committed Jun 21, 2024
1 parent 8be8fa7 commit 79d0fc7
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 10 deletions.
203 changes: 202 additions & 1 deletion include/deal.II/base/work_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,163 @@ namespace WorkStream
# endif // DEAL_II_WITH_TBB



# ifdef DEAL_II_WITH_TASKFLOW
/**
* Mostly a copy of the second implementation described in the WorkStream
* paper, which re-uses scratch objects kept in thread-local lists. Uses
* Taskflow rather than TBB for task scheduling. Chunking is currently not
* supported.
*/

namespace taskflow_no_coloring
{
template <typename Worker,
typename Copier,
typename Iterator,
typename ScratchData,
typename CopyData>

/**
* The last two arguments in this function are for chunking support which
* currently does not exist but ideally will later. For now they are
* ignored but still here to permit existing programs to function
*/
void
run(const Iterator &begin,
const typename identity<Iterator>::type &end,
Worker worker,
Copier copier,
const ScratchData &sample_scratch_data,
const CopyData &sample_copy_data,
const unsigned int queue_length = 2 * MultithreadInfo::n_threads(),
const unsigned int chunk_size = 8)

{
struct ScratchDataObjects
{
std::unique_ptr<ScratchData> scratch_data;
bool currently_in_use;

/**
* Default constructor.
*/
ScratchDataObjects()
: currently_in_use(false)
{}

ScratchDataObjects(std::unique_ptr<ScratchData> &&p,
const bool in_use)
: scratch_data(std::move(p))
, currently_in_use(in_use)
{}

// Provide a copy constructor that actually doesn't copy the
// internal state. This makes handling ScratchAndCopyDataObjects
// easier to handle with STL containers.
ScratchDataObjects(const ScratchDataObjects &)
: currently_in_use(false)
{}
};

tf::Executor &executor = MultithreadInfo::get_taskflow_executor();
tf::Taskflow taskflow;

using ScratchDataList = std::list<ScratchDataObjects>;

Threads::ThreadLocalStorage<ScratchDataList> data;

tf::Task last_copier;

// Silence unused variable arguments.
(void)queue_length;
(void)chunk_size;

// This is used to connect each worker to its copier as communication
// between tasks is not supported.
unsigned int idx = 0;

std::vector<std::unique_ptr<CopyData>> copy_datas;

for (Iterator i = begin; i != end; ++i, ++idx)
{
copy_datas.emplace_back();
// Create a worker task.
auto worker_task =
taskflow
.emplace([it = i,
idx,
&data,
&sample_scratch_data,
&sample_copy_data,
&copy_datas,
&worker]() {
ScratchData *scratch_data = nullptr;

ScratchDataList &scratch_data_list = data.get();
// See if there is an unused object. if so,
// grab it and mark it as used.
for (auto &p : scratch_data_list)
{
if (p.currently_in_use == false)
{
scratch_data = p.scratch_data.get();
p.currently_in_use = true;
break;
}
}
// If no element in the list was found, create
// one and mark it as used.
if (scratch_data == nullptr)
{
scratch_data_list.emplace_back(
std::make_unique<ScratchData>(sample_scratch_data),
true);
scratch_data =
scratch_data_list.back().scratch_data.get();
}

// Create a unique copy data object where this
// worker's work will be stored.
auto &copy = copy_datas[idx];
copy = std::make_unique<CopyData>(sample_copy_data);
worker(it, *scratch_data, *copy.get());

// Find our currently used scratch data and
// mark it as unused.
for (auto &p : scratch_data_list)
{
if (p.scratch_data.get() == scratch_data)
{
Assert(p.currently_in_use == true,
ExcInternalError());
p.currently_in_use = false;
}
}
})
.name("worker");

// Create a copier task.
tf::Task copier_task = taskflow
.emplace([idx, &copy_datas, &copier]() {
copier(*copy_datas[idx].get());
copy_datas[idx].reset();
})
.name("copy");

// Ensure the copy task runs after the worker task.
worker_task.precede(copier_task);

// Ensure that only one copy task can run at a time.
if (!last_copier.empty())
last_copier.precede(copier_task);
last_copier = copier_task;
}

executor.run(taskflow).wait();
}
} // namespace taskflow_no_coloring
# endif

/**
* A reference implementation without using multithreading to be used if we
* don't have multithreading support or if the user requests to run things
Expand Down Expand Up @@ -1144,7 +1301,51 @@ namespace WorkStream

if (MultithreadInfo::n_threads() > 1)
{
# ifdef DEAL_II_WITH_TBB
# ifdef DEAL_II_WITH_TASKFLOW
if (static_cast<const std::function<void(const CopyData &)> &>(copier))
{
// If we have a copier, run the algorithm:
internal::taskflow_no_coloring::run(begin,
end,
worker,
copier,
sample_scratch_data,
sample_copy_data,
queue_length,
chunk_size);
}
else
{
// There is no copier function. In this case, we have an
// embarrassingly parallel problem where we can
// essentially apply parallel_for. Because parallel_for
// requires subdividing the range for which operator- is
// necessary between iterators, it is often inefficient to
// apply it directly to cell ranges and similar iterator
// types for which operator- is expensive or, in fact,
// nonexistent. Rather, in that case, we simply copy the
// iterators into a large array and use operator- on
// iterators to this array of iterators.
//
// Instead of duplicating code, this is essentially the
// same situation we have in the colored implementation below, so we
// just defer to that place.
std::vector<std::vector<Iterator>> all_iterators(1);
for (Iterator p = begin; p != end; ++p)
all_iterators[0].push_back(p);

run(all_iterators,
worker,
copier,
sample_scratch_data,
sample_copy_data,
queue_length,
chunk_size);
}

// exit this function to not run the sequential version below:
return;
# elif defined(DEAL_II_WITH_TBB)
if (static_cast<const std::function<void(const CopyData &)> &>(copier))
{
// If we have a copier, run the algorithm:
Expand Down
6 changes: 3 additions & 3 deletions tests/mpi/mesh_worker_01.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,9 @@ test()
int
main(int argc, char **argv)
{
Utilities::MPI::MPI_InitFinalize mpi_initialization(
argc, argv, testing_max_num_threads());
MPILogInitAll log;
// Disable multithreading so that text output order is consistent
Utilities::MPI::MPI_InitFinalize mpi_initialization(argc, argv, 1);
MPILogInitAll log;

test<2>();
}
6 changes: 3 additions & 3 deletions tests/mpi/mesh_worker_02.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,9 @@ test()
int
main(int argc, char **argv)
{
Utilities::MPI::MPI_InitFinalize mpi_initialization(
argc, argv, testing_max_num_threads());
MPILogInitAll log;
// Disable multithreading so that text output order is consistent
Utilities::MPI::MPI_InitFinalize mpi_initialization(argc, argv, 1);
MPILogInitAll log;

test<2>();
}
6 changes: 3 additions & 3 deletions tests/mpi/mesh_worker_04.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,9 @@ test()
int
main(int argc, char **argv)
{
Utilities::MPI::MPI_InitFinalize mpi_initialization(
argc, argv, testing_max_num_threads());
MPILogInitAll log;
// Disable multithreading so that text output order is consistent
Utilities::MPI::MPI_InitFinalize mpi_initialization(argc, argv, 1);
MPILogInitAll log;

test<2>();
}

0 comments on commit 79d0fc7

Please sign in to comment.