1#cython: wraparound=False 2#cython: boundscheck=False 3#cython: nonecheck=False 4 5"""Utility functions for finding peaks in signals.""" 6 7import warnings 8 9import numpy as np 10 11cimport numpy as np 12from libc.math cimport ceil 13 14 15__all__ = ['_local_maxima_1d', '_select_by_peak_distance', '_peak_prominences', 16 '_peak_widths'] 17 18 19def _local_maxima_1d(np.float64_t[::1] x not None): 20 """ 21 Find local maxima in a 1D array. 22 23 This function finds all local maxima in a 1D array and returns the indices 24 for their edges and midpoints (rounded down for even plateau sizes). 25 26 Parameters 27 ---------- 28 x : ndarray 29 The array to search for local maxima. 30 31 Returns 32 ------- 33 midpoints : ndarray 34 Indices of midpoints of local maxima in `x`. 35 left_edges : ndarray 36 Indices of edges to the left of local maxima in `x`. 37 right_edges : ndarray 38 Indices of edges to the right of local maxima in `x`. 39 40 Notes 41 ----- 42 - Compared to `argrelmax` this function is significantly faster and can 43 detect maxima that are more than one sample wide. However this comes at 44 the cost of being only applicable to 1D arrays. 45 - A maxima is defined as one or more samples of equal value that are 46 surrounded on both sides by at least one smaller sample. 47 48 .. versionadded:: 1.1.0 49 """ 50 cdef: 51 np.intp_t[::1] midpoints, left_edges, right_edges 52 np.intp_t m, i, i_ahead, i_max 53 54 # Preallocate, there can't be more maxima than half the size of `x` 55 midpoints = np.empty(x.shape[0] // 2, dtype=np.intp) 56 left_edges = np.empty(x.shape[0] // 2, dtype=np.intp) 57 right_edges = np.empty(x.shape[0] // 2, dtype=np.intp) 58 m = 0 # Pointer to the end of valid area in allocated arrays 59 60 with nogil: 61 i = 1 # Pointer to current sample, first one can't be maxima 62 i_max = x.shape[0] - 1 # Last sample can't be maxima 63 while i < i_max: 64 # Test if previous sample is smaller 65 if x[i - 1] < x[i]: 66 i_ahead = i + 1 # Index to look ahead of current sample 67 68 # Find next sample that is unequal to x[i] 69 while i_ahead < i_max and x[i_ahead] == x[i]: 70 i_ahead += 1 71 72 # Maxima is found if next unequal sample is smaller than x[i] 73 if x[i_ahead] < x[i]: 74 left_edges[m] = i 75 right_edges[m] = i_ahead - 1 76 midpoints[m] = (left_edges[m] + right_edges[m]) // 2 77 m += 1 78 # Skip samples that can't be maximum 79 i = i_ahead 80 i += 1 81 82 # Keep only valid part of array memory. 83 midpoints.base.resize(m, refcheck=False) 84 left_edges.base.resize(m, refcheck=False) 85 right_edges.base.resize(m, refcheck=False) 86 87 return midpoints.base, left_edges.base, right_edges.base 88 89 90def _select_by_peak_distance(np.intp_t[::1] peaks not None, 91 np.float64_t[::1] priority not None, 92 np.float64_t distance): 93 """ 94 Evaluate which peaks fulfill the distance condition. 95 96 Parameters 97 ---------- 98 peaks : ndarray 99 Indices of peaks in `vector`. 100 priority : ndarray 101 An array matching `peaks` used to determine priority of each peak. A 102 peak with a higher priority value is kept over one with a lower one. 103 distance : np.float64 104 Minimal distance that peaks must be spaced. 105 106 Returns 107 ------- 108 keep : ndarray[bool] 109 A boolean mask evaluating to true where `peaks` fulfill the distance 110 condition. 111 112 Notes 113 ----- 114 Declaring the input arrays as C-contiguous doesn't seem to have performance 115 advantages. 116 117 .. versionadded:: 1.1.0 118 """ 119 cdef: 120 np.uint8_t[::1] keep 121 np.intp_t[::1] priority_to_position 122 np.intp_t i, j, k, peaks_size, distance_ 123 124 peaks_size = peaks.shape[0] 125 # Round up because actual peak distance can only be natural number 126 distance_ = <np.intp_t>ceil(distance) 127 keep = np.ones(peaks_size, dtype=np.uint8) # Prepare array of flags 128 129 # Create map from `i` (index for `peaks` sorted by `priority`) to `j` (index 130 # for `peaks` sorted by position). This allows to iterate `peaks` and `keep` 131 # with `j` by order of `priority` while still maintaining the ability to 132 # step to neighbouring peaks with (`j` + 1) or (`j` - 1). 133 priority_to_position = np.argsort(priority) 134 135 with nogil: 136 # Highest priority first -> iterate in reverse order (decreasing) 137 for i in range(peaks_size - 1, -1, -1): 138 # "Translate" `i` to `j` which points to current peak whose 139 # neighbours are to be evaluated 140 j = priority_to_position[i] 141 if keep[j] == 0: 142 # Skip evaluation for peak already marked as "don't keep" 143 continue 144 145 k = j - 1 146 # Flag "earlier" peaks for removal until minimal distance is exceeded 147 while 0 <= k and peaks[j] - peaks[k] < distance_: 148 keep[k] = 0 149 k -= 1 150 151 k = j + 1 152 # Flag "later" peaks for removal until minimal distance is exceeded 153 while k < peaks_size and peaks[k] - peaks[j] < distance_: 154 keep[k] = 0 155 k += 1 156 157 return keep.base.view(dtype=np.bool_) # Return as boolean array 158 159 160class PeakPropertyWarning(RuntimeWarning): 161 """Calculated property of a peak has unexpected value.""" 162 pass 163 164 165def _peak_prominences(np.float64_t[::1] x not None, 166 np.intp_t[::1] peaks not None, 167 np.intp_t wlen): 168 """ 169 Calculate the prominence of each peak in a signal. 170 171 Parameters 172 ---------- 173 x : ndarray 174 A signal with peaks. 175 peaks : ndarray 176 Indices of peaks in `x`. 177 wlen : np.intp 178 A window length in samples (see `peak_prominences`) which is rounded up 179 to the nearest odd integer. If smaller than 2 the entire signal `x` is 180 used. 181 182 Returns 183 ------- 184 prominences : ndarray 185 The calculated prominences for each peak in `peaks`. 186 left_bases, right_bases : ndarray 187 The peaks' bases as indices in `x` to the left and right of each peak. 188 189 Raises 190 ------ 191 ValueError 192 If a value in `peaks` is an invalid index for `x`. 193 194 Warns 195 ----- 196 PeakPropertyWarning 197 If a prominence of 0 was calculated for any peak. 198 199 Notes 200 ----- 201 This is the inner function to `peak_prominences`. 202 203 .. versionadded:: 1.1.0 204 """ 205 cdef: 206 np.float64_t[::1] prominences 207 np.intp_t[::1] left_bases, right_bases 208 np.float64_t left_min, right_min 209 np.intp_t peak_nr, peak, i_min, i_max, i 210 np.uint8_t show_warning 211 212 show_warning = False 213 prominences = np.empty(peaks.shape[0], dtype=np.float64) 214 left_bases = np.empty(peaks.shape[0], dtype=np.intp) 215 right_bases = np.empty(peaks.shape[0], dtype=np.intp) 216 217 with nogil: 218 for peak_nr in range(peaks.shape[0]): 219 peak = peaks[peak_nr] 220 i_min = 0 221 i_max = x.shape[0] - 1 222 if not i_min <= peak <= i_max: 223 with gil: 224 raise ValueError("peak {} is not a valid index for `x`" 225 .format(peak)) 226 227 if 2 <= wlen: 228 # Adjust window around the evaluated peak (within bounds); 229 # if wlen is even the resulting window length is is implicitly 230 # rounded to next odd integer 231 i_min = max(peak - wlen // 2, i_min) 232 i_max = min(peak + wlen // 2, i_max) 233 234 # Find the left base in interval [i_min, peak] 235 i = left_bases[peak_nr] = peak 236 left_min = x[peak] 237 while i_min <= i and x[i] <= x[peak]: 238 if x[i] < left_min: 239 left_min = x[i] 240 left_bases[peak_nr] = i 241 i -= 1 242 243 # Find the right base in interval [peak, i_max] 244 i = right_bases[peak_nr] = peak 245 right_min = x[peak] 246 while i <= i_max and x[i] <= x[peak]: 247 if x[i] < right_min: 248 right_min = x[i] 249 right_bases[peak_nr] = i 250 i += 1 251 252 prominences[peak_nr] = x[peak] - max(left_min, right_min) 253 if prominences[peak_nr] == 0: 254 show_warning = True 255 256 if show_warning: 257 warnings.warn("some peaks have a prominence of 0", 258 PeakPropertyWarning, stacklevel=2) 259 # Return memoryviews as ndarrays 260 return prominences.base, left_bases.base, right_bases.base 261 262 263def _peak_widths(np.float64_t[::1] x not None, 264 np.intp_t[::1] peaks not None, 265 np.float64_t rel_height, 266 np.float64_t[::1] prominences not None, 267 np.intp_t[::1] left_bases not None, 268 np.intp_t[::1] right_bases not None): 269 """ 270 Calculate the width of each each peak in a signal. 271 272 Parameters 273 ---------- 274 x : ndarray 275 A signal with peaks. 276 peaks : ndarray 277 Indices of peaks in `x`. 278 rel_height : np.float64 279 Chooses the relative height at which the peak width is measured as a 280 percentage of its prominence (see `peak_widths`). 281 prominences : ndarray 282 Prominences of each peak in `peaks` as returned by `peak_prominences`. 283 left_bases, right_bases : ndarray 284 Left and right bases of each peak in `peaks` as returned by 285 `peak_prominences`. 286 287 Returns 288 ------- 289 widths : ndarray 290 The widths for each peak in samples. 291 width_heights : ndarray 292 The height of the contour lines at which the `widths` where evaluated. 293 left_ips, right_ips : ndarray 294 Interpolated positions of left and right intersection points of a 295 horizontal line at the respective evaluation height. 296 297 Raises 298 ------ 299 ValueError 300 If the supplied prominence data doesn't satisfy the condition 301 ``0 <= left_base <= peak <= right_base < x.shape[0]`` for each peak or 302 if `peaks`, `left_bases` and `right_bases` don't share the same shape. 303 Or if `rel_height` is not at least 0. 304 305 Warnings 306 -------- 307 PeakPropertyWarning 308 If a width of 0 was calculated for any peak. 309 310 Notes 311 ----- 312 This is the inner function to `peak_widths`. 313 314 .. versionadded:: 1.1.0 315 """ 316 cdef: 317 np.float64_t[::1] widths, width_heights, left_ips, right_ips 318 np.float64_t height, left_ip, right_ip 319 np.intp_t p, peak, i, i_max, i_min 320 np.uint8_t show_warning 321 322 if rel_height < 0: 323 raise ValueError('`rel_height` must be greater or equal to 0.0') 324 if not (peaks.shape[0] == prominences.shape[0] == left_bases.shape[0] 325 == right_bases.shape[0]): 326 raise ValueError("arrays in `prominence_data` must have the same shape " 327 "as `peaks`") 328 329 show_warning = False 330 widths = np.empty(peaks.shape[0], dtype=np.float64) 331 width_heights = np.empty(peaks.shape[0], dtype=np.float64) 332 left_ips = np.empty(peaks.shape[0], dtype=np.float64) 333 right_ips = np.empty(peaks.shape[0], dtype=np.float64) 334 335 with nogil: 336 for p in range(peaks.shape[0]): 337 i_min = left_bases[p] 338 i_max = right_bases[p] 339 peak = peaks[p] 340 # Validate bounds and order 341 if not 0 <= i_min <= peak <= i_max < x.shape[0]: 342 with gil: 343 raise ValueError("prominence data is invalid for peak {}" 344 .format(peak)) 345 height = width_heights[p] = x[peak] - prominences[p] * rel_height 346 347 # Find intersection point on left side 348 i = peak 349 while i_min < i and height < x[i]: 350 i -= 1 351 left_ip = <np.float64_t>i 352 if x[i] < height: 353 # Interpolate if true intersection height is between samples 354 left_ip += (height - x[i]) / (x[i + 1] - x[i]) 355 356 # Find intersection point on right side 357 i = peak 358 while i < i_max and height < x[i]: 359 i += 1 360 right_ip = <np.float64_t>i 361 if x[i] < height: 362 # Interpolate if true intersection height is between samples 363 right_ip -= (height - x[i]) / (x[i - 1] - x[i]) 364 365 widths[p] = right_ip - left_ip 366 if widths[p] == 0: 367 show_warning = True 368 left_ips[p] = left_ip 369 right_ips[p] = right_ip 370 371 if show_warning: 372 warnings.warn("some peaks have a width of 0", 373 PeakPropertyWarning, stacklevel=2) 374 return widths.base, width_heights.base, left_ips.base, right_ips.base 375