Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use BaseAction to share common logic #8

Merged
merged 2 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ jobs:
- name: Run unit tests
run: cargo x test --no-capture
shell: bash
- name: Run examples
shell: bash
run: |
cargo run --features="driver" --example time_driver
cargo run --features="tokio,logging" --example schedule_arbitrary_delay
cargo run --features="tokio,logging" --example schedule_at_fixed_rate
cargo run --features="tokio,logging" --example schedule_with_fixed_delay
env:
RUST_LOG: DEBUG

required:
name: Required
Expand Down
21 changes: 16 additions & 5 deletions fastimer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

[package]
name = "fastimer"
version = "0.2.4"
version = "0.3.0"

description = "This crate implements runtime-agnostic driver for async timers and scheduled tasks."

Expand All @@ -36,9 +36,8 @@ tokio = ["tokio-time", "tokio-spawn"]
tokio-spawn = ["dep:tokio", "tokio/rt-multi-thread"]
tokio-time = ["dep:tokio", "tokio/time"]

test = ["tokio", "driver", "logging"]

[dependencies]
if_chain = { version = "1.0.2" }
pin-project = { version = "1.1.7" }

atomic-waker = { version = "1.1.2", optional = true }
Expand All @@ -59,8 +58,20 @@ workspace = true

[[example]]
doc-scrape-examples = true
name = "simple_action"
path = "examples/simple_action.rs"
name = "schedule_arbitrary_delay"
path = "examples/schedule_arbitrary_delay.rs"
required-features = ["tokio", "logging"]

[[example]]
doc-scrape-examples = true
name = "schedule_at_fixed_rate"
path = "examples/schedule_at_fixed_rate.rs"
required-features = ["tokio", "logging"]

[[example]]
doc-scrape-examples = true
name = "schedule_with_fixed_delay"
path = "examples/schedule_with_fixed_delay.rs"
required-features = ["tokio", "logging"]

[[example]]
Expand Down
53 changes: 53 additions & 0 deletions fastimer/examples/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2024 FastLabs Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

use fastimer::tokio::MakeTokioDelay;
use mea::latch::Latch;
use mea::waitgroup::WaitGroup;

#[derive(Debug, Clone)]
pub struct Shutdown {
latch: Arc<Latch>,
wg: WaitGroup,
}

impl Shutdown {
pub fn new() -> Self {
Shutdown {
latch: Arc::new(Latch::new(1)),
wg: WaitGroup::new(),
}
}

pub fn shutdown(&self) {
self.latch.count_down();
}

pub async fn is_shutdown(&self) {
self.latch.wait().await;
}

pub async fn await_shutdown(self) {
self.wg.await;
}
}

pub async fn timeout(fut: impl Future) {
const T: Duration = Duration::from_secs(10);
fastimer::timeout(T, fut, MakeTokioDelay).await.unwrap();
}
75 changes: 75 additions & 0 deletions fastimer/examples/schedule_arbitrary_delay.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2024 FastLabs Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::time::Duration;
use std::time::Instant;

use fastimer::make_instant_from_now;
use fastimer::schedule::ArbitraryDelayAction;
use fastimer::schedule::ArbitraryDelayActionExt;
use fastimer::schedule::BaseAction;
use fastimer::tokio::MakeTokioDelay;
use fastimer::tokio::TokioSpawn;

use crate::common::Shutdown;

mod common;

#[derive(Debug)]
struct TickAction {
count: u32,
shutdown: Shutdown,
}

impl BaseAction for TickAction {
fn name(&self) -> &str {
"tick-arbitrary"
}

fn is_shutdown(&self) -> impl Future<Output = ()> + Send {
self.shutdown.is_shutdown()
}
}

impl ArbitraryDelayAction for TickAction {
async fn run(&mut self) -> Instant {
self.count += 1;
log::info!("tick: {}", self.count);
make_instant_from_now(Duration::from_secs(self.count as u64))
}
}

