1 /*****************************************************************************
2  * RRDtool 1.2.30  Copyright by Tobi Oetiker, 1997-2009
3  *****************************************************************************
4  * rrd_hw.c : Support for Holt-Winters Smoothing/ Aberrant Behavior Detection
5  *****************************************************************************
6  * Initial version by Jake Brutlag, WebTV Networks, 5/1/00
7  *****************************************************************************/
8 
9 #include "rrd_tool.h"
10 #include "rrd_hw.h"
11 
12 /* #define DEBUG */
13 
14 /* private functions */
15 unsigned long MyMod(signed long val, unsigned long mod);
16 int update_hwpredict(rrd_t *rrd, unsigned long cdp_idx, unsigned long rra_idx,
17                  unsigned long ds_idx, unsigned short CDP_scratch_idx);
18 int update_seasonal(rrd_t *rrd, unsigned long cdp_idx, unsigned long rra_idx,
19 				 unsigned long ds_idx, unsigned short CDP_scratch_idx,
20 				 rrd_value_t *seasonal_coef);
21 int update_devpredict(rrd_t *rrd, unsigned long cdp_idx,
22 				  unsigned long rra_idx, unsigned long ds_idx, unsigned short CDP_scratch_idx);
23 int update_devseasonal(rrd_t *rrd, unsigned long cdp_idx, unsigned long rra_idx,
24 	               unsigned long ds_idx, unsigned short CDP_scratch_idx,
25 				   rrd_value_t *seasonal_dev);
26 int update_failures(rrd_t *rrd, unsigned long cdp_idx, unsigned long rra_idx,
27 				unsigned long ds_idx, unsigned short CDP_scratch_idx);
28 
/* Advance the Holt-Winters (HWPREDICT) forecast for one data source.
 * On entry, scratch[CDP_scratch_idx] holds the observed PDP value; on exit
 * it holds the prediction for this interval (possibly DNAN), ready to be
 * written to the RRA.  Intercept and slope are updated by exponential
 * smoothing with the RRA's alpha and beta parameters.  Always returns 0. */
