1<?php
2/* Icinga Web 2 Elasticsearch Module | (c) 2016 Icinga Development Team | GPLv2+ */
3
4namespace Icinga\Module\Elasticsearch\RestApi;
5
6use ArrayIterator;
7use LogicException;
8use Icinga\Application\Benchmark;
9use Icinga\Data\Extensible;
10use Icinga\Data\Filter\Filter;
11use Icinga\Data\Reducible;
12use Icinga\Data\Selectable;
13use Icinga\Data\Updatable;
14use Icinga\Exception\IcingaException;
15use Icinga\Exception\NotImplementedError;
16use Icinga\Exception\StatementException;
17use Icinga\Exception\QueryException;
18use Icinga\Web\UrlParams;
19use Icinga\Module\Elasticsearch\Exception\RestApiException;
20
/**
 * Client for the Elasticsearch REST API
 *
 * All requests are sent using a single cURL handle which is created on first use and reused afterwards.
 */
class RestApiClient implements Extensible, Reducible, Selectable, Updatable
{
    /**
     * The cURL handle of this RestApiClient
     *
     * Remains null until {@link getConnection()} is called for the first time.
     *
     * @var resource|null
     */
    protected $curl;

    /**
     * The host of the API
     *
     * May include a scheme. If it does not, "http://" is prepended when sending a request.
     *
     * @var string
     */
    protected $host;

    /**
     * The name of the user to access the API with
     *
     * @var string
     */
    protected $user;

    /**
     * The password for the user the API is accessed with
     *
     * @var string
     */
    protected $pass;

    /**
     * The path of a file holding one or more certificates to verify the peer with
     *
     * @var string
     */
    protected $certificatePath;

    /**
     * Create a new RestApiClient
     *
     * @param   string  $host               The host of the API
     * @param   string  $user               The name of the user to access the API with
     * @param   string  $pass               The password for the user the API is accessed with
     * @param   string  $certificatePath    The path of a file holding one or more certificates to verify the peer with
     */
    public function __construct($host, $user = null, $pass = null, $certificatePath = null)
    {
        $this->host = $host;
        $this->user = $user;
        $this->pass = $pass;
        $this->certificatePath = $certificatePath;
    }

    /**
     * Return the cURL handle of this RestApiClient
     *
     * The handle is created the first time this method is called.
     *
     * @return  resource
     */
    public function getConnection()
    {
        if ($this->curl === null) {
            $this->curl = $this->createConnection();
        }

        return $this->curl;
    }

