1import warnings
2
3import numpy as np
4import pandas as pd
5from numpy.core.multiarray import normalize_axis_index  # type: ignore[attr-defined]
6
7from .options import OPTIONS
8
9try:
10    import bottleneck as bn
11
12    _USE_BOTTLENECK = True
13except ImportError:
14    # use numpy methods instead
15    bn = np
16    _USE_BOTTLENECK = False
17
18
def _select_along_axis(values, idx, axis):
    """Pick one element of ``values`` along ``axis`` for every entry of ``idx``.

    ``idx`` supplies the position to take along ``axis``; every other
    dimension is enumerated in full, so the result has the shape of ``idx``.
    Presumably ``idx`` has one dimension fewer than ``values`` (the reduced
    ``axis``) -- confirm against callers.
    """
    # Open-mesh index arrays covering every position of ``idx``.
    grids = np.ix_(*(np.arange(size) for size in idx.shape))
    # Splice ``idx`` in at ``axis`` so that it indexes that dimension.
    indexer = (*grids[:axis], idx, *grids[axis:])
    return values[indexer]
23
24
def nanfirst(values, axis):
    """Return the first non-null entry of ``values`` along ``axis``.

    Where every entry along ``axis`` is null, the entry at position 0 is
    returned, because ``argmax`` of an all-False mask is 0.
    """
    axis = normalize_axis_index(axis, values.ndim)
    not_null = ~pd.isnull(values)
    # argmax on a boolean mask yields the position of the first True.
    first_valid = np.argmax(not_null, axis=axis)
    return _select_along_axis(values, first_valid, axis)
29
30
def nanlast(values, axis):
    """Return the last non-null entry of ``values`` along ``axis``.

    Where every entry along ``axis`` is null, the final entry is returned,
    because ``argmax`` of an all-False mask is 0 and maps to index ``-1``.
    """
    axis = normalize_axis_index(axis, values.ndim)
    # Indexer that reverses ``axis`` and leaves the leading axes untouched.
    flip = tuple([slice(None)] * axis + [slice(None, None, -1)])
    not_null_reversed = (~pd.isnull(values))[flip]
    # Offset of the first valid entry counted from the end of the axis ...
    offset_from_end = np.argmax(not_null_reversed, axis=axis)
    # ... expressed as a negative index into the un-reversed array.
    last_valid = -1 - offset_from_end
    return _select_along_axis(values, last_valid, axis)
36
37
def inverse_permutation(indices):
    """Compute the inverse of a permutation given as integer positions.

    Parameters
    ----------
    indices : 1D np.ndarray with dtype=int
        Integer positions to assign elements to.

    Returns
    -------
    inverse_permutation : 1D np.ndarray with dtype=int
        Integer indices to take from the original array to create the
        permutation, i.e. ``result[indices[i]] == i`` for every ``i``.
    """
    size = len(indices)
    # intp (not int64) is the platform's native index type; int64 caused
    # trouble on Windows.
    inverse = np.empty(size, dtype=np.intp)
    inverse[indices] = np.arange(size, dtype=np.intp)
    return inverse
56
57
def _ensure_bool_is_ndarray(result, *args):
    """Expand a plain ``bool`` comparison result to the broadcast shape of ``args``.

    NumPy sometimes gives up on an elementwise comparison and returns a single
    Python bool instead of broadcasting, e.g.::

        In [10]: 1 == np.array(['a', 'b'])
        Out[10]: False

    In that case an array filled with that bool, shaped like the broadcast of
    ``args``, is returned. Any other ``result`` is passed through unchanged.
    """
    if not isinstance(result, bool):
        return result

    broadcast_shape = np.broadcast(*args).shape
    if result:
        return np.ones(broadcast_shape, dtype=bool)
    return np.zeros(broadcast_shape, dtype=bool)
70
71
def array_eq(self, other):
    """Compute ``self == other`` and always return an array-shaped result.

    NumPy's "elementwise comparison failed" warning is silenced while the
    comparison runs; a bare-bool result is expanded to the broadcast shape.
    """
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", r"elementwise comparison failed")
        compared = self == other
    return _ensure_bool_is_ndarray(compared, self, other)
76
77
def array_ne(self, other):
    """Compute ``self != other`` and always return an array-shaped result.

    NumPy's "elementwise comparison failed" warning is silenced while the
    comparison runs; a bare-bool result is expanded to the broadcast shape.
    """
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", r"elementwise comparison failed")
        compared = self != other
    return _ensure_bool_is_ndarray(compared, self, other)
