1 /*-------------------------------------------------------------------------
2  *
3  * matview.c
4  *	  materialized view support
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *	  src/backend/commands/matview.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 #include "postgres.h"
16 
17 #include "access/htup_details.h"
18 #include "access/multixact.h"
19 #include "access/xact.h"
20 #include "access/xlog.h"
21 #include "catalog/catalog.h"
22 #include "catalog/indexing.h"
23 #include "catalog/namespace.h"
24 #include "catalog/pg_am.h"
25 #include "catalog/pg_opclass.h"
26 #include "catalog/pg_operator.h"
27 #include "commands/cluster.h"
28 #include "commands/matview.h"
29 #include "commands/tablecmds.h"
30 #include "commands/tablespace.h"
31 #include "executor/executor.h"
32 #include "executor/spi.h"
33 #include "miscadmin.h"
34 #include "parser/parse_relation.h"
35 #include "pgstat.h"
36 #include "rewrite/rewriteHandler.h"
37 #include "storage/lmgr.h"
38 #include "storage/smgr.h"
39 #include "tcop/tcopprot.h"
40 #include "utils/builtins.h"
41 #include "utils/lsyscache.h"
42 #include "utils/rel.h"
43 #include "utils/snapmgr.h"
44 #include "utils/syscache.h"
45 
46 
/*
 * DR_transientrel
 *		DestReceiver that stores the rows produced by a matview's defining
 *		query into the transient heap built during REFRESH MATERIALIZED VIEW.
 */
typedef struct
{
	DestReceiver pub;			/* publicly-known function pointers; must be
								 * first, since this struct is cast to and
								 * from DestReceiver * */
	Oid			transientoid;	/* OID of new heap into which to store */
	/* These fields are filled by transientrel_startup: */
	Relation	transientrel;	/* relation to write to */
	CommandId	output_cid;		/* cmin to insert in output tuples */
	int			hi_options;		/* heap_insert performance options */
	BulkInsertState bistate;	/* bulk insert state */
} DR_transientrel;

/*
 * Nesting depth of code that is currently allowed to run DML directly against
 * materialized views.  Incremented/decremented by Open/
 * CloseMatViewIncrementalMaintenance() and tested by
 * MatViewIncrementalMaintenanceIsEnabled().
 */
static int	matview_maintenance_depth = 0;

/* DestReceiver callbacks for DR_transientrel */
static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
static void transientrel_shutdown(DestReceiver *self);
static void transientrel_destroy(DestReceiver *self);

/* Refresh implementation helpers */
static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
						 const char *queryString);
static char *make_temptable_name_n(char *tempname, int n);
static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
					   int save_sec_context);
static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
static bool is_usable_unique_index(Relation indexRel);

