<?php
// (c) Copyright by authors of the Tiki Wiki CMS Groupware Project
//
// All Rights Reserved. See copyright.txt for details and a complete list of authors.
// Licensed under the GNU LESSER GENERAL PUBLIC LICENSE. See license.txt for details.
// $Id$

/**
 * HTTP connection to an Elasticsearch node.
 *
 * Wraps the REST calls used for indexing, searching, percolation and alias
 * management, and adapts request paths and bodies to the server version
 * reported by the node. Writes may be buffered through a bulk operation
 * (see startBulk()); indices written to are tracked as "dirty" and refreshed
 * before they are read.
 */
class Search_Elastic_Connection
{
    /** @var string Base URL of the node, without trailing slash */
    private $dsn;

    /** @var float|int|null Cached server version (major.minor), null until first looked up */
    private $version;

    /** @var string Mapping type name used in document and mapping URLs ('_doc' or 'doc') */
    private $mapping_type;

    /** @var array Index name => bool, true when written to since the last refresh */
    private $dirty = [];

    /** @var array Index name => true for indices already created through mapping() */
    private $indices = [];

    /** @var Search_Elastic_BulkOperation|null Active bulk buffer, if startBulk() was called */
    private $bulk;

    /**
     * @param string $dsn Base URL of the Elasticsearch node; trailing slashes are stripped
     */
    public function __construct($dsn)
    {
        $this->dsn = rtrim($dsn, '/');
        $this->version = null;
        if ($this->getVersion() >= 6.2) {
            $this->mapping_type = '_doc'; // compatible with 7+ but not supported before 6.2
        } else {
            $this->mapping_type = 'doc';
        }
    }

    /**
     * Flushes any pending bulk operations when the connection goes away.
     */
    public function __destruct()
    {
        try {
            $this->flush();
        } catch (Search_Elastic_TransportException $e) {
            // Intentionally ignored: nothing useful can be done about a
            // transport failure while the object is being destroyed.
        }
    }

    /**
     * Switches the connection to bulk mode: subsequent index/unindex calls are
     * buffered and sent through postBulk().
     *
     * @param int $size Size passed to the bulk operation buffer
     */
    public function startBulk($size = 500)
    {
        $this->bulk = new Search_Elastic_BulkOperation(
            $size,
            function ($data) {
                $this->postBulk($data);
            },
            ($this->getVersion() >= 7.0 ? '' : $this->mapping_type)
        );
    }

    /**
     * Queries the root endpoint of the node.
     *
     * Failures are reported through Feedback::error() rather than thrown.
     *
     * @return object Decoded response with at least the properties "ok" (bool)
     *                and "status" (int); "ok" is false and "status" is 0 when
     *                the node could not be queried
     */
    public function getStatus()
    {
        try {
            $result = $this->get('/');

            if (! is_object($result)) {
                // Empty or non-object body: treat the node as unavailable
                // instead of dereferencing a non-object below.
                return (object) [
                    'ok' => false,
                    'status' => 0,
                ];
            }

            if (isset($result->version)) { // elastic v2
                $result->ok = true;
                $result->status = 200;
            } elseif (! isset($result->ok)) {
                $result->ok = isset($result->status) && $result->status === 200;
            }

            return $result;
        } catch (Exception $e) {
            Feedback::error($e->getMessage());
            return (object) [
                'ok' => false,
                'status' => 0,
            ];
        }
    }

    /**
     * Gets the Elasticsearch version as a number, e.g. 2.2 for "2.2.0".
     *
     * The version string is cast to float, so only "major.minor" is kept.
     * NOTE(review): a minor version of 10 or above would compare incorrectly
     * (e.g. "6.10.0" becomes 6.1); kept as is because callers compare against
     * float literals.
     *
     * @return float|int 0 when the version could not be determined
     */
    public function getVersion()
    {
        if ($this->version === null) {
            $status = $this->getStatus();

            if (! empty($status->version->number)) {
                $this->version = (float) $status->version->number;
            } else {
                $this->version = 0;
            }
        }

        return $this->version;
    }

