From c3e9debe5d6f7ef8445777b60f42bf5e343d7d8a Mon Sep 17 00:00:00 2001 From: Toby Schneider Date: Tue, 14 May 2019 17:47:32 +0100 Subject: [PATCH] Added contents of triggering message to signal_queue_size_change --- src/acomms/protobuf/queue.proto | 7 +++++++ src/acomms/queue/queue.cpp | 5 ++++- src/acomms/queue/queue.h | 2 +- src/acomms/queue/queue_manager.cpp | 27 +++++++++++++++++++-------- src/acomms/queue/queue_manager.h | 2 +- src/test/acomms/queue3/test.cpp | 25 ++++++++++++++++++++----- 6 files changed, 52 insertions(+), 16 deletions(-) diff --git a/src/acomms/protobuf/queue.proto b/src/acomms/protobuf/queue.proto index d5621aba..0d94ed6b 100644 --- a/src/acomms/protobuf/queue.proto +++ b/src/acomms/protobuf/queue.proto @@ -86,6 +86,13 @@ message QueueSize { required uint32 dccl_id = 1; required uint32 size = 2; + + message EncodedMessage + { + required string full_name = 1; + required bytes data = 2; + } + optional EncodedMessage triggering_message = 10; } message QueueFlush diff --git a/src/acomms/queue/queue.cpp b/src/acomms/queue/queue.cpp index 4165ee83..098b1d1a 100644 --- a/src/acomms/queue/queue.cpp +++ b/src/acomms/queue/queue.cpp @@ -384,7 +384,9 @@ bool goby::acomms::Queue::get_priority_values(double* priority, } } -bool goby::acomms::Queue::pop_message(unsigned frame) +bool goby::acomms::Queue::pop_message(unsigned frame, + boost::shared_ptr& removed_msg) + { std::list::iterator back_it = messages_.end(); --back_it; // gives us "back" iterator @@ -399,6 +401,7 @@ bool goby::acomms::Queue::pop_message(unsigned frame) if (!it->meta.ack_requested()) { stream_for_pop(*it); + removed_msg = it->dccl_msg; messages_.erase(it); return true; } diff --git a/src/acomms/queue/queue.h b/src/acomms/queue/queue.h index 40909104..0760b987 100644 --- a/src/acomms/queue/queue.h +++ b/src/acomms/queue/queue.h @@ -73,7 +73,7 @@ class Queue const google::protobuf::Message& msg); goby::acomms::QueuedMessage give_data(unsigned frame); - bool pop_message(unsigned frame); + bool pop_message(unsigned frame, boost::shared_ptr& removed_msg); bool pop_message_ack(unsigned frame, boost::shared_ptr& removed_msg); void stream_for_pop(const QueuedMessage& queued_msg); diff --git a/src/acomms/queue/queue_manager.cpp b/src/acomms/queue/queue_manager.cpp index f6ef8bbc..54703909 100644 --- a/src/acomms/queue/queue_manager.cpp +++ b/src/acomms/queue/queue_manager.cpp @@ -109,7 +109,7 @@ void goby::acomms::QueueManager::add_queue( Queue& new_q = *((new_q_pair.first)->second); - qsize(&new_q); + qsize(&new_q, 0); glog.is(DEBUG1) && glog << group(glog_out_group_) << "Added new queue: \n" << new_q << std::endl; @@ -158,7 +158,7 @@ void goby::acomms::QueueManager::push_message(const google::protobuf::Message& d else queues_.find(dccl_id)->second->push_message(new_dccl_msg); - qsize(queues_[dccl_id].get()); + qsize(queues_[dccl_id].get(), new_dccl_msg.get()); } void goby::acomms::QueueManager::flush_queue(const protobuf::QueueFlush& flush) @@ -170,7 +170,7 @@ void goby::acomms::QueueManager::flush_queue(const protobuf::QueueFlush& flush) it->second->flush(); glog.is(DEBUG1) && glog << group(glog_out_group_) << msg_string(it->second->descriptor()) << ": flushed queue" << std::endl; - qsize(it->second.get()); + qsize(it->second.get(), 0); } else { @@ -298,12 +298,14 @@ void goby::acomms::QueueManager::handle_modem_data_request(protobuf::ModemTransm glog.is(DEBUG2) && glog << group(glog_out_group_) << "no ack, popping from queue: " << *winning_queue << std::endl; - if (!winning_queue->pop_message(frame_number)) + + boost::shared_ptr removed_msg; + if (!winning_queue->pop_message(frame_number, removed_msg)) glog.is(DEBUG1) && glog << group(glog_out_group_) << "failed to pop from queue: " << *winning_queue << std::endl; - - qsize(winning_queue); // notify change in queue size + else + qsize(winning_queue, removed_msg.get()); // notify change in queue size } // if an ack been set, do not unset these @@ -625,7 +627,7 @@ void goby::acomms::QueueManager::process_modem_ack(const protobuf::ModemTransmis } else { - qsize(q); + qsize(q, removed_msg.get()); signal_ack(ack_msg, *removed_msg); if (network_ack_src_ids_.count(meta_from_msg(*removed_msg).src())) create_network_ack(ack_msg.src(), *removed_msg, @@ -811,11 +813,20 @@ void goby::acomms::QueueManager::process_cfg() } } -void goby::acomms::QueueManager::qsize(Queue* q) +void goby::acomms::QueueManager::qsize(Queue* q, + const google::protobuf::Message* triggering_message) { protobuf::QueueSize size; size.set_dccl_id(codec_->id(q->descriptor())); size.set_size(q->size()); + + if (triggering_message) + { + protobuf::QueueSize::EncodedMessage& encoded_message = *size.mutable_triggering_message(); + encoded_message.set_full_name(triggering_message->GetDescriptor()->full_name()); + encoded_message.set_data(triggering_message->SerializePartialAsString()); + } + signal_queue_size_change(size); } diff --git a/src/acomms/queue/queue_manager.h b/src/acomms/queue/queue_manager.h index bd2f881d..bd9bc61b 100644 --- a/src/acomms/queue/queue_manager.h +++ b/src/acomms/queue/queue_manager.h @@ -227,7 +227,7 @@ class QueueManager QueueManager& operator=(const QueueManager&); //@} - void qsize(Queue* q); + void qsize(Queue* q, const google::protobuf::Message* triggering_message); // finds the %queue with the highest priority Queue* find_next_sender(const protobuf::ModemTransmission& message, const std::string& data, diff --git a/src/test/acomms/queue3/test.cpp b/src/test/acomms/queue3/test.cpp index 0e56497a..985664b7 100644 --- a/src/test/acomms/queue3/test.cpp +++ b/src/test/acomms/queue3/test.cpp @@ -62,6 +62,10 @@ int main(int argc, char* argv[]) goby::acomms::QueueManager q_manager; cfg.set_modem_id(MY_MODEM_ID); + goby::acomms::connect(&q_manager.signal_receive, &handle_receive); + goby::acomms::connect(&q_manager.signal_queue_size_change, &qsize); + goby::acomms::connect(&q_manager.signal_ack, &handle_ack); + goby::acomms::protobuf::QueuedMessageEntry* q_entry = cfg.add_message_entry(); q_entry->set_protobuf_name("GobyMessage"); q_entry->set_newest_first(true); @@ -80,10 +84,6 @@ int main(int argc, char* argv[]) q_manager.set_cfg(cfg); - goby::acomms::connect(&q_manager.signal_receive, &handle_receive); - goby::acomms::connect(&q_manager.signal_queue_size_change, &qsize); - goby::acomms::connect(&q_manager.signal_ack, &handle_ack); - msg_in_macrura.set_telegram("hello mac!"); msg_in_macrura.mutable_header()->set_time(goby::util::as(current_time)); msg_in_macrura.mutable_header()->set_source_platform(MY_MODEM_ID); @@ -196,7 +196,22 @@ void handle_receive(const google::protobuf::Message& msg) ++receive_count; } -void qsize(goby::acomms::protobuf::QueueSize size) { goby_message_qsize = size.size(); } +void qsize(goby::acomms::protobuf::QueueSize size) +{ + std::cout << "Queue size change: " << size.ShortDebugString() << std::endl; + + if (size.has_triggering_message()) + { + boost::shared_ptr triggering_message = + dccl::DynamicProtobufManager::new_protobuf_message( + size.triggering_message().full_name()); + triggering_message->ParseFromString(size.triggering_message().data()); + std::cout << "\tTriggering message decodes as: " << triggering_message->ShortDebugString() + << std::endl; + } + + goby_message_qsize = size.size(); +} void handle_ack(const goby::acomms::protobuf::ModemTransmission& ack_msg, const google::protobuf::Message& orig_msg)