1 use std::sync::{Arc, mpsc};
2 use std::thread;
3 
4 use net::NetworkListener;
5 
6 pub struct ListenerPool<A: NetworkListener> {
7     acceptor: A
8 }
9 
10 impl<A: NetworkListener + Send + 'static> ListenerPool<A> {
11     /// Create a thread pool to manage the acceptor.
new(acceptor: A) -> ListenerPool<A>12     pub fn new(acceptor: A) -> ListenerPool<A> {
13         ListenerPool { acceptor: acceptor }
14     }
15 
16     /// Runs the acceptor pool. Blocks until the acceptors are closed.
17     ///
18     /// ## Panics
19     ///
20     /// Panics if threads == 0.
accept<F>(self, work: F, threads: usize) where F: Fn(A::Stream) + Send + Sync + 'static21     pub fn accept<F>(self, work: F, threads: usize)
22         where F: Fn(A::Stream) + Send + Sync + 'static {
23         assert!(threads != 0, "Can't accept on 0 threads.");
24 
25         let (super_tx, supervisor_rx) = mpsc::channel();
26 
27         let work = Arc::new(work);
28 
29         // Begin work.
30         for _ in 0..threads {
31             spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone())
32         }
33 
34         // Monitor for panics.
35         // FIXME(reem): This won't ever exit since we still have a super_tx handle.
36         for _ in supervisor_rx.iter() {
37             spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone());
38         }
39     }
40 }
41 
spawn_with<A, F>(supervisor: mpsc::Sender<()>, work: Arc<F>, mut acceptor: A) where A: NetworkListener + Send + 'static, F: Fn(<A as NetworkListener>::Stream) + Send + Sync + 'static42 fn spawn_with<A, F>(supervisor: mpsc::Sender<()>, work: Arc<F>, mut acceptor: A)
43 where A: NetworkListener + Send + 'static,
44       F: Fn(<A as NetworkListener>::Stream) + Send + Sync + 'static {
45     thread::spawn(move || {
46         let _sentinel = Sentinel::new(supervisor, ());
47 
48         loop {
49             match acceptor.accept() {
50                 Ok(stream) => work(stream),
51                 Err(e) => {
52                     info!("Connection failed: {}", e);
53                 }
54             }
55         }
56     });
57 }
58 
/// Drop guard that notifies a supervisor when it goes out of scope.
///
/// On drop, the stored value is sent over the stored channel. Because
/// destructors also run while a panicking thread unwinds, a guard held on a
/// worker's stack reports the worker's death to whoever owns the receiver.
struct Sentinel<T: Send + 'static> {
    /// Payload to send on drop; `Some` until `drop` takes it.
    value: Option<T>,
    /// Channel on which the payload is delivered.
    supervisor: mpsc::Sender<T>,
}

impl<T: Send + 'static> Sentinel<T> {
    /// Creates a guard that sends `data` on `channel` when it is dropped.
    fn new(channel: mpsc::Sender<T>, data: T) -> Sentinel<T> {
        Sentinel {
            value: Some(data),
            supervisor: channel,
        }
    }
}

impl<T: Send + 'static> Drop for Sentinel<T> {
    /// Sends the stored value to the supervisor so it can respawn the worker.
    fn drop(&mut self) {
        // This destructor usually runs during unwinding, where a second panic
        // would abort the whole process, so it must have no panic path:
        // take the value with `if let` rather than `unwrap()`.
        if let Some(value) = self.value.take() {
            // A send error only means the receiver is gone; there is nobody
            // left to notify, so it is deliberately ignored.
            let _ = self.supervisor.send(value);
        }
    }
}
79 
80