1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 **/
19
20 #include "common.h"
21 #include "daemon.h"
22 #include "zbxself.h"
23 #include "log.h"
24 #include "zbxipcservice.h"
25 #include "zbxserialize.h"
26 #include "preprocessing.h"
27 #include "zbxembed.h"
28
29 #include "sysinfo.h"
30 #include "preproc_worker.h"
31 #include "item_preproc.h"
32 #include "preproc_history.h"
33
34 extern unsigned char process_type, program_type;
35 extern int server_num, process_num;
36
37 #define ZBX_PREPROC_VALUE_PREVIEW_LEN 100
38
39 zbx_es_t es_engine;
40
41 /******************************************************************************
42 * *
43 * Function: worker_format_value *
44 * *
45 * Purpose: formats value in text format *
46 * *
47 * Parameters: value - [IN] the value to format *
48 * value_str - [OUT] the formatted value *
49 * *
50 * Comments: Control characters are replaced with '.' and truncated if it's *
51 * larger than ZBX_PREPROC_VALUE_PREVIEW_LEN characters. *
52 * *
53 ******************************************************************************/
worker_format_value(const zbx_variant_t * value,char ** value_str)54 static void worker_format_value(const zbx_variant_t *value, char **value_str)
55 {
56 int len, i;
57 const char *value_desc;
58
59 value_desc = zbx_variant_value_desc(value);
60
61 if (ZBX_PREPROC_VALUE_PREVIEW_LEN < zbx_strlen_utf8(value_desc))
62 {
63 /* truncate value and append '...' */
64 len = zbx_strlen_utf8_nchars(value_desc, ZBX_PREPROC_VALUE_PREVIEW_LEN - ZBX_CONST_STRLEN("..."));
65 *value_str = zbx_malloc(NULL, len + ZBX_CONST_STRLEN("...") + 1);
66 memcpy(*value_str, value_desc, len);
67 memcpy(*value_str + len, "...", ZBX_CONST_STRLEN("...") + 1);
68 }
69 else
70 {
71 *value_str = zbx_malloc(NULL, (len = strlen(value_desc)) + 1);
72 memcpy(*value_str, value_desc, len + 1);
73 }
74
75 /* replace control characters */
76 for (i = 0; i < len; i++)
77 {
78 if (0 != iscntrl((*value_str)[i]))
79 (*value_str)[i] = '.';
80 }
81 }
82
83 /******************************************************************************
84 * *
85 * Function: worker_format_result *
86 * *
87 * Purpose: formats one preprocessing step result *
88 * *
89 * Parameters: step - [IN] the preprocessing step number *
90 * result - [IN] the preprocessing step result *
91 * error - [IN] the preprocessing step error (can be NULL) *
92 * out - [OUT] the formatted string *
93 * *
94 ******************************************************************************/
worker_format_result(int step,const zbx_preproc_result_t * result,const char * error,char ** out)95 static void worker_format_result(int step, const zbx_preproc_result_t *result, const char *error, char **out)
96 {
97 char *actions[] = {"", " (discard value)", " (set value)", " (set error)"};
98
99 if (NULL == error)
100 {
101 char *value_str;
102
103 worker_format_value(&result->value, &value_str);
104 *out = zbx_dsprintf(NULL, "%d. Result%s: %s\n", step, actions[result->action], value_str);
105 zbx_free(value_str);
106 }
107 else
108 {
109 *out = zbx_dsprintf(NULL, "%d. Failed%s: %s\n", step, actions[result->action], error);
110 zbx_rtrim(*out, ZBX_WHITESPACE);
111 }
112 }
113
114 /******************************************************************************
115 * *
116 * Function: worker_format_error *
117 * *
118 * Purpose: formats preprocessing error message *
119 * *
120 * Parameters: value - [IN] the input value *
121 * results - [IN] the preprocessing step results *
122 * results_num - [IN] the number of executed steps *
123 * errmsg - [IN] the error message of last executed step *
124 * error - [OUT] the formatted error message *
125 * *
126 ******************************************************************************/
worker_format_error(const zbx_variant_t * value,zbx_preproc_result_t * results,int results_num,const char * errmsg,char ** error)127 static void worker_format_error(const zbx_variant_t *value, zbx_preproc_result_t *results, int results_num,
128 const char *errmsg, char **error)
129 {
130 char *value_str, *err_step;
131 int i;
132 size_t error_alloc = 512, error_offset = 0;
133 zbx_vector_str_t results_str;
134 zbx_db_mock_field_t field;
135
136 zbx_vector_str_create(&results_str);
137
138 /* add header to error message */
139 *error = zbx_malloc(NULL, error_alloc);
140 worker_format_value(value, &value_str);
141 zbx_snprintf_alloc(error, &error_alloc, &error_offset, "Preprocessing failed for: %s\n", value_str);
142 zbx_free(value_str);
143
144 zbx_db_mock_field_init(&field, ZBX_TYPE_CHAR, ITEM_ERROR_LEN);
145
146 zbx_db_mock_field_append(&field, *error);
147 zbx_db_mock_field_append(&field, "...\n");
148
149 /* format the last (failed) step */
150 worker_format_result(results_num, &results[results_num - 1], errmsg, &err_step);
151 zbx_vector_str_append(&results_str, err_step);
152
153 if (SUCCEED == zbx_db_mock_field_append(&field, err_step))
154 {
155 /* format the first steps */
156 for (i = results_num - 2; i >= 0; i--)
157 {
158 worker_format_result(i + 1, &results[i], NULL, &err_step);
159
160 if (SUCCEED != zbx_db_mock_field_append(&field, err_step))
161 {
162 zbx_free(err_step);
163 break;
164 }
165
166 zbx_vector_str_append(&results_str, err_step);
167 }
168 }
169
170 /* add steps to error message */
171
172 if (results_str.values_num < results_num)
173 zbx_strcpy_alloc(error, &error_alloc, &error_offset, "...\n");
174
175 for (i = results_str.values_num - 1; i >= 0; i--)
176 zbx_strcpy_alloc(error, &error_alloc, &error_offset, results_str.values[i]);
177
178 /* truncate formatted error if necessary */
179 if (ITEM_ERROR_LEN < zbx_strlen_utf8(*error))
180 {
181 char *ptr;
182
183 ptr = (*error) + zbx_db_strlen_n(*error, ITEM_ERROR_LEN - 3);
184 for (i = 0; i < 3; i++)
185 *ptr++ = '.';
186 *ptr = '\0';
187 }
188
189 zbx_vector_str_clear_ext(&results_str, zbx_str_free);
190 zbx_vector_str_destroy(&results_str);
191 }
192
193 /******************************************************************************
194 * *
195 * Function: worker_item_preproc_execute *
196 * *
197 * Purpose: execute preprocessing steps *
198 * *
199 * Parameters: value_type - [IN] the item value type *
200 * value - [IN/OUT] the value to process *
201 * ts - [IN] the value timestamp *
202 * steps - [IN] the preprocessing steps to execute *
203 * steps_num - [IN] the number of preprocessing steps *
204 * history_in - [IN] the preprocessing history *
205 * history_out - [OUT] the new preprocessing history *
206 * results - [OUT] the preprocessing step results *
207 * results_num - [OUT] the number of step results *
208 * error - [OUT] error message *
209 * *
210 * Return value: SUCCEED - the preprocessing steps finished successfully *
211 * FAIL - otherwise, error contains the error message *
212 * *
213 ******************************************************************************/
worker_item_preproc_execute(unsigned char value_type,zbx_variant_t * value,const zbx_timespec_t * ts,zbx_preproc_op_t * steps,int steps_num,zbx_vector_ptr_t * history_in,zbx_vector_ptr_t * history_out,zbx_preproc_result_t * results,int * results_num,char ** error)214 static int worker_item_preproc_execute(unsigned char value_type, zbx_variant_t *value, const zbx_timespec_t *ts,
215 zbx_preproc_op_t *steps, int steps_num, zbx_vector_ptr_t *history_in, zbx_vector_ptr_t *history_out,
216 zbx_preproc_result_t *results, int *results_num, char **error)
217 {
218 int i, ret = SUCCEED;
219
220 for (i = 0; i < steps_num; i++)
221 {
222 zbx_preproc_op_t *op = &steps[i];
223 zbx_variant_t history_value;
224 zbx_timespec_t history_ts;
225
226 zbx_preproc_history_pop_value(history_in, i, &history_value, &history_ts);
227
228 if (FAIL == (ret = zbx_item_preproc(value_type, value, ts, op, &history_value, &history_ts, error)))
229 {
230 results[i].action = op->error_handler;
231 ret = zbx_item_preproc_handle_error(value, op, error);
232 zbx_variant_clear(&history_value);
233 }
234 else
235 results[i].action = ZBX_PREPROC_FAIL_DEFAULT;
236
237 if (SUCCEED == ret)
238 {
239 if (NULL == *error)
240 {
241 /* result history is kept to report results of steps before failing step, */
242 /* which means it can be omitted for the last step. */
243 if (i != steps_num - 1)
244 zbx_variant_copy(&results[i].value, value);
245 else
246 zbx_variant_set_none(&results[i].value);
247 }
248 else
249 {
250 /* preprocessing step successfully extracted error, set it */
251 results[i].action = ZBX_PREPROC_FAIL_FORCE_ERROR;
252 ret = FAIL;
253 }
254 }
255
256 if (SUCCEED != ret)
257 {
258 break;
259 }
260
261 if (ZBX_VARIANT_NONE != history_value.type)
262 {
263 /* the value is byte copied to history_out vector and doesn't have to be cleared */
264 zbx_preproc_history_add_value(history_out, i, &history_value, &history_ts);
265 }
266
267 if (ZBX_VARIANT_NONE == value->type)
268 break;
269 }
270
271 *results_num = (i == steps_num ? i : i + 1);
272
273 return ret;
274 }
275
276 /******************************************************************************
277 * *
278 * Function: worker_preprocess_value *
279 * *
280 * Purpose: handle item value preprocessing task *
281 * *
282 * Parameters: socket - [IN] IPC socket *
283 * message - [IN] packed preprocessing task *
284 * *
285 ******************************************************************************/
worker_preprocess_value(zbx_ipc_socket_t * socket,zbx_ipc_message_t * message)286 static void worker_preprocess_value(zbx_ipc_socket_t *socket, zbx_ipc_message_t *message)
287 {
288 zbx_uint32_t size = 0;
289 unsigned char *data = NULL, value_type;
290 zbx_uint64_t itemid;
291 zbx_variant_t value, value_start;
292 int i, steps_num, results_num, ret;
293 char *errmsg = NULL, *error = NULL;
294 zbx_timespec_t *ts;
295 zbx_preproc_op_t *steps;
296 zbx_vector_ptr_t history_in, history_out;
297 zbx_preproc_result_t *results;
298
299 zbx_vector_ptr_create(&history_in);
300 zbx_vector_ptr_create(&history_out);
301
302 zbx_preprocessor_unpack_task(&itemid, &value_type, &ts, &value, &history_in, &steps, &steps_num,
303 message->data);
304
305 zbx_variant_copy(&value_start, &value);
306 results = (zbx_preproc_result_t *)zbx_malloc(NULL, sizeof(zbx_preproc_result_t) * steps_num);
307 memset(results, 0, sizeof(zbx_preproc_result_t) * steps_num);
308
309 if (FAIL == (ret = worker_item_preproc_execute(value_type, &value, ts, steps, steps_num, &history_in,
310 &history_out, results, &results_num, &errmsg)) && 0 != results_num)
311 {
312 int action = results[results_num - 1].action;
313
314 if (ZBX_PREPROC_FAIL_SET_ERROR != action && ZBX_PREPROC_FAIL_FORCE_ERROR != action)
315 {
316 worker_format_error(&value_start, results, results_num, errmsg, &error);
317 zbx_free(errmsg);
318 }
319 else
320 error = errmsg;
321 }
322
323 if (SUCCEED == ZBX_CHECK_LOG_LEVEL(LOG_LEVEL_DEBUG))
324 {
325 const char *result;
326
327 result = (SUCCEED == ret ? zbx_variant_value_desc(&value) : error);
328 zabbix_log(LOG_LEVEL_DEBUG, "%s(): %s", __func__, zbx_variant_value_desc(&value_start));
329 zabbix_log(LOG_LEVEL_DEBUG, "%s: %s %s",__func__, zbx_result_string(ret), result);
330 }
331
332 size = zbx_preprocessor_pack_result(&data, &value, &history_out, error);
333 zbx_variant_clear(&value);
334 zbx_free(error);
335 zbx_free(ts);
336 zbx_free(steps);
337
338 if (FAIL == zbx_ipc_socket_write(socket, ZBX_IPC_PREPROCESSOR_RESULT, data, size))
339 {
340 zabbix_log(LOG_LEVEL_CRIT, "cannot send preprocessing result");
341 exit(EXIT_FAILURE);
342 }
343
344 zbx_free(data);
345
346 zbx_variant_clear(&value_start);
347
348 for (i = 0; i < results_num; i++)
349 zbx_variant_clear(&results[i].value);
350 zbx_free(results);
351
352 zbx_vector_ptr_clear_ext(&history_out, (zbx_clean_func_t)zbx_preproc_op_history_free);
353 zbx_vector_ptr_destroy(&history_out);
354
355 zbx_vector_ptr_clear_ext(&history_in, (zbx_clean_func_t)zbx_preproc_op_history_free);
356 zbx_vector_ptr_destroy(&history_in);
357 }
358
359 /******************************************************************************
360 * *
361 * Function: worker_test_value *
362 * *
363 * Purpose: handle item value test preprocessing task *
364 * *
365 * Parameters: socket - [IN] IPC socket *
366 * message - [IN] packed preprocessing task *
367 * *
368 ******************************************************************************/
worker_test_value(zbx_ipc_socket_t * socket,zbx_ipc_message_t * message)369 static void worker_test_value(zbx_ipc_socket_t *socket, zbx_ipc_message_t *message)
370 {
371 zbx_uint32_t size;
372 unsigned char *data, value_type;
373 zbx_variant_t value, value_start;
374 int i, steps_num, results_num;
375 char *error = NULL, *value_str;
376 zbx_timespec_t ts;
377 zbx_preproc_op_t *steps;
378 zbx_vector_ptr_t history_in, history_out;
379 zbx_preproc_result_t *results;
380
381 zbx_vector_ptr_create(&history_in);
382 zbx_vector_ptr_create(&history_out);
383
384 zbx_preprocessor_unpack_test_request(&value_type, &value_str, &ts, &history_in, &steps, &steps_num,
385 message->data);
386
387 zbx_variant_set_str(&value, value_str);
388 zbx_variant_copy(&value_start, &value);
389
390 results = (zbx_preproc_result_t *)zbx_malloc(NULL, sizeof(zbx_preproc_result_t) * steps_num);
391 memset(results, 0, sizeof(zbx_preproc_result_t) * steps_num);
392
393 zbx_item_preproc_test(value_type, &value, &ts, steps, steps_num, &history_in, &history_out, results,
394 &results_num, &error);
395
396 size = zbx_preprocessor_pack_test_result(&data, results, results_num, &history_out, error);
397
398 if (FAIL == zbx_ipc_socket_write(socket, ZBX_IPC_PREPROCESSOR_TEST_RESULT, data, size))
399 {
400 zabbix_log(LOG_LEVEL_CRIT, "cannot send preprocessing result");
401 exit(EXIT_FAILURE);
402 }
403
404 zbx_variant_clear(&value);
405 zbx_free(error);
406 zbx_free(steps);
407 zbx_free(data);
408
409 zbx_variant_clear(&value_start);
410
411 for (i = 0; i < results_num; i++)
412 {
413 zbx_variant_clear(&results[i].value);
414 zbx_free(results[i].error);
415 }
416 zbx_free(results);
417
418 zbx_vector_ptr_clear_ext(&history_out, (zbx_clean_func_t)zbx_preproc_op_history_free);
419 zbx_vector_ptr_destroy(&history_out);
420
421 zbx_vector_ptr_clear_ext(&history_in, (zbx_clean_func_t)zbx_preproc_op_history_free);
422 zbx_vector_ptr_destroy(&history_in);
423 }
424
ZBX_THREAD_ENTRY(preprocessing_worker_thread,args)425 ZBX_THREAD_ENTRY(preprocessing_worker_thread, args)
426 {
427 pid_t ppid;
428 char *error = NULL;
429 zbx_ipc_socket_t socket;
430 zbx_ipc_message_t message;
431
432 process_type = ((zbx_thread_args_t *)args)->process_type;
433 server_num = ((zbx_thread_args_t *)args)->server_num;
434 process_num = ((zbx_thread_args_t *)args)->process_num;
435
436 zbx_setproctitle("%s #%d starting", get_process_type_string(process_type), process_num);
437
438 zbx_es_init(&es_engine);
439
440 zbx_ipc_message_init(&message);
441
442 if (FAIL == zbx_ipc_socket_open(&socket, ZBX_IPC_SERVICE_PREPROCESSING, SEC_PER_MIN, &error))
443 {
444 zabbix_log(LOG_LEVEL_CRIT, "cannot connect to preprocessing service: %s", error);
445 zbx_free(error);
446 exit(EXIT_FAILURE);
447 }
448
449 ppid = getppid();
450 zbx_ipc_socket_write(&socket, ZBX_IPC_PREPROCESSOR_WORKER, (unsigned char *)&ppid, sizeof(ppid));
451
452 zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(program_type),
453 server_num, get_process_type_string(process_type), process_num);
454
455 update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
456
457 zbx_setproctitle("%s #%d started", get_process_type_string(process_type), process_num);
458
459 while (ZBX_IS_RUNNING())
460 {
461 update_selfmon_counter(ZBX_PROCESS_STATE_IDLE);
462
463 if (SUCCEED != zbx_ipc_socket_read(&socket, &message))
464 {
465 zabbix_log(LOG_LEVEL_CRIT, "cannot read preprocessing service request");
466 exit(EXIT_FAILURE);
467 }
468
469 update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
470 zbx_update_env(zbx_time());
471
472 switch (message.code)
473 {
474 case ZBX_IPC_PREPROCESSOR_REQUEST:
475 worker_preprocess_value(&socket, &message);
476 break;
477 case ZBX_IPC_PREPROCESSOR_TEST_REQUEST:
478 worker_test_value(&socket, &message);
479 break;
480 }
481
482 zbx_ipc_message_clean(&message);
483 }
484
485 zbx_setproctitle("%s #%d [terminated]", get_process_type_string(process_type), process_num);
486
487 while (1)
488 zbx_sleep(SEC_PER_MIN);
489
490 zbx_es_destroy(&es_engine);
491 }
492