/* Bookkeeping for matview_maintenance_depth */
static void OpenMatViewIncrementalMaintenance(void);
static void CloseMatViewIncrementalMaintenance(void);
73 
74 /*
75  * SetMatViewPopulatedState
76  *		Mark a materialized view as populated, or not.
77  *
78  * NOTE: caller must be holding an appropriate lock on the relation.
79  */
80 void
SetMatViewPopulatedState(Relation relation,bool newstate)81 SetMatViewPopulatedState(Relation relation, bool newstate)
82 {
83 	Relation	pgrel;
84 	HeapTuple	tuple;
85 
86 	Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
87 
88 	/*
89 	 * Update relation's pg_class entry.  Crucial side-effect: other backends
90 	 * (and this one too!) are sent SI message to make them rebuild relcache
91 	 * entries.
92 	 */
93 	pgrel = heap_open(RelationRelationId, RowExclusiveLock);
94 	tuple = SearchSysCacheCopy1(RELOID,
95 								ObjectIdGetDatum(RelationGetRelid(relation)));
96 	if (!HeapTupleIsValid(tuple))
97 		elog(ERROR, "cache lookup failed for relation %u",
98 			 RelationGetRelid(relation));
99 
100 	((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
101 
102 	CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
103 
104 	heap_freetuple(tuple);
105 	heap_close(pgrel, RowExclusiveLock);
106 
107 	/*
108 	 * Advance command counter to make the updated pg_class row locally
109 	 * visible.
110 	 */
111 	CommandCounterIncrement();
112 }
113 
114 /*
115  * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
116  *
117  * This refreshes the materialized view by creating a new table and swapping
118  * the relfilenodes of the new table and the old materialized view, so the OID
119  * of the original materialized view is preserved. Thus we do not lose GRANT
120  * nor references to this materialized view.
121  *
122  * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
123  * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
124  * statement associated with the materialized view.  The statement node's
125  * skipData field shows whether the clause was used.
126  *
127  * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
128  * the new heap, it's better to create the indexes afterwards than to fill them
129  * incrementally while we load.
130  *
131  * The matview's "populated" state is changed based on whether the contents
132  * reflect the result set of the materialized view's query.
133  */
134 ObjectAddress
ExecRefreshMatView(RefreshMatViewStmt * stmt,const char * queryString,ParamListInfo params,char * completionTag)135 ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
136 				   ParamListInfo params, char *completionTag)
137 {
138 	Oid			matviewOid;
139 	Relation	matviewRel;
140 	RewriteRule *rule;
141 	List	   *actions;
142 	Query	   *dataQuery;
143 	Oid			tableSpace;
144 	Oid			relowner;
145 	Oid			OIDNewHeap;
146 	DestReceiver *dest;
147 	uint64		processed = 0;
148 	bool		concurrent;
149 	LOCKMODE	lockmode;
150 	char		relpersistence;
151 	Oid			save_userid;
152 	int			save_sec_context;
153 	int			save_nestlevel;
154 	ObjectAddress address;
155 
156 	/* Determine strength of lock needed. */
157 	concurrent = stmt->concurrent;
158 	lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
159 
160 	/*
161 	 * Get a lock until end of transaction.
162 	 */
163 	matviewOid = RangeVarGetRelidExtended(stmt->relation,
164 										  lockmode, false, false,
165 										  RangeVarCallbackOwnsTable, NULL);
166 	matviewRel = heap_open(matviewOid, NoLock);
167 
168 	/* Make sure it is a materialized view. */
169 	if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
170 		ereport(ERROR,
171 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
172 				 errmsg("\"%s\" is not a materialized view",
173 						RelationGetRelationName(matviewRel))));
174 
175 	/* Check that CONCURRENTLY is not specified if not populated. */
176 	if (concurrent && !RelationIsPopulated(matviewRel))
177 		ereport(ERROR,
178 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
179 				 errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
180 
181 	/* Check that conflicting options have not been specified. */
182 	if (concurrent && stmt->skipData)
183 		ereport(ERROR,
184 				(errcode(ERRCODE_SYNTAX_ERROR),
185 				 errmsg("CONCURRENTLY and WITH NO DATA options cannot be used together")));
186 
187 	/* We don't allow an oid column for a materialized view. */
188 	Assert(!matviewRel->rd_rel->relhasoids);
189 
190 	/*
191 	 * Check that everything is correct for a refresh. Problems at this point
192 	 * are internal errors, so elog is sufficient.
193 	 */
194 	if (matviewRel->rd_rel->relhasrules == false ||
195 		matviewRel->rd_rules->numLocks < 1)
196 		elog(ERROR,
197 			 "materialized view \"%s\" is missing rewrite information",
198 			 RelationGetRelationName(matviewRel));
199 
200 	if (matviewRel->rd_rules->numLocks > 1)
201 		elog(ERROR,
202 			 "materialized view \"%s\" has too many rules",
203 			 RelationGetRelationName(matviewRel));
204 
205 	rule = matviewRel->rd_rules->rules[0];
206 	if (rule->event != CMD_SELECT || !(rule->isInstead))
207 		elog(ERROR,
208 			 "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
209 			 RelationGetRelationName(matviewRel));
210 
211 	actions = rule->actions;
212 	if (list_length(actions) != 1)
213 		elog(ERROR,
214 			 "the rule for materialized view \"%s\" is not a single action",
215 			 RelationGetRelationName(matviewRel));
216 
217 	/*
218 	 * Check that there is a unique index with no WHERE clause on one or more
219 	 * columns of the materialized view if CONCURRENTLY is specified.
220 	 */
221 	if (concurrent)
222 	{
223 		List	   *indexoidlist = RelationGetIndexList(matviewRel);
224 		ListCell   *indexoidscan;
225 		bool		hasUniqueIndex = false;
226 
227 		foreach(indexoidscan, indexoidlist)
228 		{
229 			Oid			indexoid = lfirst_oid(indexoidscan);
230 			Relation	indexRel;
231 
232 			indexRel = index_open(indexoid, AccessShareLock);
233 			hasUniqueIndex = is_usable_unique_index(indexRel);
234 			index_close(indexRel, AccessShareLock);
235 			if (hasUniqueIndex)
236 				break;
237 		}
238 
239 		list_free(indexoidlist);
240 
241 		if (!hasUniqueIndex)
242 			ereport(ERROR,
243 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
244 					 errmsg("cannot refresh materialized view \"%s\" concurrently",
245 							quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
246 													   RelationGetRelationName(matviewRel))),
247 					 errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
248 	}
249 
250 	/*
251 	 * The stored query was rewritten at the time of the MV definition, but
252 	 * has not been scribbled on by the planner.
253 	 */
254 	dataQuery = linitial_node(Query, actions);
255 
256 	/*
257 	 * Check for active uses of the relation in the current transaction, such
258 	 * as open scans.
259 	 *
260 	 * NB: We count on this to protect us against problems with refreshing the
261 	 * data using HEAP_INSERT_FROZEN.
262 	 */
263 	CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
264 
265 	/*
266 	 * Tentatively mark the matview as populated or not (this will roll back
267 	 * if we fail later).
268 	 */
269 	SetMatViewPopulatedState(matviewRel, !stmt->skipData);
270 
271 	relowner = matviewRel->rd_rel->relowner;
272 
273 	/*
274 	 * Switch to the owner's userid, so that any functions are run as that
275 	 * user.  Also arrange to make GUC variable changes local to this command.
276 	 * Don't lock it down too tight to create a temporary table just yet.  We
277 	 * will switch modes when we are about to execute user code.
278 	 */
279 	GetUserIdAndSecContext(&save_userid, &save_sec_context);
280 	SetUserIdAndSecContext(relowner,
281 						   save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
282 	save_nestlevel = NewGUCNestLevel();
283 
284 	/* Concurrent refresh builds new data in temp tablespace, and does diff. */
285 	if (concurrent)
286 	{
287 		tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP);
288 		relpersistence = RELPERSISTENCE_TEMP;
289 	}
290 	else
291 	{
292 		tableSpace = matviewRel->rd_rel->reltablespace;
293 		relpersistence = matviewRel->rd_rel->relpersistence;
294 	}
295 
296 	/*
297 	 * Create the transient table that will receive the regenerated data. Lock
298 	 * it against access by any other process until commit (by which time it
299 	 * will be gone).
300 	 */
301 	OIDNewHeap = make_new_heap(matviewOid, tableSpace, relpersistence,
302 							   ExclusiveLock);
303 	LockRelationOid(OIDNewHeap, AccessExclusiveLock);
304 	dest = CreateTransientRelDestReceiver(OIDNewHeap);
305 
306 	/*
307 	 * Now lock down security-restricted operations.
308 	 */
309 	SetUserIdAndSecContext(relowner,
310 						   save_sec_context | SECURITY_RESTRICTED_OPERATION);
311 
312 	/* Generate the data, if wanted. */
313 	if (!stmt->skipData)
314 		processed = refresh_matview_datafill(dest, dataQuery, queryString);
315 
316 	/* Make the matview match the newly generated data. */
317 	if (concurrent)
318 	{
319 		int			old_depth = matview_maintenance_depth;
320 
321 		PG_TRY();
322 		{
323 			refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
324 								   save_sec_context);
325 		}
326 		PG_CATCH();
327 		{
328 			matview_maintenance_depth = old_depth;
329 			PG_RE_THROW();
330 		}
331 		PG_END_TRY();
332 		Assert(matview_maintenance_depth == old_depth);
333 	}
334 	else
335 	{
336 		refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
337 
338 		/*
339 		 * Inform stats collector about our activity: basically, we truncated
340 		 * the matview and inserted some new data.  (The concurrent code path
341 		 * above doesn't need to worry about this because the inserts and
342 		 * deletes it issues get counted by lower-level code.)
343 		 */
344 		pgstat_count_truncate(matviewRel);
345 		if (!stmt->skipData)
346 			pgstat_count_heap_insert(matviewRel, processed);
347 	}
348 
349 	heap_close(matviewRel, NoLock);
350 
351 	/* Roll back any GUC changes */
352 	AtEOXact_GUC(false, save_nestlevel);
353 
354 	/* Restore userid and security context */
355 	SetUserIdAndSecContext(save_userid, save_sec_context);
356 
357 	ObjectAddressSet(address, RelationRelationId, matviewOid);
358 
359 	return address;
360 }
361 
362 /*
363  * refresh_matview_datafill
364  *
365  * Execute the given query, sending result rows to "dest" (which will
366  * insert them into the target matview).
367  *
368  * Returns number of rows inserted.
369  */
370 static uint64
refresh_matview_datafill(DestReceiver * dest,Query * query,const char * queryString)371 refresh_matview_datafill(DestReceiver *dest, Query *query,
372 						 const char *queryString)
373 {
374 	List	   *rewritten;
375 	PlannedStmt *plan;
376 	QueryDesc  *queryDesc;
377 	Query	   *copied_query;
378 	uint64		processed;
379 
380 	/* Lock and rewrite, using a copy to preserve the original query. */
381 	copied_query = copyObject(query);
382 	AcquireRewriteLocks(copied_query, true, false);
383 	rewritten = QueryRewrite(copied_query);
384 
385 	/* SELECT should never rewrite to more or less than one SELECT query */
386 	if (list_length(rewritten) != 1)
387 		elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
388 	query = (Query *) linitial(rewritten);
389 
390 	/* Check for user-requested abort. */
391 	CHECK_FOR_INTERRUPTS();
392 
393 	/* Plan the query which will generate data for the refresh. */
394 	plan = pg_plan_query(query, 0, NULL);
395 
396 	/*
397 	 * Use a snapshot with an updated command ID to ensure this query sees
398 	 * results of any previously executed queries.  (This could only matter if
399 	 * the planner executed an allegedly-stable function that changed the
400 	 * database contents, but let's do it anyway to be safe.)
401 	 */
402 	PushCopiedSnapshot(GetActiveSnapshot());
403 	UpdateActiveSnapshotCommandId();
404 
405 	/* Create a QueryDesc, redirecting output to our tuple receiver */
406 	queryDesc = CreateQueryDesc(plan, queryString,
407 								GetActiveSnapshot(), InvalidSnapshot,
408 								dest, NULL, NULL, 0);
409 
410 	/* call ExecutorStart to prepare the plan for execution */
411 	ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS);
412 
413 	/* run the plan */
414 	ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
415 
416 	processed = queryDesc->estate->es_processed;
417 
418 	/* and clean up */
419 	ExecutorFinish(queryDesc);
420 	ExecutorEnd(queryDesc);
421 
422 	FreeQueryDesc(queryDesc);
423 
424 	PopActiveSnapshot();
425 
426 	return processed;
427 }
428 
429 DestReceiver *
CreateTransientRelDestReceiver(Oid transientoid)430 CreateTransientRelDestReceiver(Oid transientoid)
431 {
432 	DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
433 
434 	self->pub.receiveSlot = transientrel_receive;
435 	self->pub.rStartup = transientrel_startup;
436 	self->pub.rShutdown = transientrel_shutdown;
437 	self->pub.rDestroy = transientrel_destroy;
438 	self->pub.mydest = DestTransientRel;
439 	self->transientoid = transientoid;
440 
441 	return (DestReceiver *) self;
442 }
443 
444 /*
445  * transientrel_startup --- executor startup
446  */
447 static void
transientrel_startup(DestReceiver * self,int operation,TupleDesc typeinfo)448 transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
449 {
450 	DR_transientrel *myState = (DR_transientrel *) self;
451 	Relation	transientrel;
452 
453 	transientrel = heap_open(myState->transientoid, NoLock);
454 
455 	/*
456 	 * Fill private fields of myState for use by later routines
457 	 */
458 	myState->transientrel = transientrel;
459 	myState->output_cid = GetCurrentCommandId(true);
460 
461 	/*
462 	 * We can skip WAL-logging the insertions, unless PITR or streaming
463 	 * replication is in use. We can skip the FSM in any case.
464 	 */
465 	myState->hi_options = HEAP_INSERT_SKIP_FSM | HEAP_INSERT_FROZEN;
466 	if (!XLogIsNeeded())
467 		myState->hi_options |= HEAP_INSERT_SKIP_WAL;
468 	myState->bistate = GetBulkInsertState();
469 
470 	/* Not using WAL requires smgr_targblock be initially invalid */
471 	Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
472 }
473 
474 /*
475  * transientrel_receive --- receive one tuple
476  */
477 static bool
transientrel_receive(TupleTableSlot * slot,DestReceiver * self)478 transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
479 {
480 	DR_transientrel *myState = (DR_transientrel *) self;
481 	HeapTuple	tuple;
482 
483 	/*
484 	 * get the heap tuple out of the tuple table slot, making sure we have a
485 	 * writable copy
486 	 */
487 	tuple = ExecMaterializeSlot(slot);
488 
489 	heap_insert(myState->transientrel,
490 				tuple,
491 				myState->output_cid,
492 				myState->hi_options,
493 				myState->bistate);
494 
495 	/* We know this is a newly created relation, so there are no indexes */
496 
497 	return true;
498 }
499 
500 /*
501  * transientrel_shutdown --- executor end
502  */
503 static void
transientrel_shutdown(DestReceiver * self)504 transientrel_shutdown(DestReceiver *self)
505 {
506 	DR_transientrel *myState = (DR_transientrel *) self;
507 
508 	FreeBulkInsertState(myState->bistate);
509 
510 	/* If we skipped using WAL, must heap_sync before commit */
511 	if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
512 		heap_sync(myState->transientrel);
513 
514 	/* close transientrel, but keep lock until commit */
515 	heap_close(myState->transientrel, NoLock);
516 	myState->transientrel = NULL;
517 }
518 
519 /*
520  * transientrel_destroy --- release DestReceiver object
521  */
522 static void
transientrel_destroy(DestReceiver * self)523 transientrel_destroy(DestReceiver *self)
524 {
525 	pfree(self);
526 }
527 
528 
529 /*
530  * Given a qualified temporary table name, append an underscore followed by
531  * the given integer, to make a new table name based on the old one.
532  * The result is a palloc'd string.
533  *
534  * As coded, this would fail to make a valid SQL name if the given name were,
535  * say, "FOO"."BAR".  Currently, the table name portion of the input will
536  * never be double-quoted because it's of the form "pg_temp_NNN", cf
537  * make_new_heap().  But we might have to work harder someday.
538  */
539 static char *
make_temptable_name_n(char * tempname,int n)540 make_temptable_name_n(char *tempname, int n)
541 {
542 	StringInfoData namebuf;
543 
544 	initStringInfo(&namebuf);
545 	appendStringInfoString(&namebuf, tempname);
546 	appendStringInfo(&namebuf, "_%d", n);
547 	return namebuf.data;
548 }
549 
550 /*
551  * refresh_by_match_merge
552  *
553  * Refresh a materialized view with transactional semantics, while allowing
554  * concurrent reads.
555  *
556  * This is called after a new version of the data has been created in a
557  * temporary table.  It performs a full outer join against the old version of
558  * the data, producing "diff" results.  This join cannot work if there are any
559  * duplicated rows in either the old or new versions, in the sense that every
560  * column would compare as equal between the two rows.  It does work correctly
561  * in the face of rows which have at least one NULL value, with all non-NULL
562  * columns equal.  The behavior of NULLs on equality tests and on UNIQUE
563  * indexes turns out to be quite convenient here; the tests we need to make
564  * are consistent with default behavior.  If there is at least one UNIQUE
565  * index on the materialized view, we have exactly the guarantee we need.
566  *
567  * The temporary table used to hold the diff results contains just the TID of
568  * the old record (if matched) and the ROW from the new table as a single
569  * column of complex record type (if matched).
570  *
571  * Once we have the diff table, we perform set-based DELETE and INSERT
572  * operations against the materialized view, and discard both temporary
573  * tables.
574  *
575  * Everything from the generation of the new data to applying the differences
576  * takes place under cover of an ExclusiveLock, since it seems as though we
577  * would want to prohibit not only concurrent REFRESH operations, but also
578  * incremental maintenance.  It also doesn't seem reasonable or safe to allow
579  * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
580  * this command.
581  */
582 static void
refresh_by_match_merge(Oid matviewOid,Oid tempOid,Oid relowner,int save_sec_context)583 refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
584 					   int save_sec_context)
585 {
586 	StringInfoData querybuf;
587 	Relation	matviewRel;
588 	Relation	tempRel;
589 	char	   *matviewname;
590 	char	   *tempname;
591 	char	   *diffname;
592 	TupleDesc	tupdesc;
593 	bool		foundUniqueIndex;
594 	List	   *indexoidlist;
595 	ListCell   *indexoidscan;
596 	int16		relnatts;
597 	Oid		   *opUsedForQual;
598 
599 	initStringInfo(&querybuf);
600 	matviewRel = heap_open(matviewOid, NoLock);
601 	matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
602 											 RelationGetRelationName(matviewRel));
603 	tempRel = heap_open(tempOid, NoLock);
604 	tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
605 										  RelationGetRelationName(tempRel));
606 	diffname = make_temptable_name_n(tempname, 2);
607 
608 	relnatts = matviewRel->rd_rel->relnatts;
609 
610 	/* Open SPI context. */
611 	if (SPI_connect() != SPI_OK_CONNECT)
612 		elog(ERROR, "SPI_connect failed");
613 
614 	/* Analyze the temp table with the new contents. */
615 	appendStringInfo(&querybuf, "ANALYZE %s", tempname);
616 	if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
617 		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
618 
619 	/*
620 	 * We need to ensure that there are not duplicate rows without NULLs in
621 	 * the new data set before we can count on the "diff" results.  Check for
622 	 * that in a way that allows showing the first duplicated row found.  Even
623 	 * after we pass this test, a unique index on the materialized view may
624 	 * find a duplicate key problem.
625 	 *
626 	 * Note: here and below, we use "tablename.*::tablerowtype" as a hack to
627 	 * keep ".*" from being expanded into multiple columns in a SELECT list.
628 	 * Compare ruleutils.c's get_variable().
629 	 */
630 	resetStringInfo(&querybuf);
631 	appendStringInfo(&querybuf,
632 					 "SELECT newdata.*::%s FROM %s newdata "
633 					 "WHERE newdata.* IS NOT NULL AND EXISTS "
634 					 "(SELECT 1 FROM %s newdata2 WHERE newdata2.* IS NOT NULL "
635 					 "AND newdata2.* OPERATOR(pg_catalog.*=) newdata.* "
636 					 "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
637 					 "newdata.ctid)",
638 					 tempname, tempname, tempname);
639 	if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
640 		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
641 	if (SPI_processed > 0)
642 	{
643 		/*
644 		 * Note that this ereport() is returning data to the user.  Generally,
645 		 * we would want to make sure that the user has been granted access to
646 		 * this data.  However, REFRESH MAT VIEW is only able to be run by the
647 		 * owner of the mat view (or a superuser) and therefore there is no
648 		 * need to check for access to data in the mat view.
649 		 */
650 		ereport(ERROR,
651 				(errcode(ERRCODE_CARDINALITY_VIOLATION),
652 				 errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
653 						RelationGetRelationName(matviewRel)),
654 				 errdetail("Row: %s",
655 						   SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
656 	}
657 
658 	SetUserIdAndSecContext(relowner,
659 						   save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
660 
661 	/* Start building the query for creating the diff table. */
662 	resetStringInfo(&querybuf);
663 	appendStringInfo(&querybuf,
664 					 "CREATE TEMP TABLE %s AS "
665 					 "SELECT mv.ctid AS tid, newdata.*::%s AS newdata "
666 					 "FROM %s mv FULL JOIN %s newdata ON (",
667 					 diffname, tempname, matviewname, tempname);
668 
669 	/*
670 	 * Get the list of index OIDs for the table from the relcache, and look up
671 	 * each one in the pg_index syscache.  We will test for equality on all
672 	 * columns present in all unique indexes which only reference columns and
673 	 * include all rows.
674 	 */
675 	tupdesc = matviewRel->rd_att;
676 	opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts);
677 	foundUniqueIndex = false;
678 
679 	indexoidlist = RelationGetIndexList(matviewRel);
680 
681 	foreach(indexoidscan, indexoidlist)
682 	{
683 		Oid			indexoid = lfirst_oid(indexoidscan);
684 		Relation	indexRel;
685 
686 		indexRel = index_open(indexoid, RowExclusiveLock);
687 		if (is_usable_unique_index(indexRel))
688 		{
689 			Form_pg_index indexStruct = indexRel->rd_index;
690 			int			numatts = indexStruct->indnatts;
691 			oidvector  *indclass;
692 			Datum		indclassDatum;
693 			bool		isnull;
694 			int			i;
695 
696 			/* Must get indclass the hard way. */
697 			indclassDatum = SysCacheGetAttr(INDEXRELID,
698 											indexRel->rd_indextuple,
699 											Anum_pg_index_indclass,
700 											&isnull);
701 			Assert(!isnull);
702 			indclass = (oidvector *) DatumGetPointer(indclassDatum);
703 
704 			/* Add quals for all columns from this index. */
705 			for (i = 0; i < numatts; i++)
706 			{
707 				int			attnum = indexStruct->indkey.values[i];
708 				Oid			opclass = indclass->values[i];
709 				Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
710 				Oid			attrtype = attr->atttypid;
711 				HeapTuple	cla_ht;
712 				Form_pg_opclass cla_tup;
713 				Oid			opfamily;
714 				Oid			opcintype;
715 				Oid			op;
716 				const char *leftop;
717 				const char *rightop;
718 
719 				/*
720 				 * Identify the equality operator associated with this index
721 				 * column.  First we need to look up the column's opclass.
722 				 */
723 				cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
724 				if (!HeapTupleIsValid(cla_ht))
725 					elog(ERROR, "cache lookup failed for opclass %u", opclass);
726 				cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht);
727 				Assert(cla_tup->opcmethod == BTREE_AM_OID);
728 				opfamily = cla_tup->opcfamily;
729 				opcintype = cla_tup->opcintype;
730 				ReleaseSysCache(cla_ht);
731 
732 				op = get_opfamily_member(opfamily, opcintype, opcintype,
733 										 BTEqualStrategyNumber);
734 				if (!OidIsValid(op))
735 					elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
736 						 BTEqualStrategyNumber, opcintype, opcintype, opfamily);
737 
738 				/*
739 				 * If we find the same column with the same equality semantics
740 				 * in more than one index, we only need to emit the equality
741 				 * clause once.
742 				 *
743 				 * Since we only remember the last equality operator, this
744 				 * code could be fooled into emitting duplicate clauses given
745 				 * multiple indexes with several different opclasses ... but
746 				 * that's so unlikely it doesn't seem worth spending extra
747 				 * code to avoid.
748 				 */
749 				if (opUsedForQual[attnum - 1] == op)
750 					continue;
751 				opUsedForQual[attnum - 1] = op;
752 
753 				/*
754 				 * Actually add the qual, ANDed with any others.
755 				 */
756 				if (foundUniqueIndex)
757 					appendStringInfoString(&querybuf, " AND ");
758 
759 				leftop = quote_qualified_identifier("newdata",
760 													NameStr(attr->attname));
761 				rightop = quote_qualified_identifier("mv",
762 													 NameStr(attr->attname));
763 
764 				generate_operator_clause(&querybuf,
765 										 leftop, attrtype,
766 										 op,
767 										 rightop, attrtype);
768 
769 				foundUniqueIndex = true;
770 			}
771 		}
772 
773 		/* Keep the locks, since we're about to run DML which needs them. */
774 		index_close(indexRel, NoLock);
775 	}
776 
777 	list_free(indexoidlist);
778 
779 	/*
780 	 * There must be at least one usable unique index on the matview.
781 	 *
782 	 * ExecRefreshMatView() checks that after taking the exclusive lock on the
783 	 * matview. So at least one unique index is guaranteed to exist here
784 	 * because the lock is still being held; so an Assert seems sufficient.
785 	 */
786 	Assert(foundUniqueIndex);
787 
788 	appendStringInfoString(&querybuf,
789 						   " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) "
790 						   "WHERE newdata.* IS NULL OR mv.* IS NULL "
791 						   "ORDER BY tid");
792 
793 	/* Create the temporary "diff" table. */
794 	if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
795 		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
796 
797 	SetUserIdAndSecContext(relowner,
798 						   save_sec_context | SECURITY_RESTRICTED_OPERATION);
799 
800 	/*
801 	 * We have no further use for data from the "full-data" temp table, but we
802 	 * must keep it around because its type is referenced from the diff table.
803 	 */
804 
805 	/* Analyze the diff table. */
806 	resetStringInfo(&querybuf);
807 	appendStringInfo(&querybuf, "ANALYZE %s", diffname);
808 	if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
809 		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
810 
811 	OpenMatViewIncrementalMaintenance();
812 
813 	/* Deletes must come before inserts; do them first. */
814 	resetStringInfo(&querybuf);
815 	appendStringInfo(&querybuf,
816 					 "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
817 					 "(SELECT diff.tid FROM %s diff "
818 					 "WHERE diff.tid IS NOT NULL "
819 					 "AND diff.newdata IS NULL)",
820 					 matviewname, diffname);
821 	if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
822 		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
823 
824 	/* Inserts go last. */
825 	resetStringInfo(&querybuf);
826 	appendStringInfo(&querybuf,
827 					 "INSERT INTO %s SELECT (diff.newdata).* "
828 					 "FROM %s diff WHERE tid IS NULL",
829 					 matviewname, diffname);
830 	if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
831 		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
832 
833 	/* We're done maintaining the materialized view. */
834 	CloseMatViewIncrementalMaintenance();
835 	heap_close(tempRel, NoLock);
836 	heap_close(matviewRel, NoLock);
837 
838 	/* Clean up temp tables. */
839 	resetStringInfo(&querybuf);
840 	appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
841 	if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
842 		elog(ERROR, "SPI_exec failed: %s", querybuf.data);
843 
844 	/* Close SPI context. */
845 	if (SPI_finish() != SPI_OK_FINISH)
846 		elog(ERROR, "SPI_finish failed");
847 }
848 
849 /*
850  * Swap the physical files of the target and transient tables, then rebuild
851  * the target's indexes and throw away the transient table.  Security context
852  * swapping is handled by the called function, so it is not needed here.
853  */
854 static void
refresh_by_heap_swap(Oid matviewOid,Oid OIDNewHeap,char relpersistence)855 refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
856 {
857 	finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
858 					 RecentXmin, ReadNextMultiXactId(), relpersistence);
859 }
860 
861 /*
862  * Check whether specified index is usable for match merge.
863  */
864 static bool
is_usable_unique_index(Relation indexRel)865 is_usable_unique_index(Relation indexRel)
866 {
867 	Form_pg_index indexStruct = indexRel->rd_index;
868 
869 	/*
870 	 * Must be unique, valid, immediate, non-partial, and be defined over
871 	 * plain user columns (not expressions).  We also require it to be a
872 	 * btree.  Even if we had any other unique index kinds, we'd not know how
873 	 * to identify the corresponding equality operator, nor could we be sure
874 	 * that the planner could implement the required FULL JOIN with non-btree
875 	 * operators.
876 	 */
877 	if (indexStruct->indisunique &&
878 		indexStruct->indimmediate &&
879 		indexRel->rd_rel->relam == BTREE_AM_OID &&
880 		IndexIsValid(indexStruct) &&
881 		RelationGetIndexPredicate(indexRel) == NIL &&
882 		indexStruct->indnatts > 0)
883 	{
884 		/*
885 		 * The point of groveling through the index columns individually is to
886 		 * reject both index expressions and system columns.  Currently,
887 		 * matviews couldn't have OID columns so there's no way to create an
888 		 * index on a system column; but maybe someday that wouldn't be true,
889 		 * so let's be safe.
890 		 */
891 		int			numatts = indexStruct->indnatts;
892 		int			i;
893 
894 		for (i = 0; i < numatts; i++)
895 		{
896 			int			attnum = indexStruct->indkey.values[i];
897 
898 			if (attnum <= 0)
899 				return false;
900 		}
901 		return true;
902 	}
903 	return false;
904 }
905 
906 
907 /*
908  * This should be used to test whether the backend is in a context where it is
909  * OK to allow DML statements to modify materialized views.  We only want to
910  * allow that for internal code driven by the materialized view definition,
911  * not for arbitrary user-supplied code.
912  *
913  * While the function names reflect the fact that their main intended use is
914  * incremental maintenance of materialized views (in response to changes to
915  * the data in referenced relations), they are initially used to allow REFRESH
916  * without blocking concurrent reads.
917  */
918 bool
MatViewIncrementalMaintenanceIsEnabled(void)919 MatViewIncrementalMaintenanceIsEnabled(void)
920 {
921 	return matview_maintenance_depth > 0;
922 }
923 
924 static void
OpenMatViewIncrementalMaintenance(void)925 OpenMatViewIncrementalMaintenance(void)
926 {
927 	matview_maintenance_depth++;
928 }
929 
930 static void
CloseMatViewIncrementalMaintenance(void)931 CloseMatViewIncrementalMaintenance(void)
932 {
933 	matview_maintenance_depth--;
934 	Assert(matview_maintenance_depth >= 0);
935 }
936