1<?php
2/*
3** Zabbix
4** Copyright (C) 2001-2021 Zabbix SIA
5**
6** This program is free software; you can redistribute it and/or modify
7** it under the terms of the GNU General Public License as published by
8** the Free Software Foundation; either version 2 of the License, or
9** (at your option) any later version.
10**
11** This program is distributed in the hope that it will be useful,
12** but WITHOUT ANY WARRANTY; without even the implied warranty of
13** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14** GNU General Public License for more details.
15**
16** You should have received a copy of the GNU General Public License
17** along with this program; if not, write to the Free Software
18** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
19**/
20
21
/**
 * A helper class for working with Elasticsearch.
 *
 * Provides a single entry point for performing requests (transparently switching to the Scroll API when the
 * requested amount of data does not fit in one result window) and a set of builders that add filtering,
 * searching and sorting criteria to a request body.
 */
class CElasticsearchHelper {

	// Number of documents requested per single request; bigger (or unlimited) requests are scrolled.
	const MAX_RESULT_WINDOW = 10000;

	// Value passed as "scroll" parameter of the Scroll API requests.
	const KEEP_CONTEXT_PERIOD = '10s';

	// Scroll ID returned by the most recently parsed response.
	private static $scroll_id;

	// Scroll IDs collected while processing the current query (stored as array keys).
	private static $scrolls;

	/**
	 * Perform request to Elasticsearch.
	 *
	 * @param string $method      HTTP method to be used to perform request
	 * @param string $endpoint    requested url
	 * @param mixed  $request     data to be sent (JSON encoded before sending, skipped if empty)
	 *
	 * @return string|false    raw response body or false if the request has failed
	 */
	private static function request($method, $endpoint, $request = null) {
		$time_start = microtime(true);
		$options = [
			'http' => [
				'header'  => "Content-Type: application/json; charset=UTF-8",
				'method'  => $method,
				'ignore_errors' => true // To get error messages from Elasticsearch.
			]
		];

		if ($request) {
			$request = json_encode($request);
			$options['http']['content'] = $request;
		}

		// Initialized up front so that a defined value is returned even if the request throws.
		$result = false;

		try {
			$result = file_get_contents($endpoint, false, stream_context_create($options));
		}
		catch (Exception $e) {
			error($e->getMessage());
		}

		CProfiler::getInstance()->profileElasticsearch(microtime(true) - $time_start, $method, $endpoint, $request);

		return $result;
	}

	/**
	 * Get Elasticsearch endpoint for scroll API requests.
	 * Endpoint should be in following format: <Elasticsearch url>/<indices>/<values>/<action><query string>.
	 *
	 * @param string $endpoint    endpoint of the initial request
	 *
	 * @return string|null    scroll API url or null if it cannot be derived from the endpoint
	 */
	private static function getScrollApiEndpoint($endpoint) {
		$url = $endpoint;

		// Strip the two last path segments of the endpoint.
		for ($i = 0; $i < 2; $i++) {
			if (($pos = strrpos($url, '/')) !== false) {
				$url = substr($url, 0, $pos);
			}
			else {
				// Endpoint is in different format, no way to get scroll API url.
				error(_s('Elasticsearch error: %1$s.',
						_('cannot perform Scroll API request, data could be truncated'))
				);

				return null;
			}
		}

		return $url.'/_search/scroll';
	}

