1 /*****************************************************************************
2  * RRDtool 1.2.30  Copyright by Tobi Oetiker, 1997-2009
3  *****************************************************************************
4  * rrd_update.c  RRD Update Function
5  *****************************************************************************
6  * $Id: rrd_update.c 1735 2009-01-19 14:29:11Z oetiker $
7  *****************************************************************************/
8 
9 #include "rrd_tool.h"
10 #include <sys/types.h>
11 #include <fcntl.h>
12 
13 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
14  #include <sys/locking.h>
15  #include <sys/stat.h>
16  #include <io.h>
17 #endif
18 
19 #include "rrd_hw.h"
20 #include "rrd_rpncalc.h"
21 
22 #include "rrd_is_thread_safe.h"
23 #include "unused.h"
24 
25 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
26 /*
27  * WIN32 does not have gettimeofday	and struct timeval. This is a quick and dirty
28  * replacement.
29  */
30 #include <sys/timeb.h>
31 
32 #if (defined(__MINGW32__) && \
33        ((__MINGW32_MAJOR_VERSION == 3 && __MINGW32_MINOR_VERSION >= 12) || __MINGW32_MAJOR_VERSION > 3))
34 #include <sys/time.h>
35 #else
36 
37 struct timeval {
38 	time_t tv_sec; /* seconds */
39 	long tv_usec;  /* microseconds */
40 };
41 
42 struct __timezone {
43 	int  tz_minuteswest; /* minutes W of Greenwich */
44 	int  tz_dsttime;     /* type of dst correction */
45 };
46 
gettimeofday(struct timeval * t,struct __timezone * tz)47 static int gettimeofday(struct timeval *t, struct __timezone *tz) {
48 
49 	struct _timeb current_time;
50 
51 	_ftime(&current_time);
52 
53 	t->tv_sec  = current_time.time;
54 	t->tv_usec = current_time.millitm * 1000;
55 
56 	return 0;
57 }
58 
59 #endif /* mingw32 3.4.5 */
60 #endif
61 
62 /*
63  * normilize time as returned by gettimeofday. usec part must
64  * be always >= 0
65  */
normalize_time(struct timeval * t)66 static void normalize_time(struct timeval *t)
67 {
68 	if(t->tv_usec < 0) {
69 		t->tv_sec--;
70 		t->tv_usec += 1000000L;
71 	}
72 }
73 
74 /* Local prototypes */
75 int LockRRD(FILE *rrd_file);
76 #ifdef HAVE_MMAP
77 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
78 					unsigned long *rra_current,
79 					unsigned short CDP_scratch_idx,
80 #ifndef DEBUG
81 FILE UNUSED(*rrd_file),
82 #else
83 FILE *rrd_file,
84 #endif
85 					info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file);
86 #else
87 info_t *write_RRA_row (rrd_t *rrd, unsigned long rra_idx,
88 					unsigned long *rra_current,
89 					unsigned short CDP_scratch_idx, FILE *rrd_file,
90 					info_t *pcdp_summary, time_t *rra_time);
91 #endif
92 int rrd_update_r(const char *filename, const char *tmplt, int argc, const char **argv);
93 int _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv,
94 					info_t*);
95 
96 #define IFDNAN(X,Y) (isnan(X) ? (Y) : (X));
97 
98 
rrd_update_v(int argc,char ** argv)99 info_t *rrd_update_v(int argc, char **argv)
100 {
101     char             *tmplt = NULL;
102 	info_t *result = NULL;
103 	infoval rc;
104       rc.u_int = -1;
105     optind = 0; opterr = 0;  /* initialize getopt */
106 
107     while (1) {
108 		static struct option long_options[] =
109 			{
110 				{"template",      required_argument, 0, 't'},
111 				{0,0,0,0}
112 			};
113 		int option_index = 0;
114 		int opt;
115 		opt = getopt_long(argc, argv, "t:",
116 						  long_options, &option_index);
117 
118 		if (opt == EOF)
119 			break;
120 
121 		switch(opt) {
122 		case 't':
123 			tmplt = optarg;
124 			break;
125 
126 		case '?':
127 			rrd_set_error("unknown option '%s'",argv[optind-1]);
128 			goto end_tag;
129 		}
130     }
131 
132     /* need at least 2 arguments: filename, data. */
133     if (argc-optind < 2) {
134 		rrd_set_error("Not enough arguments");
135 		goto end_tag;
136     }
137     rc.u_int = 0;
138     result = info_push(NULL,sprintf_alloc("return_value"),RD_I_INT,rc);
139    	rc.u_int = _rrd_update(argv[optind], tmplt,
140 		      argc - optind - 1, (const char **)(argv + optind + 1), result);
141     result->value.u_int = rc.u_int;
142 end_tag:
143     return result;
144 }
145 
146 int
rrd_update(int argc,char ** argv)147 rrd_update(int argc, char **argv)
148 {
149     char             *tmplt = NULL;
150     int rc;
151     optind = 0; opterr = 0;  /* initialize getopt */
152 
153     while (1) {
154 		static struct option long_options[] =
155 			{
156 				{"template",      required_argument, 0, 't'},
157 				{0,0,0,0}
158 			};
159 		int option_index = 0;
160 		int opt;
161 		opt = getopt_long(argc, argv, "t:",
162 						  long_options, &option_index);
163 
164 		if (opt == EOF)
165 			break;
166 
167 		switch(opt) {
168 		case 't':
169 			tmplt = optarg;
170 			break;
171 
172 		case '?':
173 			rrd_set_error("unknown option '%s'",argv[optind-1]);
174 			return(-1);
175 		}
176     }
177 
178     /* need at least 2 arguments: filename, data. */
179     if (argc-optind < 2) {
180 		rrd_set_error("Not enough arguments");
181 
182 		return -1;
183     }
184 
185    	rc = rrd_update_r(argv[optind], tmplt,
186 		      argc - optind - 1, (const char **)(argv + optind + 1));
187     return rc;
188 }
189 
190 int
rrd_update_r(const char * filename,const char * tmplt,int argc,const char ** argv)191 rrd_update_r(const char *filename, const char *tmplt, int argc, const char **argv)
192 {
193    return _rrd_update(filename, tmplt, argc, argv, NULL);
194 }
195 
196 int
_rrd_update(const char * filename,const char * tmplt,int argc,const char ** argv,info_t * pcdp_summary)197 _rrd_update(const char *filename, const char *tmplt, int argc, const char **argv,
198    info_t *pcdp_summary)
199 {
200 
201     int              arg_i = 2;
202     short            j;
203     unsigned long    i,ii,iii=1;
204 
205     unsigned long    rra_begin;          /* byte pointer to the rra
206 					  * area in the rrd file.  this
207 					  * pointer never changes value */
208     unsigned long    rra_start;          /* byte pointer to the rra
209 					  * area in the rrd file.  this
210 					  * pointer changes as each rrd is
211 					  * processed. */
212     unsigned long    rra_current;        /* byte pointer to the current write
213 					  * spot in the rrd file. */
214     unsigned long    rra_pos_tmp;        /* temporary byte pointer. */
215     double           interval,
216                      pre_int,post_int;   /* interval between this and
217 					  * the last run */
218     unsigned long    proc_pdp_st;        /* which pdp_st was the last
219 					  * to be processed */
220     unsigned long    occu_pdp_st;        /* when was the pdp_st
221 					  * before the last update
222 					  * time */
223     unsigned long    proc_pdp_age;       /* how old was the data in
224 					  * the pdp prep area when it
225 					  * was last updated */
226     unsigned long    occu_pdp_age;       /* how long ago was the last
227 					  * pdp_step time */
228     rrd_value_t      *pdp_new;           /* prepare the incoming data
229 					  * to be added the the
230 					  * existing entry */
231     rrd_value_t      *pdp_temp;          /* prepare the pdp values
232 					  * to be added the the
233 					  * cdp values */
234 
235     long             *tmpl_idx;          /* index representing the settings
236 					    transported by the tmplt index */
237     unsigned long    tmpl_cnt = 2;       /* time and data */
238 
239     FILE             *rrd_file;
240     rrd_t            rrd;
241     time_t           current_time = 0;
242     time_t           rra_time = 0;  	 /* time of update for a RRA */
243     unsigned long    current_time_usec=0;/* microseconds part of current time */
244     struct timeval   tmp_time;           /* used for time conversion */
245 
246     char             **updvals;
247     int              schedule_smooth = 0;
248 	rrd_value_t      *seasonal_coef = NULL, *last_seasonal_coef = NULL;
249 					 /* a vector of future Holt-Winters seasonal coefs */
250     unsigned long    elapsed_pdp_st;
251 					 /* number of elapsed PDP steps since last update */
252     unsigned long    *rra_step_cnt = NULL;
253 					 /* number of rows to be updated in an RRA for a data
254 					  * value. */
255     unsigned long    start_pdp_offset;
256 					 /* number of PDP steps since the last update that
257 					  * are assigned to the first CDP to be generated
258 					  * since the last update. */
259     unsigned short   scratch_idx;
260 					 /* index into the CDP scratch array */
261     enum cf_en       current_cf;
262 					 /* numeric id of the current consolidation function */
263     rpnstack_t       rpnstack; /* used for COMPUTE DS */
264     int		     version;  /* rrd version */
265     char             *endptr; /* used in the conversion */
266 
267 #ifdef HAVE_MMAP
268     void	     *rrd_mmaped_file;
269     unsigned long    rrd_filesize;
270 #endif
271 
272 
273     rpnstack_init(&rpnstack);
274 
275     /* need at least 1 arguments: data. */
276     if (argc < 1) {
277 	rrd_set_error("Not enough arguments");
278 	return -1;
279     }
280 
281 
282 
283     if(rrd_open(filename,&rrd_file,&rrd, RRD_READWRITE)==-1){
284 	return -1;
285     }
286 
287     /* initialize time */
288     version = atoi(rrd.stat_head->version);
289     gettimeofday(&tmp_time, 0);
290     normalize_time(&tmp_time);
291     current_time = tmp_time.tv_sec;
292     if(version >= 3) {
293         current_time_usec = tmp_time.tv_usec;
294     }
295     else {
296 	current_time_usec = 0;
297     }
298 
299     rra_current = rra_start = rra_begin = ftell(rrd_file);
300     /* This is defined in the ANSI C standard, section 7.9.5.3:
301 
302         When a file is opened with udpate mode ('+' as the second
303         or third character in the ... list of mode argument
304         variables), both input and ouptut may be performed on the
305         associated stream.  However, ...  input may not be directly
306         followed by output without an intervening call to a file
307         positioning function, unless the input oepration encounters
308         end-of-file. */
309 #ifdef HAVE_MMAP
310     fseek(rrd_file, 0, SEEK_END);
311     rrd_filesize = ftell(rrd_file);
312     fseek(rrd_file, rra_current, SEEK_SET);
313 #else
314     fseek(rrd_file, 0, SEEK_CUR);
315 #endif
316 
317 
318     /* get exclusive lock to whole file.
319      * lock gets removed when we close the file.
320      */
321     if (LockRRD(rrd_file) != 0) {
322       rrd_set_error("could not lock RRD");
323       rrd_free(&rrd);
324       fclose(rrd_file);
325       return(-1);
326     }
327 
328     if((updvals = malloc( sizeof(char*) * (rrd.stat_head->ds_cnt+1)))==NULL){
329 	rrd_set_error("allocating updvals pointer array");
330 	rrd_free(&rrd);
331         fclose(rrd_file);
332 	return(-1);
333     }
334 
335     if ((pdp_temp = malloc(sizeof(rrd_value_t)
336 			   *rrd.stat_head->ds_cnt))==NULL){
337 	rrd_set_error("allocating pdp_temp ...");
338 	free(updvals);
339 	rrd_free(&rrd);
340         fclose(rrd_file);
341 	return(-1);
342     }
343 
344     if ((tmpl_idx = malloc(sizeof(unsigned long)
345 			   *(rrd.stat_head->ds_cnt+1)))==NULL){
346 	rrd_set_error("allocating tmpl_idx ...");
347 	free(pdp_temp);
348 	free(updvals);
349 	rrd_free(&rrd);
350         fclose(rrd_file);
351 	return(-1);
352     }
353     /* initialize tmplt redirector */
354     /* default config example (assume DS 1 is a CDEF DS)
355        tmpl_idx[0] -> 0; (time)
356        tmpl_idx[1] -> 1; (DS 0)
357        tmpl_idx[2] -> 3; (DS 2)
358        tmpl_idx[3] -> 4; (DS 3) */
359     tmpl_idx[0] = 0; /* time */
360     for (i = 1, ii = 1 ; i <= rrd.stat_head->ds_cnt ; i++)
361 	{
362 	   if (dst_conv(rrd.ds_def[i-1].dst) != DST_CDEF)
363 	      tmpl_idx[ii++]=i;
364 	}
365     tmpl_cnt= ii;
366 
367     if (tmplt) {
368     	/* we should work on a writeable copy here */
369 	char *dsname;
370 	unsigned int tmpl_len;
371     	char *tmplt_copy = strdup(tmplt);
372 	dsname = tmplt_copy;
373 	tmpl_cnt = 1; /* the first entry is the time */
374 	tmpl_len = strlen(tmplt_copy);
375 	for(i=0;i<=tmpl_len ;i++) {
376 	    if (tmplt_copy[i] == ':' || tmplt_copy[i] == '\0') {
377 		tmplt_copy[i] = '\0';
378 		if (tmpl_cnt>rrd.stat_head->ds_cnt){
379   		    rrd_set_error("tmplt contains more DS definitions than RRD");
380 		    free(updvals); free(pdp_temp);
381 		    free(tmpl_idx); rrd_free(&rrd);
382 		    fclose(rrd_file); return(-1);
383 		}
384 		if ((tmpl_idx[tmpl_cnt++] = ds_match(&rrd,dsname)) == -1){
385   		    rrd_set_error("unknown DS name '%s'",dsname);
386 		    free(updvals); free(pdp_temp);
387 		    free(tmplt_copy);
388 		    free(tmpl_idx); rrd_free(&rrd);
389 		    fclose(rrd_file); return(-1);
390 		} else {
391 		  /* the first element is always the time */
392 		  tmpl_idx[tmpl_cnt-1]++;
393 		  /* go to the next entry on the tmplt_copy */
394 		  dsname = &tmplt_copy[i+1];
395                   /* fix the damage we did before */
396                   if (i<tmpl_len) {
397                      tmplt_copy[i]=':';
398                   }
399 
400 		}
401 	    }
402 	}
403 	free(tmplt_copy);
404     }
405     if ((pdp_new = malloc(sizeof(rrd_value_t)
406 			  *rrd.stat_head->ds_cnt))==NULL){
407 	rrd_set_error("allocating pdp_new ...");
408 	free(updvals);
409 	free(pdp_temp);
410 	free(tmpl_idx);
411 	rrd_free(&rrd);
412         fclose(rrd_file);
413 	return(-1);
414     }
415 
416 #ifdef HAVE_MMAP
417     rrd_mmaped_file = mmap(0,
418 		    	rrd_filesize,
419 			PROT_READ | PROT_WRITE,
420 			MAP_SHARED,
421 			fileno(rrd_file),
422 			0);
423     if (rrd_mmaped_file == MAP_FAILED) {
424         rrd_set_error("error mmapping file %s", filename);
425 	free(updvals);
426 	free(pdp_temp);
427 	free(tmpl_idx);
428 	rrd_free(&rrd);
429         fclose(rrd_file);
430 	return(-1);
431     }
432 #ifdef USE_MADVISE
433     /* when we use mmaping we tell the kernel the mmap equivalent
434        of POSIX_FADV_RANDOM */
435     madvise(rrd_mmaped_file,rrd_filesize,MADV_RANDOM);
436 #endif
437 #endif
438     /* loop through the arguments. */
439     for(arg_i=0; arg_i<argc;arg_i++) {
440 	char *stepper = strdup(argv[arg_i]);
441         char *step_start = stepper;
442 	char *p;
443 	char *parsetime_error = NULL;
444 	enum {atstyle, normal} timesyntax;
445 	struct rrd_time_value ds_tv;
446         if (stepper == NULL){
447                 rrd_set_error("failed duplication argv entry");
448 		free(step_start);
449                 free(updvals);
450                 free(pdp_temp);
451                 free(tmpl_idx);
452                 rrd_free(&rrd);
453 #ifdef HAVE_MMAP
454     		munmap(rrd_mmaped_file, rrd_filesize);
455 #endif
456                 fclose(rrd_file);
457                 return(-1);
458          }
459 	/* initialize all ds input to unknown except the first one
460            which has always got to be set */
461 	for(ii=1;ii<=rrd.stat_head->ds_cnt;ii++) updvals[ii] = "U";
462 	updvals[0]=stepper;
463 	/* separate all ds elements; first must be examined separately
464 	   due to alternate time syntax */
465 	if ((p=strchr(stepper,'@'))!=NULL) {
466 	    timesyntax = atstyle;
467 	    *p = '\0';
468 	    stepper = p+1;
469 	} else if ((p=strchr(stepper,':'))!=NULL) {
470 	    timesyntax = normal;
471 	    *p = '\0';
472 	    stepper = p+1;
473 	} else {
474 	    rrd_set_error("expected timestamp not found in data source from %s",
475 			  argv[arg_i]);
476 	    free(step_start);
477 	    break;
478 	}
479 	ii=1;
480 	updvals[tmpl_idx[ii]] = stepper;
481 	while (*stepper) {
482 	    if (*stepper == ':') {
483 		*stepper = '\0';
484 		ii++;
485 		if (ii<tmpl_cnt){
486 		    updvals[tmpl_idx[ii]] = stepper+1;
487 		}
488 	    }
489 	    stepper++;
490 	}
491 
492 	if (ii != tmpl_cnt-1) {
493 	    rrd_set_error("expected %lu data source readings (got %lu) from %s",
494 			  tmpl_cnt-1, ii, argv[arg_i]);
495 	    free(step_start);
496 	    break;
497 	}
498 
499         /* get the time from the reading ... handle N */
500 	if (timesyntax == atstyle) {
501             if ((parsetime_error = parsetime(updvals[0], &ds_tv))) {
502                 rrd_set_error("ds time: %s: %s", updvals[0], parsetime_error );
503 		free(step_start);
504 		break;
505 	    }
506 	    if (ds_tv.type == RELATIVE_TO_END_TIME ||
507 		ds_tv.type == RELATIVE_TO_START_TIME) {
508 		rrd_set_error("specifying time relative to the 'start' "
509 			      "or 'end' makes no sense here: %s",
510 			      updvals[0]);
511 		free(step_start);
512 		break;
513 	    }
514 
515 	    current_time = mktime(&ds_tv.tm) + ds_tv.offset;
516 	    current_time_usec = 0; /* FIXME: how to handle usecs here ? */
517 
518 	} else if (strcmp(updvals[0],"N")==0){
519 	    gettimeofday(&tmp_time, 0);
520 	    normalize_time(&tmp_time);
521 	    current_time = tmp_time.tv_sec;
522 	    current_time_usec = tmp_time.tv_usec;
523 	} else {
524 	    double tmp;
525 	    tmp = strtod(updvals[0], 0);
526 	    current_time = floor(tmp);
527 	    current_time_usec = (long)((tmp-(double)current_time) * 1000000.0);
528 	}
529 	/* dont do any correction for old version RRDs */
530 	if(version < 3)
531 	    current_time_usec = 0;
532 
533 	if(current_time < rrd.live_head->last_up ||
534 	  (current_time == rrd.live_head->last_up &&
535 	   (long)current_time_usec <= (long)rrd.live_head->last_up_usec)) {
536 	    rrd_set_error("%s: illegal attempt to update using time %ld when "
537 			  "last update time is %ld (minimum one second step)",
538 			  filename, current_time, rrd.live_head->last_up);
539 	    free(step_start);
540 	    break;
541 	}
542 
543 
544 	/* seek to the beginning of the rra's */
545 	if (rra_current != rra_begin) {
546 #ifndef HAVE_MMAP
547 	    if(fseek(rrd_file, rra_begin, SEEK_SET) != 0) {
548 		rrd_set_error("seek error in rrd");
549 		free(step_start);
550 		break;
551 	    }
552 #endif
553 	    rra_current = rra_begin;
554 	}
555 	rra_start = rra_begin;
556 
557 	/* when was the current pdp started */
558 	proc_pdp_age = rrd.live_head->last_up % rrd.stat_head->pdp_step;
559 	proc_pdp_st = rrd.live_head->last_up - proc_pdp_age;
560 
561 	/* when did the last pdp_st occur */
562 	occu_pdp_age = current_time % rrd.stat_head->pdp_step;
563 	occu_pdp_st = current_time - occu_pdp_age;
564 
565 	/* interval = current_time - rrd.live_head->last_up; */
566 	interval    = (double)(current_time - rrd.live_head->last_up)
567 	            + (double)((long)current_time_usec - (long)rrd.live_head->last_up_usec)/1000000.0;
568 
569 	if (occu_pdp_st > proc_pdp_st){
570 	    /* OK we passed the pdp_st moment*/
571 	    pre_int =  (long)occu_pdp_st - rrd.live_head->last_up; /* how much of the input data
572 							      * occurred before the latest
573 							      * pdp_st moment*/
574 	    pre_int -= ((double)rrd.live_head->last_up_usec)/1000000.0; /* adjust usecs */
575 	    post_int = occu_pdp_age;			     /* how much after it */
576 	    post_int += ((double)current_time_usec)/1000000.0;  /* adjust usecs */
577 	} else {
578 	    pre_int = interval;
579 	    post_int = 0;
580 	}
581 
582 #ifdef DEBUG
583 	printf(
584 	       "proc_pdp_age %lu\t"
585 	       "proc_pdp_st %lu\t"
586 	       "occu_pfp_age %lu\t"
587 	       "occu_pdp_st %lu\t"
588 	       "int %lf\t"
589 	       "pre_int %lf\t"
590 	       "post_int %lf\n", proc_pdp_age, proc_pdp_st,
591 		occu_pdp_age, occu_pdp_st,
592 	       interval, pre_int, post_int);
593 #endif
594 
595 	/* process the data sources and update the pdp_prep
596 	 * area accordingly */
597 	for(i=0;i<rrd.stat_head->ds_cnt;i++){
598 	    enum dst_en dst_idx;
599 	    dst_idx= dst_conv(rrd.ds_def[i].dst);
600 
601             /* make sure we do not build diffs with old last_ds values */
602 	    if(rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt < interval) {
603 		strncpy(rrd.pdp_prep[i].last_ds,"U",LAST_DS_LEN-1);
604 		rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
605 	    }
606 
607   	    /* NOTE: DST_CDEF should never enter this if block, because
608              * updvals[i+1][0] is initialized to 'U'; unless the caller
609 	     * accidently specified a value for the DST_CDEF. To handle
610 	      * this case, an extra check is required. */
611 
612 	    if((updvals[i+1][0] != 'U') &&
613 		   (dst_idx != DST_CDEF) &&
614 	       rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt >= interval) {
615 	       double rate = DNAN;
616      	       /* the data source type defines how to process the data */
617 		/* pdp_new contains rate * time ... eg the bytes
618 		 * transferred during the interval. Doing it this way saves
619 		 * a lot of math operations */
620 
621 
622 		switch(dst_idx){
623 		case DST_COUNTER:
624 		case DST_DERIVE:
625             for(ii=0;updvals[i+1][ii] != '\0';ii++){
626                  if((updvals[i+1][ii] < '0' || updvals[i+1][ii] > '9') && (ii != 0 && updvals[i+1][ii] != '-')){
627                       rrd_set_error("not a simple integer: '%s'",updvals[i+1]);
628                       break;
629                  }
630             }
631             if (rrd_test_error()){
632                    break;
633             }
634 		    if(rrd.pdp_prep[i].last_ds[0] != 'U'){
635 		       pdp_new[i]= rrd_diff(updvals[i+1],rrd.pdp_prep[i].last_ds);
636 		       if(dst_idx == DST_COUNTER) {
637 			  /* simple overflow catcher suggested by Andres Kroonmaa */
638 			  /* this will fail terribly for non 32 or 64 bit counters ... */
639 			  /* are there any others in SNMP land ? */
640 			  if (pdp_new[i] < (double)0.0 )
641 			    pdp_new[i] += (double)4294967296.0 ;  /* 2^32 */
642 			  if (pdp_new[i] < (double)0.0 )
643 			    pdp_new[i] += (double)18446744069414584320.0; /* 2^64-2^32 */;
644 		       }
645 		       rate = pdp_new[i] / interval;
646 		    }
647 		   else {
648 		     pdp_new[i]= DNAN;
649 		   }
650 		   break;
651 		case DST_ABSOLUTE:
652                     errno = 0;
653                     pdp_new[i] = strtod(updvals[i+1],&endptr);
654                     if (errno > 0){
655                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
656                         break;
657                     };
658                     if (endptr[0] != '\0'){
659                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
660                         break;
661                     }
662 		    rate = pdp_new[i] / interval;
663 		    break;
664 		case DST_GAUGE:
665                     errno = 0;
666                     pdp_new[i] = strtod(updvals[i+1],&endptr) * interval;
667                     if (errno > 0){
668                         rrd_set_error("converting  '%s' to float: %s",updvals[i+1],rrd_strerror(errno));
669                         break;
670                     };
671                     if (endptr[0] != '\0'){
672                         rrd_set_error("conversion of '%s' to float not complete: tail '%s'",updvals[i+1],endptr);
673                         break;
674                     }
675 		    rate = pdp_new[i] / interval;
676 		    break;
677 		default:
678 		    rrd_set_error("rrd contains unknown DS type : '%s'",
679 				  rrd.ds_def[i].dst);
680 		    break;
681 		}
682 		/* break out of this for loop if the error string is set */
683 		if (rrd_test_error()){
684 		    break;
685 		}
686 	       /* make sure pdp_temp is neither too large or too small
687 		* if any of these occur it becomes unknown ...
688 		* sorry folks ... */
689 	       if ( ! isnan(rate) &&
690 	            (( ! isnan(rrd.ds_def[i].par[DS_max_val].u_val) &&
691 	                 rate > rrd.ds_def[i].par[DS_max_val].u_val ) ||
692 	            ( ! isnan(rrd.ds_def[i].par[DS_min_val].u_val) &&
693 	                rate < rrd.ds_def[i].par[DS_min_val].u_val ))){
694 		  pdp_new[i] = DNAN;
695 	       }
696 	    } else {
697 		/* no news is news all the same */
698 		pdp_new[i] = DNAN;
699 	    }
700 
701 
702 	    /* make a copy of the command line argument for the next run */
703 #ifdef DEBUG
704 	    fprintf(stderr,
705 		    "prep ds[%lu]\t"
706 		    "last_arg '%s'\t"
707 		    "this_arg '%s'\t"
708 		    "pdp_new %10.2f\n",
709 		    i,
710 		    rrd.pdp_prep[i].last_ds,
711 		    updvals[i+1], pdp_new[i]);
712 #endif
713 	    strncpy(rrd.pdp_prep[i].last_ds, updvals[i+1],LAST_DS_LEN-1);
714 	    rrd.pdp_prep[i].last_ds[LAST_DS_LEN-1]='\0';
715 	}
716 	/* break out of the argument parsing loop if the error_string is set */
717 	if (rrd_test_error()){
718 	    free(step_start);
719 	    break;
720 	}
721 	/* has a pdp_st moment occurred since the last run ? */
722 
723 	if (proc_pdp_st == occu_pdp_st){
724 	    /* no we have not passed a pdp_st moment. therefore update is simple */
725 
726 	    for(i=0;i<rrd.stat_head->ds_cnt;i++){
727 		if(isnan(pdp_new[i])) {
728 	            /* this is not realy accurate if we use subsecond data arival time
729 		       should have thought of it when going subsecond resolution ...
730                        sorry next format change we will have it! */
731 		    rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt += floor(interval);
732 		} else {
733 		     if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
734 		     	rrd.pdp_prep[i].scratch[PDP_val].u_val= pdp_new[i];
735 		     } else {
736 		        rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i];
737 		     }
738 		}
739 #ifdef DEBUG
740 		fprintf(stderr,
741 			"NO PDP  ds[%lu]\t"
742 			"value %10.2f\t"
743 			"unkn_sec %5lu\n",
744 			i,
745 			rrd.pdp_prep[i].scratch[PDP_val].u_val,
746 			rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
747 #endif
748 	    }
749 	} else {
750 	    /* an pdp_st has occurred. */
751 
752 	    /* in pdp_prep[].scratch[PDP_val].u_val we have collected rate*seconds which
753 	     * occurred up to the last run.
754 	    pdp_new[] contains rate*seconds from the latest run.
755 	    pdp_temp[] will contain the rate for cdp */
756 
757 	    for(i=0;i<rrd.stat_head->ds_cnt;i++){
758 		/* update pdp_prep to the current pdp_st. */
759                 double pre_unknown = 0.0;
760 		if(isnan(pdp_new[i]))
761                     /* a final bit of unkonwn to be added bevore calculation
762 		     * we use a tempaorary variable for this so that we
763 		     * don't have to turn integer lines before using the value */
764 		    pre_unknown = pre_int;
765 		else {
766   	             if (isnan( rrd.pdp_prep[i].scratch[PDP_val].u_val )){
767 		     	rrd.pdp_prep[i].scratch[PDP_val].u_val= 	pdp_new[i]/interval*pre_int;
768 		     } else {
769 		        rrd.pdp_prep[i].scratch[PDP_val].u_val+= pdp_new[i]/interval*pre_int;
770 		     }
771 		 }
772 
773 
774 		/* if too much of the pdp_prep is unknown we dump it */
775 		if (
776 		    /* removed because this does not agree with the definition
777 		       a heart beat can be unknown */
778 		    /* (rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt
779 		     > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) || */
780 		    /* if the interval is larger thatn mrhb we get NAN */
781 	            (interval > rrd.ds_def[i].par[DS_mrhb_cnt].u_cnt) ||
782                     (rrd.stat_head -> pdp_step / 2.0 <
783 		     rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)) {
784 		    pdp_temp[i] = DNAN;
785 		} else {
786 		    pdp_temp[i] = rrd.pdp_prep[i].scratch[PDP_val].u_val
787 			/ ((double)(occu_pdp_st - proc_pdp_st
788                                     - rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt)
789                             -pre_unknown);
790 		}
791 
792 		/* process CDEF data sources; remember each CDEF DS can
793 		 * only reference other DS with a lower index number */
794 	    if (dst_conv(rrd.ds_def[i].dst) == DST_CDEF) {
795 		   rpnp_t *rpnp;
796 		   rpnp = rpn_expand((rpn_cdefds_t *) &(rrd.ds_def[i].par[DS_cdef]));
797 		   /* substitue data values for OP_VARIABLE nodes */
798 		   for (ii = 0; rpnp[ii].op != OP_END; ii++)
799 		   {
800 			  if (rpnp[ii].op == OP_VARIABLE) {
801 				 rpnp[ii].op = OP_NUMBER;
802 				 rpnp[ii].val =  pdp_temp[rpnp[ii].ptr];
803 			  }
804 		   }
805 		   /* run the rpn calculator */
806 		   if (rpn_calc(rpnp,&rpnstack,0,pdp_temp,i) == -1) {
807 			  free(rpnp);
808 			  break; /* exits the data sources pdp_temp loop */
809 		   }
810 		}
811 
812 		/* make pdp_prep ready for the next run */
813 		if(isnan(pdp_new[i])){
814 	            /* this is not realy accurate if we use subsecond data arival time
815 		       should have thought of it when going subsecond resolution ...
816                        sorry next format change we will have it! */
817 		    rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = floor(post_int);
818 		    rrd.pdp_prep[i].scratch[PDP_val].u_val = DNAN;
819 		} else {
820 		    rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt = 0;
821 		    rrd.pdp_prep[i].scratch[PDP_val].u_val =
822 			pdp_new[i]/interval*post_int;
823 		}
824 
825 #ifdef DEBUG
826 		fprintf(stderr,
827 			"PDP UPD ds[%lu]\t"
828 			"pdp_temp %10.2f\t"
829 			"new_prep %10.2f\t"
830 			"new_unkn_sec %5lu\n",
831 			i, pdp_temp[i],
832 			rrd.pdp_prep[i].scratch[PDP_val].u_val,
833 			rrd.pdp_prep[i].scratch[PDP_unkn_sec_cnt].u_cnt);
834 #endif
835 	    }
836 
837 		/* if there were errors during the last loop, bail out here */
838 	    if (rrd_test_error()){
839 	       free(step_start);
840 	       break;
841 	    }
842 
843 		/* compute the number of elapsed pdp_st moments */
844 		elapsed_pdp_st = (occu_pdp_st - proc_pdp_st) / rrd.stat_head -> pdp_step;
845 #ifdef DEBUG
846 		fprintf(stderr,"elapsed PDP steps: %lu\n", elapsed_pdp_st);
847 #endif
848 		if (rra_step_cnt == NULL)
849 		{
850 		   rra_step_cnt = (unsigned long *)
851 			  malloc((rrd.stat_head->rra_cnt)* sizeof(unsigned long));
852 		}
853 
854 	    for(i = 0, rra_start = rra_begin;
855 		i < rrd.stat_head->rra_cnt;
856 	    rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
857 		i++)
858 		{
859 		current_cf = cf_conv(rrd.rra_def[i].cf_nam);
860 		start_pdp_offset = rrd.rra_def[i].pdp_cnt -
861 		   (proc_pdp_st / rrd.stat_head -> pdp_step) % rrd.rra_def[i].pdp_cnt;
862         if (start_pdp_offset <= elapsed_pdp_st) {
863            rra_step_cnt[i] = (elapsed_pdp_st - start_pdp_offset) /
864 		      rrd.rra_def[i].pdp_cnt + 1;
865 	    } else {
866 		   rra_step_cnt[i] = 0;
867 		}
868 
869 		if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
870 		{
871 		   /* If this is a bulk update, we need to skip ahead in the seasonal
872 			* arrays so that they will be correct for the next observed value;
873 			* note that for the bulk update itself, no update will occur to
874 			* DEVSEASONAL or SEASONAL; futhermore, HWPREDICT and DEVPREDICT will
875 			* be set to DNAN. */
876            if (rra_step_cnt[i] > 2)
877 		   {
878 			  /* skip update by resetting rra_step_cnt[i],
879 			   * note that this is not data source specific; this is due
880 			   * to the bulk update, not a DNAN value for the specific data
881 			   * source. */
882 			  rra_step_cnt[i] = 0;
883               lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st,
884 			     &last_seasonal_coef);
885 		      lookup_seasonal(&rrd,i,rra_start,rrd_file,elapsed_pdp_st + 1,
886 			     &seasonal_coef);
887 		   }
888 
889 		  /* periodically run a smoother for seasonal effects */
890 		  /* Need to use first cdp parameter buffer to track
891 		   * burnin (burnin requires a specific smoothing schedule).
892 		   * The CDP_init_seasonal parameter is really an RRA level,
893 		   * not a data source within RRA level parameter, but the rra_def
894 		   * is read only for rrd_update (not flushed to disk). */
895 		  iii = i*(rrd.stat_head -> ds_cnt);
896 		  if (rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt
897 			  <= BURNIN_CYCLES)
898 		  {
899 		     if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
900 				 > rrd.rra_def[i].row_cnt - 1) {
901 			   /* mark off one of the burnin cycles */
902 			   ++(rrd.cdp_prep[iii].scratch[CDP_init_seasonal].u_cnt);
903 		       schedule_smooth = 1;
904 			 }
905 		  } else {
906 			 /* someone has no doubt invented a trick to deal with this
907 			  * wrap around, but at least this code is clear. */
908 			 if (rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt >
909 			     rrd.rra_ptr[i].cur_row)
910 			 {
911 				 /* here elapsed_pdp_st = rra_step_cnt[i] because of 1-1
912 				  * mapping between PDP and CDP */
913 				 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st
914 					>= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
915 				 {
916 #ifdef DEBUG
917 					fprintf(stderr,
918 					"schedule_smooth 1: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
919                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
920 					rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
921 #endif
922 					schedule_smooth = 1;
923 				 }
924              } else {
925 				 /* can't rely on negative numbers because we are working with
926 				  * unsigned values */
927 				 /* Don't need modulus here. If we've wrapped more than once, only
928 				  * one smooth is executed at the end. */
929 				 if (rrd.rra_ptr[i].cur_row + elapsed_pdp_st >= rrd.rra_def[i].row_cnt
930 					&& rrd.rra_ptr[i].cur_row + elapsed_pdp_st - rrd.rra_def[i].row_cnt
931 					>= rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt)
932 				 {
933 #ifdef DEBUG
934 					fprintf(stderr,
935 					"schedule_smooth 2: cur_row %lu, elapsed_pdp_st %lu, smooth idx %lu\n",
936                     rrd.rra_ptr[i].cur_row, elapsed_pdp_st,
937 					rrd.rra_def[i].par[RRA_seasonal_smooth_idx].u_cnt);
938 #endif
939 					schedule_smooth = 1;
940 				 }
941 			 }
942 		  }
943 
944 	      rra_current = ftell(rrd_file);
945 		} /* if cf is DEVSEASONAL or SEASONAL */
946 
947         if (rrd_test_error()) break;
948 
949 		    /* update CDP_PREP areas */
950 		    /* loop over data soures within each RRA */
951 		    for(ii = 0;
952 			ii < rrd.stat_head->ds_cnt;
953 			ii++)
954 			{
955 
956 			/* iii indexes the CDP prep area for this data source within the RRA */
957 			iii=i*rrd.stat_head->ds_cnt+ii;
958 
959 			if (rrd.rra_def[i].pdp_cnt > 1) {
960 
961 			   if (rra_step_cnt[i] > 0) {
962 			   /* If we are in this block, as least 1 CDP value will be written to
963 				* disk, this is the CDP_primary_val entry. If more than 1 value needs
964 				* to be written, then the "fill in" value is the CDP_secondary_val
965 				* entry. */
966 				  if (isnan(pdp_temp[ii]))
967                   {
968 					 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += start_pdp_offset;
969 					 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
970 				  } else {
971 					 /* CDP_secondary value is the RRA "fill in" value for intermediary
972 					  * CDP data entries. No matter the CF, the value is the same because
973 					  * the average, max, min, and last of a list of identical values is
974 					  * the same, namely, the value itself. */
975 					 rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = pdp_temp[ii];
976 				  }
977 
978 				  if (rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt
979 				      > rrd.rra_def[i].pdp_cnt*
980 				      rrd.rra_def[i].par[RRA_cdp_xff_val].u_val)
981 				  {
982 					 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
983 					 /* initialize carry over */
984 					 if (current_cf == CF_AVERAGE) {
985 						   if (isnan(pdp_temp[ii])) {
986 							  rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
987 						   } else {
988 							  rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
989 								 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
990 						   }
991 					 } else {
992 						rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
993 					 }
994 				  } else {
995 					 rrd_value_t cum_val, cur_val;
996 				     switch (current_cf) {
997 						case CF_AVERAGE:
998 						  cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, 0.0);
999 						  cur_val = IFDNAN(pdp_temp[ii],0.0);
1000                           rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val =
1001 					       (cum_val + cur_val * start_pdp_offset) /
1002 				           (rrd.rra_def[i].pdp_cnt
1003 					       -rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt);
1004 						   /* initialize carry over value */
1005 						   if (isnan(pdp_temp[ii])) {
1006 							  rrd.cdp_prep[iii].scratch[CDP_val].u_val = DNAN;
1007 						   } else {
1008 							  rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1009 								 ((elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt);
1010 						   }
1011 						   break;
1012 						case CF_MAXIMUM:
1013 						  cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, -DINF);
1014 						  cur_val = IFDNAN(pdp_temp[ii],-DINF);
1015 #ifdef DEBUG
1016 						  if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1017 							  isnan(pdp_temp[ii])) {
1018 						     fprintf(stderr,
1019 								"RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1020 								i,ii);
1021 							 exit(-1);
1022 						  }
1023 #endif
1024 						  if (cur_val > cum_val)
1025 							 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1026 						  else
1027 							 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1028 						  /* initialize carry over value */
1029 						  rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1030 						  break;
1031 						case CF_MINIMUM:
1032 						  cum_val = IFDNAN(rrd.cdp_prep[iii].scratch[CDP_val].u_val, DINF);
1033 						  cur_val = IFDNAN(pdp_temp[ii],DINF);
1034 #ifdef DEBUG
1035 						  if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val) &&
1036 							  isnan(pdp_temp[ii])) {
1037 						     fprintf(stderr,
1038 								"RRA %lu, DS %lu, both CDP_val and pdp_temp are DNAN!",
1039 								i,ii);
1040 							 exit(-1);
1041 						  }
1042 #endif
1043 						  if (cur_val < cum_val)
1044 							 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cur_val;
1045 						  else
1046 							 rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = cum_val;
1047 						  /* initialize carry over value */
1048 						  rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1049 						  break;
1050 						case CF_LAST:
1051 						default:
1052 						   rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = pdp_temp[ii];
1053 						   /* initialize carry over value */
1054 						   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1055 						break;
1056 					 }
1057 				  } /* endif meets xff value requirement for a valid value */
1058 				  /* initialize carry over CDP_unkn_pdp_cnt, this must after CDP_primary_val
1059 				   * is set because CDP_unkn_pdp_cnt is required to compute that value. */
1060 				  if (isnan(pdp_temp[ii]))
1061 					 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt =
1062 						(elapsed_pdp_st - start_pdp_offset) % rrd.rra_def[i].pdp_cnt;
1063 				  else
1064 					 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt = 0;
1065                } else  /* rra_step_cnt[i]  == 0 */
1066 			   {
1067 #ifdef DEBUG
1068 				  if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val)) {
1069 				  fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, DNAN\n",
1070 					 i,ii);
1071 				  } else {
1072 				  fprintf(stderr,"schedule CDP_val update, RRA %lu DS %lu, %10.2f\n",
1073 					 i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1074 				  }
1075 #endif
1076 				  if (isnan(pdp_temp[ii])) {
1077 			     	 rrd.cdp_prep[iii].scratch[CDP_unkn_pdp_cnt].u_cnt += elapsed_pdp_st;
1078 				  } else if (isnan(rrd.cdp_prep[iii].scratch[CDP_val].u_val))
1079 				  {
1080 					 if (current_cf == CF_AVERAGE) {
1081 					    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii] *
1082 						   elapsed_pdp_st;
1083 					 } else {
1084 					    rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1085 					 }
1086 #ifdef DEBUG
1087 					 fprintf(stderr,"Initialize CDP_val for RRA %lu DS %lu: %10.2f\n",
1088 					    i,ii,rrd.cdp_prep[iii].scratch[CDP_val].u_val);
1089 #endif
1090 				  } else {
1091 					 switch (current_cf) {
1092 					 case CF_AVERAGE:
1093 					    rrd.cdp_prep[iii].scratch[CDP_val].u_val += pdp_temp[ii] *
1094 						   elapsed_pdp_st;
1095 						break;
1096 					 case CF_MINIMUM:
1097 						if (pdp_temp[ii] < rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1098 						   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1099 						break;
1100 					 case CF_MAXIMUM:
1101 						if (pdp_temp[ii] > rrd.cdp_prep[iii].scratch[CDP_val].u_val)
1102 						   rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1103 						break;
1104 					 case CF_LAST:
1105 					 default:
1106 						rrd.cdp_prep[iii].scratch[CDP_val].u_val = pdp_temp[ii];
1107 						break;
1108 					 }
1109 				  }
1110 			   }
1111 			} else { /* rrd.rra_def[i].pdp_cnt == 1 */
1112 			   if (elapsed_pdp_st > 2)
1113 			   {
1114 				   switch (current_cf) {
1115 				   case CF_AVERAGE:
1116 				   default:
1117 			          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val=pdp_temp[ii];
1118 			          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val=pdp_temp[ii];
1119 					  break;
1120                    case CF_SEASONAL:
1121 				   case CF_DEVSEASONAL:
1122 					  /* need to update cached seasonal values, so they are consistent
1123 					   * with the bulk update */
1124                       /* WARNING: code relies on the fact that CDP_hw_last_seasonal and
1125 					   * CDP_last_deviation are the same. */
1126                       rrd.cdp_prep[iii].scratch[CDP_hw_last_seasonal].u_val =
1127 						 last_seasonal_coef[ii];
1128 					  rrd.cdp_prep[iii].scratch[CDP_hw_seasonal].u_val =
1129 						 seasonal_coef[ii];
1130 					  break;
1131                    case CF_HWPREDICT:
1132 					  /* need to update the null_count and last_null_count.
1133 					   * even do this for non-DNAN pdp_temp because the
1134 					   * algorithm is not learning from batch updates. */
1135 					  rrd.cdp_prep[iii].scratch[CDP_null_count].u_cnt +=
1136 						 elapsed_pdp_st;
1137 					  rrd.cdp_prep[iii].scratch[CDP_last_null_count].u_cnt +=
1138 						 elapsed_pdp_st - 1;
1139 					  /* fall through */
1140 				   case CF_DEVPREDICT:
1141 			          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = DNAN;
1142 			          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = DNAN;
1143 					  break;
1144                    case CF_FAILURES:
1145 					  /* do not count missed bulk values as failures */
1146 			          rrd.cdp_prep[iii].scratch[CDP_primary_val].u_val = 0;
1147 			          rrd.cdp_prep[iii].scratch[CDP_secondary_val].u_val = 0;
1148 					  /* need to reset violations buffer.
1149 					   * could do this more carefully, but for now, just
1150 					   * assume a bulk update wipes away all violations. */
1151                       erase_violations(&rrd, iii, i);
1152 					  break;
1153 				   }
1154 			   }
1155 			} /* endif rrd.rra_def[i].pdp_cnt == 1 */
1156 
1157 			if (rrd_test_error()) break;
1158 
1159 			} /* endif data sources loop */
1160         } /* end RRA Loop */
1161 
1162 		/* this loop is only entered if elapsed_pdp_st < 3 */
1163 		for (j = elapsed_pdp_st, scratch_idx = CDP_primary_val;
1164 			 j > 0 && j < 3; j--, scratch_idx = CDP_secondary_val)
1165 		{
1166 	       for(i = 0, rra_start = rra_begin;
1167 		   i < rrd.stat_head->rra_cnt;
1168 	       rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1169 		   i++)
1170 		   {
1171 			  if (rrd.rra_def[i].pdp_cnt > 1) continue;
1172 
1173 	          current_cf = cf_conv(rrd.rra_def[i].cf_nam);
1174 			  if (current_cf == CF_SEASONAL || current_cf == CF_DEVSEASONAL)
1175 			  {
1176 		         lookup_seasonal(&rrd,i,rra_start,rrd_file,
1177 				    elapsed_pdp_st + (scratch_idx == CDP_primary_val ? 1 : 2),
1178 			        &seasonal_coef);
1179                  rra_current = ftell(rrd_file);
1180 			  }
1181 			  if (rrd_test_error()) break;
1182 		      /* loop over data soures within each RRA */
1183 		      for(ii = 0;
1184 			  ii < rrd.stat_head->ds_cnt;
1185 			  ii++)
1186 			  {
1187 			     update_aberrant_CF(&rrd,pdp_temp[ii],current_cf,
1188 					i*(rrd.stat_head->ds_cnt) + ii,i,ii,
1189 				    scratch_idx, seasonal_coef);
1190 			  }
1191            } /* end RRA Loop */
1192 		   if (rrd_test_error()) break;
1193 	    } /* end elapsed_pdp_st loop */
1194 
1195 		if (rrd_test_error()) break;
1196 
1197 		/* Ready to write to disk */
1198 		/* Move sequentially through the file, writing one RRA at a time.
1199 		 * Note this architecture divorces the computation of CDP with
1200 		 * flushing updated RRA entries to disk. */
1201 	    for(i = 0, rra_start = rra_begin;
1202 		i < rrd.stat_head->rra_cnt;
1203 	    rra_start += rrd.rra_def[i].row_cnt * rrd.stat_head -> ds_cnt * sizeof(rrd_value_t),
1204 		i++) {
1205 		/* is th5Aere anything to write for this RRA? If not, continue. */
1206         if (rra_step_cnt[i] == 0) continue;
1207 
1208 		/* write the first row */
1209 #ifdef DEBUG
1210         fprintf(stderr,"  -- RRA Preseek %ld\n",ftell(rrd_file));
1211 #endif
1212 	    rrd.rra_ptr[i].cur_row++;
1213 	    if (rrd.rra_ptr[i].cur_row >= rrd.rra_def[i].row_cnt)
1214 		   rrd.rra_ptr[i].cur_row = 0; /* wrap around */
1215 		/* positition on the first row */
1216 		rra_pos_tmp = rra_start +
1217 		   (rrd.stat_head->ds_cnt)*(rrd.rra_ptr[i].cur_row)*sizeof(rrd_value_t);
1218 		if(rra_pos_tmp != rra_current) {
1219 #ifndef HAVE_MMAP
1220 		   if(fseek(rrd_file, rra_pos_tmp, SEEK_SET) != 0){
1221 		      rrd_set_error("seek error in rrd");
1222 		      break;
1223 		   }
1224 #endif
1225 		   rra_current = rra_pos_tmp;
1226 		}
1227 
1228 #ifdef DEBUG
1229 	    fprintf(stderr,"  -- RRA Postseek %ld\n",ftell(rrd_file));
1230 #endif
1231 		scratch_idx = CDP_primary_val;
1232 		if (pcdp_summary != NULL)
1233 		{
1234 		   rra_time = (current_time - current_time
1235 		   % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1236 		   - ((rra_step_cnt[i]-1)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1237 		}
1238 #ifdef HAVE_MMAP
1239 		pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1240 		   pcdp_summary, &rra_time, rrd_mmaped_file);
1241 #else
1242 		pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1243 		   pcdp_summary, &rra_time);
1244 #endif
1245 		if (rrd_test_error()) break;
1246 
1247 		/* write other rows of the bulk update, if any */
1248 		scratch_idx = CDP_secondary_val;
1249                for ( ; rra_step_cnt[i] > 1; rra_step_cnt[i]--)
1250 		{
1251                   if (++rrd.rra_ptr[i].cur_row == rrd.rra_def[i].row_cnt)
1252 		   {
1253 #ifdef DEBUG
1254               fprintf(stderr,"Wraparound for RRA %s, %lu updates left\n",
1255 			  rrd.rra_def[i].cf_nam, rra_step_cnt[i] - 1);
1256 #endif
1257 			  /* wrap */
1258 			  rrd.rra_ptr[i].cur_row = 0;
1259 			  /* seek back to beginning of current rra */
1260 		      if (fseek(rrd_file, rra_start, SEEK_SET) != 0)
1261 			  {
1262 		         rrd_set_error("seek error in rrd");
1263 		         break;
1264 			  }
1265 #ifdef DEBUG
1266 	          fprintf(stderr,"  -- Wraparound Postseek %ld\n",ftell(rrd_file));
1267 #endif
1268 			  rra_current = rra_start;
1269 		   }
1270 		   if (pcdp_summary != NULL)
1271 		   {
1272 		      rra_time = (current_time - current_time
1273 		      % (rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step))
1274 		      - ((rra_step_cnt[i]-2)*rrd.rra_def[i].pdp_cnt*rrd.stat_head->pdp_step);
1275 		   }
1276 #ifdef HAVE_MMAP
1277 		   pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1278 		      pcdp_summary, &rra_time, rrd_mmaped_file);
1279 #else
1280 		   pcdp_summary = write_RRA_row(&rrd, i, &rra_current, scratch_idx, rrd_file,
1281 		      pcdp_summary, &rra_time);
1282 #endif
1283 		}
1284 
1285 		if (rrd_test_error())
1286 		  break;
1287 		} /* RRA LOOP */
1288 
1289 	    /* break out of the argument parsing loop if error_string is set */
1290 	    if (rrd_test_error()){
1291 		   free(step_start);
1292 		   break;
1293 	    }
1294 
1295 	} /* endif a pdp_st has occurred */
1296 	rrd.live_head->last_up = current_time;
1297 	rrd.live_head->last_up_usec = current_time_usec;
1298 	free(step_start);
1299     } /* function argument loop */
1300 
1301     if (seasonal_coef != NULL) free(seasonal_coef);
1302     if (last_seasonal_coef != NULL) free(last_seasonal_coef);
1303 	if (rra_step_cnt != NULL) free(rra_step_cnt);
1304     rpnstack_free(&rpnstack);
1305 
1306 #ifdef HAVE_MMAP
1307     if (munmap(rrd_mmaped_file, rrd_filesize) == -1) {
1308             rrd_set_error("error writing(unmapping) file: %s", filename);
1309     }
1310 #endif
1311     /* if we got here and if there is an error and if the file has not been
1312      * written to, then close things up and return. */
1313     if (rrd_test_error()) {
1314 	free(updvals);
1315 	free(tmpl_idx);
1316 	rrd_free(&rrd);
1317 	free(pdp_temp);
1318 	free(pdp_new);
1319 	fclose(rrd_file);
1320 	return(-1);
1321     }
1322 
1323     /* aargh ... that was tough ... so many loops ... anyway, its done.
1324      * we just need to write back the live header portion now*/
1325 
1326     if (fseek(rrd_file, (sizeof(stat_head_t)
1327 			 + sizeof(ds_def_t)*rrd.stat_head->ds_cnt
1328 			 + sizeof(rra_def_t)*rrd.stat_head->rra_cnt),
1329 	      SEEK_SET) != 0) {
1330 	rrd_set_error("seek rrd for live header writeback");
1331 	free(updvals);
1332 	free(tmpl_idx);
1333 	rrd_free(&rrd);
1334 	free(pdp_temp);
1335 	free(pdp_new);
1336 	fclose(rrd_file);
1337 	return(-1);
1338     }
1339 
1340     if(version >= 3) {
1341 	    if(fwrite( rrd.live_head,
1342 		       sizeof(live_head_t), 1, rrd_file) != 1){
1343 		rrd_set_error("fwrite live_head to rrd");
1344 		free(updvals);
1345 		rrd_free(&rrd);
1346 		free(tmpl_idx);
1347 		free(pdp_temp);
1348 		free(pdp_new);
1349 		fclose(rrd_file);
1350 		return(-1);
1351 	    }
1352     }
1353     else {
1354 	    if(fwrite( &rrd.live_head->last_up,
1355 		       sizeof(time_t), 1, rrd_file) != 1){
1356 		rrd_set_error("fwrite live_head to rrd");
1357 		free(updvals);
1358 		rrd_free(&rrd);
1359 		free(tmpl_idx);
1360 		free(pdp_temp);
1361 		free(pdp_new);
1362 		fclose(rrd_file);
1363 		return(-1);
1364 	    }
1365     }
1366 
1367 
1368     if(fwrite( rrd.pdp_prep,
1369 	       sizeof(pdp_prep_t),
1370 	       rrd.stat_head->ds_cnt, rrd_file) != rrd.stat_head->ds_cnt){
1371 	rrd_set_error("ftwrite pdp_prep to rrd");
1372 	free(updvals);
1373 	rrd_free(&rrd);
1374 	free(tmpl_idx);
1375 	free(pdp_temp);
1376 	free(pdp_new);
1377 	fclose(rrd_file);
1378 	return(-1);
1379     }
1380 
1381     if(fwrite( rrd.cdp_prep,
1382 	       sizeof(cdp_prep_t),
1383 	       rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt, rrd_file)
1384        != rrd.stat_head->rra_cnt *rrd.stat_head->ds_cnt){
1385 
1386 	rrd_set_error("ftwrite cdp_prep to rrd");
1387 	free(updvals);
1388 	free(tmpl_idx);
1389 	rrd_free(&rrd);
1390 	free(pdp_temp);
1391 	free(pdp_new);
1392 	fclose(rrd_file);
1393 	return(-1);
1394     }
1395 
1396     if(fwrite( rrd.rra_ptr,
1397 	       sizeof(rra_ptr_t),
1398 	       rrd.stat_head->rra_cnt,rrd_file) != rrd.stat_head->rra_cnt){
1399 	rrd_set_error("fwrite rra_ptr to rrd");
1400 	free(updvals);
1401 	free(tmpl_idx);
1402 	rrd_free(&rrd);
1403 	free(pdp_temp);
1404 	free(pdp_new);
1405 	fclose(rrd_file);
1406 	return(-1);
1407     }
1408 
1409     /* OK now close the files and free the memory */
1410     if(fclose(rrd_file) != 0){
1411 	rrd_set_error("closing rrd");
1412 	free(updvals);
1413 	free(tmpl_idx);
1414 	rrd_free(&rrd);
1415 	free(pdp_temp);
1416 	free(pdp_new);
1417 	return(-1);
1418     }
1419 
1420     /* calling the smoothing code here guarantees at most
1421  	 * one smoothing operation per rrd_update call. Unfortunately,
1422 	 * it is possible with bulk updates, or a long-delayed update
1423 	 * for smoothing to occur off-schedule. This really isn't
1424 	 * critical except during the burning cycles. */
1425 	if (schedule_smooth)
1426 	{
1427 	  rrd_file = fopen(filename,"rb+");
1428 
1429 
1430 	  rra_start = rra_begin;
1431 	  for (i = 0; i < rrd.stat_head -> rra_cnt; ++i)
1432 	  {
1433 	    if (cf_conv(rrd.rra_def[i].cf_nam) == CF_DEVSEASONAL ||
1434 		cf_conv(rrd.rra_def[i].cf_nam) == CF_SEASONAL)
1435 	    {
1436 #ifdef DEBUG
1437 	      fprintf(stderr,"Running smoother for rra %ld\n",i);
1438 #endif
1439 	      apply_smoother(&rrd,i,rra_start,rrd_file);
1440 	      if (rrd_test_error())
1441 		break;
1442 	    }
1443 	    rra_start += rrd.rra_def[i].row_cnt
1444 	      *rrd.stat_head->ds_cnt*sizeof(rrd_value_t);
1445 	  }
1446 	  fclose(rrd_file);
1447 	}
1448     rrd_free(&rrd);
1449     free(updvals);
1450     free(tmpl_idx);
1451     free(pdp_new);
1452     free(pdp_temp);
1453     return(0);
1454 }
1455 
1456 /*
1457  * get exclusive lock to whole file.
1458  * lock gets removed when we close the file
1459  *
1460  * returns 0 on success
1461  */
1462 int
LockRRD(FILE * rrdfile)1463 LockRRD(FILE *rrdfile)
1464 {
1465     int	rrd_fd;		/* File descriptor for RRD */
1466     int	rcstat;
1467 
1468     rrd_fd = fileno(rrdfile);
1469 
1470 	{
1471 #if defined(_WIN32) && !defined(__CYGWIN__) && !defined(__CYGWIN32__)
1472     struct _stat st;
1473 
1474     if ( _fstat( rrd_fd, &st ) == 0 ) {
1475 	    rcstat = _locking ( rrd_fd, _LK_NBLCK, st.st_size );
1476     } else {
1477 	    rcstat = -1;
1478     }
1479 #else
1480     struct flock	lock;
1481     lock.l_type = F_WRLCK;    /* exclusive write lock */
1482     lock.l_len = 0;	      /* whole file */
1483     lock.l_start = 0;	      /* start of file */
1484     lock.l_whence = SEEK_SET;   /* end of file */
1485 
1486     rcstat = fcntl(rrd_fd, F_SETLK, &lock);
1487 #endif
1488 	}
1489 
1490     return(rcstat);
1491 }
1492 
1493 
1494 #ifdef HAVE_MMAP
1495 info_t
write_RRA_row(rrd_t * rrd,unsigned long rra_idx,unsigned long * rra_current,unsigned short CDP_scratch_idx,FILE UNUSED (* rrd_file),info_t * pcdp_summary,time_t * rra_time,void * rrd_mmaped_file)1496 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1497 	       unsigned short CDP_scratch_idx,
1498 #ifndef DEBUG
1499 FILE UNUSED(*rrd_file),
1500 #else
1501 FILE *rrd_file,
1502 #endif
1503 		   info_t *pcdp_summary, time_t *rra_time, void *rrd_mmaped_file)
1504 #else
1505 info_t
1506 *write_RRA_row (rrd_t *rrd, unsigned long rra_idx, unsigned long *rra_current,
1507 	       unsigned short CDP_scratch_idx, FILE *rrd_file,
1508 		   info_t *pcdp_summary, time_t *rra_time)
1509 #endif
1510 {
1511    unsigned long ds_idx, cdp_idx;
1512    infoval iv;
1513 
1514    for (ds_idx = 0; ds_idx < rrd -> stat_head -> ds_cnt; ds_idx++)
1515    {
1516       /* compute the cdp index */
1517       cdp_idx =rra_idx * (rrd -> stat_head->ds_cnt) + ds_idx;
1518 #ifdef DEBUG
1519 	  fprintf(stderr,"  -- RRA WRITE VALUE %e, at %ld CF:%s\n",
1520 	     rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val,ftell(rrd_file),
1521 	     rrd -> rra_def[rra_idx].cf_nam);
1522 #endif
1523       if (pcdp_summary != NULL)
1524 	  {
1525 	     iv.u_val = rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val;
1526 	     /* append info to the return hash */
1527 	  	 pcdp_summary = info_push(pcdp_summary,
1528 		 sprintf_alloc("[%d]RRA[%s][%lu]DS[%s]",
1529 		 *rra_time, rrd->rra_def[rra_idx].cf_nam,
1530 		 rrd->rra_def[rra_idx].pdp_cnt, rrd->ds_def[ds_idx].ds_nam),
1531          RD_I_VAL, iv);
1532 	  }
1533 #ifdef HAVE_MMAP
1534 	  memcpy((char *)rrd_mmaped_file + *rra_current,
1535 			  &(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1536 			  sizeof(rrd_value_t));
1537 #else
1538 	  if(fwrite(&(rrd -> cdp_prep[cdp_idx].scratch[CDP_scratch_idx].u_val),
1539 		 sizeof(rrd_value_t),1,rrd_file) != 1)
1540 	  {
1541 	     rrd_set_error("writing rrd");
1542 	     return 0;
1543 	  }
1544 #endif
1545 	  *rra_current += sizeof(rrd_value_t);
1546 	}
1547 	return (pcdp_summary);
1548 }
1549