1import abc
2import asyncio
3import collections
4import functools
5import io
6import operator
7import pathlib
8import stat
9import sys
10import time
11
12from . import errors
13from .common import (
14    DEFAULT_BLOCK_SIZE,
15    AbstractAsyncLister,
16    AsyncStreamIterator,
17    with_timeout,
18)
19
# Names exported by ``from aioftp.pathio import *``.
__all__ = ("AbstractPathIO", "PathIO", "AsyncPathIO",
           "MemoryPathIO", "PathIONursery")
27
28
class AsyncPathIOContext:
    """
    Async pathio context.

    Holds the arguments of a pending :py:meth:`AbstractPathIO._open` call.
    Entering the context (``async with`` or a plain ``await``) opens the file
    and exposes ``seek``, ``write``, ``read`` and ``close`` bound to the
    opened file object.

    Usage:
    ::

        >>> async with pathio.open(filename) as file_in:
        ...     async for block in file_in.iter_by_block(size):
        ...         # do

    or boring:
    ::

        >>> file = await pathio.open(filename)
        ... data = await file.read(size)
        ... await file.write(data)
        ... await file.close()

    """
    def __init__(self, pathio, args, kwargs):
        # `close` stays None until the file has been opened, so __aexit__
        # can tell whether there is anything to close.
        self.close = None
        self.pathio = pathio
        self.args = args
        self.kwargs = kwargs

    async def __aenter__(self):
        pathio = self.pathio
        opened = await pathio._open(*self.args, **self.kwargs)
        self.file = opened
        self.seek = functools.partial(pathio.seek, opened)
        self.write = functools.partial(pathio.write, opened)
        self.read = functools.partial(pathio.read, opened)
        self.close = functools.partial(pathio.close, opened)
        return self

    async def __aexit__(self, *args):
        closer = self.close
        if closer is None:
            return
        await closer()

    def __await__(self):
        opening = self.__aenter__()
        return opening.__await__()

    def iter_by_block(self, count=DEFAULT_BLOCK_SIZE):
        # `self.read` is looked up on every call (not bound here), so the
        # iterator may be created before the file is opened.
        def read_block():
            return self.read(count)
        return AsyncStreamIterator(read_block)
72
73
def universal_exception(coro):
    """
    Decorator. Reraising any exception (except `CancelledError`,
    `NotImplementedError` and `StopAsyncIteration`) with universal exception
    :py:class:`aioftp.PathIOError`. The original exception is kept both as
    ``reason`` (``sys.exc_info()``) and as the chained ``__cause__``.
    """
    untouched = (asyncio.CancelledError, NotImplementedError,
                 StopAsyncIteration)

    @functools.wraps(coro)
    async def wrapper(*args, **kwargs):
        try:
            result = await coro(*args, **kwargs)
        except Exception as exc:
            if isinstance(exc, untouched):
                raise
            raise errors.PathIOError(reason=sys.exc_info()) from exc
        return result

    return wrapper
91
92
class PathIONursery:
    """
    Pathio factory wrapper that makes all produced instances share one state.

    The first instance is built with ``state=None``; its ``state`` attribute
    is remembered and passed as ``state=`` to every later instance.

    :param factory: pathio class (or callable) accepting a ``state`` keyword
    """

    def __init__(self, factory):
        self.factory = factory
        self.state = None

    def __call__(self, *args, **kwargs):
        shared = self.state
        instance = self.factory(*args, state=shared, **kwargs)
        if shared is None:
            # first instance: adopt its freshly created state
            self.state = instance.state
        return instance
104
105
def defend_file_methods(coro):
    """
    Decorator. Raises :py:class:`ValueError` when a file method is called
    with a :py:class:`aioftp.AsyncPathIOContext` wrapper instead of the raw
    file object returned by ``_open``.
    """
    @functools.wraps(coro)
    async def wrapper(self, file, *args, **kwargs):
        if not isinstance(file, AsyncPathIOContext):
            return await coro(self, file, *args, **kwargs)
        raise ValueError("Native path io file methods can not be used "
                         "with wrapped file object")
    return wrapper