    /**
     * Fetches status/statistics for one index, or for all indices.
     *
     * @param string $index Index name, or '' for all indices
     * @return object|null Decoded response, or null when the request failed
     *                     (errors other than a missing index are reported
     *                     through Feedback::error())
     */
    public function getIndexStatus($index = '')
    {
        $index = $index ? '/' . $index : '';
        try {
            if ($this->getVersion() < 2) {
                return $this->get("$index/_status");
            } else {
                return $this->get("$index/_stats"); // v2 "Indices Stats" API result
            }
        } catch (Exception $e) {
            $message = $e->getMessage();

            if (strpos($message, '[_status]') === false && strpos($message, 'no such index') === false) { // another error
                Feedback::error($message . ' for index ' . $index);
            }
        }

        return null;
    }

    /**
     * Deletes an index after flushing pending bulk operations.
     *
     * @param string $index
     * @return mixed Decoded response, or null when the server answered 404
     * @throws Search_Elastic_Exception For errors other than 404
     */
    public function deleteIndex($index)
    {
        $this->flush();

        try {
            unset($this->indices[$index]);
            return $this->delete("/$index");
        } catch (Search_Elastic_Exception $e) {
            if ($e->getCode() !== 404) {
                throw $e;
            }
        }

        return null;
    }

    /**
     * Runs a search, or a batch of searches, against one or several indices.
     *
     * Dirty indices are refreshed first. Single queries are validated before
     * being sent; multisearch batches are not.
     *
     * @param string|array $index Index name or list of index names
     * @param array $query Query body, or a list of query bodies when $multisearch is true
     * @param array $args Extra query-string arguments
     * @param bool $multisearch Use the _msearch endpoint
     * @return mixed Decoded response; for multisearch, the list of responses
     */
    public function search($index, array $query, array $args = [], $multisearch = false)
    {
        $indices = (array) $index;
        foreach ($indices as $index) {
            if (! empty($this->dirty[$index])) {
                $this->refresh($index);
            }
            if (! $multisearch) {
                // The purpose of Multisearch is to reduce connections to ES and so skip unless ES have Multivalidate
                $this->validate($index, $query);
            }
        }

        $index = implode(',', $indices);
        $queryString = http_build_query($args, '', '&');

        if ($multisearch) {
            // Process an array of queries for Elasticsearch Multisearch
            $queries = '';
            foreach ($query as $q) {
                $queries .= "{}\n"; // use the search index set in URL
                $queries .= json_encode($q) . "\n";
            }

            $ret = $this->post("/$index/_msearch?" . $queryString, $queries);
            return $ret->responses; // return the array of responses for each query
        }

        return $this->post("/$index/_search?" . $queryString, json_encode($query));
    }

    /**
     * Asks the server to validate the "query" part of a search body.
     *
     * @param string $index
     * @param array $query Search body; only $query['query'] is sent
     * @throws Search_Elastic_QueryParsingException When the server reports the query as invalid
     */
    public function validate($index, array $query)
    {
        $result = $this->post("/$index/_validate/query?explain=true", json_encode(['query' => $query['query']]));
        if (isset($result->valid) && $result->valid === false) {
            if (! empty($result->explanations)) {
                foreach ($result->explanations as $explanation) {
                    if ($explanation->valid === false) {
                        throw new Search_Elastic_QueryParsingException($explanation->error);
                    }
                }
            }
            if (! empty($result->error)) {
                throw new Search_Elastic_QueryParsingException($result->error);
            }
        }
    }

    /**
     * Fetches the next page of a scrolling search.
     *
     * Before version 5 the scroll id is sent as the request body; from
     * version 5 it is sent as the "scroll_id" query-string argument.
     *
     * @param string $scrollId
     * @param array $args Extra query-string arguments
     * @return mixed Decoded response
     */
    public function scroll($scrollId, array $args = [])
    {
        if ($this->getVersion() < 5.0) {
            return $this->post('/_search/scroll?' . http_build_query($args, '', '&'), $scrollId);
        } else {
            $args['scroll_id'] = $scrollId;
            return $this->post('/_search/scroll?' . http_build_query($args, '', '&'), '');
        }
    }

    /**
     * Stores a percolator query under the given name.
     *
     * @param string $index
     * @param string $name
     * @param mixed $query Query body, JSON-encoded when sent
     * @return mixed Decoded response, or null when buffered in bulk mode
     */
    public function storeQuery($index, $name, $query)
    {
        $type = $this->getVersion() >= 5 ? 'percolator' : '.percolator';

        return $this->rawIndex($index, $type, $name, $query);
    }

