1 #![allow(bare_trait_objects, unknown_lints)]
2 
3 extern crate futures;
4 
5 use std::mem;
6 use std::sync::Arc;
7 use std::rc::Rc;
8 use std::cell::{Cell, RefCell};
9 use std::sync::atomic::{Ordering, AtomicBool};
10 
11 use futures::prelude::*;
12 use futures::future::ok;
13 use futures::stream;
14 use futures::sync::{oneshot, mpsc};
15 use futures::task::{self, Task};
16 use futures::executor::{self, Notify};
17 use futures::sink::SinkFromErr;
18 
19 mod support;
20 use support::*;
21 
#[test]
// A `Vec` used directly as a sink: every `start_send` is accepted right away
// and the items end up stored in the order they were sent
fn vec_sink() {
    let mut items = Vec::new();
    for value in 0..2 {
        assert_eq!(items.start_send(value), Ok(AsyncSink::Ready));
    }
    assert_eq!(items, vec![0, 1]);
    // Flushing hands the vector back with its contents untouched
    assert_done(move || items.flush(), Ok(vec![0, 1]));
}
30 
#[test]
// `send` consumes the sink and resolves to it once the item has been pushed
fn send() {
    let after_first = Vec::new().send(0).wait().unwrap();
    assert_eq!(after_first, vec![0]);

    let after_second = after_first.send(1).wait().unwrap();
    assert_eq!(after_second, vec![0, 1]);

    assert_done(move || after_second.send(2), Ok(vec![0, 1, 2]));
}
44 
#[test]
// `send_all` drains a whole stream into the sink and resolves to the
// (sink, stream) pair; only the sink half is inspected here
fn send_all() {
    let (first_batch, _) = Vec::new()
        .send_all(stream::iter_ok(vec![0, 1]))
        .wait()
        .unwrap();
    assert_eq!(first_batch, vec![0, 1]);

    let (second_batch, _) = first_batch
        .send_all(stream::iter_ok(vec![2, 3]))
        .wait()
        .unwrap();
    assert_eq!(second_batch, vec![0, 1, 2, 3]);

    assert_done(
        move || {
            second_batch
                .send_all(stream::iter_ok(vec![4, 5]))
                .map(|(sink, _)| sink)
        },
        Ok(vec![0, 1, 2, 3, 4, 5]),
    );
}
59 
// A boolean flag that tests hand to the executor as the `Notify` handle, so
// they can inspect afterwards whether a notification was delivered
struct Flag(pub AtomicBool);

impl Flag {
    // Creates a shared flag that starts out cleared
    fn new() -> Arc<Flag> {
        let cleared = Flag(AtomicBool::new(false));
        Arc::new(cleared)
    }

    // Reads the current value of the flag
    fn get(&self) -> bool {
        let Flag(ref state) = *self;
        state.load(Ordering::SeqCst)
    }

