1<?php
2// (c) Copyright by authors of the Tiki Wiki CMS Groupware Project
3//
4// All Rights Reserved. See copyright.txt for details and a complete list of authors.
5// Licensed under the GNU LESSER GENERAL PUBLIC LICENSE. See license.txt for details.
6// $Id$
7
/**
 * Client for an Elasticsearch server.
 *
 * Wraps the REST calls needed for indexing, searching, percolation, aliases and
 * mappings, and papers over differences between Elasticsearch versions
 * (document type names, _status vs _stats, percolator API changes).
 * Writes can be buffered through a Search_Elastic_BulkOperation (see startBulk()).
 * Indices written to are tracked as "dirty" so that reads refresh them first.
 */
class Search_Elastic_Connection
{
	/** @var string Base URL of the Elasticsearch server, without trailing slash */
	private $dsn;
	/** @var float|int|null Cached server version (see getVersion()); null until first lookup */
	private $version;
	/** @var string Document type used in URLs: '_doc' on 6.2+, 'doc' before */
	private $mapping_type;
	/** @var array<string,bool> Index name => true when written to since its last refresh */
	private $dirty = [];

	/** @var array<string,bool> Indices already created by mapping() on this connection */
	private $indices = [];

	/** @var Search_Elastic_BulkOperation|null Active bulk buffer, set by startBulk() */
	private $bulk;

	/**
	 * @param string $dsn Base URL of the Elasticsearch server; a trailing slash is removed.
	 *
	 * Note: the server is contacted immediately (through getVersion()) to choose the mapping type.
	 */
	function __construct($dsn)
	{
		$this->dsn = rtrim($dsn, '/');
		$this->version = null;
		// '_doc' is compatible with 7+ but not supported before 6.2
		$this->mapping_type = $this->getVersion() >= 6.2 ? '_doc' : 'doc';
	}

	/**
	 * Push any buffered bulk operations before the connection goes away.
	 */
	function __destruct()
	{
		try {
			$this->flush();
		} catch (Search_Elastic_TransportException $e) {
			// Nothing useful can be done with a transport failure during destruction
		}
	}

	/**
	 * Start buffering index/unindex operations in a bulk operation whose batches
	 * are sent through postBulk().
	 *
	 * @param int $size Batch size handed to Search_Elastic_BulkOperation
	 */
	function startBulk($size = 500)
	{
		$this->bulk = new Search_Elastic_BulkOperation(
			$size,
			function ($data) {
				$this->postBulk($data);
			},
			$this->requestType()
		);
	}

	/**
	 * Query the server root endpoint.
	 *
	 * @return object Decoded response with at least "ok" (bool) and "status" set;
	 *                ok=false/status=0 when the server cannot be reached or answers
	 *                with something that is not a JSON object (errors are reported via Feedback).
	 */
	function getStatus()
	{
		$failure = (object) [
			'ok' => false,
			'status' => 0,
		];

		try {
			$result = $this->get('/');
		} catch (Exception $e) {
			Feedback::error($e->getMessage());
			return $failure;
		}

		// An empty or non-JSON body decodes to null; assigning properties to it would be a fatal Error in PHP 8
		if (! is_object($result)) {
			return $failure;
		}

		if (isset($result->version)) {	// elastic v2
			$result->ok = true;
			$result->status = 200;
		} elseif (! isset($result->ok)) {
			$result->ok = isset($result->status) && $result->status === 200;
		}

		return $result;
	}

	/**
	 * Get the Elasticsearch server version as a number, e.g. 2.2 for "2.2.0".
	 *
	 * The value is the (float) cast of the version string, so only major.minor
	 * survive and a two-digit minor collapses ("7.10.2" becomes 7.1).
	 * Returns 0 when the server cannot be reached or reports no version.
	 * The result is cached for the lifetime of the connection.
	 *
	 * @return float|int
	 */
	function getVersion()
	{
		if ($this->version === null) {
			$status = $this->getStatus();

			if (! empty($status->version->number)) {
				$this->version = (float) $status->version->number;
			} else {
				$this->version = 0;
			}
		}

		return $this->version;
	}

