# Licensed under a 3-clause BSD style license - see PYFITS.rst

import gzip
import itertools
import io
import mmap
import operator
import os
import platform
import signal
import sys
import tempfile
import textwrap
import threading
import warnings
import weakref
from contextlib import contextmanager, suppress
from functools import wraps

from astropy.utils import data

try:
    from distutils.version import LooseVersion
except ImportError:
    # distutils was removed from the standard library in Python 3.12
    # (PEP 632).  Nothing in this module needs it any more; the name is only
    # kept so that existing ``from ... import LooseVersion`` users still work
    # on interpreters that ship distutils.
    LooseVersion = None

import numpy as np

from astropy.utils.exceptions import AstropyUserWarning

path_like = (str, os.PathLike)


def cmp(a, b):
    """Three-way comparison: return -1, 0 or 1 for a < b, a == b, a > b."""
    return (a > b) - (a < b)


all_integer_types = (int, np.integer)


class NotifierMixin:
    """
    Mixin class that provides services by which objects can register
    listeners to changes on that object.

    All methods provided by this class are underscored, since this is intended
    for internal use to communicate between classes in a generic way, and is
    not machinery that should be exposed to users of the classes involved.

    Use the ``_add_listener`` method to register a listener on an instance of
    the notifier.  This registers the listener with a weak reference, so if
    no other references to the listener exist it is automatically dropped from
    the list and does not need to be manually removed.

    Call the ``_notify`` method on the notifier to update all listeners
    upon changes.  ``_notify('change_type', *args, **kwargs)`` results
    in calling ``listener._update_change_type(*args, **kwargs)`` on all
    listeners subscribed to that notifier.

    If a particular listener does not have the appropriate update method
    it is ignored.

    Examples
    --------

    >>> class Widget(NotifierMixin):
    ...     state = 1
    ...     def __init__(self, name):
    ...         self.name = name
    ...     def update_state(self):
    ...         self.state += 1
    ...         self._notify('widget_state_changed', self)
    ...
    >>> class WidgetListener:
    ...     def _update_widget_state_changed(self, widget):
    ...         print('Widget {0} changed state to {1}'.format(
    ...             widget.name, widget.state))
    ...
    >>> widget = Widget('fred')
    >>> listener = WidgetListener()
    >>> widget._add_listener(listener)
    >>> widget.update_state()
    Widget fred changed state to 2
    """

    # Lazily created ``weakref.WeakValueDictionary`` mapping ``id(listener)``
    # to the listener; ``None`` until the first listener is registered.
    _listeners = None

    def _add_listener(self, listener):
        """
        Add an object to the list of listeners to notify of changes to this
        object.  This adds a weakref to the list of listeners that is
        removed from the listeners list when the listener has no other
        references to it.
        """

        if self._listeners is None:
            self._listeners = weakref.WeakValueDictionary()

        self._listeners[id(listener)] = listener

    def _remove_listener(self, listener):
        """
        Removes the specified listener from the listeners list.  This relies
        on object identity (i.e. the ``is`` operator).
        """

        if self._listeners is None:
            return

        with suppress(KeyError):
            del self._listeners[id(listener)]

    def _notify(self, notification, *args, **kwargs):
        """
        Notify all listeners of some particular state change by calling their
        ``_update_<notification>`` method with the given ``*args`` and
        ``**kwargs``.

        The notification does not by default include the object that actually
        changed (``self``), but it certainly may if required.
        """

        if self._listeners is None:
            return

        method_name = f'_update_{notification}'
        for listener in self._listeners.valuerefs():
            # Use valuerefs instead of itervaluerefs; see
            # https://github.com/astropy/astropy/issues/4015
            listener = listener()  # dereference weakref
            if listener is None:
                continue

            if hasattr(listener, method_name):
                method = getattr(listener, method_name)
                if callable(method):
                    method(*args, **kwargs)

    def __getstate__(self):
        """
        Exclude listeners when saving the listener's state, since they may be
        ephemeral.
        """

        # TODO: This hasn't come up often, but if anyone needs to pickle HDU
        # objects it will be necessary when HDU objects' states are restored to
        # re-register themselves as listeners on their new column instances.
        try:
            state = super().__getstate__()
        except AttributeError:
            # Chances are the super object doesn't have a getstate
            state = self.__dict__

        # ``object.__getstate__`` (Python 3.11+) hands back the live
        # instance ``__dict__`` -- or ``None`` when it is empty -- so always
        # work on a copy; otherwise pickling would silently drop the
        # listeners of the object being pickled.
        if state is None:
            state = {}
        elif isinstance(state, dict):
            state = state.copy()

        state['_listeners'] = None
        return state


