1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18 **/
19 
20 #include "common.h"
21 #include "log.h"
22 #include "proxy.h"
23 #include "zbxserver.h"
24 #include "zbxserialize.h"
25 #include "zbxipcservice.h"
26 
27 #include "preproc.h"
28 #include "preprocessing.h"
29 #include "preproc_history.h"
30 
31 #define PACKED_FIELD_RAW	0
32 #define PACKED_FIELD_STRING	1
33 #define MAX_VALUES_LOCAL	256
34 
35 /* packed field data description */
36 typedef struct
37 {
38 	const void	*value;	/* value to be packed */
39 	zbx_uint32_t	size;	/* size of a value (can be 0 for strings) */
40 	unsigned char	type;	/* field type */
41 }
42 zbx_packed_field_t;
43 
44 #define PACKED_FIELD(value, size)	\
45 		(zbx_packed_field_t){(value), (size), (0 == (size) ? PACKED_FIELD_STRING : PACKED_FIELD_RAW)};
46 
47 static zbx_ipc_message_t	cached_message;
48 static int			cached_values;
49 
50 /******************************************************************************
51  *                                                                            *
52  * Function: message_pack_data                                                *
53  *                                                                            *
54  * Purpose: helper for data packing based on defined format                   *
55  *                                                                            *
56  * Parameters: message - [OUT] IPC message, can be NULL for buffer size       *
57  *                             calculations                                   *
58  *             fields  - [IN]  the definition of data to be packed            *
59  *             count   - [IN]  field count                                    *
60  *                                                                            *
61  * Return value: size of packed data or 0 if the message size would exceed    *
62  *               4GB limit                                                    *
63  *                                                                            *
64  ******************************************************************************/
message_pack_data(zbx_ipc_message_t * message,zbx_packed_field_t * fields,int count)65 static zbx_uint32_t	message_pack_data(zbx_ipc_message_t *message, zbx_packed_field_t *fields, int count)
66 {
67 	int 			i;
68 	zbx_uint32_t		data_size = 0;
69 	zbx_uint64_t		field_size;
70 	unsigned char		*offset = NULL;
71 	const zbx_uint64_t	max_uint32 = ~(zbx_uint32_t)0;
72 
73 	if (NULL != message)
74 	{
75 		/* recursive call to calculate required buffer size */
76 		data_size = message_pack_data(NULL, fields, count);
77 
78 		if (0 == data_size || max_uint32 - message->size < data_size)
79 			return 0;
80 
81 		message->size += data_size;
82 		message->data = (unsigned char *)zbx_realloc(message->data, message->size);
83 		offset = message->data + (message->size - data_size);
84 	}
85 
86 	for (i = 0; i < count; i++)
87 	{
88 		field_size = fields[i].size;
89 		if (NULL != offset)
90 		{
91 			/* data packing */
92 			if (PACKED_FIELD_STRING == fields[i].type)
93 			{
94 				zbx_uint32_t	field_size_uint32 = (zbx_uint32_t)field_size;
95 
96 				memcpy(offset, &field_size_uint32, sizeof(zbx_uint32_t));
97 				if (0 != field_size && NULL != fields[i].value)
98 					memcpy(offset + sizeof(zbx_uint32_t), fields[i].value, field_size);
99 				field_size += sizeof(zbx_uint32_t);
100 			}
101 			else
102 				memcpy(offset, fields[i].value, field_size);
103 
104 			offset += field_size;
105 		}
106 		else
107 		{
108 			/* size calculation */
109 			if (PACKED_FIELD_STRING == fields[i].type)
110 			{
111 				field_size = (NULL != fields[i].value) ? strlen((const char *)fields[i].value) + 1 : 0;
112 				fields[i].size = (zbx_uint32_t)field_size;
113 
114 				field_size += sizeof(zbx_uint32_t);
115 			}
116 
117 			if (field_size + data_size > max_uint32)
118 				return 0;
119 
120 			data_size += (zbx_uint32_t)field_size;
121 		}
122 	}
123 
124 	return data_size;
125 }
126 
127 /******************************************************************************
128  *                                                                            *
129  * Function: preprocessor_pack_value                                          *
130  *                                                                            *
131  * Purpose: pack item value data into a single buffer that can be used in IPC *
132  *                                                                            *
133  * Parameters: message - [OUT] IPC message                                    *
134  *             value   - [IN]  value to be packed                             *
135  *                                                                            *
136  * Return value: size of packed data                                          *
137  *                                                                            *
138  ******************************************************************************/
preprocessor_pack_value(zbx_ipc_message_t * message,zbx_preproc_item_value_t * value)139 static zbx_uint32_t	preprocessor_pack_value(zbx_ipc_message_t *message, zbx_preproc_item_value_t *value)
140 {
141 	zbx_packed_field_t	fields[24], *offset = fields;	/* 24 - max field count */
142 	unsigned char		ts_marker, result_marker, log_marker;
143 
144 	ts_marker = (NULL != value->ts);
145 	result_marker = (NULL != value->result_ptr->result);
146 
147 	*offset++ = PACKED_FIELD(&value->itemid, sizeof(zbx_uint64_t));
148 	*offset++ = PACKED_FIELD(&value->hostid, sizeof(zbx_uint64_t));
149 	*offset++ = PACKED_FIELD(&value->item_value_type, sizeof(unsigned char));
150 	*offset++ = PACKED_FIELD(&value->item_flags, sizeof(unsigned char));
151 	*offset++ = PACKED_FIELD(&value->state, sizeof(unsigned char));
152 	*offset++ = PACKED_FIELD(value->error, 0);
153 	*offset++ = PACKED_FIELD(&ts_marker, sizeof(unsigned char));
154 
155 	if (NULL != value->ts)
156 	{
157 		*offset++ = PACKED_FIELD(&value->ts->sec, sizeof(int));
158 		*offset++ = PACKED_FIELD(&value->ts->ns, sizeof(int));
159 	}
160 
161 	*offset++ = PACKED_FIELD(&result_marker, sizeof(unsigned char));
162 
163 	if (NULL != value->result_ptr->result)
164 	{
165 
166 		*offset++ = PACKED_FIELD(&value->result_ptr->result->lastlogsize, sizeof(zbx_uint64_t));
167 		*offset++ = PACKED_FIELD(&value->result_ptr->result->ui64, sizeof(zbx_uint64_t));
168 		*offset++ = PACKED_FIELD(&value->result_ptr->result->dbl, sizeof(double));
169 		*offset++ = PACKED_FIELD(value->result_ptr->result->str, 0);
170 		*offset++ = PACKED_FIELD(value->result_ptr->result->text, 0);
171 		*offset++ = PACKED_FIELD(value->result_ptr->result->msg, 0);
172 		*offset++ = PACKED_FIELD(&value->result_ptr->result->type, sizeof(int));
173 		*offset++ = PACKED_FIELD(&value->result_ptr->result->mtime, sizeof(int));
174 
175 		log_marker = (NULL != value->result_ptr->result->log);
176 		*offset++ = PACKED_FIELD(&log_marker, sizeof(unsigned char));
177 		if (NULL != value->result_ptr->result->log)
178 		{
179 			*offset++ = PACKED_FIELD(value->result_ptr->result->log->value, 0);
180 			*offset++ = PACKED_FIELD(value->result_ptr->result->log->source, 0);
181 			*offset++ = PACKED_FIELD(&value->result_ptr->result->log->timestamp, sizeof(int));
182 			*offset++ = PACKED_FIELD(&value->result_ptr->result->log->severity, sizeof(int));
183 			*offset++ = PACKED_FIELD(&value->result_ptr->result->log->logeventid, sizeof(int));
184 		}
185 	}
186 
187 	return message_pack_data(message, fields, offset - fields);
188 }
189 
190 /******************************************************************************
191  *                                                                            *
192  * Function: preprocessor_pack_variant                                        *
193  *                                                                            *
194  * Purpose: packs variant value for serialization                             *
195  *                                                                            *
196  * Parameters: fields - [OUT] the packed fields                               *
197  *             value  - [IN] the value to pack                                *
198  *                                                                            *
199  * Return value: The number of fields used.                                   *
200  *                                                                            *
201  * Comments: Don't pack local variables, only ones passed in parameters!      *
202  *                                                                            *
203  ******************************************************************************/
preprocessor_pack_variant(zbx_packed_field_t * fields,const zbx_variant_t * value)204 static int	preprocessor_pack_variant(zbx_packed_field_t *fields, const zbx_variant_t *value)
205 {
206 	int	offset = 0;
207 
208 	fields[offset++] = PACKED_FIELD(&value->type, sizeof(unsigned char));
209 
210 	switch (value->type)
211 	{
212 		case ZBX_VARIANT_UI64:
213 			fields[offset++] = PACKED_FIELD(&value->data.ui64, sizeof(zbx_uint64_t));
214 			break;
215 
216 		case ZBX_VARIANT_DBL:
217 			fields[offset++] = PACKED_FIELD(&value->data.dbl, sizeof(double));
218 			break;
219 
220 		case ZBX_VARIANT_STR:
221 			fields[offset++] = PACKED_FIELD(value->data.str, 0);
222 			break;
223 
224 		case ZBX_VARIANT_BIN:
225 			fields[offset++] = PACKED_FIELD(value->data.bin, sizeof(zbx_uint32_t) +
226 					zbx_variant_data_bin_get(value->data.bin, NULL));
227 			break;
228 	}
229 
230 	return offset;
231 }
232 
233 /******************************************************************************
234  *                                                                            *
235  * Function: preprocessor_pack_history                                        *
236  *                                                                            *
237  * Purpose: packs preprocessing history for serialization                     *
238  *                                                                            *
239  * Parameters: fields  - [OUT] the packed fields                              *
240  *             history - [IN] the history to pack                             *
241  *                                                                            *
242  * Return value: The number of fields used.                                   *
243  *                                                                            *
244  * Comments: Don't pack local variables, only ones passed in parameters!      *
245  *                                                                            *
246  ******************************************************************************/
preprocessor_pack_history(zbx_packed_field_t * fields,const zbx_vector_ptr_t * history,const int * history_num)247 static int	preprocessor_pack_history(zbx_packed_field_t *fields, const zbx_vector_ptr_t *history,
248 		const int *history_num)
249 {
250 	int	i, offset = 0;
251 
252 	fields[offset++] = PACKED_FIELD(history_num, sizeof(int));
253 
254 	for (i = 0; i < *history_num; i++)
255 	{
256 		zbx_preproc_op_history_t	*ophistory = (zbx_preproc_op_history_t *)history->values[i];
257 
258 		fields[offset++] = PACKED_FIELD(&ophistory->index, sizeof(int));
259 		offset += preprocessor_pack_variant(&fields[offset], &ophistory->value);
260 		fields[offset++] = PACKED_FIELD(&ophistory->ts.sec, sizeof(int));
261 		fields[offset++] = PACKED_FIELD(&ophistory->ts.ns, sizeof(int));
262 	}
263 
264 	return offset;
265 }
266 
267 /******************************************************************************
268  *                                                                            *
269  * Function: preprocessor_pack_step                                           *
270  *                                                                            *
271  * Purpose: packs preprocessing step for serialization                        *
272  *                                                                            *
273  * Parameters: fields - [OUT] the packed fields                               *
274  *             step   - [IN] the step to pack                                 *
275  *                                                                            *
276  * Return value: The number of fields used.                                   *
277  *                                                                            *
278  * Comments: Don't pack local variables, only ones passed in parameters!      *
279  *                                                                            *
280  ******************************************************************************/
preprocessor_pack_step(zbx_packed_field_t * fields,const zbx_preproc_op_t * step)281 static int	preprocessor_pack_step(zbx_packed_field_t *fields, const zbx_preproc_op_t *step)
282 {
283 	int	offset = 0;
284 
285 	fields[offset++] = PACKED_FIELD(&step->type, sizeof(char));
286 	fields[offset++] = PACKED_FIELD(step->params, 0);
287 	fields[offset++] = PACKED_FIELD(&step->error_handler, sizeof(char));
288 	fields[offset++] = PACKED_FIELD(step->error_handler_params, 0);
289 
290 	return offset;
291 }
292 
293 /******************************************************************************
294  *                                                                            *
295  * Function: preprocessor_pack_steps                                          *
296  *                                                                            *
297  * Purpose: packs preprocessing steps for serialization                       *
298  *                                                                            *
299  * Parameters: fields    - [OUT] the packed fields                            *
300  *             steps     - [IN] the steps to pack                             *
301  *             steps_num - [IN] the number of steps                           *
302  *                                                                            *
303  * Return value: The number of fields used.                                   *
304  *                                                                            *
305  * Comments: Don't pack local variables, only ones passed in parameters!      *
306  *                                                                            *
307  ******************************************************************************/
preprocessor_pack_steps(zbx_packed_field_t * fields,const zbx_preproc_op_t * steps,const int * steps_num)308 static int	preprocessor_pack_steps(zbx_packed_field_t *fields, const zbx_preproc_op_t *steps, const int *steps_num)
309 {
310 	int	i, offset = 0;
311 
312 	fields[offset++] = PACKED_FIELD(steps_num, sizeof(int));
313 
314 	for (i = 0; i < *steps_num; i++)
315 		offset += preprocessor_pack_step(&fields[offset], &steps[i]);
316 
317 	return offset;
318 }
319 
320 /******************************************************************************
321  *                                                                            *
322  * Function: preprocesser_unpack_variant                                      *
323  *                                                                            *
324  * Purpose: unpacks serialized variant value                                  *
325  *                                                                            *
326  * Parameters: data  - [IN] the serialized data                               *
327  *             value - [OUT] the value                                        *
328  *                                                                            *
329  * Return value: The number of bytes parsed.                                  *
330  *                                                                            *
331  ******************************************************************************/
preprocesser_unpack_variant(const unsigned char * data,zbx_variant_t * value)332 static int	preprocesser_unpack_variant(const unsigned char *data, zbx_variant_t *value)
333 {
334 	const unsigned char	*offset = data;
335 	zbx_uint32_t		value_len;
336 
337 	offset += zbx_deserialize_char(offset, &value->type);
338 
339 	switch (value->type)
340 	{
341 		case ZBX_VARIANT_UI64:
342 			offset += zbx_deserialize_uint64(offset, &value->data.ui64);
343 			break;
344 
345 		case ZBX_VARIANT_DBL:
346 			offset += zbx_deserialize_double(offset, &value->data.dbl);
347 			break;
348 
349 		case ZBX_VARIANT_STR:
350 			offset += zbx_deserialize_str(offset, &value->data.str, value_len);
351 			break;
352 
353 		case ZBX_VARIANT_BIN:
354 			offset += zbx_deserialize_bin(offset, &value->data.bin, value_len);
355 			break;
356 	}
357 
358 	return offset - data;
359 }
360 
361 /******************************************************************************
362  *                                                                            *
363  * Function: preprocesser_unpack_history                                      *
364  *                                                                            *
365  * Purpose: unpacks serialized preprocessing history                          *
366  *                                                                            *
367  * Parameters: data    - [IN] the serialized data                             *
368  *             history - [OUT] the history                                    *
369  *                                                                            *
370  * Return value: The number of bytes parsed.                                  *
371  *                                                                            *
372  ******************************************************************************/
preprocesser_unpack_history(const unsigned char * data,zbx_vector_ptr_t * history)373 static int	preprocesser_unpack_history(const unsigned char *data, zbx_vector_ptr_t *history)
374 {
375 	const unsigned char	*offset = data;
376 	int			i, history_num;
377 
378 	offset += zbx_deserialize_int(offset, &history_num);
379 
380 	if (0 != history_num)
381 	{
382 		zbx_vector_ptr_reserve(history, history_num);
383 
384 		for (i = 0; i < history_num; i++)
385 		{
386 			zbx_preproc_op_history_t	*ophistory;
387 
388 			ophistory = zbx_malloc(NULL, sizeof(zbx_preproc_op_history_t));
389 
390 			offset += zbx_deserialize_int(offset, &ophistory->index);
391 			offset += preprocesser_unpack_variant(offset, &ophistory->value);
392 			offset += zbx_deserialize_int(offset, &ophistory->ts.sec);
393 			offset += zbx_deserialize_int(offset, &ophistory->ts.ns);
394 
395 			zbx_vector_ptr_append(history, ophistory);
396 		}
397 	}
398 
399 	return offset - data;
400 }
401 
402 /******************************************************************************
403  *                                                                            *
404  * Function: preprocessor_unpack_step                                         *
405  *                                                                            *
406  * Purpose: unpacks serialized preprocessing step                             *
407  *                                                                            *
408  * Parameters: data - [IN] the serialized data                                *
409  *             step - [OUT] the preprocessing step                            *
410  *                                                                            *
411  * Return value: The number of bytes parsed.                                  *
412  *                                                                            *
413  ******************************************************************************/
preprocessor_unpack_step(const unsigned char * data,zbx_preproc_op_t * step)414 static int	preprocessor_unpack_step(const unsigned char *data, zbx_preproc_op_t *step)
415 {
416 	const unsigned char	*offset = data;
417 	zbx_uint32_t		value_len;
418 
419 	offset += zbx_deserialize_char(offset, &step->type);
420 	offset += zbx_deserialize_str_ptr(offset, step->params, value_len);
421 	offset += zbx_deserialize_char(offset, &step->error_handler);
422 	offset += zbx_deserialize_str_ptr(offset, step->error_handler_params, value_len);
423 
424 	return offset - data;
425 }
426 
427 /******************************************************************************
428  *                                                                            *
429  * Function: preprocessor_unpack_steps                                        *
430  *                                                                            *
431  * Purpose: unpacks serialized preprocessing steps                            *
432  *                                                                            *
433  * Parameters: data      - [IN] the serialized data                           *
434  *             steps     - [OUT] the preprocessing steps                      *
435  *             steps_num - [OUT] the number of steps                          *
436  *                                                                            *
437  * Return value: The number of bytes parsed.                                  *
438  *                                                                            *
439  ******************************************************************************/
preprocessor_unpack_steps(const unsigned char * data,zbx_preproc_op_t ** steps,int * steps_num)440 static int	preprocessor_unpack_steps(const unsigned char *data, zbx_preproc_op_t **steps, int *steps_num)
441 {
442 	const unsigned char	*offset = data;
443 	int			i;
444 
445 	offset += zbx_deserialize_int(offset, steps_num);
446 	if (0 < *steps_num)
447 	{
448 		*steps = (zbx_preproc_op_t *)zbx_malloc(NULL, sizeof(zbx_preproc_op_t) * (*steps_num));
449 		for (i = 0; i < *steps_num; i++)
450 			offset += preprocessor_unpack_step(offset, *steps + i);
451 	}
452 	else
453 		*steps = NULL;
454 
455 	return offset - data;
456 }
457 
458 /******************************************************************************
459  *                                                                            *
460  * Function: zbx_preprocessor_pack_task                                       *
461  *                                                                            *
462  * Purpose: pack preprocessing task data into a single buffer that can be     *
463  *          used in IPC                                                       *
464  *                                                                            *
465  * Parameters: data          - [OUT] memory buffer for packed data            *
466  *             itemid        - [IN] item id                                   *
467  *             value_type    - [IN] item value type                           *
468  *             ts            - [IN] value timestamp                           *
469  *             value         - [IN] item value                                *
470  *             history       - [IN] history data (can be NULL)                *
471  *             steps         - [IN] preprocessing steps                       *
472  *             steps_num     - [IN] preprocessing step count                  *
473  *                                                                            *
474  * Return value: size of packed data                                          *
475  *                                                                            *
476  ******************************************************************************/
zbx_preprocessor_pack_task(unsigned char ** data,zbx_uint64_t itemid,unsigned char value_type,zbx_timespec_t * ts,zbx_variant_t * value,const zbx_vector_ptr_t * history,const zbx_preproc_op_t * steps,int steps_num)477 zbx_uint32_t	zbx_preprocessor_pack_task(unsigned char **data, zbx_uint64_t itemid, unsigned char value_type,
478 		zbx_timespec_t *ts, zbx_variant_t *value, const zbx_vector_ptr_t *history,
479 		const zbx_preproc_op_t *steps, int steps_num)
480 {
481 	zbx_packed_field_t	*offset, *fields;
482 	unsigned char		ts_marker;
483 	zbx_uint32_t		size;
484 	int			history_num;
485 	zbx_ipc_message_t	message;
486 
487 	history_num = (NULL != history ? history->values_num : 0);
488 
489 	/* 9 is a max field count (without preprocessing step and history fields) */
490 	fields = (zbx_packed_field_t *)zbx_malloc(NULL, (9 + steps_num * 4 + history_num * 5)
491 			* sizeof(zbx_packed_field_t));
492 
493 	offset = fields;
494 	ts_marker = (NULL != ts);
495 
496 	*offset++ = PACKED_FIELD(&itemid, sizeof(zbx_uint64_t));
497 	*offset++ = PACKED_FIELD(&value_type, sizeof(unsigned char));
498 	*offset++ = PACKED_FIELD(&ts_marker, sizeof(unsigned char));
499 
500 	if (NULL != ts)
501 	{
502 		*offset++ = PACKED_FIELD(&ts->sec, sizeof(int));
503 		*offset++ = PACKED_FIELD(&ts->ns, sizeof(int));
504 	}
505 
506 	offset += preprocessor_pack_variant(offset, value);
507 	offset += preprocessor_pack_history(offset, history, &history_num);
508 	offset += preprocessor_pack_steps(offset, steps, &steps_num);
509 
510 	zbx_ipc_message_init(&message);
511 	size = message_pack_data(&message, fields, offset - fields);
512 	*data = message.data;
513 	zbx_free(fields);
514 
515 	return size;
516 }
517 
518 /******************************************************************************
519  *                                                                            *
520  * Function: zbx_preprocessor_pack_result                                     *
521  *                                                                            *
522  * Purpose: pack preprocessing result data into a single buffer that can be   *
523  *          used in IPC                                                       *
524  *                                                                            *
525  * Parameters: data          - [OUT] memory buffer for packed data            *
526  *             value         - [IN] result value                              *
527  *             history       - [IN] item history data                         *
528  *             error         - [IN] preprocessing error                       *
529  *                                                                            *
530  * Return value: size of packed data                                          *
531  *                                                                            *
532  ******************************************************************************/
zbx_preprocessor_pack_result(unsigned char ** data,zbx_variant_t * value,const zbx_vector_ptr_t * history,char * error)533 zbx_uint32_t	zbx_preprocessor_pack_result(unsigned char **data, zbx_variant_t *value,
534 		const zbx_vector_ptr_t *history, char *error)
535 {
536 	zbx_packed_field_t	*offset, *fields;
537 	zbx_uint32_t		size;
538 	zbx_ipc_message_t	message;
539 	int			history_num;
540 
541 	history_num = history->values_num;
542 
543 	/* 4 is a max field count (without history fields) */
544 	fields = (zbx_packed_field_t *)zbx_malloc(NULL, (4 + history_num * 5) * sizeof(zbx_packed_field_t));
545 	offset = fields;
546 
547 	offset += preprocessor_pack_variant(offset, value);
548 	offset += preprocessor_pack_history(offset, history, &history_num);
549 
550 	*offset++ = PACKED_FIELD(error, 0);
551 
552 	zbx_ipc_message_init(&message);
553 	size = message_pack_data(&message, fields, offset - fields);
554 	*data = message.data;
555 
556 	zbx_free(fields);
557 
558 	return size;
559 }
560 
561 /******************************************************************************
562  *                                                                            *
563  * Function: zbx_preprocessor_pack_test_result                                *
564  *                                                                            *
565  * Purpose: pack preprocessing result data into a single buffer that can be   *
566  *          used in IPC                                                       *
567  *                                                                            *
568  * Parameters: data          - [OUT] memory buffer for packed data            *
569  *             ret           - [IN] return code                               *
570  *             results       - [IN] the preprocessing step results            *
571  *             results_num   - [IN] the number of preprocessing step results  *
572  *             history       - [IN] item history data                         *
573  *             error         - [IN] preprocessing error                       *
574  *                                                                            *
575  * Return value: size of packed data                                          *
576  *                                                                            *
577  ******************************************************************************/
zbx_preprocessor_pack_test_result(unsigned char ** data,const zbx_preproc_result_t * results,int results_num,const zbx_vector_ptr_t * history,const char * error)578 zbx_uint32_t	zbx_preprocessor_pack_test_result(unsigned char **data, const zbx_preproc_result_t *results,
579 		int results_num, const zbx_vector_ptr_t *history, const char *error)
580 {
581 	zbx_packed_field_t	*offset, *fields;
582 	zbx_uint32_t		size;
583 	zbx_ipc_message_t	message;
584 	int			i, history_num;
585 
586 	history_num = history->values_num;
587 
588 	fields = (zbx_packed_field_t *)zbx_malloc(NULL, (3 + history_num * 5 + results_num * 4) *
589 			sizeof(zbx_packed_field_t));
590 	offset = fields;
591 
592 	*offset++ = PACKED_FIELD(&results_num, sizeof(int));
593 
594 	for (i = 0; i < results_num; i++)
595 	{
596 		offset += preprocessor_pack_variant(offset, &results[i].value);
597 		*offset++ = PACKED_FIELD(results[i].error, 0);
598 		*offset++ = PACKED_FIELD(&results[i].action, sizeof(unsigned char));
599 	}
600 
601 	offset += preprocessor_pack_history(offset, history, &history_num);
602 
603 	*offset++ = PACKED_FIELD(error, 0);
604 
605 	zbx_ipc_message_init(&message);
606 	size = message_pack_data(&message, fields, offset - fields);
607 	*data = message.data;
608 
609 	zbx_free(fields);
610 
611 	return size;
612 }
613 
614 /******************************************************************************
615  *                                                                            *
616  * Function: zbx_preprocessor_pack_diag_stats                                 *
617  *                                                                            *
618  * Purpose: pack diagnostic statistics data into a single buffer that can be  *
619  *          used in IPC                                                       *
620  * Parameters: data       - [OUT] memory buffer for packed data               *
621  *             total      - [IN] the number of values                         *
622  *             queued     - [IN] the number of values waiting to be           *
623  *                               preprocessed                                 *
624  *             processing - [IN] the number of values being preprocessed      *
625  *             done       - [IN] the number of values waiting to be flushed   *
626  *                               that are either preprocessed or did not      *
627  *                               require preprocessing                        *
628  *             pending    - [IN] the number of values pending to be           *
629  *                               preprocessed after previous value for        *
630  *                               example delta, throttling depends on         *
631  *                               previous value                               *
632  *             data       - [IN] IPC data buffer                              *
633  *                                                                            *
634  ******************************************************************************/
zbx_preprocessor_pack_diag_stats(unsigned char ** data,int total,int queued,int processing,int done,int pending)635 zbx_uint32_t	zbx_preprocessor_pack_diag_stats(unsigned char **data, int total, int queued, int processing, int done,
636 		int pending)
637 {
638 	unsigned char	*ptr;
639 	zbx_uint32_t	data_len = 0;
640 
641 	zbx_serialize_prepare_value(data_len, total);
642 	zbx_serialize_prepare_value(data_len, queued);
643 	zbx_serialize_prepare_value(data_len, processing);
644 	zbx_serialize_prepare_value(data_len, done);
645 	zbx_serialize_prepare_value(data_len, pending);
646 
647 	*data = (unsigned char *)zbx_malloc(NULL, data_len);
648 
649 	ptr = *data;
650 	ptr += zbx_serialize_value(ptr, total);
651 	ptr += zbx_serialize_value(ptr, queued);
652 	ptr += zbx_serialize_value(ptr, processing);
653 	ptr += zbx_serialize_value(ptr, done);
654 	(void)zbx_serialize_value(ptr, pending);
655 
656 	return data_len;
657 }
658 
659 /******************************************************************************
660  *                                                                            *
661  * Function: zbx_preprocessor_pack_top_request                                *
662  *                                                                            *
663  * Purpose: pack top request data into a single buffer that can be used in IPC*
664  *                                                                            *
665  * Parameters: data  - [OUT] memory buffer for packed data                    *
666  *             field - [IN] the sort field                                    *
667  *             limit - [IN] the number of top values to return                *
668  *                                                                            *
669  ******************************************************************************/
zbx_preprocessor_pack_top_items_request(unsigned char ** data,int limit)670 zbx_uint32_t	zbx_preprocessor_pack_top_items_request(unsigned char **data, int limit)
671 {
672 	zbx_uint32_t	data_len = 0;
673 
674 	zbx_serialize_prepare_value(data_len, limit);
675 	*data = (unsigned char *)zbx_malloc(NULL, data_len);
676 	(void)zbx_serialize_value(*data, limit);
677 
678 	return data_len;
679 }
680 
681 /******************************************************************************
682  *                                                                            *
683  * Function: zbx_preprocessor_pack_top_result                                 *
684  *                                                                            *
685  * Purpose: pack top result data into a single buffer that can be used in IPC *
686  *                                                                            *
687  * Parameters: data      - [OUT] memory buffer for packed data                *
688  *             items     - [IN] the array of item references                  *
689  *             items_num - [IN] the number of items                           *
690  *                                                                            *
691  ******************************************************************************/
zbx_preprocessor_pack_top_items_result(unsigned char ** data,zbx_preproc_item_stats_t ** items,int items_num)692 zbx_uint32_t	zbx_preprocessor_pack_top_items_result(unsigned char **data, zbx_preproc_item_stats_t **items,
693 		int items_num)
694 {
695 	unsigned char	*ptr;
696 	zbx_uint32_t	data_len = 0, item_len = 0;
697 	int		i;
698 
699 	if (0 != items_num)
700 	{
701 		zbx_serialize_prepare_value(item_len, items[0]->itemid);
702 		zbx_serialize_prepare_value(item_len, items[0]->values_num);
703 		zbx_serialize_prepare_value(item_len, items[0]->steps_num);
704 	}
705 
706 	zbx_serialize_prepare_value(data_len, items_num);
707 	data_len += item_len * items_num;
708 	*data = (unsigned char *)zbx_malloc(NULL, data_len);
709 
710 	ptr = *data;
711 	ptr += zbx_serialize_value(ptr, items_num);
712 
713 	for (i = 0; i < items_num; i++)
714 	{
715 		ptr += zbx_serialize_value(ptr, items[i]->itemid);
716 		ptr += zbx_serialize_value(ptr, items[i]->values_num);
717 		ptr += zbx_serialize_value(ptr, items[i]->steps_num);
718 	}
719 
720 	return data_len;
721 }
722 
723 /******************************************************************************
724  *                                                                            *
725  * Function: zbx_preprocessor_unpack_value                                    *
726  *                                                                            *
727  * Purpose: unpack item value data from IPC data buffer                       *
728  *                                                                            *
729  * Parameters: value    - [OUT] unpacked item value                           *
730  *             data     - [IN]  IPC data buffer                               *
731  *                                                                            *
732  * Return value: size of packed data                                          *
733  *                                                                            *
734  ******************************************************************************/
zbx_preprocessor_unpack_value(zbx_preproc_item_value_t * value,unsigned char * data)735 zbx_uint32_t	zbx_preprocessor_unpack_value(zbx_preproc_item_value_t *value, unsigned char *data)
736 {
737 	zbx_uint32_t	value_len;
738 	zbx_timespec_t	*timespec = NULL;
739 	AGENT_RESULT	*agent_result = NULL;
740 	zbx_log_t	*log = NULL;
741 	unsigned char	*offset = data, ts_marker, result_marker, log_marker;
742 
743 	offset += zbx_deserialize_uint64(offset, &value->itemid);
744 	offset += zbx_deserialize_uint64(offset, &value->hostid);
745 	offset += zbx_deserialize_char(offset, &value->item_value_type);
746 	offset += zbx_deserialize_char(offset, &value->item_flags);
747 	offset += zbx_deserialize_char(offset, &value->state);
748 	offset += zbx_deserialize_str(offset, &value->error, value_len);
749 	offset += zbx_deserialize_char(offset, &ts_marker);
750 
751 	if (0 != ts_marker)
752 	{
753 		timespec = (zbx_timespec_t *)zbx_malloc(NULL, sizeof(zbx_timespec_t));
754 
755 		offset += zbx_deserialize_int(offset, &timespec->sec);
756 		offset += zbx_deserialize_int(offset, &timespec->ns);
757 	}
758 
759 	value->ts = timespec;
760 
761 	offset += zbx_deserialize_char(offset, &result_marker);
762 	if (0 != result_marker)
763 	{
764 		agent_result = (AGENT_RESULT *)zbx_malloc(NULL, sizeof(AGENT_RESULT));
765 
766 		offset += zbx_deserialize_uint64(offset, &agent_result->lastlogsize);
767 		offset += zbx_deserialize_uint64(offset, &agent_result->ui64);
768 		offset += zbx_deserialize_double(offset, &agent_result->dbl);
769 		offset += zbx_deserialize_str(offset, &agent_result->str, value_len);
770 		offset += zbx_deserialize_str(offset, &agent_result->text, value_len);
771 		offset += zbx_deserialize_str(offset, &agent_result->msg, value_len);
772 		offset += zbx_deserialize_int(offset, &agent_result->type);
773 		offset += zbx_deserialize_int(offset, &agent_result->mtime);
774 
775 		offset += zbx_deserialize_char(offset, &log_marker);
776 		if (0 != log_marker)
777 		{
778 			log = (zbx_log_t *)zbx_malloc(NULL, sizeof(zbx_log_t));
779 
780 			offset += zbx_deserialize_str(offset, &log->value, value_len);
781 			offset += zbx_deserialize_str(offset, &log->source, value_len);
782 			offset += zbx_deserialize_int(offset, &log->timestamp);
783 			offset += zbx_deserialize_int(offset, &log->severity);
784 			offset += zbx_deserialize_int(offset, &log->logeventid);
785 		}
786 
787 		agent_result->log = log;
788 	}
789 
790 	value->result_ptr = (zbx_result_ptr_t *)zbx_malloc(NULL, sizeof(zbx_result_ptr_t));
791 	value->result_ptr->result = agent_result;
792 	value->result_ptr->refcount = 1;
793 
794 	return offset - data;
795 }
796 
797 /******************************************************************************
798  *                                                                            *
799  * Function: zbx_preprocessor_unpack_task                                     *
800  *                                                                            *
801  * Purpose: unpack preprocessing task data from IPC data buffer               *
802  *                                                                            *
803  * Parameters: itemid        - [OUT] itemid                                   *
804  *             value_type    - [OUT] item value type                          *
805  *             ts            - [OUT] value timestamp                          *
806  *             value         - [OUT] item value                               *
807  *             history       - [OUT] history data                             *
808  *             steps         - [OUT] preprocessing steps                      *
809  *             steps_num     - [OUT] preprocessing step count                 *
810  *             data          - [IN] IPC data buffer                           *
811  *                                                                            *
812  ******************************************************************************/
zbx_preprocessor_unpack_task(zbx_uint64_t * itemid,unsigned char * value_type,zbx_timespec_t ** ts,zbx_variant_t * value,zbx_vector_ptr_t * history,zbx_preproc_op_t ** steps,int * steps_num,const unsigned char * data)813 void	zbx_preprocessor_unpack_task(zbx_uint64_t *itemid, unsigned char *value_type, zbx_timespec_t **ts,
814 		zbx_variant_t *value, zbx_vector_ptr_t *history, zbx_preproc_op_t **steps,
815 		int *steps_num, const unsigned char *data)
816 {
817 	const unsigned char		*offset = data;
818 	unsigned char 			ts_marker;
819 	zbx_timespec_t			*timespec = NULL;
820 
821 	offset += zbx_deserialize_uint64(offset, itemid);
822 	offset += zbx_deserialize_char(offset, value_type);
823 	offset += zbx_deserialize_char(offset, &ts_marker);
824 
825 	if (0 != ts_marker)
826 	{
827 		timespec = (zbx_timespec_t *)zbx_malloc(NULL, sizeof(zbx_timespec_t));
828 
829 		offset += zbx_deserialize_int(offset, &timespec->sec);
830 		offset += zbx_deserialize_int(offset, &timespec->ns);
831 	}
832 
833 	*ts = timespec;
834 
835 	offset += preprocesser_unpack_variant(offset, value);
836 	offset += preprocesser_unpack_history(offset, history);
837 	(void)preprocessor_unpack_steps(offset, steps, steps_num);
838 }
839 
840 /******************************************************************************
841  *                                                                            *
842  * Function: zbx_preprocessor_unpack_result                                   *
843  *                                                                            *
844  * Purpose: unpack preprocessing task data from IPC data buffer               *
845  *                                                                            *
846  * Parameters: value         - [OUT] result value                             *
847  *             history       - [OUT] item history data                        *
848  *             error         - [OUT] preprocessing error                      *
849  *             data          - [IN] IPC data buffer                           *
850  *                                                                            *
851  ******************************************************************************/
zbx_preprocessor_unpack_result(zbx_variant_t * value,zbx_vector_ptr_t * history,char ** error,const unsigned char * data)852 void	zbx_preprocessor_unpack_result(zbx_variant_t *value, zbx_vector_ptr_t *history, char **error,
853 		const unsigned char *data)
854 {
855 	zbx_uint32_t		value_len;
856 	const unsigned char	*offset = data;
857 
858 	offset += preprocesser_unpack_variant(offset, value);
859 	offset += preprocesser_unpack_history(offset, history);
860 
861 	(void)zbx_deserialize_str(offset, error, value_len);
862 }
863 
864 /******************************************************************************
865  *                                                                            *
866  * Function: zbx_preprocessor_unpack_test_result                              *
867  *                                                                            *
868  * Purpose: unpack preprocessing test data from IPC data buffer               *
869  *                                                                            *
870  * Parameters: results       - [OUT] the preprocessing step results           *
871  *             history       - [OUT] item history data                        *
872  *             error         - [OUT] preprocessing error                      *
873  *             data          - [IN] IPC data buffer                           *
874  *                                                                            *
875  ******************************************************************************/
zbx_preprocessor_unpack_test_result(zbx_vector_ptr_t * results,zbx_vector_ptr_t * history,char ** error,const unsigned char * data)876 void	zbx_preprocessor_unpack_test_result(zbx_vector_ptr_t *results, zbx_vector_ptr_t *history,
877 		char **error, const unsigned char *data)
878 {
879 	zbx_uint32_t		value_len;
880 	const unsigned char	*offset = data;
881 	int			i, results_num;
882 	zbx_preproc_result_t	*result;
883 
884 	offset += zbx_deserialize_int(offset, &results_num);
885 
886 	zbx_vector_ptr_reserve(results, results_num);
887 
888 	for (i = 0; i < results_num; i++)
889 	{
890 		result = (zbx_preproc_result_t *)zbx_malloc(NULL, sizeof(zbx_preproc_result_t));
891 		offset += preprocesser_unpack_variant(offset, &result->value);
892 		offset += zbx_deserialize_str(offset, &result->error, value_len);
893 		offset += zbx_deserialize_char(offset, &result->action);
894 		zbx_vector_ptr_append(results, result);
895 	}
896 
897 	offset += preprocesser_unpack_history(offset, history);
898 
899 	(void)zbx_deserialize_str(offset, error, value_len);
900 }
901 
902 /******************************************************************************
903  *                                                                            *
904  * Function: zbx_preprocessor_unpack_diag_stats                               *
905  *                                                                            *
906  * Purpose: unpack preprocessing test data from IPC data buffer               *
907  *                                                                            *
908  * Parameters: total      - [OUT] the number of values                        *
909  *             queued     - [OUT] the number of values waiting to be          *
910  *                                preprocessed                                *
911  *             processing - [OUT] the number of values being preprocessed     *
912  *             done       - [OUT] the number of values waiting to be flushed  *
913  *                                that are either preprocessed or did not     *
914  *                                require preprocessing                       *
915  *             pending    - [OUT] the number of values pending to be          *
916  *                                preprocessed after previous value for       *
917  *                                example delta, throttling depends on        *
918  *                                previous value                              *
919  *             data       - [IN] IPC data buffer                              *
920  *                                                                            *
921  ******************************************************************************/
zbx_preprocessor_unpack_diag_stats(int * total,int * queued,int * processing,int * done,int * pending,const unsigned char * data)922 void	zbx_preprocessor_unpack_diag_stats(int *total, int *queued, int *processing, int *done,
923 		int *pending, const unsigned char *data)
924 {
925 	const unsigned char	*offset = data;
926 
927 	offset += zbx_deserialize_int(offset, total);
928 	offset += zbx_deserialize_int(offset, queued);
929 	offset += zbx_deserialize_int(offset, processing);
930 	offset += zbx_deserialize_int(offset, done);
931 	(void)zbx_deserialize_int(offset, pending);
932 }
933 
934 /******************************************************************************
935  *                                                                            *
936  * Function: zbx_preprocessor_unpack_top_request                              *
937  *                                                                            *
938  * Purpose: unpack preprocessing test data from IPC data buffer               *
939  *                                                                            *
940  * Parameters: data  - [OUT] memory buffer for packed data                    *
941  *             limit - [IN] the number of top values to return                *
942  *                                                                            *
943  ******************************************************************************/
zbx_preprocessor_unpack_top_request(int * limit,const unsigned char * data)944 void	zbx_preprocessor_unpack_top_request(int *limit, const unsigned char *data)
945 {
946 	(void)zbx_deserialize_value(data, limit);
947 }
948 
949 /******************************************************************************
950  *                                                                            *
951  * Function: zbx_preprocessor_unpack_top_request                              *
952  *                                                                            *
953  * Purpose: unpack preprocessing test data from IPC data buffer               *
954  *                                                                            *
955  * Parameters: items - [OUT] the item diag data                               *
956  *             data  - [IN] memory buffer for packed data                     *
957  *                                                                            *
958  ******************************************************************************/
zbx_preprocessor_unpack_top_result(zbx_vector_ptr_t * items,const unsigned char * data)959 void	zbx_preprocessor_unpack_top_result(zbx_vector_ptr_t *items, const unsigned char *data)
960 {
961 	int	i, items_num;
962 
963 	data += zbx_deserialize_value(data, &items_num);
964 
965 	if (0 != items_num)
966 	{
967 		zbx_vector_ptr_reserve(items, items_num);
968 
969 		for (i = 0; i < items_num; i++)
970 		{
971 			zbx_preproc_item_stats_t	*item;
972 
973 			item = (zbx_preproc_item_stats_t *)zbx_malloc(NULL, sizeof(zbx_preproc_item_stats_t));
974 			data += zbx_deserialize_value(data, &item->itemid);
975 			data += zbx_deserialize_value(data, &item->values_num);
976 			data += zbx_deserialize_value(data, &item->steps_num);
977 			zbx_vector_ptr_append(items, item);
978 		}
979 	}
980 }
981 
982 /******************************************************************************
983  *                                                                            *
984  * Function: preprocessor_send                                                *
985  *                                                                            *
986  * Purpose: sends command to preprocessor manager                             *
987  *                                                                            *
988  * Parameters: code     - [IN] message code                                   *
989  *             data     - [IN] message data                                   *
990  *             size     - [IN] message data size                              *
991  *             response - [OUT] response message (can be NULL if response is  *
992  *                              not requested)                                *
993  *                                                                            *
994  ******************************************************************************/
preprocessor_send(zbx_uint32_t code,unsigned char * data,zbx_uint32_t size,zbx_ipc_message_t * response)995 static void	preprocessor_send(zbx_uint32_t code, unsigned char *data, zbx_uint32_t size,
996 		zbx_ipc_message_t *response)
997 {
998 	char			*error = NULL;
999 	static zbx_ipc_socket_t	socket = {0};
1000 
1001 	/* each process has a permanent connection to preprocessing manager */
1002 	if (0 == socket.fd && FAIL == zbx_ipc_socket_open(&socket, ZBX_IPC_SERVICE_PREPROCESSING, SEC_PER_MIN,
1003 			&error))
1004 	{
1005 		zabbix_log(LOG_LEVEL_CRIT, "cannot connect to preprocessing service: %s", error);
1006 		exit(EXIT_FAILURE);
1007 	}
1008 
1009 	if (FAIL == zbx_ipc_socket_write(&socket, code, data, size))
1010 	{
1011 		zabbix_log(LOG_LEVEL_CRIT, "cannot send data to preprocessing service");
1012 		exit(EXIT_FAILURE);
1013 	}
1014 
1015 	if (NULL != response && FAIL == zbx_ipc_socket_read(&socket, response))
1016 	{
1017 		zabbix_log(LOG_LEVEL_CRIT, "cannot receive data from preprocessing service");
1018 		exit(EXIT_FAILURE);
1019 	}
1020 }
1021 
1022 /******************************************************************************
1023  *                                                                            *
1024  * Function: zbx_preprocess_item_value                                        *
1025  *                                                                            *
1026  * Purpose: perform item value preprocessing and dependent item processing    *
1027  *                                                                            *
1028  * Parameters: itemid          - [IN] the itemid                              *
1029  *             itemid          - [IN] the hostid                              *
1030  *             item_value_type - [IN] the item value type                     *
1031  *             item_flags      - [IN] the item flags (e. g. lld rule)         *
1032  *             result          - [IN] agent result containing the value       *
1033  *                               to add                                       *
1034  *             ts              - [IN] the value timestamp                     *
1035  *             state           - [IN] the item state                          *
1036  *             error           - [IN] the error message in case item state is *
1037  *                               ITEM_STATE_NOTSUPPORTED                      *
1038  *                                                                            *
1039  ******************************************************************************/
zbx_preprocess_item_value(zbx_uint64_t itemid,zbx_uint64_t hostid,unsigned char item_value_type,unsigned char item_flags,AGENT_RESULT * result,zbx_timespec_t * ts,unsigned char state,char * error)1040 void	zbx_preprocess_item_value(zbx_uint64_t itemid, zbx_uint64_t hostid, unsigned char item_value_type,
1041 		unsigned char item_flags, AGENT_RESULT *result, zbx_timespec_t *ts, unsigned char state, char *error)
1042 {
1043 	zbx_preproc_item_value_t	value = {.itemid = itemid, .hostid = hostid, .item_value_type = item_value_type,
1044 					.error = error, .item_flags = item_flags, .state = state, .ts = ts};
1045 	zbx_result_ptr_t		result_ptr = {.result = result};
1046 	size_t				value_len = 0, len;
1047 
1048 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
1049 
1050 	if (ITEM_STATE_NORMAL == state)
1051 	{
1052 		if (0 != ISSET_STR(result))
1053 			value_len = strlen(result->str);
1054 
1055 		if (0 != ISSET_TEXT(result))
1056 		{
1057 			if (value_len < (len = strlen(result->text)))
1058 				value_len = len;
1059 		}
1060 
1061 		if (0 != ISSET_LOG(result))
1062 		{
1063 			if (value_len < (len = strlen(result->log->value)))
1064 				value_len = len;
1065 		}
1066 
1067 		if (ZBX_MAX_RECV_DATA_SIZE < value_len)
1068 		{
1069 			result_ptr.result = NULL;
1070 			value.state = ITEM_STATE_NOTSUPPORTED;
1071 			value.error = "Value is too large.";
1072 		}
1073 	}
1074 
1075 	value.result_ptr = &result_ptr;
1076 
1077 	if (0 == preprocessor_pack_value(&cached_message, &value))
1078 	{
1079 		zbx_preprocessor_flush();
1080 		preprocessor_pack_value(&cached_message, &value);
1081 	}
1082 
1083 	if (MAX_VALUES_LOCAL < ++cached_values)
1084 		zbx_preprocessor_flush();
1085 
1086 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
1087 }
1088 
1089 /******************************************************************************
1090  *                                                                            *
1091  * Function: zbx_preprocessor_flush                                           *
1092  *                                                                            *
1093  * Purpose: send flush command to preprocessing manager                       *
1094  *                                                                            *
1095  ******************************************************************************/
zbx_preprocessor_flush(void)1096 void	zbx_preprocessor_flush(void)
1097 {
1098 	if (0 < cached_message.size)
1099 	{
1100 		preprocessor_send(ZBX_IPC_PREPROCESSOR_REQUEST, cached_message.data, cached_message.size, NULL);
1101 
1102 		zbx_ipc_message_clean(&cached_message);
1103 		zbx_ipc_message_init(&cached_message);
1104 		cached_values = 0;
1105 	}
1106 }
1107 
1108 /******************************************************************************
1109  *                                                                            *
1110  * Function: zbx_preprocessor_get_queue_size                                  *
1111  *                                                                            *
1112  * Purpose: get queue size (enqueued value count) of preprocessing manager    *
1113  *                                                                            *
1114  * Return value: enqueued item count                                          *
1115  *                                                                            *
1116  ******************************************************************************/
zbx_preprocessor_get_queue_size(void)1117 zbx_uint64_t	zbx_preprocessor_get_queue_size(void)
1118 {
1119 	zbx_uint64_t		size;
1120 	zbx_ipc_message_t	message;
1121 
1122 	zbx_ipc_message_init(&message);
1123 	preprocessor_send(ZBX_IPC_PREPROCESSOR_QUEUE, NULL, 0, &message);
1124 	memcpy(&size, message.data, sizeof(zbx_uint64_t));
1125 	zbx_ipc_message_clean(&message);
1126 
1127 	return size;
1128 }
1129 
1130 /******************************************************************************
1131  *                                                                            *
1132  * Function: zbx_preproc_op_free                                              *
1133  *                                                                            *
1134  * Purpose: frees preprocessing step                                          *
1135  *                                                                            *
1136  ******************************************************************************/
zbx_preproc_op_free(zbx_preproc_op_t * op)1137 void	zbx_preproc_op_free(zbx_preproc_op_t *op)
1138 {
1139 	zbx_free(op->params);
1140 	zbx_free(op->error_handler_params);
1141 	zbx_free(op);
1142 }
1143 
1144 /******************************************************************************
1145  *                                                                            *
1146  * Function: zbx_preproc_result_free                                          *
1147  *                                                                            *
1148  * Purpose: frees preprocessing step test result                              *
1149  *                                                                            *
1150  ******************************************************************************/
zbx_preproc_result_free(zbx_preproc_result_t * result)1151 void	zbx_preproc_result_free(zbx_preproc_result_t *result)
1152 {
1153 	zbx_variant_clear(&result->value);
1154 	zbx_free(result->error);
1155 	zbx_free(result);
1156 }
1157 
1158 /******************************************************************************
1159  *                                                                            *
1160  * Function: preprocessor_pack_test_request                                   *
1161  *                                                                            *
1162  * Purpose: packs preprocessing step request for serialization                *
1163  *                                                                            *
1164  * Return value: The size of packed data                                      *
1165  *                                                                            *
1166  ******************************************************************************/
preprocessor_pack_test_request(unsigned char ** data,unsigned char value_type,const char * value,const zbx_timespec_t * ts,const zbx_vector_ptr_t * history,const zbx_vector_ptr_t * steps)1167 static zbx_uint32_t	preprocessor_pack_test_request(unsigned char **data, unsigned char value_type,
1168 		const char *value, const zbx_timespec_t *ts, const zbx_vector_ptr_t *history,
1169 		const zbx_vector_ptr_t *steps)
1170 {
1171 	zbx_packed_field_t	*offset, *fields;
1172 	zbx_uint32_t		size;
1173 	int			i, history_num;
1174 	zbx_ipc_message_t	message;
1175 
1176 	history_num = (NULL != history ? history->values_num : 0);
1177 
1178 	/* 6 is a max field count (without preprocessing step and history fields) */
1179 	fields = (zbx_packed_field_t *)zbx_malloc(NULL, (6 + steps->values_num * 4 + history_num * 5)
1180 			* sizeof(zbx_packed_field_t));
1181 
1182 	offset = fields;
1183 
1184 	*offset++ = PACKED_FIELD(&value_type, sizeof(unsigned char));
1185 	*offset++ = PACKED_FIELD(value, 0);
1186 	*offset++ = PACKED_FIELD(&ts->sec, sizeof(int));
1187 	*offset++ = PACKED_FIELD(&ts->ns, sizeof(int));
1188 
1189 	offset += preprocessor_pack_history(offset, history, &history_num);
1190 
1191 	*offset++ = PACKED_FIELD(&steps->values_num, sizeof(int));
1192 
1193 	for (i = 0; i < steps->values_num; i++)
1194 		offset += preprocessor_pack_step(offset, (zbx_preproc_op_t *)steps->values[i]);
1195 
1196 	zbx_ipc_message_init(&message);
1197 	size = message_pack_data(&message, fields, offset - fields);
1198 	*data = message.data;
1199 	zbx_free(fields);
1200 
1201 	return size;
1202 }
1203 
1204 /******************************************************************************
1205  *                                                                            *
1206  * Function: zbx_preprocessor_unpack_test_request                             *
1207  *                                                                            *
1208  * Purpose: unpack preprocessing test request data from IPC data buffer       *
1209  *                                                                            *
1210  * Parameters: value_type    - [OUT] item value type                          *
1211  *             value         - [OUT] the value                                *
1212  *             ts            - [OUT] value timestamp                          *
1213  *             value         - [OUT] item value                               *
1214  *             history       - [OUT] history data                             *
1215  *             steps         - [OUT] preprocessing steps                      *
1216  *             steps_num     - [OUT] preprocessing step count                 *
1217  *             data          - [IN] IPC data buffer                           *
1218  *                                                                            *
1219  ******************************************************************************/
zbx_preprocessor_unpack_test_request(unsigned char * value_type,char ** value,zbx_timespec_t * ts,zbx_vector_ptr_t * history,zbx_preproc_op_t ** steps,int * steps_num,const unsigned char * data)1220 void	zbx_preprocessor_unpack_test_request(unsigned char *value_type, char **value, zbx_timespec_t *ts,
1221 		zbx_vector_ptr_t *history, zbx_preproc_op_t **steps, int *steps_num, const unsigned char *data)
1222 {
1223 	zbx_uint32_t			value_len;
1224 	const unsigned char		*offset = data;
1225 
1226 	offset += zbx_deserialize_char(offset, value_type);
1227 	offset += zbx_deserialize_str(offset, value, value_len);
1228 	offset += zbx_deserialize_int(offset, &ts->sec);
1229 	offset += zbx_deserialize_int(offset, &ts->ns);
1230 
1231 	offset += preprocesser_unpack_history(offset, history);
1232 	(void)preprocessor_unpack_steps(offset, steps, steps_num);
1233 }
1234 
1235 /******************************************************************************
1236  *                                                                            *
1237  * Function: zbx_preprocessor_test                                            *
1238  *                                                                            *
1239  * Purpose: tests item preprocessing with the specified input value and steps *
1240  *                                                                            *
1241  ******************************************************************************/
zbx_preprocessor_test(unsigned char value_type,const char * value,const zbx_timespec_t * ts,const zbx_vector_ptr_t * steps,zbx_vector_ptr_t * results,zbx_vector_ptr_t * history,char ** preproc_error,char ** error)1242 int	zbx_preprocessor_test(unsigned char value_type, const char *value, const zbx_timespec_t *ts,
1243 		const zbx_vector_ptr_t *steps, zbx_vector_ptr_t *results, zbx_vector_ptr_t *history,
1244 		char **preproc_error, char **error)
1245 {
1246 	unsigned char	*data = NULL;
1247 	zbx_uint32_t	size;
1248 	int		ret = FAIL;
1249 	unsigned char	*result;
1250 
1251 	size = preprocessor_pack_test_request(&data, value_type, value, ts, history, steps);
1252 
1253 	if (SUCCEED != zbx_ipc_async_exchange(ZBX_IPC_SERVICE_PREPROCESSING, ZBX_IPC_PREPROCESSOR_TEST_REQUEST,
1254 			SEC_PER_MIN, data, size, &result, error))
1255 	{
1256 		goto out;
1257 	}
1258 
1259 	zbx_preprocessor_unpack_test_result(results, history, preproc_error, result);
1260 	zbx_free(result);
1261 
1262 	ret = SUCCEED;
1263 out:
1264 	zbx_free(data);
1265 
1266 	return ret;
1267 }
1268 
1269 /******************************************************************************
1270  *                                                                            *
1271  * Function: zbx_preprocessor_get_diag_stats                                  *
1272  *                                                                            *
1273  * Purpose: get preprocessing manager diagnostic statistics                   *
1274  *                                                                            *
1275  ******************************************************************************/
zbx_preprocessor_get_diag_stats(int * total,int * queued,int * processing,int * done,int * pending,char ** error)1276 int	zbx_preprocessor_get_diag_stats(int *total, int *queued, int *processing, int *done,
1277 		int *pending, char **error)
1278 {
1279 	unsigned char	*result;
1280 
1281 	if (SUCCEED != zbx_ipc_async_exchange(ZBX_IPC_SERVICE_PREPROCESSING, ZBX_IPC_PREPROCESSOR_DIAG_STATS,
1282 			SEC_PER_MIN, NULL, 0, &result, error))
1283 	{
1284 		return FAIL;
1285 	}
1286 
1287 	zbx_preprocessor_unpack_diag_stats(total, queued, processing, done, pending, result);
1288 	zbx_free(result);
1289 
1290 	return SUCCEED;
1291 }
1292 
1293 /******************************************************************************
1294  *                                                                            *
1295  * Function: zbx_preprocessor_get_top_items                                   *
1296  *                                                                            *
1297  * Purpose: get the top N items by the number of queued values                *
1298  *                                                                            *
1299  ******************************************************************************/
preprocessor_get_top_items(int limit,zbx_vector_ptr_t * items,char ** error,zbx_uint32_t code)1300 static int	preprocessor_get_top_items(int limit, zbx_vector_ptr_t *items, char **error, zbx_uint32_t code)
1301 {
1302 	int		ret;
1303 	unsigned char	*data, *result;
1304 	zbx_uint32_t	data_len;
1305 
1306 	data_len = zbx_preprocessor_pack_top_items_request(&data, limit);
1307 
1308 	if (SUCCEED != (ret = zbx_ipc_async_exchange(ZBX_IPC_SERVICE_PREPROCESSING, code, SEC_PER_MIN, data, data_len,
1309 			&result, error)))
1310 	{
1311 		goto out;
1312 	}
1313 
1314 	zbx_preprocessor_unpack_top_result(items, result);
1315 	zbx_free(result);
1316 out:
1317 	zbx_free(data);
1318 
1319 	return ret;
1320 }
1321 
1322 /******************************************************************************
1323  *                                                                            *
1324  * Function: zbx_preprocessor_get_top_items                                   *
1325  *                                                                            *
1326  * Purpose: get the top N items by the number of queued values                *
1327  *                                                                            *
1328  ******************************************************************************/
zbx_preprocessor_get_top_items(int limit,zbx_vector_ptr_t * items,char ** error)1329 int	zbx_preprocessor_get_top_items(int limit, zbx_vector_ptr_t *items, char **error)
1330 {
1331 	return preprocessor_get_top_items(limit, items, error, ZBX_IPC_PREPROCESSOR_TOP_ITEMS);
1332 }
1333 
1334 /******************************************************************************
1335  *                                                                            *
1336  * Function: zbx_preprocessor_get_top_oldest_preproc_items                    *
1337  *                                                                            *
1338  * Purpose: get the oldest items with preprocessing still in queue            *
1339  *                                                                            *
1340  ******************************************************************************/
zbx_preprocessor_get_top_oldest_preproc_items(int limit,zbx_vector_ptr_t * items,char ** error)1341 int	zbx_preprocessor_get_top_oldest_preproc_items(int limit, zbx_vector_ptr_t *items, char **error)
1342 {
1343 	return preprocessor_get_top_items(limit, items, error, ZBX_IPC_PREPROCESSOR_TOP_OLDEST_PREPROC_ITEMS);
1344 }
1345