1# Licensed under a 3-clause BSD style license - see PYFITS.rst 2 3import gzip 4import itertools 5import os 6import re 7import shutil 8import sys 9import warnings 10 11import numpy as np 12 13from . import compressed 14from .base import _BaseHDU, _ValidHDU, _NonstandardHDU, ExtensionHDU 15from .groups import GroupsHDU 16from .image import PrimaryHDU, ImageHDU 17from astropy.io.fits.file import _File, FILE_MODES 18from astropy.io.fits.header import _pad_length 19from astropy.io.fits.util import (_free_space_check, _get_array_mmap, _is_int, 20 _tmp_name, fileobj_closed, fileobj_mode, 21 ignore_sigint, isfile) 22from astropy.io.fits.verify import _Verify, _ErrList, VerifyError, VerifyWarning 23from astropy.utils import indent 24from astropy.utils.exceptions import AstropyUserWarning 25from astropy.utils.decorators import deprecated_renamed_argument 26 27# NOTE: Python can be built without bz2. 28from astropy.utils.compat.optional_deps import HAS_BZ2 29if HAS_BZ2: 30 import bz2 31 32__all__ = ["HDUList", "fitsopen"] 33 34# FITS file signature as per RFC 4047 35FITS_SIGNATURE = b'SIMPLE = T' 36 37 38def fitsopen(name, mode='readonly', memmap=None, save_backup=False, 39 cache=True, lazy_load_hdus=None, ignore_missing_simple=False, 40 **kwargs): 41 """Factory function to open a FITS file and return an `HDUList` object. 42 43 Parameters 44 ---------- 45 name : str, file-like or `pathlib.Path` 46 File to be opened. 47 48 mode : str, optional 49 Open mode, 'readonly', 'update', 'append', 'denywrite', or 50 'ostream'. Default is 'readonly'. 51 52 If ``name`` is a file object that is already opened, ``mode`` must 53 match the mode the file was opened with, readonly (rb), update (rb+), 54 append (ab+), ostream (w), denywrite (rb)). 55 56 memmap : bool, optional 57 Is memory mapping to be used? This value is obtained from the 58 configuration item ``astropy.io.fits.Conf.use_memmap``. 59 Default is `True`. 60 61 save_backup : bool, optional 62 If the file was opened in update or append mode, this ensures that 63 a backup of the original file is saved before any changes are flushed. 64 The backup has the same name as the original file with ".bak" appended. 65 If "file.bak" already exists then "file.bak.1" is used, and so on. 66 Default is `False`. 67 68 cache : bool, optional 69 If the file name is a URL, `~astropy.utils.data.download_file` is used 70 to open the file. This specifies whether or not to save the file 71 locally in Astropy's download cache. Default is `True`. 72 73 lazy_load_hdus : bool, optional 74 To avoid reading all the HDUs and headers in a FITS file immediately 75 upon opening. This is an optimization especially useful for large 76 files, as FITS has no way of determining the number and offsets of all 77 the HDUs in a file without scanning through the file and reading all 78 the headers. Default is `True`. 79 80 To disable lazy loading and read all HDUs immediately (the old 81 behavior) use ``lazy_load_hdus=False``. This can lead to fewer 82 surprises--for example with lazy loading enabled, ``len(hdul)`` 83 can be slow, as it means the entire FITS file needs to be read in 84 order to determine the number of HDUs. ``lazy_load_hdus=False`` 85 ensures that all HDUs have already been loaded after the file has 86 been opened. 87 88 .. versionadded:: 1.3 89 90 uint : bool, optional 91 Interpret signed integer data where ``BZERO`` is the central value and 92 ``BSCALE == 1`` as unsigned integer data. For example, ``int16`` data 93 with ``BZERO = 32768`` and ``BSCALE = 1`` would be treated as 94 ``uint16`` data. Default is `True` so that the pseudo-unsigned 95 integer convention is assumed. 96 97 ignore_missing_end : bool, optional 98 Do not raise an exception when opening a file that is missing an 99 ``END`` card in the last header. Default is `False`. 100 101 ignore_missing_simple : bool, optional 102 Do not raise an exception when the SIMPLE keyword is missing. Note 103 that io.fits will raise a warning if a SIMPLE card is present but 104 written in a way that does not follow the FITS Standard. 105 Default is `False`. 106 107 .. versionadded:: 4.2 108 109 checksum : bool, str, optional 110 If `True`, verifies that both ``DATASUM`` and ``CHECKSUM`` card values 111 (when present in the HDU header) match the header and data of all HDU's 112 in the file. Updates to a file that already has a checksum will 113 preserve and update the existing checksums unless this argument is 114 given a value of 'remove', in which case the CHECKSUM and DATASUM 115 values are not checked, and are removed when saving changes to the 116 file. Default is `False`. 117 118 disable_image_compression : bool, optional 119 If `True`, treats compressed image HDU's like normal binary table 120 HDU's. Default is `False`. 121 122 do_not_scale_image_data : bool, optional 123 If `True`, image data is not scaled using BSCALE/BZERO values 124 when read. Default is `False`. 125 126 character_as_bytes : bool, optional 127 Whether to return bytes for string columns, otherwise unicode strings 128 are returned, but this does not respect memory mapping and loads the 129 whole column in memory when accessed. Default is `False`. 130 131 ignore_blank : bool, optional 132 If `True`, the BLANK keyword is ignored if present. 133 Default is `False`. 134 135 scale_back : bool, optional 136 If `True`, when saving changes to a file that contained scaled image 137 data, restore the data to the original type and reapply the original 138 BSCALE/BZERO values. This could lead to loss of accuracy if scaling 139 back to integer values after performing floating point operations on 140 the data. Default is `False`. 141 142 output_verify : str 143 Output verification option. Must be one of ``"fix"``, 144 ``"silentfix"``, ``"ignore"``, ``"warn"``, or 145 ``"exception"``. May also be any combination of ``"fix"`` or 146 ``"silentfix"`` with ``"+ignore"``, ``+warn``, or ``+exception" 147 (e.g. ``"fix+warn"``). See :ref:`astropy:verify` for more info. 148 149 Returns 150 ------- 151 hdulist : `HDUList` 152 `HDUList` containing all of the header data units in the file. 153 154 """ 155 156 from astropy.io.fits import conf 157 158 if memmap is None: 159 # distinguish between True (kwarg explicitly set) 160 # and None (preference for memmap in config, might be ignored) 161 memmap = None if conf.use_memmap else False 162 else: 163 memmap = bool(memmap) 164 165 if lazy_load_hdus is None: 166 lazy_load_hdus = conf.lazy_load_hdus 167 else: 168 lazy_load_hdus = bool(lazy_load_hdus) 169 170 if 'uint' not in kwargs: 171 kwargs['uint'] = conf.enable_uint 172 173 if not name: 174 raise ValueError(f'Empty filename: {name!r}') 175 176 return HDUList.fromfile(name, mode, memmap, save_backup, cache, 177 lazy_load_hdus, ignore_missing_simple, **kwargs) 178 179 180class HDUList(list, _Verify): 181 """ 182 HDU list class. This is the top-level FITS object. When a FITS 183 file is opened, a `HDUList` object is returned. 184 """ 185 186 def __init__(self, hdus=[], file=None): 187 """ 188 Construct a `HDUList` object. 189 190 Parameters 191 ---------- 192 hdus : BaseHDU or sequence thereof, optional 193 The HDU object(s) to comprise the `HDUList`. Should be 194 instances of HDU classes like `ImageHDU` or `BinTableHDU`. 195 196 file : file-like, bytes, optional 197 The opened physical file associated with the `HDUList` 198 or a bytes object containing the contents of the FITS 199 file. 200 """ 201 202 if isinstance(file, bytes): 203 self._data = file 204 self._file = None 205 else: 206 self._file = file 207 self._data = None 208 209 # For internal use only--the keyword args passed to fitsopen / 210 # HDUList.fromfile/string when opening the file 211 self._open_kwargs = {} 212 self._in_read_next_hdu = False 213 214 # If we have read all the HDUs from the file or not 215 # The assumes that all HDUs have been written when we first opened the 216 # file; we do not currently support loading additional HDUs from a file 217 # while it is being streamed to. In the future that might be supported 218 # but for now this is only used for the purpose of lazy-loading of 219 # existing HDUs. 220 if file is None: 221 self._read_all = True 222 elif self._file is not None: 223 # Should never attempt to read HDUs in ostream mode 224 self._read_all = self._file.mode == 'ostream' 225 else: 226 self._read_all = False 227 228 if hdus is None: 229 hdus = [] 230 231 # can take one HDU, as well as a list of HDU's as input 232 if isinstance(hdus, _ValidHDU): 233 hdus = [hdus] 234 elif not isinstance(hdus, (HDUList, list)): 235 raise TypeError("Invalid input for HDUList.") 236 237 for idx, hdu in enumerate(hdus): 238 if not isinstance(hdu, _BaseHDU): 239 raise TypeError(f"Element {idx} in the HDUList input is not an HDU.") 240 241 super().__init__(hdus) 242 243 if file is None: 244 # Only do this when initializing from an existing list of HDUs 245 # When initializing from a file, this will be handled by the 246 # append method after the first HDU is read 247 self.update_extend() 248 249 def __len__(self): 250 if not self._in_read_next_hdu: 251 self.readall() 252 253 return super().__len__() 254 255 def __repr__(self): 256 # In order to correctly repr an HDUList we need to load all the 257 # HDUs as well 258 self.readall() 259 260 return super().__repr__() 261 262 def __iter__(self): 263 # While effectively this does the same as: 264 # for idx in range(len(self)): 265 # yield self[idx] 266 # the more complicated structure is here to prevent the use of len(), 267 # which would break the lazy loading 268 for idx in itertools.count(): 269 try: 270 yield self[idx] 271 except IndexError: 272 break 273 274 def __getitem__(self, key): 275 """ 276 Get an HDU from the `HDUList`, indexed by number or name. 277 """ 278 279 # If the key is a slice we need to make sure the necessary HDUs 280 # have been loaded before passing the slice on to super. 281 if isinstance(key, slice): 282 max_idx = key.stop 283 # Check for and handle the case when no maximum was 284 # specified (e.g. [1:]). 285 if max_idx is None: 286 # We need all of the HDUs, so load them 287 # and reset the maximum to the actual length. 288 max_idx = len(self) 289 290 # Just in case the max_idx is negative... 291 max_idx = self._positive_index_of(max_idx) 292 293 number_loaded = super().__len__() 294 295 if max_idx >= number_loaded: 296 # We need more than we have, try loading up to and including 297 # max_idx. Note we do not try to be clever about skipping HDUs 298 # even though key.step might conceivably allow it. 299 for i in range(number_loaded, max_idx): 300 # Read until max_idx or to the end of the file, whichever 301 # comes first. 302 if not self._read_next_hdu(): 303 break 304 305 try: 306 hdus = super().__getitem__(key) 307 except IndexError as e: 308 # Raise a more helpful IndexError if the file was not fully read. 309 if self._read_all: 310 raise e 311 else: 312 raise IndexError('HDU not found, possibly because the index ' 313 'is out of range, or because the file was ' 314 'closed before all HDUs were read') 315 else: 316 return HDUList(hdus) 317 318 # Originally this used recursion, but hypothetically an HDU with 319 # a very large number of HDUs could blow the stack, so use a loop 320 # instead 321 try: 322 return self._try_while_unread_hdus(super().__getitem__, 323 self._positive_index_of(key)) 324 except IndexError as e: 325 # Raise a more helpful IndexError if the file was not fully read. 326 if self._read_all: 327 raise e 328 else: 329 raise IndexError('HDU not found, possibly because the index ' 330 'is out of range, or because the file was ' 331 'closed before all HDUs were read') 332 333 def __contains__(self, item): 334 """ 335 Returns `True` if ``item`` is an ``HDU`` _in_ ``self`` or a valid 336 extension specification (e.g., integer extension number, extension 337 name, or a tuple of extension name and an extension version) 338 of a ``HDU`` in ``self``. 339 340 """ 341 try: 342 self._try_while_unread_hdus(self.index_of, item) 343 except (KeyError, ValueError): 344 return False 345 346 return True 347 348 def __setitem__(self, key, hdu): 349 """ 350 Set an HDU to the `HDUList`, indexed by number or name. 351 """ 352 353 _key = self._positive_index_of(key) 354 if isinstance(hdu, (slice, list)): 355 if _is_int(_key): 356 raise ValueError('An element in the HDUList must be an HDU.') 357 for item in hdu: 358 if not isinstance(item, _BaseHDU): 359 raise ValueError(f'{item} is not an HDU.') 360 else: 361 if not isinstance(hdu, _BaseHDU): 362 raise ValueError(f'{hdu} is not an HDU.') 363 364 try: 365 self._try_while_unread_hdus(super().__setitem__, _key, hdu) 366 except IndexError: 367 raise IndexError(f'Extension {key} is out of bound or not found.') 368 369 self._resize = True 370 self._truncate = False 371 372 def __delitem__(self, key): 373 """ 374 Delete an HDU from the `HDUList`, indexed by number or name. 375 """ 376 377 if isinstance(key, slice): 378 end_index = len(self) 379 else: 380 key = self._positive_index_of(key) 381 end_index = len(self) - 1 382 383 self._try_while_unread_hdus(super().__delitem__, key) 384 385 if (key == end_index or key == -1 and not self._resize): 386 self._truncate = True 387 else: 388 self._truncate = False 389 self._resize = True 390 391 # Support the 'with' statement 392 def __enter__(self): 393 return self 394 395 def __exit__(self, type, value, traceback): 396 output_verify = self._open_kwargs.get('output_verify', 'exception') 397 self.close(output_verify=output_verify) 398 399 @classmethod 400 def fromfile(cls, fileobj, mode=None, memmap=None, 401 save_backup=False, cache=True, lazy_load_hdus=True, 402 ignore_missing_simple=False, **kwargs): 403 """ 404 Creates an `HDUList` instance from a file-like object. 405 406 The actual implementation of ``fitsopen()``, and generally shouldn't 407 be used directly. Use :func:`open` instead (and see its 408 documentation for details of the parameters accepted by this method). 409 """ 410 411 return cls._readfrom(fileobj=fileobj, mode=mode, memmap=memmap, 412 save_backup=save_backup, cache=cache, 413 ignore_missing_simple=ignore_missing_simple, 414 lazy_load_hdus=lazy_load_hdus, **kwargs) 415 416 @classmethod 417 def fromstring(cls, data, **kwargs): 418 """ 419 Creates an `HDUList` instance from a string or other in-memory data 420 buffer containing an entire FITS file. Similar to 421 :meth:`HDUList.fromfile`, but does not accept the mode or memmap 422 arguments, as they are only relevant to reading from a file on disk. 423 424 This is useful for interfacing with other libraries such as CFITSIO, 425 and may also be useful for streaming applications. 426 427 Parameters 428 ---------- 429 data : str, buffer-like, etc. 430 A string or other memory buffer containing an entire FITS file. 431 Buffer-like objects include :class:`~bytes`, :class:`~bytearray`, 432 :class:`~memoryview`, and :class:`~numpy.ndarray`. 433 It should be noted that if that memory is read-only (such as a 434 Python string) the returned :class:`HDUList`'s data portions will 435 also be read-only. 436 437 kwargs : dict 438 Optional keyword arguments. See 439 :func:`astropy.io.fits.open` for details. 440 441 Returns 442 ------- 443 hdul : HDUList 444 An :class:`HDUList` object representing the in-memory FITS file. 445 """ 446 447 try: 448 # Test that the given object supports the buffer interface by 449 # ensuring an ndarray can be created from it 450 np.ndarray((), dtype='ubyte', buffer=data) 451 except TypeError: 452 raise TypeError( 453 'The provided object {} does not contain an underlying ' 454 'memory buffer. fromstring() requires an object that ' 455 'supports the buffer interface such as bytes, buffer, ' 456 'memoryview, ndarray, etc. This restriction is to ensure ' 457 'that efficient access to the array/table data is possible.' 458 ''.format(data)) 459 460 return cls._readfrom(data=data, **kwargs) 461 462 def fileinfo(self, index): 463 """ 464 Returns a dictionary detailing information about the locations 465 of the indexed HDU within any associated file. The values are 466 only valid after a read or write of the associated file with 467 no intervening changes to the `HDUList`. 468 469 Parameters 470 ---------- 471 index : int 472 Index of HDU for which info is to be returned. 473 474 Returns 475 ------- 476 fileinfo : dict or None 477 478 The dictionary details information about the locations of 479 the indexed HDU within an associated file. Returns `None` 480 when the HDU is not associated with a file. 481 482 Dictionary contents: 483 484 ========== ======================================================== 485 Key Value 486 ========== ======================================================== 487 file File object associated with the HDU 488 filename Name of associated file object 489 filemode Mode in which the file was opened (readonly, 490 update, append, denywrite, ostream) 491 resized Flag that when `True` indicates that the data has been 492 resized since the last read/write so the returned values 493 may not be valid. 494 hdrLoc Starting byte location of header in file 495 datLoc Starting byte location of data block in file 496 datSpan Data size including padding 497 ========== ======================================================== 498 499 """ 500 501 if self._file is not None: 502 output = self[index].fileinfo() 503 504 if not output: 505 # OK, the HDU associated with this index is not yet 506 # tied to the file associated with the HDUList. The only way 507 # to get the file object is to check each of the HDU's in the 508 # list until we find the one associated with the file. 509 f = None 510 511 for hdu in self: 512 info = hdu.fileinfo() 513 514 if info: 515 f = info['file'] 516 fm = info['filemode'] 517 break 518 519 output = {'file': f, 'filemode': fm, 'hdrLoc': None, 520 'datLoc': None, 'datSpan': None} 521 522 output['filename'] = self._file.name 523 output['resized'] = self._wasresized() 524 else: 525 output = None 526 527 return output 528 529 def __copy__(self): 530 """ 531 Return a shallow copy of an HDUList. 532 533 Returns 534 ------- 535 copy : `HDUList` 536 A shallow copy of this `HDUList` object. 537 538 """ 539 540 return self[:] 541 542 # Syntactic sugar for `__copy__()` magic method 543 copy = __copy__ 544 545 def __deepcopy__(self, memo=None): 546 return HDUList([hdu.copy() for hdu in self]) 547 548 def pop(self, index=-1): 549 """ Remove an item from the list and return it. 550 551 Parameters 552 ---------- 553 index : int, str, tuple of (string, int), optional 554 An integer value of ``index`` indicates the position from which 555 ``pop()`` removes and returns an HDU. A string value or a tuple 556 of ``(string, int)`` functions as a key for identifying the 557 HDU to be removed and returned. If ``key`` is a tuple, it is 558 of the form ``(key, ver)`` where ``ver`` is an ``EXTVER`` 559 value that must match the HDU being searched for. 560 561 If the key is ambiguous (e.g. there are multiple 'SCI' extensions) 562 the first match is returned. For a more precise match use the 563 ``(name, ver)`` pair. 564 565 If even the ``(name, ver)`` pair is ambiguous the numeric index 566 must be used to index the duplicate HDU. 567 568 Returns 569 ------- 570 hdu : BaseHDU 571 The HDU object at position indicated by ``index`` or having name 572 and version specified by ``index``. 573 """ 574 575 # Make sure that HDUs are loaded before attempting to pop 576 self.readall() 577 list_index = self.index_of(index) 578 return super().pop(list_index) 579 580 def insert(self, index, hdu): 581 """ 582 Insert an HDU into the `HDUList` at the given ``index``. 583 584 Parameters 585 ---------- 586 index : int 587 Index before which to insert the new HDU. 588 589 hdu : BaseHDU 590 The HDU object to insert 591 """ 592 593 if not isinstance(hdu, _BaseHDU): 594 raise ValueError(f'{hdu} is not an HDU.') 595 596 num_hdus = len(self) 597 598 if index == 0 or num_hdus == 0: 599 if num_hdus != 0: 600 # We are inserting a new Primary HDU so we need to 601 # make the current Primary HDU into an extension HDU. 602 if isinstance(self[0], GroupsHDU): 603 raise ValueError( 604 "The current Primary HDU is a GroupsHDU. " 605 "It can't be made into an extension HDU, " 606 "so another HDU cannot be inserted before it.") 607 608 hdu1 = ImageHDU(self[0].data, self[0].header) 609 610 # Insert it into position 1, then delete HDU at position 0. 611 super().insert(1, hdu1) 612 super().__delitem__(0) 613 614 if not isinstance(hdu, (PrimaryHDU, _NonstandardHDU)): 615 # You passed in an Extension HDU but we need a Primary HDU. 616 # If you provided an ImageHDU then we can convert it to 617 # a primary HDU and use that. 618 if isinstance(hdu, ImageHDU): 619 hdu = PrimaryHDU(hdu.data, hdu.header) 620 else: 621 # You didn't provide an ImageHDU so we create a 622 # simple Primary HDU and append that first before 623 # we append the new Extension HDU. 624 phdu = PrimaryHDU() 625 626 super().insert(0, phdu) 627 index = 1 628 else: 629 if isinstance(hdu, GroupsHDU): 630 raise ValueError('A GroupsHDU must be inserted as a ' 631 'Primary HDU.') 632 633 if isinstance(hdu, PrimaryHDU): 634 # You passed a Primary HDU but we need an Extension HDU 635 # so create an Extension HDU from the input Primary HDU. 636 hdu = ImageHDU(hdu.data, hdu.header) 637 638 super().insert(index, hdu) 639 hdu._new = True 640 self._resize = True 641 self._truncate = False 642 # make sure the EXTEND keyword is in primary HDU if there is extension 643 self.update_extend() 644 645 def append(self, hdu): 646 """ 647 Append a new HDU to the `HDUList`. 648 649 Parameters 650 ---------- 651 hdu : BaseHDU 652 HDU to add to the `HDUList`. 653 """ 654 655 if not isinstance(hdu, _BaseHDU): 656 raise ValueError('HDUList can only append an HDU.') 657 658 if len(self) > 0: 659 if isinstance(hdu, GroupsHDU): 660 raise ValueError( 661 "Can't append a GroupsHDU to a non-empty HDUList") 662 663 if isinstance(hdu, PrimaryHDU): 664 # You passed a Primary HDU but we need an Extension HDU 665 # so create an Extension HDU from the input Primary HDU. 666 # TODO: This isn't necessarily sufficient to copy the HDU; 667 # _header_offset and friends need to be copied too. 668 hdu = ImageHDU(hdu.data, hdu.header) 669 else: 670 if not isinstance(hdu, (PrimaryHDU, _NonstandardHDU)): 671 # You passed in an Extension HDU but we need a Primary 672 # HDU. 673 # If you provided an ImageHDU then we can convert it to 674 # a primary HDU and use that. 675 if isinstance(hdu, ImageHDU): 676 hdu = PrimaryHDU(hdu.data, hdu.header) 677 else: 678 # You didn't provide an ImageHDU so we create a 679 # simple Primary HDU and append that first before 680 # we append the new Extension HDU. 681 phdu = PrimaryHDU() 682 super().append(phdu) 683 684 super().append(hdu) 685 hdu._new = True 686 self._resize = True 687 self._truncate = False 688 689 # make sure the EXTEND keyword is in primary HDU if there is extension 690 self.update_extend() 691 692 def index_of(self, key): 693 """ 694 Get the index of an HDU from the `HDUList`. 695 696 Parameters 697 ---------- 698 key : int, str, tuple of (string, int) or BaseHDU 699 The key identifying the HDU. If ``key`` is a tuple, it is of the 700 form ``(name, ver)`` where ``ver`` is an ``EXTVER`` value that must 701 match the HDU being searched for. 702 703 If the key is ambiguous (e.g. there are multiple 'SCI' extensions) 704 the first match is returned. For a more precise match use the 705 ``(name, ver)`` pair. 706 707 If even the ``(name, ver)`` pair is ambiguous (it shouldn't be 708 but it's not impossible) the numeric index must be used to index 709 the duplicate HDU. 710 711 When ``key`` is an HDU object, this function returns the 712 index of that HDU object in the ``HDUList``. 713 714 Returns 715 ------- 716 index : int 717 The index of the HDU in the `HDUList`. 718 719 Raises 720 ------ 721 ValueError 722 If ``key`` is an HDU object and it is not found in the ``HDUList``. 723 724 KeyError 725 If an HDU specified by the ``key`` that is an extension number, 726 extension name, or a tuple of extension name and version is not 727 found in the ``HDUList``. 728 729 """ 730 731 if _is_int(key): 732 return key 733 elif isinstance(key, tuple): 734 _key, _ver = key 735 elif isinstance(key, _BaseHDU): 736 return self.index(key) 737 else: 738 _key = key 739 _ver = None 740 741 if not isinstance(_key, str): 742 raise KeyError( 743 '{} indices must be integers, extension names as strings, ' 744 'or (extname, version) tuples; got {}' 745 ''.format(self.__class__.__name__, _key)) 746 747 _key = (_key.strip()).upper() 748 749 found = None 750 for idx, hdu in enumerate(self): 751 name = hdu.name 752 if isinstance(name, str): 753 name = name.strip().upper() 754 # 'PRIMARY' should always work as a reference to the first HDU 755 if ((name == _key or (_key == 'PRIMARY' and idx == 0)) and 756 (_ver is None or _ver == hdu.ver)): 757 found = idx 758 break 759 760 if (found is None): 761 raise KeyError(f'Extension {key!r} not found.') 762 else: 763 return found 764 765 def _positive_index_of(self, key): 766 """ 767 Same as index_of, but ensures always returning a positive index 768 or zero. 769 770 (Really this should be called non_negative_index_of but it felt 771 too long.) 772 773 This means that if the key is a negative integer, we have to 774 convert it to the corresponding positive index. This means 775 knowing the length of the HDUList, which in turn means loading 776 all HDUs. Therefore using negative indices on HDULists is inherently 777 inefficient. 778 """ 779 780 index = self.index_of(key) 781 782 if index >= 0: 783 return index 784 785 if abs(index) > len(self): 786 raise IndexError( 787 f'Extension {index} is out of bound or not found.') 788 789 return len(self) + index 790 791 def readall(self): 792 """ 793 Read data of all HDUs into memory. 794 """ 795 while self._read_next_hdu(): 796 pass 797 798 @ignore_sigint 799 def flush(self, output_verify='fix', verbose=False): 800 """ 801 Force a write of the `HDUList` back to the file (for append and 802 update modes only). 803 804 Parameters 805 ---------- 806 output_verify : str 807 Output verification option. Must be one of ``"fix"``, 808 ``"silentfix"``, ``"ignore"``, ``"warn"``, or 809 ``"exception"``. May also be any combination of ``"fix"`` or 810 ``"silentfix"`` with ``"+ignore"``, ``+warn``, or ``+exception" 811 (e.g. ``"fix+warn"``). See :ref:`astropy:verify` for more info. 812 813 verbose : bool 814 When `True`, print verbose messages 815 """ 816 817 if self._file.mode not in ('append', 'update', 'ostream'): 818 warnings.warn("Flush for '{}' mode is not supported." 819 .format(self._file.mode), AstropyUserWarning) 820 return 821 822 save_backup = self._open_kwargs.get('save_backup', False) 823 if save_backup and self._file.mode in ('append', 'update'): 824 filename = self._file.name 825 if os.path.exists(filename): 826 # The the file doesn't actually exist anymore for some reason 827 # then there's no point in trying to make a backup 828 backup = filename + '.bak' 829 idx = 1 830 while os.path.exists(backup): 831 backup = filename + '.bak.' + str(idx) 832 idx += 1 833 warnings.warn('Saving a backup of {} to {}.'.format( 834 filename, backup), AstropyUserWarning) 835 try: 836 shutil.copy(filename, backup) 837 except OSError as exc: 838 raise OSError('Failed to save backup to destination {}: ' 839 '{}'.format(filename, exc)) 840 841 self.verify(option=output_verify) 842 843 if self._file.mode in ('append', 'ostream'): 844 for hdu in self: 845 if verbose: 846 try: 847 extver = str(hdu._header['extver']) 848 except KeyError: 849 extver = '' 850 851 # only append HDU's which are "new" 852 if hdu._new: 853 hdu._prewriteto(checksum=hdu._output_checksum) 854 with _free_space_check(self): 855 hdu._writeto(self._file) 856 if verbose: 857 print('append HDU', hdu.name, extver) 858 hdu._new = False 859 hdu._postwriteto() 860 861 elif self._file.mode == 'update': 862 self._flush_update() 863 864 def update_extend(self): 865 """ 866 Make sure that if the primary header needs the keyword ``EXTEND`` that 867 it has it and it is correct. 868 """ 869 870 if not len(self): 871 return 872 873 if not isinstance(self[0], PrimaryHDU): 874 # A PrimaryHDU will be automatically inserted at some point, but it 875 # might not have been added yet 876 return 877 878 hdr = self[0].header 879 880 def get_first_ext(): 881 try: 882 return self[1] 883 except IndexError: 884 return None 885 886 if 'EXTEND' in hdr: 887 if not hdr['EXTEND'] and get_first_ext() is not None: 888 hdr['EXTEND'] = True 889 elif get_first_ext() is not None: 890 if hdr['NAXIS'] == 0: 891 hdr.set('EXTEND', True, after='NAXIS') 892 else: 893 n = hdr['NAXIS'] 894 hdr.set('EXTEND', True, after='NAXIS' + str(n)) 895 896 @deprecated_renamed_argument('clobber', 'overwrite', '2.0', 897 message='"clobber" was deprecated in version ' 898 '2.0 and will be removed in version ' 899 '5.1. Use argument "overwrite" ' 900 'instead.') 901 def writeto(self, fileobj, output_verify='exception', overwrite=False, 902 checksum=False): 903 """ 904 Write the `HDUList` to a new file. 905 906 Parameters 907 ---------- 908 fileobj : str, file-like or `pathlib.Path` 909 File to write to. If a file object, must be opened in a 910 writeable mode. 911 912 output_verify : str 913 Output verification option. Must be one of ``"fix"``, 914 ``"silentfix"``, ``"ignore"``, ``"warn"``, or 915 ``"exception"``. May also be any combination of ``"fix"`` or 916 ``"silentfix"`` with ``"+ignore"``, ``+warn``, or ``+exception" 917 (e.g. ``"fix+warn"``). See :ref:`astropy:verify` for more info. 918 919 overwrite : bool, optional 920 If ``True``, overwrite the output file if it exists. Raises an 921 ``OSError`` if ``False`` and the output file exists. Default is 922 ``False``. 923 924 .. versionchanged:: 1.3 925 ``overwrite`` replaces the deprecated ``clobber`` argument. 926 927 checksum : bool 928 When `True` adds both ``DATASUM`` and ``CHECKSUM`` cards 929 to the headers of all HDU's written to the file. 930 """ 931 932 if (len(self) == 0): 933 warnings.warn("There is nothing to write.", AstropyUserWarning) 934 return 935 936 self.verify(option=output_verify) 937 938 # make sure the EXTEND keyword is there if there is extension 939 self.update_extend() 940 941 # make note of whether the input file object is already open, in which 942 # case we should not close it after writing (that should be the job 943 # of the caller) 944 closed = isinstance(fileobj, str) or fileobj_closed(fileobj) 945 946 mode = FILE_MODES[fileobj_mode(fileobj)] if isfile(fileobj) else 'ostream' 947 948 # This can accept an open file object that's open to write only, or in 949 # append/update modes but only if the file doesn't exist. 950 fileobj = _File(fileobj, mode=mode, overwrite=overwrite) 951 hdulist = self.fromfile(fileobj) 952 try: 953 dirname = os.path.dirname(hdulist._file.name) 954 except (AttributeError, TypeError): 955 dirname = None 956 957 with _free_space_check(self, dirname=dirname): 958 for hdu in self: 959 hdu._prewriteto(checksum=checksum) 960 hdu._writeto(hdulist._file) 961 hdu._postwriteto() 962 hdulist.close(output_verify=output_verify, closed=closed) 963 964 def close(self, output_verify='exception', verbose=False, closed=True): 965 """ 966 Close the associated FITS file and memmap object, if any. 967 968 Parameters 969 ---------- 970 output_verify : str 971 Output verification option. Must be one of ``"fix"``, 972 ``"silentfix"``, ``"ignore"``, ``"warn"``, or 973 ``"exception"``. May also be any combination of ``"fix"`` or 974 ``"silentfix"`` with ``"+ignore"``, ``+warn``, or ``+exception" 975 (e.g. ``"fix+warn"``). See :ref:`astropy:verify` for more info. 976 977 verbose : bool 978 When `True`, print out verbose messages. 979 980 closed : bool 981 When `True`, close the underlying file object. 982 """ 983 984 try: 985 if (self._file and self._file.mode in ('append', 'update') 986 and not self._file.closed): 987 self.flush(output_verify=output_verify, verbose=verbose) 988 finally: 989 if self._file and closed and hasattr(self._file, 'close'): 990 self._file.close() 991 992 # Give individual HDUs an opportunity to do on-close cleanup 993 for hdu in self: 994 hdu._close(closed=closed) 995 996 def info(self, output=None): 997 """ 998 Summarize the info of the HDUs in this `HDUList`. 999 1000 Note that this function prints its results to the console---it 1001 does not return a value. 1002 1003 Parameters 1004 ---------- 1005 output : file-like or bool, optional 1006 A file-like object to write the output to. If `False`, does not 1007 output to a file and instead returns a list of tuples representing 1008 the HDU info. Writes to ``sys.stdout`` by default. 1009 """ 1010 1011 if output is None: 1012 output = sys.stdout 1013 1014 if self._file is None: 1015 name = '(No file associated with this HDUList)' 1016 else: 1017 name = self._file.name 1018 1019 results = [f'Filename: {name}', 1020 'No. Name Ver Type Cards Dimensions Format'] 1021 1022 format = '{:3d} {:10} {:3} {:11} {:5d} {} {} {}' 1023 default = ('', '', '', 0, (), '', '') 1024 for idx, hdu in enumerate(self): 1025 summary = hdu._summary() 1026 if len(summary) < len(default): 1027 summary += default[len(summary):] 1028 summary = (idx,) + summary 1029 if output: 1030 results.append(format.format(*summary)) 1031 else: 1032 results.append(summary) 1033 1034 if output: 1035 output.write('\n'.join(results)) 1036 output.write('\n') 1037 output.flush() 1038 else: 1039 return results[2:] 1040 1041 def filename(self): 1042 """ 1043 Return the file name associated with the HDUList object if one exists. 1044 Otherwise returns None. 1045 1046 Returns 1047 ------- 1048 filename : str 1049 A string containing the file name associated with the HDUList 1050 object if an association exists. Otherwise returns None. 1051 1052 """ 1053 if self._file is not None: 1054 if hasattr(self._file, 'name'): 1055 return self._file.name 1056 return None 1057 1058 @classmethod 1059 def _readfrom(cls, fileobj=None, data=None, mode=None, memmap=None, 1060 cache=True, lazy_load_hdus=True, ignore_missing_simple=False, 1061 **kwargs): 1062 """ 1063 Provides the implementations from HDUList.fromfile and 1064 HDUList.fromstring, both of which wrap this method, as their 1065 implementations are largely the same. 1066 """ 1067 1068 if fileobj is not None: 1069 if not isinstance(fileobj, _File): 1070 # instantiate a FITS file object (ffo) 1071 fileobj = _File(fileobj, mode=mode, memmap=memmap, cache=cache) 1072 # The Astropy mode is determined by the _File initializer if the 1073 # supplied mode was None 1074 mode = fileobj.mode 1075 hdulist = cls(file=fileobj) 1076 else: 1077 if mode is None: 1078 # The default mode 1079 mode = 'readonly' 1080 1081 hdulist = cls(file=data) 1082 # This method is currently only called from HDUList.fromstring and 1083 # HDUList.fromfile. If fileobj is None then this must be the 1084 # fromstring case; the data type of ``data`` will be checked in the 1085 # _BaseHDU.fromstring call. 1086 1087 if (not ignore_missing_simple and 1088 hdulist._file and 1089 hdulist._file.mode != 'ostream' and 1090 hdulist._file.size > 0): 1091 pos = hdulist._file.tell() 1092 # FITS signature is supposed to be in the first 30 bytes, but to 1093 # allow reading various invalid files we will check in the first 1094 # card (80 bytes). 1095 simple = hdulist._file.read(80) 1096 match_sig = (simple[:29] == FITS_SIGNATURE[:-1] and 1097 simple[29:30] in (b'T', b'F')) 1098 1099 if not match_sig: 1100 # Check the SIMPLE card is there but not written correctly 1101 match_sig_relaxed = re.match(rb"SIMPLE\s*=\s*[T|F]", simple) 1102 1103 if match_sig_relaxed: 1104 warnings.warn("Found a SIMPLE card but its format doesn't" 1105 " respect the FITS Standard", VerifyWarning) 1106 else: 1107 if hdulist._file.close_on_error: 1108 hdulist._file.close() 1109 raise OSError( 1110 'No SIMPLE card found, this file does not appear to ' 1111 'be a valid FITS file. If this is really a FITS file, ' 1112 'try with ignore_missing_simple=True') 1113 1114 hdulist._file.seek(pos) 1115 1116 # Store additional keyword args that were passed to fits.open 1117 hdulist._open_kwargs = kwargs 1118 1119 if fileobj is not None and fileobj.writeonly: 1120 # Output stream--not interested in reading/parsing 1121 # the HDUs--just writing to the output file 1122 return hdulist 1123 1124 # Make sure at least the PRIMARY HDU can be read 1125 read_one = hdulist._read_next_hdu() 1126 1127 # If we're trying to read only and no header units were found, 1128 # raise an exception 1129 if not read_one and mode in ('readonly', 'denywrite'): 1130 # Close the file if necessary (issue #6168) 1131 if hdulist._file.close_on_error: 1132 hdulist._file.close() 1133 1134 raise OSError('Empty or corrupt FITS file') 1135 1136 if not lazy_load_hdus or kwargs.get('checksum') is True: 1137 # Go ahead and load all HDUs 1138 while hdulist._read_next_hdu(): 1139 pass 1140 1141 # initialize/reset attributes to be used in "update/append" mode 1142 hdulist._resize = False 1143 hdulist._truncate = False 1144 1145 return hdulist 1146 1147 def _try_while_unread_hdus(self, func, *args, **kwargs): 1148 """ 1149 Attempt an operation that accesses an HDU by index/name 1150 that can fail if not all HDUs have been read yet. Keep 1151 reading HDUs until the operation succeeds or there are no 1152 more HDUs to read. 1153 """ 1154 1155 while True: 1156 try: 1157 return func(*args, **kwargs) 1158 except Exception: 1159 if self._read_next_hdu(): 1160 continue 1161 else: 1162 raise 1163 1164 def _read_next_hdu(self): 1165 """ 1166 Lazily load a single HDU from the fileobj or data string the `HDUList` 1167 was opened from, unless no further HDUs are found. 1168 1169 Returns True if a new HDU was loaded, or False otherwise. 1170 """ 1171 1172 if self._read_all: 1173 return False 1174 1175 saved_compression_enabled = compressed.COMPRESSION_ENABLED 1176 fileobj, data, kwargs = self._file, self._data, self._open_kwargs 1177 1178 if fileobj is not None and fileobj.closed: 1179 return False 1180 1181 try: 1182 self._in_read_next_hdu = True 1183 1184 if ('disable_image_compression' in kwargs and 1185 kwargs['disable_image_compression']): 1186 compressed.COMPRESSION_ENABLED = False 1187 1188 # read all HDUs 1189 try: 1190 if fileobj is not None: 1191 try: 1192 # Make sure we're back to the end of the last read 1193 # HDU 1194 if len(self) > 0: 1195 last = self[len(self) - 1] 1196 if last._data_offset is not None: 1197 offset = last._data_offset + last._data_size 1198 fileobj.seek(offset, os.SEEK_SET) 1199 1200 hdu = _BaseHDU.readfrom(fileobj, **kwargs) 1201 except EOFError: 1202 self._read_all = True 1203 return False 1204 except OSError: 1205 # Close the file: see 1206 # https://github.com/astropy/astropy/issues/6168 1207 # 1208 if self._file.close_on_error: 1209 self._file.close() 1210 1211 if fileobj.writeonly: 1212 self._read_all = True 1213 return False 1214 else: 1215 raise 1216 else: 1217 if not data: 1218 self._read_all = True 1219 return False 1220 hdu = _BaseHDU.fromstring(data, **kwargs) 1221 self._data = data[hdu._data_offset + hdu._data_size:] 1222 1223 super().append(hdu) 1224 if len(self) == 1: 1225 # Check for an extension HDU and update the EXTEND 1226 # keyword of the primary HDU accordingly 1227 self.update_extend() 1228 1229 hdu._new = False 1230 if 'checksum' in kwargs: 1231 hdu._output_checksum = kwargs['checksum'] 1232 # check in the case there is extra space after the last HDU or 1233 # corrupted HDU 1234 except (VerifyError, ValueError) as exc: 1235 warnings.warn( 1236 'Error validating header for HDU #{} (note: Astropy ' 1237 'uses zero-based indexing).\n{}\n' 1238 'There may be extra bytes after the last HDU or the ' 1239 'file is corrupted.'.format( 1240 len(self), indent(str(exc))), VerifyWarning) 1241 del exc 1242 self._read_all = True 1243 return False 1244 finally: 1245 compressed.COMPRESSION_ENABLED = saved_compression_enabled 1246 self._in_read_next_hdu = False 1247 1248 return True 1249 1250 def _verify(self, option='warn'): 1251 errs = _ErrList([], unit='HDU') 1252 1253 # the first (0th) element must be a primary HDU 1254 if len(self) > 0 and (not isinstance(self[0], PrimaryHDU)) and \ 1255 (not isinstance(self[0], _NonstandardHDU)): 1256 err_text = "HDUList's 0th element is not a primary HDU." 1257 fix_text = 'Fixed by inserting one as 0th HDU.' 1258 1259 def fix(self=self): 1260 self.insert(0, PrimaryHDU()) 1261 1262 err = self.run_option(option, err_text=err_text, 1263 fix_text=fix_text, fix=fix) 1264 errs.append(err) 1265 1266 if len(self) > 1 and ('EXTEND' not in self[0].header or 1267 self[0].header['EXTEND'] is not True): 1268 err_text = ('Primary HDU does not contain an EXTEND keyword ' 1269 'equal to T even though there are extension HDUs.') 1270 fix_text = 'Fixed by inserting or updating the EXTEND keyword.' 1271 1272 def fix(header=self[0].header): 1273 naxis = header['NAXIS'] 1274 if naxis == 0: 1275 after = 'NAXIS' 1276 else: 1277 after = 'NAXIS' + str(naxis) 1278 header.set('EXTEND', value=True, after=after) 1279 1280 errs.append(self.run_option(option, err_text=err_text, 1281 fix_text=fix_text, fix=fix)) 1282 1283 # each element calls their own verify 1284 for idx, hdu in enumerate(self): 1285 if idx > 0 and (not isinstance(hdu, ExtensionHDU)): 1286 err_text = f"HDUList's element {str(idx)} is not an extension HDU." 1287 1288 err = self.run_option(option, err_text=err_text, fixable=False) 1289 errs.append(err) 1290 1291 else: 1292 result = hdu._verify(option) 1293 if result: 1294 errs.append(result) 1295 return errs 1296 1297 def _flush_update(self): 1298 """Implements flushing changes to a file in update mode.""" 1299 1300 for hdu in self: 1301 # Need to all _prewriteto() for each HDU first to determine if 1302 # resizing will be necessary 1303 hdu._prewriteto(checksum=hdu._output_checksum, inplace=True) 1304 1305 try: 1306 self._wasresized() 1307 1308 # if the HDUList is resized, need to write out the entire contents of 1309 # the hdulist to the file. 1310 if self._resize or self._file.compression: 1311 self._flush_resize() 1312 else: 1313 # if not resized, update in place 1314 for hdu in self: 1315 hdu._writeto(self._file, inplace=True) 1316 1317 # reset the modification attributes after updating 1318 for hdu in self: 1319 hdu._header._modified = False 1320 finally: 1321 for hdu in self: 1322 hdu._postwriteto() 1323 1324 def _flush_resize(self): 1325 """ 1326 Implements flushing changes in update mode when parts of one or more HDU 1327 need to be resized. 1328 """ 1329 1330 old_name = self._file.name 1331 old_memmap = self._file.memmap 1332 name = _tmp_name(old_name) 1333 1334 if not self._file.file_like: 1335 old_mode = os.stat(old_name).st_mode 1336 # The underlying file is an actual file object. The HDUList is 1337 # resized, so we need to write it to a tmp file, delete the 1338 # original file, and rename the tmp file to the original file. 1339 if self._file.compression == 'gzip': 1340 new_file = gzip.GzipFile(name, mode='ab+') 1341 elif self._file.compression == 'bzip2': 1342 if not HAS_BZ2: 1343 raise ModuleNotFoundError( 1344 "This Python installation does not provide the bz2 module.") 1345 new_file = bz2.BZ2File(name, mode='w') 1346 else: 1347 new_file = name 1348 1349 with self.fromfile(new_file, mode='append') as hdulist: 1350 1351 for hdu in self: 1352 hdu._writeto(hdulist._file, inplace=True, copy=True) 1353 if sys.platform.startswith('win'): 1354 # Collect a list of open mmaps to the data; this well be 1355 # used later. See below. 1356 mmaps = [(idx, _get_array_mmap(hdu.data), hdu.data) 1357 for idx, hdu in enumerate(self) if hdu._has_data] 1358 1359 hdulist._file.close() 1360 self._file.close() 1361 if sys.platform.startswith('win'): 1362 # Close all open mmaps to the data. This is only necessary on 1363 # Windows, which will not allow a file to be renamed or deleted 1364 # until all handles to that file have been closed. 1365 for idx, mmap, arr in mmaps: 1366 if mmap is not None: 1367 mmap.close() 1368 1369 os.remove(self._file.name) 1370 1371 # reopen the renamed new file with "update" mode 1372 os.rename(name, old_name) 1373 os.chmod(old_name, old_mode) 1374 1375 if isinstance(new_file, gzip.GzipFile): 1376 old_file = gzip.GzipFile(old_name, mode='rb+') 1377 else: 1378 old_file = old_name 1379 1380 ffo = _File(old_file, mode='update', memmap=old_memmap) 1381 1382 self._file = ffo 1383 1384 for hdu in self: 1385 # Need to update the _file attribute and close any open mmaps 1386 # on each HDU 1387 if hdu._has_data and _get_array_mmap(hdu.data) is not None: 1388 del hdu.data 1389 hdu._file = ffo 1390 1391 if sys.platform.startswith('win'): 1392 # On Windows, all the original data mmaps were closed above. 1393 # However, it's possible that the user still has references to 1394 # the old data which would no longer work (possibly even cause 1395 # a segfault if they try to access it). This replaces the 1396 # buffers used by the original arrays with the buffers of mmap 1397 # arrays created from the new file. This seems to work, but 1398 # it's a flaming hack and carries no guarantees that it won't 1399 # lead to odd behavior in practice. Better to just not keep 1400 # references to data from files that had to be resized upon 1401 # flushing (on Windows--again, this is no problem on Linux). 1402 for idx, mmap, arr in mmaps: 1403 if mmap is not None: 1404 # https://github.com/numpy/numpy/issues/8628 1405 with warnings.catch_warnings(): 1406 warnings.simplefilter('ignore', category=DeprecationWarning) 1407 arr.data = self[idx].data.data 1408 del mmaps # Just to be sure 1409 1410 else: 1411 # The underlying file is not a file object, it is a file like 1412 # object. We can't write out to a file, we must update the file 1413 # like object in place. To do this, we write out to a temporary 1414 # file, then delete the contents in our file like object, then 1415 # write the contents of the temporary file to the now empty file 1416 # like object. 1417 self.writeto(name) 1418 hdulist = self.fromfile(name) 1419 ffo = self._file 1420 1421 ffo.truncate(0) 1422 ffo.seek(0) 1423 1424 for hdu in hdulist: 1425 hdu._writeto(ffo, inplace=True, copy=True) 1426 1427 # Close the temporary file and delete it. 1428 hdulist.close() 1429 os.remove(hdulist._file.name) 1430 1431 # reset the resize attributes after updating 1432 self._resize = False 1433 self._truncate = False 1434 for hdu in self: 1435 hdu._header._modified = False 1436 hdu._new = False 1437 hdu._file = ffo 1438 1439 def _wasresized(self, verbose=False): 1440 """ 1441 Determine if any changes to the HDUList will require a file resize 1442 when flushing the file. 1443 1444 Side effect of setting the objects _resize attribute. 1445 """ 1446 1447 if not self._resize: 1448 1449 # determine if any of the HDU is resized 1450 for hdu in self: 1451 # Header: 1452 nbytes = len(str(hdu._header)) 1453 if nbytes != (hdu._data_offset - hdu._header_offset): 1454 self._resize = True 1455 self._truncate = False 1456 if verbose: 1457 print('One or more header is resized.') 1458 break 1459 1460 # Data: 1461 if not hdu._has_data: 1462 continue 1463 1464 nbytes = hdu.size 1465 nbytes = nbytes + _pad_length(nbytes) 1466 if nbytes != hdu._data_size: 1467 self._resize = True 1468 self._truncate = False 1469 if verbose: 1470 print('One or more data area is resized.') 1471 break 1472 1473 if self._truncate: 1474 try: 1475 self._file.truncate(hdu._data_offset + hdu._data_size) 1476 except OSError: 1477 self._resize = True 1478 self._truncate = False 1479 1480 return self._resize 1481