1 /*-------------------------------------------------------------------------
2  *
3  * transaction_management.c
4  *
 *   Transaction management for Citus.  Most of the work is delegated to other
 *   subsystems; this file, and especially CoordinatedTransactionCallback,
 *   coordinates the work between them.
8  *
9  * Copyright (c) Citus Data, Inc.
10  *
11  *-------------------------------------------------------------------------
12  */
13 
14 #include "postgres.h"
15 
16 #include "libpq-fe.h"
17 
18 #include "miscadmin.h"
19 
20 #include "access/twophase.h"
21 #include "access/xact.h"
22 #include "distributed/backend_data.h"
23 #include "distributed/citus_safe_lib.h"
24 #include "distributed/connection_management.h"
25 #include "distributed/distributed_planner.h"
26 #include "distributed/hash_helpers.h"
27 #include "distributed/intermediate_results.h"
28 #include "distributed/listutils.h"
29 #include "distributed/local_executor.h"
30 #include "distributed/locally_reserved_shared_connections.h"
31 #include "distributed/maintenanced.h"
32 #include "distributed/multi_executor.h"
33 #include "distributed/multi_explain.h"
34 #include "distributed/repartition_join_execution.h"
35 #include "distributed/transaction_management.h"
36 #include "distributed/placement_connection.h"
37 #include "distributed/shared_connection_stats.h"
38 #include "distributed/subplan_execution.h"
39 #include "distributed/version_compat.h"
40 #include "distributed/worker_log_messages.h"
41 #include "utils/hsearch.h"
42 #include "utils/guc.h"
43 #include "utils/memutils.h"
44 #include "storage/fd.h"
45 
46 
/*
 * State of this backend's coordinated (distributed) transaction; see
 * UseCoordinatedTransaction() and InCoordinatedTransaction().  Reset to
 * COORD_TRANS_NONE by ResetGlobalVariables() when a transaction ends.
 */
CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE;

/* GUC, the commit protocol to use for commands affecting more than one connection */
int MultiShardCommitProtocol = COMMIT_PROTOCOL_2PC;

/*
 * GUC; presumably the commit protocol for commands affecting a single shard.
 * It is not read in this file -- NOTE(review): confirm against its users.
 */
int SingleShardCommitProtocol = COMMIT_PROTOCOL_2PC;

/*
 * When MultiShardCommitProtocol is COMMIT_PROTOCOL_BARE at the end of a
 * transaction, it is restored from this value, which is then reset to
 * COMMIT_PROTOCOL_BARE (see ResetShardPlacementTransactionState()).
 */
int SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;

/*
 * GUC that determines whether a SELECT in a transaction block should also run in
 * a transaction block on the worker even if no writes have occurred yet.
 */
bool SelectOpensTransactionBlock = true;

/* controls use of locks to enforce safe commutativity */
bool AllModificationsCommutative = false;

/* we've deprecated this flag, keeping here for some time not to break existing users */
bool EnableDeadlockPrevention = true;

/* number of nested stored procedure call levels we are currently in */
int StoredProcedureLevel = 0;

/* number of nested DO block levels we are currently in */
int DoBlockLevel = 0;

/*
 * State needed to keep track of operations used during a transaction; reset
 * to XACT_MODIFICATION_NONE when a transaction ends.
 */
XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE;

/* list of connections that are part of the current coordinated transaction */
dlist_head InProgressTransactions = DLIST_STATIC_INIT(InProgressTransactions);

/*
 * activeSetStmts keeps track of SET LOCAL statements executed within the
 * current subxact.  When a new subxact is pushed, the current value is saved
 * in that subxact's context and replaced by a fresh, empty StringInfo; when
 * the subxact is popped, the saved value is restored.  It is set to NULL when
 * the top-level transaction ends.
 */
StringInfo activeSetStmts;

/*
 * Though a list, we treat this as a stack, pushing on subxact contexts whenever
 * e.g. a SAVEPOINT is executed (though this is actually performed by providing
 * PostgreSQL with a sub-xact callback). At present, the context of a subxact
 * includes a subxact identifier as well as any SET LOCAL statements propagated
 * to workers during the sub-transaction.  The newest subxact is at the head.
 */
static List *activeSubXactContexts = NIL;

/*
 * Memory context created once at startup (8kB, see
 * InitializeTransactionManagement()) and used during the XACT_EVENT_COMMIT
 * callback, so that callback does not have to obtain fresh memory.
 */
