
Rust Bite - Synchronization

safe sharing with Mutex, RwLock, Condvar, Atomic, Arc, Barrier

1.0  Introduction

Synchronization is the process of ensuring that, in a multi-threaded program, threads get controlled (usually exclusive) access to a shared resource, typically by locking with a synchronization type. The common synchronization types are listed in Table 1, below.

Note: much of the material in the tables below was taken from the Rust std documentation. The Barrier example code was also extracted, with minor modification, from the std::sync::Barrier documentation.

Table 1. - Common Synchronization Types

Mutex<T>
    A mutual exclusion primitive useful for protecting shared data.
RwLock<T>
    A reader-writer lock allowing multiple readers or at most one writer at any point in time.
Condvar
    A condition variable; blocks threads waiting on an event to occur.
Atomic
    Atomic types provide primitive shared-memory communication between threads.
Arc<T>
    A thread-safe reference-counting pointer. Arc is used to enable sharing of guarded resources in programs with multiple threads. It holds its data in the heap.
Barrier
    A barrier enables multiple threads to synchronize the beginning of some computation.

2.0  Mutex<T>

The Rust Mutex<T> is a lock that grants the thread acquiring it exclusive access to the contained resource, t ∈ T. Any other thread attempting to acquire the lock will block until the owning thread releases it. Note that a Rust Mutex protects a specified data item, not a region of code. Guarding the data rather than code regions makes it impossible for two threads to access the same data through separate locks, an error that some other language libraries allow.

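Access to the guarded data is only available through the MutexGuard returned by lock, and the lock is released when that guard is dropped. The following minimal, single-threaded sketch (not from the repository; the names are illustrative) shows guard scoping, try_lock, and into_inner:

use std::sync::Mutex;

fn main() {
    let count = Mutex::new(0i32);

    {   /* lock() returns a MutexGuard wrapped in a LockResult */
        let mut guard = count.lock().unwrap();
        *guard += 1;    /* the guard dereferences to the data */

        /* while the guard is alive, the lock is held;
           try_lock reports failure instead of blocking */
        assert!(count.try_lock().is_err());
    }   /* guard dropped here - the lock is released */

    /* the lock is free again */
    assert_eq!(*count.lock().unwrap(), 1);

    /* into_inner consumes the Mutex and returns the data */
    assert_eq!(count.into_inner().unwrap(), 1);
}
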
Table 2. - Mutex Methods:

pub fn new(t: T) -> Mutex<T>
    Creates a new mutex in an unlocked state.
pub fn lock(&self) -> LockResult<MutexGuard<'_, T>>
    Acquires a mutex, blocking the current thread until it is able to do so. The returned MutexGuard<'_, T>, wrapped in a LockResult, implicitly dereferences to provide access to the data. It is dropped when it goes out of scope, releasing the lock.
pub fn try_lock(&self) -> TryLockResult<MutexGuard<'_, T>>
    If the lock could not be acquired at this time, then Err is returned. Otherwise, an RAII guard is returned. The lock will be unlocked when the guard is dropped.
pub fn into_inner(self) -> LockResult<T>
    Consumes this mutex, returning the underlying data.
more methods ...
    See the std documentation for more methods, descriptions, and examples.
Mutex Example

// Demonstrate threads sharing string resource
#![allow(unused_imports)]
#![allow(dead_code)]
use std::thread;
use std::sync::{Mutex, RwLock, Condvar, Arc};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

fn test_mutex() {
    /* define shared resource */
    let s = String::new();
    let share = Arc::new(Mutex::new(s));

    /* define reader processing */
    static RID: AtomicUsize = AtomicUsize::new(0usize);
    let rd_cls = |shared: Arc<Mutex<String>>| {
        /* fetch_add returns old value */
        let my_id = RID.fetch_add(1, Ordering::SeqCst) + 1;
        let dur = Duration::from_millis(2);
        for _i in 0..10 {
            if let Ok(temp) = shared.lock() {
                print!("\n reader {} : str len = {}", my_id, temp.len());
            }
            thread::sleep(dur);
        }
    };

    /* start readers */
    let shared = Arc::clone(&share);
    let handle1 = thread::spawn(move || { rd_cls(shared); });
    let shared = Arc::clone(&share);
    let handle2 = thread::spawn(move || { rd_cls(shared); });

    /* define writer processing */
    static WID: AtomicUsize = AtomicUsize::new(2usize);
    let wr_cls = |shared: Arc<Mutex<String>>| {
        /* fetch_add returns old value */
        let my_id = WID.fetch_add(1, Ordering::SeqCst) + 1;
        let dur = Duration::from_millis(2);
        for _i in 0..10 {
            if let Ok(mut temp) = shared.lock() {
                let digit = my_id.to_string();
                temp.push_str(digit.as_str());
                print!("\n writer {} : str len = {}", my_id, temp.len());
            }
            thread::sleep(dur);
        }
    };

    /* start writers */
    let shared = Arc::clone(&share);
    let handle3 = thread::spawn(move || { wr_cls(shared); });
    let shared = Arc::clone(&share);
    let handle4 = thread::spawn(move || { wr_cls(shared); });

    /* main thread waits for children to finish */
    let _ = handle1.join();
    let _ = handle2.join();
    let _ = handle3.join();
    let _ = handle4.join();

    /* lock will fail if thread holding lock panics */
    if let Ok(mod_str) = share.lock() {
        print!("\n modified string: {:?}", mod_str);
    };  /* semicolon needed to ensure mod_str lives long enough */
}

fn main() {
    test_mutex();
}

Output:

 reader 1 : str len = 0
 reader 1 : str len = 0
 reader 1 : str len = 0
 reader 1 : str len = 0
 writer 3 : str len = 1
 reader 2 : str len = 1
 writer 4 : str len = 2
 reader 1 : str len = 2
 writer 3 : str len = 3
 reader 1 : str len = 3
 writer 4 : str len = 4
 reader 2 : str len = 4
 writer 3 : str len = 5
 reader 1 : str len = 5
 writer 4 : str len = 6
 reader 2 : str len = 6
 writer 3 : str len = 7
 reader 1 : str len = 7
 writer 4 : str len = 8
 reader 2 : str len = 8
 reader 1 : str len = 8
 writer 3 : str len = 9
 writer 4 : str len = 10
 reader 2 : str len = 10
 reader 1 : str len = 10
 writer 4 : str len = 11
 reader 2 : str len = 11
 writer 4 : str len = 12
 reader 2 : str len = 12
 writer 3 : str len = 13
 writer 4 : str len = 14
 reader 2 : str len = 14
 writer 3 : str len = 15
 writer 4 : str len = 16
 reader 2 : str len = 16
 writer 3 : str len = 17
 writer 4 : str len = 18
 reader 2 : str len = 18
 writer 3 : str len = 19
 writer 3 : str len = 20

 modified string: "34343434344434343433"
mutex demo in playground

3.0  RwLock<T>

An RwLock<T> allows any number of readers to access its protected data simultaneously. However, it grants exclusive access to a writer. That is, only one writer may own the lock at a time, and while it does, all other threads attempting to acquire the lock will block.

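As a minimal, single-threaded sketch (not from the repository; names are illustrative), the following shows several read guards coexisting while a write guard requires exclusive access:

use std::sync::RwLock;

fn main() {
    let data = RwLock::new(vec![1, 2, 3]);

    {   /* any number of read guards may be alive at once */
        let r1 = data.read().unwrap();
        let r2 = data.read().unwrap();
        assert_eq!(r1.len(), r2.len());

        /* a writer cannot acquire the lock while readers hold it */
        assert!(data.try_write().is_err());
    }   /* read guards dropped - a writer may now acquire the lock */

    {   /* exactly one write guard at a time */
        let mut w = data.write().unwrap();
        w.push(4);
    }

    assert_eq!(data.read().unwrap().len(), 4);
}
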
Table 3. - RwLock<T> Methods:

pub fn new(t: T) -> RwLock<T>
    Creates a new instance of an RwLock which is unlocked.
pub fn read(&self) -> LockResult<RwLockReadGuard<'_, T>>
    Locks the rwlock with shared read access, blocking the current thread until it can be acquired.
pub fn write(&self) -> LockResult<RwLockWriteGuard<'_, T>>
    Locks the rwlock with exclusive write access, blocking the current thread until it can be acquired.
pub fn into_inner(self) -> LockResult<T>
    Consumes the RwLock, returning the underlying data.
more methods ...
    See the std documentation for more methods, descriptions, and examples.
RwLock Example

// Demonstrate threads sharing string resource
#![allow(unused_imports)]
#![allow(dead_code)]
use std::thread;
use std::sync::{Mutex, RwLock, Condvar, Arc};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

fn test_rwlock() {
    /* define shared resource */
    let s = String::new();
    let share = Arc::new(RwLock::new(s));

    /* define reader processing */
    static RID: AtomicUsize = AtomicUsize::new(0usize);
    let rd_cls = |shared: Arc<RwLock<String>>| {
        /* fetch_add returns old value */
        let my_id = RID.fetch_add(1, Ordering::SeqCst) + 1;
        let dur = Duration::from_millis(2);
        for _i in 0..10 {
            if let Ok(temp) = shared.read() {
                print!("\n reader {} : str len = {}", my_id, temp.len());
            }
            thread::sleep(dur);
        }
    };

    /* start readers */
    let shared = Arc::clone(&share);
    let handle1 = thread::spawn(move || { rd_cls(shared); });
    let shared = Arc::clone(&share);
    let handle2 = thread::spawn(move || { rd_cls(shared); });
    let shared = Arc::clone(&share);
    let handle3 = thread::spawn(move || { rd_cls(shared); });
    let shared = Arc::clone(&share);
    let handle4 = thread::spawn(move || { rd_cls(shared); });

    /* define writer processing */
    static WID: AtomicUsize = AtomicUsize::new(4usize);
    let wr_cls = |shared: Arc<RwLock<String>>| {
        /* fetch_add returns old value */
        let my_id = WID.fetch_add(1, Ordering::SeqCst) + 1;
        let dur = Duration::from_millis(2);
        for _i in 0..10 {
            if let Ok(mut temp) = shared.write() {
                let digit = my_id.to_string();
                temp.push_str(digit.as_str());
                print!("\n writer {} : str len = {}", my_id, temp.len());
            }
            thread::sleep(dur);
        }
    };

    /* start writers */
    let shared = Arc::clone(&share);
    let handle5 = thread::spawn(move || { wr_cls(shared); });
    let shared = Arc::clone(&share);
    let handle6 = thread::spawn(move || { wr_cls(shared); });

    /* main thread waits for children to finish */
    let _ = handle1.join();
    let _ = handle2.join();
    let _ = handle3.join();
    let _ = handle4.join();
    let _ = handle5.join();
    let _ = handle6.join();

    /* lock will fail if thread holding lock panics */
    if let Ok(mod_str) = share.read() {
        print!("\n modified string: {:?}", *mod_str);
    };  /* semicolon needed here to ensure mod_str lives long enough */
}

fn main() {
    test_rwlock();
}

Output:

 reader 1 : str len = 0
 reader 1 : str len = 0
 writer 5 : str len = 1
 writer 6 : str len = 2
 reader 2 : str len = 2
 reader 3 : str len = 2
 reader 4 : str len = 2
 writer 5 : str len = 3
 writer 6 : str len = 4
 reader 2 : str len = 4
 reader 3 : str len = 4
 reader 1 : str len = 4
 reader 4 : str len = 4
 writer 5 : str len = 5
 writer 6 : str len = 6
 reader 2 : str len = 6
 reader 3 : str len = 6
 reader 4 : str len = 6
 reader 1 : str len = 6
 writer 5 : str len = 7
 writer 6 : str len = 8
 reader 2 : str len = 8
 reader 3 : str len = 8
 reader 4 : str len = 8
 reader 1 : str len = 8
 writer 5 : str len = 9
 writer 6 : str len = 10
 reader 2 : str len = 10
 reader 3 : str len = 10
 reader 4 : str len = 10
 reader 1 : str len = 10
 writer 5 : str len = 11
 writer 6 : str len = 12
 reader 2 : str len = 12
 reader 3 : str len = 12
 reader 4 : str len = 12
 reader 1 : str len = 12
 writer 5 : str len = 13
 writer 6 : str len = 14
 reader 2 : str len = 14
 reader 3 : str len = 14
 reader 4 : str len = 14
 reader 1 : str len = 14
 writer 5 : str len = 15
 writer 6 : str len = 16
 reader 2 : str len = 16
 reader 3 : str len = 16
 reader 4 : str len = 16
 reader 1 : str len = 16
 writer 5 : str len = 17
 writer 6 : str len = 18
 reader 2 : str len = 18
 reader 3 : str len = 18
 reader 4 : str len = 18
 reader 1 : str len = 18
 writer 5 : str len = 19
 writer 6 : str len = 20
 reader 2 : str len = 20
 reader 3 : str len = 20
 reader 4 : str len = 20

 modified string: "56565656565656565656"
rwlock demo in playground

4.0  Condvar

A Condvar supports one thread sending notifications to other waiting threads. Threads that need a result from the notifying thread block by calling the wait method on the Condvar instance. Each Condvar is used together with a Mutex that guards information about the event. A notifier locks the Mutex, sets the event state, and then calls notify_one or notify_all on the Condvar. Threads that subscribe to the notification lock the Mutex and then call wait on the Condvar instance. Condvars do a special dance to avoid deadlocking the notifier: when wait is called, the library code puts the subscribing thread to sleep and releases the lock so that notifiers can gain access. When a notification arrives, the library code wakes the waiting thread and reacquires the lock before wait returns.

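The following minimal sketch (not from the repository; it mirrors the pattern in the std::sync::Condvar documentation) shows the dance described above: the Mutex guards a boolean event flag, the notifier sets the flag and calls notify_one, and the waiter loops on wait until the flag is set.

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    /* the Mutex guards the event state; the Condvar signals changes to it */
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = Arc::clone(&pair);

    /* notifier: lock, set the state, then notify */
    thread::spawn(move || {
        let (lock, cvar) = &*pair2;
        let mut ready = lock.lock().unwrap();
        *ready = true;
        cvar.notify_one();
    });

    /* waiter: lock, then wait in a loop until the state is set.
       wait releases the lock while sleeping and reacquires it on wakeup. */
    let (lock, cvar) = &*pair;
    let mut ready = lock.lock().unwrap();
    while !*ready {
        ready = cvar.wait(ready).unwrap();
    }
    print!("\n event received");
}
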
Table 4. - Condvar Methods:

pub fn new() -> Condvar
    Creates a new condition variable ready to be waited on and notified.
pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> LockResult<MutexGuard<'a, T>>
    Blocks the current thread until this condition variable receives a notification.
pub fn notify_one(&self)
    Wakes up one blocked thread on this condvar.
pub fn notify_all(&self)
    Wakes up all blocked threads on this condvar.
more methods ...
    See the std documentation for more methods, descriptions, and examples.
The following example, repeated from the thread Bite, uses a Condvar to implement a "Future" construct that is useful for returning results from threads.
Condvar Example - ThreadResult<T>

////////////////////////////////////////////////
// thread_result::lib.rs                      //
// - Wait for thread to complete              //
//                                            //
// Jim Fawcett, https://JimFawcett.github.io  //
////////////////////////////////////////////////

#![allow(clippy::mutex_atomic)]
#![allow(dead_code)]
use std::sync::*;
use std::fmt::Debug;

#[derive(Debug, Default)]
pub struct ThreadResult<T> {
    pub result: Mutex<T>,
    cv: Condvar,
    ready: Mutex<bool>
}
impl<T: Debug + Default + Clone> ThreadResult<T> {
    pub fn new() -> Self {
        Self {
            result: Mutex::new(T::default()),
            cv: Condvar::new(),
            ready: Mutex::new(false),
        }
    }
    /*--------------------------------------------
      Unwrapping is appropriate here. The operation
      fails if the Mutex becomes poisoned, due to
      panic on a thread holding the lock. But then
      you can't do much except quit, which the
      unwrap does for you.
    --------------------------------------------*/
    pub fn set(&self, t: T) {
        let mut lr = self.ready.lock().unwrap();
        *lr = true;
        let mut lrslt = self.result.lock().unwrap();
        *lrslt = t;
        self.cv.notify_all();
    }
    pub fn get(&self) -> T {
        let mut rdy = self.ready.lock().unwrap();
        while !*rdy {
            rdy = self.cv.wait(rdy).unwrap();
        }
        let rslt = self.result.lock().unwrap();
        rslt.clone()
    }
    pub fn ready(&self) -> bool {
        *self.ready.lock().unwrap()
    }
}

Using Code:

////////////////////////////////////////////////
// thread_result::test1.rs                    //
// - basic ThreadResult test                  //
//                                            //
// Jim Fawcett, https://JimFawcett.github.io  //
////////////////////////////////////////////////

use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use thread_result::ThreadResult;  /* import from the library crate above */

fn test() {
    let thrd_rslt = Arc::new(ThreadResult::<i32>::new());
    let thrd_rslt1 = Arc::clone(&thrd_rslt);
    let thrd_rslt2 = Arc::clone(&thrd_rslt);
    let cls = |share: Arc<ThreadResult<i32>>| {
        print!("\n starting thread");
        let dur = Duration::from_millis(100u64);
        thread::sleep(dur);
        share.set(42);
    };
    let handle1 = thread::spawn(move || { cls(thrd_rslt1).clone() });

    print!("\n -- poll for result --");
    let dur = Duration::from_millis(20);
    loop {
        print!("\n main polling");
        if !thrd_rslt2.ready() {
            thread::sleep(dur);
        }
        else {
            print!("\n thread result is {}\n", thrd_rslt2.get());
            break;
        }
    }

    print!("\n -- wait for result --");
    let thrd_rslt = Arc::new(ThreadResult::<i32>::new());
    let thrd_rslt1 = Arc::clone(&thrd_rslt);
    let thrd_rslt2 = Arc::clone(&thrd_rslt);
    let dur = Duration::from_millis(50);
    let now = Instant::now();
    let handle2 = thread::spawn(move || { cls(thrd_rslt1).clone() });
    thread::sleep(dur);
    let elapsed = now.elapsed().as_millis();
    print!("\n after {:?} millis calling get()", elapsed);
    let rslt = thrd_rslt2.get();
    let elapsed = now.elapsed().as_millis();
    print!("\n at {:?} millis, thread result is {}", elapsed, rslt);
    let _ = handle1.join();
    let _ = handle2.join();
    println!();
}

fn main() {
    test();
}

Output:

 -- poll for result --
 main polling
 starting thread
 main polling
 main polling
 main polling
 main polling
 main polling
 thread result is 42

 -- wait for result --
 starting thread
 after 50 millis calling get()
 at 100 millis, thread result is 42
Condvar example in playground

5.0  Atomics

Atomics are a group of thread-safe types that provide primitive shared-memory communication between threads, including AtomicBool, AtomicI32, AtomicU8, AtomicUsize, and others. Table 5 lists representative methods; a short sketch of their use follows the table.

Table 5. - AtomicBool and AtomicUsize Methods

AtomicBool  (Ordering: Acquire, Relaxed, SeqCst)

pub const fn new(v: bool) -> AtomicBool
    Creates a new AtomicBool.
pub fn load(&self, order: Ordering) -> bool
    Loads a value from the bool.
pub fn store(&self, val: bool, order: Ordering)
    Stores a value into its bool.
pub fn swap(&self, val: bool, order: Ordering) -> bool
    Stores a value into its bool, returning the previous value.
pub fn into_inner(self) -> bool
    Consumes the atomic and returns the contained value.
more methods ...
    See the std documentation for more methods, descriptions, and examples.

AtomicUsize  (Ordering: Acquire, Relaxed, SeqCst)

pub const fn new(v: usize) -> AtomicUsize
    Creates a new atomic integer.
pub fn load(&self, order: Ordering) -> usize
    Loads a value from the atomic integer.
pub fn store(&self, val: usize, order: Ordering)
    Stores a value into the atomic integer.
pub fn fetch_add(&self, val: usize, order: Ordering) -> usize
    Adds to the current value, returning the previous value.
pub fn into_inner(self) -> usize
    Consumes the atomic and returns the contained value.
more methods ...
    See the std documentation for more methods, descriptions, and examples.
more types ...
    AtomicI8, ...
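
The following minimal, single-threaded sketch (not from the repository; names are illustrative) exercises the methods in Table 5; in threaded code the same calls are made concurrently through a static or a shared reference:

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

fn main() {
    let flag = AtomicBool::new(false);
    let count = AtomicUsize::new(0);

    /* store and load */
    flag.store(true, Ordering::SeqCst);
    assert!(flag.load(Ordering::SeqCst));

    /* swap returns the previous value */
    let old = flag.swap(false, Ordering::SeqCst);
    assert!(old);

    /* fetch_add returns the previous value */
    let prev = count.fetch_add(5, Ordering::SeqCst);
    assert_eq!(prev, 0);
    assert_eq!(count.load(Ordering::SeqCst), 5);

    /* into_inner consumes the atomic and returns its value */
    assert_eq!(count.into_inner(), 5);
}
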
The following example, repeated from the thread Bite, uses an AtomicUsize to count thread print operations and share the count value with the main thread.
AtomicUsize Example - Basic Thread Demo

// Basic Threads
// sharing std::output and AtomicUsize - both thread safe
use std::thread;
use std::time::Duration;
use std::sync::atomic::{AtomicUsize, Ordering};

static COUNT: AtomicUsize = AtomicUsize::new(0usize);

fn test() {
    // child thread processing
    let dur = Duration::from_millis(2);
    let handle = thread::spawn(move || {
        for _i in 0..10 {
            print!("\n child thread printing");
            let _ = COUNT.fetch_add(1, Ordering::SeqCst);
            thread::sleep(dur);
        }
    });

    // main thread processing
    let dur = Duration::from_millis(3);
    for _i in 0..10 {
        print!("\n main thread printing");
        let _ = COUNT.fetch_add(1, Ordering::SeqCst);
        thread::sleep(dur);
    }

    // wait for thread to complete
    let _ = handle.join();
    print!("\n\n number of prints = {:?}", COUNT);
}

fn main() {
    print!("\n -- Basic Threads --\n");
    test();
    print!("\n\n That's all Folks!\n\n");
}

Output:

 -- Basic Threads --

 main thread printing
 child thread printing
 child thread printing
 child thread printing
 main thread printing
 child thread printing
 main thread printing
 child thread printing
 main thread printing
 child thread printing
 child thread printing
 main thread printing
 child thread printing
 main thread printing
 child thread printing
 child thread printing
 main thread printing
 main thread printing
 main thread printing
 main thread printing

 number of prints = 20

 That's all Folks!
code in playground

6.0  Barrier

Barriers enable multiple threads to synchronize the beginning of their computations. All waiting threads are released at essentially the same time, which prevents a thread that is collaborating with several others from starting before the rest are ready.

Table 6. - Barrier Methods

pub fn new(n: usize) -> Barrier
    Creates a new barrier that can block a given number of threads.
pub fn wait(&self) -> BarrierWaitResult
    Blocks the current thread until all threads have rendezvoused here.
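
The wait call returns a BarrierWaitResult whose is_leader method returns true for exactly one of the released threads, which is useful for electing a single thread to do follow-up work. The following minimal sketch (not from the repository; names are illustrative) shows that method; the larger example below is adapted from the std::sync::Barrier documentation.

use std::sync::{Arc, Barrier};
use std::thread;

fn main() {
    let barrier = Arc::new(Barrier::new(3));
    let mut handles = Vec::new();

    for _ in 0..3 {
        let b = Arc::clone(&barrier);
        handles.push(thread::spawn(move || {
            /* all three threads block here until the last one arrives */
            let result = b.wait();
            /* exactly one released thread is designated the leader */
            if result.is_leader() {
                println!("leader elected");
            }
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
}
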
Barrier Example

////////////////////////////////////////////////
// Barrier Demos                              //
// - Wait for all threads to start           //
//                                            //
// Jim Fawcett, https://JimFawcett.github.io  //
////////////////////////////////////////////////

use std::sync::{Arc, Barrier};
use std::thread;
use std::thread::JoinHandle;

fn main() {
    let mut handles: Vec<JoinHandle<()>> = Vec::with_capacity(5);
    let barrier = Arc::new(Barrier::new(5));
    for i in 0..5 {
        let c: Arc<Barrier> = barrier.clone();
        handles.push(thread::spawn(move || {
            println!("Thread {} paused...awaiting other threads", i);
            c.wait();
            println!("Thread {} processing done", i);
        }));
    }
    // Wait for all threads to complete execution
    for handle in handles {
        handle.join().unwrap();
    }
}

Output:

Thread 0 paused...awaiting other threads
Thread 1 paused...awaiting other threads
Thread 2 paused...awaiting other threads
Thread 3 paused...awaiting other threads
Thread 4 paused...awaiting other threads
Thread 0 processing done
Thread 4 processing done
Thread 1 processing done
Thread 3 processing done
Thread 2 processing done
Barrier example in playground