from __future__ import absolute_import, division, print_function

import io
import logging
import os
import re
from glob import has_magic

# for backwards compat, we export cache things from here too
from .caching import (  # noqa: F401
    BaseCache,
    BlockCache,
    BytesCache,
    MMapCache,
    ReadAheadCache,
    caches,
)
from .compression import compr
from .registry import filesystem, get_filesystem_class
from .utils import (
    _unstrip_protocol,
    build_name_function,
    infer_compression,
    stringify_path,
    update_storage_options,
)

logger = logging.getLogger("fsspec")

# Test for a non protocol-like single word: matches any chain segment that
# contains at least one character outside ``a-z``. Compiled once at import.
_NOT_BARE_PROTOCOL = re.compile(".*[^a-z]+.*")


class OpenFile(object):
    """
    File-like object to be used in a context

    Can layer (buffered) text-mode and compression over any file-system, which
    are typically binary-only.

    These instances are safe to serialize, as the low-level file object
    is not created until invoked using `with`.

    Parameters
    ----------
    fs: FileSystem
        The file system to use for opening the file. Should be a subclass or
        duck-type with ``fsspec.spec.AbstractFileSystem``
    path: str
        Location to open
    mode: str like 'rb', optional
        Mode of the opened file
    compression: str or None, optional
        Compression to apply
    encoding: str or None, optional
        The encoding to use if opened in text mode.
    errors: str or None, optional
        How to handle encoding errors if opened in text mode.
    newline: None or str
        Passed to TextIOWrapper in text mode, how to handle line endings.
    """

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        compression=None,
        encoding=None,
        errors=None,
        newline=None,
    ):
        self.fs = fs
        self.path = path
        self.mode = mode
        # resolves "infer" and validates the codec name
        self.compression = get_compression(path, compression)
        self.encoding = encoding
        self.errors = errors
        self.newline = newline
        # stack of open file-likes, innermost (raw) first; filled by __enter__
        self.fobjects = []

    def __reduce__(self):
        """Serialize by constructor arguments only; open files are not kept."""
        return (
            OpenFile,
            (
                self.fs,
                self.path,
                self.mode,
                self.compression,
                self.encoding,
                self.errors,
                self.newline,
            ),
        )

    def __repr__(self):
        return "<OpenFile '{}'>".format(self.path)

    def __fspath__(self):
        """Open the file and delegate to the opened object's ``__fspath__``."""
        # may raise if cannot be resolved to local file
        return self.open().__fspath__()

    def __enter__(self):
        """Open the file and return the outermost file-like layer.

        The underlying file is always opened in binary mode; a compression
        layer and then a text layer are stacked on top when requested.
        """
        # normalise e.g. "r", "rt", "rb" -> "rb" for the raw file
        mode = self.mode.replace("t", "").replace("b", "") + "b"

        f = self.fs.open(self.path, mode=mode)

        self.fobjects = [f]

        if self.compression is not None:
            compress = compr[self.compression]
            # codecs receive only the first mode character, e.g. "r" or "w"
            f = compress(f, mode=mode[0])
            self.fobjects.append(f)

        if "b" not in self.mode:
            # assume, for example, that 'r' is equivalent to 'rt' as in builtin
            f = io.TextIOWrapper(
                f, encoding=self.encoding, errors=self.errors, newline=self.newline
            )
            self.fobjects.append(f)

        return self.fobjects[-1]

    def __exit__(self, *args):
        self.close()

    def __del__(self):
        # guard: __init__ may have raised before ``fobjects`` was assigned
        if hasattr(self, "fobjects"):
            self.fobjects.clear()  # may cause cleanup of objects and close files

    @property
    def full_name(self):
        """Result of ``_unstrip_protocol(self.path, self.fs)``."""
        return _unstrip_protocol(self.path, self.fs)

    def open(self):
        """Materialise this as a real open file without context

        The file should be explicitly closed to avoid enclosed file
        instances persisting. This code-path monkey-patches the file-like
        objects, so they can close even if the parent OpenFile object has
        already been deleted; but a with-context is better style.
        """
        out = self.__enter__()
        closer = out.close
        # every layer except the outermost one, which ``closer`` handles
        fobjects = self.fobjects[:-1]
        mode = self.mode

        def close():
            # this func has no reference to the parent OpenFile instance, so
            # it keeps working after that instance has been deleted
            closer()  # original close bound method of the final file-like
            _close(fobjects, mode)  # call close on other dependent file-likes

        out.close = close
        return out

    def close(self):
        """Close all encapsulated file objects"""
        _close(self.fobjects, self.mode)