	/**
	 * Perform request(s) to Elasticsearch and parse the results.
	 *
	 * If request is an array that has no "size" set or the "size" exceeds MAX_RESULT_WINDOW, data is retrieved in
	 * slices of MAX_RESULT_WINDOW entries using the Scroll API and the slices are merged together.
	 *
	 * @param string $method      HTTP method to be used to perform request
	 * @param string $endpoint    requested url
	 * @param mixed  $request     data to be sent
	 *
	 * @return array    parsed result
	 */
	public static function query($method, $endpoint, $request = null) {
		$parse_as = ELASTICSEARCH_RESPONSE_PLAIN;

		// For non-search requests no additional parsing is done.
		if (substr($endpoint, -strlen('/_search')) === '/_search') {
			$parse_as = ELASTICSEARCH_RESPONSE_DOCUMENTS;

			if (is_array($request) && array_key_exists('aggs', $request)) {
				// Only aggregations are returned if no documents are requested, otherwise response is kept "as is".
				$parse_as = (array_key_exists('size', $request) && $request['size'] == 0)
						? ELASTICSEARCH_RESPONSE_AGGREGATION : ELASTICSEARCH_RESPONSE_PLAIN;
			}
		}

		if (is_array($request) && (!array_key_exists('size', $request) || $request['size'] > self::MAX_RESULT_WINDOW)) {
			// Scroll API should be used to retrieve all data.
			$results = [];
			$limit = array_key_exists('size', $request) ? $request['size'] : null;
			$request['size'] = self::MAX_RESULT_WINDOW;
			self::$scroll_id = null;
			self::$scrolls = [];

			// Scrolling is requested only if the scroll API url is known, otherwise only the first slice is returned.
			$scroll_endpoint = self::getScrollApiEndpoint($endpoint);
			if ($scroll_endpoint !== null) {
				$endpoint .= '?scroll='.self::KEEP_CONTEXT_PERIOD;
			}

			$slice = self::parseResult(self::request($method, $endpoint, $request), $parse_as);
			$results = array_merge($results, $slice);

			if (self::$scroll_id === null) {
				$slice = null; // Reset slice if there is no scroll_id.
			}

			// All the following requests (including scroll removal) are sent to the scroll API url.
			$endpoint = $scroll_endpoint;

			while ($slice) {
				if (count($slice) < self::MAX_RESULT_WINDOW) {
					// No need to continue as there are no more data.
					break;
				}

				$scroll = [
					'scroll' => self::KEEP_CONTEXT_PERIOD,
					'scroll_id' => self::$scroll_id
				];

				$slice = self::parseResult(self::request($method, $endpoint, $scroll), $parse_as);
				$results = array_merge($results, $slice);

				if ($limit !== null && count($results) >= $limit) {
					$results = array_slice($results, 0, $limit);

					// No need to perform additional queries as limit is reached.
					break;
				}
			}

			// Scrolls should be deleted when they are not required anymore.
			if (count(self::$scrolls) > 0) {
				self::request('DELETE', $endpoint, ['scroll_id' => array_keys(self::$scrolls)]);
			}

			return $results;
		}

		return self::parseResult(self::request($method, $endpoint, $request), $parse_as);
	}

	/**
	 * Parse result based on request data.
	 *
	 * An error is registered and an empty array is returned if the response is not a valid JSON or if it contains
	 * an "error" entry. Scroll ID (if present in the response) is remembered for the following scroll requests.
	 *
	 * @param string $data        result as a string
	 * @param int    $parse_as    result type
	 *
	 * @return array    parsed result
	 */
	private static function parseResult($data, $parse_as) {
		$result = json_decode($data, true);
		if (!is_array($result)) {
			error(_s('Elasticsearch error: %1$s.', _('failed to parse JSON')));

			return [];
		}

		if (array_key_exists('error', $result)) {
			$error = (is_array($result['error']) && array_key_exists('reason', $result['error']))
					? $result['error']['reason']
					: _('Unknown error');

			error(_s('Elasticsearch error: %1$s.', $error));

			return [];
		}

		if (array_key_exists('_scroll_id', $result)) {
			self::$scroll_id = $result['_scroll_id'];
			self::$scrolls[self::$scroll_id] = true;
		}

		switch ($parse_as) {
			// Return aggregations only.
			case ELASTICSEARCH_RESPONSE_AGGREGATION:
				if (array_key_exists('aggregations', $result) && is_array($result['aggregations'])) {
					return $result['aggregations'];
				}
				break;

			// Return documents only.
			case ELASTICSEARCH_RESPONSE_DOCUMENTS:
				// Structure is validated on each level as malformed responses should not cause type errors.
				if (array_key_exists('hits', $result) && is_array($result['hits'])
						&& array_key_exists('hits', $result['hits']) && is_array($result['hits']['hits'])) {
					$values = [];

					foreach ($result['hits']['hits'] as $row) {
						if (!is_array($row) || !array_key_exists('_source', $row)) {
							continue;
						}

						$values[] = $row['_source'];
					}

					return $values;
				}
				break;

			// Return result "as is".
			case ELASTICSEARCH_RESPONSE_PLAIN:
				return $result;
		}

		return [];
	}

