# Number of uv_buf_t / Py_buffer slots embedded directly in every
# _StreamWriteContext.  Writes made of at most this many buffers use
# the embedded arrays instead of heap allocations.
DEF __PREALLOCED_BUFS = 4
2
3
@cython.no_gc_clear
@cython.freelist(DEFAULT_FREELIST_SIZE)
cdef class _StreamWriteContext:
    # Holds the additional write request information for uv_write /
    # uv_try_write: the request itself, the uv_buf_t array handed to
    # libuv, and the Python objects / buffer views that own the memory
    # those uv_buf_t entries point into.
    #
    # Lifecycle: `new()` takes an extra reference on the context
    # (Py_INCREF) once it is fully initialised and marks it open;
    # `close()` releases the buffers and drops that reference again.

    cdef:
        uv.uv_write_t   req

        # The list of Python buffers being written; keeps them alive.
        list            buffers

        # Embedded storage, used when there are at most
        # __PREALLOCED_BUFS buffers (no malloc required).
        uv.uv_buf_t     uv_bufs_sml[__PREALLOCED_BUFS]
        Py_buffer       py_bufs_sml[__PREALLOCED_BUFS]
        bint            py_bufs_sml_inuse

        # Heap storage, used for larger writes; NULL when unused.
        uv.uv_buf_t*    uv_bufs
        Py_buffer*      py_bufs
        # Number of acquired Py_buffer views (embedded or heap).
        size_t          py_bufs_len

        # Window of uv_buf_t entries that still have to be written.
        uv.uv_buf_t*    uv_bufs_start
        size_t          uv_bufs_len

        UVStream        stream

        bint            closed

    cdef free_bufs(self):
        # Release every acquired Py_buffer view, free the heap arrays
        # (if any) and drop the reference to the Python buffer list.
        cdef size_t i

        if self.uv_bufs is not NULL:
            PyMem_RawFree(self.uv_bufs)
            self.uv_bufs = NULL
            if UVLOOP_DEBUG:
                # Heap and embedded storage are mutually exclusive.
                if self.py_bufs_sml_inuse:
                    raise RuntimeError(
                        '_StreamWriteContext.close: uv_bufs != NULL and '
                        'py_bufs_sml_inuse is True')

        if self.py_bufs is not NULL:
            for i from 0 <= i < self.py_bufs_len:
                PyBuffer_Release(&self.py_bufs[i])
            PyMem_RawFree(self.py_bufs)
            self.py_bufs = NULL
            if UVLOOP_DEBUG:
                if self.py_bufs_sml_inuse:
                    raise RuntimeError(
                        '_StreamWriteContext.close: py_bufs != NULL and '
                        'py_bufs_sml_inuse is True')

        if self.py_bufs_sml_inuse:
            for i from 0 <= i < self.py_bufs_len:
                PyBuffer_Release(&self.py_bufs_sml[i])
            self.py_bufs_sml_inuse = 0

        self.py_bufs_len = 0
        self.buffers = None

    cdef close(self):
        # Idempotent: free the buffers and drop the reference taken in
        # `new()`.  Does nothing if the context is already closed.
        if self.closed:
            return
        self.closed = 1
        self.free_bufs()
        Py_DECREF(self)

    cdef advance_uv_buf(self, size_t sent):
        # Advance the pointer to first uv_buf and the
        # pointer to first byte in that buffer.
        #
        # We do this after a "uv_try_write" call, which
        # sometimes sends only a portion of data.
        # We then call "advance_uv_buf" on the write
        # context, and reuse it in a "uv_write" call.
        #
        # Raises RuntimeError if `sent` is not smaller than the total
        # number of bytes remaining in the window.

        cdef:
            uv.uv_buf_t* buf
            size_t idx

        for idx from 0 <= idx < self.uv_bufs_len:
            buf = &self.uv_bufs_start[idx]
            if buf.len > sent:
                # Partially sent buffer: it becomes the new first one.
                buf.len -= sent
                buf.base = buf.base + sent
                self.uv_bufs_start = buf
                self.uv_bufs_len -= idx
                return

            # This buffer was sent completely.  `buf.len <= sent` holds
            # here, so the unsigned subtraction cannot wrap around
            # (which is also why no `sent < 0` check is needed).
            sent -= buf.len

        raise RuntimeError('fatal: Could not advance _StreamWriteContext')

    @staticmethod
    cdef _StreamWriteContext new(UVStream stream, list buffers):
        # Build an open write context for `buffers` on `stream`.
        #
        # Exact `bytes` objects are referenced directly; everything
        # else goes through the buffer protocol.  On success the
        # returned context owns one extra reference to itself that is
        # released by `close()`.  On failure all acquired resources are
        # released and the exception is propagated.
        cdef:
            _StreamWriteContext ctx
            int uv_bufs_idx = 0
            size_t py_bufs_len = 0

            Py_buffer* p_pybufs
            uv.uv_buf_t* p_uvbufs

        ctx = _StreamWriteContext.__new__(_StreamWriteContext)
        # `ctx.stream` stays None until initialisation has succeeded:
        # `__dealloc__` decrements a debug counter for contexts with a
        # stream, and that counter is only incremented on success.
        ctx.stream = None
        ctx.closed = 1
        ctx.py_bufs_len = 0
        ctx.py_bufs_sml_inuse = 0
        ctx.uv_bufs = NULL
        ctx.py_bufs = NULL
        ctx.buffers = buffers

        if len(buffers) <= __PREALLOCED_BUFS:
            # We've got a small number of buffers to write, don't
            # need to use malloc.
            ctx.py_bufs_sml_inuse = 1
            p_pybufs = <Py_buffer*>&ctx.py_bufs_sml
            p_uvbufs = <uv.uv_buf_t*>&ctx.uv_bufs_sml

        else:
            # Count how many Py_buffer views will be needed.
            for buf in buffers:
                if UVLOOP_DEBUG:
                    if not isinstance(buf, (bytes, bytearray, memoryview)):
                        raise RuntimeError(
                            'invalid data in writebuf: an instance of '
                            'bytes, bytearray or memoryview was expected, '
                            'got {}'.format(type(buf)))

                if not PyBytes_CheckExact(buf):
                    py_bufs_len += 1

            if py_bufs_len > 0:
                ctx.py_bufs = <Py_buffer*>PyMem_RawMalloc(
                    py_bufs_len * sizeof(Py_buffer))
                if ctx.py_bufs is NULL:
                    raise MemoryError()

            ctx.uv_bufs = <uv.uv_buf_t*>PyMem_RawMalloc(
                len(buffers) * sizeof(uv.uv_buf_t))
            if ctx.uv_bufs is NULL:
                # Don't leak the Py_buffer array allocated above.
                # `ctx.py_bufs_len` is still 0, so no view is released.
                ctx.free_bufs()
                raise MemoryError()

            p_pybufs = ctx.py_bufs
            p_uvbufs = ctx.uv_bufs

        py_bufs_len = 0
        for buf in buffers:
            if PyBytes_CheckExact(buf):
                # We can only use this hack for bytes since it's
                # immutable.  For everything else it is only safe to
                # use buffer protocol.
                p_uvbufs[uv_bufs_idx].base = PyBytes_AS_STRING(buf)
                p_uvbufs[uv_bufs_idx].len = Py_SIZE(buf)

            else:
                try:
                    PyObject_GetBuffer(
                        buf, &p_pybufs[py_bufs_len], PyBUF_SIMPLE)
                except Exception:
                    # This shouldn't ever happen, as `UVStream._write`
                    # casts non-bytes objects to `memoryviews`.
                    # Release only the views acquired so far.
                    ctx.py_bufs_len = py_bufs_len
                    ctx.free_bufs()
                    raise

                p_uvbufs[uv_bufs_idx].base = <char*>p_pybufs[py_bufs_len].buf
                p_uvbufs[uv_bufs_idx].len = p_pybufs[py_bufs_len].len

                py_bufs_len += 1

            uv_bufs_idx += 1

        ctx.uv_bufs_start = p_uvbufs
        ctx.uv_bufs_len = uv_bufs_idx

        ctx.py_bufs_len = py_bufs_len
        ctx.req.data = <void*> ctx
        ctx.stream = stream

        if UVLOOP_DEBUG:
            stream._loop._debug_stream_write_ctx_total += 1
            stream._loop._debug_stream_write_ctx_cnt += 1

        # Do incref after everything else is done.
        # Under no circumstances we want `ctx` to be GCed while
        # libuv is still working with `ctx.uv_bufs`.
        Py_INCREF(ctx)
        ctx.closed = 0
        return ctx

    def __dealloc__(self):
        if not self.closed:
            # Because we do an INCREF in _StreamWriteContext.new,
            # __dealloc__ shouldn't ever happen with `self.closed == 0`
            raise RuntimeError(
                'open _StreamWriteContext is being deallocated')

        if UVLOOP_DEBUG:
            if self.stream is not None:
                self.stream._loop._debug_stream_write_ctx_cnt -= 1
                self.stream = None
