1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
18 **/
19 
20 #include "common.h"
21 #include "log.h"
22 #include "zbxjson.h"
23 #include "zbxalgo.h"
24 #include "dbcache.h"
25 #include "zbxhistory.h"
26 #include "zbxself.h"
27 #include "history.h"
28 
29 /* curl_multi_wait() is supported starting with version 7.28.0 (0x071c00) */
30 #if defined(HAVE_LIBCURL) && LIBCURL_VERSION_NUM >= 0x071c00
31 
/* used both as the curl_multi_wait() timeout and (divided by 1000) as the */
/* pause in seconds before failed transfers are retried */
#define		ZBX_HISTORY_STORAGE_DOWN	10000 /* Timeout in milliseconds */

/* NOTE(review): not referenced in the visible part of this file - presumably */
/* an initial JSON buffer size; confirm against its users */
#define		ZBX_IDX_JSON_ALLOCATE		256
/* initial buffer size passed to zbx_json_init() when a search query is built */
#define		ZBX_JSON_ALLOCATE		2048


/* name placed in the elasticsearch request URL path, looked up by the */
/* value type of the history interface */
const char	*value_type_str[] = {"dbl", "str", "log", "uint", "text"};

/* configuration values defined outside of this file */
extern char	*CONFIG_HISTORY_STORAGE_URL;
extern int	CONFIG_HISTORY_STORAGE_PIPELINES;
42 
/* elasticsearch specific data attached to a history interface (hist->data) */
typedef struct
{
	char	*base_url;	/* URL prefix from which the request URLs are built */
	char	*post_url;	/* URL of the current request, set as CURLOPT_URL */
	char	*buf;		/* request body, set as CURLOPT_POSTFIELDS when writing */
	CURL	*handle;	/* cURL easy handle; reset to NULL when the connection is closed */
}
zbx_elastic_data_t;
51 
/* state of the writer that posts all queued interfaces through one cURL multi handle */
typedef struct
{
	unsigned char		initialized;	/* set to 1 by elastic_writer_init(), 0 after release */
	zbx_vector_ptr_t	ifaces;		/* queued history interfaces (zbx_history_iface_t *) */

	CURLM			*handle;	/* cURL multi handle the easy handles are attached to */
}
zbx_elastic_writer_t;

/* the writer instance used by the elastic_writer_*() functions */
static zbx_elastic_writer_t	writer;
62 
/* buffer that accumulates an HTTP response body through curl_write_cb() */
typedef struct
{
	char	*data;		/* response text */
	size_t	alloc;		/* allocation size tracked by zbx_strncpy_alloc() */
	size_t	offset;		/* write position; set to 0 before a new request to reuse the buffer */
}
zbx_httppage_t;

/* response buffer of the read (search) requests */
static zbx_httppage_t	page_r;
72 
/* response buffer together with the cURL error buffer of the same transfer */
typedef struct
{
	zbx_httppage_t	page;				/* response body */
	char		errbuf[CURL_ERROR_SIZE];	/* set as CURLOPT_ERRORBUFFER of the easy handle */
}
zbx_curlpage_t;

/* response and error buffers of the write requests, indexed by the interface value type */
static zbx_curlpage_t	page_w[ITEM_VALUE_TYPE_MAX];
81 
curl_write_cb(void * ptr,size_t size,size_t nmemb,void * userdata)82 static size_t	curl_write_cb(void *ptr, size_t size, size_t nmemb, void *userdata)
83 {
84 	size_t	r_size = size * nmemb;
85 
86 	zbx_httppage_t	*page = (zbx_httppage_t	*)userdata;
87 
88 	zbx_strncpy_alloc(&page->data, &page->alloc, &page->offset, ptr, r_size);
89 
90 	return r_size;
91 }
92 
history_str2value(char * str,unsigned char value_type)93 static history_value_t	history_str2value(char *str, unsigned char value_type)
94 {
95 	history_value_t	value;
96 
97 	switch (value_type)
98 	{
99 		case ITEM_VALUE_TYPE_LOG:
100 			value.log = (zbx_log_value_t *)zbx_malloc(NULL, sizeof(zbx_log_value_t));
101 			memset(value.log, 0, sizeof(zbx_log_value_t));
102 			value.log->value = zbx_strdup(NULL, str);
103 			break;
104 		case ITEM_VALUE_TYPE_STR:
105 		case ITEM_VALUE_TYPE_TEXT:
106 			value.str = zbx_strdup(NULL, str);
107 			break;
108 		case ITEM_VALUE_TYPE_FLOAT:
109 			value.dbl = atof(str);
110 			break;
111 		case ITEM_VALUE_TYPE_UINT64:
112 			ZBX_STR2UINT64(value.ui64, str);
113 			break;
114 	}
115 
116 	return value;
117 }
118 
history_value2str(const ZBX_DC_HISTORY * h)119 static const char	*history_value2str(const ZBX_DC_HISTORY *h)
120 {
121 	static char	buffer[ZBX_MAX_DOUBLE_LEN + 1];
122 
123 	switch (h->value_type)
124 	{
125 		case ITEM_VALUE_TYPE_STR:
126 		case ITEM_VALUE_TYPE_TEXT:
127 			return h->value.str;
128 		case ITEM_VALUE_TYPE_LOG:
129 			return h->value.log->value;
130 		case ITEM_VALUE_TYPE_FLOAT:
131 			zbx_snprintf(buffer, sizeof(buffer), ZBX_FS_DBL64, h->value.dbl);
132 			break;
133 		case ITEM_VALUE_TYPE_UINT64:
134 			zbx_snprintf(buffer, sizeof(buffer), ZBX_FS_UI64, h->value.ui64);
135 			break;
136 	}
137 
138 	return buffer;
139 }
140 
/*
 * Fills a history record from one elasticsearch document.
 *
 * The "clock", "ns" and "value" fields are read for every value type; log
 * values additionally need "timestamp", "logeventid", "severity" and "source".
 *
 * Parameters: jp         - [IN] the JSON object holding the document fields
 *             value_type - [IN] the item value type (ITEM_VALUE_TYPE_*)
 *             hr         - [OUT] the history record
 *
 * Return value: SUCCEED - all required fields were found
 *               FAIL    - a field is missing; nothing allocated for the
 *                         record is left behind in that case
 */
