1 // Copyright 2016 Amanieu d'Antras
2 //
3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5 // http://opensource.org/licenses/MIT>, at your option. This file may not be
6 // copied, modified, or distributed except according to those terms.
7 
8 use core::{
9     mem,
10     sync::atomic::{AtomicUsize, Ordering},
11 };
12 use instant::Instant;
13 use winapi::{
14     shared::{
15         basetsd::SIZE_T,
16         minwindef::{BOOL, DWORD, FALSE, TRUE},
17         winerror::ERROR_TIMEOUT,
18     },
19     um::{
20         errhandlingapi::GetLastError,
21         libloaderapi::{GetModuleHandleA, GetProcAddress},
22         winbase::INFINITE,
23         winnt::{LPCSTR, PVOID},
24     },
25 };
26 
// Table of function pointers for the address-based wait API. Instances are
// built by `WaitAddress::create`, which fills both fields from symbols looked
// up with `GetProcAddress`.
//
// The fields are named after the exported symbols they hold, which is why the
// snake-case lint is silenced for this type.
#[allow(non_snake_case)]
pub struct WaitAddress {
    // Pointer to the `WaitOnAddress` export. It returns a `BOOL`; the callers
    // in this file compare that result against `TRUE` / `FALSE`.
    WaitOnAddress: extern "system" fn(
        Address: PVOID,
        CompareAddress: PVOID,
        AddressSize: SIZE_T,
        dwMilliseconds: DWORD,
    ) -> BOOL,
    // Pointer to the `WakeByAddressSingle` export. It takes the address to
    // wake on and returns nothing.
    WakeByAddressSingle: extern "system" fn(Address: PVOID),
}
37 
38 impl WaitAddress {
39     #[allow(non_snake_case)]
create() -> Option<WaitAddress>40     pub fn create() -> Option<WaitAddress> {
41         unsafe {
42             // MSDN claims that that WaitOnAddress and WakeByAddressSingle are
43             // located in kernel32.dll, but they are lying...
44             let synch_dll =
45                 GetModuleHandleA(b"api-ms-win-core-synch-l1-2-0.dll\0".as_ptr() as LPCSTR);
46             if synch_dll.is_null() {
47                 return None;
48             }
49 
50             let WaitOnAddress = GetProcAddress(synch_dll, b"WaitOnAddress\0".as_ptr() as LPCSTR);
51             if WaitOnAddress.is_null() {
52                 return None;
53             }
54             let WakeByAddressSingle =
55                 GetProcAddress(synch_dll, b"WakeByAddressSingle\0".as_ptr() as LPCSTR);
56             if WakeByAddressSingle.is_null() {
57                 return None;
58             }
59             Some(WaitAddress {
60                 WaitOnAddress: mem::transmute(WaitOnAddress),
61                 WakeByAddressSingle: mem::transmute(WakeByAddressSingle),
62             })
63         }
64     }
65 
66     #[inline]
prepare_park(&'static self, key: &AtomicUsize)67     pub fn prepare_park(&'static self, key: &AtomicUsize) {
68         key.store(1, Ordering::Relaxed);
69     }
70 
71     #[inline]
timed_out(&'static self, key: &AtomicUsize) -> bool72     pub fn timed_out(&'static self, key: &AtomicUsize) -> bool {
73         key.load(Ordering::Relaxed) != 0
74     }
75 
76     #[inline]
park(&'static self, key: &AtomicUsize)77     pub fn park(&'static self, key: &AtomicUsize) {
78         while key.load(Ordering::Acquire) != 0 {
79             let r = self.wait_on_address(key, INFINITE);
80             debug_assert!(r == TRUE);
81         }
82     }
83 
84     #[inline]
park_until(&'static self, key: &AtomicUsize, timeout: Instant) -> bool85     pub fn park_until(&'static self, key: &AtomicUsize, timeout: Instant) -> bool {
86         while key.load(Ordering::Acquire) != 0 {
87             let now = Instant::now();
88             if timeout <= now {
89                 return false;
90             }
91             let diff = timeout - now;
92             let timeout = diff
93                 .as_secs()
94                 .checked_mul(1000)
95                 .and_then(|x| x.checked_add((diff.subsec_nanos() as u64 + 999999) / 1000000))
96                 .map(|ms| {
97                     if ms > <DWORD>::max_value() as u64 {
98                         INFINITE
99                     } else {
100                         ms as DWORD
101                     }
102                 })
103                 .unwrap_or(INFINITE);
104             if self.wait_on_address(key, timeout) == FALSE {
105                 debug_assert_eq!(unsafe { GetLastError() }, ERROR_TIMEOUT);
106             }
107         }
108         true
109     }
110 
111     #[inline]
unpark_lock(&'static self, key: &AtomicUsize) -> UnparkHandle112     pub fn unpark_lock(&'static self, key: &AtomicUsize) -> UnparkHandle {
113         // We don't need to lock anything, just clear the state
114         key.store(0, Ordering::Release);
115 
116         UnparkHandle {
117             key: key,
118             waitaddress: self,
119         }
120     }
121 
122     #[inline]
wait_on_address(&'static self, key: &AtomicUsize, timeout: DWORD) -> BOOL123     fn wait_on_address(&'static self, key: &AtomicUsize, timeout: DWORD) -> BOOL {
124         let cmp = 1usize;
125         (self.WaitOnAddress)(
126             key as *const _ as PVOID,
127             &cmp as *const _ as PVOID,
128             mem::size_of::<usize>() as SIZE_T,
129             timeout,
130         )
131     }
132 }
133 
// Handle for a thread that is about to be unparked. We need to mark the thread
// as unparked while holding the queue lock, but we delay the actual unparking
// until after the queue lock is released.
pub struct UnparkHandle {
    // Address of the key that was passed to `unpark_lock`. It is kept as a raw
    // pointer and only handed to `WakeByAddressSingle` in `unpark`.
    key: *const AtomicUsize,
    // Function table through which the wake call is issued.
    waitaddress: &'static WaitAddress,
}
141 
142 impl UnparkHandle {
143     // Wakes up the parked thread. This should be called after the queue lock is
144     // released to avoid blocking the queue for too long.
145     #[inline]
unpark(self)146     pub fn unpark(self) {
147         (self.waitaddress.WakeByAddressSingle)(self.key as PVOID);
148     }
149 }
150