1 // Copyright 2016 Amanieu d'Antras 2 // 3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or 4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or 5 // http://opensource.org/licenses/MIT>, at your option. This file may not be 6 // copied, modified, or distributed except according to those terms. 7 8 use core::{ 9 mem, 10 sync::atomic::{AtomicUsize, Ordering}, 11 }; 12 use instant::Instant; 13 use winapi::{ 14 shared::{ 15 basetsd::SIZE_T, 16 minwindef::{BOOL, DWORD, FALSE, TRUE}, 17 winerror::ERROR_TIMEOUT, 18 }, 19 um::{ 20 errhandlingapi::GetLastError, 21 libloaderapi::{GetModuleHandleA, GetProcAddress}, 22 winbase::INFINITE, 23 winnt::{LPCSTR, PVOID}, 24 }, 25 }; 26 27 #[allow(non_snake_case)] 28 pub struct WaitAddress { 29 WaitOnAddress: extern "system" fn( 30 Address: PVOID, 31 CompareAddress: PVOID, 32 AddressSize: SIZE_T, 33 dwMilliseconds: DWORD, 34 ) -> BOOL, 35 WakeByAddressSingle: extern "system" fn(Address: PVOID), 36 } 37 38 impl WaitAddress { 39 #[allow(non_snake_case)] create() -> Option<WaitAddress>40 pub fn create() -> Option<WaitAddress> { 41 unsafe { 42 // MSDN claims that that WaitOnAddress and WakeByAddressSingle are 43 // located in kernel32.dll, but they are lying... 44 let synch_dll = 45 GetModuleHandleA(b"api-ms-win-core-synch-l1-2-0.dll\0".as_ptr() as LPCSTR); 46 if synch_dll.is_null() { 47 return None; 48 } 49 50 let WaitOnAddress = GetProcAddress(synch_dll, b"WaitOnAddress\0".as_ptr() as LPCSTR); 51 if WaitOnAddress.is_null() { 52 return None; 53 } 54 let WakeByAddressSingle = 55 GetProcAddress(synch_dll, b"WakeByAddressSingle\0".as_ptr() as LPCSTR); 56 if WakeByAddressSingle.is_null() { 57 return None; 58 } 59 Some(WaitAddress { 60 WaitOnAddress: mem::transmute(WaitOnAddress), 61 WakeByAddressSingle: mem::transmute(WakeByAddressSingle), 62 }) 63 } 64 } 65 66 #[inline] prepare_park(&'static self, key: &AtomicUsize)67 pub fn prepare_park(&'static self, key: &AtomicUsize) { 68 key.store(1, Ordering::Relaxed); 69 } 70 71 #[inline] timed_out(&'static self, key: &AtomicUsize) -> bool72 pub fn timed_out(&'static self, key: &AtomicUsize) -> bool { 73 key.load(Ordering::Relaxed) != 0 74 } 75 76 #[inline] park(&'static self, key: &AtomicUsize)77 pub fn park(&'static self, key: &AtomicUsize) { 78 while key.load(Ordering::Acquire) != 0 { 79 let r = self.wait_on_address(key, INFINITE); 80 debug_assert!(r == TRUE); 81 } 82 } 83 84 #[inline] park_until(&'static self, key: &AtomicUsize, timeout: Instant) -> bool85 pub fn park_until(&'static self, key: &AtomicUsize, timeout: Instant) -> bool { 86 while key.load(Ordering::Acquire) != 0 { 87 let now = Instant::now(); 88 if timeout <= now { 89 return false; 90 } 91 let diff = timeout - now; 92 let timeout = diff 93 .as_secs() 94 .checked_mul(1000) 95 .and_then(|x| x.checked_add((diff.subsec_nanos() as u64 + 999999) / 1000000)) 96 .map(|ms| { 97 if ms > <DWORD>::max_value() as u64 { 98 INFINITE 99 } else { 100 ms as DWORD 101 } 102 }) 103 .unwrap_or(INFINITE); 104 if self.wait_on_address(key, timeout) == FALSE { 105 debug_assert_eq!(unsafe { GetLastError() }, ERROR_TIMEOUT); 106 } 107 } 108 true 109 } 110 111 #[inline] unpark_lock(&'static self, key: &AtomicUsize) -> UnparkHandle112 pub fn unpark_lock(&'static self, key: &AtomicUsize) -> UnparkHandle { 113 // We don't need to lock anything, just clear the state 114 key.store(0, Ordering::Release); 115 116 UnparkHandle { 117 key: key, 118 waitaddress: self, 119 } 120 } 121 122 #[inline] wait_on_address(&'static self, key: &AtomicUsize, timeout: DWORD) -> BOOL123 fn wait_on_address(&'static self, key: &AtomicUsize, timeout: DWORD) -> BOOL { 124 let cmp = 1usize; 125 (self.WaitOnAddress)( 126 key as *const _ as PVOID, 127 &cmp as *const _ as PVOID, 128 mem::size_of::<usize>() as SIZE_T, 129 timeout, 130 ) 131 } 132 } 133 134 // Handle for a thread that is about to be unparked. We need to mark the thread 135 // as unparked while holding the queue lock, but we delay the actual unparking 136 // until after the queue lock is released. 137 pub struct UnparkHandle { 138 key: *const AtomicUsize, 139 waitaddress: &'static WaitAddress, 140 } 141 142 impl UnparkHandle { 143 // Wakes up the parked thread. This should be called after the queue lock is 144 // released to avoid blocking the queue for too long. 145 #[inline] unpark(self)146 pub fn unpark(self) { 147 (self.waitaddress.WakeByAddressSingle)(self.key as PVOID); 148 } 149 } 150