1 #include "data.table.h"
2 
3 /* fast rolling mean - router
4  * early stopping for window bigger than input
5  * also handles 'align' in single place
6  * algo = 0: frollmeanFast
7  *   adding/removing in/out of sliding window of observations
8  * algo = 1: frollmeanExact
9  *   recalculate whole mean for each observation, roundoff correction is adjusted, also support for NaN and Inf
10  */
frollmean(unsigned int algo,double * x,uint64_t nx,ans_t * ans,int k,int align,double fill,bool narm,int hasna,bool verbose)11 void frollmean(unsigned int algo, double *x, uint64_t nx, ans_t *ans, int k, int align, double fill, bool narm, int hasna, bool verbose) {
12   if (nx < k) {                                                 // if window width bigger than input just return vector of fill values
13     if (verbose)
14       snprintf(end(ans->message[0]), 500, _("%s: window width longer than input vector, returning all NA vector\n"), __func__);
15     // implicit n_message limit discussed here: https://github.com/Rdatatable/data.table/issues/3423#issuecomment-487722586
16     for (int i=0; i<nx; i++) {
17       ans->dbl_v[i] = fill;
18     }
19     return;
20   }
21   double tic = 0;
22   if (verbose)
23     tic = omp_get_wtime();
24   if (algo==0) {
25     frollmeanFast(x, nx, ans, k, fill, narm, hasna, verbose);
26   } else if (algo==1) {
27     frollmeanExact(x, nx, ans, k, fill, narm, hasna, verbose);
28   }
29   if (ans->status < 3 && align < 1) {                           // align center or left, only when no errors occurred
30     int k_ = align==-1 ? k-1 : floor(k/2);                      // offset to shift
31     if (verbose)
32       snprintf(end(ans->message[0]), 500, _("%s: align %d, shift answer by %d\n"), __func__, align, -k_);
33     memmove((char *)ans->dbl_v, (char *)ans->dbl_v + (k_*sizeof(double)), (nx-k_)*sizeof(double)); // apply shift to achieve expected align
34     for (uint64_t i=nx-k_; i<nx; i++) {                         // fill from right side
35       ans->dbl_v[i] = fill;
36     }
37   }
38   if (verbose)
39     snprintf(end(ans->message[0]), 500, _("%s: processing algo %u took %.3fs\n"), __func__, algo, omp_get_wtime()-tic);
40 }
41 /* fast rolling mean - fast
42  * when no info on NA (hasNA argument) then assume no NAs run faster version
43  * rollmean implemented as single pass sliding window for align="right"
44  * if NAs detected re-run rollmean implemented as single pass sliding window with NA support
45  */
frollmeanFast(double * x,uint64_t nx,ans_t * ans,int k,double fill,bool narm,int hasna,bool verbose)46 void frollmeanFast(double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasna, bool verbose) {
47   if (verbose)
48     snprintf(end(ans->message[0]), 500, _("%s: running for input length %"PRIu64", window %d, hasna %d, narm %d\n"), "frollmeanFast", (uint64_t)nx, k, hasna, (int)narm);
49   long double w = 0.0;                                          // sliding window aggregate
50   bool truehasna = hasna>0;                                     // flag to re-run with NA support if NAs detected
51   if (!truehasna) {
52     int i;                                                      // iterator declared here because it is being used after foor loop
53     for (i=0; i<k-1; i++) {                                     // loop over leading observation, all partial window only
54       w += x[i];                                                // add current row to sliding window
55       ans->dbl_v[i] = fill;                                     // answers are fill for partial window
56     }
57     w += x[i];                                                  // i==k-1
58     ans->dbl_v[i] = (double) (w / k);                           // first full sliding window, non-fill rollfun answer
59     if (R_FINITE((double) w)) {                                 // proceed only if no NAs detected in first k obs, otherwise early stopping
60       for (uint64_t i=k; i<nx; i++) {                           // loop over obs, complete window, all remaining after partial window
61         w -= x[i-k];                                            // remove leaving row from sliding window
62         w += x[i];                                              // add current row to sliding window
63         ans->dbl_v[i] = (double) (w / k);                       // rollfun to answer vector
64       }
65       if (!R_FINITE((double) w)) {                              // mark to re-run with NA care
66         if (hasna==-1) {                                        // raise warning
67           ans->status = 2;
68           snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
69         }
70         if (verbose)
71           snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, re-running with extra care for NAs\n"), __func__);
72         w = 0.0;
73         truehasna = true;
74       }
75     } else {                                                    // early stopping branch when NAs detected in first k obs
76       if (hasna==-1) {                                          // raise warning
77         ans->status = 2;
78         snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
79       }
80       if (verbose)
81         snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, skip non-NA attempt and run with extra care for NAs\n"), __func__);
82       w = 0.0;
83       truehasna = true;
84     }
85   }
86   if (truehasna) {
87     int nc = 0;                                                 // NA counter within sliding window
88     int i;                                                      // iterator declared here because it is being used after foor loop
89     for (i=0; i<k-1; i++) {                                     // loop over leading observation, all partial window only
90       if (R_FINITE(x[i])) {
91         w += x[i];                                              // add only finite values to window aggregate
92       } else {
93         nc++;                                                   // increment NA count in current window
94       }
95       ans->dbl_v[i] = fill;                                     // partial window fill all
96     }
97     if (R_FINITE(x[i])) {
98       w += x[i];                                                // i==k-1
99     } else {
100       nc++;
101     }
102     if (nc == 0) {
103       ans->dbl_v[i] = (double) (w / k);                         // no NAs in first full window
104     } else if (nc == k) {
105       ans->dbl_v[i] = narm ? R_NaN : NA_REAL;                   // all values in sliding window are NA, expected output for fun(NA, na.rm=T/F)
106     } else {
107       ans->dbl_v[i] = narm ? (double) (w / (k - nc)) : NA_REAL; // some values in window are NA
108     }
109     for (uint64_t i=k; i<nx; i++) {                             // loop over obs, complete window, all remaining after partial window
110       if (R_FINITE(x[i])) {
111         w += x[i];                                              // add only finite to window aggregate
112       } else {
113         nc++;                                                   // increment NA count in current window
114       }
115       if (R_FINITE(x[i-k])) {
116         w -= x[i-k];                                            // remove only finite from window aggregate
117       } else {
118         nc--;                                                   // decrement NA count in current window
119       }
120       if (nc == 0) {
121         ans->dbl_v[i] = (double) (w / k);                       // no NAs in sliding window for present observation
122       } else if (nc == k) {
123         ans->dbl_v[i] = narm ? R_NaN : NA_REAL;                 // all values in window are NA, expected output for fun(NA, na.rm=T/F)
124       } else {
125         ans->dbl_v[i] = narm ? (double) (w / (k - nc)) : NA_REAL; // some values in window are NA
126       }
127     }
128   }
129 }
130 /* fast rolling mean - exact
131  * when no info on NA (hasNA argument) then assume no NAs run faster version, also when na.rm=FALSE faster version can proceed
132  * rollmean implemented as mean of k obs for each observation for align="right"
133  * if NAs detected and na.rm=TRUE then re-run rollmean implemented as mean of k bos for each observation with NA support
134  */
frollmeanExact(double * x,uint64_t nx,ans_t * ans,int k,double fill,bool narm,int hasna,bool verbose)135 void frollmeanExact(double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasna, bool verbose) {
136   if (verbose)
137     snprintf(end(ans->message[0]), 500, _("%s: running in parallel for input length %"PRIu64", window %d, hasna %d, narm %d\n"), "frollmeanExact", (uint64_t)nx, k, hasna, (int)narm);
138   for (int i=0; i<k-1; i++) {                                   // fill partial window only
139     ans->dbl_v[i] = fill;
140   }
141   bool truehasna = hasna>0;                                     // flag to re-run with NA support if NAs detected
142   if (!truehasna || !narm) {
143     #pragma omp parallel for num_threads(getDTthreads(nx, true))
144     for (uint64_t i=k-1; i<nx; i++) {                           // loop on every observation with complete window, partial already filled in single threaded section
145       if (narm && truehasna) {
146         continue;                                               // if NAs detected no point to continue
147       }
148       long double w = 0.0;
149       for (int j=-k+1; j<=0; j++) {                             // sub-loop on window width
150         w += x[i+j];                                            // sum of window for particular observation
151       }
152       if (R_FINITE((double) w)) {                               // no need to calc roundoff correction if NAs detected as will re-call all below in truehasna==1
153         long double res = w / k;                                // keep results as long double for intermediate processing
154         long double err = 0.0;                                  // roundoff corrector
155         for (int j=-k+1; j<=0; j++) {                           // nested loop on window width
156           err += x[i+j] - res;                                  // measure difference of obs in sub-loop to calculated fun for obs
157         }
158         ans->dbl_v[i] = (double) (res + (err / k));             // adjust calculated rollfun with roundoff correction
159       } else {
160         if (!narm) {
161           ans->dbl_v[i] = (double) (w / k);                     // NAs should be propagated
162         }
163         truehasna = true;                                       // NAs detected for this window, set flag so rest of windows will not be re-run
164       }
165     }
166     if (truehasna) {
167       if (hasna==-1) {                                          // raise warning
168         ans->status = 2;
169         snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
170       }
171       if (verbose) {
172         if (narm) {
173           snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, re-running with extra care for NAs\n"), __func__);
174         } else {
175           snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, na.rm was FALSE so in 'exact' implementation NAs were handled already, no need to re-run\n"), __func__);
176         }
177       }
178     }
179   }
180   if (truehasna && narm) {
181     #pragma omp parallel for num_threads(getDTthreads(nx, true))
182     for (uint64_t i=k-1; i<nx; i++) {                           // loop on every observation with complete window, partial already filled in single threaded section
183       long double w = 0.0;
184       int nc = 0;                                               // NA counter within sliding window
185       for (int j=-k+1; j<=0; j++) {                             // nested loop on window width
186         if (ISNAN(x[i+j])) {
187           nc++;                                                 // increment NA count in current window
188         } else {
189           w += x[i+j];                                          // add observation to current window
190         }
191       }
192       if (w > DBL_MAX) {
193         ans->dbl_v[i] = R_PosInf;                               // handle Inf for na.rm=TRUE consistently to base R
194       } else if (w < -DBL_MAX) {
195         ans->dbl_v[i] = R_NegInf;
196       } else {
197         long double res = w / k;                                // keep results as long double for intermediate processing
198         long double err = 0.0;                                  // roundoff corrector
199         if (nc == 0) {                                          // no NAs in current window
200           for (int j=-k+1; j<=0; j++) {                         // sub-loop on window width
201             err += x[i+j] - res;                                // measure roundoff for each obs in window
202           }
203           ans->dbl_v[i] = (double) (res + (err / k));           // adjust calculated fun with roundoff correction
204         } else if (nc < k) {
205           for (int j=-k+1; j<=0; j++) {                         // sub-loop on window width
206             if (!ISNAN(x[i+j])) {
207               err += x[i+j] - res;                              // measure roundoff for each non-NA obs in window
208             }
209           }
210           ans->dbl_v[i] = (double) (res + (err / (k - nc)));    // adjust calculated fun with roundoff correction
211         } else {                                                // nc == k
212           ans->dbl_v[i] = R_NaN;                                // all values NAs and narm so produce expected values
213         }
214       }
215     }
216   }
217 }
218 
219 /* fast rolling sum */
frollsum(unsigned int algo,double * x,uint64_t nx,ans_t * ans,int k,int align,double fill,bool narm,int hasna,bool verbose)220 void frollsum(unsigned int algo, double *x, uint64_t nx, ans_t *ans, int k, int align, double fill, bool narm, int hasna, bool verbose) {
221   if (nx < k) {
222     if (verbose)
223       snprintf(end(ans->message[0]), 500, _("%s: window width longer than input vector, returning all NA vector\n"), __func__);
224     for (int i=0; i<nx; i++) {
225       ans->dbl_v[i] = fill;
226     }
227     return;
228   }
229   double tic = 0;
230   if (verbose)
231     tic = omp_get_wtime();
232   if (algo==0) {
233     frollsumFast(x, nx, ans, k, fill, narm, hasna, verbose);
234   } else if (algo==1) {
235     frollsumExact(x, nx, ans, k, fill, narm, hasna, verbose);
236   }
237   if (ans->status < 3 && align < 1) {
238     int k_ = align==-1 ? k-1 : floor(k/2);
239     if (verbose)
240       snprintf(end(ans->message[0]), 500, _("%s: align %d, shift answer by %d\n"), __func__, align, -k_);
241     memmove((char *)ans->dbl_v, (char *)ans->dbl_v + (k_*sizeof(double)), (nx-k_)*sizeof(double));
242     for (uint64_t i=nx-k_; i<nx; i++) {
243       ans->dbl_v[i] = fill;
244     }
245   }
246   if (verbose)
247     snprintf(end(ans->message[0]), 500, _("%s: processing algo %u took %.3fs\n"), __func__, algo, omp_get_wtime()-tic);
248 }
frollsumFast(double * x,uint64_t nx,ans_t * ans,int k,double fill,bool narm,int hasna,bool verbose)249 void frollsumFast(double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasna, bool verbose) {
250   if (verbose)
251     snprintf(end(ans->message[0]), 500, _("%s: running for input length %"PRIu64", window %d, hasna %d, narm %d\n"), "frollsumFast", (uint64_t)nx, k, hasna, (int)narm);
252   long double w = 0.0;
253   bool truehasna = hasna>0;
254   if (!truehasna) {
255     int i;
256     for (i=0; i<k-1; i++) {
257       w += x[i];
258       ans->dbl_v[i] = fill;
259     }
260     w += x[i];
261     ans->dbl_v[i] = (double) w;
262     if (R_FINITE((double) w)) {
263       for (uint64_t i=k; i<nx; i++) {
264         w -= x[i-k];
265         w += x[i];
266         ans->dbl_v[i] = (double) w;
267       }
268       if (!R_FINITE((double) w)) {
269         if (hasna==-1) {
270           ans->status = 2;
271           snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
272         }
273         if (verbose)
274           snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, re-running with extra care for NAs\n"), __func__);
275         w = 0.0;
276         truehasna = true;
277       }
278     } else {
279       if (hasna==-1) {
280         ans->status = 2;
281         snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
282       }
283       if (verbose)
284         snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, skip non-NA attempt and run with extra care for NAs\n"), __func__);
285       w = 0.0;
286       truehasna = true;
287     }
288   }
289   if (truehasna) {
290     int nc = 0;
291     int i;
292     for (i=0; i<k-1; i++) {
293       if (R_FINITE(x[i])) {
294         w += x[i];
295       } else {
296         nc++;
297       }
298       ans->dbl_v[i] = fill;
299     }
300     if (R_FINITE(x[i])) {
301       w += x[i];
302     } else {
303       nc++;
304     }
305     if (nc == 0) {
306       ans->dbl_v[i] = (double) w;
307     } else if (nc == k) {
308       ans->dbl_v[i] = narm ? 0.0 : NA_REAL;
309     } else {
310       ans->dbl_v[i] = narm ? (double) w : NA_REAL;
311     }
312     for (uint64_t i=k; i<nx; i++) {
313       if (R_FINITE(x[i])) {
314         w += x[i];
315       } else {
316         nc++;
317       }
318       if (R_FINITE(x[i-k])) {
319         w -= x[i-k];
320       } else {
321         nc--;
322       }
323       if (nc == 0) {
324         ans->dbl_v[i] = (double) w;
325       } else if (nc == k) {
326         ans->dbl_v[i] = narm ? 0.0 : NA_REAL;
327       } else {
328         ans->dbl_v[i] = narm ? (double) w : NA_REAL;
329       }
330     }
331   }
332 }
frollsumExact(double * x,uint64_t nx,ans_t * ans,int k,double fill,bool narm,int hasna,bool verbose)333 void frollsumExact(double *x, uint64_t nx, ans_t *ans, int k, double fill, bool narm, int hasna, bool verbose) {
334   if (verbose)
335     snprintf(end(ans->message[0]), 500, _("%s: running in parallel for input length %"PRIu64", window %d, hasna %d, narm %d\n"), "frollsumExact", (uint64_t)nx, k, hasna, (int)narm);
336   for (int i=0; i<k-1; i++) {
337     ans->dbl_v[i] = fill;
338   }
339   bool truehasna = hasna>0;
340   if (!truehasna || !narm) {
341     #pragma omp parallel for num_threads(getDTthreads(nx, true))
342     for (uint64_t i=k-1; i<nx; i++) {
343       if (narm && truehasna) {
344         continue;
345       }
346       long double w = 0.0;
347       for (int j=-k+1; j<=0; j++) {
348         w += x[i+j];
349       }
350       if (R_FINITE((double) w)) {
351         ans->dbl_v[i] = (double) w;
352       } else {
353         if (!narm) {
354           ans->dbl_v[i] = (double) w;
355         }
356         truehasna = true;
357       }
358     }
359     if (truehasna) {
360       if (hasna==-1) {
361         ans->status = 2;
362         snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
363       }
364       if (verbose) {
365         if (narm) {
366           snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, re-running with extra care for NAs\n"), __func__);
367         } else {
368           snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, na.rm was FALSE so in 'exact' implementation NAs were handled already, no need to re-run\n"), __func__);
369         }
370       }
371     }
372   }
373   if (truehasna && narm) {
374     #pragma omp parallel for num_threads(getDTthreads(nx, true))
375     for (uint64_t i=k-1; i<nx; i++) {
376       long double w = 0.0;
377       int nc = 0;
378       for (int j=-k+1; j<=0; j++) {
379         if (ISNAN(x[i+j])) {
380           nc++;
381         } else {
382           w += x[i+j];
383         }
384       }
385       if (w > DBL_MAX) {
386         ans->dbl_v[i] = R_PosInf;
387       } else if (w < -DBL_MAX) {
388         ans->dbl_v[i] = R_NegInf;
389       } else {
390         if (nc < k) {
391           ans->dbl_v[i] = (double) w;
392         } else {
393           ans->dbl_v[i] = 0.0;
394         }
395       }
396     }
397   }
398 }
399 
400 /* fast rolling any R function
401  * not plain C, not thread safe
402  * R eval() allocates
403  */
frollapply(double * x,int64_t nx,double * w,int k,ans_t * ans,int align,double fill,SEXP call,SEXP rho,bool verbose)404 void frollapply(double *x, int64_t nx, double *w, int k, ans_t *ans, int align, double fill, SEXP call, SEXP rho, bool verbose) {
405   if (nx < k) {
406     if (verbose)
407       Rprintf(_("%s: window width longer than input vector, returning all NA vector\n"), __func__);
408     for (int i=0; i<nx; i++) {
409       ans->dbl_v[i] = fill;
410     }
411     return;
412   }
413   double tic = 0;
414   if (verbose)
415     tic = omp_get_wtime();
416   for (int i=0; i<k-1; i++) {
417     ans->dbl_v[i] = fill;
418   }
419   // this is i=k-1 iteration - first full window - taken out from the loop
420   // we use it to add extra check that results of a FUN are length 1 numeric
421   memcpy(w, x, k*sizeof(double));
422   SEXP eval0 = PROTECT(eval(call, rho));
423   if (xlength(eval0) != 1)
424     error(_("%s: results from provided FUN are not length 1"), __func__);
425   SEXPTYPE teval0 = TYPEOF(eval0);
426   if (teval0 == REALSXP) {
427     ans->dbl_v[k-1] = REAL(eval0)[0];
428   } else {
429     if (teval0==INTSXP || teval0==LGLSXP) {
430       if (verbose)
431         Rprintf(_("%s: results from provided FUN are not of type double, coercion from integer or logical will be applied on each iteration\n"), __func__);
432       ans->dbl_v[k-1] = REAL(coerceVector(eval0, REALSXP))[0];
433     } else {
434       error(_("%s: results from provided FUN are not of type double"), __func__);
435     }
436   }
437   UNPROTECT(1); // eval0
438   // for each row it copies expected window data into w
439   // evaluate call which has been prepared to point into w
440   if (teval0 == REALSXP) {
441     for (int64_t i=k; i<nx; i++) {
442       memcpy(w, x+(i-k+1), k*sizeof(double));
443       ans->dbl_v[i] = REAL(eval(call, rho))[0]; // this may fail with for a not type-stable fun
444     }
445   } else {
446     for (int64_t i=k; i<nx; i++) {
447       memcpy(w, x+(i-k+1), k*sizeof(double));
448       SEXP evali = PROTECT(eval(call, rho));
449       ans->dbl_v[i] = REAL(coerceVector(evali, REALSXP))[0];
450       UNPROTECT(1); // evali
451     }
452   }
453   if (ans->status < 3 && align < 1) {
454     int k_ = align==-1 ? k-1 : floor(k/2);
455     if (verbose)
456       Rprintf(_("%s: align %d, shift answer by %d\n"), __func__, align, -k_);
457     memmove((char *)ans->dbl_v, (char *)ans->dbl_v + (k_*sizeof(double)), (nx-k_)*sizeof(double));
458     for (int64_t i=nx-k_; i<nx; i++) {
459       ans->dbl_v[i] = fill;
460     }
461   }
462   if (verbose)
463     Rprintf(_("%s: took %.3fs\n"), __func__, omp_get_wtime()-tic);
464 }
465