1 #[cfg(any(target_os = "linux", target_os = "android"))]
2 #[path = "epoll.rs"]
3 mod select;
4 
5 #[cfg(any(
6     target_os = "bitrig",
7     target_os = "dragonfly",
8     target_os = "freebsd",
9     target_os = "ios",
10     target_os = "macos",
11     target_os = "netbsd",
12     target_os = "openbsd"
13 ))]
14 #[path = "kqueue.rs"]
15 mod select;
16 
17 pub mod cancel;
18 pub mod co_io;
19 pub mod net;
20 pub mod wait_io;
21 
22 use std::cell::RefCell;
23 use std::ops::Deref;
24 use std::os::unix::io::{AsRawFd, RawFd};
25 use std::sync::atomic::{AtomicBool, Ordering};
26 use std::sync::Arc;
27 use std::{fmt, io, ptr};
28 
29 use crate::coroutine_impl::{run_coroutine, CoroutineImpl};
30 use crate::scheduler::get_scheduler;
31 use crate::sync::AtomicOption;
32 use crate::timeout_list::{TimeOutList, TimeoutHandle};
33 use crate::yield_now::{get_co_para, set_co_para};
34 use nix;
35 
36 pub use self::select::{Selector, SysEvent};
37 
38 #[inline]
add_socket<T: AsRawFd + ?Sized>(t: &T) -> io::Result<IoData>39 pub fn add_socket<T: AsRawFd + ?Sized>(t: &T) -> io::Result<IoData> {
40     get_scheduler().get_selector().add_fd(IoData::new(t))
41 }
42 
43 #[inline]
del_socket(io: &IoData)44 fn del_socket(io: &IoData) {
45     // transfer the io to the selector
46     get_scheduler().get_selector().del_fd(io);
47 }
48 
49 // deal with the io result
50 #[inline]
co_io_result() -> io::Result<()>51 fn co_io_result() -> io::Result<()> {
52     match get_co_para() {
53         None => Ok(()),
54         #[cold]
55         Some(err) => Err(err),
56     }
57 }
58 
59 #[inline]
from_nix_error(err: nix::Error) -> ::std::io::Error60 fn from_nix_error(err: nix::Error) -> ::std::io::Error {
61     use nix::Error::*;
62 
63     match err {
64         Sys(errno) => ::std::io::Error::from_raw_os_error(errno as i32),
65         #[cold]
66         _ => ::std::io::Error::new(::std::io::ErrorKind::Other, "nix other error"),
67     }
68 }
69 
timeout_handler(data: TimerData)70 fn timeout_handler(data: TimerData) {
71     if data.event_data.is_null() {
72         return;
73     }
74 
75     let event_data = unsafe { &mut *data.event_data };
76     // remove the event timer
77     event_data.timer.borrow_mut().take();
78 
79     // get and check the coroutine
80     let mut co = match event_data.co.take(Ordering::Relaxed) {
81         Some(co) => co,
82         None => return,
83     };
84 
85     set_co_para(&mut co, io::Error::new(io::ErrorKind::TimedOut, "timeout"));
86 
87     // resume the coroutine with timeout error
88     run_coroutine(co);
89     drop(data); // explicitly consume the data
90 }
91 
/// The payload stored with a timeout entry.
pub struct TimerData {
    // raw pointer to the `EventData` this timeout belongs to; produced by
    // `EventData::timer_data`, set to null by `EventData::schedule`, and
    // checked for null by `timeout_handler` before it is dereferenced
    event_data: *mut EventData,
}
96 
/// Timeout list whose entries carry a [`TimerData`].
pub type TimerList = TimeOutList<TimerData>;
/// Timeout handle for [`TimerData`] entries; `EventData::schedule` uses it to
/// edit the entry's data and to remove the entry.
pub type TimerHandle = TimeoutHandle<TimerData>;
99 
/// Event-associated io data.
///
/// One instance must be constructed for each file handle; the epoll
/// `event.data` points to it.
pub struct EventData {
    /// The raw file descriptor this data was created for.
    pub fd: RawFd,
    /// Io flag; starts as `false` and is cleared again by `IoData::reset`.
    pub io_flag: AtomicBool,
    /// Handle of the timeout registered for this io, if any; taken by both
    /// `schedule` and `timeout_handler`.
    pub timer: RefCell<Option<TimerHandle>>,
    /// The coroutine to resume; taken by `schedule` or by `timeout_handler`.
    pub co: AtomicOption<CoroutineImpl>,
}
108 
// NOTE(review): `timer` is a `RefCell`, which is not `Sync` on its own, so
// these impls are a manual promise that it is never borrowed from two threads
// at once. The comment in `schedule` says the timer list runs in the same
// thread - confirm that this holds for every caller.
unsafe impl Send for EventData {}
unsafe impl Sync for EventData {}
111 
112 impl EventData {
new(fd: RawFd) -> EventData113     pub fn new(fd: RawFd) -> EventData {
114         EventData {
115             fd,
116             io_flag: AtomicBool::new(false),
117             timer: RefCell::new(None),
118             co: AtomicOption::none(),
119         }
120     }
121 
timer_data(&self) -> TimerData122     pub fn timer_data(&self) -> TimerData {
123         TimerData {
124             event_data: self as *const _ as *mut _,
125         }
126     }
127 
128     #[inline]
schedule(&self)129     pub fn schedule(&self) {
130         info!("event schedul");
131         let co = match self.co.take(Ordering::Acquire) {
132             None => return, // it's already take by selector
133             Some(co) => co,
134         };
135 
136         // it's safe to remove the timer since we are running the timer_list in the same thread
137         self.timer.borrow_mut().take().map(|h| {
138             unsafe {
139                 // tell the timer function not to cancel the io
140                 // it's not always true that you can really remove the timer entry
141                 h.with_mut_data(|value| value.data.event_data = ptr::null_mut());
142             }
143             h.remove()
144         });
145 
146         // schedule the coroutine
147         run_coroutine(co);
148     }
149 }
150 
/// The data associated with each file: a shared (`Arc`) handle to that file's
/// [`EventData`].
pub struct IoData(Arc<EventData>);
153 
154 impl IoData {
new<T: AsRawFd + ?Sized>(t: &T) -> Self155     pub fn new<T: AsRawFd + ?Sized>(t: &T) -> Self {
156         let fd = t.as_raw_fd();
157         let event_data = Arc::new(EventData::new(fd));
158         IoData(event_data)
159     }
160 
161     // clear the io flag
162     #[inline]
reset(&self)163     pub fn reset(&self) {
164         self.io_flag.store(false, Ordering::Relaxed);
165     }
166 }
167 
168 impl Deref for IoData {
169     type Target = Arc<EventData>;
170 
deref(&self) -> &Arc<EventData>171     fn deref(&self) -> &Arc<EventData> {
172         &self.0
173     }
174 }
175 
176 impl fmt::Debug for IoData {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result177     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
178         write!(f, "IoData = {{ ... }}")
179     }
180 }
181 
182 impl Drop for IoData {
drop(&mut self)183     fn drop(&mut self) {
184         del_socket(self);
185     }
186 }
187 
// NOTE(review): `IoData` only wraps `Arc<EventData>`, and `EventData` is
// declared `Send + Sync` in this file, so this manual impl looks redundant
// with the auto trait - confirm before removing it.
unsafe impl Send for IoData {}
189