1 /*-------------------------------------------------------------------------
2 *
3 * execReplication.c
4 * miscellaneous executor routines for logical replication
5 *
6 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/executor/execReplication.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres.h"
16
17 #include "access/relscan.h"
18 #include "access/transam.h"
19 #include "access/xact.h"
20 #include "commands/trigger.h"
21 #include "executor/executor.h"
22 #include "nodes/nodeFuncs.h"
23 #include "parser/parse_relation.h"
24 #include "parser/parsetree.h"
25 #include "storage/bufmgr.h"
26 #include "storage/lmgr.h"
27 #include "utils/builtins.h"
28 #include "utils/datum.h"
29 #include "utils/lsyscache.h"
30 #include "utils/memutils.h"
31 #include "utils/rel.h"
32 #include "utils/snapmgr.h"
33 #include "utils/syscache.h"
34 #include "utils/typcache.h"
35 #include "utils/tqual.h"
36
37
38 /*
39 * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
40 * is setup to match 'rel' (*NOT* idxrel!).
41 *
42 * Returns whether any column contains NULLs.
43 *
44 * This is not generic routine, it expects the idxrel to be replication
45 * identity of a rel and meet all limitations associated with that.
46 */
47 static bool
build_replindex_scan_key(ScanKey skey,Relation rel,Relation idxrel,TupleTableSlot * searchslot)48 build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
49 TupleTableSlot *searchslot)
50 {
51 int attoff;
52 bool isnull;
53 Datum indclassDatum;
54 oidvector *opclass;
55 int2vector *indkey = &idxrel->rd_index->indkey;
56 bool hasnulls = false;
57
58 Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
59 RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
60
61 indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
62 Anum_pg_index_indclass, &isnull);
63 Assert(!isnull);
64 opclass = (oidvector *) DatumGetPointer(indclassDatum);
65
66 /* Build scankey for every attribute in the index. */
67 for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
68 {
69 Oid operator;
70 Oid opfamily;
71 RegProcedure regop;
72 int pkattno = attoff + 1;
73 int mainattno = indkey->values[attoff];
74 Oid optype = get_opclass_input_type(opclass->values[attoff]);
75
76 /*
77 * Load the operator info. We need this to get the equality operator
78 * function for the scan key.
79 */
80 opfamily = get_opclass_family(opclass->values[attoff]);
81
82 operator = get_opfamily_member(opfamily, optype,
83 optype,
84 BTEqualStrategyNumber);
85 if (!OidIsValid(operator))
86 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
87 BTEqualStrategyNumber, optype, optype, opfamily);
88
89 regop = get_opcode(operator);
90
91 /* Initialize the scankey. */
92 ScanKeyInit(&skey[attoff],
93 pkattno,
94 BTEqualStrategyNumber,
95 regop,
96 searchslot->tts_values[mainattno - 1]);
97
98 /* Check for null value. */
99 if (searchslot->tts_isnull[mainattno - 1])
100 {
101 hasnulls = true;
102 skey[attoff].sk_flags |= SK_ISNULL;
103 }
104 }
105
106 return hasnulls;
107 }
108
/*
 * Search the relation 'rel' for the tuple described by 'searchslot', using
 * the index 'idxoid' (expected to be rel's replica identity or primary key
 * index, see build_replindex_scan_key()).
 *
 * Only the first tuple returned by the index scan is considered.  The scan
 * uses a dirty snapshot.  If the tuple it finds is still being worked on by
 * another transaction, we wait for that transaction to end and rescan from
 * scratch.
 *
 * If a matching tuple is found, copy it into 'outslot', lock it with
 * 'lockmode', and return true.  Return false otherwise.
 *
 * The index is opened with RowExclusiveLock.  That lock is kept until the
 * end of the transaction.
 */
