1 /*
2 * This file and its contents are licensed under the Timescale License.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-TIMESCALE for a copy of the license.
5 */
6 #include "libpq-fe.h"
7 #include <postgres.h>
8 #include <access/xact.h>
9 #include <utils/builtins.h>
10 #include <utils/snapmgr.h>
11 #include <libpq-fe.h>
12 #include <miscadmin.h>
13
14 #include "remote/async.h"
15 #include "remote/txn_store.h"
16 #include "txn.h"
17 #include "connection.h"
18 #include "scanner.h"
19 #include "catalog.h"
20 #include "txn_id.h"
21
22 /* This seemingly long timeout matches what postgres_fdw uses. */
23 #define DEFAULT_EXEC_CLEANUP_TIMEOUT_MS 30000
24
25 /*
26 * This RemoteTxn represents one remote end in a distributed txn.
27 * Thus, a distributed txn is made up of a collection remote txn.
28 * Each remote txn corresponds to one remote connection and there
29 * is a unique remote connection per TSConnectionId used in the
30 * distributed txn. Because of this uniqueness property,
31 * the connection id appears first in the object, to allow
32 * it to be a hash key.
33 *
34 * The "conn" pointer can be NULL if we don't currently have a live connection.
35 * When we do have a connection, xact_depth tracks the current depth of
36 * transactions and subtransactions open on the remote side. We need to issue
37 * commands at the same nesting depth on the remote as we're executing at
38 * ourselves, so that rolling back a subtransaction will kill the right
39 * queries and not the wrong ones.
40 */
41
42 typedef struct RemoteTxn
43 {
44 TSConnectionId id; /* hash key (must be first) */
45 TSConnection *conn; /* connection to data node, or NULL */
46 /* Remaining fields are invalid when conn is NULL: */
47 bool have_prep_stmt; /* have we prepared any stmts in this xact? */
48 bool have_subtxn_error; /* have any subxacts aborted in this xact? */
49 RemoteTxnId *remote_txn_id;
50 } RemoteTxn;
51
52 /*
53 * Start remote transaction or subtransaction, if it hasn't been
54 * already started (e.g. by a previous command in the same txn).
55 *
56 * We always use at least REPEATABLE READ in the remote session.
57 * This is important even for cases where we use the a single connection to
58 * a data node. This is because a single command from the access node may cause
59 * multiple remote commands to be executed (e.g. a join of two tables on one remote
60 * node might not be pushed down and instead two different queries are sent
61 * to the remote node, one for each table in the join). Since in READ
62 * COMMITED the snapshot is refreshed on each command, the semantics are off
63 * when multiple commands are meant to be part of the same one.
64 *
65 * This isn't great but we have no alternative unless we ensure that each access
66 * node command always translates to one data node query or if we had some other way to
67 * control which remote queries share a snapshot or when a snapshot is refreshed.
68 *
69 * NOTE: this does not guarantee any kind of snapshot isolation to different connections
70 * to the same data node. That only happens if we use multiple connection ids to the same data node
71 * in one access node transaction. Thus, such connections that use different users will potentially
72 * see inconsistent results. To solve this problem of inconsistent results, we could export the
73 * snapshot of the first connection to a remote node using pg_export_snapshot() and then use that
74 * using SET TRANSACTION SNAPSHOT xxxx across all other connections to that node during the
75 * transaction. However, given that we currently don't have snapshot isolation across different
76 * nodes, we don't want to commit to the overhead of exporting snapshots at this time.
77 */
78 void
remote_txn_begin(RemoteTxn * entry,int curlevel)79 remote_txn_begin(RemoteTxn *entry, int curlevel)
80 {
81 int xact_depth = remote_connection_xact_depth_get(entry->conn);
82
83 /* Start main transaction if we haven't yet */
84 if (xact_depth == 0)
85 {
86 const char *sql;
87
88 Assert(remote_connection_get_status(entry->conn) == CONN_IDLE);
89 elog(DEBUG3, "starting remote transaction on connection %p", entry->conn);
90
91 if (IsolationIsSerializable())
92 sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
93 else
94 sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
95
96 remote_connection_xact_transition_begin(entry->conn);
97 remote_connection_cmd_ok(entry->conn, sql);
98 remote_connection_xact_transition_end(entry->conn);
99 xact_depth = remote_connection_xact_depth_inc(entry->conn);
100 }
101 /* If the connection is in COPY mode, then exit out of it */
102 else if (remote_connection_get_status(entry->conn) == CONN_COPY_IN)
103 {
104 TSConnectionError err;
105
106 if (!remote_connection_end_copy(entry->conn, &err))
107 remote_connection_error_elog(&err, ERROR);
108 }
109
110 /*
111 * If we're in a subtransaction, stack up savepoints to match our level.
112 * This ensures we can rollback just the desired effects when a
113 * subtransaction aborts.
114 */
115 while (xact_depth < curlevel)
116 {
117 remote_connection_xact_transition_begin(entry->conn);
118 remote_connection_cmdf_ok(entry->conn, "SAVEPOINT s%d", xact_depth + 1);
119 remote_connection_xact_transition_end(entry->conn);
120 xact_depth = remote_connection_xact_depth_inc(entry->conn);
121 }
122 }
123
124 bool
remote_txn_is_still_in_progress(TransactionId access_node_xid)125 remote_txn_is_still_in_progress(TransactionId access_node_xid)
126 {
127 if (TransactionIdIsCurrentTransactionId(access_node_xid))
128 elog(ERROR, "checking if a commit is still in progress on same txn");
129
130 return XidInMVCCSnapshot(access_node_xid, GetTransactionSnapshot());
131 }
132
133 size_t
remote_txn_size()134 remote_txn_size()
135 {
136 return sizeof(RemoteTxn);
137 }
138
139 void
remote_txn_init(RemoteTxn * entry,TSConnection * conn)140 remote_txn_init(RemoteTxn *entry, TSConnection *conn)
141 {
142 Assert(NULL != conn);
143 Assert(remote_connection_xact_depth_get(conn) == 0);
144
145 /* Reset all transient state fields, to be sure all are clean */
146 entry->have_prep_stmt = false;
147 entry->have_subtxn_error = false;
148 entry->remote_txn_id = NULL;
149
150 /* Now try to make the connection */
151 /* in connection */
152 entry->conn = conn;
153
154 elog(DEBUG3,
155 "new connection %p for data node \"%s\" (server "
156 "oid %u, userid %u)",
157 entry->conn,
158 remote_connection_node_name(conn),
159 entry->id.server_id,
160 entry->id.user_id);
161 }
162
163 RemoteTxn *
remote_txn_begin_on_connection(TSConnection * conn)164 remote_txn_begin_on_connection(TSConnection *conn)
165 {
166 RemoteTxn *txn = palloc0(sizeof(RemoteTxn));
167
168 remote_txn_init(txn, conn);
169 remote_txn_begin(txn, GetCurrentTransactionNestLevel());
170
171 return txn;
172 }
173
174 void
remote_txn_set_will_prep_statement(RemoteTxn * entry,RemoteTxnPrepStmtOption prep_stmt_option)175 remote_txn_set_will_prep_statement(RemoteTxn *entry, RemoteTxnPrepStmtOption prep_stmt_option)
176 {
177 bool will_prep_stmt = (prep_stmt_option == REMOTE_TXN_USE_PREP_STMT);
178
179 entry->have_prep_stmt |= will_prep_stmt;
180 }
181
182 TSConnection *
remote_txn_get_connection(RemoteTxn * txn)183 remote_txn_get_connection(RemoteTxn *txn)
184 {
185 return txn->conn;
186 }
187
188 TSConnectionId
remote_txn_get_connection_id(RemoteTxn * txn)189 remote_txn_get_connection_id(RemoteTxn *txn)
190 {
191 return txn->id;
192 }
193
194 void
remote_txn_report_prepare_transaction_result(RemoteTxn * txn,bool success)195 remote_txn_report_prepare_transaction_result(RemoteTxn *txn, bool success)
196 {
197 if (!success)
198 txn->remote_txn_id = NULL;
199 }
200
201 /*
202 * This function submits commands to remote nodes during (sub)abort processing.
203 * Because remote nodes can be in a weird state and at the same time errors should
204 * not be thrown here, the processing here is a bit different.
205 *
206 * We submit a query during and wait up to 30 seconds for the result. All errors
207 * are reported as WARNINGS into the log.
208 *
209 * If the query is executed without error, the return value is true.
210 * If the query can't be sent, errors out, or times out, the return value is false.
211 */
212 static bool
exec_cleanup_command(TSConnection * conn,const char * query)213 exec_cleanup_command(TSConnection *conn, const char *query)
214 {
215 TimestampTz end_time;
216 AsyncRequest *req;
217 AsyncResponseResult *result;
218 AsyncResponse *response = NULL;
219 PGresult *pg_result;
220 bool success = false;
221
222 /*
223 * If it takes too long to execute a cleanup query, assume the connection
224 * is dead. It's fairly likely that this is why we aborted in the first
225 * place (e.g. statement timeout, user cancel), so the timeout shouldn't
226 * be too long.
227 */
228 end_time = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), DEFAULT_EXEC_CLEANUP_TIMEOUT_MS);
229
230 /*
231 * Send the query. Since we don't use non-blocking mode, this also can
232 * block. But its risk is relatively small, so we ignore that for now.
233 */
234 req = async_request_send_with_error(conn, query, WARNING);
235
236 if (req == NULL)
237 return false;
238
239 /* Wait until the command completes or there is a timeout or error */
240 response = async_request_cleanup_result(req, end_time);
241 Assert(response != NULL);
242
243 switch (async_response_get_type(response))
244 {
245 case RESPONSE_TIMEOUT:
246 elog(DEBUG3, "abort processing: timeout executing %s", query);
247 success = false;
248 break;
249 case RESPONSE_COMMUNICATION_ERROR:
250 elog(DEBUG3, "abort processing: communication error executing %s", query);
251 success = false;
252 break;
253 case RESPONSE_ERROR:
254 elog(DEBUG3, "abort processing: error while executing %s", query);
255 success = false;
256 break;
257 case RESPONSE_RESULT:
258 result = (AsyncResponseResult *) response;
259 pg_result = async_response_result_get_pg_result(result);
260 if (PQresultStatus(pg_result) != PGRES_COMMAND_OK)
261 {
262 elog(DEBUG3, "abort processing: error in result executing %s", query);
263 success = false;
264 }
265 else
266 success = true;
267 break;
268 case RESPONSE_ROW:
269 elog(DEBUG3,
270 "abort processing: unexpected response type %d while executing %s",
271 async_response_get_type(response),
272 query);
273 success = false;
274 break;
275 }
276
277 if (!success)
278 async_response_report_error(response, WARNING);
279
280 async_response_close(response);
281
282 return success;
283 }
284
#ifdef DEBUG
/*
 * Prepared statements can leak if they were created during a subtxn and the
 * subtxn rolled back before the prepared stmt was deallocated. This function
 * checks for such leaks inside of tests (thus only compiled in DEBUG mode).
 * It can be quite expensive so it is not run under normal operations.
 *
 * The check is skipped unless the remote connection is idle (no transaction
 * in progress). A leak is reported as a WARNING; a malformed result from the
 * count query raises an ERROR.
 */
