# Number of uv_buf_t / Py_buffer slots embedded directly in every
# _StreamWriteContext, so that small writes do not need a malloc.
DEF __PREALLOCED_BUFS = 4


@cython.no_gc_clear
@cython.freelist(DEFAULT_FREELIST_SIZE)
cdef class _StreamWriteContext:
    # used to hold additional write request information for uv_write
    #
    # A context keeps the list of Python buffers being written, the
    # uv_buf_t array handed to libuv and the Py_buffer views acquired
    # for non-bytes objects.  `new()` takes an extra reference on the
    # context and `close()` drops it again.

    cdef:
        uv.uv_write_t   req

        list            buffers

        # Embedded arrays used when there are at most
        # __PREALLOCED_BUFS buffers to write.
        uv.uv_buf_t     uv_bufs_sml[__PREALLOCED_BUFS]
        Py_buffer       py_bufs_sml[__PREALLOCED_BUFS]
        bint            py_bufs_sml_inuse

        # Heap-allocated arrays used otherwise (NULL when unused).
        uv.uv_buf_t*    uv_bufs
        Py_buffer*      py_bufs
        size_t          py_bufs_len

        # Window of uv_bufs that still has to be written.
        uv.uv_buf_t*    uv_bufs_start
        size_t          uv_bufs_len

        UVStream        stream

        bint            closed

    cdef free_bufs(self):
        # Release every acquired Py_buffer, free the heap-allocated
        # arrays (if any) and drop the reference to `buffers`.
        cdef size_t i

        if self.uv_bufs is not NULL:
            PyMem_RawFree(self.uv_bufs)
            self.uv_bufs = NULL
            if UVLOOP_DEBUG:
                if self.py_bufs_sml_inuse:
                    raise RuntimeError(
                        '_StreamWriteContext.close: uv_bufs != NULL and '
                        'py_bufs_sml_inuse is True')

        if self.py_bufs is not NULL:
            for i from 0 <= i < self.py_bufs_len:
                PyBuffer_Release(&self.py_bufs[i])
            PyMem_RawFree(self.py_bufs)
            self.py_bufs = NULL
            if UVLOOP_DEBUG:
                if self.py_bufs_sml_inuse:
                    raise RuntimeError(
                        '_StreamWriteContext.close: py_bufs != NULL and '
                        'py_bufs_sml_inuse is True')

        if self.py_bufs_sml_inuse:
            for i from 0 <= i < self.py_bufs_len:
                PyBuffer_Release(&self.py_bufs_sml[i])
            self.py_bufs_sml_inuse = 0

        self.py_bufs_len = 0
        self.buffers = None

    cdef close(self):
        # Idempotent: free the buffers and drop the reference taken
        # in `new()`.
        if self.closed:
            return
        self.closed = 1
        self.free_bufs()
        Py_DECREF(self)

    cdef advance_uv_buf(self, size_t sent):
        # Advance the pointer to first uv_buf and the
        # pointer to first byte in that buffer.
        #
        # We do this after a "uv_try_write" call, which
        # sometimes sends only a portion of data.
        # We then call "advance_uv_buf" on the write
        # context, and reuse it in a "uv_write" call.
        #
        # Raises RuntimeError if `sent` covers all of the remaining
        # buffers (there is nothing left to advance to).

        cdef:
            uv.uv_buf_t* buf
            size_t idx

        for idx from 0 <= idx < self.uv_bufs_len:
            buf = &self.uv_bufs_start[idx]
            if buf.len > sent:
                # `sent` ends inside this buffer: trim its head and
                # make it the new first buffer.
                buf.len -= sent
                buf.base = buf.base + sent
                self.uv_bufs_start = buf
                self.uv_bufs_len -= idx
                return
            else:
                sent -= self.uv_bufs_start[idx].len

        if UVLOOP_DEBUG:
            # NOTE: `sent` is declared as size_t (unsigned), so this
            # condition can never be true.
            if sent < 0:
                raise RuntimeError('fatal: sent < 0 in advance_uv_buf')

        raise RuntimeError('fatal: Could not advance _StreamWriteContext')

    @staticmethod
    cdef _StreamWriteContext new(UVStream stream, list buffers):
        # Build a write context for `buffers` that will be written
        # to `stream`.
        #
        # Exact `bytes` objects are referenced directly; for every
        # other object a Py_buffer is acquired with PyBUF_SIMPLE.
        # On success the returned context holds an extra reference
        # to itself, which is released by `close()`.
        #
        # Raises MemoryError if an array cannot be allocated, and
        # re-raises any exception from PyObject_GetBuffer after
        # releasing whatever was acquired so far.
        cdef:
            _StreamWriteContext ctx
            int uv_bufs_idx = 0
            size_t py_bufs_len = 0

            Py_buffer* p_pybufs
            uv.uv_buf_t* p_uvbufs

        ctx = _StreamWriteContext.__new__(_StreamWriteContext)
        ctx.stream = None
        # Stay "closed" until fully initialized, so that __dealloc__
        # does not complain if we fail half-way through.
        ctx.closed = 1
        ctx.py_bufs_len = 0
        ctx.py_bufs_sml_inuse = 0
        ctx.uv_bufs = NULL
        ctx.py_bufs = NULL
        ctx.buffers = buffers
        ctx.stream = stream

        if len(buffers) <= __PREALLOCED_BUFS:
            # We've got a small number of buffers to write, don't
            # need to use malloc.
            ctx.py_bufs_sml_inuse = 1
            p_pybufs = <Py_buffer*>&ctx.py_bufs_sml
            p_uvbufs = <uv.uv_buf_t*>&ctx.uv_bufs_sml

        else:
            # Count the objects that need a Py_buffer.
            for buf in buffers:
                if UVLOOP_DEBUG:
                    if not isinstance(buf, (bytes, bytearray, memoryview)):
                        raise RuntimeError(
                            'invalid data in writebuf: an instance of '
                            'bytes, bytearray or memoryview was expected, '
                            'got {}'.format(type(buf)))

                if not PyBytes_CheckExact(buf):
                    py_bufs_len += 1

            if py_bufs_len > 0:
                ctx.py_bufs = <Py_buffer*>PyMem_RawMalloc(
                    py_bufs_len * sizeof(Py_buffer))
                if ctx.py_bufs is NULL:
                    raise MemoryError()

            ctx.uv_bufs = <uv.uv_buf_t*>PyMem_RawMalloc(
                len(buffers) * sizeof(uv.uv_buf_t))
            if ctx.uv_bufs is NULL:
                # Don't leak `ctx.py_bufs` allocated above.  No
                # Py_buffer has been acquired yet and
                # `ctx.py_bufs_len` is still 0, so `free_bufs()` only
                # frees the raw array.
                ctx.free_bufs()
                raise MemoryError()

            p_pybufs = ctx.py_bufs
            p_uvbufs = ctx.uv_bufs

        py_bufs_len = 0
        for buf in buffers:
            if PyBytes_CheckExact(buf):
                # We can only use this hack for bytes since it's
                # immutable.  For everything else it is only safe to
                # use buffer protocol.
                p_uvbufs[uv_bufs_idx].base = PyBytes_AS_STRING(buf)
                p_uvbufs[uv_bufs_idx].len = Py_SIZE(buf)

            else:
                try:
                    PyObject_GetBuffer(
                        buf, &p_pybufs[py_bufs_len], PyBUF_SIMPLE)
                except Exception:
                    # This shouldn't ever happen, as `UVStream._write`
                    # casts non-bytes objects to `memoryviews`.
                    ctx.py_bufs_len = py_bufs_len
                    ctx.free_bufs()
                    raise

                p_uvbufs[uv_bufs_idx].base = <char*>p_pybufs[py_bufs_len].buf
                p_uvbufs[uv_bufs_idx].len = p_pybufs[py_bufs_len].len

                py_bufs_len += 1

            uv_bufs_idx += 1

        ctx.uv_bufs_start = p_uvbufs
        ctx.uv_bufs_len = uv_bufs_idx

        ctx.py_bufs_len = py_bufs_len
        ctx.req.data = <void*> ctx

        if UVLOOP_DEBUG:
            stream._loop._debug_stream_write_ctx_total += 1
            stream._loop._debug_stream_write_ctx_cnt += 1

        # Do incref after everything else is done.
        # Under no circumstances we want `ctx` to be GCed while
        # libuv is still working with `ctx.uv_bufs`.
        Py_INCREF(ctx)
        ctx.closed = 0
        return ctx

    def __dealloc__(self):
        if not self.closed:
            # Because we do an INCREF in _StreamWriteContext.new,
            # __dealloc__ shouldn't ever happen with `self.closed == 0`
            raise RuntimeError(
                'open _StreamWriteContext is being deallocated')

        if UVLOOP_DEBUG:
            if self.stream is not None:
                self.stream._loop._debug_stream_write_ctx_cnt -= 1
                self.stream = None


