Enhance execute method to simplify use
jgardona committed May 4, 2023
1 parent 3a7a9a9 commit 16d9b90
Showing 4 changed files with 53 additions and 37 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
target/
Cargo.lock
16 changes: 9 additions & 7 deletions src/lib.rs
@@ -10,25 +10,28 @@
//! ```
//! use workerpool_rs::pool::WorkerPool;
//! use std::sync::mpsc::channel;
//! use std::sync::{Arc, Mutex};
//!
//! let n_workers = 4;
//! let n_jobs = 8;
//! let pool = WorkerPool::new(n_workers);
//!
//! let (tx, rx) = channel();
//! let atx = Arc::new(Mutex::new(tx));
//! for _ in 0..n_jobs {
//! let tx = tx.clone();
//! pool.execute(Box::new(move|| {
//! let atx = atx.clone();
//! pool.execute(move|| {
//! let tx = atx.lock().unwrap();
//! tx.send(1).expect("channel will be there waiting for the pool");
//! }));
//! });
//! }
//!
//! assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8);
//!```
//!
//! ### Synchronized with Barrier
//!```
//!
//! use std::sync::atomic::{AtomicUsize, Ordering};
//! use std::sync::{Arc, Barrier};
//! use workerpool_rs::pool::WorkerPool;
@@ -44,11 +47,11 @@
//! for _ in 0..n_jobs {
//! let barrier = barrier.clone();
//! let an_atomic = an_atomic.clone();
//!
//! pool.execute(Box::new(move|| {
//! // do the heavy work
//! an_atomic.fetch_add(1, Ordering::Relaxed);
//!
//! // then wait for the other threads
//! barrier.wait();
//! }));
@@ -58,6 +61,5 @@
//! assert_eq!(an_atomic.load(Ordering::SeqCst), /* n_jobs = */ 23);
//!```

// Declares the pool module and makes it public.
pub mod pool;
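
The point of this commit is call-site ergonomics: `execute` now accepts any `FnOnce() + Send + Sync + 'static` and boxes it internally, so callers no longer wrap every job in `Box::new`. A minimal sketch of the difference (the `heavy_job` name is illustrative, not part of the crate):

```rust
use workerpool_rs::pool::WorkerPool;
use std::{thread, time::Duration};

// Plain fn items satisfy FnOnce() + Send + Sync + 'static,
// so they can now be handed to execute directly.
fn heavy_job() {
    let _sum: u64 = (0..1_000).sum();
}

fn main() {
    let pool = WorkerPool::new(2);

    // Before this commit every call site had to box its job:
    //     pool.execute(Box::new(|| println!("boxed job")));
    // Now plain closures and fn items both work unboxed:
    pool.execute(|| println!("closure job"));
    pool.execute(heavy_job);

    // The pool never joins its workers, so give the detached
    // threads a moment to drain the queue before main exits.
    thread::sleep(Duration::from_millis(100));
}
```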
53 changes: 30 additions & 23 deletions src/pool.rs
@@ -1,6 +1,6 @@
//! ## Pool
//!
//! With this module, we are able to synchronize channels,
//! start jobs, and wait for workers; many other concurrent
//! tasks are made easy.
@@ -11,13 +11,13 @@ use std::{
};

// Basic types for concurrent tasks
type Job = Box<dyn FnOnce() + Send + 'static>;
type Job = Box<dyn FnOnce() + Send + Sync + 'static>;
type JobReceiver = Arc<Mutex<mpsc::Receiver<Job>>>;
type Handle = thread::JoinHandle<()>;

/// Implements a continuous pool of Rust threads that doesn't stop
/// unless it goes out of scope.
///
pub struct WorkerPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
@@ -37,16 +37,16 @@ impl WorkerPool {
/// let pool = WorkerPool::new(3);
///
/// assert_eq!("workers[] = (id: 0)(id: 1)(id: 2)", pool.to_string());
/// ```
pub fn new(size: usize) -> WorkerPool {
let (tx, rx) = mpsc::channel();
let mut workers = Vec::<Worker>::with_capacity(size);
let rec = Arc::new(Mutex::new(rx));

for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&rec)));
}

WorkerPool {
workers,
sender: tx,
@@ -61,24 +61,31 @@ impl WorkerPool {
/// ```
/// use workerpool_rs::pool::WorkerPool;
/// use std::sync::mpsc;
/// use std::sync::{Arc, Mutex};
///
/// let njobs = 20;
/// let nworkers = 10;
///
/// let pool = WorkerPool::new(nworkers);
/// let (tx, rx) = mpsc::channel();
///
/// let atx = Arc::new(Mutex::new(tx));
///
/// for _ in 0 .. njobs {
/// let txc = tx.clone();
/// pool.execute(Box::new(move || {
/// txc.send(1).unwrap();
/// }));
/// let atx = atx.clone();
/// pool.execute(move || {
/// let tx = atx.lock().unwrap();
/// tx.send(1).unwrap();
/// });
/// }
///
/// let sum = rx.iter().take(njobs).sum();
/// assert_eq!(njobs, sum);
/// ```
pub fn execute(&self, f: Job) {
pub fn execute<J>(&self, f: J)
where
J: FnOnce() + Send + Sync + 'static,
{
let job = Box::new(f);
self.sender.send(job).expect("Can't send job");
}
@@ -97,7 +104,7 @@ impl Display for WorkerPool {
}

// A structure that holds an id and thread handle.
//
// id: usize - An id for worker identification.\
// handle: JoinHandle<()> - a handle that has a working thread.
struct Worker {
@@ -112,18 +119,18 @@ impl Worker {
// handle: JoinHandle<()> - a thread handle.
fn new(id: usize, handle: JobReceiver) -> Worker {
let handle = thread::spawn(move || loop {
let job = match handle
.lock()
.expect("Cant acquire lock")
.recv() {
Ok(data) => data,
Err(_) => continue,
};
let job = match handle.lock().expect("Can't acquire lock").recv() {
Ok(data) => data,
Err(_) => continue,
};

job();
});

Worker { id, _handle: handle }
Worker {
id,
_handle: handle,
}
}
}

@@ -157,10 +164,10 @@ mod unit_tests {
#[test]
fn workerpool_should_execute_job_succeed() {
let pool = WorkerPool::new(1);
for _ in 0 .. 10000 {
pool.execute(Box::new(||{
for _ in 0..10000 {
pool.execute(|| {
let _sum = 3 + 1;
}));
});
}
}
}
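
Note what the added `Sync` in the job bound implies: at the time of this commit, `std::sync::mpsc::Sender` was `Send` but not `Sync` (it only gained a `Sync` impl later, in Rust 1.72), so a closure that owns a `Sender` clone no longer satisfies `execute`. That is why every example and test in this diff shares the sender through an `Arc<Mutex<...>>`. A minimal sketch of the constraint, using a hypothetical `assert_job` stand-in with the same bounds as `execute`:

```rust
use std::sync::{mpsc, Arc, Mutex};

// Stand-in with the same bounds the new execute places on jobs.
fn assert_job<J: FnOnce() + Send + Sync + 'static>(job: J) {
    job();
}

fn main() {
    let (tx, rx) = mpsc::channel();

    // Rejected on toolchains contemporary with this commit,
    // because the closure owns a non-Sync Sender:
    //     assert_job(move || tx.send(1).unwrap());
    //
    // Arc<Mutex<Sender>> is Sync, so each job clones the Arc and
    // locks the sender -- the pattern used throughout this diff.
    let atx = Arc::new(Mutex::new(tx));
    let job_tx = Arc::clone(&atx);
    assert_job(move || {
        job_tx.lock().unwrap().send(1).unwrap();
    });

    assert_eq!(rx.recv().unwrap(), 1);
}
```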
19 changes: 12 additions & 7 deletions tests/integration.rs
@@ -1,5 +1,9 @@
use std::{
sync::atomic::AtomicUsize, sync::atomic::Ordering, sync::mpsc, sync::Arc, sync::Barrier,
sync::atomic::Ordering,
sync::mpsc,
sync::Arc,
sync::Barrier,
sync::{atomic::AtomicUsize, Mutex},
};

use workerpool_rs::pool;
@@ -17,10 +21,10 @@ fn pool_should_sum_atomic_variable() {
for _ in 0..njobs {
let b = barrier.clone();
let atomic = atomic.clone();
pool.execute(Box::new(move || {
pool.execute(move || {
atomic.fetch_add(1, Ordering::Relaxed);
b.wait();
}));
});
}
barrier.wait();
assert_eq!(atomic.load(Ordering::SeqCst), njobs);
@@ -34,12 +38,13 @@ fn pool_should_synchronize_sender_and_receiver_and_fold_results() {
let pool = pool::WorkerPool::new(nworkers);

let (tx, rx) = mpsc::channel();

let atx = Arc::new(Mutex::new(tx));
for _ in 0..njobs {
let tx = tx.clone();
pool.execute(Box::new(move || {
let atx = atx.clone();
pool.execute(move || {
let tx = atx.lock().unwrap();
tx.send(1).expect("channel waiting for pool");
}));
});
}

assert_eq!(rx.iter().take(njobs).fold(0, |a, b| a + b), njobs);
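
Neither `WorkerPool::new` nor `execute` offers a way to wait for outstanding jobs, so both tests use the channel itself as the completion signal: the blocking receiver yields `njobs` messages only after every job has run. The same idiom can be factored into a helper; a sketch under that assumption (`run_and_wait` is a hypothetical name, not crate API):

```rust
use std::sync::{mpsc, Arc, Mutex};
use workerpool_rs::pool::WorkerPool;

// Hypothetical helper: submit njobs jobs and block until each
// has reported back over the channel.
fn run_and_wait(pool: &WorkerPool, njobs: usize) {
    let (tx, rx) = mpsc::channel();
    let atx = Arc::new(Mutex::new(tx));
    for _ in 0..njobs {
        let atx = atx.clone();
        pool.execute(move || {
            // ... real work goes here ...
            atx.lock().unwrap().send(()).unwrap();
        });
    }
    // One unit message per finished job; recv blocks until then.
    for _ in 0..njobs {
        rx.recv().expect("a worker dropped its sender");
    }
}

fn main() {
    let pool = WorkerPool::new(4);
    run_and_wait(&pool, 16);
}
```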
