1 /*-------------------------------------------------------------------------
2  *
3  * remote_transaction.c
 *   Management of transactions spanning more than one node.
5  *
6  * Copyright (c) Citus Data, Inc.
7  *
8  *-------------------------------------------------------------------------
9  */
10 
11 #include "postgres.h"
12 
13 #include "libpq-fe.h"
14 
15 #include "miscadmin.h"
16 
17 #include "access/xact.h"
18 #include "distributed/backend_data.h"
19 #include "distributed/citus_safe_lib.h"
20 #include "distributed/connection_management.h"
21 #include "distributed/listutils.h"
22 #include "distributed/metadata_cache.h"
23 #include "distributed/placement_connection.h"
24 #include "distributed/remote_commands.h"
25 #include "distributed/remote_transaction.h"
26 #include "distributed/transaction_identifier.h"
27 #include "distributed/transaction_management.h"
28 #include "distributed/transaction_recovery.h"
29 #include "distributed/worker_manager.h"
30 #include "utils/builtins.h"
31 #include "utils/hsearch.h"
32 
33 
/*
 * Name template for prepared transactions created on remote nodes:
 * "citus_" followed by two 32-bit unsigned fields, one 64-bit unsigned field
 * and a final 32-bit unsigned field, separated by underscores.
 * NOTE(review): the meaning of each field is decided by Assign2PCIdentifier(),
 * whose body is not visible here; presumably they identify the coordinator
 * group, the backend process, the distributed transaction number and a
 * per-connection counter. Confirm there before relying on it.
 */
#define PREPARED_TRANSACTION_NAME_FORMAT "citus_%u_%u_"UINT64_FORMAT "_%u"


/*
 * Savepoint helpers. They appear to follow the same Start (send) / Finish
 * (collect result) split as the public functions in this file; their
 * definitions are not visible in this section.
 */
static void StartRemoteTransactionSavepointBegin(MultiConnection *connection,
												 SubTransactionId subId);
static void FinishRemoteTransactionSavepointBegin(MultiConnection *connection,
												  SubTransactionId subId);
static void StartRemoteTransactionSavepointRelease(MultiConnection *connection,
												   SubTransactionId subId);
static void FinishRemoteTransactionSavepointRelease(MultiConnection *connection,
													SubTransactionId subId);
static void StartRemoteTransactionSavepointRollback(MultiConnection *connection,
													SubTransactionId subId);
static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection,
													 SubTransactionId subId);

/* fills in connection->remoteTransaction.preparedName before PREPARE TRANSACTION */
static void Assign2PCIdentifier(MultiConnection *connection);
51 
52 
53 /*
54  * StartRemoteTransactionBeging initiates beginning the remote transaction in
55  * a non-blocking manner. The function sends "BEGIN" followed by
56  * assign_distributed_transaction_id() to assign the distributed transaction
57  * id on the remote node.
58  */
59 void
StartRemoteTransactionBegin(struct MultiConnection * connection)60 StartRemoteTransactionBegin(struct MultiConnection *connection)
61 {
62 	RemoteTransaction *transaction = &connection->remoteTransaction;
63 
64 	Assert(transaction->transactionState == REMOTE_TRANS_NOT_STARTED);
65 
66 	/* remember transaction as being in-progress */
67 	dlist_push_tail(&InProgressTransactions, &connection->transactionNode);
68 
69 	transaction->transactionState = REMOTE_TRANS_STARTING;
70 
71 	StringInfo beginAndSetDistributedTransactionId =
72 		BeginAndSetDistributedTransactionIdCommand();
73 
74 	/* append context for in-progress SAVEPOINTs for this transaction */
75 	List *activeSubXacts = ActiveSubXactContexts();
76 	transaction->lastSuccessfulSubXact = TopSubTransactionId;
77 	transaction->lastQueuedSubXact = TopSubTransactionId;
78 
79 	SubXactContext *subXactState = NULL;
80 	foreach_ptr(subXactState, activeSubXacts)
81 	{
82 		/* append SET LOCAL state from when SAVEPOINT was encountered... */
83 		if (subXactState->setLocalCmds != NULL)
84 		{
85 			appendStringInfoString(beginAndSetDistributedTransactionId,
86 								   subXactState->setLocalCmds->data);
87 		}
88 
89 		/* ... then append SAVEPOINT to enter this subxact */
90 		appendStringInfo(beginAndSetDistributedTransactionId,
91 						 "SAVEPOINT savepoint_%u;", subXactState->subId);
92 		transaction->lastQueuedSubXact = subXactState->subId;
93 	}
94 
95 	/* we've pushed into deepest subxact: apply in-progress SET context */
96 	if (activeSetStmts != NULL)
97 	{
98 		appendStringInfoString(beginAndSetDistributedTransactionId, activeSetStmts->data);
99 	}
100 
101 	if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data))
102 	{
103 		const bool raiseErrors = true;
104 
105 		HandleRemoteTransactionConnectionError(connection, raiseErrors);
106 	}
107 
108 	transaction->beginSent = true;
109 }
110 
111 
112 /*
113  * BeginAndSetDistributedTransactionIdCommand returns a command which starts
114  * a transaction and assigns the current distributed transaction id.
115  */
116 StringInfo
BeginAndSetDistributedTransactionIdCommand(void)117 BeginAndSetDistributedTransactionIdCommand(void)
118 {
119 	StringInfo beginAndSetDistributedTransactionId = makeStringInfo();
120 
121 	/*
122 	 * Explicitly specify READ COMMITTED, the default on the remote
123 	 * side might have been changed, and that would cause problematic
124 	 * behaviour.
125 	 */
126 	appendStringInfoString(beginAndSetDistributedTransactionId,
127 						   "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;");
128 
129 	/*
130 	 * Append BEGIN and assign_distributed_transaction_id() statements into a single command
131 	 * and send both in one step. The reason is purely performance, we don't want
132 	 * seperate roundtrips for these two statements.
133 	 */
134 	DistributedTransactionId *distributedTransactionId =
135 		GetCurrentDistributedTransactionId();
136 	const char *timestamp = timestamptz_to_str(distributedTransactionId->timestamp);
137 	appendStringInfo(beginAndSetDistributedTransactionId,
138 					 "SELECT assign_distributed_transaction_id(%d, " UINT64_FORMAT
139 					 ", '%s');",
140 					 distributedTransactionId->initiatorNodeIdentifier,
141 					 distributedTransactionId->transactionNumber,
142 					 timestamp);
143 
144 	return beginAndSetDistributedTransactionId;
145 }
146 
147 
148 /*
149  * FinishRemoteTransactionBegin finishes the work StartRemoteTransactionBegin
150  * initiated. It blocks if necessary (i.e. if PQisBusy() would return true).
151  */
152 void
FinishRemoteTransactionBegin(struct MultiConnection * connection)153 FinishRemoteTransactionBegin(struct MultiConnection *connection)
154 {
155 	RemoteTransaction *transaction = &connection->remoteTransaction;
156 	bool raiseErrors = true;
157 
158 	Assert(transaction->transactionState == REMOTE_TRANS_STARTING);
159 
160 	bool clearSuccessful = ClearResults(connection, raiseErrors);
161 	if (clearSuccessful)
162 	{
163 		transaction->transactionState = REMOTE_TRANS_STARTED;
164 		transaction->lastSuccessfulSubXact = transaction->lastQueuedSubXact;
165 	}
166 
167 	if (!transaction->transactionFailed)
168 	{
169 		Assert(PQtransactionStatus(connection->pgConn) == PQTRANS_INTRANS);
170 	}
171 }
172 
173 
/*
 * RemoteTransactionBegin is the blocking form of beginning a remote
 * transaction. It sends the BEGIN command and then waits for its result.
 */