    // Overwrites the flag with `v`
    fn set(&self, v: bool) {
        let Flag(ref state) = *self;
        state.store(v, Ordering::SeqCst)
    }
}
76 
77 impl Notify for Flag {
notify(&self, _id: usize)78     fn notify(&self, _id: usize) {
79         self.set(true)
80     }
81 }
82 
83 // Sends a value on an i32 channel sink
84 struct StartSendFut<S: Sink>(Option<S>, Option<S::SinkItem>);
85 
86 impl<S: Sink> StartSendFut<S> {
new(sink: S, item: S::SinkItem) -> StartSendFut<S>87     fn new(sink: S, item: S::SinkItem) -> StartSendFut<S> {
88         StartSendFut(Some(sink), Some(item))
89     }
90 }
91 
92 impl<S: Sink> Future for StartSendFut<S> {
93     type Item = S;
94     type Error = S::SinkError;
95 
poll(&mut self) -> Poll<S, S::SinkError>96     fn poll(&mut self) -> Poll<S, S::SinkError> {
97         match self.0.as_mut().unwrap().start_send(self.1.take().unwrap())? {
98             AsyncSink::Ready => Ok(Async::Ready(self.0.take().unwrap())),
99             AsyncSink::NotReady(item) => {
100                 self.1 = Some(item);
101                 Ok(Async::NotReady)
102             }
103         }
104 
105     }
106 }
107 
#[test]
// Test that `start_send` on an `mpsc` channel does indeed block when the
// channel is full
fn mpsc_blocking_start_send() {
    let (mut tx, mut rx) = mpsc::channel::<i32>(0);

    // NOTE(review): the body runs inside `lazy(..).wait()`, presumably so that
    // the direct `tx.start_send` call happens within a task context - confirm
    futures::future::lazy(|| {
        // The first item is accepted immediately
        assert_eq!(tx.start_send(0).unwrap(), AsyncSink::Ready);

        let flag = Flag::new();
        let mut task = executor::spawn(StartSendFut::new(tx, 1));

        // The second item is refused while the first has not been received,
        // and nothing has notified the task yet
        assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
        assert!(!flag.get());
        // `sassert_next` is not defined in this file (presumably it comes from
        // `support` and asserts the next item of the stream); after it has
        // run for item 0 the blocked sender is expected to have been notified
        sassert_next(&mut rx, 0);
        assert!(flag.get());
        // Clear the flag so that the next check observes only new notifications
        flag.set(false);
        // With room in the channel the pending item now goes through, without
        // any further notification
        assert!(task.poll_future_notify(&flag, 0).unwrap().is_ready());
        assert!(!flag.get());
        sassert_next(&mut rx, 1);

        Ok::<(), ()>(())
    }).wait().unwrap();
}
132 
#[test]
// test `flush` by using `with` to make the first insertion into a sink block
// until a oneshot is completed
fn with_flush() {
    let (tx, rx) = oneshot::channel();
    // `block` starts out as the oneshot receiver; it is boxed as a trait object
    // so that it can later be swapped for a future of a different concrete type
    let mut block = Box::new(rx) as Box<Future<Item = _, Error = _>>;
    let mut sink = Vec::new().with(|elem| {
        // Take the current `block` and leave `ok(())` in its place, so only
        // the first element waits on the oneshot. The element is stored
        // incremented by one; an error from the blocking future panics.
        mem::replace(&mut block, Box::new(ok(())))
            .map(move |_| elem + 1).map_err(|_| -> () { panic!() })
    });

    assert_eq!(sink.start_send(0), Ok(AsyncSink::Ready));

    let flag = Flag::new();
    let mut task = executor::spawn(sink.flush());
    // The flush cannot finish while the oneshot is still pending
    assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
    // Completing the oneshot must notify the flushing task
    tx.send(()).unwrap();
    assert!(flag.get());

    let sink = match task.poll_future_notify(&flag, 0).unwrap() {
        Async::Ready(sink) => sink,
        _ => panic!()
    };

    // 0 was stored as 1 and the newly sent 1 is stored as 2
    assert_eq!(sink.send(1).wait().unwrap().get_ref(), &[1, 2]);
}
159 
#[test]
// `with` used as a plain mapping step: every item is doubled on its way in
fn with_as_map() {
    let mut doubling = Vec::new().with(|item| -> Result<i32, ()> {
        Ok(item * 2)
    });
    for item in 0..3 {
        doubling = doubling.send(item).wait().unwrap();
    }
    assert_eq!(doubling.get_ref(), &[0, 2, 4]);
}
171 
#[test]
// `with_flat_map` expands each incoming item into a stream of items; here
// item `n` becomes `n` copies of itself, so 0 contributes nothing
fn with_flat_map() {
    let mut repeating = Vec::new().with_flat_map(|item| {
        stream::iter_ok(vec![item; item])
    });
    for item in 0..4 {
        repeating = repeating.send(item).wait().unwrap();
    }
    assert_eq!(repeating.get_ref(), &[1, 2, 2, 3, 3, 3]);
}
184 
// Immediately accepts all requests to start pushing, but completion is managed
// by manually flushing
struct ManualFlush<T> {
    // Items accepted by `start_send` that `force_flush` has not yet taken out
    data: Vec<T>,
    // Tasks that called `poll_complete` while `data` was non-empty; they are
    // notified by `force_flush`
    waiting_tasks: Vec<Task>,
}
191 
192 impl<T> Sink for ManualFlush<T> {
193     type SinkItem = Option<T>; // Pass None to flush
194     type SinkError = ();
195 
start_send(&mut self, op: Option<T>) -> StartSend<Option<T>, ()>196     fn start_send(&mut self, op: Option<T>) -> StartSend<Option<T>, ()> {
197         if let Some(item) = op {
198             self.data.push(item);
199         } else {
200             self.force_flush();
201         }
202         Ok(AsyncSink::Ready)
203     }
204 
poll_complete(&mut self) -> Poll<(), ()>205     fn poll_complete(&mut self) -> Poll<(), ()> {
206         if self.data.is_empty() {
207             Ok(Async::Ready(()))
208         } else {
209             self.waiting_tasks.push(task::current());
210             Ok(Async::NotReady)
211         }
212     }
213 
close(&mut self) -> Poll<(), ()>214     fn close(&mut self) -> Poll<(), ()> {
215         Ok(().into())
216     }
217 }
218 
219 impl<T> ManualFlush<T> {
new() -> ManualFlush<T>220     fn new() -> ManualFlush<T> {
221         ManualFlush {
222             data: Vec::new(),
223             waiting_tasks: Vec::new()
224         }
225     }
226 
force_flush(&mut self) -> Vec<T>227     fn force_flush(&mut self) -> Vec<T> {
228         for task in self.waiting_tasks.drain(..) {
229             task.notify()
230         }
231         mem::replace(&mut self.data, Vec::new())
232     }
233 }
234 
#[test]
// test that the `with` sink doesn't require the underlying sink to flush,
// but doesn't claim to be flushed until the underlying sink is
fn with_flush_propagate() {
    // Identity mapping: this test is about flush behavior, not item conversion
    let mut sink = ManualFlush::new().with(|x| -> Result<Option<i32>, ()> { Ok(x) });
    // Both sends are accepted although the inner sink has not flushed anything
    assert_eq!(sink.start_send(Some(0)).unwrap(), AsyncSink::Ready);
    assert_eq!(sink.start_send(Some(1)).unwrap(), AsyncSink::Ready);

    let flag = Flag::new();
    let mut task = executor::spawn(sink.flush());
    // The inner sink still holds items, so the flush stays pending and no
    // notification has been sent
    assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
    assert!(!flag.get());
    // Reach through the wrappers (presumably spawned task -> flush future ->
    // `with` adapter) to the `ManualFlush` and flush it by hand; that must
    // return the stored items and notify the waiting task
    assert_eq!(task.get_mut().get_mut().get_mut().force_flush(), vec![0, 1]);
    assert!(flag.get());
    assert!(task.poll_future_notify(&flag, 0).unwrap().is_ready());
}
251 
#[test]
// test that a buffer is a no-op around a sink that always accepts sends,
// for a buffer created with 0 as well as with 1
fn buffer_noop() {
    for capacity in 0..2 {
        let mut buffered = Vec::new().buffer(capacity);
        for item in 0..2 {
            buffered = buffered.send(item).wait().unwrap();
        }
        assert_eq!(buffered.get_ref(), &[0, 1]);
    }
}
265 
// A sink that accepts items only once its shared `Allow` gate has been started
struct ManualAllow<T> {
    // Items accepted so far, in the order they were sent
    data: Vec<T>,
    // Gate consulted by `start_send`; `manual_allow` returns a second handle
    // to the same gate
    allow: Rc<Allow>,
}
270 
271 struct Allow {
272     flag: Cell<bool>,
273     tasks: RefCell<Vec<Task>>,
274 }
275 
276 impl Allow {
new() -> Allow277     fn new() -> Allow {
278         Allow {
279             flag: Cell::new(false),
280             tasks: RefCell::new(Vec::new()),
281         }
282     }
283 
check(&self) -> bool284     fn check(&self) -> bool {
285         if self.flag.get() {
286             true
287         } else {
288             self.tasks.borrow_mut().push(task::current());
289             false
290         }
291     }
292 
start(&self)293     fn start(&self) {
294         self.flag.set(true);
295         let mut tasks = self.tasks.borrow_mut();
296         for task in tasks.drain(..) {
297             task.notify();
298         }
299     }
300 }
301 
302 impl<T> Sink for ManualAllow<T> {
303     type SinkItem = T;
304     type SinkError = ();
305 
start_send(&mut self, item: T) -> StartSend<T, ()>306     fn start_send(&mut self, item: T) -> StartSend<T, ()> {
307         if self.allow.check() {
308             self.data.push(item);
309             Ok(AsyncSink::Ready)
310         } else {
311             Ok(AsyncSink::NotReady(item))
312         }
313     }
314 
poll_complete(&mut self) -> Poll<(), ()>315     fn poll_complete(&mut self) -> Poll<(), ()> {
316         Ok(Async::Ready(()))
317     }
318 
close(&mut self) -> Poll<(), ()>319     fn close(&mut self) -> Poll<(), ()> {
320         Ok(().into())
321     }
322 }
323 
manual_allow<T>() -> (ManualAllow<T>, Rc<Allow>)324 fn manual_allow<T>() -> (ManualAllow<T>, Rc<Allow>) {
325     let allow = Rc::new(Allow::new());
326     let manual_allow = ManualAllow {
327         data: Vec::new(),
328         allow: allow.clone(),
329     };
330     (manual_allow, allow)
331 }
332 
#[test]
// test basic buffer functionality, including both filling up to capacity,
// and writing out when the underlying sink is ready
fn buffer() {
    let (sink, allow) = manual_allow::<i32>();
    let sink = sink.buffer(2);

    // The gate is still closed, so the inner sink refuses everything; these
    // two sends nevertheless complete, i.e. the buffer takes the items
    let sink = StartSendFut::new(sink, 0).wait().unwrap();
    let sink = StartSendFut::new(sink, 1).wait().unwrap();

    let flag = Flag::new();
    let mut task = executor::spawn(sink.send(2));
    // A third item cannot complete while the gate is closed, and nothing has
    // notified the task yet
    assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
    assert!(!flag.get());
    // Opening the gate notifies the task recorded by `Allow::check`
    allow.start();
    assert!(flag.get());
    match task.poll_future_notify(&flag, 0).unwrap() {
        Async::Ready(sink) => {
            // All three items reached the inner sink, in send order
            assert_eq!(sink.get_ref().data, vec![0, 1, 2]);
        }
        _ => panic!()
    }
}
356 
#[test]
// `fanout` delivers every item to both underlying sinks
fn fanout_smoke() {
    let fanned = Vec::new().fanout(Vec::new());
    let source = stream::iter_ok(vec![1, 2, 3]);
    let (fanned, _) = fanned.send_all(source).wait().unwrap();
    let (left, right) = fanned.into_inner();
    assert_eq!(left, vec![1, 2, 3]);
    assert_eq!(right, vec![1, 2, 3]);
}
368 
#[test]
// Test that a `fanout` of two `mpsc` senders makes progress on a pending send
// only as the receivers take items out, and that both receivers see every
// item in order
fn fanout_backpressure() {
    let (left_send, left_recv) = mpsc::channel(0);
    let (right_send, right_recv) = mpsc::channel(0);
    let sink = left_send.fanout(right_send);

    // Items 0 and 1 are accepted before anything has been received
    let sink = StartSendFut::new(sink, 0).wait().unwrap();
    let sink = StartSendFut::new(sink, 1).wait().unwrap();

    let flag = Flag::new();
    let mut task = executor::spawn(sink.send(2));
    // No notification can have happened before the first poll
    assert!(!flag.get());
    assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
    // From here on the receivers are drained alternately, one item at a time.
    // NOTE(review): the flag is never cleared in this test, so once it has
    // been set every later `assert!(flag.get())` passes regardless of new
    // notifications
    let (item, left_recv) = left_recv.into_future().wait().unwrap();
    assert_eq!(item, Some(0));
    assert!(flag.get());
    assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
    let (item, right_recv) = right_recv.into_future().wait().unwrap();
    assert_eq!(item, Some(0));
    assert!(flag.get());
    assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
    let (item, left_recv) = left_recv.into_future().wait().unwrap();
    assert_eq!(item, Some(1));
    assert!(flag.get());
    assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
    let (item, right_recv) = right_recv.into_future().wait().unwrap();
    assert_eq!(item, Some(1));
    assert!(flag.get());
    // Item 2 is received on the left without polling the task again in between
    let (item, left_recv) = left_recv.into_future().wait().unwrap();
    assert_eq!(item, Some(2));
    assert!(flag.get());
    assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
    let (item, right_recv) = right_recv.into_future().wait().unwrap();
    assert_eq!(item, Some(2));
    // Once both receivers have taken item 2 the send must complete
    match task.poll_future_notify(&flag, 0).unwrap() {
        Async::Ready(_) => {
        },
        _ => panic!()
    };
    // make sure receivers live until end of test to prevent send errors
    drop(left_recv);
    drop(right_recv);
}
412 
#[test]
// `sink_map_err` leaves successful operations alone and passes the error of a
// failed one through the supplied closure
fn map_err() {
    {
        // `_receiver` stays bound for the whole scope, so the receiving half
        // is alive while these sends happen
        let (sender, _receiver) = mpsc::channel(1);
        let mut mapped = sender.sink_map_err(|_| ());
        assert_eq!(mapped.start_send(()), Ok(AsyncSink::Ready));
        assert_eq!(mapped.poll_complete(), Ok(Async::Ready(())));
    }

    // Here the receiving half is dropped before anything is sent, and the
    // send is expected to fail with the mapped error
    let (orphan, receiver) = mpsc::channel(0);
    drop(receiver);
    let mut mapped = orphan.sink_map_err(|_| ());
    assert_eq!(mapped.start_send(()), Err(()));
}
425 
426 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
427 struct FromErrTest;
428 
429 impl<T> From<mpsc::SendError<T>> for FromErrTest {
from(_: mpsc::SendError<T>) -> FromErrTest430     fn from(_: mpsc::SendError<T>) -> FromErrTest {
431         FromErrTest
432     }
433 }
434 
#[test]
// `sink_from_err` leaves successful operations alone and converts the error
// of a failed one through `From`
fn from_err() {
    {
        // `_receiver` stays bound for the whole scope, so the receiving half
        // is alive while these sends happen
        let (sender, _receiver) = mpsc::channel(1);
        let mut converted: SinkFromErr<mpsc::Sender<()>, FromErrTest> = sender.sink_from_err();
        assert_eq!(converted.start_send(()), Ok(AsyncSink::Ready));
        assert_eq!(converted.poll_complete(), Ok(Async::Ready(())));
    }

    // Here the receiving half is dropped before anything is sent, and the
    // send is expected to fail with the converted error
    let (orphan, receiver) = mpsc::channel(0);
    drop(receiver);
    assert_eq!(orphan.sink_from_err().start_send(()), Err(FromErrTest));
}
447