1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18 **/
19 
20 #include "common.h"
21 
22 #include "dbcache.h"
23 #include "daemon.h"
24 #include "zbxself.h"
25 #include "log.h"
26 #include "zbxserver.h"
27 #include "sysinfo.h"
28 #include "zbxserialize.h"
29 #include "zbxipcservice.h"
30 #include "zbxlld.h"
31 
32 #include "preprocessing.h"
33 #include "preproc_manager.h"
34 #include "zbxalgo.h"
35 #include "preproc_history.h"
36 
37 extern unsigned char	process_type, program_type;
38 extern int		server_num, process_num, CONFIG_PREPROCESSOR_FORKS;
39 
/* NOTE(review): not referenced in the visible part of this file; presumably the */
/* delay (in seconds) used by the manager main loop - confirm against its usage  */
#define ZBX_PREPROCESSING_MANAGER_DELAY	1

/* request priorities used by preprocessor_enqueue():                         */
/*   NONE  - the default priority                                             */
/*   FIRST - assigned to internal items, which are queued ahead of the others */
#define ZBX_PREPROC_PRIORITY_NONE	0
#define ZBX_PREPROC_PRIORITY_FIRST	1
44 
/* states of a request placed in the preprocessing manager value queue */
typedef enum
{
	REQUEST_STATE_QUEUED		= 0,		/* requires preprocessing, can be picked as the next task */
	REQUEST_STATE_PROCESSING	= 1,		/* is being preprocessed  */
	REQUEST_STATE_DONE		= 2,		/* value is set, waiting for flush */
	REQUEST_STATE_PENDING		= 3		/* value requires preprocessing, */
							/* but is waiting on other request to complete; */
							/* switched to QUEUED when that request is done */
}
zbx_preprocessing_states_t;
54 
/* preprocessing request - one item value stored in the manager value queue */
typedef struct preprocessing_request
{
	zbx_preprocessing_states_t	state;		/* request state */
	struct preprocessing_request	*pending;	/* the request waiting on this request to complete */
							/* (NULL if there is none) */
	zbx_preproc_item_value_t	value;		/* unpacked item value */
	zbx_preproc_op_t		*steps;		/* preprocessing steps, copied from item configuration */
							/* and released once the task has been serialized */
	int				steps_num;	/* number of preprocessing steps */
	unsigned char			value_type;	/* value type from configuration */
							/* at the beginning of preprocessing queue */
}
zbx_preprocessing_request_t;
67 
/* preprocessing worker data */
typedef struct
{
	zbx_ipc_client_t	*client;	/* the connected preprocessing worker client */
	void			*task;		/* the current task data: either a queued list item or */
						/* a direct request; NULL when the worker is free */
}
zbx_preprocessing_worker_t;
75 
/* item link index - entry of the manager linked_items hashset, looked up by item id */
typedef struct
{
	zbx_uint64_t		itemid;		/* item id */
	zbx_list_item_t		*queue_item;	/* queued item (the most recently linked request of the item) */
}
zbx_item_link_t;
83 
/* direct request to be forwarded to worker, bypassing the preprocessing queue */
typedef struct
{
	zbx_ipc_client_t	*client;	/* the IPC client sending forward message to worker */
						/* (released when the direct request is freed) */
	zbx_ipc_message_t	message;	/* the message to be forwarded to the worker */
}
zbx_preprocessing_direct_request_t;
91 
/* preprocessing manager data */
typedef struct
{
	zbx_preprocessing_worker_t	*workers;	/* preprocessing worker array */
	int				worker_count;	/* preprocessing worker count */
	zbx_list_t			queue;		/* queue of item values (zbx_preprocessing_request_t) */
	zbx_hashset_t			item_config;	/* item configuration L2 cache */
	zbx_hashset_t			history_cache;	/* item value history cache (zbx_preproc_history_t) */
	zbx_hashset_t			linked_items;	/* linked items placed in queue (zbx_item_link_t) */
	int				cache_ts;	/* cache timestamp, compared on each configuration */
							/* sync to detect configuration changes */
	zbx_uint64_t			processed_num;	/* processed value counter */
	zbx_uint64_t			queued_num;	/* queued value counter */
	zbx_uint64_t			preproc_num;	/* queued values with preprocessing steps */
	zbx_list_iterator_t		priority_tail;	/* iterator to the last queued priority item */

	zbx_list_t			direct_queue;	/* Queue of external requests that have to be */
							/* forwarded to workers for preprocessing.    */
							/* It is served before the value queue.       */
}
zbx_preprocessing_manager_t;
111 
/* forward declaration - the function is called by preprocessor_enqueue() */
static void	preprocessor_enqueue_dependent(zbx_preprocessing_manager_t *manager,
		zbx_preproc_item_value_t *source_value, zbx_list_item_t *master);
114 
115 /* cleanup functions */
116 
/******************************************************************************
 *                                                                            *
 * Function: preproc_item_clear                                               *
 *                                                                            *
 * Purpose: frees resources owned by item preprocessing configuration         *
 *                                                                            *
 * Parameters: item - [IN] item configuration; the structure itself is not    *
 *                         freed                                              *
 *                                                                            *
 ******************************************************************************/
static void	preproc_item_clear(zbx_preproc_item_t *item)
{
	int	left = item->preproc_ops_num;

	/* release the strings owned by every preprocessing step */
	while (0 < left--)
	{
		zbx_preproc_op_t	*op = &item->preproc_ops[left];

		zbx_free(op->error_handler_params);
		zbx_free(op->params);
	}

	zbx_free(item->preproc_ops);
	zbx_free(item->dep_itemids);
}
131 
/******************************************************************************
 *                                                                            *
 * Function: request_free_steps                                               *
 *                                                                            *
 * Purpose: frees preprocessing steps copied into the request                 *
 *                                                                            *
 * Parameters: request - [IN] preprocessing request                           *
 *                                                                            *
 * Comments: The step counter is decremented while the steps are released,    *
 *           so it is zero when a request that had steps is returned.         *
 *                                                                            *
 ******************************************************************************/
