1.. currentmodule:: asyncio 2 3.. _asyncio-streams: 4 5======= 6Streams 7======= 8 9**Source code:** :source:`Lib/asyncio/streams.py` 10 11------------------------------------------------- 12 13Streams are high-level async/await-ready primitives to work with 14network connections. Streams allow sending and receiving data without 15using callbacks or low-level protocols and transports. 16 17.. _asyncio_example_stream: 18 19Here is an example of a TCP echo client written using asyncio 20streams:: 21 22 import asyncio 23 24 async def tcp_echo_client(message): 25 reader, writer = await asyncio.open_connection( 26 '127.0.0.1', 8888) 27 28 print(f'Send: {message!r}') 29 writer.write(message.encode()) 30 await writer.drain() 31 32 data = await reader.read(100) 33 print(f'Received: {data.decode()!r}') 34 35 print('Close the connection') 36 writer.close() 37 await writer.wait_closed() 38 39 asyncio.run(tcp_echo_client('Hello World!')) 40 41 42See also the `Examples`_ section below. 43 44 45.. rubric:: Stream Functions 46 47The following top-level asyncio functions can be used to create 48and work with streams: 49 50 51.. coroutinefunction:: open_connection(host=None, port=None, *, \ 52 loop=None, limit=None, ssl=None, family=0, \ 53 proto=0, flags=0, sock=None, local_addr=None, \ 54 server_hostname=None, ssl_handshake_timeout=None) 55 56 Establish a network connection and return a pair of 57 ``(reader, writer)`` objects. 58 59 The returned *reader* and *writer* objects are instances of 60 :class:`StreamReader` and :class:`StreamWriter` classes. 61 62 The *loop* argument is optional and can always be determined 63 automatically when this function is awaited from a coroutine. 64 65 *limit* determines the buffer size limit used by the 66 returned :class:`StreamReader` instance. By default the *limit* 67 is set to 64 KiB. 68 69 The rest of the arguments are passed directly to 70 :meth:`loop.create_connection`. 71 72 .. versionadded:: 3.7 73 74 The *ssl_handshake_timeout* parameter. 75 76.. coroutinefunction:: start_server(client_connected_cb, host=None, \ 77 port=None, *, loop=None, limit=None, \ 78 family=socket.AF_UNSPEC, \ 79 flags=socket.AI_PASSIVE, sock=None, \ 80 backlog=100, ssl=None, reuse_address=None, \ 81 reuse_port=None, ssl_handshake_timeout=None, \ 82 start_serving=True) 83 84 Start a socket server. 85 86 The *client_connected_cb* callback is called whenever a new client 87 connection is established. It receives a ``(reader, writer)`` pair 88 as two arguments, instances of the :class:`StreamReader` and 89 :class:`StreamWriter` classes. 90 91 *client_connected_cb* can be a plain callable or a 92 :ref:`coroutine function <coroutine>`; if it is a coroutine function, 93 it will be automatically scheduled as a :class:`Task`. 94 95 The *loop* argument is optional and can always be determined 96 automatically when this method is awaited from a coroutine. 97 98 *limit* determines the buffer size limit used by the 99 returned :class:`StreamReader` instance. By default the *limit* 100 is set to 64 KiB. 101 102 The rest of the arguments are passed directly to 103 :meth:`loop.create_server`. 104 105 .. versionadded:: 3.7 106 107 The *ssl_handshake_timeout* and *start_serving* parameters. 108 109 110.. rubric:: Unix Sockets 111 112.. coroutinefunction:: open_unix_connection(path=None, *, loop=None, \ 113 limit=None, ssl=None, sock=None, \ 114 server_hostname=None, ssl_handshake_timeout=None) 115 116 Establish a Unix socket connection and return a pair of 117 ``(reader, writer)``. 118 119 Similar to :func:`open_connection` but operates on Unix sockets. 120 121 See also the documentation of :meth:`loop.create_unix_connection`. 122 123 .. availability:: Unix. 124 125 .. versionadded:: 3.7 126 127 The *ssl_handshake_timeout* parameter. 128 129 .. versionchanged:: 3.7 130 131 The *path* parameter can now be a :term:`path-like object` 132 133 134.. coroutinefunction:: start_unix_server(client_connected_cb, path=None, \ 135 *, loop=None, limit=None, sock=None, \ 136 backlog=100, ssl=None, ssl_handshake_timeout=None, \ 137 start_serving=True) 138 139 Start a Unix socket server. 140 141 Similar to :func:`start_server` but works with Unix sockets. 142 143 See also the documentation of :meth:`loop.create_unix_server`. 144 145 .. availability:: Unix. 146 147 .. versionadded:: 3.7 148 149 The *ssl_handshake_timeout* and *start_serving* parameters. 150 151 .. versionchanged:: 3.7 152 153 The *path* parameter can now be a :term:`path-like object`. 154 155 156StreamReader 157============ 158 159.. class:: StreamReader 160 161 Represents a reader object that provides APIs to read data 162 from the IO stream. 163 164 It is not recommended to instantiate *StreamReader* objects 165 directly; use :func:`open_connection` and :func:`start_server` 166 instead. 167 168 .. coroutinemethod:: read(n=-1) 169 170 Read up to *n* bytes. If *n* is not provided, or set to ``-1``, 171 read until EOF and return all read bytes. 172 173 If EOF was received and the internal buffer is empty, 174 return an empty ``bytes`` object. 175 176 .. coroutinemethod:: readline() 177 178 Read one line, where "line" is a sequence of bytes 179 ending with ``\n``. 180 181 If EOF is received and ``\n`` was not found, the method 182 returns partially read data. 183 184 If EOF is received and the internal buffer is empty, 185 return an empty ``bytes`` object. 186 187 .. coroutinemethod:: readexactly(n) 188 189 Read exactly *n* bytes. 190 191 Raise an :exc:`IncompleteReadError` if EOF is reached before *n* 192 can be read. Use the :attr:`IncompleteReadError.partial` 193 attribute to get the partially read data. 194 195 .. coroutinemethod:: readuntil(separator=b'\\n') 196 197 Read data from the stream until *separator* is found. 198 199 On success, the data and separator will be removed from the 200 internal buffer (consumed). Returned data will include the 201 separator at the end. 202 203 If the amount of data read exceeds the configured stream limit, a 204 :exc:`LimitOverrunError` exception is raised, and the data 205 is left in the internal buffer and can be read again. 206 207 If EOF is reached before the complete separator is found, 208 an :exc:`IncompleteReadError` exception is raised, and the internal 209 buffer is reset. The :attr:`IncompleteReadError.partial` attribute 210 may contain a portion of the separator. 211 212 .. versionadded:: 3.5.2 213 214 .. method:: at_eof() 215 216 Return ``True`` if the buffer is empty and :meth:`feed_eof` 217 was called. 218 219 220StreamWriter 221============ 222 223.. class:: StreamWriter 224 225 Represents a writer object that provides APIs to write data 226 to the IO stream. 227 228 It is not recommended to instantiate *StreamWriter* objects 229 directly; use :func:`open_connection` and :func:`start_server` 230 instead. 231 232 .. method:: write(data) 233 234 The method attempts to write the *data* to the underlying socket immediately. 235 If that fails, the data is queued in an internal write buffer until it can be 236 sent. 237 238 The method should be used along with the ``drain()`` method:: 239 240 stream.write(data) 241 await stream.drain() 242 243 .. method:: writelines(data) 244 245 The method writes a list (or any iterable) of bytes to the underlying socket 246 immediately. 247 If that fails, the data is queued in an internal write buffer until it can be 248 sent. 249 250 The method should be used along with the ``drain()`` method:: 251 252 stream.writelines(lines) 253 await stream.drain() 254 255 .. method:: close() 256 257 The method closes the stream and the underlying socket. 258 259 The method should be used along with the ``wait_closed()`` method:: 260 261 stream.close() 262 await stream.wait_closed() 263 264 .. method:: can_write_eof() 265 266 Return ``True`` if the underlying transport supports 267 the :meth:`write_eof` method, ``False`` otherwise. 268 269 .. method:: write_eof() 270 271 Close the write end of the stream after the buffered write 272 data is flushed. 273 274 .. attribute:: transport 275 276 Return the underlying asyncio transport. 277 278 .. method:: get_extra_info(name, default=None) 279 280 Access optional transport information; see 281 :meth:`BaseTransport.get_extra_info` for details. 282 283 .. coroutinemethod:: drain() 284 285 Wait until it is appropriate to resume writing to the stream. 286 Example:: 287 288 writer.write(data) 289 await writer.drain() 290 291 This is a flow control method that interacts with the underlying 292 IO write buffer. When the size of the buffer reaches 293 the high watermark, *drain()* blocks until the size of the 294 buffer is drained down to the low watermark and writing can 295 be resumed. When there is nothing to wait for, the :meth:`drain` 296 returns immediately. 297 298 .. method:: is_closing() 299 300 Return ``True`` if the stream is closed or in the process of 301 being closed. 302 303 .. versionadded:: 3.7 304 305 .. coroutinemethod:: wait_closed() 306 307 Wait until the stream is closed. 308 309 Should be called after :meth:`close` to wait until the underlying 310 connection is closed. 311 312 .. versionadded:: 3.7 313 314 315Examples 316======== 317 318.. _asyncio-tcp-echo-client-streams: 319 320TCP echo client using streams 321----------------------------- 322 323TCP echo client using the :func:`asyncio.open_connection` function:: 324 325 import asyncio 326 327 async def tcp_echo_client(message): 328 reader, writer = await asyncio.open_connection( 329 '127.0.0.1', 8888) 330 331 print(f'Send: {message!r}') 332 writer.write(message.encode()) 333 334 data = await reader.read(100) 335 print(f'Received: {data.decode()!r}') 336 337 print('Close the connection') 338 writer.close() 339 340 asyncio.run(tcp_echo_client('Hello World!')) 341 342 343.. seealso:: 344 345 The :ref:`TCP echo client protocol <asyncio_example_tcp_echo_client_protocol>` 346 example uses the low-level :meth:`loop.create_connection` method. 347 348 349.. _asyncio-tcp-echo-server-streams: 350 351TCP echo server using streams 352----------------------------- 353 354TCP echo server using the :func:`asyncio.start_server` function:: 355 356 import asyncio 357 358 async def handle_echo(reader, writer): 359 data = await reader.read(100) 360 message = data.decode() 361 addr = writer.get_extra_info('peername') 362 363 print(f"Received {message!r} from {addr!r}") 364 365 print(f"Send: {message!r}") 366 writer.write(data) 367 await writer.drain() 368 369 print("Close the connection") 370 writer.close() 371 372 async def main(): 373 server = await asyncio.start_server( 374 handle_echo, '127.0.0.1', 8888) 375 376 addr = server.sockets[0].getsockname() 377 print(f'Serving on {addr}') 378 379 async with server: 380 await server.serve_forever() 381 382 asyncio.run(main()) 383 384 385.. seealso:: 386 387 The :ref:`TCP echo server protocol <asyncio_example_tcp_echo_server_protocol>` 388 example uses the :meth:`loop.create_server` method. 389 390 391Get HTTP headers 392---------------- 393 394Simple example querying HTTP headers of the URL passed on the command line:: 395 396 import asyncio 397 import urllib.parse 398 import sys 399 400 async def print_http_headers(url): 401 url = urllib.parse.urlsplit(url) 402 if url.scheme == 'https': 403 reader, writer = await asyncio.open_connection( 404 url.hostname, 443, ssl=True) 405 else: 406 reader, writer = await asyncio.open_connection( 407 url.hostname, 80) 408 409 query = ( 410 f"HEAD {url.path or '/'} HTTP/1.0\r\n" 411 f"Host: {url.hostname}\r\n" 412 f"\r\n" 413 ) 414 415 writer.write(query.encode('latin-1')) 416 while True: 417 line = await reader.readline() 418 if not line: 419 break 420 421 line = line.decode('latin1').rstrip() 422 if line: 423 print(f'HTTP header> {line}') 424 425 # Ignore the body, close the socket 426 writer.close() 427 428 url = sys.argv[1] 429 asyncio.run(print_http_headers(url)) 430 431 432Usage:: 433 434 python example.py http://example.com/path/page.html 435 436or with HTTPS:: 437 438 python example.py https://example.com/path/page.html 439 440 441.. _asyncio_example_create_connection-streams: 442 443Register an open socket to wait for data using streams 444------------------------------------------------------ 445 446Coroutine waiting until a socket receives data using the 447:func:`open_connection` function:: 448 449 import asyncio 450 import socket 451 452 async def wait_for_data(): 453 # Get a reference to the current event loop because 454 # we want to access low-level APIs. 455 loop = asyncio.get_running_loop() 456 457 # Create a pair of connected sockets. 458 rsock, wsock = socket.socketpair() 459 460 # Register the open socket to wait for data. 461 reader, writer = await asyncio.open_connection(sock=rsock) 462 463 # Simulate the reception of data from the network 464 loop.call_soon(wsock.send, 'abc'.encode()) 465 466 # Wait for data 467 data = await reader.read(100) 468 469 # Got data, we are done: close the socket 470 print("Received:", data.decode()) 471 writer.close() 472 473 # Close the second socket 474 wsock.close() 475 476 asyncio.run(wait_for_data()) 477 478.. seealso:: 479 480 The :ref:`register an open socket to wait for data using a protocol 481 <asyncio_example_create_connection>` example uses a low-level protocol and 482 the :meth:`loop.create_connection` method. 483 484 The :ref:`watch a file descriptor for read events 485 <asyncio_example_watch_fd>` example uses the low-level 486 :meth:`loop.add_reader` method to watch a file descriptor. 487