1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 **/
19
20 #include "common.h"
21 #include "log.h"
22 #include "threads.h"
23
24 #include "db.h"
25 #include "dbcache.h"
26 #include "ipc.h"
27 #include "mutexs.h"
28 #include "zbxserver.h"
29 #include "proxy.h"
30 #include "events.h"
31 #include "memalloc.h"
32 #include "zbxalgo.h"
33 #include "valuecache.h"
34 #include "zbxmodules.h"
35 #include "module.h"
36 #include "export.h"
37 #include "zbxjson.h"
38 #include "zbxhistory.h"
39 #include "zbxalgo.h"
40
/* memory descriptors whose free_size/total_size/orig_size fields are reported by DCget_stats_all() */
/* and DCget_stats(): history index memory, history value memory and trend memory respectively */
41 static zbx_mem_info_t *hc_index_mem = NULL;
42 static zbx_mem_info_t *hc_mem = NULL;
43 static zbx_mem_info_t *trend_mem = NULL;
44
/* shorthands for locking/unlocking the three mutexes declared right below */
45 #define LOCK_CACHE zbx_mutex_lock(cache_lock)
46 #define UNLOCK_CACHE zbx_mutex_unlock(cache_lock)
47 #define LOCK_TRENDS zbx_mutex_lock(trends_lock)
48 #define UNLOCK_TRENDS zbx_mutex_unlock(trends_lock)
49 #define LOCK_CACHE_IDS zbx_mutex_lock(cache_ids_lock)
50 #define UNLOCK_CACHE_IDS zbx_mutex_unlock(cache_ids_lock)
51
/* cache_lock is taken by DCget_stats_all() and DCget_stats(); */
/* trends_lock is taken by DCupdate_trends() and DCmass_update_trends(); */
/* users of cache_ids_lock are not visible in this part of the file */
52 static zbx_mutex_t cache_lock = ZBX_MUTEX_NULL;
53 static zbx_mutex_t trends_lock = ZBX_MUTEX_NULL;
54 static zbx_mutex_t cache_ids_lock = ZBX_MUTEX_NULL;
55
/* SQL statement buffer shared by the functions of this file; it is passed by address, together */
/* with sql_alloc, to zbx_snprintf_alloc() and DBadd_condition_alloc() */
56 static char *sql = NULL;
57 static size_t sql_alloc = 4 * ZBX_KIBIBYTE;
58
/* settings defined outside of this file; program_type is tested against ZBX_PROGRAM_TYPE_SERVER */
/* in DCget_stats_all() */
59 extern unsigned char program_type;
60 extern int CONFIG_DOUBLE_PRECISION;
61 extern char *CONFIG_EXPORT_DIR;
62
/* number of entries in the ZBX_DC_IDS.id[] array */
63 #define ZBX_IDS_SIZE 9
64
/* NOTE(review): presumably the initial size of the history items hashset - usage is not visible here */
65 #define ZBX_HC_ITEMS_INIT_SIZE 1000
66
/* 55/60 of an hour in seconds; DCmass_update_trends() runs its trend cleanup only when more than */
/* this many seconds of the current hour have passed */
67 #define ZBX_TRENDS_CLEANUP_TIME ((SEC_PER_HOUR * 55) / 60)
68
69 /* the maximum time spent synchronizing history */
70 #define ZBX_HC_SYNC_TIME_MAX 10
71
72 /* the maximum number of items in one synchronization batch */
/* (DBflush_trends() also uses ZBX_HC_SYNC_MAX to cap the number of itemids checked in one pass) */
73 #define ZBX_HC_SYNC_MAX 1000
74 #define ZBX_HC_TIMER_MAX (ZBX_HC_SYNC_MAX / 2)
75
76 /* the minimum processed item percentage of item candidates to continue synchronizing */
77 #define ZBX_HC_SYNC_MIN_PCNT 10
78
79 /* the maximum number of characters for history cache values */
80 #define ZBX_HISTORY_VALUE_LEN (1024 * 64)
81
/* flag masks: a value having any flag of a mask set is skipped for the corresponding purpose */
/* (DCmass_update_trends() skips values matching ZBX_DC_FLAGS_NOT_FOR_TRENDS) */
82 #define ZBX_DC_FLAGS_NOT_FOR_HISTORY (ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_NOHISTORY)
83 #define ZBX_DC_FLAGS_NOT_FOR_TRENDS (ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_NOTRENDS)
84 #define ZBX_DC_FLAGS_NOT_FOR_MODULES (ZBX_DC_FLAGS_NOT_FOR_HISTORY | ZBX_DC_FLAG_LLD)
85 #define ZBX_DC_FLAGS_NOT_FOR_EXPORT (ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF)
86
/* NOTE(review): presumably the values of zbx_hc_proxyqueue_t.state - confirm against the users */
87 #define ZBX_HC_PROXYQUEUE_STATE_NORMAL 0
88 #define ZBX_HC_PROXYQUEUE_STATE_WAIT 1
89
/* a table name paired with an id value */
/* NOTE(review): lastid is presumably the last id handed out for that table - users are not visible here */
90 typedef struct
91 {
92 char table_name[ZBX_TABLENAME_LEN_MAX];
93 zbx_uint64_t lastid;
94 }
95 ZBX_DC_ID;
96
/* fixed-size array of ZBX_IDS_SIZE table/id entries */
97 typedef struct
98 {
99 ZBX_DC_ID id[ZBX_IDS_SIZE];
100 }
101 ZBX_DC_IDS;
102
/* the id cache instance; not initialized in this part of the file */
103 static ZBX_DC_IDS *ids = NULL;
104
/* proxy queue kept inside ZBX_DC_CACHE: a list with a hashset index and a state field */
/* NOTE(review): state presumably holds one of ZBX_HC_PROXYQUEUE_STATE_* - users are not visible here */
105 typedef struct
106 {
107 zbx_list_t list;
108 zbx_hashset_t index;
109 int state;
110 }
111 zbx_hc_proxyqueue_t;
112
/* the write cache root structure, accessed through the global 'cache' pointer */
113 typedef struct
114 {
/* ZBX_DC_TREND entries looked up by itemid (see DCget_trend()) */
115 zbx_hashset_t trends;
/* counters returned by DCget_stats() and copied out by DCget_stats_all() */
116 ZBX_DC_STATS stats;
117
/* NOTE(review): history item index and processing queue - their users are not visible in this chunk */
118 zbx_hashset_t history_items;
119 zbx_binary_heap_t history_queue;
120
121 int history_num;
122 int trends_num;
/* start-of-hour timestamp stored by DCmass_update_trends() after its last trend cleanup */
123 int trends_last_cleanup_hour;
124 int history_num_total;
125 int history_progress_ts;
126
127 zbx_hc_proxyqueue_t proxyqueue;
128 }
129 ZBX_DC_CACHE;
130
/* the cache instance; not initialized in this part of the file */
131 static ZBX_DC_CACHE *cache = NULL;
132
133 /* local history cache */
/* NOTE(review): sizing constants for the local buffers declared below (item_values, string_values); */
/* how they are applied is not visible in this chunk */
134 #define ZBX_MAX_VALUES_LOCAL 256
135 #define ZBX_STRUCT_REALLOC_STEP 8
136 #define ZBX_STRING_REALLOC_STEP ZBX_KIBIBYTE
137
/* string value reference: a position and a length */
/* NOTE(review): pvalue is presumably an offset into string_values - confirm against the users */
138 typedef struct
139 {
140 size_t pvalue;
141 size_t len;
142 }
143 dc_value_str_t;
144
/* a value in one of three representations: double, unsigned 64 bit integer or string reference */
145 typedef struct
146 {
147 double value_dbl;
148 zbx_uint64_t value_uint;
149 dc_value_str_t value_str;
150 }
151 dc_value_t;
152
/* one locally buffered item value with its timestamp, log attributes, value types, state and flags */
153 typedef struct
154 {
155 zbx_uint64_t itemid;
156 dc_value_t value;
157 zbx_timespec_t ts;
158 dc_value_str_t source; /* for log items only */
159 zbx_uint64_t lastlogsize;
160 int timestamp; /* for log items only */
161 int severity; /* for log items only */
162 int logeventid; /* for log items only */
163 int mtime;
164 unsigned char item_value_type;
165 unsigned char value_type;
166 unsigned char state;
167 unsigned char flags; /* see ZBX_DC_FLAG_* above */
168 }
169 dc_item_value_t;
170
/* local buffers: a character buffer with its allocated size and used offset, and an array of */
/* dc_item_value_t with its allocated size and element count */
171 static char *string_values = NULL;
172 static size_t string_values_alloc = 0, string_values_offset = 0;
173 static dc_item_value_t *item_values = NULL;
174 static size_t item_values_alloc = 0, item_values_num = 0;
175
/* forward declarations of the history cache (hc_*) helpers; their definitions are not part of */
/* this chunk, so their exact behaviour must be checked at the definitions */
176 static void hc_add_item_values(dc_item_value_t *values, int values_num);
177 static void hc_pop_items(zbx_vector_ptr_t *history_items);
178 static void hc_get_item_values(ZBX_DC_HISTORY *history, zbx_vector_ptr_t *history_items);
179 static void hc_push_items(zbx_vector_ptr_t *history_items);
180 static void hc_free_item_values(ZBX_DC_HISTORY *history, int history_num);
181 static void hc_queue_item(zbx_hc_item_t *item);
182 static int hc_queue_elem_compare_func(const void *d1, const void *d2);
183 static int hc_queue_get_size(void);
184 static int hc_get_history_compression_age(void);
185
186 /******************************************************************************
187 * *
188 * Function: DCget_stats_all *
189 * *
190 * Purpose: retrieves all internal metrics of the database cache *
191 * *
192 * Parameters: wcache_info - [OUT] write cache metrics *
193 * *
194 ******************************************************************************/
/* Copies the value counters and the memory usage of the history index, history */
/* and trend memory into wcache_info while holding the cache lock.              */
void	DCget_stats_all(zbx_wcache_info_t *wcache_info)
{
	LOCK_CACHE;

	/* history index and history value memory */
	wcache_info->index_total = hc_index_mem->total_size;
	wcache_info->index_free = hc_index_mem->free_size;
	wcache_info->history_total = hc_mem->total_size;
	wcache_info->history_free = hc_mem->free_size;

	/* trend memory is reported only when the server bit is set in program_type, */
	/* otherwise the trend_* fields of wcache_info are left untouched            */
	if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
	{
		wcache_info->trend_total = trend_mem->orig_size;
		wcache_info->trend_free = trend_mem->free_size;
	}

	/* value counters */
	wcache_info->stats = cache->stats;

	UNLOCK_CACHE;
}
213
214 /******************************************************************************
215 * *
216 * Function: DCget_stats *
217 * *
218 * Purpose: get statistics of the database cache *
219 * *
220 * Author: Alexander Vladishev *
221 * *
222 ******************************************************************************/
DCget_stats(int request)223 void *DCget_stats(int request)
224 {
225 static zbx_uint64_t value_uint;
226 static double value_double;
227 void *ret;
228
229 LOCK_CACHE;
230
231 switch (request)
232 {
233 case ZBX_STATS_HISTORY_COUNTER:
234 value_uint = cache->stats.history_counter;
235 ret = (void *)&value_uint;
236 break;
237 case ZBX_STATS_HISTORY_FLOAT_COUNTER:
238 value_uint = cache->stats.history_float_counter;
239 ret = (void *)&value_uint;
240 break;
241 case ZBX_STATS_HISTORY_UINT_COUNTER:
242 value_uint = cache->stats.history_uint_counter;
243 ret = (void *)&value_uint;
244 break;
245 case ZBX_STATS_HISTORY_STR_COUNTER:
246 value_uint = cache->stats.history_str_counter;
247 ret = (void *)&value_uint;
248 break;
249 case ZBX_STATS_HISTORY_LOG_COUNTER:
250 value_uint = cache->stats.history_log_counter;
251 ret = (void *)&value_uint;
252 break;
253 case ZBX_STATS_HISTORY_TEXT_COUNTER:
254 value_uint = cache->stats.history_text_counter;
255 ret = (void *)&value_uint;
256 break;
257 case ZBX_STATS_NOTSUPPORTED_COUNTER:
258 value_uint = cache->stats.notsupported_counter;
259 ret = (void *)&value_uint;
260 break;
261 case ZBX_STATS_HISTORY_TOTAL:
262 value_uint = hc_mem->total_size;
263 ret = (void *)&value_uint;
264 break;
265 case ZBX_STATS_HISTORY_USED:
266 value_uint = hc_mem->total_size - hc_mem->free_size;
267 ret = (void *)&value_uint;
268 break;
269 case ZBX_STATS_HISTORY_FREE:
270 value_uint = hc_mem->free_size;
271 ret = (void *)&value_uint;
272 break;
273 case ZBX_STATS_HISTORY_PUSED:
274 value_double = 100 * (double)(hc_mem->total_size - hc_mem->free_size) / hc_mem->total_size;
275 ret = (void *)&value_double;
276 break;
277 case ZBX_STATS_HISTORY_PFREE:
278 value_double = 100 * (double)hc_mem->free_size / hc_mem->total_size;
279 ret = (void *)&value_double;
280 break;
281 case ZBX_STATS_TREND_TOTAL:
282 value_uint = trend_mem->orig_size;
283 ret = (void *)&value_uint;
284 break;
285 case ZBX_STATS_TREND_USED:
286 value_uint = trend_mem->orig_size - trend_mem->free_size;
287 ret = (void *)&value_uint;
288 break;
289 case ZBX_STATS_TREND_FREE:
290 value_uint = trend_mem->free_size;
291 ret = (void *)&value_uint;
292 break;
293 case ZBX_STATS_TREND_PUSED:
294 value_double = 100 * (double)(trend_mem->orig_size - trend_mem->free_size) /
295 trend_mem->orig_size;
296 ret = (void *)&value_double;
297 break;
298 case ZBX_STATS_TREND_PFREE:
299 value_double = 100 * (double)trend_mem->free_size / trend_mem->orig_size;
300 ret = (void *)&value_double;
301 break;
302 case ZBX_STATS_HISTORY_INDEX_TOTAL:
303 value_uint = hc_index_mem->total_size;
304 ret = (void *)&value_uint;
305 break;
306 case ZBX_STATS_HISTORY_INDEX_USED:
307 value_uint = hc_index_mem->total_size - hc_index_mem->free_size;
308 ret = (void *)&value_uint;
309 break;
310 case ZBX_STATS_HISTORY_INDEX_FREE:
311 value_uint = hc_index_mem->free_size;
312 ret = (void *)&value_uint;
313 break;
314 case ZBX_STATS_HISTORY_INDEX_PUSED:
315 value_double = 100 * (double)(hc_index_mem->total_size - hc_index_mem->free_size) /
316 hc_index_mem->total_size;
317 ret = (void *)&value_double;
318 break;
319 case ZBX_STATS_HISTORY_INDEX_PFREE:
320 value_double = 100 * (double)hc_index_mem->free_size / hc_index_mem->total_size;
321 ret = (void *)&value_double;
322 break;
323 default:
324 ret = NULL;
325 }
326
327 UNLOCK_CACHE;
328
329 return ret;
330 }
331
332 /******************************************************************************
333 * *
334 * Function: DCget_trend *
335 * *
336 * Purpose: find existing or add new structure and return pointer *
337 * *
338 * Return value: pointer to a trend structure *
339 * *
340 * Author: Alexander Vladishev *
341 * *
342 ******************************************************************************/
DCget_trend(zbx_uint64_t itemid)343 static ZBX_DC_TREND *DCget_trend(zbx_uint64_t itemid)
344 {
345 ZBX_DC_TREND *ptr, trend;
346
347 if (NULL != (ptr = (ZBX_DC_TREND *)zbx_hashset_search(&cache->trends, &itemid)))
348 return ptr;
349
350 memset(&trend, 0, sizeof(ZBX_DC_TREND));
351 trend.itemid = itemid;
352
353 return (ZBX_DC_TREND *)zbx_hashset_insert(&cache->trends, &trend, sizeof(ZBX_DC_TREND));
354 }
355
356 /******************************************************************************
357 * *
358 * Function: DCupdate_trends *
359 * *
360 * Purpose: apply disable_from changes to cache *
361 * *
362 ******************************************************************************/
/* For every (itemid, disable_from) pair in trends_diff the cached trend of    */
/* that item, if any, gets its disable_from replaced. Pairs of items without   */
/* a cached trend are ignored. Runs under the trends lock.                     */
static void	DCupdate_trends(zbx_vector_uint64_pair_t *trends_diff)
{
	int	idx = 0;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	LOCK_TRENDS;

	while (idx < trends_diff->values_num)
	{
		zbx_uint64_pair_t	*pair = &trends_diff->values[idx++];
		ZBX_DC_TREND		*cached;

		cached = (ZBX_DC_TREND *)zbx_hashset_search(&cache->trends, &pair->first);

		if (NULL == cached)
			continue;

		cached->disable_from = pair->second;
	}

	UNLOCK_TRENDS;

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
383
384 /******************************************************************************
385 * *
386 * Function: dc_insert_trends_in_db *
387 * *
388 * Purpose: helper function for DBflush_trends() *
389 * *
390 ******************************************************************************/
/* Inserts into table_name every still pending trend (itemid != 0) that has    */
/* the given clock and value type, and marks each inserted trend as done by    */
/* zeroing its itemid.                                                         */
/*                                                                             */
/* For the non-float case the stored average is the 128 bit value sum divided  */
/* by the number of values; only the low 64 bits of the quotient are written.  */
static void	dc_insert_trends_in_db(ZBX_DC_TREND *trends, int trends_num, unsigned char value_type,
		const char *table_name, int clock)
{
	zbx_db_insert_t	db_insert;
	int		idx;

	zbx_db_insert_prepare(&db_insert, table_name, "itemid", "clock", "num", "value_min", "value_avg",
			"value_max", NULL);

	for (idx = 0; idx < trends_num; idx++)
	{
		ZBX_DC_TREND	*t = &trends[idx];

		/* skip already processed entries and entries of other groups */
		if (0 == t->itemid || clock != t->clock || value_type != t->value_type)
			continue;

		if (ITEM_VALUE_TYPE_FLOAT != value_type)
		{
			zbx_uint128_t	quotient;

			/* calculate the trend average value */
			udiv128_64(&quotient, &t->value_avg.ui64, t->num);

			zbx_db_insert_add_values(&db_insert, t->itemid, t->clock, t->num,
					t->value_min.ui64, quotient.lo, t->value_max.ui64);
		}
		else
		{
			zbx_db_insert_add_values(&db_insert, t->itemid, t->clock, t->num,
					t->value_min.dbl, t->value_avg.dbl, t->value_max.dbl);
		}

		/* mark as flushed */
		t->itemid = 0;
	}

	zbx_db_insert_execute(&db_insert);
	zbx_db_insert_clean(&db_insert);
}
433
434 /******************************************************************************
435 * *
436 * Function: dc_remove_updated_trends *
437 * *
438 * Purpose: Update trends disable_from for items without trends data past or *
439 * equal the specified clock *
440 * *
441 * Comments: A helper function for DBflush_trends() *
442 * *
443 ******************************************************************************/
/* Finds the items that have no rows with clock >= 'clock' in table_name and   */
/* sets disable_from = clock on their matching entries in 'trends'.            */
/*                                                                             */
/* The itemids array is consumed: ids having such rows are removed by the      */
/* queries, the remaining ones are popped while the trends are updated, so     */
/* *itemids_num is 0 on return.                                                */
static void	dc_remove_updated_trends(ZBX_DC_TREND *trends, int trends_num, const char *table_name,
		int value_type, zbx_uint64_t *itemids, int *itemids_num, int clock)
{
	/* period lengths; the INT_MAX sentinel stops the conversion loop below */
	int		bounds[] = {SEC_PER_DAY, SEC_PER_WEEK, SEC_PER_MONTH, SEC_PER_YEAR, INT_MAX};
	int		bounds_num = 0, step, idx;
	const int	now = (int)time(NULL);
	const int	age = now - clock;
	zbx_uint64_t	itemid;

	/* turn every period shorter than the trend age into an absolute timestamp */
	/* and put the requested clock right after them as the last, oldest bound  */
	while (age > bounds[bounds_num])
	{
		bounds[bounds_num] = now - bounds[bounds_num];
		bounds_num++;
	}
	bounds[bounds_num] = clock;

	/* remove itemids with trends data past or equal the clock, querying one */
	/* time window per step, from the most recent one to the oldest one      */
	for (step = 0; step <= bounds_num; step++)
	{
		size_t		sql_offset = 0;
		DB_RESULT	result;
		DB_ROW		row;

		if (0 >= *itemids_num)
			break;

		zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
				"select distinct itemid"
				" from %s"
				" where clock>=%d and",
				table_name, bounds[step]);

		/* every window except the first is limited from above by the previous bound */
		if (0 < step)
			zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " clock<%d and", bounds[step - 1]);

		DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids, *itemids_num);

		result = DBselect("%s", sql);

		for (row = DBfetch(result); NULL != row; row = DBfetch(result))
		{
			ZBX_STR2UINT64(itemid, row[0]);
			uint64_array_remove(itemids, itemids_num, &itemid, 1);
		}

		DBfree_result(result);
	}

	/* update trends disable_from for the leftover itemids */
	while (0 != *itemids_num)
	{
		itemid = itemids[--*itemids_num];

		for (idx = 0; idx < trends_num; idx++)
		{
			ZBX_DC_TREND	*t = &trends[idx];

			if (itemid == t->itemid && clock == t->clock && value_type == t->value_type)
			{
				t->disable_from = clock;
				break;
			}
		}
	}
}
506
507 /******************************************************************************
508 * *
509 * Function: dc_trends_update_float *
510 * *
511 * Purpose: helper function for dc_trends_fetch_and_update() *
512 * *
513 ******************************************************************************/
/* Merges a float trend row read from the database (row[2..4] hold min, avg    */
/* and max; 'num' is the row's value count) into the cached trend and appends  */
/* the matching "update trends" statement to the shared sql buffer.            */
static void	dc_trends_update_float(ZBX_DC_TREND *trend, DB_ROW row, int num, size_t *sql_offset)
{
	history_value_t	db_min, db_avg, db_max;
	const int	total = trend->num + num;

	db_min.dbl = atof(row[2]);
	db_avg.dbl = atof(row[3]);
	db_max.dbl = atof(row[4]);

	/* widen the cached range with the stored one */
	if (trend->value_min.dbl > db_min.dbl)
		trend->value_min.dbl = db_min.dbl;

	if (trend->value_max.dbl < db_max.dbl)
		trend->value_max.dbl = db_max.dbl;

	/* average of both parts weighted by their value counts; each part is */
	/* divided by the total before being multiplied by its own count      */
	trend->value_avg.dbl = trend->value_avg.dbl / total * trend->num +
			db_avg.dbl / total * num;
	trend->num = total;

	zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, "update trends set"
			" num=%d,value_min=" ZBX_FS_DBL64_SQL ",value_avg=" ZBX_FS_DBL64_SQL
			",value_max=" ZBX_FS_DBL64_SQL
			" where itemid=" ZBX_FS_UI64 " and clock=%d;\n",
			trend->num, trend->value_min.dbl, trend->value_avg.dbl, trend->value_max.dbl,
			trend->itemid, trend->clock);
}
539
540 /******************************************************************************
541 * *
542 * Function: dc_trends_update_uint *
543 * *
544 * Purpose: helper function for dc_trends_fetch_and_update() *
545 * *
546 ******************************************************************************/
/* Merges an unsigned trend row read from the database (row[2..4] hold min,    */
/* avg and max; 'num' is the row's value count) into the cached trend and      */
/* appends the matching "update trends_uint" statement to the shared sql       */
/* buffer.                                                                     */
/*                                                                             */
/* The cached value_avg.ui64 is a 128 bit sum: the stored average multiplied   */
/* by its count is added to it, and the written average is that sum divided    */
/* by the combined count (low 64 bits).                                        */
static void	dc_trends_update_uint(ZBX_DC_TREND *trend, DB_ROW row, int num, size_t *sql_offset)
{
	history_value_t	db_min, db_avg, db_max;
	zbx_uint128_t	db_sum, average;

	ZBX_STR2UINT64(db_min.ui64, row[2]);
	ZBX_STR2UINT64(db_avg.ui64, row[3]);
	ZBX_STR2UINT64(db_max.ui64, row[4]);

	/* widen the cached range with the stored one */
	if (trend->value_min.ui64 > db_min.ui64)
		trend->value_min.ui64 = db_min.ui64;

	if (trend->value_max.ui64 < db_max.ui64)
		trend->value_max.ui64 = db_max.ui64;

	/* calculate the trend average value */
	umul64_64(&db_sum, num, db_avg.ui64);
	uinc128_128(&trend->value_avg.ui64, &db_sum);
	udiv128_64(&average, &trend->value_avg.ui64, trend->num + num);

	trend->num += num;

	zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset,
			"update trends_uint set num=%d,value_min=" ZBX_FS_UI64 ",value_avg="
			ZBX_FS_UI64 ",value_max=" ZBX_FS_UI64 " where itemid=" ZBX_FS_UI64
			" and clock=%d;\n",
			trend->num, trend->value_min.ui64, average.lo, trend->value_max.ui64,
			trend->itemid, trend->clock);
}
579
580 /******************************************************************************
581 * *
582 * Function: dc_trends_fetch_and_update *
583 * *
584 * Purpose: helper function for DBflush_trends() *
585 * *
586 ******************************************************************************/
/* Reads the rows that table_name already holds for the given clock and        */
/* itemids, merges each of them with the matching cached trend and writes the  */
/* merged values back with update statements.                                  */
/*                                                                             */
/* Every trend handled this way gets its itemid zeroed and decrements          */
/* *inserts_num, so that it is not inserted again later.                       */
static void	dc_trends_fetch_and_update(ZBX_DC_TREND *trends, int trends_num, zbx_uint64_t *itemids,
		int itemids_num, int *inserts_num, unsigned char value_type,
		const char *table_name, int clock)
{
	size_t		sql_offset = 0;
	DB_RESULT	result;
	DB_ROW		row;

	zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
			"select itemid,num,value_min,value_avg,value_max"
			" from %s"
			" where clock=%d and",
			table_name, clock);

	DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids, itemids_num);

	result = DBselect("%s order by itemid,clock", sql);

	/* the select has been issued, the shared buffer is reused for the updates */
	sql_offset = 0;
	DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);

	for (row = DBfetch(result); NULL != row; row = DBfetch(result))
	{
		ZBX_DC_TREND	*match = NULL;
		zbx_uint64_t	itemid;
		int		idx, db_num;

		ZBX_STR2UINT64(itemid, row[0]);

		/* first pending trend of this item within the flushed group */
		for (idx = 0; idx < trends_num && NULL == match; idx++)
		{
			if (itemid == trends[idx].itemid && clock == trends[idx].clock &&
					value_type == trends[idx].value_type)
			{
				match = &trends[idx];
			}
		}

		if (NULL == match)
		{
			THIS_SHOULD_NEVER_HAPPEN;
			continue;
		}

		db_num = atoi(row[1]);

		if (ITEM_VALUE_TYPE_FLOAT != value_type)
			dc_trends_update_uint(match, row, db_num, &sql_offset);
		else
			dc_trends_update_float(match, row, db_num, &sql_offset);

		/* handled by update, must not be inserted */
		match->itemid = 0;
		--*inserts_num;

		DBexecute_overflowed_sql(&sql, &sql_alloc, &sql_offset);
	}

	DBfree_result(result);

	DBend_multiple_update(&sql, &sql_alloc, &sql_offset);

	if (sql_offset > 16)	/* In ORACLE always present begin..end; */
		DBexecute("%s", sql);
}
657
658 /******************************************************************************
659 * *
660 * Function: DBflush_trends *
661 * *
662 * Purpose: flush trend to the database *
663 * *
664 * Author: Alexander Vladishev *
665 * *
666 ******************************************************************************/
/*
 * Flushes one group of trends to the database and removes the flushed entries
 * from the buffer.
 *
 * The group consists of the entries sharing the clock and value type of
 * trends[0]; the caller must pass at least one entry. Entries of the group
 * that already have a row in the database are merged into it with an update,
 * the others are inserted.
 *
 * Parameters: trends      - [IN/OUT] trend buffer; on return the entries that
 *                           were not flushed are compacted to its beginning
 *             trends_num  - [IN/OUT] number of entries in the buffer
 *             trends_diff - [OUT] when not NULL, receives an
 *                           (itemid, clock + SEC_PER_HOUR) pair for every
 *                           inserted trend whose disable_from is set and not
 *                           later than the flushed clock
 */
