Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async Workers #446

Draft
wants to merge 101 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
101 commits
Select commit Hold shift + click to select a range
9b9de10
Drafting traits for generic executors
mxgrey Oct 7, 2024
2dba330
Fleshing out interfaces for async execution
mxgrey Oct 7, 2024
f10036d
Beginning to migrate subscriptions to async
mxgrey Oct 8, 2024
b1dda94
Implementing async execution for subscriptions
mxgrey Oct 8, 2024
c31ad7d
Update wait set behavior
mxgrey Oct 9, 2024
cda131d
Reimagining the way wait sets are handled
mxgrey Oct 10, 2024
d28ed70
Finished reworking wait sets -- need to migrate all waitables
mxgrey Oct 10, 2024
abb8367
Finished migrating guard conditions and services
mxgrey Oct 11, 2024
3a851be
Change Waitable from trait to struct
mxgrey Oct 12, 2024
2f91083
Migrate clients to new async implementation
mxgrey Oct 12, 2024
cc5b264
Fleshing out basic executor
mxgrey Oct 13, 2024
06cb0ab
Finished implementing basic executor -- needs testing
mxgrey Oct 13, 2024
40d8746
Add support for waiting on graph events
mxgrey Oct 13, 2024
329cafb
Migrate parameter service tests to new async executor
mxgrey Oct 13, 2024
d15f762
Try async-std timeouts instead of tokio
mxgrey Oct 13, 2024
fe56cc9
Debugging strange client failure to take
mxgrey Oct 13, 2024
8f0f192
Experimenting with only taking from services and clients in the same …
mxgrey Oct 14, 2024
c99f410
Migrate subscription to use shared callback instead of an async task
mxgrey Oct 14, 2024
26b41f8
Remove task modules that are no longer used
mxgrey Oct 14, 2024
500b083
Rename wait module to wait_set
mxgrey Oct 14, 2024
ee15de5
Move the wait_set_runner to the wait_set module
mxgrey Oct 14, 2024
d395f5f
Remove unnecessary debug outputs
mxgrey Oct 14, 2024
5c59592
Big cleanup
mxgrey Oct 14, 2024
0138e33
Update doctests
mxgrey Oct 14, 2024
22d2c84
Remove old comments
mxgrey Oct 14, 2024
29fe10e
Migrate Context, Executor, and Node creation to new API
mxgrey Nov 16, 2024
1b5c187
Update examples
mxgrey Nov 16, 2024
1ec9f10
Fix documentation
mxgrey Nov 19, 2024
0874d8d
Fix formatting
mxgrey Nov 20, 2024
433a348
Migrate to SubscriptionOptions
mxgrey Nov 20, 2024
f12f874
Migrate to PublisherOptions
mxgrey Nov 20, 2024
2c32e20
Migrate to ServiceOptions
mxgrey Nov 20, 2024
6c61c9c
Migrate to ClientOptions
mxgrey Nov 20, 2024
da05361
Enable direct creation of the _Options for all primitive types
mxgrey Nov 20, 2024
bf3d01a
Migrate Node to shared state pattern
mxgrey Nov 20, 2024
bbb5333
Migrate primitives to state pattern
mxgrey Nov 20, 2024
4c2a67b
Fix example formatting
mxgrey Nov 20, 2024
daebaa8
Fix example formatting
mxgrey Nov 20, 2024
2cff0b7
Fix examples
mxgrey Nov 20, 2024
126aaca
Fix docs
mxgrey Nov 20, 2024
012ff2e
Make deadline, liveliness_lease, and lifespan all symmetric
mxgrey Nov 21, 2024
f33e8d5
Add an API to the primitive options builder for avoiding ROS namespac…
mxgrey Nov 21, 2024
98b8ea2
Retrigger CI
mxgrey Nov 21, 2024
6059506
Implicitly cast &str to NodeOptions
mxgrey Nov 23, 2024
259fcb3
Remove debug outputs
mxgrey Nov 23, 2024
e1ceb70
Fix formatting
mxgrey Nov 23, 2024
6f9543c
Basic Timer type compiling :).
agalbachicar Nov 28, 2024
7b42c02
Evaluates the Timer::new() againts different clock types.
agalbachicar Nov 29, 2024
a60d9f3
Implement time_until_next_call
agalbachicar Nov 29, 2024
d233b47
Added cancel behavior
JesusSilvaUtrera Nov 29, 2024
3de2bb9
Merge pull request #1 from JesusSilvaUtrera/jsilva/add_cancel
agalbachicar Nov 29, 2024
189606f
Implement rcl_timer_reset
agalbachicar Nov 29, 2024
59ed7e2
Adds Timer::call().
agalbachicar Nov 29, 2024
9af9dd9
Added timer_period_ns (#2)
JesusSilvaUtrera Nov 29, 2024
e46224f
Adds Timer::is_ready().
agalbachicar Nov 29, 2024
501439d
WIP Timer callback implementation.
agalbachicar Nov 29, 2024
132c9db
Preliminary callback.
agalbachicar Nov 29, 2024
1095351
Added comments to avoid warnings (#3)
JesusSilvaUtrera Nov 29, 2024
965ca22
Integrated the Timer into the WaitSet.
agalbachicar Nov 29, 2024
f503c84
Add create_timer to node (WIP) (#4)
JesusSilvaUtrera Nov 29, 2024
ed78b35
Makes it work with the integration demo.
agalbachicar Nov 29, 2024
214a991
Working E2E timer with node.
agalbachicar Nov 29, 2024
1eb1acc
Format fix.
agalbachicar Nov 29, 2024
91756ca
Fix a TODO for peace of mind.
agalbachicar Nov 29, 2024
fbb8629
Adds an example.
agalbachicar Dec 1, 2024
ab3d63a
Fix format for the example.
agalbachicar Dec 1, 2024
4515e9a
Adds tests, documentation and removes dead code in node.rs.
agalbachicar Dec 1, 2024
85930a3
Fix documentation style in clock.rs.
agalbachicar Dec 1, 2024
1563895
Removes duplicated test in node.rs
agalbachicar Dec 1, 2024
08acef5
Fix warnings while running tests in node.rs.
agalbachicar Dec 1, 2024
a4c1a97
Fix missing documentation in wait.rs.
agalbachicar Dec 1, 2024
655185d
Improvements to timer.
agalbachicar Dec 1, 2024
30a6717
Makes rustdoc pass in the examples.
agalbachicar Dec 1, 2024
37d5e77
Streamline API for Timer
mxgrey Dec 2, 2024
e38144a
Fix lifecycles of Clock and Timer to guarantee safe usage
mxgrey Dec 2, 2024
2d424f0
Fill in documentation
mxgrey Dec 2, 2024
a72ebd6
Fix formatting
mxgrey Dec 2, 2024
9e28ee7
Keep track of time elapsed between timer calls
mxgrey Dec 2, 2024
d5c3e71
Add a test that spins Timers created from a Node
mxgrey Dec 2, 2024
f77d593
Update timer example
mxgrey Dec 2, 2024
464d5d9
Fix documentation
mxgrey Dec 3, 2024
c7c1cba
Rename remove_callback to set_inert
mxgrey Dec 3, 2024
0918476
Merge with latest main
mxgrey Dec 9, 2024
f1c4716
Merge execution structure PR
mxgrey Dec 9, 2024
2a7ff60
Merge options pattern PR
mxgrey Dec 9, 2024
e86707e
Merge in shared state pattern
mxgrey Dec 9, 2024
1f1d826
Refining merge
mxgrey Dec 9, 2024
6d3f7e4
Update examples
mxgrey Dec 9, 2024
8f8dfe9
Beginning implementation of Worker concept
mxgrey Dec 9, 2024
8fed9dd
Reworking Commands API
mxgrey Dec 10, 2024
5ae2a94
Enabling Worker features in Executor
mxgrey Dec 10, 2024
4af6efe
Migrated all primitives to new worker architecture
mxgrey Dec 11, 2024
f8a5803
Fix tests and eliminate (most) warnings
mxgrey Dec 11, 2024
49379f1
Simplify implementation of subscription deduction
mxgrey Dec 11, 2024
3db67ee
Finished initial Worker implementation - needs testing
mxgrey Dec 11, 2024
f761590
Finished and tested basic features of Worker
mxgrey Dec 11, 2024
4a98c8d
Fill in documentation
mxgrey Dec 11, 2024
5cf50fd
Merge in Timer features
mxgrey Dec 12, 2024
7d540c2
Implement timers for Workers
mxgrey Dec 12, 2024
6608192
Remove dbg outputs
mxgrey Dec 12, 2024
073c25e
Add demo for worker feature
mxgrey Dec 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 9 additions & 15 deletions examples/message_demo/src/message_demo.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{convert::TryInto, env, sync::Arc};
use std::convert::TryInto;

use anyhow::{Error, Result};
use rosidl_runtime_rs::{seq, BoundedSequence, Message, Sequence};
Expand Down Expand Up @@ -138,38 +138,32 @@ fn demonstrate_sequences() {
fn demonstrate_pubsub() -> Result<(), Error> {
println!("================== Interoperability demo ==================");
// Demonstrate interoperability between idiomatic and RMW-native message types
let context = rclrs::Context::new(env::args())?;
let node = rclrs::create_node(&context, "message_demo")?;
let mut executor = rclrs::Context::default_from_env()?.create_basic_executor();
let node = executor.create_node("message_demo")?;

let idiomatic_publisher = node.create_publisher::<rclrs_example_msgs::msg::VariousTypes>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
)?;
let direct_publisher = node.create_publisher::<rclrs_example_msgs::msg::rmw::VariousTypes>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
)?;
let idiomatic_publisher =
node.create_publisher::<rclrs_example_msgs::msg::VariousTypes>("topic")?;
let direct_publisher =
node.create_publisher::<rclrs_example_msgs::msg::rmw::VariousTypes>("topic")?;

let _idiomatic_subscription = node
.create_subscription::<rclrs_example_msgs::msg::VariousTypes, _>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
move |_msg: rclrs_example_msgs::msg::VariousTypes| println!("Got idiomatic message!"),
)?;
let _direct_subscription = node
.create_subscription::<rclrs_example_msgs::msg::rmw::VariousTypes, _>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
move |_msg: rclrs_example_msgs::msg::rmw::VariousTypes| {
println!("Got RMW-native message!")
},
)?;
println!("Sending idiomatic message.");
idiomatic_publisher.publish(rclrs_example_msgs::msg::VariousTypes::default())?;
rclrs::spin_once(Arc::clone(&node), None)?;
executor.spin(rclrs::SpinOptions::spin_once())?;
println!("Sending RMW-native message.");
direct_publisher.publish(rclrs_example_msgs::msg::rmw::VariousTypes::default())?;
rclrs::spin_once(Arc::clone(&node), None)?;
executor.spin(rclrs::SpinOptions::spin_once())?;

Ok(())
}
Expand Down
32 changes: 15 additions & 17 deletions examples/minimal_client_service/src/minimal_client.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,32 @@
use std::env;