    /**
     * Removes a percolator query previously stored with storeQuery().
     *
     * @param string $index
     * @param string $name
     * @return mixed Decoded response
     */
    public function unstoreQuery($index, $name)
    {
        // Encode the name the same way rawIndex() does when storing it, so
        // that names containing reserved URL characters address the same document.
        $name = rawurlencode($name);

        if ($this->getVersion() >= 5) {
            return $this->delete("/$index/{$this->mapping_type}/percolator-$name");
        } else {
            return $this->delete("/$index/.percolator/$name");
        }
    }

    /**
     * Finds the stored queries matching a document.
     *
     * @param string $index
     * @param string $type Document type (simplified to lowercase letters only)
     * @param mixed $document Document to match
     * @return array Names of the matching stored queries
     */
    public function percolate($index, $type, $document)
    {
        if (! empty($this->dirty['_percolator'])) {
            $this->refresh('_percolator');
        }

        $type = $this->simplifyType($type);
        $version = $this->getVersion();

        if ($version < 5) {
            // Dedicated _percolate endpoint
            $result = $this->get("/$index/$type/_percolate", json_encode([
                'doc' => $document,
            ]));
            return array_map(function ($item) {
                return $item->_id;
            }, $result->matches);
        }

        // From version 5 percolation is a regular search using a "percolate"
        // query; "document_type" is only sent to 5.x servers.
        $percolate = ['field' => 'query'];
        if ($version < 6) {
            $percolate['document_type'] = $type;
        }
        $percolate['document'] = $document;

        $result = $this->search($index, [
            'query' => [
                'percolate' => $percolate,
            ],
        ]);

        return array_map(function ($item) {
            // Stored queries are indexed with ids prefixed by "percolator-"
            return preg_replace('/^percolator-/', '', $item->_id);
        }, $result->hits->hits);
    }

    /**
     * Indexes a document (directly, or through the bulk buffer when active).
     *
     * @param string $index
     * @param string $type Document type (simplified to lowercase letters only)
     * @param string $id
     * @param array $data Document body
     */
    public function index($index, $type, $id, array $data)
    {
        $type = $this->simplifyType($type);

        $this->rawIndex($index, $type, $id, $data);
    }

    /**
     * Points an alias at a new index and deletes the indices it replaces.
     *
     * Indices currently holding the alias, and indices named "<alias>_*" other
     * than the target, are deleted once the alias has been moved.
     *
     * @param string $alias
     * @param string $targetIndex
     */
    public function assignAlias($alias, $targetIndex)
    {
        $this->flush();

        $active = [];
        $toRemove = [];
        $current = $this->rawApi('/_aliases');
        foreach ($current as $indexName => $info) {
            if (isset($info->aliases->$alias)) {
                $active[] = $indexName;
                $toRemove[] = $indexName;
            } elseif (0 === strpos($indexName, $alias . '_') && $indexName != $targetIndex) {
                $toRemove[] = $indexName;
            }
        }
        $actions = [
            ['add' => ['index' => $targetIndex, 'alias' => $alias]],
        ];

        foreach ($active as $index) {
            $actions[] = ['remove' => ['index' => $index, 'alias' => $alias]];
        }

        // Before assigning new alias, check there is not already an index matching alias name.
        if (isset($current->$alias) && ! $this->aliasExists($alias)) {
            $this->deleteIndex($alias);
        }

        $this->post('/_aliases', json_encode([
            'actions' => $actions,
        ]));

        // Make sure the new index is fully active, then clean-up
        $this->refresh($alias);

        foreach ($toRemove as $old) {
            $this->deleteIndex($old);
        }
    }

    /**
     * Tells whether an index named "<aliasName>_*" exists without any alias,
     * which is taken to mean a rebuild is in progress.
     *
     * @param string $aliasName
     * @return bool
     */
    public function isRebuilding($aliasName)
    {
        try {
            $current = $this->rawApi('/_aliases');
        } catch (Search_Elastic_Exception $e) {
            $current = [];
        }
        foreach ($current as $indexName => $info) {
            $hasAlias = isset($info->aliases) && count((array) $info->aliases) > 0;
            if (0 === strpos($indexName, $aliasName . '_') && ! $hasAlias) {
                // Matching name, no alias, means currently rebuilding
                return true;
            }
        }

        return false;
    }