    /**
     * Create and return a new cURL handle for this RestApiClient
     *
     * The handle is configured to return the response, including its headers, as string. Basic
     * authentication is only enabled if both a user and a password have been provided.
     *
     * @return  resource
     */
    protected function createConnection()
    {
        $curl = curl_init();
        // The headers are required by request() to determine the final status code
        curl_setopt($curl, CURLOPT_HEADER, true);
        // HTTP errors must not make curl_exec() fail, the caller inspects the response instead
        curl_setopt($curl, CURLOPT_FAILONERROR, false);
        curl_setopt($curl, CURLOPT_RETURNTRANSFER, true);

        if ($this->certificatePath !== null) {
            curl_setopt($curl, CURLOPT_CAINFO, $this->certificatePath);
        }

        if ($this->user !== null && $this->pass !== null) {
            curl_setopt($curl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
            curl_setopt($curl, CURLOPT_USERPWD, $this->user . ':' . $this->pass);
        }

        return $curl;
    }

    /**
     * Send the given request and return its response
     *
     * @param   RestApiRequest  $request
     *
     * @return  RestApiResponse
     *
     * @throws  RestApiException            In case an error occurred while handling the request
     */
    public function request(RestApiRequest $request)
    {
        $scheme = strpos($this->host, '://') !== false ? '' : 'http://';
        // Every path segment is encoded on its own so that the separating slashes are preserved
        $path = '/' . join('/', array_map('rawurlencode', explode('/', ltrim($request->getPath(), '/'))));
        $query = ($request->getParams()->isEmpty() ? '' : ('?' . (string) $request->getParams()));

        $curl = $this->getConnection();
        curl_setopt($curl, CURLOPT_HTTPHEADER, $request->getHeaders());
        curl_setopt($curl, CURLOPT_CUSTOMREQUEST, $request->getMethod());
        curl_setopt($curl, CURLOPT_URL, $scheme . $this->host . $path . $query);
        curl_setopt($curl, CURLOPT_POSTFIELDS, $request->getPayload());

        $result = curl_exec($curl);
        if ($result === false) {
            $restApiException = new RestApiException(curl_error($curl));
            $restApiException->setErrorCode(curl_errno($curl));
            throw $restApiException;
        }

        $headerSize = curl_getinfo($curl, CURLINFO_HEADER_SIZE);
        $header = substr($result, 0, $headerSize);
        $body = substr($result, $headerSize);

        $statusCode = 0;
        foreach (explode("\r\n", $header) as $headerLine) {
            // The headers are inspected manually because curl_getinfo($curl, CURLINFO_HTTP_CODE)
            // returns only the first status code. (e.g. 100 instead of 200)
            $matches = array();
            if (preg_match('/^HTTP\/[0-9.]+ ([0-9]+)/', $headerLine, $matches)) {
                $statusCode = (int) $matches[1];
            }
        }

        $response = new RestApiResponse($statusCode);
        // Compare explicitly instead of relying on truthiness, otherwise a body of "0" is dropped.
        // (substr() returns false instead of an empty string prior to PHP 8.0 if there is no body)
        if ($body !== false && $body !== '') {
            $response->setPayload($body);
            $response->setContentType(curl_getinfo($curl, CURLINFO_CONTENT_TYPE));
        }

        return $response;
    }

    /**
     * Create and return a new query for this RestApiClient
     *
     * @param   array   $indices    An array of index name patterns
     * @param   array   $types      An array of document type names
     *
     * @return  RestApiQuery
     */
    public function select(array $indices = null, array $types = null)
    {
        $query = new RestApiQuery($this);
        if ($indices !== null) {
            $query->setIndices($indices);
        }
        if ($types !== null) {
            $query->setTypes($types);
        }
        return $query;
    }

    /**
     * Fetch and return all documents of the given query's result set using an iterator
     *
     * @param   RestApiQuery    $query  The query returning the result set
     *
     * @return  ArrayIterator
     *
     * @throws  QueryException  In case the search request was not successful
     */
    public function query(RestApiQuery $query)
    {
        return new ArrayIterator($this->fetchAll($query));
    }

    /**
     * Count all documents of the result set
     *
     * @param   RestApiQuery    $query
     *
     * @return  int
     *
     * @throws  QueryException  In case the count request was not successful
     */
    public function count(RestApiQuery $query)
    {
        $response = $this->request($query->createCountRequest());
        if (! $response->isSuccess()) {
            throw new QueryException($this->renderErrorMessage($response));
        }

        return $query->createCountResult($response);
    }

    /**
     * Retrieve an array containing all documents of the result set
     *
     * @param   RestApiQuery    $query
     *
     * @return  array
     *
     * @throws  QueryException  In case the search request was not successful
     */
    public function fetchAll(RestApiQuery $query)
    {
        $response = $this->request($query->createSearchRequest());
        if (! $response->isSuccess()) {
            throw new QueryException($this->renderErrorMessage($response));
        }

        return $query->createSearchResult($response);
    }

    /**
     * Fetch the first document of the result set
     *
     * The given query is not modified, the limit is applied to a clone of it.
     *
     * @param   RestApiQuery    $query
     *
     * @return  mixed|false     The first document as created by the query or false if there is none
     *
     * @throws  QueryException  In case the search request was not successful
     */
    public function fetchRow(RestApiQuery $query)
    {
        $clonedQuery = clone $query;
        $clonedQuery->limit(1);
        $results = $this->fetchAll($clonedQuery);
        return array_shift($results) ?: false;
    }

    /**
     * Fetch the first field of all documents of the result set as an array
     *
     * Documents for which the field is not set are omitted from the result.
     *
     * @param   RestApiQuery    $query
     *
     * @return  array
     *
     * @throws  LogicException      In case no attribute is being requested
     * @throws  QueryException      In case the search request was not successful
     */
    public function fetchColumn(RestApiQuery $query)
    {
        $fields = $query->getColumns();
        if (empty($fields)) {
            throw new LogicException('You must request at least one attribute when fetching a single field');
        }

        $results = $this->fetchAll($query);

        // Rewind explicitly, the array's internal pointer is not guaranteed to reference the first column
        $field = reset($fields);
        $alias = key($fields);
        if (! is_int($alias)) {
            // Aliased columns are accessible by their alias
            $field = $alias;
        }

        $values = array();
        foreach ($results as $document) {
            if (isset($document->$field)) {
                $values[] = $document->$field;
            }
        }

        return $values;
    }

    /**
     * Fetch the first field of the first document of the result set
     *
     * @param   RestApiQuery    $query
     *
     * @return  string
     *
     * @throws  NotImplementedError     Always, as this is not implemented yet
     */
    public function fetchOne(RestApiQuery $query)
    {
        throw new NotImplementedError('RestApiClient::fetchOne() is not implemented yet');
    }

    /**
     * Fetch all documents of the result set as an array of key-value pairs
     *
     * The first field is the key, the second field is the value.
     *
     * @param   RestApiQuery    $query
     *
     * @return  array
     *
     * @throws  NotImplementedError     Always, as this is not implemented yet
     */
    public function fetchPairs(RestApiQuery $query)
    {
        throw new NotImplementedError('RestApiClient::fetchPairs() is not implemented yet');
    }

    /**
     * Fetch and return the given document
     *
     * In case you are only interested in the source, pass "_source" as the only desired field.
     * Any other list of fields is passed as "_source" URL parameter, unless the given
     * parameters already provide one.
     *
     * @param   string      $index          The index the document is located in
     * @param   string      $documentType   The type of the document to fetch
     * @param   string      $id             The id of the document to fetch
     * @param   array       $fields         The desired fields to return instead of all fields
     * @param   UrlParams   $params         Additional URL parameters to add to the request
     *
     * @return  object|false            Returns false in case no document could be found
     *
     * @throws  QueryException          In case the request failed for any reason other than a missing document
     */
    public function fetchDocument($index, $documentType, $id, array $fields = null, UrlParams $params = null)
    {
        $request = new GetApiRequest($index, $documentType, $id);
        if ($params !== null) {
            $request->setParams($params);
        }

        if (! empty($fields)) {
            if (count($fields) == 1 && reset($fields) === '_source') {
                $request->setSourceOnly();
                $fields = null;
            } elseif (! $request->getParams()->has('_source')) {
                $request->getParams()->set('_source', join(',', $fields));
            }
        }

        $response = $this->request($request);
        if (! $response->isSuccess()) {
            if ($response->getStatusCode() === 404) {
                return false;
            }

            throw new QueryException($this->renderErrorMessage($response));
        }

        $hit = new SearchHit($response->json());
        return $hit->createRow($fields ?: array());
    }

    /**
     * Insert the given data for the given target
     *
     * The "refresh" URL parameter is set to true unless the given parameters already provide it.
     *
     * @param   string|array    $target     Either "index/type[/id]" or array(index, type[, id])
     * @param   array           $data
     * @param   UrlParams       $params     Additional URL parameters to add to the request
     *
     * @return  bool    Whether the document has been created or not
     *
     * @throws  LogicException      In case the target is invalid
     * @throws  StatementException
     */
    public function insert($target, array $data, UrlParams $params = null)
    {
        list($index, $documentType, $id) = $this->parseTarget($target);
        $targetPath = is_string($target) ? $target : join('/', $target);

        $request = new IndexApiRequest($index, $documentType, $id, $data);
        $this->prepareWriteParams($request, $params);

        try {
            $response = $this->request($request);
        } catch (RestApiException $e) {
            throw new StatementException(
                'Failed to index document "%s". An error occurred: %s',
                $targetPath,
                $e
            );
        }

        if (! $response->isSuccess()) {
            throw new StatementException(
                'Unable to index document "%s": %s',
                $targetPath,
                $this->renderErrorMessage($response)
            );
        }

        $json = $response->json();
        if (isset($json['created'])) {
            return $json['created'];
        }

        // Responses lacking the "created" flag are expected to report the outcome as "result" instead
        return isset($json['result']) && $json['result'] === 'created';
    }

    /**
     * Update the target with the given data and optionally limit the affected documents by using a filter
     *
     * Note that the given filter will have no effect in case the target represents a single document.
     * The URL parameters "refresh" and "fields" are set to true and "_source", respectively, unless the
     * given parameters already provide them.
     *
     * @param   string|array    $target     Either "index/type[/id]" or array(index, type[, id])
     * @param   array           $data
     * @param   Filter          $filter
     * @param   UrlParams       $params     Additional URL parameters to add to the request
     *
     * @return  array   The response for the requested update
     *
     * @throws  LogicException          In case the target is invalid or neither an id nor a filter is given
     * @throws  NotImplementedError     In case the filter matches more than a single document
     * @throws  StatementException
     *
     * @todo    Add support for bulk updates
     */
    public function update($target, array $data, Filter $filter = null, UrlParams $params = null)
    {
        list($index, $documentType, $id) = $this->parseTarget($target);
        if ($id === null && $filter === null) {
            throw new LogicException('Update requests without id are required to provide a filter');
        }

        $targetPath = is_string($target) ? $target : join('/', $target);

        if ($id === null) {
            $ids = $this->fetchIds($index, $documentType, $filter);
            if (empty($ids)) {
                throw new StatementException('No documents found');
            } elseif (count($ids) > 1) {
                throw new NotImplementedError('Bulk updates are not supported yet');
            }

            $id = $ids[0];
        }

        $request = new UpdateApiRequest($index, $documentType, $id, array('doc' => (object) $data));
        $params = $this->prepareWriteParams($request, $params);
        if (! $params->has('fields')) {
            $params->set('fields', '_source');
        }

        try {
            $response = $this->request($request);
        } catch (RestApiException $e) {
            throw new StatementException(
                'Failed to update document "%s". An error occurred: %s',
                $targetPath,
                $e
            );
        }

        if (! $response->isSuccess()) {
            throw new StatementException(
                'Unable to update document "%s": %s',
                $targetPath,
                $this->renderErrorMessage($response)
            );
        }

        return $response->json();
    }

    /**
     * Delete documents in the given target, optionally limiting the affected documents by using a filter
     *
     * Note that the given filter will have no effect in case the target represents a single document.
     * The "refresh" URL parameter is set to true unless the given parameters already provide it.
     *
     * @param   string|array    $target     Either "index/type[/id]" or array(index, type[, id])
     * @param   Filter          $filter
     * @param   UrlParams       $params     Additional URL parameters to add to the request
     *
     * @return  array   The response for the requested deletion
     *
     * @throws  LogicException          In case the target is invalid or neither an id nor a filter is given
     * @throws  NotImplementedError     In case the filter matches more than a single document
     * @throws  StatementException
     *
     * @todo    Add support for bulk deletions
     */
    public function delete($target, Filter $filter = null, UrlParams $params = null)
    {
        list($index, $documentType, $id) = $this->parseTarget($target);
        if ($id === null && $filter === null) {
            throw new LogicException('Delete requests without id are required to provide a filter');
        }

        $targetPath = is_string($target) ? $target : join('/', $target);

        if ($id === null) {
            $ids = $this->fetchIds($index, $documentType, $filter);
            if (empty($ids)) {
                throw new StatementException('No documents found');
            } elseif (count($ids) > 1) {
                throw new NotImplementedError('Bulk deletions are not supported yet');
            }

            $id = $ids[0];
        }

        $request = new DeleteApiRequest($index, $documentType, $id);
        $this->prepareWriteParams($request, $params);

        try {
            $response = $this->request($request);
        } catch (RestApiException $e) {
            throw new StatementException(
                'Failed to delete document "%s". An error occurred: %s',
                $targetPath,
                $e
            );
        }

        if (! $response->isSuccess()) {
            throw new StatementException(
                'Unable to delete document "%s": %s',
                $targetPath,
                $this->renderErrorMessage($response)
            );
        }

        return $response->json();
    }

    /**
     * Split the given target into its index, document type and id
     *
     * @param   string|array    $target     Either "index/type[/id]" or array(index, type[, id])
     *
     * @return  array   array(index, type, id), with id being null if the target does not provide one
     *
     * @throws  LogicException  In case the target does not consist of either two or three parts
     */
    protected function parseTarget($target)
    {
        $parts = is_string($target) ? explode('/', $target) : $target;

        switch (count($parts)) {
            case 3:
                list($index, $documentType, $id) = $parts;
                break;
            case 2:
                list($index, $documentType) = $parts;
                $id = null;
                break;
            default:
                // LogicException does not format its message, hence sprintf() is required here
                throw new LogicException(sprintf('Invalid target "%s"', join('/', $parts)));
        }

        return array($index, $documentType, $id);
    }

    /**
     * Fetch the ids of all documents of the given index and type which match the given filter
     *
     * @param   string  $index
     * @param   string  $documentType
     * @param   Filter  $filter
     *
     * @return  array
     */
    protected function fetchIds($index, $documentType, Filter $filter)
    {
        $query = new RestApiQuery($this, array('_id'));
        return $query
            ->setIndices(array($index))
            ->setTypes(array($documentType))
            ->setFilter($filter)
            ->fetchColumn();
    }

    /**
     * Apply the given URL parameters to the given request and default "refresh" to true
     *
     * Note that the given parameters are modified in case they do not provide "refresh" yet.
     *
     * @param   RestApiRequest  $request
     * @param   UrlParams       $params     The parameters to apply, if any
     *
     * @return  UrlParams   The parameters being used by the request
     */
    protected function prepareWriteParams(RestApiRequest $request, UrlParams $params = null)
    {
        if ($params !== null) {
            $request->setParams($params);
        } else {
            $params = $request->getParams();
        }

        if (! $params->has('refresh')) {
            $params->set('refresh', true);
        }

        return $params;
    }

    /**
     * Render and return a human readable error message for the given error document
     *
     * @param   RestApiResponse     $response   The response of a failed request
     *
     * @return  string
     *
     * @todo    Parse Elasticsearch 2.x structured errors
     */
    public function renderErrorMessage(RestApiResponse $response)
    {
        try {
            $errorDocument = $response->json();
        } catch (IcingaException $e) {
            // The payload is not valid JSON, so it is reported as is
            return sprintf('%s: %s',
                $e->getMessage(),
                $response->getPayload()
            );
        }

        if (! isset($errorDocument['error'])) {
            return sprintf('Elasticsearch unknown json error %s: %s',
                $response->getStatusCode(),
                $response->getPayload()
            );
        }

        if (is_string($errorDocument['error'])) {
            return $errorDocument['error'];
        }

        return sprintf('Elasticsearch json error %s: %s',
            $response->getStatusCode(),
            json_encode($errorDocument['error'])
        );
    }

    /**
     * Render and return the given filter as Elasticsearch query
     *
     * @param   Filter  $filter
     *
     * @return  array
     */
    public function renderFilter(Filter $filter)
    {
        $renderer = new FilterRenderer($filter);
        return $renderer->getQuery();
    }

    /**
     * Retrieve columns from the Elasticsearch indices.
     *
     * It will get you a merged list of columns available over the specified indices and types.
     * The list always starts with "_index", "_type" and "_id". The column "@version" as well
     * as structured columns (those having properties on their own) are not part of it.
     *
     * @param   array   $indices    The indices or index patterns to get
     * @param   array   $types      An array of types to get columns for
     *
     * @throws  QueryException  When Elasticsearch returns an error
     *
     * @return  array|false     A list of column names or false if the request resulted in HTTP 404
     *
     * @todo    Do a cached retrieval?
     */
    public function fetchColumns(array $indices = null, array $types = null)
    {
        Benchmark::measure('Retrieving columns for types: ' . (!empty($types) ? join(', ', $types) : '(all)'));
        $request = new GetMappingApiRequest($indices, $types);

        $response = $this->request($request);
        if (! $response->isSuccess()) {
            if ($response->getStatusCode() === 404) {
                return false;
            }

            throw new QueryException($this->renderErrorMessage($response));
        }

        // initialize with internal columns
        $columns = array(
            '_index',
            '_type',
            '_id',
        );

        foreach ($response->json() as $mappings) {
            if (! array_key_exists('mappings', $mappings)) {
                continue;
            }
            foreach ($mappings['mappings'] as $type) {
                if (! array_key_exists('properties', $type)) {
                    continue;
                }
                foreach ($type['properties'] as $column => $detail) {
                    if ($column === '@version') {
                        continue;
                    }
                    if (array_key_exists('properties', $detail)) {
                        // ignore structured types
                        // TODO: support this later?
                        continue;
                    }
                    // Compare strictly, numeric names are integer keys which loosely equal any
                    // non-numeric string prior to PHP 8.0 and would be skipped by mistake
                    if (! in_array($column, $columns, true)) {
                        $columns[] = $column;
                    }
                }
            }
        }
        Benchmark::measure('Finished retrieving columns');

        return $columns;
    }
}
690