118
119
class AbstractPathIO(abc.ABC):
    """
    Abstract class for path io operations.

    :param timeout: timeout used by `with_timeout` decorator
    :type timeout: :py:class:`float`, :py:class:`int` or `None`

    :param connection: server connection that is accessing this PathIO
    :type connection: :py:class:`aioftp.Connection`

    :param state: shared pathio state per server (ignored by this base
        class; subclasses that share state consume it themselves)
    """
    def __init__(self, timeout=None, connection=None, state=None):
        self.timeout = timeout
        self.connection = connection

    @property
    def state(self):
        """
        Shared pathio state per server. The base class has none and returns
        `None`.
        """
        return None

    @universal_exception
    @abc.abstractmethod
    async def exists(self, path):
        """
        :py:func:`asyncio.coroutine`

        Check if path exists

        :param path: path to check
        :type path: :py:class:`pathlib.Path`

        :rtype: :py:class:`bool`
        """

    @universal_exception
    @abc.abstractmethod
    async def is_dir(self, path):
        """
        :py:func:`asyncio.coroutine`

        Check if path is directory

        :param path: path to check
        :type path: :py:class:`pathlib.Path`

        :rtype: :py:class:`bool`
        """

    @universal_exception
    @abc.abstractmethod
    async def is_file(self, path):
        """
        :py:func:`asyncio.coroutine`

        Check if path is file

        :param path: path to check
        :type path: :py:class:`pathlib.Path`

        :rtype: :py:class:`bool`
        """

    @universal_exception
    @abc.abstractmethod
    async def mkdir(self, path, *, parents=False, exist_ok=False):
        """
        :py:func:`asyncio.coroutine`

        Make directory

        :param path: path to create
        :type path: :py:class:`pathlib.Path`

        :param parents: create missing parent directories as well
        :type parents: :py:class:`bool`

        :param exist_ok: do not raise exception if directory already exists
        :type exist_ok: :py:class:`bool`
        """

    @universal_exception
    @abc.abstractmethod
    async def rmdir(self, path):
        """
        :py:func:`asyncio.coroutine`

        Remove directory

        :param path: path to remove
        :type path: :py:class:`pathlib.Path`
        """

    @universal_exception
    @abc.abstractmethod
    async def unlink(self, path):
        """
        :py:func:`asyncio.coroutine`

        Remove file

        :param path: path to remove
        :type path: :py:class:`pathlib.Path`
        """

    @abc.abstractmethod
    def list(self, path):
        """
        Create instance of subclass of :py:class:`aioftp.AbstractAsyncLister`.
        You should subclass and implement `__anext__` method
        for :py:class:`aioftp.AbstractAsyncLister` and return new instance.

        :param path: path to list
        :type path: :py:class:`pathlib.Path`

        :rtype: :py:class:`aioftp.AbstractAsyncLister`

        Usage:
        ::

            >>> async for p in pathio.list(path):
            ...     # do

        or boring instance of :py:class:`list`:
        ::

            >>> paths = await pathio.list(path)
            >>> paths
            [path, path, path, ...]

        """

    @universal_exception
    @abc.abstractmethod
    async def stat(self, path):
        """
        :py:func:`asyncio.coroutine`

        Get path stats

        :param path: path, which stats need
        :type path: :py:class:`pathlib.Path`

        :return: path stats. For proper work you need only this stats:
          st_size, st_mtime, st_ctime, st_nlink, st_mode
        :rtype: same as :py:class:`os.stat_result`
        """

    @universal_exception
    @abc.abstractmethod
    async def _open(self, path, mode):
        """
        :py:func:`asyncio.coroutine`

        Open file. You should implement "mode" argument, which can be:
        "rb", "wb", "ab", "r+b" (read, write, append, read/write; all binary).
        Return type depends on implementation, anyway the only place you need
        this file-object is in your implementation of seek, read, write and
        close

        :param path: path to open
        :type path: :py:class:`pathlib.Path`

        :param mode: specifies the mode in which the file is opened ("rb",
            "wb", "ab", "r+b" (read, write, append, read/write, all binary))
        :type mode: :py:class:`str`

        :return: file-object
        """

    def open(self, *args, **kwargs):
        """
        Create instance of :py:class:`aioftp.pathio.AsyncPathIOContext`,
        parameters passed to :py:meth:`aioftp.AbstractPathIO._open`

        :rtype: :py:class:`aioftp.pathio.AsyncPathIOContext`
        """
        return AsyncPathIOContext(self, args, kwargs)

    @universal_exception
    @defend_file_methods
    @abc.abstractmethod
    async def seek(self, file, offset, whence=io.SEEK_SET):
        """
        :py:func:`asyncio.coroutine`

        Change the stream position to the given byte `offset`. Same behaviour
        as :py:meth:`io.IOBase.seek`

        :param file: file-object from :py:class:`aioftp.AbstractPathIO.open`

        :param offset: relative byte offset
        :type offset: :py:class:`int`

        :param whence: base position for offset
        :type whence: :py:class:`int`
        """

    @universal_exception
    @defend_file_methods
    @abc.abstractmethod
    async def write(self, file, data):
        """
        :py:func:`asyncio.coroutine`

        Write some data to file

        :param file: file-object from :py:class:`aioftp.AbstractPathIO.open`

        :param data: data to write
        :type data: :py:class:`bytes`
        """

    @universal_exception
    @defend_file_methods
    @abc.abstractmethod
    async def read(self, file, block_size):
        """
        :py:func:`asyncio.coroutine`

        Read some data from file

        :param file: file-object from :py:class:`aioftp.AbstractPathIO.open`

        :param block_size: bytes count to read
        :type block_size: :py:class:`int`

        :rtype: :py:class:`bytes`
        """

    @universal_exception
    @defend_file_methods
    @abc.abstractmethod
    async def close(self, file):
        """
        :py:func:`asyncio.coroutine`

        Close file

        :param file: file-object from :py:class:`aioftp.AbstractPathIO.open`
        """

    @universal_exception
    @abc.abstractmethod
    async def rename(self, source, destination):
        """
        :py:func:`asyncio.coroutine`

        Rename path

        :param source: rename from
        :type source: :py:class:`pathlib.Path`

        :param destination: rename to
        :type destination: :py:class:`pathlib.Path`
        """
