1 /*-------------------------------------------------------------------------
2  *
3  * execReplication.c
4  *	  miscellaneous executor routines for logical replication
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *	  src/backend/executor/execReplication.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/genam.h"
18 #include "access/relscan.h"
19 #include "access/tableam.h"
20 #include "access/transam.h"
21 #include "access/xact.h"
22 #include "commands/trigger.h"
23 #include "executor/executor.h"
24 #include "executor/nodeModifyTable.h"
25 #include "nodes/nodeFuncs.h"
26 #include "parser/parse_relation.h"
27 #include "parser/parsetree.h"
28 #include "storage/bufmgr.h"
29 #include "storage/lmgr.h"
30 #include "utils/builtins.h"
31 #include "utils/datum.h"
32 #include "utils/lsyscache.h"
33 #include "utils/memutils.h"
34 #include "utils/rel.h"
35 #include "utils/snapmgr.h"
36 #include "utils/syscache.h"
37 #include "utils/typcache.h"
38 
39 
40 /*
41  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
42  * is setup to match 'rel' (*NOT* idxrel!).
43  *
44  * Returns whether any column contains NULLs.
45  *
46  * This is not generic routine, it expects the idxrel to be replication
47  * identity of a rel and meet all limitations associated with that.
48  */
49 static bool
build_replindex_scan_key(ScanKey skey,Relation rel,Relation idxrel,TupleTableSlot * searchslot)50 build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
51 						 TupleTableSlot *searchslot)
52 {
53 	int			attoff;
54 	bool		isnull;
55 	Datum		indclassDatum;
56 	oidvector  *opclass;
57 	int2vector *indkey = &idxrel->rd_index->indkey;
58 	bool		hasnulls = false;
59 
60 	Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
61 		   RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
62 
63 	indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
64 									Anum_pg_index_indclass, &isnull);
65 	Assert(!isnull);
66 	opclass = (oidvector *) DatumGetPointer(indclassDatum);
67 
68 	/* Build scankey for every attribute in the index. */
69 	for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
70 	{
71 		Oid			operator;
72 		Oid			opfamily;
73 		RegProcedure regop;
74 		int			pkattno = attoff + 1;
75 		int			mainattno = indkey->values[attoff];
76 		Oid			optype = get_opclass_input_type(opclass->values[attoff]);
77 
78 		/*
79 		 * Load the operator info.  We need this to get the equality operator
80 		 * function for the scan key.
81 		 */
82 		opfamily = get_opclass_family(opclass->values[attoff]);
83 
84 		operator = get_opfamily_member(opfamily, optype,
85 									   optype,
86 									   BTEqualStrategyNumber);
87 		if (!OidIsValid(operator))
88 			elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
89 				 BTEqualStrategyNumber, optype, optype, opfamily);
90 
91 		regop = get_opcode(operator);
92 
93 		/* Initialize the scankey. */
94 		ScanKeyInit(&skey[attoff],
95 					pkattno,
96 					BTEqualStrategyNumber,
97 					regop,
98 					searchslot->tts_values[mainattno - 1]);
99 
100 		skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
101 
102 		/* Check for null value. */
103 		if (searchslot->tts_isnull[mainattno - 1])
104 		{
105 			hasnulls = true;
106 			skey[attoff].sk_flags |= SK_ISNULL;
107 		}
108 	}
109 
110 	return hasnulls;
111 }
112 
/*
 * Search the relation 'rel' for tuple using the index.
 *
 * 'idxoid' must be the replica identity index or the primary key of 'rel'
 * (asserted in build_replindex_scan_key).  The search values are taken from
 * 'searchslot', which is laid out like 'rel', not like the index.
 *
 * If a matching tuple is found, lock it with lockmode, fill the slot with its
 * contents, and return true.  Return false otherwise.
 *
 * The scan uses a dirty snapshot.  When a match is found and the snapshot
 * reports a transaction still in progress on it (snap.xmin / snap.xmax), we
 * wait for that transaction to finish and rescan from the start.  If locking
 * the tuple reports a concurrent update or delete, the search is likewise
 * restarted.
 *
 * The index is opened with RowExclusiveLock, which is kept until the end of
 * the transaction.
 */
