1 /*-------------------------------------------------------------------------
2 *
3 * transaction_management.c
4 *
5 * Transaction management for Citus. Most of the work is delegated to other
6 * subsystems, this files, and especially CoordinatedTransactionCallback,
7 * coordinates the work between them.
8 *
9 * Copyright (c) Citus Data, Inc.
10 *
11 *-------------------------------------------------------------------------
12 */
13
14 #include "postgres.h"
15
16 #include "libpq-fe.h"
17
18 #include "miscadmin.h"
19
20 #include "access/twophase.h"
21 #include "access/xact.h"
22 #include "distributed/backend_data.h"
23 #include "distributed/citus_safe_lib.h"
24 #include "distributed/connection_management.h"
25 #include "distributed/distributed_planner.h"
26 #include "distributed/hash_helpers.h"
27 #include "distributed/intermediate_results.h"
28 #include "distributed/listutils.h"
29 #include "distributed/local_executor.h"
30 #include "distributed/locally_reserved_shared_connections.h"
31 #include "distributed/maintenanced.h"
32 #include "distributed/multi_executor.h"
33 #include "distributed/multi_explain.h"
34 #include "distributed/repartition_join_execution.h"
35 #include "distributed/transaction_management.h"
36 #include "distributed/placement_connection.h"
37 #include "distributed/shared_connection_stats.h"
38 #include "distributed/subplan_execution.h"
39 #include "distributed/version_compat.h"
40 #include "distributed/worker_log_messages.h"
41 #include "utils/hsearch.h"
42 #include "utils/guc.h"
43 #include "utils/memutils.h"
44 #include "storage/fd.h"
45
46
47 CoordinatedTransactionState CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
48
49 /* GUC, the commit protocol to use for commands affecting more than one connection */
50 int MultiShardCommitProtocol = COMMIT_PROTOCOL_2PC;
51 int SingleShardCommitProtocol = COMMIT_PROTOCOL_2PC;
52 int SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
53
54 /*
55 * GUC that determines whether a SELECT in a transaction block should also run in
56 * a transaction block on the worker even if no writes have occurred yet.
57 */
58 bool SelectOpensTransactionBlock = true;
59
60 /* controls use of locks to enforce safe commutativity */
61 bool AllModificationsCommutative = false;
62
63 /* we've deprecated this flag, keeping here for some time not to break existing users */
64 bool EnableDeadlockPrevention = true;
65
66 /* number of nested stored procedure call levels we are currently in */
67 int StoredProcedureLevel = 0;
68
69 /* number of nested DO block levels we are currently in */
70 int DoBlockLevel = 0;
71
72 /* state needed to keep track of operations used during a transaction */
73 XactModificationType XactModificationLevel = XACT_MODIFICATION_NONE;
74
75 /* list of connections that are part of the current coordinated transaction */
76 dlist_head InProgressTransactions = DLIST_STATIC_INIT(InProgressTransactions);
77
78 /*
79 * activeSetStmts keeps track of SET LOCAL statements executed within the current
80 * subxact and will be set to NULL when pushing into new subxact or ending top xact.
81 */
82 StringInfo activeSetStmts;
83
84 /*
85 * Though a list, we treat this as a stack, pushing on subxact contexts whenever
86 * e.g. a SAVEPOINT is executed (though this is actually performed by providing
87 * PostgreSQL with a sub-xact callback). At present, the context of a subxact
88 * includes a subxact identifier as well as any SET LOCAL statements propagated
89 * to workers during the sub-transaction.
90 */
91 static List *activeSubXactContexts = NIL;
92
93 /* some pre-allocated memory so we don't need to call malloc() during callbacks */
94 MemoryContext CommitContext = NULL;
95
96 /*
97 * Should this coordinated transaction use 2PC? Set by
98 * CoordinatedTransactionUse2PC(), e.g. if DDL was issued and
99 * MultiShardCommitProtocol was set to 2PC. But, even if this
100 * flag is set, the transaction manager is smart enough to only
101 * do 2PC on the remote connections that did a modification.
102 *
103 * As a variable name ShouldCoordinatedTransactionUse2PC could
104 * be improved. We use Use2PCForCoordinatedTransaction() as the
105 * public API function, hence couldn't come up with a better name
106 * for the underlying variable at the moment.
107 */
108 bool ShouldCoordinatedTransactionUse2PC = false;
109
110 /* if disabled, distributed statements in a function may run as separate transactions */
111 bool FunctionOpensTransactionBlock = true;
112
113 /* if true, we should trigger metadata sync on commit */
114 bool MetadataSyncOnCommit = false;
115
116
117 /* transaction management functions */
118 static void CoordinatedTransactionCallback(XactEvent event, void *arg);
119 static void CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
120 SubTransactionId parentSubid, void *arg);
121
122 /* remaining functions */
123 static void ResetShardPlacementTransactionState(void);
124 static void AdjustMaxPreparedTransactions(void);
125 static void PushSubXact(SubTransactionId subId);
126 static void PopSubXact(SubTransactionId subId);
127 static bool MaybeExecutingUDF(void);
128 static void ResetGlobalVariables(void);
129 static bool SwallowErrors(void (*func)(void));
130 static void ForceAllInProgressConnectionsToClose(void);
131
132
133 /*
134 * UseCoordinatedTransaction sets up the necessary variables to use
135 * a coordinated transaction, unless one is already in progress.
136 */
137 void
UseCoordinatedTransaction(void)138 UseCoordinatedTransaction(void)
139 {
140 if (CurrentCoordinatedTransactionState == COORD_TRANS_STARTED)
141 {
142 return;
143 }
144
145 if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE &&
146 CurrentCoordinatedTransactionState != COORD_TRANS_IDLE)
147 {
148 ereport(ERROR, (errmsg("starting transaction in wrong state")));
149 }
150
151 CurrentCoordinatedTransactionState = COORD_TRANS_STARTED;
152
153 /*
154 * If assign_distributed_transaction_id() has been called, we should reuse
155 * that identifier so distributed deadlock detection works properly.
156 */
157 DistributedTransactionId *transactionId = GetCurrentDistributedTransactionId();
158 if (transactionId->transactionNumber == 0)
159 {
160 AssignDistributedTransactionId();
161 }
162 }
163
164
165 /*
166 * EnsureDistributedTransactionId makes sure that the current transaction
167 * has a distributed transaction id. It is either assigned by a previous
168 * call of assign_distributed_transaction_id(), or by starting a coordinated
169 * transaction.
170 */
171 void
EnsureDistributedTransactionId(void)172 EnsureDistributedTransactionId(void)
173 {
174 DistributedTransactionId *transactionId = GetCurrentDistributedTransactionId();
175 if (transactionId->transactionNumber == 0)
176 {
177 UseCoordinatedTransaction();
178 }
179 }
180
181
182 /*
183 * InCoordinatedTransaction returns whether a coordinated transaction has been
184 * started.
185 */
186 bool
InCoordinatedTransaction(void)187 InCoordinatedTransaction(void)
188 {
189 return CurrentCoordinatedTransactionState != COORD_TRANS_NONE &&
190 CurrentCoordinatedTransactionState != COORD_TRANS_IDLE;
191 }
192
193
194 /*
195 * Use2PCForCoordinatedTransaction() signals that the current coordinated
196 * transaction should use 2PC to commit.
197 *
198 * Note that even if 2PC is enabled, it is only used for connections that make
199 * modification (DML or DDL).
200 */
201 void
Use2PCForCoordinatedTransaction(void)202 Use2PCForCoordinatedTransaction(void)
203 {
204 Assert(InCoordinatedTransaction());
205
206 ShouldCoordinatedTransactionUse2PC = true;
207 }
208
209
210 /*
211 * GetCoordinatedTransactionShouldUse2PC is a wrapper function to read the value
212 * of CoordinatedTransactionShouldUse2PCFlag.
213 */
214 bool
GetCoordinatedTransactionShouldUse2PC(void)215 GetCoordinatedTransactionShouldUse2PC(void)
216 {
217 return ShouldCoordinatedTransactionUse2PC;
218 }
219
220
221 void
InitializeTransactionManagement(void)222 InitializeTransactionManagement(void)
223 {
224 /* hook into transaction machinery */
225 RegisterXactCallback(CoordinatedTransactionCallback, NULL);
226 RegisterSubXactCallback(CoordinatedSubTransactionCallback, NULL);
227
228 AdjustMaxPreparedTransactions();
229
230 /* set aside 8kb of memory for use in CoordinatedTransactionCallback */
231 CommitContext = AllocSetContextCreateExtended(TopMemoryContext,
232 "CommitContext",
233 8 * 1024,
234 8 * 1024,
235 8 * 1024);
236 }
237
238
239 /*
240 * Transaction management callback, handling coordinated transaction, and
241 * transaction independent connection management.
242 *
243 * NB: There should only ever be a single transaction callback in citus, the
244 * ordering between the callbacks and thee actions within those callbacks
245 * otherwise becomes too undeterministic / hard to reason about.
246 */
247 static void
CoordinatedTransactionCallback(XactEvent event,void * arg)248 CoordinatedTransactionCallback(XactEvent event, void *arg)
249 {
250 switch (event)
251 {
252 case XACT_EVENT_COMMIT:
253 {
254 /*
255 * ERRORs thrown during XACT_EVENT_COMMIT will cause postgres to abort, at
256 * this point enough work has been done that it's not possible to rollback.
257 *
258 * One possible source of errors is memory allocation failures. To minimize
259 * the chance of those happening we've pre-allocated some memory in the
260 * CommitContext, it has 8kb of memory that we're allowed to use.
261 *
262 * We only do this in the COMMIT callback because:
263 * - Errors thrown in other callbacks (such as PRE_COMMIT) won't cause
264 * crashes, they will simply cause the ABORT handler to be called.
265 * - The exception is ABORT, errors thrown there could also cause crashes, but
266 * postgres already creates a TransactionAbortContext which performs this
267 * trick, so there's no need for us to do it again.
268 */
269 MemoryContext previousContext = CurrentMemoryContext;
270 MemoryContextSwitchTo(CommitContext);
271
272 /*
273 * Call other parts of citus that need to integrate into
274 * transaction management. Do so before doing other work, so the
275 * callbacks still can perform work if needed.
276 */
277 ResetShardPlacementTransactionState();
278
279 if (CurrentCoordinatedTransactionState == COORD_TRANS_PREPARED)
280 {
281 /* handles both already prepared and open transactions */
282 CoordinatedRemoteTransactionsCommit();
283 }
284
285 /* close connections etc. */
286 if (CurrentCoordinatedTransactionState != COORD_TRANS_NONE)
287 {
288 ResetPlacementConnectionManagement();
289 AfterXactConnectionHandling(true);
290 }
291
292 /*
293 * Changes to catalog tables are now visible to the metadata sync
294 * daemon, so we can trigger metadata sync if necessary.
295 */
296 if (MetadataSyncOnCommit)
297 {
298 TriggerMetadataSync(MyDatabaseId);
299 }
300
301 ResetGlobalVariables();
302
303 /*
304 * Make sure that we give the shared connections back to the shared
305 * pool if any. This operation is a no-op if the reserved connections
306 * are already given away.
307 */
308 DeallocateReservedConnections();
309
310 UnSetDistributedTransactionId();
311
312 /* empty the CommitContext to ensure we're not leaking memory */
313 MemoryContextSwitchTo(previousContext);
314 MemoryContextReset(CommitContext);
315 break;
316 }
317
318 case XACT_EVENT_ABORT:
319 {
320 /* stop propagating notices from workers, we know the query is failed */
321 DisableWorkerMessagePropagation();
322
323 RemoveIntermediateResultsDirectory();
324
325 ResetShardPlacementTransactionState();
326
327 /* handles both already prepared and open transactions */
328 if (CurrentCoordinatedTransactionState > COORD_TRANS_IDLE)
329 {
330 /*
331 * Since CoordinateRemoteTransactionsAbort may cause an error and it is
332 * not allowed to error out at that point, swallow the error if any.
333 *
334 * Particular error we've observed was CreateWaitEventSet throwing an error
335 * when out of file descriptor.
336 *
337 * If an error is swallowed, connections of all active transactions must
338 * be forced to close at the end of the transaction explicitly.
339 */
340 bool errorSwallowed = SwallowErrors(CoordinatedRemoteTransactionsAbort);
341 if (errorSwallowed == true)
342 {
343 ForceAllInProgressConnectionsToClose();
344 }
345 }
346
347 /*
348 * Close connections etc. Contrary to a successful transaction we reset the
349 * placement connection management irregardless of state of the statemachine
350 * as recorded in CurrentCoordinatedTransactionState.
351 * The hashmaps recording the connection management live a memory context
352 * higher compared to most of the data referenced in the hashmap. This causes
353 * use after free errors when the contents are retained due to an error caused
354 * before the CurrentCoordinatedTransactionState changed.
355 */
356 ResetPlacementConnectionManagement();
357 AfterXactConnectionHandling(false);
358
359 ResetGlobalVariables();
360
361 /*
362 * Make sure that we give the shared connections back to the shared
363 * pool if any. This operation is a no-op if the reserved connections
364 * are already given away.
365 */
366 DeallocateReservedConnections();
367
368 /*
369 * We reset these mainly for posterity. The only way we would normally
370 * get here with ExecutorLevel or PlannerLevel > 0 is during a fatal
371 * error when the process is about to end.
372 */
373 ExecutorLevel = 0;
374 PlannerLevel = 0;
375
376 /*
377 * We should reset SubPlanLevel in case a transaction is aborted,
378 * otherwise this variable would stay +ve if the transaction is
379 * aborted in the middle of a CTE/complex subquery execution
380 * which would cause the subsequent queries to error out in
381 * case the copy size is greater than
382 * citus.max_intermediate_result_size
383 */
384 SubPlanLevel = 0;
385 UnSetDistributedTransactionId();
386 break;
387 }
388
389 case XACT_EVENT_PARALLEL_COMMIT:
390 case XACT_EVENT_PARALLEL_ABORT:
391 {
392 break;
393 }
394
395 case XACT_EVENT_PREPARE:
396 {
397 /* we need to reset SavedExplainPlan before TopTransactionContext is deleted */
398 FreeSavedExplainPlan();
399
400 /*
401 * This callback is only relevant for worker queries since
402 * distributed queries cannot be executed with 2PC, see
403 * XACT_EVENT_PRE_PREPARE.
404 *
405 * We should remove the intermediate results before unsetting the
406 * distributed transaction id. That is necessary, otherwise Citus
407 * would try to remove a non-existing folder and leak some of the
408 * existing folders that are associated with distributed transaction
409 * ids on the worker nodes.
410 */
411 RemoveIntermediateResultsDirectory();
412
413 UnSetDistributedTransactionId();
414 break;
415 }
416
417 case XACT_EVENT_PRE_COMMIT:
418 {
419 /*
420 * If the distributed query involves 2PC, we already removed
421 * the intermediate result directory on XACT_EVENT_PREPARE. However,
422 * if not, we should remove it here on the COMMIT. Since
423 * RemoveIntermediateResultsDirectory() is idempotent, we're safe
424 * to call it here again even if the transaction involves 2PC.
425 */
426 RemoveIntermediateResultsDirectory();
427
428 /* nothing further to do if there's no managed remote xacts */
429 if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE)
430 {
431 break;
432 }
433
434 /*
435 * TODO: It'd probably be a good idea to force constraints and
436 * such to 'immediate' here. Deferred triggers might try to send
437 * stuff to the remote side, which'd not be good. Doing so
438 * remotely would also catch a class of errors where committing
439 * fails, which can lead to divergence when not using 2PC.
440 */
441
442 /*
443 * Check whether the coordinated transaction is in a state we want
444 * to persist, or whether we want to error out. This handles the
445 * case where iteratively executed commands marked all placements
446 * as invalid.
447 */
448 MarkFailedShardPlacements();
449
450 if (ShouldCoordinatedTransactionUse2PC)
451 {
452 CoordinatedRemoteTransactionsPrepare();
453 CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED;
454
455 /*
456 * Make sure we did not have any failures on connections marked as
457 * critical before committing.
458 */
459 CheckRemoteTransactionsHealth();
460 }
461 else
462 {
463 CheckRemoteTransactionsHealth();
464
465 /*
466 * Have to commit remote transactions in PRE_COMMIT, to allow
467 * us to mark failed placements as invalid. Better don't use
468 * this for anything important (i.e. DDL/metadata).
469 */
470 CoordinatedRemoteTransactionsCommit();
471 CurrentCoordinatedTransactionState = COORD_TRANS_COMMITTED;
472 }
473
474 /*
475 * Check again whether shards/placement successfully
476 * committed. This handles failure at COMMIT/PREPARE time.
477 */
478 PostCommitMarkFailedShardPlacements(ShouldCoordinatedTransactionUse2PC);
479 break;
480 }
481
482 case XACT_EVENT_PARALLEL_PRE_COMMIT:
483 case XACT_EVENT_PRE_PREPARE:
484 {
485 if (InCoordinatedTransaction())
486 {
487 ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
488 errmsg("cannot use 2PC in transactions involving "
489 "multiple servers")));
490 }
491 break;
492 }
493 }
494 }
495
496
497 /*
498 * ForceAllInProgressConnectionsToClose forces all connections of in progress transactions
499 * to close at the end of the transaction.
500 */
501 static void
ForceAllInProgressConnectionsToClose(void)502 ForceAllInProgressConnectionsToClose(void)
503 {
504 dlist_iter iter;
505 dlist_foreach(iter, &InProgressTransactions)
506 {
507 MultiConnection *connection = dlist_container(MultiConnection,
508 transactionNode,
509 iter.cur);
510
511 connection->forceCloseAtTransactionEnd = true;
512 }
513 }
514
515
516 /*
517 * If an ERROR is thrown while processing a transaction the ABORT handler is called.
518 * ERRORS thrown during ABORT are not treated any differently, the ABORT handler is also
519 * called during processing of those. If an ERROR was raised the first time through it's
520 * unlikely that the second try will succeed; more likely that an ERROR will be thrown
521 * again. This loop continues until Postgres notices and PANICs, complaining about a stack
522 * overflow.
523 *
524 * Instead of looping and crashing, SwallowErrors lets us attempt to continue running the
525 * ABORT logic. This wouldn't be safe in most other parts of the codebase, in
526 * approximately none of the places where we emit ERROR do we first clean up after
527 * ourselves! It's fine inside the ABORT handler though; Postgres is going to clean
528 * everything up before control passes back to us.
529 *
530 * If it swallows any error, returns true. Otherwise, returns false.
531 */
532 static bool
SwallowErrors(void (* func)())533 SwallowErrors(void (*func)())
534 {
535 MemoryContext savedContext = CurrentMemoryContext;
536 volatile bool anyErrorSwallowed = false;
537
538 PG_TRY();
539 {
540 func();
541 }
542 PG_CATCH();
543 {
544 MemoryContextSwitchTo(savedContext);
545 ErrorData *edata = CopyErrorData();
546 FlushErrorState();
547
548 /* rethrow as WARNING */
549 edata->elevel = WARNING;
550 ThrowErrorData(edata);
551
552 anyErrorSwallowed = true;
553 }
554 PG_END_TRY();
555
556 return anyErrorSwallowed;
557 }
558
559
560 /*
561 * ResetGlobalVariables resets global variables that
562 * might be changed during the execution of queries.
563 */
564 static void
ResetGlobalVariables()565 ResetGlobalVariables()
566 {
567 CurrentCoordinatedTransactionState = COORD_TRANS_NONE;
568 XactModificationLevel = XACT_MODIFICATION_NONE;
569 SetLocalExecutionStatus(LOCAL_EXECUTION_OPTIONAL);
570 FreeSavedExplainPlan();
571 dlist_init(&InProgressTransactions);
572 activeSetStmts = NULL;
573 ShouldCoordinatedTransactionUse2PC = false;
574 TransactionModifiedNodeMetadata = false;
575 MetadataSyncOnCommit = false;
576 ResetWorkerErrorIndication();
577 }
578
579
580 /*
581 * ResetShardPlacementTransactionState performs cleanup after the end of a
582 * transaction.
583 */
584 static void
ResetShardPlacementTransactionState(void)585 ResetShardPlacementTransactionState(void)
586 {
587 if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE)
588 {
589 MultiShardCommitProtocol = SavedMultiShardCommitProtocol;
590 SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
591 }
592 }
593
594
595 /*
596 * CoordinatedSubTransactionCallback is the callback used to implement
597 * distributed ROLLBACK TO SAVEPOINT.
598 */
599 static void
CoordinatedSubTransactionCallback(SubXactEvent event,SubTransactionId subId,SubTransactionId parentSubid,void * arg)600 CoordinatedSubTransactionCallback(SubXactEvent event, SubTransactionId subId,
601 SubTransactionId parentSubid, void *arg)
602 {
603 switch (event)
604 {
605 /*
606 * Our subtransaction stack should be consistent with postgres' internal
607 * transaction stack. In case of subxact begin, postgres calls our
608 * callback after it has pushed the transaction into stack, so we have to
609 * do the same even if worker commands fail, so we PushSubXact() first.
610 * In case of subxact commit, callback is called before pushing subxact to
611 * the postgres transaction stack, so we call PopSubXact() after making sure
612 * worker commands didn't fail. Otherwise, Postgres would roll back that
613 * would cause us to call PopSubXact again.
614 */
615 case SUBXACT_EVENT_START_SUB:
616 {
617 PushSubXact(subId);
618 if (InCoordinatedTransaction())
619 {
620 CoordinatedRemoteTransactionsSavepointBegin(subId);
621 }
622 break;
623 }
624
625 case SUBXACT_EVENT_COMMIT_SUB:
626 {
627 if (InCoordinatedTransaction())
628 {
629 CoordinatedRemoteTransactionsSavepointRelease(subId);
630 }
631 PopSubXact(subId);
632 break;
633 }
634
635 case SUBXACT_EVENT_ABORT_SUB:
636 {
637 /*
638 * Stop showing message for now, will re-enable when executing
639 * the next statement.
640 */
641 DisableWorkerMessagePropagation();
642
643 /*
644 * Given that we aborted, worker error indications can be ignored.
645 */
646 ResetWorkerErrorIndication();
647
648 if (InCoordinatedTransaction())
649 {
650 CoordinatedRemoteTransactionsSavepointRollback(subId);
651 }
652 PopSubXact(subId);
653
654 break;
655 }
656
657 case SUBXACT_EVENT_PRE_COMMIT_SUB:
658 {
659 /* nothing to do */
660 break;
661 }
662 }
663 }
664
665
666 /*
667 * AdjustMaxPreparedTransactions configures the number of available prepared
668 * transaction slots at startup.
669 */
670 static void
AdjustMaxPreparedTransactions(void)671 AdjustMaxPreparedTransactions(void)
672 {
673 /*
674 * As Citus uses 2PC internally, there always should be some available. As
675 * the default is 0, we increase it to something appropriate
676 * (connections * 2 currently). If the user explicitly configured 2PC, we
677 * leave the configuration alone - there might have been intent behind the
678 * decision.
679 */
680 if (max_prepared_xacts == 0)
681 {
682 char newvalue[12];
683
684 SafeSnprintf(newvalue, sizeof(newvalue), "%d", MaxConnections * 2);
685
686 SetConfigOption("max_prepared_transactions", newvalue, PGC_POSTMASTER,
687 PGC_S_OVERRIDE);
688
689 ereport(LOG, (errmsg("number of prepared transactions has not been "
690 "configured, overriding"),
691 errdetail("max_prepared_transactions is now set to %s",
692 newvalue)));
693 }
694 }
695
696
697 /* PushSubXact pushes subId to the stack of active sub-transactions. */
698 static void
PushSubXact(SubTransactionId subId)699 PushSubXact(SubTransactionId subId)
700 {
701 /*
702 * We need to allocate these in TopTransactionContext instead of current
703 * subxact's memory context. This is because AtSubCommit_Memory won't
704 * delete the subxact's memory context unless it is empty, and this
705 * can cause in memory leaks. For emptiness it just checks if the memory
706 * has been reset, and we cannot reset the subxact context since other
707 * data can be in the context that are needed by upper commits.
708 *
709 * See https://github.com/citusdata/citus/issues/3999
710 */
711 MemoryContext old_context = MemoryContextSwitchTo(TopTransactionContext);
712
713 /* save provided subId as well as propagated SET LOCAL stmts */
714 SubXactContext *state = palloc(sizeof(SubXactContext));
715 state->subId = subId;
716 state->setLocalCmds = activeSetStmts;
717
718 /* append to list and reset active set stmts for upcoming sub-xact */
719 activeSubXactContexts = lcons(state, activeSubXactContexts);
720 activeSetStmts = makeStringInfo();
721
722 MemoryContextSwitchTo(old_context);
723 }
724
725
726 /* PopSubXact pops subId from the stack of active sub-transactions. */
727 static void
PopSubXact(SubTransactionId subId)728 PopSubXact(SubTransactionId subId)
729 {
730 SubXactContext *state = linitial(activeSubXactContexts);
731
732 Assert(state->subId == subId);
733
734 /*
735 * Free activeSetStmts to avoid memory leaks when we create subxacts
736 * for each row, e.g. in exception handling of UDFs.
737 */
738 if (activeSetStmts != NULL)
739 {
740 pfree(activeSetStmts->data);
741 pfree(activeSetStmts);
742 }
743
744 /*
745 * SET LOCAL commands are local to subxact blocks. When a subxact commits
746 * or rolls back, we should roll back our set of SET LOCAL commands to the
747 * ones we had in the upper commit.
748 */
749 activeSetStmts = state->setLocalCmds;
750
751 /*
752 * Free state to avoid memory leaks when we create subxacts for each row,
753 * e.g. in exception handling of UDFs.
754 */
755 pfree(state);
756
757 activeSubXactContexts = list_delete_first(activeSubXactContexts);
758 }
759
760
761 /* ActiveSubXactContexts returns the list of active sub-xact context in temporal order. */
762 List *
ActiveSubXactContexts(void)763 ActiveSubXactContexts(void)
764 {
765 List *reversedSubXactStates = NIL;
766
767 /*
768 * activeSubXactContexts is in reversed temporal order, so we reverse it to get it
769 * in temporal order.
770 */
771 SubXactContext *state = NULL;
772 foreach_ptr(state, activeSubXactContexts)
773 {
774 reversedSubXactStates = lcons(state, reversedSubXactStates);
775 }
776
777 return reversedSubXactStates;
778 }
779
780
781 /*
782 * IsMultiStatementTransaction determines whether the current statement is
783 * part of a bigger multi-statement transaction. This is the case when the
784 * statement is wrapped in a transaction block (comes after BEGIN), or it
785 * is called from a stored procedure or function.
786 */
787 bool
IsMultiStatementTransaction(void)788 IsMultiStatementTransaction(void)
789 {
790 if (IsTransactionBlock())
791 {
792 /* in a BEGIN...END block */
793 return true;
794 }
795 else if (DoBlockLevel > 0)
796 {
797 /* in (a transaction within) a do block */
798 return true;
799 }
800 else if (StoredProcedureLevel > 0)
801 {
802 /* in (a transaction within) a stored procedure */
803 return true;
804 }
805 else if (MaybeExecutingUDF() && FunctionOpensTransactionBlock)
806 {
807 /* in a language-handler function call, open a transaction if configured to do so */
808 return true;
809 }
810 else
811 {
812 return false;
813 }
814 }
815
816
817 /*
818 * MaybeExecutingUDF returns true if we are possibly executing a function call.
819 * We use nested level of executor to check this, so this can return true for
820 * CTEs, etc. which also start nested executors.
821 *
822 * If the planner is being called from the executor, then we may also be in
823 * a UDF.
824 */
825 static bool
MaybeExecutingUDF(void)826 MaybeExecutingUDF(void)
827 {
828 return ExecutorLevel > 1 || (ExecutorLevel == 1 && PlannerLevel > 0);
829 }
830
831
832 /*
833 * TriggerMetadataSyncOnCommit sets a flag to do metadata sync on commit.
834 * This is because new metadata only becomes visible to the metadata sync
835 * daemon after commit happens.
836 */
837 void
TriggerMetadataSyncOnCommit(void)838 TriggerMetadataSyncOnCommit(void)
839 {
840 MetadataSyncOnCommit = true;
841 }
842