Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execution structure (spin-off of #427) #428

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions examples/message_demo/src/message_demo.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{convert::TryInto, env, sync::Arc};
use std::convert::TryInto;

use anyhow::{Error, Result};
use rosidl_runtime_rs::{seq, BoundedSequence, Message, Sequence};
Expand Down Expand Up @@ -138,8 +138,8 @@ fn demonstrate_sequences() {
fn demonstrate_pubsub() -> Result<(), Error> {
println!("================== Interoperability demo ==================");
// Demonstrate interoperability between idiomatic and RMW-native message types
let context = rclrs::Context::new(env::args())?;
let node = rclrs::create_node(&context, "message_demo")?;
let mut executor = rclrs::Context::default_from_env()?.create_basic_executor();
let node = executor.create_node("message_demo")?;

let idiomatic_publisher = node.create_publisher::<rclrs_example_msgs::msg::VariousTypes>(
"topic",
Expand All @@ -166,10 +166,10 @@ fn demonstrate_pubsub() -> Result<(), Error> {
)?;
println!("Sending idiomatic message.");
idiomatic_publisher.publish(rclrs_example_msgs::msg::VariousTypes::default())?;
rclrs::spin_once(Arc::clone(&node), None)?;
executor.spin(rclrs::SpinOptions::spin_once())?;
println!("Sending RMW-native message.");
direct_publisher.publish(rclrs_example_msgs::msg::rmw::VariousTypes::default())?;
rclrs::spin_once(Arc::clone(&node), None)?;
executor.spin(rclrs::SpinOptions::spin_once())?;

Ok(())
}
Expand Down
10 changes: 5 additions & 5 deletions examples/minimal_client_service/src/minimal_client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use std::env;

use anyhow::{Error, Result};

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let mut executor = rclrs::Context::default_from_env()?.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_client")?;
let node = executor.create_node("minimal_client")?;

let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;

Expand All @@ -30,5 +28,7 @@ fn main() -> Result<(), Error> {
std::thread::sleep(std::time::Duration::from_millis(500));

println!("Waiting for response");
rclrs::spin(node).map_err(|err| err.into())
executor
.spin(rclrs::SpinOptions::default())
.map_err(|err| err.into())
}
9 changes: 4 additions & 5 deletions examples/minimal_client_service/src/minimal_client_async.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::env;

use anyhow::{Error, Result};

#[tokio::main]
async fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let mut executor = rclrs::Context::default_from_env()?.create_basic_executor();

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mxgrey how would a custom executor work here? It'd take a Context as a argument, right? I'd remove the create_basic_executor function and be slightly more verbose (SingleThreadedExecutor::new(&context)) to show how the API works and basically to not make any distinction between the executors.

Copy link
Collaborator Author

@mxgrey mxgrey Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we look several PRs into the future we can see how the basic executor will be created once we have the ability to support custom executors.

The hook for making a custom executor will be the runtime trait which downstream users can implement for their custom struct. Once they've implemented that trait, anyone can use the custom runtime to create an opaque Executor instance. The Executor hides away what kind of runtime is being used so the rest of the application doesn't have to care about it.

I've gone ahead and made a repo to demonstrate what this would look like. In that repo I've made two different custom executors: DoNothingSimple and DoNothingAdvanced. Both are custom executor runtimes which, as their name implies, do nothing. They're not useful as far as executors go, but they can show us how the custom executor API will work. If you clone that repo into a colcon workspace and checkout the worker branch of my fork you'll be able to compile and run the demo.

Simple

The simplest way to create a custom executor is to use the Context::create_executor method as demonstrated here. This will work perfectly fine as long as it's okay for downstream users to directly initialize your custom runtime.

Advanced

There are two reasons you might want to use a more advanced approach for a custom runtime:

  1. You don't want users to be able to directly instantiate your runtime. Maybe your runtime is a singleton or relies on some dependencies that you want to hide as implementation details.
  2. Your runtime needs access to the Context right away during instantiation, and you want to guarantee that the user can't pass the wrong context to you.

To achieve this you can use the extension trait pattern. The repo demonstrates an implementation of this pattern here. Then the pattern gets used here.

This pattern allows you to add new methods to the Context struct from downstream crates. In this case we're adding Context::create_do_nothing_executor which bears an uncanny resemblance to our built-in method Context::create_basic_executor.

All that is to say, the API for custom downstream executors can be fully consistent with this out-of-the-box upstream API of Context::create_basic_executor.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed in the Working Group meeting, I've change this PR to use an extension trait to provide the Context::create_basic_executor method: a75cd24

At the same time I've simplified all the tests and demos to just do use rclrs::*; since that's probably how most users will want to use rclrs anyway.

let node = rclrs::create_node(&context, "minimal_client")?;
let node = executor.create_node("minimal_client")?;

let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;

Expand All @@ -22,7 +20,8 @@ async fn main() -> Result<(), Error> {

println!("Waiting for response");

let rclrs_spin = tokio::task::spawn_blocking(move || rclrs::spin(node));
let rclrs_spin =
tokio::task::spawn_blocking(move || executor.spin(rclrs::SpinOptions::default()));

let response = future.await?;
println!(
Expand Down
10 changes: 5 additions & 5 deletions examples/minimal_client_service/src/minimal_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::env;

use anyhow::{Error, Result};

fn handle_service(
Expand All @@ -13,13 +11,15 @@ fn handle_service(
}

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let mut executor = rclrs::Context::default_from_env()?.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_service")?;
let node = executor.create_node("minimal_service")?;

let _server = node
.create_service::<example_interfaces::srv::AddTwoInts, _>("add_two_ints", handle_service)?;

println!("Starting server");
rclrs::spin(node).map_err(|err| err.into())
executor
.spin(rclrs::SpinOptions::default())
.map_err(|err| err.into())
}
7 changes: 3 additions & 4 deletions examples/minimal_pub_sub/src/minimal_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::env;

use anyhow::{Error, Result};

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let context = rclrs::Context::default_from_env()?;
let executor = context.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_publisher")?;
let node = executor.create_node("minimal_publisher")?;

let publisher =
node.create_publisher::<std_msgs::msg::String>("topic", rclrs::QOS_PROFILE_DEFAULT)?;
Expand Down
11 changes: 6 additions & 5 deletions examples/minimal_pub_sub/src/minimal_subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::env;

use anyhow::{Error, Result};

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let context = rclrs::Context::default_from_env()?;
let mut executor = context.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_subscriber")?;
let node = executor.create_node("minimal_subscriber")?;

let mut num_messages: usize = 0;

Expand All @@ -19,5 +18,7 @@ fn main() -> Result<(), Error> {
},
)?;

rclrs::spin(node).map_err(|err| err.into())
executor
.spin(rclrs::SpinOptions::default())
.map_err(|err| err.into())
}
38 changes: 18 additions & 20 deletions examples/minimal_pub_sub/src/minimal_two_nodes.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::{
env,
sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
},
use std::sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
};

use anyhow::{Error, Result};
Expand All @@ -15,9 +12,12 @@ struct MinimalSubscriber {
}

impl MinimalSubscriber {
pub fn new(name: &str, topic: &str) -> Result<Arc<Self>, rclrs::RclrsError> {
let context = rclrs::Context::new(env::args())?;
let node = rclrs::create_node(&context, name)?;
pub fn new(
executor: &rclrs::Executor,
name: &str,
topic: &str,
) -> Result<Arc<Self>, rclrs::RclrsError> {
let node = executor.create_node(name)?;
let minimal_subscriber = Arc::new(MinimalSubscriber {
num_messages: 0.into(),
node,
Expand Down Expand Up @@ -50,11 +50,13 @@ impl MinimalSubscriber {
}

fn main() -> Result<(), Error> {
let publisher_context = rclrs::Context::new(env::args())?;
let publisher_node = rclrs::create_node(&publisher_context, "minimal_publisher")?;
let mut executor = rclrs::Context::default_from_env()?.create_basic_executor();
let publisher_node = executor.create_node("minimal_publisher")?;

let subscriber_node_one = MinimalSubscriber::new("minimal_subscriber_one", "topic")?;
let subscriber_node_two = MinimalSubscriber::new("minimal_subscriber_two", "topic")?;
let _subscriber_node_one =
MinimalSubscriber::new(&executor, "minimal_subscriber_one", "topic")?;
let _subscriber_node_two =
MinimalSubscriber::new(&executor, "minimal_subscriber_two", "topic")?;

let publisher = publisher_node
.create_publisher::<std_msgs::msg::String>("topic", rclrs::QOS_PROFILE_DEFAULT)?;
Expand All @@ -71,11 +73,7 @@ fn main() -> Result<(), Error> {
}
});

let executor = rclrs::SingleThreadedExecutor::new();

executor.add_node(&publisher_node)?;
executor.add_node(&subscriber_node_one.node)?;
executor.add_node(&subscriber_node_two.node)?;

executor.spin().map_err(|err| err.into())
executor
.spin(rclrs::SpinOptions::default())
.map_err(|err| err.into())
}
7 changes: 3 additions & 4 deletions examples/minimal_pub_sub/src/zero_copy_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::env;

use anyhow::{Error, Result};

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let context = rclrs::Context::default_from_env()?;
let executor = context.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_publisher")?;
let node = executor.create_node("minimal_publisher")?;

let publisher =
node.create_publisher::<std_msgs::msg::rmw::UInt32>("topic", rclrs::QOS_PROFILE_DEFAULT)?;
Expand Down
10 changes: 5 additions & 5 deletions examples/minimal_pub_sub/src/zero_copy_subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use std::env;

use anyhow::{Error, Result};

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let mut executor = rclrs::Context::default_from_env()?.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_subscriber")?;
let node = executor.create_node("minimal_subscriber")?;

let mut num_messages: usize = 0;

Expand All @@ -19,5 +17,7 @@ fn main() -> Result<(), Error> {
},
)?;

rclrs::spin(node).map_err(|err| err.into())
executor
.spin(rclrs::SpinOptions::default())
.map_err(|err| err.into())
}
27 changes: 14 additions & 13 deletions examples/rust_pubsub/src/simple_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,37 @@
use rclrs::{create_node, Context, Node, Publisher, RclrsError, QOS_PROFILE_DEFAULT};
use rclrs::{Context, Executor, Publisher, RclrsError, SpinOptions, QOS_PROFILE_DEFAULT};
use std::{sync::Arc, thread, time::Duration};
use std_msgs::msg::String as StringMsg;
struct SimplePublisherNode {
node: Arc<Node>,
_publisher: Arc<Publisher<StringMsg>>,

struct SimplePublisher {
publisher: Arc<Publisher<StringMsg>>,
}
impl SimplePublisherNode {
fn new(context: &Context) -> Result<Self, RclrsError> {
let node = create_node(context, "simple_publisher").unwrap();
let _publisher = node

impl SimplePublisher {
fn new(executor: &Executor) -> Result<Self, RclrsError> {
let node = executor.create_node("simple_publisher").unwrap();
let publisher = node
.create_publisher("publish_hello", QOS_PROFILE_DEFAULT)
.unwrap();
Ok(Self { node, _publisher })
Ok(Self { publisher })
}

fn publish_data(&self, increment: i32) -> Result<i32, RclrsError> {
let msg: StringMsg = StringMsg {
data: format!("Hello World {}", increment),
};
self._publisher.publish(msg).unwrap();
self.publisher.publish(msg).unwrap();
Ok(increment + 1_i32)
}
}

fn main() -> Result<(), RclrsError> {
let context = Context::new(std::env::args()).unwrap();
let publisher = Arc::new(SimplePublisherNode::new(&context).unwrap());
let mut executor = Context::default_from_env().unwrap().create_basic_executor();
let publisher = Arc::new(SimplePublisher::new(&executor).unwrap());
let publisher_other_thread = Arc::clone(&publisher);
let mut count: i32 = 0;
thread::spawn(move || loop {
thread::sleep(Duration::from_millis(1000));
count = publisher_other_thread.publish_data(count).unwrap();
});
rclrs::spin(publisher.node.clone())
executor.spin(SpinOptions::default())
}
22 changes: 9 additions & 13 deletions examples/rust_pubsub/src/simple_subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use rclrs::{create_node, Context, Node, RclrsError, Subscription, QOS_PROFILE_DEFAULT};
use rclrs::{Context, Executor, RclrsError, SpinOptions, Subscription, QOS_PROFILE_DEFAULT};
use std::{
env,
sync::{Arc, Mutex},
thread,
time::Duration,
};
use std_msgs::msg::String as StringMsg;

pub struct SimpleSubscriptionNode {
node: Arc<Node>,
_subscriber: Arc<Subscription<StringMsg>>,
data: Arc<Mutex<Option<StringMsg>>>,
}

impl SimpleSubscriptionNode {
fn new(context: &Context) -> Result<Self, RclrsError> {
let node = create_node(context, "simple_subscription").unwrap();
fn new(executor: &Executor) -> Result<Self, RclrsError> {
let node = executor.create_node("simple_subscription").unwrap();
let data: Arc<Mutex<Option<StringMsg>>> = Arc::new(Mutex::new(None));
let data_mut: Arc<Mutex<Option<StringMsg>>> = Arc::clone(&data);
let _subscriber = node
Expand All @@ -25,11 +25,7 @@ impl SimpleSubscriptionNode {
},
)
.unwrap();
Ok(Self {
node,
_subscriber,
data,
})
Ok(Self { _subscriber, data })
}
fn data_callback(&self) -> Result<(), RclrsError> {
if let Some(data) = self.data.lock().unwrap().as_ref() {
Expand All @@ -41,12 +37,12 @@ impl SimpleSubscriptionNode {
}
}
fn main() -> Result<(), RclrsError> {
let context = Context::new(env::args()).unwrap();
let subscription = Arc::new(SimpleSubscriptionNode::new(&context).unwrap());
let mut executor = Context::default_from_env().unwrap().create_basic_executor();
let subscription = Arc::new(SimpleSubscriptionNode::new(&executor).unwrap());
let subscription_other_thread = Arc::clone(&subscription);
thread::spawn(move || loop {
thread::sleep(Duration::from_millis(1000));
subscription_other_thread.data_callback().unwrap()
});
rclrs::spin(subscription.node.clone())
executor.spin(SpinOptions::default())
}
Loading
Loading