# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: language_level = 3

from cpython.datetime cimport datetime, PyDateTime_DateTime

from pyarrow.compat import frombytes, tobytes
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport PyDateTime_to_TimePoint
from pyarrow.lib import _detect_compression
from pyarrow.lib cimport *
from pyarrow.util import _stringify_path

from datetime import timezone
import pathlib


cdef inline c_string _path_as_bytes(path) except *:
    # Convert a Python path (bytes or str) into a C++ string.
    # Raises TypeError for any other type.
    #
    # handle only abstract paths, not bound to any filesystem like pathlib is,
    # so we only accept plain strings
    if not isinstance(path, (bytes, str)):
        raise TypeError('Path must be a string')
    # tobytes always uses utf-8, which is more or less ok, at least on Windows
    # since the C++ side then decodes from utf-8. On Unix, os.fsencode may be
    # better.
    return tobytes(path)


def _normalize_path(FileSystem filesystem not None, path):
    """
    Normalize path for the given filesystem.

    The default implementation of this method is a no-op, but subclasses
    may allow normalizing irregular path forms (such as Windows local paths).

    Parameters
    ----------
    filesystem : FileSystem
        The filesystem whose NormalizePath implementation is used.
        Must not be None.
    path : str or bytes
        The path to normalize.

    Returns
    -------
    normalized : str
    """
    cdef c_string c_path = _path_as_bytes(path)
    cdef c_string c_path_normalized

    c_path_normalized = GetResultValue(filesystem.fs.NormalizePath(c_path))
    return frombytes(c_path_normalized)


cdef class FileInfo:
    """
    FileSystem entry info.
    """

    def __init__(self):
        raise TypeError("FileInfo cannot be instantiated directly, use "
                        "FileSystem.get_file_info method instead.")

    @staticmethod
    cdef wrap(CFileInfo info):
        # Build a FileInfo around a C++ CFileInfo, bypassing __init__
        # (which always raises).
        cdef FileInfo self = FileInfo.__new__(FileInfo)
        self.info = info
        return self

    cdef inline CFileInfo unwrap(self) nogil:
        # Return the wrapped C++ CFileInfo.
        return self.info

    def __repr__(self):
        s = '<FileInfo for {!r}: type={}'.format(self.path, str(self.type))
        # The size is only reported for regular files.
        if self.is_file:
            s += ', size={}'.format(self.size)
        s += '>'
        return s

    @property
    def type(self):
        """
        Type of the file.

        The returned enum values can be the following:

        - FileType.NotFound: target does not exist
        - FileType.Unknown: target exists but its type is unknown (could be a
          special file such as a Unix socket or character device, or
          Windows NUL / CON / ...)
        - FileType.File: target is a regular file
        - FileType.Directory: target is a regular directory

        Returns
        -------
        type : FileType
        """
        return FileType(<int8_t> self.info.type())

    @property
    def is_file(self):
        """
        Whether the entry is a regular file (its type is FileType.File).
        """
        return self.type == FileType.File

    @property
    def path(self):
        """
        The full file path in the filesystem.
        """
        return frombytes(self.info.path())

    @property
    def base_name(self):
        """
        The file base name.

        Component after the last directory separator.
        """
        return frombytes(self.info.base_name())

    @property
    def size(self):
        """
        The size in bytes, if available.

        Only regular files are guaranteed to have a size.
        None is returned for any entry that is not a regular file.
        """
        return self.info.size() if self.is_file else None

    @property
    def extension(self):
        """
        The file extension.
        """
        return frombytes(self.info.extension())

    @property
    def mtime(self):
        """
        The time of last modification, if available.

        The value is computed from a floating-point number of seconds;
        use `mtime_ns` to get the integer nanosecond value instead.

        Returns
        -------
        mtime : datetime.datetime
            Timezone-aware datetime expressed in UTC.
        """
        cdef int64_t nanoseconds
        nanoseconds = TimePoint_to_ns(self.info.mtime())
        return datetime.fromtimestamp(nanoseconds / 1.0e9, timezone.utc)

    @property
    def mtime_ns(self):
        """
        The time of last modification, if available, expressed in nanoseconds
        since the Unix epoch.

        Returns
        -------
        mtime_ns : int
        """
        return TimePoint_to_ns(self.info.mtime())