    /**
     * Resolves an alias to the first index name the server returns for it.
     *
     * @param string $aliasOrIndexName
     * @return string The resolved index name, or the input when nothing was returned
     */
    public function resolveAlias($aliasOrIndexName)
    {
        try {
            $current = $this->rawApi('/' . $aliasOrIndexName);
        } catch (Search_Elastic_Exception $e) {
            $current = [];
        }
        foreach ($current as $indexName => $_) {
            return $indexName;
        }
        return $aliasOrIndexName;
    }

    /**
     * Writes a document without simplifying its type, marking the index dirty.
     *
     * @param string $index
     * @param string $type
     * @param string $id
     * @param mixed $data Document body, JSON-encoded when sent
     * @return mixed Decoded response, or null when buffered in bulk mode
     */
    private function rawIndex($index, $type, $id, $data)
    {
        $this->dirty[$index] = true;

        if ($this->bulk) {
            $this->bulk->index($index, $type, $id, $data);
        } else {
            $id = rawurlencode($id);
            if ($type === '.percolator') {
                return $this->put("/$index/$type/$id", json_encode($data));
            } else {
                // Single mapping type: the document type is folded into the id
                return $this->put("/$index/{$this->mapping_type}/$type-$id", json_encode($data));
            }
        }
    }

    /**
     * Removes a document from the index (directly, or through the bulk buffer).
     *
     * @param string $index
     * @param string $type Document type (simplified to lowercase letters only)
     * @param string $id
     * @return mixed Decoded response, or null when buffered in bulk mode
     */
    public function unindex($index, $type, $id)
    {
        $this->dirty[$index] = true;
        $type = $this->simplifyType($type);

        if ($this->bulk) {
            $this->bulk->unindex($index, $type, $id);
        } else {
            $id = rawurlencode($id);
            if ($type === '.percolator') {
                return $this->delete("/$index/$type/$id");
            } else {
                return $this->delete("/$index/{$this->mapping_type}/$type-$id");
            }
        }
    }

    /**
     * Sends any buffered bulk operations.
     *
     * Errors are reported (Feedback and trigger_error) but not rethrown, so
     * that indexing can carry on.
     */
    public function flush()
    {
        if ($this->bulk) {
            try {
                $this->bulk->flush();
            } catch (Exception $e) {
                // report error and exception and try to carry on
                $message = tr('Elastic search flush error: %0', $e->getMessage());

                Feedback::error($message);  // for browser and console output
                trigger_error($message);    // gets logged while indexing
            }
        }
    }

    /**
     * Flushes pending writes, asks the server to refresh the index and clears
     * its dirty flag.
     *
     * @param string $index
     */
    public function refresh($index)
    {
        $this->flush();

        $this->post("/$index/_refresh", '');
        $this->dirty[$index] = false;
    }

    /**
     * Fetches the stored source of a document.
     *
     * @param string $index
     * @param string $type Document type (simplified to lowercase letters only)
     * @param string $id
     * @return object|null The document's _source, or null when the response has none
     */
    public function document($index, $type, $id)
    {
        if (! empty($this->dirty[$index])) {
            $this->refresh($index);
        }

        $type = $this->simplifyType($type);
        $id = rawurlencode($id);

        $document = $this->get("/$index/{$this->mapping_type}/$type-$id");

        if (isset($document->_source)) {
            return $document->_source;
        }

        return null;
    }

    /**
     * Sends field mappings for an index, creating the index on first use.
     *
     * @param string $index
     * @param array $mapping Field name => mapping definition
     * @param callable $getIndex Returns the index definition used when the index must be created
     * @return mixed Decoded response
     */
    public function mapping($index, array $mapping, callable $getIndex)
    {
        $data = ["properties" => $mapping];

        if (empty($this->indices[$index])) {
            $this->createIndex($index, $getIndex);
            $this->indices[$index] = true;
        }

        return $this->put($this->getMappingPath($index), json_encode($data));
    }

    /**
     * Posts a pre-built payload to the _bulk endpoint.
     *
     * @param string $data
     */
    public function postBulk($data)
    {
        $this->post("/_bulk", $data);
    }

    /**
     * Performs a GET on an arbitrary path of the node.
     *
     * @param string $path Path starting with '/'
     * @return mixed Decoded response
     */
    public function rawApi($path)
    {
        return $this->get($path);
    }

    /**
     * Checks whether the server knows an alias with the given name.
     *
     * @param string $index Alias name
     * @return bool
     */
    private function aliasExists($index)
    {
        try {
            $response = $this->get("/_alias/$index", "");
            if (! empty($response->status) && $response->status == 404) {
                return false;
            }
        } catch (Search_Elastic_Exception $e) {
            return false;
        }
        return true;
    }

