diff --git a/src/acomms/modemdriver/iridium_driver.cpp b/src/acomms/modemdriver/iridium_driver.cpp index 1843e234..3716592f 100644 --- a/src/acomms/modemdriver/iridium_driver.cpp +++ b/src/acomms/modemdriver/iridium_driver.cpp @@ -38,14 +38,16 @@ using goby::common::goby_time; using goby::acomms::operator<<; +boost::shared_ptr goby::acomms::iridium_header_dccl_; + goby::acomms::IridiumDriver::IridiumDriver() : fsm_(driver_cfg_), last_triple_plus_time_(0), - last_send_time_(0), serial_fd_(-1), next_frame_(0) { - + init_iridium_dccl(); + // assert(byte_string_to_uint32(uint32_to_byte_string(16540)) == 16540); } @@ -190,8 +192,10 @@ void goby::acomms::IridiumDriver::process_transmission(protobuf::ModemTransmissi { signal_modify_transmission(&msg); + const static unsigned frame_max = IridiumHeader::descriptor()->FindFieldByName("frame_start")->options().GetExtension(dccl::field).max(); + if(!msg.has_frame_start()) - msg.set_frame_start(next_frame_); + msg.set_frame_start(next_frame_ % frame_max); // set the frame size, if not set or if it exceeds the max configured if(!msg.has_max_frame_bytes() || msg.max_frame_bytes() > driver_cfg_.GetExtension(IridiumDriverConfig::max_frame_size)) @@ -209,7 +213,8 @@ void goby::acomms::IridiumDriver::process_transmission(protobuf::ModemTransmissi } else if(msg.rate() == RATE_SBD) { - fsm_.process_event(fsm::EvSBDBeginData()); // mailbox check + if(msg.GetExtension(IridiumDriverConfig::if_no_data_do_mailbox_check)) + fsm_.process_event(fsm::EvSBDBeginData()); // mailbox check } } @@ -221,26 +226,24 @@ void goby::acomms::IridiumDriver::do_work() // display_state_cfg(&glog); double now = goby_time(); - // if we're on a call, keep pushing data at the target rate - const double send_interval = - driver_cfg_.GetExtension(IridiumDriverConfig::max_frame_size) / - (driver_cfg_.GetExtension(IridiumDriverConfig::target_bit_rate) / static_cast(BITS_IN_BYTE)); - - const fsm::OnCall* on_call = fsm_.state_cast(); - - if(fsm_.data_out().empty() && - (now > (last_send_time_ + send_interval))) - { - if(on_call && !on_call->bye_sent()) - { - process_transmission(rudics_mac_msg_, false); - last_send_time_ = now; - } - } if(on_call) { + // if we're on a call, keep pushing data at the target rate + const double send_wait = + on_call->last_bytes_sent() / + (driver_cfg_.GetExtension(IridiumDriverConfig::target_bit_rate) / static_cast(BITS_IN_BYTE)); + + if(fsm_.data_out().empty() && + (now > (on_call->last_tx_time() + send_wait))) + { + if(!on_call->bye_sent()) + { + process_transmission(rudics_mac_msg_, false); + } + } + if(!on_call->bye_sent() && now > (on_call->last_tx_time() + driver_cfg_.GetExtension(IridiumDriverConfig::handshake_hangup_seconds))) { @@ -338,8 +341,11 @@ void goby::acomms::IridiumDriver::send(const protobuf::ModemTransmission& msg) fsm_.buffer_data_out(msg); else { + std::string iridium_packet; + serialize_iridium_modem_message(&iridium_packet, msg); + std::string rudics_packet; - serialize_rudics_packet(msg.SerializeAsString(), &rudics_packet); + serialize_rudics_packet(iridium_packet, &rudics_packet); fsm_.process_event(fsm::EvSBDBeginData(rudics_packet)); } diff --git a/src/acomms/modemdriver/iridium_driver.h b/src/acomms/modemdriver/iridium_driver.h index bc8ab2c0..8bc6c559 100644 --- a/src/acomms/modemdriver/iridium_driver.h +++ b/src/acomms/modemdriver/iridium_driver.h @@ -74,7 +74,6 @@ namespace goby enum { TRIPLE_PLUS_WAIT = 2 }; protobuf::ModemTransmission rudics_mac_msg_; - double last_send_time_; int serial_fd_; diff --git a/src/acomms/modemdriver/iridium_driver_common.h b/src/acomms/modemdriver/iridium_driver_common.h index a07e0d30..80e99eb5 100644 --- a/src/acomms/modemdriver/iridium_driver_common.h +++ b/src/acomms/modemdriver/iridium_driver_common.h @@ -23,6 +23,12 @@ #ifndef IridiumDriverCommon20150508H #define IridiumDriverCommon20150508H +#include +#include +#include + +#include "goby/acomms/protobuf/iridium_driver.pb.h" + namespace goby { namespace acomms @@ -36,13 +42,17 @@ namespace goby : last_tx_time_(goby::common::goby_time()), last_rx_time_(0), bye_received_(false), - bye_sent_(false) + bye_sent_(false), + total_bytes_sent_(0), + last_bytes_sent_(0) { } - double last_rx_tx_time() const { return std::max(last_tx_time_, last_rx_time_); } double last_rx_time() const { return last_rx_time_; } double last_tx_time() const { return last_tx_time_; } - + + int last_bytes_sent() const { return last_bytes_sent_; } + int total_bytes_sent() const { return total_bytes_sent_; } + void set_bye_received(bool b) { bye_received_ = b; } void set_bye_sent(bool b) { bye_sent_ = b; } @@ -53,13 +63,82 @@ namespace goby void set_last_tx_time(double d) { last_tx_time_ = d; } void set_last_rx_time(double d) { last_rx_time_ = d; } + void set_last_bytes_sent(int i) + { + last_bytes_sent_ = i; + total_bytes_sent_ += i; + } + private: double last_tx_time_; double last_rx_time_; bool bye_received_; - bool bye_sent_; + bool bye_sent_; + int total_bytes_sent_; + int last_bytes_sent_; }; + + + // placeholder id codec that uses no bits, since we're always sending just this message on the wire + class IridiumHeaderIdentifierCodec : public dccl::TypedFixedFieldCodec + { + dccl::Bitset encode() { return dccl::Bitset(); } + dccl::Bitset encode(const uint32& wire_value) { return dccl::Bitset(); } + dccl::uint32 decode(dccl::Bitset* bits) { return 0; } + virtual unsigned size() { return 0; } + + }; + + extern boost::shared_ptr iridium_header_dccl_; + + inline void init_iridium_dccl() + { + dccl::FieldCodecManager::add("iridium_header_id"); + iridium_header_dccl_.reset(new dccl::Codec("iridium_header_id")); + iridium_header_dccl_->load(); + } + + inline void serialize_iridium_modem_message(std::string* out, const goby::acomms::protobuf::ModemTransmission& in) + { + IridiumHeader header; + header.set_src(in.src()); + header.set_dest(in.dest()); + if(in.has_rate()) + header.set_rate(in.rate()); + header.set_type(in.type()); + if(in.has_ack_requested()) + header.set_ack_requested(in.ack_requested()); + if(in.has_frame_start()) + header.set_frame_start(in.frame_start()); + if(in.acked_frame_size()) + header.set_acked_frame(in.acked_frame(0)); + + iridium_header_dccl_->encode(out, header); + if(in.frame_size()) + *out += in.frame(0); + } + + inline void parse_iridium_modem_message(std::string in, goby::acomms::protobuf::ModemTransmission* out) + { + IridiumHeader header; + iridium_header_dccl_->decode(&in, &header); + out->set_src(header.src()); + out->set_dest(header.dest()); + if(header.has_rate()) + out->set_rate(header.rate()); + out->set_type(header.type()); + if(header.has_ack_requested()) + out->set_ack_requested(header.ack_requested()); + if(header.has_frame_start()) + out->set_frame_start(header.frame_start()); + if(header.has_acked_frame()) + out->add_acked_frame(header.acked_frame()); + + if(in.size()) + out->add_frame(in); + } + } } diff --git a/src/acomms/modemdriver/iridium_driver_fsm.cpp b/src/acomms/modemdriver/iridium_driver_fsm.cpp index 1b245a51..aa7208b2 100644 --- a/src/acomms/modemdriver/iridium_driver_fsm.cpp +++ b/src/acomms/modemdriver/iridium_driver_fsm.cpp @@ -26,6 +26,7 @@ #include "goby/common/logger.h" #include "goby/common/time.h" #include "goby/util/binary.h" +#include "goby/acomms/acomms_constants.h" #include "rudics_packet.h" #include "iridium_driver_fsm.h" @@ -144,7 +145,7 @@ void goby::acomms::fsm::Command::handle_sbd_rx(const std::string& in) std::string bytes; parse_rudics_packet(&bytes, sbd_rx_data); protobuf::ModemTransmission msg; - msg.ParseFromString(bytes); + parse_iridium_modem_message(bytes, &msg); context< IridiumDriverFSM >().received().push_back(msg); at_out().pop_front(); @@ -343,7 +344,7 @@ void goby::acomms::fsm::OnCall::in_state_react(const EvRxOnCallSerial& e) parse_rudics_packet(&bytes, in); protobuf::ModemTransmission msg; - msg.ParseFromString(bytes); + parse_iridium_modem_message(bytes, &msg); context< IridiumDriverFSM >().received().push_back(msg); set_last_rx_time(goby_time()); } @@ -357,13 +358,18 @@ void goby::acomms::fsm::OnCall::in_state_react(const EvRxOnCallSerial& e) void goby::acomms::fsm::OnCall::in_state_react( const EvTxOnCallSerial& ) { + const static double target_byte_rate = (context().driver_cfg().GetExtension(IridiumDriverConfig::target_bit_rate) / static_cast(goby::acomms::BITS_IN_BYTE)); + + const double send_wait = last_bytes_sent() / target_byte_rate; + + double now = goby_time(); boost::circular_buffer& data_out = context().data_out(); - if(!data_out.empty()) + if(!data_out.empty() && (now > last_tx_time() + send_wait)) { // serialize the (protobuf) message std::string bytes; - data_out.front().SerializeToString(&bytes); - + serialize_iridium_modem_message(&bytes, data_out.front()); + // frame message std::string rudics_packet; serialize_rudics_packet(bytes, &rudics_packet); @@ -371,7 +377,8 @@ void goby::acomms::fsm::OnCall::in_state_react( const EvTxOnCallSerial& ) context().serial_tx_buffer().push_back(rudics_packet); data_out.pop_front(); - set_last_tx_time(goby_time()); + set_last_bytes_sent(rudics_packet.size()); + set_last_tx_time(now); } } diff --git a/src/acomms/modemdriver/iridium_driver_fsm.h b/src/acomms/modemdriver/iridium_driver_fsm.h index ba6f6c98..4469513d 100644 --- a/src/acomms/modemdriver/iridium_driver_fsm.h +++ b/src/acomms/modemdriver/iridium_driver_fsm.h @@ -105,9 +105,6 @@ namespace goby struct EvSendBye : boost::statechart::event< EvSendBye > {}; - struct EvZMQConnect : boost::statechart::event< EvZMQConnect > {}; - struct EvZMQDisconnect : boost::statechart::event< EvZMQDisconnect > {}; - struct EvConfigured : boost::statechart::event< EvConfigured > {}; struct EvSBDBeginData : boost::statechart::event< EvSBDBeginData > { @@ -144,7 +141,6 @@ namespace goby struct Online; struct OnCall; - struct OnZMQCall; struct NotOnCall; struct SBD; @@ -426,37 +422,19 @@ namespace goby struct NotOnCall : boost::statechart::simple_state >, StateNotify { typedef boost::mpl::list< - boost::statechart::transition< EvConnect, OnCall >, - boost::statechart::transition< EvZMQConnect, OnZMQCall > + boost::statechart::transition< EvConnect, OnCall > > reactions; NotOnCall() : StateNotify("NotOnCall") {} ~NotOnCall() {} }; - struct OnZMQCall : boost::statechart::simple_state >, StateNotify, OnCallBase - { - - OnZMQCall() : StateNotify("OnZMQCall") { } - ~OnZMQCall() {} - - void in_state_react( const EvSendBye& ) - { - set_bye_sent(true); - } - - typedef boost::mpl::list< - boost::statechart::transition< EvZMQDisconnect, NotOnCall >, - boost::statechart::in_state_reaction< EvSendBye, OnZMQCall, &OnZMQCall::in_state_react > - > reactions; - - }; struct OnCall : boost::statechart::state >, StateNotify, OnCallBase { public: - OnCall(my_context ctx) : my_base(ctx), StateNotify("OnCall") + OnCall(my_context ctx) : my_base(ctx), StateNotify("OnCall") { // add a brief identifier that is *different* than the "~" which is what PPP uses // add a carriage return to clear out any garbage @@ -468,14 +446,14 @@ namespace goby } ~OnCall() { // signal the disconnect event for the command state to handle + glog.is(goby::common::logger::DEBUG1) && glog << group("iridiumdriver") << "Sent " << total_bytes_sent() << " bytes on this call." << std::endl; post_event(EvDisconnect()); } void in_state_react( const EvRxOnCallSerial& ); void in_state_react( const EvTxOnCallSerial& ); void in_state_react( const EvSendBye& ); - - + typedef boost::mpl::list< boost::statechart::transition< EvNoCarrier, NotOnCall >, boost::statechart::in_state_reaction< EvRxOnCallSerial, OnCall, &OnCall::in_state_react >, @@ -485,7 +463,7 @@ namespace goby > reactions; private: - + }; diff --git a/src/acomms/modemdriver/iridium_shore_driver.cpp b/src/acomms/modemdriver/iridium_shore_driver.cpp index 55f2b2d4..7b910d52 100644 --- a/src/acomms/modemdriver/iridium_shore_driver.cpp +++ b/src/acomms/modemdriver/iridium_shore_driver.cpp @@ -47,6 +47,7 @@ using goby::acomms::protobuf::DirectIPMTPayload; goby::acomms::IridiumShoreDriver::IridiumShoreDriver() : next_frame_(0) { + init_iridium_dccl(); } goby::acomms::IridiumShoreDriver::~IridiumShoreDriver() @@ -113,31 +114,32 @@ void goby::acomms::IridiumShoreDriver::do_work() // display_state_cfg(&glog); double now = goby_time(); - // if we're on a call, keep pushing data at the target rate - const double send_interval = - driver_cfg_.GetExtension(IridiumDriverConfig::max_frame_size) / - (driver_cfg_.GetExtension(IridiumDriverConfig::target_bit_rate) / static_cast(BITS_IN_BYTE)); for(std::map::iterator it = remote_.begin(), end = remote_.end(); it != end; ++it) { RemoteNode& remote = it->second; - boost::shared_ptr on_call_base = it->second.on_call; + boost::shared_ptr on_call_base = remote.on_call; ModemId id = it->first; - if(now > (remote.last_send_time + send_interval)) - { - if(on_call_base && !on_call_base->bye_sent()) - { - rudics_mac_msg_.set_dest(it->first); - process_transmission(rudics_mac_msg_); - remote.last_send_time = now; - } - } // if we're on either type of call, see if we need to send the "bye" message or hangup if(on_call_base) { + // if we're on a call, keep pushing data at the target rate + const double send_wait = + on_call_base->last_bytes_sent() / + (driver_cfg_.GetExtension(IridiumDriverConfig::target_bit_rate) / static_cast(BITS_IN_BYTE)); + + if(now > (on_call_base->last_tx_time() + send_wait)) + { + if(!on_call_base->bye_sent()) + { + rudics_mac_msg_.set_dest(it->first); + process_transmission(rudics_mac_msg_); + } + } + if(!on_call_base->bye_sent() && now > (on_call_base->last_tx_time() + driver_cfg_.GetExtension(IridiumDriverConfig::handshake_hangup_seconds))) { @@ -197,7 +199,7 @@ void goby::acomms::IridiumShoreDriver::send(const protobuf::ModemTransmission& m if(msg.rate() == RATE_RUDICS || remote.on_call) { std::string bytes; - msg.SerializeToString(&bytes); + serialize_iridium_modem_message(&bytes, msg); // frame message std::string rudics_packet; @@ -205,11 +207,12 @@ void goby::acomms::IridiumShoreDriver::send(const protobuf::ModemTransmission& m rudics_send(rudics_packet, msg.dest()); boost::shared_ptr on_call_base = remote.on_call; on_call_base->set_last_tx_time(goby_time()); + on_call_base->set_last_bytes_sent(rudics_packet.size()); } else if(msg.rate() == RATE_SBD) { std::string bytes; - msg.SerializeToString(&bytes); + serialize_iridium_modem_message(&bytes, msg); std::string sbd_packet; serialize_rudics_packet(bytes, &sbd_packet); @@ -294,8 +297,9 @@ void goby::acomms::IridiumShoreDriver::rudics_line(const std::string& data, boos parse_rudics_packet(&decoded_line, data); protobuf::ModemTransmission modem_msg; - modem_msg.ParseFromString(decoded_line); + parse_iridium_modem_message(decoded_line, &modem_msg); + glog.is(DEBUG1) && glog << "Received RUDICS message from: " << modem_msg.src() << " to: " << modem_msg.dest() << " from endpoint: " << connection->remote_endpoint_str() << std::endl; if(!clients_.left.count(modem_msg.src())) { @@ -347,8 +351,7 @@ void goby::acomms::IridiumShoreDriver::receive_sbd_mo() try { parse_rudics_packet(&bytes, (*it)->message().body().payload()); - - modem_msg.ParseFromString(bytes); + parse_iridium_modem_message(bytes, &modem_msg); glog.is(DEBUG1) && glog << "Rx SBD ModemTransmission: " << modem_msg.ShortDebugString() << std::endl; diff --git a/src/acomms/modemdriver/iridium_shore_driver.h b/src/acomms/modemdriver/iridium_shore_driver.h index a0e752c7..4ec2c349 100644 --- a/src/acomms/modemdriver/iridium_shore_driver.h +++ b/src/acomms/modemdriver/iridium_shore_driver.h @@ -84,13 +84,11 @@ namespace goby struct RemoteNode { enum { DATA_BUFFER_CAPACITY = 30 }; - RemoteNode() : data_out(DATA_BUFFER_CAPACITY), - last_send_time(0) + RemoteNode() : data_out(DATA_BUFFER_CAPACITY) { } boost::shared_ptr on_call; boost::circular_buffer data_out; - double last_send_time; }; diff --git a/src/acomms/protobuf/iridium_driver.proto b/src/acomms/protobuf/iridium_driver.proto index 970342e7..3879b69d 100644 --- a/src/acomms/protobuf/iridium_driver.proto +++ b/src/acomms/protobuf/iridium_driver.proto @@ -1,6 +1,7 @@ import "goby/common/protobuf/option_extensions.proto"; import "goby/acomms/protobuf/driver_base.proto"; -import "goby/common/protobuf/zero_mq_node_config.proto"; +import "goby/acomms/protobuf/modem_message.proto"; +import "dccl/protobuf/option_extensions.proto"; message IridiumDriverConfig { @@ -24,4 +25,33 @@ message IridiumDriverConfig optional bool use_dtr = 1390 [default = false]; optional int32 handshake_hangup_seconds = 1392 [default = 5]; } + + extend goby.acomms.protobuf.ModemTransmission + { + optional bool if_no_data_do_mailbox_check = 1381 [default = true]; + } + } + +// subset of goby.acomms.protobuf.ModemTransmission +message IridiumHeader +{ + option (dccl.msg).id = 0; + option (dccl.msg).max_bytes = 7; + + required int32 src = 1 [(dccl.field).min = 0, + (dccl.field).max = 30]; + + required int32 dest = 2 [(dccl.field).min = 0, + (dccl.field).max = 30]; + + optional int32 rate = 3 [(dccl.field).min = 0, + (dccl.field).max = 1]; + + required goby.acomms.protobuf.ModemTransmission.TransmissionType type = 4; + + + optional bool ack_requested = 5; + optional uint32 frame_start = 6 [(dccl.field).min = 0, (dccl.field).max = 0xFFFF]; + optional int32 acked_frame = 7 [(dccl.field).min = 0, (dccl.field).max = 0xFFFF]; +} \ No newline at end of file diff --git a/src/acomms/protobuf/network_header.proto b/src/acomms/protobuf/network_header.proto index 4728fc54..fdaf06ec 100644 --- a/src/acomms/protobuf/network_header.proto +++ b/src/acomms/protobuf/network_header.proto @@ -98,3 +98,38 @@ message UDPHeader required uint32 checksum = 4 [(dccl.field).codec = "net.short"]; } + + +message ICMPHeader +{ + option (dccl.msg).id = 0xF003; + option (dccl.msg).max_bytes = 8; + option (dccl.msg).codec_version = 3; + + required uint32 type = 1 [(dccl.field).min = 0, (dccl.field).max = 255]; + required uint32 code = 2 [(dccl.field).min = 0, (dccl.field).max = 255]; + required uint32 checksum = 3 [(dccl.field).codec = "net.short"]; + required uint32 short1 = 4 [(dccl.field).codec = "net.short"]; + required uint32 short2 = 5 [(dccl.field).codec = "net.short"]; +} + +message IPGatewayICMPControl +{ + enum MessageType + { + QUEUE_REPORT = 1; + } + required MessageType type = 1; + required string address = 2; + + message QueueReport + { + message SubQueue + { + required int32 dest = 1; + required int32 size = 2; + } + repeated SubQueue queue = 1; + } + optional QueueReport queue_report = 3; +} diff --git a/src/apps/acomms/goby_ip_gateway/ip_gateway.cpp b/src/apps/acomms/goby_ip_gateway/ip_gateway.cpp index 0da45ac0..f15c0b22 100644 --- a/src/apps/acomms/goby_ip_gateway/ip_gateway.cpp +++ b/src/apps/acomms/goby_ip_gateway/ip_gateway.cpp @@ -24,6 +24,7 @@ #include #include +#include #include "dccl/arithmetic/field_codec_arithmetic.h" @@ -39,7 +40,11 @@ enum { IPV4_ADDRESS_BITS = 32, MIN_IPV4_HEADER_LENGTH = 5, // number of 32-bit words - UDP_HEADER_SIZE = 8 }; + UDP_HEADER_SIZE = 8, + ICMP_HEADER_SIZE = 8, + ICMP_TYPE = 250, + IPV4_VERSION = 4, + ICMP_CODE = 0}; int tun_alloc(char *dev); int tun_config(const char* dev, const char* host, unsigned cidr_prefix, unsigned mtu); @@ -60,14 +65,23 @@ namespace goby void init_tun(); void loop(); - void handle_udp_packet(protobuf::ModemTransmission* m, - const goby::acomms::protobuf::IPv4Header& ip_hdr, + void receive_packets(); + + void handle_udp_packet(const goby::acomms::protobuf::IPv4Header& ip_hdr, const goby::acomms::protobuf::UDPHeader& udp_hdr, const std::string& payload); void write_udp_packet(goby::acomms::protobuf::IPv4Header& ip_hdr, goby::acomms::protobuf::UDPHeader& udp_hdr, const std::string& payload); + void write_icmp_control_message(const protobuf::IPGatewayICMPControl& control_msg); + + void write_icmp_packet(goby::acomms::protobuf::IPv4Header& ip_hdr, + goby::acomms::protobuf::ICMPHeader& icmp_hdr, + const std::string& payload); + void icmp_report_queue(); + + std::pair to_src_dest_pair(int srcdest); int from_src_dest_pair(std::pair src_dest); @@ -82,7 +96,7 @@ namespace goby private: goby::acomms::protobuf::IPGatewayConfig& cfg_; - dccl::Codec dccl_goby_nh_, dccl_ip_, dccl_udp_; + dccl::Codec dccl_goby_nh_, dccl_ip_, dccl_udp_, dccl_icmp_; int tun_fd_; int total_addresses_; goby::uint32 local_address_; // in host byte order @@ -97,6 +111,9 @@ namespace goby std::vector dynamic_udp_fd_; int ip_mtu_; // the MTU on the tun interface, which is slightly different than the Goby MTU specified in the config file since the IP and Goby NetworkHeader are different sizes. + + // maps destination goby address to message buffer + std::map > outgoing_; }; } } @@ -107,6 +124,7 @@ goby::acomms::IPGateway::IPGateway(goby::acomms::protobuf::IPGatewayConfig* cfg) dccl_goby_nh_("ip_gateway_id_codec_0"), dccl_ip_("ip_gateway_id_codec_1"), dccl_udp_("ip_gateway_id_codec_2"), + dccl_icmp_("ip_gateway_id_codec_3"), tun_fd_(-1), total_addresses_((1 << (IPV4_ADDRESS_BITS-cfg_.cidr_netmask_prefix()))-1), // minus one since we don't need to use .255 as broadcast local_address_(0), @@ -229,14 +247,22 @@ void goby::acomms::IPGateway::init_dccl() dccl_goby_nh_.load(); dccl_ip_.load(); dccl_udp_.load(); - + dccl_icmp_.load(); + dccl_goby_nh_.info_all(&std::cout); } void goby::acomms::IPGateway::init_tun() { char tun_name[IFNAMSIZ]; - strcpy(tun_name, "\0"); + + std::string desired_tun_name = "tun"; + if(cfg_.has_tun_number()) + desired_tun_name += goby::util::as(cfg_.tun_number()); + else + desired_tun_name += "%d"; + + strcpy(tun_name, desired_tun_name.c_str()); tun_fd_ = tun_alloc(tun_name); if(tun_fd_ < 0) glog.is(DIE) && glog << "Could not allocate tun interface. Check permissions?" << std::endl; @@ -264,11 +290,13 @@ goby::acomms::IPGateway::~IPGateway() void goby::acomms::IPGateway::loop() { - mac_.do_work(); + mac_.do_work(); + receive_packets(); } + + void goby::acomms::IPGateway::handle_udp_packet( - protobuf::ModemTransmission* m, const goby::acomms::protobuf::IPv4Header& ip_hdr, const goby::acomms::protobuf::UDPHeader& udp_hdr, const std::string& payload) @@ -278,21 +306,30 @@ void goby::acomms::IPGateway::handle_udp_packet( int src = ipv4_to_goby_address(ip_hdr.source_ip_address()); int dest = ipv4_to_goby_address(ip_hdr.dest_ip_address()); - - if(m->src() != src) - glog.is(WARN) && glog << "Wrong source ID on data request" << std::endl; - - m->set_dest(dest); - m->set_ack_requested(false); goby::acomms::protobuf::NetworkHeader net_header; net_header.set_protocol(goby::acomms::protobuf::NetworkHeader::UDP); net_header.set_srcdest_addr(from_src_dest_pair(std::make_pair(src,dest))); + // map destination first - we need this mapping to exist on the other end + // if we map the source first, we might use the source mapping when source port == dest port + int dest_port = 0, src_port = 0; + boost::bimap::right_map::const_iterator dest_it = port_map_.right.find(udp_hdr.dest_port()); + if(dest_it != port_map_.right.end()) + { + dest_port = dest_it->second; + } + else + { + glog.is(WARN) && glog << "No mapping for destination UDP port: " << udp_hdr.dest_port() << ". Unable to send packet." << std::endl; + return; + } + + boost::bimap::right_map::const_iterator src_it = port_map_.right.find(udp_hdr.source_port()); if(src_it != port_map_.right.end()) { - net_header.mutable_udp()->add_srcdest_port(src_it->second); + src_port = src_it->second; } else { @@ -312,24 +349,25 @@ void goby::acomms::IPGateway::handle_udp_packet( dynamic_port_index_ = cfg_.static_udp_port_size(); } } - - boost::bimap::right_map::const_iterator dest_it = port_map_.right.find(udp_hdr.dest_port()); - if(dest_it != port_map_.right.end()) - { - net_header.mutable_udp()->add_srcdest_port(dest_it->second); - } - else - { - glog.is(WARN) && glog << "No mapping for destination UDP port: " << udp_hdr.dest_port() << ". Unable to send packet." << std::endl; - return; - } - + + net_header.mutable_udp()->add_srcdest_port(src_port); + net_header.mutable_udp()->add_srcdest_port(dest_port); glog.is(VERBOSE) && glog << "NetHeader: " << net_header.DebugString() << std::endl; std::string nh; dccl_goby_nh_.encode(&nh, net_header); - m->add_frame(nh + payload); + + std::map >::iterator it = outgoing_.find(dest); + if(it == outgoing_.end()) + { + std::pair >::iterator, bool> itboolpair = + outgoing_.insert(std::make_pair(dest, boost::circular_buffer(cfg_.queue_size()))); + it = itboolpair.first; + } + + it->second.push_back(nh + payload); + icmp_report_queue(); } void goby::acomms::IPGateway::handle_initiate_transmission(const protobuf::ModemTransmission& m) @@ -337,12 +375,9 @@ void goby::acomms::IPGateway::handle_initiate_transmission(const protobuf::Modem publish(m, "Tx" + goby::util::as(local_modem_id_)); } - -void goby::acomms::IPGateway::handle_data_request(const protobuf::ModemTransmission& orig_msg) +void goby::acomms::IPGateway::receive_packets() { - protobuf::ModemTransmission msg = orig_msg; - - while((unsigned)msg.frame_size() < msg.max_num_frames()) + while(true) { fd_set rd_set; FD_ZERO(&rd_set); @@ -353,7 +388,7 @@ void goby::acomms::IPGateway::handle_data_request(const protobuf::ModemTransmiss if (ret < 0 && errno != EINTR) { glog.is(WARN) && glog << "Could not select on tun fd." << std::endl; - break; + return; } else if(ret > 0) { @@ -388,9 +423,20 @@ void goby::acomms::IPGateway::handle_data_request(const protobuf::ModemTransmiss goby::acomms::protobuf::UDPHeader udp_hdr; std::string udp_header_data(&buffer[ip_header_size], UDP_HEADER_SIZE); dccl_udp_.decode(udp_header_data, &udp_hdr); - handle_udp_packet(&msg, ip_hdr, udp_hdr, std::string(&buffer[ip_header_size+UDP_HEADER_SIZE], ip_hdr.total_length()-ip_header_size-UDP_HEADER_SIZE)); + handle_udp_packet(ip_hdr, udp_hdr, std::string(&buffer[ip_header_size+UDP_HEADER_SIZE], ip_hdr.total_length()-ip_header_size-UDP_HEADER_SIZE)); break; } + case IPPROTO_ICMP: + { + goby::acomms::protobuf::ICMPHeader icmp_hdr; + std::string icmp_header_data(&buffer[ip_header_size], ICMP_HEADER_SIZE); + dccl_icmp_.decode(icmp_header_data, &icmp_hdr); + glog.is(DEBUG1) && glog << "Received ICMP Packet with header: " << icmp_hdr.ShortDebugString() << std::endl; + glog.is(DEBUG1) && glog << "ICMP sending is not supported." << std::endl; + + break; + } + } } @@ -398,27 +444,83 @@ void goby::acomms::IPGateway::handle_data_request(const protobuf::ModemTransmiss } else { + // no select items + return; + } + } +} + + + +void goby::acomms::IPGateway::handle_data_request(const protobuf::ModemTransmission& orig_msg) +{ + if(cfg_.has_only_rate() && cfg_.only_rate() != orig_msg.rate()) + return; + + protobuf::ModemTransmission msg = orig_msg; + + bool had_data = false; + while((unsigned)msg.frame_size() < msg.max_num_frames()) + { + if(msg.dest() != goby::acomms::QUERY_DESTINATION_ID) + { + std::map >::iterator it = outgoing_.find(msg.dest()); + if(it == outgoing_.end()) + { + break; + } + else if(it->second.size() == 0) + { + break; + } + else + { + msg.set_ack_requested(false); + msg.add_frame(it->second.front()); + it->second.pop_front(); + had_data = true; + } + } + else + { + // TODO: not fair - prioritizes lower valued destinations + for(std::map >::iterator it = outgoing_.begin(), + end = outgoing_.end(); it != end; ++it) + { + if(it->second.size() == 0) + { + continue; + } + else + { + msg.set_dest(it->first); + msg.set_ack_requested(false); + msg.add_frame(it->second.front()); + it->second.pop_front(); + had_data = true; + break; + } + } break; } + } publish(msg, "DataResponse" + goby::util::as(local_modem_id_)); + if(had_data) + icmp_report_queue(); } void goby::acomms::IPGateway::handle_modem_receive(const goby::acomms::protobuf::ModemTransmission& modem_msg) { - std::cout << modem_msg.DebugString() << std::endl; + if(cfg_.has_only_rate() && cfg_.only_rate() != modem_msg.rate()) + return; for(int i = 0, n = modem_msg.frame_size(); i < n; ++i) { goby::acomms::protobuf::IPv4Header ip_hdr; goby::acomms::protobuf::UDPHeader udp_hdr; - enum - { - IPV4_VERSION = 4, - }; - goby::acomms::protobuf::NetworkHeader net_header; std::string frame = modem_msg.frame(i); @@ -531,6 +633,109 @@ void goby::acomms::IPGateway::write_udp_packet(goby::acomms::protobuf::IPv4Heade glog.is(WARN) && glog << "Failed to write all " << packet.size() << " bytes." << std::endl; } +void goby::acomms::IPGateway::icmp_report_queue() +{ + protobuf::IPGatewayICMPControl control_msg; + control_msg.set_type(protobuf::IPGatewayICMPControl::QUEUE_REPORT); + control_msg.set_address(cfg_.local_ipv4_address()); + + int total_messages = 0; + + for(std::map >::const_iterator it = outgoing_.begin(), end = outgoing_.end(); it != end; ++it) + { + int size = it->second.size(); + if(size) + { + protobuf::IPGatewayICMPControl::QueueReport::SubQueue* q = control_msg.mutable_queue_report()->add_queue(); + q->set_dest(it->first); + q->set_size(size); + total_messages += size; + } + } + write_icmp_control_message(control_msg); + + // skip the MACManager and directly initiate the transmission + if(total_messages > 0 && cfg_.bypass_mac()) + { + if(cfg_.has_bypass_mac_slot()) + { + handle_initiate_transmission(cfg_.bypass_mac_slot()); + } + else + { + protobuf::ModemTransmission m; + m.set_src(local_modem_id_); + m.set_type(protobuf::ModemTransmission::DATA); + if(cfg_.has_only_rate()) + m.set_rate(cfg_.only_rate()); + handle_initiate_transmission(m); + } + } +} + +void goby::acomms::IPGateway::write_icmp_control_message(const protobuf::IPGatewayICMPControl& control_msg) +{ + std::string control_data; + control_msg.SerializeToString(&control_data); + + glog.is(DEBUG1) && glog << "Writing ICMP Control message: " << control_msg.DebugString() << std::endl; + + + goby::acomms::protobuf::IPv4Header ip_hdr; + ip_hdr.set_ihl(MIN_IPV4_HEADER_LENGTH); + ip_hdr.set_version(IPV4_VERSION); + ip_hdr.set_ecn(0); + ip_hdr.set_dscp(0); + ip_hdr.set_total_length(MIN_IPV4_HEADER_LENGTH*4 + ICMP_HEADER_SIZE + control_data.size()); + ip_hdr.set_identification(0); + ip_hdr.mutable_flags_frag_offset()->set_dont_fragment(false); + ip_hdr.mutable_flags_frag_offset()->set_more_fragments(false); + ip_hdr.mutable_flags_frag_offset()->set_fragment_offset(0); + ip_hdr.set_ttl(63); + ip_hdr.set_protocol(IPPROTO_ICMP); + ip_hdr.set_source_ip_address(cfg_.local_ipv4_address()); + ip_hdr.set_dest_ip_address(cfg_.local_ipv4_address()); + + goby::acomms::protobuf::ICMPHeader icmp_hdr; + icmp_hdr.set_type(ICMP_TYPE); + icmp_hdr.set_code(ICMP_CODE); + + write_icmp_packet(ip_hdr, icmp_hdr, control_data); +} + + + +void goby::acomms::IPGateway::write_icmp_packet(goby::acomms::protobuf::IPv4Header& ip_hdr, goby::acomms::protobuf::ICMPHeader& icmp_hdr, const std::string& payload) +{ + // set checksum 0 for calculation + ip_hdr.set_header_checksum(0); + icmp_hdr.set_checksum(0); + icmp_hdr.set_short1(0); + icmp_hdr.set_short2(0); + + std::string ip_hdr_data, icmp_hdr_data; + dccl_icmp_.encode(&icmp_hdr_data, icmp_hdr); + dccl_ip_.encode(&ip_hdr_data, ip_hdr); + + enum { NET_SHORT_BYTES = 2, NET_LONG_BYTES = 4 }; + enum { IPV4_SOURCE_ADDR_OFFSET = 12, IPV4_DEST_ADDR_OFFSET = 16, IPV4_CS_OFFSET = 10 }; + enum { ICMP_CS_OFFSET = 2}; + + uint16_t ip_checksum = net_checksum(ip_hdr_data); + uint16_t icmp_checksum = net_checksum(icmp_hdr_data + payload); + + ip_hdr_data[IPV4_CS_OFFSET] = (ip_checksum >> 8) & 0xFF; + ip_hdr_data[IPV4_CS_OFFSET+1] = ip_checksum & 0xFF; + + icmp_hdr_data[ICMP_CS_OFFSET] = (icmp_checksum >> 8) & 0xFF; + icmp_hdr_data[ICMP_CS_OFFSET+1] = icmp_checksum & 0xFF; + + std::string packet(ip_hdr_data + icmp_hdr_data + payload); + unsigned len = write(tun_fd_, packet.c_str(), packet.size()); + if(len < packet.size()) + glog.is(WARN) && glog << "Failed to write all " << packet.size() << " bytes." << std::endl; +} + // src // 0 1 2 3 // ------------ @@ -615,6 +820,7 @@ int main(int argc, char* argv[]) dccl::FieldCodecManager::add >("ip_gateway_id_codec_0"); dccl::FieldCodecManager::add >("ip_gateway_id_codec_1"); dccl::FieldCodecManager::add >("ip_gateway_id_codec_2"); + dccl::FieldCodecManager::add >("ip_gateway_id_codec_3"); dccl::FieldCodecManager::add("net.short"); dccl::FieldCodecManager::add("ip.v4.address"); dccl::FieldCodecManager::add("ip.v4.flagsfragoffset"); diff --git a/src/apps/acomms/goby_ip_gateway/ip_gateway_config.proto b/src/apps/acomms/goby_ip_gateway/ip_gateway_config.proto index 79d91ac4..9cc9237f 100644 --- a/src/apps/acomms/goby_ip_gateway/ip_gateway_config.proto +++ b/src/apps/acomms/goby_ip_gateway/ip_gateway_config.proto @@ -1,6 +1,7 @@ import "goby/common/protobuf/option_extensions.proto"; import "goby/common/protobuf/app_base_config.proto"; import "goby/acomms/protobuf/amac_config.proto"; +import "goby/acomms/protobuf/modem_message.proto"; package goby.acomms.protobuf; @@ -10,6 +11,7 @@ message IPGatewayConfig required string local_ipv4_address = 2; required uint32 cidr_netmask_prefix = 3; + optional int32 tun_number = 4; optional bool enable_broadcast_address = 10 [default = true]; // use all ones address as broadcast (e.g. 192.168.1.255 in /24 subnet optional uint32 dynamic_address_count = 11 [default = 0]; // number of dynamic addresses for use as reverse-NAT (starting at one less than broadcast and counting down). @@ -29,6 +31,12 @@ message IPGatewayConfig optional int32 total_ports = 20 [default = 1]; repeated uint32 static_udp_port = 21; - required MACConfig mac_cfg = 30; + optional MACConfig mac_cfg = 30; required uint32 mtu = 31; + optional bool bypass_mac = 32 [default = false]; + optional ModemTransmission bypass_mac_slot = 33; + + optional int32 queue_size = 40 [default = 100]; + + optional int32 only_rate = 50; } diff --git a/src/test/util/nmea/nmea.cpp b/src/test/util/nmea/nmea.cpp index 082d693b..481430eb 100644 --- a/src/test/util/nmea/nmea.cpp +++ b/src/test/util/nmea/nmea.cpp @@ -56,6 +56,13 @@ int main() std::cout << nmea.message() << std::endl; assert(nmea.at(8) == "ROLL"); } + + { + goby::util::NMEASentence nmea("!AIVDO,1,1,,,B0000003wk?8mP=18D3Q3wwUkP06,0*7B"); + std::cout << nmea.message() << std::endl; + assert(nmea.as(1) == 1); + assert(nmea.as(2) == 1); + } std::cout << "all tests passed" << std::endl; diff --git a/src/util/linebasedcomms/connection.h b/src/util/linebasedcomms/connection.h index 501411e1..e2b7fb10 100644 --- a/src/util/linebasedcomms/connection.h +++ b/src/util/linebasedcomms/connection.h @@ -86,31 +86,21 @@ namespace goby } std::istream is(&buffer_); - std::string& line = *in_datagram_.mutable_data(); - + if(!remote_endpoint().empty()) in_datagram_.set_src(remote_endpoint()); if(!local_endpoint().empty()) in_datagram_.set_dest(local_endpoint()); in_datagram_.set_time(goby::common::goby_time()); - char last = interface_->delimiter().at(interface_->delimiter().length()-1); - while(!std::getline(is, line, last).eof()) + std::getline(is, line, last); + { - line = extra_ + line + last; - // grab a lock on the in_ deque because the user can modify - boost::mutex::scoped_lock lock(interface_->in_mutex()); - + boost::mutex::scoped_lock lock(interface_->in_mutex()); interface_->in().push_back(in_datagram_); - - extra_.clear(); - } - - // store any remainder for the next round - if(!line.empty()) extra_ = line; - + } read_start(); // start waiting for another asynchronous read again } @@ -144,7 +134,6 @@ namespace goby private: LineBasedInterface* interface_; boost::asio::streambuf buffer_; - std::string extra_; protobuf::Datagram in_datagram_; std::deque out_; // buffered write data diff --git a/src/util/linebasedcomms/nmea_sentence.cpp b/src/util/linebasedcomms/nmea_sentence.cpp index 9ea94752..6952a6db 100644 --- a/src/util/linebasedcomms/nmea_sentence.cpp +++ b/src/util/linebasedcomms/nmea_sentence.cpp @@ -37,8 +37,8 @@ goby::util::NMEASentence::NMEASentence(std::string s, strategy cs_strat /*= VALI // Basic error checks ($, empty) if (s.empty()) throw bad_nmea_sentence("NMEASentence: no message provided."); - if (s[0] != '$') - throw bad_nmea_sentence("NMEASentence: no $: '" + s + "'."); + if (s[0] != '$' && s[0] != '!') + throw bad_nmea_sentence("NMEASentence: no $ or !: '" + s + "'."); // Check if the checksum exists and is correctly placed, and strip it. // If it's not correctly placed, we'll interpret it as part of message. // NMEA spec doesn't seem to say that * is forbidden elsewhere? (should be) @@ -68,11 +68,11 @@ unsigned char goby::util::NMEASentence::checksum(const std::string& s) { if(s.empty()) throw bad_nmea_sentence("NMEASentence::checksum: no message provided."); - std::string::size_type star = s.find_first_of("*\r\n"); - std::string::size_type dollar = s.find('$'); + std::string::size_type star = s.find_first_of("*"); + std::string::size_type dollar = s.find_first_of("$!"); if(dollar == std::string::npos) - throw bad_nmea_sentence("NMEASentence::checksum: no $ found."); + throw bad_nmea_sentence("NMEASentence::checksum: no $ or ! found."); if(star == std::string::npos) star = s.length();