1 use crate::latch::Latch; 2 use crate::unwind; 3 use crossbeam_deque::{Injector, Steal}; 4 use std::any::Any; 5 use std::cell::UnsafeCell; 6 use std::mem; 7 8 pub(super) enum JobResult<T> { 9 None, 10 Ok(T), 11 Panic(Box<dyn Any + Send>), 12 } 13 14 /// A `Job` is used to advertise work for other threads that they may 15 /// want to steal. In accordance with time honored tradition, jobs are 16 /// arranged in a deque, so that thieves can take from the top of the 17 /// deque while the main worker manages the bottom of the deque. This 18 /// deque is managed by the `thread_pool` module. 19 pub(super) trait Job { 20 /// Unsafe: this may be called from a different thread than the one 21 /// which scheduled the job, so the implementer must ensure the 22 /// appropriate traits are met, whether `Send`, `Sync`, or both. execute(this: *const Self)23 unsafe fn execute(this: *const Self); 24 } 25 26 /// Effectively a Job trait object. Each JobRef **must** be executed 27 /// exactly once, or else data may leak. 28 /// 29 /// Internally, we store the job's data in a `*const ()` pointer. The 30 /// true type is something like `*const StackJob<...>`, but we hide 31 /// it. We also carry the "execute fn" from the `Job` trait. 32 #[derive(Copy, Clone, Debug, PartialEq, Eq)] 33 pub(super) struct JobRef { 34 pointer: *const (), 35 execute_fn: unsafe fn(*const ()), 36 } 37 38 unsafe impl Send for JobRef {} 39 unsafe impl Sync for JobRef {} 40 41 impl JobRef { 42 /// Unsafe: caller asserts that `data` will remain valid until the 43 /// job is executed. new<T>(data: *const T) -> JobRef where T: Job,44 pub(super) unsafe fn new<T>(data: *const T) -> JobRef 45 where 46 T: Job, 47 { 48 let fn_ptr: unsafe fn(*const T) = <T as Job>::execute; 49 50 // erase types: 51 JobRef { 52 pointer: data as *const (), 53 execute_fn: mem::transmute(fn_ptr), 54 } 55 } 56 57 #[inline] execute(&self)58 pub(super) unsafe fn execute(&self) { 59 (self.execute_fn)(self.pointer) 60 } 61 } 62 63 /// A job that will be owned by a stack slot. This means that when it 64 /// executes it need not free any heap data, the cleanup occurs when 65 /// the stack frame is later popped. The function parameter indicates 66 /// `true` if the job was stolen -- executed on a different thread. 67 pub(super) struct StackJob<L, F, R> 68 where 69 L: Latch + Sync, 70 F: FnOnce(bool) -> R + Send, 71 R: Send, 72 { 73 pub(super) latch: L, 74 func: UnsafeCell<Option<F>>, 75 result: UnsafeCell<JobResult<R>>, 76 } 77 78 impl<L, F, R> StackJob<L, F, R> 79 where 80 L: Latch + Sync, 81 F: FnOnce(bool) -> R + Send, 82 R: Send, 83 { new(func: F, latch: L) -> StackJob<L, F, R>84 pub(super) fn new(func: F, latch: L) -> StackJob<L, F, R> { 85 StackJob { 86 latch, 87 func: UnsafeCell::new(Some(func)), 88 result: UnsafeCell::new(JobResult::None), 89 } 90 } 91 as_job_ref(&self) -> JobRef92 pub(super) unsafe fn as_job_ref(&self) -> JobRef { 93 JobRef::new(self) 94 } 95 run_inline(self, stolen: bool) -> R96 pub(super) unsafe fn run_inline(self, stolen: bool) -> R { 97 self.func.into_inner().unwrap()(stolen) 98 } 99 into_result(self) -> R100 pub(super) unsafe fn into_result(self) -> R { 101 self.result.into_inner().into_return_value() 102 } 103 } 104 105 impl<L, F, R> Job for StackJob<L, F, R> 106 where 107 L: Latch + Sync, 108 F: FnOnce(bool) -> R + Send, 109 R: Send, 110 { execute(this: *const Self)111 unsafe fn execute(this: *const Self) { 112 fn call<R>(func: impl FnOnce(bool) -> R) -> impl FnOnce() -> R { 113 move || func(true) 114 } 115 116 let this = &*this; 117 let abort = unwind::AbortIfPanic; 118 let func = (*this.func.get()).take().unwrap(); 119 (*this.result.get()) = match unwind::halt_unwinding(call(func)) { 120 Ok(x) => JobResult::Ok(x), 121 Err(x) => JobResult::Panic(x), 122 }; 123 this.latch.set(); 124 mem::forget(abort); 125 } 126 } 127 128 /// Represents a job stored in the heap. Used to implement 129 /// `scope`. Unlike `StackJob`, when executed, `HeapJob` simply 130 /// invokes a closure, which then triggers the appropriate logic to 131 /// signal that the job executed. 132 /// 133 /// (Probably `StackJob` should be refactored in a similar fashion.) 134 pub(super) struct HeapJob<BODY> 135 where 136 BODY: FnOnce() + Send, 137 { 138 job: UnsafeCell<Option<BODY>>, 139 } 140 141 impl<BODY> HeapJob<BODY> 142 where 143 BODY: FnOnce() + Send, 144 { new(func: BODY) -> Self145 pub(super) fn new(func: BODY) -> Self { 146 HeapJob { 147 job: UnsafeCell::new(Some(func)), 148 } 149 } 150 151 /// Creates a `JobRef` from this job -- note that this hides all 152 /// lifetimes, so it is up to you to ensure that this JobRef 153 /// doesn't outlive any data that it closes over. as_job_ref(self: Box<Self>) -> JobRef154 pub(super) unsafe fn as_job_ref(self: Box<Self>) -> JobRef { 155 let this: *const Self = mem::transmute(self); 156 JobRef::new(this) 157 } 158 } 159 160 impl<BODY> Job for HeapJob<BODY> 161 where 162 BODY: FnOnce() + Send, 163 { execute(this: *const Self)164 unsafe fn execute(this: *const Self) { 165 let this: Box<Self> = mem::transmute(this); 166 let job = (*this.job.get()).take().unwrap(); 167 job(); 168 } 169 } 170 171 impl<T> JobResult<T> { 172 /// Convert the `JobResult` for a job that has finished (and hence 173 /// its JobResult is populated) into its return value. 174 /// 175 /// NB. This will panic if the job panicked. into_return_value(self) -> T176 pub(super) fn into_return_value(self) -> T { 177 match self { 178 JobResult::None => unreachable!(), 179 JobResult::Ok(x) => x, 180 JobResult::Panic(x) => unwind::resume_unwinding(x), 181 } 182 } 183 } 184 185 /// Indirect queue to provide FIFO job priority. 186 pub(super) struct JobFifo { 187 inner: Injector<JobRef>, 188 } 189 190 impl JobFifo { new() -> Self191 pub(super) fn new() -> Self { 192 JobFifo { 193 inner: Injector::new(), 194 } 195 } 196 push(&self, job_ref: JobRef) -> JobRef197 pub(super) unsafe fn push(&self, job_ref: JobRef) -> JobRef { 198 // A little indirection ensures that spawns are always prioritized in FIFO order. The 199 // jobs in a thread's deque may be popped from the back (LIFO) or stolen from the front 200 // (FIFO), but either way they will end up popping from the front of this queue. 201 self.inner.push(job_ref); 202 JobRef::new(self) 203 } 204 } 205 206 impl Job for JobFifo { execute(this: *const Self)207 unsafe fn execute(this: *const Self) { 208 // We "execute" a queue by executing its first job, FIFO. 209 loop { 210 match (*this).inner.steal() { 211 Steal::Success(job_ref) => break job_ref.execute(), 212 Steal::Empty => panic!("FIFO is empty"), 213 Steal::Retry => {} 214 } 215 } 216 } 217 } 218