1 use {Reactor, Handle, Task};
2 use atomic_task::AtomicTask;
3 
4 use futures::{Future, Async, Poll, task};
5 
6 use std::io;
7 use std::thread;
8 use std::sync::Arc;
9 use std::sync::atomic::AtomicUsize;
10 use std::sync::atomic::Ordering::SeqCst;
11 
/// Handle to the reactor running on a background thread.
///
/// Instances are created by calling [`Reactor::background`].
///
/// Dropping a `Background` that still owns its inner handle requests an
/// immediate shutdown of the reactor thread and blocks the dropping thread
/// until the reactor thread has reported that it is shut down. Use `forget`
/// to release the handle without signalling the reactor thread.
///
/// [`Reactor::background`]: struct.Reactor.html#method.background
#[derive(Debug)]
pub struct Background {
    /// `Some` until the handle is consumed by `shutdown_on_idle`,
    /// `shutdown_now`, `forget` or `Drop`.
    ///
    /// When `None`, dropping the handle sends no shutdown signal; if none was
    /// sent earlier, the reactor thread will run until the process
    /// terminates.
    inner: Option<Inner>,
}
22 
/// Future that resolves when the reactor thread has shutdown.
///
/// Returned by `Background::shutdown_on_idle` and `Background::shutdown_now`.
/// Polling registers the current task to be notified by the reactor thread
/// and completes with `()` once the shared state reads `SHUTDOWN`.
#[derive(Debug)]
pub struct Shutdown {
    /// Handle and shared state taken over from the `Background` that created
    /// this future.
    inner: Inner,
}
28 
/// Actual Background handle.
///
/// Holds everything needed to signal the reactor thread and to observe its
/// shutdown; it is moved from `Background` into `Shutdown` when a shutdown is
/// requested.
#[derive(Debug)]
struct Inner {
    /// Handle to the reactor, cloned from the reactor before it was moved onto
    /// its thread. Used to wake the reactor after a shutdown request is stored.
    handle: Handle,

    /// Shared state between the background handle and the reactor thread.
    shared: Arc<Shared>,
}
38 
/// State shared (through an `Arc`) between the handle side and the reactor
/// thread.
#[derive(Debug)]
struct Shared {
    /// Signal the reactor thread to shutdown.
    ///
    /// Holds `0` while no shutdown has been requested, otherwise one of
    /// `SHUTDOWN_IDLE`, `SHUTDOWN_NOW` or `SHUTDOWN`.
    shutdown: AtomicUsize,

    /// Task to notify when the reactor thread enters a shutdown state.
    ///
    /// Registered by `Shutdown::poll` and notified by the reactor thread after
    /// it stores `SHUTDOWN`.
    shutdown_task: AtomicTask,
}
47 
// Values stored in `Shared::shutdown`. The initial value `0` (no named
// constant) means that no shutdown has been requested. The numeric ordering is
// significant: `Inner::shutdown_now` treats any value `>= SHUTDOWN_NOW` as
// "nothing left to request".

/// Notifies the reactor thread to shutdown once the reactor becomes idle.
const SHUTDOWN_IDLE: usize = 1;

/// Notifies the reactor thread to shutdown immediately.
const SHUTDOWN_NOW: usize = 2;