int
update_hwpredict(rrd_t *rrd, unsigned long cdp_idx, unsigned long rra_idx,
                 unsigned long ds_idx, unsigned short CDP_scratch_idx)
{
   rrd_value_t prediction, seasonal_coef;
   unsigned long dependent_rra_idx, seasonal_cdp_idx;
   unival *coefs = rrd -> cdp_prep[cdp_idx].scratch;
   rra_def_t *current_rra = &(rrd -> rra_def[rra_idx]);

   /* save coefficients from current prediction; the "last" copies are read
	* by dependent RRAs (SEASONAL, DEVSEASONAL) that are updated after this
	* one in the same interval */
   coefs[CDP_hw_last_intercept].u_val = coefs[CDP_hw_intercept].u_val;
   coefs[CDP_hw_last_slope].u_val = coefs[CDP_hw_slope].u_val;
   coefs[CDP_last_null_count].u_cnt = coefs[CDP_null_count].u_cnt;

   /* retrieve the current seasonal coef from the dependent SEASONAL RRA */
   dependent_rra_idx = current_rra -> par[RRA_dependent_rra_idx].u_cnt;
   seasonal_cdp_idx = dependent_rra_idx*(rrd -> stat_head -> ds_cnt) + ds_idx;
   /* RRAs are processed in index order: if SEASONAL precedes this RRA it
	* was already advanced this interval, so use its saved (last) value */
   if (dependent_rra_idx < rra_idx)
	  seasonal_coef = rrd -> cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_last_seasonal].u_val;
   else
	  seasonal_coef = rrd -> cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_seasonal].u_val;

   /* compute the prediction */
   if (isnan(coefs[CDP_hw_intercept].u_val) || isnan(coefs[CDP_hw_slope].u_val)
      || isnan(seasonal_coef))
   {
     prediction = DNAN;

     /* bootstrap initialization of slope and intercept */
     if (isnan(coefs[CDP_hw_intercept].u_val) &&
	 !isnan(coefs[CDP_scratch_idx].u_val))
     {
#ifdef DEBUG
       fprintf(stderr,"Initialization of slope/intercept\n");
#endif
       coefs[CDP_hw_intercept].u_val = coefs[CDP_scratch_idx].u_val;
       coefs[CDP_hw_last_intercept].u_val = coefs[CDP_scratch_idx].u_val;
       /* initialize the slope to 0 */
       coefs[CDP_hw_slope].u_val = 0.0;
       coefs[CDP_hw_last_slope].u_val = 0.0;
       /* initialize null count to 1 */
       coefs[CDP_null_count].u_cnt = 1;
       coefs[CDP_last_null_count].u_cnt = 1;
     }
     /* if seasonal coefficient is NA, then don't update intercept, slope */
   } else {
     /* null_count counts intervals since the last non-NA observation, so
	  * the slope term extrapolates the trend across any gap of NA values */
     prediction = coefs[CDP_hw_intercept].u_val +
       (coefs[CDP_hw_slope].u_val)*(coefs[CDP_null_count].u_cnt)
       + seasonal_coef;
#ifdef DEBUG
     fprintf(stderr,"computed prediction: %f\n",prediction);
#endif
     if (isnan(coefs[CDP_scratch_idx].u_val))
     {
       /* NA value, no updates of intercept, slope;
	    * increment the null count */
       (coefs[CDP_null_count].u_cnt)++;
     } else {
#ifdef DEBUG
       fprintf(stderr,"Updating intercept, slope\n");
#endif
       /* update the intercept: alpha-weighted blend of the deseasonalized
	    * observation and the previous linear forecast */
       coefs[CDP_hw_intercept].u_val = (current_rra -> par[RRA_hw_alpha].u_val)*
	 	(coefs[CDP_scratch_idx].u_val - seasonal_coef) +
	 	(1 - current_rra -> par[RRA_hw_alpha].u_val)*(coefs[CDP_hw_intercept].u_val
	 	+ (coefs[CDP_hw_slope].u_val)*(coefs[CDP_null_count].u_cnt));
       /* update the slope: uses the intercept just updated above together
	    * with the pre-update value saved in CDP_hw_last_intercept, so the
	    * statement order here is load-bearing */
       coefs[CDP_hw_slope].u_val = (current_rra -> par[RRA_hw_beta].u_val)*
	 	(coefs[CDP_hw_intercept].u_val - coefs[CDP_hw_last_intercept].u_val) +
	 	(1 - current_rra -> par[RRA_hw_beta].u_val)*(coefs[CDP_hw_slope].u_val);
       /* reset the null count */
       coefs[CDP_null_count].u_cnt = 1;
     }
   }

   /* store the prediction for writing */
   coefs[CDP_scratch_idx].u_val = prediction;
   return 0;
}
108 
109 int
lookup_seasonal(rrd_t * rrd,unsigned long rra_idx,unsigned long rra_start,FILE * rrd_file,unsigned long offset,rrd_value_t ** seasonal_coef)110 lookup_seasonal(rrd_t *rrd, unsigned long rra_idx, unsigned long rra_start,
111 				FILE *rrd_file, unsigned long offset, rrd_value_t **seasonal_coef)
112 {
113    unsigned long pos_tmp;
114    /* rra_ptr[].cur_row points to the rra row to be written; this function
115 	* reads cur_row + offset */
116    unsigned long row_idx = rrd -> rra_ptr[rra_idx].cur_row + offset;
117    /* handle wrap around */
118    if (row_idx >= rrd -> rra_def[rra_idx].row_cnt)
119      row_idx = row_idx % (rrd -> rra_def[rra_idx].row_cnt);
120 
121    /* rra_start points to the appropriate rra block in the file */
122    /* compute the pointer to the appropriate location in the file */
123    pos_tmp = rra_start + (row_idx)*(rrd -> stat_head -> ds_cnt)*sizeof(rrd_value_t);
124 
125    /* allocate memory if need be */
126    if (*seasonal_coef == NULL)
127 	  *seasonal_coef =
128 	     (rrd_value_t *) malloc((rrd -> stat_head -> ds_cnt)*sizeof(rrd_value_t));
129    if (*seasonal_coef == NULL) {
130 	  rrd_set_error("memory allocation failure: seasonal coef");
131 	  return -1;
132    }
133 
134    if (!fseek(rrd_file,pos_tmp,SEEK_SET))
135    {
136       if (fread(*seasonal_coef,sizeof(rrd_value_t),rrd->stat_head->ds_cnt,rrd_file)
137 		  == rrd -> stat_head -> ds_cnt)
138 	  {
139 		 /* success! */
140          /* we can safely ignore the rule requiring a seek operation between read
141           * and write, because this read moves the file pointer to somewhere
142           * in the file other than the next write location.
143           * */
144 		 return 0;
145 	  } else {
146 	     rrd_set_error("read operation failed in lookup_seasonal(): %lu\n",pos_tmp);
147 	  }
148    } else {
149 	  rrd_set_error("seek operation failed in lookup_seasonal(): %lu\n",pos_tmp);
150    }
151 
152    return -1;
153 }
154 
/* Update the seasonal coefficient (SEASONAL) for one data source and leave
 * the value to be written to the RRA in scratch[CDP_scratch_idx].  The cdp
 * scratch keeps both the current and previous coefficient.  Which branch
 * runs depends on whether the dependent HWPREDICT RRA precedes this RRA in
 * the file (already updated this interval) or follows it (its new
 * intercept must be recomputed here).  Always returns 0. */
