@cython.no_gc_clear
@cython.freelist(DEFAULT_FREELIST_SIZE)
cdef class _UDPSendContext:
    # Holds the per-request state of a single uv_udp_send() call: the
    # libuv request struct, the buffer being sent, and a reference to
    # the transport that issued the send.

    cdef:
        uv.uv_udp_send_t   req

        uv.uv_buf_t     uv_buf
        Py_buffer       py_buf

        UDPTransport    udp

        bint            closed

    cdef close(self):
        # Release everything acquired in new().  Calling close() more
        # than once is a no-op.
        if self.closed:
            return

        self.closed = 1
        PyBuffer_Release(&self.py_buf)  # void
        self.req.data = NULL
        self.uv_buf.base = NULL
        # Balances the Py_INCREF done in new() on behalf of req.data.
        Py_DECREF(self)
        self.udp = None

    @staticmethod
    cdef _UDPSendContext new(UDPTransport udp, object data):
        # Build a send context for `data` owned by `udp`.
        #
        # The returned context carries one extra reference to itself
        # (stored as a raw pointer in req.data); close() drops it.
        cdef _UDPSendContext ctx
        ctx = _UDPSendContext.__new__(_UDPSendContext)
        ctx.udp = None
        # Stay "closed" until fully initialized, so that an early failure
        # never makes close()/__dealloc__ touch half-initialized state.
        ctx.closed = 1

        # Acquire the buffer *before* taking the extra reference below:
        # if this call fails the context is never handed to the caller,
        # nobody would call close() on it, and an already-taken
        # Py_INCREF would therefore never be balanced.
        PyObject_GetBuffer(data, &ctx.py_buf, PyBUF_SIMPLE)
        ctx.uv_buf.base = <char*>ctx.py_buf.buf
        ctx.uv_buf.len = ctx.py_buf.len

        ctx.req.data = <void*> ctx
        Py_INCREF(ctx)

        ctx.udp = udp

        ctx.closed = 0
        return ctx

    def __dealloc__(self):
        if UVLOOP_DEBUG:
            if not self.closed:
                raise RuntimeError(
                    'open _UDPSendContext is being deallocated')
        self.udp = None


