1 extern crate futures;
2
3 use std::mem;
4 use std::sync::Arc;
5 use std::rc::Rc;
6 use std::cell::{Cell, RefCell};
7 use std::sync::atomic::{Ordering, AtomicBool};
8
9 use futures::prelude::*;
10 use futures::future::ok;
11 use futures::stream;
12 use futures::sync::{oneshot, mpsc};
13 use futures::task::{self, Task};
14 use futures::executor::{self, Notify};
15 use futures::sink::SinkFromErr;
16
17 mod support;
18 use support::*;
19
20 #[test]
vec_sink()21 fn vec_sink() {
22 let mut v = Vec::new();
23 assert_eq!(v.start_send(0), Ok(AsyncSink::Ready));
24 assert_eq!(v.start_send(1), Ok(AsyncSink::Ready));
25 assert_eq!(v, vec![0, 1]);
26 assert_done(move || v.flush(), Ok(vec![0, 1]));
27 }
28
29 #[test]
send()30 fn send() {
31 let v = Vec::new();
32
33 let v = v.send(0).wait().unwrap();
34 assert_eq!(v, vec![0]);
35
36 let v = v.send(1).wait().unwrap();
37 assert_eq!(v, vec![0, 1]);
38
39 assert_done(move || v.send(2),
40 Ok(vec![0, 1, 2]));
41 }
42
43 #[test]
send_all()44 fn send_all() {
45 let v = Vec::new();
46
47 let (v, _) = v.send_all(stream::iter_ok(vec![0, 1])).wait().unwrap();
48 assert_eq!(v, vec![0, 1]);
49
50 let (v, _) = v.send_all(stream::iter_ok(vec![2, 3])).wait().unwrap();
51 assert_eq!(v, vec![0, 1, 2, 3]);
52
53 assert_done(
54 move || v.send_all(stream::iter_ok(vec![4, 5])).map(|(v, _)| v),
55 Ok(vec![0, 1, 2, 3, 4, 5]));
56 }
57
58 // An Unpark struct that records unpark events for inspection
59 struct Flag(pub AtomicBool);
60
61 impl Flag {
new() -> Arc<Flag>62 fn new() -> Arc<Flag> {
63 Arc::new(Flag(AtomicBool::new(false)))
64 }
65
get(&self) -> bool66 fn get(&self) -> bool {
67 self.0.load(Ordering::SeqCst)
68 }
69
set(&self, v: bool)70 fn set(&self, v: bool) {
71 self.0.store(v, Ordering::SeqCst)
72 }
73 }
74
75 impl Notify for Flag {
notify(&self, _id: usize)76 fn notify(&self, _id: usize) {
77 self.set(true)
78 }
79 }
80
81 // Sends a value on an i32 channel sink
82 struct StartSendFut<S: Sink>(Option<S>, Option<S::SinkItem>);
83
84 impl<S: Sink> StartSendFut<S> {
new(sink: S, item: S::SinkItem) -> StartSendFut<S>85 fn new(sink: S, item: S::SinkItem) -> StartSendFut<S> {
86 StartSendFut(Some(sink), Some(item))
87 }
88 }
89
90 impl<S: Sink> Future for StartSendFut<S> {
91 type Item = S;
92 type Error = S::SinkError;
93
poll(&mut self) -> Poll<S, S::SinkError>94 fn poll(&mut self) -> Poll<S, S::SinkError> {
95 match self.0.as_mut().unwrap().start_send(self.1.take().unwrap())? {
96 AsyncSink::Ready => Ok(Async::Ready(self.0.take().unwrap())),
97 AsyncSink::NotReady(item) => {
98 self.1 = Some(item);
99 Ok(Async::NotReady)
100 }
101 }
102
103 }
104 }
105
106 #[test]
107 // Test that `start_send` on an `mpsc` channel does indeed block when the
108 // channel is full
mpsc_blocking_start_send()109 fn mpsc_blocking_start_send() {
110 let (mut tx, mut rx) = mpsc::channel::<i32>(0);
111
112 futures::future::lazy(|| {
113 assert_eq!(tx.start_send(0).unwrap(), AsyncSink::Ready);
114
115 let flag = Flag::new();
116 let mut task = executor::spawn(StartSendFut::new(tx, 1));
117
118 assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
119 assert!(!flag.get());
120 sassert_next(&mut rx, 0);
121 assert!(flag.get());
122 flag.set(false);
123 assert!(task.poll_future_notify(&flag, 0).unwrap().is_ready());
124 assert!(!flag.get());
125 sassert_next(&mut rx, 1);
126
127 Ok::<(), ()>(())
128 }).wait().unwrap();
129 }
130
131 #[test]
132 // test `flush` by using `with` to make the first insertion into a sink block
133 // until a oneshot is completed
with_flush()134 fn with_flush() {
135 let (tx, rx) = oneshot::channel();
136 let mut block = Box::new(rx) as Box<Future<Item = _, Error = _>>;
137 let mut sink = Vec::new().with(|elem| {
138 mem::replace(&mut block, Box::new(ok(())))
139 .map(move |_| elem + 1).map_err(|_| -> () { panic!() })
140 });
141
142 assert_eq!(sink.start_send(0), Ok(AsyncSink::Ready));
143
144 let flag = Flag::new();
145 let mut task = executor::spawn(sink.flush());
146 assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
147 tx.send(()).unwrap();
148 assert!(flag.get());
149
150 let sink = match task.poll_future_notify(&flag, 0).unwrap() {
151 Async::Ready(sink) => sink,
152 _ => panic!()
153 };
154
155 assert_eq!(sink.send(1).wait().unwrap().get_ref(), &[1, 2]);
156 }
157
158 #[test]
159 // test simple use of with to change data
with_as_map()160 fn with_as_map() {
161 let sink = Vec::new().with(|item| -> Result<i32, ()> {
162 Ok(item * 2)
163 });
164 let sink = sink.send(0).wait().unwrap();
165 let sink = sink.send(1).wait().unwrap();
166 let sink = sink.send(2).wait().unwrap();
167 assert_eq!(sink.get_ref(), &[0, 2, 4]);
168 }
169
170 #[test]
171 // test simple use of with_flat_map
with_flat_map()172 fn with_flat_map() {
173 let sink = Vec::new().with_flat_map(|item| {
174 stream::iter_ok(vec![item; item])
175 });
176 let sink = sink.send(0).wait().unwrap();
177 let sink = sink.send(1).wait().unwrap();
178 let sink = sink.send(2).wait().unwrap();
179 let sink = sink.send(3).wait().unwrap();
180 assert_eq!(sink.get_ref(), &[1,2,2,3,3,3]);
181 }
182
183 // Immediately accepts all requests to start pushing, but completion is managed
184 // by manually flushing
185 struct ManualFlush<T> {
186 data: Vec<T>,
187 waiting_tasks: Vec<Task>,
188 }
189
190 impl<T> Sink for ManualFlush<T> {
191 type SinkItem = Option<T>; // Pass None to flush
192 type SinkError = ();
193
start_send(&mut self, op: Option<T>) -> StartSend<Option<T>, ()>194 fn start_send(&mut self, op: Option<T>) -> StartSend<Option<T>, ()> {
195 if let Some(item) = op {
196 self.data.push(item);
197 } else {
198 self.force_flush();
199 }
200 Ok(AsyncSink::Ready)
201 }
202
poll_complete(&mut self) -> Poll<(), ()>203 fn poll_complete(&mut self) -> Poll<(), ()> {
204 if self.data.is_empty() {
205 Ok(Async::Ready(()))
206 } else {
207 self.waiting_tasks.push(task::current());
208 Ok(Async::NotReady)
209 }
210 }
211
close(&mut self) -> Poll<(), ()>212 fn close(&mut self) -> Poll<(), ()> {
213 Ok(().into())
214 }
215 }
216
217 impl<T> ManualFlush<T> {
new() -> ManualFlush<T>218 fn new() -> ManualFlush<T> {
219 ManualFlush {
220 data: Vec::new(),
221 waiting_tasks: Vec::new()
222 }
223 }
224
force_flush(&mut self) -> Vec<T>225 fn force_flush(&mut self) -> Vec<T> {
226 for task in self.waiting_tasks.drain(..) {
227 task.notify()
228 }
229 mem::replace(&mut self.data, Vec::new())
230 }
231 }
232
233 #[test]
234 // test that the `with` sink doesn't require the underlying sink to flush,
235 // but doesn't claim to be flushed until the underlying sink is
with_flush_propagate()236 fn with_flush_propagate() {
237 let mut sink = ManualFlush::new().with(|x| -> Result<Option<i32>, ()> { Ok(x) });
238 assert_eq!(sink.start_send(Some(0)).unwrap(), AsyncSink::Ready);
239 assert_eq!(sink.start_send(Some(1)).unwrap(), AsyncSink::Ready);
240
241 let flag = Flag::new();
242 let mut task = executor::spawn(sink.flush());
243 assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
244 assert!(!flag.get());
245 assert_eq!(task.get_mut().get_mut().get_mut().force_flush(), vec![0, 1]);
246 assert!(flag.get());
247 assert!(task.poll_future_notify(&flag, 0).unwrap().is_ready());
248 }
249
250 #[test]
251 // test that a buffer is a no-nop around a sink that always accepts sends
buffer_noop()252 fn buffer_noop() {
253 let sink = Vec::new().buffer(0);
254 let sink = sink.send(0).wait().unwrap();
255 let sink = sink.send(1).wait().unwrap();
256 assert_eq!(sink.get_ref(), &[0, 1]);
257
258 let sink = Vec::new().buffer(1);
259 let sink = sink.send(0).wait().unwrap();
260 let sink = sink.send(1).wait().unwrap();
261 assert_eq!(sink.get_ref(), &[0, 1]);
262 }
263
264 struct ManualAllow<T> {
265 data: Vec<T>,
266 allow: Rc<Allow>,
267 }
268
269 struct Allow {
270 flag: Cell<bool>,
271 tasks: RefCell<Vec<Task>>,
272 }
273
274 impl Allow {
new() -> Allow275 fn new() -> Allow {
276 Allow {
277 flag: Cell::new(false),
278 tasks: RefCell::new(Vec::new()),
279 }
280 }
281
check(&self) -> bool282 fn check(&self) -> bool {
283 if self.flag.get() {
284 true
285 } else {
286 self.tasks.borrow_mut().push(task::current());
287 false
288 }
289 }
290
start(&self)291 fn start(&self) {
292 self.flag.set(true);
293 let mut tasks = self.tasks.borrow_mut();
294 for task in tasks.drain(..) {
295 task.notify();
296 }
297 }
298 }
299
300 impl<T> Sink for ManualAllow<T> {
301 type SinkItem = T;
302 type SinkError = ();
303
start_send(&mut self, item: T) -> StartSend<T, ()>304 fn start_send(&mut self, item: T) -> StartSend<T, ()> {
305 if self.allow.check() {
306 self.data.push(item);
307 Ok(AsyncSink::Ready)
308 } else {
309 Ok(AsyncSink::NotReady(item))
310 }
311 }
312
poll_complete(&mut self) -> Poll<(), ()>313 fn poll_complete(&mut self) -> Poll<(), ()> {
314 Ok(Async::Ready(()))
315 }
316
close(&mut self) -> Poll<(), ()>317 fn close(&mut self) -> Poll<(), ()> {
318 Ok(().into())
319 }
320 }
321
manual_allow<T>() -> (ManualAllow<T>, Rc<Allow>)322 fn manual_allow<T>() -> (ManualAllow<T>, Rc<Allow>) {
323 let allow = Rc::new(Allow::new());
324 let manual_allow = ManualAllow {
325 data: Vec::new(),
326 allow: allow.clone(),
327 };
328 (manual_allow, allow)
329 }
330
331 #[test]
332 // test basic buffer functionality, including both filling up to capacity,
333 // and writing out when the underlying sink is ready
buffer()334 fn buffer() {
335 let (sink, allow) = manual_allow::<i32>();
336 let sink = sink.buffer(2);
337
338 let sink = StartSendFut::new(sink, 0).wait().unwrap();
339 let sink = StartSendFut::new(sink, 1).wait().unwrap();
340
341 let flag = Flag::new();
342 let mut task = executor::spawn(sink.send(2));
343 assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
344 assert!(!flag.get());
345 allow.start();
346 assert!(flag.get());
347 match task.poll_future_notify(&flag, 0).unwrap() {
348 Async::Ready(sink) => {
349 assert_eq!(sink.get_ref().data, vec![0, 1, 2]);
350 }
351 _ => panic!()
352 }
353 }
354
355 #[test]
fanout_smoke()356 fn fanout_smoke() {
357 let sink1 = Vec::new();
358 let sink2 = Vec::new();
359 let sink = sink1.fanout(sink2);
360 let stream = futures::stream::iter_ok(vec![1,2,3]);
361 let (sink, _) = sink.send_all(stream).wait().unwrap();
362 let (sink1, sink2) = sink.into_inner();
363 assert_eq!(sink1, vec![1,2,3]);
364 assert_eq!(sink2, vec![1,2,3]);
365 }
366
367 #[test]
fanout_backpressure()368 fn fanout_backpressure() {
369 let (left_send, left_recv) = mpsc::channel(0);
370 let (right_send, right_recv) = mpsc::channel(0);
371 let sink = left_send.fanout(right_send);
372
373 let sink = StartSendFut::new(sink, 0).wait().unwrap();
374 let sink = StartSendFut::new(sink, 1).wait().unwrap();
375
376 let flag = Flag::new();
377 let mut task = executor::spawn(sink.send(2));
378 assert!(!flag.get());
379 assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
380 let (item, left_recv) = left_recv.into_future().wait().unwrap();
381 assert_eq!(item, Some(0));
382 assert!(flag.get());
383 assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
384 let (item, right_recv) = right_recv.into_future().wait().unwrap();
385 assert_eq!(item, Some(0));
386 assert!(flag.get());
387 assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
388 let (item, left_recv) = left_recv.into_future().wait().unwrap();
389 assert_eq!(item, Some(1));
390 assert!(flag.get());
391 assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
392 let (item, right_recv) = right_recv.into_future().wait().unwrap();
393 assert_eq!(item, Some(1));
394 assert!(flag.get());
395 let (item, left_recv) = left_recv.into_future().wait().unwrap();
396 assert_eq!(item, Some(2));
397 assert!(flag.get());
398 assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready());
399 let (item, right_recv) = right_recv.into_future().wait().unwrap();
400 assert_eq!(item, Some(2));
401 match task.poll_future_notify(&flag, 0).unwrap() {
402 Async::Ready(_) => {
403 },
404 _ => panic!()
405 };
406 // make sure receivers live until end of test to prevent send errors
407 drop(left_recv);
408 drop(right_recv);
409 }
410
411 #[test]
map_err()412 fn map_err() {
413 {
414 let (tx, _rx) = mpsc::channel(1);
415 let mut tx = tx.sink_map_err(|_| ());
416 assert_eq!(tx.start_send(()), Ok(AsyncSink::Ready));
417 assert_eq!(tx.poll_complete(), Ok(Async::Ready(())));
418 }
419
420 let tx = mpsc::channel(0).0;
421 assert_eq!(tx.sink_map_err(|_| ()).start_send(()), Err(()));
422 }
423
424 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
425 struct FromErrTest;
426
427 impl<T> From<mpsc::SendError<T>> for FromErrTest {
from(_: mpsc::SendError<T>) -> FromErrTest428 fn from(_: mpsc::SendError<T>) -> FromErrTest {
429 FromErrTest
430 }
431 }
432
433 #[test]
from_err()434 fn from_err() {
435 {
436 let (tx, _rx) = mpsc::channel(1);
437 let mut tx: SinkFromErr<mpsc::Sender<()>, FromErrTest> = tx.sink_from_err();
438 assert_eq!(tx.start_send(()), Ok(AsyncSink::Ready));
439 assert_eq!(tx.poll_complete(), Ok(Async::Ready(())));
440 }
441
442 let tx = mpsc::channel(0).0;
443 assert_eq!(tx.sink_from_err().start_send(()), Err(FromErrTest));
444 }
445