1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 **/
19
20 #include "common.h"
21 #include "log.h"
22 #include "threads.h"
23
24 #include "db.h"
25 #include "dbcache.h"
26 #include "ipc.h"
27 #include "mutexs.h"
28 #include "zbxserver.h"
29 #include "proxy.h"
30 #include "events.h"
31 #include "memalloc.h"
32 #include "zbxalgo.h"
33 #include "valuecache.h"
34 #include "zbxmodules.h"
35 #include "module.h"
36 #include "export.h"
37 #include "zbxjson.h"
38 #include "zbxhistory.h"
39
40 static zbx_mem_info_t *hc_index_mem = NULL;
41 static zbx_mem_info_t *hc_mem = NULL;
42 static zbx_mem_info_t *trend_mem = NULL;
43
44 #define LOCK_CACHE zbx_mutex_lock(cache_lock)
45 #define UNLOCK_CACHE zbx_mutex_unlock(cache_lock)
46 #define LOCK_TRENDS zbx_mutex_lock(trends_lock)
47 #define UNLOCK_TRENDS zbx_mutex_unlock(trends_lock)
48 #define LOCK_CACHE_IDS zbx_mutex_lock(cache_ids_lock)
49 #define UNLOCK_CACHE_IDS zbx_mutex_unlock(cache_ids_lock)
50
51 static zbx_mutex_t cache_lock = ZBX_MUTEX_NULL;
52 static zbx_mutex_t trends_lock = ZBX_MUTEX_NULL;
53 static zbx_mutex_t cache_ids_lock = ZBX_MUTEX_NULL;
54
55 static char *sql = NULL;
56 static size_t sql_alloc = 4 * ZBX_KIBIBYTE;
57
58 extern unsigned char program_type;
59
60 #define ZBX_IDS_SIZE 9
61
62 #define ZBX_HC_ITEMS_INIT_SIZE 1000
63
64 #define ZBX_TRENDS_CLEANUP_TIME ((SEC_PER_HOUR * 55) / 60)
65
66 /* the maximum time spent synchronizing history */
67 #define ZBX_HC_SYNC_TIME_MAX 10
68
69 /* the maximum number of items in one synchronization batch */
70 #define ZBX_HC_SYNC_MAX 1000
71 #define ZBX_HC_TIMER_MAX (ZBX_HC_SYNC_MAX / 2)
72
73 /* the minimum processed item percentage of item candidates to continue synchronizing */
74 #define ZBX_HC_SYNC_MIN_PCNT 10
75
76 /* the maximum number of characters for history cache values */
77 #define ZBX_HISTORY_VALUE_LEN (1024 * 64)
78
79 #define ZBX_DC_FLAGS_NOT_FOR_HISTORY (ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_NOHISTORY)
80 #define ZBX_DC_FLAGS_NOT_FOR_TRENDS (ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_NOTRENDS)
81 #define ZBX_DC_FLAGS_NOT_FOR_MODULES (ZBX_DC_FLAGS_NOT_FOR_HISTORY | ZBX_DC_FLAG_LLD)
82 #define ZBX_DC_FLAGS_NOT_FOR_EXPORT (ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF)
83
84 typedef struct
85 {
86 char table_name[ZBX_TABLENAME_LEN_MAX];
87 zbx_uint64_t lastid;
88 }
89 ZBX_DC_ID;
90
91 typedef struct
92 {
93 ZBX_DC_ID id[ZBX_IDS_SIZE];
94 }
95 ZBX_DC_IDS;
96
97 static ZBX_DC_IDS *ids = NULL;
98
99 typedef struct
100 {
101 zbx_hashset_t trends;
102 ZBX_DC_STATS stats;
103
104 zbx_hashset_t history_items;
105 zbx_binary_heap_t history_queue;
106
107 int history_num;
108 int trends_num;
109 int trends_last_cleanup_hour;
110 int history_num_total;
111 int history_progress_ts;
112 }
113 ZBX_DC_CACHE;
114
115 static ZBX_DC_CACHE *cache = NULL;
116
117 /* local history cache */
118 #define ZBX_MAX_VALUES_LOCAL 256
119 #define ZBX_STRUCT_REALLOC_STEP 8
120 #define ZBX_STRING_REALLOC_STEP ZBX_KIBIBYTE
121
122 typedef struct
123 {
124 size_t pvalue;
125 size_t len;
126 }
127 dc_value_str_t;
128
129 typedef struct
130 {
131 double value_dbl;
132 zbx_uint64_t value_uint;
133 dc_value_str_t value_str;
134 }
135 dc_value_t;
136
137 typedef struct
138 {
139 zbx_uint64_t itemid;
140 dc_value_t value;
141 zbx_timespec_t ts;
142 dc_value_str_t source; /* for log items only */
143 zbx_uint64_t lastlogsize;
144 int timestamp; /* for log items only */
145 int severity; /* for log items only */
146 int logeventid; /* for log items only */
147 int mtime;
148 unsigned char item_value_type;
149 unsigned char value_type;
150 unsigned char state;
151 unsigned char flags; /* see ZBX_DC_FLAG_* above */
152 }
153 dc_item_value_t;
154
155 static char *string_values = NULL;
156 static size_t string_values_alloc = 0, string_values_offset = 0;
157 static dc_item_value_t *item_values = NULL;
158 static size_t item_values_alloc = 0, item_values_num = 0;
159
160 static void hc_add_item_values(dc_item_value_t *values, int values_num);
161 static void hc_pop_items(zbx_vector_ptr_t *history_items);
162 static void hc_get_item_values(ZBX_DC_HISTORY *history, zbx_vector_ptr_t *history_items);
163 static void hc_push_items(zbx_vector_ptr_t *history_items);
164 static void hc_free_item_values(ZBX_DC_HISTORY *history, int history_num);
165 static void hc_queue_item(zbx_hc_item_t *item);
166 static int hc_queue_elem_compare_func(const void *d1, const void *d2);
167 static int hc_queue_get_size(void);
168
169 /******************************************************************************
170 * *
171 * Function: DCget_stats_all *
172 * *
173 * Purpose: retrieves all internal metrics of the database cache *
174 * *
175 * Parameters: stats - [OUT] write cache metrics *
176 * *
177 ******************************************************************************/
DCget_stats_all(zbx_wcache_info_t * wcache_info)178 void DCget_stats_all(zbx_wcache_info_t *wcache_info)
179 {
180 LOCK_CACHE;
181
182 wcache_info->stats = cache->stats;
183 wcache_info->history_free = hc_mem->free_size;
184 wcache_info->history_total = hc_mem->total_size;
185 wcache_info->index_free = hc_index_mem->free_size;
186 wcache_info->index_total = hc_index_mem->total_size;
187
188 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
189 {
190 wcache_info->trend_free = trend_mem->free_size;
191 wcache_info->trend_total = trend_mem->orig_size;
192 }
193
194 UNLOCK_CACHE;
195 }
196
197 /******************************************************************************
198 * *
199 * Function: DCget_stats *
200 * *
201 * Purpose: get statistics of the database cache *
202 * *
203 * Author: Alexander Vladishev *
204 * *
205 ******************************************************************************/
DCget_stats(int request)206 void *DCget_stats(int request)
207 {
208 static zbx_uint64_t value_uint;
209 static double value_double;
210 void *ret;
211
212 LOCK_CACHE;
213
214 switch (request)
215 {
216 case ZBX_STATS_HISTORY_COUNTER:
217 value_uint = cache->stats.history_counter;
218 ret = (void *)&value_uint;
219 break;
220 case ZBX_STATS_HISTORY_FLOAT_COUNTER:
221 value_uint = cache->stats.history_float_counter;
222 ret = (void *)&value_uint;
223 break;
224 case ZBX_STATS_HISTORY_UINT_COUNTER:
225 value_uint = cache->stats.history_uint_counter;
226 ret = (void *)&value_uint;
227 break;
228 case ZBX_STATS_HISTORY_STR_COUNTER:
229 value_uint = cache->stats.history_str_counter;
230 ret = (void *)&value_uint;
231 break;
232 case ZBX_STATS_HISTORY_LOG_COUNTER:
233 value_uint = cache->stats.history_log_counter;
234 ret = (void *)&value_uint;
235 break;
236 case ZBX_STATS_HISTORY_TEXT_COUNTER:
237 value_uint = cache->stats.history_text_counter;
238 ret = (void *)&value_uint;
239 break;
240 case ZBX_STATS_NOTSUPPORTED_COUNTER:
241 value_uint = cache->stats.notsupported_counter;
242 ret = (void *)&value_uint;
243 break;
244 case ZBX_STATS_HISTORY_TOTAL:
245 value_uint = hc_mem->total_size;
246 ret = (void *)&value_uint;
247 break;
248 case ZBX_STATS_HISTORY_USED:
249 value_uint = hc_mem->total_size - hc_mem->free_size;
250 ret = (void *)&value_uint;
251 break;
252 case ZBX_STATS_HISTORY_FREE:
253 value_uint = hc_mem->free_size;
254 ret = (void *)&value_uint;
255 break;
256 case ZBX_STATS_HISTORY_PUSED:
257 value_double = 100 * (double)(hc_mem->total_size - hc_mem->free_size) / hc_mem->total_size;
258 ret = (void *)&value_double;
259 break;
260 case ZBX_STATS_HISTORY_PFREE:
261 value_double = 100 * (double)hc_mem->free_size / hc_mem->total_size;
262 ret = (void *)&value_double;
263 break;
264 case ZBX_STATS_TREND_TOTAL:
265 value_uint = trend_mem->orig_size;
266 ret = (void *)&value_uint;
267 break;
268 case ZBX_STATS_TREND_USED:
269 value_uint = trend_mem->orig_size - trend_mem->free_size;
270 ret = (void *)&value_uint;
271 break;
272 case ZBX_STATS_TREND_FREE:
273 value_uint = trend_mem->free_size;
274 ret = (void *)&value_uint;
275 break;
276 case ZBX_STATS_TREND_PUSED:
277 value_double = 100 * (double)(trend_mem->orig_size - trend_mem->free_size) /
278 trend_mem->orig_size;
279 ret = (void *)&value_double;
280 break;
281 case ZBX_STATS_TREND_PFREE:
282 value_double = 100 * (double)trend_mem->free_size / trend_mem->orig_size;
283 ret = (void *)&value_double;
284 break;
285 case ZBX_STATS_HISTORY_INDEX_TOTAL:
286 value_uint = hc_index_mem->total_size;
287 ret = (void *)&value_uint;
288 break;
289 case ZBX_STATS_HISTORY_INDEX_USED:
290 value_uint = hc_index_mem->total_size - hc_index_mem->free_size;
291 ret = (void *)&value_uint;
292 break;
293 case ZBX_STATS_HISTORY_INDEX_FREE:
294 value_uint = hc_index_mem->free_size;
295 ret = (void *)&value_uint;
296 break;
297 case ZBX_STATS_HISTORY_INDEX_PUSED:
298 value_double = 100 * (double)(hc_index_mem->total_size - hc_index_mem->free_size) /
299 hc_index_mem->total_size;
300 ret = (void *)&value_double;
301 break;
302 case ZBX_STATS_HISTORY_INDEX_PFREE:
303 value_double = 100 * (double)hc_index_mem->free_size / hc_index_mem->total_size;
304 ret = (void *)&value_double;
305 break;
306 default:
307 ret = NULL;
308 }
309
310 UNLOCK_CACHE;
311
312 return ret;
313 }
314
315 /******************************************************************************
316 * *
317 * Function: DCget_trend *
318 * *
319 * Purpose: find existing or add new structure and return pointer *
320 * *
321 * Return value: pointer to a trend structure *
322 * *
323 * Author: Alexander Vladishev *
324 * *
325 ******************************************************************************/
DCget_trend(zbx_uint64_t itemid)326 static ZBX_DC_TREND *DCget_trend(zbx_uint64_t itemid)
327 {
328 ZBX_DC_TREND *ptr, trend;
329
330 if (NULL != (ptr = (ZBX_DC_TREND *)zbx_hashset_search(&cache->trends, &itemid)))
331 return ptr;
332
333 memset(&trend, 0, sizeof(ZBX_DC_TREND));
334 trend.itemid = itemid;
335
336 return (ZBX_DC_TREND *)zbx_hashset_insert(&cache->trends, &trend, sizeof(ZBX_DC_TREND));
337 }
338
339 /******************************************************************************
340 * *
341 * Function: DCupdate_trends *
342 * *
343 * Purpose: apply disable_from changes to cache *
344 * *
345 ******************************************************************************/
DCupdate_trends(zbx_vector_uint64_pair_t * trends_diff)346 static void DCupdate_trends(zbx_vector_uint64_pair_t *trends_diff)
347 {
348 const char *__function_name = "DCupdate_trends";
349 int i;
350
351 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
352
353 LOCK_TRENDS;
354
355 for (i = 0; i < trends_diff->values_num; i++)
356 {
357 ZBX_DC_TREND *trend;
358
359 if (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_search(&cache->trends, &trends_diff->values[i].first)))
360 trend->disable_from = trends_diff->values[i].second;
361 }
362
363 UNLOCK_TRENDS;
364
365 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
366 }
367
368 /******************************************************************************
369 * *
370 * Function: dc_insert_trends_in_db *
371 * *
372 * Purpose: helper function for DCflush trends *
373 * *
374 ******************************************************************************/
dc_insert_trends_in_db(ZBX_DC_TREND * trends,int trends_num,unsigned char value_type,const char * table_name,int clock)375 static void dc_insert_trends_in_db(ZBX_DC_TREND *trends, int trends_num, unsigned char value_type,
376 const char *table_name, int clock)
377 {
378 ZBX_DC_TREND *trend;
379 int i;
380 zbx_db_insert_t db_insert;
381
382 zbx_db_insert_prepare(&db_insert, table_name, "itemid", "clock", "num", "value_min", "value_avg",
383 "value_max", NULL);
384
385 for (i = 0; i < trends_num; i++)
386 {
387 trend = &trends[i];
388
389 if (0 == trend->itemid)
390 continue;
391
392 if (clock != trend->clock || value_type != trend->value_type)
393 continue;
394
395 if (ITEM_VALUE_TYPE_FLOAT == value_type)
396 {
397 zbx_db_insert_add_values(&db_insert, trend->itemid, trend->clock, trend->num,
398 trend->value_min.dbl, trend->value_avg.dbl, trend->value_max.dbl);
399 }
400 else
401 {
402 zbx_uint128_t avg;
403
404 /* calculate the trend average value */
405 udiv128_64(&avg, &trend->value_avg.ui64, trend->num);
406
407 zbx_db_insert_add_values(&db_insert, trend->itemid, trend->clock, trend->num,
408 trend->value_min.ui64, avg.lo, trend->value_max.ui64);
409 }
410
411 trend->itemid = 0;
412 }
413
414 zbx_db_insert_execute(&db_insert);
415 zbx_db_insert_clean(&db_insert);
416 }
417
418 /******************************************************************************
419 * *
420 * Function: dc_remove_updated_trends *
421 * *
422 * Purpose: Update trends disable_until for items without trends data past or *
423 * equal the specified clock *
424 * *
425 * Comments: A helper function for DCflush trends *
426 * *
427 ******************************************************************************/
dc_remove_updated_trends(ZBX_DC_TREND * trends,int trends_num,const char * table_name,int value_type,zbx_uint64_t * itemids,int * itemids_num,int clock)428 static void dc_remove_updated_trends(ZBX_DC_TREND *trends, int trends_num, const char *table_name,
429 int value_type, zbx_uint64_t *itemids, int *itemids_num, int clock)
430 {
431 int i, j, clocks_num, now, age;
432 ZBX_DC_TREND *trend;
433 zbx_uint64_t itemid;
434 size_t sql_offset;
435 DB_RESULT result;
436 DB_ROW row;
437 int clocks[] = {SEC_PER_DAY, SEC_PER_WEEK, SEC_PER_MONTH, SEC_PER_YEAR, INT_MAX};
438
439 now = time(NULL);
440 age = now - clock;
441 for (clocks_num = 0; age > clocks[clocks_num]; clocks_num++)
442 clocks[clocks_num] = now - clocks[clocks_num];
443 clocks[clocks_num] = clock;
444
445 /* remove itemids with trends data past or equal the clock */
446 for (j = 0; j <= clocks_num && 0 < *itemids_num; j++)
447 {
448 sql_offset = 0;
449 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
450 "select distinct itemid"
451 " from %s"
452 " where clock>=%d and",
453 table_name, clocks[j]);
454
455 if (0 < j)
456 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " clock<%d and", clocks[j - 1]);
457
458 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids, *itemids_num);
459
460 result = DBselect("%s", sql);
461
462 while (NULL != (row = DBfetch(result)))
463 {
464 ZBX_STR2UINT64(itemid, row[0]);
465 uint64_array_remove(itemids, itemids_num, &itemid, 1);
466 }
467 DBfree_result(result);
468 }
469
470 /* update trends disable_until for the leftover itemids */
471 while (0 != *itemids_num)
472 {
473 itemid = itemids[--*itemids_num];
474
475 for (i = 0; i < trends_num; i++)
476 {
477 trend = &trends[i];
478
479 if (itemid != trend->itemid)
480 continue;
481
482 if (clock != trend->clock || value_type != trend->value_type)
483 continue;
484
485 trend->disable_from = clock;
486 break;
487 }
488 }
489 }
490
491 /******************************************************************************
492 * *
493 * Function: dc_trends_update_float *
494 * *
495 * Purpose: helper function for DCflush trends *
496 * *
497 ******************************************************************************/
dc_trends_update_float(ZBX_DC_TREND * trend,DB_ROW row,int num,size_t * sql_offset)498 static void dc_trends_update_float(ZBX_DC_TREND *trend, DB_ROW row, int num, size_t *sql_offset)
499 {
500 history_value_t value_min, value_avg, value_max;
501
502 value_min.dbl = atof(row[2]);
503 value_avg.dbl = atof(row[3]);
504 value_max.dbl = atof(row[4]);
505
506 if (value_min.dbl < trend->value_min.dbl)
507 trend->value_min.dbl = value_min.dbl;
508 if (value_max.dbl > trend->value_max.dbl)
509 trend->value_max.dbl = value_max.dbl;
510 trend->value_avg.dbl = (trend->num * trend->value_avg.dbl
511 + num * value_avg.dbl) / (trend->num + num);
512 trend->num += num;
513
514 zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset,
515 "update trends set num=%d,value_min=" ZBX_FS_DBL ",value_avg="
516 ZBX_FS_DBL ",value_max=" ZBX_FS_DBL " where itemid=" ZBX_FS_UI64
517 " and clock=%d;\n",
518 trend->num,
519 trend->value_min.dbl,
520 trend->value_avg.dbl,
521 trend->value_max.dbl,
522 trend->itemid,
523 trend->clock);
524 }
525
526 /******************************************************************************
527 * *
528 * Function: dc_trends_update_uint *
529 * *
530 * Purpose: helper function for DCflush trends *
531 * *
532 ******************************************************************************/
dc_trends_update_uint(ZBX_DC_TREND * trend,DB_ROW row,int num,size_t * sql_offset)533 static void dc_trends_update_uint(ZBX_DC_TREND *trend, DB_ROW row, int num, size_t *sql_offset)
534 {
535 history_value_t value_min, value_avg, value_max;
536 zbx_uint128_t avg;
537
538 ZBX_STR2UINT64(value_min.ui64, row[2]);
539 ZBX_STR2UINT64(value_avg.ui64, row[3]);
540 ZBX_STR2UINT64(value_max.ui64, row[4]);
541
542 if (value_min.ui64 < trend->value_min.ui64)
543 trend->value_min.ui64 = value_min.ui64;
544 if (value_max.ui64 > trend->value_max.ui64)
545 trend->value_max.ui64 = value_max.ui64;
546
547 /* calculate the trend average value */
548 umul64_64(&avg, num, value_avg.ui64);
549 uinc128_128(&trend->value_avg.ui64, &avg);
550 udiv128_64(&avg, &trend->value_avg.ui64, trend->num + num);
551
552 trend->num += num;
553
554 zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset,
555 "update trends_uint set num=%d,value_min=" ZBX_FS_UI64 ",value_avg="
556 ZBX_FS_UI64 ",value_max=" ZBX_FS_UI64 " where itemid=" ZBX_FS_UI64
557 " and clock=%d;\n",
558 trend->num,
559 trend->value_min.ui64,
560 avg.lo,
561 trend->value_max.ui64,
562 trend->itemid,
563 trend->clock);
564 }
565
566 /******************************************************************************
567 * *
568 * Function: dc_trends_fetch_and_update *
569 * *
570 * Purpose: helper function for DCflush trends *
571 * *
572 ******************************************************************************/
dc_trends_fetch_and_update(ZBX_DC_TREND * trends,int trends_num,zbx_uint64_t * itemids,int itemids_num,int * inserts_num,unsigned char value_type,const char * table_name,int clock)573 static void dc_trends_fetch_and_update(ZBX_DC_TREND *trends, int trends_num, zbx_uint64_t *itemids,
574 int itemids_num, int *inserts_num, unsigned char value_type,
575 const char *table_name, int clock)
576 {
577
578 int i, num;
579 DB_RESULT result;
580 DB_ROW row;
581 zbx_uint64_t itemid;
582 ZBX_DC_TREND *trend;
583 size_t sql_offset;
584
585 sql_offset = 0;
586 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
587 "select itemid,num,value_min,value_avg,value_max"
588 " from %s"
589 " where clock=%d and",
590 table_name, clock);
591
592 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids, itemids_num);
593
594 result = DBselect("%s", sql);
595
596 sql_offset = 0;
597 DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
598
599 while (NULL != (row = DBfetch(result)))
600 {
601 ZBX_STR2UINT64(itemid, row[0]);
602
603 for (i = 0; i < trends_num; i++)
604 {
605 trend = &trends[i];
606
607 if (itemid != trend->itemid)
608 continue;
609
610 if (clock != trend->clock || value_type != trend->value_type)
611 continue;
612
613 break;
614 }
615
616 if (i == trends_num)
617 {
618 THIS_SHOULD_NEVER_HAPPEN;
619 continue;
620 }
621
622 num = atoi(row[1]);
623
624 if (value_type == ITEM_VALUE_TYPE_FLOAT)
625 dc_trends_update_float(trend, row, num, &sql_offset);
626 else
627 dc_trends_update_uint(trend, row, num, &sql_offset);
628
629 trend->itemid = 0;
630
631 --*inserts_num;
632
633 DBexecute_overflowed_sql(&sql, &sql_alloc, &sql_offset);
634 }
635
636 DBfree_result(result);
637
638 DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
639
640 if (sql_offset > 16) /* In ORACLE always present begin..end; */
641 DBexecute("%s", sql);
642 }
643
644 /******************************************************************************
645 * *
646 * Function: DBflush_trends *
647 * *
648 * Purpose: flush trend to the database *
649 * *
650 * Author: Alexander Vladishev *
651 * *
652 ******************************************************************************/
DBflush_trends(ZBX_DC_TREND * trends,int * trends_num,zbx_vector_uint64_pair_t * trends_diff)653 static void DBflush_trends(ZBX_DC_TREND *trends, int *trends_num, zbx_vector_uint64_pair_t *trends_diff)
654 {
655 const char *__function_name = "DBflush_trends";
656 int num, i, clock, inserts_num = 0, itemids_alloc, itemids_num = 0, trends_to = *trends_num;
657 unsigned char value_type;
658 zbx_uint64_t *itemids = NULL;
659 ZBX_DC_TREND *trend = NULL;
660 const char *table_name;
661
662 zabbix_log(LOG_LEVEL_DEBUG, "In %s() trends_num:%d", __function_name, *trends_num);
663
664 clock = trends[0].clock;
665 value_type = trends[0].value_type;
666
667 switch (value_type)
668 {
669 case ITEM_VALUE_TYPE_FLOAT:
670 table_name = "trends";
671 break;
672 case ITEM_VALUE_TYPE_UINT64:
673 table_name = "trends_uint";
674 break;
675 default:
676 assert(0);
677 }
678
679 itemids_alloc = MIN(ZBX_HC_SYNC_MAX, *trends_num);
680 itemids = (zbx_uint64_t *)zbx_malloc(itemids, itemids_alloc * sizeof(zbx_uint64_t));
681
682 for (i = 0; i < *trends_num; i++)
683 {
684 trend = &trends[i];
685
686 if (clock != trend->clock || value_type != trend->value_type)
687 continue;
688
689 inserts_num++;
690
691 if (0 != trend->disable_from)
692 continue;
693
694 uint64_array_add(&itemids, &itemids_alloc, &itemids_num, trend->itemid, 64);
695
696 if (ZBX_HC_SYNC_MAX == itemids_num)
697 {
698 trends_to = i + 1;
699 break;
700 }
701 }
702
703 if (0 != itemids_num)
704 {
705 dc_remove_updated_trends(trends, trends_to, table_name, value_type, itemids,
706 &itemids_num, clock);
707 }
708
709 for (i = 0; i < trends_to; i++)
710 {
711 trend = &trends[i];
712
713 if (clock != trend->clock || value_type != trend->value_type)
714 continue;
715
716 if (0 != trend->disable_from && clock >= trend->disable_from)
717 continue;
718
719 uint64_array_add(&itemids, &itemids_alloc, &itemids_num, trend->itemid, 64);
720 }
721
722 if (0 != itemids_num)
723 {
724 dc_trends_fetch_and_update(trends, trends_to, itemids, itemids_num,
725 &inserts_num, value_type, table_name, clock);
726 }
727
728 zbx_free(itemids);
729
730 /* if 'trends' is not a primary trends buffer */
731 if (NULL != trends_diff)
732 {
733 /* we update it too */
734 for (i = 0; i < trends_to; i++)
735 {
736 zbx_uint64_pair_t pair;
737
738 if (0 == trends[i].itemid)
739 continue;
740
741 if (clock != trends[i].clock || value_type != trends[i].value_type)
742 continue;
743
744 if (0 == trends[i].disable_from || trends[i].disable_from > clock)
745 continue;
746
747 pair.first = trends[i].itemid;
748 pair.second = clock + SEC_PER_HOUR;
749 zbx_vector_uint64_pair_append(trends_diff, pair);
750 }
751 }
752
753 if (0 != inserts_num)
754 dc_insert_trends_in_db(trends, trends_to, value_type, table_name, clock);
755
756 /* clean trends */
757 for (i = 0, num = 0; i < *trends_num; i++)
758 {
759 if (0 == trends[i].itemid)
760 continue;
761
762 memcpy(&trends[num++], &trends[i], sizeof(ZBX_DC_TREND));
763 }
764 *trends_num = num;
765
766 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
767 }
768
769 /******************************************************************************
770 * *
771 * Function: DCflush_trend *
772 * *
773 * Purpose: move trend to the array of trends for flushing to DB *
774 * *
775 * Author: Alexander Vladishev *
776 * *
777 ******************************************************************************/
DCflush_trend(ZBX_DC_TREND * trend,ZBX_DC_TREND ** trends,int * trends_alloc,int * trends_num)778 static void DCflush_trend(ZBX_DC_TREND *trend, ZBX_DC_TREND **trends, int *trends_alloc, int *trends_num)
779 {
780 if (*trends_num == *trends_alloc)
781 {
782 *trends_alloc += 256;
783 *trends = (ZBX_DC_TREND *)zbx_realloc(*trends, *trends_alloc * sizeof(ZBX_DC_TREND));
784 }
785
786 memcpy(&(*trends)[*trends_num], trend, sizeof(ZBX_DC_TREND));
787 (*trends_num)++;
788
789 trend->clock = 0;
790 trend->num = 0;
791 memset(&trend->value_min, 0, sizeof(history_value_t));
792 memset(&trend->value_avg, 0, sizeof(value_avg_t));
793 memset(&trend->value_max, 0, sizeof(history_value_t));
794 }
795
796 /******************************************************************************
797 * *
798 * Function: DCadd_trend *
799 * *
800 * Purpose: add new value to the trends *
801 * *
802 * Author: Alexander Vladishev *
803 * *
804 ******************************************************************************/
DCadd_trend(const ZBX_DC_HISTORY * history,ZBX_DC_TREND ** trends,int * trends_alloc,int * trends_num)805 static void DCadd_trend(const ZBX_DC_HISTORY *history, ZBX_DC_TREND **trends, int *trends_alloc, int *trends_num)
806 {
807 ZBX_DC_TREND *trend = NULL;
808 int hour;
809
810 hour = history->ts.sec - history->ts.sec % SEC_PER_HOUR;
811
812 trend = DCget_trend(history->itemid);
813
814 if (trend->num > 0 && (trend->clock != hour || trend->value_type != history->value_type) &&
815 SUCCEED == zbx_history_requires_trends(trend->value_type))
816 {
817 DCflush_trend(trend, trends, trends_alloc, trends_num);
818 }
819
820 trend->value_type = history->value_type;
821 trend->clock = hour;
822
823 switch (trend->value_type)
824 {
825 case ITEM_VALUE_TYPE_FLOAT:
826 if (trend->num == 0 || history->value.dbl < trend->value_min.dbl)
827 trend->value_min.dbl = history->value.dbl;
828 if (trend->num == 0 || history->value.dbl > trend->value_max.dbl)
829 trend->value_max.dbl = history->value.dbl;
830 trend->value_avg.dbl = (trend->num * trend->value_avg.dbl
831 + history->value.dbl) / (trend->num + 1);
832 break;
833 case ITEM_VALUE_TYPE_UINT64:
834 if (trend->num == 0 || history->value.ui64 < trend->value_min.ui64)
835 trend->value_min.ui64 = history->value.ui64;
836 if (trend->num == 0 || history->value.ui64 > trend->value_max.ui64)
837 trend->value_max.ui64 = history->value.ui64;
838 uinc128_64(&trend->value_avg.ui64, history->value.ui64);
839 break;
840 }
841 trend->num++;
842 }
843
844 /******************************************************************************
845 * *
846 * Function: DCmass_update_trends *
847 * *
848 * Purpose: update trends cache and get list of trends to flush into database *
849 * *
850 * Parameters: history - array of history data *
851 * history_num - number of history structures *
852 * trends - list of trends to flush into database *
853 * trends_num - number of trends *
854 * *
855 * Author: Alexander Vladishev *
856 * *
857 ******************************************************************************/
DCmass_update_trends(const ZBX_DC_HISTORY * history,int history_num,ZBX_DC_TREND ** trends,int * trends_num)858 static void DCmass_update_trends(const ZBX_DC_HISTORY *history, int history_num, ZBX_DC_TREND **trends,
859 int *trends_num)
860 {
861 const char *__function_name = "DCmass_update_trends";
862
863 zbx_timespec_t ts;
864 int trends_alloc = 0, i, hour, seconds;
865
866 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
867
868 zbx_timespec(&ts);
869 seconds = ts.sec % SEC_PER_HOUR;
870 hour = ts.sec - seconds;
871
872 LOCK_TRENDS;
873
874 for (i = 0; i < history_num; i++)
875 {
876 const ZBX_DC_HISTORY *h = &history[i];
877
878 if (0 != (ZBX_DC_FLAGS_NOT_FOR_TRENDS & h->flags))
879 continue;
880
881 DCadd_trend(h, trends, &trends_alloc, trends_num);
882 }
883
884 if (cache->trends_last_cleanup_hour < hour && ZBX_TRENDS_CLEANUP_TIME < seconds)
885 {
886 zbx_hashset_iter_t iter;
887 ZBX_DC_TREND *trend;
888
889 zbx_hashset_iter_reset(&cache->trends, &iter);
890
891 while (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_iter_next(&iter)))
892 {
893 if (trend->clock == hour)
894 continue;
895
896 if (SUCCEED == zbx_history_requires_trends(trend->value_type))
897 DCflush_trend(trend, trends, &trends_alloc, trends_num);
898
899 zbx_hashset_iter_remove(&iter);
900 }
901
902 cache->trends_last_cleanup_hour = hour;
903 }
904
905 UNLOCK_TRENDS;
906
907 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
908 }
909
910 /******************************************************************************
911 * *
912 * Function: DBmass_update_trends *
913 * *
914 * Purpose: prepare history data using items from configuration cache *
915 * *
916 * Parameters: trends - [IN] trends from cache to be added to database *
917 * trends_num - [IN] number of trends to add to database *
918 * trends_diff - [OUT] disable_from updates *
919 * *
920 ******************************************************************************/
DBmass_update_trends(const ZBX_DC_TREND * trends,int trends_num,zbx_vector_uint64_pair_t * trends_diff)921 static void DBmass_update_trends(const ZBX_DC_TREND *trends, int trends_num,
922 zbx_vector_uint64_pair_t *trends_diff)
923 {
924 ZBX_DC_TREND *trends_tmp;
925
926 if (0 != trends_num)
927 {
928 trends_tmp = (ZBX_DC_TREND *)zbx_malloc(NULL, trends_num * sizeof(ZBX_DC_TREND));
929 memcpy(trends_tmp, trends, trends_num * sizeof(ZBX_DC_TREND));
930
931 while (0 < trends_num)
932 DBflush_trends(trends_tmp, &trends_num, trends_diff);
933
934 zbx_free(trends_tmp);
935 }
936 }
937
938 typedef struct
939 {
940 zbx_uint64_t hostid;
941 zbx_vector_ptr_t groups;
942 }
943 zbx_host_info_t;
944
945 /******************************************************************************
946 * *
947 * Function: zbx_host_info_clean *
948 * *
949 * Purpose: frees resources allocated to store host groups names *
950 * *
951 * Parameters: host_info - [IN] host information *
952 * *
953 ******************************************************************************/
zbx_host_info_clean(zbx_host_info_t * host_info)954 static void zbx_host_info_clean(zbx_host_info_t *host_info)
955 {
956 zbx_vector_ptr_clear_ext(&host_info->groups, zbx_ptr_free);
957 zbx_vector_ptr_destroy(&host_info->groups);
958 }
959
960 /******************************************************************************
961 * *
962 * Function: db_get_hosts_info_by_hostid *
963 * *
964 * Purpose: get hosts groups names *
965 * *
966 * Parameters: hosts_info - [IN/OUT] output names of host groups for a host *
967 * hostids - [IN] hosts identifiers *
968 * *
969 ******************************************************************************/
db_get_hosts_info_by_hostid(zbx_hashset_t * hosts_info,const zbx_vector_uint64_t * hostids)970 static void db_get_hosts_info_by_hostid(zbx_hashset_t *hosts_info, const zbx_vector_uint64_t *hostids)
971 {
972 int i;
973 size_t sql_offset = 0;
974 DB_RESULT result;
975 DB_ROW row;
976
977 for (i = 0; i < hostids->values_num; i++)
978 {
979 zbx_host_info_t host_info = {.hostid = hostids->values[i]};
980
981 zbx_vector_ptr_create(&host_info.groups);
982 zbx_hashset_insert(hosts_info, &host_info, sizeof(host_info));
983 }
984
985 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
986 "select distinct hg.hostid,g.name"
987 " from hstgrp g,hosts_groups hg"
988 " where g.groupid=hg.groupid"
989 " and");
990
991 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "hg.hostid", hostids->values, hostids->values_num);
992
993 result = DBselect("%s", sql);
994
995 while (NULL != (row = DBfetch(result)))
996 {
997 zbx_uint64_t hostid;
998 zbx_host_info_t *host_info;
999
1000 ZBX_DBROW2UINT64(hostid, row[0]);
1001
1002 if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &hostid)))
1003 {
1004 THIS_SHOULD_NEVER_HAPPEN;
1005 continue;
1006 }
1007
1008 zbx_vector_ptr_append(&host_info->groups, zbx_strdup(NULL, row[1]));
1009 }
1010 DBfree_result(result);
1011 }
1012
1013 typedef struct
1014 {
1015 zbx_uint64_t itemid;
1016 char *name;
1017 DC_ITEM *item;
1018 zbx_vector_ptr_t applications;
1019 }
1020 zbx_item_info_t;
1021
1022 /******************************************************************************
1023 * *
1024 * Function: db_get_items_info_by_itemid *
1025 * *
1026 * Purpose: get items name and applications *
1027 * *
1028 * Parameters: items_info - [IN/OUT] output item name and applications *
1029 * itemids - [IN] the item identifiers *
1030 * *
1031 ******************************************************************************/
db_get_items_info_by_itemid(zbx_hashset_t * items_info,const zbx_vector_uint64_t * itemids)1032 static void db_get_items_info_by_itemid(zbx_hashset_t *items_info, const zbx_vector_uint64_t *itemids)
1033 {
1034 size_t sql_offset = 0;
1035 DB_RESULT result;
1036 DB_ROW row;
1037
1038 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select itemid,name from items where");
1039 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids->values, itemids->values_num);
1040
1041 result = DBselect("%s", sql);
1042
1043 while (NULL != (row = DBfetch(result)))
1044 {
1045 zbx_uint64_t itemid;
1046 zbx_item_info_t *item_info;
1047
1048 ZBX_DBROW2UINT64(itemid, row[0]);
1049
1050 if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &itemid)))
1051 {
1052 THIS_SHOULD_NEVER_HAPPEN;
1053 continue;
1054 }
1055
1056 zbx_substitute_item_name_macros(item_info->item, row[1], &item_info->name);
1057 }
1058 DBfree_result(result);
1059
1060 sql_offset = 0;
1061 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
1062 "select i.itemid,a.name"
1063 " from applications a,items_applications i"
1064 " where a.applicationid=i.applicationid"
1065 " and");
1066
1067 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "i.itemid", itemids->values, itemids->values_num);
1068
1069 result = DBselect("%s", sql);
1070
1071 while (NULL != (row = DBfetch(result)))
1072 {
1073 zbx_uint64_t itemid;
1074 zbx_item_info_t *item_info;
1075
1076 ZBX_DBROW2UINT64(itemid, row[0]);
1077
1078 if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &itemid)))
1079 {
1080 THIS_SHOULD_NEVER_HAPPEN;
1081 continue;
1082 }
1083
1084 zbx_vector_ptr_append(&item_info->applications, zbx_strdup(NULL, row[1]));
1085 }
1086 DBfree_result(result);
1087 }
1088
1089 /******************************************************************************
1090 * *
1091 * Function: zbx_item_info_clean *
1092 * *
1093 * Purpose: frees resources allocated to store item applications and name *
1094 * *
1095 * Parameters: item_info - [IN] item information *
1096 * *
1097 ******************************************************************************/
zbx_item_info_clean(zbx_item_info_t * item_info)1098 static void zbx_item_info_clean(zbx_item_info_t *item_info)
1099 {
1100 zbx_vector_ptr_clear_ext(&item_info->applications, zbx_ptr_free);
1101 zbx_vector_ptr_destroy(&item_info->applications);
1102 zbx_free(item_info->name);
1103 }
1104
1105 /******************************************************************************
1106 * *
1107 * Function: DCexport_trends *
1108 * *
1109 * Purpose: export trends *
1110 * *
1111 * Parameters: trends - [IN] trends from cache *
1112 * trends_num - [IN] number of trends *
1113 * hosts_info - [IN] hosts groups names *
1114 * items_info - [IN] item names and applications *
1115 * *
1116 ******************************************************************************/
DCexport_trends(const ZBX_DC_TREND * trends,int trends_num,zbx_hashset_t * hosts_info,zbx_hashset_t * items_info)1117 static void DCexport_trends(const ZBX_DC_TREND *trends, int trends_num, zbx_hashset_t *hosts_info,
1118 zbx_hashset_t *items_info)
1119 {
1120 struct zbx_json json;
1121 const ZBX_DC_TREND *trend = NULL;
1122 int i, j;
1123 const DC_ITEM *item;
1124 zbx_host_info_t *host_info;
1125 zbx_item_info_t *item_info;
1126 zbx_uint128_t avg; /* calculate the trend average value */
1127
1128 zbx_json_init(&json, ZBX_JSON_STAT_BUF_LEN);
1129
1130 for (i = 0; i < trends_num; i++)
1131 {
1132 trend = &trends[i];
1133
1134 if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &trend->itemid)))
1135 continue;
1136
1137 item = item_info->item;
1138
1139 if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &item->host.hostid)))
1140 {
1141 THIS_SHOULD_NEVER_HAPPEN;
1142 continue;
1143 }
1144
1145 zbx_json_clean(&json);
1146 zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST, item->host.name, ZBX_JSON_TYPE_STRING);
1147 zbx_json_addarray(&json, ZBX_PROTO_TAG_GROUPS);
1148
1149 for (j = 0; j < host_info->groups.values_num; j++)
1150 zbx_json_addstring(&json, NULL, host_info->groups.values[j], ZBX_JSON_TYPE_STRING);
1151
1152 zbx_json_close(&json);
1153
1154 zbx_json_addarray(&json, ZBX_PROTO_TAG_APPLICATIONS);
1155
1156 for (j = 0; j < item_info->applications.values_num; j++)
1157 zbx_json_addstring(&json, NULL, item_info->applications.values[j], ZBX_JSON_TYPE_STRING);
1158
1159 zbx_json_close(&json);
1160 zbx_json_adduint64(&json, ZBX_PROTO_TAG_ITEMID, item->itemid);
1161
1162 if (NULL != item_info->name)
1163 zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item_info->name, ZBX_JSON_TYPE_STRING);
1164
1165 zbx_json_addint64(&json, ZBX_PROTO_TAG_CLOCK, trend->clock);
1166 zbx_json_addint64(&json, ZBX_PROTO_TAG_COUNT, trend->num);
1167
1168 switch (trend->value_type)
1169 {
1170 case ITEM_VALUE_TYPE_FLOAT:
1171 zbx_json_addfloat(&json, ZBX_PROTO_TAG_MIN, trend->value_min.dbl);
1172 zbx_json_addfloat(&json, ZBX_PROTO_TAG_AVG, trend->value_avg.dbl);
1173 zbx_json_addfloat(&json, ZBX_PROTO_TAG_MAX, trend->value_max.dbl);
1174 break;
1175 case ITEM_VALUE_TYPE_UINT64:
1176 zbx_json_adduint64(&json, ZBX_PROTO_TAG_MIN, trend->value_min.ui64);
1177 udiv128_64(&avg, &trend->value_avg.ui64, trend->num);
1178 zbx_json_adduint64(&json, ZBX_PROTO_TAG_AVG, avg.lo);
1179 zbx_json_adduint64(&json, ZBX_PROTO_TAG_MAX, trend->value_max.ui64);
1180 break;
1181 default:
1182 THIS_SHOULD_NEVER_HAPPEN;
1183 }
1184
1185 zbx_trends_export_write(json.buffer, json.buffer_size);
1186 }
1187
1188 zbx_trends_export_flush();
1189 zbx_json_free(&json);
1190 }
1191
1192 /******************************************************************************
1193 * *
1194 * Function: DCexport_history *
1195 * *
1196 * Purpose: export history *
1197 * *
1198 * Parameters: history - [IN/OUT] array of history data *
1199 * history_num - [IN] number of history structures *
1200 * hosts_info - [IN] hosts groups names *
1201 * items_info - [IN] item names and applications *
1202 * *
1203 ******************************************************************************/
DCexport_history(const ZBX_DC_HISTORY * history,int history_num,zbx_hashset_t * hosts_info,zbx_hashset_t * items_info)1204 static void DCexport_history(const ZBX_DC_HISTORY *history, int history_num, zbx_hashset_t *hosts_info,
1205 zbx_hashset_t *items_info)
1206 {
1207 const ZBX_DC_HISTORY *h;
1208 const DC_ITEM *item;
1209 int i, j;
1210 zbx_host_info_t *host_info;
1211 zbx_item_info_t *item_info;
1212 struct zbx_json json;
1213
1214 zbx_json_init(&json, ZBX_JSON_STAT_BUF_LEN);
1215
1216 for (i = 0; i < history_num; i++)
1217 {
1218 h = &history[i];
1219
1220 if (0 != (ZBX_DC_FLAGS_NOT_FOR_MODULES & h->flags))
1221 continue;
1222
1223 if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &h->itemid)))
1224 {
1225 THIS_SHOULD_NEVER_HAPPEN;
1226 continue;
1227 }
1228
1229 item = item_info->item;
1230
1231 if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &item->host.hostid)))
1232 {
1233 THIS_SHOULD_NEVER_HAPPEN;
1234 continue;
1235 }
1236
1237 zbx_json_clean(&json);
1238 zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST, item->host.name, ZBX_JSON_TYPE_STRING);
1239 zbx_json_addarray(&json, ZBX_PROTO_TAG_GROUPS);
1240
1241 for (j = 0; j < host_info->groups.values_num; j++)
1242 zbx_json_addstring(&json, NULL, host_info->groups.values[j], ZBX_JSON_TYPE_STRING);
1243
1244 zbx_json_close(&json);
1245
1246 zbx_json_addarray(&json, ZBX_PROTO_TAG_APPLICATIONS);
1247
1248 for (j = 0; j < item_info->applications.values_num; j++)
1249 zbx_json_addstring(&json, NULL, item_info->applications.values[j], ZBX_JSON_TYPE_STRING);
1250
1251 zbx_json_close(&json);
1252 zbx_json_adduint64(&json, ZBX_PROTO_TAG_ITEMID, item->itemid);
1253
1254 if (NULL != item_info->name)
1255 zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item_info->name, ZBX_JSON_TYPE_STRING);
1256
1257 zbx_json_addint64(&json, ZBX_PROTO_TAG_CLOCK, h->ts.sec);
1258 zbx_json_addint64(&json, ZBX_PROTO_TAG_NS, h->ts.ns);
1259
1260 switch (h->value_type)
1261 {
1262 case ITEM_VALUE_TYPE_FLOAT:
1263 zbx_json_addfloat(&json, ZBX_PROTO_TAG_VALUE, h->value.dbl);
1264 break;
1265 case ITEM_VALUE_TYPE_UINT64:
1266 zbx_json_adduint64(&json, ZBX_PROTO_TAG_VALUE, h->value.ui64);
1267 break;
1268 case ITEM_VALUE_TYPE_STR:
1269 zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.str, ZBX_JSON_TYPE_STRING);
1270 break;
1271 case ITEM_VALUE_TYPE_TEXT:
1272 zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.str, ZBX_JSON_TYPE_STRING);
1273 break;
1274 case ITEM_VALUE_TYPE_LOG:
1275 zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGTIMESTAMP, h->value.log->timestamp);
1276 zbx_json_addstring(&json, ZBX_PROTO_TAG_LOGSOURCE,
1277 ZBX_NULL2EMPTY_STR(h->value.log->source), ZBX_JSON_TYPE_STRING);
1278 zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGSEVERITY, h->value.log->severity);
1279 zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGEVENTID, h->value.log->logeventid);
1280 zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.log->value,
1281 ZBX_JSON_TYPE_STRING);
1282 break;
1283 default:
1284 THIS_SHOULD_NEVER_HAPPEN;
1285 }
1286
1287 zbx_history_export_write(json.buffer, json.buffer_size);
1288 }
1289
1290 zbx_history_export_flush();
1291 zbx_json_free(&json);
1292 }
1293
1294 /******************************************************************************
1295 * *
1296 * Function: DCexport_history_and_trends *
1297 * *
1298 * Purpose: export history and trends *
1299 * *
1300 * Parameters: history - [IN/OUT] array of history data *
1301 * history_num - [IN] number of history structures *
1302 * itemids - [IN] the item identifiers *
1303 * (used for item lookup) *
1304 * items - [IN] the items *
1305 * errcodes - [IN] item error codes *
1306 * trends - [IN] trends from cache *
1307 * trends_num - [IN] number of trends *
1308 * *
1309 ******************************************************************************/
DCexport_history_and_trends(const ZBX_DC_HISTORY * history,int history_num,const zbx_vector_uint64_t * itemids,DC_ITEM * items,const int * errcodes,const ZBX_DC_TREND * trends,int trends_num)1310 static void DCexport_history_and_trends(const ZBX_DC_HISTORY *history, int history_num,
1311 const zbx_vector_uint64_t *itemids, DC_ITEM *items, const int *errcodes, const ZBX_DC_TREND *trends,
1312 int trends_num)
1313 {
1314 const char *__function_name = "DCexport_history_and_trends";
1315 int i, index;
1316 zbx_vector_uint64_t hostids, item_info_ids;
1317 zbx_hashset_t hosts_info, items_info;
1318 DC_ITEM *item;
1319 zbx_item_info_t item_info;
1320
1321 zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d trends_num:%d", __function_name, history_num, trends_num);
1322
1323 zbx_vector_uint64_create(&hostids);
1324 zbx_vector_uint64_create(&item_info_ids);
1325 zbx_hashset_create_ext(&items_info, itemids->values_num, ZBX_DEFAULT_UINT64_HASH_FUNC,
1326 ZBX_DEFAULT_UINT64_COMPARE_FUNC, (zbx_clean_func_t)zbx_item_info_clean,
1327 ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC);
1328
1329 for (i = 0; i < history_num; i++)
1330 {
1331 const ZBX_DC_HISTORY *h = &history[i];
1332
1333 if (0 != (ZBX_DC_FLAGS_NOT_FOR_EXPORT & h->flags))
1334 continue;
1335
1336 if (FAIL == (index = zbx_vector_uint64_bsearch(itemids, h->itemid, ZBX_DEFAULT_UINT64_COMPARE_FUNC)))
1337 {
1338 THIS_SHOULD_NEVER_HAPPEN;
1339 continue;
1340 }
1341
1342 if (SUCCEED != errcodes[index])
1343 continue;
1344
1345 item = &items[index];
1346
1347 zbx_vector_uint64_append(&hostids, item->host.hostid);
1348 zbx_vector_uint64_append(&item_info_ids, item->itemid);
1349
1350 item_info.itemid = item->itemid;
1351 item_info.name = NULL;
1352 item_info.item = item;
1353 zbx_vector_ptr_create(&item_info.applications);
1354 zbx_hashset_insert(&items_info, &item_info, sizeof(item_info));
1355 }
1356
1357 if (0 == history_num)
1358 {
1359 for (i = 0; i < trends_num; i++)
1360 {
1361 const ZBX_DC_TREND *trend = &trends[i];
1362
1363 if (FAIL == (index = zbx_vector_uint64_bsearch(itemids, trend->itemid,
1364 ZBX_DEFAULT_UINT64_COMPARE_FUNC)))
1365 {
1366 THIS_SHOULD_NEVER_HAPPEN;
1367 continue;
1368 }
1369
1370 if (SUCCEED != errcodes[index])
1371 continue;
1372
1373 item = &items[index];
1374
1375 zbx_vector_uint64_append(&hostids, item->host.hostid);
1376 zbx_vector_uint64_append(&item_info_ids, item->itemid);
1377
1378 item_info.itemid = item->itemid;
1379 item_info.name = NULL;
1380 item_info.item = item;
1381 zbx_vector_ptr_create(&item_info.applications);
1382 zbx_hashset_insert(&items_info, &item_info, sizeof(item_info));
1383 }
1384 }
1385
1386 if (0 == item_info_ids.values_num)
1387 goto clean;
1388
1389 zbx_vector_uint64_sort(&item_info_ids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1390 zbx_vector_uint64_sort(&hostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1391 zbx_vector_uint64_uniq(&hostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1392
1393 zbx_hashset_create_ext(&hosts_info, hostids.values_num, ZBX_DEFAULT_UINT64_HASH_FUNC,
1394 ZBX_DEFAULT_UINT64_COMPARE_FUNC, (zbx_clean_func_t)zbx_host_info_clean,
1395 ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC);
1396
1397 db_get_hosts_info_by_hostid(&hosts_info, &hostids);
1398
1399 db_get_items_info_by_itemid(&items_info, &item_info_ids);
1400
1401 if (0 != history_num)
1402 DCexport_history(history, history_num, &hosts_info, &items_info);
1403
1404 if (0 != trends_num)
1405 DCexport_trends(trends, trends_num, &hosts_info, &items_info);
1406
1407 zbx_hashset_destroy(&hosts_info);
1408 clean:
1409 zbx_hashset_destroy(&items_info);
1410 zbx_vector_uint64_destroy(&item_info_ids);
1411 zbx_vector_uint64_destroy(&hostids);
1412
1413 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
1414 }
1415
1416 /******************************************************************************
1417 * *
1418 * Function: DCexport_all_trends *
1419 * *
1420 * Purpose: export all trends *
1421 * *
1422 * Parameters: trends - [IN] trends from cache *
1423 * trends_num - [IN] number of trends *
1424 * *
1425 ******************************************************************************/
DCexport_all_trends(const ZBX_DC_TREND * trends,int trends_num)1426 static void DCexport_all_trends(const ZBX_DC_TREND *trends, int trends_num)
1427 {
1428 DC_ITEM *items;
1429 zbx_vector_uint64_t itemids;
1430 int *errcodes, i, num;
1431
1432 zabbix_log(LOG_LEVEL_WARNING, "exporting trend data...");
1433
1434 while (0 < trends_num)
1435 {
1436 num = MIN(ZBX_HC_SYNC_MAX, trends_num);
1437
1438 items = (DC_ITEM *)zbx_malloc(NULL, sizeof(DC_ITEM) * (size_t)num);
1439 errcodes = (int *)zbx_malloc(NULL, sizeof(int) * (size_t)num);
1440
1441 zbx_vector_uint64_create(&itemids);
1442 zbx_vector_uint64_reserve(&itemids, num);
1443
1444 for (i = 0; i < num; i++)
1445 zbx_vector_uint64_append(&itemids, trends[i].itemid);
1446
1447 zbx_vector_uint64_sort(&itemids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1448
1449 DCconfig_get_items_by_itemids(items, itemids.values, errcodes, num);
1450
1451 DCexport_history_and_trends(NULL, 0, &itemids, items, errcodes, trends, num);
1452
1453 DCconfig_clean_items(items, errcodes, num);
1454 zbx_vector_uint64_destroy(&itemids);
1455 zbx_free(items);
1456 zbx_free(errcodes);
1457
1458 trends += num;
1459 trends_num -= num;
1460 }
1461
1462 zabbix_log(LOG_LEVEL_WARNING, "exporting trend data done");
1463 }
1464
1465 /******************************************************************************
1466 * *
1467 * Function: DCsync_trends *
1468 * *
1469 * Purpose: flush all trends to the database *
1470 * *
1471 * Author: Alexander Vladishev *
1472 * *
1473 ******************************************************************************/
DCsync_trends(void)1474 static void DCsync_trends(void)
1475 {
1476 const char *__function_name = "DCsync_trends";
1477 zbx_hashset_iter_t iter;
1478 ZBX_DC_TREND *trends = NULL, *trend;
1479 int trends_alloc = 0, trends_num = 0;
1480
1481 zabbix_log(LOG_LEVEL_DEBUG, "In %s() trends_num:%d", __function_name, cache->trends_num);
1482
1483 zabbix_log(LOG_LEVEL_WARNING, "syncing trend data...");
1484
1485 LOCK_TRENDS;
1486
1487 zbx_hashset_iter_reset(&cache->trends, &iter);
1488
1489 while (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_iter_next(&iter)))
1490 {
1491 if (SUCCEED == zbx_history_requires_trends(trend->value_type))
1492 DCflush_trend(trend, &trends, &trends_alloc, &trends_num);
1493 }
1494
1495 UNLOCK_TRENDS;
1496
1497 if (SUCCEED == zbx_is_export_enabled() && 0 != trends_num)
1498 DCexport_all_trends(trends, trends_num);
1499
1500 DBbegin();
1501
1502 while (trends_num > 0)
1503 DBflush_trends(trends, &trends_num, NULL);
1504
1505 DBcommit();
1506
1507 zbx_free(trends);
1508
1509 zabbix_log(LOG_LEVEL_WARNING, "syncing trend data done");
1510
1511 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
1512 }
1513
1514 /******************************************************************************
1515 * *
1516 * Function: recalculate_triggers *
1517 * *
1518 * Purpose: re-calculate and update values of triggers related to the items *
1519 * *
1520 * Parameters: history - [IN] array of history data *
1521 * history_num - [IN] number of history structures *
1522 * timer_triggerids - [IN] the timer triggerids to process *
1523 * ts - [IN] timer trigger timestamp *
1524 * trigger_diff - [OUT] trigger updates *
1525 * *
1526 ******************************************************************************/
recalculate_triggers(const ZBX_DC_HISTORY * history,int history_num,const zbx_vector_uint64_t * timer_triggerids,zbx_timespec_t * ts,zbx_vector_ptr_t * trigger_diff)1527 static void recalculate_triggers(const ZBX_DC_HISTORY *history, int history_num,
1528 const zbx_vector_uint64_t *timer_triggerids, zbx_timespec_t *ts, zbx_vector_ptr_t *trigger_diff)
1529 {
1530 const char *__function_name = "recalculate_triggers";
1531 int i, item_num = 0;
1532 zbx_uint64_t *itemids = NULL;
1533 zbx_timespec_t *timespecs = NULL;
1534 zbx_hashset_t trigger_info;
1535 zbx_vector_ptr_t trigger_order;
1536
1537 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
1538
1539 if (0 != history_num)
1540 {
1541 itemids = (zbx_uint64_t *)zbx_malloc(itemids, sizeof(zbx_uint64_t) * (size_t)history_num);
1542 timespecs = (zbx_timespec_t *)zbx_malloc(timespecs, sizeof(zbx_timespec_t) * (size_t)history_num);
1543
1544 for (i = 0; i < history_num; i++)
1545 {
1546 const ZBX_DC_HISTORY *h = &history[i];
1547
1548 if (0 != (ZBX_DC_FLAG_NOVALUE & h->flags))
1549 continue;
1550
1551 itemids[item_num] = h->itemid;
1552 timespecs[item_num] = h->ts;
1553 item_num++;
1554 }
1555 }
1556
1557 if (0 == item_num && 0 == timer_triggerids->values_num)
1558 goto out;
1559
1560 zbx_hashset_create(&trigger_info, MAX(100, 2 * item_num + timer_triggerids->values_num),
1561 ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1562
1563 zbx_vector_ptr_create(&trigger_order);
1564 zbx_vector_ptr_reserve(&trigger_order, trigger_info.num_slots);
1565
1566 if (0 != item_num)
1567 {
1568 DCconfig_get_triggers_by_itemids(&trigger_info, &trigger_order, itemids, timespecs, item_num);
1569 zbx_determine_items_in_expressions(&trigger_order, itemids, item_num);
1570 }
1571
1572 if (0 != timer_triggerids->values_num)
1573 zbx_dc_get_timer_triggers_by_triggerids(&trigger_info, &trigger_order, timer_triggerids, ts);
1574
1575 zbx_vector_ptr_sort(&trigger_order, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);
1576 evaluate_expressions(&trigger_order);
1577 zbx_process_triggers(&trigger_order, trigger_diff);
1578
1579 DCfree_triggers(&trigger_order);
1580
1581 zbx_hashset_destroy(&trigger_info);
1582 zbx_vector_ptr_destroy(&trigger_order);
1583 out:
1584 zbx_free(timespecs);
1585 zbx_free(itemids);
1586
1587 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
1588 }
1589
/******************************************************************************
 *
 * Function: DCinventory_value_add
 *
 * Purpose: queues a host inventory field update with the formatted item value
 *
 * Parameters: inventory_values - [IN/OUT] vector of zbx_inventory_value_t
 *                                         pointers the new entry is added to
 *             item             - [IN] the item
 *             h                - [IN] the history value of the item
 *
 * Comments: Nothing is added if the value is not supported, undefined or
 *           missing, the host inventory mode is not automatic,
 *           DBget_inventory_field() returns NULL for the item inventory link
 *           or the value type is not float, unsigned, string or text.
 *
 ******************************************************************************/
static void	DCinventory_value_add(zbx_vector_ptr_t *inventory_values, const DC_ITEM *item, ZBX_DC_HISTORY *h)
{
	char			value[MAX_BUFFER_LEN];
	const char		*inventory_field;
	zbx_inventory_value_t	*entry;

	if (ITEM_STATE_NOTSUPPORTED == h->state || HOST_INVENTORY_AUTOMATIC != item->host.inventory_mode)
		return;

	if (0 != (ZBX_DC_FLAG_UNDEF & h->flags) || 0 != (ZBX_DC_FLAG_NOVALUE & h->flags))
		return;

	if (NULL == (inventory_field = DBget_inventory_field(item->inventory_link)))
		return;

	/* render the value as text */
	if (ITEM_VALUE_TYPE_FLOAT == h->value_type)
	{
		zbx_snprintf(value, sizeof(value), ZBX_FS_DBL, h->value.dbl);
	}
	else if (ITEM_VALUE_TYPE_UINT64 == h->value_type)
	{
		zbx_snprintf(value, sizeof(value), ZBX_FS_UI64, h->value.ui64);
	}
	else if (ITEM_VALUE_TYPE_STR == h->value_type || ITEM_VALUE_TYPE_TEXT == h->value_type)
	{
		strscpy(value, h->value.str);
	}
	else
	{
		return;
	}

	zbx_format_value(value, sizeof(value), item->valuemapid, item->units, h->value_type);

	entry = (zbx_inventory_value_t *)zbx_malloc(NULL, sizeof(zbx_inventory_value_t));

	entry->hostid = item->host.hostid;
	entry->idx = item->inventory_link - 1;
	entry->field_name = inventory_field;
	entry->value = zbx_strdup(NULL, value);

	zbx_vector_ptr_append(inventory_values, entry);
}
1635
DCadd_update_inventory_sql(size_t * sql_offset,const zbx_vector_ptr_t * inventory_values)1636 static void DCadd_update_inventory_sql(size_t *sql_offset, const zbx_vector_ptr_t *inventory_values)
1637 {
1638 char *value_esc;
1639 int i;
1640
1641 for (i = 0; i < inventory_values->values_num; i++)
1642 {
1643 const zbx_inventory_value_t *inventory_value = (zbx_inventory_value_t *)inventory_values->values[i];
1644
1645 value_esc = DBdyn_escape_field("host_inventory", inventory_value->field_name, inventory_value->value);
1646
1647 zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset,
1648 "update host_inventory set %s='%s' where hostid=" ZBX_FS_UI64 ";\n",
1649 inventory_value->field_name, value_esc, inventory_value->hostid);
1650
1651 DBexecute_overflowed_sql(&sql, &sql_alloc, sql_offset);
1652
1653 zbx_free(value_esc);
1654 }
1655 }
1656
/******************************************************************************
 *
 * Function: DCinventory_value_free
 *
 * Purpose: frees an inventory value together with its value string
 *
 * Parameters: inventory_value - [IN] the inventory value to free
 *
 * Comments: field_name is not freed here; DCinventory_value_add() stores
 *           the pointer returned by DBget_inventory_field() in it without
 *           making a copy.
 *
 ******************************************************************************/
static void	DCinventory_value_free(zbx_inventory_value_t *inventory_value)
{
	/* the duplicated value string must go before the structure holding it */
	zbx_free(inventory_value->value);

	zbx_free(inventory_value);
}
1662
1663 /******************************************************************************
1664 * *
1665 * Function: dc_history_clean_value *
1666 * *
1667 * Purpose: frees resources allocated to store str/text/log value *
1668 * *
1669 * Parameters: history - [IN] the history data *
1670 * history_num - [IN] the number of values in history data *
1671 * *
1672 ******************************************************************************/
dc_history_clean_value(ZBX_DC_HISTORY * history)1673 static void dc_history_clean_value(ZBX_DC_HISTORY *history)
1674 {
1675 if (ITEM_STATE_NOTSUPPORTED == history->state)
1676 {
1677 zbx_free(history->value.err);
1678 return;
1679 }
1680
1681 if (0 != (ZBX_DC_FLAG_NOVALUE & history->flags))
1682 return;
1683
1684 switch (history->value_type)
1685 {
1686 case ITEM_VALUE_TYPE_LOG:
1687 zbx_free(history->value.log->value);
1688 zbx_free(history->value.log->source);
1689 zbx_free(history->value.log);
1690 break;
1691 case ITEM_VALUE_TYPE_STR:
1692 case ITEM_VALUE_TYPE_TEXT:
1693 zbx_free(history->value.str);
1694 break;
1695 }
1696 }
1697
1698 /******************************************************************************
1699 * *
1700 * Function: hc_free_item_values *
1701 * *
1702 * Purpose: frees resources allocated to store str/text/log values *
1703 * *
1704 * Parameters: history - [IN] the history data *
1705 * history_num - [IN] the number of values in history data *
1706 * *
1707 ******************************************************************************/
hc_free_item_values(ZBX_DC_HISTORY * history,int history_num)1708 static void hc_free_item_values(ZBX_DC_HISTORY *history, int history_num)
1709 {
1710 int i;
1711
1712 for (i = 0; i < history_num; i++)
1713 dc_history_clean_value(&history[i]);
1714 }
1715
1716 /******************************************************************************
1717 * *
1718 * Function: dc_history_set_error *
1719 * *
1720 * Purpose: sets history data to notsupported *
1721 * *
1722 * Parameters: history - [IN] the history data *
1723 * errmsg - [IN] the error message *
1724 * *
1725 * Comments: The error message is stored directly and freed with when history *
1726 * data is cleaned. *
1727 * *
1728 ******************************************************************************/
dc_history_set_error(ZBX_DC_HISTORY * hdata,char * errmsg)1729 static void dc_history_set_error(ZBX_DC_HISTORY *hdata, char *errmsg)
1730 {
1731 dc_history_clean_value(hdata);
1732 hdata->value.err = errmsg;
1733 hdata->state = ITEM_STATE_NOTSUPPORTED;
1734 hdata->flags |= ZBX_DC_FLAG_UNDEF;
1735 }
1736
1737 /******************************************************************************
1738 * *
1739 * Function: dc_history_set_value *
1740 * *
1741 * Purpose: sets history data value *
1742 * *
1743 * Parameters: hdata - [IN/OUT] the history data *
1744 * value_type - [IN] the item value type *
1745 * value - [IN] the value to set *
1746 * *
1747 * Return value: SUCCEED - Value conversion was successful. *
1748 * FAIL - Otherwise *
1749 * *
1750 ******************************************************************************/
dc_history_set_value(ZBX_DC_HISTORY * hdata,unsigned char value_type,zbx_variant_t * value)1751 static int dc_history_set_value(ZBX_DC_HISTORY *hdata, unsigned char value_type, zbx_variant_t *value)
1752 {
1753 int ret;
1754 char *errmsg = NULL;
1755
1756 switch (value_type)
1757 {
1758 case ITEM_VALUE_TYPE_FLOAT:
1759 if (SUCCEED == (ret = zbx_variant_convert(value, ZBX_VARIANT_DBL)))
1760 {
1761 if (FAIL == (ret = zbx_validate_value_dbl(value->data.dbl)))
1762 {
1763 errmsg = zbx_dsprintf(NULL, "Value " ZBX_FS_DBL " is too small or too large.",
1764 value->data.dbl);
1765 }
1766 }
1767 break;
1768 case ITEM_VALUE_TYPE_UINT64:
1769 ret = zbx_variant_convert(value, ZBX_VARIANT_UI64);
1770 break;
1771 case ITEM_VALUE_TYPE_STR:
1772 case ITEM_VALUE_TYPE_TEXT:
1773 case ITEM_VALUE_TYPE_LOG:
1774 ret = zbx_variant_convert(value, ZBX_VARIANT_STR);
1775 break;
1776 default:
1777 THIS_SHOULD_NEVER_HAPPEN;
1778 return FAIL;
1779 }
1780
1781 if (FAIL == ret)
1782 {
1783 if (NULL == errmsg)
1784 {
1785 errmsg = zbx_dsprintf(NULL, "Value \"%s\" of type \"%s\" is not suitable for"
1786 " value type \"%s\"", zbx_variant_value_desc(value),
1787 zbx_variant_type_desc(value), zbx_item_value_type_string(value_type));
1788 }
1789
1790 dc_history_set_error(hdata, errmsg);
1791 return FAIL;
1792 }
1793
1794 switch (value_type)
1795 {
1796 case ITEM_VALUE_TYPE_FLOAT:
1797 dc_history_clean_value(hdata);
1798 hdata->value.dbl = value->data.dbl;
1799 break;
1800 case ITEM_VALUE_TYPE_UINT64:
1801 dc_history_clean_value(hdata);
1802 hdata->value.ui64 = value->data.ui64;
1803 break;
1804 case ITEM_VALUE_TYPE_STR:
1805 dc_history_clean_value(hdata);
1806 hdata->value.str = value->data.str;
1807 hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_STR_VALUE_LEN)] = '\0';
1808 break;
1809 case ITEM_VALUE_TYPE_TEXT:
1810 dc_history_clean_value(hdata);
1811 hdata->value.str = value->data.str;
1812 hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_TEXT_VALUE_LEN)] = '\0';
1813 break;
1814 case ITEM_VALUE_TYPE_LOG:
1815 if (ITEM_VALUE_TYPE_LOG != hdata->value_type)
1816 {
1817 dc_history_clean_value(hdata);
1818 hdata->value.log = (zbx_log_value_t *)zbx_malloc(NULL, sizeof(zbx_log_value_t));
1819 memset(hdata->value.log, 0, sizeof(zbx_log_value_t));
1820 }
1821 hdata->value.log->value = value->data.str;
1822 hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_LOG_VALUE_LEN)] = '\0';
1823 }
1824
1825 hdata->value_type = value_type;
1826 zbx_variant_set_none(value);
1827
1828 return ret;
1829 }
1830
1831 /******************************************************************************
1832 * *
1833 * Function: normalize_item_value *
1834 * *
1835 * Purpose: normalize item value by performing truncation of long text *
1836 * values and changes value format according to the item value type *
1837 * *
1838 * Parameters: item - [IN] the item *
1839 * hdata - [IN/OUT] the historical data to process *
1840 * *
1841 * Return value: SUCCEED - Normalization was successful. *
1842 * FAIL - Otherwise - ZBX_DC_FLAG_UNDEF will be set and item *
1843 * state changed to ZBX_NOTSUPPORTED. *
1844 * *
1845 ******************************************************************************/
normalize_item_value(const DC_ITEM * item,ZBX_DC_HISTORY * hdata)1846 static int normalize_item_value(const DC_ITEM *item, ZBX_DC_HISTORY *hdata)
1847 {
1848 int ret = FAIL;
1849 char *logvalue;
1850 zbx_variant_t value_var;
1851
1852 if (0 != (hdata->flags & ZBX_DC_FLAG_NOVALUE))
1853 {
1854 ret = SUCCEED;
1855 goto out;
1856 }
1857
1858 if (ITEM_STATE_NOTSUPPORTED == hdata->state)
1859 goto out;
1860
1861 if (0 == (hdata->flags & ZBX_DC_FLAG_NOHISTORY))
1862 hdata->ttl = item->history_sec;
1863
1864 if (item->value_type == hdata->value_type)
1865 {
1866 /* truncate text based values if necessary */
1867 switch (hdata->value_type)
1868 {
1869 case ITEM_VALUE_TYPE_STR:
1870 hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_STR_VALUE_LEN)] = '\0';
1871 break;
1872 case ITEM_VALUE_TYPE_TEXT:
1873 hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_TEXT_VALUE_LEN)] = '\0';
1874 break;
1875 case ITEM_VALUE_TYPE_LOG:
1876 logvalue = hdata->value.log->value;
1877 logvalue[zbx_db_strlen_n(logvalue, HISTORY_LOG_VALUE_LEN)] = '\0';
1878 break;
1879 case ITEM_VALUE_TYPE_FLOAT:
1880 if (FAIL == zbx_validate_value_dbl(hdata->value.dbl))
1881 {
1882 dc_history_set_error(hdata, zbx_dsprintf(NULL, "Value " ZBX_FS_DBL
1883 " is too small or too large.", hdata->value.dbl));
1884 return FAIL;
1885 }
1886 break;
1887 }
1888 return SUCCEED;
1889 }
1890
1891 switch (hdata->value_type)
1892 {
1893 case ITEM_VALUE_TYPE_FLOAT:
1894 zbx_variant_set_dbl(&value_var, hdata->value.dbl);
1895 break;
1896 case ITEM_VALUE_TYPE_UINT64:
1897 zbx_variant_set_ui64(&value_var, hdata->value.ui64);
1898 break;
1899 case ITEM_VALUE_TYPE_STR:
1900 case ITEM_VALUE_TYPE_TEXT:
1901 zbx_variant_set_str(&value_var, hdata->value.str);
1902 hdata->value.str = NULL;
1903 break;
1904 case ITEM_VALUE_TYPE_LOG:
1905 zbx_variant_set_str(&value_var, hdata->value.log->value);
1906 hdata->value.log->value = NULL;
1907 break;
1908 }
1909
1910 ret = dc_history_set_value(hdata, item->value_type, &value_var);
1911 zbx_variant_clear(&value_var);
1912 out:
1913 return ret;
1914 }
1915
1916 /******************************************************************************
1917 * *
1918 * Function: calculate_item_update *
1919 * *
1920 * Purpose: calculates what item fields must be updated *
1921 * *
1922 * Parameters: item - [IN] the item *
1923 * h - [IN] the historical data to process *
1924 * *
1925 * Return value: The update data. This data must be freed by the caller. *
1926 * *
1927 * Comments: Will generate internal events when item state switches. *
1928 * *
1929 ******************************************************************************/
calculate_item_update(const DC_ITEM * item,const ZBX_DC_HISTORY * h)1930 static zbx_item_diff_t *calculate_item_update(const DC_ITEM *item, const ZBX_DC_HISTORY *h)
1931 {
1932 zbx_uint64_t flags = ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTCLOCK;
1933 const char *item_error = NULL;
1934 zbx_item_diff_t *diff;
1935 int object;
1936
1937 if (0 != (ZBX_DC_FLAG_META & h->flags))
1938 {
1939 if (item->lastlogsize != h->lastlogsize)
1940 flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE;
1941
1942 if (item->mtime != h->mtime)
1943 flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME;
1944 }
1945
1946 if (h->state != item->state)
1947 {
1948 flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE;
1949
1950 if (ITEM_STATE_NOTSUPPORTED == h->state)
1951 {
1952 zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" became not supported: %s",
1953 item->host.host, item->key_orig, h->value.str);
1954
1955 object = (0 != (ZBX_FLAG_DISCOVERY_RULE & item->flags) ?
1956 EVENT_OBJECT_LLDRULE : EVENT_OBJECT_ITEM);
1957
1958 zbx_add_event(EVENT_SOURCE_INTERNAL, object, item->itemid, &h->ts, h->state, NULL, NULL, NULL,
1959 0, 0, NULL, 0, NULL, 0, h->value.err);
1960
1961 if (0 != strcmp(item->error, h->value.err))
1962 item_error = h->value.err;
1963 }
1964 else
1965 {
1966 zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" became supported",
1967 item->host.host, item->key_orig);
1968
1969 /* we know it's EVENT_OBJECT_ITEM because LLDRULE that becomes */
1970 /* supported is handled in lld_process_discovery_rule() */
1971 zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_ITEM, item->itemid, &h->ts, h->state,
1972 NULL, NULL, NULL, 0, 0, NULL, 0, NULL, 0, NULL);
1973
1974 item_error = "";
1975 }
1976 }
1977 else if (ITEM_STATE_NOTSUPPORTED == h->state && 0 != strcmp(item->error, h->value.err))
1978 {
1979 zabbix_log(LOG_LEVEL_WARNING, "error reason for \"%s:%s\" changed: %s", item->host.host,
1980 item->key_orig, h->value.err);
1981
1982 item_error = h->value.err;
1983 }
1984
1985 if (NULL != item_error)
1986 flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_ERROR;
1987
1988 diff = (zbx_item_diff_t *)zbx_malloc(NULL, sizeof(zbx_item_diff_t));
1989 diff->itemid = item->itemid;
1990 diff->lastclock = h->ts.sec;
1991 diff->flags = flags;
1992
1993 if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE & flags))
1994 diff->lastlogsize = h->lastlogsize;
1995
1996 if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME & flags))
1997 diff->mtime = h->mtime;
1998
1999 if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE & flags))
2000 diff->state = h->state;
2001
2002 if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_ERROR & flags))
2003 diff->error = item_error;
2004
2005 return diff;
2006 }
2007
2008 /******************************************************************************
2009 * *
2010 * Function: db_save_item_changes *
2011 * *
2012 * Purpose: save item state, error, mtime, lastlogsize changes to *
2013 * database *
2014 * *
2015 ******************************************************************************/
db_save_item_changes(size_t * sql_offset,const zbx_vector_ptr_t * item_diff)2016 static void db_save_item_changes(size_t *sql_offset, const zbx_vector_ptr_t *item_diff)
2017 {
2018 int i;
2019 const zbx_item_diff_t *diff;
2020 char *value_esc;
2021
2022 for (i = 0; i < item_diff->values_num; i++)
2023 {
2024 char delim = ' ';
2025
2026 diff = (const zbx_item_diff_t *)item_diff->values[i];
2027
2028 if (0 == (ZBX_FLAGS_ITEM_DIFF_UPDATE_DB & diff->flags))
2029 continue;
2030
2031 zbx_strcpy_alloc(&sql, &sql_alloc, sql_offset, "update items set");
2032
2033 if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE & diff->flags))
2034 {
2035 zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, "%clastlogsize=" ZBX_FS_UI64, delim,
2036 diff->lastlogsize);
2037 delim = ',';
2038 }
2039
2040 if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME & diff->flags))
2041 {
2042 zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, "%cmtime=%d", delim, diff->mtime);
2043 delim = ',';
2044 }
2045
2046 if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE & diff->flags))
2047 {
2048 zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, "%cstate=%d", delim, (int)diff->state);
2049 delim = ',';
2050 }
2051
2052 if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_ERROR & diff->flags))
2053 {
2054 value_esc = DBdyn_escape_field("items", "error", diff->error);
2055 zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, "%cerror='%s'", delim, value_esc);
2056 zbx_free(value_esc);
2057 }
2058
2059 zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, " where itemid=" ZBX_FS_UI64 ";\n", diff->itemid);
2060
2061 DBexecute_overflowed_sql(&sql, &sql_alloc, sql_offset);
2062 }
2063 }
2064
2065 /******************************************************************************
2066 * *
2067 * Function: DBmass_update_items *
2068 * *
2069 * Purpose: update item data and inventory in database *
2070 * *
2071 * Parameters: item_diff - item changes *
2072 * inventory_values - inventory values *
2073 * *
2074 ******************************************************************************/
DBmass_update_items(const zbx_vector_ptr_t * item_diff,const zbx_vector_ptr_t * inventory_values)2075 static void DBmass_update_items(const zbx_vector_ptr_t *item_diff, const zbx_vector_ptr_t *inventory_values)
2076 {
2077 const char *__function_name = "DBmass_update_items";
2078
2079 size_t sql_offset = 0;
2080 int i;
2081
2082 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
2083
2084 for (i = 0; i < item_diff->values_num; i++)
2085 {
2086 zbx_item_diff_t *diff;
2087
2088 diff = (zbx_item_diff_t *)item_diff->values[i];
2089 if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_DB & diff->flags))
2090 break;
2091 }
2092
2093 if (i != item_diff->values_num || 0 != inventory_values->values_num)
2094 {
2095 DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
2096
2097 if (i != item_diff->values_num)
2098 db_save_item_changes(&sql_offset, item_diff);
2099
2100 if (0 != inventory_values->values_num)
2101 DCadd_update_inventory_sql(&sql_offset, inventory_values);
2102
2103 DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
2104
2105 if (sql_offset > 16) /* In ORACLE always present begin..end; */
2106 DBexecute("%s", sql);
2107
2108 DCconfig_update_inventory_values(inventory_values);
2109 }
2110
2111 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
2112 }
2113
2114 /******************************************************************************
2115 * *
2116 * Function: DCmass_proxy_update_items *
2117 * *
2118 * Purpose: update items info after new value is received *
2119 * *
2120 * Parameters: history - array of history data *
2121 * history_num - number of history structures *
2122 * *
2123 * Author: Alexei Vladishev, Eugene Grigorjev, Alexander Vladishev *
2124 * *
2125 ******************************************************************************/
/* Creates an item diff (state, lastclock and - when present - lastlogsize/mtime) for every     */
/* history value, applies the diffs through DCconfig_items_apply_changes() and stores           */
/* lastlogsize/mtime of supported items with meta information in the items table.               */
static void	DCmass_proxy_update_items(ZBX_DC_HISTORY *history, int history_num)
{
	const char		*__function_name = "DCmass_proxy_update_items";

	zbx_vector_ptr_t	changes;
	zbx_item_diff_t		*diff_pool;
	size_t			sql_offset = 0;
	int			n;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);

	zbx_vector_ptr_create(&changes);
	zbx_vector_ptr_reserve(&changes, history_num);

	/* one zbx_item_diff_t per history value, allocated as a single chunk */
	diff_pool = (zbx_item_diff_t *)zbx_malloc(NULL, sizeof(zbx_item_diff_t) * history_num);

	DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);

	for (n = 0; n < history_num; n++)
	{
		const ZBX_DC_HISTORY	*h = &history[n];
		zbx_item_diff_t		*diff = &diff_pool[n];
		int			has_meta = (0 != (ZBX_DC_FLAG_META & h->flags) ? 1 : 0);

		diff->itemid = h->itemid;
		diff->state = h->state;
		diff->lastclock = h->ts.sec;
		diff->flags = ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE | ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTCLOCK;

		if (0 != has_meta)
		{
			diff->lastlogsize = h->lastlogsize;
			diff->mtime = h->mtime;
			diff->flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE | ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME;
		}

		zbx_vector_ptr_append(&changes, diff);

		/* the items table is updated only for supported items that carry meta information */
		if (ITEM_STATE_NOTSUPPORTED != h->state && 0 != has_meta)
		{
			zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
					"update items"
					" set lastlogsize=" ZBX_FS_UI64
						",mtime=%d"
					" where itemid=" ZBX_FS_UI64 ";\n",
					h->lastlogsize, h->mtime, h->itemid);

			DBexecute_overflowed_sql(&sql, &sql_alloc, &sql_offset);
		}
	}

	DBend_multiple_update(&sql, &sql_alloc, &sql_offset);

	if (16 < sql_offset)	/* In ORACLE always present begin..end; */
		DBexecute("%s", sql);

	if (0 != changes.values_num)
		DCconfig_items_apply_changes(&changes);

	/* the vector only references entries of diff_pool, so the pool is released in one call */
	zbx_vector_ptr_destroy(&changes);
	zbx_free(diff_pool);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
}
2192
2193 /******************************************************************************
2194 * *
2195 * Function: DBmass_add_history *
2196 * *
2197 * Purpose: inserting new history data after new value is received *
2198 * *
2199 * Parameters: history - array of history data *
2200 * history_num - number of history structures *
2201 * *
2202 ******************************************************************************/
/* Collects the history values that are not flagged with ZBX_DC_FLAGS_NOT_FOR_HISTORY and       */
/* passes them to zbx_vc_add_values(). Returns the result of that call, or SUCCEED when there   */
/* was nothing to add.                                                                          */
static int	DBmass_add_history(ZBX_DC_HISTORY *history, int history_num)
{
	const char		*__function_name = "DBmass_add_history";

	zbx_vector_ptr_t	to_store;
	int			n, result = SUCCEED;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);

	zbx_vector_ptr_create(&to_store);
	zbx_vector_ptr_reserve(&to_store, history_num);

	for (n = 0; n < history_num; n++)
	{
		/* the vector references the caller's array elements, nothing is copied */
		if (0 == (ZBX_DC_FLAGS_NOT_FOR_HISTORY & history[n].flags))
			zbx_vector_ptr_append(&to_store, &history[n]);
	}

	if (0 != to_store.values_num)
		result = zbx_vc_add_values(&to_store);

	zbx_vector_ptr_destroy(&to_store);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);

	return result;
}
2234
2235 /******************************************************************************
2236 * *
2237 * Function: dc_add_proxy_history *
2238 * *
2239 * Purpose: helper function for DCmass_proxy_add_history() *
2240 * *
2241 * Comment: this function is meant for items with value_type other than *
2242 * ITEM_VALUE_TYPE_LOG not containing meta information in result *
2243 * *
2244 ******************************************************************************/
/* Inserts into proxy_history the float, uint64, string and text values of supported items      */
/* that have neither the ZBX_DC_FLAG_UNDEF nor the ZBX_DC_FLAG_META flag set.                   */
static void	dc_add_proxy_history(ZBX_DC_HISTORY *history, int history_num)
{
	zbx_db_insert_t	db_insert;
	char		num_buf[64];
	int		n;

	zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", NULL);

	for (n = 0; n < history_num; n++)
	{
		const ZBX_DC_HISTORY	*h = &history[n];
		char			*value_str;

		if (0 != (h->flags & (ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_META)) || ITEM_STATE_NOTSUPPORTED == h->state)
			continue;

		if (ITEM_VALUE_TYPE_FLOAT == h->value_type)
		{
			/* numeric values are formatted into the local buffer */
			zbx_snprintf(num_buf, sizeof(num_buf), ZBX_FS_DBL, h->value.dbl);
			value_str = num_buf;
		}
		else if (ITEM_VALUE_TYPE_UINT64 == h->value_type)
		{
			zbx_snprintf(num_buf, sizeof(num_buf), ZBX_FS_UI64, h->value.ui64);
			value_str = num_buf;
		}
		else if (ITEM_VALUE_TYPE_STR == h->value_type || ITEM_VALUE_TYPE_TEXT == h->value_type)
		{
			value_str = h->value.str;
		}
		else
		{
			/* other value types are not handled by this helper */
			continue;
		}

		zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, value_str);
	}

	zbx_db_insert_execute(&db_insert);
	zbx_db_insert_clean(&db_insert);
}
2288
2289 /******************************************************************************
2290 * *
2291 * Function: dc_add_proxy_history_meta *
2292 * *
2293 * Purpose: helper function for DCmass_proxy_add_history() *
2294 * *
2295 * Comment: this function is meant for items with value_type other than *
2296 * ITEM_VALUE_TYPE_LOG containing meta information in result *
2297 * *
2298 ******************************************************************************/
/* Inserts into proxy_history the values of supported non-log items that carry meta             */
/* information (ZBX_DC_FLAG_META); values flagged with ZBX_DC_FLAG_NOVALUE are stored with an   */
/* empty value and the PROXY_HISTORY_FLAG_NOVALUE flag.                                         */
static void	dc_add_proxy_history_meta(ZBX_DC_HISTORY *history, int history_num)
{
	zbx_db_insert_t	db_insert;
	char		num_buf[64];
	int		n;

	zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "lastlogsize", "mtime",
			"flags", NULL);

	for (n = 0; n < history_num; n++)
	{
		const ZBX_DC_HISTORY	*h = &history[n];
		unsigned int		flags = PROXY_HISTORY_FLAG_META;
		char			*value_str;

		if (ITEM_STATE_NOTSUPPORTED == h->state || 0 != (h->flags & ZBX_DC_FLAG_UNDEF))
			continue;

		if (0 == (h->flags & ZBX_DC_FLAG_META) || ITEM_VALUE_TYPE_LOG == h->value_type)
			continue;

		if (0 != (h->flags & ZBX_DC_FLAG_NOVALUE))
		{
			/* meta information only - there is no value to format */
			flags |= PROXY_HISTORY_FLAG_NOVALUE;
			value_str = (char *)"";
		}
		else if (ITEM_VALUE_TYPE_FLOAT == h->value_type)
		{
			zbx_snprintf(num_buf, sizeof(num_buf), ZBX_FS_DBL, h->value.dbl);
			value_str = num_buf;
		}
		else if (ITEM_VALUE_TYPE_UINT64 == h->value_type)
		{
			zbx_snprintf(num_buf, sizeof(num_buf), ZBX_FS_UI64, h->value.ui64);
			value_str = num_buf;
		}
		else if (ITEM_VALUE_TYPE_STR == h->value_type || ITEM_VALUE_TYPE_TEXT == h->value_type)
		{
			value_str = h->value.str;
		}
		else
		{
			THIS_SHOULD_NEVER_HAPPEN;
			continue;
		}

		zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, value_str, h->lastlogsize,
				h->mtime, flags);
	}

	zbx_db_insert_execute(&db_insert);
	zbx_db_insert_clean(&db_insert);
}
2357
2358 /******************************************************************************
2359 * *
2360 * Function: dc_add_proxy_history_log *
2361 * *
2362 * Purpose: helper function for DCmass_proxy_add_history() *
2363 * *
2364 * Comment: this function is meant for items with value_type *
2365 * ITEM_VALUE_TYPE_LOG *
2366 * *
2367 ******************************************************************************/
/* Inserts into proxy_history the values of supported items with value type                     */
/* ITEM_VALUE_TYPE_LOG, including meta information only updates (ZBX_DC_FLAG_NOVALUE).          */
static void	dc_add_proxy_history_log(ZBX_DC_HISTORY *history, int history_num)
{
	zbx_db_insert_t	db_insert;
	int		n;

	/* see hc_copy_history_data() for fields that might be uninitialized and need special handling here */
	zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "timestamp", "source", "severity",
			"value", "logeventid", "lastlogsize", "mtime", "flags", NULL);

	for (n = 0; n < history_num; n++)
	{
		const ZBX_DC_HISTORY	*h = &history[n];
		zbx_log_value_t		*log;
		unsigned int		flags = 0;
		zbx_uint64_t		lastlogsize = 0;
		int			mtime = 0;

		if (ITEM_STATE_NOTSUPPORTED == h->state || ITEM_VALUE_TYPE_LOG != h->value_type)
			continue;

		if (0 != (h->flags & ZBX_DC_FLAG_NOVALUE))
		{
			/* sent to server only if not 0, see proxy_get_history_data() */
			const int	unset_if_novalue = 0;

			flags = PROXY_HISTORY_FLAG_META | PROXY_HISTORY_FLAG_NOVALUE;

			/* the log record itself is not read here, only the meta information is stored */
			zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, unset_if_novalue, "",
					unset_if_novalue, "", unset_if_novalue, h->lastlogsize, h->mtime, flags);
			continue;
		}

		/* without meta information flags, lastlogsize and mtime stay zero */
		if (0 != (h->flags & ZBX_DC_FLAG_META))
		{
			flags = PROXY_HISTORY_FLAG_META;
			lastlogsize = h->lastlogsize;
			mtime = h->mtime;
		}

		log = h->value.log;

		zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, log->timestamp,
				ZBX_NULL2EMPTY_STR(log->source), log->severity, log->value, log->logeventid,
				lastlogsize, mtime, flags);
	}

	zbx_db_insert_execute(&db_insert);
	zbx_db_insert_clean(&db_insert);
}
2426
2427 /******************************************************************************
2428 * *
2429 * Function: dc_add_proxy_history_notsupported *
2430 * *
2431 * Purpose: helper function for DCmass_proxy_add_history() *
2432 * *
2433 ******************************************************************************/
/* Inserts into proxy_history one row with the error text and state for every history value     */
/* whose state is ITEM_STATE_NOTSUPPORTED.                                                      */
static void	dc_add_proxy_history_notsupported(ZBX_DC_HISTORY *history, int history_num)
{
	zbx_db_insert_t	db_insert;
	int		n;

	zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "state", NULL);

	for (n = 0; n < history_num; n++)
	{
		const ZBX_DC_HISTORY	*h = &history[n];

		if (ITEM_STATE_NOTSUPPORTED == h->state)
		{
			/* a missing error text is stored as an empty string */
			zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns,
					ZBX_NULL2EMPTY_STR(h->value.err), (int)h->state);
		}
	}

	zbx_db_insert_execute(&db_insert);
	zbx_db_insert_clean(&db_insert);
}
2455
2456 /******************************************************************************
2457 * *
2458 * Function: DCmass_proxy_add_history *
2459 * *
2460 * Purpose: inserting new history data after new value is received *
2461 * *
2462 * Parameters: history - array of history data *
2463 * history_num - number of history structures *
2464 * *
2465 * Author: Alexander Vladishev *
2466 * *
2467 ******************************************************************************/
/* Counts the history values per storage category and calls the matching dc_add_proxy_history*()*/
/* helper for every category that has at least one value. Each helper receives the whole array  */
/* and does its own filtering.                                                                  */
static void	DCmass_proxy_add_history(ZBX_DC_HISTORY *history, int history_num)
{
	const char	*__function_name = "DCmass_proxy_add_history";
	int		n, plain_num = 0, meta_num = 0, log_num = 0, unsupported_num = 0;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);

	for (n = 0; n < history_num; n++)
	{
		const ZBX_DC_HISTORY	*h = &history[n];

		if (ITEM_STATE_NOTSUPPORTED == h->state)
		{
			unsupported_num++;
		}
		else if (ITEM_VALUE_TYPE_LOG == h->value_type)
		{
			log_num++;
		}
		else if (ITEM_VALUE_TYPE_FLOAT == h->value_type || ITEM_VALUE_TYPE_UINT64 == h->value_type ||
				ITEM_VALUE_TYPE_STR == h->value_type || ITEM_VALUE_TYPE_TEXT == h->value_type)
		{
			if (0 != (h->flags & ZBX_DC_FLAG_META))
				meta_num++;
			else
				plain_num++;
		}
		else
		{
			THIS_SHOULD_NEVER_HAPPEN;
		}
	}

	if (0 != plain_num)
		dc_add_proxy_history(history, history_num);

	if (0 != meta_num)
		dc_add_proxy_history_meta(history, history_num);

	if (0 != log_num)
		dc_add_proxy_history_log(history, history_num);

	if (0 != unsupported_num)
		dc_add_proxy_history_notsupported(history, history_num);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
}
2518
2519 /******************************************************************************
2520 * *
2521 * Function: DCmass_prepare_history *
2522 * *
2523 * Purpose: prepare history data using items from configuration cache and *
2524 * generate item changes to be applied and host inventory values to *
2525 * be added *
2526 * *
2527 * Parameters: history - [IN/OUT] array of history data *
2528 * itemids - [IN] the item identifiers *
2529 * (used for item lookup) *
2530 * items - [IN] the items *
2531 * errcodes - [IN] item error codes *
2532 * history_num - [IN] number of history structures *
2533 * item_diff - [OUT] the changes in item data *
2534 * inventory_values - [OUT] the inventory values to add *
2535 * *
2536 ******************************************************************************/
/* Matches every history value with its item, flags values that must not be stored as history   */
/* or trends, and collects the resulting item diffs and inventory values. Both output vectors   */
/* are sorted before returning.                                                                 */
static void	DCmass_prepare_history(ZBX_DC_HISTORY *history, const zbx_vector_uint64_t *itemids,
		const DC_ITEM *items, const int *errcodes, int history_num, zbx_vector_ptr_t *item_diff,
		zbx_vector_ptr_t *inventory_values)
{
	const char	*__function_name = "DCmass_prepare_history";
	const time_t	now = time(NULL);
	int		n;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __function_name, history_num);

	for (n = 0; n < history_num; n++)
	{
		ZBX_DC_HISTORY	*h = &history[n];
		const DC_ITEM	*item;
		time_t		age;
		int		index;

		/* items and errcodes are addressed by the position of the itemid in itemids */
		if (FAIL == (index = zbx_vector_uint64_bsearch(itemids, h->itemid, ZBX_DEFAULT_UINT64_COMPARE_FUNC)))
		{
			THIS_SHOULD_NEVER_HAPPEN;
			h->flags |= ZBX_DC_FLAG_UNDEF;
			continue;
		}

		item = &items[index];

		/* the item is inspected only when its error code is SUCCEED */
		if (SUCCEED != errcodes[index] || ITEM_STATUS_ACTIVE != item->status ||
				HOST_STATUS_MONITORED != item->host.status)
		{
			h->flags |= ZBX_DC_FLAG_UNDEF;
			continue;
		}

		age = now - h->ts.sec;

		if (0 == item->history)
		{
			h->flags |= ZBX_DC_FLAG_NOHISTORY;
		}
		else if (age > item->history_sec)
		{
			h->flags |= ZBX_DC_FLAG_NOHISTORY;
			zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" value timestamp \"%s %s\" is outside history "
					"storage period", item->host.host, item->key_orig, zbx_date2str(h->ts.sec),
					zbx_time2str(h->ts.sec));
		}

		/* trends are kept only for float and unsigned integer items */
		if (ITEM_VALUE_TYPE_FLOAT != item->value_type && ITEM_VALUE_TYPE_UINT64 != item->value_type)
		{
			h->flags |= ZBX_DC_FLAG_NOTRENDS;
		}
		else if (0 == item->trends)
		{
			h->flags |= ZBX_DC_FLAG_NOTRENDS;
		}
		else if (age > item->trends_sec)
		{
			h->flags |= ZBX_DC_FLAG_NOTRENDS;
			zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" value timestamp \"%s %s\" is outside "
					"trends storage period", item->host.host, item->key_orig,
					zbx_date2str(h->ts.sec), zbx_time2str(h->ts.sec));
		}

		/* the value is normalized first, the item diff is calculated from the normalized value */
		normalize_item_value(item, h);

		zbx_vector_ptr_append(item_diff, calculate_item_update(item, h));
		DCinventory_value_add(inventory_values, item, h);
	}

	zbx_vector_ptr_sort(inventory_values, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);
	zbx_vector_ptr_sort(item_diff, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
}
2618
2619 /******************************************************************************
2620 * *
2621 * Function: DCmodule_prepare_history *
2622 * *
2623 * Purpose: prepare history data to share them with loadable modules, sort *
2624 * data by type skipping low-level discovery data, meta information *
2625 * updates and notsupported items *
2626 * *
2627 * Parameters: history - [IN] array of history data *
2628 * history_num - [IN] number of history structures *
2629 * history_<type> - [OUT] array of historical data of a *
2630 * specific data type *
2631 * history_<type>_num - [OUT] number of values of a specific *
2632 * data type *
2633 * *
2634 ******************************************************************************/
/* Copies the history values that are not flagged with ZBX_DC_FLAGS_NOT_FOR_MODULES into the    */
/* per type output arrays and counts them. A value is copied only when the callback list of its */
/* type (history_<type>_cbs) is not NULL. String pointers are copied as is, not duplicated.     */
static void	DCmodule_prepare_history(ZBX_DC_HISTORY *history, int history_num, ZBX_HISTORY_FLOAT *history_float,
		int *history_float_num, ZBX_HISTORY_INTEGER *history_integer, int *history_integer_num,
		ZBX_HISTORY_STRING *history_string, int *history_string_num, ZBX_HISTORY_TEXT *history_text,
		int *history_text_num, ZBX_HISTORY_LOG *history_log, int *history_log_num)
{
	int	n;

	*history_float_num = 0;
	*history_integer_num = 0;
	*history_string_num = 0;
	*history_text_num = 0;
	*history_log_num = 0;

	for (n = 0; n < history_num; n++)
	{
		const ZBX_DC_HISTORY	*h = &history[n];

		if (0 != (ZBX_DC_FLAGS_NOT_FOR_MODULES & h->flags))
			continue;

		switch (h->value_type)
		{
			case ITEM_VALUE_TYPE_FLOAT:
				if (NULL != history_float_cbs)
				{
					ZBX_HISTORY_FLOAT	*dst = &history_float[(*history_float_num)++];

					dst->itemid = h->itemid;
					dst->clock = h->ts.sec;
					dst->ns = h->ts.ns;
					dst->value = h->value.dbl;
				}
				break;
			case ITEM_VALUE_TYPE_UINT64:
				if (NULL != history_integer_cbs)
				{
					ZBX_HISTORY_INTEGER	*dst = &history_integer[(*history_integer_num)++];

					dst->itemid = h->itemid;
					dst->clock = h->ts.sec;
					dst->ns = h->ts.ns;
					dst->value = h->value.ui64;
				}
				break;
			case ITEM_VALUE_TYPE_STR:
				if (NULL != history_string_cbs)
				{
					ZBX_HISTORY_STRING	*dst = &history_string[(*history_string_num)++];

					dst->itemid = h->itemid;
					dst->clock = h->ts.sec;
					dst->ns = h->ts.ns;
					dst->value = h->value.str;
				}
				break;
			case ITEM_VALUE_TYPE_TEXT:
				if (NULL != history_text_cbs)
				{
					ZBX_HISTORY_TEXT	*dst = &history_text[(*history_text_num)++];

					dst->itemid = h->itemid;
					dst->clock = h->ts.sec;
					dst->ns = h->ts.ns;
					dst->value = h->value.str;
				}
				break;
			case ITEM_VALUE_TYPE_LOG:
				if (NULL != history_log_cbs)
				{
					const zbx_log_value_t	*log = h->value.log;
					ZBX_HISTORY_LOG		*dst = &history_log[(*history_log_num)++];

					dst->itemid = h->itemid;
					dst->clock = h->ts.sec;
					dst->ns = h->ts.ns;
					dst->value = log->value;
					/* a missing source is passed to modules as an empty string */
					dst->source = ZBX_NULL2EMPTY_STR(log->source);
					dst->timestamp = log->timestamp;
					dst->logeventid = log->logeventid;
					dst->severity = log->severity;
				}
				break;
			default:
				THIS_SHOULD_NEVER_HAPPEN;
		}
	}
}
2724
/* Passes the prepared history arrays to the module callbacks, one value type at a time.        */
/* For every type with a non-zero value count the matching history_<type>_cbs array is walked   */
/* until an entry with a NULL module is reached and each callback receives the whole array.     */
static void	DCmodule_sync_history(int history_float_num, int history_integer_num, int history_string_num,
		int history_text_num, int history_log_num, ZBX_HISTORY_FLOAT *history_float,
		ZBX_HISTORY_INTEGER *history_integer, ZBX_HISTORY_STRING *history_string,
		ZBX_HISTORY_TEXT *history_text, ZBX_HISTORY_LOG *history_log)
{
	int	cb;

	if (0 != history_float_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing float history data with modules...");

		cb = 0;

		while (NULL != history_float_cbs[cb].module)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_float_cbs[cb].module->name);
			history_float_cbs[cb].history_float_cb(history_float, history_float_num);
			cb++;
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d float values with modules", history_float_num);
	}

	if (0 != history_integer_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing integer history data with modules...");

		cb = 0;

		while (NULL != history_integer_cbs[cb].module)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_integer_cbs[cb].module->name);
			history_integer_cbs[cb].history_integer_cb(history_integer, history_integer_num);
			cb++;
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d integer values with modules", history_integer_num);
	}

	if (0 != history_string_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing string history data with modules...");

		cb = 0;

		while (NULL != history_string_cbs[cb].module)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_string_cbs[cb].module->name);
			history_string_cbs[cb].history_string_cb(history_string, history_string_num);
			cb++;
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d string values with modules", history_string_num);
	}

	if (0 != history_text_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing text history data with modules...");

		cb = 0;

		while (NULL != history_text_cbs[cb].module)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_text_cbs[cb].module->name);
			history_text_cbs[cb].history_text_cb(history_text, history_text_num);
			cb++;
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d text values with modules", history_text_num);
	}

	if (0 != history_log_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing log history data with modules...");

		cb = 0;

		while (NULL != history_log_cbs[cb].module)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_log_cbs[cb].module->name);
			history_log_cbs[cb].history_log_cb(history_log, history_log_num);
			cb++;
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d log values with modules", history_log_num);
	}
}
2805
sync_proxy_history(int * total_num,int * more)2806 static void sync_proxy_history(int *total_num, int *more)
2807 {
2808 int history_num;
2809 time_t sync_start;
2810 zbx_vector_ptr_t history_items;
2811 ZBX_DC_HISTORY history[ZBX_HC_SYNC_MAX];
2812
2813 zbx_vector_ptr_create(&history_items);
2814 zbx_vector_ptr_reserve(&history_items, ZBX_HC_SYNC_MAX);
2815
2816 sync_start = time(NULL);
2817
2818 do
2819 {
2820 *more = ZBX_SYNC_DONE;
2821
2822 LOCK_CACHE;
2823
2824 hc_pop_items(&history_items); /* select and take items out of history cache */
2825 history_num = history_items.values_num;
2826
2827 UNLOCK_CACHE;
2828
2829 if (0 == history_num)
2830 break;
2831
2832 hc_get_item_values(history, &history_items); /* copy item data from history cache */
2833
2834 do
2835 {
2836 DBbegin();
2837
2838 DCmass_proxy_add_history(history, history_num);
2839 DCmass_proxy_update_items(history, history_num);
2840 }
2841 while (ZBX_DB_DOWN == DBcommit());
2842
2843 LOCK_CACHE;
2844
2845 hc_push_items(&history_items); /* return items to history cache */
2846 cache->history_num -= history_num;
2847
2848 if (0 != hc_queue_get_size())
2849 *more = ZBX_SYNC_MORE;
2850
2851 UNLOCK_CACHE;
2852
2853 *total_num += history_num;
2854
2855 zbx_vector_ptr_clear(&history_items);
2856 hc_free_item_values(history, history_num);
2857
2858 /* Exit from sync loop if we have spent too much time here */
2859 /* unless we are doing full sync. This is done to allow */
2860 /* syncer process to update their statistics. */
2861 }
2862 while (ZBX_SYNC_MORE == *more && ZBX_HC_SYNC_TIME_MAX >= time(NULL) - sync_start);
2863
2864 zbx_vector_ptr_destroy(&history_items);
2865 }
2866
/******************************************************************************
 *                                                                            *
 * Function: sync_server_history                                              *
 *                                                                            *
 * Purpose: flush history cache to database, process triggers of flushed      *
 *          and timer triggers from timer queue                               *
 *                                                                            *
 * Parameters: values_num   - [IN/OUT] the number of synced values            *
 *             triggers_num - [IN/OUT] the number of processed triggers       *
 *             more         - [OUT] a flag indicating the cache emptiness:    *
 *                            ZBX_SYNC_DONE - nothing to sync, go idle        *
 *                            ZBX_SYNC_MORE - more data to sync               *
 *                                                                            *
 * Comments: This function loops syncing history values by 1k batches and     *
 *           processing timer triggers by batches of 500 triggers.            *
 *           The loop is aborted if either ZBX_HC_SYNC_TIME_MAX seconds       *
 *           have passed or there are no more data to process.                *
 *           The last is assumed when the following is true:                  *
 *            a) history cache is empty or less than ZBX_HC_SYNC_MIN_PCNT     *
 *               percent of batch values were processed (the other items      *
 *               were locked by triggers)                                     *
 *            b) less than ZBX_HC_TIMER_MAX (full batch) timer triggers       *
 *               were processed                                               *
 *                                                                            *
 ******************************************************************************/
