Skip to content

Commit

Permalink
Multi threaded cluster finder (#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikfrojdh authored Jan 14, 2025
2 parents 72d10b7 + d0f435a commit e1cc774
Show file tree
Hide file tree
Showing 16 changed files with 429 additions and 184 deletions.
23 changes: 1 addition & 22 deletions docs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,7 @@ set(SPHINX_BUILD ${CMAKE_CURRENT_BINARY_DIR})


file(GLOB SPHINX_SOURCE_FILES CONFIGURE_DEPENDS "src/*.rst")
# set(SPHINX_SOURCE_FILES
# src/index.rst
# src/Installation.rst
# src/Requirements.rst
# src/NDArray.rst
# src/NDView.rst
# src/File.rst
# src/Frame.rst
# src/Dtype.rst
# src/ClusterFinder.rst
# src/ClusterFile.rst
# src/Pedestal.rst
# src/RawFile.rst
# src/RawSubFile.rst
# src/RawMasterFile.rst
# src/VarClusterFinder.rst
# src/pyVarClusterFinder.rst
# src/pyFile.rst
# src/pyCtbRawFile.rst
# src/pyRawFile.rst
# src/pyRawMasterFile.rst
# )



foreach(filename ${SPHINX_SOURCE_FILES})
Expand Down
7 changes: 7 additions & 0 deletions docs/src/ClusterFinderMT.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
ClusterFinderMT
==================


.. doxygenclass:: aare::ClusterFinderMT
:members:
:undoc-members:
2 changes: 2 additions & 0 deletions docs/src/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ AARE
pyFile
pyCtbRawFile
pyClusterFile
pyClusterVector
pyRawFile
pyRawMasterFile
pyVarClusterFinder
Expand All @@ -45,6 +46,7 @@ AARE
File
Dtype
ClusterFinder
ClusterFinderMT
ClusterFile
ClusterVector
Pedestal
Expand Down
33 changes: 33 additions & 0 deletions docs/src/pyClusterVector.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
ClusterVector
================

The ClusterVector, holds clusters from the ClusterFinder. Since it is templated
in C++ we use a suffix indicating the data type in python. The suffix is
``_i`` for integer, ``_f`` for float, and ``_d`` for double.

At the moment the functionality from python is limited and it is not supported
to push_back clusters to the vector. The intended use case is to pass it to
C++ functions that support the ClusterVector or to view it as a numpy array.

**View ClusterVector as numpy array**

.. code:: python
from aare import ClusterFile
with ClusterFile("path/to/file") as f:
cluster_vector = f.read_frame()
# Create a copy of the cluster data in a numpy array
clusters = np.array(cluster_vector)
# Avoid copying the data by passing copy=False
clusters = np.array(cluster_vector, copy = False)
.. py:currentmodule:: aare
.. autoclass:: ClusterVector_i
:members:
:undoc-members:
:show-inheritance:
:inherited-members:
9 changes: 8 additions & 1 deletion include/aare/ClusterFile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ typedef enum {
pTopRight = 8
} pixel;

struct Eta2 {
double x;
double y;
corner c;
};

struct ClusterAnalysis {
uint32_t c;
int32_t tot;
Expand Down Expand Up @@ -74,7 +80,8 @@ int analyze_data(int32_t *data, int32_t *t2, int32_t *t3, char *quad,
int analyze_cluster(Cluster3x3& cl, int32_t *t2, int32_t *t3, char *quad,
double *eta2x, double *eta2y, double *eta3x, double *eta3y);


NDArray<double, 2> calculate_eta2( ClusterVector<int>& clusters);
std::array<double,2> calculate_eta2( Cluster3x3& cl);
Eta2 calculate_eta2( Cluster3x3& cl);

} // namespace aare
1 change: 1 addition & 0 deletions include/aare/ClusterFinder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class ClusterFinder {

NDArray<PEDESTAL_TYPE, 2> pedestal() { return m_pedestal.mean(); }
NDArray<PEDESTAL_TYPE, 2> noise() { return m_pedestal.std(); }
void clear_pedestal() { m_pedestal.clear(); }

/**
* @brief Move the clusters from the ClusterVector in the ClusterFinder to a
Expand Down
94 changes: 69 additions & 25 deletions include/aare/ClusterFinderMT.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
#include <thread>
#include <vector>

#include "aare/ClusterFinder.hpp"
#include "aare/NDArray.hpp"
#include "aare/ProducerConsumerQueue.hpp"
#include "aare/ClusterFinder.hpp"

namespace aare {

Expand All @@ -22,6 +22,14 @@ struct FrameWrapper {
NDArray<uint16_t, 2> data;
};

/**
* @brief ClusterFinderMT is a multi-threaded version of ClusterFinder. It uses
* a producer-consumer queue to distribute the frames to the threads. The
* clusters are collected in a single output queue.
* @tparam FRAME_TYPE type of the frame data
* @tparam PEDESTAL_TYPE type of the pedestal data
* @tparam CT type of the cluster data
*/
template <typename FRAME_TYPE = uint16_t, typename PEDESTAL_TYPE = double,
typename CT = int32_t>
class ClusterFinderMT {
Expand All @@ -43,31 +51,28 @@ class ClusterFinderMT {
std::atomic<bool> m_stop_requested{false};
std::atomic<bool> m_processing_threads_stopped{true};

/**
* @brief Function called by the processing threads. It reads the frames
* from the input queue and processes them.
*/
void process(int thread_id) {
auto cf = m_cluster_finders[thread_id].get();
auto q = m_input_queues[thread_id].get();
// TODO! Avoid indexing into the vector every time
fmt::print("Thread {} started\n", thread_id);
// TODO! is this check enough to make sure we process all the frames?
bool realloc_same_capacity = true;

while (!m_stop_requested || !q->isEmpty()) {
if (FrameWrapper *frame = q->frontPtr(); frame != nullptr) {
// fmt::print("Thread {} got frame {}, type: {}\n", thread_id,
// frame->frame_number, static_cast<int>(frame->type));

switch (frame->type) {
case FrameType::DATA:
cf->find_clusters(frame->data.view(), frame->frame_number);
m_output_queues[thread_id]->write(cf->steal_clusters());

m_output_queues[thread_id]->write(cf->steal_clusters(realloc_same_capacity));
break;

case FrameType::PEDESTAL:
m_cluster_finders[thread_id]->push_pedestal_frame(
frame->data.view());
break;

default:
break;
}

// frame is processed now discard it
Expand All @@ -76,7 +81,6 @@ class ClusterFinderMT {
std::this_thread::sleep_for(m_default_wait);
}
}
fmt::print("Thread {} stopped\n", thread_id);
}

/**
Expand All @@ -101,11 +105,19 @@ class ClusterFinderMT {
}

public:
/**
* @brief Construct a new ClusterFinderMT object
* @param image_size size of the image
* @param cluster_size size of the cluster
* @param nSigma number of sigma above the pedestal to consider a photon
* @param capacity initial capacity of the cluster vector. Should match
* expected number of clusters in a frame per frame.
* @param n_threads number of threads to use
*/
ClusterFinderMT(Shape<2> image_size, Shape<2> cluster_size,
PEDESTAL_TYPE nSigma = 5.0, size_t capacity = 2000,
size_t n_threads = 3)
: m_n_threads(n_threads) {
fmt::print("ClusterFinderMT: using {} threads\n", n_threads);
for (size_t i = 0; i < n_threads; i++) {
m_cluster_finders.push_back(
std::make_unique<ClusterFinder<FRAME_TYPE, PEDESTAL_TYPE, CT>>(
Expand All @@ -115,39 +127,48 @@ class ClusterFinderMT {
m_input_queues.emplace_back(std::make_unique<InputQueue>(200));
m_output_queues.emplace_back(std::make_unique<OutputQueue>(200));
}

//TODO! Should we start automatically?
start();
}

/**
* @brief Return the sink queue where all the clusters are collected
* @warning You need to empty this queue otherwise the cluster finder will wait forever
*/
ProducerConsumerQueue<ClusterVector<int>> *sink() { return &m_sink; }

/**
* @brief Start all threads
* @brief Start all processing threads
*/

void start() {
m_processing_threads_stopped = false;
m_stop_requested = false;

for (size_t i = 0; i < m_n_threads; i++) {
m_threads.push_back(
std::thread(&ClusterFinderMT::process, this, i));
}
m_processing_threads_stopped = false;

m_collect_thread = std::thread(&ClusterFinderMT::collect, this);
}

/**
* @brief Stop all threads
* @brief Stop all processing threads
*/
void stop() {
m_stop_requested = true;

for (auto &thread : m_threads) {
thread.join();
}
m_threads.clear();

m_processing_threads_stopped = true;
m_collect_thread.join();
}

/**
* @brief Wait for all the queues to be empty
* @brief Wait for all the queues to be empty. Mostly used for timing tests.
*/
void sync() {
for (auto &q : m_input_queues) {
Expand Down Expand Up @@ -194,24 +215,47 @@ class ClusterFinderMT {
m_current_thread++;
}

auto pedestal() {
void clear_pedestal() {
if (!m_processing_threads_stopped) {
throw std::runtime_error("ClusterFinderMT is still running");
}
for (auto &cf : m_cluster_finders) {
cf->clear_pedestal();
}
}

/**
* @brief Return the pedestal currently used by the cluster finder
* @param thread_index index of the thread
*/
auto pedestal(size_t thread_index = 0) {
if (m_cluster_finders.empty()) {
throw std::runtime_error("No cluster finders available");
}
if(!m_processing_threads_stopped){
if (!m_processing_threads_stopped) {
throw std::runtime_error("ClusterFinderMT is still running");
}
return m_cluster_finders[0]->pedestal();
if (thread_index >= m_cluster_finders.size()) {
throw std::runtime_error("Thread index out of range");
}
return m_cluster_finders[thread_index]->pedestal();
}

auto noise() {
/**
* @brief Return the noise currently used by the cluster finder
* @param thread_index index of the thread
*/
auto noise(size_t thread_index = 0) {
if (m_cluster_finders.empty()) {
throw std::runtime_error("No cluster finders available");
}
if(!m_processing_threads_stopped){
if (!m_processing_threads_stopped) {
throw std::runtime_error("ClusterFinderMT is still running");
}
return m_cluster_finders[0]->noise();
if (thread_index >= m_cluster_finders.size()) {
throw std::runtime_error("Thread index out of range");
}
return m_cluster_finders[thread_index]->noise();
}

// void push(FrameWrapper&& frame) {
Expand Down
Loading

0 comments on commit e1cc774

Please sign in to comment.