82
83
def _is_contiguous(positions):
    """Report whether a non-empty list holds consecutive, increasing integers.

    Every entry must be exactly one greater than the entry before it, which
    is the same as entry ``i`` being equal to ``positions[0] + i``.
    """
    start = positions[0]
    return all(value == start + offset for offset, value in enumerate(positions))
92
93
def _advanced_indexer_subspaces(key):
    """Indices of the advanced indexes subspaces for mixed indexing and vindex.

    Returns a pair ``(mixed_positions, vindex_positions)``: the axes that the
    broadcast advanced-index dimensions occupy in the result of NumPy's mixed
    indexing, and the leading axes they occupy under vindex-style indexing.
    Both are empty tuples when no axes need to be moved.
    """
    key_tuple = key if isinstance(key, tuple) else (key,)

    # Every entry that is not a slice is treated as an advanced indexer.
    positions = []
    indexers = []
    for position, item in enumerate(key_tuple):
        if isinstance(item, slice):
            continue
        positions.append(position)
        indexers.append(item)

    if not positions or not _is_contiguous(positions):
        # Nothing to reorder: dimensions on the indexing result are already
        # ordered like vindex. See NumPy's rule for "Combining advanced and
        # basic indexing":
        # https://docs.scipy.org/doc/numpy/reference/arrays.indexing.html#combining-advanced-and-basic-indexing
        return (), ()

    # The advanced indexers broadcast together into ``n_dims`` result axes,
    # which mixed indexing places where the first advanced indexer sat.
    n_dims = len(np.broadcast(*indexers).shape)
    vindex_positions = np.arange(n_dims)
    mixed_positions = positions[0] + np.arange(n_dims)
    return mixed_positions, vindex_positions
114
115
class NumpyVIndexAdapter:
    """Object that implements indexing like vindex on a np.ndarray.

    This is a pure Python implementation of (some of) the logic in this NumPy
    proposal: https://github.com/numpy/numpy/pull/6256
    """

    def __init__(self, array):
        # The wrapped array; reads and writes go straight through to it.
        self._array = array

    def __getitem__(self, key):
        """Index the wrapped array, then move the advanced-index axes to where
        vindex-style indexing puts them."""
        source_axes, target_axes = _advanced_indexer_subspaces(key)
        selected = self._array[key]
        return np.moveaxis(selected, source_axes, target_axes)

    def __setitem__(self, key, value):
        """Value must have dimensionality matching the key."""
        mixed_axes, vindex_axes = _advanced_indexer_subspaces(key)
        # Reverse of __getitem__: reorder ``value`` from vindex layout into
        # the layout NumPy's mixed indexing expects before assigning.
        reordered = np.moveaxis(value, vindex_axes, mixed_axes)
        self._array[key] = reordered
134
135
def _create_bottleneck_method(name, npmodule=np):
    """Build a reduction named ``name`` that uses bottleneck when it can.

    The returned function looks up ``name`` on ``bn`` and calls it only when
    all of the conditions below hold; otherwise it calls the function of the
    same name on ``npmodule``.
    """

    def f(values, axis=None, **kwargs):
        dtype = kwargs.get("dtype", None)
        bn_func = getattr(bn, name, None)

        can_use_bottleneck = (
            _USE_BOTTLENECK
            and OPTIONS["use_bottleneck"]
            and isinstance(values, np.ndarray)
            and bn_func is not None
            and not isinstance(axis, tuple)
            and values.dtype.kind in "uifc"
            and values.dtype.isnative
            and (dtype is None or np.dtype(dtype) == values.dtype)
        )
        if not can_use_bottleneck:
            fallback = getattr(npmodule, name)
            return fallback(values, axis=axis, **kwargs)

        # bottleneck does not take care dtype, min_count
        kwargs.pop("dtype", None)
        return bn_func(values, axis=axis, **kwargs)

    f.__name__ = name
    return f
161
162
def _nanpolyfit_1d(arr, x, rcond=None):
    """Least-squares fit of a single 1D series, skipping its NaN entries.

    Parameters
    ----------
    arr : 1D np.ndarray
        Right-hand side values; NaN entries are dropped before fitting.
    x : 2D np.ndarray
        Design matrix with one row per entry of ``arr``.
    rcond : float, optional
        Passed through to ``np.linalg.lstsq``.

    Returns
    -------
    out : 1D np.ndarray of length ``x.shape[1] + 1``
        The fitted coefficients followed by the residual sum in the last
        slot. Everything is NaN when ``arr`` is all NaN; the last slot is NaN
        when ``lstsq`` reports no residual.
    """
    out = np.full((x.shape[1] + 1,), np.nan)
    mask = np.isnan(arr)
    if not np.all(mask):
        out[:-1], resid, rank, _ = np.linalg.lstsq(x[~mask, :], arr[~mask], rcond=rcond)
        # For a 1D right-hand side ``resid`` is either empty or a length-1
        # array. Take the element explicitly: assigning a size-1 array to a
        # scalar slot is deprecated since NumPy 1.25.
        out[-1] = resid[0] if resid.size > 0 else np.nan
        warn_on_deficient_rank(rank, x.shape[1])
    return out


