1# Licensed under a 3-clause BSD style license - see PYFITS.rst
2
3import gzip
4import itertools
5import io
6import mmap
7import operator
8import os
9import platform
10import signal
11import sys
12import tempfile
13import textwrap
14import threading
15import warnings
16import weakref
17from contextlib import contextmanager, suppress
18from functools import wraps
19
20from astropy.utils import data
21
22from distutils.version import LooseVersion
23
24import numpy as np
25
26from astropy.utils.exceptions import AstropyUserWarning
27
# Types accepted wherever a filesystem path may be given.
path_like = (str, os.PathLike)


def cmp(a, b):
    """
    Three-way comparison in the style of the Python 2 ``cmp`` builtin.

    Returns -1 if ``a < b``, 1 if ``a > b`` and 0 if neither holds.
    """

    return (a > b) - (a < b)


# Python and Numpy integer types, for use with ``isinstance``.
all_integer_types = (int, np.integer)
33
34
class NotifierMixin:
    """
    Mixin class that provides services by which objects can register
    listeners to changes on that object.

    All methods provided by this class are underscored, since this is intended
    for internal use to communicate between classes in a generic way, and is
    not machinery that should be exposed to users of the classes involved.

    Use the ``_add_listener`` method to register a listener on an instance of
    the notifier.  This registers the listener with a weak reference, so if
    no other references to the listener exist it is automatically dropped from
    the list and does not need to be manually removed.

    Call the ``_notify`` method on the notifier to update all listeners
    upon changes.  ``_notify('change_type', *args, **kwargs)`` results
    in calling ``listener._update_change_type(*args, **kwargs)`` on all
    listeners subscribed to that notifier.

    If a particular listener does not have the appropriate update method
    it is ignored.

    Examples
    --------

    >>> class Widget(NotifierMixin):
    ...     state = 1
    ...     def __init__(self, name):
    ...         self.name = name
    ...     def update_state(self):
    ...         self.state += 1
    ...         self._notify('widget_state_changed', self)
    ...
    >>> class WidgetListener:
    ...     def _update_widget_state_changed(self, widget):
    ...         print('Widget {0} changed state to {1}'.format(
    ...             widget.name, widget.state))
    ...
    >>> widget = Widget('fred')
    >>> listener = WidgetListener()
    >>> widget._add_listener(listener)
    >>> widget.update_state()
    Widget fred changed state to 2
    """

    # Mapping of ``id(listener)`` -> listener (held weakly).  Created lazily
    # on the first call to ``_add_listener``; ``None`` means "no listeners".
    _listeners = None

    def _add_listener(self, listener):
        """
        Add an object to the list of listeners to notify of changes to this
        object.  This adds a weakref to the list of listeners that is
        removed from the listeners list when the listener has no other
        references to it.
        """

        if self._listeners is None:
            self._listeners = weakref.WeakValueDictionary()

        self._listeners[id(listener)] = listener

    def _remove_listener(self, listener):
        """
        Removes the specified listener from the listeners list.  This relies
        on object identity (i.e. the ``is`` operator).  Removing a listener
        that was never registered is a no-op.
        """

        if self._listeners is None:
            return

        with suppress(KeyError):
            del self._listeners[id(listener)]

    def _notify(self, notification, *args, **kwargs):
        """
        Notify all listeners of some particular state change by calling their
        ``_update_<notification>`` method with the given ``*args`` and
        ``**kwargs``.

        The notification does not by default include the object that actually
        changed (``self``), but it certainly may if required.
        """

        if self._listeners is None:
            return

        method_name = f'_update_{notification}'
        for listener_ref in self._listeners.valuerefs():
            # Use valuerefs instead of itervaluerefs; see
            # https://github.com/astropy/astropy/issues/4015
            listener = listener_ref()  # dereference weakref
            if listener is None:
                continue

            # Listeners lacking the update method (or whose attribute of that
            # name is not callable) are silently skipped.
            method = getattr(listener, method_name, None)
            if callable(method):
                method(*args, **kwargs)

    def __getstate__(self):
        """
        Exclude listeners when saving the listener's state, since they may be
        ephemeral.

        Returns a *new* dict holding the instance state with ``_listeners``
        set to `None`; the live object is never modified.
        """

        # TODO: This hasn't come up often, but if anyone needs to pickle HDU
        # objects it will be necessary when HDU objects' states are restored to
        # re-register themselves as listeners on their new column instances.
        try:
            state = super().__getstate__()
        except AttributeError:
            # Chances are the super object doesn't have a getstate
            state = self.__dict__

        # Since Python 3.11 ``object.__getstate__`` exists and hands back the
        # live instance ``__dict__`` (or None when it is empty), so work on a
        # copy; otherwise clearing '_listeners' below would also drop the
        # listeners registered on this very object.
        if state is None:
            state = {}
        elif isinstance(state, dict):
            state = state.copy()

        state['_listeners'] = None
        return state