cdef class FileSelector:
    """
    File and directory selector.

    It contains a set of options that describes how to search for files and
    directories.

    Parameters
    ----------
    base_dir : str
        The directory in which to select files. Relative paths also work, use
        '.' for the current directory and '..' for the parent.
    allow_not_found : bool, default False
        The behavior if `base_dir` doesn't exist in the filesystem.
        If false, an error is returned.
        If true, an empty selection is returned.
    recursive : bool, default False
        Whether to recurse into subdirectories.
    """

    def __init__(self, base_dir, bint allow_not_found=False,
                 bint recursive=False):
        # Go through the property setters so that base_dir is validated
        # and encoded by _path_as_bytes.
        self.base_dir = base_dir
        self.recursive = recursive
        self.allow_not_found = allow_not_found

    cdef inline CFileSelector unwrap(self) nogil:
        # Return the wrapped C++ CFileSelector.
        return self.selector

    @property
    def base_dir(self):
        """
        The directory in which to select files, as a str.
        """
        return frombytes(self.selector.base_dir)

    @base_dir.setter
    def base_dir(self, base_dir):
        self.selector.base_dir = _path_as_bytes(base_dir)

    @property
    def allow_not_found(self):
        """
        Whether a non-existent `base_dir` yields an empty selection
        instead of an error.
        """
        return self.selector.allow_not_found

    @allow_not_found.setter
    def allow_not_found(self, bint allow_not_found):
        self.selector.allow_not_found = allow_not_found

    @property
    def recursive(self):
        """
        Whether to recurse into subdirectories.
        """
        return self.selector.recursive

    @recursive.setter
    def recursive(self, bint recursive):
        self.selector.recursive = recursive

    def __repr__(self):
        return ("<FileSelector base_dir={0.base_dir!r} "
                "recursive={0.recursive}>".format(self))


