1 extern crate futures;
2 
3 use std::mem;
4 use std::sync::Arc;
5 use std::rc::Rc;
6 use std::cell::{Cell, RefCell};
7 use std::sync::atomic::{Ordering, AtomicBool};
8 
9 use futures::prelude::*;
10 use futures::future::ok;
11 use futures::stream;
12 use futures::sync::{oneshot, mpsc};
13 use futures::task::{self, Task};
14 use futures::executor::{self, Notify};
15 use futures::sink::SinkFromErr;
16 
17 mod support;
18 use support::*;
19 
#[test]
// A `Vec` used directly as a sink: each `start_send` is asserted to report
// `Ready`, and the items end up in the vector in order.
fn vec_sink() {
    let mut sink = Vec::new();
    for item in 0..2 {
        assert_eq!(sink.start_send(item), Ok(AsyncSink::Ready));
    }
    assert_eq!(sink, vec![0, 1]);

    // Flushing hands the vector back unchanged.
    assert_done(move || sink.flush(), Ok(vec![0, 1]));
}
28 
#[test]
// The future returned by `send` yields the sink back with the item appended.
fn send() {
    let empty = Vec::new();

    let one = empty.send(0).wait().unwrap();
    assert_eq!(one, vec![0]);

    let two = one.send(1).wait().unwrap();
    assert_eq!(two, vec![0, 1]);

    assert_done(move || two.send(2), Ok(vec![0, 1, 2]));
}
42 
#[test]
// `send_all` drains a whole stream into the sink and yields the sink back
// (paired with a second value that these checks ignore).
fn send_all() {
    let sink = Vec::new();

    let first_batch = stream::iter_ok(vec![0, 1]);
    let (sink, _) = sink.send_all(first_batch).wait().unwrap();
    assert_eq!(sink, vec![0, 1]);

    let second_batch = stream::iter_ok(vec![2, 3]);
    let (sink, _) = sink.send_all(second_batch).wait().unwrap();
    assert_eq!(sink, vec![0, 1, 2, 3]);

    assert_done(
        move || {
            sink.send_all(stream::iter_ok(vec![4, 5]))
                .map(|(sink, _)| sink)
        },
        Ok(vec![0, 1, 2, 3, 4, 5]),
    );
}
57 
// A shared boolean that tests hand to the executor as a `Notify` handle; it
// records whether a notification has been delivered and can be reset by hand.
struct Flag(pub AtomicBool);

impl Flag {
    // Builds a reference-counted flag that starts out cleared.
    fn new() -> Arc<Flag> {
        let cleared = AtomicBool::new(false);
        Arc::new(Flag(cleared))
    }

    // Reads the current state of the flag.
    fn get(&self) -> bool {
        let Flag(ref state) = *self;
        state.load(Ordering::SeqCst)
    }

