@cython.no_gc_clear
@cython.freelist(DEFAULT_FREELIST_SIZE)
cdef class _UDPSendContext:
    # Per-request state for a single uv_udp_send() call.
    #
    # The context embeds the libuv request itself (`req`), keeps the
    # Python buffer of the payload acquired (`py_buf`) so the memory that
    # `uv_buf` points into stays valid, and references the transport that
    # issued the send.  While open, the context also holds one extra
    # reference to itself (taken in new(), dropped in close()), and
    # `req.data` points back at the context.

    cdef:
        uv.uv_udp_send_t   req

        uv.uv_buf_t     uv_buf
        Py_buffer       py_buf

        UDPTransport    udp

        bint            closed

    cdef close(self):
        # Release everything acquired by new().  Calling close() on an
        # already-closed context is a no-op.
        if self.closed:
            return

        self.closed = 1
        PyBuffer_Release(&self.py_buf)  # void
        self.req.data = NULL
        self.uv_buf.base = NULL
        self.udp = None
        # Drop the self-reference last, so that `self` is never touched
        # after the reference taken in new() has been given up.
        Py_DECREF(self)

    @staticmethod
    cdef _UDPSendContext new(UDPTransport udp, object data):
        # Build an open context for sending `data` through `udp`.
        #
        # `data` must support the buffer protocol; a failure of
        # PyObject_GetBuffer() propagates to the caller.
        cdef _UDPSendContext ctx
        ctx = _UDPSendContext.__new__(_UDPSendContext)
        ctx.udp = None
        ctx.closed = 1

        # Acquire the buffer *before* taking the self-reference: if this
        # raises, the context is still marked closed and holds no extra
        # reference, so it is simply deallocated instead of leaking.
        PyObject_GetBuffer(data, &ctx.py_buf, PyBUF_SIMPLE)
        ctx.uv_buf.base = <char*>ctx.py_buf.buf
        ctx.uv_buf.len = ctx.py_buf.len

        ctx.req.data = <void*> ctx
        Py_INCREF(ctx)
        ctx.udp = udp

        ctx.closed = 0
        return ctx

    def __dealloc__(self):
        if UVLOOP_DEBUG:
            # An open context still owns a buffer and a self-reference;
            # reaching deallocation in that state indicates a bug.
            if not self.closed:
                raise RuntimeError(
                    'open _UDPSendContext is being deallocated')
        self.udp = None
