1 #![allow(bare_trait_objects, unknown_lints)]
2
3 extern crate futures;
4
5 use std::mem;
6 use std::sync::Arc;
7 use std::rc::Rc;
8 use std::cell::{Cell, RefCell};
9 use std::sync::atomic::{Ordering, AtomicBool};
10
11 use futures::prelude::*;
12 use futures::future::ok;
13 use futures::stream;
14 use futures::sync::{oneshot, mpsc};
15 use futures::task::{self, Task};
16 use futures::executor::{self, Notify};
17 use futures::sink::SinkFromErr;
18
19 mod support;
20 use support::*;
21
#[test]
// A plain `Vec` used as a sink: each `start_send` is expected to be accepted
// immediately and to append the item; `assert_done` then checks what `flush`
// resolves to.
fn vec_sink() {
    let mut sink = Vec::new();
    for item in 0..2 {
        assert_eq!(sink.start_send(item), Ok(AsyncSink::Ready));
    }
    assert_eq!(sink, vec![0, 1]);
    assert_done(move || sink.flush(), Ok(vec![0, 1]));
}
30
#[test]
// `send` takes the sink by value and resolves to it again; after each send
// the vector is expected to contain one more item.
fn send() {
    let sink = Vec::new();

    let sink = sink.send(0).wait().unwrap();
    assert_eq!(sink, vec![0]);

    let sink = sink.send(1).wait().unwrap();
    assert_eq!(sink, vec![0, 1]);

    let expected = vec![0, 1, 2];
    assert_done(move || sink.send(2), Ok(expected));
}
44
#[test]
// `send_all` pushes every item of a stream into the sink and resolves to the
// `(sink, stream)` pair; only the sink half is inspected here.
fn send_all() {
    let sink = Vec::new();

    let (sink, _) = sink.send_all(stream::iter_ok(vec![0, 1])).wait().unwrap();
    let after_first = vec![0, 1];
    assert_eq!(sink, after_first);

    let (sink, _) = sink.send_all(stream::iter_ok(vec![2, 3])).wait().unwrap();
    let after_second = vec![0, 1, 2, 3];
    assert_eq!(sink, after_second);

    let after_third = vec![0, 1, 2, 3, 4, 5];
    assert_done(
        move || sink.send_all(stream::iter_ok(vec![4, 5])).map(|(sink, _)| sink),
        Ok(after_third));
}
59
// A boolean flag behind an atomic. This file implements `Notify` for it so
// tests can observe whether a task notification has been delivered, and
// clear the flag again between checks.
struct Flag(pub AtomicBool);

impl Flag {
    // Creates a shared flag that starts out cleared (`false`).
    fn new() -> Arc<Flag> {
        let cleared = AtomicBool::new(false);
        Arc::new(Flag(cleared))
    }

    // Returns the current value of the flag.
    fn get(&self) -> bool {
        let Flag(ref state) = *self;
        state.load(Ordering::SeqCst)
    }

