1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18 **/
19 
20 #include "common.h"
21 #include "log.h"
22 #include "threads.h"
23 
24 #include "db.h"
25 #include "dbcache.h"
26 #include "ipc.h"
27 #include "mutexs.h"
28 #include "zbxserver.h"
29 #include "proxy.h"
30 #include "events.h"
31 #include "memalloc.h"
32 #include "zbxalgo.h"
33 #include "valuecache.h"
34 #include "zbxmodules.h"
35 #include "module.h"
36 #include "export.h"
37 #include "zbxjson.h"
38 #include "zbxhistory.h"
39 #include "zbxalgo.h"
40 
/* memory areas whose free and total sizes are reported by DCget_stats_all() as the */
/* history index, history and trend cache statistics respectively                   */
static zbx_mem_info_t	*hc_index_mem = NULL;
static zbx_mem_info_t	*hc_mem = NULL;
static zbx_mem_info_t	*trend_mem = NULL;

/* shorthands for locking and unlocking the three mutexes declared below */
#define	LOCK_CACHE	zbx_mutex_lock(cache_lock)
#define	UNLOCK_CACHE	zbx_mutex_unlock(cache_lock)
#define	LOCK_TRENDS	zbx_mutex_lock(trends_lock)
#define	UNLOCK_TRENDS	zbx_mutex_unlock(trends_lock)
#define	LOCK_CACHE_IDS		zbx_mutex_lock(cache_ids_lock)
#define	UNLOCK_CACHE_IDS	zbx_mutex_unlock(cache_ids_lock)

/* mutexes used through the LOCK_* / UNLOCK_* macros above; they start out as ZBX_MUTEX_NULL */
static zbx_mutex_t	cache_lock = ZBX_MUTEX_NULL;
static zbx_mutex_t	trends_lock = ZBX_MUTEX_NULL;
static zbx_mutex_t	cache_ids_lock = ZBX_MUTEX_NULL;

/* SQL text buffer reused by the trend flushing helpers, together with the size value */
/* that is passed alongside it to the *_alloc() string functions                      */
static char		*sql = NULL;
static size_t		sql_alloc = 4 * ZBX_KIBIBYTE;

/* variables defined outside of this file */
extern unsigned char	program_type;
extern int		CONFIG_DOUBLE_PRECISION;
extern char		*CONFIG_EXPORT_DIR;
62 
/* the number of table entries kept in ZBX_DC_IDS */
#define ZBX_IDS_SIZE	9

/* NOTE(review): presumably the initial size of the history items hashset - the use is not */
/* visible in this part of the file, confirm against the cache initialisation code         */
#define ZBX_HC_ITEMS_INIT_SIZE	1000

/* 55/60 of SEC_PER_HOUR; the place where it is applied is not visible in this part of the file */
#define ZBX_TRENDS_CLEANUP_TIME	((SEC_PER_HOUR * 55) / 60)

/* the maximum time spent synchronizing history */
#define ZBX_HC_SYNC_TIME_MAX	10

/* the maximum number of items in one synchronization batch */
/* (DBflush_trends() also uses it as the limit of item ids checked in one pass) */
#define ZBX_HC_SYNC_MAX		1000
#define ZBX_HC_TIMER_MAX	(ZBX_HC_SYNC_MAX / 2)

/* the minimum processed item percentage of item candidates to continue synchronizing */
#define ZBX_HC_SYNC_MIN_PCNT	10

/* the maximum number of characters for history cache values */
#define ZBX_HISTORY_VALUE_LEN	(1024 * 64)

/* combinations of ZBX_DC_FLAG_* bits; going by the names a value carrying any of the bits */
/* is skipped by the corresponding consumer (history, trends, modules, export)             */
#define ZBX_DC_FLAGS_NOT_FOR_HISTORY	(ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_NOHISTORY)
#define ZBX_DC_FLAGS_NOT_FOR_TRENDS	(ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_NOTRENDS)
#define ZBX_DC_FLAGS_NOT_FOR_MODULES	(ZBX_DC_FLAGS_NOT_FOR_HISTORY | ZBX_DC_FLAG_LLD)
#define ZBX_DC_FLAGS_NOT_FOR_EXPORT	(ZBX_DC_FLAG_NOVALUE | ZBX_DC_FLAG_UNDEF)

/* NOTE(review): presumably the values of zbx_hc_proxyqueue_t.state - confirm against its users */
#define ZBX_HC_PROXYQUEUE_STATE_NORMAL 0
#define ZBX_HC_PROXYQUEUE_STATE_WAIT 1
89 
/* the last identifier remembered for one database table */
typedef struct
{
	char		table_name[ZBX_TABLENAME_LEN_MAX];
	zbx_uint64_t	lastid;
}
ZBX_DC_ID;

/* fixed size set of ZBX_IDS_SIZE table/identifier entries */
typedef struct
{
	ZBX_DC_ID	id[ZBX_IDS_SIZE];
}
ZBX_DC_IDS;

/* the identifier cache instance; NULL until it is set up elsewhere in the file */
static ZBX_DC_IDS	*ids = NULL;
104 
/* proxy queue: a list with a hashset index and a state field */
/* NOTE(review): state presumably holds one of ZBX_HC_PROXYQUEUE_STATE_* - confirm */
typedef struct
{
	zbx_list_t	list;
	zbx_hashset_t	index;
	int		state;
}
zbx_hc_proxyqueue_t;

/* the write cache: trends, statistics counters, cached history items and their queue */
typedef struct
{
	/* ZBX_DC_TREND records, searched by itemid (see DCget_trend()) */
	zbx_hashset_t		trends;
	/* counters copied out by DCget_stats() and DCget_stats_all() */
	ZBX_DC_STATS		stats;

	zbx_hashset_t		history_items;
	zbx_binary_heap_t	history_queue;

	int			history_num;
	int			trends_num;
	int			trends_last_cleanup_hour;
	int			history_num_total;
	int			history_progress_ts;

	zbx_hc_proxyqueue_t     proxyqueue;
}
ZBX_DC_CACHE;

/* the write cache instance; NULL until it is set up elsewhere in the file */
static ZBX_DC_CACHE	*cache = NULL;
132 
/* local history cache */
#define ZBX_MAX_VALUES_LOCAL	256
#define ZBX_STRUCT_REALLOC_STEP	8
#define ZBX_STRING_REALLOC_STEP	ZBX_KIBIBYTE

/* a string value described by two sizes instead of a pointer */
/* NOTE(review): pvalue is presumably an offset into string_values - confirm against the users */
typedef struct
{
	size_t	pvalue;
	size_t	len;
}
dc_value_str_t;

/* storage for one value in floating point, unsigned integer or string form */
typedef struct
{
	double		value_dbl;
	zbx_uint64_t	value_uint;
	dc_value_str_t	value_str;
}
dc_value_t;

/* one item value kept in the local history cache */
typedef struct
{
	zbx_uint64_t	itemid;
	dc_value_t	value;
	zbx_timespec_t	ts;
	dc_value_str_t	source;		/* for log items only */
	zbx_uint64_t	lastlogsize;
	int		timestamp;	/* for log items only */
	int		severity;	/* for log items only */
	int		logeventid;	/* for log items only */
	int		mtime;
	unsigned char	item_value_type;
	unsigned char	value_type;
	unsigned char	state;
	unsigned char	flags;		/* see ZBX_DC_FLAG_* above */
}
dc_item_value_t;

