1 #![doc(html_root_url = "https://docs.rs/want/0.3.0")]
2 #![deny(warnings)]
3 #![deny(missing_docs)]
4 #![deny(missing_debug_implementations)]
5
6 //! A Futures channel-like utility to signal when a value is wanted.
7 //!
8 //! Futures are supposed to be lazy, and only starting work if `Future::poll`
9 //! is called. The same is true of `Stream`s, but when using a channel as
10 //! a `Stream`, it can be hard to know if the receiver is ready for the next
11 //! value.
12 //!
13 //! Put another way, given a `(tx, rx)` from `futures::sync::mpsc::channel()`,
14 //! how can the sender (`tx`) know when the receiver (`rx`) actually wants more
15 //! work to be produced? Just because there is room in the channel buffer
16 //! doesn't mean the work would be used by the receiver.
17 //!
18 //! This is where something like `want` comes in. Added to a channel, you can
19 //! make sure that the `tx` only creates the message and sends it when the `rx`
20 //! has `poll()` for it, and the buffer was empty.
21 //!
22 //! # Example
23 //!
24 //! ```nightly
25 //! # //#![feature(async_await)]
26 //! extern crate want;
27 //!
28 //! # fn spawn<T>(_t: T) {}
29 //! # fn we_still_want_message() -> bool { true }
30 //! # fn mpsc_channel() -> (Tx, Rx) { (Tx, Rx) }
31 //! # struct Tx;
32 //! # impl Tx { fn send<T>(&mut self, _: T) {} }
33 //! # struct Rx;
34 //! # impl Rx { async fn recv(&mut self) -> Option<Expensive> { Some(Expensive) } }
35 //!
36 //! // Some message that is expensive to produce.
37 //! struct Expensive;
38 //!
39 //! // Some futures-aware MPSC channel...
40 //! let (mut tx, mut rx) = mpsc_channel();
41 //!
42 //! // And our `want` channel!
43 //! let (mut gv, mut tk) = want::new();
44 //!
45 //!
46 //! // Our receiving task...
47 //! spawn(async move {
48 //! // Maybe something comes up that prevents us from ever
49 //! // using the expensive message.
50 //! //
51 //! // Without `want`, the "send" task may have started to
52 //! // produce the expensive message even though we wouldn't
53 //! // be able to use it.
54 //! if !we_still_want_message() {
55 //! return;
56 //! }
57 //!
58 //! // But we can use it! So tell the `want` channel.
59 //! tk.want();
60 //!
61 //! match rx.recv().await {
62 //! Some(_msg) => println!("got a message"),
63 //! None => println!("DONE"),
64 //! }
65 //! });
66 //!
67 //! // Our sending task
68 //! spawn(async move {
69 //! // It's expensive to create a new message, so we wait until the
70 //! // receiving end truly *wants* the message.
71 //! if let Err(_closed) = gv.want().await {
72 //! // Looks like they will never want it...
73 //! return;
74 //! }
75 //!
76 //! // They want it, let's go!
77 //! tx.send(Expensive);
78 //! });
79 //!
80 //! # fn main() {}
81 //! ```
82
83 #[macro_use]
84 extern crate log;
85
86 use std::fmt;
87 use std::future::Future;
88 use std::mem;
89 use std::pin::Pin;
90 use std::sync::Arc;
91 use std::sync::atomic::AtomicUsize;
92 // SeqCst is the only ordering used to ensure accessing the state and
93 // TryLock are never re-ordered.
94 use std::sync::atomic::Ordering::SeqCst;
95 use std::task::{self, Poll, Waker};
96
97
98 use try_lock::TryLock;
99
100 /// Create a new `want` channel.
new() -> (Giver, Taker)101 pub fn new() -> (Giver, Taker) {
102 let inner = Arc::new(Inner {
103 state: AtomicUsize::new(State::Idle.into()),
104 task: TryLock::new(None),
105 });
106 let inner2 = inner.clone();
107 (
108 Giver {
109 inner: inner,
110 },
111 Taker {
112 inner: inner2,
113 },
114 )
115 }
116
117 /// An entity that gives a value when wanted.
118 pub struct Giver {
119 inner: Arc<Inner>,
120 }
121
122 /// An entity that wants a value.
123 pub struct Taker {
124 inner: Arc<Inner>,
125 }
126
127 /// A cloneable `Giver`.
128 ///
129 /// It differs from `Giver` in that you cannot poll for `want`. It's only
130 /// usable as a cancellation watcher.
131 #[derive(Clone)]
132 pub struct SharedGiver {
133 inner: Arc<Inner>,
134 }
135
136 /// The `Taker` has canceled its interest in a value.
137 pub struct Closed {
138 _inner: (),
139 }
140
141 #[derive(Clone, Copy, Debug)]
142 enum State {
143 Idle,
144 Want,
145 Give,
146 Closed,
147 }
148
149 impl From<State> for usize {
from(s: State) -> usize150 fn from(s: State) -> usize {
151 match s {
152 State::Idle => 0,
153 State::Want => 1,
154 State::Give => 2,
155 State::Closed => 3,
156 }
157 }
158 }
159
160 impl From<usize> for State {
from(num: usize) -> State161 fn from(num: usize) -> State {
162 match num {
163 0 => State::Idle,
164 1 => State::Want,
165 2 => State::Give,
166 3 => State::Closed,
167 _ => unreachable!("unknown state: {}", num),
168 }
169 }
170 }
171
172 struct Inner {
173 state: AtomicUsize,
174 task: TryLock<Option<Waker>>,
175 }
176
177 // ===== impl Giver ======
178
179 impl Giver {
180 /// Returns a `Future` that fulfills when the `Taker` has done some action.
want<'a>(&'a mut self) -> impl Future<Output = Result<(), Closed>> + 'a181 pub fn want<'a>(&'a mut self) -> impl Future<Output = Result<(), Closed>> + 'a {
182 Want(self)
183 }
184
185 /// Poll whether the `Taker` has registered interest in another value.
186 ///
187 /// - If the `Taker` has called `want()`, this returns `Async::Ready(())`.
188 /// - If the `Taker` has not called `want()` since last poll, this
189 /// returns `Async::NotReady`, and parks the current task to be notified
190 /// when the `Taker` does call `want()`.
191 /// - If the `Taker` has canceled (or dropped), this returns `Closed`.
192 ///
193 /// After knowing that the Taker is wanting, the state can be reset by
194 /// calling [`give`](Giver::give).
poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Closed>>195 pub fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Closed>> {
196 loop {
197 let state = self.inner.state.load(SeqCst).into();
198 match state {
199 State::Want => {
200 trace!("poll_want: taker wants!");
201 return Poll::Ready(Ok(()));
202 },
203 State::Closed => {
204 trace!("poll_want: closed");
205 return Poll::Ready(Err(Closed { _inner: () }));
206 },
207 State::Idle | State::Give => {
208 // Taker doesn't want anything yet, so park.
209 if let Some(mut locked) = self.inner.task.try_lock_order(SeqCst, SeqCst) {
210
211 // While we have the lock, try to set to GIVE.
212 let old = self.inner.state.compare_and_swap(
213 state.into(),
214 State::Give.into(),
215 SeqCst,
216 );
217 // If it's still the first state (Idle or Give), park current task.
218 if old == state.into() {
219 let park = locked.as_ref()
220 .map(|w| !w.will_wake(cx.waker()))
221 .unwrap_or(true);
222 if park {
223 let old = mem::replace(&mut *locked, Some(cx.waker().clone()));
224 drop(locked);
225 old.map(|prev_task| {
226 // there was an old task parked here.
227 // it might be waiting to be notified,
228 // so poke it before dropping.
229 prev_task.wake();
230 });
231 }
232 return Poll::Pending;
233 }
234 // Otherwise, something happened! Go around the loop again.
235 } else {
236 // if we couldn't take the lock, then a Taker has it.
237 // The *ONLY* reason is because it is in the process of notifying us
238 // of its want.
239 //
240 // We need to loop again to see what state it was changed to.
241 }
242 },
243 }
244 }
245 }
246
247 /// Mark the state as idle, if the Taker currently is wanting.
248 ///
249 /// Returns true if Taker was wanting, false otherwise.
250 #[inline]
give(&self) -> bool251 pub fn give(&self) -> bool {
252 // only set to IDLE if it is still Want
253 self.inner.state.compare_and_swap(
254 State::Want.into(),
255 State::Idle.into(),
256 SeqCst,
257 ) == State::Want.into()
258 }
259
260 /// Check if the `Taker` has called `want()` without parking a task.
261 ///
262 /// This is safe to call outside of a futures task context, but other
263 /// means of being notified is left to the user.
264 #[inline]
is_wanting(&self) -> bool265 pub fn is_wanting(&self) -> bool {
266 self.inner.state.load(SeqCst) == State::Want.into()
267 }
268
269
270 /// Check if the `Taker` has canceled interest without parking a task.
271 #[inline]
is_canceled(&self) -> bool272 pub fn is_canceled(&self) -> bool {
273 self.inner.state.load(SeqCst) == State::Closed.into()
274 }
275
276 /// Converts this into a `SharedGiver`.
277 #[inline]
shared(self) -> SharedGiver278 pub fn shared(self) -> SharedGiver {
279 SharedGiver {
280 inner: self.inner,
281 }
282 }
283 }
284
285 impl fmt::Debug for Giver {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result286 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
287 f.debug_struct("Giver")
288 .field("state", &self.inner.state())
289 .finish()
290 }
291 }
292
293 // ===== impl SharedGiver ======
294
295 impl SharedGiver {
296 /// Check if the `Taker` has called `want()` without parking a task.
297 ///
298 /// This is safe to call outside of a futures task context, but other
299 /// means of being notified is left to the user.
300 #[inline]
is_wanting(&self) -> bool301 pub fn is_wanting(&self) -> bool {
302 self.inner.state.load(SeqCst) == State::Want.into()
303 }
304
305
306 /// Check if the `Taker` has canceled interest without parking a task.
307 #[inline]
is_canceled(&self) -> bool308 pub fn is_canceled(&self) -> bool {
309 self.inner.state.load(SeqCst) == State::Closed.into()
310 }
311 }
312
313 impl fmt::Debug for SharedGiver {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result314 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
315 f.debug_struct("SharedGiver")
316 .field("state", &self.inner.state())
317 .finish()
318 }
319 }
320
321 // ===== impl Taker ======
322
323 impl Taker {
324 /// Signal to the `Giver` that the want is canceled.
325 ///
326 /// This is useful to tell that the channel is closed if you cannot
327 /// drop the value yet.
328 #[inline]
cancel(&mut self)329 pub fn cancel(&mut self) {
330 trace!("signal: {:?}", State::Closed);
331 self.signal(State::Closed)
332 }
333
334 /// Signal to the `Giver` that a value is wanted.
335 #[inline]
want(&mut self)336 pub fn want(&mut self) {
337 debug_assert!(
338 self.inner.state.load(SeqCst) != State::Closed.into(),
339 "want called after cancel"
340 );
341 trace!("signal: {:?}", State::Want);
342 self.signal(State::Want)
343 }
344
345 #[inline]
signal(&mut self, state: State)346 fn signal(&mut self, state: State) {
347 let old_state = self.inner.state.swap(state.into(), SeqCst).into();
348 match old_state {
349 State::Idle | State::Want | State::Closed => (),
350 State::Give => {
351 loop {
352 if let Some(mut locked) = self.inner.task.try_lock_order(SeqCst, SeqCst) {
353 if let Some(task) = locked.take() {
354 drop(locked);
355 trace!("signal found waiting giver, notifying");
356 task.wake();
357 }
358 return;
359 } else {
360 // if we couldn't take the lock, then a Giver has it.
361 // The *ONLY* reason is because it is in the process of parking.
362 //
363 // We need to loop and take the lock so we can notify this task.
364 }
365 }
366 },
367 }
368 }
369 }
370
371 impl Drop for Taker {
372 #[inline]
drop(&mut self)373 fn drop(&mut self) {
374 self.signal(State::Closed);
375 }
376 }
377
378 impl fmt::Debug for Taker {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result379 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
380 f.debug_struct("Taker")
381 .field("state", &self.inner.state())
382 .finish()
383 }
384 }
385
386 // ===== impl Closed ======
387
388 impl fmt::Debug for Closed {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result389 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
390 f.debug_struct("Closed")
391 .finish()
392 }
393 }
394
395 // ===== impl Inner ======
396
397 impl Inner {
398 #[inline]
state(&self) -> State399 fn state(&self) -> State {
400 self.state.load(SeqCst).into()
401 }
402 }
403
404 // ===== impl PollFn ======
405
406 struct Want<'a>(&'a mut Giver);
407
408
409 impl Future for Want<'_> {
410 type Output = Result<(), Closed>;
411
poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>412 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
413 self.0.poll_want(cx)
414 }
415 }
416
417 #[cfg(test)]
418 mod tests {
419 use std::thread;
420 use tokio_sync::oneshot;
421 use super::*;
422
block_on<F: Future>(f: F) -> F::Output423 fn block_on<F: Future>(f: F) -> F::Output {
424 tokio_executor::enter()
425 .expect("block_on enter")
426 .block_on(f)
427 }
428
429 #[test]
want_ready()430 fn want_ready() {
431 let (mut gv, mut tk) = new();
432
433 tk.want();
434
435 block_on(gv.want()).unwrap();
436 }
437
438 #[test]
want_notify_0()439 fn want_notify_0() {
440 let (mut gv, mut tk) = new();
441 let (tx, rx) = oneshot::channel();
442
443 thread::spawn(move || {
444 tk.want();
445 // use a oneshot to keep this thread alive
446 // until other thread was notified of want
447 block_on(rx).expect("rx");
448 });
449
450 block_on(gv.want()).expect("want");
451
452 assert!(gv.is_wanting(), "still wanting after poll_want success");
453 assert!(gv.give(), "give is true when wanting");
454
455 assert!(!gv.is_wanting(), "no longer wanting after give");
456 assert!(!gv.is_canceled(), "give doesn't cancel");
457
458 assert!(!gv.give(), "give is false if not wanting");
459
460 tx.send(()).expect("tx");
461 }
462
463 /*
464 /// This tests that if the Giver moves tasks after parking,
465 /// it will still wake up the correct task.
466 #[test]
467 fn want_notify_moving_tasks() {
468 use std::sync::Arc;
469 use futures::executor::{spawn, Notify, NotifyHandle};
470
471 struct WantNotify;
472
473 impl Notify for WantNotify {
474 fn notify(&self, _id: usize) {
475 }
476 }
477
478 fn n() -> NotifyHandle {
479 Arc::new(WantNotify).into()
480 }
481
482 let (mut gv, mut tk) = new();
483
484 let mut s = spawn(poll_fn(move || {
485 gv.poll_want()
486 }));
487
488 // Register with t1 as the task::current()
489 let t1 = n();
490 assert!(s.poll_future_notify(&t1, 1).unwrap().is_not_ready());
491
492 thread::spawn(move || {
493 thread::sleep(::std::time::Duration::from_millis(100));
494 tk.want();
495 });
496
497 // And now, move to a ThreadNotify task.
498 s.into_inner().wait().expect("poll_want");
499 }
500 */
501
502 #[test]
cancel()503 fn cancel() {
504 // explicit
505 let (mut gv, mut tk) = new();
506
507 assert!(!gv.is_canceled());
508
509 tk.cancel();
510
511 assert!(gv.is_canceled());
512 block_on(gv.want()).unwrap_err();
513
514 // implicit
515 let (mut gv, tk) = new();
516
517 assert!(!gv.is_canceled());
518
519 drop(tk);
520
521 assert!(gv.is_canceled());
522 block_on(gv.want()).unwrap_err();
523
524 // notifies
525 let (mut gv, tk) = new();
526
527 thread::spawn(move || {
528 let _tk = tk;
529 // and dropped
530 });
531
532 block_on(gv.want()).unwrap_err();
533 }
534
535 /*
536 #[test]
537 fn stress() {
538 let nthreads = 5;
539 let nwants = 100;
540
541 for _ in 0..nthreads {
542 let (mut gv, mut tk) = new();
543 let (mut tx, mut rx) = mpsc::channel(0);
544
545 // rx thread
546 thread::spawn(move || {
547 let mut cnt = 0;
548 poll_fn(move || {
549 while cnt < nwants {
550 let n = match rx.poll().expect("rx poll") {
551 Async::Ready(n) => n.expect("rx opt"),
552 Async::NotReady => {
553 tk.want();
554 return Ok(Async::NotReady);
555 },
556 };
557 assert_eq!(cnt, n);
558 cnt += 1;
559 }
560 Ok::<_, ()>(Async::Ready(()))
561 }).wait().expect("rx wait");
562 });
563
564 // tx thread
565 thread::spawn(move || {
566 let mut cnt = 0;
567 let nsent = poll_fn(move || {
568 loop {
569 while let Ok(()) = tx.try_send(cnt) {
570 cnt += 1;
571 }
572 match gv.poll_want() {
573 Ok(Async::Ready(_)) => (),
574 Ok(Async::NotReady) => return Ok::<_, ()>(Async::NotReady),
575 Err(_) => return Ok(Async::Ready(cnt)),
576 }
577 }
578 }).wait().expect("tx wait");
579
580 assert_eq!(nsent, nwants);
581 }).join().expect("thread join");
582 }
583 }
584 */
585 }
586