static int	history_parse_value(struct zbx_json_parse *jp, unsigned char value_type, zbx_history_record_t *hr)
{
	char		*value = NULL;
	size_t		value_alloc = 0;
	int		ret = FAIL;
	zbx_log_value_t	*log = NULL;

	if (SUCCEED != zbx_json_value_by_name_dyn(jp, "clock", &value, &value_alloc, NULL))
		goto out;

	hr->timestamp.sec = atoi(value);

	if (SUCCEED != zbx_json_value_by_name_dyn(jp, "ns", &value, &value_alloc, NULL))
		goto out;

	hr->timestamp.ns = atoi(value);

	if (SUCCEED != zbx_json_value_by_name_dyn(jp, "value", &value, &value_alloc, NULL))
		goto out;

	hr->value = history_str2value(value, value_type);

	if (ITEM_VALUE_TYPE_LOG == value_type)
	{
		/* remembered so that the log value can be released if a field below is missing */
		log = hr->value.log;

		if (SUCCEED != zbx_json_value_by_name_dyn(jp, "timestamp", &value, &value_alloc, NULL))
			goto out;

		log->timestamp = atoi(value);

		if (SUCCEED != zbx_json_value_by_name_dyn(jp, "logeventid", &value, &value_alloc, NULL))
			goto out;

		log->logeventid = atoi(value);

		if (SUCCEED != zbx_json_value_by_name_dyn(jp, "severity", &value, &value_alloc, NULL))
			goto out;

		log->severity = atoi(value);

		if (SUCCEED != zbx_json_value_by_name_dyn(jp, "source", &value, &value_alloc, NULL))
			goto out;

		log->source = zbx_strdup(NULL, value);
	}

	ret = SUCCEED;
out:
	/* Only log values can fail after the value has been allocated. The source */
	/* is duplicated as the very last step, so it is never set on this path. */
	if (SUCCEED != ret && NULL != log)
	{
		zbx_free(log->value);
		zbx_free(log);
		hr->value.log = NULL;
	}

	zbx_free(value);

	return ret;
}
193 
elastic_log_error(CURL * handle,CURLcode error,const char * errbuf)194 static void	elastic_log_error(CURL *handle, CURLcode error, const char *errbuf)
195 {
196 	char		http_status[MAX_STRING_LEN];
197 	long int	http_code;
198 
199 	if (CURLE_HTTP_RETURNED_ERROR == error)
200 	{
201 		if (CURLE_OK == curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &http_code))
202 			zbx_snprintf(http_status, sizeof(http_status), "HTTP status code: %ld", http_code);
203 		else
204 			zbx_strlcpy(http_status, "unknown HTTP status code", sizeof(http_status));
205 
206 		if (0 != page_r.offset)
207 		{
208 			zabbix_log(LOG_LEVEL_ERR, "cannot get values from elasticsearch, %s, message: %s", http_status,
209 					page_r.data);
210 		}
211 		else
212 			zabbix_log(LOG_LEVEL_ERR, "cannot get values from elasticsearch, %s", http_status);
213 	}
214 	else
215 	{
216 		zabbix_log(LOG_LEVEL_ERR, "cannot get values from elasticsearch: %s",
217 				'\0' != *errbuf ? errbuf : curl_easy_strerror(error));
218 	}
219 }
220 
221 /************************************************************************************
222  *                                                                                  *
223  * Function: elastic_close                                                          *
224  *                                                                                  *
225  * Purpose: closes connection and releases allocated resources                      *
226  *                                                                                  *
227  * Parameters:  hist - [IN] the history storage interface                           *
228  *                                                                                  *
229  ************************************************************************************/
elastic_close(zbx_history_iface_t * hist)230 static void	elastic_close(zbx_history_iface_t *hist)
231 {
232 	zbx_elastic_data_t	*data = (zbx_elastic_data_t *)hist->data;
233 
234 	zbx_free(data->buf);
235 	zbx_free(data->post_url);
236 
237 	if (NULL != data->handle)
238 	{
239 		if (NULL != writer.handle)
240 			curl_multi_remove_handle(writer.handle, data->handle);
241 
242 		curl_easy_cleanup(data->handle);
243 		data->handle = NULL;
244 	}
245 }
246 
247 /******************************************************************************
248  *                                                                            *
249  * Function: elastic_is_error_present                                         *
250  *                                                                            *
251  * Purpose: check a error from Elastic json response                          *
252  *                                                                            *
253  * Parameters: page - [IN]  the buffer with json response                     *
254  *             err  - [OUT] the parse error message. If the error value is    *
255  *                           set it must be freed by caller after it has      *
256  *                           been used.                                       *
257  *                                                                            *
258  * Return value: SUCCEED - the response contains an error                     *
259  *               FAIL    - otherwise                                          *
260  *                                                                            *
261  ******************************************************************************/
elastic_is_error_present(zbx_httppage_t * page,char ** err)262 static int	elastic_is_error_present(zbx_httppage_t *page, char **err)
263 {
264 	struct zbx_json_parse	jp, jp_values, jp_index, jp_error, jp_items, jp_item;
265 	const char		*errors, *p = NULL;
266 	char			*index = NULL, *status = NULL, *type = NULL, *reason = NULL;
267 	size_t			index_alloc = 0, status_alloc = 0, type_alloc = 0, reason_alloc = 0;
268 	int			rc_js = SUCCEED;
269 
270 	zabbix_log(LOG_LEVEL_TRACE, "%s() raw json: %s", __func__, ZBX_NULL2EMPTY_STR(page->data));
271 
272 	if (SUCCEED != zbx_json_open(page->data, &jp) || SUCCEED != zbx_json_brackets_open(jp.start, &jp_values))
273 		return FAIL;
274 
275 	if (NULL == (errors = zbx_json_pair_by_name(&jp_values, "errors")) || 0 != strncmp("true", errors, 4))
276 		return FAIL;
277 
278 	if (SUCCEED == zbx_json_brackets_by_name(&jp, "items", &jp_items))
279 	{
280 		while (NULL != (p = zbx_json_next(&jp_items, p)))
281 		{
282 			if (SUCCEED == zbx_json_brackets_open(p, &jp_item) &&
283 					SUCCEED == zbx_json_brackets_by_name(&jp_item, "index", &jp_index) &&
284 					SUCCEED == zbx_json_brackets_by_name(&jp_index, "error", &jp_error))
285 			{
286 				if (SUCCEED != zbx_json_value_by_name_dyn(&jp_error, "type", &type, &type_alloc, NULL))
287 					rc_js = FAIL;
288 				if (SUCCEED != zbx_json_value_by_name_dyn(&jp_error, "reason", &reason, &reason_alloc, NULL))
289 					rc_js = FAIL;
290 			}
291 			else
292 				continue;
293 
294 			if (SUCCEED != zbx_json_value_by_name_dyn(&jp_index, "status", &status, &status_alloc, NULL))
295 				rc_js = FAIL;
296 			if (SUCCEED != zbx_json_value_by_name_dyn(&jp_index, "_index", &index, &index_alloc, NULL))
297 				rc_js = FAIL;
298 
299 			break;
300 		}
301 	}
302 	else
303 		rc_js = FAIL;
304 
305 	*err = zbx_dsprintf(NULL,"index:%s status:%s type:%s reason:%s%s", ZBX_NULL2EMPTY_STR(index),
306 			ZBX_NULL2EMPTY_STR(status), ZBX_NULL2EMPTY_STR(type), ZBX_NULL2EMPTY_STR(reason),
307 			FAIL == rc_js ? " / elasticsearch version is not fully compatible with zabbix server" : "");
308 
309 	zbx_free(status);
310 	zbx_free(type);
311 	zbx_free(reason);
312 	zbx_free(index);
313 
314 	return SUCCEED;
315 }
316 
/******************************************************************************************************************
 *                                                                                                                *
 * elasticsearch batch writer                                                                                     *
 *                                                                                                                *
 ******************************************************************************************************************/
