
Rust Bite - Threads

spawning and using Rust threads

1.0  Introduction

Rust guarantees data safety - it is impossible to read from or write to memory that the current program scope does not own. To ensure data safety Rust maintains an invariant:
"no simultaneous shared mutability through references"
This is nearly equivalent to the condition: any given program scope may hold any number of immutable references to a data item, or a single mutable reference, but not both at the same time.
This condition is sufficient for data safety, but not necessary (see RustBites Safety). That means that some safe code will fail the static tests and so fail to build. The Rust compiler checks the invariant at compile time through static analysis, but it can only partially analyze the "simultaneous" part, using non-lexical lifetime analysis. For applications that need to share data between threads, Rust defers checking of those accesses to run-time. It does that with types that internally use UnsafeCell<T>, a type that wraps an instance of T and hands it out through a raw pointer, so mutation happens inside unsafe blocks in the wrapping types. We examine partial code for this type and for two types that use it, and show how that enables building safe code that would otherwise fail to build.

1.1  Interior Mutability

The types that wrap UnsafeCell<T>, e.g., Cell<T> and RefCell<T>, provide safe interfaces that defer borrow checking to run time.
Fig 1. Supporting Interior Mutability
Calling methods on UnsafeCell<T> is unsafe, but the types Cell<T> and RefCell<T> provide safe interfaces by calling into UnsafeCell<T> in ways that are verifiably safe. The purpose of UnsafeCell<T> is to support mutation from methods with immutable interfaces. That is, at compile-time mutability is hidden from the static checker, so code that is sound but does not satisfy the static "no shared mutability" invariant will still build. Borrow checking must then be enforced at run-time, and that is the purpose of RefCell<T>. The Cell<T> type has a pass-and-return-by-value interface, so no run-time checks are needed.

Before diving into code for UnsafeCell<T> and its Cell<T> and RefCell<T> users, let's look at a simple demonstration of the latter. The first panel below presents code that doesn't use interior mutability. It fails to build if the last print! is uncommented, because that violates the rule against using an immutable reference while a mutable reference is in scope. The second panel uses RefCell to support interior mutability. The code builds and works as expected. But if we comment out the drop(rx) statement we get a run-time panic, which shows that the run-time checking is working.

This behavior is just enough to allow threads to share and mutate a common resource. That works because, due to locking, the shares are isolated in the scope of a lock.
No Interior Mutability

/*-----------------------------------------------------
  Illustrates that perfectly safe code can fail to
  compile.  The safety invariant is very strong.
-----------------------------------------------------*/
fn may_fail_to_build() {
    print!("\n  -- no interior mutability --");
    let mut x = 42i32;
    print!("\n  x = {}", x);

    print!("\n  - immutable reference -");
    let rx = &x;
    print!("\n  *rx = {}", rx);

    print!("\n  - mutable reference -");
    let mrx = &mut x;
    *mrx += 1;
    // drop(mrx); won't save compilation - not used by borrow chkr
    print!("\n  *mrx = {}", mrx);

    // error: attempt to use immutable ref with mutable ref in scope
    // causes compile failure
    // print!("\n  *rx = {}", rx);
    println!();
    // final observation: error is not due to declaration
    // it's due to use.
}

fn main() {
    may_fail_to_build();
}

Output

 -- no interior mutability --
 x = 42
 - immutable reference -
 *rx = 42
 - mutable reference -
 *mrx = 43

Interior Mutability

use std::cell::RefCell;

/*-----------------------------------------------------
  Illustrates that code violating the safety invariant
  can compile with the help of interior mutability.
  - you still can't break the invariant rule, due to
    run-time checks.  However, using locks avoids
    run-time failure (not illustrated here, but later).
-----------------------------------------------------*/
fn may_fail_at_runtime() {
    print!("\n  -- interior mutability --");
    let x = RefCell::new(42i32);
    print!("\n  x = {}", x.into_inner());
    // RefCell is not Copy, so x is now invalid

    print!("\n  - immutable reference -");
    let x = RefCell::new(42i32);
    let rx = x.borrow();
    print!("\n  *rx = {}", rx);
    // commenting out drop causes run-time panic
    drop(rx);

    print!("\n  - mutable reference -");
    *x.borrow_mut() += 1;  // mutable ref ends here
    print!("\n  *mrx = {}", x.borrow());
}

fn main() {
    may_fail_at_runtime();
}

Output

 -- interior mutability --
 x = 42
 - immutable reference -
 *rx = 42
 - mutable reference -
 *mrx = 43
code in playground

So, you say to yourself: "we've moved failure from compile-time to run-time. That's good??". The answer is yes, that's good, because now, as shown in Section 3.0, the use of locks avoids run-time failure. Without interior mutability we couldn't get there. Now, let's see how interior mutability is molded out of the std::cell components. All the important stuff happens in these RefCell<T> functions:
- fn borrow_mut(&self) -> RefMut<'_, T>
- fn deref_mut(&mut self) -> &mut T.
The signature for borrow_mut takes &self, an immutable reference, yet we get back a RefMut wrapper that can be implicitly dereferenced to yield a mutable reference! This "lie" satisfies the borrow checker and lets our code build. But now RefCell is obligated to make the lie safe by checking the borrowing invariants at run-time. If you look closely you will see some of that checking in the details section below this block.
RefCell<T> - Excerpts from std::cell code

pub struct RefCell<T: ?Sized> {
    borrow: Cell<BorrowFlag>,
    value: UnsafeCell<T>,
}

type BorrowFlag = isize;
const UNUSED: BorrowFlag = 0;

impl<T> RefCell<T> {
    // elided code

    /// Mutably borrows the wrapped value.
    ///
    /// Borrow lasts until returned `RefMut` or all `RefMut`s
    /// derived from it exit scope.  Value cannot be borrowed
    /// while this borrow is active.
    ///
    /// # Panics
    ///
    /// Panics if value is currently borrowed.  For non-panicking
    /// variant, use [`try_borrow_mut`](#method.try_borrow_mut).
    ///
    /*--------------------------------------------------------
      Here is the core of interior mutability - Part 1
    --------------------------------------------------------*/
    pub fn borrow_mut(&self) -> RefMut<'_, T> {
        self.try_borrow_mut().expect("already borrowed")
    }

    /// Mutably borrows wrapped value, returning error if value
    /// is currently borrowed.
    ///
    /// Borrow lasts until returned `RefMut` or all `RefMut`s
    /// derived from it exit scope.  Value cannot be borrowed
    /// while this borrow is active.
    ///
    pub fn try_borrow_mut(&self) -> Result<RefMut<'_, T>, BorrowMutError> {
        match BorrowRefMut::new(&self.borrow) {
            // SAFETY: `BorrowRef` guarantees unique access.
            Some(b) => Ok(
                RefMut { value: unsafe { &mut *self.value.get() }, borrow: b }
            ),
            None => Err(BorrowMutError { _private: () }),
        }
    }
    // elided code
}

/*--------------------------------------------------------
  Here is the core of interior mutability - Part 2
--------------------------------------------------------*/
impl<T: ?Sized> DerefMut for RefMut<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        self.value
    }
}
The code block above shows how "interior mutability" is implemented. Excerpts of more of the std::cell library code are shown in the details below.
Larger Code Excerpts for std::cell types

If you are interested you can see all the details in the std::cell::RefCell library documentation.
UnsafeCell<T> - partial definition

pub struct UnsafeCell<T: ?Sized> {
    value: T,
}

impl<T: ?Sized> !Sync for UnsafeCell<T> {}

impl<T> UnsafeCell<T> {
    pub const fn new(value: T) -> UnsafeCell<T> {
        UnsafeCell { value }
    }
    pub const fn into_inner(self) -> T {
        self.value
    }
}

impl<T: ?Sized> UnsafeCell<T> {
    /// immutable interface yields mutable pointer !!
    pub const fn get(&self) -> *mut T {
        // We can just cast pointer from `UnsafeCell<T>` to `T`
        self as *const UnsafeCell<T> as *const T as *mut T
    }
    /// This call borrows the `UnsafeCell` mutably
    /// (at compile-time) which guarantees that we
    /// possess the only reference.
    pub fn get_mut(&mut self) -> &mut T {
        &mut self.value
    }
    pub const fn raw_get(this: *const Self) -> *mut T {
        // We can just cast pointer from `UnsafeCell<T>` to `T`
        this as *const T as *mut T
    }
}

Cell<T> - Partial Definition

pub struct Cell<T: ?Sized> {
    value: UnsafeCell<T>,
}

unsafe impl<T: ?Sized> Send for Cell<T> where T: Send {}
impl<T: ?Sized> !Sync for Cell<T> {}

impl<T: Copy> Clone for Cell<T> {
    fn clone(&self) -> Cell<T> {
        Cell::new(self.get())
    }
}

impl<T: Default> Default for Cell<T> {
    /// Creates a `Cell<T>`, with the `Default` value for T.
    fn default() -> Cell<T> {
        Cell::new(Default::default())
    }
}

impl<T> From<T> for Cell<T> {
    fn from(t: T) -> Cell<T> {
        Cell::new(t)
    }
}

impl<T> Cell<T> {
    /// Creates a new `Cell` containing the given value.
    pub const fn new(value: T) -> Cell<T> {
        Cell { value: UnsafeCell::new(value) }
    }
    /// Sets the contained value.
    pub fn set(&self, val: T) {
        let old = self.replace(val);
        drop(old);
    }
    /// Swaps the values of two Cells.
    /// Difference with `std::mem::swap` is that this
    /// function doesn't require `&mut` reference.
    pub fn swap(&self, other: &Self) {
        if ptr::eq(self, other) {
            return;
        }
        // SAFETY: This can be risky if called from
        // separate threads, but `Cell` is `!Sync`
        // so this won't happen.  This also won't
        // invalidate any pointers since `Cell` makes
        // sure nothing else will be pointing into
        // either of these `Cell`s.
        unsafe {
            ptr::swap(self.value.get(), other.value.get());
        }
    }
    /// Replaces the contained value, and returns it.
    pub fn replace(&self, val: T) -> T {
        // SAFETY: This can cause data races if called
        // from a separate thread,
        // but `Cell` is `!Sync` so this won't happen.
        mem::replace(unsafe { &mut *self.value.get() }, val)
    }
    /// Unwraps the value.
    pub const fn into_inner(self) -> T {
        self.value.into_inner()
    }
}

impl<T: Copy> Cell<T> {
    /// Returns a copy of the contained value.
    pub fn get(&self) -> T {
        // SAFETY: This can cause data races if called
        // from a separate thread,
        // but `Cell` is `!Sync` so this won't happen.
        unsafe { *self.value.get() }
    }
}

impl<T: ?Sized> Cell<T> {
    /// Returns raw pointer to underlying data in cell.
    pub const fn as_ptr(&self) -> *mut T {
        self.value.get()
    }
    /// Returns mutable reference to underlying data.
    ///
    /// Call borrows `Cell` mutably (at compile-time)
    /// which guarantees that we possess the only
    /// reference.
    pub fn get_mut(&mut self) -> &mut T {
        self.value.get_mut()
    }
    /// Returns a `&Cell<T>` from a `&mut T`
    pub fn from_mut(t: &mut T) -> &Cell<T> {
        // SAFETY: `&mut` ensures unique access.
        unsafe { &*(t as *mut T as *const Cell<T>) }
    }
}

impl<T: Default> Cell<T> {
    /// Takes value of cell, leaving `Default::default()`
    /// in its place.
    pub fn take(&self) -> T {
        self.replace(Default::default())
    }
}

impl<T> Cell<[T]> {
    /// Returns a `&[Cell<T>]` from a `&Cell<[T]>`
    pub fn as_slice_of_cells(&self) -> &[Cell<T>] {
        // SAFETY: `Cell<T>` has same memory layout as `T`.
        unsafe { &*(self as *const Cell<[T]> as *const [Cell<T>]) }
    }
}

RefCell<T> - partial definition

pub struct RefCell<T: ?Sized> {
    borrow: Cell<BorrowFlag>,
    value: UnsafeCell<T>,
}

/// An error returned by [`RefCell::try_borrow`].
pub struct BorrowError {
    _private: (),
}

/// An error returned by [`RefCell::try_borrow_mut`].
pub struct BorrowMutError {
    _private: (),
}

type BorrowFlag = isize;
const UNUSED: BorrowFlag = 0;

impl<T> RefCell<T> {
    pub const fn new(value: T) -> RefCell<T> {
        RefCell {
            value: UnsafeCell::new(value),
            borrow: Cell::new(UNUSED),
        }
    }
    pub const fn into_inner(self) -> T {
        // Since function takes `self` (the `RefCell`) by value,
        // compiler statically verifies that it is not currently
        // borrowed.
        self.value.into_inner()
    }
    pub fn replace(&self, t: T) -> T {
        mem::replace(&mut *self.borrow_mut(), t)
    }
    pub fn swap(&self, other: &Self) {
        mem::swap(
            &mut *self.borrow_mut(),
            &mut *other.borrow_mut()
        )
    }
}

impl<T: ?Sized> RefCell<T> {
    /// Immutably borrows the wrapped value.
    ///
    /// Borrow lasts until returned `Ref` exits scope.
    /// Multiple immutable borrows can be taken out at
    /// the same time.
    ///
    /// # Panics
    ///
    /// Panics if value is currently mutably borrowed.
    /// For a non-panicking variant, use
    /// [`try_borrow`](#method.try_borrow).
    ///
    pub fn borrow(&self) -> Ref<'_, T> {
        self.try_borrow()
            .expect("already mutably borrowed")
    }
    /// Immutably borrows wrapped value, returning error
    /// if value is currently mutably borrowed.
    ///
    /// Borrow lasts until returned `Ref` exits scope.
    /// Multiple immutable borrows can be taken out at
    /// the same time.
    ///
    pub fn try_borrow(&self) -> Result<Ref<'_, T>, BorrowError> {
        match BorrowRef::new(&self.borrow) {
            // SAFETY:
            // `BorrowRef` ensures there is only immutable
            // access to value while borrowed.
            Some(b) => Ok(
                Ref { value: unsafe { &*self.value.get() }, borrow: b }
            ),
            None => Err(BorrowError { _private: () }),
        }
    }
    /// Mutably borrows the wrapped value.
    ///
    /// Borrow lasts until returned `RefMut` or all `RefMut`s
    /// derived from it exit scope.  Value cannot be borrowed
    /// while this borrow is active.
    ///
    /// # Panics
    ///
    /// Panics if value is currently borrowed.  For non-panicking
    /// variant, use [`try_borrow_mut`](#method.try_borrow_mut).
    ///
    /*--------------------------------------------------------
      Here is the core of interior mutability - Part 1
    --------------------------------------------------------*/
    pub fn borrow_mut(&self) -> RefMut<'_, T> {
        self.try_borrow_mut().expect("already borrowed")
    }
    /// Mutably borrows wrapped value, returning error if value
    /// is currently borrowed.
    ///
    /// Borrow lasts until returned `RefMut` or all `RefMut`s
    /// derived from it exit scope.  Value cannot be borrowed
    /// while this borrow is active.
    ///
    pub fn try_borrow_mut(&self) -> Result<RefMut<'_, T>, BorrowMutError> {
        match BorrowRefMut::new(&self.borrow) {
            // SAFETY: `BorrowRef` guarantees unique access.
            Some(b) => Ok(
                RefMut { value: unsafe { &mut *self.value.get() }, borrow: b }
            ),
            None => Err(BorrowMutError { _private: () }),
        }
    }
    /// Returns a raw pointer to the underlying data in this cell.
    pub fn as_ptr(&self) -> *mut T {
        self.value.get()
    }
    /// Returns a mutable reference to the underlying data.
    ///
    /// Call borrows `RefCell` mutably (at compile-time) so no
    /// need for dynamic checks.
    ///
    pub fn get_mut(&mut self) -> &mut T {
        self.value.get_mut()
    }
}

impl<T: Default> RefCell<T> {
    /// Takes wrapped value, leaving `Default::default()`
    /// in its place.
    pub fn take(&self) -> T {
        self.replace(Default::default())
    }
}

unsafe impl<T: ?Sized> Send for RefCell<T> where T: Send {}
impl<T: ?Sized> !Sync for RefCell<T> {}

impl<T: Clone> Clone for RefCell<T> {
    /// Panics if the value is currently mutably borrowed.
    fn clone(&self) -> RefCell<T> {
        RefCell::new(self.borrow().clone())
    }
}

impl<T: Default> Default for RefCell<T> {
    /// Creates `RefCell<T>`, with `Default` value for T.
    fn default() -> RefCell<T> {
        RefCell::new(Default::default())
    }
}

impl<T> From<T> for RefCell<T> {
    fn from(t: T) -> RefCell<T> {
        RefCell::new(t)
    }
}

struct BorrowRef<'b> {
    borrow: &'b Cell<BorrowFlag>,
}

/// Wraps borrowed reference to value in a `RefCell` box.
/// Wrapper type for immutably borrowed value from
/// `RefCell<T>`.
pub struct Ref<'b, T: ?Sized + 'b> {
    value: &'b T,
    borrow: BorrowRef<'b>,
}

impl<T: ?Sized> Deref for Ref<'_, T> {
    type Target = T;
    fn deref(&self) -> &T {
        self.value
    }
}

struct BorrowRefMut<'b> {
    borrow: &'b Cell<BorrowFlag>,
}

impl Drop for BorrowRefMut<'_> {
    fn drop(&mut self) {
        let borrow = self.borrow.get();
        debug_assert!(is_writing(borrow));
        self.borrow.set(borrow + 1);
    }
}

impl<'b> BorrowRefMut<'b> {
    fn new(borrow: &'b Cell<BorrowFlag>) -> Option<BorrowRefMut<'b>> {
        // NOTE:
        // Unlike BorrowRefMut::clone, new is called to
        // create initial mutable reference, and so there
        // must currently be no existing references.  Thus,
        // while clone increments mutable refcount,
        // we only allow going from UNUSED to UNUSED - 1.
        match borrow.get() {
            UNUSED => {
                borrow.set(UNUSED - 1);
                Some(BorrowRefMut { borrow })
            }
            _ => None,
        }
    }
}

/// Wrapper type for mutably borrowed value from `RefCell<T>`.
pub struct RefMut<'b, T: ?Sized + 'b> {
    value: &'b mut T,
    borrow: BorrowRefMut<'b>,
}

impl<T: ?Sized> Deref for RefMut<'_, T> {
    type Target = T;
    fn deref(&self) -> &T {
        self.value
    }
}

/*--------------------------------------------------------
  Here is the core of interior mutability - Part 2
--------------------------------------------------------*/
impl<T: ?Sized> DerefMut for RefMut<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        self.value
    }
}
RefCell<T> tracks references at run-time, using its borrow member to record the current borrow state; if an attempt is made to create a mutable reference while any other references are active, the cell panics.

demonstration code in playground

Adding synchronization with Mutex<T> or RwLock<T> then ensures that locked data can be accessed by only one thread at any given time. Both of these locking constructs use interior mutability, holding their guarded data in an UnsafeCell<T>, so multiple threads can mutably share a guarded data item, but never simultaneously; there won't be any data races or memory faults. A Rust Mutex<T> guards data, t ∈ T, not regions of code. The only way to access the protected shared data is through its Mutex<T> or other lock.
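To make the run-time checking concrete, here is a minimal sketch (an illustration, not part of the RustBites demos) in which try_borrow_mut reports a conflict instead of panicking while an immutable borrow is still alive:

use std::cell::RefCell;

fn main() {
    let cell = RefCell::new(String::from("hello"));

    // an immutable borrow is active in this scope
    let r = cell.borrow();
    print!("\n  value = {}", r);

    // a mutable borrow would conflict, so try_borrow_mut returns Err
    assert!(cell.try_borrow_mut().is_err());

    drop(r);  // release the immutable borrow

    // now the mutable borrow succeeds and we can mutate in place
    cell.borrow_mut().push_str(" world");
    print!("\n  value = {}", cell.borrow());
    println!();
}

The borrow flag shown in the excerpts above is exactly what makes try_borrow_mut return Err here.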

2.0  Basic Threads

Rust threads are represented by the thread type and created with an associated function:
spawn<F, T>(f: F) -> JoinHandle<T>
  where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static
spawn accepts a closure that implements the thread's processing. The std::thread library's implementation of spawn uses the platform API to start a native thread running the supplied closure. When the closure returns, the thread terminates.
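As a minimal sketch of what that signature means in practice (an illustration, not part of the demo below): the closure is moved into spawn, and its return value T comes back through the JoinHandle<T> when join() is called.

use std::thread;

fn main() {
    let data = vec![1, 2, 3, 4];

    // the closure is moved into the thread; its return value is T = i32
    let handle: thread::JoinHandle<i32> =
        thread::spawn(move || data.iter().sum());

    // join returns a Result; Err means the child thread panicked
    match handle.join() {
        Ok(sum) => print!("\n  child thread computed sum = {}", sum),
        Err(_) => print!("\n  child thread panicked"),
    }
    println!();
}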
Note that f: F is passed by value, i.e., moved into spawn. If the closure captures any local data, you must declare it with the move keyword so the captured data is moved into the closure. That's illustrated in the next demonstration. The code below illustrates creation of a child thread. The main thread and child share standard output and an AtomicUsize global variable. Global variables are usually avoided in Rust, and in other languages as well, due to possibly widely distributed side effects. One is used here for thread-safe sharing between the main and child threads just to illustrate how that works.
Basic Thread Demo

// Basic Threads
// sharing std::output and AtomicUsize - both thread safe

use std::thread;
use std::time::Duration;
use std::sync::atomic::{AtomicUsize, Ordering};

static COUNT: AtomicUsize = AtomicUsize::new(0usize);

fn test() {
    // child thread processing
    let dur = Duration::from_millis(2);
    let handle = thread::spawn(
        move || {
            for _i in 0..10 {
                print!("\n  child thread printing");
                let _ = COUNT.fetch_add(1, Ordering::SeqCst);
                thread::sleep(dur);
            }
        }
    );
    // main thread processing
    let dur = Duration::from_millis(3);
    for _i in 0..10 {
        print!("\n  main thread printing");
        let _ = COUNT.fetch_add(1, Ordering::SeqCst);
        thread::sleep(dur);
    }
    // wait for thread to complete
    let _ = handle.join();
    print!("\n\n  number of prints = {:?}", COUNT);
}

fn main() {
    print!("\n  -- Basic Threads --\n");
    test();
    print!("\n\n  That's all Folks!\n\n");
}

Output

 -- Basic Threads --

 main thread printing
 child thread printing
 child thread printing
 child thread printing
 main thread printing
 child thread printing
 main thread printing
 child thread printing
 main thread printing
 child thread printing
 child thread printing
 main thread printing
 child thread printing
 main thread printing
 child thread printing
 child thread printing
 main thread printing
 main thread printing
 main thread printing
 main thread printing

 number of prints = 20

 That's all Folks!
code in playground

Note that the global COUNT atomic is not declared as mutable, yet it is mutated by both main and child. That works because Atomics and Mutexes (and Arc, for its reference counts) implement interior mutability, as discussed above and in the Safety Bite. Interior mutability defers borrowing checks from compile-time to run-time. See Interior mutability in Rust, by Ricardo Martins.

3.0  Sharing non-thread-safe Resources

The previous example was relatively simple because the threads only shared inherently thread-safe objects, e.g., an Atomic and standard output. The example below illustrates how non-thread-safe objects need to be handled. One way to share objects that are not thread-safe is to use locks. In the example below a Mutex wraps a String that three threads will share. Rust Mutexes protect data, not regions of code. That eliminates a blunder observed in other languages: threads that fail to share a common lock, so no protection is afforded to the shared item. Mutexes are usually shared through a thread-safe reference-counted pointer, Arc<T>, using the syntax:
let shared = Arc::new(Mutex::new(s));
where s is the entity to be shared. Sharing is then effected with:
let shared1 = Arc::clone(&shared);
let shared2 = Arc::clone(&shared);
shared1 and shared2 are pointers to the wrapped Mutex which, in turn, wraps the shared entity, s, affording synchronized access.
Note that shared1 and shared2 will both mutate a shared String resource. Yet they are not declared mutable, because that would violate the static safety invariant. Instead they both use the interior mutability offered by Mutex<T>, which holds its data in an UnsafeCell<T>.
Thread Sharing

// threads sharing string

use std::thread;
use std::sync::{Arc, Mutex};
use std::time::Duration;

fn test() {
    let a = "  main thread pushes \'0\'";
    let b = "  first child thread pushes \'1\'";
    let c = "second child thread pushes \'2\'";
    print!("\n  {}", a);
    print!("\n  {}", b);
    print!("\n  {}\n", c);

    let s = String::new();
    let shared = Arc::new(Mutex::new(s));

    // main thread gets first edit
    let shared0 = Arc::clone(&shared);
    if let Ok(mut temp) = shared0.lock() {
        temp.push('0');
    }

    // create child threads
    let shared1 = Arc::clone(&shared);
    let dur = Duration::from_millis(2);  // faster
    let handle1 = thread::spawn(
        move || {
            for _i in 0..15 {
                // child edits shared string
                if let Ok(mut temp) = shared1.lock() {
                    temp.push('1');
                }
                thread::sleep(dur);
            }
        }
    );

    // create second child thread
    let shared2 = Arc::clone(&shared);
    let dur = Duration::from_millis(3);  // slower
    let handle2 = thread::spawn(
        move || {
            for _i in 0..15 {
                // child edits string
                if let Ok(mut temp) = shared2.lock() {
                    temp.push('2');
                }
                thread::sleep(dur);
            }
        }
    );

    let _ = handle1.join();
    let _ = handle2.join();

    let mut s = String::new();
    if let Ok(mut temp) = shared0.lock() {
        temp.push('0');
        s = temp.to_string();
    }
    print!("\n  {}", s);
}

fn main() {
    print!("\n  -- Threads sharing String --\n");
    test();
    print!("\n\n  That's all Folks!\n\n");
}

Output

 -- Threads sharing String --

   main thread pushes '0'
   first child thread pushes '1'
  second child thread pushes '2'

 01121121211212112112121212222220

 That's all Folks!
code in playground

Rust has two other synchronizing constructs that are often used in threaded code: RwLock, a reader-writer lock, and Condvar, a condition variable. We use Condvar in the next example; a short RwLock sketch appears below.
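RwLock<T> is not demonstrated elsewhere in this Bite, so here is a minimal sketch (an illustration, not from the RustBites repository) of how it differs from Mutex<T>: any number of readers may hold the lock at the same time, while a writer gets exclusive access.

use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
    let shared = Arc::new(RwLock::new(vec![1, 2, 3]));

    // several reader threads may hold read locks concurrently
    let mut readers = Vec::new();
    for id in 0..3 {
        let share = Arc::clone(&shared);
        readers.push(thread::spawn(move || {
            let guard = share.read().unwrap();   // shared access
            print!("\n  reader {} sees {:?}", id, *guard);
        }));
    }
    for handle in readers {
        let _ = handle.join();
    }

    // a writer takes exclusive access
    {
        let mut guard = shared.write().unwrap(); // exclusive access
        guard.push(4);
    }
    print!("\n  after write: {:?}", *shared.read().unwrap());
    println!();
}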

4.0  ThreadResult

The C++ standard library has a std::future type that allows a thread to promise a result to be delivered when it's done. Users can do other work, then block on the thread's future until the thread finishes. Rust also has a Future type, associated with async/await processing, but its operation is tied to the mechanisms used for that, e.g., a special-purpose runtime, often provided by the Tokio crate. In this section we develop a ThreadResult<T> type that behaves in a way very similar to the C++ std::future.
ThreadResult<T>

////////////////////////////////////////////////
// thread_result::lib.rs                       //
// - Wait for thread to complete               //
//                                             //
// Jim Fawcett, https://JimFawcett.github.io   //
////////////////////////////////////////////////

#![allow(clippy::mutex_atomic)]
#![allow(dead_code)]

use std::sync::*;
use std::fmt::Debug;

#[derive(Debug, Default)]
pub struct ThreadResult<T> {
    pub result: Mutex<T>,
    cv: Condvar,
    ready: Mutex<bool>
}

impl<T: Debug + Default + Clone> ThreadResult<T> {
    pub fn new() -> Self {
        Self {
            result: Mutex::new(T::default()),
            cv: Condvar::new(),
            ready: Mutex::new(false),
        }
    }
    /*--------------------------------------------
      Unwrapping is appropriate here.  The operation
      fails if the Mutex becomes poisoned, due to
      panic on a thread holding the lock.  But then
      you can't do much except quit, which the
      unwrap does for you.
    --------------------------------------------*/
    pub fn set(&self, t: T) {
        let mut lr = self.ready.lock().unwrap();
        *lr = true;
        let mut lrslt = self.result.lock().unwrap();
        *lrslt = t;
        self.cv.notify_all();
    }
    pub fn get(&self) -> T {
        let mut rdy = self.ready.lock().unwrap();
        while !*rdy {
            rdy = self.cv.wait(rdy).unwrap();
        }
        let rslt = self.result.lock().unwrap();
        rslt.clone()
    }
    pub fn ready(&self) -> bool {
        *self.ready.lock().unwrap()
    }
}

Using Code

////////////////////////////////////////////////
// thread_result::test1.rs                     //
// - basic ThreadResult test                   //
//                                             //
// Jim Fawcett, https://JimFawcett.github.io   //
////////////////////////////////////////////////

use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use thread_result::ThreadResult;  // import from the library crate shown above

fn test() {
    let thrd_rslt = Arc::new(ThreadResult::<i32>::new());
    let thrd_rslt1 = Arc::clone(&thrd_rslt);
    let thrd_rslt2 = Arc::clone(&thrd_rslt);

    let cls = |share: Arc<ThreadResult<i32>>| {
        print!("\n  starting thread");
        let dur = Duration::from_millis(100u64);
        thread::sleep(dur);
        share.set(42);
    };

    let handle1 = thread::spawn(
        move || { cls(thrd_rslt1).clone() }
    );

    print!("\n  -- poll for result --");
    let dur = Duration::from_millis(20);
    loop {
        print!("\n  main polling");
        if !thrd_rslt2.ready() {
            thread::sleep(dur);
        }
        else {
            print!(
                "\n  thread result is {}\n",
                thrd_rslt2.get()
            );
            break;
        }
    }

    print!("\n  -- wait for result --");
    let thrd_rslt = Arc::new(ThreadResult::<i32>::new());
    let thrd_rslt1 = Arc::clone(&thrd_rslt);
    let thrd_rslt2 = Arc::clone(&thrd_rslt);
    let dur = Duration::from_millis(50);
    let now = Instant::now();
    let handle2 = thread::spawn(
        move || { cls(thrd_rslt1).clone() }
    );
    thread::sleep(dur);
    let elapsed = now.elapsed().as_millis();
    print!(
        "\n  after {:?} millis calling get()",
        elapsed
    );
    let rslt = thrd_rslt2.get();
    let elapsed = now.elapsed().as_millis();
    print!(
        "\n  at {:?} millis, thread result is {}",
        elapsed, rslt
    );
    let _ = handle1.join();
    let _ = handle2.join();
    println!();
}

fn main() {
    test();
}

Output

 -- poll for result --
 main polling
 starting thread
 main polling
 main polling
 main polling
 main polling
 main polling
 thread result is 42

 -- wait for result --
 starting thread
 after 50 millis calling get()
 at 100 millis, thread result is 42
code in playground

Normally we would not continuously poll the ThreadResult. Instead, start the thread, go off and do other work until you need the result, and then call get(), which blocks until the result is ready. Note that ThreadResult<T> is generic, so the thread can return a rich set of possible result types.
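For example, a worker can hand back something richer than an i32. Here is a minimal sketch assuming the ThreadResult<T> library above is available as the thread_result crate; the Stats type is hypothetical, introduced only for illustration.

use std::sync::Arc;
use std::thread;
use thread_result::ThreadResult;  // crate name taken from the lib.rs header above

// any T: Debug + Default + Clone works with ThreadResult<T>
#[derive(Debug, Default, Clone)]
struct Stats {
    count: usize,
    sum: i64,
}

fn main() {
    let result = Arc::new(ThreadResult::<Stats>::new());
    let result1 = Arc::clone(&result);

    let handle = thread::spawn(move || {
        let data = [3i64, 1, 4, 1, 5, 9];
        result1.set(Stats {
            count: data.len(),
            sum: data.iter().sum(),
        });
    });

    // blocks until the worker calls set()
    print!("\n  worker produced {:?}", result.get());
    let _ = handle.join();
    println!();
}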

5.0  BlockingQueue

The last example in this Bite is an implementation of a BlockingQueue<T>. It is thread-safe, supporting multiple enqueuers. A dequeuer blocks on an empty queue until an enqueuer posts a message. With a little more work we can use BlockingQueue<T> to build a ThreadPool<T> that supports multiple dequeuers too; you can explore that using the link.
BlockingQueue<T> library

/////////////////////////////////////////////////
// rust_blocking_queue::lib.rs - BlockingQueue  //
//                                              //
// Jim Fawcett, https://JimFawcett.github.io    //
/////////////////////////////////////////////////
/*
  This is a BlockingQueue abstraction.  To be shared
  between threads, without using unsafe code, any
  abstraction must be composed only of Mutexes and
  Condvars or struct or tuple with only those members.

  That means that the blocking queue must hold its
  native queue in a Mutex, as shown below.

  There is another alternative, based on Rust channels,
  which are essentially blocking queues.
*/
#![allow(dead_code)]

use std::sync::*;
use std::collections::*;

#[derive(Debug)]
/// Thread-safe queue that blocks de_q on empty
pub struct BlockingQueue<T> {
    q: Mutex<VecDeque<T>>,
    cv: Condvar,
}

impl<T> BlockingQueue<T> {
    /// Create empty blocking queue
    pub fn new() -> Self {
        Self {
            q: Mutex::new(VecDeque::new()),
            cv: Condvar::new(),
        }
    }
    /// push input on back of queue
    /// - unrecoverable if lock fails so just unwrap
    pub fn en_q(&self, t: T) {
        let mut lq = self.q.lock().unwrap();
        lq.push_back(t);
        self.cv.notify_one();
    }
    /// pop element from front of queue
    /// - unrecoverable if lock fails so just unwrap
    /// - same for condition variable
    pub fn de_q(&self) -> T {
        let mut lq = self.q.lock().unwrap();
        while lq.len() == 0 {
            lq = self.cv.wait(lq).unwrap();
        }
        lq.pop_front().unwrap()
    }
    /// return number of elements in queue
    pub fn len(&self) -> usize {
        self.q.lock().unwrap().len()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn bq_len() {
        let bq = BlockingQueue::<f64>::new();
        assert_eq!(bq.len(), 0);
    }
    #[test]
    fn bq_en_queue() {
        let bq = BlockingQueue::<f64>::new();
        bq.en_q(3.5);
        assert_eq!(bq.len(), 1);
    }
    #[test]
    fn bq_de_queue() {
        let bq = BlockingQueue::<f64>::new();
        bq.en_q(3.5);
        assert_eq!(bq.de_q(), 3.5);
        assert_eq!(bq.len(), 0);
    }
}

Using Code

////////////////////////////////////////////////
// rust_blocking_queue::test1.rs               //
// - demo blocking queue                       //
//                                             //
// Jim Fawcett, https://JimFawcett.github.io   //
////////////////////////////////////////////////
/*
  This is a good demonstration of the way BlockingQueue
  will be used in applications.
*/
use std::io::*;
use std::sync::*;
use std::thread;
use rust_blocking_queue::BlockingQueue;
use std::time::Duration;

fn main() {
    print!(
        "\n  Demonstrate queue shared between threads"
    );
    print!(
        "\n =========================================="
    );
    test();
    print!("\n\n  That's all Folks!\n");
}

/*-- simple test of BlockingQueue --*/
fn test() {
    let share = Arc::new(BlockingQueue::<String>::new());
    let share1 = Arc::clone(&share);
    let share2 = Arc::clone(&share);

    let flush = || {
        let _ = std::io::stdout().flush();
    };

    /*-- child thread dequeues messages --*/
    let handle = thread::spawn(move || {
        print!("\n  child thread started");
        flush();
        let dur = Duration::from_millis(50);
        loop {
            let t = share1.de_q();
            print!("\n  dequeued {} on child thread", t);
            flush();
            if &t == "quit" {
                break;
            }
            thread::sleep(dur);
        }
        print!("\n  thread shutting down");
        flush();
    });

    /*-- main thread enqueues messages --*/
    let dur = Duration::from_millis(20);
    for i in 0..5 {
        let msg = format!("msg #{}", i.to_string());
        print!("\n  enqueued {:?} on main thread", msg);
        flush();
        share2.en_q(msg);
        thread::sleep(dur);
    }

    /*-- shut down child thread --*/
    print!("\n  enqueued {:?} on main thread", "quit");
    flush();
    share2.en_q("quit".to_string());

    /*-- child thread must complete before exiting --*/
    print!("\n  waiting for child thread to stop");
    flush();
    let _ = handle.join();
    print!("\n  queue length = {}", share2.len());
}

Output

  Demonstrate queue shared between threads
 ==========================================

  enqueued "msg #0" on main thread
  child thread started
  dequeued msg #0 on child thread
  enqueued "msg #1" on main thread
  dequeued msg #1 on child thread
  enqueued "msg #2" on main thread
  enqueued "msg #3" on main thread
  dequeued msg #2 on child thread
  enqueued "msg #4" on main thread
  enqueued "quit" on main thread
  waiting for child thread to stop
  dequeued msg #3 on child thread
  dequeued msg #4 on child thread
  dequeued quit on child thread
  thread shutting down

  queue length = 0

  That's all Folks!
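The library comment above mentions Rust channels as an alternative. For comparison, here is a minimal sketch (an illustration, not part of the rust_blocking_queue crate) of the same producer/consumer shape using std::sync::mpsc, whose recv() blocks on an empty channel much as de_q does.

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel::<String>();

    // child thread blocks in recv() until a message arrives
    let handle = thread::spawn(move || {
        loop {
            let msg = rx.recv().unwrap();   // blocks when the channel is empty
            print!("\n  received {} on child thread", msg);
            if msg == "quit" {
                break;
            }
        }
    });

    // main thread sends messages, then tells the child to stop
    for i in 0..5 {
        tx.send(format!("msg #{}", i)).unwrap();
    }
    tx.send("quit".to_string()).unwrap();

    let _ = handle.join();
    println!();
}

A channel bundles the queue, lock, and condition variable into one abstraction, at the cost of being single-consumer in its basic mpsc form.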

6.0  Epilogue

Well, this has been some journey - yeah! Lots of interesting things in the thread code mix. And we haven't yet talked seriously about synchronization:
  • How does Condvar work and why do we need to use it?
  • What does RwLock<T> do and when would we use it?
  • What is a Barrier and should we care?
For those, stay tuned for the Synchronization Bite. That will probably appear soon. In the meantime, look here: std::sync. That will eventually be followed by Channels. For now, look here: mpsc. Both locks and channels are treated lightly in the Rust Story.