1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 **/
19
20 #include "common.h"
21 #include "log.h"
22 #include "zbxjson.h"
23 #include "zbxalgo.h"
24 #include "dbcache.h"
25 #include "zbxhistory.h"
26 #include "zbxself.h"
27 #include "history.h"
28
29 /* curl_multi_wait() is supported starting with version 7.28.0 (0x071c00) */
30 #if defined(HAVE_LIBCURL) && LIBCURL_VERSION_NUM >= 0x071c00
31
32 #define ZBX_HISTORY_STORAGE_DOWN 10000 /* Timeout in milliseconds */
33
34 #define ZBX_IDX_JSON_ALLOCATE 256
35 #define ZBX_JSON_ALLOCATE 2048
36
37
/* names used below as the Elasticsearch index name and as the pipeline name prefix; */
/* looked up with hist->value_type as the array index */
const char	*value_type_str[] = {"dbl", "str", "log", "uint", "text"};

extern char	*CONFIG_HISTORY_STORAGE_URL;
extern int	CONFIG_HISTORY_STORAGE_PIPELINES;

/* backend specific state kept in zbx_history_iface_t.data */
typedef struct
{
	char	*base_url;	/* copy of CONFIG_HISTORY_STORAGE_URL with trailing '/' characters trimmed */
	char	*post_url;	/* URL of the request being prepared or performed */
	char	*buf;		/* request body assembled by elastic_add_values() */
	CURL	*handle;	/* cURL easy handle of the current request, NULL when there is none */
}
zbx_elastic_data_t;

/* state of the batch writer shared by all interfaces of this backend */
typedef struct
{
	unsigned char		initialized;	/* 1 after elastic_writer_init(), 0 after elastic_writer_release() */
	zbx_vector_ptr_t	ifaces;		/* zbx_history_iface_t pointers queued for the next flush */

	CURLM			*handle;	/* cURL multi handle the queued easy handles are attached to */
}
zbx_elastic_writer_t;

static zbx_elastic_writer_t	writer;

/* growable buffer the cURL write callback appends response data to */
typedef struct
{
	char	*data;		/* buffer contents, NULL until the first write */
	size_t	alloc;		/* allocated size of data, maintained by zbx_strncpy_alloc() */
	size_t	offset;		/* write position in data, maintained by zbx_strncpy_alloc() */
}
zbx_httppage_t;

/* response buffer used by elastic_get_values() */
static zbx_httppage_t	page_r;

/* response buffer together with the cURL error buffer of one write request */
typedef struct
{
	zbx_httppage_t	page;
	char		errbuf[CURL_ERROR_SIZE];
}
zbx_curlpage_t;

/* one write response/error buffer per value type, indexed by hist->value_type */
static zbx_curlpage_t	page_w[ITEM_VALUE_TYPE_MAX];
81
curl_write_cb(void * ptr,size_t size,size_t nmemb,void * userdata)82 static size_t curl_write_cb(void *ptr, size_t size, size_t nmemb, void *userdata)
83 {
84 size_t r_size = size * nmemb;
85
86 zbx_httppage_t *page = (zbx_httppage_t *)userdata;
87
88 zbx_strncpy_alloc(&page->data, &page->alloc, &page->offset, ptr, r_size);
89
90 return r_size;
91 }
92
history_str2value(char * str,unsigned char value_type)93 static history_value_t history_str2value(char *str, unsigned char value_type)
94 {
95 history_value_t value;
96
97 switch (value_type)
98 {
99 case ITEM_VALUE_TYPE_LOG:
100 value.log = (zbx_log_value_t *)zbx_malloc(NULL, sizeof(zbx_log_value_t));
101 memset(value.log, 0, sizeof(zbx_log_value_t));
102 value.log->value = zbx_strdup(NULL, str);
103 break;
104 case ITEM_VALUE_TYPE_STR:
105 case ITEM_VALUE_TYPE_TEXT:
106 value.str = zbx_strdup(NULL, str);
107 break;
108 case ITEM_VALUE_TYPE_FLOAT:
109 value.dbl = atof(str);
110 break;
111 case ITEM_VALUE_TYPE_UINT64:
112 ZBX_STR2UINT64(value.ui64, str);
113 break;
114 }
115
116 return value;
117 }
118
history_value2str(const ZBX_DC_HISTORY * h)119 static const char *history_value2str(const ZBX_DC_HISTORY *h)
120 {
121 static char buffer[ZBX_MAX_DOUBLE_LEN + 1];
122
123 switch (h->value_type)
124 {
125 case ITEM_VALUE_TYPE_STR:
126 case ITEM_VALUE_TYPE_TEXT:
127 return h->value.str;
128 case ITEM_VALUE_TYPE_LOG:
129 return h->value.log->value;
130 case ITEM_VALUE_TYPE_FLOAT:
131 zbx_snprintf(buffer, sizeof(buffer), ZBX_FS_DBL64, h->value.dbl);
132 break;
133 case ITEM_VALUE_TYPE_UINT64:
134 zbx_snprintf(buffer, sizeof(buffer), ZBX_FS_UI64, h->value.ui64);
135 break;
136 }
137
138 return buffer;
139 }
140
/******************************************************************************
 *
 * Function: history_parse_value
 *
 * Purpose: fills a history record from one "_source" JSON object
 *
 * Parameters: jp         - [IN] the JSON object with "clock", "ns", "value"
 *                               and, for log values, "timestamp",
 *                               "logeventid", "severity" and "source" tags
 *             value_type - [IN] the value type of the record
 *             hr         - [OUT] the parsed history record
 *
 * Return value: SUCCEED - all required tags were found and parsed
 *               FAIL    - a required tag is missing; memory allocated for
 *                         a log value is released before returning
 *
 ******************************************************************************/
