use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
use crate::loom::thread;

use std::mem::MaybeUninit;
use std::ops;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
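///
/// Blocks are chained through their `next` pointers, and each successive
/// block covers the next `BLOCK_CAP` slot indices, e.g.:
///
/// ```text
/// [ start_index = 0 ] --next--> [ start_index = BLOCK_CAP ] --next--> ...
/// ```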
pub(crate) struct Block<T> {
    /// The start index of this block.
    ///
    /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
    start_index: usize,

    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Bitfield tracking slots that are ready to have their values consumed.
    ready_slots: AtomicUsize,

    /// The observed `tail_position` value *after* the block has been passed by
    /// `block_tail`.
    observed_tail_position: UnsafeCell<usize>,

    /// Array containing values pushed into the block. Values are stored in a
    /// contiguous array in order to improve cache line behavior when reading.
    /// The values must be manually dropped.
    values: Values<T>,
}

pub(crate) enum Read<T> {
    Value(T),
    Closed,
}

struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);

use super::BLOCK_CAP;

/// Masks an index to get the block identifier.
const BLOCK_MASK: usize = !(BLOCK_CAP - 1);

/// Masks an index to get the value offset in a block.
const SLOT_MASK: usize = BLOCK_CAP - 1;

/// Flag tracking that a block has gone through the sender's release routine.
///
/// When this is set, the receiver may consider freeing the block.
const RELEASED: usize = 1 << BLOCK_CAP;

/// Flag tracking all senders dropped.
///
/// When this flag is set, the send half of the channel has closed.
const TX_CLOSED: usize = RELEASED << 1;

/// Mask covering all bits used to track slot readiness.
const READY_MASK: usize = RELEASED - 1;
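// For intuition, `ready_slots` packs all of this state into a single word:
//
//     bits 0 .. BLOCK_CAP    one "ready" bit per slot (covered by `READY_MASK`)
//     bit  BLOCK_CAP         `RELEASED`
//     bit  BLOCK_CAP + 1     `TX_CLOSED`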

/// Returns the index of the first slot in the block referenced by `slot_index`.
#[inline(always)]
pub(crate) fn start_index(slot_index: usize) -> usize {
    BLOCK_MASK & slot_index
}

/// Returns the offset into the block referenced by `slot_index`.
#[inline(always)]
pub(crate) fn offset(slot_index: usize) -> usize {
    SLOT_MASK & slot_index
}
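// As a worked example, if `BLOCK_CAP` were 32 (the real value is defined in
// the parent module), slot index 70 would decompose as:
//
//     start_index(70) == 64  // 70 & !(32 - 1)
//     offset(70)      == 6   // 70 & (32 - 1)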

impl<T> Block<T> {
    pub(crate) fn new(start_index: usize) -> Block<T> {
        Block {
            // The absolute index in the channel of the first slot in the block.
            start_index,

            // Pointer to the next block in the linked list.
            next: AtomicPtr::new(ptr::null_mut()),

            ready_slots: AtomicUsize::new(0),

            observed_tail_position: UnsafeCell::new(0),

            // Value storage
            values: unsafe { Values::uninitialized() },
        }
    }

    /// Returns `true` if the block matches the given index
    pub(crate) fn is_at_index(&self, index: usize) -> bool {
        debug_assert!(offset(index) == 0);
        self.start_index == index
    }

    /// Returns the number of blocks between `self` and the block at the
    /// specified index.
    ///
    /// `other_index` must be the `start_index` of a block *after* `self`.
    pub(crate) fn distance(&self, other_index: usize) -> usize {
        debug_assert!(offset(other_index) == 0);
        other_index.wrapping_sub(self.start_index) / BLOCK_CAP
    }

    /// Reads the value at the given offset.
    ///
    /// Returns `None` if the slot is empty.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * No concurrent access to the slot.
    pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
        let offset = offset(slot_index);

        let ready_bits = self.ready_slots.load(Acquire);

        if !is_ready(ready_bits, offset) {
            if is_tx_closed(ready_bits) {
                return Some(Read::Closed);
            }

            return None;
        }

        // Get the value
        let value = self.values[offset].with(|ptr| ptr::read(ptr));

        Some(Read::Value(value.assume_init()))
    }
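    // A minimal sketch of how a caller might drive `read`; `block` and
    // `rx_index` are hypothetical names used only for illustration:
    //
    //     match unsafe { block.read(rx_index) } {
    //         Some(Read::Value(value)) => { /* consume `value` */ }
    //         Some(Read::Closed) => { /* all senders have dropped */ }
    //         None => { /* slot not yet written; check again later */ }
    //     }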

    /// Writes a value to the block at the given offset.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The slot is empty.
    /// * No concurrent access to the slot.
    pub(crate) unsafe fn write(&self, slot_index: usize, value: T) {
        // Get the offset into the block
        let slot_offset = offset(slot_index);

        self.values[slot_offset].with_mut(|ptr| {
            ptr::write(ptr, MaybeUninit::new(value));
        });

        // Release the value. After this point, the slot ref may no longer
        // be used. It is possible for the receiver to free the memory at
        // any point.
        self.set_ready(slot_offset);
    }

    /// Signal to the receiver that the sender half of the list is closed.
    pub(crate) unsafe fn tx_close(&self) {
        self.ready_slots.fetch_or(TX_CLOSED, Release);
    }

    /// Resets the block to a blank state. This enables reusing blocks in the
    /// channel.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * All slots are empty.
    /// * The caller holds a unique pointer to the block.
    pub(crate) unsafe fn reclaim(&mut self) {
        self.start_index = 0;
        self.next = AtomicPtr::new(ptr::null_mut());
        self.ready_slots = AtomicUsize::new(0);
    }

    /// Releases the block to the rx half for freeing.
    ///
    /// This function is called by the tx half once it can be guaranteed that no
    /// more senders will attempt to access the block.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The block will no longer be accessed by any sender.
    pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
        // Track the observed tail_position. Any sender targeting a greater
        // tail_position is guaranteed to not access this block.
        self.observed_tail_position
            .with_mut(|ptr| *ptr = tail_position);

        // Set the released bit, signalling to the receiver that it is safe to
        // free the block's memory as soon as all slots **prior** to
        // `observed_tail_position` have been filled.
        self.ready_slots.fetch_or(RELEASED, Release);
    }

    /// Mark a slot as ready
    fn set_ready(&self, slot: usize) {
        let mask = 1 << slot;
        self.ready_slots.fetch_or(mask, Release);
    }

    /// Returns `true` when all slots have their `ready` bits set.
    ///
    /// This indicates that the block is in its final state and will no longer
    /// be mutated.
    ///
    /// # Implementation
    ///
    /// The `ready` flags are coalesced as bits in the single `ready_slots`
    /// atomic cell, so checking finality is a single `Acquire` load masked
    /// against `READY_MASK`.
    pub(crate) fn is_final(&self) -> bool {
        self.ready_slots.load(Acquire) & READY_MASK == READY_MASK
    }

    /// Returns the `observed_tail_position` value, if set
    pub(crate) fn observed_tail_position(&self) -> Option<usize> {
        if 0 == RELEASED & self.ready_slots.load(Acquire) {
            None
        } else {
            Some(self.observed_tail_position.with(|ptr| unsafe { *ptr }))
        }
    }

    /// Loads the next block
    pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
        let ret = NonNull::new(self.next.load(ordering));

        debug_assert!(unsafe {
            ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP))
                .unwrap_or(true)
        });

        ret
    }

    /// Pushes `block` as the next block in the link.
    ///
    /// Returns `Ok(())` if successful. Otherwise, the pointer currently
    /// stored in `next` is returned as the error.
    ///
    /// This requires that the next pointer is null.
    ///
    /// # Ordering
    ///
    /// This performs a compare-and-swap on `next`, using the `success` and
    /// `failure` orderings provided by the caller.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * `block` is not freed until it has been removed from the list.
    pub(crate) unsafe fn try_push(
        &self,
        block: &mut NonNull<Block<T>>,
        success: Ordering,
        failure: Ordering,
    ) -> Result<(), NonNull<Block<T>>> {
        block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);

        let next_ptr = self
            .next
            .compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
            .unwrap_or_else(|x| x);

        match NonNull::new(next_ptr) {
            Some(next_ptr) => Err(next_ptr),
            None => Ok(()),
        }
    }

    /// Grows the `Block` linked list by allocating and appending a new block.
    ///
    /// The next block in the linked list is returned. This may or may not be
    /// the one allocated by the function call.
    ///
    /// # Implementation
    ///
    /// It is assumed that `self.next` is null. A new block is allocated with
    /// `start_index` set to be the next block. A compare-and-swap is performed
    /// with AcqRel memory ordering. If the compare-and-swap is successful, the
    /// newly allocated block is released to other threads walking the block
    /// linked list. If the compare-and-swap fails, the current thread acquires
    /// the next block in the linked list, allowing the current thread to access
    /// the slots.
    pub(crate) fn grow(&self) -> NonNull<Block<T>> {
        // Create the new block. It is assumed that the block will become the
        // next one after `&self`. If this turns out to not be the case,
        // `start_index` is updated accordingly.
        let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP));

        let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };

        // Attempt to store the block. The first compare-and-swap attempt is
        // "unrolled" due to minor differences in logic.
        //
        // `AcqRel` is used as the ordering **only** when attempting the
        // compare-and-swap on self.next.
        //
        // If the compare-and-swap fails, then the actual value of the cell is
        // returned from this function and accessed by the caller. Given this,
        // the memory must be acquired.
        //
        // `Release` ensures that the newly allocated block is available to
        // other threads acquiring the next pointer.
        let next = NonNull::new(
            self.next
                .compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
                .unwrap_or_else(|x| x),
        );

        let next = match next {
            Some(next) => next,
            None => {
                // The compare-and-swap succeeded and the newly allocated block
                // is successfully pushed.
                return new_block;
            }
        };

        // There already is a next block in the linked list. The newly allocated
        // block could be dropped and the discovered next block returned;
        // however, that would be wasteful. Instead, the linked list is walked
        // by repeatedly attempting to compare-and-swap the pointer into the
        // `next` register until the compare-and-swap succeeds.
        //
        // Care is taken to update new_block's start_index field as appropriate.

        let mut curr = next;

        // TODO: Should this iteration be capped?
        loop {
            let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel, Acquire) };

            curr = match actual {
                Ok(_) => {
                    return next;
                }
                Err(curr) => curr,
            };

            // When running outside of loom, this calls `spin_loop_hint`.
            thread::yield_now();
        }
    }
}

/// Returns `true` if the specified slot has a value ready to be consumed.
fn is_ready(bits: usize, slot: usize) -> bool {
    let mask = 1 << slot;
    mask == mask & bits
}

/// Returns `true` if the closed flag has been set.
fn is_tx_closed(bits: usize) -> bool {
    TX_CLOSED == bits & TX_CLOSED
}

impl<T> Values<T> {
    unsafe fn uninitialized() -> Values<T> {
        let mut vals = MaybeUninit::uninit();

        // When running under loom, the `UnsafeCell`s need to be initialized.
        if_loom! {
            let p = vals.as_mut_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
            for i in 0..BLOCK_CAP {
                p.add(i)
                    .write(UnsafeCell::new(MaybeUninit::uninit()));
            }
        }

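        // Safety: each element is an `UnsafeCell<MaybeUninit<T>>`, which does
        // not require its contents to be initialized, so materializing the
        // uninitialized array here is sound. The inner `T` values are only
        // created later by `Block::write`.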
        Values(vals.assume_init())
    }
}

impl<T> ops::Index<usize> for Values<T> {
    type Output = UnsafeCell<MaybeUninit<T>>;

    fn index(&self, index: usize) -> &Self::Output {
        self.0.index(index)
    }
}
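// A minimal sanity-check sketch for the index math above. It only assumes
// that `BLOCK_CAP` is a power of two, which the mask definitions already
// require.
#[cfg(all(test, not(loom)))]
mod index_math_sketch {
    use super::*;

    #[test]
    fn start_index_and_offset_partition_an_index() {
        for &i in &[0, 1, BLOCK_CAP - 1, BLOCK_CAP, 3 * BLOCK_CAP + 7] {
            // The block mask and the slot mask are complementary, so the two
            // components always recombine into the original index.
            assert_eq!(start_index(i) + offset(i), i);
            assert_eq!(offset(start_index(i)), 0);
        }
    }

    #[test]
    fn distance_counts_whole_blocks() {
        let block = Block::<usize>::new(0);
        assert!(block.is_at_index(0));
        assert_eq!(block.distance(2 * BLOCK_CAP), 2);
    }
}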