    // Overwrites the flag with `v`.
    fn set(&self, v: bool) {
        let Flag(ref state) = *self;
        state.store(v, Ordering::SeqCst);
    }
}
74 
75 impl Notify for Flag {
notify(&self, _id: usize)76     fn notify(&self, _id: usize) {
77         self.set(true)
78     }
79 }
80 
81 // Sends a value on an i32 channel sink
82 struct StartSendFut<S: Sink>(Option<S>, Option<S::SinkItem>);
83 
84 impl<S: Sink> StartSendFut<S> {
new(sink: S, item: S::SinkItem) -> StartSendFut<S>85     fn new(sink: S, item: S::SinkItem) -> StartSendFut<S> {
86         StartSendFut(Some(sink), Some(item))
87     }
88 }
89 
90 impl<S: Sink> Future for StartSendFut<S> {
91     type Item = S;
92     type Error = S::SinkError;
93 
poll(&mut self) -> Poll<S, S::SinkError>94     fn poll(&mut self) -> Poll<S, S::SinkError> {
95         match self.0.as_mut().unwrap().start_send(self.1.take().unwrap())? {
96             AsyncSink::Ready => Ok(Async::Ready(self.0.take().unwrap())),
97             AsyncSink::NotReady(item) => {
98                 self.1 = Some(item);
99                 Ok(Async::NotReady)
100             }
101         }
102 
103     }
104 }
105 
#[test]
// Verify that `start_send` on an `mpsc` channel really does push back
// (reports not-ready) while the channel is full, and that the waiting task is
// notified once the receiver takes an item.
fn mpsc_blocking_start_send() {
    let (mut sender, mut receiver) = mpsc::channel::<i32>(0);

    let scenario = futures::future::lazy(|| {
        // The first item is accepted straight away.
        assert_eq!(sender.start_send(0).unwrap(), AsyncSink::Ready);

        let notified = Flag::new();
        let mut pending = executor::spawn(StartSendFut::new(sender, 1));

        // The second item is refused, and nothing has woken the task yet.
        assert!(pending.poll_future_notify(&notified, 0).unwrap().is_not_ready());
        assert!(!notified.get());

        // `sassert_next` comes from `support`; presumably it asserts that the
        // next item on the receiver is the given value. Taking item 0 must
        // notify the blocked sender.
        sassert_next(&mut receiver, 0);
        assert!(notified.get());
        notified.set(false);

        // Retrying now succeeds, and no further notification is recorded.
        assert!(pending.poll_future_notify(&notified, 0).unwrap().is_ready());
        assert!(!notified.get());
        sassert_next(&mut receiver, 1);

        Ok::<(), ()>(())
    });

    scenario.wait().unwrap();
}
130 
#[test]
// Exercise `flush` through `with`: the mapping future for the first item is a
// oneshot receiver, so the flush cannot finish until the oneshot fires.
fn with_flush() {
    let (release, gate) = oneshot::channel();

    // The first call to the closure gets the oneshot receiver; every later
    // call gets an already-completed future.
    let mut block = Box::new(gate) as Box<Future<Item = _, Error = _>>;
    let mut sink = Vec::new().with(|elem| {
        let current = mem::replace(&mut block, Box::new(ok(())));
        current
            .map(move |_| elem + 1)
            .map_err(|_| -> () { panic!() })
    });

    assert_eq!(sink.start_send(0), Ok(AsyncSink::Ready));

    let notified = Flag::new();
    let mut flushing = executor::spawn(sink.flush());

    // Still gated on the oneshot.
    assert!(flushing.poll_future_notify(&notified, 0).unwrap().is_not_ready());

    // Completing the oneshot must wake the flushing task.
    release.send(()).unwrap();
    assert!(notified.get());

    let sink = match flushing.poll_future_notify(&notified, 0).unwrap() {
        Async::Ready(flushed) => flushed,
        _ => panic!()
    };

    // Both items were incremented by the mapping closure.
    let sink = sink.send(1).wait().unwrap();
    assert_eq!(sink.get_ref(), &[1, 2]);
}
157 
#[test]
// `with` used as a plain synchronous map: every item is doubled before it
// reaches the underlying vector.
fn with_as_map() {
    let mut sink = Vec::new().with(|item| -> Result<i32, ()> {
        Ok(item * 2)
    });

    for item in 0..3 {
        sink = sink.send(item).wait().unwrap();
    }

    assert_eq!(sink.get_ref(), &[0, 2, 4]);
}
169 
#[test]
// `with_flat_map` expands each item `n` into a stream of `n` copies of `n`,
// so sending 0 contributes nothing.
fn with_flat_map() {
    let mut sink = Vec::new().with_flat_map(|item| {
        stream::iter_ok(vec![item; item])
    });

    for item in 0..4 {
        sink = sink.send(item).wait().unwrap();
    }

    assert_eq!(sink.get_ref(), &[1, 2, 2, 3, 3, 3]);
}
182 
183 // Immediately accepts all requests to start pushing, but completion is managed
184 // by manually flushing
185 struct ManualFlush<T> {
186     data: Vec<T>,
187     waiting_tasks: Vec<Task>,
188 }
189 
190 impl<T> Sink for ManualFlush<T> {
191     type SinkItem = Option<T>; // Pass None to flush
192     type SinkError = ();
193 
start_send(&mut self, op: Option<T>) -> StartSend<Option<T>, ()>194     fn start_send(&mut self, op: Option<T>) -> StartSend<Option<T>, ()> {
195         if let Some(item) = op {
196             self.data.push(item);
197         } else {
198             self.force_flush();
199         }
200         Ok(AsyncSink::Ready)
201     }
202 
poll_complete(&mut self) -> Poll<(), ()>203     fn poll_complete(&mut self) -> Poll<(), ()> {
204         if self.data.is_empty() {
205             Ok(Async::Ready(()))
206         } else {
207             self.waiting_tasks.push(task::current());
208             Ok(Async::NotReady)
209         }
210     }
211 
close(&mut self) -> Poll<(), ()>212     fn close(&mut self) -> Poll<(), ()> {
213         Ok(().into())
214     }
215 }
216 
217 impl<T> ManualFlush<T> {
new() -> ManualFlush<T>218     fn new() -> ManualFlush<T> {
219         ManualFlush {
220             data: Vec::new(),
221             waiting_tasks: Vec::new()
222         }
223     }
224 
force_flush(&mut self) -> Vec<T>225     fn force_flush(&mut self) -> Vec<T> {
226         for task in self.waiting_tasks.drain(..) {
227             task.notify()
228         }
229         mem::replace(&mut self.data, Vec::new())
230     }
231 }
232 
#[test]
// A `with` wrapper must not need the inner sink to be flushed in order to
// accept items, yet it must not report itself flushed before the inner sink is.
fn with_flush_propagate() {
    let mut sink = ManualFlush::new().with(|x| -> Result<Option<i32>, ()> { Ok(x) });
    for item in 0..2 {
        assert_eq!(sink.start_send(Some(item)).unwrap(), AsyncSink::Ready);
    }

    let notified = Flag::new();
    let mut flushing = executor::spawn(sink.flush());

    // The inner sink still holds both items, so the flush is pending.
    assert!(flushing.poll_future_notify(&notified, 0).unwrap().is_not_ready());
    assert!(!notified.get());

    // Reach through the wrappers to the `ManualFlush` and flush it by hand;
    // that must wake the pending flush.
    let flushed = flushing.get_mut().get_mut().get_mut().force_flush();
    assert_eq!(flushed, vec![0, 1]);
    assert!(notified.get());

    assert!(flushing.poll_future_notify(&notified, 0).unwrap().is_ready());
}
249 
#[test]
// A buffer is a no-op around a sink that always accepts sends, whether its
// capacity is 0 or 1.
fn buffer_noop() {
    for &capacity in &[0, 1] {
        let sink = Vec::new().buffer(capacity);
        let sink = sink.send(0).wait().unwrap();
        let sink = sink.send(1).wait().unwrap();
        assert_eq!(sink.get_ref(), &[0, 1]);
    }
}
263 
264 struct ManualAllow<T> {
265     data: Vec<T>,
266     allow: Rc<Allow>,
267 }
268 
269 struct Allow {
270     flag: Cell<bool>,
271     tasks: RefCell<Vec<Task>>,
272 }
273 
274 impl Allow {
new() -> Allow275     fn new() -> Allow {
276         Allow {
277             flag: Cell::new(false),
278             tasks: RefCell::new(Vec::new()),
279         }
280     }
281 
check(&self) -> bool282     fn check(&self) -> bool {
283         if self.flag.get() {
284             true
285         } else {
286             self.tasks.borrow_mut().push(task::current());
287             false
288         }
289     }
290 
start(&self)291     fn start(&self) {
292         self.flag.set(true);
293         let mut tasks = self.tasks.borrow_mut();
294         for task in tasks.drain(..) {
295             task.notify();
296         }
297     }
298 }
299 
300 impl<T> Sink for ManualAllow<T> {
301     type SinkItem = T;
302     type SinkError = ();
303 
start_send(&mut self, item: T) -> StartSend<T, ()>304     fn start_send(&mut self, item: T) -> StartSend<T, ()> {
305         if self.allow.check() {
306             self.data.push(item);
307             Ok(AsyncSink::Ready)
308         } else {
309             Ok(AsyncSink::NotReady(item))
310         }
311     }
312 
poll_complete(&mut self) -> Poll<(), ()>313     fn poll_complete(&mut self) -> Poll<(), ()> {
314         Ok(Async::Ready(()))
315     }
316 
close(&mut self) -> Poll<(), ()>317     fn close(&mut self) -> Poll<(), ()> {
318         Ok(().into())
319     }
320 }
321 
manual_allow<T>() -> (ManualAllow<T>, Rc<Allow>)322 fn manual_allow<T>() -> (ManualAllow<T>, Rc<Allow>) {
323     let allow = Rc::new(Allow::new());
324     let manual_allow = ManualAllow {
325         data: Vec::new(),
326         allow: allow.clone(),
327     };
328     (manual_allow, allow)
329 }
330 
#[test]
// Basic buffer behaviour: it fills up to capacity while the inner sink
// refuses items, and writes everything out once the inner sink is ready.
fn buffer() {
    let (inner, gate) = manual_allow::<i32>();
    let buffered = inner.buffer(2);

    // Two items go in although the gate is still closed.
    let buffered = StartSendFut::new(buffered, 0).wait().unwrap();
    let buffered = StartSendFut::new(buffered, 1).wait().unwrap();

    let notified = Flag::new();
    let mut third = executor::spawn(buffered.send(2));

    // The third send cannot complete, and nothing has woken it yet.
    assert!(third.poll_future_notify(&notified, 0).unwrap().is_not_ready());
    assert!(!notified.get());

    // Opening the gate must notify the pending send.
    gate.start();
    assert!(notified.get());

    if let Async::Ready(done) = third.poll_future_notify(&notified, 0).unwrap() {
        assert_eq!(done.get_ref().data, vec![0, 1, 2]);
    } else {
        panic!()
    }
}
354 
#[test]
// Every item sent through a fanout must arrive in both underlying sinks.
fn fanout_smoke() {
    let fanout = Vec::new().fanout(Vec::new());
    let source = futures::stream::iter_ok(vec![1, 2, 3]);

    let (fanout, _) = fanout.send_all(source).wait().unwrap();

    let (left, right) = fanout.into_inner();
    assert_eq!(left, vec![1, 2, 3]);
    assert_eq!(right, vec![1, 2, 3]);
}
366 
#[test]
// Drives a fanout over two zero-capacity channels and checks that a pending
// `send` stays not-ready while the receivers are drained, one item at a time.
//
// NOTE(review): the flag is never reset in this test, so every
// `assert!(woken.get())` after the first one cannot fail on its own.
fn fanout_backpressure() {
    let (left_tx, left_rx) = mpsc::channel(0);
    let (right_tx, right_rx) = mpsc::channel(0);
    let fanout = left_tx.fanout(right_tx);

    let fanout = StartSendFut::new(fanout, 0).wait().unwrap();
    let fanout = StartSendFut::new(fanout, 1).wait().unwrap();

    let woken = Flag::new();
    let mut pending = executor::spawn(fanout.send(2));
    assert!(!woken.get());
    assert!(pending.poll_future_notify(&woken, 0).unwrap().is_not_ready());

    // Item 0, left then right.
    let (got, left_rx) = left_rx.into_future().wait().unwrap();
    assert_eq!(got, Some(0));
    assert!(woken.get());
    assert!(pending.poll_future_notify(&woken, 0).unwrap().is_not_ready());
    let (got, right_rx) = right_rx.into_future().wait().unwrap();
    assert_eq!(got, Some(0));
    assert!(woken.get());
    assert!(pending.poll_future_notify(&woken, 0).unwrap().is_not_ready());

    // Item 1, left then right.
    let (got, left_rx) = left_rx.into_future().wait().unwrap();
    assert_eq!(got, Some(1));
    assert!(woken.get());
    assert!(pending.poll_future_notify(&woken, 0).unwrap().is_not_ready());
    let (got, right_rx) = right_rx.into_future().wait().unwrap();
    assert_eq!(got, Some(1));
    assert!(woken.get());

    // Item 2, left then right; the send only completes after both sides
    // have taken it.
    let (got, left_rx) = left_rx.into_future().wait().unwrap();
    assert_eq!(got, Some(2));
    assert!(woken.get());
    assert!(pending.poll_future_notify(&woken, 0).unwrap().is_not_ready());
    let (got, right_rx) = right_rx.into_future().wait().unwrap();
    assert_eq!(got, Some(2));
    match pending.poll_future_notify(&woken, 0).unwrap() {
        Async::Ready(_) => {}
        _ => panic!()
    };

    // make sure receivers live until end of test to prevent send errors
    drop(left_rx);
    drop(right_rx);
}
410 
#[test]
// `sink_map_err` leaves successful sends untouched and converts a failed send
// with the supplied closure.
fn map_err() {
    {
        // Receiver kept alive: the send and the flush both succeed.
        let (sender, _receiver) = mpsc::channel(1);
        let mut mapped = sender.sink_map_err(|_| ());
        assert_eq!(mapped.start_send(()), Ok(AsyncSink::Ready));
        assert_eq!(mapped.poll_complete(), Ok(Async::Ready(())));
    }

    // Receiver dropped before sending: the send fails and the error is
    // mapped to `()`.
    let (sender, receiver) = mpsc::channel(0);
    drop(receiver);
    let mut mapped = sender.sink_map_err(|_| ());
    assert_eq!(mapped.start_send(()), Err(()));
}
423 
424 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
425 struct FromErrTest;
426 
427 impl<T> From<mpsc::SendError<T>> for FromErrTest {
from(_: mpsc::SendError<T>) -> FromErrTest428     fn from(_: mpsc::SendError<T>) -> FromErrTest {
429         FromErrTest
430     }
431 }
432 
#[test]
// `sink_from_err` leaves successful sends untouched and converts a failed send
// through the `From` impl of the target error type.
fn from_err() {
    {
        // Receiver kept alive: the send and the flush both succeed.
        let (sender, _receiver) = mpsc::channel(1);
        let mut converted: SinkFromErr<mpsc::Sender<()>, FromErrTest> =
            sender.sink_from_err();
        assert_eq!(converted.start_send(()), Ok(AsyncSink::Ready));
        assert_eq!(converted.poll_complete(), Ok(Async::Ready(())));
    }

    // Receiver dropped before sending: the send fails and the error arrives
    // as `FromErrTest`.
    let (sender, receiver) = mpsc::channel(0);
    drop(receiver);
    assert_eq!(sender.sink_from_err().start_send(()), Err(FromErrTest));
}
445