1 //! Lock-free intrusive linked list.
2 //!
3 //! Ideas from Michael.  High Performance Dynamic Lock-Free Hash Tables and List-Based Sets.  SPAA
4 //! 2002.  http://dl.acm.org/citation.cfm?id=564870.564881
5 
6 use core::marker::PhantomData;
7 use core::sync::atomic::Ordering::{Acquire, Relaxed, Release};
8 
9 use {unprotected, Atomic, Guard, Shared};
10 
11 /// An entry in a linked list.
12 ///
13 /// An Entry is accessed from multiple threads, so it would be beneficial to put it in a different
14 /// cache-line than thread-local data in terms of performance.
15 #[derive(Debug)]
16 pub struct Entry {
17     /// The next entry in the linked list.
18     /// If the tag is 1, this entry is marked as deleted.
19     next: Atomic<Entry>,
20 }
21 
22 /// Implementing this trait asserts that the type `T` can be used as an element in the intrusive
23 /// linked list defined in this module. `T` has to contain (or otherwise be linked to) an instance
24 /// of `Entry`.
25 ///
26 /// # Example
27 ///
28 /// ```ignore
29 /// struct A {
30 ///     entry: Entry,
31 ///     data: usize,
32 /// }
33 ///
34 /// impl IsElement<A> for A {
35 ///     fn entry_of(a: &A) -> &Entry {
36 ///         let entry_ptr = ((a as usize) + offset_of!(A, entry)) as *const Entry;
37 ///         unsafe { &*entry_ptr }
38 ///     }
39 ///
40 ///     unsafe fn element_of(entry: &Entry) -> &T {
41 ///         let elem_ptr = ((entry as usize) - offset_of!(A, entry)) as *const T;
42 ///         &*elem_ptr
43 ///     }
44 ///
45 ///     unsafe fn finalize(entry: &Entry, guard: &Guard) {
46 ///         guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
47 ///     }
48 /// }
49 /// ```
50 ///
51 /// This trait is implemented on a type separate from `T` (although it can be just `T`), because
52 /// one type might be placeable into multiple lists, in which case it would require multiple
53 /// implementations of `IsElement`. In such cases, each struct implementing `IsElement<T>`
54 /// represents a distinct `Entry` in `T`.
55 ///
56 /// For example, we can insert the following struct into two lists using `entry1` for one
57 /// and `entry2` for the other:
58 ///
59 /// ```ignore
60 /// struct B {
61 ///     entry1: Entry,
62 ///     entry2: Entry,
63 ///     data: usize,
64 /// }
65 /// ```
66 ///
67 pub trait IsElement<T> {
68     /// Returns a reference to this element's `Entry`.
entry_of(&T) -> &Entry69     fn entry_of(&T) -> &Entry;
70 
71     /// Given a reference to an element's entry, returns that element.
72     ///
73     /// ```ignore
74     /// let elem = ListElement::new();
75     /// assert_eq!(elem.entry_of(),
76     ///            unsafe { ListElement::element_of(elem.entry_of()) } );
77     /// ```
78     ///
79     /// # Safety
80     ///
81     /// The caller has to guarantee that the `Entry` is called with was retrieved from an instance
82     /// of the element type (`T`).
element_of(&Entry) -> &T83     unsafe fn element_of(&Entry) -> &T;
84 
85     /// The function that is called when an entry is unlinked from list.
86     ///
87     /// # Safety
88     ///
89     /// The caller has to guarantee that the `Entry` is called with was retrieved from an instance
90     /// of the element type (`T`).
finalize(&Entry, &Guard)91     unsafe fn finalize(&Entry, &Guard);
92 }
93 
94 /// A lock-free, intrusive linked list of type `T`.
95 #[derive(Debug)]
96 pub struct List<T, C: IsElement<T> = T> {
97     /// The head of the linked list.
98     head: Atomic<Entry>,
99 
100     /// The phantom data for using `T` and `C`.
101     _marker: PhantomData<(T, C)>,
102 }
103 
104 /// An iterator used for retrieving values from the list.
105 pub struct Iter<'g, T: 'g, C: IsElement<T>> {
106     /// The guard that protects the iteration.
107     guard: &'g Guard,
108 
109     /// Pointer from the predecessor to the current entry.
110     pred: &'g Atomic<Entry>,
111 
112     /// The current entry.
113     curr: Shared<'g, Entry>,
114 
115     /// The list head, needed for restarting iteration.
116     head: &'g Atomic<Entry>,
117 
118     /// Logically, we store a borrow of an instance of `T` and
119     /// use the type information from `C`.
120     _marker: PhantomData<(&'g T, C)>,
121 }
122 
123 /// An error that occurs during iteration over the list.
124 #[derive(PartialEq, Debug)]
125 pub enum IterError {
126     /// A concurrent thread modified the state of the list at the same place that this iterator
127     /// was inspecting. Subsequent iteration will restart from the beginning of the list.
128     Stalled,
129 }
130 
131 impl Default for Entry {
132     /// Returns the empty entry.
default() -> Self133     fn default() -> Self {
134         Self {
135             next: Atomic::null(),
136         }
137     }
138 }
139 
140 impl Entry {
141     /// Marks this entry as deleted, deferring the actual deallocation to a later iteration.
142     ///
143     /// # Safety
144     ///
145     /// The entry should be a member of a linked list, and it should not have been deleted.
146     /// It should be safe to call `C::finalize` on the entry after the `guard` is dropped, where `C`
147     /// is the associated helper for the linked list.
delete(&self, guard: &Guard)148     pub unsafe fn delete(&self, guard: &Guard) {
149         self.next.fetch_or(1, Release, guard);
150     }
151 }
152 
153 impl<T, C: IsElement<T>> List<T, C> {
154     /// Returns a new, empty linked list.
new() -> Self155     pub fn new() -> Self {
156         Self {
157             head: Atomic::null(),
158             _marker: PhantomData,
159         }
160     }
161 
162     /// Inserts `entry` into the head of the list.
163     ///
164     /// # Safety
165     ///
166     /// You should guarantee that:
167     ///
168     /// - `container` is not null
169     /// - `container` is immovable, e.g. inside an `Owned`
170     /// - the same `Entry` is not inserted more than once
171     /// - the inserted object will be removed before the list is dropped
insert<'g>(&'g self, container: Shared<'g, T>, guard: &'g Guard)172     pub unsafe fn insert<'g>(&'g self, container: Shared<'g, T>, guard: &'g Guard) {
173         // Insert right after head, i.e. at the beginning of the list.
174         let to = &self.head;
175         // Get the intrusively stored Entry of the new element to insert.
176         let entry: &Entry = C::entry_of(container.deref());
177         // Make a Shared ptr to that Entry.
178         let entry_ptr = Shared::from(entry as *const _);
179         // Read the current successor of where we want to insert.
180         let mut next = to.load(Relaxed, guard);
181 
182         loop {
183             // Set the Entry of the to-be-inserted element to point to the previous successor of
184             // `to`.
185             entry.next.store(next, Relaxed);
186             match to.compare_and_set_weak(next, entry_ptr, Release, guard) {
187                 Ok(_) => break,
188                 // We lost the race or weak CAS failed spuriously. Update the successor and try
189                 // again.
190                 Err(err) => next = err.current,
191             }
192         }
193     }
194 
195     /// Returns an iterator over all objects.
196     ///
197     /// # Caveat
198     ///
199     /// Every object that is inserted at the moment this function is called and persists at least
200     /// until the end of iteration will be returned. Since this iterator traverses a lock-free
201     /// linked list that may be concurrently modified, some additional caveats apply:
202     ///
203     /// 1. If a new object is inserted during iteration, it may or may not be returned.
204     /// 2. If an object is deleted during iteration, it may or may not be returned.
205     /// 3. The iteration may be aborted when it lost in a race condition. In this case, the winning
206     ///    thread will continue to iterate over the same list.
iter<'g>(&'g self, guard: &'g Guard) -> Iter<'g, T, C>207     pub fn iter<'g>(&'g self, guard: &'g Guard) -> Iter<'g, T, C> {
208         Iter {
209             guard,
210             pred: &self.head,
211             curr: self.head.load(Acquire, guard),
212             head: &self.head,
213             _marker: PhantomData,
214         }
215     }
216 }
217 
218 impl<T, C: IsElement<T>> Drop for List<T, C> {
drop(&mut self)219     fn drop(&mut self) {
220         unsafe {
221             let guard = &unprotected();
222             let mut curr = self.head.load(Relaxed, guard);
223             while let Some(c) = curr.as_ref() {
224                 let succ = c.next.load(Relaxed, guard);
225                 // Verify that all elements have been removed from the list.
226                 assert_eq!(succ.tag(), 1);
227 
228                 C::finalize(curr.deref(), guard);
229                 curr = succ;
230             }
231         }
232     }
233 }
234 
235 impl<'g, T: 'g, C: IsElement<T>> Iterator for Iter<'g, T, C> {
236     type Item = Result<&'g T, IterError>;
237 
next(&mut self) -> Option<Self::Item>238     fn next(&mut self) -> Option<Self::Item> {
239         while let Some(c) = unsafe { self.curr.as_ref() } {
240             let succ = c.next.load(Acquire, self.guard);
241 
242             if succ.tag() == 1 {
243                 // This entry was removed. Try unlinking it from the list.
244                 let succ = succ.with_tag(0);
245 
246                 // The tag should always be zero, because removing a node after a logically deleted
247                 // node leaves the list in an invalid state.
248                 debug_assert!(self.curr.tag() == 0);
249 
250                 // Try to unlink `curr` from the list, and get the new value of `self.pred`.
251                 let succ = match self
252                     .pred
253                     .compare_and_set(self.curr, succ, Acquire, self.guard)
254                 {
255                     Ok(_) => {
256                         // We succeeded in unlinking `curr`, so we have to schedule
257                         // deallocation. Deferred drop is okay, because `list.delete()` can only be
258                         // called if `T: 'static`.
259                         unsafe {
260                             C::finalize(self.curr.deref(), self.guard);
261                         }
262 
263                         // `succ` is the new value of `self.pred`.
264                         succ
265                     }
266                     Err(e) => {
267                         // `e.current` is the current value of `self.pred`.
268                         e.current
269                     }
270                 };
271 
272                 // If the predecessor node is already marked as deleted, we need to restart from
273                 // `head`.
274                 if succ.tag() != 0 {
275                     self.pred = self.head;
276                     self.curr = self.head.load(Acquire, self.guard);
277 
278                     return Some(Err(IterError::Stalled));
279                 }
280 
281                 // Move over the removed by only advancing `curr`, not `pred`.
282                 self.curr = succ;
283                 continue;
284             }
285 
286             // Move one step forward.
287             self.pred = &c.next;
288             self.curr = succ;
289 
290             return Some(Ok(unsafe { C::element_of(c) }));
291         }
292 
293         // We reached the end of the list.
294         None
295     }
296 }
297 
298 #[cfg(test)]
299 mod tests {
300     use super::*;
301     use crossbeam_utils::thread;
302     use std::sync::Barrier;
303     use {Collector, Owned};
304 
305     impl IsElement<Entry> for Entry {
entry_of(entry: &Entry) -> &Entry306         fn entry_of(entry: &Entry) -> &Entry {
307             entry
308         }
309 
element_of(entry: &Entry) -> &Entry310         unsafe fn element_of(entry: &Entry) -> &Entry {
311             entry
312         }
313 
finalize(entry: &Entry, guard: &Guard)314         unsafe fn finalize(entry: &Entry, guard: &Guard) {
315             guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
316         }
317     }
318 
319     /// Checks whether the list retains inserted elements
320     /// and returns them in the correct order.
321     #[test]
insert()322     fn insert() {
323         let collector = Collector::new();
324         let handle = collector.register();
325         let guard = handle.pin();
326 
327         let l: List<Entry> = List::new();
328 
329         let e1 = Owned::new(Entry::default()).into_shared(&guard);
330         let e2 = Owned::new(Entry::default()).into_shared(&guard);
331         let e3 = Owned::new(Entry::default()).into_shared(&guard);
332 
333         unsafe {
334             l.insert(e1, &guard);
335             l.insert(e2, &guard);
336             l.insert(e3, &guard);
337         }
338 
339         let mut iter = l.iter(&guard);
340         let maybe_e3 = iter.next();
341         assert!(maybe_e3.is_some());
342         assert!(maybe_e3.unwrap().unwrap() as *const Entry == e3.as_raw());
343         let maybe_e2 = iter.next();
344         assert!(maybe_e2.is_some());
345         assert!(maybe_e2.unwrap().unwrap() as *const Entry == e2.as_raw());
346         let maybe_e1 = iter.next();
347         assert!(maybe_e1.is_some());
348         assert!(maybe_e1.unwrap().unwrap() as *const Entry == e1.as_raw());
349         assert!(iter.next().is_none());
350 
351         unsafe {
352             e1.as_ref().unwrap().delete(&guard);
353             e2.as_ref().unwrap().delete(&guard);
354             e3.as_ref().unwrap().delete(&guard);
355         }
356     }
357 
358     /// Checks whether elements can be removed from the list and whether
359     /// the correct elements are removed.
360     #[test]
delete()361     fn delete() {
362         let collector = Collector::new();
363         let handle = collector.register();
364         let guard = handle.pin();
365 
366         let l: List<Entry> = List::new();
367 
368         let e1 = Owned::new(Entry::default()).into_shared(&guard);
369         let e2 = Owned::new(Entry::default()).into_shared(&guard);
370         let e3 = Owned::new(Entry::default()).into_shared(&guard);
371         unsafe {
372             l.insert(e1, &guard);
373             l.insert(e2, &guard);
374             l.insert(e3, &guard);
375             e2.as_ref().unwrap().delete(&guard);
376         }
377 
378         let mut iter = l.iter(&guard);
379         let maybe_e3 = iter.next();
380         assert!(maybe_e3.is_some());
381         assert!(maybe_e3.unwrap().unwrap() as *const Entry == e3.as_raw());
382         let maybe_e1 = iter.next();
383         assert!(maybe_e1.is_some());
384         assert!(maybe_e1.unwrap().unwrap() as *const Entry == e1.as_raw());
385         assert!(iter.next().is_none());
386 
387         unsafe {
388             e1.as_ref().unwrap().delete(&guard);
389             e3.as_ref().unwrap().delete(&guard);
390         }
391 
392         let mut iter = l.iter(&guard);
393         assert!(iter.next().is_none());
394     }
395 
396     const THREADS: usize = 8;
397     const ITERS: usize = 512;
398 
399     /// Contends the list on insert and delete operations to make sure they can run concurrently.
400     #[test]
insert_delete_multi()401     fn insert_delete_multi() {
402         let collector = Collector::new();
403 
404         let l: List<Entry> = List::new();
405         let b = Barrier::new(THREADS);
406 
407         thread::scope(|s| {
408             for _ in 0..THREADS {
409                 s.spawn(|_| {
410                     b.wait();
411 
412                     let handle = collector.register();
413                     let guard: Guard = handle.pin();
414                     let mut v = Vec::with_capacity(ITERS);
415 
416                     for _ in 0..ITERS {
417                         let e = Owned::new(Entry::default()).into_shared(&guard);
418                         v.push(e);
419                         unsafe {
420                             l.insert(e, &guard);
421                         }
422                     }
423 
424                     for e in v {
425                         unsafe {
426                             e.as_ref().unwrap().delete(&guard);
427                         }
428                     }
429                 });
430             }
431         })
432         .unwrap();
433 
434         let handle = collector.register();
435         let guard = handle.pin();
436 
437         let mut iter = l.iter(&guard);
438         assert!(iter.next().is_none());
439     }
440 
441     /// Contends the list on iteration to make sure that it can be iterated over concurrently.
442     #[test]
iter_multi()443     fn iter_multi() {
444         let collector = Collector::new();
445 
446         let l: List<Entry> = List::new();
447         let b = Barrier::new(THREADS);
448 
449         thread::scope(|s| {
450             for _ in 0..THREADS {
451                 s.spawn(|_| {
452                     b.wait();
453 
454                     let handle = collector.register();
455                     let guard: Guard = handle.pin();
456                     let mut v = Vec::with_capacity(ITERS);
457 
458                     for _ in 0..ITERS {
459                         let e = Owned::new(Entry::default()).into_shared(&guard);
460                         v.push(e);
461                         unsafe {
462                             l.insert(e, &guard);
463                         }
464                     }
465 
466                     let mut iter = l.iter(&guard);
467                     for _ in 0..ITERS {
468                         assert!(iter.next().is_some());
469                     }
470 
471                     for e in v {
472                         unsafe {
473                             e.as_ref().unwrap().delete(&guard);
474                         }
475                     }
476                 });
477             }
478         })
479         .unwrap();
480 
481         let handle = collector.register();
482         let guard = handle.pin();
483 
484         let mut iter = l.iter(&guard);
485         assert!(iter.next().is_none());
486     }
487 }
488