1 use futures::channel::{mpsc, oneshot};
2 use futures::executor::block_on;
3 use futures::future::{self, poll_fn, Future, FutureExt, TryFutureExt};
4 use futures::never::Never;
5 use futures::ready;
6 use futures::sink::{self, Sink, SinkErrInto, SinkExt};
7 use futures::stream::{self, Stream, StreamExt};
8 use futures::task::{self, ArcWake, Context, Poll, Waker};
9 use futures_test::task::panic_context;
10 use std::cell::{Cell, RefCell};
11 use std::collections::VecDeque;
12 use std::fmt;
13 use std::mem;
14 use std::pin::Pin;
15 use std::rc::Rc;
16 use std::sync::atomic::{AtomicBool, Ordering};
17 use std::sync::Arc;
18 
sassert_next<S>(s: &mut S, item: S::Item) where S: Stream + Unpin, S::Item: Eq + fmt::Debug,19 fn sassert_next<S>(s: &mut S, item: S::Item)
20 where
21     S: Stream + Unpin,
22     S::Item: Eq + fmt::Debug,
23 {
24     match s.poll_next_unpin(&mut panic_context()) {
25         Poll::Ready(None) => panic!("stream is at its end"),
26         Poll::Ready(Some(e)) => assert_eq!(e, item),
27         Poll::Pending => panic!("stream wasn't ready"),
28     }
29 }
30 
// Extracts the success value from a poll result that is expected to have
// completed successfully.
//
// Panics if `x` is `Poll::Pending` or `Poll::Ready(Err(_))`. In the error case
// the `Debug` rendering of the error is part of the panic message, so a
// failing test shows what actually went wrong.
fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T {
    match x {
        Poll::Ready(Ok(x)) => x,
        Poll::Ready(Err(e)) => panic!("Poll::Ready(Err({:?}))", e),
        Poll::Pending => panic!("Poll::Pending"),
    }
}
38 
// A boolean flag, shared behind an `Arc`, that records wake-up events so that
// tests can inspect them afterwards
struct Flag(AtomicBool);

impl Flag {
    // Creates a new flag in the unset state
    fn new() -> Arc<Self> {
        let unset = AtomicBool::new(false);
        Arc::new(Self(unset))
    }

    // Clears the flag and reports whether it was set beforehand
    fn take(&self) -> bool {
        let Self(state) = self;
        state.swap(false, Ordering::SeqCst)
    }

    // Overwrites the flag with `v`
    fn set(&self, v: bool) {
        let Self(state) = self;
        state.store(v, Ordering::SeqCst);
    }
}
55 
56 impl ArcWake for Flag {
wake_by_ref(arc_self: &Arc<Self>)57     fn wake_by_ref(arc_self: &Arc<Self>) {
58         arc_self.set(true)
59     }
60 }
61 
flag_cx<F, R>(f: F) -> R where F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R,62 fn flag_cx<F, R>(f: F) -> R
63 where
64     F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R,
65 {
66     let flag = Flag::new();
67     let waker = task::waker_ref(&flag);
68     let cx = &mut Context::from_waker(&waker);
69     f(flag.clone(), cx)
70 }
71 
72 // Sends a value on an i32 channel sink
73 struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>);
74 
75 impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> {
new(sink: S, item: Item) -> Self76     fn new(sink: S, item: Item) -> Self {
77         Self(Some(sink), Some(item))
78     }
79 }
80 
81 impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> {
82     type Output = Result<S, S::Error>;
83 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>84     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
85         let Self(inner, item) = self.get_mut();
86         {
87             let mut inner = inner.as_mut().unwrap();
88             ready!(Pin::new(&mut inner).poll_ready(cx))?;
89             Pin::new(&mut inner).start_send(item.take().unwrap())?;
90         }
91         Poll::Ready(Ok(inner.take().unwrap()))
92     }
93 }
94 
95 // Immediately accepts all requests to start pushing, but completion is managed
96 // by manually flushing
97 struct ManualFlush<T: Unpin> {
98     data: Vec<T>,
99     waiting_tasks: Vec<Waker>,
100 }
101 
102 impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> {
103     type Error = ();
104 
poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>105     fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
106         Poll::Ready(Ok(()))
107     }
108 
start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error>109     fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> {
110         if let Some(item) = item {
111             self.data.push(item);
112         } else {
113             self.force_flush();
114         }
115         Ok(())
116     }
117 
poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>118     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
119         if self.data.is_empty() {
120             Poll::Ready(Ok(()))
121         } else {
122             self.waiting_tasks.push(cx.waker().clone());
123             Poll::Pending
124         }
125     }
126 
poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>127     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
128         self.poll_flush(cx)
129     }
130 }
131 
132 impl<T: Unpin> ManualFlush<T> {
new() -> Self133     fn new() -> Self {
134         Self { data: Vec::new(), waiting_tasks: Vec::new() }
135     }
136 
force_flush(&mut self) -> Vec<T>137     fn force_flush(&mut self) -> Vec<T> {
138         for task in self.waiting_tasks.drain(..) {
139             task.wake()
140         }
141         mem::replace(&mut self.data, Vec::new())
142     }
143 }
144 
// A sink that stores every item it is sent, paired with a shared `Allow`
// handle that decides when it is ready to accept items
struct ManualAllow<T: Unpin> {
    data: Vec<T>,
    allow: Rc<Allow>,
}