206
207
@cython.no_gc_clear
cdef class UVStream(UVBaseTransport):
    # Transport built on a libuv stream handle (the handle is cast to
    # uv_stream_t* throughout).  Implements reading via uv_read_start
    # (plain or buffered-protocol mode) and writing via a direct
    # write(2) fast path, uv_try_write and uv_write.

    def __cinit__(self):
        self.__shutting_down = 0
        self.__reading = 0
        self.__read_error_close = 0
        self.__buffered = 0
        self._eof = 0
        # Pending write data and its total size in bytes.
        self._buffer = []
        self._buffer_size = 0

        self._protocol_get_buffer = None
        self._protocol_buffer_updated = None

        self._read_pybuf_acquired = False

    cdef _set_protocol(self, object protocol):
        # Install `protocol` and decide whether the buffered
        # (get_buffer / buffer_updated) read path is used.
        #
        # Raises TypeError if `protocol` is None.
        if protocol is None:
            raise TypeError('protocol is required')

        UVBaseTransport._set_protocol(self, protocol)

        if (hasattr(protocol, 'get_buffer') and
                not isinstance(protocol, aio_Protocol)):
            try:
                get_buffer = protocol.get_buffer
                buffer_updated = protocol.buffer_updated
            except AttributeError:
                # Incomplete buffered API.  Make sure no state from a
                # previously installed buffered protocol (or a
                # half-updated pair of callbacks) is left behind.
                self._protocol_get_buffer = None
                self._protocol_buffer_updated = None
                self.__buffered = 0
            else:
                self._protocol_get_buffer = get_buffer
                self._protocol_buffer_updated = buffer_updated
                self.__buffered = 1
        else:
            self.__buffered = 0

    cdef _clear_protocol(self):
        # Drop the protocol and every callback cached from it.
        UVBaseTransport._clear_protocol(self)
        self._protocol_get_buffer = None
        self._protocol_buffer_updated = None
        self.__buffered = 0

    cdef inline _shutdown(self):
        # Issue uv_shutdown() for the stream; at most once per stream.
        cdef int err

        if self.__shutting_down:
            return
        self.__shutting_down = 1

        self._ensure_alive()

        self._shutdown_req.data = <void*> self
        err = uv.uv_shutdown(&self._shutdown_req,
                             <uv.uv_stream_t*> self._handle,
                             __uv_stream_on_shutdown)
        if err < 0:
            exc = convert_error(err)
            self._fatal_error(exc, True)
            return

    cdef inline _accept(self, UVStream server):
        # Accept a pending connection from `server` into this stream
        # and, on success, run `_on_accept()`.
        cdef int err
        self._ensure_alive()

        err = uv.uv_accept(<uv.uv_stream_t*>server._handle,
                           <uv.uv_stream_t*>self._handle)
        if err < 0:
            exc = convert_error(err)
            self._fatal_error(exc, True)
            return

        self._on_accept()

    cdef inline _close_on_read_error(self):
        # Make read errors be handled like EOF (see the
        # `__read_error_close` check in __uv_stream_on_read_common).
        self.__read_error_close = 1

    cdef bint _is_reading(self):
        return self.__reading

    cdef _start_reading(self):
        # Start reading from the stream unless it is closing or
        # already reading.  The callbacks depend on whether the
        # protocol is buffered.
        cdef int err

        if self._closing:
            return

        self._ensure_alive()

        if self.__reading:
            return

        if self.__buffered:
            err = uv.uv_read_start(<uv.uv_stream_t*>self._handle,
                                   __uv_stream_buffered_alloc,
                                   __uv_stream_buffered_on_read)
        else:
            err = uv.uv_read_start(<uv.uv_stream_t*>self._handle,
                                   __loop_alloc_buffer,
                                   __uv_stream_on_read)
        if err < 0:
            exc = convert_error(err)
            self._fatal_error(exc, True)
            return
        else:
            # UVStream must live until the read callback is called
            self.__reading_started()

    cdef inline __reading_started(self):
        # Mark the stream as reading and take a reference that is
        # released by `__reading_stopped()`.
        if self.__reading:
            return
        self.__reading = 1
        Py_INCREF(self)

    cdef inline __reading_stopped(self):
        # Counterpart of `__reading_started()`.
        if not self.__reading:
            return
        self.__reading = 0
        Py_DECREF(self)

    cdef _stop_reading(self):
        # Stop reading from the stream; a no-op if not reading.
        cdef int err

        if not self.__reading:
            return

        self._ensure_alive()

        # From libuv docs:
        #    This function is idempotent and may be safely
        #    called on a stopped stream.
        err = uv.uv_read_stop(<uv.uv_stream_t*>self._handle)
        if err < 0:
            exc = convert_error(err)
            self._fatal_error(exc, True)
            return
        else:
            self.__reading_stopped()

    cdef inline _try_write(self, object data):
        # Try to write `data` with a direct write(2) on the stream's
        # file descriptor.
        #
        # Returns:
        #   0        -- everything was written (or `data` is empty);
        #   n > 0    -- only the first `n` bytes were written;
        #   -1       -- the write would block (EAGAIN / EWOULDBLOCK);
        #   None     -- a different error occurred and `_fatal_error`
        #               was called.
        #
        # Raises RuntimeError if libuv still has queued data for this
        # stream.
        cdef:
            ssize_t written
            bint used_buf = 0
            Py_buffer py_buf
            void* buf
            size_t blen
            int saved_errno
            int fd

        if (<uv.uv_stream_t*>self._handle).write_queue_size != 0:
            raise RuntimeError(
                'UVStream._try_write called with data in uv buffers')

        if PyBytes_CheckExact(data):
            # We can only use this hack for bytes since it's
            # immutable.  For everything else it is only safe to
            # use buffer protocol.
            buf = <void*>PyBytes_AS_STRING(data)
            blen = Py_SIZE(data)
        else:
            PyObject_GetBuffer(data, &py_buf, PyBUF_SIMPLE)
            used_buf = 1
            buf = py_buf.buf
            blen = py_buf.len

        if blen == 0:
            # Empty data, do nothing -- but don't leak the buffer view
            # acquired above.
            if used_buf:
                PyBuffer_Release(&py_buf)
            return 0

        fd = self._fileno()
        # Use `unistd.h/write` directly, it's faster than
        # uv_try_write -- less layers of code.  The error
        # checking logic is copied from libuv.
        written = system.write(fd, buf, blen)
        while written == -1 and (
                errno.errno == errno.EINTR or
                (system.PLATFORM_IS_APPLE and
                    errno.errno == errno.EPROTOTYPE)):
            # From libuv code (unix/stream.c):
            #   Due to a possible kernel bug at least in OS X 10.10 "Yosemite",
            #   EPROTOTYPE can be returned while trying to write to a socket
            #   that is shutting down. If we retry the write, we should get
            #   the expected EPIPE instead.
            written = system.write(fd, buf, blen)
        # Save errno before any other call can overwrite it.
        saved_errno = errno.errno

        if used_buf:
            PyBuffer_Release(&py_buf)

        if written < 0:
            if saved_errno == errno.EAGAIN or \
                    saved_errno == system.EWOULDBLOCK:
                return -1
            else:
                exc = convert_error(-saved_errno)
                self._fatal_error(exc, True)
                return

        if UVLOOP_DEBUG:
            self._loop._debug_stream_write_tries += 1

        if <size_t>written == blen:
            return 0

        return written

    cdef inline _write(self, object data):
        # Queue `data` for writing; empty data is ignored.  Non-bytes
        # objects are wrapped in a byte-typed memoryview first.
        #
        # `dlen` is a Py_ssize_t (what `len()` yields) so that very
        # large payloads are not truncated by a narrower C integer.
        cdef Py_ssize_t dlen

        if not PyBytes_CheckExact(data):
            data = memoryview(data).cast('b')

        dlen = len(data)
        if not dlen:
            return

        self._buffer_size += dlen
        self._buffer.append(data)

        if (not self._protocol_paused and
                (<uv.uv_stream_t*>self._handle).write_queue_size == 0 and
                self._buffer_size > self._high_water):
            # Fast-path.  If:
            #   - the protocol isn't yet paused,
            #   - there is no data in libuv buffers for this stream,
            #   - the protocol will be paused if we continue to buffer data
            #
            # Then:
            #   - Try to write all buffered data right now.
            all_sent = self._exec_write()
            if UVLOOP_DEBUG:
                if self._buffer_size != 0 or self._buffer != []:
                    raise RuntimeError(
                        '_buffer_size is not 0 after a successful _exec_write')

            # There is no need to call `_queue_write` anymore,
            # as `uv_write` should be called already.

            if not all_sent:
                # If not all of the data was sent successfully,
                # we might need to pause the protocol.
                self._maybe_pause_protocol()
            return

        self._maybe_pause_protocol()
        self._loop._queue_write(self)

    cdef inline _exec_write(self):
        # Flush `self._buffer`.
        #
        # When libuv has nothing queued for the stream, a synchronous
        # write is attempted first (`_try_write` for a single buffer,
        # `uv_try_write` otherwise); whatever remains is handed to
        # `uv_write`.
        #
        # Returns True when all data was sent synchronously, None
        # otherwise (data queued with uv_write, nothing to do, or a
        # fatal error was reported).
        cdef:
            int err
            int buf_len
            _StreamWriteContext ctx = None

        if self._closed:
            # If the handle is closed, just return, it's too
            # late to do anything.
            return

        buf_len = len(self._buffer)
        if not buf_len:
            return

        if (<uv.uv_stream_t*>self._handle).write_queue_size == 0:
            # libuv internal write buffers for this stream are empty.
            if buf_len == 1:
                # If we only have one piece of data to send, let's
                # use our fast implementation of try_write.
                data = self._buffer[0]
                sent = self._try_write(data)

                if sent is None:
                    # A `self._fatal_error` was called.
                    # It might not raise an exception under some
                    # conditions.
                    self._buffer_size = 0
                    self._buffer.clear()
                    if not self._closing:
                        # This should never happen.
                        raise RuntimeError(
                            'stream is open after UVStream._try_write '
                            'returned None')
                    return

                if sent == 0:
                    # All data was successfully written.
                    self._buffer_size = 0
                    self._buffer.clear()
                    # on_write will call "maybe_resume_protocol".
                    self._on_write()
                    return True

                if sent > 0:
                    if UVLOOP_DEBUG:
                        if sent == len(data):
                            raise RuntimeError(
                                '_try_write sent all data and returned '
                                'non-zero')

                    if PyBytes_CheckExact(data):
                        # Cast bytes to memoryview to avoid copying
                        # data that wasn't sent.
                        data = memoryview(data)
                    data = data[sent:]

                    self._buffer_size -= sent
                    self._buffer[0] = data

                # At this point it's either data was sent partially,
                # or an EAGAIN has happened.

            else:
                ctx = _StreamWriteContext.new(self, self._buffer)

                err = uv.uv_try_write(<uv.uv_stream_t*>self._handle,
                                      ctx.uv_bufs_start,
                                      ctx.uv_bufs_len)

                if err > 0:
                    # Some data was successfully sent.

                    if <size_t>err == self._buffer_size:
                        # Everything was sent.
                        ctx.close()
                        self._buffer.clear()
                        self._buffer_size = 0
                        # on_write will call "maybe_resume_protocol".
                        self._on_write()
                        return True

                    try:
                        # Advance pointers to uv_bufs in `ctx`,
                        # we will reuse it soon for a uv_write
                        # call.
                        ctx.advance_uv_buf(<ssize_t>err)
                    except Exception as ex:  # This should never happen.
                        # Let's try to close the `ctx` anyways.
                        ctx.close()
                        self._fatal_error(ex, True)
                        self._buffer.clear()
                        self._buffer_size = 0
                        return

                elif err != uv.UV_EAGAIN:
                    ctx.close()
                    exc = convert_error(err)
                    self._fatal_error(exc, True)
                    self._buffer.clear()
                    self._buffer_size = 0
                    return

                # fall through

        if ctx is None:
            ctx = _StreamWriteContext.new(self, self._buffer)

        err = uv.uv_write(&ctx.req,
                          <uv.uv_stream_t*>self._handle,
                          ctx.uv_bufs_start,
                          ctx.uv_bufs_len,
                          __uv_stream_on_write)

        self._buffer_size = 0
        # Can't use `_buffer.clear()` here: `ctx` holds a reference to
        # the `_buffer`.
        self._buffer = []

        if err < 0:
            # close write context
            ctx.close()

            exc = convert_error(err)
            self._fatal_error(exc, True)
            return

        self._maybe_resume_protocol()

    cdef size_t _get_write_buffer_size(self):
        # Bytes queued inside libuv plus bytes still held in
        # `self._buffer`; 0 when there is no handle.
        if self._handle is NULL:
            return 0
        return ((<uv.uv_stream_t*>self._handle).write_queue_size +
                self._buffer_size)

    cdef _close(self):
        # Stop reading and close the handle.  The handle is closed
        # even if the cleanup in the `try` body raises.
        try:
            if self._read_pybuf_acquired:
                # Should never happen. libuv always calls uv_alloc/uv_read
                # in pairs.
                self._loop.call_exception_handler({
                    'transport': self,
                    'message': 'XXX: an allocated buffer in transport._close()'
                })
                self._read_pybuf_acquired = 0
                PyBuffer_Release(&self._read_pybuf)

            self._stop_reading()
        finally:
            UVSocketHandle._close(<UVHandle>self)

    cdef inline _on_accept(self):
        # Ultimately called by __uv_stream_on_listen.
        self._init_protocol()

    cdef inline _on_eof(self):
        # Any exception raised here will be caught in
        # __uv_stream_on_read.
        #
        # Calls the protocol's `eof_received()` (if it has one) and
        # either keeps the transport open for writing or closes it,
        # depending on the returned value.

        try:
            meth = self._protocol.eof_received
        except AttributeError:
            keep_open = False
        else:
            keep_open = run_in_context(self.context, meth)

        if keep_open:
            # We're keeping the connection open so the
            # protocol can write more, but we still can't
            # receive more, so remove the reader callback.
            self._stop_reading()
        else:
            self.close()

    cdef inline _on_write(self):
        # Called after data has been written.  Once nothing is left
        # to write, finish a pending close or a pending write_eof().
        self._maybe_resume_protocol()
        if not self._get_write_buffer_size():
            if self._closing:
                self._schedule_call_connection_lost(None)
            elif self._eof:
                self._shutdown()

    cdef inline _init(self, Loop loop, object protocol, Server server,
                      object waiter, object context):
        # Common initialisation: context, protocol, loop and the
        # optional server / waiter.
        self.context = context
        self._set_protocol(protocol)
        self._start_init(loop)

        if server is not None:
            self._set_server(server)

        if waiter is not None:
            self._set_waiter(waiter)

    cdef inline _on_connect(self, object exc):
        # Called from __tcp_connect_callback (tcp.pyx) and
        # __pipe_connect_callback (pipe.pyx).
        #
        # `exc` is None on success; otherwise it is the connect error,
        # which is routed to the waiter when one is still pending.
        if exc is None:
            self._init_protocol()
        else:
            if self._waiter is None:
                self._fatal_error(exc, False, "connect failed")
            elif self._waiter.cancelled():
                # Connect call was cancelled; just close the transport
                # silently.
                self._close()
            elif self._waiter.done():
                self._fatal_error(exc, False, "connect failed")
            else:
                self._waiter.set_exception(exc)
                self._close()

    # === Public API ===

    def __repr__(self):
        return '<{} closed={} reading={} {:#x}>'.format(
            self.__class__.__name__,
            self._closed,
            self.__reading,
            id(self))

    def write(self, object buf):
        """Queue *buf* for writing to the stream.

        Empty data is ignored.  Raises RuntimeError when called after
        write_eof().  After the connection is lost the data is dropped
        and only the lost-connection counter is incremented.
        """
        self._ensure_alive()

        if self._eof:
            raise RuntimeError('Cannot call write() after write_eof()')
        if not buf:
            return
        if self._conn_lost:
            self._conn_lost += 1
            return
        self._write(buf)

    def writelines(self, bufs):
        """Queue every buffer in *bufs* for writing to the stream.

        Raises RuntimeError when called after write_eof().
        """
        self._ensure_alive()

        if self._eof:
            raise RuntimeError('Cannot call writelines() after write_eof()')
        if self._conn_lost:
            self._conn_lost += 1
            return
        for buf in bufs:
            self._write(buf)

    def write_eof(self):
        """Shut down the write side once all pending data is written.

        Calling it more than once has no further effect.
        """
        self._ensure_alive()

        if self._eof:
            return

        self._eof = 1
        # With data still pending, `_on_write()` performs the shutdown
        # after the last write completes.
        if not self._get_write_buffer_size():
            self._shutdown()

    def can_write_eof(self):
        """Return True; this transport supports write_eof()."""
        return True

    def is_reading(self):
        """Return whether the transport is currently reading."""
        return self._is_reading()

    def pause_reading(self):
        """Stop reading; a no-op when closing or not reading."""
        if self._closing or not self._is_reading():
            return
        self._stop_reading()

    def resume_reading(self):
        """Start reading again; a no-op when closing or reading."""
        if self._is_reading() or self._closing:
            return
        self._start_reading()
