<?php
/*
** Zabbix
** Copyright (C) 2001-2021 Zabbix SIA
**
** This program is free software; you can redistribute it and/or modify
** it under the terms of the GNU General Public License as published by
** the Free Software Foundation; either version 2 of the License, or
** (at your option) any later version.
**
** This program is distributed in the hope that it will be useful,
** but WITHOUT ANY WARRANTY; without even the implied warranty of
** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
** GNU General Public License for more details.
**
** You should have received a copy of the GNU General Public License
** along with this program; if not, write to the Free Software
** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
**/


/**
 * A helper class for working with Elasticsearch.
 */
class CElasticsearchHelper {

	/**
	 * Maximum number of documents requested per search or scroll page.
	 */
	const MAX_RESULT_WINDOW = 10000;

	/**
	 * Period passed as the "scroll" parameter, i.e. how long a scroll context is kept between requests.
	 */
	const KEEP_CONTEXT_PERIOD = '10s';

	/**
	 * Scroll ID taken from the most recent response that contained one.
	 *
	 * @var string|null
	 */
	private static $scroll_id;

	/**
	 * Scroll IDs seen during the current scroll query, stored as array keys so each is recorded only once.
	 *
	 * @var array
	 */
	private static $scrolls;

	/**
	 * Perform request to Elasticsearch.
	 *
	 * @param string $method    HTTP method to be used to perform request
	 * @param string $endpoint  requested url
	 * @param mixed  $request   data to be sent (JSON encoded before sending)
	 *
	 * @return string|false  response body or false if the request could not be performed
	 */
	private static function request($method, $endpoint, $request = null) {
		$time_start = microtime(true);
		$options = [
			'http' => [
				'header' => "Content-Type: application/json; charset=UTF-8",
				'method' => $method,
				'ignore_errors' => true // To get error messages from Elasticsearch.
			]
		];

		if ($request) {
			$request = json_encode($request);
			$options['http']['content'] = $request;
		}

		// Defined up front so that a caught exception does not leave the return value undefined.
		$result = false;

		try {
			$result = file_get_contents($endpoint, false, stream_context_create($options));
		}
		catch (Exception $e) {
			error($e->getMessage());
		}

		CProfiler::getInstance()->profileElasticsearch(microtime(true) - $time_start, $method, $endpoint, $request);

		return $result;
	}

	/**
	 * Get Elasticsearch endpoint for scroll API requests.
	 * Endpoint should be in following format: <Elasticsearch url>/<indices>/<values>/<action><query string>.
	 *
	 * @param string $endpoint  endpoint of the initial request
	 *
	 * @return string|null  scroll API url, or null if it cannot be derived from $endpoint
	 */
	private static function getScrollApiEndpoint($endpoint) {
		$url = $endpoint;

		// Strip the last two path segments ("<values>/<action>").
		for ($i = 0; $i < 2; $i++) {
			$pos = strrpos($url, '/');

			if ($pos === false) {
				// Endpoint is in different format, no way to get scroll API url.
				error(_s('Elasticsearch error: %1$s.',
					_('cannot perform Scroll API request, data could be truncated'))
				);

				return null;
			}

			$url = substr($url, 0, $pos);
		}

		return $url.'/_search/scroll';
	}

	/**
	 * Perform request(s) to Elasticsearch and parse the results.
	 *
	 * Array requests without "size", or with "size" above MAX_RESULT_WINDOW, are served through the Scroll API.
	 *
	 * @param string $method    HTTP method to be used to perform request
	 * @param string $endpoint  requested url
	 * @param mixed  $request   data to be sent
	 *
	 * @return array parsed result
	 */
	public static function query($method, $endpoint, $request = null) {
		$parse_as = ELASTICSEARCH_RESPONSE_PLAIN;

		// For non-search requests no additional parsing is done.
		if (substr($endpoint, -strlen('/_search')) === '/_search') {
			$parse_as = ELASTICSEARCH_RESPONSE_DOCUMENTS;

			// Requests with aggregations return only aggregations when size is 0, otherwise the raw response.
			if (is_array($request) && array_key_exists('aggs', $request)) {
				$parse_as = (array_key_exists('size', $request) && $request['size'] == 0)
					? ELASTICSEARCH_RESPONSE_AGGREGATION
					: ELASTICSEARCH_RESPONSE_PLAIN;
			}
		}

		if (is_array($request) && (!array_key_exists('size', $request) || $request['size'] > self::MAX_RESULT_WINDOW)) {
			// Scroll API should be used to retrieve all data.
			return self::scrollQuery($method, $endpoint, $request, $parse_as);
		}

		return self::parseResult(self::request($method, $endpoint, $request), $parse_as);
	}