static int	history_parse_value(struct zbx_json_parse *jp, unsigned char value_type, zbx_history_record_t *hr)
{
	char	*value = NULL;
	size_t	value_alloc = 0;
	int	ret = FAIL, log_allocated = 0;

	if (SUCCEED != zbx_json_value_by_name_dyn(jp, "clock", &value, &value_alloc, NULL))
		goto out;

	hr->timestamp.sec = atoi(value);

	if (SUCCEED != zbx_json_value_by_name_dyn(jp, "ns", &value, &value_alloc, NULL))
		goto out;

	hr->timestamp.ns = atoi(value);

	if (SUCCEED != zbx_json_value_by_name_dyn(jp, "value", &value, &value_alloc, NULL))
		goto out;

	hr->value = history_str2value(value, value_type);

	if (ITEM_VALUE_TYPE_LOG == value_type)
	{
		/* from here on a failure must release the log value allocated above */
		log_allocated = 1;

		if (SUCCEED != zbx_json_value_by_name_dyn(jp, "timestamp", &value, &value_alloc, NULL))
			goto out;

		hr->value.log->timestamp = atoi(value);

		if (SUCCEED != zbx_json_value_by_name_dyn(jp, "logeventid", &value, &value_alloc, NULL))
			goto out;

		hr->value.log->logeventid = atoi(value);

		if (SUCCEED != zbx_json_value_by_name_dyn(jp, "severity", &value, &value_alloc, NULL))
			goto out;

		hr->value.log->severity = atoi(value);

		if (SUCCEED != zbx_json_value_by_name_dyn(jp, "source", &value, &value_alloc, NULL))
			goto out;

		hr->value.log->source = zbx_strdup(NULL, value);
	}

	ret = SUCCEED;
out:
	if (SUCCEED != ret && 0 != log_allocated)
	{
		/* the caller drops records that failed to parse, so nobody else would free these */
		zbx_free(hr->value.log->source);
		zbx_free(hr->value.log->value);
		zbx_free(hr->value.log);
	}

	zbx_free(value);

	return ret;
}
193
elastic_log_error(CURL * handle,CURLcode error,const char * errbuf)194 static void elastic_log_error(CURL *handle, CURLcode error, const char *errbuf)
195 {
196 char http_status[MAX_STRING_LEN];
197 long int http_code;
198
199 if (CURLE_HTTP_RETURNED_ERROR == error)
200 {
201 if (CURLE_OK == curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &http_code))
202 zbx_snprintf(http_status, sizeof(http_status), "HTTP status code: %ld", http_code);
203 else
204 zbx_strlcpy(http_status, "unknown HTTP status code", sizeof(http_status));
205
206 if (0 != page_r.offset)
207 {
208 zabbix_log(LOG_LEVEL_ERR, "cannot get values from elasticsearch, %s, message: %s", http_status,
209 page_r.data);
210 }
211 else
212 zabbix_log(LOG_LEVEL_ERR, "cannot get values from elasticsearch, %s", http_status);
213 }
214 else
215 {
216 zabbix_log(LOG_LEVEL_ERR, "cannot get values from elasticsearch: %s",
217 '\0' != *errbuf ? errbuf : curl_easy_strerror(error));
218 }
219 }
220
221 /************************************************************************************
222 * *
223 * Function: elastic_close *
224 * *
225 * Purpose: closes connection and releases allocated resources *
226 * *
227 * Parameters: hist - [IN] the history storage interface *
228 * *
229 ************************************************************************************/
elastic_close(zbx_history_iface_t * hist)230 static void elastic_close(zbx_history_iface_t *hist)
231 {
232 zbx_elastic_data_t *data = (zbx_elastic_data_t *)hist->data;
233
234 zbx_free(data->buf);
235 zbx_free(data->post_url);
236
237 if (NULL != data->handle)
238 {
239 if (NULL != writer.handle)
240 curl_multi_remove_handle(writer.handle, data->handle);
241
242 curl_easy_cleanup(data->handle);
243 data->handle = NULL;
244 }
245 }
246
247 /******************************************************************************
248 * *
249 * Function: elastic_is_error_present *
250 * *
251 * Purpose: check a error from Elastic json response *
252 * *
253 * Parameters: page - [IN] the buffer with json response *
254 * err - [OUT] the parse error message. If the error value is *
255 * set it must be freed by caller after it has *
256 * been used. *
257 * *
258 * Return value: SUCCEED - the response contains an error *
259 * FAIL - otherwise *
260 * *
261 ******************************************************************************/
elastic_is_error_present(zbx_httppage_t * page,char ** err)262 static int elastic_is_error_present(zbx_httppage_t *page, char **err)
263 {
264 struct zbx_json_parse jp, jp_values, jp_index, jp_error, jp_items, jp_item;
265 const char *errors, *p = NULL;
266 char *index = NULL, *status = NULL, *type = NULL, *reason = NULL;
267 size_t index_alloc = 0, status_alloc = 0, type_alloc = 0, reason_alloc = 0;
268 int rc_js = SUCCEED;
269
270 zabbix_log(LOG_LEVEL_TRACE, "%s() raw json: %s", __func__, ZBX_NULL2EMPTY_STR(page->data));
271
272 if (SUCCEED != zbx_json_open(page->data, &jp) || SUCCEED != zbx_json_brackets_open(jp.start, &jp_values))
273 return FAIL;
274
275 if (NULL == (errors = zbx_json_pair_by_name(&jp_values, "errors")) || 0 != strncmp("true", errors, 4))
276 return FAIL;
277
278 if (SUCCEED == zbx_json_brackets_by_name(&jp, "items", &jp_items))
279 {
280 while (NULL != (p = zbx_json_next(&jp_items, p)))
281 {
282 if (SUCCEED == zbx_json_brackets_open(p, &jp_item) &&
283 SUCCEED == zbx_json_brackets_by_name(&jp_item, "index", &jp_index) &&
284 SUCCEED == zbx_json_brackets_by_name(&jp_index, "error", &jp_error))
285 {
286 if (SUCCEED != zbx_json_value_by_name_dyn(&jp_error, "type", &type, &type_alloc, NULL))
287 rc_js = FAIL;
288 if (SUCCEED != zbx_json_value_by_name_dyn(&jp_error, "reason", &reason, &reason_alloc, NULL))
289 rc_js = FAIL;
290 }
291 else
292 continue;
293
294 if (SUCCEED != zbx_json_value_by_name_dyn(&jp_index, "status", &status, &status_alloc, NULL))
295 rc_js = FAIL;
296 if (SUCCEED != zbx_json_value_by_name_dyn(&jp_index, "_index", &index, &index_alloc, NULL))
297 rc_js = FAIL;
298
299 break;
300 }
301 }
302 else
303 rc_js = FAIL;
304
305 *err = zbx_dsprintf(NULL,"index:%s status:%s type:%s reason:%s%s", ZBX_NULL2EMPTY_STR(index),
306 ZBX_NULL2EMPTY_STR(status), ZBX_NULL2EMPTY_STR(type), ZBX_NULL2EMPTY_STR(reason),
307 FAIL == rc_js ? " / elasticsearch version is not fully compatible with zabbix server" : "");
308
309 zbx_free(status);
310 zbx_free(type);
311 zbx_free(reason);
312 zbx_free(index);
313
314 return SUCCEED;
315 }
316
/******************************************************************************************************************
 *
 * elastic writer support (batching of bulk requests over a cURL multi handle)
 *
 ******************************************************************************************************************/
