1 /*-
2  * Copyright 2016 Vsevolod Stakhov
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #include "config.h"
17 #include "rrd.h"
18 #include "util.h"
19 #include "cfg_file.h"
20 #include "logger.h"
21 #include "unix-std.h"
22 #include "cryptobox.h"
23 #include <math.h>
24 
25 #define RSPAMD_RRD_DS_COUNT METRIC_ACTION_MAX
26 #define RSPAMD_RRD_OLD_DS_COUNT 4
27 #define RSPAMD_RRD_RRA_COUNT 4
28 
29 #define msg_err_rrd(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
30         "rrd", file->id, \
31         G_STRFUNC, \
32         __VA_ARGS__)
33 #define msg_warn_rrd(...)   rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
34         "rrd", file->id, \
35         G_STRFUNC, \
36         __VA_ARGS__)
37 #define msg_info_rrd(...)   rspamd_default_log_function (G_LOG_LEVEL_INFO, \
38         "rrd", file->id, \
39         G_STRFUNC, \
40         __VA_ARGS__)
41 #define msg_debug_rrd(...)  rspamd_conditional_debug_fast (NULL, NULL, \
42         rspamd_rrd_log_id, "rrd", file->id, \
43         G_STRFUNC, \
44         __VA_ARGS__)
45 
INIT_LOG_MODULE(rrd)46 INIT_LOG_MODULE(rrd)
47 
48 static GQuark
49 rrd_error_quark (void)
50 {
51 	return g_quark_from_static_string ("rrd-error");
52 }
53 
54 /**
55  * Convert rrd dst type from string to numeric value
56  */
57 enum rrd_dst_type
rrd_dst_from_string(const gchar * str)58 rrd_dst_from_string (const gchar *str)
59 {
60 	if (g_ascii_strcasecmp (str, "counter") == 0) {
61 		return RRD_DST_COUNTER;
62 	}
63 	else if (g_ascii_strcasecmp (str, "absolute") == 0) {
64 		return RRD_DST_ABSOLUTE;
65 	}
66 	else if (g_ascii_strcasecmp (str, "gauge") == 0) {
67 		return RRD_DST_GAUGE;
68 	}
69 	else if (g_ascii_strcasecmp (str, "cdef") == 0) {
70 		return RRD_DST_CDEF;
71 	}
72 	else if (g_ascii_strcasecmp (str, "derive") == 0) {
73 		return RRD_DST_DERIVE;
74 	}
75 
76 	return RRD_DST_INVALID;
77 }
78 
79 /**
80  * Convert numeric presentation of dst to string
81  */
82 const gchar *
rrd_dst_to_string(enum rrd_dst_type type)83 rrd_dst_to_string (enum rrd_dst_type type)
84 {
85 	switch (type) {
86 	case RRD_DST_COUNTER:
87 		return "COUNTER";
88 	case RRD_DST_ABSOLUTE:
89 		return "ABSOLUTE";
90 	case RRD_DST_GAUGE:
91 		return "GAUGE";
92 	case RRD_DST_CDEF:
93 		return "CDEF";
94 	case RRD_DST_DERIVE:
95 		return "DERIVE";
96 	default:
97 		return "U";
98 	}
99 
100 	return "U";
101 }
102 
103 /**
104  * Convert rrd consolidation function type from string to numeric value
105  */
106 enum rrd_cf_type
rrd_cf_from_string(const gchar * str)107 rrd_cf_from_string (const gchar *str)
108 {
109 	if (g_ascii_strcasecmp (str, "average") == 0) {
110 		return RRD_CF_AVERAGE;
111 	}
112 	else if (g_ascii_strcasecmp (str, "minimum") == 0) {
113 		return RRD_CF_MINIMUM;
114 	}
115 	else if (g_ascii_strcasecmp (str, "maximum") == 0) {
116 		return RRD_CF_MAXIMUM;
117 	}
118 	else if (g_ascii_strcasecmp (str, "last") == 0) {
119 		return RRD_CF_LAST;
120 	}
121 	/* XXX: add other CF functions supported by rrd */
122 
123 	return RRD_CF_INVALID;
124 }
125 
126 /**
127  * Convert numeric presentation of cf to string
128  */
129 const gchar *
rrd_cf_to_string(enum rrd_cf_type type)130 rrd_cf_to_string (enum rrd_cf_type type)
131 {
132 	switch (type) {
133 	case RRD_CF_AVERAGE:
134 		return "AVERAGE";
135 	case RRD_CF_MINIMUM:
136 		return "MINIMUM";
137 	case RRD_CF_MAXIMUM:
138 		return "MAXIMUM";
139 	case RRD_CF_LAST:
140 		return "LAST";
141 	default:
142 		return "U";
143 	}
144 
145 	/* XXX: add other CF functions supported by rrd */
146 
147 	return "U";
148 }
149 
150 void
rrd_make_default_rra(const gchar * cf_name,gulong pdp_cnt,gulong rows,struct rrd_rra_def * rra)151 rrd_make_default_rra (const gchar *cf_name,
152 	gulong pdp_cnt,
153 	gulong rows,
154 	struct rrd_rra_def *rra)
155 {
156 	g_assert (cf_name != NULL);
157 	g_assert (rrd_cf_from_string (cf_name) != RRD_CF_INVALID);
158 
159 	rra->pdp_cnt = pdp_cnt;
160 	rra->row_cnt = rows;
161 	rspamd_strlcpy (rra->cf_nam, cf_name, sizeof (rra->cf_nam));
162 	memset (rra->par, 0, sizeof (rra->par));
163 	rra->par[RRA_cdp_xff_val].dv = 0.5;
164 }
165 
166 void
rrd_make_default_ds(const gchar * name,const gchar * type,gulong pdp_step,struct rrd_ds_def * ds)167 rrd_make_default_ds (const gchar *name,
168 		const gchar *type,
169 		gulong pdp_step,
170 		struct rrd_ds_def *ds)
171 {
172 	g_assert (name != NULL);
173 	g_assert (type != NULL);
174 	g_assert (rrd_dst_from_string (type) != RRD_DST_INVALID);
175 
176 	rspamd_strlcpy (ds->ds_nam, name,	   sizeof (ds->ds_nam));
177 	rspamd_strlcpy (ds->dst,	type, sizeof (ds->dst));
178 	memset (ds->par, 0, sizeof (ds->par));
179 	ds->par[RRD_DS_mrhb_cnt].lv = pdp_step * 2;
180 	ds->par[RRD_DS_min_val].dv = NAN;
181 	ds->par[RRD_DS_max_val].dv = NAN;
182 }
183 
184 /**
185  * Check rrd file for correctness (size, cookies, etc)
186  */
187 static gboolean
rspamd_rrd_check_file(const gchar * filename,gboolean need_data,GError ** err)188 rspamd_rrd_check_file (const gchar *filename, gboolean need_data, GError **err)
189 {
190 	gint fd, i;
191 	struct stat st;
192 	struct rrd_file_head head;
193 	struct rrd_rra_def rra;
194 	gint head_size;
195 
196 	fd = open (filename, O_RDWR);
197 	if (fd == -1) {
198 		g_set_error (err,
199 			rrd_error_quark (), errno, "rrd open error: %s", strerror (errno));
200 		return FALSE;
201 	}
202 
203 	if (fstat (fd, &st) == -1) {
204 		g_set_error (err,
205 			rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno));
206 		close (fd);
207 		return FALSE;
208 	}
209 	if (st.st_size < (goffset)sizeof (struct rrd_file_head)) {
210 		/* We have trimmed file */
211 		g_set_error (err, rrd_error_quark (), EINVAL, "rrd size is bad: %ud",
212 			(guint)st.st_size);
213 		close (fd);
214 		return FALSE;
215 	}
216 
217 	/* Try to read header */
218 	if (read (fd, &head, sizeof (head)) != sizeof (head)) {
219 		g_set_error (err,
220 			rrd_error_quark (), errno, "rrd read head error: %s",
221 			strerror (errno));
222 		close (fd);
223 		return FALSE;
224 	}
225 	/* Check magic */
226 	if (memcmp (head.version, RRD_VERSION, sizeof (head.version)) != 0) {
227 		g_set_error (err,
228 				rrd_error_quark (), EINVAL, "rrd head error: bad cookie");
229 		close (fd);
230 		return FALSE;
231 	}
232 	if (head.float_cookie != RRD_FLOAT_COOKIE) {
233 		g_set_error (err,
234 				rrd_error_quark (), EINVAL, "rrd head error: another architecture "
235 						"(file cookie %g != our cookie %g)",
236 				head.float_cookie, RRD_FLOAT_COOKIE);
237 		close (fd);
238 		return FALSE;
239 	}
240 	/* Check for other params */
241 	if (head.ds_cnt <= 0 || head.rra_cnt <= 0) {
242 		g_set_error (err,
243 			rrd_error_quark (), EINVAL, "rrd head cookies error: bad rra or ds count");
244 		close (fd);
245 		return FALSE;
246 	}
247 	/* Now we can calculate the overall size of rrd */
248 	head_size = sizeof (struct rrd_file_head) +
249 		sizeof (struct rrd_ds_def) * head.ds_cnt +
250 		sizeof (struct rrd_rra_def) * head.rra_cnt +
251 		sizeof (struct rrd_live_head) +
252 		sizeof (struct rrd_pdp_prep) * head.ds_cnt +
253 		sizeof (struct rrd_cdp_prep) * head.ds_cnt * head.rra_cnt +
254 		sizeof (struct rrd_rra_ptr) * head.rra_cnt;
255 	if (st.st_size < (goffset)head_size) {
256 		g_set_error (err,
257 			rrd_error_quark (), errno, "rrd file seems to have stripped header: %d",
258 			head_size);
259 		close (fd);
260 		return FALSE;
261 	}
262 
263 	if (need_data) {
264 		/* Now check rra */
265 		if (lseek (fd, sizeof (struct rrd_ds_def) * head.ds_cnt,
266 			SEEK_CUR) == -1) {
267 			g_set_error (err,
268 				rrd_error_quark (), errno, "rrd head lseek error: %s",
269 				strerror (errno));
270 			close (fd);
271 			return FALSE;
272 		}
273 		for (i = 0; i < (gint)head.rra_cnt; i++) {
274 			if (read (fd, &rra, sizeof (rra)) != sizeof (rra)) {
275 				g_set_error (err,
276 					rrd_error_quark (), errno, "rrd read rra error: %s",
277 					strerror (errno));
278 				close (fd);
279 				return FALSE;
280 			}
281 			head_size += rra.row_cnt * head.ds_cnt * sizeof (gdouble);
282 		}
283 
284 		if (st.st_size != head_size) {
285 			g_set_error (err,
286 				rrd_error_quark (), EINVAL, "rrd file seems to have incorrect size: %d, must be %d",
287 				(gint)st.st_size, head_size);
288 			close (fd);
289 			return FALSE;
290 		}
291 	}
292 
293 	close (fd);
294 	return TRUE;
295 }
296 
297 /**
298  * Adjust pointers in mmapped rrd file
299  * @param file
300  */
301 static void
rspamd_rrd_adjust_pointers(struct rspamd_rrd_file * file,gboolean completed)302 rspamd_rrd_adjust_pointers (struct rspamd_rrd_file *file, gboolean completed)
303 {
304 	guint8 *ptr;
305 
306 	ptr = file->map;
307 	file->stat_head = (struct rrd_file_head *)ptr;
308 	ptr += sizeof (struct rrd_file_head);
309 	file->ds_def = (struct rrd_ds_def *)ptr;
310 	ptr += sizeof (struct rrd_ds_def) * file->stat_head->ds_cnt;
311 	file->rra_def = (struct rrd_rra_def *)ptr;
312 	ptr += sizeof (struct rrd_rra_def) * file->stat_head->rra_cnt;
313 	file->live_head = (struct rrd_live_head *)ptr;
314 	ptr += sizeof (struct rrd_live_head);
315 	file->pdp_prep = (struct rrd_pdp_prep *)ptr;
316 	ptr += sizeof (struct rrd_pdp_prep) * file->stat_head->ds_cnt;
317 	file->cdp_prep = (struct rrd_cdp_prep *)ptr;
318 	ptr += sizeof (struct rrd_cdp_prep) * file->stat_head->rra_cnt *
319 		file->stat_head->ds_cnt;
320 	file->rra_ptr = (struct rrd_rra_ptr *)ptr;
321 	if (completed) {
322 		ptr += sizeof (struct rrd_rra_ptr) * file->stat_head->rra_cnt;
323 		file->rrd_value = (gdouble *)ptr;
324 	}
325 	else {
326 		file->rrd_value = NULL;
327 	}
328 }
329 
330 static void
rspamd_rrd_calculate_checksum(struct rspamd_rrd_file * file)331 rspamd_rrd_calculate_checksum (struct rspamd_rrd_file *file)
332 {
333 	guchar sigbuf[rspamd_cryptobox_HASHBYTES];
334 	struct rrd_ds_def *ds;
335 	guint i;
336 	rspamd_cryptobox_hash_state_t st;
337 
338 	if (file->finalized) {
339 		rspamd_cryptobox_hash_init (&st, NULL, 0);
340 		rspamd_cryptobox_hash_update (&st, file->filename, strlen (file->filename));
341 
342 		for (i = 0; i < file->stat_head->ds_cnt; i ++) {
343 			ds = &file->ds_def[i];
344 			rspamd_cryptobox_hash_update (&st, ds->ds_nam, sizeof (ds->ds_nam));
345 		}
346 
347 		rspamd_cryptobox_hash_final (&st, sigbuf);
348 
349 		file->id = rspamd_encode_base32 (sigbuf, sizeof (sigbuf), RSPAMD_BASE32_DEFAULT);
350 	}
351 }
352 
353 static int
rspamd_rrd_open_exclusive(const gchar * filename)354 rspamd_rrd_open_exclusive (const gchar *filename)
355 {
356 	struct timespec sleep_ts = {
357 			.tv_sec = 0,
358 			.tv_nsec = 1000000
359 	};
360 	gint fd;
361 
362 	fd = open (filename, O_RDWR);
363 
364 	if (fd == -1) {
365 		return -1;
366 	}
367 
368 	for (;;) {
369 		if (rspamd_file_lock (fd, TRUE) == -1) {
370 			if (errno == EAGAIN || errno == EWOULDBLOCK) {
371 				nanosleep (&sleep_ts, NULL);
372 				continue;
373 			}
374 			else {
375 				close (fd);
376 				return -1;
377 			}
378 		}
379 		else {
380 			break;
381 		}
382 	}
383 
384 	return fd;
385 };
386 
387 /**
388  * Open completed or incompleted rrd file
389  * @param filename
390  * @param completed
391  * @param err
392  * @return
393  */
394 static struct rspamd_rrd_file *
rspamd_rrd_open_common(const gchar * filename,gboolean completed,GError ** err)395 rspamd_rrd_open_common (const gchar *filename, gboolean completed, GError **err)
396 {
397 	struct rspamd_rrd_file *file;
398 	gint fd;
399 	struct stat st;
400 
401 	if (!rspamd_rrd_check_file (filename, completed, err)) {
402 		return NULL;
403 	}
404 
405 	file = g_malloc0 (sizeof (struct rspamd_rrd_file));
406 
407 	/* Open file */
408 	fd = rspamd_rrd_open_exclusive (filename);
409 	if (fd == -1) {
410 		g_set_error (err,
411 			rrd_error_quark (), errno, "rrd open error: %s", strerror (errno));
412 		g_free (file);
413 		return FALSE;
414 	}
415 
416 	if (fstat (fd, &st) == -1) {
417 		g_set_error (err,
418 			rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno));
419 		rspamd_file_unlock (fd, FALSE);
420 		g_free (file);
421 		close (fd);
422 		return FALSE;
423 	}
424 	/* Mmap file */
425 	file->size = st.st_size;
426 	if ((file->map =
427 			mmap (NULL, st.st_size, PROT_READ | PROT_WRITE,
428 					MAP_SHARED, fd, 0)) == MAP_FAILED) {
429 
430 		rspamd_file_unlock (fd, FALSE);
431 		close (fd);
432 		g_set_error (err,
433 			rrd_error_quark (), ENOMEM, "mmap failed: %s", strerror (errno));
434 		g_free (file);
435 		return NULL;
436 	}
437 
438 	file->fd = fd;
439 
440 	/* Adjust pointers */
441 	rspamd_rrd_adjust_pointers (file, completed);
442 
443 	/* Mark it as finalized */
444 	file->finalized = completed;
445 
446 	file->filename = g_strdup (filename);
447 	rspamd_rrd_calculate_checksum (file);
448 
449 	return file;
450 }
451 
452 /**
453  * Open (and mmap) existing RRD file
454  * @param filename path
455  * @param err error pointer
456  * @return rrd file structure
457  */
458 struct rspamd_rrd_file *
rspamd_rrd_open(const gchar * filename,GError ** err)459 rspamd_rrd_open (const gchar *filename, GError **err)
460 {
461 	struct rspamd_rrd_file *file;
462 
463 	if ((file = rspamd_rrd_open_common (filename, TRUE, err))) {
464 		msg_info_rrd ("rrd file opened: %s", filename);
465 	}
466 
467 	return file;
468 }
469 
470 /**
471  * Create basic header for rrd file
472  * @param filename file path
473  * @param ds_count number of data sources
474  * @param rra_count number of round robin archives
475  * @param pdp_step step of primary data points
476  * @param err error pointer
477  * @return TRUE if file has been created
478  */
479 struct rspamd_rrd_file *
rspamd_rrd_create(const gchar * filename,gulong ds_count,gulong rra_count,gulong pdp_step,gdouble initial_ticks,GError ** err)480 rspamd_rrd_create (const gchar *filename,
481 		gulong ds_count,
482 		gulong rra_count,
483 		gulong pdp_step,
484 		gdouble initial_ticks,
485 		GError **err)
486 {
487 	struct rspamd_rrd_file *new;
488 	struct rrd_file_head head;
489 	struct rrd_ds_def ds;
490 	struct rrd_rra_def rra;
491 	struct rrd_live_head lh;
492 	struct rrd_pdp_prep pdp;
493 	struct rrd_cdp_prep cdp;
494 	struct rrd_rra_ptr rra_ptr;
495 	gint fd;
496 	guint i, j;
497 
498 	/* Open file */
499 	fd = open (filename, O_RDWR | O_CREAT | O_EXCL, 0644);
500 	if (fd == -1) {
501 		g_set_error (err,
502 			rrd_error_quark (), errno, "rrd create error: %s",
503 			strerror (errno));
504 		return NULL;
505 	}
506 
507 	rspamd_file_lock (fd, FALSE);
508 
509 	/* Fill header */
510 	memset (&head, 0, sizeof (head));
511 	head.rra_cnt = rra_count;
512 	head.ds_cnt = ds_count;
513 	head.pdp_step = pdp_step;
514 	memcpy (head.cookie,  RRD_COOKIE,  sizeof (head.cookie));
515 	memcpy (head.version, RRD_VERSION, sizeof (head.version));
516 	head.float_cookie = RRD_FLOAT_COOKIE;
517 
518 	if (write (fd, &head, sizeof (head)) != sizeof (head)) {
519 		rspamd_file_unlock (fd, FALSE);
520 		close (fd);
521 		g_set_error (err,
522 			rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
523 		return NULL;
524 	}
525 
526 	/* Fill DS section */
527 	memset (&ds, 0, sizeof (ds));
528 	memset (&ds.ds_nam, 0, sizeof (ds.ds_nam));
529 	memcpy (&ds.dst, "COUNTER", sizeof ("COUNTER"));
530 	memset (&ds.par, 0, sizeof (ds.par));
531 	for (i = 0; i < ds_count; i++) {
532 		if (write (fd, &ds, sizeof (ds)) != sizeof (ds)) {
533 			rspamd_file_unlock (fd, FALSE);
534 			close (fd);
535 			g_set_error (err,
536 				rrd_error_quark (), errno, "rrd write error: %s",
537 				strerror (errno));
538 			return NULL;
539 		}
540 	}
541 
542 	/* Fill RRA section */
543 	memset (&rra, 0, sizeof (rra));
544 	memcpy (&rra.cf_nam, "AVERAGE", sizeof ("AVERAGE"));
545 	rra.pdp_cnt = 1;
546 	memset (&rra.par, 0, sizeof (rra.par));
547 	for (i = 0; i < rra_count; i++) {
548 		if (write (fd, &rra, sizeof (rra)) != sizeof (rra)) {
549 			rspamd_file_unlock (fd, FALSE);
550 			close (fd);
551 			g_set_error (err,
552 				rrd_error_quark (), errno, "rrd write error: %s",
553 				strerror (errno));
554 			return NULL;
555 		}
556 	}
557 
558 	/* Fill live header */
559 	memset (&lh, 0, sizeof (lh));
560 	lh.last_up = (glong)initial_ticks;
561 	lh.last_up_usec = (glong)((initial_ticks - lh.last_up) * 1e6f);
562 
563 	if (write (fd, &lh, sizeof (lh)) != sizeof (lh)) {
564 		rspamd_file_unlock (fd, FALSE);
565 		close (fd);
566 		g_set_error (err,
567 			rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
568 		return NULL;
569 	}
570 
571 	/* Fill pdp prep */
572 	memset (&pdp, 0, sizeof (pdp));
573 	memcpy (&pdp.last_ds, "U", sizeof ("U"));
574 	memset (&pdp.scratch, 0, sizeof (pdp.scratch));
575 	pdp.scratch[PDP_val].dv = NAN;
576 	pdp.scratch[PDP_unkn_sec_cnt].lv = 0;
577 
578 	for (i = 0; i < ds_count; i++) {
579 		if (write (fd, &pdp, sizeof (pdp)) != sizeof (pdp)) {
580 			rspamd_file_unlock (fd, FALSE);
581 			close (fd);
582 			g_set_error (err,
583 				rrd_error_quark (), errno, "rrd write error: %s",
584 				strerror (errno));
585 			return NULL;
586 		}
587 	}
588 
589 	/* Fill cdp prep */
590 	memset (&cdp, 0, sizeof (cdp));
591 	memset (&cdp.scratch, 0, sizeof (cdp.scratch));
592 	cdp.scratch[CDP_val].dv = NAN;
593 	cdp.scratch[CDP_unkn_pdp_cnt].lv = 0;
594 
595 	for (i = 0; i < rra_count; i++) {
596 		for (j = 0; j < ds_count; j++) {
597 			if (write (fd, &cdp, sizeof (cdp)) != sizeof (cdp)) {
598 				rspamd_file_unlock (fd, FALSE);
599 				close (fd);
600 				g_set_error (err,
601 					rrd_error_quark (), errno, "rrd write error: %s",
602 					strerror (errno));
603 				return NULL;
604 			}
605 		}
606 	}
607 
608 	/* Set row pointers */
609 	memset (&rra_ptr, 0, sizeof (rra_ptr));
610 	for (i = 0; i < rra_count; i++) {
611 		if (write (fd, &rra_ptr, sizeof (rra_ptr)) != sizeof (rra_ptr)) {
612 			rspamd_file_unlock (fd, FALSE);
613 			close (fd);
614 			g_set_error (err,
615 				rrd_error_quark (), errno, "rrd write error: %s",
616 				strerror (errno));
617 			return NULL;
618 		}
619 	}
620 
621 	rspamd_file_unlock (fd, FALSE);
622 	close (fd);
623 
624 	new = rspamd_rrd_open_common (filename, FALSE, err);
625 
626 	return new;
627 }
628 
629 /**
630  * Add data sources to rrd file
631  * @param filename path to file
632  * @param ds array of struct rrd_ds_def
633  * @param err error pointer
634  * @return TRUE if data sources were added
635  */
636 gboolean
rspamd_rrd_add_ds(struct rspamd_rrd_file * file,GArray * ds,GError ** err)637 rspamd_rrd_add_ds (struct rspamd_rrd_file *file, GArray *ds, GError **err)
638 {
639 
640 	if (file == NULL || file->stat_head->ds_cnt * sizeof (struct rrd_ds_def) !=
641 		ds->len) {
642 		g_set_error (err,
643 			rrd_error_quark (), EINVAL, "rrd add ds failed: wrong arguments");
644 		return FALSE;
645 	}
646 
647 	/* Straightforward memcpy */
648 	memcpy (file->ds_def, ds->data, ds->len);
649 
650 	return TRUE;
651 }
652 
653 /**
654  * Add round robin archives to rrd file
655  * @param filename path to file
656  * @param ds array of struct rrd_rra_def
657  * @param err error pointer
658  * @return TRUE if archives were added
659  */
660 gboolean
rspamd_rrd_add_rra(struct rspamd_rrd_file * file,GArray * rra,GError ** err)661 rspamd_rrd_add_rra (struct rspamd_rrd_file *file, GArray *rra, GError **err)
662 {
663 	if (file == NULL || file->stat_head->rra_cnt *
664 		sizeof (struct rrd_rra_def) != rra->len) {
665 		g_set_error (err,
666 			rrd_error_quark (), EINVAL, "rrd add rra failed: wrong arguments");
667 		return FALSE;
668 	}
669 
670 	/* Straightforward memcpy */
671 	memcpy (file->rra_def, rra->data, rra->len);
672 
673 	return TRUE;
674 }
675 
676 /**
677  * Finalize rrd file header and initialize all RRA in the file
678  * @param filename file path
679  * @param err error pointer
680  * @return TRUE if rrd file is ready for use
681  */
682 gboolean
rspamd_rrd_finalize(struct rspamd_rrd_file * file,GError ** err)683 rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err)
684 {
685 	gint fd;
686 	guint i;
687 	gint count = 0;
688 	gdouble vbuf[1024];
689 	struct stat st;
690 
691 	if (file == NULL || file->filename == NULL || file->fd == -1) {
692 		g_set_error (err,
693 			rrd_error_quark (), EINVAL, "rrd add rra failed: wrong arguments");
694 		return FALSE;
695 	}
696 
697 	fd = file->fd;
698 
699 	if (lseek (fd, 0, SEEK_END) == -1) {
700 		g_set_error (err,
701 			rrd_error_quark (), errno, "rrd seek error: %s", strerror (errno));
702 		close (fd);
703 		return FALSE;
704 	}
705 
706 	/* Adjust CDP */
707 	for (i = 0; i < file->stat_head->rra_cnt; i++) {
708 		file->cdp_prep->scratch[CDP_unkn_pdp_cnt].lv = 0;
709 		/* Randomize row pointer (disabled) */
710 		/* file->rra_ptr->cur_row = g_random_int () % file->rra_def[i].row_cnt; */
711 		file->rra_ptr->cur_row = file->rra_def[i].row_cnt - 1;
712 		/* Calculate values count */
713 		count += file->rra_def[i].row_cnt * file->stat_head->ds_cnt;
714 	}
715 
716 	munmap (file->map, file->size);
717 	/* Write values */
718 	for (i = 0; i < G_N_ELEMENTS (vbuf); i++) {
719 		vbuf[i] = NAN;
720 	}
721 
722 	while (count > 0) {
723 		/* Write values in buffered matter */
724 		if (write (fd, vbuf,
725 			MIN ((gint)G_N_ELEMENTS (vbuf), count) * sizeof (gdouble)) == -1) {
726 			g_set_error (err,
727 				rrd_error_quark (), errno, "rrd write error: %s",
728 				strerror (errno));
729 			close (fd);
730 			return FALSE;
731 		}
732 		count -= G_N_ELEMENTS (vbuf);
733 	}
734 
735 	if (fstat (fd, &st) == -1) {
736 		g_set_error (err,
737 			rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno));
738 		close (fd);
739 		return FALSE;
740 	}
741 
742 	/* Mmap again */
743 	file->size = st.st_size;
744 	if ((file->map =
745 		mmap (NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd,
746 		0)) == MAP_FAILED) {
747 		close (fd);
748 		g_set_error (err,
749 			rrd_error_quark (), ENOMEM, "mmap failed: %s", strerror (errno));
750 
751 		return FALSE;
752 	}
753 
754 	/* Adjust pointers */
755 	rspamd_rrd_adjust_pointers (file, TRUE);
756 
757 	file->finalized = TRUE;
758 	rspamd_rrd_calculate_checksum (file);
759 	msg_info_rrd ("rrd file created: %s", file->filename);
760 
761 	return TRUE;
762 }
763 
764 /**
765  * Update pdp_prep data
766  * @param file rrd file
767  * @param vals new values
768  * @param pdp_new new pdp array
769  * @param interval time elapsed from the last update
770  * @return
771  */
772 static gboolean
rspamd_rrd_update_pdp_prep(struct rspamd_rrd_file * file,gdouble * vals,gdouble * pdp_new,gdouble interval)773 rspamd_rrd_update_pdp_prep (struct rspamd_rrd_file *file,
774 	gdouble *vals,
775 	gdouble *pdp_new,
776 	gdouble interval)
777 {
778 	guint i;
779 	enum rrd_dst_type type;
780 
781 	for (i = 0; i < file->stat_head->ds_cnt; i++) {
782 		type = rrd_dst_from_string (file->ds_def[i].dst);
783 
784 		if (file->ds_def[i].par[RRD_DS_mrhb_cnt].lv < interval) {
785 			rspamd_strlcpy (file->pdp_prep[i].last_ds, "U",
786 					sizeof (file->pdp_prep[i].last_ds));
787 			pdp_new[i] = NAN;
788 			msg_debug_rrd ("adding unknown point interval %.3f is less than heartbeat %l",
789 					interval, file->ds_def[i].par[RRD_DS_mrhb_cnt].lv);
790 		}
791 		else {
792 			switch (type) {
793 			case RRD_DST_COUNTER:
794 			case RRD_DST_DERIVE:
795 				if (file->pdp_prep[i].last_ds[0] == 'U') {
796 					pdp_new[i] = NAN;
797 					msg_debug_rrd ("last point is NaN for point %ud", i);
798 				}
799 				else {
800 					pdp_new[i] = vals[i] - strtod (file->pdp_prep[i].last_ds,
801 							NULL);
802 					msg_debug_rrd ("new PDP %ud, %.3f", i, pdp_new[i]);
803 				}
804 				break;
805 			case RRD_DST_GAUGE:
806 				pdp_new[i] = vals[i] * interval;
807 				msg_debug_rrd ("new PDP %ud, %.3f", i, pdp_new[i]);
808 				break;
809 			case RRD_DST_ABSOLUTE:
810 				pdp_new[i] = vals[i];
811 				msg_debug_rrd ("new PDP %ud, %.3f", i, pdp_new[i]);
812 				break;
813 			default:
814 				return FALSE;
815 			}
816 		}
817 
818 		/* Copy value to the last_ds */
819 		if (!isnan (vals[i])) {
820 			rspamd_snprintf (file->pdp_prep[i].last_ds,
821 				sizeof (file->pdp_prep[i].last_ds), "%.4f", vals[i]);
822 		}
823 		else {
824 			file->pdp_prep[i].last_ds[0] = 'U';
825 			file->pdp_prep[i].last_ds[1] = '\0';
826 		}
827 	}
828 
829 
830 	return TRUE;
831 }
832 
833 /**
834  * Update step for this pdp
835  * @param file
836  * @param pdp_new new pdp array
837  * @param pdp_temp temp pdp array
838  * @param interval time till last update
839  * @param pre_int pre interval
840  * @param post_int post intervall
841  * @param pdp_diff time till last pdp update
842  */
843 static void
rspamd_rrd_update_pdp_step(struct rspamd_rrd_file * file,gdouble * pdp_new,gdouble * pdp_temp,gdouble interval,gulong pdp_diff)844 rspamd_rrd_update_pdp_step (struct rspamd_rrd_file *file,
845 	gdouble *pdp_new,
846 	gdouble *pdp_temp,
847 	gdouble interval,
848 	gulong pdp_diff)
849 {
850 	guint i;
851 	rrd_value_t *scratch;
852 	gulong heartbeat;
853 
854 
855 	for (i = 0; i < file->stat_head->ds_cnt; i++) {
856 		scratch = file->pdp_prep[i].scratch;
857 		heartbeat = file->ds_def[i].par[RRD_DS_mrhb_cnt].lv;
858 
859 		if (!isnan (pdp_new[i])) {
860 			if (isnan (scratch[PDP_val].dv)) {
861 				scratch[PDP_val].dv = 0;
862 			}
863 		}
864 
865 		/* Check interval value for heartbeat for this DS */
866 		if ((interval > heartbeat) ||
867 			(file->stat_head->pdp_step / 2.0 < scratch[PDP_unkn_sec_cnt].lv)) {
868 			pdp_temp[i] = NAN;
869 		}
870 		else {
871 			pdp_temp[i] = scratch[PDP_val].dv /
872 				((double) (pdp_diff - scratch[PDP_unkn_sec_cnt].lv));
873 		}
874 
875 		if (isnan (pdp_new[i])) {
876 			scratch[PDP_unkn_sec_cnt].lv = interval;
877 			scratch[PDP_val].dv = NAN;
878 		} else {
879 			scratch[PDP_unkn_sec_cnt].lv = 0;
880 			scratch[PDP_val].dv = pdp_new[i] / interval;
881 		}
882 
883 		msg_debug_rrd ("new temp PDP %ud, %.3f -> %.3f, scratch: %3f",
884 				i, pdp_new[i], pdp_temp[i],
885 				scratch[PDP_val].dv);
886 	}
887 }
888 
889 /**
890  * Update CDP for this rra
891  * @param file rrd file
892  * @param pdp_steps how much pdp steps elapsed from the last update
893  * @param pdp_offset offset from pdp
894  * @param rra_steps how much steps must be updated for this rra
895  * @param rra_index index of desired rra
896  * @param pdp_temp temporary pdp points
897  */
898 static void
rspamd_rrd_update_cdp(struct rspamd_rrd_file * file,gdouble pdp_steps,gdouble pdp_offset,gulong * rra_steps,gulong rra_index,gdouble * pdp_temp)899 rspamd_rrd_update_cdp (struct rspamd_rrd_file *file,
900 	gdouble pdp_steps,
901 	gdouble pdp_offset,
902 	gulong *rra_steps,
903 	gulong rra_index,
904 	gdouble *pdp_temp)
905 {
906 	guint i;
907 	struct rrd_rra_def *rra;
908 	rrd_value_t *scratch;
909 	enum rrd_cf_type cf;
910 	gdouble last_cdp = INFINITY, cur_cdp = INFINITY;
911 	gulong pdp_in_cdp;
912 
913 	rra = &file->rra_def[rra_index];
914 	cf = rrd_cf_from_string (rra->cf_nam);
915 
916 	/* Iterate over all DS for this RRA */
917 	for (i = 0; i < file->stat_head->ds_cnt; i++) {
918 		/* Get CDP for this RRA and DS */
919 		scratch =
920 			file->cdp_prep[rra_index * file->stat_head->ds_cnt + i].scratch;
921 		if (rra->pdp_cnt > 1) {
922 			/* Do we have any CDP to update for this rra ? */
923 			if (rra_steps[rra_index] > 0) {
924 
925 				if (isnan (pdp_temp[i])) {
926 					/* New pdp is nan */
927 					/* Increment unknown points count */
928 					scratch[CDP_unkn_pdp_cnt].lv += pdp_offset;
929 					/* Reset secondary value */
930 					scratch[CDP_secondary_val].dv = NAN;
931 				}
932 				else {
933 					scratch[CDP_secondary_val].dv = pdp_temp[i];
934 				}
935 
936 				/* Check XFF for this rra */
937 				if (scratch[CDP_unkn_pdp_cnt].lv > rra->pdp_cnt *
938 					rra->par[RRA_cdp_xff_val].lv) {
939 					/* XFF is reached */
940 					scratch[CDP_primary_val].dv = NAN;
941 				}
942 				else {
943 					/* Need to initialize CDP using specified consolidation */
944 					switch (cf) {
945 					case RRD_CF_AVERAGE:
946 						last_cdp =
947 							isnan (scratch[CDP_val].dv) ? 0.0 : scratch[CDP_val]
948 							.dv;
949 						cur_cdp = isnan (pdp_temp[i]) ? 0.0 : pdp_temp[i];
950 						scratch[CDP_primary_val].dv =
951 							(last_cdp + cur_cdp *
952 							pdp_offset) /
953 							(rra->pdp_cnt - scratch[CDP_unkn_pdp_cnt].lv);
954 						break;
955 					case RRD_CF_MAXIMUM:
956 						last_cdp =
957 							isnan (scratch[CDP_val].dv) ? -INFINITY : scratch[
958 							CDP_val].dv;
959 						cur_cdp = isnan (pdp_temp[i]) ? -INFINITY : pdp_temp[i];
960 						scratch[CDP_primary_val].dv = MAX (last_cdp, cur_cdp);
961 						break;
962 					case RRD_CF_MINIMUM:
963 						last_cdp =
964 							isnan (scratch[CDP_val].dv) ? INFINITY : scratch[
965 							CDP_val].dv;
966 						cur_cdp = isnan (pdp_temp[i]) ? INFINITY : pdp_temp[i];
967 						scratch[CDP_primary_val].dv = MIN (last_cdp, cur_cdp);
968 						break;
969 					case RRD_CF_LAST:
970 					default:
971 						scratch[CDP_primary_val].dv = pdp_temp[i];
972 						last_cdp = INFINITY;
973 						break;
974 					}
975 				}
976 
977 				/* Init carry of this CDP */
978 				pdp_in_cdp = (pdp_steps - pdp_offset) / rra->pdp_cnt;
979 				if (pdp_in_cdp == 0 || isnan (pdp_temp[i])) {
980 					/* Set overflow */
981 					switch (cf) {
982 					case RRD_CF_AVERAGE:
983 						scratch[CDP_val].dv = 0;
984 						break;
985 					case RRD_CF_MAXIMUM:
986 						scratch[CDP_val].dv = -INFINITY;
987 						break;
988 					case RRD_CF_MINIMUM:
989 						scratch[CDP_val].dv = INFINITY;
990 						break;
991 					default:
992 						scratch[CDP_val].dv = NAN;
993 						break;
994 					}
995 				}
996 				else {
997 					/* Special carry for average */
998 					if (cf == RRD_CF_AVERAGE) {
999 						scratch[CDP_val].dv = pdp_temp[i] * pdp_in_cdp;
1000 					}
1001 					else {
1002 						scratch[CDP_val].dv = pdp_temp[i];
1003 					}
1004 				}
1005 
1006 				scratch[CDP_unkn_pdp_cnt].lv = 0;
1007 
1008 				msg_debug_rrd ("update cdp for DS %d with value %.3f, "
1009 						"stored value: %.3f, carry: %.3f",
1010 						i, last_cdp,
1011 						scratch[CDP_primary_val].dv, scratch[CDP_val].dv);
1012 			}
1013 			/* In this case we just need to update cdp_prep for this RRA */
1014 			else {
1015 				if (isnan (pdp_temp[i])) {
1016 					/* Just increase undefined zone */
1017 					scratch[CDP_unkn_pdp_cnt].lv += pdp_steps;
1018 				}
1019 				else {
1020 					/* Calculate cdp value */
1021 					last_cdp = scratch[CDP_val].dv;
1022 					switch (cf) {
1023 					case RRD_CF_AVERAGE:
1024 						if (isnan (last_cdp)) {
1025 							scratch[CDP_val].dv = pdp_temp[i] * pdp_steps;
1026 						}
1027 						else {
1028 							scratch[CDP_val].dv = last_cdp + pdp_temp[i] *
1029 								pdp_steps;
1030 						}
1031 						break;
1032 					case RRD_CF_MAXIMUM:
1033 						scratch[CDP_val].dv = MAX (last_cdp, pdp_temp[i]);
1034 						break;
1035 					case RRD_CF_MINIMUM:
1036 						scratch[CDP_val].dv = MIN (last_cdp, pdp_temp[i]);
1037 						break;
1038 					case RRD_CF_LAST:
1039 						scratch[CDP_val].dv = pdp_temp[i];
1040 						break;
1041 					default:
1042 						scratch[CDP_val].dv = NAN;
1043 						break;
1044 					}
1045 				}
1046 
1047 				msg_debug_rrd ("aggregate cdp %d with pdp %.3f, "
1048 						"stored value: %.3f",
1049 						i, pdp_temp[i], scratch[CDP_val].dv);
1050 			}
1051 		}
1052 		else {
1053 			/* We have nothing to consolidate, but we may miss some pdp */
1054 			if (pdp_steps > 2) {
1055 				/* Just write PDP value */
1056 				scratch[CDP_primary_val].dv = pdp_temp[i];
1057 				scratch[CDP_secondary_val].dv = pdp_temp[i];
1058 			}
1059 		}
1060 	}
1061 }
1062 
1063 /**
1064  * Update RRA in a file
1065  * @param file rrd file
1066  * @param rra_steps steps for each rra
1067  * @param now current time
1068  */
1069 void
rspamd_rrd_write_rra(struct rspamd_rrd_file * file,gulong * rra_steps)1070 rspamd_rrd_write_rra (struct rspamd_rrd_file *file, gulong *rra_steps)
1071 {
1072 	guint i, j, ds_cnt;
1073 	struct rrd_rra_def *rra;
1074 	struct rrd_cdp_prep *cdp;
1075 	gdouble *rra_row = file->rrd_value, *cur_row;
1076 
1077 
1078 	ds_cnt = file->stat_head->ds_cnt;
1079 	/* Iterate over all RRA */
1080 	for (i = 0; i < file->stat_head->rra_cnt; i++) {
1081 		rra = &file->rra_def[i];
1082 
1083 		if (rra_steps[i] > 0) {
1084 
1085 			/* Move row ptr */
1086 			if (++file->rra_ptr[i].cur_row >= rra->row_cnt) {
1087 				file->rra_ptr[i].cur_row = 0;
1088 			}
1089 			/* Calculate seek */
1090 			cdp = &file->cdp_prep[ds_cnt * i];
1091 			cur_row = rra_row + ds_cnt * file->rra_ptr[i].cur_row;
1092 			/* Iterate over DS */
1093 			for (j = 0; j < ds_cnt; j++) {
1094 				cur_row[j] = cdp[j].scratch[CDP_primary_val].dv;
1095 				msg_debug_rrd ("write cdp %d: %.3f", j, cur_row[j]);
1096 			}
1097 		}
1098 
1099 		rra_row += rra->row_cnt * ds_cnt;
1100 	}
1101 }
1102 
1103 /**
1104  * Add record to rrd file
1105  * @param file rrd file object
1106  * @param points points (must be row suitable for this RRA, depending on ds count)
1107  * @param err error pointer
1108  * @return TRUE if a row has been added
1109  */
1110 gboolean
rspamd_rrd_add_record(struct rspamd_rrd_file * file,GArray * points,gdouble ticks,GError ** err)1111 rspamd_rrd_add_record (struct rspamd_rrd_file *file,
1112 		GArray *points,
1113 		gdouble ticks,
1114 		GError **err)
1115 {
1116 	gdouble interval, *pdp_new, *pdp_temp;
1117 	guint i;
1118 	glong seconds, microseconds;
1119 	gulong pdp_steps, cur_pdp_count, prev_pdp_step, cur_pdp_step,
1120 		prev_pdp_age, cur_pdp_age, *rra_steps, pdp_offset;
1121 
1122 	if (file == NULL || file->stat_head->ds_cnt * sizeof (gdouble) !=
1123 		points->len) {
1124 		g_set_error (err,
1125 			rrd_error_quark (), EINVAL,
1126 			"rrd add points failed: wrong arguments");
1127 		return FALSE;
1128 	}
1129 
1130 	/* Get interval */
1131 	seconds = (glong)ticks;
1132 	microseconds = (glong)((ticks - seconds) * 1000000.);
1133 	interval = ticks - ((gdouble)file->live_head->last_up +
1134 			file->live_head->last_up_usec / 1000000.);
1135 
1136 	msg_debug_rrd ("update rrd record after %.3f seconds", interval);
1137 
1138 	/* Update PDP preparation values */
1139 	pdp_new = g_malloc0 (sizeof (gdouble) * file->stat_head->ds_cnt);
1140 	pdp_temp = g_malloc0 (sizeof (gdouble) * file->stat_head->ds_cnt);
1141 	/* How much steps need to be updated in each RRA */
1142 	rra_steps = g_malloc0 (sizeof (gulong) * file->stat_head->rra_cnt);
1143 
1144 	if (!rspamd_rrd_update_pdp_prep (file, (gdouble *)points->data, pdp_new,
1145 		interval)) {
1146 		g_set_error (err,
1147 			rrd_error_quark (), EINVAL,
1148 			"rrd update pdp failed: wrong arguments");
1149 		g_free (pdp_new);
1150 		g_free (pdp_temp);
1151 		g_free (rra_steps);
1152 		return FALSE;
1153 	}
1154 
1155 	/* Calculate elapsed steps */
1156 	/* Age in seconds for previous pdp store */
1157 	prev_pdp_age = file->live_head->last_up % file->stat_head->pdp_step;
1158 	/* Time in seconds for last pdp update */
1159 	prev_pdp_step = file->live_head->last_up - prev_pdp_age;
1160 	/* Age in seconds from current time to required pdp time */
1161 	cur_pdp_age = seconds % file->stat_head->pdp_step;
1162 	/* Time of desired pdp step */
1163 	cur_pdp_step = seconds - cur_pdp_age;
1164 	cur_pdp_count = cur_pdp_step / file->stat_head->pdp_step;
1165 	pdp_steps = (cur_pdp_step - prev_pdp_step) / file->stat_head->pdp_step;
1166 
1167 
1168 	if (pdp_steps == 0) {
1169 		/* Simple update of pdp prep */
1170 		for (i = 0; i < file->stat_head->ds_cnt; i++) {
1171 			if (isnan (pdp_new[i])) {
1172 				/* Increment unknown period */
1173 				file->pdp_prep[i].scratch[PDP_unkn_sec_cnt].lv += floor (
1174 					interval);
1175 			}
1176 			else {
1177 				if (isnan (file->pdp_prep[i].scratch[PDP_val].dv)) {
1178 					/* Reset pdp to the current value */
1179 					file->pdp_prep[i].scratch[PDP_val].dv = pdp_new[i];
1180 				}
1181 				else {
1182 					/* Increment pdp value */
1183 					file->pdp_prep[i].scratch[PDP_val].dv += pdp_new[i];
1184 				}
1185 			}
1186 		}
1187 	}
1188 	else {
1189 		/* Complex update of PDP, CDP and RRA */
1190 
1191 		/* Update PDP for this step */
1192 		rspamd_rrd_update_pdp_step (file,
1193 			pdp_new,
1194 			pdp_temp,
1195 			interval,
1196 			pdp_steps * file->stat_head->pdp_step);
1197 
1198 
1199 		/* Update CDP points for each RRA*/
1200 		for (i = 0; i < file->stat_head->rra_cnt; i++) {
1201 			/* Calculate pdp offset for this RRA */
1202 			pdp_offset = file->rra_def[i].pdp_cnt - cur_pdp_count %
1203 				file->rra_def[i].pdp_cnt;
1204 			/* How much steps we got for this RRA */
1205 			if (pdp_offset <= pdp_steps) {
1206 				rra_steps[i] =
1207 					(pdp_steps - pdp_offset) / file->rra_def[i].pdp_cnt + 1;
1208 			}
1209 			else {
1210 				/* This rra have not passed enough pdp steps */
1211 				rra_steps[i] = 0;
1212 			}
1213 
1214 			msg_debug_rrd ("cdp: %ud, rra steps: %ul(%ul), pdp steps: %ul",
1215 					i, rra_steps[i], pdp_offset, pdp_steps);
1216 
1217 			/* Update this specific CDP */
1218 			rspamd_rrd_update_cdp (file,
1219 				pdp_steps,
1220 				pdp_offset,
1221 				rra_steps,
1222 				i,
1223 				pdp_temp);
1224 
1225 		}
1226 
1227 		/* Write RRA */
1228 		rspamd_rrd_write_rra (file, rra_steps);
1229 	}
1230 	file->live_head->last_up = seconds;
1231 	file->live_head->last_up_usec = microseconds;
1232 
1233 	/* Sync and invalidate */
1234 	msync (file->map, file->size, MS_ASYNC | MS_INVALIDATE);
1235 
1236 	g_free (pdp_new);
1237 	g_free (pdp_temp);
1238 	g_free (rra_steps);
1239 
1240 	return TRUE;
1241 }
1242 
1243 /**
1244  * Close rrd file
1245  * @param file
1246  * @return
1247  */
1248 gint
rspamd_rrd_close(struct rspamd_rrd_file * file)1249 rspamd_rrd_close (struct rspamd_rrd_file * file)
1250 {
1251 	if (file == NULL) {
1252 		errno = EINVAL;
1253 		return -1;
1254 	}
1255 
1256 	munmap (file->map, file->size);
1257 	close (file->fd);
1258 	g_free (file->filename);
1259 	g_free (file->id);
1260 
1261 	g_free (file);
1262 
1263 	return 0;
1264 }
1265 
1266 static struct rspamd_rrd_file *
rspamd_rrd_create_file(const gchar * path,gboolean finalize,GError ** err)1267 rspamd_rrd_create_file (const gchar *path, gboolean finalize, GError **err)
1268 {
1269 	struct rspamd_rrd_file *file;
1270 	struct rrd_ds_def ds[RSPAMD_RRD_DS_COUNT];
1271 	struct rrd_rra_def rra[RSPAMD_RRD_RRA_COUNT];
1272 	gint i;
1273 	GArray ar;
1274 
1275 	/* Try to create new rrd file */
1276 
1277 	file = rspamd_rrd_create (path, RSPAMD_RRD_DS_COUNT, RSPAMD_RRD_RRA_COUNT,
1278 			1, rspamd_get_calendar_ticks (), err);
1279 
1280 	if (file == NULL) {
1281 		return NULL;
1282 	}
1283 
1284 	/* Create DS and RRA */
1285 
1286 	for (i = METRIC_ACTION_REJECT; i < METRIC_ACTION_MAX; i ++) {
1287 		rrd_make_default_ds (rspamd_action_to_str (i),
1288 				rrd_dst_to_string (RRD_DST_COUNTER), 1, &ds[i]);
1289 	}
1290 
1291 	ar.data = (gchar *)ds;
1292 	ar.len = sizeof (ds);
1293 
1294 	if (!rspamd_rrd_add_ds (file, &ar, err)) {
1295 		rspamd_rrd_close (file);
1296 		return NULL;
1297 	}
1298 
1299 	/* Once per minute for 1 day */
1300 	rrd_make_default_rra (rrd_cf_to_string (RRD_CF_AVERAGE),
1301 			60, 24 * 60, &rra[0]);
1302 	/* Once per 5 minutes for 1 week */
1303 	rrd_make_default_rra (rrd_cf_to_string (RRD_CF_AVERAGE),
1304 			5 * 60, 7 * 24 * 60 / 5, &rra[1]);
1305 	/* Once per 10 mins for 1 month */
1306 	rrd_make_default_rra (rrd_cf_to_string (RRD_CF_AVERAGE),
1307 			60 * 10, 30 * 24 * 6, &rra[2]);
1308 	/* Once per hour for 1 year */
1309 	rrd_make_default_rra (rrd_cf_to_string (RRD_CF_AVERAGE),
1310 			60 * 60, 365 * 24, &rra[3]);
1311 	ar.data = (gchar *)rra;
1312 	ar.len = sizeof (rra);
1313 
1314 	if (!rspamd_rrd_add_rra (file, &ar, err)) {
1315 		rspamd_rrd_close (file);
1316 		return NULL;
1317 	}
1318 
1319 	if (finalize && !rspamd_rrd_finalize (file, err)) {
1320 		rspamd_rrd_close (file);
1321 		return NULL;
1322 	}
1323 
1324 	return file;
1325 }
1326 
1327 static void
rspamd_rrd_convert_ds(struct rspamd_rrd_file * old,struct rspamd_rrd_file * cur,gint idx_old,gint idx_new)1328 rspamd_rrd_convert_ds (struct rspamd_rrd_file *old,
1329 		 struct rspamd_rrd_file *cur, gint idx_old, gint idx_new)
1330 {
1331 	struct rrd_pdp_prep *pdp_prep_old, *pdp_prep_new;
1332 	struct rrd_cdp_prep *cdp_prep_old, *cdp_prep_new;
1333 	gdouble *val_old, *val_new;
1334 	gulong rra_cnt, i, j, points_cnt, old_ds, new_ds;
1335 
1336 	rra_cnt = old->stat_head->rra_cnt;
1337 	pdp_prep_old = &old->pdp_prep[idx_old];
1338 	pdp_prep_new = &cur->pdp_prep[idx_new];
1339 	memcpy (pdp_prep_new, pdp_prep_old, sizeof (*pdp_prep_new));
1340 	val_old = old->rrd_value;
1341 	val_new = cur->rrd_value;
1342 	old_ds = old->stat_head->ds_cnt;
1343 	new_ds = cur->stat_head->ds_cnt;
1344 
1345 	for (i = 0; i < rra_cnt; i++) {
1346 		cdp_prep_old = &old->cdp_prep[i * old_ds] + idx_old;
1347 		cdp_prep_new = &cur->cdp_prep[i * new_ds] + idx_new;
1348 		memcpy (cdp_prep_new, cdp_prep_old, sizeof (*cdp_prep_new));
1349 		points_cnt = old->rra_def[i].row_cnt;
1350 
1351 		for (j = 0; j < points_cnt; j ++) {
1352 			val_new[j * new_ds + idx_new] = val_old[j * old_ds + idx_old];
1353 		}
1354 
1355 		val_new += points_cnt * new_ds;
1356 		val_old += points_cnt * old_ds;
1357 	}
1358 }
1359 
1360 static struct rspamd_rrd_file *
rspamd_rrd_convert(const gchar * path,struct rspamd_rrd_file * old,GError ** err)1361 rspamd_rrd_convert (const gchar *path, struct rspamd_rrd_file *old,
1362 		GError **err)
1363 {
1364 	struct rspamd_rrd_file *rrd;
1365 	gchar tpath[PATH_MAX];
1366 
1367 	g_assert (old != NULL);
1368 
1369 	rspamd_snprintf (tpath, sizeof (tpath), "%s.new", path);
1370 	rrd = rspamd_rrd_create_file (tpath, TRUE, err);
1371 
1372 	if (rrd) {
1373 		/* Copy old data */
1374 		memcpy (rrd->live_head, old->live_head, sizeof (*rrd->live_head));
1375 		memcpy (rrd->rra_ptr, old->rra_ptr,
1376 				sizeof (*old->rra_ptr) * rrd->stat_head->rra_cnt);
1377 
1378 		/*
1379 		 * Old DSes:
1380 		 * 0 - spam -> reject
1381 		 * 1 - probable spam -> add header
1382 		 * 2 - greylist -> greylist
1383 		 * 3 - ham -> ham
1384 		 */
1385 		rspamd_rrd_convert_ds (old, rrd, 0, METRIC_ACTION_REJECT);
1386 		rspamd_rrd_convert_ds (old, rrd, 1, METRIC_ACTION_ADD_HEADER);
1387 		rspamd_rrd_convert_ds (old, rrd, 2, METRIC_ACTION_GREYLIST);
1388 		rspamd_rrd_convert_ds (old, rrd, 3, METRIC_ACTION_NOACTION);
1389 
1390 		if (unlink (path) == -1) {
1391 			g_set_error (err, rrd_error_quark (), errno, "cannot unlink old rrd file %s: %s",
1392 					path, strerror (errno));
1393 			unlink (tpath);
1394 			rspamd_rrd_close (rrd);
1395 
1396 			return NULL;
1397 		}
1398 
1399 		if (rename (tpath, path) == -1) {
1400 			g_set_error (err, rrd_error_quark (), errno, "cannot rename old rrd file %s: %s",
1401 					path, strerror (errno));
1402 			unlink (tpath);
1403 			rspamd_rrd_close (rrd);
1404 
1405 			return NULL;
1406 		}
1407 	}
1408 
1409 	return rrd;
1410 }
1411 
1412 struct rspamd_rrd_file *
rspamd_rrd_file_default(const gchar * path,GError ** err)1413 rspamd_rrd_file_default (const gchar *path,
1414 		GError **err)
1415 {
1416 	struct rspamd_rrd_file *file, *nf;
1417 
1418 	g_assert (path != NULL);
1419 
1420 	if (access (path, R_OK) != -1) {
1421 		/* We can open rrd file */
1422 		file = rspamd_rrd_open (path, err);
1423 
1424 		if (file == NULL) {
1425 			return NULL;
1426 		}
1427 
1428 
1429 		if (file->stat_head->rra_cnt != RSPAMD_RRD_RRA_COUNT) {
1430 			msg_err_rrd ("rrd file is not suitable for rspamd: it has "
1431 					"%ul ds and %ul rra", file->stat_head->ds_cnt,
1432 					file->stat_head->rra_cnt);
1433 			g_set_error (err, rrd_error_quark (), EINVAL, "bad rrd file");
1434 			rspamd_rrd_close (file);
1435 
1436 			return NULL;
1437 		}
1438 		else if (file->stat_head->ds_cnt == RSPAMD_RRD_OLD_DS_COUNT) {
1439 			/* Old rrd, need to convert */
1440 			msg_info_rrd ("rrd file %s is not suitable for rspamd, convert it",
1441 					path);
1442 
1443 			nf = rspamd_rrd_convert (path, file, err);
1444 			rspamd_rrd_close (file);
1445 
1446 			return nf;
1447 		}
1448 		else if (file->stat_head->ds_cnt == RSPAMD_RRD_DS_COUNT) {
1449 			return file;
1450 		}
1451 		else {
1452 			msg_err_rrd ("rrd file is not suitable for rspamd: it has "
1453 					"%ul ds and %ul rra", file->stat_head->ds_cnt,
1454 					file->stat_head->rra_cnt);
1455 			g_set_error (err, rrd_error_quark (), EINVAL, "bad rrd file");
1456 			rspamd_rrd_close (file);
1457 
1458 			return NULL;
1459 		}
1460 	}
1461 
1462 	file = rspamd_rrd_create_file (path, TRUE, err);
1463 
1464 	return file;
1465 }
1466 
1467 struct rspamd_rrd_query_result *
rspamd_rrd_query(struct rspamd_rrd_file * file,gulong rra_num)1468 rspamd_rrd_query (struct rspamd_rrd_file *file,
1469 		gulong rra_num)
1470 {
1471 	struct rspamd_rrd_query_result *res;
1472 	struct rrd_rra_def *rra;
1473 	const gdouble *rra_offset = NULL;
1474 	guint i;
1475 
1476 	g_assert (file != NULL);
1477 
1478 
1479 	if (rra_num > file->stat_head->rra_cnt) {
1480 		msg_err_rrd ("requested unexisting rra: %l", rra_num);
1481 
1482 		return NULL;
1483 	}
1484 
1485 	res = g_malloc0 (sizeof (*res));
1486 	res->ds_count = file->stat_head->ds_cnt;
1487 	res->last_update = (gdouble)file->live_head->last_up +
1488 			((gdouble)file->live_head->last_up_usec / 1e6f);
1489 	res->pdp_per_cdp = file->rra_def[rra_num].pdp_cnt;
1490 	res->rra_rows = file->rra_def[rra_num].row_cnt;
1491 	rra_offset = file->rrd_value;
1492 
1493 	for (i = 0; i < file->stat_head->rra_cnt; i++) {
1494 		rra = &file->rra_def[i];
1495 
1496 		if (i == rra_num) {
1497 			res->cur_row = file->rra_ptr[i].cur_row % rra->row_cnt;
1498 			break;
1499 		}
1500 
1501 		rra_offset += rra->row_cnt * res->ds_count;
1502 	}
1503 
1504 	res->data = rra_offset;
1505 
1506 	return res;
1507 }
1508