1 //! Lock-free intrusive linked list.
2 //!
//! Ideas from Maged M. Michael, "High Performance Dynamic Lock-Free Hash Tables and List-Based
//! Sets", SPAA 2002. <http://dl.acm.org/citation.cfm?id=564870.564881>
5 
6 use core::marker::PhantomData;
7 use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
8 
9 use {unprotected, Atomic, Guard, Shared};
10 
/// An entry in a linked list.
///
/// An Entry is accessed from multiple threads, so it would be beneficial to put it in a different
/// cache-line than thread-local data in terms of performance.
///
/// The only state an `Entry` holds is the tagged pointer to its successor. Element types make an
/// `Entry` reachable through an `IsElement` implementation so that they can be linked into a
/// `List`.
#[derive(Debug)]
pub struct Entry {
    /// The next entry in the linked list.
    /// If the tag is 1, this entry is marked as deleted.
    ///
    /// The mark is set by `Entry::delete`; an iterator that later observes the mark tries to
    /// unlink the entry from the list.
    next: Atomic<Entry>,
}
21 
22 /// Implementing this trait asserts that the type `T` can be used as an element in the intrusive
23 /// linked list defined in this module. `T` has to contain (or otherwise be linked to) an instance
24 /// of `Entry`.
25 ///
26 /// # Example
27 ///
28 /// ```ignore
29 /// struct A {
30 ///     entry: Entry,
31 ///     data: usize,
32 /// }
33 ///
34 /// impl IsElement<A> for A {
35 ///     fn entry_of(a: &A) -> &Entry {
36 ///         let entry_ptr = ((a as usize) + offset_of!(A, entry)) as *const Entry;
37 ///         unsafe { &*entry_ptr }
38 ///     }
39 ///
40 ///     unsafe fn element_of(entry: &Entry) -> &T {
41 ///         let elem_ptr = ((entry as usize) - offset_of!(A, entry)) as *const T;
42 ///         &*elem_ptr
43 ///     }
44 ///
45 ///     unsafe fn finalize(entry: &Entry) {
46 ///         let elem = Self::element_of(entry);
47 ///         drop(Box::from_raw(elem as *const A as *mut A));
48 ///     }
49 /// }
50 /// ```
51 ///
52 /// This trait is implemented on a type separate from `T` (although it can be just `T`), because
53 /// one type might be placeable into multiple lists, in which case it would require multiple
54 /// implementations of `IsElement`. In such cases, each struct implementing `IsElement<T>`
55 /// represents a distinct `Entry` in `T`.
56 ///
57 /// For example, we can insert the following struct into two lists using `entry1` for one
58 /// and `entry2` for the other:
59 ///
60 /// ```ignore
61 /// struct B {
62 ///     entry1: Entry,
63 ///     entry2: Entry,
64 ///     data: usize,
65 /// }
66 /// ```
67 ///
68 pub trait IsElement<T> {
69     /// Returns a reference to this element's `Entry`.
entry_of(&T) -> &Entry70     fn entry_of(&T) -> &Entry;
71 
72     /// Given a reference to an element's entry, returns that element.
73     ///
74     /// ```ignore
75     /// let elem = ListElement::new();
76     /// assert_eq!(elem.entry_of(),
77     ///            unsafe { ListElement::element_of(elem.entry_of()) } );
78     /// ```
79     ///
80     /// # Safety
81     /// The caller has to guarantee that the `Entry` it
82     /// is called with was retrieved from an instance of the element type (`T`).
element_of(&Entry) -> &T83     unsafe fn element_of(&Entry) -> &T;
84 
85     /// Deallocates the whole element given its `Entry`. This is called when the list
86     /// is ready to actually free the element.
87     ///
88     /// # Safety
89     /// The caller has to guarantee that the `Entry` it
90     /// is called with was retrieved from an instance of the element type (`T`).
finalize(&Entry)91     unsafe fn finalize(&Entry);
92 }
93 
/// A lock-free, intrusive linked list of type `T`.
///
/// `C` is the helper type that maps between a `T` and the `Entry` associated with it (see
/// `IsElement`); it defaults to `T` itself. The list itself only stores the pointer to its first
/// `Entry`.
#[derive(Debug)]
pub struct List<T, C: IsElement<T> = T> {
    /// The head of the linked list.
    head: Atomic<Entry>,

