1<?php 2// This file is part of Moodle - http://moodle.org/ 3// 4// Moodle is free software: you can redistribute it and/or modify 5// it under the terms of the GNU General Public License as published by 6// the Free Software Foundation, either version 3 of the License, or 7// (at your option) any later version. 8// 9// Moodle is distributed in the hope that it will be useful, 10// but WITHOUT ANY WARRANTY; without even the implied warranty of 11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 12// GNU General Public License for more details. 13// 14// You should have received a copy of the GNU General Public License 15// along with Moodle. If not, see <http://www.gnu.org/licenses/>. 16 17/** 18 * Native pgsql class representing moodle database interface. 19 * 20 * @package core_dml 21 * @copyright 2008 Petr Skoda (http://skodak.org) 22 * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later 23 */ 24 25defined('MOODLE_INTERNAL') || die(); 26 27require_once(__DIR__.'/moodle_database.php'); 28require_once(__DIR__.'/moodle_read_slave_trait.php'); 29require_once(__DIR__.'/pgsql_native_moodle_recordset.php'); 30require_once(__DIR__.'/pgsql_native_moodle_temptables.php'); 31 32/** 33 * Native pgsql class representing moodle database interface. 34 * 35 * @package core_dml 36 * @copyright 2008 Petr Skoda (http://skodak.org) 37 * @license http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later 38 */ 39class pgsql_native_moodle_database extends moodle_database { 40 use moodle_read_slave_trait { 41 select_db_handle as read_slave_select_db_handle; 42 can_use_readonly as read_slave_can_use_readonly; 43 query_start as read_slave_query_start; 44 } 45 46 /** @var array $dbhcursor keep track of open cursors */ 47 private $dbhcursor = []; 48 49 /** @var resource $pgsql database resource */ 50 protected $pgsql = null; 51 52 protected $last_error_reporting; // To handle pgsql driver default verbosity 53 54 /** @var bool savepoint hack for MDL-35506 - workaround for automatic transaction rollback on error */ 55 protected $savepointpresent = false; 56 57 /** @var int Number of cursors used (for constructing a unique ID) */ 58 protected $cursorcount = 0; 59 60 /** @var int Default number of rows to fetch at a time when using recordsets with cursors */ 61 const DEFAULT_FETCH_BUFFER_SIZE = 100000; 62 63 /** 64 * Detects if all needed PHP stuff installed. 65 * Note: can be used before connect() 66 * @return mixed true if ok, string if something 67 */ 68 public function driver_installed() { 69 if (!extension_loaded('pgsql')) { 70 return get_string('pgsqlextensionisnotpresentinphp', 'install'); 71 } 72 return true; 73 } 74 75 /** 76 * Returns database family type - describes SQL dialect 77 * Note: can be used before connect() 78 * @return string db family name (mysql, postgres, mssql, oracle, etc.) 79 */ 80 public function get_dbfamily() { 81 return 'postgres'; 82 } 83 84 /** 85 * Returns more specific database driver type 86 * Note: can be used before connect() 87 * @return string db type mysqli, pgsql, oci, mssql, sqlsrv 88 */ 89 protected function get_dbtype() { 90 return 'pgsql'; 91 } 92 93 /** 94 * Returns general database library name 95 * Note: can be used before connect() 96 * @return string db type pdo, native 97 */ 98 protected function get_dblibrary() { 99 return 'native'; 100 } 101 102 /** 103 * Returns localised database type name 104 * Note: can be used before connect() 105 * @return string 106 */ 107 public function get_name() { 108 return get_string('nativepgsql', 'install'); 109 } 110 111 /** 112 * Returns localised database configuration help. 113 * Note: can be used before connect() 114 * @return string 115 */ 116 public function get_configuration_help() { 117 return get_string('nativepgsqlhelp', 'install'); 118 } 119 120 /** 121 * Connect to db 122 * @param string $dbhost The database host. 123 * @param string $dbuser The database username. 124 * @param string $dbpass The database username's password. 125 * @param string $dbname The name of the database being connected to. 126 * @param mixed $prefix string means moodle db prefix, false used for external databases where prefix not used 127 * @param array $dboptions driver specific options 128 * @return bool true 129 * @throws dml_connection_exception if error 130 */ 131 public function raw_connect(string $dbhost, string $dbuser, string $dbpass, string $dbname, $prefix, array $dboptions=null): bool { 132 if ($prefix == '' and !$this->external) { 133 //Enforce prefixes for everybody but mysql 134 throw new dml_exception('prefixcannotbeempty', $this->get_dbfamily()); 135 } 136 137 $driverstatus = $this->driver_installed(); 138 139 if ($driverstatus !== true) { 140 throw new dml_exception('dbdriverproblem', $driverstatus); 141 } 142 143 $this->store_settings($dbhost, $dbuser, $dbpass, $dbname, $prefix, $dboptions); 144 145 $pass = addcslashes($this->dbpass, "'\\"); 146 147 // Unix socket connections should have lower overhead 148 if (!empty($this->dboptions['dbsocket']) and ($this->dbhost === 'localhost' or $this->dbhost === '127.0.0.1')) { 149 $connection = "user='$this->dbuser' password='$pass' dbname='$this->dbname'"; 150 if (strpos($this->dboptions['dbsocket'], '/') !== false) { 151 // A directory was specified as the socket location. 152 $connection .= " host='".$this->dboptions['dbsocket']."'"; 153 } 154 if (!empty($this->dboptions['dbport'])) { 155 // A port as specified, add it to the connection as it's used as part of the socket path. 156 $connection .= " port ='".$this->dboptions['dbport']."'"; 157 } 158 } else { 159 $this->dboptions['dbsocket'] = ''; 160 if (empty($this->dbname)) { 161 // probably old style socket connection - do not add port 162 $port = ""; 163 } else if (empty($this->dboptions['dbport'])) { 164 $port = "port ='5432'"; 165 } else { 166 $port = "port ='".$this->dboptions['dbport']."'"; 167 } 168 $connection = "host='$this->dbhost' $port user='$this->dbuser' password='$pass' dbname='$this->dbname'"; 169 } 170 171 if (!empty($this->dboptions['connecttimeout'])) { 172 $connection .= " connect_timeout=".$this->dboptions['connecttimeout']; 173 } 174 175 if (empty($this->dboptions['dbhandlesoptions'])) { 176 // ALTER USER and ALTER DATABASE are overridden by these settings. 177 $options = array('--client_encoding=utf8', '--standard_conforming_strings=on'); 178 // Select schema if specified, otherwise the first one wins. 179 if (!empty($this->dboptions['dbschema'])) { 180 $options[] = "-c search_path=" . addcslashes($this->dboptions['dbschema'], "'\\"); 181 } 182 183 $connection .= " options='" . implode(' ', $options) . "'"; 184 } 185 186 ob_start(); 187 if (empty($this->dboptions['dbpersist'])) { 188 $this->pgsql = pg_connect($connection, PGSQL_CONNECT_FORCE_NEW); 189 } else { 190 $this->pgsql = pg_pconnect($connection, PGSQL_CONNECT_FORCE_NEW); 191 } 192 $dberr = ob_get_contents(); 193 ob_end_clean(); 194 195 $status = $this->pgsql ? pg_connection_status($this->pgsql) : false; 196 197 if ($status === false or $status === PGSQL_CONNECTION_BAD) { 198 $this->pgsql = null; 199 throw new dml_connection_exception($dberr); 200 } 201 202 if (!empty($this->dboptions['dbpersist'])) { 203 // There are rare situations (such as PHP out of memory errors) when open cursors may 204 // not be closed at the end of a connection. When using persistent connections, the 205 // cursors remain open and 'get in the way' of future connections. To avoid this 206 // problem, close all cursors here. 207 $result = pg_query($this->pgsql, 'CLOSE ALL'); 208 if ($result) { 209 pg_free_result($result); 210 } 211 } 212 213 if (!empty($this->dboptions['dbhandlesoptions'])) { 214 /* We don't trust people who just set the dbhandlesoptions, this code checks up on them. 215 * These functions do not talk to the server, they use the client library knowledge to determine state. 216 */ 217 if (!empty($this->dboptions['dbschema'])) { 218 throw new dml_connection_exception('You cannot specify a schema with dbhandlesoptions, use the database to set it.'); 219 } 220 if (pg_client_encoding($this->pgsql) != 'UTF8') { 221 throw new dml_connection_exception('client_encoding = UTF8 not set, it is: ' . pg_client_encoding($this->pgsql)); 222 } 223 if (pg_escape_string($this->pgsql, '\\') != '\\') { 224 throw new dml_connection_exception('standard_conforming_strings = on, must be set at the database.'); 225 } 226 } 227 228 // Connection stabilised and configured, going to instantiate the temptables controller 229 $this->temptables = new pgsql_native_moodle_temptables($this); 230 231 return true; 232 } 233 234 /** 235 * Close database connection and release all resources 236 * and memory (especially circular memory references). 237 * Do NOT use connect() again, create a new instance if needed. 238 */ 239 public function dispose() { 240 parent::dispose(); // Call parent dispose to write/close session and other common stuff before closing connection 241 if ($this->pgsql) { 242 pg_close($this->pgsql); 243 $this->pgsql = null; 244 } 245 } 246 247 /** 248 * Gets db handle currently used with queries 249 * @return resource 250 */ 251 protected function get_db_handle() { 252 return $this->pgsql; 253 } 254 255 /** 256 * Sets db handle to be used with subsequent queries 257 * @param resource $dbh 258 * @return void 259 */ 260 protected function set_db_handle($dbh): void { 261 $this->pgsql = $dbh; 262 } 263 264 /** 265 * Select appropriate db handle - readwrite or readonly 266 * @param int $type type of query 267 * @param string $sql 268 * @return void 269 */ 270 protected function select_db_handle(int $type, string $sql): void { 271 $this->read_slave_select_db_handle($type, $sql); 272 273 if (preg_match('/^DECLARE (crs\w*) NO SCROLL CURSOR/', $sql, $match)) { 274 $cursor = $match[1]; 275 $this->dbhcursor[$cursor] = $this->pgsql; 276 } 277 if (preg_match('/^(?:FETCH \d+ FROM|CLOSE) (crs\w*)\b/', $sql, $match)) { 278 $cursor = $match[1]; 279 $this->pgsql = $this->dbhcursor[$cursor]; 280 } 281 } 282 283 /** 284 * Check if The query qualifies for readonly connection execution 285 * Logging queries are exempt, those are write operations that circumvent 286 * standard query_start/query_end paths. 287 * @param int $type type of query 288 * @param string $sql 289 * @return bool 290 */ 291 protected function can_use_readonly(int $type, string $sql): bool { 292 // ... pg_*lock queries always go to master. 293 if (preg_match('/\bpg_\w*lock/', $sql)) { 294 return false; 295 } 296 297 // ... a nuisance - temptables use this. 298 if (preg_match('/\bpg_catalog/', $sql) && $this->temptables->get_temptables()) { 299 return false; 300 } 301 302 return $this->read_slave_can_use_readonly($type, $sql); 303 304 } 305 306 /** 307 * Called before each db query. 308 * @param string $sql 309 * @param array array of parameters 310 * @param int $type type of query 311 * @param mixed $extrainfo driver specific extra information 312 * @return void 313 */ 314 protected function query_start($sql, array $params=null, $type, $extrainfo=null) { 315 $this->read_slave_query_start($sql, $params, $type, $extrainfo); 316 // pgsql driver tends to send debug to output, we do not need that. 317 $this->last_error_reporting = error_reporting(0); 318 } 319 320 /** 321 * Called immediately after each db query. 322 * @param mixed db specific result 323 * @return void 324 */ 325 protected function query_end($result) { 326 // reset original debug level 327 error_reporting($this->last_error_reporting); 328 try { 329 parent::query_end($result); 330 if ($this->savepointpresent and $this->last_type != SQL_QUERY_AUX and $this->last_type != SQL_QUERY_SELECT) { 331 $res = @pg_query($this->pgsql, "RELEASE SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint"); 332 if ($res) { 333 pg_free_result($res); 334 } 335 } 336 } catch (Exception $e) { 337 if ($this->savepointpresent) { 338 $res = @pg_query($this->pgsql, "ROLLBACK TO SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint"); 339 if ($res) { 340 pg_free_result($res); 341 } 342 } 343 throw $e; 344 } 345 } 346 347 /** 348 * Returns database server info array 349 * @return array Array containing 'description' and 'version' info 350 */ 351 public function get_server_info() { 352 static $info; 353 if (!$info) { 354 $this->query_start("--pg_version()", null, SQL_QUERY_AUX); 355 $info = pg_version($this->pgsql); 356 $this->query_end(true); 357 } 358 return array('description'=>$info['server'], 'version'=>$info['server']); 359 } 360 361 /** 362 * Returns supported query parameter types 363 * @return int bitmask of accepted SQL_PARAMS_* 364 */ 365 protected function allowed_param_types() { 366 return SQL_PARAMS_DOLLAR; 367 } 368 369 /** 370 * Returns last error reported by database engine. 371 * @return string error message 372 */ 373 public function get_last_error() { 374 return pg_last_error($this->pgsql); 375 } 376 377 /** 378 * Return tables in database WITHOUT current prefix. 379 * @param bool $usecache if true, returns list of cached tables. 380 * @return array of table names in lowercase and without prefix 381 */ 382 public function get_tables($usecache=true) { 383 if ($usecache and $this->tables !== null) { 384 return $this->tables; 385 } 386 $this->tables = array(); 387 $prefix = str_replace('_', '|_', $this->prefix); 388 $sql = "SELECT c.relname 389 FROM pg_catalog.pg_class c 390 JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace 391 WHERE c.relname LIKE '$prefix%' ESCAPE '|' 392 AND c.relkind = 'r' 393 AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())"; 394 $this->query_start($sql, null, SQL_QUERY_AUX); 395 $result = pg_query($this->pgsql, $sql); 396 $this->query_end($result); 397 398 if ($result) { 399 while ($row = pg_fetch_row($result)) { 400 $tablename = reset($row); 401 if ($this->prefix !== false && $this->prefix !== '') { 402 if (strpos($tablename, $this->prefix) !== 0) { 403 continue; 404 } 405 $tablename = substr($tablename, strlen($this->prefix)); 406 } 407 $this->tables[$tablename] = $tablename; 408 } 409 pg_free_result($result); 410 } 411 return $this->tables; 412 } 413 414 /** 415 * Constructs 'IN()' or '=' sql fragment 416 * 417 * Method overriding {@see moodle_database::get_in_or_equal} to be able to use 418 * more than 65535 elements in $items array. 419 * 420 * @param mixed $items A single value or array of values for the expression. 421 * @param int $type Parameter bounding type : SQL_PARAMS_QM or SQL_PARAMS_NAMED. 422 * @param string $prefix Named parameter placeholder prefix (a unique counter value is appended to each parameter name). 423 * @param bool $equal True means we want to equate to the constructed expression, false means we don't want to equate to it. 424 * @param mixed $onemptyitems This defines the behavior when the array of items provided is empty. Defaults to false, 425 * meaning throw exceptions. Other values will become part of the returned SQL fragment. 426 * @throws coding_exception | dml_exception 427 * @return array A list containing the constructed sql fragment and an array of parameters. 428 */ 429 public function get_in_or_equal($items, $type=SQL_PARAMS_QM, $prefix='param', $equal=true, $onemptyitems=false): array { 430 // We only interfere if number of items in expression exceeds 16 bit value. 431 if (!is_array($items) || count($items) < 65535) { 432 return parent::get_in_or_equal($items, $type, $prefix, $equal, $onemptyitems); 433 } 434 435 // Determine the type from the first value. We don't need to be very smart here, 436 // it is developer's responsibility to make sure that variable type is matching 437 // field type, if not the case, DB engine will hint. Also mixing types won't work 438 // here anyway, so we ignore NULL or boolean (unlikely you need 56k values of 439 // these types only). 440 $cast = is_string(current($items)) ? '::text' : '::bigint'; 441 442 if ($type == SQL_PARAMS_QM) { 443 if ($equal) { 444 $sql = 'IN (VALUES ('.implode('),(', array_fill(0, count($items), '?'.$cast)).'))'; 445 } else { 446 $sql = 'NOT IN (VALUES ('.implode('),(', array_fill(0, count($items), '?'.$cast)).'))'; 447 } 448 $params = array_values($items); 449 } else if ($type == SQL_PARAMS_NAMED) { 450 if (empty($prefix)) { 451 $prefix = 'param'; 452 } 453 $params = []; 454 $sql = []; 455 foreach ($items as $item) { 456 $param = $prefix.$this->inorequaluniqueindex++; 457 $params[$param] = $item; 458 $sql[] = ':'.$param.$cast; 459 } 460 if ($equal) { 461 $sql = 'IN (VALUES ('.implode('),(', $sql).'))'; 462 } else { 463 $sql = 'NOT IN (VALUES ('.implode('),(', $sql).'))'; 464 } 465 } else { 466 throw new dml_exception('typenotimplement'); 467 } 468 return [$sql, $params]; 469 } 470 471 /** 472 * Return table indexes - everything lowercased. 473 * @param string $table The table we want to get indexes from. 474 * @return array of arrays 475 */ 476 public function get_indexes($table) { 477 $indexes = array(); 478 $tablename = $this->prefix.$table; 479 480 $sql = "SELECT i.* 481 FROM pg_catalog.pg_indexes i 482 JOIN pg_catalog.pg_namespace as ns ON ns.nspname = i.schemaname 483 WHERE i.tablename = '$tablename' 484 AND (i.schemaname = current_schema() OR ns.oid = pg_my_temp_schema())"; 485 486 $this->query_start($sql, null, SQL_QUERY_AUX); 487 $result = pg_query($this->pgsql, $sql); 488 $this->query_end($result); 489 490 if ($result) { 491 while ($row = pg_fetch_assoc($result)) { 492 // The index definition could be generated schema-qualifying the target table name 493 // for safety, depending on the pgsql version (CVE-2018-1058). 494 if (!preg_match('/CREATE (|UNIQUE )INDEX ([^\s]+) ON (|'.$row['schemaname'].'\.)'.$tablename.' USING ([^\s]+) \(([^\)]+)\)/i', $row['indexdef'], $matches)) { 495 continue; 496 } 497 if ($matches[5] === 'id') { 498 continue; 499 } 500 $columns = explode(',', $matches[5]); 501 foreach ($columns as $k=>$column) { 502 $column = trim($column); 503 if ($pos = strpos($column, ' ')) { 504 // index type is separated by space 505 $column = substr($column, 0, $pos); 506 } 507 $columns[$k] = $this->trim_quotes($column); 508 } 509 $indexes[$row['indexname']] = array('unique'=>!empty($matches[1]), 510 'columns'=>$columns); 511 } 512 pg_free_result($result); 513 } 514 return $indexes; 515 } 516 517 /** 518 * Returns detailed information about columns in table. 519 * 520 * @param string $table name 521 * @return database_column_info[] array of database_column_info objects indexed with column names 522 */ 523 protected function fetch_columns(string $table): array { 524 $structure = array(); 525 526 $tablename = $this->prefix.$table; 527 528 $sql = "SELECT a.attnum, a.attname AS field, t.typname AS type, a.attlen, a.atttypmod, a.attnotnull, a.atthasdef, 529 CASE WHEN a.atthasdef THEN pg_catalog.pg_get_expr(d.adbin, d.adrelid) END AS adsrc 530 FROM pg_catalog.pg_class c 531 JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace 532 JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid 533 JOIN pg_catalog.pg_type t ON t.oid = a.atttypid 534 LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = c.oid AND d.adnum = a.attnum) 535 WHERE relkind = 'r' AND c.relname = '$tablename' AND c.reltype > 0 AND a.attnum > 0 536 AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema()) 537 ORDER BY a.attnum"; 538 539 $this->query_start($sql, null, SQL_QUERY_AUX); 540 $result = pg_query($this->pgsql, $sql); 541 $this->query_end($result); 542 543 if (!$result) { 544 return array(); 545 } 546 while ($rawcolumn = pg_fetch_object($result)) { 547 548 $info = new stdClass(); 549 $info->name = $rawcolumn->field; 550 $matches = null; 551 552 if ($rawcolumn->type === 'varchar') { 553 $info->type = 'varchar'; 554 $info->meta_type = 'C'; 555 $info->max_length = $rawcolumn->atttypmod - 4; 556 $info->scale = null; 557 $info->not_null = ($rawcolumn->attnotnull === 't'); 558 $info->has_default = ($rawcolumn->atthasdef === 't'); 559 if ($info->has_default) { 560 $parts = explode('::', $rawcolumn->adsrc); 561 if (count($parts) > 1) { 562 $info->default_value = reset($parts); 563 $info->default_value = trim($info->default_value, "'"); 564 } else { 565 $info->default_value = $rawcolumn->adsrc; 566 } 567 } else { 568 $info->default_value = null; 569 } 570 $info->primary_key = false; 571 $info->binary = false; 572 $info->unsigned = null; 573 $info->auto_increment= false; 574 $info->unique = null; 575 576 } else if (preg_match('/int(\d)/i', $rawcolumn->type, $matches)) { 577 $info->type = 'int'; 578 if (strpos($rawcolumn->adsrc, 'nextval') === 0) { 579 $info->primary_key = true; 580 $info->meta_type = 'R'; 581 $info->unique = true; 582 $info->auto_increment= true; 583 $info->has_default = false; 584 } else { 585 $info->primary_key = false; 586 $info->meta_type = 'I'; 587 $info->unique = null; 588 $info->auto_increment= false; 589 $info->has_default = ($rawcolumn->atthasdef === 't'); 590 } 591 // Return number of decimals, not bytes here. 592 if ($matches[1] >= 8) { 593 $info->max_length = 18; 594 } else if ($matches[1] >= 4) { 595 $info->max_length = 9; 596 } else if ($matches[1] >= 2) { 597 $info->max_length = 4; 598 } else if ($matches[1] >= 1) { 599 $info->max_length = 2; 600 } else { 601 $info->max_length = 0; 602 } 603 $info->scale = null; 604 $info->not_null = ($rawcolumn->attnotnull === 't'); 605 if ($info->has_default) { 606 // PG 9.5+ uses ::<TYPE> syntax for some defaults. 607 $parts = explode('::', $rawcolumn->adsrc); 608 if (count($parts) > 1) { 609 $info->default_value = reset($parts); 610 } else { 611 $info->default_value = $rawcolumn->adsrc; 612 } 613 $info->default_value = trim($info->default_value, "()'"); 614 } else { 615 $info->default_value = null; 616 } 617 $info->binary = false; 618 $info->unsigned = false; 619 620 } else if ($rawcolumn->type === 'numeric') { 621 $info->type = $rawcolumn->type; 622 $info->meta_type = 'N'; 623 $info->primary_key = false; 624 $info->binary = false; 625 $info->unsigned = null; 626 $info->auto_increment= false; 627 $info->unique = null; 628 $info->not_null = ($rawcolumn->attnotnull === 't'); 629 $info->has_default = ($rawcolumn->atthasdef === 't'); 630 if ($info->has_default) { 631 // PG 9.5+ uses ::<TYPE> syntax for some defaults. 632 $parts = explode('::', $rawcolumn->adsrc); 633 if (count($parts) > 1) { 634 $info->default_value = reset($parts); 635 } else { 636 $info->default_value = $rawcolumn->adsrc; 637 } 638 $info->default_value = trim($info->default_value, "()'"); 639 } else { 640 $info->default_value = null; 641 } 642 $info->max_length = $rawcolumn->atttypmod >> 16; 643 $info->scale = ($rawcolumn->atttypmod & 0xFFFF) - 4; 644 645 } else if (preg_match('/float(\d)/i', $rawcolumn->type, $matches)) { 646 $info->type = 'float'; 647 $info->meta_type = 'N'; 648 $info->primary_key = false; 649 $info->binary = false; 650 $info->unsigned = null; 651 $info->auto_increment= false; 652 $info->unique = null; 653 $info->not_null = ($rawcolumn->attnotnull === 't'); 654 $info->has_default = ($rawcolumn->atthasdef === 't'); 655 if ($info->has_default) { 656 // PG 9.5+ uses ::<TYPE> syntax for some defaults. 657 $parts = explode('::', $rawcolumn->adsrc); 658 if (count($parts) > 1) { 659 $info->default_value = reset($parts); 660 } else { 661 $info->default_value = $rawcolumn->adsrc; 662 } 663 $info->default_value = trim($info->default_value, "()'"); 664 } else { 665 $info->default_value = null; 666 } 667 // just guess expected number of deciaml places :-( 668 if ($matches[1] == 8) { 669 // total 15 digits 670 $info->max_length = 8; 671 $info->scale = 7; 672 } else { 673 // total 6 digits 674 $info->max_length = 4; 675 $info->scale = 2; 676 } 677 678 } else if ($rawcolumn->type === 'text') { 679 $info->type = $rawcolumn->type; 680 $info->meta_type = 'X'; 681 $info->max_length = -1; 682 $info->scale = null; 683 $info->not_null = ($rawcolumn->attnotnull === 't'); 684 $info->has_default = ($rawcolumn->atthasdef === 't'); 685 if ($info->has_default) { 686 $parts = explode('::', $rawcolumn->adsrc); 687 if (count($parts) > 1) { 688 $info->default_value = reset($parts); 689 $info->default_value = trim($info->default_value, "'"); 690 } else { 691 $info->default_value = $rawcolumn->adsrc; 692 } 693 } else { 694 $info->default_value = null; 695 } 696 $info->primary_key = false; 697 $info->binary = false; 698 $info->unsigned = null; 699 $info->auto_increment= false; 700 $info->unique = null; 701 702 } else if ($rawcolumn->type === 'bytea') { 703 $info->type = $rawcolumn->type; 704 $info->meta_type = 'B'; 705 $info->max_length = -1; 706 $info->scale = null; 707 $info->not_null = ($rawcolumn->attnotnull === 't'); 708 $info->has_default = false; 709 $info->default_value = null; 710 $info->primary_key = false; 711 $info->binary = true; 712 $info->unsigned = null; 713 $info->auto_increment= false; 714 $info->unique = null; 715 716 } 717 718 $structure[$info->name] = new database_column_info($info); 719 } 720 721 pg_free_result($result); 722 723 return $structure; 724 } 725 726 /** 727 * Normalise values based in RDBMS dependencies (booleans, LOBs...) 728 * 729 * @param database_column_info $column column metadata corresponding with the value we are going to normalise 730 * @param mixed $value value we are going to normalise 731 * @return mixed the normalised value 732 */ 733 protected function normalise_value($column, $value) { 734 $this->detect_objects($value); 735 736 if (is_bool($value)) { // Always, convert boolean to int 737 $value = (int)$value; 738 739 } else if ($column->meta_type === 'B') { 740 if (!is_null($value)) { 741 // standard_conforming_strings must be enabled, otherwise pg_escape_bytea() will double escape 742 // \ and produce data errors. This is set on the connection. 743 $value = pg_escape_bytea($this->pgsql, $value); 744 } 745 746 } else if ($value === '') { 747 if ($column->meta_type === 'I' or $column->meta_type === 'F' or $column->meta_type === 'N') { 748 $value = 0; // prevent '' problems in numeric fields 749 } 750 } 751 return $value; 752 } 753 754 /** 755 * Is db in unicode mode? 756 * @return bool 757 */ 758 public function setup_is_unicodedb() { 759 // Get PostgreSQL server_encoding value 760 $sql = "SHOW server_encoding"; 761 $this->query_start($sql, null, SQL_QUERY_AUX); 762 $result = pg_query($this->pgsql, $sql); 763 $this->query_end($result); 764 765 if (!$result) { 766 return false; 767 } 768 $rawcolumn = pg_fetch_object($result); 769 $encoding = $rawcolumn->server_encoding; 770 pg_free_result($result); 771 772 return (strtoupper($encoding) == 'UNICODE' || strtoupper($encoding) == 'UTF8'); 773 } 774 775 /** 776 * Do NOT use in code, to be used by database_manager only! 777 * @param string|array $sql query 778 * @param array|null $tablenames an array of xmldb table names affected by this request. 779 * @return bool true 780 * @throws ddl_change_structure_exception A DDL specific exception is thrown for any errors. 781 */ 782 public function change_database_structure($sql, $tablenames = null) { 783 $this->get_manager(); // Includes DDL exceptions classes ;-) 784 if (is_array($sql)) { 785 $sql = implode("\n;\n", $sql); 786 } 787 if (!$this->is_transaction_started()) { 788 // It is better to do all or nothing, this helps with recovery... 789 $sql = "BEGIN ISOLATION LEVEL SERIALIZABLE;\n$sql\n; COMMIT"; 790 } 791 792 try { 793 $this->query_start($sql, null, SQL_QUERY_STRUCTURE); 794 $result = pg_query($this->pgsql, $sql); 795 $this->query_end($result); 796 pg_free_result($result); 797 } catch (ddl_change_structure_exception $e) { 798 if (!$this->is_transaction_started()) { 799 $result = @pg_query($this->pgsql, "ROLLBACK"); 800 @pg_free_result($result); 801 } 802 $this->reset_caches($tablenames); 803 throw $e; 804 } 805 806 $this->reset_caches($tablenames); 807 return true; 808 } 809 810 /** 811 * Execute general sql query. Should be used only when no other method suitable. 812 * Do NOT use this to make changes in db structure, use database_manager methods instead! 813 * @param string $sql query 814 * @param array $params query parameters 815 * @return bool true 816 * @throws dml_exception A DML specific exception is thrown for any errors. 817 */ 818 public function execute($sql, array $params=null) { 819 list($sql, $params, $type) = $this->fix_sql_params($sql, $params); 820 821 if (strpos($sql, ';') !== false) { 822 throw new coding_exception('moodle_database::execute() Multiple sql statements found or bound parameters not used properly in query!'); 823 } 824 825 $this->query_start($sql, $params, SQL_QUERY_UPDATE); 826 $result = pg_query_params($this->pgsql, $sql, $params); 827 $this->query_end($result); 828 829 pg_free_result($result); 830 return true; 831 } 832 833 /** 834 * Get a number of records as a moodle_recordset using a SQL statement. 835 * 836 * Since this method is a little less readable, use of it should be restricted to 837 * code where it's possible there might be large datasets being returned. For known 838 * small datasets use get_records_sql - it leads to simpler code. 839 * 840 * The return type is like: 841 * @see function get_recordset. 842 * 843 * @param string $sql the SQL select query to execute. 844 * @param array $params array of sql parameters 845 * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set). 846 * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set). 847 * @return moodle_recordset instance 848 * @throws dml_exception A DML specific exception is thrown for any errors. 849 */ 850 public function get_recordset_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) { 851 852 list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum); 853 854 if ($limitnum) { 855 $sql .= " LIMIT $limitnum"; 856 } 857 if ($limitfrom) { 858 $sql .= " OFFSET $limitfrom"; 859 } 860 861 list($sql, $params, $type) = $this->fix_sql_params($sql, $params); 862 863 // For any query that doesn't explicitly specify a limit, we must use cursors to stop it 864 // loading the entire thing (unless the config setting is turned off). 865 $usecursors = !$limitnum && ($this->get_fetch_buffer_size() > 0); 866 if ($usecursors) { 867 // Work out the cursor unique identifer. This is based on a simple count used which 868 // should be OK because the identifiers only need to be unique within the current 869 // transaction. 870 $this->cursorcount++; 871 $cursorname = 'crs' . $this->cursorcount; 872 873 // Do the query to a cursor. 874 $sql = 'DECLARE ' . $cursorname . ' NO SCROLL CURSOR WITH HOLD FOR ' . $sql; 875 } else { 876 $cursorname = ''; 877 } 878 879 $this->query_start($sql, $params, SQL_QUERY_SELECT); 880 881 $result = pg_query_params($this->pgsql, $sql, $params); 882 883 $this->query_end($result); 884 if ($usecursors) { 885 pg_free_result($result); 886 $result = null; 887 } 888 889 return new pgsql_native_moodle_recordset($result, $this, $cursorname); 890 } 891 892 /** 893 * Gets size of fetch buffer used for recordset queries. 894 * 895 * If this returns 0 then cursors will not be used, meaning recordset queries will occupy enough 896 * memory as needed for the Postgres library to hold the entire query results in memory. 897 * 898 * @return int Fetch buffer size or 0 indicating not to use cursors 899 */ 900 protected function get_fetch_buffer_size() { 901 if (array_key_exists('fetchbuffersize', $this->dboptions)) { 902 return (int)$this->dboptions['fetchbuffersize']; 903 } else { 904 return self::DEFAULT_FETCH_BUFFER_SIZE; 905 } 906 } 907 908 /** 909 * Retrieves data from cursor. For use by recordset only; do not call directly. 910 * 911 * Return value contains the next batch of Postgres data, and a boolean indicating if this is 912 * definitely the last batch (if false, there may be more) 913 * 914 * @param string $cursorname Name of cursor to read from 915 * @return array Array with 2 elements (next data batch and boolean indicating last batch) 916 */ 917 public function fetch_from_cursor($cursorname) { 918 $count = $this->get_fetch_buffer_size(); 919 920 $sql = 'FETCH ' . $count . ' FROM ' . $cursorname; 921 922 $this->query_start($sql, [], SQL_QUERY_AUX); 923 $result = pg_query($this->pgsql, $sql); 924 $last = pg_num_rows($result) !== $count; 925 926 $this->query_end($result); 927 928 return [$result, $last]; 929 } 930 931 /** 932 * Closes a cursor. For use by recordset only; do not call directly. 933 * 934 * @param string $cursorname Name of cursor to close 935 * @return bool True if we actually closed one, false if the transaction was cancelled 936 */ 937 public function close_cursor($cursorname) { 938 // If the transaction got cancelled, then ignore this request. 939 $sql = 'CLOSE ' . $cursorname; 940 $this->query_start($sql, [], SQL_QUERY_AUX); 941 $result = pg_query($this->pgsql, $sql); 942 $this->query_end($result); 943 if ($result) { 944 pg_free_result($result); 945 } 946 return true; 947 } 948 949 /** 950 * Get a number of records as an array of objects using a SQL statement. 951 * 952 * Return value is like: 953 * @see function get_records. 954 * 955 * @param string $sql the SQL select query to execute. The first column of this SELECT statement 956 * must be a unique value (usually the 'id' field), as it will be used as the key of the 957 * returned array. 958 * @param array $params array of sql parameters 959 * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set). 960 * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set). 961 * @return array of objects, or empty array if no records were found 962 * @throws dml_exception A DML specific exception is thrown for any errors. 963 */ 964 public function get_records_sql($sql, array $params = null, $limitfrom = 0, $limitnum = 0) { 965 list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum); 966 967 if ($limitnum) { 968 $sql .= " LIMIT $limitnum"; 969 } 970 if ($limitfrom) { 971 $sql .= " OFFSET $limitfrom"; 972 } 973 974 list($sql, $params, $type) = $this->fix_sql_params($sql, $params); 975 $this->query_start($sql, $params, SQL_QUERY_SELECT); 976 $result = pg_query_params($this->pgsql, $sql, $params); 977 $this->query_end($result); 978 979 // find out if there are any blobs 980 $numfields = pg_num_fields($result); 981 $blobs = array(); 982 for ($i = 0; $i < $numfields; $i++) { 983 $type = pg_field_type($result, $i); 984 if ($type == 'bytea') { 985 $blobs[] = pg_field_name($result, $i); 986 } 987 } 988 989 $return = []; 990 while ($row = pg_fetch_assoc($result)) { 991 $id = reset($row); 992 if ($blobs) { 993 foreach ($blobs as $blob) { 994 $row[$blob] = ($row[$blob] !== null ? pg_unescape_bytea($row[$blob]) : null); 995 } 996 } 997 if (isset($return[$id])) { 998 $colname = key($row); 999 debugging("Did you remember to make the first column something unique in your call to get_records? Duplicate value '$id' found in column '$colname'.", DEBUG_DEVELOPER); 1000 } 1001 $return[$id] = (object) $row; 1002 } 1003 1004 return $return; 1005 } 1006 1007 /** 1008 * Selects records and return values (first field) as an array using a SQL statement. 1009 * 1010 * @param string $sql The SQL query 1011 * @param array $params array of sql parameters 1012 * @return array of values 1013 * @throws dml_exception A DML specific exception is thrown for any errors. 1014 */ 1015 public function get_fieldset_sql($sql, array $params=null) { 1016 list($sql, $params, $type) = $this->fix_sql_params($sql, $params); 1017 1018 $this->query_start($sql, $params, SQL_QUERY_SELECT); 1019 $result = pg_query_params($this->pgsql, $sql, $params); 1020 $this->query_end($result); 1021 1022 $return = pg_fetch_all_columns($result, 0); 1023 1024 if (pg_field_type($result, 0) == 'bytea') { 1025 foreach ($return as $key => $value) { 1026 $return[$key] = ($value === null ? $value : pg_unescape_bytea($value)); 1027 } 1028 } 1029 1030 pg_free_result($result); 1031 1032 return $return; 1033 } 1034 1035 /** 1036 * Insert new record into database, as fast as possible, no safety checks, lobs not supported. 1037 * @param string $table name 1038 * @param mixed $params data record as object or array 1039 * @param bool $returnit return it of inserted record 1040 * @param bool $bulk true means repeated inserts expected 1041 * @param bool $customsequence true if 'id' included in $params, disables $returnid 1042 * @return bool|int true or new id 1043 * @throws dml_exception A DML specific exception is thrown for any errors. 1044 */ 1045 public function insert_record_raw($table, $params, $returnid=true, $bulk=false, $customsequence=false) { 1046 if (!is_array($params)) { 1047 $params = (array)$params; 1048 } 1049 1050 $returning = ""; 1051 1052 if ($customsequence) { 1053 if (!isset($params['id'])) { 1054 throw new coding_exception('moodle_database::insert_record_raw() id field must be specified if custom sequences used.'); 1055 } 1056 $returnid = false; 1057 } else { 1058 if ($returnid) { 1059 $returning = "RETURNING id"; 1060 unset($params['id']); 1061 } else { 1062 unset($params['id']); 1063 } 1064 } 1065 1066 if (empty($params)) { 1067 throw new coding_exception('moodle_database::insert_record_raw() no fields found.'); 1068 } 1069 1070 $fields = implode(',', array_keys($params)); 1071 $values = array(); 1072 $i = 1; 1073 foreach ($params as $value) { 1074 $this->detect_objects($value); 1075 $values[] = "\$".$i++; 1076 } 1077 $values = implode(',', $values); 1078 1079 $sql = "INSERT INTO {$this->prefix}$table ($fields) VALUES($values) $returning"; 1080 $this->query_start($sql, $params, SQL_QUERY_INSERT); 1081 $result = pg_query_params($this->pgsql, $sql, $params); 1082 $this->query_end($result); 1083 1084 if ($returning !== "") { 1085 $row = pg_fetch_assoc($result); 1086 $params['id'] = reset($row); 1087 } 1088 pg_free_result($result); 1089 1090 if (!$returnid) { 1091 return true; 1092 } 1093 1094 return (int)$params['id']; 1095 } 1096 1097 /** 1098 * Insert a record into a table and return the "id" field if required. 1099 * 1100 * Some conversions and safety checks are carried out. Lobs are supported. 1101 * If the return ID isn't required, then this just reports success as true/false. 1102 * $data is an object containing needed data 1103 * @param string $table The database table to be inserted into 1104 * @param object|array $dataobject A data object with values for one or more fields in the record 1105 * @param bool $returnid Should the id of the newly created record entry be returned? If this option is not requested then true/false is returned. 1106 * @return bool|int true or new id 1107 * @throws dml_exception A DML specific exception is thrown for any errors. 1108 */ 1109 public function insert_record($table, $dataobject, $returnid=true, $bulk=false) { 1110 $dataobject = (array)$dataobject; 1111 1112 $columns = $this->get_columns($table); 1113 if (empty($columns)) { 1114 throw new dml_exception('ddltablenotexist', $table); 1115 } 1116 1117 $cleaned = array(); 1118 1119 foreach ($dataobject as $field=>$value) { 1120 if ($field === 'id') { 1121 continue; 1122 } 1123 if (!isset($columns[$field])) { 1124 continue; 1125 } 1126 $column = $columns[$field]; 1127 $cleaned[$field] = $this->normalise_value($column, $value); 1128 } 1129 1130 return $this->insert_record_raw($table, $cleaned, $returnid, $bulk); 1131 1132 } 1133 1134 /** 1135 * Insert multiple records into database as fast as possible. 1136 * 1137 * Order of inserts is maintained, but the operation is not atomic, 1138 * use transactions if necessary. 1139 * 1140 * This method is intended for inserting of large number of small objects, 1141 * do not use for huge objects with text or binary fields. 1142 * 1143 * @since Moodle 2.7 1144 * 1145 * @param string $table The database table to be inserted into 1146 * @param array|Traversable $dataobjects list of objects to be inserted, must be compatible with foreach 1147 * @return void does not return new record ids 1148 * 1149 * @throws coding_exception if data objects have different structure 1150 * @throws dml_exception A DML specific exception is thrown for any errors. 1151 */ 1152 public function insert_records($table, $dataobjects) { 1153 if (!is_array($dataobjects) and !($dataobjects instanceof Traversable)) { 1154 throw new coding_exception('insert_records() passed non-traversable object'); 1155 } 1156 1157 // PostgreSQL does not seem to have problems with huge queries. 1158 $chunksize = 500; 1159 if (!empty($this->dboptions['bulkinsertsize'])) { 1160 $chunksize = (int)$this->dboptions['bulkinsertsize']; 1161 } 1162 1163 $columns = $this->get_columns($table, true); 1164 1165 $fields = null; 1166 $count = 0; 1167 $chunk = array(); 1168 foreach ($dataobjects as $dataobject) { 1169 if (!is_array($dataobject) and !is_object($dataobject)) { 1170 throw new coding_exception('insert_records() passed invalid record object'); 1171 } 1172 $dataobject = (array)$dataobject; 1173 if ($fields === null) { 1174 $fields = array_keys($dataobject); 1175 $columns = array_intersect_key($columns, $dataobject); 1176 unset($columns['id']); 1177 } else if ($fields !== array_keys($dataobject)) { 1178 throw new coding_exception('All dataobjects in insert_records() must have the same structure!'); 1179 } 1180 1181 $count++; 1182 $chunk[] = $dataobject; 1183 1184 if ($count === $chunksize) { 1185 $this->insert_chunk($table, $chunk, $columns); 1186 $chunk = array(); 1187 $count = 0; 1188 } 1189 } 1190 1191 if ($count) { 1192 $this->insert_chunk($table, $chunk, $columns); 1193 } 1194 } 1195 1196 /** 1197 * Insert records in chunks, strict param types... 1198 * 1199 * Note: can be used only from insert_records(). 1200 * 1201 * @param string $table 1202 * @param array $chunk 1203 * @param database_column_info[] $columns 1204 */ 1205 protected function insert_chunk($table, array $chunk, array $columns) { 1206 $i = 1; 1207 $params = array(); 1208 $values = array(); 1209 foreach ($chunk as $dataobject) { 1210 $vals = array(); 1211 foreach ($columns as $field => $column) { 1212 $params[] = $this->normalise_value($column, $dataobject[$field]); 1213 $vals[] = "\$".$i++; 1214 } 1215 $values[] = '('.implode(',', $vals).')'; 1216 } 1217 1218 $fieldssql = '('.implode(',', array_keys($columns)).')'; 1219 $valuessql = implode(',', $values); 1220 1221 $sql = "INSERT INTO {$this->prefix}$table $fieldssql VALUES $valuessql"; 1222 $this->query_start($sql, $params, SQL_QUERY_INSERT); 1223 $result = pg_query_params($this->pgsql, $sql, $params); 1224 $this->query_end($result); 1225 pg_free_result($result); 1226 } 1227 1228 /** 1229 * Import a record into a table, id field is required. 1230 * Safety checks are NOT carried out. Lobs are supported. 1231 * 1232 * @param string $table name of database table to be inserted into 1233 * @param object $dataobject A data object with values for one or more fields in the record 1234 * @return bool true 1235 * @throws dml_exception A DML specific exception is thrown for any errors. 1236 */ 1237 public function import_record($table, $dataobject) { 1238 $dataobject = (array)$dataobject; 1239 1240 $columns = $this->get_columns($table); 1241 $cleaned = array(); 1242 1243 foreach ($dataobject as $field=>$value) { 1244 $this->detect_objects($value); 1245 if (!isset($columns[$field])) { 1246 continue; 1247 } 1248 $column = $columns[$field]; 1249 $cleaned[$field] = $this->normalise_value($column, $value); 1250 } 1251 1252 return $this->insert_record_raw($table, $cleaned, false, true, true); 1253 } 1254 1255 /** 1256 * Update record in database, as fast as possible, no safety checks, lobs not supported. 1257 * @param string $table name 1258 * @param mixed $params data record as object or array 1259 * @param bool true means repeated updates expected 1260 * @return bool true 1261 * @throws dml_exception A DML specific exception is thrown for any errors. 1262 */ 1263 public function update_record_raw($table, $params, $bulk=false) { 1264 $params = (array)$params; 1265 1266 if (!isset($params['id'])) { 1267 throw new coding_exception('moodle_database::update_record_raw() id field must be specified.'); 1268 } 1269 $id = $params['id']; 1270 unset($params['id']); 1271 1272 if (empty($params)) { 1273 throw new coding_exception('moodle_database::update_record_raw() no fields found.'); 1274 } 1275 1276 $i = 1; 1277 1278 $sets = array(); 1279 foreach ($params as $field=>$value) { 1280 $this->detect_objects($value); 1281 $sets[] = "$field = \$".$i++; 1282 } 1283 1284 $params[] = $id; // last ? in WHERE condition 1285 1286 $sets = implode(',', $sets); 1287 $sql = "UPDATE {$this->prefix}$table SET $sets WHERE id=\$".$i; 1288 1289 $this->query_start($sql, $params, SQL_QUERY_UPDATE); 1290 $result = pg_query_params($this->pgsql, $sql, $params); 1291 $this->query_end($result); 1292 1293 pg_free_result($result); 1294 return true; 1295 } 1296 1297 /** 1298 * Update a record in a table 1299 * 1300 * $dataobject is an object containing needed data 1301 * Relies on $dataobject having a variable "id" to 1302 * specify the record to update 1303 * 1304 * @param string $table The database table to be checked against. 1305 * @param object $dataobject An object with contents equal to fieldname=>fieldvalue. Must have an entry for 'id' to map to the table specified. 1306 * @param bool true means repeated updates expected 1307 * @return bool true 1308 * @throws dml_exception A DML specific exception is thrown for any errors. 1309 */ 1310 public function update_record($table, $dataobject, $bulk=false) { 1311 $dataobject = (array)$dataobject; 1312 1313 $columns = $this->get_columns($table); 1314 $cleaned = array(); 1315 1316 foreach ($dataobject as $field=>$value) { 1317 if (!isset($columns[$field])) { 1318 continue; 1319 } 1320 $column = $columns[$field]; 1321 $cleaned[$field] = $this->normalise_value($column, $value); 1322 } 1323 1324 $this->update_record_raw($table, $cleaned, $bulk); 1325 1326 return true; 1327 } 1328 1329 /** 1330 * Set a single field in every table record which match a particular WHERE clause. 1331 * 1332 * @param string $table The database table to be checked against. 1333 * @param string $newfield the field to set. 1334 * @param string $newvalue the value to set the field to. 1335 * @param string $select A fragment of SQL to be used in a where clause in the SQL call. 1336 * @param array $params array of sql parameters 1337 * @return bool true 1338 * @throws dml_exception A DML specific exception is thrown for any errors. 1339 */ 1340 public function set_field_select($table, $newfield, $newvalue, $select, array $params=null) { 1341 1342 if ($select) { 1343 $select = "WHERE $select"; 1344 } 1345 if (is_null($params)) { 1346 $params = array(); 1347 } 1348 list($select, $params, $type) = $this->fix_sql_params($select, $params); 1349 $i = count($params)+1; 1350 1351 // Get column metadata 1352 $columns = $this->get_columns($table); 1353 $column = $columns[$newfield]; 1354 1355 $normalisedvalue = $this->normalise_value($column, $newvalue); 1356 1357 $newfield = "$newfield = \$" . $i; 1358 $params[] = $normalisedvalue; 1359 $sql = "UPDATE {$this->prefix}$table SET $newfield $select"; 1360 1361 $this->query_start($sql, $params, SQL_QUERY_UPDATE); 1362 $result = pg_query_params($this->pgsql, $sql, $params); 1363 $this->query_end($result); 1364 1365 pg_free_result($result); 1366 1367 return true; 1368 } 1369 1370 /** 1371 * Delete one or more records from a table which match a particular WHERE clause, lobs not supported. 1372 * 1373 * @param string $table The database table to be checked against. 1374 * @param string $select A fragment of SQL to be used in a where clause in the SQL call (used to define the selection criteria). 1375 * @param array $params array of sql parameters 1376 * @return bool true 1377 * @throws dml_exception A DML specific exception is thrown for any errors. 1378 */ 1379 public function delete_records_select($table, $select, array $params=null) { 1380 if ($select) { 1381 $select = "WHERE $select"; 1382 } 1383 $sql = "DELETE FROM {$this->prefix}$table $select"; 1384 1385 list($sql, $params, $type) = $this->fix_sql_params($sql, $params); 1386 1387 $this->query_start($sql, $params, SQL_QUERY_UPDATE); 1388 $result = pg_query_params($this->pgsql, $sql, $params); 1389 $this->query_end($result); 1390 1391 pg_free_result($result); 1392 1393 return true; 1394 } 1395 1396 /** 1397 * Returns 'LIKE' part of a query. 1398 * 1399 * @param string $fieldname usually name of the table column 1400 * @param string $param usually bound query parameter (?, :named) 1401 * @param bool $casesensitive use case sensitive search 1402 * @param bool $accensensitive use accent sensitive search (not all databases support accent insensitive) 1403 * @param bool $notlike true means "NOT LIKE" 1404 * @param string $escapechar escape char for '%' and '_' 1405 * @return string SQL code fragment 1406 */ 1407 public function sql_like($fieldname, $param, $casesensitive = true, $accentsensitive = true, $notlike = false, $escapechar = '\\') { 1408 if (strpos($param, '%') !== false) { 1409 debugging('Potential SQL injection detected, sql_like() expects bound parameters (? or :named)'); 1410 } 1411 1412 // postgresql does not support accent insensitive text comparisons, sorry 1413 if ($casesensitive) { 1414 $LIKE = $notlike ? 'NOT LIKE' : 'LIKE'; 1415 } else { 1416 $LIKE = $notlike ? 'NOT ILIKE' : 'ILIKE'; 1417 } 1418 return "$fieldname $LIKE $param ESCAPE '$escapechar'"; 1419 } 1420 1421 public function sql_bitxor($int1, $int2) { 1422 return '((' . $int1 . ') # (' . $int2 . '))'; 1423 } 1424 1425 public function sql_cast_char2int($fieldname, $text=false) { 1426 return ' CAST(' . $fieldname . ' AS INT) '; 1427 } 1428 1429 public function sql_cast_char2real($fieldname, $text=false) { 1430 return " $fieldname::real "; 1431 } 1432 1433 public function sql_concat() { 1434 $arr = func_get_args(); 1435 $s = implode(' || ', $arr); 1436 if ($s === '') { 1437 return " '' "; 1438 } 1439 // Add always empty string element so integer-exclusive concats 1440 // will work without needing to cast each element explicitly 1441 return " '' || $s "; 1442 } 1443 1444 public function sql_concat_join($separator="' '", $elements=array()) { 1445 for ($n=count($elements)-1; $n > 0 ; $n--) { 1446 array_splice($elements, $n, 0, $separator); 1447 } 1448 $s = implode(' || ', $elements); 1449 if ($s === '') { 1450 return " '' "; 1451 } 1452 return " $s "; 1453 } 1454 1455 /** 1456 * Return SQL for performing group concatenation on given field/expression 1457 * 1458 * @param string $field 1459 * @param string $separator 1460 * @param string $sort 1461 * @return string 1462 */ 1463 public function sql_group_concat(string $field, string $separator = ', ', string $sort = ''): string { 1464 $fieldsort = $sort ? "ORDER BY {$sort}" : ''; 1465 return "STRING_AGG(CAST({$field} AS VARCHAR), '{$separator}' {$fieldsort})"; 1466 } 1467 1468 public function sql_regex_supported() { 1469 return true; 1470 } 1471 1472 public function sql_regex($positivematch = true, $casesensitive = false) { 1473 if ($casesensitive) { 1474 return $positivematch ? '~' : '!~'; 1475 } else { 1476 return $positivematch ? '~*' : '!~*'; 1477 } 1478 } 1479 1480 /** 1481 * Does this driver support tool_replace? 1482 * 1483 * @since Moodle 2.6.1 1484 * @return bool 1485 */ 1486 public function replace_all_text_supported() { 1487 return true; 1488 } 1489 1490 public function session_lock_supported() { 1491 return true; 1492 } 1493 1494 /** 1495 * Obtain session lock 1496 * @param int $rowid id of the row with session record 1497 * @param int $timeout max allowed time to wait for the lock in seconds 1498 * @return bool success 1499 */ 1500 public function get_session_lock($rowid, $timeout) { 1501 // NOTE: there is a potential locking problem for database running 1502 // multiple instances of moodle, we could try to use pg_advisory_lock(int, int), 1503 // luckily there is not a big chance that they would collide 1504 if (!$this->session_lock_supported()) { 1505 return; 1506 } 1507 1508 parent::get_session_lock($rowid, $timeout); 1509 1510 $timeoutmilli = $timeout * 1000; 1511 1512 $sql = "SET statement_timeout TO $timeoutmilli"; 1513 $this->query_start($sql, null, SQL_QUERY_AUX); 1514 $result = pg_query($this->pgsql, $sql); 1515 $this->query_end($result); 1516 1517 if ($result) { 1518 pg_free_result($result); 1519 } 1520 1521 $sql = "SELECT pg_advisory_lock($rowid)"; 1522 $this->query_start($sql, null, SQL_QUERY_AUX); 1523 $start = time(); 1524 $result = pg_query($this->pgsql, $sql); 1525 $end = time(); 1526 try { 1527 $this->query_end($result); 1528 } catch (dml_exception $ex) { 1529 if ($end - $start >= $timeout) { 1530 throw new dml_sessionwait_exception(); 1531 } else { 1532 throw $ex; 1533 } 1534 } 1535 1536 if ($result) { 1537 pg_free_result($result); 1538 } 1539 1540 $sql = "SET statement_timeout TO DEFAULT"; 1541 $this->query_start($sql, null, SQL_QUERY_AUX); 1542 $result = pg_query($this->pgsql, $sql); 1543 $this->query_end($result); 1544 1545 if ($result) { 1546 pg_free_result($result); 1547 } 1548 } 1549 1550 public function release_session_lock($rowid) { 1551 if (!$this->session_lock_supported()) { 1552 return; 1553 } 1554 if (!$this->used_for_db_sessions) { 1555 return; 1556 } 1557 1558 parent::release_session_lock($rowid); 1559 1560 $sql = "SELECT pg_advisory_unlock($rowid)"; 1561 $this->query_start($sql, null, SQL_QUERY_AUX); 1562 $result = pg_query($this->pgsql, $sql); 1563 $this->query_end($result); 1564 1565 if ($result) { 1566 pg_free_result($result); 1567 } 1568 } 1569 1570 /** 1571 * Driver specific start of real database transaction, 1572 * this can not be used directly in code. 1573 * @return void 1574 */ 1575 protected function begin_transaction() { 1576 $this->savepointpresent = true; 1577 $sql = "BEGIN ISOLATION LEVEL READ COMMITTED; SAVEPOINT moodle_pg_savepoint"; 1578 $this->query_start($sql, null, SQL_QUERY_AUX); 1579 $result = pg_query($this->pgsql, $sql); 1580 $this->query_end($result); 1581 1582 pg_free_result($result); 1583 } 1584 1585 /** 1586 * Driver specific commit of real database transaction, 1587 * this can not be used directly in code. 1588 * @return void 1589 */ 1590 protected function commit_transaction() { 1591 $this->savepointpresent = false; 1592 $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; COMMIT"; 1593 $this->query_start($sql, null, SQL_QUERY_AUX); 1594 $result = pg_query($this->pgsql, $sql); 1595 $this->query_end($result); 1596 1597 pg_free_result($result); 1598 } 1599 1600 /** 1601 * Driver specific abort of real database transaction, 1602 * this can not be used directly in code. 1603 * @return void 1604 */ 1605 protected function rollback_transaction() { 1606 $this->savepointpresent = false; 1607 $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; ROLLBACK"; 1608 $this->query_start($sql, null, SQL_QUERY_AUX); 1609 $result = pg_query($this->pgsql, $sql); 1610 $this->query_end($result); 1611 1612 pg_free_result($result); 1613 } 1614 1615 /** 1616 * Helper function trimming (whitespace + quotes) any string 1617 * needed because PG uses to enclose with double quotes some 1618 * fields in indexes definition and others 1619 * 1620 * @param string $str string to apply whitespace + quotes trim 1621 * @return string trimmed string 1622 */ 1623 private function trim_quotes($str) { 1624 return trim(trim($str), "'\""); 1625 } 1626 1627 /** 1628 * Postgresql supports full-text search indexes. 1629 * 1630 * @return bool 1631 */ 1632 public function is_fulltext_search_supported() { 1633 return true; 1634 } 1635} 1636