@cython.no_gc_clear
cdef class UVStream(UVBaseTransport):
    # Stream transport built on top of a libuv uv_stream_t handle:
    # manages read start/stop, write buffering and shutdown.

    def __cinit__(self):
        self.__shutting_down = 0
        self.__reading = 0
        self.__read_error_close = 0
        self.__buffered = 0
        self._eof = 0
        self._buffer = []
        self._buffer_size = 0

        self._protocol_get_buffer = None
        self._protocol_buffer_updated = None

        self._read_pybuf_acquired = False

    cdef _set_protocol(self, object protocol):
        # Install `protocol` and decide whether the buffered
        # (get_buffer/buffer_updated) read path is used.
        #
        # Raises TypeError if `protocol` is None.
        if protocol is None:
            raise TypeError('protocol is required')

        UVBaseTransport._set_protocol(self, protocol)

        if (hasattr(protocol, 'get_buffer') and
                not isinstance(protocol, aio_Protocol)):
            try:
                self._protocol_get_buffer = protocol.get_buffer
                self._protocol_buffer_updated = protocol.buffer_updated
                self.__buffered = 1
            except AttributeError:
                # `buffer_updated` is missing: `__buffered` is left
                # unchanged.
                pass
        else:
            self.__buffered = 0

    cdef _clear_protocol(self):
        # Drop the protocol and the cached buffered-read callbacks.
        UVBaseTransport._clear_protocol(self)
        self._protocol_get_buffer = None
        self._protocol_buffer_updated = None
        self.__buffered = 0

    cdef inline _shutdown(self):
        # Issue uv_shutdown() on the handle, at most once.
        cdef int err

        if self.__shutting_down:
            return
        self.__shutting_down = 1

        self._ensure_alive()

        self._shutdown_req.data = <void*> self
        err = uv.uv_shutdown(&self._shutdown_req,
                             <uv.uv_stream_t*> self._handle,
                             __uv_stream_on_shutdown)
        if err < 0:
            exc = convert_error(err)
            self._fatal_error(exc, True)
            return

    cdef inline _accept(self, UVStream server):
        # Accept a pending connection of `server` into this stream
        # and call `_on_accept()` on success.
        cdef int err
        self._ensure_alive()

        err = uv.uv_accept(<uv.uv_stream_t*>server._handle,
                           <uv.uv_stream_t*>self._handle)
        if err < 0:
            exc = convert_error(err)
            self._fatal_error(exc, True)
            return

        self._on_accept()

    cdef inline _close_on_read_error(self):
        # Make read errors be handled like EOF (see the
        # `__read_error_close` check in __uv_stream_on_read_common).
        self.__read_error_close = 1

    cdef bint _is_reading(self):
        return self.__reading

    cdef _start_reading(self):
        # Start reading from the handle; no-op if the transport is
        # closing or is already reading.
        cdef int err

        if self._closing:
            return

        self._ensure_alive()

        if self.__reading:
            return

        if self.__buffered:
            err = uv.uv_read_start(<uv.uv_stream_t*>self._handle,
                                   __uv_stream_buffered_alloc,
                                   __uv_stream_buffered_on_read)
        else:
            err = uv.uv_read_start(<uv.uv_stream_t*>self._handle,
                                   __loop_alloc_buffer,
                                   __uv_stream_on_read)
        if err < 0:
            exc = convert_error(err)
            self._fatal_error(exc, True)
            return
        else:
            # UVStream must live until the read callback is called
            self.__reading_started()

    cdef inline __reading_started(self):
        # Mark the stream as reading and take a reference to it.
        if self.__reading:
            return
        self.__reading = 1
        Py_INCREF(self)

    cdef inline __reading_stopped(self):
        # Mark the stream as not reading and drop the reference
        # taken in `__reading_started()`.
        if not self.__reading:
            return
        self.__reading = 0
        Py_DECREF(self)

    cdef _stop_reading(self):
        # Stop reading from the handle; no-op if not reading.
        cdef int err

        if not self.__reading:
            return

        self._ensure_alive()

        # From libuv docs:
        #    This function is idempotent and may be safely
        #    called on a stopped stream.
        err = uv.uv_read_stop(<uv.uv_stream_t*>self._handle)
        if err < 0:
            exc = convert_error(err)
            self._fatal_error(exc, True)
            return
        else:
            self.__reading_stopped()

    cdef inline _try_write(self, object data):
        # Try to write `data` synchronously with write(2).
        #
        # Returns:
        #   0     -- all of `data` was written (or `data` is empty);
        #   n > 0 -- only the first `n` bytes were written;
        #   -1    -- nothing was written (EAGAIN / EWOULDBLOCK);
        #   None  -- an error occurred and `_fatal_error` was called.
        #
        # Raises RuntimeError if libuv still has queued data for
        # this stream.
        cdef:
            ssize_t written
            bint used_buf = 0
            Py_buffer py_buf
            void* buf
            size_t blen
            int saved_errno
            int fd

        if (<uv.uv_stream_t*>self._handle).write_queue_size != 0:
            raise RuntimeError(
                'UVStream._try_write called with data in uv buffers')

        if PyBytes_CheckExact(data):
            # We can only use this hack for bytes since it's
            # immutable.  For everything else it is only safe to
            # use buffer protocol.
            buf = <void*>PyBytes_AS_STRING(data)
            blen = Py_SIZE(data)
        else:
            PyObject_GetBuffer(data, &py_buf, PyBUF_SIMPLE)
            used_buf = 1
            buf = py_buf.buf
            blen = py_buf.len

        if blen == 0:
            # Empty data, do nothing -- but don't leak the buffer
            # view if we have acquired one.
            if used_buf:
                PyBuffer_Release(&py_buf)
            return 0

        fd = self._fileno()
        # Use `unistd.h/write` directly, it's faster than
        # uv_try_write -- less layers of code.  The error
        # checking logic is copied from libuv.
        written = system.write(fd, buf, blen)
        while written == -1 and (
                errno.errno == errno.EINTR or
                (system.PLATFORM_IS_APPLE and
                    errno.errno == errno.EPROTOTYPE)):
            # From libuv code (unix/stream.c):
            #   Due to a possible kernel bug at least in OS X 10.10 "Yosemite",
            #   EPROTOTYPE can be returned while trying to write to a socket
            #   that is shutting down. If we retry the write, we should get
            #   the expected EPIPE instead.
            written = system.write(fd, buf, blen)
        # Save errno right away, before any other call gets a
        # chance to overwrite it.
        saved_errno = errno.errno

        if used_buf:
            PyBuffer_Release(&py_buf)

        if written < 0:
            if saved_errno == errno.EAGAIN or \
                    saved_errno == system.EWOULDBLOCK:
                return -1
            else:
                exc = convert_error(-saved_errno)
                self._fatal_error(exc, True)
                return

        if UVLOOP_DEBUG:
            self._loop._debug_stream_write_tries += 1

        if <size_t>written == blen:
            return 0

        return written

    cdef inline _write(self, object data):
        # Append `data` to the write buffer and either flush it
        # right away (fast path) or queue a write on the loop.
        cdef int dlen

        if not PyBytes_CheckExact(data):
            data = memoryview(data).cast('b')

        dlen = len(data)
        if not dlen:
            return

        self._buffer_size += dlen
        self._buffer.append(data)

        if (not self._protocol_paused and
                (<uv.uv_stream_t*>self._handle).write_queue_size == 0 and
                self._buffer_size > self._high_water):
            # Fast-path.  If:
            #   - the protocol isn't yet paused,
            #   - there is no data in libuv buffers for this stream,
            #   - the protocol will be paused if we continue to buffer data
            #
            # Then:
            #   - Try to write all buffered data right now.
            all_sent = self._exec_write()
            if UVLOOP_DEBUG:
                if self._buffer_size != 0 or self._buffer != []:
                    raise RuntimeError(
                        '_buffer_size is not 0 after a successful _exec_write')

            # There is no need to call `_queue_write` anymore,
            # as `uv_write` should be called already.

            if not all_sent:
                # If not all of the data was sent successfully,
                # we might need to pause the protocol.
                self._maybe_pause_protocol()
            return

        self._maybe_pause_protocol()
        self._loop._queue_write(self)

    cdef inline _exec_write(self):
        # Flush `self._buffer`.
        #
        # First tries a synchronous write (when libuv has nothing
        # queued for this stream); whatever could not be written is
        # handed over to `uv_write`.
        #
        # Returns True if all buffered data was written
        # synchronously, None otherwise.
        cdef:
            int err
            int buf_len
            _StreamWriteContext ctx = None

        if self._closed:
            # If the handle is closed, just return, it's too
            # late to do anything.
            return

        buf_len = len(self._buffer)
        if not buf_len:
            return

        if (<uv.uv_stream_t*>self._handle).write_queue_size == 0:
            # libuv internal write buffers for this stream are empty.
            if buf_len == 1:
                # If we only have one piece of data to send, let's
                # use our fast implementation of try_write.
                data = self._buffer[0]
                sent = self._try_write(data)

                if sent is None:
                    # A `self._fatal_error` was called.
                    # It might not raise an exception under some
                    # conditions.
                    self._buffer_size = 0
                    self._buffer.clear()
                    if not self._closing:
                        # This should never happen.
                        raise RuntimeError(
                            'stream is open after UVStream._try_write '
                            'returned None')
                    return

                if sent == 0:
                    # All data was successfully written.
                    self._buffer_size = 0
                    self._buffer.clear()
                    # on_write will call "maybe_resume_protocol".
                    self._on_write()
                    return True

                if sent > 0:
                    if UVLOOP_DEBUG:
                        if sent == len(data):
                            raise RuntimeError(
                                '_try_write sent all data and returned '
                                'non-zero')

                    if PyBytes_CheckExact(data):
                        # Cast bytes to memoryview to avoid copying
                        # data that wasn't sent.
                        data = memoryview(data)
                    data = data[sent:]

                    self._buffer_size -= sent
                    self._buffer[0] = data

                # At this point it's either data was sent partially,
                # or an EAGAIN has happened.

            else:
                ctx = _StreamWriteContext.new(self, self._buffer)

                err = uv.uv_try_write(<uv.uv_stream_t*>self._handle,
                                      ctx.uv_bufs_start,
                                      ctx.uv_bufs_len)

                if err > 0:
                    # Some data was successfully sent.

                    if <size_t>err == self._buffer_size:
                        # Everything was sent.
                        ctx.close()
                        self._buffer.clear()
                        self._buffer_size = 0
                        # on_write will call "maybe_resume_protocol".
                        self._on_write()
                        return True

                    try:
                        # Advance pointers to uv_bufs in `ctx`,
                        # we will reuse it soon for a uv_write
                        # call.
                        ctx.advance_uv_buf(<ssize_t>err)
                    except Exception as ex:  # This should never happen.
                        # Let's try to close the `ctx` anyways.
                        ctx.close()
                        self._fatal_error(ex, True)
                        self._buffer.clear()
                        self._buffer_size = 0
                        return

                elif err != uv.UV_EAGAIN:
                    ctx.close()
                    exc = convert_error(err)
                    self._fatal_error(exc, True)
                    self._buffer.clear()
                    self._buffer_size = 0
                    return

                # fall through

        if ctx is None:
            ctx = _StreamWriteContext.new(self, self._buffer)

        err = uv.uv_write(&ctx.req,
                          <uv.uv_stream_t*>self._handle,
                          ctx.uv_bufs_start,
                          ctx.uv_bufs_len,
                          __uv_stream_on_write)

        self._buffer_size = 0
        # Can't use `_buffer.clear()` here: `ctx` holds a reference to
        # the `_buffer`.
        self._buffer = []

        if err < 0:
            # close write context
            ctx.close()

            exc = convert_error(err)
            self._fatal_error(exc, True)
            return

        self._maybe_resume_protocol()

    cdef size_t _get_write_buffer_size(self):
        # Bytes queued in libuv plus bytes still held in `_buffer`;
        # 0 if there is no handle.
        if self._handle is NULL:
            return 0
        return ((<uv.uv_stream_t*>self._handle).write_queue_size +
                self._buffer_size)

    cdef _close(self):
        # Release a still-acquired read buffer (reporting it to the
        # loop's exception handler), stop reading and close the
        # underlying handle.
        try:
            if self._read_pybuf_acquired:
                # Should never happen. libuv always calls uv_alloc/uv_read
                # in pairs.
                self._loop.call_exception_handler({
                    'transport': self,
                    'message': 'XXX: an allocated buffer in transport._close()'
                })
                self._read_pybuf_acquired = 0
                PyBuffer_Release(&self._read_pybuf)

            self._stop_reading()
        finally:
            UVSocketHandle._close(<UVHandle>self)

    cdef inline _on_accept(self):
        # Ultimately called by __uv_stream_on_listen.
        self._init_protocol()

    cdef inline _on_eof(self):
        # Any exception raised here will be caught in
        # __uv_stream_on_read.

        try:
            meth = self._protocol.eof_received
        except AttributeError:
            keep_open = False
        else:
            keep_open = run_in_context(self.context, meth)

        if keep_open:
            # We're keeping the connection open so the
            # protocol can write more, but we still can't
            # receive more, so remove the reader callback.
            self._stop_reading()
        else:
            self.close()

    cdef inline _on_write(self):
        # Called once a write has completed: resume the protocol if
        # needed and, when nothing is left to write, finish a pending
        # close or write_eof().
        self._maybe_resume_protocol()
        if not self._get_write_buffer_size():
            if self._closing:
                self._schedule_call_connection_lost(None)
            elif self._eof:
                self._shutdown()

    cdef inline _init(self, Loop loop, object protocol, Server server,
                      object waiter, object context):
        # Common initialization: context, protocol, loop and the
        # optional server and waiter.
        self.context = context
        self._set_protocol(protocol)
        self._start_init(loop)

        if server is not None:
            self._set_server(server)

        if waiter is not None:
            self._set_waiter(waiter)

    cdef inline _on_connect(self, object exc):
        # Called from __tcp_connect_callback (tcp.pyx) and
        # __pipe_connect_callback (pipe.pyx).
        if exc is None:
            self._init_protocol()
        else:
            if self._waiter is None:
                self._fatal_error(exc, False, "connect failed")
            elif self._waiter.cancelled():
                # Connect call was cancelled; just close the transport
                # silently.
                self._close()
            elif self._waiter.done():
                self._fatal_error(exc, False, "connect failed")
            else:
                self._waiter.set_exception(exc)
                self._close()

    # === Public API ===

    def __repr__(self):
        return '<{} closed={} reading={} {:#x}>'.format(
            self.__class__.__name__,
            self._closed,
            self.__reading,
            id(self))

    def write(self, object buf):
        # Write `buf` to the stream.  Empty data is ignored; once the
        # connection is lost the data is dropped and `_conn_lost` is
        # incremented.  Raises RuntimeError after write_eof().
        self._ensure_alive()

        if self._eof:
            raise RuntimeError('Cannot call write() after write_eof()')
        if not buf:
            return
        if self._conn_lost:
            self._conn_lost += 1
            return
        self._write(buf)

    def writelines(self, bufs):
        # Write every item of `bufs`, in order.  Raises RuntimeError
        # after write_eof().
        self._ensure_alive()

        if self._eof:
            raise RuntimeError('Cannot call writelines() after write_eof()')
        if self._conn_lost:
            self._conn_lost += 1
            return
        for buf in bufs:
            self._write(buf)

    def write_eof(self):
        # Mark the write side as finished.  The shutdown is issued
        # right away if nothing is pending, otherwise from
        # `_on_write()` once the pending data has been written.
        self._ensure_alive()

        if self._eof:
            return

        self._eof = 1
        if not self._get_write_buffer_size():
            self._shutdown()

    def can_write_eof(self):
        return True

    def is_reading(self):
        return self._is_reading()

    def pause_reading(self):
        if self._closing or not self._is_reading():
            return
        self._stop_reading()

    def resume_reading(self):
        if self._is_reading() or self._closing:
            return
        self._start_reading()


