import os
import pathlib
import sys
from dataclasses import dataclass
from functools import partial
from os import PathLike
from typing import (
    IO, TYPE_CHECKING, Any, AnyStr, AsyncIterator, Callable, Generic, Iterable, Iterator, List,
    Optional, Sequence, Tuple, Union, cast, overload)

from .. import to_thread
from ..abc import AsyncResource

if sys.version_info >= (3, 8):
    from typing import Final
else:
    from typing_extensions import Final

if TYPE_CHECKING:
    from _typeshed import OpenBinaryMode, OpenTextMode, ReadableBuffer, WriteableBuffer
else:
    # _typeshed only exists for type checkers; at runtime the names merely need to be defined so
    # that the annotations below can be evaluated.
    ReadableBuffer = OpenBinaryMode = OpenTextMode = WriteableBuffer = object


class AsyncFile(AsyncResource, Generic[AnyStr]):
    """
    An asynchronous file object.

    This class wraps a standard file object and provides async friendly versions of the following
    blocking methods (where available on the original file object):

    * read
    * read1
    * readline
    * readlines
    * readinto
    * readinto1
    * write
    * writelines
    * truncate
    * seek
    * tell
    * flush

    All other methods are directly passed through.

    This class supports the asynchronous context manager protocol which closes the underlying file
    at the end of the context block.

    This class also supports asynchronous iteration::

        async with await open_file(...) as f:
            async for line in f:
                print(line)
    """

    def __init__(self, fp: IO[AnyStr]) -> None:
        # Stored as Any so that optional methods (read1, readinto, ...) can be looked up without
        # type checker complaints; they only exist on some kinds of file objects.
        self._fp: Any = fp

    def __getattr__(self, name: str) -> object:
        # Only invoked for attributes not found on AsyncFile itself, so everything that is not
        # explicitly wrapped below is served by the wrapped file object.
        return getattr(self._fp, name)

    @property
    def wrapped(self) -> IO[AnyStr]:
        """The wrapped file object."""
        return self._fp

    async def __aiter__(self) -> AsyncIterator[AnyStr]:
        # Yield lines until readline() returns an empty (falsy) value.
        while True:
            line = await self.readline()
            if line:
                yield line
            else:
                break

    async def aclose(self) -> None:
        """Close the wrapped file object."""
        return await to_thread.run_sync(self._fp.close)

    # Each method below forwards its arguments to the same-named method of the wrapped file
    # object through to_thread.run_sync() and returns that call's result.

    async def read(self, size: int = -1) -> AnyStr:
        return await to_thread.run_sync(self._fp.read, size)

    async def read1(self: 'AsyncFile[bytes]', size: int = -1) -> bytes:
        return await to_thread.run_sync(self._fp.read1, size)

    async def readline(self) -> AnyStr:
        return await to_thread.run_sync(self._fp.readline)

    async def readlines(self) -> List[AnyStr]:
        return await to_thread.run_sync(self._fp.readlines)

    async def readinto(self: 'AsyncFile[bytes]', b: WriteableBuffer) -> int:
        """
        Read bytes into the pre-allocated, writable buffer ``b``.

        :return: the number of bytes read, as reported by the wrapped file's ``readinto()``

        """
        return await to_thread.run_sync(self._fp.readinto, b)

    async def readinto1(self: 'AsyncFile[bytes]', b: WriteableBuffer) -> int:
        """
        Read bytes into the pre-allocated, writable buffer ``b``.

        :return: the number of bytes read, as reported by the wrapped file's ``readinto1()``

        """
        return await to_thread.run_sync(self._fp.readinto1, b)

    @overload
    async def write(self: 'AsyncFile[bytes]', b: ReadableBuffer) -> int: ...

    @overload
    async def write(self: 'AsyncFile[str]', b: str) -> int: ...

    async def write(self, b: Union[ReadableBuffer, str]) -> int:
        return await to_thread.run_sync(self._fp.write, b)

    @overload
    async def writelines(self: 'AsyncFile[bytes]', lines: Iterable[ReadableBuffer]) -> None: ...

    @overload
    async def writelines(self: 'AsyncFile[str]', lines: Iterable[str]) -> None: ...

    async def writelines(self, lines: Union[Iterable[ReadableBuffer], Iterable[str]]) -> None:
        return await to_thread.run_sync(self._fp.writelines, lines)

    async def truncate(self, size: Optional[int] = None) -> int:
        return await to_thread.run_sync(self._fp.truncate, size)

    async def seek(self, offset: int, whence: Optional[int] = os.SEEK_SET) -> int:
        return await to_thread.run_sync(self._fp.seek, offset, whence)

    async def tell(self) -> int:
        return await to_thread.run_sync(self._fp.tell)

    async def flush(self) -> None:
        return await to_thread.run_sync(self._fp.flush)