static void	DBflush_trends(ZBX_DC_TREND *trends, int *trends_num, zbx_vector_uint64_pair_t *trends_diff)
{
	int		num, i, clock, inserts_num = 0, itemids_alloc, itemids_num = 0, trends_to = *trends_num;
	unsigned char	value_type;
	zbx_uint64_t	*itemids = NULL;
	ZBX_DC_TREND	*trend = NULL;
	const char	*table_name;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() trends_num:%d", __func__, *trends_num);

	/* the first entry selects the group that is flushed by this call */
	clock = trends[0].clock;
	value_type = trends[0].value_type;

	switch (value_type)
	{
		case ITEM_VALUE_TYPE_FLOAT:
			table_name = "trends";
			break;
		case ITEM_VALUE_TYPE_UINT64:
			table_name = "trends_uint";
			break;
		default:
			assert(0);
	}

	itemids_alloc = MIN(ZBX_HC_SYNC_MAX, *trends_num);
	itemids = (zbx_uint64_t *)zbx_malloc(itemids, itemids_alloc * sizeof(zbx_uint64_t));

	/* collect the items whose disable_from is not known yet; at most ZBX_HC_SYNC_MAX */
	/* of them are handled per call, the rest of the buffer is left for the next call */
	for (i = 0; i < *trends_num; i++)
	{
		trend = &trends[i];

		if (clock != trend->clock || value_type != trend->value_type)
			continue;

		inserts_num++;

		if (0 != trend->disable_from)
			continue;

		uint64_array_add(&itemids, &itemids_alloc, &itemids_num, trend->itemid, 64);

		if (ZBX_HC_SYNC_MAX == itemids_num)
		{
			trends_to = i + 1;
			break;
		}
	}

	/* sets disable_from on the collected items that have no recent rows in the */
	/* database; the itemids array is emptied by the call                       */
	if (0 != itemids_num)
	{
		dc_remove_updated_trends(trends, trends_to, table_name, value_type, itemids,
				&itemids_num, clock);
	}

	/* collect the items that may already have a row for this clock */
	for (i = 0; i < trends_to; i++)
	{
		trend = &trends[i];

		if (clock != trend->clock || value_type != trend->value_type)
			continue;

		if (0 != trend->disable_from && clock >= trend->disable_from)
			continue;

		uint64_array_add(&itemids, &itemids_alloc, &itemids_num, trend->itemid, 64);
	}

	/* merge with the existing rows; merged entries get itemid 0 and reduce inserts_num */
	if (0 != itemids_num)
	{
		dc_trends_fetch_and_update(trends, trends_to, itemids, itemids_num,
				&inserts_num, value_type, table_name, clock);
	}

	zbx_free(itemids);

	/* if 'trends' is not a primary trends buffer */
	if (NULL != trends_diff)
	{
		/* we update it too */
		for (i = 0; i < trends_to; i++)
		{
			zbx_uint64_pair_t	pair;

			if (0 == trends[i].itemid)
				continue;

			if (clock != trends[i].clock || value_type != trends[i].value_type)
				continue;

			if (0 == trends[i].disable_from || trends[i].disable_from > clock)
				continue;

			pair.first = trends[i].itemid;
			pair.second = clock + SEC_PER_HOUR;
			zbx_vector_uint64_pair_append(trends_diff, pair);
		}
	}

	/* whatever is left of the group has no row yet and is inserted */
	if (0 != inserts_num)
		dc_insert_trends_in_db(trends, trends_to, value_type, table_name, clock);

	/* clean trends: drop the flushed entries (itemid 0) and compact the rest */
	for (i = 0, num = 0; i < *trends_num; i++)
	{
		if (0 == trends[i].itemid)
			continue;

		/* an entry that is already in place must not be copied onto itself: */
		/* memcpy() between overlapping objects is undefined behaviour       */
		if (num != i)
			memcpy(&trends[num], &trends[i], sizeof(ZBX_DC_TREND));

		num++;
	}
	*trends_num = num;

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
781
782 /******************************************************************************
783 * *
784 * Function: DCflush_trend *
785 * *
786 * Purpose: move trend to the array of trends for flushing to DB *
787 * *
788 * Author: Alexander Vladishev *
789 * *
790 ******************************************************************************/
/* Appends a copy of 'trend' to the *trends array, growing the array in steps  */
/* of 256 entries, and then resets the clock, value count and min/avg/max      */
/* values of the source trend. Its itemid, value type and disable_from are     */
/* kept.                                                                       */
static void	DCflush_trend(ZBX_DC_TREND *trend, ZBX_DC_TREND **trends, int *trends_alloc, int *trends_num)
{
	ZBX_DC_TREND	*slot;

	if (*trends_alloc == *trends_num)
	{
		*trends_alloc += 256;
		*trends = (ZBX_DC_TREND *)zbx_realloc(*trends, *trends_alloc * sizeof(ZBX_DC_TREND));
	}

	/* the slot is taken after the possible reallocation */
	slot = &(*trends)[*trends_num];
	memcpy(slot, trend, sizeof(ZBX_DC_TREND));
	++*trends_num;

	/* start a new accumulation period for the source trend */
	trend->num = 0;
	trend->clock = 0;
	memset(&trend->value_max, 0, sizeof(history_value_t));
	memset(&trend->value_avg, 0, sizeof(value_avg_t));
	memset(&trend->value_min, 0, sizeof(history_value_t));
}
808
809 /******************************************************************************
810 * *
811 * Function: DCadd_trend *
812 * *
813 * Purpose: add new value to the trends *
814 * *
815 * Author: Alexander Vladishev *
816 * *
817 ******************************************************************************/
/* Adds one history value to the cached trend of its item.                     */
/*                                                                             */
/* When the cached trend already holds values of another hour or another       */
/* value type, it is first moved to the *trends flush array (only if           */
/* zbx_history_requires_trends() succeeds for its value type). The value is    */
/* then accumulated into min/max/avg; values that are neither float nor        */
/* unsigned only increase the value count.                                     */
static void	DCadd_trend(const ZBX_DC_HISTORY *history, ZBX_DC_TREND **trends, int *trends_alloc, int *trends_num)
{
	const int	hour = history->ts.sec - history->ts.sec % SEC_PER_HOUR;	/* start of the value's hour */
	ZBX_DC_TREND	*trend = DCget_trend(history->itemid);
	int		is_first;

	if (0 < trend->num && (hour != trend->clock || history->value_type != trend->value_type))
	{
		if (SUCCEED == zbx_history_requires_trends(trend->value_type))
			DCflush_trend(trend, trends, trends_alloc, trends_num);
	}

	trend->value_type = history->value_type;
	trend->clock = hour;

	/* evaluated after the possible flush, which resets the value count */
	is_first = (0 == trend->num);

	if (ITEM_VALUE_TYPE_FLOAT == trend->value_type)
	{
		if (is_first || history->value.dbl < trend->value_min.dbl)
			trend->value_min.dbl = history->value.dbl;

		if (is_first || history->value.dbl > trend->value_max.dbl)
			trend->value_max.dbl = history->value.dbl;

		/* incremental mean update */
		trend->value_avg.dbl += history->value.dbl / (trend->num + 1) -
				trend->value_avg.dbl / (trend->num + 1);
	}
	else if (ITEM_VALUE_TYPE_UINT64 == trend->value_type)
	{
		if (is_first || history->value.ui64 < trend->value_min.ui64)
			trend->value_min.ui64 = history->value.ui64;

		if (is_first || history->value.ui64 > trend->value_max.ui64)
			trend->value_max.ui64 = history->value.ui64;

		/* the unsigned average is kept as a 128 bit sum until it is written */
		uinc128_64(&trend->value_avg.ui64, history->value.ui64);
	}

	trend->num++;
}
856
857 /******************************************************************************
858 * *
859 * Function: DCmass_update_trends *
860 * *
861 * Purpose: update trends cache and get list of trends to flush into database *
862 * *
863 * Parameters: history - [IN] array of history data *
864 * history_num - [IN] number of history structures *
865 * trends - [OUT] list of trends to flush into database *
866 * trends_num - [OUT] number of trends *
867 * compression_age - [IN] history compression age *
868 * *
869 * Author: Alexander Vladishev *
870 * *
871 ******************************************************************************/
/* Feeds the history values into the trend cache and collects the trends to be */
/* flushed in *trends / *trends_num, all under the trends lock.                */
/*                                                                             */
/* Once per hour, after more than ZBX_TRENDS_CLEANUP_TIME seconds of the hour  */
/* have passed, every cached trend that does not belong to the current hour is */
/* removed from the cache: it is either queued for flushing or, when           */
/* compression_age is set and the trend is older than it, discarded.           */
static void	DCmass_update_trends(const ZBX_DC_HISTORY *history, int history_num, ZBX_DC_TREND **trends,
		int *trends_num, int compression_age)
{
	static int	last_trend_discard = 0;	/* time of the last "discarding trends" log record */
	zbx_timespec_t	ts;
	int		trends_alloc = 0, idx, hour, seconds;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_timespec(&ts);
	seconds = ts.sec % SEC_PER_HOUR;
	hour = ts.sec - seconds;

	LOCK_TRENDS;

	for (idx = 0; idx < history_num; idx++)
	{
		if (0 == (ZBX_DC_FLAGS_NOT_FOR_TRENDS & history[idx].flags))
			DCadd_trend(&history[idx], trends, &trends_alloc, trends_num);
	}

	if (cache->trends_last_cleanup_hour < hour && ZBX_TRENDS_CLEANUP_TIME < seconds)
	{
		zbx_hashset_iter_t	iter;
		ZBX_DC_TREND		*trend;

		zbx_hashset_iter_reset(&cache->trends, &iter);

		while (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_iter_next(&iter)))
		{
			/* trends of the current hour stay in the cache */
			if (hour == trend->clock)
				continue;

			if (0 == compression_age || trend->clock >= compression_age)
			{
				if (SUCCEED == zbx_history_requires_trends(trend->value_type))
					DCflush_trend(trend, trends, &trends_alloc, trends_num);
			}
			else if (SEC_PER_HOUR < (ts.sec - last_trend_discard))	/* log once per hour */
			{
				/* discard trend items that are older than compression age */
				zabbix_log(LOG_LEVEL_TRACE, "discarding trends that are pointing to"
						" compressed history period");
				last_trend_discard = ts.sec;
			}

			zbx_hashset_iter_remove(&iter);
		}

		cache->trends_last_cleanup_hour = hour;
	}

	UNLOCK_TRENDS;

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
932
zbx_trend_compare(const void * d1,const void * d2)933 static int zbx_trend_compare(const void *d1, const void *d2)
934 {
935 const ZBX_DC_TREND *p1 = (const ZBX_DC_TREND *)d1;
936 const ZBX_DC_TREND *p2 = (const ZBX_DC_TREND *)d2;
937
938 ZBX_RETURN_IF_NOT_EQUAL(p1->itemid, p2->itemid);
939 ZBX_RETURN_IF_NOT_EQUAL(p1->clock, p2->clock);
940
941 return 0;
942 }
943
944 /******************************************************************************
945 * *
946 * Function: DBmass_update_trends *
947 * *
948 * Purpose: copy, sort and flush trends to the database *
949 * *
950 * Parameters: trends - [IN] trends from cache to be added to database *
951 * trends_num - [IN] number of trends to add to database *
952 * trends_diff - [OUT] disable_from updates *
953 * *
954 ******************************************************************************/
/* Flushes the given trends to the database.                                   */
/*                                                                             */
/* The input array is not modified: the work is done on a private copy sorted  */
/* by itemid and clock, which DBflush_trends() shrinks group by group until    */
/* it is empty.                                                                */
static void	DBmass_update_trends(const ZBX_DC_TREND *trends, int trends_num,
		zbx_vector_uint64_pair_t *trends_diff)
{
	ZBX_DC_TREND	*work;
	int		left = trends_num;

	if (0 == left)
		return;

	work = (ZBX_DC_TREND *)zbx_malloc(NULL, left * sizeof(ZBX_DC_TREND));
	memcpy(work, trends, left * sizeof(ZBX_DC_TREND));
	qsort(work, left, sizeof(ZBX_DC_TREND), zbx_trend_compare);

	/* every call removes the flushed entries and updates the remaining count */
	while (0 < left)
		DBflush_trends(work, &left, trends_diff);

	zbx_free(work);
}
972
/* a host identifier together with the names of the host groups of that host; */
/* entries are stored in a hashset and filled by db_get_hosts_info_by_hostid() */
973 typedef struct
974 {
975 zbx_uint64_t hostid;
/* group names, each one a separately allocated string released by zbx_host_info_clean() */
976 zbx_vector_ptr_t groups;
977 }
978 zbx_host_info_t;
979
980 /******************************************************************************
981 * *
982 * Function: zbx_host_info_clean *
983 * *
984 * Purpose: frees resources allocated to store host groups names *
985 * *
986 * Parameters: host_info - [IN] host information *
987 * *
988 ******************************************************************************/
/* Releases the group names stored in host_info->groups and destroys the       */
/* vector. The host_info structure itself is not freed.                        */
static void	zbx_host_info_clean(zbx_host_info_t *host_info)
{
	zbx_vector_ptr_t	*names = &host_info->groups;

	/* free every stored name, then the vector storage */
	zbx_vector_ptr_clear_ext(names, zbx_ptr_free);
	zbx_vector_ptr_destroy(names);
}
994
995 /******************************************************************************
996 * *
997 * Function: db_get_hosts_info_by_hostid *
998 * *
999 * Purpose: get hosts groups names *
1000 * *
1001 * Parameters: hosts_info - [IN/OUT] output names of host groups for a host *
1002 * hostids - [IN] hosts identifiers *
1003 * *
1004 ******************************************************************************/
db_get_hosts_info_by_hostid(zbx_hashset_t * hosts_info,const zbx_vector_uint64_t * hostids)1005 static void db_get_hosts_info_by_hostid(zbx_hashset_t *hosts_info, const zbx_vector_uint64_t *hostids)
1006 {
1007 int i;
1008 size_t sql_offset = 0;
1009 DB_RESULT result;
1010 DB_ROW row;
1011
1012 for (i = 0; i < hostids->values_num; i++)
1013 {
1014 zbx_host_info_t host_info = {.hostid = hostids->values[i]};
1015
1016 zbx_vector_ptr_create(&host_info.groups);
1017 zbx_hashset_insert(hosts_info, &host_info, sizeof(host_info));
1018 }
1019
1020 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
1021 "select distinct hg.hostid,g.name"
1022 " from hstgrp g,hosts_groups hg"
1023 " where g.groupid=hg.groupid"
1024 " and");
1025
1026 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "hg.hostid", hostids->values, hostids->values_num);
1027
1028 result = DBselect("%s", sql);
1029
1030 while (NULL != (row = DBfetch(result)))
1031 {
1032 zbx_uint64_t hostid;
1033 zbx_host_info_t *host_info;
1034
1035 ZBX_DBROW2UINT64(hostid, row[0]);
1036
1037 if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &hostid)))
1038 {
1039 THIS_SHOULD_NEVER_HAPPEN;
1040 continue;
1041 }
1042
1043 zbx_vector_ptr_append(&host_info->groups, zbx_strdup(NULL, row[1]));
1044 }
1045 DBfree_result(result);
1046 }
1047
1048 typedef struct
1049 {
1050 zbx_uint64_t itemid;
1051 char *name;
1052 DC_ITEM *item;
1053 zbx_vector_ptr_t applications;
1054 }
1055 zbx_item_info_t;
1056
1057 /******************************************************************************
1058 * *
1059 * Function: db_get_items_info_by_itemid *
1060 * *
1061 * Purpose: get items name and applications *
1062 * *
1063 * Parameters: items_info - [IN/OUT] output item name and applications *
1064 * itemids - [IN] the item identifiers *
1065 * *
1066 ******************************************************************************/
db_get_items_info_by_itemid(zbx_hashset_t * items_info,const zbx_vector_uint64_t * itemids)1067 static void db_get_items_info_by_itemid(zbx_hashset_t *items_info, const zbx_vector_uint64_t *itemids)
1068 {
1069 size_t sql_offset = 0;
1070 DB_RESULT result;
1071 DB_ROW row;
1072
1073 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select itemid,name from items where");
1074 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids->values, itemids->values_num);
1075
1076 result = DBselect("%s", sql);
1077
1078 while (NULL != (row = DBfetch(result)))
1079 {
1080 zbx_uint64_t itemid;
1081 zbx_item_info_t *item_info;
1082
1083 ZBX_DBROW2UINT64(itemid, row[0]);
1084
1085 if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &itemid)))
1086 {
1087 THIS_SHOULD_NEVER_HAPPEN;
1088 continue;
1089 }
1090
1091 zbx_substitute_item_name_macros(item_info->item, row[1], &item_info->name);
1092 }
1093 DBfree_result(result);
1094
1095 sql_offset = 0;
1096 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
1097 "select i.itemid,a.name"
1098 " from applications a,items_applications i"
1099 " where a.applicationid=i.applicationid"
1100 " and");
1101
1102 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "i.itemid", itemids->values, itemids->values_num);
1103
1104 result = DBselect("%s", sql);
1105
1106 while (NULL != (row = DBfetch(result)))
1107 {
1108 zbx_uint64_t itemid;
1109 zbx_item_info_t *item_info;
1110
1111 ZBX_DBROW2UINT64(itemid, row[0]);
1112
1113 if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &itemid)))
1114 {
1115 THIS_SHOULD_NEVER_HAPPEN;
1116 continue;
1117 }
1118
1119 zbx_vector_ptr_append(&item_info->applications, zbx_strdup(NULL, row[1]));
1120 }
1121 DBfree_result(result);
1122 }
1123
1124 /******************************************************************************
1125 * *
1126 * Function: zbx_item_info_clean *
1127 * *
1128 * Purpose: frees resources allocated to store item applications and name *
1129 * *
1130 * Parameters: item_info - [IN] item information *
1131 * *
1132 ******************************************************************************/
zbx_item_info_clean(zbx_item_info_t * item_info)1133 static void zbx_item_info_clean(zbx_item_info_t *item_info)
1134 {
1135 zbx_vector_ptr_clear_ext(&item_info->applications, zbx_ptr_free);
1136 zbx_vector_ptr_destroy(&item_info->applications);
1137 zbx_free(item_info->name);
1138 }
1139
1140 /******************************************************************************
1141 * *
1142 * Function: DCexport_trends *
1143 * *
1144 * Purpose: export trends *
1145 * *
1146 * Parameters: trends - [IN] trends from cache *
1147 * trends_num - [IN] number of trends *
1148 * hosts_info - [IN] hosts groups names *
1149 * items_info - [IN] item names and applications *
1150 * *
1151 ******************************************************************************/
DCexport_trends(const ZBX_DC_TREND * trends,int trends_num,zbx_hashset_t * hosts_info,zbx_hashset_t * items_info)1152 static void DCexport_trends(const ZBX_DC_TREND *trends, int trends_num, zbx_hashset_t *hosts_info,
1153 zbx_hashset_t *items_info)
1154 {
1155 struct zbx_json json;
1156 const ZBX_DC_TREND *trend = NULL;
1157 int i, j;
1158 const DC_ITEM *item;
1159 zbx_host_info_t *host_info;
1160 zbx_item_info_t *item_info;
1161 zbx_uint128_t avg; /* calculate the trend average value */
1162
1163 zbx_json_init(&json, ZBX_JSON_STAT_BUF_LEN);
1164
1165 for (i = 0; i < trends_num; i++)
1166 {
1167 trend = &trends[i];
1168
1169 if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &trend->itemid)))
1170 continue;
1171
1172 item = item_info->item;
1173
1174 if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &item->host.hostid)))
1175 {
1176 THIS_SHOULD_NEVER_HAPPEN;
1177 continue;
1178 }
1179
1180 zbx_json_clean(&json);
1181
1182 zbx_json_addobject(&json,ZBX_PROTO_TAG_HOST);
1183 zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST, item->host.host, ZBX_JSON_TYPE_STRING);
1184 zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item->host.name, ZBX_JSON_TYPE_STRING);
1185 zbx_json_close(&json);
1186
1187 zbx_json_addarray(&json, ZBX_PROTO_TAG_GROUPS);
1188
1189 for (j = 0; j < host_info->groups.values_num; j++)
1190 zbx_json_addstring(&json, NULL, host_info->groups.values[j], ZBX_JSON_TYPE_STRING);
1191
1192 zbx_json_close(&json);
1193
1194 zbx_json_addarray(&json, ZBX_PROTO_TAG_APPLICATIONS);
1195
1196 for (j = 0; j < item_info->applications.values_num; j++)
1197 zbx_json_addstring(&json, NULL, item_info->applications.values[j], ZBX_JSON_TYPE_STRING);
1198
1199 zbx_json_close(&json);
1200 zbx_json_adduint64(&json, ZBX_PROTO_TAG_ITEMID, item->itemid);
1201
1202 if (NULL != item_info->name)
1203 zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item_info->name, ZBX_JSON_TYPE_STRING);
1204
1205 zbx_json_addint64(&json, ZBX_PROTO_TAG_CLOCK, trend->clock);
1206 zbx_json_addint64(&json, ZBX_PROTO_TAG_COUNT, trend->num);
1207
1208 switch (trend->value_type)
1209 {
1210 case ITEM_VALUE_TYPE_FLOAT:
1211 zbx_json_addfloat(&json, ZBX_PROTO_TAG_MIN, trend->value_min.dbl);
1212 zbx_json_addfloat(&json, ZBX_PROTO_TAG_AVG, trend->value_avg.dbl);
1213 zbx_json_addfloat(&json, ZBX_PROTO_TAG_MAX, trend->value_max.dbl);
1214 break;
1215 case ITEM_VALUE_TYPE_UINT64:
1216 zbx_json_adduint64(&json, ZBX_PROTO_TAG_MIN, trend->value_min.ui64);
1217 udiv128_64(&avg, &trend->value_avg.ui64, trend->num);
1218 zbx_json_adduint64(&json, ZBX_PROTO_TAG_AVG, avg.lo);
1219 zbx_json_adduint64(&json, ZBX_PROTO_TAG_MAX, trend->value_max.ui64);
1220 break;
1221 default:
1222 THIS_SHOULD_NEVER_HAPPEN;
1223 }
1224
1225 zbx_json_adduint64(&json, ZBX_PROTO_TAG_TYPE, trend->value_type);
1226 zbx_trends_export_write(json.buffer, json.buffer_size);
1227 }
1228
1229 zbx_trends_export_flush();
1230 zbx_json_free(&json);
1231 }
1232
1233 /******************************************************************************
1234 * *
1235 * Function: DCexport_history *
1236 * *
1237 * Purpose: export history *
1238 * *
1239 * Parameters: history - [IN/OUT] array of history data *
1240 * history_num - [IN] number of history structures *
1241 * hosts_info - [IN] hosts groups names *
1242 * items_info - [IN] item names and applications *
1243 * *
1244 ******************************************************************************/
DCexport_history(const ZBX_DC_HISTORY * history,int history_num,zbx_hashset_t * hosts_info,zbx_hashset_t * items_info)1245 static void DCexport_history(const ZBX_DC_HISTORY *history, int history_num, zbx_hashset_t *hosts_info,
1246 zbx_hashset_t *items_info)
1247 {
1248 const ZBX_DC_HISTORY *h;
1249 const DC_ITEM *item;
1250 int i, j;
1251 zbx_host_info_t *host_info;
1252 zbx_item_info_t *item_info;
1253 struct zbx_json json;
1254
1255 zbx_json_init(&json, ZBX_JSON_STAT_BUF_LEN);
1256
1257 for (i = 0; i < history_num; i++)
1258 {
1259 h = &history[i];
1260
1261 if (0 != (ZBX_DC_FLAGS_NOT_FOR_MODULES & h->flags))
1262 continue;
1263
1264 if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &h->itemid)))
1265 {
1266 THIS_SHOULD_NEVER_HAPPEN;
1267 continue;
1268 }
1269
1270 item = item_info->item;
1271
1272 if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &item->host.hostid)))
1273 {
1274 THIS_SHOULD_NEVER_HAPPEN;
1275 continue;
1276 }
1277
1278 zbx_json_clean(&json);
1279
1280 zbx_json_addobject(&json,ZBX_PROTO_TAG_HOST);
1281 zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST, item->host.host, ZBX_JSON_TYPE_STRING);
1282 zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item->host.name, ZBX_JSON_TYPE_STRING);
1283 zbx_json_close(&json);
1284
1285 zbx_json_addarray(&json, ZBX_PROTO_TAG_GROUPS);
1286
1287 for (j = 0; j < host_info->groups.values_num; j++)
1288 zbx_json_addstring(&json, NULL, host_info->groups.values[j], ZBX_JSON_TYPE_STRING);
1289
1290 zbx_json_close(&json);
1291
1292 zbx_json_addarray(&json, ZBX_PROTO_TAG_APPLICATIONS);
1293
1294 for (j = 0; j < item_info->applications.values_num; j++)
1295 zbx_json_addstring(&json, NULL, item_info->applications.values[j], ZBX_JSON_TYPE_STRING);
1296
1297 zbx_json_close(&json);
1298 zbx_json_adduint64(&json, ZBX_PROTO_TAG_ITEMID, item->itemid);
1299
1300 if (NULL != item_info->name)
1301 zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item_info->name, ZBX_JSON_TYPE_STRING);
1302
1303 zbx_json_addint64(&json, ZBX_PROTO_TAG_CLOCK, h->ts.sec);
1304 zbx_json_addint64(&json, ZBX_PROTO_TAG_NS, h->ts.ns);
1305
1306 switch (h->value_type)
1307 {
1308 case ITEM_VALUE_TYPE_FLOAT:
1309 zbx_json_addfloat(&json, ZBX_PROTO_TAG_VALUE, h->value.dbl);
1310 break;
1311 case ITEM_VALUE_TYPE_UINT64:
1312 zbx_json_adduint64(&json, ZBX_PROTO_TAG_VALUE, h->value.ui64);
1313 break;
1314 case ITEM_VALUE_TYPE_STR:
1315 zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.str, ZBX_JSON_TYPE_STRING);
1316 break;
1317 case ITEM_VALUE_TYPE_TEXT:
1318 zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.str, ZBX_JSON_TYPE_STRING);
1319 break;
1320 case ITEM_VALUE_TYPE_LOG:
1321 zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGTIMESTAMP, h->value.log->timestamp);
1322 zbx_json_addstring(&json, ZBX_PROTO_TAG_LOGSOURCE,
1323 ZBX_NULL2EMPTY_STR(h->value.log->source), ZBX_JSON_TYPE_STRING);
1324 zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGSEVERITY, h->value.log->severity);
1325 zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGEVENTID, h->value.log->logeventid);
1326 zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.log->value,
1327 ZBX_JSON_TYPE_STRING);
1328 break;
1329 default:
1330 THIS_SHOULD_NEVER_HAPPEN;
1331 }
1332
1333 zbx_json_adduint64(&json, ZBX_PROTO_TAG_TYPE, h->value_type);
1334 zbx_history_export_write(json.buffer, json.buffer_size);
1335 }
1336
1337 zbx_history_export_flush();
1338 zbx_json_free(&json);
1339 }
1340
1341 /******************************************************************************
1342 * *
1343 * Function: DCexport_history_and_trends *
1344 * *
1345 * Purpose: export history and trends *
1346 * *
1347 * Parameters: history - [IN/OUT] array of history data *
1348 * history_num - [IN] number of history structures *
1349 * itemids - [IN] the item identifiers *
1350 * (used for item lookup) *
1351 * items - [IN] the items *
1352 * errcodes - [IN] item error codes *
1353 * trends - [IN] trends from cache *
1354 * trends_num - [IN] number of trends *
1355 * *
1356 ******************************************************************************/
DCexport_history_and_trends(const ZBX_DC_HISTORY * history,int history_num,const zbx_vector_uint64_t * itemids,DC_ITEM * items,const int * errcodes,const ZBX_DC_TREND * trends,int trends_num)1357 static void DCexport_history_and_trends(const ZBX_DC_HISTORY *history, int history_num,
1358 const zbx_vector_uint64_t *itemids, DC_ITEM *items, const int *errcodes, const ZBX_DC_TREND *trends,
1359 int trends_num)
1360 {
1361 int i, index;
1362 zbx_vector_uint64_t hostids, item_info_ids;
1363 zbx_hashset_t hosts_info, items_info;
1364 DC_ITEM *item;
1365 zbx_item_info_t item_info;
1366
1367 zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d trends_num:%d", __func__, history_num, trends_num);
1368
1369 zbx_vector_uint64_create(&hostids);
1370 zbx_vector_uint64_create(&item_info_ids);
1371 zbx_hashset_create_ext(&items_info, itemids->values_num, ZBX_DEFAULT_UINT64_HASH_FUNC,
1372 ZBX_DEFAULT_UINT64_COMPARE_FUNC, (zbx_clean_func_t)zbx_item_info_clean,
1373 ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC);
1374
1375 for (i = 0; i < history_num; i++)
1376 {
1377 const ZBX_DC_HISTORY *h = &history[i];
1378
1379 if (0 != (ZBX_DC_FLAGS_NOT_FOR_EXPORT & h->flags))
1380 continue;
1381
1382 if (FAIL == (index = zbx_vector_uint64_bsearch(itemids, h->itemid, ZBX_DEFAULT_UINT64_COMPARE_FUNC)))
1383 {
1384 THIS_SHOULD_NEVER_HAPPEN;
1385 continue;
1386 }
1387
1388 if (SUCCEED != errcodes[index])
1389 continue;
1390
1391 item = &items[index];
1392
1393 zbx_vector_uint64_append(&hostids, item->host.hostid);
1394 zbx_vector_uint64_append(&item_info_ids, item->itemid);
1395
1396 item_info.itemid = item->itemid;
1397 item_info.name = NULL;
1398 item_info.item = item;
1399 zbx_vector_ptr_create(&item_info.applications);
1400 zbx_hashset_insert(&items_info, &item_info, sizeof(item_info));
1401 }
1402
1403 if (0 == history_num)
1404 {
1405 for (i = 0; i < trends_num; i++)
1406 {
1407 const ZBX_DC_TREND *trend = &trends[i];
1408
1409 if (FAIL == (index = zbx_vector_uint64_bsearch(itemids, trend->itemid,
1410 ZBX_DEFAULT_UINT64_COMPARE_FUNC)))
1411 {
1412 THIS_SHOULD_NEVER_HAPPEN;
1413 continue;
1414 }
1415
1416 if (SUCCEED != errcodes[index])
1417 continue;
1418
1419 item = &items[index];
1420
1421 zbx_vector_uint64_append(&hostids, item->host.hostid);
1422 zbx_vector_uint64_append(&item_info_ids, item->itemid);
1423
1424 item_info.itemid = item->itemid;
1425 item_info.name = NULL;
1426 item_info.item = item;
1427 zbx_vector_ptr_create(&item_info.applications);
1428 zbx_hashset_insert(&items_info, &item_info, sizeof(item_info));
1429 }
1430 }
1431
1432 if (0 == item_info_ids.values_num)
1433 goto clean;
1434
1435 zbx_vector_uint64_sort(&item_info_ids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1436 zbx_vector_uint64_sort(&hostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1437 zbx_vector_uint64_uniq(&hostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1438
1439 zbx_hashset_create_ext(&hosts_info, hostids.values_num, ZBX_DEFAULT_UINT64_HASH_FUNC,
1440 ZBX_DEFAULT_UINT64_COMPARE_FUNC, (zbx_clean_func_t)zbx_host_info_clean,
1441 ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC);
1442
1443 db_get_hosts_info_by_hostid(&hosts_info, &hostids);
1444
1445 db_get_items_info_by_itemid(&items_info, &item_info_ids);
1446
1447 if (0 != history_num)
1448 DCexport_history(history, history_num, &hosts_info, &items_info);
1449
1450 if (0 != trends_num)
1451 DCexport_trends(trends, trends_num, &hosts_info, &items_info);
1452
1453 zbx_hashset_destroy(&hosts_info);
1454 clean:
1455 zbx_hashset_destroy(&items_info);
1456 zbx_vector_uint64_destroy(&item_info_ids);
1457 zbx_vector_uint64_destroy(&hostids);
1458
1459 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1460 }
1461
1462 /******************************************************************************
1463 * *
1464 * Function: DCexport_all_trends *
1465 * *
1466 * Purpose: export all trends *
1467 * *
1468 * Parameters: trends - [IN] trends from cache *
1469 * trends_num - [IN] number of trends *
1470 * *
1471 ******************************************************************************/
DCexport_all_trends(const ZBX_DC_TREND * trends,int trends_num)1472 static void DCexport_all_trends(const ZBX_DC_TREND *trends, int trends_num)
1473 {
1474 DC_ITEM *items;
1475 zbx_vector_uint64_t itemids;
1476 int *errcodes, i, num;
1477
1478 zabbix_log(LOG_LEVEL_WARNING, "exporting trend data...");
1479
1480 while (0 < trends_num)
1481 {
1482 num = MIN(ZBX_HC_SYNC_MAX, trends_num);
1483
1484 items = (DC_ITEM *)zbx_malloc(NULL, sizeof(DC_ITEM) * (size_t)num);
1485 errcodes = (int *)zbx_malloc(NULL, sizeof(int) * (size_t)num);
1486
1487 zbx_vector_uint64_create(&itemids);
1488 zbx_vector_uint64_reserve(&itemids, num);
1489
1490 for (i = 0; i < num; i++)
1491 zbx_vector_uint64_append(&itemids, trends[i].itemid);
1492
1493 zbx_vector_uint64_sort(&itemids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1494
1495 DCconfig_get_items_by_itemids(items, itemids.values, errcodes, num);
1496
1497 DCexport_history_and_trends(NULL, 0, &itemids, items, errcodes, trends, num);
1498
1499 DCconfig_clean_items(items, errcodes, num);
1500 zbx_vector_uint64_destroy(&itemids);
1501 zbx_free(items);
1502 zbx_free(errcodes);
1503
1504 trends += num;
1505 trends_num -= num;
1506 }
1507
1508 zabbix_log(LOG_LEVEL_WARNING, "exporting trend data done");
1509 }
1510
1511 /******************************************************************************
1512 * *
1513 * Function: DCsync_trends *
1514 * *
1515 * Purpose: flush all trends to the database *
1516 * *
1517 * Author: Alexander Vladishev *
1518 * *
1519 ******************************************************************************/
DCsync_trends(void)1520 static void DCsync_trends(void)
1521 {
1522 zbx_hashset_iter_t iter;
1523 ZBX_DC_TREND *trends = NULL, *trend;
1524 int trends_alloc = 0, trends_num = 0, compression_age;
1525
1526 zabbix_log(LOG_LEVEL_DEBUG, "In %s() trends_num:%d", __func__, cache->trends_num);
1527
1528 compression_age = hc_get_history_compression_age();
1529
1530 zabbix_log(LOG_LEVEL_WARNING, "syncing trend data...");
1531
1532 LOCK_TRENDS;
1533
1534 zbx_hashset_iter_reset(&cache->trends, &iter);
1535
1536 while (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_iter_next(&iter)))
1537 {
1538 if (SUCCEED == zbx_history_requires_trends(trend->value_type) && trend->clock >= compression_age)
1539 DCflush_trend(trend, &trends, &trends_alloc, &trends_num);
1540 }
1541
1542 UNLOCK_TRENDS;
1543
1544 if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_TRENDS) && 0 != trends_num)
1545 DCexport_all_trends(trends, trends_num);
1546
1547 if (0 < trends_num)
1548 qsort(trends, trends_num, sizeof(ZBX_DC_TREND), zbx_trend_compare);
1549
1550 DBbegin();
1551
1552 while (trends_num > 0)
1553 DBflush_trends(trends, &trends_num, NULL);
1554
1555 DBcommit();
1556
1557 zbx_free(trends);
1558
1559 zabbix_log(LOG_LEVEL_WARNING, "syncing trend data done");
1560
1561 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1562 }
1563
1564 /******************************************************************************
1565 * *
1566 * Function: recalculate_triggers *
1567 * *
1568 * Purpose: re-calculate and update values of triggers related to the items *
1569 * *
1570 * Parameters: history - [IN] array of history data *
1571 * history_num - [IN] number of history structures *
1572 * history_itemids - [IN] the item identifiers *
1573 * (used for item lookup) *
1574 * history_items - [IN] the items *
1575 * history_errcodes - [IN] item error codes *
1576 * timer_triggerids - [IN] the timer triggerids to process *
1577 * ts - [IN] timer trigger timestamp *
1578 * trigger_diff - [OUT] trigger updates *
1579 * *
1580 ******************************************************************************/
recalculate_triggers(const ZBX_DC_HISTORY * history,int history_num,const zbx_vector_uint64_t * history_itemids,const DC_ITEM * history_items,const int * history_errcodes,const zbx_vector_uint64_t * timer_triggerids,zbx_timespec_t * ts,zbx_vector_ptr_t * trigger_diff)1581 static void recalculate_triggers(const ZBX_DC_HISTORY *history, int history_num,
1582 const zbx_vector_uint64_t *history_itemids, const DC_ITEM *history_items, const int *history_errcodes,
1583 const zbx_vector_uint64_t *timer_triggerids, zbx_timespec_t *ts, zbx_vector_ptr_t *trigger_diff)
1584 {
1585 int i, item_num = 0;
1586 zbx_uint64_t *itemids = NULL;
1587 zbx_timespec_t *timespecs = NULL;
1588 zbx_hashset_t trigger_info;
1589 zbx_vector_ptr_t trigger_order;
1590 zbx_vector_ptr_t trigger_items;
1591
1592 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1593
1594 if (0 != history_num)
1595 {
1596 itemids = (zbx_uint64_t *)zbx_malloc(itemids, sizeof(zbx_uint64_t) * (size_t)history_num);
1597 timespecs = (zbx_timespec_t *)zbx_malloc(timespecs, sizeof(zbx_timespec_t) * (size_t)history_num);
1598
1599 for (i = 0; i < history_num; i++)
1600 {
1601 const ZBX_DC_HISTORY *h = &history[i];
1602
1603 if (0 != (ZBX_DC_FLAG_NOVALUE & h->flags))
1604 continue;
1605
1606 itemids[item_num] = h->itemid;
1607 timespecs[item_num] = h->ts;
1608 item_num++;
1609 }
1610 }
1611
1612 if (0 == item_num && 0 == timer_triggerids->values_num)
1613 goto out;
1614
1615 zbx_hashset_create(&trigger_info, MAX(100, 2 * item_num + timer_triggerids->values_num),
1616 ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1617
1618 zbx_vector_ptr_create(&trigger_order);
1619 zbx_vector_ptr_reserve(&trigger_order, trigger_info.num_slots);
1620
1621 zbx_vector_ptr_create(&trigger_items);
1622
1623 if (0 != item_num)
1624 {
1625 DCconfig_get_triggers_by_itemids(&trigger_info, &trigger_order, itemids, timespecs, item_num);
1626 zbx_determine_items_in_expressions(&trigger_order, itemids, item_num);
1627 }
1628
1629 if (0 != timer_triggerids->values_num)
1630 zbx_dc_get_timer_triggers_by_triggerids(&trigger_info, &trigger_order, timer_triggerids, ts);
1631
1632 zbx_vector_ptr_sort(&trigger_order, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);
1633 evaluate_expressions(&trigger_order, history_itemids, history_items, history_errcodes);
1634 zbx_process_triggers(&trigger_order, trigger_diff);
1635
1636 DCfree_triggers(&trigger_order);
1637
1638 zbx_vector_ptr_destroy(&trigger_items);
1639
1640 zbx_hashset_destroy(&trigger_info);
1641 zbx_vector_ptr_destroy(&trigger_order);
1642 out:
1643 zbx_free(timespecs);
1644 zbx_free(itemids);
1645
1646 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1647 }
1648
/******************************************************************************
 *
 * Function: DCinventory_value_add
 *
 * Purpose: adds a host inventory update for a history value of an item that
 *          is linked to an inventory field
 *
 * Parameters: inventory_values - [OUT] vector receiving the new
 *                                      zbx_inventory_value_t
 *             item             - [IN] the item
 *             h                - [IN] the history value
 *
 * Comments: Nothing is added for not supported values, hosts without
 *           automatic inventory mode, values flagged as undefined or without
 *           value, items without an inventory field and log values.
 *
 ******************************************************************************/
static void	DCinventory_value_add(zbx_vector_ptr_t *inventory_values, const DC_ITEM *item, ZBX_DC_HISTORY *h)
{
	char			text[MAX_BUFFER_LEN];
	const char		*field;
	zbx_inventory_value_t	*update;

	if (ITEM_STATE_NOTSUPPORTED == h->state || HOST_INVENTORY_AUTOMATIC != item->host.inventory_mode)
		return;

	if (0 != ((ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_NOVALUE) & h->flags))
		return;

	if (NULL == (field = DBget_inventory_field(item->inventory_link)))
		return;

	/* convert the value to its text form */
	if (ITEM_VALUE_TYPE_FLOAT == h->value_type)
	{
		zbx_print_double(text, sizeof(text), h->value.dbl);
	}
	else if (ITEM_VALUE_TYPE_UINT64 == h->value_type)
	{
		zbx_snprintf(text, sizeof(text), ZBX_FS_UI64, h->value.ui64);
	}
	else if (ITEM_VALUE_TYPE_STR == h->value_type || ITEM_VALUE_TYPE_TEXT == h->value_type)
	{
		strscpy(text, h->value.str);
	}
	else
		return;

	zbx_format_value(text, sizeof(text), item->valuemapid, item->units, h->value_type);

	update = (zbx_inventory_value_t *)zbx_malloc(NULL, sizeof(zbx_inventory_value_t));

	update->hostid = item->host.hostid;
	update->idx = item->inventory_link - 1;
	/* the field name is stored as returned by DBget_inventory_field(), it is not copied */
	update->field_name = field;
	update->value = zbx_strdup(NULL, text);

	zbx_vector_ptr_append(inventory_values, update);
}
1694
/******************************************************************************
 *
 * Function: DCadd_update_inventory_sql
 *
 * Purpose: appends host inventory update statements to the file level sql
 *          buffer, one statement per inventory value
 *
 * Parameters: sql_offset       - [IN/OUT] the current offset in sql buffer
 *             inventory_values - [IN] the inventory values to write
 *
 ******************************************************************************/
static void	DCadd_update_inventory_sql(size_t *sql_offset, const zbx_vector_ptr_t *inventory_values)
{
	int	n;

	for (n = 0; n < inventory_values->values_num; n++)
	{
		const zbx_inventory_value_t	*update = (zbx_inventory_value_t *)inventory_values->values[n];
		char				*escaped;

		/* the value is escaped for the target host_inventory field before it is put into the statement */
		escaped = DBdyn_escape_field("host_inventory", update->field_name, update->value);

		zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset,
				"update host_inventory set %s='%s' where hostid=" ZBX_FS_UI64 ";\n",
				update->field_name, escaped, update->hostid);

		DBexecute_overflowed_sql(&sql, &sql_alloc, sql_offset);

		zbx_free(escaped);
	}
}
1715
/******************************************************************************
 *
 * Function: DCinventory_value_free
 *
 * Purpose: frees an inventory value together with its value string
 *
 * Parameters: inventory_value - [IN] the inventory value to free
 *
 * Comments: The field name is not freed.
 *
 ******************************************************************************/
static void	DCinventory_value_free(zbx_inventory_value_t *inventory_value)
{
	/* the owned string goes first, then the structure holding it */
	zbx_free(inventory_value->value);

	zbx_free(inventory_value);
}
1721
1722 /******************************************************************************
1723 * *
1724 * Function: dc_history_clean_value *
1725 * *
1726 * Purpose: frees resources allocated to store str/text/log value *
1727 * *
1728 * Parameters: history - [IN] the history data *
1729 * history_num - [IN] the number of values in history data *
1730 * *
1731 ******************************************************************************/
dc_history_clean_value(ZBX_DC_HISTORY * history)1732 static void dc_history_clean_value(ZBX_DC_HISTORY *history)
1733 {
1734 if (ITEM_STATE_NOTSUPPORTED == history->state)
1735 {
1736 zbx_free(history->value.err);
1737 return;
1738 }
1739
1740 if (0 != (ZBX_DC_FLAG_NOVALUE & history->flags))
1741 return;
1742
1743 switch (history->value_type)
1744 {
1745 case ITEM_VALUE_TYPE_LOG:
1746 zbx_free(history->value.log->value);
1747 zbx_free(history->value.log->source);
1748 zbx_free(history->value.log);
1749 break;
1750 case ITEM_VALUE_TYPE_STR:
1751 case ITEM_VALUE_TYPE_TEXT:
1752 zbx_free(history->value.str);
1753 break;
1754 }
1755 }
1756
1757 /******************************************************************************
1758 * *
1759 * Function: hc_free_item_values *
1760 * *
1761 * Purpose: frees resources allocated to store str/text/log values *
1762 * *
1763 * Parameters: history - [IN] the history data *
1764 * history_num - [IN] the number of values in history data *
1765 * *
1766 ******************************************************************************/
hc_free_item_values(ZBX_DC_HISTORY * history,int history_num)1767 static void hc_free_item_values(ZBX_DC_HISTORY *history, int history_num)
1768 {
1769 int i;
1770
1771 for (i = 0; i < history_num; i++)
1772 dc_history_clean_value(&history[i]);
1773 }
1774
1775 /******************************************************************************
1776 * *
1777 * Function: dc_history_set_error *
1778 * *
1779 * Purpose: sets history data to notsupported *
1780 * *
1781 * Parameters: history - [IN] the history data *
1782 * errmsg - [IN] the error message *
1783 * *
1784 * Comments: The error message is stored directly and freed with when history *
1785 * data is cleaned. *
1786 * *
1787 ******************************************************************************/
dc_history_set_error(ZBX_DC_HISTORY * hdata,char * errmsg)1788 static void dc_history_set_error(ZBX_DC_HISTORY *hdata, char *errmsg)
1789 {
1790 dc_history_clean_value(hdata);
1791 hdata->value.err = errmsg;
1792 hdata->state = ITEM_STATE_NOTSUPPORTED;
1793 hdata->flags |= ZBX_DC_FLAG_UNDEF;
1794 }
1795
1796 /******************************************************************************
1797 * *
1798 * Function: dc_history_set_value *
1799 * *
1800 * Purpose: sets history data value *
1801 * *
1802 * Parameters: hdata - [IN/OUT] the history data *
1803 * value_type - [IN] the item value type *
1804 * value - [IN] the value to set *
1805 * *
1806 ******************************************************************************/
dc_history_set_value(ZBX_DC_HISTORY * hdata,unsigned char value_type,zbx_variant_t * value)1807 static void dc_history_set_value(ZBX_DC_HISTORY *hdata, unsigned char value_type, zbx_variant_t *value)
1808 {
1809 char *errmsg = NULL;
1810
1811 if (FAIL == zbx_variant_to_value_type(value, value_type, CONFIG_DOUBLE_PRECISION, &errmsg))
1812 {
1813 dc_history_set_error(hdata, errmsg);
1814 return;
1815 }
1816
1817 switch (value_type)
1818 {
1819 case ITEM_VALUE_TYPE_FLOAT:
1820 dc_history_clean_value(hdata);
1821 hdata->value.dbl = value->data.dbl;
1822 break;
1823 case ITEM_VALUE_TYPE_UINT64:
1824 dc_history_clean_value(hdata);
1825 hdata->value.ui64 = value->data.ui64;
1826 break;
1827 case ITEM_VALUE_TYPE_STR:
1828 dc_history_clean_value(hdata);
1829 hdata->value.str = value->data.str;
1830 hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_STR_VALUE_LEN)] = '\0';
1831 break;
1832 case ITEM_VALUE_TYPE_TEXT:
1833 dc_history_clean_value(hdata);
1834 hdata->value.str = value->data.str;
1835 hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_TEXT_VALUE_LEN)] = '\0';
1836 break;
1837 case ITEM_VALUE_TYPE_LOG:
1838 if (ITEM_VALUE_TYPE_LOG != hdata->value_type)
1839 {
1840 dc_history_clean_value(hdata);
1841 hdata->value.log = (zbx_log_value_t *)zbx_malloc(NULL, sizeof(zbx_log_value_t));
1842 memset(hdata->value.log, 0, sizeof(zbx_log_value_t));
1843 }
1844 hdata->value.log->value = value->data.str;
1845 hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_LOG_VALUE_LEN)] = '\0';
1846 }
1847
1848 hdata->value_type = value_type;
1849 zbx_variant_set_none(value);
1850 }
1851
1852 /******************************************************************************
1853 * *
1854 * Function: normalize_item_value *
1855 * *
1856 * Purpose: normalize item value by performing truncation of long text *
1857 * values and changes value format according to the item value type *
1858 * *
1859 * Parameters: item - [IN] the item *
1860 * hdata - [IN/OUT] the historical data to process *
1861 * *
1862 ******************************************************************************/
normalize_item_value(const DC_ITEM * item,ZBX_DC_HISTORY * hdata)1863 static void normalize_item_value(const DC_ITEM *item, ZBX_DC_HISTORY *hdata)
1864 {
1865 char *logvalue;
1866 zbx_variant_t value_var;
1867
1868 if (0 != (hdata->flags & ZBX_DC_FLAG_NOVALUE))
1869 return;
1870
1871 if (ITEM_STATE_NOTSUPPORTED == hdata->state)
1872 return;
1873
1874 if (0 == (hdata->flags & ZBX_DC_FLAG_NOHISTORY))
1875 hdata->ttl = item->history_sec;
1876
1877 if (item->value_type == hdata->value_type)
1878 {
1879 /* truncate text based values if necessary */
1880 switch (hdata->value_type)
1881 {
1882 case ITEM_VALUE_TYPE_STR:
1883 hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_STR_VALUE_LEN)] = '\0';
1884 break;
1885 case ITEM_VALUE_TYPE_TEXT:
1886 hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_TEXT_VALUE_LEN)] = '\0';
1887 break;
1888 case ITEM_VALUE_TYPE_LOG:
1889 logvalue = hdata->value.log->value;
1890 logvalue[zbx_db_strlen_n(logvalue, HISTORY_LOG_VALUE_LEN)] = '\0';
1891 break;
1892 case ITEM_VALUE_TYPE_FLOAT:
1893 if (FAIL == zbx_validate_value_dbl(hdata->value.dbl, CONFIG_DOUBLE_PRECISION))
1894 {
1895 char buffer[ZBX_MAX_DOUBLE_LEN + 1];
1896
1897 dc_history_set_error(hdata, zbx_dsprintf(NULL,
1898 "Value %s is too small or too large.",
1899 zbx_print_double(buffer, sizeof(buffer), hdata->value.dbl)));
1900 }
1901 break;
1902 }
1903 return;
1904 }
1905
1906 switch (hdata->value_type)
1907 {
1908 case ITEM_VALUE_TYPE_FLOAT:
1909 zbx_variant_set_dbl(&value_var, hdata->value.dbl);
1910 break;
1911 case ITEM_VALUE_TYPE_UINT64:
1912 zbx_variant_set_ui64(&value_var, hdata->value.ui64);
1913 break;
1914 case ITEM_VALUE_TYPE_STR:
1915 case ITEM_VALUE_TYPE_TEXT:
1916 zbx_variant_set_str(&value_var, hdata->value.str);
1917 hdata->value.str = NULL;
1918 break;
1919 case ITEM_VALUE_TYPE_LOG:
1920 zbx_variant_set_str(&value_var, hdata->value.log->value);
1921 hdata->value.log->value = NULL;
1922 break;
1923 }
1924
1925 dc_history_set_value(hdata, item->value_type, &value_var);
1926 zbx_variant_clear(&value_var);
1927 }
1928
1929 /******************************************************************************
1930 * *
1931 * Function: calculate_item_update *
1932 * *
1933 * Purpose: calculates what item fields must be updated *
1934 * *
1935 * Parameters: item - [IN] the item *
1936 * h - [IN] the historical data to process *
1937 * *
1938 * Return value: The update data. This data must be freed by the caller. *
1939 * *
1940 * Comments: Will generate internal events when item state switches. *
1941 * *
1942 ******************************************************************************/
calculate_item_update(DC_ITEM * item,const ZBX_DC_HISTORY * h)1943 static zbx_item_diff_t *calculate_item_update(DC_ITEM *item, const ZBX_DC_HISTORY *h)
1944 {
1945 zbx_uint64_t flags;
1946 const char *item_error = NULL;
1947 zbx_item_diff_t *diff;
1948
1949 flags = item->host.proxy_hostid == 0 ? ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTCLOCK : 0;
1950
1951 if (0 != (ZBX_DC_FLAG_META & h->flags))
1952 {
1953 if (item->lastlogsize != h->lastlogsize)
1954 flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE;
1955
1956 if (item->mtime != h->mtime)
1957 flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME;
1958 }
1959
1960 if (h->state != item->state)
1961 {
1962 flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE;
1963
1964 if (ITEM_STATE_NOTSUPPORTED == h->state)
1965 {
1966 zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" became not supported: %s",
1967 item->host.host, item->key_orig, h->value.str);
1968
1969 zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_ITEM, item->itemid, &h->ts, h->state, NULL,
1970 NULL, NULL, 0, 0, NULL, 0, NULL, 0, NULL, h->value.err);
1971
1972 if (0 != strcmp(item->error, h->value.err))
1973 item_error = h->value.err;
1974 }
1975 else
1976 {
1977 zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" became supported",
1978 item->host.host, item->key_orig);
1979
1980 /* we know it's EVENT_OBJECT_ITEM because LLDRULE that becomes */
1981 /* supported is handled in lld_process_discovery_rule() */
1982 zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_ITEM, item->itemid, &h->ts, h->state,
1983 NULL, NULL, NULL, 0, 0, NULL, 0, NULL, 0, NULL, NULL);
1984
1985 item_error = "";
1986 }
1987 }
1988 else if (ITEM_STATE_NOTSUPPORTED == h->state && 0 != strcmp(item->error, h->value.err))
1989 {
1990 zabbix_log(LOG_LEVEL_WARNING, "error reason for \"%s:%s\" changed: %s", item->host.host,
1991 item->key_orig, h->value.err);
1992
1993 item_error = h->value.err;
1994 }
1995
1996 if (NULL != item_error)
1997 flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_ERROR;
1998
1999 if (0 == flags)
2000 return NULL;
2001
2002 diff = (zbx_item_diff_t *)zbx_malloc(NULL, sizeof(zbx_item_diff_t));
2003 diff->itemid = item->itemid;
2004 diff->lastclock = h->ts.sec;
2005 diff->flags = flags;
2006
2007 if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE & flags))
2008 diff->lastlogsize = h->lastlogsize;
2009
2010 if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME & flags))
2011 diff->mtime = h->mtime;
2012
2013 if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE & flags))
2014 {
2015 diff->state = h->state;
2016 item->state = h->state;
2017 }
2018
2019 if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_ERROR & flags))
2020 diff->error = item_error;
2021
2022 return diff;
2023 }
2024
2025 /******************************************************************************
2026 * *
2027 * Function: DBmass_update_items *
2028 * *
2029 * Purpose: update item data and inventory in database *
2030 * *
2031 * Parameters: item_diff - item changes *
2032 * inventory_values - inventory values *
2033 * *
2034 ******************************************************************************/
DBmass_update_items(const zbx_vector_ptr_t * item_diff,const zbx_vector_ptr_t * inventory_values)2035 static void DBmass_update_items(const zbx_vector_ptr_t *item_diff, const zbx_vector_ptr_t *inventory_values)
2036 {
2037 size_t sql_offset = 0;
2038 int i;
2039
2040 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
2041
2042 for (i = 0; i < item_diff->values_num; i++)
2043 {
2044 zbx_item_diff_t *diff;
2045
2046 diff = (zbx_item_diff_t *)item_diff->values[i];
2047 if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_DB & diff->flags))
2048 break;
2049 }
2050
2051 if (i != item_diff->values_num || 0 != inventory_values->values_num)
2052 {
2053 DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
2054
2055 if (i != item_diff->values_num)
2056 {
2057 zbx_db_save_item_changes(&sql, &sql_alloc, &sql_offset, item_diff,
2058 ZBX_FLAGS_ITEM_DIFF_UPDATE_DB);
2059 }
2060
2061 if (0 != inventory_values->values_num)
2062 DCadd_update_inventory_sql(&sql_offset, inventory_values);
2063
2064 DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
2065
2066 if (sql_offset > 16) /* In ORACLE always present begin..end; */
2067 DBexecute("%s", sql);
2068
2069 DCconfig_update_inventory_values(inventory_values);
2070 }
2071
2072 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
2073 }
2074
2075 /******************************************************************************
2076 * *
2077 * Function: DCmass_proxy_update_items *
2078 * *
2079 * Purpose: update items info after new value is received *
2080 * *
2081 * Parameters: history - array of history data *
2082 * history_num - number of history structures *
2083 * *
2084 * Author: Alexei Vladishev, Eugene Grigorjev, Alexander Vladishev *
2085 * *
2086 ******************************************************************************/
DCmass_proxy_update_items(ZBX_DC_HISTORY * history,int history_num)2087 static void DCmass_proxy_update_items(ZBX_DC_HISTORY *history, int history_num)
2088 {
2089 int i;
2090 zbx_vector_ptr_t item_diff;
2091 zbx_item_diff_t *diffs;
2092
2093 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
2094
2095 zbx_vector_ptr_create(&item_diff);
2096 zbx_vector_ptr_reserve(&item_diff, history_num);
2097
2098 /* preallocate zbx_item_diff_t structures for item_diff vector */
2099 diffs = (zbx_item_diff_t *)zbx_malloc(NULL, sizeof(zbx_item_diff_t) * history_num);
2100
2101 for (i = 0; i < history_num; i++)
2102 {
2103 zbx_item_diff_t *diff = &diffs[i];
2104
2105 diff->itemid = history[i].itemid;
2106 diff->state = history[i].state;
2107 diff->lastclock = history[i].ts.sec;
2108 diff->flags = ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE | ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTCLOCK;
2109
2110 if (0 != (ZBX_DC_FLAG_META & history[i].flags))
2111 {
2112 diff->lastlogsize = history[i].lastlogsize;
2113 diff->mtime = history[i].mtime;
2114 diff->flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE | ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME;
2115 }
2116
2117 zbx_vector_ptr_append(&item_diff, diff);
2118 }
2119
2120 if (0 != item_diff.values_num)
2121 {
2122 size_t sql_offset = 0;
2123
2124 zbx_vector_ptr_sort(&item_diff, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);
2125
2126 DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
2127
2128 zbx_db_save_item_changes(&sql, &sql_alloc, &sql_offset, &item_diff,
2129 ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE | ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME);
2130
2131 DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
2132
2133 if (sql_offset > 16) /* In ORACLE always present begin..end; */
2134 DBexecute("%s", sql);
2135
2136 DCconfig_items_apply_changes(&item_diff);
2137 }
2138
2139 zbx_vector_ptr_destroy(&item_diff);
2140 zbx_free(diffs);
2141
2142 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
2143 }
2144
2145 /******************************************************************************
2146 * *
2147 * Function: DBmass_add_history *
2148 * *
2149 * Purpose: inserting new history data after new value is received *
2150 * *
2151 * Parameters: history - array of history data *
2152 * history_num - number of history structures *
2153 * *
2154 ******************************************************************************/
DBmass_add_history(ZBX_DC_HISTORY * history,int history_num)2155 static int DBmass_add_history(ZBX_DC_HISTORY *history, int history_num)
2156 {
2157 int i, ret = SUCCEED;
2158 zbx_vector_ptr_t history_values;
2159
2160 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
2161
2162 zbx_vector_ptr_create(&history_values);
2163 zbx_vector_ptr_reserve(&history_values, history_num);
2164
2165 for (i = 0; i < history_num; i++)
2166 {
2167 ZBX_DC_HISTORY *h = &history[i];
2168
2169 if (0 != (ZBX_DC_FLAGS_NOT_FOR_HISTORY & h->flags))
2170 continue;
2171
2172 zbx_vector_ptr_append(&history_values, h);
2173 }
2174
2175 if (0 != history_values.values_num)
2176 ret = zbx_vc_add_values(&history_values);
2177
2178 zbx_vector_ptr_destroy(&history_values);
2179
2180 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
2181
2182 return ret;
2183 }
2184
2185 /******************************************************************************
2186 * *
2187 * Function: dc_add_proxy_history *
2188 * *
2189 * Purpose: helper function for DCmass_proxy_add_history() *
2190 * *
2191 * Comment: this function is meant for items with value_type other other than *
2192 * ITEM_VALUE_TYPE_LOG not containing meta information in result *
2193 * *
2194 ******************************************************************************/
dc_add_proxy_history(ZBX_DC_HISTORY * history,int history_num)2195 static void dc_add_proxy_history(ZBX_DC_HISTORY *history, int history_num)
2196 {
2197 int i, now;
2198 unsigned int flags;
2199 char buffer[64], *pvalue;
2200 zbx_db_insert_t db_insert;
2201
2202 now = (int)time(NULL);
2203 zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "flags", "write_clock",
2204 NULL);
2205
2206 for (i = 0; i < history_num; i++)
2207 {
2208 const ZBX_DC_HISTORY *h = &history[i];
2209
2210 if (0 != (h->flags & ZBX_DC_FLAG_UNDEF))
2211 continue;
2212
2213 if (0 != (h->flags & ZBX_DC_FLAG_META))
2214 continue;
2215
2216 if (ITEM_STATE_NOTSUPPORTED == h->state)
2217 continue;
2218
2219 if (0 == (h->flags & ZBX_DC_FLAG_NOVALUE))
2220 {
2221 switch (h->value_type)
2222 {
2223 case ITEM_VALUE_TYPE_FLOAT:
2224 zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_DBL64, h->value.dbl);
2225 break;
2226 case ITEM_VALUE_TYPE_UINT64:
2227 zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_UI64, h->value.ui64);
2228 break;
2229 case ITEM_VALUE_TYPE_STR:
2230 case ITEM_VALUE_TYPE_TEXT:
2231 pvalue = h->value.str;
2232 break;
2233 case ITEM_VALUE_TYPE_LOG:
2234 continue;
2235 default:
2236 THIS_SHOULD_NEVER_HAPPEN;
2237 continue;
2238 }
2239 flags = 0;
2240 }
2241 else
2242 {
2243 flags = PROXY_HISTORY_FLAG_NOVALUE;
2244 pvalue = (char *)"";
2245 }
2246
2247 zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, pvalue, flags, now);
2248 }
2249
2250 zbx_db_insert_execute(&db_insert);
2251 zbx_db_insert_clean(&db_insert);
2252 }
2253
2254 /******************************************************************************
2255 * *
2256 * Function: dc_add_proxy_history_meta *
2257 * *
2258 * Purpose: helper function for DCmass_proxy_add_history() *
2259 * *
2260 * Comment: this function is meant for items with value_type other other than *
2261 * ITEM_VALUE_TYPE_LOG containing meta information in result *
2262 * *
2263 ******************************************************************************/
dc_add_proxy_history_meta(ZBX_DC_HISTORY * history,int history_num)2264 static void dc_add_proxy_history_meta(ZBX_DC_HISTORY *history, int history_num)
2265 {
2266 int i, now;
2267 char buffer[64], *pvalue;
2268 zbx_db_insert_t db_insert;
2269
2270 now = (int)time(NULL);
2271 zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "lastlogsize", "mtime",
2272 "flags", "write_clock", NULL);
2273
2274 for (i = 0; i < history_num; i++)
2275 {
2276 unsigned int flags = PROXY_HISTORY_FLAG_META;
2277 const ZBX_DC_HISTORY *h = &history[i];
2278
2279 if (ITEM_STATE_NOTSUPPORTED == h->state)
2280 continue;
2281
2282 if (0 != (h->flags & ZBX_DC_FLAG_UNDEF))
2283 continue;
2284
2285 if (0 == (h->flags & ZBX_DC_FLAG_META))
2286 continue;
2287
2288 if (ITEM_VALUE_TYPE_LOG == h->value_type)
2289 continue;
2290
2291 if (0 == (h->flags & ZBX_DC_FLAG_NOVALUE))
2292 {
2293 switch (h->value_type)
2294 {
2295 case ITEM_VALUE_TYPE_FLOAT:
2296 zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_DBL64, h->value.dbl);
2297 break;
2298 case ITEM_VALUE_TYPE_UINT64:
2299 zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_UI64, h->value.ui64);
2300 break;
2301 case ITEM_VALUE_TYPE_STR:
2302 case ITEM_VALUE_TYPE_TEXT:
2303 pvalue = h->value.str;
2304 break;
2305 default:
2306 THIS_SHOULD_NEVER_HAPPEN;
2307 continue;
2308 }
2309 }
2310 else
2311 {
2312 flags |= PROXY_HISTORY_FLAG_NOVALUE;
2313 pvalue = (char *)"";
2314 }
2315
2316 zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, pvalue, h->lastlogsize, h->mtime,
2317 flags, now);
2318 }
2319
2320 zbx_db_insert_execute(&db_insert);
2321 zbx_db_insert_clean(&db_insert);
2322 }
2323
2324 /******************************************************************************
2325 * *
2326 * Function: dc_add_proxy_history_log *
2327 * *
2328 * Purpose: helper function for DCmass_proxy_add_history() *
2329 * *
2330 * Comment: this function is meant for items with value_type *
2331 * ITEM_VALUE_TYPE_LOG *
2332 * *
2333 ******************************************************************************/
dc_add_proxy_history_log(ZBX_DC_HISTORY * history,int history_num)2334 static void dc_add_proxy_history_log(ZBX_DC_HISTORY *history, int history_num)
2335 {
2336 int i, now;
2337 zbx_db_insert_t db_insert;
2338
2339 now = (int)time(NULL);
2340
2341 /* see hc_copy_history_data() for fields that might be uninitialized and need special handling here */
2342 zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "timestamp", "source", "severity",
2343 "value", "logeventid", "lastlogsize", "mtime", "flags", "write_clock", NULL);
2344
2345 for (i = 0; i < history_num; i++)
2346 {
2347 unsigned int flags;
2348 zbx_uint64_t lastlogsize;
2349 int mtime;
2350 const ZBX_DC_HISTORY *h = &history[i];
2351
2352 if (ITEM_STATE_NOTSUPPORTED == h->state)
2353 continue;
2354
2355 if (ITEM_VALUE_TYPE_LOG != h->value_type)
2356 continue;
2357
2358 if (0 == (h->flags & ZBX_DC_FLAG_NOVALUE))
2359 {
2360 zbx_log_value_t *log = h->value.log;
2361
2362 if (0 != (h->flags & ZBX_DC_FLAG_META))
2363 {
2364 flags = PROXY_HISTORY_FLAG_META;
2365 lastlogsize = h->lastlogsize;
2366 mtime = h->mtime;
2367 }
2368 else
2369 {
2370 flags = 0;
2371 lastlogsize = 0;
2372 mtime = 0;
2373 }
2374
2375 zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, log->timestamp,
2376 ZBX_NULL2EMPTY_STR(log->source), log->severity, log->value, log->logeventid,
2377 lastlogsize, mtime, flags, now);
2378 }
2379 else
2380 {
2381 /* sent to server only if not 0, see proxy_get_history_data() */
2382 const int unset_if_novalue = 0;
2383
2384 flags = PROXY_HISTORY_FLAG_META | PROXY_HISTORY_FLAG_NOVALUE;
2385
2386 zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, unset_if_novalue, "",
2387 unset_if_novalue, "", unset_if_novalue, h->lastlogsize, h->mtime, flags, now);
2388 }
2389 }
2390
2391 zbx_db_insert_execute(&db_insert);
2392 zbx_db_insert_clean(&db_insert);
2393 }
2394
2395 /******************************************************************************
2396 * *
2397 * Function: dc_add_proxy_history_notsupported *
2398 * *
2399 * Purpose: helper function for DCmass_proxy_add_history() *
2400 * *
2401 ******************************************************************************/
dc_add_proxy_history_notsupported(ZBX_DC_HISTORY * history,int history_num)2402 static void dc_add_proxy_history_notsupported(ZBX_DC_HISTORY *history, int history_num)
2403 {
2404 int i, now;
2405 zbx_db_insert_t db_insert;
2406
2407 now = (int)time(NULL);
2408 zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "state", "write_clock",
2409 NULL);
2410
2411 for (i = 0; i < history_num; i++)
2412 {
2413 const ZBX_DC_HISTORY *h = &history[i];
2414
2415 if (ITEM_STATE_NOTSUPPORTED != h->state)
2416 continue;
2417
2418 zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, ZBX_NULL2EMPTY_STR(h->value.err),
2419 (int)h->state, now);
2420 }
2421
2422 zbx_db_insert_execute(&db_insert);
2423 zbx_db_insert_clean(&db_insert);
2424 }
2425
2426 /******************************************************************************
2427 * *
2428 * Function: DCmass_proxy_add_history *
2429 * *
2430 * Purpose: inserting new history data after new value is received *
2431 * *
2432 * Parameters: history - array of history data *
2433 * history_num - number of history structures *
2434 * *
2435 * Author: Alexander Vladishev *
2436 * *
2437 ******************************************************************************/
DCmass_proxy_add_history(ZBX_DC_HISTORY * history,int history_num)2438 static void DCmass_proxy_add_history(ZBX_DC_HISTORY *history, int history_num)
2439 {
2440 int i, h_num = 0, h_meta_num = 0, hlog_num = 0, notsupported_num = 0;
2441
2442 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
2443
2444 for (i = 0; i < history_num; i++)
2445 {
2446 const ZBX_DC_HISTORY *h = &history[i];
2447
2448 if (ITEM_STATE_NOTSUPPORTED == h->state)
2449 {
2450 notsupported_num++;
2451 continue;
2452 }
2453
2454 switch (h->value_type)
2455 {
2456 case ITEM_VALUE_TYPE_LOG:
2457 hlog_num++;
2458 break;
2459 case ITEM_VALUE_TYPE_FLOAT:
2460 case ITEM_VALUE_TYPE_UINT64:
2461 case ITEM_VALUE_TYPE_STR:
2462 case ITEM_VALUE_TYPE_TEXT:
2463 if (0 != (h->flags & ZBX_DC_FLAG_META))
2464 h_meta_num++;
2465 else
2466 h_num++;
2467 break;
2468 case ITEM_VALUE_TYPE_NONE:
2469 h_num++;
2470 break;
2471 default:
2472 THIS_SHOULD_NEVER_HAPPEN;
2473 }
2474 }
2475
2476 if (0 != h_num)
2477 dc_add_proxy_history(history, history_num);
2478
2479 if (0 != h_meta_num)
2480 dc_add_proxy_history_meta(history, history_num);
2481
2482 if (0 != hlog_num)
2483 dc_add_proxy_history_log(history, history_num);
2484
2485 if (0 != notsupported_num)
2486 dc_add_proxy_history_notsupported(history, history_num);
2487
2488 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
2489 }
2490
2491 /******************************************************************************
2492 * *
2493 * Function: DCmass_prepare_history *
2494 * *
2495 * Purpose: prepare history data using items from configuration cache and *
2496 * generate item changes to be applied and host inventory values to *
2497 * be added *
2498 * *
2499 * Parameters: history - [IN/OUT] array of history data *
2500 * itemids - [IN] the item identifiers *
2501 * (used for item lookup) *
2502 * items - [IN] the items *
2503 * errcodes - [IN] item error codes *
2504 * history_num - [IN] number of history structures *
2505 * item_diff - [OUT] the changes in item data *
2506 * inventory_values - [OUT] the inventory values to add *
2507 * compression_age - [IN] history compression age *
2508 * proxy_subscribtions - [IN] history compression age *
2509 * *
2510 ******************************************************************************/
DCmass_prepare_history(ZBX_DC_HISTORY * history,const zbx_vector_uint64_t * itemids,DC_ITEM * items,const int * errcodes,int history_num,zbx_vector_ptr_t * item_diff,zbx_vector_ptr_t * inventory_values,int compression_age,zbx_vector_uint64_pair_t * proxy_subscribtions)2511 static void DCmass_prepare_history(ZBX_DC_HISTORY *history, const zbx_vector_uint64_t *itemids,
2512 DC_ITEM *items, const int *errcodes, int history_num, zbx_vector_ptr_t *item_diff,
2513 zbx_vector_ptr_t *inventory_values, int compression_age, zbx_vector_uint64_pair_t *proxy_subscribtions)
2514 {
2515 static time_t last_history_discard = 0;
2516 time_t now;
2517 int i;
2518
2519 zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __func__, history_num);
2520
2521 now = time(NULL);
2522
2523 for (i = 0; i < history_num; i++)
2524 {
2525 ZBX_DC_HISTORY *h = &history[i];
2526 DC_ITEM *item;
2527 zbx_item_diff_t *diff;
2528 int index;
2529
2530 /* discard history items that are older than compression age */
2531 if (0 != compression_age && h->ts.sec < compression_age)
2532 {
2533 if (SEC_PER_HOUR < (now - last_history_discard)) /* log once per hour */
2534 {
2535 zabbix_log(LOG_LEVEL_TRACE, "discarding history that is pointing to"
2536 " compressed history period");
2537 last_history_discard = now;
2538 }
2539
2540 h->flags |= ZBX_DC_FLAG_UNDEF;
2541 continue;
2542 }
2543
2544 if (FAIL == (index = zbx_vector_uint64_bsearch(itemids, h->itemid, ZBX_DEFAULT_UINT64_COMPARE_FUNC)))
2545 {
2546 THIS_SHOULD_NEVER_HAPPEN;
2547 h->flags |= ZBX_DC_FLAG_UNDEF;
2548 continue;
2549 }
2550
2551 if (SUCCEED != errcodes[index])
2552 {
2553 h->flags |= ZBX_DC_FLAG_UNDEF;
2554 continue;
2555 }
2556
2557 item = &items[index];
2558
2559 if (ITEM_STATUS_ACTIVE != item->status || HOST_STATUS_MONITORED != item->host.status)
2560 {
2561 h->flags |= ZBX_DC_FLAG_UNDEF;
2562 continue;
2563 }
2564
2565 if (0 == item->history)
2566 {
2567 h->flags |= ZBX_DC_FLAG_NOHISTORY;
2568 }
2569 else if (now - h->ts.sec > item->history_sec)
2570 {
2571 h->flags |= ZBX_DC_FLAG_NOHISTORY;
2572 zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" value timestamp \"%s %s\" is outside history "
2573 "storage period", item->host.host, item->key_orig, zbx_date2str(h->ts.sec),
2574 zbx_time2str(h->ts.sec));
2575 }
2576
2577 if (ITEM_VALUE_TYPE_FLOAT == item->value_type || ITEM_VALUE_TYPE_UINT64 == item->value_type)
2578 {
2579 if (0 == item->trends)
2580 {
2581 h->flags |= ZBX_DC_FLAG_NOTRENDS;
2582 }
2583 else if (now - h->ts.sec > item->trends_sec)
2584 {
2585 h->flags |= ZBX_DC_FLAG_NOTRENDS;
2586 zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" value timestamp \"%s %s\" is outside "
2587 "trends storage period", item->host.host, item->key_orig,
2588 zbx_date2str(h->ts.sec), zbx_time2str(h->ts.sec));
2589 }
2590 }
2591 else
2592 h->flags |= ZBX_DC_FLAG_NOTRENDS;
2593
2594 normalize_item_value(item, h);
2595
2596 /* calculate item update and update already retrieved item status for trigger calculation */
2597 if (NULL != (diff = calculate_item_update(item, h)))
2598 zbx_vector_ptr_append(item_diff, diff);
2599
2600 DCinventory_value_add(inventory_values, item, h);
2601
2602 if (0 != item->host.proxy_hostid && FAIL == is_item_processed_by_server(item->type, item->key_orig))
2603 {
2604 zbx_uint64_pair_t p = {item->host.proxy_hostid, h->ts.sec};
2605
2606 zbx_vector_uint64_pair_append(proxy_subscribtions, p);
2607 }
2608 }
2609
2610 zbx_vector_ptr_sort(inventory_values, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);
2611 zbx_vector_ptr_sort(item_diff, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);
2612
2613 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
2614 }
2615
2616 /******************************************************************************
2617 * *
2618 * Function: DCmodule_prepare_history *
2619 * *
2620 * Purpose: prepare history data to share them with loadable modules, sort *
2621 * data by type skipping low-level discovery data, meta information *
2622 * updates and notsupported items *
2623 * *
2624 * Parameters: history - [IN] array of history data *
2625 * history_num - [IN] number of history structures *
2626 * history_<type> - [OUT] array of historical data of a *
2627 * specific data type *
2628 * history_<type>_num - [OUT] number of values of a specific *
2629 * data type *
2630 * *
2631 ******************************************************************************/
DCmodule_prepare_history(ZBX_DC_HISTORY * history,int history_num,ZBX_HISTORY_FLOAT * history_float,int * history_float_num,ZBX_HISTORY_INTEGER * history_integer,int * history_integer_num,ZBX_HISTORY_STRING * history_string,int * history_string_num,ZBX_HISTORY_TEXT * history_text,int * history_text_num,ZBX_HISTORY_LOG * history_log,int * history_log_num)2632 static void DCmodule_prepare_history(ZBX_DC_HISTORY *history, int history_num, ZBX_HISTORY_FLOAT *history_float,
2633 int *history_float_num, ZBX_HISTORY_INTEGER *history_integer, int *history_integer_num,
2634 ZBX_HISTORY_STRING *history_string, int *history_string_num, ZBX_HISTORY_TEXT *history_text,
2635 int *history_text_num, ZBX_HISTORY_LOG *history_log, int *history_log_num)
2636 {
2637 ZBX_DC_HISTORY *h;
2638 ZBX_HISTORY_FLOAT *h_float;
2639 ZBX_HISTORY_INTEGER *h_integer;
2640 ZBX_HISTORY_STRING *h_string;
2641 ZBX_HISTORY_TEXT *h_text;
2642 ZBX_HISTORY_LOG *h_log;
2643 int i;
2644 const zbx_log_value_t *log;
2645
2646 *history_float_num = 0;
2647 *history_integer_num = 0;
2648 *history_string_num = 0;
2649 *history_text_num = 0;
2650 *history_log_num = 0;
2651
2652 for (i = 0; i < history_num; i++)
2653 {
2654 h = &history[i];
2655
2656 if (0 != (ZBX_DC_FLAGS_NOT_FOR_MODULES & h->flags))
2657 continue;
2658
2659 switch (h->value_type)
2660 {
2661 case ITEM_VALUE_TYPE_FLOAT:
2662 if (NULL == history_float_cbs)
2663 continue;
2664
2665 h_float = &history_float[(*history_float_num)++];
2666 h_float->itemid = h->itemid;
2667 h_float->clock = h->ts.sec;
2668 h_float->ns = h->ts.ns;
2669 h_float->value = h->value.dbl;
2670 break;
2671 case ITEM_VALUE_TYPE_UINT64:
2672 if (NULL == history_integer_cbs)
2673 continue;
2674
2675 h_integer = &history_integer[(*history_integer_num)++];
2676 h_integer->itemid = h->itemid;
2677 h_integer->clock = h->ts.sec;
2678 h_integer->ns = h->ts.ns;
2679 h_integer->value = h->value.ui64;
2680 break;
2681 case ITEM_VALUE_TYPE_STR:
2682 if (NULL == history_string_cbs)
2683 continue;
2684
2685 h_string = &history_string[(*history_string_num)++];
2686 h_string->itemid = h->itemid;
2687 h_string->clock = h->ts.sec;
2688 h_string->ns = h->ts.ns;
2689 h_string->value = h->value.str;
2690 break;
2691 case ITEM_VALUE_TYPE_TEXT:
2692 if (NULL == history_text_cbs)
2693 continue;
2694
2695 h_text = &history_text[(*history_text_num)++];
2696 h_text->itemid = h->itemid;
2697 h_text->clock = h->ts.sec;
2698 h_text->ns = h->ts.ns;
2699 h_text->value = h->value.str;
2700 break;
2701 case ITEM_VALUE_TYPE_LOG:
2702 if (NULL == history_log_cbs)
2703 continue;
2704
2705 log = h->value.log;
2706 h_log = &history_log[(*history_log_num)++];
2707 h_log->itemid = h->itemid;
2708 h_log->clock = h->ts.sec;
2709 h_log->ns = h->ts.ns;
2710 h_log->value = log->value;
2711 h_log->source = ZBX_NULL2EMPTY_STR(log->source);
2712 h_log->timestamp = log->timestamp;
2713 h_log->logeventid = log->logeventid;
2714 h_log->severity = log->severity;
2715 break;
2716 default:
2717 THIS_SHOULD_NEVER_HAPPEN;
2718 }
2719 }
2720 }
2721
/******************************************************************************
 *
 * Function: DCmodule_sync_history
 *
 * Purpose: pass prepared history values to the history callbacks of modules
 *
 * Parameters: history_<type>_num - [IN] number of values of a specific
 *                                       data type
 *             history_<type>     - [IN] array of historical data of a
 *                                       specific data type
 *
 * Comments: For every data type with a non-zero number of values each
 *           callback in history_<type>_cbs is called until an entry
 *           without a module is met.
 *
 ******************************************************************************/