720
721
cdef void __uv_stream_on_shutdown(uv.uv_shutdown_t* req,
                                  int status) with gil:
    # libuv callback for the uv_shutdown() request; `req.data` is
    # expected to carry the UVStream the request was issued for.
    cdef UVStream stream

    if req.data is NULL:
        aio_logger.error(
            'UVStream.shutdown callback called with NULL req.data, status=%r',
            status)
        return

    stream = <UVStream> req.data

    if status >= 0 or status == uv.UV_ECANCELED:
        # Nothing to report.  About ECANCELED, from libuv source code:
        #     The ECANCELED error code is a lie, the shutdown(2) syscall is a
        #     fait accompli at this point. Maybe we should revisit this in
        #     v0.11.  A possible reason for leaving it unchanged is that it
        #     informs the callee that the handle has been destroyed.
        return

    if UVLOOP_DEBUG:
        stream._loop._debug_stream_shutdown_errors_total += 1

    stream._fatal_error(
        convert_error(status),
        False,
        "error status in uv_stream_t.shutdown callback")
749
750
cdef inline bint __uv_stream_on_read_common(UVStream sc, Loop loop,
                                            ssize_t nread):
    # Handling shared by the plain and the buffered read callbacks.
    #
    # Returns True when the event has been fully dealt with here
    # (closed stream, EOF, empty read, read error) and False when
    # `nread` bytes of data are left for the caller to deliver.
    if sc._closed:
        # The stream was closed, there is no reason to
        # do any work now.
        sc.__reading_stopped()  # Just in case.
        return True

    if nread == uv.UV_EOF:
        # From libuv docs:
        #     The callee is responsible for stopping/closing the stream
        #     when an error happens by calling uv_read_stop() or uv_close().
        #     Trying to read from the stream again is undefined.
        try:
            if UVLOOP_DEBUG:
                loop._debug_stream_read_eof_total += 1

            sc._stop_reading()
            sc._on_eof()
        except BaseException as ex:
            if UVLOOP_DEBUG:
                loop._debug_stream_read_eof_cb_errors_total += 1

            sc._fatal_error(ex, False)
        finally:
            # EOF is always considered handled, whatever happened above.
            return True

    if nread > 0:
        # Actual data; the caller takes it from here.
        return False

    if nread == 0:
        # From libuv docs:
        #     nread might be 0, which does not indicate an error or EOF.
        #     This is equivalent to EAGAIN or EWOULDBLOCK under read(2).
        return True

    # nread < 0: a read error.
    #
    # From libuv docs:
    #     The callee is responsible for stopping/closing the stream
    #     when an error happens by calling uv_read_stop() or uv_close().
    #     Trying to read from the stream again is undefined.
    #
    # Therefore, we're closing the stream.  Since "UVHandle._close()"
    # doesn't raise exceptions unless uvloop is built with DEBUG=1,
    # we don't need try...finally here.
    if UVLOOP_DEBUG:
        loop._debug_stream_read_errors_total += 1

    if sc.__read_error_close:
        # Used for getting notified when a pipe is closed.
        # See WriteUnixTransport for the explanation.
        sc._on_eof()
    else:
        sc._fatal_error(
            convert_error(nread),
            False,
            "error status in uv_stream_t.read callback")
    return True
