1 /*-------------------------------------------------------------------------
2  *
3  * matview.c
4  *	  materialized view support
5  *
6  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/commands/matview.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/htup_details.h"
20 #include "access/multixact.h"
21 #include "access/tableam.h"
22 #include "access/xact.h"
23 #include "access/xlog.h"
24 #include "catalog/catalog.h"
25 #include "catalog/indexing.h"
26 #include "catalog/namespace.h"
27 #include "catalog/pg_am.h"
28 #include "catalog/pg_opclass.h"
29 #include "catalog/pg_operator.h"
30 #include "commands/cluster.h"
31 #include "commands/matview.h"
32 #include "commands/tablecmds.h"
33 #include "commands/tablespace.h"
34 #include "executor/executor.h"
35 #include "executor/spi.h"
36 #include "miscadmin.h"
37 #include "parser/parse_relation.h"
38 #include "pgstat.h"
39 #include "rewrite/rewriteHandler.h"
40 #include "storage/lmgr.h"
41 #include "storage/smgr.h"
42 #include "tcop/tcopprot.h"
43 #include "utils/builtins.h"
44 #include "utils/lsyscache.h"
45 #include "utils/rel.h"
46 #include "utils/snapmgr.h"
47 #include "utils/syscache.h"
48 
49 
/*
 * DR_transientrel
 *
 * DestReceiver that inserts every tuple it receives into the transient heap
 * built by REFRESH MATERIALIZED VIEW.  "pub" must remain the first member:
 * the transientrel_* callbacks cast the DestReceiver pointer they are handed
 * back to DR_transientrel.
 */
typedef struct
{
	DestReceiver pub;			/* publicly-known function pointers */
	Oid			transientoid;	/* OID of new heap into which to store */
	/* These fields are filled by transientrel_startup: */
	Relation	transientrel;	/* relation to write to */
	CommandId	output_cid;		/* cmin to insert in output tuples */
	int			ti_options;		/* table_tuple_insert performance options */
	BulkInsertState bistate;	/* bulk insert state */
} DR_transientrel;
60 
/*
 * Nesting depth of regions in which DML against materialized views is
 * permitted (see MatViewIncrementalMaintenanceIsEnabled() and the
 * Open/CloseMatViewIncrementalMaintenance() helpers).  ExecRefreshMatView()
 * restores the saved value if refresh_by_match_merge() throws an error.
 */
static int	matview_maintenance_depth = 0;

/* DestReceiver callbacks for the transient heap. */
static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
static void transientrel_shutdown(DestReceiver *self);
static void transientrel_destroy(DestReceiver *self);

/* Helpers for REFRESH MATERIALIZED VIEW. */
static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
									   const char *queryString);
static char *make_temptable_name_n(char *tempname, int n);
static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
								   int save_sec_context);