def first(iterable):
    """
    Returns the first item returned by iterating over an iterable object.

    Example:

    >>> a = [1, 2, 3]
    >>> first(a)
    1
    """

    return next(iter(iterable))


def itersubclasses(cls, _seen=None):
    """
    Generator over all subclasses of a given class, in depth first order.

    >>> class A: pass
    >>> class B(A): pass
    >>> class C(A): pass
    >>> class D(B,C): pass
    >>> class E(D): pass
    >>>
    >>> for cls in itersubclasses(A):
    ...     print(cls.__name__)
    B
    D
    E
    C
    >>> # get ALL classes currently defined
    >>> [cls.__name__ for cls in itersubclasses(object)]
    [...'tuple', ...'type', ...]

    From http://code.activestate.com/recipes/576949/
    """

    if _seen is None:
        _seen = set()
    try:
        subs = cls.__subclasses__()
    except TypeError:  # fails only when cls is type
        subs = cls.__subclasses__(cls)
    for sub in sorted(subs, key=operator.attrgetter('__name__')):
        if sub not in _seen:
            _seen.add(sub)
            yield sub
            yield from itersubclasses(sub, _seen)


def ignore_sigint(func):
    """
    This decorator registers a custom SIGINT handler to catch and ignore SIGINT
    until the wrapped function is completed.

    The handler is only installed when the call happens in a single-threaded
    application on the main thread.  If a SIGINT arrived while the wrapped
    function was running, `KeyboardInterrupt` is raised once it has finished;
    otherwise the wrapped function's return value is passed through.
    """

    @wraps(func)
    def wrapped(*args, **kwargs):
        # Get the name of the current thread and determine if this is a single
        # threaded application
        curr_thread = threading.current_thread()
        single_thread = (threading.active_count() == 1 and
                         curr_thread.name == 'MainThread')

        class SigintHandler:
            def __init__(self):
                self.sigint_received = False

            def __call__(self, signum, frame):
                warnings.warn('KeyboardInterrupt ignored until {} is '
                              'complete!'.format(func.__name__),
                              AstropyUserWarning)
                self.sigint_received = True

        sigint_handler = SigintHandler()

        # Define new signal interrupt handler
        if single_thread:
            # Install new handler
            old_handler = signal.signal(signal.SIGINT, sigint_handler)

        try:
            return func(*args, **kwargs)
        finally:
            if single_thread:
                # ``signal.signal`` returns None when the previous handler
                # was not installed from Python; fall back to the default.
                if old_handler is not None:
                    signal.signal(signal.SIGINT, old_handler)
                else:
                    signal.signal(signal.SIGINT, signal.SIG_DFL)

            # Raising here (inside ``finally``) deliberately takes precedence
            # over the pending return value.
            if sigint_handler.sigint_received:
                raise KeyboardInterrupt

    return wrapped


def pairwise(iterable):
    """Return the items of an iterable paired with its next item.

    Ex: s -> (s0,s1), (s1,s2), (s2,s3), ....
    """

    a, b = itertools.tee(iterable)
    # Advance b by one item; the default avoids StopIteration if b is empty
    next(b, None)
    return zip(a, b)


