1<?php 2/* Icinga Web 2 Elasticsearch Module | (c) 2016 Icinga Development Team | GPLv2+ */ 3 4namespace Icinga\Module\Elasticsearch\RestApi; 5 6use ArrayIterator; 7use LogicException; 8use Icinga\Application\Benchmark; 9use Icinga\Data\Extensible; 10use Icinga\Data\Filter\Filter; 11use Icinga\Data\Reducible; 12use Icinga\Data\Selectable; 13use Icinga\Data\Updatable; 14use Icinga\Exception\IcingaException; 15use Icinga\Exception\NotImplementedError; 16use Icinga\Exception\StatementException; 17use Icinga\Exception\QueryException; 18use Icinga\Web\UrlParams; 19use Icinga\Module\Elasticsearch\Exception\RestApiException; 20 21class RestApiClient implements Extensible, Reducible, Selectable, Updatable 22{ 23 /** 24 * The cURL handle of this RestApiClient 25 * 26 * @var resource 27 */ 28 protected $curl; 29 30 /** 31 * The host of the API 32 * 33 * @var string 34 */ 35 protected $host; 36 37 /** 38 * The name of the user to access the API with 39 * 40 * @var string 41 */ 42 protected $user; 43 44 /** 45 * The password for the user the API is accessed with 46 * 47 * @var string 48 */ 49 protected $pass; 50 51 /** 52 * The path of a file holding one or more certificates to verify the peer with 53 * 54 * @var string 55 */ 56 protected $certificatePath; 57 58 /** 59 * Create a new RestApiClient 60 * 61 * @param string $host The host of the API 62 * @param string $user The name of the user to access the API with 63 * @param string $pass The password for the user the API is accessed with 64 * @param string $certificatePath The path of a file holding one or more certificates to verify the peer with 65 */ 66 public function __construct($host, $user = null, $pass = null, $certificatePath = null) 67 { 68 $this->host = $host; 69 $this->user = $user; 70 $this->pass = $pass; 71 $this->certificatePath = $certificatePath; 72 } 73 74 /** 75 * Return the cURL handle of this RestApiClient 76 * 77 * @return resource 78 */ 79 public function getConnection() 80 { 81 if ($this->curl === null) { 82 $this->curl = $this->createConnection(); 83 } 84 85 return $this->curl; 86 } 87 88 /** 89 * Create and return a new cURL handle for this RestApiClient 90 * 91 * @return resource 92 */ 93 protected function createConnection() 94 { 95 $curl = curl_init(); 96 curl_setopt($curl, CURLOPT_HEADER, true); 97 curl_setopt($curl, CURLOPT_FAILONERROR, false); 98 curl_setopt($curl, CURLOPT_RETURNTRANSFER, true); 99 100 if ($this->certificatePath !== null) { 101 curl_setopt($curl, CURLOPT_CAINFO, $this->certificatePath); 102 } 103 104 if ($this->user !== null && $this->pass !== null) { 105 curl_setopt($curl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC); 106 curl_setopt($curl, CURLOPT_USERPWD, $this->user . ':' . $this->pass); 107 } 108 109 return $curl; 110 } 111 112 /** 113 * Send the given request and return its response 114 * 115 * @param RestApiRequest $request 116 * 117 * @return RestApiResponse 118 * 119 * @throws RestApiException In case an error occured while handling the request 120 */ 121 public function request(RestApiRequest $request) 122 { 123 $scheme = strpos($this->host, '://') !== false ? '' : 'http://'; 124 $path = '/' . join('/', array_map('rawurlencode', explode('/', ltrim($request->getPath(), '/')))); 125 $query = ($request->getParams()->isEmpty() ? '' : ('?' . (string) $request->getParams())); 126 127 $curl = $this->getConnection(); 128 curl_setopt($curl, CURLOPT_HTTPHEADER, $request->getHeaders()); 129 curl_setopt($curl, CURLOPT_CUSTOMREQUEST, $request->getMethod()); 130 curl_setopt($curl, CURLOPT_URL, $scheme . $this->host . $path . $query); 131 curl_setopt($curl, CURLOPT_POSTFIELDS, $request->getPayload()); 132 133 $result = curl_exec($curl); 134 if ($result === false) { 135 $restApiException = new RestApiException(curl_error($curl)); 136 $restApiException->setErrorCode(curl_errno($curl)); 137 throw $restApiException; 138 } 139 140 $header = substr($result, 0, curl_getinfo($curl, CURLINFO_HEADER_SIZE)); 141 $result = substr($result, curl_getinfo($curl, CURLINFO_HEADER_SIZE)); 142 143 $statusCode = 0; 144 foreach (explode("\r\n", $header) as $headerLine) { 145 // The headers are inspected manually because curl_getinfo($curl, CURLINFO_HTTP_CODE) 146 // returns only the first status code. (e.g. 100 instead of 200) 147 $matches = array(); 148 if (preg_match('/^HTTP\/[0-9.]+ ([0-9]+)/', $headerLine, $matches)) { 149 $statusCode = (int) $matches[1]; 150 } 151 } 152 153 $response = new RestApiResponse($statusCode); 154 if ($result) { 155 $response->setPayload($result); 156 $response->setContentType(curl_getinfo($curl, CURLINFO_CONTENT_TYPE)); 157 } 158 159 return $response; 160 } 161 162 /** 163 * Create and return a new query for this RestApiClient 164 * 165 * @param array $indices An array of index name patterns 166 * @param array $types An array of document type names 167 * 168 * @return RestApiQuery 169 */ 170 public function select(array $indices = null, array $types = null) 171 { 172 $query = new RestApiQuery($this); 173 if ($indices !== null) { 174 $query->setIndices($indices); 175 } 176 if ($types !== null) { 177 $query->setTypes($types); 178 } 179 return $query; 180 } 181 182 /** 183 * Fetch and return all documents of the given query's result set using an iterator 184 * 185 * @param RestApiQuery $query The query returning the result set 186 * 187 * @return ArrayIterator 188 */ 189 public function query(RestApiQuery $query) 190 { 191 return new ArrayIterator($this->fetchAll($query)); 192 } 193 194 /** 195 * Count all documents of the result set 196 * 197 * @param RestApiQuery $query 198 * 199 * @return int 200 */ 201 public function count(RestApiQuery $query) 202 { 203 $response = $this->request($query->createCountRequest()); 204 if (! $response->isSuccess()) { 205 throw new QueryException($this->renderErrorMessage($response)); 206 } 207 208 return $query->createCountResult($response); 209 } 210 211 /** 212 * Retrieve an array containing all documents of the result set 213 * 214 * @param RestApiQuery $query 215 * 216 * @return array 217 */ 218 public function fetchAll(RestApiQuery $query) 219 { 220 $response = $this->request($query->createSearchRequest()); 221 if (! $response->isSuccess()) { 222 throw new QueryException($this->renderErrorMessage($response)); 223 } 224 225 return $query->createSearchResult($response); 226 } 227 228 /** 229 * Fetch the first document of the result set 230 * 231 * @param RestApiQuery $query 232 * 233 * @return array|false 234 */ 235 public function fetchRow(RestApiQuery $query) 236 { 237 $clonedQuery = clone $query; 238 $clonedQuery->limit(1); 239 $results = $this->fetchAll($clonedQuery); 240 return array_shift($results) ?: false; 241 } 242 243 /** 244 * Fetch the first field of all documents of the result set as an array 245 * 246 * @param RestApiQuery $query 247 * 248 * @return array 249 * 250 * @throws LogicException In case no attribute is being requested 251 */ 252 public function fetchColumn(RestApiQuery $query) 253 { 254 $fields = $query->getColumns(); 255 if (empty($fields)) { 256 throw new LogicException('You must request at least one attribute when fetching a single field'); 257 } 258 259 $results = $this->fetchAll($query); 260 $alias = key($fields); 261 $field = is_int($alias) ? current($fields) : $alias; 262 $values = array(); 263 foreach ($results as $document) { 264 if (isset($document->$field)) { 265 $values[] = $document->$field; 266 } 267 } 268 269 return $values; 270 } 271 272 /** 273 * Fetch the first field of the first document of the result set 274 * 275 * @param RestApiQuery $query 276 * 277 * @return string 278 */ 279 public function fetchOne(RestApiQuery $query) 280 { 281 throw new NotImplementedError('RestApiClient::fetchOne() is not implemented yet'); 282 } 283 284 /** 285 * Fetch all documents of the result set as an array of key-value pairs 286 * 287 * The first field is the key, the second field is the value. 288 * 289 * @param RestApiQuery $query 290 * 291 * @return array 292 */ 293 public function fetchPairs(RestApiQuery $query) 294 { 295 throw new NotImplementedError('RestApiClient::fetchPairs() is not implemented yet'); 296 } 297 298 /** 299 * Fetch and return the given document 300 * 301 * In case you are only interested in the source, pass "_source" as the only desired field. 302 * 303 * @param string $index The index the document is located in 304 * @param string $documentType The type of the document to fetch 305 * @param string $id The id of the document to fetch 306 * @param array $fields The desired fields to return instead of all fields 307 * @param UrlParams $params Additional URL parameters to add to the request 308 * 309 * @return object|false Returns false in case no document could be found 310 */ 311 public function fetchDocument($index, $documentType, $id, array $fields = null, UrlParams $params = null) 312 { 313 $request = new GetApiRequest($index, $documentType, $id); 314 if ($params !== null) { 315 $request->setParams($params); 316 } 317 318 if (! empty($fields)) { 319 if (count($fields) == 1 && reset($fields) === '_source') { 320 $request->setSourceOnly(); 321 $fields = null; 322 } elseif (! $request->getParams()->has('_source')) { 323 $request->getParams()->set('_source', join(',', $fields)); 324 } 325 } 326 327 $response = $this->request($request); 328 if (! $response->isSuccess()) { 329 if ($response->getStatusCode() === 404) { 330 return false; 331 } 332 333 throw new QueryException($this->renderErrorMessage($response)); 334 } 335 336 $hit = new SearchHit($response->json()); 337 return $hit->createRow($fields ?: array()); 338 } 339 340 /** 341 * Insert the given data for the given target 342 * 343 * @param string|array $target 344 * @param array $data 345 * @param UrlParams $params Additional URL parameters to add to the request 346 * 347 * @return bool Whether the document has been created or not 348 * 349 * @throws StatementException 350 */ 351 public function insert($target, array $data, UrlParams $params = null) 352 { 353 if (is_string($target)) { 354 $target = explode('/', $target); 355 } 356 357 switch (count($target)) { 358 case 3: 359 list($index, $documentType, $id) = $target; 360 break; 361 case 2: 362 list($index, $documentType) = $target; 363 $id = null; 364 break; 365 default: 366 throw new LogicException('Invalid target "%s"', join('/', $target)); 367 } 368 369 $request = new IndexApiRequest($index, $documentType, $id, $data); 370 if ($params !== null) { 371 $request->setParams($params); 372 } else { 373 $params = $request->getParams(); 374 } 375 376 if (! $params->has('refresh')) { 377 $params->set('refresh', true); 378 } 379 380 try { 381 $response = $this->request($request); 382 } catch (RestApiException $e) { 383 throw new StatementException( 384 'Failed to index document "%s". An error occurred: %s', 385 join('/', $target), 386 $e 387 ); 388 } 389 390 if (! $response->isSuccess()) { 391 throw new StatementException( 392 'Unable to index document "%s": %s', 393 join('/', $target), 394 $this->renderErrorMessage($response) 395 ); 396 } 397 398 $json = $response->json(); 399 return $json['created']; 400 } 401 402 /** 403 * Update the target with the given data and optionally limit the affected documents by using a filter 404 * 405 * Note that the given filter will have no effect in case the target represents a single document. 406 * 407 * @param string|array $target 408 * @param array $data 409 * @param Filter $filter 410 * @param UrlParams $params Additional URL parameters to add to the request 411 * 412 * @return array The response for the requested update 413 * 414 * @throws StatementException 415 * 416 * @todo Add support for bulk updates 417 */ 418 public function update($target, array $data, Filter $filter = null, UrlParams $params = null) 419 { 420 if (is_string($target)) { 421 $target = explode('/', $target); 422 } 423 424 switch (count($target)) { 425 case 3: 426 list($index, $documentType, $id) = $target; 427 break; 428 case 2: 429 if ($filter === null) { 430 throw new LogicException('Update requests without id are required to provide a filter'); 431 } 432 433 list($index, $documentType) = $target; 434 $id = null; 435 break; 436 default: 437 throw new LogicException('Invalid target "%s"', join('/', $target)); 438 } 439 440 if ($id !== null) { 441 $request = new UpdateApiRequest($index, $documentType, $id, array('doc' => (object) $data)); 442 } elseif ($filter !== null) { 443 $query = new RestApiQuery($this, array('_id')); 444 $ids = $query 445 ->setIndices(array($index)) 446 ->setTypes(array($documentType)) 447 ->setFilter($filter) 448 ->fetchColumn(); 449 if (empty($ids)) { 450 throw new StatementException('No documents found'); 451 } elseif (count($ids) == 1) { 452 $request = new UpdateApiRequest($index, $documentType, $ids[0], array('doc' => (object) $data)); 453 } else { 454 throw new NotImplementedError('Bulk updates are not supported yet'); 455 } 456 } 457 458 if ($params !== null) { 459 $request->setParams($params); 460 } else { 461 $params = $request->getParams(); 462 } 463 464 if (! $params->has('refresh')) { 465 $params->set('refresh', true); 466 } 467 if (! $params->has('fields')) { 468 $params->set('fields', '_source'); 469 } 470 471 try { 472 $response = $this->request($request); 473 } catch (RestApiException $e) { 474 throw new StatementException( 475 'Failed to update document "%s". An error occurred: %s', 476 join('/', $target), 477 $e 478 ); 479 } 480 481 if (! $response->isSuccess()) { 482 throw new StatementException( 483 'Unable to update document "%s": %s', 484 join('/', $target), 485 $this->renderErrorMessage($response) 486 ); 487 } 488 489 return $response->json(); 490 } 491 492 /** 493 * Delete documents in the given target, optionally limiting the affected documents by using a filter 494 * 495 * Note that the given filter will have no effect in case the target represents a single document. 496 * 497 * @param string|array $target 498 * @param Filter $filter 499 * @param UrlParams $params Additional URL parameters to add to the request 500 * 501 * @return array The response for the requested deletion 502 * 503 * @throws StatementException 504 * 505 * @todo Add support for bulk deletions 506 */ 507 public function delete($target, Filter $filter = null, UrlParams $params = null) 508 { 509 if (is_string($target)) { 510 $target = explode('/', $target); 511 } 512 513 switch (count($target)) { 514 case 3: 515 list($index, $documentType, $id) = $target; 516 break; 517 case 2: 518 if ($filter === null) { 519 throw new LogicException('Update requests without id are required to provide a filter'); 520 } 521 522 list($index, $documentType) = $target; 523 $id = null; 524 break; 525 default: 526 throw new LogicException('Invalid target "%s"', join('/', $target)); 527 } 528 529 if ($id !== null) { 530 $request = new DeleteApiRequest($index, $documentType, $id); 531 } elseif ($filter !== null) { 532 $query = new RestApiQuery($this, array('_id')); 533 $ids = $query 534 ->setIndices(array($index)) 535 ->setTypes(array($documentType)) 536 ->setFilter($filter) 537 ->fetchColumn(); 538 if (empty($ids)) { 539 throw new StatementException('No documents found'); 540 } elseif (count($ids) == 1) { 541 $request = new DeleteApiRequest($index, $documentType, $ids[0]); 542 } else { 543 throw new NotImplementedError('Bulk deletions are not supported yet'); 544 } 545 } 546 547 if ($params !== null) { 548 $request->setParams($params); 549 } else { 550 $params = $request->getParams(); 551 } 552 553 if (! $params->has('refresh')) { 554 $params->set('refresh', true); 555 } 556 557 try { 558 $response = $this->request($request); 559 } catch (RestApiException $e) { 560 throw new StatementException( 561 'Failed to delete document "%s". An error occurred: %s', 562 join('/', $target), 563 $e 564 ); 565 } 566 567 if (! $response->isSuccess()) { 568 throw new StatementException( 569 'Unable to delete document "%s": %s', 570 join('/', $target), 571 $this->renderErrorMessage($response) 572 ); 573 } 574 575 return $response->json(); 576 } 577 578 /** 579 * Render and return a human readable error message for the given error document 580 * 581 * @return string 582 * 583 * @todo Parse Elasticsearch 2.x structured errors 584 */ 585 public function renderErrorMessage(RestApiResponse $response) 586 { 587 try { 588 $errorDocument = $response->json(); 589 } catch (IcingaException $e) { 590 return sprintf('%s: %s', 591 $e->getMessage(), 592 $response->getPayload() 593 ); 594 } 595 596 if (! isset($errorDocument['error'])) { 597 return sprintf('Elasticsearch unknown json error %s: %s', 598 $response->getStatusCode(), 599 $response->getPayload() 600 ); 601 } 602 603 if (is_string($errorDocument['error'])) { 604 return $errorDocument['error']; 605 } 606 607 return sprintf('Elasticsearch json error %s: %s', 608 $response->getStatusCode(), 609 json_encode($errorDocument['error']) 610 ); 611 } 612 613 /** 614 * Render and return the given filter as Elasticsearch query 615 * 616 * @param Filter $filter 617 * 618 * @return array 619 */ 620 public function renderFilter(Filter $filter) 621 { 622 $renderer = new FilterRenderer($filter); 623 return $renderer->getQuery(); 624 } 625 626 /** 627 * Retrieve columns from the Elasticsearch indices. 628 * 629 * It will get you a merged list of columns available over the specified indices and types. 630 * 631 * @param array $indices The indices or index patterns to get 632 * @param array $types An array of types to get columns for 633 * 634 * @throws QueryException When Elasticsearch returns an error 635 * 636 * @return array A list of column names 637 * 638 * @todo Do a cached retrieval? 639 */ 640 public function fetchColumns(array $indices = null, array $types = null) 641 { 642 Benchmark::measure('Retrieving columns for types: ' . (!empty($types) ? join(', ', $types) : '(all)')); 643 $request = new GetMappingApiRequest($indices, $types); 644 645 $response = $this->request($request); 646 if (! $response->isSuccess()) { 647 if ($response->getStatusCode() === 404) { 648 return false; 649 } 650 651 throw new QueryException($this->renderErrorMessage($response)); 652 } 653 654 // initialize with interal columns 655 $columns = array( 656 '_index', 657 '_type', 658 '_id', 659 ); 660 661 foreach ($response->json() as $index => $mappings) { 662 if (! array_key_exists('mappings', $mappings)) { 663 continue; 664 } 665 foreach ($mappings['mappings'] as $type) { 666 if (! array_key_exists('properties', $type)) { 667 continue; 668 } 669 foreach ($type['properties'] as $column => $detail) { 670 if ($column === '@version') { 671 continue; 672 } 673 if (array_key_exists('properties', $detail)) { 674 // ignore structured types 675 // TODO: support this later? 676 continue; 677 } 678 if (! in_array($column, $columns)) { 679 $columns[] = $column; 680 } 681 } 682 683 } 684 } 685 Benchmark::measure('Finished retrieving columns'); 686 687 return $columns; 688 } 689} 690