	/**
	 * Get index statistics: the _status API before Elasticsearch 2, _stats from 2 on.
	 *
	 * @param string $index Index name, or '' for all indices
	 * @return object|null Decoded response, or null when the request failed.
	 *                     Failures other than "no such index"/unsupported _status are reported via Feedback.
	 */
	function getIndexStatus($index = '')
	{
		$path = $index ? '/' . $index : '';
		try {
			if ($this->getVersion() < 2) {
				return $this->get("$path/_status");
			}
			return $this->get("$path/_stats");	// v2 "Indices Stats" API result
		} catch (Exception $e) {
			$message = $e->getMessage();

			// A missing index or unsupported _status endpoint is an expected outcome; anything else is reported
			if (strpos($message, '[_status]') === false && strpos($message, 'no such index') === false) {
				Feedback::error($message . ' for index ' . $path);
			}
		}

		return null;
	}

	/**
	 * Delete an index, flushing pending bulk operations first.
	 *
	 * @param string $index
	 * @return object|null Decoded response, or null when the index did not exist (404)
	 * @throws Search_Elastic_Exception for errors other than 404
	 */
	function deleteIndex($index)
	{
		$this->flush();

		try {
			unset($this->indices[$index]);
			// The index is going away: a leftover dirty flag would later trigger a refresh of a missing index
			unset($this->dirty[$index]);
			return $this->delete("/$index");
		} catch (Search_Elastic_Exception $e) {
			if ($e->getCode() !== 404) {
				throw $e;
			}
		}

		return null;
	}

	/**
	 * Run a search (or a multi-search) against one or more indices.
	 *
	 * Dirty indices are refreshed first. For a single search, the query is also
	 * validated against each index (which throws on invalid queries).
	 *
	 * @param string|string[] $index       Index name(s)
	 * @param array           $query       Query body, or (multisearch) a list of query bodies
	 * @param array           $args        URL query-string arguments
	 * @param bool            $multisearch Use the _msearch endpoint with $query as a list of queries
	 * @return mixed Decoded search response, or (multisearch) the array of per-query responses
	 */
	function search($index, array $query, array $args = [], $multisearch = false)
	{
		$indices = (array) $index;
		foreach ($indices as $name) {
			if (! empty($this->dirty[$name])) {
				$this->refresh($name);
			}
			if (! $multisearch) {
				// The purpose of Multisearch is to reduce connections to ES and so skip unless ES have Multivalidate
				$this->validate($name, $query);
			}
		}

		$target = implode(',', $indices);
		$queryString = http_build_query($args, '', '&');

		if ($multisearch) {
			// Newline-delimited body: an empty header line ("{}") uses the search index set in URL
			$body = '';
			foreach ($query as $q) {
				$body .= "{}\n";
				$body .= json_encode($q) . "\n";
			}
			$ret = $this->post("/$target/_msearch?$queryString", $body);
			return $ret->responses; // one response per query, in order
		}

		return $this->post("/$target/_search?$queryString", json_encode($query));
	}

	/**
	 * Ask Elasticsearch to validate $query['query'] against $index.
	 *
	 * @throws Search_Elastic_QueryParsingException when the server reports the query as invalid
	 */
	function validate($index, array $query)
	{
		$result = $this->post("/$index/_validate/query?explain=true", json_encode(['query' => $query['query']]));
		if (isset($result->valid) && $result->valid === false) {
			if (! empty($result->explanations)) {
				foreach ($result->explanations as $explanation) {
					if ($explanation->valid === false) {
						throw new Search_Elastic_QueryParsingException($explanation->error);
					}
				}
			}
			if (! empty($result->error)) {
				throw new Search_Elastic_QueryParsingException($result->error);
			}
		}
	}

	/**
	 * Fetch the next page of a scrolled search.
	 *
	 * Before 5.0 the scroll id is sent as the request body; from 5.0 on it is a URL argument.
	 */
	function scroll($scrollId, array $args = [])
	{
		if ($this->getVersion() < 5.0) {
			return $this->post('/_search/scroll?' . http_build_query($args, '', '&'), $scrollId);
		}

		$args['scroll_id'] = $scrollId;
		return $this->post('/_search/scroll?' . http_build_query($args, '', '&'), '');
	}

