1<?php 2 3namespace Drupal\Core\Database\Driver\pgsql; 4 5use Drupal\Core\Database\SchemaObjectExistsException; 6use Drupal\Core\Database\SchemaObjectDoesNotExistException; 7use Drupal\Core\Database\Schema as DatabaseSchema; 8 9// cSpell:ignore adbin adnum adrelid adsrc attisdropped attname attnum attrdef 10// cSpell:ignore attrelid atttypid atttypmod bigserial conkey conname conrelid 11// cSpell:ignore contype fillfactor indexname indexrelid indisprimary indkey 12// cSpell:ignore indrelid nextval nspname regclass relkind relname relnamespace 13// cSpell:ignore schemaname setval 14 15/** 16 * @addtogroup schemaapi 17 * @{ 18 */ 19 20/** 21 * PostgreSQL implementation of \Drupal\Core\Database\Schema. 22 */ 23class Schema extends DatabaseSchema { 24 25 /** 26 * A cache of information about blob columns and sequences of tables. 27 * 28 * This is collected by Schema::queryTableInformation(), by introspecting the 29 * database. 30 * 31 * @see \Drupal\Core\Database\Driver\pgsql\Schema::queryTableInformation() 32 * @var array 33 */ 34 protected $tableInformation = []; 35 36 /** 37 * The maximum allowed length for index, primary key and constraint names. 38 * 39 * Value will usually be set to a 63 chars limit but PostgreSQL allows 40 * to higher this value before compiling, so we need to check for that. 41 * 42 * @var int 43 */ 44 protected $maxIdentifierLength; 45 46 /** 47 * PostgreSQL's temporary namespace name. 48 * 49 * @var string 50 */ 51 protected $tempNamespaceName; 52 53 /** 54 * Make sure to limit identifiers according to PostgreSQL compiled in length. 55 * 56 * PostgreSQL allows in standard configuration identifiers no longer than 63 57 * chars for table/relation names, indexes, primary keys, and constraints. So 58 * we map all identifiers that are too long to drupal_base64hash_tag, where 59 * tag is one of: 60 * - idx for indexes 61 * - key for constraints 62 * - pkey for primary keys 63 * - seq for sequences 64 * 65 * @param string $table_identifier_part 66 * The first argument used to build the identifier string. This usually 67 * refers to a table/relation name. 68 * @param string $column_identifier_part 69 * The second argument used to build the identifier string. This usually 70 * refers to one or more column names. 71 * @param string $tag 72 * The identifier tag. It can be one of 'idx', 'key', 'pkey' or 'seq'. 73 * @param string $separator 74 * (optional) The separator used to glue together the aforementioned 75 * identifier parts. Defaults to '__'. 76 * 77 * @return string 78 * The index/constraint/pkey identifier. 79 */ 80 protected function ensureIdentifiersLength($table_identifier_part, $column_identifier_part, $tag, $separator = '__') { 81 $info = $this->getPrefixInfo($table_identifier_part); 82 $table_identifier_part = $info['table']; 83 $identifierName = implode($separator, [$table_identifier_part, $column_identifier_part, $tag]); 84 85 // Retrieve the max identifier length which is usually 63 characters 86 // but can be altered before PostgreSQL is compiled so we need to check. 87 if (empty($this->maxIdentifierLength)) { 88 $this->maxIdentifierLength = $this->connection->query("SHOW max_identifier_length")->fetchField(); 89 } 90 91 if (strlen($identifierName) > $this->maxIdentifierLength) { 92 $saveIdentifier = '"drupal_' . $this->hashBase64($identifierName) . '_' . $tag . '"'; 93 } 94 else { 95 $saveIdentifier = $identifierName; 96 } 97 return $saveIdentifier; 98 } 99 100 /** 101 * Fetch the list of blobs and sequences used on a table. 102 * 103 * We introspect the database to collect the information required by insert 104 * and update queries. 105 * 106 * @param string $table 107 * The non-prefixed name of the table. 108 * 109 * @return mixed|object 110 * An object with two member variables: 111 * - 'blob_fields' that lists all the blob fields in the table. 112 * - 'sequences' that lists the sequences used in that table. 113 * 114 * @throws \Exception 115 * Exception thrown when the query for the table information fails. 116 */ 117 public function queryTableInformation($table) { 118 // Generate a key to reference this table's information on. 119 $key = $this->connection->prefixTables('{' . $table . '}'); 120 121 // Take into account that temporary tables are stored in a different schema. 122 // \Drupal\Core\Database\Connection::generateTemporaryTableName() sets the 123 // 'db_temporary_' prefix to all temporary tables. 124 if (strpos($key, '.') === FALSE && strpos($table, 'db_temporary_') === FALSE) { 125 $key = 'public.' . $key; 126 } 127 else { 128 $key = $this->getTempNamespaceName() . '.' . $key; 129 } 130 131 if (!isset($this->tableInformation[$key])) { 132 $table_information = (object) [ 133 'blob_fields' => [], 134 'sequences' => [], 135 ]; 136 $this->connection->addSavepoint(); 137 138 try { 139 // The bytea columns and sequences for a table can be found in 140 // pg_attribute, which is significantly faster than querying the 141 // information_schema. The data type of a field can be found by lookup 142 // of the attribute ID, and the default value must be extracted from the 143 // node tree for the attribute definition instead of the historical 144 // human-readable column, adsrc. 145 $sql = <<<'EOD' 146SELECT pg_attribute.attname AS column_name, format_type(pg_attribute.atttypid, pg_attribute.atttypmod) AS data_type, pg_get_expr(pg_attrdef.adbin, pg_attribute.attrelid) AS column_default 147FROM pg_attribute 148LEFT JOIN pg_attrdef ON pg_attrdef.adrelid = pg_attribute.attrelid AND pg_attrdef.adnum = pg_attribute.attnum 149WHERE pg_attribute.attnum > 0 150AND NOT pg_attribute.attisdropped 151AND pg_attribute.attrelid = :key::regclass 152AND (format_type(pg_attribute.atttypid, pg_attribute.atttypmod) = 'bytea' 153OR pg_get_expr(pg_attrdef.adbin, pg_attribute.attrelid) LIKE 'nextval%') 154EOD; 155 $result = $this->connection->query($sql, [ 156 ':key' => $key, 157 ]); 158 } 159 catch (\Exception $e) { 160 $this->connection->rollbackSavepoint(); 161 throw $e; 162 } 163 $this->connection->releaseSavepoint(); 164 165 // If the table information does not yet exist in the PostgreSQL 166 // metadata, then return the default table information here, so that it 167 // will not be cached. 168 if (empty($result)) { 169 return $table_information; 170 } 171 172 foreach ($result as $column) { 173 if ($column->data_type == 'bytea') { 174 $table_information->blob_fields[$column->column_name] = TRUE; 175 } 176 elseif (preg_match("/nextval\('([^']+)'/", $column->column_default, $matches)) { 177 // We must know of any sequences in the table structure to help us 178 // return the last insert id. If there is more than 1 sequences the 179 // first one (index 0 of the sequences array) will be used. 180 $table_information->sequences[] = $matches[1]; 181 $table_information->serial_fields[] = $column->column_name; 182 } 183 } 184 $this->tableInformation[$key] = $table_information; 185 } 186 return $this->tableInformation[$key]; 187 } 188 189 /** 190 * Gets PostgreSQL's temporary namespace name. 191 * 192 * @return string 193 * PostgreSQL's temporary namespace name. 194 */ 195 protected function getTempNamespaceName() { 196 if (!isset($this->tempNamespaceName)) { 197 $this->tempNamespaceName = $this->connection->query('SELECT nspname FROM pg_namespace WHERE oid = pg_my_temp_schema()')->fetchField(); 198 } 199 return $this->tempNamespaceName; 200 } 201 202 /** 203 * Resets information about table blobs, sequences and serial fields. 204 * 205 * @param $table 206 * The non-prefixed name of the table. 207 */ 208 protected function resetTableInformation($table) { 209 $key = $this->connection->prefixTables('{' . $table . '}'); 210 if (strpos($key, '.') === FALSE) { 211 $key = 'public.' . $key; 212 } 213 unset($this->tableInformation[$key]); 214 } 215 216 /** 217 * Fetches the list of constraints used on a field. 218 * 219 * We introspect the database to collect the information required by field 220 * alteration. 221 * 222 * @param string $table 223 * The non-prefixed name of the table. 224 * @param string $field 225 * The name of the field. 226 * @param string $constraint_type 227 * (optional) The type of the constraint. This can be one of the following: 228 * - c: check constraint; 229 * - f: foreign key constraint; 230 * - p: primary key constraint; 231 * - u: unique constraint; 232 * - t: constraint trigger; 233 * - x: exclusion constraint. 234 * Defaults to 'c' for a CHECK constraint. 235 * @see https://www.postgresql.org/docs/current/catalog-pg-constraint.html 236 * 237 * @return array 238 * An array containing all the constraint names for the field. 239 * 240 * @throws \Exception 241 * Exception thrown when the query for the table information fails. 242 */ 243 public function queryFieldInformation($table, $field, $constraint_type = 'c') { 244 assert(in_array($constraint_type, ['c', 'f', 'p', 'u', 't', 'x'])); 245 $prefixInfo = $this->getPrefixInfo($table, TRUE); 246 247 // Split the key into schema and table for querying. 248 $schema = $prefixInfo['schema']; 249 $table_name = $prefixInfo['table']; 250 251 $this->connection->addSavepoint(); 252 253 try { 254 $checks = $this->connection->query("SELECT conname FROM pg_class cl INNER JOIN pg_constraint co ON co.conrelid = cl.oid INNER JOIN pg_attribute attr ON attr.attrelid = cl.oid AND attr.attnum = ANY (co.conkey) INNER JOIN pg_namespace ns ON cl.relnamespace = ns.oid WHERE co.contype = :constraint_type AND ns.nspname = :schema AND cl.relname = :table AND attr.attname = :column", [ 255 ':constraint_type' => $constraint_type, 256 ':schema' => $schema, 257 ':table' => $table_name, 258 ':column' => $field, 259 ]); 260 } 261 catch (\Exception $e) { 262 $this->connection->rollbackSavepoint(); 263 throw $e; 264 } 265 266 $this->connection->releaseSavepoint(); 267 268 $field_information = $checks->fetchCol(); 269 270 return $field_information; 271 } 272 273 /** 274 * Generate SQL to create a new table from a Drupal schema definition. 275 * 276 * @param string $name 277 * The name of the table to create. 278 * @param array $table 279 * A Schema API table definition array. 280 * 281 * @return array 282 * An array of SQL statements to create the table. 283 */ 284 protected function createTableSql($name, $table) { 285 $sql_fields = []; 286 foreach ($table['fields'] as $field_name => $field) { 287 $sql_fields[] = $this->createFieldSql($field_name, $this->processField($field)); 288 } 289 290 $sql_keys = []; 291 if (!empty($table['primary key']) && is_array($table['primary key'])) { 292 $this->ensureNotNullPrimaryKey($table['primary key'], $table['fields']); 293 $sql_keys[] = 'CONSTRAINT ' . $this->ensureIdentifiersLength($name, '', 'pkey') . ' PRIMARY KEY (' . $this->createPrimaryKeySql($table['primary key']) . ')'; 294 } 295 if (isset($table['unique keys']) && is_array($table['unique keys'])) { 296 foreach ($table['unique keys'] as $key_name => $key) { 297 $sql_keys[] = 'CONSTRAINT ' . $this->ensureIdentifiersLength($name, $key_name, 'key') . ' UNIQUE (' . implode(', ', $key) . ')'; 298 } 299 } 300 301 $sql = "CREATE TABLE {" . $name . "} (\n\t"; 302 $sql .= implode(",\n\t", $sql_fields); 303 if (count($sql_keys) > 0) { 304 $sql .= ",\n\t"; 305 } 306 $sql .= implode(",\n\t", $sql_keys); 307 $sql .= "\n)"; 308 $statements[] = $sql; 309 310 if (isset($table['indexes']) && is_array($table['indexes'])) { 311 foreach ($table['indexes'] as $key_name => $key) { 312 $statements[] = $this->_createIndexSql($name, $key_name, $key); 313 } 314 } 315 316 // Add table comment. 317 if (!empty($table['description'])) { 318 $statements[] = 'COMMENT ON TABLE {' . $name . '} IS ' . $this->prepareComment($table['description']); 319 } 320 321 // Add column comments. 322 foreach ($table['fields'] as $field_name => $field) { 323 if (!empty($field['description'])) { 324 $statements[] = 'COMMENT ON COLUMN {' . $name . '}.' . $field_name . ' IS ' . $this->prepareComment($field['description']); 325 } 326 } 327 328 return $statements; 329 } 330 331 /** 332 * Create an SQL string for a field to be used in table creation or 333 * alteration. 334 * 335 * @param $name 336 * Name of the field. 337 * @param $spec 338 * The field specification, as per the schema data structure format. 339 */ 340 protected function createFieldSql($name, $spec) { 341 // The PostgreSQL server converts names into lowercase, unless quoted. 342 $sql = '"' . $name . '" ' . $spec['pgsql_type']; 343 344 if (isset($spec['type']) && $spec['type'] == 'serial') { 345 unset($spec['not null']); 346 } 347 348 if (in_array($spec['pgsql_type'], ['varchar', 'character']) && isset($spec['length'])) { 349 $sql .= '(' . $spec['length'] . ')'; 350 } 351 elseif (isset($spec['precision']) && isset($spec['scale'])) { 352 $sql .= '(' . $spec['precision'] . ', ' . $spec['scale'] . ')'; 353 } 354 355 if (!empty($spec['unsigned'])) { 356 $sql .= " CHECK ($name >= 0)"; 357 } 358 359 if (isset($spec['not null'])) { 360 if ($spec['not null']) { 361 $sql .= ' NOT NULL'; 362 } 363 else { 364 $sql .= ' NULL'; 365 } 366 } 367 if (array_key_exists('default', $spec)) { 368 $default = $this->escapeDefaultValue($spec['default']); 369 $sql .= " default $default"; 370 } 371 372 return $sql; 373 } 374 375 /** 376 * Set database-engine specific properties for a field. 377 * 378 * @param $field 379 * A field description array, as specified in the schema documentation. 380 */ 381 protected function processField($field) { 382 if (!isset($field['size'])) { 383 $field['size'] = 'normal'; 384 } 385 386 // Set the correct database-engine specific datatype. 387 // In case one is already provided, force it to lowercase. 388 if (isset($field['pgsql_type'])) { 389 $field['pgsql_type'] = mb_strtolower($field['pgsql_type']); 390 } 391 else { 392 $map = $this->getFieldTypeMap(); 393 $field['pgsql_type'] = $map[$field['type'] . ':' . $field['size']]; 394 } 395 396 if (!empty($field['unsigned'])) { 397 // Unsigned data types are not supported in PostgreSQL 10. In MySQL, 398 // they are used to ensure a positive number is inserted and it also 399 // doubles the maximum integer size that can be stored in a field. 400 // The PostgreSQL schema in Drupal creates a check constraint 401 // to ensure that a value inserted is >= 0. To provide the extra 402 // integer capacity, here, we bump up the column field size. 403 if (!isset($map)) { 404 $map = $this->getFieldTypeMap(); 405 } 406 switch ($field['pgsql_type']) { 407 case 'smallint': 408 $field['pgsql_type'] = $map['int:medium']; 409 break; 410 411 case 'int': 412 $field['pgsql_type'] = $map['int:big']; 413 break; 414 } 415 } 416 if (isset($field['type']) && $field['type'] == 'serial') { 417 unset($field['not null']); 418 } 419 return $field; 420 } 421 422 /** 423 * {@inheritdoc} 424 */ 425 public function getFieldTypeMap() { 426 // Put :normal last so it gets preserved by array_flip. This makes 427 // it much easier for modules (such as schema.module) to map 428 // database types back into schema types. 429 // $map does not use drupal_static as its value never changes. 430 static $map = [ 431 'varchar_ascii:normal' => 'varchar', 432 433 'varchar:normal' => 'varchar', 434 'char:normal' => 'character', 435 436 'text:tiny' => 'text', 437 'text:small' => 'text', 438 'text:medium' => 'text', 439 'text:big' => 'text', 440 'text:normal' => 'text', 441 442 'int:tiny' => 'smallint', 443 'int:small' => 'smallint', 444 'int:medium' => 'int', 445 'int:big' => 'bigint', 446 'int:normal' => 'int', 447 448 'float:tiny' => 'real', 449 'float:small' => 'real', 450 'float:medium' => 'real', 451 'float:big' => 'double precision', 452 'float:normal' => 'real', 453 454 'numeric:normal' => 'numeric', 455 456 'blob:big' => 'bytea', 457 'blob:normal' => 'bytea', 458 459 'serial:tiny' => 'serial', 460 'serial:small' => 'serial', 461 'serial:medium' => 'serial', 462 'serial:big' => 'bigserial', 463 'serial:normal' => 'serial', 464 ]; 465 return $map; 466 } 467 468 protected function _createKeySql($fields) { 469 $return = []; 470 foreach ($fields as $field) { 471 if (is_array($field)) { 472 $return[] = 'substr(' . $field[0] . ', 1, ' . $field[1] . ')'; 473 } 474 else { 475 $return[] = '"' . $field . '"'; 476 } 477 } 478 return implode(', ', $return); 479 } 480 481 /** 482 * Create the SQL expression for primary keys. 483 * 484 * Postgresql does not support key length. It does support fillfactor, but 485 * that requires a separate database lookup for each column in the key. The 486 * key length defined in the schema is ignored. 487 */ 488 protected function createPrimaryKeySql($fields) { 489 $return = []; 490 foreach ($fields as $field) { 491 if (is_array($field)) { 492 $return[] = '"' . $field[0] . '"'; 493 } 494 else { 495 $return[] = '"' . $field . '"'; 496 } 497 } 498 return implode(', ', $return); 499 } 500 501 /** 502 * {@inheritdoc} 503 */ 504 public function tableExists($table) { 505 $prefixInfo = $this->getPrefixInfo($table, TRUE); 506 507 return (bool) $this->connection->query("SELECT 1 FROM pg_tables WHERE schemaname = :schema AND tablename = :table", [':schema' => $prefixInfo['schema'], ':table' => $prefixInfo['table']])->fetchField(); 508 } 509 510 /** 511 * {@inheritdoc} 512 */ 513 public function findTables($table_expression) { 514 $individually_prefixed_tables = $this->connection->getUnprefixedTablesMap(); 515 $default_prefix = $this->connection->tablePrefix(); 516 $default_prefix_length = strlen($default_prefix); 517 $tables = []; 518 519 // Load all the tables up front in order to take into account per-table 520 // prefixes. The actual matching is done at the bottom of the method. 521 $results = $this->connection->query("SELECT tablename FROM pg_tables WHERE schemaname = :schema", [':schema' => $this->defaultSchema]); 522 foreach ($results as $table) { 523 // Take into account tables that have an individual prefix. 524 if (isset($individually_prefixed_tables[$table->tablename])) { 525 $prefix_length = strlen($this->connection->tablePrefix($individually_prefixed_tables[$table->tablename])); 526 } 527 elseif ($default_prefix && substr($table->tablename, 0, $default_prefix_length) !== $default_prefix) { 528 // This table name does not start the default prefix, which means that 529 // it is not managed by Drupal so it should be excluded from the result. 530 continue; 531 } 532 else { 533 $prefix_length = $default_prefix_length; 534 } 535 536 // Remove the prefix from the returned tables. 537 $unprefixed_table_name = substr($table->tablename, $prefix_length); 538 539 // The pattern can match a table which is the same as the prefix. That 540 // will become an empty string when we remove the prefix, which will 541 // probably surprise the caller, besides not being a prefixed table. So 542 // remove it. 543 if (!empty($unprefixed_table_name)) { 544 $tables[$unprefixed_table_name] = $unprefixed_table_name; 545 } 546 } 547 548 // Convert the table expression from its SQL LIKE syntax to a regular 549 // expression and escape the delimiter that will be used for matching. 550 $table_expression = str_replace(['%', '_'], ['.*?', '.'], preg_quote($table_expression, '/')); 551 $tables = preg_grep('/^' . $table_expression . '$/i', $tables); 552 553 return $tables; 554 } 555 556 /** 557 * {@inheritdoc} 558 */ 559 public function renameTable($table, $new_name) { 560 if (!$this->tableExists($table)) { 561 throw new SchemaObjectDoesNotExistException("Cannot rename '$table' to '$new_name': table '$table' doesn't exist."); 562 } 563 if ($this->tableExists($new_name)) { 564 throw new SchemaObjectExistsException("Cannot rename '$table' to '$new_name': table '$new_name' already exists."); 565 } 566 567 // Get the schema and tablename for the old table. 568 $old_full_name = str_replace('"', '', $this->connection->prefixTables('{' . $table . '}')); 569 list($old_schema, $old_table_name) = strpos($old_full_name, '.') ? explode('.', $old_full_name) : ['public', $old_full_name]; 570 571 // Index names and constraint names are global in PostgreSQL, so we need to 572 // rename them when renaming the table. 573 $indexes = $this->connection->query('SELECT indexname FROM pg_indexes WHERE schemaname = :schema AND tablename = :table', [':schema' => $old_schema, ':table' => $old_table_name]); 574 575 foreach ($indexes as $index) { 576 // Get the index type by suffix, e.g. idx/key/pkey 577 $index_type = substr($index->indexname, strrpos($index->indexname, '_') + 1); 578 579 // If the index is already rewritten by ensureIdentifiersLength() to not 580 // exceed the 63 chars limit of PostgreSQL, we need to take care of that. 581 // cSpell:disable-next-line 582 // Example (drupal_Gk7Su_T1jcBHVuvSPeP22_I3Ni4GrVEgTYlIYnBJkro_idx). 583 if (strpos($index->indexname, 'drupal_') !== FALSE) { 584 preg_match('/^drupal_(.*)_' . preg_quote($index_type) . '/', $index->indexname, $matches); 585 $index_name = $matches[1]; 586 } 587 else { 588 // Make sure to remove the suffix from index names, because 589 // $this->ensureIdentifiersLength() will add the suffix again and thus 590 // would result in a wrong index name. 591 preg_match('/^' . preg_quote($old_full_name) . '__(.*)__' . preg_quote($index_type) . '/', $index->indexname, $matches); 592 $index_name = $matches[1]; 593 } 594 $this->connection->query('ALTER INDEX "' . $index->indexname . '" RENAME TO ' . $this->ensureIdentifiersLength($new_name, $index_name, $index_type) . ''); 595 } 596 597 // Ensure the new table name does not include schema syntax. 598 $prefixInfo = $this->getPrefixInfo($new_name); 599 600 // Rename sequences if the table contains serial fields. 601 $info = $this->queryTableInformation($table); 602 if (!empty($info->serial_fields)) { 603 foreach ($info->serial_fields as $field) { 604 // The initial name of the sequence is generated automatically by 605 // PostgreSQL when the table is created, so we need to use 606 // pg_get_serial_sequence() to retrieve it. 607 $old_sequence = $this->connection->query("SELECT pg_get_serial_sequence('" . $old_full_name . "', '" . $field . "')")->fetchField(); 608 609 // If the new sequence name exceeds the maximum identifier length limit, 610 // it will not match the pattern that is automatically applied by 611 // PostgreSQL on table creation, but that's ok because 612 // pg_get_serial_sequence() will return our non-standard name on 613 // subsequent table renames. 614 $new_sequence = $this->ensureIdentifiersLength($new_name, $field, 'seq', '_'); 615 616 $this->connection->query('ALTER SEQUENCE ' . $old_sequence . ' RENAME TO ' . $new_sequence); 617 } 618 } 619 // Now rename the table. 620 $this->connection->query('ALTER TABLE {' . $table . '} RENAME TO ' . $prefixInfo['table']); 621 $this->resetTableInformation($table); 622 } 623 624 /** 625 * {@inheritdoc} 626 */ 627 public function dropTable($table) { 628 if (!$this->tableExists($table)) { 629 return FALSE; 630 } 631 632 $this->connection->query('DROP TABLE {' . $table . '}'); 633 $this->resetTableInformation($table); 634 return TRUE; 635 } 636 637 /** 638 * {@inheritdoc} 639 */ 640 public function addField($table, $field, $spec, $new_keys = []) { 641 if (!$this->tableExists($table)) { 642 throw new SchemaObjectDoesNotExistException("Cannot add field '$table.$field': table doesn't exist."); 643 } 644 if ($this->fieldExists($table, $field)) { 645 throw new SchemaObjectExistsException("Cannot add field '$table.$field': field already exists."); 646 } 647 648 // Fields that are part of a PRIMARY KEY must be added as NOT NULL. 649 $is_primary_key = isset($new_keys['primary key']) && in_array($field, $new_keys['primary key'], TRUE); 650 if ($is_primary_key) { 651 $this->ensureNotNullPrimaryKey($new_keys['primary key'], [$field => $spec]); 652 } 653 654 $fixnull = FALSE; 655 if (!empty($spec['not null']) && !isset($spec['default']) && !$is_primary_key) { 656 $fixnull = TRUE; 657 $spec['not null'] = FALSE; 658 } 659 $query = 'ALTER TABLE {' . $table . '} ADD COLUMN '; 660 $query .= $this->createFieldSql($field, $this->processField($spec)); 661 $this->connection->query($query); 662 if (isset($spec['initial_from_field'])) { 663 if (isset($spec['initial'])) { 664 $expression = 'COALESCE(' . $spec['initial_from_field'] . ', :default_initial_value)'; 665 $arguments = [':default_initial_value' => $spec['initial']]; 666 } 667 else { 668 $expression = $spec['initial_from_field']; 669 $arguments = []; 670 } 671 $this->connection->update($table) 672 ->expression($field, $expression, $arguments) 673 ->execute(); 674 } 675 elseif (isset($spec['initial'])) { 676 $this->connection->update($table) 677 ->fields([$field => $spec['initial']]) 678 ->execute(); 679 } 680 if ($fixnull) { 681 $this->connection->query("ALTER TABLE {" . $table . "} ALTER $field SET NOT NULL"); 682 } 683 if (isset($new_keys)) { 684 // Make sure to drop the existing primary key before adding a new one. 685 // This is only needed when adding a field because this method, unlike 686 // changeField(), is supposed to handle primary keys automatically. 687 if (isset($new_keys['primary key']) && $this->constraintExists($table, 'pkey')) { 688 $this->dropPrimaryKey($table); 689 } 690 $this->_createKeys($table, $new_keys); 691 } 692 // Add column comment. 693 if (!empty($spec['description'])) { 694 $this->connection->query('COMMENT ON COLUMN {' . $table . '}.' . $field . ' IS ' . $this->prepareComment($spec['description'])); 695 } 696 $this->resetTableInformation($table); 697 } 698 699 /** 700 * {@inheritdoc} 701 */ 702 public function dropField($table, $field) { 703 if (!$this->fieldExists($table, $field)) { 704 return FALSE; 705 } 706 707 $this->connection->query('ALTER TABLE {' . $table . '} DROP COLUMN "' . $field . '"'); 708 $this->resetTableInformation($table); 709 return TRUE; 710 } 711 712 /** 713 * {@inheritdoc} 714 */ 715 public function fieldExists($table, $column) { 716 $prefixInfo = $this->getPrefixInfo($table); 717 718 return (bool) $this->connection->query("SELECT 1 FROM pg_attribute WHERE attrelid = :key::regclass AND attname = :column AND NOT attisdropped AND attnum > 0", [':key' => $prefixInfo['schema'] . '.' . $prefixInfo['table'], ':column' => $column])->fetchField(); 719 } 720 721 /** 722 * {@inheritdoc} 723 */ 724 public function indexExists($table, $name) { 725 // Details https://www.postgresql.org/docs/10/view-pg-indexes.html 726 $index_name = $this->ensureIdentifiersLength($table, $name, 'idx'); 727 // Remove leading and trailing quotes because the index name is in a WHERE 728 // clause and not used as an identifier. 729 $index_name = str_replace('"', '', $index_name); 730 return (bool) $this->connection->query("SELECT 1 FROM pg_indexes WHERE indexname = '$index_name'")->fetchField(); 731 } 732 733 /** 734 * Helper function: check if a constraint (PK, FK, UK) exists. 735 * 736 * @param string $table 737 * The name of the table. 738 * @param string $name 739 * The name of the constraint (typically 'pkey' or '[constraint]__key'). 740 * 741 * @return bool 742 * TRUE if the constraint exists, FALSE otherwise. 743 */ 744 public function constraintExists($table, $name) { 745 // ::ensureIdentifiersLength() expects three parameters, although not 746 // explicitly stated in its signature, thus we split our constraint name in 747 // a proper name and a suffix. 748 if ($name == 'pkey') { 749 $suffix = $name; 750 $name = ''; 751 } 752 else { 753 $pos = strrpos($name, '__'); 754 $suffix = substr($name, $pos + 2); 755 $name = substr($name, 0, $pos); 756 } 757 $constraint_name = $this->ensureIdentifiersLength($table, $name, $suffix); 758 // Remove leading and trailing quotes because the index name is in a WHERE 759 // clause and not used as an identifier. 760 $constraint_name = str_replace('"', '', $constraint_name); 761 return (bool) $this->connection->query("SELECT 1 FROM pg_constraint WHERE conname = '$constraint_name'")->fetchField(); 762 } 763 764 /** 765 * {@inheritdoc} 766 */ 767 public function addPrimaryKey($table, $fields) { 768 if (!$this->tableExists($table)) { 769 throw new SchemaObjectDoesNotExistException("Cannot add primary key to table '$table': table doesn't exist."); 770 } 771 if ($this->constraintExists($table, 'pkey')) { 772 throw new SchemaObjectExistsException("Cannot add primary key to table '$table': primary key already exists."); 773 } 774 775 $this->connection->query('ALTER TABLE {' . $table . '} ADD CONSTRAINT ' . $this->ensureIdentifiersLength($table, '', 'pkey') . ' PRIMARY KEY (' . $this->createPrimaryKeySql($fields) . ')'); 776 $this->resetTableInformation($table); 777 } 778 779 /** 780 * {@inheritdoc} 781 */ 782 public function dropPrimaryKey($table) { 783 if (!$this->constraintExists($table, 'pkey')) { 784 return FALSE; 785 } 786 787 $this->connection->query('ALTER TABLE {' . $table . '} DROP CONSTRAINT ' . $this->ensureIdentifiersLength($table, '', 'pkey')); 788 $this->resetTableInformation($table); 789 return TRUE; 790 } 791 792 /** 793 * {@inheritdoc} 794 */ 795 protected function findPrimaryKeyColumns($table) { 796 if (!$this->tableExists($table)) { 797 return FALSE; 798 } 799 return $this->connection->query("SELECT array_position(i.indkey, a.attnum) AS position, a.attname FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) WHERE i.indrelid = '{" . $table . "}'::regclass AND i.indisprimary ORDER BY position")->fetchAllKeyed(); 800 } 801 802 /** 803 * {@inheritdoc} 804 */ 805 public function addUniqueKey($table, $name, $fields) { 806 if (!$this->tableExists($table)) { 807 throw new SchemaObjectDoesNotExistException("Cannot add unique key '$name' to table '$table': table doesn't exist."); 808 } 809 if ($this->constraintExists($table, $name . '__key')) { 810 throw new SchemaObjectExistsException("Cannot add unique key '$name' to table '$table': unique key already exists."); 811 } 812 813 $this->connection->query('ALTER TABLE {' . $table . '} ADD CONSTRAINT ' . $this->ensureIdentifiersLength($table, $name, 'key') . ' UNIQUE (' . implode(',', $fields) . ')'); 814 $this->resetTableInformation($table); 815 } 816 817 /** 818 * {@inheritdoc} 819 */ 820 public function dropUniqueKey($table, $name) { 821 if (!$this->constraintExists($table, $name . '__key')) { 822 return FALSE; 823 } 824 825 $this->connection->query('ALTER TABLE {' . $table . '} DROP CONSTRAINT ' . $this->ensureIdentifiersLength($table, $name, 'key')); 826 $this->resetTableInformation($table); 827 return TRUE; 828 } 829 830 /** 831 * {@inheritdoc} 832 */ 833 public function addIndex($table, $name, $fields, array $spec) { 834 if (!$this->tableExists($table)) { 835 throw new SchemaObjectDoesNotExistException("Cannot add index '$name' to table '$table': table doesn't exist."); 836 } 837 if ($this->indexExists($table, $name)) { 838 throw new SchemaObjectExistsException("Cannot add index '$name' to table '$table': index already exists."); 839 } 840 841 $this->connection->query($this->_createIndexSql($table, $name, $fields)); 842 $this->resetTableInformation($table); 843 } 844 845 /** 846 * {@inheritdoc} 847 */ 848 public function dropIndex($table, $name) { 849 if (!$this->indexExists($table, $name)) { 850 return FALSE; 851 } 852 853 $this->connection->query('DROP INDEX ' . $this->ensureIdentifiersLength($table, $name, 'idx')); 854 $this->resetTableInformation($table); 855 return TRUE; 856 } 857 858 /** 859 * {@inheritdoc} 860 */ 861 protected function introspectIndexSchema($table) { 862 if (!$this->tableExists($table)) { 863 throw new SchemaObjectDoesNotExistException("The table $table doesn't exist."); 864 } 865 866 $index_schema = [ 867 'primary key' => [], 868 'unique keys' => [], 869 'indexes' => [], 870 ]; 871 872 // Get the schema and tablename for the table without identifier quotes. 873 $full_name = str_replace('"', '', $this->connection->prefixTables('{' . $table . '}')); 874 $result = $this->connection->query("SELECT i.relname AS index_name, a.attname AS column_name FROM pg_class t, pg_class i, pg_index ix, pg_attribute a WHERE t.oid = ix.indrelid AND i.oid = ix.indexrelid AND a.attrelid = t.oid AND a.attnum = ANY(ix.indkey) AND t.relkind = 'r' AND t.relname = :table_name ORDER BY index_name ASC, column_name ASC", [ 875 ':table_name' => $full_name, 876 ])->fetchAll(); 877 foreach ($result as $row) { 878 if (preg_match('/_pkey$/', $row->index_name)) { 879 $index_schema['primary key'][] = $row->column_name; 880 } 881 elseif (preg_match('/_key$/', $row->index_name)) { 882 $index_schema['unique keys'][$row->index_name][] = $row->column_name; 883 } 884 elseif (preg_match('/_idx$/', $row->index_name)) { 885 $index_schema['indexes'][$row->index_name][] = $row->column_name; 886 } 887 } 888 889 return $index_schema; 890 } 891 892 /** 893 * {@inheritdoc} 894 */ 895 public function changeField($table, $field, $field_new, $spec, $new_keys = []) { 896 if (!$this->fieldExists($table, $field)) { 897 throw new SchemaObjectDoesNotExistException("Cannot change the definition of field '$table.$field': field doesn't exist."); 898 } 899 if (($field != $field_new) && $this->fieldExists($table, $field_new)) { 900 throw new SchemaObjectExistsException("Cannot rename field '$table.$field' to '$field_new': target field already exists."); 901 } 902 if (isset($new_keys['primary key']) && in_array($field_new, $new_keys['primary key'], TRUE)) { 903 $this->ensureNotNullPrimaryKey($new_keys['primary key'], [$field_new => $spec]); 904 } 905 906 $spec = $this->processField($spec); 907 908 // Type 'serial' is known to PostgreSQL, but only during table creation, 909 // not when altering. Because of that, we create it here as an 'int'. After 910 // we create it we manually re-apply the sequence. 911 if (in_array($spec['pgsql_type'], ['serial', 'bigserial'])) { 912 $field_def = 'int'; 913 } 914 else { 915 $field_def = $spec['pgsql_type']; 916 } 917 918 if (in_array($spec['pgsql_type'], ['varchar', 'character', 'text']) && isset($spec['length'])) { 919 $field_def .= '(' . $spec['length'] . ')'; 920 } 921 elseif (isset($spec['precision']) && isset($spec['scale'])) { 922 $field_def .= '(' . $spec['precision'] . ', ' . $spec['scale'] . ')'; 923 } 924 925 // Remove old check constraints. 926 $field_info = $this->queryFieldInformation($table, $field); 927 928 foreach ($field_info as $check) { 929 $this->connection->query('ALTER TABLE {' . $table . '} DROP CONSTRAINT "' . $check . '"'); 930 } 931 932 // Remove old default. 933 $this->connection->query('ALTER TABLE {' . $table . '} ALTER COLUMN "' . $field . '" DROP DEFAULT'); 934 935 // Convert field type. 936 // Usually, we do this via a simple typecast 'USING fieldname::type'. But 937 // the typecast does not work for conversions to bytea. 938 // @see http://www.postgresql.org/docs/current/static/datatype-binary.html 939 $table_information = $this->queryTableInformation($table); 940 $is_bytea = !empty($table_information->blob_fields[$field]); 941 if ($spec['pgsql_type'] != 'bytea') { 942 if ($is_bytea) { 943 $this->connection->query('ALTER TABLE {' . $table . '} ALTER "' . $field . '" TYPE ' . $field_def . ' USING convert_from("' . $field . '"' . ", 'UTF8')"); 944 } 945 else { 946 $this->connection->query('ALTER TABLE {' . $table . '} ALTER "' . $field . '" TYPE ' . $field_def . ' USING "' . $field . '"::' . $field_def); 947 } 948 } 949 else { 950 // Do not attempt to convert a field that is bytea already. 951 if (!$is_bytea) { 952 // Convert to a bytea type by using the SQL replace() function to 953 // convert any single backslashes in the field content to double 954 // backslashes ('\' to '\\'). 955 $this->connection->query('ALTER TABLE {' . $table . '} ALTER "' . $field . '" TYPE ' . $field_def . ' USING decode(replace("' . $field . '"' . ", E'\\\\', E'\\\\\\\\'), 'escape');"); 956 } 957 } 958 959 if (isset($spec['not null'])) { 960 if ($spec['not null']) { 961 $null_action = 'SET NOT NULL'; 962 } 963 else { 964 $null_action = 'DROP NOT NULL'; 965 } 966 $this->connection->query('ALTER TABLE {' . $table . '} ALTER "' . $field . '" ' . $null_action); 967 } 968 969 if (in_array($spec['pgsql_type'], ['serial', 'bigserial'])) { 970 // Type "serial" is known to PostgreSQL, but *only* during table creation, 971 // not when altering. Because of that, the sequence needs to be created 972 // and initialized by hand. 973 $seq = $this->connection->makeSequenceName($table, $field_new); 974 $this->connection->query("CREATE SEQUENCE " . $seq); 975 // Set sequence to maximal field value to not conflict with existing 976 // entries. 977 $this->connection->query("SELECT setval('" . $seq . "', MAX(\"" . $field . '")) FROM {' . $table . "}"); 978 $this->connection->query('ALTER TABLE {' . $table . '} ALTER ' . $field . ' SET DEFAULT nextval(' . $this->connection->quote($seq) . ')'); 979 } 980 981 // Rename the column if necessary. 982 if ($field != $field_new) { 983 $this->connection->query('ALTER TABLE {' . $table . '} RENAME "' . $field . '" TO "' . $field_new . '"'); 984 } 985 986 // Add unsigned check if necessary. 987 if (!empty($spec['unsigned'])) { 988 $this->connection->query('ALTER TABLE {' . $table . '} ADD CHECK ("' . $field_new . '" >= 0)'); 989 } 990 991 // Add default if necessary. 992 if (isset($spec['default'])) { 993 $this->connection->query('ALTER TABLE {' . $table . '} ALTER COLUMN "' . $field_new . '" SET DEFAULT ' . $this->escapeDefaultValue($spec['default'])); 994 } 995 996 // Change description if necessary. 997 if (!empty($spec['description'])) { 998 $this->connection->query('COMMENT ON COLUMN {' . $table . '}."' . $field_new . '" IS ' . $this->prepareComment($spec['description'])); 999 } 1000 1001 if (isset($new_keys)) { 1002 $this->_createKeys($table, $new_keys); 1003 } 1004 $this->resetTableInformation($table); 1005 } 1006 1007 protected function _createIndexSql($table, $name, $fields) { 1008 $query = 'CREATE INDEX ' . $this->ensureIdentifiersLength($table, $name, 'idx') . ' ON {' . $table . '} ('; 1009 $query .= $this->_createKeySql($fields) . ')'; 1010 return $query; 1011 } 1012 1013 protected function _createKeys($table, $new_keys) { 1014 if (isset($new_keys['primary key'])) { 1015 $this->addPrimaryKey($table, $new_keys['primary key']); 1016 } 1017 if (isset($new_keys['unique keys'])) { 1018 foreach ($new_keys['unique keys'] as $name => $fields) { 1019 $this->addUniqueKey($table, $name, $fields); 1020 } 1021 } 1022 if (isset($new_keys['indexes'])) { 1023 foreach ($new_keys['indexes'] as $name => $fields) { 1024 // Even though $new_keys is not a full schema it still has 'indexes' and 1025 // so is a partial schema. Technically addIndex() doesn't do anything 1026 // with it so passing an empty array would work as well. 1027 $this->addIndex($table, $name, $fields, $new_keys); 1028 } 1029 } 1030 } 1031 1032 /** 1033 * Retrieve a table or column comment. 1034 */ 1035 public function getComment($table, $column = NULL) { 1036 $info = $this->getPrefixInfo($table); 1037 // Don't use {} around pg_class, pg_attribute tables. 1038 if (isset($column)) { 1039 return $this->connection->query('SELECT col_description(oid, attnum) FROM pg_class, pg_attribute WHERE attrelid = oid AND relname = ? AND attname = ?', [$info['table'], $column])->fetchField(); 1040 } 1041 else { 1042 return $this->connection->query('SELECT obj_description(oid, ?) FROM pg_class WHERE relname = ?', ['pg_class', $info['table']])->fetchField(); 1043 } 1044 } 1045 1046 /** 1047 * Calculates a base-64 encoded, PostgreSQL-safe sha-256 hash per PostgreSQL 1048 * documentation: 4.1. Lexical Structure. 1049 * 1050 * @param $data 1051 * String to be hashed. 1052 * 1053 * @return string 1054 * A base-64 encoded sha-256 hash, with + and / replaced with _ and any = 1055 * padding characters removed. 1056 */ 1057 protected function hashBase64($data) { 1058 $hash = base64_encode(hash('sha256', $data, TRUE)); 1059 // Modify the hash so it's safe to use in PostgreSQL identifiers. 1060 return strtr($hash, ['+' => '_', '/' => '_', '=' => '']); 1061 } 1062 1063} 1064 1065/** 1066 * @} End of "addtogroup schemaapi". 1067 */ 1068