150
151
def first(iterable):
    """
    Return the first item produced when iterating over ``iterable``.

    Raises `StopIteration` if the iterable yields nothing.

    Example:

    >>> a = [1, 2, 3]
    >>> first(a)
    1
    """

    iterator = iter(iterable)
    return next(iterator)
164
165
def itersubclasses(cls, _seen=None):
    """
    Generator over all subclasses of a given class, in depth first order.

    Direct subclasses are visited in order of their ``__name__``; each class
    is yielded at most once even if it is reachable through several bases.
    ``_seen`` is the set of already-yielded classes shared between the
    recursive calls and is not meant to be passed by callers.

    >>> class A: pass
    >>> class B(A): pass
    >>> class C(A): pass
    >>> class D(B,C): pass
    >>> class E(D): pass
    >>>
    >>> for cls in itersubclasses(A):
    ...     print(cls.__name__)
    B
    D
    E
    C
    >>> # get ALL classes currently defined
    >>> [cls.__name__ for cls in itersubclasses(object)]
    [...'tuple', ...'type', ...]

    From http://code.activestate.com/recipes/576949/
    """

    visited = set() if _seen is None else _seen

    try:
        children = cls.__subclasses__()
    except TypeError:  # fails only when cls is type
        children = cls.__subclasses__(cls)

    for child in sorted(children, key=lambda klass: klass.__name__):
        if child in visited:
            continue
        visited.add(child)
        yield child
        yield from itersubclasses(child, visited)
201
202
def ignore_sigint(func):
    """
    This decorator registers a custom SIGINT handler to catch and ignore SIGINT
    until the wrapped function is completed.

    The handler is only installed when the call is made from a thread named
    'MainThread' and no other threads are active; otherwise ``func`` is simply
    called.  If a SIGINT arrived while ``func`` was running, a warning is
    issued at that time and `KeyboardInterrupt` is raised once ``func`` has
    finished and the previous handler has been restored.

    The wrapper returns whatever ``func`` returns.
    """

    @wraps(func)
    def wrapped(*args, **kwargs):
        # Get the name of the current thread and determine if this is a single
        # threaded application
        curr_thread = threading.current_thread()
        single_thread = (threading.active_count() == 1 and
                         curr_thread.name == 'MainThread')

        class SigintHandler:
            def __init__(self):
                self.sigint_received = False

            def __call__(self, signum, frame):
                warnings.warn(f'KeyboardInterrupt ignored until '
                              f'{func.__name__} is complete!',
                              AstropyUserWarning)
                self.sigint_received = True

        sigint_handler = SigintHandler()

        # Define new signal interrupt handler
        if single_thread:
            # Install new handler
            old_handler = signal.signal(signal.SIGINT, sigint_handler)

        try:
            # Propagate the result so decorating a function does not silently
            # turn its return value into None.
            return func(*args, **kwargs)
        finally:
            if single_thread:
                # signal.signal returns None when the previous handler was not
                # installed from Python; fall back to the default handler.
                if old_handler is not None:
                    signal.signal(signal.SIGINT, old_handler)
                else:
                    signal.signal(signal.SIGINT, signal.SIG_DFL)

                if sigint_handler.sigint_received:
                    raise KeyboardInterrupt

    return wrapped
247
248
def pairwise(iterable):
    """Return an iterator over each item of ``iterable`` paired with the
    item that follows it.

    Ex: s -> (s0,s1), (s1,s2), (s2,s3), ....
    """

    leading, trailing = itertools.tee(iterable)
    # Advance the second iterator by one item; the default keeps this from
    # raising when the iterable is empty.
    next(trailing, None)
    return zip(leading, trailing)