cdef class FileSystem:
    """
    Abstract file system API.
    """

    def __init__(self):
        raise TypeError("FileSystem is an abstract class, instantiate one of "
                        "the subclasses instead: LocalFileSystem or "
                        "SubTreeFileSystem")

    @staticmethod
    def from_uri(uri):
        """
        Create a new FileSystem from URI or Path.

        Recognized URI schemes are "file", "mock", "s3fs", "hdfs" and "viewfs".
        In addition, the argument can be a pathlib.Path object, or a string
        describing an absolute local path.

        Parameters
        ----------
        uri : str or pathlib.Path
            URI-based path, for example: file:///some/local/path.

        Returns
        -------
        tuple of (FileSystem, str)
            The (filesystem, path) tuple where path is the abstract path
            inside the FileSystem instance.
        """
        cdef:
            c_string path
            CResult[shared_ptr[CFileSystem]] result

        if isinstance(uri, pathlib.Path):
            # Make absolute
            uri = uri.resolve().absolute()
        uri = _stringify_path(uri)
        result = CFileSystemFromUriOrPath(tobytes(uri), &path)
        return FileSystem.wrap(GetResultValue(result)), frombytes(path)

    cdef init(self, const shared_ptr[CFileSystem]& wrapped):
        # Keep the shared_ptr alive and cache the raw pointer used by the
        # methods below.
        self.wrapped = wrapped
        self.fs = wrapped.get()

    @staticmethod
    cdef wrap(const shared_ptr[CFileSystem]& sp):
        # Wrap a C++ filesystem into the Python subclass matching its
        # type_name(). Raises TypeError for an unrecognized type name.
        cdef FileSystem self

        typ = frombytes(sp.get().type_name())
        if typ == 'local':
            self = LocalFileSystem.__new__(LocalFileSystem)
        elif typ == 'mock':
            self = _MockFileSystem.__new__(_MockFileSystem)
        elif typ == 'subtree':
            self = SubTreeFileSystem.__new__(SubTreeFileSystem)
        elif typ == 's3':
            # Imported on demand, only when such a filesystem is wrapped.
            from pyarrow._s3fs import S3FileSystem
            self = S3FileSystem.__new__(S3FileSystem)
        elif typ == 'hdfs':
            # Imported on demand, only when such a filesystem is wrapped.
            from pyarrow._hdfs import HadoopFileSystem
            self = HadoopFileSystem.__new__(HadoopFileSystem)
        else:
            raise TypeError('Cannot wrap FileSystem pointer')

        self.init(sp)
        return self

    cdef inline shared_ptr[CFileSystem] unwrap(self) nogil:
        # Return the shared_ptr to the wrapped C++ filesystem.
        return self.wrapped

    def equals(self, FileSystem other not None):
        """
        Check equality with another filesystem, as determined by the
        wrapped C++ filesystem's Equals().

        Parameters
        ----------
        other : FileSystem
            The filesystem to compare with. Must not be None.

        Returns
        -------
        are_equal : bool

        Raises
        ------
        TypeError
            If `other` is None or not a FileSystem.
        """
        return self.fs.Equals(other.unwrap())

    def __eq__(self, other):
        # equals() rejects None and non-FileSystem arguments with a
        # TypeError; let Python fall back to its default comparison then.
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def get_file_info(self, paths_or_selector):
        """
        Get info for the given files.

        Any symlink is automatically dereferenced, recursively. A non-existing
        or unreachable file returns a FileInfo object and has a FileType of
        value NotFound. An exception indicates a truly exceptional condition
        (low-level I/O error, etc.).

        Parameters
        ----------
        paths_or_selector: FileSelector or list of path-likes
            Either a selector object or a list of path-like objects.
            The selector's base directory will not be part of the results, even
            if it exists. If it doesn't exist, use `allow_not_found`.

        Returns
        -------
        file_infos : list of FileInfo

        Raises
        ------
        TypeError
            If `paths_or_selector` is neither a FileSelector nor a list or
            tuple, or if one of the paths is not a str or bytes.
        """
        cdef:
            vector[CFileInfo] infos
            vector[c_string] paths
            CFileSelector selector

        if isinstance(paths_or_selector, FileSelector):
            with nogil:
                selector = (<FileSelector>paths_or_selector).selector
                infos = GetResultValue(self.fs.GetFileInfo(selector))
        elif isinstance(paths_or_selector, (list, tuple)):
            # Encode the paths while holding the GIL, then query without it.
            paths = [_path_as_bytes(s) for s in paths_or_selector]
            with nogil:
                infos = GetResultValue(self.fs.GetFileInfo(paths))
        else:
            raise TypeError('Must pass either paths or a FileSelector')

        return [FileInfo.wrap(info) for info in infos]

    def create_dir(self, path, *, bint recursive=True):
        """
        Create a directory and subdirectories.

        This function succeeds if the directory already exists.

        Parameters
        ----------
        path : str
            The path of the new directory.
        recursive: bool, default True
            Create nested directories as well.
        """
        cdef c_string directory = _path_as_bytes(path)
        with nogil:
            check_status(self.fs.CreateDir(directory, recursive=recursive))

    def delete_dir(self, path):
        """Delete a directory and its contents, recursively.

        Parameters
        ----------
        path : str
            The path of the directory to be deleted.
        """
        cdef c_string directory = _path_as_bytes(path)
        with nogil:
            check_status(self.fs.DeleteDir(directory))

    def move(self, src, dest):
        """
        Move / rename a file or directory.

        If the destination exists:
        - if it is a non-empty directory, an error is returned
        - otherwise, if it has the same type as the source, it is replaced
        - otherwise, behavior is unspecified (implementation-dependent).

        Parameters
        ----------
        src : str
            The path of the file or the directory to be moved.
        dest : str
            The destination path where the file or directory is moved to.
        """
        cdef:
            c_string source = _path_as_bytes(src)
            c_string destination = _path_as_bytes(dest)
        with nogil:
            check_status(self.fs.Move(source, destination))

    def copy_file(self, src, dest):
        """
        Copy a file.

        If the destination exists and is a directory, an error is returned.
        Otherwise, it is replaced.

        Parameters
        ----------
        src : str
            The path of the file to be copied from.
        dest : str
            The destination path where the file is copied to.
        """
        cdef:
            c_string source = _path_as_bytes(src)
            c_string destination = _path_as_bytes(dest)
        with nogil:
            check_status(self.fs.CopyFile(source, destination))

    def delete_file(self, path):
        """
        Delete a file.

        Parameters
        ----------
        path : str
            The path of the file to be deleted.
        """
        cdef c_string c_path = _path_as_bytes(path)
        with nogil:
            check_status(self.fs.DeleteFile(c_path))

    def _wrap_input_stream(self, stream, path, compression, buffer_size):
        """
        Optionally wrap `stream` for buffered and/or decompressed reading.

        A BufferedInputStream is applied first when `buffer_size` is neither
        None nor 0. A CompressedInputStream is then applied unless the
        compression resolves to None; 'detect' is resolved from `path` with
        _detect_compression.
        """
        if buffer_size is not None and buffer_size != 0:
            stream = BufferedInputStream(stream, buffer_size)
        if compression == 'detect':
            compression = _detect_compression(path)
        if compression is not None:
            stream = CompressedInputStream(stream, compression)
        return stream

    def _wrap_output_stream(self, stream, path, compression, buffer_size):
        """
        Optionally wrap `stream` for buffered and/or compressed writing.

        A BufferedOutputStream is applied first when `buffer_size` is neither
        None nor 0. A CompressedOutputStream is then applied unless the
        compression resolves to None; 'detect' is resolved from `path` with
        _detect_compression.
        """
        if buffer_size is not None and buffer_size != 0:
            stream = BufferedOutputStream(stream, buffer_size)
        if compression == 'detect':
            compression = _detect_compression(path)
        if compression is not None:
            stream = CompressedOutputStream(stream, compression)
        return stream

    def open_input_file(self, path):
        """
        Open an input file for random access reading.

        Parameters
        ----------
        path : str
            The source to open for reading.

        Returns
        -------
        stream : NativeFile
        """
        cdef:
            c_string pathstr = _path_as_bytes(path)
            NativeFile stream = NativeFile()
            shared_ptr[CRandomAccessFile] in_handle

        with nogil:
            in_handle = GetResultValue(self.fs.OpenInputFile(pathstr))

        stream.set_random_access_file(in_handle)
        stream.is_readable = True
        return stream

    def open_input_stream(self, path, compression='detect', buffer_size=None):
        """
        Open an input stream for sequential reading.

        Parameters
        ----------
        path : str
            The source to open for reading.
        compression: str optional, default 'detect'
            The compression algorithm to use for on-the-fly decompression.
            If "detect" and source is a file path, then compression will be
            chosen based on the file extension.
            If None, no compression will be applied. Otherwise, a well-known
            algorithm name must be supplied (e.g. "gzip").
        buffer_size: int optional, default None
            If None or 0, no buffering will happen. Otherwise the size of the
            temporary read buffer.

        Returns
        -------
        stream : NativeFile
        """
        cdef:
            c_string pathstr = _path_as_bytes(path)
            NativeFile stream = NativeFile()
            shared_ptr[CInputStream] in_handle

        with nogil:
            in_handle = GetResultValue(self.fs.OpenInputStream(pathstr))

        stream.set_input_stream(in_handle)
        stream.is_readable = True

        return self._wrap_input_stream(
            stream, path=path, compression=compression, buffer_size=buffer_size
        )

    def open_output_stream(self, path, compression='detect', buffer_size=None):
        """
        Open an output stream for sequential writing.

        If the target already exists, existing data is truncated.

        Parameters
        ----------
        path : str
            The source to open for writing.
        compression: str optional, default 'detect'
            The compression algorithm to use for on-the-fly compression.
            If "detect" and source is a file path, then compression will be
            chosen based on the file extension.
            If None, no compression will be applied. Otherwise, a well-known
            algorithm name must be supplied (e.g. "gzip").
        buffer_size: int optional, default None
            If None or 0, no buffering will happen. Otherwise the size of the
            temporary write buffer.

        Returns
        -------
        stream : NativeFile
        """
        cdef:
            c_string pathstr = _path_as_bytes(path)
            NativeFile stream = NativeFile()
            shared_ptr[COutputStream] out_handle

        with nogil:
            out_handle = GetResultValue(self.fs.OpenOutputStream(pathstr))

        stream.set_output_stream(out_handle)
        stream.is_writable = True

        return self._wrap_output_stream(
            stream, path=path, compression=compression, buffer_size=buffer_size
        )

    def open_append_stream(self, path, compression='detect', buffer_size=None):
        """
        Open an output stream for appending.

        If the target doesn't exist, a new empty file is created.

        Parameters
        ----------
        path : str
            The source to open for writing.
        compression: str optional, default 'detect'
            The compression algorithm to use for on-the-fly compression.
            If "detect" and source is a file path, then compression will be
            chosen based on the file extension.
            If None, no compression will be applied. Otherwise, a well-known
            algorithm name must be supplied (e.g. "gzip").
        buffer_size: int optional, default None
            If None or 0, no buffering will happen. Otherwise the size of the
            temporary write buffer.

        Returns
        -------
        stream : NativeFile
        """
        cdef:
            c_string pathstr = _path_as_bytes(path)
            NativeFile stream = NativeFile()
            shared_ptr[COutputStream] out_handle

        with nogil:
            out_handle = GetResultValue(self.fs.OpenAppendStream(pathstr))

        stream.set_output_stream(out_handle)
        stream.is_writable = True

        return self._wrap_output_stream(
            stream, path=path, compression=compression, buffer_size=buffer_size
        )


