1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 **/
19
20 #include "common.h"
21 #include "daemon.h"
22
23 #include "zbxself.h"
24 #include "log.h"
25 #include "zbxipcservice.h"
26 #include "lld_manager.h"
27 #include "lld_protocol.h"
28
29 extern unsigned char process_type, program_type;
30 extern int server_num, process_num;
31
32 extern int CONFIG_LLDWORKER_FORKS;
33
/*
 * The LLD queue is organized as a queue (rule_queue binary heap) of LLD rules,
 * sorted by the timestamps of their oldest values. The values are stored in linked
 * lists, each rule having its own list of values. Values inside a list are not
 * sorted, so if an LLD rule receives a value with a past timestamp, that value is
 * processed in queuing order, not in chronological order.
 *
 * Queue entries are indexed by host id (rule_index hashset), so a single entry
 * can hold values of several discovery rule items of the same host.
 *
 * During processing the rule with the oldest value is popped from the queue and
 * sent to a free worker. After processing the rule the worker sends a 'done'
 * response and the manager removes the oldest value from the rule's value list.
 * If there are no more values in the list the rule is removed from the index
 * (rule_index hashset), otherwise the rule is enqueued back into the LLD queue.
 *
 */
48
49 typedef struct
50 {
51 /* workers vector, created during manager initialization */
52 zbx_vector_ptr_t workers;
53
54 /* free workers */
55 zbx_queue_ptr_t free_workers;
56
57 /* workers indexed by IPC service clients */
58 zbx_hashset_t workers_client;
59
60 /* the next worker index to be assigned to new IPC service clients */
61 int next_worker_index;
62
63 /* index of queued LLD rules */
64 zbx_hashset_t rule_index;
65
66 /* LLD rule queue, ordered by the oldest values */
67 zbx_binary_heap_t rule_queue;
68
69 /* the number of queued LLD rules */
70 zbx_uint64_t queued_num;
71
72 }
73 zbx_lld_manager_t;
74
75 typedef struct
76 {
77 zbx_ipc_client_t *client;
78 zbx_lld_rule_t *rule;
79 }
80 zbx_lld_worker_t;
81
82 /* workers_client hashset support */
worker_hash_func(const void * d)83 static zbx_hash_t worker_hash_func(const void *d)
84 {
85 const zbx_lld_worker_t *worker = *(const zbx_lld_worker_t **)d;
86
87 zbx_hash_t hash = ZBX_DEFAULT_PTR_HASH_FUNC(&worker->client);
88
89 return hash;
90 }
91
worker_compare_func(const void * d1,const void * d2)92 static int worker_compare_func(const void *d1, const void *d2)
93 {
94 const zbx_lld_worker_t *p1 = *(const zbx_lld_worker_t **)d1;
95 const zbx_lld_worker_t *p2 = *(const zbx_lld_worker_t **)d2;
96
97 ZBX_RETURN_IF_NOT_EQUAL(p1->client, p2->client);
98 return 0;
99 }
100
101 /* rule_queue binary heap support */
rule_elem_compare_func(const void * d1,const void * d2)102 static int rule_elem_compare_func(const void *d1, const void *d2)
103 {
104 const zbx_binary_heap_elem_t *e1 = (const zbx_binary_heap_elem_t *)d1;
105 const zbx_binary_heap_elem_t *e2 = (const zbx_binary_heap_elem_t *)d2;
106
107 const zbx_lld_rule_t *rule1 = (const zbx_lld_rule_t *)e1->data;
108 const zbx_lld_rule_t *rule2 = (const zbx_lld_rule_t *)e2->data;
109
110 /* compare by timestamp of the oldest value */
111 return zbx_timespec_compare(&rule1->head->ts, &rule2->head->ts);
112 }
113
114 /******************************************************************************
115 * *
116 * Function: lld_data_free *
117 * *
118 * Purpose: frees LLD data *
119 * *
120 ******************************************************************************/
static void	lld_data_free(zbx_lld_data_t *data)
{
	/* release the strings owned by the value first, then the value itself */
	zbx_free(data->error);
	zbx_free(data->value);

	zbx_free(data);
}
127
128 /******************************************************************************
129 * *
130 * Function: lld_rule_clear *
131 * *
132 * Purpose: clears LLD rule *
133 * *
134 ******************************************************************************/
static void	lld_rule_clear(zbx_lld_rule_t *rule)
{
	zbx_lld_data_t	*next;

	/* walk the value list from the head, freeing every value; head is NULL afterwards */
	for (; NULL != rule->head; rule->head = next)
	{
		next = rule->head->next;
		lld_data_free(rule->head);
	}
}
146
147 /******************************************************************************
148 * *
149 * Function: lld_worker_free *
150 * *
151 * Purpose: frees LLD worker *
152 * *
153 ******************************************************************************/
static void	lld_worker_free(zbx_lld_worker_t *worker)
{
	/* only the worker structure itself is released here, */
	/* the client and rule it points to are left untouched */
	zbx_free(worker);
}
158
159 /******************************************************************************
160 * *
161 * Function: lld_manager_init *
162 * *
163 * Purpose: initializes LLD manager *
164 * *
165 * Parameters: manager - [IN] the manager to initialize *
166 * *
167 ******************************************************************************/
static void	lld_manager_init(zbx_lld_manager_t *manager)
{
	int			i;
	zbx_lld_worker_t	*worker;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s() workers:%d", __func__, CONFIG_LLDWORKER_FORKS);

	zbx_vector_ptr_create(&manager->workers);
	zbx_queue_ptr_create(&manager->free_workers);
	zbx_hashset_create(&manager->workers_client, 0, worker_hash_func, worker_compare_func);

	/* lld_rule_clear() is registered as the clean function, so values still */
	/* linked to a rule are freed when the rule is removed from the index    */
	zbx_hashset_create_ext(&manager->rule_index, 0, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC,
			(zbx_clean_func_t)lld_rule_clear,
			ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC);

	zbx_binary_heap_create(&manager->rule_queue, rule_elem_compare_func, ZBX_BINARY_HEAP_OPTION_EMPTY);

	manager->next_worker_index = 0;
	manager->queued_num = 0;

	/* preallocate one slot per configured worker; a slot is bound to an */
	/* IPC client later, when the worker sends its registration message  */
	for (i = 0; i < CONFIG_LLDWORKER_FORKS; i++)
	{
		worker = (zbx_lld_worker_t *)zbx_malloc(NULL, sizeof(zbx_lld_worker_t));

		/* initialize every field so that an idle worker never carries an indeterminate rule pointer */
		worker->client = NULL;
		worker->rule = NULL;

		zbx_vector_ptr_append(&manager->workers, worker);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
200
201 /******************************************************************************
202 * *
203 * Function: lld_manager_destroy *
204 * *
205 * Purpose: destroys LLD manager *
206 * *
207 * Parameters: manager - [IN] the manager to destroy *
208 * *
209 ******************************************************************************/
static void	lld_manager_destroy(zbx_lld_manager_t *manager)
{
	/* queue and index of the rules */
	zbx_binary_heap_destroy(&manager->rule_queue);
	zbx_hashset_destroy(&manager->rule_index);

	/* containers that only reference the workers */
	zbx_queue_ptr_destroy(&manager->free_workers);
	zbx_hashset_destroy(&manager->workers_client);

	/* the workers themselves are freed through the vector holding them */
	zbx_vector_ptr_clear_ext(&manager->workers, (zbx_clean_func_t)lld_worker_free);
	zbx_vector_ptr_destroy(&manager->workers);
}
219
220 /******************************************************************************
221 * *
222 * Function: lld_get_worker_by_client *
223 * *
224 * Purpose: returns worker by connected IPC client data *
225 * *
226 * Parameters: manager - [IN] the manager *
227 * client - [IN] the connected worker *
228 * *
229 * Return value: The LLD worker *
230 * *
231 ******************************************************************************/
lld_get_worker_by_client(zbx_lld_manager_t * manager,zbx_ipc_client_t * client)232 static zbx_lld_worker_t *lld_get_worker_by_client(zbx_lld_manager_t *manager, zbx_ipc_client_t *client)
233 {
234 zbx_lld_worker_t **worker, worker_local, *plocal = &worker_local;
235
236 plocal->client = client;
237 worker = (zbx_lld_worker_t **)zbx_hashset_search(&manager->workers_client, &plocal);
238
239 if (NULL == worker)
240 {
241 THIS_SHOULD_NEVER_HAPPEN;
242 exit(EXIT_FAILURE);
243 }
244
245 return *worker;
246 }
247
248 /******************************************************************************
249 * *
250 * Function: lld_register_worker *
251 * *
252 * Purpose: registers worker *
253 * *
254 * Parameters: manager - [IN] the manager *
255 * client - [IN] the connected worker IPC client data *
256 * message - [IN] the received message *
257 * *
258 ******************************************************************************/
static void	lld_register_worker(zbx_lld_manager_t *manager, zbx_ipc_client_t *client,
		const zbx_ipc_message_t *message)
{
	pid_t	sender_ppid;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	/* the registration message starts with the parent process id of the sender */
	memcpy(&sender_ppid, message->data, sizeof(sender_ppid));

	if (getppid() == sender_ppid)
	{
		zbx_lld_worker_t	*slot;

		/* more registrations than preallocated worker slots */
		if (manager->workers.values_num == manager->next_worker_index)
		{
			THIS_SHOULD_NEVER_HAPPEN;
			exit(EXIT_FAILURE);
		}

		/* bind the next unused slot to the client and make the worker available */
		slot = (zbx_lld_worker_t *)manager->workers.values[manager->next_worker_index];
		manager->next_worker_index++;
		slot->client = client;

		zbx_hashset_insert(&manager->workers_client, &slot, sizeof(slot));
		zbx_queue_ptr_push(&manager->free_workers, slot);
	}
	else
	{
		/* only processes sharing the manager's parent are accepted as workers */
		zbx_ipc_client_close(client);
		zabbix_log(LOG_LEVEL_DEBUG, "refusing connection from foreign process");
	}

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
291
292 /******************************************************************************
293 * *
294 * Function: lld_queue_rule *
295 * *
296 * Purpose: queues LLD rule *
297 * *
298 * Parameters: manager - [IN] the LLD manager *
299 * rule - [IN] the LLD rule *
300 * *
301 ******************************************************************************/
static void	lld_queue_rule(zbx_lld_manager_t *manager, zbx_lld_rule_t *rule)
{
	/* heap element: the host id followed by the rule itself; the heap orders */
	/* elements with the compare function it was created with                */
	zbx_binary_heap_elem_t	heap_elem = {rule->hostid, rule};

	zbx_binary_heap_insert(&manager->rule_queue, &heap_elem);
}
308
309 /******************************************************************************
310 * *
311 * Function: lld_queue_request *
312 * *
313 * Purpose: queues low level discovery request *
314 * *
315 * Parameters: manager - [IN] the LLD manager *
316 * message - [IN] the message with LLD request *
317 * *
318 ******************************************************************************/
static void	lld_queue_request(zbx_lld_manager_t *manager, const zbx_ipc_message_t *message)
{
	zbx_uint64_t	hostid;
	zbx_lld_rule_t	*rule;
	zbx_lld_data_t	*value;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	value = (zbx_lld_data_t *)zbx_malloc(NULL, sizeof(zbx_lld_data_t));
	value->next = NULL;

	zbx_lld_deserialize_item_value(message->data, &value->itemid, &hostid, &value->value, &value->ts,
			&value->meta, &value->lastlogsize, &value->mtime, &value->error);

	rule = zbx_hashset_search(&manager->rule_index, &hostid);

	if (NULL != rule)
	{
		/* the host already has queued values: drop the new one if it only repeats */
		/* the most recently queued value of the same item (non meta values only)  */
		if (0 == value->meta)
		{
			zbx_lld_data_t	*last = rule->tail;

			/* search backwards from the tail for the latest value of this item */
			while (NULL != last && last->itemid != value->itemid)
				last = last->prev;

			if (NULL != last && 0 == zbx_strcmp_null(value->error, last->error) &&
					0 == zbx_strcmp_null(value->value, last->value))
			{
				zabbix_log(LOG_LEVEL_DEBUG, "skip repeating values for discovery rule:" ZBX_FS_UI64,
						value->itemid);

				lld_data_free(value);
				goto out;
			}
		}

		/* append the value to the end of the list */
		value->prev = rule->tail;
		rule->tail->next = value;
		rule->tail = value;
	}
	else
	{
		/* first value for this host: index a new rule holding only this value and queue it */
		zbx_lld_rule_t	rule_local = {.hostid = hostid, .values_num = 0, .tail = value, .head = value};

		value->prev = NULL;

		rule = zbx_hashset_insert(&manager->rule_index, &rule_local, sizeof(rule_local));
		lld_queue_rule(manager, rule);
	}

	zabbix_log(LOG_LEVEL_DEBUG, "queuing discovery rule:" ZBX_FS_UI64, value->itemid);

	rule->values_num++;
	manager->queued_num++;
out:
	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
378
379 /******************************************************************************
380 * *
381 * Function: lld_process_next_request *
382 * *
383 * Purpose: processes next LLD request from queue *
384 * *
385 * Parameters: manager - [IN] the LLD manager *
386 * worker - [IN] the target worker *
387 * *
388 ******************************************************************************/
static void	lld_process_next_request(zbx_lld_manager_t *manager, zbx_lld_worker_t *worker)
{
	zbx_lld_rule_t	*rule;
	zbx_lld_data_t	*oldest;
	unsigned char	*payload;
	zbx_uint32_t	payload_len;

	/* take the rule at the top of the heap; callers make sure the heap is not empty */
	rule = (zbx_lld_rule_t *)zbx_binary_heap_find_min(&manager->rule_queue)->data;
	zbx_binary_heap_remove_min(&manager->rule_queue);

	/* the rule stays assigned to the worker until its 'done' response is processed */
	worker->rule = rule;

	/* only the first value of the list is sent; it remains in the list meanwhile */
	oldest = rule->head;
	payload_len = zbx_lld_serialize_item_value(&payload, oldest->itemid, 0, oldest->value, &oldest->ts,
			oldest->meta, oldest->lastlogsize, oldest->mtime, oldest->error);

	zbx_ipc_client_send(worker->client, ZBX_IPC_LLD_TASK, payload, payload_len);
	zbx_free(payload);
}
406
407 /******************************************************************************
408 * *
409 * Function: lld_process_queue *
410 * *
411 * Purpose: sends queued LLD rules to free workers *
412 * *
413 * Parameters: manager - [IN] the LLD manager *
414 * *
415 ******************************************************************************/
static void	lld_process_queue(zbx_lld_manager_t *manager)
{
	zbx_lld_worker_t	*worker;

	/* hand out rules while there are both queued rules and free workers; */
	/* a worker is popped only after the queue was found to be non-empty  */
	while (SUCCEED != zbx_binary_heap_empty(&manager->rule_queue) &&
			NULL != (worker = zbx_queue_ptr_pop(&manager->free_workers)))
	{
		lld_process_next_request(manager, worker);
	}
}
428
429 /******************************************************************************
430 * *
431 * Function: lld_process_result *
432 * *
433 * Purpose: processes LLD worker 'done' response *
434 * *
435 * Parameters: manager - [IN] the LLD manager *
 *             client  - [IN] the worker's IPC client connection               *
437 * *
438 ******************************************************************************/
static void	lld_process_result(zbx_lld_manager_t *manager, zbx_ipc_client_t *client)
{
	zbx_lld_worker_t	*worker;
	zbx_lld_rule_t		*rule;
	zbx_lld_data_t		*done;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	worker = lld_get_worker_by_client(manager, client);
	rule = worker->rule;

	zabbix_log(LOG_LEVEL_DEBUG, "discovery rule:" ZBX_FS_UI64 " has been processed", rule->head->itemid);

	/* the worker has finished with the rule */
	worker->rule = NULL;

	/* unlink the processed value, which is the first one in the list */
	done = rule->head;
	rule->head = done->next;

	if (NULL != rule->head)
	{
		/* more values are waiting - return the rule to the queue */
		rule->head->prev = NULL;
		rule->values_num--;
		lld_queue_rule(manager, rule);
	}
	else
		zbx_hashset_remove_direct(&manager->rule_index, rule);

	lld_data_free(done);

	/* keep the worker busy when something is queued, otherwise mark it as free */
	if (SUCCEED == zbx_binary_heap_empty(&manager->rule_queue))
		zbx_queue_ptr_push(&manager->free_workers, worker);
	else
		lld_process_next_request(manager, worker);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
477
478 /******************************************************************************
479 * *
480 * Function: lld_process_diag_stats *
481 * *
482 * Purpose: processes external diagnostic statistics request *
483 * *
484 * Parameters: manager - [IN] the LLD manager *
 *             client  - [IN] the external IPC connection                      *
486 * *
487 ******************************************************************************/
static void	lld_process_diag_stats(zbx_lld_manager_t *manager, zbx_ipc_client_t *client)
{
	unsigned char	*response;
	zbx_uint32_t	response_len;

	/* report the number of indexed rules and the number of queued values */
	response_len = zbx_lld_serialize_diag_stats(&response, manager->rule_index.num_data, manager->queued_num);

	zbx_ipc_client_send(client, ZBX_IPC_LLD_DIAG_STATS_RESULT, response, response_len);
	zbx_free(response);
}
497
498 /******************************************************************************
499 * *
500 * Function: lld_diag_item_compare_values_desc *
501 * *
502 * Purpose: sort lld manager cache item view by second value *
503 * (number of values) in descending order *
504 * *
505 ******************************************************************************/
lld_diag_item_compare_values_desc(const void * d1,const void * d2)506 static int lld_diag_item_compare_values_desc(const void *d1, const void *d2)
507 {
508 zbx_lld_rule_info_t *r1 = *(zbx_lld_rule_info_t **)d1;
509 zbx_lld_rule_info_t *r2 = *(zbx_lld_rule_info_t **)d2;
510
511 return r2->values_num - r1->values_num;
512 }
513
514 /******************************************************************************
515 * *
 * Function: lld_process_top_items                                             *
517 * *
518 * Purpose: processes external top items request *
519 * *
520 * Parameters: manager - [IN] the manager *
 *             client  - [IN] the IPC client that sent the request             *
522 * message - [IN] the received message *
523 * *
524 ******************************************************************************/
static void	lld_process_top_items(zbx_lld_manager_t *manager, zbx_ipc_client_t *client,
		const zbx_ipc_message_t *message)
{
	int			limit, result_num;
	unsigned char		*response;
	zbx_uint32_t		response_len;
	zbx_vector_ptr_t	view;
	zbx_hashset_t		rule_infos;
	zbx_hashset_iter_t	iter;
	zbx_lld_rule_t		*rule;

	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);

	zbx_lld_deserialize_top_items_request(message->data, &limit);

	/* rule_infos accumulates the number of queued values per item id, */
	/* view references the same entries so that they can be sorted     */
	zbx_hashset_create(&rule_infos, MAX(1000, (size_t)manager->rule_index.num_data), ZBX_DEFAULT_UINT64_HASH_FUNC,
			ZBX_DEFAULT_UINT64_COMPARE_FUNC);
	zbx_vector_ptr_create(&view);

	zbx_hashset_iter_reset(&manager->rule_index, &iter);

	while (NULL != (rule = (zbx_lld_rule_t *)zbx_hashset_iter_next(&iter)))
	{
		zbx_lld_data_t	*value = rule->head;

		/* count every queued value of the rule under the item it belongs to */
		while (NULL != value)
		{
			zbx_lld_rule_info_t	key = {.itemid = value->itemid}, *info;

			if (NULL == (info = (zbx_lld_rule_info_t *)zbx_hashset_search(&rule_infos, &key)))
			{
				/* first value of this item: the counter starts at zero */
				info = (zbx_lld_rule_info_t *)zbx_hashset_insert(&rule_infos, &key, sizeof(key));
				zbx_vector_ptr_append(&view, info);
			}

			info->values_num++;
			value = value->next;
		}
	}

	zbx_vector_ptr_sort(&view, lld_diag_item_compare_values_desc);

	/* return at most 'limit' entries from the start of the sorted view */
	result_num = MIN(limit, view.values_num);
	response_len = zbx_lld_serialize_top_items_result(&response, (const zbx_lld_rule_info_t **)view.values,
			result_num);
	zbx_ipc_client_send(client, ZBX_IPC_LLD_TOP_ITEMS_RESULT, response, response_len);

	zbx_free(response);
	zbx_vector_ptr_destroy(&view);
	zbx_hashset_destroy(&rule_infos);

	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
}
578
579 /******************************************************************************
580 * *
581 * Function: lld_manager_thread *
582 * *
583 * Purpose: main processing loop *
584 * *
585 ******************************************************************************/
ZBX_THREAD_ENTRY(lld_manager_thread,args)586 ZBX_THREAD_ENTRY(lld_manager_thread, args)
587 {
588 #define STAT_INTERVAL 5 /* if a process is busy and does not sleep then update status not faster than */
589 /* once in STAT_INTERVAL seconds */
590
591 zbx_ipc_service_t lld_service;
592 char *error = NULL;
593 zbx_ipc_client_t *client;
594 zbx_ipc_message_t *message;
595 double time_stat, time_now, sec, time_idle = 0;
596 zbx_lld_manager_t manager;
597 zbx_uint64_t processed_num = 0;
598 int ret;
599
600 process_type = ((zbx_thread_args_t *)args)->process_type;
601 server_num = ((zbx_thread_args_t *)args)->server_num;
602 process_num = ((zbx_thread_args_t *)args)->process_num;
603
604 zbx_setproctitle("%s #%d starting", get_process_type_string(process_type), process_num);
605
606 zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(program_type),
607 server_num, get_process_type_string(process_type), process_num);
608
609 if (FAIL == zbx_ipc_service_start(&lld_service, ZBX_IPC_SERVICE_LLD, &error))
610 {
611 zabbix_log(LOG_LEVEL_CRIT, "cannot start LLD manager service: %s", error);
612 zbx_free(error);
613 exit(EXIT_FAILURE);
614 }
615
616 lld_manager_init(&manager);
617
618 /* initialize statistics */
619 time_stat = zbx_time();
620
621 zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num);
622
623 update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
624
625 while (ZBX_IS_RUNNING())
626 {
627 time_now = zbx_time();
628
629 if (STAT_INTERVAL < time_now - time_stat)
630 {
631 zbx_setproctitle("%s #%d [processed " ZBX_FS_UI64 " LLD rules, idle " ZBX_FS_DBL
632 "sec during " ZBX_FS_DBL " sec]",
633 get_process_type_string(process_type), process_num, processed_num,
634 time_idle, time_now - time_stat);
635
636 time_stat = time_now;
637 time_idle = 0;
638 processed_num = 0;
639 }
640
641 update_selfmon_counter(ZBX_PROCESS_STATE_IDLE);
642 ret = zbx_ipc_service_recv(&lld_service, 1, &client, &message);
643 update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
644
645 sec = zbx_time();
646 zbx_update_env(sec);
647
648 if (ZBX_IPC_RECV_IMMEDIATE != ret)
649 time_idle += sec - time_now;
650
651 if (NULL != message)
652 {
653 switch (message->code)
654 {
655 case ZBX_IPC_LLD_REGISTER:
656 lld_register_worker(&manager, client, message);
657 break;
658 case ZBX_IPC_LLD_REQUEST:
659 lld_queue_request(&manager, message);
660 lld_process_queue(&manager);
661 break;
662 case ZBX_IPC_LLD_DONE:
663 lld_process_result(&manager, client);
664 processed_num++;
665 manager.queued_num--;
666 break;
667 case ZBX_IPC_LLD_QUEUE:
668 zbx_ipc_client_send(client, message->code, (unsigned char *)&manager.queued_num,
669 sizeof(zbx_uint64_t));
670 break;
671 case ZBX_IPC_LLD_DIAG_STATS:
672 lld_process_diag_stats(&manager, client);
673 break;
674 case ZBX_IPC_LLD_TOP_ITEMS:
675 lld_process_top_items(&manager, client, message);
676 break;
677 }
678
679 zbx_ipc_message_free(message);
680 }
681
682 if (NULL != client)
683 zbx_ipc_client_release(client);
684 }
685
686 zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num);
687
688 while (1)
689 zbx_sleep(SEC_PER_MIN);
690
691 zbx_ipc_service_close(&lld_service);
692 lld_manager_destroy(&manager);
693 }
694