	/**
	 * Retrieve data that does not fit into a single result window using the Scroll API.
	 *
	 * The initial request opens a scroll context. The context is then paged in MAX_RESULT_WINDOW sized slices until
	 * a non-full slice is received or the requested size is reached. Afterwards all scroll contexts are deleted.
	 *
	 * @param string $method    HTTP method to be used to perform the initial request
	 * @param string $endpoint  requested url
	 * @param array  $request   data to be sent; "size" (if present) is treated as the overall result limit
	 * @param int    $parse_as  result type
	 *
	 * @return array parsed result
	 */
	private static function scrollQuery($method, $endpoint, array $request, $parse_as) {
		$limit = array_key_exists('size', $request) ? $request['size'] : null;
		$request['size'] = self::MAX_RESULT_WINDOW;
		self::$scroll_id = null;
		self::$scrolls = [];

		$scroll_endpoint = self::getScrollApiEndpoint($endpoint);
		if ($scroll_endpoint !== null) {
			$endpoint .= '?scroll='.self::KEEP_CONTEXT_PERIOD;
		}

		$slice = self::parseResult(self::request($method, $endpoint, $request), $parse_as);

		// Slices are merged once at the end instead of re-copying the accumulated result on every page.
		$slices = [$slice];
		$total = count($slice);

		// Continue only while a scroll context exists, the last slice was full (more data may follow) and the limit
		// has not been reached yet.
		while ($scroll_endpoint !== null && self::$scroll_id !== null && count($slice) >= self::MAX_RESULT_WINDOW
				&& ($limit === null || $total < $limit)) {
			$slice = self::parseResult(self::request($method, $scroll_endpoint, [
				'scroll' => self::KEEP_CONTEXT_PERIOD,
				'scroll_id' => self::$scroll_id
			]), $parse_as);

			$slices[] = $slice;
			$total += count($slice);
		}

		$results = array_merge(...$slices);

		if ($limit !== null && count($results) > $limit) {
			$results = array_slice($results, 0, $limit);
		}

		// Scrolls should be deleted when they are not required anymore.
		if ($scroll_endpoint !== null && count(self::$scrolls) > 0) {
			self::request('DELETE', $scroll_endpoint, ['scroll_id' => array_keys(self::$scrolls)]);
		}

		return $results;
	}

	/**
	 * Parse result based on request data.
	 *
	 * Reports an error and returns an empty array when the response is not valid JSON or contains an "error" key.
	 * Records any "_scroll_id" found in the response.
	 *
	 * @param string|false $data      result as a string
	 * @param int          $parse_as  result type
	 *
	 * @return array parsed result
	 */
	private static function parseResult($data, $parse_as) {
		$result = json_decode($data, true);
		if (!is_array($result)) {
			error(_s('Elasticsearch error: %1$s.', _('failed to parse JSON')));

			return [];
		}

		if (array_key_exists('error', $result)) {
			$error = (is_array($result['error']) && array_key_exists('reason', $result['error']))
				? $result['error']['reason']
				: _('Unknown error');

			error(_s('Elasticsearch error: %1$s.', $error));

			return [];
		}

		if (array_key_exists('_scroll_id', $result)) {
			self::$scroll_id = $result['_scroll_id'];
			self::$scrolls[self::$scroll_id] = true;
		}

		switch ($parse_as) {
			// Return aggregations only.
			case ELASTICSEARCH_RESPONSE_AGGREGATION:
				if (array_key_exists('aggregations', $result) && is_array($result['aggregations'])) {
					return $result['aggregations'];
				}
				break;

			// Return documents only.
			case ELASTICSEARCH_RESPONSE_DOCUMENTS:
				// Guard the nested lookups so malformed responses do not raise type errors.
				if (array_key_exists('hits', $result) && is_array($result['hits'])
						&& array_key_exists('hits', $result['hits']) && is_array($result['hits']['hits'])) {
					$values = [];

					foreach ($result['hits']['hits'] as $row) {
						if (is_array($row) && array_key_exists('_source', $row)) {
							$values[] = $row['_source'];
						}
					}

					return $values;
				}
				break;

			// Return result "as is".
			case ELASTICSEARCH_RESPONSE_PLAIN:
				return $result;
		}

		return [];
	}