    /// The phantom data for using `T` and `C`.
    _marker: PhantomData<(T, C)>,
}
103 
/// An iterator used for retrieving values from the list.
///
/// Each item is a `Result`: `Ok` carries a reference to an element, `Err` reports that the
/// iteration was disturbed by a concurrent modification (see `IterError`).
pub struct Iter<'g, T: 'g, C: IsElement<T>> {
    /// The guard that protects the iteration.
    guard: &'g Guard,

    /// Pointer from the predecessor to the current entry.
    /// Initially this is the list head itself.
    pred: &'g Atomic<Entry>,

    /// The current entry.
    curr: Shared<'g, Entry>,

    /// The list head, needed for restarting iteration.
    head: &'g Atomic<Entry>,

    /// Logically, we store a borrow of an instance of `T` and
    /// use the type information from `C`.
    _marker: PhantomData<(&'g T, C)>,
}
122 
/// An error that occurs during iteration over the list.
///
/// Errors are yielded as `Err` items by `Iter`; they do not end the iteration.
#[derive(PartialEq, Debug)]
pub enum IterError {
    /// A concurrent thread modified the state of the list at the same place that this iterator
    /// was inspecting. Subsequent iteration will restart from the beginning of the list.
    Stalled,
}
130 
131 impl Default for Entry {
132     /// Returns the empty entry.
default() -> Self133     fn default() -> Self {
134         Self {
135             next: Atomic::null(),
136         }
137     }
138 }
139 
140 impl Entry {
141     /// Marks this entry as deleted, deferring the actual deallocation to a later iteration.
142     ///
143     /// # Safety
144     ///
145     /// The entry should be a member of a linked list, and it should not have been deleted.
146     /// It should be safe to call `C::finalize` on the entry after the `guard` is dropped, where `C`
147     /// is the associated helper for the linked list.
delete(&self, guard: &Guard)148     pub unsafe fn delete(&self, guard: &Guard) {
149         self.next.fetch_or(1, Release, guard);
150     }
151 }
152 
153 impl<T, C: IsElement<T>> List<T, C> {
154     /// Returns a new, empty linked list.
new() -> Self155     pub fn new() -> Self {
156         Self {
157             head: Atomic::null(),
158             _marker: PhantomData,
159         }
160     }
161 
162     /// Inserts `entry` into the head of the list.
163     ///
164     /// # Safety
165     ///
166     /// You should guarantee that:
167     ///
168     /// - `container` is not null
169     /// - `container` is immovable, e.g. inside a `Box`
170     /// - the same `Entry` is not inserted more than once
171     /// - the inserted object will be removed before the list is dropped
insert<'g>(&'g self, container: Shared<'g, T>, guard: &'g Guard)172     pub unsafe fn insert<'g>(&'g self, container: Shared<'g, T>, guard: &'g Guard) {
173         // Insert right after head, i.e. at the beginning of the list.
174         let to = &self.head;
175         // Get the intrusively stored Entry of the new element to insert.
176         let entry: &Entry = C::entry_of(container.deref());
177         // Make a Shared ptr to that Entry.
178         let entry_ptr = Shared::from(entry as *const _);
179         // Read the current successor of where we want to insert.
180         let mut next = to.load(Relaxed, guard);
181 
182         loop {
183             // Set the Entry of the to-be-inserted element to point to the previous successor of
184             // `to`.
185             entry.next.store(next, Relaxed);
186             match to.compare_and_set_weak(next, entry_ptr, Release, guard) {
187                 Ok(_) => break,
188                 // We lost the race or weak CAS failed spuriously. Update the successor and try
189                 // again.
190                 Err(err) => next = err.current,
191             }
192         }
193     }
194 
195     /// Returns an iterator over all objects.
196     ///
197     /// # Caveat
198     ///
199     /// Every object that is inserted at the moment this function is called and persists at least
200     /// until the end of iteration will be returned. Since this iterator traverses a lock-free
201     /// linked list that may be concurrently modified, some additional caveats apply:
202     ///
203     /// 1. If a new object is inserted during iteration, it may or may not be returned.
204     /// 2. If an object is deleted during iteration, it may or may not be returned.
205     /// 3. The iteration may be aborted when it lost in a race condition. In this case, the winning
206     ///    thread will continue to iterate over the same list.
iter<'g>(&'g self, guard: &'g Guard) -> Iter<'g, T, C>207     pub fn iter<'g>(&'g self, guard: &'g Guard) -> Iter<'g, T, C> {
208         Iter {
209             guard,
210             pred: &self.head,
211             curr: self.head.load(Acquire, guard),
212             head: &self.head,
213             _marker: PhantomData,
214         }
215     }
216 }
217 
218 impl<T, C: IsElement<T>> Drop for List<T, C> {
drop(&mut self)219     fn drop(&mut self) {
220         unsafe {
221             let guard = &unprotected();
222             let mut curr = self.head.load(Relaxed, guard);
223             while let Some(c) = curr.as_ref() {
224                 let succ = c.next.load(Relaxed, guard);
225                 // Verify that all elements have been removed from the list.
226                 assert_eq!(succ.tag(), 1);
227 
228                 C::finalize(curr.deref());
229                 curr = succ;
230             }
231         }
232     }
233 }
234 
impl<'g, T: 'g, C: IsElement<T>> Iterator for Iter<'g, T, C> {
    /// `Ok` holds a reference to the next live element; `Err` reports a lost race.
    type Item = Result<&'g T, IterError>;

    /// Advances the iterator.
    ///
    /// Entries whose `next` pointer carries the deletion tag are not returned. Instead the
    /// iterator tries to unlink them and, on success, schedules `C::finalize` for them through
    /// the guard.
    ///
    /// Returns:
    /// - `Some(Ok(elem))` for the next entry that is not marked as deleted,
    /// - `Some(Err(IterError::Stalled))` when unlinking a deleted entry failed; the iterator has
    ///   then been reset to the head of the list,
    /// - `None` once the current pointer is null, i.e. the end of the list was reached.
    fn next(&mut self) -> Option<Self::Item> {
        while let Some(c) = unsafe { self.curr.as_ref() } {
            let succ = c.next.load(Acquire, self.guard);

            if succ.tag() == 1 {
                // This entry was removed. Try unlinking it from the list.
                // The successor is installed into `pred` without the deletion mark.
                let succ = succ.with_tag(0);

                // The tag should always be zero, because removing a node after a logically deleted
                // node leaves the list in an invalid state.
                debug_assert!(self.curr.tag() == 0);

                match self
                    .pred
                    .compare_and_set(self.curr, succ, Acquire, self.guard)
                {
                    Ok(_) => {
                        // We succeeded in unlinking this element from the list, so we have to
                        // schedule deallocation. Deferred drop is okay, because `list.delete()`
                        // can only be called if `T: 'static`.
                        // NOTE(review): no `T: 'static` bound is visible in this file; confirm
                        // that callers uphold it.
                        unsafe {
                            let p = self.curr;
                            self.guard.defer_unchecked(move || C::finalize(p.deref()));
                        }

                        // Move over the removed by only advancing `curr`, not `pred`.
                        self.curr = succ;
                        continue;
                    }
                    Err(_) => {
                        // A concurrent thread modified the predecessor node. Since it might've
                        // been deleted, we need to restart from `head`.
                        self.pred = self.head;
                        self.curr = self.head.load(Acquire, self.guard);

                        return Some(Err(IterError::Stalled));
                    }
                }
            }

            // Move one step forward.
            // The entry just visited becomes the predecessor of the next one.
            self.pred = &c.next;
            self.curr = succ;

            return Some(Ok(unsafe { C::element_of(c) }));
        }

        // We reached the end of the list.
        None
    }
}
289 
#[cfg(test)]
mod tests {
    use super::*;
    use crossbeam_utils::thread;
    use std::sync::Barrier;
    use {Collector, Owned};