376
377
class PathIO(AbstractPathIO):
    """
    Blocking path io. Directly based on :py:class:`pathlib.Path` methods.

    Each operation calls the corresponding :py:class:`pathlib.Path` (or file
    object) method directly inside the coroutine, so the event loop is blocked
    while the filesystem call runs.
    """

    @universal_exception
    async def exists(self, path):
        return path.exists()

    @universal_exception
    async def is_dir(self, path):
        return path.is_dir()

    @universal_exception
    async def is_file(self, path):
        return path.is_file()

    @universal_exception
    async def mkdir(self, path, *, parents=False, exist_ok=False):
        return path.mkdir(exist_ok=exist_ok, parents=parents)

    @universal_exception
    async def rmdir(self, path):
        return path.rmdir()

    @universal_exception
    async def unlink(self, path):
        return path.unlink()

    def list(self, path):

        class Lister(AbstractAsyncLister):
            # lazily created glob iterator over the entries of `path`
            iter = None

            @universal_exception
            async def __anext__(self):
                if self.iter is None:
                    self.iter = path.glob("*")
                # Return the next entry if there is one; an exhausted glob
                # ends the async iteration.
                for entry in self.iter:
                    return entry
                raise StopAsyncIteration

        return Lister(timeout=self.timeout)

    @universal_exception
    async def stat(self, path):
        return path.stat()

    @universal_exception
    async def _open(self, path, *args, **kwargs):
        return path.open(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    async def seek(self, file, *args, **kwargs):
        return file.seek(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    async def write(self, file, *args, **kwargs):
        return file.write(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    async def read(self, file, *args, **kwargs):
        return file.read(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    async def close(self, file):
        return file.close()

    @universal_exception
    async def rename(self, source, destination):
        return source.rename(destination)
454
455
def _blocking_io(f):
    """
    Decorator. Turn the blocking method `f` into a coroutine method that runs
    ``f(self, *args, **kwargs)`` in ``self.executor`` through the running
    loop's ``run_in_executor`` and returns its result.
    """
    @functools.wraps(f)
    async def wrapper(self, *args, **kwargs):
        loop = asyncio.get_running_loop()
        call = functools.partial(f, self, *args, **kwargs)
        return await loop.run_in_executor(self.executor, call)
    return wrapper
464
465
class AsyncPathIO(AbstractPathIO):
    """
    Non-blocking path io. Based on
    :py:meth:`asyncio.BaseEventLoop.run_in_executor` and
    :py:class:`pathlib.Path` methods. It's really slow, so it's better to avoid
    usage of this path io layer.

    Every operation body is shipped to `executor` by ``_blocking_io`` and is
    bounded by ``with_timeout``.

    :param executor: executor for running blocking tasks
    :type executor: :py:class:`concurrent.futures.Executor`
    """
    def __init__(self, *args, executor=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.executor = executor

    @universal_exception
    @with_timeout
    @_blocking_io
    def exists(self, path):
        return path.exists()

    @universal_exception
    @with_timeout
    @_blocking_io
    def is_dir(self, path):
        return path.is_dir()

    @universal_exception
    @with_timeout
    @_blocking_io
    def is_file(self, path):
        return path.is_file()

    @universal_exception
    @with_timeout
    @_blocking_io
    def mkdir(self, path, *, parents=False, exist_ok=False):
        return path.mkdir(exist_ok=exist_ok, parents=parents)

    @universal_exception
    @with_timeout
    @_blocking_io
    def rmdir(self, path):
        return path.rmdir()

    @universal_exception
    @with_timeout
    @_blocking_io
    def unlink(self, path):
        return path.unlink()

    def list(self, path):

        class Lister(AbstractAsyncLister):
            # lazily created glob iterator over the entries of `path`
            iter = None

            def __init__(self, *args, executor=None, **kwargs):
                super().__init__(*args, **kwargs)
                self.executor = executor

            def worker(self):
                # StopIteration must not escape into the executor future,
                # so exhaustion is reported as StopAsyncIteration instead.
                for entry in self.iter:
                    return entry
                raise StopAsyncIteration

            @universal_exception
            @with_timeout
            @_blocking_io
            def __anext__(self):
                if self.iter is None:
                    self.iter = path.glob("*")
                return self.worker()

        return Lister(timeout=self.timeout, executor=self.executor)

    @universal_exception
    @with_timeout
    @_blocking_io
    def stat(self, path):
        return path.stat()

    @universal_exception
    @with_timeout
    @_blocking_io
    def _open(self, path, *args, **kwargs):
        return path.open(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    @with_timeout
    @_blocking_io
    def seek(self, file, *args, **kwargs):
        return file.seek(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    @with_timeout
    @_blocking_io
    def write(self, file, *args, **kwargs):
        return file.write(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    @with_timeout
    @_blocking_io
    def read(self, file, *args, **kwargs):
        return file.read(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    @with_timeout
    @_blocking_io
    def close(self, file):
        return file.close()

    @universal_exception
    @with_timeout
    @_blocking_io
    def rename(self, source, destination):
        return source.rename(destination)
586
587
class Node:
    """
    Entry of the in-memory tree used by :py:class:`MemoryPathIO`.

    :param type: entry kind, ``"dir"`` or ``"file"``
    :param name: single path component naming the entry (``"/"`` for root)
    :param ctime: creation timestamp; current time when `None`
    :param mtime: modification timestamp; current time when `None`
    :param content: list of child nodes for a directory, a binary buffer
        (:py:class:`io.BytesIO`) for a file
    """

    def __init__(self, type, name, ctime=None, mtime=None, *, content):
        now = int(time.time())
        self.type = type
        self.name = name
        # `is None` rather than `or`, so an explicit 0 timestamp is kept
        self.ctime = now if ctime is None else ctime
        self.mtime = now if mtime is None else mtime
        self.content = content

    def __repr__(self):
        return (
            f"{self.__class__.__name__}(type={self.type!r}, "
            f"name={self.name!r}, ctime={self.ctime!r}, "
            f"mtime={self.mtime!r}, content={self.content!r})"
        )
601
602
class MemoryPathIO(AbstractPathIO):
    """
    Non-blocking path io. Based on in-memory tree. It is just proof of concept
    and probably not so fast as it can be.

    The tree (`state`) is a list holding the root :py:class:`Node` named
    ``"/"``. Directory nodes keep their children in a :py:class:`list`, file
    nodes keep their data in an :py:class:`io.BytesIO`.

    :param state: existing tree to share with another instance; a new tree
        with an empty root directory is created when `None`
    :param cwd: directory that relative paths are resolved against
        (``"/"`` when omitted)
    """

    Stats = collections.namedtuple(
        "Stats",
        (
            "st_size",
            "st_ctime",
            "st_mtime",
            "st_nlink",
            "st_mode",
        )
    )

    def __init__(self, *args, state=None, cwd=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.cwd = pathlib.PurePosixPath(cwd or "/")
        if state is None:
            self.fs = [Node("dir", "/", content=[])]
        else:
            self.fs = state

    @property
    def state(self):
        """The in-memory tree; pass it as `state` to share this tree."""
        return self.fs

    def __repr__(self):
        return repr(self.fs)

    def _absolute(self, path):
        """Resolve a relative `path` against :py:attr:`cwd`."""
        if not path.is_absolute():
            path = self.cwd / path
        return path

    def get_node(self, path):
        """
        Find the node at `path`.

        :return: the :py:class:`Node`, or `None` when a component is missing
            or a non-final component is a file
        """
        nodes = self.fs
        node = None
        path = self._absolute(path)
        for part in path.parts:
            # descending through a file: its content is not a child list
            if not isinstance(nodes, list):
                return
            for node in nodes:
                if node.name == part:
                    nodes = node.content
                    break
            else:
                return
        return node

    def _detach(self, path, node):
        """
        Remove `node`, the node found at `path`, from its parent directory.

        :raises PermissionError: when `node` is the root directory, which has
            no parent to be removed from
        """
        parent = self.get_node(path.parent)
        if parent is None or parent is node:
            raise PermissionError("Can not remove root directory")
        # Node has no __eq__, so remove() matches this exact object
        parent.content.remove(node)

    @universal_exception
    async def exists(self, path):
        return self.get_node(path) is not None

    @universal_exception
    async def is_dir(self, path):
        node = self.get_node(path)
        return node is not None and node.type == "dir"

    @universal_exception
    async def is_file(self, path):
        node = self.get_node(path)
        return node is not None and node.type == "file"

    @universal_exception
    async def mkdir(self, path, *, parents=False, exist_ok=False):
        path = self._absolute(path)
        node = self.get_node(path)
        if node is not None:
            if node.type != "dir" or not exist_ok:
                raise FileExistsError
        elif not parents:
            parent = self.get_node(path.parent)
            if parent is None:
                raise FileNotFoundError
            if parent.type != "dir":
                raise NotADirectoryError
            parent.content.append(Node("dir", path.name, content=[]))
        else:
            nodes = self.fs
            for part in path.parts:
                if not isinstance(nodes, list):
                    # an intermediate component is a file
                    raise NotADirectoryError
                for child in nodes:
                    if child.name == part:
                        nodes = child.content
                        break
                else:
                    child = Node("dir", part, content=[])
                    nodes.append(child)
                    nodes = child.content

    @universal_exception
    async def rmdir(self, path):
        node = self.get_node(path)
        if node is None:
            raise FileNotFoundError
        if node.type != "dir":
            raise NotADirectoryError
        if node.content:
            raise OSError("Directory not empty")
        self._detach(path, node)

    @universal_exception
    async def unlink(self, path):
        node = self.get_node(path)
        if node is None:
            raise FileNotFoundError
        if node.type != "file":
            raise IsADirectoryError
        self._detach(path, node)

    def list(self, path):

        class Lister(AbstractAsyncLister):
            # iterator over the child paths, created on first __anext__
            iter = None

            @universal_exception
            async def __anext__(lister):
                # `self` here is the enclosing MemoryPathIO (closure)
                if lister.iter is None:
                    node = self.get_node(path)
                    if node is None or node.type != "dir":
                        lister.iter = iter(())
                    else:
                        # snapshot, so changes to the directory during the
                        # listing can not skip or repeat entries
                        names = map(operator.attrgetter("name"), node.content)
                        lister.iter = iter([path / name for name in names])
                try:
                    return next(lister.iter)
                except StopIteration:
                    raise StopAsyncIteration

        return Lister(timeout=self.timeout)

    @universal_exception
    async def stat(self, path):
        node = self.get_node(path)
        if node is None:
            raise FileNotFoundError

        if node.type == "file":
            size = len(node.content.getbuffer())
            mode = stat.S_IFREG | 0o666
        else:
            size = 0
            mode = stat.S_IFDIR | 0o777
        return MemoryPathIO.Stats(
            size,
            node.ctime,
            node.mtime,
            1,
            mode,
        )

    @universal_exception
    async def _open(self, path, mode="rb", *args, **kwargs):
        """
        Return the node's :py:class:`io.BytesIO` positioned for `mode`
        ("rb", "wb", "ab" or "r+b"). Write modes create a missing file whose
        parent directory exists; "wb" replaces the existing content.
        """
        if mode == "rb":
            node = self.get_node(path)
            if node is None:
                raise FileNotFoundError
            if node.type != "file":
                raise IsADirectoryError
            file_like = node.content
            file_like.seek(0, io.SEEK_SET)
        elif mode in ("wb", "ab", "r+b"):
            node = self.get_node(path)
            if node is None:
                parent = self.get_node(path.parent)
                if parent is None or parent.type != "dir":
                    raise FileNotFoundError
                node = Node("file", path.name, content=io.BytesIO())
                parent.content.append(node)
                file_like = node.content
            elif node.type != "file":
                raise IsADirectoryError
            elif mode == "wb":
                file_like = node.content = io.BytesIO()
            elif mode == "ab":
                file_like = node.content
                file_like.seek(0, io.SEEK_END)
            else:  # "r+b"
                file_like = node.content
                file_like.seek(0, io.SEEK_SET)
        else:
            raise ValueError(f"invalid mode: {mode}")
        # remember the owning node so write() can update its mtime
        file_like._memory_node = node
        return file_like

    @universal_exception
    @defend_file_methods
    async def seek(self, file, *args, **kwargs):
        return file.seek(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    async def write(self, file, *args, **kwargs):
        written = file.write(*args, **kwargs)
        # update the node (which stat() reports), not the buffer object
        node = getattr(file, "_memory_node", None)
        if node is not None:
            node.mtime = int(time.time())
        return written

    @universal_exception
    @defend_file_methods
    async def read(self, file, *args, **kwargs):
        return file.read(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    async def close(self, file):
        pass

    @universal_exception
    async def rename(self, source, destination):
        """
        Move `source` to `destination`, replacing an existing destination
        entry. Everything is validated before the tree is modified, so a
        failed rename leaves the source in place.
        """
        if source == destination:
            return
        sparent = self.get_node(source.parent)
        dparent = self.get_node(destination.parent)
        snode = self.get_node(source)
        if snode is None or dparent is None:
            raise FileNotFoundError
        if dparent.type != "dir":
            raise NotADirectoryError
        if self._absolute(source) in self._absolute(destination).parents:
            # would detach the subtree and hang it under itself
            raise OSError("Can not move directory into itself")
        sparent.content.remove(snode)
        snode.name = destination.name
        for i, node in enumerate(dparent.content):
            if node.name == destination.name:
                dparent.content[i] = snode
                break
        else:
            dparent.content.append(snode)
842