import abc
import asyncio
import collections
import functools
import io
import operator
import pathlib
import stat
import sys
import time

from . import errors
from .common import (
    DEFAULT_BLOCK_SIZE,
    AbstractAsyncLister,
    AsyncStreamIterator,
    with_timeout,
)

__all__ = (
    "AbstractPathIO",
    "PathIO",
    "AsyncPathIO",
    "MemoryPathIO",
    "PathIONursery",
)


class AsyncPathIOContext:
    """
    Async pathio context.

    Usage:
    ::

        >>> async with pathio.open(filename) as file_in:
        ...     async for block in file_in.iter_by_block(size):
        ...         # do

    or boring:
    ::

        >>> file = await pathio.open(filename)
        ... data = await file.read(size)
        ... await file.write(data)
        ... await file.close()

    """
    def __init__(self, pathio, args, kwargs):
        # ``close`` is ``None`` until the file has been opened;
        # ``__aexit__`` checks it before closing.
        self.close = None
        self.pathio = pathio
        self.args = args
        self.kwargs = kwargs

    async def __aenter__(self):
        self.file = await self.pathio._open(*self.args, **self.kwargs)
        # Bind the path io file methods to the freshly opened file object.
        self.seek = functools.partial(self.pathio.seek, self.file)
        self.write = functools.partial(self.pathio.write, self.file)
        self.read = functools.partial(self.pathio.read, self.file)
        self.close = functools.partial(self.pathio.close, self.file)
        return self

    async def __aexit__(self, *args):
        if self.close is not None:
            await self.close()

    def __await__(self):
        # ``await pathio.open(...)`` opens the file without a context manager.
        return self.__aenter__().__await__()

    def iter_by_block(self, count=DEFAULT_BLOCK_SIZE):
        return AsyncStreamIterator(lambda: self.read(count))


def universal_exception(coro):
    """
    Decorator. Re-raise any exception (except `CancelledError`,
    `NotImplementedError` and `StopAsyncIteration`) as the universal
    exception :py:class:`aioftp.PathIOError`.
    """
    @functools.wraps(coro)
    async def wrapper(*args, **kwargs):
        try:
            return await coro(*args, **kwargs)
        except (asyncio.CancelledError, NotImplementedError,
                StopAsyncIteration):
            # Cancellation, "not implemented" and end-of-iteration signals
            # must reach the caller unchanged.
            raise
        except Exception as exc:
            raise errors.PathIOError(reason=sys.exc_info()) from exc

    return wrapper


class PathIONursery:
    """
    Factory wrapper that shares one path io ``state`` between all the
    instances it creates.

    The ``state`` of the first created instance is remembered and passed as
    the ``state`` keyword argument to every following ``factory`` call.

    :param factory: path io class (or other callable) that accepts a
        ``state`` keyword argument and whose result has a ``state`` attribute
    """

    def __init__(self, factory):
        self.factory = factory
        self.state = None

    def __call__(self, *args, **kwargs):
        instance = self.factory(*args, state=self.state, **kwargs)
        if self.state is None:
            self.state = instance.state
        return instance


def defend_file_methods(coro):
    """
    Decorator. Raises exception when file methods called with wrapped by
    :py:class:`aioftp.AsyncPathIOContext` file object.
    """
    @functools.wraps(coro)
    async def wrapper(self, file, *args, **kwargs):
        if isinstance(file, AsyncPathIOContext):
            raise ValueError("Native path io file methods can not be used "
                             "with wrapped file object")
        return await coro(self, file, *args, **kwargs)
    return wrapper