void
RemoteTransactionBegin(struct MultiConnection *connection)
{
	/* send, then wait */
	StartRemoteTransactionBegin(connection);
	FinishRemoteTransactionBegin(connection);
}
183 
184 
185 /*
186  * RemoteTransactionListBegin sends BEGIN over all connections in the
187  * given connection list and waits for all of them to finish.
188  */
189 void
RemoteTransactionListBegin(List * connectionList)190 RemoteTransactionListBegin(List *connectionList)
191 {
192 	MultiConnection *connection = NULL;
193 
194 	/* send BEGIN to all nodes */
195 	foreach_ptr(connection, connectionList)
196 	{
197 		StartRemoteTransactionBegin(connection);
198 	}
199 
200 	/* wait for BEGIN to finish on all nodes */
201 	foreach_ptr(connection, connectionList)
202 	{
203 		FinishRemoteTransactionBegin(connection);
204 	}
205 }
206 
207 
208 /*
209  * StartRemoteTransactionCommit initiates transaction commit in a non-blocking
210  * manner.  If the transaction is in a failed state, it'll instead get rolled
211  * back.
212  */
213 void
StartRemoteTransactionCommit(MultiConnection * connection)214 StartRemoteTransactionCommit(MultiConnection *connection)
215 {
216 	RemoteTransaction *transaction = &connection->remoteTransaction;
217 	const bool raiseErrors = false;
218 
219 	/* can only commit if transaction is in progress */
220 	Assert(transaction->transactionState != REMOTE_TRANS_NOT_STARTED);
221 
222 	/* can't commit if we already started to commit or abort */
223 	Assert(transaction->transactionState < REMOTE_TRANS_1PC_ABORTING);
224 
225 	if (transaction->transactionFailed)
226 	{
227 		/* abort the transaction if it failed */
228 		transaction->transactionState = REMOTE_TRANS_1PC_ABORTING;
229 
230 		/*
231 		 * Try sending an ROLLBACK; Depending on the state that won't
232 		 * succeed, but let's try.  Have to clear previous results
233 		 * first.
234 		 */
235 		ForgetResults(connection); /* try to clear pending stuff */
236 		if (!SendRemoteCommand(connection, "ROLLBACK"))
237 		{
238 			/* no point in reporting a likely redundant message */
239 		}
240 	}
241 	else if (transaction->transactionState == REMOTE_TRANS_PREPARED)
242 	{
243 		/* commit the prepared transaction */
244 		StringInfoData command;
245 
246 		initStringInfo(&command);
247 		appendStringInfo(&command, "COMMIT PREPARED %s",
248 						 quote_literal_cstr(transaction->preparedName));
249 
250 		transaction->transactionState = REMOTE_TRANS_2PC_COMMITTING;
251 
252 		if (!SendRemoteCommand(connection, command.data))
253 		{
254 			HandleRemoteTransactionConnectionError(connection, raiseErrors);
255 		}
256 	}
257 	else
258 	{
259 		/* initiate remote transaction commit */
260 		transaction->transactionState = REMOTE_TRANS_1PC_COMMITTING;
261 
262 		if (!SendRemoteCommand(connection, "COMMIT"))
263 		{
264 			/*
265 			 * For a moment there I thought we were in trouble.
266 			 *
267 			 * Failing in this state means that we don't know whether the
268 			 * commit has succeeded.
269 			 */
270 			HandleRemoteTransactionConnectionError(connection, raiseErrors);
271 		}
272 	}
273 }
274 
275 
276 /*
277  * FinishRemoteTransactionCommit finishes the work
278  * StartRemoteTransactionCommit initiated. It blocks if necessary (i.e. if
279  * PQisBusy() would return true).
280  */
281 void
FinishRemoteTransactionCommit(MultiConnection * connection)282 FinishRemoteTransactionCommit(MultiConnection *connection)
283 {
284 	RemoteTransaction *transaction = &connection->remoteTransaction;
285 	const bool raiseErrors = false;
286 
287 	Assert(transaction->transactionState == REMOTE_TRANS_1PC_ABORTING ||
288 		   transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING ||
289 		   transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING);
290 
291 	PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
292 
293 	if (!IsResponseOK(result))
294 	{
295 		HandleRemoteTransactionResultError(connection, result, raiseErrors);
296 
297 		/*
298 		 * Failing in this state means that we will often not know whether
299 		 * the commit has succeeded (particularly in case of network
300 		 * troubles).
301 		 *
302 		 * XXX: It might be worthwhile to discern cases where we got a
303 		 * proper error back from postgres (i.e. COMMIT was received but
304 		 * produced an error) from cases where the connection failed
305 		 * before getting a reply.
306 		 */
307 
308 		if (transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING)
309 		{
310 			ereport(WARNING, (errmsg("failed to commit transaction on %s:%d",
311 									 connection->hostname, connection->port)));
312 		}
313 		else if (transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING)
314 		{
315 			ereport(WARNING, (errmsg("failed to commit transaction on %s:%d",
316 									 connection->hostname, connection->port)));
317 		}
318 	}
319 	else if (transaction->transactionState == REMOTE_TRANS_1PC_ABORTING ||
320 			 transaction->transactionState == REMOTE_TRANS_2PC_ABORTING)
321 	{
322 		transaction->transactionState = REMOTE_TRANS_ABORTED;
323 	}
324 	else
325 	{
326 		transaction->transactionState = REMOTE_TRANS_COMMITTED;
327 	}
328 
329 	PQclear(result);
330 
331 	ForgetResults(connection);
332 }
333 
334 
335 /*
336  * RemoteTransactionCommit commits (or aborts, if the transaction failed) a
337  * remote transaction in a blocking manner.
338  */
339 void
RemoteTransactionCommit(MultiConnection * connection)340 RemoteTransactionCommit(MultiConnection *connection)
341 {
342 	StartRemoteTransactionCommit(connection);
343 	FinishRemoteTransactionCommit(connection);
344 }
345 
346 
347 /*
348  * StartRemoteTransactionAbort initiates abortin the transaction in a
349  * non-blocking manner.
350  */
351 void
StartRemoteTransactionAbort(MultiConnection * connection)352 StartRemoteTransactionAbort(MultiConnection *connection)
353 {
354 	RemoteTransaction *transaction = &connection->remoteTransaction;
355 	const bool raiseErrors = false;
356 
357 	Assert(transaction->transactionState != REMOTE_TRANS_NOT_STARTED);
358 
359 	/*
360 	 * Clear previous results, so we have a better chance to send ROLLBACK
361 	 * [PREPARED]. If we've previously sent a PREPARE TRANSACTION, we always
362 	 * want to wait for that result, as that shouldn't take long and will
363 	 * reserve resources.  But if there's another query running, we don't want
364 	 * to wait, because a long running statement may be running, so force it to
365 	 * be killed in that case.
366 	 */
367 	if (transaction->transactionState == REMOTE_TRANS_PREPARING ||
368 		transaction->transactionState == REMOTE_TRANS_PREPARED)
369 	{
370 		StringInfoData command;
371 
372 		/* await PREPARE TRANSACTION results, closing the connection would leave it dangling */
373 		ForgetResults(connection);
374 
375 		initStringInfo(&command);
376 		appendStringInfo(&command, "ROLLBACK PREPARED %s",
377 						 quote_literal_cstr(transaction->preparedName));
378 
379 		if (!SendRemoteCommand(connection, command.data))
380 		{
381 			HandleRemoteTransactionConnectionError(connection, raiseErrors);
382 		}
383 		else
384 		{
385 			transaction->transactionState = REMOTE_TRANS_2PC_ABORTING;
386 		}
387 	}
388 	else
389 	{
390 		/*
391 		 * In case of a cancellation, the connection might still be working
392 		 * on some commands. Try to consume the results such that the
393 		 * connection can be reused, but do not want to wait for commands
394 		 * to finish. Instead we just close the connection if the command
395 		 * is still busy.
396 		 */
397 		if (!ClearResultsIfReady(connection))
398 		{
399 			ShutdownConnection(connection);
400 
401 			/* FinishRemoteTransactionAbort will emit warning */
402 			return;
403 		}
404 
405 		if (!SendRemoteCommand(connection, "ROLLBACK"))
406 		{
407 			/* no point in reporting a likely redundant message */
408 			MarkRemoteTransactionFailed(connection, raiseErrors);
409 		}
410 		else
411 		{
412 			transaction->transactionState = REMOTE_TRANS_1PC_ABORTING;
413 		}
414 	}
415 }
416 
417 
418 /*
419  * FinishRemoteTransactionAbort finishes the work StartRemoteTransactionAbort
420  * initiated. It blocks if necessary (i.e. if PQisBusy() would return true).
421  */
422 void
FinishRemoteTransactionAbort(MultiConnection * connection)423 FinishRemoteTransactionAbort(MultiConnection *connection)
424 {
425 	RemoteTransaction *transaction = &connection->remoteTransaction;
426 	const bool raiseErrors = false;
427 
428 	if (transaction->transactionState == REMOTE_TRANS_2PC_ABORTING)
429 	{
430 		PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
431 		if (!IsResponseOK(result))
432 		{
433 			HandleRemoteTransactionResultError(connection, result, raiseErrors);
434 		}
435 
436 		PQclear(result);
437 	}
438 
439 	/*
440 	 * Try to consume results of any in-progress commands. In the 1PC case
441 	 * this is also where we consume the result of the ROLLBACK.
442 	 *
443 	 * If we don't succeed the connection will be in a bad state, so we close it.
444 	 */
445 	if (!ClearResults(connection, raiseErrors))
446 	{
447 		ShutdownConnection(connection);
448 	}
449 
450 	transaction->transactionState = REMOTE_TRANS_ABORTED;
451 }
452 
453 
454 /*
455  * RemoteTransactionAbort aborts a remote transaction in a blocking manner.
456  */
457 void
RemoteTransactionAbort(MultiConnection * connection)458 RemoteTransactionAbort(MultiConnection *connection)
459 {
460 	StartRemoteTransactionAbort(connection);
461 	FinishRemoteTransactionAbort(connection);
462 }
463 
464 
465 /*
466  * StartRemoteTransactionPrepare initiates preparing the transaction in a
467  * non-blocking manner.
468  */
469 void
StartRemoteTransactionPrepare(struct MultiConnection * connection)470 StartRemoteTransactionPrepare(struct MultiConnection *connection)
471 {
472 	RemoteTransaction *transaction = &connection->remoteTransaction;
473 	StringInfoData command;
474 	const bool raiseErrors = true;
475 
476 	/* can't prepare a nonexistant transaction */
477 	Assert(transaction->transactionState != REMOTE_TRANS_NOT_STARTED);
478 
479 	/* can't prepare in a failed transaction */
480 	Assert(!transaction->transactionFailed);
481 
482 	/* can't prepare if already started to prepare/abort/commit */
483 	Assert(transaction->transactionState < REMOTE_TRANS_PREPARING);
484 
485 	Assign2PCIdentifier(connection);
486 
487 	/* log transactions to workers in pg_dist_transaction */
488 	WorkerNode *workerNode = FindWorkerNode(connection->hostname, connection->port);
489 	if (workerNode != NULL)
490 	{
491 		LogTransactionRecord(workerNode->groupId, transaction->preparedName);
492 	}
493 
494 	initStringInfo(&command);
495 	appendStringInfo(&command, "PREPARE TRANSACTION %s",
496 					 quote_literal_cstr(transaction->preparedName));
497 
498 	if (!SendRemoteCommand(connection, command.data))
499 	{
500 		HandleRemoteTransactionConnectionError(connection, raiseErrors);
501 	}
502 	else
503 	{
504 		transaction->transactionState = REMOTE_TRANS_PREPARING;
505 	}
506 }
507 
508 
509 /*
510  * FinishRemoteTransactionPrepare finishes the work
511  * StartRemoteTransactionPrepare initiated. It blocks if necessary (i.e. if
512  * PQisBusy() would return true).
513  */
514 void
FinishRemoteTransactionPrepare(struct MultiConnection * connection)515 FinishRemoteTransactionPrepare(struct MultiConnection *connection)
516 {
517 	RemoteTransaction *transaction = &connection->remoteTransaction;
518 	const bool raiseErrors = true;
519 
520 	Assert(transaction->transactionState == REMOTE_TRANS_PREPARING);
521 
522 	PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
523 
524 	if (!IsResponseOK(result))
525 	{
526 		transaction->transactionState = REMOTE_TRANS_ABORTED;
527 		HandleRemoteTransactionResultError(connection, result, raiseErrors);
528 	}
529 	else
530 	{
531 		transaction->transactionState = REMOTE_TRANS_PREPARED;
532 	}
533 
534 	PQclear(result);
535 
536 	/*
537 	 * Try to consume results of PREPARE TRANSACTION command. If we don't
538 	 * succeed, rollback the transaction. Note that we've not committed on
539 	 * any node yet, and we're not sure about the state of the worker node.
540 	 * So rollbacking seems to be the safest action if the worker is
541 	 * in a state where it can actually rollback.
542 	 */
543 	if (!ClearResults(connection, raiseErrors))
544 	{
545 		ereport(ERROR, (errmsg("failed to prepare transaction '%s' on host %s:%d",
546 							   transaction->preparedName, connection->hostname,
547 							   connection->port),
548 						errhint("Try re-running the command.")));
549 	}
550 }
551 
552 
553 /*
554  * RemoteTransactionBeginIfNecessary is a convenience wrapper around
555  * RemoteTransactionsBeginIfNecessary(), for a single connection.
556  */
557 void
RemoteTransactionBeginIfNecessary(MultiConnection * connection)558 RemoteTransactionBeginIfNecessary(MultiConnection *connection)
559 {
560 	/* just delegate */
561 	if (InCoordinatedTransaction())
562 	{
563 		List *connectionList = list_make1(connection);
564 
565 		RemoteTransactionsBeginIfNecessary(connectionList);
566 		list_free(connectionList);
567 	}
568 }
569 
570 
571 /*
572  * RemoteTransactionsBeginIfNecessary begins, if necessary according to this
573  * session's coordinated transaction state, and the remote transaction's
574  * state, an explicit transaction on all the connections.  This is done in
575  * parallel, to lessen latency penalties.
576  */
577 void
RemoteTransactionsBeginIfNecessary(List * connectionList)578 RemoteTransactionsBeginIfNecessary(List *connectionList)
579 {
580 	MultiConnection *connection = NULL;
581 
582 	/*
583 	 * Don't do anything if not in a coordinated transaction. That allows the
584 	 * same code to work both in situations that uses transactions, and when
585 	 * not.
586 	 */
587 	if (!InCoordinatedTransaction())
588 	{
589 		return;
590 	}
591 
592 	/* issue BEGIN to all connections needing it */
593 	foreach_ptr(connection, connectionList)
594 	{
595 		RemoteTransaction *transaction = &connection->remoteTransaction;
596 
597 		/* can't send BEGIN if a command already is in progress */
598 		Assert(PQtransactionStatus(connection->pgConn) != PQTRANS_ACTIVE);
599 
600 		/*
601 		 * If a transaction already is in progress (including having failed),
602 		 * don't start it again. That's quite normal if a piece of code allows
603 		 * cached connections.
604 		 */
605 		if (transaction->transactionState != REMOTE_TRANS_NOT_STARTED)
606 		{
607 			continue;
608 		}
609 
610 		StartRemoteTransactionBegin(connection);
611 	}
612 
613 	bool raiseInterrupts = true;
614 	WaitForAllConnections(connectionList, raiseInterrupts);
615 
616 	/* get result of all the BEGINs */
617 	foreach_ptr(connection, connectionList)
618 	{
619 		RemoteTransaction *transaction = &connection->remoteTransaction;
620 
621 		/*
622 		 * Only handle BEGIN results on connections that are in process of
623 		 * starting a transaction, and haven't already failed (e.g. by not
624 		 * being able to send BEGIN due to a network failure).
625 		 */
626 		if (transaction->transactionFailed ||
627 			transaction->transactionState != REMOTE_TRANS_STARTING)
628 		{
629 			continue;
630 		}
631 
632 		FinishRemoteTransactionBegin(connection);
633 	}
634 }
635 
636 
637 /*
638  * HandleRemoteTransactionConnectionError records a transaction as having failed
639  * and throws a connection error if the transaction was critical and raiseErrors
640  * is true, or a warning otherwise.
641  */
642 void
HandleRemoteTransactionConnectionError(MultiConnection * connection,bool raiseErrors)643 HandleRemoteTransactionConnectionError(MultiConnection *connection, bool raiseErrors)
644 {
645 	RemoteTransaction *transaction = &connection->remoteTransaction;
646 
647 	transaction->transactionFailed = true;
648 
649 	if (transaction->transactionCritical && raiseErrors)
650 	{
651 		ReportConnectionError(connection, ERROR);
652 	}
653 	else
654 	{
655 		ReportConnectionError(connection, WARNING);
656 	}
657 }
658 
659 
660 /*
661  * HandleRemoteTransactionResultError records a transaction as having failed
662  * and throws a result error if the transaction was critical and raiseErrors
663  * is true, or a warning otherwise.
664  */
665 void
HandleRemoteTransactionResultError(MultiConnection * connection,PGresult * result,bool raiseErrors)666 HandleRemoteTransactionResultError(MultiConnection *connection, PGresult *result, bool
667 								   raiseErrors)
668 {
669 	RemoteTransaction *transaction = &connection->remoteTransaction;
670 
671 	transaction->transactionFailed = true;
672 
673 	if (transaction->transactionCritical && raiseErrors)
674 	{
675 		ReportResultError(connection, result, ERROR);
676 	}
677 	else
678 	{
679 		ReportResultError(connection, result, WARNING);
680 	}
681 }
682 
683 
684 /*
685  * MarkRemoteTransactionFailed records a transaction as having failed.
686  *
687  * If the connection is marked as critical, and allowErrorPromotion is true,
688  * this routine will ERROR out. The allowErrorPromotion case is primarily
689  * required for the transaction management code itself. Usually it is helpful
690  * to fail as soon as possible. If !allowErrorPromotion transaction commit
691  * will instead issue an error before committing on any node.
692  */
693 void
MarkRemoteTransactionFailed(MultiConnection * connection,bool allowErrorPromotion)694 MarkRemoteTransactionFailed(MultiConnection *connection, bool allowErrorPromotion)
695 {
696 	RemoteTransaction *transaction = &connection->remoteTransaction;
697 
698 	transaction->transactionFailed = true;
699 
700 	/*
701 	 * If the connection is marked as critical, fail the entire coordinated
702 	 * transaction. If allowed.
703 	 */
704 	if (transaction->transactionCritical && allowErrorPromotion)
705 	{
706 		ereport(ERROR, (errmsg("failure on connection marked as essential: %s:%d",
707 							   connection->hostname, connection->port)));
708 	}
709 }
710 
711 
712 /*
713  * MarkRemoteTransactionCritical signals that failures on this remote
714  * transaction should fail the entire coordinated transaction.
715  */
716 void
MarkRemoteTransactionCritical(struct MultiConnection * connection)717 MarkRemoteTransactionCritical(struct MultiConnection *connection)
718 {
719 	RemoteTransaction *transaction = &connection->remoteTransaction;
720 
721 	transaction->transactionCritical = true;
722 }
723 
724 
725 /*
726  * CloseRemoteTransaction handles closing a connection that, potentially, is
727  * part of a coordinated transaction.  This should only ever be called from
728  * connection_management.c, while closing a connection during a transaction.
729  */
730 void
CloseRemoteTransaction(struct MultiConnection * connection)731 CloseRemoteTransaction(struct MultiConnection *connection)
732 {
733 	RemoteTransaction *transaction = &connection->remoteTransaction;
734 
735 	/* unlink from list of open transactions, if necessary */
736 	if (transaction->transactionState != REMOTE_TRANS_NOT_STARTED)
737 	{
738 		/* XXX: Should we error out for a critical transaction? */
739 
740 		dlist_delete(&connection->transactionNode);
741 	}
742 }
743 
744 
745 /*
746  * ResetRemoteTransaction resets the state of the transaction after the end of
747  * the main transaction, if the connection is being reused.
748  */
749 void
ResetRemoteTransaction(struct MultiConnection * connection)750 ResetRemoteTransaction(struct MultiConnection *connection)
751 {
752 	RemoteTransaction *transaction = &connection->remoteTransaction;
753 
754 	/* just reset the entire state, relying on 0 being invalid/false */
755 	memset(transaction, 0, sizeof(*transaction));
756 }
757 
758 
759 /*
760  * CoordinatedRemoteTransactionsPrepare PREPAREs a 2PC transaction on all
761  * non-failed transactions participating in the coordinated transaction.
762  */
763 void
CoordinatedRemoteTransactionsPrepare(void)764 CoordinatedRemoteTransactionsPrepare(void)
765 {
766 	dlist_iter iter;
767 	List *connectionList = NIL;
768 
769 	/* issue PREPARE TRANSACTION; to all relevant remote nodes */
770 
771 	/* asynchronously send PREPARE */
772 	dlist_foreach(iter, &InProgressTransactions)
773 	{
774 		MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
775 													  iter.cur);
776 		RemoteTransaction *transaction = &connection->remoteTransaction;
777 
778 		Assert(transaction->transactionState != REMOTE_TRANS_NOT_STARTED);
779 
780 		/* can't PREPARE a transaction that failed */
781 		if (transaction->transactionFailed)
782 		{
783 			continue;
784 		}
785 
786 		/*
787 		 * Check if any DML or DDL is executed over the connection on any
788 		 * placement/table. If yes, we start preparing the transaction, otherwise
789 		 * we skip prepare since the connection didn't perform any write (read-only)
790 		 */
791 		if (ConnectionModifiedPlacement(connection))
792 		{
793 			StartRemoteTransactionPrepare(connection);
794 			connectionList = lappend(connectionList, connection);
795 		}
796 	}
797 
798 	bool raiseInterrupts = true;
799 	WaitForAllConnections(connectionList, raiseInterrupts);
800 
801 	/* Wait for result */
802 	dlist_foreach(iter, &InProgressTransactions)
803 	{
804 		MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
805 													  iter.cur);
806 		RemoteTransaction *transaction = &connection->remoteTransaction;
807 
808 		if (transaction->transactionState != REMOTE_TRANS_PREPARING)
809 		{
810 			/*
811 			 * Verify that either the transaction failed, hence we couldn't prepare
812 			 * or the connection didn't modify any placement
813 			 */
814 			Assert(transaction->transactionFailed ||
815 				   !ConnectionModifiedPlacement(connection));
816 			continue;
817 		}
818 
819 		FinishRemoteTransactionPrepare(connection);
820 	}
821 
822 	CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED;
823 }
824 
825 
826 /*
827  * CoordinatedRemoteTransactionsCommit performs distributed transactions
828  * handling at commit time. This will be called at XACT_EVENT_PRE_COMMIT if
829  * 1PC commits are used - so shards can still be invalidated - and at
830  * XACT_EVENT_COMMIT if 2PC is being used.
831  *
832  * Note that this routine has to issue rollbacks for failed transactions.
833  */
834 void
CoordinatedRemoteTransactionsCommit(void)835 CoordinatedRemoteTransactionsCommit(void)
836 {
837 	dlist_iter iter;
838 	List *connectionList = NIL;
839 
840 	/*
841 	 * Issue appropriate transaction commands to remote nodes. If everything
842 	 * went well that's going to be COMMIT or COMMIT PREPARED, if individual
843 	 * connections had errors, some or all of them might require a ROLLBACK.
844 	 *
845 	 * First send the command asynchronously over all connections.
846 	 */
847 	dlist_foreach(iter, &InProgressTransactions)
848 	{
849 		MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
850 													  iter.cur);
851 		RemoteTransaction *transaction = &connection->remoteTransaction;
852 
853 		if (transaction->transactionState == REMOTE_TRANS_NOT_STARTED ||
854 			transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING ||
855 			transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING ||
856 			transaction->transactionState == REMOTE_TRANS_COMMITTED ||
857 			transaction->transactionState == REMOTE_TRANS_ABORTED)
858 		{
859 			continue;
860 		}
861 
862 		StartRemoteTransactionCommit(connection);
863 		connectionList = lappend(connectionList, connection);
864 	}
865 
866 	bool raiseInterrupts = false;
867 	WaitForAllConnections(connectionList, raiseInterrupts);
868 
869 	/* wait for the replies to the commands to come in */
870 	dlist_foreach(iter, &InProgressTransactions)
871 	{
872 		MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
873 													  iter.cur);
874 		RemoteTransaction *transaction = &connection->remoteTransaction;
875 
876 		/* nothing to do if not committing / aborting */
877 		if (transaction->transactionState != REMOTE_TRANS_1PC_COMMITTING &&
878 			transaction->transactionState != REMOTE_TRANS_2PC_COMMITTING &&
879 			transaction->transactionState != REMOTE_TRANS_1PC_ABORTING &&
880 			transaction->transactionState != REMOTE_TRANS_2PC_ABORTING)
881 		{
882 			continue;
883 		}
884 
885 		FinishRemoteTransactionCommit(connection);
886 	}
887 }
888 
889 
890 /*
891  * CoordinatedRemoteTransactionsAbort performs distributed transactions
892  * handling at abort time.
893  *
894  * This issues ROLLBACKS and ROLLBACK PREPARED depending on whether the remote
895  * transaction has been prepared or not.
896  */
897 void
CoordinatedRemoteTransactionsAbort(void)898 CoordinatedRemoteTransactionsAbort(void)
899 {
900 	dlist_iter iter;
901 	List *connectionList = NIL;
902 
903 	/* asynchronously send ROLLBACK [PREPARED] */
904 	dlist_foreach(iter, &InProgressTransactions)
905 	{
906 		MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
907 													  iter.cur);
908 		RemoteTransaction *transaction = &connection->remoteTransaction;
909 
910 		if (transaction->transactionState == REMOTE_TRANS_NOT_STARTED ||
911 			transaction->transactionState == REMOTE_TRANS_1PC_ABORTING ||
912 			transaction->transactionState == REMOTE_TRANS_2PC_ABORTING ||
913 			transaction->transactionState == REMOTE_TRANS_ABORTED)
914 		{
915 			continue;
916 		}
917 
918 		StartRemoteTransactionAbort(connection);
919 		connectionList = lappend(connectionList, connection);
920 	}
921 
922 	bool raiseInterrupts = false;
923 	WaitForAllConnections(connectionList, raiseInterrupts);
924 
925 	/* and wait for the results */
926 	dlist_foreach(iter, &InProgressTransactions)
927 	{
928 		MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
929 													  iter.cur);
930 		RemoteTransaction *transaction = &connection->remoteTransaction;
931 
932 		if (transaction->transactionState != REMOTE_TRANS_1PC_ABORTING &&
933 			transaction->transactionState != REMOTE_TRANS_2PC_ABORTING)
934 		{
935 			continue;
936 		}
937 
938 		FinishRemoteTransactionAbort(connection);
939 	}
940 }
941 
942 
943 /*
944  * CoordinatedRemoteTransactionsSavepointBegin sends the SAVEPOINT command for
945  * the given sub-transaction id to all connections participating in the current
946  * transaction.
947  */
948 void
CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId)949 CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId)
950 {
951 	dlist_iter iter;
952 	const bool raiseInterrupts = true;
953 	List *connectionList = NIL;
954 
955 	/* asynchronously send SAVEPOINT */
956 	dlist_foreach(iter, &InProgressTransactions)
957 	{
958 		MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
959 													  iter.cur);
960 		RemoteTransaction *transaction = &connection->remoteTransaction;
961 		if (transaction->transactionFailed)
962 		{
963 			continue;
964 		}
965 
966 		StartRemoteTransactionSavepointBegin(connection, subId);
967 		connectionList = lappend(connectionList, connection);
968 	}
969 
970 	WaitForAllConnections(connectionList, raiseInterrupts);
971 
972 	/* and wait for the results */
973 	dlist_foreach(iter, &InProgressTransactions)
974 	{
975 		MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
976 													  iter.cur);
977 		RemoteTransaction *transaction = &connection->remoteTransaction;
978 		if (transaction->transactionFailed)
979 		{
980 			continue;
981 		}
982 
983 		FinishRemoteTransactionSavepointBegin(connection, subId);
984 
985 		if (!transaction->transactionFailed)
986 		{
987 			transaction->lastSuccessfulSubXact = subId;
988 		}
989 	}
990 }
991 
992 
993 /*
994  * CoordinatedRemoteTransactionsSavepointRelease sends the RELEASE SAVEPOINT
995  * command for the given sub-transaction id to all connections participating in
996  * the current transaction.
997  */
998 void
CoordinatedRemoteTransactionsSavepointRelease(SubTransactionId subId)999 CoordinatedRemoteTransactionsSavepointRelease(SubTransactionId subId)
1000 {
1001 	dlist_iter iter;
1002 	const bool raiseInterrupts = true;
1003 	List *connectionList = NIL;
1004 
1005 	/* asynchronously send RELEASE SAVEPOINT */
1006 	dlist_foreach(iter, &InProgressTransactions)
1007 	{
1008 		MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
1009 													  iter.cur);
1010 		RemoteTransaction *transaction = &connection->remoteTransaction;
1011 		if (transaction->transactionFailed)
1012 		{
1013 			continue;
1014 		}
1015 
1016 		StartRemoteTransactionSavepointRelease(connection, subId);
1017 		connectionList = lappend(connectionList, connection);
1018 	}
1019 
1020 	WaitForAllConnections(connectionList, raiseInterrupts);
1021 
1022 	/* and wait for the results */
1023 	dlist_foreach(iter, &InProgressTransactions)
1024 	{
1025 		MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
1026 													  iter.cur);
1027 		RemoteTransaction *transaction = &connection->remoteTransaction;
1028 		if (transaction->transactionFailed)
1029 		{
1030 			continue;
1031 		}
1032 
1033 		FinishRemoteTransactionSavepointRelease(connection, subId);
1034 	}
1035 }
1036 
1037 
1038 /*
1039  * CoordinatedRemoteTransactionsSavepointRollback sends the ROLLBACK TO SAVEPOINT
1040  * command for the given sub-transaction id to all connections participating in
1041  * the current transaction.
1042  */
1043 void
CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId)1044 CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId)
1045 {
1046 	dlist_iter iter;
1047 	const bool raiseInterrupts = false;
1048 	List *connectionList = NIL;
1049 
1050 	/* asynchronously send ROLLBACK TO SAVEPOINT */
1051 	dlist_foreach(iter, &InProgressTransactions)
1052 	{
1053 		MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
1054 													  iter.cur);
1055 		RemoteTransaction *transaction = &connection->remoteTransaction;
1056 
1057 		/* cancel any ongoing queries before issuing rollback */
1058 		SendCancelationRequest(connection);
1059 
1060 		/* clear results, but don't show cancelation warning messages from workers. */
1061 		ClearResultsDiscardWarnings(connection, raiseInterrupts);
1062 
1063 		if (transaction->transactionFailed)
1064 		{
1065 			if (transaction->lastSuccessfulSubXact <= subId)
1066 			{
1067 				transaction->transactionRecovering = true;
1068 
1069 				/*
1070 				 * Clear the results of the failed query so we can send the ROLLBACK
1071 				 * TO SAVEPOINT command for a savepoint that can recover the transaction
1072 				 * from failure.
1073 				 */
1074 				ForgetResults(connection);
1075 			}
1076 			else
1077 			{
1078 				continue;
1079 			}
1080 		}
1081 		StartRemoteTransactionSavepointRollback(connection, subId);
1082 		connectionList = lappend(connectionList, connection);
1083 	}
1084 
1085 	WaitForAllConnections(connectionList, raiseInterrupts);
1086 
1087 	/* and wait for the results */
1088 	dlist_foreach(iter, &InProgressTransactions)
1089 	{
1090 		MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
1091 													  iter.cur);
1092 		RemoteTransaction *transaction = &connection->remoteTransaction;
1093 		if (transaction->transactionFailed && !transaction->transactionRecovering)
1094 		{
1095 			continue;
1096 		}
1097 
1098 		FinishRemoteTransactionSavepointRollback(connection, subId);
1099 
1100 		/*
1101 		 * We unclaim the connection now so it can be used again when
1102 		 * continuing after the ROLLBACK TO SAVEPOINT.
1103 		 * XXX: We do not undo our hadDML/hadDDL flags. This could result in
1104 		 * some queries not being allowed on Citus that would actually be fine
1105 		 * to execute.  Changing this would require us to keep track for each
1106 		 * savepoint which placement connections had DDL/DML executed at that
1107 		 * point and if they were already. We also do not call
1108 		 * ResetShardPlacementAssociation. This might result in suboptimal
1109 		 * parallelism, because of placement associations that are not really
1110 		 * necessary anymore because of ROLLBACK TO SAVEPOINT. To change this
1111 		 * we would need to keep track of when a connection becomes associated
1112 		 * to a placement.
1113 		 */
1114 		UnclaimConnection(connection);
1115 	}
1116 }
1117 
1118 
1119 /*
1120  * StartRemoteTransactionSavepointBegin initiates SAVEPOINT command for the given
1121  * subtransaction id in a non-blocking manner.
1122  */
1123 static void
StartRemoteTransactionSavepointBegin(MultiConnection * connection,SubTransactionId subId)1124 StartRemoteTransactionSavepointBegin(MultiConnection *connection, SubTransactionId subId)
1125 {
1126 	const bool raiseErrors = true;
1127 	StringInfo savepointCommand = makeStringInfo();
1128 	appendStringInfo(savepointCommand, "SAVEPOINT savepoint_%u", subId);
1129 
1130 	if (!SendRemoteCommand(connection, savepointCommand->data))
1131 	{
1132 		HandleRemoteTransactionConnectionError(connection, raiseErrors);
1133 	}
1134 }
1135 
1136 
1137 /*
1138  * FinishRemoteTransactionSavepointBegin finishes the work
1139  * StartRemoteTransactionSavepointBegin initiated. It blocks if necessary (i.e.
1140  * if PQisBusy() would return true).
1141  */
1142 static void
FinishRemoteTransactionSavepointBegin(MultiConnection * connection,SubTransactionId subId)1143 FinishRemoteTransactionSavepointBegin(MultiConnection *connection, SubTransactionId subId)
1144 {
1145 	const bool raiseErrors = true;
1146 	PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
1147 	if (!IsResponseOK(result))
1148 	{
1149 		HandleRemoteTransactionResultError(connection, result, raiseErrors);
1150 	}
1151 
1152 	PQclear(result);
1153 	ForgetResults(connection);
1154 }
1155 
1156 
1157 /*
1158  * StartRemoteTransactionSavepointRelease initiates RELEASE SAVEPOINT command for
1159  * the given subtransaction id in a non-blocking manner.
1160  */
1161 static void
StartRemoteTransactionSavepointRelease(MultiConnection * connection,SubTransactionId subId)1162 StartRemoteTransactionSavepointRelease(MultiConnection *connection,
1163 									   SubTransactionId subId)
1164 {
1165 	const bool raiseErrors = true;
1166 	StringInfo savepointCommand = makeStringInfo();
1167 	appendStringInfo(savepointCommand, "RELEASE SAVEPOINT savepoint_%u", subId);
1168 
1169 	if (!SendRemoteCommand(connection, savepointCommand->data))
1170 	{
1171 		HandleRemoteTransactionConnectionError(connection, raiseErrors);
1172 	}
1173 }
1174 
1175 
1176 /*
1177  * FinishRemoteTransactionSavepointRelease finishes the work
1178  * StartRemoteTransactionSavepointRelease initiated. It blocks if necessary (i.e.
1179  * if PQisBusy() would return true).
1180  */
1181 static void
FinishRemoteTransactionSavepointRelease(MultiConnection * connection,SubTransactionId subId)1182 FinishRemoteTransactionSavepointRelease(MultiConnection *connection,
1183 										SubTransactionId subId)
1184 {
1185 	const bool raiseErrors = true;
1186 	PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
1187 	if (!IsResponseOK(result))
1188 	{
1189 		HandleRemoteTransactionResultError(connection, result, raiseErrors);
1190 	}
1191 
1192 	PQclear(result);
1193 	ForgetResults(connection);
1194 }
1195 
1196 
1197 /*
1198  * StartRemoteTransactionSavepointRollback initiates ROLLBACK TO SAVEPOINT command
1199  * for the given subtransaction id in a non-blocking manner.
1200  */
1201 static void
StartRemoteTransactionSavepointRollback(MultiConnection * connection,SubTransactionId subId)1202 StartRemoteTransactionSavepointRollback(MultiConnection *connection,
1203 										SubTransactionId subId)
1204 {
1205 	const bool raiseErrors = false;
1206 	StringInfo savepointCommand = makeStringInfo();
1207 	appendStringInfo(savepointCommand, "ROLLBACK TO SAVEPOINT savepoint_%u", subId);
1208 
1209 	if (!SendRemoteCommand(connection, savepointCommand->data))
1210 	{
1211 		HandleRemoteTransactionConnectionError(connection, raiseErrors);
1212 	}
1213 }
1214 
1215 
1216 /*
1217  * FinishRemoteTransactionSavepointRollback finishes the work
1218  * StartRemoteTransactionSavepointRollback initiated. It blocks if necessary (i.e.
1219  * if PQisBusy() would return true). It also recovers the transaction from failure
1220  * if transaction is recovering and the rollback command succeeds.
1221  */
1222 static void
FinishRemoteTransactionSavepointRollback(MultiConnection * connection,SubTransactionId subId)1223 FinishRemoteTransactionSavepointRollback(MultiConnection *connection, SubTransactionId
1224 										 subId)
1225 {
1226 	const bool raiseErrors = false;
1227 	RemoteTransaction *transaction = &connection->remoteTransaction;
1228 
1229 	PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
1230 	if (!IsResponseOK(result))
1231 	{
1232 		HandleRemoteTransactionResultError(connection, result, raiseErrors);
1233 	}
1234 
1235 	/* ROLLBACK TO SAVEPOINT succeeded, check if it recovers the transaction */
1236 	else if (transaction->transactionRecovering)
1237 	{
1238 		transaction->transactionFailed = false;
1239 		transaction->transactionRecovering = false;
1240 	}
1241 
1242 	PQclear(result);
1243 	ForgetResults(connection);
1244 
1245 	/* reset transaction state so the executor can accept next commands in transaction */
1246 	transaction->transactionState = REMOTE_TRANS_STARTED;
1247 }
1248 
1249 
1250 /*
1251  * CheckRemoteTransactionsHealth checks if any of the participating transactions in a
1252  * coordinated transaction failed, and what consequence that should have.
1253  * This needs to be called before the coordinated transaction commits (but
1254  * after they've been PREPAREd if 2PC is in use).
1255  */
1256 void
CheckRemoteTransactionsHealth(void)1257 CheckRemoteTransactionsHealth(void)
1258 {
1259 	dlist_iter iter;
1260 
1261 	dlist_foreach(iter, &InProgressTransactions)
1262 	{
1263 		MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
1264 													  iter.cur);
1265 		RemoteTransaction *transaction = &connection->remoteTransaction;
1266 		PGTransactionStatusType status = PQtransactionStatus(connection->pgConn);
1267 
1268 		/* if the connection is in a bad state, so is the transaction's state */
1269 		if (status == PQTRANS_INERROR || status == PQTRANS_UNKNOWN)
1270 		{
1271 			transaction->transactionFailed = true;
1272 		}
1273 
1274 		/*
1275 		 * If a critical connection is marked as failed (and no error has been
1276 		 * raised yet) do so now.
1277 		 */
1278 		if (transaction->transactionFailed && transaction->transactionCritical)
1279 		{
1280 			ereport(ERROR, (errmsg("failure on connection marked as essential: %s:%d",
1281 								   connection->hostname, connection->port)));
1282 		}
1283 	}
1284 }
1285 
1286 
1287 /*
1288  * Assign2PCIdentifier computes the 2PC transaction name to use for a
1289  * transaction. Every prepared transaction should get a new name, i.e. this
1290  * function will need to be called again.
1291  *
1292  * The format of the name is:
1293  *
1294  * citus_<source group>_<pid>_<distributed transaction number>_<connection number>
1295  *
1296  * (at most 5+1+10+1+10+1+20+1+10 = 59 characters, while limit is 64)
1297  *
1298  * The source group is used to distinguish 2PCs started by different
1299  * coordinators. A coordinator will only attempt to recover its own 2PCs.
1300  *
1301  * The pid is used to distinguish different processes on the coordinator, mainly
1302  * to provide some entropy across restarts.
1303  *
1304  * The distributed transaction number is used to distinguish different
1305  * transactions originating from the same node (since restart).
1306  *
1307  * The connection number is used to distinguish connections made to a node
1308  * within the same transaction.
1309  *
1310  */
1311 static void
Assign2PCIdentifier(MultiConnection * connection)1312 Assign2PCIdentifier(MultiConnection *connection)
1313 {
1314 	/* local sequence number used to distinguish different connections */
1315 	static uint32 connectionNumber = 0;
1316 
1317 	/* transaction identifier that is unique across processes */
1318 	uint64 transactionNumber = CurrentDistributedTransactionNumber();
1319 
1320 	/* print all numbers as unsigned to guarantee no minus symbols appear in the name */
1321 	SafeSnprintf(connection->remoteTransaction.preparedName, NAMEDATALEN,
1322 				 PREPARED_TRANSACTION_NAME_FORMAT, GetLocalGroupId(), MyProcPid,
1323 				 transactionNumber, connectionNumber++);
1324 }
1325 
1326 
1327 /*
1328  * ParsePreparedTransactionName parses a prepared transaction name to extract
1329  * the initiator group ID, initiator process ID, distributed transaction number,
1330  * and the connection number. If the transaction name does not match the expected
1331  * format ParsePreparedTransactionName returns false, and true otherwise.
1332  */
1333 bool
ParsePreparedTransactionName(char * preparedTransactionName,int32 * groupId,int * procId,uint64 * transactionNumber,uint32 * connectionNumber)1334 ParsePreparedTransactionName(char *preparedTransactionName,
1335 							 int32 *groupId, int *procId,
1336 							 uint64 *transactionNumber,
1337 							 uint32 *connectionNumber)
1338 {
1339 	char *currentCharPointer = preparedTransactionName;
1340 
1341 	currentCharPointer = strchr(currentCharPointer, '_');
1342 	if (currentCharPointer == NULL)
1343 	{
1344 		return false;
1345 	}
1346 
1347 	/* step ahead of the current '_' character */
1348 	++currentCharPointer;
1349 
1350 	*groupId = strtol(currentCharPointer, NULL, 10);
1351 
1352 	if ((*groupId == COORDINATOR_GROUP_ID && errno == EINVAL) ||
1353 		(*groupId == INT_MAX && errno == ERANGE))
1354 	{
1355 		return false;
1356 	}
1357 
1358 	currentCharPointer = strchr(currentCharPointer, '_');
1359 	if (currentCharPointer == NULL)
1360 	{
1361 		return false;
1362 	}
1363 
1364 	/* step ahead of the current '_' character */
1365 	++currentCharPointer;
1366 
1367 	*procId = strtol(currentCharPointer, NULL, 10);
1368 	if ((*procId == 0 && errno == EINVAL) ||
1369 		(*procId == INT_MAX && errno == ERANGE))
1370 	{
1371 		return false;
1372 	}
1373 
1374 	currentCharPointer = strchr(currentCharPointer, '_');
1375 	if (currentCharPointer == NULL)
1376 	{
1377 		return false;
1378 	}
1379 
1380 	/* step ahead of the current '_' character */
1381 	++currentCharPointer;
1382 
1383 	*transactionNumber = pg_strtouint64(currentCharPointer, NULL, 10);
1384 	if ((*transactionNumber == 0 && errno != 0) ||
1385 		(*transactionNumber == ULLONG_MAX && errno == ERANGE))
1386 	{
1387 		return false;
1388 	}
1389 
1390 	currentCharPointer = strchr(currentCharPointer, '_');
1391 	if (currentCharPointer == NULL)
1392 	{
1393 		return false;
1394 	}
1395 
1396 	/* step ahead of the current '_' character */
1397 	++currentCharPointer;
1398 
1399 	*connectionNumber = strtoul(currentCharPointer, NULL, 10);
1400 	if ((*connectionNumber == 0 && errno == EINVAL) ||
1401 		(*connectionNumber == UINT_MAX && errno == ERANGE))
1402 	{
1403 		return false;
1404 	}
1405 
1406 	return true;
1407 }
1408