bool
RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
							 LockTupleMode lockmode,
							 TupleTableSlot *searchslot,
							 TupleTableSlot *outslot)
{
	ScanKeyData skey[INDEX_MAX_KEYS];
	IndexScanDesc scan;
	SnapshotData snap;
	TransactionId xwait;
	Relation	idxrel;
	bool		found;

	/* Open the index. */
	idxrel = index_open(idxoid, RowExclusiveLock);

	/* Start an index scan. */
	InitDirtySnapshot(snap);
	scan = index_beginscan(rel, idxrel, &snap,
						   IndexRelationGetNumberOfKeyAttributes(idxrel),
						   0);

	/* Build scan key.  (Its "contains NULLs" result is not used here.) */
	build_replindex_scan_key(skey, rel, idxrel, searchslot);

retry:
	found = false;

	/* (Re)start the scan from the beginning with the same keys. */
	index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);

	/* Try to find the tuple; only the first match is considered. */
	if (index_getnext_slot(scan, ForwardScanDirection, outslot))
	{
		found = true;
		ExecMaterializeSlot(outslot);

		/*
		 * The dirty snapshot reports an in-progress transaction affecting the
		 * tuple in snap.xmin, or failing that in snap.xmax.
		 */
		xwait = TransactionIdIsValid(snap.xmin) ?
			snap.xmin : snap.xmax;

		/*
		 * If the tuple is locked, wait for locking transaction to finish and
		 * retry.
		 */
		if (TransactionIdIsValid(xwait))
		{
			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
			goto retry;
		}
	}

	/* Found tuple, try to lock it in the lockmode. */
	if (found)
	{
		TM_FailureData tmfd;
		TM_Result	res;

		PushActiveSnapshot(GetLatestSnapshot());

		/* Blocks until the lock is available; outslot receives the tuple. */
		res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
							   outslot,
							   GetCurrentCommandId(false),
							   lockmode,
							   LockWaitBlock,
							   0 /* don't follow updates */ ,
							   &tmfd);

		PopActiveSnapshot();

		switch (res)
		{
			case TM_Ok:
				break;
			case TM_Updated:
				/*
				 * The row changed between the scan and the lock attempt
				 * (possibly moved to another partition); log it and search
				 * again.
				 */
				/* XXX: Improve handling here */
				if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
					ereport(LOG,
							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
							 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
				else
					ereport(LOG,
							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
							 errmsg("concurrent update, retrying")));
				goto retry;
			case TM_Deleted:
				/* The row was deleted concurrently; log it and search again. */
				/* XXX: Improve handling here */
				ereport(LOG,
						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
						 errmsg("concurrent delete, retrying")));
				goto retry;
			case TM_Invisible:
				elog(ERROR, "attempted to lock invisible tuple");
				break;
			default:
				elog(ERROR, "unexpected table_tuple_lock status: %u", res);
				break;
		}
	}

	index_endscan(scan);

	/* Don't release lock until commit. */
	index_close(idxrel, NoLock);

	return found;
}
224 
225 /*
226  * Compare the tuples in the slots by checking if they have equal values.
227  */
228 static bool
tuples_equal(TupleTableSlot * slot1,TupleTableSlot * slot2)229 tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2)
230 {
231 	int			attrnum;
232 
233 	Assert(slot1->tts_tupleDescriptor->natts ==
234 		   slot2->tts_tupleDescriptor->natts);
235 
236 	slot_getallattrs(slot1);
237 	slot_getallattrs(slot2);
238 
239 	/* Check equality of the attributes. */
240 	for (attrnum = 0; attrnum < slot1->tts_tupleDescriptor->natts; attrnum++)
241 	{
242 		Form_pg_attribute att;
243 		TypeCacheEntry *typentry;
244 
245 		/*
246 		 * If one value is NULL and other is not, then they are certainly not
247 		 * equal
248 		 */
249 		if (slot1->tts_isnull[attrnum] != slot2->tts_isnull[attrnum])
250 			return false;
251 
252 		/*
253 		 * If both are NULL, they can be considered equal.
254 		 */
255 		if (slot1->tts_isnull[attrnum] || slot2->tts_isnull[attrnum])
256 			continue;
257 
258 		att = TupleDescAttr(slot1->tts_tupleDescriptor, attrnum);
259 
260 		typentry = lookup_type_cache(att->atttypid, TYPECACHE_EQ_OPR_FINFO);
261 		if (!OidIsValid(typentry->eq_opr_finfo.fn_oid))
262 			ereport(ERROR,
263 					(errcode(ERRCODE_UNDEFINED_FUNCTION),
264 					 errmsg("could not identify an equality operator for type %s",
265 							format_type_be(att->atttypid))));
266 
267 		if (!DatumGetBool(FunctionCall2Coll(&typentry->eq_opr_finfo,
268 											att->attcollation,
269 											slot1->tts_values[attrnum],
270 											slot2->tts_values[attrnum])))
271 			return false;
272 	}
273 
274 	return true;
275 }
276 
/*
 * Search the relation 'rel' for tuple using the sequential scan.
 *
 * A row matches when tuples_equal() says it equals 'searchslot', i.e. every
 * compared column is equal under its type's default equality operator.
 * 'outslot' must have the same tuple descriptor as 'rel' (asserted).
 *
 * If a matching tuple is found, lock it with lockmode, fill the slot with its
 * contents, and return true.  Return false otherwise.
 *
 * Note that this stops on the first matching tuple.
 *
 * The scan uses a dirty snapshot.  When a match is found and the snapshot
 * reports a transaction still in progress on it (snap.xmin / snap.xmax), we
 * wait for that transaction to finish and rescan from the start.  If locking
 * the tuple reports a concurrent update or delete, the search is likewise
 * restarted.
 *
 * This can obviously be quite slow on tables that have more than few rows.
 */
