Demilade Sonuga's blog

Mutex

2023-05-27

In this post, we're going to implement a Mutex.

What Needs To Be Done

A Mutex is a structure for protecting shared data by ensuring that only one thread/line of execution can access it at once.

Our aim here is to create a safe abstraction that allows for reading and mutating a shared value and enforces a restriction of one access at a time.

A typical use of Mutex should look like this:

static KEYBOARD: Mutex<Keyboard> = Mutex::new(Keyboard::new());

// `scancode` is taken as a parameter here so the example is complete
fn do_something(scancode: u8) {
    KEYBOARD.lock().process_byte(scancode); // Function that mutates KEYBOARD
}

Think about how you'll implement this yourself before continuing.

Implementation

Create a new file mutex.rs in blasterball/src/sync.

In sync/mod.rs:

pub mod once;
pub mod mutex; // NEW

In sync/mutex.rs:

// A mutual exclusion primitive
pub struct Mutex<T> {

}

Unlike the Once, the Mutex will be given an already initialized value when we're creating it. The Mutex will take ownership of the value and act as a mediator between the value and any other client code that tries to access it.

For the Mutex to take ownership of a value, it first needs a space for storing that value:

use core::cell::UnsafeCell;
pub struct Mutex<T> {
    // Space for the data to be protected behind the Mutex
    data: UnsafeCell<T>,
}

The data field provides that space. It's of type UnsafeCell, which is the fundamental primitive for building abstractions with interior mutability.
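
To make "interior mutability" concrete, here is a minimal sketch of UnsafeCell on its own. The Slot type and its methods are hypothetical names used only for this illustration, and the unsafe blocks are sound only under the stated single-threaded assumption. Removing that assumption is exactly what the rest of the Mutex is for.

```rust
use core::cell::UnsafeCell;

// Hypothetical wrapper, for illustration only. It is NOT thread-safe.
struct Slot {
    value: UnsafeCell<i32>,
}

impl Slot {
    const fn new(value: i32) -> Self {
        Self { value: UnsafeCell::new(value) }
    }

    // Mutates the inner value through a shared reference (&self).
    fn add(&self, n: i32) {
        // SAFETY: sound only because this sketch is used from a single
        // thread and no other reference to the inner value is alive here.
        unsafe { *self.value.get() += n; }
    }

    // Reads the inner value.
    fn get(&self) -> i32 {
        // SAFETY: same single-threaded assumption as in `add`.
        unsafe { *self.value.get() }
    }
}

// Returns the value after two updates made through `&Slot`.
fn unsafe_cell_demo() -> i32 {
    let slot = Slot::new(1); // note: not declared `mut`
    slot.add(2);
    slot.add(3);
    slot.get()
}
```

Notice that slot is never declared mut, yet its contents change. UnsafeCell::get hands back a raw pointer, and it's up to us to make sure nothing else is touching the value when we write through it.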

Another thing we're going to need is a field that tells whether or not the Mutex has been locked by any code.

use core::sync::atomic::AtomicBool;
pub struct Mutex<T> {
    data: UnsafeCell<T>,
    // Tells if the Mutex has been locked by any code
    locked: AtomicBool
}

To create a new Mutex:

impl<T> Mutex<T> {
    // Creates a new Mutex
    pub const fn new(val: T) -> Self {
        Self {
            data: UnsafeCell::new(val),
            locked: AtomicBool::new(false)
        }
    }
}

The new function is declared const so that it can be used to initialize statics.
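
As a small sketch of why const matters here, consider a hypothetical Counter type (not part of the game's code) that is stored in a static. A static's initializer has to be evaluated at compile time, so only const functions can be called in it.

```rust
use core::sync::atomic::{AtomicU32, Ordering};

// Hypothetical counter type, used only for this illustration.
struct Counter {
    hits: AtomicU32,
}

impl Counter {
    // `const` lets the compiler evaluate this call at compile time,
    // which is what a `static` initializer requires.
    const fn new() -> Self {
        Self { hits: AtomicU32::new(0) }
    }

    // Adds one hit and returns the new total.
    fn hit(&self) -> u32 {
        self.hits.fetch_add(1, Ordering::Relaxed) + 1
    }
}

// Works because `Counter::new` is a `const fn`. Removing `const` from
// `new` turns this line into a compile error.
static COUNTER: Counter = Counter::new();
```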

To lock a Mutex (get the value inside), we have to think about two things:

  1. What if the value is already being used by some other code?
  2. How can we know when the value is no longer being used, so that the lock can be freed?

The answer to question 1 is the locked field of Mutex. While the Mutex is locked, the field is true. If any code calls lock while the field is true, it waits in a loop until the field becomes false again.

But the answer to question 2, how to know when the lock can be freed, is a little trickier. We can't just give the user a mutable reference to the data. If we do that, there is no way to know whether the user still has that reference when another line of execution calls lock again. That could lead to two mutable references being active at the same time, which violates Rust's aliasing rules and is undefined behavior.

The idea here is that we just want to give the calling code temporary access to the value and then set the locked field of the original Mutex back to false when the calling code is done using it.

One solution is to create a new structure, let's call it MutexGuard. An instance of this struct will hold a mutable reference to the data stored in the Mutex and a reference to the locked field of the Mutex. The calling code can use the mutable reference to access the data it needs.

To tell the Mutex that the calling code is done, the MutexGuard will have a Drop implementation that uses the locked reference to set locked back to false.

use core::sync::atomic::Ordering;
// A structure to give temporary access to data in a Mutex
pub struct MutexGuard<'a, T> {
    data: &'a mut T,
    locked: &'a AtomicBool
}

impl<'a, T> Drop for MutexGuard<'a, T> {
    // Releases the lock
    fn drop(&mut self) {
        self.locked.store(false, Ordering::Release);
    }
}
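
To see the release-on-drop idea in isolation, here is a minimal sketch with a stripped-down guard. FlagGuard and guard_scope_demo are hypothetical names for this illustration only. The guard holds just the flag, and we pretend the lock has already been taken.

```rust
use core::sync::atomic::{AtomicBool, Ordering};

// Hypothetical guard that holds only the flag, for illustration.
struct FlagGuard<'a> {
    locked: &'a AtomicBool,
}

impl<'a> Drop for FlagGuard<'a> {
    // Runs automatically when the guard goes out of scope.
    fn drop(&mut self) {
        self.locked.store(false, Ordering::Release);
    }
}

// Returns the flag's value while the guard is alive and after it is dropped.
fn guard_scope_demo() -> (bool, bool) {
    let locked = AtomicBool::new(true); // pretend the lock was just taken
    let during;
    {
        // Binding to `_guard` keeps the guard alive until the end of
        // this block. Binding to plain `_` would drop it immediately.
        let _guard = FlagGuard { locked: &locked };
        during = locked.load(Ordering::Acquire);
    } // `_guard` is dropped here, so the flag is reset
    let after = locked.load(Ordering::Acquire);
    (during, after)
}
```

The calling code never has to remember to unlock anything. The compiler inserts the drop call at the end of the guard's scope, even on an early return.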

If you're not really familiar with memory ordering, you should check the references to learn about it.
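
As a rough sketch of what the Release and Acquire orderings buy us, the example below publishes a value from one thread to another. DATA, READY, and publish_demo are hypothetical names, and the sketch uses std threads, so it is meant for a hosted environment rather than the game itself.

```rust
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::thread;

// Hypothetical statics for this illustration.
static DATA: AtomicU32 = AtomicU32::new(0);
static READY: AtomicBool = AtomicBool::new(false);

// Publishes a value from one thread and reads it from another.
fn publish_demo() -> u32 {
    let producer = thread::spawn(|| {
        // Written before the Release store below...
        DATA.store(42, Ordering::Relaxed);
        // ...so any thread that observes `true` with an Acquire load
        // is guaranteed to also observe the write to DATA.
        READY.store(true, Ordering::Release);
    });

    // Wait until the producer signals that the data is ready.
    while !READY.load(Ordering::Acquire) {
        std::hint::spin_loop();
    }
    let seen = DATA.load(Ordering::Relaxed);
    producer.join().unwrap();
    seen
}
```

The Mutex relies on the same pairing. Releasing the lock is a Release store, taking it is an Acquire operation, so whatever one thread wrote to the data before unlocking is visible to the next thread that locks.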

If you take a good look at the code, you'll notice that data, the mutable reference in MutexGuard, is not public. The reason is that we can't allow calling code to take the reference out of the guard, to avoid the risk of having two mutable references active at once.

Instead, we implement the Deref and DerefMut traits, so that the calling code can read and mutate the data without being able to access any of these sensitive fields.

impl<'a, T> core::ops::Deref for MutexGuard<'a, T> {
    type Target = T;

    fn deref(&self) -> &T {
        self.data
    }
}

impl<'a, T> core::ops::DerefMut for MutexGuard<'a, T> {
    fn deref_mut(&mut self) -> &mut T {
        self.data
    }
}
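
Here is a minimal sketch of what these two traits give the calling code. Handle and deref_demo are hypothetical names used only for this illustration. The wrapper has the same shape as the guard, minus the lock flag.

```rust
use core::ops::{Deref, DerefMut};

// Hypothetical wrapper with a private mutable reference, for illustration.
struct Handle<'a, T> {
    data: &'a mut T,
}

impl<'a, T> Deref for Handle<'a, T> {
    type Target = T;

    fn deref(&self) -> &T {
        self.data
    }
}

impl<'a, T> DerefMut for Handle<'a, T> {
    fn deref_mut(&mut self) -> &mut T {
        self.data
    }
}

// Mutates an integer and a String through the wrapper.
fn deref_demo() -> (i32, usize) {
    let mut number = 41;
    let mut text = String::from("ab");
    {
        let mut handle = Handle { data: &mut number };
        *handle += 1; // explicit dereference, goes through DerefMut
    }
    let len;
    {
        let mut handle = Handle { data: &mut text };
        handle.push('c'); // method call, auto-dereferenced via DerefMut
        len = handle.len(); // read-only method, goes through Deref
    }
    (number, len)
}
```

This is why the typical use at the start of the post can call process_byte directly on the result of lock. The method call is automatically dereferenced through the guard to the value behind it.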

Lastly, we need a function in Mutex that can actually be used to obtain a MutexGuard:

impl<T> Mutex<T> {
    // Obtains mutually exclusive access to the data in the Mutex
    pub fn lock(&self) -> MutexGuard<T> {
        while self.locked.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
            // Signal the processor to go into an efficient loop
            core::hint::spin_loop();
        }
        MutexGuard {
            data: unsafe { &mut *self.data.get() },
            locked: &self.locked
        }
    }
}

The AtomicBool's compare_exchange function called above atomically does the following:

  1. Check if locked is true.
  2. If it's true return an Err.
  3. If it's false, set it to true and return an Ok.
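
The steps above can be observed directly in a small sketch. compare_exchange_demo is a hypothetical helper that calls compare_exchange twice on a fresh flag, with the same arguments lock uses.

```rust
use core::sync::atomic::{AtomicBool, Ordering};

// Tries to take the same flag twice and returns both results,
// followed by the flag's final value.
fn compare_exchange_demo() -> (Result<bool, bool>, Result<bool, bool>, bool) {
    let locked = AtomicBool::new(false);

    // The flag is false, so it is set to true and we get Ok holding
    // the previous value.
    let first = locked.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed);

    // The flag is now true, so nothing changes and we get Err holding
    // the value that was actually found.
    let second = locked.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed);

    (first, second, locked.load(Ordering::Relaxed))
}
```

The first call returns Ok(false) and the second returns Err(true). In lock, that Err is what keeps the while loop spinning.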

If compare_exchange returns an Err, the loop body calls core::hint::spin_loop, which tells the processor that it's in a busy-waiting loop. Busy waiting is what happens when the processor sits in a loop, continuously checking for a condition to become true before continuing with useful work.

It's after this wait for locked to become false that the MutexGuard is created and returned to the calling code.

To complete the implementation, we also need to tell the compiler that Mutex is safe to share and move between threads. To do that:

unsafe impl<T: Send> Sync for Mutex<T> {}
unsafe impl<T: Send> Send for Mutex<T> {}

The Send trait tells the compiler that it is safe to move values of a type between threads. The Sync trait tells the compiler that it is safe to share values of a type between threads.

Both impls are bounded on T: Send. Sharing a Mutex between threads lets any of them lock it and move the value out (for example, with core::mem::replace), so the data must be safe to send to another thread. The data doesn't need to be Sync itself, because the lock ensures that only one thread accesses it at a time. This is the same bound that std::sync::Mutex uses.
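
One way to check bounds like these is with compile-time assertions. The sketch below uses a hypothetical SpinCell type with the same fields as our Mutex, and it assumes the Sync impl is bounded on T: Send, which is the bound std::sync::Mutex uses. assert_send, assert_sync, and check_bounds are also hypothetical helper names.

```rust
use core::cell::{Cell, UnsafeCell};
use core::sync::atomic::AtomicBool;

// Hypothetical stand-in with the same fields as the Mutex in this post.
struct SpinCell<T> {
    data: UnsafeCell<T>,
    locked: AtomicBool,
}

// Assumption: Sync requires only `T: Send`, as in std::sync::Mutex.
unsafe impl<T: Send> Sync for SpinCell<T> {}

// These helpers compile only if the type parameter meets the bound.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

// Each line is checked at compile time and does nothing at runtime.
fn check_bounds() {
    assert_send::<SpinCell<i32>>();
    assert_sync::<SpinCell<i32>>();
    // Cell<i32> is Send but not Sync. Wrapping it still gives a Sync type.
    assert_sync::<SpinCell<Cell<i32>>>();
    // The next line would fail to compile, because Rc<i32> is not Send:
    // assert_sync::<SpinCell<std::rc::Rc<i32>>>();
}
```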

Testing

As a quick test, we'll just create a single static and mutate it in several threads.

#[cfg(test)]
mod tests {
    use super::*;
    
    #[test]
    fn test_mutex() {
        static NO: Mutex<i32> = Mutex::new(0);
        let mut handles = vec![];
        for _ in 0..10 {
            let handle = std::thread::spawn(|| {
                *NO.lock() += 1;
            });
            handles.push(handle);
        }
        // Wait for every thread, failing the test if any of them panicked
        handles.into_iter().for_each(|handle| handle.join().unwrap());
        assert_eq!(*NO.lock(), 10);
    }
}

This test spawns 10 threads that concurrently try to mutate NO. If the Mutex works, it will synchronize accesses to the static so that only one thread can access it at a time, and no increment will be lost. The value at the end should be 10.

That brings us to the end of our Mutex implementation. In the next post, we're going to replace mutable statics with Onces and Mutexes.

Take Away

For the full code up to this point, go to the repo

In The Next Post

We'll be refactoring.

References

  • UnsafeCell docs: https://doc.rust-lang.org/core/cell/struct.UnsafeCell.html
  • std::sync::Mutex docs: https://doc.rust-lang.org/std/sync/struct.Mutex.html
  • core::sync::atomic::AtomicBool docs: https://doc.rust-lang.org/core/sync/atomic/struct.AtomicBool.html