class OpenFiles(list):
    """List of OpenFile instances

    Can be used in a single context, which opens and closes all of the
    contained files. Normal list access to get the elements works as
    normal.

    A special case is made for caching filesystems - the files will
    be down/uploaded together at the start or end of the context, and
    this may happen concurrently, if the target filesystem supports it.
    """

    def __init__(self, *args, mode="rb", fs=None):
        self.mode = mode
        self.fs = fs
        # file objects returned by ``fs.open_many`` (batch code-path only)
        self.files = []
        super().__init__(*args)

    def _batch_fs(self):
        """Return the first filesystem with ``open_many``, else None.

        Starts at ``self.fs`` and follows ``.fs`` attributes (wrapped
        filesystems) until one exposes ``open_many`` or the chain ends.
        """
        fs = self.fs
        while True:
            if hasattr(fs, "open_many"):
                return fs
            if hasattr(fs, "fs") and fs.fs is not None:
                fs = fs.fs
            else:
                return None

    def __enter__(self):
        if self.fs is None:
            raise ValueError("Context has already been used")

        batch_fs = self._batch_fs()
        if batch_fs is not None:
            # check for concurrent cache download; or set up for upload
            self.files = batch_fs.open_many(self)
            return self.files
        return [s.__enter__() for s in self]

    def __exit__(self, *args):
        if "r" not in self.mode:
            batch_fs = self._batch_fs()
            if batch_fs is not None:
                # check for concurrent cache upload
                batch_fs.commit_many(self.files)
                self.files.clear()
                return
        for s in self:
            s.__exit__(*args)

    def __repr__(self):
        return "<List of %s OpenFile instances>" % len(self)


def _close(fobjects, mode):
    """Close file-likes outermost-first, then empty the list in place.

    When not reading, each still-open object is flushed before being closed.
    """
    for f in reversed(fobjects):
        if "r" not in mode and not f.closed:
            f.flush()
        f.close()
    fobjects.clear()


def open_files(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    name_function=None,
    num=1,
    protocol=None,
    newline=None,
    auto_mkdir=True,
    expand=True,
    **kwargs,
):
    """Given a path or paths, return a list of ``OpenFile`` objects.

    For writing, a str path must contain the "*" character, which will be filled
    in by increasing numbers, e.g., "part*" -> "part1", "part2" if num=2.

    For either reading or writing, can instead provide explicit list of paths.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to read from alternative filesystems. To read from multiple files you
        can pass a globstring or a list of paths, with the caveat that they
        must all have the same protocol.
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    name_function: function or None
        if opening a set of files for writing, those files do not yet exist,
        so we need to generate their names by formatting the urlpath for
        each sequence number
    num: int [1]
        if writing mode, number of files we expect to create (passed to
        name_function)
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: str or None
        Passed to TextIOWrapper in text mode, how to handle line endings.
    auto_mkdir: bool (True)
        If not in read mode, this will ensure the parent directory of every
        target path exists before writing, by calling
        ``fs.makedirs(parent, exist_ok=True)``.
    expand: bool
        Passed to ``get_fs_token_paths``: whether, in write mode, a single
        string path is expanded into ``num`` generated names.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> files = open_files('2015-*-*.csv')  # doctest: +SKIP
    >>> files = open_files(
    ...     's3://bucket/2015-*-*.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP

    Returns
    -------
    An ``OpenFiles`` instance, which is a list of ``OpenFile`` objects that can
    be used as a single context
    """
    fs, fs_token, paths = get_fs_token_paths(
        urlpath,
        mode,
        num=num,
        name_function=name_function,
        storage_options=kwargs,
        protocol=protocol,
        expand=expand,
    )
    if "r" not in mode and auto_mkdir:
        # de-duplicate so each parent directory is created only once
        parents = {fs._parent(path) for path in paths}
        for parent in parents:
            fs.makedirs(parent, exist_ok=True)
    return OpenFiles(
        [
            OpenFile(
                fs,
                path,
                mode=mode,
                compression=compression,
                encoding=encoding,
                errors=errors,
                newline=newline,
            )
            for path in paths
        ],
        mode=mode,
        fs=fs,
    )