static void	DCmodule_sync_history(int history_float_num, int history_integer_num, int history_string_num,
		int history_text_num, int history_log_num, ZBX_HISTORY_FLOAT *history_float,
		ZBX_HISTORY_INTEGER *history_integer, ZBX_HISTORY_STRING *history_string,
		ZBX_HISTORY_TEXT *history_text, ZBX_HISTORY_LOG *history_log)
{
	int	idx;

	if (0 != history_float_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing float history data with modules...");

		idx = 0;

		while (NULL != history_float_cbs[idx].module)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_float_cbs[idx].module->name);
			history_float_cbs[idx].history_float_cb(history_float, history_float_num);
			idx++;
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d float values with modules", history_float_num);
	}

	if (0 != history_integer_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing integer history data with modules...");

		idx = 0;

		while (NULL != history_integer_cbs[idx].module)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_integer_cbs[idx].module->name);
			history_integer_cbs[idx].history_integer_cb(history_integer, history_integer_num);
			idx++;
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d integer values with modules", history_integer_num);
	}

	if (0 != history_string_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing string history data with modules...");

		idx = 0;

		while (NULL != history_string_cbs[idx].module)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_string_cbs[idx].module->name);
			history_string_cbs[idx].history_string_cb(history_string, history_string_num);
			idx++;
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d string values with modules", history_string_num);
	}

	if (0 != history_text_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing text history data with modules...");

		idx = 0;

		while (NULL != history_text_cbs[idx].module)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_text_cbs[idx].module->name);
			history_text_cbs[idx].history_text_cb(history_text, history_text_num);
			idx++;
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d text values with modules", history_text_num);
	}

	if (0 != history_log_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing log history data with modules...");

		idx = 0;

		while (NULL != history_log_cbs[idx].module)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_log_cbs[idx].module->name);
			history_log_cbs[idx].history_log_cb(history_log, history_log_num);
			idx++;
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d log values with modules", history_log_num);
	}
}
2802
sync_proxy_history(int * total_num,int * more)2803 static void sync_proxy_history(int *total_num, int *more)
2804 {
2805 int history_num;
2806 time_t sync_start;
2807 zbx_vector_ptr_t history_items;
2808 ZBX_DC_HISTORY history[ZBX_HC_SYNC_MAX];
2809
2810 zbx_vector_ptr_create(&history_items);
2811 zbx_vector_ptr_reserve(&history_items, ZBX_HC_SYNC_MAX);
2812
2813 sync_start = time(NULL);
2814
2815 do
2816 {
2817 *more = ZBX_SYNC_DONE;
2818
2819 LOCK_CACHE;
2820
2821 hc_pop_items(&history_items); /* select and take items out of history cache */
2822 history_num = history_items.values_num;
2823
2824 UNLOCK_CACHE;
2825
2826 if (0 == history_num)
2827 break;
2828
2829 hc_get_item_values(history, &history_items); /* copy item data from history cache */
2830
2831 do
2832 {
2833 DBbegin();
2834
2835 DCmass_proxy_add_history(history, history_num);
2836 DCmass_proxy_update_items(history, history_num);
2837 }
2838 while (ZBX_DB_DOWN == DBcommit());
2839
2840 LOCK_CACHE;
2841
2842 hc_push_items(&history_items); /* return items to history cache */
2843 cache->history_num -= history_num;
2844
2845 if (0 != hc_queue_get_size())
2846 *more = ZBX_SYNC_MORE;
2847
2848 UNLOCK_CACHE;
2849
2850 *total_num += history_num;
2851
2852 zbx_vector_ptr_clear(&history_items);
2853 hc_free_item_values(history, history_num);
2854
2855 /* Exit from sync loop if we have spent too much time here */
2856 /* unless we are doing full sync. This is done to allow */
2857 /* syncer process to update their statistics. */
2858 }
2859 while (ZBX_SYNC_MORE == *more && ZBX_HC_SYNC_TIME_MAX >= time(NULL) - sync_start);
2860
2861 zbx_vector_ptr_destroy(&history_items);
2862 }
2863
2864 /******************************************************************************
2865 * *
2866 * Function: sync_server_history *
2867 * *
2868 * Purpose: flush history cache to database, process triggers of flushed *
2869 * and timer triggers from timer queue *
2870 * *
 * Parameters: values_num   - [IN/OUT] the number of synced values            *
 *             triggers_num - [IN/OUT] the number of processed triggers       *
2874 * more - [OUT] a flag indicating the cache emptiness: *
2875 * ZBX_SYNC_DONE - nothing to sync, go idle *
2876 * ZBX_SYNC_MORE - more data to sync *
2877 * *
2878 * Comments: This function loops syncing history values by 1k batches and *
2879 * processing timer triggers by batches of 500 triggers. *
2880 * Unless full sync is being done the loop is aborted if either *
2881 * timeout has passed or there are no more data to process. *
2882 * The last is assumed when the following is true: *
2883 * a) history cache is empty or less than 10% of batch values were *
2884 * processed (the other items were locked by triggers) *
2885 * b) less than 500 (full batch) timer triggers were processed *
2886 * *
2887 ******************************************************************************/
sync_server_history(int * values_num,int * triggers_num,int * more)2888 static void sync_server_history(int *values_num, int *triggers_num, int *more)
2889 {
2890 static ZBX_HISTORY_FLOAT *history_float;
2891 static ZBX_HISTORY_INTEGER *history_integer;
2892 static ZBX_HISTORY_STRING *history_string;
2893 static ZBX_HISTORY_TEXT *history_text;
2894 static ZBX_HISTORY_LOG *history_log;
2895 int i, history_num, history_float_num, history_integer_num, history_string_num,
2896 history_text_num, history_log_num, txn_error, compression_age;
2897 unsigned int item_retrieve_mode;
2898 time_t sync_start;
2899 zbx_vector_uint64_t triggerids, timer_triggerids;
2900 zbx_vector_ptr_t history_items, trigger_diff, item_diff, inventory_values;
2901 zbx_vector_uint64_pair_t trends_diff, proxy_subscribtions;
2902 ZBX_DC_HISTORY history[ZBX_HC_SYNC_MAX];
2903
2904 item_retrieve_mode = NULL == CONFIG_EXPORT_DIR ? ZBX_ITEM_GET_SYNC : ZBX_ITEM_GET_SYNC_EXPORT;
2905
2906 if (NULL == history_float && NULL != history_float_cbs)
2907 {
2908 history_float = (ZBX_HISTORY_FLOAT *)zbx_malloc(history_float,
2909 ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_FLOAT));
2910 }
2911
2912 if (NULL == history_integer && NULL != history_integer_cbs)
2913 {
2914 history_integer = (ZBX_HISTORY_INTEGER *)zbx_malloc(history_integer,
2915 ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_INTEGER));
2916 }
2917
2918 if (NULL == history_string && NULL != history_string_cbs)
2919 {
2920 history_string = (ZBX_HISTORY_STRING *)zbx_malloc(history_string,
2921 ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_STRING));
2922 }
2923
2924 if (NULL == history_text && NULL != history_text_cbs)
2925 {
2926 history_text = (ZBX_HISTORY_TEXT *)zbx_malloc(history_text,
2927 ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_TEXT));
2928 }
2929
2930 if (NULL == history_log && NULL != history_log_cbs)
2931 {
2932 history_log = (ZBX_HISTORY_LOG *)zbx_malloc(history_log,
2933 ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_LOG));
2934 }
2935
2936 compression_age = hc_get_history_compression_age();
2937
2938 zbx_vector_ptr_create(&inventory_values);
2939 zbx_vector_ptr_create(&item_diff);
2940 zbx_vector_ptr_create(&trigger_diff);
2941 zbx_vector_uint64_pair_create(&trends_diff);
2942 zbx_vector_uint64_pair_create(&proxy_subscribtions);
2943
2944 zbx_vector_uint64_create(&triggerids);
2945 zbx_vector_uint64_reserve(&triggerids, ZBX_HC_SYNC_MAX);
2946
2947 zbx_vector_uint64_create(&timer_triggerids);
2948 zbx_vector_uint64_reserve(&timer_triggerids, ZBX_HC_TIMER_MAX);
2949
2950 zbx_vector_ptr_create(&history_items);
2951 zbx_vector_ptr_reserve(&history_items, ZBX_HC_SYNC_MAX);
2952
2953 sync_start = time(NULL);
2954
2955 do
2956 {
2957 DC_ITEM *items;
2958 int *errcodes, trends_num = 0, timers_num = 0, ret = SUCCEED;
2959 zbx_vector_uint64_t itemids;
2960 ZBX_DC_TREND *trends = NULL;
2961 zbx_timespec_t ts;
2962
2963 zbx_vector_uint64_create(&itemids);
2964
2965 *more = ZBX_SYNC_DONE;
2966
2967 LOCK_CACHE;
2968 hc_pop_items(&history_items); /* select and take items out of history cache */
2969 UNLOCK_CACHE;
2970
2971 if (0 != history_items.values_num)
2972 {
2973 if (0 == (history_num = DCconfig_lock_triggers_by_history_items(&history_items, &triggerids)))
2974 {
2975 LOCK_CACHE;
2976 hc_push_items(&history_items);
2977 UNLOCK_CACHE;
2978 zbx_vector_ptr_clear(&history_items);
2979 }
2980 }
2981 else
2982 history_num = 0;
2983
2984 zbx_timespec(&ts);
2985
2986 if (0 != history_num)
2987 {
2988 hc_get_item_values(history, &history_items); /* copy item data from history cache */
2989
2990 items = (DC_ITEM *)zbx_malloc(NULL, sizeof(DC_ITEM) * (size_t)history_num);
2991 errcodes = (int *)zbx_malloc(NULL, sizeof(int) * (size_t)history_num);
2992
2993 zbx_vector_uint64_reserve(&itemids, history_num);
2994
2995 for (i = 0; i < history_num; i++)
2996 zbx_vector_uint64_append(&itemids, history[i].itemid);
2997
2998 zbx_vector_uint64_sort(&itemids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
2999
3000 DCconfig_get_items_by_itemids_partial(items, itemids.values, errcodes, history_num,
3001 item_retrieve_mode);
3002
3003 DCmass_prepare_history(history, &itemids, items, errcodes, history_num, &item_diff,
3004 &inventory_values, compression_age, &proxy_subscribtions);
3005
3006 if (FAIL != (ret = DBmass_add_history(history, history_num)))
3007 {
3008 DCconfig_items_apply_changes(&item_diff);
3009 DCmass_update_trends(history, history_num, &trends, &trends_num, compression_age);
3010
3011 do
3012 {
3013 DBbegin();
3014
3015 DBmass_update_items(&item_diff, &inventory_values);
3016 DBmass_update_trends(trends, trends_num, &trends_diff);
3017
3018 /* process internal events generated by DCmass_prepare_history() */
3019 zbx_process_events(NULL, NULL);
3020
3021 if (ZBX_DB_OK == (txn_error = DBcommit()))
3022 DCupdate_trends(&trends_diff);
3023 else
3024 zbx_reset_event_recovery();
3025
3026 zbx_vector_uint64_pair_clear(&trends_diff);
3027 }
3028 while (ZBX_DB_DOWN == txn_error);
3029 }
3030
3031 zbx_clean_events();
3032
3033 zbx_vector_ptr_clear_ext(&inventory_values, (zbx_clean_func_t)DCinventory_value_free);
3034 zbx_vector_ptr_clear_ext(&item_diff, (zbx_clean_func_t)zbx_ptr_free);
3035 }
3036
3037 if (FAIL != ret)
3038 {
3039 zbx_dc_get_timer_triggerids(&timer_triggerids, time(NULL), ZBX_HC_TIMER_MAX);
3040 timers_num = timer_triggerids.values_num;
3041
3042 if (ZBX_HC_TIMER_MAX == timers_num)
3043 *more = ZBX_SYNC_MORE;
3044
3045 if (0 != history_num || 0 != timers_num)
3046 {
3047 /* timer triggers do not intersect with item triggers because item triggers */
3048 /* where already locked and skipped when retrieving timer triggers */
3049 zbx_vector_uint64_append_array(&triggerids, timer_triggerids.values,
3050 timer_triggerids.values_num);
3051 do
3052 {
3053 DBbegin();
3054
3055 recalculate_triggers(history, history_num, &itemids, items, errcodes,
3056 &timer_triggerids, &ts, &trigger_diff);
3057
3058 /* process trigger events generated by recalculate_triggers() */
3059 zbx_process_events(&trigger_diff, &triggerids);
3060 if (0 != trigger_diff.values_num)
3061 zbx_db_save_trigger_changes(&trigger_diff);
3062
3063 if (ZBX_DB_OK == (txn_error = DBcommit()))
3064 {
3065 DCconfig_triggers_apply_changes(&trigger_diff);
3066 DBupdate_itservices(&trigger_diff);
3067 }
3068 else
3069 zbx_clean_events();
3070
3071 zbx_vector_ptr_clear_ext(&trigger_diff, (zbx_clean_func_t)zbx_trigger_diff_free);
3072 }
3073 while (ZBX_DB_DOWN == txn_error);
3074 }
3075
3076 zbx_vector_uint64_clear(&timer_triggerids);
3077 }
3078
3079 if (0 != triggerids.values_num)
3080 {
3081 *triggers_num += triggerids.values_num;
3082 DCconfig_unlock_triggers(&triggerids);
3083 zbx_vector_uint64_clear(&triggerids);
3084 }
3085
3086 if (0 != proxy_subscribtions.values_num)
3087 {
3088 zbx_vector_uint64_pair_sort(&proxy_subscribtions, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
3089 zbx_dc_proxy_update_nodata(&proxy_subscribtions);
3090 zbx_vector_uint64_pair_clear(&proxy_subscribtions);
3091 }
3092
3093 if (0 != history_num)
3094 {
3095 LOCK_CACHE;
3096 hc_push_items(&history_items); /* return items to history cache */
3097 cache->history_num -= history_num;
3098
3099 if (0 != hc_queue_get_size())
3100 {
3101 /* Continue sync if enough of sync candidates were processed */
3102 /* (meaning most of sync candidates are not locked by triggers). */
3103 /* Otherwise better to wait a bit for other syncers to unlock */
3104 /* items rather than trying and failing to sync locked items over */
3105 /* and over again. */
3106 if (ZBX_HC_SYNC_MIN_PCNT <= history_num * 100 / history_items.values_num)
3107 *more = ZBX_SYNC_MORE;
3108 }
3109
3110 UNLOCK_CACHE;
3111
3112 *values_num += history_num;
3113 }
3114
3115 if (FAIL != ret)
3116 {
3117 if (0 != history_num)
3118 {
3119 const ZBX_DC_HISTORY *phistory = NULL;
3120 const ZBX_DC_TREND *ptrends = NULL;
3121 int history_num_loc = 0, trends_num_loc = 0;
3122
3123 DCmodule_prepare_history(history, history_num, history_float, &history_float_num,
3124 history_integer, &history_integer_num, history_string,
3125 &history_string_num, history_text, &history_text_num, history_log,
3126 &history_log_num);
3127
3128 DCmodule_sync_history(history_float_num, history_integer_num, history_string_num,
3129 history_text_num, history_log_num, history_float, history_integer,
3130 history_string, history_text, history_log);
3131
3132 if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_HISTORY))
3133 {
3134 phistory = history;
3135 history_num_loc = history_num;
3136 }
3137
3138 if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_TRENDS))
3139 {
3140 ptrends = trends;
3141 trends_num_loc = trends_num;
3142 }
3143
3144 if (NULL != phistory || NULL != ptrends)
3145 {
3146 DCexport_history_and_trends(phistory, history_num_loc, &itemids, items,
3147 errcodes, ptrends, trends_num_loc);
3148 }
3149 }
3150
3151 if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_EVENTS))
3152 zbx_export_events();
3153 }
3154
3155 if (0 != history_num || 0 != timers_num)
3156 zbx_clean_events();
3157
3158 if (0 != history_num)
3159 {
3160 zbx_free(trends);
3161 DCconfig_clean_items(items, errcodes, history_num);
3162 zbx_free(errcodes);
3163 zbx_free(items);
3164
3165 zbx_vector_ptr_clear(&history_items);
3166 hc_free_item_values(history, history_num);
3167 }
3168
3169 zbx_vector_uint64_destroy(&itemids);
3170
3171 /* Exit from sync loop if we have spent too much time here. */
3172 /* This is done to allow syncer process to update its statistics. */
3173 }
3174 while (ZBX_SYNC_MORE == *more && ZBX_HC_SYNC_TIME_MAX >= time(NULL) - sync_start);
3175
3176 zbx_vector_ptr_destroy(&history_items);
3177 zbx_vector_ptr_destroy(&inventory_values);
3178 zbx_vector_ptr_destroy(&item_diff);
3179 zbx_vector_ptr_destroy(&trigger_diff);
3180 zbx_vector_uint64_pair_destroy(&trends_diff);
3181 zbx_vector_uint64_pair_destroy(&proxy_subscribtions);
3182
3183 zbx_vector_uint64_destroy(&timer_triggerids);
3184 zbx_vector_uint64_destroy(&triggerids);
3185 }
3186
3187 /******************************************************************************
3188 * *
3189 * Function: sync_history_cache_full *
3190 * *
3191 * Purpose: writes updates and new data from history cache to database *
3192 * *
3193 * Comments: This function is used to flush history cache at server/proxy *
3194 * exit. *
3195 * Other processes are already terminated, so cache locking is *
3196 * unnecessary. *
3197 * *
3198 ******************************************************************************/
sync_history_cache_full(void)3199 static void sync_history_cache_full(void)
3200 {
3201 int values_num = 0, triggers_num = 0, more;
3202 zbx_hashset_iter_t iter;
3203 zbx_hc_item_t *item;
3204 zbx_binary_heap_t tmp_history_queue;
3205
3206 zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __func__, cache->history_num);
3207
3208 /* History index cache might be full without any space left for queueing items from history index to */
3209 /* history queue. The solution: replace the shared-memory history queue with heap-allocated one. Add */
3210 /* all items from history index to the new history queue. */
3211 /* */
3212 /* Assertions that must be true. */
3213 /* * This is the main server or proxy process, */
3214 /* * There are no other users of history index cache stored in shared memory. Other processes */
3215 /* should have quit by this point. */
3216 /* * other parts of the program do not hold pointers to the elements of history queue that is */
3217 /* stored in the shared memory. */
3218
3219 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3220 {
3221 /* unlock all triggers before full sync so no items are locked by triggers */
3222 DCconfig_unlock_all_triggers();
3223
3224 /* clear timer trigger queue to avoid processing time triggers at exit */
3225 zbx_dc_clear_timer_queue();
3226 }
3227
3228 tmp_history_queue = cache->history_queue;
3229
3230 zbx_binary_heap_create(&cache->history_queue, hc_queue_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY);
3231 zbx_hashset_iter_reset(&cache->history_items, &iter);
3232
3233 /* add all items from history index to the new history queue */
3234 while (NULL != (item = (zbx_hc_item_t *)zbx_hashset_iter_next(&iter)))
3235 {
3236 if (NULL != item->tail)
3237 {
3238 item->status = ZBX_HC_ITEM_STATUS_NORMAL;
3239 hc_queue_item(item);
3240 }
3241 }
3242
3243 if (0 != hc_queue_get_size())
3244 {
3245 zabbix_log(LOG_LEVEL_WARNING, "syncing history data...");
3246
3247 do
3248 {
3249 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3250 sync_server_history(&values_num, &triggers_num, &more);
3251 else
3252 sync_proxy_history(&values_num, &more);
3253
3254 zabbix_log(LOG_LEVEL_WARNING, "syncing history data... " ZBX_FS_DBL "%%",
3255 (double)values_num / (cache->history_num + values_num) * 100);
3256 }
3257 while (0 != hc_queue_get_size());
3258
3259 zabbix_log(LOG_LEVEL_WARNING, "syncing history data done");
3260 }
3261
3262 zbx_binary_heap_destroy(&cache->history_queue);
3263 cache->history_queue = tmp_history_queue;
3264
3265 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
3266 }
3267
3268 /******************************************************************************
3269 * *
3270 * Function: zbx_log_sync_history_cache_progress *
3271 * *
3272 * Purpose: log progress of syncing history data *
3273 * *
3274 ******************************************************************************/
zbx_log_sync_history_cache_progress(void)3275 void zbx_log_sync_history_cache_progress(void)
3276 {
3277 double pcnt = -1.0;
3278 int ts_last, ts_next, sec;
3279
3280 LOCK_CACHE;
3281
3282 if (INT_MAX == cache->history_progress_ts)
3283 {
3284 UNLOCK_CACHE;
3285 return;
3286 }
3287
3288 ts_last = cache->history_progress_ts;
3289 sec = time(NULL);
3290
3291 if (0 == cache->history_progress_ts)
3292 {
3293 cache->history_num_total = cache->history_num;
3294 cache->history_progress_ts = sec;
3295 }
3296
3297 if (ZBX_HC_SYNC_TIME_MAX <= sec - cache->history_progress_ts || 0 == cache->history_num)
3298 {
3299 if (0 != cache->history_num_total)
3300 pcnt = 100 * (double)(cache->history_num_total - cache->history_num) / cache->history_num_total;
3301
3302 cache->history_progress_ts = (0 == cache->history_num ? INT_MAX : sec);
3303 }
3304
3305 ts_next = cache->history_progress_ts;
3306
3307 UNLOCK_CACHE;
3308
3309 if (0 == ts_last)
3310 zabbix_log(LOG_LEVEL_WARNING, "syncing history data in progress... ");
3311
3312 if (-1.0 != pcnt)
3313 zabbix_log(LOG_LEVEL_WARNING, "syncing history data... " ZBX_FS_DBL "%%", pcnt);
3314
3315 if (INT_MAX == ts_next)
3316 zabbix_log(LOG_LEVEL_WARNING, "syncing history data done");
3317 }
3318
3319 /******************************************************************************
3320 * *
3321 * Function: zbx_sync_history_cache *
3322 * *
3323 * Purpose: writes updates and new data from history cache to database *
3324 * *
 * Parameters: values_num   - [OUT] the number of synced values               *
 *             triggers_num - [OUT] the number of processed triggers          *
 *             more         - [OUT] a flag indicating the cache emptiness:    *
 *                            ZBX_SYNC_DONE - nothing to sync, go idle        *
 *                            ZBX_SYNC_MORE - more data to sync               *
3329 * *
3330 ******************************************************************************/
zbx_sync_history_cache(int * values_num,int * triggers_num,int * more)3331 void zbx_sync_history_cache(int *values_num, int *triggers_num, int *more)
3332 {
3333 zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __func__, cache->history_num);
3334
3335 *values_num = 0;
3336 *triggers_num = 0;
3337
3338 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3339 sync_server_history(values_num, triggers_num, more);
3340 else
3341 sync_proxy_history(values_num, more);
3342 }
3343
3344 /******************************************************************************
3345 * *
3346 * local history cache *
3347 * *
3348 ******************************************************************************/
dc_string_buffer_realloc(size_t len)3349 static void dc_string_buffer_realloc(size_t len)
3350 {
3351 if (string_values_alloc >= string_values_offset + len)
3352 return;
3353
3354 do
3355 {
3356 string_values_alloc += ZBX_STRING_REALLOC_STEP;
3357 }
3358 while (string_values_alloc < string_values_offset + len);
3359
3360 string_values = (char *)zbx_realloc(string_values, string_values_alloc);
3361 }
3362
dc_local_get_history_slot(void)3363 static dc_item_value_t *dc_local_get_history_slot(void)
3364 {
3365 if (ZBX_MAX_VALUES_LOCAL == item_values_num)
3366 dc_flush_history();
3367
3368 if (item_values_alloc == item_values_num)
3369 {
3370 item_values_alloc += ZBX_STRUCT_REALLOC_STEP;
3371 item_values = (dc_item_value_t *)zbx_realloc(item_values, item_values_alloc * sizeof(dc_item_value_t));
3372 }
3373
3374 return &item_values[item_values_num++];
3375 }
3376
/******************************************************************************
 *                                                                            *
 * Function: dc_local_add_history_dbl                                         *
 *                                                                            *
 * Purpose: store a floating point value in the local history cache           *
 *                                                                            *
 * Comments: lastlogsize and mtime are stored only if ZBX_DC_FLAG_META is     *
 *           set in flags, the value only if ZBX_DC_FLAG_NOVALUE is not set   *
 *                                                                            *
 ******************************************************************************/