MemoryContext CommitContext = NULL;

/*
 * Should this coordinated transaction use 2PC? Set by
 * Use2PCForCoordinatedTransaction(), e.g. if DDL was issued and
 * MultiShardCommitProtocol was set to 2PC. But, even if this
 * flag is set, the transaction manager is smart enough to only
 * do 2PC on the remote connections that did a modification.
 *
 * As a variable name ShouldCoordinatedTransactionUse2PC could
 * be improved. We use Use2PCForCoordinatedTransaction() as the
 * public API function, hence couldn't come up with a better name
 * for the underlying variable at the moment.  Read through
 * GetCoordinatedTransactionShouldUse2PC(); reset when a transaction ends.
 */
bool ShouldCoordinatedTransactionUse2PC = false;

/* if disabled, distributed statements in a function may run as separate transactions */
bool FunctionOpensTransactionBlock = true;

/*
 * If true, we should trigger metadata sync on commit.  Set by
 * TriggerMetadataSyncOnCommit(); reset when a transaction ends.
 */
bool MetadataSyncOnCommit = false;
115 
116 
117 /* transaction management functions */
118 static void CoordinatedTransactionCallback(XactEvent event, void *arg);
119 static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
120 											  SubTransactionId parentSubid, void *arg);
121 
122 /* remaining functions */
123 static void ResetShardPlacementTransactionState(void);
124 static void AdjustMaxPreparedTransactions(void);
125 static void PushSubXact(SubTransactionId subId);
126 static void PopSubXact(SubTransactionId subId);
127 static bool MaybeExecutingUDF(void);
128 static void ResetGlobalVariables(void);
129 static bool SwallowErrors(void (*func)(void));
130 static void ForceAllInProgressConnectionsToClose(void);
131 
132 
133 /*
134  * UseCoordinatedTransaction sets up the necessary variables to use
135  * a coordinated transaction, unless one is already in progress.
136  */
137 void
UseCoordinatedTransaction(void)138 UseCoordinatedTransaction(void)
139 {
140 	if (CurrentCoordinatedTransactionState == COORD_TRANS_STARTED)
141 	{
142 		return;
143 	}
144 
145 	if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE &&
146 		CurrentCoordinatedTransactionState != COORD_TRANS_IDLE)
147 	{
148 		ereport(ERROR, (errmsg("starting transaction in wrong state")));
149 	}
150 
151 	CurrentCoordinatedTransactionState = COORD_TRANS_STARTED;
152 
153 	/*
154 	 * If assign_distributed_transaction_id() has been called, we should reuse
155 	 * that identifier so distributed deadlock detection works properly.
156 	 */
157 	DistributedTransactionId *transactionId = GetCurrentDistributedTransactionId();
158 	if (transactionId->transactionNumber == 0)
159 	{
160 		AssignDistributedTransactionId();
161 	}
162 }
163 
164 
165 /*
166  * EnsureDistributedTransactionId makes sure that the current transaction
167  * has a distributed transaction id. It is either assigned by a previous
168  * call of assign_distributed_transaction_id(), or by starting a coordinated
169  * transaction.
170  */
171 void
EnsureDistributedTransactionId(void)172 EnsureDistributedTransactionId(void)
173 {
174 	DistributedTransactionId *transactionId = GetCurrentDistributedTransactionId();
175 	if (transactionId->transactionNumber == 0)
176 	{
177 		UseCoordinatedTransaction();
178 	}
179 }
180 
181 
182 /*
183  * InCoordinatedTransaction returns whether a coordinated transaction has been
184  * started.
185  */
186 bool
InCoordinatedTransaction(void)187 InCoordinatedTransaction(void)
188 {
189 	return CurrentCoordinatedTransactionState != COORD_TRANS_NONE &&
190 		   CurrentCoordinatedTransactionState != COORD_TRANS_IDLE;
191 }
192 
193 
194 /*
195  * Use2PCForCoordinatedTransaction() signals that the current coordinated
196  * transaction should use 2PC to commit.
197  *
198  * Note that even if 2PC is enabled, it is only used for connections that make
199  * modification (DML or DDL).
200  */
201 void
Use2PCForCoordinatedTransaction(void)202 Use2PCForCoordinatedTransaction(void)
203 {
204 	Assert(InCoordinatedTransaction());
205 
206 	ShouldCoordinatedTransactionUse2PC = true;
207 }
208 
209 
210 /*
211  * GetCoordinatedTransactionShouldUse2PC is a wrapper function to read the value
212  * of CoordinatedTransactionShouldUse2PCFlag.
213  */
214 bool
GetCoordinatedTransactionShouldUse2PC(void)215 GetCoordinatedTransactionShouldUse2PC(void)
216 {
217 	return ShouldCoordinatedTransactionUse2PC;
218 }
219 
220 
221 void
InitializeTransactionManagement(void)222 InitializeTransactionManagement(void)
223 {
224 	/* hook into transaction machinery */
225 	RegisterXactCallback(CoordinatedTransactionCallback, NULL);
226 	RegisterSubXactCallback(CoordinatedSubTransactionCallback, NULL);
227 
228 	AdjustMaxPreparedTransactions();
229 
230 	/* set aside 8kb of memory for use in CoordinatedTransactionCallback */
231 	CommitContext = AllocSetContextCreateExtended(TopMemoryContext,
232 												  "CommitContext",
233 												  8 * 1024,
234 												  8 * 1024,
235 												  8 * 1024);
236 }
237 
238 
/*
 * CoordinatedTransactionCallback is the transaction callback registered in
 * InitializeTransactionManagement().  It handles the coordinated transaction
 * (preparing, committing or aborting the remote transactions at the matching
 * local transaction events) as well as transaction-independent connection
 * management.
 *
 * NB: There should only ever be a single transaction callback in citus;
 * otherwise the ordering between the callbacks, and of the actions within
 * those callbacks, becomes too nondeterministic / hard to reason about.
 */
