1 use alloc::sync::Arc;
2 use core::{
3     mem::{self, MaybeUninit},
4     ptr::copy_nonoverlapping,
5     sync::atomic::Ordering,
6 };
7 #[cfg(feature = "std")]
8 use std::io::{self, Read, Write};
9 
10 use crate::{consumer::Consumer, ring_buffer::*};
11 
/// Producer part of ring buffer.
///
/// Holds a shared ([`Arc`]) handle to the underlying [`RingBuffer`];
/// every method of the producer operates on that shared buffer.
pub struct Producer<T> {
    /// Shared handle to the ring buffer this producer writes into.
    pub(crate) rb: Arc<RingBuffer<T>>,
}
16 
17 impl<T: Sized> Producer<T> {
    /// Returns the capacity of the ring buffer.
    ///
    /// The capacity of the buffer is constant.
    /// The value is obtained from the shared ring buffer.
    pub fn capacity(&self) -> usize {
        self.rb.capacity()
    }
24 
    /// Checks if the ring buffer is empty.
    ///
    /// The result stays relevant until you push items through this producer.
    pub fn is_empty(&self) -> bool {
        self.rb.is_empty()
    }
31 
    /// Checks if the ring buffer is full.
    ///
    /// *The result may become irrelevant at any time because of concurrent activity of the consumer.*
    pub fn is_full(&self) -> bool {
        self.rb.is_full()
    }
38 
    /// Returns the length of the data stored in the buffer.
    ///
    /// The actual length may be equal to or less than the returned value.
    pub fn len(&self) -> usize {
        self.rb.len()
    }
45 
    /// Returns the remaining space in the buffer.
    ///
    /// The actual remaining space may be equal to or greater than the returned value.
    pub fn remaining(&self) -> usize {
        self.rb.remaining()
    }
52 
53     /// Allows to write into ring buffer memory directry.
54     ///
55     /// *This function is unsafe because it gives access to possibly uninitialized memory*
56     ///
57     /// The method takes a function `f` as argument.
58     /// `f` takes two slices of ring buffer content (the second one or both of them may be empty).
59     /// First slice contains older elements.
60     ///
61     /// `f` should return number of elements been written.
62     /// *There is no checks for returned number - it remains on the developer's conscience.*
63     ///
64     /// The method **always** calls `f` even if ring buffer is full.
65     ///
66     /// The method returns number returned from `f`.
67     ///
68     /// # Safety
69     ///
70     /// The method gives access to ring buffer underlying memory which may be uninitialized.
71     ///
push_access<F>(&mut self, f: F) -> usize where F: FnOnce(&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) -> usize,72     pub unsafe fn push_access<F>(&mut self, f: F) -> usize
73     where
74         F: FnOnce(&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) -> usize,
75     {
76         let head = self.rb.head.load(Ordering::Acquire);
77         let tail = self.rb.tail.load(Ordering::Acquire);
78         let len = self.rb.data.get_ref().len();
79 
80         let ranges = if tail >= head {
81             if head > 0 {
82                 (tail..len, 0..(head - 1))
83             } else if tail < len - 1 {
84                 (tail..(len - 1), 0..0)
85             } else {
86                 (0..0, 0..0)
87             }
88         } else if tail < head - 1 {
89             (tail..(head - 1), 0..0)
90         } else {
91             (0..0, 0..0)
92         };
93 
94         let slices = (
95             &mut self.rb.data.get_mut()[ranges.0],
96             &mut self.rb.data.get_mut()[ranges.1],
97         );
98 
99         let n = f(slices.0, slices.1);
100 
101         if n > 0 {
102             let new_tail = (tail + n) % len;
103             self.rb.tail.store(new_tail, Ordering::Release);
104         }
105         n
106     }
107 
108     /// Copies data from the slice to the ring buffer in byte-to-byte manner.
109     ///
110     /// The `elems` slice should contain **initialized** data before the method call.
111     /// After the call the copied part of data in `elems` should be interpreted as **un-initialized**.
112     ///
113     /// Returns the number of items been copied.
114     ///
115     /// # Safety
116     ///
117     /// The method copies raw data into the ring buffer.
118     ///
119     /// *You should properly fill the slice and manage remaining elements after copy.*
120     ///
push_copy(&mut self, elems: &[MaybeUninit<T>]) -> usize121     pub unsafe fn push_copy(&mut self, elems: &[MaybeUninit<T>]) -> usize {
122         self.push_access(|left, right| -> usize {
123             if elems.len() < left.len() {
124                 copy_nonoverlapping(elems.as_ptr(), left.as_mut_ptr(), elems.len());
125                 elems.len()
126             } else {
127                 copy_nonoverlapping(elems.as_ptr(), left.as_mut_ptr(), left.len());
128                 if elems.len() < left.len() + right.len() {
129                     copy_nonoverlapping(
130                         elems.as_ptr().add(left.len()),
131                         right.as_mut_ptr(),
132                         elems.len() - left.len(),
133                     );
134                     elems.len()
135                 } else {
136                     copy_nonoverlapping(
137                         elems.as_ptr().add(left.len()),
138                         right.as_mut_ptr(),
139                         right.len(),
140                     );
141                     left.len() + right.len()
142                 }
143             }
144         })
145     }
146 
147     /// Appends an element to the ring buffer.
148     /// On failure returns an error containing the element that hasn't beed appended.
push(&mut self, elem: T) -> Result<(), T>149     pub fn push(&mut self, elem: T) -> Result<(), T> {
150         let mut elem_mu = MaybeUninit::new(elem);
151         let n = unsafe {
152             self.push_access(|slice, _| {
153                 if !slice.is_empty() {
154                     mem::swap(slice.get_unchecked_mut(0), &mut elem_mu);
155                     1
156                 } else {
157                     0
158                 }
159             })
160         };
161         match n {
162             0 => Err(unsafe { elem_mu.assume_init() }),
163             1 => Ok(()),
164             _ => unreachable!(),
165         }
166     }
167 
168     /// Repeatedly calls the closure `f` and pushes elements returned from it to the ring buffer.
169     ///
170     /// The closure is called until it returns `None` or the ring buffer is full.
171     ///
172     /// The method returns number of elements been put into the buffer.
push_each<F: FnMut() -> Option<T>>(&mut self, mut f: F) -> usize173     pub fn push_each<F: FnMut() -> Option<T>>(&mut self, mut f: F) -> usize {
174         unsafe {
175             self.push_access(|left, right| {
176                 for (i, dst) in left.iter_mut().enumerate() {
177                     match f() {
178                         Some(e) => mem::replace(dst, MaybeUninit::new(e)),
179                         None => return i,
180                     };
181                 }
182                 for (i, dst) in right.iter_mut().enumerate() {
183                     match f() {
184                         Some(e) => mem::replace(dst, MaybeUninit::new(e)),
185                         None => return i + left.len(),
186                     };
187                 }
188                 left.len() + right.len()
189             })
190         }
191     }
192 
    /// Appends elements from an iterator to the ring buffer.
    /// Elements that haven't been added to the ring buffer remain in the iterator.
    ///
    /// Implemented on top of `push_each`, pulling items with `elems.next()`.
    ///
    /// Returns the count of elements that have been appended to the ring buffer.
    pub fn push_iter<I: Iterator<Item = T>>(&mut self, elems: &mut I) -> usize {
        self.push_each(|| elems.next())
    }
200 
    /// Removes at most `count` elements from the consumer and appends them to the producer.
    /// If `count` is `None` then as many elements as possible will be moved.
    /// The producer and consumer parts may be of different buffers as well as of the same one.
    ///
    /// Delegates to `move_items` with `other` as the source and `self` as the destination.
    ///
    /// Returns the number of elements that have been moved.
    pub fn move_from(&mut self, other: &mut Consumer<T>, count: Option<usize>) -> usize {
        move_items(other, self, count)
    }
209 }
210 
211 impl<T: Sized + Copy> Producer<T> {
212     /// Appends elements from slice to the ring buffer.
213     /// Elements should be [`Copy`](https://doc.rust-lang.org/std/marker/trait.Copy.html).
214     ///
215     /// Returns count of elements been appended to the ring buffer.
push_slice(&mut self, elems: &[T]) -> usize216     pub fn push_slice(&mut self, elems: &[T]) -> usize {
217         unsafe { self.push_copy(&*(elems as *const [T] as *const [MaybeUninit<T>])) }
218     }
219 }
220 
#[cfg(feature = "std")]
impl Producer<u8> {
    /// Reads at most `count` bytes
    /// from [`Read`](https://doc.rust-lang.org/std/io/trait.Read.html) instance
    /// and appends them to the ring buffer.
    /// If `count` is `None` then as many bytes as possible will be read.
    ///
    /// Only the first contiguous vacant region of the buffer is offered to the
    /// reader, and `read` is called exactly once.
    ///
    /// Returns `Ok(n)` if `read` succeeded. `n` is the number of bytes that have been read.
    /// `n == 0` means that either `read` returned zero or the ring buffer is full.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `read`, or an error of kind
    /// [`io::ErrorKind::InvalidInput`] if `read` reported more bytes than the
    /// slice it was given can hold. In both cases nothing is appended.
    pub fn read_from(&mut self, reader: &mut dyn Read, count: Option<usize>) -> io::Result<usize> {
        let mut err = None;
        // SAFETY: the closure reports only bytes that the reader has written
        // into an initialized window, and never more than the window length.
        let n = unsafe {
            self.push_access(|left, _| -> usize {
                // Clamp the writable window to `count` when it is given.
                let limit = count.map_or(left.len(), |c| c.min(left.len()));
                let window = &mut left[..limit];

                // The vacant region may be uninitialized, and `Read::read`
                // must not be handed uninitialized memory. Zero-fill the
                // window first so the `&mut [u8]` view below is valid.
                // This costs one memset of the window per call.
                for byte in window.iter_mut() {
                    *byte = MaybeUninit::new(0);
                }
                let buf = &mut *(window as *mut [MaybeUninit<u8>] as *mut [u8]);

                match reader.read(buf) {
                    Ok(read) if read <= limit => read,
                    Ok(_) => {
                        // A broken `Read` impl claimed more than it was given.
                        err = Some(io::Error::new(
                            io::ErrorKind::InvalidInput,
                            "Read operation returned an invalid number",
                        ));
                        0
                    }
                    Err(e) => {
                        err = Some(e);
                        0
                    }
                }
            })
        };
        match err {
            Some(e) => Err(e),
            None => Ok(n),
        }
    }
}
272 
#[cfg(feature = "std")]
impl Write for Producer<u8> {
    /// Appends as many bytes of `buffer` as fit into the ring buffer.
    ///
    /// Returns the number of bytes appended. If nothing could be appended
    /// although `buffer` is not empty, an error of kind
    /// [`io::ErrorKind::WouldBlock`] is returned instead of `Ok(0)`.
    fn write(&mut self, buffer: &[u8]) -> io::Result<usize> {
        match self.push_slice(buffer) {
            0 if !buffer.is_empty() => Err(io::ErrorKind::WouldBlock.into()),
            written => Ok(written),
        }
    }

    /// Does nothing and always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}
288