    /// Number of worker threads spawned by the contention tests.
    const THREADS: usize = 8;
    /// Number of entries each worker thread inserts.
    const ITERS: usize = 512;

    /// The simplest possible element: a boxed `Entry` that is its own element.
    impl IsElement<Entry> for Entry {
        fn entry_of(entry: &Entry) -> &Entry {
            entry
        }

        unsafe fn element_of(entry: &Entry) -> &Entry {
            entry
        }

        unsafe fn finalize(entry: &Entry) {
            let raw = entry as *const Entry as *mut Entry;
            drop(Box::from_raw(raw));
        }
    }

    /// Inserted elements must all be retained and must come back newest-first.
    #[test]
    fn insert() {
        let collector = Collector::new();
        let handle = collector.register();
        let guard = handle.pin();

        let list: List<Entry> = List::new();

        let first = Owned::new(Entry::default()).into_shared(&guard);
        let second = Owned::new(Entry::default()).into_shared(&guard);
        let third = Owned::new(Entry::default()).into_shared(&guard);

        unsafe {
            list.insert(first, &guard);
            list.insert(second, &guard);
            list.insert(third, &guard);
        }

        // Insertion happens at the head, so iteration yields the reverse insertion order.
        let mut iter = list.iter(&guard);
        for &expected in [third, second, first].iter() {
            let item = iter.next();
            assert!(item.is_some());
            assert!(item.unwrap().unwrap() as *const Entry == expected.as_raw());
        }
        assert!(iter.next().is_none());

        // Every entry has to be marked as deleted before the list is dropped.
        unsafe {
            for &entry in [first, second, third].iter() {
                entry.as_ref().unwrap().delete(&guard);
            }
        }
    }