809
810
cdef inline void __uv_stream_on_read_impl(uv.uv_stream_t* stream,
                                          ssize_t nread,
                                          const uv.uv_buf_t* buf):
    # Body of the non-buffered read callback: hand the first `nread`
    # bytes of the loop's receive buffer to the protocol's
    # data_received callback.
    cdef:
        UVStream sc = <UVStream>stream.data
        Loop loop = sc._loop

    # It's OK to free the buffer early, since nothing will
    # be able to touch it until this method is done.
    __loop_free_buffer(loop)

    if not __uv_stream_on_read_common(sc, loop, nread):
        # There is data to deliver.
        try:
            if UVLOOP_DEBUG:
                loop._debug_stream_read_cb_total += 1

            run_in_context1(
                sc.context,
                sc._protocol_data_received,
                loop._recv_buffer[:nread],
            )
        except BaseException as exc:
            if UVLOOP_DEBUG:
                loop._debug_stream_read_cb_errors_total += 1

            sc._fatal_error(exc, False)
839
840
cdef inline void __uv_stream_on_write_impl(uv.uv_write_t* req, int status):
    # Body of the uv_write() completion callback: close the write
    # context stored in `req.data`, then report the outcome to the
    # stream it belongs to.
    cdef:
        _StreamWriteContext ctx = <_StreamWriteContext> req.data
        UVStream stream = <UVStream>ctx.stream

    ctx.close()

    if stream._closed:
        # The stream was closed, there is nothing to do.
        # Even if there is an error, like EPIPE, there
        # is no reason to report it.
        return

    if status >= 0:
        try:
            stream._on_write()
        except BaseException as exc:
            if UVLOOP_DEBUG:
                stream._loop._debug_stream_write_cb_errors_total += 1

            stream._fatal_error(exc, False)
        return

    # The write failed.
    if UVLOOP_DEBUG:
        stream._loop._debug_stream_write_errors_total += 1

    stream._fatal_error(
        convert_error(status),
        False,
        "error status in uv_stream_t.write callback")
