1# Licensed under a 3-clause BSD style license - see PYFITS.rst 2 3 4import datetime 5import os 6import sys 7import warnings 8from contextlib import suppress 9from inspect import signature, Parameter 10 11import numpy as np 12 13from astropy.io.fits import conf 14from astropy.io.fits.file import _File 15from astropy.io.fits.header import (Header, _BasicHeader, _pad_length, 16 _DelayedHeader) 17from astropy.io.fits.util import (_is_int, _is_pseudo_integer, _pseudo_zero, 18 itersubclasses, decode_ascii, _get_array_mmap, first, 19 _free_space_check, _extract_number) 20from astropy.io.fits.verify import _Verify, _ErrList 21 22from astropy.utils import lazyproperty 23from astropy.utils.exceptions import AstropyUserWarning 24from astropy.utils.decorators import deprecated_renamed_argument 25 26 27__all__ = [ 28 "DELAYED", 29 # classes 30 "InvalidHDUException", 31 "ExtensionHDU", 32 "NonstandardExtHDU", 33] 34 35 36class _Delayed: 37 pass 38 39 40DELAYED = _Delayed() 41 42 43BITPIX2DTYPE = {8: 'uint8', 16: 'int16', 32: 'int32', 64: 'int64', 44 -32: 'float32', -64: 'float64'} 45"""Maps FITS BITPIX values to Numpy dtype names.""" 46 47DTYPE2BITPIX = {'int8': 8, 'uint8': 8, 'int16': 16, 'uint16': 16, 48 'int32': 32, 'uint32': 32, 'int64': 64, 'uint64': 64, 49 'float32': -32, 'float64': -64} 50""" 51Maps Numpy dtype names to FITS BITPIX values (this includes unsigned 52integers, with the assumption that the pseudo-unsigned integer convention 53will be used in this case. 54""" 55 56 57class InvalidHDUException(Exception): 58 """ 59 A custom exception class used mainly to signal to _BaseHDU.__new__ that 60 an HDU cannot possibly be considered valid, and must be assumed to be 61 corrupted. 62 """ 63 64 65def _hdu_class_from_header(cls, header): 66 """ 67 Iterates through the subclasses of _BaseHDU and uses that class's 68 match_header() method to determine which subclass to instantiate. 69 70 It's important to be aware that the class hierarchy is traversed in a 71 depth-last order. Each match_header() should identify an HDU type as 72 uniquely as possible. Abstract types may choose to simply return False 73 or raise NotImplementedError to be skipped. 74 75 If any unexpected exceptions are raised while evaluating 76 match_header(), the type is taken to be _CorruptedHDU. 77 78 Used primarily by _BaseHDU._readfrom_internal and _BaseHDU._from_data to 79 find an appropriate HDU class to use based on values in the header. 80 """ 81 82 klass = cls # By default, if no subclasses are defined 83 if header: 84 for c in reversed(list(itersubclasses(cls))): 85 try: 86 # HDU classes built into astropy.io.fits are always considered, 87 # but extension HDUs must be explicitly registered 88 if not (c.__module__.startswith('astropy.io.fits.') or 89 c in cls._hdu_registry): 90 continue 91 if c.match_header(header): 92 klass = c 93 break 94 except NotImplementedError: 95 continue 96 except Exception as exc: 97 warnings.warn( 98 'An exception occurred matching an HDU header to the ' 99 'appropriate HDU type: {}'.format(exc), 100 AstropyUserWarning) 101 warnings.warn('The HDU will be treated as corrupted.', 102 AstropyUserWarning) 103 klass = _CorruptedHDU 104 del exc 105 break 106 107 return klass 108 109 110# TODO: Come up with a better __repr__ for HDUs (and for HDULists, for that 111# matter) 112class _BaseHDU: 113 """Base class for all HDU (header data unit) classes.""" 114 115 _hdu_registry = set() 116 117 # This HDU type is part of the FITS standard 118 _standard = True 119 120 # Byte to use for padding out blocks 121 _padding_byte = '\x00' 122 123 _default_name = '' 124 125 # _header uses a descriptor to delay the loading of the fits.Header object 126 # until it is necessary. 127 _header = _DelayedHeader() 128 129 def __init__(self, data=None, header=None, *args, **kwargs): 130 if header is None: 131 header = Header() 132 self._header = header 133 self._header_str = None 134 self._file = None 135 self._buffer = None 136 self._header_offset = None 137 self._data_offset = None 138 self._data_size = None 139 140 # This internal variable is used to track whether the data attribute 141 # still points to the same data array as when the HDU was originally 142 # created (this does not track whether the data is actually the same 143 # content-wise) 144 self._data_replaced = False 145 self._data_needs_rescale = False 146 self._new = True 147 self._output_checksum = False 148 149 if 'DATASUM' in self._header and 'CHECKSUM' not in self._header: 150 self._output_checksum = 'datasum' 151 elif 'CHECKSUM' in self._header: 152 self._output_checksum = True 153 154 def __init_subclass__(cls, **kwargs): 155 # Add the same data.deleter to all HDUs with a data property. 156 # It's unfortunate, but there's otherwise no straightforward way 157 # that a property can inherit setters/deleters of the property of the 158 # same name on base classes. 159 data_prop = cls.__dict__.get('data', None) 160 if (isinstance(data_prop, (lazyproperty, property)) 161 and data_prop.fdel is None): 162 # Don't do anything if the class has already explicitly 163 # set the deleter for its data property 164 def data(self): 165 # The deleter 166 if self._file is not None and self._data_loaded: 167 data_refcount = sys.getrefcount(self.data) 168 # Manually delete *now* so that FITS_rec.__del__ 169 # cleanup can happen if applicable 170 del self.__dict__['data'] 171 # Don't even do this unless the *only* reference to the 172 # .data array was the one we're deleting by deleting 173 # this attribute; if any other references to the array 174 # are hanging around (perhaps the user ran ``data = 175 # hdu.data``) don't even consider this: 176 if data_refcount == 2: 177 self._file._maybe_close_mmap() 178 179 setattr(cls, 'data', data_prop.deleter(data)) 180 181 return super().__init_subclass__(**kwargs) 182 183 @property 184 def header(self): 185 return self._header 186 187 @header.setter 188 def header(self, value): 189 self._header = value 190 191 @property 192 def name(self): 193 # Convert the value to a string to be flexible in some pathological 194 # cases (see ticket #96) 195 return str(self._header.get('EXTNAME', self._default_name)) 196 197 @name.setter 198 def name(self, value): 199 if not isinstance(value, str): 200 raise TypeError("'name' attribute must be a string") 201 if not conf.extension_name_case_sensitive: 202 value = value.upper() 203 if 'EXTNAME' in self._header: 204 self._header['EXTNAME'] = value 205 else: 206 self._header['EXTNAME'] = (value, 'extension name') 207 208 @property 209 def ver(self): 210 return self._header.get('EXTVER', 1) 211 212 @ver.setter 213 def ver(self, value): 214 if not _is_int(value): 215 raise TypeError("'ver' attribute must be an integer") 216 if 'EXTVER' in self._header: 217 self._header['EXTVER'] = value 218 else: 219 self._header['EXTVER'] = (value, 'extension value') 220 221 @property 222 def level(self): 223 return self._header.get('EXTLEVEL', 1) 224 225 @level.setter 226 def level(self, value): 227 if not _is_int(value): 228 raise TypeError("'level' attribute must be an integer") 229 if 'EXTLEVEL' in self._header: 230 self._header['EXTLEVEL'] = value 231 else: 232 self._header['EXTLEVEL'] = (value, 'extension level') 233 234 @property 235 def is_image(self): 236 return ( 237 self.name == 'PRIMARY' or 238 ('XTENSION' in self._header and 239 (self._header['XTENSION'] == 'IMAGE' or 240 (self._header['XTENSION'] == 'BINTABLE' and 241 'ZIMAGE' in self._header and self._header['ZIMAGE'] is True)))) 242 243 @property 244 def _data_loaded(self): 245 return ('data' in self.__dict__ and self.data is not DELAYED) 246 247 @property 248 def _has_data(self): 249 return self._data_loaded and self.data is not None 250 251 @classmethod 252 def register_hdu(cls, hducls): 253 cls._hdu_registry.add(hducls) 254 255 @classmethod 256 def unregister_hdu(cls, hducls): 257 if hducls in cls._hdu_registry: 258 cls._hdu_registry.remove(hducls) 259 260 @classmethod 261 def match_header(cls, header): 262 raise NotImplementedError 263 264 @classmethod 265 def fromstring(cls, data, checksum=False, ignore_missing_end=False, 266 **kwargs): 267 """ 268 Creates a new HDU object of the appropriate type from a string 269 containing the HDU's entire header and, optionally, its data. 270 271 Note: When creating a new HDU from a string without a backing file 272 object, the data of that HDU may be read-only. It depends on whether 273 the underlying string was an immutable Python str/bytes object, or some 274 kind of read-write memory buffer such as a `memoryview`. 275 276 Parameters 277 ---------- 278 data : str, bytearray, memoryview, ndarray 279 A byte string containing the HDU's header and data. 280 281 checksum : bool, optional 282 Check the HDU's checksum and/or datasum. 283 284 ignore_missing_end : bool, optional 285 Ignore a missing end card in the header data. Note that without the 286 end card the end of the header may be ambiguous and resulted in a 287 corrupt HDU. In this case the assumption is that the first 2880 288 block that does not begin with valid FITS header data is the 289 beginning of the data. 290 291 kwargs : optional 292 May consist of additional keyword arguments specific to an HDU 293 type--these correspond to keywords recognized by the constructors of 294 different HDU classes such as `PrimaryHDU`, `ImageHDU`, or 295 `BinTableHDU`. Any unrecognized keyword arguments are simply 296 ignored. 297 """ 298 299 return cls._readfrom_internal(data, checksum=checksum, 300 ignore_missing_end=ignore_missing_end, 301 **kwargs) 302 303 @classmethod 304 def readfrom(cls, fileobj, checksum=False, ignore_missing_end=False, 305 **kwargs): 306 """ 307 Read the HDU from a file. Normally an HDU should be opened with 308 :func:`open` which reads the entire HDU list in a FITS file. But this 309 method is still provided for symmetry with :func:`writeto`. 310 311 Parameters 312 ---------- 313 fileobj : file-like 314 Input FITS file. The file's seek pointer is assumed to be at the 315 beginning of the HDU. 316 317 checksum : bool 318 If `True`, verifies that both ``DATASUM`` and ``CHECKSUM`` card 319 values (when present in the HDU header) match the header and data 320 of all HDU's in the file. 321 322 ignore_missing_end : bool 323 Do not issue an exception when opening a file that is missing an 324 ``END`` card in the last header. 325 """ 326 327 # TODO: Figure out a way to make it possible for the _File 328 # constructor to be a noop if the argument is already a _File 329 if not isinstance(fileobj, _File): 330 fileobj = _File(fileobj) 331 332 hdu = cls._readfrom_internal(fileobj, checksum=checksum, 333 ignore_missing_end=ignore_missing_end, 334 **kwargs) 335 336 # If the checksum had to be checked the data may have already been read 337 # from the file, in which case we don't want to seek relative 338 fileobj.seek(hdu._data_offset + hdu._data_size, os.SEEK_SET) 339 return hdu 340 341 @deprecated_renamed_argument('clobber', 'overwrite', '2.0', 342 message='"clobber" was deprecated in version ' 343 '2.0 and will be removed in version ' 344 '5.1. Use argument "overwrite" ' 345 'instead.') 346 def writeto(self, name, output_verify='exception', overwrite=False, 347 checksum=False): 348 """ 349 Write the HDU to a new file. This is a convenience method to 350 provide a user easier output interface if only one HDU needs 351 to be written to a file. 352 353 Parameters 354 ---------- 355 name : path-like or file-like 356 Output FITS file. If the file object is already opened, it must 357 be opened in a writeable mode. 358 359 output_verify : str 360 Output verification option. Must be one of ``"fix"``, 361 ``"silentfix"``, ``"ignore"``, ``"warn"``, or 362 ``"exception"``. May also be any combination of ``"fix"`` or 363 ``"silentfix"`` with ``"+ignore"``, ``+warn``, or ``+exception" 364 (e.g. ``"fix+warn"``). See :ref:`astropy:verify` for more info. 365 366 overwrite : bool, optional 367 If ``True``, overwrite the output file if it exists. Raises an 368 ``OSError`` if ``False`` and the output file exists. Default is 369 ``False``. 370 371 .. versionchanged:: 1.3 372 ``overwrite`` replaces the deprecated ``clobber`` argument. 373 374 checksum : bool 375 When `True` adds both ``DATASUM`` and ``CHECKSUM`` cards 376 to the header of the HDU when written to the file. 377 """ 378 379 from .hdulist import HDUList 380 381 hdulist = HDUList([self]) 382 hdulist.writeto(name, output_verify, overwrite=overwrite, 383 checksum=checksum) 384 385 @classmethod 386 def _from_data(cls, data, header, **kwargs): 387 """ 388 Instantiate the HDU object after guessing the HDU class from the 389 FITS Header. 390 """ 391 klass = _hdu_class_from_header(cls, header) 392 return klass(data=data, header=header, **kwargs) 393 394 @classmethod 395 def _readfrom_internal(cls, data, header=None, checksum=False, 396 ignore_missing_end=False, **kwargs): 397 """ 398 Provides the bulk of the internal implementation for readfrom and 399 fromstring. 400 401 For some special cases, supports using a header that was already 402 created, and just using the input data for the actual array data. 403 """ 404 405 hdu_buffer = None 406 hdu_fileobj = None 407 header_offset = 0 408 409 if isinstance(data, _File): 410 if header is None: 411 header_offset = data.tell() 412 try: 413 # First we try to read the header with the fast parser 414 # from _BasicHeader, which will read only the standard 415 # 8 character keywords to get the structural keywords 416 # that are needed to build the HDU object. 417 header_str, header = _BasicHeader.fromfile(data) 418 except Exception: 419 # If the fast header parsing failed, then fallback to 420 # the classic Header parser, which has better support 421 # and reporting for the various issues that can be found 422 # in the wild. 423 data.seek(header_offset) 424 header = Header.fromfile(data, 425 endcard=not ignore_missing_end) 426 hdu_fileobj = data 427 data_offset = data.tell() # *after* reading the header 428 else: 429 try: 430 # Test that the given object supports the buffer interface by 431 # ensuring an ndarray can be created from it 432 np.ndarray((), dtype='ubyte', buffer=data) 433 except TypeError: 434 raise TypeError( 435 'The provided object {!r} does not contain an underlying ' 436 'memory buffer. fromstring() requires an object that ' 437 'supports the buffer interface such as bytes, buffer, ' 438 'memoryview, ndarray, etc. This restriction is to ensure ' 439 'that efficient access to the array/table data is possible.' 440 .format(data)) 441 442 if header is None: 443 def block_iter(nbytes): 444 idx = 0 445 while idx < len(data): 446 yield data[idx:idx + nbytes] 447 idx += nbytes 448 449 header_str, header = Header._from_blocks( 450 block_iter, True, '', not ignore_missing_end, True) 451 452 if len(data) > len(header_str): 453 hdu_buffer = data 454 elif data: 455 hdu_buffer = data 456 457 header_offset = 0 458 data_offset = len(header_str) 459 460 # Determine the appropriate arguments to pass to the constructor from 461 # self._kwargs. self._kwargs contains any number of optional arguments 462 # that may or may not be valid depending on the HDU type 463 cls = _hdu_class_from_header(cls, header) 464 sig = signature(cls.__init__) 465 new_kwargs = kwargs.copy() 466 if Parameter.VAR_KEYWORD not in (x.kind for x in sig.parameters.values()): 467 # If __init__ accepts arbitrary keyword arguments, then we can go 468 # ahead and pass all keyword arguments; otherwise we need to delete 469 # any that are invalid 470 for key in kwargs: 471 if key not in sig.parameters: 472 del new_kwargs[key] 473 474 try: 475 hdu = cls(data=DELAYED, header=header, **new_kwargs) 476 except TypeError: 477 # This may happen because some HDU class (e.g. GroupsHDU) wants 478 # to set a keyword on the header, which is not possible with the 479 # _BasicHeader. While HDU classes should not need to modify the 480 # header in general, sometimes this is needed to fix it. So in 481 # this case we build a full Header and try again to create the 482 # HDU object. 483 if isinstance(header, _BasicHeader): 484 header = Header.fromstring(header_str) 485 hdu = cls(data=DELAYED, header=header, **new_kwargs) 486 else: 487 raise 488 489 # One of these may be None, depending on whether the data came from a 490 # file or a string buffer--later this will be further abstracted 491 hdu._file = hdu_fileobj 492 hdu._buffer = hdu_buffer 493 494 hdu._header_offset = header_offset # beginning of the header area 495 hdu._data_offset = data_offset # beginning of the data area 496 497 # data area size, including padding 498 size = hdu.size 499 hdu._data_size = size + _pad_length(size) 500 501 if isinstance(hdu._header, _BasicHeader): 502 # Delete the temporary _BasicHeader. 503 # We need to do this before an eventual checksum computation, 504 # since it needs to modify temporarily the header 505 # 506 # The header string is stored in the HDU._header_str attribute, 507 # so that it can be used directly when we need to create the 508 # classic Header object, without having to parse again the file. 509 del hdu._header 510 hdu._header_str = header_str 511 512 # Checksums are not checked on invalid HDU types 513 if checksum and checksum != 'remove' and isinstance(hdu, _ValidHDU): 514 hdu._verify_checksum_datasum() 515 516 return hdu 517 518 def _get_raw_data(self, shape, code, offset): 519 """ 520 Return raw array from either the HDU's memory buffer or underlying 521 file. 522 """ 523 524 if isinstance(shape, int): 525 shape = (shape,) 526 527 if self._buffer: 528 return np.ndarray(shape, dtype=code, buffer=self._buffer, 529 offset=offset) 530 elif self._file: 531 return self._file.readarray(offset=offset, dtype=code, shape=shape) 532 else: 533 return None 534 535 # TODO: Rework checksum handling so that it's not necessary to add a 536 # checksum argument here 537 # TODO: The BaseHDU class shouldn't even handle checksums since they're 538 # only implemented on _ValidHDU... 539 def _prewriteto(self, checksum=False, inplace=False): 540 self._update_pseudo_int_scale_keywords() 541 542 # Handle checksum 543 self._update_checksum(checksum) 544 545 def _update_pseudo_int_scale_keywords(self): 546 """ 547 If the data is signed int 8, unsigned int 16, 32, or 64, 548 add BSCALE/BZERO cards to header. 549 """ 550 551 if (self._has_data and self._standard and 552 _is_pseudo_integer(self.data.dtype)): 553 # CompImageHDUs need TFIELDS immediately after GCOUNT, 554 # so BSCALE has to go after TFIELDS if it exists. 555 if 'TFIELDS' in self._header: 556 self._header.set('BSCALE', 1, after='TFIELDS') 557 elif 'GCOUNT' in self._header: 558 self._header.set('BSCALE', 1, after='GCOUNT') 559 else: 560 self._header.set('BSCALE', 1) 561 self._header.set('BZERO', _pseudo_zero(self.data.dtype), 562 after='BSCALE') 563 564 def _update_checksum(self, checksum, checksum_keyword='CHECKSUM', 565 datasum_keyword='DATASUM'): 566 """Update the 'CHECKSUM' and 'DATASUM' keywords in the header (or 567 keywords with equivalent semantics given by the ``checksum_keyword`` 568 and ``datasum_keyword`` arguments--see for example ``CompImageHDU`` 569 for an example of why this might need to be overridden). 570 """ 571 572 # If the data is loaded it isn't necessarily 'modified', but we have no 573 # way of knowing for sure 574 modified = self._header._modified or self._data_loaded 575 576 if checksum == 'remove': 577 if checksum_keyword in self._header: 578 del self._header[checksum_keyword] 579 580 if datasum_keyword in self._header: 581 del self._header[datasum_keyword] 582 elif (modified or self._new or 583 (checksum and ('CHECKSUM' not in self._header or 584 'DATASUM' not in self._header or 585 not self._checksum_valid or 586 not self._datasum_valid))): 587 if checksum == 'datasum': 588 self.add_datasum(datasum_keyword=datasum_keyword) 589 elif checksum: 590 self.add_checksum(checksum_keyword=checksum_keyword, 591 datasum_keyword=datasum_keyword) 592 593 def _postwriteto(self): 594 # If data is unsigned integer 16, 32 or 64, remove the 595 # BSCALE/BZERO cards 596 if (self._has_data and self._standard and 597 _is_pseudo_integer(self.data.dtype)): 598 for keyword in ('BSCALE', 'BZERO'): 599 with suppress(KeyError): 600 del self._header[keyword] 601 602 def _writeheader(self, fileobj): 603 offset = 0 604 with suppress(AttributeError, OSError): 605 offset = fileobj.tell() 606 607 self._header.tofile(fileobj) 608 609 try: 610 size = fileobj.tell() - offset 611 except (AttributeError, OSError): 612 size = len(str(self._header)) 613 614 return offset, size 615 616 def _writedata(self, fileobj): 617 size = 0 618 fileobj.flush() 619 try: 620 offset = fileobj.tell() 621 except (AttributeError, OSError): 622 offset = 0 623 624 if self._data_loaded or self._data_needs_rescale: 625 if self.data is not None: 626 size += self._writedata_internal(fileobj) 627 # pad the FITS data block 628 # to avoid a bug in the lustre filesystem client, don't 629 # write zero-byte objects 630 if size > 0 and _pad_length(size) > 0: 631 padding = _pad_length(size) * self._padding_byte 632 # TODO: Not that this is ever likely, but if for some odd 633 # reason _padding_byte is > 0x80 this will fail; but really if 634 # somebody's custom fits format is doing that, they're doing it 635 # wrong and should be reprimanded harshly. 636 fileobj.write(padding.encode('ascii')) 637 size += len(padding) 638 else: 639 # The data has not been modified or does not need need to be 640 # rescaled, so it can be copied, unmodified, directly from an 641 # existing file or buffer 642 size += self._writedata_direct_copy(fileobj) 643 644 # flush, to make sure the content is written 645 fileobj.flush() 646 647 # return both the location and the size of the data area 648 return offset, size 649 650 def _writedata_internal(self, fileobj): 651 """ 652 The beginning and end of most _writedata() implementations are the 653 same, but the details of writing the data array itself can vary between 654 HDU types, so that should be implemented in this method. 655 656 Should return the size in bytes of the data written. 657 """ 658 659 fileobj.writearray(self.data) 660 return self.data.size * self.data.itemsize 661 662 def _writedata_direct_copy(self, fileobj): 663 """Copies the data directly from one file/buffer to the new file. 664 665 For now this is handled by loading the raw data from the existing data 666 (including any padding) via a memory map or from an already in-memory 667 buffer and using Numpy's existing file-writing facilities to write to 668 the new file. 669 670 If this proves too slow a more direct approach may be used. 671 """ 672 raw = self._get_raw_data(self._data_size, 'ubyte', self._data_offset) 673 if raw is not None: 674 fileobj.writearray(raw) 675 return raw.nbytes 676 else: 677 return 0 678 679 # TODO: This is the start of moving HDU writing out of the _File class; 680 # Though right now this is an internal private method (though still used by 681 # HDUList, eventually the plan is to have this be moved into writeto() 682 # somehow... 683 def _writeto(self, fileobj, inplace=False, copy=False): 684 try: 685 dirname = os.path.dirname(fileobj._file.name) 686 except (AttributeError, TypeError): 687 dirname = None 688 689 with _free_space_check(self, dirname): 690 self._writeto_internal(fileobj, inplace, copy) 691 692 def _writeto_internal(self, fileobj, inplace, copy): 693 # For now fileobj is assumed to be a _File object 694 if not inplace or self._new: 695 header_offset, _ = self._writeheader(fileobj) 696 data_offset, data_size = self._writedata(fileobj) 697 698 # Set the various data location attributes on newly-written HDUs 699 if self._new: 700 self._header_offset = header_offset 701 self._data_offset = data_offset 702 self._data_size = data_size 703 return 704 705 hdrloc = self._header_offset 706 hdrsize = self._data_offset - self._header_offset 707 datloc = self._data_offset 708 datsize = self._data_size 709 710 if self._header._modified: 711 # Seek to the original header location in the file 712 self._file.seek(hdrloc) 713 # This should update hdrloc with he header location in the new file 714 hdrloc, hdrsize = self._writeheader(fileobj) 715 716 # If the data is to be written below with self._writedata, that 717 # will also properly update the data location; but it should be 718 # updated here too 719 datloc = hdrloc + hdrsize 720 elif copy: 721 # Seek to the original header location in the file 722 self._file.seek(hdrloc) 723 # Before writing, update the hdrloc with the current file position, 724 # which is the hdrloc for the new file 725 hdrloc = fileobj.tell() 726 fileobj.write(self._file.read(hdrsize)) 727 # The header size is unchanged, but the data location may be 728 # different from before depending on if previous HDUs were resized 729 datloc = fileobj.tell() 730 731 if self._data_loaded: 732 if self.data is not None: 733 # Seek through the array's bases for an memmap'd array; we 734 # can't rely on the _File object to give us this info since 735 # the user may have replaced the previous mmap'd array 736 if copy or self._data_replaced: 737 # Of course, if we're copying the data to a new file 738 # we don't care about flushing the original mmap; 739 # instead just read it into the new file 740 array_mmap = None 741 else: 742 array_mmap = _get_array_mmap(self.data) 743 744 if array_mmap is not None: 745 array_mmap.flush() 746 else: 747 self._file.seek(self._data_offset) 748 datloc, datsize = self._writedata(fileobj) 749 elif copy: 750 datsize = self._writedata_direct_copy(fileobj) 751 752 self._header_offset = hdrloc 753 self._data_offset = datloc 754 self._data_size = datsize 755 self._data_replaced = False 756 757 def _close(self, closed=True): 758 # If the data was mmap'd, close the underlying mmap (this will 759 # prevent any future access to the .data attribute if there are 760 # not other references to it; if there are other references then 761 # it is up to the user to clean those up 762 if (closed and self._data_loaded and 763 _get_array_mmap(self.data) is not None): 764 del self.data 765 766 767# For backwards-compatibility, though nobody should have 768# been using this directly: 769_AllHDU = _BaseHDU 770 771# For convenience... 772# TODO: register_hdu could be made into a class decorator which would be pretty 773# cool, but only once 2.6 support is dropped. 774register_hdu = _BaseHDU.register_hdu 775unregister_hdu = _BaseHDU.unregister_hdu 776 777 778class _CorruptedHDU(_BaseHDU): 779 """ 780 A Corrupted HDU class. 781 782 This class is used when one or more mandatory `Card`s are 783 corrupted (unparsable), such as the ``BITPIX``, ``NAXIS``, or 784 ``END`` cards. A corrupted HDU usually means that the data size 785 cannot be calculated or the ``END`` card is not found. In the case 786 of a missing ``END`` card, the `Header` may also contain the binary 787 data 788 789 .. note:: 790 In future, it may be possible to decipher where the last block 791 of the `Header` ends, but this task may be difficult when the 792 extension is a `TableHDU` containing ASCII data. 793 """ 794 795 @property 796 def size(self): 797 """ 798 Returns the size (in bytes) of the HDU's data part. 799 """ 800 801 # Note: On compressed files this might report a negative size; but the 802 # file is corrupt anyways so I'm not too worried about it. 803 if self._buffer is not None: 804 return len(self._buffer) - self._data_offset 805 806 return self._file.size - self._data_offset 807 808 def _summary(self): 809 return (self.name, self.ver, 'CorruptedHDU') 810 811 def verify(self): 812 pass 813 814 815class _NonstandardHDU(_BaseHDU, _Verify): 816 """ 817 A Non-standard HDU class. 818 819 This class is used for a Primary HDU when the ``SIMPLE`` Card has 820 a value of `False`. A non-standard HDU comes from a file that 821 resembles a FITS file but departs from the standards in some 822 significant way. One example would be files where the numbers are 823 in the DEC VAX internal storage format rather than the standard 824 FITS most significant byte first. The header for this HDU should 825 be valid. The data for this HDU is read from the file as a byte 826 stream that begins at the first byte after the header ``END`` card 827 and continues until the end of the file. 828 """ 829 830 _standard = False 831 832 @classmethod 833 def match_header(cls, header): 834 """ 835 Matches any HDU that has the 'SIMPLE' keyword but is not a standard 836 Primary or Groups HDU. 837 """ 838 839 # The SIMPLE keyword must be in the first card 840 card = header.cards[0] 841 842 # The check that 'GROUPS' is missing is a bit redundant, since the 843 # match_header for GroupsHDU will always be called before this one. 844 if card.keyword == 'SIMPLE': 845 if 'GROUPS' not in header and card.value is False: 846 return True 847 else: 848 raise InvalidHDUException 849 else: 850 return False 851 852 @property 853 def size(self): 854 """ 855 Returns the size (in bytes) of the HDU's data part. 856 """ 857 858 if self._buffer is not None: 859 return len(self._buffer) - self._data_offset 860 861 return self._file.size - self._data_offset 862 863 def _writedata(self, fileobj): 864 """ 865 Differs from the base class :class:`_writedata` in that it doesn't 866 automatically add padding, and treats the data as a string of raw bytes 867 instead of an array. 868 """ 869 870 offset = 0 871 size = 0 872 873 fileobj.flush() 874 try: 875 offset = fileobj.tell() 876 except OSError: 877 offset = 0 878 879 if self.data is not None: 880 fileobj.write(self.data) 881 # flush, to make sure the content is written 882 fileobj.flush() 883 size = len(self.data) 884 885 # return both the location and the size of the data area 886 return offset, size 887 888 def _summary(self): 889 return (self.name, self.ver, 'NonstandardHDU', len(self._header)) 890 891 @lazyproperty 892 def data(self): 893 """ 894 Return the file data. 895 """ 896 897 return self._get_raw_data(self.size, 'ubyte', self._data_offset) 898 899 def _verify(self, option='warn'): 900 errs = _ErrList([], unit='Card') 901 902 # verify each card 903 for card in self._header.cards: 904 errs.append(card._verify(option)) 905 906 return errs 907 908 909class _ValidHDU(_BaseHDU, _Verify): 910 """ 911 Base class for all HDUs which are not corrupted. 912 """ 913 914 def __init__(self, data=None, header=None, name=None, ver=None, **kwargs): 915 super().__init__(data=data, header=header) 916 917 if (header is not None and 918 not isinstance(header, (Header, _BasicHeader))): 919 # TODO: Instead maybe try initializing a new Header object from 920 # whatever is passed in as the header--there are various types 921 # of objects that could work for this... 922 raise ValueError('header must be a Header object') 923 924 # NOTE: private data members _checksum and _datasum are used by the 925 # utility script "fitscheck" to detect missing checksums. 926 self._checksum = None 927 self._checksum_valid = None 928 self._datasum = None 929 self._datasum_valid = None 930 931 if name is not None: 932 self.name = name 933 if ver is not None: 934 self.ver = ver 935 936 @classmethod 937 def match_header(cls, header): 938 """ 939 Matches any HDU that is not recognized as having either the SIMPLE or 940 XTENSION keyword in its header's first card, but is nonetheless not 941 corrupted. 942 943 TODO: Maybe it would make more sense to use _NonstandardHDU in this 944 case? Not sure... 945 """ 946 947 return first(header.keys()) not in ('SIMPLE', 'XTENSION') 948 949 @property 950 def size(self): 951 """ 952 Size (in bytes) of the data portion of the HDU. 953 """ 954 955 size = 0 956 naxis = self._header.get('NAXIS', 0) 957 if naxis > 0: 958 size = 1 959 for idx in range(naxis): 960 size = size * self._header['NAXIS' + str(idx + 1)] 961 bitpix = self._header['BITPIX'] 962 gcount = self._header.get('GCOUNT', 1) 963 pcount = self._header.get('PCOUNT', 0) 964 size = abs(bitpix) * gcount * (pcount + size) // 8 965 return size 966 967 def filebytes(self): 968 """ 969 Calculates and returns the number of bytes that this HDU will write to 970 a file. 971 """ 972 973 f = _File() 974 # TODO: Fix this once new HDU writing API is settled on 975 return self._writeheader(f)[1] + self._writedata(f)[1] 976 977 def fileinfo(self): 978 """ 979 Returns a dictionary detailing information about the locations 980 of this HDU within any associated file. The values are only 981 valid after a read or write of the associated file with no 982 intervening changes to the `HDUList`. 983 984 Returns 985 ------- 986 dict or None 987 988 The dictionary details information about the locations of 989 this HDU within an associated file. Returns `None` when 990 the HDU is not associated with a file. 991 992 Dictionary contents: 993 994 ========== ================================================ 995 Key Value 996 ========== ================================================ 997 file File object associated with the HDU 998 filemode Mode in which the file was opened (readonly, copyonwrite, 999 update, append, ostream) 1000 hdrLoc Starting byte location of header in file 1001 datLoc Starting byte location of data block in file 1002 datSpan Data size including padding 1003 ========== ================================================ 1004 """ 1005 1006 if hasattr(self, '_file') and self._file: 1007 return {'file': self._file, 'filemode': self._file.mode, 1008 'hdrLoc': self._header_offset, 'datLoc': self._data_offset, 1009 'datSpan': self._data_size} 1010 else: 1011 return None 1012 1013 def copy(self): 1014 """ 1015 Make a copy of the HDU, both header and data are copied. 1016 """ 1017 1018 if self.data is not None: 1019 data = self.data.copy() 1020 else: 1021 data = None 1022 return self.__class__(data=data, header=self._header.copy()) 1023 1024 def _verify(self, option='warn'): 1025 errs = _ErrList([], unit='Card') 1026 1027 is_valid = BITPIX2DTYPE.__contains__ 1028 1029 # Verify location and value of mandatory keywords. 1030 # Do the first card here, instead of in the respective HDU classes, so 1031 # the checking is in order, in case of required cards in wrong order. 1032 if isinstance(self, ExtensionHDU): 1033 firstkey = 'XTENSION' 1034 firstval = self._extension 1035 else: 1036 firstkey = 'SIMPLE' 1037 firstval = True 1038 1039 self.req_cards(firstkey, 0, None, firstval, option, errs) 1040 self.req_cards('BITPIX', 1, lambda v: (_is_int(v) and is_valid(v)), 8, 1041 option, errs) 1042 self.req_cards('NAXIS', 2, 1043 lambda v: (_is_int(v) and 0 <= v <= 999), 0, 1044 option, errs) 1045 1046 naxis = self._header.get('NAXIS', 0) 1047 if naxis < 1000: 1048 for ax in range(3, naxis + 3): 1049 key = 'NAXIS' + str(ax - 2) 1050 self.req_cards(key, ax, 1051 lambda v: (_is_int(v) and v >= 0), 1052 _extract_number(self._header[key], default=1), 1053 option, errs) 1054 1055 # Remove NAXISj cards where j is not in range 1, naxis inclusive. 1056 for keyword in self._header: 1057 if keyword.startswith('NAXIS') and len(keyword) > 5: 1058 try: 1059 number = int(keyword[5:]) 1060 if number <= 0 or number > naxis: 1061 raise ValueError 1062 except ValueError: 1063 err_text = ("NAXISj keyword out of range ('{}' when " 1064 "NAXIS == {})".format(keyword, naxis)) 1065 1066 def fix(self=self, keyword=keyword): 1067 del self._header[keyword] 1068 1069 errs.append( 1070 self.run_option(option=option, err_text=err_text, 1071 fix=fix, fix_text="Deleted.")) 1072 1073 # Verify that the EXTNAME keyword exists and is a string 1074 if 'EXTNAME' in self._header: 1075 if not isinstance(self._header['EXTNAME'], str): 1076 err_text = 'The EXTNAME keyword must have a string value.' 1077 fix_text = 'Converted the EXTNAME keyword to a string value.' 1078 1079 def fix(header=self._header): 1080 header['EXTNAME'] = str(header['EXTNAME']) 1081 1082 errs.append(self.run_option(option, err_text=err_text, 1083 fix_text=fix_text, fix=fix)) 1084 1085 # verify each card 1086 for card in self._header.cards: 1087 errs.append(card._verify(option)) 1088 1089 return errs 1090 1091 # TODO: Improve this API a little bit--for one, most of these arguments 1092 # could be optional 1093 def req_cards(self, keyword, pos, test, fix_value, option, errlist): 1094 """ 1095 Check the existence, location, and value of a required `Card`. 1096 1097 Parameters 1098 ---------- 1099 keyword : str 1100 The keyword to validate 1101 1102 pos : int, callable 1103 If an ``int``, this specifies the exact location this card should 1104 have in the header. Remember that Python is zero-indexed, so this 1105 means ``pos=0`` requires the card to be the first card in the 1106 header. If given a callable, it should take one argument--the 1107 actual position of the keyword--and return `True` or `False`. This 1108 can be used for custom evaluation. For example if 1109 ``pos=lambda idx: idx > 10`` this will check that the keyword's 1110 index is greater than 10. 1111 1112 test : callable 1113 This should be a callable (generally a function) that is passed the 1114 value of the given keyword and returns `True` or `False`. This can 1115 be used to validate the value associated with the given keyword. 1116 1117 fix_value : str, int, float, complex, bool, None 1118 A valid value for a FITS keyword to to use if the given ``test`` 1119 fails to replace an invalid value. In other words, this provides 1120 a default value to use as a replacement if the keyword's current 1121 value is invalid. If `None`, there is no replacement value and the 1122 keyword is unfixable. 1123 1124 option : str 1125 Output verification option. Must be one of ``"fix"``, 1126 ``"silentfix"``, ``"ignore"``, ``"warn"``, or 1127 ``"exception"``. May also be any combination of ``"fix"`` or 1128 ``"silentfix"`` with ``"+ignore"``, ``+warn``, or ``+exception" 1129 (e.g. ``"fix+warn"``). See :ref:`astropy:verify` for more info. 1130 1131 errlist : list 1132 A list of validation errors already found in the FITS file; this is 1133 used primarily for the validation system to collect errors across 1134 multiple HDUs and multiple calls to `req_cards`. 1135 1136 Notes 1137 ----- 1138 If ``pos=None``, the card can be anywhere in the header. If the card 1139 does not exist, the new card will have the ``fix_value`` as its value 1140 when created. Also check the card's value by using the ``test`` 1141 argument. 1142 """ 1143 1144 errs = errlist 1145 fix = None 1146 1147 try: 1148 index = self._header.index(keyword) 1149 except ValueError: 1150 index = None 1151 1152 fixable = fix_value is not None 1153 1154 insert_pos = len(self._header) + 1 1155 1156 # If pos is an int, insert at the given position (and convert it to a 1157 # lambda) 1158 if _is_int(pos): 1159 insert_pos = pos 1160 pos = lambda x: x == insert_pos 1161 1162 # if the card does not exist 1163 if index is None: 1164 err_text = f"'{keyword}' card does not exist." 1165 fix_text = f"Fixed by inserting a new '{keyword}' card." 1166 if fixable: 1167 # use repr to accommodate both string and non-string types 1168 # Boolean is also OK in this constructor 1169 card = (keyword, fix_value) 1170 1171 def fix(self=self, insert_pos=insert_pos, card=card): 1172 self._header.insert(insert_pos, card) 1173 1174 errs.append(self.run_option(option, err_text=err_text, 1175 fix_text=fix_text, fix=fix, fixable=fixable)) 1176 else: 1177 # if the supposed location is specified 1178 if pos is not None: 1179 if not pos(index): 1180 err_text = f"'{keyword}' card at the wrong place (card {index})." 1181 fix_text = f"Fixed by moving it to the right place (card {insert_pos})." 1182 1183 def fix(self=self, index=index, insert_pos=insert_pos): 1184 card = self._header.cards[index] 1185 del self._header[index] 1186 self._header.insert(insert_pos, card) 1187 1188 errs.append(self.run_option(option, err_text=err_text, 1189 fix_text=fix_text, fix=fix)) 1190 1191 # if value checking is specified 1192 if test: 1193 val = self._header[keyword] 1194 if not test(val): 1195 err_text = f"'{keyword}' card has invalid value '{val}'." 1196 fix_text = f"Fixed by setting a new value '{fix_value}'." 1197 1198 if fixable: 1199 def fix(self=self, keyword=keyword, val=fix_value): 1200 self._header[keyword] = fix_value 1201 1202 errs.append(self.run_option(option, err_text=err_text, 1203 fix_text=fix_text, fix=fix, fixable=fixable)) 1204 1205 return errs 1206 1207 def add_datasum(self, when=None, datasum_keyword='DATASUM'): 1208 """ 1209 Add the ``DATASUM`` card to this HDU with the value set to the 1210 checksum calculated for the data. 1211 1212 Parameters 1213 ---------- 1214 when : str, optional 1215 Comment string for the card that by default represents the 1216 time when the checksum was calculated 1217 1218 datasum_keyword : str, optional 1219 The name of the header keyword to store the datasum value in; 1220 this is typically 'DATASUM' per convention, but there exist 1221 use cases in which a different keyword should be used 1222 1223 Returns 1224 ------- 1225 checksum : int 1226 The calculated datasum 1227 1228 Notes 1229 ----- 1230 For testing purposes, provide a ``when`` argument to enable the comment 1231 value in the card to remain consistent. This will enable the 1232 generation of a ``CHECKSUM`` card with a consistent value. 1233 """ 1234 1235 cs = self._calculate_datasum() 1236 1237 if when is None: 1238 when = f'data unit checksum updated {self._get_timestamp()}' 1239 1240 self._header[datasum_keyword] = (str(cs), when) 1241 return cs 1242 1243 def add_checksum(self, when=None, override_datasum=False, 1244 checksum_keyword='CHECKSUM', datasum_keyword='DATASUM'): 1245 """ 1246 Add the ``CHECKSUM`` and ``DATASUM`` cards to this HDU with 1247 the values set to the checksum calculated for the HDU and the 1248 data respectively. The addition of the ``DATASUM`` card may 1249 be overridden. 1250 1251 Parameters 1252 ---------- 1253 when : str, optional 1254 comment string for the cards; by default the comments 1255 will represent the time when the checksum was calculated 1256 1257 override_datasum : bool, optional 1258 add the ``CHECKSUM`` card only 1259 1260 checksum_keyword : str, optional 1261 The name of the header keyword to store the checksum value in; this 1262 is typically 'CHECKSUM' per convention, but there exist use cases 1263 in which a different keyword should be used 1264 1265 datasum_keyword : str, optional 1266 See ``checksum_keyword`` 1267 1268 Notes 1269 ----- 1270 For testing purposes, first call `add_datasum` with a ``when`` 1271 argument, then call `add_checksum` with a ``when`` argument and 1272 ``override_datasum`` set to `True`. This will provide consistent 1273 comments for both cards and enable the generation of a ``CHECKSUM`` 1274 card with a consistent value. 1275 """ 1276 1277 if not override_datasum: 1278 # Calculate and add the data checksum to the header. 1279 data_cs = self.add_datasum(when, datasum_keyword=datasum_keyword) 1280 else: 1281 # Just calculate the data checksum 1282 data_cs = self._calculate_datasum() 1283 1284 if when is None: 1285 when = f'HDU checksum updated {self._get_timestamp()}' 1286 1287 # Add the CHECKSUM card to the header with a value of all zeros. 1288 if datasum_keyword in self._header: 1289 self._header.set(checksum_keyword, '0' * 16, when, 1290 before=datasum_keyword) 1291 else: 1292 self._header.set(checksum_keyword, '0' * 16, when) 1293 1294 csum = self._calculate_checksum(data_cs, 1295 checksum_keyword=checksum_keyword) 1296 self._header[checksum_keyword] = csum 1297 1298 def verify_datasum(self): 1299 """ 1300 Verify that the value in the ``DATASUM`` keyword matches the value 1301 calculated for the ``DATASUM`` of the current HDU data. 1302 1303 Returns 1304 ------- 1305 valid : int 1306 - 0 - failure 1307 - 1 - success 1308 - 2 - no ``DATASUM`` keyword present 1309 """ 1310 1311 if 'DATASUM' in self._header: 1312 datasum = self._calculate_datasum() 1313 if datasum == int(self._header['DATASUM']): 1314 return 1 1315 else: 1316 # Failed 1317 return 0 1318 else: 1319 return 2 1320 1321 def verify_checksum(self): 1322 """ 1323 Verify that the value in the ``CHECKSUM`` keyword matches the 1324 value calculated for the current HDU CHECKSUM. 1325 1326 Returns 1327 ------- 1328 valid : int 1329 - 0 - failure 1330 - 1 - success 1331 - 2 - no ``CHECKSUM`` keyword present 1332 """ 1333 1334 if 'CHECKSUM' in self._header: 1335 if 'DATASUM' in self._header: 1336 datasum = self._calculate_datasum() 1337 else: 1338 datasum = 0 1339 checksum = self._calculate_checksum(datasum) 1340 if checksum == self._header['CHECKSUM']: 1341 return 1 1342 else: 1343 # Failed 1344 return 0 1345 else: 1346 return 2 1347 1348 def _verify_checksum_datasum(self): 1349 """ 1350 Verify the checksum/datasum values if the cards exist in the header. 1351 Simply displays warnings if either the checksum or datasum don't match. 1352 """ 1353 1354 if 'CHECKSUM' in self._header: 1355 self._checksum = self._header['CHECKSUM'] 1356 self._checksum_valid = self.verify_checksum() 1357 if not self._checksum_valid: 1358 warnings.warn( 1359 'Checksum verification failed for HDU {}.\n'.format( 1360 (self.name, self.ver)), AstropyUserWarning) 1361 1362 if 'DATASUM' in self._header: 1363 self._datasum = self._header['DATASUM'] 1364 self._datasum_valid = self.verify_datasum() 1365 if not self._datasum_valid: 1366 warnings.warn( 1367 'Datasum verification failed for HDU {}.\n'.format( 1368 (self.name, self.ver)), AstropyUserWarning) 1369 1370 def _get_timestamp(self): 1371 """ 1372 Return the current timestamp in ISO 8601 format, with microseconds 1373 stripped off. 1374 1375 Ex.: 2007-05-30T19:05:11 1376 """ 1377 1378 return datetime.datetime.now().isoformat()[:19] 1379 1380 def _calculate_datasum(self): 1381 """ 1382 Calculate the value for the ``DATASUM`` card in the HDU. 1383 """ 1384 1385 if not self._data_loaded: 1386 # This is the case where the data has not been read from the file 1387 # yet. We find the data in the file, read it, and calculate the 1388 # datasum. 1389 if self.size > 0: 1390 raw_data = self._get_raw_data(self._data_size, 'ubyte', 1391 self._data_offset) 1392 return self._compute_checksum(raw_data) 1393 else: 1394 return 0 1395 elif self.data is not None: 1396 return self._compute_checksum(self.data.view('ubyte')) 1397 else: 1398 return 0 1399 1400 def _calculate_checksum(self, datasum, checksum_keyword='CHECKSUM'): 1401 """ 1402 Calculate the value of the ``CHECKSUM`` card in the HDU. 1403 """ 1404 1405 old_checksum = self._header[checksum_keyword] 1406 self._header[checksum_keyword] = '0' * 16 1407 1408 # Convert the header to bytes. 1409 s = self._header.tostring().encode('utf8') 1410 1411 # Calculate the checksum of the Header and data. 1412 cs = self._compute_checksum(np.frombuffer(s, dtype='ubyte'), datasum) 1413 1414 # Encode the checksum into a string. 1415 s = self._char_encode(~cs) 1416 1417 # Return the header card value. 1418 self._header[checksum_keyword] = old_checksum 1419 1420 return s 1421 1422 def _compute_checksum(self, data, sum32=0): 1423 """ 1424 Compute the ones-complement checksum of a sequence of bytes. 1425 1426 Parameters 1427 ---------- 1428 data 1429 a memory region to checksum 1430 1431 sum32 1432 incremental checksum value from another region 1433 1434 Returns 1435 ------- 1436 ones complement checksum 1437 """ 1438 1439 blocklen = 2880 1440 sum32 = np.uint32(sum32) 1441 for i in range(0, len(data), blocklen): 1442 length = min(blocklen, len(data) - i) # ???? 1443 sum32 = self._compute_hdu_checksum(data[i:i + length], sum32) 1444 return sum32 1445 1446 def _compute_hdu_checksum(self, data, sum32=0): 1447 """ 1448 Translated from FITS Checksum Proposal by Seaman, Pence, and Rots. 1449 Use uint32 literals as a hedge against type promotion to int64. 1450 1451 This code should only be called with blocks of 2880 bytes 1452 Longer blocks result in non-standard checksums with carry overflow 1453 Historically, this code *was* called with larger blocks and for that 1454 reason still needs to be for backward compatibility. 1455 """ 1456 1457 u8 = np.uint32(8) 1458 u16 = np.uint32(16) 1459 uFFFF = np.uint32(0xFFFF) 1460 1461 if data.nbytes % 2: 1462 last = data[-1] 1463 data = data[:-1] 1464 else: 1465 last = np.uint32(0) 1466 1467 data = data.view('>u2') 1468 1469 hi = sum32 >> u16 1470 lo = sum32 & uFFFF 1471 hi += np.add.reduce(data[0::2], dtype=np.uint64) 1472 lo += np.add.reduce(data[1::2], dtype=np.uint64) 1473 1474 if (data.nbytes // 2) % 2: 1475 lo += last << u8 1476 else: 1477 hi += last << u8 1478 1479 hicarry = hi >> u16 1480 locarry = lo >> u16 1481 1482 while hicarry or locarry: 1483 hi = (hi & uFFFF) + locarry 1484 lo = (lo & uFFFF) + hicarry 1485 hicarry = hi >> u16 1486 locarry = lo >> u16 1487 1488 return (hi << u16) + lo 1489 1490 # _MASK and _EXCLUDE used for encoding the checksum value into a character 1491 # string. 1492 _MASK = [0xFF000000, 1493 0x00FF0000, 1494 0x0000FF00, 1495 0x000000FF] 1496 1497 _EXCLUDE = [0x3a, 0x3b, 0x3c, 0x3d, 0x3e, 0x3f, 0x40, 1498 0x5b, 0x5c, 0x5d, 0x5e, 0x5f, 0x60] 1499 1500 def _encode_byte(self, byte): 1501 """ 1502 Encode a single byte. 1503 """ 1504 1505 quotient = byte // 4 + ord('0') 1506 remainder = byte % 4 1507 1508 ch = np.array( 1509 [(quotient + remainder), quotient, quotient, quotient], 1510 dtype='int32') 1511 1512 check = True 1513 while check: 1514 check = False 1515 for x in self._EXCLUDE: 1516 for j in [0, 2]: 1517 if ch[j] == x or ch[j + 1] == x: 1518 ch[j] += 1 1519 ch[j + 1] -= 1 1520 check = True 1521 return ch 1522 1523 def _char_encode(self, value): 1524 """ 1525 Encodes the checksum ``value`` using the algorithm described 1526 in SPR section A.7.2 and returns it as a 16 character string. 1527 1528 Parameters 1529 ---------- 1530 value 1531 a checksum 1532 1533 Returns 1534 ------- 1535 ascii encoded checksum 1536 """ 1537 1538 value = np.uint32(value) 1539 1540 asc = np.zeros((16,), dtype='byte') 1541 ascii = np.zeros((16,), dtype='byte') 1542 1543 for i in range(4): 1544 byte = (value & self._MASK[i]) >> ((3 - i) * 8) 1545 ch = self._encode_byte(byte) 1546 for j in range(4): 1547 asc[4 * j + i] = ch[j] 1548 1549 for i in range(16): 1550 ascii[i] = asc[(i + 15) % 16] 1551 1552 return decode_ascii(ascii.tobytes()) 1553 1554 1555class ExtensionHDU(_ValidHDU): 1556 """ 1557 An extension HDU class. 1558 1559 This class is the base class for the `TableHDU`, `ImageHDU`, and 1560 `BinTableHDU` classes. 1561 """ 1562 1563 _extension = '' 1564 1565 @classmethod 1566 def match_header(cls, header): 1567 """ 1568 This class should never be instantiated directly. Either a standard 1569 extension HDU type should be used for a specific extension, or 1570 NonstandardExtHDU should be used. 1571 """ 1572 1573 raise NotImplementedError 1574 1575 @deprecated_renamed_argument('clobber', 'overwrite', '2.0', 1576 message='"clobber" was deprecated in version ' 1577 '2.0 and will be removed in version ' 1578 '5.1. Use argument "overwrite" ' 1579 'instead.') 1580 def writeto(self, name, output_verify='exception', overwrite=False, 1581 checksum=False): 1582 """ 1583 Works similarly to the normal writeto(), but prepends a default 1584 `PrimaryHDU` are required by extension HDUs (which cannot stand on 1585 their own). 1586 1587 .. versionchanged:: 1.3 1588 ``overwrite`` replaces the deprecated ``clobber`` argument. 1589 """ 1590 1591 from .hdulist import HDUList 1592 from .image import PrimaryHDU 1593 1594 hdulist = HDUList([PrimaryHDU(), self]) 1595 hdulist.writeto(name, output_verify, overwrite=overwrite, 1596 checksum=checksum) 1597 1598 def _verify(self, option='warn'): 1599 1600 errs = super()._verify(option=option) 1601 1602 # Verify location and value of mandatory keywords. 1603 naxis = self._header.get('NAXIS', 0) 1604 self.req_cards('PCOUNT', naxis + 3, lambda v: (_is_int(v) and v >= 0), 1605 0, option, errs) 1606 self.req_cards('GCOUNT', naxis + 4, lambda v: (_is_int(v) and v == 1), 1607 1, option, errs) 1608 1609 return errs 1610 1611 1612# For backwards compatibility, though this needs to be deprecated 1613# TODO: Mark this as deprecated 1614_ExtensionHDU = ExtensionHDU 1615 1616 1617class NonstandardExtHDU(ExtensionHDU): 1618 """ 1619 A Non-standard Extension HDU class. 1620 1621 This class is used for an Extension HDU when the ``XTENSION`` 1622 `Card` has a non-standard value. In this case, Astropy can figure 1623 out how big the data is but not what it is. The data for this HDU 1624 is read from the file as a byte stream that begins at the first 1625 byte after the header ``END`` card and continues until the 1626 beginning of the next header or the end of the file. 1627 """ 1628 1629 _standard = False 1630 1631 @classmethod 1632 def match_header(cls, header): 1633 """ 1634 Matches any extension HDU that is not one of the standard extension HDU 1635 types. 1636 """ 1637 1638 card = header.cards[0] 1639 xtension = card.value 1640 if isinstance(xtension, str): 1641 xtension = xtension.rstrip() 1642 # A3DTABLE is not really considered a 'standard' extension, as it was 1643 # sort of the prototype for BINTABLE; however, since our BINTABLE 1644 # implementation handles A3DTABLE HDUs it is listed here. 1645 standard_xtensions = ('IMAGE', 'TABLE', 'BINTABLE', 'A3DTABLE') 1646 # The check that xtension is not one of the standard types should be 1647 # redundant. 1648 return (card.keyword == 'XTENSION' and 1649 xtension not in standard_xtensions) 1650 1651 def _summary(self): 1652 axes = tuple(self.data.shape) 1653 return (self.name, self.ver, 'NonstandardExtHDU', len(self._header), axes) 1654 1655 @lazyproperty 1656 def data(self): 1657 """ 1658 Return the file data. 1659 """ 1660 1661 return self._get_raw_data(self.size, 'ubyte', self._data_offset) 1662 1663 1664# TODO: Mark this as deprecated 1665_NonstandardExtHDU = NonstandardExtHDU 1666