1 #include "data.table.h"
2
3 /* fast rolling mean - router
4 * early stopping for window bigger than input
5 * also handles 'align' in single place
6 * algo = 0: frollmeanFast
7 * adding/removing in/out of sliding window of observations
8 * algo = 1: frollmeanExact
9 * recalculate whole mean for each observation, roundoff correction is adjusted, also support for NaN and Inf
10 */
frollmean(unsigned int algo,double * x,uint64_t nx,ans_t * ans,int k,int align,double fill,bool narm,int hasna,bool verbose)11 void frollmean(unsigned int algo, double *x, uint64_t nx, ans_t *ans, int k, int align, double fill, bool narm, int hasna, bool verbose) {
12 if (nx < k) { // if window width bigger than input just return vector of fill values
13 if (verbose)
14 snprintf(end(ans->message[0]), 500, _("%s: window width longer than input vector, returning all NA vector\n"), __func__);
15 // implicit n_message limit discussed here: https://github.com/Rdatatable/data.table/issues/3423#issuecomment-487722586
16 for (int i=0; i<nx; i++) {
17 ans->dbl_v[i] = fill;
18 }
19 return;
20 }
21 double tic = 0;
22 if (verbose)
23 tic = omp_get_wtime();
24 if (algo==0) {
25 frollmeanFast(x, nx, ans, k, fill, narm, hasna, verbose);
26 } else if (algo==1) {
27 frollmeanExact(x, nx, ans, k, fill, narm, hasna, verbose);
28 }
29 if (ans->status < 3 && align < 1) { // align center or left, only when no errors occurred
30 int k_ = align==-1 ? k-1 : floor(k/2); // offset to shift
31 if (verbose)
32 snprintf(end(ans->message[0]), 500, _("%s: align %d, shift answer by %d\n"), __func__, align, -k_);
33 memmove((char *)ans->dbl_v, (char *)ans->dbl_v + (k_*sizeof(double)), (nx-k_)*sizeof(double)); // apply shift to achieve expected align
34 for (uint64_t i=nx-k_; i<nx; i++) { // fill from right side
35 ans->dbl_v[i] = fill;
36 }
37 }
38 if (verbose)
39 snprintf(end(ans->message[0]), 500, _("%s: processing algo %u took %.3fs\n"), __func__, algo, omp_get_wtime()-tic);
40 }
41 /* fast rolling mean - fast
42 * when no info on NA (hasNA argument) then assume no NAs run faster version
43 * rollmean implemented as single pass sliding window for align="right"
44 * if NAs detected re-run rollmean implemented as single pass sliding window with NA support
45 */
frollmeanFast(double * x,uint64_t nx,ans_t * ans,int k,double fill,bool narm,int hasna,bool verbose)46 void frollmeanFast(double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasna, bool verbose) {
47 if (verbose)
48 snprintf(end(ans->message[0]), 500, _("%s: running for input length %"PRIu64", window %d, hasna %d, narm %d\n"), "frollmeanFast", (uint64_t)nx, k, hasna, (int)narm);
49 long double w = 0.0; // sliding window aggregate
50 bool truehasna = hasna>0; // flag to re-run with NA support if NAs detected
51 if (!truehasna) {
52 int i; // iterator declared here because it is being used after foor loop
53 for (i=0; i<k-1; i++) { // loop over leading observation, all partial window only
54 w += x[i]; // add current row to sliding window
55 ans->dbl_v[i] = fill; // answers are fill for partial window
56 }
57 w += x[i]; // i==k-1
58 ans->dbl_v[i] = (double) (w / k); // first full sliding window, non-fill rollfun answer
59 if (R_FINITE((double) w)) { // proceed only if no NAs detected in first k obs, otherwise early stopping
60 for (uint64_t i=k; i<nx; i++) { // loop over obs, complete window, all remaining after partial window
61 w -= x[i-k]; // remove leaving row from sliding window
62 w += x[i]; // add current row to sliding window
63 ans->dbl_v[i] = (double) (w / k); // rollfun to answer vector
64 }
65 if (!R_FINITE((double) w)) { // mark to re-run with NA care
66 if (hasna==-1) { // raise warning
67 ans->status = 2;
68 snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
69 }
70 if (verbose)
71 snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, re-running with extra care for NAs\n"), __func__);
72 w = 0.0;
73 truehasna = true;
74 }
75 } else { // early stopping branch when NAs detected in first k obs
76 if (hasna==-1) { // raise warning
77 ans->status = 2;
78 snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
79 }
80 if (verbose)
81 snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, skip non-NA attempt and run with extra care for NAs\n"), __func__);
82 w = 0.0;
83 truehasna = true;
84 }
85 }
86 if (truehasna) {
87 int nc = 0; // NA counter within sliding window
88 int i; // iterator declared here because it is being used after foor loop
89 for (i=0; i<k-1; i++) { // loop over leading observation, all partial window only
90 if (R_FINITE(x[i])) {
91 w += x[i]; // add only finite values to window aggregate
92 } else {
93 nc++; // increment NA count in current window
94 }
95 ans->dbl_v[i] = fill; // partial window fill all
96 }
97 if (R_FINITE(x[i])) {
98 w += x[i]; // i==k-1
99 } else {
100 nc++;
101 }
102 if (nc == 0) {
103 ans->dbl_v[i] = (double) (w / k); // no NAs in first full window
104 } else if (nc == k) {
105 ans->dbl_v[i] = narm ? R_NaN : NA_REAL; // all values in sliding window are NA, expected output for fun(NA, na.rm=T/F)
106 } else {
107 ans->dbl_v[i] = narm ? (double) (w / (k - nc)) : NA_REAL; // some values in window are NA
108 }
109 for (uint64_t i=k; i<nx; i++) { // loop over obs, complete window, all remaining after partial window
110 if (R_FINITE(x[i])) {
111 w += x[i]; // add only finite to window aggregate
112 } else {
113 nc++; // increment NA count in current window
114 }
115 if (R_FINITE(x[i-k])) {
116 w -= x[i-k]; // remove only finite from window aggregate
117 } else {
118 nc--; // decrement NA count in current window
119 }
120 if (nc == 0) {
121 ans->dbl_v[i] = (double) (w / k); // no NAs in sliding window for present observation
122 } else if (nc == k) {
123 ans->dbl_v[i] = narm ? R_NaN : NA_REAL; // all values in window are NA, expected output for fun(NA, na.rm=T/F)
124 } else {
125 ans->dbl_v[i] = narm ? (double) (w / (k - nc)) : NA_REAL; // some values in window are NA
126 }
127 }
128 }
129 }
130 /* fast rolling mean - exact
131 * when no info on NA (hasNA argument) then assume no NAs run faster version, also when na.rm=FALSE faster version can proceed
132 * rollmean implemented as mean of k obs for each observation for align="right"
133 * if NAs detected and na.rm=TRUE then re-run rollmean implemented as mean of k bos for each observation with NA support
134 */
frollmeanExact(double * x,uint64_t nx,ans_t * ans,int k,double fill,bool narm,int hasna,bool verbose)135 void frollmeanExact(double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasna, bool verbose) {
136 if (verbose)
137 snprintf(end(ans->message[0]), 500, _("%s: running in parallel for input length %"PRIu64", window %d, hasna %d, narm %d\n"), "frollmeanExact", (uint64_t)nx, k, hasna, (int)narm);
138 for (int i=0; i<k-1; i++) { // fill partial window only
139 ans->dbl_v[i] = fill;
140 }
141 bool truehasna = hasna>0; // flag to re-run with NA support if NAs detected
142 if (!truehasna || !narm) {
143 #pragma omp parallel for num_threads(getDTthreads(nx, true))
144 for (uint64_t i=k-1; i<nx; i++) { // loop on every observation with complete window, partial already filled in single threaded section
145 if (narm && truehasna) {
146 continue; // if NAs detected no point to continue
147 }
148 long double w = 0.0;
149 for (int j=-k+1; j<=0; j++) { // sub-loop on window width
150 w += x[i+j]; // sum of window for particular observation
151 }
152 if (R_FINITE((double) w)) { // no need to calc roundoff correction if NAs detected as will re-call all below in truehasna==1
153 long double res = w / k; // keep results as long double for intermediate processing
154 long double err = 0.0; // roundoff corrector
155 for (int j=-k+1; j<=0; j++) { // nested loop on window width
156 err += x[i+j] - res; // measure difference of obs in sub-loop to calculated fun for obs
157 }
158 ans->dbl_v[i] = (double) (res + (err / k)); // adjust calculated rollfun with roundoff correction
159 } else {
160 if (!narm) {
161 ans->dbl_v[i] = (double) (w / k); // NAs should be propagated
162 }
163 truehasna = true; // NAs detected for this window, set flag so rest of windows will not be re-run
164 }
165 }
166 if (truehasna) {
167 if (hasna==-1) { // raise warning
168 ans->status = 2;
169 snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
170 }
171 if (verbose) {
172 if (narm) {
173 snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, re-running with extra care for NAs\n"), __func__);
174 } else {
175 snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, na.rm was FALSE so in 'exact' implementation NAs were handled already, no need to re-run\n"), __func__);
176 }
177 }
178 }
179 }
180 if (truehasna && narm) {
181 #pragma omp parallel for num_threads(getDTthreads(nx, true))
182 for (uint64_t i=k-1; i<nx; i++) { // loop on every observation with complete window, partial already filled in single threaded section
183 long double w = 0.0;
184 int nc = 0; // NA counter within sliding window
185 for (int j=-k+1; j<=0; j++) { // nested loop on window width
186 if (ISNAN(x[i+j])) {
187 nc++; // increment NA count in current window
188 } else {
189 w += x[i+j]; // add observation to current window
190 }
191 }
192 if (w > DBL_MAX) {
193 ans->dbl_v[i] = R_PosInf; // handle Inf for na.rm=TRUE consistently to base R
194 } else if (w < -DBL_MAX) {
195 ans->dbl_v[i] = R_NegInf;
196 } else {
197 long double res = w / k; // keep results as long double for intermediate processing
198 long double err = 0.0; // roundoff corrector
199 if (nc == 0) { // no NAs in current window
200 for (int j=-k+1; j<=0; j++) { // sub-loop on window width
201 err += x[i+j] - res; // measure roundoff for each obs in window
202 }
203 ans->dbl_v[i] = (double) (res + (err / k)); // adjust calculated fun with roundoff correction
204 } else if (nc < k) {
205 for (int j=-k+1; j<=0; j++) { // sub-loop on window width
206 if (!ISNAN(x[i+j])) {
207 err += x[i+j] - res; // measure roundoff for each non-NA obs in window
208 }
209 }
210 ans->dbl_v[i] = (double) (res + (err / (k - nc))); // adjust calculated fun with roundoff correction
211 } else { // nc == k
212 ans->dbl_v[i] = R_NaN; // all values NAs and narm so produce expected values
213 }
214 }
215 }
216 }
217 }
218
219 /* fast rolling sum */
frollsum(unsigned int algo,double * x,uint64_t nx,ans_t * ans,int k,int align,double fill,bool narm,int hasna,bool verbose)220 void frollsum(unsigned int algo, double *x, uint64_t nx, ans_t *ans, int k, int align, double fill, bool narm, int hasna, bool verbose) {
221 if (nx < k) {
222 if (verbose)
223 snprintf(end(ans->message[0]), 500, _("%s: window width longer than input vector, returning all NA vector\n"), __func__);
224 for (int i=0; i<nx; i++) {
225 ans->dbl_v[i] = fill;
226 }
227 return;
228 }
229 double tic = 0;
230 if (verbose)
231 tic = omp_get_wtime();
232 if (algo==0) {
233 frollsumFast(x, nx, ans, k, fill, narm, hasna, verbose);
234 } else if (algo==1) {
235 frollsumExact(x, nx, ans, k, fill, narm, hasna, verbose);
236 }
237 if (ans->status < 3 && align < 1) {
238 int k_ = align==-1 ? k-1 : floor(k/2);
239 if (verbose)
240 snprintf(end(ans->message[0]), 500, _("%s: align %d, shift answer by %d\n"), __func__, align, -k_);
241 memmove((char *)ans->dbl_v, (char *)ans->dbl_v + (k_*sizeof(double)), (nx-k_)*sizeof(double));
242 for (uint64_t i=nx-k_; i<nx; i++) {
243 ans->dbl_v[i] = fill;
244 }
245 }
246 if (verbose)
247 snprintf(end(ans->message[0]), 500, _("%s: processing algo %u took %.3fs\n"), __func__, algo, omp_get_wtime()-tic);
248 }
frollsumFast(double * x,uint64_t nx,ans_t * ans,int k,double fill,bool narm,int hasna,bool verbose)249 void frollsumFast(double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasna, bool verbose) {
250 if (verbose)
251 snprintf(end(ans->message[0]), 500, _("%s: running for input length %"PRIu64", window %d, hasna %d, narm %d\n"), "frollsumFast", (uint64_t)nx, k, hasna, (int)narm);
252 long double w = 0.0;
253 bool truehasna = hasna>0;
254 if (!truehasna) {
255 int i;
256 for (i=0; i<k-1; i++) {
257 w += x[i];
258 ans->dbl_v[i] = fill;
259 }
260 w += x[i];
261 ans->dbl_v[i] = (double) w;
262 if (R_FINITE((double) w)) {
263 for (uint64_t i=k; i<nx; i++) {
264 w -= x[i-k];
265 w += x[i];
266 ans->dbl_v[i] = (double) w;
267 }
268 if (!R_FINITE((double) w)) {
269 if (hasna==-1) {
270 ans->status = 2;
271 snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
272 }
273 if (verbose)
274 snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, re-running with extra care for NAs\n"), __func__);
275 w = 0.0;
276 truehasna = true;
277 }
278 } else {
279 if (hasna==-1) {
280 ans->status = 2;
281 snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
282 }
283 if (verbose)
284 snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, skip non-NA attempt and run with extra care for NAs\n"), __func__);
285 w = 0.0;
286 truehasna = true;
287 }
288 }
289 if (truehasna) {
290 int nc = 0;
291 int i;
292 for (i=0; i<k-1; i++) {
293 if (R_FINITE(x[i])) {
294 w += x[i];
295 } else {
296 nc++;
297 }
298 ans->dbl_v[i] = fill;
299 }
300 if (R_FINITE(x[i])) {
301 w += x[i];
302 } else {
303 nc++;
304 }
305 if (nc == 0) {
306 ans->dbl_v[i] = (double) w;
307 } else if (nc == k) {
308 ans->dbl_v[i] = narm ? 0.0 : NA_REAL;
309 } else {
310 ans->dbl_v[i] = narm ? (double) w : NA_REAL;
311 }
312 for (uint64_t i=k; i<nx; i++) {
313 if (R_FINITE(x[i])) {
314 w += x[i];
315 } else {
316 nc++;
317 }
318 if (R_FINITE(x[i-k])) {
319 w -= x[i-k];
320 } else {
321 nc--;
322 }
323 if (nc == 0) {
324 ans->dbl_v[i] = (double) w;
325 } else if (nc == k) {
326 ans->dbl_v[i] = narm ? 0.0 : NA_REAL;
327 } else {
328 ans->dbl_v[i] = narm ? (double) w : NA_REAL;
329 }
330 }
331 }
332 }
frollsumExact(double * x,uint64_t nx,ans_t * ans,int k,double fill,bool narm,int hasna,bool verbose)333 void frollsumExact(double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasna, bool verbose) {
334 if (verbose)
335 snprintf(end(ans->message[0]), 500, _("%s: running in parallel for input length %"PRIu64", window %d, hasna %d, narm %d\n"), "frollsumExact", (uint64_t)nx, k, hasna, (int)narm);
336 for (int i=0; i<k-1; i++) {
337 ans->dbl_v[i] = fill;
338 }
339 bool truehasna = hasna>0;
340 if (!truehasna || !narm) {
341 #pragma omp parallel for num_threads(getDTthreads(nx, true))
342 for (uint64_t i=k-1; i<nx; i++) {
343 if (narm && truehasna) {
344 continue;
345 }
346 long double w = 0.0;
347 for (int j=-k+1; j<=0; j++) {
348 w += x[i+j];
349 }
350 if (R_FINITE((double) w)) {
351 ans->dbl_v[i] = (double) w;
352 } else {
353 if (!narm) {
354 ans->dbl_v[i] = (double) w;
355 }
356 truehasna = true;
357 }
358 }
359 if (truehasna) {
360 if (hasna==-1) {
361 ans->status = 2;
362 snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
363 }
364 if (verbose) {
365 if (narm) {
366 snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, re-running with extra care for NAs\n"), __func__);
367 } else {
368 snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, na.rm was FALSE so in 'exact' implementation NAs were handled already, no need to re-run\n"), __func__);
369 }
370 }
371 }
372 }
373 if (truehasna && narm) {
374 #pragma omp parallel for num_threads(getDTthreads(nx, true))
375 for (uint64_t i=k-1; i<nx; i++) {
376 long double w = 0.0;
377 int nc = 0;
378 for (int j=-k+1; j<=0; j++) {
379 if (ISNAN(x[i+j])) {
380 nc++;
381 } else {
382 w += x[i+j];
383 }
384 }
385 if (w > DBL_MAX) {
386 ans->dbl_v[i] = R_PosInf;
387 } else if (w < -DBL_MAX) {
388 ans->dbl_v[i] = R_NegInf;
389 } else {
390 if (nc < k) {
391 ans->dbl_v[i] = (double) w;
392 } else {
393 ans->dbl_v[i] = 0.0;
394 }
395 }
396 }
397 }
398 }
399
400 /* fast rolling any R function
401 * not plain C, not thread safe
402 * R eval() allocates
403 */
frollapply(double * x,int64_t nx,double * w,int k,ans_t * ans,int align,double fill,SEXP call,SEXP rho,bool verbose)404 void frollapply(double *x, int64_t nx, double *w, int k, ans_t *ans, int align, double fill, SEXP call, SEXP rho, bool verbose) {
405 if (nx < k) {
406 if (verbose)
407 Rprintf(_("%s: window width longer than input vector, returning all NA vector\n"), __func__);
408 for (int i=0; i<nx; i++) {
409 ans->dbl_v[i] = fill;
410 }
411 return;
412 }
413 double tic = 0;
414 if (verbose)
415 tic = omp_get_wtime();
416 for (int i=0; i<k-1; i++) {
417 ans->dbl_v[i] = fill;
418 }
419 // this is i=k-1 iteration - first full window - taken out from the loop
420 // we use it to add extra check that results of a FUN are length 1 numeric
421 memcpy(w, x, k*sizeof(double));
422 SEXP eval0 = PROTECT(eval(call, rho));
423 if (xlength(eval0) != 1)
424 error(_("%s: results from provided FUN are not length 1"), __func__);
425 SEXPTYPE teval0 = TYPEOF(eval0);
426 if (teval0 == REALSXP) {
427 ans->dbl_v[k-1] = REAL(eval0)[0];
428 } else {
429 if (teval0==INTSXP || teval0==LGLSXP) {
430 if (verbose)
431 Rprintf(_("%s: results from provided FUN are not of type double, coercion from integer or logical will be applied on each iteration\n"), __func__);
432 ans->dbl_v[k-1] = REAL(coerceVector(eval0, REALSXP))[0];
433 } else {
434 error(_("%s: results from provided FUN are not of type double"), __func__);
435 }
436 }
437 UNPROTECT(1); // eval0
438 // for each row it copies expected window data into w
439 // evaluate call which has been prepared to point into w
440 if (teval0 == REALSXP) {
441 for (int64_t i=k; i<nx; i++) {
442 memcpy(w, x+(i-k+1), k*sizeof(double));
443 ans->dbl_v[i] = REAL(eval(call, rho))[0]; // this may fail with for a not type-stable fun
444 }
445 } else {
446 for (int64_t i=k; i<nx; i++) {
447 memcpy(w, x+(i-k+1), k*sizeof(double));
448 SEXP evali = PROTECT(eval(call, rho));
449 ans->dbl_v[i] = REAL(coerceVector(evali, REALSXP))[0];
450 UNPROTECT(1); // evali
451 }
452 }
453 if (ans->status < 3 && align < 1) {
454 int k_ = align==-1 ? k-1 : floor(k/2);
455 if (verbose)
456 Rprintf(_("%s: align %d, shift answer by %d\n"), __func__, align, -k_);
457 memmove((char *)ans->dbl_v, (char *)ans->dbl_v + (k_*sizeof(double)), (nx-k_)*sizeof(double));
458 for (int64_t i=nx-k_; i<nx; i++) {
459 ans->dbl_v[i] = fill;
460 }
461 }
462 if (verbose)
463 Rprintf(_("%s: took %.3fs\n"), __func__, omp_get_wtime()-tic);
464 }
465