def _un_chain(path, kwargs):
    """Split a ``::``-chained URL into its per-filesystem links.

    Parameters
    ----------
    path: str or list/tuple of str
        URL(s), possibly chained like ``"simplecache::s3://bucket/key"``.
    kwargs: dict
        Storage options; options for one link are looked up under that
        link's protocol name.

    Returns
    -------
    list of ``(url, protocol, kwargs)`` entries, outermost link first. Empty
    if the URL is not chained. For a list of paths, the per-path results are
    merged position by position, and all paths must agree on the protocol at
    each position.
    """
    if isinstance(path, (tuple, list)):
        bits = [_un_chain(p, kwargs) for p in path]
        out = []
        for pbit in zip(*bits):
            paths, protocols, link_kwargs = zip(*pbit)
            if len(set(protocols)) > 1:
                raise ValueError("Protocol mismatch in URL chain")
            if len(set(paths)) == 1:
                # same URL at this level for every input path: collapse it
                paths = paths[0]
            else:
                paths = list(paths)
            out.append([paths, protocols[0], link_kwargs[0]])
        return out
    bits = (
        [
            # a bare lowercase word is a protocol name: make it URL-like
            p if "://" in p or _NOT_BARE_PROTOCOL.match(p) else p + "://"
            for p in path.split("::")
        ]
        if "::" in path
        else [path]
    )
    if len(bits) < 2:
        return []
    # [[url, protocol, kwargs], ...]
    out = []
    previous_bit = None
    for bit in reversed(bits):
        protocol = split_protocol(bit)[0] or "file"
        cls = get_filesystem_class(protocol)
        extra_kwargs = cls._get_kwargs_from_urls(bit)
        kws = kwargs.get(protocol, {})
        kw = dict(**extra_kwargs, **kws)
        bit = cls._strip_protocol(bit)
        if (
            protocol in {"blockcache", "filecache", "simplecache"}
            and "target_protocol" not in kw
        ):
            # these links reuse the path of the link they wrap
            bit = previous_bit
        out.append((bit, protocol, kw))
        previous_bit = bit
    out = list(reversed(out))
    return out


def _chain_to_fs_kwargs(chain):
    """Fold an ``_un_chain`` result into kwargs for the outermost filesystem.

    Iterates from the innermost link outwards: each inner link is nested
    under ``target_options``/``target_protocol``/``fo`` of the link that
    wraps it, and the outermost link's own kwargs are merged in at top level.
    """
    inkwargs = {}
    last = len(chain) - 1
    # Reverse iterate the chain, creating a nested target_* structure
    for i, (urls, protocol, kw) in enumerate(reversed(chain)):
        if i == last:
            inkwargs = dict(**kw, **inkwargs)
            continue
        inkwargs["target_options"] = dict(**kw, **inkwargs)
        inkwargs["target_protocol"] = protocol
        inkwargs["fo"] = urls
    return inkwargs


def url_to_fs(url, **kwargs):
    """
    Turn fully-qualified and potentially chained URL into filesystem instance

    Parameters
    ----------
    url : str
        The fsspec-compatible URL
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Returns
    -------
    filesystem : FileSystem
        The new filesystem discovered from ``url`` and created with
        ``**kwargs``.
    urlpath : str
        The file-systems-specific URL for ``url``.
    """
    chain = _un_chain(url, kwargs)
    if len(chain) > 1:
        inkwargs = _chain_to_fs_kwargs(chain)
        urlpath, protocol, _ = chain[0]
        fs = filesystem(protocol, **inkwargs)
    else:
        protocol = split_protocol(url)[0]
        cls = get_filesystem_class(protocol)

        options = cls._get_kwargs_from_urls(url)
        update_storage_options(options, kwargs)
        fs = cls(**options)
        urlpath = fs._strip_protocol(url)
    return fs, urlpath


def open(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    protocol=None,
    newline=None,
    **kwargs,
):
    """Given a path, return one ``OpenFile`` object.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath. Prefix with a protocol like ``s3://``
        to read from alternative filesystems. Should not include glob
        character(s).
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: str or None
        Passed to TextIOWrapper in text mode, how to handle line endings.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> openfile = open('2015-01-01.csv')  # doctest: +SKIP
    >>> openfile = open(
    ...     's3://bucket/2015-01-01.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP
    >>> with openfile as f:
    ...     df = pd.read_csv(f)  # doctest: +SKIP
    ...

    Returns
    -------
    ``OpenFile`` object.
    """
    # wrap in a list and take the single resulting OpenFile
    return open_files(
        urlpath=[urlpath],
        mode=mode,
        compression=compression,
        encoding=encoding,
        errors=errors,
        protocol=protocol,
        newline=newline,
        expand=False,
        **kwargs,
    )[0]