static void	request_free_steps(zbx_preprocessing_request_t *request)
{
	/* walk the step array backwards, shrinking the counter as steps are released */
	for (; 0 < request->steps_num; request->steps_num--)
	{
		zbx_preproc_op_t	*step = &request->steps[request->steps_num - 1];

		zbx_free(step->params);
		zbx_free(step->error_handler_params);
	}

	zbx_free(request->steps);
}
143 
144 /******************************************************************************
145  *                                                                            *
146  * Function: preprocessor_sync_configuration                                  *
147  *                                                                            *
148  * Purpose: synchronize preprocessing manager with configuration cache data   *
149  *                                                                            *
150  * Parameters: manager - [IN] the manager to be synchronized                  *
151  *                                                                            *
152  ******************************************************************************/
static void	preprocessor_sync_configuration(zbx_preprocessing_manager_t *manager)
{
	zbx_hashset_iter_t	iter;
	zbx_preproc_history_t	*history;
	zbx_preproc_item_t	*item;
	int			prev_ts = manager->cache_ts;	/* timestamp before this sync */

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	DCconfig_get_preprocessable_items(&manager->item_config, &manager->cache_ts);

	/* unchanged timestamp - the history cache is left untouched */
	if (prev_ts == manager->cache_ts)
		goto out;

	/* pass 1: forget history of items that are no longer present in item configuration */
	zbx_hashset_iter_reset(&manager->history_cache, &iter);
	while (NULL != (history = (zbx_preproc_history_t *)zbx_hashset_iter_next(&iter)))
	{
		if (NULL == zbx_hashset_search(&manager->item_config, &history->itemid))
		{
			zbx_vector_ptr_clear_ext(&history->history, (zbx_clean_func_t)zbx_preproc_op_history_free);
			zbx_vector_ptr_destroy(&history->history);
			zbx_hashset_iter_remove(&iter);
		}
	}

	/* pass 2: forget history of items updated after the previous sync */
	zbx_hashset_iter_reset(&manager->item_config, &iter);
	while (NULL != (item = (zbx_preproc_item_t *)zbx_hashset_iter_next(&iter)))
	{
		if (prev_ts < item->update_time && NULL != (history = (zbx_preproc_history_t *)zbx_hashset_search(
				&manager->history_cache, &item->itemid)))
		{
			zbx_vector_ptr_clear_ext(&history->history, (zbx_clean_func_t)zbx_preproc_op_history_free);
			zbx_vector_ptr_destroy(&history->history);
			zbx_hashset_remove_direct(&manager->history_cache, history);
		}
	}
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s() item config size: %d, history cache size: %d", __func__,
			manager->item_config.num_data, manager->history_cache.num_data);
}
201 
202 /******************************************************************************
203  *                                                                            *
204  * Function: preprocessor_create_task                                         *
205  *                                                                            *
206  * Purpose: create preprocessing task for request                             *
207  *                                                                            *
208  * Parameters: manager - [IN] preprocessing manager                           *
209  *             request - [IN] preprocessing request                           *
210  *             task    - [OUT] preprocessing task data                        *
211  *                                                                            *
212  ******************************************************************************/
preprocessor_create_task(zbx_preprocessing_manager_t * manager,zbx_preprocessing_request_t * request,unsigned char ** task)213 static zbx_uint32_t	preprocessor_create_task(zbx_preprocessing_manager_t *manager,
214 		zbx_preprocessing_request_t *request, unsigned char **task)
215 {
216 	zbx_variant_t		value;
217 	zbx_preproc_history_t	*vault;
218 	zbx_vector_ptr_t	*phistory;
219 
220 	if (ISSET_LOG(request->value.result_ptr->result))
221 		zbx_variant_set_str(&value, request->value.result_ptr->result->log->value);
222 	else if (ISSET_UI64(request->value.result_ptr->result))
223 		zbx_variant_set_ui64(&value, request->value.result_ptr->result->ui64);
224 	else if (ISSET_DBL(request->value.result_ptr->result))
225 		zbx_variant_set_dbl(&value, request->value.result_ptr->result->dbl);
226 	else if (ISSET_STR(request->value.result_ptr->result))
227 		zbx_variant_set_str(&value, request->value.result_ptr->result->str);
228 	else if (ISSET_TEXT(request->value.result_ptr->result))
229 		zbx_variant_set_str(&value, request->value.result_ptr->result->text);
230 	else
231 		THIS_SHOULD_NEVER_HAPPEN;
232 
233 	if (NULL != (vault = (zbx_preproc_history_t *)zbx_hashset_search(&manager->history_cache,
234 				&request->value.itemid)))
235 	{
236 		phistory = &vault->history;
237 	}
238 	else
239 		phistory = NULL;
240 
241 
242 	return zbx_preprocessor_pack_task(task, request->value.itemid, request->value_type, request->value.ts, &value,
243 			phistory, request->steps, request->steps_num);
244 }
245 
246 /******************************************************************************
247  *                                                                            *
248  * Function: preprocessor_set_request_state_done                              *
249  *                                                                            *
250  * Purpose: set request state to done and handle linked items                 *
251  *                                                                            *
252  * Parameters: manager    - [IN] preprocessing manager                        *
253  *             request    - [IN] preprocessing request                        *
254  *             queue_item - [IN] queued item                                  *
255  *                                                                            *
256  ******************************************************************************/
static	void	preprocessor_set_request_state_done(zbx_preprocessing_manager_t *manager,
		zbx_preprocessing_request_t *request, const zbx_list_item_t *queue_item)
{
	zbx_item_link_t	*link;

	request->state = REQUEST_STATE_DONE;

	/* release the request that was waiting for this one, so it can be picked as a task */
	if (NULL != request->pending)
		request->pending->state = REQUEST_STATE_QUEUED;

	link = (zbx_item_link_t *)zbx_hashset_search(&manager->linked_items, &request->value.itemid);

	/* keep the link if it already refers to another queued value of the item */
	if (NULL == link || queue_item != link->queue_item)
		return;

	zbx_hashset_remove_direct(&manager->linked_items, link);
}
274 
275 /******************************************************************************
276  *                                                                            *
277  * Function: preprocessor_get_next_task                                       *
278  *                                                                            *
279  * Purpose: gets next task to be sent to worker                               *
280  *                                                                            *
281  * Parameters: manager - [IN] preprocessing manager                           *
282  *             message - [OUT] the serialized task to be sent                 *
283  *                                                                            *
284  * Return value: pointer to the task object                                   *
285  *                                                                            *
286  ******************************************************************************/
static void	*preprocessor_get_next_task(zbx_preprocessing_manager_t *manager, zbx_ipc_message_t *message)
{
	zbx_list_iterator_t			iterator;
	zbx_preprocessing_request_t		*request = NULL;
	void					*task = NULL;
	zbx_preprocessing_direct_request_t	*direct_request;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* direct requests are served before anything in the value queue */
	if (SUCCEED == zbx_list_pop(&manager->direct_queue, (void **)&direct_request))
	{
		/* hand the message over to the caller and reset the copy kept in the direct request */
		/* (presumably so the payload is not released again with the direct request)         */
		*message = direct_request->message;
		zbx_ipc_message_init(&direct_request->message);
		task = direct_request;
	}
	else
	{
		zbx_list_iterator_init(&manager->queue, &iterator);

		while (SUCCEED == zbx_list_iterator_next(&iterator))
		{
			zbx_list_iterator_peek(&iterator, (void **)&request);

			if (REQUEST_STATE_QUEUED != request->state)
				continue;

			if (ITEM_STATE_NOTSUPPORTED != request->value.state)
			{
				/* first queued value that needs a worker - serialize it and stop searching */
				task = iterator.current;
				request->state = REQUEST_STATE_PROCESSING;
				message->code = ZBX_IPC_PREPROCESSOR_REQUEST;
				message->size = preprocessor_create_task(manager, request, &message->data);

				/* the steps are already serialized into the task */
				request_free_steps(request);
				break;
			}
			else
			{
				zbx_preproc_history_t	*vault;

				/* not supported values are not sent to workers: the value history */
				/* of the item is dropped and the request is finished right away   */
				vault = (zbx_preproc_history_t *) zbx_hashset_search(&manager->history_cache,
						&request->value.itemid);

				if (NULL != vault)
				{
					zbx_vector_ptr_clear_ext(&vault->history,
							(zbx_clean_func_t) zbx_preproc_op_history_free);
					zbx_vector_ptr_destroy(&vault->history);
					zbx_hashset_remove_direct(&manager->history_cache, vault);
				}

				preprocessor_set_request_state_done(manager, request, iterator.current);
			}
		}
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);

	return task;
}
341 
/******************************************************************************
 *                                                                            *
 * Function: preproc_item_result_free                                         *
 *                                                                            *
 * Purpose: drops the reference held by the value on its result and frees     *
 *          the result when it was the last reference                         *
 *                                                                            *
 * Parameters: value - [IN] item value                                        *
 *                                                                            *
 ******************************************************************************/