bool
RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
							 LockTupleMode lockmode,
							 TupleTableSlot *searchslot,
							 TupleTableSlot *outslot)
{
	HeapTuple	scantuple;
	ScanKeyData skey[INDEX_MAX_KEYS];
	IndexScanDesc scan;
	SnapshotData snap;
	TransactionId xwait;
	Relation	idxrel;
	bool		found;

	/* Open the index. */
	idxrel = index_open(idxoid, RowExclusiveLock);

	/*
	 * Start an index scan.  The dirty snapshot makes in-progress changes
	 * visible; after a fetch, its xmin/xmax are used below as the id of a
	 * transaction we may have to wait for.
	 */
	InitDirtySnapshot(snap);
	scan = index_beginscan(rel, idxrel, &snap,
						   IndexRelationGetNumberOfKeyAttributes(idxrel),
						   0);

	/*
	 * Build scan key.  NOTE(review): the "has NULLs" result is ignored here;
	 * NULL key values are only flagged SK_ISNULL inside skey.
	 */
	build_replindex_scan_key(skey, rel, idxrel, searchslot);

retry:
	found = false;

	/* (Re)start the scan from the beginning with the same keys. */
	index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);

	/* Try to find the tuple; only the first hit is examined. */
	if ((scantuple = index_getnext(scan, ForwardScanDirection)) != NULL)
	{
		found = true;
		/* Copy the tuple into outslot so it survives the rescan/lock below. */
		ExecStoreTuple(scantuple, outslot, InvalidBuffer, false);
		ExecMaterializeSlot(outslot);

		xwait = TransactionIdIsValid(snap.xmin) ?
			snap.xmin : snap.xmax;

		/*
		 * If the tuple is locked, wait for locking transaction to finish and
		 * retry.
		 */
		if (TransactionIdIsValid(xwait))
		{
			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
			goto retry;
		}
	}

	/* Found tuple, try to lock it in the lockmode. */
	if (found)
	{
		Buffer		buf;
		HeapUpdateFailureData hufd;
		HTSU_Result res;
		HeapTupleData locktup;

		/* Lock by TID, taken from the copy stored in outslot. */
		ItemPointerCopy(&outslot->tts_tuple->t_self, &locktup.t_self);

		PushActiveSnapshot(GetLatestSnapshot());

		res = heap_lock_tuple(rel, &locktup, GetCurrentCommandId(false),
							  lockmode,
							  LockWaitBlock,
							  false /* don't follow updates */ ,
							  &buf, &hufd);
		/* the tuple slot already has the buffer pinned */
		ReleaseBuffer(buf);

		PopActiveSnapshot();

		switch (res)
		{
			case HeapTupleMayBeUpdated:
				/* Lock acquired. */
				break;
			case HeapTupleUpdated:
				/*
				 * Someone updated or deleted the row after we found it: log
				 * it and search again from scratch.
				 */
				/* XXX: Improve handling here */
				if (ItemPointerIndicatesMovedPartitions(&hufd.ctid))
					ereport(LOG,
							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
							 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
				else
					ereport(LOG,
							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
							 errmsg("concurrent update, retrying")));
				goto retry;
			case HeapTupleInvisible:
				elog(ERROR, "attempted to lock invisible tuple");
				break;
			default:
				elog(ERROR, "unexpected heap_lock_tuple status: %u", res);
				break;
		}
	}

	index_endscan(scan);

	/* Don't release lock until commit. */
	index_close(idxrel, NoLock);

	return found;
}
220
221 /*
222 * Compare the tuple and slot and check if they have equal values.
223 */
224 static bool
tuple_equals_slot(TupleDesc desc,HeapTuple tup,TupleTableSlot * slot)225 tuple_equals_slot(TupleDesc desc, HeapTuple tup, TupleTableSlot *slot)
226 {
227 Datum values[MaxTupleAttributeNumber];
228 bool isnull[MaxTupleAttributeNumber];
229 int attrnum;
230
231 heap_deform_tuple(tup, desc, values, isnull);
232
233 /* Check equality of the attributes. */
234 for (attrnum = 0; attrnum < desc->natts; attrnum++)
235 {
236 Form_pg_attribute att;
237 TypeCacheEntry *typentry;
238
239 /*
240 * If one value is NULL and other is not, then they are certainly not
241 * equal
242 */
243 if (isnull[attrnum] != slot->tts_isnull[attrnum])
244 return false;
245
246 /*
247 * If both are NULL, they can be considered equal.
248 */
249 if (isnull[attrnum])
250 continue;
251
252 att = TupleDescAttr(desc, attrnum);
253
254 typentry = lookup_type_cache(att->atttypid, TYPECACHE_EQ_OPR_FINFO);
255 if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
256 ereport(ERROR,
257 (errcode(ERRCODE_UNDEFINED_FUNCTION),
258 errmsg("could not identify an equality operator for type %s",
259 format_type_be(att->atttypid))));
260
261 if (!DatumGetBool(FunctionCall2(&typentry->eq_opr_finfo,
262 values[attrnum],
263 slot->tts_values[attrnum])))
264 return false;
265 }
266
267 return true;
268 }
269
/*
 * Search the relation 'rel' for a tuple equal to 'searchslot' using a
 * sequential scan.  Each heap tuple is compared column by column with
 * tuple_equals_slot().
 *
 * The scan stops at the first matching tuple.  It uses a dirty snapshot.
 * If the matching tuple is still being worked on by another transaction, we
 * wait for that transaction to end and restart the scan from the beginning.
 *
 * If a matching tuple is found, copy it into 'outslot', lock it with
 * 'lockmode', and return true.  Return false otherwise.
 *
 * This can obviously be quite slow on tables that have more than few rows.
 */