	/**
	 * Store a percolator query under $name (as document "percolator-$name" on 5+,
	 * in the ".percolator" type before).
	 */
	function storeQuery($index, $name, $query)
	{
		if ($this->getVersion() >= 5) {
			return $this->rawIndex($index, 'percolator', $name, $query);
		}
		return $this->rawIndex($index, '.percolator', $name, $query);
	}

	/**
	 * Remove a percolator query previously stored with storeQuery().
	 */
	function unstoreQuery($index, $name)
	{
		if ($this->getVersion() >= 5) {
			return $this->delete("/$index/{$this->mapping_type}/percolator-$name");
		}
		return $this->delete("/$index/.percolator/$name");
	}

	/**
	 * Find the stored queries (see storeQuery()) that match $document.
	 *
	 * @param string $index
	 * @param string $type     Document type; non a-z characters are stripped
	 * @param mixed  $document Document body to test
	 * @return string[] Names of the matching stored queries
	 */
	function percolate($index, $type, $document)
	{
		if (! empty($this->dirty['_percolator'])) {
			$this->refresh('_percolator');
		}

		$type = $this->simplifyType($type);
		$version = $this->getVersion();

		if ($version >= 5) {
			// 5.x and later use the "percolate" query; only the 5.x form carries document_type
			$percolate = ['field' => 'query'];
			if ($version < 6) {
				$percolate['document_type'] = $type;
			}
			$percolate['document'] = $document;

			$result = $this->search($index, [
				'query' => [
					'percolate' => $percolate,
				],
			]);
			return array_map(function ($item) {
				return preg_replace('/^percolator-/', '', $item->_id);
			}, $result->hits->hits);
		}

		// Pre-5.x: dedicated _percolate endpoint
		$result = $this->get("/$index/$type/_percolate", json_encode([
			'doc' => $document,
		]));
		return array_map(function ($item) {
			return $item->_id;
		}, $result->matches);
	}

	/**
	 * Index (or queue, in bulk mode) a document as "$type-$id".
	 */
	function index($index, $type, $id, array $data)
	{
		$type = $this->simplifyType($type);

		$this->rawIndex($index, $type, $id, $data);
	}

	/**
	 * Point $alias at $targetIndex and clean up older indices.
	 *
	 * The alias is removed from every index currently holding it. Afterwards those
	 * indices, plus any index named "{$alias}_..." other than the target, are deleted.
	 * A real index named exactly $alias is deleted first so the alias can be created.
	 */
	function assignAlias($alias, $targetIndex)
	{
		$this->flush();

		$active = [];
		$toRemove = [];
		$current = $this->rawApi('/_aliases');
		foreach ($current as $indexName => $info) {
			if (isset($info->aliases->$alias)) {
				$active[] = $indexName;
				$toRemove[] = $indexName;
			} elseif (0 === strpos($indexName, $alias . '_') && $indexName != $targetIndex) {
				$toRemove[] = $indexName;
			}
		}
		$actions = [
			['add' => ['index' => $targetIndex, 'alias' => $alias]],
		];

		foreach ($active as $index) {
			$actions[] = ['remove' => ['index' => $index, 'alias' => $alias]];
		}

		// Before assigning new alias, check there is not already an index matching alias name.
		if (isset($current->$alias) && ! $this->aliasExists($alias)) {
			$this->deleteIndex($alias);
		}

		$this->post('/_aliases', json_encode([
			'actions' => $actions,
		]));

		// Make sure the new index is fully active, then clean-up
		$this->refresh($alias);

		foreach ($toRemove as $old) {
			$this->deleteIndex($old);
		}
	}

	/**
	 * Whether an index for $aliasName is being rebuilt: an index named
	 * "{$aliasName}_..." that carries no alias at all is taken as a rebuild in progress.
	 *
	 * @return bool
	 */
	function isRebuilding($aliasName)
	{
		try {
			$current = $this->rawApi('/_aliases');
		} catch (Search_Elastic_Exception $e) {
			$current = [];
		}
		foreach ($current as $indexName => $info) {
			$hasAlias = isset($info->aliases) && count((array) $info->aliases) > 0;
			if (0 === strpos($indexName, $aliasName . '_') && ! $hasAlias) {
				// Matching name, no alias, means currently rebuilding
				return true;
			}
		}

		return false;
	}

