1 /*------------------------------------------------------------------------- 2 * 3 * rewriteDefine.c 4 * routines for defining a rewrite rule 5 * 6 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group 7 * Portions Copyright (c) 1994, Regents of the University of California 8 * 9 * 10 * IDENTIFICATION 11 * src/backend/rewrite/rewriteDefine.c 12 * 13 *------------------------------------------------------------------------- 14 */ 15 #include "postgres.h" 16 17 #include "access/heapam.h" 18 #include "access/htup_details.h" 19 #include "access/multixact.h" 20 #include "access/tableam.h" 21 #include "access/transam.h" 22 #include "access/xact.h" 23 #include "catalog/catalog.h" 24 #include "catalog/dependency.h" 25 #include "catalog/heap.h" 26 #include "catalog/namespace.h" 27 #include "catalog/objectaccess.h" 28 #include "catalog/pg_inherits.h" 29 #include "catalog/pg_rewrite.h" 30 #include "catalog/storage.h" 31 #include "commands/policy.h" 32 #include "miscadmin.h" 33 #include "nodes/nodeFuncs.h" 34 #include "parser/parse_utilcmd.h" 35 #include "rewrite/rewriteDefine.h" 36 #include "rewrite/rewriteManip.h" 37 #include "rewrite/rewriteSupport.h" 38 #include "utils/acl.h" 39 #include "utils/builtins.h" 40 #include "utils/inval.h" 41 #include "utils/lsyscache.h" 42 #include "utils/rel.h" 43 #include "utils/snapmgr.h" 44 #include "utils/syscache.h" 45 46 47 static void checkRuleResultList(List *targetList, TupleDesc resultDesc, 48 bool isSelect, bool requireColumnNameMatch); 49 static bool setRuleCheckAsUser_walker(Node *node, Oid *context); 50 static void setRuleCheckAsUser_Query(Query *qry, Oid userid); 51 52 53 /* 54 * InsertRule - 55 * takes the arguments and inserts them as a row into the system 56 * relation "pg_rewrite" 57 */ 58 static Oid 59 InsertRule(const char *rulname, 60 int evtype, 61 Oid eventrel_oid, 62 bool evinstead, 63 Node *event_qual, 64 List *action, 65 bool replace) 66 { 67 char *evqual = nodeToString(event_qual); 68 char *actiontree = nodeToString((Node *) action); 69 Datum values[Natts_pg_rewrite]; 70 bool nulls[Natts_pg_rewrite]; 71 bool replaces[Natts_pg_rewrite]; 72 NameData rname; 73 Relation pg_rewrite_desc; 74 HeapTuple tup, 75 oldtup; 76 Oid rewriteObjectId; 77 ObjectAddress myself, 78 referenced; 79 bool is_update = false; 80 81 /* 82 * Set up *nulls and *values arrays 83 */ 84 MemSet(nulls, false, sizeof(nulls)); 85 86 namestrcpy(&rname, rulname); 87 values[Anum_pg_rewrite_rulename - 1] = NameGetDatum(&rname); 88 values[Anum_pg_rewrite_ev_class - 1] = ObjectIdGetDatum(eventrel_oid); 89 values[Anum_pg_rewrite_ev_type - 1] = CharGetDatum(evtype + '0'); 90 values[Anum_pg_rewrite_ev_enabled - 1] = CharGetDatum(RULE_FIRES_ON_ORIGIN); 91 values[Anum_pg_rewrite_is_instead - 1] = BoolGetDatum(evinstead); 92 values[Anum_pg_rewrite_ev_qual - 1] = CStringGetTextDatum(evqual); 93 values[Anum_pg_rewrite_ev_action - 1] = CStringGetTextDatum(actiontree); 94 95 /* 96 * Ready to store new pg_rewrite tuple 97 */ 98 pg_rewrite_desc = table_open(RewriteRelationId, RowExclusiveLock); 99 100 /* 101 * Check to see if we are replacing an existing tuple 102 */ 103 oldtup = SearchSysCache2(RULERELNAME, 104 ObjectIdGetDatum(eventrel_oid), 105 PointerGetDatum(rulname)); 106 107 if (HeapTupleIsValid(oldtup)) 108 { 109 if (!replace) 110 ereport(ERROR, 111 (errcode(ERRCODE_DUPLICATE_OBJECT), 112 errmsg("rule \"%s\" for relation \"%s\" already exists", 113 rulname, get_rel_name(eventrel_oid)))); 114 115 /* 116 * When replacing, we don't need to replace every attribute 117 */ 118 MemSet(replaces, false, sizeof(replaces)); 119 replaces[Anum_pg_rewrite_ev_type - 1] = true; 120 replaces[Anum_pg_rewrite_is_instead - 1] = true; 121 replaces[Anum_pg_rewrite_ev_qual - 1] = true; 122 replaces[Anum_pg_rewrite_ev_action - 1] = true; 123 124 tup = heap_modify_tuple(oldtup, RelationGetDescr(pg_rewrite_desc), 125 values, nulls, replaces); 126 127 CatalogTupleUpdate(pg_rewrite_desc, &tup->t_self, tup); 128 129 ReleaseSysCache(oldtup); 130 131 rewriteObjectId = ((Form_pg_rewrite) GETSTRUCT(tup))->oid; 132 is_update = true; 133 } 134 else 135 { 136 rewriteObjectId = GetNewOidWithIndex(pg_rewrite_desc, 137 RewriteOidIndexId, 138 Anum_pg_rewrite_oid); 139 values[Anum_pg_rewrite_oid - 1] = ObjectIdGetDatum(rewriteObjectId); 140 141 tup = heap_form_tuple(pg_rewrite_desc->rd_att, values, nulls); 142 143 CatalogTupleInsert(pg_rewrite_desc, tup); 144 } 145 146 147 heap_freetuple(tup); 148 149 /* If replacing, get rid of old dependencies and make new ones */ 150 if (is_update) 151 deleteDependencyRecordsFor(RewriteRelationId, rewriteObjectId, false); 152 153 /* 154 * Install dependency on rule's relation to ensure it will go away on 155 * relation deletion. If the rule is ON SELECT, make the dependency 156 * implicit --- this prevents deleting a view's SELECT rule. Other kinds 157 * of rules can be AUTO. 158 */ 159 myself.classId = RewriteRelationId; 160 myself.objectId = rewriteObjectId; 161 myself.objectSubId = 0; 162 163 referenced.classId = RelationRelationId; 164 referenced.objectId = eventrel_oid; 165 referenced.objectSubId = 0; 166 167 recordDependencyOn(&myself, &referenced, 168 (evtype == CMD_SELECT) ? DEPENDENCY_INTERNAL : DEPENDENCY_AUTO); 169 170 /* 171 * Also install dependencies on objects referenced in action and qual. 172 */ 173 recordDependencyOnExpr(&myself, (Node *) action, NIL, 174 DEPENDENCY_NORMAL); 175 176 if (event_qual != NULL) 177 { 178 /* Find query containing OLD/NEW rtable entries */ 179 Query *qry = linitial_node(Query, action); 180 181 qry = getInsertSelectQuery(qry, NULL); 182 recordDependencyOnExpr(&myself, event_qual, qry->rtable, 183 DEPENDENCY_NORMAL); 184 } 185 186 /* Post creation hook for new rule */ 187 InvokeObjectPostCreateHook(RewriteRelationId, rewriteObjectId, 0); 188 189 table_close(pg_rewrite_desc, RowExclusiveLock); 190 191 return rewriteObjectId; 192 } 193 194 /* 195 * DefineRule 196 * Execute a CREATE RULE command. 197 */ 198 ObjectAddress 199 DefineRule(RuleStmt *stmt, const char *queryString) 200 { 201 List *actions; 202 Node *whereClause; 203 Oid relId; 204 205 /* Parse analysis. */ 206 transformRuleStmt(stmt, queryString, &actions, &whereClause); 207 208 /* 209 * Find and lock the relation. Lock level should match 210 * DefineQueryRewrite. 211 */ 212 relId = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, false); 213 214 /* ... and execute */ 215 return DefineQueryRewrite(stmt->rulename, 216 relId, 217 whereClause, 218 stmt->event, 219 stmt->instead, 220 stmt->replace, 221 actions); 222 } 223 224 225 /* 226 * DefineQueryRewrite 227 * Create a rule 228 * 229 * This is essentially the same as DefineRule() except that the rule's 230 * action and qual have already been passed through parse analysis. 231 */ 232 ObjectAddress 233 DefineQueryRewrite(const char *rulename, 234 Oid event_relid, 235 Node *event_qual, 236 CmdType event_type, 237 bool is_instead, 238 bool replace, 239 List *action) 240 { 241 Relation event_relation; 242 ListCell *l; 243 Query *query; 244 bool RelisBecomingView = false; 245 Oid ruleId = InvalidOid; 246 ObjectAddress address; 247 248 /* 249 * If we are installing an ON SELECT rule, we had better grab 250 * AccessExclusiveLock to ensure no SELECTs are currently running on the 251 * event relation. For other types of rules, it would be sufficient to 252 * grab ShareRowExclusiveLock to lock out insert/update/delete actions and 253 * to ensure that we lock out current CREATE RULE statements; but because 254 * of race conditions in access to catalog entries, we can't do that yet. 255 * 256 * Note that this lock level should match the one used in DefineRule. 257 */ 258 event_relation = table_open(event_relid, AccessExclusiveLock); 259 260 /* 261 * Verify relation is of a type that rules can sensibly be applied to. 262 * Internal callers can target materialized views, but transformRuleStmt() 263 * blocks them for users. Don't mention them in the error message. 264 */ 265 if (event_relation->rd_rel->relkind != RELKIND_RELATION && 266 event_relation->rd_rel->relkind != RELKIND_MATVIEW && 267 event_relation->rd_rel->relkind != RELKIND_VIEW && 268 event_relation->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) 269 ereport(ERROR, 270 (errcode(ERRCODE_WRONG_OBJECT_TYPE), 271 errmsg("\"%s\" is not a table or view", 272 RelationGetRelationName(event_relation)))); 273 274 if (!allowSystemTableMods && IsSystemRelation(event_relation)) 275 ereport(ERROR, 276 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), 277 errmsg("permission denied: \"%s\" is a system catalog", 278 RelationGetRelationName(event_relation)))); 279 280 /* 281 * Check user has permission to apply rules to this relation. 282 */ 283 if (!pg_class_ownercheck(event_relid, GetUserId())) 284 aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(event_relation->rd_rel->relkind), 285 RelationGetRelationName(event_relation)); 286 287 /* 288 * No rule actions that modify OLD or NEW 289 */ 290 foreach(l, action) 291 { 292 query = lfirst_node(Query, l); 293 if (query->resultRelation == 0) 294 continue; 295 /* Don't be fooled by INSERT/SELECT */ 296 if (query != getInsertSelectQuery(query, NULL)) 297 continue; 298 if (query->resultRelation == PRS2_OLD_VARNO) 299 ereport(ERROR, 300 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), 301 errmsg("rule actions on OLD are not implemented"), 302 errhint("Use views or triggers instead."))); 303 if (query->resultRelation == PRS2_NEW_VARNO) 304 ereport(ERROR, 305 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), 306 errmsg("rule actions on NEW are not implemented"), 307 errhint("Use triggers instead."))); 308 } 309 310 if (event_type == CMD_SELECT) 311 { 312 /* 313 * Rules ON SELECT are restricted to view definitions 314 * 315 * So there cannot be INSTEAD NOTHING, ... 316 */ 317 if (list_length(action) == 0) 318 ereport(ERROR, 319 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), 320 errmsg("INSTEAD NOTHING rules on SELECT are not implemented"), 321 errhint("Use views instead."))); 322 323 /* 324 * ... there cannot be multiple actions, ... 325 */ 326 if (list_length(action) > 1) 327 ereport(ERROR, 328 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), 329 errmsg("multiple actions for rules on SELECT are not implemented"))); 330 331 /* 332 * ... the one action must be a SELECT, ... 333 */ 334 query = linitial_node(Query, action); 335 if (!is_instead || 336 query->commandType != CMD_SELECT) 337 ereport(ERROR, 338 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), 339 errmsg("rules on SELECT must have action INSTEAD SELECT"))); 340 341 /* 342 * ... it cannot contain data-modifying WITH ... 343 */ 344 if (query->hasModifyingCTE) 345 ereport(ERROR, 346 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), 347 errmsg("rules on SELECT must not contain data-modifying statements in WITH"))); 348 349 /* 350 * ... there can be no rule qual, ... 351 */ 352 if (event_qual != NULL) 353 ereport(ERROR, 354 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), 355 errmsg("event qualifications are not implemented for rules on SELECT"))); 356 357 /* 358 * ... the targetlist of the SELECT action must exactly match the 359 * event relation, ... 360 */ 361 checkRuleResultList(query->targetList, 362 RelationGetDescr(event_relation), 363 true, 364 event_relation->rd_rel->relkind != 365 RELKIND_MATVIEW); 366 367 /* 368 * ... there must not be another ON SELECT rule already ... 369 */ 370 if (!replace && event_relation->rd_rules != NULL) 371 { 372 int i; 373 374 for (i = 0; i < event_relation->rd_rules->numLocks; i++) 375 { 376 RewriteRule *rule; 377 378 rule = event_relation->rd_rules->rules[i]; 379 if (rule->event == CMD_SELECT) 380 ereport(ERROR, 381 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), 382 errmsg("\"%s\" is already a view", 383 RelationGetRelationName(event_relation)))); 384 } 385 } 386 387 /* 388 * ... and finally the rule must be named _RETURN. 389 */ 390 if (strcmp(rulename, ViewSelectRuleName) != 0) 391 { 392 /* 393 * In versions before 7.3, the expected name was _RETviewname. For 394 * backwards compatibility with old pg_dump output, accept that 395 * and silently change it to _RETURN. Since this is just a quick 396 * backwards-compatibility hack, limit the number of characters 397 * checked to a few less than NAMEDATALEN; this saves having to 398 * worry about where a multibyte character might have gotten 399 * truncated. 400 */ 401 if (strncmp(rulename, "_RET", 4) != 0 || 402 strncmp(rulename + 4, RelationGetRelationName(event_relation), 403 NAMEDATALEN - 4 - 4) != 0) 404 ereport(ERROR, 405 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), 406 errmsg("view rule for \"%s\" must be named \"%s\"", 407 RelationGetRelationName(event_relation), 408 ViewSelectRuleName))); 409 rulename = pstrdup(ViewSelectRuleName); 410 } 411 412 /* 413 * Are we converting a relation to a view? 414 * 415 * If so, check that the relation is empty because the storage for the 416 * relation is going to be deleted. Also insist that the rel not be 417 * involved in partitioning, nor have any triggers, indexes, child or 418 * parent tables, RLS policies, or RLS enabled. (Note: some of these 419 * tests are too strict, because they will reject relations that once 420 * had such but don't anymore. But we don't really care, because this 421 * whole business of converting relations to views is just an obsolete 422 * kluge to allow dump/reload of views that participate in circular 423 * dependencies.) 424 */ 425 if (event_relation->rd_rel->relkind != RELKIND_VIEW && 426 event_relation->rd_rel->relkind != RELKIND_MATVIEW) 427 { 428 TableScanDesc scanDesc; 429 Snapshot snapshot; 430 TupleTableSlot *slot; 431 432 if (event_relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) 433 ereport(ERROR, 434 (errcode(ERRCODE_WRONG_OBJECT_TYPE), 435 errmsg("cannot convert partitioned table \"%s\" to a view", 436 RelationGetRelationName(event_relation)))); 437 438 /* only case left: */ 439 Assert(event_relation->rd_rel->relkind == RELKIND_RELATION); 440 441 if (event_relation->rd_rel->relispartition) 442 ereport(ERROR, 443 (errcode(ERRCODE_WRONG_OBJECT_TYPE), 444 errmsg("cannot convert partition \"%s\" to a view", 445 RelationGetRelationName(event_relation)))); 446 447 snapshot = RegisterSnapshot(GetLatestSnapshot()); 448 scanDesc = table_beginscan(event_relation, snapshot, 0, NULL); 449 slot = table_slot_create(event_relation, NULL); 450 if (table_scan_getnextslot(scanDesc, ForwardScanDirection, slot)) 451 ereport(ERROR, 452 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), 453 errmsg("could not convert table \"%s\" to a view because it is not empty", 454 RelationGetRelationName(event_relation)))); 455 ExecDropSingleTupleTableSlot(slot); 456 table_endscan(scanDesc); 457 UnregisterSnapshot(snapshot); 458 459 if (event_relation->rd_rel->relhastriggers) 460 ereport(ERROR, 461 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), 462 errmsg("could not convert table \"%s\" to a view because it has triggers", 463 RelationGetRelationName(event_relation)), 464 errhint("In particular, the table cannot be involved in any foreign key relationships."))); 465 466 if (event_relation->rd_rel->relhasindex) 467 ereport(ERROR, 468 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), 469 errmsg("could not convert table \"%s\" to a view because it has indexes", 470 RelationGetRelationName(event_relation)))); 471 472 if (event_relation->rd_rel->relhassubclass) 473 ereport(ERROR, 474 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), 475 errmsg("could not convert table \"%s\" to a view because it has child tables", 476 RelationGetRelationName(event_relation)))); 477 478 if (has_superclass(RelationGetRelid(event_relation))) 479 ereport(ERROR, 480 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), 481 errmsg("could not convert table \"%s\" to a view because it has parent tables", 482 RelationGetRelationName(event_relation)))); 483 484 if (event_relation->rd_rel->relrowsecurity) 485 ereport(ERROR, 486 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), 487 errmsg("could not convert table \"%s\" to a view because it has row security enabled", 488 RelationGetRelationName(event_relation)))); 489 490 if (relation_has_policies(event_relation)) 491 ereport(ERROR, 492 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), 493 errmsg("could not convert table \"%s\" to a view because it has row security policies", 494 RelationGetRelationName(event_relation)))); 495 496 RelisBecomingView = true; 497 } 498 } 499 else 500 { 501 /* 502 * For non-SELECT rules, a RETURNING list can appear in at most one of 503 * the actions ... and there can't be any RETURNING list at all in a 504 * conditional or non-INSTEAD rule. (Actually, there can be at most 505 * one RETURNING list across all rules on the same event, but it seems 506 * best to enforce that at rule expansion time.) If there is a 507 * RETURNING list, it must match the event relation. 508 */ 509 bool haveReturning = false; 510 511 foreach(l, action) 512 { 513 query = lfirst_node(Query, l); 514 515 if (!query->returningList) 516 continue; 517 if (haveReturning) 518 ereport(ERROR, 519 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), 520 errmsg("cannot have multiple RETURNING lists in a rule"))); 521 haveReturning = true; 522 if (event_qual != NULL) 523 ereport(ERROR, 524 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), 525 errmsg("RETURNING lists are not supported in conditional rules"))); 526 if (!is_instead) 527 ereport(ERROR, 528 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), 529 errmsg("RETURNING lists are not supported in non-INSTEAD rules"))); 530 checkRuleResultList(query->returningList, 531 RelationGetDescr(event_relation), 532 false, false); 533 } 534 } 535 536 /* 537 * This rule is allowed - prepare to install it. 538 */ 539 540 /* discard rule if it's null action and not INSTEAD; it's a no-op */ 541 if (action != NIL || is_instead) 542 { 543 ruleId = InsertRule(rulename, 544 event_type, 545 event_relid, 546 is_instead, 547 event_qual, 548 action, 549 replace); 550 551 /* 552 * Set pg_class 'relhasrules' field true for event relation. 553 * 554 * Important side effect: an SI notice is broadcast to force all 555 * backends (including me!) to update relcache entries with the new 556 * rule. 557 */ 558 SetRelationRuleStatus(event_relid, true); 559 } 560 561 /* --------------------------------------------------------------------- 562 * If the relation is becoming a view: 563 * - delete the associated storage files 564 * - get rid of any system attributes in pg_attribute; a view shouldn't 565 * have any of those 566 * - remove the toast table; there is no need for it anymore, and its 567 * presence would make vacuum slightly more complicated 568 * - set relkind to RELKIND_VIEW, and adjust other pg_class fields 569 * to be appropriate for a view 570 * 571 * NB: we had better have AccessExclusiveLock to do this ... 572 * --------------------------------------------------------------------- 573 */ 574 if (RelisBecomingView) 575 { 576 Relation relationRelation; 577 Oid toastrelid; 578 HeapTuple classTup; 579 Form_pg_class classForm; 580 581 relationRelation = table_open(RelationRelationId, RowExclusiveLock); 582 toastrelid = event_relation->rd_rel->reltoastrelid; 583 584 /* drop storage while table still looks like a table */ 585 RelationDropStorage(event_relation); 586 DeleteSystemAttributeTuples(event_relid); 587 588 /* 589 * Drop the toast table if any. (This won't take care of updating the 590 * toast fields in the relation's own pg_class entry; we handle that 591 * below.) 592 */ 593 if (OidIsValid(toastrelid)) 594 { 595 ObjectAddress toastobject; 596 597 /* 598 * Delete the dependency of the toast relation on the main 599 * relation so we can drop the former without dropping the latter. 600 */ 601 deleteDependencyRecordsFor(RelationRelationId, toastrelid, 602 false); 603 604 /* Make deletion of dependency record visible */ 605 CommandCounterIncrement(); 606 607 /* Now drop toast table, including its index */ 608 toastobject.classId = RelationRelationId; 609 toastobject.objectId = toastrelid; 610 toastobject.objectSubId = 0; 611 performDeletion(&toastobject, DROP_RESTRICT, 612 PERFORM_DELETION_INTERNAL); 613 } 614 615 /* 616 * SetRelationRuleStatus may have updated the pg_class row, so we must 617 * advance the command counter before trying to update it again. 618 */ 619 CommandCounterIncrement(); 620 621 /* 622 * Fix pg_class entry to look like a normal view's, including setting 623 * the correct relkind and removal of reltoastrelid of the toast table 624 * we potentially removed above. 625 */ 626 classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(event_relid)); 627 if (!HeapTupleIsValid(classTup)) 628 elog(ERROR, "cache lookup failed for relation %u", event_relid); 629 classForm = (Form_pg_class) GETSTRUCT(classTup); 630 631 classForm->relam = InvalidOid; 632 classForm->reltablespace = InvalidOid; 633 classForm->relpages = 0; 634 classForm->reltuples = -1; 635 classForm->relallvisible = 0; 636 classForm->reltoastrelid = InvalidOid; 637 classForm->relhasindex = false; 638 classForm->relkind = RELKIND_VIEW; 639 classForm->relfrozenxid = InvalidTransactionId; 640 classForm->relminmxid = InvalidMultiXactId; 641 classForm->relreplident = REPLICA_IDENTITY_NOTHING; 642 643 CatalogTupleUpdate(relationRelation, &classTup->t_self, classTup); 644 645 heap_freetuple(classTup); 646 table_close(relationRelation, RowExclusiveLock); 647 } 648 649 ObjectAddressSet(address, RewriteRelationId, ruleId); 650 651 /* Close rel, but keep lock till commit... */ 652 table_close(event_relation, NoLock); 653 654 return address; 655 } 656 657 /* 658 * checkRuleResultList 659 * Verify that targetList produces output compatible with a tupledesc 660 * 661 * The targetList might be either a SELECT targetlist, or a RETURNING list; 662 * isSelect tells which. This is used for choosing error messages. 663 * 664 * A SELECT targetlist may optionally require that column names match. 665 */ 666 static void 667 checkRuleResultList(List *targetList, TupleDesc resultDesc, bool isSelect, 668 bool requireColumnNameMatch) 669 { 670 ListCell *tllist; 671 int i; 672 673 /* Only a SELECT may require a column name match. */ 674 Assert(isSelect || !requireColumnNameMatch); 675 676 i = 0; 677 foreach(tllist, targetList) 678 { 679 TargetEntry *tle = (TargetEntry *) lfirst(tllist); 680 Oid tletypid; 681 int32 tletypmod; 682 Form_pg_attribute attr; 683 char *attname; 684 685 /* resjunk entries may be ignored */ 686 if (tle->resjunk) 687 continue; 688 i++; 689 if (i > resultDesc->natts) 690 ereport(ERROR, 691 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), 692 isSelect ? 693 errmsg("SELECT rule's target list has too many entries") : 694 errmsg("RETURNING list has too many entries"))); 695 696 attr = TupleDescAttr(resultDesc, i - 1); 697 attname = NameStr(attr->attname); 698 699 /* 700 * Disallow dropped columns in the relation. This is not really 701 * expected to happen when creating an ON SELECT rule. It'd be 702 * possible if someone tried to convert a relation with dropped 703 * columns to a view, but the only case we care about supporting 704 * table-to-view conversion for is pg_dump, and pg_dump won't do that. 705 * 706 * Unfortunately, the situation is also possible when adding a rule 707 * with RETURNING to a regular table, and rejecting that case is 708 * altogether more annoying. In principle we could support it by 709 * modifying the targetlist to include dummy NULL columns 710 * corresponding to the dropped columns in the tupdesc. However, 711 * places like ruleutils.c would have to be fixed to not process such 712 * entries, and that would take an uncertain and possibly rather large 713 * amount of work. (Note we could not dodge that by marking the dummy 714 * columns resjunk, since it's precisely the non-resjunk tlist columns 715 * that are expected to correspond to table columns.) 716 */ 717 if (attr->attisdropped) 718 ereport(ERROR, 719 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), 720 isSelect ? 721 errmsg("cannot convert relation containing dropped columns to view") : 722 errmsg("cannot create a RETURNING list for a relation containing dropped columns"))); 723 724 /* Check name match if required; no need for two error texts here */ 725 if (requireColumnNameMatch && strcmp(tle->resname, attname) != 0) 726 ereport(ERROR, 727 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), 728 errmsg("SELECT rule's target entry %d has different column name from column \"%s\"", 729 i, attname), 730 errdetail("SELECT target entry is named \"%s\".", 731 tle->resname))); 732 733 /* Check type match. */ 734 tletypid = exprType((Node *) tle->expr); 735 if (attr->atttypid != tletypid) 736 ereport(ERROR, 737 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), 738 isSelect ? 739 errmsg("SELECT rule's target entry %d has different type from column \"%s\"", 740 i, attname) : 741 errmsg("RETURNING list's entry %d has different type from column \"%s\"", 742 i, attname), 743 isSelect ? 744 errdetail("SELECT target entry has type %s, but column has type %s.", 745 format_type_be(tletypid), 746 format_type_be(attr->atttypid)) : 747 errdetail("RETURNING list entry has type %s, but column has type %s.", 748 format_type_be(tletypid), 749 format_type_be(attr->atttypid)))); 750 751 /* 752 * Allow typmods to be different only if one of them is -1, ie, 753 * "unspecified". This is necessary for cases like "numeric", where 754 * the table will have a filled-in default length but the select 755 * rule's expression will probably have typmod = -1. 756 */ 757 tletypmod = exprTypmod((Node *) tle->expr); 758 if (attr->atttypmod != tletypmod && 759 attr->atttypmod != -1 && tletypmod != -1) 760 ereport(ERROR, 761 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), 762 isSelect ? 763 errmsg("SELECT rule's target entry %d has different size from column \"%s\"", 764 i, attname) : 765 errmsg("RETURNING list's entry %d has different size from column \"%s\"", 766 i, attname), 767 isSelect ? 768 errdetail("SELECT target entry has type %s, but column has type %s.", 769 format_type_with_typemod(tletypid, tletypmod), 770 format_type_with_typemod(attr->atttypid, 771 attr->atttypmod)) : 772 errdetail("RETURNING list entry has type %s, but column has type %s.", 773 format_type_with_typemod(tletypid, tletypmod), 774 format_type_with_typemod(attr->atttypid, 775 attr->atttypmod)))); 776 } 777 778 if (i != resultDesc->natts) 779 ereport(ERROR, 780 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), 781 isSelect ? 782 errmsg("SELECT rule's target list has too few entries") : 783 errmsg("RETURNING list has too few entries"))); 784 } 785 786 /* 787 * setRuleCheckAsUser 788 * Recursively scan a query or expression tree and set the checkAsUser 789 * field to the given userid in all rtable entries. 790 * 791 * Note: for a view (ON SELECT rule), the checkAsUser field of the OLD 792 * RTE entry will be overridden when the view rule is expanded, and the 793 * checkAsUser field of the NEW entry is irrelevant because that entry's 794 * requiredPerms bits will always be zero. However, for other types of rules 795 * it's important to set these fields to match the rule owner. So we just set 796 * them always. 797 */ 798 void 799 setRuleCheckAsUser(Node *node, Oid userid) 800 { 801 (void) setRuleCheckAsUser_walker(node, &userid); 802 } 803 804 static bool 805 setRuleCheckAsUser_walker(Node *node, Oid *context) 806 { 807 if (node == NULL) 808 return false; 809 if (IsA(node, Query)) 810 { 811 setRuleCheckAsUser_Query((Query *) node, *context); 812 return false; 813 } 814 return expression_tree_walker(node, setRuleCheckAsUser_walker, 815 (void *) context); 816 } 817 818 static void 819 setRuleCheckAsUser_Query(Query *qry, Oid userid) 820 { 821 ListCell *l; 822 823 /* Set all the RTEs in this query node */ 824 foreach(l, qry->rtable) 825 { 826 RangeTblEntry *rte = (RangeTblEntry *) lfirst(l); 827 828 if (rte->rtekind == RTE_SUBQUERY) 829 { 830 /* Recurse into subquery in FROM */ 831 setRuleCheckAsUser_Query(rte->subquery, userid); 832 } 833 else 834 rte->checkAsUser = userid; 835 } 836 837 /* Recurse into subquery-in-WITH */ 838 foreach(l, qry->cteList) 839 { 840 CommonTableExpr *cte = (CommonTableExpr *) lfirst(l); 841 842 setRuleCheckAsUser_Query(castNode(Query, cte->ctequery), userid); 843 } 844 845 /* If there are sublinks, search for them and process their RTEs */ 846 if (qry->hasSubLinks) 847 query_tree_walker(qry, setRuleCheckAsUser_walker, (void *) &userid, 848 QTW_IGNORE_RC_SUBQUERIES); 849 } 850 851 852 /* 853 * Change the firing semantics of an existing rule. 854 */ 855 void 856 EnableDisableRule(Relation rel, const char *rulename, 857 char fires_when) 858 { 859 Relation pg_rewrite_desc; 860 Oid owningRel = RelationGetRelid(rel); 861 Oid eventRelationOid; 862 HeapTuple ruletup; 863 Form_pg_rewrite ruleform; 864 bool changed = false; 865 866 /* 867 * Find the rule tuple to change. 868 */ 869 pg_rewrite_desc = table_open(RewriteRelationId, RowExclusiveLock); 870 ruletup = SearchSysCacheCopy2(RULERELNAME, 871 ObjectIdGetDatum(owningRel), 872 PointerGetDatum(rulename)); 873 if (!HeapTupleIsValid(ruletup)) 874 ereport(ERROR, 875 (errcode(ERRCODE_UNDEFINED_OBJECT), 876 errmsg("rule \"%s\" for relation \"%s\" does not exist", 877 rulename, get_rel_name(owningRel)))); 878 879 ruleform = (Form_pg_rewrite) GETSTRUCT(ruletup); 880 881 /* 882 * Verify that the user has appropriate permissions. 883 */ 884 eventRelationOid = ruleform->ev_class; 885 Assert(eventRelationOid == owningRel); 886 if (!pg_class_ownercheck(eventRelationOid, GetUserId())) 887 aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(eventRelationOid)), 888 get_rel_name(eventRelationOid)); 889 890 /* 891 * Change ev_enabled if it is different from the desired new state. 892 */ 893 if (DatumGetChar(ruleform->ev_enabled) != 894 fires_when) 895 { 896 ruleform->ev_enabled = CharGetDatum(fires_when); 897 CatalogTupleUpdate(pg_rewrite_desc, &ruletup->t_self, ruletup); 898 899 changed = true; 900 } 901 902 InvokeObjectPostAlterHook(RewriteRelationId, ruleform->oid, 0); 903 904 heap_freetuple(ruletup); 905 table_close(pg_rewrite_desc, RowExclusiveLock); 906 907 /* 908 * If we changed anything, broadcast a SI inval message to force each 909 * backend (including our own!) to rebuild relation's relcache entry. 910 * Otherwise they will fail to apply the change promptly. 911 */ 912 if (changed) 913 CacheInvalidateRelcache(rel); 914 } 915 916 917 /* 918 * Perform permissions and integrity checks before acquiring a relation lock. 919 */ 920 static void 921 RangeVarCallbackForRenameRule(const RangeVar *rv, Oid relid, Oid oldrelid, 922 void *arg) 923 { 924 HeapTuple tuple; 925 Form_pg_class form; 926 927 tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid)); 928 if (!HeapTupleIsValid(tuple)) 929 return; /* concurrently dropped */ 930 form = (Form_pg_class) GETSTRUCT(tuple); 931 932 /* only tables and views can have rules */ 933 if (form->relkind != RELKIND_RELATION && 934 form->relkind != RELKIND_VIEW && 935 form->relkind != RELKIND_PARTITIONED_TABLE) 936 ereport(ERROR, 937 (errcode(ERRCODE_WRONG_OBJECT_TYPE), 938 errmsg("\"%s\" is not a table or view", rv->relname))); 939 940 if (!allowSystemTableMods && IsSystemClass(relid, form)) 941 ereport(ERROR, 942 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), 943 errmsg("permission denied: \"%s\" is a system catalog", 944 rv->relname))); 945 946 /* you must own the table to rename one of its rules */ 947 if (!pg_class_ownercheck(relid, GetUserId())) 948 aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relid)), rv->relname); 949 950 ReleaseSysCache(tuple); 951 } 952 953 /* 954 * Rename an existing rewrite rule. 955 */ 956 ObjectAddress 957 RenameRewriteRule(RangeVar *relation, const char *oldName, 958 const char *newName) 959 { 960 Oid relid; 961 Relation targetrel; 962 Relation pg_rewrite_desc; 963 HeapTuple ruletup; 964 Form_pg_rewrite ruleform; 965 Oid ruleOid; 966 ObjectAddress address; 967 968 /* 969 * Look up name, check permissions, and acquire lock (which we will NOT 970 * release until end of transaction). 971 */ 972 relid = RangeVarGetRelidExtended(relation, AccessExclusiveLock, 973 0, 974 RangeVarCallbackForRenameRule, 975 NULL); 976 977 /* Have lock already, so just need to build relcache entry. */ 978 targetrel = relation_open(relid, NoLock); 979 980 /* Prepare to modify pg_rewrite */ 981 pg_rewrite_desc = table_open(RewriteRelationId, RowExclusiveLock); 982 983 /* Fetch the rule's entry (it had better exist) */ 984 ruletup = SearchSysCacheCopy2(RULERELNAME, 985 ObjectIdGetDatum(relid), 986 PointerGetDatum(oldName)); 987 if (!HeapTupleIsValid(ruletup)) 988 ereport(ERROR, 989 (errcode(ERRCODE_UNDEFINED_OBJECT), 990 errmsg("rule \"%s\" for relation \"%s\" does not exist", 991 oldName, RelationGetRelationName(targetrel)))); 992 ruleform = (Form_pg_rewrite) GETSTRUCT(ruletup); 993 ruleOid = ruleform->oid; 994 995 /* rule with the new name should not already exist */ 996 if (IsDefinedRewriteRule(relid, newName)) 997 ereport(ERROR, 998 (errcode(ERRCODE_DUPLICATE_OBJECT), 999 errmsg("rule \"%s\" for relation \"%s\" already exists", 1000 newName, RelationGetRelationName(targetrel)))); 1001 1002 /* 1003 * We disallow renaming ON SELECT rules, because they should always be 1004 * named "_RETURN". 1005 */ 1006 if (ruleform->ev_type == CMD_SELECT + '0') 1007 ereport(ERROR, 1008 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), 1009 errmsg("renaming an ON SELECT rule is not allowed"))); 1010 1011 /* OK, do the update */ 1012 namestrcpy(&(ruleform->rulename), newName); 1013 1014 CatalogTupleUpdate(pg_rewrite_desc, &ruletup->t_self, ruletup); 1015 1016 InvokeObjectPostAlterHook(RewriteRelationId, ruleOid, 0); 1017 1018 heap_freetuple(ruletup); 1019 table_close(pg_rewrite_desc, RowExclusiveLock); 1020 1021 /* 1022 * Invalidate relation's relcache entry so that other backends (and this 1023 * one too!) are sent SI message to make them rebuild relcache entries. 1024 * (Ideally this should happen automatically...) 1025 */ 1026 CacheInvalidateRelcache(targetrel); 1027 1028 ObjectAddressSet(address, RewriteRelationId, ruleOid); 1029 1030 /* 1031 * Close rel, but keep exclusive lock! 1032 */ 1033 relation_close(targetrel, NoLock); 1034 1035 return address; 1036 } 1037