bool
RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
						 TupleTableSlot *searchslot, TupleTableSlot *outslot)
{
	HeapTuple	scantuple;
	HeapScanDesc scan;
	SnapshotData snap;
	TransactionId xwait;
	bool		found;
	TupleDesc	desc = RelationGetDescr(rel);

	/* outslot must be shaped like the relation, since heap tuples go in it. */
	Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));

	/*
	 * Start a heap scan.  The dirty snapshot makes in-progress changes
	 * visible; after a fetch, its xmin/xmax are used below as the id of a
	 * transaction we may have to wait for.
	 */
	InitDirtySnapshot(snap);
	scan = heap_beginscan(rel, &snap, 0, NULL);

retry:
	found = false;

	/* (Re)start the scan from the beginning of the relation. */
	heap_rescan(scan, NULL);

	/* Try to find the tuple */
	while ((scantuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		if (!tuple_equals_slot(desc, scantuple, searchslot))
			continue;

		found = true;
		/* Copy the tuple into outslot so it survives the rescan/lock below. */
		ExecStoreTuple(scantuple, outslot, InvalidBuffer, false);
		ExecMaterializeSlot(outslot);

		xwait = TransactionIdIsValid(snap.xmin) ?
			snap.xmin : snap.xmax;

		/*
		 * If the tuple is locked, wait for locking transaction to finish and
		 * retry.
		 */
		if (TransactionIdIsValid(xwait))
		{
			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
			goto retry;
		}

		/* Found our tuple and it's not locked */
		break;
	}

	/* Found tuple, try to lock it in the lockmode. */
	if (found)
	{
		Buffer		buf;
		HeapUpdateFailureData hufd;
		HTSU_Result res;
		HeapTupleData locktup;

		/* Lock by TID, taken from the copy stored in outslot. */
		ItemPointerCopy(&outslot->tts_tuple->t_self, &locktup.t_self);

		PushActiveSnapshot(GetLatestSnapshot());

		res = heap_lock_tuple(rel, &locktup, GetCurrentCommandId(false),
							  lockmode,
							  LockWaitBlock,
							  false /* don't follow updates */ ,
							  &buf, &hufd);
		/* the tuple slot already has the buffer pinned */
		ReleaseBuffer(buf);

		PopActiveSnapshot();

		switch (res)
		{
			case HeapTupleMayBeUpdated:
				/* Lock acquired. */
				break;
			case HeapTupleUpdated:
				/*
				 * Someone updated or deleted the row after we found it: log
				 * it and rescan the whole relation.
				 */
				/* XXX: Improve handling here */
				if (ItemPointerIndicatesMovedPartitions(&hufd.ctid))
					ereport(LOG,
							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
							 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
				else
					ereport(LOG,
							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
							 errmsg("concurrent update, retrying")));
				goto retry;
			case HeapTupleInvisible:
				elog(ERROR, "attempted to lock invisible tuple");
				break;
			default:
				elog(ERROR, "unexpected heap_lock_tuple status: %u", res);
				break;
		}
	}

	heap_endscan(scan);

	return found;
}
379
380 /*
381 * Insert tuple represented in the slot to the relation, update the indexes,
382 * and execute any constraints and per-row triggers.
383 *
384 * Caller is responsible for opening the indexes.
385 */
386 void
ExecSimpleRelationInsert(EState * estate,TupleTableSlot * slot)387 ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
388 {
389 bool skip_tuple = false;
390 HeapTuple tuple;
391 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
392 Relation rel = resultRelInfo->ri_RelationDesc;
393
394 /* For now we support only tables. */
395 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
396
397 CheckCmdReplicaIdentity(rel, CMD_INSERT);
398
399 /* BEFORE ROW INSERT Triggers */
400 if (resultRelInfo->ri_TrigDesc &&
401 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
402 {
403 slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
404
405 if (slot == NULL) /* "do nothing" */
406 skip_tuple = true;
407 }
408
409 if (!skip_tuple)
410 {
411 List *recheckIndexes = NIL;
412
413 /* Check the constraints of the tuple */
414 if (rel->rd_att->constr)
415 ExecConstraints(resultRelInfo, slot, estate);
416 if (resultRelInfo->ri_PartitionCheck)
417 ExecPartitionCheck(resultRelInfo, slot, estate, true);
418
419 /* Materialize slot into a tuple that we can scribble upon. */
420 tuple = ExecMaterializeSlot(slot);
421
422 /* OK, store the tuple and create index entries for it */
423 simple_heap_insert(rel, tuple);
424
425 if (resultRelInfo->ri_NumIndices > 0)
426 recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
427 estate, false, NULL,
428 NIL);
429
430 /* AFTER ROW INSERT Triggers */
431 ExecARInsertTriggers(estate, resultRelInfo, tuple,
432 recheckIndexes, NULL);
433
434 /*
435 * XXX we should in theory pass a TransitionCaptureState object to the
436 * above to capture transition tuples, but after statement triggers
437 * don't actually get fired by replication yet anyway
438 */
439
440 list_free(recheckIndexes);
441 }
442 }
443
444 /*
445 * Find the searchslot tuple and update it with data in the slot,
446 * update the indexes, and execute any constraints and per-row triggers.
447 *
448 * Caller is responsible for opening the indexes.
449 */
450 void
ExecSimpleRelationUpdate(EState * estate,EPQState * epqstate,TupleTableSlot * searchslot,TupleTableSlot * slot)451 ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
452 TupleTableSlot *searchslot, TupleTableSlot *slot)
453 {
454 bool skip_tuple = false;
455 HeapTuple tuple;
456 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
457 Relation rel = resultRelInfo->ri_RelationDesc;
458
459 /* For now we support only tables. */
460 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
461
462 CheckCmdReplicaIdentity(rel, CMD_UPDATE);
463
464 /* BEFORE ROW UPDATE Triggers */
465 if (resultRelInfo->ri_TrigDesc &&
466 resultRelInfo->ri_TrigDesc->trig_update_before_row)
467 {
468 slot = ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
469 &searchslot->tts_tuple->t_self,
470 NULL, slot);
471
472 if (slot == NULL) /* "do nothing" */
473 skip_tuple = true;
474 }
475
476 if (!skip_tuple)
477 {
478 List *recheckIndexes = NIL;
479
480 /* Check the constraints of the tuple */
481 if (rel->rd_att->constr)
482 ExecConstraints(resultRelInfo, slot, estate);
483 if (resultRelInfo->ri_PartitionCheck)
484 ExecPartitionCheck(resultRelInfo, slot, estate, true);
485
486 /* Materialize slot into a tuple that we can scribble upon. */
487 tuple = ExecMaterializeSlot(slot);
488
489 /* OK, update the tuple and index entries for it */
490 simple_heap_update(rel, &searchslot->tts_tuple->t_self,
491 slot->tts_tuple);
492
493 if (resultRelInfo->ri_NumIndices > 0 &&
494 !HeapTupleIsHeapOnly(slot->tts_tuple))
495 recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
496 estate, false, NULL,
497 NIL);
498
499 /* AFTER ROW UPDATE Triggers */
500 ExecARUpdateTriggers(estate, resultRelInfo,
501 &searchslot->tts_tuple->t_self,
502 NULL, tuple, recheckIndexes, NULL);
503
504 list_free(recheckIndexes);
505 }
506 }
507
508 /*
509 * Find the searchslot tuple and delete it, and execute any constraints
510 * and per-row triggers.
511 *
512 * Caller is responsible for opening the indexes.
513 */
514 void
ExecSimpleRelationDelete(EState * estate,EPQState * epqstate,TupleTableSlot * searchslot)515 ExecSimpleRelationDelete(EState *estate, EPQState *epqstate,
516 TupleTableSlot *searchslot)
517 {
518 bool skip_tuple = false;
519 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
520 Relation rel = resultRelInfo->ri_RelationDesc;
521
522 /* For now we support only tables. */
523 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
524
525 CheckCmdReplicaIdentity(rel, CMD_DELETE);
526
527 /* BEFORE ROW DELETE Triggers */
528 if (resultRelInfo->ri_TrigDesc &&
529 resultRelInfo->ri_TrigDesc->trig_delete_before_row)
530 {
531 skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
532 &searchslot->tts_tuple->t_self,
533 NULL, NULL);
534 }
535
536 if (!skip_tuple)
537 {
538 List *recheckIndexes = NIL;
539
540 /* OK, delete the tuple */
541 simple_heap_delete(rel, &searchslot->tts_tuple->t_self);
542
543 /* AFTER ROW DELETE Triggers */
544 ExecARDeleteTriggers(estate, resultRelInfo,
545 &searchslot->tts_tuple->t_self, NULL, NULL);
546
547 list_free(recheckIndexes);
548 }
549 }
550
551 /*
552 * Check if command can be executed with current replica identity.
553 */
554 void
CheckCmdReplicaIdentity(Relation rel,CmdType cmd)555 CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
556 {
557 PublicationActions *pubactions;
558
559 /* We only need to do checks for UPDATE and DELETE. */
560 if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
561 return;
562
563 /* If relation has replica identity we are always good. */
564 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
565 OidIsValid(RelationGetReplicaIndex(rel)))
566 return;
567
568 /*
569 * This is either UPDATE OR DELETE and there is no replica identity.
570 *
571 * Check if the table publishes UPDATES or DELETES.
572 */
573 pubactions = GetRelationPublicationActions(rel);
574 if (cmd == CMD_UPDATE && pubactions->pubupdate)
575 ereport(ERROR,
576 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
577 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
578 RelationGetRelationName(rel)),
579 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
580 else if (cmd == CMD_DELETE && pubactions->pubdelete)
581 ereport(ERROR,
582 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
583 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
584 RelationGetRelationName(rel)),
585 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
586 }
587
588
589 /*
590 * Check if we support writing into specific relkind.
591 *
592 * The nspname and relname are only needed for error reporting.
593 */
594 void
CheckSubscriptionRelkind(char relkind,const char * nspname,const char * relname)595 CheckSubscriptionRelkind(char relkind, const char *nspname,
596 const char *relname)
597 {
598 /*
599 * We currently only support writing to regular tables.
600 */
601 if (relkind != RELKIND_RELATION)
602 ereport(ERROR,
603 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
604 errmsg("logical replication target relation \"%s.%s\" is not a table",
605 nspname, relname)));
606 }
607