1 use std::{ 2 io::{self, Read, Write}, 3 mem::{self, MaybeUninit}, 4 ptr::copy_nonoverlapping, 5 sync::{atomic::Ordering, Arc}, 6 }; 7 8 use crate::{consumer::Consumer, ring_buffer::*}; 9 10 /// Producer part of ring buffer. 11 pub struct Producer<T> { 12 pub(crate) rb: Arc<RingBuffer<T>>, 13 } 14 15 impl<T: Sized> Producer<T> { 16 /// Returns capacity of the ring buffer. 17 /// 18 /// The capacity of the buffer is constant. capacity(&self) -> usize19 pub fn capacity(&self) -> usize { 20 self.rb.capacity() 21 } 22 23 /// Checks if the ring buffer is empty. 24 /// 25 /// The result is relevant until you push items to the producer. is_empty(&self) -> bool26 pub fn is_empty(&self) -> bool { 27 self.rb.is_empty() 28 } 29 30 /// Checks if the ring buffer is full. 31 /// 32 /// *The result may become irrelevant at any time because of concurring activity of the consumer.* is_full(&self) -> bool33 pub fn is_full(&self) -> bool { 34 self.rb.is_full() 35 } 36 37 /// The length of the data stored in the buffer. 38 /// 39 /// Actual length may be equal to or less than the returned value. len(&self) -> usize40 pub fn len(&self) -> usize { 41 self.rb.len() 42 } 43 44 /// The remaining space in the buffer. 45 /// 46 /// Actual remaining space may be equal to or greater than the returning value. remaining(&self) -> usize47 pub fn remaining(&self) -> usize { 48 self.rb.remaining() 49 } 50 51 /// Allows to write into ring buffer memory directry. 52 /// 53 /// *This function is unsafe because it gives access to possibly uninitialized memory* 54 /// 55 /// The method takes a function `f` as argument. 56 /// `f` takes two slices of ring buffer content (the second one or both of them may be empty). 57 /// First slice contains older elements. 58 /// 59 /// `f` should return number of elements been written. 60 /// *There is no checks for returned number - it remains on the developer's conscience.* 61 /// 62 /// The method **always** calls `f` even if ring buffer is full. 63 /// 64 /// The method returns number returned from `f`. push_access<F>(&mut self, f: F) -> usize where F: FnOnce(&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) -> usize,65 pub unsafe fn push_access<F>(&mut self, f: F) -> usize 66 where 67 F: FnOnce(&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) -> usize, 68 { 69 let head = self.rb.head.load(Ordering::Acquire); 70 let tail = self.rb.tail.load(Ordering::Acquire); 71 let len = self.rb.data.get_ref().len(); 72 73 let ranges = if tail >= head { 74 if head > 0 { 75 (tail..len, 0..(head - 1)) 76 } else if tail < len - 1 { 77 (tail..(len - 1), 0..0) 78 } else { 79 (0..0, 0..0) 80 } 81 } else if tail < head - 1 { 82 (tail..(head - 1), 0..0) 83 } else { 84 (0..0, 0..0) 85 }; 86 87 let slices = ( 88 &mut self.rb.data.get_mut()[ranges.0], 89 &mut self.rb.data.get_mut()[ranges.1], 90 ); 91 92 let n = f(slices.0, slices.1); 93 94 if n > 0 { 95 let new_tail = (tail + n) % len; 96 self.rb.tail.store(new_tail, Ordering::Release); 97 } 98 n 99 } 100 101 /// Copies data from the slice to the ring buffer in byte-to-byte manner. 102 /// 103 /// The `elems` slice should contain **initialized** data before the method call. 104 /// After the call the copied part of data in `elems` should be interpreted as **un-initialized**. 105 /// 106 /// Returns the number of items been copied. push_copy(&mut self, elems: &[MaybeUninit<T>]) -> usize107 pub unsafe fn push_copy(&mut self, elems: &[MaybeUninit<T>]) -> usize { 108 self.push_access(|left, right| -> usize { 109 if elems.len() < left.len() { 110 copy_nonoverlapping(elems.as_ptr(), left.as_mut_ptr(), elems.len()); 111 elems.len() 112 } else { 113 copy_nonoverlapping(elems.as_ptr(), left.as_mut_ptr(), left.len()); 114 if elems.len() < left.len() + right.len() { 115 copy_nonoverlapping( 116 elems.as_ptr().add(left.len()), 117 right.as_mut_ptr(), 118 elems.len() - left.len(), 119 ); 120 elems.len() 121 } else { 122 copy_nonoverlapping( 123 elems.as_ptr().add(left.len()), 124 right.as_mut_ptr(), 125 right.len(), 126 ); 127 left.len() + right.len() 128 } 129 } 130 }) 131 } 132 133 /// Appends an element to the ring buffer. 134 /// On failure returns an error containing the element that hasn't beed appended. push(&mut self, elem: T) -> Result<(), T>135 pub fn push(&mut self, elem: T) -> Result<(), T> { 136 let mut elem_mu = MaybeUninit::new(elem); 137 let n = unsafe { 138 self.push_access(|slice, _| { 139 if !slice.is_empty() { 140 mem::swap(slice.get_unchecked_mut(0), &mut elem_mu); 141 1 142 } else { 143 0 144 } 145 }) 146 }; 147 match n { 148 0 => Err(unsafe { elem_mu.assume_init() }), 149 1 => Ok(()), 150 _ => unreachable!(), 151 } 152 } 153 154 /// Repeatedly calls the closure `f` and pushes elements returned from it to the ring buffer. 155 /// 156 /// The closure is called until it returns `None` or the ring buffer is full. 157 /// 158 /// The method returns number of elements been put into the buffer. push_each<F: FnMut() -> Option<T>>(&mut self, mut f: F) -> usize159 pub fn push_each<F: FnMut() -> Option<T>>(&mut self, mut f: F) -> usize { 160 unsafe { 161 self.push_access(|left, right| { 162 for (i, dst) in left.iter_mut().enumerate() { 163 match f() { 164 Some(e) => mem::replace(dst, MaybeUninit::new(e)), 165 None => return i, 166 }; 167 } 168 for (i, dst) in right.iter_mut().enumerate() { 169 match f() { 170 Some(e) => mem::replace(dst, MaybeUninit::new(e)), 171 None => return i + left.len(), 172 }; 173 } 174 left.len() + right.len() 175 }) 176 } 177 } 178 179 /// Appends elements from an iterator to the ring buffer. 180 /// Elements that haven't been added to the ring buffer remain in the iterator. 181 /// 182 /// Returns count of elements been appended to the ring buffer. push_iter<I: Iterator<Item = T>>(&mut self, elems: &mut I) -> usize183 pub fn push_iter<I: Iterator<Item = T>>(&mut self, elems: &mut I) -> usize { 184 self.push_each(|| elems.next()) 185 } 186 187 /// Removes at most `count` elements from the consumer and appends them to the producer. 188 /// If `count` is `None` then as much as possible elements will be moved. 189 /// The producer and consumer parts may be of different buffers as well as of the same one. 190 /// 191 /// On success returns number of elements been moved. move_from(&mut self, other: &mut Consumer<T>, count: Option<usize>) -> usize192 pub fn move_from(&mut self, other: &mut Consumer<T>, count: Option<usize>) -> usize { 193 move_items(other, self, count) 194 } 195 } 196 197 impl<T: Sized + Copy> Producer<T> { 198 /// Appends elements from slice to the ring buffer. 199 /// Elements should be [`Copy`](https://doc.rust-lang.org/std/marker/trait.Copy.html). 200 /// 201 /// Returns count of elements been appended to the ring buffer. push_slice(&mut self, elems: &[T]) -> usize202 pub fn push_slice(&mut self, elems: &[T]) -> usize { 203 unsafe { self.push_copy(&*(elems as *const [T] as *const [MaybeUninit<T>])) } 204 } 205 } 206 207 impl Producer<u8> { 208 /// Reads at most `count` bytes 209 /// from [`Read`](https://doc.rust-lang.org/std/io/trait.Read.html) instance 210 /// and appends them to the ring buffer. 211 /// If `count` is `None` then as much as possible bytes will be read. 212 /// 213 /// Returns `Ok(n)` if `read` is succeded. `n` is number of bytes been read. 214 /// `n == 0` means that either `read` returned zero or ring buffer is full. 215 /// 216 /// If `read` is failed then error is returned. read_from(&mut self, reader: &mut dyn Read, count: Option<usize>) -> io::Result<usize>217 pub fn read_from(&mut self, reader: &mut dyn Read, count: Option<usize>) -> io::Result<usize> { 218 let mut err = None; 219 let n = unsafe { 220 self.push_access(|left, _| -> usize { 221 let left = match count { 222 Some(c) => { 223 if c < left.len() { 224 &mut left[0..c] 225 } else { 226 left 227 } 228 } 229 None => left, 230 }; 231 match reader 232 .read(&mut *(left as *mut [MaybeUninit<u8>] as *mut [u8])) 233 .and_then(|n| { 234 if n <= left.len() { 235 Ok(n) 236 } else { 237 Err(io::Error::new( 238 io::ErrorKind::InvalidInput, 239 "Read operation returned invalid number", 240 )) 241 } 242 }) { 243 Ok(n) => n, 244 Err(e) => { 245 err = Some(e); 246 0 247 } 248 } 249 }) 250 }; 251 match err { 252 Some(e) => Err(e), 253 None => Ok(n), 254 } 255 } 256 } 257 258 impl Write for Producer<u8> { write(&mut self, buffer: &[u8]) -> io::Result<usize>259 fn write(&mut self, buffer: &[u8]) -> io::Result<usize> { 260 let n = self.push_slice(buffer); 261 if n == 0 && !buffer.is_empty() { 262 Err(io::Error::new( 263 io::ErrorKind::WouldBlock, 264 "Ring buffer is full", 265 )) 266 } else { 267 Ok(n) 268 } 269 } 270 flush(&mut self) -> io::Result<()>271 fn flush(&mut self) -> io::Result<()> { 272 Ok(()) 273 } 274 } 275