1 #include "data.table.h"
2
3 /* fast adaptive rolling mean - router
4 * algo = 0: fadaptiverollmeanFast
5 * first pass cumsum based solution, second pass uses cumsum to calculate answer
6 * algo = 1: fadaptiverollmeanExact
7 * recalculate whole mean for each observation, roundoff correction is adjusted, also support for NaN and Inf
8 */
fadaptiverollmean(unsigned int algo,double * x,uint64_t nx,ans_t * ans,int * k,double fill,bool narm,int hasna,bool verbose)9 void fadaptiverollmean(unsigned int algo, double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose) {
10 double tic = 0;
11 if (verbose)
12 tic = omp_get_wtime();
13 if (algo==0) {
14 fadaptiverollmeanFast(x, nx, ans, k, fill, narm, hasna, verbose);
15 } else if (algo==1) {
16 fadaptiverollmeanExact(x, nx, ans, k, fill, narm, hasna, verbose);
17 }
18 if (verbose)
19 snprintf(end(ans->message[0]), 500, _("%s: processing algo %u took %.3fs\n"), __func__, algo, omp_get_wtime()-tic);
20 // implicit n_message limit discussed here: https://github.com/Rdatatable/data.table/issues/3423#issuecomment-487722586
21 }
22 /* fast adaptive rolling mean - fast
23 * when no info on NA (hasNA argument) then assume no NAs run faster version
24 * adaptive rollmean implemented as cumsum first pass, then diff cumsum by indexes `i` to `i-k[i]`
25 * if NAs detected re-run rollmean implemented as cumsum with NA support
26 */
fadaptiverollmeanFast(double * x,uint64_t nx,ans_t * ans,int * k,double fill,bool narm,int hasna,bool verbose)27 void fadaptiverollmeanFast(double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose) {
28 if (verbose)
29 snprintf(end(ans->message[0]), 500, _("%s: running for input length %"PRIu64", hasna %d, narm %d\n"), "fadaptiverollmeanFast", (uint64_t)nx, hasna, (int) narm);
30 bool truehasna = hasna>0; // flag to re-run if NAs detected
31 long double w = 0.0;
32 double *cs = malloc(nx*sizeof(double)); // cumsum vector, same as double cs[nx] but no segfault
33 if (!cs) { // # nocov start
34 ans->status = 3; // raise error
35 snprintf(ans->message[3], 500, _("%s: Unable to allocate memory for cumsum"), __func__);
36 free(cs);
37 return;
38 } // # nocov end
39 if (!truehasna) {
40 for (uint64_t i=0; i<nx; i++) { // loop on every observation to calculate cumsum only
41 w += x[i]; // cumulate in long double
42 cs[i] = (double) w;
43 }
44 if (R_FINITE((double) w)) { // no need to calc this if NAs detected as will re-calc all below in truehasna==1
45 #pragma omp parallel for num_threads(getDTthreads(nx, true))
46 for (uint64_t i=0; i<nx; i++) { // loop over observations to calculate final answer
47 if (i+1 == k[i]) {
48 ans->dbl_v[i] = cs[i]/k[i]; // current obs window width exactly same as obs position in a vector
49 } else if (i+1 > k[i]) {
50 ans->dbl_v[i] = (cs[i]-cs[i-k[i]])/k[i]; // window width smaller than position so use cumsum to calculate diff
51 } else {
52 ans->dbl_v[i] = fill; // position in a vector smaller than obs window width - partial window
53 }
54 }
55 } else { // update truehasna flag if NAs detected
56 if (hasna==-1) { // raise warning
57 ans->status = 2;
58 snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
59 }
60 if (verbose)
61 snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, re-running with extra care for NAs\n"), __func__);
62 w = 0.0;
63 truehasna = true;
64 }
65 }
66 if (truehasna) {
67 uint64_t nc = 0; // running NA counter
68 uint64_t *cn = malloc(nx*sizeof(uint64_t)); // cumulative NA counter, used the same way as cumsum, same as uint64_t cn[nx] but no segfault
69 if (!cn) { // # nocov start
70 ans->status = 3; // raise error
71 snprintf(ans->message[3], 500, _("%s: Unable to allocate memory for cum NA counter"), __func__);
72 free(cs);
73 free(cn);
74 return;
75 } // # nocov end
76 for (uint64_t i=0; i<nx; i++) { // loop over observations to calculate cumsum and cum NA counter
77 if (R_FINITE(x[i])) {
78 w += x[i]; // add observation to running sum
79 } else {
80 nc++; // increment non-finite counter
81 }
82 cs[i] = (double) w; // cumsum, na.rm=TRUE always, NAs handled using cum NA counter
83 cn[i] = nc; // cum NA counter
84 }
85 #pragma omp parallel for num_threads(getDTthreads(nx, true))
86 for (uint64_t i=0; i<nx; i++) { // loop over observations to calculate final answer
87 if (i+1 < k[i]) { // partial window
88 ans->dbl_v[i] = fill;
89 } else if (!narm) { // this branch reduce number of branching in narm=1 below
90 if (i+1 == k[i]) {
91 ans->dbl_v[i] = cn[i]>0 ? NA_REAL : cs[i]/k[i];
92 } else if (i+1 > k[i]) {
93 ans->dbl_v[i] = (cn[i] - cn[i-k[i]])>0 ? NA_REAL : (cs[i]-cs[i-k[i]])/k[i];
94 }
95 } else if (i+1 == k[i]) { // window width equal to observation position in vector
96 int thisk = k[i] - ((int) cn[i]); // window width taking NAs into account, we assume single window width is int32, cum NA counter can be int64
97 ans->dbl_v[i] = thisk==0 ? R_NaN : cs[i]/thisk; // handle all obs NAs and na.rm=TRUE
98 } else if (i+1 > k[i]) { // window width smaller than observation position in vector
99 int thisk = k[i] - ((int) (cn[i] - cn[i-k[i]])); // window width taking NAs into account, we assume single window width is int32, cum NA counter can be int64
100 ans->dbl_v[i] = thisk==0 ? R_NaN : (cs[i]-cs[i-k[i]])/thisk; // handle all obs NAs and na.rm=TRUE
101 }
102 }
103 free(cn);
104 } // end of truehasna
105 free(cs);
106 }
107 /* fast adaptive rolling mean exact
108 * extra nested loop to calculate mean of each obs and error correction
109 * requires much more cpu
110 * uses multiple cores
111 */
fadaptiverollmeanExact(double * x,uint64_t nx,ans_t * ans,int * k,double fill,bool narm,int hasna,bool verbose)112 void fadaptiverollmeanExact(double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose) {
113 if (verbose)
114 snprintf(end(ans->message[0]), 500, _("%s: running in parallel for input length %"PRIu64", hasna %d, narm %d\n"), "fadaptiverollmeanExact", (uint64_t)nx, hasna, (int) narm);
115 bool truehasna = hasna>0; // flag to re-run if NAs detected
116 if (!truehasna || !narm) { // narm=FALSE handled here as NAs properly propagated in exact algo
117 #pragma omp parallel for num_threads(getDTthreads(nx, true))
118 for (uint64_t i=0; i<nx; i++) { // loop on every observation to produce final answer
119 if (narm && truehasna) {
120 continue; // if NAs detected no point to continue
121 }
122 if (i+1 < k[i]) {
123 ans->dbl_v[i] = fill; // position in a vector smaller than obs window width - partial window
124 } else {
125 long double w = 0.0;
126 for (int j=-k[i]+1; j<=0; j++) { // sub-loop on window width
127 w += x[i+j]; // sum of window for particular observation
128 }
129 if (R_FINITE((double) w)) { // no need to calc roundoff correction if NAs detected as will re-call all below in truehasna==1
130 long double res = w / k[i]; // keep results as long double for intermediate processing
131 long double err = 0.0; // roundoff corrector
132 for (int j=-k[i]+1; j<=0; j++) { // sub-loop on window width
133 err += x[i+j] - res; // measure difference of obs in sub-loop to calculated fun for obs
134 }
135 ans->dbl_v[i] = (double) (res + (err / k[i])); // adjust calculated fun with roundoff correction
136 } else {
137 if (!narm) {
138 ans->dbl_v[i] = (double) (w / k[i]); // NAs should be propagated
139 }
140 truehasna = true; // NAs detected for this window, set flag so rest of windows will not be re-run
141 }
142 }
143 }
144 if (truehasna) {
145 if (hasna==-1) { // raise warning
146 ans->status = 2;
147 snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
148 }
149 if (verbose) {
150 if (narm) {
151 snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, re-running with extra care for NAs\n"), __func__);
152 } else {
153 snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, na.rm was FALSE so in 'exact' implementation NAs were handled already, no need to re-run\n"), __func__);
154 }
155 }
156 }
157 }
158 if (truehasna && narm) {
159 #pragma omp parallel for num_threads(getDTthreads(nx, true))
160 for (uint64_t i=0; i<nx; i++) { // loop over observations to produce final answer
161 if (i+1 < k[i]) {
162 ans->dbl_v[i] = fill; // partial window
163 } else {
164 long double w = 0.0; // window to accumulate values in particular window
165 long double err = 0.0; // accumulate roundoff error
166 long double res; // keep results as long double for intermediate processing
167 int nc = 0; // NA counter within current window
168 for (int j=-k[i]+1; j<=0; j++) { // sub-loop on window width
169 if (ISNAN(x[i+j])) {
170 nc++; // increment NA count in current window
171 } else {
172 w += x[i+j]; // add observation to current window
173 }
174 }
175 if (w > DBL_MAX) {
176 ans->dbl_v[i] = R_PosInf; // handle Inf for na.rm=TRUE consistently to base R
177 } else if (w < -DBL_MAX) {
178 ans->dbl_v[i] = R_NegInf;
179 } else {
180 if (nc == 0) { // no NAs in current window
181 res = w / k[i];
182 for (int j=-k[i]+1; j<=0; j++) { // sub-loop on window width to accumulate roundoff error
183 err += x[i+j] - res; // measure roundoff for each obs in window
184 }
185 ans->dbl_v[i] = (double) (res + (err / k[i])); // adjust calculated fun with roundoff correction
186 } else if (nc < k[i]) {
187 res = w / (k[i]-nc);
188 for (int j=-k[i]+1; j<=0; j++) { // sub-loop on window width to accumulate roundoff error
189 if (!ISNAN(x[i+j])) {
190 err += x[i+j] - res; // measure roundoff for each obs in window
191 }
192 }
193 ans->dbl_v[i] = (double) (res + (err / (k[i] - nc))); // adjust calculated fun with roundoff correction
194 } else { // nc == k[i]
195 ans->dbl_v[i] = R_NaN; // this branch assume narm so R_NaN always here
196 }
197 }
198 }
199 }
200 } // end of truehasna
201 }
202
203 /* fast adaptive rolling sum */
fadaptiverollsum(unsigned int algo,double * x,uint64_t nx,ans_t * ans,int * k,double fill,bool narm,int hasna,bool verbose)204 void fadaptiverollsum(unsigned int algo, double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose) {
205 double tic = 0;
206 if (verbose)
207 tic = omp_get_wtime();
208 if (algo==0) {
209 fadaptiverollsumFast(x, nx, ans, k, fill, narm, hasna, verbose);
210 } else if (algo==1) {
211 fadaptiverollsumExact(x, nx, ans, k, fill, narm, hasna, verbose);
212 }
213 if (verbose)
214 snprintf(end(ans->message[0]), 500, _("%s: processing algo %u took %.3fs\n"), __func__, algo, omp_get_wtime()-tic);
215 }
fadaptiverollsumFast(double * x,uint64_t nx,ans_t * ans,int * k,double fill,bool narm,int hasna,bool verbose)216 void fadaptiverollsumFast(double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose) {
217 if (verbose)
218 snprintf(end(ans->message[0]), 500, _("%s: running for input length %"PRIu64", hasna %d, narm %d\n"), "fadaptiverollsumFast", (uint64_t)nx, hasna, (int) narm);
219 bool truehasna = hasna>0;
220 long double w = 0.0;
221 double *cs = malloc(nx*sizeof(double));
222 if (!cs) { // # nocov start
223 ans->status = 3;
224 snprintf(ans->message[3], 500, _("%s: Unable to allocate memory for cumsum"), __func__);
225 free(cs);
226 return;
227 } // # nocov end
228 if (!truehasna) {
229 for (uint64_t i=0; i<nx; i++) {
230 w += x[i];
231 cs[i] = (double) w;
232 }
233 if (R_FINITE((double) w)) {
234 #pragma omp parallel for num_threads(getDTthreads(nx, true))
235 for (uint64_t i=0; i<nx; i++) {
236 if (i+1 == k[i]) {
237 ans->dbl_v[i] = cs[i];
238 } else if (i+1 > k[i]) {
239 ans->dbl_v[i] = cs[i]-cs[i-k[i]];
240 } else {
241 ans->dbl_v[i] = fill;
242 }
243 }
244 } else {
245 if (hasna==-1) {
246 ans->status = 2;
247 snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
248 }
249 if (verbose)
250 snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, re-running with extra care for NAs\n"), __func__);
251 w = 0.0;
252 truehasna = true;
253 }
254 }
255 if (truehasna) {
256 uint64_t nc = 0;
257 uint64_t *cn = malloc(nx*sizeof(uint64_t));
258 if (!cn) { // # nocov start
259 ans->status = 3;
260 snprintf(ans->message[3], 500, _("%s: Unable to allocate memory for cum NA counter"), __func__);
261 free(cs);
262 free(cn);
263 return;
264 } // # nocov end
265 for (uint64_t i=0; i<nx; i++) {
266 if (R_FINITE(x[i])) {
267 w += x[i];
268 } else {
269 nc++;
270 }
271 cs[i] = (double) w;
272 cn[i] = nc;
273 }
274 #pragma omp parallel for num_threads(getDTthreads(nx, true))
275 for (uint64_t i=0; i<nx; i++) {
276 if (i+1 < k[i]) {
277 ans->dbl_v[i] = fill;
278 } else if (!narm) {
279 if (i+1 == k[i]) {
280 ans->dbl_v[i] = cn[i]>0 ? NA_REAL : cs[i];
281 } else if (i+1 > k[i]) {
282 ans->dbl_v[i] = (cn[i] - cn[i-k[i]])>0 ? NA_REAL : cs[i]-cs[i-k[i]];
283 }
284 } else if (i+1 == k[i]) {
285 int thisk = k[i] - ((int) cn[i]);
286 ans->dbl_v[i] = thisk==0 ? 0.0 : cs[i];
287 } else if (i+1 > k[i]) {
288 int thisk = k[i] - ((int) (cn[i] - cn[i-k[i]]));
289 ans->dbl_v[i] = thisk==0 ? 0.0 : cs[i]-cs[i-k[i]];
290 }
291 }
292 free(cn);
293 }
294 free(cs);
295 }
fadaptiverollsumExact(double * x,uint64_t nx,ans_t * ans,int * k,double fill,bool narm,int hasna,bool verbose)296 void fadaptiverollsumExact(double *x, uint64_t nx, ans_t *ans, int *k, double fill, bool narm, int hasna, bool verbose) {
297 if (verbose)
298 snprintf(end(ans->message[0]), 500, _("%s: running in parallel for input length %"PRIu64", hasna %d, narm %d\n"), "fadaptiverollsumExact", (uint64_t)nx, hasna, (int) narm);
299 bool truehasna = hasna>0;
300 if (!truehasna || !narm) {
301 #pragma omp parallel for num_threads(getDTthreads(nx, true))
302 for (uint64_t i=0; i<nx; i++) {
303 if (narm && truehasna) {
304 continue;
305 }
306 if (i+1 < k[i]) {
307 ans->dbl_v[i] = fill;
308 } else {
309 long double w = 0.0;
310 for (int j=-k[i]+1; j<=0; j++) {
311 w += x[i+j];
312 }
313 if (R_FINITE((double) w)) {
314 ans->dbl_v[i] = (double) w;
315 } else {
316 if (!narm) {
317 ans->dbl_v[i] = (double) w;
318 }
319 truehasna = true;
320 }
321 }
322 }
323 if (truehasna) {
324 if (hasna==-1) {
325 ans->status = 2;
326 snprintf(end(ans->message[2]), 500, _("%s: hasNA=FALSE used but NA (or other non-finite) value(s) are present in input, use default hasNA=NA to avoid this warning"), __func__);
327 }
328 if (verbose) {
329 if (narm) {
330 snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, re-running with extra care for NAs\n"), __func__);
331 } else {
332 snprintf(end(ans->message[0]), 500, _("%s: NA (or other non-finite) value(s) are present in input, na.rm was FALSE so in 'exact' implementation NAs were handled already, no need to re-run\n"), __func__);
333 }
334 }
335 }
336 }
337 if (truehasna && narm) {
338 #pragma omp parallel for num_threads(getDTthreads(nx, true))
339 for (uint64_t i=0; i<nx; i++) {
340 if (i+1 < k[i]) {
341 ans->dbl_v[i] = fill;
342 } else {
343 long double w = 0.0;
344 int nc = 0;
345 for (int j=-k[i]+1; j<=0; j++) {
346 if (ISNAN(x[i+j])) {
347 nc++;
348 } else {
349 w += x[i+j];
350 }
351 }
352 if (w > DBL_MAX) {
353 ans->dbl_v[i] = R_PosInf;
354 } else if (w < -DBL_MAX) {
355 ans->dbl_v[i] = R_NegInf;
356 } else {
357 if (nc < k[i]) {
358 ans->dbl_v[i] = (double) w;
359 } else {
360 ans->dbl_v[i] = 0.0;
361 }
362 }
363 }
364 }
365 }
366 }
367