1 use crate::rt::object;
2 use crate::rt::{self, Access, Synchronize, VersionVec};
3 
4 use std::sync::atomic::Ordering::{Acquire, Release};
5 
6 #[derive(Debug, Copy, Clone)]
7 pub(crate) struct Notify {
8     state: object::Ref<State>,
9 }
10 
11 #[derive(Debug)]
12 pub(super) struct State {
13     /// If true, spurious notifications are possible
14     spurious: bool,
15 
16     /// True if the notify woke up spuriously last time
17     did_spur: bool,
18 
19     /// When true, notification is sequentiall consistent.
20     seq_cst: bool,
21 
22     /// `true` if there is a pending notification to consume.
23     notified: bool,
24 
25     /// Tracks access to the notify object
26     last_access: Option<Access>,
27 
28     /// Causality transfers between threads
29     synchronize: Synchronize,
30 }
31 
32 impl Notify {
new(seq_cst: bool, spurious: bool) -> Notify33     pub(crate) fn new(seq_cst: bool, spurious: bool) -> Notify {
34         super::execution(|execution| {
35             let state = execution.objects.insert(State {
36                 spurious,
37                 did_spur: false,
38                 seq_cst,
39                 notified: false,
40                 last_access: None,
41                 synchronize: Synchronize::new(),
42             });
43 
44             Notify { state }
45         })
46     }
47 
notify(self)48     pub(crate) fn notify(self) {
49         self.state.branch_opaque();
50 
51         rt::execution(|execution| {
52             let state = self.state.get_mut(&mut execution.objects);
53 
54             state
55                 .synchronize
56                 .sync_store(&mut execution.threads, Release);
57 
58             if state.seq_cst {
59                 execution.threads.seq_cst();
60             }
61 
62             state.notified = true;
63 
64             let (active, inactive) = execution.threads.split_active();
65 
66             for thread in inactive {
67                 let obj = thread
68                     .operation
69                     .as_ref()
70                     .map(|operation| operation.object());
71 
72                 if obj == Some(self.state.erase()) {
73                     thread.unpark(active);
74                 }
75             }
76         });
77     }
78 
wait(self)79     pub(crate) fn wait(self) {
80         let (notified, spurious) = rt::execution(|execution| {
81             let spurious = if self.state.get(&execution.objects).might_spur() {
82                 execution.path.branch_spurious()
83             } else {
84                 false
85             };
86 
87             let state = self.state.get_mut(&mut execution.objects);
88 
89             if spurious {
90                 state.did_spur = true;
91             }
92 
93             dbg!((state.notified, spurious))
94         });
95 
96         if spurious {
97             rt::yield_now();
98             return;
99         }
100 
101         if notified {
102             self.state.branch_opaque();
103         } else {
104             // This should become branch_disable
105             self.state.branch_acquire(true)
106         }
107 
108         // Thread was notified
109         super::execution(|execution| {
110             let state = self.state.get_mut(&mut execution.objects);
111 
112             assert!(state.notified);
113 
114             state.synchronize.sync_load(&mut execution.threads, Acquire);
115 
116             if state.seq_cst {
117                 // Establish sequential consistency between locks
118                 execution.threads.seq_cst();
119             }
120 
121             state.notified = false;
122         });
123     }
124 }
125 
126 impl State {
might_spur(&self) -> bool127     pub(crate) fn might_spur(&self) -> bool {
128         self.spurious && !self.did_spur
129     }
130 
last_dependent_access(&self) -> Option<&Access>131     pub(crate) fn last_dependent_access(&self) -> Option<&Access> {
132         self.last_access.as_ref()
133     }
134 
set_last_access(&mut self, path_id: usize, version: &VersionVec)135     pub(crate) fn set_last_access(&mut self, path_id: usize, version: &VersionVec) {
136         Access::set_or_create(&mut self.last_access, path_id, version);
137     }
138 }
139