1 /*-------------------------------------------------------------------------
2 *
3 * matview.c
4 * materialized view support
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 *
10 * IDENTIFICATION
11 * src/backend/commands/matview.c
12 *
13 *-------------------------------------------------------------------------
14 */
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/heapam.h"
19 #include "access/htup_details.h"
20 #include "access/multixact.h"
21 #include "access/tableam.h"
22 #include "access/xact.h"
23 #include "access/xlog.h"
24 #include "catalog/catalog.h"
25 #include "catalog/indexing.h"
26 #include "catalog/namespace.h"
27 #include "catalog/pg_am.h"
28 #include "catalog/pg_opclass.h"
29 #include "catalog/pg_operator.h"
30 #include "commands/cluster.h"
31 #include "commands/matview.h"
32 #include "commands/tablecmds.h"
33 #include "commands/tablespace.h"
34 #include "executor/executor.h"
35 #include "executor/spi.h"
36 #include "miscadmin.h"
37 #include "parser/parse_relation.h"
38 #include "pgstat.h"
39 #include "rewrite/rewriteHandler.h"
40 #include "storage/lmgr.h"
41 #include "storage/smgr.h"
42 #include "tcop/tcopprot.h"
43 #include "utils/builtins.h"
44 #include "utils/lsyscache.h"
45 #include "utils/rel.h"
46 #include "utils/snapmgr.h"
47 #include "utils/syscache.h"
48
49
/*
 * Private state of the DestReceiver that stores query output into the
 * transient heap built for REFRESH MATERIALIZED VIEW.
 *
 * "pub" is the first member; the code in this file casts between
 * DestReceiver * and DR_transientrel * (see CreateTransientRelDestReceiver
 * and the transientrel_* callbacks).
 */
typedef struct
{
	DestReceiver pub;			/* publicly-known function pointers */
	Oid			transientoid;	/* OID of new heap into which to store */
	/* These fields are filled by transientrel_startup: */
	Relation	transientrel;	/* relation to write to */
	CommandId	output_cid;		/* cmin to insert in output tuples */
	int			ti_options;		/* table_tuple_insert performance options */
	BulkInsertState bistate;	/* bulk insert state */
} DR_transientrel;
60
/*
 * Nesting depth of matview maintenance in this backend.  It is incremented
 * by OpenMatViewIncrementalMaintenance() and decremented by
 * CloseMatViewIncrementalMaintenance();
 * MatViewIncrementalMaintenanceIsEnabled() reports true while it is > 0.
 * ExecRefreshMatView() restores the saved value if a concurrent refresh
 * errors out.
 */
static int	matview_maintenance_depth = 0;

/* DestReceiver callbacks installed by CreateTransientRelDestReceiver() */
static void transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo);
static bool transientrel_receive(TupleTableSlot *slot, DestReceiver *self);
static void transientrel_shutdown(DestReceiver *self);
static void transientrel_destroy(DestReceiver *self);

/* Helpers used by ExecRefreshMatView() and refresh_by_match_merge() */
static uint64 refresh_matview_datafill(DestReceiver *dest, Query *query,
									   const char *queryString);
static char *make_temptable_name_n(char *tempname, int n);
static void refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
								   int save_sec_context);
static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence);
static bool is_usable_unique_index(Relation indexRel);

