1 #include "data.table.h"
2 
3 /* fast adaptive rolling mean - router
4  * algo = 0: fadaptiverollmeanFast
5  *   first pass cumsum based solution, second pass uses cumsum to calculate answer
6  * algo = 1: fadaptiverollmeanExact
7  *   recalculate whole mean for each observation, roundoff correction is adjusted, also support for NaN and Inf
8  */
fadaptiverollmean(unsigned int algo,double * x,uint64_t nx,ans_t * ans,int * k,double fill,bool narm,int hasna,bool verbose)9 void fadaptiverollmean(unsigned int algo, double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose) {
10   double tic = 0;
11   if (verbose)
12     tic = omp_get_wtime();
13   if (algo==0) {
14     fadaptiverollmeanFast(x, nx, ans, k, fill, narm, hasna, verbose);
15   } else if (algo==1) {
16     fadaptiverollmeanExact(x, nx, ans, k, fill, narm, hasna, verbose);
17   }
18   if (verbose)
19     snprintf(end(ans->message[0]), 500, _("%s: processing algo %u took %.3fs\n"), __func__, algo, omp_get_wtime()-tic);
20   // implicit n_message limit discussed here: https://github.com/Rdatatable/data.table/issues/3423#issuecomment-487722586
21 }
22 /* fast adaptive rolling mean - fast
23  * when no info on NA (hasNA argument) then assume no NAs run faster version
24  * adaptive rollmean implemented as cumsum first pass, then diff cumsum by indexes `i` to `i-k[i]`
25  * if NAs detected re-run rollmean implemented as cumsum with NA support
26  */
fadaptiverollmeanFast(double * x,uint64_t nx,ans_t * ans,int * k,double fill,bool narm,int hasna,bool verbose)27 void fadaptiverollmeanFast(double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose) {
28   if (verbose)
29     snprintf(end(ans->message[0]), 500, _("%s: running for input length %"PRIu64", hasna %d, narm %d\n"), "fadaptiverollmeanFast", (uint64_t)nx, hasna, (int) narm);
30   bool truehasna = hasna>0;                                     // flag to re-run if NAs detected
31   long double w = 0.0;
32   double *cs = malloc(nx*sizeof(double));                       // cumsum vector, same as double cs[nx] but no segfault
33   if (!cs) {                                                    // # nocov start
34     ans->status = 3;                                            // raise error
35     snprintf(ans->message[3], 500, _("%s: Unable to allocate memory for cumsum"), __func__);
36     free(cs);
37     return;
38   }                                                             // # nocov end
39   if (!truehasna) {
40     for (uint64_t i=0; i<nx; i++) {                             // loop on every observation to calculate cumsum only
41       w += x[i];                                                // cumulate in long double
42       cs[i] = (double) w;
43     }
44     if (R_FINITE((double) w)) {                                 // no need to calc this if NAs detected as will re-calc all below in truehasna==1
45       #pragma omp parallel for num_threads(getDTthreads(nx, true))
46       for (uint64_t i=0; i<nx; i++) {                           // loop over observations to calculate final answer
47         if (i+1 == k[i]) {
48           ans->dbl_v[i] = cs[i]/k[i];                           // current obs window width exactly same as obs position in a vector
49         } else if (i+1 > k[i]) {
50           ans->dbl_v[i] = (cs[i]-cs[i-k[i]])/k[i];              // window width smaller than position so use cumsum to calculate diff
51         } else {
52           ans->dbl_v[i] = fill;                                 // position in a vector smaller than obs window width - partial window
53         }
54       }
55     } else {                                                    // update truehasna flag if NAs detected
56       if (hasna==-1) {                                          // raise warning
57         ans->status = 2;
58         snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
59       }
60       if (verbose)
61         snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, re-running with extra care for NAs\n"), __func__);
62       w = 0.0;
63       truehasna = true;
64     }
65   }
66   if (truehasna) {
67     uint64_t nc = 0;                                            // running NA counter
68     uint64_t *cn = malloc(nx*sizeof(uint64_t));                 // cumulative NA counter, used the same way as cumsum, same as uint64_t cn[nx] but no segfault
69     if (!cn) {                                                  // # nocov start
70       ans->status = 3;                                          // raise error
71       snprintf(ans->message[3], 500, _("%s: Unable to allocate memory for cum NA counter"), __func__);
72       free(cs);
73       free(cn);
74       return;
75     }                                                           // # nocov end
76     for (uint64_t i=0; i<nx; i++) {                             // loop over observations to calculate cumsum and cum NA counter
77       if (R_FINITE(x[i])) {
78         w += x[i];                                              // add observation to running sum
79       } else {
80         nc++;                                                   // increment non-finite counter
81       }
82       cs[i] = (double) w;                                       // cumsum, na.rm=TRUE always, NAs handled using cum NA counter
83       cn[i] = nc;                                               // cum NA counter
84     }
85     #pragma omp parallel for num_threads(getDTthreads(nx, true))
86     for (uint64_t i=0; i<nx; i++) {                             // loop over observations to calculate final answer
87       if (i+1 < k[i]) {                                         // partial window
88         ans->dbl_v[i] = fill;
89       } else if (!narm) {                                       // this branch reduce number of branching in narm=1 below
90         if (i+1 == k[i]) {
91           ans->dbl_v[i] = cn[i]>0 ? NA_REAL : cs[i]/k[i];
92         } else if (i+1 > k[i]) {
93           ans->dbl_v[i] = (cn[i] - cn[i-k[i]])>0 ? NA_REAL : (cs[i]-cs[i-k[i]])/k[i];
94         }
95       } else if (i+1 == k[i]) {                                 // window width equal to observation position in vector
96         int thisk = k[i] - ((int) cn[i]);                       // window width taking NAs into account, we assume single window width is int32, cum NA counter can be int64
97         ans->dbl_v[i] = thisk==0 ? R_NaN : cs[i]/thisk;         // handle all obs NAs and na.rm=TRUE
98       } else if (i+1 > k[i]) {                                  // window width smaller than observation position in vector
99         int thisk = k[i] - ((int) (cn[i] - cn[i-k[i]]));        // window width taking NAs into account, we assume single window width is int32, cum NA counter can be int64
100         ans->dbl_v[i] = thisk==0 ? R_NaN : (cs[i]-cs[i-k[i]])/thisk; // handle all obs NAs and na.rm=TRUE
101       }
102     }
103     free(cn);
104   } // end of truehasna
105   free(cs);
106 }
107 /* fast adaptive rolling mean exact
108  * extra nested loop to calculate mean of each obs and error correction
109  * requires much more cpu
110  * uses multiple cores
111  */
fadaptiverollmeanExact(double * x,uint64_t nx,ans_t * ans,int * k,double fill,bool narm,int hasna,bool verbose)112 void fadaptiverollmeanExact(double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose) {
113   if (verbose)
114     snprintf(end(ans->message[0]), 500, _("%s: running in parallel for input length %"PRIu64", hasna %d, narm %d\n"), "fadaptiverollmeanExact", (uint64_t)nx, hasna, (int) narm);
115   bool truehasna = hasna>0;                                     // flag to re-run if NAs detected
116   if (!truehasna || !narm) {                                    // narm=FALSE handled here as NAs properly propagated in exact algo
117     #pragma omp parallel for num_threads(getDTthreads(nx, true))
118     for (uint64_t i=0; i<nx; i++) {                             // loop on every observation to produce final answer
119       if (narm && truehasna) {
120         continue;                                               // if NAs detected no point to continue
121       }
122       if (i+1 < k[i]) {
123         ans->dbl_v[i] = fill;                                   // position in a vector smaller than obs window width - partial window
124       } else {
125         long double w = 0.0;
126         for (int j=-k[i]+1; j<=0; j++) {                        // sub-loop on window width
127           w += x[i+j];                                          // sum of window for particular observation
128         }
129         if (R_FINITE((double) w)) {                             // no need to calc roundoff correction if NAs detected as will re-call all below in truehasna==1
130           long double res = w / k[i];                           // keep results as long double for intermediate processing
131           long double err = 0.0;                                // roundoff corrector
132           for (int j=-k[i]+1; j<=0; j++) {                      // sub-loop on window width
133             err += x[i+j] - res;                                // measure difference of obs in sub-loop to calculated fun for obs
134           }
135           ans->dbl_v[i] = (double) (res + (err / k[i]));        // adjust calculated fun with roundoff correction
136         } else {
137           if (!narm) {
138             ans->dbl_v[i] = (double) (w / k[i]);                // NAs should be propagated
139           }
140           truehasna = true;                                     // NAs detected for this window, set flag so rest of windows will not be re-run
141         }
142       }
143     }
144     if (truehasna) {
145       if (hasna==-1) {                                          // raise warning
146         ans->status = 2;
147         snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
148       }
149       if (verbose) {
150         if (narm) {
151           snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, re-running with extra care for NAs\n"), __func__);
152         } else {
153           snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, na.rm was FALSE so in 'exact' implementation NAs were handled already, no need to re-run\n"), __func__);
154         }
155       }
156     }
157   }
158   if (truehasna && narm) {
159     #pragma omp parallel for num_threads(getDTthreads(nx, true))
160     for (uint64_t i=0; i<nx; i++) {                             // loop over observations to produce final answer
161       if (i+1 < k[i]) {
162         ans->dbl_v[i] = fill;                                   // partial window
163       } else {
164         long double w = 0.0;                                    // window to accumulate values in particular window
165         long double err = 0.0;                                  // accumulate roundoff error
166         long double res;                                        // keep results as long double for intermediate processing
167         int nc = 0;                                             // NA counter within current window
168         for (int j=-k[i]+1; j<=0; j++) {                        // sub-loop on window width
169           if (ISNAN(x[i+j])) {
170             nc++;                                               // increment NA count in current window
171           } else {
172             w += x[i+j];                                        // add observation to current window
173           }
174         }
175         if (w > DBL_MAX) {
176           ans->dbl_v[i] = R_PosInf;                             // handle Inf for na.rm=TRUE consistently to base R
177         } else if (w < -DBL_MAX) {
178           ans->dbl_v[i] = R_NegInf;
179         } else {
180           if (nc == 0) {                                        // no NAs in current window
181             res = w / k[i];
182             for (int j=-k[i]+1; j<=0; j++) {                    // sub-loop on window width to accumulate roundoff error
183               err += x[i+j] - res;                              // measure roundoff for each obs in window
184             }
185             ans->dbl_v[i] = (double) (res + (err / k[i]));      // adjust calculated fun with roundoff correction
186           } else if (nc < k[i]) {
187             res = w / (k[i]-nc);
188             for (int j=-k[i]+1; j<=0; j++) {                    // sub-loop on window width to accumulate roundoff error
189               if (!ISNAN(x[i+j])) {
190                 err += x[i+j] - res;                            // measure roundoff for each obs in window
191               }
192             }
193             ans->dbl_v[i] = (double) (res + (err / (k[i] - nc))); // adjust calculated fun with roundoff correction
194           } else {                                              // nc == k[i]
195             ans->dbl_v[i] = R_NaN;                              // this branch assume narm so R_NaN always here
196           }
197         }
198       }
199     }
200   } // end of truehasna
201 }
202 
203 /* fast adaptive rolling sum */
fadaptiverollsum(unsigned int algo,double * x,uint64_t nx,ans_t * ans,int * k,double fill,bool narm,int hasna,bool verbose)204 void fadaptiverollsum(unsigned int algo, double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose) {
205   double tic = 0;
206   if (verbose)
207     tic = omp_get_wtime();
208   if (algo==0) {
209     fadaptiverollsumFast(x, nx, ans, k, fill, narm, hasna, verbose);
210   } else if (algo==1) {
211     fadaptiverollsumExact(x, nx, ans, k, fill, narm, hasna, verbose);
212   }
213   if (verbose)
214     snprintf(end(ans->message[0]), 500, _("%s: processing algo %u took %.3fs\n"), __func__, algo, omp_get_wtime()-tic);
215 }
fadaptiverollsumFast(double * x,uint64_t nx,ans_t * ans,int * k,double fill,bool narm,int hasna,bool verbose)216 void fadaptiverollsumFast(double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose) {
217   if (verbose)
218     snprintf(end(ans->message[0]), 500, _("%s: running for input length %"PRIu64", hasna %d, narm %d\n"), "fadaptiverollsumFast", (uint64_t)nx, hasna, (int) narm);
219   bool truehasna = hasna>0;
220   long double w = 0.0;
221   double *cs = malloc(nx*sizeof(double));
222   if (!cs) {                                                    // # nocov start
223     ans->status = 3;
224     snprintf(ans->message[3], 500, _("%s: Unable to allocate memory for cumsum"), __func__);
225     free(cs);
226     return;
227   }                                                             // # nocov end
228   if (!truehasna) {
229     for (uint64_t i=0; i<nx; i++) {
230       w += x[i];
231       cs[i] = (double) w;
232     }
233     if (R_FINITE((double) w)) {
234       #pragma omp parallel for num_threads(getDTthreads(nx, true))
235       for (uint64_t i=0; i<nx; i++) {
236         if (i+1 == k[i]) {
237           ans->dbl_v[i] = cs[i];
238         } else if (i+1 > k[i]) {
239           ans->dbl_v[i] = cs[i]-cs[i-k[i]];
240         } else {
241           ans->dbl_v[i] = fill;
242         }
243       }
244     } else {
245       if (hasna==-1) {
246         ans->status = 2;
247         snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
248       }
249       if (verbose)
250         snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, re-running with extra care for NAs\n"), __func__);
251       w = 0.0;
252       truehasna = true;
253     }
254   }
255   if (truehasna) {
256     uint64_t nc = 0;
257     uint64_t *cn = malloc(nx*sizeof(uint64_t));
258     if (!cn) {                                                  // # nocov start
259       ans->status = 3;
260       snprintf(ans->message[3], 500, _("%s: Unable to allocate memory for cum NA counter"), __func__);
261       free(cs);
262       free(cn);
263       return;
264     }                                                           // # nocov end
265     for (uint64_t i=0; i<nx; i++) {
266       if (R_FINITE(x[i])) {
267         w += x[i];
268       } else {
269         nc++;
270       }
271       cs[i] = (double) w;
272       cn[i] = nc;
273     }
274     #pragma omp parallel for num_threads(getDTthreads(nx, true))
275     for (uint64_t i=0; i<nx; i++) {
276       if (i+1 < k[i]) {
277         ans->dbl_v[i] = fill;
278       } else if (!narm) {
279         if (i+1 == k[i]) {
280           ans->dbl_v[i] = cn[i]>0 ? NA_REAL : cs[i];
281         } else if (i+1 > k[i]) {
282           ans->dbl_v[i] = (cn[i] - cn[i-k[i]])>0 ? NA_REAL : cs[i]-cs[i-k[i]];
283         }
284       } else if (i+1 == k[i]) {
285         int thisk = k[i] - ((int) cn[i]);
286         ans->dbl_v[i] = thisk==0 ? 0.0 : cs[i];
287       } else if (i+1 > k[i]) {
288         int thisk = k[i] - ((int) (cn[i] - cn[i-k[i]]));
289         ans->dbl_v[i] = thisk==0 ? 0.0 : cs[i]-cs[i-k[i]];
290       }
291     }
292     free(cn);
293   }
294   free(cs);
295 }
fadaptiverollsumExact(double * x,uint64_t nx,ans_t * ans,int * k,double fill,bool narm,int hasna,bool verbose)296 void fadaptiverollsumExact(double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose) {
297   if (verbose)
298     snprintf(end(ans->message[0]), 500, _("%s: running in parallel for input length %"PRIu64", hasna %d, narm %d\n"), "fadaptiverollsumExact", (uint64_t)nx, hasna, (int) narm);
299   bool truehasna = hasna>0;
300   if (!truehasna || !narm) {
301     #pragma omp parallel for num_threads(getDTthreads(nx, true))
302     for (uint64_t i=0; i<nx; i++) {
303       if (narm && truehasna) {
304         continue;
305       }
306       if (i+1 < k[i]) {
307         ans->dbl_v[i] = fill;
308       } else {
309         long double w = 0.0;
310         for (int j=-k[i]+1; j<=0; j++) {
311           w += x[i+j];
312         }
313         if (R_FINITE((double) w)) {
314           ans->dbl_v[i] = (double) w;
315         } else {
316           if (!narm) {
317             ans->dbl_v[i] = (double) w;
318           }
319           truehasna = true;
320         }
321       }
322     }
323     if (truehasna) {
324       if (hasna==-1) {
325         ans->status = 2;
326         snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
327       }
328       if (verbose) {
329         if (narm) {
330           snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, re-running with extra care for NAs\n"), __func__);
331         } else {
332           snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, na.rm was FALSE so in 'exact' implementation NAs were handled already, no need to re-run\n"), __func__);
333         }
334       }
335     }
336   }
337   if (truehasna && narm) {
338     #pragma omp parallel for num_threads(getDTthreads(nx, true))
339     for (uint64_t i=0; i<nx; i++) {
340       if (i+1 < k[i]) {
341         ans->dbl_v[i] = fill;
342       } else {
343         long double w = 0.0;
344         int nc = 0;
345         for (int j=-k[i]+1; j<=0; j++) {
346           if (ISNAN(x[i+j])) {
347             nc++;
348           } else {
349             w += x[i+j];
350           }
351         }
352         if (w > DBL_MAX) {
353           ans->dbl_v[i] = R_PosInf;
354         } else if (w < -DBL_MAX) {
355           ans->dbl_v[i] = R_NegInf;
356         } else {
357           if (nc < k[i]) {
358             ans->dbl_v[i] = (double) w;
359           } else {
360             ans->dbl_v[i] = 0.0;
361           }
362         }
363       }
364     }
365   }
366 }
367