/* buffers of the local history cache with their allocated sizes and fill levels */
static char		*string_values = NULL;
static size_t		string_values_alloc = 0, string_values_offset = 0;
static dc_item_value_t	*item_values = NULL;
static size_t		item_values_alloc = 0, item_values_num = 0;
175 
/* forward declarations of the file-local history cache (hc_*) helpers; */
/* their definitions are outside of this part of the file               */
static void	hc_add_item_values(dc_item_value_t *values, int values_num);
static void	hc_pop_items(zbx_vector_ptr_t *history_items);
static void	hc_get_item_values(ZBX_DC_HISTORY *history, zbx_vector_ptr_t *history_items);
static void	hc_push_items(zbx_vector_ptr_t *history_items);
static void	hc_free_item_values(ZBX_DC_HISTORY *history, int history_num);
static void	hc_queue_item(zbx_hc_item_t *item);
static int	hc_queue_elem_compare_func(const void *d1, const void *d2);
static int	hc_queue_get_size(void);
static int	hc_get_history_compression_age(void);
185 
186 /******************************************************************************
187  *                                                                            *
188  * Function: DCget_stats_all                                                  *
189  *                                                                            *
190  * Purpose: retrieves all internal metrics of the database cache              *
191  *                                                                            *
192  * Parameters: stats - [OUT] write cache metrics                              *
193  *                                                                            *
194  ******************************************************************************/
DCget_stats_all(zbx_wcache_info_t * wcache_info)195 void	DCget_stats_all(zbx_wcache_info_t *wcache_info)
196 {
197 	LOCK_CACHE;
198 
199 	wcache_info->stats = cache->stats;
200 	wcache_info->history_free = hc_mem->free_size;
201 	wcache_info->history_total = hc_mem->total_size;
202 	wcache_info->index_free = hc_index_mem->free_size;
203 	wcache_info->index_total = hc_index_mem->total_size;
204 
205 	if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
206 	{
207 		wcache_info->trend_free = trend_mem->free_size;
208 		wcache_info->trend_total = trend_mem->orig_size;
209 	}
210 
211 	UNLOCK_CACHE;
212 }
213 
214 /******************************************************************************
215  *                                                                            *
216  * Function: DCget_stats                                                      *
217  *                                                                            *
218  * Purpose: get statistics of the database cache                              *
219  *                                                                            *
220  * Author: Alexander Vladishev                                                *
221  *                                                                            *
222  ******************************************************************************/
DCget_stats(int request)223 void	*DCget_stats(int request)
224 {
225 	static zbx_uint64_t	value_uint;
226 	static double		value_double;
227 	void			*ret;
228 
229 	LOCK_CACHE;
230 
231 	switch (request)
232 	{
233 		case ZBX_STATS_HISTORY_COUNTER:
234 			value_uint = cache->stats.history_counter;
235 			ret = (void *)&value_uint;
236 			break;
237 		case ZBX_STATS_HISTORY_FLOAT_COUNTER:
238 			value_uint = cache->stats.history_float_counter;
239 			ret = (void *)&value_uint;
240 			break;
241 		case ZBX_STATS_HISTORY_UINT_COUNTER:
242 			value_uint = cache->stats.history_uint_counter;
243 			ret = (void *)&value_uint;
244 			break;
245 		case ZBX_STATS_HISTORY_STR_COUNTER:
246 			value_uint = cache->stats.history_str_counter;
247 			ret = (void *)&value_uint;
248 			break;
249 		case ZBX_STATS_HISTORY_LOG_COUNTER:
250 			value_uint = cache->stats.history_log_counter;
251 			ret = (void *)&value_uint;
252 			break;
253 		case ZBX_STATS_HISTORY_TEXT_COUNTER:
254 			value_uint = cache->stats.history_text_counter;
255 			ret = (void *)&value_uint;
256 			break;
257 		case ZBX_STATS_NOTSUPPORTED_COUNTER:
258 			value_uint = cache->stats.notsupported_counter;
259 			ret = (void *)&value_uint;
260 			break;
261 		case ZBX_STATS_HISTORY_TOTAL:
262 			value_uint = hc_mem->total_size;
263 			ret = (void *)&value_uint;
264 			break;
265 		case ZBX_STATS_HISTORY_USED:
266 			value_uint = hc_mem->total_size - hc_mem->free_size;
267 			ret = (void *)&value_uint;
268 			break;
269 		case ZBX_STATS_HISTORY_FREE:
270 			value_uint = hc_mem->free_size;
271 			ret = (void *)&value_uint;
272 			break;
273 		case ZBX_STATS_HISTORY_PUSED:
274 			value_double = 100 * (double)(hc_mem->total_size - hc_mem->free_size) / hc_mem->total_size;
275 			ret = (void *)&value_double;
276 			break;
277 		case ZBX_STATS_HISTORY_PFREE:
278 			value_double = 100 * (double)hc_mem->free_size / hc_mem->total_size;
279 			ret = (void *)&value_double;
280 			break;
281 		case ZBX_STATS_TREND_TOTAL:
282 			value_uint = trend_mem->orig_size;
283 			ret = (void *)&value_uint;
284 			break;
285 		case ZBX_STATS_TREND_USED:
286 			value_uint = trend_mem->orig_size - trend_mem->free_size;
287 			ret = (void *)&value_uint;
288 			break;
289 		case ZBX_STATS_TREND_FREE:
290 			value_uint = trend_mem->free_size;
291 			ret = (void *)&value_uint;
292 			break;
293 		case ZBX_STATS_TREND_PUSED:
294 			value_double = 100 * (double)(trend_mem->orig_size - trend_mem->free_size) /
295 					trend_mem->orig_size;
296 			ret = (void *)&value_double;
297 			break;
298 		case ZBX_STATS_TREND_PFREE:
299 			value_double = 100 * (double)trend_mem->free_size / trend_mem->orig_size;
300 			ret = (void *)&value_double;
301 			break;
302 		case ZBX_STATS_HISTORY_INDEX_TOTAL:
303 			value_uint = hc_index_mem->total_size;
304 			ret = (void *)&value_uint;
305 			break;
306 		case ZBX_STATS_HISTORY_INDEX_USED:
307 			value_uint = hc_index_mem->total_size - hc_index_mem->free_size;
308 			ret = (void *)&value_uint;
309 			break;
310 		case ZBX_STATS_HISTORY_INDEX_FREE:
311 			value_uint = hc_index_mem->free_size;
312 			ret = (void *)&value_uint;
313 			break;
314 		case ZBX_STATS_HISTORY_INDEX_PUSED:
315 			value_double = 100 * (double)(hc_index_mem->total_size - hc_index_mem->free_size) /
316 					hc_index_mem->total_size;
317 			ret = (void *)&value_double;
318 			break;
319 		case ZBX_STATS_HISTORY_INDEX_PFREE:
320 			value_double = 100 * (double)hc_index_mem->free_size / hc_index_mem->total_size;
321 			ret = (void *)&value_double;
322 			break;
323 		default:
324 			ret = NULL;
325 	}
326 
327 	UNLOCK_CACHE;
328 
329 	return ret;
330 }
331 
332 /******************************************************************************
333  *                                                                            *
334  * Function: DCget_trend                                                      *
335  *                                                                            *
336  * Purpose: find existing or add new structure and return pointer             *
337  *                                                                            *
338  * Return value: pointer to a trend structure                                 *
339  *                                                                            *
340  * Author: Alexander Vladishev                                                *
341  *                                                                            *
342  ******************************************************************************/
DCget_trend(zbx_uint64_t itemid)343 static ZBX_DC_TREND	*DCget_trend(zbx_uint64_t itemid)
344 {
345 	ZBX_DC_TREND	*ptr, trend;
346 
347 	if (NULL != (ptr = (ZBX_DC_TREND *)zbx_hashset_search(&cache->trends, &itemid)))
348 		return ptr;
349 
350 	memset(&trend, 0, sizeof(ZBX_DC_TREND));
351 	trend.itemid = itemid;
352 
353 	return (ZBX_DC_TREND *)zbx_hashset_insert(&cache->trends, &trend, sizeof(ZBX_DC_TREND));
354 }
355 
356 /******************************************************************************
357  *                                                                            *
358  * Function: DCupdate_trends                                                  *
359  *                                                                            *
360  * Purpose: apply disable_from changes to cache                               *
361  *                                                                            *
362  ******************************************************************************/
DCupdate_trends(zbx_vector_uint64_pair_t * trends_diff)363 static void	DCupdate_trends(zbx_vector_uint64_pair_t *trends_diff)
364 {
365 	int	i;
366 
367 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
368 
369 	LOCK_TRENDS;
370 
371 	for (i = 0; i < trends_diff->values_num; i++)
372 	{
373 		ZBX_DC_TREND	*trend;
374 
375 		if (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_search(&cache->trends, &trends_diff->values[i].first)))
376 			trend->disable_from = trends_diff->values[i].second;
377 	}
378 
379 	UNLOCK_TRENDS;
380 
381 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
382 }
383 
384 /******************************************************************************
385  *                                                                            *
386  * Function: dc_insert_trends_in_db                                           *
387  *                                                                            *
388  * Purpose: helper function for DCflush trends                                *
389  *                                                                            *
390  ******************************************************************************/
dc_insert_trends_in_db(ZBX_DC_TREND * trends,int trends_num,unsigned char value_type,const char * table_name,int clock)391 static void	dc_insert_trends_in_db(ZBX_DC_TREND *trends, int trends_num, unsigned char value_type,
392 		const char *table_name, int clock)
393 {
394 	ZBX_DC_TREND	*trend;
395 	int		i;
396 	zbx_db_insert_t	db_insert;
397 
398 	zbx_db_insert_prepare(&db_insert, table_name, "itemid", "clock", "num", "value_min", "value_avg",
399 			"value_max", NULL);
400 
401 	for (i = 0; i < trends_num; i++)
402 	{
403 		trend = &trends[i];
404 
405 		if (0 == trend->itemid)
406 			continue;
407 
408 		if (clock != trend->clock || value_type != trend->value_type)
409 			continue;
410 
411 		if (ITEM_VALUE_TYPE_FLOAT == value_type)
412 		{
413 			zbx_db_insert_add_values(&db_insert, trend->itemid, trend->clock, trend->num,
414 					trend->value_min.dbl, trend->value_avg.dbl, trend->value_max.dbl);
415 		}
416 		else
417 		{
418 			zbx_uint128_t	avg;
419 
420 			/* calculate the trend average value */
421 			udiv128_64(&avg, &trend->value_avg.ui64, trend->num);
422 
423 			zbx_db_insert_add_values(&db_insert, trend->itemid, trend->clock, trend->num,
424 					trend->value_min.ui64, avg.lo, trend->value_max.ui64);
425 		}
426 
427 		trend->itemid = 0;
428 	}
429 
430 	zbx_db_insert_execute(&db_insert);
431 	zbx_db_insert_clean(&db_insert);
432 }
433 
434 /******************************************************************************
435  *                                                                            *
436  * Function: dc_remove_updated_trends                                         *
437  *                                                                            *
438  * Purpose: Update trends disable_until for items without trends data past or *
439  *          equal the specified clock                                         *
440  *                                                                            *
441  * Comments: A helper function for DCflush trends                             *
442  *                                                                            *
443  ******************************************************************************/
dc_remove_updated_trends(ZBX_DC_TREND * trends,int trends_num,const char * table_name,int value_type,zbx_uint64_t * itemids,int * itemids_num,int clock)444 static void	dc_remove_updated_trends(ZBX_DC_TREND *trends, int trends_num, const char *table_name,
445 		int value_type, zbx_uint64_t *itemids, int *itemids_num, int clock)
446 {
447 	int		i, j, clocks_num, now, age;
448 	ZBX_DC_TREND	*trend;
449 	zbx_uint64_t	itemid;
450 	size_t		sql_offset;
451 	DB_RESULT	result;
452 	DB_ROW		row;
453 	int		clocks[] = {SEC_PER_DAY, SEC_PER_WEEK, SEC_PER_MONTH, SEC_PER_YEAR, INT_MAX};
454 
455 	now = time(NULL);
456 	age = now - clock;
457 	for (clocks_num = 0; age > clocks[clocks_num]; clocks_num++)
458 		clocks[clocks_num] = now - clocks[clocks_num];
459 	clocks[clocks_num] = clock;
460 
461 	/* remove itemids with trends data past or equal the clock */
462 	for (j = 0; j <= clocks_num && 0 < *itemids_num; j++)
463 	{
464 		sql_offset = 0;
465 		zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
466 				"select distinct itemid"
467 				" from %s"
468 				" where clock>=%d and",
469 				table_name, clocks[j]);
470 
471 		if (0 < j)
472 			zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, " clock<%d and", clocks[j - 1]);
473 
474 		DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids, *itemids_num);
475 
476 		result = DBselect("%s", sql);
477 
478 		while (NULL != (row = DBfetch(result)))
479 		{
480 			ZBX_STR2UINT64(itemid, row[0]);
481 			uint64_array_remove(itemids, itemids_num, &itemid, 1);
482 		}
483 		DBfree_result(result);
484 	}
485 
486 	/* update trends disable_until for the leftover itemids */
487 	while (0 != *itemids_num)
488 	{
489 		itemid = itemids[--*itemids_num];
490 
491 		for (i = 0; i < trends_num; i++)
492 		{
493 			trend = &trends[i];
494 
495 			if (itemid != trend->itemid)
496 				continue;
497 
498 			if (clock != trend->clock || value_type != trend->value_type)
499 				continue;
500 
501 			trend->disable_from = clock;
502 			break;
503 		}
504 	}
505 }
506 
507 /******************************************************************************
508  *                                                                            *
509  * Function: dc_trends_update_float                                           *
510  *                                                                            *
511  * Purpose: helper function for DCflush trends                                *
512  *                                                                            *
513  ******************************************************************************/
dc_trends_update_float(ZBX_DC_TREND * trend,DB_ROW row,int num,size_t * sql_offset)514 static void	dc_trends_update_float(ZBX_DC_TREND *trend, DB_ROW row, int num, size_t *sql_offset)
515 {
516 	history_value_t	value_min, value_avg, value_max;
517 
518 	value_min.dbl = atof(row[2]);
519 	value_avg.dbl = atof(row[3]);
520 	value_max.dbl = atof(row[4]);
521 
522 	if (value_min.dbl < trend->value_min.dbl)
523 		trend->value_min.dbl = value_min.dbl;
524 
525 	if (value_max.dbl > trend->value_max.dbl)
526 		trend->value_max.dbl = value_max.dbl;
527 
528 	trend->value_avg.dbl = trend->value_avg.dbl / (trend->num + num) * trend->num +
529 			value_avg.dbl / (trend->num + num) * num;
530 	trend->num += num;
531 
532 	zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset, "update trends set"
533 			" num=%d,value_min=" ZBX_FS_DBL64_SQL ",value_avg=" ZBX_FS_DBL64_SQL
534 			",value_max=" ZBX_FS_DBL64_SQL
535 			" where itemid=" ZBX_FS_UI64 " and clock=%d;\n",
536 			trend->num, trend->value_min.dbl, trend->value_avg.dbl, trend->value_max.dbl,
537 			trend->itemid, trend->clock);
538 }
539 
540 /******************************************************************************
541  *                                                                            *
542  * Function: dc_trends_update_uint                                            *
543  *                                                                            *
544  * Purpose: helper function for DCflush trends                                *
545  *                                                                            *
546  ******************************************************************************/
dc_trends_update_uint(ZBX_DC_TREND * trend,DB_ROW row,int num,size_t * sql_offset)547 static void	dc_trends_update_uint(ZBX_DC_TREND *trend, DB_ROW row, int num, size_t *sql_offset)
548 {
549 	history_value_t	value_min, value_avg, value_max;
550 	zbx_uint128_t	avg;
551 
552 	ZBX_STR2UINT64(value_min.ui64, row[2]);
553 	ZBX_STR2UINT64(value_avg.ui64, row[3]);
554 	ZBX_STR2UINT64(value_max.ui64, row[4]);
555 
556 	if (value_min.ui64 < trend->value_min.ui64)
557 		trend->value_min.ui64 = value_min.ui64;
558 	if (value_max.ui64 > trend->value_max.ui64)
559 		trend->value_max.ui64 = value_max.ui64;
560 
561 	/* calculate the trend average value */
562 	umul64_64(&avg, num, value_avg.ui64);
563 	uinc128_128(&trend->value_avg.ui64, &avg);
564 	udiv128_64(&avg, &trend->value_avg.ui64, trend->num + num);
565 
566 	trend->num += num;
567 
568 	zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset,
569 			"update trends_uint set num=%d,value_min=" ZBX_FS_UI64 ",value_avg="
570 			ZBX_FS_UI64 ",value_max=" ZBX_FS_UI64 " where itemid=" ZBX_FS_UI64
571 			" and clock=%d;\n",
572 			trend->num,
573 			trend->value_min.ui64,
574 			avg.lo,
575 			trend->value_max.ui64,
576 			trend->itemid,
577 			trend->clock);
578 }
579 
580 /******************************************************************************
581  *                                                                            *
582  * Function: dc_trends_fetch_and_update                                       *
583  *                                                                            *
584  * Purpose: helper function for DCflush trends                                *
585  *                                                                            *
586  ******************************************************************************/
dc_trends_fetch_and_update(ZBX_DC_TREND * trends,int trends_num,zbx_uint64_t * itemids,int itemids_num,int * inserts_num,unsigned char value_type,const char * table_name,int clock)587 static void	dc_trends_fetch_and_update(ZBX_DC_TREND *trends, int trends_num, zbx_uint64_t *itemids,
588 		int itemids_num, int *inserts_num, unsigned char value_type,
589 		const char *table_name, int clock)
590 {
591 
592 	int		i, num;
593 	DB_RESULT	result;
594 	DB_ROW		row;
595 	zbx_uint64_t	itemid;
596 	ZBX_DC_TREND	*trend;
597 	size_t		sql_offset;
598 
599 	sql_offset = 0;
600 	zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
601 			"select itemid,num,value_min,value_avg,value_max"
602 			" from %s"
603 			" where clock=%d and",
604 			table_name, clock);
605 
606 	DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids, itemids_num);
607 
608 	result = DBselect("%s order by itemid,clock", sql);
609 
610 	sql_offset = 0;
611 	DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
612 
613 	while (NULL != (row = DBfetch(result)))
614 	{
615 		ZBX_STR2UINT64(itemid, row[0]);
616 
617 		for (i = 0; i < trends_num; i++)
618 		{
619 			trend = &trends[i];
620 
621 			if (itemid != trend->itemid)
622 				continue;
623 
624 			if (clock != trend->clock || value_type != trend->value_type)
625 				continue;
626 
627 			break;
628 		}
629 
630 		if (i == trends_num)
631 		{
632 			THIS_SHOULD_NEVER_HAPPEN;
633 			continue;
634 		}
635 
636 		num = atoi(row[1]);
637 
638 		if (value_type == ITEM_VALUE_TYPE_FLOAT)
639 			dc_trends_update_float(trend, row, num, &sql_offset);
640 		else
641 			dc_trends_update_uint(trend, row, num, &sql_offset);
642 
643 		trend->itemid = 0;
644 
645 		--*inserts_num;
646 
647 		DBexecute_overflowed_sql(&sql, &sql_alloc, &sql_offset);
648 	}
649 
650 	DBfree_result(result);
651 
652 	DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
653 
654 	if (sql_offset > 16)	/* In ORACLE always present begin..end; */
655 		DBexecute("%s", sql);
656 }
657 
658 /******************************************************************************
659  *                                                                            *
660  * Function: DBflush_trends                                                   *
661  *                                                                            *
662  * Purpose: flush trend to the database                                       *
663  *                                                                            *
664  * Author: Alexander Vladishev                                                *
665  *                                                                            *
666  ******************************************************************************/
DBflush_trends(ZBX_DC_TREND * trends,int * trends_num,zbx_vector_uint64_pair_t * trends_diff)667 static void	DBflush_trends(ZBX_DC_TREND *trends, int *trends_num, zbx_vector_uint64_pair_t *trends_diff)
668 {
669 	int		num, i, clock, inserts_num = 0, itemids_alloc, itemids_num = 0, trends_to = *trends_num;
670 	unsigned char	value_type;
671 	zbx_uint64_t	*itemids = NULL;
672 	ZBX_DC_TREND	*trend = NULL;
673 	const char	*table_name;
674 
675 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() trends_num:%d", __func__, *trends_num);
676 
677 	clock = trends[0].clock;
678 	value_type = trends[0].value_type;
679 
680 	switch (value_type)
681 	{
682 		case ITEM_VALUE_TYPE_FLOAT:
683 			table_name = "trends";
684 			break;
685 		case ITEM_VALUE_TYPE_UINT64:
686 			table_name = "trends_uint";
687 			break;
688 		default:
689 			assert(0);
690 	}
691 
692 	itemids_alloc = MIN(ZBX_HC_SYNC_MAX, *trends_num);
693 	itemids = (zbx_uint64_t *)zbx_malloc(itemids, itemids_alloc * sizeof(zbx_uint64_t));
694 
695 	for (i = 0; i < *trends_num; i++)
696 	{
697 		trend = &trends[i];
698 
699 		if (clock != trend->clock || value_type != trend->value_type)
700 			continue;
701 
702 		inserts_num++;
703 
704 		if (0 != trend->disable_from)
705 			continue;
706 
707 		uint64_array_add(&itemids, &itemids_alloc, &itemids_num, trend->itemid, 64);
708 
709 		if (ZBX_HC_SYNC_MAX == itemids_num)
710 		{
711 			trends_to = i + 1;
712 			break;
713 		}
714 	}
715 
716 	if (0 != itemids_num)
717 	{
718 		dc_remove_updated_trends(trends, trends_to, table_name, value_type, itemids,
719 				&itemids_num, clock);
720 	}
721 
722 	for (i = 0; i < trends_to; i++)
723 	{
724 		trend = &trends[i];
725 
726 		if (clock != trend->clock || value_type != trend->value_type)
727 			continue;
728 
729 		if (0 != trend->disable_from && clock >= trend->disable_from)
730 			continue;
731 
732 		uint64_array_add(&itemids, &itemids_alloc, &itemids_num, trend->itemid, 64);
733 	}
734 
735 	if (0 != itemids_num)
736 	{
737 		dc_trends_fetch_and_update(trends, trends_to, itemids, itemids_num,
738 				&inserts_num, value_type, table_name, clock);
739 	}
740 
741 	zbx_free(itemids);
742 
743 	/* if 'trends' is not a primary trends buffer */
744 	if (NULL != trends_diff)
745 	{
746 		/* we update it too */
747 		for (i = 0; i < trends_to; i++)
748 		{
749 			zbx_uint64_pair_t	pair;
750 
751 			if (0 == trends[i].itemid)
752 				continue;
753 
754 			if (clock != trends[i].clock || value_type != trends[i].value_type)
755 				continue;
756 
757 			if (0 == trends[i].disable_from || trends[i].disable_from > clock)
758 				continue;
759 
760 			pair.first = trends[i].itemid;
761 			pair.second = clock + SEC_PER_HOUR;
762 			zbx_vector_uint64_pair_append(trends_diff, pair);
763 		}
764 	}
765 
766 	if (0 != inserts_num)
767 		dc_insert_trends_in_db(trends, trends_to, value_type, table_name, clock);
768 
769 	/* clean trends */
770 	for (i = 0, num = 0; i < *trends_num; i++)
771 	{
772 		if (0 == trends[i].itemid)
773 			continue;
774 
775 		memcpy(&trends[num++], &trends[i], sizeof(ZBX_DC_TREND));
776 	}
777 	*trends_num = num;
778 
779 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
780 }
781 
782 /******************************************************************************
783  *                                                                            *
784  * Function: DCflush_trend                                                    *
785  *                                                                            *
786  * Purpose: move trend to the array of trends for flushing to DB              *
787  *                                                                            *
788  * Author: Alexander Vladishev                                                *
789  *                                                                            *
790  ******************************************************************************/
DCflush_trend(ZBX_DC_TREND * trend,ZBX_DC_TREND ** trends,int * trends_alloc,int * trends_num)791 static void	DCflush_trend(ZBX_DC_TREND *trend, ZBX_DC_TREND **trends, int *trends_alloc, int *trends_num)
792 {
793 	if (*trends_num == *trends_alloc)
794 	{
795 		*trends_alloc += 256;
796 		*trends = (ZBX_DC_TREND *)zbx_realloc(*trends, *trends_alloc * sizeof(ZBX_DC_TREND));
797 	}
798 
799 	memcpy(&(*trends)[*trends_num], trend, sizeof(ZBX_DC_TREND));
800 	(*trends_num)++;
801 
802 	trend->clock = 0;
803 	trend->num = 0;
804 	memset(&trend->value_min, 0, sizeof(history_value_t));
805 	memset(&trend->value_avg, 0, sizeof(value_avg_t));
806 	memset(&trend->value_max, 0, sizeof(history_value_t));
807 }
808 
809 /******************************************************************************
810  *                                                                            *
811  * Function: DCadd_trend                                                      *
812  *                                                                            *
813  * Purpose: add new value to the trends                                       *
814  *                                                                            *
815  * Author: Alexander Vladishev                                                *
816  *                                                                            *
817  ******************************************************************************/
DCadd_trend(const ZBX_DC_HISTORY * history,ZBX_DC_TREND ** trends,int * trends_alloc,int * trends_num)818 static void	DCadd_trend(const ZBX_DC_HISTORY *history, ZBX_DC_TREND **trends, int *trends_alloc, int *trends_num)
819 {
820 	ZBX_DC_TREND	*trend = NULL;
821 	int		hour;
822 
823 	hour = history->ts.sec - history->ts.sec % SEC_PER_HOUR;
824 
825 	trend = DCget_trend(history->itemid);
826 
827 	if (trend->num > 0 && (trend->clock != hour || trend->value_type != history->value_type) &&
828 			SUCCEED == zbx_history_requires_trends(trend->value_type))
829 	{
830 		DCflush_trend(trend, trends, trends_alloc, trends_num);
831 	}
832 
833 	trend->value_type = history->value_type;
834 	trend->clock = hour;
835 
836 	switch (trend->value_type)
837 	{
838 		case ITEM_VALUE_TYPE_FLOAT:
839 			if (trend->num == 0 || history->value.dbl < trend->value_min.dbl)
840 				trend->value_min.dbl = history->value.dbl;
841 			if (trend->num == 0 || history->value.dbl > trend->value_max.dbl)
842 				trend->value_max.dbl = history->value.dbl;
843 			trend->value_avg.dbl += history->value.dbl / (trend->num + 1) -
844 					trend->value_avg.dbl / (trend->num + 1);
845 			break;
846 		case ITEM_VALUE_TYPE_UINT64:
847 			if (trend->num == 0 || history->value.ui64 < trend->value_min.ui64)
848 				trend->value_min.ui64 = history->value.ui64;
849 			if (trend->num == 0 || history->value.ui64 > trend->value_max.ui64)
850 				trend->value_max.ui64 = history->value.ui64;
851 			uinc128_64(&trend->value_avg.ui64, history->value.ui64);
852 			break;
853 	}
854 	trend->num++;
855 }
856 
857 /******************************************************************************
858  *                                                                            *
859  * Function: DCmass_update_trends                                             *
860  *                                                                            *
861  * Purpose: update trends cache and get list of trends to flush into database *
862  *                                                                            *
863  * Parameters: history         - [IN]  array of history data                  *
864  *             history_num     - [IN]  number of history structures           *
865  *             trends          - [OUT] list of trends to flush into database  *
866  *             trends_num      - [OUT] number of trends                       *
867  *             compression_age - [IN]  history compression age                *
868  *                                                                            *
869  * Author: Alexander Vladishev                                                *
870  *                                                                            *
871  ******************************************************************************/
DCmass_update_trends(const ZBX_DC_HISTORY * history,int history_num,ZBX_DC_TREND ** trends,int * trends_num,int compression_age)872 static void	DCmass_update_trends(const ZBX_DC_HISTORY *history, int history_num, ZBX_DC_TREND **trends,
873 		int *trends_num, int compression_age)
874 {
875 	static int	last_trend_discard = 0;
876 	zbx_timespec_t	ts;
877 	int		trends_alloc = 0, i, hour, seconds;
878 
879 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
880 
881 	zbx_timespec(&ts);
882 	seconds = ts.sec % SEC_PER_HOUR;
883 	hour = ts.sec - seconds;
884 
885 	LOCK_TRENDS;
886 
887 	for (i = 0; i < history_num; i++)
888 	{
889 		const ZBX_DC_HISTORY	*h = &history[i];
890 
891 		if (0 != (ZBX_DC_FLAGS_NOT_FOR_TRENDS & h->flags))
892 			continue;
893 
894 		DCadd_trend(h, trends, &trends_alloc, trends_num);
895 	}
896 
897 	if (cache->trends_last_cleanup_hour < hour && ZBX_TRENDS_CLEANUP_TIME < seconds)
898 	{
899 		zbx_hashset_iter_t	iter;
900 		ZBX_DC_TREND		*trend;
901 
902 		zbx_hashset_iter_reset(&cache->trends, &iter);
903 
904 		while (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_iter_next(&iter)))
905 		{
906 			if (trend->clock == hour)
907 				continue;
908 
909 			/* discard trend items that are older than compression age */
910 			if (0 != compression_age && trend->clock < compression_age)
911 			{
912 				if (SEC_PER_HOUR < (ts.sec - last_trend_discard)) /* log once per hour */
913 				{
914 					zabbix_log(LOG_LEVEL_TRACE, "discarding trends that are pointing to"
915 							" compressed history period");
916 					last_trend_discard = ts.sec;
917 				}
918 			}
919 			else if (SUCCEED == zbx_history_requires_trends(trend->value_type))
920 				DCflush_trend(trend, trends, &trends_alloc, trends_num);
921 
922 			zbx_hashset_iter_remove(&iter);
923 		}
924 
925 		cache->trends_last_cleanup_hour = hour;
926 	}
927 
928 	UNLOCK_TRENDS;
929 
930 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
931 }
932 
zbx_trend_compare(const void * d1,const void * d2)933 static int	zbx_trend_compare(const void *d1, const void *d2)
934 {
935 	const ZBX_DC_TREND	*p1 = (const ZBX_DC_TREND *)d1;
936 	const ZBX_DC_TREND	*p2 = (const ZBX_DC_TREND *)d2;
937 
938 	ZBX_RETURN_IF_NOT_EQUAL(p1->itemid, p2->itemid);
939 	ZBX_RETURN_IF_NOT_EQUAL(p1->clock, p2->clock);
940 
941 	return 0;
942 }
943 
944 /******************************************************************************
945  *                                                                            *
946  * Function: DBmass_update_trends                                             *
947  *                                                                            *
948  * Purpose: prepare history data using items from configuration cache         *
949  *                                                                            *
950  * Parameters: trends      - [IN] trends from cache to be added to database   *
951  *             trends_num  - [IN] number of trends to add to database         *
952  *             trends_diff - [OUT] disable_from updates                       *
953  *                                                                            *
954  ******************************************************************************/
DBmass_update_trends(const ZBX_DC_TREND * trends,int trends_num,zbx_vector_uint64_pair_t * trends_diff)955 static void	DBmass_update_trends(const ZBX_DC_TREND *trends, int trends_num,
956 		zbx_vector_uint64_pair_t *trends_diff)
957 {
958 	ZBX_DC_TREND	*trends_tmp;
959 
960 	if (0 != trends_num)
961 	{
962 		trends_tmp = (ZBX_DC_TREND *)zbx_malloc(NULL, trends_num * sizeof(ZBX_DC_TREND));
963 		memcpy(trends_tmp, trends, trends_num * sizeof(ZBX_DC_TREND));
964 		qsort(trends_tmp, trends_num, sizeof(ZBX_DC_TREND), zbx_trend_compare);
965 
966 		while (0 < trends_num)
967 			DBflush_trends(trends_tmp, &trends_num, trends_diff);
968 
969 		zbx_free(trends_tmp);
970 	}
971 }
972 
/* host details collected for export; entries are kept in a hashset and looked up by host identifier */
typedef struct
{
	zbx_uint64_t		hostid;	/* host identifier */
	zbx_vector_ptr_t	groups;	/* host group names, allocated strings released by zbx_host_info_clean() */
}
zbx_host_info_t;
979 
980 /******************************************************************************
981  *                                                                            *
982  * Function: zbx_host_info_clean                                              *
983  *                                                                            *
984  * Purpose: frees resources allocated to store host groups names              *
985  *                                                                            *
986  * Parameters: host_info - [IN] host information                              *
987  *                                                                            *
988  ******************************************************************************/
zbx_host_info_clean(zbx_host_info_t * host_info)989 static void	zbx_host_info_clean(zbx_host_info_t *host_info)
990 {
991 	zbx_vector_ptr_clear_ext(&host_info->groups, zbx_ptr_free);
992 	zbx_vector_ptr_destroy(&host_info->groups);
993 }
994 
995 /******************************************************************************
996  *                                                                            *
997  * Function: db_get_hosts_info_by_hostid                                      *
998  *                                                                            *
999  * Purpose: get hosts groups names                                            *
1000  *                                                                            *
1001  * Parameters: hosts_info - [IN/OUT] output names of host groups for a host   *
1002  *             hostids    - [IN] hosts identifiers                            *
1003  *                                                                            *
1004  ******************************************************************************/
db_get_hosts_info_by_hostid(zbx_hashset_t * hosts_info,const zbx_vector_uint64_t * hostids)1005 static void	db_get_hosts_info_by_hostid(zbx_hashset_t *hosts_info, const zbx_vector_uint64_t *hostids)
1006 {
1007 	int		i;
1008 	size_t		sql_offset = 0;
1009 	DB_RESULT	result;
1010 	DB_ROW		row;
1011 
1012 	for (i = 0; i < hostids->values_num; i++)
1013 	{
1014 		zbx_host_info_t	host_info = {.hostid = hostids->values[i]};
1015 
1016 		zbx_vector_ptr_create(&host_info.groups);
1017 		zbx_hashset_insert(hosts_info, &host_info, sizeof(host_info));
1018 	}
1019 
1020 	zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
1021 				"select distinct hg.hostid,g.name"
1022 				" from hstgrp g,hosts_groups hg"
1023 				" where g.groupid=hg.groupid"
1024 					" and");
1025 
1026 	DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "hg.hostid", hostids->values, hostids->values_num);
1027 
1028 	result = DBselect("%s", sql);
1029 
1030 	while (NULL != (row = DBfetch(result)))
1031 	{
1032 		zbx_uint64_t	hostid;
1033 		zbx_host_info_t	*host_info;
1034 
1035 		ZBX_DBROW2UINT64(hostid, row[0]);
1036 
1037 		if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &hostid)))
1038 		{
1039 			THIS_SHOULD_NEVER_HAPPEN;
1040 			continue;
1041 		}
1042 
1043 		zbx_vector_ptr_append(&host_info->groups, zbx_strdup(NULL, row[1]));
1044 	}
1045 	DBfree_result(result);
1046 }
1047 
/* item details collected for export; entries are kept in a hashset and looked up by item identifier */
typedef struct
{
	zbx_uint64_t		itemid;	/* item identifier */
	char			*name;	/* set by zbx_substitute_item_name_macros(); NULL until then */
	DC_ITEM			*item;	/* the item this entry describes; not released by zbx_item_info_clean() */
	zbx_vector_ptr_t	applications;	/* application names, allocated strings */
}
zbx_item_info_t;
1056 
1057 /******************************************************************************
1058  *                                                                            *
1059  * Function: db_get_items_info_by_itemid                                      *
1060  *                                                                            *
1061  * Purpose: get items name and applications                                   *
1062  *                                                                            *
1063  * Parameters: items_info - [IN/OUT] output item name and applications        *
1064  *             itemids    - [IN] the item identifiers                         *
1065  *                                                                            *
1066  ******************************************************************************/
db_get_items_info_by_itemid(zbx_hashset_t * items_info,const zbx_vector_uint64_t * itemids)1067 static void	db_get_items_info_by_itemid(zbx_hashset_t *items_info, const zbx_vector_uint64_t *itemids)
1068 {
1069 	size_t		sql_offset = 0;
1070 	DB_RESULT	result;
1071 	DB_ROW		row;
1072 
1073 	zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset, "select itemid,name from items where");
1074 	DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "itemid", itemids->values, itemids->values_num);
1075 
1076 	result = DBselect("%s", sql);
1077 
1078 	while (NULL != (row = DBfetch(result)))
1079 	{
1080 		zbx_uint64_t	itemid;
1081 		zbx_item_info_t	*item_info;
1082 
1083 		ZBX_DBROW2UINT64(itemid, row[0]);
1084 
1085 		if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &itemid)))
1086 		{
1087 			THIS_SHOULD_NEVER_HAPPEN;
1088 			continue;
1089 		}
1090 
1091 		zbx_substitute_item_name_macros(item_info->item, row[1], &item_info->name);
1092 	}
1093 	DBfree_result(result);
1094 
1095 	sql_offset = 0;
1096 	zbx_snprintf_alloc(&sql, &sql_alloc, &sql_offset,
1097 			"select i.itemid,a.name"
1098 			" from applications a,items_applications i"
1099 			" where a.applicationid=i.applicationid"
1100 				" and");
1101 
1102 	DBadd_condition_alloc(&sql, &sql_alloc, &sql_offset, "i.itemid", itemids->values, itemids->values_num);
1103 
1104 	result = DBselect("%s", sql);
1105 
1106 	while (NULL != (row = DBfetch(result)))
1107 	{
1108 		zbx_uint64_t	itemid;
1109 		zbx_item_info_t	*item_info;
1110 
1111 		ZBX_DBROW2UINT64(itemid, row[0]);
1112 
1113 		if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &itemid)))
1114 		{
1115 			THIS_SHOULD_NEVER_HAPPEN;
1116 			continue;
1117 		}
1118 
1119 		zbx_vector_ptr_append(&item_info->applications, zbx_strdup(NULL, row[1]));
1120 	}
1121 	DBfree_result(result);
1122 }
1123 
1124 /******************************************************************************
1125  *                                                                            *
1126  * Function: zbx_item_info_clean                                              *
1127  *                                                                            *
1128  * Purpose: frees resources allocated to store item applications and name     *
1129  *                                                                            *
1130  * Parameters: item_info - [IN] item information                              *
1131  *                                                                            *
1132  ******************************************************************************/
zbx_item_info_clean(zbx_item_info_t * item_info)1133 static void	zbx_item_info_clean(zbx_item_info_t *item_info)
1134 {
1135 	zbx_vector_ptr_clear_ext(&item_info->applications, zbx_ptr_free);
1136 	zbx_vector_ptr_destroy(&item_info->applications);
1137 	zbx_free(item_info->name);
1138 }
1139 
1140 /******************************************************************************
1141  *                                                                            *
1142  * Function: DCexport_trends                                                  *
1143  *                                                                            *
1144  * Purpose: export trends                                                     *
1145  *                                                                            *
1146  * Parameters: trends     - [IN] trends from cache                            *
1147  *             trends_num - [IN] number of trends                             *
1148  *             hosts_info - [IN] hosts groups names                           *
1149  *             items_info - [IN] item names and applications                  *
1150  *                                                                            *
1151  ******************************************************************************/
DCexport_trends(const ZBX_DC_TREND * trends,int trends_num,zbx_hashset_t * hosts_info,zbx_hashset_t * items_info)1152 static void	DCexport_trends(const ZBX_DC_TREND *trends, int trends_num, zbx_hashset_t *hosts_info,
1153 		zbx_hashset_t *items_info)
1154 {
1155 	struct zbx_json		json;
1156 	const ZBX_DC_TREND	*trend = NULL;
1157 	int			i, j;
1158 	const DC_ITEM		*item;
1159 	zbx_host_info_t		*host_info;
1160 	zbx_item_info_t		*item_info;
1161 	zbx_uint128_t		avg;	/* calculate the trend average value */
1162 
1163 	zbx_json_init(&json, ZBX_JSON_STAT_BUF_LEN);
1164 
1165 	for (i = 0; i < trends_num; i++)
1166 	{
1167 		trend = &trends[i];
1168 
1169 		if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &trend->itemid)))
1170 			continue;
1171 
1172 		item = item_info->item;
1173 
1174 		if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &item->host.hostid)))
1175 		{
1176 			THIS_SHOULD_NEVER_HAPPEN;
1177 			continue;
1178 		}
1179 
1180 		zbx_json_clean(&json);
1181 
1182 		zbx_json_addobject(&json,ZBX_PROTO_TAG_HOST);
1183 		zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST, item->host.host, ZBX_JSON_TYPE_STRING);
1184 		zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item->host.name, ZBX_JSON_TYPE_STRING);
1185 		zbx_json_close(&json);
1186 
1187 		zbx_json_addarray(&json, ZBX_PROTO_TAG_GROUPS);
1188 
1189 		for (j = 0; j < host_info->groups.values_num; j++)
1190 			zbx_json_addstring(&json, NULL, host_info->groups.values[j], ZBX_JSON_TYPE_STRING);
1191 
1192 		zbx_json_close(&json);
1193 
1194 		zbx_json_addarray(&json, ZBX_PROTO_TAG_APPLICATIONS);
1195 
1196 		for (j = 0; j < item_info->applications.values_num; j++)
1197 			zbx_json_addstring(&json, NULL, item_info->applications.values[j], ZBX_JSON_TYPE_STRING);
1198 
1199 		zbx_json_close(&json);
1200 		zbx_json_adduint64(&json, ZBX_PROTO_TAG_ITEMID, item->itemid);
1201 
1202 		if (NULL != item_info->name)
1203 			zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item_info->name, ZBX_JSON_TYPE_STRING);
1204 
1205 		zbx_json_addint64(&json, ZBX_PROTO_TAG_CLOCK, trend->clock);
1206 		zbx_json_addint64(&json, ZBX_PROTO_TAG_COUNT, trend->num);
1207 
1208 		switch (trend->value_type)
1209 		{
1210 			case ITEM_VALUE_TYPE_FLOAT:
1211 				zbx_json_addfloat(&json, ZBX_PROTO_TAG_MIN, trend->value_min.dbl);
1212 				zbx_json_addfloat(&json, ZBX_PROTO_TAG_AVG, trend->value_avg.dbl);
1213 				zbx_json_addfloat(&json, ZBX_PROTO_TAG_MAX, trend->value_max.dbl);
1214 				break;
1215 			case ITEM_VALUE_TYPE_UINT64:
1216 				zbx_json_adduint64(&json, ZBX_PROTO_TAG_MIN, trend->value_min.ui64);
1217 				udiv128_64(&avg, &trend->value_avg.ui64, trend->num);
1218 				zbx_json_adduint64(&json, ZBX_PROTO_TAG_AVG, avg.lo);
1219 				zbx_json_adduint64(&json, ZBX_PROTO_TAG_MAX, trend->value_max.ui64);
1220 				break;
1221 			default:
1222 				THIS_SHOULD_NEVER_HAPPEN;
1223 		}
1224 
1225 		zbx_json_adduint64(&json, ZBX_PROTO_TAG_TYPE, trend->value_type);
1226 		zbx_trends_export_write(json.buffer, json.buffer_size);
1227 	}
1228 
1229 	zbx_trends_export_flush();
1230 	zbx_json_free(&json);
1231 }
1232 
1233 /******************************************************************************
1234  *                                                                            *
1235  * Function: DCexport_history                                                 *
1236  *                                                                            *
1237  * Purpose: export history                                                    *
1238  *                                                                            *
1239  * Parameters: history     - [IN/OUT] array of history data                   *
1240  *             history_num - [IN] number of history structures                *
1241  *             hosts_info  - [IN] hosts groups names                          *
1242  *             items_info  - [IN] item names and applications                 *
1243  *                                                                            *
1244  ******************************************************************************/
DCexport_history(const ZBX_DC_HISTORY * history,int history_num,zbx_hashset_t * hosts_info,zbx_hashset_t * items_info)1245 static void	DCexport_history(const ZBX_DC_HISTORY *history, int history_num, zbx_hashset_t *hosts_info,
1246 		zbx_hashset_t *items_info)
1247 {
1248 	const ZBX_DC_HISTORY	*h;
1249 	const DC_ITEM		*item;
1250 	int			i, j;
1251 	zbx_host_info_t		*host_info;
1252 	zbx_item_info_t		*item_info;
1253 	struct zbx_json		json;
1254 
1255 	zbx_json_init(&json, ZBX_JSON_STAT_BUF_LEN);
1256 
1257 	for (i = 0; i < history_num; i++)
1258 	{
1259 		h = &history[i];
1260 
1261 		if (0 != (ZBX_DC_FLAGS_NOT_FOR_MODULES & h->flags))
1262 			continue;
1263 
1264 		if (NULL == (item_info = (zbx_item_info_t *)zbx_hashset_search(items_info, &h->itemid)))
1265 		{
1266 			THIS_SHOULD_NEVER_HAPPEN;
1267 			continue;
1268 		}
1269 
1270 		item = item_info->item;
1271 
1272 		if (NULL == (host_info = (zbx_host_info_t *)zbx_hashset_search(hosts_info, &item->host.hostid)))
1273 		{
1274 			THIS_SHOULD_NEVER_HAPPEN;
1275 			continue;
1276 		}
1277 
1278 		zbx_json_clean(&json);
1279 
1280 		zbx_json_addobject(&json,ZBX_PROTO_TAG_HOST);
1281 		zbx_json_addstring(&json, ZBX_PROTO_TAG_HOST, item->host.host, ZBX_JSON_TYPE_STRING);
1282 		zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item->host.name, ZBX_JSON_TYPE_STRING);
1283 		zbx_json_close(&json);
1284 
1285 		zbx_json_addarray(&json, ZBX_PROTO_TAG_GROUPS);
1286 
1287 		for (j = 0; j < host_info->groups.values_num; j++)
1288 			zbx_json_addstring(&json, NULL, host_info->groups.values[j], ZBX_JSON_TYPE_STRING);
1289 
1290 		zbx_json_close(&json);
1291 
1292 		zbx_json_addarray(&json, ZBX_PROTO_TAG_APPLICATIONS);
1293 
1294 		for (j = 0; j < item_info->applications.values_num; j++)
1295 			zbx_json_addstring(&json, NULL, item_info->applications.values[j], ZBX_JSON_TYPE_STRING);
1296 
1297 		zbx_json_close(&json);
1298 		zbx_json_adduint64(&json, ZBX_PROTO_TAG_ITEMID, item->itemid);
1299 
1300 		if (NULL != item_info->name)
1301 			zbx_json_addstring(&json, ZBX_PROTO_TAG_NAME, item_info->name, ZBX_JSON_TYPE_STRING);
1302 
1303 		zbx_json_addint64(&json, ZBX_PROTO_TAG_CLOCK, h->ts.sec);
1304 		zbx_json_addint64(&json, ZBX_PROTO_TAG_NS, h->ts.ns);
1305 
1306 		switch (h->value_type)
1307 		{
1308 			case ITEM_VALUE_TYPE_FLOAT:
1309 				zbx_json_addfloat(&json, ZBX_PROTO_TAG_VALUE, h->value.dbl);
1310 				break;
1311 			case ITEM_VALUE_TYPE_UINT64:
1312 				zbx_json_adduint64(&json, ZBX_PROTO_TAG_VALUE, h->value.ui64);
1313 				break;
1314 			case ITEM_VALUE_TYPE_STR:
1315 				zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.str, ZBX_JSON_TYPE_STRING);
1316 				break;
1317 			case ITEM_VALUE_TYPE_TEXT:
1318 				zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.str, ZBX_JSON_TYPE_STRING);
1319 				break;
1320 			case ITEM_VALUE_TYPE_LOG:
1321 				zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGTIMESTAMP, h->value.log->timestamp);
1322 				zbx_json_addstring(&json, ZBX_PROTO_TAG_LOGSOURCE,
1323 						ZBX_NULL2EMPTY_STR(h->value.log->source), ZBX_JSON_TYPE_STRING);
1324 				zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGSEVERITY, h->value.log->severity);
1325 				zbx_json_addint64(&json, ZBX_PROTO_TAG_LOGEVENTID, h->value.log->logeventid);
1326 				zbx_json_addstring(&json, ZBX_PROTO_TAG_VALUE, h->value.log->value,
1327 						ZBX_JSON_TYPE_STRING);
1328 				break;
1329 			default:
1330 				THIS_SHOULD_NEVER_HAPPEN;
1331 		}
1332 
1333 		zbx_json_adduint64(&json, ZBX_PROTO_TAG_TYPE, h->value_type);
1334 		zbx_history_export_write(json.buffer, json.buffer_size);
1335 	}
1336 
1337 	zbx_history_export_flush();
1338 	zbx_json_free(&json);
1339 }
1340 
1341 /******************************************************************************
1342  *                                                                            *
1343  * Function: DCexport_history_and_trends                                      *
1344  *                                                                            *
1345  * Purpose: export history and trends                                         *
1346  *                                                                            *
1347  * Parameters: history     - [IN/OUT] array of history data                   *
1348  *             history_num - [IN] number of history structures                *
1349  *             itemids     - [IN] the item identifiers                        *
1350  *                                (used for item lookup)                      *
1351  *             items       - [IN] the items                                   *
1352  *             errcodes    - [IN] item error codes                            *
1353  *             trends      - [IN] trends from cache                           *
1354  *             trends_num  - [IN] number of trends                            *
1355  *                                                                            *
1356  ******************************************************************************/
DCexport_history_and_trends(const ZBX_DC_HISTORY * history,int history_num,const zbx_vector_uint64_t * itemids,DC_ITEM * items,const int * errcodes,const ZBX_DC_TREND * trends,int trends_num)1357 static void	DCexport_history_and_trends(const ZBX_DC_HISTORY *history, int history_num,
1358 		const zbx_vector_uint64_t *itemids, DC_ITEM *items, const int *errcodes, const ZBX_DC_TREND *trends,
1359 		int trends_num)
1360 {
1361 	int			i, index;
1362 	zbx_vector_uint64_t	hostids, item_info_ids;
1363 	zbx_hashset_t		hosts_info, items_info;
1364 	DC_ITEM			*item;
1365 	zbx_item_info_t		item_info;
1366 
1367 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d trends_num:%d", __func__, history_num, trends_num);
1368 
1369 	zbx_vector_uint64_create(&hostids);
1370 	zbx_vector_uint64_create(&item_info_ids);
1371 	zbx_hashset_create_ext(&items_info, itemids->values_num, ZBX_DEFAULT_UINT64_HASH_FUNC,
1372 			ZBX_DEFAULT_UINT64_COMPARE_FUNC, (zbx_clean_func_t)zbx_item_info_clean,
1373 			ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC);
1374 
1375 	for (i = 0; i < history_num; i++)
1376 	{
1377 		const ZBX_DC_HISTORY	*h = &history[i];
1378 
1379 		if (0 != (ZBX_DC_FLAGS_NOT_FOR_EXPORT & h->flags))
1380 			continue;
1381 
1382 		if (FAIL == (index = zbx_vector_uint64_bsearch(itemids, h->itemid, ZBX_DEFAULT_UINT64_COMPARE_FUNC)))
1383 		{
1384 			THIS_SHOULD_NEVER_HAPPEN;
1385 			continue;
1386 		}
1387 
1388 		if (SUCCEED != errcodes[index])
1389 			continue;
1390 
1391 		item = &items[index];
1392 
1393 		zbx_vector_uint64_append(&hostids, item->host.hostid);
1394 		zbx_vector_uint64_append(&item_info_ids, item->itemid);
1395 
1396 		item_info.itemid = item->itemid;
1397 		item_info.name = NULL;
1398 		item_info.item = item;
1399 		zbx_vector_ptr_create(&item_info.applications);
1400 		zbx_hashset_insert(&items_info, &item_info, sizeof(item_info));
1401 	}
1402 
1403 	if (0 == history_num)
1404 	{
1405 		for (i = 0; i < trends_num; i++)
1406 		{
1407 			const ZBX_DC_TREND	*trend = &trends[i];
1408 
1409 			if (FAIL == (index = zbx_vector_uint64_bsearch(itemids, trend->itemid,
1410 					ZBX_DEFAULT_UINT64_COMPARE_FUNC)))
1411 			{
1412 				THIS_SHOULD_NEVER_HAPPEN;
1413 				continue;
1414 			}
1415 
1416 			if (SUCCEED != errcodes[index])
1417 				continue;
1418 
1419 			item = &items[index];
1420 
1421 			zbx_vector_uint64_append(&hostids, item->host.hostid);
1422 			zbx_vector_uint64_append(&item_info_ids, item->itemid);
1423 
1424 			item_info.itemid = item->itemid;
1425 			item_info.name = NULL;
1426 			item_info.item = item;
1427 			zbx_vector_ptr_create(&item_info.applications);
1428 			zbx_hashset_insert(&items_info, &item_info, sizeof(item_info));
1429 		}
1430 	}
1431 
1432 	if (0 == item_info_ids.values_num)
1433 		goto clean;
1434 
1435 	zbx_vector_uint64_sort(&item_info_ids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1436 	zbx_vector_uint64_sort(&hostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1437 	zbx_vector_uint64_uniq(&hostids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1438 
1439 	zbx_hashset_create_ext(&hosts_info, hostids.values_num, ZBX_DEFAULT_UINT64_HASH_FUNC,
1440 			ZBX_DEFAULT_UINT64_COMPARE_FUNC, (zbx_clean_func_t)zbx_host_info_clean,
1441 			ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC);
1442 
1443 	db_get_hosts_info_by_hostid(&hosts_info, &hostids);
1444 
1445 	db_get_items_info_by_itemid(&items_info, &item_info_ids);
1446 
1447 	if (0 != history_num)
1448 		DCexport_history(history, history_num, &hosts_info, &items_info);
1449 
1450 	if (0 != trends_num)
1451 		DCexport_trends(trends, trends_num, &hosts_info, &items_info);
1452 
1453 	zbx_hashset_destroy(&hosts_info);
1454 clean:
1455 	zbx_hashset_destroy(&items_info);
1456 	zbx_vector_uint64_destroy(&item_info_ids);
1457 	zbx_vector_uint64_destroy(&hostids);
1458 
1459 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1460 }
1461 
1462 /******************************************************************************
1463  *                                                                            *
1464  * Function: DCexport_all_trends                                              *
1465  *                                                                            *
1466  * Purpose: export all trends                                                 *
1467  *                                                                            *
1468  * Parameters: trends     - [IN] trends from cache                            *
1469  *             trends_num - [IN] number of trends                             *
1470  *                                                                            *
1471  ******************************************************************************/
DCexport_all_trends(const ZBX_DC_TREND * trends,int trends_num)1472 static void	DCexport_all_trends(const ZBX_DC_TREND *trends, int trends_num)
1473 {
1474 	DC_ITEM			*items;
1475 	zbx_vector_uint64_t	itemids;
1476 	int			*errcodes, i, num;
1477 
1478 	zabbix_log(LOG_LEVEL_WARNING, "exporting trend data...");
1479 
1480 	while (0 < trends_num)
1481 	{
1482 		num = MIN(ZBX_HC_SYNC_MAX, trends_num);
1483 
1484 		items = (DC_ITEM *)zbx_malloc(NULL, sizeof(DC_ITEM) * (size_t)num);
1485 		errcodes = (int *)zbx_malloc(NULL, sizeof(int) * (size_t)num);
1486 
1487 		zbx_vector_uint64_create(&itemids);
1488 		zbx_vector_uint64_reserve(&itemids, num);
1489 
1490 		for (i = 0; i < num; i++)
1491 			zbx_vector_uint64_append(&itemids, trends[i].itemid);
1492 
1493 		zbx_vector_uint64_sort(&itemids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1494 
1495 		DCconfig_get_items_by_itemids(items, itemids.values, errcodes, num);
1496 
1497 		DCexport_history_and_trends(NULL, 0, &itemids, items, errcodes, trends, num);
1498 
1499 		DCconfig_clean_items(items, errcodes, num);
1500 		zbx_vector_uint64_destroy(&itemids);
1501 		zbx_free(items);
1502 		zbx_free(errcodes);
1503 
1504 		trends += num;
1505 		trends_num -= num;
1506 	}
1507 
1508 	zabbix_log(LOG_LEVEL_WARNING, "exporting trend data done");
1509 }
1510 
1511 /******************************************************************************
1512  *                                                                            *
1513  * Function: DCsync_trends                                                    *
1514  *                                                                            *
1515  * Purpose: flush all trends to the database                                  *
1516  *                                                                            *
1517  * Author: Alexander Vladishev                                                *
1518  *                                                                            *
1519  ******************************************************************************/
DCsync_trends(void)1520 static void	DCsync_trends(void)
1521 {
1522 	zbx_hashset_iter_t	iter;
1523 	ZBX_DC_TREND		*trends = NULL, *trend;
1524 	int			trends_alloc = 0, trends_num = 0, compression_age;
1525 
1526 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() trends_num:%d", __func__, cache->trends_num);
1527 
1528 	compression_age = hc_get_history_compression_age();
1529 
1530 	zabbix_log(LOG_LEVEL_WARNING, "syncing trend data...");
1531 
1532 	LOCK_TRENDS;
1533 
1534 	zbx_hashset_iter_reset(&cache->trends, &iter);
1535 
1536 	while (NULL != (trend = (ZBX_DC_TREND *)zbx_hashset_iter_next(&iter)))
1537 	{
1538 		if (SUCCEED == zbx_history_requires_trends(trend->value_type) && trend->clock >= compression_age)
1539 			DCflush_trend(trend, &trends, &trends_alloc, &trends_num);
1540 	}
1541 
1542 	UNLOCK_TRENDS;
1543 
1544 	if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_TRENDS) && 0 != trends_num)
1545 		DCexport_all_trends(trends, trends_num);
1546 
1547 	if (0 < trends_num)
1548 		qsort(trends, trends_num, sizeof(ZBX_DC_TREND), zbx_trend_compare);
1549 
1550 	DBbegin();
1551 
1552 	while (trends_num > 0)
1553 		DBflush_trends(trends, &trends_num, NULL);
1554 
1555 	DBcommit();
1556 
1557 	zbx_free(trends);
1558 
1559 	zabbix_log(LOG_LEVEL_WARNING, "syncing trend data done");
1560 
1561 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1562 }
1563 
1564 /******************************************************************************
1565  *                                                                            *
1566  * Function: recalculate_triggers                                             *
1567  *                                                                            *
1568  * Purpose: re-calculate and update values of triggers related to the items   *
1569  *                                                                            *
1570  * Parameters: history           - [IN] array of history data                 *
1571  *             history_num       - [IN] number of history structures          *
1572  *             history_itemids   - [IN] the item identifiers                  *
1573  *                                      (used for item lookup)                *
1574  *             history_items     - [IN] the items                             *
1575  *             history_errcodes  - [IN] item error codes                      *
1576  *             timer_triggerids  - [IN] the timer triggerids to process       *
1577  *             ts                - [IN] timer trigger timestamp               *
1578  *             trigger_diff      - [OUT] trigger updates                      *
1579  *                                                                            *
1580  ******************************************************************************/
static void	recalculate_triggers(const ZBX_DC_HISTORY *history, int history_num,
		const zbx_vector_uint64_t *history_itemids, const DC_ITEM *history_items, const int *history_errcodes,
		const zbx_vector_uint64_t *timer_triggerids, zbx_timespec_t *ts, zbx_vector_ptr_t *trigger_diff)
{
	int			i, item_num = 0;
	zbx_uint64_t		*itemids = NULL;
	zbx_timespec_t		*timespecs = NULL;
	zbx_hashset_t		trigger_info;
	zbx_vector_ptr_t	trigger_order;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	if (0 != history_num)
	{
		/* both arrays are sized for the case when every history record carries a value */
		itemids = (zbx_uint64_t *)zbx_malloc(itemids, sizeof(zbx_uint64_t) * (size_t)history_num);
		timespecs = (zbx_timespec_t *)zbx_malloc(timespecs, sizeof(zbx_timespec_t) * (size_t)history_num);

		for (i = 0; i < history_num; i++)
		{
			const ZBX_DC_HISTORY	*h = &history[i];

			/* records without a value are not used for trigger lookup */
			if (0 != (ZBX_DC_FLAG_NOVALUE & h->flags))
				continue;

			itemids[item_num] = h->itemid;
			timespecs[item_num] = h->ts;
			item_num++;
		}
	}

	/* neither item values nor timer triggers - nothing to recalculate */
	if (0 == item_num && 0 == timer_triggerids->values_num)
		goto out;

	zbx_hashset_create(&trigger_info, MAX(100, 2 * item_num + timer_triggerids->values_num),
			ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);

	zbx_vector_ptr_create(&trigger_order);
	zbx_vector_ptr_reserve(&trigger_order, trigger_info.num_slots);

	if (0 != item_num)
	{
		DCconfig_get_triggers_by_itemids(&trigger_info, &trigger_order, itemids, timespecs, item_num);
		zbx_determine_items_in_expressions(&trigger_order, itemids, item_num);
	}

	if (0 != timer_triggerids->values_num)
		zbx_dc_get_timer_triggers_by_triggerids(&trigger_info, &trigger_order, timer_triggerids, ts);

	/* triggers are sorted before they are evaluated and processed */
	zbx_vector_ptr_sort(&trigger_order, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);
	evaluate_expressions(&trigger_order, history_itemids, history_items, history_errcodes);
	zbx_process_triggers(&trigger_order, trigger_diff);

	DCfree_triggers(&trigger_order);

	zbx_hashset_destroy(&trigger_info);
	zbx_vector_ptr_destroy(&trigger_order);
out:
	zbx_free(timespecs);
	zbx_free(itemids);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
1648 
/******************************************************************************
 *                                                                            *
 * Function: DCinventory_value_add                                            *
 *                                                                            *
 * Purpose: formats item value and appends it to the list of host inventory   *
 *          values                                                            *
 *                                                                            *
 * Parameters: inventory_values - [IN/OUT] the vector the new value is        *
 *                                         appended to                        *
 *             item             - [IN] the item                               *
 *             h                - [IN] the history data of the item           *
 *                                                                            *
 * Comments: Nothing is added for not supported values, hosts without         *
 *           automatic inventory mode, values flagged as undefined or empty,  *
 *           items without a matching inventory field and log values.         *
 *                                                                            *
 ******************************************************************************/
static void	DCinventory_value_add(zbx_vector_ptr_t *inventory_values, const DC_ITEM *item, ZBX_DC_HISTORY *h)
{
	zbx_inventory_value_t	*entry;
	const char		*field;
	char			text[MAX_BUFFER_LEN];

	if (ITEM_STATE_NOTSUPPORTED == h->state || HOST_INVENTORY_AUTOMATIC != item->host.inventory_mode)
		return;

	if (0 != ((ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_NOVALUE) & h->flags))
		return;

	/* the inventory field is looked up only after the cheaper checks have passed */
	if (NULL == (field = DBget_inventory_field(item->inventory_link)))
		return;

	/* convert the value to its text form */
	switch (h->value_type)
	{
		case ITEM_VALUE_TYPE_STR:
		case ITEM_VALUE_TYPE_TEXT:
			strscpy(text, h->value.str);
			break;
		case ITEM_VALUE_TYPE_UINT64:
			zbx_snprintf(text, sizeof(text), ZBX_FS_UI64, h->value.ui64);
			break;
		case ITEM_VALUE_TYPE_FLOAT:
			zbx_print_double(text, sizeof(text), h->value.dbl);
			break;
		default:
			return;
	}

	zbx_format_value(text, sizeof(text), item->valuemapid, item->units, h->value_type);

	entry = (zbx_inventory_value_t *)zbx_malloc(NULL, sizeof(zbx_inventory_value_t));

	entry->hostid = item->host.hostid;
	entry->idx = item->inventory_link - 1;
	entry->field_name = field;
	entry->value = zbx_strdup(NULL, text);	/* released by DCinventory_value_free() */

	zbx_vector_ptr_append(inventory_values, entry);
}
1694 
DCadd_update_inventory_sql(size_t * sql_offset,const zbx_vector_ptr_t * inventory_values)1695 static void	DCadd_update_inventory_sql(size_t *sql_offset, const zbx_vector_ptr_t *inventory_values)
1696 {
1697 	char	*value_esc;
1698 	int	i;
1699 
1700 	for (i = 0; i < inventory_values->values_num; i++)
1701 	{
1702 		const zbx_inventory_value_t	*inventory_value = (zbx_inventory_value_t *)inventory_values->values[i];
1703 
1704 		value_esc = DBdyn_escape_field("host_inventory", inventory_value->field_name, inventory_value->value);
1705 
1706 		zbx_snprintf_alloc(&sql, &sql_alloc, sql_offset,
1707 				"update host_inventory set %s='%s' where hostid=" ZBX_FS_UI64 ";\n",
1708 				inventory_value->field_name, value_esc, inventory_value->hostid);
1709 
1710 		DBexecute_overflowed_sql(&sql, &sql_alloc, sql_offset);
1711 
1712 		zbx_free(value_esc);
1713 	}
1714 }
1715 
/******************************************************************************
 *                                                                            *
 * Function: DCinventory_value_free                                           *
 *                                                                            *
 * Purpose: frees inventory value together with its value string              *
 *                                                                            *
 * Parameters: inventory_value - [IN] the inventory value to free             *
 *                                                                            *
 ******************************************************************************/
static void	DCinventory_value_free(zbx_inventory_value_t *inventory_value)
{
	/* only the value string and the structure itself are released here, */
	/* the field name is left untouched                                  */
	zbx_free(inventory_value->value);

	zbx_free(inventory_value);
}
1721 
1722 /******************************************************************************
1723  *                                                                            *
1724  * Function: dc_history_clean_value                                           *
1725  *                                                                            *
1726  * Purpose: frees resources allocated to store str/text/log value             *
1727  *                                                                            *
 * Parameters: history - [IN] the history value to clean                      *
1730  *                                                                            *
1731  ******************************************************************************/
static void	dc_history_clean_value(ZBX_DC_HISTORY *history)
{
	/* not supported values carry an error message instead of a value */
	if (ITEM_STATE_NOTSUPPORTED == history->state)
	{
		zbx_free(history->value.err);
		return;
	}

	/* nothing is released for records flagged as having no value */
	if (0 != (ZBX_DC_FLAG_NOVALUE & history->flags))
		return;

	if (ITEM_VALUE_TYPE_LOG == history->value_type)
	{
		/* log contents are released before the log structure itself */
		zbx_free(history->value.log->value);
		zbx_free(history->value.log->source);
		zbx_free(history->value.log);
	}
	else if (ITEM_VALUE_TYPE_STR == history->value_type || ITEM_VALUE_TYPE_TEXT == history->value_type)
	{
		zbx_free(history->value.str);
	}
}
1756 
1757 /******************************************************************************
1758  *                                                                            *
1759  * Function: hc_free_item_values                                              *
1760  *                                                                            *
1761  * Purpose: frees resources allocated to store str/text/log values            *
1762  *                                                                            *
1763  * Parameters: history     - [IN] the history data                            *
1764  *             history_num - [IN] the number of values in history data        *
1765  *                                                                            *
1766  ******************************************************************************/
static void	hc_free_item_values(ZBX_DC_HISTORY *history, int history_num)
{
	int	idx = 0;

	/* clean every value of the array in order */
	while (idx < history_num)
	{
		dc_history_clean_value(&history[idx]);
		idx++;
	}
}
1774 
1775 /******************************************************************************
1776  *                                                                            *
1777  * Function: dc_history_set_error                                             *
1778  *                                                                            *
1779  * Purpose: sets history data to notsupported                                 *
1780  *                                                                            *
 * Parameters: hdata    - [IN/OUT] the history data                           *
1782  *             errmsg   - [IN] the error message                              *
1783  *                                                                            *
 * Comments: The error message is stored directly and freed when history      *
 *           data is cleaned.                                                 *
1786  *                                                                            *
1787  ******************************************************************************/
static void	dc_history_set_error(ZBX_DC_HISTORY *hdata, char *errmsg)
{
	/* the old value must be released first, while state and value type still describe it */
	dc_history_clean_value(hdata);

	hdata->state = ITEM_STATE_NOTSUPPORTED;
	hdata->flags |= ZBX_DC_FLAG_UNDEF;

	/* the message pointer is stored as is, without making a copy */
	hdata->value.err = errmsg;
}
1795 
1796 /******************************************************************************
1797  *                                                                            *
1798  * Function: dc_history_set_value                                             *
1799  *                                                                            *
1800  * Purpose: sets history data value                                           *
1801  *                                                                            *
1802  * Parameters: hdata      - [IN/OUT] the history data                         *
1803  *             value_type - [IN] the item value type                          *
1804  *             value      - [IN] the value to set                             *
1805  *                                                                            *
1806  ******************************************************************************/
static void	dc_history_set_value(ZBX_DC_HISTORY *hdata, unsigned char value_type, zbx_variant_t *value)
{
	char	*errmsg = NULL, *logvalue;

	/* a value that cannot be converted to the requested type turns the history data into an error */
	if (FAIL == zbx_variant_to_value_type(value, value_type, CONFIG_DOUBLE_PRECISION, &errmsg))
	{
		dc_history_set_error(hdata, errmsg);
		return;
	}

	switch (value_type)
	{
		case ITEM_VALUE_TYPE_FLOAT:
			dc_history_clean_value(hdata);
			hdata->value.dbl = value->data.dbl;
			break;
		case ITEM_VALUE_TYPE_UINT64:
			dc_history_clean_value(hdata);
			hdata->value.ui64 = value->data.ui64;
			break;
		case ITEM_VALUE_TYPE_STR:
			dc_history_clean_value(hdata);
			/* the string is taken over from the variant and truncated in place */
			hdata->value.str = value->data.str;
			hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_STR_VALUE_LEN)] = '\0';
			break;
		case ITEM_VALUE_TYPE_TEXT:
			dc_history_clean_value(hdata);
			hdata->value.str = value->data.str;
			hdata->value.str[zbx_db_strlen_n(hdata->value.str, HISTORY_TEXT_VALUE_LEN)] = '\0';
			break;
		case ITEM_VALUE_TYPE_LOG:
			if (ITEM_VALUE_TYPE_LOG != hdata->value_type)
			{
				/* a fresh, zeroed log structure replaces the old non-log value */
				dc_history_clean_value(hdata);
				hdata->value.log = (zbx_log_value_t *)zbx_malloc(NULL, sizeof(zbx_log_value_t));
				memset(hdata->value.log, 0, sizeof(zbx_log_value_t));
			}

			/* Truncate the log text itself. hdata->value.str must not be used here: */
			/* at this point the value holds the log structure, not a string.        */
			logvalue = value->data.str;
			logvalue[zbx_db_strlen_n(logvalue, HISTORY_LOG_VALUE_LEN)] = '\0';
			hdata->value.log->value = logvalue;
			break;
	}

	hdata->value_type = value_type;

	/* reset the variant, its data has been copied or taken over above */
	zbx_variant_set_none(value);
}
1851 
1852 /******************************************************************************
1853  *                                                                            *
1854  * Function: normalize_item_value                                             *
1855  *                                                                            *
1856  * Purpose: normalize item value by performing truncation of long text        *
1857  *          values and changes value format according to the item value type  *
1858  *                                                                            *
1859  * Parameters: item          - [IN] the item                                  *
1860  *             hdata         - [IN/OUT] the historical data to process        *
1861  *                                                                            *
1862  ******************************************************************************/
static void	normalize_item_value(const DC_ITEM *item, ZBX_DC_HISTORY *hdata)
{
	zbx_variant_t	converted;
	char		*text;

	/* records without a value and not supported values are left as they are */
	if (0 != (hdata->flags & ZBX_DC_FLAG_NOVALUE) || ITEM_STATE_NOTSUPPORTED == hdata->state)
		return;

	if (0 == (hdata->flags & ZBX_DC_FLAG_NOHISTORY))
		hdata->ttl = item->history_sec;

	if (item->value_type != hdata->value_type)
	{
		/* value type differs from the item value type - move the value into a variant and convert it */
		switch (hdata->value_type)
		{
			case ITEM_VALUE_TYPE_UINT64:
				zbx_variant_set_ui64(&converted, hdata->value.ui64);
				break;
			case ITEM_VALUE_TYPE_FLOAT:
				zbx_variant_set_dbl(&converted, hdata->value.dbl);
				break;
			case ITEM_VALUE_TYPE_LOG:
				/* the variant takes the log text, so it is detached from the history data */
				zbx_variant_set_str(&converted, hdata->value.log->value);
				hdata->value.log->value = NULL;
				break;
			case ITEM_VALUE_TYPE_STR:
			case ITEM_VALUE_TYPE_TEXT:
				zbx_variant_set_str(&converted, hdata->value.str);
				hdata->value.str = NULL;
				break;
		}

		dc_history_set_value(hdata, item->value_type, &converted);
		zbx_variant_clear(&converted);

		return;
	}

	/* value type matches - text based values are truncated, float values are validated */
	switch (hdata->value_type)
	{
		case ITEM_VALUE_TYPE_FLOAT:
			if (FAIL == zbx_validate_value_dbl(hdata->value.dbl, CONFIG_DOUBLE_PRECISION))
			{
				char	buffer[ZBX_MAX_DOUBLE_LEN + 1];

				dc_history_set_error(hdata, zbx_dsprintf(NULL,
						"Value %s is too small or too large.",
						zbx_print_double(buffer, sizeof(buffer), hdata->value.dbl)));
			}
			break;
		case ITEM_VALUE_TYPE_LOG:
			text = hdata->value.log->value;
			text[zbx_db_strlen_n(text, HISTORY_LOG_VALUE_LEN)] = '\0';
			break;
		case ITEM_VALUE_TYPE_TEXT:
			text = hdata->value.str;
			text[zbx_db_strlen_n(text, HISTORY_TEXT_VALUE_LEN)] = '\0';
			break;
		case ITEM_VALUE_TYPE_STR:
			text = hdata->value.str;
			text[zbx_db_strlen_n(text, HISTORY_STR_VALUE_LEN)] = '\0';
			break;
	}
}
1928 
1929 /******************************************************************************
1930  *                                                                            *
1931  * Function: calculate_item_update                                            *
1932  *                                                                            *
1933  * Purpose: calculates what item fields must be updated                       *
1934  *                                                                            *
1935  * Parameters: item      - [IN] the item                                      *
1936  *             h         - [IN] the historical data to process                *
1937  *                                                                            *
1938  * Return value: The update data. This data must be freed by the caller.      *
1939  *                                                                            *
1940  * Comments: Will generate internal events when item state switches.          *
1941  *                                                                            *
1942  ******************************************************************************/
calculate_item_update(DC_ITEM * item,const ZBX_DC_HISTORY * h)1943 static zbx_item_diff_t	*calculate_item_update(DC_ITEM *item, const ZBX_DC_HISTORY *h)
1944 {
1945 	zbx_uint64_t	flags;
1946 	const char	*item_error = NULL;
1947 	zbx_item_diff_t	*diff;
1948 
1949 	flags = item->host.proxy_hostid == 0 ? ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTCLOCK : 0;
1950 
1951 	if (0 != (ZBX_DC_FLAG_META & h->flags))
1952 	{
1953 		if (item->lastlogsize != h->lastlogsize)
1954 			flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE;
1955 
1956 		if (item->mtime != h->mtime)
1957 			flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME;
1958 	}
1959 
1960 	if (h->state != item->state)
1961 	{
1962 		flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE;
1963 
1964 		if (ITEM_STATE_NOTSUPPORTED == h->state)
1965 		{
1966 			zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" became not supported: %s",
1967 					item->host.host, item->key_orig, h->value.str);
1968 
1969 			zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_ITEM, item->itemid, &h->ts, h->state, NULL,
1970 					NULL, NULL, 0, 0, NULL, 0, NULL, 0, NULL, h->value.err);
1971 
1972 			if (0 != strcmp(item->error, h->value.err))
1973 				item_error = h->value.err;
1974 		}
1975 		else
1976 		{
1977 			zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" became supported",
1978 					item->host.host, item->key_orig);
1979 
1980 			/* we know it's EVENT_OBJECT_ITEM because LLDRULE that becomes */
1981 			/* supported is handled in lld_process_discovery_rule()        */
1982 			zbx_add_event(EVENT_SOURCE_INTERNAL, EVENT_OBJECT_ITEM, item->itemid, &h->ts, h->state,
1983 					NULL, NULL, NULL, 0, 0, NULL, 0, NULL, 0, NULL, NULL);
1984 
1985 			item_error = "";
1986 		}
1987 	}
1988 	else if (ITEM_STATE_NOTSUPPORTED == h->state && 0 != strcmp(item->error, h->value.err))
1989 	{
1990 		zabbix_log(LOG_LEVEL_WARNING, "error reason for \"%s:%s\" changed: %s", item->host.host,
1991 				item->key_orig, h->value.err);
1992 
1993 		item_error = h->value.err;
1994 	}
1995 
1996 	if (NULL != item_error)
1997 		flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_ERROR;
1998 
1999 	if (0 == flags)
2000 		return NULL;
2001 
2002 	diff = (zbx_item_diff_t *)zbx_malloc(NULL, sizeof(zbx_item_diff_t));
2003 	diff->itemid = item->itemid;
2004 	diff->lastclock = h->ts.sec;
2005 	diff->flags = flags;
2006 
2007 	if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE & flags))
2008 		diff->lastlogsize = h->lastlogsize;
2009 
2010 	if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME & flags))
2011 		diff->mtime = h->mtime;
2012 
2013 	if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE & flags))
2014 	{
2015 		diff->state = h->state;
2016 		item->state = h->state;
2017 	}
2018 
2019 	if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_ERROR & flags))
2020 		diff->error = item_error;
2021 
2022 	return diff;
2023 }
2024 
2025 /******************************************************************************
2026  *                                                                            *
2027  * Function: DBmass_update_items                                              *
2028  *                                                                            *
2029  * Purpose: update item data and inventory in database                        *
2030  *                                                                            *
2031  * Parameters: item_diff        - item changes                                *
2032  *             inventory_values - inventory values                            *
2033  *                                                                            *
2034  ******************************************************************************/
DBmass_update_items(const zbx_vector_ptr_t * item_diff,const zbx_vector_ptr_t * inventory_values)2035 static void	DBmass_update_items(const zbx_vector_ptr_t *item_diff, const zbx_vector_ptr_t *inventory_values)
2036 {
2037 	size_t	sql_offset = 0;
2038 	int	i;
2039 
2040 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
2041 
2042 	for (i = 0; i < item_diff->values_num; i++)
2043 	{
2044 		zbx_item_diff_t	*diff;
2045 
2046 		diff = (zbx_item_diff_t *)item_diff->values[i];
2047 		if (0 != (ZBX_FLAGS_ITEM_DIFF_UPDATE_DB & diff->flags))
2048 			break;
2049 	}
2050 
2051 	if (i != item_diff->values_num || 0 != inventory_values->values_num)
2052 	{
2053 		DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);
2054 
2055 		if (i != item_diff->values_num)
2056 		{
2057 			zbx_db_save_item_changes(&sql, &sql_alloc, &sql_offset, item_diff,
2058 					ZBX_FLAGS_ITEM_DIFF_UPDATE_DB);
2059 		}
2060 
2061 		if (0 != inventory_values->values_num)
2062 			DCadd_update_inventory_sql(&sql_offset, inventory_values);
2063 
2064 		DBend_multiple_update(&sql, &sql_alloc, &sql_offset);
2065 
2066 		if (sql_offset > 16)	/* In ORACLE always present begin..end; */
2067 			DBexecute("%s", sql);
2068 
2069 		DCconfig_update_inventory_values(inventory_values);
2070 	}
2071 
2072 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
2073 }
2074 
2075 /******************************************************************************
2076  *                                                                            *
2077  * Function: DCmass_proxy_update_items                                        *
2078  *                                                                            *
2079  * Purpose: update items info after new value is received                     *
2080  *                                                                            *
2081  * Parameters: history     - array of history data                            *
2082  *             history_num - number of history structures                     *
2083  *                                                                            *
2084  * Author: Alexei Vladishev, Eugene Grigorjev, Alexander Vladishev            *
2085  *                                                                            *
2086  ******************************************************************************/
static void	DCmass_proxy_update_items(ZBX_DC_HISTORY *history, int history_num)
{
	int			idx;
	size_t			sql_offset = 0;
	zbx_item_diff_t		*diffs;
	zbx_vector_ptr_t	item_diff;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_vector_ptr_create(&item_diff);
	zbx_vector_ptr_reserve(&item_diff, history_num);

	/* one block holds all diff structures, the vector only references its elements */
	diffs = (zbx_item_diff_t *)zbx_malloc(NULL, sizeof(zbx_item_diff_t) * history_num);

	for (idx = 0; idx < history_num; idx++)
	{
		const ZBX_DC_HISTORY	*h = &history[idx];
		zbx_item_diff_t		*diff = &diffs[idx];

		diff->flags = ZBX_FLAGS_ITEM_DIFF_UPDATE_STATE | ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTCLOCK;
		diff->itemid = h->itemid;
		diff->state = h->state;
		diff->lastclock = h->ts.sec;

		/* log size and modification time are taken only from records with meta information */
		if (0 != (ZBX_DC_FLAG_META & h->flags))
		{
			diff->flags |= ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE | ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME;
			diff->lastlogsize = h->lastlogsize;
			diff->mtime = h->mtime;
		}

		zbx_vector_ptr_append(&item_diff, diff);
	}

	if (0 == item_diff.values_num)
		goto clean;

	zbx_vector_ptr_sort(&item_diff, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);

	DBbegin_multiple_update(&sql, &sql_alloc, &sql_offset);

	/* only the log size and modification time changes are requested to be saved in database */
	zbx_db_save_item_changes(&sql, &sql_alloc, &sql_offset, &item_diff,
			ZBX_FLAGS_ITEM_DIFF_UPDATE_LASTLOGSIZE | ZBX_FLAGS_ITEM_DIFF_UPDATE_MTIME);

	DBend_multiple_update(&sql, &sql_alloc, &sql_offset);

	if (sql_offset > 16)	/* In ORACLE always present begin..end; */
		DBexecute("%s", sql);

	DCconfig_items_apply_changes(&item_diff);
clean:
	zbx_vector_ptr_destroy(&item_diff);
	zbx_free(diffs);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
2144 
2145 /******************************************************************************
2146  *                                                                            *
2147  * Function: DBmass_add_history                                               *
2148  *                                                                            *
2149  * Purpose: inserting new history data after new value is received            *
2150  *                                                                            *
2151  * Parameters: history     - array of history data                            *
2152  *             history_num - number of history structures                     *
2153  *                                                                            *
2154  ******************************************************************************/
static int	DBmass_add_history(ZBX_DC_HISTORY *history, int history_num)
{
	zbx_vector_ptr_t	values;
	int			idx, result = SUCCEED;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_vector_ptr_create(&values);
	zbx_vector_ptr_reserve(&values, history_num);

	/* keep only the records that have none of the ZBX_DC_FLAGS_NOT_FOR_HISTORY flags set */
	for (idx = 0; idx < history_num; idx++)
	{
		if (0 == (ZBX_DC_FLAGS_NOT_FOR_HISTORY & history[idx].flags))
			zbx_vector_ptr_append(&values, &history[idx]);
	}

	/* without any values to add the function reports success */
	if (0 != values.values_num)
		result = zbx_vc_add_values(&values);

	zbx_vector_ptr_destroy(&values);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);

	return result;
}
2184 
2185 /******************************************************************************
2186  *                                                                            *
2187  * Function: dc_add_proxy_history                                             *
2188  *                                                                            *
2189  * Purpose: helper function for DCmass_proxy_add_history()                    *
2190  *                                                                            *
 * Comment: this function is meant for items with value_type other than       *
2192  *          ITEM_VALUE_TYPE_LOG not containing meta information in result     *
2193  *                                                                            *
2194  ******************************************************************************/
static void	dc_add_proxy_history(ZBX_DC_HISTORY *history, int history_num)
{
	zbx_db_insert_t	db_insert;
	char		*pvalue, buffer[64];
	unsigned int	flags;
	int		idx, now;

	now = (int)time(NULL);
	zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "flags", "write_clock",
			NULL);

	for (idx = 0; idx < history_num; idx++)
	{
		const ZBX_DC_HISTORY	*h = &history[idx];

		/* undefined values, values with meta information and not supported values are skipped */
		if (0 != (h->flags & (ZBX_DC_FLAG_UNDEF | ZBX_DC_FLAG_META)) || ITEM_STATE_NOTSUPPORTED == h->state)
			continue;

		if (0 != (h->flags & ZBX_DC_FLAG_NOVALUE))
		{
			/* a record without value is stored with an empty string and the novalue flag */
			flags = PROXY_HISTORY_FLAG_NOVALUE;
			pvalue = (char *)"";
		}
		else
		{
			flags = 0;

			switch (h->value_type)
			{
				case ITEM_VALUE_TYPE_STR:
				case ITEM_VALUE_TYPE_TEXT:
					pvalue = h->value.str;
					break;
				case ITEM_VALUE_TYPE_UINT64:
					pvalue = buffer;
					zbx_snprintf(buffer, sizeof(buffer), ZBX_FS_UI64, h->value.ui64);
					break;
				case ITEM_VALUE_TYPE_FLOAT:
					pvalue = buffer;
					zbx_snprintf(buffer, sizeof(buffer), ZBX_FS_DBL64, h->value.dbl);
					break;
				case ITEM_VALUE_TYPE_LOG:
					/* log values are not inserted by this function */
					continue;
				default:
					THIS_SHOULD_NEVER_HAPPEN;
					continue;
			}
		}

		zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, pvalue, flags, now);
	}

	zbx_db_insert_execute(&db_insert);
	zbx_db_insert_clean(&db_insert);
}
2253 
2254 /******************************************************************************
2255  *                                                                            *
2256  * Function: dc_add_proxy_history_meta                                        *
2257  *                                                                            *
2258  * Purpose: helper function for DCmass_proxy_add_history()                    *
2259  *                                                                            *
 * Comment: this function is meant for items with value_type other than       *
2261  *          ITEM_VALUE_TYPE_LOG containing meta information in result         *
2262  *                                                                            *
2263  ******************************************************************************/
static void	dc_add_proxy_history_meta(ZBX_DC_HISTORY *history, int history_num)
{
	zbx_db_insert_t	db_insert;
	char		*pvalue, buffer[64];
	int		idx, now;

	now = (int)time(NULL);
	zbx_db_insert_prepare(&db_insert, "proxy_history", "itemid", "clock", "ns", "value", "lastlogsize", "mtime",
			"flags", "write_clock", NULL);

	for (idx = 0; idx < history_num; idx++)
	{
		const ZBX_DC_HISTORY	*h = &history[idx];
		unsigned int		flags = PROXY_HISTORY_FLAG_META;

		/* not supported and undefined values are skipped */
		if (ITEM_STATE_NOTSUPPORTED == h->state || 0 != (h->flags & ZBX_DC_FLAG_UNDEF))
			continue;

		/* only non-log values carrying meta information are inserted here */
		if (0 == (h->flags & ZBX_DC_FLAG_META) || ITEM_VALUE_TYPE_LOG == h->value_type)
			continue;

		if (0 != (h->flags & ZBX_DC_FLAG_NOVALUE))
		{
			/* a record without value is stored with an empty string and the novalue flag added */
			flags |= PROXY_HISTORY_FLAG_NOVALUE;
			pvalue = (char *)"";
		}
		else
		{
			switch (h->value_type)
			{
				case ITEM_VALUE_TYPE_STR:
				case ITEM_VALUE_TYPE_TEXT:
					pvalue = h->value.str;
					break;
				case ITEM_VALUE_TYPE_UINT64:
					pvalue = buffer;
					zbx_snprintf(buffer, sizeof(buffer), ZBX_FS_UI64, h->value.ui64);
					break;
				case ITEM_VALUE_TYPE_FLOAT:
					pvalue = buffer;
					zbx_snprintf(buffer, sizeof(buffer), ZBX_FS_DBL64, h->value.dbl);
					break;
				default:
					THIS_SHOULD_NEVER_HAPPEN;
					continue;
			}
		}

		zbx_db_insert_add_values(&db_insert, h->itemid, h->ts.sec, h->ts.ns, pvalue, h->lastlogsize, h->mtime,
				flags, now);
	}

	zbx_db_insert_execute(&db_insert);
	zbx_db_insert_clean(&db_insert);
}
2323 
2324 /******************************************************************************
2325  *                                                                            *
2326  * Function: dc_add_proxy_history_log                                         *
2327  *                                                                            *
2328  * Purpose: helper function for DCmass_proxy_add_history()                    *
2329  *                                                                            *
2330  * Comment: this function is meant for items with value_type                  *
2331  *          ITEM_VALUE_TYPE_LOG                                               *
2332  *                                                                            *
2333  ******************************************************************************/
static void	dc_add_proxy_history_log(ZBX_DC_HISTORY *history, int history_num)
{
	zbx_db_insert_t	insert;
	int		idx, write_clock;

	write_clock = (int)time(NULL);

	/* see hc_copy_history_data() for fields that might be uninitialized and need special handling here */
	zbx_db_insert_prepare(&insert, "proxy_history", "itemid", "clock", "ns", "timestamp", "source", "severity",
			"value", "logeventid", "lastlogsize", "mtime", "flags", "write_clock", NULL);

	for (idx = 0; idx < history_num; idx++)
	{
		const ZBX_DC_HISTORY	*rec = &history[idx];

		/* only values of supported items with log value type are written by this helper */
		if (ITEM_STATE_NOTSUPPORTED == rec->state || ITEM_VALUE_TYPE_LOG != rec->value_type)
			continue;

		if (0 != (rec->flags & ZBX_DC_FLAG_NOVALUE))
		{
			/* sent to server only if not 0, see proxy_get_history_data() */
			const int		unset = 0;
			const unsigned int	row_flags = PROXY_HISTORY_FLAG_META | PROXY_HISTORY_FLAG_NOVALUE;

			zbx_db_insert_add_values(&insert, rec->itemid, rec->ts.sec, rec->ts.ns, unset, "", unset, "",
					unset, rec->lastlogsize, rec->mtime, row_flags, write_clock);
		}
		else
		{
			const zbx_log_value_t	*entry = rec->value.log;
			unsigned int		row_flags = 0;
			zbx_uint64_t		logsize = 0;
			int			mtime = 0;

			/* lastlogsize and mtime are stored only when the value has the meta flag set */
			if (0 != (rec->flags & ZBX_DC_FLAG_META))
			{
				row_flags = PROXY_HISTORY_FLAG_META;
				logsize = rec->lastlogsize;
				mtime = rec->mtime;
			}

			zbx_db_insert_add_values(&insert, rec->itemid, rec->ts.sec, rec->ts.ns, entry->timestamp,
					ZBX_NULL2EMPTY_STR(entry->source), entry->severity, entry->value,
					entry->logeventid, logsize, mtime, row_flags, write_clock);
		}
	}

	zbx_db_insert_execute(&insert);
	zbx_db_insert_clean(&insert);
}
2394 
2395 /******************************************************************************
2396  *                                                                            *
2397  * Function: dc_add_proxy_history_notsupported                                *
2398  *                                                                            *
2399  * Purpose: helper function for DCmass_proxy_add_history()                    *
2400  *                                                                            *
2401  ******************************************************************************/
static void	dc_add_proxy_history_notsupported(ZBX_DC_HISTORY *history, int history_num)
{
	zbx_db_insert_t	insert;
	int		idx, write_clock;

	write_clock = (int)time(NULL);

	zbx_db_insert_prepare(&insert, "proxy_history", "itemid", "clock", "ns", "value", "state", "write_clock",
			NULL);

	for (idx = 0; idx < history_num; idx++)
	{
		const ZBX_DC_HISTORY	*rec = &history[idx];

		/* only records in the not supported state are written; the error text goes into "value" */
		if (ITEM_STATE_NOTSUPPORTED == rec->state)
		{
			zbx_db_insert_add_values(&insert, rec->itemid, rec->ts.sec, rec->ts.ns,
					ZBX_NULL2EMPTY_STR(rec->value.err), (int)rec->state, write_clock);
		}
	}

	zbx_db_insert_execute(&insert);
	zbx_db_insert_clean(&insert);
}
2425 
2426 /******************************************************************************
2427  *                                                                            *
2428  * Function: DCmass_proxy_add_history                                         *
2429  *                                                                            *
2430  * Purpose: inserting new history data after new value is received            *
2431  *                                                                            *
2432  * Parameters: history     - array of history data                            *
2433  *             history_num - number of history structures                     *
2434  *                                                                            *
2435  * Author: Alexander Vladishev                                                *
2436  *                                                                            *
2437  ******************************************************************************/
static void	DCmass_proxy_add_history(ZBX_DC_HISTORY *history, int history_num)
{
	int	idx, plain_cnt = 0, meta_cnt = 0, log_cnt = 0, unsupported_cnt = 0;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* first pass: count the values per category so that helpers with nothing to write are not called */
	for (idx = 0; idx < history_num; idx++)
	{
		const ZBX_DC_HISTORY	*rec = &history[idx];

		if (ITEM_STATE_NOTSUPPORTED == rec->state)
		{
			unsupported_cnt++;
			continue;
		}

		switch (rec->value_type)
		{
			case ITEM_VALUE_TYPE_NONE:
				plain_cnt++;
				break;
			case ITEM_VALUE_TYPE_LOG:
				log_cnt++;
				break;
			case ITEM_VALUE_TYPE_FLOAT:
			case ITEM_VALUE_TYPE_UINT64:
			case ITEM_VALUE_TYPE_STR:
			case ITEM_VALUE_TYPE_TEXT:
				if (0 == (rec->flags & ZBX_DC_FLAG_META))
					plain_cnt++;
				else
					meta_cnt++;
				break;
			default:
				THIS_SHOULD_NEVER_HAPPEN;
		}
	}

	/* second pass: each helper receives the whole array and picks the records of its own category */
	if (0 != plain_cnt)
		dc_add_proxy_history(history, history_num);

	if (0 != meta_cnt)
		dc_add_proxy_history_meta(history, history_num);

	if (0 != log_cnt)
		dc_add_proxy_history_log(history, history_num);

	if (0 != unsupported_cnt)
		dc_add_proxy_history_notsupported(history, history_num);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
2490 
2491 /******************************************************************************
2492  *                                                                            *
2493  * Function: DCmass_prepare_history                                           *
2494  *                                                                            *
2495  * Purpose: prepare history data using items from configuration cache and     *
2496  *          generate item changes to be applied and host inventory values to  *
2497  *          be added                                                          *
2498  *                                                                            *
2499  * Parameters: history             - [IN/OUT] array of history data           *
2500  *             itemids             - [IN] the item identifiers                *
2501  *                                        (used for item lookup)              *
2502  *             items               - [IN] the items                           *
2503  *             errcodes            - [IN] item error codes                    *
2504  *             history_num         - [IN] number of history structures        *
2505  *             item_diff           - [OUT] the changes in item data           *
2506  *             inventory_values    - [OUT] the inventory values to add        *
2507  *             compression_age     - [IN] history compression age             *
 *             proxy_subscribtions - [OUT] proxy hostid and timestamp pairs   *
2509  *                                                                            *
2510  ******************************************************************************/
static void	DCmass_prepare_history(ZBX_DC_HISTORY *history, const zbx_vector_uint64_t *itemids,
		DC_ITEM *items, const int *errcodes, int history_num, zbx_vector_ptr_t *item_diff,
		zbx_vector_ptr_t *inventory_values, int compression_age, zbx_vector_uint64_pair_t *proxy_subscribtions)
{
	static time_t	last_discard_log = 0;	/* time when the discard message was last written */
	time_t		now;
	int		n;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __func__, history_num);

	now = time(NULL);

	for (n = 0; n < history_num; n++)
	{
		ZBX_DC_HISTORY	*rec = &history[n];
		DC_ITEM		*item;
		zbx_item_diff_t	*diff;
		time_t		age;
		int		pos;

		/* values with a timestamp before the compression age are marked undefined and skipped */
		if (0 != compression_age && rec->ts.sec < compression_age)
		{
			/* the message is written at most once per hour */
			if (SEC_PER_HOUR < (now - last_discard_log))
			{
				zabbix_log(LOG_LEVEL_TRACE, "discarding history that is pointing to"
							" compressed history period");
				last_discard_log = now;
			}

			rec->flags |= ZBX_DC_FLAG_UNDEF;
			continue;
		}

		/* items[] and errcodes[] are indexed by the position of the itemid in the itemids vector */
		pos = zbx_vector_uint64_bsearch(itemids, rec->itemid, ZBX_DEFAULT_UINT64_COMPARE_FUNC);

		if (FAIL == pos)
		{
			THIS_SHOULD_NEVER_HAPPEN;
			rec->flags |= ZBX_DC_FLAG_UNDEF;
			continue;
		}

		/* skip values of items with a failed error code, inactive items and items of unmonitored hosts */
		if (SUCCEED != errcodes[pos] || ITEM_STATUS_ACTIVE != items[pos].status ||
				HOST_STATUS_MONITORED != items[pos].host.status)
		{
			rec->flags |= ZBX_DC_FLAG_UNDEF;
			continue;
		}

		item = &items[pos];
		age = now - rec->ts.sec;

		/* history is not kept when it is disabled for the item or the value is too old */
		if (0 == item->history || age > item->history_sec)
		{
			rec->flags |= ZBX_DC_FLAG_NOHISTORY;

			/* the warning is meant only for the "too old" case */
			if (0 != item->history)
			{
				zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" value timestamp \"%s %s\" is outside history "
						"storage period", item->host.host, item->key_orig,
						zbx_date2str(rec->ts.sec), zbx_time2str(rec->ts.sec));
			}
		}

		/* trends are considered only for float and unsigned integer items */
		if (ITEM_VALUE_TYPE_FLOAT != item->value_type && ITEM_VALUE_TYPE_UINT64 != item->value_type)
		{
			rec->flags |= ZBX_DC_FLAG_NOTRENDS;
		}
		else if (0 == item->trends)
		{
			rec->flags |= ZBX_DC_FLAG_NOTRENDS;
		}
		else if (age > item->trends_sec)
		{
			rec->flags |= ZBX_DC_FLAG_NOTRENDS;
			zabbix_log(LOG_LEVEL_WARNING, "item \"%s:%s\" value timestamp \"%s %s\" is outside "
					"trends storage period", item->host.host, item->key_orig,
					zbx_date2str(rec->ts.sec), zbx_time2str(rec->ts.sec));
		}

		normalize_item_value(item, rec);

		/* calculate item update and update already retrieved item status for trigger calculation */
		diff = calculate_item_update(item, rec);

		if (NULL != diff)
			zbx_vector_ptr_append(item_diff, diff);

		DCinventory_value_add(inventory_values, item, rec);

		/* remember proxy hostid and value timestamp for proxy monitored items not processed by server */
		if (0 != item->host.proxy_hostid && FAIL == is_item_processed_by_server(item->type, item->key_orig))
		{
			zbx_uint64_pair_t	subscription = {item->host.proxy_hostid, rec->ts.sec};

			zbx_vector_uint64_pair_append(proxy_subscribtions, subscription);
		}
	}

	zbx_vector_ptr_sort(inventory_values, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);
	zbx_vector_ptr_sort(item_diff, ZBX_DEFAULT_UINT64_PTR_COMPARE_FUNC);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