def encode_ascii(s):
    """
    Encode a `str`, or a Numpy unicode string array, to ASCII bytes.

    Byte strings and byte-string arrays are returned unchanged.  Arrays are
    returned as the same array subclass and keep the same number of
    characters per element.

    Raises
    ------
    TypeError
        If ``s`` is a Numpy array that holds neither unicode nor byte
        strings.
    """

    if isinstance(s, str):
        return s.encode('ascii')
    elif (isinstance(s, np.ndarray) and
          issubclass(s.dtype.type, np.str_)):
        ns = np.char.encode(s, 'ascii').view(type(s))
        # Unicode arrays use 4 bytes per character.  The width has to be an
        # integer (hence ``//``) to be usable as a dtype size.
        width = s.dtype.itemsize // 4
        if ns.dtype.itemsize != width:
            ns = ns.astype((np.bytes_, width))
        return ns
    elif (isinstance(s, np.ndarray) and
          not issubclass(s.dtype.type, np.bytes_)):
        raise TypeError('string operation on non-string array')
    return s


def decode_ascii(s):
    """
    Decode `bytes`, or a Numpy byte-string array, from ASCII to unicode.

    Non-ASCII bytes in a `bytes` object are replaced by ``'?'`` and an
    `~astropy.utils.exceptions.AstropyUserWarning` is emitted.  Unicode
    strings and unicode arrays are returned unchanged.

    Raises
    ------
    TypeError
        If ``s`` is a Numpy array that holds neither byte nor unicode
        strings.
    """

    if isinstance(s, bytes):
        try:
            return s.decode('ascii')
        except UnicodeDecodeError:
            warnings.warn('non-ASCII characters are present in the FITS '
                          'file header and have been replaced by "?" '
                          'characters', AstropyUserWarning)
            s = s.decode('ascii', errors='replace')
            return s.replace('\ufffd', '?')
    elif (isinstance(s, np.ndarray) and
          issubclass(s.dtype.type, np.bytes_)):
        # np.char.encode/decode annoyingly don't preserve the type of the
        # array, hence the view() call
        # It also doesn't necessarily preserve widths of the strings,
        # hence the astype()
        if s.size == 0:
            # Numpy apparently also has a bug that if a string array is
            # empty calling np.char.decode on it returns an empty float64
            # array
            dt = s.dtype.str.replace('S', 'U')
            ns = np.array([], dtype=dt).view(type(s))
        else:
            ns = np.char.decode(s, 'ascii').view(type(s))
        if ns.dtype.itemsize / 4 != s.dtype.itemsize:
            ns = ns.astype((np.str_, s.dtype.itemsize))
        return ns
    elif (isinstance(s, np.ndarray) and
          not issubclass(s.dtype.type, np.str_)):
        # Don't silently pass through on non-string arrays; we don't want
        # to hide errors where things that are not stringy are attempting
        # to be decoded
        raise TypeError('string operation on non-string array')
    return s


def isreadable(f):
    """
    Returns True if the file-like object can be read from.  This is a common-
    sense approximation of io.IOBase.readable.
    """

    if hasattr(f, 'readable'):
        return f.readable()

    if hasattr(f, 'closed') and f.closed:
        # This mimics the behavior of io.IOBase.readable
        raise ValueError('I/O operation on closed file')

    if not hasattr(f, 'read'):
        return False

    if hasattr(f, 'mode') and not any(c in f.mode for c in 'r+'):
        return False

    # Not closed, has a 'read()' method, and either has no known mode or a
    # readable mode--should be good enough to assume 'readable'
    return True


def iswritable(f):
    """
    Returns True if the file-like object can be written to.  This is a common-
    sense approximation of io.IOBase.writable.
    """

    if hasattr(f, 'writable'):
        return f.writable()

    if hasattr(f, 'closed') and f.closed:
        # This mimics the behavior of io.IOBase.writable
        raise ValueError('I/O operation on closed file')

    if not hasattr(f, 'write'):
        return False

    if hasattr(f, 'mode') and not any(c in f.mode for c in 'wa+'):
        return False

    # Not closed, has a 'write()' method, and either has no known mode or a
    # mode that supports writing--should be good enough to assume 'writable'
    return True


