BlockingQueue<T> library
/////////////////////////////////////////////////
// rust_blocking_queue::lib.rs - BlockingQueue //
// //
// Jim Fawcett, https://JimFawcett.github.io //
/////////////////////////////////////////////////
/*
This is a BlockingQueue abstraction. To be shared
between threads and modified through a shared reference,
without using unsafe code, an abstraction must keep its
state in thread-safe (Sync) types such as Mutexes and
Condvars, or in structs or tuples with only such members.
That means that the blocking queue must hold its
native queue in a Mutex, as shown below.
There is another alternative, based on Rust
channels, which are essentially blocking queues.
*/
#![allow(dead_code)]
use std::sync::*;
use std::collections::*;
/// Thread-safe FIFO queue whose `de_q` blocks while the queue is empty.
///
/// The native `VecDeque` is held in a `Mutex`; the `Condvar` is used to
/// wake a consumer blocked in `de_q` each time `en_q` adds an element.
#[derive(Debug)]
pub struct BlockingQueue<T> {
    q: Mutex<VecDeque<T>>,
    cv: Condvar,
}

impl<T> Default for BlockingQueue<T> {
    /// Create empty blocking queue (same as `BlockingQueue::new`).
    ///
    /// Implemented by hand so that `T` is not required to be `Default`.
    fn default() -> Self {
        Self::new()
    }
}

impl<T> BlockingQueue<T> {
    /// Create empty blocking queue
    pub fn new() -> Self {
        Self {
            q: Mutex::new(VecDeque::new()),
            cv: Condvar::new(),
        }
    }

    /// Push `t` on the back of the queue and notify one waiting consumer.
    ///
    /// # Panics
    /// Panics if the queue's mutex is poisoned; that is treated as
    /// unrecoverable, so the lock result is simply unwrapped.
    pub fn en_q(&self, t: T) {
        let mut lq = self.q.lock().unwrap();
        lq.push_back(t);
        self.cv.notify_one();
    }

    /// Pop and return the element at the front of the queue, blocking the
    /// calling thread while the queue is empty.
    ///
    /// # Panics
    /// Panics if the queue's mutex is poisoned, either when locking or
    /// when re-acquiring the lock after waiting on the condition variable.
    pub fn de_q(&self) -> T {
        let guard = self.q.lock().unwrap();
        // wait_while re-checks the predicate after every wakeup, so
        // spurious wakeups never let an empty queue through.
        let mut lq = self.cv.wait_while(guard, |q| q.is_empty()).unwrap();
        lq.pop_front()
            .expect("queue cannot be empty once wait_while has returned")
    }

    /// Return number of elements in queue.
    ///
    /// # Panics
    /// Panics if the queue's mutex is poisoned.
    pub fn len(&self) -> usize {
        self.q.lock().unwrap().len()
    }

    /// Return `true` when the queue currently holds no elements.
    ///
    /// # Panics
    /// Panics if the queue's mutex is poisoned.
    pub fn is_empty(&self) -> bool {
        self.q.lock().unwrap().is_empty()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an `f64` queue pre-loaded with `values`, in order.
    fn queue_holding(values: &[f64]) -> BlockingQueue<f64> {
        let queue: BlockingQueue<f64> = BlockingQueue::new();
        for &value in values {
            queue.en_q(value);
        }
        queue
    }

    /// A freshly created queue reports a length of zero.
    #[test]
    fn bq_len() {
        let queue = queue_holding(&[]);
        assert_eq!(queue.len(), 0);
    }

    /// Enqueuing one value makes the length one.
    #[test]
    fn bq_en_queue() {
        let queue = queue_holding(&[3.5]);
        assert_eq!(queue.len(), 1);
    }

    /// Dequeuing returns the enqueued value and leaves the queue empty.
    #[test]
    fn bq_de_queue() {
        let queue = queue_holding(&[3.5]);
        let front = queue.de_q();
        assert_eq!(front, 3.5);
        assert_eq!(queue.len(), 0);
    }
}
Using Code
////////////////////////////////////////////////
// rust_blocking_queue::test1.rs //
// - demo blocking queue //
// //
// Jim Fawcett, https://JimFawcett.github.io //
////////////////////////////////////////////////
/*
This is a good demonstration of the way
BlockingQueue will be used in applications.
*/
use std::io::*;
use std::sync::*;
use std::thread;
use rust_blocking_queue::{BlockingQueue};
use std::time::Duration;
/*-- entry point: print banner, run the demo, print sign-off --*/
fn main() {
    let banner = [
        "\n Demonstrate queue shared between threads",
        "\n ==========================================",
    ];
    for line in banner.iter() {
        print!("{}", line);
    }
    test();
    print!("\n\n That's all Folks!\n");
}
/*-- simple test of BlockingQueue --*/
/*
  The main thread enqueues five numbered messages followed by
  "quit"; a child thread dequeues and prints each message and
  stops when it dequeues "quit". The queue length is printed
  after the child thread has been joined.
*/
fn test() {
    /*-- best-effort flush so each print! is pushed out right away --*/
    fn flush() {
        let _ = std::io::stdout().flush();
    }

    let share = Arc::new(BlockingQueue::<String>::new());
    let share1 = Arc::clone(&share);

    /*-- child thread dequeues messages --*/
    let handle = thread::spawn(move || {
        print!("\n child thread started");
        flush();
        let dur = Duration::from_millis(50);
        loop {
            let t = share1.de_q();
            print!("\n dequeued {} on child thread", t);
            flush();
            if t == "quit" {
                break;
            }
            thread::sleep(dur);
        }
        print!("\n thread shutting down");
        flush();
    });

    /*-- main thread enqueues messages --*/
    let dur = Duration::from_millis(20);
    for i in 0..5 {
        let msg = format!("msg #{}", i);
        print!("\n enqueued {:?} on main thread", msg);
        flush();
        share.en_q(msg);
        thread::sleep(dur);
    }

    /*-- shut down child thread --*/
    print!("\n enqueued {:?} on main thread", "quit");
    flush();
    share.en_q("quit".to_string());

    /*-- child thread must complete before exiting --*/
    print!("\n waiting for child thread to stop");
    flush();
    /*-- join only fails if the child panicked; report it, don't hide it --*/
    if handle.join().is_err() {
        eprint!("\n child thread panicked");
    }
    print!("\n queue length = {}", share.len());
}
Output
Demonstrate queue shared between threads
==========================================
enqueued "msg #0" on main thread
child thread started
dequeued msg #0 on child thread
enqueued "msg #1" on main thread
dequeued msg #1 on child thread
enqueued "msg #2" on main thread
enqueued "msg #3" on main thread
dequeued msg #2 on child thread
enqueued "msg #4" on main thread
enqueued "quit" on main thread
waiting for child thread to stop
dequeued msg #3 on child thread
dequeued msg #4 on child thread
dequeued quit on child thread
thread shutting down
queue length = 0
That's all Folks!