cdef void __uv_stream_on_shutdown(uv.uv_shutdown_t* req,
                                  int status) with gil:

    # callback for uv_shutdown

    if req.data is NULL:
        aio_logger.error(
            'UVStream.shutdown callback called with NULL req.data, status=%r',
            status)
        return

    cdef UVStream stream = <UVStream> req.data

    if status < 0 and status != uv.UV_ECANCELED:
        # From libuv source code:
        #     The ECANCELED error code is a lie, the shutdown(2) syscall is a
        #     fait accompli at this point. Maybe we should revisit this in
        #     v0.11.  A possible reason for leaving it unchanged is that it
        #     informs the callee that the handle has been destroyed.

        if UVLOOP_DEBUG:
            stream._loop._debug_stream_shutdown_errors_total += 1

        exc = convert_error(status)
        stream._fatal_error(
            exc, False, "error status in uv_stream_t.shutdown callback")
        return


cdef inline bint __uv_stream_on_read_common(UVStream sc, Loop loop,
                                            ssize_t nread):
    # Handle the cases shared by both read callbacks: closed stream,
    # EOF, nread == 0 and read errors.
    #
    # Returns True if the event was fully handled here, False if the
    # caller should deliver `nread` bytes to the protocol.
    if sc._closed:
        # The stream was closed, there is no reason to
        # do any work now.
        sc.__reading_stopped()  # Just in case.
        return True

    if nread == uv.UV_EOF:
        # From libuv docs:
        #     The callee is responsible for stopping/closing the stream
        #     when an error happens by calling uv_read_stop() or uv_close().
        #     Trying to read from the stream again is undefined.
        try:
            if UVLOOP_DEBUG:
                loop._debug_stream_read_eof_total += 1

            sc._stop_reading()
            sc._on_eof()
        except BaseException as ex:
            if UVLOOP_DEBUG:
                loop._debug_stream_read_eof_cb_errors_total += 1

            sc._fatal_error(ex, False)
        finally:
            return True

    if nread == 0:
        # From libuv docs:
        #     nread might be 0, which does not indicate an error or EOF.
        #     This is equivalent to EAGAIN or EWOULDBLOCK under read(2).
        return True

    if nread < 0:
        # From libuv docs:
        #     The callee is responsible for stopping/closing the stream
        #     when an error happens by calling uv_read_stop() or uv_close().
        #     Trying to read from the stream again is undefined.
        #
        # Therefore, we're closing the stream.  Since "UVHandle._close()"
        # doesn't raise exceptions unless uvloop is built with DEBUG=1,
        # we don't need try...finally here.

        if UVLOOP_DEBUG:
            loop._debug_stream_read_errors_total += 1

        if sc.__read_error_close:
            # Used for getting notified when a pipe is closed.
            # See WriteUnixTransport for the explanation.
            sc._on_eof()
            return True

        exc = convert_error(nread)
        sc._fatal_error(
            exc, False, "error status in uv_stream_t.read callback")
        return True

    return False