bool
RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
						 TupleTableSlot *searchslot, TupleTableSlot *outslot)
{
	TupleTableSlot *scanslot;
	TableScanDesc scan;
	SnapshotData snap;
	TransactionId xwait;
	bool		found;
	TupleDesc	desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);

	Assert(equalTupleDescs(desc, outslot->tts_tupleDescriptor));

	/* Start a heap scan. */
	InitDirtySnapshot(snap);
	scan = table_beginscan(rel, &snap, 0, NULL);
	/* Scratch slot for scanned rows; dropped before returning. */
	scanslot = table_slot_create(rel, NULL);

retry:
	found = false;

	/* (Re)start the scan from the beginning of the table. */
	table_rescan(scan, NULL);

	/* Try to find the tuple */
	while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
	{
		if (!tuples_equal(scanslot, searchslot))
			continue;

		found = true;
		ExecCopySlot(outslot, scanslot);

		/*
		 * The dirty snapshot reports an in-progress transaction affecting the
		 * tuple in snap.xmin, or failing that in snap.xmax.
		 */
		xwait = TransactionIdIsValid(snap.xmin) ?
			snap.xmin : snap.xmax;

		/*
		 * If the tuple is locked, wait for locking transaction to finish and
		 * retry.
		 */
		if (TransactionIdIsValid(xwait))
		{
			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
			goto retry;
		}

		/* Found our tuple and it's not locked */
		break;
	}

	/* Found tuple, try to lock it in the lockmode. */
	if (found)
	{
		TM_FailureData tmfd;
		TM_Result	res;

		PushActiveSnapshot(GetLatestSnapshot());

		/* Blocks until the lock is available; outslot receives the tuple. */
		res = table_tuple_lock(rel, &(outslot->tts_tid), GetLatestSnapshot(),
							   outslot,
							   GetCurrentCommandId(false),
							   lockmode,
							   LockWaitBlock,
							   0 /* don't follow updates */ ,
							   &tmfd);

		PopActiveSnapshot();

		switch (res)
		{
			case TM_Ok:
				break;
			case TM_Updated:
				/*
				 * The row changed between the scan and the lock attempt
				 * (possibly moved to another partition); log it and search
				 * again.
				 */
				/* XXX: Improve handling here */
				if (ItemPointerIndicatesMovedPartitions(&tmfd.ctid))
					ereport(LOG,
							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
							 errmsg("tuple to be locked was already moved to another partition due to concurrent update, retrying")));
				else
					ereport(LOG,
							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
							 errmsg("concurrent update, retrying")));
				goto retry;
			case TM_Deleted:
				/* The row was deleted concurrently; log it and search again. */
				/* XXX: Improve handling here */
				ereport(LOG,
						(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
						 errmsg("concurrent delete, retrying")));
				goto retry;
			case TM_Invisible:
				elog(ERROR, "attempted to lock invisible tuple");
				break;
			default:
				elog(ERROR, "unexpected table_tuple_lock status: %u", res);
				break;
		}
	}

	table_endscan(scan);
	ExecDropSingleTupleTableSlot(scanslot);

	return found;
}
389 
390 /*
391  * Insert tuple represented in the slot to the relation, update the indexes,
392  * and execute any constraints and per-row triggers.
393  *
394  * Caller is responsible for opening the indexes.
395  */
396 void
ExecSimpleRelationInsert(EState * estate,TupleTableSlot * slot)397 ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
398 {
399 	bool		skip_tuple = false;
400 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
401 	Relation	rel = resultRelInfo->ri_RelationDesc;
402 
403 	/* For now we support only tables. */
404 	Assert(rel->rd_rel->relkind == RELKIND_RELATION);
405 
406 	CheckCmdReplicaIdentity(rel, CMD_INSERT);
407 
408 	/* BEFORE ROW INSERT Triggers */
409 	if (resultRelInfo->ri_TrigDesc &&
410 		resultRelInfo->ri_TrigDesc->trig_insert_before_row)
411 	{
412 		if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
413 			skip_tuple = true;	/* "do nothing" */
414 	}
415 
416 	if (!skip_tuple)
417 	{
418 		List	   *recheckIndexes = NIL;
419 
420 		/* Compute stored generated columns */
421 		if (rel->rd_att->constr &&
422 			rel->rd_att->constr->has_generated_stored)
423 			ExecComputeStoredGenerated(estate, slot);
424 
425 		/* Check the constraints of the tuple */
426 		if (rel->rd_att->constr)
427 			ExecConstraints(resultRelInfo, slot, estate);
428 		if (resultRelInfo->ri_PartitionCheck)
429 			ExecPartitionCheck(resultRelInfo, slot, estate, true);
430 
431 		/* OK, store the tuple and create index entries for it */
432 		simple_table_tuple_insert(resultRelInfo->ri_RelationDesc, slot);
433 
434 		if (resultRelInfo->ri_NumIndices > 0)
435 			recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
436 												   NIL);
437 
438 		/* AFTER ROW INSERT Triggers */
439 		ExecARInsertTriggers(estate, resultRelInfo, slot,
440 							 recheckIndexes, NULL);
441 
442 		/*
443 		 * XXX we should in theory pass a TransitionCaptureState object to the
444 		 * above to capture transition tuples, but after statement triggers
445 		 * don't actually get fired by replication yet anyway
446 		 */
447 
448 		list_free(recheckIndexes);
449 	}
450 }
451 
452 /*
453  * Find the searchslot tuple and update it with data in the slot,
454  * update the indexes, and execute any constraints and per-row triggers.
455  *
456  * Caller is responsible for opening the indexes.
457  */
458 void
ExecSimpleRelationUpdate(EState * estate,EPQState * epqstate,TupleTableSlot * searchslot,TupleTableSlot * slot)459 ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
460 						 TupleTableSlot *searchslot, TupleTableSlot *slot)
461 {
462 	bool		skip_tuple = false;
463 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
464 	Relation	rel = resultRelInfo->ri_RelationDesc;
465 	ItemPointer tid = &(searchslot->tts_tid);
466 
467 	/* For now we support only tables. */
468 	Assert(rel->rd_rel->relkind == RELKIND_RELATION);
469 
470 	CheckCmdReplicaIdentity(rel, CMD_UPDATE);
471 
472 	/* BEFORE ROW UPDATE Triggers */
473 	if (resultRelInfo->ri_TrigDesc &&
474 		resultRelInfo->ri_TrigDesc->trig_update_before_row)
475 	{
476 		if (!ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
477 								  tid, NULL, slot))
478 			skip_tuple = true;	/* "do nothing" */
479 	}
480 
481 	if (!skip_tuple)
482 	{
483 		List	   *recheckIndexes = NIL;
484 		bool		update_indexes;
485 
486 		/* Compute stored generated columns */
487 		if (rel->rd_att->constr &&
488 			rel->rd_att->constr->has_generated_stored)
489 			ExecComputeStoredGenerated(estate, slot);
490 
491 		/* Check the constraints of the tuple */
492 		if (rel->rd_att->constr)
493 			ExecConstraints(resultRelInfo, slot, estate);
494 		if (resultRelInfo->ri_PartitionCheck)
495 			ExecPartitionCheck(resultRelInfo, slot, estate, true);
496 
497 		simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
498 								  &update_indexes);
499 
500 		if (resultRelInfo->ri_NumIndices > 0 && update_indexes)
501 			recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
502 												   NIL);
503 
504 		/* AFTER ROW UPDATE Triggers */
505 		ExecARUpdateTriggers(estate, resultRelInfo,
506 							 tid, NULL, slot,
507 							 recheckIndexes, NULL);
508 
509 		list_free(recheckIndexes);
510 	}
511 }
512 
513 /*
514  * Find the searchslot tuple and delete it, and execute any constraints
515  * and per-row triggers.
516  *
517  * Caller is responsible for opening the indexes.
518  */
519 void
ExecSimpleRelationDelete(EState * estate,EPQState * epqstate,TupleTableSlot * searchslot)520 ExecSimpleRelationDelete(EState *estate, EPQState *epqstate,
521 						 TupleTableSlot *searchslot)
522 {
523 	bool		skip_tuple = false;
524 	ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
525 	Relation	rel = resultRelInfo->ri_RelationDesc;
526 	ItemPointer tid = &searchslot->tts_tid;
527 
528 	CheckCmdReplicaIdentity(rel, CMD_DELETE);
529 
530 	/* BEFORE ROW DELETE Triggers */
531 	if (resultRelInfo->ri_TrigDesc &&
532 		resultRelInfo->ri_TrigDesc->trig_delete_before_row)
533 	{
534 		skip_tuple = !ExecBRDeleteTriggers(estate, epqstate, resultRelInfo,
535 										   tid, NULL, NULL);
536 
537 	}
538 
539 	if (!skip_tuple)
540 	{
541 		/* OK, delete the tuple */
542 		simple_table_tuple_delete(rel, tid, estate->es_snapshot);
543 
544 		/* AFTER ROW DELETE Triggers */
545 		ExecARDeleteTriggers(estate, resultRelInfo,
546 							 tid, NULL, NULL);
547 	}
548 }
549 
550 /*
551  * Check if command can be executed with current replica identity.
552  */
553 void
CheckCmdReplicaIdentity(Relation rel,CmdType cmd)554 CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
555 {
556 	PublicationActions *pubactions;
557 
558 	/* We only need to do checks for UPDATE and DELETE. */
559 	if (cmd != CMD_UPDATE && cmd != CMD_DELETE)
560 		return;
561 
562 	/* If relation has replica identity we are always good. */
563 	if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
564 		OidIsValid(RelationGetReplicaIndex(rel)))
565 		return;
566 
567 	/*
568 	 * This is either UPDATE OR DELETE and there is no replica identity.
569 	 *
570 	 * Check if the table publishes UPDATES or DELETES.
571 	 */
572 	pubactions = GetRelationPublicationActions(rel);
573 	if (cmd == CMD_UPDATE && pubactions->pubupdate)
574 		ereport(ERROR,
575 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
576 				 errmsg("cannot update table \"%s\" because it does not have a replica identity and publishes updates",
577 						RelationGetRelationName(rel)),
578 				 errhint("To enable updating the table, set REPLICA IDENTITY using ALTER TABLE.")));
579 	else if (cmd == CMD_DELETE && pubactions->pubdelete)
580 		ereport(ERROR,
581 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
582 				 errmsg("cannot delete from table \"%s\" because it does not have a replica identity and publishes deletes",
583 						RelationGetRelationName(rel)),
584 				 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
585 }
586 
587 
588 /*
589  * Check if we support writing into specific relkind.
590  *
591  * The nspname and relname are only needed for error reporting.
592  */
593 void
CheckSubscriptionRelkind(char relkind,const char * nspname,const char * relname)594 CheckSubscriptionRelkind(char relkind, const char *nspname,
595 						 const char *relname)
596 {
597 	/*
598 	 * We currently only support writing to regular tables.  However, give a
599 	 * more specific error for partitioned and foreign tables.
600 	 */
601 	if (relkind == RELKIND_PARTITIONED_TABLE)
602 		ereport(ERROR,
603 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
604 				 errmsg("cannot use relation \"%s.%s\" as logical replication target",
605 						nspname, relname),
606 				 errdetail("\"%s.%s\" is a partitioned table.",
607 						   nspname, relname)));
608 	else if (relkind == RELKIND_FOREIGN_TABLE)
609 		ereport(ERROR,
610 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
611 				 errmsg("cannot use relation \"%s.%s\" as logical replication target",
612 						nspname, relname),
613 				 errdetail("\"%s.%s\" is a foreign table.",
614 						   nspname, relname)));
615 
616 	if (relkind != RELKIND_RELATION)
617 		ereport(ERROR,
618 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
619 				 errmsg("cannot use relation \"%s.%s\" as logical replication target",
620 						nspname, relname),
621 				 errdetail("\"%s.%s\" is not a table.",
622 						   nspname, relname)));
623 }
624