1 use {Reactor, Handle, Task};
2 use atomic_task::AtomicTask;
3
4 use futures::{Future, Async, Poll, task};
5
6 use std::io;
7 use std::thread;
8 use std::sync::Arc;
9 use std::sync::atomic::AtomicUsize;
10 use std::sync::atomic::Ordering::SeqCst;
11
12 /// Handle to the reactor running on a background thread.
13 ///
14 /// Instances are created by calling [`Reactor::background`].
15 ///
16 /// [`Reactor::background`]: struct.Reactor.html#method.background
17 #[derive(Debug)]
18 pub struct Background {
19 /// When `None`, the reactor thread will run until the process terminates.
20 inner: Option<Inner>,
21 }
22
23 /// Future that resolves when the reactor thread has shutdown.
24 #[derive(Debug)]
25 pub struct Shutdown {
26 inner: Inner,
27 }
28
29 /// Actual Background handle.
30 #[derive(Debug)]
31 struct Inner {
32 /// Handle to the reactor
33 handle: Handle,
34
35 /// Shared state between the background handle and the reactor thread.
36 shared: Arc<Shared>,
37 }
38
39 #[derive(Debug)]
40 struct Shared {
41 /// Signal the reactor thread to shutdown.
42 shutdown: AtomicUsize,
43
44 /// Task to notify when the reactor thread enters a shutdown state.
45 shutdown_task: AtomicTask,
46 }
47
48 /// Notifies the reactor thread to shutdown once the reactor becomes idle.
49 const SHUTDOWN_IDLE: usize = 1;
50
51 /// Notifies the reactor thread to shutdown immediately.
52 const SHUTDOWN_NOW: usize = 2;
53
54 /// The reactor is currently shutdown.
55 const SHUTDOWN: usize = 3;
56
57 // ===== impl Background =====
58
59 impl Background {
60 /// Launch a reactor in the background and return a handle to the thread.
new(reactor: Reactor) -> io::Result<Background>61 pub(crate) fn new(reactor: Reactor) -> io::Result<Background> {
62 // Grab a handle to the reactor
63 let handle = reactor.handle().clone();
64
65 // Create the state shared between the background handle and the reactor
66 // thread.
67 let shared = Arc::new(Shared {
68 shutdown: AtomicUsize::new(0),
69 shutdown_task: AtomicTask::new(),
70 });
71
72 // For the reactor thread
73 let shared2 = shared.clone();
74
75 // Start the reactor thread
76 thread::Builder::new()
77 .spawn(move || run(reactor, shared2))?;
78
79 Ok(Background {
80 inner: Some(Inner {
81 handle,
82 shared,
83 }),
84 })
85 }
86
87 /// Returns a reference to the reactor handle.
handle(&self) -> &Handle88 pub fn handle(&self) -> &Handle {
89 &self.inner.as_ref().unwrap().handle
90 }
91
92 /// Shutdown the reactor on idle.
93 ///
94 /// Returns a future that completes once the reactor thread has shutdown.
shutdown_on_idle(mut self) -> Shutdown95 pub fn shutdown_on_idle(mut self) -> Shutdown {
96 let inner = self.inner.take().unwrap();
97 inner.shutdown_on_idle();
98
99 Shutdown { inner }
100 }
101
102 /// Shutdown the reactor immediately
103 ///
104 /// Returns a future that completes once the reactor thread has shutdown.
shutdown_now(mut self) -> Shutdown105 pub fn shutdown_now(mut self) -> Shutdown {
106 let inner = self.inner.take().unwrap();
107 inner.shutdown_now();
108
109 Shutdown { inner }
110 }
111
112 /// Run the reactor on its thread until the process terminates.
forget(mut self)113 pub fn forget(mut self) {
114 drop(self.inner.take());
115 }
116 }
117
118 impl Drop for Background {
drop(&mut self)119 fn drop(&mut self) {
120 let inner = match self.inner.take() {
121 Some(i) => i,
122 None => return,
123 };
124
125 inner.shutdown_now();
126
127 let shutdown = Shutdown { inner };
128 let _ = shutdown.wait();
129 }
130 }
131
132 // ===== impl Shutdown =====
133
134 impl Future for Shutdown {
135 type Item = ();
136 type Error = ();
137
poll(&mut self) -> Poll<(), ()>138 fn poll(&mut self) -> Poll<(), ()> {
139 let task = Task::Futures1(task::current());
140 self.inner.shared.shutdown_task.register_task(task);
141
142 if !self.inner.is_shutdown() {
143 return Ok(Async::NotReady);
144 }
145
146 Ok(().into())
147 }
148 }
149
150 // ===== impl Inner =====
151
152 impl Inner {
153 /// Returns true if the reactor thread is shutdown.
is_shutdown(&self) -> bool154 fn is_shutdown(&self) -> bool {
155 self.shared.shutdown.load(SeqCst) == SHUTDOWN
156 }
157
158 /// Notify the reactor thread to shutdown once the reactor transitions to an
159 /// idle state.
shutdown_on_idle(&self)160 fn shutdown_on_idle(&self) {
161 self.shared.shutdown
162 .compare_and_swap(0, SHUTDOWN_IDLE, SeqCst);
163 self.handle.wakeup();
164 }
165
166 /// Notify the reactor thread to shutdown immediately.
shutdown_now(&self)167 fn shutdown_now(&self) {
168 let mut curr = self.shared.shutdown.load(SeqCst);
169
170 loop {
171 if curr >= SHUTDOWN_NOW {
172 return;
173 }
174
175 let act = self.shared.shutdown
176 .compare_and_swap(curr, SHUTDOWN_NOW, SeqCst);
177
178 if act == curr {
179 self.handle.wakeup();
180 return;
181 }
182
183 curr = act;
184 }
185 }
186 }
187
188 // ===== impl Reactor thread =====
189
run(mut reactor: Reactor, shared: Arc<Shared>)190 fn run(mut reactor: Reactor, shared: Arc<Shared>) {
191 debug!("starting background reactor");
192 loop {
193 let shutdown = shared.shutdown.load(SeqCst);
194
195 if shutdown == SHUTDOWN_NOW {
196 debug!("shutting background reactor down NOW");
197 break;
198 }
199
200 if shutdown == SHUTDOWN_IDLE && reactor.is_idle() {
201 debug!("shutting background reactor on idle");
202 break;
203 }
204
205 reactor.turn(None).unwrap();
206 }
207
208 drop(reactor);
209
210 // Transition the state to shutdown
211 shared.shutdown.store(SHUTDOWN, SeqCst);
212
213 // Notify any waiters
214 shared.shutdown_task.notify();
215
216 debug!("background reactor has shutdown");
217 }
218