1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18 **/
19 
20 #include "common.h"
21 #include "daemon.h"
22 #include "zbxself.h"
23 #include "log.h"
24 #include "zbxipcservice.h"
25 #include "zbxserialize.h"
26 #include "preprocessing.h"
27 #include "zbxembed.h"
28 
29 #include "sysinfo.h"
30 #include "preproc_worker.h"
31 #include "item_preproc.h"
32 #include "preproc_history.h"
33 
34 extern unsigned char	process_type, program_type;
35 extern int		server_num, process_num;
36 
37 #define ZBX_PREPROC_VALUE_PREVIEW_LEN		100
38 
39 zbx_es_t	es_engine;
40 
41 /******************************************************************************
42  *                                                                            *
43  * Function: worker_format_value                                              *
44  *                                                                            *
45  * Purpose: formats value in text format                                      *
46  *                                                                            *
47  * Parameters: value     - [IN] the value to format                           *
48  *             value_str - [OUT] the formatted value                          *
49  *                                                                            *
50  * Comments: Control characters are replaced with '.' and truncated if it's   *
51  *           larger than ZBX_PREPROC_VALUE_PREVIEW_LEN characters.            *
52  *                                                                            *
53  ******************************************************************************/
worker_format_value(const zbx_variant_t * value,char ** value_str)54 static void	worker_format_value(const zbx_variant_t *value, char **value_str)
55 {
56 	int		len, i;
57 	const char	*value_desc;
58 
59 	value_desc = zbx_variant_value_desc(value);
60 
61 	if (ZBX_PREPROC_VALUE_PREVIEW_LEN < zbx_strlen_utf8(value_desc))
62 	{
63 		/* truncate value and append '...' */
64 		len = zbx_strlen_utf8_nchars(value_desc, ZBX_PREPROC_VALUE_PREVIEW_LEN - ZBX_CONST_STRLEN("..."));
65 		*value_str = zbx_malloc(NULL, len + ZBX_CONST_STRLEN("...") + 1);
66 		memcpy(*value_str, value_desc, len);
67 		memcpy(*value_str + len, "...", ZBX_CONST_STRLEN("...") + 1);
68 	}
69 	else
70 	{
71 		*value_str = zbx_malloc(NULL, (len = strlen(value_desc)) + 1);
72 		memcpy(*value_str, value_desc, len + 1);
73 	}
74 
75 	/* replace control characters */
76 	for (i = 0; i < len; i++)
77 	{
78 		if (0 != iscntrl((*value_str)[i]))
79 			(*value_str)[i] = '.';
80 	}
81 }
82 
83 /******************************************************************************
84  *                                                                            *
85  * Function: worker_format_result                                             *
86  *                                                                            *
87  * Purpose: formats one preprocessing step result                             *
88  *                                                                            *
89  * Parameters: step   - [IN] the preprocessing step number                    *
90  *             result - [IN] the preprocessing step result                    *
91  *             error  - [IN] the preprocessing step error (can be NULL)       *
92  *             out    - [OUT] the formatted string                            *
93  *                                                                            *
94  ******************************************************************************/
worker_format_result(int step,const zbx_preproc_result_t * result,const char * error,char ** out)95 static void	worker_format_result(int step, const zbx_preproc_result_t *result, const char *error, char **out)
96 {
97 	char	*actions[] = {"", " (discard value)", " (set value)", " (set error)"};
98 
99 	if (NULL == error)
100 	{
101 		char	*value_str;
102 
103 		worker_format_value(&result->value, &value_str);
104 		*out = zbx_dsprintf(NULL, "%d. Result%s: %s\n", step, actions[result->action], value_str);
105 		zbx_free(value_str);
106 	}
107 	else
108 	{
109 		*out = zbx_dsprintf(NULL, "%d. Failed%s: %s\n", step, actions[result->action], error);
110 		zbx_rtrim(*out, ZBX_WHITESPACE);
111 	}
112 }
113 
114 /******************************************************************************
115  *                                                                            *
116  * Function: worker_format_error                                              *
117  *                                                                            *
118  * Purpose: formats preprocessing error message                               *
119  *                                                                            *
120  * Parameters: value        - [IN] the input value                            *
121  *             results      - [IN] the preprocessing step results             *
122  *             results_num  - [IN] the number of executed steps               *
123  *             errmsg       - [IN] the error message of last executed step    *
124  *             error        - [OUT] the formatted error message               *
125  *                                                                            *
126  ******************************************************************************/
worker_format_error(const zbx_variant_t * value,zbx_preproc_result_t * results,int results_num,const char * errmsg,char ** error)127 static void	worker_format_error(const zbx_variant_t *value, zbx_preproc_result_t *results, int results_num,
128 		const char *errmsg, char **error)
129 {
130 	char			*value_str, *err_step;
131 	int			i;
132 	size_t			error_alloc = 512, error_offset = 0;
133 	zbx_vector_str_t	results_str;
134 	zbx_db_mock_field_t	field;
135 
136 	zbx_vector_str_create(&results_str);
137 
138 	/* add header to error message */
139 	*error = zbx_malloc(NULL, error_alloc);
140 	worker_format_value(value, &value_str);
141 	zbx_snprintf_alloc(error, &error_alloc, &error_offset, "Preprocessing failed for: %s\n", value_str);
142 	zbx_free(value_str);
143 
144 	zbx_db_mock_field_init(&field, ZBX_TYPE_CHAR, ITEM_ERROR_LEN);
145 
146 	zbx_db_mock_field_append(&field, *error);
147 	zbx_db_mock_field_append(&field, "...\n");
148 
149 	/* format the last (failed) step */
150 	worker_format_result(results_num, &results[results_num - 1], errmsg, &err_step);
151 	zbx_vector_str_append(&results_str, err_step);
152 
153 	if (SUCCEED == zbx_db_mock_field_append(&field, err_step))
154 	{
155 		/* format the first steps */
156 		for (i = results_num - 2; i >= 0; i--)
157 		{
158 			worker_format_result(i + 1, &results[i], NULL, &err_step);
159 
160 			if (SUCCEED != zbx_db_mock_field_append(&field, err_step))
161 			{
162 				zbx_free(err_step);
163 				break;
164 			}
165 
166 			zbx_vector_str_append(&results_str, err_step);
167 		}
168 	}
169 
170 	/* add steps to error message */
171 
172 	if (results_str.values_num < results_num)
173 		zbx_strcpy_alloc(error, &error_alloc, &error_offset, "...\n");
174 
175 	for (i = results_str.values_num - 1; i >= 0; i--)
176 		zbx_strcpy_alloc(error, &error_alloc, &error_offset, results_str.values[i]);
177 
178 	/* truncate formatted error if necessary */
179 	if (ITEM_ERROR_LEN < zbx_strlen_utf8(*error))
180 	{
181 		char	*ptr;
182 
183 		ptr = (*error) + zbx_db_strlen_n(*error, ITEM_ERROR_LEN - 3);
184 		for (i = 0; i < 3; i++)
185 			*ptr++ = '.';
186 		*ptr = '\0';
187 	}
188 
189 	zbx_vector_str_clear_ext(&results_str, zbx_str_free);
190 	zbx_vector_str_destroy(&results_str);
191 }
192 
193 /******************************************************************************
194  *                                                                            *
195  * Function: worker_item_preproc_execute                                      *
196  *                                                                            *
197  * Purpose: execute preprocessing steps                                       *
198  *                                                                            *
199  * Parameters: value_type    - [IN] the item value type                       *
200  *             value         - [IN/OUT] the value to process                  *
201  *             ts            - [IN] the value timestamp                       *
202  *             steps         - [IN] the preprocessing steps to execute        *
203  *             steps_num     - [IN] the number of preprocessing steps         *
204  *             history_in    - [IN] the preprocessing history                 *
205  *             history_out   - [OUT] the new preprocessing history            *
206  *             results       - [OUT] the preprocessing step results           *
207  *             results_num   - [OUT] the number of step results               *
208  *             error         - [OUT] error message                            *
209  *                                                                            *
210  * Return value: SUCCEED - the preprocessing steps finished successfully      *
211  *               FAIL - otherwise, error contains the error message           *
212  *                                                                            *
213  ******************************************************************************/
worker_item_preproc_execute(unsigned char value_type,zbx_variant_t * value,const zbx_timespec_t * ts,zbx_preproc_op_t * steps,int steps_num,zbx_vector_ptr_t * history_in,zbx_vector_ptr_t * history_out,zbx_preproc_result_t * results,int * results_num,char ** error)214 static int	worker_item_preproc_execute(unsigned char value_type, zbx_variant_t *value, const zbx_timespec_t *ts,
215 		zbx_preproc_op_t *steps, int steps_num, zbx_vector_ptr_t *history_in, zbx_vector_ptr_t *history_out,
216 		zbx_preproc_result_t *results, int *results_num, char **error)
217 {
218 	int		i, ret = SUCCEED;
219 
220 	for (i = 0; i < steps_num; i++)
221 	{
222 		zbx_preproc_op_t	*op = &steps[i];
223 		zbx_variant_t		history_value;
224 		zbx_timespec_t		history_ts;
225 
226 		zbx_preproc_history_pop_value(history_in, i, &history_value, &history_ts);
227 
228 		if (FAIL == (ret = zbx_item_preproc(value_type, value, ts, op, &history_value, &history_ts, error)))
229 		{
230 			results[i].action = op->error_handler;
231 			ret = zbx_item_preproc_handle_error(value, op, error);
232 			zbx_variant_clear(&history_value);
233 		}
234 		else
235 			results[i].action = ZBX_PREPROC_FAIL_DEFAULT;
236 
237 		if (SUCCEED == ret)
238 		{
239 			if (NULL == *error)
240 			{
241 				/* result history is kept to report results of steps before failing step, */
242 				/* which means it can be omitted for the last step.                       */
243 				if (i != steps_num - 1)
244 					zbx_variant_copy(&results[i].value, value);
245 				else
246 					zbx_variant_set_none(&results[i].value);
247 			}
248 			else
249 			{
250 				/* preprocessing step successfully extracted error, set it */
251 				results[i].action = ZBX_PREPROC_FAIL_FORCE_ERROR;
252 				ret = FAIL;
253 			}
254 		}
255 
256 		if (SUCCEED != ret)
257 		{
258 			break;
259 		}
260 
261 		if (ZBX_VARIANT_NONE != history_value.type)
262 		{
263 			/* the value is byte copied to history_out vector and doesn't have to be cleared */
264 			zbx_preproc_history_add_value(history_out, i, &history_value, &history_ts);
265 		}
266 
267 		if (ZBX_VARIANT_NONE == value->type)
268 			break;
269 	}
270 
271 	*results_num = (i == steps_num ? i : i + 1);
272 
273 	return ret;
274 }
275 
276 /******************************************************************************
277  *                                                                            *
278  * Function: worker_preprocess_value                                          *
279  *                                                                            *
280  * Purpose: handle item value preprocessing task                              *
281  *                                                                            *
282  * Parameters: socket  - [IN] IPC socket                                      *
283  *             message - [IN] packed preprocessing task                       *
284  *                                                                            *
285  ******************************************************************************/
worker_preprocess_value(zbx_ipc_socket_t * socket,zbx_ipc_message_t * message)286 static void	worker_preprocess_value(zbx_ipc_socket_t *socket, zbx_ipc_message_t *message)
287 {
288 	zbx_uint32_t		size = 0;
289 	unsigned char		*data = NULL, value_type;
290 	zbx_uint64_t		itemid;
291 	zbx_variant_t		value, value_start;
292 	int			i, steps_num, results_num, ret;
293 	char			*errmsg = NULL, *error = NULL;
294 	zbx_timespec_t		*ts;
295 	zbx_preproc_op_t	*steps;
296 	zbx_vector_ptr_t	history_in, history_out;
297 	zbx_preproc_result_t	*results;
298 
299 	zbx_vector_ptr_create(&history_in);
300 	zbx_vector_ptr_create(&history_out);
301 
302 	zbx_preprocessor_unpack_task(&itemid, &value_type, &ts, &value, &history_in, &steps, &steps_num,
303 			message->data);
304 
305 	zbx_variant_copy(&value_start, &value);
306 	results = (zbx_preproc_result_t *)zbx_malloc(NULL, sizeof(zbx_preproc_result_t) * steps_num);
307 	memset(results, 0, sizeof(zbx_preproc_result_t) * steps_num);
308 
309 	if (FAIL == (ret = worker_item_preproc_execute(value_type, &value, ts, steps, steps_num, &history_in,
310 			&history_out, results, &results_num, &errmsg)) && 0 != results_num)
311 	{
312 		int action = results[results_num - 1].action;
313 
314 		if (ZBX_PREPROC_FAIL_SET_ERROR != action && ZBX_PREPROC_FAIL_FORCE_ERROR != action)
315 		{
316 			worker_format_error(&value_start, results, results_num, errmsg, &error);
317 			zbx_free(errmsg);
318 		}
319 		else
320 			error = errmsg;
321 	}
322 
323 	if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_DEBUG))
324 	{
325 		const char	*result;
326 
327 		result = (SUCCEED == ret ? zbx_variant_value_desc(&value) : error);
328 		zabbix_log(LOG_LEVEL_DEBUG, "%s(): %s", __func__, zbx_variant_value_desc(&value_start));
329 		zabbix_log(LOG_LEVEL_DEBUG, "%s: %s %s",__func__, zbx_result_string(ret), result);
330 	}
331 
332 	size = zbx_preprocessor_pack_result(&data, &value, &history_out, error);
333 	zbx_variant_clear(&value);
334 	zbx_free(error);
335 	zbx_free(ts);
336 	zbx_free(steps);
337 
338 	if (FAIL == zbx_ipc_socket_write(socket, ZBX_IPC_PREPROCESSOR_RESULT, data, size))
339 	{
340 		zabbix_log(LOG_LEVEL_CRIT, "cannot send preprocessing result");
341 		exit(EXIT_FAILURE);
342 	}
343 
344 	zbx_free(data);
345 
346 	zbx_variant_clear(&value_start);
347 
348 	for (i = 0; i < results_num; i++)
349 		zbx_variant_clear(&results[i].value);
350 	zbx_free(results);
351 
352 	zbx_vector_ptr_clear_ext(&history_out, (zbx_clean_func_t)zbx_preproc_op_history_free);
353 	zbx_vector_ptr_destroy(&history_out);
354 
355 	zbx_vector_ptr_clear_ext(&history_in, (zbx_clean_func_t)zbx_preproc_op_history_free);
356 	zbx_vector_ptr_destroy(&history_in);
357 }
358 
359 /******************************************************************************
360  *                                                                            *
361  * Function: worker_test_value                                                *
362  *                                                                            *
363  * Purpose: handle item value test preprocessing task                         *
364  *                                                                            *
365  * Parameters: socket  - [IN] IPC socket                                      *
366  *             message - [IN] packed preprocessing task                       *
367  *                                                                            *
368  ******************************************************************************/
worker_test_value(zbx_ipc_socket_t * socket,zbx_ipc_message_t * message)369 static void	worker_test_value(zbx_ipc_socket_t *socket, zbx_ipc_message_t *message)
370 {
371 	zbx_uint32_t		size;
372 	unsigned char		*data, value_type;
373 	zbx_variant_t		value, value_start;
374 	int			i, steps_num, results_num;
375 	char			*error = NULL, *value_str;
376 	zbx_timespec_t		ts;
377 	zbx_preproc_op_t	*steps;
378 	zbx_vector_ptr_t	history_in, history_out;
379 	zbx_preproc_result_t	*results;
380 
381 	zbx_vector_ptr_create(&history_in);
382 	zbx_vector_ptr_create(&history_out);
383 
384 	zbx_preprocessor_unpack_test_request(&value_type, &value_str, &ts, &history_in, &steps, &steps_num,
385 			message->data);
386 
387 	zbx_variant_set_str(&value, value_str);
388 	zbx_variant_copy(&value_start, &value);
389 
390 	results = (zbx_preproc_result_t *)zbx_malloc(NULL, sizeof(zbx_preproc_result_t) * steps_num);
391 	memset(results, 0, sizeof(zbx_preproc_result_t) * steps_num);
392 
393 	zbx_item_preproc_test(value_type, &value, &ts, steps, steps_num, &history_in, &history_out, results,
394 			&results_num, &error);
395 
396 	size = zbx_preprocessor_pack_test_result(&data, results, results_num, &history_out, error);
397 
398 	if (FAIL == zbx_ipc_socket_write(socket, ZBX_IPC_PREPROCESSOR_TEST_RESULT, data, size))
399 	{
400 		zabbix_log(LOG_LEVEL_CRIT, "cannot send preprocessing result");
401 		exit(EXIT_FAILURE);
402 	}
403 
404 	zbx_variant_clear(&value);
405 	zbx_free(error);
406 	zbx_free(steps);
407 	zbx_free(data);
408 
409 	zbx_variant_clear(&value_start);
410 
411 	for (i = 0; i < results_num; i++)
412 	{
413 		zbx_variant_clear(&results[i].value);
414 		zbx_free(results[i].error);
415 	}
416 	zbx_free(results);
417 
418 	zbx_vector_ptr_clear_ext(&history_out, (zbx_clean_func_t)zbx_preproc_op_history_free);
419 	zbx_vector_ptr_destroy(&history_out);
420 
421 	zbx_vector_ptr_clear_ext(&history_in, (zbx_clean_func_t)zbx_preproc_op_history_free);
422 	zbx_vector_ptr_destroy(&history_in);
423 }
424 
ZBX_THREAD_ENTRY(preprocessing_worker_thread,args)425 ZBX_THREAD_ENTRY(preprocessing_worker_thread, args)
426 {
427 	pid_t			ppid;
428 	char			*error = NULL;
429 	zbx_ipc_socket_t	socket;
430 	zbx_ipc_message_t	message;
431 
432 	process_type = ((zbx_thread_args_t *)args)->process_type;
433 	server_num = ((zbx_thread_args_t *)args)->server_num;
434 	process_num = ((zbx_thread_args_t *)args)->process_num;
435 
436 	zbx_setproctitle("%s #%d starting", get_process_type_string(process_type), process_num);
437 
438 	zbx_es_init(&es_engine);
439 
440 	zbx_ipc_message_init(&message);
441 
442 	if (FAIL == zbx_ipc_socket_open(&socket, ZBX_IPC_SERVICE_PREPROCESSING, SEC_PER_MIN, &error))
443 	{
444 		zabbix_log(LOG_LEVEL_CRIT, "cannot connect to preprocessing service: %s", error);
445 		zbx_free(error);
446 		exit(EXIT_FAILURE);
447 	}
448 
449 	ppid = getppid();
450 	zbx_ipc_socket_write(&socket, ZBX_IPC_PREPROCESSOR_WORKER, (unsigned char *)&ppid, sizeof(ppid));
451 
452 	zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(program_type),
453 			server_num, get_process_type_string(process_type), process_num);
454 
455 	update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
456 
457 	zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num);
458 
459 	while (ZBX_IS_RUNNING())
460 	{
461 		update_selfmon_counter(ZBX_PROCESS_STATE_IDLE);
462 
463 		if (SUCCEED != zbx_ipc_socket_read(&socket, &message))
464 		{
465 			zabbix_log(LOG_LEVEL_CRIT, "cannot read preprocessing service request");
466 			exit(EXIT_FAILURE);
467 		}
468 
469 		update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
470 		zbx_update_env(zbx_time());
471 
472 		switch (message.code)
473 		{
474 			case ZBX_IPC_PREPROCESSOR_REQUEST:
475 				worker_preprocess_value(&socket, &message);
476 				break;
477 			case ZBX_IPC_PREPROCESSOR_TEST_REQUEST:
478 				worker_test_value(&socket, &message);
479 				break;
480 		}
481 
482 		zbx_ipc_message_clean(&message);
483 	}
484 
485 	zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num);
486 
487 	while (1)
488 		zbx_sleep(SEC_PER_MIN);
489 
490 	zbx_es_destroy(&es_engine);
491 }
492