def open_local(url, mode="rb", **storage_options):
    """Open file(s) which can be resolved to local

    For files which either are local, or get downloaded upon open
    (e.g., by file caching)

    Parameters
    ----------
    url: str or list(str)
    mode: str
        Must be read mode
    storage_options:
        passed on to FS for or used by open_files (e.g., compression)

    Returns
    -------
    A single path if ``url`` is a string without glob characters, otherwise
    a list of paths (empty if nothing matched).

    Raises
    ------
    ValueError
        If ``mode`` is not a read mode, or the filesystem does not have
        ``local_file=True``.
    """
    if "r" not in mode:
        raise ValueError("Can only ensure local files when reading")
    of = open_files(url, mode=mode, **storage_options)
    # ``of.fs`` is the filesystem shared by every contained OpenFile; unlike
    # ``of[0].fs`` it is also available when a glob matched no files.
    if not getattr(of.fs, "local_file", False):
        raise ValueError(
            "open_local can only be used on a filesystem which"
            " has attribute local_file=True"
        )
    with of as files:
        paths = [f.name for f in files]
    if isinstance(url, str) and not has_magic(url):
        return paths[0]
    return paths


def get_compression(urlpath, compression):
    """Resolve and validate a compression name.

    ``"infer"`` is replaced by the result of ``infer_compression(urlpath)``.

    Raises
    ------
    ValueError
        If the resulting name is not None and not a key of ``compr``.
    """
    if compression == "infer":
        compression = infer_compression(urlpath)
    if compression is not None and compression not in compr:
        raise ValueError("Compression type %s not supported" % compression)
    return compression


def split_protocol(urlpath):
    """Return protocol, path pair"""
    urlpath = stringify_path(urlpath)
    if "://" in urlpath:
        protocol, path = urlpath.split("://", 1)
        if len(protocol) > 1:
            # excludes Windows paths
            return protocol, path
    return None, urlpath


def strip_protocol(urlpath):
    """Return only path part of full URL, according to appropriate backend"""
    protocol, _ = split_protocol(urlpath)
    cls = get_filesystem_class(protocol)
    return cls._strip_protocol(urlpath)


def expand_paths_if_needed(paths, mode, num, fs, name_function):
    """Expand paths if they have a ``*`` in them.

    Parameters
    ----------
    paths: list of paths
    mode: str
        Mode in which to open files.
    num: int
        If opening in writing mode, number of files we expect to create.
    fs: filesystem object
    name_function: callable
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.

    Returns
    -------
    list of paths

    Raises
    ------
    ValueError
        In writing mode, if more than one path contains ``*``.
    """
    expanded_paths = []
    paths = list(paths)
    writing = "w" in mode
    if writing:
        if sum("*" in p for p in paths) > 1:
            raise ValueError(
                "When writing data, only one filename mask can be specified."
            )
        num = max(num, len(paths))
    for curr_path in paths:
        if "*" in curr_path:
            if writing:
                # expand using name_function
                expanded_paths.extend(_expand_paths(curr_path, name_function, num))
            else:
                # expand using glob
                expanded_paths.extend(fs.glob(curr_path))
        else:
            expanded_paths.append(curr_path)
    # if we generated more paths that asked for, trim the list
    if writing and len(expanded_paths) > num:
        expanded_paths = expanded_paths[:num]
    return expanded_paths