	/**
	 * Add filters to Elasticsearch query.
	 *
	 * @param array $schema   DB schema
	 * @param array $query    Elasticsearch query.
	 * @param array $options  Filtering options.
	 *
	 * @return array Elasticsearch query with added filtering
	 */
	public static function addFilter($schema, $query, $options) {
		foreach ($options['filter'] as $field => $value) {
			// Skip missing fields, textual fields (different mapping is needed for exact matching) and empty values.
			if ($value === null || !array_key_exists($field, $schema['fields'])) {
				continue;
			}

			$field_type = $schema['fields'][$field]['type'];
			if (in_array($field_type, [DB::FIELD_TYPE_TEXT, DB::FIELD_TYPE_NCLOB, DB::FIELD_TYPE_CHAR], true)) {
				continue;
			}

			// "terms" needs a JSON array: wrap scalars and drop keys so sparse arrays are not encoded as objects.
			$query['query']['bool'][$options['searchByAny'] ? 'should' : 'must'][] = [
				'terms' => [
					$field => array_values((array) $value)
				]
			];

			if ($options['searchByAny']) {
				// minimum_should_match is a parameter of the bool query, not of the top-level search request.
				$query['query']['bool']['minimum_should_match'] = 1;
			}
		}

		return $query;
	}

	/**
	 * Add search criteria to Elasticsearch query.
	 *
	 * @param array $schema   DB schema
	 * @param array $query    Elasticsearch query
	 * @param array $options  search options
	 *
	 * @return array Elasticsearch query with added search criteria
	 */
	public static function addSearch($schema, $query, $options) {
		$start = $options['startSearch'] ? '' : '*';

		foreach ($options['search'] as $field => $value) {
			// Skip missing fields, non textual fields and empty values.
			if ($value === null || $value === '' || !array_key_exists($field, $schema['fields'])) {
				continue;
			}

			$field_type = $schema['fields'][$field]['type'];
			if (!in_array($field_type, [DB::FIELD_TYPE_TEXT, DB::FIELD_TYPE_NCLOB, DB::FIELD_TYPE_CHAR], true)) {
				continue;
			}

			// A single search string is treated as a list with one phrase.
			foreach ((array) $value as $phrase) {
				// Backslash is the wildcard escape character, so it is escaped first to be matched literally.
				$phrase = str_replace(['\\', '?'], ['\\\\', '\\?'], $phrase);

				if ($options['searchWildcardsEnabled']) {
					$pattern = $phrase;
				}
				else {
					$pattern = $start.str_replace('*', '\\*', $phrase).'*';
				}

				$criteria = [
					'wildcard' => [
						$field => $pattern
					]
				];

				if ($options['excludeSearch']) {
					if ($options['searchByAny']) {
						$query['query']['bool']['must_not']['bool']['should'][] = $criteria;
					}
					else {
						$query['query']['bool']['must_not'][] = $criteria;
					}
				}
				elseif ($options['searchByAny']) {
					$query['query']['bool']['should'][] = $criteria;
					// minimum_should_match is a parameter of the bool query, not of the top-level search request.
					$query['query']['bool']['minimum_should_match'] = 1;
				}
				else {
					$query['query']['bool']['must'][] = $criteria;
				}
			}
		}

		return $query;
	}

	/**
	 * Add sorting criteria to Elasticsearch query.
	 *
	 * Every criterion is appended to the "sort" list, either as a plain field name (ascending) or as a
	 * [field => order] pair (descending). This keeps "sort" a JSON array when directions are mixed.
	 *
	 * @param array $query    Elasticsearch query.
	 * @param array $options  Sorting options.
	 *
	 * @return array Elasticsearch query with added sorting options
	 */
	public static function addSort($query, $options) {
		foreach ($options['sortfield'] as $i => $sortfield) {
			// Add sort field to order.
			if (is_array($options['sortorder'])) {
				$sortorder = array_key_exists($i, $options['sortorder']) ? $options['sortorder'][$i] : ZBX_SORT_UP;
			}
			else {
				$sortorder = ($options['sortorder'] !== '') ? $options['sortorder'] : ZBX_SORT_UP;
			}

			$query['sort'][] = ($sortorder === ZBX_SORT_DOWN) ? [$sortfield => $sortorder] : $sortfield;
		}

		return $query;
	}
}