1 /*-------------------------------------------------------------------------
2 *
3 * matview.c
4 * materialized view support
5 *
6 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/commands/matview.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 #include "postgres.h"
16
17 #include "access/htup_details.h"
18 #include "access/multixact.h"
19 #include "access/xact.h"
20 #include "access/xlog.h"
21 #include "catalog/catalog.h"
22 #include "catalog/indexing.h"
23 #include "catalog/namespace.h"
24 #include "catalog/pg_am.h"
25 #include "catalog/pg_opclass.h"
26 #include "catalog/pg_operator.h"
27 #include "commands/cluster.h"
28 #include "commands/matview.h"
29 #include "commands/tablecmds.h"
30 #include "commands/tablespace.h"
31 #include "executor/executor.h"
32 #include "executor/spi.h"
33 #include "miscadmin.h"
34 #include "parser/parse_relation.h"
35 #include "pgstat.h"
36 #include "rewrite/rewriteHandler.h"
37 #include "storage/lmgr.h"
38 #include "storage/smgr.h"
39 #include "tcop/tcopprot.h"
40 #include "utils/builtins.h"
41 #include "utils/lsyscache.h"
42 #include "utils/rel.h"
43 #include "utils/snapmgr.h"
44 #include "utils/syscache.h"
45
46
47 typedef struct
48 {
49 DestReceiver pub; /* publicly-known function pointers */
50 Oid transientoid; /* OID of new heap into which to store */
51 /* These fields are filled by transientrel_startup: */
52 Relation transientrel; /* relation to write to */
53 CommandId output_cid; /* cmin to insert in output tuples */
54 int hi_options; /* heap_insert performance options */
55 BulkInsertState bistate; /* bulk insert state */
56 } DR_transientrel;
57
58 static int matview_maintenance_depth = 0;
59
60 static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
61 static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
62 static void transientrel_shutdown(DestReceiver *self);
63 static void transientrel_destroy(DestReceiver *self);
64 static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
65 const char *queryString);
66 static char *make_temptable_name_n(char *tempname, int n);
67 static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
68 int save_sec_context);
69 static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
70 static bool is_usable_unique_index(Relation indexRel);
71 static void OpenMatViewIncrementalMaintenance(void);
72 static void CloseMatViewIncrementalMaintenance(void);
73
74 /*
75 * SetMatViewPopulatedState
76 * Mark a materialized view as populated, or not.
77 *
78 * NOTE: caller must be holding an appropriate lock on the relation.
79 */
80 void
SetMatViewPopulatedState(Relation relation,bool newstate)81 SetMatViewPopulatedState(Relation relation, bool newstate)
82 {
83 Relation pgrel;
84 HeapTuple tuple;
85
86 Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
87
88 /*
89 * Update relation's pg_class entry. Crucial side-effect: other backends
90 * (and this one too!) are sent SI message to make them rebuild relcache
91 * entries.
92 */
93 pgrel = heap_open(RelationRelationId, RowExclusiveLock);
94 tuple = SearchSysCacheCopy1(RELOID,
95 ObjectIdGetDatum(RelationGetRelid(relation)));
96 if (!HeapTupleIsValid(tuple))
97 elog(ERROR, "cache lookup failed for relation %u",
98 RelationGetRelid(relation));
99
100 ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
101
102 CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
103
104 heap_freetuple(tuple);
105 heap_close(pgrel, RowExclusiveLock);
106
107 /*
108 * Advance command counter to make the updated pg_class row locally
109 * visible.
110 */
111 CommandCounterIncrement();
112 }
113
114 /*
115 * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
116 *
117 * This refreshes the materialized view by creating a new table and swapping
118 * the relfilenodes of the new table and the old materialized view, so the OID
119 * of the original materialized view is preserved. Thus we do not lose GRANT
120 * nor references to this materialized view.
121 *
122 * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
123 * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
124 * statement associated with the materialized view. The statement node's
125 * skipData field shows whether the clause was used.
126 *
127 * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
128 * the new heap, it's better to create the indexes afterwards than to fill them
129 * incrementally while we load.
130 *
131 * The matview's "populated" state is changed based on whether the contents
132 * reflect the result set of the materialized view's query.
133 */
134 ObjectAddress
ExecRefreshMatView(RefreshMatViewStmt * stmt,const char * queryString,ParamListInfo params,char * completionTag)135 ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
136 ParamListInfo params, char *completionTag)
137 {
138 Oid matviewOid;
139 Relation matviewRel;
140 RewriteRule *rule;
141 List *actions;
142 Query *dataQuery;
143 Oid tableSpace;
144 Oid relowner;
145 Oid OIDNewHeap;
146 DestReceiver *dest;
147 uint64 processed = 0;
148 bool concurrent;
149 LOCKMODE lockmode;
150 char relpersistence;
151 Oid save_userid;
152 int save_sec_context;
153 int save_nestlevel;
154 ObjectAddress address;
155
156 /* Determine strength of lock needed. */
157 concurrent = stmt->concurrent;
158 lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;
159
160 /*
161 * Get a lock until end of transaction.
162 */
163 matviewOid = RangeVarGetRelidExtended(stmt->relation,
164 lockmode, false, false,
165 RangeVarCallbackOwnsTable, NULL);
166 matviewRel = heap_open(matviewOid, NoLock);
167
168 /* Make sure it is a materialized view. */
169 if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
170 ereport(ERROR,
171 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
172 errmsg("\"%s\" is not a materialized view",
173 RelationGetRelationName(matviewRel))));
174
175 /* Check that CONCURRENTLY is not specified if not populated. */
176 if (concurrent && !RelationIsPopulated(matviewRel))
177 ereport(ERROR,
178 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
179 errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));
180
181 /* Check that conflicting options have not been specified. */
182 if (concurrent && stmt->skipData)
183 ereport(ERROR,
184 (errcode(ERRCODE_SYNTAX_ERROR),
185 errmsg("CONCURRENTLY and WITH NO DATA options cannot be used together")));
186
187 /* We don't allow an oid column for a materialized view. */
188 Assert(!matviewRel->rd_rel->relhasoids);
189
190 /*
191 * Check that everything is correct for a refresh. Problems at this point
192 * are internal errors, so elog is sufficient.
193 */
194 if (matviewRel->rd_rel->relhasrules == false ||
195 matviewRel->rd_rules->numLocks < 1)
196 elog(ERROR,
197 "materialized view \"%s\" is missing rewrite information",
198 RelationGetRelationName(matviewRel));
199
200 if (matviewRel->rd_rules->numLocks > 1)
201 elog(ERROR,
202 "materialized view \"%s\" has too many rules",
203 RelationGetRelationName(matviewRel));
204
205 rule = matviewRel->rd_rules->rules[0];
206 if (rule->event != CMD_SELECT || !(rule->isInstead))
207 elog(ERROR,
208 "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
209 RelationGetRelationName(matviewRel));
210
211 actions = rule->actions;
212 if (list_length(actions) != 1)
213 elog(ERROR,
214 "the rule for materialized view \"%s\" is not a single action",
215 RelationGetRelationName(matviewRel));
216
217 /*
218 * Check that there is a unique index with no WHERE clause on one or more
219 * columns of the materialized view if CONCURRENTLY is specified.
220 */
221 if (concurrent)
222 {
223 List *indexoidlist = RelationGetIndexList(matviewRel);
224 ListCell *indexoidscan;
225 bool hasUniqueIndex = false;
226
227 foreach(indexoidscan, indexoidlist)
228 {
229 Oid indexoid = lfirst_oid(indexoidscan);
230 Relation indexRel;
231
232 indexRel = index_open(indexoid, AccessShareLock);
233 hasUniqueIndex = is_usable_unique_index(indexRel);
234 index_close(indexRel, AccessShareLock);
235 if (hasUniqueIndex)
236 break;
237 }
238
239 list_free(indexoidlist);
240
241 if (!hasUniqueIndex)
242 ereport(ERROR,
243 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
244 errmsg("cannot refresh materialized view \"%s\" concurrently",
245 quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
246 RelationGetRelationName(matviewRel))),
247 errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
248 }
249
250 /*
251 * The stored query was rewritten at the time of the MV definition, but
252 * has not been scribbled on by the planner.
253 */
254 dataQuery = linitial_node(Query, actions);
255
256 /*
257 * Check for active uses of the relation in the current transaction, such
258 * as open scans.
259 *
260 * NB: We count on this to protect us against problems with refreshing the
261 * data using HEAP_INSERT_FROZEN.
262 */
263 CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");
264
265 /*
266 * Tentatively mark the matview as populated or not (this will roll back
267 * if we fail later).
268 */
269 SetMatViewPopulatedState(matviewRel, !stmt->skipData);
270
271 relowner = matviewRel->rd_rel->relowner;
272
273 /*
274 * Switch to the owner's userid, so that any functions are run as that
275 * user. Also arrange to make GUC variable changes local to this command.
276 * Don't lock it down too tight to create a temporary table just yet. We
277 * will switch modes when we are about to execute user code.
278 */
279 GetUserIdAndSecContext(&save_userid, &save_sec_context);
280 SetUserIdAndSecContext(relowner,
281 save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
282 save_nestlevel = NewGUCNestLevel();
283
284 /* Concurrent refresh builds new data in temp tablespace, and does diff. */
285 if (concurrent)
286 {
287 tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP);
288 relpersistence = RELPERSISTENCE_TEMP;
289 }
290 else
291 {
292 tableSpace = matviewRel->rd_rel->reltablespace;
293 relpersistence = matviewRel->rd_rel->relpersistence;
294 }
295
296 /*
297 * Create the transient table that will receive the regenerated data. Lock
298 * it against access by any other process until commit (by which time it
299 * will be gone).
300 */
301 OIDNewHeap = make_new_heap(matviewOid, tableSpace, relpersistence,
302 ExclusiveLock);
303 LockRelationOid(OIDNewHeap, AccessExclusiveLock);
304 dest = CreateTransientRelDestReceiver(OIDNewHeap);
305
306 /*
307 * Now lock down security-restricted operations.
308 */
309 SetUserIdAndSecContext(relowner,
310 save_sec_context | SECURITY_RESTRICTED_OPERATION);
311
312 /* Generate the data, if wanted. */
313 if (!stmt->skipData)
314 processed = refresh_matview_datafill(dest, dataQuery, queryString);
315
316 /* Make the matview match the newly generated data. */
317 if (concurrent)
318 {
319 int old_depth = matview_maintenance_depth;
320
321 PG_TRY();
322 {
323 refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
324 save_sec_context);
325 }
326 PG_CATCH();
327 {
328 matview_maintenance_depth = old_depth;
329 PG_RE_THROW();
330 }
331 PG_END_TRY();
332 Assert(matview_maintenance_depth == old_depth);
333 }
334 else
335 {
336 refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);
337
338 /*
339 * Inform stats collector about our activity: basically, we truncated
340 * the matview and inserted some new data. (The concurrent code path
341 * above doesn't need to worry about this because the inserts and
342 * deletes it issues get counted by lower-level code.)
343 */
344 pgstat_count_truncate(matviewRel);
345 if (!stmt->skipData)
346 pgstat_count_heap_insert(matviewRel, processed);
347 }
348
349 heap_close(matviewRel, NoLock);
350
351 /* Roll back any GUC changes */
352 AtEOXact_GUC(false, save_nestlevel);
353
354 /* Restore userid and security context */
355 SetUserIdAndSecContext(save_userid, save_sec_context);
356
357 ObjectAddressSet(address, RelationRelationId, matviewOid);
358
359 return address;
360 }
361
362 /*
363 * refresh_matview_datafill
364 *
365 * Execute the given query, sending result rows to "dest" (which will
366 * insert them into the target matview).
367 *
368 * Returns number of rows inserted.
369 */
370 static uint64
refresh_matview_datafill(DestReceiver * dest,Query * query,const char * queryString)371 refresh_matview_datafill(DestReceiver *dest, Query *query,
372 const char *queryString)
373 {
374 List *rewritten;
375 PlannedStmt *plan;
376 QueryDesc *queryDesc;
377 Query *copied_query;
378 uint64 processed;
379
380 /* Lock and rewrite, using a copy to preserve the original query. */
381 copied_query = copyObject(query);
382 AcquireRewriteLocks(copied_query, true, false);
383 rewritten = QueryRewrite(copied_query);
384
385 /* SELECT should never rewrite to more or less than one SELECT query */
386 if (list_length(rewritten) != 1)
387 elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
388 query = (Query *) linitial(rewritten);
389
390 /* Check for user-requested abort. */
391 CHECK_FOR_INTERRUPTS();
392
393 /* Plan the query which will generate data for the refresh. */
394 plan = pg_plan_query(query, 0, NULL);
395
396 /*
397 * Use a snapshot with an updated command ID to ensure this query sees
398 * results of any previously executed queries. (This could only matter if
399 * the planner executed an allegedly-stable function that changed the
400 * database contents, but let's do it anyway to be safe.)
401 */
402 PushCopiedSnapshot(GetActiveSnapshot());
403 UpdateActiveSnapshotCommandId();
404
405 /* Create a QueryDesc, redirecting output to our tuple receiver */
406 queryDesc = CreateQueryDesc(plan, queryString,
407 GetActiveSnapshot(), InvalidSnapshot,
408 dest, NULL, NULL, 0);
409
410 /* call ExecutorStart to prepare the plan for execution */
411 ExecutorStart(queryDesc, EXEC_FLAG_WITHOUT_OIDS);
412
413 /* run the plan */
414 ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
415
416 processed = queryDesc->estate->es_processed;
417
418 /* and clean up */
419 ExecutorFinish(queryDesc);
420 ExecutorEnd(queryDesc);
421
422 FreeQueryDesc(queryDesc);
423
424 PopActiveSnapshot();
425
426 return processed;
427 }
428
429 DestReceiver *
CreateTransientRelDestReceiver(Oid transientoid)430 CreateTransientRelDestReceiver(Oid transientoid)
431 {
432 DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
433
434 self->pub.receiveSlot = transientrel_receive;
435 self->pub.rStartup = transientrel_startup;
436 self->pub.rShutdown = transientrel_shutdown;
437 self->pub.rDestroy = transientrel_destroy;
438 self->pub.mydest = DestTransientRel;
439 self->transientoid = transientoid;
440
441 return (DestReceiver *) self;
442 }
443
444 /*
445 * transientrel_startup --- executor startup
446 */
447 static void
transientrel_startup(DestReceiver * self,int operation,TupleDesc typeinfo)448 transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
449 {
450 DR_transientrel *myState = (DR_transientrel *) self;
451 Relation transientrel;
452
453 transientrel = heap_open(myState->transientoid, NoLock);
454
455 /*
456 * Fill private fields of myState for use by later routines
457 */
458 myState->transientrel = transientrel;
459 myState->output_cid = GetCurrentCommandId(true);
460
461 /*
462 * We can skip WAL-logging the insertions, unless PITR or streaming
463 * replication is in use. We can skip the FSM in any case.
464 */
465 myState->hi_options = HEAP_INSERT_SKIP_FSM | HEAP_INSERT_FROZEN;
466 if (!XLogIsNeeded())
467 myState->hi_options |= HEAP_INSERT_SKIP_WAL;
468 myState->bistate = GetBulkInsertState();
469
470 /* Not using WAL requires smgr_targblock be initially invalid */
471 Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
472 }
473
474 /*
475 * transientrel_receive --- receive one tuple
476 */
477 static bool
transientrel_receive(TupleTableSlot * slot,DestReceiver * self)478 transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
479 {
480 DR_transientrel *myState = (DR_transientrel *) self;
481 HeapTuple tuple;
482
483 /*
484 * get the heap tuple out of the tuple table slot, making sure we have a
485 * writable copy
486 */
487 tuple = ExecMaterializeSlot(slot);
488
489 heap_insert(myState->transientrel,
490 tuple,
491 myState->output_cid,
492 myState->hi_options,
493 myState->bistate);
494
495 /* We know this is a newly created relation, so there are no indexes */
496
497 return true;
498 }
499
500 /*
501 * transientrel_shutdown --- executor end
502 */
503 static void
transientrel_shutdown(DestReceiver * self)504 transientrel_shutdown(DestReceiver *self)
505 {
506 DR_transientrel *myState = (DR_transientrel *) self;
507
508 FreeBulkInsertState(myState->bistate);
509
510 /* If we skipped using WAL, must heap_sync before commit */
511 if (myState->hi_options & HEAP_INSERT_SKIP_WAL)
512 heap_sync(myState->transientrel);
513
514 /* close transientrel, but keep lock until commit */
515 heap_close(myState->transientrel, NoLock);
516 myState->transientrel = NULL;
517 }
518
519 /*
520 * transientrel_destroy --- release DestReceiver object
521 */
522 static void
transientrel_destroy(DestReceiver * self)523 transientrel_destroy(DestReceiver *self)
524 {
525 pfree(self);
526 }
527
528
529 /*
530 * Given a qualified temporary table name, append an underscore followed by
531 * the given integer, to make a new table name based on the old one.
532 * The result is a palloc'd string.
533 *
534 * As coded, this would fail to make a valid SQL name if the given name were,
535 * say, "FOO"."BAR". Currently, the table name portion of the input will
536 * never be double-quoted because it's of the form "pg_temp_NNN", cf
537 * make_new_heap(). But we might have to work harder someday.
538 */
539 static char *
make_temptable_name_n(char * tempname,int n)540 make_temptable_name_n(char *tempname, int n)
541 {
542 StringInfoData namebuf;
543
544 initStringInfo(&namebuf);
545 appendStringInfoString(&namebuf, tempname);
546 appendStringInfo(&namebuf, "_%d", n);
547 return namebuf.data;
548 }
549
550 /*
551 * refresh_by_match_merge
552 *
553 * Refresh a materialized view with transactional semantics, while allowing
554 * concurrent reads.
555 *
556 * This is called after a new version of the data has been created in a
557 * temporary table. It performs a full outer join against the old version of
558 * the data, producing "diff" results. This join cannot work if there are any
559 * duplicated rows in either the old or new versions, in the sense that every
560 * column would compare as equal between the two rows. It does work correctly
561 * in the face of rows which have at least one NULL value, with all non-NULL
562 * columns equal. The behavior of NULLs on equality tests and on UNIQUE
563 * indexes turns out to be quite convenient here; the tests we need to make
564 * are consistent with default behavior. If there is at least one UNIQUE
565 * index on the materialized view, we have exactly the guarantee we need.
566 *
567 * The temporary table used to hold the diff results contains just the TID of
568 * the old record (if matched) and the ROW from the new table as a single
569 * column of complex record type (if matched).
570 *
571 * Once we have the diff table, we perform set-based DELETE and INSERT
572 * operations against the materialized view, and discard both temporary
573 * tables.
574 *
575 * Everything from the generation of the new data to applying the differences
576 * takes place under cover of an ExclusiveLock, since it seems as though we
577 * would want to prohibit not only concurrent REFRESH operations, but also
578 * incremental maintenance. It also doesn't seem reasonable or safe to allow
579 * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
580 * this command.
581 */
582 static void
refresh_by_match_merge(Oid matviewOid,Oid tempOid,Oid relowner,int save_sec_context)583 refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
584 int save_sec_context)
585 {
586 StringInfoData querybuf;
587 Relation matviewRel;
588 Relation tempRel;
589 char *matviewname;
590 char *tempname;
591 char *diffname;
592 TupleDesc tupdesc;
593 bool foundUniqueIndex;
594 List *indexoidlist;
595 ListCell *indexoidscan;
596 int16 relnatts;
597 Oid *opUsedForQual;
598
599 initStringInfo(&querybuf);
600 matviewRel = heap_open(matviewOid, NoLock);
601 matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
602 RelationGetRelationName(matviewRel));
603 tempRel = heap_open(tempOid, NoLock);
604 tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
605 RelationGetRelationName(tempRel));
606 diffname = make_temptable_name_n(tempname, 2);
607
608 relnatts = matviewRel->rd_rel->relnatts;
609
610 /* Open SPI context. */
611 if (SPI_connect() != SPI_OK_CONNECT)
612 elog(ERROR, "SPI_connect failed");
613
614 /* Analyze the temp table with the new contents. */
615 appendStringInfo(&querybuf, "ANALYZE %s", tempname);
616 if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
617 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
618
619 /*
620 * We need to ensure that there are not duplicate rows without NULLs in
621 * the new data set before we can count on the "diff" results. Check for
622 * that in a way that allows showing the first duplicated row found. Even
623 * after we pass this test, a unique index on the materialized view may
624 * find a duplicate key problem.
625 *
626 * Note: here and below, we use "tablename.*::tablerowtype" as a hack to
627 * keep ".*" from being expanded into multiple columns in a SELECT list.
628 * Compare ruleutils.c's get_variable().
629 */
630 resetStringInfo(&querybuf);
631 appendStringInfo(&querybuf,
632 "SELECT newdata.*::%s FROM %s newdata "
633 "WHERE newdata.* IS NOT NULL AND EXISTS "
634 "(SELECT 1 FROM %s newdata2 WHERE newdata2.* IS NOT NULL "
635 "AND newdata2.* OPERATOR(pg_catalog.*=) newdata.* "
636 "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
637 "newdata.ctid)",
638 tempname, tempname, tempname);
639 if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
640 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
641 if (SPI_processed > 0)
642 {
643 /*
644 * Note that this ereport() is returning data to the user. Generally,
645 * we would want to make sure that the user has been granted access to
646 * this data. However, REFRESH MAT VIEW is only able to be run by the
647 * owner of the mat view (or a superuser) and therefore there is no
648 * need to check for access to data in the mat view.
649 */
650 ereport(ERROR,
651 (errcode(ERRCODE_CARDINALITY_VIOLATION),
652 errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
653 RelationGetRelationName(matviewRel)),
654 errdetail("Row: %s",
655 SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
656 }
657
658 SetUserIdAndSecContext(relowner,
659 save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
660
661 /* Start building the query for creating the diff table. */
662 resetStringInfo(&querybuf);
663 appendStringInfo(&querybuf,
664 "CREATE TEMP TABLE %s AS "
665 "SELECT mv.ctid AS tid, newdata.*::%s AS newdata "
666 "FROM %s mv FULL JOIN %s newdata ON (",
667 diffname, tempname, matviewname, tempname);
668
669 /*
670 * Get the list of index OIDs for the table from the relcache, and look up
671 * each one in the pg_index syscache. We will test for equality on all
672 * columns present in all unique indexes which only reference columns and
673 * include all rows.
674 */
675 tupdesc = matviewRel->rd_att;
676 opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts);
677 foundUniqueIndex = false;
678
679 indexoidlist = RelationGetIndexList(matviewRel);
680
681 foreach(indexoidscan, indexoidlist)
682 {
683 Oid indexoid = lfirst_oid(indexoidscan);
684 Relation indexRel;
685
686 indexRel = index_open(indexoid, RowExclusiveLock);
687 if (is_usable_unique_index(indexRel))
688 {
689 Form_pg_index indexStruct = indexRel->rd_index;
690 int numatts = indexStruct->indnatts;
691 oidvector *indclass;
692 Datum indclassDatum;
693 bool isnull;
694 int i;
695
696 /* Must get indclass the hard way. */
697 indclassDatum = SysCacheGetAttr(INDEXRELID,
698 indexRel->rd_indextuple,
699 Anum_pg_index_indclass,
700 &isnull);
701 Assert(!isnull);
702 indclass = (oidvector *) DatumGetPointer(indclassDatum);
703
704 /* Add quals for all columns from this index. */
705 for (i = 0; i < numatts; i++)
706 {
707 int attnum = indexStruct->indkey.values[i];
708 Oid opclass = indclass->values[i];
709 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
710 Oid attrtype = attr->atttypid;
711 HeapTuple cla_ht;
712 Form_pg_opclass cla_tup;
713 Oid opfamily;
714 Oid opcintype;
715 Oid op;
716 const char *leftop;
717 const char *rightop;
718
719 /*
720 * Identify the equality operator associated with this index
721 * column. First we need to look up the column's opclass.
722 */
723 cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
724 if (!HeapTupleIsValid(cla_ht))
725 elog(ERROR, "cache lookup failed for opclass %u", opclass);
726 cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht);
727 Assert(cla_tup->opcmethod == BTREE_AM_OID);
728 opfamily = cla_tup->opcfamily;
729 opcintype = cla_tup->opcintype;
730 ReleaseSysCache(cla_ht);
731
732 op = get_opfamily_member(opfamily, opcintype, opcintype,
733 BTEqualStrategyNumber);
734 if (!OidIsValid(op))
735 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
736 BTEqualStrategyNumber, opcintype, opcintype, opfamily);
737
738 /*
739 * If we find the same column with the same equality semantics
740 * in more than one index, we only need to emit the equality
741 * clause once.
742 *
743 * Since we only remember the last equality operator, this
744 * code could be fooled into emitting duplicate clauses given
745 * multiple indexes with several different opclasses ... but
746 * that's so unlikely it doesn't seem worth spending extra
747 * code to avoid.
748 */
749 if (opUsedForQual[attnum - 1] == op)
750 continue;
751 opUsedForQual[attnum - 1] = op;
752
753 /*
754 * Actually add the qual, ANDed with any others.
755 */
756 if (foundUniqueIndex)
757 appendStringInfoString(&querybuf, " AND ");
758
759 leftop = quote_qualified_identifier("newdata",
760 NameStr(attr->attname));
761 rightop = quote_qualified_identifier("mv",
762 NameStr(attr->attname));
763
764 generate_operator_clause(&querybuf,
765 leftop, attrtype,
766 op,
767 rightop, attrtype);
768
769 foundUniqueIndex = true;
770 }
771 }
772
773 /* Keep the locks, since we're about to run DML which needs them. */
774 index_close(indexRel, NoLock);
775 }
776
777 list_free(indexoidlist);
778
779 /*
780 * There must be at least one usable unique index on the matview.
781 *
782 * ExecRefreshMatView() checks that after taking the exclusive lock on the
783 * matview. So at least one unique index is guaranteed to exist here
784 * because the lock is still being held; so an Assert seems sufficient.
785 */
786 Assert(foundUniqueIndex);
787
788 appendStringInfoString(&querybuf,
789 " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) "
790 "WHERE newdata.* IS NULL OR mv.* IS NULL "
791 "ORDER BY tid");
792
793 /* Create the temporary "diff" table. */
794 if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
795 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
796
797 SetUserIdAndSecContext(relowner,
798 save_sec_context | SECURITY_RESTRICTED_OPERATION);
799
800 /*
801 * We have no further use for data from the "full-data" temp table, but we
802 * must keep it around because its type is referenced from the diff table.
803 */
804
805 /* Analyze the diff table. */
806 resetStringInfo(&querybuf);
807 appendStringInfo(&querybuf, "ANALYZE %s", diffname);
808 if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
809 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
810
811 OpenMatViewIncrementalMaintenance();
812
813 /* Deletes must come before inserts; do them first. */
814 resetStringInfo(&querybuf);
815 appendStringInfo(&querybuf,
816 "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
817 "(SELECT diff.tid FROM %s diff "
818 "WHERE diff.tid IS NOT NULL "
819 "AND diff.newdata IS NULL)",
820 matviewname, diffname);
821 if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
822 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
823
824 /* Inserts go last. */
825 resetStringInfo(&querybuf);
826 appendStringInfo(&querybuf,
827 "INSERT INTO %s SELECT (diff.newdata).* "
828 "FROM %s diff WHERE tid IS NULL",
829 matviewname, diffname);
830 if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
831 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
832
833 /* We're done maintaining the materialized view. */
834 CloseMatViewIncrementalMaintenance();
835 heap_close(tempRel, NoLock);
836 heap_close(matviewRel, NoLock);
837
838 /* Clean up temp tables. */
839 resetStringInfo(&querybuf);
840 appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
841 if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
842 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
843
844 /* Close SPI context. */
845 if (SPI_finish() != SPI_OK_FINISH)
846 elog(ERROR, "SPI_finish failed");
847 }
848
849 /*
850 * Swap the physical files of the target and transient tables, then rebuild
851 * the target's indexes and throw away the transient table. Security context
852 * swapping is handled by the called function, so it is not needed here.
853 */
854 static void
refresh_by_heap_swap(Oid matviewOid,Oid OIDNewHeap,char relpersistence)855 refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
856 {
857 finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
858 RecentXmin, ReadNextMultiXactId(), relpersistence);
859 }
860
861 /*
862 * Check whether specified index is usable for match merge.
863 */
864 static bool
is_usable_unique_index(Relation indexRel)865 is_usable_unique_index(Relation indexRel)
866 {
867 Form_pg_index indexStruct = indexRel->rd_index;
868
869 /*
870 * Must be unique, valid, immediate, non-partial, and be defined over
871 * plain user columns (not expressions). We also require it to be a
872 * btree. Even if we had any other unique index kinds, we'd not know how
873 * to identify the corresponding equality operator, nor could we be sure
874 * that the planner could implement the required FULL JOIN with non-btree
875 * operators.
876 */
877 if (indexStruct->indisunique &&
878 indexStruct->indimmediate &&
879 indexRel->rd_rel->relam == BTREE_AM_OID &&
880 IndexIsValid(indexStruct) &&
881 RelationGetIndexPredicate(indexRel) == NIL &&
882 indexStruct->indnatts > 0)
883 {
884 /*
885 * The point of groveling through the index columns individually is to
886 * reject both index expressions and system columns. Currently,
887 * matviews couldn't have OID columns so there's no way to create an
888 * index on a system column; but maybe someday that wouldn't be true,
889 * so let's be safe.
890 */
891 int numatts = indexStruct->indnatts;
892 int i;
893
894 for (i = 0; i < numatts; i++)
895 {
896 int attnum = indexStruct->indkey.values[i];
897
898 if (attnum <= 0)
899 return false;
900 }
901 return true;
902 }
903 return false;
904 }
905
906
907 /*
908 * This should be used to test whether the backend is in a context where it is
909 * OK to allow DML statements to modify materialized views. We only want to
910 * allow that for internal code driven by the materialized view definition,
911 * not for arbitrary user-supplied code.
912 *
913 * While the function names reflect the fact that their main intended use is
914 * incremental maintenance of materialized views (in response to changes to
915 * the data in referenced relations), they are initially used to allow REFRESH
916 * without blocking concurrent reads.
917 */
918 bool
MatViewIncrementalMaintenanceIsEnabled(void)919 MatViewIncrementalMaintenanceIsEnabled(void)
920 {
921 return matview_maintenance_depth > 0;
922 }
923
924 static void
OpenMatViewIncrementalMaintenance(void)925 OpenMatViewIncrementalMaintenance(void)
926 {
927 matview_maintenance_depth++;
928 }
929
930 static void
CloseMatViewIncrementalMaintenance(void)931 CloseMatViewIncrementalMaintenance(void)
932 {
933 matview_maintenance_depth--;
934 Assert(matview_maintenance_depth >= 0);
935 }
936