@overload
async def open_file(file: Union[str, PathLike, int], mode: OpenBinaryMode,
                    buffering: int = ..., encoding: Optional[str] = ...,
                    errors: Optional[str] = ..., newline: Optional[str] = ..., closefd: bool = ...,
                    opener: Optional[Callable[[str, int], int]] = ...) -> AsyncFile[bytes]:
    ...


@overload
async def open_file(file: Union[str, PathLike, int], mode: OpenTextMode = ...,
                    buffering: int = ..., encoding: Optional[str] = ...,
                    errors: Optional[str] = ..., newline: Optional[str] = ..., closefd: bool = ...,
                    opener: Optional[Callable[[str, int], int]] = ...) -> AsyncFile[str]:
    ...


async def open_file(file: Union[str, PathLike, int], mode: str = 'r', buffering: int = -1,
                    encoding: Optional[str] = None, errors: Optional[str] = None,
                    newline: Optional[str] = None, closefd: bool = True,
                    opener: Optional[Callable[[str, int], int]] = None) -> AsyncFile:
    """
    Open a file asynchronously.

    The arguments are exactly the same as for the builtin :func:`open`.

    :return: an asynchronous file object

    """
    # The arguments are passed positionally, in the parameter order of the builtin open().
    fp = await to_thread.run_sync(open, file, mode, buffering, encoding, errors, newline,
                                  closefd, opener)
    return AsyncFile(fp)


def wrap_file(file: IO[AnyStr]) -> AsyncFile[AnyStr]:
    """
    Wrap an existing file as an asynchronous file.

    :param file: an existing file-like object
    :return: an asynchronous file object

    """
    return AsyncFile(file)


@dataclass(eq=False)
class _PathIterator(AsyncIterator['Path']):
    """
    Adapt a synchronous iterator of path-like objects into an async iterator of :class:`Path`.

    Each item is fetched by calling :func:`next` on the wrapped iterator through
    ``to_thread.run_sync()``.
    """

    # The synchronous iterator being adapted (e.g. the result of pathlib.Path.iterdir())
    iterator: Iterator[PathLike]

    async def __anext__(self) -> 'Path':
        # next() is given None as its default, so exhaustion of the wrapped iterator shows up as
        # a None return value instead of a StopIteration raised inside run_sync().
        nextval = await to_thread.run_sync(next, self.iterator, None, cancellable=True)
        if nextval is None:
            raise StopAsyncIteration from None

        return Path(cast(PathLike, nextval))