cdef inline void __uv_stream_on_read_impl(uv.uv_stream_t* stream,
                                          ssize_t nread,
                                          const uv.uv_buf_t* buf):
    # Non-buffered read path: pass the first `nread` bytes of the
    # loop's receive buffer to the protocol's data_received callback.
    cdef:
        UVStream sc = <UVStream>stream.data
        Loop loop = sc._loop

    # It's OK to free the buffer early, since nothing will
    # be able to touch it until this method is done.
    __loop_free_buffer(loop)

    if __uv_stream_on_read_common(sc, loop, nread):
        return

    try:
        if UVLOOP_DEBUG:
            loop._debug_stream_read_cb_total += 1

        run_in_context1(
            sc.context,
            sc._protocol_data_received,
            loop._recv_buffer[:nread],
        )
    except BaseException as exc:
        if UVLOOP_DEBUG:
            loop._debug_stream_read_cb_errors_total += 1

        sc._fatal_error(exc, False)


cdef inline void __uv_stream_on_write_impl(uv.uv_write_t* req, int status):
    # Completion handler for uv_write: close the write context, then
    # report the error or notify the stream that the write is done.
    cdef:
        _StreamWriteContext ctx = <_StreamWriteContext> req.data
        UVStream stream = <UVStream>ctx.stream

    ctx.close()

    if stream._closed:
        # The stream was closed, there is nothing to do.
        # Even if there is an error, like EPIPE, there
        # is no reason to report it.
        return

    if status < 0:
        if UVLOOP_DEBUG:
            stream._loop._debug_stream_write_errors_total += 1

        exc = convert_error(status)
        stream._fatal_error(
            exc, False, "error status in uv_stream_t.write callback")
        return

    try:
        stream._on_write()
    except BaseException as exc:
        if UVLOOP_DEBUG:
            stream._loop._debug_stream_write_cb_errors_total += 1

        stream._fatal_error(exc, False)