51
52
@cython.no_gc_clear
cdef class UDPTransport(UVBaseTransport):
    # Datagram transport backed by a libuv uv_udp_t handle.

    def __cinit__(self):
        self._family = uv.AF_UNSPEC
        self.__receiving = 0
        self._address = None
        self.context = Context_CopyCurrent()

    cdef _init(self, Loop loop, unsigned int family):
        # Allocate the uv_udp_t and initialise it on `loop`.  Raises
        # MemoryError if the allocation fails, or the converted libuv
        # error if uv_udp_init_ex() fails.
        cdef int rc

        self._start_init(loop)

        self._handle = <uv.uv_handle_t*>PyMem_RawMalloc(sizeof(uv.uv_udp_t))
        if self._handle is NULL:
            self._abort_init()
            raise MemoryError()

        rc = uv.uv_udp_init_ex(
            loop.uvloop, <uv.uv_udp_t*>self._handle, family)
        if rc < 0:
            self._abort_init()
            raise convert_error(rc)

        # The family is only recorded for AF_INET / AF_INET6; otherwise
        # it keeps the AF_UNSPEC value assigned in __cinit__.
        if family == uv.AF_INET or family == uv.AF_INET6:
            self._family = family

        self._finish_init()

    cdef _set_address(self, system.addrinfo *addr):
        # Remember the peer address as a Python-level address object.
        self._address = __convert_sockaddr_to_pyaddr(addr.ai_addr)

    cdef _connect(self, system.sockaddr* addr, size_t addr_len):
        # Connect the handle to `addr`; `addr_len` is not used here.
        cdef int rc = uv.uv_udp_connect(<uv.uv_udp_t*>self._handle, addr)
        if rc < 0:
            raise convert_error(rc)

    cdef open(self, int family, int sockfd):
        # Attach an existing socket descriptor to the handle.
        cdef int rc

        if family not in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX):
            raise ValueError(
                'cannot open a UDP handle, invalid family {}'.format(family))
        self._family = family

        rc = uv.uv_udp_open(
            <uv.uv_udp_t*>self._handle, <uv.uv_os_sock_t>sockfd)
        if rc < 0:
            raise convert_error(rc)

    cdef _bind(self, system.sockaddr* addr):
        # Bind the handle to `addr` with no libuv bind flags set.
        cdef int rc

        self._ensure_alive()

        rc = uv.uv_udp_bind(<uv.uv_udp_t*>self._handle, addr, 0)
        if rc < 0:
            raise convert_error(rc)

    cdef _set_broadcast(self, bint on):
        # Toggle broadcast on the underlying handle.
        cdef int rc

        self._ensure_alive()

        rc = uv.uv_udp_set_broadcast(<uv.uv_udp_t*>self._handle, on)
        if rc < 0:
            raise convert_error(rc)

    cdef size_t _get_write_buffer_size(self):
        # libuv's send queue size for this handle; 0 when there is no
        # handle.
        if self._handle is not NULL:
            return (<uv.uv_udp_t*>self._handle).send_queue_size
        return 0

    cdef bint _is_reading(self):
        return self.__receiving

    cdef _start_reading(self):
        # Begin receiving datagrams; a no-op if already receiving.
        cdef int rc

        if self.__receiving:
            return

        self._ensure_alive()

        rc = uv.uv_udp_recv_start(
            <uv.uv_udp_t*>self._handle,
            __loop_alloc_buffer,
            __uv_udp_on_receive)
        if rc < 0:
            self._fatal_error(convert_error(rc), True)
            return

        # UDPTransport must live until the read callback is called
        self.__receiving_started()

    cdef _stop_reading(self):
        # Stop receiving datagrams; a no-op if not currently receiving.
        cdef int rc

        if not self.__receiving:
            return

        self._ensure_alive()

        rc = uv.uv_udp_recv_stop(<uv.uv_udp_t*>self._handle)
        if rc < 0:
            self._fatal_error(convert_error(rc), True)
            return

        self.__receiving_stopped()

    cdef inline __receiving_started(self):
        # Flag the transport as receiving and take an extra reference to
        # it; the reference is dropped by __receiving_stopped().
        if not self.__receiving:
            self.__receiving = 1
            Py_INCREF(self)

    cdef inline __receiving_stopped(self):
        # Undo __receiving_started(): clear the flag, drop the reference.
        if self.__receiving:
            self.__receiving = 0
            Py_DECREF(self)

    cdef _new_socket(self):
        # Wrap the handle's file descriptor in a PseudoSocket.
        if self._family not in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX):
            raise RuntimeError(
                'UDPTransport.family is undefined; '
                'cannot create python socket')

        fd = self._fileno()
        return PseudoSocket(self._family, uv.SOCK_DGRAM, 0, fd)

    cdef _send(self, object data, object addr):
        # Send `data` to `addr` (None means "no explicit destination").
        #
        # When libuv's send queue is empty an immediate uv_udp_try_send()
        # is attempted first; if that reports UV_EAGAIN, or the queue is
        # not empty, the datagram is queued with uv_udp_send().
        cdef:
            _UDPSendContext ctx
            system.sockaddr_storage saddr_st
            system.sockaddr *saddr = NULL
            Py_buffer       try_pybuf
            uv.uv_buf_t     try_uvbuf

        self._ensure_alive()

        if self._family not in (uv.AF_INET, uv.AF_INET6, uv.AF_UNIX):
            raise RuntimeError('UDPTransport.family is undefined; cannot send')

        if addr is not None:
            try:
                __convert_pyaddr_to_sockaddr(self._family, addr,
                                             <system.sockaddr*>&saddr_st)
            except (ValueError, TypeError):
                raise
            except Exception:
                raise ValueError(
                    f'{addr!r}: socket family mismatch or '
                    f'a DNS lookup is required')
            saddr = <system.sockaddr*>(&saddr_st)

        if self._get_write_buffer_size() != 0:
            # Datagrams are already queued: skip the immediate attempt.
            rc = uv.UV_EAGAIN
        else:
            PyObject_GetBuffer(data, &try_pybuf, PyBUF_SIMPLE)
            try_uvbuf.base = <char*>try_pybuf.buf
            try_uvbuf.len = try_pybuf.len
            rc = uv.uv_udp_try_send(
                <uv.uv_udp_t*>self._handle, &try_uvbuf, 1, saddr)
            PyBuffer_Release(&try_pybuf)

        if rc != uv.UV_EAGAIN:
            # The immediate attempt gave a definite answer.
            if rc < 0:
                self._fatal_error(convert_error(rc), True)
            else:
                self._on_sent(None, self.context.copy())
            return

        ctx = _UDPSendContext.new(self, data)
        rc = uv.uv_udp_send(
            &ctx.req,
            <uv.uv_udp_t*>self._handle,
            &ctx.uv_buf,
            1,
            saddr,
            __uv_udp_on_send)
        if rc < 0:
            ctx.close()
            self._fatal_error(convert_error(rc), True)
        else:
            self._maybe_pause_protocol()

    cdef _on_receive(self, bytes data, object exc, object addr):
        # Hand a received datagram, or a receive error, to the protocol
        # within the transport's context.
        if exc is not None:
            run_in_context1(self.context, self._protocol.error_received, exc)
            return

        run_in_context2(
            self.context, self._protocol.datagram_received, data, addr,
        )

    cdef _on_sent(self, object exc, object context=None):
        # Bookkeeping after a send attempt has finished.  OSError
        # instances go to the protocol's error_received(); any other
        # exception is treated as fatal.
        if exc is not None:
            if not isinstance(exc, OSError):
                self._fatal_error(
                    exc, False, 'Fatal write error on datagram transport')
            else:
                if context is None:
                    context = self.context
                run_in_context1(context, self._protocol.error_received, exc)

        self._maybe_resume_protocol()
        if not self._get_write_buffer_size() and self._closing:
            self._schedule_call_connection_lost(None)

    # === Public API ===

    def sendto(self, data, addr=None):
        # Empty payloads are dropped silently (replicating asyncio).
        if not data:
            return

        if self._address:
            # Replicating asyncio logic here: with a recorded peer
            # address, only None or that same address is accepted.
            if addr not in (None, self._address):
                raise ValueError(
                    'Invalid address: must be None or %s' % (self._address,))

            # Instead of setting addr to self._address below like what asyncio
            # does, we depend on previous uv_udp_connect() to set the address
            addr = None

        if not self._conn_lost:
            self._send(data, addr)
            return

        # Replicating asyncio logic here: count the attempted write and
        # warn once the threshold has been reached.
        if self._conn_lost >= LOG_THRESHOLD_FOR_CONNLOST_WRITES:
            aio_logger.warning('socket.send() raised exception.')
        self._conn_lost += 1
