1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 **/
19
20 #include "common.h"
21 #include "log.h"
22 #include "proxy.h"
23 #include "zbxserver.h"
24 #include "zbxserialize.h"
25 #include "zbxipcservice.h"
26
27 #include "preproc.h"
28 #include "preprocessing.h"
29 #include "preproc_history.h"
30
31 #define PACKED_FIELD_RAW 0
32 #define PACKED_FIELD_STRING 1
33 #define MAX_VALUES_LOCAL 256
34
35 /* packed field data description */
36 typedef struct
37 {
38 const void *value; /* value to be packed */
39 zbx_uint32_t size; /* size of a value (can be 0 for strings) */
40 unsigned char type; /* field type */
41 }
42 zbx_packed_field_t;
43
44 #define PACKED_FIELD(value, size) \
45 (zbx_packed_field_t){(value), (size), (0 == (size) ? PACKED_FIELD_STRING : PACKED_FIELD_RAW)};
46
47 static zbx_ipc_message_t cached_message;
48 static int cached_values;
49
50 /******************************************************************************
51 * *
52 * Function: message_pack_data *
53 * *
54 * Purpose: helper for data packing based on defined format *
55 * *
56 * Parameters: message - [OUT] IPC message, can be NULL for buffer size *
57 * calculations *
58 * fields - [IN] the definition of data to be packed *
59 * count - [IN] field count *
60 * *
61 * Return value: size of packed data or 0 if the message size would exceed *
62 * 4GB limit *
63 * *
64 ******************************************************************************/
message_pack_data(zbx_ipc_message_t * message,zbx_packed_field_t * fields,int count)65 static zbx_uint32_t message_pack_data(zbx_ipc_message_t *message, zbx_packed_field_t *fields, int count)
66 {
67 int i;
68 zbx_uint32_t data_size = 0;
69 zbx_uint64_t field_size;
70 unsigned char *offset = NULL;
71 const zbx_uint64_t max_uint32 = ~(zbx_uint32_t)0;
72
73 if (NULL != message)
74 {
75 /* recursive call to calculate required buffer size */
76 data_size = message_pack_data(NULL, fields, count);
77
78 if (0 == data_size || max_uint32 - message->size < data_size)
79 return 0;
80
81 message->size += data_size;
82 message->data = (unsigned char *)zbx_realloc(message->data, message->size);
83 offset = message->data + (message->size - data_size);
84 }
85
86 for (i = 0; i < count; i++)
87 {
88 field_size = fields[i].size;
89 if (NULL != offset)
90 {
91 /* data packing */
92 if (PACKED_FIELD_STRING == fields[i].type)
93 {
94 zbx_uint32_t field_size_uint32 = (zbx_uint32_t)field_size;
95
96 memcpy(offset, &field_size_uint32, sizeof(zbx_uint32_t));
97 if (0 != field_size && NULL != fields[i].value)
98 memcpy(offset + sizeof(zbx_uint32_t), fields[i].value, field_size);
99 field_size += sizeof(zbx_uint32_t);
100 }
101 else
102 memcpy(offset, fields[i].value, field_size);
103
104 offset += field_size;
105 }
106 else
107 {
108 /* size calculation */
109 if (PACKED_FIELD_STRING == fields[i].type)
110 {
111 field_size = (NULL != fields[i].value) ? strlen((const char *)fields[i].value) + 1 : 0;
112 fields[i].size = (zbx_uint32_t)field_size;
113
114 field_size += sizeof(zbx_uint32_t);
115 }
116
117 if (field_size + data_size > max_uint32)
118 return 0;
119
120 data_size += (zbx_uint32_t)field_size;
121 }
122 }
123
124 return data_size;
125 }
126
127 /******************************************************************************
128 * *
129 * Function: preprocessor_pack_value *
130 * *
131 * Purpose: pack item value data into a single buffer that can be used in IPC *
132 * *
133 * Parameters: message - [OUT] IPC message *
134 * value - [IN] value to be packed *
135 * *
136 * Return value: size of packed data *
137 * *
138 ******************************************************************************/
preprocessor_pack_value(zbx_ipc_message_t * message,zbx_preproc_item_value_t * value)139 static zbx_uint32_t preprocessor_pack_value(zbx_ipc_message_t *message, zbx_preproc_item_value_t *value)
140 {
141 zbx_packed_field_t fields[24], *offset = fields; /* 24 - max field count */
142 unsigned char ts_marker, result_marker, log_marker;
143
144 ts_marker = (NULL != value->ts);
145 result_marker = (NULL != value->result_ptr->result);
146
147 *offset++ = PACKED_FIELD(&value->itemid, sizeof(zbx_uint64_t));
148 *offset++ = PACKED_FIELD(&value->hostid, sizeof(zbx_uint64_t));
149 *offset++ = PACKED_FIELD(&value->item_value_type, sizeof(unsigned char));
150 *offset++ = PACKED_FIELD(&value->item_flags, sizeof(unsigned char));
151 *offset++ = PACKED_FIELD(&value->state, sizeof(unsigned char));
152 *offset++ = PACKED_FIELD(value->error, 0);
153 *offset++ = PACKED_FIELD(&ts_marker, sizeof(unsigned char));
154
155 if (NULL != value->ts)
156 {
157 *offset++ = PACKED_FIELD(&value->ts->sec, sizeof(int));
158 *offset++ = PACKED_FIELD(&value->ts->ns, sizeof(int));
159 }
160
161 *offset++ = PACKED_FIELD(&result_marker, sizeof(unsigned char));
162
163 if (NULL != value->result_ptr->result)
164 {
165
166 *offset++ = PACKED_FIELD(&value->result_ptr->result->lastlogsize, sizeof(zbx_uint64_t));
167 *offset++ = PACKED_FIELD(&value->result_ptr->result->ui64, sizeof(zbx_uint64_t));
168 *offset++ = PACKED_FIELD(&value->result_ptr->result->dbl, sizeof(double));
169 *offset++ = PACKED_FIELD(value->result_ptr->result->str, 0);
170 *offset++ = PACKED_FIELD(value->result_ptr->result->text, 0);
171 *offset++ = PACKED_FIELD(value->result_ptr->result->msg, 0);
172 *offset++ = PACKED_FIELD(&value->result_ptr->result->type, sizeof(int));
173 *offset++ = PACKED_FIELD(&value->result_ptr->result->mtime, sizeof(int));
174
175 log_marker = (NULL != value->result_ptr->result->log);
176 *offset++ = PACKED_FIELD(&log_marker, sizeof(unsigned char));
177 if (NULL != value->result_ptr->result->log)
178 {
179 *offset++ = PACKED_FIELD(value->result_ptr->result->log->value, 0);
180 *offset++ = PACKED_FIELD(value->result_ptr->result->log->source, 0);
181 *offset++ = PACKED_FIELD(&value->result_ptr->result->log->timestamp, sizeof(int));
182 *offset++ = PACKED_FIELD(&value->result_ptr->result->log->severity, sizeof(int));
183 *offset++ = PACKED_FIELD(&value->result_ptr->result->log->logeventid, sizeof(int));
184 }
185 }
186
187 return message_pack_data(message, fields, offset - fields);
188 }
189
190 /******************************************************************************
191 * *
192 * Function: preprocessor_pack_variant *
193 * *
194 * Purpose: packs variant value for serialization *
195 * *
196 * Parameters: fields - [OUT] the packed fields *
197 * value - [IN] the value to pack *
198 * *
199 * Return value: The number of fields used. *
200 * *
201 * Comments: Don't pack local variables, only ones passed in parameters! *
202 * *
203 ******************************************************************************/
static int	preprocessor_pack_variant(zbx_packed_field_t *fields, const zbx_variant_t *value)
{
	zbx_packed_field_t	*field = fields;

	/* the type always goes first, data follows only for the types handled below */
	*field++ = PACKED_FIELD(&value->type, sizeof(unsigned char));

	switch (value->type)
	{
		case ZBX_VARIANT_UI64:
			*field++ = PACKED_FIELD(&value->data.ui64, sizeof(zbx_uint64_t));
			break;
		case ZBX_VARIANT_DBL:
			*field++ = PACKED_FIELD(&value->data.dbl, sizeof(double));
			break;
		case ZBX_VARIANT_STR:
			*field++ = PACKED_FIELD(value->data.str, 0);
			break;
		case ZBX_VARIANT_BIN:
			/* binary data is copied as a raw field together with its 32 bit size header */
			*field++ = PACKED_FIELD(value->data.bin, sizeof(zbx_uint32_t) +
					zbx_variant_data_bin_get(value->data.bin, NULL));
			break;
		default:
			/* other variant types carry no data */
			break;
	}

	return (int)(field - fields);
}
232
233 /******************************************************************************
234 * *
235 * Function: preprocessor_pack_history *
236 * *
237 * Purpose: packs preprocessing history for serialization *
238 * *
239 * Parameters: fields - [OUT] the packed fields *
 *             history - [IN] the history to pack                             *
 *             history_num - [IN] the number of history entries to pack       *
241 * *
242 * Return value: The number of fields used. *
243 * *
244 * Comments: Don't pack local variables, only ones passed in parameters! *
245 * *
246 ******************************************************************************/
static int	preprocessor_pack_history(zbx_packed_field_t *fields, const zbx_vector_ptr_t *history,
		const int *history_num)
{
	zbx_packed_field_t	*field = fields;
	int			n;

	/* the entry count is packed through the caller's pointer, so it stays valid after return */
	*field++ = PACKED_FIELD(history_num, sizeof(int));

	/* history is dereferenced only when there is at least one entry to pack */
	for (n = 0; n < *history_num; n++)
	{
		zbx_preproc_op_history_t	*entry = (zbx_preproc_op_history_t *)history->values[n];

		*field++ = PACKED_FIELD(&entry->index, sizeof(int));
		field += preprocessor_pack_variant(field, &entry->value);
		*field++ = PACKED_FIELD(&entry->ts.sec, sizeof(int));
		*field++ = PACKED_FIELD(&entry->ts.ns, sizeof(int));
	}

	return (int)(field - fields);
}
266
267 /******************************************************************************
268 * *
269 * Function: preprocessor_pack_step *
270 * *
271 * Purpose: packs preprocessing step for serialization *
272 * *
273 * Parameters: fields - [OUT] the packed fields *
274 * step - [IN] the step to pack *
275 * *
276 * Return value: The number of fields used. *
277 * *
278 * Comments: Don't pack local variables, only ones passed in parameters! *
279 * *
280 ******************************************************************************/
static int	preprocessor_pack_step(zbx_packed_field_t *fields, const zbx_preproc_op_t *step)
{
	zbx_packed_field_t	*field = fields;

	/* a step always takes four fields: type, parameters, error handler and its parameters */
	*field++ = PACKED_FIELD(&step->type, sizeof(char));
	*field++ = PACKED_FIELD(step->params, 0);
	*field++ = PACKED_FIELD(&step->error_handler, sizeof(char));
	*field++ = PACKED_FIELD(step->error_handler_params, 0);

	return (int)(field - fields);
}
292
293 /******************************************************************************
294 * *
295 * Function: preprocessor_pack_steps *
296 * *
297 * Purpose: packs preprocessing steps for serialization *
298 * *
299 * Parameters: fields - [OUT] the packed fields *
300 * steps - [IN] the steps to pack *
301 * steps_num - [IN] the number of steps *
302 * *
303 * Return value: The number of fields used. *
304 * *
305 * Comments: Don't pack local variables, only ones passed in parameters! *
306 * *
307 ******************************************************************************/
static int	preprocessor_pack_steps(zbx_packed_field_t *fields, const zbx_preproc_op_t *steps, const int *steps_num)
{
	zbx_packed_field_t	*field = fields;
	int			n;

	/* the step count is packed through the caller's pointer, so it stays valid after return */
	*field++ = PACKED_FIELD(steps_num, sizeof(int));

	for (n = 0; n < *steps_num; n++)
		field += preprocessor_pack_step(field, steps + n);

	return (int)(field - fields);
}
319
320 /******************************************************************************
321 * *
322 * Function: preprocesser_unpack_variant *
323 * *
324 * Purpose: unpacks serialized variant value *
325 * *
326 * Parameters: data - [IN] the serialized data *
327 * value - [OUT] the value *
328 * *
329 * Return value: The number of bytes parsed. *
330 * *
331 ******************************************************************************/
static int	preprocesser_unpack_variant(const unsigned char *data, zbx_variant_t *value)
{
	const unsigned char	*ptr = data;
	zbx_uint32_t		value_len;

	/* the type byte decides what (if anything) follows */
	ptr += zbx_deserialize_char(ptr, &value->type);

	if (ZBX_VARIANT_UI64 == value->type)
		ptr += zbx_deserialize_uint64(ptr, &value->data.ui64);
	else if (ZBX_VARIANT_DBL == value->type)
		ptr += zbx_deserialize_double(ptr, &value->data.dbl);
	else if (ZBX_VARIANT_STR == value->type)
		ptr += zbx_deserialize_str(ptr, &value->data.str, value_len);
	else if (ZBX_VARIANT_BIN == value->type)
		ptr += zbx_deserialize_bin(ptr, &value->data.bin, value_len);

	/* other variant types have no serialized data */

	return (int)(ptr - data);
}
360
361 /******************************************************************************
362 * *
363 * Function: preprocesser_unpack_history *
364 * *
365 * Purpose: unpacks serialized preprocessing history *
366 * *
367 * Parameters: data - [IN] the serialized data *
368 * history - [OUT] the history *
369 * *
370 * Return value: The number of bytes parsed. *
371 * *
372 ******************************************************************************/
static int	preprocesser_unpack_history(const unsigned char *data, zbx_vector_ptr_t *history)
{
	const unsigned char	*ptr = data;
	int			n, history_num;

	ptr += zbx_deserialize_int(ptr, &history_num);

	/* nothing but the counter was serialized for an empty history */
	if (0 == history_num)
		return (int)(ptr - data);

	zbx_vector_ptr_reserve(history, history_num);

	for (n = 0; n < history_num; n++)
	{
		zbx_preproc_op_history_t	*entry;

		/* the entry is handed over to the history vector */
		entry = (zbx_preproc_op_history_t *)zbx_malloc(NULL, sizeof(zbx_preproc_op_history_t));

		ptr += zbx_deserialize_int(ptr, &entry->index);
		ptr += preprocesser_unpack_variant(ptr, &entry->value);
		ptr += zbx_deserialize_int(ptr, &entry->ts.sec);
		ptr += zbx_deserialize_int(ptr, &entry->ts.ns);

		zbx_vector_ptr_append(history, entry);
	}

	return (int)(ptr - data);
}
401
402 /******************************************************************************
403 * *
404 * Function: preprocessor_unpack_step *
405 * *
406 * Purpose: unpacks serialized preprocessing step *
407 * *
408 * Parameters: data - [IN] the serialized data *
409 * step - [OUT] the preprocessing step *
410 * *
411 * Return value: The number of bytes parsed. *
412 * *
413 ******************************************************************************/
static int	preprocessor_unpack_step(const unsigned char *data, zbx_preproc_op_t *step)
{
	const unsigned char	*ptr = data;
	zbx_uint32_t		value_len;

	/* fields are read in the same order preprocessor_pack_step() writes them */
	ptr += zbx_deserialize_char(ptr, &step->type);
	/* NOTE(review): the _ptr variant presumably points into the data buffer instead of */
	/* copying the string, so the buffer must outlive the step - confirm in zbxserialize.h */
	ptr += zbx_deserialize_str_ptr(ptr, step->params, value_len);
	ptr += zbx_deserialize_char(ptr, &step->error_handler);
	ptr += zbx_deserialize_str_ptr(ptr, step->error_handler_params, value_len);

	return (int)(ptr - data);
}
426
427 /******************************************************************************
428 * *
429 * Function: preprocessor_unpack_steps *
430 * *
431 * Purpose: unpacks serialized preprocessing steps *
432 * *
433 * Parameters: data - [IN] the serialized data *
434 * steps - [OUT] the preprocessing steps *
435 * steps_num - [OUT] the number of steps *
436 * *
437 * Return value: The number of bytes parsed. *
438 * *
439 ******************************************************************************/
static int	preprocessor_unpack_steps(const unsigned char *data, zbx_preproc_op_t **steps, int *steps_num)
{
	const unsigned char	*ptr = data;
	int			n;

	ptr += zbx_deserialize_int(ptr, steps_num);

	/* without steps only the counter was serialized and no array is allocated */
	if (0 >= *steps_num)
	{
		*steps = NULL;

		return (int)(ptr - data);
	}

	*steps = (zbx_preproc_op_t *)zbx_malloc(NULL, sizeof(zbx_preproc_op_t) * (*steps_num));

	for (n = 0; n < *steps_num; n++)
		ptr += preprocessor_unpack_step(ptr, &(*steps)[n]);

	return (int)(ptr - data);
}
457
458 /******************************************************************************
459 * *
460 * Function: zbx_preprocessor_pack_task *
461 * *
462 * Purpose: pack preprocessing task data into a single buffer that can be *
463 * used in IPC *
464 * *
465 * Parameters: data - [OUT] memory buffer for packed data *
466 * itemid - [IN] item id *
467 * value_type - [IN] item value type *
468 * ts - [IN] value timestamp *
469 * value - [IN] item value *
470 * history - [IN] history data (can be NULL) *
471 * steps - [IN] preprocessing steps *
472 * steps_num - [IN] preprocessing step count *
473 * *
474 * Return value: size of packed data *
475 * *
476 ******************************************************************************/
zbx_preprocessor_pack_task(unsigned char ** data,zbx_uint64_t itemid,unsigned char value_type,zbx_timespec_t * ts,zbx_variant_t * value,const zbx_vector_ptr_t * history,const zbx_preproc_op_t * steps,int steps_num)477 zbx_uint32_t zbx_preprocessor_pack_task(unsigned char **data, zbx_uint64_t itemid, unsigned char value_type,
478 zbx_timespec_t *ts, zbx_variant_t *value, const zbx_vector_ptr_t *history,
479 const zbx_preproc_op_t *steps, int steps_num)
480 {
481 zbx_packed_field_t *offset, *fields;
482 unsigned char ts_marker;
483 zbx_uint32_t size;
484 int history_num;
485 zbx_ipc_message_t message;
486
487 history_num = (NULL != history ? history->values_num : 0);
488
489 /* 9 is a max field count (without preprocessing step and history fields) */
490 fields = (zbx_packed_field_t *)zbx_malloc(NULL, (9 + steps_num * 4 + history_num * 5)
491 * sizeof(zbx_packed_field_t));
492
493 offset = fields;
494 ts_marker = (NULL != ts);
495
496 *offset++ = PACKED_FIELD(&itemid, sizeof(zbx_uint64_t));
497 *offset++ = PACKED_FIELD(&value_type, sizeof(unsigned char));
498 *offset++ = PACKED_FIELD(&ts_marker, sizeof(unsigned char));
499
500 if (NULL != ts)
501 {
502 *offset++ = PACKED_FIELD(&ts->sec, sizeof(int));
503 *offset++ = PACKED_FIELD(&ts->ns, sizeof(int));
504 }
505
506 offset += preprocessor_pack_variant(offset, value);
507 offset += preprocessor_pack_history(offset, history, &history_num);
508 offset += preprocessor_pack_steps(offset, steps, &steps_num);
509
510 zbx_ipc_message_init(&message);
511 size = message_pack_data(&message, fields, offset - fields);
512 *data = message.data;
513 zbx_free(fields);
514
515 return size;
516 }
517
518 /******************************************************************************
519 * *
520 * Function: zbx_preprocessor_pack_result *
521 * *
522 * Purpose: pack preprocessing result data into a single buffer that can be *
523 * used in IPC *
524 * *
525 * Parameters: data - [OUT] memory buffer for packed data *
526 * value - [IN] result value *
527 * history - [IN] item history data *
528 * error - [IN] preprocessing error *
529 * *
530 * Return value: size of packed data *
531 * *
532 ******************************************************************************/
zbx_preprocessor_pack_result(unsigned char ** data,zbx_variant_t * value,const zbx_vector_ptr_t * history,char * error)533 zbx_uint32_t zbx_preprocessor_pack_result(unsigned char **data, zbx_variant_t *value,
534 const zbx_vector_ptr_t *history, char *error)
535 {
536 zbx_packed_field_t *offset, *fields;
537 zbx_uint32_t size;
538 zbx_ipc_message_t message;
539 int history_num;
540
541 history_num = history->values_num;
542
543 /* 4 is a max field count (without history fields) */
544 fields = (zbx_packed_field_t *)zbx_malloc(NULL, (4 + history_num * 5) * sizeof(zbx_packed_field_t));
545 offset = fields;
546
547 offset += preprocessor_pack_variant(offset, value);
548 offset += preprocessor_pack_history(offset, history, &history_num);
549
550 *offset++ = PACKED_FIELD(error, 0);
551
552 zbx_ipc_message_init(&message);
553 size = message_pack_data(&message, fields, offset - fields);
554 *data = message.data;
555
556 zbx_free(fields);
557
558 return size;
559 }
560
561 /******************************************************************************
562 * *
563 * Function: zbx_preprocessor_pack_test_result *
564 * *
565 * Purpose: pack preprocessing result data into a single buffer that can be *
566 * used in IPC *
567 * *
 * Parameters: data        - [OUT] memory buffer for packed data              *
570 * results - [IN] the preprocessing step results *
571 * results_num - [IN] the number of preprocessing step results *
572 * history - [IN] item history data *
573 * error - [IN] preprocessing error *
574 * *
575 * Return value: size of packed data *
576 * *
577 ******************************************************************************/
zbx_preprocessor_pack_test_result(unsigned char ** data,const zbx_preproc_result_t * results,int results_num,const zbx_vector_ptr_t * history,const char * error)578 zbx_uint32_t zbx_preprocessor_pack_test_result(unsigned char **data, const zbx_preproc_result_t *results,
579 int results_num, const zbx_vector_ptr_t *history, const char *error)
580 {
581 zbx_packed_field_t *offset, *fields;
582 zbx_uint32_t size;
583 zbx_ipc_message_t message;
584 int i, history_num;
585
586 history_num = history->values_num;
587
588 fields = (zbx_packed_field_t *)zbx_malloc(NULL, (3 + history_num * 5 + results_num * 4) *
589 sizeof(zbx_packed_field_t));
590 offset = fields;
591
592 *offset++ = PACKED_FIELD(&results_num, sizeof(int));
593
594 for (i = 0; i < results_num; i++)
595 {
596 offset += preprocessor_pack_variant(offset, &results[i].value);
597 *offset++ = PACKED_FIELD(results[i].error, 0);
598 *offset++ = PACKED_FIELD(&results[i].action, sizeof(unsigned char));
599 }
600
601 offset += preprocessor_pack_history(offset, history, &history_num);
602
603 *offset++ = PACKED_FIELD(error, 0);
604
605 zbx_ipc_message_init(&message);
606 size = message_pack_data(&message, fields, offset - fields);
607 *data = message.data;
608
609 zbx_free(fields);
610
611 return size;
612 }
613
614 /******************************************************************************
615 * *
616 * Function: zbx_preprocessor_pack_diag_stats *
617 * *
618 * Purpose: pack diagnostic statistics data into a single buffer that can be *
619 * used in IPC *
620 * Parameters: data - [OUT] memory buffer for packed data *
621 * total - [IN] the number of values *
622 * queued - [IN] the number of values waiting to be *
623 * preprocessed *
624 * processing - [IN] the number of values being preprocessed *
625 * done - [IN] the number of values waiting to be flushed *
626 * that are either preprocessed or did not *
627 * require preprocessing *
628 * pending - [IN] the number of values pending to be *
629 * preprocessed after previous value for *
630 * example delta, throttling depends on *
631 * previous value *
 *                                                                            *
 * Return value: size of packed data                                          *
633 * *
634 ******************************************************************************/
zbx_preprocessor_pack_diag_stats(unsigned char ** data,int total,int queued,int processing,int done,int pending)635 zbx_uint32_t zbx_preprocessor_pack_diag_stats(unsigned char **data, int total, int queued, int processing, int done,
636 int pending)
637 {
638 unsigned char *ptr;
639 zbx_uint32_t data_len = 0;
640
641 zbx_serialize_prepare_value(data_len, total);
642 zbx_serialize_prepare_value(data_len, queued);
643 zbx_serialize_prepare_value(data_len, processing);
644 zbx_serialize_prepare_value(data_len, done);
645 zbx_serialize_prepare_value(data_len, pending);
646
647 *data = (unsigned char *)zbx_malloc(NULL, data_len);
648
649 ptr = *data;
650 ptr += zbx_serialize_value(ptr, total);
651 ptr += zbx_serialize_value(ptr, queued);
652 ptr += zbx_serialize_value(ptr, processing);
653 ptr += zbx_serialize_value(ptr, done);
654 (void)zbx_serialize_value(ptr, pending);
655
656 return data_len;
657 }
658
659 /******************************************************************************
660 * *
 * Function: zbx_preprocessor_pack_top_items_request                          *
 *                                                                            *
 * Purpose: pack top request data into a single buffer that can be used in IPC*
 *                                                                            *
 * Parameters: data  - [OUT] memory buffer for packed data                    *
 *             limit - [IN] the number of top values to return                *
 *                                                                            *
 * Return value: size of packed data                                          *
668 * *
669 ******************************************************************************/
zbx_preprocessor_pack_top_items_request(unsigned char ** data,int limit)670 zbx_uint32_t zbx_preprocessor_pack_top_items_request(unsigned char **data, int limit)
671 {
672 zbx_uint32_t data_len = 0;
673
674 zbx_serialize_prepare_value(data_len, limit);
675 *data = (unsigned char *)zbx_malloc(NULL, data_len);
676 (void)zbx_serialize_value(*data, limit);
677
678 return data_len;
679 }
680
681 /******************************************************************************
682 * *
 * Function: zbx_preprocessor_pack_top_items_result                           *
684 * *
685 * Purpose: pack top result data into a single buffer that can be used in IPC *
686 * *
687 * Parameters: data - [OUT] memory buffer for packed data *
688 * items - [IN] the array of item references *
689 * items_num - [IN] the number of items *
690 * *
691 ******************************************************************************/
zbx_preprocessor_pack_top_items_result(unsigned char ** data,zbx_preproc_item_stats_t ** items,int items_num)692 zbx_uint32_t zbx_preprocessor_pack_top_items_result(unsigned char **data, zbx_preproc_item_stats_t **items,
693 int items_num)
694 {
695 unsigned char *ptr;
696 zbx_uint32_t data_len = 0, item_len = 0;
697 int i;
698
699 if (0 != items_num)
700 {
701 zbx_serialize_prepare_value(item_len, items[0]->itemid);
702 zbx_serialize_prepare_value(item_len, items[0]->values_num);
703 zbx_serialize_prepare_value(item_len, items[0]->steps_num);
704 }
705
706 zbx_serialize_prepare_value(data_len, items_num);
707 data_len += item_len * items_num;
708 *data = (unsigned char *)zbx_malloc(NULL, data_len);
709
710 ptr = *data;
711 ptr += zbx_serialize_value(ptr, items_num);
712
713 for (i = 0; i < items_num; i++)
714 {
715 ptr += zbx_serialize_value(ptr, items[i]->itemid);
716 ptr += zbx_serialize_value(ptr, items[i]->values_num);
717 ptr += zbx_serialize_value(ptr, items[i]->steps_num);
718 }
719
720 return data_len;
721 }
722
723 /******************************************************************************
724 * *
725 * Function: zbx_preprocessor_unpack_value *
726 * *
727 * Purpose: unpack item value data from IPC data buffer *
728 * *
729 * Parameters: value - [OUT] unpacked item value *
730 * data - [IN] IPC data buffer *
731 * *
732 * Return value: size of packed data *
733 * *
734 ******************************************************************************/
zbx_preprocessor_unpack_value(zbx_preproc_item_value_t * value,unsigned char * data)735 zbx_uint32_t zbx_preprocessor_unpack_value(zbx_preproc_item_value_t *value, unsigned char *data)
736 {
737 zbx_uint32_t value_len;
738 zbx_timespec_t *timespec = NULL;
739 AGENT_RESULT *agent_result = NULL;
740 zbx_log_t *log = NULL;
741 unsigned char *offset = data, ts_marker, result_marker, log_marker;
742
743 offset += zbx_deserialize_uint64(offset, &value->itemid);
744 offset += zbx_deserialize_uint64(offset, &value->hostid);
745 offset += zbx_deserialize_char(offset, &value->item_value_type);
746 offset += zbx_deserialize_char(offset, &value->item_flags);
747 offset += zbx_deserialize_char(offset, &value->state);
748 offset += zbx_deserialize_str(offset, &value->error, value_len);
749 offset += zbx_deserialize_char(offset, &ts_marker);
750
751 if (0 != ts_marker)
752 {
753 timespec = (zbx_timespec_t *)zbx_malloc(NULL, sizeof(zbx_timespec_t));
754
755 offset += zbx_deserialize_int(offset, ×pec->sec);
756 offset += zbx_deserialize_int(offset, ×pec->ns);
757 }
758
759 value->ts = timespec;
760
761 offset += zbx_deserialize_char(offset, &result_marker);
762 if (0 != result_marker)
763 {
764 agent_result = (AGENT_RESULT *)zbx_malloc(NULL, sizeof(AGENT_RESULT));
765
766 offset += zbx_deserialize_uint64(offset, &agent_result->lastlogsize);
767 offset += zbx_deserialize_uint64(offset, &agent_result->ui64);
768 offset += zbx_deserialize_double(offset, &agent_result->dbl);
769 offset += zbx_deserialize_str(offset, &agent_result->str, value_len);
770 offset += zbx_deserialize_str(offset, &agent_result->text, value_len);
771 offset += zbx_deserialize_str(offset, &agent_result->msg, value_len);
772 offset += zbx_deserialize_int(offset, &agent_result->type);
773 offset += zbx_deserialize_int(offset, &agent_result->mtime);
774
775 offset += zbx_deserialize_char(offset, &log_marker);
776 if (0 != log_marker)
777 {
778 log = (zbx_log_t *)zbx_malloc(NULL, sizeof(zbx_log_t));
779
780 offset += zbx_deserialize_str(offset, &log->value, value_len);
781 offset += zbx_deserialize_str(offset, &log->source, value_len);
782 offset += zbx_deserialize_int(offset, &log->timestamp);
783 offset += zbx_deserialize_int(offset, &log->severity);
784 offset += zbx_deserialize_int(offset, &log->logeventid);
785 }
786
787 agent_result->log = log;
788 }
789
790 value->result_ptr = (zbx_result_ptr_t *)zbx_malloc(NULL, sizeof(zbx_result_ptr_t));
791 value->result_ptr->result = agent_result;
792 value->result_ptr->refcount = 1;
793
794 return offset - data;
795 }
796
797 /******************************************************************************
798 * *
799 * Function: zbx_preprocessor_unpack_task *
800 * *
801 * Purpose: unpack preprocessing task data from IPC data buffer *
802 * *
803 * Parameters: itemid - [OUT] itemid *
804 * value_type - [OUT] item value type *
805 * ts - [OUT] value timestamp *
806 * value - [OUT] item value *
807 * history - [OUT] history data *
808 * steps - [OUT] preprocessing steps *
809 * steps_num - [OUT] preprocessing step count *
810 * data - [IN] IPC data buffer *
811 * *
812 ******************************************************************************/
void	zbx_preprocessor_unpack_task(zbx_uint64_t *itemid, unsigned char *value_type, zbx_timespec_t **ts,
		zbx_variant_t *value, zbx_vector_ptr_t *history, zbx_preproc_op_t **steps,
		int *steps_num, const unsigned char *data)
{
	const unsigned char	*offset = data;
	unsigned char		ts_marker;
	zbx_timespec_t		*timespec = NULL;

	/* fields are read in the same order zbx_preprocessor_pack_task() writes them */
	offset += zbx_deserialize_uint64(offset, itemid);
	offset += zbx_deserialize_char(offset, value_type);
	offset += zbx_deserialize_char(offset, &ts_marker);

	/* timestamp is optional - it is allocated only if the marker announces it, */
	/* otherwise NULL is returned through ts                                    */
	if (0 != ts_marker)
	{
		timespec = (zbx_timespec_t *)zbx_malloc(NULL, sizeof(zbx_timespec_t));

		offset += zbx_deserialize_int(offset, &timespec->sec);
		offset += zbx_deserialize_int(offset, &timespec->ns);
	}

	*ts = timespec;

	offset += preprocesser_unpack_variant(offset, value);
	offset += preprocesser_unpack_history(offset, history);

	/* steps are the last part of the task, the parsed size is not needed anymore */
	(void)preprocessor_unpack_steps(offset, steps, steps_num);
}
839
840 /******************************************************************************
841 * *
842 * Function: zbx_preprocessor_unpack_result *
843 * *
 * Purpose: unpack preprocessing result data from IPC data buffer             *
845 * *
846 * Parameters: value - [OUT] result value *
847 * history - [OUT] item history data *
848 * error - [OUT] preprocessing error *
849 * data - [IN] IPC data buffer *
850 * *
851 ******************************************************************************/
zbx_preprocessor_unpack_result(zbx_variant_t * value,zbx_vector_ptr_t * history,char ** error,const unsigned char * data)852 void zbx_preprocessor_unpack_result(zbx_variant_t *value, zbx_vector_ptr_t *history, char **error,
853 const unsigned char *data)
854 {
855 zbx_uint32_t value_len;
856 const unsigned char *offset = data;
857
858 offset += preprocesser_unpack_variant(offset, value);
859 offset += preprocesser_unpack_history(offset, history);
860
861 (void)zbx_deserialize_str(offset, error, value_len);
862 }
863
864 /******************************************************************************
865 * *
866 * Function: zbx_preprocessor_unpack_test_result *
867 * *
868 * Purpose: unpack preprocessing test data from IPC data buffer *
869 * *
870 * Parameters: results - [OUT] the preprocessing step results *
871 * history - [OUT] item history data *
872 * error - [OUT] preprocessing error *
873 * data - [IN] IPC data buffer *
874 * *
875 ******************************************************************************/
zbx_preprocessor_unpack_test_result(zbx_vector_ptr_t * results,zbx_vector_ptr_t * history,char ** error,const unsigned char * data)876 void zbx_preprocessor_unpack_test_result(zbx_vector_ptr_t *results, zbx_vector_ptr_t *history,
877 char **error, const unsigned char *data)
878 {
879 zbx_uint32_t value_len;
880 const unsigned char *offset = data;
881 int i, results_num;
882 zbx_preproc_result_t *result;
883
884 offset += zbx_deserialize_int(offset, &results_num);
885
886 zbx_vector_ptr_reserve(results, results_num);
887
888 for (i = 0; i < results_num; i++)
889 {
890 result = (zbx_preproc_result_t *)zbx_malloc(NULL, sizeof(zbx_preproc_result_t));
891 offset += preprocesser_unpack_variant(offset, &result->value);
892 offset += zbx_deserialize_str(offset, &result->error, value_len);
893 offset += zbx_deserialize_char(offset, &result->action);
894 zbx_vector_ptr_append(results, result);
895 }
896
897 offset += preprocesser_unpack_history(offset, history);
898
899 (void)zbx_deserialize_str(offset, error, value_len);
900 }
901
/******************************************************************************
 *
 * Function: zbx_preprocessor_unpack_diag_stats
 *
 * Purpose: unpack preprocessing diagnostic statistics from IPC data buffer
 *
 * Parameters: total      - [OUT] the number of values
 *             queued     - [OUT] the number of values waiting to be
 *                                preprocessed
 *             processing - [OUT] the number of values being preprocessed
 *             done       - [OUT] the number of values waiting to be flushed
 *                                that are either preprocessed or did not
 *                                require preprocessing
 *             pending    - [OUT] the number of values pending to be
 *                                preprocessed after previous value (for
 *                                example delta, throttling depend on
 *                                previous value)
 *             data       - [IN] IPC data buffer
 *
 ******************************************************************************/