static void	preproc_item_result_free(zbx_preproc_item_value_t *value)
{
	value->result_ptr->refcount--;

	if (0 != value->result_ptr->refcount)
	{
		/* the result is still referenced elsewhere - only detach this value from it */
		value->result_ptr = NULL;
		return;
	}

	/* last reference - release the result and its reference counted wrapper */
	if (NULL != value->result_ptr->result)
	{
		free_result(value->result_ptr->result);
		zbx_free(value->result_ptr->result);
	}

	zbx_free(value->result_ptr);
}
356 
357 /******************************************************************************
358  *                                                                            *
359  * Function: preprocessor_get_worker_by_client                                *
360  *                                                                            *
361  * Purpose: get worker data by IPC client                                     *
362  *                                                                            *
363  * Parameters: manager - [IN] preprocessing manager                           *
364  *             client  - [IN] IPC client                                      *
365  *                                                                            *
366  * Return value: pointer to the worker data                                   *
367  *                                                                            *
368  ******************************************************************************/
preprocessor_get_worker_by_client(zbx_preprocessing_manager_t * manager,zbx_ipc_client_t * client)369 static zbx_preprocessing_worker_t	*preprocessor_get_worker_by_client(zbx_preprocessing_manager_t *manager,
370 		zbx_ipc_client_t *client)
371 {
372 	int				i;
373 	zbx_preprocessing_worker_t	*worker = NULL;
374 
375 	for (i = 0; i < manager->worker_count; i++)
376 	{
377 		if (client == manager->workers[i].client)
378 		{
379 			worker = &manager->workers[i];
380 			break;
381 		}
382 	}
383 
384 	if (NULL == worker)
385 	{
386 		THIS_SHOULD_NEVER_HAPPEN;
387 		exit(EXIT_FAILURE);
388 	}
389 
390 	return worker;
391 }
392 
393 /******************************************************************************
394  *                                                                            *
395  * Function: preprocessor_get_free_worker                                     *
396  *                                                                            *
397  * Purpose: get worker without active preprocessing task                      *
398  *                                                                            *
399  * Parameters: manager - [IN] preprocessing manager                           *
400  *                                                                            *
401  * Return value: pointer to the worker data or NULL if none                   *
402  *                                                                            *
403  ******************************************************************************/
preprocessor_get_free_worker(zbx_preprocessing_manager_t * manager)404 static zbx_preprocessing_worker_t	*preprocessor_get_free_worker(zbx_preprocessing_manager_t *manager)
405 {
406 	int	i;
407 
408 	for (i = 0; i < manager->worker_count; i++)
409 	{
410 		if (NULL == manager->workers[i].task)
411 			return &manager->workers[i];
412 	}
413 
414 	return NULL;
415 }
416 
417 /******************************************************************************
418  *                                                                            *
419  * Function: preprocessor_assign_tasks                                        *
420  *                                                                            *
421  * Purpose: assign available queued preprocessing tasks to free workers       *
422  *                                                                            *
423  * Parameters: manager - [IN] preprocessing manager                           *
424  *                                                                            *
425  ******************************************************************************/
static void	preprocessor_assign_tasks(zbx_preprocessing_manager_t *manager)
{
	zbx_preprocessing_worker_t	*worker;
	void				*data;
	zbx_ipc_message_t		message;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	for (;;)
	{
		/* a task is taken from the queues only when there is a worker to run it */
		if (NULL == (worker = preprocessor_get_free_worker(manager)))
			break;

		if (NULL == (data = preprocessor_get_next_task(manager, &message)))
			break;

		if (FAIL == zbx_ipc_client_send(worker->client, message.code, message.data, message.size))
		{
			zabbix_log(LOG_LEVEL_CRIT, "cannot send data to preprocessing worker");
			exit(EXIT_FAILURE);
		}

		/* the worker stays busy until its task pointer is reset */
		worker->task = data;
		zbx_ipc_message_clean(&message);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
449 
450 /******************************************************************************
451  *                                                                            *
452  * Function: preproc_item_value_clear                                         *
453  *                                                                            *
454  * Purpose: frees resources allocated by preprocessor item value              *
455  *                                                                            *
456  * Parameters: value - [IN] value to be freed                                 *
457  *                                                                            *
458  ******************************************************************************/
static void	preproc_item_value_clear(zbx_preproc_item_value_t *value)
{
	/* timestamp and error text are owned by the value */
	zbx_free(value->ts);
	zbx_free(value->error);

	/* the result is reference counted and released through its own helper */
	preproc_item_result_free(value);
}
465 
466 /******************************************************************************
467  *                                                                            *
468  * Function: preprocessor_free_request                                        *
469  *                                                                            *
470  * Purpose: free preprocessing request                                        *
471  *                                                                            *
472  * Parameters: request - [IN] request data to be freed                        *
473  *                                                                            *
474  ******************************************************************************/
static void	preprocessor_free_request(zbx_preprocessing_request_t *request)
{
	/* release everything owned by the request before the request itself */
	request_free_steps(request);
	preproc_item_value_clear(&request->value);

	zbx_free(request);
}
481 
482 /******************************************************************************
483  *                                                                            *
484  * Function: preprocessor_free_direct_request                                 *
485  *                                                                            *
486  * Purpose: free preprocessing direct request                                 *
487  *                                                                            *
 * Parameters: direct_request - [IN] direct request to be freed               *
489  *                                                                            *
490  ******************************************************************************/
static void	preprocessor_free_direct_request(zbx_preprocessing_direct_request_t *direct_request)
{
	/* drop the message payload and the client reference held by the request */
	zbx_ipc_message_clean(&direct_request->message);
	zbx_ipc_client_release(direct_request->client);

	zbx_free(direct_request);
}
497 
498 /******************************************************************************
499  *                                                                            *
500  * Function: preprocessor_flush_value                                         *
501  *                                                                            *
502  * Purpose: add new value to the local history cache or send to LLD manager   *
503  *                                                                            *
504  * Parameters: value - [IN] value to be added or sent                         *
505  *                                                                            *
506  ******************************************************************************/
preprocessor_flush_value(const zbx_preproc_item_value_t * value)507 static void	preprocessor_flush_value(const zbx_preproc_item_value_t *value)
508 {
509 	if (0 == (value->item_flags & ZBX_FLAG_DISCOVERY_RULE) || 0 == (program_type & ZBX_PROGRAM_TYPE_SERVER))
510 	{
511 		dc_add_history(value->itemid, value->item_value_type, value->item_flags, value->result_ptr->result,
512 				value->ts, value->state, value->error);
513 	}
514 	else
515 		zbx_lld_process_agent_result(value->itemid, value->hostid, value->result_ptr->result, value->ts,
516 				value->error);
517 }
518 
519 /******************************************************************************
520  *                                                                            *
521  * Function: preprocessing_flush_queue                                        *
522  *                                                                            *
523  * Purpose: add all sequential processed values from beginning of the queue   *
524  *          to the local history cache                                        *
525  *                                                                            *
526  * Parameters: manager - [IN] preprocessing manager                           *
527  *                                                                            *
528  ******************************************************************************/
static void	preprocessing_flush_queue(zbx_preprocessing_manager_t *manager)
{
	zbx_list_iterator_t		iter;
	zbx_preprocessing_request_t	*head;

	for (zbx_list_iterator_init(&manager->queue, &iter); SUCCEED == zbx_list_iterator_next(&iter); )
	{
		zbx_list_iterator_peek(&iter, (void **)&head);

		/* stop at the first value that is not processed yet */
		if (REQUEST_STATE_DONE != head->state)
			break;

		preprocessor_flush_value(&head->value);
		preprocessor_free_request(head);

		/* the priority tail must not be left pointing at the entry that is about to be removed */
		if (SUCCEED == zbx_list_iterator_equal(&iter, &manager->priority_tail))
			zbx_list_iterator_clear(&manager->priority_tail);

		/* the flushed request is removed from the queue */
		zbx_list_pop(&manager->queue, NULL);

		manager->queued_num--;
		manager->processed_num++;
	}
}
554 
555 /******************************************************************************
556  *                                                                            *
557  * Function: preprocessor_link_items                                          *
558  *                                                                            *
559  * Purpose: create relation between item values within value queue            *
560  *                                                                            *
561  * Parameters: manager     - [IN] preprocessing manager                       *
562  *             enqueued_at - [IN] position in value queue                     *
563  *             item        - [IN] item configuration data                     *
564  *                                                                            *
565  ******************************************************************************/
static void	preprocessor_link_items(zbx_preprocessing_manager_t *manager, zbx_list_item_t *enqueued_at,
		zbx_preproc_item_t *item)
{
	int				i, needs_link = 0;
	zbx_preprocessing_request_t	*prev_request, *new_request;
	zbx_item_link_t			*link, new_link;
	const zbx_preproc_op_t		*op;

	/* only items having a delta or throttling step are linked (presumably because */
	/* such steps depend on the previously processed value of the same item)       */
	for (i = 0; 0 == needs_link && i < item->preproc_ops_num; i++)
	{
		op = &item->preproc_ops[i];

		if (ZBX_PREPROC_DELTA_VALUE == op->type || ZBX_PREPROC_DELTA_SPEED == op->type ||
				ZBX_PREPROC_THROTTLE_VALUE == op->type ||
				ZBX_PREPROC_THROTTLE_TIMED_VALUE == op->type)
		{
			needs_link = 1;
		}
	}

	if (0 == needs_link)
		return;

	link = (zbx_item_link_t *)zbx_hashset_search(&manager->linked_items, &item->itemid);

	if (NULL == link)
	{
		/* no value of this item is linked yet - register the new one */
		new_link.itemid = item->itemid;
		new_link.queue_item = enqueued_at;

		zbx_hashset_insert(&manager->linked_items, &new_link, sizeof(new_link));
		return;
	}

	/* existing linked item */
	prev_request = (zbx_preprocessing_request_t *)(link->queue_item->data);

	if (REQUEST_STATE_DONE != prev_request->state)
	{
		/* the new value has to wait until the previously linked value is done */
		new_request = (zbx_preprocessing_request_t *)(enqueued_at->data);
		prev_request->pending = new_request;
		new_request->state = REQUEST_STATE_PENDING;
	}

	/* the link always refers to the most recently queued value of the item */
	link->queue_item = enqueued_at;
}
610 
611 /******************************************************************************
612  *                                                                            *
613  * Function: preprocessor_copy_value                                          *
614  *                                                                            *
615  * Purpose: create a copy of existing item value                              *
616  *                                                                            *
617  * Parameters: target  - [OUT] created copy                                   *
618  *             source  - [IN]  value to be copied                             *
619  *                                                                            *
620  ******************************************************************************/
static void	preprocessor_copy_value(zbx_preproc_item_value_t *target, zbx_preproc_item_value_t *source)
{
	/* start with a member-wise copy; pointer members still refer to the source data */
	*target = *source;

	/* the error text and the timestamp get private copies, the other members are left as copied */
	if (NULL != source->error)
		target->error = zbx_strdup(NULL, source->error);

	if (NULL != source->ts)
	{
		target->ts = (zbx_timespec_t *)zbx_malloc(NULL, sizeof(zbx_timespec_t));
		*target->ts = *source->ts;
	}
}
634 
635 /******************************************************************************
636  *                                                                            *
637  * Function: preprocessor_enqueue                                             *
638  *                                                                            *
639  * Purpose: enqueue preprocessing request                                     *
640  *                                                                            *
 * Parameters: manager  - [IN] preprocessing manager                          *
642  *             value    - [IN] item value                                     *
643  *             master   - [IN] request should be enqueued after this item     *
644  *                             (NULL for the end of the queue)                *
645  *                                                                            *
646  ******************************************************************************/
static void	preprocessor_enqueue(zbx_preprocessing_manager_t *manager, zbx_preproc_item_value_t *value,
		zbx_list_item_t *master)
{
	zbx_preprocessing_request_t	*request;
	zbx_preproc_item_t		*config, lookup;
	zbx_list_item_t			*position;
	zbx_preprocessing_states_t	state = REQUEST_STATE_QUEUED;
	unsigned char			priority;
	int				step;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() itemid: " ZBX_FS_UI64, __func__, value->itemid);

	lookup.itemid = value->itemid;
	config = (zbx_preproc_item_t *)zbx_hashset_search(&manager->item_config, &lookup);

	/* values of internal items are placed ahead of the other values */
	priority = (NULL != config && ITEM_TYPE_INTERNAL == config->type) ? ZBX_PREPROC_PRIORITY_FIRST :
			ZBX_PREPROC_PRIORITY_NONE;

	/* no preprocessing is needed for items without configuration or steps */
	/* and for supported values that carry no actual value                  */
	if (NULL == config || 0 == config->preproc_ops_num)
	{
		state = REQUEST_STATE_DONE;
	}
	else if (ITEM_STATE_NOTSUPPORTED != value->state &&
			(NULL == value->result_ptr->result || 0 == ISSET_VALUE(value->result_ptr->result)))
	{
		state = REQUEST_STATE_DONE;
	}

	if (REQUEST_STATE_DONE == state && NULL == manager->queue.head)
	{
		/* a finished value with nothing queued ahead of it is flushed without being queued */
		preprocessor_flush_value(value);
		manager->processed_num++;
		preprocessor_enqueue_dependent(manager, value, NULL);
		preproc_item_value_clear(value);

		goto out;
	}

	/* the request takes over the contents of value (shallow copy) */
	request = (zbx_preprocessing_request_t *)zbx_malloc(NULL, sizeof(zbx_preprocessing_request_t));
	memset(request, 0, sizeof(zbx_preprocessing_request_t));
	memcpy(&request->value, value, sizeof(zbx_preproc_item_value_t));
	request->state = state;

	/* steps are copied (and the request counted) only for supported values that need preprocessing */
	if (REQUEST_STATE_QUEUED == state && ITEM_STATE_NOTSUPPORTED != value->state)
	{
		request->steps_num = config->preproc_ops_num;
		request->steps = (zbx_preproc_op_t *)zbx_malloc(NULL,
				sizeof(zbx_preproc_op_t) * config->preproc_ops_num);
		request->value_type = config->value_type;

		for (step = 0; step < config->preproc_ops_num; step++)
		{
			zbx_preproc_op_t	*dst = &request->steps[step];

			dst->type = config->preproc_ops[step].type;
			dst->params = zbx_strdup(NULL, config->preproc_ops[step].params);
			dst->error_handler = config->preproc_ops[step].error_handler;
			dst->error_handler_params = zbx_strdup(NULL,
					config->preproc_ops[step].error_handler_params);
		}

		manager->preproc_num++;
	}

	if (NULL != master || ZBX_PREPROC_PRIORITY_FIRST != priority)
	{
		/* regular placement: after the master item or, without a master, at the end of the queue */
		zbx_list_insert_after(&manager->queue, master, request, &position);
		zbx_list_iterator_update(&manager->priority_tail);

		/* keep the priority tail on the last priority entry when inserting right behind it */
		if (NULL != master && master == manager->priority_tail.current)
			zbx_list_iterator_next(&manager->priority_tail);
	}
	else
	{
		/* priority placement: behind the last priority request or at the very start of the queue */
		if (SUCCEED == zbx_list_iterator_isset(&manager->priority_tail))
		{
			zbx_list_insert_after(&manager->queue, manager->priority_tail.current, request, &position);
			zbx_list_iterator_update(&manager->priority_tail);
		}
		else
		{
			zbx_list_prepend(&manager->queue, request, &position);
			zbx_list_iterator_init(&manager->queue, &manager->priority_tail);
		}

		zbx_list_iterator_next(&manager->priority_tail);
	}

	if (REQUEST_STATE_QUEUED == request->state)
		preprocessor_link_items(manager, position, config);

	/* The state is deliberately read again after linking and the caller's copy of the value is used: */
	/* enqueueing dependent items flushes the queue, after which the request must not be accessed.    */
	if (REQUEST_STATE_DONE == request->state)
		preprocessor_enqueue_dependent(manager, value, position);

	manager->queued_num++;
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
747 
748 /******************************************************************************
749  *                                                                            *
750  * Function: preprocessor_enqueue_dependent                                   *
751  *                                                                            *
752  * Purpose: enqueue dependent items (if any)                                  *
753  *                                                                            *
754  * Parameters: manager      - [IN] preprocessing manager                      *
755  *             source_value - [IN] master item value                          *
756  *             master       - [IN] dependent item should be enqueued after    *
757  *                                 this item                                  *
758  *                                                                            *
759  ******************************************************************************/
static void	preprocessor_enqueue_dependent(zbx_preprocessing_manager_t *manager,
		zbx_preproc_item_value_t *source_value, zbx_list_item_t *master)
{
	zbx_preproc_item_t		*master_conf, lookup;
	zbx_preproc_item_value_t	dep_value;
	int				idx;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() itemid: " ZBX_FS_UI64, __func__, source_value->itemid);

	/* dependent items are fed only when the master item produced a value */
	if (NULL == source_value->result_ptr->result || 0 == ISSET_VALUE(source_value->result_ptr->result))
		goto out;

	lookup.itemid = source_value->itemid;
	master_conf = (zbx_preproc_item_t *)zbx_hashset_search(&manager->item_config, &lookup);

	if (NULL == master_conf || 0 == master_conf->dep_itemids_num)
		goto out;

	/* every dependent item gets a reference to the same result, */
	/* a new result is created for it only after preprocessing   */
	source_value->result_ptr->refcount += master_conf->dep_itemids_num;

	/* dependent items are walked from last to first, each one is enqueued directly after the master */
	idx = master_conf->dep_itemids_num;

	while (0 < idx--)
	{
		preprocessor_copy_value(&dep_value, source_value);
		dep_value.itemid = master_conf->dep_itemids[idx].first;
		dep_value.item_flags = master_conf->dep_itemids[idx].second;
		preprocessor_enqueue(manager, &dep_value, master);
	}

	preprocessor_assign_tasks(manager);
	preprocessing_flush_queue(manager);
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
793 
794 /******************************************************************************
795  *                                                                            *
796  * Function: preprocessor_add_request                                         *
797  *                                                                            *
798  * Purpose: handle new preprocessing request                                  *
799  *                                                                            *
800  * Parameters: manager - [IN] preprocessing manager                           *
801  *             message - [IN] packed preprocessing request                    *
802  *                                                                            *
803  ******************************************************************************/
static void	preprocessor_add_request(zbx_preprocessing_manager_t *manager, zbx_ipc_message_t *message)
{
	zbx_preproc_item_value_t	item_value;
	zbx_uint32_t			pos;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* configuration is refreshed before the received values are matched against it */
	preprocessor_sync_configuration(manager);

	/* the message holds values packed back to back, unpacking reports how far to advance */
	for (pos = 0; pos < message->size; )
	{
		pos += zbx_preprocessor_unpack_value(&item_value, message->data + pos);
		preprocessor_enqueue(manager, &item_value, NULL);
	}

	preprocessor_assign_tasks(manager);
	preprocessing_flush_queue(manager);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
824 
825 /******************************************************************************
826  *                                                                            *
827  * Function: preprocessor_add_test_request                                    *
828  *                                                                            *
829  * Purpose: handle new preprocessing test request                             *
830  *                                                                            *
831  * Parameters: manager - [IN] preprocessing manager                           *
832  *             message - [IN] packed preprocessing request                    *
833  *                                                                            *
834  ******************************************************************************/
static void	preprocessor_add_test_request(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	zbx_preprocessing_direct_request_t	*test_request;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	test_request = zbx_malloc(NULL, sizeof(zbx_preprocessing_direct_request_t));

	/* the queued request holds its own client reference and its own copy of the message */
	zbx_ipc_client_addref(client);
	test_request->client = client;
	zbx_ipc_message_copy(&test_request->message, message);

	zbx_list_append(&manager->direct_queue, test_request, NULL);

	preprocessor_assign_tasks(manager);
	preprocessing_flush_queue(manager);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
853 
854 /******************************************************************************
855  *                                                                            *
856  * Function: create_result_with_meta                                          *
857  *                                                                            *
858  * Purpose: create new result and copy meta information from previous result  *
859  *                                                                            *
860  * Parameters: result_old - [IN] result that can contain meta information     *
861  *                                                                            *
 * Return value: pointer to newly allocated result                            *
863  *                                                                            *
864  ******************************************************************************/
create_result_with_meta(const AGENT_RESULT * result_old)865 static AGENT_RESULT	*create_result_with_meta(const AGENT_RESULT *result_old)
866 {
867 	AGENT_RESULT	*result;
868 
869 	result = zbx_malloc(NULL, sizeof(AGENT_RESULT));
870 
871 	init_result(result);
872 
873 	if (0 == ISSET_META(result_old))
874 		return result;
875 
876 	result->type = AR_META;
877 	result->lastlogsize = result_old->lastlogsize;
878 	result->mtime = result_old->mtime;
879 
880 	return result;
881 }
882 
883 /******************************************************************************
884  *                                                                            *
885  * Function: preprocessor_set_variant_result                                  *
886  *                                                                            *
887  * Purpose: get result data from variant and error message                    *
888  *                                                                            *
889  * Parameters: request - [IN/OUT] preprocessing request                       *
890  *             value   - [IN] variant value                                   *
891  *             error   - [IN] error message (if any)                          *
892  *                                                                            *
893  ******************************************************************************/
static int	preprocessor_set_variant_result(zbx_preprocessing_request_t *request,
		zbx_variant_t *value, char *error)
{
	AGENT_RESULT	*new_result;
	zbx_log_t	*log_copy;
	int		target_type, ret;

	if (NULL != error)
	{
		/* a preprocessing error turns the value into a not supported one, */
		/* the request takes over the error message                        */
		request->value.state = ITEM_STATE_NOTSUPPORTED;
		request->value.error = error;

		return FAIL;
	}

	if (ZBX_VARIANT_NONE == value->type)
	{
		/* no value was produced - keep only meta information and report failure */
		new_result = create_result_with_meta(request->value.result_ptr->result);
		ret = FAIL;
	}
	else
	{
		/* choose the variant type matching the item value type, */
		/* string, text and log values are all stored as strings */
		if (ITEM_VALUE_TYPE_FLOAT == request->value_type)
			target_type = ZBX_VARIANT_DBL;
		else if (ITEM_VALUE_TYPE_UINT64 == request->value_type)
			target_type = ZBX_VARIANT_UI64;
		else
			target_type = ZBX_VARIANT_STR;

		if (FAIL == (ret = zbx_variant_convert(value, target_type)))
		{
			zbx_free(request->value.error);
			request->value.error = zbx_dsprintf(NULL, "Value \"%s\" of type \"%s\" is not suitable for"
				" value type \"%s\"", zbx_variant_value_desc(value), zbx_variant_type_desc(value),
				zbx_item_value_type_string((zbx_item_value_type_t)request->value_type));
			request->value.state = ITEM_STATE_NOTSUPPORTED;

			return FAIL;
		}

		/* the old result is shared between master and dependent items and */
		/* must stay untouched, so the converted value goes into a new one */
		new_result = create_result_with_meta(request->value.result_ptr->result);

		switch (request->value_type)
		{
			case ITEM_VALUE_TYPE_UINT64:
				SET_UI64_RESULT(new_result, value->data.ui64);
				break;
			case ITEM_VALUE_TYPE_FLOAT:
				SET_DBL_RESULT(new_result, value->data.dbl);
				break;
			case ITEM_VALUE_TYPE_STR:
				SET_STR_RESULT(new_result, value->data.str);
				break;
			case ITEM_VALUE_TYPE_TEXT:
				SET_TEXT_RESULT(new_result, value->data.str);
				break;
			case ITEM_VALUE_TYPE_LOG:
				log_copy = (zbx_log_t *)zbx_malloc(NULL, sizeof(zbx_log_t));

				if (0 == ISSET_LOG(request->value.result_ptr->result))
				{
					memset(log_copy, 0, sizeof(zbx_log_t));
				}
				else
				{
					/* log attributes come from the old result, the source gets its own copy */
					*log_copy = *request->value.result_ptr->result->log;

					if (NULL != log_copy->source)
						log_copy->source = zbx_strdup(NULL, log_copy->source);
				}

				log_copy->value = value->data.str;
				SET_LOG_RESULT(new_result, log_copy);
				break;
			default:
				break;
		}
	}

	/* release this request's reference to the old result and attach the new, unshared one */
	preproc_item_result_free(&request->value);
	request->value.result_ptr = (zbx_result_ptr_t *)zbx_malloc(NULL, sizeof(zbx_result_ptr_t));
	request->value.result_ptr->refcount = 1;
	request->value.result_ptr->result = new_result;

	/* after a successful conversion the variant data was handed over to the new result, */
	/* reset the variant so it is not released together with the variant                 */
	if (FAIL != ret)
		zbx_variant_set_none(value);

	return ret;
}
998 
999 /******************************************************************************
1000  *                                                                            *
1001  * Function: preprocessor_add_result                                          *
1002  *                                                                            *
1003  * Purpose: handle preprocessing result                                       *
1004  *                                                                            *
1005  * Parameters: manager - [IN] preprocessing manager                           *
1006  *             client  - [IN] IPC client                                      *
1007  *             message - [IN] packed preprocessing result                     *
1008  *                                                                            *
1009  ******************************************************************************/
static void	preprocessor_add_result(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	zbx_preprocessing_worker_t	*worker;
	zbx_preprocessing_request_t	*request;
	zbx_preproc_history_t		*cache_entry;
	zbx_vector_ptr_t		step_history;
	zbx_variant_t			result;
	char				*errmsg;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* the worker task is the queue entry of the request that was being preprocessed */
	worker = preprocessor_get_worker_by_client(manager, client);
	request = (zbx_preprocessing_request_t *)((zbx_list_item_t *)worker->task)->data;

	zbx_vector_ptr_create(&step_history);
	zbx_preprocessor_unpack_result(&result, &step_history, &errmsg, message->data);

	/* history cached for the item is always replaced by the history received with the result */
	cache_entry = (zbx_preproc_history_t *)zbx_hashset_search(&manager->history_cache,
			&request->value.itemid);

	if (NULL != cache_entry)
		zbx_vector_ptr_clear_ext(&cache_entry->history, (zbx_clean_func_t)zbx_preproc_op_history_free);

	if (0 == step_history.values_num)
	{
		/* no history to keep, drop the cache entry if the item had one */
		if (NULL != cache_entry)
		{
			zbx_vector_ptr_destroy(&cache_entry->history);
			zbx_hashset_remove_direct(&manager->history_cache, cache_entry);
		}
	}
	else
	{
		if (NULL == cache_entry)
		{
			zbx_preproc_history_t	new_entry;

			new_entry.itemid = request->value.itemid;
			cache_entry = (zbx_preproc_history_t *)zbx_hashset_insert(&manager->history_cache,
					&new_entry, sizeof(new_entry));
			zbx_vector_ptr_create(&cache_entry->history);
		}

		/* the history records are moved into the cache, the local vector only forgets them */
		zbx_vector_ptr_append_array(&cache_entry->history, step_history.values, step_history.values_num);
		zbx_vector_ptr_clear(&step_history);
	}

	preprocessor_set_request_state_done(manager, request, worker->task);

	/* dependent items are enqueued only when preprocessing produced a value */
	if (FAIL != preprocessor_set_variant_result(request, &result, errmsg))
		preprocessor_enqueue_dependent(manager, &request->value, worker->task);

	worker->task = NULL;
	zbx_variant_clear(&result);

	manager->preproc_num--;

	preprocessor_assign_tasks(manager);
	preprocessing_flush_queue(manager);

	zbx_vector_ptr_destroy(&step_history);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
1077 
1078 /******************************************************************************
1079  *                                                                            *
1080  * Function: preprocessor_flush_test_result                                   *
1081  *                                                                            *
1082  * Purpose: handle preprocessing result                                       *
1083  *                                                                            *
1084  * Parameters: manager - [IN] preprocessing manager                           *
1085  *             client  - [IN] IPC client                                      *
1086  *             message - [IN] packed preprocessing result                     *
1087  *                                                                            *
1088  * Comments: Preprocessing testing results are directly forwarded to source   *
1089  *           client as they are.                                              *
1090  *                                                                            *
1091  ******************************************************************************/
static void	preprocessor_flush_test_result(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	zbx_preprocessing_worker_t		*worker;
	zbx_preprocessing_direct_request_t	*test_request;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	worker = preprocessor_get_worker_by_client(manager, client);
	test_request = (zbx_preprocessing_direct_request_t *)worker->task;

	/* the worker response is relayed unchanged, but only while the requester is still connected */
	if (SUCCEED == zbx_ipc_client_connected(test_request->client))
		zbx_ipc_client_send(test_request->client, message->code, message->data, message->size);

	/* the worker becomes idle and the finished request is released */
	worker->task = NULL;
	preprocessor_free_direct_request(test_request);

	preprocessor_assign_tasks(manager);
	preprocessing_flush_queue(manager);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
1115 
/******************************************************************************
 *                                                                            *
 * Function: preprocessor_get_items_totals                                    *
 *                                                                            *
 * Purpose: count requests in queue - total and per state                     *
 *                                                                            *
 * Parameters: manager    - [IN] preprocessing manager                        *
 *             total      - [OUT] number of requests in queue                 *
 *             queued     - [OUT] number of requests in queued state          *
 *             processing - [OUT] number of requests in processing state      *
 *             done       - [OUT] number of requests in done state            *
 *             pending    - [OUT] number of requests in pending state         *
 *                                                                            *
 * Comments: the first requests found in each state are written to debug log  *
 *                                                                            *
 ******************************************************************************/
static	void	preprocessor_get_items_totals(zbx_preprocessing_manager_t *manager, int *total, int *queued,
		int *processing, int *done, int *pending)
{
#define ZBX_MAX_REQUEST_STATE_PRINT_LIMIT	25

/* logs the current request while the state counter is below the print limit, then increments the counter */
#define ZBX_PREPROC_TALLY_STATE(counter, prefix)								\
	do													\
	{													\
		if (*(counter) < ZBX_MAX_REQUEST_STATE_PRINT_LIMIT)						\
		{												\
			zabbix_log(LOG_LEVEL_DEBUG, prefix ZBX_FS_UI64						\
					" values:%d pos:%d", stats->itemid, stats->values_num, *total);	\
		}												\
		(*(counter))++;											\
	}													\
	while (0)

	zbx_hashset_t			seen_items;
	zbx_list_iterator_t		queue_it;
	zbx_preprocessing_request_t	*request;
	zbx_preproc_item_stats_t	*stats;

	*total = *queued = *processing = *done = *pending = 0;

	/* per item value counters, used only for the debug log output */
	zbx_hashset_create(&seen_items, 1024, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);

	zbx_list_iterator_init(&manager->queue, &queue_it);

	while (SUCCEED == zbx_list_iterator_next(&queue_it))
	{
		zbx_list_iterator_peek(&queue_it, (void **)&request);

		stats = zbx_hashset_search(&seen_items, &request->value.itemid);

		if (NULL == stats)
		{
			zbx_preproc_item_stats_t	new_stats = {.itemid = request->value.itemid};

			stats = zbx_hashset_insert(&seen_items, &new_stats, sizeof(new_stats));
		}

		/* logged values: number of earlier values of the item and position of the request in queue */
		switch (request->state)
		{
			case REQUEST_STATE_QUEUED:
				ZBX_PREPROC_TALLY_STATE(queued, "oldest queued itemid: ");
				break;
			case REQUEST_STATE_PROCESSING:
				ZBX_PREPROC_TALLY_STATE(processing, "oldest processing itemid: ");
				break;
			case REQUEST_STATE_DONE:
				ZBX_PREPROC_TALLY_STATE(done, "oldest done itemid: ");
				break;
			case REQUEST_STATE_PENDING:
				ZBX_PREPROC_TALLY_STATE(pending, "oldest pending itemid: ");
				break;
			default:
				break;
		}

		stats->values_num++;
		(*total)++;
	}

	zbx_hashset_destroy(&seen_items);

#undef ZBX_PREPROC_TALLY_STATE
#undef ZBX_MAX_REQUEST_STATE_PRINT_LIMIT
}
1189 
1190 /******************************************************************************
1191  *                                                                            *
1192  * Function: preprocessor_get_diag_stats                                      *
1193  *                                                                            *
1194  * Purpose: return diagnostic statistics                                      *
1195  *                                                                            *
1196  * Parameters: manager - [IN] preprocessing manager                           *
1197  *             client  - [IN] IPC client                                      *
1198  *                                                                            *
1199  ******************************************************************************/
static void	preprocessor_get_diag_stats(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client)
{
	int		total, queued, processing, done, pending;
	unsigned char	*packed;
	zbx_uint32_t	packed_len;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* gather queue totals, then pack them and reply to the requesting client */
	preprocessor_get_items_totals(manager, &total, &queued, &processing, &done, &pending);
	packed_len = zbx_preprocessor_pack_diag_stats(&packed, total, queued, processing, done, pending);

	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_DIAG_STATS_RESULT, packed, packed_len);
	zbx_free(packed);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
1216 
1217 /******************************************************************************
1218  *                                                                            *
1219  * Function: preproc_sort_item_by_values_desc                                 *
1220  *                                                                            *
1221  * Purpose: compare item statistics by value                                  *
1222  *                                                                            *
1223  ******************************************************************************/
preproc_sort_item_by_values_desc(const void * d1,const void * d2)1224 static int	preproc_sort_item_by_values_desc(const void *d1, const void *d2)
1225 {
1226 	zbx_preproc_item_stats_t	*i1 = *(zbx_preproc_item_stats_t **)d1;
1227 	zbx_preproc_item_stats_t	*i2 = *(zbx_preproc_item_stats_t **)d2;
1228 
1229 	return i2->values_num - i1->values_num;
1230 }
1231 
/******************************************************************************
 *                                                                            *
 * Function: preprocessor_get_items_view                                      *
 *                                                                            *
 * Purpose: collect per item statistics of requests in queue                  *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             items   - [IN/OUT] item statistics indexed by itemid           *
 *             view    - [OUT] item statistics in order of first appearance   *
 *                                                                            *
 ******************************************************************************/
static	void	preprocessor_get_items_view(zbx_preprocessing_manager_t *manager, zbx_hashset_t *items,
		zbx_vector_ptr_t *view)
{
	zbx_list_iterator_t		queue_it;
	zbx_preprocessing_request_t	*request;
	zbx_preproc_item_stats_t	*stats;

	zbx_list_iterator_init(&manager->queue, &queue_it);

	while (SUCCEED == zbx_list_iterator_next(&queue_it))
	{
		zbx_list_iterator_peek(&queue_it, (void **)&request);

		stats = zbx_hashset_search(items, &request->value.itemid);

		if (NULL == stats)
		{
			/* first request of this item - start its statistics and expose them in the view */
			zbx_preproc_item_stats_t	new_stats = {.itemid = request->value.itemid};

			stats = zbx_hashset_insert(items, &new_stats, sizeof(new_stats));
			zbx_vector_ptr_append(view, stats);
		}

		/* Requests at the start of queue might be processed, but not yet flushed, with their */
		/* steps already freed and steps_num reset to zero. Overwriting steps_num on every    */
		/* request leaves the item with the step count of its last queued request.            */
		stats->steps_num = request->steps_num;
		stats->values_num++;
	}
}
1258 
1259 
1260 /******************************************************************************
1261  *                                                                            *
1262  * Function: preprocessor_get_top_items                                       *
1263  *                                                                            *
1264  * Purpose: return diagnostic top view                                        *
1265  *                                                                            *
1266  * Parameters: manager - [IN] preprocessing manager                           *
1267  *             client  - [IN] IPC client                                      *
1268  *             message - [IN] the message with request                        *
1269  *                                                                            *
1270  ******************************************************************************/
static void	preprocessor_get_top_items(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	zbx_hashset_t		item_stats;
	zbx_vector_ptr_t	ranking;
	unsigned char		*packed;
	zbx_uint32_t		packed_len;
	int			limit, packed_num;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_preprocessor_unpack_top_request(&limit, message->data);

	zbx_vector_ptr_create(&ranking);
	zbx_hashset_create(&item_stats, 1024, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);

	/* the vector holds pointers into the hashset, which owns the statistics */
	preprocessor_get_items_view(manager, &item_stats, &ranking);

	/* items with the most queued values first, at most the requested number of them is returned */
	zbx_vector_ptr_sort(&ranking, preproc_sort_item_by_values_desc);
	packed_num = MIN(limit, ranking.values_num);

	packed_len = zbx_preprocessor_pack_top_items_result(&packed, (zbx_preproc_item_stats_t **)ranking.values,
			packed_num);
	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_TOP_ITEMS_RESULT, packed, packed_len);
	zbx_free(packed);

	zbx_vector_ptr_destroy(&ranking);
	zbx_hashset_destroy(&item_stats);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
1301 
/******************************************************************************
 *                                                                            *
 * Function: preprocessor_get_oldest_preproc_items                            *
 *                                                                            *
 * Purpose: return diagnostic view of oldest items with preprocessing         *
 *                                                                            *
 * Parameters: manager - [IN] preprocessing manager                           *
 *             client  - [IN] IPC client                                      *
 *             message - [IN] the message with request                        *
 *                                                                            *
 ******************************************************************************/
static void	preprocessor_get_oldest_preproc_items(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
		zbx_ipc_message_t *message)
{
	int			limit, i;
	unsigned char		*data;
	zbx_uint32_t		data_len;
	zbx_hashset_t		items;
	zbx_vector_ptr_t	view, view_preproc;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_preprocessor_unpack_top_request(&limit, message->data);

	zbx_hashset_create(&items, 1024, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_vector_ptr_create(&view);
	zbx_vector_ptr_create(&view_preproc);

	/* the view lists items in the order their first request appears in the queue */
	preprocessor_get_items_view(manager, &items, &view);

	/* reserve no more than can actually be returned; a non-positive limit */
	/* must never reach the reserve call, which expects a size             */
	if (0 < limit)
		zbx_vector_ptr_reserve(&view_preproc, (size_t)MIN(limit, view.values_num));

	/* the requested limit is left untouched, the number of collected items is the loop bound */
	for (i = 0; i < view.values_num && view_preproc.values_num < limit; i++)
	{
		zbx_preproc_item_stats_t	*item;

		item = (zbx_preproc_item_stats_t *)view.values[i];

		/* only items with preprocessing can slow down queue */
		if (0 == item->steps_num)
			continue;

		zbx_vector_ptr_append(&view_preproc, item);
	}

	/* The collected items are already capped by the limit, so all of them are packed. */
	/* Packing MIN(remaining limit, collected) would cut the reply short, down to zero */
	/* items when exactly the requested number of items was found.                     */
	data_len = zbx_preprocessor_pack_top_items_result(&data, (zbx_preproc_item_stats_t **)view_preproc.values,
			view_preproc.values_num);
	zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_TOP_ITEMS_RESULT, data, data_len);
	zbx_free(data);

	zbx_vector_ptr_destroy(&view_preproc);
	zbx_vector_ptr_destroy(&view);
	zbx_hashset_destroy(&items);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
1347 
1348 /******************************************************************************
1349  *                                                                            *
1350  * Function: preprocessor_init_manager                                        *
1351  *                                                                            *
1352  * Purpose: initializes preprocessing manager                                 *
1353  *                                                                            *
1354  * Parameters: manager - [IN] the manager to initialize                       *
1355  *                                                                            *
1356  ******************************************************************************/
static void	preprocessor_init_manager(zbx_preprocessing_manager_t *manager)
{
	zabbix_log(LOG_LEVEL_DEBUG, "In %s() workers: %d", __func__, CONFIG_PREPROCESSOR_FORKS);

	/* start from an all-zero state, counters and pointers included */
	memset(manager, 0, sizeof(*manager));

	/* one zero-initialized worker slot per configured preprocessing worker */
	manager->workers = zbx_calloc(NULL, CONFIG_PREPROCESSOR_FORKS, sizeof(*manager->workers));

	/* request queue and queue of direct (test) requests */
	zbx_list_create(&manager->queue);
	zbx_list_create(&manager->direct_queue);

	/* item configuration entries are released with preproc_item_clear() */
	zbx_hashset_create_ext(&manager->item_config, 0, ZBX_DEFAULT_UINT64_HASH_FUNC,
			ZBX_DEFAULT_UINT64_COMPARE_FUNC, (zbx_clean_func_t)preproc_item_clear,
			ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC);

	zbx_hashset_create(&manager->linked_items, 0, ZBX_DEFAULT_UINT64_HASH_FUNC,
			ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_hashset_create(&manager->history_cache, 1000, ZBX_DEFAULT_UINT64_HASH_FUNC,
			ZBX_DEFAULT_UINT64_COMPARE_FUNC);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
1376 
1377 /******************************************************************************
1378  *                                                                            *
1379  * Function: preprocessor_register_worker                                     *
1380  *                                                                            *
1381  * Purpose: registers preprocessing worker                                    *
1382  *                                                                            *
1383  * Parameters: manager - [IN] the manager                                     *
1384  *             client  - [IN] the connected preprocessing worker              *
1385  *             message - [IN] message received by preprocessing manager       *
1386  *                                                                            *
1387  ******************************************************************************/
preprocessor_register_worker(zbx_preprocessing_manager_t * manager,zbx_ipc_client_t * client,zbx_ipc_message_t * message)1388 static void preprocessor_register_worker(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
1389 		zbx_ipc_message_t *message)
1390 {
1391 	zbx_preprocessing_worker_t	*worker = NULL;
1392 	pid_t				ppid;
1393 
1394 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1395 
1396 	memcpy(&ppid, message->data, sizeof(ppid));
1397 
1398 	if (ppid != getppid())
1399 	{
1400 		zbx_ipc_client_close(client);
1401 		zabbix_log(LOG_LEVEL_DEBUG, "refusing connection from foreign process");
1402 	}
1403 	else
1404 	{
1405 		if (CONFIG_PREPROCESSOR_FORKS == manager->worker_count)
1406 		{
1407 			THIS_SHOULD_NEVER_HAPPEN;
1408 			exit(EXIT_FAILURE);
1409 		}
1410 
1411 		worker = (zbx_preprocessing_worker_t *)&manager->workers[manager->worker_count++];
1412 		worker->client = client;
1413 
1414 		preprocessor_assign_tasks(manager);
1415 	}
1416 
1417 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1418 }
1419 
1420 /******************************************************************************
1421  *                                                                            *
1422  * Function: preprocessor_destroy_manager                                     *
1423  *                                                                            *
1424  * Purpose: destroy preprocessing manager                                     *
1425  *                                                                            *
1426  * Parameters: manager - [IN] the manager to destroy                          *
1427  *                                                                            *
1428  ******************************************************************************/
preprocessor_destroy_manager(zbx_preprocessing_manager_t * manager)1429 static void	preprocessor_destroy_manager(zbx_preprocessing_manager_t *manager)
1430 {
1431 	zbx_preprocessing_request_t		*request;
1432 	zbx_preprocessing_direct_request_t	*direct_request;
1433 
1434 	zbx_free(manager->workers);
1435 
1436 	/* this is the place where values are lost */
1437 	while (SUCCEED == zbx_list_pop(&manager->direct_queue, (void **)&direct_request))
1438 		preprocessor_free_direct_request(direct_request);
1439 
1440 	zbx_list_destroy(&manager->direct_queue);
1441 
1442 	while (SUCCEED == zbx_list_pop(&manager->queue, (void **)&request))
1443 		preprocessor_free_request(request);
1444 
1445 	zbx_list_destroy(&manager->queue);
1446 
1447 	zbx_hashset_destroy(&manager->item_config);
1448 	zbx_hashset_destroy(&manager->linked_items);
1449 	zbx_hashset_destroy(&manager->history_cache);
1450 }
1451 
ZBX_THREAD_ENTRY(preprocessing_manager_thread,args)1452 ZBX_THREAD_ENTRY(preprocessing_manager_thread, args)
1453 {
1454 	zbx_ipc_service_t		service;
1455 	char				*error = NULL;
1456 	zbx_ipc_client_t		*client;
1457 	zbx_ipc_message_t		*message;
1458 	zbx_preprocessing_manager_t	manager;
1459 	int				ret;
1460 	double				time_stat, time_idle = 0, time_now, time_flush, sec;
1461 
1462 #define	STAT_INTERVAL	5	/* if a process is busy and does not sleep then update status not faster than */
1463 				/* once in STAT_INTERVAL seconds */
1464 
1465 	process_type = ((zbx_thread_args_t *)args)->process_type;
1466 	server_num = ((zbx_thread_args_t *)args)->server_num;
1467 	process_num = ((zbx_thread_args_t *)args)->process_num;
1468 
1469 	zbx_setproctitle("%s #%d starting", get_process_type_string(process_type), process_num);
1470 
1471 	zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(program_type),
1472 			server_num, get_process_type_string(process_type), process_num);
1473 
1474 	update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
1475 
1476 	if (FAIL == zbx_ipc_service_start(&service, ZBX_IPC_SERVICE_PREPROCESSING, &error))
1477 	{
1478 		zabbix_log(LOG_LEVEL_CRIT, "cannot start preprocessing service: %s", error);
1479 		zbx_free(error);
1480 		exit(EXIT_FAILURE);
1481 	}
1482 
1483 	preprocessor_init_manager(&manager);
1484 
1485 	/* initialize statistics */
1486 	time_stat = zbx_time();
1487 	time_flush = time_stat;
1488 
1489 	zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num);
1490 
1491 	while (ZBX_IS_RUNNING())
1492 	{
1493 		time_now = zbx_time();
1494 
1495 		if (STAT_INTERVAL < time_now - time_stat)
1496 		{
1497 			zbx_setproctitle("%s #%d [queued " ZBX_FS_UI64 ", processed " ZBX_FS_UI64 " values, idle "
1498 					ZBX_FS_DBL " sec during " ZBX_FS_DBL " sec]",
1499 					get_process_type_string(process_type), process_num,
1500 					manager.queued_num, manager.processed_num, time_idle, time_now - time_stat);
1501 
1502 			time_stat = time_now;
1503 			time_idle = 0;
1504 			manager.processed_num = 0;
1505 		}
1506 
1507 		update_selfmon_counter(ZBX_PROCESS_STATE_IDLE);
1508 		ret = zbx_ipc_service_recv(&service, ZBX_PREPROCESSING_MANAGER_DELAY, &client, &message);
1509 		update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
1510 		sec = zbx_time();
1511 		zbx_update_env(sec);
1512 
1513 		if (ZBX_IPC_RECV_IMMEDIATE != ret)
1514 			time_idle += sec - time_now;
1515 
1516 		if (NULL != message)
1517 		{
1518 			switch (message->code)
1519 			{
1520 				case ZBX_IPC_PREPROCESSOR_WORKER:
1521 					preprocessor_register_worker(&manager, client, message);
1522 					break;
1523 				case ZBX_IPC_PREPROCESSOR_REQUEST:
1524 					preprocessor_add_request(&manager, message);
1525 					break;
1526 				case ZBX_IPC_PREPROCESSOR_RESULT:
1527 					preprocessor_add_result(&manager, client, message);
1528 					break;
1529 				case ZBX_IPC_PREPROCESSOR_QUEUE:
1530 					zbx_ipc_client_send(client, message->code, (unsigned char *)&manager.queued_num,
1531 							sizeof(zbx_uint64_t));
1532 					break;
1533 				case ZBX_IPC_PREPROCESSOR_TEST_REQUEST:
1534 					preprocessor_add_test_request(&manager, client, message);
1535 					break;
1536 				case ZBX_IPC_PREPROCESSOR_TEST_RESULT:
1537 					preprocessor_flush_test_result(&manager, client, message);
1538 					break;
1539 				case ZBX_IPC_PREPROCESSOR_DIAG_STATS:
1540 					preprocessor_get_diag_stats(&manager, client);
1541 					break;
1542 				case ZBX_IPC_PREPROCESSOR_TOP_ITEMS:
1543 					preprocessor_get_top_items(&manager, client, message);
1544 					break;
1545 				case ZBX_IPC_PREPROCESSOR_TOP_OLDEST_PREPROC_ITEMS:
1546 					preprocessor_get_oldest_preproc_items(&manager, client, message);
1547 					break;
1548 			}
1549 
1550 			zbx_ipc_message_free(message);
1551 		}
1552 
1553 		if (NULL != client)
1554 			zbx_ipc_client_release(client);
1555 
1556 		if (0 == manager.preproc_num || 1 < time_now - time_flush)
1557 		{
1558 			dc_flush_history();
1559 			time_flush = time_now;
1560 		}
1561 	}
1562 
1563 	zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num);
1564 
1565 	while (1)
1566 		zbx_sleep(SEC_PER_MIN);
1567 
1568 	zbx_ipc_service_close(&service);
1569 	preprocessor_destroy_manager(&manager);
1570 #undef STAT_INTERVAL
1571 }
1572