1 /*-------------------------------------------------------------------------
2 *
3 * matview.c
4 * materialized view support
5 *
6 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/commands/matview.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/htup_details.h"
20 #include "access/multixact.h"
21 #include "access/tableam.h"
22 #include "access/xact.h"
23 #include "access/xlog.h"
24 #include "catalog/catalog.h"
25 #include "catalog/indexing.h"
26 #include "catalog/namespace.h"
27 #include "catalog/pg_am.h"
28 #include "catalog/pg_opclass.h"
29 #include "catalog/pg_operator.h"
30 #include "commands/cluster.h"
31 #include "commands/matview.h"
32 #include "commands/tablecmds.h"
33 #include "commands/tablespace.h"
34 #include "executor/executor.h"
35 #include "executor/spi.h"
36 #include "miscadmin.h"
37 #include "parser/parse_relation.h"
38 #include "pgstat.h"
39 #include "rewrite/rewriteHandler.h"
40 #include "storage/lmgr.h"
41 #include "storage/smgr.h"
42 #include "tcop/tcopprot.h"
43 #include "utils/builtins.h"
44 #include "utils/lsyscache.h"
45 #include "utils/rel.h"
46 #include "utils/snapmgr.h"
47 #include "utils/syscache.h"
48
49
50 typedef struct
51 {
52 DestReceiver pub; /* publicly-known function pointers */
53 Oid transientoid; /* OID of new heap into which to store */
54 /* These fields are filled by transientrel_startup: */
55 Relation transientrel; /* relation to write to */
56 CommandId output_cid; /* cmin to insert in output tuples */
57 int ti_options; /* table_tuple_insert performance options */
58 BulkInsertState bistate; /* bulk insert state */
59 } DR_transientrel;
60
61 static int matview_maintenance_depth = 0;
62
63 static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
64 static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
65 static void transientrel_shutdown(DestReceiver *self);
66 static void transientrel_destroy(DestReceiver *self);
67 static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
68 const char *queryString);
69 static char *make_temptable_name_n(char *tempname, int n);
70 static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
71 int save_sec_context);
72 static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
73 static bool is_usable_unique_index(Relation indexRel);
74 static void OpenMatViewIncrementalMaintenance(void);
75 static void CloseMatViewIncrementalMaintenance(void);
76
77 /*
78 * SetMatViewPopulatedState
79 * Mark a materialized view as populated, or not.
80 *
81 * NOTE: caller must be holding an appropriate lock on the relation.
82 */
83 void
SetMatViewPopulatedState(Relation relation,bool newstate)84 SetMatViewPopulatedState(Relation relation, bool newstate)
85 {
86 Relation pgrel;
87 HeapTuple tuple;
88
89 Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
90
91 /*
92 * Update relation's pg_class entry. Crucial side-effect: other backends
93 * (and this one too!) are sent SI message to make them rebuild relcache
94 * entries.
95 */
96 pgrel = table_open(RelationRelationId, RowExclusiveLock);
97 tuple = SearchSysCacheCopy1(RELOID,
98 ObjectIdGetDatum(RelationGetRelid(relation)));
99 if (!HeapTupleIsValid(tuple))
100 elog(ERROR, "cache lookup failed for relation %u",
101 RelationGetRelid(relation));
102
103 ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
104
105 CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
106
107 heap_freetuple(tuple);
108 table_close(pgrel, RowExclusiveLock);
109
110 /*
111 * Advance command counter to make the updated pg_class row locally
112 * visible.
113 */
114 CommandCounterIncrement();
115 }
116
117 /*
118 * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
119 *
120 * This refreshes the materialized view by creating a new table and swapping
121 * the relfilenodes of the new table and the old materialized view, so the OID
122 * of the original materialized view is preserved. Thus we do not lose GRANT
123 * nor references to this materialized view.
124 *
125 * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
126 * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
127 * statement associated with the materialized view. The statement node's
128 * skipData field shows whether the clause was used.
129 *
130 * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
131 * the new heap, it's better to create the indexes afterwards than to fill them
132 * incrementally while we load.
133 *
134 * The matview's "populated" state is changed based on whether the contents
135 * reflect the result set of the materialized view's query.
136 */
137 ObjectAddress
ExecRefreshMatView(RefreshMatViewStmt * stmt,const char * queryString,ParamListInfo params,QueryCompletion * qc)138 ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
139 ParamListInfo params, QueryCompletion *qc)
140 {
141 Oid matviewOid;
142 Relation matviewRel;
143 RewriteRule *rule;
144 List *actions;
145 Query *dataQuery;
146 Oid tableSpace;
147 Oid relowner;
148 Oid OIDNewHeap;
149 DestReceiver *dest;
150 uint64 processed = 0;
151 bool concurrent;
152 LOCKMODE lockmode;
153 char relpersistence;
154 Oid save_userid;
155 int save_sec_context;
156 int save_nestlevel;
157 ObjectAddress address;
158
159 /* Determine strength of lock needed. */
160 concurrent = stmt->concurrent;
161 lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
162
163 /*
164 * Get a lock until end of transaction.
165 */
166 matviewOid = RangeVarGetRelidExtended(stmt->relation,
167 lockmode, 0,
168 RangeVarCallbackOwnsTable, NULL);
169 matviewRel = table_open(matviewOid, NoLock);
170
171 /* Make sure it is a materialized view. */
172 if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
173 ereport(ERROR,
174 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
175 errmsg("\"%s\" is not a materialized view",
176 RelationGetRelationName(matviewRel))));
177
178 /* Check that CONCURRENTLY is not specified if not populated. */
179 if (concurrent && !RelationIsPopulated(matviewRel))
180 ereport(ERROR,
181 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
182 errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
183
184 /* Check that conflicting options have not been specified. */
185 if (concurrent && stmt->skipData)
186 ereport(ERROR,
187 (errcode(ERRCODE_SYNTAX_ERROR),
188 errmsg("%s and %s options cannot be used together",
189 "CONCURRENTLY", "WITH NO DATA")));
190
191 /*
192 * Check that everything is correct for a refresh. Problems at this point
193 * are internal errors, so elog is sufficient.
194 */
195 if (matviewRel->rd_rel->relhasrules == false ||
196 matviewRel->rd_rules->numLocks < 1)
197 elog(ERROR,
198 "materialized view \"%s\" is missing rewrite information",
199 RelationGetRelationName(matviewRel));
200
201 if (matviewRel->rd_rules->numLocks > 1)
202 elog(ERROR,
203 "materialized view \"%s\" has too many rules",
204 RelationGetRelationName(matviewRel));
205
206 rule = matviewRel->rd_rules->rules[0];
207 if (rule->event != CMD_SELECT || !(rule->isInstead))
208 elog(ERROR,
209 "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
210 RelationGetRelationName(matviewRel));
211
212 actions = rule->actions;
213 if (list_length(actions) != 1)
214 elog(ERROR,
215 "the rule for materialized view \"%s\" is not a single action",
216 RelationGetRelationName(matviewRel));
217
218 /*
219 * Check that there is a unique index with no WHERE clause on one or more
220 * columns of the materialized view if CONCURRENTLY is specified.
221 */
222 if (concurrent)
223 {
224 List *indexoidlist = RelationGetIndexList(matviewRel);
225 ListCell *indexoidscan;
226 bool hasUniqueIndex = false;
227
228 foreach(indexoidscan, indexoidlist)
229 {
230 Oid indexoid = lfirst_oid(indexoidscan);
231 Relation indexRel;
232
233 indexRel = index_open(indexoid, AccessShareLock);
234 hasUniqueIndex = is_usable_unique_index(indexRel);
235 index_close(indexRel, AccessShareLock);
236 if (hasUniqueIndex)
237 break;
238 }
239
240 list_free(indexoidlist);
241
242 if (!hasUniqueIndex)
243 ereport(ERROR,
244 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
245 errmsg("cannot refresh materialized view \"%s\" concurrently",
246 quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
247 RelationGetRelationName(matviewRel))),
248 errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
249 }
250
251 /*
252 * The stored query was rewritten at the time of the MV definition, but
253 * has not been scribbled on by the planner.
254 */
255 dataQuery = linitial_node(Query, actions);
256
257 /*
258 * Check for active uses of the relation in the current transaction, such
259 * as open scans.
260 *
261 * NB: We count on this to protect us against problems with refreshing the
262 * data using TABLE_INSERT_FROZEN.
263 */
264 CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
265
266 /*
267 * Tentatively mark the matview as populated or not (this will roll back
268 * if we fail later).
269 */
270 SetMatViewPopulatedState(matviewRel, !stmt->skipData);
271
272 relowner = matviewRel->rd_rel->relowner;
273
274 /*
275 * Switch to the owner's userid, so that any functions are run as that
276 * user. Also arrange to make GUC variable changes local to this command.
277 * Don't lock it down too tight to create a temporary table just yet. We
278 * will switch modes when we are about to execute user code.
279 */
280 GetUserIdAndSecContext(&save_userid, &save_sec_context);
281 SetUserIdAndSecContext(relowner,
282 save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
283 save_nestlevel = NewGUCNestLevel();
284
285 /* Concurrent refresh builds new data in temp tablespace, and does diff. */
286 if (concurrent)
287 {
288 tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false);
289 relpersistence = RELPERSISTENCE_TEMP;
290 }
291 else
292 {
293 tableSpace = matviewRel->rd_rel->reltablespace;
294 relpersistence = matviewRel->rd_rel->relpersistence;
295 }
296
297 /*
298 * Create the transient table that will receive the regenerated data. Lock
299 * it against access by any other process until commit (by which time it
300 * will be gone).
301 */
302 OIDNewHeap = make_new_heap(matviewOid, tableSpace, relpersistence,
303 ExclusiveLock);
304 LockRelationOid(OIDNewHeap, AccessExclusiveLock);
305 dest = CreateTransientRelDestReceiver(OIDNewHeap);
306
307 /*
308 * Now lock down security-restricted operations.
309 */
310 SetUserIdAndSecContext(relowner,
311 save_sec_context | SECURITY_RESTRICTED_OPERATION);
312
313 /* Generate the data, if wanted. */
314 if (!stmt->skipData)
315 processed = refresh_matview_datafill(dest, dataQuery, queryString);
316
317 /* Make the matview match the newly generated data. */
318 if (concurrent)
319 {
320 int old_depth = matview_maintenance_depth;
321
322 PG_TRY();
323 {
324 refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
325 save_sec_context);
326 }
327 PG_CATCH();
328 {
329 matview_maintenance_depth = old_depth;
330 PG_RE_THROW();
331 }
332 PG_END_TRY();
333 Assert(matview_maintenance_depth == old_depth);
334 }
335 else
336 {
337 refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
338
339 /*
340 * Inform stats collector about our activity: basically, we truncated
341 * the matview and inserted some new data. (The concurrent code path
342 * above doesn't need to worry about this because the inserts and
343 * deletes it issues get counted by lower-level code.)
344 */
345 pgstat_count_truncate(matviewRel);
346 if (!stmt->skipData)
347 pgstat_count_heap_insert(matviewRel, processed);
348 }
349
350 table_close(matviewRel, NoLock);
351
352 /* Roll back any GUC changes */
353 AtEOXact_GUC(false, save_nestlevel);
354
355 /* Restore userid and security context */
356 SetUserIdAndSecContext(save_userid, save_sec_context);
357
358 ObjectAddressSet(address, RelationRelationId, matviewOid);
359
360 return address;
361 }
362
363 /*
364 * refresh_matview_datafill
365 *
366 * Execute the given query, sending result rows to "dest" (which will
367 * insert them into the target matview).
368 *
369 * Returns number of rows inserted.
370 */
371 static uint64
refresh_matview_datafill(DestReceiver * dest,Query * query,const char * queryString)372 refresh_matview_datafill(DestReceiver *dest, Query *query,
373 const char *queryString)
374 {
375 List *rewritten;
376 PlannedStmt *plan;
377 QueryDesc *queryDesc;
378 Query *copied_query;
379 uint64 processed;
380
381 /* Lock and rewrite, using a copy to preserve the original query. */
382 copied_query = copyObject(query);
383 AcquireRewriteLocks(copied_query, true, false);
384 rewritten = QueryRewrite(copied_query);
385
386 /* SELECT should never rewrite to more or less than one SELECT query */
387 if (list_length(rewritten) != 1)
388 elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
389 query = (Query *) linitial(rewritten);
390
391 /* Check for user-requested abort. */
392 CHECK_FOR_INTERRUPTS();
393
394 /* Plan the query which will generate data for the refresh. */
395 plan = pg_plan_query(query, queryString, 0, NULL);
396
397 /*
398 * Use a snapshot with an updated command ID to ensure this query sees
399 * results of any previously executed queries. (This could only matter if
400 * the planner executed an allegedly-stable function that changed the
401 * database contents, but let's do it anyway to be safe.)
402 */
403 PushCopiedSnapshot(GetActiveSnapshot());
404 UpdateActiveSnapshotCommandId();
405
406 /* Create a QueryDesc, redirecting output to our tuple receiver */
407 queryDesc = CreateQueryDesc(plan, queryString,
408 GetActiveSnapshot(), InvalidSnapshot,
409 dest, NULL, NULL, 0);
410
411 /* call ExecutorStart to prepare the plan for execution */
412 ExecutorStart(queryDesc, 0);
413
414 /* run the plan */
415 ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
416
417 processed = queryDesc->estate->es_processed;
418
419 /* and clean up */
420 ExecutorFinish(queryDesc);
421 ExecutorEnd(queryDesc);
422
423 FreeQueryDesc(queryDesc);
424
425 PopActiveSnapshot();
426
427 return processed;
428 }
429
430 DestReceiver *
CreateTransientRelDestReceiver(Oid transientoid)431 CreateTransientRelDestReceiver(Oid transientoid)
432 {
433 DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
434
435 self->pub.receiveSlot = transientrel_receive;
436 self->pub.rStartup = transientrel_startup;
437 self->pub.rShutdown = transientrel_shutdown;
438 self->pub.rDestroy = transientrel_destroy;
439 self->pub.mydest = DestTransientRel;
440 self->transientoid = transientoid;
441
442 return (DestReceiver *) self;
443 }
444
445 /*
446 * transientrel_startup --- executor startup
447 */
448 static void
transientrel_startup(DestReceiver * self,int operation,TupleDesc typeinfo)449 transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
450 {
451 DR_transientrel *myState = (DR_transientrel *) self;
452 Relation transientrel;
453
454 transientrel = table_open(myState->transientoid, NoLock);
455
456 /*
457 * Fill private fields of myState for use by later routines
458 */
459 myState->transientrel = transientrel;
460 myState->output_cid = GetCurrentCommandId(true);
461 myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
462 myState->bistate = GetBulkInsertState();
463
464 /*
465 * Valid smgr_targblock implies something already wrote to the relation.
466 * This may be harmless, but this function hasn't planned for it.
467 */
468 Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
469 }
470
471 /*
472 * transientrel_receive --- receive one tuple
473 */
474 static bool
transientrel_receive(TupleTableSlot * slot,DestReceiver * self)475 transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
476 {
477 DR_transientrel *myState = (DR_transientrel *) self;
478
479 /*
480 * Note that the input slot might not be of the type of the target
481 * relation. That's supported by table_tuple_insert(), but slightly less
482 * efficient than inserting with the right slot - but the alternative
483 * would be to copy into a slot of the right type, which would not be
484 * cheap either. This also doesn't allow accessing per-AM data (say a
485 * tuple's xmin), but since we don't do that here...
486 */
487
488 table_tuple_insert(myState->transientrel,
489 slot,
490 myState->output_cid,
491 myState->ti_options,
492 myState->bistate);
493
494 /* We know this is a newly created relation, so there are no indexes */
495
496 return true;
497 }
498
499 /*
500 * transientrel_shutdown --- executor end
501 */
502 static void
transientrel_shutdown(DestReceiver * self)503 transientrel_shutdown(DestReceiver *self)
504 {
505 DR_transientrel *myState = (DR_transientrel *) self;
506
507 FreeBulkInsertState(myState->bistate);
508
509 table_finish_bulk_insert(myState->transientrel, myState->ti_options);
510
511 /* close transientrel, but keep lock until commit */
512 table_close(myState->transientrel, NoLock);
513 myState->transientrel = NULL;
514 }
515
516 /*
517 * transientrel_destroy --- release DestReceiver object
518 */
519 static void
transientrel_destroy(DestReceiver * self)520 transientrel_destroy(DestReceiver *self)
521 {
522 pfree(self);
523 }
524
525
526 /*
527 * Given a qualified temporary table name, append an underscore followed by
528 * the given integer, to make a new table name based on the old one.
529 * The result is a palloc'd string.
530 *
531 * As coded, this would fail to make a valid SQL name if the given name were,
532 * say, "FOO"."BAR". Currently, the table name portion of the input will
533 * never be double-quoted because it's of the form "pg_temp_NNN", cf
534 * make_new_heap(). But we might have to work harder someday.
535 */
536 static char *
make_temptable_name_n(char * tempname,int n)537 make_temptable_name_n(char *tempname, int n)
538 {
539 StringInfoData namebuf;
540
541 initStringInfo(&namebuf);
542 appendStringInfoString(&namebuf, tempname);
543 appendStringInfo(&namebuf, "_%d", n);
544 return namebuf.data;
545 }
546
547 /*
548 * refresh_by_match_merge
549 *
550 * Refresh a materialized view with transactional semantics, while allowing
551 * concurrent reads.
552 *
553 * This is called after a new version of the data has been created in a
554 * temporary table. It performs a full outer join against the old version of
555 * the data, producing "diff" results. This join cannot work if there are any
556 * duplicated rows in either the old or new versions, in the sense that every
557 * column would compare as equal between the two rows. It does work correctly
558 * in the face of rows which have at least one NULL value, with all non-NULL
559 * columns equal. The behavior of NULLs on equality tests and on UNIQUE
560 * indexes turns out to be quite convenient here; the tests we need to make
561 * are consistent with default behavior. If there is at least one UNIQUE
562 * index on the materialized view, we have exactly the guarantee we need.
563 *
564 * The temporary table used to hold the diff results contains just the TID of
565 * the old record (if matched) and the ROW from the new table as a single
566 * column of complex record type (if matched).
567 *
568 * Once we have the diff table, we perform set-based DELETE and INSERT
569 * operations against the materialized view, and discard both temporary
570 * tables.
571 *
572 * Everything from the generation of the new data to applying the differences
573 * takes place under cover of an ExclusiveLock, since it seems as though we
574 * would want to prohibit not only concurrent REFRESH operations, but also
575 * incremental maintenance. It also doesn't seem reasonable or safe to allow
576 * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
577 * this command.
578 */
579 static void
refresh_by_match_merge(Oid matviewOid,Oid tempOid,Oid relowner,int save_sec_context)580 refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
581 int save_sec_context)
582 {
583 StringInfoData querybuf;
584 Relation matviewRel;
585 Relation tempRel;
586 char *matviewname;
587 char *tempname;
588 char *diffname;
589 TupleDesc tupdesc;
590 bool foundUniqueIndex;
591 List *indexoidlist;
592 ListCell *indexoidscan;
593 int16 relnatts;
594 Oid *opUsedForQual;
595
596 initStringInfo(&querybuf);
597 matviewRel = table_open(matviewOid, NoLock);
598 matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
599 RelationGetRelationName(matviewRel));
600 tempRel = table_open(tempOid, NoLock);
601 tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
602 RelationGetRelationName(tempRel));
603 diffname = make_temptable_name_n(tempname, 2);
604
605 relnatts = RelationGetNumberOfAttributes(matviewRel);
606
607 /* Open SPI context. */
608 if (SPI_connect() != SPI_OK_CONNECT)
609 elog(ERROR, "SPI_connect failed");
610
611 /* Analyze the temp table with the new contents. */
612 appendStringInfo(&querybuf, "ANALYZE %s", tempname);
613 if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
614 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
615
616 /*
617 * We need to ensure that there are not duplicate rows without NULLs in
618 * the new data set before we can count on the "diff" results. Check for
619 * that in a way that allows showing the first duplicated row found. Even
620 * after we pass this test, a unique index on the materialized view may
621 * find a duplicate key problem.
622 *
623 * Note: here and below, we use "tablename.*::tablerowtype" as a hack to
624 * keep ".*" from being expanded into multiple columns in a SELECT list.
625 * Compare ruleutils.c's get_variable().
626 */
627 resetStringInfo(&querybuf);
628 appendStringInfo(&querybuf,
629 "SELECT newdata.*::%s FROM %s newdata "
630 "WHERE newdata.* IS NOT NULL AND EXISTS "
631 "(SELECT 1 FROM %s newdata2 WHERE newdata2.* IS NOT NULL "
632 "AND newdata2.* OPERATOR(pg_catalog.*=) newdata.* "
633 "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
634 "newdata.ctid)",
635 tempname, tempname, tempname);
636 if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
637 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
638 if (SPI_processed > 0)
639 {
640 /*
641 * Note that this ereport() is returning data to the user. Generally,
642 * we would want to make sure that the user has been granted access to
643 * this data. However, REFRESH MAT VIEW is only able to be run by the
644 * owner of the mat view (or a superuser) and therefore there is no
645 * need to check for access to data in the mat view.
646 */
647 ereport(ERROR,
648 (errcode(ERRCODE_CARDINALITY_VIOLATION),
649 errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
650 RelationGetRelationName(matviewRel)),
651 errdetail("Row: %s",
652 SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
653 }
654
655 SetUserIdAndSecContext(relowner,
656 save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
657
658 /* Start building the query for creating the diff table. */
659 resetStringInfo(&querybuf);
660 appendStringInfo(&querybuf,
661 "CREATE TEMP TABLE %s AS "
662 "SELECT mv.ctid AS tid, newdata.*::%s AS newdata "
663 "FROM %s mv FULL JOIN %s newdata ON (",
664 diffname, tempname, matviewname, tempname);
665
666 /*
667 * Get the list of index OIDs for the table from the relcache, and look up
668 * each one in the pg_index syscache. We will test for equality on all
669 * columns present in all unique indexes which only reference columns and
670 * include all rows.
671 */
672 tupdesc = matviewRel->rd_att;
673 opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts);
674 foundUniqueIndex = false;
675
676 indexoidlist = RelationGetIndexList(matviewRel);
677
678 foreach(indexoidscan, indexoidlist)
679 {
680 Oid indexoid = lfirst_oid(indexoidscan);
681 Relation indexRel;
682
683 indexRel = index_open(indexoid, RowExclusiveLock);
684 if (is_usable_unique_index(indexRel))
685 {
686 Form_pg_index indexStruct = indexRel->rd_index;
687 int indnkeyatts = indexStruct->indnkeyatts;
688 oidvector *indclass;
689 Datum indclassDatum;
690 bool isnull;
691 int i;
692
693 /* Must get indclass the hard way. */
694 indclassDatum = SysCacheGetAttr(INDEXRELID,
695 indexRel->rd_indextuple,
696 Anum_pg_index_indclass,
697 &isnull);
698 Assert(!isnull);
699 indclass = (oidvector *) DatumGetPointer(indclassDatum);
700
701 /* Add quals for all columns from this index. */
702 for (i = 0; i < indnkeyatts; i++)
703 {
704 int attnum = indexStruct->indkey.values[i];
705 Oid opclass = indclass->values[i];
706 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
707 Oid attrtype = attr->atttypid;
708 HeapTuple cla_ht;
709 Form_pg_opclass cla_tup;
710 Oid opfamily;
711 Oid opcintype;
712 Oid op;
713 const char *leftop;
714 const char *rightop;
715
716 /*
717 * Identify the equality operator associated with this index
718 * column. First we need to look up the column's opclass.
719 */
720 cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
721 if (!HeapTupleIsValid(cla_ht))
722 elog(ERROR, "cache lookup failed for opclass %u", opclass);
723 cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht);
724 Assert(cla_tup->opcmethod == BTREE_AM_OID);
725 opfamily = cla_tup->opcfamily;
726 opcintype = cla_tup->opcintype;
727 ReleaseSysCache(cla_ht);
728
729 op = get_opfamily_member(opfamily, opcintype, opcintype,
730 BTEqualStrategyNumber);
731 if (!OidIsValid(op))
732 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
733 BTEqualStrategyNumber, opcintype, opcintype, opfamily);
734
735 /*
736 * If we find the same column with the same equality semantics
737 * in more than one index, we only need to emit the equality
738 * clause once.
739 *
740 * Since we only remember the last equality operator, this
741 * code could be fooled into emitting duplicate clauses given
742 * multiple indexes with several different opclasses ... but
743 * that's so unlikely it doesn't seem worth spending extra
744 * code to avoid.
745 */
746 if (opUsedForQual[attnum - 1] == op)
747 continue;
748 opUsedForQual[attnum - 1] = op;
749
750 /*
751 * Actually add the qual, ANDed with any others.
752 */
753 if (foundUniqueIndex)
754 appendStringInfoString(&querybuf, " AND ");
755
756 leftop = quote_qualified_identifier("newdata",
757 NameStr(attr->attname));
758 rightop = quote_qualified_identifier("mv",
759 NameStr(attr->attname));
760
761 generate_operator_clause(&querybuf,
762 leftop, attrtype,
763 op,
764 rightop, attrtype);
765
766 foundUniqueIndex = true;
767 }
768 }
769
770 /* Keep the locks, since we're about to run DML which needs them. */
771 index_close(indexRel, NoLock);
772 }
773
774 list_free(indexoidlist);
775
776 /*
777 * There must be at least one usable unique index on the matview.
778 *
779 * ExecRefreshMatView() checks that after taking the exclusive lock on the
780 * matview. So at least one unique index is guaranteed to exist here
781 * because the lock is still being held; so an Assert seems sufficient.
782 */
783 Assert(foundUniqueIndex);
784
785 appendStringInfoString(&querybuf,
786 " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) "
787 "WHERE newdata.* IS NULL OR mv.* IS NULL "
788 "ORDER BY tid");
789
790 /* Create the temporary "diff" table. */
791 if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
792 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
793
794 SetUserIdAndSecContext(relowner,
795 save_sec_context | SECURITY_RESTRICTED_OPERATION);
796
797 /*
798 * We have no further use for data from the "full-data" temp table, but we
799 * must keep it around because its type is referenced from the diff table.
800 */
801
802 /* Analyze the diff table. */
803 resetStringInfo(&querybuf);
804 appendStringInfo(&querybuf, "ANALYZE %s", diffname);
805 if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
806 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
807
808 OpenMatViewIncrementalMaintenance();
809
810 /* Deletes must come before inserts; do them first. */
811 resetStringInfo(&querybuf);
812 appendStringInfo(&querybuf,
813 "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
814 "(SELECT diff.tid FROM %s diff "
815 "WHERE diff.tid IS NOT NULL "
816 "AND diff.newdata IS NULL)",
817 matviewname, diffname);
818 if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
819 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
820
821 /* Inserts go last. */
822 resetStringInfo(&querybuf);
823 appendStringInfo(&querybuf,
824 "INSERT INTO %s SELECT (diff.newdata).* "
825 "FROM %s diff WHERE tid IS NULL",
826 matviewname, diffname);
827 if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
828 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
829
830 /* We're done maintaining the materialized view. */
831 CloseMatViewIncrementalMaintenance();
832 table_close(tempRel, NoLock);
833 table_close(matviewRel, NoLock);
834
835 /* Clean up temp tables. */
836 resetStringInfo(&querybuf);
837 appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
838 if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
839 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
840
841 /* Close SPI context. */
842 if (SPI_finish() != SPI_OK_FINISH)
843 elog(ERROR, "SPI_finish failed");
844 }
845
846 /*
847 * Swap the physical files of the target and transient tables, then rebuild
848 * the target's indexes and throw away the transient table. Security context
849 * swapping is handled by the called function, so it is not needed here.
850 */
851 static void
refresh_by_heap_swap(Oid matviewOid,Oid OIDNewHeap,char relpersistence)852 refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
853 {
854 finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
855 RecentXmin, ReadNextMultiXactId(), relpersistence);
856 }
857
858 /*
859 * Check whether specified index is usable for match merge.
860 */
861 static bool
is_usable_unique_index(Relation indexRel)862 is_usable_unique_index(Relation indexRel)
863 {
864 Form_pg_index indexStruct = indexRel->rd_index;
865
866 /*
867 * Must be unique, valid, immediate, non-partial, and be defined over
868 * plain user columns (not expressions). We also require it to be a
869 * btree. Even if we had any other unique index kinds, we'd not know how
870 * to identify the corresponding equality operator, nor could we be sure
871 * that the planner could implement the required FULL JOIN with non-btree
872 * operators.
873 */
874 if (indexStruct->indisunique &&
875 indexStruct->indimmediate &&
876 indexRel->rd_rel->relam == BTREE_AM_OID &&
877 indexStruct->indisvalid &&
878 RelationGetIndexPredicate(indexRel) == NIL &&
879 indexStruct->indnatts > 0)
880 {
881 /*
882 * The point of groveling through the index columns individually is to
883 * reject both index expressions and system columns. Currently,
884 * matviews couldn't have OID columns so there's no way to create an
885 * index on a system column; but maybe someday that wouldn't be true,
886 * so let's be safe.
887 */
888 int numatts = indexStruct->indnatts;
889 int i;
890
891 for (i = 0; i < numatts; i++)
892 {
893 int attnum = indexStruct->indkey.values[i];
894
895 if (attnum <= 0)
896 return false;
897 }
898 return true;
899 }
900 return false;
901 }
902
903
904 /*
905 * This should be used to test whether the backend is in a context where it is
906 * OK to allow DML statements to modify materialized views. We only want to
907 * allow that for internal code driven by the materialized view definition,
908 * not for arbitrary user-supplied code.
909 *
910 * While the function names reflect the fact that their main intended use is
911 * incremental maintenance of materialized views (in response to changes to
912 * the data in referenced relations), they are initially used to allow REFRESH
913 * without blocking concurrent reads.
914 */
915 bool
MatViewIncrementalMaintenanceIsEnabled(void)916 MatViewIncrementalMaintenanceIsEnabled(void)
917 {
918 return matview_maintenance_depth > 0;
919 }
920
921 static void
OpenMatViewIncrementalMaintenance(void)922 OpenMatViewIncrementalMaintenance(void)
923 {
924 matview_maintenance_depth++;
925 }
926
927 static void
CloseMatViewIncrementalMaintenance(void)928 CloseMatViewIncrementalMaintenance(void)
929 {
930 matview_maintenance_depth--;
931 Assert(matview_maintenance_depth >= 0);
932 }
933