int
update_seasonal(rrd_t *rrd, unsigned long cdp_idx, unsigned long rra_idx,
				 unsigned long ds_idx, unsigned short CDP_scratch_idx, rrd_value_t *seasonal_coef)
{
/* TODO: extract common if subblocks in the wake of I/O optimization */
   rrd_value_t intercept, seasonal;
   rra_def_t *current_rra = &(rrd -> rra_def[rra_idx]);
   /* the dependent RRA of a SEASONAL array is its HWPREDICT array */
   rra_def_t *hw_rra = &(rrd -> rra_def[current_rra -> par[RRA_dependent_rra_idx].u_cnt]);
   /* obtain cdp_prep index for HWPREDICT */
   unsigned long hw_cdp_idx = (current_rra -> par[RRA_dependent_rra_idx].u_cnt)
      * (rrd -> stat_head -> ds_cnt) + ds_idx;
   unival *coefs = rrd -> cdp_prep[hw_cdp_idx].scratch;

   /* update seasonal coefficient in cdp prep areas: rotate the current
	* coefficient into the "last" slot and load the next one (supplied by
	* the caller in seasonal_coef) into the current slot */
   seasonal = rrd -> cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val;
   rrd -> cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = seasonal;
   rrd -> cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val =
	  seasonal_coef[ds_idx];

   /* update seasonal value for disk */
   if (current_rra -> par[RRA_dependent_rra_idx].u_cnt < rra_idx)
	  /* associated HWPREDICT has already been updated */
	  /* check for possible NA values */
      if (isnan(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val))
	  {
		 /* no update, store the old value unchanged,
		  * doesn't matter if it is NA */
	     rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = seasonal;
	  } else if (isnan(coefs[CDP_hw_last_intercept].u_val)
		     || isnan(coefs[CDP_hw_last_slope].u_val))
	  {
		 /* this should never happen, as HWPREDICT was already updated */
		 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val= DNAN;
	  } else if (isnan(seasonal))
	  {
		 /* initialization: intercept is not currently being updated */
#ifdef DEBUG
		 fprintf(stderr,"Initialization of seasonal coef %lu\n",
			rrd -> rra_ptr[rra_idx].cur_row);
#endif
		 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val
			-= coefs[CDP_hw_last_intercept].u_val;
	  } else {
	         /* standard exponential smoothing update with parameter gamma */
	         intercept = coefs[CDP_hw_intercept].u_val;
#ifdef DEBUG
		 fprintf(stderr,
			"Updating seasonal, params: gamma %f, new intercept %f, old seasonal %f\n",
			current_rra -> par[RRA_seasonal_gamma].u_val,
			intercept, seasonal);
#endif
		 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
			(current_rra -> par[RRA_seasonal_gamma].u_val)*
			(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val - intercept) +
			(1 - current_rra -> par[RRA_seasonal_gamma].u_val)*seasonal;
	  }
   else {
	  /* SEASONAL array is updated first, which means the new intercept
	   * hasn't be computed; so we compute it here. */

	  /* check for possible NA values */
      if (isnan(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val))
	  {
		 /* no update, simple store the old value unchanged,
		  * doesn't matter if it is NA */
		 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =  seasonal;
	  } else if (isnan(coefs[CDP_hw_intercept].u_val)
		     || isnan(coefs[CDP_hw_slope].u_val))
	  {
		 /* Initialization of slope and intercept will occur.
		  * force seasonal coefficient to 0. */
		 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val= 0.0;
	  } else if (isnan(seasonal))
	  {
		 /* initialization: intercept will not be updated
		  * CDP_hw_intercept = CDP_hw_last_intercept; just need to
		  * subtract this baseline value. */
#ifdef DEBUG
		 fprintf(stderr,"Initialization of seasonal coef %lu\n",
			rrd -> rra_ptr[rra_idx].cur_row);
#endif
		 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val -= coefs[CDP_hw_intercept].u_val;
	  } else {
	         /* Note that we must get CDP_scratch_idx from SEASONAL array, as CDP_scratch_idx
	          * for HWPREDICT array will be DNAN. */
	     /* recompute the intercept HWPREDICT will produce later this
	      * interval, then apply the gamma smoothing update against it */
	     intercept = (hw_rra -> par[RRA_hw_alpha].u_val)*
		    (rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val - seasonal)
		    + (1 - hw_rra -> par[RRA_hw_alpha].u_val)*(coefs[CDP_hw_intercept].u_val
		    + (coefs[CDP_hw_slope].u_val)*(coefs[CDP_null_count].u_cnt));
		 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
		   (current_rra -> par[RRA_seasonal_gamma].u_val)*
		   (rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val - intercept) +
		   (1 - current_rra -> par[RRA_seasonal_gamma].u_val)*seasonal;
	  }
   }
#ifdef DEBUG
   fprintf(stderr,"seasonal coefficient set= %f\n",
         rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val);
#endif
   return 0;
}
255 
256 int
update_devpredict(rrd_t * rrd,unsigned long cdp_idx,unsigned long rra_idx,unsigned long ds_idx,unsigned short CDP_scratch_idx)257 update_devpredict(rrd_t *rrd, unsigned long cdp_idx,
258 				  unsigned long rra_idx, unsigned long ds_idx, unsigned short CDP_scratch_idx)
259 {
260    /* there really isn't any "update" here; the only reason this information
261     * is stored separately from DEVSEASONAL is to preserve deviation predictions
262     * for a longer duration than one seasonal cycle. */
263    unsigned long seasonal_cdp_idx = (rrd -> rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt)
264       * (rrd -> stat_head -> ds_cnt) + ds_idx;
265 
266    if (rrd -> rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt < rra_idx)
267    {
268 	  /* associated DEVSEASONAL array already updated */
269 	  rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val
270 		 = rrd -> cdp_prep[seasonal_cdp_idx].scratch[CDP_last_seasonal_deviation].u_val;
271    } else {
272 	  /* associated DEVSEASONAL not yet updated */
273 	  rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val
274 		 = rrd -> cdp_prep[seasonal_cdp_idx].scratch[CDP_seasonal_deviation].u_val;
275    }
276    return 0;
277 }
278 
/* Update the seasonal deviation (DEVSEASONAL) for one data source: save
 * the previous deviation, load the next one supplied by the caller in
 * seasonal_dev, reconstruct the Holt-Winters prediction for this interval,
 * and smooth the absolute prediction error into scratch[CDP_scratch_idx]
 * for writing.  Always returns 0. */