261
262
def encode_ascii(s):
    """
    Encode a `str`, or a Numpy array of unicode strings, to ASCII bytes.

    Parameters
    ----------
    s : str, ndarray or object
        The value to encode.

    Returns
    -------
    encoded
        `bytes` for a `str` input.  For a unicode array, a bytes array of the
        same array type with the same number of characters per element.
        Bytes arrays and any non-array objects are returned unchanged.

    Raises
    ------
    TypeError
        If ``s`` is an array that holds neither unicode nor bytes strings.
    """

    if isinstance(s, str):
        return s.encode('ascii')

    if isinstance(s, np.ndarray):
        if issubclass(s.dtype.type, np.str_):
            # np.char.encode does not preserve the array subclass (hence the
            # view) and sizes its result to the longest encoded string rather
            # than to the declared width (hence the astype).
            ns = np.char.encode(s, 'ascii').view(type(s))
            # Unicode items take 4 bytes per character.  Integer division is
            # required: a float is not a valid dtype width.
            nchars = s.dtype.itemsize // 4
            if ns.dtype.itemsize != nchars:
                ns = ns.astype((np.bytes_, nchars))
            return ns

        if not issubclass(s.dtype.type, np.bytes_):
            raise TypeError('string operation on non-string array')

    return s
276
277
def decode_ascii(s):
    """
    Decode `bytes`, or a Numpy array of bytes strings, from ASCII to unicode.

    Non-ASCII bytes in a `bytes` input are replaced by "?" and a warning is
    issued.  Unicode arrays and non-array objects are returned unchanged; an
    array holding neither bytes nor unicode strings raises `TypeError`.
    """

    if isinstance(s, bytes):
        try:
            return s.decode('ascii')
        except UnicodeDecodeError:
            warnings.warn('non-ASCII characters are present in the FITS '
                          'file header and have been replaced by "?" '
                          'characters', AstropyUserWarning)
            replaced = s.decode('ascii', errors='replace')
            return replaced.replace('\ufffd', '?')

    if not isinstance(s, np.ndarray):
        return s

    element_type = s.dtype.type

    if issubclass(element_type, np.bytes_):
        # np.char.encode/decode annoyingly don't preserve the type of the
        # array, hence the view() call
        # It also doesn't necessarily preserve widths of the strings,
        # hence the astype()
        if s.size != 0:
            ns = np.char.decode(s, 'ascii').view(type(s))
        else:
            # Numpy apparently also has a bug that if a string array is
            # empty calling np.char.decode on it returns an empty float64
            # array, so build the empty unicode array by hand instead
            unicode_dtype = s.dtype.str.replace('S', 'U')
            ns = np.array([], dtype=unicode_dtype).view(type(s))

        # Unicode items take 4 bytes per character
        if ns.dtype.itemsize / 4 != s.dtype.itemsize:
            ns = ns.astype((np.str_, s.dtype.itemsize))
        return ns

    if not issubclass(element_type, np.str_):
        # Don't silently pass through on non-string arrays; we don't want
        # to hide errors where things that are not stringy are attempting
        # to be decoded
        raise TypeError('string operation on non-string array')

    return s
312
313
def isreadable(f):
    """
    Returns True if the file-like object can be read from.  This is a common-
    sense approximation of io.IOBase.readable.

    Objects providing their own ``readable()`` are simply asked.  Otherwise a
    closed object raises `ValueError`, and the answer is derived from the
    presence of a ``read`` attribute and, if available, the ``mode`` string.
    """

    if hasattr(f, 'readable'):
        return f.readable()

    if getattr(f, 'closed', False):
        # This mimics the behavior of io.IOBase.readable
        raise ValueError('I/O operation on closed file')

    if not hasattr(f, 'read'):
        return False

    if hasattr(f, 'mode'):
        # Readable modes contain either 'r' or '+'
        mode = f.mode
        return 'r' in mode or '+' in mode

    # Not closed, has a 'read()' method and no known mode--should be good
    # enough to assume 'readable'
    return True
