1 /*-------------------------------------------------------------------------
2 *
3 * execReplication.c
4 * miscellaneous executor routines for logical replication
5 *
6 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/executor/execReplication.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres.h"
16
17 #include "access/genam.h"
18 #include "access/relscan.h"
19 #include "access/tableam.h"
20 #include "access/transam.h"
21 #include "access/xact.h"
22 #include "commands/trigger.h"
23 #include "executor/executor.h"
24 #include "executor/nodeModifyTable.h"
25 #include "nodes/nodeFuncs.h"
26 #include "parser/parse_relation.h"
27 #include "parser/parsetree.h"
28 #include "storage/bufmgr.h"
29 #include "storage/lmgr.h"
30 #include "utils/builtins.h"
31 #include "utils/datum.h"
32 #include "utils/lsyscache.h"
33 #include "utils/memutils.h"
34 #include "utils/rel.h"
35 #include "utils/snapmgr.h"
36 #include "utils/syscache.h"
37 #include "utils/typcache.h"
38
39
40 /*
41 * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
42 * is setup to match 'rel' (*NOT* idxrel!).
43 *
44 * Returns whether any column contains NULLs.
45 *
46 * This is not generic routine, it expects the idxrel to be replication
47 * identity of a rel and meet all limitations associated with that.
48 */
49 static bool
build_replindex_scan_key(ScanKey skey,Relation rel,Relation idxrel,TupleTableSlot * searchslot)50 build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
51 TupleTableSlot *searchslot)
52 {
53 int attoff;
54 bool isnull;
55 Datum indclassDatum;
56 oidvector *opclass;
57 int2vector *indkey = &idxrel->rd_index->indkey;
58 bool hasnulls = false;
59
60 Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
61 RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
62
63 indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
64 Anum_pg_index_indclass, &isnull);
65 Assert(!isnull);
66 opclass = (oidvector *) DatumGetPointer(indclassDatum);
67
68 /* Build scankey for every attribute in the index. */
69 for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
70 {
71 Oid operator;
72 Oid opfamily;
73 RegProcedure regop;
74 int pkattno = attoff + 1;
75 int mainattno = indkey->values[attoff];
76 Oid optype = get_opclass_input_type(opclass->values[attoff]);
77
78 /*
79 * Load the operator info. We need this to get the equality operator
80 * function for the scan key.
81 */
82 opfamily = get_opclass_family(opclass->values[attoff]);
83
84 operator = get_opfamily_member(opfamily, optype,
85 optype,
86 BTEqualStrategyNumber);
87 if (!OidIsValid(operator))
88 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
89 BTEqualStrategyNumber, optype, optype, opfamily);
90
91 regop = get_opcode(operator);
92
93 /* Initialize the scankey. */
94 ScanKeyInit(&skey[attoff],
95 pkattno,
96 BTEqualStrategyNumber,
97 regop,
98 searchslot->tts_values[mainattno - 1]);
99
100 skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
101
102 /* Check for null value. */
103 if (searchslot->tts_isnull[mainattno - 1])
104 {
105 hasnulls = true;
106 skey[attoff].sk_flags |= SK_ISNULL;
107 }
108 }
109
110 return hasnulls;
111 }
112
113 /*
 * RelationFindReplTupleByIndex
 *		Search the relation 'rel' for a tuple matching 'searchslot' using the
 *		index 'idxoid'.
 *
 * The scan key is built once from 'searchslot' (whose layout matches 'rel')
 * by build_replindex_scan_key(), so 'idxoid' is expected to be the replica
 * identity index or the primary key of 'rel'.  Only the first index match is
 * examined.
 *
 * If a matching tuple is found, lock it with lockmode, fill 'outslot' with
 * its contents, and return true.  Return false otherwise.
 *
 * Concurrency handling:
 *	- the index is scanned with a dirty snapshot; if snap.xmin / snap.xmax
 *	  name another transaction on the found tuple, wait for it to finish and
 *	  search again from scratch;
 *	- the tuple is then locked with table_tuple_lock() using the latest
 *	  snapshot, blocking on conflicts and without following update chains;
 *	  if it was concurrently updated or deleted, a LOG message is emitted and
 *	  the whole search is retried;
 *	- TM_Invisible or any other unexpected lock result raises an ERROR.
 *
 * The index is opened with RowExclusiveLock, and that lock is kept until
 * the end of the transaction.
118 */
119 bool
RelationFindReplTupleByIndex(Relation rel,Oid idxoid,LockTupleMode lockmode,TupleTableSlot * searchslot,TupleTableSlot * outslot)120 RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
121 LockTupleMode lockmode,
122 TupleTableSlot *searchslot,
123 TupleTableSlot *outslot)
124 {
125 ScanKeyData skey[INDEX_MAX_KEYS];
126 IndexScanDesc scan;
127 SnapshotData snap;
128 TransactionId xwait;
129 Relation idxrel;
130 bool found;
131
132 /* Open the index; the lock is released only at commit (see below). */
133 idxrel = index_open(idxoid, RowExclusiveLock);
134
135 /* Start an index scan using a dirty snapshot over all key columns. */
136 InitDirtySnapshot(snap);
137 scan = index_beginscan(rel, idxrel, &snap,
138 IndexRelationGetNumberOfKeyAttributes(idxrel),
139 0);
140
141 /* Build scan key once; it is reused unchanged on every retry. */
142 build_replindex_scan_key(skey, rel, idxrel, searchslot);
/*
 * NOTE(review): the "key contains NULLs" result is ignored here; presumably
 * replica identity / primary key columns can never be NULL -- confirm.
 */
143
/* Every retry restarts the index scan from the beginning. */
144 retry:
145 found = false;
146
147 index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
148
149 /* Try to find the tuple; only the first index match is examined. */
150 if (index_getnext_slot(scan, ForwardScanDirection, outslot))
151 {
152 found = true;
153 ExecMaterializeSlot(outslot);
154
/*
 * The dirty snapshot's xmin / xmax identify another transaction that is
 * still working on this tuple, if any; xmin takes precedence.
 */
155 xwait = TransactionIdIsValid(snap.xmin) ?
156 snap.xmin : snap.xmax;
157
158 /*
159 * If the tuple is locked, wait for locking transaction to finish and
160 * retry the whole search.
161 */
162 if (TransactionIdIsValid(xwait))
163 {
164 XactLockTableWait(xwait, NULL, NULL, XLTW_None);
165 goto retry;
166 }
167 }
168
169 /* Found tuple, try to lock it in the lockmode. */
170 if (found)
171 {
172 TM_FailureData tmfd;
173 TM_Result res;
174
/*
 * Lock as of the latest snapshot, which is also pushed as the active
 * snapshot for the duration of the call.  LockWaitBlock makes us wait on
 * conflicting lockers instead of failing.
 */
175 PushActiveSnapshot(GetLatestSnapshot());
176
177 res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
178 outslot,
179 GetCurrentCommandId(false),
180 lockmode,
181 LockWaitBlock,
182 0 /* don't follow updates */ ,
183 &tmfd);
184
185 PopActiveSnapshot();
186
/*
 * Concurrent update or delete: log it and redo the whole search.  Any
 * other result besides TM_Ok is an error.
 */
187 switch (res)
188 {
189 case TM_Ok:
190 break;
191 case TM_Updated:
192 /* XXX: Improve handling here */
193 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
194 ereport(LOG,
195 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
196 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
197 else
198 ereport(LOG,
199 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
200 errmsg("concurrent update, retrying")));
201 goto retry;
202 case TM_Deleted:
203 /* XXX: Improve handling here */
204 ereport(LOG,
205 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
206 errmsg("concurrent delete, retrying")));
207 goto retry;
208 case TM_Invisible:
209 elog(ERROR, "attempted to lock invisible tuple");
210 break;
211 default:
212 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
213 break;
214 }
215 }
216
217 index_endscan(scan);
218
219 /* Don't release lock until commit. */
220 index_close(idxrel, NoLock);
221
222 return found;
223 }
224
225 /*
226 * Compare the tuples in the slots by checking if they have equal values.
227 */
228 static bool
tuples_equal(TupleTableSlot * slot1,TupleTableSlot * slot2)229 tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2)
230 {
231 int attrnum;
232
233 Assert(slot1->tts_tupleDescriptor->natts ==
234 slot2->tts_tupleDescriptor->natts);
235
236 slot_getallattrs(slot1);
237 slot_getallattrs(slot2);
238
239 /* Check equality of the attributes. */
240 for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
241 {
242 Form_pg_attribute att;
243 TypeCacheEntry *typentry;
244
245 /*
246 * If one value is NULL and other is not, then they are certainly not
247 * equal
248 */
249 if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
250 return false;
251
252 /*
253 * If both are NULL, they can be considered equal.
254 */
255 if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
256 continue;
257
258 att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
259
260 typentry = lookup_type_cache(att->atttypid, TYPECACHE_EQ_OPR_FINFO);
261 if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
262 ereport(ERROR,
263 (errcode(ERRCODE_UNDEFINED_FUNCTION),
264 errmsg("could not identify an equality operator for type %s",
265 format_type_be(att->atttypid))));
266
267 if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
268 att->attcollation,
269 slot1->tts_values[attrnum],
270 slot2->tts_values[attrnum])))
271 return false;
272 }
273
274 return true;
275 }
276
277 /*
 * RelationFindReplTupleSeq
 *		Search the relation 'rel' for a tuple matching 'searchslot' using a
 *		sequential scan.
 *
 * Each scanned tuple is compared with 'searchslot' by tuples_equal().  The
 * first match is copied into 'outslot', whose tuple descriptor must equal
 * that of 'rel' (asserted).
 *
 * If a matching tuple is found, lock it with lockmode, fill 'outslot' with
 * its contents, and return true.  Return false otherwise.
 *
 * Concurrency handling mirrors RelationFindReplTupleByIndex():
 *	- the scan uses a dirty snapshot; if snap.xmin / snap.xmax name another
 *	  transaction on the matching tuple, wait for it and rescan from the start;
 *	- the tuple is locked with table_tuple_lock() using the latest snapshot;
 *	  a concurrent update or delete is logged and the whole scan is retried;
 *	- TM_Invisible or any other unexpected lock result raises an ERROR.
 *
 * Note that this stops on the first matching tuple.
 *
 * This can obviously be quite slow on tables that have more than a few rows,
 * and every retry rescans the table from the beginning.
286 */
287 bool
RelationFindReplTupleSeq(Relation rel,LockTupleMode lockmode,TupleTableSlot * searchslot,TupleTableSlot * outslot)288 RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
289 TupleTableSlot *searchslot, TupleTableSlot *outslot)
290 {
291 TupleTableSlot *scanslot;
292 TableScanDesc scan;
293 SnapshotData snap;
294 TransactionId xwait;
295 bool found;
296 TupleDesc desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
297
298 Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
299
300 /* Start a heap scan with a dirty snapshot, plus a scratch slot for it. */
301 InitDirtySnapshot(snap);
302 scan = table_beginscan(rel, &snap, 0, NULL);
303 scanslot = table_slot_create(rel, NULL);
304
/* Every retry restarts the table scan from the beginning. */
305 retry:
306 found = false;
307
308 table_rescan(scan, NULL);
309
310 /* Try to find the tuple: the first one equal to searchslot wins. */
311 while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
312 {
313 if (!tuples_equal(scanslot, searchslot))
314 continue;
315
316 found = true;
317 ExecCopySlot(outslot, scanslot);
318
/*
 * The dirty snapshot's xmin / xmax identify another transaction that is
 * still working on this tuple, if any; xmin takes precedence.
 */
319 xwait = TransactionIdIsValid(snap.xmin) ?
320 snap.xmin : snap.xmax;
321
322 /*
323 * If the tuple is locked, wait for locking transaction to finish and
324 * retry the whole scan.
325 */
326 if (TransactionIdIsValid(xwait))
327 {
328 XactLockTableWait(xwait, NULL, NULL, XLTW_None);
329 goto retry;
330 }
331
332 /* Found our tuple and it's not locked */
333 break;
334 }
335
336 /* Found tuple, try to lock it in the lockmode. */
337 if (found)
338 {
339 TM_FailureData tmfd;
340 TM_Result res;
341
/*
 * Lock as of the latest snapshot, which is also pushed as the active
 * snapshot for the duration of the call.  LockWaitBlock makes us wait on
 * conflicting lockers instead of failing.
 */
342 PushActiveSnapshot(GetLatestSnapshot());
343
344 res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
345 outslot,
346 GetCurrentCommandId(false),
347 lockmode,
348 LockWaitBlock,
349 0 /* don't follow updates */ ,
350 &tmfd);
351
352 PopActiveSnapshot();
353
/*
 * Concurrent update or delete: log it and redo the whole scan.  Any other
 * result besides TM_Ok is an error.
 */
354 switch (res)
355 {
356 case TM_Ok:
357 break;
358 case TM_Updated:
359 /* XXX: Improve handling here */
360 if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
361 ereport(LOG,
362 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
363 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
364 else
365 ereport(LOG,
366 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
367 errmsg("concurrent update, retrying")));
368 goto retry;
369 case TM_Deleted:
370 /* XXX: Improve handling here */
371 ereport(LOG,
372 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
373 errmsg("concurrent delete, retrying")));
374 goto retry;
375 case TM_Invisible:
376 elog(ERROR, "attempted to lock invisible tuple");
377 break;
378 default:
379 elog(ERROR, "unexpected table_tuple_lock status: %u", res);
380 break;
381 }
382 }
383
384 table_endscan(scan);
385 ExecDropSingleTupleTableSlot(scanslot);
386
387 return found;
388 }
389
390 /*
391 * Insert tuple represented in the slot to the relation, update the indexes,
392 * and execute any constraints and per-row triggers.
393 *
394 * Caller is responsible for opening the indexes.
395 */
396 void
ExecSimpleRelationInsert(EState * estate,TupleTableSlot * slot)397 ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
398 {
399 bool skip_tuple = false;
400 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
401 Relation rel = resultRelInfo->ri_RelationDesc;
402
403 /* For now we support only tables. */
404 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
405
406 CheckCmdReplicaIdentity(rel, CMD_INSERT);
407
408 /* BEFORE ROW INSERT Triggers */
409 if (resultRelInfo->ri_TrigDesc &&
410 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
411 {
412 if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
413 skip_tuple = true; /* "do nothing" */
414 }
415
416 if (!skip_tuple)
417 {
418 List *recheckIndexes = NIL;
419
420 /* Compute stored generated columns */
421 if (rel->rd_att->constr &&
422 rel->rd_att->constr->has_generated_stored)
423 ExecComputeStoredGenerated(estate, slot);
424
425 /* Check the constraints of the tuple */
426 if (rel->rd_att->constr)
427 ExecConstraints(resultRelInfo, slot, estate);
428 if (resultRelInfo->ri_PartitionCheck)
429 ExecPartitionCheck(resultRelInfo, slot, estate, true);
430
431 /* OK, store the tuple and create index entries for it */
432 simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
433
434 if (resultRelInfo->ri_NumIndices > 0)
435 recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
436 NIL);
437
438 /* AFTER ROW INSERT Triggers */
439 ExecARInsertTriggers(estate, resultRelInfo, slot,
440 recheckIndexes, NULL);
441
442 /*
443 * XXX we should in theory pass a TransitionCaptureState object to the
444 * above to capture transition tuples, but after statement triggers
445 * don't actually get fired by replication yet anyway
446 */
447
448 list_free(recheckIndexes);
449 }
450 }
451
452 /*
453 * Find the searchslot tuple and update it with data in the slot,
454 * update the indexes, and execute any constraints and per-row triggers.
455 *
456 * Caller is responsible for opening the indexes.
457 */
458 void
ExecSimpleRelationUpdate(EState * estate,EPQState * epqstate,TupleTableSlot * searchslot,TupleTableSlot * slot)459 ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
460 TupleTableSlot *searchslot, TupleTableSlot *slot)
461 {
462 bool skip_tuple = false;
463 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
464 Relation rel = resultRelInfo->ri_RelationDesc;
465 ItemPointer tid = &(searchslot->tts_tid);
466
467 /* For now we support only tables. */
468 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
469
470 CheckCmdReplicaIdentity(rel, CMD_UPDATE);
471
472 /* BEFORE ROW UPDATE Triggers */
473 if (resultRelInfo->ri_TrigDesc &&
474 resultRelInfo->ri_TrigDesc->trig_update_before_row)
475 {
476 if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
477 tid, NULL, slot))
478 skip_tuple = true; /* "do nothing" */
479 }
480
481 if (!skip_tuple)
482 {
483 List *recheckIndexes = NIL;
484 bool update_indexes;
485
486 /* Compute stored generated columns */
487 if (rel->rd_att->constr &&
488 rel->rd_att->constr->has_generated_stored)
489 ExecComputeStoredGenerated(estate, slot);
490
491 /* Check the constraints of the tuple */
492 if (rel->rd_att->constr)
493 ExecConstraints(resultRelInfo, slot, estate);
494 if (resultRelInfo->ri_PartitionCheck)
495 ExecPartitionCheck(resultRelInfo, slot, estate, true);
496
497 simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
498 &update_indexes);
499
500 if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
501 recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
502 NIL);
503
504 /* AFTER ROW UPDATE Triggers */
505 ExecARUpdateTriggers(estate, resultRelInfo,
506 tid, NULL, slot,
507 recheckIndexes, NULL);
508
509 list_free(recheckIndexes);
510 }
511 }
512
513 /*
514 * Find the searchslot tuple and delete it, and execute any constraints
515 * and per-row triggers.
516 *
517 * Caller is responsible for opening the indexes.
518 */
519 void
ExecSimpleRelationDelete(EState * estate,EPQState * epqstate,TupleTableSlot * searchslot)520 ExecSimpleRelationDelete(EState *estate, EPQState *epqstate,
521 TupleTableSlot *searchslot)
522 {
523 bool skip_tuple = false;
524 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
525 Relation rel = resultRelInfo->ri_RelationDesc;
526 ItemPointer tid = &searchslot->tts_tid;
527
528 CheckCmdReplicaIdentity(rel, CMD_DELETE);
529
530 /* BEFORE ROW DELETE Triggers */
531 if (resultRelInfo->ri_TrigDesc &&
532 resultRelInfo->ri_TrigDesc->trig_delete_before_row)
533 {
534 skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
535 tid, NULL, NULL);
536
537 }
538
539 if (!skip_tuple)
540 {
541 /* OK, delete the tuple */
542 simple_table_tuple_delete(rel, tid, estate->es_snapshot);
543
544 /* AFTER ROW DELETE Triggers */
545 ExecARDeleteTriggers(estate, resultRelInfo,
546 tid, NULL, NULL);
547 }
548 }
549
550 /*
551 * Check if command can be executed with current replica identity.
552 */
553 void
CheckCmdReplicaIdentity(Relation rel,CmdType cmd)554 CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
555 {
556 PublicationActions *pubactions;
557
558 /* We only need to do checks for UPDATE and DELETE. */
559 if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
560 return;
561
562 /* If relation has replica identity we are always good. */
563 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
564 OidIsValid(RelationGetReplicaIndex(rel)))
565 return;
566
567 /*
568 * This is either UPDATE OR DELETE and there is no replica identity.
569 *
570 * Check if the table publishes UPDATES or DELETES.
571 */
572 pubactions = GetRelationPublicationActions(rel);
573 if (cmd == CMD_UPDATE && pubactions->pubupdate)
574 ereport(ERROR,
575 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
576 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
577 RelationGetRelationName(rel)),
578 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
579 else if (cmd == CMD_DELETE && pubactions->pubdelete)
580 ereport(ERROR,
581 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
582 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
583 RelationGetRelationName(rel)),
584 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
585 }
586
587
588 /*
589 * Check if we support writing into specific relkind.
590 *
591 * The nspname and relname are only needed for error reporting.
592 */
593 void
CheckSubscriptionRelkind(char relkind,const char * nspname,const char * relname)594 CheckSubscriptionRelkind(char relkind, const char *nspname,
595 const char *relname)
596 {
597 /*
598 * We currently only support writing to regular tables. However, give a
599 * more specific error for partitioned and foreign tables.
600 */
601 if (relkind == RELKIND_PARTITIONED_TABLE)
602 ereport(ERROR,
603 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
604 errmsg("cannot use relation \"%s.%s\" as logical replication target",
605 nspname, relname),
606 errdetail("\"%s.%s\" is a partitioned table.",
607 nspname, relname)));
608 else if (relkind == RELKIND_FOREIGN_TABLE)
609 ereport(ERROR,
610 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
611 errmsg("cannot use relation \"%s.%s\" as logical replication target",
612 nspname, relname),
613 errdetail("\"%s.%s\" is a foreign table.",
614 nspname, relname)));
615
616 if (relkind != RELKIND_RELATION)
617 ereport(ERROR,
618 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
619 errmsg("cannot use relation \"%s.%s\" as logical replication target",
620 nspname, relname),
621 errdetail("\"%s.%s\" is not a table.",
622 nspname, relname)));
623 }
624