use anyhow::{Error, Result};
use rclrs::{Context, SpinOptions, Promise};

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let mut executor = Context::default_from_env()?.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_client")?;
let node = executor.create_node("minimal_client")?;

let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;

let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };

println!("Starting client");

while !client.service_is_ready()? {
std::thread::sleep(std::time::Duration::from_millis(10));
}

client.async_send_request_with_callback(
&request,
move |response: example_interfaces::srv::AddTwoInts_Response| {
println!(
"Result of {} + {} is: {}",
request.a, request.b, response.sum
);
},
)?;
let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };

let response: Promise<example_interfaces::srv::AddTwoInts_Response> = client.call(&request).unwrap();

std::thread::sleep(std::time::Duration::from_millis(500));
let promise = executor.commands().run(async move {
let response = response.await.unwrap();
println!(
"Result of {} + {} is: {}",
request.a, request.b, response.sum,
);
});

println!("Waiting for response");
rclrs::spin(node).map_err(|err| err.into())
executor.spin(SpinOptions::new().until_promise_resolved(promise))?;
Ok(())
}
31 changes: 14 additions & 17 deletions examples/minimal_client_service/src/minimal_client_async.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::env;

use anyhow::{Error, Result};
use rclrs::{Context, SpinOptions};

#[tokio::main]
async fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
fn main() -> Result<(), Error> {
let mut executor = Context::default_from_env()?.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_client")?;
let node = executor.create_node("minimal_client")?;

let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;

Expand All @@ -18,18 +16,17 @@ async fn main() -> Result<(), Error> {

let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };

let future = client.call_async(&request);
let promise = client.call_then(
&request,
move |response: example_interfaces::srv::AddTwoInts_Response| {
println!(
"Result of {} + {} is: {}",
request.a, request.b, response.sum,
);
}
).unwrap();

println!("Waiting for response");

let rclrs_spin = tokio::task::spawn_blocking(move || rclrs::spin(node));

let response = future.await?;
println!(
"Result of {} + {} is: {}",
request.a, request.b, response.sum
);

rclrs_spin.await.ok();
executor.spin(SpinOptions::new().until_promise_resolved(promise))?;
Ok(())
}
19 changes: 12 additions & 7 deletions examples/minimal_client_service/src/minimal_service.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,30 @@
use std::env;

