1.. currentmodule:: asyncio
2
3.. _asyncio-streams:
4
5=======
6Streams
7=======
8
9**Source code:** :source:`Lib/asyncio/streams.py`
10
11-------------------------------------------------
12
13Streams are high-level async/await-ready primitives to work with
14network connections.  Streams allow sending and receiving data without
15using callbacks or low-level protocols and transports.
16
17.. _asyncio_example_stream:
18
19Here is an example of a TCP echo client written using asyncio
20streams::
21
22    import asyncio
23
24    async def tcp_echo_client(message):
25        reader, writer = await asyncio.open_connection(
26            '127.0.0.1', 8888)
27
28        print(f'Send: {message!r}')
29        writer.write(message.encode())
30        await writer.drain()
31
32        data = await reader.read(100)
33        print(f'Received: {data.decode()!r}')
34
35        print('Close the connection')
36        writer.close()
37        await writer.wait_closed()
38
39    asyncio.run(tcp_echo_client('Hello World!'))
40
41
42See also the `Examples`_ section below.
43
44
45.. rubric:: Stream Functions
46
47The following top-level asyncio functions can be used to create
48and work with streams:
49
50
51.. coroutinefunction:: open_connection(host=None, port=None, *, \
52                          loop=None, limit=None, ssl=None, family=0, \
53                          proto=0, flags=0, sock=None, local_addr=None, \
54                          server_hostname=None, ssl_handshake_timeout=None)
55
56   Establish a network connection and return a pair of
57   ``(reader, writer)`` objects.
58
59   The returned *reader* and *writer* objects are instances of
60   :class:`StreamReader` and :class:`StreamWriter` classes.
61
62   The *loop* argument is optional and can always be determined
63   automatically when this function is awaited from a coroutine.
64
65   *limit* determines the buffer size limit used by the
66   returned :class:`StreamReader` instance.  By default the *limit*
67   is set to 64 KiB.
68
69   The rest of the arguments are passed directly to
70   :meth:`loop.create_connection`.
71
72   .. versionadded:: 3.7
73
74      The *ssl_handshake_timeout* parameter.
75
76.. coroutinefunction:: start_server(client_connected_cb, host=None, \
77                          port=None, *, loop=None, limit=None, \
78                          family=socket.AF_UNSPEC, \
79                          flags=socket.AI_PASSIVE, sock=None, \
80                          backlog=100, ssl=None, reuse_address=None, \
81                          reuse_port=None, ssl_handshake_timeout=None, \
82                          start_serving=True)
83
84   Start a socket server.
85
86   The *client_connected_cb* callback is called whenever a new client
87   connection is established.  It receives a ``(reader, writer)`` pair
88   as two arguments, instances of the :class:`StreamReader` and
89   :class:`StreamWriter` classes.
90
91   *client_connected_cb* can be a plain callable or a
92   :ref:`coroutine function <coroutine>`; if it is a coroutine function,
93   it will be automatically scheduled as a :class:`Task`.
94
95   The *loop* argument is optional and can always be determined
96   automatically when this method is awaited from a coroutine.
97
98   *limit* determines the buffer size limit used by the
99   returned :class:`StreamReader` instance.  By default the *limit*
100   is set to 64 KiB.
101
102   The rest of the arguments are passed directly to
103   :meth:`loop.create_server`.
104
105   .. versionadded:: 3.7
106
107      The *ssl_handshake_timeout* and *start_serving* parameters.
108
109
110.. rubric:: Unix Sockets
111
112.. coroutinefunction:: open_unix_connection(path=None, *, loop=None, \
113                        limit=None, ssl=None, sock=None, \
114                        server_hostname=None, ssl_handshake_timeout=None)
115
116   Establish a Unix socket connection and return a pair of
117   ``(reader, writer)``.
118
119   Similar to :func:`open_connection` but operates on Unix sockets.
120
121   See also the documentation of :meth:`loop.create_unix_connection`.
122
123   .. availability:: Unix.
124
125   .. versionadded:: 3.7
126
127      The *ssl_handshake_timeout* parameter.
128
129   .. versionchanged:: 3.7
130
131      The *path* parameter can now be a :term:`path-like object`
132
133
134.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \
135                          *, loop=None, limit=None, sock=None, \
136                          backlog=100, ssl=None, ssl_handshake_timeout=None, \
137                          start_serving=True)
138
139   Start a Unix socket server.
140
141   Similar to :func:`start_server` but works with Unix sockets.
142
143   See also the documentation of :meth:`loop.create_unix_server`.
144
145   .. availability:: Unix.
146
147   .. versionadded:: 3.7
148
149      The *ssl_handshake_timeout* and *start_serving* parameters.
150
151   .. versionchanged:: 3.7
152
153      The *path* parameter can now be a :term:`path-like object`.
154
155
156StreamReader
157============
158
159.. class:: StreamReader
160
161   Represents a reader object that provides APIs to read data
162   from the IO stream.
163
164   It is not recommended to instantiate *StreamReader* objects
165   directly; use :func:`open_connection` and :func:`start_server`
166   instead.
167
168   .. coroutinemethod:: read(n=-1)
169
170      Read up to *n* bytes.  If *n* is not provided, or set to ``-1``,
171      read until EOF and return all read bytes.
172
173      If EOF was received and the internal buffer is empty,
174      return an empty ``bytes`` object.
175
176   .. coroutinemethod:: readline()
177
178      Read one line, where "line" is a sequence of bytes
179      ending with ``\n``.
180
181      If EOF is received and ``\n`` was not found, the method
182      returns partially read data.
183
184      If EOF is received and the internal buffer is empty,
185      return an empty ``bytes`` object.
186
187   .. coroutinemethod:: readexactly(n)
188
189      Read exactly *n* bytes.
190
191      Raise an :exc:`IncompleteReadError` if EOF is reached before *n*
192      can be read.  Use the :attr:`IncompleteReadError.partial`
193      attribute to get the partially read data.
194
195   .. coroutinemethod:: readuntil(separator=b'\\n')
196
197      Read data from the stream until *separator* is found.
198
199      On success, the data and separator will be removed from the
200      internal buffer (consumed). Returned data will include the
201      separator at the end.
202
203      If the amount of data read exceeds the configured stream limit, a
204      :exc:`LimitOverrunError` exception is raised, and the data
205      is left in the internal buffer and can be read again.
206
207      If EOF is reached before the complete separator is found,
208      an :exc:`IncompleteReadError` exception is raised, and the internal
209      buffer is reset.  The :attr:`IncompleteReadError.partial` attribute
210      may contain a portion of the separator.
211
212      .. versionadded:: 3.5.2
213
214   .. method:: at_eof()
215
216      Return ``True`` if the buffer is empty and :meth:`feed_eof`
217      was called.
218
219
220StreamWriter
221============
222
223.. class:: StreamWriter
224
225   Represents a writer object that provides APIs to write data
226   to the IO stream.
227
228   It is not recommended to instantiate *StreamWriter* objects
229   directly; use :func:`open_connection` and :func:`start_server`
230   instead.
231
232   .. method:: write(data)
233
234      The method attempts to write the *data* to the underlying socket immediately.
235      If that fails, the data is queued in an internal write buffer until it can be
236      sent.
237
238      The method should be used along with the ``drain()`` method::
239
240         stream.write(data)
241         await stream.drain()
242
243   .. method:: writelines(data)
244
245      The method writes a list (or any iterable) of bytes to the underlying socket
246      immediately.
247      If that fails, the data is queued in an internal write buffer until it can be
248      sent.
249
250      The method should be used along with the ``drain()`` method::
251
252         stream.writelines(lines)
253         await stream.drain()
254
255   .. method:: close()
256
257      The method closes the stream and the underlying socket.
258
259      The method should be used along with the ``wait_closed()`` method::
260
261         stream.close()
262         await stream.wait_closed()
263
264   .. method:: can_write_eof()
265
266      Return ``True`` if the underlying transport supports
267      the :meth:`write_eof` method, ``False`` otherwise.
268
269   .. method:: write_eof()
270
271      Close the write end of the stream after the buffered write
272      data is flushed.
273
274   .. attribute:: transport
275
276      Return the underlying asyncio transport.
277
278   .. method:: get_extra_info(name, default=None)
279
280      Access optional transport information; see
281      :meth:`BaseTransport.get_extra_info` for details.
282
283   .. coroutinemethod:: drain()
284
285      Wait until it is appropriate to resume writing to the stream.
286      Example::
287
288          writer.write(data)
289          await writer.drain()
290
291      This is a flow control method that interacts with the underlying
292      IO write buffer.  When the size of the buffer reaches
293      the high watermark, *drain()* blocks until the size of the
294      buffer is drained down to the low watermark and writing can
295      be resumed.  When there is nothing to wait for, the :meth:`drain`
296      returns immediately.
297
298   .. method:: is_closing()
299
300      Return ``True`` if the stream is closed or in the process of
301      being closed.
302
303      .. versionadded:: 3.7
304
305   .. coroutinemethod:: wait_closed()
306
307      Wait until the stream is closed.
308
309      Should be called after :meth:`close` to wait until the underlying
310      connection is closed.
311
312      .. versionadded:: 3.7
313
314
315Examples
316========
317
318.. _asyncio-tcp-echo-client-streams:
319
320TCP echo client using streams
321-----------------------------
322
323TCP echo client using the :func:`asyncio.open_connection` function::
324
325    import asyncio
326
327    async def tcp_echo_client(message):
328        reader, writer = await asyncio.open_connection(
329            '127.0.0.1', 8888)
330
331        print(f'Send: {message!r}')
332        writer.write(message.encode())
333
334        data = await reader.read(100)
335        print(f'Received: {data.decode()!r}')
336
337        print('Close the connection')
338        writer.close()
339
340    asyncio.run(tcp_echo_client('Hello World!'))
341
342
343.. seealso::
344
345   The :ref:`TCP echo client protocol <asyncio_example_tcp_echo_client_protocol>`
346   example uses the low-level :meth:`loop.create_connection` method.
347
348
349.. _asyncio-tcp-echo-server-streams:
350
351TCP echo server using streams
352-----------------------------
353
354TCP echo server using the :func:`asyncio.start_server` function::
355
356    import asyncio
357
358    async def handle_echo(reader, writer):
359        data = await reader.read(100)
360        message = data.decode()
361        addr = writer.get_extra_info('peername')
362
363        print(f"Received {message!r} from {addr!r}")
364
365        print(f"Send: {message!r}")
366        writer.write(data)
367        await writer.drain()
368
369        print("Close the connection")
370        writer.close()
371
372    async def main():
373        server = await asyncio.start_server(
374            handle_echo, '127.0.0.1', 8888)
375
376        addr = server.sockets[0].getsockname()
377        print(f'Serving on {addr}')
378
379        async with server:
380            await server.serve_forever()
381
382    asyncio.run(main())
383
384
385.. seealso::
386
387   The :ref:`TCP echo server protocol <asyncio_example_tcp_echo_server_protocol>`
388   example uses the :meth:`loop.create_server` method.
389
390
391Get HTTP headers
392----------------
393
394Simple example querying HTTP headers of the URL passed on the command line::
395
396    import asyncio
397    import urllib.parse
398    import sys
399
400    async def print_http_headers(url):
401        url = urllib.parse.urlsplit(url)
402        if url.scheme == 'https':
403            reader, writer = await asyncio.open_connection(
404                url.hostname, 443, ssl=True)
405        else:
406            reader, writer = await asyncio.open_connection(
407                url.hostname, 80)
408
409        query = (
410            f"HEAD {url.path or '/'} HTTP/1.0\r\n"
411            f"Host: {url.hostname}\r\n"
412            f"\r\n"
413        )
414
415        writer.write(query.encode('latin-1'))
416        while True:
417            line = await reader.readline()
418            if not line:
419                break
420
421            line = line.decode('latin1').rstrip()
422            if line:
423                print(f'HTTP header> {line}')
424
425        # Ignore the body, close the socket
426        writer.close()
427
428    url = sys.argv[1]
429    asyncio.run(print_http_headers(url))
430
431
432Usage::
433
434    python example.py http://example.com/path/page.html
435
436or with HTTPS::
437
438    python example.py https://example.com/path/page.html
439
440
441.. _asyncio_example_create_connection-streams:
442
443Register an open socket to wait for data using streams
444------------------------------------------------------
445
446Coroutine waiting until a socket receives data using the
447:func:`open_connection` function::
448
449    import asyncio
450    import socket
451
452    async def wait_for_data():
453        # Get a reference to the current event loop because
454        # we want to access low-level APIs.
455        loop = asyncio.get_running_loop()
456
457        # Create a pair of connected sockets.
458        rsock, wsock = socket.socketpair()
459
460        # Register the open socket to wait for data.
461        reader, writer = await asyncio.open_connection(sock=rsock)
462
463        # Simulate the reception of data from the network
464        loop.call_soon(wsock.send, 'abc'.encode())
465
466        # Wait for data
467        data = await reader.read(100)
468
469        # Got data, we are done: close the socket
470        print("Received:", data.decode())
471        writer.close()
472
473        # Close the second socket
474        wsock.close()
475
476    asyncio.run(wait_for_data())
477
478.. seealso::
479
480   The :ref:`register an open socket to wait for data using a protocol
481   <asyncio_example_create_connection>` example uses a low-level protocol and
482   the :meth:`loop.create_connection` method.
483
484   The :ref:`watch a file descriptor for read events
485   <asyncio_example_watch_fd>` example uses the low-level
486   :meth:`loop.add_reader` method to watch a file descriptor.
487