1 use std::os::unix::io::RawFd;
2 use std::sync::atomic::Ordering;
3 use std::sync::Arc;
4 use std::time::Duration;
5 use std::{cmp, io, isize, ptr};
6
7 use super::{from_nix_error, timeout_handler, EventData, IoData, TimerList};
8 use crate::coroutine_impl::run_coroutine;
9 use crate::scheduler::get_scheduler;
10 use crate::timeout_list::{now, ns_to_ms};
11 use crossbeam::queue::SegQueue as mpsc;
12 use libc::{eventfd, EFD_NONBLOCK};
13 use nix::sys::epoll::*;
14 use nix::unistd::{close, read, write};
15 use smallvec::SmallVec;
16
create_eventfd() -> io::Result<RawFd>17 fn create_eventfd() -> io::Result<RawFd> {
18 let fd = unsafe { eventfd(0, EFD_NONBLOCK) };
19 if fd < 0 {
20 return Err(io::Error::last_os_error());
21 }
22 Ok(fd as RawFd)
23 }
24
/// Platform-level event record returned by `epoll_wait` (Linux backend).
pub type SysEvent = EpollEvent;
26
/// One epoll instance together with its wakeup eventfd and the timer state
/// serviced by the thread that polls it.
struct SingleSelector {
    // the epoll file descriptor this selector waits on
    epfd: RawFd,
    // eventfd registered with token 0; written by wakeup() to interrupt epoll_wait
    evfd: RawFd,
    // io timeout entries handled on this selector's thread
    timer_list: TimerList,
    // event data queued by del_fd() for deferred release before the next epoll_wait
    free_ev: mpsc<Arc<EventData>>,
}
33
34 impl SingleSelector {
new() -> io::Result<Self>35 pub fn new() -> io::Result<Self> {
36 // wakeup data is 0
37 let mut info = EpollEvent::new(EpollFlags::EPOLLET | EpollFlags::EPOLLIN, 0);
38
39 let epfd = epoll_create().map_err(from_nix_error)?;
40 let evfd = create_eventfd()?;
41
42 // add the eventfd to the epfd
43 epoll_ctl(epfd, EpollOp::EpollCtlAdd, evfd, &mut info).map_err(from_nix_error)?;
44
45 Ok(SingleSelector {
46 epfd,
47 evfd,
48 free_ev: mpsc::new(),
49 timer_list: TimerList::new(),
50 })
51 }
52 }
53
54 impl Drop for SingleSelector {
drop(&mut self)55 fn drop(&mut self) {
56 let _ = close(self.evfd);
57 let _ = close(self.epfd);
58 }
59 }
60
/// The io event demultiplexer: one `SingleSelector` per io worker thread.
/// Fds are sharded over the workers by `fd % vec.len()` (see `add_fd`).
pub struct Selector {
    // 128 should be fine for max io threads
    vec: SmallVec<[SingleSelector; 128]>,
}
65
66 impl Selector {
new(io_workers: usize) -> io::Result<Self>67 pub fn new(io_workers: usize) -> io::Result<Self> {
68 let mut s = Selector {
69 vec: SmallVec::new(),
70 };
71
72 for _ in 0..io_workers {
73 let ss = SingleSelector::new()?;
74 s.vec.push(ss);
75 }
76
77 Ok(s)
78 }
79
select( &self, id: usize, events: &mut [SysEvent], timeout: Option<u64>, ) -> io::Result<Option<u64>>80 pub fn select(
81 &self,
82 id: usize,
83 events: &mut [SysEvent],
84 timeout: Option<u64>,
85 ) -> io::Result<Option<u64>> {
86 // let mut ev = EpollEvent::new(EpollFlags::EPOLLIN, 0);
87 let timeout_ms = timeout
88 .map(|to| cmp::min(ns_to_ms(to), isize::MAX as u64) as isize)
89 .unwrap_or(-1);
90 // info!("select; timeout={:?}", timeout_ms);
91
92 // Wait for epoll events for at most timeout_ms milliseconds
93 let mask = 1 << id;
94 let single_selector = unsafe { self.vec.get_unchecked(id) };
95 let epfd = single_selector.epfd;
96 // first register thread handle
97 let scheduler = get_scheduler();
98 scheduler.workers.parked.fetch_or(mask, Ordering::Relaxed);
99
100 let n = epoll_wait(epfd, events, timeout_ms).map_err(from_nix_error)?;
101
102 // clear the park stat after comeback
103 scheduler.workers.parked.fetch_and(!mask, Ordering::Relaxed);
104
105 for event in events[..n].iter() {
106 if event.data() == 0 {
107 #[cold]
108 {
109 // this is just a wakeup event, ignore it
110 let mut buf = [0u8; 8];
111 // clear the eventfd, ignore the result
112 read(single_selector.evfd, &mut buf).ok();
113 // info!("got wakeup event in select, id={}", id);
114 continue;
115 }
116 }
117 let data = unsafe { &mut *(event.data() as *mut EventData) };
118 // info!("select got event, data={:p}", data);
119 data.io_flag.store(true, Ordering::Release);
120
121 // first check the atomic co, this may be grab by the worker first
122 let co = match data.co.take(Ordering::Acquire) {
123 None => continue,
124 Some(co) => co,
125 };
126 co.prefetch();
127
128 // it's safe to remove the timer since we are running the timer_list in the same thread
129 data.timer.borrow_mut().take().map(|h| {
130 unsafe {
131 // tell the timer handler not to cancel the io
132 // it's not always true that you can really remove the timer entry
133 h.with_mut_data(|value| value.data.event_data = ptr::null_mut());
134 }
135 h.remove()
136 });
137
138 // schedule the coroutine
139 run_coroutine(co);
140 }
141
142 // run all the local tasks
143 scheduler.run_queued_tasks(id);
144
145 // free the unused event_data
146 self.free_unused_event_data(id);
147
148 // deal with the timer list
149 let next_expire = single_selector
150 .timer_list
151 .schedule_timer(now(), &timeout_handler);
152 Ok(next_expire)
153 }
154
155 // this will post an os event so that we can wake up the event loop
156 #[inline]
wakeup(&self, id: usize)157 pub fn wakeup(&self, id: usize) {
158 let buf = unsafe { ::std::slice::from_raw_parts(&1u64 as *const u64 as _, 8) };
159 let ret = write(unsafe { self.vec.get_unchecked(id) }.evfd, buf);
160 info!("wakeup id={:?}, ret={:?}", id, ret);
161 }
162
163 // register io event to the selector
164 #[inline]
add_fd(&self, io_data: IoData) -> io::Result<IoData>165 pub fn add_fd(&self, io_data: IoData) -> io::Result<IoData> {
166 let mut info = EpollEvent::new(
167 EpollFlags::EPOLLIN
168 | EpollFlags::EPOLLOUT
169 | EpollFlags::EPOLLRDHUP
170 | EpollFlags::EPOLLET,
171 io_data.as_ref() as *const _ as _,
172 );
173
174 let fd = io_data.fd;
175 let id = fd as usize % self.vec.len();
176 let single_selector = unsafe { self.vec.get_unchecked(id) };
177 let epfd = single_selector.epfd;
178 info!("add fd to epoll select, fd={:?}", fd);
179 epoll_ctl(epfd, EpollOp::EpollCtlAdd, fd, &mut info)
180 .map_err(from_nix_error)
181 .map(|_| io_data)
182 }
183
184 #[inline]
del_fd(&self, io_data: &IoData)185 pub fn del_fd(&self, io_data: &IoData) {
186 use std::ops::Deref;
187
188 let mut info = EpollEvent::empty();
189
190 if let Some(h) = io_data.timer.borrow_mut().take() {
191 unsafe {
192 // mark the timer as removed if any, this only happened
193 // when cancel an IO. what if the timer expired at the same time?
194 // because we run this func in the user space, so the timer handler
195 // will not got the coroutine
196 h.with_mut_data(|value| value.data.event_data = ptr::null_mut());
197 }
198 }
199
200 let fd = io_data.fd;
201 let id = fd as usize % self.vec.len();
202 let single_selector = unsafe { self.vec.get_unchecked(id) };
203 let epfd = single_selector.epfd;
204 info!("del fd from epoll select, fd={:?}", fd);
205 epoll_ctl(epfd, EpollOp::EpollCtlDel, fd, &mut info).ok();
206
207 // after EpollCtlDel push the unused event data
208 single_selector.free_ev.push(io_data.deref().clone());
209 }
210
211 // we can't free the event data directly in the worker thread
212 // must free them before the next epoll_wait
213 #[inline]
free_unused_event_data(&self, id: usize)214 fn free_unused_event_data(&self, id: usize) {
215 let free_ev = &unsafe { self.vec.get_unchecked(id) }.free_ev;
216 while let Ok(_) = free_ev.pop() {}
217 }
218
219 // register the io request to the timeout list
220 #[inline]
add_io_timer(&self, io: &IoData, timeout: Duration)221 pub fn add_io_timer(&self, io: &IoData, timeout: Duration) {
222 let id = io.fd as usize % self.vec.len();
223 // info!("io timeout = {:?}", dur);
224 let (h, b_new) = unsafe { self.vec.get_unchecked(id) }
225 .timer_list
226 .add_timer(timeout, io.timer_data());
227 if b_new {
228 // wake up the event loop thread to recall the next wait timeout
229 self.wakeup(id);
230 }
231 io.timer.borrow_mut().replace(h);
232 }
233 }
234