1 /*-
2 * Copyright 2016 Vsevolod Stakhov
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 #include "config.h"
17 #include "rrd.h"
18 #include "util.h"
19 #include "cfg_file.h"
20 #include "logger.h"
21 #include "unix-std.h"
22 #include "cryptobox.h"
23 #include <math.h>
24
25 #define RSPAMD_RRD_DS_COUNT METRIC_ACTION_MAX
26 #define RSPAMD_RRD_OLD_DS_COUNT 4
27 #define RSPAMD_RRD_RRA_COUNT 4
28
29 #define msg_err_rrd(...) rspamd_default_log_function (G_LOG_LEVEL_CRITICAL, \
30 "rrd", file->id, \
31 G_STRFUNC, \
32 __VA_ARGS__)
33 #define msg_warn_rrd(...) rspamd_default_log_function (G_LOG_LEVEL_WARNING, \
34 "rrd", file->id, \
35 G_STRFUNC, \
36 __VA_ARGS__)
37 #define msg_info_rrd(...) rspamd_default_log_function (G_LOG_LEVEL_INFO, \
38 "rrd", file->id, \
39 G_STRFUNC, \
40 __VA_ARGS__)
41 #define msg_debug_rrd(...) rspamd_conditional_debug_fast (NULL, NULL, \
42 rspamd_rrd_log_id, "rrd", file->id, \
43 G_STRFUNC, \
44 __VA_ARGS__)
45
INIT_LOG_MODULE(rrd)46 INIT_LOG_MODULE(rrd)
47
48 static GQuark
49 rrd_error_quark (void)
50 {
51 return g_quark_from_static_string ("rrd-error");
52 }
53
54 /**
55 * Convert rrd dst type from string to numeric value
56 */
57 enum rrd_dst_type
rrd_dst_from_string(const gchar * str)58 rrd_dst_from_string (const gchar *str)
59 {
60 if (g_ascii_strcasecmp (str, "counter") == 0) {
61 return RRD_DST_COUNTER;
62 }
63 else if (g_ascii_strcasecmp (str, "absolute") == 0) {
64 return RRD_DST_ABSOLUTE;
65 }
66 else if (g_ascii_strcasecmp (str, "gauge") == 0) {
67 return RRD_DST_GAUGE;
68 }
69 else if (g_ascii_strcasecmp (str, "cdef") == 0) {
70 return RRD_DST_CDEF;
71 }
72 else if (g_ascii_strcasecmp (str, "derive") == 0) {
73 return RRD_DST_DERIVE;
74 }
75
76 return RRD_DST_INVALID;
77 }
78
79 /**
80 * Convert numeric presentation of dst to string
81 */
82 const gchar *
rrd_dst_to_string(enum rrd_dst_type type)83 rrd_dst_to_string (enum rrd_dst_type type)
84 {
85 switch (type) {
86 case RRD_DST_COUNTER:
87 return "COUNTER";
88 case RRD_DST_ABSOLUTE:
89 return "ABSOLUTE";
90 case RRD_DST_GAUGE:
91 return "GAUGE";
92 case RRD_DST_CDEF:
93 return "CDEF";
94 case RRD_DST_DERIVE:
95 return "DERIVE";
96 default:
97 return "U";
98 }
99
100 return "U";
101 }
102
103 /**
104 * Convert rrd consolidation function type from string to numeric value
105 */
106 enum rrd_cf_type
rrd_cf_from_string(const gchar * str)107 rrd_cf_from_string (const gchar *str)
108 {
109 if (g_ascii_strcasecmp (str, "average") == 0) {
110 return RRD_CF_AVERAGE;
111 }
112 else if (g_ascii_strcasecmp (str, "minimum") == 0) {
113 return RRD_CF_MINIMUM;
114 }
115 else if (g_ascii_strcasecmp (str, "maximum") == 0) {
116 return RRD_CF_MAXIMUM;
117 }
118 else if (g_ascii_strcasecmp (str, "last") == 0) {
119 return RRD_CF_LAST;
120 }
121 /* XXX: add other CF functions supported by rrd */
122
123 return RRD_CF_INVALID;
124 }
125
126 /**
127 * Convert numeric presentation of cf to string
128 */
129 const gchar *
rrd_cf_to_string(enum rrd_cf_type type)130 rrd_cf_to_string (enum rrd_cf_type type)
131 {
132 switch (type) {
133 case RRD_CF_AVERAGE:
134 return "AVERAGE";
135 case RRD_CF_MINIMUM:
136 return "MINIMUM";
137 case RRD_CF_MAXIMUM:
138 return "MAXIMUM";
139 case RRD_CF_LAST:
140 return "LAST";
141 default:
142 return "U";
143 }
144
145 /* XXX: add other CF functions supported by rrd */
146
147 return "U";
148 }
149
150 void
rrd_make_default_rra(const gchar * cf_name,gulong pdp_cnt,gulong rows,struct rrd_rra_def * rra)151 rrd_make_default_rra (const gchar *cf_name,
152 gulong pdp_cnt,
153 gulong rows,
154 struct rrd_rra_def *rra)
155 {
156 g_assert (cf_name != NULL);
157 g_assert (rrd_cf_from_string (cf_name) != RRD_CF_INVALID);
158
159 rra->pdp_cnt = pdp_cnt;
160 rra->row_cnt = rows;
161 rspamd_strlcpy (rra->cf_nam, cf_name, sizeof (rra->cf_nam));
162 memset (rra->par, 0, sizeof (rra->par));
163 rra->par[RRA_cdp_xff_val].dv = 0.5;
164 }
165
166 void
rrd_make_default_ds(const gchar * name,const gchar * type,gulong pdp_step,struct rrd_ds_def * ds)167 rrd_make_default_ds (const gchar *name,
168 const gchar *type,
169 gulong pdp_step,
170 struct rrd_ds_def *ds)
171 {
172 g_assert (name != NULL);
173 g_assert (type != NULL);
174 g_assert (rrd_dst_from_string (type) != RRD_DST_INVALID);
175
176 rspamd_strlcpy (ds->ds_nam, name, sizeof (ds->ds_nam));
177 rspamd_strlcpy (ds->dst, type, sizeof (ds->dst));
178 memset (ds->par, 0, sizeof (ds->par));
179 ds->par[RRD_DS_mrhb_cnt].lv = pdp_step * 2;
180 ds->par[RRD_DS_min_val].dv = NAN;
181 ds->par[RRD_DS_max_val].dv = NAN;
182 }
183
184 /**
185 * Check rrd file for correctness (size, cookies, etc)
186 */
187 static gboolean
rspamd_rrd_check_file(const gchar * filename,gboolean need_data,GError ** err)188 rspamd_rrd_check_file (const gchar *filename, gboolean need_data, GError **err)
189 {
190 gint fd, i;
191 struct stat st;
192 struct rrd_file_head head;
193 struct rrd_rra_def rra;
194 gint head_size;
195
196 fd = open (filename, O_RDWR);
197 if (fd == -1) {
198 g_set_error (err,
199 rrd_error_quark (), errno, "rrd open error: %s", strerror (errno));
200 return FALSE;
201 }
202
203 if (fstat (fd, &st) == -1) {
204 g_set_error (err,
205 rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno));
206 close (fd);
207 return FALSE;
208 }
209 if (st.st_size < (goffset)sizeof (struct rrd_file_head)) {
210 /* We have trimmed file */
211 g_set_error (err, rrd_error_quark (), EINVAL, "rrd size is bad: %ud",
212 (guint)st.st_size);
213 close (fd);
214 return FALSE;
215 }
216
217 /* Try to read header */
218 if (read (fd, &head, sizeof (head)) != sizeof (head)) {
219 g_set_error (err,
220 rrd_error_quark (), errno, "rrd read head error: %s",
221 strerror (errno));
222 close (fd);
223 return FALSE;
224 }
225 /* Check magic */
226 if (memcmp (head.version, RRD_VERSION, sizeof (head.version)) != 0) {
227 g_set_error (err,
228 rrd_error_quark (), EINVAL, "rrd head error: bad cookie");
229 close (fd);
230 return FALSE;
231 }
232 if (head.float_cookie != RRD_FLOAT_COOKIE) {
233 g_set_error (err,
234 rrd_error_quark (), EINVAL, "rrd head error: another architecture "
235 "(file cookie %g != our cookie %g)",
236 head.float_cookie, RRD_FLOAT_COOKIE);
237 close (fd);
238 return FALSE;
239 }
240 /* Check for other params */
241 if (head.ds_cnt <= 0 || head.rra_cnt <= 0) {
242 g_set_error (err,
243 rrd_error_quark (), EINVAL, "rrd head cookies error: bad rra or ds count");
244 close (fd);
245 return FALSE;
246 }
247 /* Now we can calculate the overall size of rrd */
248 head_size = sizeof (struct rrd_file_head) +
249 sizeof (struct rrd_ds_def) * head.ds_cnt +
250 sizeof (struct rrd_rra_def) * head.rra_cnt +
251 sizeof (struct rrd_live_head) +
252 sizeof (struct rrd_pdp_prep) * head.ds_cnt +
253 sizeof (struct rrd_cdp_prep) * head.ds_cnt * head.rra_cnt +
254 sizeof (struct rrd_rra_ptr) * head.rra_cnt;
255 if (st.st_size < (goffset)head_size) {
256 g_set_error (err,
257 rrd_error_quark (), errno, "rrd file seems to have stripped header: %d",
258 head_size);
259 close (fd);
260 return FALSE;
261 }
262
263 if (need_data) {
264 /* Now check rra */
265 if (lseek (fd, sizeof (struct rrd_ds_def) * head.ds_cnt,
266 SEEK_CUR) == -1) {
267 g_set_error (err,
268 rrd_error_quark (), errno, "rrd head lseek error: %s",
269 strerror (errno));
270 close (fd);
271 return FALSE;
272 }
273 for (i = 0; i < (gint)head.rra_cnt; i++) {
274 if (read (fd, &rra, sizeof (rra)) != sizeof (rra)) {
275 g_set_error (err,
276 rrd_error_quark (), errno, "rrd read rra error: %s",
277 strerror (errno));
278 close (fd);
279 return FALSE;
280 }
281 head_size += rra.row_cnt * head.ds_cnt * sizeof (gdouble);
282 }
283
284 if (st.st_size != head_size) {
285 g_set_error (err,
286 rrd_error_quark (), EINVAL, "rrd file seems to have incorrect size: %d, must be %d",
287 (gint)st.st_size, head_size);
288 close (fd);
289 return FALSE;
290 }
291 }
292
293 close (fd);
294 return TRUE;
295 }
296
297 /**
298 * Adjust pointers in mmapped rrd file
299 * @param file
300 */
301 static void
rspamd_rrd_adjust_pointers(struct rspamd_rrd_file * file,gboolean completed)302 rspamd_rrd_adjust_pointers (struct rspamd_rrd_file *file, gboolean completed)
303 {
304 guint8 *ptr;
305
306 ptr = file->map;
307 file->stat_head = (struct rrd_file_head *)ptr;
308 ptr += sizeof (struct rrd_file_head);
309 file->ds_def = (struct rrd_ds_def *)ptr;
310 ptr += sizeof (struct rrd_ds_def) * file->stat_head->ds_cnt;
311 file->rra_def = (struct rrd_rra_def *)ptr;
312 ptr += sizeof (struct rrd_rra_def) * file->stat_head->rra_cnt;
313 file->live_head = (struct rrd_live_head *)ptr;
314 ptr += sizeof (struct rrd_live_head);
315 file->pdp_prep = (struct rrd_pdp_prep *)ptr;
316 ptr += sizeof (struct rrd_pdp_prep) * file->stat_head->ds_cnt;
317 file->cdp_prep = (struct rrd_cdp_prep *)ptr;
318 ptr += sizeof (struct rrd_cdp_prep) * file->stat_head->rra_cnt *
319 file->stat_head->ds_cnt;
320 file->rra_ptr = (struct rrd_rra_ptr *)ptr;
321 if (completed) {
322 ptr += sizeof (struct rrd_rra_ptr) * file->stat_head->rra_cnt;
323 file->rrd_value = (gdouble *)ptr;
324 }
325 else {
326 file->rrd_value = NULL;
327 }
328 }
329
330 static void
rspamd_rrd_calculate_checksum(struct rspamd_rrd_file * file)331 rspamd_rrd_calculate_checksum (struct rspamd_rrd_file *file)
332 {
333 guchar sigbuf[rspamd_cryptobox_HASHBYTES];
334 struct rrd_ds_def *ds;
335 guint i;
336 rspamd_cryptobox_hash_state_t st;
337
338 if (file->finalized) {
339 rspamd_cryptobox_hash_init (&st, NULL, 0);
340 rspamd_cryptobox_hash_update (&st, file->filename, strlen (file->filename));
341
342 for (i = 0; i < file->stat_head->ds_cnt; i ++) {
343 ds = &file->ds_def[i];
344 rspamd_cryptobox_hash_update (&st, ds->ds_nam, sizeof (ds->ds_nam));
345 }
346
347 rspamd_cryptobox_hash_final (&st, sigbuf);
348
349 file->id = rspamd_encode_base32 (sigbuf, sizeof (sigbuf), RSPAMD_BASE32_DEFAULT);
350 }
351 }
352
353 static int
rspamd_rrd_open_exclusive(const gchar * filename)354 rspamd_rrd_open_exclusive (const gchar *filename)
355 {
356 struct timespec sleep_ts = {
357 .tv_sec = 0,
358 .tv_nsec = 1000000
359 };
360 gint fd;
361
362 fd = open (filename, O_RDWR);
363
364 if (fd == -1) {
365 return -1;
366 }
367
368 for (;;) {
369 if (rspamd_file_lock (fd, TRUE) == -1) {
370 if (errno == EAGAIN || errno == EWOULDBLOCK) {
371 nanosleep (&sleep_ts, NULL);
372 continue;
373 }
374 else {
375 close (fd);
376 return -1;
377 }
378 }
379 else {
380 break;
381 }
382 }
383
384 return fd;
385 };
386
387 /**
388 * Open completed or incompleted rrd file
389 * @param filename
390 * @param completed
391 * @param err
392 * @return
393 */
394 static struct rspamd_rrd_file *
rspamd_rrd_open_common(const gchar * filename,gboolean completed,GError ** err)395 rspamd_rrd_open_common (const gchar *filename, gboolean completed, GError **err)
396 {
397 struct rspamd_rrd_file *file;
398 gint fd;
399 struct stat st;
400
401 if (!rspamd_rrd_check_file (filename, completed, err)) {
402 return NULL;
403 }
404
405 file = g_malloc0 (sizeof (struct rspamd_rrd_file));
406
407 /* Open file */
408 fd = rspamd_rrd_open_exclusive (filename);
409 if (fd == -1) {
410 g_set_error (err,
411 rrd_error_quark (), errno, "rrd open error: %s", strerror (errno));
412 g_free (file);
413 return FALSE;
414 }
415
416 if (fstat (fd, &st) == -1) {
417 g_set_error (err,
418 rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno));
419 rspamd_file_unlock (fd, FALSE);
420 g_free (file);
421 close (fd);
422 return FALSE;
423 }
424 /* Mmap file */
425 file->size = st.st_size;
426 if ((file->map =
427 mmap (NULL, st.st_size, PROT_READ | PROT_WRITE,
428 MAP_SHARED, fd, 0)) == MAP_FAILED) {
429
430 rspamd_file_unlock (fd, FALSE);
431 close (fd);
432 g_set_error (err,
433 rrd_error_quark (), ENOMEM, "mmap failed: %s", strerror (errno));
434 g_free (file);
435 return NULL;
436 }
437
438 file->fd = fd;
439
440 /* Adjust pointers */
441 rspamd_rrd_adjust_pointers (file, completed);
442
443 /* Mark it as finalized */
444 file->finalized = completed;
445
446 file->filename = g_strdup (filename);
447 rspamd_rrd_calculate_checksum (file);
448
449 return file;
450 }
451
452 /**
453 * Open (and mmap) existing RRD file
454 * @param filename path
455 * @param err error pointer
456 * @return rrd file structure
457 */
458 struct rspamd_rrd_file *
rspamd_rrd_open(const gchar * filename,GError ** err)459 rspamd_rrd_open (const gchar *filename, GError **err)
460 {
461 struct rspamd_rrd_file *file;
462
463 if ((file = rspamd_rrd_open_common (filename, TRUE, err))) {
464 msg_info_rrd ("rrd file opened: %s", filename);
465 }
466
467 return file;
468 }
469
470 /**
471 * Create basic header for rrd file
472 * @param filename file path
473 * @param ds_count number of data sources
474 * @param rra_count number of round robin archives
475 * @param pdp_step step of primary data points
476 * @param err error pointer
477 * @return TRUE if file has been created
478 */
479 struct rspamd_rrd_file *
rspamd_rrd_create(const gchar * filename,gulong ds_count,gulong rra_count,gulong pdp_step,gdouble initial_ticks,GError ** err)480 rspamd_rrd_create (const gchar *filename,
481 gulong ds_count,
482 gulong rra_count,
483 gulong pdp_step,
484 gdouble initial_ticks,
485 GError **err)
486 {
487 struct rspamd_rrd_file *new;
488 struct rrd_file_head head;
489 struct rrd_ds_def ds;
490 struct rrd_rra_def rra;
491 struct rrd_live_head lh;
492 struct rrd_pdp_prep pdp;
493 struct rrd_cdp_prep cdp;
494 struct rrd_rra_ptr rra_ptr;
495 gint fd;
496 guint i, j;
497
498 /* Open file */
499 fd = open (filename, O_RDWR | O_CREAT | O_EXCL, 0644);
500 if (fd == -1) {
501 g_set_error (err,
502 rrd_error_quark (), errno, "rrd create error: %s",
503 strerror (errno));
504 return NULL;
505 }
506
507 rspamd_file_lock (fd, FALSE);
508
509 /* Fill header */
510 memset (&head, 0, sizeof (head));
511 head.rra_cnt = rra_count;
512 head.ds_cnt = ds_count;
513 head.pdp_step = pdp_step;
514 memcpy (head.cookie, RRD_COOKIE, sizeof (head.cookie));
515 memcpy (head.version, RRD_VERSION, sizeof (head.version));
516 head.float_cookie = RRD_FLOAT_COOKIE;
517
518 if (write (fd, &head, sizeof (head)) != sizeof (head)) {
519 rspamd_file_unlock (fd, FALSE);
520 close (fd);
521 g_set_error (err,
522 rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
523 return NULL;
524 }
525
526 /* Fill DS section */
527 memset (&ds, 0, sizeof (ds));
528 memset (&ds.ds_nam, 0, sizeof (ds.ds_nam));
529 memcpy (&ds.dst, "COUNTER", sizeof ("COUNTER"));
530 memset (&ds.par, 0, sizeof (ds.par));
531 for (i = 0; i < ds_count; i++) {
532 if (write (fd, &ds, sizeof (ds)) != sizeof (ds)) {
533 rspamd_file_unlock (fd, FALSE);
534 close (fd);
535 g_set_error (err,
536 rrd_error_quark (), errno, "rrd write error: %s",
537 strerror (errno));
538 return NULL;
539 }
540 }
541
542 /* Fill RRA section */
543 memset (&rra, 0, sizeof (rra));
544 memcpy (&rra.cf_nam, "AVERAGE", sizeof ("AVERAGE"));
545 rra.pdp_cnt = 1;
546 memset (&rra.par, 0, sizeof (rra.par));
547 for (i = 0; i < rra_count; i++) {
548 if (write (fd, &rra, sizeof (rra)) != sizeof (rra)) {
549 rspamd_file_unlock (fd, FALSE);
550 close (fd);
551 g_set_error (err,
552 rrd_error_quark (), errno, "rrd write error: %s",
553 strerror (errno));
554 return NULL;
555 }
556 }
557
558 /* Fill live header */
559 memset (&lh, 0, sizeof (lh));
560 lh.last_up = (glong)initial_ticks;
561 lh.last_up_usec = (glong)((initial_ticks - lh.last_up) * 1e6f);
562
563 if (write (fd, &lh, sizeof (lh)) != sizeof (lh)) {
564 rspamd_file_unlock (fd, FALSE);
565 close (fd);
566 g_set_error (err,
567 rrd_error_quark (), errno, "rrd write error: %s", strerror (errno));
568 return NULL;
569 }
570
571 /* Fill pdp prep */
572 memset (&pdp, 0, sizeof (pdp));
573 memcpy (&pdp.last_ds, "U", sizeof ("U"));
574 memset (&pdp.scratch, 0, sizeof (pdp.scratch));
575 pdp.scratch[PDP_val].dv = NAN;
576 pdp.scratch[PDP_unkn_sec_cnt].lv = 0;
577
578 for (i = 0; i < ds_count; i++) {
579 if (write (fd, &pdp, sizeof (pdp)) != sizeof (pdp)) {
580 rspamd_file_unlock (fd, FALSE);
581 close (fd);
582 g_set_error (err,
583 rrd_error_quark (), errno, "rrd write error: %s",
584 strerror (errno));
585 return NULL;
586 }
587 }
588
589 /* Fill cdp prep */
590 memset (&cdp, 0, sizeof (cdp));
591 memset (&cdp.scratch, 0, sizeof (cdp.scratch));
592 cdp.scratch[CDP_val].dv = NAN;
593 cdp.scratch[CDP_unkn_pdp_cnt].lv = 0;
594
595 for (i = 0; i < rra_count; i++) {
596 for (j = 0; j < ds_count; j++) {
597 if (write (fd, &cdp, sizeof (cdp)) != sizeof (cdp)) {
598 rspamd_file_unlock (fd, FALSE);
599 close (fd);
600 g_set_error (err,
601 rrd_error_quark (), errno, "rrd write error: %s",
602 strerror (errno));
603 return NULL;
604 }
605 }
606 }
607
608 /* Set row pointers */
609 memset (&rra_ptr, 0, sizeof (rra_ptr));
610 for (i = 0; i < rra_count; i++) {
611 if (write (fd, &rra_ptr, sizeof (rra_ptr)) != sizeof (rra_ptr)) {
612 rspamd_file_unlock (fd, FALSE);
613 close (fd);
614 g_set_error (err,
615 rrd_error_quark (), errno, "rrd write error: %s",
616 strerror (errno));
617 return NULL;
618 }
619 }
620
621 rspamd_file_unlock (fd, FALSE);
622 close (fd);
623
624 new = rspamd_rrd_open_common (filename, FALSE, err);
625
626 return new;
627 }
628
629 /**
630 * Add data sources to rrd file
631 * @param filename path to file
632 * @param ds array of struct rrd_ds_def
633 * @param err error pointer
634 * @return TRUE if data sources were added
635 */
636 gboolean
rspamd_rrd_add_ds(struct rspamd_rrd_file * file,GArray * ds,GError ** err)637 rspamd_rrd_add_ds (struct rspamd_rrd_file *file, GArray *ds, GError **err)
638 {
639
640 if (file == NULL || file->stat_head->ds_cnt * sizeof (struct rrd_ds_def) !=
641 ds->len) {
642 g_set_error (err,
643 rrd_error_quark (), EINVAL, "rrd add ds failed: wrong arguments");
644 return FALSE;
645 }
646
647 /* Straightforward memcpy */
648 memcpy (file->ds_def, ds->data, ds->len);
649
650 return TRUE;
651 }
652
653 /**
654 * Add round robin archives to rrd file
655 * @param filename path to file
656 * @param ds array of struct rrd_rra_def
657 * @param err error pointer
658 * @return TRUE if archives were added
659 */
660 gboolean
rspamd_rrd_add_rra(struct rspamd_rrd_file * file,GArray * rra,GError ** err)661 rspamd_rrd_add_rra (struct rspamd_rrd_file *file, GArray *rra, GError **err)
662 {
663 if (file == NULL || file->stat_head->rra_cnt *
664 sizeof (struct rrd_rra_def) != rra->len) {
665 g_set_error (err,
666 rrd_error_quark (), EINVAL, "rrd add rra failed: wrong arguments");
667 return FALSE;
668 }
669
670 /* Straightforward memcpy */
671 memcpy (file->rra_def, rra->data, rra->len);
672
673 return TRUE;
674 }
675
676 /**
677 * Finalize rrd file header and initialize all RRA in the file
678 * @param filename file path
679 * @param err error pointer
680 * @return TRUE if rrd file is ready for use
681 */
682 gboolean
rspamd_rrd_finalize(struct rspamd_rrd_file * file,GError ** err)683 rspamd_rrd_finalize (struct rspamd_rrd_file *file, GError **err)
684 {
685 gint fd;
686 guint i;
687 gint count = 0;
688 gdouble vbuf[1024];
689 struct stat st;
690
691 if (file == NULL || file->filename == NULL || file->fd == -1) {
692 g_set_error (err,
693 rrd_error_quark (), EINVAL, "rrd add rra failed: wrong arguments");
694 return FALSE;
695 }
696
697 fd = file->fd;
698
699 if (lseek (fd, 0, SEEK_END) == -1) {
700 g_set_error (err,
701 rrd_error_quark (), errno, "rrd seek error: %s", strerror (errno));
702 close (fd);
703 return FALSE;
704 }
705
706 /* Adjust CDP */
707 for (i = 0; i < file->stat_head->rra_cnt; i++) {
708 file->cdp_prep->scratch[CDP_unkn_pdp_cnt].lv = 0;
709 /* Randomize row pointer (disabled) */
710 /* file->rra_ptr->cur_row = g_random_int () % file->rra_def[i].row_cnt; */
711 file->rra_ptr->cur_row = file->rra_def[i].row_cnt - 1;
712 /* Calculate values count */
713 count += file->rra_def[i].row_cnt * file->stat_head->ds_cnt;
714 }
715
716 munmap (file->map, file->size);
717 /* Write values */
718 for (i = 0; i < G_N_ELEMENTS (vbuf); i++) {
719 vbuf[i] = NAN;
720 }
721
722 while (count > 0) {
723 /* Write values in buffered matter */
724 if (write (fd, vbuf,
725 MIN ((gint)G_N_ELEMENTS (vbuf), count) * sizeof (gdouble)) == -1) {
726 g_set_error (err,
727 rrd_error_quark (), errno, "rrd write error: %s",
728 strerror (errno));
729 close (fd);
730 return FALSE;
731 }
732 count -= G_N_ELEMENTS (vbuf);
733 }
734
735 if (fstat (fd, &st) == -1) {
736 g_set_error (err,
737 rrd_error_quark (), errno, "rrd stat error: %s", strerror (errno));
738 close (fd);
739 return FALSE;
740 }
741
742 /* Mmap again */
743 file->size = st.st_size;
744 if ((file->map =
745 mmap (NULL, st.st_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd,
746 0)) == MAP_FAILED) {
747 close (fd);
748 g_set_error (err,
749 rrd_error_quark (), ENOMEM, "mmap failed: %s", strerror (errno));
750
751 return FALSE;
752 }
753
754 /* Adjust pointers */
755 rspamd_rrd_adjust_pointers (file, TRUE);
756
757 file->finalized = TRUE;
758 rspamd_rrd_calculate_checksum (file);
759 msg_info_rrd ("rrd file created: %s", file->filename);
760
761 return TRUE;
762 }
763
764 /**
765 * Update pdp_prep data
766 * @param file rrd file
767 * @param vals new values
768 * @param pdp_new new pdp array
769 * @param interval time elapsed from the last update
770 * @return
771 */
772 static gboolean
rspamd_rrd_update_pdp_prep(struct rspamd_rrd_file * file,gdouble * vals,gdouble * pdp_new,gdouble interval)773 rspamd_rrd_update_pdp_prep (struct rspamd_rrd_file *file,
774 gdouble *vals,
775 gdouble *pdp_new,
776 gdouble interval)
777 {
778 guint i;
779 enum rrd_dst_type type;
780
781 for (i = 0; i < file->stat_head->ds_cnt; i++) {
782 type = rrd_dst_from_string (file->ds_def[i].dst);
783
784 if (file->ds_def[i].par[RRD_DS_mrhb_cnt].lv < interval) {
785 rspamd_strlcpy (file->pdp_prep[i].last_ds, "U",
786 sizeof (file->pdp_prep[i].last_ds));
787 pdp_new[i] = NAN;
788 msg_debug_rrd ("adding unknown point interval %.3f is less than heartbeat %l",
789 interval, file->ds_def[i].par[RRD_DS_mrhb_cnt].lv);
790 }
791 else {
792 switch (type) {
793 case RRD_DST_COUNTER:
794 case RRD_DST_DERIVE:
795 if (file->pdp_prep[i].last_ds[0] == 'U') {
796 pdp_new[i] = NAN;
797 msg_debug_rrd ("last point is NaN for point %ud", i);
798 }
799 else {
800 pdp_new[i] = vals[i] - strtod (file->pdp_prep[i].last_ds,
801 NULL);
802 msg_debug_rrd ("new PDP %ud, %.3f", i, pdp_new[i]);
803 }
804 break;
805 case RRD_DST_GAUGE:
806 pdp_new[i] = vals[i] * interval;
807 msg_debug_rrd ("new PDP %ud, %.3f", i, pdp_new[i]);
808 break;
809 case RRD_DST_ABSOLUTE:
810 pdp_new[i] = vals[i];
811 msg_debug_rrd ("new PDP %ud, %.3f", i, pdp_new[i]);
812 break;
813 default:
814 return FALSE;
815 }
816 }
817
818 /* Copy value to the last_ds */
819 if (!isnan (vals[i])) {
820 rspamd_snprintf (file->pdp_prep[i].last_ds,
821 sizeof (file->pdp_prep[i].last_ds), "%.4f", vals[i]);
822 }
823 else {
824 file->pdp_prep[i].last_ds[0] = 'U';
825 file->pdp_prep[i].last_ds[1] = '\0';
826 }
827 }
828
829
830 return TRUE;
831 }
832
833 /**
834 * Update step for this pdp
835 * @param file
836 * @param pdp_new new pdp array
837 * @param pdp_temp temp pdp array
838 * @param interval time till last update
839 * @param pre_int pre interval
840 * @param post_int post intervall
841 * @param pdp_diff time till last pdp update
842 */
843 static void
rspamd_rrd_update_pdp_step(struct rspamd_rrd_file * file,gdouble * pdp_new,gdouble * pdp_temp,gdouble interval,gulong pdp_diff)844 rspamd_rrd_update_pdp_step (struct rspamd_rrd_file *file,
845 gdouble *pdp_new,
846 gdouble *pdp_temp,
847 gdouble interval,
848 gulong pdp_diff)
849 {
850 guint i;
851 rrd_value_t *scratch;
852 gulong heartbeat;
853
854
855 for (i = 0; i < file->stat_head->ds_cnt; i++) {
856 scratch = file->pdp_prep[i].scratch;
857 heartbeat = file->ds_def[i].par[RRD_DS_mrhb_cnt].lv;
858
859 if (!isnan (pdp_new[i])) {
860 if (isnan (scratch[PDP_val].dv)) {
861 scratch[PDP_val].dv = 0;
862 }
863 }
864
865 /* Check interval value for heartbeat for this DS */
866 if ((interval > heartbeat) ||
867 (file->stat_head->pdp_step / 2.0 < scratch[PDP_unkn_sec_cnt].lv)) {
868 pdp_temp[i] = NAN;
869 }
870 else {
871 pdp_temp[i] = scratch[PDP_val].dv /
872 ((double) (pdp_diff - scratch[PDP_unkn_sec_cnt].lv));
873 }
874
875 if (isnan (pdp_new[i])) {
876 scratch[PDP_unkn_sec_cnt].lv = interval;
877 scratch[PDP_val].dv = NAN;
878 } else {
879 scratch[PDP_unkn_sec_cnt].lv = 0;
880 scratch[PDP_val].dv = pdp_new[i] / interval;
881 }
882
883 msg_debug_rrd ("new temp PDP %ud, %.3f -> %.3f, scratch: %3f",
884 i, pdp_new[i], pdp_temp[i],
885 scratch[PDP_val].dv);
886 }
887 }
888
889 /**
890 * Update CDP for this rra
891 * @param file rrd file
892 * @param pdp_steps how much pdp steps elapsed from the last update
893 * @param pdp_offset offset from pdp
894 * @param rra_steps how much steps must be updated for this rra
895 * @param rra_index index of desired rra
896 * @param pdp_temp temporary pdp points
897 */
898 static void
rspamd_rrd_update_cdp(struct rspamd_rrd_file * file,gdouble pdp_steps,gdouble pdp_offset,gulong * rra_steps,gulong rra_index,gdouble * pdp_temp)899 rspamd_rrd_update_cdp (struct rspamd_rrd_file *file,
900 gdouble pdp_steps,
901 gdouble pdp_offset,
902 gulong *rra_steps,
903 gulong rra_index,
904 gdouble *pdp_temp)
905 {
906 guint i;
907 struct rrd_rra_def *rra;
908 rrd_value_t *scratch;
909 enum rrd_cf_type cf;
910 gdouble last_cdp = INFINITY, cur_cdp = INFINITY;
911 gulong pdp_in_cdp;
912
913 rra = &file->rra_def[rra_index];
914 cf = rrd_cf_from_string (rra->cf_nam);
915
916 /* Iterate over all DS for this RRA */
917 for (i = 0; i < file->stat_head->ds_cnt; i++) {
918 /* Get CDP for this RRA and DS */
919 scratch =
920 file->cdp_prep[rra_index * file->stat_head->ds_cnt + i].scratch;
921 if (rra->pdp_cnt > 1) {
922 /* Do we have any CDP to update for this rra ? */
923 if (rra_steps[rra_index] > 0) {
924
925 if (isnan (pdp_temp[i])) {
926 /* New pdp is nan */
927 /* Increment unknown points count */
928 scratch[CDP_unkn_pdp_cnt].lv += pdp_offset;
929 /* Reset secondary value */
930 scratch[CDP_secondary_val].dv = NAN;
931 }
932 else {
933 scratch[CDP_secondary_val].dv = pdp_temp[i];
934 }
935
936 /* Check XFF for this rra */
937 if (scratch[CDP_unkn_pdp_cnt].lv > rra->pdp_cnt *
938 rra->par[RRA_cdp_xff_val].lv) {
939 /* XFF is reached */
940 scratch[CDP_primary_val].dv = NAN;
941 }
942 else {
943 /* Need to initialize CDP using specified consolidation */
944 switch (cf) {
945 case RRD_CF_AVERAGE:
946 last_cdp =
947 isnan (scratch[CDP_val].dv) ? 0.0 : scratch[CDP_val]
948 .dv;
949 cur_cdp = isnan (pdp_temp[i]) ? 0.0 : pdp_temp[i];
950 scratch[CDP_primary_val].dv =
951 (last_cdp + cur_cdp *
952 pdp_offset) /
953 (rra->pdp_cnt - scratch[CDP_unkn_pdp_cnt].lv);
954 break;
955 case RRD_CF_MAXIMUM:
956 last_cdp =
957 isnan (scratch[CDP_val].dv) ? -INFINITY : scratch[
958 CDP_val].dv;
959 cur_cdp = isnan (pdp_temp[i]) ? -INFINITY : pdp_temp[i];
960 scratch[CDP_primary_val].dv = MAX (last_cdp, cur_cdp);
961 break;
962 case RRD_CF_MINIMUM:
963 last_cdp =
964 isnan (scratch[CDP_val].dv) ? INFINITY : scratch[
965 CDP_val].dv;
966 cur_cdp = isnan (pdp_temp[i]) ? INFINITY : pdp_temp[i];
967 scratch[CDP_primary_val].dv = MIN (last_cdp, cur_cdp);
968 break;
969 case RRD_CF_LAST:
970 default:
971 scratch[CDP_primary_val].dv = pdp_temp[i];
972 last_cdp = INFINITY;
973 break;
974 }
975 }
976
977 /* Init carry of this CDP */
978 pdp_in_cdp = (pdp_steps - pdp_offset) / rra->pdp_cnt;
979 if (pdp_in_cdp == 0 || isnan (pdp_temp[i])) {
980 /* Set overflow */
981 switch (cf) {
982 case RRD_CF_AVERAGE:
983 scratch[CDP_val].dv = 0;
984 break;
985 case RRD_CF_MAXIMUM:
986 scratch[CDP_val].dv = -INFINITY;
987 break;
988 case RRD_CF_MINIMUM:
989 scratch[CDP_val].dv = INFINITY;
990 break;
991 default:
992 scratch[CDP_val].dv = NAN;
993 break;
994 }
995 }
996 else {
997 /* Special carry for average */
998 if (cf == RRD_CF_AVERAGE) {
999 scratch[CDP_val].dv = pdp_temp[i] * pdp_in_cdp;
1000 }
1001 else {
1002 scratch[CDP_val].dv = pdp_temp[i];
1003 }
1004 }
1005
1006 scratch[CDP_unkn_pdp_cnt].lv = 0;
1007
1008 msg_debug_rrd ("update cdp for DS %d with value %.3f, "
1009 "stored value: %.3f, carry: %.3f",
1010 i, last_cdp,
1011 scratch[CDP_primary_val].dv, scratch[CDP_val].dv);
1012 }
1013 /* In this case we just need to update cdp_prep for this RRA */
1014 else {
1015 if (isnan (pdp_temp[i])) {
1016 /* Just increase undefined zone */
1017 scratch[CDP_unkn_pdp_cnt].lv += pdp_steps;
1018 }
1019 else {
1020 /* Calculate cdp value */
1021 last_cdp = scratch[CDP_val].dv;
1022 switch (cf) {
1023 case RRD_CF_AVERAGE:
1024 if (isnan (last_cdp)) {
1025 scratch[CDP_val].dv = pdp_temp[i] * pdp_steps;
1026 }
1027 else {
1028 scratch[CDP_val].dv = last_cdp + pdp_temp[i] *
1029 pdp_steps;
1030 }
1031 break;
1032 case RRD_CF_MAXIMUM:
1033 scratch[CDP_val].dv = MAX (last_cdp, pdp_temp[i]);
1034 break;
1035 case RRD_CF_MINIMUM:
1036 scratch[CDP_val].dv = MIN (last_cdp, pdp_temp[i]);
1037 break;
1038 case RRD_CF_LAST:
1039 scratch[CDP_val].dv = pdp_temp[i];
1040 break;
1041 default:
1042 scratch[CDP_val].dv = NAN;
1043 break;
1044 }
1045 }
1046
1047 msg_debug_rrd ("aggregate cdp %d with pdp %.3f, "
1048 "stored value: %.3f",
1049 i, pdp_temp[i], scratch[CDP_val].dv);
1050 }
1051 }
1052 else {
1053 /* We have nothing to consolidate, but we may miss some pdp */
1054 if (pdp_steps > 2) {
1055 /* Just write PDP value */
1056 scratch[CDP_primary_val].dv = pdp_temp[i];
1057 scratch[CDP_secondary_val].dv = pdp_temp[i];
1058 }
1059 }
1060 }
1061 }
1062
1063 /**
1064 * Update RRA in a file
1065 * @param file rrd file
1066 * @param rra_steps steps for each rra
1067 * @param now current time
1068 */
1069 void
rspamd_rrd_write_rra(struct rspamd_rrd_file * file,gulong * rra_steps)1070 rspamd_rrd_write_rra (struct rspamd_rrd_file *file, gulong *rra_steps)
1071 {
1072 guint i, j, ds_cnt;
1073 struct rrd_rra_def *rra;
1074 struct rrd_cdp_prep *cdp;
1075 gdouble *rra_row = file->rrd_value, *cur_row;
1076
1077
1078 ds_cnt = file->stat_head->ds_cnt;
1079 /* Iterate over all RRA */
1080 for (i = 0; i < file->stat_head->rra_cnt; i++) {
1081 rra = &file->rra_def[i];
1082
1083 if (rra_steps[i] > 0) {
1084
1085 /* Move row ptr */
1086 if (++file->rra_ptr[i].cur_row >= rra->row_cnt) {
1087 file->rra_ptr[i].cur_row = 0;
1088 }
1089 /* Calculate seek */
1090 cdp = &file->cdp_prep[ds_cnt * i];
1091 cur_row = rra_row + ds_cnt * file->rra_ptr[i].cur_row;
1092 /* Iterate over DS */
1093 for (j = 0; j < ds_cnt; j++) {
1094 cur_row[j] = cdp[j].scratch[CDP_primary_val].dv;
1095 msg_debug_rrd ("write cdp %d: %.3f", j, cur_row[j]);
1096 }
1097 }
1098
1099 rra_row += rra->row_cnt * ds_cnt;
1100 }
1101 }
1102
1103 /**
1104 * Add record to rrd file
1105 * @param file rrd file object
1106 * @param points points (must be row suitable for this RRA, depending on ds count)
1107 * @param err error pointer
1108 * @return TRUE if a row has been added
1109 */
1110 gboolean
rspamd_rrd_add_record(struct rspamd_rrd_file * file,GArray * points,gdouble ticks,GError ** err)1111 rspamd_rrd_add_record (struct rspamd_rrd_file *file,
1112 GArray *points,
1113 gdouble ticks,
1114 GError **err)
1115 {
1116 gdouble interval, *pdp_new, *pdp_temp;
1117 guint i;
1118 glong seconds, microseconds;
1119 gulong pdp_steps, cur_pdp_count, prev_pdp_step, cur_pdp_step,
1120 prev_pdp_age, cur_pdp_age, *rra_steps, pdp_offset;
1121
1122 if (file == NULL || file->stat_head->ds_cnt * sizeof (gdouble) !=
1123 points->len) {
1124 g_set_error (err,
1125 rrd_error_quark (), EINVAL,
1126 "rrd add points failed: wrong arguments");
1127 return FALSE;
1128 }
1129
1130 /* Get interval */
1131 seconds = (glong)ticks;
1132 microseconds = (glong)((ticks - seconds) * 1000000.);
1133 interval = ticks - ((gdouble)file->live_head->last_up +
1134 file->live_head->last_up_usec / 1000000.);
1135
1136 msg_debug_rrd ("update rrd record after %.3f seconds", interval);
1137
1138 /* Update PDP preparation values */
1139 pdp_new = g_malloc0 (sizeof (gdouble) * file->stat_head->ds_cnt);
1140 pdp_temp = g_malloc0 (sizeof (gdouble) * file->stat_head->ds_cnt);
1141 /* How much steps need to be updated in each RRA */
1142 rra_steps = g_malloc0 (sizeof (gulong) * file->stat_head->rra_cnt);
1143
1144 if (!rspamd_rrd_update_pdp_prep (file, (gdouble *)points->data, pdp_new,
1145 interval)) {
1146 g_set_error (err,
1147 rrd_error_quark (), EINVAL,
1148 "rrd update pdp failed: wrong arguments");
1149 g_free (pdp_new);
1150 g_free (pdp_temp);
1151 g_free (rra_steps);
1152 return FALSE;
1153 }
1154
1155 /* Calculate elapsed steps */
1156 /* Age in seconds for previous pdp store */
1157 prev_pdp_age = file->live_head->last_up % file->stat_head->pdp_step;
1158 /* Time in seconds for last pdp update */
1159 prev_pdp_step = file->live_head->last_up - prev_pdp_age;
1160 /* Age in seconds from current time to required pdp time */
1161 cur_pdp_age = seconds % file->stat_head->pdp_step;
1162 /* Time of desired pdp step */
1163 cur_pdp_step = seconds - cur_pdp_age;
1164 cur_pdp_count = cur_pdp_step / file->stat_head->pdp_step;
1165 pdp_steps = (cur_pdp_step - prev_pdp_step) / file->stat_head->pdp_step;
1166
1167
1168 if (pdp_steps == 0) {
1169 /* Simple update of pdp prep */
1170 for (i = 0; i < file->stat_head->ds_cnt; i++) {
1171 if (isnan (pdp_new[i])) {
1172 /* Increment unknown period */
1173 file->pdp_prep[i].scratch[PDP_unkn_sec_cnt].lv += floor (
1174 interval);
1175 }
1176 else {
1177 if (isnan (file->pdp_prep[i].scratch[PDP_val].dv)) {
1178 /* Reset pdp to the current value */
1179 file->pdp_prep[i].scratch[PDP_val].dv = pdp_new[i];
1180 }
1181 else {
1182 /* Increment pdp value */
1183 file->pdp_prep[i].scratch[PDP_val].dv += pdp_new[i];
1184 }
1185 }
1186 }
1187 }
1188 else {
1189 /* Complex update of PDP, CDP and RRA */
1190
1191 /* Update PDP for this step */
1192 rspamd_rrd_update_pdp_step (file,
1193 pdp_new,
1194 pdp_temp,
1195 interval,
1196 pdp_steps * file->stat_head->pdp_step);
1197
1198
1199 /* Update CDP points for each RRA*/
1200 for (i = 0; i < file->stat_head->rra_cnt; i++) {
1201 /* Calculate pdp offset for this RRA */
1202 pdp_offset = file->rra_def[i].pdp_cnt - cur_pdp_count %
1203 file->rra_def[i].pdp_cnt;
1204 /* How much steps we got for this RRA */
1205 if (pdp_offset <= pdp_steps) {
1206 rra_steps[i] =
1207 (pdp_steps - pdp_offset) / file->rra_def[i].pdp_cnt + 1;
1208 }
1209 else {
1210 /* This rra have not passed enough pdp steps */
1211 rra_steps[i] = 0;
1212 }
1213
1214 msg_debug_rrd ("cdp: %ud, rra steps: %ul(%ul), pdp steps: %ul",
1215 i, rra_steps[i], pdp_offset, pdp_steps);
1216
1217 /* Update this specific CDP */
1218 rspamd_rrd_update_cdp (file,
1219 pdp_steps,
1220 pdp_offset,
1221 rra_steps,
1222 i,
1223 pdp_temp);
1224
1225 }
1226
1227 /* Write RRA */
1228 rspamd_rrd_write_rra (file, rra_steps);
1229 }
1230 file->live_head->last_up = seconds;
1231 file->live_head->last_up_usec = microseconds;
1232
1233 /* Sync and invalidate */
1234 msync (file->map, file->size, MS_ASYNC | MS_INVALIDATE);
1235
1236 g_free (pdp_new);
1237 g_free (pdp_temp);
1238 g_free (rra_steps);
1239
1240 return TRUE;
1241 }
1242
1243 /**
1244 * Close rrd file
1245 * @param file
1246 * @return
1247 */
1248 gint
rspamd_rrd_close(struct rspamd_rrd_file * file)1249 rspamd_rrd_close (struct rspamd_rrd_file * file)
1250 {
1251 if (file == NULL) {
1252 errno = EINVAL;
1253 return -1;
1254 }
1255
1256 munmap (file->map, file->size);
1257 close (file->fd);
1258 g_free (file->filename);
1259 g_free (file->id);
1260
1261 g_free (file);
1262
1263 return 0;
1264 }
1265
1266 static struct rspamd_rrd_file *
rspamd_rrd_create_file(const gchar * path,gboolean finalize,GError ** err)1267 rspamd_rrd_create_file (const gchar *path, gboolean finalize, GError **err)
1268 {
1269 struct rspamd_rrd_file *file;
1270 struct rrd_ds_def ds[RSPAMD_RRD_DS_COUNT];
1271 struct rrd_rra_def rra[RSPAMD_RRD_RRA_COUNT];
1272 gint i;
1273 GArray ar;
1274
1275 /* Try to create new rrd file */
1276
1277 file = rspamd_rrd_create (path, RSPAMD_RRD_DS_COUNT, RSPAMD_RRD_RRA_COUNT,
1278 1, rspamd_get_calendar_ticks (), err);
1279
1280 if (file == NULL) {
1281 return NULL;
1282 }
1283
1284 /* Create DS and RRA */
1285
1286 for (i = METRIC_ACTION_REJECT; i < METRIC_ACTION_MAX; i ++) {
1287 rrd_make_default_ds (rspamd_action_to_str (i),
1288 rrd_dst_to_string (RRD_DST_COUNTER), 1, &ds[i]);
1289 }
1290
1291 ar.data = (gchar *)ds;
1292 ar.len = sizeof (ds);
1293
1294 if (!rspamd_rrd_add_ds (file, &ar, err)) {
1295 rspamd_rrd_close (file);
1296 return NULL;
1297 }
1298
1299 /* Once per minute for 1 day */
1300 rrd_make_default_rra (rrd_cf_to_string (RRD_CF_AVERAGE),
1301 60, 24 * 60, &rra[0]);
1302 /* Once per 5 minutes for 1 week */
1303 rrd_make_default_rra (rrd_cf_to_string (RRD_CF_AVERAGE),
1304 5 * 60, 7 * 24 * 60 / 5, &rra[1]);
1305 /* Once per 10 mins for 1 month */
1306 rrd_make_default_rra (rrd_cf_to_string (RRD_CF_AVERAGE),
1307 60 * 10, 30 * 24 * 6, &rra[2]);
1308 /* Once per hour for 1 year */
1309 rrd_make_default_rra (rrd_cf_to_string (RRD_CF_AVERAGE),
1310 60 * 60, 365 * 24, &rra[3]);
1311 ar.data = (gchar *)rra;
1312 ar.len = sizeof (rra);
1313
1314 if (!rspamd_rrd_add_rra (file, &ar, err)) {
1315 rspamd_rrd_close (file);
1316 return NULL;
1317 }
1318
1319 if (finalize && !rspamd_rrd_finalize (file, err)) {
1320 rspamd_rrd_close (file);
1321 return NULL;
1322 }
1323
1324 return file;
1325 }
1326
1327 static void
rspamd_rrd_convert_ds(struct rspamd_rrd_file * old,struct rspamd_rrd_file * cur,gint idx_old,gint idx_new)1328 rspamd_rrd_convert_ds (struct rspamd_rrd_file *old,
1329 struct rspamd_rrd_file *cur, gint idx_old, gint idx_new)
1330 {
1331 struct rrd_pdp_prep *pdp_prep_old, *pdp_prep_new;
1332 struct rrd_cdp_prep *cdp_prep_old, *cdp_prep_new;
1333 gdouble *val_old, *val_new;
1334 gulong rra_cnt, i, j, points_cnt, old_ds, new_ds;
1335
1336 rra_cnt = old->stat_head->rra_cnt;
1337 pdp_prep_old = &old->pdp_prep[idx_old];
1338 pdp_prep_new = &cur->pdp_prep[idx_new];
1339 memcpy (pdp_prep_new, pdp_prep_old, sizeof (*pdp_prep_new));
1340 val_old = old->rrd_value;
1341 val_new = cur->rrd_value;
1342 old_ds = old->stat_head->ds_cnt;
1343 new_ds = cur->stat_head->ds_cnt;
1344
1345 for (i = 0; i < rra_cnt; i++) {
1346 cdp_prep_old = &old->cdp_prep[i * old_ds] + idx_old;
1347 cdp_prep_new = &cur->cdp_prep[i * new_ds] + idx_new;
1348 memcpy (cdp_prep_new, cdp_prep_old, sizeof (*cdp_prep_new));
1349 points_cnt = old->rra_def[i].row_cnt;
1350
1351 for (j = 0; j < points_cnt; j ++) {
1352 val_new[j * new_ds + idx_new] = val_old[j * old_ds + idx_old];
1353 }
1354
1355 val_new += points_cnt * new_ds;
1356 val_old += points_cnt * old_ds;
1357 }
1358 }
1359
1360 static struct rspamd_rrd_file *
rspamd_rrd_convert(const gchar * path,struct rspamd_rrd_file * old,GError ** err)1361 rspamd_rrd_convert (const gchar *path, struct rspamd_rrd_file *old,
1362 GError **err)
1363 {
1364 struct rspamd_rrd_file *rrd;
1365 gchar tpath[PATH_MAX];
1366
1367 g_assert (old != NULL);
1368
1369 rspamd_snprintf (tpath, sizeof (tpath), "%s.new", path);
1370 rrd = rspamd_rrd_create_file (tpath, TRUE, err);
1371
1372 if (rrd) {
1373 /* Copy old data */
1374 memcpy (rrd->live_head, old->live_head, sizeof (*rrd->live_head));
1375 memcpy (rrd->rra_ptr, old->rra_ptr,
1376 sizeof (*old->rra_ptr) * rrd->stat_head->rra_cnt);
1377
1378 /*
1379 * Old DSes:
1380 * 0 - spam -> reject
1381 * 1 - probable spam -> add header
1382 * 2 - greylist -> greylist
1383 * 3 - ham -> ham
1384 */
1385 rspamd_rrd_convert_ds (old, rrd, 0, METRIC_ACTION_REJECT);
1386 rspamd_rrd_convert_ds (old, rrd, 1, METRIC_ACTION_ADD_HEADER);
1387 rspamd_rrd_convert_ds (old, rrd, 2, METRIC_ACTION_GREYLIST);
1388 rspamd_rrd_convert_ds (old, rrd, 3, METRIC_ACTION_NOACTION);
1389
1390 if (unlink (path) == -1) {
1391 g_set_error (err, rrd_error_quark (), errno, "cannot unlink old rrd file %s: %s",
1392 path, strerror (errno));
1393 unlink (tpath);
1394 rspamd_rrd_close (rrd);
1395
1396 return NULL;
1397 }
1398
1399 if (rename (tpath, path) == -1) {
1400 g_set_error (err, rrd_error_quark (), errno, "cannot rename old rrd file %s: %s",
1401 path, strerror (errno));
1402 unlink (tpath);
1403 rspamd_rrd_close (rrd);
1404
1405 return NULL;
1406 }
1407 }
1408
1409 return rrd;
1410 }
1411
1412 struct rspamd_rrd_file *
rspamd_rrd_file_default(const gchar * path,GError ** err)1413 rspamd_rrd_file_default (const gchar *path,
1414 GError **err)
1415 {
1416 struct rspamd_rrd_file *file, *nf;
1417
1418 g_assert (path != NULL);
1419
1420 if (access (path, R_OK) != -1) {
1421 /* We can open rrd file */
1422 file = rspamd_rrd_open (path, err);
1423
1424 if (file == NULL) {
1425 return NULL;
1426 }
1427
1428
1429 if (file->stat_head->rra_cnt != RSPAMD_RRD_RRA_COUNT) {
1430 msg_err_rrd ("rrd file is not suitable for rspamd: it has "
1431 "%ul ds and %ul rra", file->stat_head->ds_cnt,
1432 file->stat_head->rra_cnt);
1433 g_set_error (err, rrd_error_quark (), EINVAL, "bad rrd file");
1434 rspamd_rrd_close (file);
1435
1436 return NULL;
1437 }
1438 else if (file->stat_head->ds_cnt == RSPAMD_RRD_OLD_DS_COUNT) {
1439 /* Old rrd, need to convert */
1440 msg_info_rrd ("rrd file %s is not suitable for rspamd, convert it",
1441 path);
1442
1443 nf = rspamd_rrd_convert (path, file, err);
1444 rspamd_rrd_close (file);
1445
1446 return nf;
1447 }
1448 else if (file->stat_head->ds_cnt == RSPAMD_RRD_DS_COUNT) {
1449 return file;
1450 }
1451 else {
1452 msg_err_rrd ("rrd file is not suitable for rspamd: it has "
1453 "%ul ds and %ul rra", file->stat_head->ds_cnt,
1454 file->stat_head->rra_cnt);
1455 g_set_error (err, rrd_error_quark (), EINVAL, "bad rrd file");
1456 rspamd_rrd_close (file);
1457
1458 return NULL;
1459 }
1460 }
1461
1462 file = rspamd_rrd_create_file (path, TRUE, err);
1463
1464 return file;
1465 }
1466
1467 struct rspamd_rrd_query_result *
rspamd_rrd_query(struct rspamd_rrd_file * file,gulong rra_num)1468 rspamd_rrd_query (struct rspamd_rrd_file *file,
1469 gulong rra_num)
1470 {
1471 struct rspamd_rrd_query_result *res;
1472 struct rrd_rra_def *rra;
1473 const gdouble *rra_offset = NULL;
1474 guint i;
1475
1476 g_assert (file != NULL);
1477
1478
1479 if (rra_num > file->stat_head->rra_cnt) {
1480 msg_err_rrd ("requested unexisting rra: %l", rra_num);
1481
1482 return NULL;
1483 }
1484
1485 res = g_malloc0 (sizeof (*res));
1486 res->ds_count = file->stat_head->ds_cnt;
1487 res->last_update = (gdouble)file->live_head->last_up +
1488 ((gdouble)file->live_head->last_up_usec / 1e6f);
1489 res->pdp_per_cdp = file->rra_def[rra_num].pdp_cnt;
1490 res->rra_rows = file->rra_def[rra_num].row_cnt;
1491 rra_offset = file->rrd_value;
1492
1493 for (i = 0; i < file->stat_head->rra_cnt; i++) {
1494 rra = &file->rra_def[i];
1495
1496 if (i == rra_num) {
1497 res->cur_row = file->rra_ptr[i].cur_row % rra->row_cnt;
1498 break;
1499 }
1500
1501 rra_offset += rra->row_cnt * res->ds_count;
1502 }
1503
1504 res->data = rra_offset;
1505
1506 return res;
1507 }
1508