306
307
cdef void __uv_udp_on_receive(uv.uv_udp_t* handle,
                              ssize_t nread,
                              const uv.uv_buf_t* buf,
                              const system.sockaddr* addr,
                              unsigned flags) with gil:
    # libuv receive callback for UDPTransport handles.
    #
    # Converts the sender address and the received bytes to Python
    # objects and passes them to UDPTransport._on_receive().  A negative
    # `nread` is converted to an exception and delivered through the same
    # method.  Exceptions raised while dispatching are routed to
    # udp._error() rather than allowed to escape this void callback.

    if __ensure_handle_data(<uv.uv_handle_t*>handle,
                            "UDPTransport receive callback") == 0:
        return

    cdef:
        UDPTransport udp = <UDPTransport>handle.data
        Loop loop = udp._loop
        bytes data
        object pyaddr

    # It's OK to free the buffer early, since nothing will
    # be able to touch it until this method is done.
    __loop_free_buffer(loop)

    if udp._closed:
        # The handle was closed, there is no reason to
        # do any work now.
        udp.__receiving_stopped()  # Just in case.
        return

    if addr is NULL and nread == 0:
        # From libuv docs:
        #      addr: struct sockaddr* containing the address
        #      of the sender. Can be NULL. Valid for the duration
        #      of the callback only.
        #      [...]
        #      The receive callback will be called with
        #      nread == 0 and addr == NULL when there is
        #      nothing to read, and with nread == 0 and
        #      addr != NULL when an empty UDP packet is
        #      received.
        return

    if addr is NULL:
        pyaddr = None
    elif addr.sa_family == uv.AF_UNSPEC:
        # https://github.com/MagicStack/uvloop/issues/304
        IF UNAME_SYSNAME == "Linux":
            pyaddr = None
        ELSE:
            pyaddr = ''
    else:
        try:
            pyaddr = __convert_sockaddr_to_pyaddr(addr)
        except BaseException as exc:
            udp._error(exc, False)
            return

    if nread < 0:
        exc = convert_error(nread)
        # Guard the error path exactly like the data path below, so an
        # exception raised while delivering the error is reported via
        # udp._error() instead of escaping the callback.
        try:
            udp._on_receive(None, exc, pyaddr)
        except BaseException as ex:
            udp._error(ex, False)
        return

    if nread == 0:
        data = b''
    else:
        data = loop._recv_buffer[:nread]

    try:
        udp._on_receive(data, None, pyaddr)
    except BaseException as exc:
        udp._error(exc, False)
376
377
cdef void __uv_udp_on_send(uv.uv_udp_send_t* req, int status) with gil:
    # libuv callback invoked when a uv_udp_send() request has finished.
    #
    # Closes the _UDPSendContext attached to the request and reports the
    # outcome to the owning transport: None on success, or the exception
    # converted from a negative `status`.

    if req.data is NULL:
        # Shouldn't happen as:
        #    - _UDPSendContext does an extra INCREF in its 'new()'
        #    - _UDPSendContext holds a ref to the relevant UDPTransport
        aio_logger.error(
            'UDPTransport send callback called with NULL req.data, status=%r',
            status)
        return

    cdef:
        _UDPSendContext ctx = <_UDPSendContext> req.data
        UDPTransport udp = <UDPTransport>ctx.udp

    # `udp` was captured above because close() clears ctx.udp.
    ctx.close()

    if status < 0:
        exc = convert_error(status)
    else:
        exc = None

    try:
        udp._on_sent(exc)
    except BaseException as exc:
        udp._error(exc, False)
405