From e10dc984bc0fa16b8ee79f53c19ec8b681c2e5ee Mon Sep 17 00:00:00 2001 From: Gary Yendell Date: Fri, 8 Mar 2024 18:05:27 +0000 Subject: [PATCH] Add Fan /dev/shm cache --- cpp/CMakeLists.txt | 8 +- cpp/data/common/include/EigerDefinitions.h | 3 + cpp/data/eigerfan/include/EigerFan.h | 4 +- cpp/data/eigerfan/src/EigerFan.cpp | 137 +++++++++++++++++++-- 4 files changed, 139 insertions(+), 13 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 747791f..4fe7f5b 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -5,6 +5,10 @@ set(SOURCE_DIR ${EigerDetector_SOURCE_DIR}) # Require CMake version >=2.8 cmake_minimum_required(VERSION 2.8) +# Set C++11 +set(CMAKE_CXX_STANDARD 11) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + # Set output directories set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin) set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib) @@ -14,10 +18,12 @@ set(CMAKE_CONFIG_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/test_config) # directories of additional CMake modules (ie. MacroOutOfSourceBuild.cmake): set(CMAKE_MODULE_PATH ${SOURCE_DIR}/cmake ${CMAKE_MODULE_PATH}) +SET(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE) + # Find and add external packages required for application and test find_package( Boost 1.41.0 REQUIRED - COMPONENTS program_options system unit_test_framework date_time thread + COMPONENTS program_options system filesystem unit_test_framework date_time thread ) find_package(LOG4CXX 0.10.0 REQUIRED) find_package(ZEROMQ 3.2.4 REQUIRED) diff --git a/cpp/data/common/include/EigerDefinitions.h b/cpp/data/common/include/EigerDefinitions.h index 3eee830..7257b8c 100644 --- a/cpp/data/common/include/EigerDefinitions.h +++ b/cpp/data/common/include/EigerDefinitions.h @@ -59,6 +59,7 @@ namespace Eiger { const std::string CONTROL_OFFSET = "offset"; const std::string CONTROL_ACQ_ID = "acqid"; const std::string CONTROL_FWD_STREAM = "forward_stream"; + const std::string CONTROL_DEV_SHM_CACHE = "dev_shm_cache"; const std::string CONTROL_BLOCK_SIZE = "block_size"; const std::string CONTROL_RESPONSE_OK = "{\"msg_type\":\"ack\",\"msg_val\":\"configure\", \"params\": {}}"; @@ -75,6 +76,8 @@ namespace Eiger { static const std::string STATE_DSTR_END = "DSTR_END"; static const std::string STATE_KILL_REQUESTED = "KILL_REQUESTED"; + static const std::string DEV_SHM_PATH = "/dev/shm/eiger"; + enum EigerMessageType { GLOBAL_HEADER_NONE, GLOBAL_HEADER_CONFIG, GLOBAL_HEADER_FLATFIELD, GLOBAL_HEADER_MASK, GLOBAL_HEADER_COUNTRATE, GLOBAL_HEADER_APPENDIX, IMAGE_DATA, IMAGE_APPENDIX, END_OF_STREAM}; typedef struct diff --git a/cpp/data/eigerfan/include/EigerFan.h b/cpp/data/eigerfan/include/EigerFan.h index 98e31ba..02c0234 100644 --- a/cpp/data/eigerfan/include/EigerFan.h +++ b/cpp/data/eigerfan/include/EigerFan.h @@ -36,8 +36,9 @@ class EigerFan { protected: void HandleStreamMessage(zmq::message_t &message, boost::shared_ptr socket); void HandleGlobalHeaderMessage(boost::shared_ptr socket); - void HandleImageDataMessage(boost::shared_ptr socket); + void HandleImageDataMessage(boost::shared_ptr socket, uint64_t frame_number); void HandleEndOfSeriesMessage(boost::shared_ptr socket); + void WriteMessageToFile(zmq::message_t &message, std::string filename); void HandleMonitorMessage(zmq::message_t &message, boost::shared_ptr socket, int rank); void HandleForwardMonitorMessage(zmq::message_t &message, zmq::socket_t &socket); void HandleControlMessage(zmq::message_t &message, zmq::message_t &idMessage); @@ -71,6 +72,7 @@ class EigerFan { int currentOffset; int numConnectedForwardingSockets; bool forwardStream; + bool devShmCache; }; diff --git a/cpp/data/eigerfan/src/EigerFan.cpp b/cpp/data/eigerfan/src/EigerFan.cpp index 22533a2..d4093f7 100644 --- a/cpp/data/eigerfan/src/EigerFan.cpp +++ b/cpp/data/eigerfan/src/EigerFan.cpp @@ -5,11 +5,16 @@ * Author: Ulrik Pedersen */ +#include // open, O_CREAT, O_RDWR +#include // std::setw, std::setfill #include #include #include -#include "EigerFan.h" + #include "boost/date_time/posix_time/posix_time.hpp" +#include + +#include "EigerFan.h" // Utility variables int more; @@ -17,6 +22,18 @@ size_t more_size = sizeof (more); using namespace Eiger; + +/** Log an error with the given message and the current errno + * + * @param message Message to append errno onto + * +*/ +#define LOG_WITH_ERRNO(log, message) { \ + std::stringstream error; \ + error << message << " [errno: " << errno << " - " << strerror(errno) << "]"; \ + LOG4CXX_ERROR(log, error.str()); \ +} + /** * Get a user-friendly string from a state enum value * @@ -35,6 +52,19 @@ std::string GetStateString(EigerFanState state) { return "UNKNOWN STATE"; } +/** + * Create a string of value padded with zeroes. + * + * \param[in] value Value to pad + * \return The padded string + * +*/ +std::string PadInt(int value) { + std::ostringstream ss; + ss << std::setw(6) << std::setfill('0') << value; + return ss.str(); +} + /** * Default constructor for the EigerFan class */ @@ -54,6 +84,7 @@ EigerFan::EigerFan() currentOffset = 0; numConnectedForwardingSockets = 0; forwardStream = false; + devShmCache = false; } /** @@ -78,6 +109,7 @@ EigerFan::EigerFan(EigerFanConfig config_) currentOffset = 0; numConnectedForwardingSockets = 0; forwardStream = false; + devShmCache = false; } /** @@ -384,7 +416,7 @@ void EigerFan::HandleStreamMessage(zmq::message_t &message, boost::shared_ptr lastFrameSent) { lastFrameSent = frame; } @@ -454,12 +486,17 @@ void EigerFan::HandleGlobalHeaderMessage(boost::shared_ptr socket rapidjson::Value& headerDetailValue = jsonDocument[HEADER_DETAIL_KEY.c_str()]; std::string headerDetail(headerDetailValue.GetString()); + this->WriteMessageToFile(newPart1message, "start_0"); + if (headerDetail.compare(HEADER_DETAIL_NONE) == 0) { socket->getsockopt(ZMQ_RCVMORE, &more, &more_size); if (more == MORE_MESSAGES) { LOG4CXX_DEBUG(log, "Header has appendix"); zmq::message_t messageAppendix; socket->recv(&messageAppendix); + + this->WriteMessageToFile(messageAppendix, "start_appendix"); + messageList.push_back(&newPart1message); messageList.push_back(&messageAppendix); SendMessagesToAllConsumers(messageList); @@ -477,11 +514,16 @@ void EigerFan::HandleGlobalHeaderMessage(boost::shared_ptr socket zmq::message_t messagePart2; socket->recv(&messagePart2); + this->WriteMessageToFile(messagePart2, "start_1"); + socket->getsockopt(ZMQ_RCVMORE, &more, &more_size); if (more == MORE_MESSAGES) { LOG4CXX_DEBUG(log, "Header has appendix"); zmq::message_t messageAppendix; socket->recv(&messageAppendix); + + this->WriteMessageToFile(messageAppendix, "start_appendix"); + messageList.push_back(&newPart1message); messageList.push_back(&messagePart2); messageList.push_back(&messageAppendix); @@ -503,9 +545,11 @@ void EigerFan::HandleGlobalHeaderMessage(boost::shared_ptr socket zmq::message_t messagePart2; socket->recv(&messagePart2); + this->WriteMessageToFile(messagePart2, "start_1"); + socket->getsockopt(ZMQ_RCVMORE, &more, &more_size); if (more != MORE_MESSAGES) { - LOG4CXX_ERROR(log, "Header only contained 2 part but expected 8 for 'all' detail"); + LOG4CXX_ERROR(log, "Header only contained 2 parts but expected 8 for 'all' detail"); return; } @@ -513,9 +557,11 @@ void EigerFan::HandleGlobalHeaderMessage(boost::shared_ptr socket zmq::message_t messagePart3; socket->recv(&messagePart3); + this->WriteMessageToFile(messagePart3, "start_2"); + socket->getsockopt(ZMQ_RCVMORE, &more, &more_size); if (more != MORE_MESSAGES) { - LOG4CXX_ERROR(log, "Header only contained 3 part but expected 8 for 'all' detail"); + LOG4CXX_ERROR(log, "Header only contained 3 parts but expected 8 for 'all' detail"); return; } @@ -523,9 +569,11 @@ void EigerFan::HandleGlobalHeaderMessage(boost::shared_ptr socket zmq::message_t messagePart4; socket->recv(&messagePart4); + this->WriteMessageToFile(messagePart4, "start_3"); + socket->getsockopt(ZMQ_RCVMORE, &more, &more_size); if (more != MORE_MESSAGES) { - LOG4CXX_ERROR(log, "Header only contained 4 part but expected 8 for 'all' detail"); + LOG4CXX_ERROR(log, "Header only contained 4 parts but expected 8 for 'all' detail"); return; } @@ -533,9 +581,11 @@ void EigerFan::HandleGlobalHeaderMessage(boost::shared_ptr socket zmq::message_t messagePart5; socket->recv(&messagePart5); + this->WriteMessageToFile(messagePart5, "start_4"); + socket->getsockopt(ZMQ_RCVMORE, &more, &more_size); if (more != MORE_MESSAGES) { - LOG4CXX_ERROR(log, "Header only contained 5 part but expected 8 for 'all' detail"); + LOG4CXX_ERROR(log, "Header only contained 5 parts but expected 8 for 'all' detail"); return; } @@ -543,9 +593,11 @@ void EigerFan::HandleGlobalHeaderMessage(boost::shared_ptr socket zmq::message_t messagePart6; socket->recv(&messagePart6); + this->WriteMessageToFile(messagePart6, "start_5"); + socket->getsockopt(ZMQ_RCVMORE, &more, &more_size); if (more != MORE_MESSAGES) { - LOG4CXX_ERROR(log, "Header only contained 6 part but expected 8 for 'all' detail"); + LOG4CXX_ERROR(log, "Header only contained 6 parts but expected 8 for 'all' detail"); return; } @@ -553,9 +605,11 @@ void EigerFan::HandleGlobalHeaderMessage(boost::shared_ptr socket zmq::message_t messagePart7; socket->recv(&messagePart7); + this->WriteMessageToFile(messagePart7, "start_6"); + socket->getsockopt(ZMQ_RCVMORE, &more, &more_size); if (more != MORE_MESSAGES) { - LOG4CXX_ERROR(log, "Header only contained 7 part but expected 8 for 'all' detail"); + LOG4CXX_ERROR(log, "Header only contained 7 parts but expected 8 for 'all' detail"); return; } @@ -563,11 +617,16 @@ void EigerFan::HandleGlobalHeaderMessage(boost::shared_ptr socket zmq::message_t messagePart8; socket->recv(&messagePart8); + this->WriteMessageToFile(messagePart8, "start_7"); + socket->getsockopt(ZMQ_RCVMORE, &more, &more_size); if (more == MORE_MESSAGES) { LOG4CXX_DEBUG(log, "Header has appendix"); zmq::message_t messageAppendix; socket->recv(&messageAppendix); + + this->WriteMessageToFile(messageAppendix, "start_appendix"); + messageList.push_back(&newPart1message); messageList.push_back(&messagePart2); messageList.push_back(&messagePart3); @@ -589,6 +648,7 @@ void EigerFan::HandleGlobalHeaderMessage(boost::shared_ptr socket messageList.push_back(&messagePart8); SendMessagesToAllConsumers(messageList); } + } else { LOG4CXX_ERROR(log, "Unexpected header detail type"); @@ -608,7 +668,7 @@ void EigerFan::HandleGlobalHeaderMessage(boost::shared_ptr socket * * \param[in] socket The socket that the message was received on */ -void EigerFan::HandleImageDataMessage(boost::shared_ptr socket) { +void EigerFan::HandleImageDataMessage(boost::shared_ptr socket, uint64_t frame_number) { LOG4CXX_DEBUG(log, "Handling Image Data Message"); socket->getsockopt(ZMQ_RCVMORE, &more, &more_size); @@ -646,6 +706,11 @@ void EigerFan::HandleImageDataMessage(boost::shared_ptr socket) { zmq::message_t messagePart4; socket->recv(&messagePart4); + this->WriteMessageToFile(newPart1message, "image_" + PadInt(frame_number) + "_0"); + this->WriteMessageToFile(messagePart2, "image_" + PadInt(frame_number) + "_1"); + this->WriteMessageToFile(messagePart3, "image_" + PadInt(frame_number) + "_2"); + this->WriteMessageToFile(messagePart4, "image_" + PadInt(frame_number) + "_3"); + // Handle appendix socket->getsockopt(ZMQ_RCVMORE, &more, &more_size); if (more == MORE_MESSAGES) { @@ -653,6 +718,8 @@ void EigerFan::HandleImageDataMessage(boost::shared_ptr socket) { zmq::message_t messageAppendix; socket->recv(&messageAppendix); + this->WriteMessageToFile(messageAppendix, "image_" + PadInt(frame_number) + "_appendix"); + // Send the data on to a consumer SendMessageToSingleConsumer(newPart1message, ZMQ_SNDMORE); SendMessageToSingleConsumer(messagePart2, ZMQ_SNDMORE); @@ -686,6 +753,9 @@ void EigerFan::HandleEndOfSeriesMessage(boost::shared_ptr socket) std::string part1WithAcquisitionID = AddAcquisitionIDToPart1(); zmq::message_t newPart1message(part1WithAcquisitionID.size()); memcpy (newPart1message.data (), part1WithAcquisitionID.c_str(), part1WithAcquisitionID.size()); + + this->WriteMessageToFile(newPart1message, "end"); + SendMessageToAllConsumers(newPart1message); if (state != DSTR_IMAGE) { LOG4CXX_WARN(log, std::string("Received EndOfSeries message in unexpected state: ").append(GetStateString(state))); @@ -694,6 +764,34 @@ void EigerFan::HandleEndOfSeriesMessage(boost::shared_ptr socket) LOG4CXX_DEBUG(log, "Finished Handling EndOfSeries Message"); } +void EigerFan::WriteMessageToFile(zmq::message_t &message, std::string filename) { + if (!this->devShmCache) { + return; + } + + boost::filesystem::path full_file_path = boost::filesystem::path( + DEV_SHM_PATH + "/" + currentAcquisitionID + "/" + filename + ); + boost::filesystem::path tmp_file_path = boost::filesystem::path(full_file_path.string() + ".tmp"); + + boost::filesystem::create_directories(full_file_path.parent_path()); + + int fd = open(tmp_file_path.c_str(), O_CREAT | O_RDWR, 0666); + if (fd == -1) { + LOG_WITH_ERRNO(log, "open failed"); + } + + if (write(fd, message.data(), message.size()) != message.size()) { + LOG_WITH_ERRNO(log, "write failed"); + } + + close(fd); + + if (rename(tmp_file_path.c_str(), full_file_path.c_str())) { + LOG_WITH_ERRNO(log, "rename failed"); + } +} + /** * Handle messages from the monitoring of the zeromq connections * @@ -708,7 +806,7 @@ void EigerFan::HandleMonitorMessage(zmq::message_t &message, boost::shared_ptrgetsockopt(ZMQ_RCVMORE, &more, &more_size); if (more != MORE_MESSAGES) { @@ -767,7 +865,7 @@ void EigerFan::HandleForwardMonitorMessage(zmq::message_t &message, zmq::socket_ // Get the event code from the message, which is a number contained in the first 16 bits uint16_t event = *(uint16_t *) (message.data()); - + // Get the second message part which contains the endpoint socket.getsockopt(ZMQ_RCVMORE, &more, &more_size); if (more != MORE_MESSAGES) { @@ -925,6 +1023,12 @@ void EigerFan::HandleControlMessage(zmq::message_t &message, zmq::message_t &idM valueForward.SetBool(forwardStream); document.AddMember(keyForward, valueForward, document.GetAllocator()); + // Add /dev/shm cache state + rapidjson::Value keyDevShmCache(CONTROL_DEV_SHM_CACHE, document.GetAllocator()); + rapidjson::Value valueDevShmCache; + valueDevShmCache.SetBool(devShmCache); + document.AddMember(keyDevShmCache, valueDevShmCache, document.GetAllocator()); + rapidjson::StringBuffer buffer; rapidjson::Writer writer(buffer); @@ -1013,6 +1117,17 @@ void EigerFan::HandleControlMessage(zmq::message_t &message, zmq::message_t &idM LOG4CXX_INFO(log, "Forward stream changed to " << forwardStream); replyString.assign(CONTROL_RESPONSE_OK.c_str()); } + if (paramsValue.HasMember(CONTROL_DEV_SHM_CACHE.c_str())) { + // Enable/disable /dev/shm cache + devShmCache = paramsValue[CONTROL_DEV_SHM_CACHE.c_str()].GetBool(); + if (devShmCache) { + LOG4CXX_INFO(log, "Enabling shared memory cache"); + } else { + LOG4CXX_INFO(log, "Disabling shared memory cache"); + boost::filesystem::remove_all(DEV_SHM_PATH); + } + replyString.assign(CONTROL_RESPONSE_OK.c_str()); + } if (paramsValue.HasMember(CONTROL_BLOCK_SIZE.c_str())) { // Change the block size config.block_size = paramsValue[CONTROL_BLOCK_SIZE.c_str()].GetInt();