1 /*-------------------------------------------------------------------------
2 *
3 * rewriteDefine.c
4 * routines for defining a rewrite rule
5 *
6 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/rewrite/rewriteDefine.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 #include "postgres.h"
16
17 #include "access/heapam.h"
18 #include "access/htup_details.h"
19 #include "access/multixact.h"
20 #include "access/transam.h"
21 #include "access/xact.h"
22 #include "catalog/catalog.h"
23 #include "catalog/dependency.h"
24 #include "catalog/heap.h"
25 #include "catalog/indexing.h"
26 #include "catalog/namespace.h"
27 #include "catalog/objectaccess.h"
28 #include "catalog/pg_inherits_fn.h"
29 #include "catalog/pg_rewrite.h"
30 #include "catalog/storage.h"
31 #include "commands/policy.h"
32 #include "miscadmin.h"
33 #include "nodes/nodeFuncs.h"
34 #include "parser/parse_utilcmd.h"
35 #include "rewrite/rewriteDefine.h"
36 #include "rewrite/rewriteManip.h"
37 #include "rewrite/rewriteSupport.h"
38 #include "utils/acl.h"
39 #include "utils/builtins.h"
40 #include "utils/inval.h"
41 #include "utils/lsyscache.h"
42 #include "utils/rel.h"
43 #include "utils/snapmgr.h"
44 #include "utils/syscache.h"
45 #include "utils/tqual.h"
46
47
48 static void checkRuleResultList(List *targetList, TupleDesc resultDesc,
49 bool isSelect, bool requireColumnNameMatch);
50 static bool setRuleCheckAsUser_walker(Node *node, Oid *context);
51 static void setRuleCheckAsUser_Query(Query *qry, Oid userid);
52
53
54 /*
55 * InsertRule -
56 * takes the arguments and inserts them as a row into the system
57 * relation "pg_rewrite"
58 */
59 static Oid
InsertRule(char * rulname,int evtype,Oid eventrel_oid,bool evinstead,Node * event_qual,List * action,bool replace)60 InsertRule(char *rulname,
61 int evtype,
62 Oid eventrel_oid,
63 bool evinstead,
64 Node *event_qual,
65 List *action,
66 bool replace)
67 {
68 char *evqual = nodeToString(event_qual);
69 char *actiontree = nodeToString((Node *) action);
70 Datum values[Natts_pg_rewrite];
71 bool nulls[Natts_pg_rewrite];
72 bool replaces[Natts_pg_rewrite];
73 NameData rname;
74 Relation pg_rewrite_desc;
75 HeapTuple tup,
76 oldtup;
77 Oid rewriteObjectId;
78 ObjectAddress myself,
79 referenced;
80 bool is_update = false;
81
82 /*
83 * Set up *nulls and *values arrays
84 */
85 MemSet(nulls, false, sizeof(nulls));
86
87 namestrcpy(&rname, rulname);
88 values[Anum_pg_rewrite_rulename - 1] = NameGetDatum(&rname);
89 values[Anum_pg_rewrite_ev_class - 1] = ObjectIdGetDatum(eventrel_oid);
90 values[Anum_pg_rewrite_ev_type - 1] = CharGetDatum(evtype + '0');
91 values[Anum_pg_rewrite_ev_enabled - 1] = CharGetDatum(RULE_FIRES_ON_ORIGIN);
92 values[Anum_pg_rewrite_is_instead - 1] = BoolGetDatum(evinstead);
93 values[Anum_pg_rewrite_ev_qual - 1] = CStringGetTextDatum(evqual);
94 values[Anum_pg_rewrite_ev_action - 1] = CStringGetTextDatum(actiontree);
95
96 /*
97 * Ready to store new pg_rewrite tuple
98 */
99 pg_rewrite_desc = heap_open(RewriteRelationId, RowExclusiveLock);
100
101 /*
102 * Check to see if we are replacing an existing tuple
103 */
104 oldtup = SearchSysCache2(RULERELNAME,
105 ObjectIdGetDatum(eventrel_oid),
106 PointerGetDatum(rulname));
107
108 if (HeapTupleIsValid(oldtup))
109 {
110 if (!replace)
111 ereport(ERROR,
112 (errcode(ERRCODE_DUPLICATE_OBJECT),
113 errmsg("rule \"%s\" for relation \"%s\" already exists",
114 rulname, get_rel_name(eventrel_oid))));
115
116 /*
117 * When replacing, we don't need to replace every attribute
118 */
119 MemSet(replaces, false, sizeof(replaces));
120 replaces[Anum_pg_rewrite_ev_type - 1] = true;
121 replaces[Anum_pg_rewrite_is_instead - 1] = true;
122 replaces[Anum_pg_rewrite_ev_qual - 1] = true;
123 replaces[Anum_pg_rewrite_ev_action - 1] = true;
124
125 tup = heap_modify_tuple(oldtup, RelationGetDescr(pg_rewrite_desc),
126 values, nulls, replaces);
127
128 CatalogTupleUpdate(pg_rewrite_desc, &tup->t_self, tup);
129
130 ReleaseSysCache(oldtup);
131
132 rewriteObjectId = HeapTupleGetOid(tup);
133 is_update = true;
134 }
135 else
136 {
137 tup = heap_form_tuple(pg_rewrite_desc->rd_att, values, nulls);
138
139 rewriteObjectId = CatalogTupleInsert(pg_rewrite_desc, tup);
140 }
141
142
143 heap_freetuple(tup);
144
145 /* If replacing, get rid of old dependencies and make new ones */
146 if (is_update)
147 deleteDependencyRecordsFor(RewriteRelationId, rewriteObjectId, false);
148
149 /*
150 * Install dependency on rule's relation to ensure it will go away on
151 * relation deletion. If the rule is ON SELECT, make the dependency
152 * implicit --- this prevents deleting a view's SELECT rule. Other kinds
153 * of rules can be AUTO.
154 */
155 myself.classId = RewriteRelationId;
156 myself.objectId = rewriteObjectId;
157 myself.objectSubId = 0;
158
159 referenced.classId = RelationRelationId;
160 referenced.objectId = eventrel_oid;
161 referenced.objectSubId = 0;
162
163 recordDependencyOn(&myself, &referenced,
164 (evtype == CMD_SELECT) ? DEPENDENCY_INTERNAL : DEPENDENCY_AUTO);
165
166 /*
167 * Also install dependencies on objects referenced in action and qual.
168 */
169 recordDependencyOnExpr(&myself, (Node *) action, NIL,
170 DEPENDENCY_NORMAL);
171
172 if (event_qual != NULL)
173 {
174 /* Find query containing OLD/NEW rtable entries */
175 Query *qry = linitial_node(Query, action);
176
177 qry = getInsertSelectQuery(qry, NULL);
178 recordDependencyOnExpr(&myself, event_qual, qry->rtable,
179 DEPENDENCY_NORMAL);
180 }
181
182 /* Post creation hook for new rule */
183 InvokeObjectPostCreateHook(RewriteRelationId, rewriteObjectId, 0);
184
185 heap_close(pg_rewrite_desc, RowExclusiveLock);
186
187 return rewriteObjectId;
188 }
189
190 /*
191 * DefineRule
192 * Execute a CREATE RULE command.
193 */
194 ObjectAddress
DefineRule(RuleStmt * stmt,const char * queryString)195 DefineRule(RuleStmt *stmt, const char *queryString)
196 {
197 List *actions;
198 Node *whereClause;
199 Oid relId;
200
201 /* Parse analysis. */
202 transformRuleStmt(stmt, queryString, &actions, &whereClause);
203
204 /*
205 * Find and lock the relation. Lock level should match
206 * DefineQueryRewrite.
207 */
208 relId = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, false);
209
210 /* ... and execute */
211 return DefineQueryRewrite(stmt->rulename,
212 relId,
213 whereClause,
214 stmt->event,
215 stmt->instead,
216 stmt->replace,
217 actions);
218 }
219
220
221 /*
222 * DefineQueryRewrite
223 * Create a rule
224 *
225 * This is essentially the same as DefineRule() except that the rule's
226 * action and qual have already been passed through parse analysis.
227 */
228 ObjectAddress
DefineQueryRewrite(char * rulename,Oid event_relid,Node * event_qual,CmdType event_type,bool is_instead,bool replace,List * action)229 DefineQueryRewrite(char *rulename,
230 Oid event_relid,
231 Node *event_qual,
232 CmdType event_type,
233 bool is_instead,
234 bool replace,
235 List *action)
236 {
237 Relation event_relation;
238 ListCell *l;
239 Query *query;
240 bool RelisBecomingView = false;
241 Oid ruleId = InvalidOid;
242 ObjectAddress address;
243
244 /*
245 * If we are installing an ON SELECT rule, we had better grab
246 * AccessExclusiveLock to ensure no SELECTs are currently running on the
247 * event relation. For other types of rules, it would be sufficient to
248 * grab ShareRowExclusiveLock to lock out insert/update/delete actions and
249 * to ensure that we lock out current CREATE RULE statements; but because
250 * of race conditions in access to catalog entries, we can't do that yet.
251 *
252 * Note that this lock level should match the one used in DefineRule.
253 */
254 event_relation = heap_open(event_relid, AccessExclusiveLock);
255
256 /*
257 * Verify relation is of a type that rules can sensibly be applied to.
258 * Internal callers can target materialized views, but transformRuleStmt()
259 * blocks them for users. Don't mention them in the error message.
260 */
261 if (event_relation->rd_rel->relkind != RELKIND_RELATION &&
262 event_relation->rd_rel->relkind != RELKIND_MATVIEW &&
263 event_relation->rd_rel->relkind != RELKIND_VIEW &&
264 event_relation->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
265 ereport(ERROR,
266 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
267 errmsg("\"%s\" is not a table or view",
268 RelationGetRelationName(event_relation))));
269
270 if (!allowSystemTableMods && IsSystemRelation(event_relation))
271 ereport(ERROR,
272 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
273 errmsg("permission denied: \"%s\" is a system catalog",
274 RelationGetRelationName(event_relation))));
275
276 /*
277 * Check user has permission to apply rules to this relation.
278 */
279 if (!pg_class_ownercheck(event_relid, GetUserId()))
280 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
281 RelationGetRelationName(event_relation));
282
283 /*
284 * No rule actions that modify OLD or NEW
285 */
286 foreach(l, action)
287 {
288 query = lfirst_node(Query, l);
289 if (query->resultRelation == 0)
290 continue;
291 /* Don't be fooled by INSERT/SELECT */
292 if (query != getInsertSelectQuery(query, NULL))
293 continue;
294 if (query->resultRelation == PRS2_OLD_VARNO)
295 ereport(ERROR,
296 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
297 errmsg("rule actions on OLD are not implemented"),
298 errhint("Use views or triggers instead.")));
299 if (query->resultRelation == PRS2_NEW_VARNO)
300 ereport(ERROR,
301 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
302 errmsg("rule actions on NEW are not implemented"),
303 errhint("Use triggers instead.")));
304 }
305
306 if (event_type == CMD_SELECT)
307 {
308 /*
309 * Rules ON SELECT are restricted to view definitions
310 *
311 * So there cannot be INSTEAD NOTHING, ...
312 */
313 if (list_length(action) == 0)
314 ereport(ERROR,
315 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
316 errmsg("INSTEAD NOTHING rules on SELECT are not implemented"),
317 errhint("Use views instead.")));
318
319 /*
320 * ... there cannot be multiple actions, ...
321 */
322 if (list_length(action) > 1)
323 ereport(ERROR,
324 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
325 errmsg("multiple actions for rules on SELECT are not implemented")));
326
327 /*
328 * ... the one action must be a SELECT, ...
329 */
330 query = linitial_node(Query, action);
331 if (!is_instead ||
332 query->commandType != CMD_SELECT)
333 ereport(ERROR,
334 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
335 errmsg("rules on SELECT must have action INSTEAD SELECT")));
336
337 /*
338 * ... it cannot contain data-modifying WITH ...
339 */
340 if (query->hasModifyingCTE)
341 ereport(ERROR,
342 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
343 errmsg("rules on SELECT must not contain data-modifying statements in WITH")));
344
345 /*
346 * ... there can be no rule qual, ...
347 */
348 if (event_qual != NULL)
349 ereport(ERROR,
350 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
351 errmsg("event qualifications are not implemented for rules on SELECT")));
352
353 /*
354 * ... the targetlist of the SELECT action must exactly match the
355 * event relation, ...
356 */
357 checkRuleResultList(query->targetList,
358 RelationGetDescr(event_relation),
359 true,
360 event_relation->rd_rel->relkind !=
361 RELKIND_MATVIEW);
362
363 /*
364 * ... there must not be another ON SELECT rule already ...
365 */
366 if (!replace && event_relation->rd_rules != NULL)
367 {
368 int i;
369
370 for (i = 0; i < event_relation->rd_rules->numLocks; i++)
371 {
372 RewriteRule *rule;
373
374 rule = event_relation->rd_rules->rules[i];
375 if (rule->event == CMD_SELECT)
376 ereport(ERROR,
377 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
378 errmsg("\"%s\" is already a view",
379 RelationGetRelationName(event_relation))));
380 }
381 }
382
383 /*
384 * ... and finally the rule must be named _RETURN.
385 */
386 if (strcmp(rulename, ViewSelectRuleName) != 0)
387 {
388 /*
389 * In versions before 7.3, the expected name was _RETviewname. For
390 * backwards compatibility with old pg_dump output, accept that
391 * and silently change it to _RETURN. Since this is just a quick
392 * backwards-compatibility hack, limit the number of characters
393 * checked to a few less than NAMEDATALEN; this saves having to
394 * worry about where a multibyte character might have gotten
395 * truncated.
396 */
397 if (strncmp(rulename, "_RET", 4) != 0 ||
398 strncmp(rulename + 4, RelationGetRelationName(event_relation),
399 NAMEDATALEN - 4 - 4) != 0)
400 ereport(ERROR,
401 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
402 errmsg("view rule for \"%s\" must be named \"%s\"",
403 RelationGetRelationName(event_relation),
404 ViewSelectRuleName)));
405 rulename = pstrdup(ViewSelectRuleName);
406 }
407
408 /*
409 * Are we converting a relation to a view?
410 *
411 * If so, check that the relation is empty because the storage for the
412 * relation is going to be deleted. Also insist that the rel not be
413 * involved in partitioning, nor have any triggers, indexes, child or
414 * parent tables, RLS policies, or RLS enabled. (Note: some of these
415 * tests are too strict, because they will reject relations that once
416 * had such but don't anymore. But we don't really care, because this
417 * whole business of converting relations to views is just an obsolete
418 * kluge to allow dump/reload of views that participate in circular
419 * dependencies.)
420 */
421 if (event_relation->rd_rel->relkind != RELKIND_VIEW &&
422 event_relation->rd_rel->relkind != RELKIND_MATVIEW)
423 {
424 HeapScanDesc scanDesc;
425 Snapshot snapshot;
426
427 if (event_relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
428 ereport(ERROR,
429 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
430 errmsg("cannot convert partitioned table \"%s\" to a view",
431 RelationGetRelationName(event_relation))));
432
433 /* only case left: */
434 Assert(event_relation->rd_rel->relkind == RELKIND_RELATION);
435
436 if (event_relation->rd_rel->relispartition)
437 ereport(ERROR,
438 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
439 errmsg("cannot convert partition \"%s\" to a view",
440 RelationGetRelationName(event_relation))));
441
442 snapshot = RegisterSnapshot(GetLatestSnapshot());
443 scanDesc = heap_beginscan(event_relation, snapshot, 0, NULL);
444 if (heap_getnext(scanDesc, ForwardScanDirection) != NULL)
445 ereport(ERROR,
446 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
447 errmsg("could not convert table \"%s\" to a view because it is not empty",
448 RelationGetRelationName(event_relation))));
449 heap_endscan(scanDesc);
450 UnregisterSnapshot(snapshot);
451
452 if (event_relation->rd_rel->relhastriggers)
453 ereport(ERROR,
454 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
455 errmsg("could not convert table \"%s\" to a view because it has triggers",
456 RelationGetRelationName(event_relation)),
457 errhint("In particular, the table cannot be involved in any foreign key relationships.")));
458
459 if (event_relation->rd_rel->relhasindex)
460 ereport(ERROR,
461 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
462 errmsg("could not convert table \"%s\" to a view because it has indexes",
463 RelationGetRelationName(event_relation))));
464
465 if (event_relation->rd_rel->relhassubclass)
466 ereport(ERROR,
467 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
468 errmsg("could not convert table \"%s\" to a view because it has child tables",
469 RelationGetRelationName(event_relation))));
470
471 if (has_superclass(RelationGetRelid(event_relation)))
472 ereport(ERROR,
473 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
474 errmsg("could not convert table \"%s\" to a view because it has parent tables",
475 RelationGetRelationName(event_relation))));
476
477 if (event_relation->rd_rel->relrowsecurity)
478 ereport(ERROR,
479 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
480 errmsg("could not convert table \"%s\" to a view because it has row security enabled",
481 RelationGetRelationName(event_relation))));
482
483 if (relation_has_policies(event_relation))
484 ereport(ERROR,
485 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
486 errmsg("could not convert table \"%s\" to a view because it has row security policies",
487 RelationGetRelationName(event_relation))));
488
489 RelisBecomingView = true;
490 }
491 }
492 else
493 {
494 /*
495 * For non-SELECT rules, a RETURNING list can appear in at most one of
496 * the actions ... and there can't be any RETURNING list at all in a
497 * conditional or non-INSTEAD rule. (Actually, there can be at most
498 * one RETURNING list across all rules on the same event, but it seems
499 * best to enforce that at rule expansion time.) If there is a
500 * RETURNING list, it must match the event relation.
501 */
502 bool haveReturning = false;
503
504 foreach(l, action)
505 {
506 query = lfirst_node(Query, l);
507
508 if (!query->returningList)
509 continue;
510 if (haveReturning)
511 ereport(ERROR,
512 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
513 errmsg("cannot have multiple RETURNING lists in a rule")));
514 haveReturning = true;
515 if (event_qual != NULL)
516 ereport(ERROR,
517 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
518 errmsg("RETURNING lists are not supported in conditional rules")));
519 if (!is_instead)
520 ereport(ERROR,
521 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
522 errmsg("RETURNING lists are not supported in non-INSTEAD rules")));
523 checkRuleResultList(query->returningList,
524 RelationGetDescr(event_relation),
525 false, false);
526 }
527 }
528
529 /*
530 * This rule is allowed - prepare to install it.
531 */
532
533 /* discard rule if it's null action and not INSTEAD; it's a no-op */
534 if (action != NIL || is_instead)
535 {
536 ruleId = InsertRule(rulename,
537 event_type,
538 event_relid,
539 is_instead,
540 event_qual,
541 action,
542 replace);
543
544 /*
545 * Set pg_class 'relhasrules' field TRUE for event relation.
546 *
547 * Important side effect: an SI notice is broadcast to force all
548 * backends (including me!) to update relcache entries with the new
549 * rule.
550 */
551 SetRelationRuleStatus(event_relid, true);
552 }
553
554 /* ---------------------------------------------------------------------
555 * If the relation is becoming a view:
556 * - delete the associated storage files
557 * - get rid of any system attributes in pg_attribute; a view shouldn't
558 * have any of those
559 * - remove the toast table; there is no need for it anymore, and its
560 * presence would make vacuum slightly more complicated
561 * - set relkind to RELKIND_VIEW, and adjust other pg_class fields
562 * to be appropriate for a view
563 *
564 * NB: we had better have AccessExclusiveLock to do this ...
565 * ---------------------------------------------------------------------
566 */
567 if (RelisBecomingView)
568 {
569 Relation relationRelation;
570 Oid toastrelid;
571 HeapTuple classTup;
572 Form_pg_class classForm;
573
574 relationRelation = heap_open(RelationRelationId, RowExclusiveLock);
575 toastrelid = event_relation->rd_rel->reltoastrelid;
576
577 /* drop storage while table still looks like a table */
578 RelationDropStorage(event_relation);
579 DeleteSystemAttributeTuples(event_relid);
580
581 /*
582 * Drop the toast table if any. (This won't take care of updating the
583 * toast fields in the relation's own pg_class entry; we handle that
584 * below.)
585 */
586 if (OidIsValid(toastrelid))
587 {
588 ObjectAddress toastobject;
589
590 /*
591 * Delete the dependency of the toast relation on the main
592 * relation so we can drop the former without dropping the latter.
593 */
594 deleteDependencyRecordsFor(RelationRelationId, toastrelid,
595 false);
596
597 /* Make deletion of dependency record visible */
598 CommandCounterIncrement();
599
600 /* Now drop toast table, including its index */
601 toastobject.classId = RelationRelationId;
602 toastobject.objectId = toastrelid;
603 toastobject.objectSubId = 0;
604 performDeletion(&toastobject, DROP_RESTRICT,
605 PERFORM_DELETION_INTERNAL);
606 }
607
608 /*
609 * SetRelationRuleStatus may have updated the pg_class row, so we must
610 * advance the command counter before trying to update it again.
611 */
612 CommandCounterIncrement();
613
614 /*
615 * Fix pg_class entry to look like a normal view's, including setting
616 * the correct relkind and removal of reltoastrelid of the toast table
617 * we potentially removed above.
618 */
619 classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(event_relid));
620 if (!HeapTupleIsValid(classTup))
621 elog(ERROR, "cache lookup failed for relation %u", event_relid);
622 classForm = (Form_pg_class) GETSTRUCT(classTup);
623
624 classForm->reltablespace = InvalidOid;
625 classForm->relpages = 0;
626 classForm->reltuples = 0;
627 classForm->relallvisible = 0;
628 classForm->reltoastrelid = InvalidOid;
629 classForm->relhasindex = false;
630 classForm->relkind = RELKIND_VIEW;
631 classForm->relhasoids = false;
632 classForm->relhaspkey = false;
633 classForm->relfrozenxid = InvalidTransactionId;
634 classForm->relminmxid = InvalidMultiXactId;
635 classForm->relreplident = REPLICA_IDENTITY_NOTHING;
636
637 CatalogTupleUpdate(relationRelation, &classTup->t_self, classTup);
638
639 heap_freetuple(classTup);
640 heap_close(relationRelation, RowExclusiveLock);
641 }
642
643 ObjectAddressSet(address, RewriteRelationId, ruleId);
644
645 /* Close rel, but keep lock till commit... */
646 heap_close(event_relation, NoLock);
647
648 return address;
649 }
650
651 /*
652 * checkRuleResultList
653 * Verify that targetList produces output compatible with a tupledesc
654 *
655 * The targetList might be either a SELECT targetlist, or a RETURNING list;
656 * isSelect tells which. This is used for choosing error messages.
657 *
658 * A SELECT targetlist may optionally require that column names match.
659 */
660 static void
checkRuleResultList(List * targetList,TupleDesc resultDesc,bool isSelect,bool requireColumnNameMatch)661 checkRuleResultList(List *targetList, TupleDesc resultDesc, bool isSelect,
662 bool requireColumnNameMatch)
663 {
664 ListCell *tllist;
665 int i;
666
667 /* Only a SELECT may require a column name match. */
668 Assert(isSelect || !requireColumnNameMatch);
669
670 i = 0;
671 foreach(tllist, targetList)
672 {
673 TargetEntry *tle = (TargetEntry *) lfirst(tllist);
674 Oid tletypid;
675 int32 tletypmod;
676 Form_pg_attribute attr;
677 char *attname;
678
679 /* resjunk entries may be ignored */
680 if (tle->resjunk)
681 continue;
682 i++;
683 if (i > resultDesc->natts)
684 ereport(ERROR,
685 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
686 isSelect ?
687 errmsg("SELECT rule's target list has too many entries") :
688 errmsg("RETURNING list has too many entries")));
689
690 attr = resultDesc->attrs[i - 1];
691 attname = NameStr(attr->attname);
692
693 /*
694 * Disallow dropped columns in the relation. This is not really
695 * expected to happen when creating an ON SELECT rule. It'd be
696 * possible if someone tried to convert a relation with dropped
697 * columns to a view, but the only case we care about supporting
698 * table-to-view conversion for is pg_dump, and pg_dump won't do that.
699 *
700 * Unfortunately, the situation is also possible when adding a rule
701 * with RETURNING to a regular table, and rejecting that case is
702 * altogether more annoying. In principle we could support it by
703 * modifying the targetlist to include dummy NULL columns
704 * corresponding to the dropped columns in the tupdesc. However,
705 * places like ruleutils.c would have to be fixed to not process such
706 * entries, and that would take an uncertain and possibly rather large
707 * amount of work. (Note we could not dodge that by marking the dummy
708 * columns resjunk, since it's precisely the non-resjunk tlist columns
709 * that are expected to correspond to table columns.)
710 */
711 if (attr->attisdropped)
712 ereport(ERROR,
713 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
714 isSelect ?
715 errmsg("cannot convert relation containing dropped columns to view") :
716 errmsg("cannot create a RETURNING list for a relation containing dropped columns")));
717
718 /* Check name match if required; no need for two error texts here */
719 if (requireColumnNameMatch && strcmp(tle->resname, attname) != 0)
720 ereport(ERROR,
721 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
722 errmsg("SELECT rule's target entry %d has different column name from column \"%s\"",
723 i, attname),
724 errdetail("SELECT target entry is named \"%s\".",
725 tle->resname)));
726
727 /* Check type match. */
728 tletypid = exprType((Node *) tle->expr);
729 if (attr->atttypid != tletypid)
730 ereport(ERROR,
731 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
732 isSelect ?
733 errmsg("SELECT rule's target entry %d has different type from column \"%s\"",
734 i, attname) :
735 errmsg("RETURNING list's entry %d has different type from column \"%s\"",
736 i, attname),
737 isSelect ?
738 errdetail("SELECT target entry has type %s, but column has type %s.",
739 format_type_be(tletypid),
740 format_type_be(attr->atttypid)) :
741 errdetail("RETURNING list entry has type %s, but column has type %s.",
742 format_type_be(tletypid),
743 format_type_be(attr->atttypid))));
744
745 /*
746 * Allow typmods to be different only if one of them is -1, ie,
747 * "unspecified". This is necessary for cases like "numeric", where
748 * the table will have a filled-in default length but the select
749 * rule's expression will probably have typmod = -1.
750 */
751 tletypmod = exprTypmod((Node *) tle->expr);
752 if (attr->atttypmod != tletypmod &&
753 attr->atttypmod != -1 && tletypmod != -1)
754 ereport(ERROR,
755 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
756 isSelect ?
757 errmsg("SELECT rule's target entry %d has different size from column \"%s\"",
758 i, attname) :
759 errmsg("RETURNING list's entry %d has different size from column \"%s\"",
760 i, attname),
761 isSelect ?
762 errdetail("SELECT target entry has type %s, but column has type %s.",
763 format_type_with_typemod(tletypid, tletypmod),
764 format_type_with_typemod(attr->atttypid,
765 attr->atttypmod)) :
766 errdetail("RETURNING list entry has type %s, but column has type %s.",
767 format_type_with_typemod(tletypid, tletypmod),
768 format_type_with_typemod(attr->atttypid,
769 attr->atttypmod))));
770 }
771
772 if (i != resultDesc->natts)
773 ereport(ERROR,
774 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
775 isSelect ?
776 errmsg("SELECT rule's target list has too few entries") :
777 errmsg("RETURNING list has too few entries")));
778 }
779
780 /*
781 * setRuleCheckAsUser
782 * Recursively scan a query or expression tree and set the checkAsUser
783 * field to the given userid in all rtable entries.
784 *
785 * Note: for a view (ON SELECT rule), the checkAsUser field of the OLD
786 * RTE entry will be overridden when the view rule is expanded, and the
787 * checkAsUser field of the NEW entry is irrelevant because that entry's
788 * requiredPerms bits will always be zero. However, for other types of rules
789 * it's important to set these fields to match the rule owner. So we just set
790 * them always.
791 */
792 void
setRuleCheckAsUser(Node * node,Oid userid)793 setRuleCheckAsUser(Node *node, Oid userid)
794 {
795 (void) setRuleCheckAsUser_walker(node, &userid);
796 }
797
798 static bool
setRuleCheckAsUser_walker(Node * node,Oid * context)799 setRuleCheckAsUser_walker(Node *node, Oid *context)
800 {
801 if (node == NULL)
802 return false;
803 if (IsA(node, Query))
804 {
805 setRuleCheckAsUser_Query((Query *) node, *context);
806 return false;
807 }
808 return expression_tree_walker(node, setRuleCheckAsUser_walker,
809 (void *) context);
810 }
811
812 static void
setRuleCheckAsUser_Query(Query * qry,Oid userid)813 setRuleCheckAsUser_Query(Query *qry, Oid userid)
814 {
815 ListCell *l;
816
817 /* Set all the RTEs in this query node */
818 foreach(l, qry->rtable)
819 {
820 RangeTblEntry *rte = (RangeTblEntry *) lfirst(l);
821
822 if (rte->rtekind == RTE_SUBQUERY)
823 {
824 /* Recurse into subquery in FROM */
825 setRuleCheckAsUser_Query(rte->subquery, userid);
826 }
827 else
828 rte->checkAsUser = userid;
829 }
830
831 /* Recurse into subquery-in-WITH */
832 foreach(l, qry->cteList)
833 {
834 CommonTableExpr *cte = (CommonTableExpr *) lfirst(l);
835
836 setRuleCheckAsUser_Query(castNode(Query, cte->ctequery), userid);
837 }
838
839 /* If there are sublinks, search for them and process their RTEs */
840 if (qry->hasSubLinks)
841 query_tree_walker(qry, setRuleCheckAsUser_walker, (void *) &userid,
842 QTW_IGNORE_RC_SUBQUERIES);
843 }
844
845
846 /*
847 * Change the firing semantics of an existing rule.
848 */
849 void
EnableDisableRule(Relation rel,const char * rulename,char fires_when)850 EnableDisableRule(Relation rel, const char *rulename,
851 char fires_when)
852 {
853 Relation pg_rewrite_desc;
854 Oid owningRel = RelationGetRelid(rel);
855 Oid eventRelationOid;
856 HeapTuple ruletup;
857 bool changed = false;
858
859 /*
860 * Find the rule tuple to change.
861 */
862 pg_rewrite_desc = heap_open(RewriteRelationId, RowExclusiveLock);
863 ruletup = SearchSysCacheCopy2(RULERELNAME,
864 ObjectIdGetDatum(owningRel),
865 PointerGetDatum(rulename));
866 if (!HeapTupleIsValid(ruletup))
867 ereport(ERROR,
868 (errcode(ERRCODE_UNDEFINED_OBJECT),
869 errmsg("rule \"%s\" for relation \"%s\" does not exist",
870 rulename, get_rel_name(owningRel))));
871
872 /*
873 * Verify that the user has appropriate permissions.
874 */
875 eventRelationOid = ((Form_pg_rewrite) GETSTRUCT(ruletup))->ev_class;
876 Assert(eventRelationOid == owningRel);
877 if (!pg_class_ownercheck(eventRelationOid, GetUserId()))
878 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
879 get_rel_name(eventRelationOid));
880
881 /*
882 * Change ev_enabled if it is different from the desired new state.
883 */
884 if (DatumGetChar(((Form_pg_rewrite) GETSTRUCT(ruletup))->ev_enabled) !=
885 fires_when)
886 {
887 ((Form_pg_rewrite) GETSTRUCT(ruletup))->ev_enabled =
888 CharGetDatum(fires_when);
889 CatalogTupleUpdate(pg_rewrite_desc, &ruletup->t_self, ruletup);
890
891 changed = true;
892 }
893
894 InvokeObjectPostAlterHook(RewriteRelationId,
895 HeapTupleGetOid(ruletup), 0);
896
897 heap_freetuple(ruletup);
898 heap_close(pg_rewrite_desc, RowExclusiveLock);
899
900 /*
901 * If we changed anything, broadcast a SI inval message to force each
902 * backend (including our own!) to rebuild relation's relcache entry.
903 * Otherwise they will fail to apply the change promptly.
904 */
905 if (changed)
906 CacheInvalidateRelcache(rel);
907 }
908
909
910 /*
911 * Perform permissions and integrity checks before acquiring a relation lock.
912 */
913 static void
RangeVarCallbackForRenameRule(const RangeVar * rv,Oid relid,Oid oldrelid,void * arg)914 RangeVarCallbackForRenameRule(const RangeVar *rv, Oid relid, Oid oldrelid,
915 void *arg)
916 {
917 HeapTuple tuple;
918 Form_pg_class form;
919
920 tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
921 if (!HeapTupleIsValid(tuple))
922 return; /* concurrently dropped */
923 form = (Form_pg_class) GETSTRUCT(tuple);
924
925 /* only tables and views can have rules */
926 if (form->relkind != RELKIND_RELATION &&
927 form->relkind != RELKIND_VIEW &&
928 form->relkind != RELKIND_PARTITIONED_TABLE)
929 ereport(ERROR,
930 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
931 errmsg("\"%s\" is not a table or view", rv->relname)));
932
933 if (!allowSystemTableMods && IsSystemClass(relid, form))
934 ereport(ERROR,
935 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
936 errmsg("permission denied: \"%s\" is a system catalog",
937 rv->relname)));
938
939 /* you must own the table to rename one of its rules */
940 if (!pg_class_ownercheck(relid, GetUserId()))
941 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, rv->relname);
942
943 ReleaseSysCache(tuple);
944 }
945
946 /*
947 * Rename an existing rewrite rule.
948 */
949 ObjectAddress
RenameRewriteRule(RangeVar * relation,const char * oldName,const char * newName)950 RenameRewriteRule(RangeVar *relation, const char *oldName,
951 const char *newName)
952 {
953 Oid relid;
954 Relation targetrel;
955 Relation pg_rewrite_desc;
956 HeapTuple ruletup;
957 Form_pg_rewrite ruleform;
958 Oid ruleOid;
959 ObjectAddress address;
960
961 /*
962 * Look up name, check permissions, and acquire lock (which we will NOT
963 * release until end of transaction).
964 */
965 relid = RangeVarGetRelidExtended(relation, AccessExclusiveLock,
966 false, false,
967 RangeVarCallbackForRenameRule,
968 NULL);
969
970 /* Have lock already, so just need to build relcache entry. */
971 targetrel = relation_open(relid, NoLock);
972
973 /* Prepare to modify pg_rewrite */
974 pg_rewrite_desc = heap_open(RewriteRelationId, RowExclusiveLock);
975
976 /* Fetch the rule's entry (it had better exist) */
977 ruletup = SearchSysCacheCopy2(RULERELNAME,
978 ObjectIdGetDatum(relid),
979 PointerGetDatum(oldName));
980 if (!HeapTupleIsValid(ruletup))
981 ereport(ERROR,
982 (errcode(ERRCODE_UNDEFINED_OBJECT),
983 errmsg("rule \"%s\" for relation \"%s\" does not exist",
984 oldName, RelationGetRelationName(targetrel))));
985 ruleform = (Form_pg_rewrite) GETSTRUCT(ruletup);
986 ruleOid = HeapTupleGetOid(ruletup);
987
988 /* rule with the new name should not already exist */
989 if (IsDefinedRewriteRule(relid, newName))
990 ereport(ERROR,
991 (errcode(ERRCODE_DUPLICATE_OBJECT),
992 errmsg("rule \"%s\" for relation \"%s\" already exists",
993 newName, RelationGetRelationName(targetrel))));
994
995 /*
996 * We disallow renaming ON SELECT rules, because they should always be
997 * named "_RETURN".
998 */
999 if (ruleform->ev_type == CMD_SELECT + '0')
1000 ereport(ERROR,
1001 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1002 errmsg("renaming an ON SELECT rule is not allowed")));
1003
1004 /* OK, do the update */
1005 namestrcpy(&(ruleform->rulename), newName);
1006
1007 CatalogTupleUpdate(pg_rewrite_desc, &ruletup->t_self, ruletup);
1008
1009 heap_freetuple(ruletup);
1010 heap_close(pg_rewrite_desc, RowExclusiveLock);
1011
1012 /*
1013 * Invalidate relation's relcache entry so that other backends (and this
1014 * one too!) are sent SI message to make them rebuild relcache entries.
1015 * (Ideally this should happen automatically...)
1016 */
1017 CacheInvalidateRelcache(targetrel);
1018
1019 ObjectAddressSet(address, RewriteRelationId, ruleOid);
1020
1021 /*
1022 * Close rel, but keep exclusive lock!
1023 */
1024 relation_close(targetrel, NoLock);
1025
1026 return address;
1027 }
1028