1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18 **/
19 
20 #include "common.h"
21 #include "log.h"
22 #include "threads.h"
23 
24 #include "db.h"
25 #include "dbcache.h"
26 #include "ipc.h"
27 #include "mutexs.h"
28 #include "zbxserver.h"
29 #include "proxy.h"
30 #include "events.h"
31 #include "memalloc.h"
32 #include "zbxalgo.h"
33 #include "valuecache.h"
34 #include "zbxmodules.h"
35 #include "module.h"
36 #include "export.h"
37 #include "zbxjson.h"
38 #include "zbxhistory.h"
39 #include "daemon.h"
40 #include "zbxavailability.h"
41 #include "zbxtrends.h"
42 #include "zbxalgo.h"
43 #include "../zbxalgo/vectorimpl.h"
44 
/* memory descriptors whose free/total sizes are reported by DCget_stats_all() and DCget_stats() */
/* as history index, history and trend cache usage respectively */
static zbx_mem_info_t	*hc_index_mem = NULL;
static zbx_mem_info_t	*hc_mem = NULL;
static zbx_mem_info_t	*trend_mem = NULL;

/* shorthand for taking and releasing the three mutexes defined below */
#define	LOCK_CACHE	zbx_mutex_lock(cache_lock)
#define	UNLOCK_CACHE	zbx_mutex_unlock(cache_lock)
#define	LOCK_TRENDS	zbx_mutex_lock(trends_lock)
#define	UNLOCK_TRENDS	zbx_mutex_unlock(trends_lock)
#define	LOCK_CACHE_IDS		zbx_mutex_lock(cache_ids_lock)
#define	UNLOCK_CACHE_IDS	zbx_mutex_unlock(cache_ids_lock)

/* cache_lock is held by DCget_stats_all()/DCget_stats() while reading the cache statistics, */
/* trends_lock is held by DCupdate_trends() while modifying cached trends; */
/* cache_ids_lock presumably guards 'ids' - confirm at its use sites */
static zbx_mutex_t	cache_lock = ZBX_MUTEX_NULL;
static zbx_mutex_t	trends_lock = ZBX_MUTEX_NULL;
static zbx_mutex_t	cache_ids_lock = ZBX_MUTEX_NULL;

/* SQL text buffer shared by the trend flushing helpers in this file and the allocation size */
/* that is passed together with it to zbx_snprintf_alloc() and DBadd_condition_alloc() */
static char		*sql = NULL;
static size_t		sql_alloc = 4 * ZBX_KIBIBYTE;

/* process type and configuration values defined outside this file */
extern unsigned char	program_type;
extern int		CONFIG_DOUBLE_PRECISION;
extern char		*CONFIG_EXPORT_DIR;
/* number of entries in ZBX_DC_IDS.id[] */
#define ZBX_IDS_SIZE	10

/* presumably the initial size of the history items hashset - not used in this part of the file */
#define ZBX_HC_ITEMS_INIT_SIZE	1000

/* 55/60 of SEC_PER_HOUR, expressed in seconds */
#define ZBX_TRENDS_CLEANUP_TIME	((SEC_PER_HOUR * 55) / 60)

/* the maximum time spent synchronizing history */
#define ZBX_HC_SYNC_TIME_MAX	10

/* the maximum number of items in one synchronization batch */
#define ZBX_HC_SYNC_MAX		1000
/* timer limits derived from the batch size: half of it, and 10 less than that */
#define ZBX_HC_TIMER_MAX	(ZBX_HC_SYNC_MAX / 2)
#define ZBX_HC_TIMER_SOFT_MAX	(ZBX_HC_TIMER_MAX - 10)

/* the minimum processed item percentage of item candidates to continue synchronizing */
#define ZBX_HC_SYNC_MIN_PCNT	10

/* the maximum number of characters for history cache values */
#define ZBX_HISTORY_VALUE_LEN	(1024 * 64)

/* flag masks; presumably a value carrying any flag of a mask is skipped by the corresponding */
/* consumer (history, trends, modules, export) - confirm at the use sites */
#define ZBX_DC_FLAGS_NOT_FOR_HISTORY	(ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_NOHISTORY)
#define ZBX_DC_FLAGS_NOT_FOR_TRENDS	(ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_NOTRENDS)
#define ZBX_DC_FLAGS_NOT_FOR_MODULES	(ZBX_DC_FLAGS_NOT_FOR_HISTORY | ZBX_DC_FLAG_LLD)
#define ZBX_DC_FLAGS_NOT_FOR_EXPORT	(ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF)

/* presumably the possible values of zbx_hc_proxyqueue_t.state - confirm at the use sites */
#define ZBX_HC_PROXYQUEUE_STATE_NORMAL 0
#define ZBX_HC_PROXYQUEUE_STATE_WAIT 1
94 
/* an identifier record kept per table name */
typedef struct
{
	char		table_name[ZBX_TABLENAME_LEN_MAX];
	zbx_uint64_t	lastid;		/* presumably the last identifier handed out for the table - TODO confirm */
}
ZBX_DC_ID;

/* fixed size set of ZBX_IDS_SIZE identifier records */
typedef struct
{
	ZBX_DC_ID	id[ZBX_IDS_SIZE];
}
ZBX_DC_IDS;

/* the identifier records of this process, not yet set up at declaration */
static ZBX_DC_IDS	*ids = NULL;
109 
/* proxy queue: a list together with a hashset index and a state value */
/* (presumably one of ZBX_HC_PROXYQUEUE_STATE_* - TODO confirm) */
typedef struct
{
	zbx_list_t	list;
	zbx_hashset_t	index;
	int		state;
}
zbx_hc_proxyqueue_t;

/* the write cache: trends, statistics and the history cache bookkeeping */
typedef struct
{
	zbx_hashset_t		trends;		/* cached trends, looked up by itemid in DCget_trend() */
	ZBX_DC_STATS		stats;		/* counters reported by DCget_stats_all() and DCget_stats() */

	zbx_hashset_t		history_items;
	zbx_binary_heap_t	history_queue;

	int			history_num;
	int			trends_num;
	int			trends_last_cleanup_hour;
	int			history_num_total;
	int			history_progress_ts;

	unsigned char		db_trigger_queue_lock;

	zbx_hc_proxyqueue_t     proxyqueue;
}
ZBX_DC_CACHE;

/* the write cache instance accessed by the functions in this file */
static ZBX_DC_CACHE	*cache = NULL;
139 
/* local history cache */
#define ZBX_MAX_VALUES_LOCAL	256
#define ZBX_STRUCT_REALLOC_STEP	8
#define ZBX_STRING_REALLOC_STEP	ZBX_KIBIBYTE

/* reference to a string value; presumably pvalue is an offset into string_values */
/* and len the length of the string - TODO confirm at the use sites */
typedef struct
{
	size_t	pvalue;
	size_t	len;
}
dc_value_str_t;

/* a value in each of the supported representations: floating point, unsigned integer, string */
typedef struct
{
	double		value_dbl;
	zbx_uint64_t	value_uint;
	dc_value_str_t	value_str;
}
dc_value_t;

/* one collected item value together with its metadata */
typedef struct
{
	zbx_uint64_t	itemid;
	dc_value_t	value;
	zbx_timespec_t	ts;
	dc_value_str_t	source;		/* for log items only */
	zbx_uint64_t	lastlogsize;
	int		timestamp;	/* for log items only */
	int		severity;	/* for log items only */
	int		logeventid;	/* for log items only */
	int		mtime;
	unsigned char	item_value_type;
	unsigned char	value_type;
	unsigned char	state;
	unsigned char	flags;		/* see ZBX_DC_FLAG_* above */
}
dc_item_value_t;

/* string storage and item value array of the local history cache, */
/* each with its allocated size and its used offset/count */
static char		*string_values = NULL;
static size_t		string_values_alloc = 0, string_values_offset = 0;
static dc_item_value_t	*item_values = NULL;
static size_t		item_values_alloc = 0, item_values_num = 0;
182 
/* forward declarations of the history cache (hc_*) helpers, */
/* presumably defined further down in this file */
static void	hc_add_item_values(dc_item_value_t *values, int values_num);
static void	hc_pop_items(zbx_vector_ptr_t *history_items);
static void	hc_get_item_values(ZBX_DC_HISTORY *history, zbx_vector_ptr_t *history_items);
static void	hc_push_items(zbx_vector_ptr_t *history_items);
static void	hc_free_item_values(ZBX_DC_HISTORY *history, int history_num);
static void	hc_queue_item(zbx_hc_item_t *item);
static int	hc_queue_elem_compare_func(const void *d1, const void *d2);
static int	hc_queue_get_size(void);
static int	hc_get_history_compression_age(void);