fn main() {
logforth::stderr().apply();

let shutdown = Shutdown::new();
let tick = TickAction {
count: 0,
shutdown: shutdown.clone(),
};

let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
tick.schedule(
&TokioSpawn::current(),
MakeTokioDelay,
Some(Duration::from_secs(1)),
);

tokio::time::sleep(Duration::from_secs(10)).await;
shutdown.shutdown();
common::timeout(shutdown.await_shutdown()).await;
});
}
81 changes: 81 additions & 0 deletions fastimer/examples/schedule_at_fixed_rate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2024 FastLabs Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::time::Duration;

use fastimer::schedule::BaseAction;
use fastimer::schedule::SimpleAction;
use fastimer::schedule::SimpleActionExt;
use fastimer::tokio::MakeTokioDelay;
use fastimer::tokio::TokioSpawn;

use crate::common::Shutdown;

mod common;

#[derive(Debug)]
struct TickAction {
name: String,
count: u32,
sleep: Duration,
shutdown: Shutdown,
}

impl BaseAction for TickAction {
fn name(&self) -> &str {
&self.name
}

fn is_shutdown(&self) -> impl Future<Output = ()> + Send {
self.shutdown.is_shutdown()
}
}

impl SimpleAction for TickAction {
async fn run(&mut self) {
self.count += 1;
log::info!("[{}] tick start: {}", self.name(), self.count);
tokio::time::sleep(self.sleep).await;
log::info!("[{}] tick end: {}", self.name(), self.count);
}
}

async fn do_schedule_at_fixed_rate(sleep: u64, period: u64) {
let shutdown = Shutdown::new();
let tick = TickAction {
name: format!("fixed-rate-{sleep}/{period}"),
count: 0,
sleep: Duration::from_secs(sleep),
shutdown: shutdown.clone(),
};
tick.schedule_at_fixed_rate(
&TokioSpawn::current(),
MakeTokioDelay,
None,
Duration::from_secs(period),
);
tokio::time::sleep(Duration::from_secs(10)).await;
shutdown.shutdown();
common::timeout(shutdown.await_shutdown()).await;
}

fn main() {
logforth::stderr().apply();

let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(do_schedule_at_fixed_rate(1, 2));
rt.block_on(do_schedule_at_fixed_rate(3, 2));
rt.block_on(do_schedule_at_fixed_rate(5, 2));
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,61 +13,61 @@
// limitations under the License.

use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

use fastimer::schedule::BaseAction;
use fastimer::schedule::SimpleAction;
use fastimer::schedule::SimpleActionExt;
use fastimer::tokio::MakeTokioDelay;
use fastimer::tokio::TokioSpawn;
use mea::latch::Latch;
use mea::waitgroup::WaitGroup;

use crate::common::Shutdown;

mod common;

#[derive(Debug)]
struct TickAction {
count: u32,

latch: Arc<Latch>,
_wg: WaitGroup,
shutdown: Shutdown,
}

impl SimpleAction for TickAction {
impl BaseAction for TickAction {
fn name(&self) -> &str {
"tick"
"tick-fixed-delay"
}

async fn run(&mut self) {
self.count += 1;
println!("tick: {}", self.count);
fn is_shutdown(&self) -> impl Future<Output = ()> + Send {
self.shutdown.is_shutdown()
}
}

fn is_shutdown(&self) -> impl Future<Output = ()> + Send {
self.latch.wait()
impl SimpleAction for TickAction {
async fn run(&mut self) {
self.count += 1;
log::info!("tick: {}", self.count);
}
}

fn main() {
let wg = WaitGroup::new();
let latch = Arc::new(Latch::new(1));
logforth::stderr().apply();

let shutdown = Shutdown::new();
let tick = TickAction {
count: 0,
latch: latch.clone(),
_wg: wg.clone(),
shutdown: shutdown.clone(),
};

let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
tick.schedule_with_fixed_delay(
&TokioSpawn::current(),
MakeTokioDelay,
None,
Some(Duration::from_secs(1)),
Duration::from_secs(1),
);

tokio::time::sleep(Duration::from_secs(5)).await;
latch.count_down();
fastimer::timeout(Duration::from_secs(5), wg, MakeTokioDelay)
.await
.unwrap();
tokio::time::sleep(Duration::from_secs(10)).await;
shutdown.shutdown();
common::timeout(shutdown.await_shutdown()).await;
});
}
Loading