2615 
2616 /******************************************************************************
2617  *                                                                            *
2618  * Function: DCmodule_prepare_history                                         *
2619  *                                                                            *
2620  * Purpose: prepare history data to share them with loadable modules, sort    *
2621  *          data by type skipping low-level discovery data, meta information  *
2622  *          updates and notsupported items                                    *
2623  *                                                                            *
2624  * Parameters: history            - [IN] array of history data                *
2625  *             history_num        - [IN] number of history structures         *
2626  *             history_<type>     - [OUT] array of historical data of a       *
2627  *                                  specific data type                        *
2628  *             history_<type>_num - [OUT] number of values of a specific      *
2629  *                                  data type                                 *
2630  *                                                                            *
2631  ******************************************************************************/
static void	DCmodule_prepare_history(ZBX_DC_HISTORY *history, int history_num, ZBX_HISTORY_FLOAT *history_float,
		int *history_float_num, ZBX_HISTORY_INTEGER *history_integer, int *history_integer_num,
		ZBX_HISTORY_STRING *history_string, int *history_string_num, ZBX_HISTORY_TEXT *history_text,
		int *history_text_num, ZBX_HISTORY_LOG *history_log, int *history_log_num)
{
	int	idx;

	/* every output array is filled from its beginning */
	*history_float_num = 0;
	*history_integer_num = 0;
	*history_string_num = 0;
	*history_text_num = 0;
	*history_log_num = 0;

	for (idx = 0; idx < history_num; idx++)
	{
		const ZBX_DC_HISTORY	*src = &history[idx];

		/* values flagged as not meant for modules are left out */
		if (0 != (ZBX_DC_FLAGS_NOT_FOR_MODULES & src->flags))
			continue;

		/* a value is copied only when callbacks exist for its type; string data is shared by pointer */
		switch (src->value_type)
		{
			case ITEM_VALUE_TYPE_FLOAT:
				if (NULL != history_float_cbs)
				{
					ZBX_HISTORY_FLOAT	*dst = &history_float[(*history_float_num)++];

					dst->itemid = src->itemid;
					dst->clock = src->ts.sec;
					dst->ns = src->ts.ns;
					dst->value = src->value.dbl;
				}
				break;
			case ITEM_VALUE_TYPE_UINT64:
				if (NULL != history_integer_cbs)
				{
					ZBX_HISTORY_INTEGER	*dst = &history_integer[(*history_integer_num)++];

					dst->itemid = src->itemid;
					dst->clock = src->ts.sec;
					dst->ns = src->ts.ns;
					dst->value = src->value.ui64;
				}
				break;
			case ITEM_VALUE_TYPE_STR:
				if (NULL != history_string_cbs)
				{
					ZBX_HISTORY_STRING	*dst = &history_string[(*history_string_num)++];

					dst->itemid = src->itemid;
					dst->clock = src->ts.sec;
					dst->ns = src->ts.ns;
					dst->value = src->value.str;
				}
				break;
			case ITEM_VALUE_TYPE_TEXT:
				if (NULL != history_text_cbs)
				{
					ZBX_HISTORY_TEXT	*dst = &history_text[(*history_text_num)++];

					dst->itemid = src->itemid;
					dst->clock = src->ts.sec;
					dst->ns = src->ts.ns;
					dst->value = src->value.str;
				}
				break;
			case ITEM_VALUE_TYPE_LOG:
				if (NULL != history_log_cbs)
				{
					const zbx_log_value_t	*entry = src->value.log;
					ZBX_HISTORY_LOG		*dst = &history_log[(*history_log_num)++];

					dst->itemid = src->itemid;
					dst->clock = src->ts.sec;
					dst->ns = src->ts.ns;
					dst->value = entry->value;
					dst->source = ZBX_NULL2EMPTY_STR(entry->source);
					dst->timestamp = entry->timestamp;
					dst->logeventid = entry->logeventid;
					dst->severity = entry->severity;
				}
				break;
			default:
				THIS_SHOULD_NEVER_HAPPEN;
		}
	}
}
2721 
/* Passes the prepared values of one type to every module callback registered for that type. */
/* <kind> is one of float, integer, string, text, log; it selects history_<kind>_num,         */
/* history_<kind>, history_<kind>_cbs and is also used as text in the debug messages.         */
/* The callback table is walked until the entry with NULL module is reached.                  */
#define DC_SYNC_HISTORY_WITH_MODULES(kind)								\
do													\
{													\
	if (0 != history_ ## kind ## _num)								\
	{												\
		int	i;										\
													\
		zabbix_log(LOG_LEVEL_DEBUG, "syncing " #kind " history data with modules...");		\
													\
		for (i = 0; NULL != history_ ## kind ## _cbs[i].module; i++)				\
		{											\
			zabbix_log(LOG_LEVEL_DEBUG, "... module \"%s\"",				\
					history_ ## kind ## _cbs[i].module->name);			\
			history_ ## kind ## _cbs[i].history_ ## kind ## _cb(history_ ## kind,		\
					history_ ## kind ## _num);					\
		}											\
													\
		zabbix_log(LOG_LEVEL_DEBUG, "synced %d " #kind " values with modules",			\
				history_ ## kind ## _num);						\
	}												\
}													\
while (0)

static void	DCmodule_sync_history(int history_float_num, int history_integer_num, int history_string_num,
		int history_text_num, int history_log_num, ZBX_HISTORY_FLOAT *history_float,
		ZBX_HISTORY_INTEGER *history_integer, ZBX_HISTORY_STRING *history_string,
		ZBX_HISTORY_TEXT *history_text, ZBX_HISTORY_LOG *history_log)
{
	/* value types are synced one after another; types without values are skipped */
	DC_SYNC_HISTORY_WITH_MODULES(float);
	DC_SYNC_HISTORY_WITH_MODULES(integer);
	DC_SYNC_HISTORY_WITH_MODULES(string);
	DC_SYNC_HISTORY_WITH_MODULES(text);
	DC_SYNC_HISTORY_WITH_MODULES(log);
}

#undef DC_SYNC_HISTORY_WITH_MODULES
2802 
sync_proxy_history(int * total_num,int * more)2803 static void	sync_proxy_history(int *total_num, int *more)
2804 {
2805 	int			history_num;
2806 	time_t			sync_start;
2807 	zbx_vector_ptr_t	history_items;
2808 	ZBX_DC_HISTORY		history[ZBX_HC_SYNC_MAX];
2809 
2810 	zbx_vector_ptr_create(&history_items);
2811 	zbx_vector_ptr_reserve(&history_items, ZBX_HC_SYNC_MAX);
2812 
2813 	sync_start = time(NULL);
2814 
2815 	do
2816 	{
2817 		*more = ZBX_SYNC_DONE;
2818 
2819 		LOCK_CACHE;
2820 
2821 		hc_pop_items(&history_items);		/* select and take items out of history cache */
2822 		history_num = history_items.values_num;
2823 
2824 		UNLOCK_CACHE;
2825 
2826 		if (0 == history_num)
2827 			break;
2828 
2829 		hc_get_item_values(history, &history_items);	/* copy item data from history cache */
2830 
2831 		do
2832 		{
2833 			DBbegin();
2834 
2835 			DCmass_proxy_add_history(history, history_num);
2836 			DCmass_proxy_update_items(history, history_num);
2837 		}
2838 		while (ZBX_DB_DOWN == DBcommit());
2839 
2840 		LOCK_CACHE;
2841 
2842 		hc_push_items(&history_items);	/* return items to history cache */
2843 		cache->history_num -= history_num;
2844 
2845 		if (0 != hc_queue_get_size())
2846 			*more = ZBX_SYNC_MORE;
2847 
2848 		UNLOCK_CACHE;
2849 
2850 		*total_num += history_num;
2851 
2852 		zbx_vector_ptr_clear(&history_items);
2853 		hc_free_item_values(history, history_num);
2854 
2855 		/* Exit from sync loop if we have spent too much time here */
2856 		/* unless we are doing full sync. This is done to allow    */
2857 		/* syncer process to update their statistics.              */
2858 	}
2859 	while (ZBX_SYNC_MORE == *more && ZBX_HC_SYNC_TIME_MAX >= time(NULL) - sync_start);
2860 
2861 	zbx_vector_ptr_destroy(&history_items);
2862 }
2863 
2864 /******************************************************************************
2865  *                                                                            *
2866  * Function: sync_server_history                                              *
2867  *                                                                            *
2868  * Purpose: flush history cache to database, process triggers of flushed      *
2869  *          and timer triggers from timer queue                               *
2870  *                                                                            *
2871  * Parameters: sync_timeout - [IN] the timeout in seconds                     *
2872  *             values_num   - [IN/OUT] the number of synced values            *
2873  *             triggers_num - [IN/OUT] the number of processed timers         *
2874  *             more         - [OUT] a flag indicating the cache emptiness:    *
2875  *                               ZBX_SYNC_DONE - nothing to sync, go idle     *
2876  *                               ZBX_SYNC_MORE - more data to sync            *
2877  *                                                                            *
2878  * Comments: This function loops syncing history values by 1k batches and     *
2879  *           processing timer triggers by batches of 500 triggers.            *
2880  *           Unless full sync is being done the loop is aborted if either     *
2881  *           timeout has passed or there are no more data to process.         *
2882  *           The last is assumed when the following is true:                  *
2883  *            a) history cache is empty or less than 10% of batch values were *
2884  *               processed (the other items were locked by triggers)          *
2885  *            b) less than 500 (full batch) timer triggers were processed     *
2886  *                                                                            *
2887  ******************************************************************************/
sync_server_history(int * values_num,int * triggers_num,int * more)2888 static void	sync_server_history(int *values_num, int *triggers_num, int *more)
2889 {
2890 	static ZBX_HISTORY_FLOAT	*history_float;
2891 	static ZBX_HISTORY_INTEGER	*history_integer;
2892 	static ZBX_HISTORY_STRING	*history_string;
2893 	static ZBX_HISTORY_TEXT		*history_text;
2894 	static ZBX_HISTORY_LOG		*history_log;
2895 	int				i, history_num, history_float_num, history_integer_num, history_string_num,
2896 					history_text_num, history_log_num, txn_error, compression_age;
2897 	unsigned int			item_retrieve_mode;
2898 	time_t				sync_start;
2899 	zbx_vector_uint64_t		triggerids, timer_triggerids;
2900 	zbx_vector_ptr_t		history_items, trigger_diff, item_diff, inventory_values;
2901 	zbx_vector_uint64_pair_t	trends_diff, proxy_subscribtions;
2902 	ZBX_DC_HISTORY			history[ZBX_HC_SYNC_MAX];
2903 
2904 	item_retrieve_mode = NULL == CONFIG_EXPORT_DIR ? ZBX_ITEM_GET_SYNC : ZBX_ITEM_GET_SYNC_EXPORT;
2905 
2906 	if (NULL == history_float && NULL != history_float_cbs)
2907 	{
2908 		history_float = (ZBX_HISTORY_FLOAT *)zbx_malloc(history_float,
2909 				ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_FLOAT));
2910 	}
2911 
2912 	if (NULL == history_integer && NULL != history_integer_cbs)
2913 	{
2914 		history_integer = (ZBX_HISTORY_INTEGER *)zbx_malloc(history_integer,
2915 				ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_INTEGER));
2916 	}
2917 
2918 	if (NULL == history_string && NULL != history_string_cbs)
2919 	{
2920 		history_string = (ZBX_HISTORY_STRING *)zbx_malloc(history_string,
2921 				ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_STRING));
2922 	}
2923 
2924 	if (NULL == history_text && NULL != history_text_cbs)
2925 	{
2926 		history_text = (ZBX_HISTORY_TEXT *)zbx_malloc(history_text,
2927 				ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_TEXT));
2928 	}
2929 
2930 	if (NULL == history_log && NULL != history_log_cbs)
2931 	{
2932 		history_log = (ZBX_HISTORY_LOG *)zbx_malloc(history_log,
2933 				ZBX_HC_SYNC_MAX * sizeof(ZBX_HISTORY_LOG));
2934 	}
2935 
2936 	compression_age = hc_get_history_compression_age();
2937 
2938 	zbx_vector_ptr_create(&inventory_values);
2939 	zbx_vector_ptr_create(&item_diff);
2940 	zbx_vector_ptr_create(&trigger_diff);
2941 	zbx_vector_uint64_pair_create(&trends_diff);
2942 	zbx_vector_uint64_pair_create(&proxy_subscribtions);
2943 
2944 	zbx_vector_uint64_create(&triggerids);
2945 	zbx_vector_uint64_reserve(&triggerids, ZBX_HC_SYNC_MAX);
2946 
2947 	zbx_vector_uint64_create(&timer_triggerids);
2948 	zbx_vector_uint64_reserve(&timer_triggerids, ZBX_HC_TIMER_MAX);
2949 
2950 	zbx_vector_ptr_create(&history_items);
2951 	zbx_vector_ptr_reserve(&history_items, ZBX_HC_SYNC_MAX);
2952 
2953 	sync_start = time(NULL);
2954 
2955 	do
2956 	{
2957 		DC_ITEM			*items;
2958 		int			*errcodes, trends_num = 0, timers_num = 0, ret = SUCCEED;
2959 		zbx_vector_uint64_t	itemids;
2960 		ZBX_DC_TREND		*trends = NULL;
2961 		zbx_timespec_t		ts;
2962 
2963 		zbx_vector_uint64_create(&itemids);
2964 
2965 		*more = ZBX_SYNC_DONE;
2966 
2967 		LOCK_CACHE;
2968 		hc_pop_items(&history_items);		/* select and take items out of history cache */
2969 		UNLOCK_CACHE;
2970 
2971 		if (0 != history_items.values_num)
2972 		{
2973 			if (0 == (history_num = DCconfig_lock_triggers_by_history_items(&history_items, &triggerids)))
2974 			{
2975 				LOCK_CACHE;
2976 				hc_push_items(&history_items);
2977 				UNLOCK_CACHE;
2978 				zbx_vector_ptr_clear(&history_items);
2979 			}
2980 		}
2981 		else
2982 			history_num = 0;
2983 
2984 		zbx_timespec(&ts);
2985 
2986 		if (0 != history_num)
2987 		{
2988 			hc_get_item_values(history, &history_items);	/* copy item data from history cache */
2989 
2990 			items = (DC_ITEM *)zbx_malloc(NULL, sizeof(DC_ITEM) * (size_t)history_num);
2991 			errcodes = (int *)zbx_malloc(NULL, sizeof(int) * (size_t)history_num);
2992 
2993 			zbx_vector_uint64_reserve(&itemids, history_num);
2994 
2995 			for (i = 0; i < history_num; i++)
2996 				zbx_vector_uint64_append(&itemids, history[i].itemid);
2997 
2998 			zbx_vector_uint64_sort(&itemids, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
2999 
3000 			DCconfig_get_items_by_itemids_partial(items, itemids.values, errcodes, history_num,
3001 					item_retrieve_mode);
3002 
3003 			DCmass_prepare_history(history, &itemids, items, errcodes, history_num, &item_diff,
3004 					&inventory_values, compression_age, &proxy_subscribtions);
3005 
3006 			if (FAIL != (ret = DBmass_add_history(history, history_num)))
3007 			{
3008 				DCconfig_items_apply_changes(&item_diff);
3009 				DCmass_update_trends(history, history_num, &trends, &trends_num, compression_age);
3010 
3011 				do
3012 				{
3013 					DBbegin();
3014 
3015 					DBmass_update_items(&item_diff, &inventory_values);
3016 					DBmass_update_trends(trends, trends_num, &trends_diff);
3017 
3018 					/* process internal events generated by DCmass_prepare_history() */
3019 					zbx_process_events(NULL, NULL);
3020 
3021 					if (ZBX_DB_OK == (txn_error = DBcommit()))
3022 						DCupdate_trends(&trends_diff);
3023 					else
3024 						zbx_reset_event_recovery();
3025 
3026 					zbx_vector_uint64_pair_clear(&trends_diff);
3027 				}
3028 				while (ZBX_DB_DOWN == txn_error);
3029 			}
3030 
3031 			zbx_clean_events();
3032 
3033 			zbx_vector_ptr_clear_ext(&inventory_values, (zbx_clean_func_t)DCinventory_value_free);
3034 			zbx_vector_ptr_clear_ext(&item_diff, (zbx_clean_func_t)zbx_ptr_free);
3035 		}
3036 
3037 		if (FAIL != ret)
3038 		{
3039 			zbx_dc_get_timer_triggerids(&timer_triggerids, time(NULL), ZBX_HC_TIMER_MAX);
3040 			timers_num = timer_triggerids.values_num;
3041 
3042 			if (ZBX_HC_TIMER_MAX == timers_num)
3043 				*more = ZBX_SYNC_MORE;
3044 
3045 			if (0 != history_num || 0 != timers_num)
3046 			{
3047 				/* timer triggers do not intersect with item triggers because item triggers */
3048 				/* where already locked and skipped when retrieving timer triggers          */
3049 				zbx_vector_uint64_append_array(&triggerids, timer_triggerids.values,
3050 						timer_triggerids.values_num);
3051 				do
3052 				{
3053 					DBbegin();
3054 
3055 					recalculate_triggers(history, history_num, &itemids, items, errcodes,
3056 							&timer_triggerids, &ts, &trigger_diff);
3057 
3058 					/* process trigger events generated by recalculate_triggers() */
3059 					zbx_process_events(&trigger_diff, &triggerids);
3060 					if (0 != trigger_diff.values_num)
3061 						zbx_db_save_trigger_changes(&trigger_diff);
3062 
3063 					if (ZBX_DB_OK == (txn_error = DBcommit()))
3064 					{
3065 						DCconfig_triggers_apply_changes(&trigger_diff);
3066 						DBupdate_itservices(&trigger_diff);
3067 					}
3068 					else
3069 						zbx_clean_events();
3070 
3071 					zbx_vector_ptr_clear_ext(&trigger_diff, (zbx_clean_func_t)zbx_trigger_diff_free);
3072 				}
3073 				while (ZBX_DB_DOWN == txn_error);
3074 			}
3075 
3076 			zbx_vector_uint64_clear(&timer_triggerids);
3077 		}
3078 
3079 		if (0 != triggerids.values_num)
3080 		{
3081 			*triggers_num += triggerids.values_num;
3082 			DCconfig_unlock_triggers(&triggerids);
3083 			zbx_vector_uint64_clear(&triggerids);
3084 		}
3085 
3086 		if (0 != proxy_subscribtions.values_num)
3087 		{
3088 			zbx_vector_uint64_pair_sort(&proxy_subscribtions, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
3089 			zbx_dc_proxy_update_nodata(&proxy_subscribtions);
3090 			zbx_vector_uint64_pair_clear(&proxy_subscribtions);
3091 		}
3092 
3093 		if (0 != history_num)
3094 		{
3095 			LOCK_CACHE;
3096 			hc_push_items(&history_items);	/* return items to history cache */
3097 			cache->history_num -= history_num;
3098 
3099 			if (0 != hc_queue_get_size())
3100 			{
3101 				/* Continue sync if enough of sync candidates were processed       */
3102 				/* (meaning most of sync candidates are not locked by triggers).   */
3103 				/* Otherwise better to wait a bit for other syncers to unlock      */
3104 				/* items rather than trying and failing to sync locked items over  */
3105 				/* and over again.                                                 */
3106 				if (ZBX_HC_SYNC_MIN_PCNT <= history_num * 100 / history_items.values_num)
3107 					*more = ZBX_SYNC_MORE;
3108 			}
3109 
3110 			UNLOCK_CACHE;
3111 
3112 			*values_num += history_num;
3113 		}
3114 
3115 		if (FAIL != ret)
3116 		{
3117 			if (0 != history_num)
3118 			{
3119 				const ZBX_DC_HISTORY	*phistory = NULL;
3120 				const ZBX_DC_TREND	*ptrends = NULL;
3121 				int			history_num_loc = 0, trends_num_loc = 0;
3122 
3123 				DCmodule_prepare_history(history, history_num, history_float, &history_float_num,
3124 						history_integer, &history_integer_num, history_string,
3125 						&history_string_num, history_text, &history_text_num, history_log,
3126 						&history_log_num);
3127 
3128 				DCmodule_sync_history(history_float_num, history_integer_num, history_string_num,
3129 						history_text_num, history_log_num, history_float, history_integer,
3130 						history_string, history_text, history_log);
3131 
3132 				if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_HISTORY))
3133 				{
3134 					phistory = history;
3135 					history_num_loc = history_num;
3136 				}
3137 
3138 				if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_TRENDS))
3139 				{
3140 					ptrends = trends;
3141 					trends_num_loc = trends_num;
3142 				}
3143 
3144 				if (NULL != phistory || NULL != ptrends)
3145 				{
3146 					DCexport_history_and_trends(phistory, history_num_loc, &itemids, items,
3147 							errcodes, ptrends, trends_num_loc);
3148 				}
3149 			}
3150 
3151 			if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_EVENTS))
3152 				zbx_export_events();
3153 		}
3154 
3155 		if (0 != history_num || 0 != timers_num)
3156 			zbx_clean_events();
3157 
3158 		if (0 != history_num)
3159 		{
3160 			zbx_free(trends);
3161 			DCconfig_clean_items(items, errcodes, history_num);
3162 			zbx_free(errcodes);
3163 			zbx_free(items);
3164 
3165 			zbx_vector_ptr_clear(&history_items);
3166 			hc_free_item_values(history, history_num);
3167 		}
3168 
3169 		zbx_vector_uint64_destroy(&itemids);
3170 
3171 		/* Exit from sync loop if we have spent too much time here.       */
3172 		/* This is done to allow syncer process to update its statistics. */
3173 	}
3174 	while (ZBX_SYNC_MORE == *more && ZBX_HC_SYNC_TIME_MAX >= time(NULL) - sync_start);
3175 
3176 	zbx_vector_ptr_destroy(&history_items);
3177 	zbx_vector_ptr_destroy(&inventory_values);
3178 	zbx_vector_ptr_destroy(&item_diff);
3179 	zbx_vector_ptr_destroy(&trigger_diff);
3180 	zbx_vector_uint64_pair_destroy(&trends_diff);
3181 	zbx_vector_uint64_pair_destroy(&proxy_subscribtions);
3182 
3183 	zbx_vector_uint64_destroy(&timer_triggerids);
3184 	zbx_vector_uint64_destroy(&triggerids);
3185 }
3186 
3187 /******************************************************************************
3188  *                                                                            *
3189  * Function: sync_history_cache_full                                          *
3190  *                                                                            *
3191  * Purpose: writes updates and new data from history cache to database        *
3192  *                                                                            *
3193  * Comments: This function is used to flush history cache at server/proxy     *
3194  *           exit.                                                            *
3195  *           Other processes are already terminated, so cache locking is      *
3196  *           unnecessary.                                                     *
3197  *                                                                            *
3198  ******************************************************************************/
sync_history_cache_full(void)3199 static void	sync_history_cache_full(void)
3200 {
3201 	int			values_num = 0, triggers_num = 0, more;
3202 	zbx_hashset_iter_t	iter;
3203 	zbx_hc_item_t		*item;
3204 	zbx_binary_heap_t	tmp_history_queue;
3205 
3206 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __func__, cache->history_num);
3207 
3208 	/* History index cache might be full without any space left for queueing items from history index to  */
3209 	/* history queue. The solution: replace the shared-memory history queue with heap-allocated one. Add  */
3210 	/* all items from history index to the new history queue.                                             */
3211 	/*                                                                                                    */
3212 	/* Assertions that must be true.                                                                      */
3213 	/*   * This is the main server or proxy process,                                                      */
3214 	/*   * There are no other users of history index cache stored in shared memory. Other processes       */
3215 	/*     should have quit by this point.                                                                */
3216 	/*   * other parts of the program do not hold pointers to the elements of history queue that is       */
3217 	/*     stored in the shared memory.                                                                   */
3218 
3219 	if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3220 	{
3221 		/* unlock all triggers before full sync so no items are locked by triggers */
3222 		DCconfig_unlock_all_triggers();
3223 
3224 		/* clear timer trigger queue to avoid processing time triggers at exit */
3225 		zbx_dc_clear_timer_queue();
3226 	}
3227 
3228 	tmp_history_queue = cache->history_queue;
3229 
3230 	zbx_binary_heap_create(&cache->history_queue, hc_queue_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY);
3231 	zbx_hashset_iter_reset(&cache->history_items, &iter);
3232 
3233 	/* add all items from history index to the new history queue */
3234 	while (NULL != (item = (zbx_hc_item_t *)zbx_hashset_iter_next(&iter)))
3235 	{
3236 		if (NULL != item->tail)
3237 		{
3238 			item->status = ZBX_HC_ITEM_STATUS_NORMAL;
3239 			hc_queue_item(item);
3240 		}
3241 	}
3242 
3243 	if (0 != hc_queue_get_size())
3244 	{
3245 		zabbix_log(LOG_LEVEL_WARNING, "syncing history data...");
3246 
3247 		do
3248 		{
3249 			if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3250 				sync_server_history(&values_num, &triggers_num, &more);
3251 			else
3252 				sync_proxy_history(&values_num, &more);
3253 
3254 			zabbix_log(LOG_LEVEL_WARNING, "syncing history data... " ZBX_FS_DBL "%%",
3255 					(double)values_num / (cache->history_num + values_num) * 100);
3256 		}
3257 		while (0 != hc_queue_get_size());
3258 
3259 		zabbix_log(LOG_LEVEL_WARNING, "syncing history data done");
3260 	}
3261 
3262 	zbx_binary_heap_destroy(&cache->history_queue);
3263 	cache->history_queue = tmp_history_queue;
3264 
3265 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
3266 }
3267 
3268 /******************************************************************************
3269  *                                                                            *
3270  * Function: zbx_log_sync_history_cache_progress                              *
3271  *                                                                            *
3272  * Purpose: log progress of syncing history data                              *
3273  *                                                                            *
3274  ******************************************************************************/
zbx_log_sync_history_cache_progress(void)3275 void	zbx_log_sync_history_cache_progress(void)
3276 {
3277 	double		pcnt = -1.0;
3278 	int		ts_last, ts_next, sec;
3279 
3280 	LOCK_CACHE;
3281 
3282 	if (INT_MAX == cache->history_progress_ts)
3283 	{
3284 		UNLOCK_CACHE;
3285 		return;
3286 	}
3287 
3288 	ts_last = cache->history_progress_ts;
3289 	sec = time(NULL);
3290 
3291 	if (0 == cache->history_progress_ts)
3292 	{
3293 		cache->history_num_total = cache->history_num;
3294 		cache->history_progress_ts = sec;
3295 	}
3296 
3297 	if (ZBX_HC_SYNC_TIME_MAX <= sec - cache->history_progress_ts || 0 == cache->history_num)
3298 	{
3299 		if (0 != cache->history_num_total)
3300 			pcnt = 100 * (double)(cache->history_num_total - cache->history_num) / cache->history_num_total;
3301 
3302 		cache->history_progress_ts = (0 == cache->history_num ? INT_MAX : sec);
3303 	}
3304 
3305 	ts_next = cache->history_progress_ts;
3306 
3307 	UNLOCK_CACHE;
3308 
3309 	if (0 == ts_last)
3310 		zabbix_log(LOG_LEVEL_WARNING, "syncing history data in progress... ");
3311 
3312 	if (-1.0 != pcnt)
3313 		zabbix_log(LOG_LEVEL_WARNING, "syncing history data... " ZBX_FS_DBL "%%", pcnt);
3314 
3315 	if (INT_MAX == ts_next)
3316 		zabbix_log(LOG_LEVEL_WARNING, "syncing history data done");
3317 }
3318 
3319 /******************************************************************************
3320  *                                                                            *
3321  * Function: zbx_sync_history_cache                                           *
3322  *                                                                            *
3323  * Purpose: writes updates and new data from history cache to database        *
3324  *                                                                            *
 * Parameters: values_num   - [OUT] the number of synced values               *
 *             triggers_num - [OUT] the number of processed triggers          *
 *             more         - [OUT] a flag indicating the cache emptiness:    *
 *                                  ZBX_SYNC_DONE - nothing to sync, go idle *
 *                                  ZBX_SYNC_MORE - more data to sync        *
3330  ******************************************************************************/
zbx_sync_history_cache(int * values_num,int * triggers_num,int * more)3331 void	zbx_sync_history_cache(int *values_num, int *triggers_num, int *more)
3332 {
3333 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() history_num:%d", __func__, cache->history_num);
3334 
3335 	*values_num = 0;
3336 	*triggers_num = 0;
3337 
3338 	if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
3339 		sync_server_history(values_num, triggers_num, more);
3340 	else
3341 		sync_proxy_history(values_num, more);
3342 }
3343 
3344 /******************************************************************************
3345  *                                                                            *
3346  * local history cache                                                        *
3347  *                                                                            *
3348  ******************************************************************************/
dc_string_buffer_realloc(size_t len)3349 static void	dc_string_buffer_realloc(size_t len)
3350 {
3351 	if (string_values_alloc >= string_values_offset + len)
3352 		return;
3353 
3354 	do
3355 	{
3356 		string_values_alloc += ZBX_STRING_REALLOC_STEP;
3357 	}
3358 	while (string_values_alloc < string_values_offset + len);
3359 
3360 	string_values = (char *)zbx_realloc(string_values, string_values_alloc);
3361 }
3362 
dc_local_get_history_slot(void)3363 static dc_item_value_t	*dc_local_get_history_slot(void)
3364 {
3365 	if (ZBX_MAX_VALUES_LOCAL == item_values_num)
3366 		dc_flush_history();
3367 
3368 	if (item_values_alloc == item_values_num)
3369 	{
3370 		item_values_alloc += ZBX_STRUCT_REALLOC_STEP;
3371 		item_values = (dc_item_value_t *)zbx_realloc(item_values, item_values_alloc * sizeof(dc_item_value_t));
3372 	}
3373 
3374 	return &item_values[item_values_num++];
3375 }
3376 
/* Purpose: add a floating point value to the local history cache            */
/*                                                                           */
/* Comments: lastlogsize and mtime are stored only when flags contains       */
/*           ZBX_DC_FLAG_META; value_orig is stored only when flags does not */
/*           contain ZBX_DC_FLAG_NOVALUE                                     */
static void	dc_local_add_history_dbl(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		double value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->flags = flags;
	slot->state = ITEM_STATE_NORMAL;
	slot->value_type = ITEM_VALUE_TYPE_FLOAT;
	slot->item_value_type = item_value_type;
	slot->ts = *ts;
	slot->itemid = itemid;

	/* meta information is optional */
	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->mtime = mtime;
		slot->lastlogsize = lastlogsize;
	}

	if (0 != (slot->flags & ZBX_DC_FLAG_NOVALUE))
		return;

	slot->value.value_dbl = value_orig;
}
3400 
/* Purpose: add an unsigned integer value to the local history cache         */
/*                                                                           */
/* Comments: lastlogsize and mtime are stored only when flags contains       */
/*           ZBX_DC_FLAG_META; value_orig is stored only when flags does not */
/*           contain ZBX_DC_FLAG_NOVALUE                                     */
static void	dc_local_add_history_uint(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		zbx_uint64_t value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->flags = flags;
	slot->state = ITEM_STATE_NORMAL;
	slot->value_type = ITEM_VALUE_TYPE_UINT64;
	slot->item_value_type = item_value_type;
	slot->ts = *ts;
	slot->itemid = itemid;

	/* meta information is optional */
	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->mtime = mtime;
		slot->lastlogsize = lastlogsize;
	}

	if (0 != (slot->flags & ZBX_DC_FLAG_NOVALUE))
		return;

	slot->value.value_uint = value_orig;
}
3424 
/* Purpose: add a text value to the local history cache                      */
/*                                                                           */
/* Comments: the text itself is copied into the local string buffer and the  */
/*           value keeps only its offset and length; the length is bounded   */
/*           through zbx_db_strlen_n() with ZBX_HISTORY_VALUE_LEN and        */
/*           includes one extra byte                                         */
static void	dc_local_add_history_text(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		const char *value_orig, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->flags = flags;
	slot->state = ITEM_STATE_NORMAL;
	slot->value_type = ITEM_VALUE_TYPE_TEXT;
	slot->item_value_type = item_value_type;
	slot->ts = *ts;
	slot->itemid = itemid;

	/* meta information is optional */
	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->mtime = mtime;
		slot->lastlogsize = lastlogsize;
	}

	/* nothing to copy when the value is absent */
	if (0 != (slot->flags & ZBX_DC_FLAG_NOVALUE))
	{
		slot->value.value_str.len = 0;
		return;
	}

	slot->value.value_str.len = zbx_db_strlen_n(value_orig, ZBX_HISTORY_VALUE_LEN) + 1;
	dc_string_buffer_realloc(slot->value.value_str.len);

	/* append the text at the current end of the string buffer */
	slot->value.value_str.pvalue = string_values_offset;
	memcpy(string_values + string_values_offset, value_orig, slot->value.value_str.len);
	string_values_offset += slot->value.value_str.len;
}
3457 
/* Purpose: add a log value to the local history cache                       */
/*                                                                           */
/* Comments: log is dereferenced only when flags does not contain            */
/*           ZBX_DC_FLAG_NOVALUE, so it may be NULL for meta-only updates.   */
/*           The log text and the (optional, non-empty) source are copied    */
/*           into the local string buffer one after another.                 */
static void	dc_local_add_history_log(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		const zbx_log_t *log, zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
{
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->flags = flags;
	slot->state = ITEM_STATE_NORMAL;
	slot->value_type = ITEM_VALUE_TYPE_LOG;
	slot->item_value_type = item_value_type;
	slot->ts = *ts;
	slot->itemid = itemid;

	/* meta information is optional */
	if (0 != (slot->flags & ZBX_DC_FLAG_META))
	{
		slot->mtime = mtime;
		slot->lastlogsize = lastlogsize;
	}

	/* lengths stay zero unless the corresponding string is present */
	slot->value.value_str.len = 0;
	slot->source.len = 0;

	if (0 == (slot->flags & ZBX_DC_FLAG_NOVALUE))
	{
		slot->timestamp = log->timestamp;
		slot->logeventid = log->logeventid;
		slot->severity = log->severity;

		slot->value.value_str.len = zbx_db_strlen_n(log->value, ZBX_HISTORY_VALUE_LEN) + 1;

		if (NULL != log->source && '\0' != *log->source)
			slot->source.len = zbx_db_strlen_n(log->source, HISTORY_LOG_SOURCE_LEN) + 1;
	}

	if (0 == slot->value.value_str.len + slot->source.len)
		return;

	/* reserve space for both strings at once */
	dc_string_buffer_realloc(slot->value.value_str.len + slot->source.len);

	if (0 != slot->value.value_str.len)
	{
		slot->value.value_str.pvalue = string_values_offset;
		memcpy(string_values + string_values_offset, log->value, slot->value.value_str.len);
		string_values_offset += slot->value.value_str.len;
	}

	if (0 != slot->source.len)
	{
		slot->source.pvalue = string_values_offset;
		memcpy(string_values + string_values_offset, log->source, slot->source.len);
		string_values_offset += slot->source.len;
	}
}
3517 
dc_local_add_history_notsupported(zbx_uint64_t itemid,const zbx_timespec_t * ts,const char * error,zbx_uint64_t lastlogsize,int mtime,unsigned char flags)3518 static void	dc_local_add_history_notsupported(zbx_uint64_t itemid, const zbx_timespec_t *ts, const char *error,
3519 		zbx_uint64_t lastlogsize, int mtime, unsigned char flags)
3520 {
3521 	dc_item_value_t	*item_value;
3522 
3523 	item_value = dc_local_get_history_slot();
3524 
3525 	item_value->itemid = itemid;
3526 	item_value->ts = *ts;
3527 	item_value->state = ITEM_STATE_NOTSUPPORTED;
3528 	item_value->flags = flags;
3529 
3530 	if (0 != (item_value->flags & ZBX_DC_FLAG_META))
3531 	{
3532 		item_value->lastlogsize = lastlogsize;
3533 		item_value->mtime = mtime;
3534 	}
3535 
3536 	item_value->value.value_str.len = zbx_db_strlen_n(error, ITEM_ERROR_LEN) + 1;
3537 	dc_string_buffer_realloc(item_value->value.value_str.len);
3538 	item_value->value.value_str.pvalue = string_values_offset;
3539 	memcpy(&string_values[string_values_offset], error, item_value->value.value_str.len);
3540 	string_values_offset += item_value->value.value_str.len;
3541 }
3542 
/* Purpose: add a low-level discovery value to the local history cache       */
/*                                                                           */
/* Comments: the value is flagged with ZBX_DC_FLAG_LLD and copied in full    */
/*           (terminating zero included) into the local string buffer        */
static void	dc_local_add_history_lld(zbx_uint64_t itemid, const zbx_timespec_t *ts, const char *value_orig)
{
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->flags = ZBX_DC_FLAG_LLD;
	slot->state = ITEM_STATE_NORMAL;
	slot->ts = *ts;
	slot->itemid = itemid;

	slot->value.value_str.len = strlen(value_orig) + 1;
	dc_string_buffer_realloc(slot->value.value_str.len);

	/* append the value at the current end of the string buffer */
	slot->value.value_str.pvalue = string_values_offset;
	memcpy(string_values + string_values_offset, value_orig, slot->value.value_str.len);
	string_values_offset += slot->value.value_str.len;
}
3560 
/* Purpose: add an entry without any value (value type ITEM_VALUE_TYPE_NONE) */
/*          to the local history cache, keeping only the item id, timestamp, */
/*          item value type and flags                                        */
static void	dc_local_add_history_empty(zbx_uint64_t itemid, unsigned char item_value_type, const zbx_timespec_t *ts,
		unsigned char flags)
{
	dc_item_value_t	*slot = dc_local_get_history_slot();

	slot->flags = flags;
	slot->state = ITEM_STATE_NORMAL;
	slot->value_type = ITEM_VALUE_TYPE_NONE;
	slot->item_value_type = item_value_type;
	slot->ts = *ts;
	slot->itemid = itemid;
}
3575 
3576 /******************************************************************************
3577  *                                                                            *
3578  * Function: dc_add_history                                                   *
3579  *                                                                            *
3580  * Purpose: add new value to the cache                                        *
3581  *                                                                            *
3582  * Parameters:  itemid          - [IN] the itemid                             *
3583  *              item_value_type - [IN] the item value type                    *
3584  *              item_flags      - [IN] the item flags (e. g. lld rule)        *
3585  *              result          - [IN] agent result containing the value      *
3586  *                                to add                                      *
3587  *              ts              - [IN] the value timestamp                    *
3588  *              state           - [IN] the item state                         *
3589  *              error           - [IN] the error message in case item state   *
3590  *                                is ITEM_STATE_NOTSUPPORTED                  *
3591  *                                                                            *
3592  ******************************************************************************/
void	dc_add_history(zbx_uint64_t itemid, unsigned char item_value_type, unsigned char item_flags,
		AGENT_RESULT *result, const zbx_timespec_t *ts, unsigned char state, const char *error)
{
	unsigned char	value_flags = 0;

	if (ITEM_STATE_NOTSUPPORTED == state)
	{
		zbx_uint64_t	lastlogsize = 0;
		int		mtime = 0;

		/* result is optional here; meta information is taken from it only when present */
		if (NULL != result && 0 != ISSET_META(result))
		{
			value_flags = ZBX_DC_FLAG_META;
			lastlogsize = result->lastlogsize;
			mtime = result->mtime;
		}

		dc_local_add_history_notsupported(itemid, ts, error, lastlogsize, mtime, value_flags);
		return;
	}

	/* allow proxy to send timestamps of empty (throttled etc) values to update nextchecks for queue */
	if (!ISSET_VALUE(result) && !ISSET_META(result) && 0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
		return;

	if (!ISSET_VALUE(result))
		value_flags |= ZBX_DC_FLAG_NOVALUE;

	if (ISSET_META(result))
		value_flags |= ZBX_DC_FLAG_META;

	/* Data is added to the local history cache when:                                    */
	/*   1) the NOVALUE flag is set (data contains either meta information or timestamp) */
	/*   2) the NOVALUE flag is not set and value conversion succeeded                   */

	if (0 != (value_flags & ZBX_DC_FLAG_NOVALUE))
	{
		if (0 == (value_flags & ZBX_DC_FLAG_META))
		{
			dc_local_add_history_empty(itemid, item_value_type, ts, value_flags);
		}
		else
		{
			/* meta-only update: no log data is passed */
			dc_local_add_history_log(itemid, item_value_type, ts, NULL, result->lastlogsize, result->mtime,
					value_flags);
		}

		return;
	}

	if (0 != (ZBX_FLAG_DISCOVERY_RULE & item_flags))
	{
		/* proxy stores low-level discovery (lld) values in db, server adds nothing here */
		if (NULL != GET_TEXT_RESULT(result) && 0 == (ZBX_PROGRAM_TYPE_SERVER & program_type))
			dc_local_add_history_lld(itemid, ts, result->text);

		return;
	}

	/* the first value kind found in the result decides how it is stored */
	if (ISSET_LOG(result))
	{
		dc_local_add_history_log(itemid, item_value_type, ts, result->log, result->lastlogsize,
				result->mtime, value_flags);
	}
	else if (ISSET_UI64(result))
	{
		dc_local_add_history_uint(itemid, item_value_type, ts, result->ui64, result->lastlogsize,
				result->mtime, value_flags);
	}
	else if (ISSET_DBL(result))
	{
		dc_local_add_history_dbl(itemid, item_value_type, ts, result->dbl, result->lastlogsize,
				result->mtime, value_flags);
	}
	else if (ISSET_STR(result))
	{
		dc_local_add_history_text(itemid, item_value_type, ts, result->str, result->lastlogsize,
				result->mtime, value_flags);
	}
	else if (ISSET_TEXT(result))
	{
		dc_local_add_history_text(itemid, item_value_type, ts, result->text, result->lastlogsize,
				result->mtime, value_flags);
	}
	else
	{
		THIS_SHOULD_NEVER_HAPPEN;
	}
}
3690 
dc_flush_history(void)3691 void	dc_flush_history(void)
3692 {
3693 	if (0 == item_values_num)
3694 		return;
3695 
3696 	LOCK_CACHE;
3697 
3698 	hc_add_item_values(item_values, item_values_num);
3699 
3700 	cache->history_num += item_values_num;
3701 
3702 	UNLOCK_CACHE;
3703 
3704 	item_values_num = 0;
3705 	string_values_offset = 0;
3706 }
3707 
3708 /******************************************************************************
3709  *                                                                            *
3710  * history cache storage                                                      *
3711  *                                                                            *
3712  ******************************************************************************/
/* Instantiate the memory helper functions for the two history cache memory   */
/* segments: the "__hc_index" prefix is bound to hc_index_mem and the "__hc"  */
/* prefix to hc_mem. Helpers such as __hc_mem_malloc_func() and               */
/* __hc_mem_free_func() used in this file presumably come from this expansion */
/* (the macro is defined elsewhere - TODO confirm the generated names).       */
ZBX_MEM_FUNC_IMPL(__hc_index, hc_index_mem)
ZBX_MEM_FUNC_IMPL(__hc, hc_mem)
3715 
3716 /******************************************************************************
3717  *                                                                            *
3718  * Function: hc_queue_elem_compare_func                                       *
3719  *                                                                            *
3720  * Purpose: compares history queue elements                                   *
3721  *                                                                            *
3722  ******************************************************************************/
3723 static int	hc_queue_elem_compare_func(const void *d1, const void *d2)
3724 {
3725 	const zbx_binary_heap_elem_t	*e1 = (const zbx_binary_heap_elem_t *)d1;
3726 	const zbx_binary_heap_elem_t	*e2 = (const zbx_binary_heap_elem_t *)d2;
3727 
3728 	const zbx_hc_item_t	*item1 = (const zbx_hc_item_t *)e1->data;
3729 	const zbx_hc_item_t	*item2 = (const zbx_hc_item_t *)e2->data;
3730 
3731 	/* compare by timestamp of the oldest value */
3732 	return zbx_timespec_compare(&item1->tail->ts, &item2->tail->ts);
3733 }
3734 
3735 /******************************************************************************
3736  *                                                                            *
3737  * Function: hc_free_data                                                     *
3738  *                                                                            *
3739  * Purpose: free history item data allocated in history cache                 *
3740  *                                                                            *
3741  * Parameters: data - [IN] history item data                                  *
3742  *                                                                            *
3743  ******************************************************************************/
static void	hc_free_data(zbx_hc_data_t *data)
{
	if (ITEM_STATE_NOTSUPPORTED == data->state)
	{
		/* in not supported state the string member is what gets released */
		__hc_mem_free_func(data->value.str);
	}
	else if (0 == (data->flags & ZBX_DC_FLAG_NOVALUE))
	{
		/* only string, text and log values own additional memory */
		if (ITEM_VALUE_TYPE_STR == data->value_type || ITEM_VALUE_TYPE_TEXT == data->value_type)
		{
			__hc_mem_free_func(data->value.str);
		}
		else if (ITEM_VALUE_TYPE_LOG == data->value_type)
		{
			__hc_mem_free_func(data->value.log->value);

			/* the log source is optional */
			if (NULL != data->value.log->source)
				__hc_mem_free_func(data->value.log->source);

			__hc_mem_free_func(data->value.log);
		}
	}

	/* finally release the data structure itself */
	__hc_mem_free_func(data);
}
3774 
3775 /******************************************************************************
3776  *                                                                            *
3777  * Function: hc_queue_item                                                    *
3778  *                                                                            *
3779  * Purpose: put back item into history queue                                  *
3780  *                                                                            *
 * Parameters: item - [IN] the history item                                   *
3782  *                                                                            *
3783  ******************************************************************************/
static void	hc_queue_item(zbx_hc_item_t *item)
{
	/* the heap element is initialised with the item id followed by the item pointer */
	zbx_binary_heap_elem_t	queue_elem = {item->itemid, (const void *)item};

	zbx_binary_heap_insert(&cache->history_queue, &queue_elem);
}
3790 
3791 /******************************************************************************
3792  *                                                                            *
3793  * Function: hc_get_item                                                      *
3794  *                                                                            *
3795  * Purpose: returns history item by itemid                                    *
3796  *                                                                            *
3797  * Parameters: itemid - [IN] the item id                                      *
3798  *                                                                            *
3799  * Return value: the history item or NULL if the requested item is not in     *
3800  *               history cache                                                *
3801  *                                                                            *
3802  ******************************************************************************/
hc_get_item(zbx_uint64_t itemid)3803 static zbx_hc_item_t	*hc_get_item(zbx_uint64_t itemid)
3804 {
3805 	return (zbx_hc_item_t *)zbx_hashset_search(&cache->history_items, &itemid);
3806 }
3807 
3808 /******************************************************************************
3809  *                                                                            *
3810  * Function: hc_add_item                                                      *
3811  *                                                                            *
3812  * Purpose: adds a new item to history cache                                  *
3813  *                                                                            *
3814  * Parameters: itemid - [IN] the item id                                      *
 *             data   - [IN] the item data                                    *
3816  *                                                                            *
3817  * Return value: the added history item                                       *
3818  *                                                                            *
3819  ******************************************************************************/
hc_add_item(zbx_uint64_t itemid,zbx_hc_data_t * data)3820 static zbx_hc_item_t	*hc_add_item(zbx_uint64_t itemid, zbx_hc_data_t *data)
3821 {
3822 	zbx_hc_item_t	item_local = {itemid, ZBX_HC_ITEM_STATUS_NORMAL, 0, data, data};
3823 
3824 	return (zbx_hc_item_t *)zbx_hashset_insert(&cache->history_items, &item_local, sizeof(item_local));
3825 }
3826 
3827 /******************************************************************************
3828  *                                                                            *
3829  * Function: hc_mem_value_str_dup                                             *
3830  *                                                                            *
3831  * Purpose: copies string value to history cache                              *
3832  *                                                                            *
3833  * Parameters: str - [IN] the string value                                    *
3834  *                                                                            *
3835  * Return value: the copied string or NULL if there was not enough memory     *
3836  *                                                                            *
3837  ******************************************************************************/
hc_mem_value_str_dup(const dc_value_str_t * str)3838 static char	*hc_mem_value_str_dup(const dc_value_str_t *str)
3839 {
3840 	char	*ptr;
3841 
3842 	if (NULL == (ptr = (char *)__hc_mem_malloc_func(NULL, str->len)))
3843 		return NULL;
3844 
3845 	memcpy(ptr, &string_values[str->pvalue], str->len - 1);
3846 	ptr[str->len - 1] = '\0';
3847 
3848 	return ptr;
3849 }
3850 
3851 /******************************************************************************
3852  *                                                                            *
3853  * Function: hc_clone_history_str_data                                        *
3854  *                                                                            *
3855  * Purpose: clones string value into history data memory                      *
3856  *                                                                            *
3857  * Parameters: dst - [IN/OUT] a reference to the cloned value                 *
3858  *             str - [IN] the string value to clone                           *
3859  *                                                                            *
 * Return value: SUCCEED - either there was no need to clone the string       *
3861  *                         (it was empty or already cloned) or the string was *
3862  *                          cloned successfully                               *
3863  *               FAIL    - not enough memory                                  *
3864  *                                                                            *
3865  * Comments: This function can be called in loop with the same dst value      *
3866  *           until it finishes cloning string value.                          *
3867  *                                                                            *
3868  ******************************************************************************/
hc_clone_history_str_data(char ** dst,const dc_value_str_t * str)3869 static int	hc_clone_history_str_data(char **dst, const dc_value_str_t *str)
3870 {
3871 	if (0 == str->len)
3872 		return SUCCEED;
3873 
3874 	if (NULL != *dst)
3875 		return SUCCEED;
3876 
3877 	if (NULL != (*dst = hc_mem_value_str_dup(str)))
3878 		return SUCCEED;
3879 
3880 	return FAIL;
3881 }
3882 
3883 /******************************************************************************
3884  *                                                                            *
3885  * Function: hc_clone_history_log_data                                        *
3886  *                                                                            *
3887  * Purpose: clones log value into history data memory                         *
3888  *                                                                            *
3889  * Parameters: dst        - [IN/OUT] a reference to the cloned value          *
3890  *             item_value - [IN] the log value to clone                       *
3891  *                                                                            *
 * Return value: SUCCEED - the log value was cloned successfully              *
3893  *               FAIL    - not enough memory                                  *
3894  *                                                                            *
3895  * Comments: This function can be called in loop with the same dst value      *
3896  *           until it finishes cloning log value.                             *
3897  *                                                                            *
3898  ******************************************************************************/
static int	hc_clone_history_log_data(zbx_log_value_t **dst, const dc_item_value_t *item_value)
{
	zbx_log_value_t	*log_value = *dst;

	if (NULL == log_value)
	{
		/* using realloc instead of malloc just to suppress 'not used' warning for realloc */
		log_value = (zbx_log_value_t *)__hc_mem_realloc_func(NULL, sizeof(zbx_log_value_t));
		*dst = log_value;

		if (NULL == log_value)
			return FAIL;

		/* zeroed string pointers mark the strings as not cloned yet */
		memset(log_value, 0, sizeof(zbx_log_value_t));
	}

	/* the value is cloned first, the source is attempted only after the value succeeded */
	if (SUCCEED != hc_clone_history_str_data(&log_value->value, &item_value->value.value_str) ||
			SUCCEED != hc_clone_history_str_data(&log_value->source, &item_value->source))
	{
		return FAIL;
	}

	log_value->logeventid = item_value->logeventid;
	log_value->severity = item_value->severity;
	log_value->timestamp = item_value->timestamp;

	return SUCCEED;
}
3922 
3923 /******************************************************************************
3924  *                                                                            *
3925  * Function: hc_clone_history_data                                            *
3926  *                                                                            *
3927  * Purpose: clones item value from local cache into history cache             *
3928  *                                                                            *
3929  * Parameters: data       - [IN/OUT] a reference to the cloned value          *
3930  *             item_value - [IN] the item value                               *
3931  *                                                                            *
 * Return value: SUCCEED - the item value was cloned successfully             *
3933  *               FAIL    - not enough memory                                  *
3934  *                                                                            *
3935  * Comments: This function can be called in loop with the same data value     *
3936  *           until it finishes cloning item value.                            *
3937  *                                                                            *
3938  ******************************************************************************/
static int	hc_clone_history_data(zbx_hc_data_t **data, const dc_item_value_t *item_value)
{
	zbx_hc_data_t	*hc_data = *data;

	/* the record itself is allocated only once, repeated calls reuse it */
	if (NULL == hc_data)
	{
		hc_data = (zbx_hc_data_t *)__hc_mem_malloc_func(NULL, sizeof(zbx_hc_data_t));
		*data = hc_data;

		if (NULL == hc_data)
			return FAIL;

		memset(hc_data, 0, sizeof(zbx_hc_data_t));

		hc_data->state = item_value->state;
		hc_data->ts = item_value->ts;
		hc_data->flags = item_value->flags;
	}

	if (0 != (ZBX_DC_FLAG_META & item_value->flags))
	{
		hc_data->lastlogsize = item_value->lastlogsize;
		hc_data->mtime = item_value->mtime;
	}

	/* not supported items carry their string value regardless of the value type */
	if (ITEM_STATE_NOTSUPPORTED == item_value->state)
	{
		hc_data->value.str = hc_mem_value_str_dup(&item_value->value.value_str);

		if (NULL == hc_data->value.str)
			return FAIL;

		hc_data->value_type = item_value->value_type;
		cache->stats.notsupported_counter++;

		return SUCCEED;
	}

	/* low level discovery values are stored as text */
	if (0 != (ZBX_DC_FLAG_LLD & item_value->flags))
	{
		hc_data->value.str = hc_mem_value_str_dup(&item_value->value.value_str);

		if (NULL == hc_data->value.str)
			return FAIL;

		hc_data->value_type = ITEM_VALUE_TYPE_TEXT;

		cache->stats.history_text_counter++;
		cache->stats.history_counter++;

		return SUCCEED;
	}

	/* records without a value need only the value type */
	if (0 != (ZBX_DC_FLAG_NOVALUE & item_value->flags))
	{
		hc_data->value_type = item_value->value_type;

		return SUCCEED;
	}

	/* copy the value according to the value type of the record */
	switch (item_value->value_type)
	{
		case ITEM_VALUE_TYPE_FLOAT:
			hc_data->value.dbl = item_value->value.value_dbl;
			break;
		case ITEM_VALUE_TYPE_UINT64:
			hc_data->value.ui64 = item_value->value.value_uint;
			break;
		case ITEM_VALUE_TYPE_STR:
		case ITEM_VALUE_TYPE_TEXT:
			if (SUCCEED != hc_clone_history_str_data(&hc_data->value.str, &item_value->value.value_str))
				return FAIL;
			break;
		case ITEM_VALUE_TYPE_LOG:
			if (SUCCEED != hc_clone_history_log_data(&hc_data->value.log, item_value))
				return FAIL;
			break;
	}

	/* statistics are selected by item_value_type, not by value_type, and */
	/* are updated only after the value has been cloned successfully      */
	switch (item_value->item_value_type)
	{
		case ITEM_VALUE_TYPE_FLOAT:
			cache->stats.history_float_counter++;
			break;
		case ITEM_VALUE_TYPE_UINT64:
			cache->stats.history_uint_counter++;
			break;
		case ITEM_VALUE_TYPE_STR:
			cache->stats.history_str_counter++;
			break;
		case ITEM_VALUE_TYPE_TEXT:
			cache->stats.history_text_counter++;
			break;
		case ITEM_VALUE_TYPE_LOG:
			cache->stats.history_log_counter++;
			break;
	}

	cache->stats.history_counter++;

	hc_data->value_type = item_value->value_type;

	return SUCCEED;
}
4039 
4040 /******************************************************************************
4041  *                                                                            *
4042  * Function: hc_add_item_values                                               *
4043  *                                                                            *
4044  * Purpose: adds item values to the history cache                             *
4045  *                                                                            *
4046  * Parameters: values     - [IN] the item values to add                       *
4047  *             values_num - [IN] the number of item values to add             *
4048  *                                                                            *
4049  * Comments: If the history cache is full this function will wait until       *
4050  *           history syncers processes values freeing enough space to store   *
4051  *           the new value.                                                   *
4052  *                                                                            *
4053  ******************************************************************************/
static void	hc_add_item_values(dc_item_value_t *values, int values_num)
{
	dc_item_value_t	*item_value;
	int		i;
	zbx_hc_item_t	*item;

	/* NOTE(review): the retry loop below unlocks and re-locks the cache, which implies */
	/* that this function is entered with the cache lock already held - confirm         */
	for (i = 0; i < values_num; i++)
	{
		/* reset to NULL for every value so that hc_clone_history_data() allocates a new record */
		zbx_hc_data_t	*data = NULL;

		item_value = &values[i];

		/* a record with metadata and no value can be dropped if  */
		/* the metadata update is copied to the last queued value */
		if (NULL != (item = hc_get_item(item_value->itemid)) &&
				0 != (item_value->flags & ZBX_DC_FLAG_NOVALUE) &&
				0 != (item_value->flags & ZBX_DC_FLAG_META))
		{
			/* skip metadata updates when only one value is queued, */
			/* because the item might be already being processed    */
			if (item->head != item->tail)
			{
				item->head->lastlogsize = item_value->lastlogsize;
				item->head->mtime = item_value->mtime;
				item->head->flags |= ZBX_DC_FLAG_META;
				continue;
			}
		}

		if (SUCCEED != hc_clone_history_data(&data, item_value))
		{
			/* cloning failed: release the cache lock, sleep and retry with the same */
			/* data pointer until hc_clone_history_data() reports success            */
			do
			{
				UNLOCK_CACHE;

				zabbix_log(LOG_LEVEL_DEBUG, "History cache is full. Sleeping for 1 second.");
				sleep(1);

				LOCK_CACHE;
			}
			while (SUCCEED != hc_clone_history_data(&data, item_value));

			/* the item is looked up again after the lock was released and re-acquired, */
			/* presumably because the cache could have been changed in the meantime     */
			item = hc_get_item(item_value->itemid);
		}

		if (NULL == item)
		{
			/* no item for this itemid yet: create it with the cloned value and queue it */
			item = hc_add_item(item_value->itemid, data);
			hc_queue_item(item);
		}
		else
		{
			/* link the cloned value after the current head and make it the new head */
			item->head->next = data;
			item->head = data;
		}
		item->values_num++;
	}
}
4112 
4113 /******************************************************************************
4114  *                                                                            *
4115  * Function: hc_copy_history_data                                             *
4116  *                                                                            *
4117  * Purpose: copies item value from history cache into the specified history   *
4118  *          value                                                             *
4119  *                                                                            *
4120  * Parameters: history - [OUT] the history value                              *
4121  *             itemid  - [IN] the item identifier                             *
4122  *             data    - [IN] the history data to copy                        *
4123  *                                                                            *
4124  * Comments: handling of uninitialized fields in dc_add_proxy_history_log()   *
4125  *                                                                            *
4126  ******************************************************************************/
static void	hc_copy_history_data(ZBX_DC_HISTORY *history, zbx_uint64_t itemid, zbx_hc_data_t *data)
{
	zbx_log_value_t	*log_copy;

	/* fields copied for every record */
	history->itemid = itemid;
	history->ts = data->ts;
	history->state = data->state;
	history->flags = data->flags;
	history->lastlogsize = data->lastlogsize;
	history->mtime = data->mtime;

	/* a not supported record carries only the error text, value_type is left untouched */
	if (ITEM_STATE_NOTSUPPORTED == data->state)
	{
		history->value.err = zbx_strdup(NULL, data->value.str);
		history->flags |= ZBX_DC_FLAG_UNDEF;
		return;
	}

	history->value_type = data->value_type;

	/* records flagged as having no value have nothing more to copy */
	if (0 != (ZBX_DC_FLAG_NOVALUE & data->flags))
		return;

	switch (data->value_type)
	{
		case ITEM_VALUE_TYPE_FLOAT:
			history->value.dbl = data->value.dbl;
			break;
		case ITEM_VALUE_TYPE_UINT64:
			history->value.ui64 = data->value.ui64;
			break;
		case ITEM_VALUE_TYPE_STR:
		case ITEM_VALUE_TYPE_TEXT:
			history->value.str = zbx_strdup(NULL, data->value.str);
			break;
		case ITEM_VALUE_TYPE_LOG:
			log_copy = (zbx_log_value_t *)zbx_malloc(NULL, sizeof(zbx_log_value_t));
			log_copy->value = zbx_strdup(NULL, data->value.log->value);

			/* only the source is checked for NULL, the value is duplicated unconditionally */
			log_copy->source = (NULL != data->value.log->source ?
					zbx_strdup(NULL, data->value.log->source) : NULL);

			log_copy->timestamp = data->value.log->timestamp;
			log_copy->severity = data->value.log->severity;
			log_copy->logeventid = data->value.log->logeventid;

			history->value.log = log_copy;
			break;
	}
}
4176 
4177 /******************************************************************************
4178  *                                                                            *
4179  * Function: hc_pop_items                                                     *
4180  *                                                                            *
4181  * Purpose: pops the next batch of history items from cache for processing    *
4182  *                                                                            *
4183  * Parameters: history_items - [OUT] the locked history items                 *
4184  *                                                                            *
4185  * Comments: The history_items must be returned back to history cache with    *
4186  *           hc_push_items() function after they have been processed.         *
4187  *                                                                            *
4188  ******************************************************************************/
static void	hc_pop_items(zbx_vector_ptr_t *history_items)
{
	for (;;)
	{
		zbx_binary_heap_elem_t	*min_elem;

		/* stop when the batch limit is reached ... */
		if (ZBX_HC_SYNC_MAX <= history_items->values_num)
			break;

		/* ... or when the history queue has nothing left */
		if (FAIL != zbx_binary_heap_empty(&cache->history_queue))
			break;

		/* move the item at the top of the queue into the output vector */
		min_elem = zbx_binary_heap_find_min(&cache->history_queue);
		zbx_vector_ptr_append(history_items, (zbx_hc_item_t *)min_elem->data);

		zbx_binary_heap_remove_min(&cache->history_queue);
	}
}
4203 
4204 /******************************************************************************
4205  *                                                                            *
4206  * Function: hc_get_item_values                                               *
4207  *                                                                            *
4208  * Purpose: gets item history values                                          *
4209  *                                                                            *
 *             history       - [OUT] the history values                       *
4211  *             history_items - [IN] the history items                         *
4212  *                                                                            *
4213  ******************************************************************************/
static void	hc_get_item_values(ZBX_DC_HISTORY *history, zbx_vector_ptr_t *history_items)
{
	ZBX_DC_HISTORY	*next_slot = history;
	int		idx;

	/* we don't need to lock history cache because no other processes can  */
	/* change item's history data until it is pushed back to history queue */
	for (idx = 0; idx < history_items->values_num; idx++)
	{
		zbx_hc_item_t	*hc_item = (zbx_hc_item_t *)history_items->values[idx];

		/* busy items are skipped, so the output is filled without gaps */
		if (ZBX_HC_ITEM_STATUS_BUSY != hc_item->status)
			hc_copy_history_data(next_slot++, hc_item->itemid, hc_item->tail);
	}
}
4231 
4232 /******************************************************************************
4233  *                                                                            *
 * Function: hc_push_items                                                    *
4235  *                                                                            *
4236  * Purpose: push back the processed history items into history cache          *
4237  *                                                                            *
4238  * Parameters: history_items - [IN] the history items containing processed    *
4239  *                                  (available) and busy items                *
4240  *                                                                            *
4241  * Comments: This function removes processed value from history cache.        *
4242  *           If there is no more data for this item, then the item itself is  *
4243  *           removed from history index.                                      *
4244  *                                                                            *
4245  ******************************************************************************/
void	hc_push_items(zbx_vector_ptr_t *history_items)
{
	int	idx;

	for (idx = 0; idx < history_items->values_num; idx++)
	{
		zbx_hc_item_t	*hc_item = (zbx_hc_item_t *)history_items->values[idx];

		if (ZBX_HC_ITEM_STATUS_BUSY == hc_item->status)
		{
			/* reset item status before returning it to queue */
			hc_item->status = ZBX_HC_ITEM_STATUS_NORMAL;
			hc_queue_item(hc_item);
		}
		else if (ZBX_HC_ITEM_STATUS_NORMAL == hc_item->status)
		{
			/* detach the value at the tail and release it */
			zbx_hc_data_t	*processed = hc_item->tail;

			hc_item->values_num--;
			hc_item->tail = processed->next;
			hc_free_data(processed);

			/* an item without remaining values is dropped, otherwise it is queued again */
			if (NULL == hc_item->tail)
				zbx_hashset_remove(&cache->history_items, hc_item);
			else
				hc_queue_item(hc_item);
		}
	}
}
4276 
4277 /******************************************************************************
4278  *                                                                            *
4279  * Function: hc_queue_get_size                                                *
4280  *                                                                            *
4281  * Purpose: retrieve the size of history queue                                *
4282  *                                                                            *
4283  ******************************************************************************/
hc_queue_get_size(void)4284 int	hc_queue_get_size(void)
4285 {
4286 	return cache->history_queue.elems_num;
4287 }
4288 
/******************************************************************************
 *                                                                            *
 * Function: hc_get_history_compression_age                                   *
 *                                                                            *
 * Purpose: calculates the current time minus the configured                  *
 *          history_compress_older setting                                    *
 *                                                                            *
 * Return value: the calculated value when history compression is available,  *
 *               switched on and history_compress_older is not zero;          *
 *               0 otherwise, including builds without HAVE_POSTGRESQL        *
 *                                                                            *
 ******************************************************************************/
int	hc_get_history_compression_age(void)
{
	int		compression_age = 0;
#if defined(HAVE_POSTGRESQL)
	zbx_config_t	cfg;
	int		compression_on;

	zbx_config_get(&cfg, ZBX_CONFIG_FLAGS_DB_EXTENSION);

	compression_on = (ON == cfg.db.history_compression_availability &&
			ON == cfg.db.history_compression_status);

	if (0 != compression_on && 0 != cfg.db.history_compress_older)
		compression_age = (int)time(NULL) - cfg.db.history_compress_older;

	zbx_config_clean(&cfg);
#endif
	return compression_age;
}
4311 
4312 /******************************************************************************
4313  *                                                                            *
4314  * Function: init_trend_cache                                                 *
4315  *                                                                            *
4316  * Purpose: Allocate shared memory for trend cache (part of database cache)   *
4317  *                                                                            *
4318  * Author: Vladimir Levijev                                                   *
4319  *                                                                            *
4320  * Comments: Is optionally called from init_database_cache()                  *
4321  *                                                                            *
4322  ******************************************************************************/
4323 
/* NOTE(review): presumably expands to the __trend_mem_malloc_func(), __trend_mem_realloc_func() */
/* and __trend_mem_free_func() wrappers over trend_mem that init_trend_cache() passes to the     */
/* trends hashset - confirm against the ZBX_MEM_FUNC_IMPL definition                             */
ZBX_MEM_FUNC_IMPL(__trend, trend_mem)
4325 
4326 static int	init_trend_cache(char **error)
4327 {
4328 	size_t	sz;
4329 	int	ret;
4330 
4331 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
4332 
4333 	if (SUCCEED != (ret = zbx_mutex_create(&trends_lock, ZBX_MUTEX_TRENDS, error)))
4334 		goto out;
4335 
4336 	sz = zbx_mem_required_size(1, "trend cache", "TrendCacheSize");
4337 	if (SUCCEED != (ret = zbx_mem_create(&trend_mem, CONFIG_TRENDS_CACHE_SIZE, "trend cache", "TrendCacheSize", 0,
4338 			error)))
4339 	{
4340 		goto out;
4341 	}
4342 
4343 	CONFIG_TRENDS_CACHE_SIZE -= sz;
4344 
4345 	cache->trends_num = 0;
4346 	cache->trends_last_cleanup_hour = 0;
4347 
4348 #define INIT_HASHSET_SIZE	100	/* Should be calculated dynamically based on trends size? */
4349 					/* Still does not make sense to have it more than initial */
4350 					/* item hashset size in configuration cache.              */
4351 
4352 	zbx_hashset_create_ext(&cache->trends, INIT_HASHSET_SIZE,
4353 			ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL,
4354 			__trend_mem_malloc_func, __trend_mem_realloc_func, __trend_mem_free_func);
4355 
4356 #undef INIT_HASHSET_SIZE
4357 out:
4358 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
4359 
4360 	return ret;
4361 }
4362 
4363 /******************************************************************************
4364  *                                                                            *
4365  * Function: init_database_cache                                              *
4366  *                                                                            *
4367  * Purpose: Allocate shared memory for database cache                         *
4368  *                                                                            *
4369  * Author: Alexei Vladishev, Alexander Vladishev                              *
4370  *                                                                            *
4371  ******************************************************************************/
int	init_database_cache(char **error)
{
	int	ret;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* every failing step stores its result in ret and jumps to out, so the return */
	/* value is either SUCCEED or the result of the first call that failed         */
	/* (error is handed to those calls, presumably to receive the error message)   */

	/* locks guarding the history cache and the id cache */
	if (SUCCEED != (ret = zbx_mutex_create(&cache_lock, ZBX_MUTEX_CACHE, error)))
		goto out;

	if (SUCCEED != (ret = zbx_mutex_create(&cache_ids_lock, ZBX_MUTEX_CACHE_IDS, error)))
		goto out;

	/* memory for history data; note the 1 passed here versus the 0 passed for the index memory below */
	if (SUCCEED != (ret = zbx_mem_create(&hc_mem, CONFIG_HISTORY_CACHE_SIZE, "history cache",
			"HistoryCacheSize", 1, error)))
	{
		goto out;
	}

	/* memory for the cache structure, the id cache and the history index structures */
	if (SUCCEED != (ret = zbx_mem_create(&hc_index_mem, CONFIG_HISTORY_INDEX_CACHE_SIZE, "history index cache",
			"HistoryIndexCacheSize", 0, error)))
	{
		goto out;
	}

	/* NOTE(review): both allocations below are used without a NULL check - presumably */
	/* the index memory allocator never returns NULL; confirm                          */
	cache = (ZBX_DC_CACHE *)__hc_index_mem_malloc_func(NULL, sizeof(ZBX_DC_CACHE));
	memset(cache, 0, sizeof(ZBX_DC_CACHE));

	ids = (ZBX_DC_IDS *)__hc_index_mem_malloc_func(NULL, sizeof(ZBX_DC_IDS));
	memset(ids, 0, sizeof(ZBX_DC_IDS));

	/* history items index and the queue of items, both allocated from the index memory */
	zbx_hashset_create_ext(&cache->history_items, ZBX_HC_ITEMS_INIT_SIZE,
			ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL,
			__hc_index_mem_malloc_func, __hc_index_mem_realloc_func, __hc_index_mem_free_func);

	zbx_binary_heap_create_ext(&cache->history_queue, hc_queue_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY,
			__hc_index_mem_malloc_func, __hc_index_mem_realloc_func, __hc_index_mem_free_func);

	/* the proxy queue and the trend cache are set up only when running as server */
	if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
	{
		zbx_hashset_create_ext(&(cache->proxyqueue.index), ZBX_HC_SYNC_MAX,
			ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC, NULL,
			__hc_index_mem_malloc_func, __hc_index_mem_realloc_func, __hc_index_mem_free_func);

		zbx_list_create_ext(&(cache->proxyqueue.list), __hc_index_mem_malloc_func, __hc_index_mem_free_func);

		cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_NORMAL;

		if (SUCCEED != (ret = init_trend_cache(error)))
			goto out;
	}

	cache->history_num_total = 0;
	cache->history_progress_ts = 0;

	/* the file level sql buffer is allocated once with its initial sql_alloc size */
	if (NULL == sql)
		sql = (char *)zbx_malloc(sql, sql_alloc);
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);

	return ret;
}
4433 
4434 /******************************************************************************
4435  *                                                                            *
4436  * Function: DCsync_all                                                       *
4437  *                                                                            *
4438  * Purpose: writes updates and new data from pool and cache data to database  *
4439  *                                                                            *
4440  * Author: Alexei Vladishev                                                   *
4441  *                                                                            *
4442  ******************************************************************************/
DCsync_all(void)4443 static void	DCsync_all(void)
4444 {
4445 	zabbix_log(LOG_LEVEL_DEBUG, "In DCsync_all()");
4446 
4447 	sync_history_cache_full();
4448 	if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
4449 		DCsync_trends();
4450 
4451 	zabbix_log(LOG_LEVEL_DEBUG, "End of DCsync_all()");
4452 }
4453 
4454 /******************************************************************************
4455  *                                                                            *
4456  * Function: free_database_cache                                              *
4457  *                                                                            *
4458  * Purpose: Free memory allocated for database cache                          *
4459  *                                                                            *
4460  * Author: Alexei Vladishev, Alexander Vladishev                              *
4461  *                                                                            *
4462  ******************************************************************************/
free_database_cache(void)4463 void	free_database_cache(void)
4464 {
4465 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
4466 
4467 	DCsync_all();
4468 
4469 	cache = NULL;
4470 
4471 	zbx_mutex_destroy(&cache_lock);
4472 	zbx_mutex_destroy(&cache_ids_lock);
4473 
4474 	if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
4475 		zbx_mutex_destroy(&trends_lock);
4476 
4477 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
4478 }
4479 
4480 /******************************************************************************
4481  *                                                                            *
4482  * Function: DCget_nextid                                                     *
4483  *                                                                            *
4484  * Purpose: Return next id for requested table                                *
4485  *                                                                            *
4486  * Author: Alexander Vladishev                                                *
4487  *                                                                            *
4488  ******************************************************************************/
zbx_uint64_t	DCget_nextid(const char *table_name, int num)
{
	/* Reserves num consecutive ids for table_name and returns the first of them. */
	/* Returns 0 when the table is not cached yet and the database query fails.   */
	int		i;
	DB_RESULT	result;
	DB_ROW		row;
	const ZBX_TABLE	*table;
	ZBX_DC_ID	*id;
	zbx_uint64_t	min = 0, max = ZBX_DB_MAX_ID, nextid, lastid;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() table:'%s' num:%d", __func__, table_name, num);

	LOCK_CACHE_IDS;

	/* scan the id slots until the table is found or the first unused slot is reached */
	for (i = 0; i < ZBX_IDS_SIZE; i++)
	{
		id = &ids->id[i];
		if ('\0' == *id->table_name)
			break;

		if (0 == strcmp(id->table_name, table_name))
		{
			/* cached table: hand out lastid + 1 and advance lastid by num */
			nextid = id->lastid + 1;
			id->lastid += num;
			lastid = id->lastid;

			UNLOCK_CACHE_IDS;

			zabbix_log(LOG_LEVEL_DEBUG, "End of %s() table:'%s' [" ZBX_FS_UI64 ":" ZBX_FS_UI64 "]",
					__func__, table_name, nextid, lastid);

			return nextid;
		}
	}

	/* every slot is taken by another table */
	/* NOTE(review): the process exits here without releasing the ids lock - confirm this is intended */
	if (i == ZBX_IDS_SIZE)
	{
		zabbix_log(LOG_LEVEL_ERR, "insufficient shared memory for ids");
		exit(EXIT_FAILURE);
	}

	/* id points to the first unused slot here; it is initialized from the database */
	/* while the ids lock is still held                                             */
	/* NOTE(review): table is dereferenced without a NULL check - confirm that      */
	/* DBget_table() cannot return NULL for the table names passed by callers       */
	table = DBget_table(table_name);

	result = DBselect("select max(%s) from %s where %s between " ZBX_FS_UI64 " and " ZBX_FS_UI64,
			table->recid, table_name, table->recid, min, max);

	if (NULL != result)
	{
		/* claim the slot for this table */
		zbx_strlcpy(id->table_name, table_name, sizeof(id->table_name));

		/* no row or a NULL maximum: numbering starts from min */
		if (NULL == (row = DBfetch(result)) || SUCCEED == DBis_null(row[0]))
			id->lastid = min;
		else
			ZBX_STR2UINT64(id->lastid, row[0]);

		nextid = id->lastid + 1;
		id->lastid += num;
		lastid = id->lastid;
	}
	else
		nextid = lastid = 0;	/* query failed: the slot stays unused and 0 is returned */

	UNLOCK_CACHE_IDS;

	/* NOTE(review): also reached with result == NULL - presumably DBfree_result() accepts NULL; confirm */
	DBfree_result(result);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() table:'%s' [" ZBX_FS_UI64 ":" ZBX_FS_UI64 "]",
			__func__, table_name, nextid, lastid);

	return nextid;
}
4559 
4560 /******************************************************************************
4561  *                                                                            *
4562  * Function: DCupdate_hosts_availability                                      *
4563  *                                                                            *
4564  * Purpose: performs host availability reset for hosts with availability set  *
4565  *          on interfaces without enabled items                               *
4566  *                                                                            *
4567  ******************************************************************************/
DCupdate_hosts_availability(void)4568 void	DCupdate_hosts_availability(void)
4569 {
4570 	zbx_vector_ptr_t	hosts;
4571 	char			*sql_buf = NULL;
4572 	size_t			sql_buf_alloc = 0, sql_buf_offset = 0;
4573 	int			i;
4574 
4575 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
4576 
4577 	zbx_vector_ptr_create(&hosts);
4578 
4579 	if (SUCCEED != DCreset_hosts_availability(&hosts))
4580 		goto out;
4581 
4582 	DBbegin();
4583 	DBbegin_multiple_update(&sql_buf, &sql_buf_alloc, &sql_buf_offset);
4584 
4585 	for (i = 0; i < hosts.values_num; i++)
4586 	{
4587 		if (SUCCEED != zbx_sql_add_host_availability(&sql_buf, &sql_buf_alloc, &sql_buf_offset,
4588 				(zbx_host_availability_t *)hosts.values[i]))
4589 		{
4590 			continue;
4591 		}
4592 
4593 		zbx_strcpy_alloc(&sql_buf, &sql_buf_alloc, &sql_buf_offset, ";\n");
4594 		DBexecute_overflowed_sql(&sql_buf, &sql_buf_alloc, &sql_buf_offset);
4595 	}
4596 
4597 	DBend_multiple_update(&sql_buf, &sql_buf_alloc, &sql_buf_offset);
4598 
4599 	if (16 < sql_buf_offset)
4600 		DBexecute("%s", sql_buf);
4601 
4602 	DBcommit();
4603 
4604 	zbx_free(sql_buf);
4605 out:
4606 	zbx_vector_ptr_clear_ext(&hosts, (zbx_mem_free_func_t)zbx_host_availability_free);
4607 	zbx_vector_ptr_destroy(&hosts);
4608 
4609 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
4610 }
4611 
4612 /******************************************************************************
4613  *                                                                            *
4614  * Function: zbx_hc_get_diag_stats                                            *
4615  *                                                                            *
4616  * Purpose: get history cache diagnostics statistics                          *
4617  *                                                                            *
4618  ******************************************************************************/
zbx_hc_get_diag_stats(zbx_uint64_t * items_num,zbx_uint64_t * values_num)4619 void	zbx_hc_get_diag_stats(zbx_uint64_t *items_num, zbx_uint64_t *values_num)
4620 {
4621 	LOCK_CACHE;
4622 
4623 	*values_num = cache->history_num;
4624 	*items_num = cache->history_items.num_data;
4625 
4626 	UNLOCK_CACHE;
4627 }
4628 
4629 /******************************************************************************
4630  *                                                                            *
4631  * Function: zbx_hc_get_mem_stats                                             *
4632  *                                                                            *
4633  * Purpose: get shared memory allocator statistics                            *
4634  *                                                                            *
4635  ******************************************************************************/
zbx_hc_get_mem_stats(zbx_mem_stats_t * data,zbx_mem_stats_t * index)4636 void	zbx_hc_get_mem_stats(zbx_mem_stats_t *data, zbx_mem_stats_t *index)
4637 {
4638 	LOCK_CACHE;
4639 
4640 	if (NULL != data)
4641 		zbx_mem_get_stats(hc_mem, data);
4642 
4643 	if (NULL != index)
4644 		zbx_mem_get_stats(hc_index_mem, index);
4645 
4646 	UNLOCK_CACHE;
4647 }
4648 
4649 /******************************************************************************
4650  *                                                                            *
4651  * Function: zbx_hc_get_items                                                 *
4652  *                                                                            *
4653  * Purpose: get statistics of cached items                                    *
4654  *                                                                            *
4655  ******************************************************************************/
zbx_hc_get_items(zbx_vector_uint64_pair_t * items)4656 void	zbx_hc_get_items(zbx_vector_uint64_pair_t *items)
4657 {
4658 	zbx_hashset_iter_t	iter;
4659 	zbx_hc_item_t		*item;
4660 
4661 	LOCK_CACHE;
4662 
4663 	zbx_vector_uint64_pair_reserve(items, cache->history_items.num_data);
4664 
4665 	zbx_hashset_iter_reset(&cache->history_items, &iter);
4666 	while (NULL != (item = (zbx_hc_item_t *)zbx_hashset_iter_next(&iter)))
4667 	{
4668 		zbx_uint64_pair_t	pair = {item->itemid, item->values_num};
4669 		zbx_vector_uint64_pair_append_ptr(items, &pair);
4670 	}
4671 
4672 	UNLOCK_CACHE;
4673 }
4674 
4675 /******************************************************************************
4676  *                                                                            *
4677  * Function: zbx_hc_proxyqueue_peek                                           *
4678  *                                                                            *
4679  * Purpose: return first proxy in a queue, function assumes that a queue is   *
4680  *          not empty                                                         *
4681  *                                                                            *
4682  * Return value: proxyid at the top a queue                                   *
4683  *                                                                            *
4684  ******************************************************************************/
zbx_hc_proxyqueue_peek(void)4685 static zbx_uint64_t	zbx_hc_proxyqueue_peek(void)
4686 {
4687 	zbx_uint64_t	*p_val;
4688 
4689 	if (NULL == cache->proxyqueue.list.head)
4690 		return 0;
4691 
4692 	p_val = (zbx_uint64_t *)(cache->proxyqueue.list.head->data);
4693 
4694 	return *p_val;
4695 }
4696 
4697 /******************************************************************************
4698  *                                                                            *
4699  * Function: zbx_hc_proxyqueue_enqueue                                        *
4700  *                                                                            *
4701  * Purpose: add new proxyid to a queue                                        *
4702  *                                                                            *
4703  * Parameters: proxyid   - [IN] the proxy id                                  *
4704  *                                                                            *
4705  ******************************************************************************/
zbx_hc_proxyqueue_enqueue(zbx_uint64_t proxyid)4706 static void	zbx_hc_proxyqueue_enqueue(zbx_uint64_t proxyid)
4707 {
4708 	if (NULL == zbx_hashset_search(&cache->proxyqueue.index, &proxyid))
4709 	{
4710 		zbx_uint64_t *ptr;
4711 
4712 		ptr = zbx_hashset_insert(&cache->proxyqueue.index, &proxyid, sizeof(proxyid));
4713 		zbx_list_append(&cache->proxyqueue.list, ptr, NULL);
4714 	}
4715 }
4716 
4717 /******************************************************************************
4718  *                                                                            *
4719  * Function: zbx_hc_proxyqueue_dequeue                                        *
4720  *                                                                            *
4721  * Purpose: try to dequeue proxyid from a proxy queue                         *
4722  *                                                                            *
4723  * Parameters: chk_proxyid  - [IN] the proxyid                                *
4724  *                                                                            *
4725  * Return value: SUCCEED - retrieval successful                               *
4726  *               FAIL    - otherwise                                          *
4727  *                                                                            *
4728  ******************************************************************************/
zbx_hc_proxyqueue_dequeue(zbx_uint64_t proxyid)4729 static int	zbx_hc_proxyqueue_dequeue(zbx_uint64_t proxyid)
4730 {
4731 	zbx_uint64_t	top_val;
4732 	void		*rem_val = 0;
4733 
4734 	top_val = zbx_hc_proxyqueue_peek();
4735 
4736 	if (proxyid != top_val)
4737 		return FAIL;
4738 
4739 	if (FAIL == zbx_list_pop(&cache->proxyqueue.list, &rem_val))
4740 		return FAIL;
4741 
4742 	zbx_hashset_remove_direct(&cache->proxyqueue.index, rem_val);
4743 
4744 	return SUCCEED;
4745 }
4746 
4747 /******************************************************************************
4748  *                                                                            *
4749  * Function: zbx_hc_proxyqueue_clear                                          *
4750  *                                                                            *
4751  * Purpose: remove all proxies from proxy priority queue                      *
4752  *                                                                            *
4753  ******************************************************************************/
zbx_hc_proxyqueue_clear(void)4754 static void	zbx_hc_proxyqueue_clear(void)
4755 {
4756 	zbx_list_destroy(&cache->proxyqueue.list);
4757 	zbx_hashset_clear(&cache->proxyqueue.index);
4758 }
4759 
4760 /******************************************************************************
4761  *                                                                            *
4762  * Function: zbx_hc_check_proxy                                               *
4763  *                                                                            *
4764  * Purpose: check status of a history cache usage, enqueue/dequeue proxy      *
4765  *          from priority list and accordingly enable or disable wait mode    *
4766  *                                                                            *
4767  * Parameters: proxyid   - [IN] the proxyid                                   *
4768  *                                                                            *
4769  * Return value: SUCCEED - proxy can be processed now                         *
4770  *               FAIL    - proxy cannot be processed now, it got enqueued     *
4771  *                                                                            *
4772  ******************************************************************************/
zbx_hc_check_proxy(zbx_uint64_t proxyid)4773 int	zbx_hc_check_proxy(zbx_uint64_t proxyid)
4774 {
4775 	double	hc_pused;
4776 	int	ret;
4777 
4778 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() proxyid:"ZBX_FS_UI64, __func__, proxyid);
4779 
4780 	LOCK_CACHE;
4781 
4782 	hc_pused = 100 * (double)(hc_mem->total_size - hc_mem->free_size) / hc_mem->total_size;
4783 
4784 	if (20 >= hc_pused)
4785 	{
4786 		cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_NORMAL;
4787 
4788 		zbx_hc_proxyqueue_clear();
4789 
4790 		ret = SUCCEED;
4791 		goto out;
4792 	}
4793 
4794 	if (ZBX_HC_PROXYQUEUE_STATE_WAIT == cache->proxyqueue.state)
4795 	{
4796 		zbx_hc_proxyqueue_enqueue(proxyid);
4797 
4798 		if (60 < hc_pused)
4799 		{
4800 			ret = FAIL;
4801 			goto out;
4802 		}
4803 
4804 		cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_NORMAL;
4805 	}
4806 	else
4807 	{
4808 		if (80 <= hc_pused)
4809 		{
4810 			cache->proxyqueue.state = ZBX_HC_PROXYQUEUE_STATE_WAIT;
4811 			zbx_hc_proxyqueue_enqueue(proxyid);
4812 
4813 			ret = FAIL;
4814 			goto out;
4815 		}
4816 	}
4817 
4818 	if (0 == zbx_hc_proxyqueue_peek())
4819 	{
4820 		ret = SUCCEED;
4821 		goto out;
4822 	}
4823 
4824 	ret = zbx_hc_proxyqueue_dequeue(proxyid);
4825 
4826 out:
4827 	UNLOCK_CACHE;
4828 
4829 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s():%s", __func__, zbx_result_string(ret));
4830 
4831 	return ret;
4832 }
4833 
4834