cdef void __uv_stream_on_read(uv.uv_stream_t* stream,
                              ssize_t nread,
                              const uv.uv_buf_t* buf) with gil:

    # libuv read callback for the non-buffered read path.

    if __ensure_handle_data(<uv.uv_handle_t*>stream,
                            "UVStream read callback") == 0:
        return

    # Don't need try-finally, __uv_stream_on_read_impl is void
    __uv_stream_on_read_impl(stream, nread, buf)


cdef void __uv_stream_on_write(uv.uv_write_t* req, int status) with gil:

    # libuv write callback passed to uv_write in UVStream._exec_write.

    if UVLOOP_DEBUG:
        if req.data is NULL:
            aio_logger.error(
                'UVStream.write callback called with NULL req.data, status=%r',
                status)
            return

    # Don't need try-finally, __uv_stream_on_write_impl is void
    __uv_stream_on_write_impl(req, status)


cdef void __uv_stream_buffered_alloc(uv.uv_handle_t* stream,
                                     size_t suggested_size,
                                     uv.uv_buf_t* uvbuf) with gil:

    # libuv alloc callback for the buffered read path: ask the
    # protocol for a buffer and expose it to libuv.  An empty uvbuf
    # (len 0, base NULL) is returned on any failure.

    if __ensure_handle_data(<uv.uv_handle_t*>stream,
                            "UVStream alloc buffer callback") == 0:
        return

    cdef:
        UVStream sc = <UVStream>stream.data
        Loop loop = sc._loop
        Py_buffer* pybuf = &sc._read_pybuf
        int got_buf = 0

    if sc._read_pybuf_acquired:
        # A buffer is still acquired from a previous alloc call;
        # don't hand out another one.
        uvbuf.len = 0
        uvbuf.base = NULL
        return

    sc._read_pybuf_acquired = 0
    try:
        buf = run_in_context1(
            sc.context,
            sc._protocol_get_buffer,
            suggested_size,
        )
        PyObject_GetBuffer(buf, pybuf, PyBUF_WRITABLE)
        got_buf = 1
    except BaseException as exc:
        # Can't call 'sc._fatal_error' or 'sc._close', libuv will SF.
        # We'll do it later in __uv_stream_buffered_on_read when we
        # receive UV_ENOBUFS.
        uvbuf.len = 0
        uvbuf.base = NULL
        return

    if not pybuf.len:
        # The protocol returned an empty buffer: release it and
        # report "no buffer" to libuv.
        uvbuf.len = 0
        uvbuf.base = NULL
        if got_buf:
            PyBuffer_Release(pybuf)
        return

    sc._read_pybuf_acquired = 1
    uvbuf.base = <char*>pybuf.buf
    uvbuf.len = pybuf.len


