# UVPoll wraps a libuv ``uv_poll_t`` for one file descriptor.  It holds at
# most one read callback (``reading_handle``) and one write callback
# (``writing_handle``); a value of None means "not registered".
@cython.no_gc_clear
cdef class UVPoll(UVHandle):
    cdef _init(self, Loop loop, int fd):
        # Allocate and initialise the underlying uv_poll_t for `fd` on `loop`.
        #
        # Raises MemoryError if the allocation fails, or the exception
        # produced by convert_error() if uv_poll_init() returns an error.
        # In both failure paths _abort_init() is called before raising.
        cdef int err

        self._start_init(loop)

        self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_poll_t))
        if self._handle is NULL:
            self._abort_init()
            raise MemoryError()

        err = uv.uv_poll_init(self._loop.uvloop,
                              <uv.uv_poll_t *>self._handle, fd)
        if err < 0:
            self._abort_init()
            raise convert_error(err)

        self._finish_init()

        # Remember the descriptor (used by _poll_stop for the epoll cleanup)
        # and start with no callbacks registered.
        self.fd = fd
        self.reading_handle = None
        self.writing_handle = None

    @staticmethod
    cdef UVPoll new(Loop loop, int fd):
        # Factory: create an instance via __new__ and run _init() on it.
        # Any exception raised by _init() propagates to the caller.
        cdef UVPoll handle
        handle = UVPoll.__new__(UVPoll)
        handle._init(loop, fd)
        return handle

    cdef int is_active(self):
        # True (non-zero) when a read or a write callback is registered.
        # Unlike is_reading()/is_writing(), this does not check _is_alive().
        return (self.reading_handle is not None or
                self.writing_handle is not None)

    cdef inline _poll_start(self, int flags):
        # (Re)arm the poll with the event mask `flags`; events are delivered
        # to __on_uvpoll_event.  Callers in this class also use it to change
        # the mask of an already-started poll.
        cdef int err

        # NOTE(review): presumably raises if the handle is already closed;
        # _ensure_alive() is defined outside this file section -- confirm.
        self._ensure_alive()

        err = uv.uv_poll_start(
            <uv.uv_poll_t*>self._handle,
            flags,
            __on_uvpoll_event)

        if err < 0:
            # The converted error is handed to _fatal_error(); note that the
            # _fatal_error() defined in this class does not use `exc`.
            exc = convert_error(err)
            self._fatal_error(exc, True)
            return

    cdef inline _poll_stop(self):
        # Stop polling the descriptor.  Does nothing if the handle is not
        # alive.  On Linux, additionally removes the fd from the loop's
        # backend epoll instance (see the comment below).
        cdef int err

        if not self._is_alive():
            return

        err = uv.uv_poll_stop(<uv.uv_poll_t*>self._handle)
        if err < 0:
            exc = convert_error(err)
            self._fatal_error(exc, True)
            return

        cdef:
            int backend_id
            system.epoll_event dummy_event

        if system.PLATFORM_IS_LINUX:
            # libuv doesn't remove the FD from epoll immediately
            # after uv_poll_stop or uv_poll_close, causing a
            # hard-to-debug issue where dup-ed file descriptors
            # burn CPU in epoll/epoll_ctl:
            # https://github.com/MagicStack/uvloop/issues/61
            #
            # It's safe though to manually call epoll_ctl here,
            # after calling uv_poll_stop.

            backend_id = uv.uv_backend_fd(self._loop.uvloop)
            if backend_id != -1:
                # A zeroed placeholder event is passed; the return value of
                # epoll_ctl() is deliberately not inspected.
                memset(&dummy_event, 0, sizeof(dummy_event))
                system.epoll_ctl(
                    backend_id,
                    system.EPOLL_CTL_DEL,
                    self.fd,
                    &dummy_event)  # ignore errors

    cdef is_reading(self):
        # True only if the handle is alive and a read callback is registered.
        return self._is_alive() and self.reading_handle is not None

    cdef is_writing(self):
        # True only if the handle is alive and a write callback is registered.
        return self._is_alive() and self.writing_handle is not None

    cdef start_reading(self, Handle callback):
        # Register `callback` as the read callback.
        #
        # If no read callback was registered, the poll is (re)started with
        # UV_READABLE added to the mask; otherwise the previous read callback
        # is cancelled and simply replaced (the mask already covers reads).
        cdef:
            int mask = 0

        if self.reading_handle is None:
            # not reading right now, setup the handle

            mask = uv.UV_READABLE
            if self.writing_handle is not None:
                # are we writing right now?  keep UV_WRITABLE in the mask
                mask |= uv.UV_WRITABLE

            self._poll_start(mask)
        else:
            self.reading_handle._cancel()

        # Stored unconditionally, i.e. also when _poll_start() took its
        # error path above.
        self.reading_handle = callback

    cdef start_writing(self, Handle callback):
        # Register `callback` as the write callback.  Mirror image of
        # start_reading(): start the poll with UV_WRITABLE (plus UV_READABLE
        # if a read callback exists), or cancel and replace the previous
        # write callback.
        cdef:
            int mask = 0

        if self.writing_handle is None:
            # not writing right now, setup the handle

            mask = uv.UV_WRITABLE
            if self.reading_handle is not None:
                # are we reading right now?  keep UV_READABLE in the mask
                mask |= uv.UV_READABLE

            self._poll_start(mask)
        else:
            self.writing_handle._cancel()

        # Stored unconditionally, i.e. also when _poll_start() took its
        # error path above.
        self.writing_handle = callback

    cdef stop_reading(self):
        # Unregister the read callback.
        #
        # Returns False if no read callback was registered, True otherwise.
        if self.reading_handle is None:
            return False

        self.reading_handle._cancel()
        self.reading_handle = None

        if self.writing_handle is None:
            # Nothing left to watch: stop the poll entirely.
            self.stop()
        else:
            # Still writing: re-arm the poll for write events only.
            self._poll_start(uv.UV_WRITABLE)

        return True

    cdef stop_writing(self):
        # Unregister the write callback.
        #
        # Returns False if no write callback was registered, True otherwise.
        if self.writing_handle is None:
            return False

        self.writing_handle._cancel()
        self.writing_handle = None

        if self.reading_handle is None:
            # Nothing left to watch: stop the poll entirely.
            self.stop()
        else:
            # Still reading: re-arm the poll for read events only.
            self._poll_start(uv.UV_READABLE)

        return True

    cdef stop(self):
        # Cancel and drop both callbacks (if any), then stop polling.
        # The callbacks are cleared before _poll_stop() is called.
        if self.reading_handle is not None:
            self.reading_handle._cancel()
            self.reading_handle = None

        if self.writing_handle is not None:
            self.writing_handle._cancel()
            self.writing_handle = None

        self._poll_stop()

    cdef _close(self):
        # Stop polling first (which cancels registered callbacks) when any
        # callback is registered, then delegate to UVHandle._close().
        if self.is_active():
            self.stop()

        UVHandle._close(<UVHandle>self)

    cdef _fatal_error(self, exc, throw, reason=None):
        # Fatal-error path for this handle.  `exc`, `throw` and `reason` are
        # not used in this method body.
        #
        # Each registered callback is run once and then cleared; exceptions
        # they raise are routed to the loop's _handle_exception().  The
        # handle is closed in the `finally` clause, so it is closed even if
        # something in the `try` body propagates.
        #
        # NOTE(review): presumably the callbacks are run so that they hit the
        # underlying error themselves when they touch the fd -- confirm.
        try:
            if self.reading_handle is not None:
                try:
                    self.reading_handle._run()
                except BaseException as ex:
                    self._loop._handle_exception(ex)
                self.reading_handle = None

            if self.writing_handle is not None:
                try:
                    self.writing_handle._run()
                except BaseException as ex:
                    self._loop._handle_exception(ex)
                self.writing_handle = None

        finally:
            # Both callbacks are None by now on the normal path, so _close()
            # skips stop() and goes straight to UVHandle._close().
            self._close()