870
871
cdef void __uv_stream_on_read(uv.uv_stream_t* stream,
                              ssize_t nread,
                              const uv.uv_buf_t* buf) with gil:
    # libuv read callback for the non-buffered read path.  The real
    # work happens in __uv_stream_on_read_impl, which only runs when
    # the handle-data check succeeds.

    if __ensure_handle_data(<uv.uv_handle_t*>stream,
                            "UVStream read callback") != 0:
        # Don't need try-finally, __uv_stream_on_read_impl is void
        __uv_stream_on_read_impl(stream, nread, buf)
882
883
cdef void __uv_stream_on_write(uv.uv_write_t* req, int status) with gil:
    # libuv callback for uv_write() requests; delegates to
    # __uv_stream_on_write_impl.  In debug builds a request without
    # attached data is logged and ignored.

    if UVLOOP_DEBUG and req.data is NULL:
        aio_logger.error(
            'UVStream.write callback called with NULL req.data, status=%r',
            status)
        return

    # Don't need try-finally, __uv_stream_on_write_impl is void
    __uv_stream_on_write_impl(req, status)
895
896
cdef void __uv_stream_buffered_alloc(uv.uv_handle_t* stream,
                                     size_t suggested_size,
                                     uv.uv_buf_t* uvbuf) with gil:
    # libuv alloc callback for the buffered read path: asks the
    # protocol's get_buffer callback for a writable buffer and
    # exposes its memory to libuv through `uvbuf`.  An empty `uvbuf`
    # (len 0, base NULL) is reported whenever no buffer is available.
    cdef:
        UVStream sc
        Py_buffer* pybuf

    if __ensure_handle_data(<uv.uv_handle_t*>stream,
                            "UVStream alloc buffer callback") == 0:
        return

    sc = <UVStream>stream.data
    pybuf = &sc._read_pybuf

    # Start from "no buffer"; only the success path at the very end
    # overwrites this.
    uvbuf.len = 0
    uvbuf.base = NULL

    if sc._read_pybuf_acquired:
        # A previously handed out buffer has not been released yet.
        return

    try:
        buf = run_in_context1(
            sc.context,
            sc._protocol_get_buffer,
            suggested_size,
        )
        PyObject_GetBuffer(buf, pybuf, PyBUF_WRITABLE)
    except BaseException:
        # Can't call 'sc._fatal_error' or 'sc._close', libuv will SF.
        # We'll do it later in __uv_stream_buffered_on_read when we
        # receive UV_ENOBUFS.
        return

    if not pybuf.len:
        # A zero-length buffer is of no use; give the view back.
        PyBuffer_Release(pybuf)
        return

    sc._read_pybuf_acquired = 1
    uvbuf.base = <char*>pybuf.buf
    uvbuf.len = pybuf.len