    /**
     * Creates an index unless an alias of that name exists, then (from
     * version 5) adds the percolator field mapping and the field limit setting.
     *
     * @param string $index
     * @param callable $getIndex Returns the index definition
     */
    private function createIndex($index, callable $getIndex)
    {
        global $prefs;

        if ($this->aliasExists($index)) {
            return;
        }

        try {
            $this->put(
                "/$index",
                json_encode($getIndex())
            );
        } catch (Search_Elastic_Exception $e) {
            // Index already exists: ignore
        }

        if ($this->getVersion() >= 5) {
            $this->put($this->getMappingPath($index), json_encode([
                'properties' => [
                    'query' => [
                        'type' => 'percolator'
                    ],
                ],
            ]));
            $this->put("/$index/_settings", json_encode([
                'index.mapping.total_fields.limit' => $prefs['unified_elastic_field_limit'],
            ]));
        }
    }

    /**
     * Builds the _mapping endpoint path for an index; the mapping type segment
     * is left empty from version 7.
     *
     * @param string $index
     * @return string
     */
    private function getMappingPath($index)
    {
        return "/$index/_mapping/" . ($this->getVersion() >= 7.0 ? '' : $this->mapping_type);
    }

    /**
     * Sends a GET request, with an optional raw body.
     *
     * @param string $path
     * @param string|null $data
     * @return mixed Decoded response
     * @throws Search_Elastic_TransportException On HTTP client failure
     */
    private function get($path, $data = null)
    {
        try {
            $client = $this->getClient($path);
            if ($data) {
                $client->setRawBody($data);
            }
            $client->setMethod(Zend\Http\Request::METHOD_GET);
            $client->setHeaders(['Content-Type: application/json']);
            $response = $client->send();
            return $this->handleResponse($response);
        } catch (Zend\Http\Exception\ExceptionInterface $e) {
            throw new Search_Elastic_TransportException($e->getMessage());
        }
    }

    /**
     * Sends a PUT request.
     *
     * @param string $path
     * @param string $data Request body
     * @return mixed Decoded response
     * @throws Search_Elastic_TransportException On HTTP client failure
     */
    private function put($path, $data)
    {
        try {
            $client = $this->getClient($path);
            $client->getRequest()->setMethod(Zend\Http\Request::METHOD_PUT);
            $client->getRequest()->setContent($data);
            $client->setHeaders(['Content-Type: application/json']);
            $response = $client->send();

            return $this->handleResponse($response);
        } catch (Zend\Http\Exception\ExceptionInterface $e) {
            throw new Search_Elastic_TransportException($e->getMessage());
        }
    }

    /**
     * Sends a POST request.
     *
     * @param string $path
     * @param string $data Request body
     * @return mixed Decoded response
     * @throws Search_Elastic_TransportException On HTTP client failure
     */
    private function post($path, $data)
    {
        try {
            $client = $this->getClient($path);
            $client->getRequest()->setMethod(Zend\Http\Request::METHOD_POST);
            $client->getRequest()->setContent($data);
            $client->setHeaders(['Content-Type: application/json']);
            $response = $client->send();

            return $this->handleResponse($response);
        } catch (Zend\Http\Exception\ExceptionInterface $e) {
            throw new Search_Elastic_TransportException($e->getMessage());
        }
    }

    /**
     * Sends a DELETE request.
     *
     * @param string $path
     * @return mixed Decoded response
     * @throws Search_Elastic_TransportException On HTTP client failure
     */
    private function delete($path)
    {
        try {
            $client = $this->getClient($path);
            $client->getRequest()->setMethod(Zend\Http\Request::METHOD_DELETE);
            $client->setHeaders(['Content-Type: application/json']);
            $response = $client->send();

            return $this->handleResponse($response);
        } catch (Zend\Http\Exception\ExceptionInterface $e) {
            throw new Search_Elastic_TransportException($e->getMessage());
        }
    }