class AbstractPathIO(abc.ABC):
    """
    Abstract class for path io operations.

    :param timeout: timeout used by `with_timeout` decorator
    :type timeout: :py:class:`float`, :py:class:`int` or `None`

    :param connection: server connection that is accessing this PathIO
    :type connection: :py:class:`aioftp.Connection`

    :param state: shared pathio state per server
    """
    def __init__(self, timeout=None, connection=None, state=None):
        self.timeout = timeout
        self.connection = connection

    @property
    def state(self):
        """
        Shared pathio state per server (``None`` unless a subclass
        overrides this property)
        """

    @universal_exception
    @abc.abstractmethod
    async def exists(self, path):
        """
        :py:func:`asyncio.coroutine`

        Check if path exists

        :param path: path to check
        :type path: :py:class:`pathlib.Path`

        :rtype: :py:class:`bool`
        """

    @universal_exception
    @abc.abstractmethod
    async def is_dir(self, path):
        """
        :py:func:`asyncio.coroutine`

        Check if path is directory

        :param path: path to check
        :type path: :py:class:`pathlib.Path`

        :rtype: :py:class:`bool`
        """

    @universal_exception
    @abc.abstractmethod
    async def is_file(self, path):
        """
        :py:func:`asyncio.coroutine`

        Check if path is file

        :param path: path to check
        :type path: :py:class:`pathlib.Path`

        :rtype: :py:class:`bool`
        """

    @universal_exception
    @abc.abstractmethod
    async def mkdir(self, path, *, parents=False, exist_ok=False):
        """
        :py:func:`asyncio.coroutine`

        Make directory

        :param path: path to create
        :type path: :py:class:`pathlib.Path`

        :param parents: create parent directories if they do not exist
        :type parents: :py:class:`bool`

        :param exist_ok: do not raise exception if directory already exists
        :type exist_ok: :py:class:`bool`
        """

    @universal_exception
    @abc.abstractmethod
    async def rmdir(self, path):
        """
        :py:func:`asyncio.coroutine`

        Remove directory

        :param path: path to remove
        :type path: :py:class:`pathlib.Path`
        """

    @universal_exception
    @abc.abstractmethod
    async def unlink(self, path):
        """
        :py:func:`asyncio.coroutine`

        Remove file

        :param path: path to remove
        :type path: :py:class:`pathlib.Path`
        """

    @abc.abstractmethod
    def list(self, path):
        """
        Create instance of subclass of :py:class:`aioftp.AbstractAsyncLister`.
        You should subclass and implement `__anext__` method
        for :py:class:`aioftp.AbstractAsyncLister` and return new instance.

        :param path: path to list
        :type path: :py:class:`pathlib.Path`

        :rtype: :py:class:`aioftp.AbstractAsyncLister`

        Usage:
        ::

            >>> async for p in pathio.list(path):
            ...     # do

        or boring instance of :py:class:`list`:
        ::

            >>> paths = await pathio.list(path)
            >>> paths
            [path, path, path, ...]

        """

    @universal_exception
    @abc.abstractmethod
    async def stat(self, path):
        """
        :py:func:`asyncio.coroutine`

        Get path stats

        :param path: path whose stats are needed
        :type path: :py:class:`pathlib.Path`

        :return: path stats. For proper work you need only this stats:
          st_size, st_mtime, st_ctime, st_nlink, st_mode
        :rtype: same as :py:class:`os.stat_result`
        """

    @universal_exception
    @abc.abstractmethod
    async def _open(self, path, mode):
        """
        :py:func:`asyncio.coroutine`

        Open file. You should implement "mode" argument, which can be:
        "rb", "wb", "ab", "r+b" (read, write, append, read/write; all
        binary). Return type depends on implementation, anyway the only
        place you need this file-object is in your implementation of read,
        write and close

        :param path: path to open
        :type path: :py:class:`pathlib.Path`

        :param mode: specifies the mode in which the file is opened ("rb",
            "wb", "ab", "r+b" (read, write, append, read/write, all binary))
        :type mode: :py:class:`str`

        :return: file-object
        """

    def open(self, *args, **kwargs):
        """
        Create instance of :py:class:`aioftp.pathio.AsyncPathIOContext`,
        parameters passed to :py:meth:`aioftp.AbstractPathIO._open`

        :rtype: :py:class:`aioftp.pathio.AsyncPathIOContext`
        """
        return AsyncPathIOContext(self, args, kwargs)

    @universal_exception
    @defend_file_methods
    @abc.abstractmethod
    async def seek(self, file, offset, whence=io.SEEK_SET):
        """
        :py:func:`asyncio.coroutine`

        Change the stream position to the given byte `offset`. Same behaviour
        as :py:meth:`io.IOBase.seek`

        :param file: file-object from :py:class:`aioftp.AbstractPathIO.open`

        :param offset: relative byte offset
        :type offset: :py:class:`int`

        :param whence: base position for offset
        :type whence: :py:class:`int`
        """

    @universal_exception
    @defend_file_methods
    @abc.abstractmethod
    async def write(self, file, data):
        """
        :py:func:`asyncio.coroutine`

        Write some data to file

        :param file: file-object from :py:class:`aioftp.AbstractPathIO.open`

        :param data: data to write
        :type data: :py:class:`bytes`
        """

    @universal_exception
    @defend_file_methods
    @abc.abstractmethod
    async def read(self, file, block_size):
        """
        :py:func:`asyncio.coroutine`

        Read some data from file

        :param file: file-object from :py:class:`aioftp.AbstractPathIO.open`

        :param block_size: bytes count to read
        :type block_size: :py:class:`int`

        :rtype: :py:class:`bytes`
        """

    @universal_exception
    @defend_file_methods
    @abc.abstractmethod
    async def close(self, file):
        """
        :py:func:`asyncio.coroutine`

        Close file

        :param file: file-object from :py:class:`aioftp.AbstractPathIO.open`
        """

    @universal_exception
    @abc.abstractmethod
    async def rename(self, source, destination):
        """
        :py:func:`asyncio.coroutine`

        Rename path

        :param source: rename from
        :type source: :py:class:`pathlib.Path`

        :param destination: rename to
        :type destination: :py:class:`pathlib.Path`
        """