    // Overwrites the flag with `v`.
    fn set(&self, v: bool) {
        let Flag(ref state) = *self;
        state.store(v, Ordering::SeqCst)
    }
}
76
77 impl Notify for Flag {
notify(&self, _id: usize)78 fn notify(&self, _id: usize) {
79 self.set(true)
80 }
81 }
82
83 // Sends a value on an i32 channel sink
84 struct StartSendFut<S: Sink>(Option<S>, Option<S::SinkItem>);
85
86 impl<S: Sink> StartSendFut<S> {
new(sink: S, item: S::SinkItem) -> StartSendFut<S>87 fn new(sink: S, item: S::SinkItem) -> StartSendFut<S> {
88 StartSendFut(Some(sink), Some(item))
89 }
90 }
91
92 impl<S: Sink> Future for StartSendFut<S> {
93 type Item = S;
94 type Error = S::SinkError;
95
poll(&mut self) -> Poll<S, S::SinkError>96 fn poll(&mut self) -> Poll<S, S::SinkError> {
97 match self.0.as_mut().unwrap().start_send(self.1.take().unwrap())? {
98 AsyncSink::Ready => Ok(Async::Ready(self.0.take().unwrap())),
99 AsyncSink::NotReady(item) => {
100 self.1 = Some(item);
101 Ok(Async::NotReady)
102 }
103 }
104
105 }
106 }
107
#[test]
// Checks that `start_send` on a zero-capacity `mpsc` channel is refused while
// the channel is full, and that the sending task gets notified once the
// receiving side has taken an item.
fn mpsc_blocking_start_send() {
    let (mut tx, mut rx) = mpsc::channel::<i32>(0);

    futures::future::lazy(|| {
        // The very first item is accepted right away.
        assert_eq!(tx.start_send(0).unwrap(), AsyncSink::Ready);

        let notified = Flag::new();
        let mut send_task = executor::spawn(StartSendFut::new(tx, 1));

        // The second send must not complete, and nothing has woken the task.
        assert!(send_task.poll_future_notify(&notified, 0).unwrap().is_not_ready());
        assert!(!notified.get());
        // `sassert_next` comes from `support`; presumably it asserts that the
        // next item on the receiver is 0. After it the flag must be raised.
        sassert_next(&mut rx, 0);
        assert!(notified.get());
        notified.set(false);
        // The pending send now completes without raising the flag again.
        assert!(send_task.poll_future_notify(&notified, 0).unwrap().is_ready());
        assert!(!notified.get());
        sassert_next(&mut rx, 1);

        let finished: Result<(), ()> = Ok(());
        finished
    }).wait().unwrap();
}
132
#[test]
// Exercises `flush` through `with`: the mapping future produced for the first
// item stays pending until a oneshot is completed, so the flush is expected
// not to finish before then.
fn with_flush() {
    let (tx, rx) = oneshot::channel();
    // The first call of the closure takes the oneshot receiver out of `gate`;
    // every later call finds an already-completed future there instead.
    let mut gate = Box::new(rx) as Box<Future<Item = _, Error = _>>;
    let mut sink = Vec::new().with(|elem| {
        let current = mem::replace(&mut gate, Box::new(ok(())));
        current.map(move |_| elem + 1).map_err(|_| -> () { panic!() })
    });

    assert_eq!(sink.start_send(0), Ok(AsyncSink::Ready));

    let notified = Flag::new();
    let mut flush_task = executor::spawn(sink.flush());
    assert!(flush_task.poll_future_notify(&notified, 0).unwrap().is_not_ready());
    // Completing the oneshot should wake the flushing task.
    tx.send(()).unwrap();
    assert!(notified.get());

    let sink = match flush_task.poll_future_notify(&notified, 0).unwrap() {
        Async::NotReady => panic!(),
        Async::Ready(flushed) => flushed,
    };

    assert_eq!(sink.send(1).wait().unwrap().get_ref(), &[1, 2]);
}
159
#[test]
// Uses `with` as a plain synchronous map: every item is doubled before it
// reaches the underlying `Vec`.
fn with_as_map() {
    let mut sink = Vec::new().with(|item| -> Result<i32, ()> {
        Ok(item * 2)
    });
    for value in 0..3 {
        sink = sink.send(value).wait().unwrap();
    }
    assert_eq!(sink.get_ref(), &[0, 2, 4]);
}
171
#[test]
// Uses `with_flat_map` to expand every item `n` into a stream of `n` copies
// of itself, so sending 0 contributes nothing.
fn with_flat_map() {
    let mut sink = Vec::new().with_flat_map(|item| {
        stream::iter_ok(vec![item; item])
    });
    for value in 0..4 {
        sink = sink.send(value).wait().unwrap();
    }
    assert_eq!(sink.get_ref(), &[1,2,2,3,3,3]);
}
184
185 // Immediately accepts all requests to start pushing, but completion is managed
186 // by manually flushing
187 struct ManualFlush<T> {
188 data: Vec<T>,
189 waiting_tasks: Vec<Task>,
190 }
191
192 impl<T> Sink for ManualFlush<T> {
193 type SinkItem = Option<T>; // Pass None to flush
194 type SinkError = ();
195
start_send(&mut self, op: Option<T>) -> StartSend<Option<T>, ()>196 fn start_send(&mut self, op: Option<T>) -> StartSend<Option<T>, ()> {
197 if let Some(item) = op {
198 self.data.push(item);
199 } else {
200 self.force_flush();
201 }
202 Ok(AsyncSink::Ready)
203 }
204
poll_complete(&mut self) -> Poll<(), ()>205 fn poll_complete(&mut self) -> Poll<(), ()> {
206 if self.data.is_empty() {
207 Ok(Async::Ready(()))
208 } else {
209 self.waiting_tasks.push(task::current());
210 Ok(Async::NotReady)
211 }
212 }
213
close(&mut self) -> Poll<(), ()>214 fn close(&mut self) -> Poll<(), ()> {
215 Ok(().into())
216 }
217 }
218
219 impl<T> ManualFlush<T> {
new() -> ManualFlush<T>220 fn new() -> ManualFlush<T> {
221 ManualFlush {
222 data: Vec::new(),
223 waiting_tasks: Vec::new()
224 }
225 }
226
force_flush(&mut self) -> Vec<T>227 fn force_flush(&mut self) -> Vec<T> {
228 for task in self.waiting_tasks.drain(..) {
229 task.notify()
230 }
231 mem::replace(&mut self.data, Vec::new())
232 }
233 }
234
#[test]
// Checks that a `with` sink does not force the underlying sink to flush, yet
// does not report itself flushed until the underlying sink is.
fn with_flush_propagate() {
    let mut sink = ManualFlush::new().with(|x| -> Result<Option<i32>, ()> { Ok(x) });
    for value in 0..2 {
        assert_eq!(sink.start_send(Some(value)).unwrap(), AsyncSink::Ready);
    }

    let notified = Flag::new();
    let mut flush_task = executor::spawn(sink.flush());
    // The inner sink still holds both items, so the flush stays pending.
    assert!(flush_task.poll_future_notify(&notified, 0).unwrap().is_not_ready());
    assert!(!notified.get());

    // Reach through spawn -> flush future -> `with` adapter -> `ManualFlush`
    // and empty the buffer by hand.
    let flushed = {
        let flush_future = flush_task.get_mut();
        let with_sink = flush_future.get_mut();
        let manual = with_sink.get_mut();
        manual.force_flush()
    };
    assert_eq!(flushed, vec![0, 1]);

    assert!(notified.get());
    assert!(flush_task.poll_future_notify(&notified, 0).unwrap().is_ready());
}
251
#[test]
// Checks that `buffer` is a no-op around a sink that always accepts sends,
// for buffer capacities 0 and 1.
fn buffer_noop() {
    for &capacity in &[0, 1] {
        let sink = Vec::new().buffer(capacity);
        let sink = sink.send(0).wait().unwrap();
        let sink = sink.send(1).wait().unwrap();
        assert_eq!(sink.get_ref(), &[0, 1]);
    }
}
265
266 struct ManualAllow<T> {
267 data: Vec<T>,
268 allow: Rc<Allow>,
269 }
270
271 struct Allow {
272 flag: Cell<bool>,
273 tasks: RefCell<Vec<Task>>,
274 }
275
276 impl Allow {
new() -> Allow277 fn new() -> Allow {
278 Allow {
279 flag: Cell::new(false),
280 tasks: RefCell::new(Vec::new()),
281 }
282 }
283
check(&self) -> bool284 fn check(&self) -> bool {
285 if self.flag.get() {
286 true
287 } else {
288 self.tasks.borrow_mut().push(task::current());
289 false
290 }
291 }
292
start(&self)293 fn start(&self) {
294 self.flag.set(true);
295 let mut tasks = self.tasks.borrow_mut();
296 for task in tasks.drain(..) {
297 task.notify();
298 }
299 }
300 }
301
302 impl<T> Sink for ManualAllow<T> {
303 type SinkItem = T;
304 type SinkError = ();
305
start_send(&mut self, item: T) -> StartSend<T, ()>306 fn start_send(&mut self, item: T) -> StartSend<T, ()> {
307 if self.allow.check() {
308 self.data.push(item);
309 Ok(AsyncSink::Ready)
310 } else {
311 Ok(AsyncSink::NotReady(item))
312 }
313 }
314
poll_complete(&mut self) -> Poll<(), ()>315 fn poll_complete(&mut self) -> Poll<(), ()> {
316 Ok(Async::Ready(()))
317 }
318
close(&mut self) -> Poll<(), ()>319 fn close(&mut self) -> Poll<(), ()> {
320 Ok(().into())
321 }
322 }
323
manual_allow<T>() -> (ManualAllow<T>, Rc<Allow>)324 fn manual_allow<T>() -> (ManualAllow<T>, Rc<Allow>) {
325 let allow = Rc::new(Allow::new());
326 let manual_allow = ManualAllow {
327 data: Vec::new(),
328 allow: allow.clone(),
329 };
330 (manual_allow, allow)
331 }
332
#[test]
// Basic `buffer` behaviour: fill the buffer to capacity while the inner sink
// refuses items, then check everything is written out once it allows them.
fn buffer() {
    let (inner, allow) = manual_allow::<i32>();
    let mut sink = inner.buffer(2);

    // Two items fit into the buffer although the inner sink is still closed.
    for item in 0..2 {
        sink = StartSendFut::new(sink, item).wait().unwrap();
    }

    let notified = Flag::new();
    let mut send_task = executor::spawn(sink.send(2));
    // The third item cannot go through yet.
    assert!(send_task.poll_future_notify(&notified, 0).unwrap().is_not_ready());
    assert!(!notified.get());
    allow.start();
    assert!(notified.get());
    match send_task.poll_future_notify(&notified, 0).unwrap() {
        Async::NotReady => panic!(),
        Async::Ready(sink) => {
            assert_eq!(sink.get_ref().data, vec![0, 1, 2]);
        }
    }
}
356
#[test]
// Sends a small stream through `fanout` and checks that both underlying
// vectors end up with the same items.
fn fanout_smoke() {
    let left = Vec::new();
    let right = Vec::new();
    let sink = left.fanout(right);
    let (sink, _) = sink.send_all(futures::stream::iter_ok(vec![1,2,3])).wait().unwrap();
    let (left, right) = sink.into_inner();
    let expected = vec![1, 2, 3];
    assert_eq!(left, expected);
    assert_eq!(right, expected);
}
368
#[test]
// Drives a `fanout` over two zero-capacity `mpsc` channels and checks that a
// pending `send` stays not-ready while the receivers are drained one item at
// a time, completing only at the very end.
//
// NOTE(review): the flag is never reset in this test, so once the first
// notification has raised it every later `notified.get()` assertion passes
// trivially.
fn fanout_backpressure() {
    let (left_send, left_recv) = mpsc::channel(0);
    let (right_send, right_recv) = mpsc::channel(0);
    let sink = left_send.fanout(right_send);

    let sink = StartSendFut::new(sink, 0).wait().unwrap();
    let sink = StartSendFut::new(sink, 1).wait().unwrap();

    let notified = Flag::new();
    let mut send_task = executor::spawn(sink.send(2));
    assert!(!notified.get());
    assert!(send_task.poll_future_notify(&notified, 0).unwrap().is_not_ready());

    // Item 0, left then right.
    let (item, left_recv) = left_recv.into_future().wait().unwrap();
    assert_eq!(item, Some(0));
    assert!(notified.get());
    assert!(send_task.poll_future_notify(&notified, 0).unwrap().is_not_ready());
    let (item, right_recv) = right_recv.into_future().wait().unwrap();
    assert_eq!(item, Some(0));
    assert!(notified.get());
    assert!(send_task.poll_future_notify(&notified, 0).unwrap().is_not_ready());

    // Item 1, left then right.
    let (item, left_recv) = left_recv.into_future().wait().unwrap();
    assert_eq!(item, Some(1));
    assert!(notified.get());
    assert!(send_task.poll_future_notify(&notified, 0).unwrap().is_not_ready());
    let (item, right_recv) = right_recv.into_future().wait().unwrap();
    assert_eq!(item, Some(1));
    assert!(notified.get());

    // Item 2, left then right, with one more poll in between.
    let (item, left_recv) = left_recv.into_future().wait().unwrap();
    assert_eq!(item, Some(2));
    assert!(notified.get());
    assert!(send_task.poll_future_notify(&notified, 0).unwrap().is_not_ready());
    let (item, right_recv) = right_recv.into_future().wait().unwrap();
    assert_eq!(item, Some(2));

    // With everything drained the send must finally complete.
    if send_task.poll_future_notify(&notified, 0).unwrap().is_not_ready() {
        panic!()
    }

    // make sure receivers live until end of test to prevent send errors
    drop(left_recv);
    drop(right_recv);
}
412
#[test]
// `sink_map_err` passes successful operations through unchanged and maps the
// error of a failing send (receiver already dropped) with the closure.
fn map_err() {
    {
        let (tx, _rx) = mpsc::channel(1);
        let mut mapped = tx.sink_map_err(|_| ());
        assert_eq!(mapped.start_send(()), Ok(AsyncSink::Ready));
        assert_eq!(mapped.poll_complete(), Ok(Async::Ready(())));
    }

    // Drop the receiver right away so that sending fails.
    let (tx, rx) = mpsc::channel(0);
    drop(rx);
    let mut failing = tx.sink_map_err(|_| ());
    assert_eq!(failing.start_send(()), Err(()));
}
425
426 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
427 struct FromErrTest;
428
429 impl<T> From<mpsc::SendError<T>> for FromErrTest {
from(_: mpsc::SendError<T>) -> FromErrTest430 fn from(_: mpsc::SendError<T>) -> FromErrTest {
431 FromErrTest
432 }
433 }
434
#[test]
// `sink_from_err` passes successful operations through unchanged and
// converts the error of a failing send (receiver already dropped) via `From`.
fn from_err() {
    {
        let (tx, _rx) = mpsc::channel(1);
        let mut converted: SinkFromErr<mpsc::Sender<()>, FromErrTest> = tx.sink_from_err();
        assert_eq!(converted.start_send(()), Ok(AsyncSink::Ready));
        assert_eq!(converted.poll_complete(), Ok(Async::Ready(())));
    }

    // Drop the receiver right away so that sending fails.
    let (tx, rx) = mpsc::channel(0);
    drop(rx);
    let mut failing: SinkFromErr<mpsc::Sender<()>, FromErrTest> = tx.sink_from_err();
    assert_eq!(failing.start_send(()), Err(FromErrTest));
}
447