@cython.no_gc_clear
cdef class UDPTransport(UVBaseTransport):
    def __cinit__(self):
        self._family = uv.AF_UNSPEC
        self.__receiving = 0
        self._address = None
        self.context = Context_CopyCurrent()

    cdef _init(self, Loop loop, unsigned int family):
        # Allocate and initialize the underlying uv_udp_t handle.
        cdef int err

        self._start_init(loop)

        self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_udp_t))
        if self._handle is NULL:
            self._abort_init()
            raise MemoryError()

        err = uv.uv_udp_init_ex(loop.uvloop,
                                <uv.uv_udp_t*>self._handle,
                                family)
        if err < 0:
            self._abort_init()
            raise convert_error(err)

        # Any other family value leaves self._family at AF_UNSPEC (set in
        # __cinit__) until open() records one.
        if family in (uv.AF_INET, uv.AF_INET6):
            self._family = family

        self._finish_init()

    cdef _set_address(self, system.addrinfo *addr):
        # Remember the peer address; sendto() validates against it.
        self._address = __convert_sockaddr_to_pyaddr(addr.ai_addr)

    cdef _connect(self, system.sockaddr* addr, size_t addr_len):
        # NOTE: addr_len is not used by this method.
        cdef int err
        err = uv.uv_udp_connect(<uv.uv_udp_t*>self._handle, addr)
        if err < 0:
            exc = convert_error(err)
            raise exc

    cdef open(self, int family, int sockfd):
        # Attach an existing socket file descriptor to the handle.
        if family in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX):
            self._family = family
        else:
            raise ValueError(
                'cannot open a UDP handle, invalid family {}'.format(family))

        cdef int err
        err = uv.uv_udp_open(<uv.uv_udp_t*>self._handle,
                             <uv.uv_os_sock_t>sockfd)

        if err < 0:
            exc = convert_error(err)
            raise exc

    cdef _bind(self, system.sockaddr* addr):
        cdef:
            int err
            int flags = 0

        self._ensure_alive()

        err = uv.uv_udp_bind(<uv.uv_udp_t*>self._handle, addr, flags)
        if err < 0:
            exc = convert_error(err)
            raise exc

    cdef _set_broadcast(self, bint on):
        cdef int err

        self._ensure_alive()

        err = uv.uv_udp_set_broadcast(<uv.uv_udp_t*>self._handle, on)
        if err < 0:
            exc = convert_error(err)
            raise exc

    cdef size_t _get_write_buffer_size(self):
        # Reports the handle's send_queue_size, or 0 when there is no
        # handle.
        if self._handle is NULL:
            return 0
        return (<uv.uv_udp_t*>self._handle).send_queue_size

    cdef bint _is_reading(self):
        return self.__receiving

    cdef _start_reading(self):
        cdef int err

        if self.__receiving:
            return

        self._ensure_alive()

        err = uv.uv_udp_recv_start(<uv.uv_udp_t*>self._handle,
                                   __loop_alloc_buffer,
                                   __uv_udp_on_receive)

        if err < 0:
            exc = convert_error(err)
            self._fatal_error(exc, True)
            return
        else:
            # UDPTransport must live until the read callback is called
            self.__receiving_started()

    cdef _stop_reading(self):
        cdef int err

        if not self.__receiving:
            return

        self._ensure_alive()

        err = uv.uv_udp_recv_stop(<uv.uv_udp_t*>self._handle)
        if err < 0:
            exc = convert_error(err)
            self._fatal_error(exc, True)
            return
        else:
            self.__receiving_stopped()

    cdef inline __receiving_started(self):
        # Mark the transport as receiving and take a self-reference that
        # __receiving_stopped() releases.
        if self.__receiving:
            return
        self.__receiving = 1
        Py_INCREF(self)

    cdef inline __receiving_stopped(self):
        # Inverse of __receiving_started(); safe to call when not
        # receiving.
        if not self.__receiving:
            return
        self.__receiving = 0
        Py_DECREF(self)

    cdef _new_socket(self):
        if self._family not in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX):
            raise RuntimeError(
                'UDPTransport.family is undefined; '
                'cannot create python socket')

        fileno = self._fileno()
        return PseudoSocket(self._family, uv.SOCK_DGRAM, 0, fileno)

    cdef _send(self, object data, object addr):
        # Send `data` to `addr` (or to the connected peer when `addr` is
        # None).  A synchronous uv_udp_try_send() is attempted first when
        # nothing is queued; otherwise, or when it reports UV_EAGAIN, the
        # datagram is queued with uv_udp_send().
        cdef:
            _UDPSendContext ctx
            system.sockaddr_storage saddr_st
            system.sockaddr *saddr
            Py_buffer       try_pybuf
            uv.uv_buf_t     try_uvbuf

        self._ensure_alive()

        if self._family not in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX):
            raise RuntimeError('UDPTransport.family is undefined; cannot send')

        if addr is None:
            saddr = NULL
        else:
            try:
                __convert_pyaddr_to_sockaddr(self._family, addr,
                                             <system.sockaddr*>&saddr_st)
            except (ValueError, TypeError):
                raise
            except Exception:
                raise ValueError(
                    f'{addr!r}: socket family mismatch or '
                    f'a DNS lookup is required')
            saddr = <system.sockaddr*>(&saddr_st)

        if self._get_write_buffer_size() == 0:
            # Nothing is queued: try to push the datagram out right away.
            PyObject_GetBuffer(data, &try_pybuf, PyBUF_SIMPLE)
            try_uvbuf.base = <char*>try_pybuf.buf
            try_uvbuf.len = try_pybuf.len
            err = uv.uv_udp_try_send(<uv.uv_udp_t*>self._handle,
                                     &try_uvbuf,
                                     1,
                                     saddr)
            PyBuffer_Release(&try_pybuf)
        else:
            # Datagrams are already queued; go straight to the queued
            # path below.
            err = uv.UV_EAGAIN

        if err == uv.UV_EAGAIN:
            ctx = _UDPSendContext.new(self, data)
            err = uv.uv_udp_send(&ctx.req,
                                 <uv.uv_udp_t*>self._handle,
                                 &ctx.uv_buf,
                                 1,
                                 saddr,
                                 __uv_udp_on_send)

            if err < 0:
                # The request was not accepted, so the send callback will
                # not release the context for us.
                ctx.close()

                exc = convert_error(err)
                self._fatal_error(exc, True)
            else:
                self._maybe_pause_protocol()

        else:
            if err < 0:
                exc = convert_error(err)
                self._fatal_error(exc, True)
            else:
                self._on_sent(None, self.context.copy())

    cdef _on_receive(self, bytes data, object exc, object addr):
        # Dispatch a received datagram, or a receive error, to the
        # protocol inside the transport's context.
        if exc is None:
            run_in_context2(
                self.context, self._protocol.datagram_received, data, addr,
            )
        else:
            run_in_context1(self.context, self._protocol.error_received, exc)

    cdef _on_sent(self, object exc, object context=None):
        # Called after a send completes: `exc` is None on success.
        # OSError instances are reported to protocol.error_received();
        # any other exception is treated as fatal for the transport.
        if exc is not None:
            if isinstance(exc, OSError):
                if context is None:
                    context = self.context
                run_in_context1(context, self._protocol.error_received, exc)
            else:
                self._fatal_error(
                    exc, False, 'Fatal write error on datagram transport')

        self._maybe_resume_protocol()
        if not self._get_write_buffer_size():
            if self._closing:
                self._schedule_call_connection_lost(None)

    # === Public API ===

    def sendto(self, data, addr=None):
        if not data:
            # Replicating asyncio logic here.
            return

        if self._address:
            if addr not in (None, self._address):
                # Replicating asyncio logic here.
                raise ValueError(
                    'Invalid address: must be None or %s' % (self._address,))

            # Instead of setting addr to self._address below like what asyncio
            # does, we depend on previous uv_udp_connect() to set the address
            addr = None

        if self._conn_lost:
            # Replicating asyncio logic here.
            if self._conn_lost >= LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                aio_logger.warning('socket.send() raised exception.')
            self._conn_lost += 1
            return

        self._send(data, addr)