def isfile(f):
    """
    Returns True if the given object represents an OS-level file (that is,
    ``isinstance(f, file)``).

    On Python 3 this also returns True if the given object is higher level
    wrapper on top of a FileIO object, such as a TextIOWrapper.
    """

    if isinstance(f, io.FileIO):
        return True
    elif hasattr(f, 'buffer'):
        return isfile(f.buffer)
    elif hasattr(f, 'raw'):
        return isfile(f.raw)
    return False


def fileobj_open(filename, mode):
    """
    A wrapper around the `open()` builtin.

    This exists because `open()` returns an `io.BufferedReader` by default.
    This is bad, because `io.BufferedReader` doesn't support random access,
    which we need in some cases.  We must call open with buffering=0 to get
    a raw random-access file reader.
    """

    return open(filename, mode, buffering=0)


def fileobj_name(f):
    """
    Returns the 'name' of file-like object *f*, if it has anything that could be
    called its name.  Otherwise f's class or type is returned.  If f is a
    string f itself is returned.
    """

    if isinstance(f, (str, bytes)):
        return f
    elif isinstance(f, gzip.GzipFile):
        # The .name attribute on GzipFiles does not always represent the name
        # of the file being read/written--it can also represent the original
        # name of the file being compressed
        # See the documentation at
        # https://docs.python.org/3/library/gzip.html#gzip.GzipFile
        # As such, for gzip files only return the name of the underlying
        # fileobj, if it exists
        return fileobj_name(f.fileobj)
    elif hasattr(f, 'name'):
        return f.name
    elif hasattr(f, 'filename'):
        return f.filename
    elif hasattr(f, '__class__'):
        return str(f.__class__)
    else:
        return str(type(f))


def fileobj_closed(f):
    """
    Returns True if the given file-like object is closed or if *f* is a string
    (and assumed to be a pathname).

    Returns False for all other types of objects, under the assumption that
    they are file-like objects with no sense of a 'closed' state.
    """

    if isinstance(f, path_like):
        return True

    if hasattr(f, 'closed'):
        return f.closed
    elif hasattr(f, 'fileobj') and hasattr(f.fileobj, 'closed'):
        return f.fileobj.closed
    elif hasattr(f, 'fp') and hasattr(f.fp, 'closed'):
        return f.fp.closed
    else:
        return False


def fileobj_mode(f):
    """
    Returns the 'mode' string of a file-like object if such a thing exists.
    Otherwise returns None.
    """

    # Go from most to least specific--for example gzip objects have a 'mode'
    # attribute, but it's not analogous to the file.mode attribute

    # gzip.GzipFile -like
    if hasattr(f, 'fileobj') and hasattr(f.fileobj, 'mode'):
        fileobj = f.fileobj

    # astropy.io.fits._File -like, doesn't need additional checks because it's
    # already validated
    elif hasattr(f, 'fileobj_mode'):
        return f.fileobj_mode

    # PIL-Image -like investigate the fp (filebuffer)
    elif hasattr(f, 'fp') and hasattr(f.fp, 'mode'):
        fileobj = f.fp

    # FILEIO -like (normal open(...)), keep as is.
    elif hasattr(f, 'mode'):
        fileobj = f

    # Doesn't look like a file-like object, for example strings, urls or paths.
    else:
        return None

    return _fileobj_normalize_mode(fileobj)


def _fileobj_normalize_mode(f):
    """Takes care of some corner cases in Python where the mode string
    is either oddly formatted or does not truly represent the file mode.
    """
    mode = f.mode

    # Special case: Gzip modes:
    if isinstance(f, gzip.GzipFile):
        # GzipFiles can be either readonly or writeonly
        if mode == gzip.READ:
            return 'rb'
        elif mode == gzip.WRITE:
            return 'wb'
        else:
            return None  # This shouldn't happen?

    # Sometimes Python can produce modes like 'r+b' which will be normalized
    # here to 'rb+'
    if '+' in mode:
        mode = mode.replace('+', '')
        mode += '+'

    return mode