# Callback passed to uv_poll_start() by UVPoll._poll_start().  Acquires the
# GIL (``with gil``) before touching Python objects.
#
#   handle -- the uv_poll_t the event belongs to; handle.data is cast to the
#             owning UVPoll object.
#   status -- negative on error; converted with convert_error() and routed
#             to UVPoll._fatal_error().
#   events -- bit mask of UV_READABLE / UV_WRITABLE / UV_DISCONNECT.
cdef void __on_uvpoll_event(uv.uv_poll_t* handle,
                            int status, int events) with gil:

    # NOTE(review): a zero result presumably means handle.data cannot be
    # used; __ensure_handle_data() is defined elsewhere -- confirm.
    if __ensure_handle_data(<uv.uv_handle_t*>handle, "UVPoll callback") == 0:
        return

    cdef:
        UVPoll poll = <UVPoll> handle.data

    if status < 0:
        exc = convert_error(status)
        poll._fatal_error(exc, False)
        return

    # UV_DISCONNECT is treated as both a read and a write event, so it
    # triggers whichever callbacks are registered.
    if ((events & (uv.UV_READABLE | uv.UV_DISCONNECT)) and
            poll.reading_handle is not None):

        try:
            if UVLOOP_DEBUG:
                poll._loop._poll_read_events_total += 1
            poll.reading_handle._run()
        except BaseException as ex:
            if UVLOOP_DEBUG:
                poll._loop._poll_read_cb_errors_total += 1
            poll._error(ex, False)
            # continue code execution

    # poll.writing_handle is read only after the read callback has run, so
    # any change the read callback made to it is observed here.
    if ((events & (uv.UV_WRITABLE | uv.UV_DISCONNECT)) and
            poll.writing_handle is not None):

        try:
            if UVLOOP_DEBUG:
                poll._loop._poll_write_events_total += 1
            poll.writing_handle._run()
        except BaseException as ex:
            if UVLOOP_DEBUG:
                poll._loop._poll_write_cb_errors_total += 1
            poll._error(ex, False)