    /// Deleted elements must disappear from iteration, and only the deleted ones.
    #[test]
    fn delete() {
        let collector = Collector::new();
        let handle = collector.register();
        let guard = handle.pin();

        let list: List<Entry> = List::new();

        let first = Owned::new(Entry::default()).into_shared(&guard);
        let second = Owned::new(Entry::default()).into_shared(&guard);
        let third = Owned::new(Entry::default()).into_shared(&guard);

        unsafe {
            list.insert(first, &guard);
            list.insert(second, &guard);
            list.insert(third, &guard);

            // Remove the middle element only.
            second.as_ref().unwrap().delete(&guard);
        }

        // The two remaining elements are still returned, newest-first.
        let mut iter = list.iter(&guard);
        for &expected in [third, first].iter() {
            let item = iter.next();
            assert!(item.is_some());
            assert!(item.unwrap().unwrap() as *const Entry == expected.as_raw());
        }
        assert!(iter.next().is_none());

        unsafe {
            first.as_ref().unwrap().delete(&guard);
            third.as_ref().unwrap().delete(&guard);
        }

        // With everything deleted, a fresh iteration finds nothing.
        let mut iter = list.iter(&guard);
        assert!(iter.next().is_none());
    }

    /// Contends the list on insert and delete operations to make sure they can run concurrently.
    #[test]
    fn insert_delete_multi() {
        let collector = Collector::new();

        let list: List<Entry> = List::new();
        let barrier = Barrier::new(THREADS);

        thread::scope(|scope| {
            for _ in 0..THREADS {
                scope.spawn(|_| {
                    // Line all workers up so that they hit the list at the same time.
                    barrier.wait();

                    let handle = collector.register();
                    let guard: Guard = handle.pin();

                    let mut inserted = Vec::with_capacity(ITERS);
                    for _ in 0..ITERS {
                        let entry = Owned::new(Entry::default()).into_shared(&guard);
                        unsafe {
                            list.insert(entry, &guard);
                        }
                        inserted.push(entry);
                    }

                    for entry in inserted {
                        unsafe {
                            entry.as_ref().unwrap().delete(&guard);
                        }
                    }
                });
            }
        }).unwrap();

        let handle = collector.register();
        let guard = handle.pin();

        // Every worker deleted what it inserted, so nothing may be left.
        let mut iter = list.iter(&guard);
        assert!(iter.next().is_none());
    }

    /// Contends the list on iteration to make sure that it can be iterated over concurrently.
    #[test]
    fn iter_multi() {
        let collector = Collector::new();

        let list: List<Entry> = List::new();
        let barrier = Barrier::new(THREADS);

        thread::scope(|scope| {
            for _ in 0..THREADS {
                scope.spawn(|_| {
                    // Line all workers up so that they hit the list at the same time.
                    barrier.wait();

                    let handle = collector.register();
                    let guard: Guard = handle.pin();

                    let mut inserted = Vec::with_capacity(ITERS);
                    for _ in 0..ITERS {
                        let entry = Owned::new(Entry::default()).into_shared(&guard);
                        unsafe {
                            list.insert(entry, &guard);
                        }
                        inserted.push(entry);
                    }

                    // This worker's own entries are still live, so at least `ITERS` items
                    // (successes or stalls) must be produced.
                    let mut iter = list.iter(&guard);
                    for _ in 0..ITERS {
                        assert!(iter.next().is_some());
                    }

                    for entry in inserted {
                        unsafe {
                            entry.as_ref().unwrap().delete(&guard);
                        }
                    }
                });
            }
        }).unwrap();

        let handle = collector.register();
        let guard = handle.pin();

        // Every worker deleted what it inserted, so nothing may be left.
        let mut iter = list.iter(&guard);
        assert!(iter.next().is_none());
    }
}
478