/// The reactor is currently shutdown.
///
/// Stored by the reactor thread itself after the reactor has been dropped.
const SHUTDOWN: usize = 3;
56 
57 // ===== impl Background =====
58 
59 impl Background {
60     /// Launch a reactor in the background and return a handle to the thread.
new(reactor: Reactor) -> io::Result<Background>61     pub(crate) fn new(reactor: Reactor) -> io::Result<Background> {
62         // Grab a handle to the reactor
63         let handle = reactor.handle().clone();
64 
65         // Create the state shared between the background handle and the reactor
66         // thread.
67         let shared = Arc::new(Shared {
68             shutdown: AtomicUsize::new(0),
69             shutdown_task: AtomicTask::new(),
70         });
71 
72         // For the reactor thread
73         let shared2 = shared.clone();
74 
75         // Start the reactor thread
76         thread::Builder::new()
77             .spawn(move || run(reactor, shared2))?;
78 
79         Ok(Background {
80             inner: Some(Inner {
81                 handle,
82                 shared,
83             }),
84         })
85     }
86 
87     /// Returns a reference to the reactor handle.
handle(&self) -> &Handle88     pub fn handle(&self) -> &Handle {
89         &self.inner.as_ref().unwrap().handle
90     }
91 
92     /// Shutdown the reactor on idle.
93     ///
94     /// Returns a future that completes once the reactor thread has shutdown.
shutdown_on_idle(mut self) -> Shutdown95     pub fn shutdown_on_idle(mut self) -> Shutdown {
96         let inner = self.inner.take().unwrap();
97         inner.shutdown_on_idle();
98 
99         Shutdown { inner }
100     }
101 
102     /// Shutdown the reactor immediately
103     ///
104     /// Returns a future that completes once the reactor thread has shutdown.
shutdown_now(mut self) -> Shutdown105     pub fn shutdown_now(mut self) -> Shutdown {
106         let inner = self.inner.take().unwrap();
107         inner.shutdown_now();
108 
109         Shutdown { inner }
110     }
111 
112     /// Run the reactor on its thread until the process terminates.
forget(mut self)113     pub fn forget(mut self) {
114         drop(self.inner.take());
115     }
116 }
117 
118 impl Drop for Background {
drop(&mut self)119     fn drop(&mut self) {
120         let inner = match self.inner.take() {
121             Some(i) => i,
122             None => return,
123         };
124 
125         inner.shutdown_now();
126 
127         let shutdown = Shutdown { inner };
128         let _ = shutdown.wait();
129     }
130 }
131 
132 // ===== impl Shutdown =====
133 
134 impl Future for Shutdown {
135     type Item = ();
136     type Error = ();
137 
poll(&mut self) -> Poll<(), ()>138     fn poll(&mut self) -> Poll<(), ()> {
139         let task = Task::Futures1(task::current());
140         self.inner.shared.shutdown_task.register_task(task);
141 
142         if !self.inner.is_shutdown() {
143             return Ok(Async::NotReady);
144         }
145 
146         Ok(().into())
147     }
148 }
149 
150 // ===== impl Inner =====
151 
152 impl Inner {
153     /// Returns true if the reactor thread is shutdown.
is_shutdown(&self) -> bool154     fn is_shutdown(&self) -> bool {
155         self.shared.shutdown.load(SeqCst) == SHUTDOWN
156     }
157 
158     /// Notify the reactor thread to shutdown once the reactor transitions to an
159     /// idle state.
shutdown_on_idle(&self)160     fn shutdown_on_idle(&self) {
161        self.shared.shutdown
162            .compare_and_swap(0, SHUTDOWN_IDLE, SeqCst);
163        self.handle.wakeup();
164     }
165 
166     /// Notify the reactor thread to shutdown immediately.
shutdown_now(&self)167     fn shutdown_now(&self) {
168         let mut curr = self.shared.shutdown.load(SeqCst);
169 
170         loop {
171             if curr >= SHUTDOWN_NOW {
172                 return;
173             }
174 
175             let act = self.shared.shutdown
176                 .compare_and_swap(curr, SHUTDOWN_NOW, SeqCst);
177 
178             if act == curr {
179                 self.handle.wakeup();
180                 return;
181             }
182 
183             curr = act;
184         }
185     }
186 }
187 
188 // ===== impl Reactor thread =====
189 
run(mut reactor: Reactor, shared: Arc<Shared>)190 fn run(mut reactor: Reactor, shared: Arc<Shared>) {
191     debug!("starting background reactor");
192     loop {
193         let shutdown = shared.shutdown.load(SeqCst);
194 
195         if shutdown == SHUTDOWN_NOW {
196             debug!("shutting background reactor down NOW");
197             break;
198         }
199 
200         if shutdown == SHUTDOWN_IDLE && reactor.is_idle() {
201             debug!("shutting background reactor on idle");
202             break;
203         }
204 
205         reactor.turn(None).unwrap();
206     }
207 
208     drop(reactor);
209 
210     // Transition the state to shutdown
211     shared.shutdown.store(SHUTDOWN, SeqCst);
212 
213     // Notify any waiters
214     shared.shutdown_task.notify();
215 
216     debug!("background reactor has shutdown");
217 }
218