static void
CoordinatedTransactionCallback(XactEvent event, void *arg)
{
	switch (event)
	{
		case XACT_EVENT_COMMIT:
		{
			/*
			 * ERRORs thrown during XACT_EVENT_COMMIT will cause postgres to abort, at
			 * this point enough work has been done that it's not possible to rollback.
			 *
			 * One possible source of errors is memory allocation failures. To minimize
			 * the chance of those happening we've pre-allocated some memory in the
			 * CommitContext, it has 8kb of memory that we're allowed to use.
			 *
			 * We only do this in the COMMIT callback because:
			 * - Errors thrown in other callbacks (such as PRE_COMMIT) won't cause
			 *   crashes, they will simply cause the ABORT handler to be called.
			 * - The exception is ABORT, errors thrown there could also cause crashes, but
			 *   postgres already creates a TransactionAbortContext which performs this
			 *   trick, so there's no need for us to do it again.
			 */
			MemoryContext previousContext = CurrentMemoryContext;
			MemoryContextSwitchTo(CommitContext);

			/*
			 * Call other parts of citus that need to integrate into
			 * transaction management. Do so before doing other work, so the
			 * callbacks still can perform work if needed.
			 */
			ResetShardPlacementTransactionState();

			/*
			 * With 2PC, the remote transactions were only prepared in
			 * XACT_EVENT_PRE_COMMIT; finish committing them now.
			 */
			if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED)
			{
				/* handles both already prepared and open transactions */
				CoordinatedRemoteTransactionsCommit();
			}

			/* close connections etc. */
			if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
			{
				ResetPlacementConnectionManagement();
				AfterXactConnectionHandling(true);
			}

			/*
			 * Changes to catalog tables are now visible to the metadata sync
			 * daemon, so we can trigger metadata sync if necessary.
			 */
			if (MetadataSyncOnCommit)
			{
				TriggerMetadataSync(MyDatabaseId);
			}

			ResetGlobalVariables();

			/*
			 * Make sure that we give the shared connections back to the shared
			 * pool if any. This operation is a no-op if the reserved connections
			 * are already given away.
			 */
			DeallocateReservedConnections();

			UnSetDistributedTransactionId();

			/* empty the CommitContext to ensure we're not leaking memory */
			MemoryContextSwitchTo(previousContext);
			MemoryContextReset(CommitContext);
			break;
		}

		case XACT_EVENT_ABORT:
		{
			/* stop propagating notices from workers, we know the query is failed */
			DisableWorkerMessagePropagation();

			RemoveIntermediateResultsDirectory();

			ResetShardPlacementTransactionState();

			/*
			 * Handles both already prepared and open transactions.
			 *
			 * NOTE(review): the '>' comparison relies on the declaration order of
			 * CoordinatedTransactionState, with the active states declared after
			 * COORD_TRANS_IDLE; confirm against the enum definition.
			 */
			if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE)
			{
				/*
				 * Since CoordinateRemoteTransactionsAbort may cause an error and it is
				 * not allowed to error out at that point, swallow the error if any.
				 *
				 * Particular error we've observed was CreateWaitEventSet throwing an error
				 * when out of file descriptor.
				 *
				 * If an error is swallowed, connections of all active transactions must
				 * be forced to close at the end of the transaction explicitly.
				 */
				bool errorSwallowed = SwallowErrors(CoordinatedRemoteTransactionsAbort);
				if (errorSwallowed == true)
				{
					ForceAllInProgressConnectionsToClose();
				}
			}

			/*
			 * Close connections etc. Unlike for a successful transaction, we reset the
			 * placement connection management regardless of the state of the state
			 * machine as recorded in CurrentCoordinatedTransactionState.
			 * The hashmaps recording the connection management live in a memory context
			 * higher up than most of the data referenced in the hashmaps. This causes
			 * use-after-free errors when the contents are retained due to an error
			 * raised before CurrentCoordinatedTransactionState changed.
			 */
			ResetPlacementConnectionManagement();
			AfterXactConnectionHandling(false);

			ResetGlobalVariables();

			/*
			 * Make sure that we give the shared connections back to the shared
			 * pool if any. This operation is a no-op if the reserved connections
			 * are already given away.
			 */
			DeallocateReservedConnections();

			/*
			 * We reset these mainly for posterity. The only way we would normally
			 * get here with ExecutorLevel or PlannerLevel > 0 is during a fatal
			 * error when the process is about to end.
			 */
			ExecutorLevel = 0;
			PlannerLevel = 0;

			/*
			 * We should reset SubPlanLevel in case a transaction is aborted,
			 * otherwise this variable would stay +ve if the transaction is
			 * aborted in the middle of a CTE/complex subquery execution
			 * which would cause the subsequent queries to error out in
			 * case the copy size is greater than
			 * citus.max_intermediate_result_size
			 */
			SubPlanLevel = 0;
			UnSetDistributedTransactionId();
			break;
		}

		case XACT_EVENT_PARALLEL_COMMIT:
		case XACT_EVENT_PARALLEL_ABORT:
		{
			/* nothing to do for parallel workers */
			break;
		}

		case XACT_EVENT_PREPARE:
		{
			/* we need to reset SavedExplainPlan before TopTransactionContext is deleted */
			FreeSavedExplainPlan();

			/*
			 * This callback is only relevant for worker queries since
			 * distributed queries cannot be executed with 2PC, see
			 * XACT_EVENT_PRE_PREPARE.
			 *
			 * We should remove the intermediate results before unsetting the
			 * distributed transaction id. That is necessary, otherwise Citus
			 * would try to remove a non-existing folder and leak some of the
			 * existing folders that are associated with distributed transaction
			 * ids on the worker nodes.
			 */
			RemoveIntermediateResultsDirectory();

			UnSetDistributedTransactionId();
			break;
		}

		case XACT_EVENT_PRE_COMMIT:
		{
			/*
			 * If the distributed query involves 2PC, we already removed
			 * the intermediate result directory on XACT_EVENT_PREPARE. However,
			 * if not, we should remove it here on the COMMIT. Since
			 * RemoveIntermediateResultsDirectory() is idempotent, we're safe
			 * to call it here again even if the transaction involves 2PC.
			 */
			RemoveIntermediateResultsDirectory();

			/* nothing further to do if there's no managed remote xacts */
			if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE)
			{
				break;
			}

			/*
			 * TODO: It'd probably be a good idea to force constraints and
			 * such to 'immediate' here. Deferred triggers might try to send
			 * stuff to the remote side, which'd not be good.  Doing so
			 * remotely would also catch a class of errors where committing
			 * fails, which can lead to divergence when not using 2PC.
			 */

			/*
			 * Check whether the coordinated transaction is in a state we want
			 * to persist, or whether we want to error out.  This handles the
			 * case where iteratively executed commands marked all placements
			 * as invalid.
			 */
			MarkFailedShardPlacements();

			if (ShouldCoordinatedTransactionUse2PC)
			{
				/*
				 * Only prepare the remote transactions here; they are committed in
				 * XACT_EVENT_COMMIT once the local commit went through.
				 */
				CoordinatedRemoteTransactionsPrepare();
				CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED;

				/*
				 * Make sure we did not have any failures on connections marked as
				 * critical before committing.
				 */
				CheckRemoteTransactionsHealth();
			}
			else
			{
				CheckRemoteTransactionsHealth();

				/*
				 * Have to commit remote transactions in PRE_COMMIT, to allow
				 * us to mark failed placements as invalid.  Better don't use
				 * this for anything important (i.e. DDL/metadata).
				 */
				CoordinatedRemoteTransactionsCommit();
				CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED;
			}

			/*
			 * Check again whether shards/placement successfully
			 * committed. This handles failure at COMMIT/PREPARE time.
			 */
			PostCommitMarkFailedShardPlacements(ShouldCoordinatedTransactionUse2PC);
			break;
		}

		case XACT_EVENT_PARALLEL_PRE_COMMIT:
		case XACT_EVENT_PRE_PREPARE:
		{
			/*
			 * PREPARE TRANSACTION (and parallel-mode pre-commit) is rejected while
			 * a coordinated transaction is in progress.
			 */
			if (InCoordinatedTransaction())
			{
				ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
								errmsg("cannot use 2PC in transactions involving "
									   "multiple servers")));
			}
			break;
		}
	}
}
495 
496 
497 /*
498  * ForceAllInProgressConnectionsToClose forces all connections of in progress transactions
499  * to close at the end of the transaction.
500  */
501 static void
ForceAllInProgressConnectionsToClose(void)502 ForceAllInProgressConnectionsToClose(void)
503 {
504 	dlist_iter iter;
505 	dlist_foreach(iter, &InProgressTransactions)
506 	{
507 		MultiConnection *connection = dlist_container(MultiConnection,
508 													  transactionNode,
509 													  iter.cur);
510 
511 		connection->forceCloseAtTransactionEnd = true;
512 	}
513 }
514 
515 
516 /*
517  * If an ERROR is thrown while processing a transaction the ABORT handler is called.
518  * ERRORS thrown during ABORT are not treated any differently, the ABORT handler is also
519  * called during processing of those. If an ERROR was raised the first time through it's
520  * unlikely that the second try will succeed; more likely that an ERROR will be thrown
521  * again. This loop continues until Postgres notices and PANICs, complaining about a stack
522  * overflow.
523  *
524  * Instead of looping and crashing, SwallowErrors lets us attempt to continue running the
525  * ABORT logic. This wouldn't be safe in most other parts of the codebase, in
526  * approximately none of the places where we emit ERROR do we first clean up after
527  * ourselves! It's fine inside the ABORT handler though; Postgres is going to clean
528  * everything up before control passes back to us.
529  *
530  * If it swallows any error, returns true. Otherwise, returns false.
531  */
532 static bool
SwallowErrors(void (* func)())533 SwallowErrors(void (*func)())
534 {
535 	MemoryContext savedContext = CurrentMemoryContext;
536 	volatile bool anyErrorSwallowed = false;
537 
538 	PG_TRY();
539 	{
540 		func();
541 	}
542 	PG_CATCH();
543 	{
544 		MemoryContextSwitchTo(savedContext);
545 		ErrorData *edata = CopyErrorData();
546 		FlushErrorState();
547 
548 		/* rethrow as WARNING */
549 		edata->elevel = WARNING;
550 		ThrowErrorData(edata);
551 
552 		anyErrorSwallowed = true;
553 	}
554 	PG_END_TRY();
555 
556 	return anyErrorSwallowed;
557 }
558 
559 
560 /*
561  * ResetGlobalVariables resets global variables that
562  * might be changed during the execution of queries.
563  */
564 static void
ResetGlobalVariables()565 ResetGlobalVariables()
566 {
567 	CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
568 	XactModificationLevel = XACT_MODIFICATION_NONE;
569 	SetLocalExecutionStatus(LOCAL_EXECUTION_OPTIONAL);
570 	FreeSavedExplainPlan();
571 	dlist_init(&InProgressTransactions);
572 	activeSetStmts = NULL;
573 	ShouldCoordinatedTransactionUse2PC = false;
574 	TransactionModifiedNodeMetadata = false;
575 	MetadataSyncOnCommit = false;
576 	ResetWorkerErrorIndication();
577 }
578 
579 
580 /*
581  * ResetShardPlacementTransactionState performs cleanup after the end of a
582  * transaction.
583  */
584 static void
ResetShardPlacementTransactionState(void)585 ResetShardPlacementTransactionState(void)
586 {
587 	if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE)
588 	{
589 		MultiShardCommitProtocol = SavedMultiShardCommitProtocol;
590 		SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
591 	}
592 }
593 
594 
/*
 * CoordinatedSubTransactionCallback is the callback used to implement
 * distributed SAVEPOINT, RELEASE SAVEPOINT and ROLLBACK TO SAVEPOINT.
 *
 * It keeps Citus' sub-transaction stack (PushSubXact/PopSubXact) in step with
 * postgres'.  While a coordinated transaction is in progress, it also forwards
 * the savepoint operation to the remote transactions.
 */