336
337
def iswritable(f):
    """
    Returns True if the file-like object can be written to.  This is a common-
    sense approximation of io.IOBase.writable.

    Objects providing their own ``writable()`` are simply asked.  Otherwise a
    closed object raises `ValueError`, and the answer is derived from the
    presence of a ``write`` attribute and, if available, the ``mode`` string.
    """

    if hasattr(f, 'writable'):
        return f.writable()

    if getattr(f, 'closed', False):
        # This mimics the behavior of io.IOBase.writable
        raise ValueError('I/O operation on closed file')

    if not hasattr(f, 'write'):
        return False

    if hasattr(f, 'mode'):
        # Modes that support writing contain one of 'w', 'a' or '+'
        mode = f.mode
        return 'w' in mode or 'a' in mode or '+' in mode

    # Not closed, has a 'write()' method and no known mode--should be good
    # enough to assume 'writable'
    return True
360
361
def isfile(f):
    """
    Returns True if the given object represents an OS-level file, i.e. it is
    an `io.FileIO` instance.

    This also returns True if the given object is a higher level wrapper on
    top of a FileIO object, such as a TextIOWrapper: the ``buffer`` attribute
    (or, failing that, the ``raw`` attribute) is followed recursively.
    """

    if isinstance(f, io.FileIO):
        return True

    # Unwrap one layer; 'buffer' takes precedence over 'raw'
    for wrapped_attr in ('buffer', 'raw'):
        if hasattr(f, wrapped_attr):
            return isfile(getattr(f, wrapped_attr))

    return False
378
379
def fileobj_open(filename, mode):
    """
    A wrapper around the `open()` builtin that disables buffering.

    By default `open()` returns a buffered object such as an
    `io.BufferedReader`, which is not suitable for the random access needed
    in some cases.  Calling it with ``buffering=0`` yields the raw file
    object instead.
    """

    return open(filename, mode=mode, buffering=0)
391
392
def fileobj_name(f):
    """
    Returns the 'name' of file-like object *f*, if it has anything that could be
    called its name.  Otherwise f's class or type is returned.  If f is a
    string (`str` or `bytes`) f itself is returned.
    """

    if isinstance(f, (str, bytes)):
        return f

    if isinstance(f, gzip.GzipFile):
        # The .name attribute on GzipFiles does not always represent the name
        # of the file being read/written--it can also represent the original
        # name of the file being compressed
        # See the documentation at
        # https://docs.python.org/3/library/gzip.html#gzip.GzipFile
        # As such, for gzip files only return the name of the underlying
        # fileobj, if it exists
        return fileobj_name(f.fileobj)

    # 'name' takes precedence over 'filename'
    for attr in ('name', 'filename'):
        if hasattr(f, attr):
            return getattr(f, attr)

    if hasattr(f, '__class__'):
        return str(f.__class__)

    return str(type(f))
419
420
def fileobj_closed(f):
    """
    Returns True if the given file-like object is closed or if *f* is a
    path-like object (and assumed to be a pathname).

    The ``closed`` attribute of the object itself is consulted first, then
    that of its ``fileobj`` and ``fp`` attributes, in that order.

    Returns False for all other types of objects, under the assumption that
    they are file-like objects with no sense of a 'closed' state.
    """

    if isinstance(f, path_like):
        return True

    if hasattr(f, 'closed'):
        return f.closed

    # Fall back to the wrapped file object, if any exposes 'closed'
    for wrapped_attr in ('fileobj', 'fp'):
        if hasattr(f, wrapped_attr):
            wrapped = getattr(f, wrapped_attr)
            if hasattr(wrapped, 'closed'):
                return wrapped.closed

    return False
441
442
def fileobj_mode(f):
    """
    Returns the 'mode' string of a file-like object if such a thing exists.
    Otherwise returns None.
    """

    # Go from most to least specific--for example gzip objects have a 'mode'
    # attribute, but it's not analogous to the file.mode attribute

    # gzip.GzipFile -like
    if hasattr(f, 'fileobj') and hasattr(f.fileobj, 'mode'):
        return _fileobj_normalize_mode(f.fileobj)

    # astropy.io.fits._File -like, doesn't need additional checks because it's
    # already validated
    if hasattr(f, 'fileobj_mode'):
        return f.fileobj_mode

    # PIL-Image -like investigate the fp (filebuffer)
    if hasattr(f, 'fp') and hasattr(f.fp, 'mode'):
        return _fileobj_normalize_mode(f.fp)

    # FILEIO -like (normal open(...)), keep as is.
    if hasattr(f, 'mode'):
        return _fileobj_normalize_mode(f)

    # Doesn't look like a file-like object, for example strings, urls or paths.
    return None
