From dc9e10016dd76ba4778a3fdf82c8faf5fed016de Mon Sep 17 00:00:00 2001 From: froejdh_e Date: Wed, 8 Jan 2025 16:45:24 +0100 Subject: [PATCH 1/2] WIP --- include/aare/CircularFifo.hpp | 97 ++++++++++++ include/aare/ClusterCollector.hpp | 52 +++++++ include/aare/ClusterFinder.hpp | 14 -- include/aare/ClusterFinderMT.hpp | 189 +++++++++++++++++++++++ include/aare/ClusterVector.hpp | 15 +- include/aare/ProducerConsumerQueue.hpp | 203 +++++++++++++++++++++++++ python/examples/play.py | 66 +++++--- python/src/cluster.hpp | 56 +++++++ python/src/module.cpp | 3 + 9 files changed, 661 insertions(+), 34 deletions(-) create mode 100644 include/aare/CircularFifo.hpp create mode 100644 include/aare/ClusterCollector.hpp create mode 100644 include/aare/ClusterFinderMT.hpp create mode 100644 include/aare/ProducerConsumerQueue.hpp diff --git a/include/aare/CircularFifo.hpp b/include/aare/CircularFifo.hpp new file mode 100644 index 00000000..80980829 --- /dev/null +++ b/include/aare/CircularFifo.hpp @@ -0,0 +1,97 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "aare/ProducerConsumerQueue.hpp" + +namespace aare { + +template class CircularFifo { + uint32_t fifo_size; + aare::ProducerConsumerQueue free_slots; + aare::ProducerConsumerQueue filled_slots; + + public: + CircularFifo() : CircularFifo(100){}; + CircularFifo(uint32_t size) : fifo_size(size), free_slots(size + 1), filled_slots(size + 1) { + + // TODO! how do we deal with alignment for writing? alignas??? + // Do we give the user a chance to provide memory locations? + // Templated allocator? + for (size_t i = 0; i < fifo_size; ++i) { + free_slots.write(ItemType{}); + } + } + + bool next() { + // TODO! avoid default constructing ItemType + ItemType it; + if (!filled_slots.read(it)) + return false; + if (!free_slots.write(std::move(it))) + return false; + return true; + } + + ~CircularFifo() {} + + using value_type = ItemType; + + auto numFilledSlots() const noexcept { return filled_slots.sizeGuess(); } + auto numFreeSlots() const noexcept { return free_slots.sizeGuess(); } + auto isFull() const noexcept { return filled_slots.isFull(); } + + ItemType pop_free() { + ItemType v; + while (!free_slots.read(v)) + ; + return std::move(v); + // return v; + } + + bool try_pop_free(ItemType &v) { return free_slots.read(v); } + + ItemType pop_value(std::chrono::nanoseconds wait, std::atomic &stopped) { + ItemType v; + while (!filled_slots.read(v) && !stopped) { + std::this_thread::sleep_for(wait); + } + return std::move(v); + } + + ItemType pop_value() { + ItemType v; + while (!filled_slots.read(v)) + ; + return std::move(v); + } + + ItemType *frontPtr() { return filled_slots.frontPtr(); } + + // TODO! Add function to move item from filled to free to be used + // with the frontPtr function + + template void push_value(Args &&...recordArgs) { + while (!filled_slots.write(std::forward(recordArgs)...)) + ; + } + + template bool try_push_value(Args &&...recordArgs) { + return filled_slots.write(std::forward(recordArgs)...); + } + + template void push_free(Args &&...recordArgs) { + while (!free_slots.write(std::forward(recordArgs)...)) + ; + } + + template bool try_push_free(Args &&...recordArgs) { + return free_slots.write(std::forward(recordArgs)...); + } +}; + +} // namespace aare \ No newline at end of file diff --git a/include/aare/ClusterCollector.hpp b/include/aare/ClusterCollector.hpp new file mode 100644 index 00000000..07380623 --- /dev/null +++ b/include/aare/ClusterCollector.hpp @@ -0,0 +1,52 @@ +#pragma once +#include +#include + +#include "aare/ProducerConsumerQueue.hpp" +#include "aare/ClusterVector.hpp" +#include "aare/ClusterFinderMT.hpp" + +namespace aare { + +class ClusterCollector{ + ProducerConsumerQueue>* m_source; + std::atomic m_stop_requested{false}; + std::atomic m_stopped{true}; + std::chrono::milliseconds m_default_wait{1}; + std::thread m_thread; + std::vector> m_clusters; + + void process(){ + m_stopped = false; + fmt::print("ClusterCollector started\n"); + while (!m_stop_requested || !m_source->isEmpty()) { + if (ClusterVector *clusters = m_source->frontPtr(); + clusters != nullptr) { + m_clusters.push_back(std::move(*clusters)); + m_source->popFront(); + }else{ + std::this_thread::sleep_for(m_default_wait); + } + } + fmt::print("ClusterCollector stopped\n"); + m_stopped = true; + } + + public: + ClusterCollector(ClusterFinderMT* source){ + m_source = source->sink(); + m_thread = std::thread(&ClusterCollector::process, this); + } + void stop(){ + m_stop_requested = true; + m_thread.join(); + } + std::vector> steal_clusters(){ + if(!m_stopped){ + throw std::runtime_error("ClusterCollector is still running"); + } + return std::move(m_clusters); + } +}; + +} // namespace aare \ No newline at end of file diff --git a/include/aare/ClusterFinder.hpp b/include/aare/ClusterFinder.hpp index aa17d19b..2fe33a77 100644 --- a/include/aare/ClusterFinder.hpp +++ b/include/aare/ClusterFinder.hpp @@ -10,26 +10,12 @@ namespace aare { -/** enum to define the event types */ -enum class eventType { - PEDESTAL, /** pedestal */ - NEIGHBOUR, /** neighbour i.e. below threshold, but in the cluster of a - photon */ - PHOTON, /** photon i.e. above threshold */ - PHOTON_MAX, /** maximum of a cluster satisfying the photon conditions */ - NEGATIVE_PEDESTAL, /** negative value, will not be accounted for as pedestal - in order to avoid drift of the pedestal towards - negative values */ - UNDEFINED_EVENT = -1 /** undefined */ -}; - template class ClusterFinder { Shape<2> m_image_size; const int m_cluster_sizeX; const int m_cluster_sizeY; - // const PEDESTAL_TYPE m_threshold; const PEDESTAL_TYPE m_nSigma; const PEDESTAL_TYPE c2; const PEDESTAL_TYPE c3; diff --git a/include/aare/ClusterFinderMT.hpp b/include/aare/ClusterFinderMT.hpp new file mode 100644 index 00000000..d3cbaf63 --- /dev/null +++ b/include/aare/ClusterFinderMT.hpp @@ -0,0 +1,189 @@ +#pragma once +#include +#include +#include +#include +#include + +#include "aare/NDArray.hpp" +#include "aare/ProducerConsumerQueue.hpp" + +namespace aare { + +enum class FrameType { + DATA, + PEDESTAL, +}; + +struct FrameWrapper { + FrameType type; + uint64_t frame_number; + NDArray data; +}; + + +template +class ClusterFinderMT { + size_t m_current_thread{0}; + size_t m_n_threads{0}; + using Finder = ClusterFinder; + using InputQueue = ProducerConsumerQueue; + using OutputQueue = ProducerConsumerQueue>; + std::vector> m_input_queues; + std::vector> m_output_queues; + + + OutputQueue m_sink{1000}; //All clusters go into this queue + + + std::vector> m_cluster_finders; + std::vector m_threads; + std::thread m_collect_thread; + std::chrono::milliseconds m_default_wait{1}; + + std::atomic m_stop_requested{false}; + std::atomic m_processing_threads_stopped{false}; + + void process(int thread_id) { + auto cf = m_cluster_finders[thread_id].get(); + auto q = m_input_queues[thread_id].get(); + // TODO! Avoid indexing into the vector every time + fmt::print("Thread {} started\n", thread_id); + //TODO! is this check enough to make sure we process all the frames? + while (!m_stop_requested || !q->isEmpty()) { + if (FrameWrapper *frame = q->frontPtr(); + frame != nullptr) { + // fmt::print("Thread {} got frame {}, type: {}\n", thread_id, + // frame->frame_number, static_cast(frame->type)); + + switch (frame->type) { + case FrameType::DATA: + cf->find_clusters( + frame->data.view()); + m_output_queues[thread_id]->write(cf->steal_clusters()); + + break; + + case FrameType::PEDESTAL: + m_cluster_finders[thread_id]->push_pedestal_frame( + frame->data.view()); + break; + + default: + break; + } + + // frame is processed now discard it + m_input_queues[thread_id]->popFront(); + } else { + std::this_thread::sleep_for(m_default_wait); + } + + } + fmt::print("Thread {} stopped\n", thread_id); + } + + /** + * @brief Collect all the clusters from the output queues and write them to the sink + */ + void collect(){ + bool empty = true; + while(!m_stop_requested || !empty || !m_processing_threads_stopped){ + empty = true; + for (auto &queue : m_output_queues) { + if (!queue->isEmpty()) { + + while(!m_sink.write(std::move(*queue->frontPtr()))){ + std::this_thread::sleep_for(m_default_wait); + } + queue->popFront(); + empty = false; + } + } + } + } + + public: + ClusterFinderMT(Shape<2> image_size, Shape<2> cluster_size, + PEDESTAL_TYPE nSigma = 5.0, size_t capacity = 2000, + size_t n_threads = 3) + : m_n_threads(n_threads) { + fmt::print("ClusterFinderMT: using {} threads\n", n_threads); + for (size_t i = 0; i < n_threads; i++) { + m_cluster_finders.push_back( + std::make_unique>( + image_size, cluster_size, nSigma, capacity)); + } + for (size_t i = 0; i < n_threads; i++) { + m_input_queues.emplace_back(std::make_unique(200)); + m_output_queues.emplace_back(std::make_unique(200)); + + } + + start(); + } + + ProducerConsumerQueue> *sink() { return &m_sink; } + + void start(){ + for (size_t i = 0; i < m_n_threads; i++) { + m_threads.push_back( + std::thread(&ClusterFinderMT::process, this, i)); + } + m_collect_thread = std::thread(&ClusterFinderMT::collect, this); + } + + void stop() { + m_stop_requested = true; + for (auto &thread : m_threads) { + thread.join(); + } + m_processing_threads_stopped = true; + m_collect_thread.join(); + } + + void sync(){ + for (auto &q : m_input_queues) { + while(!q->isEmpty()){ + std::this_thread::sleep_for(m_default_wait); + } + } + } + + void push_pedestal_frame(NDView frame) { + FrameWrapper fw{FrameType::PEDESTAL, 0, NDArray(frame)}; + + for (auto &queue : m_input_queues) { + while (!queue->write(fw)) { + // fmt::print("push_pedestal_frame: queue full\n"); + std::this_thread::sleep_for(m_default_wait); + } + } + } + + void find_clusters(NDView frame) { + FrameWrapper fw{FrameType::DATA, 0, NDArray(frame)}; + while (!m_input_queues[m_current_thread%m_n_threads]->write(fw)) { + std::this_thread::sleep_for(m_default_wait); + } + m_current_thread++; + } + + ClusterVector steal_clusters(bool realloc_same_capacity = false) { + ClusterVector clusters(3,3); + for (auto &finder : m_cluster_finders) { + clusters += finder->steal_clusters(); + } + return clusters; + } + + + // void push(FrameWrapper&& frame) { + // //TODO! need to loop until we are successful + // auto rc = m_input_queue.write(std::move(frame)); + // fmt::print("pushed frame {}\n", rc); + // } +}; + +} // namespace aare \ No newline at end of file diff --git a/include/aare/ClusterVector.hpp b/include/aare/ClusterVector.hpp index 98d4b373..a1b3a626 100644 --- a/include/aare/ClusterVector.hpp +++ b/include/aare/ClusterVector.hpp @@ -22,6 +22,7 @@ template class ClusterVector { std::byte *m_data{}; size_t m_size{0}; size_t m_capacity; + uint64_t m_frame_number{0}; //TODO! Check frame number size and type /* Format string used in the python bindings to create a numpy array from the buffer @@ -39,7 +40,7 @@ template class ClusterVector { * @param cluster_size_y size of the cluster in y direction * @param capacity initial capacity of the buffer in number of clusters */ - ClusterVector(size_t cluster_size_x, size_t cluster_size_y, + ClusterVector(size_t cluster_size_x = 3, size_t cluster_size_y = 3, size_t capacity = 1024) : m_cluster_size_x(cluster_size_x), m_cluster_size_y(cluster_size_y), m_capacity(capacity) { @@ -108,7 +109,14 @@ template class ClusterVector { ptr); m_size++; } - + ClusterVector& operator+=(const ClusterVector& other){ + if (m_size + other.m_size > m_capacity) { + allocate_buffer(m_capacity + other.m_size); + } + std::copy(other.m_data, other.m_data + other.m_size * element_offset(), m_data + m_size * element_offset()); + m_size += other.m_size; + return *this; + } /** * @brief Sum the pixels in each cluster @@ -166,6 +174,9 @@ template class ClusterVector { return m_fmt_base; } + uint64_t frame_number() const { return m_frame_number; } + void set_frame_number(uint64_t frame_number) { m_frame_number = frame_number; } + private: void allocate_buffer(size_t new_capacity) { size_t num_bytes = element_offset() * new_capacity; diff --git a/include/aare/ProducerConsumerQueue.hpp b/include/aare/ProducerConsumerQueue.hpp new file mode 100644 index 00000000..426b9e2e --- /dev/null +++ b/include/aare/ProducerConsumerQueue.hpp @@ -0,0 +1,203 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// @author Bo Hu (bhu@fb.com) +// @author Jordan DeLong (delong.j@fb.com) + +// Changes made by PSD Detector Group: +// Copied: Line 34 constexpr std::size_t hardware_destructive_interference_size = 128; from folly/lang/Align.h +// Changed extension to .hpp +// Changed namespace to aare + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +constexpr std::size_t hardware_destructive_interference_size = 128; +namespace aare { + +/* + * ProducerConsumerQueue is a one producer and one consumer queue + * without locks. + */ +template struct ProducerConsumerQueue { + typedef T value_type; + + ProducerConsumerQueue(const ProducerConsumerQueue &) = delete; + ProducerConsumerQueue &operator=(const ProducerConsumerQueue &) = delete; + + + ProducerConsumerQueue(ProducerConsumerQueue &&other){ + size_ = other.size_; + records_ = other.records_; + other.records_ = nullptr; + readIndex_ = other.readIndex_.load(std::memory_order_acquire); + writeIndex_ = other.writeIndex_.load(std::memory_order_acquire); + } + ProducerConsumerQueue &operator=(ProducerConsumerQueue &&other){ + size_ = other.size_; + records_ = other.records_; + other.records_ = nullptr; + readIndex_ = other.readIndex_.load(std::memory_order_acquire); + writeIndex_ = other.writeIndex_.load(std::memory_order_acquire); + return *this; + } + + + ProducerConsumerQueue():ProducerConsumerQueue(2){}; + // size must be >= 2. + // + // Also, note that the number of usable slots in the queue at any + // given time is actually (size-1), so if you start with an empty queue, + // isFull() will return true after size-1 insertions. + explicit ProducerConsumerQueue(uint32_t size) + : size_(size), records_(static_cast(std::malloc(sizeof(T) * size))), readIndex_(0), writeIndex_(0) { + assert(size >= 2); + if (!records_) { + throw std::bad_alloc(); + } + } + + ~ProducerConsumerQueue() { + // We need to destruct anything that may still exist in our queue. + // (No real synchronization needed at destructor time: only one + // thread can be doing this.) + if (!std::is_trivially_destructible::value) { + size_t readIndex = readIndex_; + size_t endIndex = writeIndex_; + while (readIndex != endIndex) { + records_[readIndex].~T(); + if (++readIndex == size_) { + readIndex = 0; + } + } + } + + std::free(records_); + } + + template bool write(Args &&...recordArgs) { + auto const currentWrite = writeIndex_.load(std::memory_order_relaxed); + auto nextRecord = currentWrite + 1; + if (nextRecord == size_) { + nextRecord = 0; + } + if (nextRecord != readIndex_.load(std::memory_order_acquire)) { + new (&records_[currentWrite]) T(std::forward(recordArgs)...); + writeIndex_.store(nextRecord, std::memory_order_release); + return true; + } + + // queue is full + return false; + } + + // move (or copy) the value at the front of the queue to given variable + bool read(T &record) { + auto const currentRead = readIndex_.load(std::memory_order_relaxed); + if (currentRead == writeIndex_.load(std::memory_order_acquire)) { + // queue is empty + return false; + } + + auto nextRecord = currentRead + 1; + if (nextRecord == size_) { + nextRecord = 0; + } + record = std::move(records_[currentRead]); + records_[currentRead].~T(); + readIndex_.store(nextRecord, std::memory_order_release); + return true; + } + + // pointer to the value at the front of the queue (for use in-place) or + // nullptr if empty. + T *frontPtr() { + auto const currentRead = readIndex_.load(std::memory_order_relaxed); + if (currentRead == writeIndex_.load(std::memory_order_acquire)) { + // queue is empty + return nullptr; + } + return &records_[currentRead]; + } + + // queue must not be empty + void popFront() { + auto const currentRead = readIndex_.load(std::memory_order_relaxed); + assert(currentRead != writeIndex_.load(std::memory_order_acquire)); + + auto nextRecord = currentRead + 1; + if (nextRecord == size_) { + nextRecord = 0; + } + records_[currentRead].~T(); + readIndex_.store(nextRecord, std::memory_order_release); + } + + bool isEmpty() const { + return readIndex_.load(std::memory_order_acquire) == writeIndex_.load(std::memory_order_acquire); + } + + bool isFull() const { + auto nextRecord = writeIndex_.load(std::memory_order_acquire) + 1; + if (nextRecord == size_) { + nextRecord = 0; + } + if (nextRecord != readIndex_.load(std::memory_order_acquire)) { + return false; + } + // queue is full + return true; + } + + // * If called by consumer, then true size may be more (because producer may + // be adding items concurrently). + // * If called by producer, then true size may be less (because consumer may + // be removing items concurrently). + // * It is undefined to call this from any other thread. + size_t sizeGuess() const { + int ret = writeIndex_.load(std::memory_order_acquire) - readIndex_.load(std::memory_order_acquire); + if (ret < 0) { + ret += size_; + } + return ret; + } + + // maximum number of items in the queue. + size_t capacity() const { return size_ - 1; } + + private: + using AtomicIndex = std::atomic; + + char pad0_[hardware_destructive_interference_size]; + // const uint32_t size_; + uint32_t size_; + // T *const records_; + T* records_; + + alignas(hardware_destructive_interference_size) AtomicIndex readIndex_; + alignas(hardware_destructive_interference_size) AtomicIndex writeIndex_; + + char pad1_[hardware_destructive_interference_size - sizeof(AtomicIndex)]; +}; + +} // namespace aare \ No newline at end of file diff --git a/python/examples/play.py b/python/examples/play.py index 986b7180..9c07c995 100644 --- a/python/examples/play.py +++ b/python/examples/play.py @@ -13,36 +13,66 @@ base = Path('/mnt/sls_det_storage/matterhorn_data/aare_test_data/') f = File(base/'Moench03new/cu_half_speed_master_4.json') -cf = ClusterFinder((400,400), (3,3)) + + +from aare._aare import ClusterFinderMT, ClusterCollector + + +cf = ClusterFinderMT((400,400), (3,3), n_threads = 3) +collector = ClusterCollector(cf) + for i in range(1000): - cf.push_pedestal_frame(f.read_frame()) + img = f.read_frame() + cf.push_pedestal_frame(img) +print('Pedestal done') +cf.sync() + +for i in range(100): + img = f.read_frame() + cf.find_clusters(img) + + +# time.sleep(1) +cf.stop() +collector.stop() +cv = collector.steal_clusters() +print(f'Processed {len(cv)} frames') -fig, ax = plt.subplots() -im = ax.imshow(cf.pedestal()) -cf.pedestal() -cf.noise() +print('Done') -N = 500 -t0 = time.perf_counter() -hist1 = bh.Histogram(bh.axis.Regular(40, -2, 4000)) -f.seek(0) -t0 = time.perf_counter() -data = f.read_n(N) -t_elapsed = time.perf_counter()-t0 +# cf = ClusterFinder((400,400), (3,3)) +# for i in range(1000): +# cf.push_pedestal_frame(f.read_frame()) + +# fig, ax = plt.subplots() +# im = ax.imshow(cf.pedestal()) +# cf.pedestal() +# cf.noise() + + + +# N = 500 +# t0 = time.perf_counter() +# hist1 = bh.Histogram(bh.axis.Regular(40, -2, 4000)) +# f.seek(0) + +# t0 = time.perf_counter() +# data = f.read_n(N) +# t_elapsed = time.perf_counter()-t0 -n_bytes = data.itemsize*data.size +# n_bytes = data.itemsize*data.size -print(f'Reading {N} frames took {t_elapsed:.3f}s {N/t_elapsed:.0f} FPS, {n_bytes/1024**2:.4f} GB/s') +# print(f'Reading {N} frames took {t_elapsed:.3f}s {N/t_elapsed:.0f} FPS, {n_bytes/1024**2:.4f} GB/s') -for frame in data: - a = cf.find_clusters(frame) +# for frame in data: +# a = cf.find_clusters(frame) -clusters = cf.steal_clusters() +# clusters = cf.steal_clusters() # t_elapsed = time.perf_counter()-t0 # print(f'Clustering {N} frames took {t_elapsed:.2f}s {N/t_elapsed:.0f} FPS') diff --git a/python/src/cluster.hpp b/python/src/cluster.hpp index 5d220910..90c9f211 100644 --- a/python/src/cluster.hpp +++ b/python/src/cluster.hpp @@ -1,5 +1,7 @@ #include "aare/ClusterFinder.hpp" +#include "aare/ClusterFinderMT.hpp" #include "aare/ClusterVector.hpp" +#include "aare/ClusterCollector.hpp" #include "aare/NDView.hpp" #include "aare/Pedestal.hpp" #include "np_helper.hpp" @@ -7,11 +9,13 @@ #include #include #include +#include #include namespace py = pybind11; using pd_type = double; + template void define_cluster_vector(py::module &m, const std::string &typestr) { auto class_name = fmt::format("ClusterVector_{}", typestr); @@ -31,6 +35,9 @@ void define_cluster_vector(py::module &m, const std::string &typestr) { auto *vec = new std::vector(self.sum()); return return_vector(vec); }) + .def_property_readonly("capacity", &ClusterVector::capacity) + .def_property("frame_number", &ClusterVector::frame_number, + &ClusterVector::set_frame_number) .def_buffer([typestr](ClusterVector &self) -> py::buffer_info { return py::buffer_info( self.data(), /* Pointer to buffer */ @@ -45,6 +52,55 @@ void define_cluster_vector(py::module &m, const std::string &typestr) { }); } +void define_cluster_finder_mt_bindings(py::module &m) { + py::class_>(m, "ClusterFinderMT") + .def(py::init, Shape<2>, pd_type, size_t, size_t>(), + py::arg("image_size"), py::arg("cluster_size"), + py::arg("n_sigma") = 5.0, py::arg("capacity") = 1000, + py::arg("n_threads") = 3) + .def("push_pedestal_frame", + [](ClusterFinderMT &self, + py::array_t frame) { + auto view = make_view_2d(frame); + self.push_pedestal_frame(view); + }) + .def("find_clusters", + [](ClusterFinderMT &self, + py::array_t frame) { + auto view = make_view_2d(frame); + self.find_clusters(view); + return; + }) + .def("sync", &ClusterFinderMT::sync) + .def( + "steal_clusters", + [](ClusterFinderMT &self, + bool realloc_same_capacity) { + auto v = new ClusterVector( + self.steal_clusters(realloc_same_capacity)); + return v; + }, + py::arg("realloc_same_capacity") = false) + .def("stop", &ClusterFinderMT::stop); +} + + +void define_cluster_collector_bindings(py::module &m) { + py::class_(m, "ClusterCollector") + .def(py::init*>()) + .def("stop", &ClusterCollector::stop) + .def("steal_clusters", + [](ClusterCollector &self) { + auto v = new std::vector>( + self.steal_clusters()); + return v; + }, py::return_value_policy::take_ownership); + + + +} + + void define_cluster_finder_bindings(py::module &m) { py::class_>(m, "ClusterFinder") .def(py::init, Shape<2>, pd_type, size_t>(), diff --git a/python/src/module.cpp b/python/src/module.cpp index 14a686a1..0ef868b9 100644 --- a/python/src/module.cpp +++ b/python/src/module.cpp @@ -25,5 +25,8 @@ PYBIND11_MODULE(_aare, m) { define_pedestal_bindings(m, "Pedestal_d"); define_pedestal_bindings(m, "Pedestal_f"); define_cluster_finder_bindings(m); + define_cluster_finder_mt_bindings(m); define_cluster_file_io_bindings(m); + define_cluster_collector_bindings(m); + } \ No newline at end of file From cc95561eda6e10c015dab7949bb3d8baad98b9a4 Mon Sep 17 00:00:00 2001 From: froejdh_e Date: Thu, 9 Jan 2025 16:53:22 +0100 Subject: [PATCH 2/2] MultiThreaded Cluster finder --- conda-recipe/meta.yaml | 2 +- include/aare/ClusterFile.hpp | 7 +-- include/aare/ClusterFileSink.hpp | 56 +++++++++++++++++ include/aare/ClusterFinder.hpp | 4 +- include/aare/ClusterFinderMT.hpp | 101 +++++++++++++++++++++---------- include/aare/ClusterVector.hpp | 20 +++++- include/aare/File.hpp | 2 + pyproject.toml | 2 +- python/aare/__init__.py | 2 + python/examples/play.py | 15 ++--- python/src/cluster.hpp | 82 ++++++++++++++----------- python/src/cluster_file.hpp | 18 +++--- python/src/file.hpp | 33 ++++++++++ python/src/module.cpp | 1 + src/ClusterFile.cpp | 32 +++++----- src/File.cpp | 2 + 16 files changed, 268 insertions(+), 111 deletions(-) create mode 100644 include/aare/ClusterFileSink.hpp diff --git a/conda-recipe/meta.yaml b/conda-recipe/meta.yaml index dd2b6824..6eeec381 100644 --- a/conda-recipe/meta.yaml +++ b/conda-recipe/meta.yaml @@ -1,6 +1,6 @@ package: name: aare - version: 2025.1.7.dev0 #TODO! how to not duplicate this? + version: 2025.1.9.dev0 #TODO! how to not duplicate this? source: diff --git a/include/aare/ClusterFile.hpp b/include/aare/ClusterFile.hpp index a484560f..82740785 100644 --- a/include/aare/ClusterFile.hpp +++ b/include/aare/ClusterFile.hpp @@ -59,10 +59,9 @@ class ClusterFile { ClusterFile(const std::filesystem::path &fname, size_t chunk_size = 1000, const std::string &mode = "r"); ~ClusterFile(); - std::vector read_clusters(size_t n_clusters); - std::vector read_frame(int32_t &out_fnum); - void write_frame(int32_t frame_number, - const ClusterVector &clusters); + ClusterVector read_clusters(size_t n_clusters); + ClusterVector read_frame(); + void write_frame(const ClusterVector &clusters); std::vector read_cluster_with_cut(size_t n_clusters, double *noise_map, int nx, int ny); diff --git a/include/aare/ClusterFileSink.hpp b/include/aare/ClusterFileSink.hpp new file mode 100644 index 00000000..158fdebb --- /dev/null +++ b/include/aare/ClusterFileSink.hpp @@ -0,0 +1,56 @@ +#pragma once +#include +#include +#include + +#include "aare/ProducerConsumerQueue.hpp" +#include "aare/ClusterVector.hpp" +#include "aare/ClusterFinderMT.hpp" + +namespace aare{ + +class ClusterFileSink{ + ProducerConsumerQueue>* m_source; + std::atomic m_stop_requested{false}; + std::atomic m_stopped{true}; + std::chrono::milliseconds m_default_wait{1}; + std::thread m_thread; + std::ofstream m_file; + + + void process(){ + m_stopped = false; + fmt::print("ClusterFileSink started\n"); + while (!m_stop_requested || !m_source->isEmpty()) { + if (ClusterVector *clusters = m_source->frontPtr(); + clusters != nullptr) { + // Write clusters to file + int32_t frame_number = clusters->frame_number(); //TODO! Should we store frame number already as int? + uint32_t num_clusters = clusters->size(); + m_file.write(reinterpret_cast(&frame_number), sizeof(frame_number)); + m_file.write(reinterpret_cast(&num_clusters), sizeof(num_clusters)); + m_file.write(reinterpret_cast(clusters->data()), clusters->size() * clusters->item_size()); + m_source->popFront(); + }else{ + std::this_thread::sleep_for(m_default_wait); + } + } + fmt::print("ClusterFileSink stopped\n"); + m_stopped = true; + } + + public: + ClusterFileSink(ClusterFinderMT* source, const std::filesystem::path& fname){ + m_source = source->sink(); + m_thread = std::thread(&ClusterFileSink::process, this); + m_file.open(fname, std::ios::binary); + } + void stop(){ + m_stop_requested = true; + m_thread.join(); + m_file.close(); + } +}; + + +} // namespace aare \ No newline at end of file diff --git a/include/aare/ClusterFinder.hpp b/include/aare/ClusterFinder.hpp index 2fe33a77..4a06c311 100644 --- a/include/aare/ClusterFinder.hpp +++ b/include/aare/ClusterFinder.hpp @@ -64,13 +64,13 @@ class ClusterFinder { m_clusters = ClusterVector(m_cluster_sizeX, m_cluster_sizeY); return tmp; } - void find_clusters(NDView frame) { + void find_clusters(NDView frame, uint64_t frame_number = 0) { // // TODO! deal with even size clusters // // currently 3,3 -> +/- 1 // // 4,4 -> +/- 2 int dy = m_cluster_sizeY / 2; int dx = m_cluster_sizeX / 2; - + m_clusters.set_frame_number(frame_number); std::vector cluster_data(m_cluster_sizeX * m_cluster_sizeY); for (int iy = 0; iy < frame.shape(0); iy++) { for (int ix = 0; ix < frame.shape(1); ix++) { diff --git a/include/aare/ClusterFinderMT.hpp b/include/aare/ClusterFinderMT.hpp index d3cbaf63..9765512f 100644 --- a/include/aare/ClusterFinderMT.hpp +++ b/include/aare/ClusterFinderMT.hpp @@ -7,6 +7,7 @@ #include "aare/NDArray.hpp" #include "aare/ProducerConsumerQueue.hpp" +#include "aare/ClusterFinder.hpp" namespace aare { @@ -21,7 +22,6 @@ struct FrameWrapper { NDArray data; }; - template class ClusterFinderMT { @@ -32,10 +32,8 @@ class ClusterFinderMT { using OutputQueue = ProducerConsumerQueue>; std::vector> m_input_queues; std::vector> m_output_queues; - - - OutputQueue m_sink{1000}; //All clusters go into this queue + OutputQueue m_sink{1000}; // All clusters go into this queue std::vector> m_cluster_finders; std::vector m_threads; @@ -43,26 +41,24 @@ class ClusterFinderMT { std::chrono::milliseconds m_default_wait{1}; std::atomic m_stop_requested{false}; - std::atomic m_processing_threads_stopped{false}; + std::atomic m_processing_threads_stopped{true}; void process(int thread_id) { auto cf = m_cluster_finders[thread_id].get(); auto q = m_input_queues[thread_id].get(); // TODO! Avoid indexing into the vector every time fmt::print("Thread {} started\n", thread_id); - //TODO! is this check enough to make sure we process all the frames? - while (!m_stop_requested || !q->isEmpty()) { - if (FrameWrapper *frame = q->frontPtr(); - frame != nullptr) { + // TODO! is this check enough to make sure we process all the frames? + while (!m_stop_requested || !q->isEmpty()) { + if (FrameWrapper *frame = q->frontPtr(); frame != nullptr) { // fmt::print("Thread {} got frame {}, type: {}\n", thread_id, - // frame->frame_number, static_cast(frame->type)); + // frame->frame_number, static_cast(frame->type)); switch (frame->type) { case FrameType::DATA: - cf->find_clusters( - frame->data.view()); + cf->find_clusters(frame->data.view(), frame->frame_number); m_output_queues[thread_id]->write(cf->steal_clusters()); - + break; case FrameType::PEDESTAL: @@ -79,22 +75,22 @@ class ClusterFinderMT { } else { std::this_thread::sleep_for(m_default_wait); } - } fmt::print("Thread {} stopped\n", thread_id); } /** - * @brief Collect all the clusters from the output queues and write them to the sink + * @brief Collect all the clusters from the output queues and write them to + * the sink */ - void collect(){ + void collect() { bool empty = true; - while(!m_stop_requested || !empty || !m_processing_threads_stopped){ + while (!m_stop_requested || !empty || !m_processing_threads_stopped) { empty = true; for (auto &queue : m_output_queues) { if (!queue->isEmpty()) { - while(!m_sink.write(std::move(*queue->frontPtr()))){ + while (!m_sink.write(std::move(*queue->frontPtr()))) { std::this_thread::sleep_for(m_default_wait); } queue->popFront(); @@ -118,7 +114,6 @@ class ClusterFinderMT { for (size_t i = 0; i < n_threads; i++) { m_input_queues.emplace_back(std::make_unique(200)); m_output_queues.emplace_back(std::make_unique(200)); - } start(); @@ -126,14 +121,22 @@ class ClusterFinderMT { ProducerConsumerQueue> *sink() { return &m_sink; } - void start(){ + /** + * @brief Start all threads + */ + + void start() { for (size_t i = 0; i < m_n_threads; i++) { m_threads.push_back( std::thread(&ClusterFinderMT::process, this, i)); } + m_processing_threads_stopped = false; m_collect_thread = std::thread(&ClusterFinderMT::collect, this); } + /** + * @brief Stop all threads + */ void stop() { m_stop_requested = true; for (auto &thread : m_threads) { @@ -143,42 +146,74 @@ class ClusterFinderMT { m_collect_thread.join(); } - void sync(){ + /** + * @brief Wait for all the queues to be empty + */ + void sync() { for (auto &q : m_input_queues) { - while(!q->isEmpty()){ + while (!q->isEmpty()) { std::this_thread::sleep_for(m_default_wait); } } + for (auto &q : m_output_queues) { + while (!q->isEmpty()) { + std::this_thread::sleep_for(m_default_wait); + } + } + while (!m_sink.isEmpty()) { + std::this_thread::sleep_for(m_default_wait); + } } + /** + * @brief Push a pedestal frame to all the cluster finders. The frames is + * expected to be dark. No photon finding is done. Just pedestal update. + */ void push_pedestal_frame(NDView frame) { - FrameWrapper fw{FrameType::PEDESTAL, 0, NDArray(frame)}; + FrameWrapper fw{FrameType::PEDESTAL, 0, + NDArray(frame)}; // TODO! copies the data! for (auto &queue : m_input_queues) { while (!queue->write(fw)) { - // fmt::print("push_pedestal_frame: queue full\n"); std::this_thread::sleep_for(m_default_wait); } } } - void find_clusters(NDView frame) { - FrameWrapper fw{FrameType::DATA, 0, NDArray(frame)}; - while (!m_input_queues[m_current_thread%m_n_threads]->write(fw)) { + /** + * @brief Push the frame to the queue of the next available thread. Function + * returns once the frame is in a queue. + * @note Spin locks with a default wait if the queue is full. + */ + void find_clusters(NDView frame, uint64_t frame_number = 0) { + FrameWrapper fw{FrameType::DATA, frame_number, + NDArray(frame)}; // TODO! copies the data! + while (!m_input_queues[m_current_thread % m_n_threads]->write(fw)) { std::this_thread::sleep_for(m_default_wait); } m_current_thread++; } - ClusterVector steal_clusters(bool realloc_same_capacity = false) { - ClusterVector clusters(3,3); - for (auto &finder : m_cluster_finders) { - clusters += finder->steal_clusters(); + auto pedestal() { + if (m_cluster_finders.empty()) { + throw std::runtime_error("No cluster finders available"); + } + if(!m_processing_threads_stopped){ + throw std::runtime_error("ClusterFinderMT is still running"); + } + return m_cluster_finders[0]->pedestal(); + } + + auto noise() { + if (m_cluster_finders.empty()) { + throw std::runtime_error("No cluster finders available"); + } + if(!m_processing_threads_stopped){ + throw std::runtime_error("ClusterFinderMT is still running"); } - return clusters; + return m_cluster_finders[0]->noise(); } - // void push(FrameWrapper&& frame) { // //TODO! need to loop until we are successful // auto rc = m_input_queue.write(std::move(frame)); diff --git a/include/aare/ClusterVector.hpp b/include/aare/ClusterVector.hpp index a1b3a626..efad448c 100644 --- a/include/aare/ClusterVector.hpp +++ b/include/aare/ClusterVector.hpp @@ -41,9 +41,9 @@ template class ClusterVector { * @param capacity initial capacity of the buffer in number of clusters */ ClusterVector(size_t cluster_size_x = 3, size_t cluster_size_y = 3, - size_t capacity = 1024) + size_t capacity = 1024, uint64_t frame_number = 0) : m_cluster_size_x(cluster_size_x), m_cluster_size_y(cluster_size_y), - m_capacity(capacity) { + m_capacity(capacity), m_frame_number(frame_number) { allocate_buffer(capacity); } ~ClusterVector() { @@ -55,7 +55,7 @@ template class ClusterVector { ClusterVector(ClusterVector &&other) noexcept : m_cluster_size_x(other.m_cluster_size_x), m_cluster_size_y(other.m_cluster_size_y), m_data(other.m_data), - m_size(other.m_size), m_capacity(other.m_capacity) { + m_size(other.m_size), m_capacity(other.m_capacity), m_frame_number(other.m_frame_number) { other.m_data = nullptr; other.m_size = 0; other.m_capacity = 0; @@ -70,9 +70,11 @@ template class ClusterVector { m_data = other.m_data; m_size = other.m_size; m_capacity = other.m_capacity; + m_frame_number = other.m_frame_number; other.m_data = nullptr; other.m_size = 0; other.m_capacity = 0; + other.m_frame_number = 0; } return *this; } @@ -147,6 +149,12 @@ template class ClusterVector { return 2*sizeof(CoordType) + m_cluster_size_x * m_cluster_size_y * sizeof(T); } + + /** + * @brief Return the size in bytes of a single cluster + */ + size_t item_size() const { return element_offset(); } + /** * @brief Return the offset in bytes for the i-th cluster */ @@ -176,6 +184,12 @@ template class ClusterVector { uint64_t frame_number() const { return m_frame_number; } void set_frame_number(uint64_t frame_number) { m_frame_number = frame_number; } + void resize(size_t new_size) { + if (new_size > m_capacity) { + allocate_buffer(new_size); + } + m_size = new_size; + } private: void allocate_buffer(size_t new_capacity) { diff --git a/include/aare/File.hpp b/include/aare/File.hpp index 7aa30e1b..1cef8985 100644 --- a/include/aare/File.hpp +++ b/include/aare/File.hpp @@ -36,6 +36,8 @@ class File { File(File &&other) noexcept; File& operator=(File &&other) noexcept; ~File() = default; + + // void close(); //!< close the file Frame read_frame(); //!< read one frame from the file at the current position Frame read_frame(size_t frame_index); //!< read one frame at the position given by frame number diff --git a/pyproject.toml b/pyproject.toml index 35bdefba..1b75d029 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "scikit_build_core.build" [project] name = "aare" -version = "2025.1.7.dev0" +version = "2025.1.9.dev0" [tool.scikit-build] cmake.verbose = true diff --git a/python/aare/__init__.py b/python/aare/__init__.py index fb34c7a6..b0c9de22 100644 --- a/python/aare/__init__.py +++ b/python/aare/__init__.py @@ -8,6 +8,8 @@ from ._aare import ClusterFile from ._aare import hitmap +from ._aare import ClusterFinderMT, ClusterCollector, ClusterFileSink + from .CtbRawFile import CtbRawFile from .RawFile import RawFile from .ScanParameters import ScanParameters diff --git a/python/examples/play.py b/python/examples/play.py index 9c07c995..95da2e59 100644 --- a/python/examples/play.py +++ b/python/examples/play.py @@ -8,18 +8,19 @@ import boost_histogram as bh import time -from aare import File, ClusterFinder, VarClusterFinder +from aare import File, ClusterFinder, VarClusterFinder, ClusterFile base = Path('/mnt/sls_det_storage/matterhorn_data/aare_test_data/') f = File(base/'Moench03new/cu_half_speed_master_4.json') -from aare._aare import ClusterFinderMT, ClusterCollector +from aare._aare import ClusterFinderMT, ClusterCollector, ClusterFileSink cf = ClusterFinderMT((400,400), (3,3), n_threads = 3) -collector = ClusterCollector(cf) +# collector = ClusterCollector(cf) +out_file = ClusterFileSink(cf, "test.clust") for i in range(1000): img = f.read_frame() @@ -34,13 +35,13 @@ # time.sleep(1) cf.stop() -collector.stop() -cv = collector.steal_clusters() -print(f'Processed {len(cv)} frames') - +out_file.stop() print('Done') +cfile = ClusterFile("test.clust") + + # cf = ClusterFinder((400,400), (3,3)) diff --git a/python/src/cluster.hpp b/python/src/cluster.hpp index 90c9f211..459de443 100644 --- a/python/src/cluster.hpp +++ b/python/src/cluster.hpp @@ -1,7 +1,8 @@ +#include "aare/ClusterCollector.hpp" +#include "aare/ClusterFileSink.hpp" #include "aare/ClusterFinder.hpp" #include "aare/ClusterFinderMT.hpp" #include "aare/ClusterVector.hpp" -#include "aare/ClusterCollector.hpp" #include "aare/NDView.hpp" #include "aare/Pedestal.hpp" #include "np_helper.hpp" @@ -9,13 +10,12 @@ #include #include #include -#include #include +#include namespace py = pybind11; using pd_type = double; - template void define_cluster_vector(py::module &m, const std::string &typestr) { auto class_name = fmt::format("ClusterVector_{}", typestr); @@ -64,42 +64,51 @@ void define_cluster_finder_mt_bindings(py::module &m) { auto view = make_view_2d(frame); self.push_pedestal_frame(view); }) - .def("find_clusters", - [](ClusterFinderMT &self, - py::array_t frame) { - auto view = make_view_2d(frame); - self.find_clusters(view); - return; - }) - .def("sync", &ClusterFinderMT::sync) .def( - "steal_clusters", + "find_clusters", [](ClusterFinderMT &self, - bool realloc_same_capacity) { - auto v = new ClusterVector( - self.steal_clusters(realloc_same_capacity)); - return v; + py::array_t frame, uint64_t frame_number) { + auto view = make_view_2d(frame); + self.find_clusters(view, frame_number); + return; }, - py::arg("realloc_same_capacity") = false) - .def("stop", &ClusterFinderMT::stop); + py::arg(), py::arg("frame_number") = 0) + .def("sync", &ClusterFinderMT::sync) + .def("stop", &ClusterFinderMT::stop) + .def_property_readonly("pedestal", + [](ClusterFinderMT &self) { + auto pd = new NDArray{}; + *pd = self.pedestal(); + return return_image_data(pd); + }) + .def_property_readonly("noise", + [](ClusterFinderMT &self) { + auto arr = new NDArray{}; + *arr = self.noise(); + return return_image_data(arr); + }); } - void define_cluster_collector_bindings(py::module &m) { py::class_(m, "ClusterCollector") - .def(py::init*>()) + .def(py::init *>()) .def("stop", &ClusterCollector::stop) - .def("steal_clusters", - [](ClusterCollector &self) { - auto v = new std::vector>( - self.steal_clusters()); - return v; - }, py::return_value_policy::take_ownership); - - - + .def( + "steal_clusters", + [](ClusterCollector &self) { + auto v = + new std::vector>(self.steal_clusters()); + return v; + }, + py::return_value_policy::take_ownership); } +void define_cluster_file_sink_bindings(py::module &m) { + py::class_(m, "ClusterFileSink") + .def(py::init *, + const std::filesystem::path &>()) + .def("stop", &ClusterFileSink::stop); +} void define_cluster_finder_bindings(py::module &m) { py::class_>(m, "ClusterFinder") @@ -133,12 +142,15 @@ void define_cluster_finder_bindings(py::module &m) { return v; }, py::arg("realloc_same_capacity") = false) - .def("find_clusters", [](ClusterFinder &self, - py::array_t frame) { - auto view = make_view_2d(frame); - self.find_clusters(view); - return; - }); + .def( + "find_clusters", + [](ClusterFinder &self, + py::array_t frame, uint64_t frame_number) { + auto view = make_view_2d(frame); + self.find_clusters(view); + return; + }, + py::arg(), py::arg("frame_number") = 0); m.def("hitmap", [](std::array image_size, ClusterVector &cv) { diff --git a/python/src/cluster_file.hpp b/python/src/cluster_file.hpp index 82870c46..5280e5f8 100644 --- a/python/src/cluster_file.hpp +++ b/python/src/cluster_file.hpp @@ -28,16 +28,13 @@ void define_cluster_file_io_bindings(py::module &m) { py::arg(), py::arg("chunk_size") = 1000, py::arg("mode") = "r") .def("read_clusters", [](ClusterFile &self, size_t n_clusters) { - auto *vec = - new std::vector(self.read_clusters(n_clusters)); - return return_vector(vec); + auto v = new ClusterVector(self.read_clusters(n_clusters)); + return v; }) .def("read_frame", [](ClusterFile &self) { - int32_t frame_number; - auto *vec = - new std::vector(self.read_frame(frame_number)); - return py::make_tuple(frame_number, return_vector(vec)); + auto v = new ClusterVector(self.read_frame()); + return v; }) .def("write_frame", &ClusterFile::write_frame) .def("read_cluster_with_cut", @@ -59,12 +56,11 @@ void define_cluster_file_io_bindings(py::module &m) { }) .def("__iter__", [](ClusterFile &self) { return &self; }) .def("__next__", [](ClusterFile &self) { - auto vec = - new std::vector(self.read_clusters(self.chunk_size())); - if (vec->size() == 0) { + auto v = new ClusterVector(self.read_clusters(self.chunk_size())); + if (v->size() == 0) { throw py::stop_iteration(); } - return return_vector(vec); + return v; }); m.def("calculate_eta2", []( aare::ClusterVector &clusters) { diff --git a/python/src/file.hpp b/python/src/file.hpp index 30fa82f3..f20e0ce1 100644 --- a/python/src/file.hpp +++ b/python/src/file.hpp @@ -124,8 +124,41 @@ void define_file_io_bindings(py::module &m) { self.read_into(reinterpret_cast(image.mutable_data()), n_frames); return image; + }) + .def("__enter__", [](File &self) { return &self; }) + .def("__exit__", + [](File &self, + const std::optional &exc_type, + const std::optional &exc_value, + const std::optional &traceback) { + // self.close(); + }) + .def("__iter__", [](File &self) { return &self; }) + .def("__next__", [](File &self) { + + try{ + const uint8_t item_size = self.bytes_per_pixel(); + py::array image; + std::vector shape; + shape.reserve(2); + shape.push_back(self.rows()); + shape.push_back(self.cols()); + if (item_size == 1) { + image = py::array_t(shape); + } else if (item_size == 2) { + image = py::array_t(shape); + } else if (item_size == 4) { + image = py::array_t(shape); + } + self.read_into( + reinterpret_cast(image.mutable_data())); + return image; + }catch(std::runtime_error &e){ + throw py::stop_iteration(); + } }); + py::class_(m, "FileConfig") .def(py::init<>()) .def_readwrite("rows", &FileConfig::rows) diff --git a/python/src/module.cpp b/python/src/module.cpp index 0ef868b9..451a6b88 100644 --- a/python/src/module.cpp +++ b/python/src/module.cpp @@ -28,5 +28,6 @@ PYBIND11_MODULE(_aare, m) { define_cluster_finder_mt_bindings(m); define_cluster_file_io_bindings(m); define_cluster_collector_bindings(m); + define_cluster_file_sink_bindings(m); } \ No newline at end of file diff --git a/src/ClusterFile.cpp b/src/ClusterFile.cpp index 0e5b93ed..f62b7c90 100644 --- a/src/ClusterFile.cpp +++ b/src/ClusterFile.cpp @@ -34,8 +34,7 @@ void ClusterFile::close() { } } -void ClusterFile::write_frame(int32_t frame_number, - const ClusterVector &clusters) { +void ClusterFile::write_frame(const ClusterVector &clusters) { if (m_mode != "w") { throw std::runtime_error("File not opened for writing"); } @@ -43,26 +42,27 @@ void ClusterFile::write_frame(int32_t frame_number, !(clusters.cluster_size_y() == 3)) { throw std::runtime_error("Only 3x3 clusters are supported"); } + int32_t frame_number = clusters.frame_number(); fwrite(&frame_number, sizeof(frame_number), 1, fp); uint32_t n_clusters = clusters.size(); fwrite(&n_clusters, sizeof(n_clusters), 1, fp); fwrite(clusters.data(), clusters.element_offset(), clusters.size(), fp); - // write clusters - // fwrite(clusters.data(), sizeof(Cluster), clusters.size(), fp); } -std::vector ClusterFile::read_clusters(size_t n_clusters) { +ClusterVector ClusterFile::read_clusters(size_t n_clusters) { if (m_mode != "r") { throw std::runtime_error("File not opened for reading"); } - std::vector clusters(n_clusters); + + ClusterVector clusters(3,3, n_clusters); int32_t iframe = 0; // frame number needs to be 4 bytes! size_t nph_read = 0; uint32_t nn = m_num_left; uint32_t nph = m_num_left; // number of clusters in frame needs to be 4 - auto buf = reinterpret_cast(clusters.data()); + // auto buf = reinterpret_cast(clusters.data()); + auto buf = clusters.data(); // if there are photons left from previous frame read them first if (nph) { if (nph > n_clusters) { @@ -73,7 +73,7 @@ std::vector ClusterFile::read_clusters(size_t n_clusters) { nn = nph; } nph_read += fread(reinterpret_cast(buf + nph_read), - sizeof(Cluster3x3), nn, fp); + clusters.item_size(), nn, fp); m_num_left = nph - nn; // write back the number of photons left } @@ -88,7 +88,7 @@ std::vector ClusterFile::read_clusters(size_t n_clusters) { nn = nph; nph_read += fread(reinterpret_cast(buf + nph_read), - sizeof(Cluster3x3), nn, fp); + clusters.item_size(), nn, fp); m_num_left = nph - nn; } if (nph_read >= n_clusters) @@ -102,7 +102,7 @@ std::vector ClusterFile::read_clusters(size_t n_clusters) { return clusters; } -std::vector ClusterFile::read_frame(int32_t &out_fnum) { +ClusterVector ClusterFile::read_frame() { if (m_mode != "r") { throw std::runtime_error("File not opened for reading"); } @@ -110,8 +110,8 @@ std::vector ClusterFile::read_frame(int32_t &out_fnum) { throw std::runtime_error( "There are still photons left in the last frame"); } - - if (fread(&out_fnum, sizeof(out_fnum), 1, fp) != 1) { + int32_t frame_number; + if (fread(&frame_number, sizeof(frame_number), 1, fp) != 1) { throw std::runtime_error("Could not read frame number"); } @@ -119,15 +119,19 @@ std::vector ClusterFile::read_frame(int32_t &out_fnum) { if (fread(&n_clusters, sizeof(n_clusters), 1, fp) != 1) { throw std::runtime_error("Could not read number of clusters"); } - std::vector clusters(n_clusters); + // std::vector clusters(n_clusters); + ClusterVector clusters(3, 3, n_clusters); + clusters.set_frame_number(frame_number); - if (fread(clusters.data(), sizeof(Cluster3x3), n_clusters, fp) != + if (fread(clusters.data(), clusters.item_size(), n_clusters, fp) != static_cast(n_clusters)) { throw std::runtime_error("Could not read clusters"); } + clusters.resize(n_clusters); return clusters; } + std::vector ClusterFile::read_cluster_with_cut(size_t n_clusters, double *noise_map, int nx, int ny) { diff --git a/src/File.cpp b/src/File.cpp index 37e4c57d..11809671 100644 --- a/src/File.cpp +++ b/src/File.cpp @@ -45,6 +45,8 @@ File& File::operator=(File &&other) noexcept { return *this; } +// void File::close() { file_impl->close(); } + Frame File::read_frame() { return file_impl->read_frame(); } Frame File::read_frame(size_t frame_index) { return file_impl->read_frame(frame_index);