def warn_on_deficient_rank(rank, order):
    """Emit a ``RankWarning`` when ``rank`` differs from ``order``."""
    if rank != order:
        # NumPy 2.0 removed ``np.RankWarning`` from the top-level namespace in
        # favour of ``numpy.exceptions.RankWarning``; support both layouts.
        try:
            from numpy.exceptions import RankWarning
        except ImportError:
            from numpy import RankWarning
        warnings.warn("Polyfit may be poorly conditioned", RankWarning, stacklevel=2)
176
177
def least_squares(lhs, rhs, rcond=None, skipna=False):
    """Least-squares solution of ``lhs @ coeffs = rhs``.

    Parameters
    ----------
    lhs : 2D np.ndarray
        Design matrix.
    rhs : 1D or 2D np.ndarray
        Right-hand side; with 2D input every column is fitted separately.
    rcond : float, optional
        Passed through to ``np.linalg.lstsq`` for every fit, including the
        per-column fits used for NaN-containing columns when ``skipna`` is set.
    skipna : bool, default False
        If True, columns of ``rhs`` that contain NaN are fitted one at a time
        with their NaN entries removed; the remaining columns are fitted
        together in a single call.

    Returns
    -------
    coeffs : np.ndarray
        Fitted coefficients.
    residuals : np.ndarray
        Residuals as reported by ``np.linalg.lstsq``, NaN where it reports
        none.

    Warns
    -----
    A rank warning is issued through ``warn_on_deficient_rank`` when the rank
    reported by ``lstsq`` differs from ``lhs.shape[1]``.
    """
    if skipna:
        added_dim = rhs.ndim == 1
        if added_dim:
            # Work on a single-column 2D array; undone before returning.
            rhs = rhs.reshape(rhs.shape[0], 1)
        nan_cols = np.any(np.isnan(rhs), axis=0)
        # Rows 0..n-1 hold the coefficients, the last row holds the residuals.
        out = np.empty((lhs.shape[1] + 1, rhs.shape[1]))
        if np.any(nan_cols):
            # Forward rcond so NaN-containing columns are solved with the same
            # cutoff as the NaN-free ones (it used to be silently dropped).
            out[:, nan_cols] = np.apply_along_axis(
                _nanpolyfit_1d, 0, rhs[:, nan_cols], lhs, rcond=rcond
            )
        if np.any(~nan_cols):
            out[:-1, ~nan_cols], resids, rank, _ = np.linalg.lstsq(
                lhs, rhs[:, ~nan_cols], rcond=rcond
            )
            out[-1, ~nan_cols] = resids if resids.size > 0 else np.nan
            warn_on_deficient_rank(rank, lhs.shape[1])
        coeffs = out[:-1, :]
        residuals = out[-1, :]
        if added_dim:
            coeffs = coeffs.reshape(coeffs.shape[0])
            residuals = residuals.reshape(residuals.shape[0])
    else:
        coeffs, residuals, rank, _ = np.linalg.lstsq(lhs, rhs, rcond=rcond)
        if residuals.size == 0:
            # No residuals reported: return NaN shaped like one coefficient row.
            residuals = coeffs[0] * np.nan
        warn_on_deficient_rank(rank, lhs.shape[1])
    return coeffs, residuals
206
207
# NaN-aware reductions exposed at module level. Each name is bound to a
# wrapper built by ``_create_bottleneck_method`` for the function of the
# same name.

# Extrema and their positions.
nanmin = _create_bottleneck_method("nanmin")
nanmax = _create_bottleneck_method("nanmax")
nanargmin = _create_bottleneck_method("nanargmin")
nanargmax = _create_bottleneck_method("nanargmax")

# Location and spread.
nanmean = _create_bottleneck_method("nanmean")
nanmedian = _create_bottleneck_method("nanmedian")
nanvar = _create_bottleneck_method("nanvar")
nanstd = _create_bottleneck_method("nanstd")

# Products and cumulative operations.
nanprod = _create_bottleneck_method("nanprod")
nancumsum = _create_bottleneck_method("nancumsum")
nancumprod = _create_bottleneck_method("nancumprod")
219