474
475
476def _fileobj_normalize_mode(f):
477    """Takes care of some corner cases in Python where the mode string
478    is either oddly formatted or does not truly represent the file mode.
479    """
480    mode = f.mode
481
482    # Special case: Gzip modes:
483    if isinstance(f, gzip.GzipFile):
484        # GzipFiles can be either readonly or writeonly
485        if mode == gzip.READ:
486            return 'rb'
487        elif mode == gzip.WRITE:
488            return 'wb'
489        else:
490            return None  # This shouldn't happen?
491
492    # Sometimes Python can produce modes like 'r+b' which will be normalized
493    # here to 'rb+'
494    if '+' in mode:
495        mode = mode.replace('+', '')
496        mode += '+'
497
498    return mode
499
500
def fileobj_is_binary(f):
    """
    Returns True if the given file or file-like object has a file open in
    binary mode.  When in doubt, returns True by default.
    """

    # This is kind of a hack for this to work correctly with _File objects,
    # which, for the time being, are *always* binary
    if hasattr(f, 'binary'):
        return f.binary

    if isinstance(f, io.TextIOBase):
        return False

    # No (or an empty) mode string means we cannot tell: assume binary
    mode = fileobj_mode(f)
    return 'b' in mode if mode else True
520
521
def translate(s, table, deletechars):
    """
    Apply the translation ``table`` to the string ``s``, additionally
    removing every character found in ``deletechars``.

    The given ``table`` is never modified; a copy is extended with the
    deletions when ``deletechars`` is non-empty.
    """

    if not deletechars:
        return s.translate(table)

    mapping = table.copy()
    for char in deletechars:
        # Mapping an ordinal to None makes str.translate drop the character
        mapping[ord(char)] = None
    return s.translate(mapping)
528
529
def fill(text, width, **kwargs):
    """
    Like :func:`textwrap.fill` but preserves existing paragraphs, which
    :func:`textwrap.fill` does not otherwise handle well.  Also handles
    section headers.

    Paragraphs are separated by blank lines.  A paragraph whose lines are all
    shorter than ``width`` is left untouched; any other paragraph is re-filled
    with :func:`textwrap.fill`, to which ``**kwargs`` are passed on.
    """

    filled = []
    for paragraph in text.split('\n\n'):
        needs_wrapping = any(len(line) >= width
                             for line in paragraph.splitlines())
        if needs_wrapping:
            paragraph = textwrap.fill(paragraph, width, **kwargs)
        filled.append(paragraph)

    return '\n\n'.join(filled)
546
547
# On MacOS X 10.8 and earlier, there is a bug that causes numpy.fromfile to
# fail when reading over 2Gb of data. If we detect these versions of MacOS X,
# we can instead read the data in chunks. To avoid performance penalties at
# import time, we defer the setting of this global variable until the first
# time it is needed.
CHUNKED_FROMFILE = None


def _array_from_file(infile, dtype, count):
    """
    Create a numpy array of ``count`` items of type ``dtype`` from a file or
    a file-like object.

    Real files (as determined by ``isfile``) are read with `numpy.fromfile`;
    anything else only needs to provide a ``read`` method.
    """

    global CHUNKED_FROMFILE

    if not isfile(infile):
        # treat as file-like object with "read" method; this includes gzip file
        # objects, because numpy.fromfile just reads the compressed bytes from
        # their underlying file object, instead of the decompressed bytes
        nbytes = np.dtype(dtype).itemsize * count
        raw = infile.read(nbytes)
        view = np.ndarray(buffer=raw, dtype=dtype, shape=(count,))
        # copy is needed because the array built on top of the bytes that
        # were read is a read-only view of that buffer
        return view.copy()

    if CHUNKED_FROMFILE is None:
        # First use: decide once whether the chunked workaround is needed
        CHUNKED_FROMFILE = (
            sys.platform == 'darwin' and
            LooseVersion(platform.mac_ver()[0]) < LooseVersion('10.9'))

    if not CHUNKED_FROMFILE:
        return np.fromfile(infile, dtype=dtype, count=count)

    chunk_size = int(1024 ** 3 / dtype.itemsize)  # 1Gb to be safe
    if count < chunk_size:
        return np.fromfile(infile, dtype=dtype, count=count)

    array = np.empty(count, dtype=dtype)
    for beg in range(0, count, chunk_size):
        end = min(count, beg + chunk_size)
        array[beg:end] = np.fromfile(infile, dtype=dtype, count=end - beg)
    return array