    /**
     * Decodes a response body and converts server-reported errors into exceptions.
     *
     * @param object $response HTTP response exposing getBody() and isSuccess()
     * @return mixed Decoded JSON body
     * @throws Search_Elastic_NotFoundException
     * @throws Search_Elastic_MappingException
     * @throws Search_Elastic_SortException
     * @throws Search_Elastic_NumberFormatException
     * @throws Search_Elastic_QueryParsingException
     * @throws Search_Elastic_Exception For any other reported error, including
     *                                  per-item errors in a successful bulk response
     */
    private function handleResponse($response)
    {
        $content = json_decode($response->getBody());

        if ($response->isSuccess()) {
            // A bulk request can succeed as a whole while individual items failed
            if (isset($content->items)) {
                foreach ($content->items as $item) {
                    foreach ($item as $res) {
                        if (isset($res->error)) {
                            throw new Search_Elastic_Exception($this->extractErrorMessage($res->error));
                        }
                    }
                }
            }
            return $content;
        }

        if (isset($content->exists) && $content->exists === false) {
            throw new Search_Elastic_NotFoundException($content->_type, $content->_id);
        }

        if (! isset($content->error)) {
            return $content;
        }

        $message = $this->extractErrorMessage($content->error);

        if (preg_match('/^MapperParsingException\[No handler for type \[(?P<type>.*)\].*\[(?P<field>.*)\]\]$/', $message, $parts)) {
            throw new Search_Elastic_MappingException($parts['type'], $parts['field']);
        } elseif (preg_match('/No mapping found for \[(\S+)\] in order to sort on/', $message, $parts)) {
            throw new Search_Elastic_SortException($parts[1]);
        } elseif (preg_match('/NumberFormatException\[For input string: "(.*?)"\]/', $message, $parts)) {
            // Look for the match clause carrying the offending string to find
            // the field name. The delimiter is passed to preg_quote() so that
            // a "/" in the input string cannot break the pattern.
            $pattern = '/' . preg_quote('{"match":{"___wildcard___":{"query":"' . $parts[1] . '"}}', '/') . '/';
            $pattern = str_replace('___wildcard___', '([^"]*)', $pattern);
            if (preg_match($pattern, $message, $parts2)) {
                $field = $parts2[1];
            } else {
                $field = '';
            }
            throw new Search_Elastic_NumberFormatException($parts[1], $field);
        } elseif (preg_match('/QueryParsingException\[\[[^\]]*\] \[[^\]]*\] ([^\]]*)\]/', $message, $parts)) {
            throw new Search_Elastic_QueryParsingException($parts[1]);
        }

        throw new Search_Elastic_Exception($message, isset($content->status) ? $content->status : 0);
    }

    /**
     * Turns the "error" member of a response into a string message.
     *
     * @param mixed $error Error string, or error object (its "reason" is used when present)
     * @return string The reason or scalar as a string; otherwise the JSON encoding of the error
     */
    private function extractErrorMessage($error)
    {
        if (is_object($error) && ! empty($error->reason)) {
            $error = $error->reason;
        }

        if (is_scalar($error)) {
            return (string) $error;
        }

        // Structured error without a usable reason: keep the details readable
        // rather than failing on a string conversion.
        return (string) json_encode($error);
    }

    /**
     * Obtains an HTTP client for the given path on the node.
     *
     * @param string $path Path starting with '/'
     * @return mixed Client returned by TikiLib's get_http_client()
     * @throws Search_Elastic_TransportException On HTTP client failure
     */
    private function getClient($path)
    {
        $full = "{$this->dsn}$path";

        $tikilib = TikiLib::lib('tiki');
        try {
            return $tikilib->get_http_client($full);
        } catch (\Zend\Http\Exception\ExceptionInterface $e) {
            throw new Search_Elastic_TransportException($e->getMessage());
        }
    }

    /**
     * Strips everything but lowercase ASCII letters from a type name.
     *
     * @param string $type
     * @return string
     */
    private function simplifyType($type)
    {
        return preg_replace('/[^a-z]/', '', $type);
    }

    /**
     * Store the dirty flags at the end of the request and restore them when opening the
     * connection within a single user session so that if a modification requires re-indexing,
     * the next page load will wait until indexing is done to show the results.
     *
     * @param Tiki_Event_Manager $events Event manager on which the redirect hook is bound
     */
    public function persistDirty(Tiki_Event_Manager $events)
    {
        if (isset($_SESSION['elastic_search_dirty'])) {
            $this->dirty = $_SESSION['elastic_search_dirty'];
            unset($_SESSION['elastic_search_dirty']);
        }

        // Before the HTTP request is closed
        $events->bind('tiki.process.redirect', function () {
            $_SESSION['elastic_search_dirty'] = $this->dirty;
        });
    }
}