1 /*-------------------------------------------------------------------------
2  *
3  * execReplication.c
4  *	  miscellaneous executor routines for logical replication
5  *
6  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *	  src/backend/executor/execReplication.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/genam.h"
18 #include "access/relscan.h"
19 #include "access/tableam.h"
20 #include "access/transam.h"
21 #include "access/xact.h"
22 #include "commands/trigger.h"
23 #include "executor/executor.h"
24 #include "executor/nodeModifyTable.h"
25 #include "nodes/nodeFuncs.h"
26 #include "parser/parse_relation.h"
27 #include "parser/parsetree.h"
28 #include "storage/bufmgr.h"
29 #include "storage/lmgr.h"
30 #include "utils/builtins.h"
31 #include "utils/datum.h"
32 #include "utils/lsyscache.h"
33 #include "utils/memutils.h"
34 #include "utils/rel.h"
35 #include "utils/snapmgr.h"
36 #include "utils/syscache.h"
37 #include "utils/typcache.h"
38 
39 
40 /*
41  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
42  * is setup to match 'rel' (*NOT* idxrel!).
43  *
44  * Returns whether any column contains NULLs.
45  *
46  * This is not generic routine, it expects the idxrel to be replication
47  * identity of a rel and meet all limitations associated with that.
48  */
49 static bool
build_replindex_scan_key(ScanKey skey,Relation rel,Relation idxrel,TupleTableSlot * searchslot)50 build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
51 						 TupleTableSlot *searchslot)
52 {
53 	int			attoff;
54 	bool		isnull;
55 	Datum		indclassDatum;
56 	oidvector  *opclass;
57 	int2vector *indkey = &idxrel->rd_index->indkey;
58 	bool		hasnulls = false;
59 
60 	Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
61 		   RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
62 
63 	indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
64 									Anum_pg_index_indclass, &isnull);
65 	Assert(!isnull);
66 	opclass = (oidvector *) DatumGetPointer(indclassDatum);
67 
68 	/* Build scankey for every attribute in the index. */
69 	for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
70 	{
71 		Oid			operator;
72 		Oid			opfamily;
73 		RegProcedure regop;
74 		int			pkattno = attoff + 1;
75 		int			mainattno = indkey->values[attoff];
76 		Oid			optype = get_opclass_input_type(opclass->values[attoff]);
77 
78 		/*
79 		 * Load the operator info.  We need this to get the equality operator
80 		 * function for the scan key.
81 		 */
82 		opfamily = get_opclass_family(opclass->values[attoff]);
83 
84 		operator = get_opfamily_member(opfamily, optype,
85 									   optype,
86 									   BTEqualStrategyNumber);
87 		if (!OidIsValid(operator))
88 			elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
89 				 BTEqualStrategyNumber, optype, optype, opfamily);
90 
91 		regop = get_opcode(operator);
92 
93 		/* Initialize the scankey. */
94 		ScanKeyInit(&skey[attoff],
95 					pkattno,
96 					BTEqualStrategyNumber,
97 					regop,
98 					searchslot->tts_values[mainattno - 1]);
99 
100 		skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
101 
102 		/* Check for null value. */
103 		if (searchslot->tts_isnull[mainattno - 1])
104 		{
105 			hasnulls = true;
106 			skey[attoff].sk_flags |= SK_ISNULL;
107 		}
108 	}
109 
110 	return hasnulls;
111 }
112 
113 /*
114  * Search the relation 'rel' for tuple using the index.
115  *
116  * If a matching tuple is found, lock it with lockmode, fill the slot with its
117  * contents, and return true.  Return false otherwise.
118  */
119 bool
RelationFindReplTupleByIndex(Relation rel,Oid idxoid,LockTupleMode lockmode,TupleTableSlot * searchslot,TupleTableSlot * outslot)120 RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
121 							 LockTupleMode lockmode,
122 							 TupleTableSlot *searchslot,
123 							 TupleTableSlot *outslot)
124 {
125 	ScanKeyData skey[INDEX_MAX_KEYS];
126 	IndexScanDesc scan;
127 	SnapshotData snap;
128 	TransactionId xwait;
129 	Relation	idxrel;
130 	bool		found;
131 
132 	/* Open the index. */
133 	idxrel = index_open(idxoid, RowExclusiveLock);
134 
135 	/* Start an index scan. */
136 	InitDirtySnapshot(snap);
137 	scan = index_beginscan(rel, idxrel, &snap,
138 						   IndexRelationGetNumberOfKeyAttributes(idxrel),
139 						   0);
140 
141 	/* Build scan key. */
142 	build_replindex_scan_key(skey, rel, idxrel, searchslot);
143 
144 retry:
145 	found = false;
146 
147 	index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
148 
149 	/* Try to find the tuple */
150 	if (index_getnext_slot(scan, ForwardScanDirection, outslot))
151 	{
152 		found = true;
153 		ExecMaterializeSlot(outslot);
154 
155 		xwait = TransactionIdIsValid(snap.xmin) ?
156 			snap.xmin : snap.xmax;
157 
158 		/*
159 		 * If the tuple is locked, wait for locking transaction to finish and
160 		 * retry.
161 		 */
162 		if (TransactionIdIsValid(xwait))
163 		{
164 			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
165 			goto retry;
166 		}
167 	}
168 
169 	/* Found tuple, try to lock it in the lockmode. */
170 	if (found)
171 	{
172 		TM_FailureData tmfd;
173 		TM_Result	res;
174 
175 		PushActiveSnapshot(GetLatestSnapshot());
176 
177 		res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
178 							   outslot,
179 							   GetCurrentCommandId(false),
180 							   lockmode,
181 							   LockWaitBlock,
182 							   0 /* don't follow updates */ ,
183 							   &tmfd);
184 
185 		PopActiveSnapshot();
186 
187 		switch (res)
188 		{
189 			case TM_Ok:
190 				break;
191 			case TM_Updated:
192 				/* XXX: Improve handling here */
193 				if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
194 					ereport(LOG,
195 							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
196 							 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
197 				else
198 					ereport(LOG,
199 							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
200 							 errmsg("concurrent update, retrying")));
201 				goto retry;
202 			case TM_Deleted:
203 				/* XXX: Improve handling here */
204 				ereport(LOG,
205 						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
206 						 errmsg("concurrent delete, retrying")));
207 				goto retry;
208 			case TM_Invisible:
209 				elog(ERROR, "attempted to lock invisible tuple");
210 				break;
211 			default:
212 				elog(ERROR, "unexpected table_tuple_lock status: %u", res);
213 				break;
214 		}
215 	}
216 
217 	index_endscan(scan);
218 
219 	/* Don't release lock until commit. */
220 	index_close(idxrel, NoLock);
221 
222 	return found;
223 }
224 
225 /*
226  * Compare the tuples in the slots by checking if they have equal values.
227  */
228 static bool
tuples_equal(TupleTableSlot * slot1,TupleTableSlot * slot2,TypeCacheEntry ** eq)229 tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
230 			 TypeCacheEntry **eq)
231 {
232 	int			attrnum;
233 
234 	Assert(slot1->tts_tupleDescriptor->natts ==
235 		   slot2->tts_tupleDescriptor->natts);
236 
237 	slot_getallattrs(slot1);
238 	slot_getallattrs(slot2);
239 
240 	/* Check equality of the attributes. */
241 	for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
242 	{
243 		Form_pg_attribute att;
244 		TypeCacheEntry *typentry;
245 
246 		/*
247 		 * If one value is NULL and other is not, then they are certainly not
248 		 * equal
249 		 */
250 		if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
251 			return false;
252 
253 		/*
254 		 * If both are NULL, they can be considered equal.
255 		 */
256 		if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
257 			continue;
258 
259 		att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
260 
261 		typentry = eq[attrnum];
262 		if (typentry == NULL)
263 		{
264 			typentry = lookup_type_cache(att->atttypid,
265 										 TYPECACHE_EQ_OPR_FINFO);
266 			if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
267 				ereport(ERROR,
268 						(errcode(ERRCODE_UNDEFINED_FUNCTION),
269 						 errmsg("could not identify an equality operator for type %s",
270 								format_type_be(att->atttypid))));
271 			eq[attrnum] = typentry;
272 		}
273 
274 		if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
275 											att->attcollation,
276 											slot1->tts_values[attrnum],
277 											slot2->tts_values[attrnum])))
278 			return false;
279 	}
280 
281 	return true;
282 }
283 
284 /*
285  * Search the relation 'rel' for tuple using the sequential scan.
286  *
287  * If a matching tuple is found, lock it with lockmode, fill the slot with its
288  * contents, and return true.  Return false otherwise.
289  *
290  * Note that this stops on the first matching tuple.
291  *
292  * This can obviously be quite slow on tables that have more than few rows.
293  */
294 bool
RelationFindReplTupleSeq(Relation rel,LockTupleMode lockmode,TupleTableSlot * searchslot,TupleTableSlot * outslot)295 RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
296 						 TupleTableSlot *searchslot, TupleTableSlot *outslot)
297 {
298 	TupleTableSlot *scanslot;
299 	TableScanDesc scan;
300 	SnapshotData snap;
301 	TypeCacheEntry **eq;
302 	TransactionId xwait;
303 	bool		found;
304 	TupleDesc	desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
305 
306 	Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));
307 
308 	eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
309 
310 	/* Start a heap scan. */
311 	InitDirtySnapshot(snap);
312 	scan = table_beginscan(rel, &snap, 0, NULL);
313 	scanslot = table_slot_create(rel, NULL);
314 
315 retry:
316 	found = false;
317 
318 	table_rescan(scan, NULL);
319 
320 	/* Try to find the tuple */
321 	while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
322 	{
323 		if (!tuples_equal(scanslot, searchslot, eq))
324 			continue;
325 
326 		found = true;
327 		ExecCopySlot(outslot, scanslot);
328 
329 		xwait = TransactionIdIsValid(snap.xmin) ?
330 			snap.xmin : snap.xmax;
331 
332 		/*
333 		 * If the tuple is locked, wait for locking transaction to finish and
334 		 * retry.
335 		 */
336 		if (TransactionIdIsValid(xwait))
337 		{
338 			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
339 			goto retry;
340 		}
341 
342 		/* Found our tuple and it's not locked */
343 		break;
344 	}
345 
346 	/* Found tuple, try to lock it in the lockmode. */
347 	if (found)
348 	{
349 		TM_FailureData tmfd;
350 		TM_Result	res;
351 
352 		PushActiveSnapshot(GetLatestSnapshot());
353 
354 		res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
355 							   outslot,
356 							   GetCurrentCommandId(false),
357 							   lockmode,
358 							   LockWaitBlock,
359 							   0 /* don't follow updates */ ,
360 							   &tmfd);
361 
362 		PopActiveSnapshot();
363 
364 		switch (res)
365 		{
366 			case TM_Ok:
367 				break;
368 			case TM_Updated:
369 				/* XXX: Improve handling here */
370 				if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
371 					ereport(LOG,
372 							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
373 							 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
374 				else
375 					ereport(LOG,
376 							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
377 							 errmsg("concurrent update, retrying")));
378 				goto retry;
379 			case TM_Deleted:
380 				/* XXX: Improve handling here */
381 				ereport(LOG,
382 						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
383 						 errmsg("concurrent delete, retrying")));
384 				goto retry;
385 			case TM_Invisible:
386 				elog(ERROR, "attempted to lock invisible tuple");
387 				break;
388 			default:
389 				elog(ERROR, "unexpected table_tuple_lock status: %u", res);
390 				break;
391 		}
392 	}
393 
394 	table_endscan(scan);
395 	ExecDropSingleTupleTableSlot(scanslot);
396 
397 	return found;
398 }
399 
400 /*
401  * Insert tuple represented in the slot to the relation, update the indexes,
402  * and execute any constraints and per-row triggers.
403  *
404  * Caller is responsible for opening the indexes.
405  */
406 void
ExecSimpleRelationInsert(EState * estate,TupleTableSlot * slot)407 ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
408 {
409 	bool		skip_tuple = false;
410 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
411 	Relation	rel = resultRelInfo->ri_RelationDesc;
412 
413 	/* For now we support only tables. */
414 	Assert(rel->rd_rel->relkind == RELKIND_RELATION);
415 
416 	CheckCmdReplicaIdentity(rel, CMD_INSERT);
417 
418 	/* BEFORE ROW INSERT Triggers */
419 	if (resultRelInfo->ri_TrigDesc &&
420 		resultRelInfo->ri_TrigDesc->trig_insert_before_row)
421 	{
422 		if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
423 			skip_tuple = true;	/* "do nothing" */
424 	}
425 
426 	if (!skip_tuple)
427 	{
428 		List	   *recheckIndexes = NIL;
429 
430 		/* Compute stored generated columns */
431 		if (rel->rd_att->constr &&
432 			rel->rd_att->constr->has_generated_stored)
433 			ExecComputeStoredGenerated(estate, slot, CMD_INSERT);
434 
435 		/* Check the constraints of the tuple */
436 		if (rel->rd_att->constr)
437 			ExecConstraints(resultRelInfo, slot, estate);
438 		if (resultRelInfo->ri_PartitionCheck)
439 			ExecPartitionCheck(resultRelInfo, slot, estate, true);
440 
441 		/* OK, store the tuple and create index entries for it */
442 		simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
443 
444 		if (resultRelInfo->ri_NumIndices > 0)
445 			recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
446 												   NIL);
447 
448 		/* AFTER ROW INSERT Triggers */
449 		ExecARInsertTriggers(estate, resultRelInfo, slot,
450 							 recheckIndexes, NULL);
451 
452 		/*
453 		 * XXX we should in theory pass a TransitionCaptureState object to the
454 		 * above to capture transition tuples, but after statement triggers
455 		 * don't actually get fired by replication yet anyway
456 		 */
457 
458 		list_free(recheckIndexes);
459 	}
460 }
461 
462 /*
463  * Find the searchslot tuple and update it with data in the slot,
464  * update the indexes, and execute any constraints and per-row triggers.
465  *
466  * Caller is responsible for opening the indexes.
467  */
468 void
ExecSimpleRelationUpdate(EState * estate,EPQState * epqstate,TupleTableSlot * searchslot,TupleTableSlot * slot)469 ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
470 						 TupleTableSlot *searchslot, TupleTableSlot *slot)
471 {
472 	bool		skip_tuple = false;
473 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
474 	Relation	rel = resultRelInfo->ri_RelationDesc;
475 	ItemPointer tid = &(searchslot->tts_tid);
476 
477 	/* For now we support only tables. */
478 	Assert(rel->rd_rel->relkind == RELKIND_RELATION);
479 
480 	CheckCmdReplicaIdentity(rel, CMD_UPDATE);
481 
482 	/* BEFORE ROW UPDATE Triggers */
483 	if (resultRelInfo->ri_TrigDesc &&
484 		resultRelInfo->ri_TrigDesc->trig_update_before_row)
485 	{
486 		if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
487 								  tid, NULL, slot))
488 			skip_tuple = true;	/* "do nothing" */
489 	}
490 
491 	if (!skip_tuple)
492 	{
493 		List	   *recheckIndexes = NIL;
494 		bool		update_indexes;
495 
496 		/* Compute stored generated columns */
497 		if (rel->rd_att->constr &&
498 			rel->rd_att->constr->has_generated_stored)
499 			ExecComputeStoredGenerated(estate, slot, CMD_UPDATE);
500 
501 		/* Check the constraints of the tuple */
502 		if (rel->rd_att->constr)
503 			ExecConstraints(resultRelInfo, slot, estate);
504 		if (resultRelInfo->ri_PartitionCheck)
505 			ExecPartitionCheck(resultRelInfo, slot, estate, true);
506 
507 		simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
508 								  &update_indexes);
509 
510 		if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
511 			recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
512 												   NIL);
513 
514 		/* AFTER ROW UPDATE Triggers */
515 		ExecARUpdateTriggers(estate, resultRelInfo,
516 							 tid, NULL, slot,
517 							 recheckIndexes, NULL);
518 
519 		list_free(recheckIndexes);
520 	}
521 }
522 
523 /*
524  * Find the searchslot tuple and delete it, and execute any constraints
525  * and per-row triggers.
526  *
527  * Caller is responsible for opening the indexes.
528  */
529 void
ExecSimpleRelationDelete(EState * estate,EPQState * epqstate,TupleTableSlot * searchslot)530 ExecSimpleRelationDelete(EState *estate, EPQState *epqstate,
531 						 TupleTableSlot *searchslot)
532 {
533 	bool		skip_tuple = false;
534 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
535 	Relation	rel = resultRelInfo->ri_RelationDesc;
536 	ItemPointer tid = &searchslot->tts_tid;
537 
538 	CheckCmdReplicaIdentity(rel, CMD_DELETE);
539 
540 	/* BEFORE ROW DELETE Triggers */
541 	if (resultRelInfo->ri_TrigDesc &&
542 		resultRelInfo->ri_TrigDesc->trig_delete_before_row)
543 	{
544 		skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
545 										   tid, NULL, NULL);
546 
547 	}
548 
549 	if (!skip_tuple)
550 	{
551 		/* OK, delete the tuple */
552 		simple_table_tuple_delete(rel, tid, estate->es_snapshot);
553 
554 		/* AFTER ROW DELETE Triggers */
555 		ExecARDeleteTriggers(estate, resultRelInfo,
556 							 tid, NULL, NULL);
557 	}
558 }
559 
560 /*
561  * Check if command can be executed with current replica identity.
562  */
563 void
CheckCmdReplicaIdentity(Relation rel,CmdType cmd)564 CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
565 {
566 	PublicationActions *pubactions;
567 
568 	/* We only need to do checks for UPDATE and DELETE. */
569 	if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
570 		return;
571 
572 	/* If relation has replica identity we are always good. */
573 	if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
574 		OidIsValid(RelationGetReplicaIndex(rel)))
575 		return;
576 
577 	/*
578 	 * This is either UPDATE OR DELETE and there is no replica identity.
579 	 *
580 	 * Check if the table publishes UPDATES or DELETES.
581 	 */
582 	pubactions = GetRelationPublicationActions(rel);
583 	if (cmd == CMD_UPDATE && pubactions->pubupdate)
584 		ereport(ERROR,
585 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
586 				 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
587 						RelationGetRelationName(rel)),
588 				 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
589 	else if (cmd == CMD_DELETE && pubactions->pubdelete)
590 		ereport(ERROR,
591 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
592 				 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
593 						RelationGetRelationName(rel)),
594 				 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
595 }
596 
597 
598 /*
599  * Check if we support writing into specific relkind.
600  *
601  * The nspname and relname are only needed for error reporting.
602  */
603 void
CheckSubscriptionRelkind(char relkind,const char * nspname,const char * relname)604 CheckSubscriptionRelkind(char relkind, const char *nspname,
605 						 const char *relname)
606 {
607 	/*
608 	 * Give a more specific error for foreign tables.
609 	 */
610 	if (relkind == RELKIND_FOREIGN_TABLE)
611 		ereport(ERROR,
612 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
613 				 errmsg("cannot use relation \"%s.%s\" as logical replication target",
614 						nspname, relname),
615 				 errdetail("\"%s.%s\" is a foreign table.",
616 						   nspname, relname)));
617 
618 	if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
619 		ereport(ERROR,
620 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
621 				 errmsg("cannot use relation \"%s.%s\" as logical replication target",
622 						nspname, relname),
623 				 errdetail("\"%s.%s\" is not a table.",
624 						   nspname, relname)));
625 }
626