1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 **/
19
20 #include "common.h"
21 #include "log.h"
22 #include "threads.h"
23
24 #include "db.h"
25 #include "dbcache.h"
26 #include "ipc.h"
27 #include "mutexs.h"
28 #include "zbxserver.h"
29 #include "proxy.h"
30 #include "events.h"
31 #include "memalloc.h"
32 #include "zbxalgo.h"
33 #include "valuecache.h"
34
35 static zbx_mem_info_t *hc_index_mem = NULL;
36 static zbx_mem_info_t *hc_mem = NULL;
37 static zbx_mem_info_t *trend_mem = NULL;
38
39 #define LOCK_CACHE zbx_mutex_lock(&cache_lock)
40 #define UNLOCK_CACHE zbx_mutex_unlock(&cache_lock)
41 #define LOCK_TRENDS zbx_mutex_lock(&trends_lock)
42 #define UNLOCK_TRENDS zbx_mutex_unlock(&trends_lock)
43 #define LOCK_CACHE_IDS zbx_mutex_lock(&cache_ids_lock)
44 #define UNLOCK_CACHE_IDS zbx_mutex_unlock(&cache_ids_lock)
45
46 static ZBX_MUTEX cache_lock = ZBX_MUTEX_NULL;
47 static ZBX_MUTEX trends_lock = ZBX_MUTEX_NULL;
48 static ZBX_MUTEX cache_ids_lock = ZBX_MUTEX_NULL;
49
50 static char *sql = NULL;
51 static size_t sql_alloc = 64 * ZBX_KIBIBYTE;
52
53 extern unsigned char program_type;
54
55 #define ZBX_IDS_SIZE 10
56
57 #define ZBX_HC_ITEMS_INIT_SIZE 1000
58
59 #define ZBX_TRENDS_CLEANUP_TIME ((SEC_PER_HOUR * 55) / 60)
60
61 /* the maximum time spent synchronizing history */
62 #define ZBX_HC_SYNC_TIME_MAX SEC_PER_MIN
63
64 /* the maximum number of items in one synchronization batch */
65 #define ZBX_HC_SYNC_MAX 1000
66
67 /* the minimum processed item percentage of item candidates to continue synchronizing */
68 #define ZBX_HC_SYNC_MIN_PCNT 10
69
70 typedef struct
71 {
72 char table_name[ZBX_TABLENAME_LEN_MAX];
73 zbx_uint64_t lastid;
74 }
75 ZBX_DC_ID;
76
77 typedef struct
78 {
79 ZBX_DC_ID id[ZBX_IDS_SIZE];
80 }
81 ZBX_DC_IDS;
82
83 static ZBX_DC_IDS *ids = NULL;
84
85 #define ZBX_DC_FLAG_META 0x01 /* contains meta information (lastlogsize and mtime) */
86 #define ZBX_DC_FLAG_NOVALUE 0x02 /* entry contains no value */
87 #define ZBX_DC_FLAG_LLD 0x04 /* low-level discovery value */
88 #define ZBX_DC_FLAG_UNDEF 0x08 /* unsupported or undefined (delta calculation failed) value */
89
90 typedef struct
91 {
92 zbx_uint64_t itemid;
93 history_value_t value_orig; /* uninitialized if ZBX_DC_FLAG_NOVALUE is set */
94 history_value_t value; /* uninitialized if ZBX_DC_FLAG_NOVALUE is set, source for log items */
95 zbx_uint64_t lastlogsize;
96 zbx_timespec_t ts;
97 int timestamp; /* uninitialized if ZBX_DC_FLAG_NOVALUE is set */
98 int severity; /* uninitialized if ZBX_DC_FLAG_NOVALUE is set */
99 int logeventid; /* uninitialized if ZBX_DC_FLAG_NOVALUE is set */
100 int mtime;
101 unsigned char value_type;
102 unsigned char flags; /* see ZBX_DC_FLAG_* above */
103 unsigned char keep_history;
104 unsigned char keep_trends;
105 unsigned char state;
106 }
107 ZBX_DC_HISTORY;
108
109 typedef struct
110 {
111 zbx_uint64_t hostid;
112 const char *field_name;
113 char *value_esc;
114 }
115 zbx_inventory_value_t;
116
117 /* value_avg_t structure is used for item average value trend calculations. */
118 /* */
119 /* For double values the average value is calculated on the fly with the */
120 /* following formula: avg = (dbl * count + value) / (count + 1) and stored */
121 /* into dbl member. */
122 /* For uint64 values the item values are summed into ui64 member and the */
123 /* average value is calculated before flushing trends to database: */
124 /* avg = ui64 / count */
125 typedef union
126 {
127 double dbl;
128 zbx_uint128_t ui64;
129 }
130 value_avg_t;
131
132 typedef struct
133 {
134 zbx_uint64_t itemid;
135 history_value_t value_min;
136 value_avg_t value_avg;
137 history_value_t value_max;
138 int clock;
139 int num;
140 int disable_from;
141 unsigned char value_type;
142 }
143 ZBX_DC_TREND;
144
145 typedef struct
146 {
147 zbx_uint64_t history_counter; /* the total number of processed values */
148 zbx_uint64_t history_float_counter; /* the number of processed float values */
149 zbx_uint64_t history_uint_counter; /* the number of processed uint values */
150 zbx_uint64_t history_str_counter; /* the number of processed str values */
151 zbx_uint64_t history_log_counter; /* the number of processed log values */
152 zbx_uint64_t history_text_counter; /* the number of processed text values */
153 zbx_uint64_t notsupported_counter; /* the number of processed not supported items */
154 }
155 ZBX_DC_STATS;
156
157 typedef struct
158 {
159 zbx_hashset_t trends;
160 ZBX_DC_STATS stats;
161
162 zbx_hashset_t history_items;
163 zbx_binary_heap_t history_queue;
164
165 int history_num;
166 int trends_num;
167 int trends_last_cleanup_hour;
168 }
169 ZBX_DC_CACHE;
170
171 static ZBX_DC_CACHE *cache = NULL;
172
173 /* local history cache */
174 #define ZBX_MAX_VALUES_LOCAL 256
175 #define ZBX_STRUCT_REALLOC_STEP 8
176 #define ZBX_STRING_REALLOC_STEP ZBX_KIBIBYTE
177
178 typedef struct
179 {
180 size_t pvalue;
181 size_t len;
182 }
183 dc_value_str_t;
184
185 typedef struct
186 {
187 double value_dbl;
188 zbx_uint64_t value_uint;
189 dc_value_str_t value_str;
190 }
191 dc_value_t;
192
193 typedef struct
194 {
195 zbx_uint64_t itemid;
196 dc_value_t value;
197 zbx_timespec_t ts;
198 dc_value_str_t source; /* for log items only */
199 zbx_uint64_t lastlogsize;
200 int timestamp; /* for log items only */
201 int severity; /* for log items only */
202 int logeventid; /* for log items only */
203 int mtime;
204 unsigned char value_type;
205 unsigned char state;
206 unsigned char flags; /* see ZBX_DC_FLAG_* above */
207 }
208 dc_item_value_t;
209
210 static char *string_values = NULL;
211 static size_t string_values_alloc = 0, string_values_offset = 0;
212 static dc_item_value_t *item_values = NULL;
213 static size_t item_values_alloc = 0, item_values_num = 0;
214
215 static void hc_add_item_values(dc_item_value_t *item_values, int item_values_num);
216 static void hc_pop_items(zbx_vector_ptr_t *history_items);
217 static void hc_get_item_values(ZBX_DC_HISTORY *history, zbx_vector_ptr_t *history_items);
218 static void hc_push_busy_items(zbx_vector_ptr_t *history_items);
219 static int hc_push_processed_items(zbx_vector_ptr_t *history_items);
220 static void hc_free_item_values(ZBX_DC_HISTORY *history, int history_num);
221 static void hc_queue_item(zbx_hc_item_t *item);
222 static int hc_queue_elem_compare_func(const void *d1, const void *d2);
223
224 /******************************************************************************
225 * *
226 * Function: DCget_stats *
227 * *
228 * Purpose: get statistics of the database cache *
229 * *
230 * Author: Alexander Vladishev *
231 * *
232 ******************************************************************************/
DCget_stats(int request)233 void *DCget_stats(int request)
234 {
235 static zbx_uint64_t value_uint;
236 static double value_double;
237 void *ret;
238
239 LOCK_CACHE;
240
241 switch (request)
242 {
243 case ZBX_STATS_HISTORY_COUNTER:
244 value_uint = cache->stats.history_counter;
245 ret = (void *)&value_uint;
246 break;
247 case ZBX_STATS_HISTORY_FLOAT_COUNTER:
248 value_uint = cache->stats.history_float_counter;
249 ret = (void *)&value_uint;
250 break;
251 case ZBX_STATS_HISTORY_UINT_COUNTER:
252 value_uint = cache->stats.history_uint_counter;
253 ret = (void *)&value_uint;
254 break;
255 case ZBX_STATS_HISTORY_STR_COUNTER:
256 value_uint = cache->stats.history_str_counter;
257 ret = (void *)&value_uint;
258 break;
259 case ZBX_STATS_HISTORY_LOG_COUNTER:
260 value_uint = cache->stats.history_log_counter;
261 ret = (void *)&value_uint;
262 break;
263 case ZBX_STATS_HISTORY_TEXT_COUNTER:
264 value_uint = cache->stats.history_text_counter;
265 ret = (void *)&value_uint;
266 break;
267 case ZBX_STATS_NOTSUPPORTED_COUNTER:
268 value_uint = cache->stats.notsupported_counter;
269 ret = (void *)&value_uint;
270 break;
271 case ZBX_STATS_HISTORY_TOTAL:
272 value_uint = hc_mem->total_size;
273 ret = (void *)&value_uint;
274 break;
275 case ZBX_STATS_HISTORY_USED:
276 value_uint = hc_mem->total_size - hc_mem->free_size;
277 ret = (void *)&value_uint;
278 break;
279 case ZBX_STATS_HISTORY_FREE:
280 value_uint = hc_mem->free_size;
281 ret = (void *)&value_uint;
282 break;
283 case ZBX_STATS_HISTORY_PFREE:
284 value_double = 100 * (double)hc_mem->free_size / hc_mem->total_size;
285 ret = (void *)&value_double;
286 break;
287 case ZBX_STATS_TREND_TOTAL:
288 value_uint = trend_mem->orig_size;
289 ret = (void *)&value_uint;
290 break;
291 case ZBX_STATS_TREND_USED:
292 value_uint = trend_mem->orig_size - trend_mem->free_size;
293 ret = (void *)&value_uint;
294 break;
295 case ZBX_STATS_TREND_FREE:
296 value_uint = trend_mem->free_size;
297 ret = (void *)&value_uint;
298 break;
299 case ZBX_STATS_TREND_PFREE:
300 value_double = 100 * (double)trend_mem->free_size / trend_mem->orig_size;
301 ret = (void *)&value_double;
302 break;
303 case ZBX_STATS_HISTORY_INDEX_TOTAL:
304 value_uint = hc_index_mem->total_size;
305 ret = (void *)&value_uint;
306 break;
307 case ZBX_STATS_HISTORY_INDEX_USED:
308 value_uint = hc_index_mem->total_size - hc_index_mem->free_size;
309 ret = (void *)&value_uint;
310 break;
311 case ZBX_STATS_HISTORY_INDEX_FREE:
312 value_uint = hc_index_mem->free_size;
313 ret = (void *)&value_uint;
314 break;
315 case ZBX_STATS_HISTORY_INDEX_PFREE:
316 value_double = 100 * (double)hc_index_mem->free_size / hc_index_mem->total_size;
317 ret = (void *)&value_double;
318 break;
319 default:
320 ret = NULL;
321 }
322
323 UNLOCK_CACHE;
324
325 return ret;
326 }
327
328 /******************************************************************************
329 * *
330 * Function: DCget_trend *
331 * *
332 * Purpose: find existing or add new structure and return pointer *
333 * *
334 * Return value: pointer to a trend structure *
335 * *
336 * Author: Alexander Vladishev *
337 * *
338 ******************************************************************************/
DCget_trend(zbx_uint64_t itemid)339 static ZBX_DC_TREND *DCget_trend(zbx_uint64_t itemid)
340 {
341 ZBX_DC_TREND *ptr, trend;
342
343 if (NULL != (ptr = (ZBX_DC_TREND *)zbx_hashset_search(&cache->trends, &itemid)))
344 return ptr;
345
346 memset(&trend, 0, sizeof(ZBX_DC_TREND));
347 trend.itemid = itemid;
348
349 return (ZBX_DC_TREND *)zbx_hashset_insert(&cache->trends, &trend, sizeof(ZBX_DC_TREND));
350 }
351
352 /******************************************************************************
353 * *
354 * Function: DCflush_trends *
355 * *
356 * Purpose: flush trend to the database *
357 * *
358 * Author: Alexander Vladishev *
359 * *
360 ******************************************************************************/
DCflush_trends(ZBX_DC_TREND * trends,int * trends_num,int update_cache)361 static void DCflush_trends(ZBX_DC_TREND *trends, int *trends_num, int update_cache)
362 {
363 const char *__function_name = "DCflush_trends";
364 DB_RESULT result;
365 DB_ROW row;
366 size_t sql_offset;
367 int num, i, clock, inserts_num = 0, ids_alloc, ids_num = 0, trends_to = *trends_num;
368 history_value_t value_min, value_avg, value_max;
369 unsigned char value_type;
370 zbx_uint64_t *ids = NULL, itemid;
371 ZBX_DC_TREND *trend = NULL;
372 const char *table_name;
373 zbx_db_insert_t db_insert;
374
375 zabbix_log(LOG_LEVEL_DEBUG, "In %s() trends_num:%d", __function_name, *trends_num);
376
377 clock = trends[0].clock;
378 value_type = trends[0].value_type;
379
380 switch (value_type)
381 {
382 case ITEM_VALUE_TYPE_FLOAT:
383 table_name = "trends";
384 break;
385 case ITEM_VALUE_TYPE_UINT64:
386 table_name = "trends_uint";
387 break;
388 default:
389 assert(0);
390 }
391
392 ids_alloc = MIN(ZBX_HC_SYNC_MAX, *trends_num);
393 ids = zbx_malloc(ids, ids_alloc * sizeof(zbx_uint64_t));
394
395 for (i = 0; i < *trends_num; i++)
396 {
397 trend = &trends[i];
398
399 if (clock != trend->clock || value_type != trend->value_type)
400 continue;
401
402 inserts_num++;
403
404 if (0 != trend->disable_from)
405 continue;
406
407 uint64_array_add(&ids, &ids_alloc, &ids_num, trend->itemid, 64);
408
409 if (ZBX_HC_SYNC_MAX == ids_num)
410 {
411 trends_to = i + 1;
412 break;
413 }
414 }
415
416 if (0 != ids_num)
417 {
418 sql_offset = 0;
419 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
420 "select distinct itemid"
421 " from %s"
422 " where clock>=%d and",
423 table_name, clock);
424
425 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", ids, ids_num);
426
427 result = DBselect("%s", sql);
428
429 while (NULL != (row = DBfetch(result)))
430 {
431 ZBX_STR2UINT64(itemid, row[0]);
432 uint64_array_remove(ids, &ids_num, &itemid, 1);
433 }
434 DBfree_result(result);
435
436 while (0 != ids_num)
437 {
438 itemid = ids[--ids_num];
439
440 for (i = 0; i < trends_to; i++)
441 {
442 trend = &trends[i];
443
444 if (itemid != trend->itemid)
445 continue;
446
447 if (clock != trend->clock || value_type != trend->value_type)
448 continue;
449
450 trend->disable_from = clock;
451 break;
452 }
453 }
454 }
455
456 for (i = 0; i < trends_to; i++)
457 {
458 trend = &trends[i];
459
460 if (clock != trend->clock || value_type != trend->value_type)
461 continue;
462
463 if (0 != trend->disable_from && trend->disable_from <= clock)
464 continue;
465
466 uint64_array_add(&ids, &ids_alloc, &ids_num, trend->itemid, 64);
467 }
468
469 if (0 != ids_num)
470 {
471 sql_offset = 0;
472 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
473 "select itemid,num,value_min,value_avg,value_max"
474 " from %s"
475 " where clock=%d and",
476 table_name, clock);
477
478 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", ids, ids_num);
479
480 result = DBselect("%s", sql);
481
482 sql_offset = 0;
483 DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
484
485 while (NULL != (row = DBfetch(result)))
486 {
487 ZBX_STR2UINT64(itemid, row[0]);
488
489 for (i = 0; i < trends_to; i++)
490 {
491 trend = &trends[i];
492
493 if (itemid != trend->itemid)
494 continue;
495
496 if (clock != trend->clock || value_type != trend->value_type)
497 continue;
498
499 break;
500 }
501
502 if (i == trends_to)
503 {
504 THIS_SHOULD_NEVER_HAPPEN;
505 continue;
506 }
507
508 num = atoi(row[1]);
509
510 if (value_type == ITEM_VALUE_TYPE_FLOAT)
511 {
512 value_min.dbl = atof(row[2]);
513 value_avg.dbl = atof(row[3]);
514 value_max.dbl = atof(row[4]);
515
516 if (value_min.dbl < trend->value_min.dbl)
517 trend->value_min.dbl = value_min.dbl;
518 if (value_max.dbl > trend->value_max.dbl)
519 trend->value_max.dbl = value_max.dbl;
520 trend->value_avg.dbl = (trend->num * trend->value_avg.dbl
521 + num * value_avg.dbl) / (trend->num + num);
522 trend->num += num;
523
524 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
525 "update trends set num=%d,value_min=" ZBX_FS_DBL ",value_avg="
526 ZBX_FS_DBL ",value_max=" ZBX_FS_DBL " where itemid=" ZBX_FS_UI64
527 " and clock=%d;\n",
528 trend->num,
529 trend->value_min.dbl,
530 trend->value_avg.dbl,
531 trend->value_max.dbl,
532 trend->itemid,
533 trend->clock);
534 }
535 else
536 {
537 zbx_uint128_t avg;
538
539 ZBX_STR2UINT64(value_min.ui64, row[2]);
540 ZBX_STR2UINT64(value_avg.ui64, row[3]);
541 ZBX_STR2UINT64(value_max.ui64, row[4]);
542
543 if (value_min.ui64 < trend->value_min.ui64)
544 trend->value_min.ui64 = value_min.ui64;
545 if (value_max.ui64 > trend->value_max.ui64)
546 trend->value_max.ui64 = value_max.ui64;
547
548 /* calculate the trend average value */
549 umul64_64(&avg, num, value_avg.ui64);
550 uinc128_128(&trend->value_avg.ui64, &avg);
551 udiv128_64(&avg, &trend->value_avg.ui64, trend->num + num);
552
553 trend->num += num;
554
555 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
556 "update trends_uint set num=%d,value_min=" ZBX_FS_UI64 ",value_avg="
557 ZBX_FS_UI64 ",value_max=" ZBX_FS_UI64 " where itemid=" ZBX_FS_UI64
558 " and clock=%d;\n",
559 trend->num,
560 trend->value_min.ui64,
561 avg.lo,
562 trend->value_max.ui64,
563 trend->itemid,
564 trend->clock);
565 }
566
567 trend->itemid = 0;
568
569 inserts_num--;
570
571 DBexecute_overflowed_sql(&sql, &sql_alloc, &sql_offset);
572 }
573 DBfree_result(result);
574
575 DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
576
577 if (sql_offset > 16) /* In ORACLE always present begin..end; */
578 DBexecute("%s", sql);
579 }
580
581 zbx_free(ids);
582
583 /* if 'trends' is not a primary trends buffer */
584 if (0 != update_cache)
585 {
586 /* we update it too */
587 LOCK_TRENDS;
588
589 for (i = 0; i < trends_to; i++)
590 {
591 if (0 == trends[i].itemid)
592 continue;
593
594 if (clock != trends[i].clock || value_type != trends[i].value_type)
595 continue;
596
597 if (0 == trends[i].disable_from || trends[i].disable_from > clock)
598 continue;
599
600 if (NULL != (trend = zbx_hashset_search(&cache->trends, &trends[i].itemid)))
601 trend->disable_from = clock + SEC_PER_HOUR;
602 }
603
604 UNLOCK_TRENDS;
605 }
606
607 if (0 != inserts_num)
608 {
609 zbx_db_insert_prepare(&db_insert, table_name, "itemid", "clock", "num", "value_min", "value_avg",
610 "value_max", NULL);
611
612 for (i = 0; i < trends_to; i++)
613 {
614 trend = &trends[i];
615
616 if (0 == trend->itemid)
617 continue;
618
619 if (clock != trend->clock || value_type != trend->value_type)
620 continue;
621
622 if (ITEM_VALUE_TYPE_FLOAT == value_type)
623 {
624 zbx_db_insert_add_values(&db_insert, trend->itemid, trend->clock, trend->num,
625 trend->value_min.dbl, trend->value_avg.dbl, trend->value_max.dbl);
626 }
627 else
628 {
629 zbx_uint128_t avg;
630
631 /* calculate the trend average value */
632 udiv128_64(&avg, &trend->value_avg.ui64, trend->num);
633
634 zbx_db_insert_add_values(&db_insert, trend->itemid, trend->clock, trend->num,
635 trend->value_min.ui64, avg.lo, trend->value_max.ui64);
636 }
637
638 trend->itemid = 0;
639 }
640
641 zbx_db_insert_execute(&db_insert);
642 zbx_db_insert_clean(&db_insert);
643 }
644
645 /* clean trends */
646 for (i = 0, num = 0; i < *trends_num; i++)
647 {
648 if (0 == trends[i].itemid)
649 continue;
650
651 memcpy(&trends[num++], &trends[i], sizeof(ZBX_DC_TREND));
652 }
653 *trends_num = num;
654
655 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
656 }
657
658 /******************************************************************************
659 * *
660 * Function: DCflush_trend *
661 * *
662 * Purpose: move trend to the array of trends for flushing to DB *
663 * *
664 * Author: Alexander Vladishev *
665 * *
666 ******************************************************************************/
DCflush_trend(ZBX_DC_TREND * trend,ZBX_DC_TREND ** trends,int * trends_alloc,int * trends_num)667 static void DCflush_trend(ZBX_DC_TREND *trend, ZBX_DC_TREND **trends, int *trends_alloc, int *trends_num)
668 {
669 if (*trends_num == *trends_alloc)
670 {
671 *trends_alloc += 256;
672 *trends = zbx_realloc(*trends, *trends_alloc * sizeof(ZBX_DC_TREND));
673 }
674
675 memcpy(&(*trends)[*trends_num], trend, sizeof(ZBX_DC_TREND));
676 (*trends_num)++;
677
678 trend->clock = 0;
679 trend->num = 0;
680 memset(&trend->value_min, 0, sizeof(history_value_t));
681 memset(&trend->value_avg, 0, sizeof(value_avg_t));
682 memset(&trend->value_max, 0, sizeof(history_value_t));
683 }
684
685 /******************************************************************************
686 * *
687 * Function: DCadd_trend *
688 * *
689 * Purpose: add new value to the trends *
690 * *
691 * Author: Alexander Vladishev *
692 * *
693 ******************************************************************************/
DCadd_trend(const ZBX_DC_HISTORY * history,ZBX_DC_TREND ** trends,int * trends_alloc,int * trends_num)694 static void DCadd_trend(const ZBX_DC_HISTORY *history, ZBX_DC_TREND **trends, int *trends_alloc, int *trends_num)
695 {
696 ZBX_DC_TREND *trend = NULL;
697 int hour;
698
699 hour = history->ts.sec - history->ts.sec % SEC_PER_HOUR;
700
701 trend = DCget_trend(history->itemid);
702
703 if (trend->num > 0 && (trend->clock != hour || trend->value_type != history->value_type))
704 DCflush_trend(trend, trends, trends_alloc, trends_num);
705
706 trend->value_type = history->value_type;
707 trend->clock = hour;
708
709 switch (trend->value_type)
710 {
711 case ITEM_VALUE_TYPE_FLOAT:
712 if (trend->num == 0 || history->value.dbl < trend->value_min.dbl)
713 trend->value_min.dbl = history->value.dbl;
714 if (trend->num == 0 || history->value.dbl > trend->value_max.dbl)
715 trend->value_max.dbl = history->value.dbl;
716 trend->value_avg.dbl = (trend->num * trend->value_avg.dbl
717 + history->value.dbl) / (trend->num + 1);
718 break;
719 case ITEM_VALUE_TYPE_UINT64:
720 if (trend->num == 0 || history->value.ui64 < trend->value_min.ui64)
721 trend->value_min.ui64 = history->value.ui64;
722 if (trend->num == 0 || history->value.ui64 > trend->value_max.ui64)
723 trend->value_max.ui64 = history->value.ui64;
724 uinc128_64(&trend->value_avg.ui64, history->value.ui64);
725 break;
726 }
727 trend->num++;
728 }
729
730 /******************************************************************************
731 * *
732 * Function: DCmass_update_trends *
733 * *
734 * Parameters: history - array of history data *
735 * history_num - number of history structures *
736 * *
737 * Author: Alexander Vladishev *
738 * *
739 ******************************************************************************/
DCmass_update_trends(ZBX_DC_HISTORY * history,int history_num)740 static void DCmass_update_trends(ZBX_DC_HISTORY *history, int history_num)
741 {
742 const char *__function_name = "DCmass_update_trends";
743 ZBX_DC_TREND *trends = NULL;
744 zbx_timespec_t ts;
745 int trends_alloc = 0, trends_num = 0, i, hour, seconds;
746
747 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
748
749 zbx_timespec(&ts);
750 seconds = ts.sec % SEC_PER_HOUR;
751 hour = ts.sec - seconds;
752
753 LOCK_TRENDS;
754
755 for (i = 0; i < history_num; i++)
756 {
757 const ZBX_DC_HISTORY *h = &history[i];
758
759 if (0 != (ZBX_DC_FLAG_UNDEF & h->flags) || 0 != (ZBX_DC_FLAG_NOVALUE & h->flags))
760 continue;
761
762 if (0 == h->keep_trends)
763 continue;
764
765 DCadd_trend(h, &trends, &trends_alloc, &trends_num);
766 }
767
768 if (cache->trends_last_cleanup_hour < hour && ZBX_TRENDS_CLEANUP_TIME < seconds)
769 {
770 zbx_hashset_iter_t iter;
771 ZBX_DC_TREND *trend;
772
773 zbx_hashset_iter_reset(&cache->trends, &iter);
774
775 while (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_iter_next(&iter)))
776 {
777 if (trend->clock != hour)
778 {
779 DCflush_trend(trend, &trends, &trends_alloc, &trends_num);
780 zbx_hashset_iter_remove(&iter);
781 }
782 }
783
784 cache->trends_last_cleanup_hour = hour;
785 }
786
787 UNLOCK_TRENDS;
788
789 while (0 < trends_num)
790 DCflush_trends(trends, &trends_num, 1);
791
792 zbx_free(trends);
793
794 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
795 }
796
797 /******************************************************************************
798 * *
799 * Function: DCsync_trends *
800 * *
801 * Purpose: flush all trends to the database *
802 * *
803 * Author: Alexander Vladishev *
804 * *
805 ******************************************************************************/
DCsync_trends()806 static void DCsync_trends()
807 {
808 const char *__function_name = "DCsync_trends";
809 zbx_hashset_iter_t iter;
810 ZBX_DC_TREND *trends = NULL, *trend;
811 int trends_alloc = 0, trends_num = 0;
812
813 zabbix_log(LOG_LEVEL_DEBUG, "In %s() trends_num:%d", __function_name, cache->trends_num);
814
815 zabbix_log(LOG_LEVEL_WARNING, "syncing trends data...");
816
817 LOCK_TRENDS;
818
819 zbx_hashset_iter_reset(&cache->trends, &iter);
820
821 while (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_iter_next(&iter)))
822 DCflush_trend(trend, &trends, &trends_alloc, &trends_num);
823
824 UNLOCK_TRENDS;
825
826 DBbegin();
827
828 while (trends_num > 0)
829 DCflush_trends(trends, &trends_num, 0);
830
831 DBcommit();
832
833 zbx_free(trends);
834
835 zabbix_log(LOG_LEVEL_WARNING, "syncing trends data done");
836
837 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
838 }
839
840 /******************************************************************************
841 * *
842 * Function: DCmass_update_triggers *
843 * *
844 * Purpose: re-calculate and update values of triggers related to the items *
845 * *
846 * Parameters: history - array of history data *
847 * history_num - number of history structures *
848 * *
849 * Author: Alexei Vladishev, Alexander Vladishev *
850 * *
851 ******************************************************************************/
DCmass_update_triggers(ZBX_DC_HISTORY * history,int history_num)852 static void DCmass_update_triggers(ZBX_DC_HISTORY *history, int history_num)
853 {
854 const char *__function_name = "DCmass_update_triggers";
855 int i, item_num = 0;
856 zbx_uint64_t *itemids = NULL;
857 zbx_timespec_t *timespecs = NULL;
858 zbx_hashset_t trigger_info;
859 zbx_vector_ptr_t trigger_order;
860
861 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
862
863 itemids = zbx_malloc(itemids, sizeof(zbx_uint64_t) * history_num);
864 timespecs = zbx_malloc(timespecs, sizeof(zbx_timespec_t) * history_num);
865
866 for (i = 0; i < history_num; i++)
867 {
868 const ZBX_DC_HISTORY *h = &history[i];
869
870 if (0 != (ZBX_DC_FLAG_UNDEF & h->flags) || 0 != (ZBX_DC_FLAG_NOVALUE & h->flags))
871 continue;
872
873 itemids[item_num] = h->itemid;
874 timespecs[item_num] = h->ts;
875 item_num++;
876 }
877
878 if (0 == item_num)
879 goto clean_items;
880
881 zbx_hashset_create(&trigger_info, MAX(100, 2 * item_num),
882 ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
883
884 zbx_vector_ptr_create(&trigger_order);
885 zbx_vector_ptr_reserve(&trigger_order, item_num);
886
887 DCconfig_get_triggers_by_itemids(&trigger_info, &trigger_order, itemids, timespecs, NULL, item_num,
888 ZBX_EXPAND_MACROS);
889
890 if (0 == trigger_order.values_num)
891 goto clean_triggers;
892
893 evaluate_expressions(&trigger_order);
894
895 process_triggers(&trigger_order);
896
897 DCfree_triggers(&trigger_order);
898 clean_triggers:
899 zbx_hashset_destroy(&trigger_info);
900 zbx_vector_ptr_destroy(&trigger_order);
901 clean_items:
902 zbx_free(timespecs);
903 zbx_free(itemids);
904
905 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
906 }
907
DBchk_double(double value)908 static int DBchk_double(double value)
909 {
910 /* field with precision 16, scale 4 [NUMERIC(16,4)] */
911 const double pg_min_numeric = -1e12;
912 const double pg_max_numeric = 1e12;
913
914 if (value <= pg_min_numeric || value >= pg_max_numeric)
915 return FAIL;
916
917 return SUCCEED;
918 }
919
multiply_item_value_float(DC_ITEM * item,double value)920 static double multiply_item_value_float(DC_ITEM *item, double value)
921 {
922 double value_double;
923
924 if (ITEM_MULTIPLIER_USE != item->multiplier)
925 return value;
926
927 value_double = value * atof(item->formula);
928
929 zabbix_log(LOG_LEVEL_DEBUG, "multiply_item_value_float() " ZBX_FS_DBL ",%s " ZBX_FS_DBL,
930 value, item->formula, value_double);
931
932 return value_double;
933 }
934
multiply_item_value_uint64(DC_ITEM * item,zbx_uint64_t value)935 static zbx_uint64_t multiply_item_value_uint64(DC_ITEM *item, zbx_uint64_t value)
936 {
937 zbx_uint64_t formula_uint64, value_uint64;
938
939 if (ITEM_MULTIPLIER_USE != item->multiplier)
940 return value;
941
942 if (SUCCEED == is_uint64(item->formula, &formula_uint64))
943 value_uint64 = value * formula_uint64;
944 else
945 value_uint64 = (zbx_uint64_t)((double)value * atof(item->formula));
946
947 zabbix_log(LOG_LEVEL_DEBUG, "multiply_item_value_uint64() " ZBX_FS_UI64 ",%s " ZBX_FS_UI64,
948 value, item->formula, value_uint64);
949
950 return value_uint64;
951 }
952
953 /******************************************************************************
954 * *
955 * Function: DCcalculate_item_delta_float *
956 * *
957 * Purpose: calculate delta value for items of float value type *
958 * *
959 * Parameters: item - [IN] item reference *
960 * h - [IN/OUT] a reference to history cache value *
961 * deltaitem - [IN] a reference to the last raw history value *
962 * (value + timestamp) *
963 * *
964 ******************************************************************************/
DCcalculate_item_delta_float(DC_ITEM * item,ZBX_DC_HISTORY * h,zbx_item_history_value_t * deltaitem)965 static void DCcalculate_item_delta_float(DC_ITEM *item, ZBX_DC_HISTORY *h, zbx_item_history_value_t *deltaitem)
966 {
967 switch (item->delta)
968 {
969 case ITEM_STORE_AS_IS:
970 h->value.dbl = multiply_item_value_float(item, h->value_orig.dbl);
971
972 if (SUCCEED != DBchk_double(h->value.dbl))
973 {
974 h->state = ITEM_STATE_NOTSUPPORTED;
975 h->flags |= ZBX_DC_FLAG_UNDEF;
976 }
977
978 break;
979 case ITEM_STORE_SPEED_PER_SECOND:
980 if (0 != deltaitem->timestamp.sec && deltaitem->value.dbl <= h->value_orig.dbl &&
981 0 > zbx_timespec_compare(&deltaitem->timestamp, &h->ts))
982 {
983 h->value.dbl = (h->value_orig.dbl - deltaitem->value.dbl) /
984 ((h->ts.sec - deltaitem->timestamp.sec) +
985 (double)(h->ts.ns - deltaitem->timestamp.ns) / 1000000000);
986 h->value.dbl = multiply_item_value_float(item, h->value.dbl);
987
988 if (SUCCEED != DBchk_double(h->value.dbl))
989 {
990 h->state = ITEM_STATE_NOTSUPPORTED;
991 h->flags |= ZBX_DC_FLAG_UNDEF;
992 }
993 }
994 else
995 h->flags |= ZBX_DC_FLAG_UNDEF;
996
997 break;
998 case ITEM_STORE_SIMPLE_CHANGE:
999 if (0 != deltaitem->timestamp.sec && deltaitem->value.dbl <= h->value_orig.dbl)
1000 {
1001 h->value.dbl = h->value_orig.dbl - deltaitem->value.dbl;
1002 h->value.dbl = multiply_item_value_float(item, h->value.dbl);
1003
1004 if (SUCCEED != DBchk_double(h->value.dbl))
1005 {
1006 h->state = ITEM_STATE_NOTSUPPORTED;
1007 h->flags |= ZBX_DC_FLAG_UNDEF;
1008 }
1009 }
1010 else
1011 h->flags |= ZBX_DC_FLAG_UNDEF;
1012
1013 break;
1014 }
1015
1016 if (ITEM_STATE_NOTSUPPORTED == h->state)
1017 {
1018 int errcode = SUCCEED;
1019
1020 h->value_orig.err = zbx_dsprintf(NULL, "Type of received value"
1021 " [" ZBX_FS_DBL "] is not suitable for value type [%s]",
1022 h->value.dbl, zbx_item_value_type_string(item->value_type));
1023
1024 DCrequeue_items(&h->itemid, &h->state, &h->ts.sec, NULL, NULL, &errcode, 1);
1025 }
1026 }
1027
1028 /******************************************************************************
1029 * *
1030 * Function: DCcalculate_item_delta_uint64 *
1031 * *
1032 * Purpose: calculate delta value for items of uint64 value type *
1033 * *
1034 * Parameters: item - [IN] item reference *
1035 * h - [IN/OUT] a reference to history cache value *
1036 * deltaitem - [IN] a reference to the last raw history value *
1037 * (value + timestamp) *
1038 * *
1039 ******************************************************************************/
DCcalculate_item_delta_uint64(DC_ITEM * item,ZBX_DC_HISTORY * h,zbx_item_history_value_t * deltaitem)1040 static void DCcalculate_item_delta_uint64(DC_ITEM *item, ZBX_DC_HISTORY *h, zbx_item_history_value_t *deltaitem)
1041 {
1042 switch (item->delta)
1043 {
1044 case ITEM_STORE_AS_IS:
1045 h->value.ui64 = multiply_item_value_uint64(item, h->value_orig.ui64);
1046
1047 break;
1048 case ITEM_STORE_SPEED_PER_SECOND:
1049 if (0 != deltaitem->timestamp.sec && deltaitem->value.ui64 <= h->value_orig.ui64 &&
1050 0 > zbx_timespec_compare(&deltaitem->timestamp, &h->ts))
1051 {
1052 h->value.ui64 = (h->value_orig.ui64 - deltaitem->value.ui64) /
1053 ((h->ts.sec - deltaitem->timestamp.sec) +
1054 (double)(h->ts.ns - deltaitem->timestamp.ns) / 1000000000);
1055 h->value.ui64 = multiply_item_value_uint64(item, h->value.ui64);
1056 }
1057 else
1058 h->flags |= ZBX_DC_FLAG_UNDEF;
1059
1060 break;
1061 case ITEM_STORE_SIMPLE_CHANGE:
1062 if (0 != deltaitem->timestamp.sec && deltaitem->value.ui64 <= h->value_orig.ui64)
1063 {
1064 h->value.ui64 = h->value_orig.ui64 - deltaitem->value.ui64;
1065 h->value.ui64 = multiply_item_value_uint64(item, h->value.ui64);
1066 }
1067 else
1068 h->flags |= ZBX_DC_FLAG_UNDEF;
1069
1070 break;
1071 }
1072 }
1073
DCget_deltaitem(zbx_hashset_t * delta_history,DC_ITEM * item,ZBX_DC_HISTORY * h)1074 zbx_item_history_value_t *DCget_deltaitem(zbx_hashset_t *delta_history, DC_ITEM *item, ZBX_DC_HISTORY *h)
1075 {
1076 zbx_item_history_value_t *deltaitem;
1077
1078 if (ITEM_VALUE_TYPE_FLOAT != item->value_type && ITEM_VALUE_TYPE_UINT64 != item->value_type)
1079 return NULL;
1080
1081 if (ITEM_STORE_AS_IS == item->delta)
1082 return NULL;
1083
1084 deltaitem = zbx_hashset_search(delta_history, &item->itemid);
1085
1086 if (ITEM_STATE_NOTSUPPORTED == h->state)
1087 return deltaitem;
1088
1089 if (NULL == deltaitem)
1090 {
1091 zbx_item_history_value_t value = {item->itemid};
1092
1093 deltaitem = zbx_hashset_insert(delta_history, &value, sizeof(value));
1094 }
1095
1096 return deltaitem;
1097 }
1098
1099 /******************************************************************************
1100 * *
1101 * Function: DCadd_update_item_sql *
1102 * *
1103 * Purpose: 1) generate sql for updating item in database *
1104 * 2) calculate item delta value *
1105 * 3) add events (item supported/not supported) *
1106 * 4) update cache (requeue item, add nextcheck) *
1107 * *
1108 * Parameters: item - [IN/OUT] item reference *
1109 * h - [IN/OUT] a reference to history cache value *
1110 * *
1111 ******************************************************************************/
DCadd_update_item_sql(size_t * sql_offset,DC_ITEM * item,ZBX_DC_HISTORY * h,zbx_hashset_t * delta_history)1112 static void DCadd_update_item_sql(size_t *sql_offset, DC_ITEM *item, ZBX_DC_HISTORY *h,
1113 zbx_hashset_t *delta_history)
1114 {
1115 char *value_esc;
1116 const char *sql_start = "update items set ", *sql_continue = ",";
1117 zbx_item_history_value_t *deltaitem;
1118
1119 deltaitem = DCget_deltaitem(delta_history, item, h);
1120
1121 if (ITEM_STATE_NOTSUPPORTED == h->state)
1122 goto notsupported;
1123
1124 if (0 == (ZBX_DC_FLAG_NOVALUE & h->flags))
1125 {
1126 switch (item->value_type)
1127 {
1128 case ITEM_VALUE_TYPE_FLOAT:
1129 DCcalculate_item_delta_float(item, h, deltaitem);
1130 break;
1131 case ITEM_VALUE_TYPE_UINT64:
1132 DCcalculate_item_delta_uint64(item, h, deltaitem);
1133 break;
1134 }
1135 }
1136
1137 if (0 != (ZBX_DC_FLAG_META & h->flags))
1138 {
1139 zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, "%slastlogsize=" ZBX_FS_UI64 ",mtime=%d",
1140 sql_start, h->lastlogsize, h->mtime);
1141 sql_start = sql_continue;
1142 }
1143
1144 notsupported:
1145 /* update the last value (raw) of the delta items */
1146 if (NULL != deltaitem)
1147 {
1148 /* set timestamp.sec to zero to remove this record from delta items later */
1149 if (ITEM_STATE_NOTSUPPORTED == h->state || ITEM_STORE_AS_IS == item->delta)
1150 {
1151 deltaitem->timestamp.sec = 0;
1152 }
1153 else
1154 {
1155 deltaitem->timestamp = h->ts;
1156 deltaitem->value = h->value_orig;
1157 }
1158 }
1159
1160 if (ITEM_STATE_NOTSUPPORTED == h->state)
1161 {
1162 int update_cache = 0;
1163
1164 if (ITEM_STATE_NOTSUPPORTED != item->db_state)
1165 {
1166 unsigned char object;
1167
1168 zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" became not supported: %s",
1169 item->host.host, item->key_orig, h->value_orig.err);
1170
1171 object = (0 != (ZBX_FLAG_DISCOVERY_RULE & item->flags) ?
1172 EVENT_OBJECT_LLDRULE : EVENT_OBJECT_ITEM);
1173 add_event(0, EVENT_SOURCE_INTERNAL, object, item->itemid, &h->ts, h->state, NULL, NULL, 0, 0);
1174
1175 zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, "%sstate=%d", sql_start, (int)h->state);
1176 sql_start = sql_continue;
1177
1178 update_cache = 1;
1179 }
1180
1181 if (0 != strcmp(item->db_error, h->value_orig.err))
1182 {
1183 value_esc = DBdyn_escape_field("items", "error", h->value_orig.err);
1184 zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, "%serror='%s'", sql_start, value_esc);
1185 sql_start = sql_continue;
1186
1187 if (ITEM_STATE_NOTSUPPORTED == item->db_state)
1188 {
1189 zabbix_log(LOG_LEVEL_WARNING, "error reason for \"%s:%s\" changed: %s", item->host.host,
1190 item->key_orig, h->value_orig.err);
1191 }
1192
1193 zbx_free(value_esc);
1194
1195 update_cache = 1;
1196 }
1197
1198 DCadd_nextcheck(item->itemid, &h->ts, h->value_orig.err);
1199
1200 if (0 != update_cache)
1201 DCconfig_set_item_db_state(item->itemid, h->state, h->value_orig.err);
1202 }
1203 else
1204 {
1205 if (ITEM_STATE_NOTSUPPORTED == item->db_state)
1206 {
1207 zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" became supported",
1208 item->host.host, item->key_orig);
1209
1210 /* we know it's EVENT_OBJECT_ITEM because LLDRULE that becomes */
1211 /* supported is handled in lld_process_discovery_rule() */
1212 add_event(0, EVENT_SOURCE_INTERNAL, EVENT_OBJECT_ITEM, item->itemid, &h->ts, h->state,
1213 NULL, NULL, 0, 0);
1214
1215 zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, "%sstate=%d,error=''", sql_start,
1216 (int)h->state);
1217 sql_start = sql_continue;
1218
1219 DCconfig_set_item_db_state(item->itemid, h->state, "");
1220 }
1221 }
1222 if (sql_start == sql_continue)
1223 zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, " where itemid=" ZBX_FS_UI64 ";\n", item->itemid);
1224 }
1225
DCinventory_value_add(zbx_vector_ptr_t * inventory_values,DC_ITEM * item,ZBX_DC_HISTORY * h)1226 static void DCinventory_value_add(zbx_vector_ptr_t *inventory_values, DC_ITEM *item, ZBX_DC_HISTORY *h)
1227 {
1228 char value[MAX_BUFFER_LEN];
1229 const char *inventory_field;
1230 zbx_inventory_value_t *inventory_value;
1231
1232 if (ITEM_STATE_NOTSUPPORTED == h->state)
1233 return;
1234
1235 if (HOST_INVENTORY_AUTOMATIC != item->host.inventory_mode)
1236 return;
1237
1238 if (0 != (ZBX_DC_FLAG_UNDEF & h->flags) || 0 != (ZBX_DC_FLAG_NOVALUE & h->flags) ||
1239 NULL == (inventory_field = DBget_inventory_field(item->inventory_link)))
1240 {
1241 return;
1242 }
1243
1244 switch (h->value_type)
1245 {
1246 case ITEM_VALUE_TYPE_FLOAT:
1247 zbx_snprintf(value, sizeof(value), ZBX_FS_DBL, h->value.dbl);
1248 break;
1249 case ITEM_VALUE_TYPE_UINT64:
1250 zbx_snprintf(value, sizeof(value), ZBX_FS_UI64, h->value.ui64);
1251 break;
1252 case ITEM_VALUE_TYPE_STR:
1253 case ITEM_VALUE_TYPE_TEXT:
1254 strscpy(value, h->value_orig.str);
1255 break;
1256 default:
1257 return;
1258 }
1259
1260 zbx_format_value(value, sizeof(value), item->valuemapid, item->units, h->value_type);
1261
1262 inventory_value = zbx_malloc(NULL, sizeof(zbx_inventory_value_t));
1263
1264 inventory_value->hostid = item->host.hostid;
1265 inventory_value->field_name = inventory_field;
1266 inventory_value->value_esc = DBdyn_escape_field("host_inventory", inventory_field, value);
1267
1268 zbx_vector_ptr_append(inventory_values, inventory_value);
1269 }
1270
DCadd_update_inventory_sql(size_t * sql_offset,zbx_vector_ptr_t * inventory_values)1271 static void DCadd_update_inventory_sql(size_t *sql_offset, zbx_vector_ptr_t *inventory_values)
1272 {
1273 int i;
1274
1275 if (0 == inventory_values->values_num)
1276 return;
1277
1278 for (i = 0; i < inventory_values->values_num; i++)
1279 {
1280 zbx_inventory_value_t *inventory_value = (zbx_inventory_value_t *)inventory_values->values[i];
1281
1282 zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset,
1283 "update host_inventory set %s='%s' where hostid=" ZBX_FS_UI64 ";\n",
1284 inventory_value->field_name, inventory_value->value_esc, inventory_value->hostid);
1285
1286 DBexecute_overflowed_sql(&sql, &sql_alloc, sql_offset);
1287 }
1288 }
1289
DCinventory_value_free(zbx_inventory_value_t * inventory_value)1290 static void DCinventory_value_free(zbx_inventory_value_t *inventory_value)
1291 {
1292 zbx_free(inventory_value->value_esc);
1293 zbx_free(inventory_value);
1294 }
1295
1296 /******************************************************************************
1297 * *
1298 * Function: DCmass_update_items *
1299 * *
1300 * Purpose: update items info after new value is received *
1301 * *
1302 * Parameters: history - array of history data *
1303 * history_num - number of history structures *
1304 * *
1305 * Author: Alexei Vladishev, Eugene Grigorjev, Alexander Vladishev *
1306 * *
1307 ******************************************************************************/
DCmass_update_items(ZBX_DC_HISTORY * history,int history_num)1308 static void DCmass_update_items(ZBX_DC_HISTORY *history, int history_num)
1309 {
1310 const char *__function_name = "DCmass_update_items";
1311
1312 size_t sql_offset = 0;
1313 ZBX_DC_HISTORY *h;
1314 zbx_vector_uint64_t ids;
1315 DC_ITEM *items = NULL;
1316 int i, j, *errcodes = NULL;
1317 zbx_hashset_t delta_history = {NULL};
1318 zbx_vector_ptr_t inventory_values;
1319
1320 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1321
1322 items = zbx_malloc(items, sizeof(DC_ITEM) * history_num);
1323 errcodes = zbx_malloc(errcodes, sizeof(int) * history_num);
1324 zbx_hashset_create(&delta_history, 1000, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1325
1326 zbx_vector_ptr_create(&inventory_values);
1327 zbx_vector_uint64_create(&ids);
1328 zbx_vector_uint64_reserve(&ids, history_num);
1329
1330 for (i = 0; i < history_num; i++)
1331 zbx_vector_uint64_append(&ids, history[i].itemid);
1332
1333 zbx_vector_uint64_sort(&ids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1334
1335 DCconfig_get_items_by_itemids(items, ids.values, errcodes, history_num);
1336 DCget_delta_items(&delta_history, &ids);
1337
1338 zbx_vector_uint64_clear(&ids); /* item ids that are not disabled and not deleted in DB */
1339
1340 DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
1341
1342 for (i = 0; i < history_num; i++)
1343 {
1344 if (SUCCEED != errcodes[i])
1345 continue;
1346
1347 if (ITEM_STATUS_ACTIVE != items[i].status)
1348 continue;
1349
1350 if (HOST_STATUS_MONITORED != items[i].host.status)
1351 continue;
1352
1353 for (j = 0; j < history_num; j++)
1354 {
1355 if (items[i].itemid == history[j].itemid)
1356 break;
1357 }
1358
1359 if (history_num == j)
1360 {
1361 THIS_SHOULD_NEVER_HAPPEN;
1362 continue;
1363 }
1364
1365 h = &history[j];
1366
1367 if (ITEM_STATE_NORMAL == h->state && h->value_type != items[i].value_type)
1368 continue;
1369
1370 h->keep_history = (0 != items[i].history);
1371 if (ITEM_VALUE_TYPE_FLOAT == items[i].value_type || ITEM_VALUE_TYPE_UINT64 == items[i].value_type)
1372 h->keep_trends = (0 != items[i].trends);
1373
1374 DCadd_update_item_sql(&sql_offset, &items[i], h, &delta_history);
1375 DBexecute_overflowed_sql(&sql, &sql_alloc, &sql_offset);
1376
1377 DCinventory_value_add(&inventory_values, &items[i], h);
1378
1379 zbx_vector_uint64_append(&ids, items[i].itemid);
1380 }
1381
1382 zbx_vector_ptr_sort(&inventory_values, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);
1383
1384 DCadd_update_inventory_sql(&sql_offset, &inventory_values);
1385
1386 zbx_vector_ptr_clear_ext(&inventory_values, (zbx_clean_func_t)DCinventory_value_free);
1387 zbx_vector_ptr_destroy(&inventory_values);
1388
1389 /* disable processing of deleted and disabled items by setting ZBX_DC_FLAG_UNDEF flag */
1390 for (i = 0; i < history_num; i++)
1391 {
1392 if (FAIL == zbx_vector_uint64_bsearch(&ids, history[i].itemid, ZBX_DEFAULT_UINT64_COMPARE_FUNC))
1393 history[i].flags |= ZBX_DC_FLAG_UNDEF;
1394 }
1395
1396 zbx_vector_uint64_destroy(&ids);
1397
1398 DCset_delta_items(&delta_history);
1399
1400 DCconfig_clean_items(items, errcodes, history_num);
1401
1402 zbx_hashset_destroy(&delta_history);
1403 zbx_free(errcodes);
1404 zbx_free(items);
1405
1406 if (sql_offset > 16) /* In ORACLE always present begin..end; */
1407 {
1408 DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
1409 DBexecute("%s", sql);
1410 }
1411
1412 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
1413 }
1414
1415 /******************************************************************************
1416 * *
1417 * Function: DCmass_proxy_update_items *
1418 * *
1419 * Purpose: update items info after new value is received *
1420 * *
1421 * Parameters: history - array of history data *
1422 * history_num - number of history structures *
1423 * *
1424 * Author: Alexei Vladishev, Eugene Grigorjev, Alexander Vladishev *
1425 * *
1426 ******************************************************************************/
DCmass_proxy_update_items(ZBX_DC_HISTORY * history,int history_num)1427 static void DCmass_proxy_update_items(ZBX_DC_HISTORY *history, int history_num)
1428 {
1429 const char *__function_name = "DCmass_proxy_update_items";
1430
1431 size_t sql_offset = 0;
1432 int i;
1433
1434 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1435
1436 DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
1437
1438 for (i = 0; i < history_num; i++)
1439 {
1440 if (ITEM_STATE_NOTSUPPORTED == history[i].state)
1441 continue;
1442
1443 if (0 == (ZBX_DC_FLAG_META & history[i].flags))
1444 continue;
1445
1446 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
1447 "update items"
1448 " set lastlogsize=" ZBX_FS_UI64
1449 ",mtime=%d"
1450 " where itemid=" ZBX_FS_UI64 ";\n",
1451 history[i].lastlogsize, history[i].mtime, history[i].itemid);
1452
1453 DBexecute_overflowed_sql(&sql, &sql_alloc, &sql_offset);
1454 }
1455
1456 DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
1457
1458 if (sql_offset > 16) /* In ORACLE always present begin..end; */
1459 DBexecute("%s", sql);
1460
1461 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
1462 }
1463
1464 /******************************************************************************
1465 * *
1466 * Function: dc_add_history_dbl *
1467 * *
1468 * Purpose: helper function for DCmass_add_history() *
1469 * *
1470 ******************************************************************************/
dc_add_history_dbl(ZBX_DC_HISTORY * history,int history_num)1471 static void dc_add_history_dbl(ZBX_DC_HISTORY *history, int history_num)
1472 {
1473 int i;
1474 zbx_db_insert_t db_insert;
1475
1476 zbx_db_insert_prepare(&db_insert, "history", "itemid", "clock", "ns", "value", NULL);
1477
1478 for (i = 0; i < history_num; i++)
1479 {
1480 const ZBX_DC_HISTORY *h = &history[i];
1481
1482 if (0 != (ZBX_DC_FLAG_UNDEF & h->flags) || 0 != (ZBX_DC_FLAG_NOVALUE & h->flags))
1483 continue;
1484
1485 if (0 == h->keep_history)
1486 continue;
1487
1488 if (ITEM_VALUE_TYPE_FLOAT != h->value_type)
1489 continue;
1490
1491 zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, h->value.dbl);
1492 }
1493
1494 zbx_db_insert_execute(&db_insert);
1495 zbx_db_insert_clean(&db_insert);
1496 }
1497
1498 /******************************************************************************
1499 * *
1500 * Function: dc_add_history_uint *
1501 * *
1502 * Purpose: helper function for DCmass_add_history() *
1503 * *
1504 ******************************************************************************/
dc_add_history_uint(ZBX_DC_HISTORY * history,int history_num)1505 static void dc_add_history_uint(ZBX_DC_HISTORY *history, int history_num)
1506 {
1507 int i;
1508 zbx_db_insert_t db_insert;
1509
1510 zbx_db_insert_prepare(&db_insert, "history_uint", "itemid", "clock", "ns", "value", NULL);
1511
1512 for (i = 0; i < history_num; i++)
1513 {
1514 const ZBX_DC_HISTORY *h = &history[i];
1515
1516 if (0 != (ZBX_DC_FLAG_UNDEF & h->flags) || 0 != (ZBX_DC_FLAG_NOVALUE & h->flags))
1517 continue;
1518
1519 if (0 == h->keep_history)
1520 continue;
1521
1522 if (ITEM_VALUE_TYPE_UINT64 != history[i].value_type)
1523 continue;
1524
1525 zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, h->value.ui64);
1526 }
1527
1528 zbx_db_insert_execute(&db_insert);
1529 zbx_db_insert_clean(&db_insert);
1530 }
1531
1532 /******************************************************************************
1533 * *
1534 * Function: dc_add_history_str *
1535 * *
1536 * Purpose: helper function for DCmass_add_history() *
1537 * *
1538 ******************************************************************************/
dc_add_history_str(ZBX_DC_HISTORY * history,int history_num)1539 static void dc_add_history_str(ZBX_DC_HISTORY *history, int history_num)
1540 {
1541 int i;
1542 zbx_db_insert_t db_insert;
1543
1544 zbx_db_insert_prepare(&db_insert, "history_str", "itemid", "clock", "ns", "value", NULL);
1545
1546 for (i = 0; i < history_num; i++)
1547 {
1548 const ZBX_DC_HISTORY *h = &history[i];
1549
1550 if (0 != (ZBX_DC_FLAG_UNDEF & h->flags) || 0 != (ZBX_DC_FLAG_NOVALUE & h->flags))
1551 continue;
1552
1553 if (0 == h->keep_history)
1554 continue;
1555
1556 if (ITEM_VALUE_TYPE_STR != h->value_type)
1557 continue;
1558
1559 zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, h->value_orig.str);
1560 }
1561
1562 zbx_db_insert_execute(&db_insert);
1563 zbx_db_insert_clean(&db_insert);
1564 }
1565
1566 /******************************************************************************
1567 * *
1568 * Function: dc_add_history_text *
1569 * *
1570 * Purpose: helper function for DCmass_add_history() *
1571 * *
1572 ******************************************************************************/
dc_add_history_text(ZBX_DC_HISTORY * history,int history_num,int htext_num)1573 static void dc_add_history_text(ZBX_DC_HISTORY *history, int history_num, int htext_num)
1574 {
1575 int i;
1576 zbx_uint64_t id;
1577 zbx_db_insert_t db_insert;
1578
1579 zbx_db_insert_prepare(&db_insert, "history_text", "id", "itemid", "clock", "ns", "value", NULL);
1580
1581 id = DBget_maxid_num("history_text", htext_num);
1582
1583 for (i = 0; i < history_num; i++)
1584 {
1585 const ZBX_DC_HISTORY *h = &history[i];
1586
1587 if (0 != (ZBX_DC_FLAG_UNDEF & h->flags) || 0 != (ZBX_DC_FLAG_NOVALUE & h->flags))
1588 continue;
1589
1590 if (0 == h->keep_history)
1591 continue;
1592
1593 if (ITEM_VALUE_TYPE_TEXT != h->value_type)
1594 continue;
1595
1596 zbx_db_insert_add_values(&db_insert, id++, h->itemid, h->ts.sec, h->ts.ns, h->value_orig.str);
1597 }
1598
1599 zbx_db_insert_execute(&db_insert);
1600 zbx_db_insert_clean(&db_insert);
1601 }
1602
1603 /******************************************************************************
1604 * *
1605 * Function: dc_add_history_log *
1606 * *
1607 * Purpose: helper function for DCmass_add_history() *
1608 * *
1609 ******************************************************************************/
dc_add_history_log(ZBX_DC_HISTORY * history,int history_num,int hlog_num)1610 static void dc_add_history_log(ZBX_DC_HISTORY *history, int history_num, int hlog_num)
1611 {
1612 int i;
1613 zbx_uint64_t id;
1614 zbx_db_insert_t db_insert;
1615
1616 zbx_db_insert_prepare(&db_insert, "history_log", "id", "itemid", "clock", "ns", "timestamp", "source",
1617 "severity", "value", "logeventid", NULL);
1618
1619 id = DBget_maxid_num("history_log", hlog_num);
1620
1621 for (i = 0; i < history_num; i++)
1622 {
1623 const ZBX_DC_HISTORY *h = &history[i];
1624
1625 if (0 != (ZBX_DC_FLAG_UNDEF & h->flags) || 0 != (ZBX_DC_FLAG_NOVALUE & h->flags))
1626 continue;
1627
1628 if (0 == h->keep_history)
1629 continue;
1630
1631 if (ITEM_VALUE_TYPE_LOG != h->value_type)
1632 continue;
1633
1634 zbx_db_insert_add_values(&db_insert, id++, h->itemid, h->ts.sec, h->ts.ns, h->timestamp,
1635 NULL != h->value.str ? h->value.str : "", h->severity, h->value_orig.str,
1636 h->logeventid);
1637 }
1638
1639 zbx_db_insert_execute(&db_insert);
1640 zbx_db_insert_clean(&db_insert);
1641 }
1642
1643 /******************************************************************************
1644 * *
1645 * Function: DCmass_add_history *
1646 * *
1647 * Purpose: inserting new history data after new value is received *
1648 * *
1649 * Parameters: history - array of history data *
1650 * history_num - number of history structures *
1651 * *
1652 * Author: Alexander Vladishev *
1653 * *
1654 ******************************************************************************/
DCmass_add_history(ZBX_DC_HISTORY * history,int history_num)1655 static void DCmass_add_history(ZBX_DC_HISTORY *history, int history_num)
1656 {
1657 const char *__function_name = "DCmass_add_history";
1658
1659 int i, h_num = 0, huint_num = 0, hstr_num = 0, htext_num = 0, hlog_num = 0, rc = ZBX_DB_OK;
1660
1661 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1662
1663 for (i = 0; i < history_num; i++)
1664 {
1665 const ZBX_DC_HISTORY *h = &history[i];
1666
1667 if (0 != (ZBX_DC_FLAG_UNDEF & h->flags) || 0 != (ZBX_DC_FLAG_NOVALUE & h->flags))
1668 continue;
1669
1670 if (0 == h->keep_history)
1671 continue;
1672
1673 switch (h->value_type)
1674 {
1675 case ITEM_VALUE_TYPE_FLOAT:
1676 h_num++;
1677 break;
1678 case ITEM_VALUE_TYPE_UINT64:
1679 huint_num++;
1680 break;
1681 case ITEM_VALUE_TYPE_STR:
1682 hstr_num++;
1683 break;
1684 case ITEM_VALUE_TYPE_TEXT:
1685 htext_num++;
1686 break;
1687 case ITEM_VALUE_TYPE_LOG:
1688 hlog_num++;
1689 break;
1690 }
1691 }
1692
1693 /* history */
1694 if (0 != h_num)
1695 dc_add_history_dbl(history, history_num);
1696
1697 /* history_uint */
1698 if (0 != huint_num)
1699 dc_add_history_uint(history, history_num);
1700
1701 /* history_str */
1702 if (0 != hstr_num)
1703 dc_add_history_str(history, history_num);
1704
1705 /* history_text */
1706 if (0 != htext_num)
1707 dc_add_history_text(history, history_num, htext_num);
1708
1709 /* history_log */
1710 if (0 != hlog_num)
1711 dc_add_history_log(history, history_num, hlog_num);
1712
1713 /* update value cache */
1714 if (ZBX_DB_OK <= rc && 0 != (program_type & ZBX_PROGRAM_TYPE_SERVER) &&
1715 0 != h_num + huint_num + hstr_num + htext_num + hlog_num)
1716 {
1717 /* the history values were written into database, now add to value cache */
1718 zbx_log_value_t log;
1719 history_value_t value, *pvalue;
1720
1721 value.log = &log;
1722
1723 zbx_vc_lock();
1724
1725 for (i = 0; i < history_num; i++)
1726 {
1727 ZBX_DC_HISTORY *h = &history[i];
1728
1729 if (0 == h->keep_history)
1730 continue;
1731
1732 if (0 != (ZBX_DC_FLAG_UNDEF & h->flags) || 0 != (ZBX_DC_FLAG_NOVALUE & h->flags))
1733 continue;
1734
1735 switch (h->value_type)
1736 {
1737 case ITEM_VALUE_TYPE_FLOAT:
1738 case ITEM_VALUE_TYPE_UINT64:
1739 pvalue = &h->value;
1740 break;
1741 case ITEM_VALUE_TYPE_STR:
1742 case ITEM_VALUE_TYPE_TEXT:
1743 pvalue = &h->value_orig;
1744 break;
1745 case ITEM_VALUE_TYPE_LOG:
1746 log.timestamp = h->timestamp;
1747 log.severity = h->severity;
1748 log.logeventid = h->logeventid;
1749 log.value = h->value_orig.str;
1750 log.source = h->value.str;
1751 pvalue = &value;
1752 break;
1753 default:
1754 THIS_SHOULD_NEVER_HAPPEN;
1755 continue;
1756 }
1757
1758 zbx_vc_add_value(h->itemid, h->value_type, &h->ts, pvalue);
1759 }
1760
1761 zbx_vc_unlock();
1762 }
1763
1764 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
1765 }
1766
1767 /******************************************************************************
1768 * *
1769 * Function: dc_add_proxy_history *
1770 * *
1771 * Purpose: helper function for DCmass_proxy_add_history() *
1772 * *
1773 * Comment: this function is meant for items with value_type other other than *
1774 * ITEM_VALUE_TYPE_LOG not containing meta information in result *
1775 * *
1776 ******************************************************************************/
dc_add_proxy_history(ZBX_DC_HISTORY * history,int history_num)1777 static void dc_add_proxy_history(ZBX_DC_HISTORY *history, int history_num)
1778 {
1779 int i;
1780 char buffer[64], *pvalue;
1781 zbx_db_insert_t db_insert;
1782
1783 zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", NULL);
1784
1785 for (i = 0; i < history_num; i++)
1786 {
1787 const ZBX_DC_HISTORY *h = &history[i];
1788
1789 if (0 != (h->flags & ZBX_DC_FLAG_UNDEF))
1790 continue;
1791
1792 if (0 != (h->flags & ZBX_DC_FLAG_META))
1793 continue;
1794
1795 if (ITEM_STATE_NOTSUPPORTED == h->state)
1796 continue;
1797
1798 switch (h->value_type)
1799 {
1800 case ITEM_VALUE_TYPE_FLOAT:
1801 zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_DBL, h->value_orig.dbl);
1802 break;
1803 case ITEM_VALUE_TYPE_UINT64:
1804 zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_UI64, h->value_orig.ui64);
1805 break;
1806 case ITEM_VALUE_TYPE_STR:
1807 case ITEM_VALUE_TYPE_TEXT:
1808 case ITEM_VALUE_TYPE_LOG:
1809 pvalue = h->value_orig.str;
1810 break;
1811 default:
1812 continue;
1813 }
1814
1815 zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, pvalue);
1816 }
1817
1818 zbx_db_insert_execute(&db_insert);
1819 zbx_db_insert_clean(&db_insert);
1820 }
1821
1822 /******************************************************************************
1823 * *
1824 * Function: dc_add_proxy_history_meta *
1825 * *
1826 * Purpose: helper function for DCmass_proxy_add_history() *
1827 * *
1828 * Comment: this function is meant for items with value_type other other than *
1829 * ITEM_VALUE_TYPE_LOG containing meta information in result *
1830 * *
1831 ******************************************************************************/
dc_add_proxy_history_meta(ZBX_DC_HISTORY * history,int history_num)1832 static void dc_add_proxy_history_meta(ZBX_DC_HISTORY *history, int history_num)
1833 {
1834 int i;
1835 char buffer[64], *pvalue;
1836 zbx_db_insert_t db_insert;
1837
1838 zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "lastlogsize", "mtime",
1839 "flags", NULL);
1840
1841 for (i = 0; i < history_num; i++)
1842 {
1843 unsigned int flags = PROXY_HISTORY_FLAG_META;
1844 const ZBX_DC_HISTORY *h = &history[i];
1845
1846 if (ITEM_STATE_NOTSUPPORTED == h->state)
1847 continue;
1848
1849 if (0 != (h->flags & ZBX_DC_FLAG_UNDEF))
1850 continue;
1851
1852 if (0 == (h->flags & ZBX_DC_FLAG_META))
1853 continue;
1854
1855 if (ITEM_VALUE_TYPE_LOG == h->value_type)
1856 continue;
1857
1858 if (0 == (h->flags & ZBX_DC_FLAG_NOVALUE))
1859 {
1860 switch (h->value_type)
1861 {
1862 case ITEM_VALUE_TYPE_FLOAT:
1863 zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_DBL, h->value_orig.dbl);
1864 break;
1865 case ITEM_VALUE_TYPE_UINT64:
1866 zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_UI64, h->value_orig.ui64);
1867 break;
1868 case ITEM_VALUE_TYPE_STR:
1869 case ITEM_VALUE_TYPE_TEXT:
1870 pvalue = h->value_orig.str;
1871 break;
1872 default:
1873 THIS_SHOULD_NEVER_HAPPEN;
1874 continue;
1875 }
1876 }
1877 else
1878 {
1879 flags |= PROXY_HISTORY_FLAG_NOVALUE;
1880 pvalue = "";
1881 }
1882
1883 zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, pvalue, h->lastlogsize, h->mtime,
1884 flags);
1885 }
1886
1887 zbx_db_insert_execute(&db_insert);
1888 zbx_db_insert_clean(&db_insert);
1889 }
1890
1891 /******************************************************************************
1892 * *
1893 * Function: dc_add_proxy_history_log *
1894 * *
1895 * Purpose: helper function for DCmass_proxy_add_history() *
1896 * *
1897 * Comment: this function is meant for items with value_type *
1898 * ITEM_VALUE_TYPE_LOG *
1899 * *
1900 ******************************************************************************/
dc_add_proxy_history_log(ZBX_DC_HISTORY * history,int history_num)1901 static void dc_add_proxy_history_log(ZBX_DC_HISTORY *history, int history_num)
1902 {
1903 int i;
1904 zbx_db_insert_t db_insert;
1905
1906 /* see hc_copy_history_data() for fields that might be uninitialized and need special handling here */
1907 zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "timestamp", "source", "severity",
1908 "value", "logeventid", "lastlogsize", "mtime", "flags", NULL);
1909
1910 for (i = 0; i < history_num; i++)
1911 {
1912 unsigned int flags = PROXY_HISTORY_FLAG_META;
1913 const char *pvalue;
1914 const char *psource;
1915 const ZBX_DC_HISTORY *h = &history[i];
1916
1917 if (ITEM_STATE_NOTSUPPORTED == h->state)
1918 continue;
1919
1920 if (ITEM_VALUE_TYPE_LOG != h->value_type)
1921 continue;
1922
1923 if (0 != (h->flags & ZBX_DC_FLAG_NOVALUE))
1924 {
1925 /* sent to server only if not 0, see proxy_get_history_data() */
1926 const int unset_if_novalue = 0;
1927
1928 flags |= PROXY_HISTORY_FLAG_NOVALUE;
1929
1930 pvalue = "";
1931 psource = "";
1932
1933 zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, unset_if_novalue, psource,
1934 unset_if_novalue, pvalue, unset_if_novalue, h->lastlogsize, h->mtime, flags);
1935 }
1936 else
1937 {
1938 pvalue = h->value_orig.str;
1939
1940 if (NULL != h->value.str)
1941 psource = h->value.str;
1942 else
1943 psource = "";
1944
1945 zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, h->timestamp, psource,
1946 h->severity, pvalue, h->logeventid, h->lastlogsize, h->mtime, flags);
1947 }
1948 }
1949
1950 zbx_db_insert_execute(&db_insert);
1951 zbx_db_insert_clean(&db_insert);
1952 }
1953
1954 /******************************************************************************
1955 * *
1956 * Function: dc_add_proxy_history_notsupported *
1957 * *
1958 * Purpose: helper function for DCmass_proxy_add_history() *
1959 * *
1960 ******************************************************************************/
dc_add_proxy_history_notsupported(ZBX_DC_HISTORY * history,int history_num)1961 static void dc_add_proxy_history_notsupported(ZBX_DC_HISTORY *history, int history_num)
1962 {
1963 int i;
1964 zbx_db_insert_t db_insert;
1965
1966 zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "state", NULL);
1967
1968 for (i = 0; i < history_num; i++)
1969 {
1970 const ZBX_DC_HISTORY *h = &history[i];
1971
1972 if (ITEM_STATE_NOTSUPPORTED != h->state)
1973 continue;
1974
1975 zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, h->value_orig.err, (int)h->state);
1976 }
1977
1978 zbx_db_insert_execute(&db_insert);
1979 zbx_db_insert_clean(&db_insert);
1980 }
1981
1982 /******************************************************************************
1983 * *
1984 * Function: DCmass_proxy_add_history *
1985 * *
1986 * Purpose: inserting new history data after new value is received *
1987 * *
1988 * Parameters: history - array of history data *
1989 * history_num - number of history structures *
1990 * *
1991 * Author: Alexander Vladishev *
1992 * *
1993 ******************************************************************************/
DCmass_proxy_add_history(ZBX_DC_HISTORY * history,int history_num)1994 static void DCmass_proxy_add_history(ZBX_DC_HISTORY *history, int history_num)
1995 {
1996 const char *__function_name = "DCmass_proxy_add_history";
1997 int i, h_num = 0, h_meta_num = 0, hlog_num = 0, notsupported_num = 0;
1998
1999 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
2000
2001 for (i = 0; i < history_num; i++)
2002 {
2003 const ZBX_DC_HISTORY *h = &history[i];
2004
2005 if (ITEM_STATE_NOTSUPPORTED == h->state)
2006 {
2007 notsupported_num++;
2008 continue;
2009 }
2010
2011 switch (h->value_type)
2012 {
2013 case ITEM_VALUE_TYPE_LOG:
2014 /* if log item has no meta information it has no other information but value */
2015 if (0 != (h->flags & ZBX_DC_FLAG_META))
2016 hlog_num++;
2017 else
2018 h_num++;
2019 break;
2020 case ITEM_VALUE_TYPE_FLOAT:
2021 case ITEM_VALUE_TYPE_UINT64:
2022 case ITEM_VALUE_TYPE_STR:
2023 case ITEM_VALUE_TYPE_TEXT:
2024 if (0 != (h->flags & ZBX_DC_FLAG_META))
2025 h_meta_num++;
2026 else
2027 h_num++;
2028 break;
2029 default:
2030 THIS_SHOULD_NEVER_HAPPEN;
2031 }
2032 }
2033
2034 if (0 != h_num)
2035 dc_add_proxy_history(history, history_num);
2036
2037 if (0 != h_meta_num)
2038 dc_add_proxy_history_meta(history, history_num);
2039
2040 if (0 != hlog_num)
2041 dc_add_proxy_history_log(history, history_num);
2042
2043 if (0 != notsupported_num)
2044 dc_add_proxy_history_notsupported(history, history_num);
2045
2046 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
2047 }
2048
2049 /******************************************************************************
2050 * *
2051 * Function: DCsync_history *
2052 * *
2053 * Purpose: writes updates and new data from pool to database *
2054 * *
2055 * Return value: the timestamp of next history queue value to sync, *
2056 * 0 if the queue is empty or most of items are locked by *
2057 * triggers. *
2058 * *
2059 * Author: Alexei Vladishev *
2060 * *
2061 ******************************************************************************/
DCsync_history(int sync_type,int * total_num)2062 int DCsync_history(int sync_type, int *total_num)
2063 {
2064 const char *__function_name = "DCsync_history";
2065 static ZBX_DC_HISTORY *history = NULL;
2066 int history_num, candidate_num, next_sync = 0;
2067 time_t sync_start, now;
2068 zbx_vector_uint64_t triggerids;
2069 zbx_vector_ptr_t history_items, itservice_updates;
2070 zbx_binary_heap_t tmp_history_queue;
2071
2072 zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __function_name, cache->history_num);
2073
2074 *total_num = 0;
2075
2076 if (ZBX_SYNC_FULL == sync_type)
2077 {
2078 zbx_hashset_iter_t iter;
2079 zbx_hc_item_t *item;
2080
2081 /* History index cache might be full without any space left for queueing items from history index to */
2082 /* history queue. The solution: replace the shared-memory history queue with heap-allocated one. Add */
2083 /* all items from history index to the new history queue. */
2084 /* */
2085 /* Assertions that must be true. */
2086 /* * This is the main server or proxy process, */
2087 /* * There are no other users of history index cache stored in shared memory. Other processes */
2088 /* should have quit by this point. */
2089 /* * other parts of the program do not hold pointers to the elements of history queue that is */
2090 /* stored in the shared memory. */
2091
2092 /* unlock all triggers before full sync so no items are locked by triggers */
2093 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
2094 DCconfig_unlock_all_triggers();
2095
2096 LOCK_CACHE;
2097
2098 tmp_history_queue = cache->history_queue;
2099
2100 zbx_binary_heap_create(&cache->history_queue, hc_queue_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY);
2101 zbx_hashset_iter_reset(&cache->history_items, &iter);
2102
2103 /* add all items from history index to the new history queue */
2104 while (NULL != (item = (zbx_hc_item_t *)zbx_hashset_iter_next(&iter)))
2105 hc_queue_item(item);
2106
2107 UNLOCK_CACHE;
2108
2109 zabbix_log(LOG_LEVEL_WARNING, "syncing history data...");
2110 }
2111
2112 if (0 == cache->history_num)
2113 goto finish;
2114
2115 sync_start = time(NULL);
2116
2117 if (NULL == history)
2118 history = zbx_malloc(history, ZBX_HC_SYNC_MAX * sizeof(ZBX_DC_HISTORY));
2119
2120 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
2121 {
2122 zbx_vector_uint64_create(&triggerids);
2123 zbx_vector_uint64_reserve(&triggerids, MIN(cache->history_num, ZBX_HC_SYNC_MAX) + 32);
2124 zbx_vector_ptr_create(&itservice_updates);
2125 }
2126
2127 zbx_vector_ptr_create(&history_items);
2128 zbx_vector_ptr_reserve(&history_items, MIN(cache->history_num, ZBX_HC_SYNC_MAX) + 32);
2129
2130 do
2131 {
2132 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
2133 zbx_vector_uint64_clear(&triggerids);
2134
2135 LOCK_CACHE;
2136
2137 hc_pop_items(&history_items);
2138
2139 if (0 != history_items.values_num && 0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
2140 {
2141 history_num = DCconfig_lock_triggers_by_history_items(&history_items, &triggerids);
2142
2143 /* there are unavailable items, push them back in history queue */
2144 if (history_num != history_items.values_num)
2145 hc_push_busy_items(&history_items);
2146 }
2147 else
2148 history_num = history_items.values_num;
2149
2150 UNLOCK_CACHE;
2151
2152 if (0 == history_num)
2153 break;
2154
2155 hc_get_item_values(history, &history_items);
2156
2157 DBbegin();
2158
2159 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
2160 {
2161 DCmass_update_items(history, history_num);
2162 DCmass_add_history(history, history_num);
2163 DCmass_update_triggers(history, history_num);
2164 DCmass_update_trends(history, history_num);
2165 DCflush_nextchecks();
2166
2167 /* processing of events, generated in functions: */
2168 /* DCmass_update_items() */
2169 /* DCmass_update_triggers() */
2170 /* DCflush_nextchecks() */
2171 process_events(&itservice_updates);
2172 }
2173 else
2174 {
2175 DCmass_proxy_add_history(history, history_num);
2176 DCmass_proxy_update_items(history, history_num);
2177 }
2178
2179 DBcommit();
2180
2181 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
2182 {
2183 DBupdate_itservices(&itservice_updates);
2184 zbx_vector_ptr_clear_ext(&itservice_updates, zbx_ptr_free);
2185 DCconfig_unlock_triggers(&triggerids);
2186 }
2187
2188 LOCK_CACHE;
2189
2190 next_sync = hc_push_processed_items(&history_items);
2191 cache->history_num -= history_num;
2192
2193 UNLOCK_CACHE;
2194
2195 *total_num += history_num;
2196 candidate_num = history_items.values_num;
2197
2198 now = time(NULL);
2199
2200 if (ZBX_SYNC_FULL == sync_type && now - sync_start >= 10)
2201 {
2202 zabbix_log(LOG_LEVEL_WARNING, "syncing history data... " ZBX_FS_DBL "%%",
2203 (double)*total_num / (cache->history_num + *total_num) * 100);
2204 sync_start = now;
2205 }
2206
2207 zbx_vector_ptr_clear(&history_items);
2208 hc_free_item_values(history, history_num);
2209
2210 if (ZBX_HC_SYNC_MIN_PCNT > history_num * 100 / candidate_num)
2211 {
2212 /* Stop sync if only small percentage of sync candidates were processed */
2213 /* (meaning most of sync candidates are locked by triggers). */
2214 /* In this case is better to wait a bit for other syncers to unlock items */
2215 /* rather than trying and failing to sync locked items over and over again. */
2216
2217 next_sync = 0;
2218 break;
2219 }
2220
2221 /* Exit from sync loop if we have spent too much time here */
2222 /* unless we are doing full sync. This is done to allow */
2223 /* syncer process to update their statistics. */
2224 }
2225 while ((ZBX_HC_SYNC_TIME_MAX >= now - sync_start && 0 != next_sync) || sync_type == ZBX_SYNC_FULL);
2226
2227 zbx_vector_ptr_destroy(&history_items);
2228
2229 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
2230 {
2231 zbx_vector_uint64_destroy(&triggerids);
2232 zbx_vector_ptr_destroy(&itservice_updates);
2233 }
2234 finish:
2235 if (ZBX_SYNC_FULL == sync_type)
2236 {
2237 LOCK_CACHE;
2238
2239 zbx_binary_heap_destroy(&cache->history_queue);
2240 cache->history_queue = tmp_history_queue;
2241
2242 UNLOCK_CACHE;
2243
2244 zabbix_log(LOG_LEVEL_WARNING, "syncing history data done");
2245 }
2246
2247 return next_sync;
2248 }
2249
2250 /******************************************************************************
2251 * *
2252 * local history cache *
2253 * *
2254 ******************************************************************************/
dc_string_buffer_realloc(size_t len)2255 static void dc_string_buffer_realloc(size_t len)
2256 {
2257 if (string_values_alloc >= string_values_offset + len)
2258 return;
2259
2260 do
2261 {
2262 string_values_alloc += ZBX_STRING_REALLOC_STEP;
2263 }
2264 while (string_values_alloc < string_values_offset + len);
2265
2266 string_values = zbx_realloc(string_values, string_values_alloc);
2267 }
2268
dc_local_get_history_slot()2269 static dc_item_value_t *dc_local_get_history_slot()
2270 {
2271 if (ZBX_MAX_VALUES_LOCAL == item_values_num)
2272 dc_flush_history();
2273
2274 if (item_values_alloc == item_values_num)
2275 {
2276 item_values_alloc += ZBX_STRUCT_REALLOC_STEP;
2277 item_values = zbx_realloc(item_values, item_values_alloc * sizeof(dc_item_value_t));
2278 }
2279
2280 return &item_values[item_values_num++];
2281 }
2282
dc_local_add_history_dbl(zbx_uint64_t itemid,const zbx_timespec_t * ts,double value_orig,zbx_uint64_t lastlogsize,int mtime,unsigned char flags)2283 static void dc_local_add_history_dbl(zbx_uint64_t itemid, const zbx_timespec_t *ts, double value_orig,
2284 zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
2285 {
2286 dc_item_value_t *item_value;
2287
2288 item_value = dc_local_get_history_slot();
2289
2290 item_value->itemid = itemid;
2291 item_value->ts = *ts;
2292 item_value->value_type = ITEM_VALUE_TYPE_FLOAT;
2293 item_value->state = ITEM_STATE_NORMAL;
2294 item_value->flags = flags;
2295
2296 if (0 != (item_value->flags & ZBX_DC_FLAG_META))
2297 {
2298 item_value->lastlogsize = lastlogsize;
2299 item_value->mtime = mtime;
2300 }
2301
2302 if (0 == (item_value->flags & ZBX_DC_FLAG_NOVALUE))
2303 item_value->value.value_dbl = value_orig;
2304 }
2305
dc_local_add_history_uint(zbx_uint64_t itemid,const zbx_timespec_t * ts,zbx_uint64_t value_orig,zbx_uint64_t lastlogsize,int mtime,unsigned char flags)2306 static void dc_local_add_history_uint(zbx_uint64_t itemid, const zbx_timespec_t *ts, zbx_uint64_t value_orig,
2307 zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
2308 {
2309 dc_item_value_t *item_value;
2310
2311 item_value = dc_local_get_history_slot();
2312
2313 item_value->itemid = itemid;
2314 item_value->ts = *ts;
2315 item_value->value_type = ITEM_VALUE_TYPE_UINT64;
2316 item_value->state = ITEM_STATE_NORMAL;
2317 item_value->flags = flags;
2318
2319 if (0 != (item_value->flags & ZBX_DC_FLAG_META))
2320 {
2321 item_value->lastlogsize = lastlogsize;
2322 item_value->mtime = mtime;
2323 }
2324
2325 if (0 == (item_value->flags & ZBX_DC_FLAG_NOVALUE))
2326 item_value->value.value_uint = value_orig;
2327 }
2328
dc_local_add_history_str(zbx_uint64_t itemid,const zbx_timespec_t * ts,const char * value_orig,zbx_uint64_t lastlogsize,int mtime,unsigned char flags)2329 static void dc_local_add_history_str(zbx_uint64_t itemid, const zbx_timespec_t *ts, const char *value_orig,
2330 zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
2331 {
2332 dc_item_value_t *item_value;
2333
2334 item_value = dc_local_get_history_slot();
2335
2336 item_value->itemid = itemid;
2337 item_value->ts = *ts;
2338 item_value->value_type = ITEM_VALUE_TYPE_STR;
2339 item_value->state = ITEM_STATE_NORMAL;
2340 item_value->flags = flags;
2341
2342 if (0 != (item_value->flags & ZBX_DC_FLAG_META))
2343 {
2344 item_value->lastlogsize = lastlogsize;
2345 item_value->mtime = mtime;
2346 }
2347
2348 if (0 == (item_value->flags & ZBX_DC_FLAG_NOVALUE))
2349 {
2350 item_value->value.value_str.len = zbx_db_strlen_n(value_orig, HISTORY_STR_VALUE_LEN) + 1;
2351 dc_string_buffer_realloc(item_value->value.value_str.len);
2352
2353 item_value->value.value_str.pvalue = string_values_offset;
2354 memcpy(&string_values[string_values_offset], value_orig, item_value->value.value_str.len);
2355 string_values_offset += item_value->value.value_str.len;
2356 }
2357 else
2358 item_value->value.value_str.len = 0;
2359 }
2360
dc_local_add_history_text(zbx_uint64_t itemid,const zbx_timespec_t * ts,const char * value_orig,zbx_uint64_t lastlogsize,int mtime,unsigned char flags)2361 static void dc_local_add_history_text(zbx_uint64_t itemid, const zbx_timespec_t *ts, const char *value_orig,
2362 zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
2363 {
2364 dc_item_value_t *item_value;
2365
2366 item_value = dc_local_get_history_slot();
2367
2368 item_value->itemid = itemid;
2369 item_value->ts = *ts;
2370 item_value->value_type = ITEM_VALUE_TYPE_TEXT;
2371 item_value->state = ITEM_STATE_NORMAL;
2372 item_value->flags = flags;
2373
2374 if (0 != (item_value->flags & ZBX_DC_FLAG_META))
2375 {
2376 item_value->lastlogsize = lastlogsize;
2377 item_value->mtime = mtime;
2378 }
2379
2380 if (0 == (item_value->flags & ZBX_DC_FLAG_NOVALUE))
2381 {
2382 item_value->value.value_str.len = zbx_db_strlen_n(value_orig, HISTORY_TEXT_VALUE_LEN) + 1;
2383 dc_string_buffer_realloc(item_value->value.value_str.len);
2384
2385 item_value->value.value_str.pvalue = string_values_offset;
2386 memcpy(&string_values[string_values_offset], value_orig, item_value->value.value_str.len);
2387 string_values_offset += item_value->value.value_str.len;
2388 }
2389 else
2390 item_value->value.value_str.len = 0;
2391 }
2392
dc_local_add_history_log(zbx_uint64_t itemid,const zbx_timespec_t * ts,const zbx_log_t * log,zbx_uint64_t lastlogsize,int mtime,unsigned char flags)2393 static void dc_local_add_history_log(zbx_uint64_t itemid, const zbx_timespec_t *ts, const zbx_log_t *log,
2394 zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
2395 {
2396 dc_item_value_t *item_value;
2397
2398 item_value = dc_local_get_history_slot();
2399
2400 item_value->itemid = itemid;
2401 item_value->ts = *ts;
2402 item_value->value_type = ITEM_VALUE_TYPE_LOG;
2403 item_value->state = ITEM_STATE_NORMAL;
2404
2405 item_value->flags = flags;
2406
2407 if (0 != (item_value->flags & ZBX_DC_FLAG_META))
2408 {
2409 item_value->lastlogsize = lastlogsize;
2410 item_value->mtime = mtime;
2411 }
2412
2413 if (0 == (item_value->flags & ZBX_DC_FLAG_NOVALUE))
2414 {
2415 item_value->severity = log->severity;
2416 item_value->logeventid = log->logeventid;
2417 item_value->timestamp = log->timestamp;
2418
2419 item_value->value.value_str.len = zbx_db_strlen_n(log->value, HISTORY_LOG_VALUE_LEN) + 1;
2420
2421 if (NULL != log->source && '\0' != *log->source)
2422 item_value->source.len = zbx_db_strlen_n(log->source, HISTORY_LOG_SOURCE_LEN) + 1;
2423 else
2424 item_value->source.len = 0;
2425 }
2426 else
2427 {
2428 item_value->value.value_str.len = 0;
2429 item_value->source.len = 0;
2430 }
2431
2432 if (0 != item_value->value.value_str.len + item_value->source.len)
2433 {
2434 dc_string_buffer_realloc(item_value->value.value_str.len + item_value->source.len);
2435
2436 if (0 != item_value->value.value_str.len)
2437 {
2438 item_value->value.value_str.pvalue = string_values_offset;
2439 memcpy(&string_values[string_values_offset], log->value, item_value->value.value_str.len);
2440 string_values_offset += item_value->value.value_str.len;
2441 }
2442
2443 if (0 != item_value->source.len)
2444 {
2445 item_value->source.pvalue = string_values_offset;
2446 memcpy(&string_values[string_values_offset], log->source, item_value->source.len);
2447 string_values_offset += item_value->source.len;
2448 }
2449 }
2450 }
2451
dc_local_add_history_notsupported(zbx_uint64_t itemid,const zbx_timespec_t * ts,const char * error)2452 static void dc_local_add_history_notsupported(zbx_uint64_t itemid, const zbx_timespec_t *ts, const char *error)
2453 {
2454 dc_item_value_t *item_value;
2455
2456 item_value = dc_local_get_history_slot();
2457
2458 item_value->itemid = itemid;
2459 item_value->ts = *ts;
2460 item_value->state = ITEM_STATE_NOTSUPPORTED;
2461 item_value->value.value_str.len = zbx_db_strlen_n(error, ITEM_ERROR_LEN) + 1;
2462
2463 dc_string_buffer_realloc(item_value->value.value_str.len);
2464 item_value->value.value_str.pvalue = string_values_offset;
2465 memcpy(&string_values[string_values_offset], error, item_value->value.value_str.len);
2466 string_values_offset += item_value->value.value_str.len;
2467 }
2468
dc_local_add_history_lld(zbx_uint64_t itemid,const zbx_timespec_t * ts,const char * value_orig)2469 static void dc_local_add_history_lld(zbx_uint64_t itemid, const zbx_timespec_t *ts, const char *value_orig)
2470 {
2471 dc_item_value_t *item_value;
2472
2473 item_value = dc_local_get_history_slot();
2474
2475 item_value->itemid = itemid;
2476 item_value->ts = *ts;
2477 item_value->state = ITEM_STATE_NORMAL;
2478 item_value->flags = ZBX_DC_FLAG_LLD;
2479 item_value->value.value_str.len = strlen(value_orig) + 1;
2480
2481 dc_string_buffer_realloc(item_value->value.value_str.len);
2482 item_value->value.value_str.pvalue = string_values_offset;
2483 memcpy(&string_values[string_values_offset], value_orig, item_value->value.value_str.len);
2484 string_values_offset += item_value->value.value_str.len;
2485 }
2486
2487 /******************************************************************************
2488 * *
2489 * Function: dc_add_history *
2490 * *
2491 * Purpose: add new value to the cache *
2492 * *
2493 * Parameters: itemid - [IN] the itemid *
2494 * value_type - [IN] the value type (see ITEM_VALUE_TYPE_* defs) *
2495 * item_flags - [IN] the item flags (e. g. lld rule) *
2496 * result - [IN] agent result containing the value to add *
2497 * ts - [IN] the value timestamp *
2498 * state - [IN] the item state *
2499 * error - [IN] the error message in case item state is *
2500 * ITEM_STATE_NOTSUPPORTED *
2501 * *
2502 ******************************************************************************/
dc_add_history(zbx_uint64_t itemid,unsigned char value_type,unsigned char item_flags,AGENT_RESULT * result,const zbx_timespec_t * ts,unsigned char state,const char * error)2503 void dc_add_history(zbx_uint64_t itemid, unsigned char value_type, unsigned char item_flags, AGENT_RESULT *result,
2504 const zbx_timespec_t *ts, unsigned char state, const char *error)
2505 {
2506 unsigned char value_flags;
2507
2508 if (ITEM_STATE_NOTSUPPORTED == state)
2509 {
2510 dc_local_add_history_notsupported(itemid, ts, error);
2511 return;
2512 }
2513
2514 if (0 != (ZBX_FLAG_DISCOVERY_RULE & item_flags))
2515 {
2516 if (NULL == GET_TEXT_RESULT(result))
2517 return;
2518
2519 /* server processes low-level discovery (lld) items while proxy stores their values in db */
2520 if (0 != (ZBX_PROGRAM_TYPE_SERVER & program_type))
2521 lld_process_discovery_rule(itemid, result->text, ts);
2522 else
2523 dc_local_add_history_lld(itemid, ts, result->text);
2524
2525 return;
2526 }
2527
2528 if (!ISSET_VALUE(result) && !ISSET_META(result))
2529 return;
2530
2531 value_flags = 0;
2532
2533 if (!ISSET_VALUE(result))
2534 value_flags |= ZBX_DC_FLAG_NOVALUE;
2535
2536 if (ISSET_META(result))
2537 value_flags |= ZBX_DC_FLAG_META;
2538
2539 /* Add data to the local history cache if: */
2540 /* 1) the NOVALUE flag is set (data contains only meta information) */
2541 /* 2) the NOVALUE flag is not set and value conversion succeeded */
2542 switch (value_type)
2543 {
2544 case ITEM_VALUE_TYPE_FLOAT:
2545 if (0 != (value_flags & ZBX_DC_FLAG_NOVALUE) || GET_DBL_RESULT(result))
2546 {
2547 dc_local_add_history_dbl(itemid, ts, result->dbl, result->lastlogsize, result->mtime,
2548 value_flags);
2549 }
2550 break;
2551 case ITEM_VALUE_TYPE_UINT64:
2552 if (0 != (value_flags & ZBX_DC_FLAG_NOVALUE) || GET_UI64_RESULT(result))
2553 {
2554 dc_local_add_history_uint(itemid, ts, result->ui64, result->lastlogsize, result->mtime,
2555 value_flags);
2556 }
2557 break;
2558 case ITEM_VALUE_TYPE_STR:
2559 if (0 != (value_flags & ZBX_DC_FLAG_NOVALUE) || GET_STR_RESULT(result))
2560 {
2561 dc_local_add_history_str(itemid, ts, result->str, result->lastlogsize, result->mtime,
2562 value_flags);
2563 }
2564 break;
2565 case ITEM_VALUE_TYPE_TEXT:
2566 if (0 != (value_flags & ZBX_DC_FLAG_NOVALUE) || GET_TEXT_RESULT(result))
2567 {
2568 dc_local_add_history_text(itemid, ts, result->text, result->lastlogsize, result->mtime,
2569 value_flags);
2570 }
2571 break;
2572 case ITEM_VALUE_TYPE_LOG:
2573 if (0 != (value_flags & ZBX_DC_FLAG_NOVALUE) || GET_LOG_RESULT(result))
2574 {
2575 dc_local_add_history_log(itemid, ts, result->log, result->lastlogsize, result->mtime,
2576 value_flags);
2577 }
2578 break;
2579 default:
2580 zabbix_log(LOG_LEVEL_ERR, "unknown value type [%d] for itemid [" ZBX_FS_UI64 "]",
2581 value_type, itemid);
2582 return;
2583 }
2584 }
2585
dc_flush_history()2586 void dc_flush_history()
2587 {
2588 if (0 == item_values_num)
2589 return;
2590
2591 LOCK_CACHE;
2592
2593 hc_add_item_values(item_values, item_values_num);
2594
2595 cache->history_num += item_values_num;
2596
2597 UNLOCK_CACHE;
2598
2599 item_values_num = 0;
2600 string_values_offset = 0;
2601 }
2602
2603 /******************************************************************************
2604 * *
2605 * history cache storage *
2606 * *
2607 ******************************************************************************/
2608 ZBX_MEM_FUNC_IMPL(__hc_index, hc_index_mem)
2609 ZBX_MEM_FUNC_IMPL(__hc, hc_mem)
2610
2611 struct zbx_hc_data
2612 {
2613 history_value_t value;
2614 zbx_uint64_t lastlogsize;
2615 zbx_timespec_t ts;
2616 int mtime;
2617 unsigned char value_type;
2618 unsigned char flags;
2619 unsigned char state;
2620
2621 struct zbx_hc_data *next;
2622 };
2623
2624 /******************************************************************************
2625 * *
2626 * Function: hc_queue_elem_compare_func *
2627 * *
2628 * Purpose: compares history queue elements *
2629 * *
2630 ******************************************************************************/
hc_queue_elem_compare_func(const void * d1,const void * d2)2631 static int hc_queue_elem_compare_func(const void *d1, const void *d2)
2632 {
2633 const zbx_binary_heap_elem_t *e1 = (const zbx_binary_heap_elem_t *)d1;
2634 const zbx_binary_heap_elem_t *e2 = (const zbx_binary_heap_elem_t *)d2;
2635
2636 const zbx_hc_item_t *item1 = (const zbx_hc_item_t *)e1->data;
2637 const zbx_hc_item_t *item2 = (const zbx_hc_item_t *)e2->data;
2638
2639 /* compare by timestamp of the oldest value */
2640 return zbx_timespec_compare(&item1->tail->ts, &item2->tail->ts);
2641 }
2642
2643 /******************************************************************************
2644 * *
2645 * Function: hc_free_data *
2646 * *
2647 * Purpose: free history item data allocated in history cache *
2648 * *
2649 * Parameters: data - [IN] history item data *
2650 * *
2651 ******************************************************************************/
hc_free_data(zbx_hc_data_t * data)2652 static void hc_free_data(zbx_hc_data_t *data)
2653 {
2654 if (ITEM_STATE_NOTSUPPORTED == data->state)
2655 {
2656 __hc_mem_free_func(data->value.str);
2657 }
2658 else
2659 {
2660 if (0 == (data->flags & ZBX_DC_FLAG_NOVALUE))
2661 {
2662 switch (data->value_type)
2663 {
2664 case ITEM_VALUE_TYPE_STR:
2665 case ITEM_VALUE_TYPE_TEXT:
2666 __hc_mem_free_func(data->value.str);
2667 break;
2668 case ITEM_VALUE_TYPE_LOG:
2669 __hc_mem_free_func(data->value.log->value);
2670
2671 if (NULL != data->value.log->source)
2672 __hc_mem_free_func(data->value.log->source);
2673
2674 __hc_mem_free_func(data->value.log);
2675 break;
2676 }
2677 }
2678 }
2679
2680 __hc_mem_free_func(data);
2681 }
2682
2683 /******************************************************************************
2684 * *
2685 * Function: hc_queue_item *
2686 * *
2687 * Purpose: put back item into history queue *
2688 * *
2689 * Parameters: data - [IN] history item data *
2690 * *
2691 ******************************************************************************/
hc_queue_item(zbx_hc_item_t * item)2692 static void hc_queue_item(zbx_hc_item_t *item)
2693 {
2694 zbx_binary_heap_elem_t elem = {item->itemid, (const void *)item};
2695
2696 zbx_binary_heap_insert(&cache->history_queue, &elem);
2697 }
2698
2699 /******************************************************************************
2700 * *
2701 * Function: hc_get_item *
2702 * *
2703 * Purpose: returns history item by itemid *
2704 * *
2705 * Parameters: itemid - [IN] the item id *
2706 * *
2707 * Return value: the history item or NULL if the requested item is not in *
2708 * history cache *
2709 * *
2710 ******************************************************************************/
hc_get_item(zbx_uint64_t itemid)2711 static zbx_hc_item_t *hc_get_item(zbx_uint64_t itemid)
2712 {
2713 return (zbx_hc_item_t *)zbx_hashset_search(&cache->history_items, &itemid);
2714 }
2715
2716 /******************************************************************************
2717 * *
2718 * Function: hc_add_item *
2719 * *
2720 * Purpose: adds a new item to history cache *
2721 * *
2722 * Parameters: itemid - [IN] the item id *
2723 * [IN] the item data *
2724 * *
2725 * Return value: the added history item *
2726 * *
2727 ******************************************************************************/
hc_add_item(zbx_uint64_t itemid,zbx_hc_data_t * data)2728 static zbx_hc_item_t *hc_add_item(zbx_uint64_t itemid, zbx_hc_data_t *data)
2729 {
2730 zbx_hc_item_t item_local = {itemid, ZBX_HC_ITEM_STATUS_NORMAL, data, data};
2731
2732 return (zbx_hc_item_t *)zbx_hashset_insert(&cache->history_items, &item_local, sizeof(item_local));
2733 }
2734
2735 /******************************************************************************
2736 * *
2737 * Function: hc_mem_value_str_dup *
2738 * *
2739 * Purpose: copies string value to history cache *
2740 * *
2741 * Parameters: str - [IN] the string value *
2742 * *
2743 * Return value: the copied string or NULL if there was not enough memory *
2744 * *
2745 ******************************************************************************/
hc_mem_value_str_dup(const dc_value_str_t * str)2746 static char *hc_mem_value_str_dup(const dc_value_str_t *str)
2747 {
2748 char *ptr;
2749
2750 if (NULL == (ptr = (char *)__hc_mem_malloc_func(NULL, str->len)))
2751 return NULL;
2752
2753 memcpy(ptr, &string_values[str->pvalue], str->len - 1);
2754 ptr[str->len - 1] = '\0';
2755
2756 return ptr;
2757 }
2758
2759 /******************************************************************************
2760 * *
2761 * Function: hc_clone_history_str_data *
2762 * *
2763 * Purpose: clones string value into history data memory *
2764 * *
2765 * Parameters: dst - [IN/OUT] a reference to the cloned value *
2766 * str - [IN] the string value to clone *
2767 * *
2768 * Return value: SUCCESS - either there was no need to clone the string *
2769 * (it was empty or already cloned) or the string was *
2770 * cloned successfully *
2771 * FAIL - not enough memory *
2772 * *
2773 * Comments: This function can be called in loop with the same dst value *
2774 * until it finishes cloning string value. *
2775 * *
2776 ******************************************************************************/
hc_clone_history_str_data(char ** dst,const dc_value_str_t * str)2777 static int hc_clone_history_str_data(char **dst, const dc_value_str_t *str)
2778 {
2779 if (0 == str->len)
2780 return SUCCEED;
2781
2782 if (NULL != *dst)
2783 return SUCCEED;
2784
2785 if (NULL != (*dst = hc_mem_value_str_dup(str)))
2786 return SUCCEED;
2787
2788 return FAIL;
2789 }
2790
2791 /******************************************************************************
2792 * *
2793 * Function: hc_clone_history_log_data *
2794 * *
2795 * Purpose: clones log value into history data memory *
2796 * *
2797 * Parameters: dst - [IN/OUT] a reference to the cloned value *
2798 * item_value - [IN] the log value to clone *
2799 * *
2800 * Return value: SUCCESS - the log value was cloned successfully *
2801 * FAIL - not enough memory *
2802 * *
2803 * Comments: This function can be called in loop with the same dst value *
2804 * until it finishes cloning log value. *
2805 * *
2806 ******************************************************************************/
hc_clone_history_log_data(zbx_log_value_t ** dst,const dc_item_value_t * item_value)2807 static int hc_clone_history_log_data(zbx_log_value_t **dst, const dc_item_value_t *item_value)
2808 {
2809 if (NULL == *dst)
2810 {
2811 /* using realloc instead of malloc just to suppress 'not used' warning for realloc */
2812 if (NULL == (*dst = (zbx_log_value_t *)__hc_mem_realloc_func(NULL, sizeof(zbx_log_value_t))))
2813 return FAIL;
2814
2815 memset(*dst, 0, sizeof(zbx_log_value_t));
2816 }
2817
2818 if (SUCCEED != hc_clone_history_str_data(&(*dst)->value, &item_value->value.value_str))
2819 return FAIL;
2820
2821 if (SUCCEED != hc_clone_history_str_data(&(*dst)->source, &item_value->source))
2822 return FAIL;
2823
2824 (*dst)->logeventid = item_value->logeventid;
2825 (*dst)->severity = item_value->severity;
2826 (*dst)->timestamp = item_value->timestamp;
2827
2828 return SUCCEED;
2829 }
2830
2831 /******************************************************************************
2832 * *
2833 * Function: hc_clone_history_data *
2834 * *
2835 * Purpose: clones item value from local cache into history cache *
2836 * *
2837 * Parameters: data - [IN/OUT] a reference to the cloned value *
2838 * item_value - [IN] the item value *
2839 * *
2840 * Return value: SUCCESS - the item value was cloned successfully *
2841 * FAIL - not enough memory *
2842 * *
2843 * Comments: This function can be called in loop with the same data value *
2844 * until it finishes cloning item value. *
2845 * *
2846 ******************************************************************************/
hc_clone_history_data(zbx_hc_data_t ** data,const dc_item_value_t * item_value)2847 static int hc_clone_history_data(zbx_hc_data_t **data, const dc_item_value_t *item_value)
2848 {
2849 if (NULL == *data)
2850 {
2851 if (NULL == (*data = (zbx_hc_data_t *)__hc_mem_malloc_func(NULL, sizeof(zbx_hc_data_t))))
2852 return FAIL;
2853
2854 memset(*data, 0, sizeof(zbx_hc_data_t));
2855
2856 (*data)->state = item_value->state;
2857 (*data)->ts = item_value->ts;
2858 (*data)->flags = item_value->flags;
2859 }
2860
2861 if (ITEM_STATE_NOTSUPPORTED == item_value->state)
2862 {
2863 if (NULL == ((*data)->value.str = hc_mem_value_str_dup(&item_value->value.value_str)))
2864 return FAIL;
2865
2866 cache->stats.notsupported_counter++;
2867
2868 return SUCCEED;
2869 }
2870
2871 if (0 != (ZBX_DC_FLAG_LLD & item_value->flags))
2872 {
2873 if (NULL == ((*data)->value.str = hc_mem_value_str_dup(&item_value->value.value_str)))
2874 return FAIL;
2875
2876 (*data)->value_type = ITEM_VALUE_TYPE_TEXT;
2877
2878 cache->stats.history_text_counter++;
2879 cache->stats.history_counter++;
2880
2881 return SUCCEED;
2882 }
2883
2884 if (0 == (ZBX_DC_FLAG_NOVALUE & item_value->flags))
2885 {
2886 switch (item_value->value_type)
2887 {
2888 case ITEM_VALUE_TYPE_FLOAT:
2889 (*data)->value.dbl = item_value->value.value_dbl;
2890 cache->stats.history_float_counter++;
2891 break;
2892 case ITEM_VALUE_TYPE_UINT64:
2893 (*data)->value.ui64 = item_value->value.value_uint;
2894 cache->stats.history_uint_counter++;
2895 break;
2896 case ITEM_VALUE_TYPE_STR:
2897 if (SUCCEED != hc_clone_history_str_data(&(*data)->value.str, &item_value->value.value_str))
2898 return FAIL;
2899
2900 cache->stats.history_str_counter++;
2901 break;
2902 case ITEM_VALUE_TYPE_TEXT:
2903 if (SUCCEED != hc_clone_history_str_data(&(*data)->value.str, &item_value->value.value_str))
2904 return FAIL;
2905
2906 cache->stats.history_text_counter++;
2907 break;
2908 case ITEM_VALUE_TYPE_LOG:
2909 if (SUCCEED != hc_clone_history_log_data(&(*data)->value.log, item_value))
2910 return FAIL;
2911
2912 cache->stats.history_log_counter++;
2913 break;
2914 }
2915
2916 cache->stats.history_counter++;
2917 }
2918
2919 (*data)->value_type = item_value->value_type;
2920
2921 if (0 != (ZBX_DC_FLAG_META & item_value->flags))
2922 {
2923 (*data)->lastlogsize = item_value->lastlogsize;
2924 (*data)->mtime = item_value->mtime;
2925 }
2926
2927 return SUCCEED;
2928 }
2929
2930 /******************************************************************************
2931 * *
2932 * Function: hc_add_item_values *
2933 * *
2934 * Purpose: adds item values to the history cache *
2935 * *
2936 * Parameters: item_values - [IN] the item values to add *
2937 * item_values_num - [IN] the number of item values to add *
2938 * *
2939 * Comments: If the history cache is full this function will wait until *
2940 * history syncers processes values freeing enough space to store *
2941 * the new value. *
2942 * *
2943 ******************************************************************************/
hc_add_item_values(dc_item_value_t * item_values,int item_values_num)2944 static void hc_add_item_values(dc_item_value_t *item_values, int item_values_num)
2945 {
2946 dc_item_value_t *item_value;
2947 int i;
2948 zbx_hc_item_t *item;
2949
2950 for (i = 0; i < item_values_num; i++)
2951 {
2952 zbx_hc_data_t *data = NULL;
2953
2954 item_value = &item_values[i];
2955
2956 while (SUCCEED != hc_clone_history_data(&data, item_value))
2957 {
2958 UNLOCK_CACHE;
2959
2960 zabbix_log(LOG_LEVEL_DEBUG, "History buffer is full. Sleeping for 1 second.");
2961 sleep(1);
2962
2963 LOCK_CACHE;
2964 }
2965
2966 if (NULL == (item = hc_get_item(item_value->itemid)))
2967 {
2968 item = hc_add_item(item_value->itemid, data);
2969 hc_queue_item(item);
2970 }
2971 else
2972 {
2973 item->head->next = data;
2974 item->head = data;
2975 }
2976 }
2977 }
2978
2979 /******************************************************************************
2980 * *
2981 * Function: hc_copy_history_data *
2982 * *
2983 * Purpose: copies item value from history cache into the specified history *
2984 * value *
2985 * *
2986 * Parameters: history - [OUT] the history value *
2987 * itemid - [IN] the item identifier *
2988 * data - [IN] the history data to copy *
2989 * *
2990 * Comments: handling of uninitialized fields in dc_add_proxy_history_log() *
2991 * *
2992 ******************************************************************************/
hc_copy_history_data(ZBX_DC_HISTORY * history,zbx_uint64_t itemid,zbx_hc_data_t * data)2993 static void hc_copy_history_data(ZBX_DC_HISTORY *history, zbx_uint64_t itemid, zbx_hc_data_t *data)
2994 {
2995 history->itemid = itemid;
2996 history->ts = data->ts;
2997 history->state = data->state;
2998 history->flags = data->flags;
2999 history->keep_history = 0;
3000 history->keep_trends = 0;
3001
3002 if (ITEM_STATE_NOTSUPPORTED == data->state)
3003 {
3004 history->value_orig.err = zbx_strdup(NULL, data->value.str);
3005 history->flags |= ZBX_DC_FLAG_UNDEF;
3006 return;
3007 }
3008
3009 history->value_type = data->value_type;
3010 history->lastlogsize = data->lastlogsize;
3011 history->mtime = data->mtime;
3012
3013 if (0 == (ZBX_DC_FLAG_NOVALUE & data->flags))
3014 {
3015 switch (data->value_type)
3016 {
3017 case ITEM_VALUE_TYPE_FLOAT:
3018 history->value_orig.dbl = data->value.dbl;
3019 history->value.dbl = 0;
3020 break;
3021 case ITEM_VALUE_TYPE_UINT64:
3022 history->value_orig.ui64 = data->value.ui64;
3023 history->value.ui64 = 0;
3024 break;
3025 case ITEM_VALUE_TYPE_STR:
3026 case ITEM_VALUE_TYPE_TEXT:
3027 history->value_orig.str = zbx_strdup(NULL, data->value.str);
3028 break;
3029 case ITEM_VALUE_TYPE_LOG:
3030 history->value_orig.str = zbx_strdup(NULL, data->value.log->value);
3031 if (NULL != data->value.log->source)
3032 history->value.str = zbx_strdup(NULL, data->value.log->source);
3033 else
3034 history->value.str = NULL;
3035
3036 history->timestamp = data->value.log->timestamp;
3037 history->severity = data->value.log->severity;
3038 history->logeventid = data->value.log->logeventid;
3039
3040 break;
3041 }
3042 }
3043 }
3044
3045 /******************************************************************************
3046 * *
3047 * Function: hc_pop_items *
3048 * *
3049 * Purpose: pops the next batch of history items from cache for processing *
3050 * *
3051 * Parameters: history_items - [OUT] the locked history items *
3052 * *
3053 * Comments: The history_items must be returned back to history cache with *
3054 * hc_push_items() function after they have been processed. *
3055 * *
3056 ******************************************************************************/
hc_pop_items(zbx_vector_ptr_t * history_items)3057 static void hc_pop_items(zbx_vector_ptr_t *history_items)
3058 {
3059 zbx_binary_heap_elem_t *elem;
3060 zbx_hc_item_t *item;
3061
3062 while (ZBX_HC_SYNC_MAX > history_items->values_num && FAIL == zbx_binary_heap_empty(&cache->history_queue))
3063 {
3064 elem = zbx_binary_heap_find_min(&cache->history_queue);
3065 item = (zbx_hc_item_t *)elem->data;
3066 zbx_vector_ptr_append(history_items, item);
3067
3068 zbx_binary_heap_remove_min(&cache->history_queue);
3069 }
3070 }
3071
3072 /******************************************************************************
3073 * *
3074 * Function: hc_get_item_values *
3075 * *
3076 * Purpose: gets item history values *
3077 * *
3078 * Parameters: history - [OUT] the history valeus *
3079 * history_items - [IN] the history items *
3080 * *
3081 ******************************************************************************/
hc_get_item_values(ZBX_DC_HISTORY * history,zbx_vector_ptr_t * history_items)3082 static void hc_get_item_values(ZBX_DC_HISTORY *history, zbx_vector_ptr_t *history_items)
3083 {
3084 int i, history_num = 0;
3085 zbx_hc_item_t *item;
3086
3087 /* we don't need to lock history cache because no other processes can */
3088 /* change item's history data until it is pushed back to history queue */
3089 for (i = 0; i < history_items->values_num; i++)
3090 {
3091 /* busy items were replaced with NULL values in hc_push_busy_items() function */
3092 if (NULL == (item = (zbx_hc_item_t *)history_items->values[i]))
3093 continue;
3094
3095 hc_copy_history_data(&history[history_num++], item->itemid, item->tail);
3096 }
3097 }
3098
3099 /******************************************************************************
3100 * *
3101 * Function: hc_push_busy_items *
3102 * *
3103 * Purpose: push back the busy (locked by triggers) items into history cache *
3104 * *
3105 * Parameters: history_items - [IN] the history items *
3106 * *
3107 ******************************************************************************/
hc_push_busy_items(zbx_vector_ptr_t * history_items)3108 static void hc_push_busy_items(zbx_vector_ptr_t *history_items)
3109 {
3110 int i;
3111 zbx_hc_item_t *item;
3112
3113 for (i = 0; i < history_items->values_num; i++)
3114 {
3115 item = (zbx_hc_item_t *)history_items->values[i];
3116
3117 if (ZBX_HC_ITEM_STATUS_NORMAL == item->status)
3118 continue;
3119
3120 /* reset item status before returning it to queue */
3121 item->status = ZBX_HC_ITEM_STATUS_NORMAL;
3122 hc_queue_item(item);
3123
3124 /* After pushing back to queue current syncer has released ownership of this item. */
3125 /* To avoid using it further reset the item reference in vector to NULL. */
3126 history_items->values[i] = NULL;
3127 }
3128 }
3129
3130 /******************************************************************************
3131 * *
3132 * Function: hc_push_processed_items *
3133 * *
3134 * Purpose: push back the processed history items into history cache *
3135 * *
3136 * Parameters: history_items - [IN] the history items containing processed *
3137 * (available) and busy items *
3138 * *
3139 * Return value: time of the next history item to sync *
3140 * *
3141 * Comments: This function removes processed value from history cache. *
3142 * If there is no more data for this item, then the item itself is *
3143 * removed from history index. *
3144 * *
3145 ******************************************************************************/
hc_push_processed_items(zbx_vector_ptr_t * history_items)3146 static int hc_push_processed_items(zbx_vector_ptr_t *history_items)
3147 {
3148 int i;
3149 zbx_hc_item_t *item;
3150 zbx_hc_data_t *data_free;
3151 int next_sync;
3152
3153 for (i = 0; i < history_items->values_num; i++)
3154 {
3155 /* busy items were replaced with NULL values in hc_push_busy_items() function */
3156 if (NULL == (item = (zbx_hc_item_t *)history_items->values[i]))
3157 continue;
3158
3159 data_free = item->tail;
3160 item->tail = item->tail->next;
3161 hc_free_data(data_free);
3162
3163 if (NULL == item->tail)
3164 {
3165 zbx_hashset_remove(&cache->history_items, item);
3166 continue;
3167 }
3168
3169 hc_queue_item(item);
3170 }
3171
3172 if (FAIL == zbx_binary_heap_empty(&cache->history_queue))
3173 {
3174 zbx_binary_heap_elem_t *elem;
3175
3176 elem = zbx_binary_heap_find_min(&cache->history_queue);
3177 item = (zbx_hc_item_t *)elem->data;
3178
3179 next_sync = item->tail->ts.sec;
3180 }
3181 else
3182 next_sync = 0;
3183
3184 return next_sync;
3185 }
3186
3187 /******************************************************************************
3188 * *
3189 * Function: hc_free_item_values *
3190 * *
3191 * Purpose: frees resources allocated to store str/text/log values *
3192 * *
3193 * Parameters: history - [IN] the history data (
3194 * history_num - [IN] the number of values in history data *
3195 * *
3196 ******************************************************************************/
hc_free_item_values(ZBX_DC_HISTORY * history,int history_num)3197 static void hc_free_item_values(ZBX_DC_HISTORY *history, int history_num)
3198 {
3199 int i;
3200
3201 for (i = 0; i < history_num; i++)
3202 {
3203 if (ITEM_STATE_NOTSUPPORTED == history[i].state)
3204 {
3205 zbx_free(history[i].value_orig.err);
3206 continue;
3207 }
3208
3209 if (0 != (ZBX_DC_FLAG_NOVALUE & history[i].flags))
3210 continue;
3211
3212 switch (history[i].value_type)
3213 {
3214 case ITEM_VALUE_TYPE_LOG:
3215 zbx_free(history[i].value.str);
3216 /* break; is not missing here */
3217 case ITEM_VALUE_TYPE_STR:
3218 case ITEM_VALUE_TYPE_TEXT:
3219 zbx_free(history[i].value_orig.str);
3220 break;
3221 }
3222 }
3223 }
3224
3225 /******************************************************************************
3226 * *
3227 * Function: init_trend_cache *
3228 * *
3229 * Purpose: Allocate shared memory for trend cache (part of database cache) *
3230 * *
3231 * Author: Vladimir Levijev *
3232 * *
3233 * Comments: Is optionally called from init_database_cache() *
3234 * *
3235 ******************************************************************************/
3236
3237 ZBX_MEM_FUNC_IMPL(__trend, trend_mem);
3238
init_trend_cache()3239 static void init_trend_cache()
3240 {
3241 const char *__function_name = "init_trend_cache";
3242 key_t trend_shm_key;
3243 size_t sz;
3244
3245 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
3246
3247 if (-1 == (trend_shm_key = zbx_ftok(CONFIG_FILE, ZBX_IPC_TREND_ID)))
3248 {
3249 zabbix_log(LOG_LEVEL_CRIT, "cannot create IPC key for trend cache");
3250 exit(EXIT_FAILURE);
3251 }
3252
3253 if (FAIL == zbx_mutex_create_force(&trends_lock, ZBX_MUTEX_TRENDS))
3254 {
3255 zbx_error("cannot create mutex for trend cache");
3256 exit(EXIT_FAILURE);
3257 }
3258
3259 sz = zbx_mem_required_size(1, "trend cache", "TrendCacheSize");
3260 zbx_mem_create(&trend_mem, trend_shm_key, ZBX_NO_MUTEX, CONFIG_TRENDS_CACHE_SIZE,
3261 "trend cache", "TrendCacheSize", 0);
3262 CONFIG_TRENDS_CACHE_SIZE -= sz;
3263
3264 cache->trends_num = 0;
3265 cache->trends_last_cleanup_hour = 0;
3266
3267 #define INIT_HASHSET_SIZE 100 /* Should be calculated dynamically based on trends size? */
3268 /* Still does not make sense to have it more than initial */
3269 /* item hashset size in configuration cache. */
3270
3271 zbx_hashset_create_ext(&cache->trends, INIT_HASHSET_SIZE,
3272 ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL,
3273 __trend_mem_malloc_func, __trend_mem_realloc_func, __trend_mem_free_func);
3274
3275 #undef INIT_HASHSET_SIZE
3276
3277 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
3278 }
3279
3280 /******************************************************************************
3281 * *
3282 * Function: init_database_cache *
3283 * *
3284 * Purpose: Allocate shared memory for database cache *
3285 * *
3286 * Author: Alexei Vladishev, Alexander Vladishev *
3287 * *
3288 ******************************************************************************/
init_database_cache()3289 void init_database_cache()
3290 {
3291 const char *__function_name = "init_database_cache";
3292 key_t hc_shm_key, hc_index_shm_key;
3293
3294 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
3295
3296 if (-1 == (hc_shm_key = zbx_ftok(CONFIG_FILE, ZBX_IPC_HISTORY_ID)) ||
3297 -1 == (hc_index_shm_key = zbx_ftok(CONFIG_FILE, ZBX_IPC_HISTORY_INDEX_ID)))
3298 {
3299 zabbix_log(LOG_LEVEL_CRIT, "cannot create IPC keys for history cache");
3300 exit(EXIT_FAILURE);
3301 }
3302
3303 if (FAIL == zbx_mutex_create_force(&cache_lock, ZBX_MUTEX_CACHE))
3304 {
3305 zbx_error("cannot create mutex for history cache");
3306 exit(EXIT_FAILURE);
3307 }
3308
3309 if (FAIL == zbx_mutex_create_force(&cache_ids_lock, ZBX_MUTEX_CACHE_IDS))
3310 {
3311 zbx_error("cannot create mutex for IDs cache");
3312 exit(EXIT_FAILURE);
3313 }
3314
3315 /* history cache */
3316 zbx_mem_create(&hc_mem, hc_shm_key, ZBX_NO_MUTEX, CONFIG_HISTORY_CACHE_SIZE, "history cache",
3317 "HistoryCacheSize", 1);
3318
3319 /* history index cache */
3320 zbx_mem_create(&hc_index_mem, hc_index_shm_key, ZBX_NO_MUTEX, CONFIG_HISTORY_INDEX_CACHE_SIZE,
3321 "history index cache", "HistoryIndexCacheSize", 0);
3322
3323 cache = (ZBX_DC_CACHE *)__hc_index_mem_malloc_func(NULL, sizeof(ZBX_DC_CACHE));
3324 memset(cache, 0, sizeof(ZBX_DC_CACHE));
3325
3326 ids = (ZBX_DC_IDS *)__hc_index_mem_malloc_func(NULL, sizeof(ZBX_DC_IDS));
3327 memset(ids, 0, sizeof(ZBX_DC_IDS));
3328
3329 zbx_hashset_create_ext(&cache->history_items, ZBX_HC_ITEMS_INIT_SIZE,
3330 ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL,
3331 __hc_index_mem_malloc_func, __hc_index_mem_realloc_func, __hc_index_mem_free_func);
3332
3333 zbx_binary_heap_create_ext(&cache->history_queue, hc_queue_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY,
3334 __hc_index_mem_malloc_func, __hc_index_mem_realloc_func, __hc_index_mem_free_func);
3335
3336 /* trend cache */
3337 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3338 init_trend_cache();
3339
3340 if (NULL == sql)
3341 sql = zbx_malloc(sql, sql_alloc);
3342
3343 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
3344 }
3345
3346 /******************************************************************************
3347 * *
3348 * Function: DCsync_all *
3349 * *
3350 * Purpose: writes updates and new data from pool and cache data to database *
3351 * *
3352 * Author: Alexei Vladishev *
3353 * *
3354 ******************************************************************************/
DCsync_all()3355 static void DCsync_all()
3356 {
3357 int sync_num;
3358
3359 zabbix_log(LOG_LEVEL_DEBUG, "In DCsync_all()");
3360
3361 DCsync_history(ZBX_SYNC_FULL, &sync_num);
3362 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3363 DCsync_trends();
3364
3365 zabbix_log(LOG_LEVEL_DEBUG, "End of DCsync_all()");
3366 }
3367
3368 /******************************************************************************
3369 * *
3370 * Function: free_database_cache *
3371 * *
3372 * Purpose: Free memory allocated for database cache *
3373 * *
3374 * Author: Alexei Vladishev, Alexander Vladishev *
3375 * *
3376 ******************************************************************************/
free_database_cache()3377 void free_database_cache()
3378 {
3379 const char *__function_name = "free_database_cache";
3380
3381 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
3382
3383 DCsync_all();
3384
3385 cache = NULL;
3386
3387 zbx_mem_destroy(hc_mem);
3388 zbx_mem_destroy(hc_index_mem);
3389
3390 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3391 zbx_mem_destroy(trend_mem);
3392
3393 zbx_mutex_destroy(&cache_lock);
3394 zbx_mutex_destroy(&cache_ids_lock);
3395
3396 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3397 zbx_mutex_destroy(&trends_lock);
3398
3399 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
3400 }
3401
3402 /******************************************************************************
3403 * *
3404 * Function: DCget_nextid *
3405 * *
3406 * Purpose: Return next id for requested table *
3407 * *
3408 * Author: Alexander Vladishev *
3409 * *
3410 ******************************************************************************/
DCget_nextid(const char * table_name,int num)3411 zbx_uint64_t DCget_nextid(const char *table_name, int num)
3412 {
3413 const char *__function_name = "DCget_nextid";
3414 int i;
3415 DB_RESULT result;
3416 DB_ROW row;
3417 const ZBX_TABLE *table;
3418 ZBX_DC_ID *id;
3419 zbx_uint64_t min = 0, max = ZBX_DB_MAX_ID, nextid, lastid;
3420
3421 zabbix_log(LOG_LEVEL_DEBUG, "In %s() table:'%s' num:%d",
3422 __function_name, table_name, num);
3423
3424 LOCK_CACHE_IDS;
3425
3426 for (i = 0; i < ZBX_IDS_SIZE; i++)
3427 {
3428 id = &ids->id[i];
3429 if ('\0' == *id->table_name)
3430 break;
3431
3432 if (0 == strcmp(id->table_name, table_name))
3433 {
3434 nextid = id->lastid + 1;
3435 id->lastid += num;
3436 lastid = id->lastid;
3437
3438 UNLOCK_CACHE_IDS;
3439
3440 zabbix_log(LOG_LEVEL_DEBUG, "End of %s() table:'%s' [" ZBX_FS_UI64 ":" ZBX_FS_UI64 "]",
3441 __function_name, table_name, nextid, lastid);
3442
3443 return nextid;
3444 }
3445 }
3446
3447 if (i == ZBX_IDS_SIZE)
3448 {
3449 zabbix_log(LOG_LEVEL_ERR, "insufficient shared memory for ids");
3450 exit(EXIT_FAILURE);
3451 }
3452
3453 table = DBget_table(table_name);
3454
3455 result = DBselect("select max(%s) from %s where %s between " ZBX_FS_UI64 " and " ZBX_FS_UI64,
3456 table->recid, table_name, table->recid, min, max);
3457
3458 if (NULL != result)
3459 {
3460 zbx_strlcpy(id->table_name, table_name, sizeof(id->table_name));
3461
3462 if (NULL == (row = DBfetch(result)) || SUCCEED == DBis_null(row[0]))
3463 id->lastid = min;
3464 else
3465 ZBX_STR2UINT64(id->lastid, row[0]);
3466
3467 nextid = id->lastid + 1;
3468 id->lastid += num;
3469 lastid = id->lastid;
3470 }
3471 else
3472 nextid = lastid = 0;
3473
3474 UNLOCK_CACHE_IDS;
3475
3476 DBfree_result(result);
3477
3478 zabbix_log(LOG_LEVEL_DEBUG, "End of %s() table:'%s' [" ZBX_FS_UI64 ":" ZBX_FS_UI64 "]",
3479 __function_name, table_name, nextid, lastid);
3480
3481 return nextid;
3482 }
3483
3484 /******************************************************************************
3485 * *
3486 * Function: DCupdate_hosts_availability *
3487 * *
3488 * Purpose: performs host availability reset for hosts with availability set *
3489 * on interfaces without enabled items *
3490 * *
3491 ******************************************************************************/
DCupdate_hosts_availability()3492 void DCupdate_hosts_availability()
3493 {
3494 const char *__function_name = "DCupdate_hosts_availability";
3495 zbx_vector_ptr_t hosts;
3496 char *sql = NULL;
3497 size_t sql_alloc = 0, sql_offset = 0;
3498 int i;
3499
3500 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
3501
3502 zbx_vector_ptr_create(&hosts);
3503
3504 if (SUCCEED != DCreset_hosts_availability(&hosts))
3505 goto out;
3506
3507 DBbegin();
3508 DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
3509
3510 for (i = 0; i < hosts.values_num; i++)
3511 {
3512 if (SUCCEED == zbx_sql_add_host_availability(&sql, &sql_alloc, &sql_offset, hosts.values[i]))
3513 zbx_strcpy_alloc(&sql, &sql_alloc, &sql_offset, ";\n");
3514
3515 DBexecute_overflowed_sql(&sql, &sql_alloc, &sql_offset);
3516 }
3517
3518 DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
3519
3520 if (16 < sql_offset)
3521 DBexecute("%s", sql);
3522
3523 DBcommit();
3524
3525 zbx_free(sql);
3526 out:
3527 zbx_vector_ptr_clear_ext(&hosts, (zbx_mem_free_func_t)zbx_host_availability_free);
3528 zbx_vector_ptr_destroy(&hosts);
3529
3530 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
3531 }
3532