.. currentmodule:: asyncio

.. _asyncio-streams:

=======
Streams
=======

**Source code:** :source:`Lib/asyncio/streams.py`

-------------------------------------------------

Streams are high-level async/await-ready primitives to work with
network connections.  Streams allow sending and receiving data without
using callbacks or low-level protocols and transports.

.. _asyncio_example_stream:

Here is an example of a TCP echo client written using asyncio
streams::

    import asyncio

    async def tcp_echo_client(message):
        reader, writer = await asyncio.open_connection(
            '127.0.0.1', 8888)

        print(f'Send: {message!r}')
        writer.write(message.encode())
        await writer.drain()

        data = await reader.read(100)
        print(f'Received: {data.decode()!r}')

        print('Close the connection')
        writer.close()
        await writer.wait_closed()

    asyncio.run(tcp_echo_client('Hello World!'))


See also the `Examples`_ section below.


.. rubric:: Stream Functions

The following top-level asyncio functions can be used to create
and work with streams:


.. coroutinefunction:: open_connection(host=None, port=None, *, \
                          limit=None, ssl=None, family=0, proto=0, \
                          flags=0, sock=None, local_addr=None, \
                          server_hostname=None, ssl_handshake_timeout=None)

   Establish a network connection and return a pair of
   ``(reader, writer)`` objects.

   The returned *reader* and *writer* objects are instances of
   :class:`StreamReader` and :class:`StreamWriter` classes.

   *limit* determines the buffer size limit used by the
   returned :class:`StreamReader` instance.  By default the *limit*
   is set to 64 KiB.

   The rest of the arguments are passed directly to
   :meth:`loop.create_connection`.

   .. versionchanged:: 3.7

      Added the *ssl_handshake_timeout* parameter.

   .. deprecated-removed:: 3.8 3.10

      The ``loop`` parameter.  This function has been implicitly getting the
      current running loop since 3.7.  See
      :ref:`What's New in 3.10's Removed section <whatsnew310-removed>`
      for more information.


.. coroutinefunction:: start_server(client_connected_cb, host=None, \
                          port=None, *, limit=None, \
                          family=socket.AF_UNSPEC, \
                          flags=socket.AI_PASSIVE, sock=None, \
                          backlog=100, ssl=None, reuse_address=None, \
                          reuse_port=None, ssl_handshake_timeout=None, \
                          start_serving=True)

   Start a socket server.

   The *client_connected_cb* callback is called whenever a new client
   connection is established.  It receives a ``(reader, writer)`` pair
   as two arguments, instances of the :class:`StreamReader` and
   :class:`StreamWriter` classes.

   *client_connected_cb* can be a plain callable or a
   :ref:`coroutine function <coroutine>`; if it is a coroutine function,
   it will be automatically scheduled as a :class:`Task`.

   *limit* determines the buffer size limit used by the
   :class:`StreamReader` instances passed to *client_connected_cb*.
   By default the *limit* is set to 64 KiB.

   The rest of the arguments are passed directly to
   :meth:`loop.create_server`.

   .. versionchanged:: 3.7

      Added the *ssl_handshake_timeout* and *start_serving* parameters.

   .. deprecated-removed:: 3.8 3.10

      The ``loop`` parameter.  This function has been implicitly getting the
      current running loop since 3.7.  See
      :ref:`What's New in 3.10's Removed section <whatsnew310-removed>`
      for more information.

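The two functions can be combined in a single process.  The following is a
minimal sketch rather than a reference implementation: it starts a server on
port ``0`` (an assumption made here so that the operating system picks a free
port), connects to it with :func:`open_connection`, and returns the reply.
The names ``handle_upper`` and ``round_trip`` are illustrative only.

```python
import asyncio

async def handle_upper(reader, writer):
    # Called once per client connection with a (reader, writer) pair.
    line = await reader.readline()
    writer.write(line.upper())
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def round_trip(payload):
    # Port 0 lets the OS choose a free port (an assumption of this sketch).
    server = await asyncio.start_server(handle_upper, '127.0.0.1', 0)
    host, port = server.sockets[0].getsockname()[:2]
    async with server:
        reader, writer = await asyncio.open_connection(host, port)
        writer.write(payload)
        await writer.drain()
        reply = await reader.readline()
        writer.close()
        await writer.wait_closed()
    return reply

if __name__ == '__main__':
    print(asyncio.run(round_trip(b'hello\n')))
```

Because *client_connected_cb* is a coroutine function here, each connection
is handled in its own task; the port actually bound is read back from
``server.sockets``.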

.. rubric:: Unix Sockets

.. coroutinefunction:: open_unix_connection(path=None, *, limit=None, \
                        ssl=None, sock=None, server_hostname=None, \
                        ssl_handshake_timeout=None)

   Establish a Unix socket connection and return a pair of
   ``(reader, writer)``.

   Similar to :func:`open_connection` but operates on Unix sockets.

   See also the documentation of :meth:`loop.create_unix_connection`.

   .. availability:: Unix.

   .. versionchanged:: 3.7

      Added the *ssl_handshake_timeout* parameter.
      The *path* parameter can now be a :term:`path-like object`.

   .. deprecated-removed:: 3.8 3.10

      The ``loop`` parameter.  This function has been implicitly getting the
      current running loop since 3.7.  See
      :ref:`What's New in 3.10's Removed section <whatsnew310-removed>`
      for more information.


.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \
                          *, limit=None, sock=None, backlog=100, ssl=None, \
                          ssl_handshake_timeout=None, start_serving=True)

   Start a Unix socket server.

   Similar to :func:`start_server` but works with Unix sockets.

   See also the documentation of :meth:`loop.create_unix_server`.

   .. availability:: Unix.

   .. versionchanged:: 3.7

      Added the *ssl_handshake_timeout* and *start_serving* parameters.
      The *path* parameter can now be a :term:`path-like object`.

   .. deprecated-removed:: 3.8 3.10

      The ``loop`` parameter.  This function has been implicitly getting the
      current running loop since 3.7.  See
      :ref:`What's New in 3.10's Removed section <whatsnew310-removed>`
      for more information.

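The Unix socket variants follow the same pattern as their TCP counterparts.
The sketch below assumes a Unix platform and uses a socket file inside a
temporary directory; the file name ``echo.sock`` and the helper names
``handle_echo_line`` and ``unix_round_trip`` are hypothetical.  The path is
passed as a :class:`pathlib.Path` to illustrate that a
:term:`path-like object` is accepted.

```python
import asyncio
import pathlib
import tempfile

async def handle_echo_line(reader, writer):
    # Echo a single line back, then close this side of the connection.
    writer.write(await reader.readline())
    await writer.drain()
    writer.close()
    await writer.wait_closed()

async def unix_round_trip(payload):
    with tempfile.TemporaryDirectory() as tmp:
        # Hypothetical socket location; any writable path would do.
        path = pathlib.Path(tmp) / 'echo.sock'
        server = await asyncio.start_unix_server(handle_echo_line, path)
        async with server:
            reader, writer = await asyncio.open_unix_connection(path)
            writer.write(payload)
            await writer.drain()
            reply = await reader.readline()
            writer.close()
            await writer.wait_closed()
    return reply

if __name__ == '__main__':
    print(asyncio.run(unix_round_trip(b'ping\n')))
```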

StreamReader
============

.. class:: StreamReader

   Represents a reader object that provides APIs to read data
   from the IO stream.

   It is not recommended to instantiate *StreamReader* objects
   directly; use :func:`open_connection` and :func:`start_server`
   instead.

   .. coroutinemethod:: read(n=-1)

      Read up to *n* bytes.  If *n* is not provided, or set to ``-1``,
      read until EOF and return all read bytes.

      If EOF was received and the internal buffer is empty,
      return an empty ``bytes`` object.

   .. coroutinemethod:: readline()

      Read one line, where "line" is a sequence of bytes
      ending with ``\n``.

      If EOF is received and ``\n`` was not found, the method
      returns partially read data.

      If EOF is received and the internal buffer is empty,
      return an empty ``bytes`` object.

   .. coroutinemethod:: readexactly(n)

      Read exactly *n* bytes.

      Raise an :exc:`IncompleteReadError` if EOF is reached before *n*
      can be read.  Use the :attr:`IncompleteReadError.partial`
      attribute to get the partially read data.

   .. coroutinemethod:: readuntil(separator=b'\\n')

      Read data from the stream until *separator* is found.

      On success, the data and separator will be removed from the
      internal buffer (consumed). Returned data will include the
      separator at the end.

      If the amount of data read exceeds the configured stream limit, a
      :exc:`LimitOverrunError` exception is raised, and the data
      is left in the internal buffer and can be read again.

      If EOF is reached before the complete separator is found,
      an :exc:`IncompleteReadError` exception is raised, and the internal
      buffer is reset.  The :attr:`IncompleteReadError.partial` attribute
      may contain a portion of the separator.

      .. versionadded:: 3.5.2

   .. method:: at_eof()

      Return ``True`` if the buffer is empty and :meth:`feed_eof`
      was called.

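The behaviour of the read methods at EOF and at the buffer limit can be tried
without a network connection.  The sketch below instantiates
:class:`StreamReader` directly and fills it with ``feed_data()`` and
``feed_eof()``.  This is done purely for illustration, since direct
instantiation is not recommended in real code; ``demo_reader`` and the sample
bytes are made up for this example.

```python
import asyncio

async def demo_reader():
    out = {}

    # Direct instantiation keeps the sketch free of network I/O.
    reader = asyncio.StreamReader(limit=64)
    reader.feed_data(b'GET /\r\nabcdef')
    reader.feed_eof()

    # The separator is consumed and included in the returned data.
    out['readuntil'] = await reader.readuntil(b'\r\n')
    out['readexactly'] = await reader.readexactly(2)
    try:
        # Only 4 bytes remain before EOF, so this cannot be satisfied.
        await reader.readexactly(10)
    except asyncio.IncompleteReadError as exc:
        out['partial'] = exc.partial
        out['expected'] = exc.expected
    # EOF was received and the buffer is now empty.
    out['read_at_eof'] = await reader.read()
    out['at_eof'] = reader.at_eof()

    # A deliberately tiny limit shows LimitOverrunError: no separator is
    # found within the limit, and the data stays in the buffer.
    small = asyncio.StreamReader(limit=4)
    small.feed_data(b'no newline here')
    try:
        await small.readuntil(b'\n')
    except asyncio.LimitOverrunError:
        out['leftover'] = await small.read(100)
    return out

if __name__ == '__main__':
    for key, value in asyncio.run(demo_reader()).items():
        print(key, value)
```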

StreamWriter
============

.. class:: StreamWriter

   Represents a writer object that provides APIs to write data
   to the IO stream.

   It is not recommended to instantiate *StreamWriter* objects
   directly; use :func:`open_connection` and :func:`start_server`
   instead.

   .. method:: write(data)

      The method attempts to write the *data* to the underlying socket immediately.
      If that fails, the data is queued in an internal write buffer until it can be
      sent.

      The method should be used along with the ``drain()`` method::

         stream.write(data)
         await stream.drain()

   .. method:: writelines(data)

      The method writes a list (or any iterable) of bytes to the underlying socket
      immediately.
      If that fails, the data is queued in an internal write buffer until it can be
      sent.

      The method should be used along with the ``drain()`` method::

         stream.writelines(lines)
         await stream.drain()

   .. method:: close()

      The method closes the stream and the underlying socket.

      The method should be used along with the ``wait_closed()`` method::

         stream.close()
         await stream.wait_closed()

   .. method:: can_write_eof()

      Return ``True`` if the underlying transport supports
      the :meth:`write_eof` method, ``False`` otherwise.

   .. method:: write_eof()

      Close the write end of the stream after the buffered write
      data is flushed.

   .. attribute:: transport

      Return the underlying asyncio transport.

   .. method:: get_extra_info(name, default=None)

      Access optional transport information; see
      :meth:`BaseTransport.get_extra_info` for details.

   .. coroutinemethod:: drain()

      Wait until it is appropriate to resume writing to the stream.
      Example::

          writer.write(data)
          await writer.drain()

      This is a flow control method that interacts with the underlying
      IO write buffer.  When the size of the buffer reaches
      the high watermark, *drain()* blocks until the size of the
      buffer is drained down to the low watermark and writing can
      be resumed.  When there is nothing to wait for, :meth:`drain`
      returns immediately.

   .. method:: is_closing()

      Return ``True`` if the stream is closed or in the process of
      being closed.

      .. versionadded:: 3.7

   .. coroutinemethod:: wait_closed()

      Wait until the stream is closed.

      Should be called after :meth:`close` to wait until the underlying
      connection is closed.

      .. versionadded:: 3.7

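As a rough illustration of how :meth:`StreamWriter.write_eof` differs from
:meth:`StreamWriter.close`, the sketch below half-closes one side of a
connection and keeps reading on it.  A :func:`socket.socketpair` stands in
for a real network connection, and the name ``half_close_demo`` is made up
for this example.

```python
import asyncio
import socket

async def half_close_demo():
    # A connected socket pair stands in for a real network connection.
    left, right = socket.socketpair()
    l_reader, l_writer = await asyncio.open_connection(sock=left)
    r_reader, r_writer = await asyncio.open_connection(sock=right)

    l_writer.writelines([b'part one, ', b'part two'])
    await l_writer.drain()
    if l_writer.can_write_eof():
        # Signal that this side will send no more data.
        l_writer.write_eof()

    # read() with no argument returns once EOF arrives.
    request = await r_reader.read()

    # The opposite direction is still open, so a reply can be sent.
    r_writer.write(request.upper())
    await r_writer.drain()
    r_writer.close()
    await r_writer.wait_closed()

    reply = await l_reader.read()
    l_writer.close()
    await l_writer.wait_closed()
    return request, reply

if __name__ == '__main__':
    print(asyncio.run(half_close_demo()))
```

Pairing every ``write()`` with ``drain()`` and every ``close()`` with
``wait_closed()`` follows the recommendations given for those methods above.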

Examples
========

.. _asyncio-tcp-echo-client-streams:

TCP echo client using streams
-----------------------------

TCP echo client using the :func:`asyncio.open_connection` function::

    import asyncio

    async def tcp_echo_client(message):
        reader, writer = await asyncio.open_connection(
            '127.0.0.1', 8888)

        print(f'Send: {message!r}')
        writer.write(message.encode())
        await writer.drain()

        data = await reader.read(100)
        print(f'Received: {data.decode()!r}')

        print('Close the connection')
        writer.close()
        await writer.wait_closed()

    asyncio.run(tcp_echo_client('Hello World!'))


.. seealso::

   The :ref:`TCP echo client protocol <asyncio_example_tcp_echo_client_protocol>`
   example uses the low-level :meth:`loop.create_connection` method.


.. _asyncio-tcp-echo-server-streams:

TCP echo server using streams
-----------------------------

TCP echo server using the :func:`asyncio.start_server` function::

    import asyncio

    async def handle_echo(reader, writer):
        data = await reader.read(100)
        message = data.decode()
        addr = writer.get_extra_info('peername')

        print(f"Received {message!r} from {addr!r}")

        print(f"Send: {message!r}")
        writer.write(data)
        await writer.drain()

        print("Close the connection")
        writer.close()
        await writer.wait_closed()

    async def main():
        server = await asyncio.start_server(
            handle_echo, '127.0.0.1', 8888)

        addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
        print(f'Serving on {addrs}')

        async with server:
            await server.serve_forever()

    asyncio.run(main())


.. seealso::

   The :ref:`TCP echo server protocol <asyncio_example_tcp_echo_server_protocol>`
   example uses the :meth:`loop.create_server` method.


Get HTTP headers
----------------

Simple example querying HTTP headers of the URL passed on the command line::

    import asyncio
    import urllib.parse
    import sys

    async def print_http_headers(url):
        url = urllib.parse.urlsplit(url)
        if url.scheme == 'https':
            reader, writer = await asyncio.open_connection(
                url.hostname, 443, ssl=True)
        else:
            reader, writer = await asyncio.open_connection(
                url.hostname, 80)

        query = (
            f"HEAD {url.path or '/'} HTTP/1.0\r\n"
            f"Host: {url.hostname}\r\n"
            f"\r\n"
        )

        writer.write(query.encode('latin-1'))
        await writer.drain()
        while True:
            line = await reader.readline()
            if not line:
                break

            line = line.decode('latin-1').rstrip()
            if line:
                print(f'HTTP header> {line}')

        # Ignore the body, close the socket
        writer.close()
        await writer.wait_closed()

    url = sys.argv[1]
    asyncio.run(print_http_headers(url))


Usage::

    python example.py http://example.com/path/page.html

or with HTTPS::

    python example.py https://example.com/path/page.html


.. _asyncio_example_create_connection-streams:

Register an open socket to wait for data using streams
------------------------------------------------------

Coroutine waiting until a socket receives data using the
:func:`open_connection` function::

    import asyncio
    import socket

    async def wait_for_data():
        # Get a reference to the current event loop because
        # we want to access low-level APIs.
        loop = asyncio.get_running_loop()

        # Create a pair of connected sockets.
        rsock, wsock = socket.socketpair()

        # Register the open socket to wait for data.
        reader, writer = await asyncio.open_connection(sock=rsock)

        # Simulate the reception of data from the network
        loop.call_soon(wsock.send, 'abc'.encode())

        # Wait for data
        data = await reader.read(100)

        # Got data, we are done: close the socket
        print("Received:", data.decode())
        writer.close()
        await writer.wait_closed()

        # Close the second socket
        wsock.close()

    asyncio.run(wait_for_data())

.. seealso::

   The :ref:`register an open socket to wait for data using a protocol
   <asyncio_example_create_connection>` example uses a low-level protocol and
   the :meth:`loop.create_connection` method.

   The :ref:`watch a file descriptor for read events
   <asyncio_example_watch_fd>` example uses the low-level
   :meth:`loop.add_reader` method to watch a file descriptor.