cdef void __uv_stream_buffered_on_read(uv.uv_stream_t* stream,
                                       ssize_t nread,
                                       const uv.uv_buf_t* buf) with gil:

    # libuv read callback for the buffered read path: notify the
    # protocol through buffer_updated(nread) and release the buffer
    # acquired in __uv_stream_buffered_alloc.

    if __ensure_handle_data(<uv.uv_handle_t*>stream,
                            "UVStream buffered read callback") == 0:
        return

    cdef:
        UVStream sc = <UVStream>stream.data
        Loop loop = sc._loop
        Py_buffer* pybuf = &sc._read_pybuf

    if nread == uv.UV_ENOBUFS:
        sc._fatal_error(
            RuntimeError(
                'unhandled error (or an empty buffer) in get_buffer()'),
            False)
        return

    try:
        if nread > 0 and not sc._read_pybuf_acquired:
            # From libuv docs:
            #     nread is > 0 if there is data available or < 0 on error. When
            #     we’ve reached EOF, nread will be set to UV_EOF. When
            #     nread < 0, the buf parameter might not point to a valid
            #     buffer; in that case buf.len and buf.base are both set to 0.
            raise RuntimeError(
                f'no python buffer is allocated in on_read; nread={nread}')

        if nread == 0:
            # From libuv docs:
            #     nread might be 0, which does not indicate an error or EOF.
            #     This is equivalent to EAGAIN or EWOULDBLOCK under read(2).
            return

        if __uv_stream_on_read_common(sc, loop, nread):
            return

        if UVLOOP_DEBUG:
            loop._debug_stream_read_cb_total += 1

        run_in_context1(sc.context, sc._protocol_buffer_updated, nread)
    except BaseException as exc:
        if UVLOOP_DEBUG:
            loop._debug_stream_read_cb_errors_total += 1

        sc._fatal_error(exc, False)
    finally:
        sc._read_pybuf_acquired = 0
        PyBuffer_Release(pybuf)