static void
CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
								  SubTransactionId parentSubid, void *arg)
{
	switch (event)
	{
		/*
		 * Our subtransaction stack should be consistent with postgres' internal
		 * transaction stack. In case of subxact begin, postgres calls our
		 * callback after it has pushed the transaction onto its stack, so we
		 * have to do the same even if worker commands fail; hence we call
		 * PushSubXact() first. In case of subxact commit, the callback is called
		 * before postgres pops the subxact from its transaction stack, so we
		 * call PopSubXact() only after making sure worker commands didn't fail.
		 * Otherwise, Postgres would roll the subxact back, which would cause us
		 * to call PopSubXact() again.
		 */
		case SUBXACT_EVENT_START_SUB:
		{
			PushSubXact(subId);
			if (InCoordinatedTransaction())
			{
				CoordinatedRemoteTransactionsSavepointBegin(subId);
			}
			break;
		}

		case SUBXACT_EVENT_COMMIT_SUB:
		{
			if (InCoordinatedTransaction())
			{
				CoordinatedRemoteTransactionsSavepointRelease(subId);
			}
			PopSubXact(subId);
			break;
		}

		case SUBXACT_EVENT_ABORT_SUB:
		{
			/*
			 * Stop showing message for now, will re-enable when executing
			 * the next statement.
			 */
			DisableWorkerMessagePropagation();

			/*
			 * Given that we aborted, worker error indications can be ignored.
			 */
			ResetWorkerErrorIndication();

			if (InCoordinatedTransaction())
			{
				CoordinatedRemoteTransactionsSavepointRollback(subId);
			}
			PopSubXact(subId);

			break;
		}

		case SUBXACT_EVENT_PRE_COMMIT_SUB:
		{
			/* nothing to do */
			break;
		}
	}
}
664 
665 
666 /*
667  * AdjustMaxPreparedTransactions configures the number of available prepared
668  * transaction slots at startup.
669  */
670 static void
AdjustMaxPreparedTransactions(void)671 AdjustMaxPreparedTransactions(void)
672 {
673 	/*
674 	 * As Citus uses 2PC internally, there always should be some available. As
675 	 * the default is 0, we increase it to something appropriate
676 	 * (connections * 2 currently).  If the user explicitly configured 2PC, we
677 	 * leave the configuration alone - there might have been intent behind the
678 	 * decision.
679 	 */
680 	if (max_prepared_xacts == 0)
681 	{
682 		char newvalue[12];
683 
684 		SafeSnprintf(newvalue, sizeof(newvalue), "%d", MaxConnections * 2);
685 
686 		SetConfigOption("max_prepared_transactions", newvalue, PGC_POSTMASTER,
687 						PGC_S_OVERRIDE);
688 
689 		ereport(LOG, (errmsg("number of prepared transactions has not been "
690 							 "configured, overriding"),
691 					  errdetail("max_prepared_transactions is now set to %s",
692 								newvalue)));
693 	}
694 }
695 
696 
697 /* PushSubXact pushes subId to the stack of active sub-transactions. */
698 static void
PushSubXact(SubTransactionId subId)699 PushSubXact(SubTransactionId subId)
700 {
701 	/*
702 	 * We need to allocate these in TopTransactionContext instead of current
703 	 * subxact's memory context. This is because AtSubCommit_Memory won't
704 	 * delete the subxact's memory context unless it is empty, and this
705 	 * can cause in memory leaks. For emptiness it just checks if the memory
706 	 * has been reset, and we cannot reset the subxact context since other
707 	 * data can be in the context that are needed by upper commits.
708 	 *
709 	 * See https://github.com/citusdata/citus/issues/3999
710 	 */
711 	MemoryContext old_context = MemoryContextSwitchTo(TopTransactionContext);
712 
713 	/* save provided subId as well as propagated SET LOCAL stmts */
714 	SubXactContext *state = palloc(sizeof(SubXactContext));
715 	state->subId = subId;
716 	state->setLocalCmds = activeSetStmts;
717 
718 	/* append to list and reset active set stmts for upcoming sub-xact */
719 	activeSubXactContexts = lcons(state, activeSubXactContexts);
720 	activeSetStmts = makeStringInfo();
721 
722 	MemoryContextSwitchTo(old_context);
723 }
724 
725 
726 /* PopSubXact pops subId from the stack of active sub-transactions. */
727 static void
PopSubXact(SubTransactionId subId)728 PopSubXact(SubTransactionId subId)
729 {
730 	SubXactContext *state = linitial(activeSubXactContexts);
731 
732 	Assert(state->subId == subId);
733 
734 	/*
735 	 * Free activeSetStmts to avoid memory leaks when we create subxacts
736 	 * for each row, e.g. in exception handling of UDFs.
737 	 */
738 	if (activeSetStmts != NULL)
739 	{
740 		pfree(activeSetStmts->data);
741 		pfree(activeSetStmts);
742 	}
743 
744 	/*
745 	 * SET LOCAL commands are local to subxact blocks. When a subxact commits
746 	 * or rolls back, we should roll back our set of SET LOCAL commands to the
747 	 * ones we had in the upper commit.
748 	 */
749 	activeSetStmts = state->setLocalCmds;
750 
751 	/*
752 	 * Free state to avoid memory leaks when we create subxacts for each row,
753 	 * e.g. in exception handling of UDFs.
754 	 */
755 	pfree(state);
756 
757 	activeSubXactContexts = list_delete_first(activeSubXactContexts);
758 }
759 
760 
761 /* ActiveSubXactContexts returns the list of active sub-xact context in temporal order. */
762 List *
ActiveSubXactContexts(void)763 ActiveSubXactContexts(void)
764 {
765 	List *reversedSubXactStates = NIL;
766 
767 	/*
768 	 * activeSubXactContexts is in reversed temporal order, so we reverse it to get it
769 	 * in temporal order.
770 	 */
771 	SubXactContext *state = NULL;
772 	foreach_ptr(state, activeSubXactContexts)
773 	{
774 		reversedSubXactStates = lcons(state, reversedSubXactStates);
775 	}
776 
777 	return reversedSubXactStates;
778 }
779 
780 
781 /*
782  * IsMultiStatementTransaction determines whether the current statement is
783  * part of a bigger multi-statement transaction. This is the case when the
784  * statement is wrapped in a transaction block (comes after BEGIN), or it
785  * is called from a stored procedure or function.
786  */
787 bool
IsMultiStatementTransaction(void)788 IsMultiStatementTransaction(void)
789 {
790 	if (IsTransactionBlock())
791 	{
792 		/* in a BEGIN...END block */
793 		return true;
794 	}
795 	else if (DoBlockLevel > 0)
796 	{
797 		/* in (a transaction within) a do block */
798 		return true;
799 	}
800 	else if (StoredProcedureLevel > 0)
801 	{
802 		/* in (a transaction within) a stored procedure */
803 		return true;
804 	}
805 	else if (MaybeExecutingUDF() && FunctionOpensTransactionBlock)
806 	{
807 		/* in a language-handler function call, open a transaction if configured to do so */
808 		return true;
809 	}
810 	else
811 	{
812 		return false;
813 	}
814 }
815 
816 
817 /*
818  * MaybeExecutingUDF returns true if we are possibly executing a function call.
819  * We use nested level of executor to check this, so this can return true for
820  * CTEs, etc. which also start nested executors.
821  *
822  * If the planner is being called from the executor, then we may also be in
823  * a UDF.
824  */
825 static bool
MaybeExecutingUDF(void)826 MaybeExecutingUDF(void)
827 {
828 	return ExecutorLevel > 1 || (ExecutorLevel == 1 && PlannerLevel > 0);
829 }
830 
831 
832 /*
833  * TriggerMetadataSyncOnCommit sets a flag to do metadata sync on commit.
834  * This is because new metadata only becomes visible to the metadata sync
835  * daemon after commit happens.
836  */
837 void
TriggerMetadataSyncOnCommit(void)838 TriggerMetadataSyncOnCommit(void)
839 {
840 	MetadataSyncOnCommit = true;
841 }
842