cdef class LocalFileSystem(FileSystem):
    """
    A FileSystem implementation accessing files on the local machine.

    Details such as symlinks are abstracted away (symlinks are always followed,
    except when deleting an entry).

    Parameters
    ----------
    use_mmap: bool, default False
        Whether open_input_stream and open_input_file should return
        a mmap'ed file or a regular file.
    """

    def __init__(self, use_mmap=False):
        cdef:
            CLocalFileSystemOptions opts
            shared_ptr[CLocalFileSystem] fs

        opts = CLocalFileSystemOptions.Defaults()
        opts.use_mmap = use_mmap

        fs = make_shared[CLocalFileSystem](opts)
        self.init(<shared_ptr[CFileSystem]> fs)

    cdef init(self, const shared_ptr[CFileSystem]& c_fs):
        # Also cache the pointer downcast to the concrete C++ type.
        FileSystem.init(self, c_fs)
        self.localfs = <CLocalFileSystem*> c_fs.get()

    def __reduce__(self):
        # Pickle as a call to the constructor with the current use_mmap
        # option.
        cdef CLocalFileSystemOptions opts = self.localfs.options()
        return LocalFileSystem, (opts.use_mmap,)


cdef class SubTreeFileSystem(FileSystem):
    """
    Delegates to another implementation after prepending a fixed base path.

    This is useful to expose a logical view of a subtree of a filesystem,
    for example a directory in a LocalFileSystem.

    Note, that this makes no security guarantee. For example, symlinks may
    allow to "escape" the subtree and access other parts of the underlying
    filesystem.

    Parameters
    ----------
    base_path: str
        The root of the subtree.
    base_fs: FileSystem
        FileSystem object the operations are delegated to. Must not be None.
    """

    def __init__(self, base_path, FileSystem base_fs not None):
        cdef:
            c_string pathstr
            shared_ptr[CSubTreeFileSystem] wrapped

        pathstr = _path_as_bytes(base_path)
        wrapped = make_shared[CSubTreeFileSystem](pathstr, base_fs.wrapped)

        self.init(<shared_ptr[CFileSystem]> wrapped)

    cdef init(self, const shared_ptr[CFileSystem]& wrapped):
        # Also cache the pointer downcast to the concrete C++ type.
        FileSystem.init(self, wrapped)
        self.subtreefs = <CSubTreeFileSystem*> wrapped.get()

    def __reduce__(self):
        # Pickle as a call to the constructor with the base path and the
        # (re-wrapped) base filesystem.
        return SubTreeFileSystem, (
            frombytes(self.subtreefs.base_path()),
            FileSystem.wrap(self.subtreefs.base_fs())
        )

    @property
    def base_path(self):
        """
        The root of the subtree, as a str.
        """
        return frombytes(self.subtreefs.base_path())

    @property
    def base_fs(self):
        """
        The FileSystem the operations are delegated to.
        """
        return FileSystem.wrap(self.subtreefs.base_fs())


cdef class _MockFileSystem(FileSystem):
    # Wrapper around the C++ CMockFileSystem.
    # NOTE(review): presumably intended for tests only -- confirm.

    def __init__(self, datetime current_time=None):
        # current_time is handed to the C++ constructor; it defaults to
        # datetime.now() when omitted.
        cdef shared_ptr[CMockFileSystem] wrapped

        current_time = current_time or datetime.now()
        wrapped = make_shared[CMockFileSystem](
            PyDateTime_to_TimePoint(<PyDateTime_DateTime*> current_time)
        )

        self.init(<shared_ptr[CFileSystem]> wrapped)

    cdef init(self, const shared_ptr[CFileSystem]& wrapped):
        # Also cache the pointer downcast to the concrete C++ type.
        FileSystem.init(self, wrapped)
        self.mockfs = <CMockFileSystem*> wrapped.get()