1 use crate::rt::object; 2 use crate::rt::{thread, Access, Synchronize, VersionVec}; 3 4 use std::sync::atomic::Ordering::{Acquire, Release}; 5 6 #[derive(Debug, Copy, Clone)] 7 pub(crate) struct Mutex { 8 state: object::Ref<State>, 9 } 10 11 #[derive(Debug)] 12 pub(super) struct State { 13 /// If the mutex should establish sequential consistency. 14 seq_cst: bool, 15 16 /// `Some` when the mutex is in the locked state. The `thread::Id` 17 /// references the thread that currently holds the mutex. 18 lock: Option<thread::Id>, 19 20 /// Tracks access to the mutex 21 last_access: Option<Access>, 22 23 /// Causality transfers between threads 24 synchronize: Synchronize, 25 } 26 27 impl Mutex { new(seq_cst: bool) -> Mutex28 pub(crate) fn new(seq_cst: bool) -> Mutex { 29 super::execution(|execution| { 30 let state = execution.objects.insert(State { 31 seq_cst, 32 lock: None, 33 last_access: None, 34 synchronize: Synchronize::new(), 35 }); 36 37 Mutex { state } 38 }) 39 } 40 acquire_lock(&self)41 pub(crate) fn acquire_lock(&self) { 42 self.state.branch_acquire(self.is_locked()); 43 assert!(self.post_acquire(), "expected to be able to acquire lock"); 44 } 45 try_acquire_lock(&self) -> bool46 pub(crate) fn try_acquire_lock(&self) -> bool { 47 self.state.branch_opaque(); 48 self.post_acquire() 49 } 50 release_lock(&self)51 pub(crate) fn release_lock(&self) { 52 super::execution(|execution| { 53 let state = self.state.get_mut(&mut execution.objects); 54 55 // Release the lock flag 56 state.lock = None; 57 58 state 59 .synchronize 60 .sync_store(&mut execution.threads, Release); 61 62 if state.seq_cst { 63 // Establish sequential consistency between the lock's operations. 64 execution.threads.seq_cst(); 65 } 66 67 let thread_id = execution.threads.active_id(); 68 69 for (id, thread) in execution.threads.iter_mut() { 70 if id == thread_id { 71 continue; 72 } 73 74 let obj = thread 75 .operation 76 .as_ref() 77 .map(|operation| operation.object()); 78 79 if obj == Some(self.state.erase()) { 80 thread.set_runnable(); 81 } 82 } 83 }); 84 } 85 post_acquire(&self) -> bool86 fn post_acquire(&self) -> bool { 87 super::execution(|execution| { 88 let state = self.state.get_mut(&mut execution.objects); 89 let thread_id = execution.threads.active_id(); 90 91 if state.lock.is_some() { 92 return false; 93 } 94 95 // Set the lock to the current thread 96 state.lock = Some(thread_id); 97 98 dbg!(state.synchronize.sync_load(&mut execution.threads, Acquire)); 99 100 if state.seq_cst { 101 // Establish sequential consistency between locks 102 execution.threads.seq_cst(); 103 } 104 105 // Block all **other** threads attempting to acquire the mutex 106 for (id, thread) in execution.threads.iter_mut() { 107 if id == thread_id { 108 continue; 109 } 110 111 let obj = thread 112 .operation 113 .as_ref() 114 .map(|operation| operation.object()); 115 116 if obj == Some(self.state.erase()) { 117 thread.set_blocked(); 118 } 119 } 120 121 true 122 }) 123 } 124 125 /// Returns `true` if the mutex is currently locked is_locked(&self) -> bool126 fn is_locked(&self) -> bool { 127 super::execution(|execution| self.state.get(&execution.objects).lock.is_some()) 128 } 129 } 130 131 impl State { last_dependent_access(&self) -> Option<&Access>132 pub(crate) fn last_dependent_access(&self) -> Option<&Access> { 133 self.last_access.as_ref() 134 } 135 set_last_access(&mut self, path_id: usize, version: &VersionVec)136 pub(crate) fn set_last_access(&mut self, path_id: usize, version: &VersionVec) { 137 Access::set_or_create(&mut self.last_access, path_id, version); 138 } 139 } 140