1<?php
2/*
3** Zabbix
4** Copyright (C) 2001-2021 Zabbix SIA
5**
6** This program is free software; you can redistribute it and/or modify
7** it under the terms of the GNU General Public License as published by
8** the Free Software Foundation; either version 2 of the License, or
9** (at your option) any later version.
10**
11** This program is distributed in the hope that it will be useful,
12** but WITHOUT ANY WARRANTY; without even the implied warranty of
13** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14** GNU General Public License for more details.
15**
16** You should have received a copy of the GNU General Public License
17** along with this program; if not, write to the Free Software
18** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
19**/
20
21
22class DB {
23
	// Name of the file that returns the schema array; it is included on the first getSchema() call.
	const SCHEMA_FILE = 'schema.inc.php';

	// Error codes passed to DBException by the methods of this class.
	const DBEXECUTE_ERROR = 1;
	const RESERVEIDS_ERROR = 2;
	const SCHEMA_ERROR = 3;
	const INPUT_ERROR = 4;

	// Table type identifiers.
	const TABLE_TYPE_CONFIG = 1;
	const TABLE_TYPE_HISTORY = 2;

	// Field type identifiers, compared against the 'type' key of a field definition in the schema.
	const FIELD_TYPE_INT = 'int';
	const FIELD_TYPE_CHAR = 'char';
	const FIELD_TYPE_ID = 'id';
	const FIELD_TYPE_FLOAT = 'float';
	const FIELD_TYPE_UINT = 'uint';
	const FIELD_TYPE_BLOB = 'blob';
	const FIELD_TYPE_TEXT = 'text';
	const FIELD_TYPE_NCLOB = 'nclob';

	// Cached schema array; stays null until getSchema() loads it.
	private static $schema = null;

	/**
	 * Backend instance created and cached by getDbBackend().
	 *
	 * @var DbBackend
	 */
	private static $dbBackend;
49
50	/**
51	 * Get necessary DB class.
52	 *
53	 * @return DbBackend
54	 */
55	public static function getDbBackend() {
56		global $DB;
57
58		if (!self::$dbBackend) {
59			switch ($DB['TYPE']) {
60				case ZBX_DB_MYSQL:
61					self::$dbBackend = new MysqlDbBackend();
62					break;
63				case ZBX_DB_POSTGRESQL:
64					self::$dbBackend = new PostgresqlDbBackend();
65					break;
66				case ZBX_DB_ORACLE:
67					self::$dbBackend = new OracleDbBackend();
68					break;
69			}
70		}
71
72		return self::$dbBackend;
73	}
74
75	private static function exception($code, $error) {
76		throw new DBException($error, $code);
77	}
78
79	/**
80	 * Reserve ids for primary key of passed table.
81	 * If record for table does not exist or value is out of range, ids record is created
82	 * using maximum id from table or minimum allowed value.
83	 *
	 * @throws DBException
85	 *
86	 * @static
87	 *
88	 * @param string $table table name
89	 * @param int $count number of ids to reserve
90	 *
91	 * @return string
92	 */
93	public static function reserveIds($table, $count) {
94		global $DB;
95
96		$tableSchema = self::getSchema($table);
97		$id_name = $tableSchema['key'];
98
99		$sql = 'SELECT nextid'.
100				' FROM ids'.
101				' WHERE table_name='.zbx_dbstr($table).
102					' AND field_name='.zbx_dbstr($id_name).
103				' FOR UPDATE';
104
105		$res = DBfetch(DBselect($sql));
106
107		if ($res) {
108			$maxNextId = bcadd($res['nextid'], $count, 0);
109
110			if (bccomp($maxNextId, ZBX_DB_MAX_ID) == 1) {
111				$nextid = self::refreshIds($table, $count);
112			}
113			else {
114				$sql = 'UPDATE ids'.
115						' SET nextid='.$maxNextId.
116						' WHERE table_name='.zbx_dbstr($table).
117							' AND field_name='.zbx_dbstr($id_name);
118
119				if (!DBexecute($sql)) {
120					self::exception(self::DBEXECUTE_ERROR, 'DBEXECUTE_ERROR');
121				}
122
123				$nextid = bcadd($res['nextid'], 1, 0);
124			}
125		}
126
127		/*
128		 * Detect either the query is executable at all? If query is valid and schema is correct but query still cannot
129		 * be executed, then there is a good chance that previous transaction has left row level lock unreleased or it
130		 * is still running. In such a case execution must be stopped, otherwise it will call self::refreshIds method.
131		 */
132		elseif (!DBexecute($sql)) {
133			self::exception(self::DBEXECUTE_ERROR,
134				_('Your database is not working properly. Please wait a few minutes and try to repeat this action. If the problem still persists, please contact system administrator. The problem might be caused by long running transaction or row level lock accomplished by your database management system.')
135			);
136		}
137		// If query is executable, but still returns false, only then call refreshIds.
138		else {
139			$nextid = self::refreshIds($table, $count);
140		}
141
142		return $nextid;
143	}
144
145	/**
146	 * Refresh id record for given table.
147	 * Record is deleted and then created again with value of maximum id from table or minimum allowed.
148	 *
	 * @throws DBException
150	 *
151	 * @static
152	 *
153	 * @param string $table table name
154	 * @param int    $count number of ids to reserve
155	 *
156	 * @return string
157	 */
158	private static function refreshIds($table, $count) {
159		$tableSchema = self::getSchema($table);
160		$id_name = $tableSchema['key'];
161
162		// when we reach the maximum ID, we try to refresh them to check if any IDs have been freed
163		$sql = 'DELETE FROM ids WHERE table_name='.zbx_dbstr($table).' AND field_name='.zbx_dbstr($id_name);
164
165		if (!DBexecute($sql)) {
166			self::exception(self::DBEXECUTE_ERROR, 'DBEXECUTE_ERROR');
167		}
168
169		$row = DBfetch(DBselect('SELECT MAX('.$id_name.') AS id FROM '.$table));
170
171		$nextid = ($row && $row['id']) ? $row['id'] : 0;
172
173		$maxNextId = bcadd($nextid, $count, 0);
174
175		if (bccomp($maxNextId, ZBX_DB_MAX_ID) == 1) {
176			self::exception(
177				self::RESERVEIDS_ERROR, __METHOD__.' ID greater than maximum allowed for table "'.$table.'"'
178			);
179		}
180
181		$sql = 'INSERT INTO ids (table_name,field_name,nextid)'.
182				' VALUES ('.zbx_dbstr($table).','.zbx_dbstr($id_name).','.$maxNextId.')';
183
184		if (!DBexecute($sql)) {
185			self::exception(self::DBEXECUTE_ERROR, 'DBEXECUTE_ERROR');
186		}
187
188		$nextid = bcadd($nextid, 1, 0);
189
190		return $nextid;
191	}
192
193	/**
194	 * Returns the array describing the database schema.
195	 *
196	 * If the $table parameter is passed, the method will return the schema for the given table,
197	 * otherwise - for the whole database.
198	 *
199	 * @static
200	 *
	 * @throws DBException if the given table does not exist
202	 *
203	 * @param string $table
204	 *
205	 * @return array
206	 */
207	public static function getSchema($table = null) {
208		if (is_null(self::$schema)) {
209			self::$schema = include(dirname(__FILE__).'/../../'.self::SCHEMA_FILE);
210		}
211
212		if (is_null($table)) {
213			return self::$schema;
214		}
215		elseif (isset(self::$schema[$table])) {
216			return self::$schema[$table];
217		}
218		else {
219			self::exception(self::SCHEMA_ERROR, _s('Table "%1$s" does not exist.', $table));
220		}
221	}
222
223	/**
224	 * Returns the names of the fields that are used as the primary key of the table.
225	 *
226	 * @static
227	 *
228	 * @param string $tableName
229	 *
230	 * @return string|array
231	 */
232	protected static function getPk($tableName) {
233		$schema = self::getSchema($tableName);
234
235		return $schema['key'];
236	}
237
238	/**
239	 * Returns true if the table $tableName has the $fieldName field.
240	 *
241	 * @static
242	 *
243	 * @param string $tableName
244	 * @param string $fieldName
245	 *
246	 * @return bool
247	 */
248	public static function hasField($tableName, $fieldName) {
249		$schema = self::getSchema($tableName);
250
251		return isset($schema['fields'][$fieldName]);
252	}
253
254	/**
255	 * Returns length of the field.
256	 *
257	 * @static
258	 *
259	 * @param string $table_name
260	 * @param string $field_name
261	 *
262	 * @return int
263	 */
264	public static function getFieldLength($table_name, $field_name) {
265		global $DB;
266
267		$schema = self::getSchema($table_name);
268
269		if ($schema['fields'][$field_name]['type'] == self::FIELD_TYPE_TEXT) {
270			return ($DB['TYPE'] == ZBX_DB_ORACLE) ? 2048 : 65535;
271		}
272
273		if ($schema['fields'][$field_name]['type'] == self::FIELD_TYPE_NCLOB) {
274			return 65535;
275		}
276
277		return $schema['fields'][$field_name]['length'];
278	}
279
280	public static function getDefaults($table) {
281		$table = self::getSchema($table);
282
283		$defaults = [];
284		foreach ($table['fields'] as $name => $field) {
285			if (isset($field['default'])) {
286				$defaults[$name] = $field['default'];
287			}
288		}
289		return $defaults;
290	}
291
292	/**
293	 * Returns the default value of the given field.
294	 *
295	 * @param string $table		name of the table
296	 * @param string $field		name of the field
297	 *
298	 * @return string|null
299	 */
300	public static function getDefault($table, $field) {
301		$table = self::getSchema($table);
302		$field = $table['fields'][$field];
303
304		return isset($field['default']) ? $field['default'] : null;
305	}
306
307	/**
308	 * Get the updated values of a record by correctly comparing the new and old ones, taking field types into account.
309	 *
310	 * @param string $table_name
311	 * @param array  $new_values
312	 * @param array  $old_values
313	 *
314	 * @return array
315	 */
316	public static function getUpdatedValues(string $table_name, array $new_values, array $old_values): array {
317		$updated_values = [];
318
319		// Discard field names not existing in the target table.
320		$fields = array_intersect_key(DB::getSchema($table_name)['fields'], $new_values);
321
322		foreach ($fields as $name => $spec) {
323			if (!array_key_exists($name, $old_values)) {
324				$updated_values[$name] = $new_values[$name];
325				continue;
326			}
327
328			switch ($spec['type']) {
329				case DB::FIELD_TYPE_ID:
330					if (bccomp($new_values[$name], $old_values[$name]) != 0) {
331						$updated_values[$name] = $new_values[$name];
332					}
333					break;
334
335				case DB::FIELD_TYPE_INT:
336				case DB::FIELD_TYPE_UINT:
337				case DB::FIELD_TYPE_FLOAT:
338					if ($new_values[$name] != $old_values[$name]) {
339						$updated_values[$name] = $new_values[$name];
340					}
341					break;
342
343				default:
344					if ($new_values[$name] !== $old_values[$name]) {
345						$updated_values[$name] = $new_values[$name];
346					}
347					break;
348			}
349		}
350
351		return $updated_values;
352	}
353
354	private static function checkValueTypes($tableSchema, &$values) {
355		global $DB;
356
357		foreach ($values as $field => $value) {
358			if (!isset($tableSchema['fields'][$field])) {
359				unset($values[$field]);
360				continue;
361			}
362
363			if (isset($tableSchema['fields'][$field]['ref_table'])) {
364				if ($tableSchema['fields'][$field]['null']) {
365					$values[$field] = ($values[$field] == '0') ? NULL : $values[$field];
366				}
367			}
368
369			if (is_null($values[$field])) {
370				if ($tableSchema['fields'][$field]['null']) {
371					$values[$field] = 'NULL';
372				}
373				elseif (isset($tableSchema['fields'][$field]['default'])) {
374					$values[$field] = zbx_dbstr($tableSchema['fields'][$field]['default']);
375				}
376				else {
377					self::exception(self::DBEXECUTE_ERROR,
378						_s('Field "%1$s" cannot be set to NULL.', $field)
379					);
380				}
381			}
382			else {
383				switch ($tableSchema['fields'][$field]['type']) {
384					case self::FIELD_TYPE_CHAR:
385						$length = mb_strlen($values[$field]);
386
387						if ($length > $tableSchema['fields'][$field]['length']) {
388							self::exception(self::SCHEMA_ERROR, _s('Value "%1$s" is too long for field "%2$s" - %3$d characters. Allowed length is %4$d characters.',
389								$values[$field], $field, $length, $tableSchema['fields'][$field]['length']));
390						}
391						$values[$field] = zbx_dbstr($values[$field]);
392						break;
393					case self::FIELD_TYPE_ID:
394					case self::FIELD_TYPE_UINT:
395						if (!zbx_ctype_digit($values[$field])) {
396							self::exception(self::DBEXECUTE_ERROR, _s('Incorrect value "%1$s" for unsigned int field "%2$s".', $values[$field], $field));
397						}
398						$values[$field] = zbx_dbstr($values[$field]);
399						break;
400					case self::FIELD_TYPE_INT:
401						if (!zbx_is_int($values[$field])) {
402							self::exception(self::DBEXECUTE_ERROR, _s('Incorrect value "%1$s" for int field "%2$s".', $values[$field], $field));
403						}
404						$values[$field] = zbx_dbstr($values[$field]);
405						break;
406					case self::FIELD_TYPE_FLOAT:
407						if (!is_numeric($values[$field])) {
408							self::exception(self::DBEXECUTE_ERROR, _s('Incorrect value "%1$s" for float field "%2$s".', $values[$field], $field));
409						}
410						$values[$field] = zbx_dbstr($values[$field]);
411						break;
412					case self::FIELD_TYPE_TEXT:
413						if ($DB['TYPE'] == ZBX_DB_ORACLE) {
414							$length = mb_strlen($values[$field]);
415
416							if ($length > 2048) {
417								self::exception(self::SCHEMA_ERROR, _s('Value "%1$s" is too long for field "%2$s" - %3$d characters. Allowed length is %4$d characters.',
418									$values[$field], $field, $length, 2048));
419							}
420						}
421						$values[$field] = zbx_dbstr($values[$field]);
422						break;
423					case self::FIELD_TYPE_NCLOB:
424						// Using strlen because 4000 bytes is largest possible string literal in oracle query.
425						if ($DB['TYPE'] == ZBX_DB_ORACLE && strlen($values[$field]) > ORACLE_MAX_STRING_SIZE) {
426							$chunks = zbx_dbstr(self::chunkMultibyteStr($values[$field], ORACLE_MAX_STRING_SIZE));
427							$values[$field] = 'TO_NCLOB('.implode(') || TO_NCLOB(', $chunks).')';
428						}
429						else {
430							$values[$field] = zbx_dbstr($values[$field]);
431						}
432						break;
433				}
434			}
435		}
436	}
437
438	/**
439	 * @param string $str
440	 * @param int $chunk_size
441	 *
442	 * @return array
443	 */
444	public static function chunkMultibyteStr(string $str, int $chunk_size): array {
445		$chunks = [];
446		$offset = 0;
447		$size = strlen($str);
448
449		while ($offset < $size) {
450			$chunk = mb_strcut($str, $offset, $chunk_size);
451			$chunks[] = $chunk;
452			$offset = strlen($chunk) + $offset;
453		}
454
455		return $chunks;
456	}
457
458	/**
459	 * Returns the records that match the given criteria.
460	 *
461	 * @static
462	 *
463	 * @param string $tableName
464	 * @param array $criteria   An associative array of field-value pairs, where value can be either a single value
465	 *                          or an array (IN)
466	 *
467	 * @return array
468	 */
469	public static function find($tableName, array $criteria = []) {
470		// build the WHERE part
471		$sqlWhere = [];
472		foreach ($criteria as $field => $value) {
473			// check if the table has this field
474			if (!self::hasField($tableName, $field)) {
475				self::exception(self::DBEXECUTE_ERROR, _s('Table "%1$s" doesn\'t have a field named "%2$s".', $tableName, $field));
476			}
477
478			$sqlWhere[] = dbConditionString($field, zbx_toArray($value));
479		}
480
481		// build query
482		$sql = 'SELECT * FROM '.$tableName;
483		if ($sqlWhere) {
484			$sql .= ' WHERE '.implode(' AND ', $sqlWhere);
485		}
486
487		return DBfetchArray(DBSelect($sql));
488	}
489
490	/**
491	 * Insert data into DB.
492	 *
493	 * @param string $table
494	 * @param array  $values pair of fieldname => fieldvalue
495	 * @param bool   $getids
496	 *
	 * @return array|bool    an array of ids with the keys preserved, or true if $values is empty
498	 */
499	public static function insert($table, $values, $getids = true) {
500		$table_schema = self::getSchema($table);
501		$fields = array_reduce($values, 'array_merge', []);
502		$fields = array_intersect_key($fields, $table_schema['fields']);
503
504		foreach ($fields as $field => &$value) {
505			$value = array_key_exists('default', $table_schema['fields'][$field])
506				? $table_schema['fields'][$field]['default']
507				: null;
508		}
509		unset($value);
510
511		foreach ($values as $key => &$row) {
512			$row = array_merge($fields, $row);
513		}
514		unset($row);
515
516		return self::insertBatch($table, $values, $getids);
517	}
518
519	/**
520	 * Returns the list of mandatory fields with default values for INSERT statements.
521	 *
522	 * @static
523	 *
524	 * @param array $table_schema
525	 *
526	 * @return array
527	 */
528	private static function getMandatoryFields(array $table_schema): array {
529		global $DB;
530
531		$mandatory_fields = [];
532
533		if ($DB['TYPE'] == ZBX_DB_MYSQL) {
534			foreach ($table_schema['fields'] as $name => $field) {
535				if ($field['type'] == self::FIELD_TYPE_TEXT || $field['type'] == self::FIELD_TYPE_NCLOB) {
536					$mandatory_fields += [$name => $field['default']];
537				}
538			}
539		}
540
541		return $mandatory_fields;
542	}
543
544	/**
545	 * Insert batch data into DB.
546	 *
547	 * @param string $table
548	 * @param array  $values pair of fieldname => fieldvalue
549	 * @param bool   $getids
550	 *
	 * @return array|bool    an array of ids with the keys preserved (empty if $getids is false), or true if $values is empty
552	 */
553	public static function insertBatch($table, $values, $getids = true) {
554		if (empty($values)) {
555			return true;
556		}
557
558		$resultIds = [];
559
560		$table_schema = self::getSchema($table);
561
562		if ($getids) {
563			$id = self::reserveIds($table, count($values));
564		}
565
566		$mandatory_fields = self::getMandatoryFields($table_schema);
567
568		foreach ($values as $key => &$row) {
569			if ($getids) {
570				$resultIds[$key] = $id;
571				$row[$table_schema['key']] = $id;
572				$id = bcadd($id, 1, 0);
573			}
574
575			$row += $mandatory_fields;
576
577			self::checkValueTypes($table_schema, $row);
578		}
579		unset($row);
580
581		$sql = self::getDbBackend()->createInsertQuery($table, array_keys(reset($values)), $values);
582
583		if (!DBexecute($sql)) {
584			self::exception(self::DBEXECUTE_ERROR, _s('SQL statement execution has failed "%1$s".', $sql));
585		}
586
587		return $resultIds;
588	}
589
590	/**
591	 * Update data in DB.
592	 *
593	 * @param string $table
594	 * @param array $data
595	 * @param array $data[...]['values'] pair of fieldname => fieldvalue for SET clause
596	 * @param array $data[...]['where'] pair of fieldname => fieldvalue for WHERE clause
597	 *
	 * @return bool  true on success
599	 */
600	public static function update($table, $data) {
601		if (empty($data)) {
602			return true;
603		}
604
605		$tableSchema = self::getSchema($table);
606
607		$data = zbx_toArray($data);
608		foreach ($data as $row) {
609			// check
610			self::checkValueTypes($tableSchema, $row['values']);
611			if (empty($row['values'])) {
612				self::exception(self::DBEXECUTE_ERROR, _s('Cannot perform update statement on table "%1$s" without values.', $table));
613			}
614
615			// set creation
616			$sqlSet = '';
617			foreach ($row['values'] as $field => $value) {
618				if ($sqlSet !== '') {
619					$sqlSet .= ',';
620				}
621				$sqlSet .= $field.'='.$value;
622			}
623
624			if (!isset($row['where']) || empty($row['where']) || !is_array($row['where'])) {
625				self::exception(self::DBEXECUTE_ERROR, _s('Cannot perform update statement on table "%1$s" without where condition.', $table));
626			}
627
628			// where condition processing
629			$sqlWhere = [];
630			foreach ($row['where'] as $field => $values) {
631				if (!isset($tableSchema['fields'][$field]) || is_null($values)) {
632					self::exception(self::DBEXECUTE_ERROR, _s('Incorrect field "%1$s" name or value in where statement for table "%2$s".', $field, $table));
633				}
634				$values = zbx_toArray($values);
635				sort($values); // sorting ids to prevent deadlocks when two transactions depend on each other
636
637				$sqlWhere[] = dbConditionString($field, $values);
638			}
639
640			// sql execution
641			$sql = 'UPDATE '.$table.' SET '.$sqlSet.' WHERE '.implode(' AND ', $sqlWhere);
642			if (!DBexecute($sql)) {
643				self::exception(self::DBEXECUTE_ERROR, _s('SQL statement execution has failed "%1$s".', $sql));
644			}
645		}
646
647		return true;
648	}
649
650	/**
651	 * Updates the values by the given PK.
652	 *
653	 * @static
654	 *
655	 * @param string $tableName
656	 * @param string $pk
657	 * @param array $values
658	 *
659	 * @return bool
660	 */
661	public static function updateByPk($tableName, $pk, array $values) {
662		return self::update($tableName, [
663			'where' => [self::getPk($tableName) => $pk],
664			'values' => $values
665		]);
666	}
667
668	/**
669	 * Saves the given records to the database. If the record has the primary key set, it is updated, otherwise - a new
670	 * record is inserted. For new records the newly generated PK is added to the result.
671	 *
672	 * @static
673	 *
674	 * @param $tableName
675	 * @param $data
676	 *
677	 * @return array    the same records, that have been passed with the primary keys set for new records
678	 */
679	public static function save($tableName, array $data) {
680		$pk = self::getPk($tableName);
681
682		$newRecords = [];
683		foreach ($data as $key => $record) {
684			// if the pk is set - update the record
685			if (isset($record[$pk])) {
686				self::updateByPk($tableName, $record[$pk], $record);
687			}
688			// if no pk is set, create the record later
689			else {
690				$newRecords[$key] = $data[$key];
691			}
692		}
693
694		// insert the new records
695		if ($newRecords) {
696			$newIds = self::insert($tableName, $newRecords);
697			foreach ($newIds as $key => $id) {
698				$data[$key][$pk] = $id;
699			}
700		}
701
702		return $data;
703	}
704
705	/**
706	 * Replaces the records given in $oldRecords with the ones in $newRecords.
707	 *
708	 * If a record with the same primary key as a new one already exists in the old records, the record is updated
709	 * only if they are different. For new records the newly generated PK is added to the result. Old records that are
710	 * not present in the new records are deleted.
711	 *
712	 * All of the records must have the primary key defined.
713	 *
714	 * @static
715	 *
716	 * @param $tableName
717	 * @param array $oldRecords
718	 * @param array $newRecords
719	 *
720	 * @return array    the new records, that have been passed with the primary keys set for newly inserted records
721	 */
722	public static function replace($tableName, array $oldRecords, array $newRecords) {
723		$pk = self::getPk($tableName);
724		$oldRecords = zbx_toHash($oldRecords, $pk);
725
726		$modifiedRecords = [];
727		foreach ($newRecords as $key => $record) {
728			// if it's a new or modified record - save it later
729			if (!isset($record[$pk]) || self::recordModified($tableName, $oldRecords[$record[$pk]], $record)) {
730				$modifiedRecords[$key] = $record;
731			}
732
733			// remove the existing records from the collection, the remaining ones will be deleted
734			if(isset($record[$pk])) {
735				unset($oldRecords[$record[$pk]]);
736			}
737		}
738
739		// save modified records
740		if ($modifiedRecords) {
741			$modifiedRecords = self::save($tableName, $modifiedRecords);
742
743			// add the new IDs to the new records
744			foreach ($modifiedRecords as $key => $record) {
745				$newRecords[$key][$pk] = $record[$pk];
746			}
747		}
748
749		// delete remaining records
750		if ($oldRecords) {
751			DB::delete($tableName, [
752				$pk => array_keys($oldRecords)
753			]);
754		}
755
756		return $newRecords;
757	}
758
759	/**
760	 * Replaces the records given in $groupedOldRecords with the ones given in $groupedNewRecords.
761	 *
762	 * This method can be used to replace related objects in one-to-many relations. Both old and new records
763	 * must be grouped by the ID of the record they belong to. The records will be matched by position, instead of
764	 * the primary key as in DB::replace(). That is, the first new record will update the first old one, second new
765	 * record - the second old one, etc. Since the records are matched by position, the new records should not contain
766	 * primary keys.
767	 *
768	 * Example 1:
769	 * $old = array(2 => array( array('gitemid' => 1, 'color' => 'FF0000') ));
770	 * $new = array(2 => array( array('color' => '00FF00') ));
771	 * var_dump(DB::replaceByPosition('items', $old, $new));
772	 * // array(array('gitemid' => 1, 'color' => '00FF00'))
773	 *
774	 * The new record updated the old one.
775	 *
776	 * Example 2:
777	 * $old = array(2 => array( array('gitemid' => 1, 'color' => 'FF0000') ));
778	 * $new = array(
779	 *     2 => array(
780	 *         array('color' => '00FF00'),
781	 *         array('color' => '0000FF')
782	 *     )
783	 * );
784	 * var_dump(DB::replaceByPosition('items', $old, $new));
785	 * // array(array('gitemid' => 1, 'color' => '00FF00'), array('gitemid' => 2, 'color' => '0000FF'))
786	 *
787	 * The first record was updated, the second one - created.
788	 *
789	 * Example 3:
790	 * $old = array(
791	 *     2 => array(
792	 *         array('gitemid' => 1, 'color' => 'FF0000'),
793	 *         array('gitemid' => 2, 'color' => '0000FF')
794	 *     )
795	 * );
796	 * $new = array(2 => array( array('color' => '00FF00') ));
797	 * var_dump(DB::replaceByPosition('items', $old, $new));
798	 * // array(array('gitemid' => 1, 'color' => '00FF00'))
799	 *
800	 * The first record was updated, the second one - deleted.
801	 *
802	 * @param string 	$tableName			table to update
803	 * @param array 	$groupedOldRecords	grouped old records
804	 * @param array 	$groupedNewRecords	grouped new records
805	 *
806	 * @return array	array of new records not grouped (!).
807	 */
808	public static function replaceByPosition($tableName, array $groupedOldRecords, array $groupedNewRecords) {
809		$pk = self::getPk($tableName);
810
811		$allOldRecords = [];
812		$allNewRecords = [];
813		foreach ($groupedNewRecords as $key => $newRecords) {
814			// if records exist for the parent object - replace them, otherwise create new records
815			if (isset($groupedOldRecords[$key])) {
816				$oldRecords = $groupedOldRecords[$key];
817
818				// updated the records by position
819				$newRecords = self::mergeRecords($oldRecords, $newRecords, $pk);
820
821				foreach ($oldRecords as $record) {
822					$allOldRecords[] = $record;
823				}
824			}
825
826			foreach ($newRecords as $record) {
827				$allNewRecords[] = $record;
828			}
829		}
830
831		// replace the old records with the new ones
832		return self::replace($tableName, $allOldRecords, $allNewRecords);
833	}
834
835	/**
836	 * Compares the fields, that are present in both records, and returns true if any of the values differ.
837	 *
838	 * @static
839	 * @param $tableName
840	 * @param array $oldRecord
841	 * @param array $newRecord
842	 *
843	 * @return bool
844	 */
845	public static function recordModified($tableName, array $oldRecord, array $newRecord) {
846		foreach ($oldRecord as $field => $value) {
847			if (self::hasField($tableName, $field)
848					&& isset($newRecord[$field])
849					&& (string) $value !== (string) $newRecord[$field]) {
850				return true;
851			}
852		}
853
854		return false;
855	}
856
857	/**
858	 * Replace each record in $oldRecords with a corresponding record in $newRecords, but keep the old record IDs.
	 * The records are matched by position, that is, the first new record replaces the first old record, etc.
860	 * If there are less $newRecords than $oldRecords, the remaining old records will be discarded.
861	 *
862	 * @param array 	$oldRecords		array of old records
863	 * @param array 	$newRecords		array of new records
	 * @param string 	$pk				name of the primary key column
865	 *
866	 * @return array	array of new records with the primary keys from the old ones
867	 */
868	protected static function mergeRecords(array $oldRecords, array $newRecords, $pk) {
869		$result = [];
870		foreach ($newRecords as $i => $record) {
871			if (isset($oldRecords[$i])) {
872				$record[$pk] = $oldRecords[$i][$pk];
873			}
874
875			$result[] = $record;
876		}
877
878		return $result;
879	}
880
881	/**
882	 * Delete data from DB.
883	 *
884	 * Example:
885	 * DB::delete('items', ['itemid' => [1, 8, 6]]);
886	 * DELETE FROM items WHERE itemid IN (1, 8, 6)
887	 *
888	 * DB::delete('items', ['itemid' => [1], 'templateid' => [10]]);
889	 * DELETE FROM items WHERE itemid IN (1) AND templateid IN (10)
890	 *
891	 * @param string $table
892	 * @param array  $wheres pair of fieldname => fieldvalues
893	 * @param bool   $use_or
894	 *
895	 * @return bool
896	 */
897	public static function delete($table, $wheres, $use_or = false) {
898		if (empty($wheres) || !is_array($wheres)) {
899			self::exception(self::DBEXECUTE_ERROR, _s('Cannot perform delete statement on table "%1$s" without where condition.', $table));
900		}
901		$table_schema = self::getSchema($table);
902
903		$sqlWhere = [];
904		foreach ($wheres as $field => $values) {
905			if (!isset($table_schema['fields'][$field]) || is_null($values)) {
906				self::exception(self::DBEXECUTE_ERROR, _s('Incorrect field "%1$s" name or value in where statement for table "%2$s".', $field, $table));
907			}
908			$values = zbx_toArray($values);
909			sort($values); // sorting ids to prevent deadlocks when two transactions depends from each other
910
911			$sqlWhere[] = dbConditionString($field, $values);
912		}
913
914		$sql = 'DELETE FROM '.$table.' WHERE '.implode(($use_or ? ' OR ' : ' AND '), $sqlWhere);
915		if (!DBexecute($sql)) {
916			self::exception(self::DBEXECUTE_ERROR, _s('SQL statement execution has failed "%1$s"', $sql));
917		}
918
919		return true;
920	}
921
922	/**
923	 * @param string $table_name
924	 * @param array  $options
925	 * @param string $table_alias
926	 *
927	 * @return string
928	 */
929	public static function makeSql($table_name, array &$options, $table_alias = null) {
930		$defaults = [
931			'output' => [],
932			'countOutput' => false,
933			'filter' => [],
934			'search' => [],
935			'startSearch' => false,
936			'searchByAny' => false,
937			'sortfield' => [],
938			'sortorder' => [],
939			'limit' => null,
940			'preservekeys' => false
941		];
942
943		if ($array_diff = array_diff_key($options, $defaults)) {
944			unset($array_diff[self::getPk($table_name).'s']);
945			if ($array_diff) {
946				self::exception(self::SCHEMA_ERROR,
947					vsprintf('%s: unsupported option "%s".', [__FUNCTION__, key($array_diff)])
948				);
949			}
950		}
951
952		$options = zbx_array_merge($defaults, $options);
953
954		$sql_parts = self::createSelectQueryParts($table_name, $options, $table_alias);
955
956		return 'SELECT '.implode(',', $sql_parts['select']).
957				' FROM '.implode(',', $sql_parts['from']).
958				($sql_parts['where'] ? ' WHERE '.implode(' AND ', $sql_parts['where']) : '').
959				($sql_parts['order'] ? ' ORDER BY '.implode(',', $sql_parts['order']) : '');
960	}
961
962	/**
963	 * @param string $table_name
964	 * @param array  $options
965	 * @param string $table_alias
966	 *
967	 * @return array
968	 */
969	public static function select($table_name, array $options, $table_alias = null) {
970		$result = [];
971		$field_names = array_flip($options['output']);
972		$db_result = DBSelect(self::makeSql($table_name, $options, $table_alias), $options['limit']);
973
974		if ($options['preservekeys']) {
975			$pk = self::getPk($table_name);
976
977			while ($db_row = DBfetch($db_result)) {
978				$result[$db_row[$pk]] = $options['countOutput'] ? $db_row : array_intersect_key($db_row, $field_names);
979			}
980		}
981		else {
982			while ($db_row = DBfetch($db_result)) {
983				$result[] = $options['countOutput'] ? $db_row : array_intersect_key($db_row, $field_names);
984			}
985		}
986
987		return $result;
988	}
989
990	/**
991	 * Returns the table name with the table alias.
992	 *
993	 * @param string $table_name
994	 * @param string $table_alias
995	 *
996	 * @return string
997	 */
998	private static function tableId($table_name, $table_alias = null) {
999		return($table_alias !== null) ? $table_name.' '.$table_alias : $table_name;
1000	}
1001
1002	/**
1003	 * Prepends the table alias to the given field name.
1004	 *
1005	 * @param string $field_name
1006	 * @param string $table_alias
1007	 *
1008	 * @return string
1009	 */
1010	private static function fieldId($field_name, $table_alias = null) {
1011		return ($table_alias !== null) ? $table_alias.'.'.$field_name : $field_name;
1012	}
1013
1014	/**
1015	 * Builds an SQL parts array from the given options.
1016	 *
1017	 * @param string $table_name
1018	 * @param array  $options
1019	 * @param string $table_alias
1020	 *
1021	 * @return array		The resulting SQL parts array
1022	 */
1023	private static function createSelectQueryParts($table_name, array $options, $table_alias = null) {
1024		$sql_parts = [
1025			'select' => [],
1026			'from' => [self::tableId($table_name, $table_alias)],
1027			'where' => [],
1028			'order' => []
1029		];
1030
1031		// add output options
1032		$sql_parts = self::applyQueryOutputOptions($table_name, $options, $table_alias, $sql_parts);
1033
1034		// add filter options
1035		$sql_parts = self::applyQueryFilterOptions($table_name, $options, $table_alias, $sql_parts);
1036
1037		// add search options
1038		$sql_parts = self::applyQuerySearchOptions($table_name, $options, $table_alias, $sql_parts);
1039
1040		// add sort options
1041		$sql_parts = self::applyQuerySortOptions($table_name, $options, $table_alias, $sql_parts);
1042
1043		return $sql_parts;
1044	}
1045
1046	/**
1047	 * Modifies the SQL parts to implement all of the output related options.
1048	 *
1049	 * @param string $table_name
1050	 * @param array  $options
1051	 * @param string $table_alias
1052	 * @param array  $sql_parts
1053	 *
1054	 * @return array
1055	 */
1056	private static function applyQueryOutputOptions($table_name, array $options, $table_alias = null,
1057			array $sql_parts) {
1058		if ($options['countOutput']) {
1059			$sql_parts['select'][] = 'COUNT('.self::fieldId('*', $table_alias).') AS rowscount';
1060		}
1061		else {
1062			$table_schema = self::getSchema($table_name);
1063			$select = [];
1064			$select[self::fieldId(self::getPk($table_name), $table_alias)] = true;
1065
1066			foreach ($options['output'] as $field_name) {
1067				if (!array_key_exists($field_name, $table_schema['fields'])) {
1068					self::exception(self::SCHEMA_ERROR,
1069						vsprintf('%s: field "%s.%s" does not exist.', [__FUNCTION__, $table_name, $field_name])
1070					);
1071				}
1072
1073				$select[self::fieldId($field_name, $table_alias)] = true;
1074			}
1075
1076			$sql_parts['select'] = array_keys($select);
1077		}
1078
1079		return $sql_parts;
1080	}
1081
1082	/**
1083	 * Modifies the SQL parts to implement all of the filter related options.
1084	 *
1085	 * @param string $table_name
1086	 * @param array  $options
1087	 * @param string $table_alias
1088	 * @param array  $sql_parts
1089	 *
1090	 * @return array
1091	 */
1092	private static function applyQueryFilterOptions($table_name, array $options, $table_alias = null,
1093			array $sql_parts) {
1094		$table_schema = self::getSchema($table_name);
1095		$pk = self::getPk($table_name);
1096		$pk_option = $pk.'s';
1097
1098		// pks
1099		if (array_key_exists($pk_option, $options)) {
1100			if (!is_array($options[$pk_option])) {
1101				$options[$pk_option] = [$options[$pk_option]];
1102			}
1103
1104			$field_schema = $table_schema['fields'][$pk];
1105			$field_name = self::fieldId($pk, $table_alias);
1106
1107			switch ($field_schema['type']) {
1108				case self::FIELD_TYPE_ID:
1109					$sql_parts['where'][] = dbConditionId($field_name, $options[$pk_option]);
1110					break;
1111
1112				case self::FIELD_TYPE_INT:
1113				case self::FIELD_TYPE_UINT:
1114					$sql_parts['where'][] = dbConditionInt($field_name, $options[$pk_option]);
1115					break;
1116
1117				default:
1118					$sql_parts['where'][] = dbConditionString($field_name, $options[$pk_option]);
1119			}
1120		}
1121
1122		// filters
1123		if (is_array($options['filter'])) {
1124			$sql_parts = self::dbFilter($table_name, $options, $table_alias, $sql_parts);
1125		}
1126
1127		return $sql_parts;
1128	}
1129
1130	/**
1131	 * Modifies the SQL parts to implement all of the search related options.
1132	 *
1133	 * @param string $table_name
1134	 * @param array  $options
1135	 * @param array  $options['search']
1136	 * @param bool   $options['startSearch']
1137	 * @param bool   $options['searchByAny']
1138	 * @param string $table_alias
1139	 * @param array  $sql_parts
1140	 *
1141	 * @return array
1142	 */
1143	private static function applyQuerySearchOptions($table_name, array $options, $table_alias = null,
1144			array $sql_parts) {
1145		global $DB;
1146
1147		$table_schema = DB::getSchema($table_name);
1148		$unsupported_types = [self::FIELD_TYPE_INT, self::FIELD_TYPE_ID, self::FIELD_TYPE_FLOAT, self::FIELD_TYPE_UINT,
1149			self::FIELD_TYPE_BLOB
1150		];
1151
1152		$start = $options['startSearch'] ? '' : '%';
1153		$glue = $options['searchByAny'] ? ' OR ' : ' AND ';
1154
1155		$search = [];
1156
1157		foreach ($options['search'] as $field_name => $patterns) {
1158			if (!array_key_exists($field_name, $table_schema['fields'])) {
1159				self::exception(self::SCHEMA_ERROR,
1160					vsprintf('%s: field "%s.%s" does not exist.', [__FUNCTION__, $table_name, $field_name])
1161				);
1162			}
1163
1164			$field_schema = $table_schema['fields'][$field_name];
1165
1166			if (in_array($field_schema['type'], $unsupported_types)) {
1167				self::exception(self::SCHEMA_ERROR,
1168					vsprintf('%s: field "%s.%s" has an unsupported type.', [__FUNCTION__, $table_name, $field_name])
1169				);
1170			}
1171
1172			if ($patterns === null) {
1173				continue;
1174			}
1175
1176			foreach ((array) $patterns as $pattern) {
1177				// escaping parameter that is about to be used in LIKE statement
1178				$pattern = mb_strtoupper(strtr($pattern, ['!' => '!!', '%' => '!%', '_' => '!_']));
1179				$pattern = $start.$pattern.'%';
1180
1181				if ($DB['TYPE'] == ZBX_DB_ORACLE && $field_schema['type'] === DB::FIELD_TYPE_NCLOB
1182						&& strlen($pattern) > ORACLE_MAX_STRING_SIZE) {
1183					$chunks = zbx_dbstr(DB::chunkMultibyteStr($pattern, ORACLE_MAX_STRING_SIZE));
1184					$pattern = 'TO_NCLOB('.implode(') || TO_NCLOB(', $chunks).')';
1185				}
1186				else {
1187					$pattern = zbx_dbstr($pattern);
1188				}
1189
1190				$search[] = 'UPPER('.self::fieldId($field_name, $table_alias).') LIKE '.$pattern." ESCAPE '!'";
1191			}
1192		}
1193
1194		if ($search) {
1195			$sql_parts['where'][] = ($options['searchByAny'] && count($search) > 1)
1196				? '('.implode($glue, $search).')'
1197				: implode($glue, $search);
1198		}
1199
1200		return $sql_parts;
1201	}
1202
1203	/**
1204	 * Apply filter conditions to sql built query.
1205	 *
1206	 * @param string $table_name
1207	 * @param array  $options
1208	 * @param string $table_alias
1209	 * @param array  $sql_parts
1210	 *
1211	 * @return bool
1212	 */
1213	private static function dbFilter($table_name, $options, $table_alias = null, $sql_parts) {
1214		$table_schema = self::getSchema($table_name);
1215		$filter = [];
1216
1217		foreach ($options['filter'] as $field_name => $value) {
1218			if (!array_key_exists($field_name, $table_schema['fields'])) {
1219				self::exception(self::SCHEMA_ERROR,
1220					vsprintf('%s: field "%s.%s" does not exist.', [__FUNCTION__, $table_name, $field_name])
1221				);
1222			}
1223
1224			$field_schema = $table_schema['fields'][$field_name];
1225
1226			if ($field_schema['type'] == self::FIELD_TYPE_TEXT || $field_schema['type'] == self::FIELD_TYPE_NCLOB) {
1227				self::exception(self::SCHEMA_ERROR,
1228					vsprintf('%s: field "%s.%s" has an unsupported type.', [__FUNCTION__, $table_name, $field_name])
1229				);
1230			}
1231
1232			if ($value === null) {
1233				continue;
1234			}
1235
1236			if (!is_array($value)) {
1237				$value = [$value];
1238			}
1239
1240			switch ($field_schema['type']) {
1241				case self::FIELD_TYPE_ID:
1242					$filter[] = dbConditionId(self::fieldId($field_name, $table_alias), $value);
1243					break;
1244
1245				case self::FIELD_TYPE_INT:
1246				case self::FIELD_TYPE_UINT:
1247					$filter[] = dbConditionInt(self::fieldId($field_name, $table_alias), $value);
1248					break;
1249
1250				default:
1251					$filter[] = dbConditionString(self::fieldId($field_name, $table_alias), $value);
1252			}
1253		}
1254
1255		if ($filter) {
1256			$sql_parts['where'][] = implode(' AND ', $filter);
1257		}
1258
1259		return $sql_parts;
1260	}
1261
1262	/**
1263	 * Modifies the SQL parts to implement all of the sorting related options.
1264	 *
1265	 * @param string $table_name
1266	 * @param array  $options
1267	 * @param string $table_alias
1268	 * @param array  $sql_parts
1269	 *
1270	 * @return array
1271	 */
1272	private static function applyQuerySortOptions($table_name, array $options, $table_alias = null, array $sql_parts) {
1273		$table_schema = self::getSchema($table_name);
1274
1275		foreach ($options['sortfield'] as $index => $field_name) {
1276			if (!array_key_exists($field_name, $table_schema['fields'])) {
1277				self::exception(self::SCHEMA_ERROR,
1278					vsprintf('%s: field "%s.%s" does not exist.', [__FUNCTION__, $table_name, $field_name])
1279				);
1280			}
1281
1282			$sortorder = '';
1283			if (array_key_exists($index, $options['sortorder']) && $options['sortorder'][$index] == ZBX_SORT_DOWN) {
1284				$sortorder = ' '.ZBX_SORT_DOWN;
1285			}
1286
1287			$sql_parts['order'][] = self::fieldId($field_name, $table_alias).$sortorder;
1288		}
1289
1290		return $sql_parts;
1291	}
1292}
1293