class Path:
    """
    An asynchronous version of :class:`pathlib.Path`.

    This class cannot be substituted for :class:`pathlib.Path` or :class:`pathlib.PurePath`, but
    it is compatible with the :class:`os.PathLike` interface.

    It implements the Python 3.10 version of :class:`pathlib.Path` interface, except for the
    deprecated :meth:`~pathlib.Path.link_to` method.

    Any methods that do disk I/O need to be awaited on. These methods are:

    * :meth:`~pathlib.Path.absolute`
    * :meth:`~pathlib.Path.chmod`
    * :meth:`~pathlib.Path.cwd`
    * :meth:`~pathlib.Path.exists`
    * :meth:`~pathlib.Path.expanduser`
    * :meth:`~pathlib.Path.group`
    * :meth:`~pathlib.Path.hardlink_to`
    * :meth:`~pathlib.Path.home`
    * :meth:`~pathlib.Path.is_block_device`
    * :meth:`~pathlib.Path.is_char_device`
    * :meth:`~pathlib.Path.is_dir`
    * :meth:`~pathlib.Path.is_fifo`
    * :meth:`~pathlib.Path.is_file`
    * :meth:`~pathlib.Path.is_mount`
    * :meth:`~pathlib.Path.is_socket`
    * :meth:`~pathlib.Path.is_symlink`
    * :meth:`~pathlib.Path.lchmod`
    * :meth:`~pathlib.Path.lstat`
    * :meth:`~pathlib.Path.mkdir`
    * :meth:`~pathlib.Path.open`
    * :meth:`~pathlib.Path.owner`
    * :meth:`~pathlib.Path.read_bytes`
    * :meth:`~pathlib.Path.read_text`
    * :meth:`~pathlib.Path.readlink`
    * :meth:`~pathlib.Path.rename`
    * :meth:`~pathlib.Path.replace`
    * :meth:`~pathlib.Path.resolve`
    * :meth:`~pathlib.Path.rmdir`
    * :meth:`~pathlib.Path.samefile`
    * :meth:`~pathlib.Path.stat`
    * :meth:`~pathlib.Path.symlink_to`
    * :meth:`~pathlib.Path.touch`
    * :meth:`~pathlib.Path.unlink`
    * :meth:`~pathlib.Path.write_bytes`
    * :meth:`~pathlib.Path.write_text`

    Additionally, the following methods return an async iterator yielding :class:`~.Path` objects:

    * :meth:`~pathlib.Path.glob`
    * :meth:`~pathlib.Path.iterdir`
    * :meth:`~pathlib.Path.rglob`
    """

    __slots__ = '_path', '__weakref__'

    def __init__(self, *args: Union[str, PathLike]) -> None:
        # All functionality is delegated to this wrapped pathlib.Path instance.
        self._path: Final[pathlib.Path] = pathlib.Path(*args)

    def __fspath__(self) -> str:
        return self._path.__fspath__()

    def __str__(self) -> str:
        return self._path.__str__()

    def __repr__(self) -> str:
        return f'{self.__class__.__name__}({self.as_posix()!r})'

    def __bytes__(self) -> bytes:
        return self._path.__bytes__()

    def __hash__(self) -> int:
        return self._path.__hash__()

    # The comparison operators unwrap another Path so that the comparison is carried out between
    # the underlying pathlib.Path objects; any other operand is handed to pathlib unchanged.

    def __eq__(self, other: object) -> bool:
        target = other._path if isinstance(other, Path) else other
        return self._path.__eq__(target)

    def __lt__(self, other: 'Path') -> bool:
        target = other._path if isinstance(other, Path) else other
        return self._path.__lt__(target)

    def __le__(self, other: 'Path') -> bool:
        target = other._path if isinstance(other, Path) else other
        return self._path.__le__(target)

    def __gt__(self, other: 'Path') -> bool:
        target = other._path if isinstance(other, Path) else other
        return self._path.__gt__(target)

    def __ge__(self, other: 'Path') -> bool:
        target = other._path if isinstance(other, Path) else other
        return self._path.__ge__(target)

    def __truediv__(self, other: Any) -> 'Path':
        return Path(self._path / other)

    def __rtruediv__(self, other: Any) -> 'Path':
        return Path(other) / self

    # The properties and non-async methods below return the wrapped pathlib.Path's value,
    # re-wrapping any resulting paths in this class.

    @property
    def parts(self) -> Tuple[str, ...]:
        return self._path.parts

    @property
    def drive(self) -> str:
        return self._path.drive

    @property
    def root(self) -> str:
        return self._path.root

    @property
    def anchor(self) -> str:
        return self._path.anchor

    @property
    def parents(self) -> Sequence['Path']:
        return tuple(Path(p) for p in self._path.parents)

    @property
    def parent(self) -> 'Path':
        return Path(self._path.parent)

    @property
    def name(self) -> str:
        return self._path.name

    @property
    def suffix(self) -> str:
        return self._path.suffix

    @property
    def suffixes(self) -> List[str]:
        return self._path.suffixes

    @property
    def stem(self) -> str:
        return self._path.stem

    async def absolute(self) -> 'Path':
        path = await to_thread.run_sync(self._path.absolute)
        return Path(path)

    def as_posix(self) -> str:
        return self._path.as_posix()

    def as_uri(self) -> str:
        return self._path.as_uri()

    def match(self, path_pattern: str) -> bool:
        return self._path.match(path_pattern)

    def is_relative_to(self, *other: Union[str, PathLike]) -> bool:
        # Built on relative_to() rather than pathlib's own is_relative_to(), which was only
        # added in Python 3.9.
        try:
            self.relative_to(*other)
            return True
        except ValueError:
            return False

    async def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None:
        # os.chmod() is called directly so that follow_symlinks can be passed through.
        func = partial(os.chmod, follow_symlinks=follow_symlinks)
        return await to_thread.run_sync(func, self._path, mode)

    @classmethod
    async def cwd(cls) -> 'Path':
        path = await to_thread.run_sync(pathlib.Path.cwd)
        return cls(path)

    async def exists(self) -> bool:
        return await to_thread.run_sync(self._path.exists, cancellable=True)

    async def expanduser(self) -> 'Path':
        return Path(await to_thread.run_sync(self._path.expanduser, cancellable=True))

    def glob(self, pattern: str) -> AsyncIterator['Path']:
        gen = self._path.glob(pattern)
        return _PathIterator(gen)

    async def group(self) -> str:
        return await to_thread.run_sync(self._path.group, cancellable=True)

    async def hardlink_to(self, target: Union[str, pathlib.Path, 'Path']) -> None:
        if isinstance(target, Path):
            target = target._path

        # Creates this path as a hard link to target: os.link(src, dst) with src=target.
        await to_thread.run_sync(os.link, target, self)

    @classmethod
    async def home(cls) -> 'Path':
        home_path = await to_thread.run_sync(pathlib.Path.home)
        return cls(home_path)

    def is_absolute(self) -> bool:
        return self._path.is_absolute()

    async def is_block_device(self) -> bool:
        return await to_thread.run_sync(self._path.is_block_device, cancellable=True)

    async def is_char_device(self) -> bool:
        return await to_thread.run_sync(self._path.is_char_device, cancellable=True)

    async def is_dir(self) -> bool:
        return await to_thread.run_sync(self._path.is_dir, cancellable=True)

    async def is_fifo(self) -> bool:
        return await to_thread.run_sync(self._path.is_fifo, cancellable=True)

    async def is_file(self) -> bool:
        return await to_thread.run_sync(self._path.is_file, cancellable=True)

    async def is_mount(self) -> bool:
        return await to_thread.run_sync(os.path.ismount, self._path, cancellable=True)

    def is_reserved(self) -> bool:
        return self._path.is_reserved()

    async def is_socket(self) -> bool:
        return await to_thread.run_sync(self._path.is_socket, cancellable=True)

    async def is_symlink(self) -> bool:
        return await to_thread.run_sync(self._path.is_symlink, cancellable=True)

    def iterdir(self) -> AsyncIterator['Path']:
        gen = self._path.iterdir()
        return _PathIterator(gen)

    def joinpath(self, *args: Union[str, 'PathLike[str]']) -> 'Path':
        return Path(self._path.joinpath(*args))

    async def lchmod(self, mode: int) -> None:
        await to_thread.run_sync(self._path.lchmod, mode)

    async def lstat(self) -> os.stat_result:
        return await to_thread.run_sync(self._path.lstat, cancellable=True)

    async def mkdir(self, mode: int = 0o777, parents: bool = False,
                    exist_ok: bool = False) -> None:
        await to_thread.run_sync(self._path.mkdir, mode, parents, exist_ok)

    @overload
    async def open(self, mode: OpenBinaryMode, buffering: int = ..., encoding: Optional[str] = ...,
                   errors: Optional[str] = ..., newline: Optional[str] = ...) -> AsyncFile[bytes]:
        ...

    @overload
    async def open(self, mode: OpenTextMode = ..., buffering: int = ...,
                   encoding: Optional[str] = ..., errors: Optional[str] = ...,
                   newline: Optional[str] = ...) -> AsyncFile[str]:
        ...

    async def open(self, mode: str = 'r', buffering: int = -1, encoding: Optional[str] = None,
                   errors: Optional[str] = None, newline: Optional[str] = None) -> AsyncFile:
        """
        Open the file pointed to by this path.

        The arguments are passed positionally to the wrapped path's ``open()`` method.

        :return: an asynchronous file object

        """
        fp = await to_thread.run_sync(self._path.open, mode, buffering, encoding, errors, newline)
        return AsyncFile(fp)

    async def owner(self) -> str:
        return await to_thread.run_sync(self._path.owner, cancellable=True)

    async def read_bytes(self) -> bytes:
        return await to_thread.run_sync(self._path.read_bytes)

    async def read_text(self, encoding: Optional[str] = None, errors: Optional[str] = None) -> str:
        return await to_thread.run_sync(self._path.read_text, encoding, errors)

    def relative_to(self, *other: Union[str, PathLike]) -> 'Path':
        return Path(self._path.relative_to(*other))

    async def readlink(self) -> 'Path':
        # os.readlink() is used instead of pathlib's readlink(), which was added in Python 3.9.
        target = await to_thread.run_sync(os.readlink, self._path)
        return Path(cast(str, target))

    async def rename(self, target: Union[str, pathlib.PurePath, 'Path']) -> 'Path':
        if isinstance(target, Path):
            target = target._path

        await to_thread.run_sync(self._path.rename, target)
        # The new location is returned as given by the caller, not pathlib's return value.
        return Path(target)

    async def replace(self, target: Union[str, pathlib.PurePath, 'Path']) -> 'Path':
        if isinstance(target, Path):
            target = target._path

        await to_thread.run_sync(self._path.replace, target)
        # The new location is returned as given by the caller, not pathlib's return value.
        return Path(target)

    async def resolve(self, strict: bool = False) -> 'Path':
        func = partial(self._path.resolve, strict=strict)
        return Path(await to_thread.run_sync(func, cancellable=True))

    def rglob(self, pattern: str) -> AsyncIterator['Path']:
        gen = self._path.rglob(pattern)
        return _PathIterator(gen)

    async def rmdir(self) -> None:
        await to_thread.run_sync(self._path.rmdir)

    async def samefile(self, other_path: Union[str, bytes, int, pathlib.Path, 'Path']) -> bool:
        if isinstance(other_path, Path):
            other_path = other_path._path

        return await to_thread.run_sync(self._path.samefile, other_path, cancellable=True)

    async def stat(self, *, follow_symlinks: bool = True) -> os.stat_result:
        # os.stat() is called directly so that follow_symlinks can be passed through.
        func = partial(os.stat, follow_symlinks=follow_symlinks)
        return await to_thread.run_sync(func, self._path, cancellable=True)

    async def symlink_to(self, target: Union[str, pathlib.Path, 'Path'],
                         target_is_directory: bool = False) -> None:
        if isinstance(target, Path):
            target = target._path

        await to_thread.run_sync(self._path.symlink_to, target, target_is_directory)

    async def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None:
        await to_thread.run_sync(self._path.touch, mode, exist_ok)

    async def unlink(self, missing_ok: bool = False) -> None:
        # missing_ok is handled here instead of being forwarded, as pathlib's unlink() only
        # accepts that argument on Python 3.8 and later.
        try:
            await to_thread.run_sync(self._path.unlink)
        except FileNotFoundError:
            if not missing_ok:
                raise

    def with_name(self, name: str) -> 'Path':
        return Path(self._path.with_name(name))

    def with_stem(self, stem: str) -> 'Path':
        # Composed from with_name() because pathlib's with_stem() was only added in Python 3.9.
        return Path(self._path.with_name(stem + self._path.suffix))

    def with_suffix(self, suffix: str) -> 'Path':
        return Path(self._path.with_suffix(suffix))

    async def write_bytes(self, data: bytes) -> int:
        return await to_thread.run_sync(self._path.write_bytes, data)

    async def write_text(self, data: str, encoding: Optional[str] = None,
                         errors: Optional[str] = None, newline: Optional[str] = None) -> int:
        # Path.write_text() does not support the "newline" parameter before Python 3.10
        def sync_write_text() -> int:
            with self._path.open('w', encoding=encoding, errors=errors, newline=newline) as fp:
                return fp.write(data)

        return await to_thread.run_sync(sync_write_text)


# Make isinstance(path, os.PathLike) checks succeed for the async Path class.
PathLike.register(Path)