1 /*-------------------------------------------------------------------------
2  *
3  * rewriteDefine.c
4  *	  routines for defining a rewrite rule
5  *
6  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/rewrite/rewriteDefine.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/heapam.h"
18 #include "access/htup_details.h"
19 #include "access/multixact.h"
20 #include "access/tableam.h"
21 #include "access/transam.h"
22 #include "access/xact.h"
23 #include "catalog/catalog.h"
24 #include "catalog/dependency.h"
25 #include "catalog/heap.h"
26 #include "catalog/namespace.h"
27 #include "catalog/objectaccess.h"
28 #include "catalog/pg_inherits.h"
29 #include "catalog/pg_rewrite.h"
30 #include "catalog/storage.h"
31 #include "commands/policy.h"
32 #include "miscadmin.h"
33 #include "nodes/nodeFuncs.h"
34 #include "parser/parse_utilcmd.h"
35 #include "rewrite/rewriteDefine.h"
36 #include "rewrite/rewriteManip.h"
37 #include "rewrite/rewriteSupport.h"
38 #include "utils/acl.h"
39 #include "utils/builtins.h"
40 #include "utils/inval.h"
41 #include "utils/lsyscache.h"
42 #include "utils/rel.h"
43 #include "utils/snapmgr.h"
44 #include "utils/syscache.h"
45 
46 
47 static void checkRuleResultList(List *targetList, TupleDesc resultDesc,
48 								bool isSelect, bool requireColumnNameMatch);
49 static bool setRuleCheckAsUser_walker(Node *node, Oid *context);
50 static void setRuleCheckAsUser_Query(Query *qry, Oid userid);
51 
52 
53 /*
54  * InsertRule -
55  *	  takes the arguments and inserts them as a row into the system
56  *	  relation "pg_rewrite"
57  */
58 static Oid
59 InsertRule(const char *rulname,
60 		   int evtype,
61 		   Oid eventrel_oid,
62 		   bool evinstead,
63 		   Node *event_qual,
64 		   List *action,
65 		   bool replace)
66 {
67 	char	   *evqual = nodeToString(event_qual);
68 	char	   *actiontree = nodeToString((Node *) action);
69 	Datum		values[Natts_pg_rewrite];
70 	bool		nulls[Natts_pg_rewrite];
71 	bool		replaces[Natts_pg_rewrite];
72 	NameData	rname;
73 	Relation	pg_rewrite_desc;
74 	HeapTuple	tup,
75 				oldtup;
76 	Oid			rewriteObjectId;
77 	ObjectAddress myself,
78 				referenced;
79 	bool		is_update = false;
80 
81 	/*
82 	 * Set up *nulls and *values arrays
83 	 */
84 	MemSet(nulls, false, sizeof(nulls));
85 
86 	namestrcpy(&rname, rulname);
87 	values[Anum_pg_rewrite_rulename - 1] = NameGetDatum(&rname);
88 	values[Anum_pg_rewrite_ev_class - 1] = ObjectIdGetDatum(eventrel_oid);
89 	values[Anum_pg_rewrite_ev_type - 1] = CharGetDatum(evtype + '0');
90 	values[Anum_pg_rewrite_ev_enabled - 1] = CharGetDatum(RULE_FIRES_ON_ORIGIN);
91 	values[Anum_pg_rewrite_is_instead - 1] = BoolGetDatum(evinstead);
92 	values[Anum_pg_rewrite_ev_qual - 1] = CStringGetTextDatum(evqual);
93 	values[Anum_pg_rewrite_ev_action - 1] = CStringGetTextDatum(actiontree);
94 
95 	/*
96 	 * Ready to store new pg_rewrite tuple
97 	 */
98 	pg_rewrite_desc = table_open(RewriteRelationId, RowExclusiveLock);
99 
100 	/*
101 	 * Check to see if we are replacing an existing tuple
102 	 */
103 	oldtup = SearchSysCache2(RULERELNAME,
104 							 ObjectIdGetDatum(eventrel_oid),
105 							 PointerGetDatum(rulname));
106 
107 	if (HeapTupleIsValid(oldtup))
108 	{
109 		if (!replace)
110 			ereport(ERROR,
111 					(errcode(ERRCODE_DUPLICATE_OBJECT),
112 					 errmsg("rule \"%s\" for relation \"%s\" already exists",
113 							rulname, get_rel_name(eventrel_oid))));
114 
115 		/*
116 		 * When replacing, we don't need to replace every attribute
117 		 */
118 		MemSet(replaces, false, sizeof(replaces));
119 		replaces[Anum_pg_rewrite_ev_type - 1] = true;
120 		replaces[Anum_pg_rewrite_is_instead - 1] = true;
121 		replaces[Anum_pg_rewrite_ev_qual - 1] = true;
122 		replaces[Anum_pg_rewrite_ev_action - 1] = true;
123 
124 		tup = heap_modify_tuple(oldtup, RelationGetDescr(pg_rewrite_desc),
125 								values, nulls, replaces);
126 
127 		CatalogTupleUpdate(pg_rewrite_desc, &tup->t_self, tup);
128 
129 		ReleaseSysCache(oldtup);
130 
131 		rewriteObjectId = ((Form_pg_rewrite) GETSTRUCT(tup))->oid;
132 		is_update = true;
133 	}
134 	else
135 	{
136 		rewriteObjectId = GetNewOidWithIndex(pg_rewrite_desc,
137 											 RewriteOidIndexId,
138 											 Anum_pg_rewrite_oid);
139 		values[Anum_pg_rewrite_oid - 1] = ObjectIdGetDatum(rewriteObjectId);
140 
141 		tup = heap_form_tuple(pg_rewrite_desc->rd_att, values, nulls);
142 
143 		CatalogTupleInsert(pg_rewrite_desc, tup);
144 	}
145 
146 
147 	heap_freetuple(tup);
148 
149 	/* If replacing, get rid of old dependencies and make new ones */
150 	if (is_update)
151 		deleteDependencyRecordsFor(RewriteRelationId, rewriteObjectId, false);
152 
153 	/*
154 	 * Install dependency on rule's relation to ensure it will go away on
155 	 * relation deletion.  If the rule is ON SELECT, make the dependency
156 	 * implicit --- this prevents deleting a view's SELECT rule.  Other kinds
157 	 * of rules can be AUTO.
158 	 */
159 	myself.classId = RewriteRelationId;
160 	myself.objectId = rewriteObjectId;
161 	myself.objectSubId = 0;
162 
163 	referenced.classId = RelationRelationId;
164 	referenced.objectId = eventrel_oid;
165 	referenced.objectSubId = 0;
166 
167 	recordDependencyOn(&myself, &referenced,
168 					   (evtype == CMD_SELECT) ? DEPENDENCY_INTERNAL : DEPENDENCY_AUTO);
169 
170 	/*
171 	 * Also install dependencies on objects referenced in action and qual.
172 	 */
173 	recordDependencyOnExpr(&myself, (Node *) action, NIL,
174 						   DEPENDENCY_NORMAL);
175 
176 	if (event_qual != NULL)
177 	{
178 		/* Find query containing OLD/NEW rtable entries */
179 		Query	   *qry = linitial_node(Query, action);
180 
181 		qry = getInsertSelectQuery(qry, NULL);
182 		recordDependencyOnExpr(&myself, event_qual, qry->rtable,
183 							   DEPENDENCY_NORMAL);
184 	}
185 
186 	/* Post creation hook for new rule */
187 	InvokeObjectPostCreateHook(RewriteRelationId, rewriteObjectId, 0);
188 
189 	table_close(pg_rewrite_desc, RowExclusiveLock);
190 
191 	return rewriteObjectId;
192 }
193 
194 /*
195  * DefineRule
196  *		Execute a CREATE RULE command.
197  */
198 ObjectAddress
199 DefineRule(RuleStmt *stmt, const char *queryString)
200 {
201 	List	   *actions;
202 	Node	   *whereClause;
203 	Oid			relId;
204 
205 	/* Parse analysis. */
206 	transformRuleStmt(stmt, queryString, &actions, &whereClause);
207 
208 	/*
209 	 * Find and lock the relation.  Lock level should match
210 	 * DefineQueryRewrite.
211 	 */
212 	relId = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, false);
213 
214 	/* ... and execute */
215 	return DefineQueryRewrite(stmt->rulename,
216 							  relId,
217 							  whereClause,
218 							  stmt->event,
219 							  stmt->instead,
220 							  stmt->replace,
221 							  actions);
222 }
223 
224 
225 /*
226  * DefineQueryRewrite
227  *		Create a rule
228  *
229  * This is essentially the same as DefineRule() except that the rule's
230  * action and qual have already been passed through parse analysis.
231  */
232 ObjectAddress
233 DefineQueryRewrite(const char *rulename,
234 				   Oid event_relid,
235 				   Node *event_qual,
236 				   CmdType event_type,
237 				   bool is_instead,
238 				   bool replace,
239 				   List *action)
240 {
241 	Relation	event_relation;
242 	ListCell   *l;
243 	Query	   *query;
244 	bool		RelisBecomingView = false;
245 	Oid			ruleId = InvalidOid;
246 	ObjectAddress address;
247 
248 	/*
249 	 * If we are installing an ON SELECT rule, we had better grab
250 	 * AccessExclusiveLock to ensure no SELECTs are currently running on the
251 	 * event relation. For other types of rules, it would be sufficient to
252 	 * grab ShareRowExclusiveLock to lock out insert/update/delete actions and
253 	 * to ensure that we lock out current CREATE RULE statements; but because
254 	 * of race conditions in access to catalog entries, we can't do that yet.
255 	 *
256 	 * Note that this lock level should match the one used in DefineRule.
257 	 */
258 	event_relation = table_open(event_relid, AccessExclusiveLock);
259 
260 	/*
261 	 * Verify relation is of a type that rules can sensibly be applied to.
262 	 * Internal callers can target materialized views, but transformRuleStmt()
263 	 * blocks them for users.  Don't mention them in the error message.
264 	 */
265 	if (event_relation->rd_rel->relkind != RELKIND_RELATION &&
266 		event_relation->rd_rel->relkind != RELKIND_MATVIEW &&
267 		event_relation->rd_rel->relkind != RELKIND_VIEW &&
268 		event_relation->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
269 		ereport(ERROR,
270 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
271 				 errmsg("\"%s\" is not a table or view",
272 						RelationGetRelationName(event_relation))));
273 
274 	if (!allowSystemTableMods && IsSystemRelation(event_relation))
275 		ereport(ERROR,
276 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
277 				 errmsg("permission denied: \"%s\" is a system catalog",
278 						RelationGetRelationName(event_relation))));
279 
280 	/*
281 	 * Check user has permission to apply rules to this relation.
282 	 */
283 	if (!pg_class_ownercheck(event_relid, GetUserId()))
284 		aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(event_relation->rd_rel->relkind),
285 					   RelationGetRelationName(event_relation));
286 
287 	/*
288 	 * No rule actions that modify OLD or NEW
289 	 */
290 	foreach(l, action)
291 	{
292 		query = lfirst_node(Query, l);
293 		if (query->resultRelation == 0)
294 			continue;
295 		/* Don't be fooled by INSERT/SELECT */
296 		if (query != getInsertSelectQuery(query, NULL))
297 			continue;
298 		if (query->resultRelation == PRS2_OLD_VARNO)
299 			ereport(ERROR,
300 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
301 					 errmsg("rule actions on OLD are not implemented"),
302 					 errhint("Use views or triggers instead.")));
303 		if (query->resultRelation == PRS2_NEW_VARNO)
304 			ereport(ERROR,
305 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
306 					 errmsg("rule actions on NEW are not implemented"),
307 					 errhint("Use triggers instead.")));
308 	}
309 
310 	if (event_type == CMD_SELECT)
311 	{
312 		/*
313 		 * Rules ON SELECT are restricted to view definitions
314 		 *
315 		 * So there cannot be INSTEAD NOTHING, ...
316 		 */
317 		if (list_length(action) == 0)
318 			ereport(ERROR,
319 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
320 					 errmsg("INSTEAD NOTHING rules on SELECT are not implemented"),
321 					 errhint("Use views instead.")));
322 
323 		/*
324 		 * ... there cannot be multiple actions, ...
325 		 */
326 		if (list_length(action) > 1)
327 			ereport(ERROR,
328 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
329 					 errmsg("multiple actions for rules on SELECT are not implemented")));
330 
331 		/*
332 		 * ... the one action must be a SELECT, ...
333 		 */
334 		query = linitial_node(Query, action);
335 		if (!is_instead ||
336 			query->commandType != CMD_SELECT)
337 			ereport(ERROR,
338 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
339 					 errmsg("rules on SELECT must have action INSTEAD SELECT")));
340 
341 		/*
342 		 * ... it cannot contain data-modifying WITH ...
343 		 */
344 		if (query->hasModifyingCTE)
345 			ereport(ERROR,
346 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
347 					 errmsg("rules on SELECT must not contain data-modifying statements in WITH")));
348 
349 		/*
350 		 * ... there can be no rule qual, ...
351 		 */
352 		if (event_qual != NULL)
353 			ereport(ERROR,
354 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
355 					 errmsg("event qualifications are not implemented for rules on SELECT")));
356 
357 		/*
358 		 * ... the targetlist of the SELECT action must exactly match the
359 		 * event relation, ...
360 		 */
361 		checkRuleResultList(query->targetList,
362 							RelationGetDescr(event_relation),
363 							true,
364 							event_relation->rd_rel->relkind !=
365 							RELKIND_MATVIEW);
366 
367 		/*
368 		 * ... there must not be another ON SELECT rule already ...
369 		 */
370 		if (!replace && event_relation->rd_rules != NULL)
371 		{
372 			int			i;
373 
374 			for (i = 0; i < event_relation->rd_rules->numLocks; i++)
375 			{
376 				RewriteRule *rule;
377 
378 				rule = event_relation->rd_rules->rules[i];
379 				if (rule->event == CMD_SELECT)
380 					ereport(ERROR,
381 							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
382 							 errmsg("\"%s\" is already a view",
383 									RelationGetRelationName(event_relation))));
384 			}
385 		}
386 
387 		/*
388 		 * ... and finally the rule must be named _RETURN.
389 		 */
390 		if (strcmp(rulename, ViewSelectRuleName) != 0)
391 		{
392 			/*
393 			 * In versions before 7.3, the expected name was _RETviewname. For
394 			 * backwards compatibility with old pg_dump output, accept that
395 			 * and silently change it to _RETURN.  Since this is just a quick
396 			 * backwards-compatibility hack, limit the number of characters
397 			 * checked to a few less than NAMEDATALEN; this saves having to
398 			 * worry about where a multibyte character might have gotten
399 			 * truncated.
400 			 */
401 			if (strncmp(rulename, "_RET", 4) != 0 ||
402 				strncmp(rulename + 4, RelationGetRelationName(event_relation),
403 						NAMEDATALEN - 4 - 4) != 0)
404 				ereport(ERROR,
405 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
406 						 errmsg("view rule for \"%s\" must be named \"%s\"",
407 								RelationGetRelationName(event_relation),
408 								ViewSelectRuleName)));
409 			rulename = pstrdup(ViewSelectRuleName);
410 		}
411 
412 		/*
413 		 * Are we converting a relation to a view?
414 		 *
415 		 * If so, check that the relation is empty because the storage for the
416 		 * relation is going to be deleted.  Also insist that the rel not be
417 		 * involved in partitioning, nor have any triggers, indexes, child or
418 		 * parent tables, RLS policies, or RLS enabled.  (Note: some of these
419 		 * tests are too strict, because they will reject relations that once
420 		 * had such but don't anymore.  But we don't really care, because this
421 		 * whole business of converting relations to views is just an obsolete
422 		 * kluge to allow dump/reload of views that participate in circular
423 		 * dependencies.)
424 		 */
425 		if (event_relation->rd_rel->relkind != RELKIND_VIEW &&
426 			event_relation->rd_rel->relkind != RELKIND_MATVIEW)
427 		{
428 			TableScanDesc scanDesc;
429 			Snapshot	snapshot;
430 			TupleTableSlot *slot;
431 
432 			if (event_relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
433 				ereport(ERROR,
434 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
435 						 errmsg("cannot convert partitioned table \"%s\" to a view",
436 								RelationGetRelationName(event_relation))));
437 
438 			/* only case left: */
439 			Assert(event_relation->rd_rel->relkind == RELKIND_RELATION);
440 
441 			if (event_relation->rd_rel->relispartition)
442 				ereport(ERROR,
443 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
444 						 errmsg("cannot convert partition \"%s\" to a view",
445 								RelationGetRelationName(event_relation))));
446 
447 			snapshot = RegisterSnapshot(GetLatestSnapshot());
448 			scanDesc = table_beginscan(event_relation, snapshot, 0, NULL);
449 			slot = table_slot_create(event_relation, NULL);
450 			if (table_scan_getnextslot(scanDesc, ForwardScanDirection, slot))
451 				ereport(ERROR,
452 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
453 						 errmsg("could not convert table \"%s\" to a view because it is not empty",
454 								RelationGetRelationName(event_relation))));
455 			ExecDropSingleTupleTableSlot(slot);
456 			table_endscan(scanDesc);
457 			UnregisterSnapshot(snapshot);
458 
459 			if (event_relation->rd_rel->relhastriggers)
460 				ereport(ERROR,
461 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
462 						 errmsg("could not convert table \"%s\" to a view because it has triggers",
463 								RelationGetRelationName(event_relation)),
464 						 errhint("In particular, the table cannot be involved in any foreign key relationships.")));
465 
466 			if (event_relation->rd_rel->relhasindex)
467 				ereport(ERROR,
468 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
469 						 errmsg("could not convert table \"%s\" to a view because it has indexes",
470 								RelationGetRelationName(event_relation))));
471 
472 			if (event_relation->rd_rel->relhassubclass)
473 				ereport(ERROR,
474 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
475 						 errmsg("could not convert table \"%s\" to a view because it has child tables",
476 								RelationGetRelationName(event_relation))));
477 
478 			if (has_superclass(RelationGetRelid(event_relation)))
479 				ereport(ERROR,
480 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
481 						 errmsg("could not convert table \"%s\" to a view because it has parent tables",
482 								RelationGetRelationName(event_relation))));
483 
484 			if (event_relation->rd_rel->relrowsecurity)
485 				ereport(ERROR,
486 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
487 						 errmsg("could not convert table \"%s\" to a view because it has row security enabled",
488 								RelationGetRelationName(event_relation))));
489 
490 			if (relation_has_policies(event_relation))
491 				ereport(ERROR,
492 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
493 						 errmsg("could not convert table \"%s\" to a view because it has row security policies",
494 								RelationGetRelationName(event_relation))));
495 
496 			RelisBecomingView = true;
497 		}
498 	}
499 	else
500 	{
501 		/*
502 		 * For non-SELECT rules, a RETURNING list can appear in at most one of
503 		 * the actions ... and there can't be any RETURNING list at all in a
504 		 * conditional or non-INSTEAD rule.  (Actually, there can be at most
505 		 * one RETURNING list across all rules on the same event, but it seems
506 		 * best to enforce that at rule expansion time.)  If there is a
507 		 * RETURNING list, it must match the event relation.
508 		 */
509 		bool		haveReturning = false;
510 
511 		foreach(l, action)
512 		{
513 			query = lfirst_node(Query, l);
514 
515 			if (!query->returningList)
516 				continue;
517 			if (haveReturning)
518 				ereport(ERROR,
519 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
520 						 errmsg("cannot have multiple RETURNING lists in a rule")));
521 			haveReturning = true;
522 			if (event_qual != NULL)
523 				ereport(ERROR,
524 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
525 						 errmsg("RETURNING lists are not supported in conditional rules")));
526 			if (!is_instead)
527 				ereport(ERROR,
528 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
529 						 errmsg("RETURNING lists are not supported in non-INSTEAD rules")));
530 			checkRuleResultList(query->returningList,
531 								RelationGetDescr(event_relation),
532 								false, false);
533 		}
534 	}
535 
536 	/*
537 	 * This rule is allowed - prepare to install it.
538 	 */
539 
540 	/* discard rule if it's null action and not INSTEAD; it's a no-op */
541 	if (action != NIL || is_instead)
542 	{
543 		ruleId = InsertRule(rulename,
544 							event_type,
545 							event_relid,
546 							is_instead,
547 							event_qual,
548 							action,
549 							replace);
550 
551 		/*
552 		 * Set pg_class 'relhasrules' field true for event relation.
553 		 *
554 		 * Important side effect: an SI notice is broadcast to force all
555 		 * backends (including me!) to update relcache entries with the new
556 		 * rule.
557 		 */
558 		SetRelationRuleStatus(event_relid, true);
559 	}
560 
561 	/* ---------------------------------------------------------------------
562 	 * If the relation is becoming a view:
563 	 * - delete the associated storage files
564 	 * - get rid of any system attributes in pg_attribute; a view shouldn't
565 	 *	 have any of those
566 	 * - remove the toast table; there is no need for it anymore, and its
567 	 *	 presence would make vacuum slightly more complicated
568 	 * - set relkind to RELKIND_VIEW, and adjust other pg_class fields
569 	 *	 to be appropriate for a view
570 	 *
571 	 * NB: we had better have AccessExclusiveLock to do this ...
572 	 * ---------------------------------------------------------------------
573 	 */
574 	if (RelisBecomingView)
575 	{
576 		Relation	relationRelation;
577 		Oid			toastrelid;
578 		HeapTuple	classTup;
579 		Form_pg_class classForm;
580 
581 		relationRelation = table_open(RelationRelationId, RowExclusiveLock);
582 		toastrelid = event_relation->rd_rel->reltoastrelid;
583 
584 		/* drop storage while table still looks like a table  */
585 		RelationDropStorage(event_relation);
586 		DeleteSystemAttributeTuples(event_relid);
587 
588 		/*
589 		 * Drop the toast table if any.  (This won't take care of updating the
590 		 * toast fields in the relation's own pg_class entry; we handle that
591 		 * below.)
592 		 */
593 		if (OidIsValid(toastrelid))
594 		{
595 			ObjectAddress toastobject;
596 
597 			/*
598 			 * Delete the dependency of the toast relation on the main
599 			 * relation so we can drop the former without dropping the latter.
600 			 */
601 			deleteDependencyRecordsFor(RelationRelationId, toastrelid,
602 									   false);
603 
604 			/* Make deletion of dependency record visible */
605 			CommandCounterIncrement();
606 
607 			/* Now drop toast table, including its index */
608 			toastobject.classId = RelationRelationId;
609 			toastobject.objectId = toastrelid;
610 			toastobject.objectSubId = 0;
611 			performDeletion(&toastobject, DROP_RESTRICT,
612 							PERFORM_DELETION_INTERNAL);
613 		}
614 
615 		/*
616 		 * SetRelationRuleStatus may have updated the pg_class row, so we must
617 		 * advance the command counter before trying to update it again.
618 		 */
619 		CommandCounterIncrement();
620 
621 		/*
622 		 * Fix pg_class entry to look like a normal view's, including setting
623 		 * the correct relkind and removal of reltoastrelid of the toast table
624 		 * we potentially removed above.
625 		 */
626 		classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(event_relid));
627 		if (!HeapTupleIsValid(classTup))
628 			elog(ERROR, "cache lookup failed for relation %u", event_relid);
629 		classForm = (Form_pg_class) GETSTRUCT(classTup);
630 
631 		classForm->relam = InvalidOid;
632 		classForm->reltablespace = InvalidOid;
633 		classForm->relpages = 0;
634 		classForm->reltuples = -1;
635 		classForm->relallvisible = 0;
636 		classForm->reltoastrelid = InvalidOid;
637 		classForm->relhasindex = false;
638 		classForm->relkind = RELKIND_VIEW;
639 		classForm->relfrozenxid = InvalidTransactionId;
640 		classForm->relminmxid = InvalidMultiXactId;
641 		classForm->relreplident = REPLICA_IDENTITY_NOTHING;
642 
643 		CatalogTupleUpdate(relationRelation, &classTup->t_self, classTup);
644 
645 		heap_freetuple(classTup);
646 		table_close(relationRelation, RowExclusiveLock);
647 	}
648 
649 	ObjectAddressSet(address, RewriteRelationId, ruleId);
650 
651 	/* Close rel, but keep lock till commit... */
652 	table_close(event_relation, NoLock);
653 
654 	return address;
655 }
656 
657 /*
658  * checkRuleResultList
659  *		Verify that targetList produces output compatible with a tupledesc
660  *
661  * The targetList might be either a SELECT targetlist, or a RETURNING list;
662  * isSelect tells which.  This is used for choosing error messages.
663  *
664  * A SELECT targetlist may optionally require that column names match.
665  */
666 static void
667 checkRuleResultList(List *targetList, TupleDesc resultDesc, bool isSelect,
668 					bool requireColumnNameMatch)
669 {
670 	ListCell   *tllist;
671 	int			i;
672 
673 	/* Only a SELECT may require a column name match. */
674 	Assert(isSelect || !requireColumnNameMatch);
675 
676 	i = 0;
677 	foreach(tllist, targetList)
678 	{
679 		TargetEntry *tle = (TargetEntry *) lfirst(tllist);
680 		Oid			tletypid;
681 		int32		tletypmod;
682 		Form_pg_attribute attr;
683 		char	   *attname;
684 
685 		/* resjunk entries may be ignored */
686 		if (tle->resjunk)
687 			continue;
688 		i++;
689 		if (i > resultDesc->natts)
690 			ereport(ERROR,
691 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
692 					 isSelect ?
693 					 errmsg("SELECT rule's target list has too many entries") :
694 					 errmsg("RETURNING list has too many entries")));
695 
696 		attr = TupleDescAttr(resultDesc, i - 1);
697 		attname = NameStr(attr->attname);
698 
699 		/*
700 		 * Disallow dropped columns in the relation.  This is not really
701 		 * expected to happen when creating an ON SELECT rule.  It'd be
702 		 * possible if someone tried to convert a relation with dropped
703 		 * columns to a view, but the only case we care about supporting
704 		 * table-to-view conversion for is pg_dump, and pg_dump won't do that.
705 		 *
706 		 * Unfortunately, the situation is also possible when adding a rule
707 		 * with RETURNING to a regular table, and rejecting that case is
708 		 * altogether more annoying.  In principle we could support it by
709 		 * modifying the targetlist to include dummy NULL columns
710 		 * corresponding to the dropped columns in the tupdesc.  However,
711 		 * places like ruleutils.c would have to be fixed to not process such
712 		 * entries, and that would take an uncertain and possibly rather large
713 		 * amount of work.  (Note we could not dodge that by marking the dummy
714 		 * columns resjunk, since it's precisely the non-resjunk tlist columns
715 		 * that are expected to correspond to table columns.)
716 		 */
717 		if (attr->attisdropped)
718 			ereport(ERROR,
719 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
720 					 isSelect ?
721 					 errmsg("cannot convert relation containing dropped columns to view") :
722 					 errmsg("cannot create a RETURNING list for a relation containing dropped columns")));
723 
724 		/* Check name match if required; no need for two error texts here */
725 		if (requireColumnNameMatch && strcmp(tle->resname, attname) != 0)
726 			ereport(ERROR,
727 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
728 					 errmsg("SELECT rule's target entry %d has different column name from column \"%s\"",
729 							i, attname),
730 					 errdetail("SELECT target entry is named \"%s\".",
731 							   tle->resname)));
732 
733 		/* Check type match. */
734 		tletypid = exprType((Node *) tle->expr);
735 		if (attr->atttypid != tletypid)
736 			ereport(ERROR,
737 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
738 					 isSelect ?
739 					 errmsg("SELECT rule's target entry %d has different type from column \"%s\"",
740 							i, attname) :
741 					 errmsg("RETURNING list's entry %d has different type from column \"%s\"",
742 							i, attname),
743 					 isSelect ?
744 					 errdetail("SELECT target entry has type %s, but column has type %s.",
745 							   format_type_be(tletypid),
746 							   format_type_be(attr->atttypid)) :
747 					 errdetail("RETURNING list entry has type %s, but column has type %s.",
748 							   format_type_be(tletypid),
749 							   format_type_be(attr->atttypid))));
750 
751 		/*
752 		 * Allow typmods to be different only if one of them is -1, ie,
753 		 * "unspecified".  This is necessary for cases like "numeric", where
754 		 * the table will have a filled-in default length but the select
755 		 * rule's expression will probably have typmod = -1.
756 		 */
757 		tletypmod = exprTypmod((Node *) tle->expr);
758 		if (attr->atttypmod != tletypmod &&
759 			attr->atttypmod != -1 && tletypmod != -1)
760 			ereport(ERROR,
761 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
762 					 isSelect ?
763 					 errmsg("SELECT rule's target entry %d has different size from column \"%s\"",
764 							i, attname) :
765 					 errmsg("RETURNING list's entry %d has different size from column \"%s\"",
766 							i, attname),
767 					 isSelect ?
768 					 errdetail("SELECT target entry has type %s, but column has type %s.",
769 							   format_type_with_typemod(tletypid, tletypmod),
770 							   format_type_with_typemod(attr->atttypid,
771 														attr->atttypmod)) :
772 					 errdetail("RETURNING list entry has type %s, but column has type %s.",
773 							   format_type_with_typemod(tletypid, tletypmod),
774 							   format_type_with_typemod(attr->atttypid,
775 														attr->atttypmod))));
776 	}
777 
778 	if (i != resultDesc->natts)
779 		ereport(ERROR,
780 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
781 				 isSelect ?
782 				 errmsg("SELECT rule's target list has too few entries") :
783 				 errmsg("RETURNING list has too few entries")));
784 }
785 
786 /*
787  * setRuleCheckAsUser
788  *		Recursively scan a query or expression tree and set the checkAsUser
789  *		field to the given userid in all rtable entries.
790  *
791  * Note: for a view (ON SELECT rule), the checkAsUser field of the OLD
792  * RTE entry will be overridden when the view rule is expanded, and the
793  * checkAsUser field of the NEW entry is irrelevant because that entry's
794  * requiredPerms bits will always be zero.  However, for other types of rules
795  * it's important to set these fields to match the rule owner.  So we just set
796  * them always.
797  */
798 void
799 setRuleCheckAsUser(Node *node, Oid userid)
800 {
801 	(void) setRuleCheckAsUser_walker(node, &userid);
802 }
803 
804 static bool
805 setRuleCheckAsUser_walker(Node *node, Oid *context)
806 {
807 	if (node == NULL)
808 		return false;
809 	if (IsA(node, Query))
810 	{
811 		setRuleCheckAsUser_Query((Query *) node, *context);
812 		return false;
813 	}
814 	return expression_tree_walker(node, setRuleCheckAsUser_walker,
815 								  (void *) context);
816 }
817 
818 static void
819 setRuleCheckAsUser_Query(Query *qry, Oid userid)
820 {
821 	ListCell   *l;
822 
823 	/* Set all the RTEs in this query node */
824 	foreach(l, qry->rtable)
825 	{
826 		RangeTblEntry *rte = (RangeTblEntry *) lfirst(l);
827 
828 		if (rte->rtekind == RTE_SUBQUERY)
829 		{
830 			/* Recurse into subquery in FROM */
831 			setRuleCheckAsUser_Query(rte->subquery, userid);
832 		}
833 		else
834 			rte->checkAsUser = userid;
835 	}
836 
837 	/* Recurse into subquery-in-WITH */
838 	foreach(l, qry->cteList)
839 	{
840 		CommonTableExpr *cte = (CommonTableExpr *) lfirst(l);
841 
842 		setRuleCheckAsUser_Query(castNode(Query, cte->ctequery), userid);
843 	}
844 
845 	/* If there are sublinks, search for them and process their RTEs */
846 	if (qry->hasSubLinks)
847 		query_tree_walker(qry, setRuleCheckAsUser_walker, (void *) &userid,
848 						  QTW_IGNORE_RC_SUBQUERIES);
849 }
850 
851 
852 /*
853  * Change the firing semantics of an existing rule.
854  */
855 void
856 EnableDisableRule(Relation rel, const char *rulename,
857 				  char fires_when)
858 {
859 	Relation	pg_rewrite_desc;
860 	Oid			owningRel = RelationGetRelid(rel);
861 	Oid			eventRelationOid;
862 	HeapTuple	ruletup;
863 	Form_pg_rewrite ruleform;
864 	bool		changed = false;
865 
866 	/*
867 	 * Find the rule tuple to change.
868 	 */
869 	pg_rewrite_desc = table_open(RewriteRelationId, RowExclusiveLock);
870 	ruletup = SearchSysCacheCopy2(RULERELNAME,
871 								  ObjectIdGetDatum(owningRel),
872 								  PointerGetDatum(rulename));
873 	if (!HeapTupleIsValid(ruletup))
874 		ereport(ERROR,
875 				(errcode(ERRCODE_UNDEFINED_OBJECT),
876 				 errmsg("rule \"%s\" for relation \"%s\" does not exist",
877 						rulename, get_rel_name(owningRel))));
878 
879 	ruleform = (Form_pg_rewrite) GETSTRUCT(ruletup);
880 
881 	/*
882 	 * Verify that the user has appropriate permissions.
883 	 */
884 	eventRelationOid = ruleform->ev_class;
885 	Assert(eventRelationOid == owningRel);
886 	if (!pg_class_ownercheck(eventRelationOid, GetUserId()))
887 		aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(eventRelationOid)),
888 					   get_rel_name(eventRelationOid));
889 
890 	/*
891 	 * Change ev_enabled if it is different from the desired new state.
892 	 */
893 	if (DatumGetChar(ruleform->ev_enabled) !=
894 		fires_when)
895 	{
896 		ruleform->ev_enabled = CharGetDatum(fires_when);
897 		CatalogTupleUpdate(pg_rewrite_desc, &ruletup->t_self, ruletup);
898 
899 		changed = true;
900 	}
901 
902 	InvokeObjectPostAlterHook(RewriteRelationId, ruleform->oid, 0);
903 
904 	heap_freetuple(ruletup);
905 	table_close(pg_rewrite_desc, RowExclusiveLock);
906 
907 	/*
908 	 * If we changed anything, broadcast a SI inval message to force each
909 	 * backend (including our own!) to rebuild relation's relcache entry.
910 	 * Otherwise they will fail to apply the change promptly.
911 	 */
912 	if (changed)
913 		CacheInvalidateRelcache(rel);
914 }
915 
916 
917 /*
918  * Perform permissions and integrity checks before acquiring a relation lock.
919  */
920 static void
921 RangeVarCallbackForRenameRule(const RangeVar *rv, Oid relid, Oid oldrelid,
922 							  void *arg)
923 {
924 	HeapTuple	tuple;
925 	Form_pg_class form;
926 
927 	tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
928 	if (!HeapTupleIsValid(tuple))
929 		return;					/* concurrently dropped */
930 	form = (Form_pg_class) GETSTRUCT(tuple);
931 
932 	/* only tables and views can have rules */
933 	if (form->relkind != RELKIND_RELATION &&
934 		form->relkind != RELKIND_VIEW &&
935 		form->relkind != RELKIND_PARTITIONED_TABLE)
936 		ereport(ERROR,
937 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
938 				 errmsg("\"%s\" is not a table or view", rv->relname)));
939 
940 	if (!allowSystemTableMods && IsSystemClass(relid, form))
941 		ereport(ERROR,
942 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
943 				 errmsg("permission denied: \"%s\" is a system catalog",
944 						rv->relname)));
945 
946 	/* you must own the table to rename one of its rules */
947 	if (!pg_class_ownercheck(relid, GetUserId()))
948 		aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
949 
950 	ReleaseSysCache(tuple);
951 }
952 
953 /*
954  * Rename an existing rewrite rule.
955  */
956 ObjectAddress
957 RenameRewriteRule(RangeVar *relation, const char *oldName,
958 				  const char *newName)
959 {
960 	Oid			relid;
961 	Relation	targetrel;
962 	Relation	pg_rewrite_desc;
963 	HeapTuple	ruletup;
964 	Form_pg_rewrite ruleform;
965 	Oid			ruleOid;
966 	ObjectAddress address;
967 
968 	/*
969 	 * Look up name, check permissions, and acquire lock (which we will NOT
970 	 * release until end of transaction).
971 	 */
972 	relid = RangeVarGetRelidExtended(relation, AccessExclusiveLock,
973 									 0,
974 									 RangeVarCallbackForRenameRule,
975 									 NULL);
976 
977 	/* Have lock already, so just need to build relcache entry. */
978 	targetrel = relation_open(relid, NoLock);
979 
980 	/* Prepare to modify pg_rewrite */
981 	pg_rewrite_desc = table_open(RewriteRelationId, RowExclusiveLock);
982 
983 	/* Fetch the rule's entry (it had better exist) */
984 	ruletup = SearchSysCacheCopy2(RULERELNAME,
985 								  ObjectIdGetDatum(relid),
986 								  PointerGetDatum(oldName));
987 	if (!HeapTupleIsValid(ruletup))
988 		ereport(ERROR,
989 				(errcode(ERRCODE_UNDEFINED_OBJECT),
990 				 errmsg("rule \"%s\" for relation \"%s\" does not exist",
991 						oldName, RelationGetRelationName(targetrel))));
992 	ruleform = (Form_pg_rewrite) GETSTRUCT(ruletup);
993 	ruleOid = ruleform->oid;
994 
995 	/* rule with the new name should not already exist */
996 	if (IsDefinedRewriteRule(relid, newName))
997 		ereport(ERROR,
998 				(errcode(ERRCODE_DUPLICATE_OBJECT),
999 				 errmsg("rule \"%s\" for relation \"%s\" already exists",
1000 						newName, RelationGetRelationName(targetrel))));
1001 
1002 	/*
1003 	 * We disallow renaming ON SELECT rules, because they should always be
1004 	 * named "_RETURN".
1005 	 */
1006 	if (ruleform->ev_type == CMD_SELECT + '0')
1007 		ereport(ERROR,
1008 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1009 				 errmsg("renaming an ON SELECT rule is not allowed")));
1010 
1011 	/* OK, do the update */
1012 	namestrcpy(&(ruleform->rulename), newName);
1013 
1014 	CatalogTupleUpdate(pg_rewrite_desc, &ruletup->t_self, ruletup);
1015 
1016 	InvokeObjectPostAlterHook(RewriteRelationId, ruleOid, 0);
1017 
1018 	heap_freetuple(ruletup);
1019 	table_close(pg_rewrite_desc, RowExclusiveLock);
1020 
1021 	/*
1022 	 * Invalidate relation's relcache entry so that other backends (and this
1023 	 * one too!) are sent SI message to make them rebuild relcache entries.
1024 	 * (Ideally this should happen automatically...)
1025 	 */
1026 	CacheInvalidateRelcache(targetrel);
1027 
1028 	ObjectAddressSet(address, RewriteRelationId, ruleOid);
1029 
1030 	/*
1031 	 * Close rel, but keep exclusive lock!
1032 	 */
1033 	relation_close(targetrel, NoLock);
1034 
1035 	return address;
1036 }
1037