1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 **/
19
20 #include "common.h"
21 #include "log.h"
22 #include "threads.h"
23
24 #include "db.h"
25 #include "dbcache.h"
26 #include "ipc.h"
27 #include "mutexs.h"
28 #include "zbxserver.h"
29 #include "proxy.h"
30 #include "events.h"
31 #include "memalloc.h"
32 #include "zbxalgo.h"
33 #include "valuecache.h"
34 #include "zbxmodules.h"
35 #include "module.h"
36 #include "export.h"
37 #include "zbxjson.h"
38 #include "zbxhistory.h"
39 #include "daemon.h"
40 #include "zbxavailability.h"
41 #include "zbxtrends.h"
42 #include "zbxalgo.h"
43 #include "../zbxalgo/vectorimpl.h"
44
/* memory pool descriptors; their total/free sizes are reported by DCget_stats() and DCget_stats_all() */
static zbx_mem_info_t	*hc_index_mem = NULL;	/* reported as the history index memory */
static zbx_mem_info_t	*hc_mem = NULL;		/* reported as the history memory */
static zbx_mem_info_t	*trend_mem = NULL;	/* reported as the trend memory; DCget_stats_all() reads it */
						/* only when program_type has ZBX_PROGRAM_TYPE_SERVER set  */
48
/* shorthand for locking/unlocking the three mutexes declared right below */
#define	LOCK_CACHE	zbx_mutex_lock(cache_lock)
#define	UNLOCK_CACHE	zbx_mutex_unlock(cache_lock)
#define	LOCK_TRENDS	zbx_mutex_lock(trends_lock)
#define	UNLOCK_TRENDS	zbx_mutex_unlock(trends_lock)
#define	LOCK_CACHE_IDS		zbx_mutex_lock(cache_ids_lock)
#define	UNLOCK_CACHE_IDS	zbx_mutex_unlock(cache_ids_lock)

/* taken by DCget_stats() and DCget_stats_all() while reading cache->stats and the memory pool sizes */
static zbx_mutex_t	cache_lock = ZBX_MUTEX_NULL;
/* taken by DCupdate_trends() and DCmass_update_trends() while accessing cache->trends */
static zbx_mutex_t	trends_lock = ZBX_MUTEX_NULL;
/* NOTE(review): presumably protects 'ids' - confirm against the code that uses LOCK_CACHE_IDS */
static zbx_mutex_t	cache_ids_lock = ZBX_MUTEX_NULL;
59
/* scratch buffer for building SQL statements; the trend and host info helpers in this file pass */
/* it by reference, together with sql_alloc, to zbx_snprintf_alloc() and DBadd_condition_alloc() */
static char	*sql = NULL;
static size_t	sql_alloc = 4 * ZBX_KIBIBYTE;

/* globals defined outside of this file */
extern unsigned char	program_type;		/* tested against ZBX_PROGRAM_TYPE_SERVER in DCget_stats_all() */
extern int		CONFIG_DOUBLE_PRECISION;
extern char		*CONFIG_EXPORT_DIR;
66
/* number of entries in the ZBX_DC_IDS.id[] array */
#define ZBX_IDS_SIZE	10

/* NOTE(review): presumably the initial size of the history items hashset - confirm at the point of use */
#define ZBX_HC_ITEMS_INIT_SIZE	1000

/* offset in seconds from the start of an hour (55 minutes) after which */
/* DCmass_update_trends() runs the hourly cleanup of the trends cache   */
#define ZBX_TRENDS_CLEANUP_TIME	((SEC_PER_HOUR * 55) / 60)

/* the maximum time spent synchronizing history */
#define ZBX_HC_SYNC_TIME_MAX	10

/* the maximum number of items in one synchronization batch;             */
/* DBflush_trends() also uses it to limit the itemids collected per pass */
#define ZBX_HC_SYNC_MAX		1000
#define ZBX_HC_TIMER_MAX	(ZBX_HC_SYNC_MAX / 2)
#define ZBX_HC_TIMER_SOFT_MAX	(ZBX_HC_TIMER_MAX - 10)

/* the minimum processed item percentage of item candidates to continue synchronizing */
#define ZBX_HC_SYNC_MIN_PCNT	10

/* the maximum number of characters for history cache values */
#define ZBX_HISTORY_VALUE_LEN	(1024 * 64)

/* flag masks; a value having any flag of a mask set is skipped for the corresponding */
/* purpose (DCmass_update_trends() applies ZBX_DC_FLAGS_NOT_FOR_TRENDS this way)      */
#define ZBX_DC_FLAGS_NOT_FOR_HISTORY	(ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_NOHISTORY)
#define ZBX_DC_FLAGS_NOT_FOR_TRENDS	(ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_NOTRENDS)
#define ZBX_DC_FLAGS_NOT_FOR_MODULES	(ZBX_DC_FLAGS_NOT_FOR_HISTORY | ZBX_DC_FLAG_LLD)
#define ZBX_DC_FLAGS_NOT_FOR_EXPORT	(ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF)

/* NOTE(review): presumably the values of zbx_hc_proxyqueue_t.state - confirm at the point of use */
#define ZBX_HC_PROXYQUEUE_STATE_NORMAL	0
#define ZBX_HC_PROXYQUEUE_STATE_WAIT	1
94
/* a table name paired with an identifier value;                                   */
/* NOTE(review): lastid presumably is the last identifier handed out for the table */
typedef struct
{
	char		table_name[ZBX_TABLENAME_LEN_MAX];
	zbx_uint64_t	lastid;
}
ZBX_DC_ID;

/* fixed size array of ZBX_IDS_SIZE table/identifier entries */
typedef struct
{
	ZBX_DC_ID	id[ZBX_IDS_SIZE];
}
ZBX_DC_IDS;

/* the identifier cache instance, not allocated at load time */
static ZBX_DC_IDS	*ids = NULL;
109
/* proxy queue: a list with a hashset index and a state field;                   */
/* NOTE(review): state presumably holds a ZBX_HC_PROXYQUEUE_STATE_* value - confirm */
typedef struct
{
	zbx_list_t	list;
	zbx_hashset_t	index;
	int		state;
}
zbx_hc_proxyqueue_t;

/* the write cache: trends, statistics counters and the history queue */
typedef struct
{
	zbx_hashset_t		trends;		/* ZBX_DC_TREND records, looked up by itemid */
	ZBX_DC_STATS		stats;		/* counters reported by DCget_stats() and DCget_stats_all() */

	zbx_hashset_t		history_items;
	zbx_binary_heap_t	history_queue;

	int			history_num;
	int			trends_num;
	int			trends_last_cleanup_hour;	/* start of the hour in which DCmass_update_trends() */
								/* last ran its trends cleanup                      */
	int			history_num_total;
	int			history_progress_ts;

	unsigned char		db_trigger_queue_lock;

	zbx_hc_proxyqueue_t	proxyqueue;
}
ZBX_DC_CACHE;

/* the write cache instance, not allocated at load time */
static ZBX_DC_CACHE	*cache = NULL;
139
/* local history cache */
#define ZBX_MAX_VALUES_LOCAL	256
#define ZBX_STRUCT_REALLOC_STEP	8
#define ZBX_STRING_REALLOC_STEP	ZBX_KIBIBYTE

/* reference to a string: a position and a length;                                     */
/* NOTE(review): pvalue presumably is an offset into the string_values buffer - confirm */
typedef struct
{
	size_t	pvalue;
	size_t	len;
}
dc_value_str_t;

/* storage for one value - floating point, unsigned integer or string reference; */
/* NOTE(review): which member is valid presumably follows the value type - confirm */
typedef struct
{
	double		value_dbl;
	zbx_uint64_t	value_uint;
	dc_value_str_t	value_str;
}
dc_value_t;

/* one item value held in the local history cache */
typedef struct
{
	zbx_uint64_t	itemid;
	dc_value_t	value;
	zbx_timespec_t	ts;
	dc_value_str_t	source;		/* for log items only */
	zbx_uint64_t	lastlogsize;
	int		timestamp;	/* for log items only */
	int		severity;	/* for log items only */
	int		logeventid;	/* for log items only */
	int		mtime;
	unsigned char	item_value_type;
	unsigned char	value_type;
	unsigned char	state;
	unsigned char	flags;		/* see ZBX_DC_FLAG_* above */
}
dc_item_value_t;

/* local buffers: string data with its allocated size and current offset, */
/* and the item value array with its allocated and used element counts    */
static char		*string_values = NULL;
static size_t		string_values_alloc = 0, string_values_offset = 0;
static dc_item_value_t	*item_values = NULL;
static size_t		item_values_alloc = 0, item_values_num = 0;
182
/* forward declarations of the static history cache (hc_*) helpers */
static void	hc_add_item_values(dc_item_value_t *values, int values_num);
static void	hc_pop_items(zbx_vector_ptr_t *history_items);
static void	hc_get_item_values(ZBX_DC_HISTORY *history, zbx_vector_ptr_t *history_items);
static void	hc_push_items(zbx_vector_ptr_t *history_items);
static void	hc_free_item_values(ZBX_DC_HISTORY *history, int history_num);
static void	hc_queue_item(zbx_hc_item_t *item);
static int	hc_queue_elem_compare_func(const void *d1, const void *d2);
static int	hc_queue_get_size(void);
static int	hc_get_history_compression_age(void);

