1 use std::os::unix::io::RawFd;
2 use std::sync::atomic::Ordering;
3 use std::sync::Arc;
4 use std::time::Duration;
5 use std::{cmp, io, isize, ptr};
6 
7 use super::{from_nix_error, timeout_handler, EventData, IoData, TimerList};
8 use crate::coroutine_impl::run_coroutine;
9 use crate::scheduler::get_scheduler;
10 use crate::timeout_list::{now, ns_to_ms};
11 use crossbeam::queue::SegQueue as mpsc;
12 use libc::{eventfd, EFD_NONBLOCK};
13 use nix::sys::epoll::*;
14 use nix::unistd::{close, read, write};
15 use smallvec::SmallVec;
16 
create_eventfd() -> io::Result<RawFd>17 fn create_eventfd() -> io::Result<RawFd> {
18     let fd = unsafe { eventfd(0, EFD_NONBLOCK) };
19     if fd < 0 {
20         return Err(io::Error::last_os_error());
21     }
22     Ok(fd as RawFd)
23 }
24 
/// Event record filled in by `epoll_wait` inside `Selector::select`; an alias for nix's `EpollEvent`.
pub type SysEvent = EpollEvent;
26 
/// Epoll state for one I/O worker (`Selector::new` creates one per worker).
struct SingleSelector {
    /// The epoll instance descriptor.
    epfd: RawFd,
    /// eventfd registered on `epfd` with user data `0`; `Selector::wakeup` writes to it
    /// to interrupt a blocked `epoll_wait`.
    evfd: RawFd,
    /// I/O timeout timers, serviced at the end of each `Selector::select`.
    timer_list: TimerList,
    /// `EventData` handed over by `Selector::del_fd`, dropped later by
    /// `Selector::free_unused_event_data`.
    free_ev: mpsc<Arc<EventData>>,
}
33 
34 impl SingleSelector {
new() -> io::Result<Self>35     pub fn new() -> io::Result<Self> {
36         // wakeup data is 0
37         let mut info = EpollEvent::new(EpollFlags::EPOLLET | EpollFlags::EPOLLIN, 0);
38 
39         let epfd = epoll_create().map_err(from_nix_error)?;
40         let evfd = create_eventfd()?;
41 
42         // add the eventfd to the epfd
43         epoll_ctl(epfd, EpollOp::EpollCtlAdd, evfd, &mut info).map_err(from_nix_error)?;
44 
45         Ok(SingleSelector {
46             epfd,
47             evfd,
48             free_ev: mpsc::new(),
49             timer_list: TimerList::new(),
50         })
51     }
52 }
53 
54 impl Drop for SingleSelector {
drop(&mut self)55     fn drop(&mut self) {
56         let _ = close(self.evfd);
57         let _ = close(self.epfd);
58     }
59 }
60 
/// Epoll-based I/O selector holding one `SingleSelector` per I/O worker.
///
/// A file descriptor is assigned to selector `fd % vec.len()`.
pub struct Selector {
    // 128 should be fine for max io threads
    // (up to 128 selectors are stored inline before the SmallVec spills to the heap).
    // NOTE(review): `select` builds the parked-mask bit with `1 << id`, so ids at or
    // beyond the mask's bit width would overflow — confirm the practical worker limit.
    vec: SmallVec<[SingleSelector; 128]>,
}
65 
66 impl Selector {
new(io_workers: usize) -> io::Result<Self>67     pub fn new(io_workers: usize) -> io::Result<Self> {
68         let mut s = Selector {
69             vec: SmallVec::new(),
70         };
71 
72         for _ in 0..io_workers {
73             let ss = SingleSelector::new()?;
74             s.vec.push(ss);
75         }
76 
77         Ok(s)
78     }
79 
select( &self, id: usize, events: &mut [SysEvent], timeout: Option<u64>, ) -> io::Result<Option<u64>>80     pub fn select(
81         &self,
82         id: usize,
83         events: &mut [SysEvent],
84         timeout: Option<u64>,
85     ) -> io::Result<Option<u64>> {
86         // let mut ev = EpollEvent::new(EpollFlags::EPOLLIN, 0);
87         let timeout_ms = timeout
88             .map(|to| cmp::min(ns_to_ms(to), isize::MAX as u64) as isize)
89             .unwrap_or(-1);
90         // info!("select; timeout={:?}", timeout_ms);
91 
92         // Wait for epoll events for at most timeout_ms milliseconds
93         let mask = 1 << id;
94         let single_selector = unsafe { self.vec.get_unchecked(id) };
95         let epfd = single_selector.epfd;
96         // first register thread handle
97         let scheduler = get_scheduler();
98         scheduler.workers.parked.fetch_or(mask, Ordering::Relaxed);
99 
100         let n = epoll_wait(epfd, events, timeout_ms).map_err(from_nix_error)?;
101 
102         // clear the park stat after comeback
103         scheduler.workers.parked.fetch_and(!mask, Ordering::Relaxed);
104 
105         for event in events[..n].iter() {
106             if event.data() == 0 {
107                 #[cold]
108                 {
109                     // this is just a wakeup event, ignore it
110                     let mut buf = [0u8; 8];
111                     // clear the eventfd, ignore the result
112                     read(single_selector.evfd, &mut buf).ok();
113                     // info!("got wakeup event in select, id={}", id);
114                     continue;
115                 }
116             }
117             let data = unsafe { &mut *(event.data() as *mut EventData) };
118             // info!("select got event, data={:p}", data);
119             data.io_flag.store(true, Ordering::Release);
120 
121             // first check the atomic co, this may be grab by the worker first
122             let co = match data.co.take(Ordering::Acquire) {
123                 None => continue,
124                 Some(co) => co,
125             };
126             co.prefetch();
127 
128             // it's safe to remove the timer since we are running the timer_list in the same thread
129             data.timer.borrow_mut().take().map(|h| {
130                 unsafe {
131                     // tell the timer handler not to cancel the io
132                     // it's not always true that you can really remove the timer entry
133                     h.with_mut_data(|value| value.data.event_data = ptr::null_mut());
134                 }
135                 h.remove()
136             });
137 
138             // schedule the coroutine
139             run_coroutine(co);
140         }
141 
142         // run all the local tasks
143         scheduler.run_queued_tasks(id);
144 
145         // free the unused event_data
146         self.free_unused_event_data(id);
147 
148         // deal with the timer list
149         let next_expire = single_selector
150             .timer_list
151             .schedule_timer(now(), &timeout_handler);
152         Ok(next_expire)
153     }
154 
155     // this will post an os event so that we can wake up the event loop
156     #[inline]
wakeup(&self, id: usize)157     pub fn wakeup(&self, id: usize) {
158         let buf = unsafe { ::std::slice::from_raw_parts(&1u64 as *const u64 as _, 8) };
159         let ret = write(unsafe { self.vec.get_unchecked(id) }.evfd, buf);
160         info!("wakeup id={:?}, ret={:?}", id, ret);
161     }
162 
163     // register io event to the selector
164     #[inline]
add_fd(&self, io_data: IoData) -> io::Result<IoData>165     pub fn add_fd(&self, io_data: IoData) -> io::Result<IoData> {
166         let mut info = EpollEvent::new(
167             EpollFlags::EPOLLIN
168                 | EpollFlags::EPOLLOUT
169                 | EpollFlags::EPOLLRDHUP
170                 | EpollFlags::EPOLLET,
171             io_data.as_ref() as *const _ as _,
172         );
173 
174         let fd = io_data.fd;
175         let id = fd as usize % self.vec.len();
176         let single_selector = unsafe { self.vec.get_unchecked(id) };
177         let epfd = single_selector.epfd;
178         info!("add fd to epoll select, fd={:?}", fd);
179         epoll_ctl(epfd, EpollOp::EpollCtlAdd, fd, &mut info)
180             .map_err(from_nix_error)
181             .map(|_| io_data)
182     }
183 
184     #[inline]
del_fd(&self, io_data: &IoData)185     pub fn del_fd(&self, io_data: &IoData) {
186         use std::ops::Deref;
187 
188         let mut info = EpollEvent::empty();
189 
190         if let Some(h) = io_data.timer.borrow_mut().take() {
191             unsafe {
192                 // mark the timer as removed if any, this only happened
193                 // when cancel an IO. what if the timer expired at the same time?
194                 // because we run this func in the user space, so the timer handler
195                 // will not got the coroutine
196                 h.with_mut_data(|value| value.data.event_data = ptr::null_mut());
197             }
198         }
199 
200         let fd = io_data.fd;
201         let id = fd as usize % self.vec.len();
202         let single_selector = unsafe { self.vec.get_unchecked(id) };
203         let epfd = single_selector.epfd;
204         info!("del fd from epoll select, fd={:?}", fd);
205         epoll_ctl(epfd, EpollOp::EpollCtlDel, fd, &mut info).ok();
206 
207         // after EpollCtlDel push the unused event data
208         single_selector.free_ev.push(io_data.deref().clone());
209     }
210 
211     // we can't free the event data directly in the worker thread
212     // must free them before the next epoll_wait
213     #[inline]
free_unused_event_data(&self, id: usize)214     fn free_unused_event_data(&self, id: usize) {
215         let free_ev = &unsafe { self.vec.get_unchecked(id) }.free_ev;
216         while let Ok(_) = free_ev.pop() {}
217     }
218 
219     // register the io request to the timeout list
220     #[inline]
add_io_timer(&self, io: &IoData, timeout: Duration)221     pub fn add_io_timer(&self, io: &IoData, timeout: Duration) {
222         let id = io.fd as usize % self.vec.len();
223         // info!("io timeout = {:?}", dur);
224         let (h, b_new) = unsafe { self.vec.get_unchecked(id) }
225             .timer_list
226             .add_timer(timeout, io.timer_data());
227         if b_new {
228             // wake up the event loop thread to recall the next wait timeout
229             self.wakeup(id);
230         }
231         io.timer.borrow_mut().replace(h);
232     }
233 }
234