1import os
2import pathlib
3import sys
4from dataclasses import dataclass
5from functools import partial
6from os import PathLike
7from typing import (
8    IO, TYPE_CHECKING, Any, AnyStr, AsyncIterator, Callable, Generic, Iterable, Iterator, List,
9    Optional, Sequence, Tuple, Union, cast, overload)
10
11from .. import to_thread
12from ..abc import AsyncResource
13
14if sys.version_info >= (3, 8):
15    from typing import Final
16else:
17    from typing_extensions import Final
18
19if TYPE_CHECKING:
20    from _typeshed import OpenBinaryMode, OpenTextMode, ReadableBuffer, WriteableBuffer
21else:
22    ReadableBuffer = OpenBinaryMode = OpenTextMode = WriteableBuffer = object
23
24
25class AsyncFile(AsyncResource, Generic[AnyStr]):
26    """
27    An asynchronous file object.
28
29    This class wraps a standard file object and provides async friendly versions of the following
30    blocking methods (where available on the original file object):
31
32    * read
33    * read1
34    * readline
35    * readlines
36    * readinto
37    * readinto1
38    * write
39    * writelines
40    * truncate
41    * seek
42    * tell
43    * flush
44
45    All other methods are directly passed through.
46
47    This class supports the asynchronous context manager protocol which closes the underlying file
48    at the end of the context block.
49
50    This class also supports asynchronous iteration::
51
52        async with await open_file(...) as f:
53            async for line in f:
54                print(line)
55    """
56
57    def __init__(self, fp: IO[AnyStr]) -> None:
58        self._fp: Any = fp
59
60    def __getattr__(self, name: str) -> object:
61        return getattr(self._fp, name)
62
63    @property
64    def wrapped(self) -> IO[AnyStr]:
65        """The wrapped file object."""
66        return self._fp
67
68    async def __aiter__(self) -> AsyncIterator[AnyStr]:
69        while True:
70            line = await self.readline()
71            if line:
72                yield line
73            else:
74                break
75
76    async def aclose(self) -> None:
77        return await to_thread.run_sync(self._fp.close)
78
79    async def read(self, size: int = -1) -> AnyStr:
80        return await to_thread.run_sync(self._fp.read, size)
81
82    async def read1(self: 'AsyncFile[bytes]', size: int = -1) -> bytes:
83        return await to_thread.run_sync(self._fp.read1, size)
84
85    async def readline(self) -> AnyStr:
86        return await to_thread.run_sync(self._fp.readline)
87
88    async def readlines(self) -> List[AnyStr]:
89        return await to_thread.run_sync(self._fp.readlines)
90
91    async def readinto(self: 'AsyncFile[bytes]', b: WriteableBuffer) -> bytes:
92        return await to_thread.run_sync(self._fp.readinto, b)
93
94    async def readinto1(self: 'AsyncFile[bytes]', b: WriteableBuffer) -> bytes:
95        return await to_thread.run_sync(self._fp.readinto1, b)
96
97    @overload
98    async def write(self: 'AsyncFile[bytes]', b: ReadableBuffer) -> int: ...
99
100    @overload
101    async def write(self: 'AsyncFile[str]', b: str) -> int: ...
102
103    async def write(self, b: Union[ReadableBuffer, str]) -> int:
104        return await to_thread.run_sync(self._fp.write, b)
105
106    @overload
107    async def writelines(self: 'AsyncFile[bytes]', lines: Iterable[ReadableBuffer]) -> None: ...
108
109    @overload
110    async def writelines(self: 'AsyncFile[str]', lines: Iterable[str]) -> None: ...
111
112    async def writelines(self, lines: Union[Iterable[ReadableBuffer], Iterable[str]]) -> None:
113        return await to_thread.run_sync(self._fp.writelines, lines)
114
115    async def truncate(self, size: Optional[int] = None) -> int:
116        return await to_thread.run_sync(self._fp.truncate, size)
117
118    async def seek(self, offset: int, whence: Optional[int] = os.SEEK_SET) -> int:
119        return await to_thread.run_sync(self._fp.seek, offset, whence)
120
121    async def tell(self) -> int:
122        return await to_thread.run_sync(self._fp.tell)
123
124    async def flush(self) -> None:
125        return await to_thread.run_sync(self._fp.flush)
126
127
128@overload
129async def open_file(file: Union[str, PathLike, int], mode: OpenBinaryMode,
130                    buffering: int = ..., encoding: Optional[str] = ...,
131                    errors: Optional[str] = ..., newline: Optional[str] = ..., closefd: bool = ...,
132                    opener: Optional[Callable[[str, int], int]] = ...) -> AsyncFile[bytes]:
133    ...
134
135
136@overload
137async def open_file(file: Union[str, PathLike, int], mode: OpenTextMode = ...,
138                    buffering: int = ..., encoding: Optional[str] = ...,
139                    errors: Optional[str] = ..., newline: Optional[str] = ..., closefd: bool = ...,
140                    opener: Optional[Callable[[str, int], int]] = ...) -> AsyncFile[str]:
141    ...
142
143
144async def open_file(file: Union[str, PathLike, int], mode: str = 'r', buffering: int = -1,
145                    encoding: Optional[str] = None, errors: Optional[str] = None,
146                    newline: Optional[str] = None, closefd: bool = True,
147                    opener: Optional[Callable[[str, int], int]] = None) -> AsyncFile:
148    """
149    Open a file asynchronously.
150
151    The arguments are exactly the same as for the builtin :func:`open`.
152
153    :return: an asynchronous file object
154
155    """
156    fp = await to_thread.run_sync(open, file, mode, buffering, encoding, errors, newline,
157                                  closefd, opener)
158    return AsyncFile(fp)
159
160
161def wrap_file(file: IO[AnyStr]) -> AsyncFile[AnyStr]:
162    """
163    Wrap an existing file as an asynchronous file.
164
165    :param file: an existing file-like object
166    :return: an asynchronous file object
167
168    """
169    return AsyncFile(file)
170
171
172@dataclass(eq=False)
173class _PathIterator(AsyncIterator['Path']):
174    iterator: Iterator[PathLike]
175
176    async def __anext__(self) -> 'Path':
177        nextval = await to_thread.run_sync(next, self.iterator, None, cancellable=True)
178        if nextval is None:
179            raise StopAsyncIteration from None
180
181        return Path(cast(PathLike, nextval))
182
183
184class Path:
185    """
186    An asynchronous version of :class:`pathlib.Path`.
187
188    This class cannot be substituted for :class:`pathlib.Path` or :class:`pathlib.PurePath`, but
189    it is compatible with the :class:`os.PathLike` interface.
190
191    It implements the Python 3.10 version of :class:`pathlib.Path` interface, except for the
192    deprecated :meth:`~pathlib.Path.link_to` method.
193
194    Any methods that do disk I/O need to be awaited on. These methods are:
195
196    * :meth:`~pathlib.Path.absolute`
197    * :meth:`~pathlib.Path.chmod`
198    * :meth:`~pathlib.Path.cwd`
199    * :meth:`~pathlib.Path.exists`
200    * :meth:`~pathlib.Path.expanduser`
201    * :meth:`~pathlib.Path.group`
202    * :meth:`~pathlib.Path.hardlink_to`
203    * :meth:`~pathlib.Path.home`
204    * :meth:`~pathlib.Path.is_block_device`
205    * :meth:`~pathlib.Path.is_char_device`
206    * :meth:`~pathlib.Path.is_dir`
207    * :meth:`~pathlib.Path.is_fifo`
208    * :meth:`~pathlib.Path.is_file`
209    * :meth:`~pathlib.Path.is_mount`
210    * :meth:`~pathlib.Path.lchmod`
211    * :meth:`~pathlib.Path.lstat`
212    * :meth:`~pathlib.Path.mkdir`
213    * :meth:`~pathlib.Path.open`
214    * :meth:`~pathlib.Path.owner`
215    * :meth:`~pathlib.Path.read_bytes`
216    * :meth:`~pathlib.Path.read_text`
217    * :meth:`~pathlib.Path.readlink`
218    * :meth:`~pathlib.Path.rename`
219    * :meth:`~pathlib.Path.replace`
220    * :meth:`~pathlib.Path.rmdir`
221    * :meth:`~pathlib.Path.samefile`
222    * :meth:`~pathlib.Path.stat`
223    * :meth:`~pathlib.Path.touch`
224    * :meth:`~pathlib.Path.unlink`
225    * :meth:`~pathlib.Path.write_bytes`
226    * :meth:`~pathlib.Path.write_text`
227
228    Additionally, the following methods return an async iterator yielding :class:`~.Path` objects:
229
230    * :meth:`~pathlib.Path.glob`
231    * :meth:`~pathlib.Path.iterdir`
232    * :meth:`~pathlib.Path.rglob`
233    """
234
235    __slots__ = '_path', '__weakref__'
236
237    def __init__(self, *args: Union[str, PathLike]) -> None:
238        self._path: Final[pathlib.Path] = pathlib.Path(*args)
239
240    def __fspath__(self) -> str:
241        return self._path.__fspath__()
242
243    def __str__(self) -> str:
244        return self._path.__str__()
245
246    def __repr__(self) -> str:
247        return f'{self.__class__.__name__}({self.as_posix()!r})'
248
249    def __bytes__(self) -> bytes:
250        return self._path.__bytes__()
251
252    def __hash__(self) -> int:
253        return self._path.__hash__()
254
255    def __eq__(self, other: object) -> bool:
256        target = other._path if isinstance(other, Path) else other
257        return self._path.__eq__(target)
258
259    def __lt__(self, other: 'Path') -> bool:
260        target = other._path if isinstance(other, Path) else other
261        return self._path.__lt__(target)
262
263    def __le__(self, other: 'Path') -> bool:
264        target = other._path if isinstance(other, Path) else other
265        return self._path.__le__(target)
266
267    def __gt__(self, other: 'Path') -> bool:
268        target = other._path if isinstance(other, Path) else other
269        return self._path.__gt__(target)
270
271    def __ge__(self, other: 'Path') -> bool:
272        target = other._path if isinstance(other, Path) else other
273        return self._path.__ge__(target)
274
275    def __truediv__(self, other: Any) -> 'Path':
276        return Path(self._path / other)
277
278    def __rtruediv__(self, other: Any) -> 'Path':
279        return Path(other) / self
280
281    @property
282    def parts(self) -> Tuple[str, ...]:
283        return self._path.parts
284
285    @property
286    def drive(self) -> str:
287        return self._path.drive
288
289    @property
290    def root(self) -> str:
291        return self._path.root
292
293    @property
294    def anchor(self) -> str:
295        return self._path.anchor
296
297    @property
298    def parents(self) -> Sequence['Path']:
299        return tuple(Path(p) for p in self._path.parents)
300
301    @property
302    def parent(self) -> 'Path':
303        return Path(self._path.parent)
304
305    @property
306    def name(self) -> str:
307        return self._path.name
308
309    @property
310    def suffix(self) -> str:
311        return self._path.suffix
312
313    @property
314    def suffixes(self) -> List[str]:
315        return self._path.suffixes
316
317    @property
318    def stem(self) -> str:
319        return self._path.stem
320
321    async def absolute(self) -> 'Path':
322        path = await to_thread.run_sync(self._path.absolute)
323        return Path(path)
324
325    def as_posix(self) -> str:
326        return self._path.as_posix()
327
328    def as_uri(self) -> str:
329        return self._path.as_uri()
330
331    def match(self, path_pattern: str) -> bool:
332        return self._path.match(path_pattern)
333
334    def is_relative_to(self, *other: Union[str, PathLike]) -> bool:
335        try:
336            self.relative_to(*other)
337            return True
338        except ValueError:
339            return False
340
341    async def chmod(self, mode: int, *, follow_symlinks: bool = True) -> None:
342        func = partial(os.chmod, follow_symlinks=follow_symlinks)
343        return await to_thread.run_sync(func, self._path, mode)
344
345    @classmethod
346    async def cwd(cls) -> 'Path':
347        path = await to_thread.run_sync(pathlib.Path.cwd)
348        return cls(path)
349
350    async def exists(self) -> bool:
351        return await to_thread.run_sync(self._path.exists, cancellable=True)
352
353    async def expanduser(self) -> 'Path':
354        return Path(await to_thread.run_sync(self._path.expanduser, cancellable=True))
355
356    def glob(self, pattern: str) -> AsyncIterator['Path']:
357        gen = self._path.glob(pattern)
358        return _PathIterator(gen)
359
360    async def group(self) -> str:
361        return await to_thread.run_sync(self._path.group, cancellable=True)
362
363    async def hardlink_to(self, target: Union[str, pathlib.Path, 'Path']) -> None:
364        if isinstance(target, Path):
365            target = target._path
366
367        await to_thread.run_sync(os.link, target, self)
368
369    @classmethod
370    async def home(cls) -> 'Path':
371        home_path = await to_thread.run_sync(pathlib.Path.home)
372        return cls(home_path)
373
374    def is_absolute(self) -> bool:
375        return self._path.is_absolute()
376
377    async def is_block_device(self) -> bool:
378        return await to_thread.run_sync(self._path.is_block_device, cancellable=True)
379
380    async def is_char_device(self) -> bool:
381        return await to_thread.run_sync(self._path.is_char_device, cancellable=True)
382
383    async def is_dir(self) -> bool:
384        return await to_thread.run_sync(self._path.is_dir, cancellable=True)
385
386    async def is_fifo(self) -> bool:
387        return await to_thread.run_sync(self._path.is_fifo, cancellable=True)
388
389    async def is_file(self) -> bool:
390        return await to_thread.run_sync(self._path.is_file, cancellable=True)
391
392    async def is_mount(self) -> bool:
393        return await to_thread.run_sync(os.path.ismount, self._path, cancellable=True)
394
395    def is_reserved(self) -> bool:
396        return self._path.is_reserved()
397
398    async def is_socket(self) -> bool:
399        return await to_thread.run_sync(self._path.is_socket, cancellable=True)
400
401    async def is_symlink(self) -> bool:
402        return await to_thread.run_sync(self._path.is_symlink, cancellable=True)
403
404    def iterdir(self) -> AsyncIterator['Path']:
405        gen = self._path.iterdir()
406        return _PathIterator(gen)
407
408    def joinpath(self, *args: Union[str, 'PathLike[str]']) -> 'Path':
409        return Path(self._path.joinpath(*args))
410
411    async def lchmod(self, mode: int) -> None:
412        await to_thread.run_sync(self._path.lchmod, mode)
413
414    async def lstat(self) -> os.stat_result:
415        return await to_thread.run_sync(self._path.lstat, cancellable=True)
416
417    async def mkdir(self, mode: int = 0o777, parents: bool = False,
418                    exist_ok: bool = False) -> None:
419        await to_thread.run_sync(self._path.mkdir, mode, parents, exist_ok)
420
421    @overload
422    async def open(self, mode: OpenBinaryMode, buffering: int = ..., encoding: Optional[str] = ...,
423                   errors: Optional[str] = ..., newline: Optional[str] = ...) -> AsyncFile[bytes]:
424        ...
425
426    @overload
427    async def open(self, mode: OpenTextMode = ..., buffering: int = ...,
428                   encoding: Optional[str] = ..., errors: Optional[str] = ...,
429                   newline: Optional[str] = ...) -> AsyncFile[str]:
430        ...
431
432    async def open(self, mode: str = 'r', buffering: int = -1, encoding: Optional[str] = None,
433                   errors: Optional[str] = None, newline: Optional[str] = None) -> AsyncFile:
434        fp = await to_thread.run_sync(self._path.open, mode, buffering, encoding, errors, newline)
435        return AsyncFile(fp)
436
437    async def owner(self) -> str:
438        return await to_thread.run_sync(self._path.owner, cancellable=True)
439
440    async def read_bytes(self) -> bytes:
441        return await to_thread.run_sync(self._path.read_bytes)
442
443    async def read_text(self, encoding: Optional[str] = None, errors: Optional[str] = None) -> str:
444        return await to_thread.run_sync(self._path.read_text, encoding, errors)
445
446    def relative_to(self, *other: Union[str, PathLike]) -> 'Path':
447        return Path(self._path.relative_to(*other))
448
449    async def readlink(self) -> 'Path':
450        target = await to_thread.run_sync(os.readlink, self._path)
451        return Path(cast(str, target))
452
453    async def rename(self, target: Union[str, pathlib.PurePath, 'Path']) -> 'Path':
454        if isinstance(target, Path):
455            target = target._path
456
457        await to_thread.run_sync(self._path.rename, target)
458        return Path(target)
459
460    async def replace(self, target: Union[str, pathlib.PurePath, 'Path']) -> 'Path':
461        if isinstance(target, Path):
462            target = target._path
463
464        await to_thread.run_sync(self._path.replace, target)
465        return Path(target)
466
467    async def resolve(self, strict: bool = False) -> 'Path':
468        func = partial(self._path.resolve, strict=strict)
469        return Path(await to_thread.run_sync(func, cancellable=True))
470
471    def rglob(self, pattern: str) -> AsyncIterator['Path']:
472        gen = self._path.rglob(pattern)
473        return _PathIterator(gen)
474
475    async def rmdir(self) -> None:
476        await to_thread.run_sync(self._path.rmdir)
477
478    async def samefile(self, other_path: Union[str, bytes, int, pathlib.Path, 'Path']) -> bool:
479        if isinstance(other_path, Path):
480            other_path = other_path._path
481
482        return await to_thread.run_sync(self._path.samefile, other_path, cancellable=True)
483
484    async def stat(self, *, follow_symlinks: bool = True) -> os.stat_result:
485        func = partial(os.stat, follow_symlinks=follow_symlinks)
486        return await to_thread.run_sync(func, self._path, cancellable=True)
487
488    async def symlink_to(self, target: Union[str, pathlib.Path, 'Path'],
489                         target_is_directory: bool = False) -> None:
490        if isinstance(target, Path):
491            target = target._path
492
493        await to_thread.run_sync(self._path.symlink_to, target, target_is_directory)
494
495    async def touch(self, mode: int = 0o666, exist_ok: bool = True) -> None:
496        await to_thread.run_sync(self._path.touch, mode, exist_ok)
497
498    async def unlink(self, missing_ok: bool = False) -> None:
499        try:
500            await to_thread.run_sync(self._path.unlink)
501        except FileNotFoundError:
502            if not missing_ok:
503                raise
504
505    def with_name(self, name: str) -> 'Path':
506        return Path(self._path.with_name(name))
507
508    def with_stem(self, stem: str) -> 'Path':
509        return Path(self._path.with_name(stem + self._path.suffix))
510
511    def with_suffix(self, suffix: str) -> 'Path':
512        return Path(self._path.with_suffix(suffix))
513
514    async def write_bytes(self, data: bytes) -> int:
515        return await to_thread.run_sync(self._path.write_bytes, data)
516
517    async def write_text(self, data: str, encoding: Optional[str] = None,
518                         errors: Optional[str] = None, newline: Optional[str] = None) -> int:
519        # Path.write_text() does not support the "newline" parameter before Python 3.10
520        def sync_write_text() -> int:
521            with self._path.open('w', encoding=encoding, errors=errors, newline=newline) as fp:
522                return fp.write(data)
523
524        return await to_thread.run_sync(sync_write_text)
525
526
527PathLike.register(Path)
528