use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
use crate::loom::thread;

use std::mem::MaybeUninit;
use std::ops;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Release};

/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
pub(crate) struct Block<T> {
    /// The start index of this block.
    ///
    /// Slots in this block have indices in `start_index .. start_index + BLOCK_CAP`.
    start_index: usize,

    /// The next block in the linked list.
    next: AtomicPtr<Block<T>>,

    /// Bitfield tracking slots that are ready to have their values consumed.
    ready_slots: AtomicUsize,

    /// The observed `tail_position` value *after* the block has been passed by
    /// `block_tail`.
    observed_tail_position: UnsafeCell<usize>,

    /// Array containing values pushed into the block. Values are stored in a
    /// contiguous array in order to improve cache line behavior when reading.
    /// The values must be manually dropped.
    values: Values<T>,
}

pub(crate) enum Read<T> {
    Value(T),
    Closed,
}

struct Values<T>([UnsafeCell<MaybeUninit<T>>; BLOCK_CAP]);

use super::BLOCK_CAP;

/// Masks an index to get the block identifier.
const BLOCK_MASK: usize = !(BLOCK_CAP - 1);

/// Masks an index to get the value offset in a block.
const SLOT_MASK: usize = BLOCK_CAP - 1;

/// Flag tracking that a block has gone through the sender's release routine.
///
/// When this is set, the receiver may consider freeing the block.
const RELEASED: usize = 1 << BLOCK_CAP;

/// Flag tracking that all senders have dropped.
///
/// When this flag is set, the send half of the channel has closed.
const TX_CLOSED: usize = RELEASED << 1;

/// Mask covering all bits used to track slot readiness.
const READY_MASK: usize = RELEASED - 1;
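
// Compile-time sanity checks: a sketch assuming `BLOCK_CAP` is a power of two
// smaller than the number of bits in `usize`. The per-slot ready bits,
// `RELEASED`, and `TX_CLOSED` occupy disjoint bit regions; e.g. for a
// hypothetical `BLOCK_CAP` of 4: `READY_MASK == 0b00_1111`,
// `RELEASED == 0b01_0000`, `TX_CLOSED == 0b10_0000`.
const _: () = assert!(READY_MASK & RELEASED == 0);
const _: () = assert!(READY_MASK & TX_CLOSED == 0);
const _: () = assert!(RELEASED & TX_CLOSED == 0);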

/// Returns the index of the first slot in the block referenced by `slot_index`.
#[inline(always)]
pub(crate) fn start_index(slot_index: usize) -> usize {
    BLOCK_MASK & slot_index
}

/// Returns the offset into the block referenced by `slot_index`.
#[inline(always)]
pub(crate) fn offset(slot_index: usize) -> usize {
    SLOT_MASK & slot_index
}
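
// Worked example (with a hypothetical `BLOCK_CAP` of 4): `slot_index = 6`
// decomposes as `start_index(6) == 4` and `offset(6) == 2`, i.e. the third
// slot of the second block. For any power-of-two `BLOCK_CAP`,
// `start_index(i) + offset(i) == i`.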

impl<T> Block<T> {
    pub(crate) fn new(start_index: usize) -> Block<T> {
        Block {
            // The absolute index in the channel of the first slot in the block.
            start_index,

            // Pointer to the next block in the linked list.
            next: AtomicPtr::new(ptr::null_mut()),

            ready_slots: AtomicUsize::new(0),

            observed_tail_position: UnsafeCell::new(0),

            // Value storage
            values: unsafe { Values::uninitialized() },
        }
    }

    /// Returns `true` if the block matches the given index.
    pub(crate) fn is_at_index(&self, index: usize) -> bool {
        debug_assert!(offset(index) == 0);
        self.start_index == index
    }

    /// Returns the number of blocks between `self` and the block at the
    /// specified index.
    ///
    /// `other_index` must represent a block *after* `self`.
    pub(crate) fn distance(&self, other_index: usize) -> usize {
        debug_assert!(offset(other_index) == 0);
        other_index.wrapping_sub(self.start_index) / BLOCK_CAP
    }
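
    // Illustration: if `self.start_index == 0` and `BLOCK_CAP` is
    // (hypothetically) 4, then `distance(8) == 2`: the block at index 8 is
    // two links further down the list.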

    /// Reads the value at the given offset.
    ///
    /// Returns `None` if the slot is empty.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * No concurrent access to the slot.
    pub(crate) unsafe fn read(&self, slot_index: usize) -> Option<Read<T>> {
        let offset = offset(slot_index);

        let ready_bits = self.ready_slots.load(Acquire);

        if !is_ready(ready_bits, offset) {
            if is_tx_closed(ready_bits) {
                return Some(Read::Closed);
            }

            return None;
        }

        // Get the value
        let value = self.values[offset].with(|ptr| ptr::read(ptr));

        Some(Read::Value(value.assume_init()))
    }

    /// Writes a value to the block at the given offset.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The slot is empty.
    /// * No concurrent access to the slot.
    pub(crate) unsafe fn write(&self, slot_index: usize, value: T) {
        // Get the offset into the block
        let slot_offset = offset(slot_index);

        self.values[slot_offset].with_mut(|ptr| {
            ptr::write(ptr, MaybeUninit::new(value));
        });

        // Release the value. After this point, the slot ref may no longer
        // be used. It is possible for the receiver to free the memory at
        // any point.
        self.set_ready(slot_offset);
    }

    /// Signals to the receiver that the sender half of the list is closed.
    pub(crate) unsafe fn tx_close(&self) {
        self.ready_slots.fetch_or(TX_CLOSED, Release);
    }

    /// Resets the block to a blank state. This enables reusing blocks in the
    /// channel.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * All slots are empty.
    /// * The caller holds a unique pointer to the block.
    pub(crate) unsafe fn reclaim(&mut self) {
        self.start_index = 0;
        self.next = AtomicPtr::new(ptr::null_mut());
        self.ready_slots = AtomicUsize::new(0);
    }

    /// Releases the block to the rx half for freeing.
    ///
    /// This function is called by the tx half once it can be guaranteed that
    /// no more senders will attempt to access the block.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * The block will no longer be accessed by any sender.
    pub(crate) unsafe fn tx_release(&self, tail_position: usize) {
        // Track the observed tail_position. Any sender targeting a greater
        // tail_position is guaranteed to not access this block.
        self.observed_tail_position
            .with_mut(|ptr| *ptr = tail_position);

        // Set the released bit, signalling to the receiver that it is safe to
        // free the block's memory as soon as all slots **prior** to
        // `observed_tail_position` have been filled.
        self.ready_slots.fetch_or(RELEASED, Release);
    }

    /// Marks a slot as ready.
    fn set_ready(&self, slot: usize) {
        let mask = 1 << slot;
        self.ready_slots.fetch_or(mask, Release);
    }

    /// Returns `true` when all slots have their `ready` bits set.
    ///
    /// This indicates that the block is in its final state and will no longer
    /// be mutated.
    ///
    /// # Implementation
    ///
    /// All `ready` flags are coalesced as bits in the single atomic
    /// `ready_slots` cell, so checking for finality is a single load and mask
    /// comparison. The trade-off is that every `set_ready` call mutates that
    /// one cell, which may have a negative impact on cache behavior compared
    /// to keeping a flag per slot.
    pub(crate) fn is_final(&self) -> bool {
        self.ready_slots.load(Acquire) & READY_MASK == READY_MASK
    }
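
    // E.g. with a hypothetical `BLOCK_CAP` of 4, a block whose `ready_slots`
    // reads `RELEASED | 0b1111` is final: the `RELEASED` and `TX_CLOSED` bits
    // are masked off by `READY_MASK` before the comparison.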

    /// Returns the `observed_tail_position` value, if set.
    pub(crate) fn observed_tail_position(&self) -> Option<usize> {
        if 0 == RELEASED & self.ready_slots.load(Acquire) {
            None
        } else {
            Some(self.observed_tail_position.with(|ptr| unsafe { *ptr }))
        }
    }

    /// Loads the next block.
    pub(crate) fn load_next(&self, ordering: Ordering) -> Option<NonNull<Block<T>>> {
        let ret = NonNull::new(self.next.load(ordering));

        debug_assert!(unsafe {
            ret.map(|block| block.as_ref().start_index == self.start_index.wrapping_add(BLOCK_CAP))
                .unwrap_or(true)
        });

        ret
    }

    /// Pushes `block` as the next block in the link.
    ///
    /// Returns `Ok` if successful; otherwise, a pointer to the next block in
    /// the list is returned.
    ///
    /// This requires that the next pointer is null.
    ///
    /// # Ordering
    ///
    /// This performs a compare-and-swap on `next` using the given `success`
    /// and `failure` orderings.
    ///
    /// # Safety
    ///
    /// To maintain safety, the caller must ensure:
    ///
    /// * `block` is not freed until it has been removed from the list.
    pub(crate) unsafe fn try_push(
        &self,
        block: &mut NonNull<Block<T>>,
        success: Ordering,
        failure: Ordering,
    ) -> Result<(), NonNull<Block<T>>> {
        block.as_mut().start_index = self.start_index.wrapping_add(BLOCK_CAP);

        let next_ptr = self
            .next
            .compare_exchange(ptr::null_mut(), block.as_ptr(), success, failure)
            .unwrap_or_else(|x| x);

        match NonNull::new(next_ptr) {
            Some(next_ptr) => Err(next_ptr),
            None => Ok(()),
        }
    }
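
    // For instance, `grow` below calls `try_push` with `success = AcqRel` and
    // `failure = Acquire`: a successful push publishes the new block (and its
    // updated `start_index`) to other threads walking the list, while a failed
    // push acquires the block that another thread installed first.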

    /// Grows the `Block` linked list by allocating and appending a new block.
    ///
    /// The next block in the linked list is returned. This may or may not be
    /// the one allocated by the function call.
    ///
    /// # Implementation
    ///
    /// It is assumed that `self.next` is null. A new block is allocated with
    /// `start_index` set to be the next block. A compare-and-swap is performed
    /// with `AcqRel` memory ordering. If the compare-and-swap is successful,
    /// the newly allocated block is released to other threads walking the
    /// block linked list. If the compare-and-swap fails, the current thread
    /// acquires the next block in the linked list, allowing the current thread
    /// to access the slots.
    pub(crate) fn grow(&self) -> NonNull<Block<T>> {
        // Create the new block. It is assumed that the block will become the
        // next one after `&self`. If this turns out to not be the case,
        // `start_index` is updated accordingly.
        let new_block = Box::new(Block::new(self.start_index + BLOCK_CAP));

        let mut new_block = unsafe { NonNull::new_unchecked(Box::into_raw(new_block)) };

        // Attempt to store the block. The first compare-and-swap attempt is
        // "unrolled" due to minor differences in logic.
        //
        // `AcqRel` is used as the ordering **only** when attempting the
        // compare-and-swap on `self.next`.
        //
        // If the compare-and-swap fails, then the actual value of the cell is
        // returned from this function and accessed by the caller. Given this,
        // the memory must be acquired.
        //
        // `Release` ensures that the newly allocated block is available to
        // other threads acquiring the next pointer.
        let next = NonNull::new(
            self.next
                .compare_exchange(ptr::null_mut(), new_block.as_ptr(), AcqRel, Acquire)
                .unwrap_or_else(|x| x),
        );

        let next = match next {
            Some(next) => next,
            None => {
                // The compare-and-swap succeeded and the newly allocated block
                // is successfully pushed.
                return new_block;
            }
        };

        // There already is a next block in the linked list. The newly
        // allocated block could be dropped and the discovered next block
        // returned; however, that would be wasteful. Instead, the linked list
        // is walked by repeatedly attempting to compare-and-swap the pointer
        // into the `next` register until the compare-and-swap succeeds.
        //
        // Care is taken to update new_block's start_index field as
        // appropriate.
        let mut curr = next;

        // TODO: Should this iteration be capped?
        loop {
            let actual = unsafe { curr.as_ref().try_push(&mut new_block, AcqRel, Acquire) };

            curr = match actual {
                Ok(_) => {
                    return next;
                }
                Err(curr) => curr,
            };

            // When running outside of loom, this calls `spin_loop_hint`.
            thread::yield_now();
        }
    }
}

/// Returns `true` if the specified slot has a value ready to be consumed.
fn is_ready(bits: usize, slot: usize) -> bool {
    let mask = 1 << slot;
    mask == mask & bits
}
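
// E.g. `is_ready(0b0100, 2)` is `true`, while `is_ready(0b0100, 0)` is
// `false`: only bit 2 of the bitfield is set.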

/// Returns `true` if the closed flag has been set.
fn is_tx_closed(bits: usize) -> bool {
    TX_CLOSED == bits & TX_CLOSED
}

impl<T> Values<T> {
    unsafe fn uninitialized() -> Values<T> {
        let mut vals = MaybeUninit::uninit();

        // When fuzzing, `UnsafeCell` needs to be initialized.
        if_loom! {
            let p = vals.as_mut_ptr() as *mut UnsafeCell<MaybeUninit<T>>;
            for i in 0..BLOCK_CAP {
                p.add(i)
                    .write(UnsafeCell::new(MaybeUninit::uninit()));
            }
        }

        Values(vals.assume_init())
    }
}

impl<T> ops::Index<usize> for Values<T> {
    type Output = UnsafeCell<MaybeUninit<T>>;

    fn index(&self, index: usize) -> &Self::Output {
        self.0.index(index)
    }
}
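
#[cfg(test)]
mod tests {
    // A minimal sanity-check sketch of the index math and flag helpers above.
    // It assumes `BLOCK_CAP` is a power of two (as the masks require) and, for
    // `grow_links_next_block`, a non-loom build where the loom aliases resolve
    // to the std types.
    use super::*;

    #[test]
    fn index_decomposition() {
        // Any slot index splits into a block start plus an in-block offset.
        let idx = BLOCK_CAP + (BLOCK_CAP - 1);
        assert_eq!(start_index(idx), BLOCK_CAP);
        assert_eq!(offset(idx), BLOCK_CAP - 1);
        assert_eq!(start_index(idx) + offset(idx), idx);
    }

    #[test]
    fn readiness_flags() {
        // An empty bitfield has no ready slots.
        assert!(!is_ready(0, 0));
        // Setting bit `n` marks slot `n` as ready.
        assert!(is_ready(1 << 1, 1));
        // `TX_CLOSED` is independent of the per-slot ready bits.
        assert!(is_tx_closed(TX_CLOSED));
        assert!(!is_tx_closed(READY_MASK | RELEASED));
    }

    #[test]
    #[cfg(not(loom))]
    fn grow_links_next_block() {
        let first: Block<usize> = Block::new(0);
        let next = first.grow();

        unsafe {
            // The appended block starts `BLOCK_CAP` slots after `first`.
            assert!(next.as_ref().is_at_index(BLOCK_CAP));
            assert_eq!(first.distance(BLOCK_CAP), 1);

            // Reclaim the allocation that `grow` leaked via `Box::into_raw`.
            drop(Box::from_raw(next.as_ptr()));
        }
    }
}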