1 /*-------------------------------------------------------------------------
2  *
3  * execReplication.c
4  *	  miscellaneous executor routines for logical replication
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *	  src/backend/executor/execReplication.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/relscan.h"
18 #include "access/transam.h"
19 #include "access/xact.h"
20 #include "commands/trigger.h"
21 #include "executor/executor.h"
22 #include "nodes/nodeFuncs.h"
23 #include "parser/parse_relation.h"
24 #include "parser/parsetree.h"
25 #include "storage/bufmgr.h"
26 #include "storage/lmgr.h"
27 #include "utils/builtins.h"
28 #include "utils/datum.h"
29 #include "utils/lsyscache.h"
30 #include "utils/memutils.h"
31 #include "utils/rel.h"
32 #include "utils/snapmgr.h"
33 #include "utils/syscache.h"
34 #include "utils/typcache.h"
35 #include "utils/tqual.h"
36 
37 
38 /*
39  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
40  * is setup to match 'rel' (*NOT* idxrel!).
41  *
42  * Returns whether any column contains NULLs.
43  *
44  * This is not generic routine, it expects the idxrel to be replication
45  * identity of a rel and meet all limitations associated with that.
46  */
47 static bool
build_replindex_scan_key(ScanKey skey,Relation rel,Relation idxrel,TupleTableSlot * searchslot)48 build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
49 						 TupleTableSlot *searchslot)
50 {
51 	int			attoff;
52 	bool		isnull;
53 	Datum		indclassDatum;
54 	oidvector  *opclass;
55 	int2vector *indkey = &idxrel->rd_index->indkey;
56 	bool		hasnulls = false;
57 
58 	Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
59 		   RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
60 
61 	indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
62 									Anum_pg_index_indclass, &isnull);
63 	Assert(!isnull);
64 	opclass = (oidvector *) DatumGetPointer(indclassDatum);
65 
66 	/* Build scankey for every attribute in the index. */
67 	for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
68 	{
69 		Oid			operator;
70 		Oid			opfamily;
71 		RegProcedure regop;
72 		int			pkattno = attoff + 1;
73 		int			mainattno = indkey->values[attoff];
74 		Oid			optype = get_opclass_input_type(opclass->values[attoff]);
75 
76 		/*
77 		 * Load the operator info.  We need this to get the equality operator
78 		 * function for the scan key.
79 		 */
80 		opfamily = get_opclass_family(opclass->values[attoff]);
81 
82 		operator = get_opfamily_member(opfamily, optype,
83 									   optype,
84 									   BTEqualStrategyNumber);
85 		if (!OidIsValid(operator))
86 			elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
87 				 BTEqualStrategyNumber, optype, optype, opfamily);
88 
89 		regop = get_opcode(operator);
90 
91 		/* Initialize the scankey. */
92 		ScanKeyInit(&skey[attoff],
93 					pkattno,
94 					BTEqualStrategyNumber,
95 					regop,
96 					searchslot->tts_values[mainattno - 1]);
97 
98 		/* Check for null value. */
99 		if (searchslot->tts_isnull[mainattno - 1])
100 		{
101 			hasnulls = true;
102 			skey[attoff].sk_flags |= SK_ISNULL;
103 		}
104 	}
105 
106 	return hasnulls;
107 }
108 
109 /*
110  * Search the relation 'rel' for tuple using the index.
111  *
112  * If a matching tuple is found, lock it with lockmode, fill the slot with its
113  * contents, and return true.  Return false otherwise.
114  */
115 bool
RelationFindReplTupleByIndex(Relation rel,Oid idxoid,LockTupleMode lockmode,TupleTableSlot * searchslot,TupleTableSlot * outslot)116 RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
117 							 LockTupleMode lockmode,
118 							 TupleTableSlot *searchslot,
119 							 TupleTableSlot *outslot)
120 {
121 	HeapTuple	scantuple;
122 	ScanKeyData skey[INDEX_MAX_KEYS];
123 	IndexScanDesc scan;
124 	SnapshotData snap;
125 	TransactionId xwait;
126 	Relation	idxrel;
127 	bool		found;
128 
129 	/* Open the index. */
130 	idxrel = index_open(idxoid, RowExclusiveLock);
131 
132 	/* Start an index scan. */
133 	InitDirtySnapshot(snap);
134 	scan = index_beginscan(rel, idxrel, &snap,
135 						   IndexRelationGetNumberOfKeyAttributes(idxrel),
136 						   0);
137 
138 	/* Build scan key. */
139 	build_replindex_scan_key(skey, rel, idxrel, searchslot);
140 
141 retry:
142 	found = false;
143 
144 	index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
145 
146 	/* Try to find the tuple */
147 	if ((scantuple = index_getnext(scan, ForwardScanDirection)) != NULL)
148 	{
149 		found = true;
150 		ExecStoreTuple(scantuple, outslot, InvalidBuffer, false);
151 		ExecMaterializeSlot(outslot);
152 
153 		xwait = TransactionIdIsValid(snap.xmin) ?
154 			snap.xmin : snap.xmax;
155 
156 		/*
157 		 * If the tuple is locked, wait for locking transaction to finish and
158 		 * retry.
159 		 */
160 		if (TransactionIdIsValid(xwait))
161 		{
162 			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
163 			goto retry;
164 		}
165 	}
166 
167 	/* Found tuple, try to lock it in the lockmode. */
168 	if (found)
169 	{
170 		Buffer		buf;
171 		HeapUpdateFailureData hufd;
172 		HTSU_Result res;
173 		HeapTupleData locktup;
174 
175 		ItemPointerCopy(&outslot->tts_tuple->t_self, &locktup.t_self);
176 
177 		PushActiveSnapshot(GetLatestSnapshot());
178 
179 		res = heap_lock_tuple(rel, &locktup, GetCurrentCommandId(false),
180 							  lockmode,
181 							  LockWaitBlock,
182 							  false /* don't follow updates */ ,
183 							  &buf, &hufd);
184 		/* the tuple slot already has the buffer pinned */
185 		ReleaseBuffer(buf);
186 
187 		PopActiveSnapshot();
188 
189 		switch (res)
190 		{
191 			case HeapTupleMayBeUpdated:
192 				break;
193 			case HeapTupleUpdated:
194 				/* XXX: Improve handling here */
195 				if (ItemPointerIndicatesMovedPartitions(&hufd.ctid))
196 					ereport(LOG,
197 							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
198 							 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
199 				else
200 					ereport(LOG,
201 							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
202 							 errmsg("concurrent update, retrying")));
203 				goto retry;
204 			case HeapTupleInvisible:
205 				elog(ERROR, "attempted to lock invisible tuple");
206 				break;
207 			default:
208 				elog(ERROR, "unexpected heap_lock_tuple status: %u", res);
209 				break;
210 		}
211 	}
212 
213 	index_endscan(scan);
214 
215 	/* Don't release lock until commit. */
216 	index_close(idxrel, NoLock);
217 
218 	return found;
219 }
220 
221 /*
222  * Compare the tuple and slot and check if they have equal values.
223  */
224 static bool
tuple_equals_slot(TupleDesc desc,HeapTuple tup,TupleTableSlot * slot)225 tuple_equals_slot(TupleDesc desc, HeapTuple tup, TupleTableSlot *slot)
226 {
227 	Datum		values[MaxTupleAttributeNumber];
228 	bool		isnull[MaxTupleAttributeNumber];
229 	int			attrnum;
230 
231 	heap_deform_tuple(tup, desc, values, isnull);
232 
233 	/* Check equality of the attributes. */
234 	for (attrnum = 0; attrnum < desc->natts; attrnum++)
235 	{
236 		Form_pg_attribute att;
237 		TypeCacheEntry *typentry;
238 
239 		/*
240 		 * If one value is NULL and other is not, then they are certainly not
241 		 * equal
242 		 */
243 		if (isnull[attrnum] != slot->tts_isnull[attrnum])
244 			return false;
245 
246 		/*
247 		 * If both are NULL, they can be considered equal.
248 		 */
249 		if (isnull[attrnum])
250 			continue;
251 
252 		att = TupleDescAttr(desc, attrnum);
253 
254 		typentry = lookup_type_cache(att->atttypid, TYPECACHE_EQ_OPR_FINFO);
255 		if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
256 			ereport(ERROR,
257 					(errcode(ERRCODE_UNDEFINED_FUNCTION),
258 					 errmsg("could not identify an equality operator for type %s",
259 							format_type_be(att->atttypid))));
260 
261 		if (!DatumGetBool(FunctionCall2(&typentry->eq_opr_finfo,
262 										values[attrnum],
263 										slot->tts_values[attrnum])))
264 			return false;
265 	}
266 
267 	return true;
268 }
269 
270 /*
271  * Search the relation 'rel' for tuple using the sequential scan.
272  *
273  * If a matching tuple is found, lock it with lockmode, fill the slot with its
274  * contents, and return true.  Return false otherwise.
275  *
276  * Note that this stops on the first matching tuple.
277  *
278  * This can obviously be quite slow on tables that have more than few rows.
279  */
280 bool
RelationFindReplTupleSeq(Relation rel,LockTupleMode lockmode,TupleTableSlot * searchslot,TupleTableSlot * outslot)281 RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
282 						 TupleTableSlot *searchslot, TupleTableSlot *outslot)
283 {
284 	HeapTuple	scantuple;
285 	HeapScanDesc scan;
286 	SnapshotData snap;
287 	TransactionId xwait;
288 	bool		found;
289 	TupleDesc	desc = RelationGetDescr(rel);
290 
291 	Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
292 
293 	/* Start a heap scan. */
294 	InitDirtySnapshot(snap);
295 	scan = heap_beginscan(rel, &snap, 0, NULL);
296 
297 retry:
298 	found = false;
299 
300 	heap_rescan(scan, NULL);
301 
302 	/* Try to find the tuple */
303 	while ((scantuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
304 	{
305 		if (!tuple_equals_slot(desc, scantuple, searchslot))
306 			continue;
307 
308 		found = true;
309 		ExecStoreTuple(scantuple, outslot, InvalidBuffer, false);
310 		ExecMaterializeSlot(outslot);
311 
312 		xwait = TransactionIdIsValid(snap.xmin) ?
313 			snap.xmin : snap.xmax;
314 
315 		/*
316 		 * If the tuple is locked, wait for locking transaction to finish and
317 		 * retry.
318 		 */
319 		if (TransactionIdIsValid(xwait))
320 		{
321 			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
322 			goto retry;
323 		}
324 
325 		/* Found our tuple and it's not locked */
326 		break;
327 	}
328 
329 	/* Found tuple, try to lock it in the lockmode. */
330 	if (found)
331 	{
332 		Buffer		buf;
333 		HeapUpdateFailureData hufd;
334 		HTSU_Result res;
335 		HeapTupleData locktup;
336 
337 		ItemPointerCopy(&outslot->tts_tuple->t_self, &locktup.t_self);
338 
339 		PushActiveSnapshot(GetLatestSnapshot());
340 
341 		res = heap_lock_tuple(rel, &locktup, GetCurrentCommandId(false),
342 							  lockmode,
343 							  LockWaitBlock,
344 							  false /* don't follow updates */ ,
345 							  &buf, &hufd);
346 		/* the tuple slot already has the buffer pinned */
347 		ReleaseBuffer(buf);
348 
349 		PopActiveSnapshot();
350 
351 		switch (res)
352 		{
353 			case HeapTupleMayBeUpdated:
354 				break;
355 			case HeapTupleUpdated:
356 				/* XXX: Improve handling here */
357 				if (ItemPointerIndicatesMovedPartitions(&hufd.ctid))
358 					ereport(LOG,
359 							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
360 							 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
361 				else
362 					ereport(LOG,
363 							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
364 							 errmsg("concurrent update, retrying")));
365 				goto retry;
366 			case HeapTupleInvisible:
367 				elog(ERROR, "attempted to lock invisible tuple");
368 				break;
369 			default:
370 				elog(ERROR, "unexpected heap_lock_tuple status: %u", res);
371 				break;
372 		}
373 	}
374 
375 	heap_endscan(scan);
376 
377 	return found;
378 }
379 
380 /*
381  * Insert tuple represented in the slot to the relation, update the indexes,
382  * and execute any constraints and per-row triggers.
383  *
384  * Caller is responsible for opening the indexes.
385  */
386 void
ExecSimpleRelationInsert(EState * estate,TupleTableSlot * slot)387 ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
388 {
389 	bool		skip_tuple = false;
390 	HeapTuple	tuple;
391 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
392 	Relation	rel = resultRelInfo->ri_RelationDesc;
393 
394 	/* For now we support only tables. */
395 	Assert(rel->rd_rel->relkind == RELKIND_RELATION);
396 
397 	CheckCmdReplicaIdentity(rel, CMD_INSERT);
398 
399 	/* BEFORE ROW INSERT Triggers */
400 	if (resultRelInfo->ri_TrigDesc &&
401 		resultRelInfo->ri_TrigDesc->trig_insert_before_row)
402 	{
403 		slot = ExecBRInsertTriggers(estate, resultRelInfo, slot);
404 
405 		if (slot == NULL)		/* "do nothing" */
406 			skip_tuple = true;
407 	}
408 
409 	if (!skip_tuple)
410 	{
411 		List	   *recheckIndexes = NIL;
412 
413 		/* Check the constraints of the tuple */
414 		if (rel->rd_att->constr)
415 			ExecConstraints(resultRelInfo, slot, estate);
416 		if (resultRelInfo->ri_PartitionCheck)
417 			ExecPartitionCheck(resultRelInfo, slot, estate, true);
418 
419 		/* Materialize slot into a tuple that we can scribble upon. */
420 		tuple = ExecMaterializeSlot(slot);
421 
422 		/* OK, store the tuple and create index entries for it */
423 		simple_heap_insert(rel, tuple);
424 
425 		if (resultRelInfo->ri_NumIndices > 0)
426 			recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
427 												   estate, false, NULL,
428 												   NIL);
429 
430 		/* AFTER ROW INSERT Triggers */
431 		ExecARInsertTriggers(estate, resultRelInfo, tuple,
432 							 recheckIndexes, NULL);
433 
434 		/*
435 		 * XXX we should in theory pass a TransitionCaptureState object to the
436 		 * above to capture transition tuples, but after statement triggers
437 		 * don't actually get fired by replication yet anyway
438 		 */
439 
440 		list_free(recheckIndexes);
441 	}
442 }
443 
444 /*
445  * Find the searchslot tuple and update it with data in the slot,
446  * update the indexes, and execute any constraints and per-row triggers.
447  *
448  * Caller is responsible for opening the indexes.
449  */
450 void
ExecSimpleRelationUpdate(EState * estate,EPQState * epqstate,TupleTableSlot * searchslot,TupleTableSlot * slot)451 ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
452 						 TupleTableSlot *searchslot, TupleTableSlot *slot)
453 {
454 	bool		skip_tuple = false;
455 	HeapTuple	tuple;
456 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
457 	Relation	rel = resultRelInfo->ri_RelationDesc;
458 
459 	/* For now we support only tables. */
460 	Assert(rel->rd_rel->relkind == RELKIND_RELATION);
461 
462 	CheckCmdReplicaIdentity(rel, CMD_UPDATE);
463 
464 	/* BEFORE ROW UPDATE Triggers */
465 	if (resultRelInfo->ri_TrigDesc &&
466 		resultRelInfo->ri_TrigDesc->trig_update_before_row)
467 	{
468 		slot = ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
469 									&searchslot->tts_tuple->t_self,
470 									NULL, slot);
471 
472 		if (slot == NULL)		/* "do nothing" */
473 			skip_tuple = true;
474 	}
475 
476 	if (!skip_tuple)
477 	{
478 		List	   *recheckIndexes = NIL;
479 
480 		/* Check the constraints of the tuple */
481 		if (rel->rd_att->constr)
482 			ExecConstraints(resultRelInfo, slot, estate);
483 		if (resultRelInfo->ri_PartitionCheck)
484 			ExecPartitionCheck(resultRelInfo, slot, estate, true);
485 
486 		/* Materialize slot into a tuple that we can scribble upon. */
487 		tuple = ExecMaterializeSlot(slot);
488 
489 		/* OK, update the tuple and index entries for it */
490 		simple_heap_update(rel, &searchslot->tts_tuple->t_self,
491 						   slot->tts_tuple);
492 
493 		if (resultRelInfo->ri_NumIndices > 0 &&
494 			!HeapTupleIsHeapOnly(slot->tts_tuple))
495 			recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
496 												   estate, false, NULL,
497 												   NIL);
498 
499 		/* AFTER ROW UPDATE Triggers */
500 		ExecARUpdateTriggers(estate, resultRelInfo,
501 							 &searchslot->tts_tuple->t_self,
502 							 NULL, tuple, recheckIndexes, NULL);
503 
504 		list_free(recheckIndexes);
505 	}
506 }
507 
508 /*
509  * Find the searchslot tuple and delete it, and execute any constraints
510  * and per-row triggers.
511  *
512  * Caller is responsible for opening the indexes.
513  */
514 void
ExecSimpleRelationDelete(EState * estate,EPQState * epqstate,TupleTableSlot * searchslot)515 ExecSimpleRelationDelete(EState *estate, EPQState *epqstate,
516 						 TupleTableSlot *searchslot)
517 {
518 	bool		skip_tuple = false;
519 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
520 	Relation	rel = resultRelInfo->ri_RelationDesc;
521 
522 	/* For now we support only tables. */
523 	Assert(rel->rd_rel->relkind == RELKIND_RELATION);
524 
525 	CheckCmdReplicaIdentity(rel, CMD_DELETE);
526 
527 	/* BEFORE ROW DELETE Triggers */
528 	if (resultRelInfo->ri_TrigDesc &&
529 		resultRelInfo->ri_TrigDesc->trig_delete_before_row)
530 	{
531 		skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
532 										   &searchslot->tts_tuple->t_self,
533 										   NULL, NULL);
534 	}
535 
536 	if (!skip_tuple)
537 	{
538 		List	   *recheckIndexes = NIL;
539 
540 		/* OK, delete the tuple */
541 		simple_heap_delete(rel, &searchslot->tts_tuple->t_self);
542 
543 		/* AFTER ROW DELETE Triggers */
544 		ExecARDeleteTriggers(estate, resultRelInfo,
545 							 &searchslot->tts_tuple->t_self, NULL, NULL);
546 
547 		list_free(recheckIndexes);
548 	}
549 }
550 
551 /*
552  * Check if command can be executed with current replica identity.
553  */
554 void
CheckCmdReplicaIdentity(Relation rel,CmdType cmd)555 CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
556 {
557 	PublicationActions *pubactions;
558 
559 	/* We only need to do checks for UPDATE and DELETE. */
560 	if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
561 		return;
562 
563 	/* If relation has replica identity we are always good. */
564 	if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
565 		OidIsValid(RelationGetReplicaIndex(rel)))
566 		return;
567 
568 	/*
569 	 * This is either UPDATE OR DELETE and there is no replica identity.
570 	 *
571 	 * Check if the table publishes UPDATES or DELETES.
572 	 */
573 	pubactions = GetRelationPublicationActions(rel);
574 	if (cmd == CMD_UPDATE && pubactions->pubupdate)
575 		ereport(ERROR,
576 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
577 				 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
578 						RelationGetRelationName(rel)),
579 				 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
580 	else if (cmd == CMD_DELETE && pubactions->pubdelete)
581 		ereport(ERROR,
582 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
583 				 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
584 						RelationGetRelationName(rel)),
585 				 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
586 }
587 
588 
589 /*
590  * Check if we support writing into specific relkind.
591  *
592  * The nspname and relname are only needed for error reporting.
593  */
594 void
CheckSubscriptionRelkind(char relkind,const char * nspname,const char * relname)595 CheckSubscriptionRelkind(char relkind, const char *nspname,
596 						 const char *relname)
597 {
598 	/*
599 	 * We currently only support writing to regular tables.
600 	 */
601 	if (relkind != RELKIND_RELATION)
602 		ereport(ERROR,
603 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
604 				 errmsg("logical replication target relation \"%s.%s\" is not a table",
605 						nspname, relname)));
606 }
607