943
944
cdef void __uv_stream_buffered_on_read(uv.uv_stream_t* stream,
                                       ssize_t nread,
                                       const uv.uv_buf_t* buf) with gil:
    # libuv read callback for the buffered read path: reports
    # `nread` freshly written bytes to the protocol's buffer_updated
    # callback and releases the buffer view acquired in
    # __uv_stream_buffered_alloc.

    if __ensure_handle_data(<uv.uv_handle_t*>stream,
                            "UVStream buffered read callback") == 0:
        return

    cdef:
        UVStream sc = <UVStream>stream.data
        Loop loop = sc._loop
        Py_buffer* pybuf = &sc._read_pybuf

    if nread == uv.UV_ENOBUFS:
        # The alloc callback could not provide a buffer.
        sc._fatal_error(
            RuntimeError(
                'unhandled error (or an empty buffer) in get_buffer()'),
            False)
        return

    try:
        if nread > 0 and not sc._read_pybuf_acquired:
            # From libuv docs:
            #     nread is > 0 if there is data available or < 0 on error. When
            #     we’ve reached EOF, nread will be set to UV_EOF. When
            #     nread < 0, the buf parameter might not point to a valid
            #     buffer; in that case buf.len and buf.base are both set to 0.
            raise RuntimeError(
                f'no python buffer is allocated in on_read; nread={nread}')

        if nread == 0:
            # From libuv docs:
            #     nread might be 0, which does not indicate an error or EOF.
            #     This is equivalent to EAGAIN or EWOULDBLOCK under read(2).
            return

        if __uv_stream_on_read_common(sc, loop, nread):
            return

        if UVLOOP_DEBUG:
            loop._debug_stream_read_cb_total += 1

        run_in_context1(sc.context, sc._protocol_buffer_updated, nread)
    except BaseException as exc:
        if UVLOOP_DEBUG:
            loop._debug_stream_read_cb_errors_total += 1

        sc._fatal_error(exc, False)
    finally:
        # Release the view only if one is actually held.  It may never
        # have been acquired (e.g. an error arriving with nread < 0),
        # and PyBuffer_Release must only be called on a buffer obtained
        # through PyObject_GetBuffer.
        if sc._read_pybuf_acquired:
            sc._read_pybuf_acquired = 0
            PyBuffer_Release(pybuf)
996