1 use super::{BlockingError, BlockingImpl};
2 use futures::Poll;
3 use std::cell::Cell;
4 use std::fmt;
5 use std::marker::PhantomData;
6 use tokio_executor::Enter;
7 
8 thread_local! {
9     static CURRENT: Cell<BlockingImpl> = Cell::new(super::default_blocking);
10 }
11 
12 /// Ensures that the executor is removed from the thread-local context
13 /// when leaving the scope. This handles cases that involve panicking.
14 ///
15 /// **NOTE:** This is intended specifically for use by `tokio` 0.2's
16 /// backwards-compatibility layer. In general, user code should not override the
17 /// blocking implementation. If you use this, make sure you know what you're
18 /// doing.
19 pub struct DefaultGuard<'a> {
20     prior: BlockingImpl,
21     _lifetime: PhantomData<&'a ()>,
22 }
23 
24 /// Set the default blocking implementation, returning a guard that resets the
25 /// blocking implementation when dropped.
26 ///
27 /// **NOTE:** This is intended specifically for use by `tokio` 0.2's
28 /// backwards-compatibility layer. In general, user code should not override the
29 /// blocking implementation. If you use this, make sure you know what you're
30 /// doing.
set_default<'a>(blocking: BlockingImpl) -> DefaultGuard<'a>31 pub fn set_default<'a>(blocking: BlockingImpl) -> DefaultGuard<'a> {
32     CURRENT.with(|cell| {
33         let prior = cell.replace(blocking);
34         DefaultGuard {
35             prior,
36             _lifetime: PhantomData,
37         }
38     })
39 }
40 
41 /// Set the default blocking implementation for the duration of the closure.
42 ///
43 /// **NOTE:** This is intended specifically for use by `tokio` 0.2's
44 /// backwards-compatibility layer. In general, user code should not override the
45 /// blocking implementation. If you use this, make sure you know what you're
46 /// doing.
with_default<F, R>(blocking: BlockingImpl, enter: &mut Enter, f: F) -> R where F: FnOnce(&mut Enter) -> R,47 pub fn with_default<F, R>(blocking: BlockingImpl, enter: &mut Enter, f: F) -> R
48 where
49     F: FnOnce(&mut Enter) -> R,
50 {
51     let _guard = set_default(blocking);
52     f(enter)
53 }
54 
55 /// Enter a blocking section of code.
56 ///
57 /// The `blocking` function annotates a section of code that performs a blocking
58 /// operation, either by issuing a blocking syscall or by performing a long
59 /// running CPU-bound computation.
60 ///
61 /// When the `blocking` function enters, it hands off the responsibility of
62 /// processing the current work queue to another thread. Then, it calls the
63 /// supplied closure. The closure is permitted to block indefinitely.
64 ///
65 /// If the maximum number of concurrent `blocking` calls has been reached, then
66 /// `NotReady` is returned and the task is notified once existing `blocking`
67 /// calls complete. The maximum value is specified when creating a thread pool
68 /// using [`Builder::max_blocking`][build]
69 ///
70 /// NB: The entire task that called `blocking` is blocked whenever the supplied
71 /// closure blocks, even if you have used future combinators such as `select` -
72 /// the other futures in this task will not make progress until the closure
73 /// returns.
74 /// If this is not desired, ensure that `blocking` runs in its own task (e.g.
75 /// using `futures::sync::oneshot::spawn`).
76 ///
77 /// [build]: struct.Builder.html#method.max_blocking
78 ///
79 /// # Return
80 ///
81 /// When the blocking closure is executed, `Ok(Ready(T))` is returned, where
82 /// `T` is the closure's return value.
83 ///
84 /// If the thread pool has shutdown, `Err` is returned.
85 ///
86 /// If the number of concurrent `blocking` calls has reached the maximum,
87 /// `Ok(NotReady)` is returned and the current task is notified when a call to
88 /// `blocking` will succeed.
89 ///
90 /// If `blocking` is called from outside the context of a Tokio thread pool,
91 /// `Err` is returned.
92 ///
93 /// # Background
94 ///
95 /// By default, the Tokio thread pool expects that tasks will only run for short
96 /// periods at a time before yielding back to the thread pool. This is the basic
97 /// premise of cooperative multitasking.
98 ///
99 /// However, it is common to want to perform a blocking operation while
100 /// processing an asynchronous computation. Examples of blocking operation
101 /// include:
102 ///
103 /// * Performing synchronous file operations (reading and writing).
104 /// * Blocking on acquiring a mutex.
105 /// * Performing a CPU bound computation, like cryptographic encryption or
106 ///   decryption.
107 ///
108 /// One option for dealing with blocking operations in an asynchronous context
109 /// is to use a thread pool dedicated to performing these operations. This not
110 /// ideal as it requires bidirectional message passing as well as a channel to
111 /// communicate which adds a level of buffering.
112 ///
113 /// Instead, `blocking` hands off the responsibility of processing the work queue
114 /// to another thread. This hand off is light compared to a channel and does not
115 /// require buffering.
116 ///
117 /// # Examples
118 ///
119 /// Block on receiving a message from a `std` channel. This example is a little
120 /// silly as using the non-blocking channel from the `futures` crate would make
121 /// more sense. The blocking receive can be replaced with any blocking operation
122 /// that needs to be performed.
123 ///
124 /// ```rust
125 /// # extern crate futures;
126 /// # extern crate tokio_threadpool;
127 ///
128 /// use tokio_threadpool::{ThreadPool, blocking};
129 ///
130 /// use futures::Future;
131 /// use futures::future::{lazy, poll_fn};
132 ///
133 /// use std::sync::mpsc;
134 /// use std::thread;
135 /// use std::time::Duration;
136 ///
137 /// pub fn main() {
138 ///     // This is a *blocking* channel
139 ///     let (tx, rx) = mpsc::channel();
140 ///
141 ///     // Spawn a thread to send a message
142 ///     thread::spawn(move || {
143 ///         thread::sleep(Duration::from_millis(500));
144 ///         tx.send("hello").unwrap();
145 ///     });
146 ///
147 ///     let pool = ThreadPool::new();
148 ///
149 ///     pool.spawn(lazy(move || {
150 ///         // Because `blocking` returns `Poll`, it is intended to be used
151 ///         // from the context of a `Future` implementation. Since we don't
152 ///         // have a complicated requirement, we can use `poll_fn` in this
153 ///         // case.
154 ///         poll_fn(move || {
155 ///             blocking(|| {
156 ///                 let msg = rx.recv().unwrap();
157 ///                 println!("message = {}", msg);
158 ///             }).map_err(|_| panic!("the threadpool shut down"))
159 ///         })
160 ///     }));
161 ///
162 ///     // Wait for the task we just spawned to complete.
163 ///     pool.shutdown_on_idle().wait().unwrap();
164 /// }
165 /// ```
blocking<F, T>(f: F) -> Poll<T, BlockingError> where F: FnOnce() -> T,166 pub fn blocking<F, T>(f: F) -> Poll<T, BlockingError>
167 where
168     F: FnOnce() -> T,
169 {
170     CURRENT.with(|cell| {
171         let blocking = cell.get();
172 
173         // Object-safety workaround: the `Blocking` trait must be object-safe,
174         // since we use a trait object in the thread-local. However, a blocking
175         // _operation_ will be generic over the return type of the blocking
176         // function. Therefore, rather than passing a function with a return
177         // type to `Blocking::run_blocking`, we pass a _new_ closure which
178         // doesn't have a return value. That closure invokes the blocking
179         // function and assigns its value to `ret`, which we then unpack when
180         // the blocking call finishes.
181         let mut f = Some(f);
182         let mut ret = None;
183         {
184             let ret2 = &mut ret;
185             let mut run = move || {
186                 let f = f
187                     .take()
188                     .expect("blocking closure invoked twice; this is a bug!");
189                 *ret2 = Some((f)());
190             };
191 
192             try_ready!((blocking)(&mut run));
193         }
194 
195         // Return the result
196         let ret =
197             ret.expect("blocking function finished, but return value was unset; this is a bug!");
198         Ok(ret.into())
199     })
200 }
201 
202 // === impl DefaultGuard ===
203 
204 impl<'a> fmt::Debug for DefaultGuard<'a> {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result205     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
206         f.pad("DefaultGuard { .. }")
207     }
208 }
209 
210 impl<'a> Drop for DefaultGuard<'a> {
drop(&mut self)211     fn drop(&mut self) {
212         // if the TLS value has already been torn down, there's nothing else we
213         // can do. we're almost certainly panicking anyway.
214         let _ = CURRENT.try_with(|cell| {
215             cell.set(self.prior);
216         });
217     }
218 }
219