1 /*-------------------------------------------------------------------------
2 *
3 * execReplication.c
4 * miscellaneous executor routines for logical replication
5 *
6 * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/executor/execReplication.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/relscan.h"
19 #include "access/tableam.h"
20 #include "access/transam.h"
21 #include "access/xact.h"
22 #include "commands/trigger.h"
23 #include "executor/executor.h"
24 #include "executor/nodeModifyTable.h"
25 #include "nodes/nodeFuncs.h"
26 #include "parser/parse_relation.h"
27 #include "parser/parsetree.h"
28 #include "storage/bufmgr.h"
29 #include "storage/lmgr.h"
30 #include "utils/builtins.h"
31 #include "utils/datum.h"
32 #include "utils/lsyscache.h"
33 #include "utils/memutils.h"
34 #include "utils/rel.h"
35 #include "utils/snapmgr.h"
36 #include "utils/syscache.h"
37 #include "utils/typcache.h"
38
39
40 /*
41 * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
42 * is setup to match 'rel' (*NOT* idxrel!).
43 *
44 * Returns whether any column contains NULLs.
45 *
46 * This is not generic routine, it expects the idxrel to be replication
47 * identity of a rel and meet all limitations associated with that.
48 */
49 static bool
build_replindex_scan_key(ScanKey skey,Relation rel,Relation idxrel,TupleTableSlot * searchslot)50 build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
51 TupleTableSlot *searchslot)
52 {
53 int attoff;
54 bool isnull;
55 Datum indclassDatum;
56 oidvector *opclass;
57 int2vector *indkey = &idxrel->rd_index->indkey;
58 bool hasnulls = false;
59
60 Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
61 RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
62
63 indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
64 Anum_pg_index_indclass, &isnull);
65 Assert(!isnull);
66 opclass = (oidvector *) DatumGetPointer(indclassDatum);
67
68 /* Build scankey for every attribute in the index. */
69 for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
70 {
71 Oid operator;
72 Oid opfamily;
73 RegProcedure regop;
74 int pkattno = attoff + 1;
75 int mainattno = indkey->values[attoff];
76 Oid optype = get_opclass_input_type(opclass->values[attoff]);
77
78 /*
79 * Load the operator info. We need this to get the equality operator
80 * function for the scan key.
81 */
82 opfamily = get_opclass_family(opclass->values[attoff]);
83
84 operator = get_opfamily_member(opfamily, optype,
85 optype,
86 BTEqualStrategyNumber);
87 if (!OidIsValid(operator))
88 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
89 BTEqualStrategyNumber, optype, optype, opfamily);
90
91 regop = get_opcode(operator);
92
93 /* Initialize the scankey. */
94 ScanKeyInit(&skey[attoff],
95 pkattno,
96 BTEqualStrategyNumber,
97 regop,
98 searchslot->tts_values[mainattno - 1]);
99
100 skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
101
102 /* Check for null value. */
103 if (searchslot->tts_isnull[mainattno - 1])
104 {
105 hasnulls = true;
106 skey[attoff].sk_flags |= SK_ISNULL;
107 }
108 }
109
110 return hasnulls;
111 }
112
113 /*
114 * Search the relation 'rel' for tuple using the index.
115 *
116 * If a matching tuple is found, lock it with lockmode, fill the slot with its
117 * contents, and return true. Return false otherwise.
118 */
119 bool
RelationFindReplTupleByIndex(Relation rel,Oid idxoid,LockTupleMode lockmode,TupleTableSlot * searchslot,TupleTableSlot * outslot)120 RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
121 LockTupleMode lockmode,
122 TupleTableSlot *searchslot,
123 TupleTableSlot *outslot)
124 {
125 ScanKeyData skey[INDEX_MAX_KEYS];
126 IndexScanDesc scan;
127 SnapshotData snap;
128 TransactionId xwait;
129 Relation idxrel;
130 bool found;
131
132 /* Open the index. */
133 idxrel = index_open(idxoid, RowExclusiveLock);
134
135 /* Start an index scan. */
136 InitDirtySnapshot(snap);
137 scan = index_beginscan(rel, idxrel, &snap,
138 IndexRelationGetNumberOfKeyAttributes(idxrel),
139 0);
140
141 /* Build scan key. */
142 build_replindex_scan_key(skey, rel, idxrel, searchslot);
143
144 retry:
145 found = false;
146
147 index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
148
149 /* Try to find the tuple */
150 if (index_getnext_slot(scan, ForwardScanDirection, outslot))
151 {
152 found = true;
153 ExecMaterializeSlot(outslot);
154
155 xwait = TransactionIdIsValid(snap.xmin) ?
156 snap.xmin : snap.xmax;
157
158 /*
159 * If the tuple is locked, wait for locking transaction to finish and
160 * retry.
161 */
162 if (TransactionIdIsValid(xwait))
163 {
164 XactLockTableWait(xwait, NULL, NULL, XLTW_None);
165 goto retry;
166 }
167 }
168
169 /* Found tuple, try to lock it in the lockmode. */
170 if (found)
171 {
172 TM_FailureData tmfd;
173 TM_Result res;
174
175 PushActiveSnapshot(GetLatestSnapshot());
176
177 res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
178 outslot,
179 GetCurrentCommandId(false),
180 lockmode,
181 LockWaitBlock,
182 0 /* don't follow updates */ ,
183 &tmfd);
184
185 PopActiveSnapshot();
186
187 switch (res)
188 {
189 case TM_Ok:
190 break;
191 case TM_Updated:
192 /* XXX: Improve handling here */
193 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
194 ereport(LOG,
195 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
196 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
197 else
198 ereport(LOG,
199 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
200 errmsg("concurrent update, retrying")));
201 goto retry;
202 case TM_Deleted:
203 /* XXX: Improve handling here */
204 ereport(LOG,
205 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
206 errmsg("concurrent delete, retrying")));
207 goto retry;
208 case TM_Invisible:
209 elog(ERROR, "attempted to lock invisible tuple");
210 break;
211 default:
212 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
213 break;
214 }
215 }
216
217 index_endscan(scan);
218
219 /* Don't release lock until commit. */
220 index_close(idxrel, NoLock);
221
222 return found;
223 }
224
225 /*
226 * Compare the tuples in the slots by checking if they have equal values.
227 */
228 static bool
tuples_equal(TupleTableSlot * slot1,TupleTableSlot * slot2,TypeCacheEntry ** eq)229 tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
230 TypeCacheEntry **eq)
231 {
232 int attrnum;
233
234 Assert(slot1->tts_tupleDescriptor->natts ==
235 slot2->tts_tupleDescriptor->natts);
236
237 slot_getallattrs(slot1);
238 slot_getallattrs(slot2);
239
240 /* Check equality of the attributes. */
241 for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
242 {
243 Form_pg_attribute att;
244 TypeCacheEntry *typentry;
245
246 /*
247 * If one value is NULL and other is not, then they are certainly not
248 * equal
249 */
250 if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
251 return false;
252
253 /*
254 * If both are NULL, they can be considered equal.
255 */
256 if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
257 continue;
258
259 att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
260
261 typentry = eq[attrnum];
262 if (typentry == NULL)
263 {
264 typentry = lookup_type_cache(att->atttypid,
265 TYPECACHE_EQ_OPR_FINFO);
266 if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
267 ereport(ERROR,
268 (errcode(ERRCODE_UNDEFINED_FUNCTION),
269 errmsg("could not identify an equality operator for type %s",
270 format_type_be(att->atttypid))));
271 eq[attrnum] = typentry;
272 }
273
274 if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
275 att->attcollation,
276 slot1->tts_values[attrnum],
277 slot2->tts_values[attrnum])))
278 return false;
279 }
280
281 return true;
282 }
283
284 /*
285 * Search the relation 'rel' for tuple using the sequential scan.
286 *
287 * If a matching tuple is found, lock it with lockmode, fill the slot with its
288 * contents, and return true. Return false otherwise.
289 *
290 * Note that this stops on the first matching tuple.
291 *
292 * This can obviously be quite slow on tables that have more than few rows.
293 */
294 bool
RelationFindReplTupleSeq(Relation rel,LockTupleMode lockmode,TupleTableSlot * searchslot,TupleTableSlot * outslot)295 RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
296 TupleTableSlot *searchslot, TupleTableSlot *outslot)
297 {
298 TupleTableSlot *scanslot;
299 TableScanDesc scan;
300 SnapshotData snap;
301 TypeCacheEntry **eq;
302 TransactionId xwait;
303 bool found;
304 TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
305
306 Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
307
308 eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
309
310 /* Start a heap scan. */
311 InitDirtySnapshot(snap);
312 scan = table_beginscan(rel, &snap, 0, NULL);
313 scanslot = table_slot_create(rel, NULL);
314
315 retry:
316 found = false;
317
318 table_rescan(scan, NULL);
319
320 /* Try to find the tuple */
321 while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
322 {
323 if (!tuples_equal(scanslot, searchslot, eq))
324 continue;
325
326 found = true;
327 ExecCopySlot(outslot, scanslot);
328
329 xwait = TransactionIdIsValid(snap.xmin) ?
330 snap.xmin : snap.xmax;
331
332 /*
333 * If the tuple is locked, wait for locking transaction to finish and
334 * retry.
335 */
336 if (TransactionIdIsValid(xwait))
337 {
338 XactLockTableWait(xwait, NULL, NULL, XLTW_None);
339 goto retry;
340 }
341
342 /* Found our tuple and it's not locked */
343 break;
344 }
345
346 /* Found tuple, try to lock it in the lockmode. */
347 if (found)
348 {
349 TM_FailureData tmfd;
350 TM_Result res;
351
352 PushActiveSnapshot(GetLatestSnapshot());
353
354 res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
355 outslot,
356 GetCurrentCommandId(false),
357 lockmode,
358 LockWaitBlock,
359 0 /* don't follow updates */ ,
360 &tmfd);
361
362 PopActiveSnapshot();
363
364 switch (res)
365 {
366 case TM_Ok:
367 break;
368 case TM_Updated:
369 /* XXX: Improve handling here */
370 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
371 ereport(LOG,
372 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
373 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
374 else
375 ereport(LOG,
376 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
377 errmsg("concurrent update, retrying")));
378 goto retry;
379 case TM_Deleted:
380 /* XXX: Improve handling here */
381 ereport(LOG,
382 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
383 errmsg("concurrent delete, retrying")));
384 goto retry;
385 case TM_Invisible:
386 elog(ERROR, "attempted to lock invisible tuple");
387 break;
388 default:
389 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
390 break;
391 }
392 }
393
394 table_endscan(scan);
395 ExecDropSingleTupleTableSlot(scanslot);
396
397 return found;
398 }
399
400 /*
401 * Insert tuple represented in the slot to the relation, update the indexes,
402 * and execute any constraints and per-row triggers.
403 *
404 * Caller is responsible for opening the indexes.
405 */
406 void
ExecSimpleRelationInsert(EState * estate,TupleTableSlot * slot)407 ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
408 {
409 bool skip_tuple = false;
410 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
411 Relation rel = resultRelInfo->ri_RelationDesc;
412
413 /* For now we support only tables. */
414 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
415
416 CheckCmdReplicaIdentity(rel, CMD_INSERT);
417
418 /* BEFORE ROW INSERT Triggers */
419 if (resultRelInfo->ri_TrigDesc &&
420 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
421 {
422 if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
423 skip_tuple = true; /* "do nothing" */
424 }
425
426 if (!skip_tuple)
427 {
428 List *recheckIndexes = NIL;
429
430 /* Compute stored generated columns */
431 if (rel->rd_att->constr &&
432 rel->rd_att->constr->has_generated_stored)
433 ExecComputeStoredGenerated(estate, slot, CMD_INSERT);
434
435 /* Check the constraints of the tuple */
436 if (rel->rd_att->constr)
437 ExecConstraints(resultRelInfo, slot, estate);
438 if (resultRelInfo->ri_PartitionCheck)
439 ExecPartitionCheck(resultRelInfo, slot, estate, true);
440
441 /* OK, store the tuple and create index entries for it */
442 simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
443
444 if (resultRelInfo->ri_NumIndices > 0)
445 recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
446 NIL);
447
448 /* AFTER ROW INSERT Triggers */
449 ExecARInsertTriggers(estate, resultRelInfo, slot,
450 recheckIndexes, NULL);
451
452 /*
453 * XXX we should in theory pass a TransitionCaptureState object to the
454 * above to capture transition tuples, but after statement triggers
455 * don't actually get fired by replication yet anyway
456 */
457
458 list_free(recheckIndexes);
459 }
460 }
461
462 /*
463 * Find the searchslot tuple and update it with data in the slot,
464 * update the indexes, and execute any constraints and per-row triggers.
465 *
466 * Caller is responsible for opening the indexes.
467 */
468 void
ExecSimpleRelationUpdate(EState * estate,EPQState * epqstate,TupleTableSlot * searchslot,TupleTableSlot * slot)469 ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
470 TupleTableSlot *searchslot, TupleTableSlot *slot)
471 {
472 bool skip_tuple = false;
473 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
474 Relation rel = resultRelInfo->ri_RelationDesc;
475 ItemPointer tid = &(searchslot->tts_tid);
476
477 /* For now we support only tables. */
478 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
479
480 CheckCmdReplicaIdentity(rel, CMD_UPDATE);
481
482 /* BEFORE ROW UPDATE Triggers */
483 if (resultRelInfo->ri_TrigDesc &&
484 resultRelInfo->ri_TrigDesc->trig_update_before_row)
485 {
486 if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
487 tid, NULL, slot))
488 skip_tuple = true; /* "do nothing" */
489 }
490
491 if (!skip_tuple)
492 {
493 List *recheckIndexes = NIL;
494 bool update_indexes;
495
496 /* Compute stored generated columns */
497 if (rel->rd_att->constr &&
498 rel->rd_att->constr->has_generated_stored)
499 ExecComputeStoredGenerated(estate, slot, CMD_UPDATE);
500
501 /* Check the constraints of the tuple */
502 if (rel->rd_att->constr)
503 ExecConstraints(resultRelInfo, slot, estate);
504 if (resultRelInfo->ri_PartitionCheck)
505 ExecPartitionCheck(resultRelInfo, slot, estate, true);
506
507 simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
508 &update_indexes);
509
510 if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
511 recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
512 NIL);
513
514 /* AFTER ROW UPDATE Triggers */
515 ExecARUpdateTriggers(estate, resultRelInfo,
516 tid, NULL, slot,
517 recheckIndexes, NULL);
518
519 list_free(recheckIndexes);
520 }
521 }
522
523 /*
524 * Find the searchslot tuple and delete it, and execute any constraints
525 * and per-row triggers.
526 *
527 * Caller is responsible for opening the indexes.
528 */
529 void
ExecSimpleRelationDelete(EState * estate,EPQState * epqstate,TupleTableSlot * searchslot)530 ExecSimpleRelationDelete(EState *estate, EPQState *epqstate,
531 TupleTableSlot *searchslot)
532 {
533 bool skip_tuple = false;
534 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
535 Relation rel = resultRelInfo->ri_RelationDesc;
536 ItemPointer tid = &searchslot->tts_tid;
537
538 CheckCmdReplicaIdentity(rel, CMD_DELETE);
539
540 /* BEFORE ROW DELETE Triggers */
541 if (resultRelInfo->ri_TrigDesc &&
542 resultRelInfo->ri_TrigDesc->trig_delete_before_row)
543 {
544 skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
545 tid, NULL, NULL);
546
547 }
548
549 if (!skip_tuple)
550 {
551 /* OK, delete the tuple */
552 simple_table_tuple_delete(rel, tid, estate->es_snapshot);
553
554 /* AFTER ROW DELETE Triggers */
555 ExecARDeleteTriggers(estate, resultRelInfo,
556 tid, NULL, NULL);
557 }
558 }
559
560 /*
561 * Check if command can be executed with current replica identity.
562 */
563 void
CheckCmdReplicaIdentity(Relation rel,CmdType cmd)564 CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
565 {
566 PublicationActions *pubactions;
567
568 /* We only need to do checks for UPDATE and DELETE. */
569 if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
570 return;
571
572 /* If relation has replica identity we are always good. */
573 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
574 OidIsValid(RelationGetReplicaIndex(rel)))
575 return;
576
577 /*
578 * This is either UPDATE OR DELETE and there is no replica identity.
579 *
580 * Check if the table publishes UPDATES or DELETES.
581 */
582 pubactions = GetRelationPublicationActions(rel);
583 if (cmd == CMD_UPDATE && pubactions->pubupdate)
584 ereport(ERROR,
585 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
586 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
587 RelationGetRelationName(rel)),
588 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
589 else if (cmd == CMD_DELETE && pubactions->pubdelete)
590 ereport(ERROR,
591 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
592 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
593 RelationGetRelationName(rel)),
594 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
595 }
596
597
598 /*
599 * Check if we support writing into specific relkind.
600 *
601 * The nspname and relname are only needed for error reporting.
602 */
603 void
CheckSubscriptionRelkind(char relkind,const char * nspname,const char * relname)604 CheckSubscriptionRelkind(char relkind, const char *nspname,
605 const char *relname)
606 {
607 /*
608 * Give a more specific error for foreign tables.
609 */
610 if (relkind == RELKIND_FOREIGN_TABLE)
611 ereport(ERROR,
612 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
613 errmsg("cannot use relation \"%s.%s\" as logical replication target",
614 nspname, relname),
615 errdetail("\"%s.%s\" is a foreign table.",
616 nspname, relname)));
617
618 if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
619 ereport(ERROR,
620 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
621 errmsg("cannot use relation \"%s.%s\" as logical replication target",
622 nspname, relname),
623 errdetail("\"%s.%s\" is not a table.",
624 nspname, relname)));
625 }
626