/* declare and implement the item_tag pointer vector for zbx_tag_t elements */
ZBX_PTR_VECTOR_DECL(item_tag, zbx_tag_t)
ZBX_PTR_VECTOR_IMPL(item_tag, zbx_tag_t)
195 
196 /******************************************************************************
197  *                                                                            *
198  * Function: DCget_stats_all                                                  *
199  *                                                                            *
200  * Purpose: retrieves all internal metrics of the database cache              *
201  *                                                                            *
202  * Parameters: stats - [OUT] write cache metrics                              *
203  *                                                                            *
204  ******************************************************************************/
205 void	DCget_stats_all(zbx_wcache_info_t *wcache_info)
206 {
207 	LOCK_CACHE;
208 
209 	wcache_info->stats = cache->stats;
210 	wcache_info->history_free = hc_mem->free_size;
211 	wcache_info->history_total = hc_mem->total_size;
212 	wcache_info->index_free = hc_index_mem->free_size;
213 	wcache_info->index_total = hc_index_mem->total_size;
214 
215 	if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
216 	{
217 		wcache_info->trend_free = trend_mem->free_size;
218 		wcache_info->trend_total = trend_mem->orig_size;
219 	}
220 
221 	UNLOCK_CACHE;
222 }
223 
224 /******************************************************************************
225  *                                                                            *
226  * Function: DCget_stats                                                      *
227  *                                                                            *
228  * Purpose: get statistics of the database cache                              *
229  *                                                                            *
230  * Author: Alexander Vladishev                                                *
231  *                                                                            *
232  ******************************************************************************/
DCget_stats(int request)233 void	*DCget_stats(int request)
234 {
235 	static zbx_uint64_t	value_uint;
236 	static double		value_double;
237 	void			*ret;
238 
239 	LOCK_CACHE;
240 
241 	switch (request)
242 	{
243 		case ZBX_STATS_HISTORY_COUNTER:
244 			value_uint = cache->stats.history_counter;
245 			ret = (void *)&value_uint;
246 			break;
247 		case ZBX_STATS_HISTORY_FLOAT_COUNTER:
248 			value_uint = cache->stats.history_float_counter;
249 			ret = (void *)&value_uint;
250 			break;
251 		case ZBX_STATS_HISTORY_UINT_COUNTER:
252 			value_uint = cache->stats.history_uint_counter;
253 			ret = (void *)&value_uint;
254 			break;
255 		case ZBX_STATS_HISTORY_STR_COUNTER:
256 			value_uint = cache->stats.history_str_counter;
257 			ret = (void *)&value_uint;
258 			break;
259 		case ZBX_STATS_HISTORY_LOG_COUNTER:
260 			value_uint = cache->stats.history_log_counter;
261 			ret = (void *)&value_uint;
262 			break;
263 		case ZBX_STATS_HISTORY_TEXT_COUNTER:
264 			value_uint = cache->stats.history_text_counter;
265 			ret = (void *)&value_uint;
266 			break;
267 		case ZBX_STATS_NOTSUPPORTED_COUNTER:
268 			value_uint = cache->stats.notsupported_counter;
269 			ret = (void *)&value_uint;
270 			break;
271 		case ZBX_STATS_HISTORY_TOTAL:
272 			value_uint = hc_mem->total_size;
273 			ret = (void *)&value_uint;
274 			break;
275 		case ZBX_STATS_HISTORY_USED:
276 			value_uint = hc_mem->total_size - hc_mem->free_size;
277 			ret = (void *)&value_uint;
278 			break;
279 		case ZBX_STATS_HISTORY_FREE:
280 			value_uint = hc_mem->free_size;
281 			ret = (void *)&value_uint;
282 			break;
283 		case ZBX_STATS_HISTORY_PUSED:
284 			value_double = 100 * (double)(hc_mem->total_size - hc_mem->free_size) / hc_mem->total_size;
285 			ret = (void *)&value_double;
286 			break;
287 		case ZBX_STATS_HISTORY_PFREE:
288 			value_double = 100 * (double)hc_mem->free_size / hc_mem->total_size;
289 			ret = (void *)&value_double;
290 			break;
291 		case ZBX_STATS_TREND_TOTAL:
292 			value_uint = trend_mem->orig_size;
293 			ret = (void *)&value_uint;
294 			break;
295 		case ZBX_STATS_TREND_USED:
296 			value_uint = trend_mem->orig_size - trend_mem->free_size;
297 			ret = (void *)&value_uint;
298 			break;
299 		case ZBX_STATS_TREND_FREE:
300 			value_uint = trend_mem->free_size;
301 			ret = (void *)&value_uint;
302 			break;
303 		case ZBX_STATS_TREND_PUSED:
304 			value_double = 100 * (double)(trend_mem->orig_size - trend_mem->free_size) /
305 					trend_mem->orig_size;
306 			ret = (void *)&value_double;
307 			break;
308 		case ZBX_STATS_TREND_PFREE:
309 			value_double = 100 * (double)trend_mem->free_size / trend_mem->orig_size;
310 			ret = (void *)&value_double;
311 			break;
312 		case ZBX_STATS_HISTORY_INDEX_TOTAL:
313 			value_uint = hc_index_mem->total_size;
314 			ret = (void *)&value_uint;
315 			break;
316 		case ZBX_STATS_HISTORY_INDEX_USED:
317 			value_uint = hc_index_mem->total_size - hc_index_mem->free_size;
318 			ret = (void *)&value_uint;
319 			break;
320 		case ZBX_STATS_HISTORY_INDEX_FREE:
321 			value_uint = hc_index_mem->free_size;
322 			ret = (void *)&value_uint;
323 			break;
324 		case ZBX_STATS_HISTORY_INDEX_PUSED:
325 			value_double = 100 * (double)(hc_index_mem->total_size - hc_index_mem->free_size) /
326 					hc_index_mem->total_size;
327 			ret = (void *)&value_double;
328 			break;
329 		case ZBX_STATS_HISTORY_INDEX_PFREE:
330 			value_double = 100 * (double)hc_index_mem->free_size / hc_index_mem->total_size;
331 			ret = (void *)&value_double;
332 			break;
333 		default:
334 			ret = NULL;
335 	}
336 
337 	UNLOCK_CACHE;
338 
339 	return ret;
340 }
341 
342 /******************************************************************************
343  *                                                                            *
344  * Function: DCget_trend                                                      *
345  *                                                                            *
346  * Purpose: find existing or add new structure and return pointer             *
347  *                                                                            *
348  * Return value: pointer to a trend structure                                 *
349  *                                                                            *
350  * Author: Alexander Vladishev                                                *
351  *                                                                            *
352  ******************************************************************************/
DCget_trend(zbx_uint64_t itemid)353 static ZBX_DC_TREND	*DCget_trend(zbx_uint64_t itemid)
354 {
355 	ZBX_DC_TREND	*ptr, trend;
356 
357 	if (NULL != (ptr = (ZBX_DC_TREND *)zbx_hashset_search(&cache->trends, &itemid)))
358 		return ptr;
359 
360 	memset(&trend, 0, sizeof(ZBX_DC_TREND));
361 	trend.itemid = itemid;
362 
363 	return (ZBX_DC_TREND *)zbx_hashset_insert(&cache->trends, &trend, sizeof(ZBX_DC_TREND));
364 }
365 
366 /******************************************************************************
367  *                                                                            *
368  * Function: DCupdate_trends                                                  *
369  *                                                                            *
370  * Purpose: apply disable_from changes to cache                               *
371  *                                                                            *
372  ******************************************************************************/
DCupdate_trends(zbx_vector_uint64_pair_t * trends_diff)373 static void	DCupdate_trends(zbx_vector_uint64_pair_t *trends_diff)
374 {
375 	int	i;
376 
377 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
378 
379 	LOCK_TRENDS;
380 
381 	for (i = 0; i < trends_diff->values_num; i++)
382 	{
383 		ZBX_DC_TREND	*trend;
384 
385 		if (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_search(&cache->trends, &trends_diff->values[i].first)))
386 			trend->disable_from = trends_diff->values[i].second;
387 	}
388 
389 	UNLOCK_TRENDS;
390 
391 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
392 }
393 
394 /******************************************************************************
395  *                                                                            *
396  * Function: dc_insert_trends_in_db                                           *
397  *                                                                            *
398  * Purpose: helper function for DCflush trends                                *
399  *                                                                            *
400  ******************************************************************************/
dc_insert_trends_in_db(ZBX_DC_TREND * trends,int trends_num,unsigned char value_type,const char * table_name,int clock)401 static void	dc_insert_trends_in_db(ZBX_DC_TREND *trends, int trends_num, unsigned char value_type,
402 		const char *table_name, int clock)
403 {
404 	ZBX_DC_TREND	*trend;
405 	int		i;
406 	zbx_db_insert_t	db_insert;
407 
408 	zbx_db_insert_prepare(&db_insert, table_name, "itemid", "clock", "num", "value_min", "value_avg",
409 			"value_max", NULL);
410 
411 	for (i = 0; i < trends_num; i++)
412 	{
413 		trend = &trends[i];
414 
415 		if (0 == trend->itemid)
416 			continue;
417 
418 		if (clock != trend->clock || value_type != trend->value_type)
419 			continue;
420 
421 		if (ITEM_VALUE_TYPE_FLOAT == value_type)
422 		{
423 			zbx_db_insert_add_values(&db_insert, trend->itemid, trend->clock, trend->num,
424 					trend->value_min.dbl, trend->value_avg.dbl, trend->value_max.dbl);
425 		}
426 		else
427 		{
428 			zbx_uint128_t	avg;
429 
430 			/* calculate the trend average value */
431 			udiv128_64(&avg, &trend->value_avg.ui64, trend->num);
432 
433 			zbx_db_insert_add_values(&db_insert, trend->itemid, trend->clock, trend->num,
434 					trend->value_min.ui64, avg.lo, trend->value_max.ui64);
435 		}
436 
437 		trend->itemid = 0;
438 	}
439 
440 	zbx_db_insert_execute(&db_insert);
441 	zbx_db_insert_clean(&db_insert);
442 }
443 
444 /******************************************************************************
445  *                                                                            *
446  * Function: dc_remove_updated_trends                                         *
447  *                                                                            *
448  * Purpose: Update trends disable_until for items without trends data past or *
449  *          equal the specified clock                                         *
450  *                                                                            *
451  * Comments: A helper function for DCflush trends                             *
452  *                                                                            *
453  ******************************************************************************/
dc_remove_updated_trends(ZBX_DC_TREND * trends,int trends_num,const char * table_name,int value_type,zbx_uint64_t * itemids,int * itemids_num,int clock)454 static void	dc_remove_updated_trends(ZBX_DC_TREND *trends, int trends_num, const char *table_name,
455 		int value_type, zbx_uint64_t *itemids, int *itemids_num, int clock)
456 {
457 	int		i, j, clocks_num, now, age;
458 	ZBX_DC_TREND	*trend;
459 	zbx_uint64_t	itemid;
460 	size_t		sql_offset;
461 	DB_RESULT	result;
462 	DB_ROW		row;
463 	int		clocks[] = {SEC_PER_DAY, SEC_PER_WEEK, SEC_PER_MONTH, SEC_PER_YEAR, INT_MAX};
464 
465 	now = time(NULL);
466 	age = now - clock;
467 	for (clocks_num = 0; age > clocks[clocks_num]; clocks_num++)
468 		clocks[clocks_num] = now - clocks[clocks_num];
469 	clocks[clocks_num] = clock;
470 
471 	/* remove itemids with trends data past or equal the clock */
472 	for (j = 0; j <= clocks_num && 0 < *itemids_num; j++)
473 	{
474 		sql_offset = 0;
475 		zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
476 				"select distinct itemid"
477 				" from %s"
478 				" where clock>=%d and",
479 				table_name, clocks[j]);
480 
481 		if (0 < j)
482 			zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " clock<%d and", clocks[j - 1]);
483 
484 		DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids, *itemids_num);
485 
486 		result = DBselect("%s", sql);
487 
488 		while (NULL != (row = DBfetch(result)))
489 		{
490 			ZBX_STR2UINT64(itemid, row[0]);
491 			uint64_array_remove(itemids, itemids_num, &itemid, 1);
492 		}
493 		DBfree_result(result);
494 	}
495 
496 	/* update trends disable_until for the leftover itemids */
497 	while (0 != *itemids_num)
498 	{
499 		itemid = itemids[--*itemids_num];
500 
501 		for (i = 0; i < trends_num; i++)
502 		{
503 			trend = &trends[i];
504 
505 			if (itemid != trend->itemid)
506 				continue;
507 
508 			if (clock != trend->clock || value_type != trend->value_type)
509 				continue;
510 
511 			trend->disable_from = clock;
512 			break;
513 		}
514 	}
515 }
516 
517 /******************************************************************************
518  *                                                                            *
519  * Function: dc_trends_update_float                                           *
520  *                                                                            *
521  * Purpose: helper function for DCflush trends                                *
522  *                                                                            *
523  ******************************************************************************/
dc_trends_update_float(ZBX_DC_TREND * trend,DB_ROW row,int num,size_t * sql_offset)524 static void	dc_trends_update_float(ZBX_DC_TREND *trend, DB_ROW row, int num, size_t *sql_offset)
525 {
526 	history_value_t	value_min, value_avg, value_max;
527 
528 	value_min.dbl = atof(row[2]);
529 	value_avg.dbl = atof(row[3]);
530 	value_max.dbl = atof(row[4]);
531 
532 	if (value_min.dbl < trend->value_min.dbl)
533 		trend->value_min.dbl = value_min.dbl;
534 
535 	if (value_max.dbl > trend->value_max.dbl)
536 		trend->value_max.dbl = value_max.dbl;
537 
538 	trend->value_avg.dbl = trend->value_avg.dbl / (trend->num + num) * trend->num +
539 			value_avg.dbl / (trend->num + num) * num;
540 	trend->num += num;
541 
542 	zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, "update trends set"
543 			" num=%d,value_min=" ZBX_FS_DBL64_SQL ",value_avg=" ZBX_FS_DBL64_SQL
544 			",value_max=" ZBX_FS_DBL64_SQL
545 			" where itemid=" ZBX_FS_UI64 " and clock=%d;\n",
546 			trend->num, trend->value_min.dbl, trend->value_avg.dbl, trend->value_max.dbl,
547 			trend->itemid, trend->clock);
548 }
549 
550 /******************************************************************************
551  *                                                                            *
552  * Function: dc_trends_update_uint                                            *
553  *                                                                            *
554  * Purpose: helper function for DCflush trends                                *
555  *                                                                            *
556  ******************************************************************************/
dc_trends_update_uint(ZBX_DC_TREND * trend,DB_ROW row,int num,size_t * sql_offset)557 static void	dc_trends_update_uint(ZBX_DC_TREND *trend, DB_ROW row, int num, size_t *sql_offset)
558 {
559 	history_value_t	value_min, value_avg, value_max;
560 	zbx_uint128_t	avg;
561 
562 	ZBX_STR2UINT64(value_min.ui64, row[2]);
563 	ZBX_STR2UINT64(value_avg.ui64, row[3]);
564 	ZBX_STR2UINT64(value_max.ui64, row[4]);
565 
566 	if (value_min.ui64 < trend->value_min.ui64)
567 		trend->value_min.ui64 = value_min.ui64;
568 	if (value_max.ui64 > trend->value_max.ui64)
569 		trend->value_max.ui64 = value_max.ui64;
570 
571 	/* calculate the trend average value */
572 	umul64_64(&avg, num, value_avg.ui64);
573 	uinc128_128(&trend->value_avg.ui64, &avg);
574 	udiv128_64(&avg, &trend->value_avg.ui64, trend->num + num);
575 
576 	trend->num += num;
577 
578 	zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset,
579 			"update trends_uint set num=%d,value_min=" ZBX_FS_UI64 ",value_avg="
580 			ZBX_FS_UI64 ",value_max=" ZBX_FS_UI64 " where itemid=" ZBX_FS_UI64
581 			" and clock=%d;\n",
582 			trend->num,
583 			trend->value_min.ui64,
584 			avg.lo,
585 			trend->value_max.ui64,
586 			trend->itemid,
587 			trend->clock);
588 }
589 
590 /******************************************************************************
591  *                                                                            *
592  * Function: dc_trends_fetch_and_update                                       *
593  *                                                                            *
594  * Purpose: helper function for DCflush trends                                *
595  *                                                                            *
596  ******************************************************************************/
dc_trends_fetch_and_update(ZBX_DC_TREND * trends,int trends_num,zbx_uint64_t * itemids,int itemids_num,int * inserts_num,unsigned char value_type,const char * table_name,int clock)597 static void	dc_trends_fetch_and_update(ZBX_DC_TREND *trends, int trends_num, zbx_uint64_t *itemids,
598 		int itemids_num, int *inserts_num, unsigned char value_type,
599 		const char *table_name, int clock)
600 {
601 
602 	int		i, num;
603 	DB_RESULT	result;
604 	DB_ROW		row;
605 	zbx_uint64_t	itemid;
606 	ZBX_DC_TREND	*trend;
607 	size_t		sql_offset;
608 
609 	sql_offset = 0;
610 	zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
611 			"select itemid,num,value_min,value_avg,value_max"
612 			" from %s"
613 			" where clock=%d and",
614 			table_name, clock);
615 
616 	DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids, itemids_num);
617 
618 	result = DBselect("%s order by itemid,clock", sql);
619 
620 	sql_offset = 0;
621 	DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
622 
623 	while (NULL != (row = DBfetch(result)))
624 	{
625 		ZBX_STR2UINT64(itemid, row[0]);
626 
627 		for (i = 0; i < trends_num; i++)
628 		{
629 			trend = &trends[i];
630 
631 			if (itemid != trend->itemid)
632 				continue;
633 
634 			if (clock != trend->clock || value_type != trend->value_type)
635 				continue;
636 
637 			break;
638 		}
639 
640 		if (i == trends_num)
641 		{
642 			THIS_SHOULD_NEVER_HAPPEN;
643 			continue;
644 		}
645 
646 		num = atoi(row[1]);
647 
648 		if (value_type == ITEM_VALUE_TYPE_FLOAT)
649 			dc_trends_update_float(trend, row, num, &sql_offset);
650 		else
651 			dc_trends_update_uint(trend, row, num, &sql_offset);
652 
653 		trend->itemid = 0;
654 
655 		--*inserts_num;
656 
657 		DBexecute_overflowed_sql(&sql, &sql_alloc, &sql_offset);
658 	}
659 
660 	DBfree_result(result);
661 
662 	DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
663 
664 	if (sql_offset > 16)	/* In ORACLE always present begin..end; */
665 		DBexecute("%s", sql);
666 }
667 
668 /******************************************************************************
669  *                                                                            *
670  * Function: DBflush_trends                                                   *
671  *                                                                            *
672  * Purpose: flush trend to the database                                       *
673  *                                                                            *
674  * Author: Alexander Vladishev                                                *
675  *                                                                            *
676  ******************************************************************************/
DBflush_trends(ZBX_DC_TREND * trends,int * trends_num,zbx_vector_uint64_pair_t * trends_diff)677 static void	DBflush_trends(ZBX_DC_TREND *trends, int *trends_num, zbx_vector_uint64_pair_t *trends_diff)
678 {
679 	int		num, i, clock, inserts_num = 0, itemids_alloc, itemids_num = 0, trends_to = *trends_num;
680 	unsigned char	value_type;
681 	zbx_uint64_t	*itemids = NULL;
682 	ZBX_DC_TREND	*trend = NULL;
683 	const char	*table_name;
684 
685 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() trends_num:%d", __func__, *trends_num);
686 
687 	clock = trends[0].clock;
688 	value_type = trends[0].value_type;
689 
690 	switch (value_type)
691 	{
692 		case ITEM_VALUE_TYPE_FLOAT:
693 			table_name = "trends";
694 			break;
695 		case ITEM_VALUE_TYPE_UINT64:
696 			table_name = "trends_uint";
697 			break;
698 		default:
699 			assert(0);
700 	}
701 
702 	itemids_alloc = MIN(ZBX_HC_SYNC_MAX, *trends_num);
703 	itemids = (zbx_uint64_t *)zbx_malloc(itemids, itemids_alloc * sizeof(zbx_uint64_t));
704 
705 	for (i = 0; i < *trends_num; i++)
706 	{
707 		trend = &trends[i];
708 
709 		if (clock != trend->clock || value_type != trend->value_type)
710 			continue;
711 
712 		inserts_num++;
713 
714 		if (0 != trend->disable_from)
715 			continue;
716 
717 		uint64_array_add(&itemids, &itemids_alloc, &itemids_num, trend->itemid, 64);
718 
719 		if (ZBX_HC_SYNC_MAX == itemids_num)
720 		{
721 			trends_to = i + 1;
722 			break;
723 		}
724 	}
725 
726 	if (0 != itemids_num)
727 	{
728 		dc_remove_updated_trends(trends, trends_to, table_name, value_type, itemids,
729 				&itemids_num, clock);
730 	}
731 
732 	for (i = 0; i < trends_to; i++)
733 	{
734 		trend = &trends[i];
735 
736 		if (clock != trend->clock || value_type != trend->value_type)
737 			continue;
738 
739 		if (0 != trend->disable_from && clock >= trend->disable_from)
740 			continue;
741 
742 		uint64_array_add(&itemids, &itemids_alloc, &itemids_num, trend->itemid, 64);
743 	}
744 
745 	if (0 != itemids_num)
746 	{
747 		dc_trends_fetch_and_update(trends, trends_to, itemids, itemids_num,
748 				&inserts_num, value_type, table_name, clock);
749 	}
750 
751 	zbx_free(itemids);
752 
753 	/* if 'trends' is not a primary trends buffer */
754 	if (NULL != trends_diff)
755 	{
756 		/* we update it too */
757 		for (i = 0; i < trends_to; i++)
758 		{
759 			zbx_uint64_pair_t	pair;
760 
761 			if (0 == trends[i].itemid)
762 				continue;
763 
764 			if (clock != trends[i].clock || value_type != trends[i].value_type)
765 				continue;
766 
767 			if (0 == trends[i].disable_from || trends[i].disable_from > clock)
768 				continue;
769 
770 			pair.first = trends[i].itemid;
771 			pair.second = clock + SEC_PER_HOUR;
772 			zbx_vector_uint64_pair_append(trends_diff, pair);
773 		}
774 	}
775 
776 	if (0 != inserts_num)
777 		dc_insert_trends_in_db(trends, trends_to, value_type, table_name, clock);
778 
779 	/* clean trends */
780 	for (i = 0, num = 0; i < *trends_num; i++)
781 	{
782 		if (0 == trends[i].itemid)
783 			continue;
784 
785 		memcpy(&trends[num++], &trends[i], sizeof(ZBX_DC_TREND));
786 	}
787 	*trends_num = num;
788 
789 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
790 }
791 
792 /******************************************************************************
793  *                                                                            *
794  * Function: DCflush_trend                                                    *
795  *                                                                            *
796  * Purpose: move trend to the array of trends for flushing to DB              *
797  *                                                                            *
798  * Author: Alexander Vladishev                                                *
799  *                                                                            *
800  ******************************************************************************/
DCflush_trend(ZBX_DC_TREND * trend,ZBX_DC_TREND ** trends,int * trends_alloc,int * trends_num)801 static void	DCflush_trend(ZBX_DC_TREND *trend, ZBX_DC_TREND **trends, int *trends_alloc, int *trends_num)
802 {
803 	if (*trends_num == *trends_alloc)
804 	{
805 		*trends_alloc += 256;
806 		*trends = (ZBX_DC_TREND *)zbx_realloc(*trends, *trends_alloc * sizeof(ZBX_DC_TREND));
807 	}
808 
809 	memcpy(&(*trends)[*trends_num], trend, sizeof(ZBX_DC_TREND));
810 	(*trends_num)++;
811 
812 	trend->clock = 0;
813 	trend->num = 0;
814 	memset(&trend->value_min, 0, sizeof(history_value_t));
815 	memset(&trend->value_avg, 0, sizeof(value_avg_t));
816 	memset(&trend->value_max, 0, sizeof(history_value_t));
817 }
818 
819 /******************************************************************************
820  *                                                                            *
821  * Function: DCadd_trend                                                      *
822  *                                                                            *
823  * Purpose: add new value to the trends                                       *
824  *                                                                            *
825  * Author: Alexander Vladishev                                                *
826  *                                                                            *
827  ******************************************************************************/
DCadd_trend(const ZBX_DC_HISTORY * history,ZBX_DC_TREND ** trends,int * trends_alloc,int * trends_num)828 static void	DCadd_trend(const ZBX_DC_HISTORY *history, ZBX_DC_TREND **trends, int *trends_alloc, int *trends_num)
829 {
830 	ZBX_DC_TREND	*trend = NULL;
831 	int		hour;
832 
833 	hour = history->ts.sec - history->ts.sec % SEC_PER_HOUR;
834 
835 	trend = DCget_trend(history->itemid);
836 
837 	if (trend->num > 0 && (trend->clock != hour || trend->value_type != history->value_type) &&
838 			SUCCEED == zbx_history_requires_trends(trend->value_type))
839 	{
840 		DCflush_trend(trend, trends, trends_alloc, trends_num);
841 	}
842 
843 	trend->value_type = history->value_type;
844 	trend->clock = hour;
845 
846 	switch (trend->value_type)
847 	{
848 		case ITEM_VALUE_TYPE_FLOAT:
849 			if (trend->num == 0 || history->value.dbl < trend->value_min.dbl)
850 				trend->value_min.dbl = history->value.dbl;
851 			if (trend->num == 0 || history->value.dbl > trend->value_max.dbl)
852 				trend->value_max.dbl = history->value.dbl;
853 			trend->value_avg.dbl += history->value.dbl / (trend->num + 1) -
854 					trend->value_avg.dbl / (trend->num + 1);
855 			break;
856 		case ITEM_VALUE_TYPE_UINT64:
857 			if (trend->num == 0 || history->value.ui64 < trend->value_min.ui64)
858 				trend->value_min.ui64 = history->value.ui64;
859 			if (trend->num == 0 || history->value.ui64 > trend->value_max.ui64)
860 				trend->value_max.ui64 = history->value.ui64;
861 			uinc128_64(&trend->value_avg.ui64, history->value.ui64);
862 			break;
863 	}
864 	trend->num++;
865 }
866 
867 /******************************************************************************
868  *                                                                            *
869  * Function: DCmass_update_trends                                             *
870  *                                                                            *
871  * Purpose: update trends cache and get list of trends to flush into database *
872  *                                                                            *
873  * Parameters: history         - [IN]  array of history data                  *
874  *             history_num     - [IN]  number of history structures           *
875  *             trends          - [OUT] list of trends to flush into database  *
876  *             trends_num      - [OUT] number of trends                       *
877  *             compression_age - [IN]  history compression age                *
878  *                                                                            *
879  * Author: Alexander Vladishev                                                *
880  *                                                                            *
881  ******************************************************************************/
DCmass_update_trends(const ZBX_DC_HISTORY * history,int history_num,ZBX_DC_TREND ** trends,int * trends_num,int compression_age)882 static void	DCmass_update_trends(const ZBX_DC_HISTORY *history, int history_num, ZBX_DC_TREND **trends,
883 		int *trends_num, int compression_age)
884 {
885 	static int	last_trend_discard = 0;
886 	zbx_timespec_t	ts;
887 	int		trends_alloc = 0, i, hour, seconds;
888 
889 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
890 
891 	zbx_timespec(&ts);
892 	seconds = ts.sec % SEC_PER_HOUR;
893 	hour = ts.sec - seconds;
894 
895 	LOCK_TRENDS;
896 
897 	for (i = 0; i < history_num; i++)
898 	{
899 		const ZBX_DC_HISTORY	*h = &history[i];
900 
901 		if (0 != (ZBX_DC_FLAGS_NOT_FOR_TRENDS & h->flags))
902 			continue;
903 
904 		DCadd_trend(h, trends, &trends_alloc, trends_num);
905 	}
906 
907 	if (cache->trends_last_cleanup_hour < hour && ZBX_TRENDS_CLEANUP_TIME < seconds)
908 	{
909 		zbx_hashset_iter_t	iter;
910 		ZBX_DC_TREND		*trend;
911 
912 		zbx_hashset_iter_reset(&cache->trends, &iter);
913 
914 		while (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_iter_next(&iter)))
915 		{
916 			if (trend->clock == hour)
917 				continue;
918 
919 			/* discard trend items that are older than compression age */
920 			if (0 != compression_age && trend->clock < compression_age)
921 			{
922 				if (SEC_PER_HOUR < (ts.sec - last_trend_discard)) /* log once per hour */
923 				{
924 					zabbix_log(LOG_LEVEL_TRACE, "discarding trends that are pointing to"
925 							" compressed history period");
926 					last_trend_discard = ts.sec;
927 				}
928 			}
929 			else if (SUCCEED == zbx_history_requires_trends(trend->value_type))
930 				DCflush_trend(trend, trends, &trends_alloc, trends_num);
931 
932 			zbx_hashset_iter_remove(&iter);
933 		}
934 
935 		cache->trends_last_cleanup_hour = hour;
936 	}
937 
938 	UNLOCK_TRENDS;
939 
940 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
941 }
942 
zbx_trend_compare(const void * d1,const void * d2)943 static int	zbx_trend_compare(const void *d1, const void *d2)
944 {
945 	const ZBX_DC_TREND	*p1 = (const ZBX_DC_TREND *)d1;
946 	const ZBX_DC_TREND	*p2 = (const ZBX_DC_TREND *)d2;
947 
948 	ZBX_RETURN_IF_NOT_EQUAL(p1->itemid, p2->itemid);
949 	ZBX_RETURN_IF_NOT_EQUAL(p1->clock, p2->clock);
950 
951 	return 0;
952 }
953 
954 /******************************************************************************
955  *                                                                            *
956  * Function: DBmass_update_trends                                             *
957  *                                                                            *
958  * Purpose: prepare history data using items from configuration cache         *
959  *                                                                            *
960  * Parameters: trends      - [IN] trends from cache to be added to database   *
961  *             trends_num  - [IN] number of trends to add to database         *
962  *             trends_diff - [OUT] disable_from updates                       *
963  *                                                                            *
964  ******************************************************************************/
DBmass_update_trends(const ZBX_DC_TREND * trends,int trends_num,zbx_vector_uint64_pair_t * trends_diff)965 static void	DBmass_update_trends(const ZBX_DC_TREND *trends, int trends_num,
966 		zbx_vector_uint64_pair_t *trends_diff)
967 {
968 	ZBX_DC_TREND	*trends_tmp;
969 
970 	if (0 != trends_num)
971 	{
972 		trends_tmp = (ZBX_DC_TREND *)zbx_malloc(NULL, trends_num * sizeof(ZBX_DC_TREND));
973 		memcpy(trends_tmp, trends, trends_num * sizeof(ZBX_DC_TREND));
974 		qsort(trends_tmp, trends_num, sizeof(ZBX_DC_TREND), zbx_trend_compare);
975 
976 		while (0 < trends_num)
977 			DBflush_trends(trends_tmp, &trends_num, trends_diff);
978 
979 		zbx_free(trends_tmp);
980 	}
981 }
982 
/* host data gathered for export, stored in a hashset keyed by hostid */
typedef struct
{
	zbx_uint64_t		hostid;	/* host identifier (hashset key) */
	zbx_vector_ptr_t	groups;	/* names of the host groups of the host, as allocated strings */
}
zbx_host_info_t;
989 
990 /******************************************************************************
991  *                                                                            *
992  * Function: zbx_host_info_clean                                              *
993  *                                                                            *
994  * Purpose: frees resources allocated to store host groups names              *
995  *                                                                            *
996  * Parameters: host_info - [IN] host information                              *
997  *                                                                            *
998  ******************************************************************************/
zbx_host_info_clean(zbx_host_info_t * host_info)999 static void	zbx_host_info_clean(zbx_host_info_t *host_info)
1000 {
1001 	zbx_vector_ptr_clear_ext(&host_info->groups, zbx_ptr_free);
1002 	zbx_vector_ptr_destroy(&host_info->groups);
1003 }
1004 
1005 /******************************************************************************
1006  *                                                                            *
1007  * Function: db_get_hosts_info_by_hostid                                      *
1008  *                                                                            *
1009  * Purpose: get hosts groups names                                            *
1010  *                                                                            *
1011  * Parameters: hosts_info - [IN/OUT] output names of host groups for a host   *
1012  *             hostids    - [IN] hosts identifiers                            *
1013  *                                                                            *
1014  ******************************************************************************/
db_get_hosts_info_by_hostid(zbx_hashset_t * hosts_info,const zbx_vector_uint64_t * hostids)1015 static void	db_get_hosts_info_by_hostid(zbx_hashset_t *hosts_info, const zbx_vector_uint64_t *hostids)
1016 {
1017 	int		i;
1018 	size_t		sql_offset = 0;
1019 	DB_RESULT	result;
1020 	DB_ROW		row;
1021 
1022 	for (i = 0; i < hostids->values_num; i++)
1023 	{
1024 		zbx_host_info_t	host_info = {.hostid = hostids->values[i]};
1025 
1026 		zbx_vector_ptr_create(&host_info.groups);
1027 		zbx_hashset_insert(hosts_info, &host_info, sizeof(host_info));
1028 	}
1029 
1030 	zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
1031 				"select distinct hg.hostid,g.name"
1032 				" from hstgrp g,hosts_groups hg"
1033 				" where g.groupid=hg.groupid"
1034 					" and");
1035 
1036 	DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "hg.hostid", hostids->values, hostids->values_num);
1037 
1038 	result = DBselect("%s", sql);
1039 
1040 	while (NULL != (row = DBfetch(result)))
1041 	{
1042 		zbx_uint64_t	hostid;
1043 		zbx_host_info_t	*host_info;
1044 
1045 		ZBX_DBROW2UINT64(hostid, row[0]);
1046 
1047 		if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &hostid)))
1048 		{
1049 			THIS_SHOULD_NEVER_HAPPEN;
1050 			continue;
1051 		}
1052 
1053 		zbx_vector_ptr_append(&host_info->groups, zbx_strdup(NULL, row[1]));
1054 	}
1055 	DBfree_result(result);
1056 }
1057 
/* item data gathered for export, stored in a hashset keyed by itemid */
typedef struct
{
	zbx_uint64_t		itemid;		/* item identifier (hashset key) */
	char			*name;		/* item name set by db_get_items_info_by_itemid(), NULL until then */
	DC_ITEM			*item;		/* the item this entry describes; not freed by zbx_item_info_clean() */
	zbx_vector_item_tag_t	item_tags;	/* item tags (tag and value pairs) read from item_tag table */
}
zbx_item_info_t;
1066 
1067 /******************************************************************************
1068  *                                                                            *
1069  * Function: db_get_items_info_by_itemid                                      *
1070  *                                                                            *
1071  * Purpose: get items name and item tags                                      *
1072  *                                                                            *
1073  * Parameters: items_info - [IN/OUT] output item name and item tags           *
1074  *             itemids    - [IN] the item identifiers                         *
1075  *                                                                            *
1076  ******************************************************************************/
db_get_items_info_by_itemid(zbx_hashset_t * items_info,const zbx_vector_uint64_t * itemids)1077 static void	db_get_items_info_by_itemid(zbx_hashset_t *items_info, const zbx_vector_uint64_t *itemids)
1078 {
1079 	size_t		sql_offset = 0;
1080 	DB_RESULT	result;
1081 	DB_ROW		row;
1082 
1083 	zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select itemid,name from items where");
1084 	DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids->values, itemids->values_num);
1085 
1086 	result = DBselect("%s", sql);
1087 
1088 	while (NULL != (row = DBfetch(result)))
1089 	{
1090 		zbx_uint64_t	itemid;
1091 		zbx_item_info_t	*item_info;
1092 
1093 		ZBX_DBROW2UINT64(itemid, row[0]);
1094 
1095 		if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &itemid)))
1096 		{
1097 			THIS_SHOULD_NEVER_HAPPEN;
1098 			continue;
1099 		}
1100 
1101 		zbx_substitute_item_name_macros(item_info->item, row[1], &item_info->name);
1102 	}
1103 	DBfree_result(result);
1104 
1105 	sql_offset = 0;
1106 	zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
1107 			"select itemid,tag,value from item_tag where");
1108 
1109 	DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids->values, itemids->values_num);
1110 
1111 	result = DBselect("%s", sql);
1112 
1113 	while (NULL != (row = DBfetch(result)))
1114 	{
1115 		zbx_uint64_t	itemid;
1116 		zbx_item_info_t	*item_info;
1117 		zbx_tag_t	item_tag;
1118 
1119 		ZBX_DBROW2UINT64(itemid, row[0]);
1120 
1121 		if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &itemid)))
1122 		{
1123 			THIS_SHOULD_NEVER_HAPPEN;
1124 			continue;
1125 		}
1126 
1127 		item_tag.tag = zbx_strdup(NULL, row[1]);
1128 		item_tag.value = zbx_strdup(NULL, row[2]);
1129 		zbx_vector_item_tag_append(&item_info->item_tags, item_tag);
1130 	}
1131 	DBfree_result(result);
1132 }
1133 
1134 /******************************************************************************
1135  *                                                                            *
1136  * Function: item_tag_free                                                    *
1137  *                                                                            *
1138  * Purpose: frees resources allocated to store item tag                       *
1139  *                                                                            *
1140  * Parameters: item_tag - [IN] item tag                                       *
1141  *                                                                            *
1142  ******************************************************************************/
item_tag_free(zbx_tag_t item_tag)1143 static void	item_tag_free(zbx_tag_t item_tag)
1144 {
1145 	zbx_free(item_tag.tag);
1146 	zbx_free(item_tag.value);
1147 }
1148 
1149 /******************************************************************************
1150  *                                                                            *
1151  * Function: zbx_item_info_clean                                              *
1152  *                                                                            *
1153  * Purpose: frees resources allocated to store item tags and name             *
1154  *                                                                            *
1155  * Parameters: item_info - [IN] item information                              *
1156  *                                                                            *
1157  ******************************************************************************/
zbx_item_info_clean(zbx_item_info_t * item_info)1158 static void	zbx_item_info_clean(zbx_item_info_t *item_info)
1159 {
1160 	zbx_vector_item_tag_clear_ext(&item_info->item_tags, item_tag_free);
1161 	zbx_vector_item_tag_destroy(&item_info->item_tags);
1162 	zbx_free(item_info->name);
1163 }
1164 
1165 /******************************************************************************
1166  *                                                                            *
1167  * Function: DCexport_trends                                                  *
1168  *                                                                            *
1169  * Purpose: export trends                                                     *
1170  *                                                                            *
1171  * Parameters: trends     - [IN] trends from cache                            *
1172  *             trends_num - [IN] number of trends                             *
1173  *             hosts_info - [IN] hosts groups names                           *
1174  *             items_info - [IN] item names and tags                          *
1175  *                                                                            *
1176  ******************************************************************************/
DCexport_trends(const ZBX_DC_TREND * trends,int trends_num,zbx_hashset_t * hosts_info,zbx_hashset_t * items_info)1177 static void	DCexport_trends(const ZBX_DC_TREND *trends, int trends_num, zbx_hashset_t *hosts_info,
1178 		zbx_hashset_t *items_info)
1179 {
1180 	struct zbx_json		json;
1181 	const ZBX_DC_TREND	*trend = NULL;
1182 	int			i, j;
1183 	const DC_ITEM		*item;
1184 	zbx_host_info_t		*host_info;
1185 	zbx_item_info_t		*item_info;
1186 	zbx_uint128_t		avg;	/* calculate the trend average value */
1187 
1188 	zbx_json_init(&json, ZBX_JSON_STAT_BUF_LEN);
1189 
1190 	for (i = 0; i < trends_num; i++)
1191 	{
1192 		trend = &trends[i];
1193 
1194 		if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &trend->itemid)))
1195 			continue;
1196 
1197 		item = item_info->item;
1198 
1199 		if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &item->host.hostid)))
1200 		{
1201 			THIS_SHOULD_NEVER_HAPPEN;
1202 			continue;
1203 		}
1204 
1205 		zbx_json_clean(&json);
1206 
1207 		zbx_json_addobject(&json,ZBX_PROTO_TAG_HOST);
1208 		zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST, item->host.host, ZBX_JSON_TYPE_STRING);
1209 		zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item->host.name, ZBX_JSON_TYPE_STRING);
1210 		zbx_json_close(&json);
1211 
1212 		zbx_json_addarray(&json, ZBX_PROTO_TAG_GROUPS);
1213 
1214 		for (j = 0; j < host_info->groups.values_num; j++)
1215 			zbx_json_addstring(&json, NULL, host_info->groups.values[j], ZBX_JSON_TYPE_STRING);
1216 
1217 		zbx_json_close(&json);
1218 
1219 		zbx_json_addarray(&json, ZBX_PROTO_TAG_ITEM_TAGS);
1220 
1221 		for (j = 0; j < item_info->item_tags.values_num; j++)
1222 		{
1223 			zbx_tag_t	item_tag = item_info->item_tags.values[j];
1224 
1225 			zbx_json_addobject(&json, NULL);
1226 			zbx_json_addstring(&json, ZBX_PROTO_TAG_TAG, item_tag.tag, ZBX_JSON_TYPE_STRING);
1227 			zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, item_tag.value, ZBX_JSON_TYPE_STRING);
1228 			zbx_json_close(&json);
1229 		}
1230 
1231 		zbx_json_close(&json);
1232 		zbx_json_adduint64(&json, ZBX_PROTO_TAG_ITEMID, item->itemid);
1233 
1234 		if (NULL != item_info->name)
1235 			zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item_info->name, ZBX_JSON_TYPE_STRING);
1236 
1237 		zbx_json_addint64(&json, ZBX_PROTO_TAG_CLOCK, trend->clock);
1238 		zbx_json_addint64(&json, ZBX_PROTO_TAG_COUNT, trend->num);
1239 
1240 		switch (trend->value_type)
1241 		{
1242 			case ITEM_VALUE_TYPE_FLOAT:
1243 				zbx_json_addfloat(&json, ZBX_PROTO_TAG_MIN, trend->value_min.dbl);
1244 				zbx_json_addfloat(&json, ZBX_PROTO_TAG_AVG, trend->value_avg.dbl);
1245 				zbx_json_addfloat(&json, ZBX_PROTO_TAG_MAX, trend->value_max.dbl);
1246 				break;
1247 			case ITEM_VALUE_TYPE_UINT64:
1248 				zbx_json_adduint64(&json, ZBX_PROTO_TAG_MIN, trend->value_min.ui64);
1249 				udiv128_64(&avg, &trend->value_avg.ui64, trend->num);
1250 				zbx_json_adduint64(&json, ZBX_PROTO_TAG_AVG, avg.lo);
1251 				zbx_json_adduint64(&json, ZBX_PROTO_TAG_MAX, trend->value_max.ui64);
1252 				break;
1253 			default:
1254 				THIS_SHOULD_NEVER_HAPPEN;
1255 		}
1256 
1257 		zbx_json_adduint64(&json, ZBX_PROTO_TAG_TYPE, trend->value_type);
1258 		zbx_trends_export_write(json.buffer, json.buffer_size);
1259 	}
1260 
1261 	zbx_trends_export_flush();
1262 	zbx_json_free(&json);
1263 }
1264 
1265 /******************************************************************************
1266  *                                                                            *
1267  * Function: DCexport_history                                                 *
1268  *                                                                            *
1269  * Purpose: export history                                                    *
1270  *                                                                            *
1271  * Parameters: history     - [IN/OUT] array of history data                   *
1272  *             history_num - [IN] number of history structures                *
1273  *             hosts_info  - [IN] hosts groups names                          *
1274  *             items_info  - [IN] item names and tags                         *
1275  *                                                                            *
1276  ******************************************************************************/
DCexport_history(const ZBX_DC_HISTORY * history,int history_num,zbx_hashset_t * hosts_info,zbx_hashset_t * items_info)1277 static void	DCexport_history(const ZBX_DC_HISTORY *history, int history_num, zbx_hashset_t *hosts_info,
1278 		zbx_hashset_t *items_info)
1279 {
1280 	const ZBX_DC_HISTORY	*h;
1281 	const DC_ITEM		*item;
1282 	int			i, j;
1283 	zbx_host_info_t		*host_info;
1284 	zbx_item_info_t		*item_info;
1285 	struct zbx_json		json;
1286 
1287 	zbx_json_init(&json, ZBX_JSON_STAT_BUF_LEN);
1288 
1289 	for (i = 0; i < history_num; i++)
1290 	{
1291 		h = &history[i];
1292 
1293 		if (0 != (ZBX_DC_FLAGS_NOT_FOR_MODULES & h->flags))
1294 			continue;
1295 
1296 		if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &h->itemid)))
1297 		{
1298 			THIS_SHOULD_NEVER_HAPPEN;
1299 			continue;
1300 		}
1301 
1302 		item = item_info->item;
1303 
1304 		if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &item->host.hostid)))
1305 		{
1306 			THIS_SHOULD_NEVER_HAPPEN;
1307 			continue;
1308 		}
1309 
1310 		zbx_json_clean(&json);
1311 
1312 		zbx_json_addobject(&json,ZBX_PROTO_TAG_HOST);
1313 		zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST, item->host.host, ZBX_JSON_TYPE_STRING);
1314 		zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item->host.name, ZBX_JSON_TYPE_STRING);
1315 		zbx_json_close(&json);
1316 
1317 		zbx_json_addarray(&json, ZBX_PROTO_TAG_GROUPS);
1318 
1319 		for (j = 0; j < host_info->groups.values_num; j++)
1320 			zbx_json_addstring(&json, NULL, host_info->groups.values[j], ZBX_JSON_TYPE_STRING);
1321 
1322 		zbx_json_close(&json);
1323 
1324 		zbx_json_addarray(&json, ZBX_PROTO_TAG_ITEM_TAGS);
1325 
1326 		for (j = 0; j < item_info->item_tags.values_num; j++)
1327 		{
1328 			zbx_tag_t	item_tag = item_info->item_tags.values[j];
1329 
1330 			zbx_json_addobject(&json, NULL);
1331 			zbx_json_addstring(&json, ZBX_PROTO_TAG_TAG, item_tag.tag, ZBX_JSON_TYPE_STRING);
1332 			zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, item_tag.value, ZBX_JSON_TYPE_STRING);
1333 			zbx_json_close(&json);
1334 		}
1335 
1336 		zbx_json_close(&json);
1337 		zbx_json_adduint64(&json, ZBX_PROTO_TAG_ITEMID, item->itemid);
1338 
1339 		if (NULL != item_info->name)
1340 			zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item_info->name, ZBX_JSON_TYPE_STRING);
1341 
1342 		zbx_json_addint64(&json, ZBX_PROTO_TAG_CLOCK, h->ts.sec);
1343 		zbx_json_addint64(&json, ZBX_PROTO_TAG_NS, h->ts.ns);
1344 
1345 		switch (h->value_type)
1346 		{
1347 			case ITEM_VALUE_TYPE_FLOAT:
1348 				zbx_json_addfloat(&json, ZBX_PROTO_TAG_VALUE, h->value.dbl);
1349 				break;
1350 			case ITEM_VALUE_TYPE_UINT64:
1351 				zbx_json_adduint64(&json, ZBX_PROTO_TAG_VALUE, h->value.ui64);
1352 				break;
1353 			case ITEM_VALUE_TYPE_STR:
1354 				zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.str, ZBX_JSON_TYPE_STRING);
1355 				break;
1356 			case ITEM_VALUE_TYPE_TEXT:
1357 				zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.str, ZBX_JSON_TYPE_STRING);
1358 				break;
1359 			case ITEM_VALUE_TYPE_LOG:
1360 				zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGTIMESTAMP, h->value.log->timestamp);
1361 				zbx_json_addstring(&json, ZBX_PROTO_TAG_LOGSOURCE,
1362 						ZBX_NULL2EMPTY_STR(h->value.log->source), ZBX_JSON_TYPE_STRING);
1363 				zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGSEVERITY, h->value.log->severity);
1364 				zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGEVENTID, h->value.log->logeventid);
1365 				zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.log->value,
1366 						ZBX_JSON_TYPE_STRING);
1367 				break;
1368 			default:
1369 				THIS_SHOULD_NEVER_HAPPEN;
1370 		}
1371 
1372 		zbx_json_adduint64(&json, ZBX_PROTO_TAG_TYPE, h->value_type);
1373 		zbx_history_export_write(json.buffer, json.buffer_size);
1374 	}
1375 
1376 	zbx_history_export_flush();
1377 	zbx_json_free(&json);
1378 }
1379 
1380 /******************************************************************************
1381  *                                                                            *
1382  * Function: DCexport_history_and_trends                                      *
1383  *                                                                            *
1384  * Purpose: export history and trends                                         *
1385  *                                                                            *
1386  * Parameters: history     - [IN/OUT] array of history data                   *
1387  *             history_num - [IN] number of history structures                *
1388  *             itemids     - [IN] the item identifiers                        *
1389  *                                (used for item lookup)                      *
1390  *             items       - [IN] the items                                   *
1391  *             errcodes    - [IN] item error codes                            *
1392  *             trends      - [IN] trends from cache                           *
1393  *             trends_num  - [IN] number of trends                            *
1394  *                                                                            *
1395  ******************************************************************************/
DCexport_history_and_trends(const ZBX_DC_HISTORY * history,int history_num,const zbx_vector_uint64_t * itemids,DC_ITEM * items,const int * errcodes,const ZBX_DC_TREND * trends,int trends_num)1396 static void	DCexport_history_and_trends(const ZBX_DC_HISTORY *history, int history_num,
1397 		const zbx_vector_uint64_t *itemids, DC_ITEM *items, const int *errcodes, const ZBX_DC_TREND *trends,
1398 		int trends_num)
1399 {
1400 	int			i, index;
1401 	zbx_vector_uint64_t	hostids, item_info_ids;
1402 	zbx_hashset_t		hosts_info, items_info;
1403 	DC_ITEM			*item;
1404 	zbx_item_info_t		item_info;
1405 
1406 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d trends_num:%d", __func__, history_num, trends_num);
1407 
1408 	zbx_vector_uint64_create(&hostids);
1409 	zbx_vector_uint64_create(&item_info_ids);
1410 	zbx_hashset_create_ext(&items_info, itemids->values_num, ZBX_DEFAULT_UINT64_HASH_FUNC,
1411 			ZBX_DEFAULT_UINT64_COMPARE_FUNC, (zbx_clean_func_t)zbx_item_info_clean,
1412 			ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC);
1413 
1414 	for (i = 0; i < history_num; i++)
1415 	{
1416 		const ZBX_DC_HISTORY	*h = &history[i];
1417 
1418 		if (0 != (ZBX_DC_FLAGS_NOT_FOR_EXPORT & h->flags))
1419 			continue;
1420 
1421 		if (FAIL == (index = zbx_vector_uint64_bsearch(itemids, h->itemid, ZBX_DEFAULT_UINT64_COMPARE_FUNC)))
1422 		{
1423 			THIS_SHOULD_NEVER_HAPPEN;
1424 			continue;
1425 		}
1426 
1427 		if (SUCCEED != errcodes[index])
1428 			continue;
1429 
1430 		item = &items[index];
1431 
1432 		zbx_vector_uint64_append(&hostids, item->host.hostid);
1433 		zbx_vector_uint64_append(&item_info_ids, item->itemid);
1434 
1435 		item_info.itemid = item->itemid;
1436 		item_info.name = NULL;
1437 		item_info.item = item;
1438 		zbx_vector_item_tag_create(&item_info.item_tags);
1439 		zbx_hashset_insert(&items_info, &item_info, sizeof(item_info));
1440 	}
1441 
1442 	if (0 == history_num)
1443 	{
1444 		for (i = 0; i < trends_num; i++)
1445 		{
1446 			const ZBX_DC_TREND	*trend = &trends[i];
1447 
1448 			if (FAIL == (index = zbx_vector_uint64_bsearch(itemids, trend->itemid,
1449 					ZBX_DEFAULT_UINT64_COMPARE_FUNC)))
1450 			{
1451 				THIS_SHOULD_NEVER_HAPPEN;
1452 				continue;
1453 			}
1454 
1455 			if (SUCCEED != errcodes[index])
1456 				continue;
1457 
1458 			item = &items[index];
1459 
1460 			zbx_vector_uint64_append(&hostids, item->host.hostid);
1461 			zbx_vector_uint64_append(&item_info_ids, item->itemid);
1462 
1463 			item_info.itemid = item->itemid;
1464 			item_info.name = NULL;
1465 			item_info.item = item;
1466 			zbx_vector_item_tag_create(&item_info.item_tags);
1467 			zbx_hashset_insert(&items_info, &item_info, sizeof(item_info));
1468 		}
1469 	}
1470 
1471 	if (0 == item_info_ids.values_num)
1472 		goto clean;
1473 
1474 	zbx_vector_uint64_sort(&item_info_ids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1475 	zbx_vector_uint64_sort(&hostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1476 	zbx_vector_uint64_uniq(&hostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1477 
1478 	zbx_hashset_create_ext(&hosts_info, hostids.values_num, ZBX_DEFAULT_UINT64_HASH_FUNC,
1479 			ZBX_DEFAULT_UINT64_COMPARE_FUNC, (zbx_clean_func_t)zbx_host_info_clean,
1480 			ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC);
1481 
1482 	db_get_hosts_info_by_hostid(&hosts_info, &hostids);
1483 
1484 	db_get_items_info_by_itemid(&items_info, &item_info_ids);
1485 
1486 	if (0 != history_num)
1487 		DCexport_history(history, history_num, &hosts_info, &items_info);
1488 
1489 	if (0 != trends_num)
1490 		DCexport_trends(trends, trends_num, &hosts_info, &items_info);
1491 
1492 	zbx_hashset_destroy(&hosts_info);
1493 clean:
1494 	zbx_hashset_destroy(&items_info);
1495 	zbx_vector_uint64_destroy(&item_info_ids);
1496 	zbx_vector_uint64_destroy(&hostids);
1497 
1498 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1499 }
1500 
1501 /******************************************************************************
1502  *                                                                            *
1503  * Function: DCexport_all_trends                                              *
1504  *                                                                            *
1505  * Purpose: export all trends                                                 *
1506  *                                                                            *
1507  * Parameters: trends     - [IN] trends from cache                            *
1508  *             trends_num - [IN] number of trends                             *
1509  *                                                                            *
1510  ******************************************************************************/
DCexport_all_trends(const ZBX_DC_TREND * trends,int trends_num)1511 static void	DCexport_all_trends(const ZBX_DC_TREND *trends, int trends_num)
1512 {
1513 	DC_ITEM			*items;
1514 	zbx_vector_uint64_t	itemids;
1515 	int			*errcodes, i, num;
1516 
1517 	zabbix_log(LOG_LEVEL_WARNING, "exporting trend data...");
1518 
1519 	while (0 < trends_num)
1520 	{
1521 		num = MIN(ZBX_HC_SYNC_MAX, trends_num);
1522 
1523 		items = (DC_ITEM *)zbx_malloc(NULL, sizeof(DC_ITEM) * (size_t)num);
1524 		errcodes = (int *)zbx_malloc(NULL, sizeof(int) * (size_t)num);
1525 
1526 		zbx_vector_uint64_create(&itemids);
1527 		zbx_vector_uint64_reserve(&itemids, num);
1528 
1529 		for (i = 0; i < num; i++)
1530 			zbx_vector_uint64_append(&itemids, trends[i].itemid);
1531 
1532 		zbx_vector_uint64_sort(&itemids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1533 
1534 		DCconfig_get_items_by_itemids(items, itemids.values, errcodes, num);
1535 
1536 		DCexport_history_and_trends(NULL, 0, &itemids, items, errcodes, trends, num);
1537 
1538 		DCconfig_clean_items(items, errcodes, num);
1539 		zbx_vector_uint64_destroy(&itemids);
1540 		zbx_free(items);
1541 		zbx_free(errcodes);
1542 
1543 		trends += num;
1544 		trends_num -= num;
1545 	}
1546 
1547 	zabbix_log(LOG_LEVEL_WARNING, "exporting trend data done");
1548 }
1549 
1550 /******************************************************************************
1551  *                                                                            *
1552  * Function: DCsync_trends                                                    *
1553  *                                                                            *
1554  * Purpose: flush all trends to the database                                  *
1555  *                                                                            *
1556  * Author: Alexander Vladishev                                                *
1557  *                                                                            *
1558  ******************************************************************************/
DCsync_trends(void)1559 static void	DCsync_trends(void)
1560 {
1561 	zbx_hashset_iter_t	iter;
1562 	ZBX_DC_TREND		*trends = NULL, *trend;
1563 	int			trends_alloc = 0, trends_num = 0, compression_age;
1564 
1565 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() trends_num:%d", __func__, cache->trends_num);
1566 
1567 	compression_age = hc_get_history_compression_age();
1568 
1569 	zabbix_log(LOG_LEVEL_WARNING, "syncing trend data...");
1570 
1571 	LOCK_TRENDS;
1572 
1573 	zbx_hashset_iter_reset(&cache->trends, &iter);
1574 
1575 	while (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_iter_next(&iter)))
1576 	{
1577 		if (SUCCEED == zbx_history_requires_trends(trend->value_type) && trend->clock >= compression_age)
1578 			DCflush_trend(trend, &trends, &trends_alloc, &trends_num);
1579 	}
1580 
1581 	UNLOCK_TRENDS;
1582 
1583 	if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_TRENDS) && 0 != trends_num)
1584 		DCexport_all_trends(trends, trends_num);
1585 
1586 	if (0 < trends_num)
1587 		qsort(trends, trends_num, sizeof(ZBX_DC_TREND), zbx_trend_compare);
1588 
1589 	DBbegin();
1590 
1591 	while (trends_num > 0)
1592 		DBflush_trends(trends, &trends_num, NULL);
1593 
1594 	DBcommit();
1595 
1596 	zbx_free(trends);
1597 
1598 	zabbix_log(LOG_LEVEL_WARNING, "syncing trend data done");
1599 
1600 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1601 }
1602 
1603 /******************************************************************************
1604  *                                                                            *
1605  * Function: recalculate_triggers                                             *
1606  *                                                                            *
1607  * Purpose: re-calculate and update values of triggers related to the items   *
1608  *                                                                            *
1609  * Parameters: history           - [IN] array of history data                 *
1610  *             history_num       - [IN] number of history structures          *
1611  *             history_itemids   - [IN] the item identifiers                  *
1612  *                                      (used for item lookup)                *
1613  *             history_items     - [IN] the items                             *
1614  *             history_errcodes  - [IN] item error codes                      *
1615  *             timers            - [IN] the trigger timers                    *
1616  *             trigger_diff      - [OUT] trigger updates                      *
1617  *                                                                            *
1618  ******************************************************************************/
recalculate_triggers(const ZBX_DC_HISTORY * history,int history_num,const zbx_vector_uint64_t * history_itemids,const DC_ITEM * history_items,const int * history_errcodes,const zbx_vector_ptr_t * timers,zbx_vector_ptr_t * trigger_diff)1619 static void	recalculate_triggers(const ZBX_DC_HISTORY *history, int history_num,
1620 		const zbx_vector_uint64_t *history_itemids, const DC_ITEM *history_items, const int *history_errcodes,
1621 		const zbx_vector_ptr_t *timers, zbx_vector_ptr_t *trigger_diff)
1622 {
1623 	int			i, item_num = 0, timers_num = 0;
1624 	zbx_uint64_t		*itemids = NULL;
1625 	zbx_timespec_t		*timespecs = NULL;
1626 	zbx_hashset_t		trigger_info;
1627 	zbx_vector_ptr_t	trigger_order;
1628 	zbx_vector_ptr_t	trigger_items;
1629 
1630 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1631 
1632 	if (0 != history_num)
1633 	{
1634 		itemids = (zbx_uint64_t *)zbx_malloc(itemids, sizeof(zbx_uint64_t) * (size_t)history_num);
1635 		timespecs = (zbx_timespec_t *)zbx_malloc(timespecs, sizeof(zbx_timespec_t) * (size_t)history_num);
1636 
1637 		for (i = 0; i < history_num; i++)
1638 		{
1639 			const ZBX_DC_HISTORY	*h = &history[i];
1640 
1641 			if (0 != (ZBX_DC_FLAG_NOVALUE & h->flags))
1642 				continue;
1643 
1644 			itemids[item_num] = h->itemid;
1645 			timespecs[item_num] = h->ts;
1646 			item_num++;
1647 		}
1648 	}
1649 
1650 	for (i = 0; i < timers->values_num; i++)
1651 	{
1652 		zbx_trigger_timer_t	*timer = (zbx_trigger_timer_t *)timers->values[i];
1653 
1654 		if (0 != timer->lock)
1655 			timers_num++;
1656 	}
1657 
1658 	if (0 == item_num && 0 == timers_num)
1659 		goto out;
1660 
1661 	zbx_hashset_create(&trigger_info, MAX(100, 2 * item_num + timers_num),
1662 			ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1663 
1664 	zbx_vector_ptr_create(&trigger_order);
1665 	zbx_vector_ptr_reserve(&trigger_order, trigger_info.num_slots);
1666 
1667 	zbx_vector_ptr_create(&trigger_items);
1668 
1669 	if (0 != item_num)
1670 	{
1671 		DCconfig_get_triggers_by_itemids(&trigger_info, &trigger_order, itemids, timespecs, item_num);
1672 		prepare_triggers((DC_TRIGGER **)trigger_order.values, trigger_order.values_num);
1673 		zbx_determine_items_in_expressions(&trigger_order, itemids, item_num);
1674 	}
1675 
1676 	if (0 != timers_num)
1677 	{
1678 		int	offset = trigger_order.values_num;
1679 
1680 		zbx_dc_get_triggers_by_timers(&trigger_info, &trigger_order, timers);
1681 
1682 		if (offset != trigger_order.values_num)
1683 		{
1684 			prepare_triggers((DC_TRIGGER **)trigger_order.values + offset,
1685 					trigger_order.values_num - offset);
1686 		}
1687 	}
1688 
1689 	zbx_vector_ptr_sort(&trigger_order, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);
1690 	evaluate_expressions(&trigger_order, history_itemids, history_items, history_errcodes);
1691 	zbx_process_triggers(&trigger_order, trigger_diff);
1692 
1693 	DCfree_triggers(&trigger_order);
1694 
1695 	zbx_vector_ptr_destroy(&trigger_items);
1696 
1697 	zbx_hashset_destroy(&trigger_info);
1698 	zbx_vector_ptr_destroy(&trigger_order);
1699 out:
1700 	zbx_free(timespecs);
1701 	zbx_free(itemids);
1702 
1703 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1704 }
1705 
/******************************************************************************
 *                                                                            *
 * Function: DCinventory_value_add                                            *
 *                                                                            *
 * Purpose: convert a history value to an inventory field value and queue     *
 *          it for the host inventory update                                  *
 *                                                                            *
 * Parameters: inventory_values - [OUT] vector the new value is appended to   *
 *             item             - [IN] the item the value belongs to          *
 *             h                - [IN] the history value                      *
 *                                                                            *
 * Comments: nothing is added for not supported values, hosts that are not    *
 *           in automatic inventory mode, values flagged as undefined or      *
 *           empty, items without an inventory field and value types          *
 *           other than float, unsigned, string or text                       *
 *                                                                            *
 ******************************************************************************/
static void	DCinventory_value_add(zbx_vector_ptr_t *inventory_values, const DC_ITEM *item, ZBX_DC_HISTORY *h)
{
	char			text[MAX_BUFFER_LEN];
	const char		*field;
	zbx_inventory_value_t	*entry;

	if (ITEM_STATE_NOTSUPPORTED == h->state || HOST_INVENTORY_AUTOMATIC != item->host.inventory_mode)
		return;

	if (0 != ((ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_NOVALUE) & h->flags))
		return;

	if (NULL == (field = DBget_inventory_field(item->inventory_link)))
		return;

	/* render the value as text */
	if (ITEM_VALUE_TYPE_FLOAT == h->value_type)
	{
		zbx_print_double(text, sizeof(text), h->value.dbl);
	}
	else if (ITEM_VALUE_TYPE_UINT64 == h->value_type)
	{
		zbx_snprintf(text, sizeof(text), ZBX_FS_UI64, h->value.ui64);
	}
	else if (ITEM_VALUE_TYPE_STR == h->value_type || ITEM_VALUE_TYPE_TEXT == h->value_type)
	{
		strscpy(text, h->value.str);
	}
	else
		return;

	zbx_format_value(text, sizeof(text), item->valuemapid, item->units, h->value_type);

	entry = (zbx_inventory_value_t *)zbx_malloc(NULL, sizeof(zbx_inventory_value_t));

	entry->value = zbx_strdup(NULL, text);
	entry->field_name = field;
	entry->idx = item->inventory_link - 1;
	entry->hostid = item->host.hostid;

	zbx_vector_ptr_append(inventory_values, entry);
}
1751 
DCadd_update_inventory_sql(size_t * sql_offset,const zbx_vector_ptr_t * inventory_values)1752 static void	DCadd_update_inventory_sql(size_t *sql_offset, const zbx_vector_ptr_t *inventory_values)
1753 {
1754 	char	*value_esc;
1755 	int	i;
1756 
1757 	for (i = 0; i < inventory_values->values_num; i++)
1758 	{
1759 		const zbx_inventory_value_t	*inventory_value = (zbx_inventory_value_t *)inventory_values->values[i];
1760 
1761 		value_esc = DBdyn_escape_field("host_inventory", inventory_value->field_name, inventory_value->value);
1762 
1763 		zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset,
1764 				"update host_inventory set %s='%s' where hostid=" ZBX_FS_UI64 ";\n",
1765 				inventory_value->field_name, value_esc, inventory_value->hostid);
1766 
1767 		DBexecute_overflowed_sql(&sql, &sql_alloc, sql_offset);
1768 
1769 		zbx_free(value_esc);
1770 	}
1771 }
1772 
/******************************************************************************
 *                                                                            *
 * Function: DCinventory_value_free                                           *
 *                                                                            *
 * Purpose: free an inventory value together with its value string            *
 *                                                                            *
 * Parameters: inventory_value - [IN] the inventory value to free             *
 *                                                                            *
 * Comments: field_name is not freed here                                     *
 *                                                                            *
 ******************************************************************************/
static void	DCinventory_value_free(zbx_inventory_value_t *inventory_value)
{
	/* the string must go first - it is reached through the structure */
	zbx_free(inventory_value->value);

	zbx_free(inventory_value);
}
1778 
1779 /******************************************************************************
1780  *                                                                            *
1781  * Function: dc_history_clean_value                                           *
1782  *                                                                            *
1783  * Purpose: frees resources allocated to store str/text/log value             *
1784  *                                                                            *
1785  * Parameters: history     - [IN] the history data                            *
1786  *             history_num - [IN] the number of values in history data        *
1787  *                                                                            *
1788  ******************************************************************************/
dc_history_clean_value(ZBX_DC_HISTORY * history)1789 static void	dc_history_clean_value(ZBX_DC_HISTORY *history)
1790 {
1791 	if (ITEM_STATE_NOTSUPPORTED == history->state)
1792 	{
1793 		zbx_free(history->value.err);
1794 		return;
1795 	}
1796 
1797 	if (0 != (ZBX_DC_FLAG_NOVALUE & history->flags))
1798 		return;
1799 
1800 	switch (history->value_type)
1801 	{
1802 		case ITEM_VALUE_TYPE_LOG:
1803 			zbx_free(history->value.log->value);
1804 			zbx_free(history->value.log->source);
1805 			zbx_free(history->value.log);
1806 			break;
1807 		case ITEM_VALUE_TYPE_STR:
1808 		case ITEM_VALUE_TYPE_TEXT:
1809 			zbx_free(history->value.str);
1810 			break;
1811 	}
1812 }
1813 
1814 /******************************************************************************
1815  *                                                                            *
1816  * Function: hc_free_item_values                                              *
1817  *                                                                            *
1818  * Purpose: frees resources allocated to store str/text/log values            *
1819  *                                                                            *
1820  * Parameters: history     - [IN] the history data                            *
1821  *             history_num - [IN] the number of values in history data        *
1822  *                                                                            *
1823  ******************************************************************************/
hc_free_item_values(ZBX_DC_HISTORY * history,int history_num)1824 static void	hc_free_item_values(ZBX_DC_HISTORY *history, int history_num)
1825 {
1826 	int	i;
1827 
1828 	for (i = 0; i < history_num; i++)
1829 		dc_history_clean_value(&history[i]);
1830 }
1831 
1832 /******************************************************************************
1833  *                                                                            *
1834  * Function: dc_history_set_error                                             *
1835  *                                                                            *
1836  * Purpose: sets history data to notsupported                                 *
1837  *                                                                            *
1838  * Parameters: history  - [IN] the history data                               *
1839  *             errmsg   - [IN] the error message                              *
1840  *                                                                            *
1841  * Comments: The error message is stored directly and freed with when history *
1842  *           data is cleaned.                                                 *
1843  *                                                                            *
1844  ******************************************************************************/
dc_history_set_error(ZBX_DC_HISTORY * hdata,char * errmsg)1845 static void	dc_history_set_error(ZBX_DC_HISTORY *hdata, char *errmsg)
1846 {
1847 	dc_history_clean_value(hdata);
1848 	hdata->value.err = errmsg;
1849 	hdata->state = ITEM_STATE_NOTSUPPORTED;
1850 	hdata->flags |= ZBX_DC_FLAG_UNDEF;
1851 }
1852 
1853 /******************************************************************************
1854  *                                                                            *
1855  * Function: dc_history_set_value                                             *
1856  *                                                                            *
1857  * Purpose: sets history data value                                           *
1858  *                                                                            *
1859  * Parameters: hdata      - [IN/OUT] the history data                         *
1860  *             value_type - [IN] the item value type                          *
1861  *             value      - [IN] the value to set                             *
1862  *                                                                            *
1863  ******************************************************************************/
dc_history_set_value(ZBX_DC_HISTORY * hdata,unsigned char value_type,zbx_variant_t * value)1864 static void	dc_history_set_value(ZBX_DC_HISTORY *hdata, unsigned char value_type, zbx_variant_t *value)
1865 {
1866 	char	*errmsg = NULL;
1867 
1868 	if (FAIL == zbx_variant_to_value_type(value, value_type, CONFIG_DOUBLE_PRECISION, &errmsg))
1869 	{
1870 		dc_history_set_error(hdata, errmsg);
1871 		return;
1872 	}
1873 
1874 	switch (value_type)
1875 	{
1876 		case ITEM_VALUE_TYPE_FLOAT:
1877 			dc_history_clean_value(hdata);
1878 			hdata->value.dbl = value->data.dbl;
1879 			break;
1880 		case ITEM_VALUE_TYPE_UINT64:
1881 			dc_history_clean_value(hdata);
1882 			hdata->value.ui64 = value->data.ui64;
1883 			break;
1884 		case ITEM_VALUE_TYPE_STR:
1885 			dc_history_clean_value(hdata);
1886 			hdata->value.str = value->data.str;
1887 			hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_STR_VALUE_LEN)] = '\0';
1888 			break;
1889 		case ITEM_VALUE_TYPE_TEXT:
1890 			dc_history_clean_value(hdata);
1891 			hdata->value.str = value->data.str;
1892 			hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_TEXT_VALUE_LEN)] = '\0';
1893 			break;
1894 		case ITEM_VALUE_TYPE_LOG:
1895 			if (ITEM_VALUE_TYPE_LOG != hdata->value_type)
1896 			{
1897 				dc_history_clean_value(hdata);
1898 				hdata->value.log = (zbx_log_value_t *)zbx_malloc(NULL, sizeof(zbx_log_value_t));
1899 				memset(hdata->value.log, 0, sizeof(zbx_log_value_t));
1900 			}
1901 			hdata->value.log->value = value->data.str;
1902 			hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_LOG_VALUE_LEN)] = '\0';
1903 	}
1904 
1905 	hdata->value_type = value_type;
1906 	zbx_variant_set_none(value);
1907 }
1908 
1909 /******************************************************************************
1910  *                                                                            *
1911  * Function: normalize_item_value                                             *
1912  *                                                                            *
1913  * Purpose: normalize item value by performing truncation of long text        *
1914  *          values and changes value format according to the item value type  *
1915  *                                                                            *
1916  * Parameters: item          - [IN] the item                                  *
1917  *             hdata         - [IN/OUT] the historical data to process        *
1918  *                                                                            *
1919  ******************************************************************************/
normalize_item_value(const DC_ITEM * item,ZBX_DC_HISTORY * hdata)1920 static void	normalize_item_value(const DC_ITEM *item, ZBX_DC_HISTORY *hdata)
1921 {
1922 	char		*logvalue;
1923 	zbx_variant_t	value_var;
1924 
1925 	if (0 != (hdata->flags & ZBX_DC_FLAG_NOVALUE))
1926 		return;
1927 
1928 	if (ITEM_STATE_NOTSUPPORTED == hdata->state)
1929 		return;
1930 
1931 	if (0 == (hdata->flags & ZBX_DC_FLAG_NOHISTORY))
1932 		hdata->ttl = item->history_sec;
1933 
1934 	if (item->value_type == hdata->value_type)
1935 	{
1936 		/* truncate text based values if necessary */
1937 		switch (hdata->value_type)
1938 		{
1939 			case ITEM_VALUE_TYPE_STR:
1940 				hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_STR_VALUE_LEN)] = '\0';
1941 				break;
1942 			case ITEM_VALUE_TYPE_TEXT:
1943 				hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_TEXT_VALUE_LEN)] = '\0';
1944 				break;
1945 			case ITEM_VALUE_TYPE_LOG:
1946 				logvalue = hdata->value.log->value;
1947 				logvalue[zbx_db_strlen_n(logvalue, HISTORY_LOG_VALUE_LEN)] = '\0';
1948 				break;
1949 			case ITEM_VALUE_TYPE_FLOAT:
1950 				if (FAIL == zbx_validate_value_dbl(hdata->value.dbl, CONFIG_DOUBLE_PRECISION))
1951 				{
1952 					char	buffer[ZBX_MAX_DOUBLE_LEN + 1];
1953 
1954 					dc_history_set_error(hdata, zbx_dsprintf(NULL,
1955 							"Value %s is too small or too large.",
1956 							zbx_print_double(buffer, sizeof(buffer), hdata->value.dbl)));
1957 				}
1958 				break;
1959 		}
1960 		return;
1961 	}
1962 
1963 	switch (hdata->value_type)
1964 	{
1965 		case ITEM_VALUE_TYPE_FLOAT:
1966 			zbx_variant_set_dbl(&value_var, hdata->value.dbl);
1967 			break;
1968 		case ITEM_VALUE_TYPE_UINT64:
1969 			zbx_variant_set_ui64(&value_var, hdata->value.ui64);
1970 			break;
1971 		case ITEM_VALUE_TYPE_STR:
1972 		case ITEM_VALUE_TYPE_TEXT:
1973 			zbx_variant_set_str(&value_var, hdata->value.str);
1974 			hdata->value.str = NULL;
1975 			break;
1976 		case ITEM_VALUE_TYPE_LOG:
1977 			zbx_variant_set_str(&value_var, hdata->value.log->value);
1978 			hdata->value.log->value = NULL;
1979 			break;
1980 	}
1981 
1982 	dc_history_set_value(hdata, item->value_type, &value_var);
1983 	zbx_variant_clear(&value_var);
1984 }
1985 
1986 /******************************************************************************
1987  *                                                                            *
1988  * Function: calculate_item_update                                            *
1989  *                                                                            *
1990  * Purpose: calculates what item fields must be updated                       *
1991  *                                                                            *
1992  * Parameters: item      - [IN] the item                                      *
1993  *             h         - [IN] the historical data to process                *
1994  *                                                                            *
1995  * Return value: The update data. This data must be freed by the caller.      *
1996  *                                                                            *
1997  * Comments: Will generate internal events when item state switches.          *
1998  *                                                                            *
1999  ******************************************************************************/
calculate_item_update(DC_ITEM * item,const ZBX_DC_HISTORY * h)2000 static zbx_item_diff_t	*calculate_item_update(DC_ITEM *item, const ZBX_DC_HISTORY *h)
2001 {
2002 	zbx_uint64_t	flags = 0;
2003 	const char	*item_error = NULL;
2004 	zbx_item_diff_t	*diff;
2005 
2006 	if (0 != (ZBX_DC_FLAG_META & h->flags))
2007 	{
2008 		if (item->lastlogsize != h->lastlogsize)
2009 			flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE;
2010 
2011 		if (item->mtime != h->mtime)
2012 			flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME;
2013 	}
2014 
2015 	if (h->state != item->state)
2016 	{
2017 		flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE;
2018 
2019 		if (ITEM_STATE_NOTSUPPORTED == h->state)
2020 		{
2021 			zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" became not supported: %s",
2022 					item->host.host, item->key_orig, h->value.str);
2023 
2024 			zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_ITEM, item->itemid, &h->ts, h->state, NULL,
2025 					NULL, NULL, 0, 0, NULL, 0, NULL, 0, NULL, NULL, h->value.err);
2026 
2027 			if (0 != strcmp(item->error, h->value.err))
2028 				item_error = h->value.err;
2029 		}
2030 		else
2031 		{
2032 			zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" became supported",
2033 					item->host.host, item->key_orig);
2034 
2035 			/* we know it's EVENT_OBJECT_ITEM because LLDRULE that becomes */
2036 			/* supported is handled in lld_process_discovery_rule()        */
2037 			zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_ITEM, item->itemid, &h->ts, h->state,
2038 					NULL, NULL, NULL, 0, 0, NULL, 0, NULL, 0, NULL, NULL, NULL);
2039 
2040 			item_error = "";
2041 		}
2042 	}
2043 	else if (ITEM_STATE_NOTSUPPORTED == h->state && 0 != strcmp(item->error, h->value.err))
2044 	{
2045 		zabbix_log(LOG_LEVEL_WARNING, "error reason for \"%s:%s\" changed: %s", item->host.host,
2046 				item->key_orig, h->value.err);
2047 
2048 		item_error = h->value.err;
2049 	}
2050 
2051 	if (NULL != item_error)
2052 		flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_ERROR;
2053 
2054 	if (0 == flags)
2055 		return NULL;
2056 
2057 	diff = (zbx_item_diff_t *)zbx_malloc(NULL, sizeof(zbx_item_diff_t));
2058 	diff->itemid = item->itemid;
2059 	diff->flags = flags;
2060 
2061 	if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE & flags))
2062 		diff->lastlogsize = h->lastlogsize;
2063 
2064 	if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME & flags))
2065 		diff->mtime = h->mtime;
2066 
2067 	if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE & flags))
2068 	{
2069 		diff->state = h->state;
2070 		item->state = h->state;
2071 	}
2072 
2073 	if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_ERROR & flags))
2074 		diff->error = item_error;
2075 
2076 	return diff;
2077 }
2078 
2079 /******************************************************************************
2080  *                                                                            *
2081  * Function: DBmass_update_items                                              *
2082  *                                                                            *
2083  * Purpose: update item data and inventory in database                        *
2084  *                                                                            *
2085  * Parameters: item_diff        - item changes                                *
2086  *             inventory_values - inventory values                            *
2087  *                                                                            *
2088  ******************************************************************************/
DBmass_update_items(const zbx_vector_ptr_t * item_diff,const zbx_vector_ptr_t * inventory_values)2089 static void	DBmass_update_items(const zbx_vector_ptr_t *item_diff, const zbx_vector_ptr_t *inventory_values)
2090 {
2091 	size_t	sql_offset = 0;
2092 	int	i;
2093 
2094 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
2095 
2096 	for (i = 0; i < item_diff->values_num; i++)
2097 	{
2098 		zbx_item_diff_t	*diff;
2099 
2100 		diff = (zbx_item_diff_t *)item_diff->values[i];
2101 		if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_DB & diff->flags))
2102 			break;
2103 	}
2104 
2105 	if (i != item_diff->values_num || 0 != inventory_values->values_num)
2106 	{
2107 		DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
2108 
2109 		if (i != item_diff->values_num)
2110 		{
2111 			zbx_db_save_item_changes(&sql, &sql_alloc, &sql_offset, item_diff,
2112 					ZBX_FLAGS_ITEM_DIFF_UPDATE_DB);
2113 		}
2114 
2115 		if (0 != inventory_values->values_num)
2116 			DCadd_update_inventory_sql(&sql_offset, inventory_values);
2117 
2118 		DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
2119 
2120 		if (sql_offset > 16)	/* In ORACLE always present begin..end; */
2121 			DBexecute("%s", sql);
2122 
2123 		DCconfig_update_inventory_values(inventory_values);
2124 	}
2125 
2126 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
2127 }
2128 
2129 /******************************************************************************
2130  *                                                                            *
2131  * Function: DCmass_proxy_prepare_itemdiff                                    *
2132  *                                                                            *
2133  * Purpose: prepare itemdiff after receiving new values                       *
2134  *                                                                            *
2135  * Parameters: history     - array of history data                            *
2136  *             history_num - number of history structures                     *
2137  *             item_diff   - vector to store prepared diff                    *
2138  *                                                                            *
2139  ******************************************************************************/
DCmass_proxy_prepare_itemdiff(ZBX_DC_HISTORY * history,int history_num,zbx_vector_ptr_t * item_diff)2140 static void	DCmass_proxy_prepare_itemdiff(ZBX_DC_HISTORY *history, int history_num, zbx_vector_ptr_t *item_diff)
2141 {
2142 	int	i;
2143 
2144 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
2145 
2146 	zbx_vector_ptr_reserve(item_diff, history_num);
2147 
2148 	for (i = 0; i < history_num; i++)
2149 	{
2150 		zbx_item_diff_t	*diff = (zbx_item_diff_t *)zbx_malloc(NULL, sizeof(zbx_item_diff_t));
2151 
2152 		diff->itemid = history[i].itemid;
2153 		diff->state = history[i].state;
2154 		diff->flags = ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE;
2155 
2156 		if (0 != (ZBX_DC_FLAG_META & history[i].flags))
2157 		{
2158 			diff->lastlogsize = history[i].lastlogsize;
2159 			diff->mtime = history[i].mtime;
2160 			diff->flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE | ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME;
2161 		}
2162 
2163 		zbx_vector_ptr_append(item_diff, diff);
2164 	}
2165 
2166 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
2167 }
2168 
2169 /******************************************************************************
2170  *                                                                            *
2171  * Function: DCmass_proxy_update_items                                        *
2172  *                                                                            *
2173  * Purpose: update items info after new value is received                     *
2174  *                                                                            *
2175  * Parameters: item_diff - diff of items to be updated                        *
2176  *                                                                            *
2177  * Author: Alexei Vladishev, Eugene Grigorjev, Alexander Vladishev            *
2178  *                                                                            *
2179  ******************************************************************************/
DBmass_proxy_update_items(zbx_vector_ptr_t * item_diff)2180 static void	DBmass_proxy_update_items(zbx_vector_ptr_t *item_diff)
2181 {
2182 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
2183 
2184 	if (0 != item_diff->values_num)
2185 	{
2186 		size_t	sql_offset = 0;
2187 
2188 		zbx_vector_ptr_sort(item_diff, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);
2189 
2190 		DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
2191 
2192 		zbx_db_save_item_changes(&sql, &sql_alloc, &sql_offset, item_diff,
2193 				ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE | ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME);
2194 
2195 		DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
2196 
2197 		if (sql_offset > 16)	/* In ORACLE always present begin..end; */
2198 			DBexecute("%s", sql);
2199 	}
2200 
2201 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
2202 }
2203 
2204 /******************************************************************************
2205  *                                                                            *
2206  * Function: DBmass_add_history                                               *
2207  *                                                                            *
2208  * Purpose: inserting new history data after new value is received            *
2209  *                                                                            *
2210  * Parameters: history     - array of history data                            *
2211  *             history_num - number of history structures                     *
2212  *                                                                            *
2213  ******************************************************************************/
DBmass_add_history(ZBX_DC_HISTORY * history,int history_num)2214 static int	DBmass_add_history(ZBX_DC_HISTORY *history, int history_num)
2215 {
2216 	int			i, ret = SUCCEED;
2217 	zbx_vector_ptr_t	history_values;
2218 
2219 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
2220 
2221 	zbx_vector_ptr_create(&history_values);
2222 	zbx_vector_ptr_reserve(&history_values, history_num);
2223 
2224 	for (i = 0; i < history_num; i++)
2225 	{
2226 		ZBX_DC_HISTORY	*h = &history[i];
2227 
2228 		if (0 != (ZBX_DC_FLAGS_NOT_FOR_HISTORY & h->flags))
2229 			continue;
2230 
2231 		zbx_vector_ptr_append(&history_values, h);
2232 	}
2233 
2234 	if (0 != history_values.values_num)
2235 		ret = zbx_vc_add_values(&history_values);
2236 
2237 	zbx_vector_ptr_destroy(&history_values);
2238 
2239 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
2240 
2241 	return ret;
2242 }
2243 
2244 /******************************************************************************
2245  *                                                                            *
2246  * Function: dc_add_proxy_history                                             *
2247  *                                                                            *
2248  * Purpose: helper function for DCmass_proxy_add_history()                    *
2249  *                                                                            *
2250  * Comment: this function is meant for items with value_type other other than *
2251  *          ITEM_VALUE_TYPE_LOG not containing meta information in result     *
2252  *                                                                            *
2253  ******************************************************************************/
dc_add_proxy_history(ZBX_DC_HISTORY * history,int history_num)2254 static void	dc_add_proxy_history(ZBX_DC_HISTORY *history, int history_num)
2255 {
2256 	int		i, now;
2257 	unsigned int	flags;
2258 	char		buffer[64], *pvalue;
2259 	zbx_db_insert_t	db_insert;
2260 
2261 	now = (int)time(NULL);
2262 	zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "flags", "write_clock",
2263 			NULL);
2264 
2265 	for (i = 0; i < history_num; i++)
2266 	{
2267 		const ZBX_DC_HISTORY	*h = &history[i];
2268 
2269 		if (0 != (h->flags & ZBX_DC_FLAG_UNDEF))
2270 			continue;
2271 
2272 		if (0 != (h->flags & ZBX_DC_FLAG_META))
2273 			continue;
2274 
2275 		if (ITEM_STATE_NOTSUPPORTED == h->state)
2276 			continue;
2277 
2278 		if (0 == (h->flags & ZBX_DC_FLAG_NOVALUE))
2279 		{
2280 			switch (h->value_type)
2281 			{
2282 				case ITEM_VALUE_TYPE_FLOAT:
2283 					zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_DBL64, h->value.dbl);
2284 					break;
2285 				case ITEM_VALUE_TYPE_UINT64:
2286 					zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_UI64, h->value.ui64);
2287 					break;
2288 				case ITEM_VALUE_TYPE_STR:
2289 				case ITEM_VALUE_TYPE_TEXT:
2290 					pvalue = h->value.str;
2291 					break;
2292 				case ITEM_VALUE_TYPE_LOG:
2293 					continue;
2294 				default:
2295 					THIS_SHOULD_NEVER_HAPPEN;
2296 					continue;
2297 			}
2298 			flags = 0;
2299 		}
2300 		else
2301 		{
2302 			flags = PROXY_HISTORY_FLAG_NOVALUE;
2303 			pvalue = (char *)"";
2304 		}
2305 
2306 		zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, pvalue, flags, now);
2307 	}
2308 
2309 	zbx_db_insert_execute(&db_insert);
2310 	zbx_db_insert_clean(&db_insert);
2311 }
2312 
2313 /******************************************************************************
2314  *                                                                            *
2315  * Function: dc_add_proxy_history_meta                                        *
2316  *                                                                            *
2317  * Purpose: helper function for DCmass_proxy_add_history()                    *
2318  *                                                                            *
2319  * Comment: this function is meant for items with value_type other other than *
2320  *          ITEM_VALUE_TYPE_LOG containing meta information in result         *
2321  *                                                                            *
2322  ******************************************************************************/
dc_add_proxy_history_meta(ZBX_DC_HISTORY * history,int history_num)2323 static void	dc_add_proxy_history_meta(ZBX_DC_HISTORY *history, int history_num)
2324 {
2325 	int		i, now;
2326 	char		buffer[64], *pvalue;
2327 	zbx_db_insert_t	db_insert;
2328 
2329 	now = (int)time(NULL);
2330 	zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "lastlogsize", "mtime",
2331 			"flags", "write_clock", NULL);
2332 
2333 	for (i = 0; i < history_num; i++)
2334 	{
2335 		unsigned int		flags = PROXY_HISTORY_FLAG_META;
2336 		const ZBX_DC_HISTORY	*h = &history[i];
2337 
2338 		if (ITEM_STATE_NOTSUPPORTED == h->state)
2339 			continue;
2340 
2341 		if (0 != (h->flags & ZBX_DC_FLAG_UNDEF))
2342 			continue;
2343 
2344 		if (0 == (h->flags & ZBX_DC_FLAG_META))
2345 			continue;
2346 
2347 		if (ITEM_VALUE_TYPE_LOG == h->value_type)
2348 			continue;
2349 
2350 		if (0 == (h->flags & ZBX_DC_FLAG_NOVALUE))
2351 		{
2352 			switch (h->value_type)
2353 			{
2354 				case ITEM_VALUE_TYPE_FLOAT:
2355 					zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_DBL64, h->value.dbl);
2356 					break;
2357 				case ITEM_VALUE_TYPE_UINT64:
2358 					zbx_snprintf(pvalue = buffer, sizeof(buffer), ZBX_FS_UI64, h->value.ui64);
2359 					break;
2360 				case ITEM_VALUE_TYPE_STR:
2361 				case ITEM_VALUE_TYPE_TEXT:
2362 					pvalue = h->value.str;
2363 					break;
2364 				default:
2365 					THIS_SHOULD_NEVER_HAPPEN;
2366 					continue;
2367 			}
2368 		}
2369 		else
2370 		{
2371 			flags |= PROXY_HISTORY_FLAG_NOVALUE;
2372 			pvalue = (char *)"";
2373 		}
2374 
2375 		zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, pvalue, h->lastlogsize, h->mtime,
2376 				flags, now);
2377 	}
2378 
2379 	zbx_db_insert_execute(&db_insert);
2380 	zbx_db_insert_clean(&db_insert);
2381 }
2382 
2383 /******************************************************************************
2384  *                                                                            *
2385  * Function: dc_add_proxy_history_log                                         *
2386  *                                                                            *
2387  * Purpose: helper function for DCmass_proxy_add_history()                    *
2388  *                                                                            *
2389  * Comment: this function is meant for items with value_type                  *
2390  *          ITEM_VALUE_TYPE_LOG                                               *
2391  *                                                                            *
2392  ******************************************************************************/
dc_add_proxy_history_log(ZBX_DC_HISTORY * history,int history_num)2393 static void	dc_add_proxy_history_log(ZBX_DC_HISTORY *history, int history_num)
2394 {
2395 	int		i, now;
2396 	zbx_db_insert_t	db_insert;
2397 
2398 	now = (int)time(NULL);
2399 
2400 	/* see hc_copy_history_data() for fields that might be uninitialized and need special handling here */
2401 	zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "timestamp", "source", "severity",
2402 			"value", "logeventid", "lastlogsize", "mtime", "flags", "write_clock", NULL);
2403 
2404 	for (i = 0; i < history_num; i++)
2405 	{
2406 		unsigned int		flags;
2407 		zbx_uint64_t		lastlogsize;
2408 		int			mtime;
2409 		const ZBX_DC_HISTORY	*h = &history[i];
2410 
2411 		if (ITEM_STATE_NOTSUPPORTED == h->state)
2412 			continue;
2413 
2414 		if (ITEM_VALUE_TYPE_LOG != h->value_type)
2415 			continue;
2416 
2417 		if (0 == (h->flags & ZBX_DC_FLAG_NOVALUE))
2418 		{
2419 			zbx_log_value_t *log = h->value.log;
2420 
2421 			if (0 != (h->flags & ZBX_DC_FLAG_META))
2422 			{
2423 				flags = PROXY_HISTORY_FLAG_META;
2424 				lastlogsize = h->lastlogsize;
2425 				mtime = h->mtime;
2426 			}
2427 			else
2428 			{
2429 				flags = 0;
2430 				lastlogsize = 0;
2431 				mtime = 0;
2432 			}
2433 
2434 			zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, log->timestamp,
2435 					ZBX_NULL2EMPTY_STR(log->source), log->severity, log->value, log->logeventid,
2436 					lastlogsize, mtime, flags, now);
2437 		}
2438 		else
2439 		{
2440 			/* sent to server only if not 0, see proxy_get_history_data() */
2441 			const int	unset_if_novalue = 0;
2442 
2443 			flags = PROXY_HISTORY_FLAG_META | PROXY_HISTORY_FLAG_NOVALUE;
2444 
2445 			zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, unset_if_novalue, "",
2446 					unset_if_novalue, "", unset_if_novalue, h->lastlogsize, h->mtime, flags, now);
2447 		}
2448 	}
2449 
2450 	zbx_db_insert_execute(&db_insert);
2451 	zbx_db_insert_clean(&db_insert);
2452 }
2453 
2454 /******************************************************************************
2455  *                                                                            *
2456  * Function: dc_add_proxy_history_notsupported                                *
2457  *                                                                            *
2458  * Purpose: helper function for DCmass_proxy_add_history()                    *
2459  *                                                                            *
2460  ******************************************************************************/
dc_add_proxy_history_notsupported(ZBX_DC_HISTORY * history,int history_num)2461 static void	dc_add_proxy_history_notsupported(ZBX_DC_HISTORY *history, int history_num)
2462 {
2463 	int		i, now;
2464 	zbx_db_insert_t	db_insert;
2465 
2466 	now = (int)time(NULL);
2467 	zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "state", "write_clock",
2468 			NULL);
2469 
2470 	for (i = 0; i < history_num; i++)
2471 	{
2472 		const ZBX_DC_HISTORY	*h = &history[i];
2473 
2474 		if (ITEM_STATE_NOTSUPPORTED != h->state)
2475 			continue;
2476 
2477 		zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, ZBX_NULL2EMPTY_STR(h->value.err),
2478 				(int)h->state, now);
2479 	}
2480 
2481 	zbx_db_insert_execute(&db_insert);
2482 	zbx_db_insert_clean(&db_insert);
2483 }
2484 
2485 /******************************************************************************
2486  *                                                                            *
2487  * Function: DCmass_proxy_add_history                                         *
2488  *                                                                            *
2489  * Purpose: inserting new history data after new value is received            *
2490  *                                                                            *
2491  * Parameters: history     - array of history data                            *
2492  *             history_num - number of history structures                     *
2493  *                                                                            *
2494  * Author: Alexander Vladishev                                                *
2495  *                                                                            *
2496  ******************************************************************************/
DBmass_proxy_add_history(ZBX_DC_HISTORY * history,int history_num)2497 static void	DBmass_proxy_add_history(ZBX_DC_HISTORY *history, int history_num)
2498 {
2499 	int	i, h_num = 0, h_meta_num = 0, hlog_num = 0, notsupported_num = 0;
2500 
2501 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
2502 
2503 	for (i = 0; i < history_num; i++)
2504 	{
2505 		const ZBX_DC_HISTORY	*h = &history[i];
2506 
2507 		if (ITEM_STATE_NOTSUPPORTED == h->state)
2508 		{
2509 			notsupported_num++;
2510 			continue;
2511 		}
2512 
2513 		switch (h->value_type)
2514 		{
2515 			case ITEM_VALUE_TYPE_LOG:
2516 				hlog_num++;
2517 				break;
2518 			case ITEM_VALUE_TYPE_FLOAT:
2519 			case ITEM_VALUE_TYPE_UINT64:
2520 			case ITEM_VALUE_TYPE_STR:
2521 			case ITEM_VALUE_TYPE_TEXT:
2522 				if (0 != (h->flags & ZBX_DC_FLAG_META))
2523 					h_meta_num++;
2524 				else
2525 					h_num++;
2526 				break;
2527 			case ITEM_VALUE_TYPE_NONE:
2528 				h_num++;
2529 				break;
2530 			default:
2531 				THIS_SHOULD_NEVER_HAPPEN;
2532 		}
2533 	}
2534 
2535 	if (0 != h_num)
2536 		dc_add_proxy_history(history, history_num);
2537 
2538 	if (0 != h_meta_num)
2539 		dc_add_proxy_history_meta(history, history_num);
2540 
2541 	if (0 != hlog_num)
2542 		dc_add_proxy_history_log(history, history_num);
2543 
2544 	if (0 != notsupported_num)
2545 		dc_add_proxy_history_notsupported(history, history_num);
2546 
2547 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
2548 }
2549 
2550 /******************************************************************************
2551  *                                                                            *
2552  * Function: DCmass_prepare_history                                           *
2553  *                                                                            *
2554  * Purpose: prepare history data using items from configuration cache and     *
2555  *          generate item changes to be applied and host inventory values to  *
2556  *          be added                                                          *
2557  *                                                                            *
2558  * Parameters: history             - [IN/OUT] array of history data           *
2559  *             itemids             - [IN] the item identifiers                *
2560  *                                        (used for item lookup)              *
2561  *             items               - [IN] the items                           *
2562  *             errcodes            - [IN] item error codes                    *
2563  *             history_num         - [IN] number of history structures        *
2564  *             item_diff           - [OUT] the changes in item data           *
2565  *             inventory_values    - [OUT] the inventory values to add        *
2566  *             compression_age     - [IN] history compression age             *
2567  *             proxy_subscribtions - [IN] history compression age             *
2568  *                                                                            *
2569  ******************************************************************************/
DCmass_prepare_history(ZBX_DC_HISTORY * history,const zbx_vector_uint64_t * itemids,DC_ITEM * items,const int * errcodes,int history_num,zbx_vector_ptr_t * item_diff,zbx_vector_ptr_t * inventory_values,int compression_age,zbx_vector_uint64_pair_t * proxy_subscribtions)2570 static void	DCmass_prepare_history(ZBX_DC_HISTORY *history, const zbx_vector_uint64_t *itemids,
2571 		DC_ITEM *items, const int *errcodes, int history_num, zbx_vector_ptr_t *item_diff,
2572 		zbx_vector_ptr_t *inventory_values, int compression_age, zbx_vector_uint64_pair_t *proxy_subscribtions)
2573 {
2574 	static time_t	last_history_discard = 0;
2575 	time_t		now;
2576 	int		i;
2577 
2578 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __func__, history_num);
2579 
2580 	now = time(NULL);
2581 
2582 	for (i = 0; i < history_num; i++)
2583 	{
2584 		ZBX_DC_HISTORY	*h = &history[i];
2585 		DC_ITEM		*item;
2586 		zbx_item_diff_t	*diff;
2587 		int		index;
2588 
2589 		/* discard history items that are older than compression age */
2590 		if (0 != compression_age && h->ts.sec < compression_age)
2591 		{
2592 			if (SEC_PER_HOUR < (now - last_history_discard)) /* log once per hour */
2593 			{
2594 				zabbix_log(LOG_LEVEL_TRACE, "discarding history that is pointing to"
2595 							" compressed history period");
2596 				last_history_discard = now;
2597 			}
2598 
2599 			h->flags |= ZBX_DC_FLAG_UNDEF;
2600 			continue;
2601 		}
2602 
2603 		if (FAIL == (index = zbx_vector_uint64_bsearch(itemids, h->itemid, ZBX_DEFAULT_UINT64_COMPARE_FUNC)))
2604 		{
2605 			THIS_SHOULD_NEVER_HAPPEN;
2606 			h->flags |= ZBX_DC_FLAG_UNDEF;
2607 			continue;
2608 		}
2609 
2610 		if (SUCCEED != errcodes[index])
2611 		{
2612 			h->flags |= ZBX_DC_FLAG_UNDEF;
2613 			continue;
2614 		}
2615 
2616 		item = &items[index];
2617 
2618 		if (ITEM_STATUS_ACTIVE != item->status || HOST_STATUS_MONITORED != item->host.status)
2619 		{
2620 			h->flags |= ZBX_DC_FLAG_UNDEF;
2621 			continue;
2622 		}
2623 
2624 		if (0 == item->history)
2625 		{
2626 			h->flags |= ZBX_DC_FLAG_NOHISTORY;
2627 		}
2628 		else if (now - h->ts.sec > item->history_sec)
2629 		{
2630 			h->flags |= ZBX_DC_FLAG_NOHISTORY;
2631 			zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" value timestamp \"%s %s\" is outside history "
2632 					"storage period", item->host.host, item->key_orig,
2633 					zbx_date2str(h->ts.sec, NULL), zbx_time2str(h->ts.sec, NULL));
2634 		}
2635 
2636 		if (ITEM_VALUE_TYPE_FLOAT == item->value_type || ITEM_VALUE_TYPE_UINT64 == item->value_type)
2637 		{
2638 			if (0 == item->trends)
2639 			{
2640 				h->flags |= ZBX_DC_FLAG_NOTRENDS;
2641 			}
2642 			else if (now - h->ts.sec > item->trends_sec)
2643 			{
2644 				h->flags |= ZBX_DC_FLAG_NOTRENDS;
2645 				zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" value timestamp \"%s %s\" is outside "
2646 						"trends storage period", item->host.host, item->key_orig,
2647 						zbx_date2str(h->ts.sec, NULL), zbx_time2str(h->ts.sec, NULL));
2648 			}
2649 		}
2650 		else
2651 			h->flags |= ZBX_DC_FLAG_NOTRENDS;
2652 
2653 		normalize_item_value(item, h);
2654 
2655 		/* calculate item update and update already retrieved item status for trigger calculation */
2656 		if (NULL != (diff = calculate_item_update(item, h)))
2657 			zbx_vector_ptr_append(item_diff, diff);
2658 
2659 		DCinventory_value_add(inventory_values, item, h);
2660 
2661 		if (0 != item->host.proxy_hostid && FAIL == is_item_processed_by_server(item->type, item->key_orig))
2662 		{
2663 			zbx_uint64_pair_t	p = {item->host.proxy_hostid, h->ts.sec};
2664 
2665 			zbx_vector_uint64_pair_append(proxy_subscribtions, p);
2666 		}
2667 	}
2668 
2669 	zbx_vector_ptr_sort(inventory_values, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);
2670 	zbx_vector_ptr_sort(item_diff, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);
2671 
2672 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
2673 }
2674 
2675 /******************************************************************************
2676  *                                                                            *
2677  * Function: DCmodule_prepare_history                                         *
2678  *                                                                            *
2679  * Purpose: prepare history data to share them with loadable modules, sort    *
2680  *          data by type skipping low-level discovery data, meta information  *
2681  *          updates and notsupported items                                    *
2682  *                                                                            *
2683  * Parameters: history            - [IN] array of history data                *
2684  *             history_num        - [IN] number of history structures         *
2685  *             history_<type>     - [OUT] array of historical data of a       *
2686  *                                  specific data type                        *
2687  *             history_<type>_num - [OUT] number of values of a specific      *
2688  *                                  data type                                 *
2689  *                                                                            *
2690  ******************************************************************************/
DCmodule_prepare_history(ZBX_DC_HISTORY * history,int history_num,ZBX_HISTORY_FLOAT * history_float,int * history_float_num,ZBX_HISTORY_INTEGER * history_integer,int * history_integer_num,ZBX_HISTORY_STRING * history_string,int * history_string_num,ZBX_HISTORY_TEXT * history_text,int * history_text_num,ZBX_HISTORY_LOG * history_log,int * history_log_num)2691 static void	DCmodule_prepare_history(ZBX_DC_HISTORY *history, int history_num, ZBX_HISTORY_FLOAT *history_float,
2692 		int *history_float_num, ZBX_HISTORY_INTEGER *history_integer, int *history_integer_num,
2693 		ZBX_HISTORY_STRING *history_string, int *history_string_num, ZBX_HISTORY_TEXT *history_text,
2694 		int *history_text_num, ZBX_HISTORY_LOG *history_log, int *history_log_num)
2695 {
2696 	ZBX_DC_HISTORY		*h;
2697 	ZBX_HISTORY_FLOAT	*h_float;
2698 	ZBX_HISTORY_INTEGER	*h_integer;
2699 	ZBX_HISTORY_STRING	*h_string;
2700 	ZBX_HISTORY_TEXT	*h_text;
2701 	ZBX_HISTORY_LOG		*h_log;
2702 	int			i;
2703 	const zbx_log_value_t	*log;
2704 
2705 	*history_float_num = 0;
2706 	*history_integer_num = 0;
2707 	*history_string_num = 0;
2708 	*history_text_num = 0;
2709 	*history_log_num = 0;
2710 
2711 	for (i = 0; i < history_num; i++)
2712 	{
2713 		h = &history[i];
2714 
2715 		if (0 != (ZBX_DC_FLAGS_NOT_FOR_MODULES & h->flags))
2716 			continue;
2717 
2718 		switch (h->value_type)
2719 		{
2720 			case ITEM_VALUE_TYPE_FLOAT:
2721 				if (NULL == history_float_cbs)
2722 					continue;
2723 
2724 				h_float = &history_float[(*history_float_num)++];
2725 				h_float->itemid = h->itemid;
2726 				h_float->clock = h->ts.sec;
2727 				h_float->ns = h->ts.ns;
2728 				h_float->value = h->value.dbl;
2729 				break;
2730 			case ITEM_VALUE_TYPE_UINT64:
2731 				if (NULL == history_integer_cbs)
2732 					continue;
2733 
2734 				h_integer = &history_integer[(*history_integer_num)++];
2735 				h_integer->itemid = h->itemid;
2736 				h_integer->clock = h->ts.sec;
2737 				h_integer->ns = h->ts.ns;
2738 				h_integer->value = h->value.ui64;
2739 				break;
2740 			case ITEM_VALUE_TYPE_STR:
2741 				if (NULL == history_string_cbs)
2742 					continue;
2743 
2744 				h_string = &history_string[(*history_string_num)++];
2745 				h_string->itemid = h->itemid;
2746 				h_string->clock = h->ts.sec;
2747 				h_string->ns = h->ts.ns;
2748 				h_string->value = h->value.str;
2749 				break;
2750 			case ITEM_VALUE_TYPE_TEXT:
2751 				if (NULL == history_text_cbs)
2752 					continue;
2753 
2754 				h_text = &history_text[(*history_text_num)++];
2755 				h_text->itemid = h->itemid;
2756 				h_text->clock = h->ts.sec;
2757 				h_text->ns = h->ts.ns;
2758 				h_text->value = h->value.str;
2759 				break;
2760 			case ITEM_VALUE_TYPE_LOG:
2761 				if (NULL == history_log_cbs)
2762 					continue;
2763 
2764 				log = h->value.log;
2765 				h_log = &history_log[(*history_log_num)++];
2766 				h_log->itemid = h->itemid;
2767 				h_log->clock = h->ts.sec;
2768 				h_log->ns = h->ts.ns;
2769 				h_log->value = log->value;
2770 				h_log->source = ZBX_NULL2EMPTY_STR(log->source);
2771 				h_log->timestamp = log->timestamp;
2772 				h_log->logeventid = log->logeventid;
2773 				h_log->severity = log->severity;
2774 				break;
2775 			default:
2776 				THIS_SHOULD_NEVER_HAPPEN;
2777 		}
2778 	}
2779 }
2780 
/******************************************************************************
 *                                                                            *
 * Function: DCmodule_sync_history                                            *
 *                                                                            *
 * Purpose: pass prepared history values to history callbacks of loadable     *
 *          modules                                                           *
 *                                                                            *
 * Parameters: history_<type>_num - [IN] number of values of a specific       *
 *                                       data type                            *
 *             history_<type>     - [IN] array of historical data of a        *
 *                                       specific data type                   *
 *                                                                            *
 ******************************************************************************/
