1 /*-------------------------------------------------------------------------
2 *
3 * remote_transaction.c
4 * Management of transaction spanning more than one node.
5 *
6 * Copyright (c) Citus Data, Inc.
7 *
8 *-------------------------------------------------------------------------
9 */
10
11 #include "postgres.h"
12
13 #include "libpq-fe.h"
14
15 #include "miscadmin.h"
16
17 #include "access/xact.h"
18 #include "distributed/backend_data.h"
19 #include "distributed/citus_safe_lib.h"
20 #include "distributed/connection_management.h"
21 #include "distributed/listutils.h"
22 #include "distributed/metadata_cache.h"
23 #include "distributed/placement_connection.h"
24 #include "distributed/remote_commands.h"
25 #include "distributed/remote_transaction.h"
26 #include "distributed/transaction_identifier.h"
27 #include "distributed/transaction_management.h"
28 #include "distributed/transaction_recovery.h"
29 #include "distributed/worker_manager.h"
30 #include "utils/builtins.h"
31 #include "utils/hsearch.h"
32
33
34 #define PREPARED_TRANSACTION_NAME_FORMAT "citus_%u_%u_"UINT64_FORMAT "_%u"
35
36
37 static void StartRemoteTransactionSavepointBegin(MultiConnection *connection,
38 SubTransactionId subId);
39 static void FinishRemoteTransactionSavepointBegin(MultiConnection *connection,
40 SubTransactionId subId);
41 static void StartRemoteTransactionSavepointRelease(MultiConnection *connection,
42 SubTransactionId subId);
43 static void FinishRemoteTransactionSavepointRelease(MultiConnection *connection,
44 SubTransactionId subId);
45 static void StartRemoteTransactionSavepointRollback(MultiConnection *connection,
46 SubTransactionId subId);
47 static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection,
48 SubTransactionId subId);
49
50 static void Assign2PCIdentifier(MultiConnection *connection);
51
52
53 /*
54 * StartRemoteTransactionBeging initiates beginning the remote transaction in
55 * a non-blocking manner. The function sends "BEGIN" followed by
56 * assign_distributed_transaction_id() to assign the distributed transaction
57 * id on the remote node.
58 */
59 void
StartRemoteTransactionBegin(struct MultiConnection * connection)60 StartRemoteTransactionBegin(struct MultiConnection *connection)
61 {
62 RemoteTransaction *transaction = &connection->remoteTransaction;
63
64 Assert(transaction->transactionState == REMOTE_TRANS_NOT_STARTED);
65
66 /* remember transaction as being in-progress */
67 dlist_push_tail(&InProgressTransactions, &connection->transactionNode);
68
69 transaction->transactionState = REMOTE_TRANS_STARTING;
70
71 StringInfo beginAndSetDistributedTransactionId =
72 BeginAndSetDistributedTransactionIdCommand();
73
74 /* append context for in-progress SAVEPOINTs for this transaction */
75 List *activeSubXacts = ActiveSubXactContexts();
76 transaction->lastSuccessfulSubXact = TopSubTransactionId;
77 transaction->lastQueuedSubXact = TopSubTransactionId;
78
79 SubXactContext *subXactState = NULL;
80 foreach_ptr(subXactState, activeSubXacts)
81 {
82 /* append SET LOCAL state from when SAVEPOINT was encountered... */
83 if (subXactState->setLocalCmds != NULL)
84 {
85 appendStringInfoString(beginAndSetDistributedTransactionId,
86 subXactState->setLocalCmds->data);
87 }
88
89 /* ... then append SAVEPOINT to enter this subxact */
90 appendStringInfo(beginAndSetDistributedTransactionId,
91 "SAVEPOINT savepoint_%u;", subXactState->subId);
92 transaction->lastQueuedSubXact = subXactState->subId;
93 }
94
95 /* we've pushed into deepest subxact: apply in-progress SET context */
96 if (activeSetStmts != NULL)
97 {
98 appendStringInfoString(beginAndSetDistributedTransactionId, activeSetStmts->data);
99 }
100
101 if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data))
102 {
103 const bool raiseErrors = true;
104
105 HandleRemoteTransactionConnectionError(connection, raiseErrors);
106 }
107
108 transaction->beginSent = true;
109 }
110
111
112 /*
113 * BeginAndSetDistributedTransactionIdCommand returns a command which starts
114 * a transaction and assigns the current distributed transaction id.
115 */
116 StringInfo
BeginAndSetDistributedTransactionIdCommand(void)117 BeginAndSetDistributedTransactionIdCommand(void)
118 {
119 StringInfo beginAndSetDistributedTransactionId = makeStringInfo();
120
121 /*
122 * Explicitly specify READ COMMITTED, the default on the remote
123 * side might have been changed, and that would cause problematic
124 * behaviour.
125 */
126 appendStringInfoString(beginAndSetDistributedTransactionId,
127 "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;");
128
129 /*
130 * Append BEGIN and assign_distributed_transaction_id() statements into a single command
131 * and send both in one step. The reason is purely performance, we don't want
132 * seperate roundtrips for these two statements.
133 */
134 DistributedTransactionId *distributedTransactionId =
135 GetCurrentDistributedTransactionId();
136 const char *timestamp = timestamptz_to_str(distributedTransactionId->timestamp);
137 appendStringInfo(beginAndSetDistributedTransactionId,
138 "SELECT assign_distributed_transaction_id(%d, " UINT64_FORMAT
139 ", '%s');",
140 distributedTransactionId->initiatorNodeIdentifier,
141 distributedTransactionId->transactionNumber,
142 timestamp);
143
144 return beginAndSetDistributedTransactionId;
145 }
146
147
148 /*
149 * FinishRemoteTransactionBegin finishes the work StartRemoteTransactionBegin
150 * initiated. It blocks if necessary (i.e. if PQisBusy() would return true).
151 */
152 void
FinishRemoteTransactionBegin(struct MultiConnection * connection)153 FinishRemoteTransactionBegin(struct MultiConnection *connection)
154 {
155 RemoteTransaction *transaction = &connection->remoteTransaction;
156 bool raiseErrors = true;
157
158 Assert(transaction->transactionState == REMOTE_TRANS_STARTING);
159
160 bool clearSuccessful = ClearResults(connection, raiseErrors);
161 if (clearSuccessful)
162 {
163 transaction->transactionState = REMOTE_TRANS_STARTED;
164 transaction->lastSuccessfulSubXact = transaction->lastQueuedSubXact;
165 }
166
167 if (!transaction->transactionFailed)
168 {
169 Assert(PQtransactionStatus(connection->pgConn) == PQTRANS_INTRANS);
170 }
171 }
172
173
174 /*
175 * RemoteTransactionBegin begins a remote transaction in a blocking manner.
176 */
177 void
RemoteTransactionBegin(struct MultiConnection * connection)178 RemoteTransactionBegin(struct MultiConnection *connection)
179 {
180 StartRemoteTransactionBegin(connection);
181 FinishRemoteTransactionBegin(connection);
182 }
183
184
185 /*
186 * RemoteTransactionListBegin sends BEGIN over all connections in the
187 * given connection list and waits for all of them to finish.
188 */
189 void
RemoteTransactionListBegin(List * connectionList)190 RemoteTransactionListBegin(List *connectionList)
191 {
192 MultiConnection *connection = NULL;
193
194 /* send BEGIN to all nodes */
195 foreach_ptr(connection, connectionList)
196 {
197 StartRemoteTransactionBegin(connection);
198 }
199
200 /* wait for BEGIN to finish on all nodes */
201 foreach_ptr(connection, connectionList)
202 {
203 FinishRemoteTransactionBegin(connection);
204 }
205 }
206
207
208 /*
209 * StartRemoteTransactionCommit initiates transaction commit in a non-blocking
210 * manner. If the transaction is in a failed state, it'll instead get rolled
211 * back.
212 */
213 void
StartRemoteTransactionCommit(MultiConnection * connection)214 StartRemoteTransactionCommit(MultiConnection *connection)
215 {
216 RemoteTransaction *transaction = &connection->remoteTransaction;
217 const bool raiseErrors = false;
218
219 /* can only commit if transaction is in progress */
220 Assert(transaction->transactionState != REMOTE_TRANS_NOT_STARTED);
221
222 /* can't commit if we already started to commit or abort */
223 Assert(transaction->transactionState < REMOTE_TRANS_1PC_ABORTING);
224
225 if (transaction->transactionFailed)
226 {
227 /* abort the transaction if it failed */
228 transaction->transactionState = REMOTE_TRANS_1PC_ABORTING;
229
230 /*
231 * Try sending an ROLLBACK; Depending on the state that won't
232 * succeed, but let's try. Have to clear previous results
233 * first.
234 */
235 ForgetResults(connection); /* try to clear pending stuff */
236 if (!SendRemoteCommand(connection, "ROLLBACK"))
237 {
238 /* no point in reporting a likely redundant message */
239 }
240 }
241 else if (transaction->transactionState == REMOTE_TRANS_PREPARED)
242 {
243 /* commit the prepared transaction */
244 StringInfoData command;
245
246 initStringInfo(&command);
247 appendStringInfo(&command, "COMMIT PREPARED %s",
248 quote_literal_cstr(transaction->preparedName));
249
250 transaction->transactionState = REMOTE_TRANS_2PC_COMMITTING;
251
252 if (!SendRemoteCommand(connection, command.data))
253 {
254 HandleRemoteTransactionConnectionError(connection, raiseErrors);
255 }
256 }
257 else
258 {
259 /* initiate remote transaction commit */
260 transaction->transactionState = REMOTE_TRANS_1PC_COMMITTING;
261
262 if (!SendRemoteCommand(connection, "COMMIT"))
263 {
264 /*
265 * For a moment there I thought we were in trouble.
266 *
267 * Failing in this state means that we don't know whether the
268 * commit has succeeded.
269 */
270 HandleRemoteTransactionConnectionError(connection, raiseErrors);
271 }
272 }
273 }
274
275
276 /*
277 * FinishRemoteTransactionCommit finishes the work
278 * StartRemoteTransactionCommit initiated. It blocks if necessary (i.e. if
279 * PQisBusy() would return true).
280 */
281 void
FinishRemoteTransactionCommit(MultiConnection * connection)282 FinishRemoteTransactionCommit(MultiConnection *connection)
283 {
284 RemoteTransaction *transaction = &connection->remoteTransaction;
285 const bool raiseErrors = false;
286
287 Assert(transaction->transactionState == REMOTE_TRANS_1PC_ABORTING ||
288 transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING ||
289 transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING);
290
291 PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
292
293 if (!IsResponseOK(result))
294 {
295 HandleRemoteTransactionResultError(connection, result, raiseErrors);
296
297 /*
298 * Failing in this state means that we will often not know whether
299 * the commit has succeeded (particularly in case of network
300 * troubles).
301 *
302 * XXX: It might be worthwhile to discern cases where we got a
303 * proper error back from postgres (i.e. COMMIT was received but
304 * produced an error) from cases where the connection failed
305 * before getting a reply.
306 */
307
308 if (transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING)
309 {
310 ereport(WARNING, (errmsg("failed to commit transaction on %s:%d",
311 connection->hostname, connection->port)));
312 }
313 else if (transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING)
314 {
315 ereport(WARNING, (errmsg("failed to commit transaction on %s:%d",
316 connection->hostname, connection->port)));
317 }
318 }
319 else if (transaction->transactionState == REMOTE_TRANS_1PC_ABORTING ||
320 transaction->transactionState == REMOTE_TRANS_2PC_ABORTING)
321 {
322 transaction->transactionState = REMOTE_TRANS_ABORTED;
323 }
324 else
325 {
326 transaction->transactionState = REMOTE_TRANS_COMMITTED;
327 }
328
329 PQclear(result);
330
331 ForgetResults(connection);
332 }
333
334
335 /*
336 * RemoteTransactionCommit commits (or aborts, if the transaction failed) a
337 * remote transaction in a blocking manner.
338 */
339 void
RemoteTransactionCommit(MultiConnection * connection)340 RemoteTransactionCommit(MultiConnection *connection)
341 {
342 StartRemoteTransactionCommit(connection);
343 FinishRemoteTransactionCommit(connection);
344 }
345
346
347 /*
348 * StartRemoteTransactionAbort initiates abortin the transaction in a
349 * non-blocking manner.
350 */
351 void
StartRemoteTransactionAbort(MultiConnection * connection)352 StartRemoteTransactionAbort(MultiConnection *connection)
353 {
354 RemoteTransaction *transaction = &connection->remoteTransaction;
355 const bool raiseErrors = false;
356
357 Assert(transaction->transactionState != REMOTE_TRANS_NOT_STARTED);
358
359 /*
360 * Clear previous results, so we have a better chance to send ROLLBACK
361 * [PREPARED]. If we've previously sent a PREPARE TRANSACTION, we always
362 * want to wait for that result, as that shouldn't take long and will
363 * reserve resources. But if there's another query running, we don't want
364 * to wait, because a long running statement may be running, so force it to
365 * be killed in that case.
366 */
367 if (transaction->transactionState == REMOTE_TRANS_PREPARING ||
368 transaction->transactionState == REMOTE_TRANS_PREPARED)
369 {
370 StringInfoData command;
371
372 /* await PREPARE TRANSACTION results, closing the connection would leave it dangling */
373 ForgetResults(connection);
374
375 initStringInfo(&command);
376 appendStringInfo(&command, "ROLLBACK PREPARED %s",
377 quote_literal_cstr(transaction->preparedName));
378
379 if (!SendRemoteCommand(connection, command.data))
380 {
381 HandleRemoteTransactionConnectionError(connection, raiseErrors);
382 }
383 else
384 {
385 transaction->transactionState = REMOTE_TRANS_2PC_ABORTING;
386 }
387 }
388 else
389 {
390 /*
391 * In case of a cancellation, the connection might still be working
392 * on some commands. Try to consume the results such that the
393 * connection can be reused, but do not want to wait for commands
394 * to finish. Instead we just close the connection if the command
395 * is still busy.
396 */
397 if (!ClearResultsIfReady(connection))
398 {
399 ShutdownConnection(connection);
400
401 /* FinishRemoteTransactionAbort will emit warning */
402 return;
403 }
404
405 if (!SendRemoteCommand(connection, "ROLLBACK"))
406 {
407 /* no point in reporting a likely redundant message */
408 MarkRemoteTransactionFailed(connection, raiseErrors);
409 }
410 else
411 {
412 transaction->transactionState = REMOTE_TRANS_1PC_ABORTING;
413 }
414 }
415 }
416
417
418 /*
419 * FinishRemoteTransactionAbort finishes the work StartRemoteTransactionAbort
420 * initiated. It blocks if necessary (i.e. if PQisBusy() would return true).
421 */
422 void
FinishRemoteTransactionAbort(MultiConnection * connection)423 FinishRemoteTransactionAbort(MultiConnection *connection)
424 {
425 RemoteTransaction *transaction = &connection->remoteTransaction;
426 const bool raiseErrors = false;
427
428 if (transaction->transactionState == REMOTE_TRANS_2PC_ABORTING)
429 {
430 PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
431 if (!IsResponseOK(result))
432 {
433 HandleRemoteTransactionResultError(connection, result, raiseErrors);
434 }
435
436 PQclear(result);
437 }
438
439 /*
440 * Try to consume results of any in-progress commands. In the 1PC case
441 * this is also where we consume the result of the ROLLBACK.
442 *
443 * If we don't succeed the connection will be in a bad state, so we close it.
444 */
445 if (!ClearResults(connection, raiseErrors))
446 {
447 ShutdownConnection(connection);
448 }
449
450 transaction->transactionState = REMOTE_TRANS_ABORTED;
451 }
452
453
454 /*
455 * RemoteTransactionAbort aborts a remote transaction in a blocking manner.
456 */
457 void
RemoteTransactionAbort(MultiConnection * connection)458 RemoteTransactionAbort(MultiConnection *connection)
459 {
460 StartRemoteTransactionAbort(connection);
461 FinishRemoteTransactionAbort(connection);
462 }
463
464
465 /*
466 * StartRemoteTransactionPrepare initiates preparing the transaction in a
467 * non-blocking manner.
468 */
469 void
StartRemoteTransactionPrepare(struct MultiConnection * connection)470 StartRemoteTransactionPrepare(struct MultiConnection *connection)
471 {
472 RemoteTransaction *transaction = &connection->remoteTransaction;
473 StringInfoData command;
474 const bool raiseErrors = true;
475
476 /* can't prepare a nonexistant transaction */
477 Assert(transaction->transactionState != REMOTE_TRANS_NOT_STARTED);
478
479 /* can't prepare in a failed transaction */
480 Assert(!transaction->transactionFailed);
481
482 /* can't prepare if already started to prepare/abort/commit */
483 Assert(transaction->transactionState < REMOTE_TRANS_PREPARING);
484
485 Assign2PCIdentifier(connection);
486
487 /* log transactions to workers in pg_dist_transaction */
488 WorkerNode *workerNode = FindWorkerNode(connection->hostname, connection->port);
489 if (workerNode != NULL)
490 {
491 LogTransactionRecord(workerNode->groupId, transaction->preparedName);
492 }
493
494 initStringInfo(&command);
495 appendStringInfo(&command, "PREPARE TRANSACTION %s",
496 quote_literal_cstr(transaction->preparedName));
497
498 if (!SendRemoteCommand(connection, command.data))
499 {
500 HandleRemoteTransactionConnectionError(connection, raiseErrors);
501 }
502 else
503 {
504 transaction->transactionState = REMOTE_TRANS_PREPARING;
505 }
506 }
507
508
509 /*
510 * FinishRemoteTransactionPrepare finishes the work
511 * StartRemoteTransactionPrepare initiated. It blocks if necessary (i.e. if
512 * PQisBusy() would return true).
513 */
514 void
FinishRemoteTransactionPrepare(struct MultiConnection * connection)515 FinishRemoteTransactionPrepare(struct MultiConnection *connection)
516 {
517 RemoteTransaction *transaction = &connection->remoteTransaction;
518 const bool raiseErrors = true;
519
520 Assert(transaction->transactionState == REMOTE_TRANS_PREPARING);
521
522 PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
523
524 if (!IsResponseOK(result))
525 {
526 transaction->transactionState = REMOTE_TRANS_ABORTED;
527 HandleRemoteTransactionResultError(connection, result, raiseErrors);
528 }
529 else
530 {
531 transaction->transactionState = REMOTE_TRANS_PREPARED;
532 }
533
534 PQclear(result);
535
536 /*
537 * Try to consume results of PREPARE TRANSACTION command. If we don't
538 * succeed, rollback the transaction. Note that we've not committed on
539 * any node yet, and we're not sure about the state of the worker node.
540 * So rollbacking seems to be the safest action if the worker is
541 * in a state where it can actually rollback.
542 */
543 if (!ClearResults(connection, raiseErrors))
544 {
545 ereport(ERROR, (errmsg("failed to prepare transaction '%s' on host %s:%d",
546 transaction->preparedName, connection->hostname,
547 connection->port),
548 errhint("Try re-running the command.")));
549 }
550 }
551
552
553 /*
554 * RemoteTransactionBeginIfNecessary is a convenience wrapper around
555 * RemoteTransactionsBeginIfNecessary(), for a single connection.
556 */
557 void
RemoteTransactionBeginIfNecessary(MultiConnection * connection)558 RemoteTransactionBeginIfNecessary(MultiConnection *connection)
559 {
560 /* just delegate */
561 if (InCoordinatedTransaction())
562 {
563 List *connectionList = list_make1(connection);
564
565 RemoteTransactionsBeginIfNecessary(connectionList);
566 list_free(connectionList);
567 }
568 }
569
570
571 /*
572 * RemoteTransactionsBeginIfNecessary begins, if necessary according to this
573 * session's coordinated transaction state, and the remote transaction's
574 * state, an explicit transaction on all the connections. This is done in
575 * parallel, to lessen latency penalties.
576 */
577 void
RemoteTransactionsBeginIfNecessary(List * connectionList)578 RemoteTransactionsBeginIfNecessary(List *connectionList)
579 {
580 MultiConnection *connection = NULL;
581
582 /*
583 * Don't do anything if not in a coordinated transaction. That allows the
584 * same code to work both in situations that uses transactions, and when
585 * not.
586 */
587 if (!InCoordinatedTransaction())
588 {
589 return;
590 }
591
592 /* issue BEGIN to all connections needing it */
593 foreach_ptr(connection, connectionList)
594 {
595 RemoteTransaction *transaction = &connection->remoteTransaction;
596
597 /* can't send BEGIN if a command already is in progress */
598 Assert(PQtransactionStatus(connection->pgConn) != PQTRANS_ACTIVE);
599
600 /*
601 * If a transaction already is in progress (including having failed),
602 * don't start it again. That's quite normal if a piece of code allows
603 * cached connections.
604 */
605 if (transaction->transactionState != REMOTE_TRANS_NOT_STARTED)
606 {
607 continue;
608 }
609
610 StartRemoteTransactionBegin(connection);
611 }
612
613 bool raiseInterrupts = true;
614 WaitForAllConnections(connectionList, raiseInterrupts);
615
616 /* get result of all the BEGINs */
617 foreach_ptr(connection, connectionList)
618 {
619 RemoteTransaction *transaction = &connection->remoteTransaction;
620
621 /*
622 * Only handle BEGIN results on connections that are in process of
623 * starting a transaction, and haven't already failed (e.g. by not
624 * being able to send BEGIN due to a network failure).
625 */
626 if (transaction->transactionFailed ||
627 transaction->transactionState != REMOTE_TRANS_STARTING)
628 {
629 continue;
630 }
631
632 FinishRemoteTransactionBegin(connection);
633 }
634 }
635
636
637 /*
638 * HandleRemoteTransactionConnectionError records a transaction as having failed
639 * and throws a connection error if the transaction was critical and raiseErrors
640 * is true, or a warning otherwise.
641 */
642 void
HandleRemoteTransactionConnectionError(MultiConnection * connection,bool raiseErrors)643 HandleRemoteTransactionConnectionError(MultiConnection *connection, bool raiseErrors)
644 {
645 RemoteTransaction *transaction = &connection->remoteTransaction;
646
647 transaction->transactionFailed = true;
648
649 if (transaction->transactionCritical && raiseErrors)
650 {
651 ReportConnectionError(connection, ERROR);
652 }
653 else
654 {
655 ReportConnectionError(connection, WARNING);
656 }
657 }
658
659
660 /*
661 * HandleRemoteTransactionResultError records a transaction as having failed
662 * and throws a result error if the transaction was critical and raiseErrors
663 * is true, or a warning otherwise.
664 */
665 void
HandleRemoteTransactionResultError(MultiConnection * connection,PGresult * result,bool raiseErrors)666 HandleRemoteTransactionResultError(MultiConnection *connection, PGresult *result, bool
667 raiseErrors)
668 {
669 RemoteTransaction *transaction = &connection->remoteTransaction;
670
671 transaction->transactionFailed = true;
672
673 if (transaction->transactionCritical && raiseErrors)
674 {
675 ReportResultError(connection, result, ERROR);
676 }
677 else
678 {
679 ReportResultError(connection, result, WARNING);
680 }
681 }
682
683
684 /*
685 * MarkRemoteTransactionFailed records a transaction as having failed.
686 *
687 * If the connection is marked as critical, and allowErrorPromotion is true,
688 * this routine will ERROR out. The allowErrorPromotion case is primarily
689 * required for the transaction management code itself. Usually it is helpful
690 * to fail as soon as possible. If !allowErrorPromotion transaction commit
691 * will instead issue an error before committing on any node.
692 */
693 void
MarkRemoteTransactionFailed(MultiConnection * connection,bool allowErrorPromotion)694 MarkRemoteTransactionFailed(MultiConnection *connection, bool allowErrorPromotion)
695 {
696 RemoteTransaction *transaction = &connection->remoteTransaction;
697
698 transaction->transactionFailed = true;
699
700 /*
701 * If the connection is marked as critical, fail the entire coordinated
702 * transaction. If allowed.
703 */
704 if (transaction->transactionCritical && allowErrorPromotion)
705 {
706 ereport(ERROR, (errmsg("failure on connection marked as essential: %s:%d",
707 connection->hostname, connection->port)));
708 }
709 }
710
711
712 /*
713 * MarkRemoteTransactionCritical signals that failures on this remote
714 * transaction should fail the entire coordinated transaction.
715 */
716 void
MarkRemoteTransactionCritical(struct MultiConnection * connection)717 MarkRemoteTransactionCritical(struct MultiConnection *connection)
718 {
719 RemoteTransaction *transaction = &connection->remoteTransaction;
720
721 transaction->transactionCritical = true;
722 }
723
724
725 /*
726 * CloseRemoteTransaction handles closing a connection that, potentially, is
727 * part of a coordinated transaction. This should only ever be called from
728 * connection_management.c, while closing a connection during a transaction.
729 */
730 void
CloseRemoteTransaction(struct MultiConnection * connection)731 CloseRemoteTransaction(struct MultiConnection *connection)
732 {
733 RemoteTransaction *transaction = &connection->remoteTransaction;
734
735 /* unlink from list of open transactions, if necessary */
736 if (transaction->transactionState != REMOTE_TRANS_NOT_STARTED)
737 {
738 /* XXX: Should we error out for a critical transaction? */
739
740 dlist_delete(&connection->transactionNode);
741 }
742 }
743
744
745 /*
746 * ResetRemoteTransaction resets the state of the transaction after the end of
747 * the main transaction, if the connection is being reused.
748 */
749 void
ResetRemoteTransaction(struct MultiConnection * connection)750 ResetRemoteTransaction(struct MultiConnection *connection)
751 {
752 RemoteTransaction *transaction = &connection->remoteTransaction;
753
754 /* just reset the entire state, relying on 0 being invalid/false */
755 memset(transaction, 0, sizeof(*transaction));
756 }
757
758
759 /*
760 * CoordinatedRemoteTransactionsPrepare PREPAREs a 2PC transaction on all
761 * non-failed transactions participating in the coordinated transaction.
762 */
763 void
CoordinatedRemoteTransactionsPrepare(void)764 CoordinatedRemoteTransactionsPrepare(void)
765 {
766 dlist_iter iter;
767 List *connectionList = NIL;
768
769 /* issue PREPARE TRANSACTION; to all relevant remote nodes */
770
771 /* asynchronously send PREPARE */
772 dlist_foreach(iter, &InProgressTransactions)
773 {
774 MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
775 iter.cur);
776 RemoteTransaction *transaction = &connection->remoteTransaction;
777
778 Assert(transaction->transactionState != REMOTE_TRANS_NOT_STARTED);
779
780 /* can't PREPARE a transaction that failed */
781 if (transaction->transactionFailed)
782 {
783 continue;
784 }
785
786 /*
787 * Check if any DML or DDL is executed over the connection on any
788 * placement/table. If yes, we start preparing the transaction, otherwise
789 * we skip prepare since the connection didn't perform any write (read-only)
790 */
791 if (ConnectionModifiedPlacement(connection))
792 {
793 StartRemoteTransactionPrepare(connection);
794 connectionList = lappend(connectionList, connection);
795 }
796 }
797
798 bool raiseInterrupts = true;
799 WaitForAllConnections(connectionList, raiseInterrupts);
800
801 /* Wait for result */
802 dlist_foreach(iter, &InProgressTransactions)
803 {
804 MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
805 iter.cur);
806 RemoteTransaction *transaction = &connection->remoteTransaction;
807
808 if (transaction->transactionState != REMOTE_TRANS_PREPARING)
809 {
810 /*
811 * Verify that either the transaction failed, hence we couldn't prepare
812 * or the connection didn't modify any placement
813 */
814 Assert(transaction->transactionFailed ||
815 !ConnectionModifiedPlacement(connection));
816 continue;
817 }
818
819 FinishRemoteTransactionPrepare(connection);
820 }
821
822 CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED;
823 }
824
825
826 /*
827 * CoordinatedRemoteTransactionsCommit performs distributed transactions
828 * handling at commit time. This will be called at XACT_EVENT_PRE_COMMIT if
829 * 1PC commits are used - so shards can still be invalidated - and at
830 * XACT_EVENT_COMMIT if 2PC is being used.
831 *
832 * Note that this routine has to issue rollbacks for failed transactions.
833 */
834 void
CoordinatedRemoteTransactionsCommit(void)835 CoordinatedRemoteTransactionsCommit(void)
836 {
837 dlist_iter iter;
838 List *connectionList = NIL;
839
840 /*
841 * Issue appropriate transaction commands to remote nodes. If everything
842 * went well that's going to be COMMIT or COMMIT PREPARED, if individual
843 * connections had errors, some or all of them might require a ROLLBACK.
844 *
845 * First send the command asynchronously over all connections.
846 */
847 dlist_foreach(iter, &InProgressTransactions)
848 {
849 MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
850 iter.cur);
851 RemoteTransaction *transaction = &connection->remoteTransaction;
852
853 if (transaction->transactionState == REMOTE_TRANS_NOT_STARTED ||
854 transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING ||
855 transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING ||
856 transaction->transactionState == REMOTE_TRANS_COMMITTED ||
857 transaction->transactionState == REMOTE_TRANS_ABORTED)
858 {
859 continue;
860 }
861
862 StartRemoteTransactionCommit(connection);
863 connectionList = lappend(connectionList, connection);
864 }
865
866 bool raiseInterrupts = false;
867 WaitForAllConnections(connectionList, raiseInterrupts);
868
869 /* wait for the replies to the commands to come in */
870 dlist_foreach(iter, &InProgressTransactions)
871 {
872 MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
873 iter.cur);
874 RemoteTransaction *transaction = &connection->remoteTransaction;
875
876 /* nothing to do if not committing / aborting */
877 if (transaction->transactionState != REMOTE_TRANS_1PC_COMMITTING &&
878 transaction->transactionState != REMOTE_TRANS_2PC_COMMITTING &&
879 transaction->transactionState != REMOTE_TRANS_1PC_ABORTING &&
880 transaction->transactionState != REMOTE_TRANS_2PC_ABORTING)
881 {
882 continue;
883 }
884
885 FinishRemoteTransactionCommit(connection);
886 }
887 }
888
889
890 /*
891 * CoordinatedRemoteTransactionsAbort performs distributed transactions
892 * handling at abort time.
893 *
894 * This issues ROLLBACKS and ROLLBACK PREPARED depending on whether the remote
895 * transaction has been prepared or not.
896 */
897 void
CoordinatedRemoteTransactionsAbort(void)898 CoordinatedRemoteTransactionsAbort(void)
899 {
900 dlist_iter iter;
901 List *connectionList = NIL;
902
903 /* asynchronously send ROLLBACK [PREPARED] */
904 dlist_foreach(iter, &InProgressTransactions)
905 {
906 MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
907 iter.cur);
908 RemoteTransaction *transaction = &connection->remoteTransaction;
909
910 if (transaction->transactionState == REMOTE_TRANS_NOT_STARTED ||
911 transaction->transactionState == REMOTE_TRANS_1PC_ABORTING ||
912 transaction->transactionState == REMOTE_TRANS_2PC_ABORTING ||
913 transaction->transactionState == REMOTE_TRANS_ABORTED)
914 {
915 continue;
916 }
917
918 StartRemoteTransactionAbort(connection);
919 connectionList = lappend(connectionList, connection);
920 }
921
922 bool raiseInterrupts = false;
923 WaitForAllConnections(connectionList, raiseInterrupts);
924
925 /* and wait for the results */
926 dlist_foreach(iter, &InProgressTransactions)
927 {
928 MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
929 iter.cur);
930 RemoteTransaction *transaction = &connection->remoteTransaction;
931
932 if (transaction->transactionState != REMOTE_TRANS_1PC_ABORTING &&
933 transaction->transactionState != REMOTE_TRANS_2PC_ABORTING)
934 {
935 continue;
936 }
937
938 FinishRemoteTransactionAbort(connection);
939 }
940 }
941
942
943 /*
944 * CoordinatedRemoteTransactionsSavepointBegin sends the SAVEPOINT command for
945 * the given sub-transaction id to all connections participating in the current
946 * transaction.
947 */
948 void
CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId)949 CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId)
950 {
951 dlist_iter iter;
952 const bool raiseInterrupts = true;
953 List *connectionList = NIL;
954
955 /* asynchronously send SAVEPOINT */
956 dlist_foreach(iter, &InProgressTransactions)
957 {
958 MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
959 iter.cur);
960 RemoteTransaction *transaction = &connection->remoteTransaction;
961 if (transaction->transactionFailed)
962 {
963 continue;
964 }
965
966 StartRemoteTransactionSavepointBegin(connection, subId);
967 connectionList = lappend(connectionList, connection);
968 }
969
970 WaitForAllConnections(connectionList, raiseInterrupts);
971
972 /* and wait for the results */
973 dlist_foreach(iter, &InProgressTransactions)
974 {
975 MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
976 iter.cur);
977 RemoteTransaction *transaction = &connection->remoteTransaction;
978 if (transaction->transactionFailed)
979 {
980 continue;
981 }
982
983 FinishRemoteTransactionSavepointBegin(connection, subId);
984
985 if (!transaction->transactionFailed)
986 {
987 transaction->lastSuccessfulSubXact = subId;
988 }
989 }
990 }
991
992
993 /*
994 * CoordinatedRemoteTransactionsSavepointRelease sends the RELEASE SAVEPOINT
995 * command for the given sub-transaction id to all connections participating in
996 * the current transaction.
997 */
998 void
CoordinatedRemoteTransactionsSavepointRelease(SubTransactionId subId)999 CoordinatedRemoteTransactionsSavepointRelease(SubTransactionId subId)
1000 {
1001 dlist_iter iter;
1002 const bool raiseInterrupts = true;
1003 List *connectionList = NIL;
1004
1005 /* asynchronously send RELEASE SAVEPOINT */
1006 dlist_foreach(iter, &InProgressTransactions)
1007 {
1008 MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
1009 iter.cur);
1010 RemoteTransaction *transaction = &connection->remoteTransaction;
1011 if (transaction->transactionFailed)
1012 {
1013 continue;
1014 }
1015
1016 StartRemoteTransactionSavepointRelease(connection, subId);
1017 connectionList = lappend(connectionList, connection);
1018 }
1019
1020 WaitForAllConnections(connectionList, raiseInterrupts);
1021
1022 /* and wait for the results */
1023 dlist_foreach(iter, &InProgressTransactions)
1024 {
1025 MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
1026 iter.cur);
1027 RemoteTransaction *transaction = &connection->remoteTransaction;
1028 if (transaction->transactionFailed)
1029 {
1030 continue;
1031 }
1032
1033 FinishRemoteTransactionSavepointRelease(connection, subId);
1034 }
1035 }
1036
1037
1038 /*
1039 * CoordinatedRemoteTransactionsSavepointRollback sends the ROLLBACK TO SAVEPOINT
1040 * command for the given sub-transaction id to all connections participating in
1041 * the current transaction.
1042 */
1043 void
CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId)1044 CoordinatedRemoteTransactionsSavepointRollback(SubTransactionId subId)
1045 {
1046 dlist_iter iter;
1047 const bool raiseInterrupts = false;
1048 List *connectionList = NIL;
1049
1050 /* asynchronously send ROLLBACK TO SAVEPOINT */
1051 dlist_foreach(iter, &InProgressTransactions)
1052 {
1053 MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
1054 iter.cur);
1055 RemoteTransaction *transaction = &connection->remoteTransaction;
1056
1057 /* cancel any ongoing queries before issuing rollback */
1058 SendCancelationRequest(connection);
1059
1060 /* clear results, but don't show cancelation warning messages from workers. */
1061 ClearResultsDiscardWarnings(connection, raiseInterrupts);
1062
1063 if (transaction->transactionFailed)
1064 {
1065 if (transaction->lastSuccessfulSubXact <= subId)
1066 {
1067 transaction->transactionRecovering = true;
1068
1069 /*
1070 * Clear the results of the failed query so we can send the ROLLBACK
1071 * TO SAVEPOINT command for a savepoint that can recover the transaction
1072 * from failure.
1073 */
1074 ForgetResults(connection);
1075 }
1076 else
1077 {
1078 continue;
1079 }
1080 }
1081 StartRemoteTransactionSavepointRollback(connection, subId);
1082 connectionList = lappend(connectionList, connection);
1083 }
1084
1085 WaitForAllConnections(connectionList, raiseInterrupts);
1086
1087 /* and wait for the results */
1088 dlist_foreach(iter, &InProgressTransactions)
1089 {
1090 MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
1091 iter.cur);
1092 RemoteTransaction *transaction = &connection->remoteTransaction;
1093 if (transaction->transactionFailed && !transaction->transactionRecovering)
1094 {
1095 continue;
1096 }
1097
1098 FinishRemoteTransactionSavepointRollback(connection, subId);
1099
1100 /*
1101 * We unclaim the connection now so it can be used again when
1102 * continuing after the ROLLBACK TO SAVEPOINT.
1103 * XXX: We do not undo our hadDML/hadDDL flags. This could result in
1104 * some queries not being allowed on Citus that would actually be fine
1105 * to execute. Changing this would require us to keep track for each
1106 * savepoint which placement connections had DDL/DML executed at that
1107 * point and if they were already. We also do not call
1108 * ResetShardPlacementAssociation. This might result in suboptimal
1109 * parallelism, because of placement associations that are not really
1110 * necessary anymore because of ROLLBACK TO SAVEPOINT. To change this
1111 * we would need to keep track of when a connection becomes associated
1112 * to a placement.
1113 */
1114 UnclaimConnection(connection);
1115 }
1116 }
1117
1118
1119 /*
1120 * StartRemoteTransactionSavepointBegin initiates SAVEPOINT command for the given
1121 * subtransaction id in a non-blocking manner.
1122 */
1123 static void
StartRemoteTransactionSavepointBegin(MultiConnection * connection,SubTransactionId subId)1124 StartRemoteTransactionSavepointBegin(MultiConnection *connection, SubTransactionId subId)
1125 {
1126 const bool raiseErrors = true;
1127 StringInfo savepointCommand = makeStringInfo();
1128 appendStringInfo(savepointCommand, "SAVEPOINT savepoint_%u", subId);
1129
1130 if (!SendRemoteCommand(connection, savepointCommand->data))
1131 {
1132 HandleRemoteTransactionConnectionError(connection, raiseErrors);
1133 }
1134 }
1135
1136
1137 /*
1138 * FinishRemoteTransactionSavepointBegin finishes the work
1139 * StartRemoteTransactionSavepointBegin initiated. It blocks if necessary (i.e.
1140 * if PQisBusy() would return true).
1141 */
1142 static void
FinishRemoteTransactionSavepointBegin(MultiConnection * connection,SubTransactionId subId)1143 FinishRemoteTransactionSavepointBegin(MultiConnection *connection, SubTransactionId subId)
1144 {
1145 const bool raiseErrors = true;
1146 PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
1147 if (!IsResponseOK(result))
1148 {
1149 HandleRemoteTransactionResultError(connection, result, raiseErrors);
1150 }
1151
1152 PQclear(result);
1153 ForgetResults(connection);
1154 }
1155
1156
1157 /*
1158 * StartRemoteTransactionSavepointRelease initiates RELEASE SAVEPOINT command for
1159 * the given subtransaction id in a non-blocking manner.
1160 */
1161 static void
StartRemoteTransactionSavepointRelease(MultiConnection * connection,SubTransactionId subId)1162 StartRemoteTransactionSavepointRelease(MultiConnection *connection,
1163 SubTransactionId subId)
1164 {
1165 const bool raiseErrors = true;
1166 StringInfo savepointCommand = makeStringInfo();
1167 appendStringInfo(savepointCommand, "RELEASE SAVEPOINT savepoint_%u", subId);
1168
1169 if (!SendRemoteCommand(connection, savepointCommand->data))
1170 {
1171 HandleRemoteTransactionConnectionError(connection, raiseErrors);
1172 }
1173 }
1174
1175
1176 /*
1177 * FinishRemoteTransactionSavepointRelease finishes the work
1178 * StartRemoteTransactionSavepointRelease initiated. It blocks if necessary (i.e.
1179 * if PQisBusy() would return true).
1180 */
1181 static void
FinishRemoteTransactionSavepointRelease(MultiConnection * connection,SubTransactionId subId)1182 FinishRemoteTransactionSavepointRelease(MultiConnection *connection,
1183 SubTransactionId subId)
1184 {
1185 const bool raiseErrors = true;
1186 PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
1187 if (!IsResponseOK(result))
1188 {
1189 HandleRemoteTransactionResultError(connection, result, raiseErrors);
1190 }
1191
1192 PQclear(result);
1193 ForgetResults(connection);
1194 }
1195
1196
1197 /*
1198 * StartRemoteTransactionSavepointRollback initiates ROLLBACK TO SAVEPOINT command
1199 * for the given subtransaction id in a non-blocking manner.
1200 */
1201 static void
StartRemoteTransactionSavepointRollback(MultiConnection * connection,SubTransactionId subId)1202 StartRemoteTransactionSavepointRollback(MultiConnection *connection,
1203 SubTransactionId subId)
1204 {
1205 const bool raiseErrors = false;
1206 StringInfo savepointCommand = makeStringInfo();
1207 appendStringInfo(savepointCommand, "ROLLBACK TO SAVEPOINT savepoint_%u", subId);
1208
1209 if (!SendRemoteCommand(connection, savepointCommand->data))
1210 {
1211 HandleRemoteTransactionConnectionError(connection, raiseErrors);
1212 }
1213 }
1214
1215
1216 /*
1217 * FinishRemoteTransactionSavepointRollback finishes the work
1218 * StartRemoteTransactionSavepointRollback initiated. It blocks if necessary (i.e.
1219 * if PQisBusy() would return true). It also recovers the transaction from failure
1220 * if transaction is recovering and the rollback command succeeds.
1221 */
1222 static void
FinishRemoteTransactionSavepointRollback(MultiConnection * connection,SubTransactionId subId)1223 FinishRemoteTransactionSavepointRollback(MultiConnection *connection, SubTransactionId
1224 subId)
1225 {
1226 const bool raiseErrors = false;
1227 RemoteTransaction *transaction = &connection->remoteTransaction;
1228
1229 PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
1230 if (!IsResponseOK(result))
1231 {
1232 HandleRemoteTransactionResultError(connection, result, raiseErrors);
1233 }
1234
1235 /* ROLLBACK TO SAVEPOINT succeeded, check if it recovers the transaction */
1236 else if (transaction->transactionRecovering)
1237 {
1238 transaction->transactionFailed = false;
1239 transaction->transactionRecovering = false;
1240 }
1241
1242 PQclear(result);
1243 ForgetResults(connection);
1244
1245 /* reset transaction state so the executor can accept next commands in transaction */
1246 transaction->transactionState = REMOTE_TRANS_STARTED;
1247 }
1248
1249
1250 /*
1251 * CheckRemoteTransactionsHealth checks if any of the participating transactions in a
1252 * coordinated transaction failed, and what consequence that should have.
1253 * This needs to be called before the coordinated transaction commits (but
1254 * after they've been PREPAREd if 2PC is in use).
1255 */
1256 void
CheckRemoteTransactionsHealth(void)1257 CheckRemoteTransactionsHealth(void)
1258 {
1259 dlist_iter iter;
1260
1261 dlist_foreach(iter, &InProgressTransactions)
1262 {
1263 MultiConnection *connection = dlist_container(MultiConnection, transactionNode,
1264 iter.cur);
1265 RemoteTransaction *transaction = &connection->remoteTransaction;
1266 PGTransactionStatusType status = PQtransactionStatus(connection->pgConn);
1267
1268 /* if the connection is in a bad state, so is the transaction's state */
1269 if (status == PQTRANS_INERROR || status == PQTRANS_UNKNOWN)
1270 {
1271 transaction->transactionFailed = true;
1272 }
1273
1274 /*
1275 * If a critical connection is marked as failed (and no error has been
1276 * raised yet) do so now.
1277 */
1278 if (transaction->transactionFailed && transaction->transactionCritical)
1279 {
1280 ereport(ERROR, (errmsg("failure on connection marked as essential: %s:%d",
1281 connection->hostname, connection->port)));
1282 }
1283 }
1284 }
1285
1286
1287 /*
1288 * Assign2PCIdentifier computes the 2PC transaction name to use for a
1289 * transaction. Every prepared transaction should get a new name, i.e. this
1290 * function will need to be called again.
1291 *
1292 * The format of the name is:
1293 *
1294 * citus_<source group>_<pid>_<distributed transaction number>_<connection number>
1295 *
1296 * (at most 5+1+10+1+10+1+20+1+10 = 59 characters, while limit is 64)
1297 *
1298 * The source group is used to distinguish 2PCs started by different
1299 * coordinators. A coordinator will only attempt to recover its own 2PCs.
1300 *
1301 * The pid is used to distinguish different processes on the coordinator, mainly
1302 * to provide some entropy across restarts.
1303 *
1304 * The distributed transaction number is used to distinguish different
1305 * transactions originating from the same node (since restart).
1306 *
1307 * The connection number is used to distinguish connections made to a node
1308 * within the same transaction.
1309 *
1310 */
1311 static void
Assign2PCIdentifier(MultiConnection * connection)1312 Assign2PCIdentifier(MultiConnection *connection)
1313 {
1314 /* local sequence number used to distinguish different connections */
1315 static uint32 connectionNumber = 0;
1316
1317 /* transaction identifier that is unique across processes */
1318 uint64 transactionNumber = CurrentDistributedTransactionNumber();
1319
1320 /* print all numbers as unsigned to guarantee no minus symbols appear in the name */
1321 SafeSnprintf(connection->remoteTransaction.preparedName, NAMEDATALEN,
1322 PREPARED_TRANSACTION_NAME_FORMAT, GetLocalGroupId(), MyProcPid,
1323 transactionNumber, connectionNumber++);
1324 }
1325
1326
1327 /*
1328 * ParsePreparedTransactionName parses a prepared transaction name to extract
1329 * the initiator group ID, initiator process ID, distributed transaction number,
1330 * and the connection number. If the transaction name does not match the expected
1331 * format ParsePreparedTransactionName returns false, and true otherwise.
1332 */
1333 bool
ParsePreparedTransactionName(char * preparedTransactionName,int32 * groupId,int * procId,uint64 * transactionNumber,uint32 * connectionNumber)1334 ParsePreparedTransactionName(char *preparedTransactionName,
1335 int32 *groupId, int *procId,
1336 uint64 *transactionNumber,
1337 uint32 *connectionNumber)
1338 {
1339 char *currentCharPointer = preparedTransactionName;
1340
1341 currentCharPointer = strchr(currentCharPointer, '_');
1342 if (currentCharPointer == NULL)
1343 {
1344 return false;
1345 }
1346
1347 /* step ahead of the current '_' character */
1348 ++currentCharPointer;
1349
1350 *groupId = strtol(currentCharPointer, NULL, 10);
1351
1352 if ((*groupId == COORDINATOR_GROUP_ID && errno == EINVAL) ||
1353 (*groupId == INT_MAX && errno == ERANGE))
1354 {
1355 return false;
1356 }
1357
1358 currentCharPointer = strchr(currentCharPointer, '_');
1359 if (currentCharPointer == NULL)
1360 {
1361 return false;
1362 }
1363
1364 /* step ahead of the current '_' character */
1365 ++currentCharPointer;
1366
1367 *procId = strtol(currentCharPointer, NULL, 10);
1368 if ((*procId == 0 && errno == EINVAL) ||
1369 (*procId == INT_MAX && errno == ERANGE))
1370 {
1371 return false;
1372 }
1373
1374 currentCharPointer = strchr(currentCharPointer, '_');
1375 if (currentCharPointer == NULL)
1376 {
1377 return false;
1378 }
1379
1380 /* step ahead of the current '_' character */
1381 ++currentCharPointer;
1382
1383 *transactionNumber = pg_strtouint64(currentCharPointer, NULL, 10);
1384 if ((*transactionNumber == 0 && errno != 0) ||
1385 (*transactionNumber == ULLONG_MAX && errno == ERANGE))
1386 {
1387 return false;
1388 }
1389
1390 currentCharPointer = strchr(currentCharPointer, '_');
1391 if (currentCharPointer == NULL)
1392 {
1393 return false;
1394 }
1395
1396 /* step ahead of the current '_' character */
1397 ++currentCharPointer;
1398
1399 *connectionNumber = strtoul(currentCharPointer, NULL, 10);
1400 if ((*connectionNumber == 0 && errno == EINVAL) ||
1401 (*connectionNumber == UINT_MAX && errno == ERANGE))
1402 {
1403 return false;
1404 }
1405
1406 return true;
1407 }
1408