592
593
# Largest number of bytes handed to a single write on the affected platforms
# (see the comments in `_array_to_file`).
_OSX_WRITE_LIMIT = (2 ** 32) - 1
_WIN_WRITE_LIMIT = (2 ** 31) - 1


def _array_to_file(arr, outfile):
    """
    Write a numpy array to a file or a file-like object.

    Parameters
    ----------
    arr : ndarray
        The Numpy array to write.
    outfile : file-like
        A file-like object such as a Python file object, an `io.BytesIO`, or
        anything else with a ``write`` method.  The file object must support
        the buffer interface in its ``write``.

    If writing directly to an on-disk file this delegates directly to
    `ndarray.tofile`.  Otherwise a slower Python implementation is used.
    """

    if isfile(outfile) and not isinstance(outfile, io.BufferedIOBase):
        def write(a, f):
            a.tofile(f)
    else:
        write = _array_to_file_like

    # Implements a workaround for a bug deep in OSX's stdlib file writing
    # functions; on 64-bit OSX it is not possible to correctly write a number
    # of bytes greater than 2 ** 32 and divisible by 4096 (or possibly 8192--
    # whatever the default blocksize for the filesystem is).
    # This issue should have a workaround in Numpy too, but hasn't been
    # implemented there yet: https://github.com/astropy/astropy/issues/839
    #
    # Apparently Windows has its own fwrite bug:
    # https://github.com/numpy/numpy/issues/2256

    if (sys.platform == 'darwin' and arr.nbytes >= _OSX_WRITE_LIMIT + 1 and
            arr.nbytes % 4096 == 0):
        # chunksize is a count of elements in the array, not bytes
        chunksize = _OSX_WRITE_LIMIT // arr.itemsize
    elif sys.platform.startswith('win'):
        chunksize = _WIN_WRITE_LIMIT // arr.itemsize
    else:
        # Just pass the whole array to the write routine
        return write(arr, outfile)

    # Write one chunk at a time for systems whose fwrite chokes on large
    # writes.
    arr = arr.view(np.ndarray).flatten()
    # Both the start index and chunksize count *elements*, so the loop is
    # bounded by the number of elements (arr.size), not by arr.nbytes;
    # bounding by nbytes issued extra writes of empty slices for any
    # itemsize larger than one byte.
    for start in range(0, arr.size, chunksize):
        write(arr[start:start + chunksize], outfile)
647
648
649def _array_to_file_like(arr, fileobj):
650    """
651    Write a `~numpy.ndarray` to a file-like object (which is not supported by
652    `numpy.ndarray.tofile`).
653    """
654
655    # If the array is empty, we can simply take a shortcut and return since
656    # there is nothing to write.
657    if len(arr) == 0:
658        return
659
660    if arr.flags.contiguous:
661
662        # It suffices to just pass the underlying buffer directly to the
663        # fileobj's write (assuming it supports the buffer interface). If
664        # it does not have the buffer interface, a TypeError should be returned
665        # in which case we can fall back to the other methods.
666
667        try:
668            fileobj.write(arr.data)
669        except TypeError:
670            pass
671        else:
672            return
673
674    if hasattr(np, 'nditer'):
675        # nditer version for non-contiguous arrays
676        for item in np.nditer(arr, order='C'):
677            fileobj.write(item.tobytes())
678    else:
679        # Slower version for Numpy versions without nditer;
680        # The problem with flatiter is it doesn't preserve the original
681        # byteorder
682        byteorder = arr.dtype.byteorder
683        if ((sys.byteorder == 'little' and byteorder == '>')
684                or (sys.byteorder == 'big' and byteorder == '<')):
685            for item in arr.flat:
686                fileobj.write(item.byteswap().tobytes())
687        else:
688            for item in arr.flat:
689                fileobj.write(item.tobytes())
690
691
def _write_string(f, s):
    """
    Write a string to a file, encoding to ASCII if the file is open in binary
    mode, or decoding if the file is open in text mode.

    Parameters
    ----------
    f : file-like
        The object to write to; only its ``write`` method is used here.
    s : str or bytes
        The text to write.  It is converted, when necessary, to the type
        matching the mode of ``f``.
    """

    # Assume if the file object doesn't have a specific mode, that the mode is
    # binary
    binmode = fileobj_is_binary(f)

    if binmode and isinstance(s, str):
        s = encode_ascii(s)
    elif not binmode and not isinstance(s, str):
        # The check is on the data ``s``, not on the file object ``f``: only
        # non-str data needs decoding before being written in text mode.
        s = decode_ascii(s)

    f.write(s)