/* Bracket the internally generated DML that modifies a matview */
static void OpenMatViewIncrementalMaintenance(void);
static void CloseMatViewIncrementalMaintenance(void);
76
/*
78 * SetMatViewPopulatedState
79 * Mark a materialized view as populated, or not.
80 *
81 * NOTE: caller must be holding an appropriate lock on the relation.
82 */
83 void
84 SetMatViewPopulatedState(Relation relation, bool newstate)
85 {
86 Relation pgrel;
87 HeapTuple tuple;
88
89 Assert(relation->rd_rel->relkind == RELKIND_MATVIEW);
90
91 /*
92 * Update relation's pg_class entry. Crucial side-effect: other backends
93 * (and this one too!) are sent SI message to make them rebuild relcache
94 * entries.
95 */
96 pgrel = table_open(RelationRelationId, RowExclusiveLock);
97 tuple = SearchSysCacheCopy1(RELOID,
98 ObjectIdGetDatum(RelationGetRelid(relation)));
99 if (!HeapTupleIsValid(tuple))
100 elog(ERROR, "cache lookup failed for relation %u",
101 RelationGetRelid(relation));
102
103 ((Form_pg_class) GETSTRUCT(tuple))->relispopulated = newstate;
104
105 CatalogTupleUpdate(pgrel, &tuple->t_self, tuple);
106
107 heap_freetuple(tuple);
108 table_close(pgrel, RowExclusiveLock);
109
110 /*
111 * Advance command counter to make the updated pg_class row locally
112 * visible.
113 */
114 CommandCounterIncrement();
115 }
116
117 /*
118 * ExecRefreshMatView -- execute a REFRESH MATERIALIZED VIEW command
 *
120 * This refreshes the materialized view by creating a new table and swapping
121 * the relfilenodes of the new table and the old materialized view, so the OID
122 * of the original materialized view is preserved. Thus we do not lose GRANT
123 * nor references to this materialized view.
124 *
125 * If WITH NO DATA was specified, this is effectively like a TRUNCATE;
126 * otherwise it is like a TRUNCATE followed by an INSERT using the SELECT
127 * statement associated with the materialized view. The statement node's
128 * skipData field shows whether the clause was used.
129 *
130 * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
131 * the new heap, it's better to create the indexes afterwards than to fill them
132 * incrementally while we load.
133 *
134 * The matview's "populated" state is changed based on whether the contents
135 * reflect the result set of the materialized view's query.
136 */
ObjectAddress
ExecRefreshMatView(RefreshMatViewStmt *stmt, const char *queryString,
				   ParamListInfo params, char *completionTag)
{
	/*
	 * Arguments: "stmt" supplies the target relation and the concurrent and
	 * skipData options; "queryString" is handed to the executor when the new
	 * contents are generated.  "params" and "completionTag" are not
	 * referenced anywhere in this function body.
	 *
	 * Returns the ObjectAddress (pg_class entry) of the refreshed matview.
	 */
	Oid			matviewOid;
	Relation	matviewRel;
	RewriteRule *rule;
	List	   *actions;
	Query	   *dataQuery;
	Oid			tableSpace;
	Oid			relowner;
	Oid			OIDNewHeap;
	DestReceiver *dest;
	uint64		processed = 0;
	bool		concurrent;
	LOCKMODE	lockmode;
	char		relpersistence;
	Oid			save_userid;
	int			save_sec_context;
	int			save_nestlevel;
	ObjectAddress address;

	/* Determine strength of lock needed. */
	concurrent = stmt->concurrent;
	lockmode = concurrent ? ExclusiveLock : AccessExclusiveLock;

	/*
	 * Get a lock until end of transaction.
	 */
	matviewOid = RangeVarGetRelidExtended(stmt->relation,
										  lockmode, 0,
										  RangeVarCallbackOwnsTable, NULL);
	/* The lock was taken just above, so NoLock suffices for the open. */
	matviewRel = table_open(matviewOid, NoLock);

	/* Make sure it is a materialized view. */
	if (matviewRel->rd_rel->relkind != RELKIND_MATVIEW)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("\"%s\" is not a materialized view",
						RelationGetRelationName(matviewRel))));

	/* Check that CONCURRENTLY is not specified if not populated. */
	if (concurrent && !RelationIsPopulated(matviewRel))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("CONCURRENTLY cannot be used when the materialized view is not populated")));

	/* Check that conflicting options have not been specified. */
	if (concurrent && stmt->skipData)
		ereport(ERROR,
				(errcode(ERRCODE_SYNTAX_ERROR),
				 errmsg("CONCURRENTLY and WITH NO DATA options cannot be used together")));

	/*
	 * Check that everything is correct for a refresh. Problems at this point
	 * are internal errors, so elog is sufficient.
	 */
	if (matviewRel->rd_rel->relhasrules == false ||
		matviewRel->rd_rules->numLocks < 1)
		elog(ERROR,
			 "materialized view \"%s\" is missing rewrite information",
			 RelationGetRelationName(matviewRel));

	if (matviewRel->rd_rules->numLocks > 1)
		elog(ERROR,
			 "materialized view \"%s\" has too many rules",
			 RelationGetRelationName(matviewRel));

	/* Exactly one rule exists at this point; it must be SELECT INSTEAD. */
	rule = matviewRel->rd_rules->rules[0];
	if (rule->event != CMD_SELECT || !(rule->isInstead))
		elog(ERROR,
			 "the rule for materialized view \"%s\" is not a SELECT INSTEAD OF rule",
			 RelationGetRelationName(matviewRel));

	actions = rule->actions;
	if (list_length(actions) != 1)
		elog(ERROR,
			 "the rule for materialized view \"%s\" is not a single action",
			 RelationGetRelationName(matviewRel));

	/*
	 * Check that there is a unique index with no WHERE clause on one or more
	 * columns of the materialized view if CONCURRENTLY is specified.
	 */
	if (concurrent)
	{
		List	   *indexoidlist = RelationGetIndexList(matviewRel);
		ListCell   *indexoidscan;
		bool		hasUniqueIndex = false;

		/* One usable index is enough, so stop at the first one found. */
		foreach(indexoidscan, indexoidlist)
		{
			Oid			indexoid = lfirst_oid(indexoidscan);
			Relation	indexRel;

			indexRel = index_open(indexoid, AccessShareLock);
			hasUniqueIndex = is_usable_unique_index(indexRel);
			index_close(indexRel, AccessShareLock);
			if (hasUniqueIndex)
				break;
		}

		list_free(indexoidlist);

		if (!hasUniqueIndex)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("cannot refresh materialized view \"%s\" concurrently",
							quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
													   RelationGetRelationName(matviewRel))),
					 errhint("Create a unique index with no WHERE clause on one or more columns of the materialized view.")));
	}

	/*
	 * The stored query was rewritten at the time of the MV definition, but
	 * has not been scribbled on by the planner.
	 */
	dataQuery = linitial_node(Query, actions);

	/*
	 * Check for active uses of the relation in the current transaction, such
	 * as open scans.
	 *
	 * NB: We count on this to protect us against problems with refreshing the
	 * data using TABLE_INSERT_FROZEN.
	 */
	CheckTableNotInUse(matviewRel, "REFRESH MATERIALIZED VIEW");

	/*
	 * Tentatively mark the matview as populated or not (this will roll back
	 * if we fail later).
	 */
	SetMatViewPopulatedState(matviewRel, !stmt->skipData);

	relowner = matviewRel->rd_rel->relowner;

	/*
	 * Switch to the owner's userid, so that any functions are run as that
	 * user.  Also arrange to make GUC variable changes local to this command.
	 * Don't lock it down too tight to create a temporary table just yet.  We
	 * will switch modes when we are about to execute user code.
	 */
	GetUserIdAndSecContext(&save_userid, &save_sec_context);
	SetUserIdAndSecContext(relowner,
						   save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
	save_nestlevel = NewGUCNestLevel();

	/* Concurrent refresh builds new data in temp tablespace, and does diff. */
	if (concurrent)
	{
		tableSpace = GetDefaultTablespace(RELPERSISTENCE_TEMP, false);
		relpersistence = RELPERSISTENCE_TEMP;
	}
	else
	{
		/* Otherwise the new heap mirrors the matview's own storage settings. */
		tableSpace = matviewRel->rd_rel->reltablespace;
		relpersistence = matviewRel->rd_rel->relpersistence;
	}

	/*
	 * Create the transient table that will receive the regenerated data. Lock
	 * it against access by any other process until commit (by which time it
	 * will be gone).
	 */
	OIDNewHeap = make_new_heap(matviewOid, tableSpace, relpersistence,
							   ExclusiveLock);
	LockRelationOid(OIDNewHeap, AccessExclusiveLock);
	dest = CreateTransientRelDestReceiver(OIDNewHeap);

	/*
	 * Now lock down security-restricted operations.
	 */
	SetUserIdAndSecContext(relowner,
						   save_sec_context | SECURITY_RESTRICTED_OPERATION);

	/* Generate the data, if wanted. */
	if (!stmt->skipData)
		processed = refresh_matview_datafill(dest, dataQuery, queryString);

	/* Make the matview match the newly generated data. */
	if (concurrent)
	{
		/*
		 * Remember the maintenance depth: refresh_by_match_merge() raises it
		 * around its DML, and an error thrown in between would otherwise
		 * leave it raised.
		 */
		int			old_depth = matview_maintenance_depth;

		PG_TRY();
		{
			refresh_by_match_merge(matviewOid, OIDNewHeap, relowner,
								   save_sec_context);
		}
		PG_CATCH();
		{
			matview_maintenance_depth = old_depth;
			PG_RE_THROW();
		}
		PG_END_TRY();
		Assert(matview_maintenance_depth == old_depth);
	}
	else
	{
		refresh_by_heap_swap(matviewOid, OIDNewHeap, relpersistence);

		/*
		 * Inform stats collector about our activity: basically, we truncated
		 * the matview and inserted some new data.  (The concurrent code path
		 * above doesn't need to worry about this because the inserts and
		 * deletes it issues get counted by lower-level code.)
		 */
		pgstat_count_truncate(matviewRel);
		if (!stmt->skipData)
			pgstat_count_heap_insert(matviewRel, processed);
	}

	/* Close the relcache entry only; NoLock leaves the lock held. */
	table_close(matviewRel, NoLock);

	/* Roll back any GUC changes */
	AtEOXact_GUC(false, save_nestlevel);

	/* Restore userid and security context */
	SetUserIdAndSecContext(save_userid, save_sec_context);

	ObjectAddressSet(address, RelationRelationId, matviewOid);

	return address;
}
361
362 /*
363 * refresh_matview_datafill
364 *
 * Execute the given query, sending result rows to "dest" (which will
366 * insert them into the target matview).
367 *
368 * Returns number of rows inserted.
369 */
370 static uint64
371 refresh_matview_datafill(DestReceiver *dest, Query *query,
372 const char *queryString)
373 {
374 List *rewritten;
375 PlannedStmt *plan;
376 QueryDesc *queryDesc;
377 Query *copied_query;
378 uint64 processed;
379
380 /* Lock and rewrite, using a copy to preserve the original query. */
381 copied_query = copyObject(query);
382 AcquireRewriteLocks(copied_query, true, false);
383 rewritten = QueryRewrite(copied_query);
384
385 /* SELECT should never rewrite to more or less than one SELECT query */
386 if (list_length(rewritten) != 1)
387 elog(ERROR, "unexpected rewrite result for REFRESH MATERIALIZED VIEW");
388 query = (Query *) linitial(rewritten);
389
390 /* Check for user-requested abort. */
391 CHECK_FOR_INTERRUPTS();
392
393 /* Plan the query which will generate data for the refresh. */
394 plan = pg_plan_query(query, 0, NULL);
395
396 /*
397 * Use a snapshot with an updated command ID to ensure this query sees
398 * results of any previously executed queries. (This could only matter if
399 * the planner executed an allegedly-stable function that changed the
400 * database contents, but let's do it anyway to be safe.)
401 */
402 PushCopiedSnapshot(GetActiveSnapshot());
403 UpdateActiveSnapshotCommandId();
404
405 /* Create a QueryDesc, redirecting output to our tuple receiver */
406 queryDesc = CreateQueryDesc(plan, queryString,
407 GetActiveSnapshot(), InvalidSnapshot,
408 dest, NULL, NULL, 0);
409
410 /* call ExecutorStart to prepare the plan for execution */
411 ExecutorStart(queryDesc, 0);
412
413 /* run the plan */
414 ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
415
416 processed = queryDesc->estate->es_processed;
417
418 /* and clean up */
419 ExecutorFinish(queryDesc);
420 ExecutorEnd(queryDesc);
421
422 FreeQueryDesc(queryDesc);
423
424 PopActiveSnapshot();
425
426 return processed;
427 }
428
429 DestReceiver *
430 CreateTransientRelDestReceiver(Oid transientoid)
431 {
432 DR_transientrel *self = (DR_transientrel *) palloc0(sizeof(DR_transientrel));
433
434 self->pub.receiveSlot = transientrel_receive;
435 self->pub.rStartup = transientrel_startup;
436 self->pub.rShutdown = transientrel_shutdown;
437 self->pub.rDestroy = transientrel_destroy;
438 self->pub.mydest = DestTransientRel;
439 self->transientoid = transientoid;
440
441 return (DestReceiver *) self;
442 }
443
444 /*
445 * transientrel_startup --- executor startup
446 */
447 static void
448 transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
449 {
450 DR_transientrel *myState = (DR_transientrel *) self;
451 Relation transientrel;
452
453 transientrel = table_open(myState->transientoid, NoLock);
454
455 /*
456 * Fill private fields of myState for use by later routines
457 */
458 myState->transientrel = transientrel;
459 myState->output_cid = GetCurrentCommandId(true);
460
461 /*
462 * We can skip WAL-logging the insertions, unless PITR or streaming
463 * replication is in use. We can skip the FSM in any case.
464 */
465 myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
466 if (!XLogIsNeeded())
467 myState->ti_options |= TABLE_INSERT_SKIP_WAL;
468 myState->bistate = GetBulkInsertState();
469
470 /* Not using WAL requires smgr_targblock be initially invalid */
471 Assert(RelationGetTargetBlock(transientrel) == InvalidBlockNumber);
472 }
473
474 /*
475 * transientrel_receive --- receive one tuple
476 */
477 static bool
478 transientrel_receive(TupleTableSlot *slot, DestReceiver *self)
479 {
480 DR_transientrel *myState = (DR_transientrel *) self;
481
482 /*
483 * Note that the input slot might not be of the type of the target
484 * relation. That's supported by table_tuple_insert(), but slightly less
485 * efficient than inserting with the right slot - but the alternative
486 * would be to copy into a slot of the right type, which would not be
487 * cheap either. This also doesn't allow accessing per-AM data (say a
488 * tuple's xmin), but since we don't do that here...
489 */
490
491 table_tuple_insert(myState->transientrel,
492 slot,
493 myState->output_cid,
494 myState->ti_options,
495 myState->bistate);
496
497 /* We know this is a newly created relation, so there are no indexes */
498
499 return true;
500 }
501
502 /*
503 * transientrel_shutdown --- executor end
504 */
505 static void
506 transientrel_shutdown(DestReceiver *self)
507 {
508 DR_transientrel *myState = (DR_transientrel *) self;
509
510 FreeBulkInsertState(myState->bistate);
511
512 table_finish_bulk_insert(myState->transientrel, myState->ti_options);
513
514 /* close transientrel, but keep lock until commit */
515 table_close(myState->transientrel, NoLock);
516 myState->transientrel = NULL;
517 }
518
519 /*
520 * transientrel_destroy --- release DestReceiver object
521 */
522 static void
523 transientrel_destroy(DestReceiver *self)
524 {
525 pfree(self);
526 }
527
528
529 /*
530 * Given a qualified temporary table name, append an underscore followed by
531 * the given integer, to make a new table name based on the old one.
532 * The result is a palloc'd string.
533 *
534 * As coded, this would fail to make a valid SQL name if the given name were,
535 * say, "FOO"."BAR". Currently, the table name portion of the input will
536 * never be double-quoted because it's of the form "pg_temp_NNN", cf
537 * make_new_heap(). But we might have to work harder someday.
538 */
539 static char *
540 make_temptable_name_n(char *tempname, int n)
541 {
542 StringInfoData namebuf;
543
544 initStringInfo(&namebuf);
545 appendStringInfoString(&namebuf, tempname);
546 appendStringInfo(&namebuf, "_%d", n);
547 return namebuf.data;
548 }
549
550 /*
551 * refresh_by_match_merge
552 *
553 * Refresh a materialized view with transactional semantics, while allowing
554 * concurrent reads.
555 *
556 * This is called after a new version of the data has been created in a
557 * temporary table. It performs a full outer join against the old version of
558 * the data, producing "diff" results. This join cannot work if there are any
559 * duplicated rows in either the old or new versions, in the sense that every
560 * column would compare as equal between the two rows. It does work correctly
561 * in the face of rows which have at least one NULL value, with all non-NULL
562 * columns equal. The behavior of NULLs on equality tests and on UNIQUE
563 * indexes turns out to be quite convenient here; the tests we need to make
564 * are consistent with default behavior. If there is at least one UNIQUE
565 * index on the materialized view, we have exactly the guarantee we need.
566 *
567 * The temporary table used to hold the diff results contains just the TID of
568 * the old record (if matched) and the ROW from the new table as a single
569 * column of complex record type (if matched).
570 *
571 * Once we have the diff table, we perform set-based DELETE and INSERT
572 * operations against the materialized view, and discard both temporary
573 * tables.
574 *
575 * Everything from the generation of the new data to applying the differences
576 * takes place under cover of an ExclusiveLock, since it seems as though we
577 * would want to prohibit not only concurrent REFRESH operations, but also
578 * incremental maintenance. It also doesn't seem reasonable or safe to allow
579 * SELECT FOR UPDATE or SELECT FOR SHARE on rows being updated or deleted by
580 * this command.
581 */
582 static void
583 refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
584 int save_sec_context)
585 {
586 StringInfoData querybuf;
587 Relation matviewRel;
588 Relation tempRel;
589 char *matviewname;
590 char *tempname;
591 char *diffname;
592 TupleDesc tupdesc;
593 bool foundUniqueIndex;
594 List *indexoidlist;
595 ListCell *indexoidscan;
596 int16 relnatts;
597 Oid *opUsedForQual;
598
599 initStringInfo(&querybuf);
600 matviewRel = table_open(matviewOid, NoLock);
601 matviewname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(matviewRel)),
602 RelationGetRelationName(matviewRel));
603 tempRel = table_open(tempOid, NoLock);
604 tempname = quote_qualified_identifier(get_namespace_name(RelationGetNamespace(tempRel)),
605 RelationGetRelationName(tempRel));
606 diffname = make_temptable_name_n(tempname, 2);
607
608 relnatts = RelationGetNumberOfAttributes(matviewRel);
609
610 /* Open SPI context. */
611 if (SPI_connect() != SPI_OK_CONNECT)
612 elog(ERROR, "SPI_connect failed");
613
614 /* Analyze the temp table with the new contents. */
615 appendStringInfo(&querybuf, "ANALYZE %s", tempname);
616 if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
617 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
618
619 /*
620 * We need to ensure that there are not duplicate rows without NULLs in
621 * the new data set before we can count on the "diff" results. Check for
622 * that in a way that allows showing the first duplicated row found. Even
623 * after we pass this test, a unique index on the materialized view may
624 * find a duplicate key problem.
625 *
626 * Note: here and below, we use "tablename.*::tablerowtype" as a hack to
627 * keep ".*" from being expanded into multiple columns in a SELECT list.
628 * Compare ruleutils.c's get_variable().
629 */
630 resetStringInfo(&querybuf);
631 appendStringInfo(&querybuf,
632 "SELECT newdata.*::%s FROM %s newdata "
633 "WHERE newdata.* IS NOT NULL AND EXISTS "
634 "(SELECT 1 FROM %s newdata2 WHERE newdata2.* IS NOT NULL "
635 "AND newdata2.* OPERATOR(pg_catalog.*=) newdata.* "
636 "AND newdata2.ctid OPERATOR(pg_catalog.<>) "
637 "newdata.ctid)",
638 tempname, tempname, tempname);
639 if (SPI_execute(querybuf.data, false, 1) != SPI_OK_SELECT)
640 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
641 if (SPI_processed > 0)
642 {
643 /*
644 * Note that this ereport() is returning data to the user. Generally,
645 * we would want to make sure that the user has been granted access to
646 * this data. However, REFRESH MAT VIEW is only able to be run by the
647 * owner of the mat view (or a superuser) and therefore there is no
648 * need to check for access to data in the mat view.
649 */
650 ereport(ERROR,
651 (errcode(ERRCODE_CARDINALITY_VIOLATION),
652 errmsg("new data for materialized view \"%s\" contains duplicate rows without any null columns",
653 RelationGetRelationName(matviewRel)),
654 errdetail("Row: %s",
655 SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1))));
656 }
657
658 SetUserIdAndSecContext(relowner,
659 save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
660
661 /* Start building the query for creating the diff table. */
662 resetStringInfo(&querybuf);
663 appendStringInfo(&querybuf,
664 "CREATE TEMP TABLE %s AS "
665 "SELECT mv.ctid AS tid, newdata.*::%s AS newdata "
666 "FROM %s mv FULL JOIN %s newdata ON (",
667 diffname, tempname, matviewname, tempname);
668
669 /*
670 * Get the list of index OIDs for the table from the relcache, and look up
671 * each one in the pg_index syscache. We will test for equality on all
672 * columns present in all unique indexes which only reference columns and
673 * include all rows.
674 */
675 tupdesc = matviewRel->rd_att;
676 opUsedForQual = (Oid *) palloc0(sizeof(Oid) * relnatts);
677 foundUniqueIndex = false;
678
679 indexoidlist = RelationGetIndexList(matviewRel);
680
681 foreach(indexoidscan, indexoidlist)
682 {
683 Oid indexoid = lfirst_oid(indexoidscan);
684 Relation indexRel;
685
686 indexRel = index_open(indexoid, RowExclusiveLock);
687 if (is_usable_unique_index(indexRel))
688 {
689 Form_pg_index indexStruct = indexRel->rd_index;
690 int indnkeyatts = indexStruct->indnkeyatts;
691 oidvector *indclass;
692 Datum indclassDatum;
693 bool isnull;
694 int i;
695
696 /* Must get indclass the hard way. */
697 indclassDatum = SysCacheGetAttr(INDEXRELID,
698 indexRel->rd_indextuple,
699 Anum_pg_index_indclass,
700 &isnull);
701 Assert(!isnull);
702 indclass = (oidvector *) DatumGetPointer(indclassDatum);
703
704 /* Add quals for all columns from this index. */
705 for (i = 0; i < indnkeyatts; i++)
706 {
707 int attnum = indexStruct->indkey.values[i];
708 Oid opclass = indclass->values[i];
709 Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
710 Oid attrtype = attr->atttypid;
711 HeapTuple cla_ht;
712 Form_pg_opclass cla_tup;
713 Oid opfamily;
714 Oid opcintype;
715 Oid op;
716 const char *leftop;
717 const char *rightop;
718
719 /*
720 * Identify the equality operator associated with this index
721 * column. First we need to look up the column's opclass.
722 */
723 cla_ht = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
724 if (!HeapTupleIsValid(cla_ht))
725 elog(ERROR, "cache lookup failed for opclass %u", opclass);
726 cla_tup = (Form_pg_opclass) GETSTRUCT(cla_ht);
727 Assert(cla_tup->opcmethod == BTREE_AM_OID);
728 opfamily = cla_tup->opcfamily;
729 opcintype = cla_tup->opcintype;
730 ReleaseSysCache(cla_ht);
731
732 op = get_opfamily_member(opfamily, opcintype, opcintype,
733 BTEqualStrategyNumber);
734 if (!OidIsValid(op))
735 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
736 BTEqualStrategyNumber, opcintype, opcintype, opfamily);
737
738 /*
739 * If we find the same column with the same equality semantics
740 * in more than one index, we only need to emit the equality
741 * clause once.
742 *
743 * Since we only remember the last equality operator, this
744 * code could be fooled into emitting duplicate clauses given
745 * multiple indexes with several different opclasses ... but
746 * that's so unlikely it doesn't seem worth spending extra
747 * code to avoid.
748 */
749 if (opUsedForQual[attnum - 1] == op)
750 continue;
751 opUsedForQual[attnum - 1] = op;
752
753 /*
754 * Actually add the qual, ANDed with any others.
755 */
756 if (foundUniqueIndex)
757 appendStringInfoString(&querybuf, " AND ");
758
759 leftop = quote_qualified_identifier("newdata",
760 NameStr(attr->attname));
761 rightop = quote_qualified_identifier("mv",
762 NameStr(attr->attname));
763
764 generate_operator_clause(&querybuf,
765 leftop, attrtype,
766 op,
767 rightop, attrtype);
768
769 foundUniqueIndex = true;
770 }
771 }
772
773 /* Keep the locks, since we're about to run DML which needs them. */
774 index_close(indexRel, NoLock);
775 }
776
777 list_free(indexoidlist);
778
779 /*
780 * There must be at least one usable unique index on the matview.
781 *
782 * ExecRefreshMatView() checks that after taking the exclusive lock on the
783 * matview. So at least one unique index is guaranteed to exist here
784 * because the lock is still being held; so an Assert seems sufficient.
785 */
786 Assert(foundUniqueIndex);
787
788 appendStringInfoString(&querybuf,
789 " AND newdata.* OPERATOR(pg_catalog.*=) mv.*) "
790 "WHERE newdata.* IS NULL OR mv.* IS NULL "
791 "ORDER BY tid");
792
793 /* Create the temporary "diff" table. */
794 if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
795 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
796
797 SetUserIdAndSecContext(relowner,
798 save_sec_context | SECURITY_RESTRICTED_OPERATION);
799
800 /*
801 * We have no further use for data from the "full-data" temp table, but we
802 * must keep it around because its type is referenced from the diff table.
803 */
804
805 /* Analyze the diff table. */
806 resetStringInfo(&querybuf);
807 appendStringInfo(&querybuf, "ANALYZE %s", diffname);
808 if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
809 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
810
811 OpenMatViewIncrementalMaintenance();
812
813 /* Deletes must come before inserts; do them first. */
814 resetStringInfo(&querybuf);
815 appendStringInfo(&querybuf,
816 "DELETE FROM %s mv WHERE ctid OPERATOR(pg_catalog.=) ANY "
817 "(SELECT diff.tid FROM %s diff "
818 "WHERE diff.tid IS NOT NULL "
819 "AND diff.newdata IS NULL)",
820 matviewname, diffname);
821 if (SPI_exec(querybuf.data, 0) != SPI_OK_DELETE)
822 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
823
824 /* Inserts go last. */
825 resetStringInfo(&querybuf);
826 appendStringInfo(&querybuf,
827 "INSERT INTO %s SELECT (diff.newdata).* "
828 "FROM %s diff WHERE tid IS NULL",
829 matviewname, diffname);
830 if (SPI_exec(querybuf.data, 0) != SPI_OK_INSERT)
831 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
832
833 /* We're done maintaining the materialized view. */
834 CloseMatViewIncrementalMaintenance();
835 table_close(tempRel, NoLock);
836 table_close(matviewRel, NoLock);
837
838 /* Clean up temp tables. */
839 resetStringInfo(&querybuf);
840 appendStringInfo(&querybuf, "DROP TABLE %s, %s", diffname, tempname);
841 if (SPI_exec(querybuf.data, 0) != SPI_OK_UTILITY)
842 elog(ERROR, "SPI_exec failed: %s", querybuf.data);
843
844 /* Close SPI context. */
845 if (SPI_finish() != SPI_OK_FINISH)
846 elog(ERROR, "SPI_finish failed");
847 }
848
849 /*
850 * Swap the physical files of the target and transient tables, then rebuild
851 * the target's indexes and throw away the transient table. Security context
852 * swapping is handled by the called function, so it is not needed here.
853 */
854 static void
855 refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence)
856 {
857 finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true,
858 RecentXmin, ReadNextMultiXactId(), relpersistence);
859 }
860
861 /*
862 * Check whether specified index is usable for match merge.
863 */
864 static bool
865 is_usable_unique_index(Relation indexRel)
866 {
867 Form_pg_index indexStruct = indexRel->rd_index;
868
869 /*
870 * Must be unique, valid, immediate, non-partial, and be defined over
871 * plain user columns (not expressions). We also require it to be a
872 * btree. Even if we had any other unique index kinds, we'd not know how
873 * to identify the corresponding equality operator, nor could we be sure
874 * that the planner could implement the required FULL JOIN with non-btree
875 * operators.
876 */
877 if (indexStruct->indisunique &&
878 indexStruct->indimmediate &&
879 indexRel->rd_rel->relam == BTREE_AM_OID &&
880 indexStruct->indisvalid &&
881 RelationGetIndexPredicate(indexRel) == NIL &&
882 indexStruct->indnatts > 0)
883 {
884 /*
885 * The point of groveling through the index columns individually is to
886 * reject both index expressions and system columns. Currently,
887 * matviews couldn't have OID columns so there's no way to create an
888 * index on a system column; but maybe someday that wouldn't be true,
889 * so let's be safe.
890 */
891 int numatts = indexStruct->indnatts;
892 int i;
893
894 for (i = 0; i < numatts; i++)
895 {
896 int attnum = indexStruct->indkey.values[i];
897
898 if (attnum <= 0)
899 return false;
900 }
901 return true;
902 }
903 return false;
904 }
905
906
907 /*
908 * This should be used to test whether the backend is in a context where it is
909 * OK to allow DML statements to modify materialized views. We only want to
910 * allow that for internal code driven by the materialized view definition,
911 * not for arbitrary user-supplied code.
912 *
913 * While the function names reflect the fact that their main intended use is
914 * incremental maintenance of materialized views (in response to changes to
915 * the data in referenced relations), they are initially used to allow REFRESH
916 * without blocking concurrent reads.
917 */
918 bool
919 MatViewIncrementalMaintenanceIsEnabled(void)
920 {
921 return matview_maintenance_depth > 0;
922 }
923
924 static void
925 OpenMatViewIncrementalMaintenance(void)
926 {
927 matview_maintenance_depth++;
928 }
929
930 static void
931 CloseMatViewIncrementalMaintenance(void)
932 {
933 matview_maintenance_depth--;
934 Assert(matview_maintenance_depth >= 0);
935 }
936