1 use std::{
2     io::{self, Read, Write},
3     mem::{self, MaybeUninit},
4     ptr::copy_nonoverlapping,
5     sync::{atomic::Ordering, Arc},
6 };
7 
8 use crate::{consumer::Consumer, ring_buffer::*};
9 
10 /// Producer part of ring buffer.
11 pub struct Producer<T> {
12     pub(crate) rb: Arc<RingBuffer<T>>,
13 }
14 
15 impl<T: Sized> Producer<T> {
16     /// Returns capacity of the ring buffer.
17     ///
18     /// The capacity of the buffer is constant.
capacity(&self) -> usize19     pub fn capacity(&self) -> usize {
20         self.rb.capacity()
21     }
22 
23     /// Checks if the ring buffer is empty.
24     ///
25     /// The result is relevant until you push items to the producer.
is_empty(&self) -> bool26     pub fn is_empty(&self) -> bool {
27         self.rb.is_empty()
28     }
29 
30     /// Checks if the ring buffer is full.
31     ///
32     /// *The result may become irrelevant at any time because of concurring activity of the consumer.*
is_full(&self) -> bool33     pub fn is_full(&self) -> bool {
34         self.rb.is_full()
35     }
36 
37     /// The length of the data stored in the buffer.
38     ///
39     /// Actual length may be equal to or less than the returned value.
len(&self) -> usize40     pub fn len(&self) -> usize {
41         self.rb.len()
42     }
43 
44     /// The remaining space in the buffer.
45     ///
46     /// Actual remaining space may be equal to or greater than the returning value.
remaining(&self) -> usize47     pub fn remaining(&self) -> usize {
48         self.rb.remaining()
49     }
50 
51     /// Allows to write into ring buffer memory directry.
52     ///
53     /// *This function is unsafe because it gives access to possibly uninitialized memory*
54     ///
55     /// The method takes a function `f` as argument.
56     /// `f` takes two slices of ring buffer content (the second one or both of them may be empty).
57     /// First slice contains older elements.
58     ///
59     /// `f` should return number of elements been written.
60     /// *There is no checks for returned number - it remains on the developer's conscience.*
61     ///
62     /// The method **always** calls `f` even if ring buffer is full.
63     ///
64     /// The method returns number returned from `f`.
push_access<F>(&mut self, f: F) -> usize where F: FnOnce(&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) -> usize,65     pub unsafe fn push_access<F>(&mut self, f: F) -> usize
66     where
67         F: FnOnce(&mut [MaybeUninit<T>], &mut [MaybeUninit<T>]) -> usize,
68     {
69         let head = self.rb.head.load(Ordering::Acquire);
70         let tail = self.rb.tail.load(Ordering::Acquire);
71         let len = self.rb.data.get_ref().len();
72 
73         let ranges = if tail >= head {
74             if head > 0 {
75                 (tail..len, 0..(head - 1))
76             } else if tail < len - 1 {
77                 (tail..(len - 1), 0..0)
78             } else {
79                 (0..0, 0..0)
80             }
81         } else if tail < head - 1 {
82             (tail..(head - 1), 0..0)
83         } else {
84             (0..0, 0..0)
85         };
86 
87         let slices = (
88             &mut self.rb.data.get_mut()[ranges.0],
89             &mut self.rb.data.get_mut()[ranges.1],
90         );
91 
92         let n = f(slices.0, slices.1);
93 
94         if n > 0 {
95             let new_tail = (tail + n) % len;
96             self.rb.tail.store(new_tail, Ordering::Release);
97         }
98         n
99     }
100 
101     /// Copies data from the slice to the ring buffer in byte-to-byte manner.
102     ///
103     /// The `elems` slice should contain **initialized** data before the method call.
104     /// After the call the copied part of data in `elems` should be interpreted as **un-initialized**.
105     ///
106     /// Returns the number of items been copied.
push_copy(&mut self, elems: &[MaybeUninit<T>]) -> usize107     pub unsafe fn push_copy(&mut self, elems: &[MaybeUninit<T>]) -> usize {
108         self.push_access(|left, right| -> usize {
109             if elems.len() < left.len() {
110                 copy_nonoverlapping(elems.as_ptr(), left.as_mut_ptr(), elems.len());
111                 elems.len()
112             } else {
113                 copy_nonoverlapping(elems.as_ptr(), left.as_mut_ptr(), left.len());
114                 if elems.len() < left.len() + right.len() {
115                     copy_nonoverlapping(
116                         elems.as_ptr().add(left.len()),
117                         right.as_mut_ptr(),
118                         elems.len() - left.len(),
119                     );
120                     elems.len()
121                 } else {
122                     copy_nonoverlapping(
123                         elems.as_ptr().add(left.len()),
124                         right.as_mut_ptr(),
125                         right.len(),
126                     );
127                     left.len() + right.len()
128                 }
129             }
130         })
131     }
132 
133     /// Appends an element to the ring buffer.
134     /// On failure returns an error containing the element that hasn't beed appended.
push(&mut self, elem: T) -> Result<(), T>135     pub fn push(&mut self, elem: T) -> Result<(), T> {
136         let mut elem_mu = MaybeUninit::new(elem);
137         let n = unsafe {
138             self.push_access(|slice, _| {
139                 if !slice.is_empty() {
140                     mem::swap(slice.get_unchecked_mut(0), &mut elem_mu);
141                     1
142                 } else {
143                     0
144                 }
145             })
146         };
147         match n {
148             0 => Err(unsafe { elem_mu.assume_init() }),
149             1 => Ok(()),
150             _ => unreachable!(),
151         }
152     }
153 
154     /// Repeatedly calls the closure `f` and pushes elements returned from it to the ring buffer.
155     ///
156     /// The closure is called until it returns `None` or the ring buffer is full.
157     ///
158     /// The method returns number of elements been put into the buffer.
push_each<F: FnMut() -> Option<T>>(&mut self, mut f: F) -> usize159     pub fn push_each<F: FnMut() -> Option<T>>(&mut self, mut f: F) -> usize {
160         unsafe {
161             self.push_access(|left, right| {
162                 for (i, dst) in left.iter_mut().enumerate() {
163                     match f() {
164                         Some(e) => mem::replace(dst, MaybeUninit::new(e)),
165                         None => return i,
166                     };
167                 }
168                 for (i, dst) in right.iter_mut().enumerate() {
169                     match f() {
170                         Some(e) => mem::replace(dst, MaybeUninit::new(e)),
171                         None => return i + left.len(),
172                     };
173                 }
174                 left.len() + right.len()
175             })
176         }
177     }
178 
179     /// Appends elements from an iterator to the ring buffer.
180     /// Elements that haven't been added to the ring buffer remain in the iterator.
181     ///
182     /// Returns count of elements been appended to the ring buffer.
push_iter<I: Iterator<Item = T>>(&mut self, elems: &mut I) -> usize183     pub fn push_iter<I: Iterator<Item = T>>(&mut self, elems: &mut I) -> usize {
184         self.push_each(|| elems.next())
185     }
186 
187     /// Removes at most `count` elements from the consumer and appends them to the producer.
188     /// If `count` is `None` then as much as possible elements will be moved.
189     /// The producer and consumer parts may be of different buffers as well as of the same one.
190     ///
191     /// On success returns number of elements been moved.
move_from(&mut self, other: &mut Consumer<T>, count: Option<usize>) -> usize192     pub fn move_from(&mut self, other: &mut Consumer<T>, count: Option<usize>) -> usize {
193         move_items(other, self, count)
194     }
195 }
196 
197 impl<T: Sized + Copy> Producer<T> {
198     /// Appends elements from slice to the ring buffer.
199     /// Elements should be [`Copy`](https://doc.rust-lang.org/std/marker/trait.Copy.html).
200     ///
201     /// Returns count of elements been appended to the ring buffer.
push_slice(&mut self, elems: &[T]) -> usize202     pub fn push_slice(&mut self, elems: &[T]) -> usize {
203         unsafe { self.push_copy(&*(elems as *const [T] as *const [MaybeUninit<T>])) }
204     }
205 }
206 
207 impl Producer<u8> {
208     /// Reads at most `count` bytes
209     /// from [`Read`](https://doc.rust-lang.org/std/io/trait.Read.html) instance
210     /// and appends them to the ring buffer.
211     /// If `count` is `None` then as much as possible bytes will be read.
212     ///
213     /// Returns `Ok(n)` if `read` is succeded. `n` is number of bytes been read.
214     /// `n == 0` means that either `read` returned zero or ring buffer is full.
215     ///
216     /// If `read` is failed then error is returned.
read_from(&mut self, reader: &mut dyn Read, count: Option<usize>) -> io::Result<usize>217     pub fn read_from(&mut self, reader: &mut dyn Read, count: Option<usize>) -> io::Result<usize> {
218         let mut err = None;
219         let n = unsafe {
220             self.push_access(|left, _| -> usize {
221                 let left = match count {
222                     Some(c) => {
223                         if c < left.len() {
224                             &mut left[0..c]
225                         } else {
226                             left
227                         }
228                     }
229                     None => left,
230                 };
231                 match reader
232                     .read(&mut *(left as *mut [MaybeUninit<u8>] as *mut [u8]))
233                     .and_then(|n| {
234                         if n <= left.len() {
235                             Ok(n)
236                         } else {
237                             Err(io::Error::new(
238                                 io::ErrorKind::InvalidInput,
239                                 "Read operation returned invalid number",
240                             ))
241                         }
242                     }) {
243                     Ok(n) => n,
244                     Err(e) => {
245                         err = Some(e);
246                         0
247                     }
248                 }
249             })
250         };
251         match err {
252             Some(e) => Err(e),
253             None => Ok(n),
254         }
255     }
256 }
257 
258 impl Write for Producer<u8> {
write(&mut self, buffer: &[u8]) -> io::Result<usize>259     fn write(&mut self, buffer: &[u8]) -> io::Result<usize> {
260         let n = self.push_slice(buffer);
261         if n == 0 && !buffer.is_empty() {
262             Err(io::Error::new(
263                 io::ErrorKind::WouldBlock,
264                 "Ring buffer is full",
265             ))
266         } else {
267             Ok(n)
268         }
269     }
270 
flush(&mut self) -> io::Result<()>271     fn flush(&mut self) -> io::Result<()> {
272         Ok(())
273     }
274 }
275