int
update_devseasonal(rrd_t *rrd, unsigned long cdp_idx, unsigned long rra_idx,
	               unsigned long ds_idx, unsigned short CDP_scratch_idx,
				   rrd_value_t *seasonal_dev)
{
   rrd_value_t prediction = 0, seasonal_coef = DNAN;
   rra_def_t *current_rra = &(rrd -> rra_def[rra_idx]);
   /* obtain cdp_prep index for HWPREDICT */
   unsigned long hw_rra_idx = current_rra -> par[RRA_dependent_rra_idx].u_cnt;
   unsigned long hw_cdp_idx = hw_rra_idx * (rrd -> stat_head -> ds_cnt) + ds_idx;
   unsigned long seasonal_cdp_idx;
   unival *coefs = rrd -> cdp_prep[hw_cdp_idx].scratch;

   /* rotate the current deviation into the "last" slot */
   rrd -> cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].u_val =
	  rrd -> cdp_prep[cdp_idx].scratch[CDP_seasonal_deviation].u_val;
   /* retrieve the next seasonal deviation value, could be NA */
   rrd -> cdp_prep[cdp_idx].scratch[CDP_seasonal_deviation].u_val =
	  seasonal_dev[ds_idx];

   /* retrieve the current seasonal_coef (not to be confused with the
	* current seasonal deviation). Could make this more readable by introducing
	* some wrapper functions. */
   seasonal_cdp_idx = (rrd -> rra_def[hw_rra_idx].par[RRA_dependent_rra_idx].u_cnt)
	  *(rrd -> stat_head -> ds_cnt) + ds_idx;
   if (rrd -> rra_def[hw_rra_idx].par[RRA_dependent_rra_idx].u_cnt < rra_idx)
	  /* SEASONAL array already updated */
	  seasonal_coef = rrd -> cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_last_seasonal].u_val;
   else
	  /* SEASONAL array not yet updated */
	  seasonal_coef = rrd -> cdp_prep[seasonal_cdp_idx].scratch[CDP_hw_seasonal].u_val;

   /* compute the abs value of the difference between the prediction and
	* observed value; the coefficients used depend on whether HWPREDICT was
	* already advanced this interval */
   if (hw_rra_idx < rra_idx)
   {
	  /* associated HWPREDICT has already been updated */
	  if (isnan(coefs[CDP_hw_last_intercept].u_val) ||
	      isnan(coefs[CDP_hw_last_slope].u_val) ||
	      isnan(seasonal_coef))
	  {
		 /* one of the prediction values is uninitialized */
		 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = DNAN;
		 return 0;
	  } else {
	     prediction = coefs[CDP_hw_last_intercept].u_val +
		   (coefs[CDP_hw_last_slope].u_val)*(coefs[CDP_last_null_count].u_cnt)
		   + seasonal_coef;
	  }
   } else {
	  /* associated HWPREDICT has NOT been updated */
	  if (isnan(coefs[CDP_hw_intercept].u_val) ||
	      isnan(coefs[CDP_hw_slope].u_val) ||
	      isnan(seasonal_coef))
	  {
		 /* one of the prediction values is uninitialized */
		 rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = DNAN;
		 return 0;
	  } else {
	     prediction = coefs[CDP_hw_intercept].u_val +
		   (coefs[CDP_hw_slope].u_val)*(coefs[CDP_null_count].u_cnt)
		   + seasonal_coef;
	  }
   }

   if (isnan(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val))
   {
      /* no update, store existing value unchanged, doesn't
	   * matter if it is NA */
	  rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
		 rrd -> cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].u_val;
   } else if (isnan(rrd -> cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].u_val))
   {
	  /* initialization: seed the deviation with the raw absolute error */
#ifdef DEBUG
	  fprintf(stderr,"Initialization of seasonal deviation\n");
#endif
	  rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
	     fabs(prediction - rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val);
   } else {
	  /* exponential smoothing update (same gamma as the SEASONAL array) */
	  rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val =
	    (rrd -> rra_def[rra_idx].par[RRA_seasonal_gamma].u_val)*
	    fabs(prediction - rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val)
	    + (1 -  rrd -> rra_def[rra_idx].par[RRA_seasonal_gamma].u_val)*
	    (rrd -> cdp_prep[cdp_idx].scratch[CDP_last_seasonal_deviation].u_val);
   }
   return 0;
}
367 
/* Check for a failure based on a threshold # of violations within the specified
 * window.  Classifies the observed value in scratch[CDP_scratch_idx] as a
 * violation of the confidence bounds (or not), shifts it into the sliding
 * window of recent violations kept as raw chars over the CDP scratch area,
 * and writes 1.0 (failure) or 0.0 there.  Returns the failure flag as an
 * int. */