class PathIO(AbstractPathIO):
    """
    Blocking path io. Directly based on :py:class:`pathlib.Path` methods.
    """

    @universal_exception
    async def exists(self, path):
        return path.exists()

    @universal_exception
    async def is_dir(self, path):
        return path.is_dir()

    @universal_exception
    async def is_file(self, path):
        return path.is_file()

    @universal_exception
    async def mkdir(self, path, *, parents=False, exist_ok=False):
        return path.mkdir(parents=parents, exist_ok=exist_ok)

    @universal_exception
    async def rmdir(self, path):
        return path.rmdir()

    @universal_exception
    async def unlink(self, path):
        return path.unlink()

    def list(self, path):

        class Lister(AbstractAsyncLister):
            iter = None

            @universal_exception
            async def __anext__(self):
                # The glob is created lazily, on the first iteration step.
                if self.iter is None:
                    self.iter = path.glob("*")
                try:
                    return next(self.iter)
                except StopIteration:
                    raise StopAsyncIteration

        return Lister(timeout=self.timeout)

    @universal_exception
    async def stat(self, path):
        return path.stat()

    @universal_exception
    async def _open(self, path, *args, **kwargs):
        return path.open(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    async def seek(self, file, *args, **kwargs):
        return file.seek(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    async def write(self, file, *args, **kwargs):
        return file.write(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    async def read(self, file, *args, **kwargs):
        return file.read(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    async def close(self, file):
        return file.close()

    @universal_exception
    async def rename(self, source, destination):
        return source.rename(destination)


def _blocking_io(f):
    """
    Decorator. Turn the synchronous method ``f`` into a coroutine that runs
    it in ``self.executor`` via ``run_in_executor`` of the running loop.
    """
    @functools.wraps(f)
    async def wrapper(self, *args, **kwargs):
        return await asyncio.get_running_loop().run_in_executor(
            self.executor,
            functools.partial(f, self, *args, **kwargs),
        )
    return wrapper


class AsyncPathIO(AbstractPathIO):
    """
    Non-blocking path io. Based on
    :py:meth:`asyncio.BaseEventLoop.run_in_executor` and
    :py:class:`pathlib.Path` methods. It's really slow, so it's better to avoid
    usage of this path io layer.

    :param executor: executor for running blocking tasks
    :type executor: :py:class:`concurrent.futures.Executor`
    """
    def __init__(self, *args, executor=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.executor = executor

    @universal_exception
    @with_timeout
    @_blocking_io
    def exists(self, path):
        return path.exists()

    @universal_exception
    @with_timeout
    @_blocking_io
    def is_dir(self, path):
        return path.is_dir()

    @universal_exception
    @with_timeout
    @_blocking_io
    def is_file(self, path):
        return path.is_file()

    @universal_exception
    @with_timeout
    @_blocking_io
    def mkdir(self, path, *, parents=False, exist_ok=False):
        return path.mkdir(parents=parents, exist_ok=exist_ok)

    @universal_exception
    @with_timeout
    @_blocking_io
    def rmdir(self, path):
        return path.rmdir()

    @universal_exception
    @with_timeout
    @_blocking_io
    def unlink(self, path):
        return path.unlink()

    def list(self, path):

        class Lister(AbstractAsyncLister):
            iter = None

            def __init__(self, *args, executor=None, **kwargs):
                super().__init__(*args, **kwargs)
                # Read by ``_blocking_io`` when ``__anext__`` is called.
                self.executor = executor

            def worker(self):
                try:
                    return next(self.iter)
                except StopIteration:
                    raise StopAsyncIteration

            @universal_exception
            @with_timeout
            @_blocking_io
            def __anext__(self):
                if self.iter is None:
                    self.iter = path.glob("*")
                return self.worker()

        return Lister(timeout=self.timeout, executor=self.executor)

    @universal_exception
    @with_timeout
    @_blocking_io
    def stat(self, path):
        return path.stat()

    @universal_exception
    @with_timeout
    @_blocking_io
    def _open(self, path, *args, **kwargs):
        return path.open(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    @with_timeout
    @_blocking_io
    def seek(self, file, *args, **kwargs):
        return file.seek(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    @with_timeout
    @_blocking_io
    def write(self, file, *args, **kwargs):
        return file.write(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    @with_timeout
    @_blocking_io
    def read(self, file, *args, **kwargs):
        return file.read(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    @with_timeout
    @_blocking_io
    def close(self, file):
        return file.close()

    @universal_exception
    @with_timeout
    @_blocking_io
    def rename(self, source, destination):
        return source.rename(destination)


class Node:
    """
    Entry of the :py:class:`MemoryPathIO` in-memory tree.

    :param type: ``"dir"`` or ``"file"``
    :param name: entry name (the root directory is named ``"/"``)
    :param ctime: creation timestamp; current time when ``None``
    :param mtime: modification timestamp; current time when ``None``
    :param content: list of child nodes for a directory,
        :py:class:`io.BytesIO` for a file
    """

    def __init__(self, type, name, ctime=None, mtime=None, *, content):
        now = int(time.time())
        self.type = type
        self.name = name
        # ``is None`` (not ``or``) so that an explicit 0 timestamp is kept.
        self.ctime = now if ctime is None else ctime
        self.mtime = now if mtime is None else mtime
        self.content = content

    def __repr__(self):
        return f"{self.__class__.__name__}(type={self.type!r}, " \
               f"name={self.name!r}, ctime={self.ctime!r}, " \
               f"mtime={self.mtime!r}, content={self.content!r})"


class MemoryPathIO(AbstractPathIO):
    """
    Non-blocking path io. Based on in-memory tree. It is just proof of concept
    and probably not so fast as it can be.
    """

    Stats = collections.namedtuple(
        "Stats",
        (
            "st_size",
            "st_ctime",
            "st_mtime",
            "st_nlink",
            "st_mode",
        )
    )

    def __init__(self, *args, state=None, cwd=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.cwd = pathlib.PurePosixPath(cwd or "/")
        if state is None:
            # Fresh tree: a list holding only the root directory node.
            self.fs = [Node("dir", "/", content=[])]
        else:
            # Share an existing tree (e.g. one handed out by PathIONursery).
            self.fs = state

    @property
    def state(self):
        return self.fs

    def __repr__(self):
        return repr(self.fs)

    def _absolute(self, path):
        """Resolve a relative ``path`` against ``self.cwd``."""
        if not path.is_absolute():
            path = self.cwd / path
        return path

    def get_node(self, path):
        """
        Return the :py:class:`Node` at ``path`` (resolved against ``cwd``),
        or ``None`` when a component is missing or a non-final component is
        a file.
        """
        nodes = self.fs
        node = None
        path = self._absolute(path)
        for part in path.parts:
            if not isinstance(nodes, list):
                return
            for node in nodes:
                if node.name == part:
                    nodes = node.content
                    break
            else:
                return
        return node

    def _detach(self, path):
        """
        Remove the entry named ``path.name`` from its parent directory.

        :raises FileNotFoundError: if the parent has no such entry
        """
        parent = self.get_node(path.parent)
        for i, node in enumerate(parent.content):
            if node.name == path.name:
                del parent.content[i]
                return
        raise FileNotFoundError

    @universal_exception
    async def exists(self, path):
        return self.get_node(path) is not None

    @universal_exception
    async def is_dir(self, path):
        node = self.get_node(path)
        return not (node is None or node.type != "dir")

    @universal_exception
    async def is_file(self, path):
        node = self.get_node(path)
        return not (node is None or node.type != "file")

    @universal_exception
    async def mkdir(self, path, *, parents=False, exist_ok=False):
        path = self._absolute(path)
        node = self.get_node(path)
        if node:
            if node.type != "dir" or not exist_ok:
                raise FileExistsError
        elif not parents:
            parent = self.get_node(path.parent)
            if parent is None:
                raise FileNotFoundError
            if parent.type != "dir":
                raise NotADirectoryError
            node = Node("dir", path.name, content=[])
            parent.content.append(node)
        else:
            # Walk from the root, creating every missing directory.
            nodes = self.fs
            for part in path.parts:
                if isinstance(nodes, list):
                    for node in nodes:
                        if node.name == part:
                            nodes = node.content
                            break
                    else:
                        node = Node("dir", part, content=[])
                        nodes.append(node)
                        nodes = node.content
                else:
                    # A file's content (BytesIO) was reached mid-path.
                    raise NotADirectoryError

    @universal_exception
    async def rmdir(self, path):
        node = self.get_node(path)
        if node is None:
            raise FileNotFoundError
        if node.type != "dir":
            raise NotADirectoryError
        if node.content:
            raise OSError("Directory not empty")
        self._detach(path)

    @universal_exception
    async def unlink(self, path):
        node = self.get_node(path)
        if node is None:
            raise FileNotFoundError
        if node.type != "file":
            raise IsADirectoryError
        self._detach(path)

    def list(self, path):

        class Lister(AbstractAsyncLister):
            iter = None

            @universal_exception
            async def __anext__(lister):
                # ``lister`` is the Lister instance; ``self`` is the
                # enclosing MemoryPathIO captured by the closure.
                if lister.iter is None:
                    node = self.get_node(path)
                    if node is None or node.type != "dir":
                        lister.iter = iter(())
                    else:
                        names = map(operator.attrgetter("name"), node.content)
                        paths = map(lambda name: path / name, names)
                        lister.iter = iter(paths)
                try:
                    return next(lister.iter)
                except StopIteration:
                    raise StopAsyncIteration

        return Lister(timeout=self.timeout)

    @universal_exception
    async def stat(self, path):
        node = self.get_node(path)
        if node is None:
            raise FileNotFoundError

        if node.type == "file":
            size = len(node.content.getbuffer())
            mode = stat.S_IFREG | 0o666
            # ``write`` stamps the file object (the node's BytesIO), so that
            # timestamp takes precedence over the node's own value.
            mtime = getattr(node.content, "mtime", node.mtime)
        else:
            size = 0
            mode = stat.S_IFDIR | 0o777
            mtime = node.mtime
        return MemoryPathIO.Stats(
            size,
            node.ctime,
            mtime,
            1,
            mode,
        )

    @universal_exception
    async def _open(self, path, mode="rb", *args, **kwargs):
        if mode == "rb":
            node = self.get_node(path)
            if node is None:
                raise FileNotFoundError
            if node.type != "file":
                raise IsADirectoryError
            file_like = node.content
            file_like.seek(0, io.SEEK_SET)
        elif mode in ("wb", "ab", "r+b"):
            node = self.get_node(path)
            if node is None:
                parent = self.get_node(path.parent)
                if parent is None or parent.type != "dir":
                    raise FileNotFoundError
                new_node = Node("file", path.name, content=io.BytesIO())
                parent.content.append(new_node)
                file_like = new_node.content
            elif node.type != "file":
                raise IsADirectoryError
            else:
                if mode == "wb":
                    # Truncation replaces the content and counts as a
                    # modification.
                    file_like = node.content = io.BytesIO()
                    node.mtime = int(time.time())
                elif mode == "ab":
                    file_like = node.content
                    file_like.seek(0, io.SEEK_END)
                elif mode == "r+b":
                    file_like = node.content
                    file_like.seek(0, io.SEEK_SET)
        else:
            raise ValueError(f"invalid mode: {mode}")
        return file_like

    @universal_exception
    @defend_file_methods
    async def seek(self, file, *args, **kwargs):
        return file.seek(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    async def write(self, file, *args, **kwargs):
        result = file.write(*args, **kwargs)
        # ``stat`` reads this timestamp back from the file object.
        file.mtime = int(time.time())
        # Return the written byte count, like PathIO/AsyncPathIO do.
        return result

    @universal_exception
    @defend_file_methods
    async def read(self, file, *args, **kwargs):
        return file.read(*args, **kwargs)

    @universal_exception
    @defend_file_methods
    async def close(self, file):
        # In-memory files stay alive inside their node; nothing to release.
        pass

    @universal_exception
    async def rename(self, source, destination):
        source = self._absolute(source)
        destination = self._absolute(destination)
        if source == destination:
            return
        sparent = self.get_node(source.parent)
        dparent = self.get_node(destination.parent)
        snode = self.get_node(source)
        if snode is None or dparent is None:
            raise FileNotFoundError
        if dparent.type != "dir":
            raise NotADirectoryError
        if source in destination.parents:
            # Moving a node underneath itself would detach the whole
            # subtree from the tree.
            raise OSError("Can not move a path into itself")
        for i, node in enumerate(sparent.content):
            if node.name == source.name:
                del sparent.content[i]
                break
        snode.name = destination.name
        # An existing destination entry is replaced in place.
        for i, node in enumerate(dparent.content):
            if node.name == destination.name:
                dparent.content[i] = snode
                break
        else:
            dparent.content.append(snode)