1 use alloc::sync::Arc; 2 use core::{ 3 mem::{self, MaybeUninit}, 4 ptr::copy_nonoverlapping, 5 sync::atomic::Ordering, 6 }; 7 #[cfg(feature = "std")] 8 use std::io::{self, Read, Write}; 9 10 use crate::{consumer::Consumer, ring_buffer::*}; 11 12 /// Producer part of ring buffer. 13 pub struct Producer<T> { 14 pub(crate) rb: Arc<RingBuffer<T>>, 15 } 16 17 impl<T: Sized> Producer<T> { 18 /// Returns capacity of the ring buffer. 19 /// 20 /// The capacity of the buffer is constant. capacity(&self) -> usize21 pub fn capacity(&self) -> usize { 22 self.rb.capacity() 23 } 24 25 /// Checks if the ring buffer is empty. 26 /// 27 /// The result is relevant until you push items to the producer. is_empty(&self) -> bool28 pub fn is_empty(&self) -> bool { 29 self.rb.is_empty() 30 } 31 32 /// Checks if the ring buffer is full. 33 /// 34 /// *The result may become irrelevant at any time because of concurring activity of the consumer.* is_full(&self) -> bool35 pub fn is_full(&self) -> bool { 36 self.rb.is_full() 37 } 38 39 /// The length of the data stored in the buffer. 40 /// 41 /// Actual length may be equal to or less than the returned value. len(&self) -> usize42 pub fn len(&self) -> usize { 43 self.rb.len() 44 } 45 46 /// The remaining space in the buffer. 47 /// 48 /// Actual remaining space may be equal to or greater than the returning value. remaining(&self) -> usize49 pub fn remaining(&self) -> usize { 50 self.rb.remaining() 51 } 52 53 /// Allows to write into ring buffer memory directry. 54 /// 55 /// *This function is unsafe because it gives access to possibly uninitialized memory* 56 /// 57 /// The method takes a function `f` as argument. 58 /// `f` takes two slices of ring buffer content (the second one or both of them may be empty). 59 /// First slice contains older elements. 60 /// 61 /// `f` should return number of elements been written. 62 /// *There is no checks for returned number - it remains on the developer's conscience.* 63 /// 64 /// The method **always** calls `f` even if ring buffer is full. 65 /// 66 /// The method returns number returned from `f`. 67 /// 68 /// # Safety 69 /// 70 /// The method gives access to ring buffer underlying memory which may be uninitialized. 71 /// push_access<F>(&mut self, f: F) -> usize where F: FnOnce(&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) -> usize,72 pub unsafe fn push_access<F>(&mut self, f: F) -> usize 73 where 74 F: FnOnce(&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) -> usize, 75 { 76 let head = self.rb.head.load(Ordering::Acquire); 77 let tail = self.rb.tail.load(Ordering::Acquire); 78 let len = self.rb.data.get_ref().len(); 79 80 let ranges = if tail >= head { 81 if head > 0 { 82 (tail..len, 0..(head - 1)) 83 } else if tail < len - 1 { 84 (tail..(len - 1), 0..0) 85 } else { 86 (0..0, 0..0) 87 } 88 } else if tail < head - 1 { 89 (tail..(head - 1), 0..0) 90 } else { 91 (0..0, 0..0) 92 }; 93 94 let slices = ( 95 &mut self.rb.data.get_mut()[ranges.0], 96 &mut self.rb.data.get_mut()[ranges.1], 97 ); 98 99 let n = f(slices.0, slices.1); 100 101 if n > 0 { 102 let new_tail = (tail + n) % len; 103 self.rb.tail.store(new_tail, Ordering::Release); 104 } 105 n 106 } 107 108 /// Copies data from the slice to the ring buffer in byte-to-byte manner. 109 /// 110 /// The `elems` slice should contain **initialized** data before the method call. 111 /// After the call the copied part of data in `elems` should be interpreted as **un-initialized**. 112 /// 113 /// Returns the number of items been copied. 114 /// 115 /// # Safety 116 /// 117 /// The method copies raw data into the ring buffer. 118 /// 119 /// *You should properly fill the slice and manage remaining elements after copy.* 120 /// push_copy(&mut self, elems: &[MaybeUninit<T>]) -> usize121 pub unsafe fn push_copy(&mut self, elems: &[MaybeUninit<T>]) -> usize { 122 self.push_access(|left, right| -> usize { 123 if elems.len() < left.len() { 124 copy_nonoverlapping(elems.as_ptr(), left.as_mut_ptr(), elems.len()); 125 elems.len() 126 } else { 127 copy_nonoverlapping(elems.as_ptr(), left.as_mut_ptr(), left.len()); 128 if elems.len() < left.len() + right.len() { 129 copy_nonoverlapping( 130 elems.as_ptr().add(left.len()), 131 right.as_mut_ptr(), 132 elems.len() - left.len(), 133 ); 134 elems.len() 135 } else { 136 copy_nonoverlapping( 137 elems.as_ptr().add(left.len()), 138 right.as_mut_ptr(), 139 right.len(), 140 ); 141 left.len() + right.len() 142 } 143 } 144 }) 145 } 146 147 /// Appends an element to the ring buffer. 148 /// On failure returns an error containing the element that hasn't beed appended. push(&mut self, elem: T) -> Result<(), T>149 pub fn push(&mut self, elem: T) -> Result<(), T> { 150 let mut elem_mu = MaybeUninit::new(elem); 151 let n = unsafe { 152 self.push_access(|slice, _| { 153 if !slice.is_empty() { 154 mem::swap(slice.get_unchecked_mut(0), &mut elem_mu); 155 1 156 } else { 157 0 158 } 159 }) 160 }; 161 match n { 162 0 => Err(unsafe { elem_mu.assume_init() }), 163 1 => Ok(()), 164 _ => unreachable!(), 165 } 166 } 167 168 /// Repeatedly calls the closure `f` and pushes elements returned from it to the ring buffer. 169 /// 170 /// The closure is called until it returns `None` or the ring buffer is full. 171 /// 172 /// The method returns number of elements been put into the buffer. push_each<F: FnMut() -> Option<T>>(&mut self, mut f: F) -> usize173 pub fn push_each<F: FnMut() -> Option<T>>(&mut self, mut f: F) -> usize { 174 unsafe { 175 self.push_access(|left, right| { 176 for (i, dst) in left.iter_mut().enumerate() { 177 match f() { 178 Some(e) => mem::replace(dst, MaybeUninit::new(e)), 179 None => return i, 180 }; 181 } 182 for (i, dst) in right.iter_mut().enumerate() { 183 match f() { 184 Some(e) => mem::replace(dst, MaybeUninit::new(e)), 185 None => return i + left.len(), 186 }; 187 } 188 left.len() + right.len() 189 }) 190 } 191 } 192 193 /// Appends elements from an iterator to the ring buffer. 194 /// Elements that haven't been added to the ring buffer remain in the iterator. 195 /// 196 /// Returns count of elements been appended to the ring buffer. push_iter<I: Iterator<Item = T>>(&mut self, elems: &mut I) -> usize197 pub fn push_iter<I: Iterator<Item = T>>(&mut self, elems: &mut I) -> usize { 198 self.push_each(|| elems.next()) 199 } 200 201 /// Removes at most `count` elements from the consumer and appends them to the producer. 202 /// If `count` is `None` then as much as possible elements will be moved. 203 /// The producer and consumer parts may be of different buffers as well as of the same one. 204 /// 205 /// On success returns number of elements been moved. move_from(&mut self, other: &mut Consumer<T>, count: Option<usize>) -> usize206 pub fn move_from(&mut self, other: &mut Consumer<T>, count: Option<usize>) -> usize { 207 move_items(other, self, count) 208 } 209 } 210 211 impl<T: Sized + Copy> Producer<T> { 212 /// Appends elements from slice to the ring buffer. 213 /// Elements should be [`Copy`](https://doc.rust-lang.org/std/marker/trait.Copy.html). 214 /// 215 /// Returns count of elements been appended to the ring buffer. push_slice(&mut self, elems: &[T]) -> usize216 pub fn push_slice(&mut self, elems: &[T]) -> usize { 217 unsafe { self.push_copy(&*(elems as *const [T] as *const [MaybeUninit<T>])) } 218 } 219 } 220 221 #[cfg(feature = "std")] 222 impl Producer<u8> { 223 /// Reads at most `count` bytes 224 /// from [`Read`](https://doc.rust-lang.org/std/io/trait.Read.html) instance 225 /// and appends them to the ring buffer. 226 /// If `count` is `None` then as much as possible bytes will be read. 227 /// 228 /// Returns `Ok(n)` if `read` is succeded. `n` is number of bytes been read. 229 /// `n == 0` means that either `read` returned zero or ring buffer is full. 230 /// 231 /// If `read` is failed or returned an invalid number then error is returned. read_from(&mut self, reader: &mut dyn Read, count: Option<usize>) -> io::Result<usize>232 pub fn read_from(&mut self, reader: &mut dyn Read, count: Option<usize>) -> io::Result<usize> { 233 let mut err = None; 234 let n = unsafe { 235 self.push_access(|left, _| -> usize { 236 let left = match count { 237 Some(c) => { 238 if c < left.len() { 239 &mut left[0..c] 240 } else { 241 left 242 } 243 } 244 None => left, 245 }; 246 match reader 247 .read(&mut *(left as *mut [MaybeUninit<u8>] as *mut [u8])) 248 .and_then(|n| { 249 if n <= left.len() { 250 Ok(n) 251 } else { 252 Err(io::Error::new( 253 io::ErrorKind::InvalidInput, 254 "Read operation returned an invalid number", 255 )) 256 } 257 }) { 258 Ok(n) => n, 259 Err(e) => { 260 err = Some(e); 261 0 262 } 263 } 264 }) 265 }; 266 match err { 267 Some(e) => Err(e), 268 None => Ok(n), 269 } 270 } 271 } 272 273 #[cfg(feature = "std")] 274 impl Write for Producer<u8> { write(&mut self, buffer: &[u8]) -> io::Result<usize>275 fn write(&mut self, buffer: &[u8]) -> io::Result<usize> { 276 let n = self.push_slice(buffer); 277 if n == 0 && !buffer.is_empty() { 278 Err(io::ErrorKind::WouldBlock.into()) 279 } else { 280 Ok(n) 281 } 282 } 283 flush(&mut self) -> io::Result<()>284 fn flush(&mut self) -> io::Result<()> { 285 Ok(()) 286 } 287 } 288