// A manually operated gate: `flag` says whether the gate is open, `tasks`
// holds the wakers of everyone who checked while it was still closed
struct Allow {
    flag: Cell<bool>,
    tasks: RefCell<Vec<Waker>>,
}

impl Allow {
    // Creates a closed gate with nobody waiting on it
    fn new() -> Self {
        let flag = Cell::new(false);
        let tasks = RefCell::new(Vec::new());
        Self { flag, tasks }
    }

    // Reports whether the gate is open; if it is not, the waker of `cx` is
    // stored so that `start` can wake it later
    fn check(&self, cx: &mut Context<'_>) -> bool {
        let open = self.flag.get();
        if !open {
            self.tasks.borrow_mut().push(cx.waker().clone());
        }
        open
    }

    // Opens the gate and wakes every task that was waiting on it
    fn start(&self) {
        self.flag.set(true);
        self.tasks.borrow_mut().drain(..).for_each(Waker::wake);
    }
}
177 
178 impl<T: Unpin> Sink<T> for ManualAllow<T> {
179     type Error = ();
180 
poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>181     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
182         if self.allow.check(cx) {
183             Poll::Ready(Ok(()))
184         } else {
185             Poll::Pending
186         }
187     }
188 
start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error>189     fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
190         self.data.push(item);
191         Ok(())
192     }
193 
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>194     fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
195         Poll::Ready(Ok(()))
196     }
197 
poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>198     fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
199         Poll::Ready(Ok(()))
200     }
201 }
202 
manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>)203 fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) {
204     let allow = Rc::new(Allow::new());
205     let manual_allow = ManualAllow { data: Vec::new(), allow: allow.clone() };
206     (manual_allow, allow)
207 }
208 
// `left_sink` / `right_sink` let two different sink types share one variable
#[test]
fn either_sink() {
    let pick_left = true;
    let mut either = if pick_left {
        Vec::<i32>::new().left_sink()
    } else {
        VecDeque::<i32>::new().right_sink()
    };

    Pin::new(&mut either).start_send(0).unwrap();
}
216 
// A `Vec` used as a sink appends every item it is sent
#[test]
fn vec_sink() {
    let mut items = Vec::new();
    for item in 0..2 {
        Pin::new(&mut items).start_send(item).unwrap();
    }
    assert_eq!(items, vec![0, 1]);

    // Flushing leaves the contents untouched
    block_on(items.flush()).unwrap();
    assert_eq!(items, vec![0, 1]);
}
226 
// A `VecDeque` used as a sink hands items back in the order they were sent
#[test]
fn vecdeque_sink() {
    let mut queue = VecDeque::new();
    Pin::new(&mut queue).start_send(2).unwrap();
    Pin::new(&mut queue).start_send(3).unwrap();

    for expected in vec![Some(2), Some(3), None] {
        assert_eq!(queue.pop_front(), expected);
    }
}
237 
// Each completed `send` makes its item visible in the underlying `Vec`
#[test]
fn send() {
    let mut sink = Vec::new();
    let mut expected = Vec::new();

    for item in 0..3 {
        block_on(sink.send(item)).unwrap();
        expected.push(item);
        assert_eq!(sink, expected);
    }
}
251 
// Each `send_all` appends its whole stream to the underlying `Vec`
#[test]
fn send_all() {
    let mut sink = Vec::new();
    let mut expected = Vec::new();

    for batch in vec![vec![0, 1], vec![2, 3], vec![4, 5]] {
        expected.extend(batch.iter().copied());
        block_on(sink.send_all(&mut stream::iter(batch).map(Ok))).unwrap();
        assert_eq!(sink, expected);
    }
}
265 
// Test that `start_send` on an `mpsc` channel does indeed block when the
// channel is full
#[test]
fn mpsc_blocking_start_send() {
    let (mut sender, mut receiver) = mpsc::channel::<i32>(0);

    block_on(future::lazy(|_| {
        // The first item is accepted straight away
        sender.start_send(0).unwrap();

        flag_cx(|woken, cx| {
            let mut second_send = StartSendFut::new(sender, 1);

            // The second item has to wait, and nothing has woken the task yet
            assert!(second_send.poll_unpin(cx).is_pending());
            assert!(!woken.take());

            // Receiving the first item wakes the sender, which then completes
            sassert_next(&mut receiver, 0);
            assert!(woken.take());
            unwrap(second_send.poll_unpin(cx));
            assert!(!woken.take());
            sassert_next(&mut receiver, 1);
        })
    }));
}
288 
// test `flush` by using `with` to make the first insertion into a sink block
// until a oneshot is completed
#[test]
fn with_flush() {
    let (unblock, blocked) = oneshot::channel();
    // Only the first mapped item waits on the oneshot; every later one gets
    // an already-completed future swapped in
    let mut gate = blocked.boxed();
    let mut sink = Vec::new().with(|elem| {
        mem::replace(&mut gate, future::ok(()).boxed())
            .map_ok(move |()| elem + 1)
            .map_err(|_| -> Never { panic!() })
    });

    assert_eq!(Pin::new(&mut sink).start_send(0).ok(), Some(()));

    flag_cx(|woken, cx| {
        let mut flushing = sink.flush();
        assert!(flushing.poll_unpin(cx).is_pending());
        unblock.send(()).unwrap();
        assert!(woken.take());

        unwrap(flushing.poll_unpin(cx));

        block_on(sink.send(1)).unwrap();
        assert_eq!(sink.get_ref(), &[1, 2]);
    })
}
315 
// test simple use of `with` to transform the data before it reaches the sink
#[test]
fn with_as_map() {
    let mut doubling = Vec::new().with(|item| future::ok::<i32, Never>(item * 2));
    for item in 0..3_i32 {
        block_on(doubling.send(item)).unwrap();
    }
    assert_eq!(doubling.get_ref(), &[0, 2, 4]);
}
325 
// test simple use of `with_flat_map`: every item `n` is expanded into `n`
// copies of itself
#[test]
fn with_flat_map() {
    let mut repeating = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok));
    for item in 0..4 {
        block_on(repeating.send(item)).unwrap();
    }
    assert_eq!(repeating.get_ref(), &[1, 2, 2, 3, 3, 3]);
}
336 
// Check that `with` propagates `poll_ready` to the inner sink.
// Regression test for the issue #1834.
#[test]
fn with_propagates_poll_ready() {
    let (sender, mut receiver) = mpsc::channel::<i32>(0);
    let mut mapped = sender.with(|item: i32| future::ok::<i32, mpsc::SendError>(item + 10));

    block_on(future::lazy(|_| {
        flag_cx(|woken, cx| {
            let mut sink = Pin::new(&mut mapped);

            // Should be ready for the first item.
            assert_eq!(sink.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
            assert_eq!(sink.as_mut().start_send(0), Ok(()));

            // Should be ready for the second item only after the first one is received.
            assert_eq!(sink.as_mut().poll_ready(cx), Poll::Pending);
            assert!(!woken.take());
            sassert_next(&mut receiver, 10);
            assert!(woken.take());
            assert_eq!(sink.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
            assert_eq!(sink.as_mut().start_send(1), Ok(()));
        })
    }));
}
362 
// test that the `with` sink doesn't require the underlying sink to flush,
// but doesn't claim to be flushed until the underlying sink is
#[test]
fn with_flush_propagate() {
    let mut sink = ManualFlush::new().with(future::ok::<Option<i32>, ()>);
    flag_cx(|woken, cx| {
        // Both items are accepted without any flush in between
        unwrap(Pin::new(&mut sink).poll_ready(cx));
        Pin::new(&mut sink).start_send(Some(0)).unwrap();
        unwrap(Pin::new(&mut sink).poll_ready(cx));
        Pin::new(&mut sink).start_send(Some(1)).unwrap();

        {
            // The inner sink still buffers the items, so the flush is pending
            let mut flushing = sink.flush();
            assert!(flushing.poll_unpin(cx).is_pending());
            assert!(!woken.take());
        }
        assert_eq!(sink.get_mut().force_flush(), vec![0, 1]);
        assert!(woken.take());
        unwrap(sink.flush().poll_unpin(cx));
    })
}
384 
// test that `Clone` is implemented on `with` sinks
#[test]
fn with_implements_clone() {
    let (mut bools, received) = mpsc::channel(5);

    {
        // Two differently typed front-ends feeding the same `bool` channel
        let mut is_positive =
            bools.clone().with(|item| future::ok::<bool, mpsc::SendError>(item > 0));

        let mut is_long =
            bools.clone().with(|item: &str| future::ok::<bool, mpsc::SendError>(item.len() > 5));

        block_on(is_positive.clone().send(-1)).unwrap();
        block_on(is_long.clone().send("123456")).unwrap();
        block_on(is_long.send("123")).unwrap();
        block_on(is_positive.send(1)).unwrap();
    }

    block_on(bools.send(false)).unwrap();

    block_on(bools.close()).unwrap();

    let collected = block_on(received.collect::<Vec<_>>());
    assert_eq!(collected, vec![false, true, false, true, false]);
}
408 
// test that a buffer is a no-op around a sink that always accepts sends
#[test]
fn buffer_noop() {
    for capacity in 0..2 {
        let mut sink = Vec::new().buffer(capacity);
        block_on(sink.send(0)).unwrap();
        block_on(sink.send(1)).unwrap();
        assert_eq!(sink.get_ref(), &[0, 1]);
    }
}
422 
// test basic buffer functionality, including both filling up to capacity,
// and writing out when the underlying sink is ready
#[test]
fn buffer() {
    let (inner, gate) = manual_allow::<i32>();
    let buffered = inner.buffer(2);

    // Two items fit into the buffer even though the inner sink is not ready
    let buffered = block_on(StartSendFut::new(buffered, 0)).unwrap();
    let mut buffered = block_on(StartSendFut::new(buffered, 1)).unwrap();

    flag_cx(|woken, cx| {
        // The third item has to wait until the inner sink is allowed to start
        let mut third = buffered.send(2);
        assert!(third.poll_unpin(cx).is_pending());
        assert!(!woken.take());
        gate.start();
        assert!(woken.take());
        unwrap(third.poll_unpin(cx));
        assert_eq!(buffered.get_ref().data, vec![0, 1, 2]);
    })
}
443 
// Every item sent into a fanout shows up in both underlying sinks
#[test]
fn fanout_smoke() {
    let mut both = Vec::new().fanout(Vec::new());
    block_on(both.send_all(&mut stream::iter(vec![1, 2, 3]).map(Ok))).unwrap();

    let (first, second) = both.into_inner();
    assert_eq!(first, vec![1, 2, 3]);
    assert_eq!(second, vec![1, 2, 3]);
}
454 
// A fanout over two zero-capacity channels only makes progress as both
// receivers drain their side
#[test]
fn fanout_backpressure() {
    let (left_tx, mut left_rx) = mpsc::channel(0);
    let (right_tx, mut right_rx) = mpsc::channel(0);
    let fanout = left_tx.fanout(right_tx);

    let mut fanout = block_on(StartSendFut::new(fanout, 0)).unwrap();

    flag_cx(|woken, cx| {
        let mut sending = fanout.send(2);
        assert!(!woken.take());

        // Item 0 has to leave both channels first
        assert!(sending.poll_unpin(cx).is_pending());
        assert_eq!(block_on(left_rx.next()), Some(0));
        assert!(woken.take());
        assert!(sending.poll_unpin(cx).is_pending());
        assert_eq!(block_on(right_rx.next()), Some(0));
        assert!(woken.take());

        // Then item 2 has to leave both channels
        assert!(sending.poll_unpin(cx).is_pending());
        assert_eq!(block_on(left_rx.next()), Some(2));
        assert!(woken.take());
        assert!(sending.poll_unpin(cx).is_pending());
        assert_eq!(block_on(right_rx.next()), Some(2));
        assert!(woken.take());

        unwrap(sending.poll_unpin(cx));
        // make sure receivers live until end of test to prevent send errors
        drop(left_rx);
        drop(right_rx);
    })
}
486 
// `sink_map_err` passes successes through and maps the sink's error type
#[test]
fn sink_map_err() {
    {
        // Receiver kept alive: sending and flushing both succeed
        let cx = &mut panic_context();
        let (sender, _receiver) = mpsc::channel(1);
        let mut mapped = sender.sink_map_err(|_| ());
        assert_eq!(Pin::new(&mut mapped).start_send(()), Ok(()));
        assert_eq!(Pin::new(&mut mapped).poll_flush(cx), Poll::Ready(Ok(())));
    }

    // Receiver dropped up front: the send error is mapped to `()`
    let (sender, receiver) = mpsc::channel(0);
    drop(receiver);
    let mut mapped = sender.sink_map_err(|_| ());
    assert_eq!(Pin::new(&mut mapped).start_send(()), Err(()));
}
500 
// Drives a `sink::unfold` sink whose closure forwards each item into an
// `mpsc` channel, checking when the forwarded items become visible
#[test]
fn sink_unfold() {
    block_on(poll_fn(|cx| {
        let (sender, mut receiver) = mpsc::channel(1);
        let forwarding = sink::unfold((), |(), i: i32| {
            let mut sender = sender.clone();
            async move {
                sender.send(i).await.unwrap();
                Ok::<_, String>(())
            }
        });
        futures::pin_mut!(forwarding);

        // A flushed item is visible on the channel
        assert_eq!(forwarding.as_mut().start_send(1), Ok(()));
        assert_eq!(forwarding.as_mut().poll_flush(cx), Poll::Ready(Ok(())));
        assert_eq!(receiver.try_next().unwrap(), Some(1));

        // Without a flush, only the earlier of two items has arrived so far
        assert_eq!(forwarding.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
        assert_eq!(forwarding.as_mut().start_send(2), Ok(()));
        assert_eq!(forwarding.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
        assert_eq!(forwarding.as_mut().start_send(3), Ok(()));
        assert_eq!(receiver.try_next().unwrap(), Some(2));
        assert!(receiver.try_next().is_err());

        assert_eq!(forwarding.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
        assert_eq!(forwarding.as_mut().start_send(4), Ok(()));
        assert_eq!(forwarding.as_mut().poll_flush(cx), Poll::Pending); // Channel full
        assert_eq!(receiver.try_next().unwrap(), Some(3));
        assert_eq!(receiver.try_next().unwrap(), Some(4));

        Poll::Ready(())
    }))
}
532 
// `sink_err_into` passes successes through and converts the sink's error
// with `From`
#[test]
fn err_into() {
    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
    struct ErrIntoTest;

    impl From<mpsc::SendError> for ErrIntoTest {
        fn from(_: mpsc::SendError) -> Self {
            Self
        }
    }

    {
        // Receiver kept alive: sending and flushing both succeed
        let cx = &mut panic_context();
        let (sender, _receiver) = mpsc::channel(1);
        let mut converted: SinkErrInto<mpsc::Sender<()>, _, ErrIntoTest> = sender.sink_err_into();
        assert_eq!(Pin::new(&mut converted).start_send(()), Ok(()));
        assert_eq!(Pin::new(&mut converted).poll_flush(cx), Poll::Ready(Ok(())));
    }

    // Receiver dropped up front: the send error is converted to `ErrIntoTest`
    let (sender, receiver) = mpsc::channel(0);
    drop(receiver);
    let mut converted: SinkErrInto<mpsc::Sender<()>, _, ErrIntoTest> = sender.sink_err_into();
    assert_eq!(Pin::new(&mut converted).start_send(()), Err(ErrIntoTest));
}
555