static void	dc_local_add_history_dbl(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		double value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->itemid = itemid;
	slot->ts = *ts;
	slot->state = ITEM_STATE_NORMAL;
	slot->value_type = ITEM_VALUE_TYPE_FLOAT;
	slot->item_value_type = item_value_type;
	slot->flags = flags;

	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->mtime = mtime;
		slot->lastlogsize = lastlogsize;
	}

	if (0 != (slot->flags & ZBX_DC_FLAG_NOVALUE))
		return;

	slot->value.value_dbl = value_orig;
}
3400
/******************************************************************************
 *                                                                            *
 * Function: dc_local_add_history_uint                                        *
 *                                                                            *
 * Purpose: store an unsigned integer value in the local history cache        *
 *                                                                            *
 * Comments: lastlogsize and mtime are stored only if ZBX_DC_FLAG_META is     *
 *           set in flags, the value only if ZBX_DC_FLAG_NOVALUE is not set   *
 *                                                                            *
 ******************************************************************************/
static void	dc_local_add_history_uint(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		zbx_uint64_t value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->itemid = itemid;
	slot->ts = *ts;
	slot->state = ITEM_STATE_NORMAL;
	slot->value_type = ITEM_VALUE_TYPE_UINT64;
	slot->item_value_type = item_value_type;
	slot->flags = flags;

	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->mtime = mtime;
		slot->lastlogsize = lastlogsize;
	}

	if (0 != (slot->flags & ZBX_DC_FLAG_NOVALUE))
		return;

	slot->value.value_uint = value_orig;
}
3424
/******************************************************************************
 *                                                                            *
 * Function: dc_local_add_history_text                                        *
 *                                                                            *
 * Purpose: store a text value in the local history cache                     *
 *                                                                            *
 * Comments: the value bytes are appended to the local string buffer and the  *
 *           slot keeps their offset and length; the length is limited with   *
 *           zbx_db_strlen_n(..., ZBX_HISTORY_VALUE_LEN) and includes one     *
 *           extra byte                                                       *
 *                                                                            *
 ******************************************************************************/
