1import io 2import logging 3import os 4import threading 5import warnings 6import weakref 7from distutils.version import LooseVersion 8from errno import ESPIPE 9from glob import has_magic 10from hashlib import sha256 11 12from .callbacks import _DEFAULT_CALLBACK 13from .config import apply_config, conf 14from .dircache import DirCache 15from .transaction import Transaction 16from .utils import ( 17 _unstrip_protocol, 18 get_package_version_without_import, 19 other_paths, 20 read_block, 21 stringify_path, 22 tokenize, 23) 24 25logger = logging.getLogger("fsspec") 26 27 28def make_instance(cls, args, kwargs): 29 return cls(*args, **kwargs) 30 31 32class _Cached(type): 33 """ 34 Metaclass for caching file system instances. 35 36 Notes 37 ----- 38 Instances are cached according to 39 40 * The values of the class attributes listed in `_extra_tokenize_attributes` 41 * The arguments passed to ``__init__``. 42 43 This creates an additional reference to the filesystem, which prevents the 44 filesystem from being garbage collected when all *user* references go away. 45 A call to the :meth:`AbstractFileSystem.clear_instance_cache` must *also* 46 be made for a filesystem instance to be garbage collected. 47 """ 48 49 def __init__(cls, *args, **kwargs): 50 super().__init__(*args, **kwargs) 51 # Note: we intentionally create a reference here, to avoid garbage 52 # collecting instances when all other references are gone. To really 53 # delete a FileSystem, the cache must be cleared. 54 if conf.get("weakref_instance_cache"): # pragma: no cover 55 # debug option for analysing fork/spawn conditions 56 cls._cache = weakref.WeakValueDictionary() 57 else: 58 cls._cache = {} 59 cls._pid = os.getpid() 60 61 def __call__(cls, *args, **kwargs): 62 kwargs = apply_config(cls, kwargs) 63 extra_tokens = tuple( 64 getattr(cls, attr, None) for attr in cls._extra_tokenize_attributes 65 ) 66 token = tokenize( 67 cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs 68 ) 69 skip = kwargs.pop("skip_instance_cache", False) 70 if os.getpid() != cls._pid: 71 cls._cache.clear() 72 cls._pid = os.getpid() 73 if not skip and cls.cachable and token in cls._cache: 74 return cls._cache[token] 75 else: 76 obj = super().__call__(*args, **kwargs) 77 # Setting _fs_token here causes some static linters to complain. 78 obj._fs_token_ = token 79 obj.storage_args = args 80 obj.storage_options = kwargs 81 if obj.async_impl: 82 from .asyn import mirror_sync_methods 83 84 mirror_sync_methods(obj) 85 86 if cls.cachable and not skip: 87 cls._cache[token] = obj 88 return obj 89 90 91pa_version = get_package_version_without_import("pyarrow") 92if pa_version and LooseVersion(pa_version) < LooseVersion("2.0"): 93 try: 94 import pyarrow as pa 95 96 up = pa.filesystem.DaskFileSystem 97 except ImportError: # pragma: no cover 98 # pyarrow exists but doesn't import for some reason 99 up = object 100else: # pragma: no cover 101 up = object 102 103 104class AbstractFileSystem(up, metaclass=_Cached): 105 """ 106 An abstract super-class for pythonic file-systems 107 108 Implementations are expected to be compatible with or, better, subclass 109 from here. 110 """ 111 112 cachable = True # this class can be cached, instances reused 113 _cached = False 114 blocksize = 2 ** 22 115 sep = "/" 116 protocol = "abstract" 117 async_impl = False 118 root_marker = "" # For some FSs, may require leading '/' or other character 119 120 #: Extra *class attributes* that should be considered when hashing. 121 _extra_tokenize_attributes = () 122 123 def __init__(self, *args, **storage_options): 124 """Create and configure file-system instance 125 126 Instances may be cachable, so if similar enough arguments are seen 127 a new instance is not required. The token attribute exists to allow 128 implementations to cache instances if they wish. 129 130 A reasonable default should be provided if there are no arguments. 131 132 Subclasses should call this method. 133 134 Parameters 135 ---------- 136 use_listings_cache, listings_expiry_time, max_paths: 137 passed to ``DirCache``, if the implementation supports 138 directory listing caching. Pass use_listings_cache=False 139 to disable such caching. 140 skip_instance_cache: bool 141 If this is a cachable implementation, pass True here to force 142 creating a new instance even if a matching instance exists, and prevent 143 storing this instance. 144 asynchronous: bool 145 loop: asyncio-compatible IOLoop or None 146 """ 147 if self._cached: 148 # reusing instance, don't change 149 return 150 self._cached = True 151 self._intrans = False 152 self._transaction = None 153 self._invalidated_caches_in_transaction = [] 154 self.dircache = DirCache(**storage_options) 155 156 if storage_options.pop("add_docs", None): 157 warnings.warn("add_docs is no longer supported.", FutureWarning) 158 159 if storage_options.pop("add_aliases", None): 160 warnings.warn("add_aliases has been removed.", FutureWarning) 161 # This is set in _Cached 162 self._fs_token_ = None 163 164 @property 165 def _fs_token(self): 166 return self._fs_token_ 167 168 def __dask_tokenize__(self): 169 return self._fs_token 170 171 def __hash__(self): 172 return int(self._fs_token, 16) 173 174 def __eq__(self, other): 175 return isinstance(other, type(self)) and self._fs_token == other._fs_token 176 177 def __reduce__(self): 178 return make_instance, (type(self), self.storage_args, self.storage_options) 179 180 @classmethod 181 def _strip_protocol(cls, path): 182 """Turn path from fully-qualified to file-system-specific 183 184 May require FS-specific handling, e.g., for relative paths or links. 185 """ 186 if isinstance(path, list): 187 return [cls._strip_protocol(p) for p in path] 188 path = stringify_path(path) 189 protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol 190 for protocol in protos: 191 if path.startswith(protocol + "://"): 192 path = path[len(protocol) + 3 :] 193 elif path.startswith(protocol + "::"): 194 path = path[len(protocol) + 2 :] 195 path = path.rstrip("/") 196 # use of root_marker to make minimum required path, e.g., "/" 197 return path or cls.root_marker 198 199 @staticmethod 200 def _get_kwargs_from_urls(path): 201 """If kwargs can be encoded in the paths, extract them here 202 203 This should happen before instantiation of the class; incoming paths 204 then should be amended to strip the options in methods. 205 206 Examples may look like an sftp path "sftp://user@host:/my/path", where 207 the user and host should become kwargs and later get stripped. 208 """ 209 # by default, nothing happens 210 return {} 211 212 @classmethod 213 def current(cls): 214 """Return the most recently created FileSystem 215 216 If no instance has been created, then create one with defaults 217 """ 218 if not len(cls._cache): 219 return cls() 220 else: 221 return list(cls._cache.values())[-1] 222 223 @property 224 def transaction(self): 225 """A context within which files are committed together upon exit 226 227 Requires the file class to implement `.commit()` and `.discard()` 228 for the normal and exception cases. 229 """ 230 if self._transaction is None: 231 self._transaction = Transaction(self) 232 return self._transaction 233 234 def start_transaction(self): 235 """Begin write transaction for deferring files, non-context version""" 236 self._intrans = True 237 self._transaction = Transaction(self) 238 return self.transaction 239 240 def end_transaction(self): 241 """Finish write transaction, non-context version""" 242 self.transaction.complete() 243 self._transaction = None 244 # The invalid cache must be cleared after the transcation is completed. 245 for path in self._invalidated_caches_in_transaction: 246 self.invalidate_cache(path) 247 self._invalidated_caches_in_transaction.clear() 248 249 def invalidate_cache(self, path=None): 250 """ 251 Discard any cached directory information 252 253 Parameters 254 ---------- 255 path: string or None 256 If None, clear all listings cached else listings at or under given 257 path. 258 """ 259 # Not necessary to implement invalidation mechanism, may have no cache. 260 # But if have, you should call this method of parent class from your 261 # subclass to ensure expiring caches after transacations correctly. 262 # See the implementation of FTPFileSystem in ftp.py 263 if self._intrans: 264 self._invalidated_caches_in_transaction.append(path) 265 266 def mkdir(self, path, create_parents=True, **kwargs): 267 """ 268 Create directory entry at path 269 270 For systems that don't have true directories, may create an for 271 this instance only and not touch the real filesystem 272 273 Parameters 274 ---------- 275 path: str 276 location 277 create_parents: bool 278 if True, this is equivalent to ``makedirs`` 279 kwargs: 280 may be permissions, etc. 281 """ 282 pass # not necessary to implement, may not have directories 283 284 def makedirs(self, path, exist_ok=False): 285 """Recursively make directories 286 287 Creates directory at path and any intervening required directories. 288 Raises exception if, for instance, the path already exists but is a 289 file. 290 291 Parameters 292 ---------- 293 path: str 294 leaf directory name 295 exist_ok: bool (False) 296 If False, will error if the target already exists 297 """ 298 pass # not necessary to implement, may not have directories 299 300 def rmdir(self, path): 301 """Remove a directory, if empty""" 302 pass # not necessary to implement, may not have directories 303 304 def ls(self, path, detail=True, **kwargs): 305 """List objects at path. 306 307 This should include subdirectories and files at that location. The 308 difference between a file and a directory must be clear when details 309 are requested. 310 311 The specific keys, or perhaps a FileInfo class, or similar, is TBD, 312 but must be consistent across implementations. 313 Must include: 314 315 - full path to the entry (without protocol) 316 - size of the entry, in bytes. If the value cannot be determined, will 317 be ``None``. 318 - type of entry, "file", "directory" or other 319 320 Additional information 321 may be present, aproriate to the file-system, e.g., generation, 322 checksum, etc. 323 324 May use refresh=True|False to allow use of self._ls_from_cache to 325 check for a saved listing and avoid calling the backend. This would be 326 common where listing may be expensive. 327 328 Parameters 329 ---------- 330 path: str 331 detail: bool 332 if True, gives a list of dictionaries, where each is the same as 333 the result of ``info(path)``. If False, gives a list of paths 334 (str). 335 kwargs: may have additional backend-specific options, such as version 336 information 337 338 Returns 339 ------- 340 List of strings if detail is False, or list of directory information 341 dicts if detail is True. 342 """ 343 raise NotImplementedError 344 345 def _ls_from_cache(self, path): 346 """Check cache for listing 347 348 Returns listing, if found (may me empty list for a directly that exists 349 but contains nothing), None if not in cache. 350 """ 351 parent = self._parent(path) 352 if path.rstrip("/") in self.dircache: 353 return self.dircache[path.rstrip("/")] 354 try: 355 files = [ 356 f 357 for f in self.dircache[parent] 358 if f["name"] == path 359 or (f["name"] == path.rstrip("/") and f["type"] == "directory") 360 ] 361 if len(files) == 0: 362 # parent dir was listed but did not contain this file 363 raise FileNotFoundError(path) 364 return files 365 except KeyError: 366 pass 367 368 def walk(self, path, maxdepth=None, **kwargs): 369 """Return all files belows path 370 371 List all files, recursing into subdirectories; output is iterator-style, 372 like ``os.walk()``. For a simple list of files, ``find()`` is available. 373 374 Note that the "files" outputted will include anything that is not 375 a directory, such as links. 376 377 Parameters 378 ---------- 379 path: str 380 Root to recurse into 381 maxdepth: int 382 Maximum recursion depth. None means limitless, but not recommended 383 on link-based file-systems. 384 kwargs: passed to ``ls`` 385 """ 386 path = self._strip_protocol(path) 387 full_dirs = {} 388 dirs = {} 389 files = {} 390 391 detail = kwargs.pop("detail", False) 392 try: 393 listing = self.ls(path, detail=True, **kwargs) 394 except (FileNotFoundError, IOError): 395 if detail: 396 return path, {}, {} 397 return path, [], [] 398 399 for info in listing: 400 # each info name must be at least [path]/part , but here 401 # we check also for names like [path]/part/ 402 pathname = info["name"].rstrip("/") 403 name = pathname.rsplit("/", 1)[-1] 404 if info["type"] == "directory" and pathname != path: 405 # do not include "self" path 406 full_dirs[pathname] = info 407 dirs[name] = info 408 elif pathname == path: 409 # file-like with same name as give path 410 files[""] = info 411 else: 412 files[name] = info 413 414 if detail: 415 yield path, dirs, files 416 else: 417 yield path, list(dirs), list(files) 418 419 if maxdepth is not None: 420 maxdepth -= 1 421 if maxdepth < 1: 422 return 423 424 for d in full_dirs: 425 yield from self.walk(d, maxdepth=maxdepth, detail=detail, **kwargs) 426 427 def find(self, path, maxdepth=None, withdirs=False, **kwargs): 428 """List all files below path. 429 430 Like posix ``find`` command without conditions 431 432 Parameters 433 ---------- 434 path : str 435 maxdepth: int or None 436 If not None, the maximum number of levels to descend 437 withdirs: bool 438 Whether to include directory paths in the output. This is True 439 when used by glob, but users usually only want files. 440 kwargs are passed to ``ls``. 441 """ 442 # TODO: allow equivalent of -name parameter 443 path = self._strip_protocol(path) 444 out = dict() 445 detail = kwargs.pop("detail", False) 446 for _, dirs, files in self.walk(path, maxdepth, detail=True, **kwargs): 447 if withdirs: 448 files.update(dirs) 449 out.update({info["name"]: info for name, info in files.items()}) 450 if not out and self.isfile(path): 451 # walk works on directories, but find should also return [path] 452 # when path happens to be a file 453 out[path] = {} 454 names = sorted(out) 455 if not detail: 456 return names 457 else: 458 return {name: out[name] for name in names} 459 460 def du(self, path, total=True, maxdepth=None, **kwargs): 461 """Space used by files within a path 462 463 Parameters 464 ---------- 465 path: str 466 total: bool 467 whether to sum all the file sizes 468 maxdepth: int or None 469 maximum number of directory levels to descend, None for unlimited. 470 kwargs: passed to ``ls`` 471 472 Returns 473 ------- 474 Dict of {fn: size} if total=False, or int otherwise, where numbers 475 refer to bytes used. 476 """ 477 sizes = {} 478 for f in self.find(path, maxdepth=maxdepth, **kwargs): 479 info = self.info(f) 480 sizes[info["name"]] = info["size"] 481 if total: 482 return sum(sizes.values()) 483 else: 484 return sizes 485 486 def glob(self, path, **kwargs): 487 """ 488 Find files by glob-matching. 489 490 If the path ends with '/' and does not contain "*", it is essentially 491 the same as ``ls(path)``, returning only files. 492 493 We support ``"**"``, 494 ``"?"`` and ``"[..]"``. We do not support ^ for pattern negation. 495 496 Search path names that contain embedded characters special to this 497 implementation of glob may not produce expected results; 498 e.g., 'foo/bar/*starredfilename*'. 499 500 kwargs are passed to ``ls``. 501 """ 502 import re 503 504 ends = path.endswith("/") 505 path = self._strip_protocol(path) 506 indstar = path.find("*") if path.find("*") >= 0 else len(path) 507 indques = path.find("?") if path.find("?") >= 0 else len(path) 508 indbrace = path.find("[") if path.find("[") >= 0 else len(path) 509 510 ind = min(indstar, indques, indbrace) 511 512 detail = kwargs.pop("detail", False) 513 514 if not has_magic(path): 515 root = path 516 depth = 1 517 if ends: 518 path += "/*" 519 elif self.exists(path): 520 if not detail: 521 return [path] 522 else: 523 return {path: self.info(path)} 524 else: 525 if not detail: 526 return [] # glob of non-existent returns empty 527 else: 528 return {} 529 elif "/" in path[:ind]: 530 ind2 = path[:ind].rindex("/") 531 root = path[: ind2 + 1] 532 depth = None if "**" in path else path[ind2 + 1 :].count("/") + 1 533 else: 534 root = "" 535 depth = None if "**" in path else path[ind + 1 :].count("/") + 1 536 537 allpaths = self.find(root, maxdepth=depth, withdirs=True, detail=True, **kwargs) 538 # Escape characters special to python regex, leaving our supported 539 # special characters in place. 540 # See https://www.gnu.org/software/bash/manual/html_node/Pattern-Matching.html 541 # for shell globbing details. 542 pattern = ( 543 "^" 544 + ( 545 path.replace("\\", r"\\") 546 .replace(".", r"\.") 547 .replace("+", r"\+") 548 .replace("//", "/") 549 .replace("(", r"\(") 550 .replace(")", r"\)") 551 .replace("|", r"\|") 552 .replace("^", r"\^") 553 .replace("$", r"\$") 554 .replace("{", r"\{") 555 .replace("}", r"\}") 556 .rstrip("/") 557 .replace("?", ".") 558 ) 559 + "$" 560 ) 561 pattern = re.sub("[*]{2}", "=PLACEHOLDER=", pattern) 562 pattern = re.sub("[*]", "[^/]*", pattern) 563 pattern = re.compile(pattern.replace("=PLACEHOLDER=", ".*")) 564 out = { 565 p: allpaths[p] 566 for p in sorted(allpaths) 567 if pattern.match(p.replace("//", "/").rstrip("/")) 568 } 569 if detail: 570 return out 571 else: 572 return list(out) 573 574 def exists(self, path, **kwargs): 575 """Is there a file at the given path""" 576 try: 577 self.info(path, **kwargs) 578 return True 579 except: # noqa: E722 580 # any exception allowed bar FileNotFoundError? 581 return False 582 583 def lexists(self, path, **kwargs): 584 """If there is a file at the given path (including 585 broken links)""" 586 return self.exists(path) 587 588 def info(self, path, **kwargs): 589 """Give details of entry at path 590 591 Returns a single dictionary, with exactly the same information as ``ls`` 592 would with ``detail=True``. 593 594 The default implementation should calls ls and could be overridden by a 595 shortcut. kwargs are passed on to ```ls()``. 596 597 Some file systems might not be able to measure the file's size, in 598 which case, the returned dict will include ``'size': None``. 599 600 Returns 601 ------- 602 dict with keys: name (full path in the FS), size (in bytes), type (file, 603 directory, or something else) and other FS-specific keys. 604 """ 605 path = self._strip_protocol(path) 606 out = self.ls(self._parent(path), detail=True, **kwargs) 607 out = [o for o in out if o["name"].rstrip("/") == path] 608 if out: 609 return out[0] 610 out = self.ls(path, detail=True, **kwargs) 611 path = path.rstrip("/") 612 out1 = [o for o in out if o["name"].rstrip("/") == path] 613 if len(out1) == 1: 614 if "size" not in out1[0]: 615 out1[0]["size"] = None 616 return out1[0] 617 elif len(out1) > 1 or out: 618 return {"name": path, "size": 0, "type": "directory"} 619 else: 620 raise FileNotFoundError(path) 621 622 def checksum(self, path): 623 """Unique value for current version of file 624 625 If the checksum is the same from one moment to another, the contents 626 are guaranteed to be the same. If the checksum changes, the contents 627 *might* have changed. 628 629 This should normally be overridden; default will probably capture 630 creation/modification timestamp (which would be good) or maybe 631 access timestamp (which would be bad) 632 """ 633 return int(tokenize(self.info(path)), 16) 634 635 def size(self, path): 636 """Size in bytes of file""" 637 return self.info(path).get("size", None) 638 639 def sizes(self, paths): 640 """Size in bytes of each file in a list of paths""" 641 return [self.size(p) for p in paths] 642 643 def isdir(self, path): 644 """Is this entry directory-like?""" 645 try: 646 return self.info(path)["type"] == "directory" 647 except IOError: 648 return False 649 650 def isfile(self, path): 651 """Is this entry file-like?""" 652 try: 653 return self.info(path)["type"] == "file" 654 except: # noqa: E722 655 return False 656 657 def cat_file(self, path, start=None, end=None, **kwargs): 658 """Get the content of a file 659 660 Parameters 661 ---------- 662 path: URL of file on this filesystems 663 start, end: int 664 Bytes limits of the read. If negative, backwards from end, 665 like usual python slices. Either can be None for start or 666 end of file, respectively 667 kwargs: passed to ``open()``. 668 """ 669 # explicitly set buffering off? 670 with self.open(path, "rb", **kwargs) as f: 671 if start is not None: 672 if start >= 0: 673 f.seek(start) 674 else: 675 f.seek(max(0, f.size + start)) 676 if end is not None: 677 if end < 0: 678 end = f.size + end 679 return f.read(end - f.tell()) 680 return f.read() 681 682 def pipe_file(self, path, value, **kwargs): 683 """Set the bytes of given file""" 684 with self.open(path, "wb") as f: 685 f.write(value) 686 687 def pipe(self, path, value=None, **kwargs): 688 """Put value into path 689 690 (counterpart to ``cat``) 691 692 Parameters 693 ---------- 694 path: string or dict(str, bytes) 695 If a string, a single remote location to put ``value`` bytes; if a dict, 696 a mapping of {path: bytesvalue}. 697 value: bytes, optional 698 If using a single path, these are the bytes to put there. Ignored if 699 ``path`` is a dict 700 """ 701 if isinstance(path, str): 702 self.pipe_file(self._strip_protocol(path), value, **kwargs) 703 elif isinstance(path, dict): 704 for k, v in path.items(): 705 self.pipe_file(self._strip_protocol(k), v, **kwargs) 706 else: 707 raise ValueError("path must be str or dict") 708 709 def cat_ranges(self, paths, starts, ends, max_gap=None, **kwargs): 710 if max_gap is not None: 711 raise NotImplementedError 712 if not isinstance(paths, list): 713 raise TypeError 714 if not isinstance(starts, list): 715 starts = [starts] * len(paths) 716 if not isinstance(ends, list): 717 ends = [starts] * len(paths) 718 if len(starts) != len(paths) or len(ends) != len(paths): 719 raise ValueError 720 return [self.cat_file(p, s, e) for p, s, e in zip(paths, starts, ends)] 721 722 def cat(self, path, recursive=False, on_error="raise", **kwargs): 723 """Fetch (potentially multiple) paths' contents 724 725 Parameters 726 ---------- 727 recursive: bool 728 If True, assume the path(s) are directories, and get all the 729 contained files 730 on_error : "raise", "omit", "return" 731 If raise, an underlying exception will be raised (converted to KeyError 732 if the type is in self.missing_exceptions); if omit, keys with exception 733 will simply not be included in the output; if "return", all keys are 734 included in the output, but the value will be bytes or an exception 735 instance. 736 kwargs: passed to cat_file 737 738 Returns 739 ------- 740 dict of {path: contents} if there are multiple paths 741 or the path has been otherwise expanded 742 """ 743 paths = self.expand_path(path, recursive=recursive) 744 if ( 745 len(paths) > 1 746 or isinstance(path, list) 747 or paths[0] != self._strip_protocol(path) 748 ): 749 out = {} 750 for path in paths: 751 try: 752 out[path] = self.cat_file(path, **kwargs) 753 except Exception as e: 754 if on_error == "raise": 755 raise 756 if on_error == "return": 757 out[path] = e 758 return out 759 else: 760 return self.cat_file(paths[0], **kwargs) 761 762 def get_file(self, rpath, lpath, callback=_DEFAULT_CALLBACK, **kwargs): 763 """Copy single remote file to local""" 764 if self.isdir(rpath): 765 os.makedirs(lpath, exist_ok=True) 766 return None 767 768 with self.open(rpath, "rb", **kwargs) as f1: 769 callback.set_size(getattr(f1, "size", None)) 770 with open(lpath, "wb") as f2: 771 data = True 772 while data: 773 data = f1.read(self.blocksize) 774 segment_len = f2.write(data) 775 callback.relative_update(segment_len) 776 777 def get(self, rpath, lpath, recursive=False, callback=_DEFAULT_CALLBACK, **kwargs): 778 """Copy file(s) to local. 779 780 Copies a specific file or tree of files (if recursive=True). If lpath 781 ends with a "/", it will be assumed to be a directory, and target files 782 will go within. Can submit a list of paths, which may be glob-patterns 783 and will be expanded. 784 785 Calls get_file for each source. 786 """ 787 from .implementations.local import make_path_posix 788 789 if isinstance(lpath, str): 790 lpath = make_path_posix(lpath) 791 rpaths = self.expand_path(rpath, recursive=recursive) 792 lpaths = other_paths(rpaths, lpath) 793 794 callback.set_size(len(lpaths)) 795 for lpath, rpath in callback.wrap(zip(lpaths, rpaths)): 796 callback.branch(rpath, lpath, kwargs) 797 self.get_file(rpath, lpath, **kwargs) 798 799 def put_file(self, lpath, rpath, callback=_DEFAULT_CALLBACK, **kwargs): 800 """Copy single file to remote""" 801 if os.path.isdir(lpath): 802 self.makedirs(rpath, exist_ok=True) 803 return None 804 805 with open(lpath, "rb") as f1: 806 callback.set_size(f1.seek(0, 2)) 807 f1.seek(0) 808 809 self.mkdirs(self._parent(os.fspath(rpath)), exist_ok=True) 810 with self.open(rpath, "wb", **kwargs) as f2: 811 data = True 812 while data: 813 data = f1.read(self.blocksize) 814 segment_len = f2.write(data) 815 callback.relative_update(segment_len) 816 817 def put(self, lpath, rpath, recursive=False, callback=_DEFAULT_CALLBACK, **kwargs): 818 """Copy file(s) from local. 819 820 Copies a specific file or tree of files (if recursive=True). If rpath 821 ends with a "/", it will be assumed to be a directory, and target files 822 will go within. 823 824 Calls put_file for each source. 825 """ 826 from .implementations.local import LocalFileSystem, make_path_posix 827 828 rpath = ( 829 self._strip_protocol(rpath) 830 if isinstance(rpath, str) 831 else [self._strip_protocol(p) for p in rpath] 832 ) 833 if isinstance(lpath, str): 834 lpath = make_path_posix(lpath) 835 fs = LocalFileSystem() 836 lpaths = fs.expand_path(lpath, recursive=recursive) 837 rpaths = other_paths( 838 lpaths, rpath, exists=isinstance(rpath, str) and self.isdir(rpath) 839 ) 840 841 callback.set_size(len(rpaths)) 842 for lpath, rpath in callback.wrap(zip(lpaths, rpaths)): 843 callback.branch(lpath, rpath, kwargs) 844 self.put_file(lpath, rpath, **kwargs) 845 846 def head(self, path, size=1024): 847 """Get the first ``size`` bytes from file""" 848 with self.open(path, "rb") as f: 849 return f.read(size) 850 851 def tail(self, path, size=1024): 852 """Get the last ``size`` bytes from file""" 853 with self.open(path, "rb") as f: 854 f.seek(max(-size, -f.size), 2) 855 return f.read() 856 857 def cp_file(self, path1, path2, **kwargs): 858 raise NotImplementedError 859 860 def copy(self, path1, path2, recursive=False, on_error=None, **kwargs): 861 """Copy within two locations in the filesystem 862 863 on_error : "raise", "ignore" 864 If raise, any not-found exceptions will be raised; if ignore any 865 not-found exceptions will cause the path to be skipped; defaults to 866 raise unless recursive is true, where the default is ignore 867 """ 868 if on_error is None and recursive: 869 on_error = "ignore" 870 elif on_error is None: 871 on_error = "raise" 872 873 paths = self.expand_path(path1, recursive=recursive) 874 path2 = other_paths(paths, path2) 875 for p1, p2 in zip(paths, path2): 876 try: 877 self.cp_file(p1, p2, **kwargs) 878 except FileNotFoundError: 879 if on_error == "raise": 880 raise 881 882 def expand_path(self, path, recursive=False, maxdepth=None): 883 """Turn one or more globs or directories into a list of all matching paths 884 to files or directories.""" 885 if isinstance(path, str): 886 out = self.expand_path([path], recursive, maxdepth) 887 else: 888 # reduce depth on each recursion level unless None or 0 889 maxdepth = maxdepth if not maxdepth else maxdepth - 1 890 out = set() 891 path = [self._strip_protocol(p) for p in path] 892 for p in path: 893 if has_magic(p): 894 bit = set(self.glob(p)) 895 out |= bit 896 if recursive: 897 out |= set( 898 self.expand_path( 899 list(bit), recursive=recursive, maxdepth=maxdepth 900 ) 901 ) 902 continue 903 elif recursive: 904 rec = set(self.find(p, maxdepth=maxdepth, withdirs=True)) 905 out |= rec 906 if p not in out and (recursive is False or self.exists(p)): 907 # should only check once, for the root 908 out.add(p) 909 if not out: 910 raise FileNotFoundError(path) 911 return list(sorted(out)) 912 913 def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs): 914 """Move file(s) from one location to another""" 915 self.copy(path1, path2, recursive=recursive, maxdepth=maxdepth) 916 self.rm(path1, recursive=recursive) 917 918 def rm_file(self, path): 919 """Delete a file""" 920 self._rm(path) 921 922 def _rm(self, path): 923 """Delete one file""" 924 # this is the old name for the method, prefer rm_file 925 raise NotImplementedError 926 927 def rm(self, path, recursive=False, maxdepth=None): 928 """Delete files. 929 930 Parameters 931 ---------- 932 path: str or list of str 933 File(s) to delete. 934 recursive: bool 935 If file(s) are directories, recursively delete contents and then 936 also remove the directory 937 maxdepth: int or None 938 Depth to pass to walk for finding files to delete, if recursive. 939 If None, there will be no limit and infinite recursion may be 940 possible. 941 """ 942 path = self.expand_path(path, recursive=recursive, maxdepth=maxdepth) 943 for p in reversed(path): 944 self.rm_file(p) 945 946 @classmethod 947 def _parent(cls, path): 948 path = cls._strip_protocol(path.rstrip("/")) 949 if "/" in path: 950 parent = path.rsplit("/", 1)[0].lstrip(cls.root_marker) 951 return cls.root_marker + parent 952 else: 953 return cls.root_marker 954 955 def _open( 956 self, 957 path, 958 mode="rb", 959 block_size=None, 960 autocommit=True, 961 cache_options=None, 962 **kwargs, 963 ): 964 """Return raw bytes-mode file-like from the file-system""" 965 return AbstractBufferedFile( 966 self, 967 path, 968 mode, 969 block_size, 970 autocommit, 971 cache_options=cache_options, 972 **kwargs, 973 ) 974 975 def open( 976 self, 977 path, 978 mode="rb", 979 block_size=None, 980 cache_options=None, 981 compression=None, 982 **kwargs, 983 ): 984 """ 985 Return a file-like object from the filesystem 986 987 The resultant instance must function correctly in a context ``with`` 988 block. 989 990 Parameters 991 ---------- 992 path: str 993 Target file 994 mode: str like 'rb', 'w' 995 See builtin ``open()`` 996 block_size: int 997 Some indication of buffering - this is a value in bytes 998 cache_options : dict, optional 999 Extra arguments to pass through to the cache. 1000 compression: string or None 1001 If given, open file using compression codec. Can either be a compression 1002 name (a key in ``fsspec.compression.compr``) or "infer" to guess the 1003 compression from the filename suffix. 1004 encoding, errors, newline: passed on to TextIOWrapper for text mode 1005 """ 1006 import io 1007 1008 path = self._strip_protocol(path) 1009 if "b" not in mode: 1010 mode = mode.replace("t", "") + "b" 1011 1012 text_kwargs = { 1013 k: kwargs.pop(k) 1014 for k in ["encoding", "errors", "newline"] 1015 if k in kwargs 1016 } 1017 return io.TextIOWrapper( 1018 self.open( 1019 path, 1020 mode, 1021 block_size=block_size, 1022 cache_options=cache_options, 1023 compression=compression, 1024 **kwargs, 1025 ), 1026 **text_kwargs, 1027 ) 1028 else: 1029 ac = kwargs.pop("autocommit", not self._intrans) 1030 f = self._open( 1031 path, 1032 mode=mode, 1033 block_size=block_size, 1034 autocommit=ac, 1035 cache_options=cache_options, 1036 **kwargs, 1037 ) 1038 if compression is not None: 1039 from fsspec.compression import compr 1040 from fsspec.core import get_compression 1041 1042 compression = get_compression(path, compression) 1043 compress = compr[compression] 1044 f = compress(f, mode=mode[0]) 1045 1046 if not ac and "r" not in mode: 1047 self.transaction.files.append(f) 1048 return f 1049 1050 def touch(self, path, truncate=True, **kwargs): 1051 """Create empty file, or update timestamp 1052 1053 Parameters 1054 ---------- 1055 path: str 1056 file location 1057 truncate: bool 1058 If True, always set file size to 0; if False, update timestamp and 1059 leave file unchanged, if backend allows this 1060 """ 1061 if truncate or not self.exists(path): 1062 with self.open(path, "wb", **kwargs): 1063 pass 1064 else: 1065 raise NotImplementedError # update timestamp, if possible 1066 1067 def ukey(self, path): 1068 """Hash of file properties, to tell if it has changed""" 1069 return sha256(str(self.info(path)).encode()).hexdigest() 1070 1071 def read_block(self, fn, offset, length, delimiter=None): 1072 """Read a block of bytes from 1073 1074 Starting at ``offset`` of the file, read ``length`` bytes. If 1075 ``delimiter`` is set then we ensure that the read starts and stops at 1076 delimiter boundaries that follow the locations ``offset`` and ``offset 1077 + length``. If ``offset`` is zero then we start at zero. The 1078 bytestring returned WILL include the end delimiter string. 1079 1080 If offset+length is beyond the eof, reads to eof. 1081 1082 Parameters 1083 ---------- 1084 fn: string 1085 Path to filename 1086 offset: int 1087 Byte offset to start read 1088 length: int 1089 Number of bytes to read 1090 delimiter: bytes (optional) 1091 Ensure reading starts and stops at delimiter bytestring 1092 1093 Examples 1094 -------- 1095 >>> fs.read_block('data/file.csv', 0, 13) # doctest: +SKIP 1096 b'Alice, 100\\nBo' 1097 >>> fs.read_block('data/file.csv', 0, 13, delimiter=b'\\n') # doctest: +SKIP 1098 b'Alice, 100\\nBob, 200\\n' 1099 1100 Use ``length=None`` to read to the end of the file. 1101 >>> fs.read_block('data/file.csv', 0, None, delimiter=b'\\n') # doctest: +SKIP 1102 b'Alice, 100\\nBob, 200\\nCharlie, 300' 1103 1104 See Also 1105 -------- 1106 utils.read_block 1107 """ 1108 with self.open(fn, "rb") as f: 1109 size = f.size 1110 if length is None: 1111 length = size 1112 if size is not None and offset + length > size: 1113 length = size - offset 1114 return read_block(f, offset, length, delimiter) 1115 1116 def to_json(self): 1117 """ 1118 JSON representation of this filesystem instance 1119 1120 Returns 1121 ------- 1122 str: JSON structure with keys cls (the python location of this class), 1123 protocol (text name of this class's protocol, first one in case of 1124 multiple), args (positional args, usually empty), and all other 1125 kwargs as their own keys. 1126 """ 1127 import json 1128 1129 cls = type(self) 1130 cls = ".".join((cls.__module__, cls.__name__)) 1131 proto = ( 1132 self.protocol[0] 1133 if isinstance(self.protocol, (tuple, list)) 1134 else self.protocol 1135 ) 1136 return json.dumps( 1137 dict( 1138 **{"cls": cls, "protocol": proto, "args": self.storage_args}, 1139 **self.storage_options, 1140 ) 1141 ) 1142 1143 @staticmethod 1144 def from_json(blob): 1145 """ 1146 Recreate a filesystem instance from JSON representation 1147 1148 See ``.to_json()`` for the expected structure of the input 1149 1150 Parameters 1151 ---------- 1152 blob: str 1153 1154 Returns 1155 ------- 1156 file system instance, not necessarily of this particular class. 1157 """ 1158 import json 1159 1160 from .registry import _import_class, get_filesystem_class 1161 1162 dic = json.loads(blob) 1163 protocol = dic.pop("protocol") 1164 try: 1165 cls = _import_class(dic.pop("cls")) 1166 except (ImportError, ValueError, RuntimeError, KeyError): 1167 cls = get_filesystem_class(protocol) 1168 return cls(*dic.pop("args", ()), **dic) 1169 1170 def _get_pyarrow_filesystem(self): 1171 """ 1172 Make a version of the FS instance which will be acceptable to pyarrow 1173 """ 1174 # all instances already also derive from pyarrow 1175 return self 1176 1177 def get_mapper(self, root, check=False, create=False): 1178 """Create key/value store based on this file-system 1179 1180 Makes a MutableMapping interface to the FS at the given root path. 1181 See ``fsspec.mapping.FSMap`` for further details. 1182 """ 1183 from .mapping import FSMap 1184 1185 return FSMap(root, self, check, create) 1186 1187 @classmethod 1188 def clear_instance_cache(cls): 1189 """ 1190 Clear the cache of filesystem instances. 1191 1192 Notes 1193 ----- 1194 Unless overridden by setting the ``cachable`` class attribute to False, 1195 the filesystem class stores a reference to newly created instances. This 1196 prevents Python's normal rules around garbage collection from working, 1197 since the instances refcount will not drop to zero until 1198 ``clear_instance_cache`` is called. 1199 """ 1200 cls._cache.clear() 1201 1202 def created(self, path): 1203 """Return the created timestamp of a file as a datetime.datetime""" 1204 raise NotImplementedError 1205 1206 def modified(self, path): 1207 """Return the modified timestamp of a file as a datetime.datetime""" 1208 raise NotImplementedError 1209 1210 # ------------------------------------------------------------------------ 1211 # Aliases 1212 1213 def makedir(self, path, create_parents=True, **kwargs): 1214 """Alias of `AbstractFileSystem.mkdir`.""" 1215 return self.mkdir(path, create_parents=create_parents, **kwargs) 1216 1217 def mkdirs(self, path, exist_ok=False): 1218 """Alias of `AbstractFileSystem.makedirs`.""" 1219 return self.makedirs(path, exist_ok=exist_ok) 1220 1221 def listdir(self, path, detail=True, **kwargs): 1222 """Alias of `AbstractFileSystem.ls`.""" 1223 return self.ls(path, detail=detail, **kwargs) 1224 1225 def cp(self, path1, path2, **kwargs): 1226 """Alias of `AbstractFileSystem.copy`.""" 1227 return self.copy(path1, path2, **kwargs) 1228 1229 def move(self, path1, path2, **kwargs): 1230 """Alias of `AbstractFileSystem.mv`.""" 1231 return self.mv(path1, path2, **kwargs) 1232 1233 def stat(self, path, **kwargs): 1234 """Alias of `AbstractFileSystem.info`.""" 1235 return self.info(path, **kwargs) 1236 1237 def disk_usage(self, path, total=True, maxdepth=None, **kwargs): 1238 """Alias of `AbstractFileSystem.du`.""" 1239 return self.du(path, total=total, maxdepth=maxdepth, **kwargs) 1240 1241 def rename(self, path1, path2, **kwargs): 1242 """Alias of `AbstractFileSystem.mv`.""" 1243 return self.mv(path1, path2, **kwargs) 1244 1245 def delete(self, path, recursive=False, maxdepth=None): 1246 """Alias of `AbstractFileSystem.rm`.""" 1247 return self.rm(path, recursive=recursive, maxdepth=maxdepth) 1248 1249 def upload(self, lpath, rpath, recursive=False, **kwargs): 1250 """Alias of `AbstractFileSystem.put`.""" 1251 return self.put(lpath, rpath, recursive=recursive, **kwargs) 1252 1253 def download(self, rpath, lpath, recursive=False, **kwargs): 1254 """Alias of `AbstractFileSystem.get`.""" 1255 return self.get(rpath, lpath, recursive=recursive, **kwargs) 1256 1257 def sign(self, path, expiration=100, **kwargs): 1258 """Create a signed URL representing the given path 1259 1260 Some implementations allow temporary URLs to be generated, as a 1261 way of delegating credentials. 1262 1263 Parameters 1264 ---------- 1265 path : str 1266 The path on the filesystem 1267 expiration : int 1268 Number of seconds to enable the URL for (if supported) 1269 1270 Returns 1271 ------- 1272 URL : str 1273 The signed URL 1274 1275 Raises 1276 ------ 1277 NotImplementedError : if method is not implemented for a filesystem 1278 """ 1279 raise NotImplementedError("Sign is not implemented for this filesystem") 1280 1281 def _isfilestore(self): 1282 # Originally inherited from pyarrow DaskFileSystem. Keeping this 1283 # here for backwards compatibility as long as pyarrow uses its 1284 # legacy fsspec-compatible filesystems and thus accepts fsspec 1285 # filesystems as well 1286 return False 1287 1288 1289class AbstractBufferedFile(io.IOBase): 1290 """Convenient class to derive from to provide buffering 1291 1292 In the case that the backend does not provide a pythonic file-like object 1293 already, this class contains much of the logic to build one. The only 1294 methods that need to be overridden are ``_upload_chunk``, 1295 ``_initiate_upload`` and ``_fetch_range``. 1296 """ 1297 1298 DEFAULT_BLOCK_SIZE = 5 * 2 ** 20 1299 _details = None 1300 1301 def __init__( 1302 self, 1303 fs, 1304 path, 1305 mode="rb", 1306 block_size="default", 1307 autocommit=True, 1308 cache_type="readahead", 1309 cache_options=None, 1310 size=None, 1311 **kwargs, 1312 ): 1313 """ 1314 Template for files with buffered reading and writing 1315 1316 Parameters 1317 ---------- 1318 fs: instance of FileSystem 1319 path: str 1320 location in file-system 1321 mode: str 1322 Normal file modes. Currently only 'wb', 'ab' or 'rb'. Some file 1323 systems may be read-only, and some may not support append. 1324 block_size: int 1325 Buffer size for reading or writing, 'default' for class default 1326 autocommit: bool 1327 Whether to write to final destination; may only impact what 1328 happens when file is being closed. 1329 cache_type: {"readahead", "none", "mmap", "bytes"}, default "readahead" 1330 Caching policy in read mode. See the definitions in ``core``. 1331 cache_options : dict 1332 Additional options passed to the constructor for the cache specified 1333 by `cache_type`. 1334 size: int 1335 If given and in read mode, suppressed having to look up the file size 1336 kwargs: 1337 Gets stored as self.kwargs 1338 """ 1339 from .core import caches 1340 1341 self.path = path 1342 self.fs = fs 1343 self.mode = mode 1344 self.blocksize = ( 1345 self.DEFAULT_BLOCK_SIZE if block_size in ["default", None] else block_size 1346 ) 1347 self.loc = 0 1348 self.autocommit = autocommit 1349 self.end = None 1350 self.start = None 1351 self.closed = False 1352 1353 if cache_options is None: 1354 cache_options = {} 1355 1356 if "trim" in kwargs: 1357 warnings.warn( 1358 "Passing 'trim' to control the cache behavior has been deprecated. " 1359 "Specify it within the 'cache_options' argument instead.", 1360 FutureWarning, 1361 ) 1362 cache_options["trim"] = kwargs.pop("trim") 1363 1364 self.kwargs = kwargs 1365 1366 if mode not in {"ab", "rb", "wb"}: 1367 raise NotImplementedError("File mode not supported") 1368 if mode == "rb": 1369 if size is not None: 1370 self.size = size 1371 else: 1372 self.size = self.details["size"] 1373 self.cache = caches[cache_type]( 1374 self.blocksize, self._fetch_range, self.size, **cache_options 1375 ) 1376 else: 1377 self.buffer = io.BytesIO() 1378 self.offset = None 1379 self.forced = False 1380 self.location = None 1381 1382 @property 1383 def details(self): 1384 if self._details is None: 1385 self._details = self.fs.info(self.path) 1386 return self._details 1387 1388 @details.setter 1389 def details(self, value): 1390 self._details = value 1391 self.size = value["size"] 1392 1393 @property 1394 def full_name(self): 1395 return _unstrip_protocol(self.path, self.fs) 1396 1397 @property 1398 def closed(self): 1399 # get around this attr being read-only in IOBase 1400 # use getattr here, since this can be called during del 1401 return getattr(self, "_closed", True) 1402 1403 @closed.setter 1404 def closed(self, c): 1405 self._closed = c 1406 1407 def __hash__(self): 1408 if "w" in self.mode: 1409 return id(self) 1410 else: 1411 return int(tokenize(self.details), 16) 1412 1413 def __eq__(self, other): 1414 """Files are equal if they have the same checksum, only in read mode""" 1415 return self.mode == "rb" and other.mode == "rb" and hash(self) == hash(other) 1416 1417 def commit(self): 1418 """Move from temp to final destination""" 1419 1420 def discard(self): 1421 """Throw away temporary file""" 1422 1423 def info(self): 1424 """File information about this path""" 1425 if "r" in self.mode: 1426 return self.details 1427 else: 1428 raise ValueError("Info not available while writing") 1429 1430 def tell(self): 1431 """Current file location""" 1432 return self.loc 1433 1434 def seek(self, loc, whence=0): 1435 """Set current file location 1436 1437 Parameters 1438 ---------- 1439 loc: int 1440 byte location 1441 whence: {0, 1, 2} 1442 from start of file, current location or end of file, resp. 1443 """ 1444 loc = int(loc) 1445 if not self.mode == "rb": 1446 raise OSError(ESPIPE, "Seek only available in read mode") 1447 if whence == 0: 1448 nloc = loc 1449 elif whence == 1: 1450 nloc = self.loc + loc 1451 elif whence == 2: 1452 nloc = self.size + loc 1453 else: 1454 raise ValueError("invalid whence (%s, should be 0, 1 or 2)" % whence) 1455 if nloc < 0: 1456 raise ValueError("Seek before start of file") 1457 self.loc = nloc 1458 return self.loc 1459 1460 def write(self, data): 1461 """ 1462 Write data to buffer. 1463 1464 Buffer only sent on flush() or if buffer is greater than 1465 or equal to blocksize. 1466 1467 Parameters 1468 ---------- 1469 data: bytes 1470 Set of bytes to be written. 1471 """ 1472 if self.mode not in {"wb", "ab"}: 1473 raise ValueError("File not in write mode") 1474 if self.closed: 1475 raise ValueError("I/O operation on closed file.") 1476 if self.forced: 1477 raise ValueError("This file has been force-flushed, can only close") 1478 out = self.buffer.write(data) 1479 self.loc += out 1480 if self.buffer.tell() >= self.blocksize: 1481 self.flush() 1482 return out 1483 1484 def flush(self, force=False): 1485 """ 1486 Write buffered data to backend store. 1487 1488 Writes the current buffer, if it is larger than the block-size, or if 1489 the file is being closed. 1490 1491 Parameters 1492 ---------- 1493 force: bool 1494 When closing, write the last block even if it is smaller than 1495 blocks are allowed to be. Disallows further writing to this file. 1496 """ 1497 1498 if self.closed: 1499 raise ValueError("Flush on closed file") 1500 if force and self.forced: 1501 raise ValueError("Force flush cannot be called more than once") 1502 if force: 1503 self.forced = True 1504 1505 if self.mode not in {"wb", "ab"}: 1506 # no-op to flush on read-mode 1507 return 1508 1509 if not force and self.buffer.tell() < self.blocksize: 1510 # Defer write on small block 1511 return 1512 1513 if self.offset is None: 1514 # Initialize a multipart upload 1515 self.offset = 0 1516 try: 1517 self._initiate_upload() 1518 except: # noqa: E722 1519 self.closed = True 1520 raise 1521 1522 if self._upload_chunk(final=force) is not False: 1523 self.offset += self.buffer.seek(0, 2) 1524 self.buffer = io.BytesIO() 1525 1526 def _upload_chunk(self, final=False): 1527 """Write one part of a multi-block file upload 1528 1529 Parameters 1530 ========== 1531 final: bool 1532 This is the last block, so should complete file, if 1533 self.autocommit is True. 1534 """ 1535 # may not yet have been initialized, may need to call _initialize_upload 1536 1537 def _initiate_upload(self): 1538 """Create remote file/upload""" 1539 pass 1540 1541 def _fetch_range(self, start, end): 1542 """Get the specified set of bytes from remote""" 1543 raise NotImplementedError 1544 1545 def read(self, length=-1): 1546 """ 1547 Return data from cache, or fetch pieces as necessary 1548 1549 Parameters 1550 ---------- 1551 length: int (-1) 1552 Number of bytes to read; if <0, all remaining bytes. 1553 """ 1554 length = -1 if length is None else int(length) 1555 if self.mode != "rb": 1556 raise ValueError("File not in read mode") 1557 if length < 0: 1558 length = self.size - self.loc 1559 if self.closed: 1560 raise ValueError("I/O operation on closed file.") 1561 logger.debug("%s read: %i - %i" % (self, self.loc, self.loc + length)) 1562 if length == 0: 1563 # don't even bother calling fetch 1564 return b"" 1565 out = self.cache._fetch(self.loc, self.loc + length) 1566 self.loc += len(out) 1567 return out 1568 1569 def readinto(self, b): 1570 """mirrors builtin file's readinto method 1571 1572 https://docs.python.org/3/library/io.html#io.RawIOBase.readinto 1573 """ 1574 out = memoryview(b).cast("B") 1575 data = self.read(out.nbytes) 1576 out[: len(data)] = data 1577 return len(data) 1578 1579 def readuntil(self, char=b"\n", blocks=None): 1580 """Return data between current position and first occurrence of char 1581 1582 char is included in the output, except if the end of the tile is 1583 encountered first. 1584 1585 Parameters 1586 ---------- 1587 char: bytes 1588 Thing to find 1589 blocks: None or int 1590 How much to read in each go. Defaults to file blocksize - which may 1591 mean a new read on every call. 1592 """ 1593 out = [] 1594 while True: 1595 start = self.tell() 1596 part = self.read(blocks or self.blocksize) 1597 if len(part) == 0: 1598 break 1599 found = part.find(char) 1600 if found > -1: 1601 out.append(part[: found + len(char)]) 1602 self.seek(start + found + len(char)) 1603 break 1604 out.append(part) 1605 return b"".join(out) 1606 1607 def readline(self): 1608 """Read until first occurrence of newline character 1609 1610 Note that, because of character encoding, this is not necessarily a 1611 true line ending. 1612 """ 1613 return self.readuntil(b"\n") 1614 1615 def __next__(self): 1616 out = self.readline() 1617 if out: 1618 return out 1619 raise StopIteration 1620 1621 def __iter__(self): 1622 return self 1623 1624 def readlines(self): 1625 """Return all data, split by the newline character""" 1626 data = self.read() 1627 lines = data.split(b"\n") 1628 out = [l + b"\n" for l in lines[:-1]] 1629 if data.endswith(b"\n"): 1630 return out 1631 else: 1632 return out + [lines[-1]] 1633 # return list(self) ??? 1634 1635 def readinto1(self, b): 1636 return self.readinto(b) 1637 1638 def close(self): 1639 """Close file 1640 1641 Finalizes writes, discards cache 1642 """ 1643 if getattr(self, "_unclosable", False): 1644 return 1645 if self.closed: 1646 return 1647 if self.mode == "rb": 1648 self.cache = None 1649 else: 1650 if not self.forced: 1651 self.flush(force=True) 1652 1653 if self.fs is not None: 1654 self.fs.invalidate_cache(self.path) 1655 self.fs.invalidate_cache(self.fs._parent(self.path)) 1656 1657 self.closed = True 1658 1659 def readable(self): 1660 """Whether opened for reading""" 1661 return self.mode == "rb" and not self.closed 1662 1663 def seekable(self): 1664 """Whether is seekable (only in read mode)""" 1665 return self.readable() 1666 1667 def writable(self): 1668 """Whether opened for writing""" 1669 return self.mode in {"wb", "ab"} and not self.closed 1670 1671 def __del__(self): 1672 if not self.closed: 1673 self.close() 1674 1675 def __str__(self): 1676 return "<File-like object %s, %s>" % (type(self.fs).__name__, self.path) 1677 1678 __repr__ = __str__ 1679 1680 def __enter__(self): 1681 return self 1682 1683 def __exit__(self, *args): 1684 self.close() 1685