cdef void __uv_udp_on_receive(uv.uv_udp_t* handle,
                              ssize_t nread,
                              const uv.uv_buf_t* buf,
                              const system.sockaddr* addr,
                              unsigned flags) with gil:
    # libuv receive callback registered by UDPTransport._start_reading().

    if __ensure_handle_data(<uv.uv_handle_t*>handle,
                            "UDPTransport receive callback") == 0:
        return

    cdef:
        UDPTransport udp = <UDPTransport>handle.data
        Loop loop = udp._loop
        bytes data
        object pyaddr

    # It's OK to free the buffer early, since nothing will
    # be able to touch it until this method is done.
    __loop_free_buffer(loop)

    if udp._closed:
        # The handle was closed, there is no reason to
        # do any work now.
        udp.__receiving_stopped()  # Just in case.
        return

    if addr is NULL and nread == 0:
        # From libuv docs:
        #      addr: struct sockaddr* containing the address
        #      of the sender. Can be NULL. Valid for the duration
        #      of the callback only.
        #      [...]
        #      The receive callback will be called with
        #      nread == 0 and addr == NULL when there is
        #      nothing to read, and with nread == 0 and
        #      addr != NULL when an empty UDP packet is
        #      received.
        return

    if addr is NULL:
        pyaddr = None
    elif addr.sa_family == uv.AF_UNSPEC:
        # https://github.com/MagicStack/uvloop/issues/304
        IF UNAME_SYSNAME == "Linux":
            pyaddr = None
        ELSE:
            pyaddr = ''
    else:
        try:
            pyaddr = __convert_sockaddr_to_pyaddr(addr)
        except BaseException as exc:
            udp._error(exc, False)
            return

    if nread < 0:
        exc = convert_error(nread)
        # Guard the protocol call the same way as the data path below:
        # this is a `void` C callback, so an exception escaping from
        # here has no caller to propagate to.
        try:
            udp._on_receive(None, exc, pyaddr)
        except BaseException as ex:
            udp._error(ex, False)
        return

    if nread == 0:
        data = b''
    else:
        data = loop._recv_buffer[:nread]

    try:
        udp._on_receive(data, None, pyaddr)
    except BaseException as exc:
        udp._error(exc, False)


cdef void __uv_udp_on_send(uv.uv_udp_send_t* req, int status) with gil:
    # libuv send callback passed to uv_udp_send() by UDPTransport._send().

    if req.data is NULL:
        # Shouldn't happen as:
        #    - _UDPSendContext does an extra INCREF in its 'new()'
        #    - _UDPSendContext holds a ref to the relevant UDPTransport
        aio_logger.error(
            'UDPTransport send callback called with NULL req.data, status=%r',
            status)
        return

    cdef:
        _UDPSendContext ctx = <_UDPSendContext> req.data
        UDPTransport udp = <UDPTransport>ctx.udp

    # `udp` was captured above; close() clears ctx.udp and drops the
    # context's self-reference.
    ctx.close()

    if status < 0:
        exc = convert_error(status)
    else:
        exc = None

    try:
        udp._on_sent(exc)
    except BaseException as exc:
        udp._error(exc, False)