322 
323 
324 
325 /************************************************************************************
326  *                                                                                  *
327  * Function: elastic_writer_init                                                    *
328  *                                                                                  *
329  * Purpose: initializes elastic writer for a new batch of history values            *
330  *                                                                                  *
331  ************************************************************************************/
elastic_writer_init(void)332 static void	elastic_writer_init(void)
333 {
334 	if (0 != writer.initialized)
335 		return;
336 
337 	zbx_vector_ptr_create(&writer.ifaces);
338 
339 	if (NULL == (writer.handle = curl_multi_init()))
340 	{
341 		zbx_error("Cannot initialize cURL multi session");
342 		exit(EXIT_FAILURE);
343 	}
344 
345 	writer.initialized = 1;
346 }
347 
348 /************************************************************************************
349  *                                                                                  *
350  * Function: elastic_writer_release                                                 *
351  *                                                                                  *
352  * Purpose: releases initialized elastic writer by freeing allocated resources and  *
353  *          setting its state to uninitialized.                                     *
354  *                                                                                  *
355  ************************************************************************************/
elastic_writer_release(void)356 static void	elastic_writer_release(void)
357 {
358 	int	i;
359 
360 	for (i = 0; i < writer.ifaces.values_num; i++)
361 		elastic_close((zbx_history_iface_t *)writer.ifaces.values[i]);
362 
363 	curl_multi_cleanup(writer.handle);
364 	writer.handle = NULL;
365 
366 	zbx_vector_ptr_destroy(&writer.ifaces);
367 
368 	writer.initialized = 0;
369 }
370 
371 /************************************************************************************
372  *                                                                                  *
373  * Function: elastic_writer_add_iface                                               *
374  *                                                                                  *
375  * Purpose: adds history storage interface to be flushed later                      *
376  *                                                                                  *
377  * Parameters: db_insert - [IN] bulk insert data                                    *
378  *                                                                                  *
379  ************************************************************************************/
elastic_writer_add_iface(zbx_history_iface_t * hist)380 static void	elastic_writer_add_iface(zbx_history_iface_t *hist)
381 {
382 	zbx_elastic_data_t	*data = (zbx_elastic_data_t *)hist->data;
383 	CURLoption		opt;
384 	CURLcode		err;
385 
386 	elastic_writer_init();
387 
388 	if (NULL == (data->handle = curl_easy_init()))
389 	{
390 		zabbix_log(LOG_LEVEL_ERR, "cannot initialize cURL session");
391 		return;
392 	}
393 
394 	if (CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_URL, data->post_url)) ||
395 			CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_POST, 1L)) ||
396 			CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_POSTFIELDS, data->buf)) ||
397 			CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_WRITEFUNCTION,
398 					curl_write_cb)) ||
399 			CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_WRITEDATA,
400 					&page_w[hist->value_type].page)) ||
401 			CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_FAILONERROR, 1L)) ||
402 			CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_ERRORBUFFER,
403 					page_w[hist->value_type].errbuf)) ||
404 			CURLE_OK != (err = curl_easy_setopt(data->handle, opt = ZBX_CURLOPT_ACCEPT_ENCODING, "")))
405 	{
406 		zabbix_log(LOG_LEVEL_ERR, "cannot set cURL option %d: [%s]", (int)opt, curl_easy_strerror(err));
407 		goto out;
408 	}
409 
410 	*page_w[hist->value_type].errbuf = '\0';
411 
412 	if (CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_PRIVATE, &page_w[hist->value_type])))
413 	{
414 		zabbix_log(LOG_LEVEL_ERR, "cannot set cURL option %d: [%s]", (int)opt, curl_easy_strerror(err));
415 		goto out;
416 	}
417 
418 	page_w[hist->value_type].page.offset = 0;
419 
420 	if (0 < page_w[hist->value_type].page.alloc)
421 		*page_w[hist->value_type].page.data = '\0';
422 
423 	curl_multi_add_handle(writer.handle, data->handle);
424 
425 	zbx_vector_ptr_append(&writer.ifaces, hist);
426 
427 	return;
428 out:
429 	elastic_close(hist);
430 }
431 
432 /************************************************************************************
433  *                                                                                  *
434  * Function: elastic_writer_flush                                                   *
435  *                                                                                  *
436  * Purpose: posts historical data to elastic storage                                *
437  *                                                                                  *
438  ************************************************************************************/
elastic_writer_flush(void)439 static int	elastic_writer_flush(void)
440 {
441 	struct curl_slist	*curl_headers = NULL;
442 	int			i, running, previous, msgnum;
443 	CURLMsg			*msg;
444 	zbx_vector_ptr_t	retries;
445 	CURLcode		err;
446 	int			ret = SUCCEED;
447 
448 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
449 
450 	/* The writer might be uninitialized only if the history */
451 	/* was already flushed. In that case, return SUCCEED */
452 	if (0 == writer.initialized)
453 		goto end;
454 
455 	zbx_vector_ptr_create(&retries);
456 
457 	curl_headers = curl_slist_append(curl_headers, "Content-Type: application/x-ndjson");
458 
459 	for (i = 0; i < writer.ifaces.values_num; i++)
460 	{
461 		zbx_history_iface_t	*hist = (zbx_history_iface_t *)writer.ifaces.values[i];
462 		zbx_elastic_data_t	*data = (zbx_elastic_data_t *)hist->data;
463 
464 		if (CURLE_OK != (err = curl_easy_setopt(data->handle, CURLOPT_HTTPHEADER, curl_headers)))
465 		{
466 			zabbix_log(LOG_LEVEL_ERR, "cannot set cURL option %d: [%s]", (int)CURLOPT_HTTPHEADER,
467 					curl_easy_strerror(err));
468 			ret = FAIL;
469 			goto clean;
470 		}
471 
472 		zabbix_log(LOG_LEVEL_DEBUG, "sending %s", data->buf);
473 	}
474 
475 try_again:
476 	previous = 0;
477 
478 	do
479 	{
480 		int		fds;
481 		CURLMcode	code;
482 		char 		*error;
483 		zbx_curlpage_t	*curl_page;
484 
485 		if (CURLM_OK != (code = curl_multi_perform(writer.handle, &running)))
486 		{
487 			zabbix_log(LOG_LEVEL_ERR, "cannot perform on curl multi handle: %s", curl_multi_strerror(code));
488 			break;
489 		}
490 
491 		if (CURLM_OK != (code = curl_multi_wait(writer.handle, NULL, 0, ZBX_HISTORY_STORAGE_DOWN, &fds)))
492 		{
493 			zabbix_log(LOG_LEVEL_ERR, "cannot wait on curl multi handle: %s", curl_multi_strerror(code));
494 			break;
495 		}
496 
497 		if (previous == running)
498 			continue;
499 
500 		while (NULL != (msg = curl_multi_info_read(writer.handle, &msgnum)))
501 		{
502 			/* If the error is due to malformed data, there is no sense on re-trying to send. */
503 			/* That's why we actually check for transport and curl errors separately */
504 			if (CURLE_HTTP_RETURNED_ERROR == msg->data.result)
505 			{
506 				if (CURLE_OK == curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE,
507 						(char **)&curl_page) && '\0' != *curl_page->errbuf)
508 				{
509 					zabbix_log(LOG_LEVEL_ERR, "cannot send data to elasticsearch, HTTP error"
510 							" message: %s", curl_page->errbuf);
511 				}
512 				else
513 				{
514 					char		http_status[MAX_STRING_LEN];
515 					long int	response_code;
516 
517 					if (CURLE_OK == curl_easy_getinfo(msg->easy_handle,
518 							CURLINFO_RESPONSE_CODE, &response_code))
519 					{
520 						zbx_snprintf(http_status, sizeof(http_status), "HTTP status code: %ld",
521 								response_code);
522 					}
523 					else
524 					{
525 						zbx_strlcpy(http_status, "unknown HTTP status code",
526 								sizeof(http_status));
527 					}
528 
529 					zabbix_log(LOG_LEVEL_ERR, "cannot send data to elasticsearch, %s", http_status);
530 				}
531 			}
532 			else if (CURLE_OK != msg->data.result)
533 			{
534 				if (CURLE_OK == curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE,
535 						(char **)&curl_page) && '\0' != *curl_page->errbuf)
536 				{
537 					zabbix_log(LOG_LEVEL_WARNING, "cannot send data to elasticsearch: %s",
538 							curl_page->errbuf);
539 				}
540 				else
541 				{
542 					zabbix_log(LOG_LEVEL_WARNING, "cannot send data to elasticsearch: %s",
543 							curl_easy_strerror(msg->data.result));
544 				}
545 
546 				/* If the error is due to curl internal problems or unrelated */
547 				/* problems with HTTP, we put the handle in a retry list and */
548 				/* remove it from the current execution loop */
549 				zbx_vector_ptr_append(&retries, msg->easy_handle);
550 				curl_multi_remove_handle(writer.handle, msg->easy_handle);
551 			}
552 			else if (CURLE_OK == curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, (char **)&curl_page)
553 					&& SUCCEED == elastic_is_error_present(&curl_page->page, &error))
554 			{
555 				zabbix_log(LOG_LEVEL_WARNING, "%s() cannot send data to elasticsearch: %s",
556 						__func__, error);
557 				zbx_free(error);
558 
559 				/* If the error is due to elastic internal problems (for example an index */
560 				/* became read-only), we put the handle in a retry list and */
561 				/* remove it from the current execution loop */
562 				zbx_vector_ptr_append(&retries, msg->easy_handle);
563 				curl_multi_remove_handle(writer.handle, msg->easy_handle);
564 			}
565 		}
566 
567 		previous = running;
568 	}
569 	while (running);
570 
571 	/* We check if we have handles to retry. If yes, we put them back in the multi */
572 	/* handle and go to the beginning of the do while() for try sending the data again */
573 	/* after sleeping for ZBX_HISTORY_STORAGE_DOWN / 1000 (seconds) */
574 	if (0 < retries.values_num)
575 	{
576 		for (i = 0; i < retries.values_num; i++)
577 			curl_multi_add_handle(writer.handle, retries.values[i]);
578 
579 		zbx_vector_ptr_clear(&retries);
580 
581 		sleep(ZBX_HISTORY_STORAGE_DOWN / 1000);
582 		goto try_again;
583 	}
584 clean:
585 	curl_slist_free_all(curl_headers);
586 
587 	zbx_vector_ptr_destroy(&retries);
588 
589 	elastic_writer_release();
590 
591 end:
592 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
593 
594 	return ret;
595 }
596 
597 /******************************************************************************************************************
598  *                                                                                                                *
599  * history interface support                                                                                      *
600  *                                                                                                                *
601  ******************************************************************************************************************/
602 
603 /************************************************************************************
604  *                                                                                  *
605  * Function: elastic_destroy                                                        *
606  *                                                                                  *
607  * Purpose: destroys history storage interface                                      *
608  *                                                                                  *
609  * Parameters:  hist - [IN] the history storage interface                           *
610  *                                                                                  *
611  ************************************************************************************/
elastic_destroy(zbx_history_iface_t * hist)612 static void	elastic_destroy(zbx_history_iface_t *hist)
613 {
614 	zbx_elastic_data_t	*data = (zbx_elastic_data_t *)hist->data;
615 
616 	elastic_close(hist);
617 
618 	zbx_free(data->base_url);
619 	zbx_free(data);
620 }
621 
622 /************************************************************************************
623  *                                                                                  *
624  * Function: elastic_get_values                                                     *
625  *                                                                                  *
626  * Purpose: gets item history data from history storage                             *
627  *                                                                                  *
628  * Parameters:  hist    - [IN] the history storage interface                        *
629  *              itemid  - [IN] the itemid                                           *
630  *              start   - [IN] the period start timestamp                           *
631  *              count   - [IN] the number of values to read                         *
632  *              end     - [IN] the period end timestamp                             *
633  *              values  - [OUT] the item history data values                        *
634  *                                                                                  *
635  * Return value: SUCCEED - the history data were read successfully                  *
636  *               FAIL - otherwise                                                   *
637  *                                                                                  *
638  * Comments: This function reads <count> values from ]<start>,<end>] interval or    *
639  *           all values from the specified interval if count is zero.               *
640  *                                                                                  *
641  ************************************************************************************/
elastic_get_values(zbx_history_iface_t * hist,zbx_uint64_t itemid,int start,int count,int end,zbx_vector_history_record_t * values)642 static int	elastic_get_values(zbx_history_iface_t *hist, zbx_uint64_t itemid, int start, int count, int end,
643 		zbx_vector_history_record_t *values)
644 {
645 	zbx_elastic_data_t	*data = (zbx_elastic_data_t *)hist->data;
646 	size_t			url_alloc = 0, url_offset = 0, id_alloc = 0, scroll_alloc = 0, scroll_offset = 0;
647 	int			total, empty, ret;
648 	CURLcode		err;
649 	struct zbx_json		query;
650 	struct curl_slist	*curl_headers = NULL;
651 	char			*scroll_id = NULL, *scroll_query = NULL, errbuf[CURL_ERROR_SIZE];
652 	CURLoption		opt;
653 
654 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
655 
656 	ret = FAIL;
657 
658 	if (NULL == (data->handle = curl_easy_init()))
659 	{
660 		zabbix_log(LOG_LEVEL_ERR, "cannot initialize cURL session");
661 
662 		return FAIL;
663 	}
664 
665 	zbx_snprintf_alloc(&data->post_url, &url_alloc, &url_offset, "%s/%s*/_search?scroll=10s", data->base_url,
666 			value_type_str[hist->value_type]);
667 
668 	/* prepare the json query for elasticsearch, apply ranges if needed */
669 	zbx_json_init(&query, ZBX_JSON_ALLOCATE);
670 
671 	if (0 < count)
672 	{
673 		zbx_json_adduint64(&query, "size", count);
674 		zbx_json_addarray(&query, "sort");
675 		zbx_json_addobject(&query, NULL);
676 		zbx_json_addobject(&query, "clock");
677 		zbx_json_addstring(&query, "order", "desc", ZBX_JSON_TYPE_STRING);
678 		zbx_json_close(&query);
679 		zbx_json_close(&query);
680 		zbx_json_close(&query);
681 	}
682 
683 	zbx_json_addobject(&query, "query");
684 	zbx_json_addobject(&query, "bool");
685 	zbx_json_addarray(&query, "must");
686 	zbx_json_addobject(&query, NULL);
687 	zbx_json_addobject(&query, "match");
688 	zbx_json_adduint64(&query, "itemid", itemid);
689 	zbx_json_close(&query);
690 	zbx_json_close(&query);
691 	zbx_json_close(&query);
692 	zbx_json_addarray(&query, "filter");
693 	zbx_json_addobject(&query, NULL);
694 	zbx_json_addobject(&query, "range");
695 	zbx_json_addobject(&query, "clock");
696 
697 	if (0 < start)
698 		zbx_json_adduint64(&query, "gt", start);
699 
700 	if (0 < end)
701 		zbx_json_adduint64(&query, "lte", end);
702 
703 	zbx_json_close(&query);
704 	zbx_json_close(&query);
705 	zbx_json_close(&query);
706 	zbx_json_close(&query);
707 	zbx_json_close(&query);
708 	zbx_json_close(&query);
709 	zbx_json_close(&query);
710 
711 	curl_headers = curl_slist_append(curl_headers, "Content-Type: application/json");
712 
713 	if (CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_URL, data->post_url)) ||
714 			CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_POSTFIELDS, query.buffer)) ||
715 			CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_WRITEFUNCTION,
716 					curl_write_cb)) ||
717 			CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_WRITEDATA, &page_r)) ||
718 			CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_HTTPHEADER, curl_headers)) ||
719 			CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_FAILONERROR, 1L)) ||
720 			CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_ERRORBUFFER, errbuf)) ||
721 			CURLE_OK != (err = curl_easy_setopt(data->handle, opt = ZBX_CURLOPT_ACCEPT_ENCODING, "")))
722 	{
723 		zabbix_log(LOG_LEVEL_ERR, "cannot set cURL option %d: [%s]", (int)opt, curl_easy_strerror(err));
724 		goto out;
725 	}
726 
727 	zabbix_log(LOG_LEVEL_DEBUG, "sending query to %s; post data: %s", data->post_url, query.buffer);
728 
729 	page_r.offset = 0;
730 	*errbuf = '\0';
731 	if (CURLE_OK != (err = curl_easy_perform(data->handle)))
732 	{
733 		elastic_log_error(data->handle, err, errbuf);
734 		goto out;
735 	}
736 
737 	url_offset = 0;
738 	zbx_snprintf_alloc(&data->post_url, &url_alloc, &url_offset, "%s/_search/scroll", data->base_url);
739 
740 	if (CURLE_OK != (err = curl_easy_setopt(data->handle, CURLOPT_URL, data->post_url)))
741 	{
742 		zabbix_log(LOG_LEVEL_ERR, "cannot set cURL option %d: [%s]", (int)CURLOPT_URL,
743 				curl_easy_strerror(err));
744 		goto out;
745 	}
746 
747 	total = (0 == count ? -1 : count);
748 
749 	/* For processing the records, we need to keep track of the total requested and if the response from the */
750 	/* elasticsearch server is empty. For this we use two variables, empty and total. If the result is empty or */
751 	/* the total reach zero, we terminate the scrolling query and return what we currently have. */
752 	do
753 	{
754 		struct zbx_json_parse	jp, jp_values, jp_item, jp_sub, jp_hits, jp_source;
755 		zbx_history_record_t	hr;
756 		const char		*p = NULL;
757 
758 		empty = 1;
759 
760 		zabbix_log(LOG_LEVEL_DEBUG, "received from elasticsearch: %s", page_r.data);
761 
762 		zbx_json_open(page_r.data, &jp);
763 		zbx_json_brackets_open(jp.start, &jp_values);
764 
765 		/* get the scroll id immediately, for being used in subsequent queries */
766 		if (SUCCEED != zbx_json_value_by_name_dyn(&jp_values, "_scroll_id", &scroll_id, &id_alloc, NULL))
767 		{
768 			zabbix_log(LOG_LEVEL_WARNING, "elasticsearch version is not compatible with zabbix server. "
769 					"_scroll_id tag is absent");
770 		}
771 
772 		zbx_json_brackets_by_name(&jp_values, "hits", &jp_sub);
773 		zbx_json_brackets_by_name(&jp_sub, "hits", &jp_hits);
774 
775 		while (NULL != (p = zbx_json_next(&jp_hits, p)))
776 		{
777 			empty = 0;
778 
779 			if (SUCCEED != zbx_json_brackets_open(p, &jp_item))
780 				continue;
781 
782 			if (SUCCEED != zbx_json_brackets_by_name(&jp_item, "_source", &jp_source))
783 				continue;
784 
785 			if (SUCCEED != history_parse_value(&jp_source, hist->value_type, &hr))
786 				continue;
787 
788 			zbx_vector_history_record_append_ptr(values, &hr);
789 
790 			if (-1 != total)
791 				--total;
792 
793 			if (0 == total)
794 			{
795 				empty = 1;
796 				break;
797 			}
798 		}
799 
800 		if (1 == empty)
801 		{
802 			ret = SUCCEED;
803 			break;
804 		}
805 
806 		/* scroll to the next page */
807 		scroll_offset = 0;
808 		zbx_snprintf_alloc(&scroll_query, &scroll_alloc, &scroll_offset,
809 				"{\"scroll\":\"10s\",\"scroll_id\":\"%s\"}\n", ZBX_NULL2EMPTY_STR(scroll_id));
810 
811 		if (CURLE_OK != (err = curl_easy_setopt(data->handle, CURLOPT_POSTFIELDS, scroll_query)))
812 		{
813 			zabbix_log(LOG_LEVEL_ERR, "cannot set cURL option %d: [%s]", (int)CURLOPT_POSTFIELDS,
814 					curl_easy_strerror(err));
815 			break;
816 		}
817 
818 		page_r.offset = 0;
819 		*errbuf = '\0';
820 		if (CURLE_OK != (err = curl_easy_perform(data->handle)))
821 		{
822 			elastic_log_error(data->handle, err, errbuf);
823 			break;
824 		}
825 	}
826 	while (0 == empty);
827 
828 	/* as recommended by the elasticsearch documentation, we close the scroll search through a DELETE request */
829 	if (NULL != scroll_id)
830 	{
831 		url_offset = 0;
832 		zbx_snprintf_alloc(&data->post_url, &url_alloc, &url_offset, "%s/_search/scroll/%s", data->base_url,
833 				scroll_id);
834 
835 		if (CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_URL, data->post_url)) ||
836 				CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_POSTFIELDS, "")) ||
837 				CURLE_OK != (err = curl_easy_setopt(data->handle, opt = CURLOPT_CUSTOMREQUEST, "DELETE")))
838 		{
839 			zabbix_log(LOG_LEVEL_ERR, "cannot set cURL option %d: [%s]", (int)opt,
840 					curl_easy_strerror(err));
841 			ret = FAIL;
842 			goto out;
843 		}
844 
845 
846 		zabbix_log(LOG_LEVEL_DEBUG, "elasticsearch closing scroll %s", data->post_url);
847 
848 		page_r.offset = 0;
849 		*errbuf = '\0';
850 		if (CURLE_OK != (err = curl_easy_perform(data->handle)))
851 			elastic_log_error(data->handle, err, errbuf);
852 	}
853 
854 out:
855 	elastic_close(hist);
856 
857 	curl_slist_free_all(curl_headers);
858 
859 	zbx_json_free(&query);
860 
861 	zbx_free(scroll_id);
862 	zbx_free(scroll_query);
863 
864 	zbx_vector_history_record_sort(values, (zbx_compare_func_t)zbx_history_record_compare_desc_func);
865 
866 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
867 
868 	return ret;
869 }
870 
871 /************************************************************************************
872  *                                                                                  *
873  * Function: elastic_add_values                                                     *
874  *                                                                                  *
875  * Purpose: sends history data to the storage                                       *
876  *                                                                                  *
877  * Parameters:  hist    - [IN] the history storage interface                        *
878  *              history - [IN] the history data vector (may have mixed value types) *
879  *                                                                                  *
880  ************************************************************************************/
elastic_add_values(zbx_history_iface_t * hist,const zbx_vector_ptr_t * history)881 static int	elastic_add_values(zbx_history_iface_t *hist, const zbx_vector_ptr_t *history)
882 {
883 	zbx_elastic_data_t	*data = (zbx_elastic_data_t *)hist->data;
884 	int			i, num = 0;
885 	ZBX_DC_HISTORY		*h;
886 	struct zbx_json		json_idx, json;
887 	size_t			buf_alloc = 0, buf_offset = 0;
888 	char			pipeline[14]; /* index name length + suffix "-pipeline" */
889 
890 	zabbix_log(LOG_LEVEL_DEBUG, "In %s()", __func__);
891 
892 	zbx_json_init(&json_idx, ZBX_IDX_JSON_ALLOCATE);
893 
894 	zbx_json_addobject(&json_idx, "index");
895 	zbx_json_addstring(&json_idx, "_index", value_type_str[hist->value_type], ZBX_JSON_TYPE_STRING);
896 
897 	if (1 == CONFIG_HISTORY_STORAGE_PIPELINES)
898 	{
899 		zbx_snprintf(pipeline, sizeof(pipeline), "%s-pipeline", value_type_str[hist->value_type]);
900 		zbx_json_addstring(&json_idx, "pipeline", pipeline, ZBX_JSON_TYPE_STRING);
901 	}
902 
903 	zbx_json_close(&json_idx);
904 	zbx_json_close(&json_idx);
905 
906 	for (i = 0; i < history->values_num; i++)
907 	{
908 		h = (ZBX_DC_HISTORY *)history->values[i];
909 
910 		if (hist->value_type != h->value_type)
911 			continue;
912 
913 		zbx_json_init(&json, ZBX_JSON_ALLOCATE);
914 
915 		zbx_json_adduint64(&json, "itemid", h->itemid);
916 
917 		zbx_json_addstring(&json, "value", history_value2str(h), ZBX_JSON_TYPE_STRING);
918 
919 		if (ITEM_VALUE_TYPE_LOG == h->value_type)
920 		{
921 			const zbx_log_value_t	*log;
922 
923 			log = h->value.log;
924 
925 			zbx_json_adduint64(&json, "timestamp", log->timestamp);
926 			zbx_json_addstring(&json, "source", ZBX_NULL2EMPTY_STR(log->source), ZBX_JSON_TYPE_STRING);
927 			zbx_json_adduint64(&json, "severity", log->severity);
928 			zbx_json_adduint64(&json, "logeventid", log->logeventid);
929 		}
930 
931 		zbx_json_adduint64(&json, "clock", h->ts.sec);
932 		zbx_json_adduint64(&json, "ns", h->ts.ns);
933 		zbx_json_adduint64(&json, "ttl", h->ttl);
934 
935 		zbx_json_close(&json);
936 
937 		zbx_snprintf_alloc(&data->buf, &buf_alloc, &buf_offset, "%s\n%s\n", json_idx.buffer, json.buffer);
938 
939 		zbx_json_free(&json);
940 
941 		num++;
942 	}
943 
944 	if (num > 0)
945 	{
946 		data->post_url = zbx_dsprintf(NULL, "%s/_bulk?refresh=true", data->base_url);
947 		elastic_writer_add_iface(hist);
948 	}
949 
950 	zbx_json_free(&json_idx);
951 
952 	zabbix_log(LOG_LEVEL_DEBUG, "End of %s()", __func__);
953 
954 	return num;
955 }
956 
957 /************************************************************************************
958  *                                                                                  *
959  * Function: elastic_flush                                                          *
960  *                                                                                  *
961  * Purpose: flushes the history data to storage                                     *
962  *                                                                                  *
963  * Parameters:  hist    - [IN] the history storage interface                        *
964  *                                                                                  *
965  * Comments: This function will try to flush the data until it succeeds or          *
966  *           unrecoverable error occurs                                             *
967  *                                                                                  *
968  ************************************************************************************/
elastic_flush(zbx_history_iface_t * hist)969 static int	elastic_flush(zbx_history_iface_t *hist)
970 {
971 	ZBX_UNUSED(hist);
972 
973 	return elastic_writer_flush();
974 }
975 
976 /************************************************************************************
977  *                                                                                  *
978  * Function: zbx_history_elastic_init                                               *
979  *                                                                                  *
980  * Purpose: initializes history storage interface                                   *
981  *                                                                                  *
982  * Parameters:  hist       - [IN] the history storage interface                     *
983  *              value_type - [IN] the target value type                             *
984  *              error      - [OUT] the error message                                *
985  *                                                                                  *
986  * Return value: SUCCEED - the history storage interface was initialized            *
987  *               FAIL    - otherwise                                                *
988  *                                                                                  *
989  ************************************************************************************/
zbx_history_elastic_init(zbx_history_iface_t * hist,unsigned char value_type,char ** error)990 int	zbx_history_elastic_init(zbx_history_iface_t *hist, unsigned char value_type, char **error)
991 {
992 	zbx_elastic_data_t	*data;
993 
994 	if (0 != curl_global_init(CURL_GLOBAL_ALL))
995 	{
996 		*error = zbx_strdup(*error, "Cannot initialize cURL library");
997 		return FAIL;
998 	}
999 
1000 	data = (zbx_elastic_data_t *)zbx_malloc(NULL, sizeof(zbx_elastic_data_t));
1001 	memset(data, 0, sizeof(zbx_elastic_data_t));
1002 	data->base_url = zbx_strdup(NULL, CONFIG_HISTORY_STORAGE_URL);
1003 	zbx_rtrim(data->base_url, "/");
1004 	data->buf = NULL;
1005 	data->post_url = NULL;
1006 	data->handle = NULL;
1007 
1008 	hist->value_type = value_type;
1009 	hist->data = data;
1010 	hist->destroy = elastic_destroy;
1011 	hist->add_values = elastic_add_values;
1012 	hist->flush = elastic_flush;
1013 	hist->get_values = elastic_get_values;
1014 	hist->requires_trends = 0;
1015 
1016 	return SUCCEED;
1017 }
1018 
1019 #else
1020 
zbx_history_elastic_init(zbx_history_iface_t * hist,unsigned char value_type,char ** error)1021 int	zbx_history_elastic_init(zbx_history_iface_t *hist, unsigned char value_type, char **error)
1022 {
1023 	ZBX_UNUSED(hist);
1024 	ZBX_UNUSED(value_type);
1025 
1026 	*error = zbx_strdup(*error, "cURL library support >= 7.28.0 is required for Elasticsearch history backend");
1027 	return FAIL;
1028 }
1029 
1030 #endif
1031