def fileobj_is_binary(f):
    """
    Returns True if the give file or file-like object has a file open in binary
    mode.  When in doubt, returns True by default.
    """

    # This is kind of a hack for this to work correctly with _File objects,
    # which, for the time being, are *always* binary
    if hasattr(f, 'binary'):
        return f.binary

    if isinstance(f, io.TextIOBase):
        return False

    mode = fileobj_mode(f)
    if mode:
        return 'b' in mode
    else:
        return True


def translate(s, table, deletechars):
    """
    Return ``s.translate(table)``, additionally deleting every character in
    ``deletechars``.

    When ``deletechars`` is non-empty the table is copied before the deletion
    entries are added, so the caller's ``table`` is left untouched.
    """

    if deletechars:
        table = table.copy()
        for c in deletechars:
            table[ord(c)] = None
    return s.translate(table)


def fill(text, width, **kwargs):
    """
    Like :func:`textwrap.fill` but preserves existing paragraphs (separated
    by blank lines), which :func:`textwrap.fill` does not otherwise handle
    well.  A paragraph whose lines are all already shorter than ``width`` is
    left exactly as it is.
    """

    paragraphs = text.split('\n\n')

    def maybe_fill(t):
        if all(len(l) < width for l in t.splitlines()):
            return t
        else:
            return textwrap.fill(t, width, **kwargs)

    return '\n\n'.join(maybe_fill(p) for p in paragraphs)


# On MacOS X 10.8 and earlier, there is a bug that causes numpy.fromfile to
# fail when reading over 2Gb of data. If we detect these versions of MacOS X,
# we can instead read the data in chunks. To avoid performance penalties at
# import time, we defer the setting of this global variable until the first
# time it is needed.
CHUNKED_FROMFILE = None


def _needs_chunked_fromfile():
    """Return True when running on Mac OS X older than 10.9."""

    if sys.platform != 'darwin':
        return False

    # Compare only the leading numeric (major, minor) components; this does
    # not rely on distutils' LooseVersion, which is gone in Python 3.12.
    version = []
    for part in platform.mac_ver()[0].split('.')[:2]:
        if not part.isdigit():
            break
        version.append(int(part))
    return tuple(version) < (10, 9)


def _array_from_file(infile, dtype, count):
    """Create a numpy array from a file or a file-like object.

    OS-level files are read with `numpy.fromfile` (in chunks of about 1Gb on
    the Mac OS X versions affected by the bug described above); any other
    object is read through its ``read`` method and the result is copied into
    a new, writable array.
    """

    global CHUNKED_FROMFILE

    if isfile(infile):

        if CHUNKED_FROMFILE is None:
            CHUNKED_FROMFILE = _needs_chunked_fromfile()

        if CHUNKED_FROMFILE:
            # Number of elements in a chunk of 1Gb, to be safe
            itemsize = max(np.dtype(dtype).itemsize, 1)
            chunk_size = (1024 ** 3) // itemsize
            if count < chunk_size:
                return np.fromfile(infile, dtype=dtype, count=count)
            else:
                array = np.empty(count, dtype=dtype)
                for beg in range(0, count, chunk_size):
                    end = min(count, beg + chunk_size)
                    array[beg:end] = np.fromfile(infile, dtype=dtype,
                                                 count=end - beg)
                return array
        else:
            return np.fromfile(infile, dtype=dtype, count=count)
    else:
        # treat as file-like object with "read" method; this includes gzip file
        # objects, because numpy.fromfile just reads the compressed bytes from
        # their underlying file object, instead of the decompressed bytes
        read_size = np.dtype(dtype).itemsize * count
        s = infile.read(read_size)
        array = np.ndarray(buffer=s, dtype=dtype, shape=(count,))
        # copy is needed because the array built on top of the bytes that
        # were read is a read-only view of that buffer
        array = array.copy()
        return array


_OSX_WRITE_LIMIT = (2 ** 32) - 1
_WIN_WRITE_LIMIT = (2 ** 31) - 1