def get_fs_token_paths(
    urlpath,
    mode="rb",
    num=1,
    name_function=None,
    storage_options=None,
    protocol=None,
    expand=True,
):
    """Filesystem, deterministic token, and paths from a urlpath and options.

    Parameters
    ----------
    urlpath: string or iterable
        Absolute or relative filepath, URL (may include protocols like
        ``s3://``), or globstring pointing to data.
    mode: str, optional
        Mode in which to open files.
    num: int, optional
        If opening in writing mode, number of files we expect to create.
    name_function: callable, optional
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.
    storage_options: dict, optional
        Additional keywords to pass to the filesystem class.
    protocol: str or None
        To override the protocol specifier in the URL
    expand: bool
        Expand string paths for writing, assuming the path is a directory

    Returns
    -------
    ``(fs, fs._fs_token, paths)`` where ``paths`` is a list of str.

    Raises
    ------
    ValueError
        If ``urlpath`` is an empty sequence, or a sequence whose members do
        not share one protocol and one set of URL-derived options.
    """
    if isinstance(urlpath, (list, tuple, set)):
        if not urlpath:
            raise ValueError("empty urlpath sequence")
        urlpath = [stringify_path(u) for u in urlpath]
    else:
        urlpath = stringify_path(urlpath)
    chain = _un_chain(urlpath, storage_options or {})
    if len(chain) > 1:
        inkwargs = _chain_to_fs_kwargs(chain)
        paths, protocol, _ = chain[0]
        fs = filesystem(protocol, **inkwargs)
        if isinstance(paths, (list, tuple, set)):
            paths = [fs._strip_protocol(u) for u in paths]
        else:
            paths = fs._strip_protocol(paths)
    elif isinstance(urlpath, (list, tuple, set)):
        protocols = [split_protocol(u)[0] for u in urlpath]
        if protocol is None:
            protocol = protocols[0]
        if not all(p == protocol for p in protocols):
            raise ValueError(
                "When specifying a list of paths, all paths must "
                "share the same protocol"
            )
        cls = get_filesystem_class(protocol)
        optionss = list(map(cls._get_kwargs_from_urls, urlpath))
        paths = [cls._strip_protocol(u) for u in urlpath]
        options = optionss[0]
        if not all(o == options for o in optionss):
            raise ValueError(
                "When specifying a list of paths, all paths must "
                "share the same file-system options"
            )
        update_storage_options(options, storage_options)
        fs = cls(**options)
    else:
        # an explicit ``protocol`` argument wins over the one in the URL
        protocol = protocol or split_protocol(urlpath)[0]
        cls = get_filesystem_class(protocol)
        options = cls._get_kwargs_from_urls(urlpath)
        paths = cls._strip_protocol(urlpath)
        update_storage_options(options, storage_options)
        fs = cls(**options)

    if isinstance(paths, (list, tuple, set)):
        paths = expand_paths_if_needed(paths, mode, num, fs, name_function)
    elif "w" in mode and expand:
        paths = _expand_paths(paths, name_function, num)
    elif "*" in paths:
        # glob for reading; directories are dropped from the matches
        paths = [f for f in sorted(fs.glob(paths)) if not fs.isdir(f)]
    else:
        paths = [paths]

    return fs, fs._fs_token, paths


def _expand_paths(path, name_function, num):
    """Generate ``num`` output paths from a template or explicit list.

    Parameters
    ----------
    path: str or list/tuple of str
        A string with exactly one ``*`` is used as a template. A string
        without ``*`` is treated as a directory and ``*.part`` is appended.
        A list/tuple is returned as a list and must contain ``num`` items.
    name_function: callable or None
        Maps a partition index to the string replacing ``*``. If None,
        ``build_name_function(num - 1)`` is used.
    num: int
        Number of paths to produce.

    Raises
    ------
    ValueError
        If a string path has more than one ``*``, or ``path`` is neither a
        string nor a list/tuple.
    """
    if isinstance(path, str):
        if path.count("*") > 1:
            raise ValueError("Output path spec must contain exactly one '*'.")
        elif "*" not in path:
            path = os.path.join(path, "*.part")

        if name_function is None:
            name_function = build_name_function(num - 1)

        paths = [path.replace("*", name_function(i)) for i in range(num)]
        if paths != sorted(paths):
            logger.warning(
                "In order to preserve order between partitions"
                " paths created with ``name_function`` should "
                "sort to partition order"
            )
    elif isinstance(path, (tuple, list)):
        # NOTE: kept as an assert so the exception type is unchanged; be
        # aware that this check is skipped when Python runs with -O.
        assert len(path) == num
        paths = list(path)
    else:
        raise ValueError(
            "Path should be either\n"
            "1. A list of paths: ['foo.json', 'bar.json', ...]\n"
            "2. A directory: 'foo/'\n"
            "3. A path with a '*' in it: 'foo.*.json'"
        )
    return paths