708
709
710def _convert_array(array, dtype):
711    """
712    Converts an array to a new dtype--if the itemsize of the new dtype is
713    the same as the old dtype and both types are not numeric, a view is
714    returned.  Otherwise a new array must be created.
715    """
716
717    if array.dtype == dtype:
718        return array
719    elif (array.dtype.itemsize == dtype.itemsize and not
720            (np.issubdtype(array.dtype, np.number) and
721             np.issubdtype(dtype, np.number))):
722        # Includes a special case when both dtypes are at least numeric to
723        # account for old Trac ticket 218 (now inaccessible).
724        return array.view(dtype)
725    else:
726        return array.astype(dtype)
727
728
729def _pseudo_zero(dtype):
730    """
731    Given a numpy dtype, finds its "zero" point, which is exactly in the
732    middle of its range.
733    """
734
735    # special case for int8
736    if dtype.kind == 'i' and dtype.itemsize == 1:
737        return -128
738
739    assert dtype.kind == 'u'
740    return 1 << (dtype.itemsize * 8 - 1)
741
742
743def _is_pseudo_integer(dtype):
744    return (
745        (dtype.kind == 'u' and dtype.itemsize >= 2)
746        or (dtype.kind == 'i' and dtype.itemsize == 1)
747    )
748
749
def _is_int(val):
    """Return True if ``val`` is an instance of one of ``all_integer_types``."""

    return isinstance(val, all_integer_types)
752
753
754def _str_to_num(val):
755    """Converts a given string to either an int or a float if necessary."""
756
757    try:
758        num = int(val)
759    except ValueError:
760        # If this fails then an exception should be raised anyways
761        num = float(val)
762    return num
763
764
765def _words_group(s, width):
766    """
767    Split a long string into parts where each part is no longer than ``strlen``
768    and no word is cut into two pieces.  But if there are any single words
769    which are longer than ``strlen``, then they will be split in the middle of
770    the word.
771    """
772
773    words = []
774    slen = len(s)
775
776    # appending one blank at the end always ensures that the "last" blank
777    # is beyond the end of the string
778    arr = np.frombuffer(s.encode('utf8') + b' ', dtype='S1')
779
780    # locations of the blanks
781    blank_loc = np.nonzero(arr == b' ')[0]
782    offset = 0
783    xoffset = 0
784
785    while True:
786        try:
787            loc = np.nonzero(blank_loc >= width + offset)[0][0]
788        except IndexError:
789            loc = len(blank_loc)
790
791        if loc > 0:
792            offset = blank_loc[loc - 1] + 1
793        else:
794            offset = -1
795
796        # check for one word longer than strlen, break in the middle
797        if offset <= xoffset:
798            offset = min(xoffset + width, slen)
799
800        # collect the pieces in a list
801        words.append(s[xoffset:offset])
802        if offset >= slen:
803            break
804        xoffset = offset
805
806    return words
807
808
809def _tmp_name(input):
810    """
811    Create a temporary file name which should not already exist.  Use the
812    directory of the input file as the base name of the mkstemp() output.
813    """
814
815    if input is not None:
816        input = os.path.dirname(input)
817    f, fn = tempfile.mkstemp(dir=input)
818    os.close(f)
819    return fn
820
821
822def _get_array_mmap(array):
823    """
824    If the array has an mmap.mmap at base of its base chain, return the mmap
825    object; otherwise return None.
826    """
827
828    if isinstance(array, mmap.mmap):
829        return array
830
831    base = array
832    while hasattr(base, 'base') and base.base is not None:
833        if isinstance(base.base, mmap.mmap):
834            return base.base
835        base = base.base
836
837
838@contextmanager
839def _free_space_check(hdulist, dirname=None):
840    try:
841        yield
842    except OSError as exc:
843        error_message = ''
844        if not isinstance(hdulist, list):
845            hdulist = [hdulist, ]
846        if dirname is None:
847            dirname = os.path.dirname(hdulist._file.name)
848        if os.path.isdir(dirname):
849            free_space = data.get_free_space_in_dir(dirname)
850            hdulist_size = sum(hdu.size for hdu in hdulist)
851            if free_space < hdulist_size:
852                error_message = ("Not enough space on disk: requested {}, "
853                                 "available {}. ".format(hdulist_size, free_space))
854
855        for hdu in hdulist:
856            hdu._close()
857
858        raise OSError(error_message + str(exc))
859
860
def _extract_number(value, default):
    """
    Attempts to extract an integer number from the given value. If the
    extraction fails with a `TypeError` or `ValueError`, the value of the
    'default' argument is returned.
    """

    try:
        # _str_to_num yields an int or a float, so one additional conversion
        # to int is needed on top
        number = _str_to_num(value)
        return int(number)
    except (TypeError, ValueError):
        return default