static void	dc_local_add_history_text(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		const char *value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->itemid = itemid;
	slot->ts = *ts;
	slot->state = ITEM_STATE_NORMAL;
	slot->value_type = ITEM_VALUE_TYPE_TEXT;
	slot->item_value_type = item_value_type;
	slot->flags = flags;

	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->mtime = mtime;
		slot->lastlogsize = lastlogsize;
	}

	/* nothing to copy when the value is absent */
	if (0 != (slot->flags & ZBX_DC_FLAG_NOVALUE))
	{
		slot->value.value_str.len = 0;
		return;
	}

	slot->value.value_str.len = zbx_db_strlen_n(value_orig, ZBX_HISTORY_VALUE_LEN) + 1;
	dc_string_buffer_realloc(slot->value.value_str.len);

	slot->value.value_str.pvalue = string_values_offset;
	memcpy(string_values + string_values_offset, value_orig, slot->value.value_str.len);
	string_values_offset += slot->value.value_str.len;
}
3457
/******************************************************************************
 *                                                                            *
 * Function: dc_local_add_history_log                                         *
 *                                                                            *
 * Purpose: store a log value in the local history cache                      *
 *                                                                            *
 * Comments: log is not accessed when ZBX_DC_FLAG_NOVALUE is set in flags.    *
 *           Otherwise the log value and, if not empty, the log source are    *
 *           appended to the local string buffer and the slot keeps their     *
 *           offsets and lengths.                                             *
 *                                                                            *
 ******************************************************************************/
