1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 **/
19
20 #include "common.h"
21
22 #include "dbcache.h"
23 #include "daemon.h"
24 #include "zbxself.h"
25 #include "log.h"
26 #include "zbxserver.h"
27 #include "sysinfo.h"
28 #include "zbxserialize.h"
29 #include "zbxipcservice.h"
30 #include "zbxlld.h"
31
32 #include "preprocessing.h"
33 #include "preproc_manager.h"
34 #include "zbxalgo.h"
35 #include "preproc_history.h"
36
37 extern unsigned char process_type, program_type;
38 extern int server_num, process_num, CONFIG_PREPROCESSOR_FORKS;
39
40 #define ZBX_PREPROCESSING_MANAGER_DELAY 1
41
42 #define ZBX_PREPROC_PRIORITY_NONE 0
43 #define ZBX_PREPROC_PRIORITY_FIRST 1
44
45 typedef enum
46 {
47 REQUEST_STATE_QUEUED = 0, /* requires preprocessing */
48 REQUEST_STATE_PROCESSING = 1, /* is being preprocessed */
49 REQUEST_STATE_DONE = 2, /* value is set, waiting for flush */
50 REQUEST_STATE_PENDING = 3 /* value requires preprocessing, */
51 /* but is waiting on other request to complete */
52 }
53 zbx_preprocessing_states_t;
54
55 /* preprocessing request */
56 typedef struct preprocessing_request
57 {
58 zbx_preprocessing_states_t state; /* request state */
59 struct preprocessing_request *pending; /* the request waiting on this request to complete */
60 zbx_preproc_item_value_t value; /* unpacked item value */
61 zbx_preproc_op_t *steps; /* preprocessing steps */
62 int steps_num; /* number of preprocessing steps */
63 unsigned char value_type; /* value type from configuration */
64 /* at the beginning of preprocessing queue */
65 }
66 zbx_preprocessing_request_t;
67
68 /* preprocessing worker data */
69 typedef struct
70 {
71 zbx_ipc_client_t *client; /* the connected preprocessing worker client */
72 void *task; /* the current task data */
73 }
74 zbx_preprocessing_worker_t;
75
76 /* item link index */
77 typedef struct
78 {
79 zbx_uint64_t itemid; /* item id */
80 zbx_list_item_t *queue_item; /* queued item */
81 }
82 zbx_item_link_t;
83
84 /* direct request to be forwarded to worker, bypassing the preprocessing queue */
85 typedef struct
86 {
87 zbx_ipc_client_t *client; /* the IPC client sending forward message to worker */
88 zbx_ipc_message_t message;
89 }
90 zbx_preprocessing_direct_request_t;
91
92 /* preprocessing manager data */
93 typedef struct
94 {
95 zbx_preprocessing_worker_t *workers; /* preprocessing worker array */
96 int worker_count; /* preprocessing worker count */
97 zbx_list_t queue; /* queue of item values */
98 zbx_hashset_t item_config; /* item configuration L2 cache */
99 zbx_hashset_t history_cache; /* item value history cache */
100 zbx_hashset_t linked_items; /* linked items placed in queue */
101 int cache_ts; /* cache timestamp */
102 zbx_uint64_t processed_num; /* processed value counter */
103 zbx_uint64_t queued_num; /* queued value counter */
104 zbx_uint64_t preproc_num; /* queued values with preprocessing steps */
105 zbx_list_iterator_t priority_tail; /* iterator to the last queued priority item */
106
107 zbx_list_t direct_queue; /* Queue of external requests that have to be */
108 /* forwarded to workers for preprocessing. */
109 }
110 zbx_preprocessing_manager_t;
111
112 static void preprocessor_enqueue_dependent(zbx_preprocessing_manager_t *manager,
113 zbx_preproc_item_value_t *source_value, zbx_list_item_t *master);
114
115 /* cleanup functions */
116
preproc_item_clear(zbx_preproc_item_t * item)117 static void preproc_item_clear(zbx_preproc_item_t *item)
118 {
119 int i;
120
121 zbx_free(item->dep_itemids);
122
123 for (i = 0; i < item->preproc_ops_num; i++)
124 {
125 zbx_free(item->preproc_ops[i].params);
126 zbx_free(item->preproc_ops[i].error_handler_params);
127 }
128
129 zbx_free(item->preproc_ops);
130 }
131
request_free_steps(zbx_preprocessing_request_t * request)132 static void request_free_steps(zbx_preprocessing_request_t *request)
133 {
134 while (0 < request->steps_num)
135 {
136 request->steps_num--;
137 zbx_free(request->steps[request->steps_num].params);
138 zbx_free(request->steps[request->steps_num].error_handler_params);
139 }
140
141 zbx_free(request->steps);
142 }
143
144 /******************************************************************************
145 * *
146 * Function: preprocessor_sync_configuration *
147 * *
148 * Purpose: synchronize preprocessing manager with configuration cache data *
149 * *
150 * Parameters: manager - [IN] the manager to be synchronized *
151 * *
152 ******************************************************************************/
preprocessor_sync_configuration(zbx_preprocessing_manager_t * manager)153 static void preprocessor_sync_configuration(zbx_preprocessing_manager_t *manager)
154 {
155 zbx_hashset_iter_t iter;
156 int ts;
157 zbx_preproc_history_t *vault;
158 zbx_preproc_item_t *item;
159
160 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
161
162 ts = manager->cache_ts;
163 DCconfig_get_preprocessable_items(&manager->item_config, &manager->cache_ts);
164
165 if (ts != manager->cache_ts)
166 {
167 /* drop items with removed preprocessing steps from preprocessing history cache */
168 zbx_hashset_iter_reset(&manager->history_cache, &iter);
169 while (NULL != (vault = (zbx_preproc_history_t *)zbx_hashset_iter_next(&iter)))
170 {
171 if (NULL != zbx_hashset_search(&manager->item_config, &vault->itemid))
172 continue;
173
174 zbx_vector_ptr_clear_ext(&vault->history, (zbx_clean_func_t)zbx_preproc_op_history_free);
175 zbx_vector_ptr_destroy(&vault->history);
176 zbx_hashset_iter_remove(&iter);
177 }
178
179 /* reset preprocessing history for an item if its preprocessing step was modified */
180 zbx_hashset_iter_reset(&manager->item_config, &iter);
181 while (NULL != (item = (zbx_preproc_item_t *)zbx_hashset_iter_next(&iter)))
182 {
183 if (ts >= item->update_time)
184 continue;
185
186 if (NULL == (vault = (zbx_preproc_history_t *)zbx_hashset_search(&manager->history_cache,
187 &item->itemid)))
188 {
189 continue;
190 }
191
192 zbx_vector_ptr_clear_ext(&vault->history, (zbx_clean_func_t)zbx_preproc_op_history_free);
193 zbx_vector_ptr_destroy(&vault->history);
194 zbx_hashset_remove_direct(&manager->history_cache, vault);
195 }
196 }
197
198 zabbix_log(LOG_LEVEL_DEBUG, "End of %s() item config size: %d, history cache size: %d", __func__,
199 manager->item_config.num_data, manager->history_cache.num_data);
200 }
201
202 /******************************************************************************
203 * *
204 * Function: preprocessor_create_task *
205 * *
206 * Purpose: create preprocessing task for request *
207 * *
208 * Parameters: manager - [IN] preprocessing manager *
209 * request - [IN] preprocessing request *
210 * task - [OUT] preprocessing task data *
211 * *
212 ******************************************************************************/
preprocessor_create_task(zbx_preprocessing_manager_t * manager,zbx_preprocessing_request_t * request,unsigned char ** task)213 static zbx_uint32_t preprocessor_create_task(zbx_preprocessing_manager_t *manager,
214 zbx_preprocessing_request_t *request, unsigned char **task)
215 {
216 zbx_variant_t value;
217 zbx_preproc_history_t *vault;
218 zbx_vector_ptr_t *phistory;
219
220 if (ISSET_LOG(request->value.result_ptr->result))
221 zbx_variant_set_str(&value, request->value.result_ptr->result->log->value);
222 else if (ISSET_UI64(request->value.result_ptr->result))
223 zbx_variant_set_ui64(&value, request->value.result_ptr->result->ui64);
224 else if (ISSET_DBL(request->value.result_ptr->result))
225 zbx_variant_set_dbl(&value, request->value.result_ptr->result->dbl);
226 else if (ISSET_STR(request->value.result_ptr->result))
227 zbx_variant_set_str(&value, request->value.result_ptr->result->str);
228 else if (ISSET_TEXT(request->value.result_ptr->result))
229 zbx_variant_set_str(&value, request->value.result_ptr->result->text);
230 else
231 THIS_SHOULD_NEVER_HAPPEN;
232
233 if (NULL != (vault = (zbx_preproc_history_t *)zbx_hashset_search(&manager->history_cache,
234 &request->value.itemid)))
235 {
236 phistory = &vault->history;
237 }
238 else
239 phistory = NULL;
240
241
242 return zbx_preprocessor_pack_task(task, request->value.itemid, request->value_type, request->value.ts, &value,
243 phistory, request->steps, request->steps_num);
244 }
245
246 /******************************************************************************
247 * *
248 * Function: preprocessor_set_request_state_done *
249 * *
250 * Purpose: set request state to done and handle linked items *
251 * *
252 * Parameters: manager - [IN] preprocessing manager *
253 * request - [IN] preprocessing request *
254 * queue_item - [IN] queued item *
255 * *
256 ******************************************************************************/
preprocessor_set_request_state_done(zbx_preprocessing_manager_t * manager,zbx_preprocessing_request_t * request,const zbx_list_item_t * queue_item)257 static void preprocessor_set_request_state_done(zbx_preprocessing_manager_t *manager,
258 zbx_preprocessing_request_t *request, const zbx_list_item_t *queue_item)
259 {
260 zbx_item_link_t *index;
261
262 request->state = REQUEST_STATE_DONE;
263
264 /* value processed - the pending value can now be processed */
265 if (NULL != request->pending)
266 request->pending->state = REQUEST_STATE_QUEUED;
267
268 if (NULL != (index = (zbx_item_link_t *)zbx_hashset_search(&manager->linked_items, &request->value.itemid)) &&
269 queue_item == index->queue_item)
270 {
271 zbx_hashset_remove_direct(&manager->linked_items, index);
272 }
273 }
274
275 /******************************************************************************
276 * *
277 * Function: preprocessor_get_next_task *
278 * *
279 * Purpose: gets next task to be sent to worker *
280 * *
281 * Parameters: manager - [IN] preprocessing manager *
282 * message - [OUT] the serialized task to be sent *
283 * *
284 * Return value: pointer to the task object *
285 * *
286 ******************************************************************************/
preprocessor_get_next_task(zbx_preprocessing_manager_t * manager,zbx_ipc_message_t * message)287 static void *preprocessor_get_next_task(zbx_preprocessing_manager_t *manager, zbx_ipc_message_t *message)
288 {
289 zbx_list_iterator_t iterator;
290 zbx_preprocessing_request_t *request = NULL;
291 void *task = NULL;
292 zbx_preprocessing_direct_request_t *direct_request;
293
294 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
295
296 if (SUCCEED == zbx_list_pop(&manager->direct_queue, (void **)&direct_request))
297 {
298 *message = direct_request->message;
299 zbx_ipc_message_init(&direct_request->message);
300 task = direct_request;
301 goto out;
302 }
303
304 zbx_list_iterator_init(&manager->queue, &iterator);
305 while (SUCCEED == zbx_list_iterator_next(&iterator))
306 {
307 zbx_list_iterator_peek(&iterator, (void **)&request);
308
309 if (REQUEST_STATE_QUEUED != request->state)
310 continue;
311
312 if (ITEM_STATE_NOTSUPPORTED == request->value.state)
313 {
314 zbx_preproc_history_t *vault;
315
316 if (NULL != (vault = (zbx_preproc_history_t *) zbx_hashset_search(&manager->history_cache,
317 &request->value.itemid)))
318 {
319 zbx_vector_ptr_clear_ext(&vault->history,
320 (zbx_clean_func_t) zbx_preproc_op_history_free);
321 zbx_vector_ptr_destroy(&vault->history);
322 zbx_hashset_remove_direct(&manager->history_cache, vault);
323 }
324
325 preprocessor_set_request_state_done(manager, request, iterator.current);
326 continue;
327 }
328
329 task = iterator.current;
330 request->state = REQUEST_STATE_PROCESSING;
331 message->code = ZBX_IPC_PREPROCESSOR_REQUEST;
332 message->size = preprocessor_create_task(manager, request, &message->data);
333 request_free_steps(request);
334 break;
335 }
336 out:
337 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
338
339 return task;
340 }
341
preproc_item_result_free(zbx_preproc_item_value_t * value)342 static void preproc_item_result_free(zbx_preproc_item_value_t *value)
343 {
344 if (0 == --(value->result_ptr->refcount))
345 {
346 if (NULL != value->result_ptr->result)
347 {
348 free_result(value->result_ptr->result);
349 zbx_free(value->result_ptr->result);
350 }
351 zbx_free(value->result_ptr);
352 }
353 else
354 value->result_ptr = NULL;
355 }
356
357 /******************************************************************************
358 * *
359 * Function: preprocessor_get_worker_by_client *
360 * *
361 * Purpose: get worker data by IPC client *
362 * *
363 * Parameters: manager - [IN] preprocessing manager *
364 * client - [IN] IPC client *
365 * *
366 * Return value: pointer to the worker data *
367 * *
368 ******************************************************************************/
preprocessor_get_worker_by_client(zbx_preprocessing_manager_t * manager,zbx_ipc_client_t * client)369 static zbx_preprocessing_worker_t *preprocessor_get_worker_by_client(zbx_preprocessing_manager_t *manager,
370 zbx_ipc_client_t *client)
371 {
372 int i;
373 zbx_preprocessing_worker_t *worker = NULL;
374
375 for (i = 0; i < manager->worker_count; i++)
376 {
377 if (client == manager->workers[i].client)
378 {
379 worker = &manager->workers[i];
380 break;
381 }
382 }
383
384 if (NULL == worker)
385 {
386 THIS_SHOULD_NEVER_HAPPEN;
387 exit(EXIT_FAILURE);
388 }
389
390 return worker;
391 }
392
393 /******************************************************************************
394 * *
395 * Function: preprocessor_get_free_worker *
396 * *
397 * Purpose: get worker without active preprocessing task *
398 * *
399 * Parameters: manager - [IN] preprocessing manager *
400 * *
401 * Return value: pointer to the worker data or NULL if none *
402 * *
403 ******************************************************************************/
preprocessor_get_free_worker(zbx_preprocessing_manager_t * manager)404 static zbx_preprocessing_worker_t *preprocessor_get_free_worker(zbx_preprocessing_manager_t *manager)
405 {
406 int i;
407
408 for (i = 0; i < manager->worker_count; i++)
409 {
410 if (NULL == manager->workers[i].task)
411 return &manager->workers[i];
412 }
413
414 return NULL;
415 }
416
417 /******************************************************************************
418 * *
419 * Function: preprocessor_assign_tasks *
420 * *
421 * Purpose: assign available queued preprocessing tasks to free workers *
422 * *
423 * Parameters: manager - [IN] preprocessing manager *
424 * *
425 ******************************************************************************/
preprocessor_assign_tasks(zbx_preprocessing_manager_t * manager)426 static void preprocessor_assign_tasks(zbx_preprocessing_manager_t *manager)
427 {
428 zbx_preprocessing_worker_t *worker;
429 void *data;
430 zbx_ipc_message_t message;
431
432 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
433
434 while (NULL != (worker = preprocessor_get_free_worker(manager)) &&
435 NULL != (data = preprocessor_get_next_task(manager, &message)))
436 {
437 if (FAIL == zbx_ipc_client_send(worker->client, message.code, message.data, message.size))
438 {
439 zabbix_log(LOG_LEVEL_CRIT, "cannot send data to preprocessing worker");
440 exit(EXIT_FAILURE);
441 }
442
443 worker->task = data;
444 zbx_ipc_message_clean(&message);
445 }
446
447 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
448 }
449
450 /******************************************************************************
451 * *
452 * Function: preproc_item_value_clear *
453 * *
454 * Purpose: frees resources allocated by preprocessor item value *
455 * *
456 * Parameters: value - [IN] value to be freed *
457 * *
458 ******************************************************************************/
preproc_item_value_clear(zbx_preproc_item_value_t * value)459 static void preproc_item_value_clear(zbx_preproc_item_value_t *value)
460 {
461 zbx_free(value->error);
462 preproc_item_result_free(value);
463 zbx_free(value->ts);
464 }
465
466 /******************************************************************************
467 * *
468 * Function: preprocessor_free_request *
469 * *
470 * Purpose: free preprocessing request *
471 * *
472 * Parameters: request - [IN] request data to be freed *
473 * *
474 ******************************************************************************/
preprocessor_free_request(zbx_preprocessing_request_t * request)475 static void preprocessor_free_request(zbx_preprocessing_request_t *request)
476 {
477 preproc_item_value_clear(&request->value);
478 request_free_steps(request);
479 zbx_free(request);
480 }
481
482 /******************************************************************************
483 * *
484 * Function: preprocessor_free_direct_request *
485 * *
486 * Purpose: free preprocessing direct request *
487 * *
488 * Parameters: forward - [IN] forward data to be freed *
489 * *
490 ******************************************************************************/
preprocessor_free_direct_request(zbx_preprocessing_direct_request_t * direct_request)491 static void preprocessor_free_direct_request(zbx_preprocessing_direct_request_t *direct_request)
492 {
493 zbx_ipc_client_release(direct_request->client);
494 zbx_ipc_message_clean(&direct_request->message);
495 zbx_free(direct_request);
496 }
497
498 /******************************************************************************
499 * *
500 * Function: preprocessor_flush_value *
501 * *
502 * Purpose: add new value to the local history cache or send to LLD manager *
503 * *
504 * Parameters: value - [IN] value to be added or sent *
505 * *
506 ******************************************************************************/
preprocessor_flush_value(const zbx_preproc_item_value_t * value)507 static void preprocessor_flush_value(const zbx_preproc_item_value_t *value)
508 {
509 if (0 == (value->item_flags & ZBX_FLAG_DISCOVERY_RULE) || 0 == (program_type & ZBX_PROGRAM_TYPE_SERVER))
510 {
511 dc_add_history(value->itemid, value->item_value_type, value->item_flags, value->result_ptr->result,
512 value->ts, value->state, value->error);
513 }
514 else
515 zbx_lld_process_agent_result(value->itemid, value->hostid, value->result_ptr->result, value->ts,
516 value->error);
517 }
518
519 /******************************************************************************
520 * *
521 * Function: preprocessing_flush_queue *
522 * *
523 * Purpose: add all sequential processed values from beginning of the queue *
524 * to the local history cache *
525 * *
526 * Parameters: manager - [IN] preprocessing manager *
527 * *
528 ******************************************************************************/
preprocessing_flush_queue(zbx_preprocessing_manager_t * manager)529 static void preprocessing_flush_queue(zbx_preprocessing_manager_t *manager)
530 {
531 zbx_preprocessing_request_t *request;
532 zbx_list_iterator_t iterator;
533
534 zbx_list_iterator_init(&manager->queue, &iterator);
535 while (SUCCEED == zbx_list_iterator_next(&iterator))
536 {
537 zbx_list_iterator_peek(&iterator, (void **)&request);
538
539 if (REQUEST_STATE_DONE != request->state)
540 break;
541
542 preprocessor_flush_value(&request->value);
543 preprocessor_free_request(request);
544
545 if (SUCCEED == zbx_list_iterator_equal(&iterator, &manager->priority_tail))
546 zbx_list_iterator_clear(&manager->priority_tail);
547
548 zbx_list_pop(&manager->queue, NULL);
549
550 manager->processed_num++;
551 manager->queued_num--;
552 }
553 }
554
555 /******************************************************************************
556 * *
557 * Function: preprocessor_link_items *
558 * *
559 * Purpose: create relation between item values within value queue *
560 * *
561 * Parameters: manager - [IN] preprocessing manager *
562 * enqueued_at - [IN] position in value queue *
563 * item - [IN] item configuration data *
564 * *
565 ******************************************************************************/
preprocessor_link_items(zbx_preprocessing_manager_t * manager,zbx_list_item_t * enqueued_at,zbx_preproc_item_t * item)566 static void preprocessor_link_items(zbx_preprocessing_manager_t *manager, zbx_list_item_t *enqueued_at,
567 zbx_preproc_item_t *item)
568 {
569 int i;
570 zbx_preprocessing_request_t *request, *dep_request;
571 zbx_item_link_t *index, index_local;
572 zbx_preproc_op_t *op;
573
574 for (i = 0; i < item->preproc_ops_num; i++)
575 {
576 op = &item->preproc_ops[i];
577
578 if (ZBX_PREPROC_DELTA_VALUE == op->type || ZBX_PREPROC_DELTA_SPEED == op->type)
579 break;
580
581 if (ZBX_PREPROC_THROTTLE_VALUE == op->type || ZBX_PREPROC_THROTTLE_TIMED_VALUE == op->type)
582 break;
583 }
584
585 if (i != item->preproc_ops_num)
586 {
587 /* existing linked item*/
588 if (NULL != (index = (zbx_item_link_t *)zbx_hashset_search(&manager->linked_items, &item->itemid)))
589 {
590 dep_request = (zbx_preprocessing_request_t *)(enqueued_at->data);
591 request = (zbx_preprocessing_request_t *)(index->queue_item->data);
592
593 if (REQUEST_STATE_DONE != request->state)
594 {
595 request->pending = dep_request;
596 dep_request->state = REQUEST_STATE_PENDING;
597 }
598
599 index->queue_item = enqueued_at;
600 }
601 else
602 {
603 index_local.itemid = item->itemid;
604 index_local.queue_item = enqueued_at;
605
606 zbx_hashset_insert(&manager->linked_items, &index_local, sizeof(zbx_item_link_t));
607 }
608 }
609 }
610
611 /******************************************************************************
612 * *
613 * Function: preprocessor_copy_value *
614 * *
615 * Purpose: create a copy of existing item value *
616 * *
617 * Parameters: target - [OUT] created copy *
618 * source - [IN] value to be copied *
619 * *
620 ******************************************************************************/
preprocessor_copy_value(zbx_preproc_item_value_t * target,zbx_preproc_item_value_t * source)621 static void preprocessor_copy_value(zbx_preproc_item_value_t *target, zbx_preproc_item_value_t *source)
622 {
623 memcpy(target, source, sizeof(zbx_preproc_item_value_t));
624
625 if (NULL != source->error)
626 target->error = zbx_strdup(NULL, source->error);
627
628 if (NULL != source->ts)
629 {
630 target->ts = (zbx_timespec_t *)zbx_malloc(NULL, sizeof(zbx_timespec_t));
631 memcpy(target->ts, source->ts, sizeof(zbx_timespec_t));
632 }
633 }
634
635 /******************************************************************************
636 * *
637 * Function: preprocessor_enqueue *
638 * *
639 * Purpose: enqueue preprocessing request *
640 * *
641 * Parameters: manage - [IN] preprocessing manager *
642 * value - [IN] item value *
643 * master - [IN] request should be enqueued after this item *
644 * (NULL for the end of the queue) *
645 * *
646 ******************************************************************************/
preprocessor_enqueue(zbx_preprocessing_manager_t * manager,zbx_preproc_item_value_t * value,zbx_list_item_t * master)647 static void preprocessor_enqueue(zbx_preprocessing_manager_t *manager, zbx_preproc_item_value_t *value,
648 zbx_list_item_t *master)
649 {
650 zbx_preprocessing_request_t *request;
651 zbx_preproc_item_t *item, item_local;
652 zbx_list_item_t *enqueued_at;
653 int i;
654 zbx_preprocessing_states_t state;
655 unsigned char priority = ZBX_PREPROC_PRIORITY_NONE;
656
657 zabbix_log(LOG_LEVEL_DEBUG, "In %s() itemid: " ZBX_FS_UI64, __func__, value->itemid);
658
659 item_local.itemid = value->itemid;
660 item = (zbx_preproc_item_t *)zbx_hashset_search(&manager->item_config, &item_local);
661
662 /* override priority based on item type */
663 if (NULL != item && ITEM_TYPE_INTERNAL == item->type)
664 priority = ZBX_PREPROC_PRIORITY_FIRST;
665
666 if (NULL == item || 0 == item->preproc_ops_num || (ITEM_STATE_NOTSUPPORTED != value->state &&
667 (NULL == value->result_ptr->result || 0 == ISSET_VALUE(value->result_ptr->result))))
668 {
669 state = REQUEST_STATE_DONE;
670
671 if (NULL == manager->queue.head)
672 {
673 /* queue is empty and item is done, it can be flushed */
674 preprocessor_flush_value(value);
675 manager->processed_num++;
676 preprocessor_enqueue_dependent(manager, value, NULL);
677 preproc_item_value_clear(value);
678
679 goto out;
680 }
681 }
682 else
683 state = REQUEST_STATE_QUEUED;
684
685 request = (zbx_preprocessing_request_t *)zbx_malloc(NULL, sizeof(zbx_preprocessing_request_t));
686 memset(request, 0, sizeof(zbx_preprocessing_request_t));
687 memcpy(&request->value, value, sizeof(zbx_preproc_item_value_t));
688 request->state = state;
689
690 if (REQUEST_STATE_QUEUED == state && ITEM_STATE_NOTSUPPORTED != value->state)
691 {
692 request->value_type = item->value_type;
693 request->steps = (zbx_preproc_op_t *)zbx_malloc(NULL, sizeof(zbx_preproc_op_t) * item->preproc_ops_num);
694 request->steps_num = item->preproc_ops_num;
695
696 for (i = 0; i < item->preproc_ops_num; i++)
697 {
698 request->steps[i].type = item->preproc_ops[i].type;
699 request->steps[i].params = zbx_strdup(NULL, item->preproc_ops[i].params);
700 request->steps[i].error_handler = item->preproc_ops[i].error_handler;
701 request->steps[i].error_handler_params = zbx_strdup(NULL,
702 item->preproc_ops[i].error_handler_params);
703 }
704
705 manager->preproc_num++;
706 }
707
708 /* priority items are enqueued at the beginning of the line */
709 if (NULL == master && ZBX_PREPROC_PRIORITY_FIRST == priority)
710 {
711 if (SUCCEED == zbx_list_iterator_isset(&manager->priority_tail))
712 {
713 /* insert after the last internal item */
714 zbx_list_insert_after(&manager->queue, manager->priority_tail.current, request, &enqueued_at);
715 zbx_list_iterator_update(&manager->priority_tail);
716 }
717 else
718 {
719 /* no internal items in queue, insert at the beginning */
720 zbx_list_prepend(&manager->queue, request, &enqueued_at);
721 zbx_list_iterator_init(&manager->queue, &manager->priority_tail);
722 }
723
724 zbx_list_iterator_next(&manager->priority_tail);
725 }
726 else
727 {
728 zbx_list_insert_after(&manager->queue, master, request, &enqueued_at);
729 zbx_list_iterator_update(&manager->priority_tail);
730
731 /* move internal item tail position if we are inserting after last internal item */
732 if (NULL != master && master == manager->priority_tail.current)
733 zbx_list_iterator_next(&manager->priority_tail);
734 }
735
736 if (REQUEST_STATE_QUEUED == request->state)
737 preprocessor_link_items(manager, enqueued_at, item);
738
739 /* if no preprocessing is needed, dependent items are enqueued */
740 if (REQUEST_STATE_DONE == request->state)
741 preprocessor_enqueue_dependent(manager, value, enqueued_at);
742
743 manager->queued_num++;
744 out:
745 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
746 }
747
748 /******************************************************************************
749 * *
750 * Function: preprocessor_enqueue_dependent *
751 * *
752 * Purpose: enqueue dependent items (if any) *
753 * *
754 * Parameters: manager - [IN] preprocessing manager *
755 * source_value - [IN] master item value *
756 * master - [IN] dependent item should be enqueued after *
757 * this item *
758 * *
759 ******************************************************************************/
preprocessor_enqueue_dependent(zbx_preprocessing_manager_t * manager,zbx_preproc_item_value_t * source_value,zbx_list_item_t * master)760 static void preprocessor_enqueue_dependent(zbx_preprocessing_manager_t *manager,
761 zbx_preproc_item_value_t *source_value, zbx_list_item_t *master)
762 {
763 int i;
764 zbx_preproc_item_t *item, item_local;
765 zbx_preproc_item_value_t value;
766
767 zabbix_log(LOG_LEVEL_DEBUG, "In %s() itemid: " ZBX_FS_UI64, __func__, source_value->itemid);
768
769 if (NULL != source_value->result_ptr->result && ISSET_VALUE(source_value->result_ptr->result))
770 {
771 item_local.itemid = source_value->itemid;
772 if (NULL != (item = (zbx_preproc_item_t *)zbx_hashset_search(&manager->item_config, &item_local)) &&
773 0 != item->dep_itemids_num)
774 {
775 /* result is shared between all dependent items, new result will be created after preprocessing */
776 source_value->result_ptr->refcount += item->dep_itemids_num;
777
778 for (i = item->dep_itemids_num - 1; i >= 0; i--)
779 {
780 preprocessor_copy_value(&value, source_value);
781 value.itemid = item->dep_itemids[i].first;
782 value.item_flags = item->dep_itemids[i].second;
783 preprocessor_enqueue(manager, &value, master);
784 }
785
786 preprocessor_assign_tasks(manager);
787 preprocessing_flush_queue(manager);
788 }
789 }
790
791 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
792 }
793
794 /******************************************************************************
795 * *
796 * Function: preprocessor_add_request *
797 * *
798 * Purpose: handle new preprocessing request *
799 * *
800 * Parameters: manager - [IN] preprocessing manager *
801 * message - [IN] packed preprocessing request *
802 * *
803 ******************************************************************************/
preprocessor_add_request(zbx_preprocessing_manager_t * manager,zbx_ipc_message_t * message)804 static void preprocessor_add_request(zbx_preprocessing_manager_t *manager, zbx_ipc_message_t *message)
805 {
806 zbx_uint32_t offset = 0;
807 zbx_preproc_item_value_t value;
808
809 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
810
811 preprocessor_sync_configuration(manager);
812
813 while (offset < message->size)
814 {
815 offset += zbx_preprocessor_unpack_value(&value, message->data + offset);
816 preprocessor_enqueue(manager, &value, NULL);
817 }
818
819 preprocessor_assign_tasks(manager);
820 preprocessing_flush_queue(manager);
821
822 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
823 }
824
825 /******************************************************************************
826 * *
827 * Function: preprocessor_add_test_request *
828 * *
829 * Purpose: handle new preprocessing test request *
830 * *
831 * Parameters: manager - [IN] preprocessing manager *
832 * message - [IN] packed preprocessing request *
833 * *
834 ******************************************************************************/
preprocessor_add_test_request(zbx_preprocessing_manager_t * manager,zbx_ipc_client_t * client,zbx_ipc_message_t * message)835 static void preprocessor_add_test_request(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
836 zbx_ipc_message_t *message)
837 {
838 zbx_preprocessing_direct_request_t *direct_request;
839
840 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
841
842 zbx_ipc_client_addref(client);
843 direct_request = zbx_malloc(NULL, sizeof(zbx_preprocessing_direct_request_t));
844 direct_request->client = client;
845 zbx_ipc_message_copy(&direct_request->message, message);
846 zbx_list_append(&manager->direct_queue, direct_request, NULL);
847
848 preprocessor_assign_tasks(manager);
849 preprocessing_flush_queue(manager);
850
851 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
852 }
853
854 /******************************************************************************
855 * *
856 * Function: create_result_with_meta *
857 * *
858 * Purpose: create new result and copy meta information from previous result *
859 * *
860 * Parameters: result_old - [IN] result that can contain meta information *
861 * *
862 * Return value: pointer newly allocated result *
863 * *
864 ******************************************************************************/
create_result_with_meta(const AGENT_RESULT * result_old)865 static AGENT_RESULT *create_result_with_meta(const AGENT_RESULT *result_old)
866 {
867 AGENT_RESULT *result;
868
869 result = zbx_malloc(NULL, sizeof(AGENT_RESULT));
870
871 init_result(result);
872
873 if (0 == ISSET_META(result_old))
874 return result;
875
876 result->type = AR_META;
877 result->lastlogsize = result_old->lastlogsize;
878 result->mtime = result_old->mtime;
879
880 return result;
881 }
882
883 /******************************************************************************
884 * *
885 * Function: preprocessor_set_variant_result *
886 * *
887 * Purpose: get result data from variant and error message *
888 * *
889 * Parameters: request - [IN/OUT] preprocessing request *
890 * value - [IN] variant value *
891 * error - [IN] error message (if any) *
892 * *
893 ******************************************************************************/
preprocessor_set_variant_result(zbx_preprocessing_request_t * request,zbx_variant_t * value,char * error)894 static int preprocessor_set_variant_result(zbx_preprocessing_request_t *request,
895 zbx_variant_t *value, char *error)
896 {
897 int type, ret = FAIL;
898 zbx_log_t *log;
899
900 if (NULL != error)
901 {
902 /* on error item state is set to ITEM_STATE_NOTSUPPORTED */
903 request->value.state = ITEM_STATE_NOTSUPPORTED;
904 request->value.error = error;
905 ret = FAIL;
906
907 goto out;
908 }
909
910 if (ZBX_VARIANT_NONE == value->type)
911 {
912 AGENT_RESULT *result;
913
914 result = create_result_with_meta(request->value.result_ptr->result);
915
916 preproc_item_result_free(&request->value);
917 request->value.result_ptr = (zbx_result_ptr_t *)zbx_malloc(NULL, sizeof(zbx_result_ptr_t));
918 request->value.result_ptr->refcount = 1;
919 request->value.result_ptr->result = result;
920
921 ret = FAIL;
922
923 goto out;
924 }
925
926 switch (request->value_type)
927 {
928 case ITEM_VALUE_TYPE_FLOAT:
929 type = ZBX_VARIANT_DBL;
930 break;
931 case ITEM_VALUE_TYPE_UINT64:
932 type = ZBX_VARIANT_UI64;
933 break;
934 default:
935 /* ITEM_VALUE_TYPE_STR, ITEM_VALUE_TYPE_TEXT, ITEM_VALUE_TYPE_LOG */
936 type = ZBX_VARIANT_STR;
937 }
938
939 if (FAIL != (ret = zbx_variant_convert(value, type)))
940 {
941 /* old result is shared between dependent and master items, it cannot be modified, create new result */
942 AGENT_RESULT *result;
943
944 result = create_result_with_meta(request->value.result_ptr->result);
945
946 switch (request->value_type)
947 {
948 case ITEM_VALUE_TYPE_FLOAT:
949 SET_DBL_RESULT(result, value->data.dbl);
950 break;
951 case ITEM_VALUE_TYPE_STR:
952 SET_STR_RESULT(result, value->data.str);
953 break;
954 case ITEM_VALUE_TYPE_LOG:
955 log = (zbx_log_t *)zbx_malloc(NULL, sizeof(zbx_log_t));
956
957 if (ISSET_LOG(request->value.result_ptr->result))
958 {
959 *log = *request->value.result_ptr->result->log;
960 if (NULL != log->source)
961 log->source = zbx_strdup(NULL, log->source);
962 }
963 else
964 memset(log, 0, sizeof(zbx_log_t));
965
966 log->value = value->data.str;
967 SET_LOG_RESULT(result, log);
968 break;
969 case ITEM_VALUE_TYPE_UINT64:
970 SET_UI64_RESULT(result, value->data.ui64);
971 break;
972 case ITEM_VALUE_TYPE_TEXT:
973 SET_TEXT_RESULT(result, value->data.str);
974 break;
975 }
976
977 preproc_item_result_free(&request->value);
978 request->value.result_ptr = (zbx_result_ptr_t *)zbx_malloc(NULL, sizeof(zbx_result_ptr_t));
979 request->value.result_ptr->refcount = 1;
980 request->value.result_ptr->result = result;
981
982 zbx_variant_set_none(value);
983 }
984 else
985 {
986 zbx_free(request->value.error);
987 request->value.error = zbx_dsprintf(NULL, "Value \"%s\" of type \"%s\" is not suitable for"
988 " value type \"%s\"", zbx_variant_value_desc(value), zbx_variant_type_desc(value),
989 zbx_item_value_type_string((zbx_item_value_type_t)request->value_type));
990
991 request->value.state = ITEM_STATE_NOTSUPPORTED;
992 ret = FAIL;
993 }
994
995 out:
996 return ret;
997 }
998
999 /******************************************************************************
1000 * *
1001 * Function: preprocessor_add_result *
1002 * *
1003 * Purpose: handle preprocessing result *
1004 * *
1005 * Parameters: manager - [IN] preprocessing manager *
1006 * client - [IN] IPC client *
1007 * message - [IN] packed preprocessing result *
1008 * *
1009 ******************************************************************************/
preprocessor_add_result(zbx_preprocessing_manager_t * manager,zbx_ipc_client_t * client,zbx_ipc_message_t * message)1010 static void preprocessor_add_result(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
1011 zbx_ipc_message_t *message)
1012 {
1013 zbx_preprocessing_worker_t *worker;
1014 zbx_preprocessing_request_t *request;
1015 zbx_variant_t value;
1016 char *error;
1017 zbx_vector_ptr_t history;
1018 zbx_preproc_history_t *vault;
1019 zbx_list_item_t *node;
1020
1021 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1022
1023 worker = preprocessor_get_worker_by_client(manager, client);
1024 node = (zbx_list_item_t *)worker->task;
1025 request = (zbx_preprocessing_request_t *)node->data;
1026
1027 zbx_vector_ptr_create(&history);
1028 zbx_preprocessor_unpack_result(&value, &history, &error, message->data);
1029
1030 if (NULL != (vault = (zbx_preproc_history_t *)zbx_hashset_search(&manager->history_cache,
1031 &request->value.itemid)))
1032 {
1033 zbx_vector_ptr_clear_ext(&vault->history, (zbx_clean_func_t)zbx_preproc_op_history_free);
1034 }
1035
1036 if (0 != history.values_num)
1037 {
1038 if (NULL == vault)
1039 {
1040 zbx_preproc_history_t history_local;
1041
1042 history_local.itemid = request->value.itemid;
1043 vault = (zbx_preproc_history_t *)zbx_hashset_insert(&manager->history_cache, &history_local,
1044 sizeof(history_local));
1045 zbx_vector_ptr_create(&vault->history);
1046 }
1047
1048 zbx_vector_ptr_append_array(&vault->history, history.values, history.values_num);
1049 zbx_vector_ptr_clear(&history);
1050 }
1051 else
1052 {
1053 if (NULL != vault)
1054 {
1055 zbx_vector_ptr_destroy(&vault->history);
1056 zbx_hashset_remove_direct(&manager->history_cache, vault);
1057 }
1058 }
1059
1060 preprocessor_set_request_state_done(manager, request, worker->task);
1061
1062 if (FAIL != preprocessor_set_variant_result(request, &value, error))
1063 preprocessor_enqueue_dependent(manager, &request->value, worker->task);
1064
1065 worker->task = NULL;
1066 zbx_variant_clear(&value);
1067
1068 manager->preproc_num--;
1069
1070 preprocessor_assign_tasks(manager);
1071 preprocessing_flush_queue(manager);
1072
1073 zbx_vector_ptr_destroy(&history);
1074
1075 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1076 }
1077
1078 /******************************************************************************
1079 * *
1080 * Function: preprocessor_flush_test_result *
1081 * *
1082 * Purpose: handle preprocessing result *
1083 * *
1084 * Parameters: manager - [IN] preprocessing manager *
1085 * client - [IN] IPC client *
1086 * message - [IN] packed preprocessing result *
1087 * *
1088 * Comments: Preprocessing testing results are directly forwarded to source *
1089 * client as they are. *
1090 * *
1091 ******************************************************************************/
preprocessor_flush_test_result(zbx_preprocessing_manager_t * manager,zbx_ipc_client_t * client,zbx_ipc_message_t * message)1092 static void preprocessor_flush_test_result(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
1093 zbx_ipc_message_t *message)
1094 {
1095 zbx_preprocessing_worker_t *worker;
1096 zbx_preprocessing_direct_request_t *direct_request;
1097
1098 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1099
1100 worker = preprocessor_get_worker_by_client(manager, client);
1101 direct_request = (zbx_preprocessing_direct_request_t *)worker->task;
1102
1103 /* forward the response to the client */
1104 if (SUCCEED == zbx_ipc_client_connected(direct_request->client))
1105 zbx_ipc_client_send(direct_request->client, message->code, message->data, message->size);
1106
1107 worker->task = NULL;
1108 preprocessor_free_direct_request(direct_request);
1109
1110 preprocessor_assign_tasks(manager);
1111 preprocessing_flush_queue(manager);
1112
1113 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1114 }
1115
preprocessor_get_items_totals(zbx_preprocessing_manager_t * manager,int * total,int * queued,int * processing,int * done,int * pending)1116 static void preprocessor_get_items_totals(zbx_preprocessing_manager_t *manager, int *total, int *queued,
1117 int *processing, int *done, int *pending)
1118 {
1119 #define ZBX_MAX_REQUEST_STATE_PRINT_LIMIT 25
1120
1121 zbx_preproc_item_stats_t *item;
1122 zbx_list_iterator_t iterator;
1123 zbx_preprocessing_request_t *request;
1124 zbx_hashset_t items;
1125
1126 *total = 0;
1127 *queued = 0;
1128 *processing = 0;
1129 *done = 0;
1130 *pending = 0;
1131
1132 zbx_hashset_create(&items, 1024, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1133
1134 zbx_list_iterator_init(&manager->queue, &iterator);
1135 while (SUCCEED == zbx_list_iterator_next(&iterator))
1136 {
1137 zbx_list_iterator_peek(&iterator, (void **)&request);
1138
1139 if (NULL == (item = zbx_hashset_search(&items, &request->value.itemid)))
1140 {
1141 zbx_preproc_item_stats_t item_local = {.itemid = request->value.itemid};
1142
1143 item = zbx_hashset_insert(&items, &item_local, sizeof(item_local));
1144 }
1145
1146 switch(request->state)
1147 {
1148 case REQUEST_STATE_QUEUED:
1149 if (*queued < ZBX_MAX_REQUEST_STATE_PRINT_LIMIT)
1150 {
1151 zabbix_log(LOG_LEVEL_DEBUG, "oldest queued itemid: " ZBX_FS_UI64
1152 " values:%d pos:%d", item->itemid, item->values_num, *total);
1153 }
1154 (*queued)++;
1155 break;
1156 case REQUEST_STATE_PROCESSING:
1157 if (*processing < ZBX_MAX_REQUEST_STATE_PRINT_LIMIT)
1158 {
1159 zabbix_log(LOG_LEVEL_DEBUG, "oldest processing itemid: " ZBX_FS_UI64
1160 " values:%d pos:%d", item->itemid, item->values_num, *total);
1161 }
1162 (*processing)++;
1163 break;
1164 case REQUEST_STATE_DONE:
1165 if (*done < ZBX_MAX_REQUEST_STATE_PRINT_LIMIT)
1166 {
1167 zabbix_log(LOG_LEVEL_DEBUG, "oldest done itemid: " ZBX_FS_UI64
1168 " values:%d pos:%d", item->itemid, item->values_num, *total);
1169 }
1170 (*done)++;
1171 break;
1172 case REQUEST_STATE_PENDING:
1173 if (*pending < ZBX_MAX_REQUEST_STATE_PRINT_LIMIT)
1174 {
1175 zabbix_log(LOG_LEVEL_DEBUG, "oldest pending itemid: " ZBX_FS_UI64
1176 " values:%d pos:%d", item->itemid, item->values_num, *total);
1177 }
1178 (*pending)++;
1179 break;
1180 }
1181
1182 item->values_num++;
1183 (*total)++;
1184 }
1185
1186 zbx_hashset_destroy(&items);
1187 #undef ZBX_MAX_REQUEST_STATE_PRINT_LIMIT
1188 }
1189
1190 /******************************************************************************
1191 * *
1192 * Function: preprocessor_get_diag_stats *
1193 * *
1194 * Purpose: return diagnostic statistics *
1195 * *
1196 * Parameters: manager - [IN] preprocessing manager *
1197 * client - [IN] IPC client *
1198 * *
1199 ******************************************************************************/
preprocessor_get_diag_stats(zbx_preprocessing_manager_t * manager,zbx_ipc_client_t * client)1200 static void preprocessor_get_diag_stats(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client)
1201 {
1202 unsigned char *data;
1203 zbx_uint32_t data_len;
1204 int total, queued, processing, done, pending;
1205
1206 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1207
1208 preprocessor_get_items_totals(manager, &total, &queued, &processing, &done, &pending);
1209
1210 data_len = zbx_preprocessor_pack_diag_stats(&data, total, queued, processing, done, pending);
1211 zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_DIAG_STATS_RESULT, data, data_len);
1212 zbx_free(data);
1213
1214 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1215 }
1216
1217 /******************************************************************************
1218 * *
1219 * Function: preproc_sort_item_by_values_desc *
1220 * *
1221 * Purpose: compare item statistics by value *
1222 * *
1223 ******************************************************************************/
preproc_sort_item_by_values_desc(const void * d1,const void * d2)1224 static int preproc_sort_item_by_values_desc(const void *d1, const void *d2)
1225 {
1226 zbx_preproc_item_stats_t *i1 = *(zbx_preproc_item_stats_t **)d1;
1227 zbx_preproc_item_stats_t *i2 = *(zbx_preproc_item_stats_t **)d2;
1228
1229 return i2->values_num - i1->values_num;
1230 }
1231
preprocessor_get_items_view(zbx_preprocessing_manager_t * manager,zbx_hashset_t * items,zbx_vector_ptr_t * view)1232 static void preprocessor_get_items_view(zbx_preprocessing_manager_t *manager, zbx_hashset_t *items,
1233 zbx_vector_ptr_t *view)
1234 {
1235 zbx_preproc_item_stats_t *item;
1236 zbx_list_iterator_t iterator;
1237 zbx_preprocessing_request_t *request;
1238
1239 zbx_list_iterator_init(&manager->queue, &iterator);
1240 while (SUCCEED == zbx_list_iterator_next(&iterator))
1241 {
1242 zbx_list_iterator_peek(&iterator, (void **)&request);
1243
1244 if (NULL == (item = zbx_hashset_search(items, &request->value.itemid)))
1245 {
1246 zbx_preproc_item_stats_t item_local = {.itemid = request->value.itemid};
1247
1248 item = zbx_hashset_insert(items, &item_local, sizeof(item_local));
1249 zbx_vector_ptr_append(view, item);
1250 }
1251 /* There might be processed, but not yet flushed items at the start of queue with */
1252 /* freed preprocessing steps and steps_num being zero. Because of that keep updating */
1253 /* items steps_num to have preprocessing steps of last queued item. */
1254 item->steps_num = request->steps_num;
1255 item->values_num++;
1256 }
1257 }
1258
1259
1260 /******************************************************************************
1261 * *
1262 * Function: preprocessor_get_top_items *
1263 * *
1264 * Purpose: return diagnostic top view *
1265 * *
1266 * Parameters: manager - [IN] preprocessing manager *
1267 * client - [IN] IPC client *
1268 * message - [IN] the message with request *
1269 * *
1270 ******************************************************************************/
preprocessor_get_top_items(zbx_preprocessing_manager_t * manager,zbx_ipc_client_t * client,zbx_ipc_message_t * message)1271 static void preprocessor_get_top_items(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
1272 zbx_ipc_message_t *message)
1273 {
1274 int limit;
1275 unsigned char *data;
1276 zbx_uint32_t data_len;
1277 zbx_hashset_t items;
1278 zbx_vector_ptr_t view;
1279
1280 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1281
1282 zbx_preprocessor_unpack_top_request(&limit, message->data);
1283
1284 zbx_hashset_create(&items, 1024, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1285 zbx_vector_ptr_create(&view);
1286
1287 preprocessor_get_items_view(manager, &items, &view);
1288
1289 zbx_vector_ptr_sort(&view, preproc_sort_item_by_values_desc);
1290
1291 data_len = zbx_preprocessor_pack_top_items_result(&data, (zbx_preproc_item_stats_t **)view.values,
1292 MIN(limit, view.values_num));
1293 zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_TOP_ITEMS_RESULT, data, data_len);
1294 zbx_free(data);
1295
1296 zbx_vector_ptr_destroy(&view);
1297 zbx_hashset_destroy(&items);
1298
1299 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1300 }
1301
preprocessor_get_oldest_preproc_items(zbx_preprocessing_manager_t * manager,zbx_ipc_client_t * client,zbx_ipc_message_t * message)1302 static void preprocessor_get_oldest_preproc_items(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
1303 zbx_ipc_message_t *message)
1304 {
1305 int limit, i;
1306 unsigned char *data;
1307 zbx_uint32_t data_len;
1308 zbx_hashset_t items;
1309 zbx_vector_ptr_t view, view_preproc;
1310
1311 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1312
1313 zbx_preprocessor_unpack_top_request(&limit, message->data);
1314
1315 zbx_hashset_create(&items, 1024, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1316 zbx_vector_ptr_create(&view);
1317 zbx_vector_ptr_create(&view_preproc);
1318 zbx_vector_ptr_reserve(&view_preproc, limit);
1319
1320 preprocessor_get_items_view(manager, &items, &view);
1321
1322 for (i = 0; i < view.values_num && 0 < limit; i++)
1323 {
1324 zbx_preproc_item_stats_t *item;
1325
1326 item = (zbx_preproc_item_stats_t *)view.values[i];
1327
1328 /* only items with preprocessing can slow down queue */
1329 if (0 == item->steps_num)
1330 continue;
1331
1332 zbx_vector_ptr_append(&view_preproc, item);
1333 limit--;
1334 }
1335
1336 data_len = zbx_preprocessor_pack_top_items_result(&data, (zbx_preproc_item_stats_t **)view_preproc.values,
1337 MIN(limit, view_preproc.values_num));
1338 zbx_ipc_client_send(client, ZBX_IPC_PREPROCESSOR_TOP_ITEMS_RESULT, data, data_len);
1339 zbx_free(data);
1340
1341 zbx_vector_ptr_destroy(&view_preproc);
1342 zbx_vector_ptr_destroy(&view);
1343 zbx_hashset_destroy(&items);
1344
1345 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1346 }
1347
1348 /******************************************************************************
1349 * *
1350 * Function: preprocessor_init_manager *
1351 * *
1352 * Purpose: initializes preprocessing manager *
1353 * *
1354 * Parameters: manager - [IN] the manager to initialize *
1355 * *
1356 ******************************************************************************/
preprocessor_init_manager(zbx_preprocessing_manager_t * manager)1357 static void preprocessor_init_manager(zbx_preprocessing_manager_t *manager)
1358 {
1359 zabbix_log(LOG_LEVEL_DEBUG, "In %s() workers: %d", __func__, CONFIG_PREPROCESSOR_FORKS);
1360
1361 memset(manager, 0, sizeof(zbx_preprocessing_manager_t));
1362
1363 manager->workers = (zbx_preprocessing_worker_t *)zbx_calloc(NULL, CONFIG_PREPROCESSOR_FORKS,
1364 sizeof(zbx_preprocessing_worker_t));
1365 zbx_list_create(&manager->queue);
1366 zbx_list_create(&manager->direct_queue);
1367 zbx_hashset_create_ext(&manager->item_config, 0, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC,
1368 (zbx_clean_func_t)preproc_item_clear,
1369 ZBX_DEFAULT_MEM_MALLOC_FUNC, ZBX_DEFAULT_MEM_REALLOC_FUNC, ZBX_DEFAULT_MEM_FREE_FUNC);
1370 zbx_hashset_create(&manager->linked_items, 0, ZBX_DEFAULT_UINT64_HASH_FUNC, ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1371 zbx_hashset_create(&manager->history_cache, 1000, ZBX_DEFAULT_UINT64_HASH_FUNC,
1372 ZBX_DEFAULT_UINT64_COMPARE_FUNC);
1373
1374 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1375 }
1376
1377 /******************************************************************************
1378 * *
1379 * Function: preprocessor_register_worker *
1380 * *
1381 * Purpose: registers preprocessing worker *
1382 * *
1383 * Parameters: manager - [IN] the manager *
1384 * client - [IN] the connected preprocessing worker *
1385 * message - [IN] message received by preprocessing manager *
1386 * *
1387 ******************************************************************************/
preprocessor_register_worker(zbx_preprocessing_manager_t * manager,zbx_ipc_client_t * client,zbx_ipc_message_t * message)1388 static void preprocessor_register_worker(zbx_preprocessing_manager_t *manager, zbx_ipc_client_t *client,
1389 zbx_ipc_message_t *message)
1390 {
1391 zbx_preprocessing_worker_t *worker = NULL;
1392 pid_t ppid;
1393
1394 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1395
1396 memcpy(&ppid, message->data, sizeof(ppid));
1397
1398 if (ppid != getppid())
1399 {
1400 zbx_ipc_client_close(client);
1401 zabbix_log(LOG_LEVEL_DEBUG, "refusing connection from foreign process");
1402 }
1403 else
1404 {
1405 if (CONFIG_PREPROCESSOR_FORKS == manager->worker_count)
1406 {
1407 THIS_SHOULD_NEVER_HAPPEN;
1408 exit(EXIT_FAILURE);
1409 }
1410
1411 worker = (zbx_preprocessing_worker_t *)&manager->workers[manager->worker_count++];
1412 worker->client = client;
1413
1414 preprocessor_assign_tasks(manager);
1415 }
1416
1417 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1418 }
1419
1420 /******************************************************************************
1421 * *
1422 * Function: preprocessor_destroy_manager *
1423 * *
1424 * Purpose: destroy preprocessing manager *
1425 * *
1426 * Parameters: manager - [IN] the manager to destroy *
1427 * *
1428 ******************************************************************************/
preprocessor_destroy_manager(zbx_preprocessing_manager_t * manager)1429 static void preprocessor_destroy_manager(zbx_preprocessing_manager_t *manager)
1430 {
1431 zbx_preprocessing_request_t *request;
1432 zbx_preprocessing_direct_request_t *direct_request;
1433
1434 zbx_free(manager->workers);
1435
1436 /* this is the place where values are lost */
1437 while (SUCCEED == zbx_list_pop(&manager->direct_queue, (void **)&direct_request))
1438 preprocessor_free_direct_request(direct_request);
1439
1440 zbx_list_destroy(&manager->direct_queue);
1441
1442 while (SUCCEED == zbx_list_pop(&manager->queue, (void **)&request))
1443 preprocessor_free_request(request);
1444
1445 zbx_list_destroy(&manager->queue);
1446
1447 zbx_hashset_destroy(&manager->item_config);
1448 zbx_hashset_destroy(&manager->linked_items);
1449 zbx_hashset_destroy(&manager->history_cache);
1450 }
1451
ZBX_THREAD_ENTRY(preprocessing_manager_thread,args)1452 ZBX_THREAD_ENTRY(preprocessing_manager_thread, args)
1453 {
1454 zbx_ipc_service_t service;
1455 char *error = NULL;
1456 zbx_ipc_client_t *client;
1457 zbx_ipc_message_t *message;
1458 zbx_preprocessing_manager_t manager;
1459 int ret;
1460 double time_stat, time_idle = 0, time_now, time_flush, sec;
1461
1462 #define STAT_INTERVAL 5 /* if a process is busy and does not sleep then update status not faster than */
1463 /* once in STAT_INTERVAL seconds */
1464
1465 process_type = ((zbx_thread_args_t *)args)->process_type;
1466 server_num = ((zbx_thread_args_t *)args)->server_num;
1467 process_num = ((zbx_thread_args_t *)args)->process_num;
1468
1469 zbx_setproctitle("%s #%d starting", get_process_type_string(process_type), process_num);
1470
1471 zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(program_type),
1472 server_num, get_process_type_string(process_type), process_num);
1473
1474 update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
1475
1476 if (FAIL == zbx_ipc_service_start(&service, ZBX_IPC_SERVICE_PREPROCESSING, &error))
1477 {
1478 zabbix_log(LOG_LEVEL_CRIT, "cannot start preprocessing service: %s", error);
1479 zbx_free(error);
1480 exit(EXIT_FAILURE);
1481 }
1482
1483 preprocessor_init_manager(&manager);
1484
1485 /* initialize statistics */
1486 time_stat = zbx_time();
1487 time_flush = time_stat;
1488
1489 zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num);
1490
1491 while (ZBX_IS_RUNNING())
1492 {
1493 time_now = zbx_time();
1494
1495 if (STAT_INTERVAL < time_now - time_stat)
1496 {
1497 zbx_setproctitle("%s #%d [queued " ZBX_FS_UI64 ", processed " ZBX_FS_UI64 " values, idle "
1498 ZBX_FS_DBL " sec during " ZBX_FS_DBL " sec]",
1499 get_process_type_string(process_type), process_num,
1500 manager.queued_num, manager.processed_num, time_idle, time_now - time_stat);
1501
1502 time_stat = time_now;
1503 time_idle = 0;
1504 manager.processed_num = 0;
1505 }
1506
1507 update_selfmon_counter(ZBX_PROCESS_STATE_IDLE);
1508 ret = zbx_ipc_service_recv(&service, ZBX_PREPROCESSING_MANAGER_DELAY, &client, &message);
1509 update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
1510 sec = zbx_time();
1511 zbx_update_env(sec);
1512
1513 if (ZBX_IPC_RECV_IMMEDIATE != ret)
1514 time_idle += sec - time_now;
1515
1516 if (NULL != message)
1517 {
1518 switch (message->code)
1519 {
1520 case ZBX_IPC_PREPROCESSOR_WORKER:
1521 preprocessor_register_worker(&manager, client, message);
1522 break;
1523 case ZBX_IPC_PREPROCESSOR_REQUEST:
1524 preprocessor_add_request(&manager, message);
1525 break;
1526 case ZBX_IPC_PREPROCESSOR_RESULT:
1527 preprocessor_add_result(&manager, client, message);
1528 break;
1529 case ZBX_IPC_PREPROCESSOR_QUEUE:
1530 zbx_ipc_client_send(client, message->code, (unsigned char *)&manager.queued_num,
1531 sizeof(zbx_uint64_t));
1532 break;
1533 case ZBX_IPC_PREPROCESSOR_TEST_REQUEST:
1534 preprocessor_add_test_request(&manager, client, message);
1535 break;
1536 case ZBX_IPC_PREPROCESSOR_TEST_RESULT:
1537 preprocessor_flush_test_result(&manager, client, message);
1538 break;
1539 case ZBX_IPC_PREPROCESSOR_DIAG_STATS:
1540 preprocessor_get_diag_stats(&manager, client);
1541 break;
1542 case ZBX_IPC_PREPROCESSOR_TOP_ITEMS:
1543 preprocessor_get_top_items(&manager, client, message);
1544 break;
1545 case ZBX_IPC_PREPROCESSOR_TOP_OLDEST_PREPROC_ITEMS:
1546 preprocessor_get_oldest_preproc_items(&manager, client, message);
1547 break;
1548 }
1549
1550 zbx_ipc_message_free(message);
1551 }
1552
1553 if (NULL != client)
1554 zbx_ipc_client_release(client);
1555
1556 if (0 == manager.preproc_num || 1 < time_now - time_flush)
1557 {
1558 dc_flush_history();
1559 time_flush = time_now;
1560 }
1561 }
1562
1563 zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num);
1564
1565 while (1)
1566 zbx_sleep(SEC_PER_MIN);
1567
1568 zbx_ipc_service_close(&service);
1569 preprocessor_destroy_manager(&manager);
1570 #undef STAT_INTERVAL
1571 }
1572