	/**
	 * Resolve an alias to the concrete index name.
	 *
	 * Returns the first key of the GET /{name} response, or $aliasOrIndexName
	 * unchanged when the lookup fails or returns nothing.
	 *
	 * @param string $aliasOrIndexName
	 * @return string
	 */
	public function resolveAlias($aliasOrIndexName)
	{
		try {
			$current = $this->rawApi('/' . $aliasOrIndexName);
		} catch (Search_Elastic_Exception $e) {
			$current = [];
		}
		foreach ($current as $indexName => $_) {
			return $indexName;
		}
		return $aliasOrIndexName;
	}

	/**
	 * Index a document, directly or through the active bulk buffer, and mark $index dirty.
	 *
	 * @return mixed Decoded response when sent directly; null when queued in bulk mode
	 */
	private function rawIndex($index, $type, $id, $data)
	{
		$this->dirty[$index] = true;

		if ($this->bulk) {
			$this->bulk->index($index, $type, $id, $data);
			return null;
		}

		$id = rawurlencode($id);
		if ($type === '.percolator') {
			return $this->put("/$index/$type/$id", json_encode($data));
		}
		return $this->put("/$index/{$this->mapping_type}/$type-$id", json_encode($data));
	}

	/**
	 * Remove a document ("$type-$id"), directly or through the active bulk buffer, and mark $index dirty.
	 *
	 * @return mixed Decoded response when sent directly; null when queued in bulk mode
	 */
	function unindex($index, $type, $id)
	{
		$this->dirty[$index] = true;
		$type = $this->simplifyType($type);

		if ($this->bulk) {
			$this->bulk->unindex($index, $type, $id);
			return null;
		}

		$id = rawurlencode($id);
		if ($type === '.percolator') {
			return $this->delete("/$index/$type/$id");
		}
		return $this->delete("/$index/{$this->mapping_type}/$type-$id");
	}

	/**
	 * Send any queued bulk operations. Errors are reported (Feedback + trigger_error), not thrown.
	 */
	function flush()
	{
		if (! $this->bulk) {
			return;
		}

		try {
			$this->bulk->flush();
		} catch (Exception $e) {
			// report error and exception and try to carry on
			$message = tr('Elastic search flush error: %0', $e->getMessage());

			Feedback::error($message);	// for browser and console output
			trigger_error($message);	// gets logged while indexing
		}
	}

	/**
	 * Flush pending bulk operations, call the _refresh API on $index and clear its dirty flag.
	 */
	function refresh($index)
	{
		$this->flush();

		$this->post("/$index/_refresh", '');
		$this->dirty[$index] = false;
	}

	/**
	 * Fetch the _source of document "$type-$id", refreshing $index first if it is dirty.
	 *
	 * @return mixed The stored _source, or null when the response has none
	 */
	function document($index, $type, $id)
	{
		if (! empty($this->dirty[$index])) {
			$this->refresh($index);
		}

		$type = $this->simplifyType($type);
		$id = rawurlencode($id);

		$document = $this->get("/$index/{$this->mapping_type}/$type-$id");

		if (isset($document->_source)) {
			return $document->_source;
		}
		return null;
	}

	/**
	 * Create $index on first use on this connection, then PUT the field mapping.
	 *
	 * @param string   $index
	 * @param array    $mapping  Field name => field mapping ("properties")
	 * @param callable $getIndex Returns the index creation body, called only when the index is created
	 * @return mixed Decoded response of the mapping request
	 */
	function mapping($index, array $mapping, callable $getIndex)
	{
		$data = ["properties" => $mapping];

		if (empty($this->indices[$index])) {
			$this->createIndex($index, $getIndex);
			$this->indices[$index] = true;
		}

		return $this->put("/$index/_mapping/" . $this->requestType(), json_encode($data));
	}

	/**
	 * POST a prepared bulk request body to the _bulk endpoint.
	 */
	function postBulk($data)
	{
		$this->post("/_bulk", $data);
	}