static void	dc_local_add_history_log(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		const zbx_log_t *log, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->itemid = itemid;
	slot->ts = *ts;
	slot->state = ITEM_STATE_NORMAL;
	slot->value_type = ITEM_VALUE_TYPE_LOG;
	slot->item_value_type = item_value_type;
	slot->flags = flags;

	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->mtime = mtime;
		slot->lastlogsize = lastlogsize;
	}

	/* no value - both strings are empty, so there is nothing to copy */
	if (0 != (slot->flags & ZBX_DC_FLAG_NOVALUE))
	{
		slot->value.value_str.len = 0;
		slot->source.len = 0;
		return;
	}

	slot->severity = log->severity;
	slot->logeventid = log->logeventid;
	slot->timestamp = log->timestamp;

	slot->value.value_str.len = zbx_db_strlen_n(log->value, ZBX_HISTORY_VALUE_LEN) + 1;

	if (NULL == log->source || '\0' == *log->source)
		slot->source.len = 0;
	else
		slot->source.len = zbx_db_strlen_n(log->source, HISTORY_LOG_SOURCE_LEN) + 1;

	if (0 == slot->value.value_str.len + slot->source.len)
		return;

	/* reserve room for both strings at once, then append them one after another */
	dc_string_buffer_realloc(slot->value.value_str.len + slot->source.len);

	if (0 != slot->value.value_str.len)
	{
		slot->value.value_str.pvalue = string_values_offset;
		memcpy(string_values + string_values_offset, log->value, slot->value.value_str.len);
		string_values_offset += slot->value.value_str.len;
	}

	if (0 != slot->source.len)
	{
		slot->source.pvalue = string_values_offset;
		memcpy(string_values + string_values_offset, log->source, slot->source.len);
		string_values_offset += slot->source.len;
	}
}
3517
dc_local_add_history_notsupported(zbx_uint64_t itemid,const zbx_timespec_t * ts,const char * error,zbx_uint64_t lastlogsize,int mtime,unsigned char flags)3518 static void dc_local_add_history_notsupported(zbx_uint64_t itemid, const zbx_timespec_t *ts, const char *error,
3519 zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
3520 {
3521 dc_item_value_t *item_value;
3522
3523 item_value = dc_local_get_history_slot();
3524
3525 item_value->itemid = itemid;
3526 item_value->ts = *ts;
3527 item_value->state = ITEM_STATE_NOTSUPPORTED;
3528 item_value->flags = flags;
3529
3530 if (0 != (item_value->flags & ZBX_DC_FLAG_META))
3531 {
3532 item_value->lastlogsize = lastlogsize;
3533 item_value->mtime = mtime;
3534 }
3535
3536 item_value->value.value_str.len = zbx_db_strlen_n(error, ITEM_ERROR_LEN) + 1;
3537 dc_string_buffer_realloc(item_value->value.value_str.len);
3538 item_value->value.value_str.pvalue = string_values_offset;
3539 memcpy(&string_values[string_values_offset], error, item_value->value.value_str.len);
3540 string_values_offset += item_value->value.value_str.len;
3541 }
3542
/******************************************************************************
 *                                                                            *
 * Function: dc_local_add_history_lld                                         *
 *                                                                            *
 * Purpose: store a low-level discovery value in the local history cache      *
 *                                                                            *
 * Comments: the whole value including its terminating zero byte is appended  *
 *           to the local string buffer                                       *
 *                                                                            *
 ******************************************************************************/
static void	dc_local_add_history_lld(zbx_uint64_t itemid, const zbx_timespec_t *ts, const char *value_orig)
{
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->itemid = itemid;
	slot->ts = *ts;
	slot->flags = ZBX_DC_FLAG_LLD;
	slot->state = ITEM_STATE_NORMAL;
	slot->value.value_str.len = strlen(value_orig) + 1;

	dc_string_buffer_realloc(slot->value.value_str.len);

	slot->value.value_str.pvalue = string_values_offset;
	memcpy(string_values + string_values_offset, value_orig, slot->value.value_str.len);
	string_values_offset += slot->value.value_str.len;
}
3560
/******************************************************************************
 *                                                                            *
 * Function: dc_local_add_history_empty                                       *
 *                                                                            *
 * Purpose: store an entry without a value (ITEM_VALUE_TYPE_NONE) in the      *
 *          local history cache                                               *
 *                                                                            *
 ******************************************************************************/
static void	dc_local_add_history_empty(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		unsigned char flags)
{
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->itemid = itemid;
	slot->ts = *ts;
	slot->flags = flags;
	slot->state = ITEM_STATE_NORMAL;
	slot->value_type = ITEM_VALUE_TYPE_NONE;
	slot->item_value_type = item_value_type;
}
3575
3576 /******************************************************************************
3577 * *
3578 * Function: dc_add_history *
3579 * *
3580 * Purpose: add new value to the cache *
3581 * *
3582 * Parameters: itemid - [IN] the itemid *
3583 * item_value_type - [IN] the item value type *
3584 * item_flags - [IN] the item flags (e. g. lld rule) *
3585 * result - [IN] agent result containing the value *
3586 * to add *
3587 * ts - [IN] the value timestamp *
3588 * state - [IN] the item state *
3589 * error - [IN] the error message in case item state *
3590 * is ITEM_STATE_NOTSUPPORTED *
3591 * *
3592 ******************************************************************************/
dc_add_history(zbx_uint64_t itemid,unsigned char item_value_type,unsigned char item_flags,AGENT_RESULT * result,const zbx_timespec_t * ts,unsigned char state,const char * error)3593 void dc_add_history(zbx_uint64_t itemid, unsigned char item_value_type, unsigned char item_flags,
3594 AGENT_RESULT *result, const zbx_timespec_t *ts, unsigned char state, const char *error)
3595 {
3596 unsigned char value_flags;
3597
3598 if (ITEM_STATE_NOTSUPPORTED == state)
3599 {
3600 zbx_uint64_t lastlogsize;
3601 int mtime;
3602
3603 if (NULL != result && 0 != ISSET_META(result))
3604 {
3605 value_flags = ZBX_DC_FLAG_META;
3606 lastlogsize = result->lastlogsize;
3607 mtime = result->mtime;
3608 }
3609 else
3610 {
3611 value_flags = 0;
3612 lastlogsize = 0;
3613 mtime = 0;
3614 }
3615 dc_local_add_history_notsupported(itemid, ts, error, lastlogsize, mtime, value_flags);
3616 return;
3617 }
3618
3619 /* allow proxy to send timestamps of empty (throttled etc) values to update nextchecks for queue */
3620 if (!ISSET_VALUE(result) && !ISSET_META(result) && 0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3621 return;
3622
3623 value_flags = 0;
3624
3625 if (!ISSET_VALUE(result))
3626 value_flags |= ZBX_DC_FLAG_NOVALUE;
3627
3628 if (ISSET_META(result))
3629 value_flags |= ZBX_DC_FLAG_META;
3630
3631 /* Add data to the local history cache if: */
3632 /* 1) the NOVALUE flag is set (data contains either meta information or timestamp) */
3633 /* 2) the NOVALUE flag is not set and value conversion succeeded */
3634
3635 if (0 == (value_flags & ZBX_DC_FLAG_NOVALUE))
3636 {
3637 if (0 != (ZBX_FLAG_DISCOVERY_RULE & item_flags))
3638 {
3639 if (NULL == GET_TEXT_RESULT(result))
3640 return;
3641
3642 /* proxy stores low-level discovery (lld) values in db */
3643 if (0 == (ZBX_PROGRAM_TYPE_SERVER & program_type))
3644 dc_local_add_history_lld(itemid, ts, result->text);
3645
3646 return;
3647 }
3648
3649 if (ISSET_LOG(result))
3650 {
3651 dc_local_add_history_log(itemid, item_value_type, ts, result->log, result->lastlogsize,
3652 result->mtime, value_flags);
3653 }
3654 else if (ISSET_UI64(result))
3655 {
3656 dc_local_add_history_uint(itemid, item_value_type, ts, result->ui64, result->lastlogsize,
3657 result->mtime, value_flags);
3658 }
3659 else if (ISSET_DBL(result))
3660 {
3661 dc_local_add_history_dbl(itemid, item_value_type, ts, result->dbl, result->lastlogsize,
3662 result->mtime, value_flags);
3663 }
3664 else if (ISSET_STR(result))
3665 {
3666 dc_local_add_history_text(itemid, item_value_type, ts, result->str, result->lastlogsize,
3667 result->mtime, value_flags);
3668 }
3669 else if (ISSET_TEXT(result))
3670 {
3671 dc_local_add_history_text(itemid, item_value_type, ts, result->text, result->lastlogsize,
3672 result->mtime, value_flags);
3673 }
3674 else
3675 {
3676 THIS_SHOULD_NEVER_HAPPEN;
3677 }
3678 }
3679 else
3680 {
3681 if (0 != (value_flags & ZBX_DC_FLAG_META))
3682 {
3683 dc_local_add_history_log(itemid, item_value_type, ts, NULL, result->lastlogsize, result->mtime,
3684 value_flags);
3685 }
3686 else
3687 dc_local_add_history_empty(itemid, item_value_type, ts, value_flags);
3688 }
3689 }
3690
dc_flush_history(void)3691 void dc_flush_history(void)
3692 {
3693 if (0 == item_values_num)
3694 return;
3695
3696 LOCK_CACHE;
3697
3698 hc_add_item_values(item_values, item_values_num);
3699
3700 cache->history_num += item_values_num;
3701
3702 UNLOCK_CACHE;
3703
3704 item_values_num = 0;
3705 string_values_offset = 0;
3706 }
3707
3708 /******************************************************************************
3709 * *
3710 * history cache storage *
3711 * *
3712 ******************************************************************************/
/* Instantiate the memory helper functions for the hc_index_mem and hc_mem memory infos.    */
/* The first macro argument is the name prefix of the generated functions, e.g.            */
/* __hc_mem_free_func() used by hc_free_data() comes from the "__hc" instantiation.        */
/* NOTE(review): the full set of generated functions is defined by ZBX_MEM_FUNC_IMPL,      */
/* which is not visible here - confirm against its definition.                             */
ZBX_MEM_FUNC_IMPL(__hc_index, hc_index_mem)
ZBX_MEM_FUNC_IMPL(__hc, hc_mem)
3715
3716 /******************************************************************************
3717 * *
3718 * Function: hc_queue_elem_compare_func *
3719 * *
3720 * Purpose: compares history queue elements *
3721 * *
3722 ******************************************************************************/
3723 static int hc_queue_elem_compare_func(const void *d1, const void *d2)
3724 {
3725 const zbx_binary_heap_elem_t *e1 = (const zbx_binary_heap_elem_t *)d1;
3726 const zbx_binary_heap_elem_t *e2 = (const zbx_binary_heap_elem_t *)d2;
3727
3728 const zbx_hc_item_t *item1 = (const zbx_hc_item_t *)e1->data;
3729 const zbx_hc_item_t *item2 = (const zbx_hc_item_t *)e2->data;
3730
3731 /* compare by timestamp of the oldest value */
3732 return zbx_timespec_compare(&item1->tail->ts, &item2->tail->ts);
3733 }
3734
3735 /******************************************************************************
3736 * *
3737 * Function: hc_free_data *
3738 * *
3739 * Purpose: free history item data allocated in history cache *
3740 * *
3741 * Parameters: data - [IN] history item data *
3742 * *
3743 ******************************************************************************/
static void	hc_free_data(zbx_hc_data_t *data)
{
	if (ITEM_STATE_NOTSUPPORTED == data->state)
	{
		/* value.str is released regardless of the value type for not supported data */
		__hc_mem_free_func(data->value.str);
	}
	else if (0 == (data->flags & ZBX_DC_FLAG_NOVALUE))
	{
		/* only string, text and log values own separately allocated memory */
		if (ITEM_VALUE_TYPE_STR == data->value_type || ITEM_VALUE_TYPE_TEXT == data->value_type)
		{
			__hc_mem_free_func(data->value.str);
		}
		else if (ITEM_VALUE_TYPE_LOG == data->value_type)
		{
			__hc_mem_free_func(data->value.log->value);

			/* log source is optional */
			if (NULL != data->value.log->source)
				__hc_mem_free_func(data->value.log->source);

			__hc_mem_free_func(data->value.log);
		}
	}

	__hc_mem_free_func(data);
}
3774
3775 /******************************************************************************
3776 * *
3777 * Function: hc_queue_item *
3778 * *
3779 * Purpose: put back item into history queue *
3780 * *
3781 * Parameters: data - [IN] history item data *
3782 * *
3783 ******************************************************************************/
hc_queue_item(zbx_hc_item_t * item)3784 static void hc_queue_item(zbx_hc_item_t *item)
3785 {
3786 zbx_binary_heap_elem_t elem = {item->itemid, (const void *)item};
3787
3788 zbx_binary_heap_insert(&cache->history_queue, &elem);
3789 }
3790
3791 /******************************************************************************
3792 * *
3793 * Function: hc_get_item *
3794 * *
3795 * Purpose: returns history item by itemid *
3796 * *
3797 * Parameters: itemid - [IN] the item id *
3798 * *
3799 * Return value: the history item or NULL if the requested item is not in *
3800 * history cache *
3801 * *
3802 ******************************************************************************/
hc_get_item(zbx_uint64_t itemid)3803 static zbx_hc_item_t *hc_get_item(zbx_uint64_t itemid)
3804 {
3805 return (zbx_hc_item_t *)zbx_hashset_search(&cache->history_items, &itemid);
3806 }
3807
3808 /******************************************************************************
3809 * *
3810 * Function: hc_add_item *
3811 * *
3812 * Purpose: adds a new item to history cache *
3813 * *
3814 * Parameters: itemid - [IN] the item id *
3815 * [IN] the item data *
3816 * *
3817 * Return value: the added history item *
3818 * *
3819 ******************************************************************************/
hc_add_item(zbx_uint64_t itemid,zbx_hc_data_t * data)3820 static zbx_hc_item_t *hc_add_item(zbx_uint64_t itemid, zbx_hc_data_t *data)
3821 {
3822 zbx_hc_item_t item_local = {itemid, ZBX_HC_ITEM_STATUS_NORMAL, 0, data, data};
3823
3824 return (zbx_hc_item_t *)zbx_hashset_insert(&cache->history_items, &item_local, sizeof(item_local));
3825 }
3826
3827 /******************************************************************************
3828 * *
3829 * Function: hc_mem_value_str_dup *
3830 * *
3831 * Purpose: copies string value to history cache *
3832 * *
3833 * Parameters: str - [IN] the string value *
3834 * *
3835 * Return value: the copied string or NULL if there was not enough memory *
3836 * *
3837 ******************************************************************************/
hc_mem_value_str_dup(const dc_value_str_t * str)3838 static char *hc_mem_value_str_dup(const dc_value_str_t *str)
3839 {
3840 char *ptr;
3841
3842 if (NULL == (ptr = (char *)__hc_mem_malloc_func(NULL, str->len)))
3843 return NULL;
3844
3845 memcpy(ptr, &string_values[str->pvalue], str->len - 1);
3846 ptr[str->len - 1] = '\0';
3847
3848 return ptr;
3849 }
3850
3851 /******************************************************************************
3852 * *
3853 * Function: hc_clone_history_str_data *
3854 * *
3855 * Purpose: clones string value into history data memory *
3856 * *
3857 * Parameters: dst - [IN/OUT] a reference to the cloned value *
3858 * str - [IN] the string value to clone *
3859 * *
3860 * Return value: SUCCESS - either there was no need to clone the string *
3861 * (it was empty or already cloned) or the string was *
3862 * cloned successfully *
3863 * FAIL - not enough memory *
3864 * *
3865 * Comments: This function can be called in loop with the same dst value *
3866 * until it finishes cloning string value. *
3867 * *
3868 ******************************************************************************/
hc_clone_history_str_data(char ** dst,const dc_value_str_t * str)3869 static int hc_clone_history_str_data(char **dst, const dc_value_str_t *str)
3870 {
3871 if (0 == str->len)
3872 return SUCCEED;
3873
3874 if (NULL != *dst)
3875 return SUCCEED;
3876
3877 if (NULL != (*dst = hc_mem_value_str_dup(str)))
3878 return SUCCEED;
3879
3880 return FAIL;
3881 }
3882
3883 /******************************************************************************
3884 * *
3885 * Function: hc_clone_history_log_data *
3886 * *
3887 * Purpose: clones log value into history data memory *
3888 * *
3889 * Parameters: dst - [IN/OUT] a reference to the cloned value *
3890 * item_value - [IN] the log value to clone *
3891 * *
3892 * Return value: SUCCESS - the log value was cloned successfully *
3893 * FAIL - not enough memory *
3894 * *
3895 * Comments: This function can be called in loop with the same dst value *
3896 * until it finishes cloning log value. *
3897 * *
3898 ******************************************************************************/
hc_clone_history_log_data(zbx_log_value_t ** dst,const dc_item_value_t * item_value)3899 static int hc_clone_history_log_data(zbx_log_value_t **dst, const dc_item_value_t *item_value)
3900 {
3901 if (NULL == *dst)
3902 {
3903 /* using realloc instead of malloc just to suppress 'not used' warning for realloc */
3904 if (NULL == (*dst = (zbx_log_value_t *)__hc_mem_realloc_func(NULL, sizeof(zbx_log_value_t))))
3905 return FAIL;
3906
3907 memset(*dst, 0, sizeof(zbx_log_value_t));
3908 }
3909
3910 if (SUCCEED != hc_clone_history_str_data(&(*dst)->value, &item_value->value.value_str))
3911 return FAIL;
3912
3913 if (SUCCEED != hc_clone_history_str_data(&(*dst)->source, &item_value->source))
3914 return FAIL;
3915
3916 (*dst)->logeventid = item_value->logeventid;
3917 (*dst)->severity = item_value->severity;
3918 (*dst)->timestamp = item_value->timestamp;
3919
3920 return SUCCEED;
3921 }
3922
3923 /******************************************************************************
3924 * *
3925 * Function: hc_clone_history_data *
3926 * *
3927 * Purpose: clones item value from local cache into history cache *
3928 * *
3929 * Parameters: data - [IN/OUT] a reference to the cloned value *
3930 * item_value - [IN] the item value *
3931 * *
3932 * Return value: SUCCESS - the item value was cloned successfully *
3933 * FAIL - not enough memory *
3934 * *
3935 * Comments: This function can be called in loop with the same data value *
3936 * until it finishes cloning item value. *
3937 * *
3938 ******************************************************************************/
hc_clone_history_data(zbx_hc_data_t ** data,const dc_item_value_t * item_value)3939 static int hc_clone_history_data(zbx_hc_data_t **data, const dc_item_value_t *item_value)
3940 {
3941 if (NULL == *data)
3942 {
3943 if (NULL == (*data = (zbx_hc_data_t *)__hc_mem_malloc_func(NULL, sizeof(zbx_hc_data_t))))
3944 return FAIL;
3945
3946 memset(*data, 0, sizeof(zbx_hc_data_t));
3947
3948 (*data)->state = item_value->state;
3949 (*data)->ts = item_value->ts;
3950 (*data)->flags = item_value->flags;
3951 }
3952
3953 if (0 != (ZBX_DC_FLAG_META & item_value->flags))
3954 {
3955 (*data)->lastlogsize = item_value->lastlogsize;
3956 (*data)->mtime = item_value->mtime;
3957 }
3958
3959 if (ITEM_STATE_NOTSUPPORTED == item_value->state)
3960 {
3961 if (NULL == ((*data)->value.str = hc_mem_value_str_dup(&item_value->value.value_str)))
3962 return FAIL;
3963
3964 (*data)->value_type = item_value->value_type;
3965 cache->stats.notsupported_counter++;
3966
3967 return SUCCEED;
3968 }
3969
3970 if (0 != (ZBX_DC_FLAG_LLD & item_value->flags))
3971 {
3972 if (NULL == ((*data)->value.str = hc_mem_value_str_dup(&item_value->value.value_str)))
3973 return FAIL;
3974
3975 (*data)->value_type = ITEM_VALUE_TYPE_TEXT;
3976
3977 cache->stats.history_text_counter++;
3978 cache->stats.history_counter++;
3979
3980 return SUCCEED;
3981 }
3982
3983 if (0 == (ZBX_DC_FLAG_NOVALUE & item_value->flags))
3984 {
3985 switch (item_value->value_type)
3986 {
3987 case ITEM_VALUE_TYPE_FLOAT:
3988 (*data)->value.dbl = item_value->value.value_dbl;
3989 break;
3990 case ITEM_VALUE_TYPE_UINT64:
3991 (*data)->value.ui64 = item_value->value.value_uint;
3992 break;
3993 case ITEM_VALUE_TYPE_STR:
3994 if (SUCCEED != hc_clone_history_str_data(&(*data)->value.str,
3995 &item_value->value.value_str))
3996 {
3997 return FAIL;
3998 }
3999 break;
4000 case ITEM_VALUE_TYPE_TEXT:
4001 if (SUCCEED != hc_clone_history_str_data(&(*data)->value.str,
4002 &item_value->value.value_str))
4003 {
4004 return FAIL;
4005 }
4006 break;
4007 case ITEM_VALUE_TYPE_LOG:
4008 if (SUCCEED != hc_clone_history_log_data(&(*data)->value.log, item_value))
4009 return FAIL;
4010 break;
4011 }
4012
4013 switch (item_value->item_value_type)
4014 {
4015 case ITEM_VALUE_TYPE_FLOAT:
4016 cache->stats.history_float_counter++;
4017 break;
4018 case ITEM_VALUE_TYPE_UINT64:
4019 cache->stats.history_uint_counter++;
4020 break;
4021 case ITEM_VALUE_TYPE_STR:
4022 cache->stats.history_str_counter++;
4023 break;
4024 case ITEM_VALUE_TYPE_TEXT:
4025 cache->stats.history_text_counter++;
4026 break;
4027 case ITEM_VALUE_TYPE_LOG:
4028 cache->stats.history_log_counter++;
4029 break;
4030 }
4031
4032 cache->stats.history_counter++;
4033 }
4034
4035 (*data)->value_type = item_value->value_type;
4036
4037 return SUCCEED;
4038 }
4039
4040 /******************************************************************************
4041 * *
4042 * Function: hc_add_item_values *
4043 * *
4044 * Purpose: adds item values to the history cache *
4045 * *
4046 * Parameters: values - [IN] the item values to add *
4047 * values_num - [IN] the number of item values to add *
4048 * *
4049 * Comments: If the history cache is full this function will wait until *
4050 * history syncers processes values freeing enough space to store *
4051 * the new value. *
4052 * *
4053 ******************************************************************************/
hc_add_item_values(dc_item_value_t * values,int values_num)4054 static void hc_add_item_values(dc_item_value_t *values, int values_num)
4055 {
4056 dc_item_value_t *item_value;
4057 int i;
4058 zbx_hc_item_t *item;
4059
4060 for (i = 0; i < values_num; i++)
4061 {
4062 zbx_hc_data_t *data = NULL;
4063
4064 item_value = &values[i];
4065
4066 /* a record with metadata and no value can be dropped if */
4067 /* the metadata update is copied to the last queued value */
4068 if (NULL != (item = hc_get_item(item_value->itemid)) &&
4069 0 != (item_value->flags & ZBX_DC_FLAG_NOVALUE) &&
4070 0 != (item_value->flags & ZBX_DC_FLAG_META))
4071 {
4072 /* skip metadata updates when only one value is queued, */
4073 /* because the item might be already being processed */
4074 if (item->head != item->tail)
4075 {
4076 item->head->lastlogsize = item_value->lastlogsize;
4077 item->head->mtime = item_value->mtime;
4078 item->head->flags |= ZBX_DC_FLAG_META;
4079 continue;
4080 }
4081 }
4082
4083 if (SUCCEED != hc_clone_history_data(&data, item_value))
4084 {
4085 do
4086 {
4087 UNLOCK_CACHE;
4088
4089 zabbix_log(LOG_LEVEL_DEBUG, "History cache is full. Sleeping for 1 second.");
4090 sleep(1);
4091
4092 LOCK_CACHE;
4093 }
4094 while (SUCCEED != hc_clone_history_data(&data, item_value));
4095
4096 item = hc_get_item(item_value->itemid);
4097 }
4098
4099 if (NULL == item)
4100 {
4101 item = hc_add_item(item_value->itemid, data);
4102 hc_queue_item(item);
4103 }
4104 else
4105 {
4106 item->head->next = data;
4107 item->head = data;
4108 }
4109 item->values_num++;
4110 }
4111 }
4112
4113 /******************************************************************************
4114 * *
4115 * Function: hc_copy_history_data *
4116 * *
4117 * Purpose: copies item value from history cache into the specified history *
4118 * value *
4119 * *
4120 * Parameters: history - [OUT] the history value *
4121 * itemid - [IN] the item identifier *
4122 * data - [IN] the history data to copy *
4123 * *
4124 * Comments: handling of uninitialized fields in dc_add_proxy_history_log() *
4125 * *
4126 ******************************************************************************/
hc_copy_history_data(ZBX_DC_HISTORY * history,zbx_uint64_t itemid,zbx_hc_data_t * data)4127 static void hc_copy_history_data(ZBX_DC_HISTORY *history, zbx_uint64_t itemid, zbx_hc_data_t *data)
4128 {
4129 history->itemid = itemid;
4130 history->ts = data->ts;
4131 history->state = data->state;
4132 history->flags = data->flags;
4133 history->lastlogsize = data->lastlogsize;
4134 history->mtime = data->mtime;
4135
4136 if (ITEM_STATE_NOTSUPPORTED == data->state)
4137 {
4138 history->value.err = zbx_strdup(NULL, data->value.str);
4139 history->flags |= ZBX_DC_FLAG_UNDEF;
4140 return;
4141 }
4142
4143 history->value_type = data->value_type;
4144
4145 if (0 == (ZBX_DC_FLAG_NOVALUE & data->flags))
4146 {
4147 switch (data->value_type)
4148 {
4149 case ITEM_VALUE_TYPE_FLOAT:
4150 history->value.dbl = data->value.dbl;
4151 break;
4152 case ITEM_VALUE_TYPE_UINT64:
4153 history->value.ui64 = data->value.ui64;
4154 break;
4155 case ITEM_VALUE_TYPE_STR:
4156 case ITEM_VALUE_TYPE_TEXT:
4157 history->value.str = zbx_strdup(NULL, data->value.str);
4158 break;
4159 case ITEM_VALUE_TYPE_LOG:
4160 history->value.log = (zbx_log_value_t *)zbx_malloc(NULL, sizeof(zbx_log_value_t));
4161 history->value.log->value = zbx_strdup(NULL, data->value.log->value);
4162
4163 if (NULL != data->value.log->source)
4164 history->value.log->source = zbx_strdup(NULL, data->value.log->source);
4165 else
4166 history->value.log->source = NULL;
4167
4168 history->value.log->timestamp = data->value.log->timestamp;
4169 history->value.log->severity = data->value.log->severity;
4170 history->value.log->logeventid = data->value.log->logeventid;
4171
4172 break;
4173 }
4174 }
4175 }
4176
4177 /******************************************************************************
4178 * *
4179 * Function: hc_pop_items *
4180 * *
4181 * Purpose: pops the next batch of history items from cache for processing *
4182 * *
4183 * Parameters: history_items - [OUT] the locked history items *
4184 * *
4185 * Comments: The history_items must be returned back to history cache with *
4186 * hc_push_items() function after they have been processed. *
4187 * *
4188 ******************************************************************************/
hc_pop_items(zbx_vector_ptr_t * history_items)4189 static void hc_pop_items(zbx_vector_ptr_t *history_items)
4190 {
4191 zbx_binary_heap_elem_t *elem;
4192 zbx_hc_item_t *item;
4193
4194 while (ZBX_HC_SYNC_MAX > history_items->values_num && FAIL == zbx_binary_heap_empty(&cache->history_queue))
4195 {
4196 elem = zbx_binary_heap_find_min(&cache->history_queue);
4197 item = (zbx_hc_item_t *)elem->data;
4198 zbx_vector_ptr_append(history_items, item);
4199
4200 zbx_binary_heap_remove_min(&cache->history_queue);
4201 }
4202 }
4203
4204 /******************************************************************************
4205 * *
4206 * Function: hc_get_item_values *
4207 * *
4208 * Purpose: gets item history values *
4209 * *
4210 * Parameters: history - [OUT] the history valeus *
4211 * history_items - [IN] the history items *
4212 * *
4213 ******************************************************************************/
hc_get_item_values(ZBX_DC_HISTORY * history,zbx_vector_ptr_t * history_items)4214 static void hc_get_item_values(ZBX_DC_HISTORY *history, zbx_vector_ptr_t *history_items)
4215 {
4216 int i, history_num = 0;
4217 zbx_hc_item_t *item;
4218
4219 /* we don't need to lock history cache because no other processes can */
4220 /* change item's history data until it is pushed back to history queue */
4221 for (i = 0; i < history_items->values_num; i++)
4222 {
4223 item = (zbx_hc_item_t *)history_items->values[i];
4224
4225 if (ZBX_HC_ITEM_STATUS_BUSY == item->status)
4226 continue;
4227
4228 hc_copy_history_data(&history[history_num++], item->itemid, item->tail);
4229 }
4230 }
4231
4232 /******************************************************************************
4233 * *
4234 * Function: hc_push_processed_items *
4235 * *
4236 * Purpose: push back the processed history items into history cache *
4237 * *
4238 * Parameters: history_items - [IN] the history items containing processed *
4239 * (available) and busy items *
4240 * *
4241 * Comments: This function removes processed value from history cache. *
4242 * If there is no more data for this item, then the item itself is *
4243 * removed from history index. *
4244 * *
4245 ******************************************************************************/
hc_push_items(zbx_vector_ptr_t * history_items)4246 void hc_push_items(zbx_vector_ptr_t *history_items)
4247 {
4248 int i;
4249 zbx_hc_item_t *item;
4250 zbx_hc_data_t *data_free;
4251
4252 for (i = 0; i < history_items->values_num; i++)
4253 {
4254 item = (zbx_hc_item_t *)history_items->values[i];
4255
4256 switch (item->status)
4257 {
4258 case ZBX_HC_ITEM_STATUS_BUSY:
4259 /* reset item status before returning it to queue */
4260 item->status = ZBX_HC_ITEM_STATUS_NORMAL;
4261 hc_queue_item(item);
4262 break;
4263 case ZBX_HC_ITEM_STATUS_NORMAL:
4264 item->values_num--;
4265 data_free = item->tail;
4266 item->tail = item->tail->next;
4267 hc_free_data(data_free);
4268 if (NULL == item->tail)
4269 zbx_hashset_remove(&cache->history_items, item);
4270 else
4271 hc_queue_item(item);
4272 break;
4273 }
4274 }
4275 }
4276
4277 /******************************************************************************
4278 * *
4279 * Function: hc_queue_get_size *
4280 * *
4281 * Purpose: retrieve the size of history queue *
4282 * *
4283 ******************************************************************************/
hc_queue_get_size(void)4284 int hc_queue_get_size(void)
4285 {
4286 return cache->history_queue.elems_num;
4287 }
4288
/******************************************************************************
 *
 * Function: hc_get_history_compression_age
 *
 * Purpose: calculates the history compression age from the database
 *          extension configuration
 *
 * Return value: the current time minus the configured history_compress_older
 *               value when history compression is available, enabled and
 *               history_compress_older is not zero;
 *               0 otherwise, and always 0 when built without PostgreSQL
 *               support
 *
 ******************************************************************************/
int	hc_get_history_compression_age(void)
{
	int		compression_age = 0;
#if defined(HAVE_POSTGRESQL)
	zbx_config_t	cfg;

	zbx_config_get(&cfg, ZBX_CONFIG_FLAGS_DB_EXTENSION);

	if (ON == cfg.db.history_compression_availability && ON == cfg.db.history_compression_status &&
			0 != cfg.db.history_compress_older)
	{
		compression_age = (int)time(NULL) - cfg.db.history_compress_older;
	}

	zbx_config_clean(&cfg);
#endif
	return compression_age;
}
4311
4312 /******************************************************************************
4313 * *
4314 * Function: init_trend_cache *
4315 * *
4316 * Purpose: Allocate shared memory for trend cache (part of database cache) *
4317 * *
4318 * Author: Vladimir Levijev *
4319 * *
4320 * Comments: Is optionally called from init_database_cache() *
4321 * *
4322 ******************************************************************************/
4323
ZBX_MEM_FUNC_IMPL(__trend,trend_mem)4324 ZBX_MEM_FUNC_IMPL(__trend, trend_mem)
4325
4326 static int init_trend_cache(char **error)
4327 {
4328 size_t sz;
4329 int ret;
4330
4331 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
4332
4333 if (SUCCEED != (ret = zbx_mutex_create(&trends_lock, ZBX_MUTEX_TRENDS, error)))
4334 goto out;
4335
4336 sz = zbx_mem_required_size(1, "trend cache", "TrendCacheSize");
4337 if (SUCCEED != (ret = zbx_mem_create(&trend_mem, CONFIG_TRENDS_CACHE_SIZE, "trend cache", "TrendCacheSize", 0,
4338 error)))
4339 {
4340 goto out;
4341 }
4342
4343 CONFIG_TRENDS_CACHE_SIZE -= sz;
4344
4345 cache->trends_num = 0;
4346 cache->trends_last_cleanup_hour = 0;
4347
4348 #define INIT_HASHSET_SIZE 100 /* Should be calculated dynamically based on trends size? */
4349 /* Still does not make sense to have it more than initial */
4350 /* item hashset size in configuration cache. */
4351
4352 zbx_hashset_create_ext(&cache->trends, INIT_HASHSET_SIZE,
4353 ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL,
4354 __trend_mem_malloc_func, __trend_mem_realloc_func, __trend_mem_free_func);
4355
4356 #undef INIT_HASHSET_SIZE
4357 out:
4358 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
4359
4360 return ret;
4361 }
4362
4363 /******************************************************************************
4364 * *
4365 * Function: init_database_cache *
4366 * *
4367 * Purpose: Allocate shared memory for database cache *
4368 * *
4369 * Author: Alexei Vladishev, Alexander Vladishev *
4370 * *
4371 ******************************************************************************/
init_database_cache(char ** error)4372 int init_database_cache(char **error)
4373 {
4374 int ret;
4375
4376 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
4377
4378 if (SUCCEED != (ret = zbx_mutex_create(&cache_lock, ZBX_MUTEX_CACHE, error)))
4379 goto out;
4380
4381 if (SUCCEED != (ret = zbx_mutex_create(&cache_ids_lock, ZBX_MUTEX_CACHE_IDS, error)))
4382 goto out;
4383
4384 if (SUCCEED != (ret = zbx_mem_create(&hc_mem, CONFIG_HISTORY_CACHE_SIZE, "history cache",
4385 "HistoryCacheSize", 1, error)))
4386 {
4387 goto out;
4388 }
4389
4390 if (SUCCEED != (ret = zbx_mem_create(&hc_index_mem, CONFIG_HISTORY_INDEX_CACHE_SIZE, "history index cache",
4391 "HistoryIndexCacheSize", 0, error)))
4392 {
4393 goto out;
4394 }
4395
4396 cache = (ZBX_DC_CACHE *)__hc_index_mem_malloc_func(NULL, sizeof(ZBX_DC_CACHE));
4397 memset(cache, 0, sizeof(ZBX_DC_CACHE));
4398
4399 ids = (ZBX_DC_IDS *)__hc_index_mem_malloc_func(NULL, sizeof(ZBX_DC_IDS));
4400 memset(ids, 0, sizeof(ZBX_DC_IDS));
4401
4402 zbx_hashset_create_ext(&cache->history_items, ZBX_HC_ITEMS_INIT_SIZE,
4403 ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL,
4404 __hc_index_mem_malloc_func, __hc_index_mem_realloc_func, __hc_index_mem_free_func);
4405
4406 zbx_binary_heap_create_ext(&cache->history_queue, hc_queue_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY,
4407 __hc_index_mem_malloc_func, __hc_index_mem_realloc_func, __hc_index_mem_free_func);
4408
4409 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
4410 {
4411 zbx_hashset_create_ext(&(cache->proxyqueue.index), ZBX_HC_SYNC_MAX,
4412 ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL,
4413 __hc_index_mem_malloc_func, __hc_index_mem_realloc_func, __hc_index_mem_free_func);
4414
4415 zbx_list_create_ext(&(cache->proxyqueue.list), __hc_index_mem_malloc_func, __hc_index_mem_free_func);
4416
4417 cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_NORMAL;
4418
4419 if (SUCCEED != (ret = init_trend_cache(error)))
4420 goto out;
4421 }
4422
4423 cache->history_num_total = 0;
4424 cache->history_progress_ts = 0;
4425
4426 if (NULL == sql)
4427 sql = (char *)zbx_malloc(sql, sql_alloc);
4428 out:
4429 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
4430
4431 return ret;
4432 }
4433
4434 /******************************************************************************
4435 * *
4436 * Function: DCsync_all *
4437 * *
4438 * Purpose: writes updates and new data from pool and cache data to database *
4439 * *
4440 * Author: Alexei Vladishev *
4441 * *
4442 ******************************************************************************/
DCsync_all(void)4443 static void DCsync_all(void)
4444 {
4445 zabbix_log(LOG_LEVEL_DEBUG, "In DCsync_all()");
4446
4447 sync_history_cache_full();
4448 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
4449 DCsync_trends();
4450
4451 zabbix_log(LOG_LEVEL_DEBUG, "End of DCsync_all()");
4452 }
4453
4454 /******************************************************************************
4455 * *
4456 * Function: free_database_cache *
4457 * *
4458 * Purpose: Free memory allocated for database cache *
4459 * *
4460 * Author: Alexei Vladishev, Alexander Vladishev *
4461 * *
4462 ******************************************************************************/
free_database_cache(void)4463 void free_database_cache(void)
4464 {
4465 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
4466
4467 DCsync_all();
4468
4469 cache = NULL;
4470
4471 zbx_mutex_destroy(&cache_lock);
4472 zbx_mutex_destroy(&cache_ids_lock);
4473
4474 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
4475 zbx_mutex_destroy(&trends_lock);
4476
4477 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
4478 }
4479
4480 /******************************************************************************
4481 * *
4482 * Function: DCget_nextid *
4483 * *
4484 * Purpose: Return next id for requested table *
4485 * *
4486 * Author: Alexander Vladishev *
4487 * *
4488 ******************************************************************************/
DCget_nextid(const char * table_name,int num)4489 zbx_uint64_t DCget_nextid(const char *table_name, int num)
4490 {
4491 int i;
4492 DB_RESULT result;
4493 DB_ROW row;
4494 const ZBX_TABLE *table;
4495 ZBX_DC_ID *id;
4496 zbx_uint64_t min = 0, max = ZBX_DB_MAX_ID, nextid, lastid;
4497
4498 zabbix_log(LOG_LEVEL_DEBUG, "In %s() table:'%s' num:%d", __func__, table_name, num);
4499
4500 LOCK_CACHE_IDS;
4501
4502 for (i = 0; i < ZBX_IDS_SIZE; i++)
4503 {
4504 id = &ids->id[i];
4505 if ('\0' == *id->table_name)
4506 break;
4507
4508 if (0 == strcmp(id->table_name, table_name))
4509 {
4510 nextid = id->lastid + 1;
4511 id->lastid += num;
4512 lastid = id->lastid;
4513
4514 UNLOCK_CACHE_IDS;
4515
4516 zabbix_log(LOG_LEVEL_DEBUG, "End of %s() table:'%s' [" ZBX_FS_UI64 ":" ZBX_FS_UI64 "]",
4517 __func__, table_name, nextid, lastid);
4518
4519 return nextid;
4520 }
4521 }
4522
4523 if (i == ZBX_IDS_SIZE)
4524 {
4525 zabbix_log(LOG_LEVEL_ERR, "insufficient shared memory for ids");
4526 exit(EXIT_FAILURE);
4527 }
4528
4529 table = DBget_table(table_name);
4530
4531 result = DBselect("select max(%s) from %s where %s between " ZBX_FS_UI64 " and " ZBX_FS_UI64,
4532 table->recid, table_name, table->recid, min, max);
4533
4534 if (NULL != result)
4535 {
4536 zbx_strlcpy(id->table_name, table_name, sizeof(id->table_name));
4537
4538 if (NULL == (row = DBfetch(result)) || SUCCEED == DBis_null(row[0]))
4539 id->lastid = min;
4540 else
4541 ZBX_STR2UINT64(id->lastid, row[0]);
4542
4543 nextid = id->lastid + 1;
4544 id->lastid += num;
4545 lastid = id->lastid;
4546 }
4547 else
4548 nextid = lastid = 0;
4549
4550 UNLOCK_CACHE_IDS;
4551
4552 DBfree_result(result);
4553
4554 zabbix_log(LOG_LEVEL_DEBUG, "End of %s() table:'%s' [" ZBX_FS_UI64 ":" ZBX_FS_UI64 "]",
4555 __func__, table_name, nextid, lastid);
4556
4557 return nextid;
4558 }
4559
4560 /******************************************************************************
4561 * *
4562 * Function: DCupdate_hosts_availability *
4563 * *
4564 * Purpose: performs host availability reset for hosts with availability set *
4565 * on interfaces without enabled items *
4566 * *
4567 ******************************************************************************/
DCupdate_hosts_availability(void)4568 void DCupdate_hosts_availability(void)
4569 {
4570 zbx_vector_ptr_t hosts;
4571 char *sql_buf = NULL;
4572 size_t sql_buf_alloc = 0, sql_buf_offset = 0;
4573 int i;
4574
4575 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
4576
4577 zbx_vector_ptr_create(&hosts);
4578
4579 if (SUCCEED != DCreset_hosts_availability(&hosts))
4580 goto out;
4581
4582 DBbegin();
4583 DBbegin_multiple_update(&sql_buf, &sql_buf_alloc, &sql_buf_offset);
4584
4585 for (i = 0; i < hosts.values_num; i++)
4586 {
4587 if (SUCCEED != zbx_sql_add_host_availability(&sql_buf, &sql_buf_alloc, &sql_buf_offset,
4588 (zbx_host_availability_t *)hosts.values[i]))
4589 {
4590 continue;
4591 }
4592
4593 zbx_strcpy_alloc(&sql_buf, &sql_buf_alloc, &sql_buf_offset, ";\n");
4594 DBexecute_overflowed_sql(&sql_buf, &sql_buf_alloc, &sql_buf_offset);
4595 }
4596
4597 DBend_multiple_update(&sql_buf, &sql_buf_alloc, &sql_buf_offset);
4598
4599 if (16 < sql_buf_offset)
4600 DBexecute("%s", sql_buf);
4601
4602 DBcommit();
4603
4604 zbx_free(sql_buf);
4605 out:
4606 zbx_vector_ptr_clear_ext(&hosts, (zbx_mem_free_func_t)zbx_host_availability_free);
4607 zbx_vector_ptr_destroy(&hosts);
4608
4609 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
4610 }
4611
4612 /******************************************************************************
4613 * *
4614 * Function: zbx_hc_get_diag_stats *
4615 * *
4616 * Purpose: get history cache diagnostics statistics *
4617 * *
4618 ******************************************************************************/
zbx_hc_get_diag_stats(zbx_uint64_t * items_num,zbx_uint64_t * values_num)4619 void zbx_hc_get_diag_stats(zbx_uint64_t *items_num, zbx_uint64_t *values_num)
4620 {
4621 LOCK_CACHE;
4622
4623 *values_num = cache->history_num;
4624 *items_num = cache->history_items.num_data;
4625
4626 UNLOCK_CACHE;
4627 }
4628
4629 /******************************************************************************
4630 * *
4631 * Function: zbx_hc_get_mem_stats *
4632 * *
4633 * Purpose: get shared memory allocator statistics *
4634 * *
4635 ******************************************************************************/
zbx_hc_get_mem_stats(zbx_mem_stats_t * data,zbx_mem_stats_t * index)4636 void zbx_hc_get_mem_stats(zbx_mem_stats_t *data, zbx_mem_stats_t *index)
4637 {
4638 LOCK_CACHE;
4639
4640 if (NULL != data)
4641 zbx_mem_get_stats(hc_mem, data);
4642
4643 if (NULL != index)
4644 zbx_mem_get_stats(hc_index_mem, index);
4645
4646 UNLOCK_CACHE;
4647 }
4648
4649 /******************************************************************************
4650 * *
4651 * Function: zbx_hc_get_items *
4652 * *
4653 * Purpose: get statistics of cached items *
4654 * *
4655 ******************************************************************************/
zbx_hc_get_items(zbx_vector_uint64_pair_t * items)4656 void zbx_hc_get_items(zbx_vector_uint64_pair_t *items)
4657 {
4658 zbx_hashset_iter_t iter;
4659 zbx_hc_item_t *item;
4660
4661 LOCK_CACHE;
4662
4663 zbx_vector_uint64_pair_reserve(items, cache->history_items.num_data);
4664
4665 zbx_hashset_iter_reset(&cache->history_items, &iter);
4666 while (NULL != (item = (zbx_hc_item_t *)zbx_hashset_iter_next(&iter)))
4667 {
4668 zbx_uint64_pair_t pair = {item->itemid, item->values_num};
4669 zbx_vector_uint64_pair_append_ptr(items, &pair);
4670 }
4671
4672 UNLOCK_CACHE;
4673 }
4674
4675 /******************************************************************************
4676 * *
4677 * Function: zbx_hc_proxyqueue_peek *
4678 * *
4679 * Purpose: return first proxy in a queue, function assumes that a queue is *
4680 * not empty *
4681 * *
4682 * Return value: proxyid at the top a queue *
4683 * *
4684 ******************************************************************************/
zbx_hc_proxyqueue_peek(void)4685 static zbx_uint64_t zbx_hc_proxyqueue_peek(void)
4686 {
4687 zbx_uint64_t *p_val;
4688
4689 if (NULL == cache->proxyqueue.list.head)
4690 return 0;
4691
4692 p_val = (zbx_uint64_t *)(cache->proxyqueue.list.head->data);
4693
4694 return *p_val;
4695 }
4696
4697 /******************************************************************************
4698 * *
4699 * Function: zbx_hc_proxyqueue_enqueue *
4700 * *
4701 * Purpose: add new proxyid to a queue *
4702 * *
4703 * Parameters: proxyid - [IN] the proxy id *
4704 * *
4705 ******************************************************************************/
zbx_hc_proxyqueue_enqueue(zbx_uint64_t proxyid)4706 static void zbx_hc_proxyqueue_enqueue(zbx_uint64_t proxyid)
4707 {
4708 if (NULL == zbx_hashset_search(&cache->proxyqueue.index, &proxyid))
4709 {
4710 zbx_uint64_t *ptr;
4711
4712 ptr = zbx_hashset_insert(&cache->proxyqueue.index, &proxyid, sizeof(proxyid));
4713 zbx_list_append(&cache->proxyqueue.list, ptr, NULL);
4714 }
4715 }
4716
4717 /******************************************************************************
4718 * *
4719 * Function: zbx_hc_proxyqueue_dequeue *
4720 * *
4721 * Purpose: try to dequeue proxyid from a proxy queue *
4722 * *
4723 * Parameters: chk_proxyid - [IN] the proxyid *
4724 * *
4725 * Return value: SUCCEED - retrieval successful *
4726 * FAIL - otherwise *
4727 * *
4728 ******************************************************************************/
zbx_hc_proxyqueue_dequeue(zbx_uint64_t proxyid)4729 static int zbx_hc_proxyqueue_dequeue(zbx_uint64_t proxyid)
4730 {
4731 zbx_uint64_t top_val;
4732 void *rem_val = 0;
4733
4734 top_val = zbx_hc_proxyqueue_peek();
4735
4736 if (proxyid != top_val)
4737 return FAIL;
4738
4739 if (FAIL == zbx_list_pop(&cache->proxyqueue.list, &rem_val))
4740 return FAIL;
4741
4742 zbx_hashset_remove_direct(&cache->proxyqueue.index, rem_val);
4743
4744 return SUCCEED;
4745 }
4746
4747 /******************************************************************************
4748 * *
4749 * Function: zbx_hc_proxyqueue_clear *
4750 * *
4751 * Purpose: remove all proxies from proxy priority queue *
4752 * *
4753 ******************************************************************************/
zbx_hc_proxyqueue_clear(void)4754 static void zbx_hc_proxyqueue_clear(void)
4755 {
4756 zbx_list_destroy(&cache->proxyqueue.list);
4757 zbx_hashset_clear(&cache->proxyqueue.index);
4758 }
4759
4760 /******************************************************************************
4761 * *
4762 * Function: zbx_hc_check_proxy *
4763 * *
4764 * Purpose: check status of a history cache usage, enqueue/dequeue proxy *
4765 * from priority list and accordingly enable or disable wait mode *
4766 * *
4767 * Parameters: proxyid - [IN] the proxyid *
4768 * *
4769 * Return value: SUCCEED - proxy can be processed now *
4770 * FAIL - proxy cannot be processed now, it got enqueued *
4771 * *
4772 ******************************************************************************/
zbx_hc_check_proxy(zbx_uint64_t proxyid)4773 int zbx_hc_check_proxy(zbx_uint64_t proxyid)
4774 {
4775 double hc_pused;
4776 int ret;
4777
4778 zabbix_log(LOG_LEVEL_DEBUG, "In %s() proxyid:"ZBX_FS_UI64, __func__, proxyid);
4779
4780 LOCK_CACHE;
4781
4782 hc_pused = 100 * (double)(hc_mem->total_size - hc_mem->free_size) / hc_mem->total_size;
4783
4784 if (20 >= hc_pused)
4785 {
4786 cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_NORMAL;
4787
4788 zbx_hc_proxyqueue_clear();
4789
4790 ret = SUCCEED;
4791 goto out;
4792 }
4793
4794 if (ZBX_HC_PROXYQUEUE_STATE_WAIT == cache->proxyqueue.state)
4795 {
4796 zbx_hc_proxyqueue_enqueue(proxyid);
4797
4798 if (60 < hc_pused)
4799 {
4800 ret = FAIL;
4801 goto out;
4802 }
4803
4804 cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_NORMAL;
4805 }
4806 else
4807 {
4808 if (80 <= hc_pused)
4809 {
4810 cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_WAIT;
4811 zbx_hc_proxyqueue_enqueue(proxyid);
4812
4813 ret = FAIL;
4814 goto out;
4815 }
4816 }
4817
4818 if (0 == zbx_hc_proxyqueue_peek())
4819 {
4820 ret = SUCCEED;
4821 goto out;
4822 }
4823
4824 ret = zbx_hc_proxyqueue_dequeue(proxyid);
4825
4826 out:
4827 UNLOCK_CACHE;
4828
4829 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
4830
4831 return ret;
4832 }
4833
4834