1 use std::sync::{Arc, mpsc};
2 use std::thread;
3
4 use net::NetworkListener;
5
6 pub struct ListenerPool<A: NetworkListener> {
7 acceptor: A
8 }
9
10 impl<A: NetworkListener + Send + 'static> ListenerPool<A> {
11 /// Create a thread pool to manage the acceptor.
new(acceptor: A) -> ListenerPool<A>12 pub fn new(acceptor: A) -> ListenerPool<A> {
13 ListenerPool { acceptor: acceptor }
14 }
15
16 /// Runs the acceptor pool. Blocks until the acceptors are closed.
17 ///
18 /// ## Panics
19 ///
20 /// Panics if threads == 0.
accept<F>(self, work: F, threads: usize) where F: Fn(A::Stream) + Send + Sync + 'static21 pub fn accept<F>(self, work: F, threads: usize)
22 where F: Fn(A::Stream) + Send + Sync + 'static {
23 assert!(threads != 0, "Can't accept on 0 threads.");
24
25 let (super_tx, supervisor_rx) = mpsc::channel();
26
27 let work = Arc::new(work);
28
29 // Begin work.
30 for _ in 0..threads {
31 spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone())
32 }
33
34 // Monitor for panics.
35 // FIXME(reem): This won't ever exit since we still have a super_tx handle.
36 for _ in supervisor_rx.iter() {
37 spawn_with(super_tx.clone(), work.clone(), self.acceptor.clone());
38 }
39 }
40 }
41
spawn_with<A, F>(supervisor: mpsc::Sender<()>, work: Arc<F>, mut acceptor: A) where A: NetworkListener + Send + 'static, F: Fn(<A as NetworkListener>::Stream) + Send + Sync + 'static42 fn spawn_with<A, F>(supervisor: mpsc::Sender<()>, work: Arc<F>, mut acceptor: A)
43 where A: NetworkListener + Send + 'static,
44 F: Fn(<A as NetworkListener>::Stream) + Send + Sync + 'static {
45 thread::spawn(move || {
46 let _sentinel = Sentinel::new(supervisor, ());
47
48 loop {
49 match acceptor.accept() {
50 Ok(stream) => work(stream),
51 Err(e) => {
52 info!("Connection failed: {}", e);
53 }
54 }
55 }
56 });
57 }
58
59 struct Sentinel<T: Send + 'static> {
60 value: Option<T>,
61 supervisor: mpsc::Sender<T>,
62 }
63
64 impl<T: Send + 'static> Sentinel<T> {
new(channel: mpsc::Sender<T>, data: T) -> Sentinel<T>65 fn new(channel: mpsc::Sender<T>, data: T) -> Sentinel<T> {
66 Sentinel {
67 value: Some(data),
68 supervisor: channel,
69 }
70 }
71 }
72
73 impl<T: Send + 'static> Drop for Sentinel<T> {
drop(&mut self)74 fn drop(&mut self) {
75 // Respawn ourselves
76 let _ = self.supervisor.send(self.value.take().unwrap());
77 }
78 }
79
80