static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
static bool is_usable_unique_index(Relation indexRel);
static void OpenMatViewIncrementalMaintenance(void);
static void CloseMatViewIncrementalMaintenance(void);
76 
77 /*
78  * SetMatViewPopulatedState
79  *		Mark a materialized view as populated, or not.
80  *
81  * NOTE: caller must be holding an appropriate lock on the relation.
82  */
83 void
SetMatViewPopulatedState(Relation relation,bool newstate)84 SetMatViewPopulatedState(Relation relation, bool newstate)
85 {
86 	Relation	pgrel;
87 	HeapTuple	tuple;
88 
89 	Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
90 
91 	/*
92 	 * Update relation's pg_class entry.  Crucial side-effect: other backends
93 	 * (and this one too!) are sent SI message to make them rebuild relcache
94 	 * entries.
95 	 */
96 	pgrel = table_open(RelationRelationId, RowExclusiveLock);
97 	tuple = SearchSysCacheCopy1(RELOID,
98 								ObjectIdGetDatum(RelationGetRelid(relation)));
99 	if (!HeapTupleIsValid(tuple))
100 		elog(ERROR, "cache lookup failed for relation %u",
101 			 RelationGetRelid(relation));
102 
103 	((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
104 
105 	CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
106 
107 	heap_freetuple(tuple);
108 	table_close(pgrel, RowExclusiveLock);
109 
110 	/*
111 	 * Advance command counter to make the updated pg_class row locally
112 	 * visible.
113 	 */
114 	CommandCounterIncrement();
115 }
116 
117 /*
118  * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
119  *
120  * This refreshes the materialized view by creating a new table and swapping
121  * the relfilenodes of the new table and the old materialized view, so the OID
122  * of the original materialized view is preserved. Thus we do not lose GRANT
123  * nor references to this materialized view.
124  *
125  * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
126  * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
127  * statement associated with the materialized view.  The statement node's
128  * skipData field shows whether the clause was used.
129  *
130  * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
131  * the new heap, it's better to create the indexes afterwards than to fill them
132  * incrementally while we load.
133  *
134  * The matview's "populated" state is changed based on whether the contents
135  * reflect the result set of the materialized view's query.
136  */
137 ObjectAddress
ExecRefreshMatView(RefreshMatViewStmt * stmt,const char * queryString,ParamListInfo params,QueryCompletion * qc)138 ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
139 				   ParamListInfo params, QueryCompletion *qc)
140 {
141 	Oid			matviewOid;
142 	Relation	matviewRel;
143 	RewriteRule *rule;
144 	List	   *actions;
145 	Query	   *dataQuery;
146 	Oid			tableSpace;
147 	Oid			relowner;
148 	Oid			OIDNewHeap;
149 	DestReceiver *dest;
150 	uint64		processed = 0;
151 	bool		concurrent;
152 	LOCKMODE	lockmode;
153 	char		relpersistence;
154 	Oid			save_userid;
155 	int			save_sec_context;
156 	int			save_nestlevel;
157 	ObjectAddress address;
158 
159 	/* Determine strength of lock needed. */
160 	concurrent = stmt->concurrent;
161 	lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
162 
163 	/*
164 	 * Get a lock until end of transaction.
165 	 */
166 	matviewOid = RangeVarGetRelidExtended(stmt->relation,
167 										  lockmode, 0,
168 										  RangeVarCallbackOwnsTable, NULL);
169 	matviewRel = table_open(matviewOid, NoLock);
170 
171 	/* Make sure it is a materialized view. */
172 	if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
173 		ereport(ERROR,
174 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
175 				 errmsg("\"%s\" is not a materialized view",
176 						RelationGetRelationName(matviewRel))));
177 
178 	/* Check that CONCURRENTLY is not specified if not populated. */
179 	if (concurrent && !RelationIsPopulated(matviewRel))
180 		ereport(ERROR,
181 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
182 				 errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
183 
184 	/* Check that conflicting options have not been specified. */
185 	if (concurrent && stmt->skipData)
186 		ereport(ERROR,
187 				(errcode(ERRCODE_SYNTAX_ERROR),
188 				 errmsg("%s and %s options cannot be used together",
189 						"CONCURRENTLY", "WITH NO DATA")));
190 
191 	/*
192 	 * Check that everything is correct for a refresh. Problems at this point
193 	 * are internal errors, so elog is sufficient.
194 	 */
195 	if (matviewRel->rd_rel->relhasrules == false ||
196 		matviewRel->rd_rules->numLocks < 1)
197 		elog(ERROR,
198 			 "materialized view \"%s\" is missing rewrite information",
199 			 RelationGetRelationName(matviewRel));
200 
201 	if (matviewRel->rd_rules->numLocks > 1)
202 		elog(ERROR,
203 			 "materialized view \"%s\" has too many rules",
204 			 RelationGetRelationName(matviewRel));
205 
206 	rule = matviewRel->rd_rules->rules[0];
207 	if (rule->event != CMD_SELECT || !(rule->isInstead))
208 		elog(ERROR,
209 			 "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
210 			 RelationGetRelationName(matviewRel));
211 
212 	actions = rule->actions;
213 	if (list_length(actions) != 1)
214 		elog(ERROR,
215 			 "the rule for materialized view \"%s\" is not a single action",
216 			 RelationGetRelationName(matviewRel));
217 
218 	/*
219 	 * Check that there is a unique index with no WHERE clause on one or more
220 	 * columns of the materialized view if CONCURRENTLY is specified.
221 	 */
222 	if (concurrent)
223 	{
224 		List	   *indexoidlist = RelationGetIndexList(matviewRel);
225 		ListCell   *indexoidscan;
226 		bool		hasUniqueIndex = false;
227 
228 		foreach(indexoidscan, indexoidlist)
229 		{
230 			Oid			indexoid = lfirst_oid(indexoidscan);
231 			Relation	indexRel;
232 
233 			indexRel = index_open(indexoid, AccessShareLock);
234 			hasUniqueIndex = is_usable_unique_index(indexRel);
235 			index_close(indexRel, AccessShareLock);
236 			if (hasUniqueIndex)
237 				break;
238 		}
239 
240 		list_free(indexoidlist);
241 
242 		if (!hasUniqueIndex)
243 			ereport(ERROR,
244 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
245 					 errmsg("cannot refresh materialized view \"%s\" concurrently",
246 							quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
247 													   RelationGetRelationName(matviewRel))),
248 					 errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
249 	}
250 
251 	/*
252 	 * The stored query was rewritten at the time of the MV definition, but
253 	 * has not been scribbled on by the planner.
254 	 */
255 	dataQuery = linitial_node(Query, actions);
256 
257 	/*
258 	 * Check for active uses of the relation in the current transaction, such
259 	 * as open scans.
260 	 *
261 	 * NB: We count on this to protect us against problems with refreshing the
262 	 * data using TABLE_INSERT_FROZEN.
263 	 */
264 	CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
265 
266 	/*
267 	 * Tentatively mark the matview as populated or not (this will roll back
268 	 * if we fail later).
269 	 */
270 	SetMatViewPopulatedState(matviewRel, !stmt->skipData);
271 
272 	relowner = matviewRel->rd_rel->relowner;
273 
274 	/*
275 	 * Switch to the owner's userid, so that any functions are run as that
276 	 * user.  Also arrange to make GUC variable changes local to this command.
277 	 * Don't lock it down too tight to create a temporary table just yet.  We
278 	 * will switch modes when we are about to execute user code.
279 	 */
280 	GetUserIdAndSecContext(&save_userid, &save_sec_context);
281 	SetUserIdAndSecContext(relowner,
282 						   save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
283 	save_nestlevel = NewGUCNestLevel();
284 
285 	/* Concurrent refresh builds new data in temp tablespace, and does diff. */
286 	if (concurrent)
287 	{
288 		tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false);
289 		relpersistence = RELPERSISTENCE_TEMP;
290 	}
291 	else
292 	{
293 		tableSpace = matviewRel->rd_rel->reltablespace;
294 		relpersistence = matviewRel->rd_rel->relpersistence;
295 	}
296 
297 	/*
298 	 * Create the transient table that will receive the regenerated data. Lock
299 	 * it against access by any other process until commit (by which time it
300 	 * will be gone).
301 	 */
302 	OIDNewHeap = make_new_heap(matviewOid, tableSpace, relpersistence,
303 							   ExclusiveLock);
304 	LockRelationOid(OIDNewHeap, AccessExclusiveLock);
305 	dest = CreateTransientRelDestReceiver(OIDNewHeap);
306 
307 	/*
308 	 * Now lock down security-restricted operations.
309 	 */
310 	SetUserIdAndSecContext(relowner,
311 						   save_sec_context | SECURITY_RESTRICTED_OPERATION);
312 
313 	/* Generate the data, if wanted. */
314 	if (!stmt->skipData)
315 		processed = refresh_matview_datafill(dest, dataQuery, queryString);
316 
317 	/* Make the matview match the newly generated data. */
318 	if (concurrent)
319 	{
320 		int			old_depth = matview_maintenance_depth;
321 
322 		PG_TRY();
323 		{
324 			refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
325 								   save_sec_context);
326 		}
327 		PG_CATCH();
328 		{
329 			matview_maintenance_depth = old_depth;
330 			PG_RE_THROW();
331 		}
332 		PG_END_TRY();
333 		Assert(matview_maintenance_depth == old_depth);
334 	}
335 	else
336 	{
337 		refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
338 
339 		/*
340 		 * Inform stats collector about our activity: basically, we truncated
341 		 * the matview and inserted some new data.  (The concurrent code path
342 		 * above doesn't need to worry about this because the inserts and
343 		 * deletes it issues get counted by lower-level code.)
344 		 */
345 		pgstat_count_truncate(matviewRel);
346 		if (!stmt->skipData)
347 			pgstat_count_heap_insert(matviewRel, processed);
348 	}
349 
350 	table_close(matviewRel, NoLock);
351 
352 	/* Roll back any GUC changes */
353 	AtEOXact_GUC(false, save_nestlevel);
354 
355 	/* Restore userid and security context */
356 	SetUserIdAndSecContext(save_userid, save_sec_context);
357 
358 	ObjectAddressSet(address, RelationRelationId, matviewOid);
359 
360 	return address;
361 }
362 
363 /*
364  * refresh_matview_datafill
365  *
366  * Execute the given query, sending result rows to "dest" (which will
367  * insert them into the target matview).
368  *
369  * Returns number of rows inserted.
370  */
371 static uint64
refresh_matview_datafill(DestReceiver * dest,Query * query,const char * queryString)372 refresh_matview_datafill(DestReceiver *dest, Query *query,
373 						 const char *queryString)
374 {
375 	List	   *rewritten;
376 	PlannedStmt *plan;
377 	QueryDesc  *queryDesc;
378 	Query	   *copied_query;
379 	uint64		processed;
380 
381 	/* Lock and rewrite, using a copy to preserve the original query. */
382 	copied_query = copyObject(query);
383 	AcquireRewriteLocks(copied_query, true, false);
384 	rewritten = QueryRewrite(copied_query);
385 
386 	/* SELECT should never rewrite to more or less than one SELECT query */
387 	if (list_length(rewritten) != 1)
388 		elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
389 	query = (Query *) linitial(rewritten);
390 
391 	/* Check for user-requested abort. */
392 	CHECK_FOR_INTERRUPTS();
393 
394 	/* Plan the query which will generate data for the refresh. */
395 	plan = pg_plan_query(query, queryString, 0, NULL);
396 
397 	/*
398 	 * Use a snapshot with an updated command ID to ensure this query sees
399 	 * results of any previously executed queries.  (This could only matter if
400 	 * the planner executed an allegedly-stable function that changed the
401 	 * database contents, but let's do it anyway to be safe.)
402 	 */
403 	PushCopiedSnapshot(GetActiveSnapshot());
404 	UpdateActiveSnapshotCommandId();
405 
406 	/* Create a QueryDesc, redirecting output to our tuple receiver */
407 	queryDesc = CreateQueryDesc(plan, queryString,
408 								GetActiveSnapshot(), InvalidSnapshot,
409 								dest, NULL, NULL, 0);
410 
411 	/* call ExecutorStart to prepare the plan for execution */
412 	ExecutorStart(queryDesc, 0);
413 
414 	/* run the plan */
415 	ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
416 
417 	processed = queryDesc->estate->es_processed;
418 
419 	/* and clean up */
420 	ExecutorFinish(queryDesc);
421 	ExecutorEnd(queryDesc);
422 
423 	FreeQueryDesc(queryDesc);
424 
425 	PopActiveSnapshot();
426 
427 	return processed;
428 }
429 
430 DestReceiver *
CreateTransientRelDestReceiver(Oid transientoid)431 CreateTransientRelDestReceiver(Oid transientoid)
432 {
433 	DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
434 
435 	self->pub.receiveSlot = transientrel_receive;
436 	self->pub.rStartup = transientrel_startup;
437 	self->pub.rShutdown = transientrel_shutdown;
438 	self->pub.rDestroy = transientrel_destroy;
439 	self->pub.mydest = DestTransientRel;
440 	self->transientoid = transientoid;
441 
442 	return (DestReceiver *) self;
443 }
444 
445 /*
446  * transientrel_startup --- executor startup
447  */
448 static void
transientrel_startup(DestReceiver * self,int operation,TupleDesc typeinfo)449 transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
450 {
451 	DR_transientrel *myState = (DR_transientrel *) self;
452 	Relation	transientrel;
453 
454 	transientrel = table_open(myState->transientoid, NoLock);
455 
456 	/*
457 	 * Fill private fields of myState for use by later routines
458 	 */
459 	myState->transientrel = transientrel;
460 	myState->output_cid = GetCurrentCommandId(true);
461 	myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
462 	myState->bistate = GetBulkInsertState();
463 
464 	/*
465 	 * Valid smgr_targblock implies something already wrote to the relation.
466 	 * This may be harmless, but this function hasn't planned for it.
467 	 */
468 	Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
469 }
470 
471 /*
472  * transientrel_receive --- receive one tuple
473  */
474 static bool
transientrel_receive(TupleTableSlot * slot,DestReceiver * self)475 transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
476 {
477 	DR_transientrel *myState = (DR_transientrel *) self;
478 
479 	/*
480 	 * Note that the input slot might not be of the type of the target
481 	 * relation. That's supported by table_tuple_insert(), but slightly less
482 	 * efficient than inserting with the right slot - but the alternative
483 	 * would be to copy into a slot of the right type, which would not be
484 	 * cheap either. This also doesn't allow accessing per-AM data (say a
485 	 * tuple's xmin), but since we don't do that here...
486 	 */
487 
488 	table_tuple_insert(myState->transientrel,
489 					   slot,
490 					   myState->output_cid,
491 					   myState->ti_options,
492 					   myState->bistate);
493 
494 	/* We know this is a newly created relation, so there are no indexes */
495 
496 	return true;
497 }
498 
499 /*
500  * transientrel_shutdown --- executor end
501  */
502 static void
transientrel_shutdown(DestReceiver * self)503 transientrel_shutdown(DestReceiver *self)
504 {
505 	DR_transientrel *myState = (DR_transientrel *) self;
506 
507 	FreeBulkInsertState(myState->bistate);
508 
509 	table_finish_bulk_insert(myState->transientrel, myState->ti_options);
510 
511 	/* close transientrel, but keep lock until commit */
512 	table_close(myState->transientrel, NoLock);
513 	myState->transientrel = NULL;
514 }
515 
516 /*
517  * transientrel_destroy --- release DestReceiver object
518  */
519 static void
transientrel_destroy(DestReceiver * self)520 transientrel_destroy(DestReceiver *self)
521 {
522 	pfree(self);
523 }
524 
525 
526 /*
527  * Given a qualified temporary table name, append an underscore followed by
528  * the given integer, to make a new table name based on the old one.
529  * The result is a palloc'd string.
530  *
531  * As coded, this would fail to make a valid SQL name if the given name were,
532  * say, "FOO"."BAR".  Currently, the table name portion of the input will
533  * never be double-quoted because it's of the form "pg_temp_NNN", cf
534  * make_new_heap().  But we might have to work harder someday.
535  */
536 static char *
make_temptable_name_n(char * tempname,int n)537 make_temptable_name_n(char *tempname, int n)
538 {
539 	StringInfoData namebuf;
540 
541 	initStringInfo(&namebuf);
542 	appendStringInfoString(&namebuf, tempname);
543 	appendStringInfo(&namebuf, "_%d", n);
544 	return namebuf.data;
545 }
546 
547 /*
548  * refresh_by_match_merge
549  *
550  * Refresh a materialized view with transactional semantics, while allowing
551  * concurrent reads.
552  *
553  * This is called after a new version of the data has been created in a
554  * temporary table.  It performs a full outer join against the old version of
555  * the data, producing "diff" results.  This join cannot work if there are any
556  * duplicated rows in either the old or new versions, in the sense that every
557  * column would compare as equal between the two rows.  It does work correctly
558  * in the face of rows which have at least one NULL value, with all non-NULL
559  * columns equal.  The behavior of NULLs on equality tests and on UNIQUE
560  * indexes turns out to be quite convenient here; the tests we need to make
561  * are consistent with default behavior.  If there is at least one UNIQUE
562  * index on the materialized view, we have exactly the guarantee we need.
563  *
564  * The temporary table used to hold the diff results contains just the TID of
565  * the old record (if matched) and the ROW from the new table as a single
566  * column of complex record type (if matched).
567  *
568  * Once we have the diff table, we perform set-based DELETE and INSERT
569  * operations against the materialized view, and discard both temporary
570  * tables.
571  *
572  * Everything from the generation of the new data to applying the differences
573  * takes place under cover of an ExclusiveLock, since it seems as though we
574  * would want to prohibit not only concurrent REFRESH operations, but also
575  * incremental maintenance.  It also doesn't seem reasonable or safe to allow
576  * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
577  * this command.
578  */
579 static void
refresh_by_match_merge(Oid matviewOid,Oid tempOid,Oid relowner,int save_sec_context)580 refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
581 					   int save_sec_context)
582 {
583 	StringInfoData querybuf;
584 	Relation	matviewRel;
585 	Relation	tempRel;
586 	char	   *matviewname;
587 	char	   *tempname;
588 	char	   *diffname;
589 	TupleDesc	tupdesc;
590 	bool		foundUniqueIndex;
591 	List	   *indexoidlist;
592 	ListCell   *indexoidscan;
593 	int16		relnatts;
594 	Oid		   *opUsedForQual;
595 
596 	initStringInfo(&querybuf);
597 	matviewRel = table_open(matviewOid, NoLock);
598 	matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
599 											 RelationGetRelationName(matviewRel));
600 	tempRel = table_open(tempOid, NoLock);
601 	tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
602 										  RelationGetRelationName(tempRel));
603 	diffname = make_temptable_name_n(tempname, 2);
604 
605 	relnatts = RelationGetNumberOfAttributes(matviewRel);
606 
607 	/* Open SPI context. */
608 	if (SPI_connect() != SPI_OK_CONNECT)
609 		elog(ERROR, "SPI_connect failed");
610 
611 	/* Analyze the temp table with the new contents. */
612 	appendStringInfo(&querybuf, "ANALYZE %s", tempname);
613 	if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
614 		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
615 
616 	/*
617 	 * We need to ensure that there are not duplicate rows without NULLs in
618 	 * the new data set before we can count on the "diff" results.  Check for
619 	 * that in a way that allows showing the first duplicated row found.  Even
620 	 * after we pass this test, a unique index on the materialized view may
621 	 * find a duplicate key problem.
622 	 *
623 	 * Note: here and below, we use "tablename.*::tablerowtype" as a hack to
624 	 * keep ".*" from being expanded into multiple columns in a SELECT list.
625 	 * Compare ruleutils.c's get_variable().
626 	 */
627 	resetStringInfo(&querybuf);
628 	appendStringInfo(&querybuf,
629 					 "SELECT newdata.*::%s FROM %s newdata "
630 					 "WHERE newdata.* IS NOT NULL AND EXISTS "
631 					 "(SELECT 1 FROM %s newdata2 WHERE newdata2.* IS NOT NULL "
632 					 "AND newdata2.* OPERATOR(pg_catalog.*=) newdata.* "
633 					 "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
634 					 "newdata.ctid)",
635 					 tempname, tempname, tempname);
636 	if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
637 		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
638 	if (SPI_processed > 0)
639 	{
640 		/*
641 		 * Note that this ereport() is returning data to the user.  Generally,
642 		 * we would want to make sure that the user has been granted access to
643 		 * this data.  However, REFRESH MAT VIEW is only able to be run by the
644 		 * owner of the mat view (or a superuser) and therefore there is no
645 		 * need to check for access to data in the mat view.
646 		 */
647 		ereport(ERROR,
648 				(errcode(ERRCODE_CARDINALITY_VIOLATION),
649 				 errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
650 						RelationGetRelationName(matviewRel)),
651 				 errdetail("Row: %s",
652 						   SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
653 	}
654 
655 	SetUserIdAndSecContext(relowner,
656 						   save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
657 
658 	/* Start building the query for creating the diff table. */
659 	resetStringInfo(&querybuf);
660 	appendStringInfo(&querybuf,
661 					 "CREATE TEMP TABLE %s AS "
662 					 "SELECT mv.ctid AS tid, newdata.*::%s AS newdata "
663 					 "FROM %s mv FULL JOIN %s newdata ON (",
664 					 diffname, tempname, matviewname, tempname);
665 
666 	/*
667 	 * Get the list of index OIDs for the table from the relcache, and look up
668 	 * each one in the pg_index syscache.  We will test for equality on all
669 	 * columns present in all unique indexes which only reference columns and
670 	 * include all rows.
671 	 */
672 	tupdesc = matviewRel->rd_att;
673 	opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts);
674 	foundUniqueIndex = false;
675 
676 	indexoidlist = RelationGetIndexList(matviewRel);
677 
678 	foreach(indexoidscan, indexoidlist)
679 	{
680 		Oid			indexoid = lfirst_oid(indexoidscan);
681 		Relation	indexRel;
682 
683 		indexRel = index_open(indexoid, RowExclusiveLock);
684 		if (is_usable_unique_index(indexRel))
685 		{
686 			Form_pg_index indexStruct = indexRel->rd_index;
687 			int			indnkeyatts = indexStruct->indnkeyatts;
688 			oidvector  *indclass;
689 			Datum		indclassDatum;
690 			bool		isnull;
691 			int			i;
692 
693 			/* Must get indclass the hard way. */
694 			indclassDatum = SysCacheGetAttr(INDEXRELID,
695 											indexRel->rd_indextuple,
696 											Anum_pg_index_indclass,
697 											&isnull);
698 			Assert(!isnull);
699 			indclass = (oidvector *) DatumGetPointer(indclassDatum);
700 
701 			/* Add quals for all columns from this index. */
702 			for (i = 0; i < indnkeyatts; i++)
703 			{
704 				int			attnum = indexStruct->indkey.values[i];
705 				Oid			opclass = indclass->values[i];
706 				Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
707 				Oid			attrtype = attr->atttypid;
708 				HeapTuple	cla_ht;
709 				Form_pg_opclass cla_tup;
710 				Oid			opfamily;
711 				Oid			opcintype;
712 				Oid			op;
713 				const char *leftop;
714 				const char *rightop;
715 
716 				/*
717 				 * Identify the equality operator associated with this index
718 				 * column.  First we need to look up the column's opclass.
719 				 */
720 				cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
721 				if (!HeapTupleIsValid(cla_ht))
722 					elog(ERROR, "cache lookup failed for opclass %u", opclass);
723 				cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht);
724 				Assert(cla_tup->opcmethod == BTREE_AM_OID);
725 				opfamily = cla_tup->opcfamily;
726 				opcintype = cla_tup->opcintype;
727 				ReleaseSysCache(cla_ht);
728 
729 				op = get_opfamily_member(opfamily, opcintype, opcintype,
730 										 BTEqualStrategyNumber);
731 				if (!OidIsValid(op))
732 					elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
733 						 BTEqualStrategyNumber, opcintype, opcintype, opfamily);
734 
735 				/*
736 				 * If we find the same column with the same equality semantics
737 				 * in more than one index, we only need to emit the equality
738 				 * clause once.
739 				 *
740 				 * Since we only remember the last equality operator, this
741 				 * code could be fooled into emitting duplicate clauses given
742 				 * multiple indexes with several different opclasses ... but
743 				 * that's so unlikely it doesn't seem worth spending extra
744 				 * code to avoid.
745 				 */
746 				if (opUsedForQual[attnum - 1] == op)
747 					continue;
748 				opUsedForQual[attnum - 1] = op;
749 
750 				/*
751 				 * Actually add the qual, ANDed with any others.
752 				 */
753 				if (foundUniqueIndex)
754 					appendStringInfoString(&querybuf, " AND ");
755 
756 				leftop = quote_qualified_identifier("newdata",
757 													NameStr(attr->attname));
758 				rightop = quote_qualified_identifier("mv",
759 													 NameStr(attr->attname));
760 
761 				generate_operator_clause(&querybuf,
762 										 leftop, attrtype,
763 										 op,
764 										 rightop, attrtype);
765 
766 				foundUniqueIndex = true;
767 			}
768 		}
769 
770 		/* Keep the locks, since we're about to run DML which needs them. */
771 		index_close(indexRel, NoLock);
772 	}
773 
774 	list_free(indexoidlist);
775 
776 	/*
777 	 * There must be at least one usable unique index on the matview.
778 	 *
779 	 * ExecRefreshMatView() checks that after taking the exclusive lock on the
780 	 * matview. So at least one unique index is guaranteed to exist here
781 	 * because the lock is still being held; so an Assert seems sufficient.
782 	 */
783 	Assert(foundUniqueIndex);
784 
785 	appendStringInfoString(&querybuf,
786 						   " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) "
787 						   "WHERE newdata.* IS NULL OR mv.* IS NULL "
788 						   "ORDER BY tid");
789 
790 	/* Create the temporary "diff" table. */
791 	if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
792 		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
793 
794 	SetUserIdAndSecContext(relowner,
795 						   save_sec_context | SECURITY_RESTRICTED_OPERATION);
796 
797 	/*
798 	 * We have no further use for data from the "full-data" temp table, but we
799 	 * must keep it around because its type is referenced from the diff table.
800 	 */
801 
802 	/* Analyze the diff table. */
803 	resetStringInfo(&querybuf);
804 	appendStringInfo(&querybuf, "ANALYZE %s", diffname);
805 	if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
806 		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
807 
808 	OpenMatViewIncrementalMaintenance();
809 
810 	/* Deletes must come before inserts; do them first. */
811 	resetStringInfo(&querybuf);
812 	appendStringInfo(&querybuf,
813 					 "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
814 					 "(SELECT diff.tid FROM %s diff "
815 					 "WHERE diff.tid IS NOT NULL "
816 					 "AND diff.newdata IS NULL)",
817 					 matviewname, diffname);
818 	if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
819 		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
820 
821 	/* Inserts go last. */
822 	resetStringInfo(&querybuf);
823 	appendStringInfo(&querybuf,
824 					 "INSERT INTO %s SELECT (diff.newdata).* "
825 					 "FROM %s diff WHERE tid IS NULL",
826 					 matviewname, diffname);
827 	if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
828 		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
829 
830 	/* We're done maintaining the materialized view. */
831 	CloseMatViewIncrementalMaintenance();
832 	table_close(tempRel, NoLock);
833 	table_close(matviewRel, NoLock);
834 
835 	/* Clean up temp tables. */
836 	resetStringInfo(&querybuf);
837 	appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
838 	if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
839 		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
840 
841 	/* Close SPI context. */
842 	if (SPI_finish() != SPI_OK_FINISH)
843 		elog(ERROR, "SPI_finish failed");
844 }
845 
846 /*
847  * Swap the physical files of the target and transient tables, then rebuild
848  * the target's indexes and throw away the transient table.  Security context
849  * swapping is handled by the called function, so it is not needed here.
850  */
851 static void
refresh_by_heap_swap(Oid matviewOid,Oid OIDNewHeap,char relpersistence)852 refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
853 {
854 	finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
855 					 RecentXmin, ReadNextMultiXactId(), relpersistence);
856 }
857 
858 /*
859  * Check whether specified index is usable for match merge.
860  */
861 static bool
is_usable_unique_index(Relation indexRel)862 is_usable_unique_index(Relation indexRel)
863 {
864 	Form_pg_index indexStruct = indexRel->rd_index;
865 
866 	/*
867 	 * Must be unique, valid, immediate, non-partial, and be defined over
868 	 * plain user columns (not expressions).  We also require it to be a
869 	 * btree.  Even if we had any other unique index kinds, we'd not know how
870 	 * to identify the corresponding equality operator, nor could we be sure
871 	 * that the planner could implement the required FULL JOIN with non-btree
872 	 * operators.
873 	 */
874 	if (indexStruct->indisunique &&
875 		indexStruct->indimmediate &&
876 		indexRel->rd_rel->relam == BTREE_AM_OID &&
877 		indexStruct->indisvalid &&
878 		RelationGetIndexPredicate(indexRel) == NIL &&
879 		indexStruct->indnatts > 0)
880 	{
881 		/*
882 		 * The point of groveling through the index columns individually is to
883 		 * reject both index expressions and system columns.  Currently,
884 		 * matviews couldn't have OID columns so there's no way to create an
885 		 * index on a system column; but maybe someday that wouldn't be true,
886 		 * so let's be safe.
887 		 */
888 		int			numatts = indexStruct->indnatts;
889 		int			i;
890 
891 		for (i = 0; i < numatts; i++)
892 		{
893 			int			attnum = indexStruct->indkey.values[i];
894 
895 			if (attnum <= 0)
896 				return false;
897 		}
898 		return true;
899 	}
900 	return false;
901 }
902 
903 
904 /*
905  * This should be used to test whether the backend is in a context where it is
906  * OK to allow DML statements to modify materialized views.  We only want to
907  * allow that for internal code driven by the materialized view definition,
908  * not for arbitrary user-supplied code.
909  *
910  * While the function names reflect the fact that their main intended use is
911  * incremental maintenance of materialized views (in response to changes to
912  * the data in referenced relations), they are initially used to allow REFRESH
913  * without blocking concurrent reads.
914  */
915 bool
MatViewIncrementalMaintenanceIsEnabled(void)916 MatViewIncrementalMaintenanceIsEnabled(void)
917 {
918 	return matview_maintenance_depth > 0;
919 }
920 
921 static void
OpenMatViewIncrementalMaintenance(void)922 OpenMatViewIncrementalMaintenance(void)
923 {
924 	matview_maintenance_depth++;
925 }
926 
927 static void
CloseMatViewIncrementalMaintenance(void)928 CloseMatViewIncrementalMaintenance(void)
929 {
930 	matview_maintenance_depth--;
931 	Assert(matview_maintenance_depth >= 0);
932 }
933