1 use super::plumbing::*; 2 use super::*; 3 use std::sync::atomic::{AtomicBool, Ordering}; 4 use std::thread; 5 6 /// `PanicFuse` is an adaptor that wraps an iterator with a fuse in case 7 /// of panics, to halt all threads as soon as possible. 8 /// 9 /// This struct is created by the [`panic_fuse()`] method on [`ParallelIterator`] 10 /// 11 /// [`panic_fuse()`]: trait.ParallelIterator.html#method.panic_fuse 12 /// [`ParallelIterator`]: trait.ParallelIterator.html 13 #[must_use = "iterator adaptors are lazy and do nothing unless consumed"] 14 #[derive(Debug, Clone)] 15 pub struct PanicFuse<I: ParallelIterator> { 16 base: I, 17 } 18 19 /// Helper that sets a bool to `true` if dropped while unwinding. 20 #[derive(Clone)] 21 struct Fuse<'a>(&'a AtomicBool); 22 23 impl<'a> Drop for Fuse<'a> { 24 #[inline] drop(&mut self)25 fn drop(&mut self) { 26 if thread::panicking() { 27 self.0.store(true, Ordering::Relaxed); 28 } 29 } 30 } 31 32 impl<'a> Fuse<'a> { 33 #[inline] panicked(&self) -> bool34 fn panicked(&self) -> bool { 35 self.0.load(Ordering::Relaxed) 36 } 37 } 38 39 impl<I> PanicFuse<I> 40 where 41 I: ParallelIterator, 42 { 43 /// Creates a new `PanicFuse` iterator. new(base: I) -> PanicFuse<I>44 pub(super) fn new(base: I) -> PanicFuse<I> { 45 PanicFuse { base } 46 } 47 } 48 49 impl<I> ParallelIterator for PanicFuse<I> 50 where 51 I: ParallelIterator, 52 { 53 type Item = I::Item; 54 drive_unindexed<C>(self, consumer: C) -> C::Result where C: UnindexedConsumer<Self::Item>,55 fn drive_unindexed<C>(self, consumer: C) -> C::Result 56 where 57 C: UnindexedConsumer<Self::Item>, 58 { 59 let panicked = AtomicBool::new(false); 60 let consumer1 = PanicFuseConsumer { 61 base: consumer, 62 fuse: Fuse(&panicked), 63 }; 64 self.base.drive_unindexed(consumer1) 65 } 66 opt_len(&self) -> Option<usize>67 fn opt_len(&self) -> Option<usize> { 68 self.base.opt_len() 69 } 70 } 71 72 impl<I> IndexedParallelIterator for PanicFuse<I> 73 where 74 I: IndexedParallelIterator, 75 { drive<C>(self, consumer: C) -> C::Result where C: Consumer<Self::Item>,76 fn drive<C>(self, consumer: C) -> C::Result 77 where 78 C: Consumer<Self::Item>, 79 { 80 let panicked = AtomicBool::new(false); 81 let consumer1 = PanicFuseConsumer { 82 base: consumer, 83 fuse: Fuse(&panicked), 84 }; 85 self.base.drive(consumer1) 86 } 87 len(&self) -> usize88 fn len(&self) -> usize { 89 self.base.len() 90 } 91 with_producer<CB>(self, callback: CB) -> CB::Output where CB: ProducerCallback<Self::Item>,92 fn with_producer<CB>(self, callback: CB) -> CB::Output 93 where 94 CB: ProducerCallback<Self::Item>, 95 { 96 return self.base.with_producer(Callback { callback }); 97 98 struct Callback<CB> { 99 callback: CB, 100 } 101 102 impl<T, CB> ProducerCallback<T> for Callback<CB> 103 where 104 CB: ProducerCallback<T>, 105 { 106 type Output = CB::Output; 107 108 fn callback<P>(self, base: P) -> CB::Output 109 where 110 P: Producer<Item = T>, 111 { 112 let panicked = AtomicBool::new(false); 113 let producer = PanicFuseProducer { 114 base, 115 fuse: Fuse(&panicked), 116 }; 117 self.callback.callback(producer) 118 } 119 } 120 } 121 } 122 123 /// //////////////////////////////////////////////////////////////////////// 124 /// Producer implementation 125 126 struct PanicFuseProducer<'a, P> { 127 base: P, 128 fuse: Fuse<'a>, 129 } 130 131 impl<'a, P> Producer for PanicFuseProducer<'a, P> 132 where 133 P: Producer, 134 { 135 type Item = P::Item; 136 type IntoIter = PanicFuseIter<'a, P::IntoIter>; 137 into_iter(self) -> Self::IntoIter138 fn into_iter(self) -> Self::IntoIter { 139 PanicFuseIter { 140 base: self.base.into_iter(), 141 fuse: self.fuse, 142 } 143 } 144 min_len(&self) -> usize145 fn min_len(&self) -> usize { 146 self.base.min_len() 147 } max_len(&self) -> usize148 fn max_len(&self) -> usize { 149 self.base.max_len() 150 } 151 split_at(self, index: usize) -> (Self, Self)152 fn split_at(self, index: usize) -> (Self, Self) { 153 let (left, right) = self.base.split_at(index); 154 ( 155 PanicFuseProducer { 156 base: left, 157 fuse: self.fuse.clone(), 158 }, 159 PanicFuseProducer { 160 base: right, 161 fuse: self.fuse, 162 }, 163 ) 164 } 165 fold_with<G>(self, folder: G) -> G where G: Folder<Self::Item>,166 fn fold_with<G>(self, folder: G) -> G 167 where 168 G: Folder<Self::Item>, 169 { 170 let folder1 = PanicFuseFolder { 171 base: folder, 172 fuse: self.fuse, 173 }; 174 self.base.fold_with(folder1).base 175 } 176 } 177 178 struct PanicFuseIter<'a, I> { 179 base: I, 180 fuse: Fuse<'a>, 181 } 182 183 impl<'a, I> Iterator for PanicFuseIter<'a, I> 184 where 185 I: Iterator, 186 { 187 type Item = I::Item; 188 next(&mut self) -> Option<Self::Item>189 fn next(&mut self) -> Option<Self::Item> { 190 if self.fuse.panicked() { 191 None 192 } else { 193 self.base.next() 194 } 195 } 196 size_hint(&self) -> (usize, Option<usize>)197 fn size_hint(&self) -> (usize, Option<usize>) { 198 self.base.size_hint() 199 } 200 } 201 202 impl<'a, I> DoubleEndedIterator for PanicFuseIter<'a, I> 203 where 204 I: DoubleEndedIterator, 205 { next_back(&mut self) -> Option<Self::Item>206 fn next_back(&mut self) -> Option<Self::Item> { 207 if self.fuse.panicked() { 208 None 209 } else { 210 self.base.next_back() 211 } 212 } 213 } 214 215 impl<'a, I> ExactSizeIterator for PanicFuseIter<'a, I> 216 where 217 I: ExactSizeIterator, 218 { len(&self) -> usize219 fn len(&self) -> usize { 220 self.base.len() 221 } 222 } 223 224 /// //////////////////////////////////////////////////////////////////////// 225 /// Consumer implementation 226 227 struct PanicFuseConsumer<'a, C> { 228 base: C, 229 fuse: Fuse<'a>, 230 } 231 232 impl<'a, T, C> Consumer<T> for PanicFuseConsumer<'a, C> 233 where 234 C: Consumer<T>, 235 { 236 type Folder = PanicFuseFolder<'a, C::Folder>; 237 type Reducer = PanicFuseReducer<'a, C::Reducer>; 238 type Result = C::Result; 239 split_at(self, index: usize) -> (Self, Self, Self::Reducer)240 fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { 241 let (left, right, reducer) = self.base.split_at(index); 242 ( 243 PanicFuseConsumer { 244 base: left, 245 fuse: self.fuse.clone(), 246 }, 247 PanicFuseConsumer { 248 base: right, 249 fuse: self.fuse.clone(), 250 }, 251 PanicFuseReducer { 252 base: reducer, 253 _fuse: self.fuse, 254 }, 255 ) 256 } 257 into_folder(self) -> Self::Folder258 fn into_folder(self) -> Self::Folder { 259 PanicFuseFolder { 260 base: self.base.into_folder(), 261 fuse: self.fuse, 262 } 263 } 264 full(&self) -> bool265 fn full(&self) -> bool { 266 self.fuse.panicked() || self.base.full() 267 } 268 } 269 270 impl<'a, T, C> UnindexedConsumer<T> for PanicFuseConsumer<'a, C> 271 where 272 C: UnindexedConsumer<T>, 273 { split_off_left(&self) -> Self274 fn split_off_left(&self) -> Self { 275 PanicFuseConsumer { 276 base: self.base.split_off_left(), 277 fuse: self.fuse.clone(), 278 } 279 } 280 to_reducer(&self) -> Self::Reducer281 fn to_reducer(&self) -> Self::Reducer { 282 PanicFuseReducer { 283 base: self.base.to_reducer(), 284 _fuse: self.fuse.clone(), 285 } 286 } 287 } 288 289 struct PanicFuseFolder<'a, C> { 290 base: C, 291 fuse: Fuse<'a>, 292 } 293 294 impl<'a, T, C> Folder<T> for PanicFuseFolder<'a, C> 295 where 296 C: Folder<T>, 297 { 298 type Result = C::Result; 299 consume(mut self, item: T) -> Self300 fn consume(mut self, item: T) -> Self { 301 self.base = self.base.consume(item); 302 self 303 } 304 consume_iter<I>(mut self, iter: I) -> Self where I: IntoIterator<Item = T>,305 fn consume_iter<I>(mut self, iter: I) -> Self 306 where 307 I: IntoIterator<Item = T>, 308 { 309 fn cool<'a, T>(fuse: &'a Fuse<'_>) -> impl Fn(&T) -> bool + 'a { 310 move |_| !fuse.panicked() 311 } 312 313 self.base = { 314 let fuse = &self.fuse; 315 let iter = iter.into_iter().take_while(cool(fuse)); 316 self.base.consume_iter(iter) 317 }; 318 self 319 } 320 complete(self) -> C::Result321 fn complete(self) -> C::Result { 322 self.base.complete() 323 } 324 full(&self) -> bool325 fn full(&self) -> bool { 326 self.fuse.panicked() || self.base.full() 327 } 328 } 329 330 struct PanicFuseReducer<'a, C> { 331 base: C, 332 _fuse: Fuse<'a>, 333 } 334 335 impl<'a, T, C> Reducer<T> for PanicFuseReducer<'a, C> 336 where 337 C: Reducer<T>, 338 { reduce(self, left: T, right: T) -> T339 fn reduce(self, left: T, right: T) -> T { 340 self.base.reduce(left, right) 341 } 342 } 343