1 /*-------------------------------------------------------------------------
2 *
3 * execReplication.c
4 * miscellaneous executor routines for logical replication
5 *
6 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
8 *
9 * IDENTIFICATION
10 * src/backend/executor/execReplication.c
11 *
12 *-------------------------------------------------------------------------
13 */
14
15 #include "postgres.h"
16
17 #include "access/relscan.h"
18 #include "access/transam.h"
19 #include "access/xact.h"
20 #include "commands/trigger.h"
21 #include "executor/executor.h"
22 #include "nodes/nodeFuncs.h"
23 #include "parser/parse_relation.h"
24 #include "parser/parsetree.h"
25 #include "storage/bufmgr.h"
26 #include "storage/lmgr.h"
27 #include "utils/builtins.h"
28 #include "utils/datum.h"
29 #include "utils/lsyscache.h"
30 #include "utils/memutils.h"
31 #include "utils/rel.h"
32 #include "utils/snapmgr.h"
33 #include "utils/syscache.h"
34 #include "utils/typcache.h"
35 #include "utils/tqual.h"
36
37
38 /*
39 * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
40 * is setup to match 'rel' (*NOT* idxrel!).
41 *
42 * Returns whether any column contains NULLs.
43 *
44 * This is not generic routine, it expects the idxrel to be replication
45 * identity of a rel and meet all limitations associated with that.
46 */
47 static bool
build_replindex_scan_key(ScanKey skey,Relation rel,Relation idxrel,TupleTableSlot * searchslot)48 build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
49 TupleTableSlot *searchslot)
50 {
51 int attoff;
52 bool isnull;
53 Datum indclassDatum;
54 oidvector *opclass;
55 int2vector *indkey = &idxrel->rd_index->indkey;
56 bool hasnulls = false;
57
58 Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
59 RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
60
61 indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
62 Anum_pg_index_indclass, &isnull);
63 Assert(!isnull);
64 opclass = (oidvector *) DatumGetPointer(indclassDatum);
65
66 /* Build scankey for every attribute in the index. */
67 for (attoff = 0; attoff < RelationGetNumberOfAttributes(idxrel); attoff++)
68 {
69 Oid operator;
70 Oid opfamily;
71 RegProcedure regop;
72 int pkattno = attoff + 1;
73 int mainattno = indkey->values[attoff];
74 Oid optype = get_opclass_input_type(opclass->values[attoff]);
75
76 /*
77 * Load the operator info. We need this to get the equality operator
78 * function for the scan key.
79 */
80 opfamily = get_opclass_family(opclass->values[attoff]);
81
82 operator = get_opfamily_member(opfamily, optype,
83 optype,
84 BTEqualStrategyNumber);
85 if (!OidIsValid(operator))
86 elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
87 BTEqualStrategyNumber, optype, optype, opfamily);
88
89 regop = get_opcode(operator);
90
91 /* Initialize the scankey. */
92 ScanKeyInit(&skey[attoff],
93 pkattno,
94 BTEqualStrategyNumber,
95 regop,
96 searchslot->tts_values[mainattno - 1]);
97
98 /* Check for null value. */
99 if (searchslot->tts_isnull[mainattno - 1])
100 {
101 hasnulls = true;
102 skey[attoff].sk_flags |= SK_ISNULL;
103 }
104 }
105
106 return hasnulls;
107 }
108
109 /*
110 * Search the relation 'rel' for tuple using the index.
111 *
112 * If a matching tuple is found, lock it with lockmode, fill the slot with its
113 * contents, and return true. Return false otherwise.
114 */
115 bool
RelationFindReplTupleByIndex(Relation rel,Oid idxoid,LockTupleMode lockmode,TupleTableSlot * searchslot,TupleTableSlot * outslot)116 RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
117 LockTupleMode lockmode,
118 TupleTableSlot *searchslot,
119 TupleTableSlot *outslot)
120 {
121 HeapTuple scantuple;
122 ScanKeyData skey[INDEX_MAX_KEYS];
123 IndexScanDesc scan;
124 SnapshotData snap;
125 TransactionId xwait;
126 Relation idxrel;
127 bool found;
128
129 /* Open the index. */
130 idxrel = index_open(idxoid, RowExclusiveLock);
131
132 /* Start an index scan. */
133 InitDirtySnapshot(snap);
134 scan = index_beginscan(rel, idxrel, &snap,
135 RelationGetNumberOfAttributes(idxrel),
136 0);
137
138 /* Build scan key. */
139 build_replindex_scan_key(skey, rel, idxrel, searchslot);
140
141 retry:
142 found = false;
143
144 index_rescan(scan, skey, RelationGetNumberOfAttributes(idxrel), NULL, 0);
145
146 /* Try to find the tuple */
147 if ((scantuple = index_getnext(scan, ForwardScanDirection)) != NULL)
148 {
149 found = true;
150 ExecStoreTuple(scantuple, outslot, InvalidBuffer, false);
151 ExecMaterializeSlot(outslot);
152
153 xwait = TransactionIdIsValid(snap.xmin) ?
154 snap.xmin : snap.xmax;
155
156 /*
157 * If the tuple is locked, wait for locking transaction to finish and
158 * retry.
159 */
160 if (TransactionIdIsValid(xwait))
161 {
162 XactLockTableWait(xwait, NULL, NULL, XLTW_None);
163 goto retry;
164 }
165 }
166
167 /* Found tuple, try to lock it in the lockmode. */
168 if (found)
169 {
170 Buffer buf;
171 HeapUpdateFailureData hufd;
172 HTSU_Result res;
173 HeapTupleData locktup;
174
175 ItemPointerCopy(&outslot->tts_tuple->t_self, &locktup.t_self);
176
177 PushActiveSnapshot(GetLatestSnapshot());
178
179 res = heap_lock_tuple(rel, &locktup, GetCurrentCommandId(false),
180 lockmode,
181 LockWaitBlock,
182 false /* don't follow updates */ ,
183 &buf, &hufd);
184 /* the tuple slot already has the buffer pinned */
185 ReleaseBuffer(buf);
186
187 PopActiveSnapshot();
188
189 switch (res)
190 {
191 case HeapTupleMayBeUpdated:
192 break;
193 case HeapTupleUpdated:
194 /* XXX: Improve handling here */
195 ereport(LOG,
196 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
197 errmsg("concurrent update, retrying")));
198 goto retry;
199 case HeapTupleInvisible:
200 elog(ERROR, "attempted to lock invisible tuple");
201 default:
202 elog(ERROR, "unexpected heap_lock_tuple status: %u", res);
203 break;
204 }
205 }
206
207 index_endscan(scan);
208
209 /* Don't release lock until commit. */
210 index_close(idxrel, NoLock);
211
212 return found;
213 }
214
215 /*
216 * Compare the tuple and slot and check if they have equal values.
217 */
218 static bool
tuple_equals_slot(TupleDesc desc,HeapTuple tup,TupleTableSlot * slot)219 tuple_equals_slot(TupleDesc desc, HeapTuple tup, TupleTableSlot *slot)
220 {
221 Datum values[MaxTupleAttributeNumber];
222 bool isnull[MaxTupleAttributeNumber];
223 int attrnum;
224
225 heap_deform_tuple(tup, desc, values, isnull);
226
227 /* Check equality of the attributes. */
228 for (attrnum = 0; attrnum < desc->natts; attrnum++)
229 {
230 Form_pg_attribute att;
231 TypeCacheEntry *typentry;
232
233 /*
234 * If one value is NULL and other is not, then they are certainly not
235 * equal
236 */
237 if (isnull[attrnum] != slot->tts_isnull[attrnum])
238 return false;
239
240 /*
241 * If both are NULL, they can be considered equal.
242 */
243 if (isnull[attrnum])
244 continue;
245
246 att = desc->attrs[attrnum];
247
248 typentry = lookup_type_cache(att->atttypid, TYPECACHE_EQ_OPR_FINFO);
249 if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
250 ereport(ERROR,
251 (errcode(ERRCODE_UNDEFINED_FUNCTION),
252 errmsg("could not identify an equality operator for type %s",
253 format_type_be(att->atttypid))));
254
255 if (!DatumGetBool(FunctionCall2(&typentry->eq_opr_finfo,
256 values[attrnum],
257 slot->tts_values[attrnum])))
258 return false;
259 }
260
261 return true;
262 }
263
264 /*
265 * Search the relation 'rel' for tuple using the sequential scan.
266 *
267 * If a matching tuple is found, lock it with lockmode, fill the slot with its
268 * contents, and return true. Return false otherwise.
269 *
270 * Note that this stops on the first matching tuple.
271 *
272 * This can obviously be quite slow on tables that have more than few rows.
273 */
274 bool
RelationFindReplTupleSeq(Relation rel,LockTupleMode lockmode,TupleTableSlot * searchslot,TupleTableSlot * outslot)275 RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
276 TupleTableSlot *searchslot, TupleTableSlot *outslot)
277 {
278 HeapTuple scantuple;
279 HeapScanDesc scan;
280 SnapshotData snap;
281 TransactionId xwait;
282 bool found;
283 TupleDesc desc = RelationGetDescr(rel);
284
285 Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
286
287 /* Start a heap scan. */
288 InitDirtySnapshot(snap);
289 scan = heap_beginscan(rel, &snap, 0, NULL);
290
291 retry:
292 found = false;
293
294 heap_rescan(scan, NULL);
295
296 /* Try to find the tuple */
297 while ((scantuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
298 {
299 if (!tuple_equals_slot(desc, scantuple, searchslot))
300 continue;
301
302 found = true;
303 ExecStoreTuple(scantuple, outslot, InvalidBuffer, false);
304 ExecMaterializeSlot(outslot);
305
306 xwait = TransactionIdIsValid(snap.xmin) ?
307 snap.xmin : snap.xmax;
308
309 /*
310 * If the tuple is locked, wait for locking transaction to finish and
311 * retry.
312 */
313 if (TransactionIdIsValid(xwait))
314 {
315 XactLockTableWait(xwait, NULL, NULL, XLTW_None);
316 goto retry;
317 }
318
319 /* Found our tuple and it's not locked */
320 break;
321 }
322
323 /* Found tuple, try to lock it in the lockmode. */
324 if (found)
325 {
326 Buffer buf;
327 HeapUpdateFailureData hufd;
328 HTSU_Result res;
329 HeapTupleData locktup;
330
331 ItemPointerCopy(&outslot->tts_tuple->t_self, &locktup.t_self);
332
333 PushActiveSnapshot(GetLatestSnapshot());
334
335 res = heap_lock_tuple(rel, &locktup, GetCurrentCommandId(false),
336 lockmode,
337 LockWaitBlock,
338 false /* don't follow updates */ ,
339 &buf, &hufd);
340 /* the tuple slot already has the buffer pinned */
341 ReleaseBuffer(buf);
342
343 PopActiveSnapshot();
344
345 switch (res)
346 {
347 case HeapTupleMayBeUpdated:
348 break;
349 case HeapTupleUpdated:
350 /* XXX: Improve handling here */
351 ereport(LOG,
352 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
353 errmsg("concurrent update, retrying")));
354 goto retry;
355 case HeapTupleInvisible:
356 elog(ERROR, "attempted to lock invisible tuple");
357 default:
358 elog(ERROR, "unexpected heap_lock_tuple status: %u", res);
359 break;
360 }
361 }
362
363 heap_endscan(scan);
364
365 return found;
366 }
367
368 /*
369 * Insert tuple represented in the slot to the relation, update the indexes,
370 * and execute any constraints and per-row triggers.
371 *
372 * Caller is responsible for opening the indexes.
373 */
374 void
ExecSimpleRelationInsert(EState * estate,TupleTableSlot * slot)375 ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
376 {
377 bool skip_tuple = false;
378 HeapTuple tuple;
379 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
380 Relation rel = resultRelInfo->ri_RelationDesc;
381
382 /* For now we support only tables. */
383 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
384
385 CheckCmdReplicaIdentity(rel, CMD_INSERT);
386
387 /* BEFORE ROW INSERT Triggers */
388 if (resultRelInfo->ri_TrigDesc &&
389 resultRelInfo->ri_TrigDesc->trig_insert_before_row)
390 {
391 slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
392
393 if (slot == NULL) /* "do nothing" */
394 skip_tuple = true;
395 }
396
397 if (!skip_tuple)
398 {
399 List *recheckIndexes = NIL;
400
401 /* Check the constraints of the tuple */
402 if (rel->rd_att->constr)
403 ExecConstraints(resultRelInfo, slot, estate);
404
405 /* Materialize slot into a tuple that we can scribble upon. */
406 tuple = ExecMaterializeSlot(slot);
407
408 /* OK, store the tuple and create index entries for it */
409 simple_heap_insert(rel, tuple);
410
411 if (resultRelInfo->ri_NumIndices > 0)
412 recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
413 estate, false, NULL,
414 NIL);
415
416 /* AFTER ROW INSERT Triggers */
417 ExecARInsertTriggers(estate, resultRelInfo, tuple,
418 recheckIndexes, NULL);
419
420 /*
421 * XXX we should in theory pass a TransitionCaptureState object to the
422 * above to capture transition tuples, but after statement triggers
423 * don't actually get fired by replication yet anyway
424 */
425
426 list_free(recheckIndexes);
427 }
428 }
429
430 /*
431 * Find the searchslot tuple and update it with data in the slot,
432 * update the indexes, and execute any constraints and per-row triggers.
433 *
434 * Caller is responsible for opening the indexes.
435 */
436 void
ExecSimpleRelationUpdate(EState * estate,EPQState * epqstate,TupleTableSlot * searchslot,TupleTableSlot * slot)437 ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
438 TupleTableSlot *searchslot, TupleTableSlot *slot)
439 {
440 bool skip_tuple = false;
441 HeapTuple tuple;
442 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
443 Relation rel = resultRelInfo->ri_RelationDesc;
444
445 /* For now we support only tables. */
446 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
447
448 CheckCmdReplicaIdentity(rel, CMD_UPDATE);
449
450 /* BEFORE ROW UPDATE Triggers */
451 if (resultRelInfo->ri_TrigDesc &&
452 resultRelInfo->ri_TrigDesc->trig_update_before_row)
453 {
454 slot = ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
455 &searchslot->tts_tuple->t_self,
456 NULL, slot);
457
458 if (slot == NULL) /* "do nothing" */
459 skip_tuple = true;
460 }
461
462 if (!skip_tuple)
463 {
464 List *recheckIndexes = NIL;
465
466 /* Check the constraints of the tuple */
467 if (rel->rd_att->constr)
468 ExecConstraints(resultRelInfo, slot, estate);
469
470 /* Materialize slot into a tuple that we can scribble upon. */
471 tuple = ExecMaterializeSlot(slot);
472
473 /* OK, update the tuple and index entries for it */
474 simple_heap_update(rel, &searchslot->tts_tuple->t_self,
475 slot->tts_tuple);
476
477 if (resultRelInfo->ri_NumIndices > 0 &&
478 !HeapTupleIsHeapOnly(slot->tts_tuple))
479 recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
480 estate, false, NULL,
481 NIL);
482
483 /* AFTER ROW UPDATE Triggers */
484 ExecARUpdateTriggers(estate, resultRelInfo,
485 &searchslot->tts_tuple->t_self,
486 NULL, tuple, recheckIndexes, NULL);
487
488 list_free(recheckIndexes);
489 }
490 }
491
492 /*
493 * Find the searchslot tuple and delete it, and execute any constraints
494 * and per-row triggers.
495 *
496 * Caller is responsible for opening the indexes.
497 */
498 void
ExecSimpleRelationDelete(EState * estate,EPQState * epqstate,TupleTableSlot * searchslot)499 ExecSimpleRelationDelete(EState *estate, EPQState *epqstate,
500 TupleTableSlot *searchslot)
501 {
502 bool skip_tuple = false;
503 ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
504 Relation rel = resultRelInfo->ri_RelationDesc;
505
506 /* For now we support only tables. */
507 Assert(rel->rd_rel->relkind == RELKIND_RELATION);
508
509 CheckCmdReplicaIdentity(rel, CMD_DELETE);
510
511 /* BEFORE ROW DELETE Triggers */
512 if (resultRelInfo->ri_TrigDesc &&
513 resultRelInfo->ri_TrigDesc->trig_delete_before_row)
514 {
515 skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
516 &searchslot->tts_tuple->t_self,
517 NULL);
518 }
519
520 if (!skip_tuple)
521 {
522 List *recheckIndexes = NIL;
523
524 /* OK, delete the tuple */
525 simple_heap_delete(rel, &searchslot->tts_tuple->t_self);
526
527 /* AFTER ROW DELETE Triggers */
528 ExecARDeleteTriggers(estate, resultRelInfo,
529 &searchslot->tts_tuple->t_self, NULL, NULL);
530
531 list_free(recheckIndexes);
532 }
533 }
534
535 /*
536 * Check if command can be executed with current replica identity.
537 */
538 void
CheckCmdReplicaIdentity(Relation rel,CmdType cmd)539 CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
540 {
541 PublicationActions *pubactions;
542
543 /* We only need to do checks for UPDATE and DELETE. */
544 if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
545 return;
546
547 /* If relation has replica identity we are always good. */
548 if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
549 OidIsValid(RelationGetReplicaIndex(rel)))
550 return;
551
552 /*
553 * This is either UPDATE OR DELETE and there is no replica identity.
554 *
555 * Check if the table publishes UPDATES or DELETES.
556 */
557 pubactions = GetRelationPublicationActions(rel);
558 if (cmd == CMD_UPDATE && pubactions->pubupdate)
559 ereport(ERROR,
560 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
561 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
562 RelationGetRelationName(rel)),
563 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
564 else if (cmd == CMD_DELETE && pubactions->pubdelete)
565 ereport(ERROR,
566 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
567 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
568 RelationGetRelationName(rel)),
569 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
570 }
571
572
573 /*
574 * Check if we support writing into specific relkind.
575 *
576 * The nspname and relname are only needed for error reporting.
577 */
578 void
CheckSubscriptionRelkind(char relkind,const char * nspname,const char * relname)579 CheckSubscriptionRelkind(char relkind, const char *nspname,
580 const char *relname)
581 {
582 /*
583 * We currently only support writing to regular tables.
584 */
585 if (relkind != RELKIND_RELATION)
586 ereport(ERROR,
587 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
588 errmsg("logical replication target relation \"%s.%s\" is not a table",
589 nspname, relname)));
590 }
591