void
remote_txn_check_for_leaked_prepared_statements(RemoteTxn *entry)
{
	PGresult *res;
	ExecStatusType status;

	if (PQTRANS_IDLE != PQtransactionStatus(remote_connection_get_pg_conn(entry->conn)))
		return;

	res = remote_connection_exec(entry->conn, "SELECT count(*) FROM pg_prepared_statements");
	status = PQresultStatus(res);

	if (status == PGRES_TUPLES_OK)
	{
		/* Expect exactly one row with one column holding the count */
		if (PQntuples(res) != 1 || PQnfields(res) != 1)
			elog(ERROR, "leak check: unexpected number of rows or columns returned");
		else
		{
			char *count_string = PQgetvalue(res, 0, 0);

			if (strcmp("0", count_string) != 0)
				elog(WARNING, "leak check: connection leaked prepared statement");
		}
	}
	else if (status == PGRES_FATAL_ERROR || status == PGRES_NONFATAL_ERROR)
		elog(WARNING, "leak check: ERROR [\"%s\"]", PQresultErrorMessage(res));
	else
		elog(WARNING, "leak check: unexpected result state %u", status);

	remote_result_close(res);
}
#endif
329
330 bool
remote_txn_abort(RemoteTxn * entry)331 remote_txn_abort(RemoteTxn *entry)
332 {
333 const char *abort_sql;
334 bool success = true;
335
336 if (entry->remote_txn_id == NULL)
337 {
338 /* Rollback a regular (non two-phase commit) transaction */
339 abort_sql = "ROLLBACK TRANSACTION";
340 }
341 else
342 {
343 /* Rollback a transaction prepared for two-phase commit (PREPARE
344 * TRANSACTION) */
345 abort_sql = remote_txn_id_rollback_prepared_sql(entry->remote_txn_id);
346 }
347
348 entry->remote_txn_id = NULL;
349
350 Assert(entry->conn != NULL);
351 Assert(remote_connection_xact_depth_get(entry->conn) > 0);
352
353 elog(DEBUG3, "aborting remote transaction on connection %p", entry->conn);
354
355 /* Already in bad state */
356 if (remote_connection_xact_is_transitioning(entry->conn))
357 return false;
358 else if (in_error_recursion_trouble() ||
359 PQstatus(remote_connection_get_pg_conn(entry->conn)) == CONNECTION_BAD)
360 {
361 /*
362 * Don't try to recover the connection if we're already in error
363 * recursion trouble or the connection is bad. Instead, mark it as a
364 * failed transition. This is a really bad case and so controlled
365 * cleanup cannot happen here. The calling function will instead break
366 * this ongoing connection and so no cleanup is necessary.
367 */
368 remote_connection_xact_transition_begin(entry->conn);
369 return false;
370 }
371
372 /* Mark the connection as transitioning to new transaction state */
373 remote_connection_xact_transition_begin(entry->conn);
374
375 /*
376 * Check if a command has been submitted to the data node by using an
377 * asynchronous execution function and the command had not yet completed.
378 * If so, request cancellation of the command.
379 */
380 if (PQtransactionStatus(remote_connection_get_pg_conn(entry->conn)) == PQTRANS_ACTIVE)
381 success = remote_connection_cancel_query(entry->conn);
382
383 if (success)
384 {
385 /* At this point any on going queries should have completed */
386 remote_connection_set_status(entry->conn, CONN_IDLE);
387 success = exec_cleanup_command(entry->conn, abort_sql);
388 }
389
390 /*
391 * Assume we might may have not deallocated all the prepared statements we
392 * created because the deallocation would have happened after the abort.
393 *
394 * prepared stmts are per session not per transaction. But we don't want
395 * prepared_stmts to survive transactions in our use case.
396 */
397 if (success && entry->have_prep_stmt)
398 success = exec_cleanup_command(entry->conn, "DEALLOCATE ALL");
399
400 if (success)
401 {
402 entry->have_prep_stmt = false;
403 entry->have_subtxn_error = false;
404
405 /* Everything succeeded, so we have finished transitioning */
406 remote_connection_xact_transition_end(entry->conn);
407 }
408
409 return success;
410 }
411
412 /* Check if there is ongoing transaction on the remote node */
413 bool
remote_txn_is_ongoing(RemoteTxn * entry)414 remote_txn_is_ongoing(RemoteTxn *entry)
415 {
416 Assert(remote_connection_xact_depth_get(entry->conn) >= 0);
417 return remote_connection_xact_depth_get(entry->conn) > 0;
418 }
419
420 /*
421 * If there were any errors in subtransactions, and we made prepared
422 * statements, those prepared statements may not have been cleared
423 * because of the subtxn error. Thus, do a DEALLOCATE ALL to make sure
424 * we get rid of all prepared statements.
425 *
426 * This is annoying and not terribly bulletproof, but it's
427 * probably not worth trying harder.
428 */
429 void
remote_txn_deallocate_prepared_stmts_if_needed(RemoteTxn * entry)430 remote_txn_deallocate_prepared_stmts_if_needed(RemoteTxn *entry)
431 {
432 Assert(entry->conn != NULL && remote_connection_xact_depth_get(entry->conn) > 0);
433
434 if (entry->have_prep_stmt && entry->have_subtxn_error)
435 {
436 AsyncRequestSet *set = async_request_set_create();
437 AsyncResponse *response;
438
439 async_request_set_add(set, async_request_send(entry->conn, "DEALLOCATE ALL"));
440 response = async_request_set_wait_any_response(set);
441 async_response_report_error_or_close(response, WARNING);
442 response = async_request_set_wait_any_response(set);
443 Assert(response == NULL);
444 }
445 entry->have_prep_stmt = false;
446 entry->have_subtxn_error = false;
447 }
448
449 /*
450 * Ensure state changes are marked successful when a remote transaction
451 * completes asynchronously and successfully.
452 *
453 * We do this in a callback which is guaranteed to be called when a reponse is
454 * received or a timeout occurs.
455 *
456 * There is no decision on whether to fail or not in this callback; this is
457 * only to guarantee that we're always updating the internal connection
458 * state. Someone still has to handle the responses elsewehere.
459 */
460 static bool
on_remote_txn_response(AsyncRequest * req,AsyncResponse * rsp)461 on_remote_txn_response(AsyncRequest *req, AsyncResponse *rsp)
462 {
463 TSConnection *conn = async_request_get_connection(req);
464 bool success = false;
465
466 if (async_response_get_type(rsp) == RESPONSE_RESULT)
467 {
468 AsyncResponseResult *res = (AsyncResponseResult *) rsp;
469 PGresult *pgres = async_response_result_get_pg_result(res);
470
471 if (PQresultStatus(pgres) == PGRES_COMMAND_OK)
472 {
473 remote_connection_xact_transition_end(conn);
474 success = true;
475 }
476 }
477
478 return success;
479 }
480
481 static void
on_commit_or_commit_prepared_response(AsyncRequest * req,AsyncResponse * rsp,void * data)482 on_commit_or_commit_prepared_response(AsyncRequest *req, AsyncResponse *rsp, void *data)
483 {
484 on_remote_txn_response(req, rsp);
485 }
486
487 AsyncRequest *
remote_txn_async_send_commit(RemoteTxn * entry)488 remote_txn_async_send_commit(RemoteTxn *entry)
489 {
490 AsyncRequest *req;
491
492 Assert(entry->conn != NULL);
493 Assert(remote_connection_xact_depth_get(entry->conn) > 0);
494
495 elog(DEBUG3, "committing remote transaction on connection %p", entry->conn);
496
497 remote_connection_xact_transition_begin(entry->conn);
498 req = async_request_send(entry->conn, "COMMIT TRANSACTION");
499 async_request_set_response_callback(req, on_commit_or_commit_prepared_response, entry);
500
501 return req;
502 }
503
504 void
remote_txn_write_persistent_record(RemoteTxn * entry)505 remote_txn_write_persistent_record(RemoteTxn *entry)
506 {
507 entry->remote_txn_id = remote_txn_persistent_record_write(entry->id);
508 }
509
510 static void
on_prepare_transaction_response(AsyncRequest * req,AsyncResponse * rsp,void * data)511 on_prepare_transaction_response(AsyncRequest *req, AsyncResponse *rsp, void *data)
512 {
513 bool success = on_remote_txn_response(req, rsp);
514
515 if (!success)
516 {
517 RemoteTxn *txn = data;
518
519 /* If the prepare is not successful, reset the remote transaction ID
520 * to indicate we need to do a rollback */
521 txn->remote_txn_id = NULL;
522 }
523 }
524
525 AsyncRequest *
remote_txn_async_send_prepare_transaction(RemoteTxn * entry)526 remote_txn_async_send_prepare_transaction(RemoteTxn *entry)
527 {
528 AsyncRequest *req;
529
530 Assert(entry->conn != NULL);
531 Assert(remote_connection_xact_depth_get(entry->conn) > 0);
532 Assert(entry->remote_txn_id != NULL);
533
534 elog(DEBUG3,
535 "2pc: preparing remote transaction on connection %p: %s",
536 entry->conn,
537 remote_txn_id_out(entry->remote_txn_id));
538
539 remote_connection_xact_transition_begin(entry->conn);
540 req = async_request_send(entry->conn,
541 remote_txn_id_prepare_transaction_sql(entry->remote_txn_id));
542 async_request_set_response_callback(req, on_prepare_transaction_response, entry);
543
544 return req;
545 }
546
547 AsyncRequest *
remote_txn_async_send_commit_prepared(RemoteTxn * entry)548 remote_txn_async_send_commit_prepared(RemoteTxn *entry)
549 {
550 AsyncRequest *req;
551
552 Assert(entry->conn != NULL);
553 Assert(entry->remote_txn_id != NULL);
554
555 elog(DEBUG3,
556 "2pc: commiting remote transaction on connection %p: '%s'",
557 entry->conn,
558 remote_txn_id_out(entry->remote_txn_id));
559
560 remote_connection_xact_transition_begin(entry->conn);
561
562 req = async_request_send_with_error(entry->conn,
563 remote_txn_id_commit_prepared_sql(entry->remote_txn_id),
564 WARNING);
565 async_request_set_response_callback(req, on_commit_or_commit_prepared_response, entry);
566
567 return req;
568 }
569
570 /*
571 * Rollback a subtransaction to a given savepoint.
572 */
573 bool
remote_txn_sub_txn_abort(RemoteTxn * entry,int curlevel)574 remote_txn_sub_txn_abort(RemoteTxn *entry, int curlevel)
575 {
576 PGconn *pg_conn = remote_connection_get_pg_conn(entry->conn);
577 bool success = false;
578
579 Assert(remote_connection_xact_depth_get(entry->conn) == curlevel);
580 Assert(remote_connection_xact_depth_get(entry->conn) > 1);
581
582 if (in_error_recursion_trouble() && remote_connection_xact_is_transitioning(entry->conn))
583 remote_connection_xact_transition_begin(entry->conn);
584
585 if (!remote_connection_xact_is_transitioning(entry->conn))
586 {
587 StringInfoData sql;
588
589 initStringInfo(&sql);
590 entry->have_subtxn_error = true;
591 remote_connection_xact_transition_begin(entry->conn);
592
593 /*
594 * If a command has been submitted to the data node by using an
595 * asynchronous execution function, the command might not have yet
596 * completed. Check to see if a command is still being processed by the
597 * data node, and if so, request cancellation of the command.
598 */
599 if (PQtransactionStatus(pg_conn) == PQTRANS_ACTIVE &&
600 !remote_connection_cancel_query(entry->conn))
601 success = false;
602 else
603 {
604 /* Rollback all remote subtransactions during abort */
605 appendStringInfo(&sql, "ROLLBACK TO SAVEPOINT s%d", curlevel);
606 success = exec_cleanup_command(entry->conn, sql.data);
607
608 if (success)
609 {
610 resetStringInfo(&sql);
611 appendStringInfo(&sql, "RELEASE SAVEPOINT s%d", curlevel);
612 success = exec_cleanup_command(entry->conn, sql.data);
613 }
614 }
615
616 if (success)
617 remote_connection_xact_transition_end(entry->conn);
618 }
619
620 Assert(remote_connection_xact_depth_get(entry->conn) > 0);
621
622 return success;
623 }
624
625 bool
remote_txn_is_at_sub_txn_level(RemoteTxn * entry,int curlevel)626 remote_txn_is_at_sub_txn_level(RemoteTxn *entry, int curlevel)
627 {
628 int xact_depth;
629
630 /*
631 * We only care about connections with open remote subtransactions of the
632 * current level.
633 */
634 Assert(entry->conn != NULL);
635
636 xact_depth = remote_connection_xact_depth_get(entry->conn);
637
638 if (xact_depth < curlevel)
639 return false;
640
641 if (xact_depth > curlevel)
642 elog(ERROR, "missed cleaning up remote subtransaction at level %d", xact_depth);
643
644 Assert(xact_depth == curlevel);
645
646 return true;
647 }
648
649 void
remote_txn_sub_txn_pre_commit(RemoteTxn * entry,int curlevel)650 remote_txn_sub_txn_pre_commit(RemoteTxn *entry, int curlevel)
651 {
652 Assert(remote_connection_xact_depth_get(entry->conn) == curlevel);
653 Assert(remote_connection_xact_depth_get(entry->conn) > 0);
654 Assert(!remote_connection_xact_is_transitioning(entry->conn));
655
656 remote_connection_xact_transition_begin(entry->conn);
657 remote_connection_cmdf_ok(entry->conn, "RELEASE SAVEPOINT s%d", curlevel);
658 remote_connection_xact_transition_end(entry->conn);
659 }
660
661 /*
662 * Functions for storing a persistent transaction records for two-phase
663 * commit.
664 */
665 static int
persistent_record_pkey_scan(const RemoteTxnId * id,tuple_found_func tuple_found,LOCKMODE lock_mode)666 persistent_record_pkey_scan(const RemoteTxnId *id, tuple_found_func tuple_found, LOCKMODE lock_mode)
667 {
668 Catalog *catalog = ts_catalog_get();
669 ScanKeyData scankey[1];
670 ScannerCtx scanctx = {
671 .table = catalog->tables[REMOTE_TXN].id,
672 .index = catalog_get_index(catalog, REMOTE_TXN, REMOTE_TXN_PKEY_IDX),
673 .nkeys = 1,
674 .scankey = scankey,
675 .tuple_found = tuple_found,
676 .lockmode = lock_mode,
677 .limit = 1,
678 .scandirection = ForwardScanDirection,
679 };
680
681 ScanKeyInit(&scankey[0],
682 Anum_remote_txn_pkey_idx_remote_transaction_id,
683 BTEqualStrategyNumber,
684 F_TEXTEQ,
685 CStringGetTextDatum(remote_txn_id_out(id)));
686
687 return ts_scanner_scan(&scanctx);
688 }
689
690 bool
remote_txn_persistent_record_exists(const RemoteTxnId * parsed)691 remote_txn_persistent_record_exists(const RemoteTxnId *parsed)
692 {
693 return persistent_record_pkey_scan(parsed, NULL, AccessShareLock) > 0;
694 }
695
696 static ScanTupleResult
persistent_record_tuple_delete(TupleInfo * ti,void * data)697 persistent_record_tuple_delete(TupleInfo *ti, void *data)
698 {
699 ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
700 return SCAN_CONTINUE;
701 }
702
703 int
remote_txn_persistent_record_delete_for_data_node(Oid foreign_server_oid)704 remote_txn_persistent_record_delete_for_data_node(Oid foreign_server_oid)
705 {
706 Catalog *catalog = ts_catalog_get();
707 ScanKeyData scankey[1];
708 ScannerCtx scanctx;
709 ForeignServer *server = GetForeignServer(foreign_server_oid);
710
711 ScanKeyInit(&scankey[0],
712 Anum_remote_txn_data_node_name_idx_data_node_name,
713 BTEqualStrategyNumber,
714 F_NAMEEQ,
715 CStringGetDatum(server->servername));
716
717 scanctx = (ScannerCtx){
718 .table = catalog->tables[REMOTE_TXN].id,
719 .index = catalog_get_index(catalog, REMOTE_TXN, REMOTE_TXN_DATA_NODE_NAME_IDX),
720 .nkeys = 1,
721 .scankey = scankey,
722 .tuple_found = persistent_record_tuple_delete,
723 .lockmode = RowExclusiveLock,
724 .scandirection = ForwardScanDirection,
725 };
726
727 return ts_scanner_scan(&scanctx);
728 }
729
730 static void
persistent_record_insert_relation(Relation rel,RemoteTxnId * id)731 persistent_record_insert_relation(Relation rel, RemoteTxnId *id)
732 {
733 TupleDesc desc = RelationGetDescr(rel);
734 Datum values[Natts_remote_txn];
735 bool nulls[Natts_remote_txn] = { false };
736 CatalogSecurityContext sec_ctx;
737 ForeignServer *server = GetForeignServer(id->id.server_id);
738
739 values[AttrNumberGetAttrOffset(Anum_remote_txn_data_node_name)] =
740 DirectFunctionCall1(namein, CStringGetDatum(server->servername));
741 values[AttrNumberGetAttrOffset(Anum_remote_txn_remote_transaction_id)] =
742 CStringGetTextDatum(remote_txn_id_out(id));
743
744 ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
745 ts_catalog_insert_values(rel, desc, values, nulls);
746 ts_catalog_restore_user(&sec_ctx);
747 }
748
749 /*
750 * Add a commit record to catalog.
751 */
752 RemoteTxnId *
remote_txn_persistent_record_write(TSConnectionId cid)753 remote_txn_persistent_record_write(TSConnectionId cid)
754 {
755 RemoteTxnId *id = remote_txn_id_create(GetTopTransactionId(), cid);
756 Catalog *catalog = ts_catalog_get();
757 Relation rel;
758
759 rel = table_open(catalog->tables[REMOTE_TXN].id, RowExclusiveLock);
760 persistent_record_insert_relation(rel, id);
761
762 /* Keep the table lock until transaction completes in order to
763 * synchronize with distributed restore point creation */
764 table_close(rel, NoLock);
765 return id;
766 }
767