1 /*-------------------------------------------------------------------------
2  *
3  * rewriteDefine.c
4  *	  routines for defining a rewrite rule
5  *
6  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/rewrite/rewriteDefine.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/heapam.h"
18 #include "access/htup_details.h"
19 #include "access/multixact.h"
20 #include "access/transam.h"
21 #include "access/xact.h"
22 #include "catalog/catalog.h"
23 #include "catalog/dependency.h"
24 #include "catalog/heap.h"
25 #include "catalog/indexing.h"
26 #include "catalog/namespace.h"
27 #include "catalog/objectaccess.h"
28 #include "catalog/pg_inherits_fn.h"
29 #include "catalog/pg_rewrite.h"
30 #include "catalog/storage.h"
31 #include "commands/policy.h"
32 #include "miscadmin.h"
33 #include "nodes/nodeFuncs.h"
34 #include "parser/parse_utilcmd.h"
35 #include "rewrite/rewriteDefine.h"
36 #include "rewrite/rewriteManip.h"
37 #include "rewrite/rewriteSupport.h"
38 #include "utils/acl.h"
39 #include "utils/builtins.h"
40 #include "utils/inval.h"
41 #include "utils/lsyscache.h"
42 #include "utils/rel.h"
43 #include "utils/snapmgr.h"
44 #include "utils/syscache.h"
45 #include "utils/tqual.h"
46 
47 
48 static void checkRuleResultList(List *targetList, TupleDesc resultDesc,
49 					bool isSelect, bool requireColumnNameMatch);
50 static bool setRuleCheckAsUser_walker(Node *node, Oid *context);
51 static void setRuleCheckAsUser_Query(Query *qry, Oid userid);
52 
53 
54 /*
55  * InsertRule -
56  *	  takes the arguments and inserts them as a row into the system
57  *	  relation "pg_rewrite"
58  */
59 static Oid
InsertRule(char * rulname,int evtype,Oid eventrel_oid,bool evinstead,Node * event_qual,List * action,bool replace)60 InsertRule(char *rulname,
61 		   int evtype,
62 		   Oid eventrel_oid,
63 		   bool evinstead,
64 		   Node *event_qual,
65 		   List *action,
66 		   bool replace)
67 {
68 	char	   *evqual = nodeToString(event_qual);
69 	char	   *actiontree = nodeToString((Node *) action);
70 	Datum		values[Natts_pg_rewrite];
71 	bool		nulls[Natts_pg_rewrite];
72 	bool		replaces[Natts_pg_rewrite];
73 	NameData	rname;
74 	Relation	pg_rewrite_desc;
75 	HeapTuple	tup,
76 				oldtup;
77 	Oid			rewriteObjectId;
78 	ObjectAddress myself,
79 				referenced;
80 	bool		is_update = false;
81 
82 	/*
83 	 * Set up *nulls and *values arrays
84 	 */
85 	MemSet(nulls, false, sizeof(nulls));
86 
87 	namestrcpy(&rname, rulname);
88 	values[Anum_pg_rewrite_rulename - 1] = NameGetDatum(&rname);
89 	values[Anum_pg_rewrite_ev_class - 1] = ObjectIdGetDatum(eventrel_oid);
90 	values[Anum_pg_rewrite_ev_type - 1] = CharGetDatum(evtype + '0');
91 	values[Anum_pg_rewrite_ev_enabled - 1] = CharGetDatum(RULE_FIRES_ON_ORIGIN);
92 	values[Anum_pg_rewrite_is_instead - 1] = BoolGetDatum(evinstead);
93 	values[Anum_pg_rewrite_ev_qual - 1] = CStringGetTextDatum(evqual);
94 	values[Anum_pg_rewrite_ev_action - 1] = CStringGetTextDatum(actiontree);
95 
96 	/*
97 	 * Ready to store new pg_rewrite tuple
98 	 */
99 	pg_rewrite_desc = heap_open(RewriteRelationId, RowExclusiveLock);
100 
101 	/*
102 	 * Check to see if we are replacing an existing tuple
103 	 */
104 	oldtup = SearchSysCache2(RULERELNAME,
105 							 ObjectIdGetDatum(eventrel_oid),
106 							 PointerGetDatum(rulname));
107 
108 	if (HeapTupleIsValid(oldtup))
109 	{
110 		if (!replace)
111 			ereport(ERROR,
112 					(errcode(ERRCODE_DUPLICATE_OBJECT),
113 					 errmsg("rule \"%s\" for relation \"%s\" already exists",
114 							rulname, get_rel_name(eventrel_oid))));
115 
116 		/*
117 		 * When replacing, we don't need to replace every attribute
118 		 */
119 		MemSet(replaces, false, sizeof(replaces));
120 		replaces[Anum_pg_rewrite_ev_type - 1] = true;
121 		replaces[Anum_pg_rewrite_is_instead - 1] = true;
122 		replaces[Anum_pg_rewrite_ev_qual - 1] = true;
123 		replaces[Anum_pg_rewrite_ev_action - 1] = true;
124 
125 		tup = heap_modify_tuple(oldtup, RelationGetDescr(pg_rewrite_desc),
126 								values, nulls, replaces);
127 
128 		simple_heap_update(pg_rewrite_desc, &tup->t_self, tup);
129 
130 		ReleaseSysCache(oldtup);
131 
132 		rewriteObjectId = HeapTupleGetOid(tup);
133 		is_update = true;
134 	}
135 	else
136 	{
137 		tup = heap_form_tuple(pg_rewrite_desc->rd_att, values, nulls);
138 
139 		rewriteObjectId = simple_heap_insert(pg_rewrite_desc, tup);
140 	}
141 
142 	/* Need to update indexes in either case */
143 	CatalogUpdateIndexes(pg_rewrite_desc, tup);
144 
145 	heap_freetuple(tup);
146 
147 	/* If replacing, get rid of old dependencies and make new ones */
148 	if (is_update)
149 		deleteDependencyRecordsFor(RewriteRelationId, rewriteObjectId, false);
150 
151 	/*
152 	 * Install dependency on rule's relation to ensure it will go away on
153 	 * relation deletion.  If the rule is ON SELECT, make the dependency
154 	 * implicit --- this prevents deleting a view's SELECT rule.  Other kinds
155 	 * of rules can be AUTO.
156 	 */
157 	myself.classId = RewriteRelationId;
158 	myself.objectId = rewriteObjectId;
159 	myself.objectSubId = 0;
160 
161 	referenced.classId = RelationRelationId;
162 	referenced.objectId = eventrel_oid;
163 	referenced.objectSubId = 0;
164 
165 	recordDependencyOn(&myself, &referenced,
166 			 (evtype == CMD_SELECT) ? DEPENDENCY_INTERNAL : DEPENDENCY_AUTO);
167 
168 	/*
169 	 * Also install dependencies on objects referenced in action and qual.
170 	 */
171 	recordDependencyOnExpr(&myself, (Node *) action, NIL,
172 						   DEPENDENCY_NORMAL);
173 
174 	if (event_qual != NULL)
175 	{
176 		/* Find query containing OLD/NEW rtable entries */
177 		Query	   *qry = (Query *) linitial(action);
178 
179 		qry = getInsertSelectQuery(qry, NULL);
180 		recordDependencyOnExpr(&myself, event_qual, qry->rtable,
181 							   DEPENDENCY_NORMAL);
182 	}
183 
184 	/* Post creation hook for new rule */
185 	InvokeObjectPostCreateHook(RewriteRelationId, rewriteObjectId, 0);
186 
187 	heap_close(pg_rewrite_desc, RowExclusiveLock);
188 
189 	return rewriteObjectId;
190 }
191 
192 /*
193  * DefineRule
194  *		Execute a CREATE RULE command.
195  */
196 ObjectAddress
DefineRule(RuleStmt * stmt,const char * queryString)197 DefineRule(RuleStmt *stmt, const char *queryString)
198 {
199 	List	   *actions;
200 	Node	   *whereClause;
201 	Oid			relId;
202 
203 	/* Parse analysis. */
204 	transformRuleStmt(stmt, queryString, &actions, &whereClause);
205 
206 	/*
207 	 * Find and lock the relation.  Lock level should match
208 	 * DefineQueryRewrite.
209 	 */
210 	relId = RangeVarGetRelid(stmt->relation, AccessExclusiveLock, false);
211 
212 	/* ... and execute */
213 	return DefineQueryRewrite(stmt->rulename,
214 							  relId,
215 							  whereClause,
216 							  stmt->event,
217 							  stmt->instead,
218 							  stmt->replace,
219 							  actions);
220 }
221 
222 
223 /*
224  * DefineQueryRewrite
225  *		Create a rule
226  *
227  * This is essentially the same as DefineRule() except that the rule's
228  * action and qual have already been passed through parse analysis.
229  */
230 ObjectAddress
DefineQueryRewrite(char * rulename,Oid event_relid,Node * event_qual,CmdType event_type,bool is_instead,bool replace,List * action)231 DefineQueryRewrite(char *rulename,
232 				   Oid event_relid,
233 				   Node *event_qual,
234 				   CmdType event_type,
235 				   bool is_instead,
236 				   bool replace,
237 				   List *action)
238 {
239 	Relation	event_relation;
240 	ListCell   *l;
241 	Query	   *query;
242 	bool		RelisBecomingView = false;
243 	Oid			ruleId = InvalidOid;
244 	ObjectAddress address;
245 
246 	/*
247 	 * If we are installing an ON SELECT rule, we had better grab
248 	 * AccessExclusiveLock to ensure no SELECTs are currently running on the
249 	 * event relation. For other types of rules, it would be sufficient to
250 	 * grab ShareRowExclusiveLock to lock out insert/update/delete actions and
251 	 * to ensure that we lock out current CREATE RULE statements; but because
252 	 * of race conditions in access to catalog entries, we can't do that yet.
253 	 *
254 	 * Note that this lock level should match the one used in DefineRule.
255 	 */
256 	event_relation = heap_open(event_relid, AccessExclusiveLock);
257 
258 	/*
259 	 * Verify relation is of a type that rules can sensibly be applied to.
260 	 * Internal callers can target materialized views, but transformRuleStmt()
261 	 * blocks them for users.  Don't mention them in the error message.
262 	 */
263 	if (event_relation->rd_rel->relkind != RELKIND_RELATION &&
264 		event_relation->rd_rel->relkind != RELKIND_MATVIEW &&
265 		event_relation->rd_rel->relkind != RELKIND_VIEW)
266 		ereport(ERROR,
267 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
268 				 errmsg("\"%s\" is not a table or view",
269 						RelationGetRelationName(event_relation))));
270 
271 	if (!allowSystemTableMods && IsSystemRelation(event_relation))
272 		ereport(ERROR,
273 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
274 				 errmsg("permission denied: \"%s\" is a system catalog",
275 						RelationGetRelationName(event_relation))));
276 
277 	/*
278 	 * Check user has permission to apply rules to this relation.
279 	 */
280 	if (!pg_class_ownercheck(event_relid, GetUserId()))
281 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
282 					   RelationGetRelationName(event_relation));
283 
284 	/*
285 	 * No rule actions that modify OLD or NEW
286 	 */
287 	foreach(l, action)
288 	{
289 		query = (Query *) lfirst(l);
290 		if (query->resultRelation == 0)
291 			continue;
292 		/* Don't be fooled by INSERT/SELECT */
293 		if (query != getInsertSelectQuery(query, NULL))
294 			continue;
295 		if (query->resultRelation == PRS2_OLD_VARNO)
296 			ereport(ERROR,
297 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
298 					 errmsg("rule actions on OLD are not implemented"),
299 					 errhint("Use views or triggers instead.")));
300 		if (query->resultRelation == PRS2_NEW_VARNO)
301 			ereport(ERROR,
302 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
303 					 errmsg("rule actions on NEW are not implemented"),
304 					 errhint("Use triggers instead.")));
305 	}
306 
307 	if (event_type == CMD_SELECT)
308 	{
309 		/*
310 		 * Rules ON SELECT are restricted to view definitions
311 		 *
312 		 * So there cannot be INSTEAD NOTHING, ...
313 		 */
314 		if (list_length(action) == 0)
315 			ereport(ERROR,
316 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
317 			   errmsg("INSTEAD NOTHING rules on SELECT are not implemented"),
318 					 errhint("Use views instead.")));
319 
320 		/*
321 		 * ... there cannot be multiple actions, ...
322 		 */
323 		if (list_length(action) > 1)
324 			ereport(ERROR,
325 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
326 					 errmsg("multiple actions for rules on SELECT are not implemented")));
327 
328 		/*
329 		 * ... the one action must be a SELECT, ...
330 		 */
331 		query = (Query *) linitial(action);
332 		if (!is_instead ||
333 			query->commandType != CMD_SELECT ||
334 			query->utilityStmt != NULL)
335 			ereport(ERROR,
336 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
337 				 errmsg("rules on SELECT must have action INSTEAD SELECT")));
338 
339 		/*
340 		 * ... it cannot contain data-modifying WITH ...
341 		 */
342 		if (query->hasModifyingCTE)
343 			ereport(ERROR,
344 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
345 					 errmsg("rules on SELECT must not contain data-modifying statements in WITH")));
346 
347 		/*
348 		 * ... there can be no rule qual, ...
349 		 */
350 		if (event_qual != NULL)
351 			ereport(ERROR,
352 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
353 					 errmsg("event qualifications are not implemented for rules on SELECT")));
354 
355 		/*
356 		 * ... the targetlist of the SELECT action must exactly match the
357 		 * event relation, ...
358 		 */
359 		checkRuleResultList(query->targetList,
360 							RelationGetDescr(event_relation),
361 							true,
362 							event_relation->rd_rel->relkind !=
363 							RELKIND_MATVIEW);
364 
365 		/*
366 		 * ... there must not be another ON SELECT rule already ...
367 		 */
368 		if (!replace && event_relation->rd_rules != NULL)
369 		{
370 			int			i;
371 
372 			for (i = 0; i < event_relation->rd_rules->numLocks; i++)
373 			{
374 				RewriteRule *rule;
375 
376 				rule = event_relation->rd_rules->rules[i];
377 				if (rule->event == CMD_SELECT)
378 					ereport(ERROR,
379 						  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
380 						   errmsg("\"%s\" is already a view",
381 								  RelationGetRelationName(event_relation))));
382 			}
383 		}
384 
385 		/*
386 		 * ... and finally the rule must be named _RETURN.
387 		 */
388 		if (strcmp(rulename, ViewSelectRuleName) != 0)
389 		{
390 			/*
391 			 * In versions before 7.3, the expected name was _RETviewname. For
392 			 * backwards compatibility with old pg_dump output, accept that
393 			 * and silently change it to _RETURN.  Since this is just a quick
394 			 * backwards-compatibility hack, limit the number of characters
395 			 * checked to a few less than NAMEDATALEN; this saves having to
396 			 * worry about where a multibyte character might have gotten
397 			 * truncated.
398 			 */
399 			if (strncmp(rulename, "_RET", 4) != 0 ||
400 				strncmp(rulename + 4, RelationGetRelationName(event_relation),
401 						NAMEDATALEN - 4 - 4) != 0)
402 				ereport(ERROR,
403 						(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
404 						 errmsg("view rule for \"%s\" must be named \"%s\"",
405 								RelationGetRelationName(event_relation),
406 								ViewSelectRuleName)));
407 			rulename = pstrdup(ViewSelectRuleName);
408 		}
409 
410 		/*
411 		 * Are we converting a relation to a view?
412 		 *
413 		 * If so, check that the relation is empty because the storage for the
414 		 * relation is going to be deleted.  Also insist that the rel not have
415 		 * any triggers, indexes, child or parent tables, RLS policies, or RLS
416 		 * enabled.  (Note: some of these tests are too strict, because they
417 		 * will reject relations that once had such but don't anymore.  But we
418 		 * don't really care, because this whole business of converting
419 		 * relations to views is just a kluge to allow dump/reload of views
420 		 * that participate in circular dependencies.)
421 		 */
422 		if (event_relation->rd_rel->relkind != RELKIND_VIEW &&
423 			event_relation->rd_rel->relkind != RELKIND_MATVIEW)
424 		{
425 			HeapScanDesc scanDesc;
426 			Snapshot	snapshot;
427 
428 			snapshot = RegisterSnapshot(GetLatestSnapshot());
429 			scanDesc = heap_beginscan(event_relation, snapshot, 0, NULL);
430 			if (heap_getnext(scanDesc, ForwardScanDirection) != NULL)
431 				ereport(ERROR,
432 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
433 						 errmsg("could not convert table \"%s\" to a view because it is not empty",
434 								RelationGetRelationName(event_relation))));
435 			heap_endscan(scanDesc);
436 			UnregisterSnapshot(snapshot);
437 
438 			if (event_relation->rd_rel->relhastriggers)
439 				ereport(ERROR,
440 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
441 						 errmsg("could not convert table \"%s\" to a view because it has triggers",
442 								RelationGetRelationName(event_relation)),
443 						 errhint("In particular, the table cannot be involved in any foreign key relationships.")));
444 
445 			if (event_relation->rd_rel->relhasindex)
446 				ereport(ERROR,
447 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
448 						 errmsg("could not convert table \"%s\" to a view because it has indexes",
449 								RelationGetRelationName(event_relation))));
450 
451 			if (event_relation->rd_rel->relhassubclass)
452 				ereport(ERROR,
453 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
454 						 errmsg("could not convert table \"%s\" to a view because it has child tables",
455 								RelationGetRelationName(event_relation))));
456 
457 			if (has_superclass(RelationGetRelid(event_relation)))
458 				ereport(ERROR,
459 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
460 						 errmsg("could not convert table \"%s\" to a view because it has parent tables",
461 								RelationGetRelationName(event_relation))));
462 
463 			if (event_relation->rd_rel->relrowsecurity)
464 				ereport(ERROR,
465 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
466 						 errmsg("could not convert table \"%s\" to a view because it has row security enabled",
467 								RelationGetRelationName(event_relation))));
468 
469 			if (relation_has_policies(event_relation))
470 				ereport(ERROR,
471 						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
472 						 errmsg("could not convert table \"%s\" to a view because it has row security policies",
473 								RelationGetRelationName(event_relation))));
474 
475 			RelisBecomingView = true;
476 		}
477 	}
478 	else
479 	{
480 		/*
481 		 * For non-SELECT rules, a RETURNING list can appear in at most one of
482 		 * the actions ... and there can't be any RETURNING list at all in a
483 		 * conditional or non-INSTEAD rule.  (Actually, there can be at most
484 		 * one RETURNING list across all rules on the same event, but it seems
485 		 * best to enforce that at rule expansion time.)  If there is a
486 		 * RETURNING list, it must match the event relation.
487 		 */
488 		bool		haveReturning = false;
489 
490 		foreach(l, action)
491 		{
492 			query = (Query *) lfirst(l);
493 
494 			if (!query->returningList)
495 				continue;
496 			if (haveReturning)
497 				ereport(ERROR,
498 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
499 				  errmsg("cannot have multiple RETURNING lists in a rule")));
500 			haveReturning = true;
501 			if (event_qual != NULL)
502 				ereport(ERROR,
503 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
504 						 errmsg("RETURNING lists are not supported in conditional rules")));
505 			if (!is_instead)
506 				ereport(ERROR,
507 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
508 						 errmsg("RETURNING lists are not supported in non-INSTEAD rules")));
509 			checkRuleResultList(query->returningList,
510 								RelationGetDescr(event_relation),
511 								false, false);
512 		}
513 	}
514 
515 	/*
516 	 * This rule is allowed - prepare to install it.
517 	 */
518 
519 	/* discard rule if it's null action and not INSTEAD; it's a no-op */
520 	if (action != NIL || is_instead)
521 	{
522 		ruleId = InsertRule(rulename,
523 							event_type,
524 							event_relid,
525 							is_instead,
526 							event_qual,
527 							action,
528 							replace);
529 
530 		/*
531 		 * Set pg_class 'relhasrules' field TRUE for event relation.
532 		 *
533 		 * Important side effect: an SI notice is broadcast to force all
534 		 * backends (including me!) to update relcache entries with the new
535 		 * rule.
536 		 */
537 		SetRelationRuleStatus(event_relid, true);
538 	}
539 
540 	/* ---------------------------------------------------------------------
541 	 * If the relation is becoming a view:
542 	 * - delete the associated storage files
543 	 * - get rid of any system attributes in pg_attribute; a view shouldn't
544 	 *	 have any of those
545 	 * - remove the toast table; there is no need for it anymore, and its
546 	 *	 presence would make vacuum slightly more complicated
547 	 * - set relkind to RELKIND_VIEW, and adjust other pg_class fields
548 	 *	 to be appropriate for a view
549 	 *
550 	 * NB: we had better have AccessExclusiveLock to do this ...
551 	 * ---------------------------------------------------------------------
552 	 */
553 	if (RelisBecomingView)
554 	{
555 		Relation	relationRelation;
556 		Oid			toastrelid;
557 		HeapTuple	classTup;
558 		Form_pg_class classForm;
559 
560 		relationRelation = heap_open(RelationRelationId, RowExclusiveLock);
561 		toastrelid = event_relation->rd_rel->reltoastrelid;
562 
563 		/* drop storage while table still looks like a table  */
564 		RelationDropStorage(event_relation);
565 		DeleteSystemAttributeTuples(event_relid);
566 
567 		/*
568 		 * Drop the toast table if any.  (This won't take care of updating the
569 		 * toast fields in the relation's own pg_class entry; we handle that
570 		 * below.)
571 		 */
572 		if (OidIsValid(toastrelid))
573 		{
574 			ObjectAddress toastobject;
575 
576 			/*
577 			 * Delete the dependency of the toast relation on the main
578 			 * relation so we can drop the former without dropping the latter.
579 			 */
580 			deleteDependencyRecordsFor(RelationRelationId, toastrelid,
581 									   false);
582 
583 			/* Make deletion of dependency record visible */
584 			CommandCounterIncrement();
585 
586 			/* Now drop toast table, including its index */
587 			toastobject.classId = RelationRelationId;
588 			toastobject.objectId = toastrelid;
589 			toastobject.objectSubId = 0;
590 			performDeletion(&toastobject, DROP_RESTRICT,
591 							PERFORM_DELETION_INTERNAL);
592 		}
593 
594 		/*
595 		 * SetRelationRuleStatus may have updated the pg_class row, so we must
596 		 * advance the command counter before trying to update it again.
597 		 */
598 		CommandCounterIncrement();
599 
600 		/*
601 		 * Fix pg_class entry to look like a normal view's, including setting
602 		 * the correct relkind and removal of reltoastrelid of the toast table
603 		 * we potentially removed above.
604 		 */
605 		classTup = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(event_relid));
606 		if (!HeapTupleIsValid(classTup))
607 			elog(ERROR, "cache lookup failed for relation %u", event_relid);
608 		classForm = (Form_pg_class) GETSTRUCT(classTup);
609 
610 		classForm->reltablespace = InvalidOid;
611 		classForm->relpages = 0;
612 		classForm->reltuples = 0;
613 		classForm->relallvisible = 0;
614 		classForm->reltoastrelid = InvalidOid;
615 		classForm->relhasindex = false;
616 		classForm->relkind = RELKIND_VIEW;
617 		classForm->relhasoids = false;
618 		classForm->relhaspkey = false;
619 		classForm->relfrozenxid = InvalidTransactionId;
620 		classForm->relminmxid = InvalidMultiXactId;
621 		classForm->relreplident = REPLICA_IDENTITY_NOTHING;
622 
623 		simple_heap_update(relationRelation, &classTup->t_self, classTup);
624 		CatalogUpdateIndexes(relationRelation, classTup);
625 
626 		heap_freetuple(classTup);
627 		heap_close(relationRelation, RowExclusiveLock);
628 	}
629 
630 	ObjectAddressSet(address, RewriteRelationId, ruleId);
631 
632 	/* Close rel, but keep lock till commit... */
633 	heap_close(event_relation, NoLock);
634 
635 	return address;
636 }
637 
638 /*
639  * checkRuleResultList
640  *		Verify that targetList produces output compatible with a tupledesc
641  *
642  * The targetList might be either a SELECT targetlist, or a RETURNING list;
643  * isSelect tells which.  This is used for choosing error messages.
644  *
645  * A SELECT targetlist may optionally require that column names match.
646  */
647 static void
checkRuleResultList(List * targetList,TupleDesc resultDesc,bool isSelect,bool requireColumnNameMatch)648 checkRuleResultList(List *targetList, TupleDesc resultDesc, bool isSelect,
649 					bool requireColumnNameMatch)
650 {
651 	ListCell   *tllist;
652 	int			i;
653 
654 	/* Only a SELECT may require a column name match. */
655 	Assert(isSelect || !requireColumnNameMatch);
656 
657 	i = 0;
658 	foreach(tllist, targetList)
659 	{
660 		TargetEntry *tle = (TargetEntry *) lfirst(tllist);
661 		Oid			tletypid;
662 		int32		tletypmod;
663 		Form_pg_attribute attr;
664 		char	   *attname;
665 
666 		/* resjunk entries may be ignored */
667 		if (tle->resjunk)
668 			continue;
669 		i++;
670 		if (i > resultDesc->natts)
671 			ereport(ERROR,
672 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
673 					 isSelect ?
674 				   errmsg("SELECT rule's target list has too many entries") :
675 					 errmsg("RETURNING list has too many entries")));
676 
677 		attr = resultDesc->attrs[i - 1];
678 		attname = NameStr(attr->attname);
679 
680 		/*
681 		 * Disallow dropped columns in the relation.  This is not really
682 		 * expected to happen when creating an ON SELECT rule.  It'd be
683 		 * possible if someone tried to convert a relation with dropped
684 		 * columns to a view, but the only case we care about supporting
685 		 * table-to-view conversion for is pg_dump, and pg_dump won't do that.
686 		 *
687 		 * Unfortunately, the situation is also possible when adding a rule
688 		 * with RETURNING to a regular table, and rejecting that case is
689 		 * altogether more annoying.  In principle we could support it by
690 		 * modifying the targetlist to include dummy NULL columns
691 		 * corresponding to the dropped columns in the tupdesc.  However,
692 		 * places like ruleutils.c would have to be fixed to not process such
693 		 * entries, and that would take an uncertain and possibly rather large
694 		 * amount of work.  (Note we could not dodge that by marking the dummy
695 		 * columns resjunk, since it's precisely the non-resjunk tlist columns
696 		 * that are expected to correspond to table columns.)
697 		 */
698 		if (attr->attisdropped)
699 			ereport(ERROR,
700 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
701 					 isSelect ?
702 					 errmsg("cannot convert relation containing dropped columns to view") :
703 					 errmsg("cannot create a RETURNING list for a relation containing dropped columns")));
704 
705 		/* Check name match if required; no need for two error texts here */
706 		if (requireColumnNameMatch && strcmp(tle->resname, attname) != 0)
707 			ereport(ERROR,
708 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
709 					 errmsg("SELECT rule's target entry %d has different column name from column \"%s\"",
710 							i, attname),
711 					 errdetail("SELECT target entry is named \"%s\".",
712 							   tle->resname)));
713 
714 		/* Check type match. */
715 		tletypid = exprType((Node *) tle->expr);
716 		if (attr->atttypid != tletypid)
717 			ereport(ERROR,
718 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
719 					 isSelect ?
720 					 errmsg("SELECT rule's target entry %d has different type from column \"%s\"",
721 							i, attname) :
722 					 errmsg("RETURNING list's entry %d has different type from column \"%s\"",
723 							i, attname),
724 					 isSelect ?
725 					 errdetail("SELECT target entry has type %s, but column has type %s.",
726 							   format_type_be(tletypid),
727 							   format_type_be(attr->atttypid)) :
728 					 errdetail("RETURNING list entry has type %s, but column has type %s.",
729 							   format_type_be(tletypid),
730 							   format_type_be(attr->atttypid))));
731 
732 		/*
733 		 * Allow typmods to be different only if one of them is -1, ie,
734 		 * "unspecified".  This is necessary for cases like "numeric", where
735 		 * the table will have a filled-in default length but the select
736 		 * rule's expression will probably have typmod = -1.
737 		 */
738 		tletypmod = exprTypmod((Node *) tle->expr);
739 		if (attr->atttypmod != tletypmod &&
740 			attr->atttypmod != -1 && tletypmod != -1)
741 			ereport(ERROR,
742 					(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
743 					 isSelect ?
744 					 errmsg("SELECT rule's target entry %d has different size from column \"%s\"",
745 							i, attname) :
746 					 errmsg("RETURNING list's entry %d has different size from column \"%s\"",
747 							i, attname),
748 					 isSelect ?
749 					 errdetail("SELECT target entry has type %s, but column has type %s.",
750 							   format_type_with_typemod(tletypid, tletypmod),
751 							   format_type_with_typemod(attr->atttypid,
752 														attr->atttypmod)) :
753 					 errdetail("RETURNING list entry has type %s, but column has type %s.",
754 							   format_type_with_typemod(tletypid, tletypmod),
755 							   format_type_with_typemod(attr->atttypid,
756 														attr->atttypmod))));
757 	}
758 
759 	if (i != resultDesc->natts)
760 		ereport(ERROR,
761 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
762 				 isSelect ?
763 				 errmsg("SELECT rule's target list has too few entries") :
764 				 errmsg("RETURNING list has too few entries")));
765 }
766 
767 /*
768  * setRuleCheckAsUser
769  *		Recursively scan a query or expression tree and set the checkAsUser
770  *		field to the given userid in all rtable entries.
771  *
772  * Note: for a view (ON SELECT rule), the checkAsUser field of the OLD
773  * RTE entry will be overridden when the view rule is expanded, and the
774  * checkAsUser field of the NEW entry is irrelevant because that entry's
775  * requiredPerms bits will always be zero.  However, for other types of rules
776  * it's important to set these fields to match the rule owner.  So we just set
777  * them always.
778  */
779 void
setRuleCheckAsUser(Node * node,Oid userid)780 setRuleCheckAsUser(Node *node, Oid userid)
781 {
782 	(void) setRuleCheckAsUser_walker(node, &userid);
783 }
784 
785 static bool
setRuleCheckAsUser_walker(Node * node,Oid * context)786 setRuleCheckAsUser_walker(Node *node, Oid *context)
787 {
788 	if (node == NULL)
789 		return false;
790 	if (IsA(node, Query))
791 	{
792 		setRuleCheckAsUser_Query((Query *) node, *context);
793 		return false;
794 	}
795 	return expression_tree_walker(node, setRuleCheckAsUser_walker,
796 								  (void *) context);
797 }
798 
799 static void
setRuleCheckAsUser_Query(Query * qry,Oid userid)800 setRuleCheckAsUser_Query(Query *qry, Oid userid)
801 {
802 	ListCell   *l;
803 
804 	/* Set all the RTEs in this query node */
805 	foreach(l, qry->rtable)
806 	{
807 		RangeTblEntry *rte = (RangeTblEntry *) lfirst(l);
808 
809 		if (rte->rtekind == RTE_SUBQUERY)
810 		{
811 			/* Recurse into subquery in FROM */
812 			setRuleCheckAsUser_Query(rte->subquery, userid);
813 		}
814 		else
815 			rte->checkAsUser = userid;
816 	}
817 
818 	/* Recurse into subquery-in-WITH */
819 	foreach(l, qry->cteList)
820 	{
821 		CommonTableExpr *cte = (CommonTableExpr *) lfirst(l);
822 
823 		setRuleCheckAsUser_Query((Query *) cte->ctequery, userid);
824 	}
825 
826 	/* If there are sublinks, search for them and process their RTEs */
827 	if (qry->hasSubLinks)
828 		query_tree_walker(qry, setRuleCheckAsUser_walker, (void *) &userid,
829 						  QTW_IGNORE_RC_SUBQUERIES);
830 }
831 
832 
833 /*
834  * Change the firing semantics of an existing rule.
835  */
836 void
EnableDisableRule(Relation rel,const char * rulename,char fires_when)837 EnableDisableRule(Relation rel, const char *rulename,
838 				  char fires_when)
839 {
840 	Relation	pg_rewrite_desc;
841 	Oid			owningRel = RelationGetRelid(rel);
842 	Oid			eventRelationOid;
843 	HeapTuple	ruletup;
844 	bool		changed = false;
845 
846 	/*
847 	 * Find the rule tuple to change.
848 	 */
849 	pg_rewrite_desc = heap_open(RewriteRelationId, RowExclusiveLock);
850 	ruletup = SearchSysCacheCopy2(RULERELNAME,
851 								  ObjectIdGetDatum(owningRel),
852 								  PointerGetDatum(rulename));
853 	if (!HeapTupleIsValid(ruletup))
854 		ereport(ERROR,
855 				(errcode(ERRCODE_UNDEFINED_OBJECT),
856 				 errmsg("rule \"%s\" for relation \"%s\" does not exist",
857 						rulename, get_rel_name(owningRel))));
858 
859 	/*
860 	 * Verify that the user has appropriate permissions.
861 	 */
862 	eventRelationOid = ((Form_pg_rewrite) GETSTRUCT(ruletup))->ev_class;
863 	Assert(eventRelationOid == owningRel);
864 	if (!pg_class_ownercheck(eventRelationOid, GetUserId()))
865 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
866 					   get_rel_name(eventRelationOid));
867 
868 	/*
869 	 * Change ev_enabled if it is different from the desired new state.
870 	 */
871 	if (DatumGetChar(((Form_pg_rewrite) GETSTRUCT(ruletup))->ev_enabled) !=
872 		fires_when)
873 	{
874 		((Form_pg_rewrite) GETSTRUCT(ruletup))->ev_enabled =
875 			CharGetDatum(fires_when);
876 		simple_heap_update(pg_rewrite_desc, &ruletup->t_self, ruletup);
877 
878 		/* keep system catalog indexes current */
879 		CatalogUpdateIndexes(pg_rewrite_desc, ruletup);
880 
881 		changed = true;
882 	}
883 
884 	InvokeObjectPostAlterHook(RewriteRelationId,
885 							  HeapTupleGetOid(ruletup), 0);
886 
887 	heap_freetuple(ruletup);
888 	heap_close(pg_rewrite_desc, RowExclusiveLock);
889 
890 	/*
891 	 * If we changed anything, broadcast a SI inval message to force each
892 	 * backend (including our own!) to rebuild relation's relcache entry.
893 	 * Otherwise they will fail to apply the change promptly.
894 	 */
895 	if (changed)
896 		CacheInvalidateRelcache(rel);
897 }
898 
899 
900 /*
901  * Perform permissions and integrity checks before acquiring a relation lock.
902  */
903 static void
RangeVarCallbackForRenameRule(const RangeVar * rv,Oid relid,Oid oldrelid,void * arg)904 RangeVarCallbackForRenameRule(const RangeVar *rv, Oid relid, Oid oldrelid,
905 							  void *arg)
906 {
907 	HeapTuple	tuple;
908 	Form_pg_class form;
909 
910 	tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
911 	if (!HeapTupleIsValid(tuple))
912 		return;					/* concurrently dropped */
913 	form = (Form_pg_class) GETSTRUCT(tuple);
914 
915 	/* only tables and views can have rules */
916 	if (form->relkind != RELKIND_RELATION && form->relkind != RELKIND_VIEW)
917 		ereport(ERROR,
918 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
919 				 errmsg("\"%s\" is not a table or view", rv->relname)));
920 
921 	if (!allowSystemTableMods && IsSystemClass(relid, form))
922 		ereport(ERROR,
923 				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
924 				 errmsg("permission denied: \"%s\" is a system catalog",
925 						rv->relname)));
926 
927 	/* you must own the table to rename one of its rules */
928 	if (!pg_class_ownercheck(relid, GetUserId()))
929 		aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS, rv->relname);
930 
931 	ReleaseSysCache(tuple);
932 }
933 
934 /*
935  * Rename an existing rewrite rule.
936  */
937 ObjectAddress
RenameRewriteRule(RangeVar * relation,const char * oldName,const char * newName)938 RenameRewriteRule(RangeVar *relation, const char *oldName,
939 				  const char *newName)
940 {
941 	Oid			relid;
942 	Relation	targetrel;
943 	Relation	pg_rewrite_desc;
944 	HeapTuple	ruletup;
945 	Form_pg_rewrite ruleform;
946 	Oid			ruleOid;
947 	ObjectAddress address;
948 
949 	/*
950 	 * Look up name, check permissions, and acquire lock (which we will NOT
951 	 * release until end of transaction).
952 	 */
953 	relid = RangeVarGetRelidExtended(relation, AccessExclusiveLock,
954 									 false, false,
955 									 RangeVarCallbackForRenameRule,
956 									 NULL);
957 
958 	/* Have lock already, so just need to build relcache entry. */
959 	targetrel = relation_open(relid, NoLock);
960 
961 	/* Prepare to modify pg_rewrite */
962 	pg_rewrite_desc = heap_open(RewriteRelationId, RowExclusiveLock);
963 
964 	/* Fetch the rule's entry (it had better exist) */
965 	ruletup = SearchSysCacheCopy2(RULERELNAME,
966 								  ObjectIdGetDatum(relid),
967 								  PointerGetDatum(oldName));
968 	if (!HeapTupleIsValid(ruletup))
969 		ereport(ERROR,
970 				(errcode(ERRCODE_UNDEFINED_OBJECT),
971 				 errmsg("rule \"%s\" for relation \"%s\" does not exist",
972 						oldName, RelationGetRelationName(targetrel))));
973 	ruleform = (Form_pg_rewrite) GETSTRUCT(ruletup);
974 	ruleOid = HeapTupleGetOid(ruletup);
975 
976 	/* rule with the new name should not already exist */
977 	if (IsDefinedRewriteRule(relid, newName))
978 		ereport(ERROR,
979 				(errcode(ERRCODE_DUPLICATE_OBJECT),
980 				 errmsg("rule \"%s\" for relation \"%s\" already exists",
981 						newName, RelationGetRelationName(targetrel))));
982 
983 	/*
984 	 * We disallow renaming ON SELECT rules, because they should always be
985 	 * named "_RETURN".
986 	 */
987 	if (ruleform->ev_type == CMD_SELECT + '0')
988 		ereport(ERROR,
989 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
990 				 errmsg("renaming an ON SELECT rule is not allowed")));
991 
992 	/* OK, do the update */
993 	namestrcpy(&(ruleform->rulename), newName);
994 
995 	simple_heap_update(pg_rewrite_desc, &ruletup->t_self, ruletup);
996 
997 	/* keep system catalog indexes current */
998 	CatalogUpdateIndexes(pg_rewrite_desc, ruletup);
999 
1000 	heap_freetuple(ruletup);
1001 	heap_close(pg_rewrite_desc, RowExclusiveLock);
1002 
1003 	/*
1004 	 * Invalidate relation's relcache entry so that other backends (and this
1005 	 * one too!) are sent SI message to make them rebuild relcache entries.
1006 	 * (Ideally this should happen automatically...)
1007 	 */
1008 	CacheInvalidateRelcache(targetrel);
1009 
1010 	ObjectAddressSet(address, RewriteRelationId, ruleOid);
1011 
1012 	/*
1013 	 * Close rel, but keep exclusive lock!
1014 	 */
1015 	relation_close(targetrel, NoLock);
1016 
1017 	return address;
1018 }
1019