int
update_failures(rrd_t *rrd, unsigned long cdp_idx, unsigned long rra_idx,
				unsigned long ds_idx, unsigned short CDP_scratch_idx)
{
   /* detection of a violation depends on 3 RRAs:
	* HWPREDICT, SEASONAL, and DEVSEASONAL */
   rra_def_t *current_rra = &(rrd -> rra_def[rra_idx]);
   /* dependency chain: FAILURES -> DEVSEASONAL -> HWPREDICT -> SEASONAL */
   unsigned long dev_rra_idx = current_rra -> par[RRA_dependent_rra_idx].u_cnt;
   rra_def_t *dev_rra = &(rrd -> rra_def[dev_rra_idx]);
   unsigned long hw_rra_idx = dev_rra -> par[RRA_dependent_rra_idx].u_cnt;
   rra_def_t *hw_rra =  &(rrd -> rra_def[hw_rra_idx]);
   unsigned long seasonal_rra_idx = hw_rra -> par[RRA_dependent_rra_idx].u_cnt;
   unsigned long temp_cdp_idx;
   rrd_value_t deviation = DNAN;
   rrd_value_t seasonal_coef = DNAN;
   rrd_value_t prediction = DNAN;
   char violation = 0;
   unsigned short violation_cnt = 0, i;
   char *violations_array;

   /* usual checks to determine the order of the RRAs */
   temp_cdp_idx = dev_rra_idx * (rrd -> stat_head -> ds_cnt) + ds_idx;
   if (rra_idx < seasonal_rra_idx)
   {
	  /* DEVSEASONAL not yet updated */
	  deviation = rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_seasonal_deviation].u_val;
   } else {
	  /* DEVSEASONAL already updated */
	  deviation = rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_last_seasonal_deviation].u_val;
   }
   /* if the deviation is NA the observation cannot be classified; the
	* whole block below is skipped and violation stays 0 */
   if (!isnan(deviation)) {

   temp_cdp_idx = seasonal_rra_idx * (rrd -> stat_head -> ds_cnt) + ds_idx;
   if (rra_idx < seasonal_rra_idx)
   {
	  /* SEASONAL not yet updated */
	  seasonal_coef = rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_hw_seasonal].u_val;
   } else {
	  /* SEASONAL already updated */
	  seasonal_coef = rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_hw_last_seasonal].u_val;
   }
   /* in this code block, we know seasonal coef is not DNAN, because deviation is not
	* null */

   temp_cdp_idx = hw_rra_idx * (rrd -> stat_head -> ds_cnt) + ds_idx;
   if (rra_idx < hw_rra_idx)
   {
	  /* HWPREDICT not yet updated */
	  prediction = rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_hw_intercept].u_val +
	     (rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_hw_slope].u_val)
		 *(rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_null_count].u_cnt)
		 + seasonal_coef;
   } else {
	  /* HWPREDICT already updated */
	  prediction = rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_hw_last_intercept].u_val +
	     (rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_hw_last_slope].u_val)
		 *(rrd -> cdp_prep[temp_cdp_idx].scratch[CDP_last_null_count].u_cnt)
		 + seasonal_coef;
   }

   /* determine if the observed value is a violation: outside the band
	* [prediction - delta_neg*deviation, prediction + delta_pos*deviation] */
   if (!isnan(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val))
   {
	  if (rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val > prediction +
		 (current_rra -> par[RRA_delta_pos].u_val)*deviation
	     || rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val < prediction -
		 (current_rra -> par[RRA_delta_neg].u_val)*deviation)
		 violation = 1;
   } else {
	  violation = 1; /* count DNAN values as violations */
   }

   }

   /* determine if a failure has occurred and update the failure array:
	* shift the window of violation flags (stored as raw chars over the
	* scratch area) one slot right, accumulating the count as we go */
   violation_cnt = violation;
   violations_array = (char *) ((void *) rrd -> cdp_prep[cdp_idx].scratch);
   for (i = current_rra -> par[RRA_window_len].u_cnt; i > 1; i--)
   {
	  /* shift */
	  violations_array[i-1] = violations_array[i-2];
	  violation_cnt += violations_array[i-1];
   }
   violations_array[0] = violation;

   if (violation_cnt < current_rra -> par[RRA_failure_threshold].u_cnt)
	  /* not a failure */
	  rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = 0.0;
   else
	  rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = 1.0;

   /* the stored 0.0/1.0 flag is also the (int-converted) return value */
   return (rrd-> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val);
}
463 
464 /* For the specified CDP prep area and the FAILURES RRA,
465  * erase all history of past violations.
466  */
467 void
erase_violations(rrd_t * rrd,unsigned long cdp_idx,unsigned long rra_idx)468 erase_violations(rrd_t *rrd, unsigned long cdp_idx, unsigned long rra_idx)
469 {
470    unsigned short i;
471    char *violations_array;
472    /* check that rra_idx is a CF_FAILURES array */
473    if (cf_conv(rrd -> rra_def[rra_idx].cf_nam) != CF_FAILURES)
474    {
475 #ifdef DEBUG
476 	  fprintf(stderr,"erase_violations called for non-FAILURES RRA: %s\n",
477 	     rrd -> rra_def[rra_idx].cf_nam);
478 #endif
479 	  return;
480    }
481 
482 #ifdef DEBUG
483    fprintf(stderr,"scratch buffer before erase:\n");
484    for (i = 0; i < MAX_CDP_PAR_EN; i++)
485    {
486 	  fprintf(stderr,"%lu ", rrd -> cdp_prep[cdp_idx].scratch[i].u_cnt);
487    }
488    fprintf(stderr,"\n");
489 #endif
490 
491    /* WARNING: an array of longs on disk is treated as an array of chars
492     * in memory. */
493    violations_array = (char *) ((void *) rrd -> cdp_prep[cdp_idx].scratch);
494    /* erase everything in the part of the CDP scratch array that will be
495     * used to store violations for the current window */
496    for (i = rrd -> rra_def[rra_idx].par[RRA_window_len].u_cnt; i > 0; i--)
497    {
498 	  violations_array[i-1] = 0;
499    }
500 #ifdef DEBUG
501    fprintf(stderr,"scratch buffer after erase:\n");
502    for (i = 0; i < MAX_CDP_PAR_EN; i++)
503    {
504 	  fprintf(stderr,"%lu ", rrd -> cdp_prep[cdp_idx].scratch[i].u_cnt);
505    }
506    fprintf(stderr,"\n");
507 #endif
508 }
509 
510 /* Smooth a periodic array with a moving average: equal weights and
511  * length = 5% of the period. */
512 int
apply_smoother(rrd_t * rrd,unsigned long rra_idx,unsigned long rra_start,FILE * rrd_file)513 apply_smoother(rrd_t *rrd, unsigned long rra_idx, unsigned long rra_start,
514                FILE *rrd_file)
515 {
516    unsigned long i, j, k;
517    unsigned long totalbytes;
518    rrd_value_t *rrd_values;
519    unsigned long row_length = rrd -> stat_head -> ds_cnt;
520    unsigned long row_count = rrd -> rra_def[rra_idx].row_cnt;
521    unsigned long offset;
522    FIFOqueue **buffers;
523    rrd_value_t *working_average;
524    rrd_value_t *baseline;
525 
526    offset = floor(0.025*row_count);
527    if (offset == 0) return 0; /* no smoothing */
528 
529    /* allocate memory */
530    totalbytes = sizeof(rrd_value_t)*row_length*row_count;
531    rrd_values = (rrd_value_t *) malloc(totalbytes);
532    if (rrd_values == NULL)
533    {
534 	  rrd_set_error("apply smoother: memory allocation failure");
535 	  return -1;
536    }
537 
538    /* rra_start is at the beginning of this rra */
539    if (fseek(rrd_file,rra_start,SEEK_SET))
540    {
541 	  rrd_set_error("seek to rra %d failed", rra_start);
542 	  free(rrd_values);
543 	  return -1;
544    }
545    fflush(rrd_file);
546    /* could read all data in a single block, but we need to
547     * check for NA values */
548    for (i = 0; i < row_count; ++i)
549    {
550 	  for (j = 0; j < row_length; ++j)
551 	  {
552 		 fread(&(rrd_values[i*row_length + j]),sizeof(rrd_value_t),1,rrd_file);
553 		 /* should check fread for errors... */
554 		 if (isnan(rrd_values[i*row_length + j])) {
555 			/* can't apply smoothing, still uninitialized values */
556 #ifdef DEBUG
557 			fprintf(stderr,"apply_smoother: NA detected in seasonal array: %ld %ld\n",i,j);
558 #endif
559 			free(rrd_values);
560 			return 0;
561 		 }
562 	  }
563    }
564 
565    /* allocate queues, one for each data source */
566    buffers = (FIFOqueue **) malloc(sizeof(FIFOqueue *)*row_length);
567    for (i = 0; i < row_length; ++i)
568    {
569       queue_alloc(&(buffers[i]),2*offset + 1);
570    }
571    /* need working average initialized to 0 */
572    working_average = (rrd_value_t *) calloc(row_length,sizeof(rrd_value_t));
573    baseline = (rrd_value_t *) calloc(row_length,sizeof(rrd_value_t));
574 
575    /* compute sums of the first 2*offset terms */
576    for (i = 0; i < 2*offset; ++i)
577    {
578 	  k = MyMod(i - offset,row_count);
579 	  for (j = 0; j < row_length; ++j)
580 	  {
581 		 queue_push(buffers[j],rrd_values[k*row_length + j]);
582 		 working_average[j] += rrd_values[k*row_length + j];
583 	  }
584    }
585 
586    /* compute moving averages */
587    for (i = offset; i < row_count + offset; ++i)
588    {
589 	  for (j = 0; j < row_length; ++j)
590 	  {
591 	     k = MyMod(i,row_count);
592 	     /* add a term to the sum */
593 	     working_average[j] += rrd_values[k*row_length + j];
594 	     queue_push(buffers[j],rrd_values[k*row_length + j]);
595 
596 	     /* reset k to be the center of the window */
597 	     k = MyMod(i - offset,row_count);
598 	     /* overwrite rdd_values entry, the old value is already
599 	      * saved in buffers */
600 	     rrd_values[k*row_length + j] = working_average[j]/(2*offset + 1);
601 	     baseline[j] += rrd_values[k*row_length + j];
602 
603 	     /* remove a term from the sum */
604 	     working_average[j] -= queue_pop(buffers[j]);
605 	  }
606    }
607 
608    for (i = 0; i < row_length; ++i)
609    {
610 	  queue_dealloc(buffers[i]);
611 	  baseline[i] /= row_count;
612    }
613    free(buffers);
614    free(working_average);
615 
616    if (cf_conv(rrd->rra_def[rra_idx].cf_nam) == CF_SEASONAL) {
617    for (j = 0; j < row_length; ++j)
618    {
619    for (i = 0; i < row_count; ++i)
620    {
621 	 rrd_values[i*row_length + j] -= baseline[j];
622    }
623 	 /* update the baseline coefficient,
624 	  * first, compute the cdp_index. */
625 	 offset = (rrd->rra_def[rra_idx].par[RRA_dependent_rra_idx].u_cnt)
626 	  * row_length + j;
627 	 (rrd->cdp_prep[offset]).scratch[CDP_hw_intercept].u_val += baseline[j];
628    }
629    /* flush cdp to disk */
630    fflush(rrd_file);
631    if (fseek(rrd_file,sizeof(stat_head_t) +
632 	  rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
633 	  rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
634 	  sizeof(live_head_t) +
635 	  rrd->stat_head->ds_cnt * sizeof(pdp_prep_t),SEEK_SET))
636    {
637 	  rrd_set_error("apply_smoother: seek to cdp_prep failed");
638 	  free(rrd_values);
639 	  return -1;
640    }
641    if (fwrite( rrd -> cdp_prep,
642 	  sizeof(cdp_prep_t),
643 	  (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt, rrd_file)
644 	  != (rrd->stat_head->rra_cnt) * (rrd->stat_head->ds_cnt) )
645    {
646 	  rrd_set_error("apply_smoother: cdp_prep write failed");
647 	  free(rrd_values);
648 	  return -1;
649    }
650    } /* endif CF_SEASONAL */
651 
652    /* flush updated values to disk */
653    fflush(rrd_file);
654    if (fseek(rrd_file,rra_start,SEEK_SET))
655    {
656 	  rrd_set_error("apply_smoother: seek to pos %d failed", rra_start);
657 	  free(rrd_values);
658 	  return -1;
659    }
660    /* write as a single block */
661    if (fwrite(rrd_values,sizeof(rrd_value_t),row_length*row_count,rrd_file)
662 	  != row_length*row_count)
663    {
664 	  rrd_set_error("apply_smoother: write failed to %lu",rra_start);
665 	  free(rrd_values);
666 	  return -1;
667    }
668 
669    fflush(rrd_file);
670    free(rrd_values);
671    free(baseline);
672    return 0;
673 }
674 
/* Reset aberrant behavior model coefficients, including intercept, slope,
 * seasonal, and seasonal deviation for the specified data source.
 * In-memory cdp_prep state is reset per CF type; on-disk rows are
 * overwritten (with DNAN) only for SEASONAL/DEVSEASONAL arrays, and the
 * updated cdp_prep areas are then written back to the file. */
void
reset_aberrant_coefficients(rrd_t *rrd, FILE *rrd_file, unsigned long ds_idx)
{
   unsigned long cdp_idx, rra_idx, i;
   unsigned long cdp_start, rra_start;
   rrd_value_t nan_buffer = DNAN;

   /* compute the offset for the cdp area */
   cdp_start = sizeof(stat_head_t) +
	  rrd->stat_head->ds_cnt * sizeof(ds_def_t) +
	  rrd->stat_head->rra_cnt * sizeof(rra_def_t) +
	  sizeof(live_head_t) +
	  rrd->stat_head->ds_cnt * sizeof(pdp_prep_t);
   /* compute the offset for the first rra */
   rra_start = cdp_start +
	  (rrd->stat_head->ds_cnt) * (rrd->stat_head->rra_cnt) * sizeof(cdp_prep_t) +
	  rrd->stat_head->rra_cnt * sizeof(rra_ptr_t);

   /* loop over the RRAs */
   for (rra_idx = 0; rra_idx < rrd -> stat_head -> rra_cnt; rra_idx++)
   {
	  cdp_idx = rra_idx * (rrd-> stat_head-> ds_cnt) + ds_idx;
	  switch (cf_conv(rrd -> rra_def[rra_idx].cf_nam))
	  {
		 case CF_HWPREDICT:
	        init_hwpredict_cdp(&(rrd -> cdp_prep[cdp_idx]));
			break;
		 case CF_SEASONAL:
		 case CF_DEVSEASONAL:
			/* don't use init_seasonal because it will reset burn-in, which
			 * means different data sources will be calling for the smoother
			 * at different times. */
	        rrd->cdp_prep[cdp_idx].scratch[CDP_hw_seasonal].u_val = DNAN;
	        rrd->cdp_prep[cdp_idx].scratch[CDP_hw_last_seasonal].u_val = DNAN;
			/* move to first entry of data source for this rra */
			/* NOTE(review): the fseek return values in this case are not
			 * checked; a failed seek would place the NaN writes at the
			 * wrong offset — consider checking */
			fseek(rrd_file,rra_start + ds_idx * sizeof(rrd_value_t),SEEK_SET);
			/* entries for the same data source are not contiguous,
			 * temporal entries are contiguous */
	        for (i = 0; i < rrd->rra_def[rra_idx].row_cnt; ++i)
			{
			   if (fwrite(&nan_buffer,sizeof(rrd_value_t),1,rrd_file) != 1)
			   {
                  rrd_set_error(
				  "reset_aberrant_coefficients: write failed data source %lu rra %s",
				  ds_idx,rrd->rra_def[rra_idx].cf_nam);
				  return;
			   }
			   /* skip the other data sources to reach the same column of
			    * the next row */
			   fseek(rrd_file,(rrd->stat_head->ds_cnt - 1) *
				  sizeof(rrd_value_t),SEEK_CUR);
			}
			break;
		 case CF_FAILURES:
			erase_violations(rrd,cdp_idx,rra_idx);
			break;
		 default:
			break;
	  }
	  /* move offset to the next rra */
	  rra_start += rrd->rra_def[rra_idx].row_cnt * rrd->stat_head->ds_cnt *
		 sizeof(rrd_value_t);
   }
   /* persist all reset cdp_prep areas in one write */
   fseek(rrd_file,cdp_start,SEEK_SET);
   if (fwrite( rrd -> cdp_prep,
	  sizeof(cdp_prep_t),
	  (rrd->stat_head->rra_cnt) * rrd->stat_head->ds_cnt, rrd_file)
	  != (rrd->stat_head->rra_cnt) * (rrd->stat_head->ds_cnt) )
   {
	  rrd_set_error("reset_aberrant_coefficients: cdp_prep write failed");
	  return;
   }
}
748 
/* Initialize the HWPREDICT scratch area for one data source: all four
 * intercept/slope coefficients start unknown (DNAN), and the NA-run
 * counters start at 1. */
void init_hwpredict_cdp(cdp_prep_t *cdp)
{
   /* current and saved-from-last-interval coefficient pairs */
   cdp->scratch[CDP_hw_intercept].u_val = DNAN;
   cdp->scratch[CDP_hw_slope].u_val = DNAN;
   cdp->scratch[CDP_hw_last_intercept].u_val = DNAN;
   cdp->scratch[CDP_hw_last_slope].u_val = DNAN;
   /* null counts start at 1, matching the bootstrap in update_hwpredict */
   cdp->scratch[CDP_null_count].u_cnt = 1;
   cdp->scratch[CDP_last_null_count].u_cnt = 1;
}
758 
/* Initialize the SEASONAL scratch area for one data source: both seasonal
 * coefficients start unknown, and the init flag is set (used elsewhere to
 * track the burn-in period — see reset_aberrant_coefficients). */
void init_seasonal_cdp(cdp_prep_t *cdp)
{
   cdp->scratch[CDP_init_seasonal].u_cnt = 1;
   cdp->scratch[CDP_hw_seasonal].u_val = DNAN;
   cdp->scratch[CDP_hw_last_seasonal].u_val = DNAN;
}
765 
766 int
update_aberrant_CF(rrd_t * rrd,rrd_value_t pdp_val,enum cf_en current_cf,unsigned long cdp_idx,unsigned long rra_idx,unsigned long ds_idx,unsigned short CDP_scratch_idx,rrd_value_t * seasonal_coef)767 update_aberrant_CF(rrd_t *rrd, rrd_value_t pdp_val, enum cf_en current_cf,
768 		  unsigned long cdp_idx, unsigned long rra_idx, unsigned long ds_idx,
769 		  unsigned short CDP_scratch_idx, rrd_value_t *seasonal_coef)
770 {
771    rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val = pdp_val;
772    switch (current_cf) {
773    case CF_AVERAGE:
774    default:
775 	 return 0;
776    case CF_HWPREDICT:
777      return update_hwpredict(rrd,cdp_idx,rra_idx,ds_idx,CDP_scratch_idx);
778    case CF_DEVPREDICT:
779 	 return update_devpredict(rrd,cdp_idx,rra_idx,ds_idx,CDP_scratch_idx);
780    case CF_SEASONAL:
781      return update_seasonal(rrd,cdp_idx,rra_idx,ds_idx,CDP_scratch_idx,seasonal_coef);
782    case CF_DEVSEASONAL:
783      return update_devseasonal(rrd,cdp_idx,rra_idx,ds_idx,CDP_scratch_idx,seasonal_coef);
784    case CF_FAILURES:
785      return update_failures(rrd,cdp_idx,rra_idx,ds_idx,CDP_scratch_idx);
786    }
787    return -1;
788 }
789 
/* Mathematical modulus: return val mod `mod`, always in [0, mod-1], unlike
 * C's % operator whose result is negative for negative operands.  Used to
 * wrap (possibly negative) window indices around a circular array.
 * Fixes vs. the original:
 *  - labs() instead of abs(): abs takes int, so a signed long argument
 *    outside int range would be truncated (undefined result);
 *  - when val < 0 and |val| is an exact multiple of mod, the answer is 0,
 *    not mod (the original returned mod, one past the valid range). */
unsigned long MyMod(signed long val, unsigned long mod)
{
   unsigned long new_val;

   if (val < 0)
   {
	  new_val = ((unsigned long) labs(val)) % mod;
	  return (new_val == 0) ? 0 : (mod - new_val);
   }
   return ((unsigned long) val) % mod;
}
803 
804 /* a standard fixed-capacity FIF0 queue implementation
805  * No overflow checking is performed. */
queue_alloc(FIFOqueue ** q,int capacity)806 int queue_alloc(FIFOqueue **q,int capacity)
807 {
808    *q = (FIFOqueue *) malloc(sizeof(FIFOqueue));
809    if (*q == NULL) return -1;
810    (*q) -> queue = (rrd_value_t *) malloc(sizeof(rrd_value_t)*capacity);
811    if ((*q) -> queue == NULL)
812    {
813 	  free(*q);
814 	  return -1;
815    }
816    (*q) -> capacity = capacity;
817    (*q) -> head = capacity;
818    (*q) -> tail = 0;
819    return 0;
820 }
821 
/* Nonzero when the queue holds no elements.  head may sit at capacity
 * (the freshly-allocated empty state), so reduce it before comparing. */
int queue_isempty(FIFOqueue *q)
{
   int head_pos = q -> head % q -> capacity;
   return head_pos == q -> tail;
}
826 
/* Append a value at the tail, wrapping at capacity.  No overflow check is
 * performed (callers size the queue to the window length). */
void queue_push(FIFOqueue *q, rrd_value_t value)
{
   int slot = q -> tail;
   q -> queue[slot] = value;
   q -> tail = (slot + 1) % q -> capacity;
}
832 
queue_pop(FIFOqueue * q)833 rrd_value_t queue_pop(FIFOqueue *q)
834 {
835    q -> head = q -> head % q -> capacity;
836    return q -> queue[(q -> head)++];
837 }
838 
/* Release a queue created by queue_alloc: the element storage first, then
 * the queue header itself. */
void queue_dealloc(FIFOqueue *q)
{
   free(q -> queue);
   free(q);
}
844