	/**
	 * GET an arbitrary API path and return the decoded response.
	 */
	function rawApi($path)
	{
		return $this->get($path);
	}

	/**
	 * Whether $index is an existing alias (a failed lookup counts as "no").
	 *
	 * @return bool
	 */
	private function aliasExists($index)
	{
		try {
			$response = $this->get("/_alias/$index", "");
			if (! empty($response->status) && $response->status == 404) {
				return false;
			}
		} catch (Search_Elastic_Exception $e) {
			return false;
		}
		return true;
	}

	/**
	 * Create $index (unless it is an alias) with the body from $getIndex.
	 *
	 * On 5+ this also adds the "query" percolator field and applies the
	 * unified_elastic_field_limit preference as the total field limit.
	 */
	private function createIndex($index, callable $getIndex)
	{
		global $prefs;

		if ($this->aliasExists($index)) {
			return;
		}

		try {
			$this->put(
				"/$index",
				json_encode($getIndex())
			);
		} catch (Search_Elastic_Exception $e) {
			// Index already exists: ignore
		}

		if ($this->getVersion() >= 5) {
			$this->put("/$index/_mapping/" . $this->requestType(), json_encode([
				'properties' => [
					'query' => [
						'type' => 'percolator'
					],
				],
			]));
			$this->put("/$index/_settings", json_encode([
				'index.mapping.total_fields.limit' => $prefs['unified_elastic_field_limit'],
			]));
		}
	}

	/**
	 * Document type segment for mapping and bulk requests: empty on 7.0+, the mapping type before.
	 *
	 * @return string
	 */
	private function requestType()
	{
		return $this->getVersion() >= 7.0 ? '' : $this->mapping_type;
	}

	/**
	 * HTTP GET, optionally with a raw body.
	 *
	 * @throws Search_Elastic_TransportException on HTTP client failures
	 */
	private function get($path, $data = null)
	{
		try {
			$client = $this->getClient($path);
			if ($data) {
				$client->setRawBody($data);
			}
			$client->setMethod(Zend\Http\Request::METHOD_GET);
			$client->setHeaders(['Content-Type: application/json']);
			$response = $client->send();
			return $this->handleResponse($response);
		} catch (Zend\Http\Exception\ExceptionInterface $e) {
			throw new Search_Elastic_TransportException($e->getMessage());
		}
	}

	/** HTTP PUT with a JSON body. */
	private function put($path, $data)
	{
		return $this->sendRequest(Zend\Http\Request::METHOD_PUT, $path, $data);
	}

	/** HTTP POST with a JSON body. */
	private function post($path, $data)
	{
		return $this->sendRequest(Zend\Http\Request::METHOD_POST, $path, $data);
	}

	/** HTTP DELETE without a body. */
	private function delete($path)
	{
		return $this->sendRequest(Zend\Http\Request::METHOD_DELETE, $path);
	}

	/**
	 * Shared implementation of put()/post()/delete().
	 *
	 * @param string      $method HTTP method (Zend\Http\Request::METHOD_*)
	 * @param string      $path   Path appended to the DSN
	 * @param string|null $data   Request body; null sends none
	 * @throws Search_Elastic_TransportException on HTTP client failures
	 */
	private function sendRequest($method, $path, $data = null)
	{
		try {
			$client = $this->getClient($path);
			$client->getRequest()->setMethod($method);
			if ($data !== null) {
				$client->getRequest()->setContent($data);
			}
			$client->setHeaders(['Content-Type: application/json']);
			$response = $client->send();

			return $this->handleResponse($response);
		} catch (Zend\Http\Exception\ExceptionInterface $e) {
			throw new Search_Elastic_TransportException($e->getMessage());
		}
	}

