diff --git a/ros2bag/ros2bag/verb/record.py b/ros2bag/ros2bag/verb/record.py index dadb6807ba..32bf4125b8 100644 --- a/ros2bag/ros2bag/verb/record.py +++ b/ros2bag/ros2bag/verb/record.py @@ -176,6 +176,9 @@ def add_recorder_arguments(parser: ArgumentParser) -> None: help='Enable snapshot mode. Messages will not be written to the bagfile until ' 'the "/rosbag2_recorder/snapshot" service is called. e.g. \n ' 'ros2 service call /rosbag2_recorder/snapshot rosbag2_interfaces/Snapshot') + parser.add_argument( + '--split-snapshots', action='store_true', + help='Splitting the bag file if a snapshot is taken') parser.add_argument( '--log-level', type=str, default='info', choices=['debug', 'info', 'warn', 'error', 'fatal'], @@ -325,6 +328,7 @@ def main(self, *, args): # noqa: D102 storage_preset_profile=args.storage_preset_profile, storage_config_uri=storage_config_file, snapshot_mode=args.snapshot_mode, + split_snapshots=args.split_snapshots, custom_data=custom_data ) record_options = RecordOptions() diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp index 0777dcd788..e85fcf37c2 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp @@ -86,6 +86,10 @@ class ROSBAG2_CPP_PUBLIC CircularMessageCache /// Snapshot API: notify cache consumer to wake-up for dumping buffer void notify_data_ready() override; + /// Sets the flag that the data are ready but does not notify the consumer + /// use notify_data_ready() to notify the consumer as well + void set_data_ready() override; + private: std::shared_ptr producer_buffer_; std::mutex producer_buffer_mutex_; diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache.hpp index 0925ed568a..52fccbffc1 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache.hpp @@ -111,6 +111,10 @@ class ROSBAG2_CPP_PUBLIC MessageCache /// Producer API: notify consumer to wake-up (primary buffer has data) void notify_data_ready() override; + /// Sets the flag that the data are ready but does not notify the consumer + /// use notify_data_ready() to notify the consumer as well + void set_data_ready() override; + protected: /// Dropped messages per topic. Used for printing in alphabetic order std::unordered_map messages_dropped_per_topic_; diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_interface.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_interface.hpp index e7618bd13d..aaaf7a1025 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_interface.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_interface.hpp @@ -72,6 +72,10 @@ class ROSBAG2_CPP_PUBLIC MessageCacheInterface /// \brief Producer API: notify wait_for_data() to wake up and unblock consumer thread. virtual void notify_data_ready() {} + + /// Sets the flag that the data are ready but does not notify the consumer + /// use notify_data_ready() to notify the consumer as well + virtual void set_data_ready() {} }; } // namespace cache diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp index d26d04ea61..90da021f51 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp @@ -73,13 +73,16 @@ void CircularMessageCache::done_flushing() void CircularMessageCache::notify_data_ready() { - { - std::lock_guard lock(producer_buffer_mutex_); - data_ready_ = true; - } + set_data_ready(); cache_condition_var_.notify_one(); } +void CircularMessageCache::set_data_ready() +{ + std::lock_guard lock(producer_buffer_mutex_); + data_ready_ = true; +} + void CircularMessageCache::wait_for_data() { std::unique_lock producer_lock(producer_buffer_mutex_); diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache.cpp index 139cb6f662..5f05b91b9c 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache.cpp @@ -73,13 +73,16 @@ void MessageCache::release_consumer_buffer() void MessageCache::notify_data_ready() { - { - std::lock_guard lock(producer_buffer_mutex_); - data_ready_ = true; - } + set_data_ready(); cache_condition_var_.notify_one(); } +void MessageCache::set_data_ready() +{ + std::lock_guard lock(producer_buffer_mutex_); + data_ready_ = true; +} + void MessageCache::wait_for_data() { std::unique_lock producer_lock(producer_buffer_mutex_); diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index bd56eb8ade..a87808c3f0 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -444,7 +444,12 @@ bool SequentialWriter::take_snapshot() ROSBAG2_CPP_LOG_WARN("SequentialWriter take_snaphot called when snapshot mode is disabled"); return false; } - message_cache_->notify_data_ready(); + if (storage_options_.split_snapshots) { + message_cache_->set_data_ready(); + split_bagfile(); + } else { + message_cache_->notify_data_ready(); + } return true; } diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp index cb0229f523..80caca1928 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp @@ -580,6 +580,97 @@ TEST_F(SequentialWriterTest, snapshot_mode_zero_cache_size_throws_exception) EXPECT_THROW(writer_->open(storage_options_, {rmw_format, rmw_format}), std::runtime_error); } +TEST_F(SequentialWriterTest, snapshot_mode_write_on_trigger_with_splitting) +{ + storage_options_.max_bagfile_size = 0; + storage_options_.max_cache_size = 200; + storage_options_.snapshot_mode = true; + storage_options_.split_snapshots = true; + + // Expect a single write call when the snapshot is triggered + EXPECT_CALL( + *storage_, write( + An + > &>()) + ).Times(1); + + ON_CALL( + *storage_, + write(An>())).WillByDefault( + [this](std::shared_ptr) { + fake_storage_size_ += 1; + }); + + ON_CALL(*storage_, get_bagfile_size).WillByDefault( + [this]() { + return fake_storage_size_.load(); + }); + + ON_CALL(*metadata_io_, write_metadata).WillByDefault( + [this](const std::string &, const rosbag2_storage::BagMetadata & metadata) { + fake_metadata_ = metadata; + }); + + ON_CALL(*storage_, get_relative_file_path).WillByDefault( + [this]() { + return fake_storage_uri_; + }); + + auto sequential_writer = std::make_unique( + std::move(storage_factory_), converter_factory_, std::move(metadata_io_)); + writer_ = std::make_unique(std::move(sequential_writer)); + + std::vector closed_files; + std::vector opened_files; + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [&closed_files, &opened_files](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_files.emplace_back(info.closed_file); + opened_files.emplace_back(info.opened_file); + }; + writer_->add_event_callbacks(callbacks); + + std::string rmw_format = "rmw_format"; + + std::string msg_content = "Hello"; + auto msg_length = msg_content.length(); + auto message = std::make_shared(); + message->topic_name = "test_topic"; + message->serialized_data = rosbag2_storage::make_serialized_message( + msg_content.c_str(), msg_length); + + writer_->open(storage_options_, {rmw_format, rmw_format}); + writer_->create_topic({0u, "test_topic", "test_msgs/BasicTypes", "", {}, ""}); + + for (auto i = 0u; i < 100; ++i) { + writer_->write(message); + } + writer_->take_snapshot(); + + EXPECT_THAT(closed_files.size(), 1); + EXPECT_THAT(opened_files.size(), 1); + + if (!((closed_files.size() == opened_files.size()) && (opened_files.size() == 1))) { + // Output debug info + for (size_t i = 0; i < opened_files.size(); i++) { + std::cout << "opened_file[" << i << "] = '" << opened_files[i] << + "'; closed_file[" << i << "] = '" << closed_files[i] << "';" << std::endl; + } + } + + ASSERT_GE(opened_files.size(), 1); + ASSERT_GE(closed_files.size(), 1); + for (size_t i = 0; i < 1; i++) { + auto expected_closed = + fs::path(storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i)); + auto expected_opened = (i == 1) ? + // The last opened file shall be empty string when we do "writer->close();" + "" : fs::path(storage_options_.uri) / (bag_base_dir_ + "_" + std::to_string(i + 1)); + EXPECT_EQ(closed_files[i], expected_closed.generic_string()); + EXPECT_EQ(opened_files[i], expected_opened.generic_string()); + } +} + TEST_F(SequentialWriterTest, split_event_calls_callback) { const uint64_t max_bagfile_size = 3; diff --git a/rosbag2_py/src/rosbag2_py/_storage.cpp b/rosbag2_py/src/rosbag2_py/_storage.cpp index e12898053d..64e94018f8 100644 --- a/rosbag2_py/src/rosbag2_py/_storage.cpp +++ b/rosbag2_py/src/rosbag2_py/_storage.cpp @@ -83,7 +83,7 @@ PYBIND11_MODULE(_storage, m) { .def( pybind11::init< std::string, std::string, uint64_t, uint64_t, uint64_t, std::string, std::string, bool, - int64_t, int64_t, KEY_VALUE_MAP>(), + bool, int64_t, int64_t, KEY_VALUE_MAP>(), pybind11::arg("uri"), pybind11::arg("storage_id") = "", pybind11::arg("max_bagfile_size") = 0, @@ -92,6 +92,7 @@ PYBIND11_MODULE(_storage, m) { pybind11::arg("storage_preset_profile") = "", pybind11::arg("storage_config_uri") = "", pybind11::arg("snapshot_mode") = false, + pybind11::arg("split_snapshots") = false, pybind11::arg("start_time_ns") = -1, pybind11::arg("end_time_ns") = -1, pybind11::arg("custom_data") = KEY_VALUE_MAP{}) @@ -115,6 +116,9 @@ PYBIND11_MODULE(_storage, m) { .def_readwrite( "snapshot_mode", &rosbag2_storage::StorageOptions::snapshot_mode) + .def_readwrite( + "split_snapshots", + &rosbag2_storage::StorageOptions::split_snapshots) .def_readwrite( "start_time_ns", &rosbag2_storage::StorageOptions::start_time_ns) diff --git a/rosbag2_storage/include/rosbag2_storage/storage_options.hpp b/rosbag2_storage/include/rosbag2_storage/storage_options.hpp index 8d6d188459..afcd1981e3 100644 --- a/rosbag2_storage/include/rosbag2_storage/storage_options.hpp +++ b/rosbag2_storage/include/rosbag2_storage/storage_options.hpp @@ -56,6 +56,10 @@ struct StorageOptions // Defaults to disabled. bool snapshot_mode = false; + // Split the bag files if a snapshot is created. Only used in snapshot mode. + // Defaults to disabled. + bool split_snapshots = false; + // Start and end time for cutting int64_t start_time_ns = -1; int64_t end_time_ns = -1; diff --git a/rosbag2_storage/src/rosbag2_storage/storage_options.cpp b/rosbag2_storage/src/rosbag2_storage/storage_options.cpp index 717fcb9082..62fb183642 100644 --- a/rosbag2_storage/src/rosbag2_storage/storage_options.cpp +++ b/rosbag2_storage/src/rosbag2_storage/storage_options.cpp @@ -32,6 +32,7 @@ Node convert::encode( node["storage_preset_profile"] = storage_options.storage_preset_profile; node["storage_config_uri"] = storage_options.storage_config_uri; node["snapshot_mode"] = storage_options.snapshot_mode; + node["split_snapshots"] = storage_options.split_snapshots; node["start_time_ns"] = storage_options.start_time_ns; node["end_time_ns"] = storage_options.end_time_ns; node["custom_data"] = storage_options.custom_data; @@ -50,6 +51,7 @@ bool convert::decode( node, "storage_preset_profile", storage_options.storage_preset_profile); optional_assign(node, "storage_config_uri", storage_options.storage_config_uri); optional_assign(node, "snapshot_mode", storage_options.snapshot_mode); + optional_assign(node, "split_snapshots", storage_options.split_snapshots); optional_assign(node, "start_time_ns", storage_options.start_time_ns); optional_assign(node, "end_time_ns", storage_options.end_time_ns); using KEY_VALUE_MAP = std::unordered_map; diff --git a/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp b/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp index aa21761178..5423d89b08 100644 --- a/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp +++ b/rosbag2_storage/test/rosbag2_storage/test_storage_options.cpp @@ -29,6 +29,7 @@ TEST(storage_options, test_yaml_serialization) original.storage_preset_profile = "profile"; original.storage_config_uri = "config_uri"; original.snapshot_mode = true; + original.split_snapshots = true; original.start_time_ns = 12345000; original.end_time_ns = 23456000; original.custom_data["key1"] = "value1"; @@ -50,6 +51,7 @@ TEST(storage_options, test_yaml_serialization) ASSERT_EQ(original.storage_preset_profile, reconstructed.storage_preset_profile); ASSERT_EQ(original.storage_config_uri, reconstructed.storage_config_uri); ASSERT_EQ(original.snapshot_mode, reconstructed.snapshot_mode); + ASSERT_EQ(original.split_snapshots, reconstructed.split_snapshots); ASSERT_EQ(original.start_time_ns, reconstructed.start_time_ns); ASSERT_EQ(original.end_time_ns, reconstructed.end_time_ns); ASSERT_EQ(original.custom_data, reconstructed.custom_data);