	/**
	 * Add filters to Elasticsearch query.
	 *
	 * @param array $schema   DB schema
	 * @param array $query    Elasticsearch query.
	 * @param array $options  Filtering options ("filter" and "searchByAny" are used).
	 *
	 * @return array    Elasticsearch query with added filtering
	 */
	public static function addFilter($schema, $query, $options) {
		$type = $options['searchByAny'] ? 'should' : 'must';

		foreach ($options['filter'] as $field => $value) {
			// Skip missing fields, textual fields (different mapping is needed for exact matching) and empty values.
			if ($value === null || !array_key_exists($field, $schema['fields'])) {
				continue;
			}

			$field_type = $schema['fields'][$field]['type'];
			if (in_array($field_type, [DB::FIELD_TYPE_TEXT, DB::FIELD_TYPE_NCLOB, DB::FIELD_TYPE_CHAR])) {
				continue;
			}

			if ($options['searchByAny']) {
				/*
				 * NOTE(review): "minimum_should_match" is set on the top level of the request and not inside of the
				 * "bool" query - confirm that this is what Elasticsearch expects.
				 */
				$query['minimum_should_match'] = 1;
			}

			$query['query']['bool'][$type][] = [
				'terms' => [
					// A list of values is passed to "terms" query, therefore single value is wrapped into an array.
					$field => is_array($value) ? $value : [$value]
				]
			];
		}

		return $query;
	}

	/**
	 * Add search criteria to Elasticsearch query.
	 *
	 * @param array $schema     DB schema
	 * @param array $query      Elasticsearch query
	 * @param array $options    search options ("search", "startSearch", "excludeSearch", "searchByAny" and
	 *                          "searchWildcardsEnabled" are used)
	 *
	 * @return array    Elasticsearch query with added search criteria
	 */
	public static function addSearch($schema, $query, $options) {
		$start = $options['startSearch'] ? '' : '*';
		$exclude = $options['excludeSearch'] ? 'must_not' : 'must';

		if ($options['searchByAny']) {
			if (!$options['excludeSearch']) {
				$exclude = 'should';
			}

			/*
			 * NOTE(review): "minimum_should_match" is set on the top level of the request and not inside of the
			 * "bool" query - confirm that this is what Elasticsearch expects.
			 */
			$query['minimum_should_match'] = 1;
		}

		foreach ($options['search'] as $field => $value) {
			// Skip missing fields, non textual fields and empty values.
			if ($value === null || !array_key_exists($field, $schema['fields'])) {
				continue;
			}

			$field_type = $schema['fields'][$field]['type'];
			if (!in_array($field_type, [DB::FIELD_TYPE_TEXT, DB::FIELD_TYPE_NCLOB, DB::FIELD_TYPE_CHAR])) {
				continue;
			}

			// Single phrase is processed the same way as a list of phrases.
			$phrases = is_array($value) ? $value : [$value];

			foreach ($phrases as $phrase) {
				// Question mark is always matched literally.
				$phrase = str_replace('?', '\\?', $phrase);

				if (!$options['searchWildcardsEnabled']) {
					// Asterisk is matched literally, wildcards are added by the helper itself.
					$phrase = str_replace('*', '\\*', $phrase);
					$criteria = [
						'wildcard' => [
							$field => $start.$phrase.'*'
						]
					];
				}
				else {
					$criteria = [
						'wildcard' => [
							$field => $phrase
						]
					];
				}

				if ($options['excludeSearch'] && $options['searchByAny']) {
					$query['query']['bool']['must_not']['bool']['should'][] = $criteria;
				}
				else {
					$query['query']['bool'][$exclude][] = $criteria;
				}
			}
		}

		return $query;
	}

	/**
	 * Add sorting criteria to Elasticsearch query.
	 *
	 * Every sort field is added as a separate entry of the "sort" list: fields sorted in descending order are added
	 * as [<field> => <order>] pairs, other fields are added by name only. Keeping "sort" a list guarantees that it
	 * is encoded as JSON array even if different sort orders are mixed.
	 *
	 * @param array $query    Elasticsearch query.
	 * @param array $options  Sorting options ("sortfield" and "sortorder" are used).
	 *
	 * @return array    Elasticsearch query with added sorting options
	 */
	public static function addSort($query, $options) {
		$sortfields = is_array($options['sortfield']) ? $options['sortfield'] : [$options['sortfield']];

		foreach ($sortfields as $i => $sortfield) {
			// Add sort field to order.
			if (is_array($options['sortorder'])) {
				$sortorder = array_key_exists($i, $options['sortorder']) ? $options['sortorder'][$i] : ZBX_SORT_UP;
			}
			else {
				$sortorder = ($options['sortorder'] !== '') ? $options['sortorder'] : ZBX_SORT_UP;
			}

			if ($sortorder === ZBX_SORT_DOWN) {
				$query['sort'][] = [$sortfield => $sortorder];
			}
			else {
				$query['sort'][] = $sortfield;
			}
		}

		return $query;
	}
}
369