1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18 **/
19 
20 #include "common.h"
21 #include "daemon.h"
22 
23 #include "zbxself.h"
24 #include "log.h"
25 #include "zbxipcservice.h"
26 #include "lld_manager.h"
27 #include "lld_protocol.h"
28 
29 extern unsigned char	process_type, program_type;
30 extern int		server_num, process_num;
31 
32 extern int	CONFIG_LLDWORKER_FORKS;
33 
34 /*
35  * The LLD queue is organized as a queue (rule_queue binary heap) of LLD rules,
36  * sorted by their oldest value timestamps. The values are stored in linked lists,
37  * each rule having its own list of values. Values inside list are not sorted, so
38  * in the case a LLD rule received a value with past timestamp, it will be processed
39  * in queuing order, not the value chronological order.
40  *
41  * During processing the rule with oldest value is popped from queue and sent
42  * to a free worker. After processing the rule worker sends done response and
43  * manager removes the oldest value from rule's value list. If there are no more
44  * values in the list the rule is removed from the index (rule_index hashset),
45  * otherwise the rule is enqueued back in LLD queue.
46  *
47  */
48 
49 typedef struct
50 {
51 	/* workers vector, created during manager initialization */
52 	zbx_vector_ptr_t	workers;
53 
54 	/* free workers */
55 	zbx_queue_ptr_t		free_workers;
56 
57 	/* workers indexed by IPC service clients */
58 	zbx_hashset_t		workers_client;
59 
60 	/* the next worker index to be assigned to new IPC service clients */
61 	int			next_worker_index;
62 
63 	/* index of queued LLD rules */
64 	zbx_hashset_t		rule_index;
65 
66 	/* LLD rule queue, ordered by the oldest values */
67 	zbx_binary_heap_t	rule_queue;
68 
69 	/* the number of queued LLD rules */
70 	zbx_uint64_t		queued_num;
71 
72 }
73 zbx_lld_manager_t;
74 
75 typedef struct
76 {
77 	zbx_ipc_client_t	*client;
78 	zbx_lld_rule_t		*rule;
79 }
80 zbx_lld_worker_t;
81 
82 /* workers_client hashset support */
worker_hash_func(const void * d)83 static zbx_hash_t	worker_hash_func(const void *d)
84 {
85 	const zbx_lld_worker_t	*worker = *(const zbx_lld_worker_t **)d;
86 
87 	zbx_hash_t hash =  ZBX_DEFAULT_PTR_HASH_FUNC(&worker->client);
88 
89 	return hash;
90 }
91 
worker_compare_func(const void * d1,const void * d2)92 static int	worker_compare_func(const void *d1, const void *d2)
93 {
94 	const zbx_lld_worker_t	*p1 = *(const zbx_lld_worker_t **)d1;
95 	const zbx_lld_worker_t	*p2 = *(const zbx_lld_worker_t **)d2;
96 
97 	ZBX_RETURN_IF_NOT_EQUAL(p1->client, p2->client);
98 	return 0;
99 }
100 
101 /* rule_queue binary heap support */
rule_elem_compare_func(const void * d1,const void * d2)102 static int	rule_elem_compare_func(const void *d1, const void *d2)
103 {
104 	const zbx_binary_heap_elem_t	*e1 = (const zbx_binary_heap_elem_t *)d1;
105 	const zbx_binary_heap_elem_t	*e2 = (const zbx_binary_heap_elem_t *)d2;
106 
107 	const zbx_lld_rule_t	*rule1 = (const zbx_lld_rule_t *)e1->data;
108 	const zbx_lld_rule_t	*rule2 = (const zbx_lld_rule_t *)e2->data;
109 
110 	/* compare by timestamp of the oldest value */
111 	return zbx_timespec_compare(&rule1->head->ts, &rule2->head->ts);
112 }
113 
114 /******************************************************************************
115  *                                                                            *
116  * Function: lld_data_free                                                    *
117  *                                                                            *
118  * Purpose: frees LLD data                                                    *
119  *                                                                            *
120  ******************************************************************************/
lld_data_free(zbx_lld_data_t * data)121 static void	lld_data_free(zbx_lld_data_t *data)
122 {
123 	zbx_free(data->value);
124 	zbx_free(data->error);
125 	zbx_free(data);
126 }
127 
128 /******************************************************************************
129  *                                                                            *
130  * Function: lld_rule_clear                                                   *
131  *                                                                            *
132  * Purpose: clears LLD rule                                                   *
133  *                                                                            *
134  ******************************************************************************/
lld_rule_clear(zbx_lld_rule_t * rule)135 static void	lld_rule_clear(zbx_lld_rule_t *rule)
136 {
137 	zbx_lld_data_t	*data;
138 
139 	while (NULL != rule->head)
140 	{
141 		data = rule->head;
142 		rule->head = data->next;
143 		lld_data_free(data);
144 	}
145 }
146 
147 /******************************************************************************
148  *                                                                            *
149  * Function: lld_worker_free                                                  *
150  *                                                                            *
151  * Purpose: frees LLD worker                                                  *
152  *                                                                            *
153  ******************************************************************************/
lld_worker_free(zbx_lld_worker_t * worker)154 static void	lld_worker_free(zbx_lld_worker_t *worker)
155 {
156 	zbx_free(worker);
157 }
158 
159 /******************************************************************************
160  *                                                                            *
161  * Function: lld_manager_init                                                 *
162  *                                                                            *
163  * Purpose: initializes LLD manager                                           *
164  *                                                                            *
165  * Parameters: manager - [IN] the manager to initialize                       *
166  *                                                                            *
167  ******************************************************************************/
lld_manager_init(zbx_lld_manager_t * manager)168 static void	lld_manager_init(zbx_lld_manager_t *manager)
169 {
170 	int			i;
171 	zbx_lld_worker_t	*worker;
172 
173 	zabbix_log(LOG_LEVEL_DEBUG, "In %s() workers:%d", __func__, CONFIG_LLDWORKER_FORKS);
174 
175 	zbx_vector_ptr_create(&manager->workers);
176 	zbx_queue_ptr_create(&manager->free_workers);
177 	zbx_hashset_create(&manager->workers_client, 0, worker_hash_func, worker_compare_func);
178 
179 	zbx_hashset_create_ext(&manager->rule_index, 0, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC,
180 			(zbx_clean_func_t)lld_rule_clear,
181 			ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC);
182 
183 	zbx_binary_heap_create(&manager->rule_queue, rule_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY);
184 
185 	manager->next_worker_index = 0;
186 
187 	for (i = 0; i < CONFIG_LLDWORKER_FORKS; i++)
188 	{
189 		worker = (zbx_lld_worker_t *)zbx_malloc(NULL, sizeof(zbx_lld_worker_t));
190 
191 		worker->client = NULL;
192 
193 		zbx_vector_ptr_append(&manager->workers, worker);
194 	}
195 
196 	manager->queued_num = 0;
197 
198 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
199 }
200 
201 /******************************************************************************
202  *                                                                            *
203  * Function: lld_manager_destroy                                              *
204  *                                                                            *
205  * Purpose: destroys LLD manager                                              *
206  *                                                                            *
207  * Parameters: manager - [IN] the manager to destroy                          *
208  *                                                                            *
209  ******************************************************************************/
lld_manager_destroy(zbx_lld_manager_t * manager)210 static void	lld_manager_destroy(zbx_lld_manager_t *manager)
211 {
212 	zbx_binary_heap_destroy(&manager->rule_queue);
213 	zbx_hashset_destroy(&manager->rule_index);
214 	zbx_queue_ptr_destroy(&manager->free_workers);
215 	zbx_hashset_destroy(&manager->workers_client);
216 	zbx_vector_ptr_clear_ext(&manager->workers, (zbx_clean_func_t)lld_worker_free);
217 	zbx_vector_ptr_destroy(&manager->workers);
218 }
219 
220 /******************************************************************************
221  *                                                                            *
222  * Function: lld_get_worker_by_client                                         *
223  *                                                                            *
224  * Purpose: returns worker by connected IPC client data                       *
225  *                                                                            *
226  * Parameters: manager - [IN] the manager                                     *
227  *             client  - [IN] the connected worker                            *
228  *                                                                            *
229  * Return value: The LLD worker                                               *
230  *                                                                            *
231  ******************************************************************************/
lld_get_worker_by_client(zbx_lld_manager_t * manager,zbx_ipc_client_t * client)232 static zbx_lld_worker_t	*lld_get_worker_by_client(zbx_lld_manager_t *manager, zbx_ipc_client_t *client)
233 {
234 	zbx_lld_worker_t	**worker, worker_local, *plocal = &worker_local;
235 
236 	plocal->client = client;
237 	worker = (zbx_lld_worker_t **)zbx_hashset_search(&manager->workers_client, &plocal);
238 
239 	if (NULL == worker)
240 	{
241 		THIS_SHOULD_NEVER_HAPPEN;
242 		exit(EXIT_FAILURE);
243 	}
244 
245 	return *worker;
246 }
247 
248 /******************************************************************************
249  *                                                                            *
250  * Function: lld_register_worker                                              *
251  *                                                                            *
252  * Purpose: registers worker                                                  *
253  *                                                                            *
254  * Parameters: manager - [IN] the manager                                     *
255  *             client  - [IN] the connected worker IPC client data            *
256  *             message - [IN] the received message                            *
257  *                                                                            *
258  ******************************************************************************/
lld_register_worker(zbx_lld_manager_t * manager,zbx_ipc_client_t * client,const zbx_ipc_message_t * message)259 static void	lld_register_worker(zbx_lld_manager_t *manager, zbx_ipc_client_t *client,
260 		const zbx_ipc_message_t *message)
261 {
262 	zbx_lld_worker_t	*worker;
263 	pid_t			ppid;
264 
265 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
266 
267 	memcpy(&ppid, message->data, sizeof(ppid));
268 
269 	if (ppid != getppid())
270 	{
271 		zbx_ipc_client_close(client);
272 		zabbix_log(LOG_LEVEL_DEBUG, "refusing connection from foreign process");
273 	}
274 	else
275 	{
276 		if (manager->next_worker_index == manager->workers.values_num)
277 		{
278 			THIS_SHOULD_NEVER_HAPPEN;
279 			exit(EXIT_FAILURE);
280 		}
281 
282 		worker = (zbx_lld_worker_t *)manager->workers.values[manager->next_worker_index++];
283 		worker->client = client;
284 
285 		zbx_hashset_insert(&manager->workers_client, &worker, sizeof(zbx_lld_worker_t *));
286 		zbx_queue_ptr_push(&manager->free_workers, worker);
287 	}
288 
289 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
290 }
291 
292 /******************************************************************************
293  *                                                                            *
294  * Function: lld_queue_rule                                                   *
295  *                                                                            *
296  * Purpose: queues LLD rule                                                   *
297  *                                                                            *
298  * Parameters: manager - [IN] the LLD manager                                 *
299  *             rule    - [IN] the LLD rule                                    *
300  *                                                                            *
301  ******************************************************************************/
lld_queue_rule(zbx_lld_manager_t * manager,zbx_lld_rule_t * rule)302 static void	lld_queue_rule(zbx_lld_manager_t *manager, zbx_lld_rule_t *rule)
303 {
304 	zbx_binary_heap_elem_t	elem = {rule->hostid, rule};
305 
306 	zbx_binary_heap_insert(&manager->rule_queue, &elem);
307 }
308 
309 /******************************************************************************
310  *                                                                            *
311  * Function: lld_queue_request                                                *
312  *                                                                            *
313  * Purpose: queues low level discovery request                                *
314  *                                                                            *
315  * Parameters: manager - [IN] the LLD manager                                 *
316  *             message - [IN] the message with LLD request                    *
317  *                                                                            *
318  ******************************************************************************/
lld_queue_request(zbx_lld_manager_t * manager,const zbx_ipc_message_t * message)319 static void	lld_queue_request(zbx_lld_manager_t *manager, const zbx_ipc_message_t *message)
320 {
321 	zbx_uint64_t	hostid;
322 	zbx_lld_rule_t	*rule;
323 	zbx_lld_data_t	*data;
324 
325 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
326 
327 	data = (zbx_lld_data_t *)zbx_malloc(NULL, sizeof(zbx_lld_data_t));
328 	data->next = NULL;
329 
330 	zbx_lld_deserialize_item_value(message->data, &data->itemid, &hostid, &data->value, &data->ts, &data->meta,
331 			&data->lastlogsize, &data->mtime, &data->error);
332 
333 	if (NULL == (rule = zbx_hashset_search(&manager->rule_index, &hostid)))
334 	{
335 		zbx_lld_rule_t	rule_local = {.hostid = hostid, .values_num = 0, .tail = data, .head = data};
336 
337 		data->prev = NULL;
338 
339 		rule = zbx_hashset_insert(&manager->rule_index, &rule_local, sizeof(rule_local));
340 		lld_queue_rule(manager, rule);
341 	}
342 	else
343 	{
344 		if (0 == data->meta)
345 		{
346 			zbx_lld_data_t	*data_ptr;
347 
348 			for (data_ptr = rule->tail; NULL != data_ptr; data_ptr = data_ptr->prev)
349 			{
350 				/* if there are multiple values then they should be different, check only last one */
351 				if (data_ptr->itemid == data->itemid)
352 					break;
353 			}
354 
355 			if (NULL != data_ptr && 0 == zbx_strcmp_null(data->error, data_ptr->error) &&
356 					0 == zbx_strcmp_null(data->value, data_ptr->value))
357 			{
358 				zabbix_log(LOG_LEVEL_DEBUG, "skip repeating values for discovery rule:" ZBX_FS_UI64,
359 						data->itemid);
360 
361 				lld_data_free(data);
362 				goto out;
363 			}
364 		}
365 
366 		data->prev = rule->tail;
367 		rule->tail->next = data;
368 		rule->tail = data;
369 	}
370 
371 	zabbix_log(LOG_LEVEL_DEBUG, "queuing discovery rule:" ZBX_FS_UI64, data->itemid);
372 
373 	rule->values_num++;
374 	manager->queued_num++;
375 out:
376 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
377 }
378 
379 /******************************************************************************
380  *                                                                            *
381  * Function: lld_process_next_request                                         *
382  *                                                                            *
383  * Purpose: processes next LLD request from queue                             *
384  *                                                                            *
385  * Parameters: manager - [IN] the LLD manager                                 *
386  *             worker  - [IN] the target worker                               *
387  *                                                                            *
388  ******************************************************************************/
lld_process_next_request(zbx_lld_manager_t * manager,zbx_lld_worker_t * worker)389 static void	lld_process_next_request(zbx_lld_manager_t *manager, zbx_lld_worker_t *worker)
390 {
391 	zbx_binary_heap_elem_t	*elem;
392 	unsigned char		*buf;
393 	zbx_uint32_t		buf_len;
394 	zbx_lld_data_t		*data;
395 
396 	elem = zbx_binary_heap_find_min(&manager->rule_queue);
397 	worker->rule = (zbx_lld_rule_t *)elem->data;
398 	zbx_binary_heap_remove_min(&manager->rule_queue);
399 
400 	data = worker->rule->head;
401 	buf_len = zbx_lld_serialize_item_value(&buf, data->itemid, 0, data->value, &data->ts, data->meta,
402 			data->lastlogsize, data->mtime, data->error);
403 	zbx_ipc_client_send(worker->client, ZBX_IPC_LLD_TASK, buf, buf_len);
404 	zbx_free(buf);
405 }
406 
407 /******************************************************************************
408  *                                                                            *
409  * Function: lld_process_queue                                                *
410  *                                                                            *
411  * Purpose: sends queued LLD rules to free workers                            *
412  *                                                                            *
413  * Parameters: manager - [IN] the LLD manager                                 *
414  *                                                                            *
415  ******************************************************************************/
lld_process_queue(zbx_lld_manager_t * manager)416 static void	lld_process_queue(zbx_lld_manager_t *manager)
417 {
418 	zbx_lld_worker_t	*worker;
419 
420 	while (SUCCEED != zbx_binary_heap_empty(&manager->rule_queue))
421 	{
422 		if (NULL == (worker = zbx_queue_ptr_pop(&manager->free_workers)))
423 			break;
424 
425 		lld_process_next_request(manager, worker);
426 	}
427 }
428 
429 /******************************************************************************
430  *                                                                            *
431  * Function: lld_process_result                                               *
432  *                                                                            *
433  * Purpose: processes LLD worker 'done' response                              *
434  *                                                                            *
435  * Parameters: manager - [IN] the LLD manager                                 *
436  * Parameters: client  - [IN] the worker's IPC client connection              *
437  *                                                                            *
438  ******************************************************************************/
lld_process_result(zbx_lld_manager_t * manager,zbx_ipc_client_t * client)439 static void	lld_process_result(zbx_lld_manager_t *manager, zbx_ipc_client_t *client)
440 {
441 	zbx_lld_worker_t	*worker;
442 	zbx_lld_rule_t		*rule;
443 	zbx_lld_data_t		*data;
444 
445 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
446 
447 	worker = lld_get_worker_by_client(manager, client);
448 
449 	zabbix_log(LOG_LEVEL_DEBUG, "discovery rule:" ZBX_FS_UI64 " has been processed", worker->rule->head->itemid);
450 
451 	rule = worker->rule;
452 	worker->rule = NULL;
453 
454 	data = rule->head;
455 	rule->head = rule->head->next;
456 
457 	if (NULL == rule->head)
458 	{
459 		zbx_hashset_remove_direct(&manager->rule_index, rule);
460 	}
461 	else
462 	{
463 		rule->head->prev = NULL;
464 		rule->values_num--;
465 		lld_queue_rule(manager, rule);
466 	}
467 
468 	lld_data_free(data);
469 
470 	if (SUCCEED != zbx_binary_heap_empty(&manager->rule_queue))
471 		lld_process_next_request(manager, worker);
472 	else
473 		zbx_queue_ptr_push(&manager->free_workers, worker);
474 
475 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
476 }
477 
478 /******************************************************************************
479  *                                                                            *
480  * Function: lld_process_diag_stats                                           *
481  *                                                                            *
482  * Purpose: processes external diagnostic statistics request                  *
483  *                                                                            *
484  * Parameters: manager - [IN] the LLD manager                                 *
485  * Parameters: client  - [IN] the external IPC connection                     *
486  *                                                                            *
487  ******************************************************************************/
lld_process_diag_stats(zbx_lld_manager_t * manager,zbx_ipc_client_t * client)488 static void	lld_process_diag_stats(zbx_lld_manager_t *manager, zbx_ipc_client_t *client)
489 {
490 	unsigned char	*data;
491 	zbx_uint32_t	data_len;
492 
493 	data_len = zbx_lld_serialize_diag_stats(&data, manager->rule_index.num_data, manager->queued_num);
494 	zbx_ipc_client_send(client, ZBX_IPC_LLD_DIAG_STATS_RESULT, data, data_len);
495 	zbx_free(data);
496 }
497 
498 /******************************************************************************
499  *                                                                            *
500  * Function: lld_diag_item_compare_values_desc                                *
501  *                                                                            *
502  * Purpose: sort lld manager cache item view by second value                  *
503  *          (number of values) in descending order                            *
504  *                                                                            *
505  ******************************************************************************/
lld_diag_item_compare_values_desc(const void * d1,const void * d2)506 static int	lld_diag_item_compare_values_desc(const void *d1, const void *d2)
507 {
508 	zbx_lld_rule_info_t	*r1 = *(zbx_lld_rule_info_t **)d1;
509 	zbx_lld_rule_info_t	*r2 = *(zbx_lld_rule_info_t **)d2;
510 
511 	return r2->values_num - r1->values_num;
512 }
513 
514 /******************************************************************************
515  *                                                                            *
516  * Function: lld_process_diag_top                                             *
517  *                                                                            *
518  * Purpose: processes external top items request                              *
519  *                                                                            *
520  * Parameters: manager - [IN] the manager                                     *
521  *             client  - [IN] the connected worker IPC client data            *
522  *             message - [IN] the received message                            *
523  *                                                                            *
524  ******************************************************************************/
lld_process_top_items(zbx_lld_manager_t * manager,zbx_ipc_client_t * client,const zbx_ipc_message_t * message)525 static void	lld_process_top_items(zbx_lld_manager_t *manager, zbx_ipc_client_t *client,
526 		const zbx_ipc_message_t *message)
527 {
528 	int			limit;
529 	unsigned char		*data;
530 	zbx_uint32_t		data_len;
531 	zbx_vector_ptr_t	view;
532 	zbx_hashset_iter_t	iter;
533 	zbx_hashset_t		rule_infos;
534 	zbx_lld_rule_t		*rule;
535 
536 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
537 
538 	zbx_lld_deserialize_top_items_request(message->data, &limit);
539 
540 	zbx_hashset_create(&rule_infos, MAX(1000, (size_t)manager->rule_index.num_data), ZBX_DEFAULT_UINT64_HASH_FUNC,
541 			ZBX_DEFAULT_UINT64_COMPARE_FUNC);
542 	zbx_vector_ptr_create(&view);
543 
544 	zbx_hashset_iter_reset(&manager->rule_index, &iter);
545 	while (NULL != (rule = (zbx_lld_rule_t *)zbx_hashset_iter_next(&iter)))
546 	{
547 		zbx_lld_data_t	*data_ptr;
548 
549 		for (data_ptr = rule->head; NULL != data_ptr; data_ptr = data_ptr->next)
550 		{
551 			zbx_lld_rule_info_t	*rule_info, rule_info_local = {.itemid = data_ptr->itemid};
552 
553 			rule_info = (zbx_lld_rule_info_t *)zbx_hashset_search(&rule_infos, &rule_info_local);
554 
555 			if (NULL == rule_info)
556 			{
557 				rule_info = (zbx_lld_rule_info_t *)zbx_hashset_insert(&rule_infos, &rule_info_local,
558 						sizeof(zbx_lld_rule_info_t));
559 				zbx_vector_ptr_append(&view, rule_info);
560 			}
561 
562 			rule_info->values_num++;
563 		}
564 	}
565 
566 	zbx_vector_ptr_sort(&view, lld_diag_item_compare_values_desc);
567 
568 	data_len = zbx_lld_serialize_top_items_result(&data, (const zbx_lld_rule_info_t **)view.values,
569 			MIN(limit, view.values_num));
570 	zbx_ipc_client_send(client, ZBX_IPC_LLD_TOP_ITEMS_RESULT, data, data_len);
571 
572 	zbx_free(data);
573 	zbx_vector_ptr_destroy(&view);
574 	zbx_hashset_destroy(&rule_infos);
575 
576 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
577 }
578 
579 /******************************************************************************
580  *                                                                            *
581  * Function: lld_manager_thread                                               *
582  *                                                                            *
583  * Purpose: main processing loop                                              *
584  *                                                                            *
585  ******************************************************************************/
ZBX_THREAD_ENTRY(lld_manager_thread,args)586 ZBX_THREAD_ENTRY(lld_manager_thread, args)
587 {
588 #define	STAT_INTERVAL	5	/* if a process is busy and does not sleep then update status not faster than */
589 				/* once in STAT_INTERVAL seconds */
590 
591 	zbx_ipc_service_t	lld_service;
592 	char			*error = NULL;
593 	zbx_ipc_client_t	*client;
594 	zbx_ipc_message_t	*message;
595 	double			time_stat, time_now, sec, time_idle = 0;
596 	zbx_lld_manager_t	manager;
597 	zbx_uint64_t		processed_num = 0;
598 	int			ret;
599 
600 	process_type = ((zbx_thread_args_t *)args)->process_type;
601 	server_num = ((zbx_thread_args_t *)args)->server_num;
602 	process_num = ((zbx_thread_args_t *)args)->process_num;
603 
604 	zbx_setproctitle("%s #%d starting", get_process_type_string(process_type), process_num);
605 
606 	zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(program_type),
607 			server_num, get_process_type_string(process_type), process_num);
608 
609 	if (FAIL == zbx_ipc_service_start(&lld_service, ZBX_IPC_SERVICE_LLD, &error))
610 	{
611 		zabbix_log(LOG_LEVEL_CRIT, "cannot start LLD manager service: %s", error);
612 		zbx_free(error);
613 		exit(EXIT_FAILURE);
614 	}
615 
616 	lld_manager_init(&manager);
617 
618 	/* initialize statistics */
619 	time_stat = zbx_time();
620 
621 	zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num);
622 
623 	update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
624 
625 	while (ZBX_IS_RUNNING())
626 	{
627 		time_now = zbx_time();
628 
629 		if (STAT_INTERVAL < time_now - time_stat)
630 		{
631 			zbx_setproctitle("%s #%d [processed " ZBX_FS_UI64 " LLD rules, idle " ZBX_FS_DBL
632 					"sec during " ZBX_FS_DBL " sec]",
633 					get_process_type_string(process_type), process_num, processed_num,
634 					time_idle, time_now - time_stat);
635 
636 			time_stat = time_now;
637 			time_idle = 0;
638 			processed_num = 0;
639 		}
640 
641 		update_selfmon_counter(ZBX_PROCESS_STATE_IDLE);
642 		ret = zbx_ipc_service_recv(&lld_service, 1, &client, &message);
643 		update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
644 
645 		sec = zbx_time();
646 		zbx_update_env(sec);
647 
648 		if (ZBX_IPC_RECV_IMMEDIATE != ret)
649 			time_idle += sec - time_now;
650 
651 		if (NULL != message)
652 		{
653 			switch (message->code)
654 			{
655 				case ZBX_IPC_LLD_REGISTER:
656 					lld_register_worker(&manager, client, message);
657 					break;
658 				case ZBX_IPC_LLD_REQUEST:
659 					lld_queue_request(&manager, message);
660 					lld_process_queue(&manager);
661 					break;
662 				case ZBX_IPC_LLD_DONE:
663 					lld_process_result(&manager, client);
664 					processed_num++;
665 					manager.queued_num--;
666 					break;
667 				case ZBX_IPC_LLD_QUEUE:
668 					zbx_ipc_client_send(client, message->code, (unsigned char *)&manager.queued_num,
669 							sizeof(zbx_uint64_t));
670 					break;
671 				case ZBX_IPC_LLD_DIAG_STATS:
672 					lld_process_diag_stats(&manager, client);
673 					break;
674 				case ZBX_IPC_LLD_TOP_ITEMS:
675 					lld_process_top_items(&manager, client, message);
676 					break;
677 			}
678 
679 			zbx_ipc_message_free(message);
680 		}
681 
682 		if (NULL != client)
683 			zbx_ipc_client_release(client);
684 	}
685 
686 	zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num);
687 
688 	while (1)
689 		zbx_sleep(SEC_PER_MIN);
690 
691 	zbx_ipc_service_close(&lld_service);
692 	lld_manager_destroy(&manager);
693 }
694