/* declare and implement the item_tag vector over zbx_tag_t;             */
/* zbx_item_info_t.item_tags is of the resulting zbx_vector_item_tag_t type */
ZBX_PTR_VECTOR_DECL(item_tag, zbx_tag_t)
ZBX_PTR_VECTOR_IMPL(item_tag, zbx_tag_t)
195
196 /******************************************************************************
197 * *
198 * Function: DCget_stats_all *
199 * *
200 * Purpose: retrieves all internal metrics of the database cache *
201 * *
202 * Parameters: stats - [OUT] write cache metrics *
203 * *
204 ******************************************************************************/
205 void DCget_stats_all(zbx_wcache_info_t *wcache_info)
206 {
207 LOCK_CACHE;
208
209 wcache_info->stats = cache->stats;
210 wcache_info->history_free = hc_mem->free_size;
211 wcache_info->history_total = hc_mem->total_size;
212 wcache_info->index_free = hc_index_mem->free_size;
213 wcache_info->index_total = hc_index_mem->total_size;
214
215 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
216 {
217 wcache_info->trend_free = trend_mem->free_size;
218 wcache_info->trend_total = trend_mem->orig_size;
219 }
220
221 UNLOCK_CACHE;
222 }
223
224 /******************************************************************************
225 * *
226 * Function: DCget_stats *
227 * *
228 * Purpose: get statistics of the database cache *
229 * *
230 * Author: Alexander Vladishev *
231 * *
232 ******************************************************************************/
DCget_stats(int request)233 void *DCget_stats(int request)
234 {
235 static zbx_uint64_t value_uint;
236 static double value_double;
237 void *ret;
238
239 LOCK_CACHE;
240
241 switch (request)
242 {
243 case ZBX_STATS_HISTORY_COUNTER:
244 value_uint = cache->stats.history_counter;
245 ret = (void *)&value_uint;
246 break;
247 case ZBX_STATS_HISTORY_FLOAT_COUNTER:
248 value_uint = cache->stats.history_float_counter;
249 ret = (void *)&value_uint;
250 break;
251 case ZBX_STATS_HISTORY_UINT_COUNTER:
252 value_uint = cache->stats.history_uint_counter;
253 ret = (void *)&value_uint;
254 break;
255 case ZBX_STATS_HISTORY_STR_COUNTER:
256 value_uint = cache->stats.history_str_counter;
257 ret = (void *)&value_uint;
258 break;
259 case ZBX_STATS_HISTORY_LOG_COUNTER:
260 value_uint = cache->stats.history_log_counter;
261 ret = (void *)&value_uint;
262 break;
263 case ZBX_STATS_HISTORY_TEXT_COUNTER:
264 value_uint = cache->stats.history_text_counter;
265 ret = (void *)&value_uint;
266 break;
267 case ZBX_STATS_NOTSUPPORTED_COUNTER:
268 value_uint = cache->stats.notsupported_counter;
269 ret = (void *)&value_uint;
270 break;
271 case ZBX_STATS_HISTORY_TOTAL:
272 value_uint = hc_mem->total_size;
273 ret = (void *)&value_uint;
274 break;
275 case ZBX_STATS_HISTORY_USED:
276 value_uint = hc_mem->total_size - hc_mem->free_size;
277 ret = (void *)&value_uint;
278 break;
279 case ZBX_STATS_HISTORY_FREE:
280 value_uint = hc_mem->free_size;
281 ret = (void *)&value_uint;
282 break;
283 case ZBX_STATS_HISTORY_PUSED:
284 value_double = 100 * (double)(hc_mem->total_size - hc_mem->free_size) / hc_mem->total_size;
285 ret = (void *)&value_double;
286 break;
287 case ZBX_STATS_HISTORY_PFREE:
288 value_double = 100 * (double)hc_mem->free_size / hc_mem->total_size;
289 ret = (void *)&value_double;
290 break;
291 case ZBX_STATS_TREND_TOTAL:
292 value_uint = trend_mem->orig_size;
293 ret = (void *)&value_uint;
294 break;
295 case ZBX_STATS_TREND_USED:
296 value_uint = trend_mem->orig_size - trend_mem->free_size;
297 ret = (void *)&value_uint;
298 break;
299 case ZBX_STATS_TREND_FREE:
300 value_uint = trend_mem->free_size;
301 ret = (void *)&value_uint;
302 break;
303 case ZBX_STATS_TREND_PUSED:
304 value_double = 100 * (double)(trend_mem->orig_size - trend_mem->free_size) /
305 trend_mem->orig_size;
306 ret = (void *)&value_double;
307 break;
308 case ZBX_STATS_TREND_PFREE:
309 value_double = 100 * (double)trend_mem->free_size / trend_mem->orig_size;
310 ret = (void *)&value_double;
311 break;
312 case ZBX_STATS_HISTORY_INDEX_TOTAL:
313 value_uint = hc_index_mem->total_size;
314 ret = (void *)&value_uint;
315 break;
316 case ZBX_STATS_HISTORY_INDEX_USED:
317 value_uint = hc_index_mem->total_size - hc_index_mem->free_size;
318 ret = (void *)&value_uint;
319 break;
320 case ZBX_STATS_HISTORY_INDEX_FREE:
321 value_uint = hc_index_mem->free_size;
322 ret = (void *)&value_uint;
323 break;
324 case ZBX_STATS_HISTORY_INDEX_PUSED:
325 value_double = 100 * (double)(hc_index_mem->total_size - hc_index_mem->free_size) /
326 hc_index_mem->total_size;
327 ret = (void *)&value_double;
328 break;
329 case ZBX_STATS_HISTORY_INDEX_PFREE:
330 value_double = 100 * (double)hc_index_mem->free_size / hc_index_mem->total_size;
331 ret = (void *)&value_double;
332 break;
333 default:
334 ret = NULL;
335 }
336
337 UNLOCK_CACHE;
338
339 return ret;
340 }
341
342 /******************************************************************************
343 * *
344 * Function: DCget_trend *
345 * *
346 * Purpose: find existing or add new structure and return pointer *
347 * *
348 * Return value: pointer to a trend structure *
349 * *
350 * Author: Alexander Vladishev *
351 * *
352 ******************************************************************************/
DCget_trend(zbx_uint64_t itemid)353 static ZBX_DC_TREND *DCget_trend(zbx_uint64_t itemid)
354 {
355 ZBX_DC_TREND *ptr, trend;
356
357 if (NULL != (ptr = (ZBX_DC_TREND *)zbx_hashset_search(&cache->trends, &itemid)))
358 return ptr;
359
360 memset(&trend, 0, sizeof(ZBX_DC_TREND));
361 trend.itemid = itemid;
362
363 return (ZBX_DC_TREND *)zbx_hashset_insert(&cache->trends, &trend, sizeof(ZBX_DC_TREND));
364 }
365
366 /******************************************************************************
367 * *
368 * Function: DCupdate_trends *
369 * *
370 * Purpose: apply disable_from changes to cache *
371 * *
372 ******************************************************************************/
DCupdate_trends(zbx_vector_uint64_pair_t * trends_diff)373 static void DCupdate_trends(zbx_vector_uint64_pair_t *trends_diff)
374 {
375 int i;
376
377 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
378
379 LOCK_TRENDS;
380
381 for (i = 0; i < trends_diff->values_num; i++)
382 {
383 ZBX_DC_TREND *trend;
384
385 if (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_search(&cache->trends, &trends_diff->values[i].first)))
386 trend->disable_from = trends_diff->values[i].second;
387 }
388
389 UNLOCK_TRENDS;
390
391 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
392 }
393
394 /******************************************************************************
395 * *
396 * Function: dc_insert_trends_in_db *
397 * *
398 * Purpose: helper function for DCflush trends *
399 * *
400 ******************************************************************************/
dc_insert_trends_in_db(ZBX_DC_TREND * trends,int trends_num,unsigned char value_type,const char * table_name,int clock)401 static void dc_insert_trends_in_db(ZBX_DC_TREND *trends, int trends_num, unsigned char value_type,
402 const char *table_name, int clock)
403 {
404 ZBX_DC_TREND *trend;
405 int i;
406 zbx_db_insert_t db_insert;
407
408 zbx_db_insert_prepare(&db_insert, table_name, "itemid", "clock", "num", "value_min", "value_avg",
409 "value_max", NULL);
410
411 for (i = 0; i < trends_num; i++)
412 {
413 trend = &trends[i];
414
415 if (0 == trend->itemid)
416 continue;
417
418 if (clock != trend->clock || value_type != trend->value_type)
419 continue;
420
421 if (ITEM_VALUE_TYPE_FLOAT == value_type)
422 {
423 zbx_db_insert_add_values(&db_insert, trend->itemid, trend->clock, trend->num,
424 trend->value_min.dbl, trend->value_avg.dbl, trend->value_max.dbl);
425 }
426 else
427 {
428 zbx_uint128_t avg;
429
430 /* calculate the trend average value */
431 udiv128_64(&avg, &trend->value_avg.ui64, trend->num);
432
433 zbx_db_insert_add_values(&db_insert, trend->itemid, trend->clock, trend->num,
434 trend->value_min.ui64, avg.lo, trend->value_max.ui64);
435 }
436
437 trend->itemid = 0;
438 }
439
440 zbx_db_insert_execute(&db_insert);
441 zbx_db_insert_clean(&db_insert);
442 }
443
444 /******************************************************************************
445 * *
446 * Function: dc_remove_updated_trends *
447 * *
448 * Purpose: Update trends disable_until for items without trends data past or *
449 * equal the specified clock *
450 * *
451 * Comments: A helper function for DCflush trends *
452 * *
453 ******************************************************************************/
dc_remove_updated_trends(ZBX_DC_TREND * trends,int trends_num,const char * table_name,int value_type,zbx_uint64_t * itemids,int * itemids_num,int clock)454 static void dc_remove_updated_trends(ZBX_DC_TREND *trends, int trends_num, const char *table_name,
455 int value_type, zbx_uint64_t *itemids, int *itemids_num, int clock)
456 {
457 int i, j, clocks_num, now, age;
458 ZBX_DC_TREND *trend;
459 zbx_uint64_t itemid;
460 size_t sql_offset;
461 DB_RESULT result;
462 DB_ROW row;
463 int clocks[] = {SEC_PER_DAY, SEC_PER_WEEK, SEC_PER_MONTH, SEC_PER_YEAR, INT_MAX};
464
465 now = time(NULL);
466 age = now - clock;
467 for (clocks_num = 0; age > clocks[clocks_num]; clocks_num++)
468 clocks[clocks_num] = now - clocks[clocks_num];
469 clocks[clocks_num] = clock;
470
471 /* remove itemids with trends data past or equal the clock */
472 for (j = 0; j <= clocks_num && 0 < *itemids_num; j++)
473 {
474 sql_offset = 0;
475 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
476 "select distinct itemid"
477 " from %s"
478 " where clock>=%d and",
479 table_name, clocks[j]);
480
481 if (0 < j)
482 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " clock<%d and", clocks[j - 1]);
483
484 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids, *itemids_num);
485
486 result = DBselect("%s", sql);
487
488 while (NULL != (row = DBfetch(result)))
489 {
490 ZBX_STR2UINT64(itemid, row[0]);
491 uint64_array_remove(itemids, itemids_num, &itemid, 1);
492 }
493 DBfree_result(result);
494 }
495
496 /* update trends disable_until for the leftover itemids */
497 while (0 != *itemids_num)
498 {
499 itemid = itemids[--*itemids_num];
500
501 for (i = 0; i < trends_num; i++)
502 {
503 trend = &trends[i];
504
505 if (itemid != trend->itemid)
506 continue;
507
508 if (clock != trend->clock || value_type != trend->value_type)
509 continue;
510
511 trend->disable_from = clock;
512 break;
513 }
514 }
515 }
516
517 /******************************************************************************
518 * *
519 * Function: dc_trends_update_float *
520 * *
521 * Purpose: helper function for DCflush trends *
522 * *
523 ******************************************************************************/
dc_trends_update_float(ZBX_DC_TREND * trend,DB_ROW row,int num,size_t * sql_offset)524 static void dc_trends_update_float(ZBX_DC_TREND *trend, DB_ROW row, int num, size_t *sql_offset)
525 {
526 history_value_t value_min, value_avg, value_max;
527
528 value_min.dbl = atof(row[2]);
529 value_avg.dbl = atof(row[3]);
530 value_max.dbl = atof(row[4]);
531
532 if (value_min.dbl < trend->value_min.dbl)
533 trend->value_min.dbl = value_min.dbl;
534
535 if (value_max.dbl > trend->value_max.dbl)
536 trend->value_max.dbl = value_max.dbl;
537
538 trend->value_avg.dbl = trend->value_avg.dbl / (trend->num + num) * trend->num +
539 value_avg.dbl / (trend->num + num) * num;
540 trend->num += num;
541
542 zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, "update trends set"
543 " num=%d,value_min=" ZBX_FS_DBL64_SQL ",value_avg=" ZBX_FS_DBL64_SQL
544 ",value_max=" ZBX_FS_DBL64_SQL
545 " where itemid=" ZBX_FS_UI64 " and clock=%d;\n",
546 trend->num, trend->value_min.dbl, trend->value_avg.dbl, trend->value_max.dbl,
547 trend->itemid, trend->clock);
548 }
549
550 /******************************************************************************
551 * *
552 * Function: dc_trends_update_uint *
553 * *
554 * Purpose: helper function for DCflush trends *
555 * *
556 ******************************************************************************/
dc_trends_update_uint(ZBX_DC_TREND * trend,DB_ROW row,int num,size_t * sql_offset)557 static void dc_trends_update_uint(ZBX_DC_TREND *trend, DB_ROW row, int num, size_t *sql_offset)
558 {
559 history_value_t value_min, value_avg, value_max;
560 zbx_uint128_t avg;
561
562 ZBX_STR2UINT64(value_min.ui64, row[2]);
563 ZBX_STR2UINT64(value_avg.ui64, row[3]);
564 ZBX_STR2UINT64(value_max.ui64, row[4]);
565
566 if (value_min.ui64 < trend->value_min.ui64)
567 trend->value_min.ui64 = value_min.ui64;
568 if (value_max.ui64 > trend->value_max.ui64)
569 trend->value_max.ui64 = value_max.ui64;
570
571 /* calculate the trend average value */
572 umul64_64(&avg, num, value_avg.ui64);
573 uinc128_128(&trend->value_avg.ui64, &avg);
574 udiv128_64(&avg, &trend->value_avg.ui64, trend->num + num);
575
576 trend->num += num;
577
578 zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset,
579 "update trends_uint set num=%d,value_min=" ZBX_FS_UI64 ",value_avg="
580 ZBX_FS_UI64 ",value_max=" ZBX_FS_UI64 " where itemid=" ZBX_FS_UI64
581 " and clock=%d;\n",
582 trend->num,
583 trend->value_min.ui64,
584 avg.lo,
585 trend->value_max.ui64,
586 trend->itemid,
587 trend->clock);
588 }
589
590 /******************************************************************************
591 * *
592 * Function: dc_trends_fetch_and_update *
593 * *
594 * Purpose: helper function for DCflush trends *
595 * *
596 ******************************************************************************/
dc_trends_fetch_and_update(ZBX_DC_TREND * trends,int trends_num,zbx_uint64_t * itemids,int itemids_num,int * inserts_num,unsigned char value_type,const char * table_name,int clock)597 static void dc_trends_fetch_and_update(ZBX_DC_TREND *trends, int trends_num, zbx_uint64_t *itemids,
598 int itemids_num, int *inserts_num, unsigned char value_type,
599 const char *table_name, int clock)
600 {
601
602 int i, num;
603 DB_RESULT result;
604 DB_ROW row;
605 zbx_uint64_t itemid;
606 ZBX_DC_TREND *trend;
607 size_t sql_offset;
608
609 sql_offset = 0;
610 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
611 "select itemid,num,value_min,value_avg,value_max"
612 " from %s"
613 " where clock=%d and",
614 table_name, clock);
615
616 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids, itemids_num);
617
618 result = DBselect("%s order by itemid,clock", sql);
619
620 sql_offset = 0;
621 DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
622
623 while (NULL != (row = DBfetch(result)))
624 {
625 ZBX_STR2UINT64(itemid, row[0]);
626
627 for (i = 0; i < trends_num; i++)
628 {
629 trend = &trends[i];
630
631 if (itemid != trend->itemid)
632 continue;
633
634 if (clock != trend->clock || value_type != trend->value_type)
635 continue;
636
637 break;
638 }
639
640 if (i == trends_num)
641 {
642 THIS_SHOULD_NEVER_HAPPEN;
643 continue;
644 }
645
646 num = atoi(row[1]);
647
648 if (value_type == ITEM_VALUE_TYPE_FLOAT)
649 dc_trends_update_float(trend, row, num, &sql_offset);
650 else
651 dc_trends_update_uint(trend, row, num, &sql_offset);
652
653 trend->itemid = 0;
654
655 --*inserts_num;
656
657 DBexecute_overflowed_sql(&sql, &sql_alloc, &sql_offset);
658 }
659
660 DBfree_result(result);
661
662 DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
663
664 if (sql_offset > 16) /* In ORACLE always present begin..end; */
665 DBexecute("%s", sql);
666 }
667
668 /******************************************************************************
669 * *
670 * Function: DBflush_trends *
671 * *
672 * Purpose: flush trend to the database *
673 * *
674 * Author: Alexander Vladishev *
675 * *
676 ******************************************************************************/
DBflush_trends(ZBX_DC_TREND * trends,int * trends_num,zbx_vector_uint64_pair_t * trends_diff)677 static void DBflush_trends(ZBX_DC_TREND *trends, int *trends_num, zbx_vector_uint64_pair_t *trends_diff)
678 {
679 int num, i, clock, inserts_num = 0, itemids_alloc, itemids_num = 0, trends_to = *trends_num;
680 unsigned char value_type;
681 zbx_uint64_t *itemids = NULL;
682 ZBX_DC_TREND *trend = NULL;
683 const char *table_name;
684
685 zabbix_log(LOG_LEVEL_DEBUG, "In %s() trends_num:%d", __func__, *trends_num);
686
687 clock = trends[0].clock;
688 value_type = trends[0].value_type;
689
690 switch (value_type)
691 {
692 case ITEM_VALUE_TYPE_FLOAT:
693 table_name = "trends";
694 break;
695 case ITEM_VALUE_TYPE_UINT64:
696 table_name = "trends_uint";
697 break;
698 default:
699 assert(0);
700 }
701
702 itemids_alloc = MIN(ZBX_HC_SYNC_MAX, *trends_num);
703 itemids = (zbx_uint64_t *)zbx_malloc(itemids, itemids_alloc * sizeof(zbx_uint64_t));
704
705 for (i = 0; i < *trends_num; i++)
706 {
707 trend = &trends[i];
708
709 if (clock != trend->clock || value_type != trend->value_type)
710 continue;
711
712 inserts_num++;
713
714 if (0 != trend->disable_from)
715 continue;
716
717 uint64_array_add(&itemids, &itemids_alloc, &itemids_num, trend->itemid, 64);
718
719 if (ZBX_HC_SYNC_MAX == itemids_num)
720 {
721 trends_to = i + 1;
722 break;
723 }
724 }
725
726 if (0 != itemids_num)
727 {
728 dc_remove_updated_trends(trends, trends_to, table_name, value_type, itemids,
729 &itemids_num, clock);
730 }
731
732 for (i = 0; i < trends_to; i++)
733 {
734 trend = &trends[i];
735
736 if (clock != trend->clock || value_type != trend->value_type)
737 continue;
738
739 if (0 != trend->disable_from && clock >= trend->disable_from)
740 continue;
741
742 uint64_array_add(&itemids, &itemids_alloc, &itemids_num, trend->itemid, 64);
743 }
744
745 if (0 != itemids_num)
746 {
747 dc_trends_fetch_and_update(trends, trends_to, itemids, itemids_num,
748 &inserts_num, value_type, table_name, clock);
749 }
750
751 zbx_free(itemids);
752
753 /* if 'trends' is not a primary trends buffer */
754 if (NULL != trends_diff)
755 {
756 /* we update it too */
757 for (i = 0; i < trends_to; i++)
758 {
759 zbx_uint64_pair_t pair;
760
761 if (0 == trends[i].itemid)
762 continue;
763
764 if (clock != trends[i].clock || value_type != trends[i].value_type)
765 continue;
766
767 if (0 == trends[i].disable_from || trends[i].disable_from > clock)
768 continue;
769
770 pair.first = trends[i].itemid;
771 pair.second = clock + SEC_PER_HOUR;
772 zbx_vector_uint64_pair_append(trends_diff, pair);
773 }
774 }
775
776 if (0 != inserts_num)
777 dc_insert_trends_in_db(trends, trends_to, value_type, table_name, clock);
778
779 /* clean trends */
780 for (i = 0, num = 0; i < *trends_num; i++)
781 {
782 if (0 == trends[i].itemid)
783 continue;
784
785 memcpy(&trends[num++], &trends[i], sizeof(ZBX_DC_TREND));
786 }
787 *trends_num = num;
788
789 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
790 }
791
792 /******************************************************************************
793 * *
794 * Function: DCflush_trend *
795 * *
796 * Purpose: move trend to the array of trends for flushing to DB *
797 * *
798 * Author: Alexander Vladishev *
799 * *
800 ******************************************************************************/
DCflush_trend(ZBX_DC_TREND * trend,ZBX_DC_TREND ** trends,int * trends_alloc,int * trends_num)801 static void DCflush_trend(ZBX_DC_TREND *trend, ZBX_DC_TREND **trends, int *trends_alloc, int *trends_num)
802 {
803 if (*trends_num == *trends_alloc)
804 {
805 *trends_alloc += 256;
806 *trends = (ZBX_DC_TREND *)zbx_realloc(*trends, *trends_alloc * sizeof(ZBX_DC_TREND));
807 }
808
809 memcpy(&(*trends)[*trends_num], trend, sizeof(ZBX_DC_TREND));
810 (*trends_num)++;
811
812 trend->clock = 0;
813 trend->num = 0;
814 memset(&trend->value_min, 0, sizeof(history_value_t));
815 memset(&trend->value_avg, 0, sizeof(value_avg_t));
816 memset(&trend->value_max, 0, sizeof(history_value_t));
817 }
818
819 /******************************************************************************
820 * *
821 * Function: DCadd_trend *
822 * *
823 * Purpose: add new value to the trends *
824 * *
825 * Author: Alexander Vladishev *
826 * *
827 ******************************************************************************/
DCadd_trend(const ZBX_DC_HISTORY * history,ZBX_DC_TREND ** trends,int * trends_alloc,int * trends_num)828 static void DCadd_trend(const ZBX_DC_HISTORY *history, ZBX_DC_TREND **trends, int *trends_alloc, int *trends_num)
829 {
830 ZBX_DC_TREND *trend = NULL;
831 int hour;
832
833 hour = history->ts.sec - history->ts.sec % SEC_PER_HOUR;
834
835 trend = DCget_trend(history->itemid);
836
837 if (trend->num > 0 && (trend->clock != hour || trend->value_type != history->value_type) &&
838 SUCCEED == zbx_history_requires_trends(trend->value_type))
839 {
840 DCflush_trend(trend, trends, trends_alloc, trends_num);
841 }
842
843 trend->value_type = history->value_type;
844 trend->clock = hour;
845
846 switch (trend->value_type)
847 {
848 case ITEM_VALUE_TYPE_FLOAT:
849 if (trend->num == 0 || history->value.dbl < trend->value_min.dbl)
850 trend->value_min.dbl = history->value.dbl;
851 if (trend->num == 0 || history->value.dbl > trend->value_max.dbl)
852 trend->value_max.dbl = history->value.dbl;
853 trend->value_avg.dbl += history->value.dbl / (trend->num + 1) -
854 trend->value_avg.dbl / (trend->num + 1);
855 break;
856 case ITEM_VALUE_TYPE_UINT64:
857 if (trend->num == 0 || history->value.ui64 < trend->value_min.ui64)
858 trend->value_min.ui64 = history->value.ui64;
859 if (trend->num == 0 || history->value.ui64 > trend->value_max.ui64)
860 trend->value_max.ui64 = history->value.ui64;
861 uinc128_64(&trend->value_avg.ui64, history->value.ui64);
862 break;
863 }
864 trend->num++;
865 }
866
867 /******************************************************************************
868 * *
869 * Function: DCmass_update_trends *
870 * *
871 * Purpose: update trends cache and get list of trends to flush into database *
872 * *
873 * Parameters: history - [IN] array of history data *
874 * history_num - [IN] number of history structures *
875 * trends - [OUT] list of trends to flush into database *
876 * trends_num - [OUT] number of trends *
877 * compression_age - [IN] history compression age *
878 * *
879 * Author: Alexander Vladishev *
880 * *
881 ******************************************************************************/
DCmass_update_trends(const ZBX_DC_HISTORY * history,int history_num,ZBX_DC_TREND ** trends,int * trends_num,int compression_age)882 static void DCmass_update_trends(const ZBX_DC_HISTORY *history, int history_num, ZBX_DC_TREND **trends,
883 int *trends_num, int compression_age)
884 {
885 static int last_trend_discard = 0;
886 zbx_timespec_t ts;
887 int trends_alloc = 0, i, hour, seconds;
888
889 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
890
891 zbx_timespec(&ts);
892 seconds = ts.sec % SEC_PER_HOUR;
893 hour = ts.sec - seconds;
894
895 LOCK_TRENDS;
896
897 for (i = 0; i < history_num; i++)
898 {
899 const ZBX_DC_HISTORY *h = &history[i];
900
901 if (0 != (ZBX_DC_FLAGS_NOT_FOR_TRENDS & h->flags))
902 continue;
903
904 DCadd_trend(h, trends, &trends_alloc, trends_num);
905 }
906
907 if (cache->trends_last_cleanup_hour < hour && ZBX_TRENDS_CLEANUP_TIME < seconds)
908 {
909 zbx_hashset_iter_t iter;
910 ZBX_DC_TREND *trend;
911
912 zbx_hashset_iter_reset(&cache->trends, &iter);
913
914 while (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_iter_next(&iter)))
915 {
916 if (trend->clock == hour)
917 continue;
918
919 /* discard trend items that are older than compression age */
920 if (0 != compression_age && trend->clock < compression_age)
921 {
922 if (SEC_PER_HOUR < (ts.sec - last_trend_discard)) /* log once per hour */
923 {
924 zabbix_log(LOG_LEVEL_TRACE, "discarding trends that are pointing to"
925 " compressed history period");
926 last_trend_discard = ts.sec;
927 }
928 }
929 else if (SUCCEED == zbx_history_requires_trends(trend->value_type))
930 DCflush_trend(trend, trends, &trends_alloc, trends_num);
931
932 zbx_hashset_iter_remove(&iter);
933 }
934
935 cache->trends_last_cleanup_hour = hour;
936 }
937
938 UNLOCK_TRENDS;
939
940 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
941 }
942
zbx_trend_compare(const void * d1,const void * d2)943 static int zbx_trend_compare(const void *d1, const void *d2)
944 {
945 const ZBX_DC_TREND *p1 = (const ZBX_DC_TREND *)d1;
946 const ZBX_DC_TREND *p2 = (const ZBX_DC_TREND *)d2;
947
948 ZBX_RETURN_IF_NOT_EQUAL(p1->itemid, p2->itemid);
949 ZBX_RETURN_IF_NOT_EQUAL(p1->clock, p2->clock);
950
951 return 0;
952 }
953
954 /******************************************************************************
955 * *
956 * Function: DBmass_update_trends *
957 * *
958 * Purpose: prepare history data using items from configuration cache *
959 * *
960 * Parameters: trends - [IN] trends from cache to be added to database *
961 * trends_num - [IN] number of trends to add to database *
962 * trends_diff - [OUT] disable_from updates *
963 * *
964 ******************************************************************************/
DBmass_update_trends(const ZBX_DC_TREND * trends,int trends_num,zbx_vector_uint64_pair_t * trends_diff)965 static void DBmass_update_trends(const ZBX_DC_TREND *trends, int trends_num,
966 zbx_vector_uint64_pair_t *trends_diff)
967 {
968 ZBX_DC_TREND *trends_tmp;
969
970 if (0 != trends_num)
971 {
972 trends_tmp = (ZBX_DC_TREND *)zbx_malloc(NULL, trends_num * sizeof(ZBX_DC_TREND));
973 memcpy(trends_tmp, trends, trends_num * sizeof(ZBX_DC_TREND));
974 qsort(trends_tmp, trends_num, sizeof(ZBX_DC_TREND), zbx_trend_compare);
975
976 while (0 < trends_num)
977 DBflush_trends(trends_tmp, &trends_num, trends_diff);
978
979 zbx_free(trends_tmp);
980 }
981 }
982
/* a host with the names of its host groups; stored in a hashset that is searched by hostid */
typedef struct
{
	zbx_uint64_t		hostid;
	zbx_vector_ptr_t	groups;	/* allocated group name strings, released by zbx_host_info_clean() */
}
zbx_host_info_t;
989
990 /******************************************************************************
991 * *
992 * Function: zbx_host_info_clean *
993 * *
994 * Purpose: frees resources allocated to store host groups names *
995 * *
996 * Parameters: host_info - [IN] host information *
997 * *
998 ******************************************************************************/
zbx_host_info_clean(zbx_host_info_t * host_info)999 static void zbx_host_info_clean(zbx_host_info_t *host_info)
1000 {
1001 zbx_vector_ptr_clear_ext(&host_info->groups, zbx_ptr_free);
1002 zbx_vector_ptr_destroy(&host_info->groups);
1003 }
1004
1005 /******************************************************************************
1006 * *
1007 * Function: db_get_hosts_info_by_hostid *
1008 * *
1009 * Purpose: get hosts groups names *
1010 * *
1011 * Parameters: hosts_info - [IN/OUT] output names of host groups for a host *
1012 * hostids - [IN] hosts identifiers *
1013 * *
1014 ******************************************************************************/
db_get_hosts_info_by_hostid(zbx_hashset_t * hosts_info,const zbx_vector_uint64_t * hostids)1015 static void db_get_hosts_info_by_hostid(zbx_hashset_t *hosts_info, const zbx_vector_uint64_t *hostids)
1016 {
1017 int i;
1018 size_t sql_offset = 0;
1019 DB_RESULT result;
1020 DB_ROW row;
1021
1022 for (i = 0; i < hostids->values_num; i++)
1023 {
1024 zbx_host_info_t host_info = {.hostid = hostids->values[i]};
1025
1026 zbx_vector_ptr_create(&host_info.groups);
1027 zbx_hashset_insert(hosts_info, &host_info, sizeof(host_info));
1028 }
1029
1030 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
1031 "select distinct hg.hostid,g.name"
1032 " from hstgrp g,hosts_groups hg"
1033 " where g.groupid=hg.groupid"
1034 " and");
1035
1036 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "hg.hostid", hostids->values, hostids->values_num);
1037
1038 result = DBselect("%s", sql);
1039
1040 while (NULL != (row = DBfetch(result)))
1041 {
1042 zbx_uint64_t hostid;
1043 zbx_host_info_t *host_info;
1044
1045 ZBX_DBROW2UINT64(hostid, row[0]);
1046
1047 if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &hostid)))
1048 {
1049 THIS_SHOULD_NEVER_HAPPEN;
1050 continue;
1051 }
1052
1053 zbx_vector_ptr_append(&host_info->groups, zbx_strdup(NULL, row[1]));
1054 }
1055 DBfree_result(result);
1056 }
1057
1058 typedef struct
1059 {
1060 zbx_uint64_t itemid;
1061 char *name;
1062 DC_ITEM *item;
1063 zbx_vector_item_tag_t item_tags;
1064 }
1065 zbx_item_info_t;
1066
1067 /******************************************************************************
1068 * *
1069 * Function: db_get_items_info_by_itemid *
1070 * *
1071 * Purpose: get items name and item tags *
1072 * *
1073 * Parameters: items_info - [IN/OUT] output item name and item tags *
1074 * itemids - [IN] the item identifiers *
1075 * *
1076 ******************************************************************************/
db_get_items_info_by_itemid(zbx_hashset_t * items_info,const zbx_vector_uint64_t * itemids)1077 static void db_get_items_info_by_itemid(zbx_hashset_t *items_info, const zbx_vector_uint64_t *itemids)
1078 {
1079 size_t sql_offset = 0;
1080 DB_RESULT result;
1081 DB_ROW row;
1082
1083 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select itemid,name from items where");
1084 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids->values, itemids->values_num);
1085
1086 result = DBselect("%s", sql);
1087
1088 while (NULL != (row = DBfetch(result)))
1089 {
1090 zbx_uint64_t itemid;
1091 zbx_item_info_t *item_info;
1092
1093 ZBX_DBROW2UINT64(itemid, row[0]);
1094
1095 if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &itemid)))
1096 {
1097 THIS_SHOULD_NEVER_HAPPEN;
1098 continue;
1099 }
1100
1101 zbx_substitute_item_name_macros(item_info->item, row[1], &item_info->name);
1102 }
1103 DBfree_result(result);
1104
1105 sql_offset = 0;
1106 zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
1107 "select itemid,tag,value from item_tag where");
1108
1109 DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids->values, itemids->values_num);
1110
1111 result = DBselect("%s", sql);
1112
1113 while (NULL != (row = DBfetch(result)))
1114 {
1115 zbx_uint64_t itemid;
1116 zbx_item_info_t *item_info;
1117 zbx_tag_t item_tag;
1118
1119 ZBX_DBROW2UINT64(itemid, row[0]);
1120
1121 if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &itemid)))
1122 {
1123 THIS_SHOULD_NEVER_HAPPEN;
1124 continue;
1125 }
1126
1127 item_tag.tag = zbx_strdup(NULL, row[1]);
1128 item_tag.value = zbx_strdup(NULL, row[2]);
1129 zbx_vector_item_tag_append(&item_info->item_tags, item_tag);
1130 }
1131 DBfree_result(result);
1132 }
1133
1134 /******************************************************************************
1135 * *
1136 * Function: item_tag_free *
1137 * *
1138 * Purpose: frees resources allocated to store item tag *
1139 * *
1140 * Parameters: item_tag - [IN] item tag *
1141 * *
1142 ******************************************************************************/
item_tag_free(zbx_tag_t item_tag)1143 static void item_tag_free(zbx_tag_t item_tag)
1144 {
1145 zbx_free(item_tag.tag);
1146 zbx_free(item_tag.value);
1147 }
1148
1149 /******************************************************************************
1150 * *
1151 * Function: zbx_item_info_clean *
1152 * *
1153 * Purpose: frees resources allocated to store item tags and name *
1154 * *
1155 * Parameters: item_info - [IN] item information *
1156 * *
1157 ******************************************************************************/
zbx_item_info_clean(zbx_item_info_t * item_info)1158 static void zbx_item_info_clean(zbx_item_info_t *item_info)
1159 {
1160 zbx_vector_item_tag_clear_ext(&item_info->item_tags, item_tag_free);
1161 zbx_vector_item_tag_destroy(&item_info->item_tags);
1162 zbx_free(item_info->name);
1163 }
1164
1165 /******************************************************************************
1166 * *
1167 * Function: DCexport_trends *
1168 * *
1169 * Purpose: export trends *
1170 * *
1171 * Parameters: trends - [IN] trends from cache *
1172 * trends_num - [IN] number of trends *
1173 * hosts_info - [IN] hosts groups names *
1174 * items_info - [IN] item names and tags *
1175 * *
1176 ******************************************************************************/
DCexport_trends(const ZBX_DC_TREND * trends,int trends_num,zbx_hashset_t * hosts_info,zbx_hashset_t * items_info)1177 static void DCexport_trends(const ZBX_DC_TREND *trends, int trends_num, zbx_hashset_t *hosts_info,
1178 zbx_hashset_t *items_info)
1179 {
1180 struct zbx_json json;
1181 const ZBX_DC_TREND *trend = NULL;
1182 int i, j;
1183 const DC_ITEM *item;
1184 zbx_host_info_t *host_info;
1185 zbx_item_info_t *item_info;
1186 zbx_uint128_t avg; /* calculate the trend average value */
1187
1188 zbx_json_init(&json, ZBX_JSON_STAT_BUF_LEN);
1189
1190 for (i = 0; i < trends_num; i++)
1191 {
1192 trend = &trends[i];
1193
1194 if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &trend->itemid)))
1195 continue;
1196
1197 item = item_info->item;
1198
1199 if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &item->host.hostid)))
1200 {
1201 THIS_SHOULD_NEVER_HAPPEN;
1202 continue;
1203 }
1204
1205 zbx_json_clean(&json);
1206
1207 zbx_json_addobject(&json,ZBX_PROTO_TAG_HOST);
1208 zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST, item->host.host, ZBX_JSON_TYPE_STRING);
1209 zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item->host.name, ZBX_JSON_TYPE_STRING);
1210 zbx_json_close(&json);
1211
1212 zbx_json_addarray(&json, ZBX_PROTO_TAG_GROUPS);
1213
1214 for (j = 0; j < host_info->groups.values_num; j++)
1215 zbx_json_addstring(&json, NULL, host_info->groups.values[j], ZBX_JSON_TYPE_STRING);
1216
1217 zbx_json_close(&json);
1218
1219 zbx_json_addarray(&json, ZBX_PROTO_TAG_ITEM_TAGS);
1220
1221 for (j = 0; j < item_info->item_tags.values_num; j++)
1222 {
1223 zbx_tag_t item_tag = item_info->item_tags.values[j];
1224
1225 zbx_json_addobject(&json, NULL);
1226 zbx_json_addstring(&json, ZBX_PROTO_TAG_TAG, item_tag.tag, ZBX_JSON_TYPE_STRING);
1227 zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, item_tag.value, ZBX_JSON_TYPE_STRING);
1228 zbx_json_close(&json);
1229 }
1230
1231 zbx_json_close(&json);
1232 zbx_json_adduint64(&json, ZBX_PROTO_TAG_ITEMID, item->itemid);
1233
1234 if (NULL != item_info->name)
1235 zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item_info->name, ZBX_JSON_TYPE_STRING);
1236
1237 zbx_json_addint64(&json, ZBX_PROTO_TAG_CLOCK, trend->clock);
1238 zbx_json_addint64(&json, ZBX_PROTO_TAG_COUNT, trend->num);
1239
1240 switch (trend->value_type)
1241 {
1242 case ITEM_VALUE_TYPE_FLOAT:
1243 zbx_json_addfloat(&json, ZBX_PROTO_TAG_MIN, trend->value_min.dbl);
1244 zbx_json_addfloat(&json, ZBX_PROTO_TAG_AVG, trend->value_avg.dbl);
1245 zbx_json_addfloat(&json, ZBX_PROTO_TAG_MAX, trend->value_max.dbl);
1246 break;
1247 case ITEM_VALUE_TYPE_UINT64:
1248 zbx_json_adduint64(&json, ZBX_PROTO_TAG_MIN, trend->value_min.ui64);
1249 udiv128_64(&avg, &trend->value_avg.ui64, trend->num);
1250 zbx_json_adduint64(&json, ZBX_PROTO_TAG_AVG, avg.lo);
1251 zbx_json_adduint64(&json, ZBX_PROTO_TAG_MAX, trend->value_max.ui64);
1252 break;
1253 default:
1254 THIS_SHOULD_NEVER_HAPPEN;
1255 }
1256
1257 zbx_json_adduint64(&json, ZBX_PROTO_TAG_TYPE, trend->value_type);
1258 zbx_trends_export_write(json.buffer, json.buffer_size);
1259 }
1260
1261 zbx_trends_export_flush();
1262 zbx_json_free(&json);
1263 }
1264
1265 /******************************************************************************
1266 * *
1267 * Function: DCexport_history *
1268 * *
1269 * Purpose: export history *
1270 * *
1271 * Parameters: history - [IN/OUT] array of history data *
1272 * history_num - [IN] number of history structures *
1273 * hosts_info - [IN] hosts groups names *
1274 * items_info - [IN] item names and tags *
1275 * *
1276 ******************************************************************************/
DCexport_history(const ZBX_DC_HISTORY * history,int history_num,zbx_hashset_t * hosts_info,zbx_hashset_t * items_info)1277 static void DCexport_history(const ZBX_DC_HISTORY *history, int history_num, zbx_hashset_t *hosts_info,
1278 zbx_hashset_t *items_info)
1279 {
1280 const ZBX_DC_HISTORY *h;
1281 const DC_ITEM *item;
1282 int i, j;
1283 zbx_host_info_t *host_info;
1284 zbx_item_info_t *item_info;
1285 struct zbx_json json;
1286
1287 zbx_json_init(&json, ZBX_JSON_STAT_BUF_LEN);
1288
1289 for (i = 0; i < history_num; i++)
1290 {
1291 h = &history[i];
1292
1293 if (0 != (ZBX_DC_FLAGS_NOT_FOR_MODULES & h->flags))
1294 continue;
1295
1296 if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &h->itemid)))
1297 {
1298 THIS_SHOULD_NEVER_HAPPEN;
1299 continue;
1300 }
1301
1302 item = item_info->item;
1303
1304 if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &item->host.hostid)))
1305 {
1306 THIS_SHOULD_NEVER_HAPPEN;
1307 continue;
1308 }
1309
1310 zbx_json_clean(&json);
1311
1312 zbx_json_addobject(&json,ZBX_PROTO_TAG_HOST);
1313 zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST, item->host.host, ZBX_JSON_TYPE_STRING);
1314 zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item->host.name, ZBX_JSON_TYPE_STRING);
1315 zbx_json_close(&json);
1316
1317 zbx_json_addarray(&json, ZBX_PROTO_TAG_GROUPS);
1318
1319 for (j = 0; j < host_info->groups.values_num; j++)
1320 zbx_json_addstring(&json, NULL, host_info->groups.values[j], ZBX_JSON_TYPE_STRING);
1321
1322 zbx_json_close(&json);
1323
1324 zbx_json_addarray(&json, ZBX_PROTO_TAG_ITEM_TAGS);
1325
1326 for (j = 0; j < item_info->item_tags.values_num; j++)
1327 {
1328 zbx_tag_t item_tag = item_info->item_tags.values[j];
1329
1330 zbx_json_addobject(&json, NULL);
1331 zbx_json_addstring(&json, ZBX_PROTO_TAG_TAG, item_tag.tag, ZBX_JSON_TYPE_STRING);
1332 zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, item_tag.value, ZBX_JSON_TYPE_STRING);
1333 zbx_json_close(&json);
1334 }
1335
1336 zbx_json_close(&json);
1337 zbx_json_adduint64(&json, ZBX_PROTO_TAG_ITEMID, item->itemid);
1338
1339 if (NULL != item_info->name)
1340 zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item_info->name, ZBX_JSON_TYPE_STRING);
1341
1342 zbx_json_addint64(&json, ZBX_PROTO_TAG_CLOCK, h->ts.sec);
1343 zbx_json_addint64(&json, ZBX_PROTO_TAG_NS, h->ts.ns);
1344
1345 switch (h->value_type)
1346 {
1347 case ITEM_VALUE_TYPE_FLOAT:
1348 zbx_json_addfloat(&json, ZBX_PROTO_TAG_VALUE, h->value.dbl);
1349 break;
1350 case ITEM_VALUE_TYPE_UINT64:
1351 zbx_json_adduint64(&json, ZBX_PROTO_TAG_VALUE, h->value.ui64);
1352 break;
1353 case ITEM_VALUE_TYPE_STR:
1354 zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.str, ZBX_JSON_TYPE_STRING);
1355 break;
1356 case ITEM_VALUE_TYPE_TEXT:
1357 zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.str, ZBX_JSON_TYPE_STRING);
1358 break;
1359 case ITEM_VALUE_TYPE_LOG:
1360 zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGTIMESTAMP, h->value.log->timestamp);
1361 zbx_json_addstring(&json, ZBX_PROTO_TAG_LOGSOURCE,
1362 ZBX_NULL2EMPTY_STR(h->value.log->source), ZBX_JSON_TYPE_STRING);
1363 zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGSEVERITY, h->value.log->severity);
1364 zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGEVENTID, h->value.log->logeventid);
1365 zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.log->value,
1366 ZBX_JSON_TYPE_STRING);
1367 break;
1368 default:
1369 THIS_SHOULD_NEVER_HAPPEN;
1370 }
1371
1372 zbx_json_adduint64(&json, ZBX_PROTO_TAG_TYPE, h->value_type);
1373 zbx_history_export_write(json.buffer, json.buffer_size);
1374 }
1375
1376 zbx_history_export_flush();
1377 zbx_json_free(&json);
1378 }
1379
1380 /******************************************************************************
1381 * *
1382 * Function: DCexport_history_and_trends *
1383 * *
1384 * Purpose: export history and trends *
1385 * *
1386 * Parameters: history - [IN/OUT] array of history data *
1387 * history_num - [IN] number of history structures *
1388 * itemids - [IN] the item identifiers *
1389 * (used for item lookup) *
1390 * items - [IN] the items *
1391 * errcodes - [IN] item error codes *
1392 * trends - [IN] trends from cache *
1393 * trends_num - [IN] number of trends *
1394 * *
1395 ******************************************************************************/
DCexport_history_and_trends(const ZBX_DC_HISTORY * history,int history_num,const zbx_vector_uint64_t * itemids,DC_ITEM * items,const int * errcodes,const ZBX_DC_TREND * trends,int trends_num)1396 static void DCexport_history_and_trends(const ZBX_DC_HISTORY *history, int history_num,
1397 const zbx_vector_uint64_t *itemids, DC_ITEM *items, const int *errcodes, const ZBX_DC_TREND *trends,
1398 int trends_num)
1399 {
1400 int i, index;
1401 zbx_vector_uint64_t hostids, item_info_ids;
1402 zbx_hashset_t hosts_info, items_info;
1403 DC_ITEM *item;
1404 zbx_item_info_t item_info;
1405
1406 zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d trends_num:%d", __func__, history_num, trends_num);
1407
1408 zbx_vector_uint64_create(&hostids);
1409 zbx_vector_uint64_create(&item_info_ids);
1410 zbx_hashset_create_ext(&items_info, itemids->values_num, ZBX_DEFAULT_UINT64_HASH_FUNC,
1411 ZBX_DEFAULT_UINT64_COMPARE_FUNC, (zbx_clean_func_t)zbx_item_info_clean,
1412 ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC);
1413
1414 for (i = 0; i < history_num; i++)
1415 {
1416 const ZBX_DC_HISTORY *h = &history[i];
1417
1418 if (0 != (ZBX_DC_FLAGS_NOT_FOR_EXPORT & h->flags))
1419 continue;
1420
1421 if (FAIL == (index = zbx_vector_uint64_bsearch(itemids, h->itemid, ZBX_DEFAULT_UINT64_COMPARE_FUNC)))
1422 {
1423 THIS_SHOULD_NEVER_HAPPEN;
1424 continue;
1425 }
1426
1427 if (SUCCEED != errcodes[index])
1428 continue;
1429
1430 item = &items[index];
1431
1432 zbx_vector_uint64_append(&hostids, item->host.hostid);
1433 zbx_vector_uint64_append(&item_info_ids, item->itemid);
1434
1435 item_info.itemid = item->itemid;
1436 item_info.name = NULL;
1437 item_info.item = item;
1438 zbx_vector_item_tag_create(&item_info.item_tags);
1439 zbx_hashset_insert(&items_info, &item_info, sizeof(item_info));
1440 }
1441
1442 if (0 == history_num)
1443 {
1444 for (i = 0; i < trends_num; i++)
1445 {
1446 const ZBX_DC_TREND *trend = &trends[i];
1447
1448 if (FAIL == (index = zbx_vector_uint64_bsearch(itemids, trend->itemid,
1449 ZBX_DEFAULT_UINT64_COMPARE_FUNC)))
1450 {
1451 THIS_SHOULD_NEVER_HAPPEN;
1452 continue;
1453 }
1454
1455 if (SUCCEED != errcodes[index])
1456 continue;
1457
1458 item = &items[index];
1459
1460 zbx_vector_uint64_append(&hostids, item->host.hostid);
1461 zbx_vector_uint64_append(&item_info_ids, item->itemid);
1462
1463 item_info.itemid = item->itemid;
1464 item_info.name = NULL;
1465 item_info.item = item;
1466 zbx_vector_item_tag_create(&item_info.item_tags);
1467 zbx_hashset_insert(&items_info, &item_info, sizeof(item_info));
1468 }
1469 }
1470
1471 if (0 == item_info_ids.values_num)
1472 goto clean;
1473
1474 zbx_vector_uint64_sort(&item_info_ids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1475 zbx_vector_uint64_sort(&hostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1476 zbx_vector_uint64_uniq(&hostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1477
1478 zbx_hashset_create_ext(&hosts_info, hostids.values_num, ZBX_DEFAULT_UINT64_HASH_FUNC,
1479 ZBX_DEFAULT_UINT64_COMPARE_FUNC, (zbx_clean_func_t)zbx_host_info_clean,
1480 ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC);
1481
1482 db_get_hosts_info_by_hostid(&hosts_info, &hostids);
1483
1484 db_get_items_info_by_itemid(&items_info, &item_info_ids);
1485
1486 if (0 != history_num)
1487 DCexport_history(history, history_num, &hosts_info, &items_info);
1488
1489 if (0 != trends_num)
1490 DCexport_trends(trends, trends_num, &hosts_info, &items_info);
1491
1492 zbx_hashset_destroy(&hosts_info);
1493 clean:
1494 zbx_hashset_destroy(&items_info);
1495 zbx_vector_uint64_destroy(&item_info_ids);
1496 zbx_vector_uint64_destroy(&hostids);
1497
1498 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1499 }
1500
1501 /******************************************************************************
1502 * *
1503 * Function: DCexport_all_trends *
1504 * *
1505 * Purpose: export all trends *
1506 * *
1507 * Parameters: trends - [IN] trends from cache *
1508 * trends_num - [IN] number of trends *
1509 * *
1510 ******************************************************************************/
DCexport_all_trends(const ZBX_DC_TREND * trends,int trends_num)1511 static void DCexport_all_trends(const ZBX_DC_TREND *trends, int trends_num)
1512 {
1513 DC_ITEM *items;
1514 zbx_vector_uint64_t itemids;
1515 int *errcodes, i, num;
1516
1517 zabbix_log(LOG_LEVEL_WARNING, "exporting trend data...");
1518
1519 while (0 < trends_num)
1520 {
1521 num = MIN(ZBX_HC_SYNC_MAX, trends_num);
1522
1523 items = (DC_ITEM *)zbx_malloc(NULL, sizeof(DC_ITEM) * (size_t)num);
1524 errcodes = (int *)zbx_malloc(NULL, sizeof(int) * (size_t)num);
1525
1526 zbx_vector_uint64_create(&itemids);
1527 zbx_vector_uint64_reserve(&itemids, num);
1528
1529 for (i = 0; i < num; i++)
1530 zbx_vector_uint64_append(&itemids, trends[i].itemid);
1531
1532 zbx_vector_uint64_sort(&itemids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1533
1534 DCconfig_get_items_by_itemids(items, itemids.values, errcodes, num);
1535
1536 DCexport_history_and_trends(NULL, 0, &itemids, items, errcodes, trends, num);
1537
1538 DCconfig_clean_items(items, errcodes, num);
1539 zbx_vector_uint64_destroy(&itemids);
1540 zbx_free(items);
1541 zbx_free(errcodes);
1542
1543 trends += num;
1544 trends_num -= num;
1545 }
1546
1547 zabbix_log(LOG_LEVEL_WARNING, "exporting trend data done");
1548 }
1549
1550 /******************************************************************************
1551 * *
1552 * Function: DCsync_trends *
1553 * *
1554 * Purpose: flush all trends to the database *
1555 * *
1556 * Author: Alexander Vladishev *
1557 * *
1558 ******************************************************************************/
DCsync_trends(void)1559 static void DCsync_trends(void)
1560 {
1561 zbx_hashset_iter_t iter;
1562 ZBX_DC_TREND *trends = NULL, *trend;
1563 int trends_alloc = 0, trends_num = 0, compression_age;
1564
1565 zabbix_log(LOG_LEVEL_DEBUG, "In %s() trends_num:%d", __func__, cache->trends_num);
1566
1567 compression_age = hc_get_history_compression_age();
1568
1569 zabbix_log(LOG_LEVEL_WARNING, "syncing trend data...");
1570
1571 LOCK_TRENDS;
1572
1573 zbx_hashset_iter_reset(&cache->trends, &iter);
1574
1575 while (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_iter_next(&iter)))
1576 {
1577 if (SUCCEED == zbx_history_requires_trends(trend->value_type) && trend->clock >= compression_age)
1578 DCflush_trend(trend, &trends, &trends_alloc, &trends_num);
1579 }
1580
1581 UNLOCK_TRENDS;
1582
1583 if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_TRENDS) && 0 != trends_num)
1584 DCexport_all_trends(trends, trends_num);
1585
1586 if (0 < trends_num)
1587 qsort(trends, trends_num, sizeof(ZBX_DC_TREND), zbx_trend_compare);
1588
1589 DBbegin();
1590
1591 while (trends_num > 0)
1592 DBflush_trends(trends, &trends_num, NULL);
1593
1594 DBcommit();
1595
1596 zbx_free(trends);
1597
1598 zabbix_log(LOG_LEVEL_WARNING, "syncing trend data done");
1599
1600 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1601 }
1602
1603 /******************************************************************************
1604 * *
1605 * Function: recalculate_triggers *
1606 * *
1607 * Purpose: re-calculate and update values of triggers related to the items *
1608 * *
1609 * Parameters: history - [IN] array of history data *
1610 * history_num - [IN] number of history structures *
1611 * history_itemids - [IN] the item identifiers *
1612 * (used for item lookup) *
1613 * history_items - [IN] the items *
1614 * history_errcodes - [IN] item error codes *
1615 * timers - [IN] the trigger timers *
1616 * trigger_diff - [OUT] trigger updates *
1617 * *
1618 ******************************************************************************/
recalculate_triggers(const ZBX_DC_HISTORY * history,int history_num,const zbx_vector_uint64_t * history_itemids,const DC_ITEM * history_items,const int * history_errcodes,const zbx_vector_ptr_t * timers,zbx_vector_ptr_t * trigger_diff)1619 static void recalculate_triggers(const ZBX_DC_HISTORY *history, int history_num,
1620 const zbx_vector_uint64_t *history_itemids, const DC_ITEM *history_items, const int *history_errcodes,
1621 const zbx_vector_ptr_t *timers, zbx_vector_ptr_t *trigger_diff)
1622 {
1623 int i, item_num = 0, timers_num = 0;
1624 zbx_uint64_t *itemids = NULL;
1625 zbx_timespec_t *timespecs = NULL;
1626 zbx_hashset_t trigger_info;
1627 zbx_vector_ptr_t trigger_order;
1628 zbx_vector_ptr_t trigger_items;
1629
1630 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1631
1632 if (0 != history_num)
1633 {
1634 itemids = (zbx_uint64_t *)zbx_malloc(itemids, sizeof(zbx_uint64_t) * (size_t)history_num);
1635 timespecs = (zbx_timespec_t *)zbx_malloc(timespecs, sizeof(zbx_timespec_t) * (size_t)history_num);
1636
1637 for (i = 0; i < history_num; i++)
1638 {
1639 const ZBX_DC_HISTORY *h = &history[i];
1640
1641 if (0 != (ZBX_DC_FLAG_NOVALUE & h->flags))
1642 continue;
1643
1644 itemids[item_num] = h->itemid;
1645 timespecs[item_num] = h->ts;
1646 item_num++;
1647 }
1648 }
1649
1650 for (i = 0; i < timers->values_num; i++)
1651 {
1652 zbx_trigger_timer_t *timer = (zbx_trigger_timer_t *)timers->values[i];
1653
1654 if (0 != timer->lock)
1655 timers_num++;
1656 }
1657
1658 if (0 == item_num && 0 == timers_num)
1659 goto out;
1660
1661 zbx_hashset_create(&trigger_info, MAX(100, 2 * item_num + timers_num),
1662 ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1663
1664 zbx_vector_ptr_create(&trigger_order);
1665 zbx_vector_ptr_reserve(&trigger_order, trigger_info.num_slots);
1666
1667 zbx_vector_ptr_create(&trigger_items);
1668
1669 if (0 != item_num)
1670 {
1671 DCconfig_get_triggers_by_itemids(&trigger_info, &trigger_order, itemids, timespecs, item_num);
1672 prepare_triggers((DC_TRIGGER **)trigger_order.values, trigger_order.values_num);
1673 zbx_determine_items_in_expressions(&trigger_order, itemids, item_num);
1674 }
1675
1676 if (0 != timers_num)
1677 {
1678 int offset = trigger_order.values_num;
1679
1680 zbx_dc_get_triggers_by_timers(&trigger_info, &trigger_order, timers);
1681
1682 if (offset != trigger_order.values_num)
1683 {
1684 prepare_triggers((DC_TRIGGER **)trigger_order.values + offset,
1685 trigger_order.values_num - offset);
1686 }
1687 }
1688
1689 zbx_vector_ptr_sort(&trigger_order, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);
1690 evaluate_expressions(&trigger_order, history_itemids, history_items, history_errcodes);
1691 zbx_process_triggers(&trigger_order, trigger_diff);
1692
1693 DCfree_triggers(&trigger_order);
1694
1695 zbx_vector_ptr_destroy(&trigger_items);
1696
1697 zbx_hashset_destroy(&trigger_info);
1698 zbx_vector_ptr_destroy(&trigger_order);
1699 out:
1700 zbx_free(timespecs);
1701 zbx_free(itemids);
1702
1703 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1704 }
1705
/******************************************************************************
 *                                                                            *
 * Function: DCinventory_value_add                                            *
 *                                                                            *
 * Purpose: queue a host inventory field update for a received item value     *
 *                                                                            *
 * Parameters: inventory_values - [IN/OUT] pending inventory updates          *
 *             item             - [IN] the item                               *
 *             h                - [IN] the history data                       *
 *                                                                            *
 ******************************************************************************/
static void	DCinventory_value_add(zbx_vector_ptr_t *inventory_values, const DC_ITEM *item, ZBX_DC_HISTORY *h)
{
	char			text[MAX_BUFFER_LEN];
	const char		*field;
	zbx_inventory_value_t	*update;

	/* only supported values of hosts in automatic inventory mode are considered */
	if (ITEM_STATE_NOTSUPPORTED == h->state || HOST_INVENTORY_AUTOMATIC != item->host.inventory_mode)
		return;

	if (0 != ((ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_NOVALUE) & h->flags))
		return;

	/* the item must be linked to a known inventory field */
	if (NULL == (field = DBget_inventory_field(item->inventory_link)))
		return;

	/* render the value as text; other value types do not update inventory */
	switch (h->value_type)
	{
		case ITEM_VALUE_TYPE_FLOAT:
			zbx_print_double(text, sizeof(text), h->value.dbl);
			break;
		case ITEM_VALUE_TYPE_UINT64:
			zbx_snprintf(text, sizeof(text), ZBX_FS_UI64, h->value.ui64);
			break;
		case ITEM_VALUE_TYPE_STR:
		case ITEM_VALUE_TYPE_TEXT:
			strscpy(text, h->value.str);
			break;
		default:
			return;
	}

	zbx_format_value(text, sizeof(text), item->valuemapid, item->units, h->value_type);

	update = (zbx_inventory_value_t *)zbx_malloc(NULL, sizeof(zbx_inventory_value_t));

	update->hostid = item->host.hostid;
	update->idx = item->inventory_link - 1;
	update->field_name = field;
	update->value = zbx_strdup(NULL, text);

	zbx_vector_ptr_append(inventory_values, update);
}
1751
/******************************************************************************
 *                                                                            *
 * Function: DCadd_update_inventory_sql                                       *
 *                                                                            *
 * Purpose: append host inventory update statements to the sql buffer         *
 *                                                                            *
 * Parameters: sql_offset       - [IN/OUT] offset in the sql buffer           *
 *             inventory_values - [IN] inventory values to write              *
 *                                                                            *
 ******************************************************************************/
static void	DCadd_update_inventory_sql(size_t *sql_offset, const zbx_vector_ptr_t *inventory_values)
{
	int	n;

	for (n = 0; n < inventory_values->values_num; n++)
	{
		const zbx_inventory_value_t	*update = (zbx_inventory_value_t *)inventory_values->values[n];
		char				*value_esc;

		/* the value is escaped for the target field before it is embedded into the statement */
		value_esc = DBdyn_escape_field("host_inventory", update->field_name, update->value);

		zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset,
				"update host_inventory set %s='%s' where hostid=" ZBX_FS_UI64 ";\n",
				update->field_name, value_esc, update->hostid);

		DBexecute_overflowed_sql(&sql, &sql_alloc, sql_offset);

		zbx_free(value_esc);
	}
}
1772
/******************************************************************************
 *                                                                            *
 * Function: DCinventory_value_free                                           *
 *                                                                            *
 * Purpose: release an inventory value and the value string it holds          *
 *                                                                            *
 * Parameters: inventory_value - [IN] the inventory value to free             *
 *                                                                            *
 ******************************************************************************/
static void	DCinventory_value_free(zbx_inventory_value_t *inventory_value)
{
	/* only the value string is released here; field_name is not freed */
	zbx_free(inventory_value->value);

	zbx_free(inventory_value);
}
1778
1779 /******************************************************************************
1780 * *
1781 * Function: dc_history_clean_value *
1782 * *
1783 * Purpose: frees resources allocated to store str/text/log value *
1784 * *
1785 * Parameters: history - [IN] the history data *
1786 * history_num - [IN] the number of values in history data *
1787 * *
1788 ******************************************************************************/
dc_history_clean_value(ZBX_DC_HISTORY * history)1789 static void dc_history_clean_value(ZBX_DC_HISTORY *history)
1790 {
1791 if (ITEM_STATE_NOTSUPPORTED == history->state)
1792 {
1793 zbx_free(history->value.err);
1794 return;
1795 }
1796
1797 if (0 != (ZBX_DC_FLAG_NOVALUE & history->flags))
1798 return;
1799
1800 switch (history->value_type)
1801 {
1802 case ITEM_VALUE_TYPE_LOG:
1803 zbx_free(history->value.log->value);
1804 zbx_free(history->value.log->source);
1805 zbx_free(history->value.log);
1806 break;
1807 case ITEM_VALUE_TYPE_STR:
1808 case ITEM_VALUE_TYPE_TEXT:
1809 zbx_free(history->value.str);
1810 break;
1811 }
1812 }
1813
1814 /******************************************************************************
1815 * *
1816 * Function: hc_free_item_values *
1817 * *
1818 * Purpose: frees resources allocated to store str/text/log values *
1819 * *
1820 * Parameters: history - [IN] the history data *
1821 * history_num - [IN] the number of values in history data *
1822 * *
1823 ******************************************************************************/
hc_free_item_values(ZBX_DC_HISTORY * history,int history_num)1824 static void hc_free_item_values(ZBX_DC_HISTORY *history, int history_num)
1825 {
1826 int i;
1827
1828 for (i = 0; i < history_num; i++)
1829 dc_history_clean_value(&history[i]);
1830 }
1831
1832 /******************************************************************************
1833 * *
1834 * Function: dc_history_set_error *
1835 * *
1836 * Purpose: sets history data to notsupported *
1837 * *
1838 * Parameters: history - [IN] the history data *
1839 * errmsg - [IN] the error message *
1840 * *
1841 * Comments: The error message is stored directly and freed with when history *
1842 * data is cleaned. *
1843 * *
1844 ******************************************************************************/
dc_history_set_error(ZBX_DC_HISTORY * hdata,char * errmsg)1845 static void dc_history_set_error(ZBX_DC_HISTORY *hdata, char *errmsg)
1846 {
1847 dc_history_clean_value(hdata);
1848 hdata->value.err = errmsg;
1849 hdata->state = ITEM_STATE_NOTSUPPORTED;
1850 hdata->flags |= ZBX_DC_FLAG_UNDEF;
1851 }
1852
1853 /******************************************************************************
1854 * *
1855 * Function: dc_history_set_value *
1856 * *
1857 * Purpose: sets history data value *
1858 * *
1859 * Parameters: hdata - [IN/OUT] the history data *
1860 * value_type - [IN] the item value type *
1861 * value - [IN] the value to set *
1862 * *
1863 ******************************************************************************/
dc_history_set_value(ZBX_DC_HISTORY * hdata,unsigned char value_type,zbx_variant_t * value)1864 static void dc_history_set_value(ZBX_DC_HISTORY *hdata, unsigned char value_type, zbx_variant_t *value)
1865 {
1866 char *errmsg = NULL;
1867
1868 if (FAIL == zbx_variant_to_value_type(value, value_type, CONFIG_DOUBLE_PRECISION, &errmsg))
1869 {
1870 dc_history_set_error(hdata, errmsg);
1871 return;
1872 }
1873
1874 switch (value_type)
1875 {
1876 case ITEM_VALUE_TYPE_FLOAT:
1877 dc_history_clean_value(hdata);
1878 hdata->value.dbl = value->data.dbl;
1879 break;
1880 case ITEM_VALUE_TYPE_UINT64:
1881 dc_history_clean_value(hdata);
1882 hdata->value.ui64 = value->data.ui64;
1883 break;
1884 case ITEM_VALUE_TYPE_STR:
1885 dc_history_clean_value(hdata);
1886 hdata->value.str = value->data.str;
1887 hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_STR_VALUE_LEN)] = '\0';
1888 break;
1889 case ITEM_VALUE_TYPE_TEXT:
1890 dc_history_clean_value(hdata);
1891 hdata->value.str = value->data.str;
1892 hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_TEXT_VALUE_LEN)] = '\0';
1893 break;
1894 case ITEM_VALUE_TYPE_LOG:
1895 if (ITEM_VALUE_TYPE_LOG != hdata->value_type)
1896 {
1897 dc_history_clean_value(hdata);
1898 hdata->value.log = (zbx_log_value_t *)zbx_malloc(NULL, sizeof(zbx_log_value_t));
1899 memset(hdata->value.log, 0, sizeof(zbx_log_value_t));
1900 }
1901 hdata->value.log->value = value->data.str;
1902 hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_LOG_VALUE_LEN)] = '\0';
1903 }
1904
1905 hdata->value_type = value_type;
1906 zbx_variant_set_none(value);
1907 }
1908
1909 /******************************************************************************
1910 * *
1911 * Function: normalize_item_value *
1912 * *
1913 * Purpose: normalize item value by performing truncation of long text *
1914 * values and changes value format according to the item value type *
1915 * *
1916 * Parameters: item - [IN] the item *
1917 * hdata - [IN/OUT] the historical data to process *
1918 * *
1919 ******************************************************************************/
static void	normalize_item_value(const DC_ITEM *item, ZBX_DC_HISTORY *hdata)
{
	char		*logvalue;
	zbx_variant_t	value_var;

	/* nothing to normalize when there is no value or the value is an error */
	if (0 != (hdata->flags & ZBX_DC_FLAG_NOVALUE))
		return;

	if (ITEM_STATE_NOTSUPPORTED == hdata->state)
		return;

	if (0 == (hdata->flags & ZBX_DC_FLAG_NOHISTORY))
		hdata->ttl = item->history_sec;

	if (item->value_type == hdata->value_type)
	{
		/* value already has the item value type: */
		/* truncate text based values if necessary and validate floating point values */
		switch (hdata->value_type)
		{
			case ITEM_VALUE_TYPE_STR:
				hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_STR_VALUE_LEN)] = '\0';
				break;
			case ITEM_VALUE_TYPE_TEXT:
				hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_TEXT_VALUE_LEN)] = '\0';
				break;
			case ITEM_VALUE_TYPE_LOG:
				logvalue = hdata->value.log->value;
				logvalue[zbx_db_strlen_n(logvalue, HISTORY_LOG_VALUE_LEN)] = '\0';
				break;
			case ITEM_VALUE_TYPE_FLOAT:
				if (FAIL == zbx_validate_value_dbl(hdata->value.dbl, CONFIG_DOUBLE_PRECISION))
				{
					char	buffer[ZBX_MAX_DOUBLE_LEN + 1];

					dc_history_set_error(hdata, zbx_dsprintf(NULL,
							"Value %s is too small or too large.",
							zbx_print_double(buffer, sizeof(buffer), hdata->value.dbl)));
				}
				break;
		}
		return;
	}

	/* value type differs from the item value type: move the value into a variant */
	/* and let dc_history_set_value() convert it                                  */
	switch (hdata->value_type)
	{
		case ITEM_VALUE_TYPE_FLOAT:
			zbx_variant_set_dbl(&value_var, hdata->value.dbl);
			break;
		case ITEM_VALUE_TYPE_UINT64:
			zbx_variant_set_ui64(&value_var, hdata->value.ui64);
			break;
		case ITEM_VALUE_TYPE_STR:
		case ITEM_VALUE_TYPE_TEXT:
			/* the string is handed over to the variant, so it is detached from hdata */
			zbx_variant_set_str(&value_var, hdata->value.str);
			hdata->value.str = NULL;
			break;
		case ITEM_VALUE_TYPE_LOG:
			zbx_variant_set_str(&value_var, hdata->value.log->value);
			hdata->value.log->value = NULL;
			break;
		default:
			/* no known source type: value_var was never set and must not be used */
			THIS_SHOULD_NEVER_HAPPEN;
			return;
	}

	dc_history_set_value(hdata, item->value_type, &value_var);
	zbx_variant_clear(&value_var);
}
1985
1986 /******************************************************************************
1987 * *
1988 * Function: calculate_item_update *
1989 * *
1990 * Purpose: calculates what item fields must be updated *
1991 * *
1992 * Parameters: item - [IN] the item *
1993 * h - [IN] the historical data to process *
1994 * *
1995 * Return value: The update data. This data must be freed by the caller. *
1996 * *
1997 * Comments: Will generate internal events when item state switches. *
1998 * *
1999 ******************************************************************************/
calculate_item_update(DC_ITEM * item,const ZBX_DC_HISTORY * h)2000 static zbx_item_diff_t *calculate_item_update(DC_ITEM *item, const ZBX_DC_HISTORY *h)
2001 {
2002 zbx_uint64_t flags = 0;
2003 const char *item_error = NULL;
2004 zbx_item_diff_t *diff;
2005
2006 if (0 != (ZBX_DC_FLAG_META & h->flags))
2007 {
2008 if (item->lastlogsize != h->lastlogsize)
2009 flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE;
2010
2011 if (item->mtime != h->mtime)
2012 flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME;
2013 }
2014
2015 if (h->state != item->state)
2016 {
2017 flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE;
2018
2019 if (ITEM_STATE_NOTSUPPORTED == h->state)
2020 {
2021 zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" became not supported: %s",
2022 item->host.host, item->key_orig, h->value.str);
2023
2024 zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_ITEM, item->itemid, &h->ts, h->state, NULL,
2025 NULL, NULL, 0, 0, NULL, 0, NULL, 0, NULL, NULL, h->value.err);
2026
2027 if (0 != strcmp(item->error, h->value.err))
2028 item_error = h->value.err;
2029 }
2030 else
2031 {
2032 zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" became supported",
2033 item->host.host, item->key_orig);
2034
2035 /* we know it's EVENT_OBJECT_ITEM because LLDRULE that becomes */
2036 /* supported is handled in lld_process_discovery_rule() */
2037 zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_ITEM, item->itemid, &h->ts, h->state,
2038 NULL, NULL, NULL, 0, 0, NULL, 0, NULL, 0, NULL, NULL, NULL);
2039
2040 item_error = "";
2041 }
2042 }
2043 else if (ITEM_STATE_NOTSUPPORTED == h->state && 0 != strcmp(item->error, h->value.err))
2044 {
2045 zabbix_log(LOG_LEVEL_WARNING, "error reason for \"%s:%s\" changed: %s", item->host.host,
2046 item->key_orig, h->value.err);
2047
2048 item_error = h->value.err;
2049 }
2050
2051 if (NULL != item_error)
2052 flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_ERROR;
2053
2054 if (0 == flags)
2055 return NULL;
2056
2057 diff = (zbx_item_diff_t *)zbx_malloc(NULL, sizeof(zbx_item_diff_t));
2058 diff->itemid = item->itemid;
2059 diff->flags = flags;
2060
2061 if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE & flags))
2062 diff->lastlogsize = h->lastlogsize;
2063
2064 if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME & flags))
2065 diff->mtime = h->mtime;
2066
2067 if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE & flags))
2068 {
2069 diff->state = h->state;
2070 item->state = h->state;
2071 }
2072
2073 if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_ERROR & flags))
2074 diff->error = item_error;
2075
2076 return diff;
2077 }
2078
2079 /******************************************************************************
2080 * *
2081 * Function: DBmass_update_items *
2082 * *
2083 * Purpose: update item data and inventory in database *
2084 * *
2085 * Parameters: item_diff - item changes *
2086 * inventory_values - inventory values *
2087 * *
2088 ******************************************************************************/
DBmass_update_items(const zbx_vector_ptr_t * item_diff,const zbx_vector_ptr_t * inventory_values)2089 static void DBmass_update_items(const zbx_vector_ptr_t *item_diff, const zbx_vector_ptr_t *inventory_values)
2090 {
2091 size_t sql_offset = 0;
2092 int i;
2093
2094 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
2095
2096 for (i = 0; i < item_diff->values_num; i++)
2097 {
2098 zbx_item_diff_t *diff;
2099
2100 diff = (zbx_item_diff_t *)item_diff->values[i];
2101 if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_DB & diff->flags))
2102 break;
2103 }
2104
2105 if (i != item_diff->values_num || 0 != inventory_values->values_num)
2106 {
2107 DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
2108
2109 if (i != item_diff->values_num)
2110 {
2111 zbx_db_save_item_changes(&sql, &sql_alloc, &sql_offset, item_diff,
2112 ZBX_FLAGS_ITEM_DIFF_UPDATE_DB);
2113 }
2114
2115 if (0 != inventory_values->values_num)
2116 DCadd_update_inventory_sql(&sql_offset, inventory_values);
2117
2118 DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
2119
2120 if (sql_offset > 16) /* In ORACLE always present begin..end; */
2121 DBexecute("%s", sql);
2122
2123 DCconfig_update_inventory_values(inventory_values);
2124 }
2125
2126 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
2127 }
2128
2129 /******************************************************************************
2130 * *
2131 * Function: DCmass_proxy_prepare_itemdiff *
2132 * *
2133 * Purpose: prepare itemdiff after receiving new values *
2134 * *
2135 * Parameters: history - array of history data *
2136 * history_num - number of history structures *
2137 * item_diff - vector to store prepared diff *
2138 * *
2139 ******************************************************************************/
static void	DCmass_proxy_prepare_itemdiff(ZBX_DC_HISTORY *history, int history_num, zbx_vector_ptr_t *item_diff)
{
	int	idx;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_vector_ptr_reserve(item_diff, history_num);

	/* one diff is allocated and appended for every history value */
	for (idx = 0; idx < history_num; idx++)
	{
		const ZBX_DC_HISTORY	*h = &history[idx];
		zbx_item_diff_t		*diff;

		diff = (zbx_item_diff_t *)zbx_malloc(NULL, sizeof(zbx_item_diff_t));
		diff->itemid = h->itemid;
		diff->state = h->state;

		if (0 == (ZBX_DC_FLAG_META & h->flags))
		{
			/* no meta information: only the state is announced */
			diff->flags = ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE;
		}
		else
		{
			diff->flags = ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE | ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE |
					ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME;
			diff->lastlogsize = h->lastlogsize;
			diff->mtime = h->mtime;
		}

		zbx_vector_ptr_append(item_diff, diff);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
2168
2169 /******************************************************************************
2170 * *
 * Function: DBmass_proxy_update_items                                        *
2172 * *
2173 * Purpose: update items info after new value is received *
2174 * *
2175 * Parameters: item_diff - diff of items to be updated *
2176 * *
2177 * Author: Alexei Vladishev, Eugene Grigorjev, Alexander Vladishev *
2178 * *
2179 ******************************************************************************/
static void	DBmass_proxy_update_items(zbx_vector_ptr_t *item_diff)
{
	size_t	sql_offset = 0;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* no diffs - no SQL is generated */
	if (0 == item_diff->values_num)
		goto out;

	zbx_vector_ptr_sort(item_diff, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);

	DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);

	/* only log size and modification time changes are requested to be saved here */
	zbx_db_save_item_changes(&sql, &sql_alloc, &sql_offset, item_diff,
			ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE | ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME);

	DBend_multiple_update(&sql, &sql_alloc, &sql_offset);

	/* In ORACLE always present begin..end; */
	if (sql_offset > 16)
		DBexecute("%s", sql);
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
2203
2204 /******************************************************************************
2205 * *
2206 * Function: DBmass_add_history *
2207 * *
2208 * Purpose: inserting new history data after new value is received *
2209 * *
2210 * Parameters: history - array of history data *
2211 * history_num - number of history structures *
2212 * *
2213 ******************************************************************************/
static int	DBmass_add_history(ZBX_DC_HISTORY *history, int history_num)
{
	int			idx, result = SUCCEED;
	zbx_vector_ptr_t	storable;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_vector_ptr_create(&storable);
	zbx_vector_ptr_reserve(&storable, history_num);

	/* collect pointers to the values that have none of the ZBX_DC_FLAGS_NOT_FOR_HISTORY flags set */
	for (idx = 0; idx < history_num; idx++)
	{
		if (0 == (ZBX_DC_FLAGS_NOT_FOR_HISTORY & history[idx].flags))
			zbx_vector_ptr_append(&storable, &history[idx]);
	}

	/* the result stays SUCCEED when there is nothing to add */
	if (0 != storable.values_num)
		result = zbx_vc_add_values(&storable);

	/* only the vector is released, the values still belong to the history array */
	zbx_vector_ptr_destroy(&storable);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);

	return result;
}
2243
2244 /******************************************************************************
2245 * *
2246 * Function: dc_add_proxy_history *
2247 * *
2248 * Purpose: helper function for DCmass_proxy_add_history() *
2249 * *
 * Comment: this function is meant for items with value_type other than       *
2251 * ITEM_VALUE_TYPE_LOG not containing meta information in result *
2252 * *
2253 ******************************************************************************/
static void	dc_add_proxy_history(ZBX_DC_HISTORY *history, int history_num)
{
	int		idx, write_clock;
	char		buffer[64];
	zbx_db_insert_t	db_insert;

	write_clock = (int)time(NULL);

	zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "flags", "write_clock",
			NULL);

	for (idx = 0; idx < history_num; idx++)
	{
		const ZBX_DC_HISTORY	*h = &history[idx];
		unsigned int		flags;
		char			*pvalue;

		/* skip undefined values, values with meta information and errors */
		if (0 != (h->flags & ZBX_DC_FLAG_UNDEF) || 0 != (h->flags & ZBX_DC_FLAG_META) ||
				ITEM_STATE_NOTSUPPORTED == h->state)
		{
			continue;
		}

		if (0 != (h->flags & ZBX_DC_FLAG_NOVALUE))
		{
			/* no value: an empty string is stored together with the NOVALUE flag */
			flags = PROXY_HISTORY_FLAG_NOVALUE;
			pvalue = (char *)"";
		}
		else
		{
			flags = 0;

			/* numeric values are formatted into the local buffer, text is passed as is */
			if (ITEM_VALUE_TYPE_FLOAT == h->value_type)
			{
				zbx_snprintf(buffer, sizeof(buffer), ZBX_FS_DBL64, h->value.dbl);
				pvalue = buffer;
			}
			else if (ITEM_VALUE_TYPE_UINT64 == h->value_type)
			{
				zbx_snprintf(buffer, sizeof(buffer), ZBX_FS_UI64, h->value.ui64);
				pvalue = buffer;
			}
			else if (ITEM_VALUE_TYPE_STR == h->value_type || ITEM_VALUE_TYPE_TEXT == h->value_type)
			{
				pvalue = h->value.str;
			}
			else if (ITEM_VALUE_TYPE_LOG == h->value_type)
			{
				/* log values are not inserted by this function */
				continue;
			}
			else
			{
				THIS_SHOULD_NEVER_HAPPEN;
				continue;
			}
		}

		zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, pvalue, flags, write_clock);
	}

	zbx_db_insert_execute(&db_insert);
	zbx_db_insert_clean(&db_insert);
}
2312
2313 /******************************************************************************
2314 * *
2315 * Function: dc_add_proxy_history_meta *
2316 * *
2317 * Purpose: helper function for DCmass_proxy_add_history() *
2318 * *
 * Comment: this function is meant for items with value_type other than       *
2320 * ITEM_VALUE_TYPE_LOG containing meta information in result *
2321 * *
2322 ******************************************************************************/
static void	dc_add_proxy_history_meta(ZBX_DC_HISTORY *history, int history_num)
{
	int		idx, write_clock;
	char		buffer[64];
	zbx_db_insert_t	db_insert;

	write_clock = (int)time(NULL);

	zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "lastlogsize", "mtime",
			"flags", "write_clock", NULL);

	for (idx = 0; idx < history_num; idx++)
	{
		const ZBX_DC_HISTORY	*h = &history[idx];
		unsigned int		flags = PROXY_HISTORY_FLAG_META;
		char			*pvalue;

		/* the state is checked first; errors and undefined values are skipped */
		if (ITEM_STATE_NOTSUPPORTED == h->state || 0 != (h->flags & ZBX_DC_FLAG_UNDEF))
			continue;

		/* only non-log values carrying meta information are inserted here */
		if (0 == (h->flags & ZBX_DC_FLAG_META) || ITEM_VALUE_TYPE_LOG == h->value_type)
			continue;

		if (0 != (h->flags & ZBX_DC_FLAG_NOVALUE))
		{
			/* meta information only: an empty value is stored and NOVALUE is added to the flags */
			flags |= PROXY_HISTORY_FLAG_NOVALUE;
			pvalue = (char *)"";
		}
		else if (ITEM_VALUE_TYPE_FLOAT == h->value_type)
		{
			zbx_snprintf(buffer, sizeof(buffer), ZBX_FS_DBL64, h->value.dbl);
			pvalue = buffer;
		}
		else if (ITEM_VALUE_TYPE_UINT64 == h->value_type)
		{
			zbx_snprintf(buffer, sizeof(buffer), ZBX_FS_UI64, h->value.ui64);
			pvalue = buffer;
		}
		else if (ITEM_VALUE_TYPE_STR == h->value_type || ITEM_VALUE_TYPE_TEXT == h->value_type)
		{
			pvalue = h->value.str;
		}
		else
		{
			THIS_SHOULD_NEVER_HAPPEN;
			continue;
		}

		zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, pvalue, h->lastlogsize, h->mtime,
				flags, write_clock);
	}

	zbx_db_insert_execute(&db_insert);
	zbx_db_insert_clean(&db_insert);
}
2382
2383 /******************************************************************************
2384 * *
2385 * Function: dc_add_proxy_history_log *
2386 * *
2387 * Purpose: helper function for DCmass_proxy_add_history() *
2388 * *
2389 * Comment: this function is meant for items with value_type *
2390 * ITEM_VALUE_TYPE_LOG *
2391 * *
2392 ******************************************************************************/
static void	dc_add_proxy_history_log(ZBX_DC_HISTORY *history, int history_num)
{
	int		idx, write_clock;
	zbx_db_insert_t	db_insert;

	write_clock = (int)time(NULL);

	/* see hc_copy_history_data() for fields that might be uninitialized and need special handling here */
	zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "timestamp", "source", "severity",
			"value", "logeventid", "lastlogsize", "mtime", "flags", "write_clock", NULL);

	for (idx = 0; idx < history_num; idx++)
	{
		const ZBX_DC_HISTORY	*h = &history[idx];
		unsigned int		flags;
		zbx_uint64_t		lastlogsize;
		int			mtime;

		/* only supported values of log type are inserted here */
		if (ITEM_STATE_NOTSUPPORTED == h->state || ITEM_VALUE_TYPE_LOG != h->value_type)
			continue;

		if (0 != (h->flags & ZBX_DC_FLAG_NOVALUE))
		{
			/* sent to server only if not 0, see proxy_get_history_data() */
			const int	unset_if_novalue = 0;

			flags = PROXY_HISTORY_FLAG_META | PROXY_HISTORY_FLAG_NOVALUE;

			zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, unset_if_novalue, "",
					unset_if_novalue, "", unset_if_novalue, h->lastlogsize, h->mtime, flags,
					write_clock);
			continue;
		}

		/* log size and modification time are stored as 0 when no meta information is present */
		if (0 == (h->flags & ZBX_DC_FLAG_META))
		{
			flags = 0;
			lastlogsize = 0;
			mtime = 0;
		}
		else
		{
			flags = PROXY_HISTORY_FLAG_META;
			lastlogsize = h->lastlogsize;
			mtime = h->mtime;
		}

		zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, h->value.log->timestamp,
				ZBX_NULL2EMPTY_STR(h->value.log->source), h->value.log->severity,
				h->value.log->value, h->value.log->logeventid, lastlogsize, mtime, flags,
				write_clock);
	}

	zbx_db_insert_execute(&db_insert);
	zbx_db_insert_clean(&db_insert);
}
2453
2454 /******************************************************************************
2455 * *
2456 * Function: dc_add_proxy_history_notsupported *
2457 * *
2458 * Purpose: helper function for DCmass_proxy_add_history() *
2459 * *
2460 ******************************************************************************/
static void	dc_add_proxy_history_notsupported(ZBX_DC_HISTORY *history, int history_num)
{
	int		idx, write_clock;
	zbx_db_insert_t	db_insert;

	write_clock = (int)time(NULL);

	zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "state", "write_clock",
			NULL);

	/* only values in not supported state are inserted, the error text goes into the value field */
	for (idx = 0; idx < history_num; idx++)
	{
		const ZBX_DC_HISTORY	*h = &history[idx];

		if (ITEM_STATE_NOTSUPPORTED == h->state)
		{
			zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns,
					ZBX_NULL2EMPTY_STR(h->value.err), (int)h->state, write_clock);
		}
	}

	zbx_db_insert_execute(&db_insert);
	zbx_db_insert_clean(&db_insert);
}
2484
2485 /******************************************************************************
2486 * *
 * Function: DBmass_proxy_add_history                                         *
2488 * *
2489 * Purpose: inserting new history data after new value is received *
2490 * *
2491 * Parameters: history - array of history data *
2492 * history_num - number of history structures *
2493 * *
2494 * Author: Alexander Vladishev *
2495 * *
2496 ******************************************************************************/
static void	DBmass_proxy_add_history(ZBX_DC_HISTORY *history, int history_num)
{
	int	idx, plain_num = 0, meta_num = 0, log_num = 0, error_num = 0;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* count the values per category so that a helper is called only when it has something to insert */
	for (idx = 0; idx < history_num; idx++)
	{
		const ZBX_DC_HISTORY	*h = &history[idx];

		if (ITEM_STATE_NOTSUPPORTED == h->state)
		{
			error_num++;
			continue;
		}

		switch (h->value_type)
		{
			case ITEM_VALUE_TYPE_NONE:
				plain_num++;
				break;
			case ITEM_VALUE_TYPE_LOG:
				log_num++;
				break;
			case ITEM_VALUE_TYPE_FLOAT:
			case ITEM_VALUE_TYPE_UINT64:
			case ITEM_VALUE_TYPE_STR:
			case ITEM_VALUE_TYPE_TEXT:
				if (0 == (h->flags & ZBX_DC_FLAG_META))
					plain_num++;
				else
					meta_num++;
				break;
			default:
				THIS_SHOULD_NEVER_HAPPEN;
		}
	}

	/* every helper receives the whole array and picks its own values */
	if (0 != plain_num)
		dc_add_proxy_history(history, history_num);

	if (0 != meta_num)
		dc_add_proxy_history_meta(history, history_num);

	if (0 != log_num)
		dc_add_proxy_history_log(history, history_num);

	if (0 != error_num)
		dc_add_proxy_history_notsupported(history, history_num);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
2549
2550 /******************************************************************************
2551 * *
2552 * Function: DCmass_prepare_history *
2553 * *
2554 * Purpose: prepare history data using items from configuration cache and *
2555 * generate item changes to be applied and host inventory values to *
2556 * be added *
2557 * *
2558 * Parameters: history - [IN/OUT] array of history data *
2559 * itemids - [IN] the item identifiers *
2560 * (used for item lookup) *
2561 * items - [IN] the items *
2562 * errcodes - [IN] item error codes *
2563 * history_num - [IN] number of history structures *
2564 * item_diff - [OUT] the changes in item data *
2565 * inventory_values - [OUT] the inventory values to add *
2566 * compression_age - [IN] history compression age *
 *             proxy_subscribtions - [OUT] proxy hostid, value clock pairs    *
2568 * *
2569 ******************************************************************************/
static void	DCmass_prepare_history(ZBX_DC_HISTORY *history, const zbx_vector_uint64_t *itemids,
		DC_ITEM *items, const int *errcodes, int history_num, zbx_vector_ptr_t *item_diff,
		zbx_vector_ptr_t *inventory_values, int compression_age, zbx_vector_uint64_pair_t *proxy_subscribtions)
{
	static time_t	last_history_discard = 0;
	time_t		now;
	int		i;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __func__, history_num);

	now = time(NULL);

	for (i = 0; i < history_num; i++)
	{
		ZBX_DC_HISTORY	*h = &history[i];
		DC_ITEM		*item;
		zbx_item_diff_t	*diff;
		int		index;

		/* values older than the compression age are flagged as undefined and skipped */
		if (0 != compression_age && h->ts.sec < compression_age)
		{
			/* log once per hour */
			if (SEC_PER_HOUR < (now - last_history_discard))
			{
				zabbix_log(LOG_LEVEL_TRACE, "discarding history that is pointing to"
						" compressed history period");
				last_history_discard = now;
			}

			h->flags |= ZBX_DC_FLAG_UNDEF;
			continue;
		}

		index = zbx_vector_uint64_bsearch(itemids, h->itemid, ZBX_DEFAULT_UINT64_COMPARE_FUNC);

		if (FAIL == index)
		{
			THIS_SHOULD_NEVER_HAPPEN;
			h->flags |= ZBX_DC_FLAG_UNDEF;
			continue;
		}

		item = &items[index];

		/* the error code is tested first, so the item is read only after a successful lookup; */
		/* values of items that are not active or whose host is not monitored are skipped too  */
		if (SUCCEED != errcodes[index] || ITEM_STATUS_ACTIVE != item->status ||
				HOST_STATUS_MONITORED != item->host.status)
		{
			h->flags |= ZBX_DC_FLAG_UNDEF;
			continue;
		}

		/* history is not kept when disabled for the item or when the value is too old */
		if (0 == item->history || now - h->ts.sec > item->history_sec)
		{
			h->flags |= ZBX_DC_FLAG_NOHISTORY;

			if (0 != item->history)
			{
				zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" value timestamp \"%s %s\" is outside history "
						"storage period", item->host.host, item->key_orig,
						zbx_date2str(h->ts.sec, NULL), zbx_time2str(h->ts.sec, NULL));
			}
		}

		/* trends are considered only for items of float and unsigned integer value types */
		if (ITEM_VALUE_TYPE_FLOAT != item->value_type && ITEM_VALUE_TYPE_UINT64 != item->value_type)
		{
			h->flags |= ZBX_DC_FLAG_NOTRENDS;
		}
		else if (0 == item->trends)
		{
			h->flags |= ZBX_DC_FLAG_NOTRENDS;
		}
		else if (now - h->ts.sec > item->trends_sec)
		{
			h->flags |= ZBX_DC_FLAG_NOTRENDS;
			zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" value timestamp \"%s %s\" is outside "
					"trends storage period", item->host.host, item->key_orig,
					zbx_date2str(h->ts.sec, NULL), zbx_time2str(h->ts.sec, NULL));
		}

		normalize_item_value(item, h);

		/* calculate item update and update already retrieved item status for trigger calculation */
		diff = calculate_item_update(item, h);

		if (NULL != diff)
			zbx_vector_ptr_append(item_diff, diff);

		DCinventory_value_add(inventory_values, item, h);

		/* remember proxy hostid and value clock for items of proxy monitored hosts */
		/* that are not processed by the server itself                              */
		if (0 != item->host.proxy_hostid && FAIL == is_item_processed_by_server(item->type, item->key_orig))
		{
			zbx_uint64_pair_t	p = {item->host.proxy_hostid, h->ts.sec};

			zbx_vector_uint64_pair_append(proxy_subscribtions, p);
		}
	}

	zbx_vector_ptr_sort(inventory_values, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);
	zbx_vector_ptr_sort(item_diff, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
2674
2675 /******************************************************************************
2676 * *
2677 * Function: DCmodule_prepare_history *
2678 * *
2679 * Purpose: prepare history data to share them with loadable modules, sort *
2680 * data by type skipping low-level discovery data, meta information *
2681 * updates and notsupported items *
2682 * *
2683 * Parameters: history - [IN] array of history data *
2684 * history_num - [IN] number of history structures *
2685 * history_<type> - [OUT] array of historical data of a *
2686 * specific data type *
2687 * history_<type>_num - [OUT] number of values of a specific *
2688 * data type *
2689 * *
2690 ******************************************************************************/
static void	DCmodule_prepare_history(ZBX_DC_HISTORY *history, int history_num, ZBX_HISTORY_FLOAT *history_float,
		int *history_float_num, ZBX_HISTORY_INTEGER *history_integer, int *history_integer_num,
		ZBX_HISTORY_STRING *history_string, int *history_string_num, ZBX_HISTORY_TEXT *history_text,
		int *history_text_num, ZBX_HISTORY_LOG *history_log, int *history_log_num)
{
	int	idx;

	/* all output counters start from zero */
	*history_float_num = 0;
	*history_integer_num = 0;
	*history_string_num = 0;
	*history_text_num = 0;
	*history_log_num = 0;

	for (idx = 0; idx < history_num; idx++)
	{
		const ZBX_DC_HISTORY	*h = &history[idx];

		if (0 != (ZBX_DC_FLAGS_NOT_FOR_MODULES & h->flags))
			continue;

		/* a value is copied to the output array of its type only when */
		/* callbacks for that type are registered                      */
		switch (h->value_type)
		{
			case ITEM_VALUE_TYPE_FLOAT:
			{
				ZBX_HISTORY_FLOAT	*dst;

				if (NULL == history_float_cbs)
					continue;

				dst = &history_float[(*history_float_num)++];
				dst->itemid = h->itemid;
				dst->clock = h->ts.sec;
				dst->ns = h->ts.ns;
				dst->value = h->value.dbl;
				break;
			}
			case ITEM_VALUE_TYPE_UINT64:
			{
				ZBX_HISTORY_INTEGER	*dst;

				if (NULL == history_integer_cbs)
					continue;

				dst = &history_integer[(*history_integer_num)++];
				dst->itemid = h->itemid;
				dst->clock = h->ts.sec;
				dst->ns = h->ts.ns;
				dst->value = h->value.ui64;
				break;
			}
			case ITEM_VALUE_TYPE_STR:
			{
				ZBX_HISTORY_STRING	*dst;

				if (NULL == history_string_cbs)
					continue;

				/* the string is not copied, only the pointer is stored */
				dst = &history_string[(*history_string_num)++];
				dst->itemid = h->itemid;
				dst->clock = h->ts.sec;
				dst->ns = h->ts.ns;
				dst->value = h->value.str;
				break;
			}
			case ITEM_VALUE_TYPE_TEXT:
			{
				ZBX_HISTORY_TEXT	*dst;

				if (NULL == history_text_cbs)
					continue;

				dst = &history_text[(*history_text_num)++];
				dst->itemid = h->itemid;
				dst->clock = h->ts.sec;
				dst->ns = h->ts.ns;
				dst->value = h->value.str;
				break;
			}
			case ITEM_VALUE_TYPE_LOG:
			{
				ZBX_HISTORY_LOG		*dst;
				const zbx_log_value_t	*src;

				if (NULL == history_log_cbs)
					continue;

				src = h->value.log;
				dst = &history_log[(*history_log_num)++];
				dst->itemid = h->itemid;
				dst->clock = h->ts.sec;
				dst->ns = h->ts.ns;
				dst->value = src->value;
				/* a missing source is passed on as an empty string */
				dst->source = ZBX_NULL2EMPTY_STR(src->source);
				dst->timestamp = src->timestamp;
				dst->logeventid = src->logeventid;
				dst->severity = src->severity;
				break;
			}
			default:
				THIS_SHOULD_NEVER_HAPPEN;
		}
	}
}
2780
static void	DCmodule_sync_history(int history_float_num, int history_integer_num, int history_string_num,
		int history_text_num, int history_log_num, ZBX_HISTORY_FLOAT *history_float,
		ZBX_HISTORY_INTEGER *history_integer, ZBX_HISTORY_STRING *history_string,
		ZBX_HISTORY_TEXT *history_text, ZBX_HISTORY_LOG *history_log)
{
	int	cb;

	/* For every value type that has values, call each registered callback in array order. */
	/* The callback arrays are walked up to the first entry with a NULL module pointer.     */

	if (0 != history_float_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing float history data with modules...");

		for (cb = 0; NULL != history_float_cbs[cb].module; cb++)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_float_cbs[cb].module->name);
			history_float_cbs[cb].history_float_cb(history_float, history_float_num);
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d float values with modules", history_float_num);
	}

	if (0 != history_integer_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing integer history data with modules...");

		for (cb = 0; NULL != history_integer_cbs[cb].module; cb++)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_integer_cbs[cb].module->name);
			history_integer_cbs[cb].history_integer_cb(history_integer, history_integer_num);
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d integer values with modules", history_integer_num);
	}

	if (0 != history_string_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing string history data with modules...");

		for (cb = 0; NULL != history_string_cbs[cb].module; cb++)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_string_cbs[cb].module->name);
			history_string_cbs[cb].history_string_cb(history_string, history_string_num);
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d string values with modules", history_string_num);
	}

	if (0 != history_text_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing text history data with modules...");

		for (cb = 0; NULL != history_text_cbs[cb].module; cb++)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_text_cbs[cb].module->name);
			history_text_cbs[cb].history_text_cb(history_text, history_text_num);
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d text values with modules", history_text_num);
	}

	if (0 != history_log_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing log history data with modules...");

		for (cb = 0; NULL != history_log_cbs[cb].module; cb++)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_log_cbs[cb].module->name);
			history_log_cbs[cb].history_log_cb(history_log, history_log_num);
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d log values with modules", history_log_num);
	}
}
2861
2862 /******************************************************************************
2863 * *
2864 * Function: proxy_prepare_history *
2865 * *
2866 * Purpose: prepares history update by checking which values must be stored *
2867 * *
2868 * Parameters: history - [IN/OUT] the history values *
2869 * history_num - [IN] the number of history values *
2870 * *
2871 ******************************************************************************/
proxy_prepare_history(ZBX_DC_HISTORY * history,int history_num)2872 static void proxy_prepare_history(ZBX_DC_HISTORY *history, int history_num)
2873 {
2874 int i, *errcodes;
2875 DC_ITEM *items;
2876 zbx_vector_uint64_t itemids;
2877
2878 zbx_vector_uint64_create(&itemids);
2879 zbx_vector_uint64_reserve(&itemids, history_num);
2880
2881 for (i = 0; i < history_num; i++)
2882 zbx_vector_uint64_append(&itemids, history[i].itemid);
2883
2884 items = (DC_ITEM *)zbx_malloc(NULL, sizeof(DC_ITEM) * (size_t)history_num);
2885 errcodes = (int *)zbx_malloc(NULL, sizeof(int) * (size_t)history_num);
2886
2887 DCconfig_get_items_by_itemids(items, itemids.values, errcodes, itemids.values_num);
2888
2889 for (i = 0; i < history_num; i++)
2890 {
2891 if (SUCCEED != errcodes[i])
2892 continue;
2893
2894 /* store items with enabled history */
2895 if (0 != items[i].history)
2896 continue;
2897
2898 /* store numeric items to handle data conversion errors on server and trends */
2899 if (ITEM_VALUE_TYPE_FLOAT == items[i].value_type || ITEM_VALUE_TYPE_UINT64 == items[i].value_type)
2900 continue;
2901
2902 /* store discovery rules */
2903 if (0 != (items[i].flags & ZBX_FLAG_DISCOVERY_RULE))
2904 continue;
2905
2906 /* store errors or first value after an error */
2907 if (ITEM_STATE_NOTSUPPORTED == history[i].state || ITEM_STATE_NOTSUPPORTED == items[i].state)
2908 continue;
2909
2910 /* store items linked to host inventory */
2911 if (0 != items[i].inventory_link)
2912 continue;
2913
2914 dc_history_clean_value(history + i);
2915
2916 /* all checks passed, item value must not be stored in proxy history/sent to server */
2917 history[i].flags |= ZBX_DC_FLAG_NOVALUE;
2918 }
2919
2920 DCconfig_clean_items(items, errcodes, history_num);
2921 zbx_free(items);
2922 zbx_free(errcodes);
2923 zbx_vector_uint64_destroy(&itemids);
2924 }
2925
sync_proxy_history(int * total_num,int * more)2926 static void sync_proxy_history(int *total_num, int *more)
2927 {
2928 int history_num, txn_rc;
2929 time_t sync_start;
2930 zbx_vector_ptr_t history_items;
2931 zbx_vector_ptr_t item_diff;
2932 ZBX_DC_HISTORY history[ZBX_HC_SYNC_MAX];
2933
2934 zbx_vector_ptr_create(&history_items);
2935 zbx_vector_ptr_reserve(&history_items, ZBX_HC_SYNC_MAX);
2936 zbx_vector_ptr_create(&item_diff);
2937
2938 sync_start = time(NULL);
2939
2940 do
2941 {
2942 *more = ZBX_SYNC_DONE;
2943
2944 LOCK_CACHE;
2945
2946 hc_pop_items(&history_items); /* select and take items out of history cache */
2947 history_num = history_items.values_num;
2948
2949 UNLOCK_CACHE;
2950
2951 if (0 == history_num)
2952 break;
2953
2954 hc_get_item_values(history, &history_items); /* copy item data from history cache */
2955 proxy_prepare_history(history, history_items.values_num);
2956
2957 DCmass_proxy_prepare_itemdiff(history, history_num, &item_diff);
2958
2959 do
2960 {
2961 DBbegin();
2962
2963 DBmass_proxy_add_history(history, history_num);
2964 DBmass_proxy_update_items(&item_diff);
2965 }
2966 while (ZBX_DB_DOWN == (txn_rc = DBcommit()));
2967
2968 LOCK_CACHE;
2969
2970 hc_push_items(&history_items); /* return items to history cache */
2971
2972 if (ZBX_DB_FAIL != txn_rc)
2973 {
2974 if (0 != item_diff.values_num)
2975 DCconfig_items_apply_changes(&item_diff);
2976
2977 cache->history_num -= history_num;
2978
2979 if (0 != hc_queue_get_size())
2980 *more = ZBX_SYNC_MORE;
2981
2982 UNLOCK_CACHE;
2983
2984 *total_num += history_num;
2985
2986 hc_free_item_values(history, history_num);
2987 }
2988 else
2989 {
2990 *more = ZBX_SYNC_MORE;
2991 UNLOCK_CACHE;
2992 }
2993
2994 zbx_vector_ptr_clear(&history_items);
2995 zbx_vector_ptr_clear_ext(&item_diff, zbx_default_mem_free_func);
2996
2997 /* Exit from sync loop if we have spent too much time here */
2998 /* unless we are doing full sync. This is done to allow */
2999 /* syncer process to update their statistics. */
3000 }
3001 while (ZBX_SYNC_MORE == *more && ZBX_HC_SYNC_TIME_MAX >= time(NULL) - sync_start);
3002
3003 zbx_vector_ptr_destroy(&item_diff);
3004 zbx_vector_ptr_destroy(&history_items);
3005 }
3006
3007 /******************************************************************************
3008 * *
3009 * Function: sync_server_history *
3010 * *
3011 * Purpose: flush history cache to database, process triggers of flushed *
3012 * and timer triggers from timer queue *
3013 * *
3014 * Parameters: sync_timeout - [IN] the timeout in seconds *
3015 * values_num - [IN/OUT] the number of synced values *
3016 * triggers_num - [IN/OUT] the number of processed timers *
3017 * more - [OUT] a flag indicating the cache emptiness: *
3018 * ZBX_SYNC_DONE - nothing to sync, go idle *
3019 * ZBX_SYNC_MORE - more data to sync *
3020 * *
3021 * Comments: This function loops syncing history values by 1k batches and *
3022 * processing timer triggers by batches of 500 triggers. *
3023 * Unless full sync is being done the loop is aborted if either *
3024 * timeout has passed or there are no more data to process. *
3025 * The last is assumed when the following is true: *
3026 * a) history cache is empty or less than 10% of batch values were *
3027 * processed (the other items were locked by triggers) *
3028 * b) less than 500 (full batch) timer triggers were processed *
3029 * *
3030 ******************************************************************************/
sync_server_history(int * values_num,int * triggers_num,int * more)3031 static void sync_server_history(int *values_num, int *triggers_num, int *more)
3032 {
3033 static ZBX_HISTORY_FLOAT *history_float;
3034 static ZBX_HISTORY_INTEGER *history_integer;
3035 static ZBX_HISTORY_STRING *history_string;
3036 static ZBX_HISTORY_TEXT *history_text;
3037 static ZBX_HISTORY_LOG *history_log;
3038 int i, history_num, history_float_num, history_integer_num, history_string_num,
3039 history_text_num, history_log_num, txn_error, compression_age;
3040 unsigned int item_retrieve_mode;
3041 time_t sync_start;
3042 zbx_vector_uint64_t triggerids ;
3043 zbx_vector_ptr_t history_items, trigger_diff, item_diff, inventory_values, trigger_timers;
3044 zbx_vector_uint64_pair_t trends_diff, proxy_subscribtions;
3045 ZBX_DC_HISTORY history[ZBX_HC_SYNC_MAX];
3046
3047 item_retrieve_mode = NULL == CONFIG_EXPORT_DIR ? ZBX_ITEM_GET_SYNC : ZBX_ITEM_GET_SYNC_EXPORT;
3048
3049 if (NULL == history_float && NULL != history_float_cbs)
3050 {
3051 history_float = (ZBX_HISTORY_FLOAT *)zbx_malloc(history_float,
3052 ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_FLOAT));
3053 }
3054
3055 if (NULL == history_integer && NULL != history_integer_cbs)
3056 {
3057 history_integer = (ZBX_HISTORY_INTEGER *)zbx_malloc(history_integer,
3058 ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_INTEGER));
3059 }
3060
3061 if (NULL == history_string && NULL != history_string_cbs)
3062 {
3063 history_string = (ZBX_HISTORY_STRING *)zbx_malloc(history_string,
3064 ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_STRING));
3065 }
3066
3067 if (NULL == history_text && NULL != history_text_cbs)
3068 {
3069 history_text = (ZBX_HISTORY_TEXT *)zbx_malloc(history_text,
3070 ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_TEXT));
3071 }
3072
3073 if (NULL == history_log && NULL != history_log_cbs)
3074 {
3075 history_log = (ZBX_HISTORY_LOG *)zbx_malloc(history_log,
3076 ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_LOG));
3077 }
3078
3079 compression_age = hc_get_history_compression_age();
3080
3081 zbx_vector_ptr_create(&inventory_values);
3082 zbx_vector_ptr_create(&item_diff);
3083 zbx_vector_ptr_create(&trigger_diff);
3084 zbx_vector_uint64_pair_create(&trends_diff);
3085 zbx_vector_uint64_pair_create(&proxy_subscribtions);
3086
3087 zbx_vector_uint64_create(&triggerids);
3088 zbx_vector_uint64_reserve(&triggerids, ZBX_HC_SYNC_MAX);
3089
3090 zbx_vector_ptr_create(&trigger_timers);
3091 zbx_vector_ptr_reserve(&trigger_timers, ZBX_HC_TIMER_MAX);
3092
3093 zbx_vector_ptr_create(&history_items);
3094 zbx_vector_ptr_reserve(&history_items, ZBX_HC_SYNC_MAX);
3095
3096 sync_start = time(NULL);
3097
3098 do
3099 {
3100 DC_ITEM *items;
3101 int *errcodes, trends_num = 0, timers_num = 0, ret = SUCCEED;
3102 zbx_vector_uint64_t itemids;
3103 ZBX_DC_TREND *trends = NULL;
3104
3105 zbx_vector_uint64_create(&itemids);
3106
3107 *more = ZBX_SYNC_DONE;
3108
3109 LOCK_CACHE;
3110 hc_pop_items(&history_items); /* select and take items out of history cache */
3111 UNLOCK_CACHE;
3112
3113 if (0 != history_items.values_num)
3114 {
3115 if (0 == (history_num = DCconfig_lock_triggers_by_history_items(&history_items, &triggerids)))
3116 {
3117 LOCK_CACHE;
3118 hc_push_items(&history_items);
3119 UNLOCK_CACHE;
3120 zbx_vector_ptr_clear(&history_items);
3121 }
3122 }
3123 else
3124 history_num = 0;
3125
3126 if (0 != history_num)
3127 {
3128 hc_get_item_values(history, &history_items); /* copy item data from history cache */
3129
3130 items = (DC_ITEM *)zbx_malloc(NULL, sizeof(DC_ITEM) * (size_t)history_num);
3131 errcodes = (int *)zbx_malloc(NULL, sizeof(int) * (size_t)history_num);
3132
3133 zbx_vector_uint64_reserve(&itemids, history_num);
3134
3135 for (i = 0; i < history_num; i++)
3136 zbx_vector_uint64_append(&itemids, history[i].itemid);
3137
3138 zbx_vector_uint64_sort(&itemids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
3139
3140 DCconfig_get_items_by_itemids_partial(items, itemids.values, errcodes, history_num,
3141 item_retrieve_mode);
3142
3143 DCmass_prepare_history(history, &itemids, items, errcodes, history_num, &item_diff,
3144 &inventory_values, compression_age, &proxy_subscribtions);
3145
3146 if (FAIL != (ret = DBmass_add_history(history, history_num)))
3147 {
3148 DCconfig_items_apply_changes(&item_diff);
3149 DCmass_update_trends(history, history_num, &trends, &trends_num, compression_age);
3150
3151 if (0 != trends_num)
3152 zbx_tfc_invalidate_trends(trends, trends_num);
3153
3154 do
3155 {
3156 DBbegin();
3157
3158 DBmass_update_items(&item_diff, &inventory_values);
3159 DBmass_update_trends(trends, trends_num, &trends_diff);
3160
3161 /* process internal events generated by DCmass_prepare_history() */
3162 zbx_process_events(NULL, NULL);
3163
3164 if (ZBX_DB_OK == (txn_error = DBcommit()))
3165 DCupdate_trends(&trends_diff);
3166 else
3167 zbx_reset_event_recovery();
3168
3169 zbx_vector_uint64_pair_clear(&trends_diff);
3170 }
3171 while (ZBX_DB_DOWN == txn_error);
3172 }
3173
3174 zbx_clean_events();
3175
3176 zbx_vector_ptr_clear_ext(&inventory_values, (zbx_clean_func_t)DCinventory_value_free);
3177 zbx_vector_ptr_clear_ext(&item_diff, (zbx_clean_func_t)zbx_ptr_free);
3178 }
3179
3180 if (FAIL != ret)
3181 {
3182 /* don't process trigger timers when server is shutting down */
3183 if (ZBX_IS_RUNNING())
3184 {
3185 zbx_dc_get_trigger_timers(&trigger_timers, time(NULL), ZBX_HC_TIMER_SOFT_MAX,
3186 ZBX_HC_TIMER_MAX);
3187 }
3188
3189 timers_num = trigger_timers.values_num;
3190
3191 if (ZBX_HC_TIMER_SOFT_MAX <= timers_num)
3192 *more = ZBX_SYNC_MORE;
3193
3194 if (0 != history_num || 0 != timers_num)
3195 {
3196 for (i = 0; i < trigger_timers.values_num; i++)
3197 {
3198 zbx_trigger_timer_t *timer = (zbx_trigger_timer_t *)trigger_timers.values[i];
3199
3200 if (0 != timer->lock)
3201 zbx_vector_uint64_append(&triggerids, timer->triggerid);
3202 }
3203
3204 do
3205 {
3206 DBbegin();
3207
3208 recalculate_triggers(history, history_num, &itemids, items, errcodes,
3209 &trigger_timers, &trigger_diff);
3210
3211 /* process trigger events generated by recalculate_triggers() */
3212 zbx_process_events(&trigger_diff, &triggerids);
3213 if (0 != trigger_diff.values_num)
3214 zbx_db_save_trigger_changes(&trigger_diff);
3215
3216 if (ZBX_DB_OK == (txn_error = DBcommit()))
3217 {
3218 DCconfig_triggers_apply_changes(&trigger_diff);
3219 DBupdate_itservices(&trigger_diff);
3220 }
3221 else
3222 zbx_clean_events();
3223
3224 zbx_vector_ptr_clear_ext(&trigger_diff, (zbx_clean_func_t)zbx_trigger_diff_free);
3225 }
3226 while (ZBX_DB_DOWN == txn_error);
3227 }
3228 }
3229
3230 if (0 != triggerids.values_num)
3231 {
3232 *triggers_num += triggerids.values_num;
3233 DCconfig_unlock_triggers(&triggerids);
3234 zbx_vector_uint64_clear(&triggerids);
3235 }
3236
3237 if (0 != trigger_timers.values_num)
3238 {
3239 zbx_dc_reschedule_trigger_timers(&trigger_timers, time(NULL));
3240 zbx_vector_ptr_clear(&trigger_timers);
3241 }
3242
3243 if (0 != proxy_subscribtions.values_num)
3244 {
3245 zbx_vector_uint64_pair_sort(&proxy_subscribtions, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
3246 zbx_dc_proxy_update_nodata(&proxy_subscribtions);
3247 zbx_vector_uint64_pair_clear(&proxy_subscribtions);
3248 }
3249
3250 if (0 != history_num)
3251 {
3252 LOCK_CACHE;
3253 hc_push_items(&history_items); /* return items to history cache */
3254 cache->history_num -= history_num;
3255
3256 if (0 != hc_queue_get_size())
3257 {
3258 /* Continue sync if enough of sync candidates were processed */
3259 /* (meaning most of sync candidates are not locked by triggers). */
3260 /* Otherwise better to wait a bit for other syncers to unlock */
3261 /* items rather than trying and failing to sync locked items over */
3262 /* and over again. */
3263 if (ZBX_HC_SYNC_MIN_PCNT <= history_num * 100 / history_items.values_num)
3264 *more = ZBX_SYNC_MORE;
3265 }
3266
3267 UNLOCK_CACHE;
3268
3269 *values_num += history_num;
3270 }
3271
3272 if (FAIL != ret)
3273 {
3274 if (0 != history_num)
3275 {
3276 const ZBX_DC_HISTORY *phistory = NULL;
3277 const ZBX_DC_TREND *ptrends = NULL;
3278 int history_num_loc = 0, trends_num_loc = 0;
3279
3280 DCmodule_prepare_history(history, history_num, history_float, &history_float_num,
3281 history_integer, &history_integer_num, history_string,
3282 &history_string_num, history_text, &history_text_num, history_log,
3283 &history_log_num);
3284
3285 DCmodule_sync_history(history_float_num, history_integer_num, history_string_num,
3286 history_text_num, history_log_num, history_float, history_integer,
3287 history_string, history_text, history_log);
3288
3289 if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_HISTORY))
3290 {
3291 phistory = history;
3292 history_num_loc = history_num;
3293 }
3294
3295 if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_TRENDS))
3296 {
3297 ptrends = trends;
3298 trends_num_loc = trends_num;
3299 }
3300
3301 if (NULL != phistory || NULL != ptrends)
3302 {
3303 DCexport_history_and_trends(phistory, history_num_loc, &itemids, items,
3304 errcodes, ptrends, trends_num_loc);
3305 }
3306 }
3307
3308 if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_EVENTS))
3309 zbx_export_events();
3310 }
3311
3312 if (0 != history_num || 0 != timers_num)
3313 zbx_clean_events();
3314
3315 if (0 != history_num)
3316 {
3317 zbx_free(trends);
3318 DCconfig_clean_items(items, errcodes, history_num);
3319 zbx_free(errcodes);
3320 zbx_free(items);
3321
3322 zbx_vector_ptr_clear(&history_items);
3323 hc_free_item_values(history, history_num);
3324 }
3325
3326 zbx_vector_uint64_destroy(&itemids);
3327
3328 /* Exit from sync loop if we have spent too much time here. */
3329 /* This is done to allow syncer process to update its statistics. */
3330 }
3331 while (ZBX_SYNC_MORE == *more && ZBX_HC_SYNC_TIME_MAX >= time(NULL) - sync_start);
3332
3333 zbx_vector_ptr_destroy(&history_items);
3334 zbx_vector_ptr_destroy(&inventory_values);
3335 zbx_vector_ptr_destroy(&item_diff);
3336 zbx_vector_ptr_destroy(&trigger_diff);
3337 zbx_vector_uint64_pair_destroy(&trends_diff);
3338 zbx_vector_uint64_pair_destroy(&proxy_subscribtions);
3339
3340 zbx_vector_ptr_destroy(&trigger_timers);
3341 zbx_vector_uint64_destroy(&triggerids);
3342 }
3343
3344 /******************************************************************************
3345 * *
3346 * Function: sync_history_cache_full *
3347 * *
3348 * Purpose: writes updates and new data from history cache to database *
3349 * *
3350 * Comments: This function is used to flush history cache at server/proxy *
3351 * exit. *
3352 * Other processes are already terminated, so cache locking is *
3353 * unnecessary. *
3354 * *
3355 ******************************************************************************/
sync_history_cache_full(void)3356 static void sync_history_cache_full(void)
3357 {
3358 int values_num = 0, triggers_num = 0, more;
3359 zbx_hashset_iter_t iter;
3360 zbx_hc_item_t *item;
3361 zbx_binary_heap_t tmp_history_queue;
3362
3363 zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __func__, cache->history_num);
3364
3365 /* History index cache might be full without any space left for queueing items from history index to */
3366 /* history queue. The solution: replace the shared-memory history queue with heap-allocated one. Add */
3367 /* all items from history index to the new history queue. */
3368 /* */
3369 /* Assertions that must be true. */
3370 /* * This is the main server or proxy process, */
3371 /* * There are no other users of history index cache stored in shared memory. Other processes */
3372 /* should have quit by this point. */
3373 /* * other parts of the program do not hold pointers to the elements of history queue that is */
3374 /* stored in the shared memory. */
3375
3376 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3377 {
3378 /* unlock all triggers before full sync so no items are locked by triggers */
3379 DCconfig_unlock_all_triggers();
3380 }
3381
3382 tmp_history_queue = cache->history_queue;
3383
3384 zbx_binary_heap_create(&cache->history_queue, hc_queue_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY);
3385 zbx_hashset_iter_reset(&cache->history_items, &iter);
3386
3387 /* add all items from history index to the new history queue */
3388 while (NULL != (item = (zbx_hc_item_t *)zbx_hashset_iter_next(&iter)))
3389 {
3390 if (NULL != item->tail)
3391 {
3392 item->status = ZBX_HC_ITEM_STATUS_NORMAL;
3393 hc_queue_item(item);
3394 }
3395 }
3396
3397 if (0 != hc_queue_get_size())
3398 {
3399 zabbix_log(LOG_LEVEL_WARNING, "syncing history data...");
3400
3401 do
3402 {
3403 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3404 sync_server_history(&values_num, &triggers_num, &more);
3405 else
3406 sync_proxy_history(&values_num, &more);
3407
3408 zabbix_log(LOG_LEVEL_WARNING, "syncing history data... " ZBX_FS_DBL "%%",
3409 (double)values_num / (cache->history_num + values_num) * 100);
3410 }
3411 while (0 != hc_queue_get_size());
3412
3413 zabbix_log(LOG_LEVEL_WARNING, "syncing history data done");
3414 }
3415
3416 zbx_binary_heap_destroy(&cache->history_queue);
3417 cache->history_queue = tmp_history_queue;
3418
3419 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
3420 }
3421
3422 /******************************************************************************
3423 * *
3424 * Function: zbx_log_sync_history_cache_progress *
3425 * *
3426 * Purpose: log progress of syncing history data *
3427 * *
3428 ******************************************************************************/
zbx_log_sync_history_cache_progress(void)3429 void zbx_log_sync_history_cache_progress(void)
3430 {
3431 double pcnt = -1.0;
3432 int ts_last, ts_next, sec;
3433
3434 LOCK_CACHE;
3435
3436 if (INT_MAX == cache->history_progress_ts)
3437 {
3438 UNLOCK_CACHE;
3439 return;
3440 }
3441
3442 ts_last = cache->history_progress_ts;
3443 sec = time(NULL);
3444
3445 if (0 == cache->history_progress_ts)
3446 {
3447 cache->history_num_total = cache->history_num;
3448 cache->history_progress_ts = sec;
3449 }
3450
3451 if (ZBX_HC_SYNC_TIME_MAX <= sec - cache->history_progress_ts || 0 == cache->history_num)
3452 {
3453 if (0 != cache->history_num_total)
3454 pcnt = 100 * (double)(cache->history_num_total - cache->history_num) / cache->history_num_total;
3455
3456 cache->history_progress_ts = (0 == cache->history_num ? INT_MAX : sec);
3457 }
3458
3459 ts_next = cache->history_progress_ts;
3460
3461 UNLOCK_CACHE;
3462
3463 if (0 == ts_last)
3464 zabbix_log(LOG_LEVEL_WARNING, "syncing history data in progress... ");
3465
3466 if (-1.0 != pcnt)
3467 zabbix_log(LOG_LEVEL_WARNING, "syncing history data... " ZBX_FS_DBL "%%", pcnt);
3468
3469 if (INT_MAX == ts_next)
3470 zabbix_log(LOG_LEVEL_WARNING, "syncing history data done");
3471 }
3472
3473 /******************************************************************************
3474 * *
3475 * Function: zbx_sync_history_cache *
3476 * *
3477 * Purpose: writes updates and new data from history cache to database *
3478 * *
3479 * Parameters: values_num - [OUT] the number of synced values *
3480 * more - [OUT] a flag indicating the cache emptiness: *
3481 * ZBX_SYNC_DONE - nothing to sync, go idle *
3482 * ZBX_SYNC_MORE - more data to sync *
3483 * *
3484 ******************************************************************************/
zbx_sync_history_cache(int * values_num,int * triggers_num,int * more)3485 void zbx_sync_history_cache(int *values_num, int *triggers_num, int *more)
3486 {
3487 zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __func__, cache->history_num);
3488
3489 *values_num = 0;
3490 *triggers_num = 0;
3491
3492 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3493 sync_server_history(values_num, triggers_num, more);
3494 else
3495 sync_proxy_history(values_num, more);
3496 }
3497
3498 /******************************************************************************
3499 * *
3500 * local history cache *
3501 * *
3502 ******************************************************************************/
dc_string_buffer_realloc(size_t len)3503 static void dc_string_buffer_realloc(size_t len)
3504 {
3505 if (string_values_alloc >= string_values_offset + len)
3506 return;
3507
3508 do
3509 {
3510 string_values_alloc += ZBX_STRING_REALLOC_STEP;
3511 }
3512 while (string_values_alloc < string_values_offset + len);
3513
3514 string_values = (char *)zbx_realloc(string_values, string_values_alloc);
3515 }
3516
dc_local_get_history_slot(void)3517 static dc_item_value_t *dc_local_get_history_slot(void)
3518 {
3519 if (ZBX_MAX_VALUES_LOCAL == item_values_num)
3520 dc_flush_history();
3521
3522 if (item_values_alloc == item_values_num)
3523 {
3524 item_values_alloc += ZBX_STRUCT_REALLOC_STEP;
3525 item_values = (dc_item_value_t *)zbx_realloc(item_values, item_values_alloc * sizeof(dc_item_value_t));
3526 }
3527
3528 return &item_values[item_values_num++];
3529 }
3530
/* Adds a floating point value to the local history cache.                    */
/* lastlogsize and mtime are stored only when flags contain ZBX_DC_FLAG_META, */
/* the value itself only when flags do not contain ZBX_DC_FLAG_NOVALUE.       */
static void	dc_local_add_history_dbl(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		double value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->flags = flags;
	slot->state = ITEM_STATE_NORMAL;
	slot->value_type = ITEM_VALUE_TYPE_FLOAT;
	slot->item_value_type = item_value_type;
	slot->ts = *ts;
	slot->itemid = itemid;

	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->mtime = mtime;
		slot->lastlogsize = lastlogsize;
	}

	/* no value to store */
	if (0 != (slot->flags & ZBX_DC_FLAG_NOVALUE))
		return;

	slot->value.value_dbl = value_orig;
}
3554
/* Adds an unsigned integer value to the local history cache.                 */
/* lastlogsize and mtime are stored only when flags contain ZBX_DC_FLAG_META, */
/* the value itself only when flags do not contain ZBX_DC_FLAG_NOVALUE.       */
static void	dc_local_add_history_uint(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		zbx_uint64_t value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->flags = flags;
	slot->state = ITEM_STATE_NORMAL;
	slot->value_type = ITEM_VALUE_TYPE_UINT64;
	slot->item_value_type = item_value_type;
	slot->ts = *ts;
	slot->itemid = itemid;

	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->mtime = mtime;
		slot->lastlogsize = lastlogsize;
	}

	/* no value to store */
	if (0 != (slot->flags & ZBX_DC_FLAG_NOVALUE))
		return;

	slot->value.value_uint = value_orig;
}
3578
/* Adds a text value to the local history cache.                                 */
/* lastlogsize and mtime are stored only when flags contain ZBX_DC_FLAG_META.    */
/* Unless flags contain ZBX_DC_FLAG_NOVALUE, the text is copied into the local   */
/* string buffer and the slot records its offset and length; the length is       */
/* limited with zbx_db_strlen_n(value_orig, ZBX_HISTORY_VALUE_LEN) plus one byte. */
static void	dc_local_add_history_text(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		const char *value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->flags = flags;
	slot->state = ITEM_STATE_NORMAL;
	slot->value_type = ITEM_VALUE_TYPE_TEXT;
	slot->item_value_type = item_value_type;
	slot->ts = *ts;
	slot->itemid = itemid;

	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->mtime = mtime;
		slot->lastlogsize = lastlogsize;
	}

	/* no value to store - zero length marks the absence of text */
	if (0 != (slot->flags & ZBX_DC_FLAG_NOVALUE))
	{
		slot->value.value_str.len = 0;
		return;
	}

	slot->value.value_str.len = zbx_db_strlen_n(value_orig, ZBX_HISTORY_VALUE_LEN) + 1;
	dc_string_buffer_realloc(slot->value.value_str.len);

	/* the slot refers to the text by its offset in the string buffer */
	slot->value.value_str.pvalue = string_values_offset;
	memcpy(string_values + string_values_offset, value_orig, slot->value.value_str.len);
	string_values_offset += slot->value.value_str.len;
}
3611
/* Adds a log value to the local history cache.                                  */
/* lastlogsize and mtime are stored only when flags contain ZBX_DC_FLAG_META.    */
/* Unless flags contain ZBX_DC_FLAG_NOVALUE, severity, logeventid and timestamp  */
/* are taken from 'log' and the log value plus a non-empty source are copied     */
/* into the local string buffer, the slot recording their offsets and lengths.   */
static void	dc_local_add_history_log(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		const zbx_log_t *log, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->flags = flags;
	slot->state = ITEM_STATE_NORMAL;
	slot->value_type = ITEM_VALUE_TYPE_LOG;
	slot->item_value_type = item_value_type;
	slot->ts = *ts;
	slot->itemid = itemid;

	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->mtime = mtime;
		slot->lastlogsize = lastlogsize;
	}

	if (0 != (slot->flags & ZBX_DC_FLAG_NOVALUE))
	{
		/* no value to store - zero lengths mark the absence of strings */
		slot->value.value_str.len = 0;
		slot->source.len = 0;
	}
	else
	{
		slot->severity = log->severity;
		slot->logeventid = log->logeventid;
		slot->timestamp = log->timestamp;

		slot->value.value_str.len = zbx_db_strlen_n(log->value, ZBX_HISTORY_VALUE_LEN) + 1;

		/* a missing or empty source is not stored at all */
		if (NULL == log->source || '\0' == *log->source)
			slot->source.len = 0;
		else
			slot->source.len = zbx_db_strlen_n(log->source, HISTORY_LOG_SOURCE_LEN) + 1;
	}

	/* nothing to copy into the string buffer */
	if (0 == slot->value.value_str.len + slot->source.len)
		return;

	/* make room for both strings at once, then append the value followed by the source */
	dc_string_buffer_realloc(slot->value.value_str.len + slot->source.len);

	if (0 != slot->value.value_str.len)
	{
		slot->value.value_str.pvalue = string_values_offset;
		memcpy(string_values + string_values_offset, log->value, slot->value.value_str.len);
		string_values_offset += slot->value.value_str.len;
	}

	if (0 != slot->source.len)
	{
		slot->source.pvalue = string_values_offset;
		memcpy(string_values + string_values_offset, log->source, slot->source.len);
		string_values_offset += slot->source.len;
	}
}
3671
dc_local_add_history_notsupported(zbx_uint64_t itemid,const zbx_timespec_t * ts,const char * error,zbx_uint64_t lastlogsize,int mtime,unsigned char flags)3672 static void dc_local_add_history_notsupported(zbx_uint64_t itemid, const zbx_timespec_t *ts, const char *error,
3673 zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
3674 {
3675 dc_item_value_t *item_value;
3676
3677 item_value = dc_local_get_history_slot();
3678
3679 item_value->itemid = itemid;
3680 item_value->ts = *ts;
3681 item_value->state = ITEM_STATE_NOTSUPPORTED;
3682 item_value->flags = flags;
3683
3684 if (0 != (item_value->flags & ZBX_DC_FLAG_META))
3685 {
3686 item_value->lastlogsize = lastlogsize;
3687 item_value->mtime = mtime;
3688 }
3689
3690 item_value->value.value_str.len = zbx_db_strlen_n(error, ITEM_ERROR_LEN) + 1;
3691 dc_string_buffer_realloc(item_value->value.value_str.len);
3692 item_value->value.value_str.pvalue = string_values_offset;
3693 memcpy(&string_values[string_values_offset], error, item_value->value.value_str.len);
3694 string_values_offset += item_value->value.value_str.len;
3695 }
3696
/* Adds a value flagged with ZBX_DC_FLAG_LLD to the local history cache. The */
/* whole string, including its terminating null byte, is copied into the     */
/* local string buffer.                                                      */
static void	dc_local_add_history_lld(zbx_uint64_t itemid, const zbx_timespec_t *ts, const char *value_orig)
{
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->flags = ZBX_DC_FLAG_LLD;
	slot->state = ITEM_STATE_NORMAL;
	slot->ts = *ts;
	slot->itemid = itemid;

	slot->value.value_str.len = strlen(value_orig) + 1;
	dc_string_buffer_realloc(slot->value.value_str.len);

	/* the slot refers to the value by its offset in the string buffer */
	slot->value.value_str.pvalue = string_values_offset;
	memcpy(string_values + string_values_offset, value_orig, slot->value.value_str.len);
	string_values_offset += slot->value.value_str.len;
}
3714
/* Adds an entry without a value (value type ITEM_VALUE_TYPE_NONE) to the */
/* local history cache; only item id, timestamp, item value type, state   */
/* and flags are recorded.                                                */
static void	dc_local_add_history_empty(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		unsigned char flags)
{
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->flags = flags;
	slot->state = ITEM_STATE_NORMAL;
	slot->value_type = ITEM_VALUE_TYPE_NONE;
	slot->item_value_type = item_value_type;
	slot->ts = *ts;
	slot->itemid = itemid;
}
3729
3730 /******************************************************************************
3731 * *
3732 * Function: dc_add_history *
3733 * *
3734 * Purpose: add new value to the cache *
3735 * *
3736 * Parameters: itemid - [IN] the itemid *
3737 * item_value_type - [IN] the item value type *
3738 * item_flags - [IN] the item flags (e. g. lld rule) *
3739 * result - [IN] agent result containing the value *
3740 * to add *
3741 * ts - [IN] the value timestamp *
3742 * state - [IN] the item state *
3743 * error - [IN] the error message in case item state *
3744 * is ITEM_STATE_NOTSUPPORTED *
3745 * *
3746 ******************************************************************************/
dc_add_history(zbx_uint64_t itemid,unsigned char item_value_type,unsigned char item_flags,AGENT_RESULT * result,const zbx_timespec_t * ts,unsigned char state,const char * error)3747 void dc_add_history(zbx_uint64_t itemid, unsigned char item_value_type, unsigned char item_flags,
3748 AGENT_RESULT *result, const zbx_timespec_t *ts, unsigned char state, const char *error)
3749 {
3750 unsigned char value_flags;
3751
3752 if (ITEM_STATE_NOTSUPPORTED == state)
3753 {
3754 zbx_uint64_t lastlogsize;
3755 int mtime;
3756
3757 if (NULL != result && 0 != ISSET_META(result))
3758 {
3759 value_flags = ZBX_DC_FLAG_META;
3760 lastlogsize = result->lastlogsize;
3761 mtime = result->mtime;
3762 }
3763 else
3764 {
3765 value_flags = 0;
3766 lastlogsize = 0;
3767 mtime = 0;
3768 }
3769 dc_local_add_history_notsupported(itemid, ts, error, lastlogsize, mtime, value_flags);
3770 return;
3771 }
3772
3773 /* allow proxy to send timestamps of empty (throttled etc) values to update nextchecks for queue */
3774 if (!ISSET_VALUE(result) && !ISSET_META(result) && 0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3775 return;
3776
3777 value_flags = 0;
3778
3779 if (!ISSET_VALUE(result))
3780 value_flags |= ZBX_DC_FLAG_NOVALUE;
3781
3782 if (ISSET_META(result))
3783 value_flags |= ZBX_DC_FLAG_META;
3784
3785 /* Add data to the local history cache if: */
3786 /* 1) the NOVALUE flag is set (data contains either meta information or timestamp) */
3787 /* 2) the NOVALUE flag is not set and value conversion succeeded */
3788
3789 if (0 == (value_flags & ZBX_DC_FLAG_NOVALUE))
3790 {
3791 if (0 != (ZBX_FLAG_DISCOVERY_RULE & item_flags))
3792 {
3793 if (NULL == GET_TEXT_RESULT(result))
3794 return;
3795
3796 /* proxy stores low-level discovery (lld) values in db */
3797 if (0 == (ZBX_PROGRAM_TYPE_SERVER & program_type))
3798 dc_local_add_history_lld(itemid, ts, result->text);
3799
3800 return;
3801 }
3802
3803 if (ISSET_LOG(result))
3804 {
3805 dc_local_add_history_log(itemid, item_value_type, ts, result->log, result->lastlogsize,
3806 result->mtime, value_flags);
3807 }
3808 else if (ISSET_UI64(result))
3809 {
3810 dc_local_add_history_uint(itemid, item_value_type, ts, result->ui64, result->lastlogsize,
3811 result->mtime, value_flags);
3812 }
3813 else if (ISSET_DBL(result))
3814 {
3815 dc_local_add_history_dbl(itemid, item_value_type, ts, result->dbl, result->lastlogsize,
3816 result->mtime, value_flags);
3817 }
3818 else if (ISSET_STR(result))
3819 {
3820 dc_local_add_history_text(itemid, item_value_type, ts, result->str, result->lastlogsize,
3821 result->mtime, value_flags);
3822 }
3823 else if (ISSET_TEXT(result))
3824 {
3825 dc_local_add_history_text(itemid, item_value_type, ts, result->text, result->lastlogsize,
3826 result->mtime, value_flags);
3827 }
3828 else
3829 {
3830 THIS_SHOULD_NEVER_HAPPEN;
3831 }
3832 }
3833 else
3834 {
3835 if (0 != (value_flags & ZBX_DC_FLAG_META))
3836 {
3837 dc_local_add_history_log(itemid, item_value_type, ts, NULL, result->lastlogsize, result->mtime,
3838 value_flags);
3839 }
3840 else
3841 dc_local_add_history_empty(itemid, item_value_type, ts, value_flags);
3842 }
3843 }
3844
dc_flush_history(void)3845 void dc_flush_history(void)
3846 {
3847 if (0 == item_values_num)
3848 return;
3849
3850 LOCK_CACHE;
3851
3852 hc_add_item_values(item_values, item_values_num);
3853
3854 cache->history_num += item_values_num;
3855
3856 UNLOCK_CACHE;
3857
3858 item_values_num = 0;
3859 string_values_offset = 0;
3860 }
3861
3862 /******************************************************************************
3863 * *
3864 * history cache storage *
3865 * *
3866 ******************************************************************************/
/* NOTE(review): presumably generates the __hc_index_mem_malloc_func(), __hc_index_mem_realloc_func() */
/* and __hc_index_mem_free_func() wrappers over hc_index_mem that this file passes to the history     */
/* index hashset and queue - confirm against the ZBX_MEM_FUNC_IMPL definition                         */
ZBX_MEM_FUNC_IMPL(__hc_index, hc_index_mem)
/* likewise for hc_mem: the __hc_mem_*_func() wrappers are used below to allocate and free history values */
ZBX_MEM_FUNC_IMPL(__hc, hc_mem)
3869
3870 /******************************************************************************
3871 * *
3872 * Function: hc_queue_elem_compare_func *
3873 * *
3874 * Purpose: compares history queue elements *
3875 * *
3876 ******************************************************************************/
3877 static int hc_queue_elem_compare_func(const void *d1, const void *d2)
3878 {
3879 const zbx_binary_heap_elem_t *e1 = (const zbx_binary_heap_elem_t *)d1;
3880 const zbx_binary_heap_elem_t *e2 = (const zbx_binary_heap_elem_t *)d2;
3881
3882 const zbx_hc_item_t *item1 = (const zbx_hc_item_t *)e1->data;
3883 const zbx_hc_item_t *item2 = (const zbx_hc_item_t *)e2->data;
3884
3885 /* compare by timestamp of the oldest value */
3886 return zbx_timespec_compare(&item1->tail->ts, &item2->tail->ts);
3887 }
3888
3889 /******************************************************************************
3890 * *
3891 * Function: hc_free_data *
3892 * *
3893 * Purpose: free history item data allocated in history cache *
3894 * *
3895 * Parameters: data - [IN] history item data *
3896 * *
3897 ******************************************************************************/
hc_free_data(zbx_hc_data_t * data)3898 static void hc_free_data(zbx_hc_data_t *data)
3899 {
3900 if (ITEM_STATE_NOTSUPPORTED == data->state)
3901 {
3902 __hc_mem_free_func(data->value.str);
3903 }
3904 else
3905 {
3906 if (0 == (data->flags & ZBX_DC_FLAG_NOVALUE))
3907 {
3908 switch (data->value_type)
3909 {
3910 case ITEM_VALUE_TYPE_STR:
3911 case ITEM_VALUE_TYPE_TEXT:
3912 __hc_mem_free_func(data->value.str);
3913 break;
3914 case ITEM_VALUE_TYPE_LOG:
3915 __hc_mem_free_func(data->value.log->value);
3916
3917 if (NULL != data->value.log->source)
3918 __hc_mem_free_func(data->value.log->source);
3919
3920 __hc_mem_free_func(data->value.log);
3921 break;
3922 }
3923 }
3924 }
3925
3926 __hc_mem_free_func(data);
3927 }
3928
3929 /******************************************************************************
3930 * *
3931 * Function: hc_queue_item *
3932 * *
3933 * Purpose: put back item into history queue *
3934 * *
3935 * Parameters: data - [IN] history item data *
3936 * *
3937 ******************************************************************************/
hc_queue_item(zbx_hc_item_t * item)3938 static void hc_queue_item(zbx_hc_item_t *item)
3939 {
3940 zbx_binary_heap_elem_t elem = {item->itemid, (const void *)item};
3941
3942 zbx_binary_heap_insert(&cache->history_queue, &elem);
3943 }
3944
3945 /******************************************************************************
3946 * *
3947 * Function: hc_get_item *
3948 * *
3949 * Purpose: returns history item by itemid *
3950 * *
3951 * Parameters: itemid - [IN] the item id *
3952 * *
3953 * Return value: the history item or NULL if the requested item is not in *
3954 * history cache *
3955 * *
3956 ******************************************************************************/
hc_get_item(zbx_uint64_t itemid)3957 static zbx_hc_item_t *hc_get_item(zbx_uint64_t itemid)
3958 {
3959 return (zbx_hc_item_t *)zbx_hashset_search(&cache->history_items, &itemid);
3960 }
3961
3962 /******************************************************************************
3963 * *
3964 * Function: hc_add_item *
3965 * *
3966 * Purpose: adds a new item to history cache *
3967 * *
3968 * Parameters: itemid - [IN] the item id *
3969 * [IN] the item data *
3970 * *
3971 * Return value: the added history item *
3972 * *
3973 ******************************************************************************/
hc_add_item(zbx_uint64_t itemid,zbx_hc_data_t * data)3974 static zbx_hc_item_t *hc_add_item(zbx_uint64_t itemid, zbx_hc_data_t *data)
3975 {
3976 zbx_hc_item_t item_local = {itemid, ZBX_HC_ITEM_STATUS_NORMAL, 0, data, data};
3977
3978 return (zbx_hc_item_t *)zbx_hashset_insert(&cache->history_items, &item_local, sizeof(item_local));
3979 }
3980
3981 /******************************************************************************
3982 * *
3983 * Function: hc_mem_value_str_dup *
3984 * *
3985 * Purpose: copies string value to history cache *
3986 * *
3987 * Parameters: str - [IN] the string value *
3988 * *
3989 * Return value: the copied string or NULL if there was not enough memory *
3990 * *
3991 ******************************************************************************/
hc_mem_value_str_dup(const dc_value_str_t * str)3992 static char *hc_mem_value_str_dup(const dc_value_str_t *str)
3993 {
3994 char *ptr;
3995
3996 if (NULL == (ptr = (char *)__hc_mem_malloc_func(NULL, str->len)))
3997 return NULL;
3998
3999 memcpy(ptr, &string_values[str->pvalue], str->len - 1);
4000 ptr[str->len - 1] = '\0';
4001
4002 return ptr;
4003 }
4004
4005 /******************************************************************************
4006 * *
4007 * Function: hc_clone_history_str_data *
4008 * *
4009 * Purpose: clones string value into history data memory *
4010 * *
4011 * Parameters: dst - [IN/OUT] a reference to the cloned value *
4012 * str - [IN] the string value to clone *
4013 * *
4014 * Return value: SUCCESS - either there was no need to clone the string *
4015 * (it was empty or already cloned) or the string was *
4016 * cloned successfully *
4017 * FAIL - not enough memory *
4018 * *
4019 * Comments: This function can be called in loop with the same dst value *
4020 * until it finishes cloning string value. *
4021 * *
4022 ******************************************************************************/
hc_clone_history_str_data(char ** dst,const dc_value_str_t * str)4023 static int hc_clone_history_str_data(char **dst, const dc_value_str_t *str)
4024 {
4025 if (0 == str->len)
4026 return SUCCEED;
4027
4028 if (NULL != *dst)
4029 return SUCCEED;
4030
4031 if (NULL != (*dst = hc_mem_value_str_dup(str)))
4032 return SUCCEED;
4033
4034 return FAIL;
4035 }
4036
4037 /******************************************************************************
4038 * *
4039 * Function: hc_clone_history_log_data *
4040 * *
4041 * Purpose: clones log value into history data memory *
4042 * *
4043 * Parameters: dst - [IN/OUT] a reference to the cloned value *
4044 * item_value - [IN] the log value to clone *
4045 * *
4046 * Return value: SUCCESS - the log value was cloned successfully *
4047 * FAIL - not enough memory *
4048 * *
4049 * Comments: This function can be called in loop with the same dst value *
4050 * until it finishes cloning log value. *
4051 * *
4052 ******************************************************************************/
hc_clone_history_log_data(zbx_log_value_t ** dst,const dc_item_value_t * item_value)4053 static int hc_clone_history_log_data(zbx_log_value_t **dst, const dc_item_value_t *item_value)
4054 {
4055 if (NULL == *dst)
4056 {
4057 /* using realloc instead of malloc just to suppress 'not used' warning for realloc */
4058 if (NULL == (*dst = (zbx_log_value_t *)__hc_mem_realloc_func(NULL, sizeof(zbx_log_value_t))))
4059 return FAIL;
4060
4061 memset(*dst, 0, sizeof(zbx_log_value_t));
4062 }
4063
4064 if (SUCCEED != hc_clone_history_str_data(&(*dst)->value, &item_value->value.value_str))
4065 return FAIL;
4066
4067 if (SUCCEED != hc_clone_history_str_data(&(*dst)->source, &item_value->source))
4068 return FAIL;
4069
4070 (*dst)->logeventid = item_value->logeventid;
4071 (*dst)->severity = item_value->severity;
4072 (*dst)->timestamp = item_value->timestamp;
4073
4074 return SUCCEED;
4075 }
4076
4077 /******************************************************************************
4078 * *
4079 * Function: hc_clone_history_data *
4080 * *
4081 * Purpose: clones item value from local cache into history cache *
4082 * *
4083 * Parameters: data - [IN/OUT] a reference to the cloned value *
4084 * item_value - [IN] the item value *
4085 * *
4086 * Return value: SUCCESS - the item value was cloned successfully *
4087 * FAIL - not enough memory *
4088 * *
4089 * Comments: This function can be called in loop with the same data value *
4090 * until it finishes cloning item value. *
4091 * *
4092 ******************************************************************************/
hc_clone_history_data(zbx_hc_data_t ** data,const dc_item_value_t * item_value)4093 static int hc_clone_history_data(zbx_hc_data_t **data, const dc_item_value_t *item_value)
4094 {
4095 if (NULL == *data)
4096 {
4097 if (NULL == (*data = (zbx_hc_data_t *)__hc_mem_malloc_func(NULL, sizeof(zbx_hc_data_t))))
4098 return FAIL;
4099
4100 memset(*data, 0, sizeof(zbx_hc_data_t));
4101
4102 (*data)->state = item_value->state;
4103 (*data)->ts = item_value->ts;
4104 (*data)->flags = item_value->flags;
4105 }
4106
4107 if (0 != (ZBX_DC_FLAG_META & item_value->flags))
4108 {
4109 (*data)->lastlogsize = item_value->lastlogsize;
4110 (*data)->mtime = item_value->mtime;
4111 }
4112
4113 if (ITEM_STATE_NOTSUPPORTED == item_value->state)
4114 {
4115 if (NULL == ((*data)->value.str = hc_mem_value_str_dup(&item_value->value.value_str)))
4116 return FAIL;
4117
4118 (*data)->value_type = item_value->value_type;
4119 cache->stats.notsupported_counter++;
4120
4121 return SUCCEED;
4122 }
4123
4124 if (0 != (ZBX_DC_FLAG_LLD & item_value->flags))
4125 {
4126 if (NULL == ((*data)->value.str = hc_mem_value_str_dup(&item_value->value.value_str)))
4127 return FAIL;
4128
4129 (*data)->value_type = ITEM_VALUE_TYPE_TEXT;
4130
4131 cache->stats.history_text_counter++;
4132 cache->stats.history_counter++;
4133
4134 return SUCCEED;
4135 }
4136
4137 if (0 == (ZBX_DC_FLAG_NOVALUE & item_value->flags))
4138 {
4139 switch (item_value->value_type)
4140 {
4141 case ITEM_VALUE_TYPE_FLOAT:
4142 (*data)->value.dbl = item_value->value.value_dbl;
4143 break;
4144 case ITEM_VALUE_TYPE_UINT64:
4145 (*data)->value.ui64 = item_value->value.value_uint;
4146 break;
4147 case ITEM_VALUE_TYPE_STR:
4148 if (SUCCEED != hc_clone_history_str_data(&(*data)->value.str,
4149 &item_value->value.value_str))
4150 {
4151 return FAIL;
4152 }
4153 break;
4154 case ITEM_VALUE_TYPE_TEXT:
4155 if (SUCCEED != hc_clone_history_str_data(&(*data)->value.str,
4156 &item_value->value.value_str))
4157 {
4158 return FAIL;
4159 }
4160 break;
4161 case ITEM_VALUE_TYPE_LOG:
4162 if (SUCCEED != hc_clone_history_log_data(&(*data)->value.log, item_value))
4163 return FAIL;
4164 break;
4165 }
4166
4167 switch (item_value->item_value_type)
4168 {
4169 case ITEM_VALUE_TYPE_FLOAT:
4170 cache->stats.history_float_counter++;
4171 break;
4172 case ITEM_VALUE_TYPE_UINT64:
4173 cache->stats.history_uint_counter++;
4174 break;
4175 case ITEM_VALUE_TYPE_STR:
4176 cache->stats.history_str_counter++;
4177 break;
4178 case ITEM_VALUE_TYPE_TEXT:
4179 cache->stats.history_text_counter++;
4180 break;
4181 case ITEM_VALUE_TYPE_LOG:
4182 cache->stats.history_log_counter++;
4183 break;
4184 }
4185
4186 cache->stats.history_counter++;
4187 }
4188
4189 (*data)->value_type = item_value->value_type;
4190
4191 return SUCCEED;
4192 }
4193
4194 /******************************************************************************
4195 * *
4196 * Function: hc_add_item_values *
4197 * *
4198 * Purpose: adds item values to the history cache *
4199 * *
4200 * Parameters: values - [IN] the item values to add *
4201 * values_num - [IN] the number of item values to add *
4202 * *
4203 * Comments: If the history cache is full this function will wait until *
4204 * history syncers processes values freeing enough space to store *
4205 * the new value. *
4206 * *
4207 ******************************************************************************/
hc_add_item_values(dc_item_value_t * values,int values_num)4208 static void hc_add_item_values(dc_item_value_t *values, int values_num)
4209 {
4210 dc_item_value_t *item_value;
4211 int i;
4212 zbx_hc_item_t *item;
4213
4214 for (i = 0; i < values_num; i++)
4215 {
4216 zbx_hc_data_t *data = NULL;
4217
4218 item_value = &values[i];
4219
4220 /* a record with metadata and no value can be dropped if */
4221 /* the metadata update is copied to the last queued value */
4222 if (NULL != (item = hc_get_item(item_value->itemid)) &&
4223 0 != (item_value->flags & ZBX_DC_FLAG_NOVALUE) &&
4224 0 != (item_value->flags & ZBX_DC_FLAG_META))
4225 {
4226 /* skip metadata updates when only one value is queued, */
4227 /* because the item might be already being processed */
4228 if (item->head != item->tail)
4229 {
4230 item->head->lastlogsize = item_value->lastlogsize;
4231 item->head->mtime = item_value->mtime;
4232 item->head->flags |= ZBX_DC_FLAG_META;
4233 continue;
4234 }
4235 }
4236
4237 if (SUCCEED != hc_clone_history_data(&data, item_value))
4238 {
4239 do
4240 {
4241 UNLOCK_CACHE;
4242
4243 zabbix_log(LOG_LEVEL_DEBUG, "History cache is full. Sleeping for 1 second.");
4244 sleep(1);
4245
4246 LOCK_CACHE;
4247 }
4248 while (SUCCEED != hc_clone_history_data(&data, item_value));
4249
4250 item = hc_get_item(item_value->itemid);
4251 }
4252
4253 if (NULL == item)
4254 {
4255 item = hc_add_item(item_value->itemid, data);
4256 hc_queue_item(item);
4257 }
4258 else
4259 {
4260 item->head->next = data;
4261 item->head = data;
4262 }
4263 item->values_num++;
4264 }
4265 }
4266
4267 /******************************************************************************
4268 * *
4269 * Function: hc_copy_history_data *
4270 * *
4271 * Purpose: copies item value from history cache into the specified history *
4272 * value *
4273 * *
4274 * Parameters: history - [OUT] the history value *
4275 * itemid - [IN] the item identifier *
4276 * data - [IN] the history data to copy *
4277 * *
4278 * Comments: handling of uninitialized fields in dc_add_proxy_history_log() *
4279 * *
4280 ******************************************************************************/
hc_copy_history_data(ZBX_DC_HISTORY * history,zbx_uint64_t itemid,zbx_hc_data_t * data)4281 static void hc_copy_history_data(ZBX_DC_HISTORY *history, zbx_uint64_t itemid, zbx_hc_data_t *data)
4282 {
4283 history->itemid = itemid;
4284 history->ts = data->ts;
4285 history->state = data->state;
4286 history->flags = data->flags;
4287 history->lastlogsize = data->lastlogsize;
4288 history->mtime = data->mtime;
4289
4290 if (ITEM_STATE_NOTSUPPORTED == data->state)
4291 {
4292 history->value.err = zbx_strdup(NULL, data->value.str);
4293 history->flags |= ZBX_DC_FLAG_UNDEF;
4294 return;
4295 }
4296
4297 history->value_type = data->value_type;
4298
4299 if (0 == (ZBX_DC_FLAG_NOVALUE & data->flags))
4300 {
4301 switch (data->value_type)
4302 {
4303 case ITEM_VALUE_TYPE_FLOAT:
4304 history->value.dbl = data->value.dbl;
4305 break;
4306 case ITEM_VALUE_TYPE_UINT64:
4307 history->value.ui64 = data->value.ui64;
4308 break;
4309 case ITEM_VALUE_TYPE_STR:
4310 case ITEM_VALUE_TYPE_TEXT:
4311 history->value.str = zbx_strdup(NULL, data->value.str);
4312 break;
4313 case ITEM_VALUE_TYPE_LOG:
4314 history->value.log = (zbx_log_value_t *)zbx_malloc(NULL, sizeof(zbx_log_value_t));
4315 history->value.log->value = zbx_strdup(NULL, data->value.log->value);
4316
4317 if (NULL != data->value.log->source)
4318 history->value.log->source = zbx_strdup(NULL, data->value.log->source);
4319 else
4320 history->value.log->source = NULL;
4321
4322 history->value.log->timestamp = data->value.log->timestamp;
4323 history->value.log->severity = data->value.log->severity;
4324 history->value.log->logeventid = data->value.log->logeventid;
4325
4326 break;
4327 }
4328 }
4329 }
4330
4331 /******************************************************************************
4332 * *
4333 * Function: hc_pop_items *
4334 * *
4335 * Purpose: pops the next batch of history items from cache for processing *
4336 * *
4337 * Parameters: history_items - [OUT] the locked history items *
4338 * *
4339 * Comments: The history_items must be returned back to history cache with *
4340 * hc_push_items() function after they have been processed. *
4341 * *
4342 ******************************************************************************/
hc_pop_items(zbx_vector_ptr_t * history_items)4343 static void hc_pop_items(zbx_vector_ptr_t *history_items)
4344 {
4345 zbx_binary_heap_elem_t *elem;
4346 zbx_hc_item_t *item;
4347
4348 while (ZBX_HC_SYNC_MAX > history_items->values_num && FAIL == zbx_binary_heap_empty(&cache->history_queue))
4349 {
4350 elem = zbx_binary_heap_find_min(&cache->history_queue);
4351 item = (zbx_hc_item_t *)elem->data;
4352 zbx_vector_ptr_append(history_items, item);
4353
4354 zbx_binary_heap_remove_min(&cache->history_queue);
4355 }
4356 }
4357
4358 /******************************************************************************
4359 * *
4360 * Function: hc_get_item_values *
4361 * *
4362 * Purpose: gets item history values *
4363 * *
4364 * Parameters: history - [OUT] the history valeus *
4365 * history_items - [IN] the history items *
4366 * *
4367 ******************************************************************************/
hc_get_item_values(ZBX_DC_HISTORY * history,zbx_vector_ptr_t * history_items)4368 static void hc_get_item_values(ZBX_DC_HISTORY *history, zbx_vector_ptr_t *history_items)
4369 {
4370 int i, history_num = 0;
4371 zbx_hc_item_t *item;
4372
4373 /* we don't need to lock history cache because no other processes can */
4374 /* change item's history data until it is pushed back to history queue */
4375 for (i = 0; i < history_items->values_num; i++)
4376 {
4377 item = (zbx_hc_item_t *)history_items->values[i];
4378
4379 if (ZBX_HC_ITEM_STATUS_BUSY == item->status)
4380 continue;
4381
4382 hc_copy_history_data(&history[history_num++], item->itemid, item->tail);
4383 }
4384 }
4385
4386 /******************************************************************************
4387 * *
4388 * Function: hc_push_processed_items *
4389 * *
4390 * Purpose: push back the processed history items into history cache *
4391 * *
4392 * Parameters: history_items - [IN] the history items containing processed *
4393 * (available) and busy items *
4394 * *
4395 * Comments: This function removes processed value from history cache. *
4396 * If there is no more data for this item, then the item itself is *
4397 * removed from history index. *
4398 * *
4399 ******************************************************************************/
hc_push_items(zbx_vector_ptr_t * history_items)4400 void hc_push_items(zbx_vector_ptr_t *history_items)
4401 {
4402 int i;
4403 zbx_hc_item_t *item;
4404 zbx_hc_data_t *data_free;
4405
4406 for (i = 0; i < history_items->values_num; i++)
4407 {
4408 item = (zbx_hc_item_t *)history_items->values[i];
4409
4410 switch (item->status)
4411 {
4412 case ZBX_HC_ITEM_STATUS_BUSY:
4413 /* reset item status before returning it to queue */
4414 item->status = ZBX_HC_ITEM_STATUS_NORMAL;
4415 hc_queue_item(item);
4416 break;
4417 case ZBX_HC_ITEM_STATUS_NORMAL:
4418 item->values_num--;
4419 data_free = item->tail;
4420 item->tail = item->tail->next;
4421 hc_free_data(data_free);
4422 if (NULL == item->tail)
4423 zbx_hashset_remove(&cache->history_items, item);
4424 else
4425 hc_queue_item(item);
4426 break;
4427 }
4428 }
4429 }
4430
4431 /******************************************************************************
4432 * *
4433 * Function: hc_queue_get_size *
4434 * *
4435 * Purpose: retrieve the size of history queue *
4436 * *
4437 ******************************************************************************/
hc_queue_get_size(void)4438 int hc_queue_get_size(void)
4439 {
4440 return cache->history_queue.elems_num;
4441 }
4442
/******************************************************************************
 *                                                                            *
 * Function: hc_get_history_compression_age                                   *
 *                                                                            *
 * Purpose: returns the current time minus the history_compress_older         *
 *          setting when history compression is enabled                       *
 *                                                                            *
 * Return value: the calculated value or 0 if compression is off, the         *
 *               setting is 0 or PostgreSQL support is not compiled in        *
 *                                                                            *
 ******************************************************************************/
int	hc_get_history_compression_age(void)
{
	int		age = 0;
#ifdef HAVE_POSTGRESQL
	zbx_config_t	cfg;

	zbx_config_get(&cfg, ZBX_CONFIG_FLAGS_DB_EXTENSION);

	if (ON == cfg.db.history_compression_status && 0 != cfg.db.history_compress_older)
		age = (int)time(NULL) - cfg.db.history_compress_older;

	zbx_config_clean(&cfg);
#endif
	return age;
}
4463
4464 /******************************************************************************
4465 * *
4466 * Function: init_trend_cache *
4467 * *
4468 * Purpose: Allocate shared memory for trend cache (part of database cache) *
4469 * *
4470 * Author: Vladimir Levijev *
4471 * *
4472 * Comments: Is optionally called from init_database_cache() *
4473 * *
4474 ******************************************************************************/
4475
ZBX_MEM_FUNC_IMPL(__trend,trend_mem)4476 ZBX_MEM_FUNC_IMPL(__trend, trend_mem)
4477
4478 static int init_trend_cache(char **error)
4479 {
4480 size_t sz;
4481 int ret;
4482
4483 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
4484
4485 if (SUCCEED != (ret = zbx_mutex_create(&trends_lock, ZBX_MUTEX_TRENDS, error)))
4486 goto out;
4487
4488 sz = zbx_mem_required_size(1, "trend cache", "TrendCacheSize");
4489 if (SUCCEED != (ret = zbx_mem_create(&trend_mem, CONFIG_TRENDS_CACHE_SIZE, "trend cache", "TrendCacheSize", 0,
4490 error)))
4491 {
4492 goto out;
4493 }
4494
4495 CONFIG_TRENDS_CACHE_SIZE -= sz;
4496
4497 cache->trends_num = 0;
4498 cache->trends_last_cleanup_hour = 0;
4499
4500 #define INIT_HASHSET_SIZE 100 /* Should be calculated dynamically based on trends size? */
4501 /* Still does not make sense to have it more than initial */
4502 /* item hashset size in configuration cache. */
4503
4504 zbx_hashset_create_ext(&cache->trends, INIT_HASHSET_SIZE,
4505 ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL,
4506 __trend_mem_malloc_func, __trend_mem_realloc_func, __trend_mem_free_func);
4507
4508 #undef INIT_HASHSET_SIZE
4509 out:
4510 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
4511
4512 return ret;
4513 }
4514
4515 /******************************************************************************
4516 * *
4517 * Function: init_database_cache *
4518 * *
4519 * Purpose: Allocate shared memory for database cache *
4520 * *
4521 * Author: Alexei Vladishev, Alexander Vladishev *
4522 * *
4523 ******************************************************************************/
init_database_cache(char ** error)4524 int init_database_cache(char **error)
4525 {
4526 int ret;
4527
4528 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
4529
4530 if (SUCCEED != (ret = zbx_mutex_create(&cache_lock, ZBX_MUTEX_CACHE, error)))
4531 goto out;
4532
4533 if (SUCCEED != (ret = zbx_mutex_create(&cache_ids_lock, ZBX_MUTEX_CACHE_IDS, error)))
4534 goto out;
4535
4536 if (SUCCEED != (ret = zbx_mem_create(&hc_mem, CONFIG_HISTORY_CACHE_SIZE, "history cache",
4537 "HistoryCacheSize", 1, error)))
4538 {
4539 goto out;
4540 }
4541
4542 if (SUCCEED != (ret = zbx_mem_create(&hc_index_mem, CONFIG_HISTORY_INDEX_CACHE_SIZE, "history index cache",
4543 "HistoryIndexCacheSize", 0, error)))
4544 {
4545 goto out;
4546 }
4547
4548 cache = (ZBX_DC_CACHE *)__hc_index_mem_malloc_func(NULL, sizeof(ZBX_DC_CACHE));
4549 memset(cache, 0, sizeof(ZBX_DC_CACHE));
4550
4551 ids = (ZBX_DC_IDS *)__hc_index_mem_malloc_func(NULL, sizeof(ZBX_DC_IDS));
4552 memset(ids, 0, sizeof(ZBX_DC_IDS));
4553
4554 zbx_hashset_create_ext(&cache->history_items, ZBX_HC_ITEMS_INIT_SIZE,
4555 ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL,
4556 __hc_index_mem_malloc_func, __hc_index_mem_realloc_func, __hc_index_mem_free_func);
4557
4558 zbx_binary_heap_create_ext(&cache->history_queue, hc_queue_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY,
4559 __hc_index_mem_malloc_func, __hc_index_mem_realloc_func, __hc_index_mem_free_func);
4560
4561 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
4562 {
4563 zbx_hashset_create_ext(&(cache->proxyqueue.index), ZBX_HC_SYNC_MAX,
4564 ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL,
4565 __hc_index_mem_malloc_func, __hc_index_mem_realloc_func, __hc_index_mem_free_func);
4566
4567 zbx_list_create_ext(&(cache->proxyqueue.list), __hc_index_mem_malloc_func, __hc_index_mem_free_func);
4568
4569 cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_NORMAL;
4570
4571 if (SUCCEED != (ret = init_trend_cache(error)))
4572 goto out;
4573 }
4574
4575 cache->history_num_total = 0;
4576 cache->history_progress_ts = 0;
4577
4578 cache->db_trigger_queue_lock = 1;
4579
4580 if (NULL == sql)
4581 sql = (char *)zbx_malloc(sql, sql_alloc);
4582 out:
4583 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
4584
4585 return ret;
4586 }
4587
4588 /******************************************************************************
4589 * *
4590 * Function: DCsync_all *
4591 * *
4592 * Purpose: writes updates and new data from pool and cache data to database *
4593 * *
4594 * Author: Alexei Vladishev *
4595 * *
4596 ******************************************************************************/
DCsync_all(void)4597 static void DCsync_all(void)
4598 {
4599 zabbix_log(LOG_LEVEL_DEBUG, "In DCsync_all()");
4600
4601 sync_history_cache_full();
4602 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
4603 DCsync_trends();
4604
4605 zabbix_log(LOG_LEVEL_DEBUG, "End of DCsync_all()");
4606 }
4607
4608 /******************************************************************************
4609 * *
4610 * Function: free_database_cache *
4611 * *
4612 * Purpose: Free memory allocated for database cache *
4613 * *
4614 * Author: Alexei Vladishev, Alexander Vladishev *
4615 * *
4616 ******************************************************************************/
free_database_cache(void)4617 void free_database_cache(void)
4618 {
4619 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
4620
4621 DCsync_all();
4622
4623 cache = NULL;
4624
4625 zbx_mutex_destroy(&cache_lock);
4626 zbx_mutex_destroy(&cache_ids_lock);
4627
4628 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
4629 zbx_mutex_destroy(&trends_lock);
4630
4631 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
4632 }
4633
4634 /******************************************************************************
4635 * *
4636 * Function: DCget_nextid *
4637 * *
4638 * Purpose: Return next id for requested table *
4639 * *
4640 * Author: Alexander Vladishev *
4641 * *
4642 ******************************************************************************/
DCget_nextid(const char * table_name,int num)4643 zbx_uint64_t DCget_nextid(const char *table_name, int num)
4644 {
4645 int i;
4646 DB_RESULT result;
4647 DB_ROW row;
4648 const ZBX_TABLE *table;
4649 ZBX_DC_ID *id;
4650 zbx_uint64_t min = 0, max = ZBX_DB_MAX_ID, nextid, lastid;
4651
4652 zabbix_log(LOG_LEVEL_DEBUG, "In %s() table:'%s' num:%d", __func__, table_name, num);
4653
4654 LOCK_CACHE_IDS;
4655
4656 for (i = 0; i < ZBX_IDS_SIZE; i++)
4657 {
4658 id = &ids->id[i];
4659 if ('\0' == *id->table_name)
4660 break;
4661
4662 if (0 == strcmp(id->table_name, table_name))
4663 {
4664 nextid = id->lastid + 1;
4665 id->lastid += num;
4666 lastid = id->lastid;
4667
4668 UNLOCK_CACHE_IDS;
4669
4670 zabbix_log(LOG_LEVEL_DEBUG, "End of %s() table:'%s' [" ZBX_FS_UI64 ":" ZBX_FS_UI64 "]",
4671 __func__, table_name, nextid, lastid);
4672
4673 return nextid;
4674 }
4675 }
4676
4677 if (i == ZBX_IDS_SIZE)
4678 {
4679 zabbix_log(LOG_LEVEL_ERR, "insufficient shared memory for ids");
4680 exit(EXIT_FAILURE);
4681 }
4682
4683 table = DBget_table(table_name);
4684
4685 result = DBselect("select max(%s) from %s where %s between " ZBX_FS_UI64 " and " ZBX_FS_UI64,
4686 table->recid, table_name, table->recid, min, max);
4687
4688 if (NULL != result)
4689 {
4690 zbx_strlcpy(id->table_name, table_name, sizeof(id->table_name));
4691
4692 if (NULL == (row = DBfetch(result)) || SUCCEED == DBis_null(row[0]))
4693 id->lastid = min;
4694 else
4695 ZBX_STR2UINT64(id->lastid, row[0]);
4696
4697 nextid = id->lastid + 1;
4698 id->lastid += num;
4699 lastid = id->lastid;
4700 }
4701 else
4702 nextid = lastid = 0;
4703
4704 UNLOCK_CACHE_IDS;
4705
4706 DBfree_result(result);
4707
4708 zabbix_log(LOG_LEVEL_DEBUG, "End of %s() table:'%s' [" ZBX_FS_UI64 ":" ZBX_FS_UI64 "]",
4709 __func__, table_name, nextid, lastid);
4710
4711 return nextid;
4712 }
4713
4714 /******************************************************************************
4715 * *
4716 * Function: DCupdate_interfaces_availability *
4717 * *
4718 * Purpose: performs interface availability reset for hosts with *
4719 * availability set on interfaces without enabled items *
4720 * *
4721 ******************************************************************************/
DCupdate_interfaces_availability(void)4722 void DCupdate_interfaces_availability(void)
4723 {
4724 zbx_vector_availability_ptr_t interfaces;
4725
4726 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
4727
4728 zbx_vector_availability_ptr_create(&interfaces);
4729
4730 if (SUCCEED != DCreset_interfaces_availability(&interfaces))
4731 goto out;
4732
4733 zbx_availabilities_flush(&interfaces);
4734 out:
4735 zbx_vector_availability_ptr_clear_ext(&interfaces, zbx_interface_availability_free);
4736 zbx_vector_availability_ptr_destroy(&interfaces);
4737
4738 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
4739 }
4740
4741 /******************************************************************************
4742 * *
4743 * Function: zbx_hc_get_diag_stats *
4744 * *
4745 * Purpose: get history cache diagnostics statistics *
4746 * *
4747 ******************************************************************************/
zbx_hc_get_diag_stats(zbx_uint64_t * items_num,zbx_uint64_t * values_num)4748 void zbx_hc_get_diag_stats(zbx_uint64_t *items_num, zbx_uint64_t *values_num)
4749 {
4750 LOCK_CACHE;
4751
4752 *values_num = cache->history_num;
4753 *items_num = cache->history_items.num_data;
4754
4755 UNLOCK_CACHE;
4756 }
4757
4758 /******************************************************************************
4759 * *
4760 * Function: zbx_hc_get_mem_stats *
4761 * *
4762 * Purpose: get shared memory allocator statistics *
4763 * *
4764 ******************************************************************************/
zbx_hc_get_mem_stats(zbx_mem_stats_t * data,zbx_mem_stats_t * index)4765 void zbx_hc_get_mem_stats(zbx_mem_stats_t *data, zbx_mem_stats_t *index)
4766 {
4767 LOCK_CACHE;
4768
4769 if (NULL != data)
4770 zbx_mem_get_stats(hc_mem, data);
4771
4772 if (NULL != index)
4773 zbx_mem_get_stats(hc_index_mem, index);
4774
4775 UNLOCK_CACHE;
4776 }
4777
4778 /******************************************************************************
4779 * *
4780 * Function: zbx_hc_get_items *
4781 * *
4782 * Purpose: get statistics of cached items *
4783 * *
4784 ******************************************************************************/
zbx_hc_get_items(zbx_vector_uint64_pair_t * items)4785 void zbx_hc_get_items(zbx_vector_uint64_pair_t *items)
4786 {
4787 zbx_hashset_iter_t iter;
4788 zbx_hc_item_t *item;
4789
4790 LOCK_CACHE;
4791
4792 zbx_vector_uint64_pair_reserve(items, cache->history_items.num_data);
4793
4794 zbx_hashset_iter_reset(&cache->history_items, &iter);
4795 while (NULL != (item = (zbx_hc_item_t *)zbx_hashset_iter_next(&iter)))
4796 {
4797 zbx_uint64_pair_t pair = {item->itemid, item->values_num};
4798 zbx_vector_uint64_pair_append_ptr(items, &pair);
4799 }
4800
4801 UNLOCK_CACHE;
4802 }
4803
4804 /******************************************************************************
4805 * *
4806 * Function: zbx_db_trigger_queue_locked *
4807 * *
4808 * Purpose: checks if database trigger queue table is locked *
4809 * *
4810 ******************************************************************************/
zbx_db_trigger_queue_locked(void)4811 int zbx_db_trigger_queue_locked(void)
4812 {
4813 return 0 == cache->db_trigger_queue_lock ? FAIL : SUCCEED;
4814 }
4815
4816 /******************************************************************************
4817 * *
4818 * Function: zbx_db_trigger_queue_unlock *
4819 * *
4820 * Purpose: unlocks database trigger queue table *
4821 * *
4822 ******************************************************************************/
zbx_db_trigger_queue_unlock(void)4823 void zbx_db_trigger_queue_unlock(void)
4824 {
4825 cache->db_trigger_queue_lock = 0;
4826 }
4827
4828
4829 /******************************************************************************
4830 * *
4831 * Function: zbx_hc_proxyqueue_peek *
4832 * *
4833 * Purpose: return first proxy in a queue, function assumes that a queue is *
4834 * not empty *
4835 * *
4836 * Return value: proxyid at the top a queue *
4837 * *
4838 ******************************************************************************/
zbx_hc_proxyqueue_peek(void)4839 static zbx_uint64_t zbx_hc_proxyqueue_peek(void)
4840 {
4841 zbx_uint64_t *p_val;
4842
4843 if (NULL == cache->proxyqueue.list.head)
4844 return 0;
4845
4846 p_val = (zbx_uint64_t *)(cache->proxyqueue.list.head->data);
4847
4848 return *p_val;
4849 }
4850
4851 /******************************************************************************
4852 * *
4853 * Function: zbx_hc_proxyqueue_enqueue *
4854 * *
4855 * Purpose: add new proxyid to a queue *
4856 * *
4857 * Parameters: proxyid - [IN] the proxy id *
4858 * *
4859 ******************************************************************************/
zbx_hc_proxyqueue_enqueue(zbx_uint64_t proxyid)4860 static void zbx_hc_proxyqueue_enqueue(zbx_uint64_t proxyid)
4861 {
4862 if (NULL == zbx_hashset_search(&cache->proxyqueue.index, &proxyid))
4863 {
4864 zbx_uint64_t *ptr;
4865
4866 ptr = zbx_hashset_insert(&cache->proxyqueue.index, &proxyid, sizeof(proxyid));
4867 zbx_list_append(&cache->proxyqueue.list, ptr, NULL);
4868 }
4869 }
4870
4871 /******************************************************************************
4872 * *
4873 * Function: zbx_hc_proxyqueue_dequeue *
4874 * *
4875 * Purpose: try to dequeue proxyid from a proxy queue *
4876 * *
4877 * Parameters: chk_proxyid - [IN] the proxyid *
4878 * *
4879 * Return value: SUCCEED - retrieval successful *
4880 * FAIL - otherwise *
4881 * *
4882 ******************************************************************************/
zbx_hc_proxyqueue_dequeue(zbx_uint64_t proxyid)4883 static int zbx_hc_proxyqueue_dequeue(zbx_uint64_t proxyid)
4884 {
4885 zbx_uint64_t top_val;
4886 void *rem_val = 0;
4887
4888 top_val = zbx_hc_proxyqueue_peek();
4889
4890 if (proxyid != top_val)
4891 return FAIL;
4892
4893 if (FAIL == zbx_list_pop(&cache->proxyqueue.list, &rem_val))
4894 return FAIL;
4895
4896 zbx_hashset_remove_direct(&cache->proxyqueue.index, rem_val);
4897
4898 return SUCCEED;
4899 }
4900
4901 /******************************************************************************
4902 * *
4903 * Function: zbx_hc_proxyqueue_clear *
4904 * *
4905 * Purpose: remove all proxies from proxy priority queue *
4906 * *
4907 ******************************************************************************/
zbx_hc_proxyqueue_clear(void)4908 static void zbx_hc_proxyqueue_clear(void)
4909 {
4910 zbx_list_destroy(&cache->proxyqueue.list);
4911 zbx_hashset_clear(&cache->proxyqueue.index);
4912 }
4913
4914 /******************************************************************************
4915 * *
4916 * Function: zbx_hc_check_proxy *
4917 * *
4918 * Purpose: check status of a history cache usage, enqueue/dequeue proxy *
4919 * from priority list and accordingly enable or disable wait mode *
4920 * *
4921 * Parameters: proxyid - [IN] the proxyid *
4922 * *
4923 * Return value: SUCCEED - proxy can be processed now *
4924 * FAIL - proxy cannot be processed now, it got enqueued *
4925 * *
4926 ******************************************************************************/
zbx_hc_check_proxy(zbx_uint64_t proxyid)4927 int zbx_hc_check_proxy(zbx_uint64_t proxyid)
4928 {
4929 double hc_pused;
4930 int ret;
4931
4932 zabbix_log(LOG_LEVEL_DEBUG, "In %s() proxyid:"ZBX_FS_UI64, __func__, proxyid);
4933
4934 LOCK_CACHE;
4935
4936 hc_pused = 100 * (double)(hc_mem->total_size - hc_mem->free_size) / hc_mem->total_size;
4937
4938 if (20 >= hc_pused)
4939 {
4940 cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_NORMAL;
4941
4942 zbx_hc_proxyqueue_clear();
4943
4944 ret = SUCCEED;
4945 goto out;
4946 }
4947
4948 if (ZBX_HC_PROXYQUEUE_STATE_WAIT == cache->proxyqueue.state)
4949 {
4950 zbx_hc_proxyqueue_enqueue(proxyid);
4951
4952 if (60 < hc_pused)
4953 {
4954 ret = FAIL;
4955 goto out;
4956 }
4957
4958 cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_NORMAL;
4959 }
4960 else
4961 {
4962 if (80 <= hc_pused)
4963 {
4964 cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_WAIT;
4965 zbx_hc_proxyqueue_enqueue(proxyid);
4966
4967 ret = FAIL;
4968 goto out;
4969 }
4970 }
4971
4972 if (0 == zbx_hc_proxyqueue_peek())
4973 {
4974 ret = SUCCEED;
4975 goto out;
4976 }
4977
4978 ret = zbx_hc_proxyqueue_dequeue(proxyid);
4979
4980 out:
4981 UNLOCK_CACHE;
4982
4983 zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
4984
4985 return ret;
4986 }
4987