From 6259bf9d2e6e1453d4b6308f3d56c64aabcc77ef Mon Sep 17 00:00:00 2001 From: Sylvain Joyeux Date: Fri, 14 Feb 2025 10:33:32 -0300 Subject: [PATCH 1/6] fix: resolve warning about polymorphic exception --- rtt/transports/corba/TaskContextServer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rtt/transports/corba/TaskContextServer.cpp b/rtt/transports/corba/TaskContextServer.cpp index 09af370e4..8d2a5fde5 100644 --- a/rtt/transports/corba/TaskContextServer.cpp +++ b/rtt/transports/corba/TaskContextServer.cpp @@ -103,7 +103,7 @@ namespace RTT rootNC->unbind(name); log(Info) << "Successfully removed CTaskContext '"<< mregistered_name <<"' from CORBA Naming Service."< Date: Fri, 14 Feb 2025 10:33:57 -0300 Subject: [PATCH 2/6] fix: expand the CORBA connection API to avoid all remote calls within RTT itself --- rtt/transports/corba/DataFlow.idl | 48 +++++++- rtt/transports/corba/DataFlowI.cpp | 177 ++++++++++++++++++++++++++--- rtt/transports/corba/DataFlowI.h | 24 ++++ 3 files changed, 232 insertions(+), 17 deletions(-) diff --git a/rtt/transports/corba/DataFlow.idl b/rtt/transports/corba/DataFlow.idl index bc99e0933..a96f79d1b 100644 --- a/rtt/transports/corba/DataFlow.idl +++ b/rtt/transports/corba/DataFlow.idl @@ -100,6 +100,13 @@ module RTT * for the data type of the given ports */ exception CNoCorbaTransport {}; + /** Emitted when the system is expected to create a local connection, but one + * of the ports is remote + */ + exception CNotLocalConnection {}; + /** Emitted when a call refers to a corba channel that we can't find internally + */ + exception CNoSuchCorbaChannel {}; enum CPortType { CInput, COutput }; @@ -157,6 +164,19 @@ module RTT void disconnectPort(in string port_name) raises(CNoSuchPortException); + /** + * Create the output half of a channel, from the CORBA endpoint to the input port + * + * For the output halves, there is no difference between buildChannelOutput and + * buildChannelOutputHalf. This method is created for consistency with + * buildChannelInputHalf + * + * The returned channel element will not be functional until + * channelReady() has been called for it + */ + CChannelElement buildChannelOutputHalf(in string input_port, inout CConnPolicy policy) + raises(CNoCorbaTransport,CNoSuchPortException); + /** * Use this to write to an input port with * the given policy. @@ -169,6 +189,22 @@ module RTT CChannelElement buildChannelOutput(in string input_port, inout CConnPolicy policy) raises(CNoCorbaTransport,CNoSuchPortException); + /** + * Connect the input half of the channel to its port + */ + boolean connectChannelInputHalf(in string output_port, in CChannelElement channel, in CConnPolicy policy) + raises(CNoCorbaTransport, CNoSuchPortException, CNoSuchCorbaChannel); + + /** + * Create the half of a connection, from an output port to a CORBA channel element + * + * Unlike with buildChannelInput, the channel is not yet connected to the output + * port. You need to call connectChannelInputHalf to do so, once the whole channel + * is fully setup. + */ + CChannelElement buildChannelInputHalf(in string output_port, inout CConnPolicy policy) + raises(CNoCorbaTransport,CNoSuchPortException); + /** * Use this to read from an output port with * the given policy. @@ -178,6 +214,16 @@ module RTT CChannelElement buildChannelInput(in string output_port, inout CConnPolicy policy) raises(CNoCorbaTransport,CNoSuchPortException); + /** + * Connect two ports when they are both within the same process + * + * The local port needs to be the output, while the remote port is the + * input. + */ + boolean createLocalConnection(in string local_port, in CDataFlowInterface remote_ports, + in string remote_port, inout CConnPolicy policy) + raises(CNoSuchPortException, CNotLocalConnection); + /** * Connect the given named port to the given remote port. * Use this method to connect two Orocos data flow ports. @@ -235,7 +281,7 @@ module RTT * @return false if the connection could not be used. */ boolean channelReady(in string input_port,in CChannelElement channel, in CConnPolicy cp) - raises(CNoSuchPortException); + raises(CNoSuchPortException, CNoSuchCorbaChannel); }; }; }; diff --git a/rtt/transports/corba/DataFlowI.cpp b/rtt/transports/corba/DataFlowI.cpp index dfa655f1e..297ce74af 100644 --- a/rtt/transports/corba/DataFlowI.cpp +++ b/rtt/transports/corba/DataFlowI.cpp @@ -240,6 +240,27 @@ void CDataFlowInterface_i::deregisterChannel(CChannelElement_ptr channel) } } +CORBA::Boolean CDataFlowInterface_i::connectChannelInputHalf(const char* output_port_name, CChannelElement_ptr channel, CConnPolicy const& policy) ACE_THROW_SPEC (( + CORBA::SystemException + ,::RTT::corba::CNoSuchPortException + )) +{ + PortInterface* p = mdf->getPort(output_port_name); + if (p == 0) + throw corba::CNoSuchPortException(); + + OutputPortInterface* ip = dynamic_cast(p); + if (ip == 0) + throw corba::CNoSuchPortException(); + + CORBA_CHECK_THREAD(); + + // Attach to our output port: + auto cxx_channel = findCXXChannelFromCORBA(channel); + auto cxx_policy = toRTT(policy); + return cxx_channel->getPort()->addConnection( new SimpleConnID(), cxx_channel->getInputEndPoint(), cxx_policy); +} + CORBA::Boolean CDataFlowInterface_i::channelReady(const char * reader_port_name, CChannelElement_ptr channel, CConnPolicy const& policy ) ACE_THROW_SPEC (( CORBA::SystemException ,::RTT::corba::CNoSuchPortException @@ -254,29 +275,29 @@ CORBA::Boolean CDataFlowInterface_i::channelReady(const char * reader_port_name, throw corba::CNoSuchPortException(); CORBA_CHECK_THREAD(); + // lookup the C++ channel that matches the corba channel and // inform our local port that that C++ channel is ready. { RTT::os::MutexLock lock(channel_list_mtx); - ChannelList::iterator it=channel_list.begin(); - for (; it != channel_list.end(); ++it) { - if (it->first->_is_equivalent (channel) ) { - try { - ConnPolicy cp; - cp=toRTT(policy); - return ip->channelReady( it->second, cp ); - } - catch(std::exception const& e) - { - log(Error) << "call to channelReady threw " << e.what() << endlog(); - throw; - } - } - } + auto cxx_channel = findCXXChannelFromCORBA(channel); + ConnPolicy cxx_policy = toRTT(policy); + return ip->channelReady(cxx_channel, cxx_policy); } log(Error) << "Invalid CORBA channel given for port " << reader_port_name << ": could not match it to a local C++ channel." <first->_is_equivalent(corba) ) { + return it->second; + } + } + + throw corba::CNoSuchCorbaChannel(); +} + void CDataFlowInterface_i::disconnectPort(const char * port_name) ACE_THROW_SPEC (( CORBA::SystemException ,::RTT::corba::CNoSuchPortException @@ -375,6 +396,16 @@ std::string CDataFlowInterface_i::dispatcherNameFromPolicy(RTT::DataFlowInterfac return policy.name_id; } +CChannelElement_ptr CDataFlowInterface_i::buildChannelOutputHalf( + const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC (( + CORBA::SystemException + ,::RTT::corba::CNoCorbaTransport + ,::RTT::corba::CNoSuchPortException + )) +{ + return buildChannelOutput(port_name, corba_policy); +} + CChannelElement_ptr CDataFlowInterface_i::buildChannelOutput( const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC (( CORBA::SystemException @@ -528,7 +559,6 @@ CChannelElement_ptr CDataFlowInterface_i::buildChannelInput( buf->setOutput( dynamic_cast(this_element) ); } - // Attach to our output port: port->addConnection( new SimpleConnID(), start->getInputEndPoint(), policy); @@ -543,6 +573,121 @@ CChannelElement_ptr CDataFlowInterface_i::buildChannelInput( return this_element->_this(); } +/** + * This code is a major copy-past of the above. Amazing how much boiler plate we need. + */ +CChannelElement_ptr CDataFlowInterface_i::buildChannelInputHalf( + const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC (( + CORBA::SystemException + ,::RTT::corba::CNoCorbaTransport + ,::RTT::corba::CNoSuchPortException + )) +{ + Logger::In in("CDataFlowInterface_i::buildChannelInputHalf"); + // First check validity of user input... + OutputPortInterface* port = dynamic_cast(mdf->getPort(port_name)); + if (port == 0) + throw CNoSuchPortException(); + + TypeInfo const* type_info = port->getTypeInfo(); + if (!type_info) + throw CNoCorbaTransport(); + + CorbaTypeTransporter* transporter = + dynamic_cast(type_info->getProtocol(ORO_CORBA_PROTOCOL_ID)); + if (!transporter) + throw CNoCorbaTransport(); + + CORBA_CHECK_THREAD(); + // Convert to RTT policy. + ConnPolicy policy = toRTT(corba_policy); + + // Now create the output-side channel elements. + ChannelElementBase::shared_ptr start = type_info->buildChannelInput(*port); + + std::string dispatcherName = dispatcherNameFromPolicy(mdf, policy); + // The channel element that exposes our channel in CORBA + CRemoteChannelElement_i* this_element = + transporter->createOutputChannelElement_i(dispatcherName, mpoa, corba_policy.pull, corba_policy.signalling); + PortableServer::ServantBase_var servant = this_element; + this_element->setCDataFlowInterface(this); + + // Attach the corba channel element first (so OOB is after corba). + assert( dynamic_cast(this_element) ); + start->getOutputEndPoint()->setOutput( dynamic_cast(this_element)); + + /* + * This part if for out-of band. (needs to be factored out). + */ + if ( corba_policy.transport !=0 && corba_policy.transport != ORO_CORBA_PROTOCOL_ID) { + // prepare out-of-band transport for this port. + // if user supplied name, use that one. + if ( type_info->getProtocol(corba_policy.transport) == 0 ) { + log(Error) << "Could not create out-of-band transport for port "<< port_name << " with transport id " << corba_policy.transport <getTypeName() <getProtocol(corba_policy.transport)->createStream(port, policy, true); + // if no user supplied name, pass on the new name. + if ( strlen( corba_policy.name_id.in()) == 0 ) + corba_policy.name_id = CORBA::string_dup( policy.name_id.c_str() ); + + if (ceb) { + // OOB is added to end of chain. + start->getOutputEndPoint()->setOutput( ceb ); + log(Info) <<"Sending data from port "<< policy.name_id << " to out-of-band protocol "<< corba_policy.transport <getTypeName()<< " failed to create an out-of-band endpoint for port " << port_name<buildDataStorage(toRTT(corba_policy)); + start->setOutput(buf); + buf->setOutput( dynamic_cast(this_element) ); + } + + // Finally, store our mapping of corba channel elements to C++ channel elements. We need this for channelReady() and removing a channel again. + { RTT::os::MutexLock lock(channel_list_mtx); + channel_list.push_back( ChannelList::value_type(this_element->_this(), start->getInputEndPoint())); + } + + return this_element->_this(); +} + +::CORBA::Boolean CDataFlowInterface_i::createLocalConnection( + const char* writer_port, CDataFlowInterface_ptr reader_interface, + const char* reader_port, CConnPolicy & policy) ACE_THROW_SPEC (( + CORBA::SystemException + ,::RTT::corba::CNoSuchPortException + )) +{ + Logger::In in("CDataFlowInterface_i::createConnection"); + OutputPortInterface* writer = dynamic_cast(mdf->getPort(writer_port)); + if (writer == 0) + throw CNoSuchPortException(); + + CORBA_CHECK_THREAD(); + // Check if +reader_interface+ is local. If it is, use the non-CORBA + // connection. + RTT::DataFlowInterface* local_interface = CDataFlowInterface_i::getLocalInterface(reader_interface); + if (!local_interface) { + throw CNotLocalConnection(); + } + + InputPortInterface* reader = + dynamic_cast(local_interface->getPort(reader_port)); + if (!reader) + { + log(Warning) << "CORBA: createConnection() target is not an input port" << endlog(); + throw CNoSuchPortException(); + return false; + } + + log(Debug) << "CORBA: createConnection() is creating a LOCAL connection between " << + writer_port << " and " << reader_port << endlog(); + return writer->createConnection(*reader, toRTT(policy)); +} ::CORBA::Boolean CDataFlowInterface_i::createConnection( const char* writer_port, CDataFlowInterface_ptr reader_interface, diff --git a/rtt/transports/corba/DataFlowI.h b/rtt/transports/corba/DataFlowI.h index 93453ea3c..dc32af45c 100644 --- a/rtt/transports/corba/DataFlowI.h +++ b/rtt/transports/corba/DataFlowI.h @@ -134,6 +134,8 @@ namespace RTT { ChannelList channel_list; // Lock that should be taken before access to channel_list RTT::os::Mutex channel_list_mtx; + + base::ChannelElementBase::shared_ptr findCXXChannelFromCORBA(CChannelElement_ptr corba); public: // standard constructor CDataFlowInterface_i(DataFlowInterface* interface, PortableServer::POA_ptr poa); @@ -194,6 +196,13 @@ namespace RTT { static std::string dispatcherNameFromPolicy( RTT::DataFlowInterface* interface, RTT::ConnPolicy const& policy); + + CChannelElement_ptr buildChannelOutputHalf( + const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC (( + CORBA::SystemException + ,::RTT::corba::CNoCorbaTransport + ,::RTT::corba::CNoSuchPortException + )); CChannelElement_ptr buildChannelOutput(const char* reader_port, RTT::corba::CConnPolicy& policy) ACE_THROW_SPEC (( CORBA::SystemException ,::RTT::corba::CNoCorbaTransport @@ -204,6 +213,21 @@ namespace RTT { ,::RTT::corba::CNoCorbaTransport ,::RTT::corba::CNoSuchPortException )); + CChannelElement_ptr buildChannelInputHalf(const char* writer_port, RTT::corba::CConnPolicy& policy) ACE_THROW_SPEC (( + CORBA::SystemException + ,::RTT::corba::CNoCorbaTransport + ,::RTT::corba::CNoSuchPortException + )); + CORBA::Boolean connectChannelInputHalf(const char* output_port_name, CChannelElement_ptr channel, CConnPolicy const& policy) ACE_THROW_SPEC (( + CORBA::SystemException + ,::RTT::corba::CNoSuchPortException + )); + ::CORBA::Boolean createLocalConnection( + const char* writer_port, CDataFlowInterface_ptr reader_interface, + const char* reader_port, CConnPolicy & policy) ACE_THROW_SPEC (( + CORBA::SystemException + ,::RTT::corba::CNoSuchPortException + )); ::CORBA::Boolean createConnection( const char* writer_port, CDataFlowInterface_ptr reader_interface, From a9b41128ee5d477fe70d29e671b2ada1a6a563cc Mon Sep 17 00:00:00 2001 From: Sylvain Joyeux Date: Mon, 17 Feb 2025 09:49:47 -0300 Subject: [PATCH 3/6] chore: refactor the build*Half to pair endpoint creation with adding connection The issue with having a connendpoint without having the connection registered is that it crashes on disconnect, since the endpoint calls the port and then the port cannot find the connection --- rtt/transports/corba/DataFlow.idl | 37 +++-- rtt/transports/corba/DataFlowI.cpp | 220 +++++++++++++++++++++-------- rtt/transports/corba/DataFlowI.h | 45 ++++-- 3 files changed, 215 insertions(+), 87 deletions(-) diff --git a/rtt/transports/corba/DataFlow.idl b/rtt/transports/corba/DataFlow.idl index a96f79d1b..d21acf0e0 100644 --- a/rtt/transports/corba/DataFlow.idl +++ b/rtt/transports/corba/DataFlow.idl @@ -164,19 +164,6 @@ module RTT void disconnectPort(in string port_name) raises(CNoSuchPortException); - /** - * Create the output half of a channel, from the CORBA endpoint to the input port - * - * For the output halves, there is no difference between buildChannelOutput and - * buildChannelOutputHalf. This method is created for consistency with - * buildChannelInputHalf - * - * The returned channel element will not be functional until - * channelReady() has been called for it - */ - CChannelElement buildChannelOutputHalf(in string input_port, inout CConnPolicy policy) - raises(CNoCorbaTransport,CNoSuchPortException); - /** * Use this to write to an input port with * the given policy. @@ -189,12 +176,6 @@ module RTT CChannelElement buildChannelOutput(in string input_port, inout CConnPolicy policy) raises(CNoCorbaTransport,CNoSuchPortException); - /** - * Connect the input half of the channel to its port - */ - boolean connectChannelInputHalf(in string output_port, in CChannelElement channel, in CConnPolicy policy) - raises(CNoCorbaTransport, CNoSuchPortException, CNoSuchCorbaChannel); - /** * Create the half of a connection, from an output port to a CORBA channel element * @@ -205,6 +186,24 @@ module RTT CChannelElement buildChannelInputHalf(in string output_port, inout CConnPolicy policy) raises(CNoCorbaTransport,CNoSuchPortException); + /** + * Connect the input half of the channel to its port + */ + boolean connectChannelInputHalf(in string output_port, in CChannelElement channel, in CConnPolicy policy) + raises(CNoCorbaTransport, CNoSuchPortException, CNoSuchCorbaChannel); + + /** + * Create the output half of a channel, not yet connected to the input port + */ + CChannelElement buildChannelOutputHalf(in string input_port, inout CConnPolicy policy) + raises(CNoCorbaTransport,CNoSuchPortException); + + /** + * Create the output half of a channel, not yet connected to the input port + */ + boolean connectChannelOutputHalf(in string input_port, in CChannelElement channel, in CConnPolicy policy) + raises(CNoCorbaTransport,CNoSuchPortException); + /** * Use this to read from an output port with * the given policy. diff --git a/rtt/transports/corba/DataFlowI.cpp b/rtt/transports/corba/DataFlowI.cpp index 297ce74af..adcad6537 100644 --- a/rtt/transports/corba/DataFlowI.cpp +++ b/rtt/transports/corba/DataFlowI.cpp @@ -240,27 +240,6 @@ void CDataFlowInterface_i::deregisterChannel(CChannelElement_ptr channel) } } -CORBA::Boolean CDataFlowInterface_i::connectChannelInputHalf(const char* output_port_name, CChannelElement_ptr channel, CConnPolicy const& policy) ACE_THROW_SPEC (( - CORBA::SystemException - ,::RTT::corba::CNoSuchPortException - )) -{ - PortInterface* p = mdf->getPort(output_port_name); - if (p == 0) - throw corba::CNoSuchPortException(); - - OutputPortInterface* ip = dynamic_cast(p); - if (ip == 0) - throw corba::CNoSuchPortException(); - - CORBA_CHECK_THREAD(); - - // Attach to our output port: - auto cxx_channel = findCXXChannelFromCORBA(channel); - auto cxx_policy = toRTT(policy); - return cxx_channel->getPort()->addConnection( new SimpleConnID(), cxx_channel->getInputEndPoint(), cxx_policy); -} - CORBA::Boolean CDataFlowInterface_i::channelReady(const char * reader_port_name, CChannelElement_ptr channel, CConnPolicy const& policy ) ACE_THROW_SPEC (( CORBA::SystemException ,::RTT::corba::CNoSuchPortException @@ -278,13 +257,9 @@ CORBA::Boolean CDataFlowInterface_i::channelReady(const char * reader_port_name, // lookup the C++ channel that matches the corba channel and // inform our local port that that C++ channel is ready. - { RTT::os::MutexLock lock(channel_list_mtx); - auto cxx_channel = findCXXChannelFromCORBA(channel); - ConnPolicy cxx_policy = toRTT(policy); - return ip->channelReady(cxx_channel, cxx_policy); - } - log(Error) << "Invalid CORBA channel given for port " << reader_port_name << ": could not match it to a local C++ channel." <channelReady(cxx_channel, cxx_policy); } RTT::base::ChannelElementBase::shared_ptr CDataFlowInterface_i::findCXXChannelFromCORBA(RTT::corba::CChannelElement_ptr corba) { @@ -396,16 +371,6 @@ std::string CDataFlowInterface_i::dispatcherNameFromPolicy(RTT::DataFlowInterfac return policy.name_id; } -CChannelElement_ptr CDataFlowInterface_i::buildChannelOutputHalf( - const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC (( - CORBA::SystemException - ,::RTT::corba::CNoCorbaTransport - ,::RTT::corba::CNoSuchPortException - )) -{ - return buildChannelOutput(port_name, corba_policy); -} - CChannelElement_ptr CDataFlowInterface_i::buildChannelOutput( const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC (( CORBA::SystemException @@ -602,9 +567,6 @@ CChannelElement_ptr CDataFlowInterface_i::buildChannelInputHalf( // Convert to RTT policy. ConnPolicy policy = toRTT(corba_policy); - // Now create the output-side channel elements. - ChannelElementBase::shared_ptr start = type_info->buildChannelInput(*port); - std::string dispatcherName = dispatcherNameFromPolicy(mdf, policy); // The channel element that exposes our channel in CORBA CRemoteChannelElement_i* this_element = @@ -612,9 +574,12 @@ CChannelElement_ptr CDataFlowInterface_i::buildChannelInputHalf( PortableServer::ServantBase_var servant = this_element; this_element->setCDataFlowInterface(this); - // Attach the corba channel element first (so OOB is after corba). - assert( dynamic_cast(this_element) ); - start->getOutputEndPoint()->setOutput( dynamic_cast(this_element)); + // Now create the output-side channel elements. + ChannelElementBase::shared_ptr channel_corba_output = + dynamic_cast(this_element); + assert(channel_corba_output); + + ChannelElementBase::shared_ptr channel_input_element; /* * This part if for out-of band. (needs to be factored out). @@ -627,34 +592,173 @@ CChannelElement_ptr CDataFlowInterface_i::buildChannelInputHalf( log(Error) << "No such transport registered. Check your corba_policy.transport settings or add the transport for type "<< type_info->getTypeName() <getProtocol(corba_policy.transport)->createStream(port, policy, true); - // if no user supplied name, pass on the new name. - if ( strlen( corba_policy.name_id.in()) == 0 ) - corba_policy.name_id = CORBA::string_dup( policy.name_id.c_str() ); - - if (ceb) { - // OOB is added to end of chain. - start->getOutputEndPoint()->setOutput( ceb ); - log(Info) <<"Sending data from port "<< policy.name_id << " to out-of-band protocol "<< corba_policy.transport <getProtocol(corba_policy.transport)->createStream(port, policy, true); + if (!stream_channel_input) { log(Error) << "The type transporter for type "<getTypeName()<< " failed to create an out-of-band endpoint for port " << port_name<setOutput(stream_channel_input); + log(Info) + <<"Sending data from port "<< policy.name_id + << " to out-of-band protocol "<< corba_policy.transport <buildDataStorage(toRTT(corba_policy)); - start->setOutput(buf); - buf->setOutput( dynamic_cast(this_element) ); + channel_input_element = type_info->buildDataStorage(toRTT(corba_policy)); + channel_input_element->setOutput(channel_corba_output); } // Finally, store our mapping of corba channel elements to C++ channel elements. We need this for channelReady() and removing a channel again. { RTT::os::MutexLock lock(channel_list_mtx); - channel_list.push_back( ChannelList::value_type(this_element->_this(), start->getInputEndPoint())); + channel_list.push_back( + ChannelList::value_type(this_element->_this(), channel_input_element) + ); } return this_element->_this(); } +CORBA::Boolean CDataFlowInterface_i::connectChannelInputHalf(const char* output_port_name, CChannelElement_ptr channel, CConnPolicy const& policy) ACE_THROW_SPEC (( + CORBA::SystemException + ,::RTT::corba::CNoSuchPortException + )) +{ + OutputPortInterface* port = + dynamic_cast(mdf->getPort(output_port_name)); + if (port == 0) + throw corba::CNoSuchPortException(); + + TypeInfo const* type_info = port->getTypeInfo(); + if (!type_info) + throw CNoCorbaTransport(); + + CORBA_CHECK_THREAD(); + + // Attach to our output port: + auto cxx_channel = findCXXChannelFromCORBA(channel); + auto cxx_policy = toRTT(policy); + auto channel_input = cxx_channel->getInputEndPoint(); + + // Now create the output-side channel elements. + ChannelElementBase::shared_ptr start = + type_info->buildChannelInput(*port); + start->setOutput(channel_input); + + auto endpoint = start->getInputEndPoint(); + return endpoint->getPort()->addConnection(new SimpleConnID(), endpoint, cxx_policy); +} + +CChannelElement_ptr CDataFlowInterface_i::buildChannelOutputHalf( + const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC (( + CORBA::SystemException + ,::RTT::corba::CNoCorbaTransport + ,::RTT::corba::CNoSuchPortException + )) +{ + Logger::In in("CDataFlowInterface_i::buildChannelOutput"); + InputPortInterface* port = dynamic_cast(mdf->getPort(port_name)); + if (port == 0) + throw CNoSuchPortException(); + + TypeInfo const* type_info = port->getTypeInfo(); + if (!type_info) + throw CNoCorbaTransport(); + + CorbaTypeTransporter* transporter = + dynamic_cast(type_info->getProtocol(ORO_CORBA_PROTOCOL_ID)); + if (!transporter) + throw CNoCorbaTransport(); + + CORBA_CHECK_THREAD(); + ConnPolicy policy = toRTT(corba_policy); + + CRemoteChannelElement_i* this_element = + transporter->createInputChannelElement_i(mpoa, corba_policy.pull); + this_element->setCDataFlowInterface(this); + RTT::base::ChannelElementBase::shared_ptr channel_corba_input = + dynamic_cast(this_element); + assert(channel_corba_input); + + /* + * This part is for out-of band (needs to be factored out). + */ + if ( corba_policy.transport !=0 && corba_policy.transport != ORO_CORBA_PROTOCOL_ID) { + // prepare out-of-band transport for this port. + // if user supplied name, use that one. + if ( type_info->getProtocol(corba_policy.transport) == 0 ) { + log(Error) << "Could not create out-of-band transport for port "<< port_name << " with transport id " << corba_policy.transport <getTypeName() <getProtocol(corba_policy.transport)->createStream(port, policy, false); + if (!stream_channel_output) { + log(Error) << "The type transporter for type "<getTypeName()<< " failed to create an out-of-band endpoint for port " << port_name<setOutput(stream_channel_output); + ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy)); + stream_channel_output->setOutput(buf); + log(Info) <<"Receiving data for port "<< policy.name_id << " from out-of-band protocol "<< corba_policy.transport <buildDataStorage(toRTT(corba_policy)); + channel_corba_input->setOutput(buf); + } + + this_element->_remove_ref(); + + // store our mapping of corba channel elements to C++ channel elements. We need this for channelReady() and removing a channel again. + { RTT::os::MutexLock lock(channel_list_mtx); + channel_list.push_back( + ChannelList::value_type(this_element->_this(), channel_corba_input) + ); + } + + CRemoteChannelElement_var proxy = this_element->_this(); + return proxy._retn(); +} + +CORBA::Boolean CDataFlowInterface_i::connectChannelOutputHalf(const char* input_port_name, CChannelElement_ptr channel, CConnPolicy const& policy) ACE_THROW_SPEC (( + CORBA::SystemException + ,::RTT::corba::CNoSuchPortException + )) +{ + InputPortInterface* port = + dynamic_cast(mdf->getPort(input_port_name)); + if (port == 0) + throw corba::CNoSuchPortException(); + + TypeInfo const* type_info = port->getTypeInfo(); + if (!type_info) + throw CNoCorbaTransport(); + + CORBA_CHECK_THREAD(); + + // Attach to our output port: + auto cxx_channel = findCXXChannelFromCORBA(channel); + auto cxx_policy = toRTT(policy); + auto channel_output = cxx_channel->getOutputEndPoint(); + ChannelElementBase::shared_ptr end = type_info->buildChannelOutput(*port); + channel_output->setOutput(end); + return port->channelReady(end->getOutputEndPoint(), cxx_policy); +} + ::CORBA::Boolean CDataFlowInterface_i::createLocalConnection( const char* writer_port, CDataFlowInterface_ptr reader_interface, const char* reader_port, CConnPolicy & policy) ACE_THROW_SPEC (( diff --git a/rtt/transports/corba/DataFlowI.h b/rtt/transports/corba/DataFlowI.h index dc32af45c..a0dd067cf 100644 --- a/rtt/transports/corba/DataFlowI.h +++ b/rtt/transports/corba/DataFlowI.h @@ -197,12 +197,6 @@ namespace RTT { RTT::DataFlowInterface* interface, RTT::ConnPolicy const& policy); - CChannelElement_ptr buildChannelOutputHalf( - const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC (( - CORBA::SystemException - ,::RTT::corba::CNoCorbaTransport - ,::RTT::corba::CNoSuchPortException - )); CChannelElement_ptr buildChannelOutput(const char* reader_port, RTT::corba::CConnPolicy& policy) ACE_THROW_SPEC (( CORBA::SystemException ,::RTT::corba::CNoCorbaTransport @@ -213,15 +207,46 @@ namespace RTT { ,::RTT::corba::CNoCorbaTransport ,::RTT::corba::CNoSuchPortException )); - CChannelElement_ptr buildChannelInputHalf(const char* writer_port, RTT::corba::CConnPolicy& policy) ACE_THROW_SPEC (( + + /** Build a channel for the input side of a CORBA channel, + * unconnected to any output port + */ + CChannelElement_ptr buildChannelInputHalf(const char* writer_port, RTT::corba::CConnPolicy& policy) + ACE_THROW_SPEC (( CORBA::SystemException ,::RTT::corba::CNoCorbaTransport ,::RTT::corba::CNoSuchPortException )); - CORBA::Boolean connectChannelInputHalf(const char* output_port_name, CChannelElement_ptr channel, CConnPolicy const& policy) ACE_THROW_SPEC (( - CORBA::SystemException - ,::RTT::corba::CNoSuchPortException + + /** Connect a channel create by buildChannelInputHalf to an output port */ + CORBA::Boolean connectChannelInputHalf( + const char* output_port_name, CChannelElement_ptr channel, + CConnPolicy const& policy + ) ACE_THROW_SPEC (( + CORBA::SystemException + ,::RTT::corba::CNoSuchPortException )); + + /** Build a channel for the output side of a CORBA channel, + * unconnected to any input port + */ + CChannelElement_ptr buildChannelOutputHalf( + const char* port_name, CConnPolicy & corba_policy + ) ACE_THROW_SPEC (( + CORBA::SystemException + ,::RTT::corba::CNoCorbaTransport + ,::RTT::corba::CNoSuchPortException + )); + + /** Connect a channel created by buildChannelOutputHalf to an input port */ + CORBA::Boolean connectChannelOutputHalf( + const char* input_port_name, CChannelElement_ptr channel, + CConnPolicy const& policy + ) ACE_THROW_SPEC (( + CORBA::SystemException + ,::RTT::corba::CNoSuchPortException + )); + ::CORBA::Boolean createLocalConnection( const char* writer_port, CDataFlowInterface_ptr reader_interface, const char* reader_port, CConnPolicy & policy) ACE_THROW_SPEC (( From bd1a350b4485af9df97fe2b7edec0f1b60e1d7ee Mon Sep 17 00:00:00 2001 From: Sylvain Joyeux Date: Mon, 17 Feb 2025 09:49:47 -0300 Subject: [PATCH 4/6] chore: make it explicit which of the channel building calls actually update the policy Policy updating is needed to exfiltrate some information in the OOB transport case (namely, a name that explains what the other side should do to connect, as for instance the MQ name for the MQ transport). Turns out that only the output half is doing so, and the other take the policy as input. Ideally, we would also have cleaned up what information is or is not being passed to the other calls (the connect calls, for instance, really don't need much policy information), but that would be for another PR. --- rtt/transports/corba/DataFlow.idl | 2 +- rtt/transports/corba/DataFlowI.cpp | 7 +------ rtt/transports/corba/DataFlowI.h | 2 +- 3 files changed, 3 insertions(+), 8 deletions(-) diff --git a/rtt/transports/corba/DataFlow.idl b/rtt/transports/corba/DataFlow.idl index d21acf0e0..a71e4dc3b 100644 --- a/rtt/transports/corba/DataFlow.idl +++ b/rtt/transports/corba/DataFlow.idl @@ -195,7 +195,7 @@ module RTT /** * Create the output half of a channel, not yet connected to the input port */ - CChannelElement buildChannelOutputHalf(in string input_port, inout CConnPolicy policy) + CChannelElement buildChannelOutputHalf(in string input_port, in CConnPolicy policy) raises(CNoCorbaTransport,CNoSuchPortException); /** diff --git a/rtt/transports/corba/DataFlowI.cpp b/rtt/transports/corba/DataFlowI.cpp index adcad6537..6140662c6 100644 --- a/rtt/transports/corba/DataFlowI.cpp +++ b/rtt/transports/corba/DataFlowI.cpp @@ -657,7 +657,7 @@ CORBA::Boolean CDataFlowInterface_i::connectChannelInputHalf(const char* output_ } CChannelElement_ptr CDataFlowInterface_i::buildChannelOutputHalf( - const char* port_name, CConnPolicy & corba_policy) ACE_THROW_SPEC (( + const char* port_name, const CConnPolicy & corba_policy) ACE_THROW_SPEC (( CORBA::SystemException ,::RTT::corba::CNoCorbaTransport ,::RTT::corba::CNoSuchPortException @@ -706,11 +706,6 @@ CChannelElement_ptr CDataFlowInterface_i::buildChannelOutputHalf( throw CNoCorbaTransport(); } - // if no user supplied name, pass on the new name. - if ( strlen( corba_policy.name_id.in()) == 0 ) { - corba_policy.name_id = CORBA::string_dup( policy.name_id.c_str() ); - } - // override, insert oob element between corba and endpoint and add a buffer between oob and endpoint. channel_corba_input->setOutput(stream_channel_output); ChannelElementBase::shared_ptr buf = type_info->buildDataStorage(toRTT(corba_policy)); diff --git a/rtt/transports/corba/DataFlowI.h b/rtt/transports/corba/DataFlowI.h index a0dd067cf..8f33725a1 100644 --- a/rtt/transports/corba/DataFlowI.h +++ b/rtt/transports/corba/DataFlowI.h @@ -231,7 +231,7 @@ namespace RTT { * unconnected to any input port */ CChannelElement_ptr buildChannelOutputHalf( - const char* port_name, CConnPolicy & corba_policy + const char* port_name, const CConnPolicy & corba_policy ) ACE_THROW_SPEC (( CORBA::SystemException ,::RTT::corba::CNoCorbaTransport From ea76a95b92680fae0b520ad2da6222f1e23fe12a Mon Sep 17 00:00:00 2001 From: Sylvain Joyeux Date: Mon, 17 Feb 2025 16:09:49 -0300 Subject: [PATCH 5/6] feat: allow to disable dataflow disconnection on destruction The current RTT behaviour is to have destructors explicitly disconnect channels. It's all well and good, but at destruction time things are ... unorderly. Allow to assume that a system manager will handle the cleanup when possible. --- rtt/CMakeLists.txt | 1 + rtt/base/OutputPortInterface.cpp | 3 +++ rtt/internal/ConnectionManager.cpp | 3 +++ rtt/rtt-config.h.in | 1 + 4 files changed, 8 insertions(+) diff --git a/rtt/CMakeLists.txt b/rtt/CMakeLists.txt index 7c3fa5fcc..fe5a17d4e 100644 --- a/rtt/CMakeLists.txt +++ b/rtt/CMakeLists.txt @@ -16,6 +16,7 @@ OPTION(OS_HAVE_STREAMS "Use C++ streams library." ON) OPTION(OS_HAVE_MAIN "Provide main() function in rtt library, which sets up the OS. The user implements ORO_main()." ON) OPTION(ORO_OS_USE_BOOST_THREAD "Use the Boost.Thread library. Currently only the mutex implementation is used." OFF) set(ROCK_USE_SANITIZERS "" CACHE STRING "Which sanitizers to enable during the build (comma separated, compiler specific)") +option(ORO_NO_DISCONNECT_DATAFLOW_ON_DESTRUCTION "Disable disconnecting dataflow on task context or port destruction" OFF) if (ROCK_USE_SANITIZERS) add_compile_options("-fsanitize=${ROCK_USE_SANITIZERS}") diff --git a/rtt/base/OutputPortInterface.cpp b/rtt/base/OutputPortInterface.cpp index 04a2334b2..84969e560 100644 --- a/rtt/base/OutputPortInterface.cpp +++ b/rtt/base/OutputPortInterface.cpp @@ -36,6 +36,7 @@ ***************************************************************************/ +#include "../rtt-config.h" #include "PortInterface.hpp" #include "OutputPortInterface.hpp" #include "InputPortInterface.hpp" @@ -52,7 +53,9 @@ OutputPortInterface::OutputPortInterface(std::string const& name) OutputPortInterface::~OutputPortInterface() { +#ifndef ORO_NO_DISCONNECT_DATAFLOW_ON_DESTRUCTION cmanager.disconnect(); +#endif } /** Returns true if this port is connected */ diff --git a/rtt/internal/ConnectionManager.cpp b/rtt/internal/ConnectionManager.cpp index 58240a3dc..64de2eb55 100644 --- a/rtt/internal/ConnectionManager.cpp +++ b/rtt/internal/ConnectionManager.cpp @@ -43,6 +43,7 @@ * Author: kaltan */ +#include "../rtt-config.h" #include "ConnectionManager.hpp" #include #include @@ -66,7 +67,9 @@ namespace RTT ConnectionManager::~ConnectionManager() { +#ifndef ORO_NO_DISCONNECT_DATAFLOW_ON_DESTRUCTION this->disconnect(); +#endif } /** diff --git a/rtt/rtt-config.h.in b/rtt/rtt-config.h.in index c7b534aac..acd345414 100644 --- a/rtt/rtt-config.h.in +++ b/rtt/rtt-config.h.in @@ -52,6 +52,7 @@ #define RTT_HAS_STATE_CHANGE_HOOK #cmakedefine ORO_DISABLE_PORT_DATA_SCRIPTING +#cmakedefine ORO_NO_DISCONNECT_DATAFLOW_ON_DESTRUCTION // if not defined, show an error #ifndef OROCOS_TARGET From 5931584a2952523c7eec1338e7e8602622764e81 Mon Sep 17 00:00:00 2001 From: Sylvain Joyeux Date: Tue, 18 Feb 2025 14:36:23 -0300 Subject: [PATCH 6/6] feat: implement CRemoteChannelElement::disconnectHalf --- rtt/transports/corba/DataFlow.idl | 8 +++++++ rtt/transports/corba/DataFlowI.cpp | 16 ++++++++++++- rtt/transports/corba/DataFlowI.h | 7 +++++- rtt/transports/corba/RemoteChannelElement.hpp | 23 ++++++++++++++++++- 4 files changed, 51 insertions(+), 3 deletions(-) diff --git a/rtt/transports/corba/DataFlow.idl b/rtt/transports/corba/DataFlow.idl index a71e4dc3b..602fa7a71 100644 --- a/rtt/transports/corba/DataFlow.idl +++ b/rtt/transports/corba/DataFlow.idl @@ -92,6 +92,14 @@ module RTT */ void remoteDisconnect(in boolean writer_to_reader); + /** Disconnect the local channel that includes this element + * + * Unlike disconnect, which calls the remote side internally, this + * method explicitly restricts itself to the local part of the channel + * The caller will be responsible to cleanup the other side. + */ + void disconnectHalf(); + }; /** Emitted when information is requested on a port that does not exist */ diff --git a/rtt/transports/corba/DataFlowI.cpp b/rtt/transports/corba/DataFlowI.cpp index 6140662c6..d0b222393 100644 --- a/rtt/transports/corba/DataFlowI.cpp +++ b/rtt/transports/corba/DataFlowI.cpp @@ -855,5 +855,19 @@ PortableServer::POA_ptr CRemoteChannelElement_i::_default_POA() void CRemoteChannelElement_i::setRemoteSide(CRemoteChannelElement_ptr remote) ACE_THROW_SPEC (( CORBA::SystemException )) -{ this->remote_side = RTT::corba::CRemoteChannelElement::_duplicate(remote); } +{ + RTT::os::MutexLock lock(remote_side_lock); + this->remote_side = RTT::corba::CRemoteChannelElement::_duplicate(remote); +} +CRemoteChannelElement_var CRemoteChannelElement_i::resetRemoteSide() { + RTT::os::MutexLock lock(remote_side_lock); + CRemoteChannelElement_var remote_side = this->remote_side; + this->remote_side = CRemoteChannelElement_var(); + return remote_side; +} + +CRemoteChannelElement_var CRemoteChannelElement_i::getRemoteSide() const { + RTT::os::MutexLock lock(remote_side_lock); + return remote_side; +} diff --git a/rtt/transports/corba/DataFlowI.h b/rtt/transports/corba/DataFlowI.h index 8f33725a1..b71fa9497 100644 --- a/rtt/transports/corba/DataFlowI.h +++ b/rtt/transports/corba/DataFlowI.h @@ -70,12 +70,17 @@ namespace RTT { : public POA_RTT::corba::CRemoteChannelElement , public virtual PortableServer::RefCountServantBase { - protected: + private: + mutable RTT::os::Mutex remote_side_lock; CRemoteChannelElement_var remote_side; + + protected: RTT::corba::CorbaTypeTransporter const& transport; PortableServer::POA_var mpoa; CDataFlowInterface_i* mdataflow; + CRemoteChannelElement_var resetRemoteSide(); + CRemoteChannelElement_var getRemoteSide() const; public: // standard constructor CRemoteChannelElement_i(corba::CorbaTypeTransporter const& transport, diff --git a/rtt/transports/corba/RemoteChannelElement.hpp b/rtt/transports/corba/RemoteChannelElement.hpp index fdb698755..e6d709ed2 100644 --- a/rtt/transports/corba/RemoteChannelElement.hpp +++ b/rtt/transports/corba/RemoteChannelElement.hpp @@ -152,6 +152,7 @@ namespace RTT { { // forward too. base::ChannelElementBase::signal(); + CRemoteChannelElement_var remote_side = getRemoteSide(); // intercept signal if no remote side set. if ( CORBA::is_nil(remote_side.in()) ) return true; @@ -180,6 +181,7 @@ namespace RTT { } void signalRemote() { + CRemoteChannelElement_var remote_side = getRemoteSide(); try { remote_side->remoteSignal(); } #ifdef CORBA_IS_OMNIORB @@ -213,6 +215,8 @@ namespace RTT { void disconnect() ACE_THROW_SPEC (( CORBA::SystemException )) { + CRemoteChannelElement_var remote_side = getRemoteSide(); + // disconnect both local and remote side. // !!!THIS RELIES ON BEHAVIOR OF REMOTEDISCONNECT BELOW doing both writer_to_reader and !writer_to_reader !!! try { @@ -225,10 +229,22 @@ namespace RTT { catch(CORBA::Exception&) {} } + /** + * CORBA IDL function. + */ + void disconnectHalf() ACE_THROW_SPEC (( + CORBA::SystemException + )) { + resetRemoteSide(); + this->remoteDisconnect(true); + } + void remoteDisconnect(bool writer_to_reader) ACE_THROW_SPEC (( CORBA::SystemException )) { + resetRemoteSide(); + base::ChannelElement::disconnect(writer_to_reader); // Because we support out-of-band transports, we must cleanup more thoroughly. @@ -251,6 +267,8 @@ namespace RTT { CORBA::SystemException )) { + CRemoteChannelElement_var remote_side = resetRemoteSide(); + try { if ( ! CORBA::is_nil(remote_side.in()) ) remote_side->remoteDisconnect(writer_to_reader); @@ -279,6 +297,8 @@ namespace RTT { if ( (fs = base::ChannelElement::read(sample, copy_old_data)) ) return fs; + CRemoteChannelElement_var remote_side = getRemoteSide(); + // go through corba CORBA::Any_var remote_value; try @@ -342,6 +362,7 @@ namespace RTT { if (base::ChannelElement::write(sample)) return true; // go through corba + CRemoteChannelElement_var remote_side = getRemoteSide(); assert( remote_side.in() != 0 && "Got write() without remote side. Need buffer OR remote side but neither was present."); try { @@ -421,6 +442,7 @@ namespace RTT { if(base->getOutput()) return RTT::base::ChannelElementBase::getRemoteURI(); + CRemoteChannelElement_var remote_side = getRemoteSide(); std::string uri = ApplicationServer::orb->object_to_string(remote_side); return uri; } @@ -444,4 +466,3 @@ namespace RTT { } #endif -