1<?php
2/*
3** Zabbix
4** Copyright (C) 2001-2021 Zabbix SIA
5**
6** This program is free software; you can redistribute it and/or modify
7** it under the terms of the GNU General Public License as published by
8** the Free Software Foundation; either version 2 of the License, or
9** (at your option) any later version.
10**
11** This program is distributed in the hope that it will be useful,
12** but WITHOUT ANY WARRANTY; without even the implied warranty of
13** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14** GNU General Public License for more details.
15**
16** You should have received a copy of the GNU General Public License
17** along with this program; if not, write to the Free Software
18** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
19**/
20
21
22/**
23 * Class to perform low level history related actions.
24 */
25class CHistoryManager {
26
27	/**
28	 * Returns the last $limit history objects for the given items.
29	 *
30	 * @param array $items     an array of items with the 'itemid' and 'value_type' properties
31	 * @param int   $limit     max object count to be returned
32	 * @param int   $period    the maximum period to retrieve data for
33	 *
	 * @return array    an array with item IDs as keys and arrays of history objects as values
35	 */
36	public function getLastValues(array $items, $limit = 1, $period = null) {
37		$results = [];
38		$grouped_items = self::getItemsGroupedByStorage($items);
39
40		if (array_key_exists(ZBX_HISTORY_SOURCE_ELASTIC, $grouped_items)) {
41			$results += $this->getLastValuesFromElasticsearch($grouped_items[ZBX_HISTORY_SOURCE_ELASTIC], $limit,
42					$period
43			);
44		}
45
46		if (array_key_exists(ZBX_HISTORY_SOURCE_SQL, $grouped_items)) {
47			$results += $this->getLastValuesFromSql($grouped_items[ZBX_HISTORY_SOURCE_SQL], $limit, $period);
48		}
49
50		return $results;
51	}
52
53	/**
54	 * Elasticsearch specific implementation of getLastValues.
55	 *
56	 * @see CHistoryManager::getLastValues
57	 */
58	private function getLastValuesFromElasticsearch($items, $limit, $period) {
59		$terms = [];
60		$results = [];
61		$filter = [];
62
63		foreach ($items as $item) {
64			$terms[$item['value_type']][] = $item['itemid'];
65		}
66
67		$query = [
68			'aggs' => [
69				'group_by_itemid' => [
70					'terms' => [
71						'field' => 'itemid'
72					],
73					'aggs' => [
74						'group_by_docs' => [
75							'top_hits' => [
76								'size' => $limit,
77								'sort' => [
78									'clock' => ZBX_SORT_DOWN
79								]
80							]
81						]
82					]
83				]
84			],
85			'size' => 0
86		];
87
88		if ($period) {
89			$filter[] = [
90				'range' => [
91					'clock' => [
92						'gt' => (time() - $period)
93					]
94				]
95			];
96		}
97
98		foreach (self::getElasticsearchEndpoints(array_keys($terms)) as $type => $endpoint) {
99			$query['query']['bool']['must'] = array_merge([[
100				'terms' => [
101					'itemid' => $terms[$type]
102				]
103			]], $filter);
104			// Assure that aggregations for all terms are returned.
105			$query['aggs']['group_by_itemid']['terms']['size'] = count($terms[$type]);
106			$data = CElasticsearchHelper::query('POST', $endpoint, $query);
107
108			if (!is_array($data) || !array_key_exists('group_by_itemid', $data)
109					|| !array_key_exists('buckets', $data['group_by_itemid'])
110					|| !is_array($data['group_by_itemid']['buckets'])) {
111				continue;
112			}
113
114			foreach ($data['group_by_itemid']['buckets'] as $item) {
115				if (!is_array($item['group_by_docs']) || !array_key_exists('hits', $item['group_by_docs'])
116						|| !is_array($item['group_by_docs']['hits'])
117						|| !array_key_exists('hits', $item['group_by_docs']['hits'])
118						|| !is_array($item['group_by_docs']['hits']['hits'])) {
119					continue;
120				}
121
122				foreach ($item['group_by_docs']['hits']['hits'] as $row) {
123					if (!array_key_exists('_source', $row) || !is_array($row['_source'])) {
124						continue;
125					}
126
127					$results[$item['key']][] = $row['_source'];
128				}
129			}
130		}
131
132		return $results;
133	}
134
135	/**
136	 * SQL specific implementation of getLastValues.
137	 *
138	 * @see CHistoryManager::getLastValues
139	 */
140	private function getLastValuesFromSql($items, $limit, $period) {
141		$results = [];
142
143		if ($period) {
144			$period = time() - $period;
145		}
146
147		if ($limit == 1) {
148			foreach ($items as $item) {
149				// Executing two subsequent queries individually for the sake of performance.
150
151				$clock_max = DBfetch(DBselect(
152					'SELECT MAX(h.clock)'.
153					' FROM '.self::getTableName($item['value_type']).' h'.
154					' WHERE h.itemid='.zbx_dbstr($item['itemid']).
155						($period ? ' AND h.clock>'.$period : '')
156				), false);
157
158				if ($clock_max) {
159					$clock_max = reset($clock_max);
160
161					if ($clock_max !== null) {
162						$values = DBfetchArray(DBselect(
163							'SELECT *'.
164							' FROM '.self::getTableName($item['value_type']).' h'.
165							' WHERE h.itemid='.zbx_dbstr($item['itemid']).
166								' AND h.clock='.zbx_dbstr($clock_max).
167							' ORDER BY h.ns DESC',
168							$limit
169						));
170
171						if ($values) {
172							$results[$item['itemid']] = $values;
173						}
174					}
175				}
176			}
177		}
178		else {
179			foreach ($items as $item) {
180				// Cannot order by h.ns directly here due to performance issues.
181				$values = DBfetchArray(DBselect(
182					'SELECT *'.
183					' FROM '.self::getTableName($item['value_type']).' h'.
184					' WHERE h.itemid='.zbx_dbstr($item['itemid']).
185						($period ? ' AND h.clock>'.$period : '').
186					' ORDER BY h.clock DESC',
187					$limit + 1
188				));
189
190				if ($values) {
191					$count = count($values);
192					$clock = $values[$count - 1]['clock'];
193
194					if ($count == $limit + 1 && $values[$count - 2]['clock'] == $clock) {
195						/*
196						 * The last selected entries having the same clock means the selection (not just the order)
197						 * of the last entries is possibly wrong due to unordered by nanoseconds.
198						 */
199
200						do {
201							unset($values[--$count]);
202						} while ($values && $values[$count - 1]['clock'] == $clock);
203
204						$db_values = DBselect(
205							'SELECT *'.
206							' FROM '.self::getTableName($item['value_type']).' h'.
207							' WHERE h.itemid='.zbx_dbstr($item['itemid']).
208								' AND h.clock='.$clock.
209							' ORDER BY h.ns DESC',
210							$limit - $count
211						);
212
213						while ($db_value = DBfetch($db_values)) {
214							$values[] = $db_value;
215							$count++;
216						}
217					}
218
219					CArrayHelper::sort($values, [
220						['field' => 'clock', 'order' => ZBX_SORT_DOWN],
221						['field' => 'ns', 'order' => ZBX_SORT_DOWN]
222					]);
223
224					$values = array_values($values);
225
226					while ($count > $limit) {
227						unset($values[--$count]);
228					}
229
230					$results[$item['itemid']] = $values;
231				}
232			}
233		}
234
235		return $results;
236	}
237
238	/**
239	 * Returns the history value of the item at the given time. If no value exists at the given time, the function
240	 * will return the previous value.
241	 *
242	 * The $item parameter must have the value_type and itemid properties set.
243	 *
244	 * @param array  $item
245	 * @param string $item['itemid']
246	 * @param int    $item['value_type']
247	 * @param int    $clock
248	 * @param int    $ns
249	 *
	 * @return string|null  Value at the specified time or the first value before the specified time. null if value is not found.
251	 */
252	public function getValueAt(array $item, $clock, $ns) {
253		switch (self::getDataSourceType($item['value_type'])) {
254			case ZBX_HISTORY_SOURCE_ELASTIC:
255				return $this->getValueAtFromElasticsearch($item, $clock, $ns);
256
257			default:
258				return $this->getValueAtFromSql($item, $clock, $ns);
259		}
260	}
261
262	/**
263	 * Elasticsearch specific implementation of getValueAt.
264	 *
265	 * @see CHistoryManager::getValueAt
266	 */
267	private function getValueAtFromElasticsearch(array $item, $clock, $ns) {
268		$query = [
269			'sort' => [
270				'clock' => ZBX_SORT_DOWN,
271				'ns' => ZBX_SORT_DOWN
272			],
273			'size' => 1
274		];
275
276		$filters = [
277			[
278				[
279					'term' => [
280						'itemid' => $item['itemid']
281					]
282				],
283				[
284					'term' => [
285						'clock' => $clock
286					]
287				],
288				[
289					'range' => [
290						'ns' => [
291							'lte' => $ns
292						]
293					]
294				]
295			],
296			[
297				[
298					'term' => [
299						'itemid' => $item['itemid']
300					]
301				],
302				[
303					'range' => [
304						'clock' => [
305							'lt' => $clock
306						] + (ZBX_HISTORY_PERIOD ? ['gte' => $clock - ZBX_HISTORY_PERIOD] : [])
307					]
308				]
309			]
310		];
311
312		foreach ($filters as $filter) {
313			$query['query']['bool']['must'] = $filter;
314			$endpoints = self::getElasticsearchEndpoints($item['value_type']);
315
316			if (count($endpoints) !== 1) {
317				break;
318			}
319
320			$result = CElasticsearchHelper::query('POST', reset($endpoints), $query);
321
322			if (count($result) === 1 && is_array($result[0]) && array_key_exists('value', $result[0])) {
323				return $result[0]['value'];
324			}
325		}
326
327		return null;
328	}
329
330	/**
331	 * SQL specific implementation of getValueAt.
332	 *
333	 * @see CHistoryManager::getValueAt
334	 */
335	private function getValueAtFromSql(array $item, $clock, $ns) {
336		$value = null;
337		$table = self::getTableName($item['value_type']);
338
339		$sql = 'SELECT value'.
340				' FROM '.$table.
341				' WHERE itemid='.zbx_dbstr($item['itemid']).
342					' AND clock='.zbx_dbstr($clock).
343					' AND ns='.zbx_dbstr($ns);
344
345		if (($row = DBfetch(DBselect($sql, 1))) !== false) {
346			$value = $row['value'];
347		}
348
349		if ($value !== null) {
350			return $value;
351		}
352
353		$max_clock = 0;
354		$sql = 'SELECT DISTINCT clock'.
355				' FROM '.$table.
356				' WHERE itemid='.zbx_dbstr($item['itemid']).
357					' AND clock='.zbx_dbstr($clock).
358					' AND ns<'.zbx_dbstr($ns);
359
360		if (($row = DBfetch(DBselect($sql))) !== false) {
361			$max_clock = $row['clock'];
362		}
363
364		if ($max_clock == 0) {
365			$sql = 'SELECT MAX(clock) AS clock'.
366					' FROM '.$table.
367					' WHERE itemid='.zbx_dbstr($item['itemid']).
368						' AND clock<'.zbx_dbstr($clock).
369						(ZBX_HISTORY_PERIOD ? ' AND clock>='.zbx_dbstr($clock - ZBX_HISTORY_PERIOD) : '');
370
371			if (($row = DBfetch(DBselect($sql))) !== false) {
372				$max_clock = $row['clock'];
373			}
374		}
375
376		if ($max_clock == 0) {
377			return $value;
378		}
379
380		if ($clock == $max_clock) {
381			$sql = 'SELECT value'.
382					' FROM '.$table.
383					' WHERE itemid='.zbx_dbstr($item['itemid']).
384						' AND clock='.zbx_dbstr($clock).
385						' AND ns<'.zbx_dbstr($ns);
386		}
387		else {
388			$sql = 'SELECT value'.
389					' FROM '.$table.
390					' WHERE itemid='.zbx_dbstr($item['itemid']).
391						' AND clock='.zbx_dbstr($max_clock).
392					' ORDER BY itemid,clock desc,ns desc';
393		}
394
395		if (($row = DBfetch(DBselect($sql, 1))) !== false) {
396			$value = $row['value'];
397		}
398
399		return $value;
400	}
401
402	/**
403	 * Returns history value aggregation for graphs.
404	 *
	 * Each item in the $items parameter must have the value_type, itemid and source properties set.
406	 *
407	 * @param array  $items        items to get aggregated values for
408	 * @param int    $time_from    minimal timestamp (seconds) to get data from
409	 * @param int    $time_to      maximum timestamp (seconds) to get data from
410	 * @param int    $width        graph width in pixels (is not required for pie charts)
411	 *
412	 * @return array    history value aggregation for graphs
413	 */
414	public function getGraphAggregation(array $items, $time_from, $time_to, $width = null) {
415		if ($width !== null) {
416			$size = $time_to - $time_from;
417			$delta = $size - $time_from % $size;
418		}
419		else {
420			$size = null;
421			$delta = null;
422		}
423
424		$grouped_items = self::getItemsGroupedByStorage($items);
425
426		$results = [];
427		if (array_key_exists(ZBX_HISTORY_SOURCE_ELASTIC, $grouped_items)) {
428			$results += $this->getGraphAggregationFromElasticsearch($grouped_items[ZBX_HISTORY_SOURCE_ELASTIC],
429					$time_from, $time_to, $width, $size, $delta
430			);
431		}
432
433		if (array_key_exists(ZBX_HISTORY_SOURCE_SQL, $grouped_items)) {
434			$results += $this->getGraphAggregationFromSql($grouped_items[ZBX_HISTORY_SOURCE_SQL], $time_from, $time_to,
435					$width, $size, $delta
436			);
437		}
438
439		return $results;
440	}
441
442	/**
443	 * Elasticsearch specific implementation of getGraphAggregation.
444	 *
445	 * @see CHistoryManager::getGraphAggregation
446	 */
447	private function getGraphAggregationFromElasticsearch(array $items, $time_from, $time_to, $width, $size, $delta) {
448		$terms = [];
449
450		foreach ($items as $item) {
451			$terms[$item['value_type']][] = $item['itemid'];
452		}
453
454		$aggs = [
455			'max_value' => [
456				'max' => [
457					'field' => 'value'
458				]
459			],
460			'avg_value' => [
461				'avg' => [
462					'field' => 'value'
463				]
464			],
465			'min_value' => [
466				'min' => [
467					'field' => 'value'
468				]
469			],
470			'max_clock' => [
471				'max' => [
472					'field' => 'clock'
473				]
474			]
475		];
476
477		$query = [
478			'aggs' => [
479				'group_by_itemid' => [
480					'terms' => [
481						// Assure that aggregations for all terms are returned.
482						'size' => count($items),
483						'field' => 'itemid'
484					]
485				]
486			],
487			'query' => [
488				'bool' => [
489					'must' => [
490						[
491							'terms' => [
492								'itemid' => $terms
493							]
494						],
495						[
496							'range' => [
497								'clock' => [
498									'gte' => $time_from,
499									'lte' => $time_to
500								]
501							]
502						]
503					]
504				]
505			],
506			'size' => 0
507		];
508
509		if ($width !== null && $size !== null && $delta !== null) {
510			// Additional grouping for line graphs.
511			$aggs['max_clock'] = [
512				'max' => [
513					'field' => 'clock'
514				]
515			];
516
517			// Clock value is divided by 1000 as it is stored as milliseconds.
518			$formula = 'Math.floor((params.width*((doc[\'clock\'].date.getMillis()/1000+params.delta)%params.size))'.
519					'/params.size)';
520
521			$script = [
522				'inline' => $formula,
523				'params' => [
524					'width' => (int)$width,
525					'delta' => $delta,
526					'size' => $size
527				]
528			];
529			$aggs = [
530				'group_by_script' => [
531					'terms' => [
532						'size' => $width,
533						'script' => $script
534					],
535					'aggs' => $aggs
536				]
537			];
538		}
539
540		$query['aggs']['group_by_itemid']['aggs'] = $aggs;
541
542		$results = [];
543
544		foreach (self::getElasticsearchEndpoints(array_keys($terms)) as $type => $endpoint) {
545			$query['query']['bool']['must'] = [
546				[
547					'terms' => [
548						'itemid' => $terms[$type]
549					]
550				],
551				[
552					'range' => [
553						'clock' => [
554							'gte' => $time_from,
555							'lte' => $time_to
556						]
557					]
558				]
559			];
560
561			$data = CElasticsearchHelper::query('POST', $endpoint, $query);
562
563			if ($width !== null && $size !== null && $delta !== null) {
564				foreach ($data['group_by_itemid']['buckets'] as $item) {
565					if (!is_array($item['group_by_script']) || !array_key_exists('buckets', $item['group_by_script'])
566							|| !is_array($item['group_by_script']['buckets'])) {
567						continue;
568					}
569
570					$results[$item['key']]['source'] = 'history';
571					foreach ($item['group_by_script']['buckets'] as $point) {
572						$results[$item['key']]['data'][] = [
573							'itemid' => $item['key'],
574							'i' => $point['key'],
575							'count' => $point['doc_count'],
576							'min' => $point['min_value']['value'],
577							'avg' => $point['avg_value']['value'],
578							'max' => $point['max_value']['value'],
579							// Field value_as_string is used to get value as seconds instead of milliseconds.
580							'clock' => $point['max_clock']['value_as_string']
581						];
582					}
583				}
584			}
585			else {
586				foreach ($data['group_by_itemid']['buckets'] as $item) {
587					$results[$item['key']]['source'] = 'history';
588					$results[$item['key']]['data'][] = [
589						'itemid' => $item['key'],
590						'min' => $item['min_value']['value'],
591						'avg' => $item['avg_value']['value'],
592						'max' => $item['max_value']['value'],
593						// Field value_as_string is used to get value as seconds instead of milliseconds.
594						'clock' => $item['max_clock']['value_as_string']
595					];
596				}
597			}
598		}
599
600		return $results;
601	}
602
603	/**
604	 * SQL specific implementation of getGraphAggregation.
605	 *
606	 * @see CHistoryManager::getGraphAggregation
607	 */
608	private function getGraphAggregationFromSql(array $items, $time_from, $time_to, $width, $size, $delta) {
609		$group_by = 'itemid';
610		$sql_select_extra = '';
611
612		if ($width !== null && $size !== null && $delta !== null) {
613			// Required for 'group by' support of Oracle.
614			$calc_field = 'round('.$width.'*'.zbx_sql_mod(zbx_dbcast_2bigint('clock').'+'.$delta, $size)
615					.'/('.$size.'),0)';
616
617			$sql_select_extra = ','.$calc_field.' AS i';
618			$group_by .= ','.$calc_field;
619		}
620
621		$results = [];
622
623		foreach ($items as $item) {
624			if ($item['source'] === 'history') {
625				$sql_select = 'COUNT(*) AS count,AVG(value) AS avg,MIN(value) AS min,MAX(value) AS max';
626				$sql_from = ($item['value_type'] == ITEM_VALUE_TYPE_UINT64) ? 'history_uint' : 'history';
627			}
628			else {
629				$sql_select = 'SUM(num) AS count,AVG(value_avg) AS avg,MIN(value_min) AS min,MAX(value_max) AS max';
630				$sql_from = ($item['value_type'] == ITEM_VALUE_TYPE_UINT64) ? 'trends_uint' : 'trends';
631			}
632
633			$result = DBselect(
634				'SELECT itemid,'.$sql_select.$sql_select_extra.',MAX(clock) AS clock'.
635				' FROM '.$sql_from.
636				' WHERE itemid='.zbx_dbstr($item['itemid']).
637					' AND clock>='.zbx_dbstr($time_from).
638					' AND clock<='.zbx_dbstr($time_to).
639				' GROUP BY '.$group_by
640			);
641
642			$data = [];
643			while (($row = DBfetch($result)) !== false) {
644				$data[] = $row;
645			}
646
647			$results[$item['itemid']]['source'] = $item['source'];
648			$results[$item['itemid']]['data'] = $data;
649		}
650
651		return $results;
652	}
653
654	/**
655	 * Returns aggregated history value.
656	 *
657	 * The $item parameter must have the value_type and itemid properties set.
658	 *
659	 * @param array  $item         item to get aggregated value for
660	 * @param string $aggregation  aggregation to be applied (min / max / avg)
661	 * @param int    $time_from    timestamp (seconds)
662	 *
	 * @return string|null    aggregated history value, or null if no value is found
664	 */
665	public function getAggregatedValue(array $item, $aggregation, $time_from) {
666		switch (self::getDataSourceType($item['value_type'])) {
667			case ZBX_HISTORY_SOURCE_ELASTIC:
668				return $this->getAggregatedValueFromElasticsearch($item, $aggregation, $time_from);
669
670			default:
671				return $this->getAggregatedValueFromSql($item, $aggregation, $time_from);
672		}
673	}
674
675	/**
676	 * Elasticsearch specific implementation of getAggregatedValue.
677	 *
678	 * @see CHistoryManager::getAggregatedValue
679	 */
680	private function getAggregatedValueFromElasticsearch(array $item, $aggregation, $time_from) {
681		$query = [
682			'aggs' => [
683				$aggregation.'_value' => [
684					$aggregation => [
685						'field' => 'value'
686					]
687				]
688			],
689			'query' => [
690				'bool' => [
691					'must' => [
692						[
693							'term' => [
694								'itemid' => $item['itemid']
695							]
696						],
697						[
698							'range' => [
699								'clock' => [
700									'gte' => $time_from
701								]
702							]
703						]
704					]
705				]
706			],
707			'size' => 0
708		];
709
710		$endpoints = self::getElasticsearchEndpoints($item['value_type']);
711
712		if ($endpoints) {
713			$data = CElasticsearchHelper::query('POST', reset($endpoints), $query);
714
715			if (array_key_exists($aggregation.'_value', $data)
716					&& array_key_exists('value', $data[$aggregation.'_value'])) {
717				return $data[$aggregation.'_value']['value'];
718			}
719		}
720
721		return null;
722	}
723
724	/**
725	 * SQL specific implementation of getAggregatedValue.
726	 *
727	 * @see CHistoryManager::getAggregatedValue
728	 */
729	private function getAggregatedValueFromSql(array $item, $aggregation, $time_from) {
730		$result = DBselect(
731			'SELECT '.$aggregation.'(value) AS value'.
732			' FROM '.self::getTableName($item['value_type']).
733			' WHERE clock>'.$time_from.
734			' AND itemid='.zbx_dbstr($item['itemid']).
735			' HAVING COUNT(*)>0' // Necessary because DBselect() return 0 if empty data set, for graph templates.
736		);
737
738		if (($row = DBfetch($result)) !== false) {
739			return $row['value'];
740		}
741
742		return null;
743	}
744
745	/**
746	 * Clear item history and trends by provided item IDs. History is deleted from both SQL and Elasticsearch.
747	 *
748	 * @param array $itemids    item ids to delete history for
749	 *
750	 * @return bool
751	 */
752	public function deleteHistory(array $itemids) {
753		return $this->deleteHistoryFromSql($itemids) && $this->deleteHistoryFromElasticsearch($itemids);
754	}
755
756	/**
757	 * Elasticsearch specific implementation of deleteHistory.
758	 *
759	 * @see CHistoryManager::deleteHistory
760	 */
761	private function deleteHistoryFromElasticsearch(array $itemids) {
762		global $HISTORY;
763
764		if (is_array($HISTORY) && array_key_exists('types', $HISTORY) && is_array($HISTORY['types'])
765				&& count($HISTORY['types']) > 0) {
766
767			$query = [
768				'query' => [
769					'terms' => [
770						'itemid' => array_values($itemids)
771					]
772				]
773			];
774
775			$types = [];
776			foreach ($HISTORY['types'] as $type) {
777				$types[] = self::getTypeIdByTypeName($type);
778			}
779
780			foreach (self::getElasticsearchEndpoints($types, '_delete_by_query') as $endpoint) {
781				if (!CElasticsearchHelper::query('POST', $endpoint, $query)) {
782					return false;
783				}
784			}
785		}
786
787		return true;
788	}
789
790	/**
791	 * SQL specific implementation of deleteHistory.
792	 *
793	 * @see CHistoryManager::deleteHistory
794	 */
795	private function deleteHistoryFromSql(array $itemids) {
796		return DBexecute('DELETE FROM trends WHERE '.dbConditionInt('itemid', $itemids))
797				&& DBexecute('DELETE FROM trends_uint WHERE '.dbConditionInt('itemid', $itemids))
798				&& DBexecute('DELETE FROM history_text WHERE '.dbConditionInt('itemid', $itemids))
799				&& DBexecute('DELETE FROM history_log WHERE '.dbConditionInt('itemid', $itemids))
800				&& DBexecute('DELETE FROM history_uint WHERE '.dbConditionInt('itemid', $itemids))
801				&& DBexecute('DELETE FROM history_str WHERE '.dbConditionInt('itemid', $itemids))
802				&& DBexecute('DELETE FROM history WHERE '.dbConditionInt('itemid', $itemids));
803	}
804
805	/**
806	 * Get type name by value type id.
807	 *
808	 * @param int $value_type    value type id
809	 *
810	 * @return string    value type name
811	 */
812	public static function getTypeNameByTypeId($value_type) {
813		$mapping = [
814			ITEM_VALUE_TYPE_FLOAT => 'dbl',
815			ITEM_VALUE_TYPE_STR => 'str',
816			ITEM_VALUE_TYPE_LOG => 'log',
817			ITEM_VALUE_TYPE_UINT64 => 'uint',
818			ITEM_VALUE_TYPE_TEXT => 'text'
819		];
820
821		if (array_key_exists($value_type, $mapping)) {
822			return $mapping[$value_type];
823		}
824
825		// Fallback to float.
826		return $mapping[ITEM_VALUE_TYPE_FLOAT];
827	}
828
829	/**
830	 * Get type id by value type name.
831	 *
	 * @param string $type_name    value type name
833	 *
834	 * @return int    value type id
835	 */
836	public static function getTypeIdByTypeName($type_name) {
837		$mapping = [
838			'dbl' => ITEM_VALUE_TYPE_FLOAT,
839			'str' => ITEM_VALUE_TYPE_STR,
840			'log' => ITEM_VALUE_TYPE_LOG,
841			'uint' => ITEM_VALUE_TYPE_UINT64,
842			'text' => ITEM_VALUE_TYPE_TEXT
843		];
844
845		if (array_key_exists($type_name, $mapping)) {
846			return $mapping[$type_name];
847		}
848
849		// Fallback to float.
850		return ITEM_VALUE_TYPE_FLOAT;
851	}
852
853	/**
854	 * Get data source (SQL or Elasticsearch) type based on value type id.
855	 *
856	 * @param int $value_type    value type id
857	 *
858	 * @return string    data source type
859	 */
860	public static function getDataSourceType($value_type) {
861		static $cache = [];
862
863		if (!array_key_exists($value_type, $cache)) {
864			global $HISTORY;
865
866			if (is_array($HISTORY) && array_key_exists('types', $HISTORY) && is_array($HISTORY['types'])) {
867				$cache[$value_type] = in_array(self::getTypeNameByTypeId($value_type), $HISTORY['types'])
868						? ZBX_HISTORY_SOURCE_ELASTIC : ZBX_HISTORY_SOURCE_SQL;
869			}
870			else {
871				// SQL is a fallback data source.
872				$cache[$value_type] = ZBX_HISTORY_SOURCE_SQL;
873			}
874		}
875
876		return $cache[$value_type];
877	}
878
879	private static function getElasticsearchUrl($value_name) {
880		static $urls = [];
881		static $invalid = [];
882
883		// Additional check to limit error count produced by invalid configuration.
884		if (array_key_exists($value_name, $invalid)) {
885			return null;
886		}
887
888		if (!array_key_exists($value_name, $urls)) {
889			global $HISTORY;
890
891			if (!is_array($HISTORY) || !array_key_exists('url', $HISTORY)) {
892				$invalid[$value_name] = true;
893				error(_s('Elasticsearch url is not set for type: %1$s.', $value_name));
894
895				return null;
896			}
897
898			$url = $HISTORY['url'];
899			if (is_array($url)) {
900				if (!array_key_exists($value_name, $url)) {
901					$invalid[$value_name] = true;
902					error(_s('Elasticsearch url is not set for type: %1$s.', $value_name));
903
904					return null;
905				}
906
907				$url = $url[$value_name];
908			}
909
910			if (substr($url, -1) !== '/') {
911				$url .= '/';
912			}
913
914			$urls[$value_name] = $url;
915		}
916
917		return $urls[$value_name];
918	}
919
920	/**
921	 * Get endpoints for Elasticsearch requests.
922	 *
	 * @param mixed  $value_types    value type(s)
	 * @param string $action         action name appended to each endpoint
924	 *
925	 * @return array    Elasticsearch query endpoints
926	 */
927	public static function getElasticsearchEndpoints($value_types, $action = '_search') {
928		if (!is_array($value_types)) {
929			$value_types = [$value_types];
930		}
931
932		$indices = [];
933		$endponts = [];
934
935		foreach (array_unique($value_types) as $type) {
936			if (self::getDataSourceType($type) === ZBX_HISTORY_SOURCE_ELASTIC) {
937				$indices[$type] = self::getTypeNameByTypeId($type);
938			}
939		}
940
941		foreach ($indices as $type => $index) {
942			if (($url = self::getElasticsearchUrl($index)) !== null) {
943				$endponts[$type] = $url.$index.'*/values/'.$action;
944			}
945		}
946
947		return $endponts;
948	}
949
950	/**
951	 * Return the name of the table where the data for the given value type is stored.
952	 *
953	 * @param int $value_type    value type
954	 *
955	 * @return string    table name
956	 */
957	public static function getTableName($value_type) {
958		$tables = [
959			ITEM_VALUE_TYPE_LOG => 'history_log',
960			ITEM_VALUE_TYPE_TEXT => 'history_text',
961			ITEM_VALUE_TYPE_STR => 'history_str',
962			ITEM_VALUE_TYPE_FLOAT => 'history',
963			ITEM_VALUE_TYPE_UINT64 => 'history_uint'
964		];
965
966		return $tables[$value_type];
967	}
968
969	/**
970	 * Returns the items grouped by the storage type.
971	 *
972	 * @param array $items     an array of items with the 'value_type' property
973	 *
	 * @return array    an array with storage types as keys and item arrays as values
975	 */
976	private function getItemsGroupedByStorage(array $items) {
977		$grouped_items = [];
978
979		foreach ($items as $item) {
980			$source = self::getDataSourceType($item['value_type']);
981			$grouped_items[$source][] = $item;
982		}
983
984		return $grouped_items;
985	}
986}
987