def _array_to_file(arr, outfile):
    """
    Write a numpy array to a file or a file-like object.

    Parameters
    ----------
    arr : ndarray
        The Numpy array to write.
    outfile : file-like
        A file-like object such as a Python file object, an `io.BytesIO`, or
        anything else with a ``write`` method.  The file object must support
        the buffer interface in its ``write``.

    If writing directly to an on-disk file this delegates directly to
    `ndarray.tofile`.  Otherwise a slower Python implementation is used.
    """

    if isfile(outfile) and not isinstance(outfile, io.BufferedIOBase):
        def write(a, f):
            a.tofile(f)
    else:
        write = _array_to_file_like

    # Implements a workaround for a bug deep in OSX's stdlib file writing
    # functions; on 64-bit OSX it is not possible to correctly write a number
    # of bytes greater than 2 ** 32 and divisible by 4096 (or possibly 8192--
    # whatever the default blocksize for the filesystem is).
    # This issue should have a workaround in Numpy too, but hasn't been
    # implemented there yet: https://github.com/astropy/astropy/issues/839
    #
    # Apparently Windows has its own fwrite bug:
    # https://github.com/numpy/numpy/issues/2256

    if (sys.platform == 'darwin' and arr.nbytes >= _OSX_WRITE_LIMIT + 1 and
            arr.nbytes % 4096 == 0):
        write_limit = _OSX_WRITE_LIMIT
    elif sys.platform.startswith('win'):
        write_limit = _WIN_WRITE_LIMIT
    else:
        # Just pass the whole array to the write routine
        return write(arr, outfile)

    # chunksize is a count of elements in the array, not bytes.  It is kept
    # at 1 or more so the loop below always makes progress, and a zero
    # itemsize (e.g. an 'S0' dtype) cannot cause a division by zero.
    chunksize = max(1, write_limit // max(arr.itemsize, 1))

    # Write one chunk at a time for systems whose fwrite chokes on large
    # writes.
    idx = 0
    arr = arr.view(np.ndarray).flatten()
    # idx counts elements, so it has to be compared with the number of
    # elements rather than with the number of bytes.
    while idx < arr.size:
        write(arr[idx:idx + chunksize], outfile)
        idx += chunksize


def _array_to_file_like(arr, fileobj):
    """
    Write a `~numpy.ndarray` to a file-like object (which is not supported by
    `numpy.ndarray.tofile`).
    """

    # If the array is empty, we can simply take a shortcut and return since
    # there is nothing to write.  (``size`` rather than ``len`` so that
    # zero-dimensional arrays are accepted as well.)
    if arr.size == 0:
        return

    if arr.flags.contiguous:

        # It suffices to just pass the underlying buffer directly to the
        # fileobj's write (assuming it supports the buffer interface). If
        # it does not have the buffer interface, a TypeError should be returned
        # in which case we can fall back to the other methods.

        try:
            fileobj.write(arr.data)
        except TypeError:
            pass
        else:
            return

    if hasattr(np, 'nditer'):
        # nditer version for non-contiguous arrays
        for item in np.nditer(arr, order='C'):
            fileobj.write(item.tobytes())
    else:
        # Slower version for Numpy versions without nditer;
        # The problem with flatiter is it doesn't preserve the original
        # byteorder
        byteorder = arr.dtype.byteorder
        if ((sys.byteorder == 'little' and byteorder == '>')
                or (sys.byteorder == 'big' and byteorder == '<')):
            for item in arr.flat:
                fileobj.write(item.byteswap().tobytes())
        else:
            for item in arr.flat:
                fileobj.write(item.tobytes())


def _write_string(f, s):
    """
    Write a string to a file, encoding to ASCII if the file is open in binary
    mode, or decoding if the file is open in text mode.
    """

    # Assume if the file object doesn't have a specific mode, that the mode is
    # binary
    binmode = fileobj_is_binary(f)

    if binmode and isinstance(s, str):
        s = encode_ascii(s)
    elif not binmode and not isinstance(s, str):
        # The check is on the *string* being written (not on the file
        # object): only non-str data needs decoding for a text-mode file.
        s = decode_ascii(s)

    f.write(s)


def _convert_array(array, dtype):
    """
    Converts an array to a new dtype--if the itemsize of the new dtype is
    the same as the old dtype and both types are not numeric, a view is
    returned.  Otherwise a new array must be created.
    """

    if array.dtype == dtype:
        return array
    elif (array.dtype.itemsize == dtype.itemsize and not
            (np.issubdtype(array.dtype, np.number) and
             np.issubdtype(dtype, np.number))):
        # Includes a special case when both dtypes are at least numeric to
        # account for old Trac ticket 218 (now inaccessible).
        return array.view(dtype)
    else:
        return array.astype(dtype)


def _pseudo_zero(dtype):
    """
    Given a numpy dtype, finds its "zero" point, which is exactly in the
    middle of its range.
    """

    # special case for int8
    if dtype.kind == 'i' and dtype.itemsize == 1:
        return -128

    assert dtype.kind == 'u'
    return 1 << (dtype.itemsize * 8 - 1)


def _is_pseudo_integer(dtype):
    """
    Return True if ``dtype`` is an unsigned integer of at least 2 bytes or a
    1-byte signed integer.
    """

    return (
        (dtype.kind == 'u' and dtype.itemsize >= 2)
        or (dtype.kind == 'i' and dtype.itemsize == 1)
    )


def _is_int(val):
    """Return True if ``val`` is a Python `int` or a Numpy integer."""

    return isinstance(val, all_integer_types)


def _str_to_num(val):
    """Converts a given string to either an int or a float if necessary."""

    try:
        num = int(val)
    except ValueError:
        # If this fails then an exception should be raised anyways
        num = float(val)
    return num


def _words_group(s, width):
    """
    Split a long string into parts where each part is no longer than ``width``
    and no word is cut into two pieces.  But if there are any single words
    which are longer than ``width``, then they will be split in the middle of
    the word.
    """

    words = []
    slen = len(s)

    # locations of the blanks, as character (not byte) positions so that
    # they can be used directly to slice ``s``; one extra "blank" is added
    # at ``slen`` so that the "last" blank is always beyond the end of the
    # string
    blank_loc = np.array(
        [pos for pos, char in enumerate(s) if char == ' '] + [slen])
    offset = 0
    xoffset = 0

    while True:
        try:
            loc = np.nonzero(blank_loc >= width + offset)[0][0]
        except IndexError:
            loc = len(blank_loc)

        if loc > 0:
            offset = blank_loc[loc - 1] + 1
        else:
            offset = -1

        # check for one word longer than width, break in the middle
        if offset <= xoffset:
            offset = min(xoffset + width, slen)

        # collect the pieces in a list
        words.append(s[xoffset:offset])
        if offset >= slen:
            break
        xoffset = offset

    return words


def _tmp_name(input):
    """
    Create a temporary file name which should not already exist.  Use the
    directory of the input file as the base name of the mkstemp() output.
    """

    if input is not None:
        input = os.path.dirname(input)
    f, fn = tempfile.mkstemp(dir=input)
    os.close(f)
    return fn


def _get_array_mmap(array):
    """
    If the array has an mmap.mmap at base of its base chain, return the mmap
    object; otherwise return None.
    """

    if isinstance(array, mmap.mmap):
        return array

    base = array
    while hasattr(base, 'base') and base.base is not None:
        if isinstance(base.base, mmap.mmap):
            return base.base
        base = base.base

    return None


@contextmanager
def _free_space_check(hdulist, dirname=None):
    """
    Context manager that closes the HDUs and re-raises any `OSError` raised
    in its body, prefixing the message with a "Not enough space on disk"
    note when the target directory has less free space than the HDUs need.

    Parameters
    ----------
    hdulist : list or object
        Either a list of HDUs or a single HDU-like object.  Each one is
        expected to provide ``size`` and ``_close()``.
    dirname : path-like, optional
        Directory whose free space is checked.  By default the directory of
        ``hdulist._file.name`` is used when that name is available.
    """

    try:
        yield
    except OSError as exc:
        error_message = ''
        hdus = hdulist if isinstance(hdulist, list) else [hdulist]

        if dirname is None:
            # Look the file up on the object that was passed in (not on the
            # plain list a single HDU gets wrapped in), and tolerate objects
            # that have no file, or a file with no usable name.
            filename = getattr(getattr(hdulist, '_file', None), 'name', None)
            if isinstance(filename, (str, bytes, os.PathLike)):
                dirname = os.path.dirname(filename)

        if dirname is not None and os.path.isdir(dirname):
            free_space = data.get_free_space_in_dir(dirname)
            hdulist_size = sum(hdu.size for hdu in hdus)
            if free_space < hdulist_size:
                error_message = ("Not enough space on disk: requested {}, "
                                 "available {}. ".format(hdulist_size,
                                                         free_space))

        for hdu in hdus:
            hdu._close()

        raise OSError(error_message + str(exc)) from exc


def _extract_number(value, default):
    """
    Attempts to extract an integer number from the given value. If the
    extraction fails, the value of the 'default' argument is returned.
    """

    try:
        # The _str_to_num method converts the value to int/float
        # so we need to perform one additional conversion to int on top
        return int(_str_to_num(value))
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int() of an infinite float (e.g. the string 'inf')
        return default


def get_testdata_filepath(filename):
    """
    Return a string representing the path to the file requested from the
    io.fits test data set.

    .. versionadded:: 2.0.3

    Parameters
    ----------
    filename : str
        The filename of the test data file.

    Returns
    -------
    filepath : str
        The path to the requested file.
    """
    return data.get_pkg_data_filename(
        f'io/fits/tests/data/{filename}', 'astropy')


def _rstrip_inplace(array):
    """
    Performs an in-place rstrip operation on string arrays. This is necessary
    since the built-in `np.char.rstrip` in Numpy does not perform an in-place
    calculation.
    """

    # The following implementation convert the string to unsigned integers of
    # the right length. Trailing spaces (which are represented as 32) are then
    # converted to null characters (represented as zeros). To avoid creating
    # large temporary mask arrays, we loop over chunks (attempting to do that
    # on a 1-D version of the array; large memory may still be needed in the
    # unlikely case that a string array has small first dimension and cannot
    # be represented as a contiguous 1-D array in memory).

    dt = array.dtype

    if dt.kind not in 'SU':
        raise TypeError("This function can only be used on string arrays")
    # View the array as appropriate integers. The last dimension will
    # equal the number of characters in each string.
    bpc = 1 if dt.kind == 'S' else 4
    dt_int = f"({dt.itemsize // bpc},){dt.byteorder}u{bpc}"
    b = array.view(dt_int, np.ndarray)
    # For optimal speed, work in chunks of the internal ufunc buffer size.
    bufsize = np.getbufsize()
    # Attempt to have the strings as a 1-D array to give the chunk known size.
    # Note: the code will work if this fails; the chunks will just be larger.
    if b.ndim > 2:
        try:
            b.shape = -1, b.shape[-1]
        except AttributeError:  # can occur for non-contiguous arrays
            pass
    for j in range(0, b.shape[0], bufsize):
        c = b[j:j + bufsize]
        # Mask which will tell whether we're in a sequence of trailing spaces.
        mask = np.ones(c.shape[:-1], dtype=bool)
        # Loop over the characters in the strings, in reverse order. We process
        # the i-th character of all strings in the chunk at the same time. If
        # the character is 32, this corresponds to a space, and we then change
        # this to 0. We then construct a new mask to find rows where the
        # i-th character is 0 (null) and the i-1-th is 32 (space) and repeat.
        # NOTE(review): the range stops before the first character, so a
        # leading character is never stripped -- confirm this is intended.
        for i in range(-1, -c.shape[-1], -1):
            mask &= c[..., i] == 32
            c[..., i][mask] = 0
            mask = c[..., i] == 0

    return array