static void	DCmodule_sync_history(int history_float_num, int history_integer_num, int history_string_num,
		int history_text_num, int history_log_num, ZBX_HISTORY_FLOAT *history_float,
		ZBX_HISTORY_INTEGER *history_integer, ZBX_HISTORY_STRING *history_string,
		ZBX_HISTORY_TEXT *history_text, ZBX_HISTORY_LOG *history_log)
{
	int	cb;

	/* a callback array is walked only when there are values of its type; */
	/* each array is walked until an entry with NULL module is reached    */

	if (0 != history_float_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing float history data with modules...");

		cb = 0;

		while (NULL != history_float_cbs[cb].module)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_float_cbs[cb].module->name);
			history_float_cbs[cb].history_float_cb(history_float, history_float_num);
			cb++;
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d float values with modules", history_float_num);
	}

	if (0 != history_integer_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing integer history data with modules...");

		cb = 0;

		while (NULL != history_integer_cbs[cb].module)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_integer_cbs[cb].module->name);
			history_integer_cbs[cb].history_integer_cb(history_integer, history_integer_num);
			cb++;
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d integer values with modules", history_integer_num);
	}

	if (0 != history_string_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing string history data with modules...");

		cb = 0;

		while (NULL != history_string_cbs[cb].module)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_string_cbs[cb].module->name);
			history_string_cbs[cb].history_string_cb(history_string, history_string_num);
			cb++;
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d string values with modules", history_string_num);
	}

	if (0 != history_text_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing text history data with modules...");

		cb = 0;

		while (NULL != history_text_cbs[cb].module)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_text_cbs[cb].module->name);
			history_text_cbs[cb].history_text_cb(history_text, history_text_num);
			cb++;
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d text values with modules", history_text_num);
	}

	if (0 != history_log_num)
	{
		zabbix_log(LOG_LEVEL_DEBUG, "syncing log history data with modules...");

		cb = 0;

		while (NULL != history_log_cbs[cb].module)
		{
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"", history_log_cbs[cb].module->name);
			history_log_cbs[cb].history_log_cb(history_log, history_log_num);
			cb++;
		}

		zabbix_log(LOG_LEVEL_DEBUG, "synced %d log values with modules", history_log_num);
	}
}
2861 
2862 /******************************************************************************
2863  *                                                                            *
2864  * Function: proxy_prepare_history                                            *
2865  *                                                                            *
2866  * Purpose: prepares history update by checking which values must be stored   *
2867  *                                                                            *
2868  * Parameters: history     - [IN/OUT] the history values                      *
2869  *             history_num - [IN] the number of history values                *
2870  *                                                                            *
2871  ******************************************************************************/
proxy_prepare_history(ZBX_DC_HISTORY * history,int history_num)2872 static void	proxy_prepare_history(ZBX_DC_HISTORY *history, int history_num)
2873 {
2874 	int			i, *errcodes;
2875 	DC_ITEM			*items;
2876 	zbx_vector_uint64_t	itemids;
2877 
2878 	zbx_vector_uint64_create(&itemids);
2879 	zbx_vector_uint64_reserve(&itemids, history_num);
2880 
2881 	for (i = 0; i < history_num; i++)
2882 		zbx_vector_uint64_append(&itemids, history[i].itemid);
2883 
2884 	items = (DC_ITEM *)zbx_malloc(NULL, sizeof(DC_ITEM) * (size_t)history_num);
2885 	errcodes = (int *)zbx_malloc(NULL, sizeof(int) * (size_t)history_num);
2886 
2887 	DCconfig_get_items_by_itemids(items, itemids.values, errcodes, itemids.values_num);
2888 
2889 	for (i = 0; i < history_num; i++)
2890 	{
2891 		if (SUCCEED != errcodes[i])
2892 			continue;
2893 
2894 		/* store items with enabled history  */
2895 		if (0 != items[i].history)
2896 			continue;
2897 
2898 		/* store numeric items to handle data conversion errors on server and trends */
2899 		if (ITEM_VALUE_TYPE_FLOAT == items[i].value_type || ITEM_VALUE_TYPE_UINT64 == items[i].value_type)
2900 			continue;
2901 
2902 		/* store discovery rules */
2903 		if (0 != (items[i].flags & ZBX_FLAG_DISCOVERY_RULE))
2904 			continue;
2905 
2906 		/* store errors or first value after an error */
2907 		if (ITEM_STATE_NOTSUPPORTED == history[i].state || ITEM_STATE_NOTSUPPORTED == items[i].state)
2908 			continue;
2909 
2910 		/* store items linked to host inventory */
2911 		if (0 != items[i].inventory_link)
2912 			continue;
2913 
2914 		dc_history_clean_value(history + i);
2915 
2916 		/* all checks passed, item value must not be stored in proxy history/sent to server */
2917 		history[i].flags |= ZBX_DC_FLAG_NOVALUE;
2918 	}
2919 
2920 	DCconfig_clean_items(items, errcodes, history_num);
2921 	zbx_free(items);
2922 	zbx_free(errcodes);
2923 	zbx_vector_uint64_destroy(&itemids);
2924 }
2925 
sync_proxy_history(int * total_num,int * more)2926 static void	sync_proxy_history(int *total_num, int *more)
2927 {
2928 	int			history_num, txn_rc;
2929 	time_t			sync_start;
2930 	zbx_vector_ptr_t	history_items;
2931 	zbx_vector_ptr_t	item_diff;
2932 	ZBX_DC_HISTORY		history[ZBX_HC_SYNC_MAX];
2933 
2934 	zbx_vector_ptr_create(&history_items);
2935 	zbx_vector_ptr_reserve(&history_items, ZBX_HC_SYNC_MAX);
2936 	zbx_vector_ptr_create(&item_diff);
2937 
2938 	sync_start = time(NULL);
2939 
2940 	do
2941 	{
2942 		*more = ZBX_SYNC_DONE;
2943 
2944 		LOCK_CACHE;
2945 
2946 		hc_pop_items(&history_items);		/* select and take items out of history cache */
2947 		history_num = history_items.values_num;
2948 
2949 		UNLOCK_CACHE;
2950 
2951 		if (0 == history_num)
2952 			break;
2953 
2954 		hc_get_item_values(history, &history_items);	/* copy item data from history cache */
2955 		proxy_prepare_history(history, history_items.values_num);
2956 
2957 		DCmass_proxy_prepare_itemdiff(history, history_num, &item_diff);
2958 
2959 		do
2960 		{
2961 			DBbegin();
2962 
2963 			DBmass_proxy_add_history(history, history_num);
2964 			DBmass_proxy_update_items(&item_diff);
2965 		}
2966 		while (ZBX_DB_DOWN == (txn_rc = DBcommit()));
2967 
2968 		LOCK_CACHE;
2969 
2970 		hc_push_items(&history_items);	/* return items to history cache */
2971 
2972 		if (ZBX_DB_FAIL != txn_rc)
2973 		{
2974 			if (0 != item_diff.values_num)
2975 				DCconfig_items_apply_changes(&item_diff);
2976 
2977 			cache->history_num -= history_num;
2978 
2979 			if (0 != hc_queue_get_size())
2980 				*more = ZBX_SYNC_MORE;
2981 
2982 			UNLOCK_CACHE;
2983 
2984 			*total_num += history_num;
2985 
2986 			hc_free_item_values(history, history_num);
2987 		}
2988 		else
2989 		{
2990 			*more = ZBX_SYNC_MORE;
2991 			UNLOCK_CACHE;
2992 		}
2993 
2994 		zbx_vector_ptr_clear(&history_items);
2995 		zbx_vector_ptr_clear_ext(&item_diff, zbx_default_mem_free_func);
2996 
2997 		/* Exit from sync loop if we have spent too much time here */
2998 		/* unless we are doing full sync. This is done to allow    */
2999 		/* syncer process to update their statistics.              */
3000 	}
3001 	while (ZBX_SYNC_MORE == *more && ZBX_HC_SYNC_TIME_MAX >= time(NULL) - sync_start);
3002 
3003 	zbx_vector_ptr_destroy(&item_diff);
3004 	zbx_vector_ptr_destroy(&history_items);
3005 }
3006 
3007 /******************************************************************************
3008  *                                                                            *
3009  * Function: sync_server_history                                              *
3010  *                                                                            *
3011  * Purpose: flush history cache to database, process triggers of flushed      *
3012  *          and timer triggers from timer queue                               *
3013  *                                                                            *
3014  * Parameters: sync_timeout - [IN] the timeout in seconds                     *
3015  *             values_num   - [IN/OUT] the number of synced values            *
3016  *             triggers_num - [IN/OUT] the number of processed timers         *
3017  *             more         - [OUT] a flag indicating the cache emptiness:    *
3018  *                               ZBX_SYNC_DONE - nothing to sync, go idle     *
3019  *                               ZBX_SYNC_MORE - more data to sync            *
3020  *                                                                            *
3021  * Comments: This function loops syncing history values by 1k batches and     *
3022  *           processing timer triggers by batches of 500 triggers.            *
3023  *           Unless full sync is being done the loop is aborted if either     *
3024  *           timeout has passed or there are no more data to process.         *
3025  *           The last is assumed when the following is true:                  *
3026  *            a) history cache is empty or less than 10% of batch values were *
3027  *               processed (the other items were locked by triggers)          *
3028  *            b) less than 500 (full batch) timer triggers were processed     *
3029  *                                                                            *
3030  ******************************************************************************/
sync_server_history(int * values_num,int * triggers_num,int * more)3031 static void	sync_server_history(int *values_num, int *triggers_num, int *more)
3032 {
3033 	static ZBX_HISTORY_FLOAT	*history_float;
3034 	static ZBX_HISTORY_INTEGER	*history_integer;
3035 	static ZBX_HISTORY_STRING	*history_string;
3036 	static ZBX_HISTORY_TEXT		*history_text;
3037 	static ZBX_HISTORY_LOG		*history_log;
3038 	int				i, history_num, history_float_num, history_integer_num, history_string_num,
3039 					history_text_num, history_log_num, txn_error, compression_age;
3040 	unsigned int			item_retrieve_mode;
3041 	time_t				sync_start;
3042 	zbx_vector_uint64_t		triggerids ;
3043 	zbx_vector_ptr_t		history_items, trigger_diff, item_diff, inventory_values, trigger_timers;
3044 	zbx_vector_uint64_pair_t	trends_diff, proxy_subscribtions;
3045 	ZBX_DC_HISTORY			history[ZBX_HC_SYNC_MAX];
3046 
3047 	item_retrieve_mode = NULL == CONFIG_EXPORT_DIR ? ZBX_ITEM_GET_SYNC : ZBX_ITEM_GET_SYNC_EXPORT;
3048 
3049 	if (NULL == history_float && NULL != history_float_cbs)
3050 	{
3051 		history_float = (ZBX_HISTORY_FLOAT *)zbx_malloc(history_float,
3052 				ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_FLOAT));
3053 	}
3054 
3055 	if (NULL == history_integer && NULL != history_integer_cbs)
3056 	{
3057 		history_integer = (ZBX_HISTORY_INTEGER *)zbx_malloc(history_integer,
3058 				ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_INTEGER));
3059 	}
3060 
3061 	if (NULL == history_string && NULL != history_string_cbs)
3062 	{
3063 		history_string = (ZBX_HISTORY_STRING *)zbx_malloc(history_string,
3064 				ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_STRING));
3065 	}
3066 
3067 	if (NULL == history_text && NULL != history_text_cbs)
3068 	{
3069 		history_text = (ZBX_HISTORY_TEXT *)zbx_malloc(history_text,
3070 				ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_TEXT));
3071 	}
3072 
3073 	if (NULL == history_log && NULL != history_log_cbs)
3074 	{
3075 		history_log = (ZBX_HISTORY_LOG *)zbx_malloc(history_log,
3076 				ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_LOG));
3077 	}
3078 
3079 	compression_age = hc_get_history_compression_age();
3080 
3081 	zbx_vector_ptr_create(&inventory_values);
3082 	zbx_vector_ptr_create(&item_diff);
3083 	zbx_vector_ptr_create(&trigger_diff);
3084 	zbx_vector_uint64_pair_create(&trends_diff);
3085 	zbx_vector_uint64_pair_create(&proxy_subscribtions);
3086 
3087 	zbx_vector_uint64_create(&triggerids);
3088 	zbx_vector_uint64_reserve(&triggerids, ZBX_HC_SYNC_MAX);
3089 
3090 	zbx_vector_ptr_create(&trigger_timers);
3091 	zbx_vector_ptr_reserve(&trigger_timers, ZBX_HC_TIMER_MAX);
3092 
3093 	zbx_vector_ptr_create(&history_items);
3094 	zbx_vector_ptr_reserve(&history_items, ZBX_HC_SYNC_MAX);
3095 
3096 	sync_start = time(NULL);
3097 
3098 	do
3099 	{
3100 		DC_ITEM			*items;
3101 		int			*errcodes, trends_num = 0, timers_num = 0, ret = SUCCEED;
3102 		zbx_vector_uint64_t	itemids;
3103 		ZBX_DC_TREND		*trends = NULL;
3104 
3105 		zbx_vector_uint64_create(&itemids);
3106 
3107 		*more = ZBX_SYNC_DONE;
3108 
3109 		LOCK_CACHE;
3110 		hc_pop_items(&history_items);		/* select and take items out of history cache */
3111 		UNLOCK_CACHE;
3112 
3113 		if (0 != history_items.values_num)
3114 		{
3115 			if (0 == (history_num = DCconfig_lock_triggers_by_history_items(&history_items, &triggerids)))
3116 			{
3117 				LOCK_CACHE;
3118 				hc_push_items(&history_items);
3119 				UNLOCK_CACHE;
3120 				zbx_vector_ptr_clear(&history_items);
3121 			}
3122 		}
3123 		else
3124 			history_num = 0;
3125 
3126 		if (0 != history_num)
3127 		{
3128 			hc_get_item_values(history, &history_items);	/* copy item data from history cache */
3129 
3130 			items = (DC_ITEM *)zbx_malloc(NULL, sizeof(DC_ITEM) * (size_t)history_num);
3131 			errcodes = (int *)zbx_malloc(NULL, sizeof(int) * (size_t)history_num);
3132 
3133 			zbx_vector_uint64_reserve(&itemids, history_num);
3134 
3135 			for (i = 0; i < history_num; i++)
3136 				zbx_vector_uint64_append(&itemids, history[i].itemid);
3137 
3138 			zbx_vector_uint64_sort(&itemids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
3139 
3140 			DCconfig_get_items_by_itemids_partial(items, itemids.values, errcodes, history_num,
3141 					item_retrieve_mode);
3142 
3143 			DCmass_prepare_history(history, &itemids, items, errcodes, history_num, &item_diff,
3144 					&inventory_values, compression_age, &proxy_subscribtions);
3145 
3146 			if (FAIL != (ret = DBmass_add_history(history, history_num)))
3147 			{
3148 				DCconfig_items_apply_changes(&item_diff);
3149 				DCmass_update_trends(history, history_num, &trends, &trends_num, compression_age);
3150 
3151 				if (0 != trends_num)
3152 					zbx_tfc_invalidate_trends(trends, trends_num);
3153 
3154 				do
3155 				{
3156 					DBbegin();
3157 
3158 					DBmass_update_items(&item_diff, &inventory_values);
3159 					DBmass_update_trends(trends, trends_num, &trends_diff);
3160 
3161 					/* process internal events generated by DCmass_prepare_history() */
3162 					zbx_process_events(NULL, NULL);
3163 
3164 					if (ZBX_DB_OK == (txn_error = DBcommit()))
3165 						DCupdate_trends(&trends_diff);
3166 					else
3167 						zbx_reset_event_recovery();
3168 
3169 					zbx_vector_uint64_pair_clear(&trends_diff);
3170 				}
3171 				while (ZBX_DB_DOWN == txn_error);
3172 			}
3173 
3174 			zbx_clean_events();
3175 
3176 			zbx_vector_ptr_clear_ext(&inventory_values, (zbx_clean_func_t)DCinventory_value_free);
3177 			zbx_vector_ptr_clear_ext(&item_diff, (zbx_clean_func_t)zbx_ptr_free);
3178 		}
3179 
3180 		if (FAIL != ret)
3181 		{
3182 			/* don't process trigger timers when server is shutting down */
3183 			if (ZBX_IS_RUNNING())
3184 			{
3185 				zbx_dc_get_trigger_timers(&trigger_timers, time(NULL), ZBX_HC_TIMER_SOFT_MAX,
3186 						ZBX_HC_TIMER_MAX);
3187 			}
3188 
3189 			timers_num = trigger_timers.values_num;
3190 
3191 			if (ZBX_HC_TIMER_SOFT_MAX <= timers_num)
3192 				*more = ZBX_SYNC_MORE;
3193 
3194 			if (0 != history_num || 0 != timers_num)
3195 			{
3196 				for (i = 0; i < trigger_timers.values_num; i++)
3197 				{
3198 					zbx_trigger_timer_t	*timer = (zbx_trigger_timer_t *)trigger_timers.values[i];
3199 
3200 					if (0 != timer->lock)
3201 						zbx_vector_uint64_append(&triggerids, timer->triggerid);
3202 				}
3203 
3204 				do
3205 				{
3206 					DBbegin();
3207 
3208 					recalculate_triggers(history, history_num, &itemids, items, errcodes,
3209 							&trigger_timers, &trigger_diff);
3210 
3211 					/* process trigger events generated by recalculate_triggers() */
3212 					zbx_process_events(&trigger_diff, &triggerids);
3213 					if (0 != trigger_diff.values_num)
3214 						zbx_db_save_trigger_changes(&trigger_diff);
3215 
3216 					if (ZBX_DB_OK == (txn_error = DBcommit()))
3217 					{
3218 						DCconfig_triggers_apply_changes(&trigger_diff);
3219 						DBupdate_itservices(&trigger_diff);
3220 					}
3221 					else
3222 						zbx_clean_events();
3223 
3224 					zbx_vector_ptr_clear_ext(&trigger_diff, (zbx_clean_func_t)zbx_trigger_diff_free);
3225 				}
3226 				while (ZBX_DB_DOWN == txn_error);
3227 			}
3228 		}
3229 
3230 		if (0 != triggerids.values_num)
3231 		{
3232 			*triggers_num += triggerids.values_num;
3233 			DCconfig_unlock_triggers(&triggerids);
3234 			zbx_vector_uint64_clear(&triggerids);
3235 		}
3236 
3237 		if (0 != trigger_timers.values_num)
3238 		{
3239 			zbx_dc_reschedule_trigger_timers(&trigger_timers, time(NULL));
3240 			zbx_vector_ptr_clear(&trigger_timers);
3241 		}
3242 
3243 		if (0 != proxy_subscribtions.values_num)
3244 		{
3245 			zbx_vector_uint64_pair_sort(&proxy_subscribtions, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
3246 			zbx_dc_proxy_update_nodata(&proxy_subscribtions);
3247 			zbx_vector_uint64_pair_clear(&proxy_subscribtions);
3248 		}
3249 
3250 		if (0 != history_num)
3251 		{
3252 			LOCK_CACHE;
3253 			hc_push_items(&history_items);	/* return items to history cache */
3254 			cache->history_num -= history_num;
3255 
3256 			if (0 != hc_queue_get_size())
3257 			{
3258 				/* Continue sync if enough of sync candidates were processed       */
3259 				/* (meaning most of sync candidates are not locked by triggers).   */
3260 				/* Otherwise better to wait a bit for other syncers to unlock      */
3261 				/* items rather than trying and failing to sync locked items over  */
3262 				/* and over again.                                                 */
3263 				if (ZBX_HC_SYNC_MIN_PCNT <= history_num * 100 / history_items.values_num)
3264 					*more = ZBX_SYNC_MORE;
3265 			}
3266 
3267 			UNLOCK_CACHE;
3268 
3269 			*values_num += history_num;
3270 		}
3271 
3272 		if (FAIL != ret)
3273 		{
3274 			if (0 != history_num)
3275 			{
3276 				const ZBX_DC_HISTORY	*phistory = NULL;
3277 				const ZBX_DC_TREND	*ptrends = NULL;
3278 				int			history_num_loc = 0, trends_num_loc = 0;
3279 
3280 				DCmodule_prepare_history(history, history_num, history_float, &history_float_num,
3281 						history_integer, &history_integer_num, history_string,
3282 						&history_string_num, history_text, &history_text_num, history_log,
3283 						&history_log_num);
3284 
3285 				DCmodule_sync_history(history_float_num, history_integer_num, history_string_num,
3286 						history_text_num, history_log_num, history_float, history_integer,
3287 						history_string, history_text, history_log);
3288 
3289 				if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_HISTORY))
3290 				{
3291 					phistory = history;
3292 					history_num_loc = history_num;
3293 				}
3294 
3295 				if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_TRENDS))
3296 				{
3297 					ptrends = trends;
3298 					trends_num_loc = trends_num;
3299 				}
3300 
3301 				if (NULL != phistory || NULL != ptrends)
3302 				{
3303 					DCexport_history_and_trends(phistory, history_num_loc, &itemids, items,
3304 							errcodes, ptrends, trends_num_loc);
3305 				}
3306 			}
3307 
3308 			if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_EVENTS))
3309 				zbx_export_events();
3310 		}
3311 
3312 		if (0 != history_num || 0 != timers_num)
3313 			zbx_clean_events();
3314 
3315 		if (0 != history_num)
3316 		{
3317 			zbx_free(trends);
3318 			DCconfig_clean_items(items, errcodes, history_num);
3319 			zbx_free(errcodes);
3320 			zbx_free(items);
3321 
3322 			zbx_vector_ptr_clear(&history_items);
3323 			hc_free_item_values(history, history_num);
3324 		}
3325 
3326 		zbx_vector_uint64_destroy(&itemids);
3327 
3328 		/* Exit from sync loop if we have spent too much time here.       */
3329 		/* This is done to allow syncer process to update its statistics. */
3330 	}
3331 	while (ZBX_SYNC_MORE == *more && ZBX_HC_SYNC_TIME_MAX >= time(NULL) - sync_start);
3332 
3333 	zbx_vector_ptr_destroy(&history_items);
3334 	zbx_vector_ptr_destroy(&inventory_values);
3335 	zbx_vector_ptr_destroy(&item_diff);
3336 	zbx_vector_ptr_destroy(&trigger_diff);
3337 	zbx_vector_uint64_pair_destroy(&trends_diff);
3338 	zbx_vector_uint64_pair_destroy(&proxy_subscribtions);
3339 
3340 	zbx_vector_ptr_destroy(&trigger_timers);
3341 	zbx_vector_uint64_destroy(&triggerids);
3342 }
3343 
3344 /******************************************************************************
3345  *                                                                            *
3346  * Function: sync_history_cache_full                                          *
3347  *                                                                            *
3348  * Purpose: writes updates and new data from history cache to database        *
3349  *                                                                            *
3350  * Comments: This function is used to flush history cache at server/proxy     *
3351  *           exit.                                                            *
3352  *           Other processes are already terminated, so cache locking is      *
3353  *           unnecessary.                                                     *
3354  *                                                                            *
3355  ******************************************************************************/
sync_history_cache_full(void)3356 static void	sync_history_cache_full(void)
3357 {
3358 	int			values_num = 0, triggers_num = 0, more;
3359 	zbx_hashset_iter_t	iter;
3360 	zbx_hc_item_t		*item;
3361 	zbx_binary_heap_t	tmp_history_queue;
3362 
3363 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __func__, cache->history_num);
3364 
3365 	/* History index cache might be full without any space left for queueing items from history index to  */
3366 	/* history queue. The solution: replace the shared-memory history queue with heap-allocated one. Add  */
3367 	/* all items from history index to the new history queue.                                             */
3368 	/*                                                                                                    */
3369 	/* Assertions that must be true.                                                                      */
3370 	/*   * This is the main server or proxy process,                                                      */
3371 	/*   * There are no other users of history index cache stored in shared memory. Other processes       */
3372 	/*     should have quit by this point.                                                                */
3373 	/*   * other parts of the program do not hold pointers to the elements of history queue that is       */
3374 	/*     stored in the shared memory.                                                                   */
3375 
3376 	if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3377 	{
3378 		/* unlock all triggers before full sync so no items are locked by triggers */
3379 		DCconfig_unlock_all_triggers();
3380 	}
3381 
3382 	tmp_history_queue = cache->history_queue;
3383 
3384 	zbx_binary_heap_create(&cache->history_queue, hc_queue_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY);
3385 	zbx_hashset_iter_reset(&cache->history_items, &iter);
3386 
3387 	/* add all items from history index to the new history queue */
3388 	while (NULL != (item = (zbx_hc_item_t *)zbx_hashset_iter_next(&iter)))
3389 	{
3390 		if (NULL != item->tail)
3391 		{
3392 			item->status = ZBX_HC_ITEM_STATUS_NORMAL;
3393 			hc_queue_item(item);
3394 		}
3395 	}
3396 
3397 	if (0 != hc_queue_get_size())
3398 	{
3399 		zabbix_log(LOG_LEVEL_WARNING, "syncing history data...");
3400 
3401 		do
3402 		{
3403 			if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3404 				sync_server_history(&values_num, &triggers_num, &more);
3405 			else
3406 				sync_proxy_history(&values_num, &more);
3407 
3408 			zabbix_log(LOG_LEVEL_WARNING, "syncing history data... " ZBX_FS_DBL "%%",
3409 					(double)values_num / (cache->history_num + values_num) * 100);
3410 		}
3411 		while (0 != hc_queue_get_size());
3412 
3413 		zabbix_log(LOG_LEVEL_WARNING, "syncing history data done");
3414 	}
3415 
3416 	zbx_binary_heap_destroy(&cache->history_queue);
3417 	cache->history_queue = tmp_history_queue;
3418 
3419 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
3420 }
3421 
3422 /******************************************************************************
3423  *                                                                            *
3424  * Function: zbx_log_sync_history_cache_progress                              *
3425  *                                                                            *
3426  * Purpose: log progress of syncing history data                              *
3427  *                                                                            *
3428  ******************************************************************************/
zbx_log_sync_history_cache_progress(void)3429 void	zbx_log_sync_history_cache_progress(void)
3430 {
3431 	double		pcnt = -1.0;
3432 	int		ts_last, ts_next, sec;
3433 
3434 	LOCK_CACHE;
3435 
3436 	if (INT_MAX == cache->history_progress_ts)
3437 	{
3438 		UNLOCK_CACHE;
3439 		return;
3440 	}
3441 
3442 	ts_last = cache->history_progress_ts;
3443 	sec = time(NULL);
3444 
3445 	if (0 == cache->history_progress_ts)
3446 	{
3447 		cache->history_num_total = cache->history_num;
3448 		cache->history_progress_ts = sec;
3449 	}
3450 
3451 	if (ZBX_HC_SYNC_TIME_MAX <= sec - cache->history_progress_ts || 0 == cache->history_num)
3452 	{
3453 		if (0 != cache->history_num_total)
3454 			pcnt = 100 * (double)(cache->history_num_total - cache->history_num) / cache->history_num_total;
3455 
3456 		cache->history_progress_ts = (0 == cache->history_num ? INT_MAX : sec);
3457 	}
3458 
3459 	ts_next = cache->history_progress_ts;
3460 
3461 	UNLOCK_CACHE;
3462 
3463 	if (0 == ts_last)
3464 		zabbix_log(LOG_LEVEL_WARNING, "syncing history data in progress... ");
3465 
3466 	if (-1.0 != pcnt)
3467 		zabbix_log(LOG_LEVEL_WARNING, "syncing history data... " ZBX_FS_DBL "%%", pcnt);
3468 
3469 	if (INT_MAX == ts_next)
3470 		zabbix_log(LOG_LEVEL_WARNING, "syncing history data done");
3471 }
3472 
3473 /******************************************************************************
3474  *                                                                            *
3475  * Function: zbx_sync_history_cache                                           *
3476  *                                                                            *
3477  * Purpose: writes updates and new data from history cache to database        *
3478  *                                                                            *
 * Parameters: values_num   - [OUT] the number of synced values               *
 *             triggers_num - [OUT] the number of processed triggers          *
 *                                  (always 0 on proxy)                       *
 *             more         - [OUT] a flag indicating the cache emptiness:    *
 *                                  ZBX_SYNC_DONE - nothing to sync, go idle  *
 *                                  ZBX_SYNC_MORE - more data to sync         *
3483  *                                                                            *
3484  ******************************************************************************/
zbx_sync_history_cache(int * values_num,int * triggers_num,int * more)3485 void	zbx_sync_history_cache(int *values_num, int *triggers_num, int *more)
3486 {
3487 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __func__, cache->history_num);
3488 
3489 	*values_num = 0;
3490 	*triggers_num = 0;
3491 
3492 	if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3493 		sync_server_history(values_num, triggers_num, more);
3494 	else
3495 		sync_proxy_history(values_num, more);
3496 }
3497 
3498 /******************************************************************************
3499  *                                                                            *
3500  * local history cache                                                        *
3501  *                                                                            *
3502  ******************************************************************************/
dc_string_buffer_realloc(size_t len)3503 static void	dc_string_buffer_realloc(size_t len)
3504 {
3505 	if (string_values_alloc >= string_values_offset + len)
3506 		return;
3507 
3508 	do
3509 	{
3510 		string_values_alloc += ZBX_STRING_REALLOC_STEP;
3511 	}
3512 	while (string_values_alloc < string_values_offset + len);
3513 
3514 	string_values = (char *)zbx_realloc(string_values, string_values_alloc);
3515 }
3516 
dc_local_get_history_slot(void)3517 static dc_item_value_t	*dc_local_get_history_slot(void)
3518 {
3519 	if (ZBX_MAX_VALUES_LOCAL == item_values_num)
3520 		dc_flush_history();
3521 
3522 	if (item_values_alloc == item_values_num)
3523 	{
3524 		item_values_alloc += ZBX_STRUCT_REALLOC_STEP;
3525 		item_values = (dc_item_value_t *)zbx_realloc(item_values, item_values_alloc * sizeof(dc_item_value_t));
3526 	}
3527 
3528 	return &item_values[item_values_num++];
3529 }
3530 
static void	dc_local_add_history_dbl(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		double value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	/* stores a floating point value in the next free slot of the local history cache */
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->itemid = itemid;
	slot->ts = *ts;
	slot->item_value_type = item_value_type;
	slot->value_type = ITEM_VALUE_TYPE_FLOAT;
	slot->state = ITEM_STATE_NORMAL;
	slot->flags = flags;

	/* meta information (lastlogsize/mtime) is stored only when the META flag is set */
	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->lastlogsize = lastlogsize;
		slot->mtime = mtime;
	}

	/* the value itself is stored unless the NOVALUE flag is set */
	if (0 == (slot->flags & ZBX_DC_FLAG_NOVALUE))
	{
		slot->value.value_dbl = value_orig;
	}
}
3554 
static void	dc_local_add_history_uint(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		zbx_uint64_t value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	/* stores an unsigned integer value in the next free slot of the local history cache */
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->itemid = itemid;
	slot->ts = *ts;
	slot->item_value_type = item_value_type;
	slot->value_type = ITEM_VALUE_TYPE_UINT64;
	slot->state = ITEM_STATE_NORMAL;
	slot->flags = flags;

	/* meta information (lastlogsize/mtime) is stored only when the META flag is set */
	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->lastlogsize = lastlogsize;
		slot->mtime = mtime;
	}

	/* the value itself is stored unless the NOVALUE flag is set */
	if (0 == (slot->flags & ZBX_DC_FLAG_NOVALUE))
	{
		slot->value.value_uint = value_orig;
	}
}
3578 
static void	dc_local_add_history_text(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		const char *value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	/* stores a text value in the local history cache; the characters go to the string buffer */
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->itemid = itemid;
	slot->ts = *ts;
	slot->item_value_type = item_value_type;
	slot->value_type = ITEM_VALUE_TYPE_TEXT;
	slot->state = ITEM_STATE_NORMAL;
	slot->flags = flags;

	/* meta information (lastlogsize/mtime) is stored only when the META flag is set */
	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->lastlogsize = lastlogsize;
		slot->mtime = mtime;
	}

	/* without a value only the zero length is recorded */
	if (0 != (slot->flags & ZBX_DC_FLAG_NOVALUE))
	{
		slot->value.value_str.len = 0;
		return;
	}

	/* the stored length is limited by ZBX_HISTORY_VALUE_LEN and includes one extra byte */
	slot->value.value_str.len = zbx_db_strlen_n(value_orig, ZBX_HISTORY_VALUE_LEN) + 1;
	dc_string_buffer_realloc(slot->value.value_str.len);

	/* the slot keeps an offset into the string buffer rather than a pointer */
	slot->value.value_str.pvalue = string_values_offset;
	memcpy(string_values + string_values_offset, value_orig, slot->value.value_str.len);
	string_values_offset += slot->value.value_str.len;
}
3611 
static void	dc_local_add_history_log(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		const zbx_log_t *log, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	/* stores a log value (value text plus optional source) in the local history cache; */
	/* log is dereferenced only when the NOVALUE flag is not set                         */
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->itemid = itemid;
	slot->ts = *ts;
	slot->item_value_type = item_value_type;
	slot->value_type = ITEM_VALUE_TYPE_LOG;
	slot->state = ITEM_STATE_NORMAL;
	slot->flags = flags;

	/* meta information (lastlogsize/mtime) is stored only when the META flag is set */
	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->lastlogsize = lastlogsize;
		slot->mtime = mtime;
	}

	if (0 != (slot->flags & ZBX_DC_FLAG_NOVALUE))
	{
		/* no value - neither the text nor the source occupies string buffer space */
		slot->value.value_str.len = 0;
		slot->source.len = 0;
	}
	else
	{
		slot->severity = log->severity;
		slot->logeventid = log->logeventid;
		slot->timestamp = log->timestamp;

		slot->value.value_str.len = zbx_db_strlen_n(log->value, ZBX_HISTORY_VALUE_LEN) + 1;

		/* a missing or empty source is recorded with zero length */
		if (NULL == log->source || '\0' == *log->source)
			slot->source.len = 0;
		else
			slot->source.len = zbx_db_strlen_n(log->source, HISTORY_LOG_SOURCE_LEN) + 1;
	}

	/* nothing to copy into the string buffer */
	if (0 == slot->value.value_str.len + slot->source.len)
		return;

	/* reserve space for both strings at once, then copy them one after another */
	dc_string_buffer_realloc(slot->value.value_str.len + slot->source.len);

	if (0 != slot->value.value_str.len)
	{
		slot->value.value_str.pvalue = string_values_offset;
		memcpy(string_values + string_values_offset, log->value, slot->value.value_str.len);
		string_values_offset += slot->value.value_str.len;
	}

	if (0 != slot->source.len)
	{
		slot->source.pvalue = string_values_offset;
		memcpy(string_values + string_values_offset, log->source, slot->source.len);
		string_values_offset += slot->source.len;
	}
}
3671 
dc_local_add_history_notsupported(zbx_uint64_t itemid,const zbx_timespec_t * ts,const char * error,zbx_uint64_t lastlogsize,int mtime,unsigned char flags)3672 static void	dc_local_add_history_notsupported(zbx_uint64_t itemid, const zbx_timespec_t *ts, const char *error,
3673 		zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
3674 {
3675 	dc_item_value_t	*item_value;
3676 
3677 	item_value = dc_local_get_history_slot();
3678 
3679 	item_value->itemid = itemid;
3680 	item_value->ts = *ts;
3681 	item_value->state = ITEM_STATE_NOTSUPPORTED;
3682 	item_value->flags = flags;
3683 
3684 	if (0 != (item_value->flags & ZBX_DC_FLAG_META))
3685 	{
3686 		item_value->lastlogsize = lastlogsize;
3687 		item_value->mtime = mtime;
3688 	}
3689 
3690 	item_value->value.value_str.len = zbx_db_strlen_n(error, ITEM_ERROR_LEN) + 1;
3691 	dc_string_buffer_realloc(item_value->value.value_str.len);
3692 	item_value->value.value_str.pvalue = string_values_offset;
3693 	memcpy(&string_values[string_values_offset], error, item_value->value.value_str.len);
3694 	string_values_offset += item_value->value.value_str.len;
3695 }
3696 
static void	dc_local_add_history_lld(zbx_uint64_t itemid, const zbx_timespec_t *ts, const char *value_orig)
{
	/* stores a low-level discovery value in the local history cache */
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->itemid = itemid;
	slot->ts = *ts;
	slot->state = ITEM_STATE_NORMAL;
	slot->flags = ZBX_DC_FLAG_LLD;

	/* the whole string is stored, including its terminating zero */
	slot->value.value_str.len = strlen(value_orig) + 1;
	dc_string_buffer_realloc(slot->value.value_str.len);

	slot->value.value_str.pvalue = string_values_offset;
	memcpy(string_values + string_values_offset, value_orig, slot->value.value_str.len);
	string_values_offset += slot->value.value_str.len;
}
3714 
static void	dc_local_add_history_empty(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		unsigned char flags)
{
	/* stores an entry that carries only a timestamp and flags (value type "none") */
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->itemid = itemid;
	slot->ts = *ts;
	slot->item_value_type = item_value_type;
	slot->value_type = ITEM_VALUE_TYPE_NONE;
	slot->state = ITEM_STATE_NORMAL;
	slot->flags = flags;
}
3729 
3730 /******************************************************************************
3731  *                                                                            *
3732  * Function: dc_add_history                                                   *
3733  *                                                                            *
3734  * Purpose: add new value to the cache                                        *
3735  *                                                                            *
3736  * Parameters:  itemid          - [IN] the itemid                             *
3737  *              item_value_type - [IN] the item value type                    *
3738  *              item_flags      - [IN] the item flags (e. g. lld rule)        *
3739  *              result          - [IN] agent result containing the value      *
3740  *                                to add                                      *
3741  *              ts              - [IN] the value timestamp                    *
3742  *              state           - [IN] the item state                         *
3743  *              error           - [IN] the error message in case item state   *
3744  *                                is ITEM_STATE_NOTSUPPORTED                  *
3745  *                                                                            *
3746  ******************************************************************************/
void	dc_add_history(zbx_uint64_t itemid, unsigned char item_value_type, unsigned char item_flags,
		AGENT_RESULT *result, const zbx_timespec_t *ts, unsigned char state, const char *error)
{
	unsigned char	value_flags = 0;

	/* not supported items: store the error, with meta information if the result has it */
	if (ITEM_STATE_NOTSUPPORTED == state)
	{
		zbx_uint64_t	lastlogsize = 0;
		int		mtime = 0;

		if (NULL != result && 0 != ISSET_META(result))
		{
			value_flags = ZBX_DC_FLAG_META;
			lastlogsize = result->lastlogsize;
			mtime = result->mtime;
		}

		dc_local_add_history_notsupported(itemid, ts, error, lastlogsize, mtime, value_flags);
		return;
	}

	/* allow proxy to send timestamps of empty (throttled etc) values to update nextchecks for queue */
	if (!ISSET_VALUE(result) && !ISSET_META(result) && 0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
		return;

	if (!ISSET_VALUE(result))
		value_flags |= ZBX_DC_FLAG_NOVALUE;

	if (ISSET_META(result))
		value_flags |= ZBX_DC_FLAG_META;

	/* Data is added to the local history cache when either:                        */
	/*   1) the NOVALUE flag is set (only meta information or a timestamp is kept)  */
	/*   2) the NOVALUE flag is not set and a value of a known kind is present      */

	if (0 != (value_flags & ZBX_DC_FLAG_NOVALUE))
	{
		if (0 != (value_flags & ZBX_DC_FLAG_META))
		{
			dc_local_add_history_log(itemid, item_value_type, ts, NULL, result->lastlogsize, result->mtime,
					value_flags);
		}
		else
			dc_local_add_history_empty(itemid, item_value_type, ts, value_flags);

		return;
	}

	if (0 != (ZBX_FLAG_DISCOVERY_RULE & item_flags))
	{
		if (NULL == GET_TEXT_RESULT(result))
			return;

		/* proxy stores low-level discovery (lld) values in db */
		if (0 == (ZBX_PROGRAM_TYPE_SERVER & program_type))
			dc_local_add_history_lld(itemid, ts, result->text);

		return;
	}

	/* pick the storing routine by the kind of value set in the result, log first */
	if (ISSET_LOG(result))
	{
		dc_local_add_history_log(itemid, item_value_type, ts, result->log, result->lastlogsize,
				result->mtime, value_flags);
	}
	else if (ISSET_UI64(result))
	{
		dc_local_add_history_uint(itemid, item_value_type, ts, result->ui64, result->lastlogsize,
				result->mtime, value_flags);
	}
	else if (ISSET_DBL(result))
	{
		dc_local_add_history_dbl(itemid, item_value_type, ts, result->dbl, result->lastlogsize,
				result->mtime, value_flags);
	}
	else if (ISSET_STR(result))
	{
		dc_local_add_history_text(itemid, item_value_type, ts, result->str, result->lastlogsize,
				result->mtime, value_flags);
	}
	else if (ISSET_TEXT(result))
	{
		dc_local_add_history_text(itemid, item_value_type, ts, result->text, result->lastlogsize,
				result->mtime, value_flags);
	}
	else
	{
		THIS_SHOULD_NEVER_HAPPEN;
	}
}
3844 
dc_flush_history(void)3845 void	dc_flush_history(void)
3846 {
3847 	if (0 == item_values_num)
3848 		return;
3849 
3850 	LOCK_CACHE;
3851 
3852 	hc_add_item_values(item_values, item_values_num);
3853 
3854 	cache->history_num += item_values_num;
3855 
3856 	UNLOCK_CACHE;
3857 
3858 	item_values_num = 0;
3859 	string_values_offset = 0;
3860 }
3861 
3862 /******************************************************************************
3863  *                                                                            *
3864  * history cache storage                                                      *
3865  *                                                                            *
3866  ******************************************************************************/
/* instantiate the allocator helpers referenced in this file as __hc_index_mem_*_func() and */
/* __hc_mem_*_func() (malloc, realloc, free); presumably they allocate from hc_index_mem    */
/* and hc_mem respectively - confirm against the ZBX_MEM_FUNC_IMPL definition               */
ZBX_MEM_FUNC_IMPL(__hc_index, hc_index_mem)
ZBX_MEM_FUNC_IMPL(__hc, hc_mem)
3869 
3870 /******************************************************************************
3871  *                                                                            *
3872  * Function: hc_queue_elem_compare_func                                       *
3873  *                                                                            *
3874  * Purpose: compares history queue elements                                   *
3875  *                                                                            *
3876  ******************************************************************************/
3877 static int	hc_queue_elem_compare_func(const void *d1, const void *d2)
3878 {
3879 	const zbx_binary_heap_elem_t	*e1 = (const zbx_binary_heap_elem_t *)d1;
3880 	const zbx_binary_heap_elem_t	*e2 = (const zbx_binary_heap_elem_t *)d2;
3881 
3882 	const zbx_hc_item_t	*item1 = (const zbx_hc_item_t *)e1->data;
3883 	const zbx_hc_item_t	*item2 = (const zbx_hc_item_t *)e2->data;
3884 
3885 	/* compare by timestamp of the oldest value */
3886 	return zbx_timespec_compare(&item1->tail->ts, &item2->tail->ts);
3887 }
3888 
3889 /******************************************************************************
3890  *                                                                            *
3891  * Function: hc_free_data                                                     *
3892  *                                                                            *
3893  * Purpose: free history item data allocated in history cache                 *
3894  *                                                                            *
3895  * Parameters: data - [IN] history item data                                  *
3896  *                                                                            *
3897  ******************************************************************************/
hc_free_data(zbx_hc_data_t * data)3898 static void	hc_free_data(zbx_hc_data_t *data)
3899 {
3900 	if (ITEM_STATE_NOTSUPPORTED == data->state)
3901 	{
3902 		__hc_mem_free_func(data->value.str);
3903 	}
3904 	else
3905 	{
3906 		if (0 == (data->flags & ZBX_DC_FLAG_NOVALUE))
3907 		{
3908 			switch (data->value_type)
3909 			{
3910 				case ITEM_VALUE_TYPE_STR:
3911 				case ITEM_VALUE_TYPE_TEXT:
3912 					__hc_mem_free_func(data->value.str);
3913 					break;
3914 				case ITEM_VALUE_TYPE_LOG:
3915 					__hc_mem_free_func(data->value.log->value);
3916 
3917 					if (NULL != data->value.log->source)
3918 						__hc_mem_free_func(data->value.log->source);
3919 
3920 					__hc_mem_free_func(data->value.log);
3921 					break;
3922 			}
3923 		}
3924 	}
3925 
3926 	__hc_mem_free_func(data);
3927 }
3928 
3929 /******************************************************************************
3930  *                                                                            *
3931  * Function: hc_queue_item                                                    *
3932  *                                                                            *
3933  * Purpose: put back item into history queue                                  *
3934  *                                                                            *
3935  * Parameters: data - [IN] history item data                                  *
3936  *                                                                            *
3937  ******************************************************************************/
hc_queue_item(zbx_hc_item_t * item)3938 static void	hc_queue_item(zbx_hc_item_t *item)
3939 {
3940 	zbx_binary_heap_elem_t	elem = {item->itemid, (const void *)item};
3941 
3942 	zbx_binary_heap_insert(&cache->history_queue, &elem);
3943 }
3944 
3945 /******************************************************************************
3946  *                                                                            *
3947  * Function: hc_get_item                                                      *
3948  *                                                                            *
3949  * Purpose: returns history item by itemid                                    *
3950  *                                                                            *
3951  * Parameters: itemid - [IN] the item id                                      *
3952  *                                                                            *
3953  * Return value: the history item or NULL if the requested item is not in     *
3954  *               history cache                                                *
3955  *                                                                            *
3956  ******************************************************************************/
hc_get_item(zbx_uint64_t itemid)3957 static zbx_hc_item_t	*hc_get_item(zbx_uint64_t itemid)
3958 {
3959 	return (zbx_hc_item_t *)zbx_hashset_search(&cache->history_items, &itemid);
3960 }
3961 
3962 /******************************************************************************
3963  *                                                                            *
3964  * Function: hc_add_item                                                      *
3965  *                                                                            *
3966  * Purpose: adds a new item to history cache                                  *
3967  *                                                                            *
3968  * Parameters: itemid - [IN] the item id                                      *
3969  *                      [IN] the item data                                    *
3970  *                                                                            *
3971  * Return value: the added history item                                       *
3972  *                                                                            *
3973  ******************************************************************************/
hc_add_item(zbx_uint64_t itemid,zbx_hc_data_t * data)3974 static zbx_hc_item_t	*hc_add_item(zbx_uint64_t itemid, zbx_hc_data_t *data)
3975 {
3976 	zbx_hc_item_t	item_local = {itemid, ZBX_HC_ITEM_STATUS_NORMAL, 0, data, data};
3977 
3978 	return (zbx_hc_item_t *)zbx_hashset_insert(&cache->history_items, &item_local, sizeof(item_local));
3979 }
3980 
3981 /******************************************************************************
3982  *                                                                            *
3983  * Function: hc_mem_value_str_dup                                             *
3984  *                                                                            *
3985  * Purpose: copies string value to history cache                              *
3986  *                                                                            *
3987  * Parameters: str - [IN] the string value                                    *
3988  *                                                                            *
3989  * Return value: the copied string or NULL if there was not enough memory     *
3990  *                                                                            *
3991  ******************************************************************************/
hc_mem_value_str_dup(const dc_value_str_t * str)3992 static char	*hc_mem_value_str_dup(const dc_value_str_t *str)
3993 {
3994 	char	*ptr;
3995 
3996 	if (NULL == (ptr = (char *)__hc_mem_malloc_func(NULL, str->len)))
3997 		return NULL;
3998 
3999 	memcpy(ptr, &string_values[str->pvalue], str->len - 1);
4000 	ptr[str->len - 1] = '\0';
4001 
4002 	return ptr;
4003 }
4004 
4005 /******************************************************************************
4006  *                                                                            *
4007  * Function: hc_clone_history_str_data                                        *
4008  *                                                                            *
4009  * Purpose: clones string value into history data memory                      *
4010  *                                                                            *
4011  * Parameters: dst - [IN/OUT] a reference to the cloned value                 *
4012  *             str - [IN] the string value to clone                           *
4013  *                                                                            *
4014  * Return value: SUCCESS - either there was no need to clone the string       *
4015  *                         (it was empty or already cloned) or the string was *
4016  *                          cloned successfully                               *
4017  *               FAIL    - not enough memory                                  *
4018  *                                                                            *
4019  * Comments: This function can be called in loop with the same dst value      *
4020  *           until it finishes cloning string value.                          *
4021  *                                                                            *
4022  ******************************************************************************/
hc_clone_history_str_data(char ** dst,const dc_value_str_t * str)4023 static int	hc_clone_history_str_data(char **dst, const dc_value_str_t *str)
4024 {
4025 	if (0 == str->len)
4026 		return SUCCEED;
4027 
4028 	if (NULL != *dst)
4029 		return SUCCEED;
4030 
4031 	if (NULL != (*dst = hc_mem_value_str_dup(str)))
4032 		return SUCCEED;
4033 
4034 	return FAIL;
4035 }
4036 
4037 /******************************************************************************
4038  *                                                                            *
4039  * Function: hc_clone_history_log_data                                        *
4040  *                                                                            *
4041  * Purpose: clones log value into history data memory                         *
4042  *                                                                            *
4043  * Parameters: dst        - [IN/OUT] a reference to the cloned value          *
4044  *             item_value - [IN] the log value to clone                       *
4045  *                                                                            *
4046  * Return value: SUCCESS - the log value was cloned successfully              *
4047  *               FAIL    - not enough memory                                  *
4048  *                                                                            *
4049  * Comments: This function can be called in loop with the same dst value      *
4050  *           until it finishes cloning log value.                             *
4051  *                                                                            *
4052  ******************************************************************************/
hc_clone_history_log_data(zbx_log_value_t ** dst,const dc_item_value_t * item_value)4053 static int	hc_clone_history_log_data(zbx_log_value_t **dst, const dc_item_value_t *item_value)
4054 {
4055 	if (NULL == *dst)
4056 	{
4057 		/* using realloc instead of malloc just to suppress 'not used' warning for realloc */
4058 		if (NULL == (*dst = (zbx_log_value_t *)__hc_mem_realloc_func(NULL, sizeof(zbx_log_value_t))))
4059 			return FAIL;
4060 
4061 		memset(*dst, 0, sizeof(zbx_log_value_t));
4062 	}
4063 
4064 	if (SUCCEED != hc_clone_history_str_data(&(*dst)->value, &item_value->value.value_str))
4065 		return FAIL;
4066 
4067 	if (SUCCEED != hc_clone_history_str_data(&(*dst)->source, &item_value->source))
4068 		return FAIL;
4069 
4070 	(*dst)->logeventid = item_value->logeventid;
4071 	(*dst)->severity = item_value->severity;
4072 	(*dst)->timestamp = item_value->timestamp;
4073 
4074 	return SUCCEED;
4075 }
4076 
4077 /******************************************************************************
4078  *                                                                            *
4079  * Function: hc_clone_history_data                                            *
4080  *                                                                            *
4081  * Purpose: clones item value from local cache into history cache             *
4082  *                                                                            *
4083  * Parameters: data       - [IN/OUT] a reference to the cloned value          *
4084  *             item_value - [IN] the item value                               *
4085  *                                                                            *
4086  * Return value: SUCCESS - the item value was cloned successfully             *
4087  *               FAIL    - not enough memory                                  *
4088  *                                                                            *
4089  * Comments: This function can be called in loop with the same data value     *
4090  *           until it finishes cloning item value.                            *
4091  *                                                                            *
4092  ******************************************************************************/
hc_clone_history_data(zbx_hc_data_t ** data,const dc_item_value_t * item_value)4093 static int	hc_clone_history_data(zbx_hc_data_t **data, const dc_item_value_t *item_value)
4094 {
4095 	if (NULL == *data)
4096 	{
4097 		if (NULL == (*data = (zbx_hc_data_t *)__hc_mem_malloc_func(NULL, sizeof(zbx_hc_data_t))))
4098 			return FAIL;
4099 
4100 		memset(*data, 0, sizeof(zbx_hc_data_t));
4101 
4102 		(*data)->state = item_value->state;
4103 		(*data)->ts = item_value->ts;
4104 		(*data)->flags = item_value->flags;
4105 	}
4106 
4107 	if (0 != (ZBX_DC_FLAG_META & item_value->flags))
4108 	{
4109 		(*data)->lastlogsize = item_value->lastlogsize;
4110 		(*data)->mtime = item_value->mtime;
4111 	}
4112 
4113 	if (ITEM_STATE_NOTSUPPORTED == item_value->state)
4114 	{
4115 		if (NULL == ((*data)->value.str = hc_mem_value_str_dup(&item_value->value.value_str)))
4116 			return FAIL;
4117 
4118 		(*data)->value_type = item_value->value_type;
4119 		cache->stats.notsupported_counter++;
4120 
4121 		return SUCCEED;
4122 	}
4123 
4124 	if (0 != (ZBX_DC_FLAG_LLD & item_value->flags))
4125 	{
4126 		if (NULL == ((*data)->value.str = hc_mem_value_str_dup(&item_value->value.value_str)))
4127 			return FAIL;
4128 
4129 		(*data)->value_type = ITEM_VALUE_TYPE_TEXT;
4130 
4131 		cache->stats.history_text_counter++;
4132 		cache->stats.history_counter++;
4133 
4134 		return SUCCEED;
4135 	}
4136 
4137 	if (0 == (ZBX_DC_FLAG_NOVALUE & item_value->flags))
4138 	{
4139 		switch (item_value->value_type)
4140 		{
4141 			case ITEM_VALUE_TYPE_FLOAT:
4142 				(*data)->value.dbl = item_value->value.value_dbl;
4143 				break;
4144 			case ITEM_VALUE_TYPE_UINT64:
4145 				(*data)->value.ui64 = item_value->value.value_uint;
4146 				break;
4147 			case ITEM_VALUE_TYPE_STR:
4148 				if (SUCCEED != hc_clone_history_str_data(&(*data)->value.str,
4149 						&item_value->value.value_str))
4150 				{
4151 					return FAIL;
4152 				}
4153 				break;
4154 			case ITEM_VALUE_TYPE_TEXT:
4155 				if (SUCCEED != hc_clone_history_str_data(&(*data)->value.str,
4156 						&item_value->value.value_str))
4157 				{
4158 					return FAIL;
4159 				}
4160 				break;
4161 			case ITEM_VALUE_TYPE_LOG:
4162 				if (SUCCEED != hc_clone_history_log_data(&(*data)->value.log, item_value))
4163 					return FAIL;
4164 				break;
4165 		}
4166 
4167 		switch (item_value->item_value_type)
4168 		{
4169 			case ITEM_VALUE_TYPE_FLOAT:
4170 				cache->stats.history_float_counter++;
4171 				break;
4172 			case ITEM_VALUE_TYPE_UINT64:
4173 				cache->stats.history_uint_counter++;
4174 				break;
4175 			case ITEM_VALUE_TYPE_STR:
4176 				cache->stats.history_str_counter++;
4177 				break;
4178 			case ITEM_VALUE_TYPE_TEXT:
4179 				cache->stats.history_text_counter++;
4180 				break;
4181 			case ITEM_VALUE_TYPE_LOG:
4182 				cache->stats.history_log_counter++;
4183 				break;
4184 		}
4185 
4186 		cache->stats.history_counter++;
4187 	}
4188 
4189 	(*data)->value_type = item_value->value_type;
4190 
4191 	return SUCCEED;
4192 }
4193 
4194 /******************************************************************************
4195  *                                                                            *
4196  * Function: hc_add_item_values                                               *
4197  *                                                                            *
4198  * Purpose: adds item values to the history cache                             *
4199  *                                                                            *
4200  * Parameters: values     - [IN] the item values to add                       *
4201  *             values_num - [IN] the number of item values to add             *
4202  *                                                                            *
4203  * Comments: If the history cache is full this function will wait until       *
4204  *           history syncers processes values freeing enough space to store   *
4205  *           the new value.                                                   *
4206  *                                                                            *
4207  ******************************************************************************/
hc_add_item_values(dc_item_value_t * values,int values_num)4208 static void	hc_add_item_values(dc_item_value_t *values, int values_num)
4209 {
4210 	dc_item_value_t	*item_value;
4211 	int		i;
4212 	zbx_hc_item_t	*item;
4213 
4214 	for (i = 0; i < values_num; i++)
4215 	{
4216 		zbx_hc_data_t	*data = NULL;
4217 
4218 		item_value = &values[i];
4219 
4220 		/* a record with metadata and no value can be dropped if  */
4221 		/* the metadata update is copied to the last queued value */
4222 		if (NULL != (item = hc_get_item(item_value->itemid)) &&
4223 				0 != (item_value->flags & ZBX_DC_FLAG_NOVALUE) &&
4224 				0 != (item_value->flags & ZBX_DC_FLAG_META))
4225 		{
4226 			/* skip metadata updates when only one value is queued, */
4227 			/* because the item might be already being processed    */
4228 			if (item->head != item->tail)
4229 			{
4230 				item->head->lastlogsize = item_value->lastlogsize;
4231 				item->head->mtime = item_value->mtime;
4232 				item->head->flags |= ZBX_DC_FLAG_META;
4233 				continue;
4234 			}
4235 		}
4236 
4237 		if (SUCCEED != hc_clone_history_data(&data, item_value))
4238 		{
4239 			do
4240 			{
4241 				UNLOCK_CACHE;
4242 
4243 				zabbix_log(LOG_LEVEL_DEBUG, "History cache is full. Sleeping for 1 second.");
4244 				sleep(1);
4245 
4246 				LOCK_CACHE;
4247 			}
4248 			while (SUCCEED != hc_clone_history_data(&data, item_value));
4249 
4250 			item = hc_get_item(item_value->itemid);
4251 		}
4252 
4253 		if (NULL == item)
4254 		{
4255 			item = hc_add_item(item_value->itemid, data);
4256 			hc_queue_item(item);
4257 		}
4258 		else
4259 		{
4260 			item->head->next = data;
4261 			item->head = data;
4262 		}
4263 		item->values_num++;
4264 	}
4265 }
4266 
4267 /******************************************************************************
4268  *                                                                            *
4269  * Function: hc_copy_history_data                                             *
4270  *                                                                            *
4271  * Purpose: copies item value from history cache into the specified history   *
4272  *          value                                                             *
4273  *                                                                            *
4274  * Parameters: history - [OUT] the history value                              *
4275  *             itemid  - [IN] the item identifier                             *
4276  *             data    - [IN] the history data to copy                        *
4277  *                                                                            *
4278  * Comments: handling of uninitialized fields in dc_add_proxy_history_log()   *
4279  *                                                                            *
4280  ******************************************************************************/
hc_copy_history_data(ZBX_DC_HISTORY * history,zbx_uint64_t itemid,zbx_hc_data_t * data)4281 static void	hc_copy_history_data(ZBX_DC_HISTORY *history, zbx_uint64_t itemid, zbx_hc_data_t *data)
4282 {
4283 	history->itemid = itemid;
4284 	history->ts = data->ts;
4285 	history->state = data->state;
4286 	history->flags = data->flags;
4287 	history->lastlogsize = data->lastlogsize;
4288 	history->mtime = data->mtime;
4289 
4290 	if (ITEM_STATE_NOTSUPPORTED == data->state)
4291 	{
4292 		history->value.err = zbx_strdup(NULL, data->value.str);
4293 		history->flags |= ZBX_DC_FLAG_UNDEF;
4294 		return;
4295 	}
4296 
4297 	history->value_type = data->value_type;
4298 
4299 	if (0 == (ZBX_DC_FLAG_NOVALUE & data->flags))
4300 	{
4301 		switch (data->value_type)
4302 		{
4303 			case ITEM_VALUE_TYPE_FLOAT:
4304 				history->value.dbl = data->value.dbl;
4305 				break;
4306 			case ITEM_VALUE_TYPE_UINT64:
4307 				history->value.ui64 = data->value.ui64;
4308 				break;
4309 			case ITEM_VALUE_TYPE_STR:
4310 			case ITEM_VALUE_TYPE_TEXT:
4311 				history->value.str = zbx_strdup(NULL, data->value.str);
4312 				break;
4313 			case ITEM_VALUE_TYPE_LOG:
4314 				history->value.log = (zbx_log_value_t *)zbx_malloc(NULL, sizeof(zbx_log_value_t));
4315 				history->value.log->value = zbx_strdup(NULL, data->value.log->value);
4316 
4317 				if (NULL != data->value.log->source)
4318 					history->value.log->source = zbx_strdup(NULL, data->value.log->source);
4319 				else
4320 					history->value.log->source = NULL;
4321 
4322 				history->value.log->timestamp = data->value.log->timestamp;
4323 				history->value.log->severity = data->value.log->severity;
4324 				history->value.log->logeventid = data->value.log->logeventid;
4325 
4326 				break;
4327 		}
4328 	}
4329 }
4330 
4331 /******************************************************************************
4332  *                                                                            *
4333  * Function: hc_pop_items                                                     *
4334  *                                                                            *
4335  * Purpose: pops the next batch of history items from cache for processing    *
4336  *                                                                            *
4337  * Parameters: history_items - [OUT] the locked history items                 *
4338  *                                                                            *
4339  * Comments: The history_items must be returned back to history cache with    *
4340  *           hc_push_items() function after they have been processed.         *
4341  *                                                                            *
4342  ******************************************************************************/
hc_pop_items(zbx_vector_ptr_t * history_items)4343 static void	hc_pop_items(zbx_vector_ptr_t *history_items)
4344 {
4345 	zbx_binary_heap_elem_t	*elem;
4346 	zbx_hc_item_t		*item;
4347 
4348 	while (ZBX_HC_SYNC_MAX > history_items->values_num && FAIL == zbx_binary_heap_empty(&cache->history_queue))
4349 	{
4350 		elem = zbx_binary_heap_find_min(&cache->history_queue);
4351 		item = (zbx_hc_item_t *)elem->data;
4352 		zbx_vector_ptr_append(history_items, item);
4353 
4354 		zbx_binary_heap_remove_min(&cache->history_queue);
4355 	}
4356 }
4357 
4358 /******************************************************************************
4359  *                                                                            *
4360  * Function: hc_get_item_values                                               *
4361  *                                                                            *
4362  * Purpose: gets item history values                                          *
4363  *                                                                            *
4364  * Parameters: history       - [OUT] the history valeus                       *
4365  *             history_items - [IN] the history items                         *
4366  *                                                                            *
4367  ******************************************************************************/
hc_get_item_values(ZBX_DC_HISTORY * history,zbx_vector_ptr_t * history_items)4368 static void	hc_get_item_values(ZBX_DC_HISTORY *history, zbx_vector_ptr_t *history_items)
4369 {
4370 	int		i, history_num = 0;
4371 	zbx_hc_item_t	*item;
4372 
4373 	/* we don't need to lock history cache because no other processes can  */
4374 	/* change item's history data until it is pushed back to history queue */
4375 	for (i = 0; i < history_items->values_num; i++)
4376 	{
4377 		item = (zbx_hc_item_t *)history_items->values[i];
4378 
4379 		if (ZBX_HC_ITEM_STATUS_BUSY == item->status)
4380 			continue;
4381 
4382 		hc_copy_history_data(&history[history_num++], item->itemid, item->tail);
4383 	}
4384 }
4385 
4386 /******************************************************************************
4387  *                                                                            *
4388  * Function: hc_push_processed_items                                          *
4389  *                                                                            *
4390  * Purpose: push back the processed history items into history cache          *
4391  *                                                                            *
4392  * Parameters: history_items - [IN] the history items containing processed    *
4393  *                                  (available) and busy items                *
4394  *                                                                            *
4395  * Comments: This function removes processed value from history cache.        *
4396  *           If there is no more data for this item, then the item itself is  *
4397  *           removed from history index.                                      *
4398  *                                                                            *
4399  ******************************************************************************/
hc_push_items(zbx_vector_ptr_t * history_items)4400 void	hc_push_items(zbx_vector_ptr_t *history_items)
4401 {
4402 	int		i;
4403 	zbx_hc_item_t	*item;
4404 	zbx_hc_data_t	*data_free;
4405 
4406 	for (i = 0; i < history_items->values_num; i++)
4407 	{
4408 		item = (zbx_hc_item_t *)history_items->values[i];
4409 
4410 		switch (item->status)
4411 		{
4412 			case ZBX_HC_ITEM_STATUS_BUSY:
4413 				/* reset item status before returning it to queue */
4414 				item->status = ZBX_HC_ITEM_STATUS_NORMAL;
4415 				hc_queue_item(item);
4416 				break;
4417 			case ZBX_HC_ITEM_STATUS_NORMAL:
4418 				item->values_num--;
4419 				data_free = item->tail;
4420 				item->tail = item->tail->next;
4421 				hc_free_data(data_free);
4422 				if (NULL == item->tail)
4423 					zbx_hashset_remove(&cache->history_items, item);
4424 				else
4425 					hc_queue_item(item);
4426 				break;
4427 		}
4428 	}
4429 }
4430 
4431 /******************************************************************************
4432  *                                                                            *
4433  * Function: hc_queue_get_size                                                *
4434  *                                                                            *
4435  * Purpose: retrieve the size of history queue                                *
4436  *                                                                            *
4437  ******************************************************************************/
hc_queue_get_size(void)4438 int	hc_queue_get_size(void)
4439 {
4440 	return cache->history_queue.elems_num;
4441 }
4442 
/******************************************************************************
 *                                                                            *
 * Function: hc_get_history_compression_age                                   *
 *                                                                            *
 * Purpose: gets the time boundary derived from history compression settings  *
 *                                                                            *
 * Return value: current time minus the history_compress_older setting if     *
 *               history compression is on and the setting is not zero,       *
 *               0 otherwise (always 0 without HAVE_POSTGRESQL)               *
 *                                                                            *
 ******************************************************************************/
int	hc_get_history_compression_age(void)
{
	int	compression_age = 0;

#if defined(HAVE_POSTGRESQL)
	zbx_config_t	cfg;

	zbx_config_get(&cfg, ZBX_CONFIG_FLAGS_DB_EXTENSION);

	if (ON == cfg.db.history_compression_status && 0 != cfg.db.history_compress_older)
		compression_age = (int)time(NULL) - cfg.db.history_compress_older;

	zbx_config_clean(&cfg);
#endif

	return compression_age;
}
4463 
4464 /******************************************************************************
4465  *                                                                            *
4466  * Function: init_trend_cache                                                 *
4467  *                                                                            *
4468  * Purpose: Allocate shared memory for trend cache (part of database cache)   *
4469  *                                                                            *
4470  * Author: Vladimir Levijev                                                   *
4471  *                                                                            *
4472  * Comments: Is optionally called from init_database_cache()                  *
4473  *                                                                            *
4474  ******************************************************************************/
4475 
ZBX_MEM_FUNC_IMPL(__trend,trend_mem)4476 ZBX_MEM_FUNC_IMPL(__trend, trend_mem)
4477 
4478 static int	init_trend_cache(char **error)
4479 {
4480 	size_t	sz;
4481 	int	ret;
4482 
4483 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
4484 
4485 	if (SUCCEED != (ret = zbx_mutex_create(&trends_lock, ZBX_MUTEX_TRENDS, error)))
4486 		goto out;
4487 
4488 	sz = zbx_mem_required_size(1, "trend cache", "TrendCacheSize");
4489 	if (SUCCEED != (ret = zbx_mem_create(&trend_mem, CONFIG_TRENDS_CACHE_SIZE, "trend cache", "TrendCacheSize", 0,
4490 			error)))
4491 	{
4492 		goto out;
4493 	}
4494 
4495 	CONFIG_TRENDS_CACHE_SIZE -= sz;
4496 
4497 	cache->trends_num = 0;
4498 	cache->trends_last_cleanup_hour = 0;
4499 
4500 #define INIT_HASHSET_SIZE	100	/* Should be calculated dynamically based on trends size? */
4501 					/* Still does not make sense to have it more than initial */
4502 					/* item hashset size in configuration cache.              */
4503 
4504 	zbx_hashset_create_ext(&cache->trends, INIT_HASHSET_SIZE,
4505 			ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL,
4506 			__trend_mem_malloc_func, __trend_mem_realloc_func, __trend_mem_free_func);
4507 
4508 #undef INIT_HASHSET_SIZE
4509 out:
4510 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
4511 
4512 	return ret;
4513 }
4514 
4515 /******************************************************************************
4516  *                                                                            *
4517  * Function: init_database_cache                                              *
4518  *                                                                            *
4519  * Purpose: Allocate shared memory for database cache                         *
4520  *                                                                            *
4521  * Author: Alexei Vladishev, Alexander Vladishev                              *
4522  *                                                                            *
4523  ******************************************************************************/
init_database_cache(char ** error)4524 int	init_database_cache(char **error)
4525 {
4526 	int	ret;
4527 
4528 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
4529 
4530 	if (SUCCEED != (ret = zbx_mutex_create(&cache_lock, ZBX_MUTEX_CACHE, error)))
4531 		goto out;
4532 
4533 	if (SUCCEED != (ret = zbx_mutex_create(&cache_ids_lock, ZBX_MUTEX_CACHE_IDS, error)))
4534 		goto out;
4535 
4536 	if (SUCCEED != (ret = zbx_mem_create(&hc_mem, CONFIG_HISTORY_CACHE_SIZE, "history cache",
4537 			"HistoryCacheSize", 1, error)))
4538 	{
4539 		goto out;
4540 	}
4541 
4542 	if (SUCCEED != (ret = zbx_mem_create(&hc_index_mem, CONFIG_HISTORY_INDEX_CACHE_SIZE, "history index cache",
4543 			"HistoryIndexCacheSize", 0, error)))
4544 	{
4545 		goto out;
4546 	}
4547 
4548 	cache = (ZBX_DC_CACHE *)__hc_index_mem_malloc_func(NULL, sizeof(ZBX_DC_CACHE));
4549 	memset(cache, 0, sizeof(ZBX_DC_CACHE));
4550 
4551 	ids = (ZBX_DC_IDS *)__hc_index_mem_malloc_func(NULL, sizeof(ZBX_DC_IDS));
4552 	memset(ids, 0, sizeof(ZBX_DC_IDS));
4553 
4554 	zbx_hashset_create_ext(&cache->history_items, ZBX_HC_ITEMS_INIT_SIZE,
4555 			ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL,
4556 			__hc_index_mem_malloc_func, __hc_index_mem_realloc_func, __hc_index_mem_free_func);
4557 
4558 	zbx_binary_heap_create_ext(&cache->history_queue, hc_queue_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY,
4559 			__hc_index_mem_malloc_func, __hc_index_mem_realloc_func, __hc_index_mem_free_func);
4560 
4561 	if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
4562 	{
4563 		zbx_hashset_create_ext(&(cache->proxyqueue.index), ZBX_HC_SYNC_MAX,
4564 			ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL,
4565 			__hc_index_mem_malloc_func, __hc_index_mem_realloc_func, __hc_index_mem_free_func);
4566 
4567 		zbx_list_create_ext(&(cache->proxyqueue.list), __hc_index_mem_malloc_func, __hc_index_mem_free_func);
4568 
4569 		cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_NORMAL;
4570 
4571 		if (SUCCEED != (ret = init_trend_cache(error)))
4572 			goto out;
4573 	}
4574 
4575 	cache->history_num_total = 0;
4576 	cache->history_progress_ts = 0;
4577 
4578 	cache->db_trigger_queue_lock = 1;
4579 
4580 	if (NULL == sql)
4581 		sql = (char *)zbx_malloc(sql, sql_alloc);
4582 out:
4583 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
4584 
4585 	return ret;
4586 }
4587 
4588 /******************************************************************************
4589  *                                                                            *
4590  * Function: DCsync_all                                                       *
4591  *                                                                            *
4592  * Purpose: writes updates and new data from pool and cache data to database  *
4593  *                                                                            *
4594  * Author: Alexei Vladishev                                                   *
4595  *                                                                            *
4596  ******************************************************************************/
DCsync_all(void)4597 static void	DCsync_all(void)
4598 {
4599 	zabbix_log(LOG_LEVEL_DEBUG, "In DCsync_all()");
4600 
4601 	sync_history_cache_full();
4602 	if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
4603 		DCsync_trends();
4604 
4605 	zabbix_log(LOG_LEVEL_DEBUG, "End of DCsync_all()");
4606 }
4607 
4608 /******************************************************************************
4609  *                                                                            *
4610  * Function: free_database_cache                                              *
4611  *                                                                            *
4612  * Purpose: Free memory allocated for database cache                          *
4613  *                                                                            *
4614  * Author: Alexei Vladishev, Alexander Vladishev                              *
4615  *                                                                            *
4616  ******************************************************************************/
free_database_cache(void)4617 void	free_database_cache(void)
4618 {
4619 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
4620 
4621 	DCsync_all();
4622 
4623 	cache = NULL;
4624 
4625 	zbx_mutex_destroy(&cache_lock);
4626 	zbx_mutex_destroy(&cache_ids_lock);
4627 
4628 	if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
4629 		zbx_mutex_destroy(&trends_lock);
4630 
4631 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
4632 }
4633 
4634 /******************************************************************************
4635  *                                                                            *
4636  * Function: DCget_nextid                                                     *
4637  *                                                                            *
4638  * Purpose: Return next id for requested table                                *
4639  *                                                                            *
4640  * Author: Alexander Vladishev                                                *
4641  *                                                                            *
4642  ******************************************************************************/
DCget_nextid(const char * table_name,int num)4643 zbx_uint64_t	DCget_nextid(const char *table_name, int num)
4644 {
4645 	int		i;
4646 	DB_RESULT	result;
4647 	DB_ROW		row;
4648 	const ZBX_TABLE	*table;
4649 	ZBX_DC_ID	*id;
4650 	zbx_uint64_t	min = 0, max = ZBX_DB_MAX_ID, nextid, lastid;
4651 
4652 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() table:'%s' num:%d", __func__, table_name, num);
4653 
4654 	LOCK_CACHE_IDS;
4655 
4656 	for (i = 0; i < ZBX_IDS_SIZE; i++)
4657 	{
4658 		id = &ids->id[i];
4659 		if ('\0' == *id->table_name)
4660 			break;
4661 
4662 		if (0 == strcmp(id->table_name, table_name))
4663 		{
4664 			nextid = id->lastid + 1;
4665 			id->lastid += num;
4666 			lastid = id->lastid;
4667 
4668 			UNLOCK_CACHE_IDS;
4669 
4670 			zabbix_log(LOG_LEVEL_DEBUG, "End of %s() table:'%s' [" ZBX_FS_UI64 ":" ZBX_FS_UI64 "]",
4671 					__func__, table_name, nextid, lastid);
4672 
4673 			return nextid;
4674 		}
4675 	}
4676 
4677 	if (i == ZBX_IDS_SIZE)
4678 	{
4679 		zabbix_log(LOG_LEVEL_ERR, "insufficient shared memory for ids");
4680 		exit(EXIT_FAILURE);
4681 	}
4682 
4683 	table = DBget_table(table_name);
4684 
4685 	result = DBselect("select max(%s) from %s where %s between " ZBX_FS_UI64 " and " ZBX_FS_UI64,
4686 			table->recid, table_name, table->recid, min, max);
4687 
4688 	if (NULL != result)
4689 	{
4690 		zbx_strlcpy(id->table_name, table_name, sizeof(id->table_name));
4691 
4692 		if (NULL == (row = DBfetch(result)) || SUCCEED == DBis_null(row[0]))
4693 			id->lastid = min;
4694 		else
4695 			ZBX_STR2UINT64(id->lastid, row[0]);
4696 
4697 		nextid = id->lastid + 1;
4698 		id->lastid += num;
4699 		lastid = id->lastid;
4700 	}
4701 	else
4702 		nextid = lastid = 0;
4703 
4704 	UNLOCK_CACHE_IDS;
4705 
4706 	DBfree_result(result);
4707 
4708 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() table:'%s' [" ZBX_FS_UI64 ":" ZBX_FS_UI64 "]",
4709 			__func__, table_name, nextid, lastid);
4710 
4711 	return nextid;
4712 }
4713 
4714 /******************************************************************************
4715  *                                                                            *
4716  * Function: DCupdate_interfaces_availability                                 *
4717  *                                                                            *
4718  * Purpose: performs interface availability reset for hosts with              *
4719  *          availability set on interfaces without enabled items              *
4720  *                                                                            *
4721  ******************************************************************************/
DCupdate_interfaces_availability(void)4722 void	DCupdate_interfaces_availability(void)
4723 {
4724 	zbx_vector_availability_ptr_t		interfaces;
4725 
4726 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
4727 
4728 	zbx_vector_availability_ptr_create(&interfaces);
4729 
4730 	if (SUCCEED != DCreset_interfaces_availability(&interfaces))
4731 		goto out;
4732 
4733 	zbx_availabilities_flush(&interfaces);
4734 out:
4735 	zbx_vector_availability_ptr_clear_ext(&interfaces, zbx_interface_availability_free);
4736 	zbx_vector_availability_ptr_destroy(&interfaces);
4737 
4738 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
4739 }
4740 
4741 /******************************************************************************
4742  *                                                                            *
4743  * Function: zbx_hc_get_diag_stats                                            *
4744  *                                                                            *
4745  * Purpose: get history cache diagnostics statistics                          *
4746  *                                                                            *
4747  ******************************************************************************/
zbx_hc_get_diag_stats(zbx_uint64_t * items_num,zbx_uint64_t * values_num)4748 void	zbx_hc_get_diag_stats(zbx_uint64_t *items_num, zbx_uint64_t *values_num)
4749 {
4750 	LOCK_CACHE;
4751 
4752 	*values_num = cache->history_num;
4753 	*items_num = cache->history_items.num_data;
4754 
4755 	UNLOCK_CACHE;
4756 }
4757 
4758 /******************************************************************************
4759  *                                                                            *
4760  * Function: zbx_hc_get_mem_stats                                             *
4761  *                                                                            *
4762  * Purpose: get shared memory allocator statistics                            *
4763  *                                                                            *
4764  ******************************************************************************/
zbx_hc_get_mem_stats(zbx_mem_stats_t * data,zbx_mem_stats_t * index)4765 void	zbx_hc_get_mem_stats(zbx_mem_stats_t *data, zbx_mem_stats_t *index)
4766 {
4767 	LOCK_CACHE;
4768 
4769 	if (NULL != data)
4770 		zbx_mem_get_stats(hc_mem, data);
4771 
4772 	if (NULL != index)
4773 		zbx_mem_get_stats(hc_index_mem, index);
4774 
4775 	UNLOCK_CACHE;
4776 }
4777 
4778 /******************************************************************************
4779  *                                                                            *
4780  * Function: zbx_hc_get_items                                                 *
4781  *                                                                            *
4782  * Purpose: get statistics of cached items                                    *
4783  *                                                                            *
4784  ******************************************************************************/
zbx_hc_get_items(zbx_vector_uint64_pair_t * items)4785 void	zbx_hc_get_items(zbx_vector_uint64_pair_t *items)
4786 {
4787 	zbx_hashset_iter_t	iter;
4788 	zbx_hc_item_t		*item;
4789 
4790 	LOCK_CACHE;
4791 
4792 	zbx_vector_uint64_pair_reserve(items, cache->history_items.num_data);
4793 
4794 	zbx_hashset_iter_reset(&cache->history_items, &iter);
4795 	while (NULL != (item = (zbx_hc_item_t *)zbx_hashset_iter_next(&iter)))
4796 	{
4797 		zbx_uint64_pair_t	pair = {item->itemid, item->values_num};
4798 		zbx_vector_uint64_pair_append_ptr(items, &pair);
4799 	}
4800 
4801 	UNLOCK_CACHE;
4802 }
4803 
4804 /******************************************************************************
4805  *                                                                            *
4806  * Function: zbx_db_trigger_queue_locked                                      *
4807  *                                                                            *
4808  * Purpose: checks if database trigger queue table is locked                  *
4809  *                                                                            *
4810  ******************************************************************************/
zbx_db_trigger_queue_locked(void)4811 int	zbx_db_trigger_queue_locked(void)
4812 {
4813 	return 0 == cache->db_trigger_queue_lock ? FAIL : SUCCEED;
4814 }
4815 
4816 /******************************************************************************
4817  *                                                                            *
4818  * Function: zbx_db_trigger_queue_unlock                                      *
4819  *                                                                            *
4820  * Purpose: unlocks database trigger queue table                              *
4821  *                                                                            *
4822  ******************************************************************************/
zbx_db_trigger_queue_unlock(void)4823 void	zbx_db_trigger_queue_unlock(void)
4824 {
4825 	cache->db_trigger_queue_lock = 0;
4826 }
4827 
4828 
4829 /******************************************************************************
4830  *                                                                            *
4831  * Function: zbx_hc_proxyqueue_peek                                           *
4832  *                                                                            *
4833  * Purpose: return first proxy in a queue, function assumes that a queue is   *
4834  *          not empty                                                         *
4835  *                                                                            *
4836  * Return value: proxyid at the top a queue                                   *
4837  *                                                                            *
4838  ******************************************************************************/
zbx_hc_proxyqueue_peek(void)4839 static zbx_uint64_t	zbx_hc_proxyqueue_peek(void)
4840 {
4841 	zbx_uint64_t	*p_val;
4842 
4843 	if (NULL == cache->proxyqueue.list.head)
4844 		return 0;
4845 
4846 	p_val = (zbx_uint64_t *)(cache->proxyqueue.list.head->data);
4847 
4848 	return *p_val;
4849 }
4850 
4851 /******************************************************************************
4852  *                                                                            *
4853  * Function: zbx_hc_proxyqueue_enqueue                                        *
4854  *                                                                            *
4855  * Purpose: add new proxyid to a queue                                        *
4856  *                                                                            *
4857  * Parameters: proxyid   - [IN] the proxy id                                  *
4858  *                                                                            *
4859  ******************************************************************************/
zbx_hc_proxyqueue_enqueue(zbx_uint64_t proxyid)4860 static void	zbx_hc_proxyqueue_enqueue(zbx_uint64_t proxyid)
4861 {
4862 	if (NULL == zbx_hashset_search(&cache->proxyqueue.index, &proxyid))
4863 	{
4864 		zbx_uint64_t *ptr;
4865 
4866 		ptr = zbx_hashset_insert(&cache->proxyqueue.index, &proxyid, sizeof(proxyid));
4867 		zbx_list_append(&cache->proxyqueue.list, ptr, NULL);
4868 	}
4869 }
4870 
4871 /******************************************************************************
4872  *                                                                            *
4873  * Function: zbx_hc_proxyqueue_dequeue                                        *
4874  *                                                                            *
4875  * Purpose: try to dequeue proxyid from a proxy queue                         *
4876  *                                                                            *
4877  * Parameters: chk_proxyid  - [IN] the proxyid                                *
4878  *                                                                            *
4879  * Return value: SUCCEED - retrieval successful                               *
4880  *               FAIL    - otherwise                                          *
4881  *                                                                            *
4882  ******************************************************************************/
zbx_hc_proxyqueue_dequeue(zbx_uint64_t proxyid)4883 static int	zbx_hc_proxyqueue_dequeue(zbx_uint64_t proxyid)
4884 {
4885 	zbx_uint64_t	top_val;
4886 	void		*rem_val = 0;
4887 
4888 	top_val = zbx_hc_proxyqueue_peek();
4889 
4890 	if (proxyid != top_val)
4891 		return FAIL;
4892 
4893 	if (FAIL == zbx_list_pop(&cache->proxyqueue.list, &rem_val))
4894 		return FAIL;
4895 
4896 	zbx_hashset_remove_direct(&cache->proxyqueue.index, rem_val);
4897 
4898 	return SUCCEED;
4899 }
4900 
4901 /******************************************************************************
4902  *                                                                            *
4903  * Function: zbx_hc_proxyqueue_clear                                          *
4904  *                                                                            *
4905  * Purpose: remove all proxies from proxy priority queue                      *
4906  *                                                                            *
4907  ******************************************************************************/
zbx_hc_proxyqueue_clear(void)4908 static void	zbx_hc_proxyqueue_clear(void)
4909 {
4910 	zbx_list_destroy(&cache->proxyqueue.list);
4911 	zbx_hashset_clear(&cache->proxyqueue.index);
4912 }
4913 
4914 /******************************************************************************
4915  *                                                                            *
4916  * Function: zbx_hc_check_proxy                                               *
4917  *                                                                            *
4918  * Purpose: check status of a history cache usage, enqueue/dequeue proxy      *
4919  *          from priority list and accordingly enable or disable wait mode    *
4920  *                                                                            *
4921  * Parameters: proxyid   - [IN] the proxyid                                   *
4922  *                                                                            *
4923  * Return value: SUCCEED - proxy can be processed now                         *
4924  *               FAIL    - proxy cannot be processed now, it got enqueued     *
4925  *                                                                            *
4926  ******************************************************************************/
zbx_hc_check_proxy(zbx_uint64_t proxyid)4927 int	zbx_hc_check_proxy(zbx_uint64_t proxyid)
4928 {
4929 	double	hc_pused;
4930 	int	ret;
4931 
4932 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() proxyid:"ZBX_FS_UI64, __func__, proxyid);
4933 
4934 	LOCK_CACHE;
4935 
4936 	hc_pused = 100 * (double)(hc_mem->total_size - hc_mem->free_size) / hc_mem->total_size;
4937 
4938 	if (20 >= hc_pused)
4939 	{
4940 		cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_NORMAL;
4941 
4942 		zbx_hc_proxyqueue_clear();
4943 
4944 		ret = SUCCEED;
4945 		goto out;
4946 	}
4947 
4948 	if (ZBX_HC_PROXYQUEUE_STATE_WAIT == cache->proxyqueue.state)
4949 	{
4950 		zbx_hc_proxyqueue_enqueue(proxyid);
4951 
4952 		if (60 < hc_pused)
4953 		{
4954 			ret = FAIL;
4955 			goto out;
4956 		}
4957 
4958 		cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_NORMAL;
4959 	}
4960 	else
4961 	{
4962 		if (80 <= hc_pused)
4963 		{
4964 			cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_WAIT;
4965 			zbx_hc_proxyqueue_enqueue(proxyid);
4966 
4967 			ret = FAIL;
4968 			goto out;
4969 		}
4970 	}
4971 
4972 	if (0 == zbx_hc_proxyqueue_peek())
4973 	{
4974 		ret = SUCCEED;
4975 		goto out;
4976 	}
4977 
4978 	ret = zbx_hc_proxyqueue_dequeue(proxyid);
4979 
4980 out:
4981 	UNLOCK_CACHE;
4982 
4983 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
4984 
4985 	return ret;
4986 }
4987