322
323
324
325 /************************************************************************************
326 * *
327 * Function: elastic_writer_init *
328 * *
329 * Purpose: initializes elastic writer for a new batch of history values *
330 * *
331 ************************************************************************************/
elastic_writer_init(void)332 static void elastic_writer_init(void)
333 {
334 if (0 != writer.initialized)
335 return;
336
337 zbx_vector_ptr_create(&writer.ifaces);
338
339 if (NULL == (writer.handle = curl_multi_init()))
340 {
341 zbx_error("Cannot initialize cURL multi session");
342 exit(EXIT_FAILURE);
343 }
344
345 writer.initialized = 1;
346 }
347
348 /************************************************************************************
349 * *
350 * Function: elastic_writer_release *
351 * *
352 * Purpose: releases initialized elastic writer by freeing allocated resources and *
353 * setting its state to uninitialized. *
354 * *
355 ************************************************************************************/
elastic_writer_release(void)356 static void elastic_writer_release(void)
357 {
358 int i;
359
360 for (i = 0; i < writer.ifaces.values_num; i++)
361 elastic_close((zbx_history_iface_t *)writer.ifaces.values[i]);
362
363 curl_multi_cleanup(writer.handle);
364 writer.handle = NULL;
365
366 zbx_vector_ptr_destroy(&writer.ifaces);
367
368 writer.initialized = 0;
369 }
370
371 /************************************************************************************
372 * *
373 * Function: elastic_writer_add_iface *
374 * *
375 * Purpose: adds history storage interface to be flushed later *
376 * *
377 * Parameters: db_insert - [IN] bulk insert data *
378 * *
379 ************************************************************************************/
elastic_writer_add_iface(zbx_history_iface_t * hist)380 static void elastic_writer_add_iface(zbx_history_iface_t *hist)
381 {
382 zbx_elastic_data_t *data = (zbx_elastic_data_t *)hist->data;
383 CURLoption opt;
384 CURLcode err;
385
386 elastic_writer_init();
387
388 if (NULL == (data->handle = curl_easy_init()))
389 {
390 zabbix_log(LOG_LEVEL_ERR, "cannot initialize cURL session");
391 return;
392 }
393
394 if (CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_URL, data->post_url)) ||
395 CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_POST, 1L)) ||
396 CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_POSTFIELDS, data->buf)) ||
397 CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_WRITEFUNCTION,
398 curl_write_cb)) ||
399 CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_WRITEDATA,
400 &page_w[hist->value_type].page)) ||
401 CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_FAILONERROR, 1L)) ||
402 CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_ERRORBUFFER,
403 page_w[hist->value_type].errbuf)) ||
404 CURLE_OK != (err = curl_easy_setopt(data->handle, opt = ZBX_CURLOPT_ACCEPT_ENCODING, "")))
405 {
406 zabbix_log(LOG_LEVEL_ERR, "cannot set cURL option %d: [%s]", (int)opt, curl_easy_strerror(err));
407 goto out;
408 }
409
410 *page_w[hist->value_type].errbuf = '\0';
411
412 if (CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_PRIVATE, &page_w[hist->value_type])))
413 {
414 zabbix_log(LOG_LEVEL_ERR, "cannot set cURL option %d: [%s]", (int)opt, curl_easy_strerror(err));
415 goto out;
416 }
417
418 page_w[hist->value_type].page.offset = 0;
419
420 if (0 < page_w[hist->value_type].page.alloc)
421 *page_w[hist->value_type].page.data = '\0';
422
423 curl_multi_add_handle(writer.handle, data->handle);
424
425 zbx_vector_ptr_append(&writer.ifaces, hist);
426
427 return;
428 out:
429 elastic_close(hist);
430 }
431
432 /************************************************************************************
433 * *
434 * Function: elastic_writer_flush *
435 * *
436 * Purpose: posts historical data to elastic storage *
437 * *
438 ************************************************************************************/
elastic_writer_flush(void)439 static int elastic_writer_flush(void)
440 {
441 struct curl_slist *curl_headers = NULL;
442 int i, running, previous, msgnum;
443 CURLMsg *msg;
444 zbx_vector_ptr_t retries;
445 CURLcode err;
446 int ret = SUCCEED;
447
448 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
449
450 /* The writer might be uninitialized only if the history */
451 /* was already flushed. In that case, return SUCCEED */
452 if (0 == writer.initialized)
453 goto end;
454
455 zbx_vector_ptr_create(&retries);
456
457 curl_headers = curl_slist_append(curl_headers, "Content-Type: application/x-ndjson");
458
459 for (i = 0; i < writer.ifaces.values_num; i++)
460 {
461 zbx_history_iface_t *hist = (zbx_history_iface_t *)writer.ifaces.values[i];
462 zbx_elastic_data_t *data = (zbx_elastic_data_t *)hist->data;
463
464 if (CURLE_OK != (err = curl_easy_setopt(data->handle, CURLOPT_HTTPHEADER, curl_headers)))
465 {
466 zabbix_log(LOG_LEVEL_ERR, "cannot set cURL option %d: [%s]", (int)CURLOPT_HTTPHEADER,
467 curl_easy_strerror(err));
468 ret = FAIL;
469 goto clean;
470 }
471
472 zabbix_log(LOG_LEVEL_DEBUG, "sending %s", data->buf);
473 }
474
475 try_again:
476 previous = 0;
477
478 do
479 {
480 int fds;
481 CURLMcode code;
482 char *error;
483 zbx_curlpage_t *curl_page;
484
485 if (CURLM_OK != (code = curl_multi_perform(writer.handle, &running)))
486 {
487 zabbix_log(LOG_LEVEL_ERR, "cannot perform on curl multi handle: %s", curl_multi_strerror(code));
488 break;
489 }
490
491 if (CURLM_OK != (code = curl_multi_wait(writer.handle, NULL, 0, ZBX_HISTORY_STORAGE_DOWN, &fds)))
492 {
493 zabbix_log(LOG_LEVEL_ERR, "cannot wait on curl multi handle: %s", curl_multi_strerror(code));
494 break;
495 }
496
497 if (previous == running)
498 continue;
499
500 while (NULL != (msg = curl_multi_info_read(writer.handle, &msgnum)))
501 {
502 /* If the error is due to malformed data, there is no sense on re-trying to send. */
503 /* That's why we actually check for transport and curl errors separately */
504 if (CURLE_HTTP_RETURNED_ERROR == msg->data.result)
505 {
506 if (CURLE_OK == curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE,
507 (char **)&curl_page) && '\0' != *curl_page->errbuf)
508 {
509 zabbix_log(LOG_LEVEL_ERR, "cannot send data to elasticsearch, HTTP error"
510 " message: %s", curl_page->errbuf);
511 }
512 else
513 {
514 char http_status[MAX_STRING_LEN];
515 long int response_code;
516
517 if (CURLE_OK == curl_easy_getinfo(msg->easy_handle,
518 CURLINFO_RESPONSE_CODE, &response_code))
519 {
520 zbx_snprintf(http_status, sizeof(http_status), "HTTP status code: %ld",
521 response_code);
522 }
523 else
524 {
525 zbx_strlcpy(http_status, "unknown HTTP status code",
526 sizeof(http_status));
527 }
528
529 zabbix_log(LOG_LEVEL_ERR, "cannot send data to elasticsearch, %s", http_status);
530 }
531 }
532 else if (CURLE_OK != msg->data.result)
533 {
534 if (CURLE_OK == curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE,
535 (char **)&curl_page) && '\0' != *curl_page->errbuf)
536 {
537 zabbix_log(LOG_LEVEL_WARNING, "cannot send data to elasticsearch: %s",
538 curl_page->errbuf);
539 }
540 else
541 {
542 zabbix_log(LOG_LEVEL_WARNING, "cannot send data to elasticsearch: %s",
543 curl_easy_strerror(msg->data.result));
544 }
545
546 /* If the error is due to curl internal problems or unrelated */
547 /* problems with HTTP, we put the handle in a retry list and */
548 /* remove it from the current execution loop */
549 zbx_vector_ptr_append(&retries, msg->easy_handle);
550 curl_multi_remove_handle(writer.handle, msg->easy_handle);
551 }
552 else if (CURLE_OK == curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, (char **)&curl_page)
553 && SUCCEED == elastic_is_error_present(&curl_page->page, &error))
554 {
555 zabbix_log(LOG_LEVEL_WARNING, "%s() cannot send data to elasticsearch: %s",
556 __func__, error);
557 zbx_free(error);
558
559 /* If the error is due to elastic internal problems (for example an index */
560 /* became read-only), we put the handle in a retry list and */
561 /* remove it from the current execution loop */
562 zbx_vector_ptr_append(&retries, msg->easy_handle);
563 curl_multi_remove_handle(writer.handle, msg->easy_handle);
564 }
565 }
566
567 previous = running;
568 }
569 while (running);
570
571 /* We check if we have handles to retry. If yes, we put them back in the multi */
572 /* handle and go to the beginning of the do while() for try sending the data again */
573 /* after sleeping for ZBX_HISTORY_STORAGE_DOWN / 1000 (seconds) */
574 if (0 < retries.values_num)
575 {
576 for (i = 0; i < retries.values_num; i++)
577 curl_multi_add_handle(writer.handle, retries.values[i]);
578
579 zbx_vector_ptr_clear(&retries);
580
581 sleep(ZBX_HISTORY_STORAGE_DOWN / 1000);
582 goto try_again;
583 }
584 clean:
585 curl_slist_free_all(curl_headers);
586
587 zbx_vector_ptr_destroy(&retries);
588
589 elastic_writer_release();
590
591 end:
592 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
593
594 return ret;
595 }
596
597 /******************************************************************************************************************
598 * *
599 * history interface support *
600 * *
601 ******************************************************************************************************************/
602
603 /************************************************************************************
604 * *
605 * Function: elastic_destroy *
606 * *
607 * Purpose: destroys history storage interface *
608 * *
609 * Parameters: hist - [IN] the history storage interface *
610 * *
611 ************************************************************************************/
elastic_destroy(zbx_history_iface_t * hist)612 static void elastic_destroy(zbx_history_iface_t *hist)
613 {
614 zbx_elastic_data_t *data = (zbx_elastic_data_t *)hist->data;
615
616 elastic_close(hist);
617
618 zbx_free(data->base_url);
619 zbx_free(data);
620 }
621
622 /************************************************************************************
623 * *
624 * Function: elastic_get_values *
625 * *
626 * Purpose: gets item history data from history storage *
627 * *
628 * Parameters: hist - [IN] the history storage interface *
629 * itemid - [IN] the itemid *
630 * start - [IN] the period start timestamp *
631 * count - [IN] the number of values to read *
632 * end - [IN] the period end timestamp *
633 * values - [OUT] the item history data values *
634 * *
635 * Return value: SUCCEED - the history data were read successfully *
636 * FAIL - otherwise *
637 * *
638 * Comments: This function reads <count> values from ]<start>,<end>] interval or *
639 * all values from the specified interval if count is zero. *
640 * *
641 ************************************************************************************/
elastic_get_values(zbx_history_iface_t * hist,zbx_uint64_t itemid,int start,int count,int end,zbx_vector_history_record_t * values)642 static int elastic_get_values(zbx_history_iface_t *hist, zbx_uint64_t itemid, int start, int count, int end,
643 zbx_vector_history_record_t *values)
644 {
645 zbx_elastic_data_t *data = (zbx_elastic_data_t *)hist->data;
646 size_t url_alloc = 0, url_offset = 0, id_alloc = 0, scroll_alloc = 0, scroll_offset = 0;
647 int total, empty, ret;
648 CURLcode err;
649 struct zbx_json query;
650 struct curl_slist *curl_headers = NULL;
651 char *scroll_id = NULL, *scroll_query = NULL, errbuf[CURL_ERROR_SIZE];
652 CURLoption opt;
653
654 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
655
656 ret = FAIL;
657
658 if (NULL == (data->handle = curl_easy_init()))
659 {
660 zabbix_log(LOG_LEVEL_ERR, "cannot initialize cURL session");
661
662 return FAIL;
663 }
664
665 zbx_snprintf_alloc(&data->post_url, &url_alloc, &url_offset, "%s/%s*/_search?scroll=10s", data->base_url,
666 value_type_str[hist->value_type]);
667
668 /* prepare the json query for elasticsearch, apply ranges if needed */
669 zbx_json_init(&query, ZBX_JSON_ALLOCATE);
670
671 if (0 < count)
672 {
673 zbx_json_adduint64(&query, "size", count);
674 zbx_json_addarray(&query, "sort");
675 zbx_json_addobject(&query, NULL);
676 zbx_json_addobject(&query, "clock");
677 zbx_json_addstring(&query, "order", "desc", ZBX_JSON_TYPE_STRING);
678 zbx_json_close(&query);
679 zbx_json_close(&query);
680 zbx_json_close(&query);
681 }
682
683 zbx_json_addobject(&query, "query");
684 zbx_json_addobject(&query, "bool");
685 zbx_json_addarray(&query, "must");
686 zbx_json_addobject(&query, NULL);
687 zbx_json_addobject(&query, "match");
688 zbx_json_adduint64(&query, "itemid", itemid);
689 zbx_json_close(&query);
690 zbx_json_close(&query);
691 zbx_json_close(&query);
692 zbx_json_addarray(&query, "filter");
693 zbx_json_addobject(&query, NULL);
694 zbx_json_addobject(&query, "range");
695 zbx_json_addobject(&query, "clock");
696
697 if (0 < start)
698 zbx_json_adduint64(&query, "gt", start);
699
700 if (0 < end)
701 zbx_json_adduint64(&query, "lte", end);
702
703 zbx_json_close(&query);
704 zbx_json_close(&query);
705 zbx_json_close(&query);
706 zbx_json_close(&query);
707 zbx_json_close(&query);
708 zbx_json_close(&query);
709 zbx_json_close(&query);
710
711 curl_headers = curl_slist_append(curl_headers, "Content-Type: application/json");
712
713 if (CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_URL, data->post_url)) ||
714 CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_POSTFIELDS, query.buffer)) ||
715 CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_WRITEFUNCTION,
716 curl_write_cb)) ||
717 CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_WRITEDATA, &page_r)) ||
718 CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_HTTPHEADER, curl_headers)) ||
719 CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_FAILONERROR, 1L)) ||
720 CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_ERRORBUFFER, errbuf)) ||
721 CURLE_OK != (err = curl_easy_setopt(data->handle, opt = ZBX_CURLOPT_ACCEPT_ENCODING, "")))
722 {
723 zabbix_log(LOG_LEVEL_ERR, "cannot set cURL option %d: [%s]", (int)opt, curl_easy_strerror(err));
724 goto out;
725 }
726
727 zabbix_log(LOG_LEVEL_DEBUG, "sending query to %s; post data: %s", data->post_url, query.buffer);
728
729 page_r.offset = 0;
730 *errbuf = '\0';
731 if (CURLE_OK != (err = curl_easy_perform(data->handle)))
732 {
733 elastic_log_error(data->handle, err, errbuf);
734 goto out;
735 }
736
737 url_offset = 0;
738 zbx_snprintf_alloc(&data->post_url, &url_alloc, &url_offset, "%s/_search/scroll", data->base_url);
739
740 if (CURLE_OK != (err = curl_easy_setopt(data->handle, CURLOPT_URL, data->post_url)))
741 {
742 zabbix_log(LOG_LEVEL_ERR, "cannot set cURL option %d: [%s]", (int)CURLOPT_URL,
743 curl_easy_strerror(err));
744 goto out;
745 }
746
747 total = (0 == count ? -1 : count);
748
749 /* For processing the records, we need to keep track of the total requested and if the response from the */
750 /* elasticsearch server is empty. For this we use two variables, empty and total. If the result is empty or */
751 /* the total reach zero, we terminate the scrolling query and return what we currently have. */
752 do
753 {
754 struct zbx_json_parse jp, jp_values, jp_item, jp_sub, jp_hits, jp_source;
755 zbx_history_record_t hr;
756 const char *p = NULL;
757
758 empty = 1;
759
760 zabbix_log(LOG_LEVEL_DEBUG, "received from elasticsearch: %s", page_r.data);
761
762 zbx_json_open(page_r.data, &jp);
763 zbx_json_brackets_open(jp.start, &jp_values);
764
765 /* get the scroll id immediately, for being used in subsequent queries */
766 if (SUCCEED != zbx_json_value_by_name_dyn(&jp_values, "_scroll_id", &scroll_id, &id_alloc, NULL))
767 {
768 zabbix_log(LOG_LEVEL_WARNING, "elasticsearch version is not compatible with zabbix server. "
769 "_scroll_id tag is absent");
770 }
771
772 zbx_json_brackets_by_name(&jp_values, "hits", &jp_sub);
773 zbx_json_brackets_by_name(&jp_sub, "hits", &jp_hits);
774
775 while (NULL != (p = zbx_json_next(&jp_hits, p)))
776 {
777 empty = 0;
778
779 if (SUCCEED != zbx_json_brackets_open(p, &jp_item))
780 continue;
781
782 if (SUCCEED != zbx_json_brackets_by_name(&jp_item, "_source", &jp_source))
783 continue;
784
785 if (SUCCEED != history_parse_value(&jp_source, hist->value_type, &hr))
786 continue;
787
788 zbx_vector_history_record_append_ptr(values, &hr);
789
790 if (-1 != total)
791 --total;
792
793 if (0 == total)
794 {
795 empty = 1;
796 break;
797 }
798 }
799
800 if (1 == empty)
801 {
802 ret = SUCCEED;
803 break;
804 }
805
806 /* scroll to the next page */
807 scroll_offset = 0;
808 zbx_snprintf_alloc(&scroll_query, &scroll_alloc, &scroll_offset,
809 "{\"scroll\":\"10s\",\"scroll_id\":\"%s\"}\n", ZBX_NULL2EMPTY_STR(scroll_id));
810
811 if (CURLE_OK != (err = curl_easy_setopt(data->handle, CURLOPT_POSTFIELDS, scroll_query)))
812 {
813 zabbix_log(LOG_LEVEL_ERR, "cannot set cURL option %d: [%s]", (int)CURLOPT_POSTFIELDS,
814 curl_easy_strerror(err));
815 break;
816 }
817
818 page_r.offset = 0;
819 *errbuf = '\0';
820 if (CURLE_OK != (err = curl_easy_perform(data->handle)))
821 {
822 elastic_log_error(data->handle, err, errbuf);
823 break;
824 }
825 }
826 while (0 == empty);
827
828 /* as recommended by the elasticsearch documentation, we close the scroll search through a DELETE request */
829 if (NULL != scroll_id)
830 {
831 url_offset = 0;
832 zbx_snprintf_alloc(&data->post_url, &url_alloc, &url_offset, "%s/_search/scroll/%s", data->base_url,
833 scroll_id);
834
835 if (CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_URL, data->post_url)) ||
836 CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_POSTFIELDS, "")) ||
837 CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_CUSTOMREQUEST, "DELETE")))
838 {
839 zabbix_log(LOG_LEVEL_ERR, "cannot set cURL option %d: [%s]", (int)opt,
840 curl_easy_strerror(err));
841 ret = FAIL;
842 goto out;
843 }
844
845
846 zabbix_log(LOG_LEVEL_DEBUG, "elasticsearch closing scroll %s", data->post_url);
847
848 page_r.offset = 0;
849 *errbuf = '\0';
850 if (CURLE_OK != (err = curl_easy_perform(data->handle)))
851 elastic_log_error(data->handle, err, errbuf);
852 }
853
854 out:
855 elastic_close(hist);
856
857 curl_slist_free_all(curl_headers);
858
859 zbx_json_free(&query);
860
861 zbx_free(scroll_id);
862 zbx_free(scroll_query);
863
864 zbx_vector_history_record_sort(values, (zbx_compare_func_t)zbx_history_record_compare_desc_func);
865
866 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
867
868 return ret;
869 }
870
871 /************************************************************************************
872 * *
873 * Function: elastic_add_values *
874 * *
875 * Purpose: sends history data to the storage *
876 * *
877 * Parameters: hist - [IN] the history storage interface *
878 * history - [IN] the history data vector (may have mixed value types) *
879 * *
880 ************************************************************************************/
elastic_add_values(zbx_history_iface_t * hist,const zbx_vector_ptr_t * history)881 static int elastic_add_values(zbx_history_iface_t *hist, const zbx_vector_ptr_t *history)
882 {
883 zbx_elastic_data_t *data = (zbx_elastic_data_t *)hist->data;
884 int i, num = 0;
885 ZBX_DC_HISTORY *h;
886 struct zbx_json json_idx, json;
887 size_t buf_alloc = 0, buf_offset = 0;
888 char pipeline[14]; /* index name length + suffix "-pipeline" */
889
890 zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
891
892 zbx_json_init(&json_idx, ZBX_IDX_JSON_ALLOCATE);
893
894 zbx_json_addobject(&json_idx, "index");
895 zbx_json_addstring(&json_idx, "_index", value_type_str[hist->value_type], ZBX_JSON_TYPE_STRING);
896
897 if (1 == CONFIG_HISTORY_STORAGE_PIPELINES)
898 {
899 zbx_snprintf(pipeline, sizeof(pipeline), "%s-pipeline", value_type_str[hist->value_type]);
900 zbx_json_addstring(&json_idx, "pipeline", pipeline, ZBX_JSON_TYPE_STRING);
901 }
902
903 zbx_json_close(&json_idx);
904 zbx_json_close(&json_idx);
905
906 for (i = 0; i < history->values_num; i++)
907 {
908 h = (ZBX_DC_HISTORY *)history->values[i];
909
910 if (hist->value_type != h->value_type)
911 continue;
912
913 zbx_json_init(&json, ZBX_JSON_ALLOCATE);
914
915 zbx_json_adduint64(&json, "itemid", h->itemid);
916
917 zbx_json_addstring(&json, "value", history_value2str(h), ZBX_JSON_TYPE_STRING);
918
919 if (ITEM_VALUE_TYPE_LOG == h->value_type)
920 {
921 const zbx_log_value_t *log;
922
923 log = h->value.log;
924
925 zbx_json_adduint64(&json, "timestamp", log->timestamp);
926 zbx_json_addstring(&json, "source", ZBX_NULL2EMPTY_STR(log->source), ZBX_JSON_TYPE_STRING);
927 zbx_json_adduint64(&json, "severity", log->severity);
928 zbx_json_adduint64(&json, "logeventid", log->logeventid);
929 }
930
931 zbx_json_adduint64(&json, "clock", h->ts.sec);
932 zbx_json_adduint64(&json, "ns", h->ts.ns);
933 zbx_json_adduint64(&json, "ttl", h->ttl);
934
935 zbx_json_close(&json);
936
937 zbx_snprintf_alloc(&data->buf, &buf_alloc, &buf_offset, "%s\n%s\n", json_idx.buffer, json.buffer);
938
939 zbx_json_free(&json);
940
941 num++;
942 }
943
944 if (num > 0)
945 {
946 data->post_url = zbx_dsprintf(NULL, "%s/_bulk?refresh=true", data->base_url);
947 elastic_writer_add_iface(hist);
948 }
949
950 zbx_json_free(&json_idx);
951
952 zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
953
954 return num;
955 }
956
957 /************************************************************************************
958 * *
959 * Function: elastic_flush *
960 * *
961 * Purpose: flushes the history data to storage *
962 * *
963 * Parameters: hist - [IN] the history storage interface *
964 * *
965 * Comments: This function will try to flush the data until it succeeds or *
966 * unrecoverable error occurs *
967 * *
968 ************************************************************************************/
elastic_flush(zbx_history_iface_t * hist)969 static int elastic_flush(zbx_history_iface_t *hist)
970 {
971 ZBX_UNUSED(hist);
972
973 return elastic_writer_flush();
974 }
975
976 /************************************************************************************
977 * *
978 * Function: zbx_history_elastic_init *
979 * *
980 * Purpose: initializes history storage interface *
981 * *
982 * Parameters: hist - [IN] the history storage interface *
983 * value_type - [IN] the target value type *
984 * error - [OUT] the error message *
985 * *
986 * Return value: SUCCEED - the history storage interface was initialized *
987 * FAIL - otherwise *
988 * *
989 ************************************************************************************/
zbx_history_elastic_init(zbx_history_iface_t * hist,unsigned char value_type,char ** error)990 int zbx_history_elastic_init(zbx_history_iface_t *hist, unsigned char value_type, char **error)
991 {
992 zbx_elastic_data_t *data;
993
994 if (0 != curl_global_init(CURL_GLOBAL_ALL))
995 {
996 *error = zbx_strdup(*error, "Cannot initialize cURL library");
997 return FAIL;
998 }
999
1000 data = (zbx_elastic_data_t *)zbx_malloc(NULL, sizeof(zbx_elastic_data_t));
1001 memset(data, 0, sizeof(zbx_elastic_data_t));
1002 data->base_url = zbx_strdup(NULL, CONFIG_HISTORY_STORAGE_URL);
1003 zbx_rtrim(data->base_url, "/");
1004 data->buf = NULL;
1005 data->post_url = NULL;
1006 data->handle = NULL;
1007
1008 hist->value_type = value_type;
1009 hist->data = data;
1010 hist->destroy = elastic_destroy;
1011 hist->add_values = elastic_add_values;
1012 hist->flush = elastic_flush;
1013 hist->get_values = elastic_get_values;
1014 hist->requires_trends = 0;
1015
1016 return SUCCEED;
1017 }
1018
1019 #else
1020
zbx_history_elastic_init(zbx_history_iface_t * hist,unsigned char value_type,char ** error)1021 int zbx_history_elastic_init(zbx_history_iface_t *hist, unsigned char value_type, char **error)
1022 {
1023 ZBX_UNUSED(hist);
1024 ZBX_UNUSED(value_type);
1025
1026 *error = zbx_strdup(*error, "cURL library support >= 7.28.0 is required for Elasticsearch history backend");
1027 return FAIL;
1028 }
1029
1030 #endif
1031