	/**
	 * Decode an HTTP response and turn Elasticsearch errors into exceptions.
	 *
	 * @param object $response HTTP response exposing getBody() and isSuccess()
	 * @return mixed Decoded JSON body (null when the body is not JSON)
	 * @throws Search_Elastic_Exception on a failed bulk item or an error response;
	 *         more specific subclasses for recognised error messages
	 */
	private function handleResponse($response)
	{
		$content = json_decode($response->getBody());

		if ($response->isSuccess()) {
			// Bulk responses succeed as a whole but may carry per-item failures
			if (isset($content->items)) {
				foreach ($content->items as $item) {
					foreach ($item as $res) {
						if (isset($res->error)) {
							throw new Search_Elastic_Exception($this->errorMessage($res->error));
						}
					}
				}
			}
			return $content;
		}

		if (isset($content->exists) && $content->exists === false) {
			throw new Search_Elastic_NotFoundException(
				isset($content->_type) ? $content->_type : null,
				isset($content->_id) ? $content->_id : null
			);
		}

		if (! isset($content->error)) {
			// Unsuccessful but without an error payload (e.g. document not found): let the caller inspect it
			return $content;
		}

		$message = $this->errorMessage($content->error);

		if (preg_match('/^MapperParsingException\[No handler for type \[(?P<type>.*)\].*\[(?P<field>.*)\]\]$/', $message, $parts)) {
			throw new Search_Elastic_MappingException($parts['type'], $parts['field']);
		}
		if (preg_match('/No mapping found for \[(\S+)\] in order to sort on/', $message, $parts)) {
			throw new Search_Elastic_SortException($parts[1]);
		}
		if (preg_match('/NumberFormatException\[For input string: "(.*?)"\]/', $message, $parts)) {
			// Recover the field name from the echoed {"match":{FIELD:{"query":VALUE}}} clause.
			// The '/' delimiter must be given to preg_quote so a slash in VALUE cannot end the pattern.
			$pattern = '/' . preg_quote('{"match":{"___wildcard___":{"query":"' . $parts[1] . '"}}', '/') . '/';
			$pattern = str_replace('___wildcard___', '([^"]*)', $pattern);
			$field = preg_match($pattern, $message, $parts2) ? $parts2[1] : '';
			throw new Search_Elastic_NumberFormatException($parts[1], $field);
		}
		if (preg_match('/QueryParsingException\[\[[^\]]*\] \[[^\]]*\] ([^\]]*)\]/', $message, $parts)) {
			throw new Search_Elastic_QueryParsingException($parts[1]);
		}

		throw new Search_Elastic_Exception($message, isset($content->status) ? $content->status : 0);
	}

	/**
	 * Turn an Elasticsearch "error" value into a message string.
	 *
	 * The value may be a plain string or an object; an object's non-empty "reason"
	 * is used, otherwise the object is JSON-encoded (a plain string cast would fail).
	 *
	 * @param mixed $error
	 * @return string
	 */
	private function errorMessage($error)
	{
		if (is_object($error) && ! empty($error->reason)) {
			return (string) $error->reason;
		}
		if (is_object($error) || is_array($error)) {
			return (string) json_encode($error);
		}
		return (string) $error;
	}

	/**
	 * Build an HTTP client for "{dsn}{path}" through TikiLib.
	 *
	 * @throws Search_Elastic_TransportException when the client cannot be created
	 */
	private function getClient($path)
	{
		$full = "{$this->dsn}$path";

		$tikilib = TikiLib::lib('tiki');
		try {
			return $tikilib->get_http_client($full);
		} catch (\Zend\Http\Exception\ExceptionInterface $e) {
			throw new Search_Elastic_TransportException($e->getMessage());
		}
	}

	/**
	 * Strip every character other than lowercase a-z from a type name.
	 */
	private function simplifyType($type)
	{
		return preg_replace('/[^a-z]/', '', $type);
	}

	/**
	 * Store the dirty flags at the end of the request and restore them when opening the
	 * connection within a single user session so that if a modification requires re-indexing,
	 * the next page load will wait until indexing is done to show the results.
	 */
	function persistDirty(Tiki_Event_Manager $events)
	{
		if (isset($_SESSION['elastic_search_dirty'])) {
			$this->dirty = $_SESSION['elastic_search_dirty'];
			unset($_SESSION['elastic_search_dirty']);
		}

		// Before the HTTP request is closed
		$events->bind('tiki.process.redirect', function () {
			$_SESSION['elastic_search_dirty'] = $this->dirty;
		});
	}
}
607