use anyhow::{Error, Result};
use rclrs::{Context, ServiceInfo, SpinOptions};

fn handle_service(
_request_header: &rclrs::rmw_request_id_t,
request: example_interfaces::srv::AddTwoInts_Request,
info: ServiceInfo,
) -> example_interfaces::srv::AddTwoInts_Response {
println!("request: {} + {}", request.a, request.b);
let timestamp = info
.received_timestamp
.map(|t| format!(" at [{t:?}]"))
.unwrap_or(String::new());

println!("request{timestamp}: {} + {}", request.a, request.b);
example_interfaces::srv::AddTwoInts_Response {
sum: request.a + request.b,
}
}

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let mut executor = Context::default_from_env()?.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_service")?;
let node = executor.create_node("minimal_service")?;

let _server = node
.create_service::<example_interfaces::srv::AddTwoInts, _>("add_two_ints", handle_service)?;

println!("Starting server");
rclrs::spin(node).map_err(|err| err.into())
executor.spin(SpinOptions::default())?;
Ok(())
}
10 changes: 4 additions & 6 deletions examples/minimal_pub_sub/src/minimal_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use std::env;

use anyhow::{Error, Result};

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let context = rclrs::Context::default_from_env()?;
let executor = context.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_publisher")?;
let node = executor.create_node("minimal_publisher")?;

