Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added contents of triggering message to signal_queue_size_change #46

Open
wants to merge 1 commit into
base: 2.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/acomms/protobuf/queue.proto
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ message QueueSize
{
required uint32 dccl_id = 1;
required uint32 size = 2;

message EncodedMessage
{
required string full_name = 1;
required bytes data = 2;
}
optional EncodedMessage triggering_message = 10;
}

message QueueFlush
Expand Down
5 changes: 4 additions & 1 deletion src/acomms/queue/queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,9 @@ bool goby::acomms::Queue::get_priority_values(double* priority,
}
}

bool goby::acomms::Queue::pop_message(unsigned frame)
bool goby::acomms::Queue::pop_message(unsigned frame,
boost::shared_ptr<google::protobuf::Message>& removed_msg)

{
std::list<QueuedMessage>::iterator back_it = messages_.end();
--back_it; // gives us "back" iterator
Expand All @@ -399,6 +401,7 @@ bool goby::acomms::Queue::pop_message(unsigned frame)
if (!it->meta.ack_requested())
{
stream_for_pop(*it);
removed_msg = it->dccl_msg;
messages_.erase(it);
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/acomms/queue/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class Queue
const google::protobuf::Message& msg);

goby::acomms::QueuedMessage give_data(unsigned frame);
bool pop_message(unsigned frame);
bool pop_message(unsigned frame, boost::shared_ptr<google::protobuf::Message>& removed_msg);
bool pop_message_ack(unsigned frame, boost::shared_ptr<google::protobuf::Message>& removed_msg);
void stream_for_pop(const QueuedMessage& queued_msg);

Expand Down
27 changes: 19 additions & 8 deletions src/acomms/queue/queue_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void goby::acomms::QueueManager::add_queue(

Queue& new_q = *((new_q_pair.first)->second);

qsize(&new_q);
qsize(&new_q, 0);

glog.is(DEBUG1) && glog << group(glog_out_group_) << "Added new queue: \n"
<< new_q << std::endl;
Expand Down Expand Up @@ -158,7 +158,7 @@ void goby::acomms::QueueManager::push_message(const google::protobuf::Message& d
else
queues_.find(dccl_id)->second->push_message(new_dccl_msg);

qsize(queues_[dccl_id].get());
qsize(queues_[dccl_id].get(), new_dccl_msg.get());
}

void goby::acomms::QueueManager::flush_queue(const protobuf::QueueFlush& flush)
Expand All @@ -170,7 +170,7 @@ void goby::acomms::QueueManager::flush_queue(const protobuf::QueueFlush& flush)
it->second->flush();
glog.is(DEBUG1) && glog << group(glog_out_group_) << msg_string(it->second->descriptor())
<< ": flushed queue" << std::endl;
qsize(it->second.get());
qsize(it->second.get(), 0);
}
else
{
Expand Down Expand Up @@ -298,12 +298,14 @@ void goby::acomms::QueueManager::handle_modem_data_request(protobuf::ModemTransm
glog.is(DEBUG2) && glog << group(glog_out_group_)
<< "no ack, popping from queue: " << *winning_queue
<< std::endl;
if (!winning_queue->pop_message(frame_number))

boost::shared_ptr<google::protobuf::Message> removed_msg;
if (!winning_queue->pop_message(frame_number, removed_msg))
glog.is(DEBUG1) && glog << group(glog_out_group_)
<< "failed to pop from queue: " << *winning_queue
<< std::endl;

qsize(winning_queue); // notify change in queue size
else
qsize(winning_queue, removed_msg.get()); // notify change in queue size
}

// if an ack been set, do not unset these
Expand Down Expand Up @@ -625,7 +627,7 @@ void goby::acomms::QueueManager::process_modem_ack(const protobuf::ModemTransmis
}
else
{
qsize(q);
qsize(q, removed_msg.get());
signal_ack(ack_msg, *removed_msg);
if (network_ack_src_ids_.count(meta_from_msg(*removed_msg).src()))
create_network_ack(ack_msg.src(), *removed_msg,
Expand Down Expand Up @@ -811,11 +813,20 @@ void goby::acomms::QueueManager::process_cfg()
}
}

void goby::acomms::QueueManager::qsize(Queue* q)
void goby::acomms::QueueManager::qsize(Queue* q,
const google::protobuf::Message* triggering_message)
{
protobuf::QueueSize size;
size.set_dccl_id(codec_->id(q->descriptor()));
size.set_size(q->size());

if (triggering_message)
{
protobuf::QueueSize::EncodedMessage& encoded_message = *size.mutable_triggering_message();
encoded_message.set_full_name(triggering_message->GetDescriptor()->full_name());
encoded_message.set_data(triggering_message->SerializePartialAsString());
}

signal_queue_size_change(size);
}

Expand Down
2 changes: 1 addition & 1 deletion src/acomms/queue/queue_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ class QueueManager
QueueManager& operator=(const QueueManager&);
//@}

void qsize(Queue* q);
void qsize(Queue* q, const google::protobuf::Message* triggering_message);

// finds the %queue with the highest priority
Queue* find_next_sender(const protobuf::ModemTransmission& message, const std::string& data,
Expand Down
25 changes: 20 additions & 5 deletions src/test/acomms/queue3/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ int main(int argc, char* argv[])
goby::acomms::QueueManager q_manager;
cfg.set_modem_id(MY_MODEM_ID);

goby::acomms::connect(&q_manager.signal_receive, &handle_receive);
goby::acomms::connect(&q_manager.signal_queue_size_change, &qsize);
goby::acomms::connect(&q_manager.signal_ack, &handle_ack);

goby::acomms::protobuf::QueuedMessageEntry* q_entry = cfg.add_message_entry();
q_entry->set_protobuf_name("GobyMessage");
q_entry->set_newest_first(true);
Expand All @@ -80,10 +84,6 @@ int main(int argc, char* argv[])

q_manager.set_cfg(cfg);

goby::acomms::connect(&q_manager.signal_receive, &handle_receive);
goby::acomms::connect(&q_manager.signal_queue_size_change, &qsize);
goby::acomms::connect(&q_manager.signal_ack, &handle_ack);

msg_in_macrura.set_telegram("hello mac!");
msg_in_macrura.mutable_header()->set_time(goby::util::as<goby::uint64>(current_time));
msg_in_macrura.mutable_header()->set_source_platform(MY_MODEM_ID);
Expand Down Expand Up @@ -196,7 +196,22 @@ void handle_receive(const google::protobuf::Message& msg)
++receive_count;
}

void qsize(goby::acomms::protobuf::QueueSize size) { goby_message_qsize = size.size(); }
void qsize(goby::acomms::protobuf::QueueSize size)
{
std::cout << "Queue size change: " << size.ShortDebugString() << std::endl;

if (size.has_triggering_message())
{
boost::shared_ptr<google::protobuf::Message> triggering_message =
dccl::DynamicProtobufManager::new_protobuf_message(
size.triggering_message().full_name());
triggering_message->ParseFromString(size.triggering_message().data());
std::cout << "\tTriggering message decodes as: " << triggering_message->ShortDebugString()
<< std::endl;
}

goby_message_qsize = size.size();
}

void handle_ack(const goby::acomms::protobuf::ModemTransmission& ack_msg,
const google::protobuf::Message& orig_msg)
Expand Down