873
874
def get_testdata_filepath(filename):
    """
    Return a string representing the path to the file requested from the
    io.fits test data set.

    .. versionadded:: 2.0.3

    Parameters
    ----------
    filename : str
        The filename of the test data file.

    Returns
    -------
    filepath : str
        The path to the requested file.
    """

    # Location of the file relative to the 'astropy' package
    relative_path = f'io/fits/tests/data/{filename}'
    return data.get_pkg_data_filename(relative_path, 'astropy')
894
895
def _rstrip_inplace(array):
    """
    Performs an in-place rstrip operation on string arrays. This is necessary
    since the built-in `np.char.rstrip` in Numpy does not perform an in-place
    calculation.

    Parameters
    ----------
    array : ndarray
        Array of bytes ('S') or unicode ('U') strings; its memory is modified
        in place.

    Returns
    -------
    array : ndarray
        The same object that was passed in.

    Raises
    ------
    TypeError
        If the dtype kind of ``array`` is neither 'S' nor 'U'.
    """

    # The following implementation converts the strings to unsigned integers
    # of the right length. Trailing spaces (which are represented as 32) are
    # then converted to null characters (represented as zeros). To avoid
    # creating large temporary mask arrays, we loop over chunks (attempting to
    # do that on a 1-D version of the array; large memory may still be needed
    # in the unlikely case that a string array has small first dimension and
    # cannot be represented as a contiguous 1-D array in memory).

    dt = array.dtype

    if dt.kind not in 'SU':
        raise TypeError("This function can only be used on string arrays")
    # View the array as appropriate integers. The last dimension will
    # equal the number of characters in each string.
    # bpc is the number of bytes per character: 1 for bytes, 4 for unicode.
    bpc = 1 if dt.kind == 'S' else 4
    dt_int = f"({dt.itemsize // bpc},){dt.byteorder}u{bpc}"
    # b shares memory with ``array``, so writing to it edits the strings.
    b = array.view(dt_int, np.ndarray)
    # For optimal speed, work in chunks of the internal ufunc buffer size.
    bufsize = np.getbufsize()
    # Attempt to have the strings as a 1-D array to give the chunk known size.
    # Note: the code will work if this fails; the chunks will just be larger.
    if b.ndim > 2:
        try:
            b.shape = -1, b.shape[-1]
        except AttributeError:  # can occur for non-contiguous arrays
            pass
    for j in range(0, b.shape[0], bufsize):
        # c is a view on a chunk of b, so assignments below land in ``array``.
        c = b[j:j + bufsize]
        # Mask which will tell whether we're in a sequence of trailing spaces.
        mask = np.ones(c.shape[:-1], dtype=bool)
        # Loop over the characters in the strings, in reverse order. We process
        # the i-th character of all strings in the chunk at the same time. If
        # the character is 32, this corresponds to a space, and we then change
        # this to 0. We then construct a new mask to find rows where the
        # i-th character is 0 (null) and the i-1-th is 32 (space) and repeat.
        # NOTE(review): the range stops before index -c.shape[-1], so the
        # first character of each string is never examined; a string made up
        # only of spaces keeps a single leading space instead of becoming
        # empty -- confirm whether this is intended.
        for i in range(-1, -c.shape[-1], -1):
            # Keep only rows where everything to the right is null (or this
            # is the last character) and this character is a space ...
            mask &= c[..., i] == 32
            # ... and blank those out.
            c[..., i][mask] = 0
            # Rows whose i-th character is now null remain candidates for
            # the next character to the left.
            mask = c[..., i] == 0

    return array
944