1<?php
2// This file is part of Moodle - http://moodle.org/
3//
4// Moodle is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8//
9// Moodle is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13//
14// You should have received a copy of the GNU General Public License
15// along with Moodle.  If not, see <http://www.gnu.org/licenses/>.
16
17/**
18 * Native pgsql class representing moodle database interface.
19 *
20 * @package    core_dml
21 * @copyright  2008 Petr Skoda (http://skodak.org)
22 * @license    http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
23 */
24
25defined('MOODLE_INTERNAL') || die();
26
27require_once(__DIR__.'/moodle_database.php');
28require_once(__DIR__.'/moodle_read_slave_trait.php');
29require_once(__DIR__.'/pgsql_native_moodle_recordset.php');
30require_once(__DIR__.'/pgsql_native_moodle_temptables.php');
31
32/**
33 * Native pgsql class representing moodle database interface.
34 *
35 * @package    core_dml
36 * @copyright  2008 Petr Skoda (http://skodak.org)
37 * @license    http://www.gnu.org/copyleft/gpl.html GNU GPL v3 or later
38 */
39class pgsql_native_moodle_database extends moodle_database {
40    use moodle_read_slave_trait {
41        select_db_handle as read_slave_select_db_handle;
42        can_use_readonly as read_slave_can_use_readonly;
43        query_start as read_slave_query_start;
44    }
45
46    /** @var array $dbhcursor keep track of open cursors */
47    private $dbhcursor = [];
48
49    /** @var resource $pgsql database resource */
50    protected $pgsql     = null;
51
52    protected $last_error_reporting; // To handle pgsql driver default verbosity
53
54    /** @var bool savepoint hack for MDL-35506 - workaround for automatic transaction rollback on error */
55    protected $savepointpresent = false;
56
57    /** @var int Number of cursors used (for constructing a unique ID) */
58    protected $cursorcount = 0;
59
60    /** @var int Default number of rows to fetch at a time when using recordsets with cursors */
61    const DEFAULT_FETCH_BUFFER_SIZE = 100000;
62
63    /**
64     * Detects if all needed PHP stuff installed.
65     * Note: can be used before connect()
66     * @return mixed true if ok, string if something
67     */
68    public function driver_installed() {
69        if (!extension_loaded('pgsql')) {
70            return get_string('pgsqlextensionisnotpresentinphp', 'install');
71        }
72        return true;
73    }
74
75    /**
76     * Returns database family type - describes SQL dialect
77     * Note: can be used before connect()
78     * @return string db family name (mysql, postgres, mssql, oracle, etc.)
79     */
80    public function get_dbfamily() {
81        return 'postgres';
82    }
83
84    /**
85     * Returns more specific database driver type
86     * Note: can be used before connect()
87     * @return string db type mysqli, pgsql, oci, mssql, sqlsrv
88     */
89    protected function get_dbtype() {
90        return 'pgsql';
91    }
92
93    /**
94     * Returns general database library name
95     * Note: can be used before connect()
96     * @return string db type pdo, native
97     */
98    protected function get_dblibrary() {
99        return 'native';
100    }
101
102    /**
103     * Returns localised database type name
104     * Note: can be used before connect()
105     * @return string
106     */
107    public function get_name() {
108        return get_string('nativepgsql', 'install');
109    }
110
111    /**
112     * Returns localised database configuration help.
113     * Note: can be used before connect()
114     * @return string
115     */
116    public function get_configuration_help() {
117        return get_string('nativepgsqlhelp', 'install');
118    }
119
120    /**
121     * Connect to db
122     * @param string $dbhost The database host.
123     * @param string $dbuser The database username.
124     * @param string $dbpass The database username's password.
125     * @param string $dbname The name of the database being connected to.
126     * @param mixed $prefix string means moodle db prefix, false used for external databases where prefix not used
127     * @param array $dboptions driver specific options
128     * @return bool true
129     * @throws dml_connection_exception if error
130     */
131    public function raw_connect(string $dbhost, string $dbuser, string $dbpass, string $dbname, $prefix, array $dboptions=null): bool {
132        if ($prefix == '' and !$this->external) {
133            //Enforce prefixes for everybody but mysql
134            throw new dml_exception('prefixcannotbeempty', $this->get_dbfamily());
135        }
136
137        $driverstatus = $this->driver_installed();
138
139        if ($driverstatus !== true) {
140            throw new dml_exception('dbdriverproblem', $driverstatus);
141        }
142
143        $this->store_settings($dbhost, $dbuser, $dbpass, $dbname, $prefix, $dboptions);
144
145        $pass = addcslashes($this->dbpass, "'\\");
146
147        // Unix socket connections should have lower overhead
148        if (!empty($this->dboptions['dbsocket']) and ($this->dbhost === 'localhost' or $this->dbhost === '127.0.0.1')) {
149            $connection = "user='$this->dbuser' password='$pass' dbname='$this->dbname'";
150            if (strpos($this->dboptions['dbsocket'], '/') !== false) {
151                // A directory was specified as the socket location.
152                $connection .= " host='".$this->dboptions['dbsocket']."'";
153            }
154            if (!empty($this->dboptions['dbport'])) {
155                // A port as specified, add it to the connection as it's used as part of the socket path.
156                $connection .= " port ='".$this->dboptions['dbport']."'";
157            }
158        } else {
159            $this->dboptions['dbsocket'] = '';
160            if (empty($this->dbname)) {
161                // probably old style socket connection - do not add port
162                $port = "";
163            } else if (empty($this->dboptions['dbport'])) {
164                $port = "port ='5432'";
165            } else {
166                $port = "port ='".$this->dboptions['dbport']."'";
167            }
168            $connection = "host='$this->dbhost' $port user='$this->dbuser' password='$pass' dbname='$this->dbname'";
169        }
170
171        if (!empty($this->dboptions['connecttimeout'])) {
172            $connection .= " connect_timeout=".$this->dboptions['connecttimeout'];
173        }
174
175        if (empty($this->dboptions['dbhandlesoptions'])) {
176            // ALTER USER and ALTER DATABASE are overridden by these settings.
177            $options = array('--client_encoding=utf8', '--standard_conforming_strings=on');
178            // Select schema if specified, otherwise the first one wins.
179            if (!empty($this->dboptions['dbschema'])) {
180                $options[] = "-c search_path=" . addcslashes($this->dboptions['dbschema'], "'\\");
181            }
182
183            $connection .= " options='" . implode(' ', $options) . "'";
184        }
185
186        ob_start();
187        if (empty($this->dboptions['dbpersist'])) {
188            $this->pgsql = pg_connect($connection, PGSQL_CONNECT_FORCE_NEW);
189        } else {
190            $this->pgsql = pg_pconnect($connection, PGSQL_CONNECT_FORCE_NEW);
191        }
192        $dberr = ob_get_contents();
193        ob_end_clean();
194
195        $status = $this->pgsql ? pg_connection_status($this->pgsql) : false;
196
197        if ($status === false or $status === PGSQL_CONNECTION_BAD) {
198            $this->pgsql = null;
199            throw new dml_connection_exception($dberr);
200        }
201
202        if (!empty($this->dboptions['dbpersist'])) {
203            // There are rare situations (such as PHP out of memory errors) when open cursors may
204            // not be closed at the end of a connection. When using persistent connections, the
205            // cursors remain open and 'get in the way' of future connections. To avoid this
206            // problem, close all cursors here.
207            $result = pg_query($this->pgsql, 'CLOSE ALL');
208            if ($result) {
209                pg_free_result($result);
210            }
211        }
212
213        if (!empty($this->dboptions['dbhandlesoptions'])) {
214            /* We don't trust people who just set the dbhandlesoptions, this code checks up on them.
215             * These functions do not talk to the server, they use the client library knowledge to determine state.
216             */
217            if (!empty($this->dboptions['dbschema'])) {
218                throw new dml_connection_exception('You cannot specify a schema with dbhandlesoptions, use the database to set it.');
219            }
220            if (pg_client_encoding($this->pgsql) != 'UTF8') {
221                throw new dml_connection_exception('client_encoding = UTF8 not set, it is: ' . pg_client_encoding($this->pgsql));
222            }
223            if (pg_escape_string($this->pgsql, '\\') != '\\') {
224                throw new dml_connection_exception('standard_conforming_strings = on, must be set at the database.');
225            }
226        }
227
228        // Connection stabilised and configured, going to instantiate the temptables controller
229        $this->temptables = new pgsql_native_moodle_temptables($this);
230
231        return true;
232    }
233
234    /**
235     * Close database connection and release all resources
236     * and memory (especially circular memory references).
237     * Do NOT use connect() again, create a new instance if needed.
238     */
239    public function dispose() {
240        parent::dispose(); // Call parent dispose to write/close session and other common stuff before closing connection
241        if ($this->pgsql) {
242            pg_close($this->pgsql);
243            $this->pgsql = null;
244        }
245    }
246
247    /**
248     * Gets db handle currently used with queries
249     * @return resource
250     */
251    protected function get_db_handle() {
252        return $this->pgsql;
253    }
254
255    /**
256     * Sets db handle to be used with subsequent queries
257     * @param resource $dbh
258     * @return void
259     */
260    protected function set_db_handle($dbh): void {
261        $this->pgsql = $dbh;
262    }
263
264    /**
265     * Select appropriate db handle - readwrite or readonly
266     * @param int $type type of query
267     * @param string $sql
268     * @return void
269     */
270    protected function select_db_handle(int $type, string $sql): void {
271        $this->read_slave_select_db_handle($type, $sql);
272
273        if (preg_match('/^DECLARE (crs\w*) NO SCROLL CURSOR/', $sql, $match)) {
274            $cursor = $match[1];
275            $this->dbhcursor[$cursor] = $this->pgsql;
276        }
277        if (preg_match('/^(?:FETCH \d+ FROM|CLOSE) (crs\w*)\b/', $sql, $match)) {
278            $cursor = $match[1];
279            $this->pgsql = $this->dbhcursor[$cursor];
280        }
281    }
282
283    /**
284     * Check if The query qualifies for readonly connection execution
285     * Logging queries are exempt, those are write operations that circumvent
286     * standard query_start/query_end paths.
287     * @param int $type type of query
288     * @param string $sql
289     * @return bool
290     */
291    protected function can_use_readonly(int $type, string $sql): bool {
292        // ... pg_*lock queries always go to master.
293        if (preg_match('/\bpg_\w*lock/', $sql)) {
294            return false;
295        }
296
297        // ... a nuisance - temptables use this.
298        if (preg_match('/\bpg_catalog/', $sql) && $this->temptables->get_temptables()) {
299            return false;
300        }
301
302        return $this->read_slave_can_use_readonly($type, $sql);
303
304    }
305
306    /**
307     * Called before each db query.
308     * @param string $sql
309     * @param array array of parameters
310     * @param int $type type of query
311     * @param mixed $extrainfo driver specific extra information
312     * @return void
313     */
314    protected function query_start($sql, array $params=null, $type, $extrainfo=null) {
315        $this->read_slave_query_start($sql, $params, $type, $extrainfo);
316        // pgsql driver tends to send debug to output, we do not need that.
317        $this->last_error_reporting = error_reporting(0);
318    }
319
320    /**
321     * Called immediately after each db query.
322     * @param mixed db specific result
323     * @return void
324     */
325    protected function query_end($result) {
326        // reset original debug level
327        error_reporting($this->last_error_reporting);
328        try {
329            parent::query_end($result);
330            if ($this->savepointpresent and $this->last_type != SQL_QUERY_AUX and $this->last_type != SQL_QUERY_SELECT) {
331                $res = @pg_query($this->pgsql, "RELEASE SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
332                if ($res) {
333                    pg_free_result($res);
334                }
335            }
336        } catch (Exception $e) {
337            if ($this->savepointpresent) {
338                $res = @pg_query($this->pgsql, "ROLLBACK TO SAVEPOINT moodle_pg_savepoint; SAVEPOINT moodle_pg_savepoint");
339                if ($res) {
340                    pg_free_result($res);
341                }
342            }
343            throw $e;
344        }
345    }
346
347    /**
348     * Returns database server info array
349     * @return array Array containing 'description' and 'version' info
350     */
351    public function get_server_info() {
352        static $info;
353        if (!$info) {
354            $this->query_start("--pg_version()", null, SQL_QUERY_AUX);
355            $info = pg_version($this->pgsql);
356            $this->query_end(true);
357        }
358        return array('description'=>$info['server'], 'version'=>$info['server']);
359    }
360
361    /**
362     * Returns supported query parameter types
363     * @return int bitmask of accepted SQL_PARAMS_*
364     */
365    protected function allowed_param_types() {
366        return SQL_PARAMS_DOLLAR;
367    }
368
369    /**
370     * Returns last error reported by database engine.
371     * @return string error message
372     */
373    public function get_last_error() {
374        return pg_last_error($this->pgsql);
375    }
376
377    /**
378     * Return tables in database WITHOUT current prefix.
379     * @param bool $usecache if true, returns list of cached tables.
380     * @return array of table names in lowercase and without prefix
381     */
382    public function get_tables($usecache=true) {
383        if ($usecache and $this->tables !== null) {
384            return $this->tables;
385        }
386        $this->tables = array();
387        $prefix = str_replace('_', '|_', $this->prefix);
388        $sql = "SELECT c.relname
389                  FROM pg_catalog.pg_class c
390                  JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
391                 WHERE c.relname LIKE '$prefix%' ESCAPE '|'
392                       AND c.relkind = 'r'
393                       AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())";
394        $this->query_start($sql, null, SQL_QUERY_AUX);
395        $result = pg_query($this->pgsql, $sql);
396        $this->query_end($result);
397
398        if ($result) {
399            while ($row = pg_fetch_row($result)) {
400                $tablename = reset($row);
401                if ($this->prefix !== false && $this->prefix !== '') {
402                    if (strpos($tablename, $this->prefix) !== 0) {
403                        continue;
404                    }
405                    $tablename = substr($tablename, strlen($this->prefix));
406                }
407                $this->tables[$tablename] = $tablename;
408            }
409            pg_free_result($result);
410        }
411        return $this->tables;
412    }
413
414    /**
415     * Constructs 'IN()' or '=' sql fragment
416     *
417     * Method overriding {@see moodle_database::get_in_or_equal} to be able to use
418     * more than 65535 elements in $items array.
419     *
420     * @param mixed $items A single value or array of values for the expression.
421     * @param int $type Parameter bounding type : SQL_PARAMS_QM or SQL_PARAMS_NAMED.
422     * @param string $prefix Named parameter placeholder prefix (a unique counter value is appended to each parameter name).
423     * @param bool $equal True means we want to equate to the constructed expression, false means we don't want to equate to it.
424     * @param mixed $onemptyitems This defines the behavior when the array of items provided is empty. Defaults to false,
425     *              meaning throw exceptions. Other values will become part of the returned SQL fragment.
426     * @throws coding_exception | dml_exception
427     * @return array A list containing the constructed sql fragment and an array of parameters.
428     */
429    public function get_in_or_equal($items, $type=SQL_PARAMS_QM, $prefix='param', $equal=true, $onemptyitems=false): array {
430        // We only interfere if number of items in expression exceeds 16 bit value.
431        if (!is_array($items) || count($items) < 65535) {
432            return parent::get_in_or_equal($items, $type, $prefix,  $equal, $onemptyitems);
433        }
434
435        // Determine the type from the first value. We don't need to be very smart here,
436        // it is developer's responsibility to make sure that variable type is matching
437        // field type, if not the case, DB engine will hint. Also mixing types won't work
438        // here anyway, so we ignore NULL or boolean (unlikely you need 56k values of
439        // these types only).
440        $cast = is_string(current($items)) ? '::text' : '::bigint';
441
442        if ($type == SQL_PARAMS_QM) {
443            if ($equal) {
444                $sql = 'IN (VALUES ('.implode('),(', array_fill(0, count($items), '?'.$cast)).'))';
445            } else {
446                $sql = 'NOT IN (VALUES ('.implode('),(', array_fill(0, count($items), '?'.$cast)).'))';
447            }
448            $params = array_values($items);
449        } else if ($type == SQL_PARAMS_NAMED) {
450            if (empty($prefix)) {
451                $prefix = 'param';
452            }
453            $params = [];
454            $sql = [];
455            foreach ($items as $item) {
456                $param = $prefix.$this->inorequaluniqueindex++;
457                $params[$param] = $item;
458                $sql[] = ':'.$param.$cast;
459            }
460            if ($equal) {
461                $sql = 'IN (VALUES ('.implode('),(', $sql).'))';
462            } else {
463                $sql = 'NOT IN (VALUES ('.implode('),(', $sql).'))';
464            }
465        } else {
466            throw new dml_exception('typenotimplement');
467        }
468        return [$sql, $params];
469    }
470
471    /**
472     * Return table indexes - everything lowercased.
473     * @param string $table The table we want to get indexes from.
474     * @return array of arrays
475     */
476    public function get_indexes($table) {
477        $indexes = array();
478        $tablename = $this->prefix.$table;
479
480        $sql = "SELECT i.*
481                  FROM pg_catalog.pg_indexes i
482                  JOIN pg_catalog.pg_namespace as ns ON ns.nspname = i.schemaname
483                 WHERE i.tablename = '$tablename'
484                       AND (i.schemaname = current_schema() OR ns.oid = pg_my_temp_schema())";
485
486        $this->query_start($sql, null, SQL_QUERY_AUX);
487        $result = pg_query($this->pgsql, $sql);
488        $this->query_end($result);
489
490        if ($result) {
491            while ($row = pg_fetch_assoc($result)) {
492                // The index definition could be generated schema-qualifying the target table name
493                // for safety, depending on the pgsql version (CVE-2018-1058).
494                if (!preg_match('/CREATE (|UNIQUE )INDEX ([^\s]+) ON (|'.$row['schemaname'].'\.)'.$tablename.' USING ([^\s]+) \(([^\)]+)\)/i', $row['indexdef'], $matches)) {
495                    continue;
496                }
497                if ($matches[5] === 'id') {
498                    continue;
499                }
500                $columns = explode(',', $matches[5]);
501                foreach ($columns as $k=>$column) {
502                    $column = trim($column);
503                    if ($pos = strpos($column, ' ')) {
504                        // index type is separated by space
505                        $column = substr($column, 0, $pos);
506                    }
507                    $columns[$k] = $this->trim_quotes($column);
508                }
509                $indexes[$row['indexname']] = array('unique'=>!empty($matches[1]),
510                                              'columns'=>$columns);
511            }
512            pg_free_result($result);
513        }
514        return $indexes;
515    }
516
517    /**
518     * Returns detailed information about columns in table.
519     *
520     * @param string $table name
521     * @return database_column_info[] array of database_column_info objects indexed with column names
522     */
523    protected function fetch_columns(string $table): array {
524        $structure = array();
525
526        $tablename = $this->prefix.$table;
527
528        $sql = "SELECT a.attnum, a.attname AS field, t.typname AS type, a.attlen, a.atttypmod, a.attnotnull, a.atthasdef,
529                       CASE WHEN a.atthasdef THEN pg_catalog.pg_get_expr(d.adbin, d.adrelid) END AS adsrc
530                  FROM pg_catalog.pg_class c
531                  JOIN pg_catalog.pg_namespace as ns ON ns.oid = c.relnamespace
532                  JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid
533                  JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
534             LEFT JOIN pg_catalog.pg_attrdef d ON (d.adrelid = c.oid AND d.adnum = a.attnum)
535                 WHERE relkind = 'r' AND c.relname = '$tablename' AND c.reltype > 0 AND a.attnum > 0
536                       AND (ns.nspname = current_schema() OR ns.oid = pg_my_temp_schema())
537              ORDER BY a.attnum";
538
539        $this->query_start($sql, null, SQL_QUERY_AUX);
540        $result = pg_query($this->pgsql, $sql);
541        $this->query_end($result);
542
543        if (!$result) {
544            return array();
545        }
546        while ($rawcolumn = pg_fetch_object($result)) {
547
548            $info = new stdClass();
549            $info->name = $rawcolumn->field;
550            $matches = null;
551
552            if ($rawcolumn->type === 'varchar') {
553                $info->type          = 'varchar';
554                $info->meta_type     = 'C';
555                $info->max_length    = $rawcolumn->atttypmod - 4;
556                $info->scale         = null;
557                $info->not_null      = ($rawcolumn->attnotnull === 't');
558                $info->has_default   = ($rawcolumn->atthasdef === 't');
559                if ($info->has_default) {
560                    $parts = explode('::', $rawcolumn->adsrc);
561                    if (count($parts) > 1) {
562                        $info->default_value = reset($parts);
563                        $info->default_value = trim($info->default_value, "'");
564                    } else {
565                        $info->default_value = $rawcolumn->adsrc;
566                    }
567                } else {
568                    $info->default_value = null;
569                }
570                $info->primary_key   = false;
571                $info->binary        = false;
572                $info->unsigned      = null;
573                $info->auto_increment= false;
574                $info->unique        = null;
575
576            } else if (preg_match('/int(\d)/i', $rawcolumn->type, $matches)) {
577                $info->type = 'int';
578                if (strpos($rawcolumn->adsrc, 'nextval') === 0) {
579                    $info->primary_key   = true;
580                    $info->meta_type     = 'R';
581                    $info->unique        = true;
582                    $info->auto_increment= true;
583                    $info->has_default   = false;
584                } else {
585                    $info->primary_key   = false;
586                    $info->meta_type     = 'I';
587                    $info->unique        = null;
588                    $info->auto_increment= false;
589                    $info->has_default   = ($rawcolumn->atthasdef === 't');
590                }
591                // Return number of decimals, not bytes here.
592                if ($matches[1] >= 8) {
593                    $info->max_length = 18;
594                } else if ($matches[1] >= 4) {
595                    $info->max_length = 9;
596                } else if ($matches[1] >= 2) {
597                    $info->max_length = 4;
598                } else if ($matches[1] >= 1) {
599                    $info->max_length = 2;
600                } else {
601                    $info->max_length = 0;
602                }
603                $info->scale         = null;
604                $info->not_null      = ($rawcolumn->attnotnull === 't');
605                if ($info->has_default) {
606                    // PG 9.5+ uses ::<TYPE> syntax for some defaults.
607                    $parts = explode('::', $rawcolumn->adsrc);
608                    if (count($parts) > 1) {
609                        $info->default_value = reset($parts);
610                    } else {
611                        $info->default_value = $rawcolumn->adsrc;
612                    }
613                    $info->default_value = trim($info->default_value, "()'");
614                } else {
615                    $info->default_value = null;
616                }
617                $info->binary        = false;
618                $info->unsigned      = false;
619
620            } else if ($rawcolumn->type === 'numeric') {
621                $info->type = $rawcolumn->type;
622                $info->meta_type     = 'N';
623                $info->primary_key   = false;
624                $info->binary        = false;
625                $info->unsigned      = null;
626                $info->auto_increment= false;
627                $info->unique        = null;
628                $info->not_null      = ($rawcolumn->attnotnull === 't');
629                $info->has_default   = ($rawcolumn->atthasdef === 't');
630                if ($info->has_default) {
631                    // PG 9.5+ uses ::<TYPE> syntax for some defaults.
632                    $parts = explode('::', $rawcolumn->adsrc);
633                    if (count($parts) > 1) {
634                        $info->default_value = reset($parts);
635                    } else {
636                        $info->default_value = $rawcolumn->adsrc;
637                    }
638                    $info->default_value = trim($info->default_value, "()'");
639                } else {
640                    $info->default_value = null;
641                }
642                $info->max_length    = $rawcolumn->atttypmod >> 16;
643                $info->scale         = ($rawcolumn->atttypmod & 0xFFFF) - 4;
644
645            } else if (preg_match('/float(\d)/i', $rawcolumn->type, $matches)) {
646                $info->type = 'float';
647                $info->meta_type     = 'N';
648                $info->primary_key   = false;
649                $info->binary        = false;
650                $info->unsigned      = null;
651                $info->auto_increment= false;
652                $info->unique        = null;
653                $info->not_null      = ($rawcolumn->attnotnull === 't');
654                $info->has_default   = ($rawcolumn->atthasdef === 't');
655                if ($info->has_default) {
656                    // PG 9.5+ uses ::<TYPE> syntax for some defaults.
657                    $parts = explode('::', $rawcolumn->adsrc);
658                    if (count($parts) > 1) {
659                        $info->default_value = reset($parts);
660                    } else {
661                        $info->default_value = $rawcolumn->adsrc;
662                    }
663                    $info->default_value = trim($info->default_value, "()'");
664                } else {
665                    $info->default_value = null;
666                }
667                // just guess expected number of deciaml places :-(
668                if ($matches[1] == 8) {
669                    // total 15 digits
670                    $info->max_length = 8;
671                    $info->scale      = 7;
672                } else {
673                    // total 6 digits
674                    $info->max_length = 4;
675                    $info->scale      = 2;
676                }
677
678            } else if ($rawcolumn->type === 'text') {
679                $info->type          = $rawcolumn->type;
680                $info->meta_type     = 'X';
681                $info->max_length    = -1;
682                $info->scale         = null;
683                $info->not_null      = ($rawcolumn->attnotnull === 't');
684                $info->has_default   = ($rawcolumn->atthasdef === 't');
685                if ($info->has_default) {
686                    $parts = explode('::', $rawcolumn->adsrc);
687                    if (count($parts) > 1) {
688                        $info->default_value = reset($parts);
689                        $info->default_value = trim($info->default_value, "'");
690                    } else {
691                        $info->default_value = $rawcolumn->adsrc;
692                    }
693                } else {
694                    $info->default_value = null;
695                }
696                $info->primary_key   = false;
697                $info->binary        = false;
698                $info->unsigned      = null;
699                $info->auto_increment= false;
700                $info->unique        = null;
701
702            } else if ($rawcolumn->type === 'bytea') {
703                $info->type          = $rawcolumn->type;
704                $info->meta_type     = 'B';
705                $info->max_length    = -1;
706                $info->scale         = null;
707                $info->not_null      = ($rawcolumn->attnotnull === 't');
708                $info->has_default   = false;
709                $info->default_value = null;
710                $info->primary_key   = false;
711                $info->binary        = true;
712                $info->unsigned      = null;
713                $info->auto_increment= false;
714                $info->unique        = null;
715
716            }
717
718            $structure[$info->name] = new database_column_info($info);
719        }
720
721        pg_free_result($result);
722
723        return $structure;
724    }
725
726    /**
727     * Normalise values based in RDBMS dependencies (booleans, LOBs...)
728     *
729     * @param database_column_info $column column metadata corresponding with the value we are going to normalise
730     * @param mixed $value value we are going to normalise
731     * @return mixed the normalised value
732     */
733    protected function normalise_value($column, $value) {
734        $this->detect_objects($value);
735
736        if (is_bool($value)) { // Always, convert boolean to int
737            $value = (int)$value;
738
739        } else if ($column->meta_type === 'B') {
740            if (!is_null($value)) {
741                // standard_conforming_strings must be enabled, otherwise pg_escape_bytea() will double escape
742                // \ and produce data errors.  This is set on the connection.
743                $value = pg_escape_bytea($this->pgsql, $value);
744            }
745
746        } else if ($value === '') {
747            if ($column->meta_type === 'I' or $column->meta_type === 'F' or $column->meta_type === 'N') {
748                $value = 0; // prevent '' problems in numeric fields
749            }
750        }
751        return $value;
752    }
753
754    /**
755     * Is db in unicode mode?
756     * @return bool
757     */
758    public function setup_is_unicodedb() {
759        // Get PostgreSQL server_encoding value
760        $sql = "SHOW server_encoding";
761        $this->query_start($sql, null, SQL_QUERY_AUX);
762        $result = pg_query($this->pgsql, $sql);
763        $this->query_end($result);
764
765        if (!$result) {
766            return false;
767        }
768        $rawcolumn = pg_fetch_object($result);
769        $encoding = $rawcolumn->server_encoding;
770        pg_free_result($result);
771
772        return (strtoupper($encoding) == 'UNICODE' || strtoupper($encoding) == 'UTF8');
773    }
774
775    /**
776     * Do NOT use in code, to be used by database_manager only!
777     * @param string|array $sql query
778     * @param array|null $tablenames an array of xmldb table names affected by this request.
779     * @return bool true
780     * @throws ddl_change_structure_exception A DDL specific exception is thrown for any errors.
781     */
782    public function change_database_structure($sql, $tablenames = null) {
783        $this->get_manager(); // Includes DDL exceptions classes ;-)
784        if (is_array($sql)) {
785            $sql = implode("\n;\n", $sql);
786        }
787        if (!$this->is_transaction_started()) {
788            // It is better to do all or nothing, this helps with recovery...
789            $sql = "BEGIN ISOLATION LEVEL SERIALIZABLE;\n$sql\n; COMMIT";
790        }
791
792        try {
793            $this->query_start($sql, null, SQL_QUERY_STRUCTURE);
794            $result = pg_query($this->pgsql, $sql);
795            $this->query_end($result);
796            pg_free_result($result);
797        } catch (ddl_change_structure_exception $e) {
798            if (!$this->is_transaction_started()) {
799                $result = @pg_query($this->pgsql, "ROLLBACK");
800                @pg_free_result($result);
801            }
802            $this->reset_caches($tablenames);
803            throw $e;
804        }
805
806        $this->reset_caches($tablenames);
807        return true;
808    }
809
810    /**
811     * Execute general sql query. Should be used only when no other method suitable.
812     * Do NOT use this to make changes in db structure, use database_manager methods instead!
813     * @param string $sql query
814     * @param array $params query parameters
815     * @return bool true
816     * @throws dml_exception A DML specific exception is thrown for any errors.
817     */
818    public function execute($sql, array $params=null) {
819        list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
820
821        if (strpos($sql, ';') !== false) {
822            throw new coding_exception('moodle_database::execute() Multiple sql statements found or bound parameters not used properly in query!');
823        }
824
825        $this->query_start($sql, $params, SQL_QUERY_UPDATE);
826        $result = pg_query_params($this->pgsql, $sql, $params);
827        $this->query_end($result);
828
829        pg_free_result($result);
830        return true;
831    }
832
833    /**
834     * Get a number of records as a moodle_recordset using a SQL statement.
835     *
836     * Since this method is a little less readable, use of it should be restricted to
837     * code where it's possible there might be large datasets being returned.  For known
838     * small datasets use get_records_sql - it leads to simpler code.
839     *
840     * The return type is like:
841     * @see function get_recordset.
842     *
843     * @param string $sql the SQL select query to execute.
844     * @param array $params array of sql parameters
845     * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
846     * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
847     * @return moodle_recordset instance
848     * @throws dml_exception A DML specific exception is thrown for any errors.
849     */
850    public function get_recordset_sql($sql, array $params=null, $limitfrom=0, $limitnum=0) {
851
852        list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
853
854        if ($limitnum) {
855            $sql .= " LIMIT $limitnum";
856        }
857        if ($limitfrom) {
858            $sql .= " OFFSET $limitfrom";
859        }
860
861        list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
862
863        // For any query that doesn't explicitly specify a limit, we must use cursors to stop it
864        // loading the entire thing (unless the config setting is turned off).
865        $usecursors = !$limitnum && ($this->get_fetch_buffer_size() > 0);
866        if ($usecursors) {
867            // Work out the cursor unique identifer. This is based on a simple count used which
868            // should be OK because the identifiers only need to be unique within the current
869            // transaction.
870            $this->cursorcount++;
871            $cursorname = 'crs' . $this->cursorcount;
872
873            // Do the query to a cursor.
874            $sql = 'DECLARE ' . $cursorname . ' NO SCROLL CURSOR WITH HOLD FOR ' . $sql;
875        } else {
876            $cursorname = '';
877        }
878
879        $this->query_start($sql, $params, SQL_QUERY_SELECT);
880
881        $result = pg_query_params($this->pgsql, $sql, $params);
882
883        $this->query_end($result);
884        if ($usecursors) {
885            pg_free_result($result);
886            $result = null;
887        }
888
889        return new pgsql_native_moodle_recordset($result, $this, $cursorname);
890    }
891
892    /**
893     * Gets size of fetch buffer used for recordset queries.
894     *
895     * If this returns 0 then cursors will not be used, meaning recordset queries will occupy enough
896     * memory as needed for the Postgres library to hold the entire query results in memory.
897     *
898     * @return int Fetch buffer size or 0 indicating not to use cursors
899     */
900    protected function get_fetch_buffer_size() {
901        if (array_key_exists('fetchbuffersize', $this->dboptions)) {
902            return (int)$this->dboptions['fetchbuffersize'];
903        } else {
904            return self::DEFAULT_FETCH_BUFFER_SIZE;
905        }
906    }
907
908    /**
909     * Retrieves data from cursor. For use by recordset only; do not call directly.
910     *
911     * Return value contains the next batch of Postgres data, and a boolean indicating if this is
912     * definitely the last batch (if false, there may be more)
913     *
914     * @param string $cursorname Name of cursor to read from
915     * @return array Array with 2 elements (next data batch and boolean indicating last batch)
916     */
917    public function fetch_from_cursor($cursorname) {
918        $count = $this->get_fetch_buffer_size();
919
920        $sql = 'FETCH ' . $count . ' FROM ' . $cursorname;
921
922        $this->query_start($sql, [], SQL_QUERY_AUX);
923        $result = pg_query($this->pgsql, $sql);
924        $last = pg_num_rows($result) !== $count;
925
926        $this->query_end($result);
927
928        return [$result, $last];
929    }
930
931    /**
932     * Closes a cursor. For use by recordset only; do not call directly.
933     *
934     * @param string $cursorname Name of cursor to close
935     * @return bool True if we actually closed one, false if the transaction was cancelled
936     */
937    public function close_cursor($cursorname) {
938        // If the transaction got cancelled, then ignore this request.
939        $sql = 'CLOSE ' . $cursorname;
940        $this->query_start($sql, [], SQL_QUERY_AUX);
941        $result = pg_query($this->pgsql, $sql);
942        $this->query_end($result);
943        if ($result) {
944            pg_free_result($result);
945        }
946        return true;
947    }
948
949    /**
950     * Get a number of records as an array of objects using a SQL statement.
951     *
952     * Return value is like:
953     * @see function get_records.
954     *
955     * @param string $sql the SQL select query to execute. The first column of this SELECT statement
956     *   must be a unique value (usually the 'id' field), as it will be used as the key of the
957     *   returned array.
958     * @param array $params array of sql parameters
959     * @param int $limitfrom return a subset of records, starting at this point (optional, required if $limitnum is set).
960     * @param int $limitnum return a subset comprising this many records (optional, required if $limitfrom is set).
961     * @return array of objects, or empty array if no records were found
962     * @throws dml_exception A DML specific exception is thrown for any errors.
963     */
964    public function get_records_sql($sql, array $params = null, $limitfrom = 0, $limitnum = 0) {
965        list($limitfrom, $limitnum) = $this->normalise_limit_from_num($limitfrom, $limitnum);
966
967        if ($limitnum) {
968            $sql .= " LIMIT $limitnum";
969        }
970        if ($limitfrom) {
971            $sql .= " OFFSET $limitfrom";
972        }
973
974        list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
975        $this->query_start($sql, $params, SQL_QUERY_SELECT);
976        $result = pg_query_params($this->pgsql, $sql, $params);
977        $this->query_end($result);
978
979        // find out if there are any blobs
980        $numfields = pg_num_fields($result);
981        $blobs = array();
982        for ($i = 0; $i < $numfields; $i++) {
983            $type = pg_field_type($result, $i);
984            if ($type == 'bytea') {
985                $blobs[] = pg_field_name($result, $i);
986            }
987        }
988
989        $return = [];
990        while ($row = pg_fetch_assoc($result)) {
991            $id = reset($row);
992            if ($blobs) {
993                foreach ($blobs as $blob) {
994                    $row[$blob] = ($row[$blob] !== null ? pg_unescape_bytea($row[$blob]) : null);
995                }
996            }
997            if (isset($return[$id])) {
998                $colname = key($row);
999                debugging("Did you remember to make the first column something unique in your call to get_records? Duplicate value '$id' found in column '$colname'.", DEBUG_DEVELOPER);
1000            }
1001            $return[$id] = (object) $row;
1002        }
1003
1004        return $return;
1005    }
1006
1007    /**
1008     * Selects records and return values (first field) as an array using a SQL statement.
1009     *
1010     * @param string $sql The SQL query
1011     * @param array $params array of sql parameters
1012     * @return array of values
1013     * @throws dml_exception A DML specific exception is thrown for any errors.
1014     */
1015    public function get_fieldset_sql($sql, array $params=null) {
1016        list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1017
1018        $this->query_start($sql, $params, SQL_QUERY_SELECT);
1019        $result = pg_query_params($this->pgsql, $sql, $params);
1020        $this->query_end($result);
1021
1022        $return = pg_fetch_all_columns($result, 0);
1023
1024        if (pg_field_type($result, 0) == 'bytea') {
1025            foreach ($return as $key => $value) {
1026                $return[$key] = ($value === null ? $value : pg_unescape_bytea($value));
1027            }
1028        }
1029
1030        pg_free_result($result);
1031
1032        return $return;
1033    }
1034
1035    /**
1036     * Insert new record into database, as fast as possible, no safety checks, lobs not supported.
1037     * @param string $table name
1038     * @param mixed $params data record as object or array
1039     * @param bool $returnit return it of inserted record
1040     * @param bool $bulk true means repeated inserts expected
1041     * @param bool $customsequence true if 'id' included in $params, disables $returnid
1042     * @return bool|int true or new id
1043     * @throws dml_exception A DML specific exception is thrown for any errors.
1044     */
1045    public function insert_record_raw($table, $params, $returnid=true, $bulk=false, $customsequence=false) {
1046        if (!is_array($params)) {
1047            $params = (array)$params;
1048        }
1049
1050        $returning = "";
1051
1052        if ($customsequence) {
1053            if (!isset($params['id'])) {
1054                throw new coding_exception('moodle_database::insert_record_raw() id field must be specified if custom sequences used.');
1055            }
1056            $returnid = false;
1057        } else {
1058            if ($returnid) {
1059                $returning = "RETURNING id";
1060                unset($params['id']);
1061            } else {
1062                unset($params['id']);
1063            }
1064        }
1065
1066        if (empty($params)) {
1067            throw new coding_exception('moodle_database::insert_record_raw() no fields found.');
1068        }
1069
1070        $fields = implode(',', array_keys($params));
1071        $values = array();
1072        $i = 1;
1073        foreach ($params as $value) {
1074            $this->detect_objects($value);
1075            $values[] = "\$".$i++;
1076        }
1077        $values = implode(',', $values);
1078
1079        $sql = "INSERT INTO {$this->prefix}$table ($fields) VALUES($values) $returning";
1080        $this->query_start($sql, $params, SQL_QUERY_INSERT);
1081        $result = pg_query_params($this->pgsql, $sql, $params);
1082        $this->query_end($result);
1083
1084        if ($returning !== "") {
1085            $row = pg_fetch_assoc($result);
1086            $params['id'] = reset($row);
1087        }
1088        pg_free_result($result);
1089
1090        if (!$returnid) {
1091            return true;
1092        }
1093
1094        return (int)$params['id'];
1095    }
1096
1097    /**
1098     * Insert a record into a table and return the "id" field if required.
1099     *
1100     * Some conversions and safety checks are carried out. Lobs are supported.
1101     * If the return ID isn't required, then this just reports success as true/false.
1102     * $data is an object containing needed data
1103     * @param string $table The database table to be inserted into
1104     * @param object|array $dataobject A data object with values for one or more fields in the record
1105     * @param bool $returnid Should the id of the newly created record entry be returned? If this option is not requested then true/false is returned.
1106     * @return bool|int true or new id
1107     * @throws dml_exception A DML specific exception is thrown for any errors.
1108     */
1109    public function insert_record($table, $dataobject, $returnid=true, $bulk=false) {
1110        $dataobject = (array)$dataobject;
1111
1112        $columns = $this->get_columns($table);
1113        if (empty($columns)) {
1114            throw new dml_exception('ddltablenotexist', $table);
1115        }
1116
1117        $cleaned = array();
1118
1119        foreach ($dataobject as $field=>$value) {
1120            if ($field === 'id') {
1121                continue;
1122            }
1123            if (!isset($columns[$field])) {
1124                continue;
1125            }
1126            $column = $columns[$field];
1127            $cleaned[$field] = $this->normalise_value($column, $value);
1128        }
1129
1130        return $this->insert_record_raw($table, $cleaned, $returnid, $bulk);
1131
1132    }
1133
1134    /**
1135     * Insert multiple records into database as fast as possible.
1136     *
1137     * Order of inserts is maintained, but the operation is not atomic,
1138     * use transactions if necessary.
1139     *
1140     * This method is intended for inserting of large number of small objects,
1141     * do not use for huge objects with text or binary fields.
1142     *
1143     * @since Moodle 2.7
1144     *
1145     * @param string $table  The database table to be inserted into
1146     * @param array|Traversable $dataobjects list of objects to be inserted, must be compatible with foreach
1147     * @return void does not return new record ids
1148     *
1149     * @throws coding_exception if data objects have different structure
1150     * @throws dml_exception A DML specific exception is thrown for any errors.
1151     */
1152    public function insert_records($table, $dataobjects) {
1153        if (!is_array($dataobjects) and !($dataobjects instanceof Traversable)) {
1154            throw new coding_exception('insert_records() passed non-traversable object');
1155        }
1156
1157        // PostgreSQL does not seem to have problems with huge queries.
1158        $chunksize = 500;
1159        if (!empty($this->dboptions['bulkinsertsize'])) {
1160            $chunksize = (int)$this->dboptions['bulkinsertsize'];
1161        }
1162
1163        $columns = $this->get_columns($table, true);
1164
1165        $fields = null;
1166        $count = 0;
1167        $chunk = array();
1168        foreach ($dataobjects as $dataobject) {
1169            if (!is_array($dataobject) and !is_object($dataobject)) {
1170                throw new coding_exception('insert_records() passed invalid record object');
1171            }
1172            $dataobject = (array)$dataobject;
1173            if ($fields === null) {
1174                $fields = array_keys($dataobject);
1175                $columns = array_intersect_key($columns, $dataobject);
1176                unset($columns['id']);
1177            } else if ($fields !== array_keys($dataobject)) {
1178                throw new coding_exception('All dataobjects in insert_records() must have the same structure!');
1179            }
1180
1181            $count++;
1182            $chunk[] = $dataobject;
1183
1184            if ($count === $chunksize) {
1185                $this->insert_chunk($table, $chunk, $columns);
1186                $chunk = array();
1187                $count = 0;
1188            }
1189        }
1190
1191        if ($count) {
1192            $this->insert_chunk($table, $chunk, $columns);
1193        }
1194    }
1195
1196    /**
1197     * Insert records in chunks, strict param types...
1198     *
1199     * Note: can be used only from insert_records().
1200     *
1201     * @param string $table
1202     * @param array $chunk
1203     * @param database_column_info[] $columns
1204     */
1205    protected function insert_chunk($table, array $chunk, array $columns) {
1206        $i = 1;
1207        $params = array();
1208        $values = array();
1209        foreach ($chunk as $dataobject) {
1210            $vals = array();
1211            foreach ($columns as $field => $column) {
1212                $params[] = $this->normalise_value($column, $dataobject[$field]);
1213                $vals[] = "\$".$i++;
1214            }
1215            $values[] = '('.implode(',', $vals).')';
1216        }
1217
1218        $fieldssql = '('.implode(',', array_keys($columns)).')';
1219        $valuessql = implode(',', $values);
1220
1221        $sql = "INSERT INTO {$this->prefix}$table $fieldssql VALUES $valuessql";
1222        $this->query_start($sql, $params, SQL_QUERY_INSERT);
1223        $result = pg_query_params($this->pgsql, $sql, $params);
1224        $this->query_end($result);
1225        pg_free_result($result);
1226    }
1227
1228    /**
1229     * Import a record into a table, id field is required.
1230     * Safety checks are NOT carried out. Lobs are supported.
1231     *
1232     * @param string $table name of database table to be inserted into
1233     * @param object $dataobject A data object with values for one or more fields in the record
1234     * @return bool true
1235     * @throws dml_exception A DML specific exception is thrown for any errors.
1236     */
1237    public function import_record($table, $dataobject) {
1238        $dataobject = (array)$dataobject;
1239
1240        $columns = $this->get_columns($table);
1241        $cleaned = array();
1242
1243        foreach ($dataobject as $field=>$value) {
1244            $this->detect_objects($value);
1245            if (!isset($columns[$field])) {
1246                continue;
1247            }
1248            $column = $columns[$field];
1249            $cleaned[$field] = $this->normalise_value($column, $value);
1250        }
1251
1252        return $this->insert_record_raw($table, $cleaned, false, true, true);
1253    }
1254
1255    /**
1256     * Update record in database, as fast as possible, no safety checks, lobs not supported.
1257     * @param string $table name
1258     * @param mixed $params data record as object or array
1259     * @param bool true means repeated updates expected
1260     * @return bool true
1261     * @throws dml_exception A DML specific exception is thrown for any errors.
1262     */
1263    public function update_record_raw($table, $params, $bulk=false) {
1264        $params = (array)$params;
1265
1266        if (!isset($params['id'])) {
1267            throw new coding_exception('moodle_database::update_record_raw() id field must be specified.');
1268        }
1269        $id = $params['id'];
1270        unset($params['id']);
1271
1272        if (empty($params)) {
1273            throw new coding_exception('moodle_database::update_record_raw() no fields found.');
1274        }
1275
1276        $i = 1;
1277
1278        $sets = array();
1279        foreach ($params as $field=>$value) {
1280            $this->detect_objects($value);
1281            $sets[] = "$field = \$".$i++;
1282        }
1283
1284        $params[] = $id; // last ? in WHERE condition
1285
1286        $sets = implode(',', $sets);
1287        $sql = "UPDATE {$this->prefix}$table SET $sets WHERE id=\$".$i;
1288
1289        $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1290        $result = pg_query_params($this->pgsql, $sql, $params);
1291        $this->query_end($result);
1292
1293        pg_free_result($result);
1294        return true;
1295    }
1296
1297    /**
1298     * Update a record in a table
1299     *
1300     * $dataobject is an object containing needed data
1301     * Relies on $dataobject having a variable "id" to
1302     * specify the record to update
1303     *
1304     * @param string $table The database table to be checked against.
1305     * @param object $dataobject An object with contents equal to fieldname=>fieldvalue. Must have an entry for 'id' to map to the table specified.
1306     * @param bool true means repeated updates expected
1307     * @return bool true
1308     * @throws dml_exception A DML specific exception is thrown for any errors.
1309     */
1310    public function update_record($table, $dataobject, $bulk=false) {
1311        $dataobject = (array)$dataobject;
1312
1313        $columns = $this->get_columns($table);
1314        $cleaned = array();
1315
1316        foreach ($dataobject as $field=>$value) {
1317            if (!isset($columns[$field])) {
1318                continue;
1319            }
1320            $column = $columns[$field];
1321            $cleaned[$field] = $this->normalise_value($column, $value);
1322        }
1323
1324        $this->update_record_raw($table, $cleaned, $bulk);
1325
1326        return true;
1327    }
1328
1329    /**
1330     * Set a single field in every table record which match a particular WHERE clause.
1331     *
1332     * @param string $table The database table to be checked against.
1333     * @param string $newfield the field to set.
1334     * @param string $newvalue the value to set the field to.
1335     * @param string $select A fragment of SQL to be used in a where clause in the SQL call.
1336     * @param array $params array of sql parameters
1337     * @return bool true
1338     * @throws dml_exception A DML specific exception is thrown for any errors.
1339     */
1340    public function set_field_select($table, $newfield, $newvalue, $select, array $params=null) {
1341
1342        if ($select) {
1343            $select = "WHERE $select";
1344        }
1345        if (is_null($params)) {
1346            $params = array();
1347        }
1348        list($select, $params, $type) = $this->fix_sql_params($select, $params);
1349        $i = count($params)+1;
1350
1351        // Get column metadata
1352        $columns = $this->get_columns($table);
1353        $column = $columns[$newfield];
1354
1355        $normalisedvalue = $this->normalise_value($column, $newvalue);
1356
1357        $newfield = "$newfield = \$" . $i;
1358        $params[] = $normalisedvalue;
1359        $sql = "UPDATE {$this->prefix}$table SET $newfield $select";
1360
1361        $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1362        $result = pg_query_params($this->pgsql, $sql, $params);
1363        $this->query_end($result);
1364
1365        pg_free_result($result);
1366
1367        return true;
1368    }
1369
1370    /**
1371     * Delete one or more records from a table which match a particular WHERE clause, lobs not supported.
1372     *
1373     * @param string $table The database table to be checked against.
1374     * @param string $select A fragment of SQL to be used in a where clause in the SQL call (used to define the selection criteria).
1375     * @param array $params array of sql parameters
1376     * @return bool true
1377     * @throws dml_exception A DML specific exception is thrown for any errors.
1378     */
1379    public function delete_records_select($table, $select, array $params=null) {
1380        if ($select) {
1381            $select = "WHERE $select";
1382        }
1383        $sql = "DELETE FROM {$this->prefix}$table $select";
1384
1385        list($sql, $params, $type) = $this->fix_sql_params($sql, $params);
1386
1387        $this->query_start($sql, $params, SQL_QUERY_UPDATE);
1388        $result = pg_query_params($this->pgsql, $sql, $params);
1389        $this->query_end($result);
1390
1391        pg_free_result($result);
1392
1393        return true;
1394    }
1395
1396    /**
1397     * Returns 'LIKE' part of a query.
1398     *
1399     * @param string $fieldname usually name of the table column
1400     * @param string $param usually bound query parameter (?, :named)
1401     * @param bool $casesensitive use case sensitive search
1402     * @param bool $accensensitive use accent sensitive search (not all databases support accent insensitive)
1403     * @param bool $notlike true means "NOT LIKE"
1404     * @param string $escapechar escape char for '%' and '_'
1405     * @return string SQL code fragment
1406     */
1407    public function sql_like($fieldname, $param, $casesensitive = true, $accentsensitive = true, $notlike = false, $escapechar = '\\') {
1408        if (strpos($param, '%') !== false) {
1409            debugging('Potential SQL injection detected, sql_like() expects bound parameters (? or :named)');
1410        }
1411
1412        // postgresql does not support accent insensitive text comparisons, sorry
1413        if ($casesensitive) {
1414            $LIKE = $notlike ? 'NOT LIKE' : 'LIKE';
1415        } else {
1416            $LIKE = $notlike ? 'NOT ILIKE' : 'ILIKE';
1417        }
1418        return "$fieldname $LIKE $param ESCAPE '$escapechar'";
1419    }
1420
1421    public function sql_bitxor($int1, $int2) {
1422        return '((' . $int1 . ') # (' . $int2 . '))';
1423    }
1424
1425    public function sql_cast_char2int($fieldname, $text=false) {
1426        return ' CAST(' . $fieldname . ' AS INT) ';
1427    }
1428
1429    public function sql_cast_char2real($fieldname, $text=false) {
1430        return " $fieldname::real ";
1431    }
1432
1433    public function sql_concat() {
1434        $arr = func_get_args();
1435        $s = implode(' || ', $arr);
1436        if ($s === '') {
1437            return " '' ";
1438        }
1439        // Add always empty string element so integer-exclusive concats
1440        // will work without needing to cast each element explicitly
1441        return " '' || $s ";
1442    }
1443
1444    public function sql_concat_join($separator="' '", $elements=array()) {
1445        for ($n=count($elements)-1; $n > 0 ; $n--) {
1446            array_splice($elements, $n, 0, $separator);
1447        }
1448        $s = implode(' || ', $elements);
1449        if ($s === '') {
1450            return " '' ";
1451        }
1452        return " $s ";
1453    }
1454
1455    /**
1456     * Return SQL for performing group concatenation on given field/expression
1457     *
1458     * @param string $field
1459     * @param string $separator
1460     * @param string $sort
1461     * @return string
1462     */
1463    public function sql_group_concat(string $field, string $separator = ', ', string $sort = ''): string {
1464        $fieldsort = $sort ? "ORDER BY {$sort}" : '';
1465        return "STRING_AGG(CAST({$field} AS VARCHAR), '{$separator}' {$fieldsort})";
1466    }
1467
1468    public function sql_regex_supported() {
1469        return true;
1470    }
1471
1472    public function sql_regex($positivematch = true, $casesensitive = false) {
1473        if ($casesensitive) {
1474            return $positivematch ? '~' : '!~';
1475        } else {
1476            return $positivematch ? '~*' : '!~*';
1477        }
1478    }
1479
1480    /**
1481     * Does this driver support tool_replace?
1482     *
1483     * @since Moodle 2.6.1
1484     * @return bool
1485     */
1486    public function replace_all_text_supported() {
1487        return true;
1488    }
1489
1490    public function session_lock_supported() {
1491        return true;
1492    }
1493
1494    /**
1495     * Obtain session lock
1496     * @param int $rowid id of the row with session record
1497     * @param int $timeout max allowed time to wait for the lock in seconds
1498     * @return bool success
1499     */
1500    public function get_session_lock($rowid, $timeout) {
1501        // NOTE: there is a potential locking problem for database running
1502        //       multiple instances of moodle, we could try to use pg_advisory_lock(int, int),
1503        //       luckily there is not a big chance that they would collide
1504        if (!$this->session_lock_supported()) {
1505            return;
1506        }
1507
1508        parent::get_session_lock($rowid, $timeout);
1509
1510        $timeoutmilli = $timeout * 1000;
1511
1512        $sql = "SET statement_timeout TO $timeoutmilli";
1513        $this->query_start($sql, null, SQL_QUERY_AUX);
1514        $result = pg_query($this->pgsql, $sql);
1515        $this->query_end($result);
1516
1517        if ($result) {
1518            pg_free_result($result);
1519        }
1520
1521        $sql = "SELECT pg_advisory_lock($rowid)";
1522        $this->query_start($sql, null, SQL_QUERY_AUX);
1523        $start = time();
1524        $result = pg_query($this->pgsql, $sql);
1525        $end = time();
1526        try {
1527            $this->query_end($result);
1528        } catch (dml_exception $ex) {
1529            if ($end - $start >= $timeout) {
1530                throw new dml_sessionwait_exception();
1531            } else {
1532                throw $ex;
1533            }
1534        }
1535
1536        if ($result) {
1537            pg_free_result($result);
1538        }
1539
1540        $sql = "SET statement_timeout TO DEFAULT";
1541        $this->query_start($sql, null, SQL_QUERY_AUX);
1542        $result = pg_query($this->pgsql, $sql);
1543        $this->query_end($result);
1544
1545        if ($result) {
1546            pg_free_result($result);
1547        }
1548    }
1549
1550    public function release_session_lock($rowid) {
1551        if (!$this->session_lock_supported()) {
1552            return;
1553        }
1554        if (!$this->used_for_db_sessions) {
1555            return;
1556        }
1557
1558        parent::release_session_lock($rowid);
1559
1560        $sql = "SELECT pg_advisory_unlock($rowid)";
1561        $this->query_start($sql, null, SQL_QUERY_AUX);
1562        $result = pg_query($this->pgsql, $sql);
1563        $this->query_end($result);
1564
1565        if ($result) {
1566            pg_free_result($result);
1567        }
1568    }
1569
1570    /**
1571     * Driver specific start of real database transaction,
1572     * this can not be used directly in code.
1573     * @return void
1574     */
1575    protected function begin_transaction() {
1576        $this->savepointpresent = true;
1577        $sql = "BEGIN ISOLATION LEVEL READ COMMITTED; SAVEPOINT moodle_pg_savepoint";
1578        $this->query_start($sql, null, SQL_QUERY_AUX);
1579        $result = pg_query($this->pgsql, $sql);
1580        $this->query_end($result);
1581
1582        pg_free_result($result);
1583    }
1584
1585    /**
1586     * Driver specific commit of real database transaction,
1587     * this can not be used directly in code.
1588     * @return void
1589     */
1590    protected function commit_transaction() {
1591        $this->savepointpresent = false;
1592        $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; COMMIT";
1593        $this->query_start($sql, null, SQL_QUERY_AUX);
1594        $result = pg_query($this->pgsql, $sql);
1595        $this->query_end($result);
1596
1597        pg_free_result($result);
1598    }
1599
1600    /**
1601     * Driver specific abort of real database transaction,
1602     * this can not be used directly in code.
1603     * @return void
1604     */
1605    protected function rollback_transaction() {
1606        $this->savepointpresent = false;
1607        $sql = "RELEASE SAVEPOINT moodle_pg_savepoint; ROLLBACK";
1608        $this->query_start($sql, null, SQL_QUERY_AUX);
1609        $result = pg_query($this->pgsql, $sql);
1610        $this->query_end($result);
1611
1612        pg_free_result($result);
1613    }
1614
1615    /**
1616     * Helper function trimming (whitespace + quotes) any string
1617     * needed because PG uses to enclose with double quotes some
1618     * fields in indexes definition and others
1619     *
1620     * @param string $str string to apply whitespace + quotes trim
1621     * @return string trimmed string
1622     */
1623    private function trim_quotes($str) {
1624        return trim(trim($str), "'\"");
1625    }
1626
1627    /**
1628     * Postgresql supports full-text search indexes.
1629     *
1630     * @return bool
1631     */
1632    public function is_fulltext_search_supported() {
1633        return true;
1634    }
1635}
1636