let publisher =
node.create_publisher::<std_msgs::msg::String>("topic", rclrs::QOS_PROFILE_DEFAULT)?;
let publisher = node.create_publisher::<std_msgs::msg::String>("topic")?;

let mut message = std_msgs::msg::String::default();

Expand Down
22 changes: 12 additions & 10 deletions examples/minimal_pub_sub/src/minimal_subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
use std::env;

use anyhow::{Error, Result};
use std::sync::Mutex;
use rclrs::{Context, SpinOptions};

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;

let node = rclrs::create_node(&context, "minimal_subscriber")?;
let context = Context::default_from_env()?;
let mut executor = context.create_basic_executor();

let mut num_messages: usize = 0;
let node = executor.create_node("minimal_subscriber")?;

let num_messages = Mutex::new(0usize);
let _subscription = node.create_subscription::<std_msgs::msg::String, _>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
move |msg: std_msgs::msg::String| {
num_messages += 1;
let mut num = num_messages.lock().unwrap();
*num += 1;
println!("I heard: '{}'", msg.data);
println!("(Got {} messages so far)", num_messages);
println!("(Got {} messages so far)", num);
},
)?;

rclrs::spin(node).map_err(|err| err.into())
println!("Waiting for messages...");
executor.spin(SpinOptions::default())?;
Ok(())
}
46 changes: 21 additions & 25 deletions examples/minimal_pub_sub/src/minimal_two_nodes.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
use std::{
env,
sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
},
use std::sync::{
atomic::{AtomicU32, Ordering},
Arc, Mutex,
};

use anyhow::{Error, Result};

struct MinimalSubscriber {
num_messages: AtomicU32,
node: Arc<rclrs::Node>,
subscription: Mutex<Option<Arc<rclrs::Subscription<std_msgs::msg::String>>>>,
node: rclrs::Node,
subscription: Mutex<Option<rclrs::Subscription<std_msgs::msg::String>>>,
}

