1 use crate::rt::object; 2 use crate::rt::{self, Access, Synchronize, VersionVec}; 3 4 use std::sync::atomic::Ordering::{Acquire, Release}; 5 6 #[derive(Debug, Copy, Clone)] 7 pub(crate) struct Notify { 8 state: object::Ref<State>, 9 } 10 11 #[derive(Debug)] 12 pub(super) struct State { 13 /// If true, spurious notifications are possible 14 spurious: bool, 15 16 /// True if the notify woke up spuriously last time 17 did_spur: bool, 18 19 /// When true, notification is sequentiall consistent. 20 seq_cst: bool, 21 22 /// `true` if there is a pending notification to consume. 23 notified: bool, 24 25 /// Tracks access to the notify object 26 last_access: Option<Access>, 27 28 /// Causality transfers between threads 29 synchronize: Synchronize, 30 } 31 32 impl Notify { new(seq_cst: bool, spurious: bool) -> Notify33 pub(crate) fn new(seq_cst: bool, spurious: bool) -> Notify { 34 super::execution(|execution| { 35 let state = execution.objects.insert(State { 36 spurious, 37 did_spur: false, 38 seq_cst, 39 notified: false, 40 last_access: None, 41 synchronize: Synchronize::new(), 42 }); 43 44 Notify { state } 45 }) 46 } 47 notify(self)48 pub(crate) fn notify(self) { 49 self.state.branch_opaque(); 50 51 rt::execution(|execution| { 52 let state = self.state.get_mut(&mut execution.objects); 53 54 state 55 .synchronize 56 .sync_store(&mut execution.threads, Release); 57 58 if state.seq_cst { 59 execution.threads.seq_cst(); 60 } 61 62 state.notified = true; 63 64 let (active, inactive) = execution.threads.split_active(); 65 66 for thread in inactive { 67 let obj = thread 68 .operation 69 .as_ref() 70 .map(|operation| operation.object()); 71 72 if obj == Some(self.state.erase()) { 73 thread.unpark(active); 74 } 75 } 76 }); 77 } 78 wait(self)79 pub(crate) fn wait(self) { 80 let (notified, spurious) = rt::execution(|execution| { 81 let spurious = if self.state.get(&execution.objects).might_spur() { 82 execution.path.branch_spurious() 83 } else { 84 false 85 }; 86 87 let state = self.state.get_mut(&mut execution.objects); 88 89 if spurious { 90 state.did_spur = true; 91 } 92 93 dbg!((state.notified, spurious)) 94 }); 95 96 if spurious { 97 rt::yield_now(); 98 return; 99 } 100 101 if notified { 102 self.state.branch_opaque(); 103 } else { 104 // This should become branch_disable 105 self.state.branch_acquire(true) 106 } 107 108 // Thread was notified 109 super::execution(|execution| { 110 let state = self.state.get_mut(&mut execution.objects); 111 112 assert!(state.notified); 113 114 state.synchronize.sync_load(&mut execution.threads, Acquire); 115 116 if state.seq_cst { 117 // Establish sequential consistency between locks 118 execution.threads.seq_cst(); 119 } 120 121 state.notified = false; 122 }); 123 } 124 } 125 126 impl State { might_spur(&self) -> bool127 pub(crate) fn might_spur(&self) -> bool { 128 self.spurious && !self.did_spur 129 } 130 last_dependent_access(&self) -> Option<&Access>131 pub(crate) fn last_dependent_access(&self) -> Option<&Access> { 132 self.last_access.as_ref() 133 } 134 set_last_access(&mut self, path_id: usize, version: &VersionVec)135 pub(crate) fn set_last_access(&mut self, path_id: usize, version: &VersionVec) { 136 Access::set_or_create(&mut self.last_access, path_id, version); 137 } 138 } 139