void	zbx_preprocessor_unpack_diag_stats(int *total, int *queued, int *processing, int *done,
		int *pending, const unsigned char *data)
{
	/* the counters are stored one after another in the parameter order */
	data += zbx_deserialize_int(data, total);
	data += zbx_deserialize_int(data, queued);
	data += zbx_deserialize_int(data, processing);
	data += zbx_deserialize_int(data, done);

	/* last field, the consumed size is not needed */
	(void)zbx_deserialize_int(data, pending);
}
933
/******************************************************************************
 *
 * Function: zbx_preprocessor_unpack_top_request
 *
 * Purpose: unpack top items request data from IPC data buffer
 *
 * Parameters: limit - [OUT] the number of top values to return
 *             data  - [IN] memory buffer with packed data
 *
 ******************************************************************************/
void	zbx_preprocessor_unpack_top_request(int *limit, const unsigned char *data)
{
	const unsigned char	*offset = data;

	/* the request consists of a single field - the limit */
	(void)zbx_deserialize_value(offset, limit);
}
948
949 /******************************************************************************
950 * *
951 * Function: zbx_preprocessor_unpack_top_request *
952 * *
953 * Purpose: unpack preprocessing test data from IPC data buffer *
954 * *
955 * Parameters: items - [OUT] the item diag data *
956 * data - [IN] memory buffer for packed data *
957 * *
958 ******************************************************************************/
zbx_preprocessor_unpack_top_result(zbx_vector_ptr_t * items,const unsigned char * data)959 void zbx_preprocessor_unpack_top_result(zbx_vector_ptr_t *items, const unsigned char *data)
960 {
961 int i, items_num;
962
963 data += zbx_deserialize_value(data, &items_num);
964
965 if (0 != items_num)
966 {
967 zbx_vector_ptr_reserve(items, items_num);
968
969 for (i = 0; i < items_num; i++)
970 {
971 zbx_preproc_item_stats_t *item;
972
973 item = (zbx_preproc_item_stats_t *)zbx_malloc(NULL, sizeof(zbx_preproc_item_stats_t));
974 data += zbx_deserialize_value(data, &item->itemid);
975 data += zbx_deserialize_value(data, &item->values_num);
976 data += zbx_deserialize_value(data, &item->steps_num);
977 zbx_vector_ptr_append(items, item);
978 }
979 }
980 }
981
982 /******************************************************************************
983 * *
984 * Function: preprocessor_send *
985 * *
986 * Purpose: sends command to preprocessor manager *
987 * *
988 * Parameters: code - [IN] message code *
989 * data - [IN] message data *
990 * size - [IN] message data size *
991 * response - [OUT] response message (can be NULL if response is *
992 * not requested) *
993 * *
994 ******************************************************************************/
preprocessor_send(zbx_uint32_t code,unsigned char * data,zbx_uint32_t size,zbx_ipc_message_t * response)995 static void preprocessor_send(zbx_uint32_t code, unsigned char *data, zbx_uint32_t size,
996 zbx_ipc_message_t *response)
997 {
998 char *error = NULL;
999 static zbx_ipc_socket_t socket = {0};
1000
1001 /* each process has a permanent connection to preprocessing manager */
1002 if (0 == socket.fd && FAIL == zbx_ipc_socket_open(&socket, ZBX_IPC_SERVICE_PREPROCESSING, SEC_PER_MIN,
1003 &error))
1004 {
1005 zabbix_log(LOG_LEVEL_CRIT, "cannot connect to preprocessing service: %s", error);
1006 exit(EXIT_FAILURE);
1007 }
1008
1009 if (FAIL == zbx_ipc_socket_write(&socket, code, data, size))
1010 {
1011 zabbix_log(LOG_LEVEL_CRIT, "cannot send data to preprocessing service");
1012 exit(EXIT_FAILURE);
1013 }
1014
1015 if (NULL != response && FAIL == zbx_ipc_socket_read(&socket, response))
1016 {
1017 zabbix_log(LOG_LEVEL_CRIT, "cannot receive data from preprocessing service");
1018 exit(EXIT_FAILURE);
1019 }
1020 }
1021
1022 /******************************************************************************
1023 * *
1024 * Function: zbx_preprocess_item_value *
1025 * *
1026 * Purpose: perform item value preprocessing and dependent item processing *
1027 * *
1028 * Parameters: itemid - [IN] the itemid *
1029 * itemid - [IN] the hostid *
1030 * item_value_type - [IN] the item value type *
1031 * item_flags - [IN] the item flags (e. g. lld rule) *
1032 * result - [IN] agent result containing the value *
1033 * to add *
1034 * ts - [IN] the value timestamp *
1035 * state - [IN] the item state *
1036 * error - [IN] the error message in case item state is *
1037 * ITEM_STATE_NOTSUPPORTED *
1038 * *
1039 ******************************************************************************/
zbx_preprocess_item_value(zbx_uint64_t itemid,zbx_uint64_t hostid,unsigned char item_value_type,unsigned char item_flags,AGENT_RESULT * result,zbx_timespec_t * ts,unsigned char state,char * error)1040 void zbx_preprocess_item_value(zbx_uint64_t itemid, zbx_uint64_t hostid, unsigned char item_value_type,
1041 unsigned char item_flags, AGENT_RESULT *result, zbx_timespec_t *ts, unsigned char state, char *error)
1042 {
1043 zbx_preproc_item_value_t value = {.itemid = itemid, .hostid = hostid, .item_value_type = item_value_type,
1044 .error = error, .item_flags = item_flags, .state = state, .ts = ts};
1045 zbx_result_ptr_t result_ptr = {.result = result};
1046 size_t value_len = 0, len;
1047
1048 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1049
1050 if (ITEM_STATE_NORMAL == state)
1051 {
1052 if (0 != ISSET_STR(result))
1053 value_len = strlen(result->str);
1054
1055 if (0 != ISSET_TEXT(result))
1056 {
1057 if (value_len < (len = strlen(result->text)))
1058 value_len = len;
1059 }
1060
1061 if (0 != ISSET_LOG(result))
1062 {
1063 if (value_len < (len = strlen(result->log->value)))
1064 value_len = len;
1065 }
1066
1067 if (ZBX_MAX_RECV_DATA_SIZE < value_len)
1068 {
1069 result_ptr.result = NULL;
1070 value.state = ITEM_STATE_NOTSUPPORTED;
1071 value.error = "Value is too large.";
1072 }
1073 }
1074
1075 value.result_ptr = &result_ptr;
1076
1077 if (0 == preprocessor_pack_value(&cached_message, &value))
1078 {
1079 zbx_preprocessor_flush();
1080 preprocessor_pack_value(&cached_message, &value);
1081 }
1082
1083 if (MAX_VALUES_LOCAL < ++cached_values)
1084 zbx_preprocessor_flush();
1085
1086 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1087 }
1088
1089 /******************************************************************************
1090 * *
1091 * Function: zbx_preprocessor_flush *
1092 * *
1093 * Purpose: send flush command to preprocessing manager *
1094 * *
1095 ******************************************************************************/
zbx_preprocessor_flush(void)1096 void zbx_preprocessor_flush(void)
1097 {
1098 if (0 < cached_message.size)
1099 {
1100 preprocessor_send(ZBX_IPC_PREPROCESSOR_REQUEST, cached_message.data, cached_message.size, NULL);
1101
1102 zbx_ipc_message_clean(&cached_message);
1103 zbx_ipc_message_init(&cached_message);
1104 cached_values = 0;
1105 }
1106 }
1107
1108 /******************************************************************************
1109 * *
1110 * Function: zbx_preprocessor_get_queue_size *
1111 * *
1112 * Purpose: get queue size (enqueued value count) of preprocessing manager *
1113 * *
1114 * Return value: enqueued item count *
1115 * *
1116 ******************************************************************************/
zbx_preprocessor_get_queue_size(void)1117 zbx_uint64_t zbx_preprocessor_get_queue_size(void)
1118 {
1119 zbx_uint64_t size;
1120 zbx_ipc_message_t message;
1121
1122 zbx_ipc_message_init(&message);
1123 preprocessor_send(ZBX_IPC_PREPROCESSOR_QUEUE, NULL, 0, &message);
1124 memcpy(&size, message.data, sizeof(zbx_uint64_t));
1125 zbx_ipc_message_clean(&message);
1126
1127 return size;
1128 }
1129
1130 /******************************************************************************
1131 * *
1132 * Function: zbx_preproc_op_free *
1133 * *
1134 * Purpose: frees preprocessing step *
1135 * *
1136 ******************************************************************************/
zbx_preproc_op_free(zbx_preproc_op_t * op)1137 void zbx_preproc_op_free(zbx_preproc_op_t *op)
1138 {
1139 zbx_free(op->params);
1140 zbx_free(op->error_handler_params);
1141 zbx_free(op);
1142 }
1143
1144 /******************************************************************************
1145 * *
1146 * Function: zbx_preproc_result_free *
1147 * *
1148 * Purpose: frees preprocessing step test result *
1149 * *
1150 ******************************************************************************/
zbx_preproc_result_free(zbx_preproc_result_t * result)1151 void zbx_preproc_result_free(zbx_preproc_result_t *result)
1152 {
1153 zbx_variant_clear(&result->value);
1154 zbx_free(result->error);
1155 zbx_free(result);
1156 }
1157
1158 /******************************************************************************
1159 * *
1160 * Function: preprocessor_pack_test_request *
1161 * *
1162 * Purpose: packs preprocessing step request for serialization *
1163 * *
1164 * Return value: The size of packed data *
1165 * *
1166 ******************************************************************************/
preprocessor_pack_test_request(unsigned char ** data,unsigned char value_type,const char * value,const zbx_timespec_t * ts,const zbx_vector_ptr_t * history,const zbx_vector_ptr_t * steps)1167 static zbx_uint32_t preprocessor_pack_test_request(unsigned char **data, unsigned char value_type,
1168 const char *value, const zbx_timespec_t *ts, const zbx_vector_ptr_t *history,
1169 const zbx_vector_ptr_t *steps)
1170 {
1171 zbx_packed_field_t *offset, *fields;
1172 zbx_uint32_t size;
1173 int i, history_num;
1174 zbx_ipc_message_t message;
1175
1176 history_num = (NULL != history ? history->values_num : 0);
1177
1178 /* 6 is a max field count (without preprocessing step and history fields) */
1179 fields = (zbx_packed_field_t *)zbx_malloc(NULL, (6 + steps->values_num * 4 + history_num * 5)
1180 * sizeof(zbx_packed_field_t));
1181
1182 offset = fields;
1183
1184 *offset++ = PACKED_FIELD(&value_type, sizeof(unsigned char));
1185 *offset++ = PACKED_FIELD(value, 0);
1186 *offset++ = PACKED_FIELD(&ts->sec, sizeof(int));
1187 *offset++ = PACKED_FIELD(&ts->ns, sizeof(int));
1188
1189 offset += preprocessor_pack_history(offset, history, &history_num);
1190
1191 *offset++ = PACKED_FIELD(&steps->values_num, sizeof(int));
1192
1193 for (i = 0; i < steps->values_num; i++)
1194 offset += preprocessor_pack_step(offset, (zbx_preproc_op_t *)steps->values[i]);
1195
1196 zbx_ipc_message_init(&message);
1197 size = message_pack_data(&message, fields, offset - fields);
1198 *data = message.data;
1199 zbx_free(fields);
1200
1201 return size;
1202 }
1203
1204 /******************************************************************************
1205 * *
1206 * Function: zbx_preprocessor_unpack_test_request *
1207 * *
1208 * Purpose: unpack preprocessing test request data from IPC data buffer *
1209 * *
1210 * Parameters: value_type - [OUT] item value type *
1211 * value - [OUT] the value *
1212 * ts - [OUT] value timestamp *
1213 * value - [OUT] item value *
1214 * history - [OUT] history data *
1215 * steps - [OUT] preprocessing steps *
1216 * steps_num - [OUT] preprocessing step count *
1217 * data - [IN] IPC data buffer *
1218 * *
1219 ******************************************************************************/
zbx_preprocessor_unpack_test_request(unsigned char * value_type,char ** value,zbx_timespec_t * ts,zbx_vector_ptr_t * history,zbx_preproc_op_t ** steps,int * steps_num,const unsigned char * data)1220 void zbx_preprocessor_unpack_test_request(unsigned char *value_type, char **value, zbx_timespec_t *ts,
1221 zbx_vector_ptr_t *history, zbx_preproc_op_t **steps, int *steps_num, const unsigned char *data)
1222 {
1223 zbx_uint32_t value_len;
1224 const unsigned char *offset = data;
1225
1226 offset += zbx_deserialize_char(offset, value_type);
1227 offset += zbx_deserialize_str(offset, value, value_len);
1228 offset += zbx_deserialize_int(offset, &ts->sec);
1229 offset += zbx_deserialize_int(offset, &ts->ns);
1230
1231 offset += preprocesser_unpack_history(offset, history);
1232 (void)preprocessor_unpack_steps(offset, steps, steps_num);
1233 }
1234
1235 /******************************************************************************
1236 * *
1237 * Function: zbx_preprocessor_test *
1238 * *
1239 * Purpose: tests item preprocessing with the specified input value and steps *
1240 * *
1241 ******************************************************************************/
zbx_preprocessor_test(unsigned char value_type,const char * value,const zbx_timespec_t * ts,const zbx_vector_ptr_t * steps,zbx_vector_ptr_t * results,zbx_vector_ptr_t * history,char ** preproc_error,char ** error)1242 int zbx_preprocessor_test(unsigned char value_type, const char *value, const zbx_timespec_t *ts,
1243 const zbx_vector_ptr_t *steps, zbx_vector_ptr_t *results, zbx_vector_ptr_t *history,
1244 char **preproc_error, char **error)
1245 {
1246 unsigned char *data = NULL;
1247 zbx_uint32_t size;
1248 int ret = FAIL;
1249 unsigned char *result;
1250
1251 size = preprocessor_pack_test_request(&data, value_type, value, ts, history, steps);
1252
1253 if (SUCCEED != zbx_ipc_async_exchange(ZBX_IPC_SERVICE_PREPROCESSING, ZBX_IPC_PREPROCESSOR_TEST_REQUEST,
1254 SEC_PER_MIN, data, size, &result, error))
1255 {
1256 goto out;
1257 }
1258
1259 zbx_preprocessor_unpack_test_result(results, history, preproc_error, result);
1260 zbx_free(result);
1261
1262 ret = SUCCEED;
1263 out:
1264 zbx_free(data);
1265
1266 return ret;
1267 }
1268
1269 /******************************************************************************
1270 * *
1271 * Function: zbx_preprocessor_get_diag_stats *
1272 * *
1273 * Purpose: get preprocessing manager diagnostic statistics *
1274 * *
1275 ******************************************************************************/
zbx_preprocessor_get_diag_stats(int * total,int * queued,int * processing,int * done,int * pending,char ** error)1276 int zbx_preprocessor_get_diag_stats(int *total, int *queued, int *processing, int *done,
1277 int *pending, char **error)
1278 {
1279 unsigned char *result;
1280
1281 if (SUCCEED != zbx_ipc_async_exchange(ZBX_IPC_SERVICE_PREPROCESSING, ZBX_IPC_PREPROCESSOR_DIAG_STATS,
1282 SEC_PER_MIN, NULL, 0, &result, error))
1283 {
1284 return FAIL;
1285 }
1286
1287 zbx_preprocessor_unpack_diag_stats(total, queued, processing, done, pending, result);
1288 zbx_free(result);
1289
1290 return SUCCEED;
1291 }
1292
1293 /******************************************************************************
1294 * *
1295 * Function: zbx_preprocessor_get_top_items *
1296 * *
1297 * Purpose: get the top N items by the number of queued values *
1298 * *
1299 ******************************************************************************/
preprocessor_get_top_items(int limit,zbx_vector_ptr_t * items,char ** error,zbx_uint32_t code)1300 static int preprocessor_get_top_items(int limit, zbx_vector_ptr_t *items, char **error, zbx_uint32_t code)
1301 {
1302 int ret;
1303 unsigned char *data, *result;
1304 zbx_uint32_t data_len;
1305
1306 data_len = zbx_preprocessor_pack_top_items_request(&data, limit);
1307
1308 if (SUCCEED != (ret = zbx_ipc_async_exchange(ZBX_IPC_SERVICE_PREPROCESSING, code, SEC_PER_MIN, data, data_len,
1309 &result, error)))
1310 {
1311 goto out;
1312 }
1313
1314 zbx_preprocessor_unpack_top_result(items, result);
1315 zbx_free(result);
1316 out:
1317 zbx_free(data);
1318
1319 return ret;
1320 }
1321
1322 /******************************************************************************
1323 * *
1324 * Function: zbx_preprocessor_get_top_items *
1325 * *
1326 * Purpose: get the top N items by the number of queued values *
1327 * *
1328 ******************************************************************************/
zbx_preprocessor_get_top_items(int limit,zbx_vector_ptr_t * items,char ** error)1329 int zbx_preprocessor_get_top_items(int limit, zbx_vector_ptr_t *items, char **error)
1330 {
1331 return preprocessor_get_top_items(limit, items, error, ZBX_IPC_PREPROCESSOR_TOP_ITEMS);
1332 }
1333
1334 /******************************************************************************
1335 * *
1336 * Function: zbx_preprocessor_get_top_oldest_preproc_items *
1337 * *
1338 * Purpose: get the oldest items with preprocessing still in queue *
1339 * *
1340 ******************************************************************************/
zbx_preprocessor_get_top_oldest_preproc_items(int limit,zbx_vector_ptr_t * items,char ** error)1341 int zbx_preprocessor_get_top_oldest_preproc_items(int limit, zbx_vector_ptr_t *items, char **error)
1342 {
1343 return preprocessor_get_top_items(limit, items, error, ZBX_IPC_PREPROCESSOR_TOP_OLDEST_PREPROC_ITEMS);
1344 }
1345