impl MinimalSubscriber {
pub fn new(name: &str, topic: &str) -> Result<Arc<Self>, rclrs::RclrsError> {
let context = rclrs::Context::new(env::args())?;
let node = rclrs::create_node(&context, name)?;
pub fn new(
executor: &rclrs::Executor,
name: &str,
topic: &str,
) -> Result<Arc<Self>, rclrs::RclrsError> {
let node = executor.create_node(name)?;
let minimal_subscriber = Arc::new(MinimalSubscriber {
num_messages: 0.into(),
node,
Expand All @@ -29,7 +29,6 @@ impl MinimalSubscriber {
.node
.create_subscription::<std_msgs::msg::String, _>(
topic,
rclrs::QOS_PROFILE_DEFAULT,
move |msg: std_msgs::msg::String| {
minimal_subscriber_aux.callback(msg);
},
Expand All @@ -50,14 +49,15 @@ impl MinimalSubscriber {
}

fn main() -> Result<(), Error> {
let publisher_context = rclrs::Context::new(env::args())?;
let publisher_node = rclrs::create_node(&publisher_context, "minimal_publisher")?;
let mut executor = rclrs::Context::default_from_env()?.create_basic_executor();
let publisher_node = executor.create_node("minimal_publisher")?;

let subscriber_node_one = MinimalSubscriber::new("minimal_subscriber_one", "topic")?;
let subscriber_node_two = MinimalSubscriber::new("minimal_subscriber_two", "topic")?;
let _subscriber_node_one =
MinimalSubscriber::new(&executor, "minimal_subscriber_one", "topic")?;
let _subscriber_node_two =
MinimalSubscriber::new(&executor, "minimal_subscriber_two", "topic")?;

let publisher = publisher_node
.create_publisher::<std_msgs::msg::String>("topic", rclrs::QOS_PROFILE_DEFAULT)?;
let publisher = publisher_node.create_publisher::<std_msgs::msg::String>("topic")?;

std::thread::spawn(move || -> Result<(), rclrs::RclrsError> {
let mut message = std_msgs::msg::String::default();
Expand All @@ -71,11 +71,7 @@ fn main() -> Result<(), Error> {
}
});

let executor = rclrs::SingleThreadedExecutor::new();

executor.add_node(&publisher_node)?;
executor.add_node(&subscriber_node_one.node)?;
executor.add_node(&subscriber_node_two.node)?;

executor.spin().map_err(|err| err.into())
executor
.spin(rclrs::SpinOptions::default())
.map_err(|err| err.into())
}
10 changes: 4 additions & 6 deletions examples/minimal_pub_sub/src/zero_copy_publisher.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use std::env;

use anyhow::{Error, Result};

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let context = rclrs::Context::default_from_env()?;
let executor = context.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_publisher")?;
let node = executor.create_node("minimal_publisher")?;

let publisher =
node.create_publisher::<std_msgs::msg::rmw::UInt32>("topic", rclrs::QOS_PROFILE_DEFAULT)?;
let publisher = node.create_publisher::<std_msgs::msg::rmw::UInt32>("topic")?;

let mut publish_count: u32 = 1;

Expand Down
21 changes: 11 additions & 10 deletions examples/minimal_pub_sub/src/zero_copy_subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
use std::env;

use anyhow::{Error, Result};
use std::sync::Mutex;
use rclrs::ReadOnlyLoanedMessage;

fn main() -> Result<(), Error> {
let context = rclrs::Context::new(env::args())?;
let mut executor = rclrs::Context::default_from_env()?.create_basic_executor();

let node = rclrs::create_node(&context, "minimal_subscriber")?;
let node = executor.create_node("minimal_subscriber")?;

let mut num_messages: usize = 0;
let num_messages = Mutex::new(0usize);

let _subscription = node.create_subscription::<std_msgs::msg::UInt32, _>(
"topic",
rclrs::QOS_PROFILE_DEFAULT,
move |msg: rclrs::ReadOnlyLoanedMessage<'_, std_msgs::msg::UInt32>| {
num_messages += 1;
move |msg: ReadOnlyLoanedMessage<std_msgs::msg::UInt32>| {
let mut num = num_messages.lock().unwrap();
*num += 1;
println!("I heard: '{}'", msg.data);
println!("(Got {} messages so far)", num_messages);
println!("(Got {} messages so far)", *num);
},
)?;

rclrs::spin(node).map_err(|err| err.into())
executor.spin(rclrs::SpinOptions::default())?;
Ok(())
}
12 changes: 12 additions & 0 deletions examples/rclrs_timer_demo/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "rclrs_timer_demo"
version = "0.1.0"
edition = "2021"

[[bin]]
name="rclrs_timer_demo"
path="src/rclrs_timer_demo.rs"


[dependencies]
rclrs = "*"
Loading
Loading