1 /*-------------------------------------------------------------------------
2  *
3  * rewriteDefine.c
4  *	  routines for defining a rewrite rule
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/rewrite/rewriteDefine.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/heapam.h"
18 #include "access/htup_details.h"
19 #include "access/multixact.h"
20 #include "access/transam.h"
21 #include "access/xact.h"
22 #include "catalog/catalog.h"
23 #include "catalog/dependency.h"
24 #include "catalog/heap.h"
25 #include "catalog/indexing.h"
26 #include "catalog/namespace.h"
27 #include "catalog/objectaccess.h"
28 #include "catalog/pg_inherits.h"
29 #include "catalog/pg_rewrite.h"
30 #include "catalog/storage.h"
31 #include "commands/policy.h"
32 #include "miscadmin.h"
33 #include "nodes/nodeFuncs.h"
34 #include "parser/parse_utilcmd.h"
35 #include "rewrite/rewriteDefine.h"
36 #include "rewrite/rewriteManip.h"
37 #include "rewrite/rewriteSupport.h"
38 #include "utils/acl.h"
39 #include "utils/builtins.h"
40 #include "utils/inval.h"
41 #include "utils/lsyscache.h"
42 #include "utils/rel.h"
43 #include "utils/snapmgr.h"
44 #include "utils/syscache.h"
45 #include "utils/tqual.h"
46 
47 
48 static void checkRuleResultList(List *targetList, TupleDesc resultDesc,
49 					bool isSelect, bool requireColumnNameMatch);
50 static bool setRuleCheckAsUser_walker(Node *node, Oid *context);
51 static void setRuleCheckAsUser_Query(Query *qry, Oid userid);
52 
53 
54 /*
55  * InsertRule -
56  *	  takes the arguments and inserts them as a row into the system
57  *	  relation "pg_rewrite"
58  */
59 static Oid
60 InsertRule(const char *rulname,
61 		   int evtype,
62 		   Oid eventrel_oid,
63 		   bool evinstead,
64 		   Node *event_qual,
65 		   List *action,
66 		   bool replace)
67 {
68 	char	   *evqual = nodeToString(event_qual);
69 	char	   *actiontree = nodeToString((Node *) action);
70 	Datum		values[Natts_pg_rewrite];
71 	bool		nulls[Natts_pg_rewrite];
72 	bool		replaces[Natts_pg_rewrite];
73 	NameData	rname;
74 	Relation	pg_rewrite_desc;
75 	HeapTuple	tup,
76 				oldtup;
77 	Oid			rewriteObjectId;
78 	ObjectAddress myself,
79 				referenced;
80 	bool		is_update = false;
81 
82 	/*
83 	 * Set up *nulls and *values arrays
84 	 */
85 	MemSet(nulls, false, sizeof(nulls));
86 
87 	namestrcpy(&rname, rulname);
88 	values[Anum_pg_rewrite_rulename - 1] = NameGetDatum(&rname);
89 	values[Anum_pg_rewrite_ev_class - 1] = ObjectIdGetDatum(eventrel_oid);
90 	values[Anum_pg_rewrite_ev_type - 1] = CharGetDatum(evtype + '0');
91 	values[Anum_pg_rewrite_ev_enabled - 1] = CharGetDatum(RULE_FIRES_ON_ORIGIN);
92 	values[Anum_pg_rewrite_is_instead - 1] = BoolGetDatum(evinstead);
93 	values[Anum_pg_rewrite_ev_qual - 1] = CStringGetTextDatum(evqual);
94 	values[Anum_pg_rewrite_ev_action - 1] = CStringGetTextDatum(actiontree);
95 
96 	/*
97 	 * Ready to store new pg_rewrite tuple
98 	 */
99 	pg_rewrite_desc = heap_open(RewriteRelationId, RowExclusiveLock);
100 
101 	/*
102 	 * Check to see if we are replacing an existing tuple
103 	 */
104 	oldtup = SearchSysCache2(RULERELNAME,
105 							 ObjectIdGetDatum(eventrel_oid),
106 							 PointerGetDatum(rulname));
107 
108 	if (HeapTupleIsValid(oldtup))
109 	{
110 		if (!replace)
111 			ereport(ERROR,
112 					(errcode(ERRCODE_DUPLICATE_OBJECT),
113 					 errmsg("rule \"%s\" for relation \"%s\" already exists",
114 							rulname, get_rel_name(eventrel_oid))));
115 
116 		/*
117 		 * When replacing, we don't need to replace every attribute
118 		 */
119 		MemSet(replaces, false, sizeof(replaces));
120 		replaces[Anum_pg_rewrite_ev_type - 1] = true;
121 		replaces[Anum_pg_rewrite_is_instead - 1] = true;
122 		replaces[Anum_pg_rewrite_ev_qual - 1] = true;
123 		replaces[Anum_pg_rewrite_ev_action - 1] = true;
124 
125 		tup = heap_modify_tuple(oldtup, RelationGetDescr(pg_rewrite_desc),
126 								values, nulls, replaces);
127 
128 		CatalogTupleUpdate(pg_rewrite_desc, &tup->t_self, tup);
129 
130 		ReleaseSysCache(oldtup);
131 
132 		rewriteObjectId = HeapTupleGetOid(tup);
133 		is_update = true;
134 	}
135 	else
136 	{
137 		tup = heap_form_tuple(pg_rewrite_desc->rd_att, values, nulls);
138 
139 		rewriteObjectId = CatalogTupleInsert(pg_rewrite_desc, tup);
140 	}
141 
142 
143 	heap_freetuple(tup);
144 
145 	/* If replacing, get rid of old dependencies and make new ones */
146 	if (is_update)
147 		deleteDependencyRecordsFor(RewriteRelationId, rewriteObjectId, false);
148 
149 	/*
150 	 * Install dependency on rule's relation to ensure it will go away on
151 	 * relation deletion.  If the rule is ON SELECT, make the dependency
152 	 * implicit --- this prevents deleting a view's SELECT rule.  Other kinds
153 	 * of rules can be AUTO.
154 	 */
155 	myself.classId = RewriteRelationId;
156 	myself.objectId = rewriteObjectId;
157 	myself.objectSubId = 0;
158 
159 	referenced.classId = RelationRelationId;
160 	referenced.objectId = eventrel_oid;
161 	referenced.objectSubId = 0;
162 
163 	recordDependencyOn(&myself, &referenced,
164 					   (evtype == CMD_SELECT) ? DEPENDENCY_INTERNAL : DEPENDENCY_AUTO);
165 
166 	/*
167 	 * Also install dependencies on objects referenced in action and qual.
168 	 */
169 	recordDependencyOnExpr(&myself, (Node *) action, NIL,
170 						   DEPENDENCY_NORMAL);
171 
172 	if (event_qual != NULL)
173 	{
174 		/* Find query containing OLD/NEW rtable entries */
175 		Query	   *qry = linitial_node(Query, action);
176 
177 		qry = getInsertSelectQuery(qry, NULL);
178 		recordDependencyOnExpr(&myself, event_qual, qry->rtable,
179 							   DEPENDENCY_NORMAL);
180 	}
181 
182 	/* Post creation hook for new rule */
183 	InvokeObjectPostCreateHook(RewriteRelationId, rewriteObjectId, 0);
184 
185 	heap_close(pg_rewrite_desc, RowExclusiveLock);
186 
187 	return rewriteObjectId;
188 }
189 
190 /*
191  * DefineRule
192  *		Execute a CREATE RULE command.
193  */
194 ObjectAddress
195 DefineRule(RuleStmt *stmt, const char *queryString)
196 {
197 	List	   *actions;
198 	Node	   *whereClause;
199 	Oid			relId;
200 
201 	/* Parse analysis. */
202 	transformRuleStmt(stmt, queryString, &actions, &whereClause);
203 
204 	/*
205 	 * Find and lock the relation.  Lock level should match
206 	 * DefineQueryRewrite.
207 	 */
208 	relId = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, false);
209 
210 	/* ... and execute */
211 	return DefineQueryRewrite(stmt->rulename,
212 							  relId,
213 							  whereClause,
214 							  stmt->event,
215 							  stmt->instead,
216 							  stmt->replace,
217 							  actions);
218 }
219 
220 
221 /*
222  * DefineQueryRewrite
223  *		Create a rule
224  *
225  * This is essentially the same as DefineRule() except that the rule's
226  * action and qual have already been passed through parse analysis.
227  */
228 ObjectAddress
229 DefineQueryRewrite(const char *rulename,
230 				   Oid event_relid,
231 				   Node *event_qual,
232 				   CmdType event_type,
233 				   bool is_instead,
234 				   bool replace,
235 				   List *action)
236 {
237 	Relation	event_relation;
238 	ListCell   *l;
239 	Query	   *query;
240 	bool		RelisBecomingView = false;
241 	Oid			ruleId = InvalidOid;
242 	ObjectAddress address;
243 
244 	/*
245 	 * If we are installing an ON SELECT rule, we had better grab
246 	 * AccessExclusiveLock to ensure no SELECTs are currently running on the
247 	 * event relation. For other types of rules, it would be sufficient to
248 	 * grab ShareRowExclusiveLock to lock out insert/update/delete actions and
249 	 * to ensure that we lock out current CREATE RULE statements; but because
250 	 * of race conditions in access to catalog entries, we can't do that yet.
251 	 *
252 	 * Note that this lock level should match the one used in DefineRule.
253 	 */
254 	event_relation = heap_open(event_relid, AccessExclusiveLock);
255 
256 	/*
257 	 * Verify relation is of a type that rules can sensibly be applied to.
258 	 * Internal callers can target materialized views, but transformRuleStmt()
259 	 * blocks them for users.  Don't mention them in the error message.
260 	 */
261 	if (event_relation->rd_rel->relkind != RELKIND_RELATION &&
262 		event_relation->rd_rel->relkind != RELKIND_MATVIEW &&
263 		event_relation->rd_rel->relkind != RELKIND_VIEW &&
264 		event_relation->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
265 		ereport(ERROR,
266 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
267 				 errmsg("\"%s\" is not a table or view",
268 						RelationGetRelationName(event_relation))));
269 
270 	if (!allowSystemTableMods && IsSystemRelation(event_relation))
271 		ereport(ERROR,
272 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
273 				 errmsg("permission denied: \"%s\" is a system catalog",
274 						RelationGetRelationName(event_relation))));
275 
276 	/*
277 	 * Check user has permission to apply rules to this relation.
278 	 */
279 	if (!pg_class_ownercheck(event_relid, GetUserId()))
280 		aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(event_relation->rd_rel->relkind),
281 					   RelationGetRelationName(event_relation));
282 
283 	/*
284 	 * No rule actions that modify OLD or NEW
285 	 */
286 	foreach(l, action)
287 	{
288 		query = lfirst_node(Query, l);
289 		if (query->resultRelation == 0)
290 			continue;
291 		/* Don't be fooled by INSERT/SELECT */
292 		if (query != getInsertSelectQuery(query, NULL))
293 			continue;
294 		if (query->resultRelation == PRS2_OLD_VARNO)
295 			ereport(ERROR,
296 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
297 					 errmsg("rule actions on OLD are not implemented"),
298 					 errhint("Use views or triggers instead.")));
299 		if (query->resultRelation == PRS2_NEW_VARNO)
300 			ereport(ERROR,
301 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
302 					 errmsg("rule actions on NEW are not implemented"),
303 					 errhint("Use triggers instead.")));
304 	}
305 
306 	if (event_type == CMD_SELECT)
307 	{
308 		/*
309 		 * Rules ON SELECT are restricted to view definitions
310 		 *
311 		 * So there cannot be INSTEAD NOTHING, ...
312 		 */
313 		if (list_length(action) == 0)
314 			ereport(ERROR,
315 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
316 					 errmsg("INSTEAD NOTHING rules on SELECT are not implemented"),
317 					 errhint("Use views instead.")));
318 
319 		/*
320 		 * ... there cannot be multiple actions, ...
321 		 */
322 		if (list_length(action) > 1)
323 			ereport(ERROR,
324 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
325 					 errmsg("multiple actions for rules on SELECT are not implemented")));
326 
327 		/*
328 		 * ... the one action must be a SELECT, ...
329 		 */
330 		query = linitial_node(Query, action);
331 		if (!is_instead ||
332 			query->commandType != CMD_SELECT)
333 			ereport(ERROR,
334 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
335 					 errmsg("rules on SELECT must have action INSTEAD SELECT")));
336 
337 		/*
338 		 * ... it cannot contain data-modifying WITH ...
339 		 */
340 		if (query->hasModifyingCTE)
341 			ereport(ERROR,
342 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
343 					 errmsg("rules on SELECT must not contain data-modifying statements in WITH")));
344 
345 		/*
346 		 * ... there can be no rule qual, ...
347 		 */
348 		if (event_qual != NULL)
349 			ereport(ERROR,
350 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
351 					 errmsg("event qualifications are not implemented for rules on SELECT")));
352 
353 		/*
354 		 * ... the targetlist of the SELECT action must exactly match the
355 		 * event relation, ...
356 		 */
357 		checkRuleResultList(query->targetList,
358 							RelationGetDescr(event_relation),
359 							true,
360 							event_relation->rd_rel->relkind !=
361 							RELKIND_MATVIEW);
362 
363 		/*
364 		 * ... there must not be another ON SELECT rule already ...
365 		 */
366 		if (!replace && event_relation->rd_rules != NULL)
367 		{
368 			int			i;
369 
370 			for (i = 0; i < event_relation->rd_rules->numLocks; i++)
371 			{
372 				RewriteRule *rule;
373 
374 				rule = event_relation->rd_rules->rules[i];
375 				if (rule->event == CMD_SELECT)
376 					ereport(ERROR,
377 							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
378 							 errmsg("\"%s\" is already a view",
379 									RelationGetRelationName(event_relation))));
380 			}
381 		}
382 
383 		/*
384 		 * ... and finally the rule must be named _RETURN.
385 		 */
386 		if (strcmp(rulename, ViewSelectRuleName) != 0)
387 		{
388 			/*
389 			 * In versions before 7.3, the expected name was _RETviewname. For
390 			 * backwards compatibility with old pg_dump output, accept that
391 			 * and silently change it to _RETURN.  Since this is just a quick
392 			 * backwards-compatibility hack, limit the number of characters
393 			 * checked to a few less than NAMEDATALEN; this saves having to
394 			 * worry about where a multibyte character might have gotten
395 			 * truncated.
396 			 */
397 			if (strncmp(rulename, "_RET", 4) != 0 ||
398 				strncmp(rulename + 4, RelationGetRelationName(event_relation),
399 						NAMEDATALEN - 4 - 4) != 0)
400 				ereport(ERROR,
401 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
402 						 errmsg("view rule for \"%s\" must be named \"%s\"",
403 								RelationGetRelationName(event_relation),
404 								ViewSelectRuleName)));
405 			rulename = pstrdup(ViewSelectRuleName);
406 		}
407 
408 		/*
409 		 * Are we converting a relation to a view?
410 		 *
411 		 * If so, check that the relation is empty because the storage for the
412 		 * relation is going to be deleted.  Also insist that the rel not be
413 		 * involved in partitioning, nor have any triggers, indexes, child or
414 		 * parent tables, RLS policies, or RLS enabled.  (Note: some of these
415 		 * tests are too strict, because they will reject relations that once
416 		 * had such but don't anymore.  But we don't really care, because this
417 		 * whole business of converting relations to views is just an obsolete
418 		 * kluge to allow dump/reload of views that participate in circular
419 		 * dependencies.)
420 		 */
421 		if (event_relation->rd_rel->relkind != RELKIND_VIEW &&
422 			event_relation->rd_rel->relkind != RELKIND_MATVIEW)
423 		{
424 			HeapScanDesc scanDesc;
425 			Snapshot	snapshot;
426 
427 			if (event_relation->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
428 				ereport(ERROR,
429 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
430 						 errmsg("cannot convert partitioned table \"%s\" to a view",
431 								RelationGetRelationName(event_relation))));
432 
433 			/* only case left: */
434 			Assert(event_relation->rd_rel->relkind == RELKIND_RELATION);
435 
436 			if (event_relation->rd_rel->relispartition)
437 				ereport(ERROR,
438 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
439 						 errmsg("cannot convert partition \"%s\" to a view",
440 								RelationGetRelationName(event_relation))));
441 
442 			snapshot = RegisterSnapshot(GetLatestSnapshot());
443 			scanDesc = heap_beginscan(event_relation, snapshot, 0, NULL);
444 			if (heap_getnext(scanDesc, ForwardScanDirection) != NULL)
445 				ereport(ERROR,
446 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
447 						 errmsg("could not convert table \"%s\" to a view because it is not empty",
448 								RelationGetRelationName(event_relation))));
449 			heap_endscan(scanDesc);
450 			UnregisterSnapshot(snapshot);
451 
452 			if (event_relation->rd_rel->relhastriggers)
453 				ereport(ERROR,
454 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
455 						 errmsg("could not convert table \"%s\" to a view because it has triggers",
456 								RelationGetRelationName(event_relation)),
457 						 errhint("In particular, the table cannot be involved in any foreign key relationships.")));
458 
459 			if (event_relation->rd_rel->relhasindex)
460 				ereport(ERROR,
461 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
462 						 errmsg("could not convert table \"%s\" to a view because it has indexes",
463 								RelationGetRelationName(event_relation))));
464 
465 			if (event_relation->rd_rel->relhassubclass)
466 				ereport(ERROR,
467 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
468 						 errmsg("could not convert table \"%s\" to a view because it has child tables",
469 								RelationGetRelationName(event_relation))));
470 
471 			if (has_superclass(RelationGetRelid(event_relation)))
472 				ereport(ERROR,
473 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
474 						 errmsg("could not convert table \"%s\" to a view because it has parent tables",
475 								RelationGetRelationName(event_relation))));
476 
477 			if (event_relation->rd_rel->relrowsecurity)
478 				ereport(ERROR,
479 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
480 						 errmsg("could not convert table \"%s\" to a view because it has row security enabled",
481 								RelationGetRelationName(event_relation))));
482 
483 			if (relation_has_policies(event_relation))
484 				ereport(ERROR,
485 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
486 						 errmsg("could not convert table \"%s\" to a view because it has row security policies",
487 								RelationGetRelationName(event_relation))));
488 
489 			RelisBecomingView = true;
490 		}
491 	}
492 	else
493 	{
494 		/*
495 		 * For non-SELECT rules, a RETURNING list can appear in at most one of
496 		 * the actions ... and there can't be any RETURNING list at all in a
497 		 * conditional or non-INSTEAD rule.  (Actually, there can be at most
498 		 * one RETURNING list across all rules on the same event, but it seems
499 		 * best to enforce that at rule expansion time.)  If there is a
500 		 * RETURNING list, it must match the event relation.
501 		 */
502 		bool		haveReturning = false;
503 
504 		foreach(l, action)
505 		{
506 			query = lfirst_node(Query, l);
507 
508 			if (!query->returningList)
509 				continue;
510 			if (haveReturning)
511 				ereport(ERROR,
512 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
513 						 errmsg("cannot have multiple RETURNING lists in a rule")));
514 			haveReturning = true;
515 			if (event_qual != NULL)
516 				ereport(ERROR,
517 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
518 						 errmsg("RETURNING lists are not supported in conditional rules")));
519 			if (!is_instead)
520 				ereport(ERROR,
521 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
522 						 errmsg("RETURNING lists are not supported in non-INSTEAD rules")));
523 			checkRuleResultList(query->returningList,
524 								RelationGetDescr(event_relation),
525 								false, false);
526 		}
527 	}
528 
529 	/*
530 	 * This rule is allowed - prepare to install it.
531 	 */
532 
533 	/* discard rule if it's null action and not INSTEAD; it's a no-op */
534 	if (action != NIL || is_instead)
535 	{
536 		ruleId = InsertRule(rulename,
537 							event_type,
538 							event_relid,
539 							is_instead,
540 							event_qual,
541 							action,
542 							replace);
543 
544 		/*
545 		 * Set pg_class 'relhasrules' field true for event relation.
546 		 *
547 		 * Important side effect: an SI notice is broadcast to force all
548 		 * backends (including me!) to update relcache entries with the new
549 		 * rule.
550 		 */
551 		SetRelationRuleStatus(event_relid, true);
552 	}
553 
554 	/* ---------------------------------------------------------------------
555 	 * If the relation is becoming a view:
556 	 * - delete the associated storage files
557 	 * - get rid of any system attributes in pg_attribute; a view shouldn't
558 	 *	 have any of those
559 	 * - remove the toast table; there is no need for it anymore, and its
560 	 *	 presence would make vacuum slightly more complicated
561 	 * - set relkind to RELKIND_VIEW, and adjust other pg_class fields
562 	 *	 to be appropriate for a view
563 	 *
564 	 * NB: we had better have AccessExclusiveLock to do this ...
565 	 * ---------------------------------------------------------------------
566 	 */
567 	if (RelisBecomingView)
568 	{
569 		Relation	relationRelation;
570 		Oid			toastrelid;
571 		HeapTuple	classTup;
572 		Form_pg_class classForm;
573 
574 		relationRelation = heap_open(RelationRelationId, RowExclusiveLock);
575 		toastrelid = event_relation->rd_rel->reltoastrelid;
576 
577 		/* drop storage while table still looks like a table  */
578 		RelationDropStorage(event_relation);
579 		DeleteSystemAttributeTuples(event_relid);
580 
581 		/*
582 		 * Drop the toast table if any.  (This won't take care of updating the
583 		 * toast fields in the relation's own pg_class entry; we handle that
584 		 * below.)
585 		 */
586 		if (OidIsValid(toastrelid))
587 		{
588 			ObjectAddress toastobject;
589 
590 			/*
591 			 * Delete the dependency of the toast relation on the main
592 			 * relation so we can drop the former without dropping the latter.
593 			 */
594 			deleteDependencyRecordsFor(RelationRelationId, toastrelid,
595 									   false);
596 
597 			/* Make deletion of dependency record visible */
598 			CommandCounterIncrement();
599 
600 			/* Now drop toast table, including its index */
601 			toastobject.classId = RelationRelationId;
602 			toastobject.objectId = toastrelid;
603 			toastobject.objectSubId = 0;
604 			performDeletion(&toastobject, DROP_RESTRICT,
605 							PERFORM_DELETION_INTERNAL);
606 		}
607 
608 		/*
609 		 * SetRelationRuleStatus may have updated the pg_class row, so we must
610 		 * advance the command counter before trying to update it again.
611 		 */
612 		CommandCounterIncrement();
613 
614 		/*
615 		 * Fix pg_class entry to look like a normal view's, including setting
616 		 * the correct relkind and removal of reltoastrelid of the toast table
617 		 * we potentially removed above.
618 		 */
619 		classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(event_relid));
620 		if (!HeapTupleIsValid(classTup))
621 			elog(ERROR, "cache lookup failed for relation %u", event_relid);
622 		classForm = (Form_pg_class) GETSTRUCT(classTup);
623 
624 		classForm->reltablespace = InvalidOid;
625 		classForm->relpages = 0;
626 		classForm->reltuples = 0;
627 		classForm->relallvisible = 0;
628 		classForm->reltoastrelid = InvalidOid;
629 		classForm->relhasindex = false;
630 		classForm->relkind = RELKIND_VIEW;
631 		classForm->relhasoids = false;
632 		classForm->relfrozenxid = InvalidTransactionId;
633 		classForm->relminmxid = InvalidMultiXactId;
634 		classForm->relreplident = REPLICA_IDENTITY_NOTHING;
635 
636 		CatalogTupleUpdate(relationRelation, &classTup->t_self, classTup);
637 
638 		heap_freetuple(classTup);
639 		heap_close(relationRelation, RowExclusiveLock);
640 	}
641 
642 	ObjectAddressSet(address, RewriteRelationId, ruleId);
643 
644 	/* Close rel, but keep lock till commit... */
645 	heap_close(event_relation, NoLock);
646 
647 	return address;
648 }
649 
650 /*
651  * checkRuleResultList
652  *		Verify that targetList produces output compatible with a tupledesc
653  *
654  * The targetList might be either a SELECT targetlist, or a RETURNING list;
655  * isSelect tells which.  This is used for choosing error messages.
656  *
657  * A SELECT targetlist may optionally require that column names match.
658  */
659 static void
660 checkRuleResultList(List *targetList, TupleDesc resultDesc, bool isSelect,
661 					bool requireColumnNameMatch)
662 {
663 	ListCell   *tllist;
664 	int			i;
665 
666 	/* Only a SELECT may require a column name match. */
667 	Assert(isSelect || !requireColumnNameMatch);
668 
669 	i = 0;
670 	foreach(tllist, targetList)
671 	{
672 		TargetEntry *tle = (TargetEntry *) lfirst(tllist);
673 		Oid			tletypid;
674 		int32		tletypmod;
675 		Form_pg_attribute attr;
676 		char	   *attname;
677 
678 		/* resjunk entries may be ignored */
679 		if (tle->resjunk)
680 			continue;
681 		i++;
682 		if (i > resultDesc->natts)
683 			ereport(ERROR,
684 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
685 					 isSelect ?
686 					 errmsg("SELECT rule's target list has too many entries") :
687 					 errmsg("RETURNING list has too many entries")));
688 
689 		attr = TupleDescAttr(resultDesc, i - 1);
690 		attname = NameStr(attr->attname);
691 
692 		/*
693 		 * Disallow dropped columns in the relation.  This is not really
694 		 * expected to happen when creating an ON SELECT rule.  It'd be
695 		 * possible if someone tried to convert a relation with dropped
696 		 * columns to a view, but the only case we care about supporting
697 		 * table-to-view conversion for is pg_dump, and pg_dump won't do that.
698 		 *
699 		 * Unfortunately, the situation is also possible when adding a rule
700 		 * with RETURNING to a regular table, and rejecting that case is
701 		 * altogether more annoying.  In principle we could support it by
702 		 * modifying the targetlist to include dummy NULL columns
703 		 * corresponding to the dropped columns in the tupdesc.  However,
704 		 * places like ruleutils.c would have to be fixed to not process such
705 		 * entries, and that would take an uncertain and possibly rather large
706 		 * amount of work.  (Note we could not dodge that by marking the dummy
707 		 * columns resjunk, since it's precisely the non-resjunk tlist columns
708 		 * that are expected to correspond to table columns.)
709 		 */
710 		if (attr->attisdropped)
711 			ereport(ERROR,
712 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
713 					 isSelect ?
714 					 errmsg("cannot convert relation containing dropped columns to view") :
715 					 errmsg("cannot create a RETURNING list for a relation containing dropped columns")));
716 
717 		/* Check name match if required; no need for two error texts here */
718 		if (requireColumnNameMatch && strcmp(tle->resname, attname) != 0)
719 			ereport(ERROR,
720 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
721 					 errmsg("SELECT rule's target entry %d has different column name from column \"%s\"",
722 							i, attname),
723 					 errdetail("SELECT target entry is named \"%s\".",
724 							   tle->resname)));
725 
726 		/* Check type match. */
727 		tletypid = exprType((Node *) tle->expr);
728 		if (attr->atttypid != tletypid)
729 			ereport(ERROR,
730 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
731 					 isSelect ?
732 					 errmsg("SELECT rule's target entry %d has different type from column \"%s\"",
733 							i, attname) :
734 					 errmsg("RETURNING list's entry %d has different type from column \"%s\"",
735 							i, attname),
736 					 isSelect ?
737 					 errdetail("SELECT target entry has type %s, but column has type %s.",
738 							   format_type_be(tletypid),
739 							   format_type_be(attr->atttypid)) :
740 					 errdetail("RETURNING list entry has type %s, but column has type %s.",
741 							   format_type_be(tletypid),
742 							   format_type_be(attr->atttypid))));
743 
744 		/*
745 		 * Allow typmods to be different only if one of them is -1, ie,
746 		 * "unspecified".  This is necessary for cases like "numeric", where
747 		 * the table will have a filled-in default length but the select
748 		 * rule's expression will probably have typmod = -1.
749 		 */
750 		tletypmod = exprTypmod((Node *) tle->expr);
751 		if (attr->atttypmod != tletypmod &&
752 			attr->atttypmod != -1 && tletypmod != -1)
753 			ereport(ERROR,
754 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
755 					 isSelect ?
756 					 errmsg("SELECT rule's target entry %d has different size from column \"%s\"",
757 							i, attname) :
758 					 errmsg("RETURNING list's entry %d has different size from column \"%s\"",
759 							i, attname),
760 					 isSelect ?
761 					 errdetail("SELECT target entry has type %s, but column has type %s.",
762 							   format_type_with_typemod(tletypid, tletypmod),
763 							   format_type_with_typemod(attr->atttypid,
764 														attr->atttypmod)) :
765 					 errdetail("RETURNING list entry has type %s, but column has type %s.",
766 							   format_type_with_typemod(tletypid, tletypmod),
767 							   format_type_with_typemod(attr->atttypid,
768 														attr->atttypmod))));
769 	}
770 
771 	if (i != resultDesc->natts)
772 		ereport(ERROR,
773 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
774 				 isSelect ?
775 				 errmsg("SELECT rule's target list has too few entries") :
776 				 errmsg("RETURNING list has too few entries")));
777 }
778 
779 /*
780  * setRuleCheckAsUser
781  *		Recursively scan a query or expression tree and set the checkAsUser
782  *		field to the given userid in all rtable entries.
783  *
784  * Note: for a view (ON SELECT rule), the checkAsUser field of the OLD
785  * RTE entry will be overridden when the view rule is expanded, and the
786  * checkAsUser field of the NEW entry is irrelevant because that entry's
787  * requiredPerms bits will always be zero.  However, for other types of rules
788  * it's important to set these fields to match the rule owner.  So we just set
789  * them always.
790  */
791 void
792 setRuleCheckAsUser(Node *node, Oid userid)
793 {
794 	(void) setRuleCheckAsUser_walker(node, &userid);
795 }
796 
797 static bool
798 setRuleCheckAsUser_walker(Node *node, Oid *context)
799 {
800 	if (node == NULL)
801 		return false;
802 	if (IsA(node, Query))
803 	{
804 		setRuleCheckAsUser_Query((Query *) node, *context);
805 		return false;
806 	}
807 	return expression_tree_walker(node, setRuleCheckAsUser_walker,
808 								  (void *) context);
809 }
810 
811 static void
812 setRuleCheckAsUser_Query(Query *qry, Oid userid)
813 {
814 	ListCell   *l;
815 
816 	/* Set all the RTEs in this query node */
817 	foreach(l, qry->rtable)
818 	{
819 		RangeTblEntry *rte = (RangeTblEntry *) lfirst(l);
820 
821 		if (rte->rtekind == RTE_SUBQUERY)
822 		{
823 			/* Recurse into subquery in FROM */
824 			setRuleCheckAsUser_Query(rte->subquery, userid);
825 		}
826 		else
827 			rte->checkAsUser = userid;
828 	}
829 
830 	/* Recurse into subquery-in-WITH */
831 	foreach(l, qry->cteList)
832 	{
833 		CommonTableExpr *cte = (CommonTableExpr *) lfirst(l);
834 
835 		setRuleCheckAsUser_Query(castNode(Query, cte->ctequery), userid);
836 	}
837 
838 	/* If there are sublinks, search for them and process their RTEs */
839 	if (qry->hasSubLinks)
840 		query_tree_walker(qry, setRuleCheckAsUser_walker, (void *) &userid,
841 						  QTW_IGNORE_RC_SUBQUERIES);
842 }
843 
844 
845 /*
846  * Change the firing semantics of an existing rule.
847  */
848 void
849 EnableDisableRule(Relation rel, const char *rulename,
850 				  char fires_when)
851 {
852 	Relation	pg_rewrite_desc;
853 	Oid			owningRel = RelationGetRelid(rel);
854 	Oid			eventRelationOid;
855 	HeapTuple	ruletup;
856 	bool		changed = false;
857 
858 	/*
859 	 * Find the rule tuple to change.
860 	 */
861 	pg_rewrite_desc = heap_open(RewriteRelationId, RowExclusiveLock);
862 	ruletup = SearchSysCacheCopy2(RULERELNAME,
863 								  ObjectIdGetDatum(owningRel),
864 								  PointerGetDatum(rulename));
865 	if (!HeapTupleIsValid(ruletup))
866 		ereport(ERROR,
867 				(errcode(ERRCODE_UNDEFINED_OBJECT),
868 				 errmsg("rule \"%s\" for relation \"%s\" does not exist",
869 						rulename, get_rel_name(owningRel))));
870 
871 	/*
872 	 * Verify that the user has appropriate permissions.
873 	 */
874 	eventRelationOid = ((Form_pg_rewrite) GETSTRUCT(ruletup))->ev_class;
875 	Assert(eventRelationOid == owningRel);
876 	if (!pg_class_ownercheck(eventRelationOid, GetUserId()))
877 		aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(eventRelationOid)),
878 					   get_rel_name(eventRelationOid));
879 
880 	/*
881 	 * Change ev_enabled if it is different from the desired new state.
882 	 */
883 	if (DatumGetChar(((Form_pg_rewrite) GETSTRUCT(ruletup))->ev_enabled) !=
884 		fires_when)
885 	{
886 		((Form_pg_rewrite) GETSTRUCT(ruletup))->ev_enabled =
887 			CharGetDatum(fires_when);
888 		CatalogTupleUpdate(pg_rewrite_desc, &ruletup->t_self, ruletup);
889 
890 		changed = true;
891 	}
892 
893 	InvokeObjectPostAlterHook(RewriteRelationId,
894 							  HeapTupleGetOid(ruletup), 0);
895 
896 	heap_freetuple(ruletup);
897 	heap_close(pg_rewrite_desc, RowExclusiveLock);
898 
899 	/*
900 	 * If we changed anything, broadcast a SI inval message to force each
901 	 * backend (including our own!) to rebuild relation's relcache entry.
902 	 * Otherwise they will fail to apply the change promptly.
903 	 */
904 	if (changed)
905 		CacheInvalidateRelcache(rel);
906 }
907 
908 
909 /*
910  * Perform permissions and integrity checks before acquiring a relation lock.
911  */
912 static void
913 RangeVarCallbackForRenameRule(const RangeVar *rv, Oid relid, Oid oldrelid,
914 							  void *arg)
915 {
916 	HeapTuple	tuple;
917 	Form_pg_class form;
918 
919 	tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
920 	if (!HeapTupleIsValid(tuple))
921 		return;					/* concurrently dropped */
922 	form = (Form_pg_class) GETSTRUCT(tuple);
923 
924 	/* only tables and views can have rules */
925 	if (form->relkind != RELKIND_RELATION &&
926 		form->relkind != RELKIND_VIEW &&
927 		form->relkind != RELKIND_PARTITIONED_TABLE)
928 		ereport(ERROR,
929 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
930 				 errmsg("\"%s\" is not a table or view", rv->relname)));
931 
932 	if (!allowSystemTableMods && IsSystemClass(relid, form))
933 		ereport(ERROR,
934 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
935 				 errmsg("permission denied: \"%s\" is a system catalog",
936 						rv->relname)));
937 
938 	/* you must own the table to rename one of its rules */
939 	if (!pg_class_ownercheck(relid, GetUserId()))
940 		aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(get_rel_relkind(relid)), rv->relname);
941 
942 	ReleaseSysCache(tuple);
943 }
944 
945 /*
946  * Rename an existing rewrite rule.
947  */
948 ObjectAddress
949 RenameRewriteRule(RangeVar *relation, const char *oldName,
950 				  const char *newName)
951 {
952 	Oid			relid;
953 	Relation	targetrel;
954 	Relation	pg_rewrite_desc;
955 	HeapTuple	ruletup;
956 	Form_pg_rewrite ruleform;
957 	Oid			ruleOid;
958 	ObjectAddress address;
959 
960 	/*
961 	 * Look up name, check permissions, and acquire lock (which we will NOT
962 	 * release until end of transaction).
963 	 */
964 	relid = RangeVarGetRelidExtended(relation, AccessExclusiveLock,
965 									 0,
966 									 RangeVarCallbackForRenameRule,
967 									 NULL);
968 
969 	/* Have lock already, so just need to build relcache entry. */
970 	targetrel = relation_open(relid, NoLock);
971 
972 	/* Prepare to modify pg_rewrite */
973 	pg_rewrite_desc = heap_open(RewriteRelationId, RowExclusiveLock);
974 
975 	/* Fetch the rule's entry (it had better exist) */
976 	ruletup = SearchSysCacheCopy2(RULERELNAME,
977 								  ObjectIdGetDatum(relid),
978 								  PointerGetDatum(oldName));
979 	if (!HeapTupleIsValid(ruletup))
980 		ereport(ERROR,
981 				(errcode(ERRCODE_UNDEFINED_OBJECT),
982 				 errmsg("rule \"%s\" for relation \"%s\" does not exist",
983 						oldName, RelationGetRelationName(targetrel))));
984 	ruleform = (Form_pg_rewrite) GETSTRUCT(ruletup);
985 	ruleOid = HeapTupleGetOid(ruletup);
986 
987 	/* rule with the new name should not already exist */
988 	if (IsDefinedRewriteRule(relid, newName))
989 		ereport(ERROR,
990 				(errcode(ERRCODE_DUPLICATE_OBJECT),
991 				 errmsg("rule \"%s\" for relation \"%s\" already exists",
992 						newName, RelationGetRelationName(targetrel))));
993 
994 	/*
995 	 * We disallow renaming ON SELECT rules, because they should always be
996 	 * named "_RETURN".
997 	 */
998 	if (ruleform->ev_type == CMD_SELECT + '0')
999 		ereport(ERROR,
1000 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
1001 				 errmsg("renaming an ON SELECT rule is not allowed")));
1002 
1003 	/* OK, do the update */
1004 	namestrcpy(&(ruleform->rulename), newName);
1005 
1006 	CatalogTupleUpdate(pg_rewrite_desc, &ruletup->t_self, ruletup);
1007 
1008 	heap_freetuple(ruletup);
1009 	heap_close(pg_rewrite_desc, RowExclusiveLock);
1010 
1011 	/*
1012 	 * Invalidate relation's relcache entry so that other backends (and this
1013 	 * one too!) are sent SI message to make them rebuild relcache entries.
1014 	 * (Ideally this should happen automatically...)
1015 	 */
1016 	CacheInvalidateRelcache(targetrel);
1017 
1018 	ObjectAddressSet(address, RewriteRelationId, ruleOid);
1019 
1020 	/*
1021 	 * Close rel, but keep exclusive lock!
1022 	 */
1023 	relation_close(targetrel, NoLock);
1024 
1025 	return address;
1026 }
1027