static void sync_server_history(int *values_num, int *triggers_num, int *more)
{
	/* Buffers passed to DCmodule_prepare_history() and DCmodule_sync_history(). */
	/* Each one is allocated once, on the first call where the matching */
	/* history_*_cbs pointer is not NULL, and is kept between calls (static). */
	static ZBX_HISTORY_FLOAT *history_float;
	static ZBX_HISTORY_INTEGER *history_integer;
	static ZBX_HISTORY_STRING *history_string;
	static ZBX_HISTORY_TEXT *history_text;
	static ZBX_HISTORY_LOG *history_log;
	int i, history_num, history_float_num, history_integer_num, history_string_num,
			history_text_num, history_log_num, txn_error;
	time_t sync_start;
	zbx_vector_uint64_t triggerids, timer_triggerids;
	zbx_vector_ptr_t history_items, trigger_diff, item_diff, inventory_values;
	zbx_vector_uint64_pair_t trends_diff;
	ZBX_DC_HISTORY history[ZBX_HC_SYNC_MAX];

	if (NULL == history_float && NULL != history_float_cbs)
	{
		history_float = (ZBX_HISTORY_FLOAT *)zbx_malloc(history_float,
				ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_FLOAT));
	}

	if (NULL == history_integer && NULL != history_integer_cbs)
	{
		history_integer = (ZBX_HISTORY_INTEGER *)zbx_malloc(history_integer,
				ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_INTEGER));
	}

	if (NULL == history_string && NULL != history_string_cbs)
	{
		history_string = (ZBX_HISTORY_STRING *)zbx_malloc(history_string,
				ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_STRING));
	}

	if (NULL == history_text && NULL != history_text_cbs)
	{
		history_text = (ZBX_HISTORY_TEXT *)zbx_malloc(history_text,
				ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_TEXT));
	}

	if (NULL == history_log && NULL != history_log_cbs)
	{
		history_log = (ZBX_HISTORY_LOG *)zbx_malloc(history_log,
				ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_LOG));
	}

	/* vectors reused by every iteration of the sync loop; destroyed at the end of the function */
	zbx_vector_ptr_create(&inventory_values);
	zbx_vector_ptr_create(&item_diff);
	zbx_vector_ptr_create(&trigger_diff);
	zbx_vector_uint64_pair_create(&trends_diff);

	zbx_vector_uint64_create(&triggerids);
	zbx_vector_uint64_reserve(&triggerids, ZBX_HC_SYNC_MAX);

	zbx_vector_uint64_create(&timer_triggerids);
	zbx_vector_uint64_reserve(&timer_triggerids, ZBX_HC_TIMER_MAX);

	zbx_vector_ptr_create(&history_items);
	zbx_vector_ptr_reserve(&history_items, ZBX_HC_SYNC_MAX);

	/* the loop condition compares elapsed time since this moment with ZBX_HC_SYNC_TIME_MAX */
	sync_start = time(NULL);

	do
	{
		DC_ITEM *items;
		/* ret stays SUCCEED when no history is processed; it becomes FAIL only */
		/* if DBmass_add_history() fails */
		int *errcodes, trends_num = 0, timers_num = 0, ret = SUCCEED;
		zbx_vector_uint64_t itemids;
		ZBX_DC_TREND *trends = NULL;
		zbx_timespec_t ts;

		*more = ZBX_SYNC_DONE;

		LOCK_CACHE;
		hc_pop_items(&history_items); /* select and take items out of history cache */
		UNLOCK_CACHE;

		if (0 != history_items.values_num)
		{
			/* history_num is the value returned by DCconfig_lock_triggers_by_history_items(); */
			/* when it is 0 all popped items are returned to the cache right away */
			if (0 == (history_num = DCconfig_lock_triggers_by_history_items(&history_items, &triggerids)))
			{
				LOCK_CACHE;
				hc_push_items(&history_items);
				UNLOCK_CACHE;
				zbx_vector_ptr_clear(&history_items);
			}
		}
		else
			history_num = 0;

		/* timestamp taken once per iteration and passed to recalculate_triggers() below */
		zbx_timespec(&ts);

		if (0 != history_num)
		{
			hc_get_item_values(history, &history_items); /* copy item data from history cache */

			items = (DC_ITEM *)zbx_malloc(NULL, sizeof(DC_ITEM) * (size_t)history_num);
			errcodes = (int *)zbx_malloc(NULL, sizeof(int) * (size_t)history_num);

			zbx_vector_uint64_create(&itemids);
			zbx_vector_uint64_reserve(&itemids, history_num);

			for (i = 0; i < history_num; i++)
				zbx_vector_uint64_append(&itemids, history[i].itemid);

			/* item configuration is requested by the sorted list of itemids */
			zbx_vector_uint64_sort(&itemids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);

			DCconfig_get_items_by_itemids(items, itemids.values, errcodes, history_num);

			DCmass_prepare_history(history, &itemids, items, errcodes, history_num, &item_diff,
					&inventory_values);

			if (FAIL != (ret = DBmass_add_history(history, history_num)))
			{
				DCconfig_items_apply_changes(&item_diff);
				DCmass_update_trends(history, history_num, &trends, &trends_num);

				/* the transaction is repeated for as long as DBcommit() returns ZBX_DB_DOWN */
				do
				{
					DBbegin();

					DBmass_update_items(&item_diff, &inventory_values);
					DBmass_update_trends(trends, trends_num, &trends_diff);

					/* process internal events generated by DCmass_prepare_history() */
					zbx_process_events(NULL, NULL);

					if (ZBX_DB_OK == (txn_error = DBcommit()))
						DCupdate_trends(&trends_diff);
					else
						zbx_reset_event_recovery();

					zbx_vector_uint64_pair_clear(&trends_diff);
				}
				while (ZBX_DB_DOWN == txn_error);
			}

			zbx_clean_events();

			zbx_vector_ptr_clear_ext(&inventory_values, (zbx_clean_func_t)DCinventory_value_free);
			zbx_vector_ptr_clear_ext(&item_diff, (zbx_clean_func_t)zbx_ptr_free);
		}

		/* triggers are recalculated unless storing history failed in this iteration */
		if (FAIL != ret)
		{
			zbx_dc_get_timer_triggerids(&timer_triggerids, time(NULL), ZBX_HC_TIMER_MAX);
			timers_num = timer_triggerids.values_num;

			/* a full batch of timer triggers means there may be more of them waiting */
			if (ZBX_HC_TIMER_MAX == timers_num)
				*more = ZBX_SYNC_MORE;

			if (0 != history_num || 0 != timers_num)
			{
				/* timer triggers do not intersect with item triggers because item triggers */
				/* were already locked and skipped when retrieving timer triggers */
				zbx_vector_uint64_append_array(&triggerids, timer_triggerids.values,
						timer_triggerids.values_num);

				/* the transaction is repeated for as long as DBcommit() returns ZBX_DB_DOWN */
				do
				{
					DBbegin();

					recalculate_triggers(history, history_num, &timer_triggerids, &ts,
							&trigger_diff);

					/* process trigger events generated by recalculate_triggers() */
					zbx_process_events(&trigger_diff, &triggerids);
					if (0 != trigger_diff.values_num)
						zbx_db_save_trigger_changes(&trigger_diff);

					if (ZBX_DB_OK == (txn_error = DBcommit()))
					{
						DCconfig_triggers_apply_changes(&trigger_diff);
						DBupdate_itservices(&trigger_diff);
					}
					else
						zbx_clean_events();

					zbx_vector_ptr_clear_ext(&trigger_diff, (zbx_clean_func_t)zbx_trigger_diff_free);
				}
				while (ZBX_DB_DOWN == txn_error);
			}

			zbx_vector_uint64_clear(&timer_triggerids);
		}

		/* triggerids holds the triggers locked for history items plus the appended timer triggers; */
		/* all of them are counted and unlocked here */
		if (0 != triggerids.values_num)
		{
			*triggers_num += triggerids.values_num;
			DCconfig_unlock_triggers(&triggerids);
			zbx_vector_uint64_clear(&triggerids);
		}

		/* NOTE: this block is not guarded by ret, so the items are pushed back and counted */
		/* as synced even when DBmass_add_history() has failed */
		if (0 != history_num)
		{
			LOCK_CACHE;
			hc_push_items(&history_items); /* return items to history cache */
			cache->history_num -= history_num;

			if (0 != hc_queue_get_size())
			{
				/* Continue sync if enough of sync candidates were processed */
				/* (meaning most of sync candidates are not locked by triggers). */
				/* Otherwise better to wait a bit for other syncers to unlock */
				/* items rather than trying and failing to sync locked items over */
				/* and over again. */
				if (ZBX_HC_SYNC_MIN_PCNT <= history_num * 100 / history_items.values_num)
					*more = ZBX_SYNC_MORE;
			}

			UNLOCK_CACHE;

			*values_num += history_num;
		}

		/* module callbacks and export are skipped when storing history failed */
		if (FAIL != ret)
		{
			if (0 != history_num)
			{
				DCmodule_prepare_history(history, history_num, history_float, &history_float_num,
						history_integer, &history_integer_num, history_string,
						&history_string_num, history_text, &history_text_num, history_log,
						&history_log_num);

				DCmodule_sync_history(history_float_num, history_integer_num, history_string_num,
						history_text_num, history_log_num, history_float, history_integer,
						history_string, history_text, history_log);
			}

			if (SUCCEED == zbx_is_export_enabled())
			{
				if (0 != history_num)
				{
					DCexport_history_and_trends(history, history_num, &itemids, items, errcodes,
							trends, trends_num);
				}

				zbx_export_events();
			}
		}

		if (0 != history_num || 0 != timers_num)
			zbx_clean_events();

		/* release everything that was allocated for this batch of history values */
		if (0 != history_num)
		{
			zbx_free(trends);
			zbx_vector_uint64_destroy(&itemids);
			DCconfig_clean_items(items, errcodes, history_num);
			zbx_free(errcodes);
			zbx_free(items);

			zbx_vector_ptr_clear(&history_items);
			hc_free_item_values(history, history_num);
		}

		/* Exit from sync loop if we have spent too much time here. */
		/* This is done to allow syncer process to update its statistics. */
	}
	while (ZBX_SYNC_MORE == *more && ZBX_HC_SYNC_TIME_MAX >= time(NULL) - sync_start);

	zbx_vector_ptr_destroy(&history_items);
	zbx_vector_ptr_destroy(&inventory_values);
	zbx_vector_ptr_destroy(&item_diff);
	zbx_vector_ptr_destroy(&trigger_diff);
	zbx_vector_uint64_pair_destroy(&trends_diff);

	zbx_vector_uint64_destroy(&timer_triggerids);
	zbx_vector_uint64_destroy(&triggerids);
}
3158
3159 /******************************************************************************
3160 * *
3161 * Function: sync_history_cache_full *
3162 * *
3163 * Purpose: writes updates and new data from history cache to database *
3164 * *
3165 * Comments: This function is used to flush history cache at server/proxy *
3166 * exit. *
3167 * Other processes are already terminated, so cache locking is *
3168 * unnecessary. *
3169 * *
3170 ******************************************************************************/
sync_history_cache_full(void)3171 static void sync_history_cache_full(void)
3172 {
3173 const char *__function_name = "sync_history_cache_full";
3174
3175 int values_num = 0, triggers_num = 0, more;
3176 zbx_hashset_iter_t iter;
3177 zbx_hc_item_t *item;
3178 zbx_binary_heap_t tmp_history_queue;
3179
3180 zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __function_name, cache->history_num);
3181
3182 /* History index cache might be full without any space left for queueing items from history index to */
3183 /* history queue. The solution: replace the shared-memory history queue with heap-allocated one. Add */
3184 /* all items from history index to the new history queue. */
3185 /* */
3186 /* Assertions that must be true. */
3187 /* * This is the main server or proxy process, */
3188 /* * There are no other users of history index cache stored in shared memory. Other processes */
3189 /* should have quit by this point. */
3190 /* * other parts of the program do not hold pointers to the elements of history queue that is */
3191 /* stored in the shared memory. */
3192
3193 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3194 {
3195 /* unlock all triggers before full sync so no items are locked by triggers */
3196 DCconfig_unlock_all_triggers();
3197
3198 /* clear timer trigger queue to avoid processing time triggers at exit */
3199 zbx_dc_clear_timer_queue();
3200 }
3201
3202 tmp_history_queue = cache->history_queue;
3203
3204 zbx_binary_heap_create(&cache->history_queue, hc_queue_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY);
3205 zbx_hashset_iter_reset(&cache->history_items, &iter);
3206
3207 /* add all items from history index to the new history queue */
3208 while (NULL != (item = (zbx_hc_item_t *)zbx_hashset_iter_next(&iter)))
3209 {
3210 if (NULL != item->tail)
3211 {
3212 item->status = ZBX_HC_ITEM_STATUS_NORMAL;
3213 hc_queue_item(item);
3214 }
3215 }
3216
3217 if (0 != hc_queue_get_size())
3218 {
3219 zabbix_log(LOG_LEVEL_WARNING, "syncing history data...");
3220
3221 do
3222 {
3223 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3224 sync_server_history(&values_num, &triggers_num, &more);
3225 else
3226 sync_proxy_history(&values_num, &more);
3227
3228 zabbix_log(LOG_LEVEL_WARNING, "syncing history data... " ZBX_FS_DBL "%%",
3229 (double)values_num / (cache->history_num + values_num) * 100);
3230 }
3231 while (0 != hc_queue_get_size());
3232
3233 zabbix_log(LOG_LEVEL_WARNING, "syncing history data done");
3234 }
3235
3236 zbx_binary_heap_destroy(&cache->history_queue);
3237 cache->history_queue = tmp_history_queue;
3238
3239 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
3240 }
3241
3242 /******************************************************************************
3243 * *
3244 * Function: zbx_log_sync_history_cache_progress *
3245 * *
3246 * Purpose: log progress of syncing history data *
3247 * *
3248 ******************************************************************************/
zbx_log_sync_history_cache_progress(void)3249 void zbx_log_sync_history_cache_progress(void)
3250 {
3251 double pcnt = -1.0;
3252 int ts_last, ts_next, sec;
3253
3254 LOCK_CACHE;
3255
3256 if (INT_MAX == cache->history_progress_ts)
3257 {
3258 UNLOCK_CACHE;
3259 return;
3260 }
3261
3262 ts_last = cache->history_progress_ts;
3263 sec = time(NULL);
3264
3265 if (0 == cache->history_progress_ts)
3266 {
3267 cache->history_num_total = cache->history_num;
3268 cache->history_progress_ts = sec;
3269 }
3270
3271 if (ZBX_HC_SYNC_TIME_MAX <= sec - cache->history_progress_ts || 0 == cache->history_num)
3272 {
3273 if (0 != cache->history_num_total)
3274 pcnt = 100 * (double)(cache->history_num_total - cache->history_num) / cache->history_num_total;
3275
3276 cache->history_progress_ts = (0 == cache->history_num ? INT_MAX : sec);
3277 }
3278
3279 ts_next = cache->history_progress_ts;
3280
3281 UNLOCK_CACHE;
3282
3283 if (0 == ts_last)
3284 zabbix_log(LOG_LEVEL_WARNING, "syncing history data in progress... ");
3285
3286 if (-1.0 != pcnt)
3287 zabbix_log(LOG_LEVEL_WARNING, "syncing history data... " ZBX_FS_DBL "%%", pcnt);
3288
3289 if (INT_MAX == ts_next)
3290 zabbix_log(LOG_LEVEL_WARNING, "syncing history data done");
3291 }
3292
3293 /******************************************************************************
3294 * *
3295 * Function: zbx_sync_history_cache *
3296 * *
3297 * Purpose: writes updates and new data from history cache to database *
3298 * *
3299 * Parameters: values_num - [OUT] the number of synced values *
3300 * more - [OUT] a flag indicating the cache emptiness: *
3301 * ZBX_SYNC_DONE - nothing to sync, go idle *
3302 * ZBX_SYNC_MORE - more data to sync *
3303 * *
3304 ******************************************************************************/
zbx_sync_history_cache(int * values_num,int * triggers_num,int * more)3305 void zbx_sync_history_cache(int *values_num, int *triggers_num, int *more)
3306 {
3307 const char *__function_name = "zbx_sync_history_cache";
3308
3309 zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __function_name, cache->history_num);
3310
3311 *values_num = 0;
3312 *triggers_num = 0;
3313
3314 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3315 sync_server_history(values_num, triggers_num, more);
3316 else
3317 sync_proxy_history(values_num, more);
3318
3319 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
3320 }
3321
3322 /******************************************************************************
3323 * *
3324 * local history cache *
3325 * *
3326 ******************************************************************************/
dc_string_buffer_realloc(size_t len)3327 static void dc_string_buffer_realloc(size_t len)
3328 {
3329 if (string_values_alloc >= string_values_offset + len)
3330 return;
3331
3332 do
3333 {
3334 string_values_alloc += ZBX_STRING_REALLOC_STEP;
3335 }
3336 while (string_values_alloc < string_values_offset + len);
3337
3338 string_values = (char *)zbx_realloc(string_values, string_values_alloc);
3339 }
3340
dc_local_get_history_slot(void)3341 static dc_item_value_t *dc_local_get_history_slot(void)
3342 {
3343 if (ZBX_MAX_VALUES_LOCAL == item_values_num)
3344 dc_flush_history();
3345
3346 if (item_values_alloc == item_values_num)
3347 {
3348 item_values_alloc += ZBX_STRUCT_REALLOC_STEP;
3349 item_values = (dc_item_value_t *)zbx_realloc(item_values, item_values_alloc * sizeof(dc_item_value_t));
3350 }
3351
3352 return &item_values[item_values_num++];
3353 }
3354
/* Stores a floating point value in the next slot of the local value buffer. */
/* lastlogsize and mtime are stored only when ZBX_DC_FLAG_META is set in */
/* flags, the value itself only when ZBX_DC_FLAG_NOVALUE is not set. */
static void dc_local_add_history_dbl(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		double value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	dc_item_value_t *slot = dc_local_get_history_slot();

	slot->flags = flags;
	slot->state = ITEM_STATE_NORMAL;
	slot->value_type = ITEM_VALUE_TYPE_FLOAT;
	slot->item_value_type = item_value_type;
	slot->ts = *ts;
	slot->itemid = itemid;

	if (0 == (slot->flags & ZBX_DC_FLAG_NOVALUE))
		slot->value.value_dbl = value_orig;

	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->mtime = mtime;
		slot->lastlogsize = lastlogsize;
	}
}
3378
/* Stores an unsigned 64 bit integer value in the next slot of the local */
/* value buffer. lastlogsize and mtime are stored only when ZBX_DC_FLAG_META */
/* is set in flags, the value itself only when ZBX_DC_FLAG_NOVALUE is not set. */
static void dc_local_add_history_uint(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		zbx_uint64_t value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	dc_item_value_t *slot = dc_local_get_history_slot();

	slot->flags = flags;
	slot->state = ITEM_STATE_NORMAL;
	slot->value_type = ITEM_VALUE_TYPE_UINT64;
	slot->item_value_type = item_value_type;
	slot->ts = *ts;
	slot->itemid = itemid;

	if (0 == (slot->flags & ZBX_DC_FLAG_NOVALUE))
		slot->value.value_uint = value_orig;

	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->mtime = mtime;
		slot->lastlogsize = lastlogsize;
	}
}
3402
/* Stores a text value in the next slot of the local value buffer. The text */
/* (limited by zbx_db_strlen_n() to ZBX_HISTORY_VALUE_LEN) is copied into the */
/* local string buffer and the slot keeps its offset and length. With */
/* ZBX_DC_FLAG_NOVALUE set no text is copied and the stored length is 0. */
static void dc_local_add_history_text(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		const char *value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	dc_item_value_t *slot = dc_local_get_history_slot();

	slot->flags = flags;
	slot->state = ITEM_STATE_NORMAL;
	slot->value_type = ITEM_VALUE_TYPE_TEXT;
	slot->item_value_type = item_value_type;
	slot->ts = *ts;
	slot->itemid = itemid;

	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->mtime = mtime;
		slot->lastlogsize = lastlogsize;
	}

	if (0 != (slot->flags & ZBX_DC_FLAG_NOVALUE))
	{
		slot->value.value_str.len = 0;
		return;
	}

	/* the stored length includes one extra byte after the counted characters */
	slot->value.value_str.len = zbx_db_strlen_n(value_orig, ZBX_HISTORY_VALUE_LEN) + 1;
	dc_string_buffer_realloc(slot->value.value_str.len);

	slot->value.value_str.pvalue = string_values_offset;
	memcpy(string_values + string_values_offset, value_orig, slot->value.value_str.len);
	string_values_offset += slot->value.value_str.len;
}
3435
/* Stores a log value in the next slot of the local value buffer. Severity, */
/* event id, timestamp, the log text and the (optional, non-empty) source are */
/* taken from log unless ZBX_DC_FLAG_NOVALUE is set; in that case log is not */
/* accessed and both stored string lengths are 0. */
static void dc_local_add_history_log(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		const zbx_log_t *log, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	dc_item_value_t *slot = dc_local_get_history_slot();

	slot->flags = flags;
	slot->state = ITEM_STATE_NORMAL;
	slot->value_type = ITEM_VALUE_TYPE_LOG;
	slot->item_value_type = item_value_type;
	slot->ts = *ts;
	slot->itemid = itemid;

	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->mtime = mtime;
		slot->lastlogsize = lastlogsize;
	}

	if (0 != (slot->flags & ZBX_DC_FLAG_NOVALUE))
	{
		slot->value.value_str.len = 0;
		slot->source.len = 0;
		return;
	}

	slot->severity = log->severity;
	slot->logeventid = log->logeventid;
	slot->timestamp = log->timestamp;

	slot->value.value_str.len = zbx_db_strlen_n(log->value, ZBX_HISTORY_VALUE_LEN) + 1;

	/* a missing or empty source is stored as zero length */
	if (NULL == log->source || '\0' == *log->source)
		slot->source.len = 0;
	else
		slot->source.len = zbx_db_strlen_n(log->source, HISTORY_LOG_SOURCE_LEN) + 1;

	if (0 == slot->value.value_str.len + slot->source.len)
		return;

	/* reserve space for both strings at once, then copy them one after another */
	dc_string_buffer_realloc(slot->value.value_str.len + slot->source.len);

	if (0 != slot->value.value_str.len)
	{
		slot->value.value_str.pvalue = string_values_offset;
		memcpy(string_values + string_values_offset, log->value, slot->value.value_str.len);
		string_values_offset += slot->value.value_str.len;
	}

	if (0 != slot->source.len)
	{
		slot->source.pvalue = string_values_offset;
		memcpy(string_values + string_values_offset, log->source, slot->source.len);
		string_values_offset += slot->source.len;
	}
}
3495
dc_local_add_history_notsupported(zbx_uint64_t itemid,const zbx_timespec_t * ts,const char * error,zbx_uint64_t lastlogsize,int mtime,unsigned char flags)3496 static void dc_local_add_history_notsupported(zbx_uint64_t itemid, const zbx_timespec_t *ts, const char *error,
3497 zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
3498 {
3499 dc_item_value_t *item_value;
3500
3501 item_value = dc_local_get_history_slot();
3502
3503 item_value->itemid = itemid;
3504 item_value->ts = *ts;
3505 item_value->state = ITEM_STATE_NOTSUPPORTED;
3506 item_value->flags = flags;
3507
3508 if (0 != (item_value->flags & ZBX_DC_FLAG_META))
3509 {
3510 item_value->lastlogsize = lastlogsize;
3511 item_value->mtime = mtime;
3512 }
3513
3514 item_value->value.value_str.len = zbx_db_strlen_n(error, ITEM_ERROR_LEN) + 1;
3515 dc_string_buffer_realloc(item_value->value.value_str.len);
3516 item_value->value.value_str.pvalue = string_values_offset;
3517 memcpy(&string_values[string_values_offset], error, item_value->value.value_str.len);
3518 string_values_offset += item_value->value.value_str.len;
3519 }
3520
/* Stores a low-level discovery value in the next slot of the local value */
/* buffer. The slot is flagged with ZBX_DC_FLAG_LLD and the whole string, */
/* including its terminating zero, is copied into the local string buffer. */
static void dc_local_add_history_lld(zbx_uint64_t itemid, const zbx_timespec_t *ts, const char *value_orig)
{
	dc_item_value_t *slot = dc_local_get_history_slot();

	slot->flags = ZBX_DC_FLAG_LLD;
	slot->state = ITEM_STATE_NORMAL;
	slot->ts = *ts;
	slot->itemid = itemid;

	slot->value.value_str.len = strlen(value_orig) + 1;
	dc_string_buffer_realloc(slot->value.value_str.len);

	slot->value.value_str.pvalue = string_values_offset;
	memcpy(string_values + string_values_offset, value_orig, slot->value.value_str.len);
	string_values_offset += slot->value.value_str.len;
}
3538
3539 /******************************************************************************
3540 * *
3541 * Function: dc_add_history *
3542 * *
3543 * Purpose: add new value to the cache *
3544 * *
3545 * Parameters: itemid - [IN] the itemid *
3546 * item_value_type - [IN] the item value type *
3547 * item_flags - [IN] the item flags (e. g. lld rule) *
3548 * result - [IN] agent result containing the value *
3549 * to add *
3550 * ts - [IN] the value timestamp *
3551 * state - [IN] the item state *
3552 * error - [IN] the error message in case item state *
3553 * is ITEM_STATE_NOTSUPPORTED *
3554 * *
3555 ******************************************************************************/
dc_add_history(zbx_uint64_t itemid,unsigned char item_value_type,unsigned char item_flags,AGENT_RESULT * result,const zbx_timespec_t * ts,unsigned char state,const char * error)3556 void dc_add_history(zbx_uint64_t itemid, unsigned char item_value_type, unsigned char item_flags,
3557 AGENT_RESULT *result, const zbx_timespec_t *ts, unsigned char state, const char *error)
3558 {
3559 unsigned char value_flags;
3560
3561 if (ITEM_STATE_NOTSUPPORTED == state)
3562 {
3563 zbx_uint64_t lastlogsize;
3564 int mtime;
3565
3566 if (NULL != result && 0 != ISSET_META(result))
3567 {
3568 value_flags = ZBX_DC_FLAG_META;
3569 lastlogsize = result->lastlogsize;
3570 mtime = result->mtime;
3571 }
3572 else
3573 {
3574 value_flags = 0;
3575 lastlogsize = 0;
3576 mtime = 0;
3577 }
3578 dc_local_add_history_notsupported(itemid, ts, error, lastlogsize, mtime, value_flags);
3579 return;
3580 }
3581
3582 if (0 != (ZBX_FLAG_DISCOVERY_RULE & item_flags))
3583 {
3584 if (NULL == GET_TEXT_RESULT(result))
3585 return;
3586
3587 /* proxy stores low-level discovery (lld) values in db */
3588 if (0 == (ZBX_PROGRAM_TYPE_SERVER & program_type))
3589 dc_local_add_history_lld(itemid, ts, result->text);
3590
3591 return;
3592 }
3593
3594 if (!ISSET_VALUE(result) && !ISSET_META(result))
3595 return;
3596
3597 value_flags = 0;
3598
3599 if (!ISSET_VALUE(result))
3600 value_flags |= ZBX_DC_FLAG_NOVALUE;
3601
3602 if (ISSET_META(result))
3603 value_flags |= ZBX_DC_FLAG_META;
3604
3605 /* Add data to the local history cache if: */
3606 /* 1) the NOVALUE flag is set (data contains only meta information) */
3607 /* 2) the NOVALUE flag is not set and value conversion succeeded */
3608
3609 if (0 == (value_flags & ZBX_DC_FLAG_NOVALUE))
3610 {
3611 if (ISSET_LOG(result))
3612 {
3613 dc_local_add_history_log(itemid, item_value_type, ts, result->log, result->lastlogsize,
3614 result->mtime, value_flags);
3615 }
3616 else if (ISSET_UI64(result))
3617 {
3618 dc_local_add_history_uint(itemid, item_value_type, ts, result->ui64, result->lastlogsize,
3619 result->mtime, value_flags);
3620 }
3621 else if (ISSET_DBL(result))
3622 {
3623 dc_local_add_history_dbl(itemid, item_value_type, ts, result->dbl, result->lastlogsize,
3624 result->mtime, value_flags);
3625 }
3626 else if (ISSET_STR(result))
3627 {
3628 dc_local_add_history_text(itemid, item_value_type, ts, result->str, result->lastlogsize,
3629 result->mtime, value_flags);
3630 }
3631 else if (ISSET_TEXT(result))
3632 {
3633 dc_local_add_history_text(itemid, item_value_type, ts, result->text, result->lastlogsize,
3634 result->mtime, value_flags);
3635 }
3636 else
3637 {
3638 THIS_SHOULD_NEVER_HAPPEN;
3639 }
3640 }
3641 else
3642 {
3643 if (0 != (value_flags & ZBX_DC_FLAG_META))
3644 {
3645 dc_local_add_history_log(itemid, item_value_type, ts, NULL, result->lastlogsize, result->mtime,
3646 value_flags);
3647 }
3648 else
3649 THIS_SHOULD_NEVER_HAPPEN;
3650
3651 }
3652 }
3653
dc_flush_history(void)3654 void dc_flush_history(void)
3655 {
3656 if (0 == item_values_num)
3657 return;
3658
3659 LOCK_CACHE;
3660
3661 hc_add_item_values(item_values, item_values_num);
3662
3663 cache->history_num += item_values_num;
3664
3665 UNLOCK_CACHE;
3666
3667 item_values_num = 0;
3668 string_values_offset = 0;
3669 }
3670
3671 /******************************************************************************
3672 * *
3673 * history cache storage *
3674 * *
3675 ******************************************************************************/
/* Instantiate the memory function wrappers for the hc_index_mem and hc_mem memory */
/* segments. The __hc instance provides the __hc_mem_malloc_func() and */
/* __hc_mem_free_func() functions used by the history cache storage code below. */
/* NOTE(review): the __hc_index instance presumably backs the history index */
/* (cache->history_items and the history queue) - confirm where the cache is created. */
ZBX_MEM_FUNC_IMPL(__hc_index, hc_index_mem)
ZBX_MEM_FUNC_IMPL(__hc, hc_mem)
3678
3679 /******************************************************************************
3680 * *
3681 * Function: hc_queue_elem_compare_func *
3682 * *
3683 * Purpose: compares history queue elements *
3684 * *
3685 ******************************************************************************/
3686 static int hc_queue_elem_compare_func(const void *d1, const void *d2)
3687 {
3688 const zbx_binary_heap_elem_t *e1 = (const zbx_binary_heap_elem_t *)d1;
3689 const zbx_binary_heap_elem_t *e2 = (const zbx_binary_heap_elem_t *)d2;
3690
3691 const zbx_hc_item_t *item1 = (const zbx_hc_item_t *)e1->data;
3692 const zbx_hc_item_t *item2 = (const zbx_hc_item_t *)e2->data;
3693
3694 /* compare by timestamp of the oldest value */
3695 return zbx_timespec_compare(&item1->tail->ts, &item2->tail->ts);
3696 }
3697
3698 /******************************************************************************
3699 * *
3700 * Function: hc_free_data *
3701 * *
3702 * Purpose: free history item data allocated in history cache *
3703 * *
3704 * Parameters: data - [IN] history item data *
3705 * *
3706 ******************************************************************************/
static void hc_free_data(zbx_hc_data_t *data)
{
	if (ITEM_STATE_NOTSUPPORTED == data->state)
	{
		/* not supported items keep their error message in value.str */
		__hc_mem_free_func(data->value.str);
	}
	else if (0 == (data->flags & ZBX_DC_FLAG_NOVALUE))
	{
		/* only string, text and log values own additional memory */
		if (ITEM_VALUE_TYPE_STR == data->value_type || ITEM_VALUE_TYPE_TEXT == data->value_type)
		{
			__hc_mem_free_func(data->value.str);
		}
		else if (ITEM_VALUE_TYPE_LOG == data->value_type)
		{
			__hc_mem_free_func(data->value.log->value);

			/* the log source is optional */
			if (NULL != data->value.log->source)
				__hc_mem_free_func(data->value.log->source);

			__hc_mem_free_func(data->value.log);
		}
	}

	__hc_mem_free_func(data);
}
3737
3738 /******************************************************************************
3739 * *
3740 * Function: hc_queue_item *
3741 * *
3742 * Purpose: put back item into history queue *
3743 * *
3744 * Parameters: data - [IN] history item data *
3745 * *
3746 ******************************************************************************/
hc_queue_item(zbx_hc_item_t * item)3747 static void hc_queue_item(zbx_hc_item_t *item)
3748 {
3749 zbx_binary_heap_elem_t elem = {item->itemid, (const void *)item};
3750
3751 zbx_binary_heap_insert(&cache->history_queue, &elem);
3752 }
3753
3754 /******************************************************************************
3755 * *
3756 * Function: hc_get_item *
3757 * *
3758 * Purpose: returns history item by itemid *
3759 * *
3760 * Parameters: itemid - [IN] the item id *
3761 * *
3762 * Return value: the history item or NULL if the requested item is not in *
3763 * history cache *
3764 * *
3765 ******************************************************************************/
hc_get_item(zbx_uint64_t itemid)3766 static zbx_hc_item_t *hc_get_item(zbx_uint64_t itemid)
3767 {
3768 return (zbx_hc_item_t *)zbx_hashset_search(&cache->history_items, &itemid);
3769 }
3770
3771 /******************************************************************************
3772 * *
3773 * Function: hc_add_item *
3774 * *
3775 * Purpose: adds a new item to history cache *
3776 * *
3777 * Parameters: itemid - [IN] the item id *
3778 * [IN] the item data *
3779 * *
3780 * Return value: the added history item *
3781 * *
3782 ******************************************************************************/
hc_add_item(zbx_uint64_t itemid,zbx_hc_data_t * data)3783 static zbx_hc_item_t *hc_add_item(zbx_uint64_t itemid, zbx_hc_data_t *data)
3784 {
3785 zbx_hc_item_t item_local = {itemid, ZBX_HC_ITEM_STATUS_NORMAL, data, data};
3786
3787 return (zbx_hc_item_t *)zbx_hashset_insert(&cache->history_items, &item_local, sizeof(item_local));
3788 }
3789
3790 /******************************************************************************
3791 * *
3792 * Function: hc_mem_value_str_dup *
3793 * *
3794 * Purpose: copies string value to history cache *
3795 * *
3796 * Parameters: str - [IN] the string value *
3797 * *
3798 * Return value: the copied string or NULL if there was not enough memory *
3799 * *
3800 ******************************************************************************/
hc_mem_value_str_dup(const dc_value_str_t * str)3801 static char *hc_mem_value_str_dup(const dc_value_str_t *str)
3802 {
3803 char *ptr;
3804
3805 if (NULL == (ptr = (char *)__hc_mem_malloc_func(NULL, str->len)))
3806 return NULL;
3807
3808 memcpy(ptr, &string_values[str->pvalue], str->len - 1);
3809 ptr[str->len - 1] = '\0';
3810
3811 return ptr;
3812 }
3813
3814 /******************************************************************************
3815 * *
3816 * Function: hc_clone_history_str_data *
3817 * *
3818 * Purpose: clones string value into history data memory *
3819 * *
3820 * Parameters: dst - [IN/OUT] a reference to the cloned value *
3821 * str - [IN] the string value to clone *
3822 * *
3823 * Return value: SUCCESS - either there was no need to clone the string *
3824 * (it was empty or already cloned) or the string was *
3825 * cloned successfully *
3826 * FAIL - not enough memory *
3827 * *
3828 * Comments: This function can be called in loop with the same dst value *
3829 * until it finishes cloning string value. *
3830 * *
3831 ******************************************************************************/
hc_clone_history_str_data(char ** dst,const dc_value_str_t * str)3832 static int hc_clone_history_str_data(char **dst, const dc_value_str_t *str)
3833 {
3834 if (0 == str->len)
3835 return SUCCEED;
3836
3837 if (NULL != *dst)
3838 return SUCCEED;
3839
3840 if (NULL != (*dst = hc_mem_value_str_dup(str)))
3841 return SUCCEED;
3842
3843 return FAIL;
3844 }
3845
3846 /******************************************************************************
3847 * *
3848 * Function: hc_clone_history_log_data *
3849 * *
3850 * Purpose: clones log value into history data memory *
3851 * *
3852 * Parameters: dst - [IN/OUT] a reference to the cloned value *
3853 * item_value - [IN] the log value to clone *
3854 * *
3855 * Return value: SUCCESS - the log value was cloned successfully *
3856 * FAIL - not enough memory *
3857 * *
3858 * Comments: This function can be called in loop with the same dst value *
3859 * until it finishes cloning log value. *
3860 * *
3861 ******************************************************************************/
hc_clone_history_log_data(zbx_log_value_t ** dst,const dc_item_value_t * item_value)3862 static int hc_clone_history_log_data(zbx_log_value_t **dst, const dc_item_value_t *item_value)
3863 {
3864 if (NULL == *dst)
3865 {
3866 /* using realloc instead of malloc just to suppress 'not used' warning for realloc */
3867 if (NULL == (*dst = (zbx_log_value_t *)__hc_mem_realloc_func(NULL, sizeof(zbx_log_value_t))))
3868 return FAIL;
3869
3870 memset(*dst, 0, sizeof(zbx_log_value_t));
3871 }
3872
3873 if (SUCCEED != hc_clone_history_str_data(&(*dst)->value, &item_value->value.value_str))
3874 return FAIL;
3875
3876 if (SUCCEED != hc_clone_history_str_data(&(*dst)->source, &item_value->source))
3877 return FAIL;
3878
3879 (*dst)->logeventid = item_value->logeventid;
3880 (*dst)->severity = item_value->severity;
3881 (*dst)->timestamp = item_value->timestamp;
3882
3883 return SUCCEED;
3884 }
3885
3886 /******************************************************************************
3887 * *
3888 * Function: hc_clone_history_data *
3889 * *
3890 * Purpose: clones item value from local cache into history cache *
3891 * *
3892 * Parameters: data - [IN/OUT] a reference to the cloned value *
3893 * item_value - [IN] the item value *
3894 * *
3895 * Return value: SUCCESS - the item value was cloned successfully *
3896 * FAIL - not enough memory *
3897 * *
3898 * Comments: This function can be called in loop with the same data value *
3899 * until it finishes cloning item value. *
3900 * *
3901 ******************************************************************************/
hc_clone_history_data(zbx_hc_data_t ** data,const dc_item_value_t * item_value)3902 static int hc_clone_history_data(zbx_hc_data_t **data, const dc_item_value_t *item_value)
3903 {
3904 if (NULL == *data)
3905 {
3906 if (NULL == (*data = (zbx_hc_data_t *)__hc_mem_malloc_func(NULL, sizeof(zbx_hc_data_t))))
3907 return FAIL;
3908
3909 memset(*data, 0, sizeof(zbx_hc_data_t));
3910
3911 (*data)->state = item_value->state;
3912 (*data)->ts = item_value->ts;
3913 (*data)->flags = item_value->flags;
3914 }
3915
3916 if (0 != (ZBX_DC_FLAG_META & item_value->flags))
3917 {
3918 (*data)->lastlogsize = item_value->lastlogsize;
3919 (*data)->mtime = item_value->mtime;
3920 }
3921
3922 if (ITEM_STATE_NOTSUPPORTED == item_value->state)
3923 {
3924 if (NULL == ((*data)->value.str = hc_mem_value_str_dup(&item_value->value.value_str)))
3925 return FAIL;
3926
3927 (*data)->value_type = item_value->value_type;
3928 cache->stats.notsupported_counter++;
3929
3930 return SUCCEED;
3931 }
3932
3933 if (0 != (ZBX_DC_FLAG_LLD & item_value->flags))
3934 {
3935 if (NULL == ((*data)->value.str = hc_mem_value_str_dup(&item_value->value.value_str)))
3936 return FAIL;
3937
3938 (*data)->value_type = ITEM_VALUE_TYPE_TEXT;
3939
3940 cache->stats.history_text_counter++;
3941 cache->stats.history_counter++;
3942
3943 return SUCCEED;
3944 }
3945
3946 if (0 == (ZBX_DC_FLAG_NOVALUE & item_value->flags))
3947 {
3948 switch (item_value->value_type)
3949 {
3950 case ITEM_VALUE_TYPE_FLOAT:
3951 (*data)->value.dbl = item_value->value.value_dbl;
3952 break;
3953 case ITEM_VALUE_TYPE_UINT64:
3954 (*data)->value.ui64 = item_value->value.value_uint;
3955 break;
3956 case ITEM_VALUE_TYPE_STR:
3957 if (SUCCEED != hc_clone_history_str_data(&(*data)->value.str,
3958 &item_value->value.value_str))
3959 {
3960 return FAIL;
3961 }
3962 break;
3963 case ITEM_VALUE_TYPE_TEXT:
3964 if (SUCCEED != hc_clone_history_str_data(&(*data)->value.str,
3965 &item_value->value.value_str))
3966 {
3967 return FAIL;
3968 }
3969 break;
3970 case ITEM_VALUE_TYPE_LOG:
3971 if (SUCCEED != hc_clone_history_log_data(&(*data)->value.log, item_value))
3972 return FAIL;
3973 break;
3974 }
3975
3976 switch (item_value->item_value_type)
3977 {
3978 case ITEM_VALUE_TYPE_FLOAT:
3979 cache->stats.history_float_counter++;
3980 break;
3981 case ITEM_VALUE_TYPE_UINT64:
3982 cache->stats.history_uint_counter++;
3983 break;
3984 case ITEM_VALUE_TYPE_STR:
3985 cache->stats.history_str_counter++;
3986 break;
3987 case ITEM_VALUE_TYPE_TEXT:
3988 cache->stats.history_text_counter++;
3989 break;
3990 case ITEM_VALUE_TYPE_LOG:
3991 cache->stats.history_log_counter++;
3992 break;
3993 }
3994
3995 cache->stats.history_counter++;
3996 }
3997
3998 (*data)->value_type = item_value->value_type;
3999
4000 return SUCCEED;
4001 }
4002
4003 /******************************************************************************
4004 * *
4005 * Function: hc_add_item_values *
4006 * *
4007 * Purpose: adds item values to the history cache *
4008 * *
4009 * Parameters: values - [IN] the item values to add *
4010 * values_num - [IN] the number of item values to add *
4011 * *
4012 * Comments: If the history cache is full this function will wait until *
4013 * history syncers processes values freeing enough space to store *
4014 * the new value. *
4015 * *
4016 ******************************************************************************/
hc_add_item_values(dc_item_value_t * values,int values_num)4017 static void hc_add_item_values(dc_item_value_t *values, int values_num)
4018 {
4019 dc_item_value_t *item_value;
4020 int i;
4021 zbx_hc_item_t *item;
4022
4023 for (i = 0; i < values_num; i++)
4024 {
4025 zbx_hc_data_t *data = NULL;
4026
4027 item_value = &values[i];
4028
4029 while (SUCCEED != hc_clone_history_data(&data, item_value))
4030 {
4031 UNLOCK_CACHE;
4032
4033 zabbix_log(LOG_LEVEL_DEBUG, "History cache is full. Sleeping for 1 second.");
4034 sleep(1);
4035
4036 LOCK_CACHE;
4037 }
4038
4039 if (NULL == (item = hc_get_item(item_value->itemid)))
4040 {
4041 item = hc_add_item(item_value->itemid, data);
4042 hc_queue_item(item);
4043 }
4044 else
4045 {
4046 item->head->next = data;
4047 item->head = data;
4048 }
4049 }
4050 }
4051
4052 /******************************************************************************
4053 * *
4054 * Function: hc_copy_history_data *
4055 * *
4056 * Purpose: copies item value from history cache into the specified history *
4057 * value *
4058 * *
4059 * Parameters: history - [OUT] the history value *
4060 * itemid - [IN] the item identifier *
4061 * data - [IN] the history data to copy *
4062 * *
4063 * Comments: handling of uninitialized fields in dc_add_proxy_history_log() *
4064 * *
4065 ******************************************************************************/
hc_copy_history_data(ZBX_DC_HISTORY * history,zbx_uint64_t itemid,zbx_hc_data_t * data)4066 static void hc_copy_history_data(ZBX_DC_HISTORY *history, zbx_uint64_t itemid, zbx_hc_data_t *data)
4067 {
4068 history->itemid = itemid;
4069 history->ts = data->ts;
4070 history->state = data->state;
4071 history->flags = data->flags;
4072 history->lastlogsize = data->lastlogsize;
4073 history->mtime = data->mtime;
4074
4075 if (ITEM_STATE_NOTSUPPORTED == data->state)
4076 {
4077 history->value.err = zbx_strdup(NULL, data->value.str);
4078 history->flags |= ZBX_DC_FLAG_UNDEF;
4079 return;
4080 }
4081
4082 history->value_type = data->value_type;
4083
4084 if (0 == (ZBX_DC_FLAG_NOVALUE & data->flags))
4085 {
4086 switch (data->value_type)
4087 {
4088 case ITEM_VALUE_TYPE_FLOAT:
4089 history->value.dbl = data->value.dbl;
4090 break;
4091 case ITEM_VALUE_TYPE_UINT64:
4092 history->value.ui64 = data->value.ui64;
4093 break;
4094 case ITEM_VALUE_TYPE_STR:
4095 case ITEM_VALUE_TYPE_TEXT:
4096 history->value.str = zbx_strdup(NULL, data->value.str);
4097 break;
4098 case ITEM_VALUE_TYPE_LOG:
4099 history->value.log = (zbx_log_value_t *)zbx_malloc(NULL, sizeof(zbx_log_value_t));
4100 history->value.log->value = zbx_strdup(NULL, data->value.log->value);
4101
4102 if (NULL != data->value.log->source)
4103 history->value.log->source = zbx_strdup(NULL, data->value.log->source);
4104 else
4105 history->value.log->source = NULL;
4106
4107 history->value.log->timestamp = data->value.log->timestamp;
4108 history->value.log->severity = data->value.log->severity;
4109 history->value.log->logeventid = data->value.log->logeventid;
4110
4111 break;
4112 }
4113 }
4114 }
4115
4116 /******************************************************************************
4117 * *
4118 * Function: hc_pop_items *
4119 * *
4120 * Purpose: pops the next batch of history items from cache for processing *
4121 * *
4122 * Parameters: history_items - [OUT] the locked history items *
4123 * *
4124 * Comments: The history_items must be returned back to history cache with *
4125 * hc_push_items() function after they have been processed. *
4126 * *
4127 ******************************************************************************/
hc_pop_items(zbx_vector_ptr_t * history_items)4128 static void hc_pop_items(zbx_vector_ptr_t *history_items)
4129 {
4130 zbx_binary_heap_elem_t *elem;
4131 zbx_hc_item_t *item;
4132
4133 while (ZBX_HC_SYNC_MAX > history_items->values_num && FAIL == zbx_binary_heap_empty(&cache->history_queue))
4134 {
4135 elem = zbx_binary_heap_find_min(&cache->history_queue);
4136 item = (zbx_hc_item_t *)elem->data;
4137 zbx_vector_ptr_append(history_items, item);
4138
4139 zbx_binary_heap_remove_min(&cache->history_queue);
4140 }
4141 }
4142
4143 /******************************************************************************
4144 * *
4145 * Function: hc_get_item_values *
4146 * *
4147 * Purpose: gets item history values *
4148 * *
4149 * Parameters: history - [OUT] the history valeus *
4150 * history_items - [IN] the history items *
4151 * *
4152 ******************************************************************************/
hc_get_item_values(ZBX_DC_HISTORY * history,zbx_vector_ptr_t * history_items)4153 static void hc_get_item_values(ZBX_DC_HISTORY *history, zbx_vector_ptr_t *history_items)
4154 {
4155 int i, history_num = 0;
4156 zbx_hc_item_t *item;
4157
4158 /* we don't need to lock history cache because no other processes can */
4159 /* change item's history data until it is pushed back to history queue */
4160 for (i = 0; i < history_items->values_num; i++)
4161 {
4162 item = (zbx_hc_item_t *)history_items->values[i];
4163
4164 if (ZBX_HC_ITEM_STATUS_BUSY == item->status)
4165 continue;
4166
4167 hc_copy_history_data(&history[history_num++], item->itemid, item->tail);
4168 }
4169 }
4170
4171 /******************************************************************************
4172 * *
4173 * Function: hc_push_processed_items *
4174 * *
4175 * Purpose: push back the processed history items into history cache *
4176 * *
4177 * Parameters: history_items - [IN] the history items containing processed *
4178 * (available) and busy items *
4179 * *
4180 * Comments: This function removes processed value from history cache. *
4181 * If there is no more data for this item, then the item itself is *
4182 * removed from history index. *
4183 * *
4184 ******************************************************************************/
hc_push_items(zbx_vector_ptr_t * history_items)4185 void hc_push_items(zbx_vector_ptr_t *history_items)
4186 {
4187 int i;
4188 zbx_hc_item_t *item;
4189 zbx_hc_data_t *data_free;
4190
4191 for (i = 0; i < history_items->values_num; i++)
4192 {
4193 item = (zbx_hc_item_t *)history_items->values[i];
4194
4195 switch (item->status)
4196 {
4197 case ZBX_HC_ITEM_STATUS_BUSY:
4198 /* reset item status before returning it to queue */
4199 item->status = ZBX_HC_ITEM_STATUS_NORMAL;
4200 hc_queue_item(item);
4201 break;
4202 case ZBX_HC_ITEM_STATUS_NORMAL:
4203 data_free = item->tail;
4204 item->tail = item->tail->next;
4205 hc_free_data(data_free);
4206 if (NULL == item->tail)
4207 zbx_hashset_remove(&cache->history_items, item);
4208 else
4209 hc_queue_item(item);
4210 break;
4211 }
4212 }
4213 }
4214
4215 /******************************************************************************
4216 * *
4217 * Function: hc_queue_get_size *
4218 * *
4219 * Purpose: retrieve the size of history queue *
4220 * *
4221 ******************************************************************************/
hc_queue_get_size(void)4222 int hc_queue_get_size(void)
4223 {
4224 return cache->history_queue.elems_num;
4225 }
4226
4227 /******************************************************************************
4228 * *
4229 * Function: init_trend_cache *
4230 * *
4231 * Purpose: Allocate shared memory for trend cache (part of database cache) *
4232 * *
4233 * Author: Vladimir Levijev *
4234 * *
4235 * Comments: Is optionally called from init_database_cache() *
4236 * *
4237 ******************************************************************************/
4238
ZBX_MEM_FUNC_IMPL(__trend,trend_mem)4239 ZBX_MEM_FUNC_IMPL(__trend, trend_mem)
4240
4241 static int init_trend_cache(char **error)
4242 {
4243 const char *__function_name = "init_trend_cache";
4244 size_t sz;
4245 int ret;
4246
4247 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
4248
4249 if (SUCCEED != (ret = zbx_mutex_create(&trends_lock, ZBX_MUTEX_TRENDS, error)))
4250 goto out;
4251
4252 sz = zbx_mem_required_size(1, "trend cache", "TrendCacheSize");
4253 if (SUCCEED != (ret = zbx_mem_create(&trend_mem, CONFIG_TRENDS_CACHE_SIZE, "trend cache", "TrendCacheSize", 0,
4254 error)))
4255 {
4256 goto out;
4257 }
4258
4259 CONFIG_TRENDS_CACHE_SIZE -= sz;
4260
4261 cache->trends_num = 0;
4262 cache->trends_last_cleanup_hour = 0;
4263
4264 #define INIT_HASHSET_SIZE 100 /* Should be calculated dynamically based on trends size? */
4265 /* Still does not make sense to have it more than initial */
4266 /* item hashset size in configuration cache. */
4267
4268 zbx_hashset_create_ext(&cache->trends, INIT_HASHSET_SIZE,
4269 ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL,
4270 __trend_mem_malloc_func, __trend_mem_realloc_func, __trend_mem_free_func);
4271
4272 #undef INIT_HASHSET_SIZE
4273 out:
4274 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
4275
4276 return ret;
4277 }
4278
4279 /******************************************************************************
4280 * *
4281 * Function: init_database_cache *
4282 * *
4283 * Purpose: Allocate shared memory for database cache *
4284 * *
4285 * Author: Alexei Vladishev, Alexander Vladishev *
4286 * *
4287 ******************************************************************************/
init_database_cache(char ** error)4288 int init_database_cache(char **error)
4289 {
4290 const char *__function_name = "init_database_cache";
4291
4292 int ret;
4293
4294 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
4295
4296 if (SUCCEED != (ret = zbx_mutex_create(&cache_lock, ZBX_MUTEX_CACHE, error)))
4297 goto out;
4298
4299 if (SUCCEED != (ret = zbx_mutex_create(&cache_ids_lock, ZBX_MUTEX_CACHE_IDS, error)))
4300 goto out;
4301
4302 if (SUCCEED != (ret = zbx_mem_create(&hc_mem, CONFIG_HISTORY_CACHE_SIZE, "history cache",
4303 "HistoryCacheSize", 1, error)))
4304 {
4305 goto out;
4306 }
4307
4308 if (SUCCEED != (ret = zbx_mem_create(&hc_index_mem, CONFIG_HISTORY_INDEX_CACHE_SIZE, "history index cache",
4309 "HistoryIndexCacheSize", 0, error)))
4310 {
4311 goto out;
4312 }
4313
4314 cache = (ZBX_DC_CACHE *)__hc_index_mem_malloc_func(NULL, sizeof(ZBX_DC_CACHE));
4315 memset(cache, 0, sizeof(ZBX_DC_CACHE));
4316
4317 ids = (ZBX_DC_IDS *)__hc_index_mem_malloc_func(NULL, sizeof(ZBX_DC_IDS));
4318 memset(ids, 0, sizeof(ZBX_DC_IDS));
4319
4320 zbx_hashset_create_ext(&cache->history_items, ZBX_HC_ITEMS_INIT_SIZE,
4321 ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL,
4322 __hc_index_mem_malloc_func, __hc_index_mem_realloc_func, __hc_index_mem_free_func);
4323
4324 zbx_binary_heap_create_ext(&cache->history_queue, hc_queue_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY,
4325 __hc_index_mem_malloc_func, __hc_index_mem_realloc_func, __hc_index_mem_free_func);
4326
4327 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
4328 {
4329 if (SUCCEED != (ret = init_trend_cache(error)))
4330 goto out;
4331 }
4332
4333 cache->history_num_total = 0;
4334 cache->history_progress_ts = 0;
4335
4336 if (NULL == sql)
4337 sql = (char *)zbx_malloc(sql, sql_alloc);
4338 out:
4339 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
4340
4341 return ret;
4342 }
4343
4344 /******************************************************************************
4345 * *
4346 * Function: DCsync_all *
4347 * *
4348 * Purpose: writes updates and new data from pool and cache data to database *
4349 * *
4350 * Author: Alexei Vladishev *
4351 * *
4352 ******************************************************************************/
DCsync_all(void)4353 static void DCsync_all(void)
4354 {
4355 zabbix_log(LOG_LEVEL_DEBUG, "In DCsync_all()");
4356
4357 sync_history_cache_full();
4358 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
4359 DCsync_trends();
4360
4361 zabbix_log(LOG_LEVEL_DEBUG, "End of DCsync_all()");
4362 }
4363
4364 /******************************************************************************
4365 * *
4366 * Function: free_database_cache *
4367 * *
4368 * Purpose: Free memory allocated for database cache *
4369 * *
4370 * Author: Alexei Vladishev, Alexander Vladishev *
4371 * *
4372 ******************************************************************************/
free_database_cache(void)4373 void free_database_cache(void)
4374 {
4375 const char *__function_name = "free_database_cache";
4376
4377 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
4378
4379 DCsync_all();
4380
4381 cache = NULL;
4382
4383 zbx_mutex_destroy(&cache_lock);
4384 zbx_mutex_destroy(&cache_ids_lock);
4385
4386 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
4387 zbx_mutex_destroy(&trends_lock);
4388
4389 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
4390 }
4391
4392 /******************************************************************************
4393 * *
4394 * Function: DCget_nextid *
4395 * *
4396 * Purpose: Return next id for requested table *
4397 * *
4398 * Author: Alexander Vladishev *
4399 * *
4400 ******************************************************************************/
DCget_nextid(const char * table_name,int num)4401 zbx_uint64_t DCget_nextid(const char *table_name, int num)
4402 {
4403 const char *__function_name = "DCget_nextid";
4404 int i;
4405 DB_RESULT result;
4406 DB_ROW row;
4407 const ZBX_TABLE *table;
4408 ZBX_DC_ID *id;
4409 zbx_uint64_t min = 0, max = ZBX_DB_MAX_ID, nextid, lastid;
4410
4411 zabbix_log(LOG_LEVEL_DEBUG, "In %s() table:'%s' num:%d",
4412 __function_name, table_name, num);
4413
4414 LOCK_CACHE_IDS;
4415
4416 for (i = 0; i < ZBX_IDS_SIZE; i++)
4417 {
4418 id = &ids->id[i];
4419 if ('\0' == *id->table_name)
4420 break;
4421
4422 if (0 == strcmp(id->table_name, table_name))
4423 {
4424 nextid = id->lastid + 1;
4425 id->lastid += num;
4426 lastid = id->lastid;
4427
4428 UNLOCK_CACHE_IDS;
4429
4430 zabbix_log(LOG_LEVEL_DEBUG, "End of %s() table:'%s' [" ZBX_FS_UI64 ":" ZBX_FS_UI64 "]",
4431 __function_name, table_name, nextid, lastid);
4432
4433 return nextid;
4434 }
4435 }
4436
4437 if (i == ZBX_IDS_SIZE)
4438 {
4439 zabbix_log(LOG_LEVEL_ERR, "insufficient shared memory for ids");
4440 exit(EXIT_FAILURE);
4441 }
4442
4443 table = DBget_table(table_name);
4444
4445 result = DBselect("select max(%s) from %s where %s between " ZBX_FS_UI64 " and " ZBX_FS_UI64,
4446 table->recid, table_name, table->recid, min, max);
4447
4448 if (NULL != result)
4449 {
4450 zbx_strlcpy(id->table_name, table_name, sizeof(id->table_name));
4451
4452 if (NULL == (row = DBfetch(result)) || SUCCEED == DBis_null(row[0]))
4453 id->lastid = min;
4454 else
4455 ZBX_STR2UINT64(id->lastid, row[0]);
4456
4457 nextid = id->lastid + 1;
4458 id->lastid += num;
4459 lastid = id->lastid;
4460 }
4461 else
4462 nextid = lastid = 0;
4463
4464 UNLOCK_CACHE_IDS;
4465
4466 DBfree_result(result);
4467
4468 zabbix_log(LOG_LEVEL_DEBUG, "End of %s() table:'%s' [" ZBX_FS_UI64 ":" ZBX_FS_UI64 "]",
4469 __function_name, table_name, nextid, lastid);
4470
4471 return nextid;
4472 }
4473
4474 /******************************************************************************
4475 * *
4476 * Function: DCupdate_hosts_availability *
4477 * *
4478 * Purpose: performs host availability reset for hosts with availability set *
4479 * on interfaces without enabled items *
4480 * *
4481 ******************************************************************************/
DCupdate_hosts_availability(void)4482 void DCupdate_hosts_availability(void)
4483 {
4484 const char *__function_name = "DCupdate_hosts_availability";
4485 zbx_vector_ptr_t hosts;
4486 char *sql_buf = NULL;
4487 size_t sql_buf_alloc = 0, sql_buf_offset = 0;
4488 int i;
4489
4490 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __function_name);
4491
4492 zbx_vector_ptr_create(&hosts);
4493
4494 if (SUCCEED != DCreset_hosts_availability(&hosts))
4495 goto out;
4496
4497 DBbegin();
4498 DBbegin_multiple_update(&sql_buf, &sql_buf_alloc, &sql_buf_offset);
4499
4500 for (i = 0; i < hosts.values_num; i++)
4501 {
4502 if (SUCCEED != zbx_sql_add_host_availability(&sql_buf, &sql_buf_alloc, &sql_buf_offset,
4503 (zbx_host_availability_t *)hosts.values[i]))
4504 {
4505 continue;
4506 }
4507
4508 zbx_strcpy_alloc(&sql_buf, &sql_buf_alloc, &sql_buf_offset, ";\n");
4509 DBexecute_overflowed_sql(&sql_buf, &sql_buf_alloc, &sql_buf_offset);
4510 }
4511
4512 DBend_multiple_update(&sql_buf, &sql_buf_alloc, &sql_buf_offset);
4513
4514 if (16 < sql_buf_offset)
4515 DBexecute("%s", sql_buf);
4516
4517 DBcommit();
4518
4519 zbx_free(sql_buf);
4520 out:
4521 zbx_vector_ptr_clear_ext(&hosts, (zbx_mem_free_func_t)zbx_host_availability_free);
4522 zbx_vector_ptr_destroy(&hosts);
4523
4524 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __function_name);
4525 }
4526