1 /*
2  * This file and its contents are licensed under the Timescale License.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-TIMESCALE for a copy of the license.
5  */
6 #include "libpq-fe.h"
7 #include <postgres.h>
8 #include <access/xact.h>
9 #include <utils/builtins.h>
10 #include <utils/snapmgr.h>
11 #include <libpq-fe.h>
12 #include <miscadmin.h>
13 
14 #include "remote/async.h"
15 #include "remote/txn_store.h"
16 #include "txn.h"
17 #include "connection.h"
18 #include "scanner.h"
19 #include "catalog.h"
20 #include "txn_id.h"
21 
22 /* This seemingly long timeout matches what postgres_fdw uses. */
23 #define DEFAULT_EXEC_CLEANUP_TIMEOUT_MS 30000
24 
25 /*
26  * This RemoteTxn represents one remote end in a distributed txn.
 * Thus, a distributed txn is made up of a collection of remote txns.
28  * Each remote txn corresponds to one remote connection and there
29  * is a unique remote connection per TSConnectionId used in the
30  * distributed txn. Because of this uniqueness property,
31  * the connection id appears first in the object, to allow
32  * it to be a hash key.
33  *
34  * The "conn" pointer can be NULL if we don't currently have a live connection.
35  * When we do have a connection, xact_depth tracks the current depth of
36  * transactions and subtransactions open on the remote side.  We need to issue
37  * commands at the same nesting depth on the remote as we're executing at
38  * ourselves, so that rolling back a subtransaction will kill the right
39  * queries and not the wrong ones.
40  */
41 
typedef struct RemoteTxn
{
	TSConnectionId id;  /* hash key (must be first) */
	TSConnection *conn; /* connection to data node, or NULL */
	/* Remaining fields are invalid when conn is NULL: */
	bool have_prep_stmt;	/* have we prepared any stmts in this xact? */
	bool have_subtxn_error; /* have any subxacts aborted in this xact? */
	RemoteTxnId *remote_txn_id; /* ID of the remote transaction when prepared
								 * for two-phase commit, or NULL otherwise */
} RemoteTxn;
51 
52 /*
53  * Start remote transaction or subtransaction, if it hasn't been
54  * already started (e.g. by a previous command in the same txn).
55  *
56  * We always use at least REPEATABLE READ in the remote session.
 * This is important even for cases where we use a single connection to
58  * a data node. This is because a single command from the access node may cause
59  * multiple remote commands to be executed (e.g. a join of two tables on one remote
60  * node might not be pushed down and instead two different queries are sent
61  * to the remote node, one for each table in the join). Since in READ
 * COMMITTED the snapshot is refreshed on each command, the semantics are off
63  * when multiple commands are meant to be part of the same one.
64  *
65  * This isn't great but we have no alternative unless we ensure that each access
66  * node command always translates to one data node query or if we had some other way to
67  * control which remote queries share a snapshot or when a snapshot is refreshed.
68  *
69  * NOTE: this does not guarantee any kind of snapshot isolation to different connections
70  * to the same data node. That only happens if we use multiple connection ids to the same data node
71  * in one access node transaction. Thus, such connections that use different users will potentially
72  * see inconsistent results. To solve this problem of inconsistent results, we could export the
73  * snapshot of the first connection to a remote node using pg_export_snapshot() and then use that
74  * using SET TRANSACTION SNAPSHOT xxxx across all other connections to that node during the
75  * transaction. However, given that we currently don't have snapshot isolation across different
76  * nodes, we don't want to commit to the overhead of exporting snapshots at this time.
77  */
void
remote_txn_begin(RemoteTxn *entry, int curlevel)
{
	int xact_depth = remote_connection_xact_depth_get(entry->conn);

	/* Start main transaction if we haven't yet */
	if (xact_depth == 0)
	{
		const char *sql;

		Assert(remote_connection_get_status(entry->conn) == CONN_IDLE);
		elog(DEBUG3, "starting remote transaction on connection %p", entry->conn);

		/*
		 * Use at least REPEATABLE READ remotely so all remote queries issued
		 * for one access node command see the same snapshot (see the comment
		 * above this function).
		 */
		if (IsolationIsSerializable())
			sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
		else
			sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";

		/* Bracket the remote command with transition begin/end so a failure
		 * in between leaves the connection in a known-bad (transitioning)
		 * state */
		remote_connection_xact_transition_begin(entry->conn);
		remote_connection_cmd_ok(entry->conn, sql);
		remote_connection_xact_transition_end(entry->conn);
		xact_depth = remote_connection_xact_depth_inc(entry->conn);
	}
	/* If the connection is in COPY mode, then exit out of it */
	else if (remote_connection_get_status(entry->conn) == CONN_COPY_IN)
	{
		TSConnectionError err;

		if (!remote_connection_end_copy(entry->conn, &err))
			remote_connection_error_elog(&err, ERROR);
	}

	/*
	 * If we're in a subtransaction, stack up savepoints to match our level.
	 * This ensures we can rollback just the desired effects when a
	 * subtransaction aborts.
	 */
	while (xact_depth < curlevel)
	{
		remote_connection_xact_transition_begin(entry->conn);
		remote_connection_cmdf_ok(entry->conn, "SAVEPOINT s%d", xact_depth + 1);
		remote_connection_xact_transition_end(entry->conn);
		xact_depth = remote_connection_xact_depth_inc(entry->conn);
	}
}
123 
124 bool
remote_txn_is_still_in_progress(TransactionId access_node_xid)125 remote_txn_is_still_in_progress(TransactionId access_node_xid)
126 {
127 	if (TransactionIdIsCurrentTransactionId(access_node_xid))
128 		elog(ERROR, "checking if a commit is still in progress on same txn");
129 
130 	return XidInMVCCSnapshot(access_node_xid, GetTransactionSnapshot());
131 }
132 
133 size_t
remote_txn_size()134 remote_txn_size()
135 {
136 	return sizeof(RemoteTxn);
137 }
138 
/*
 * Initialize a RemoteTxn entry for an already-established connection.
 * Resets all transient per-transaction state; the "id" hash-key field is
 * assumed to have been filled in by the caller (e.g. hash table insert).
 */
void
remote_txn_init(RemoteTxn *entry, TSConnection *conn)
{
	Assert(NULL != conn);
	Assert(remote_connection_xact_depth_get(conn) == 0);

	/* Reset all transient state fields, to be sure all are clean */
	entry->have_prep_stmt = false;
	entry->have_subtxn_error = false;
	entry->remote_txn_id = NULL;

	/* Take ownership of the provided connection */
	entry->conn = conn;

	elog(DEBUG3,
		 "new connection %p for data node \"%s\" (server "
		 "oid %u, userid %u)",
		 entry->conn,
		 remote_connection_node_name(conn),
		 entry->id.server_id,
		 entry->id.user_id);
}
162 
163 RemoteTxn *
remote_txn_begin_on_connection(TSConnection * conn)164 remote_txn_begin_on_connection(TSConnection *conn)
165 {
166 	RemoteTxn *txn = palloc0(sizeof(RemoteTxn));
167 
168 	remote_txn_init(txn, conn);
169 	remote_txn_begin(txn, GetCurrentTransactionNestLevel());
170 
171 	return txn;
172 }
173 
174 void
remote_txn_set_will_prep_statement(RemoteTxn * entry,RemoteTxnPrepStmtOption prep_stmt_option)175 remote_txn_set_will_prep_statement(RemoteTxn *entry, RemoteTxnPrepStmtOption prep_stmt_option)
176 {
177 	bool will_prep_stmt = (prep_stmt_option == REMOTE_TXN_USE_PREP_STMT);
178 
179 	entry->have_prep_stmt |= will_prep_stmt;
180 }
181 
182 TSConnection *
remote_txn_get_connection(RemoteTxn * txn)183 remote_txn_get_connection(RemoteTxn *txn)
184 {
185 	return txn->conn;
186 }
187 
188 TSConnectionId
remote_txn_get_connection_id(RemoteTxn * txn)189 remote_txn_get_connection_id(RemoteTxn *txn)
190 {
191 	return txn->id;
192 }
193 
194 void
remote_txn_report_prepare_transaction_result(RemoteTxn * txn,bool success)195 remote_txn_report_prepare_transaction_result(RemoteTxn *txn, bool success)
196 {
197 	if (!success)
198 		txn->remote_txn_id = NULL;
199 }
200 
201 /*
202  * This function submits commands to remote nodes during (sub)abort processing.
203  * Because remote nodes can be in a weird state and at the same time errors should
204  * not be thrown here, the processing here is a bit different.
205  *
 * We submit a query and wait up to 30 seconds for the result. All errors
207  * are reported as WARNINGS into the log.
208  *
209  * If the query is executed without error, the return value is true.
210  * If the query can't be sent, errors out, or times out, the return value is false.
211  */
/*
 * Execute a cleanup query during (sub)abort processing.
 *
 * Sends the query and waits up to DEFAULT_EXEC_CLEANUP_TIMEOUT_MS for the
 * result. All failures are reported as WARNINGs (never thrown), since
 * raising an error during abort processing would recurse. Returns true if
 * the query executed successfully, false otherwise.
 */
static bool
exec_cleanup_command(TSConnection *conn, const char *query)
{
	TimestampTz end_time;
	AsyncRequest *req;
	AsyncResponseResult *result;
	AsyncResponse *response = NULL;
	PGresult *pg_result;
	bool success = false;

	/*
	 * If it takes too long to execute a cleanup query, assume the connection
	 * is dead.  It's fairly likely that this is why we aborted in the first
	 * place (e.g. statement timeout, user cancel), so the timeout shouldn't
	 * be too long.
	 */
	end_time = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), DEFAULT_EXEC_CLEANUP_TIMEOUT_MS);

	/*
	 * Send the query. Since we don't use non-blocking mode, this also can
	 * block. But its risk is relatively small, so we ignore that for now.
	 */
	req = async_request_send_with_error(conn, query, WARNING);

	if (req == NULL)
		return false;

	/* Wait until the command completes or there is a timeout or error */
	response = async_request_cleanup_result(req, end_time);
	Assert(response != NULL);

	switch (async_response_get_type(response))
	{
		case RESPONSE_TIMEOUT:
			elog(DEBUG3, "abort processing: timeout executing %s", query);
			success = false;
			break;
		case RESPONSE_COMMUNICATION_ERROR:
			elog(DEBUG3, "abort processing: communication error executing %s", query);
			success = false;
			break;
		case RESPONSE_ERROR:
			elog(DEBUG3, "abort processing: error while executing %s", query);
			success = false;
			break;
		case RESPONSE_RESULT:
			result = (AsyncResponseResult *) response;
			pg_result = async_response_result_get_pg_result(result);
			if (PQresultStatus(pg_result) != PGRES_COMMAND_OK)
			{
				elog(DEBUG3, "abort processing: error in result executing %s", query);
				success = false;
			}
			else
				success = true;
			break;
		case RESPONSE_ROW:
			/* Cleanup commands are utility statements and never return rows */
			elog(DEBUG3,
				 "abort processing: unexpected response type %d while executing %s",
				 async_response_get_type(response),
				 query);
			success = false;
			break;
	}

	/* Surface the failure as a WARNING before releasing the response */
	if (!success)
		async_response_report_error(response, WARNING);

	async_response_close(response);

	return success;
}
284 
285 #ifdef DEBUG
/* Prepared statements can leak if they were created during a subtxn
287  * and the subtxn rolled back before the prepared stmt was deallocated.
288  * This function checks for such leaks inside of tests (thus only compiled
289  * in DEBUG mode). It can be quite expensive so not run under normal operations.
290  */
void
remote_txn_check_for_leaked_prepared_statements(RemoteTxn *entry)
{
	PGresult *res;
	char *count_string;
	ExecStatusType status;

	/* Only check when the remote session is idle (no transaction open) */
	if (PQTRANS_IDLE != PQtransactionStatus(remote_connection_get_pg_conn(entry->conn)))
		return;

	/* Count prepared statements still alive in the remote session */
	res = remote_connection_exec(entry->conn, "SELECT count(*) FROM pg_prepared_statements");

	status = PQresultStatus(res);

	switch (status)
	{
		case PGRES_TUPLES_OK:
			if (PQntuples(res) == 1 && PQnfields(res) == 1)
			{
				count_string = PQgetvalue(res, 0, 0);
				/* Any non-zero count means a prepared statement leaked */
				if (strcmp("0", count_string) != 0)
					elog(WARNING, "leak check: connection leaked prepared statement");
			}
			else
				elog(ERROR, "leak check: unexpected number of rows or columns returned");
			break;
		case PGRES_FATAL_ERROR:
		case PGRES_NONFATAL_ERROR:
			elog(WARNING, "leak check: ERROR [\"%s\"]", PQresultErrorMessage(res));
			break;
		default:
			elog(WARNING, "leak check: unexpected result state %u", status);
			break;
	}

	remote_result_close(res);
}
328 #endif
329 
/*
 * Abort the remote transaction.
 *
 * Runs during local abort processing, so nothing here may throw; cleanup
 * commands are executed with a timeout and failures become WARNINGs (see
 * exec_cleanup_command()). Returns true if the remote state was fully
 * cleaned up; false leaves the connection marked as transitioning so the
 * caller can discard it.
 */
bool
remote_txn_abort(RemoteTxn *entry)
{
	const char *abort_sql;
	bool success = true;

	if (entry->remote_txn_id == NULL)
	{
		/* Rollback a regular (non two-phase commit) transaction */
		abort_sql = "ROLLBACK TRANSACTION";
	}
	else
	{
		/* Rollback a transaction prepared for two-phase commit (PREPARE
		 * TRANSACTION) */
		abort_sql = remote_txn_id_rollback_prepared_sql(entry->remote_txn_id);
	}

	entry->remote_txn_id = NULL;

	Assert(entry->conn != NULL);
	Assert(remote_connection_xact_depth_get(entry->conn) > 0);

	elog(DEBUG3, "aborting remote transaction on connection %p", entry->conn);

	/* Already in bad state */
	if (remote_connection_xact_is_transitioning(entry->conn))
		return false;
	else if (in_error_recursion_trouble() ||
			 PQstatus(remote_connection_get_pg_conn(entry->conn)) == CONNECTION_BAD)
	{
		/*
		 * Don't try to recover the connection if we're already in error
		 * recursion trouble or the connection is bad. Instead, mark it as a
		 * failed transition. This is a really bad case and so controlled
		 * cleanup cannot happen here. The calling function will instead break
		 * this ongoing connection and so no cleanup is necessary.
		 */
		remote_connection_xact_transition_begin(entry->conn);
		return false;
	}

	/* Mark the connection as transitioning to new transaction state */
	remote_connection_xact_transition_begin(entry->conn);

	/*
	 * Check if a command has been submitted to the data node by using an
	 * asynchronous execution function and the command had not yet completed.
	 * If so, request cancellation of the command.
	 */
	if (PQtransactionStatus(remote_connection_get_pg_conn(entry->conn)) == PQTRANS_ACTIVE)
		success = remote_connection_cancel_query(entry->conn);

	if (success)
	{
		/* At this point any ongoing queries should have completed */
		remote_connection_set_status(entry->conn, CONN_IDLE);
		success = exec_cleanup_command(entry->conn, abort_sql);
	}

	/*
	 * Assume we may not have deallocated all the prepared statements we
	 * created, because the deallocation would have happened after the abort.
	 *
	 * Prepared stmts are per session, not per transaction. But we don't want
	 * prepared stmts to survive transactions in our use case.
	 */
	if (success && entry->have_prep_stmt)
		success = exec_cleanup_command(entry->conn, "DEALLOCATE ALL");

	if (success)
	{
		entry->have_prep_stmt = false;
		entry->have_subtxn_error = false;

		/* Everything succeeded, so we have finished transitioning */
		remote_connection_xact_transition_end(entry->conn);
	}

	return success;
}
411 
412 /* Check if there is ongoing transaction on the remote node */
413 bool
remote_txn_is_ongoing(RemoteTxn * entry)414 remote_txn_is_ongoing(RemoteTxn *entry)
415 {
416 	Assert(remote_connection_xact_depth_get(entry->conn) >= 0);
417 	return remote_connection_xact_depth_get(entry->conn) > 0;
418 }
419 
420 /*
421  * If there were any errors in subtransactions, and we made prepared
422  * statements, those prepared statements may not have been cleared
423  * because of the subtxn error. Thus, do a DEALLOCATE ALL to make sure
424  * we get rid of all prepared statements.
425  *
426  * This is annoying and not terribly bulletproof, but it's
427  * probably not worth trying harder.
428  */
void
remote_txn_deallocate_prepared_stmts_if_needed(RemoteTxn *entry)
{
	Assert(entry->conn != NULL && remote_connection_xact_depth_get(entry->conn) > 0);

	if (entry->have_prep_stmt && entry->have_subtxn_error)
	{
		AsyncRequestSet *set = async_request_set_create();
		AsyncResponse *response;

		async_request_set_add(set, async_request_send(entry->conn, "DEALLOCATE ALL"));
		response = async_request_set_wait_any_response(set);
		/* Best-effort cleanup: failures are reported as WARNINGs only */
		async_response_report_error_or_close(response, WARNING);
		response = async_request_set_wait_any_response(set);
		/* Only one request was added, so there can be no second response */
		Assert(response == NULL);
	}
	entry->have_prep_stmt = false;
	entry->have_subtxn_error = false;
}
448 
449 /*
450  * Ensure state changes are marked successful when a remote transaction
451  * completes asynchronously and successfully.
452  *
 * We do this in a callback which is guaranteed to be called when a response is
454  * received or a timeout occurs.
455  *
456  * There is no decision on whether to fail or not in this callback; this is
457  * only to guarantee that we're always updating the internal connection
 * state. Someone still has to handle the responses elsewhere.
459  */
460 static bool
on_remote_txn_response(AsyncRequest * req,AsyncResponse * rsp)461 on_remote_txn_response(AsyncRequest *req, AsyncResponse *rsp)
462 {
463 	TSConnection *conn = async_request_get_connection(req);
464 	bool success = false;
465 
466 	if (async_response_get_type(rsp) == RESPONSE_RESULT)
467 	{
468 		AsyncResponseResult *res = (AsyncResponseResult *) rsp;
469 		PGresult *pgres = async_response_result_get_pg_result(res);
470 
471 		if (PQresultStatus(pgres) == PGRES_COMMAND_OK)
472 		{
473 			remote_connection_xact_transition_end(conn);
474 			success = true;
475 		}
476 	}
477 
478 	return success;
479 }
480 
481 static void
on_commit_or_commit_prepared_response(AsyncRequest * req,AsyncResponse * rsp,void * data)482 on_commit_or_commit_prepared_response(AsyncRequest *req, AsyncResponse *rsp, void *data)
483 {
484 	on_remote_txn_response(req, rsp);
485 }
486 
/*
 * Send an asynchronous COMMIT to the remote node (non two-phase path).
 * The caller must await the returned request; the response callback
 * completes the connection's transaction-state transition on success.
 */
AsyncRequest *
remote_txn_async_send_commit(RemoteTxn *entry)
{
	AsyncRequest *req;

	Assert(entry->conn != NULL);
	Assert(remote_connection_xact_depth_get(entry->conn) > 0);

	elog(DEBUG3, "committing remote transaction on connection %p", entry->conn);

	remote_connection_xact_transition_begin(entry->conn);
	req = async_request_send(entry->conn, "COMMIT TRANSACTION");
	async_request_set_response_callback(req, on_commit_or_commit_prepared_response, entry);

	return req;
}
503 
504 void
remote_txn_write_persistent_record(RemoteTxn * entry)505 remote_txn_write_persistent_record(RemoteTxn *entry)
506 {
507 	entry->remote_txn_id = remote_txn_persistent_record_write(entry->id);
508 }
509 
510 static void
on_prepare_transaction_response(AsyncRequest * req,AsyncResponse * rsp,void * data)511 on_prepare_transaction_response(AsyncRequest *req, AsyncResponse *rsp, void *data)
512 {
513 	bool success = on_remote_txn_response(req, rsp);
514 
515 	if (!success)
516 	{
517 		RemoteTxn *txn = data;
518 
519 		/* If the prepare is not successful, reset the remote transaction ID
520 		 * to indicate we need to do a rollback */
521 		txn->remote_txn_id = NULL;
522 	}
523 }
524 
/*
 * Send an asynchronous PREPARE TRANSACTION (first phase of two-phase
 * commit). remote_txn_write_persistent_record() must have been called
 * first so that remote_txn_id is set.
 */
AsyncRequest *
remote_txn_async_send_prepare_transaction(RemoteTxn *entry)
{
	AsyncRequest *req;

	Assert(entry->conn != NULL);
	Assert(remote_connection_xact_depth_get(entry->conn) > 0);
	Assert(entry->remote_txn_id != NULL);

	elog(DEBUG3,
		 "2pc: preparing remote transaction on connection %p: %s",
		 entry->conn,
		 remote_txn_id_out(entry->remote_txn_id));

	remote_connection_xact_transition_begin(entry->conn);
	req = async_request_send(entry->conn,
							 remote_txn_id_prepare_transaction_sql(entry->remote_txn_id));
	async_request_set_response_callback(req, on_prepare_transaction_response, entry);

	return req;
}
546 
/*
 * Send an asynchronous COMMIT PREPARED (second phase of two-phase commit).
 * Send failures are WARNINGs rather than errors: the transaction is
 * already prepared and can be resolved later from the persistent record.
 */
AsyncRequest *
remote_txn_async_send_commit_prepared(RemoteTxn *entry)
{
	AsyncRequest *req;

	Assert(entry->conn != NULL);
	Assert(entry->remote_txn_id != NULL);

	elog(DEBUG3,
		 "2pc: commiting remote transaction on connection %p: '%s'",
		 entry->conn,
		 remote_txn_id_out(entry->remote_txn_id));

	remote_connection_xact_transition_begin(entry->conn);

	req = async_request_send_with_error(entry->conn,
										remote_txn_id_commit_prepared_sql(entry->remote_txn_id),
										WARNING);
	async_request_set_response_callback(req, on_commit_or_commit_prepared_response, entry);

	return req;
}
569 
570 /*
571  * Rollback a subtransaction to a given savepoint.
572  */
bool
remote_txn_sub_txn_abort(RemoteTxn *entry, int curlevel)
{
	PGconn *pg_conn = remote_connection_get_pg_conn(entry->conn);
	bool success = false;

	Assert(remote_connection_xact_depth_get(entry->conn) == curlevel);
	Assert(remote_connection_xact_depth_get(entry->conn) > 1);

	/* In error recursion trouble, force a failed-transition state so no
	 * further recovery is attempted on this connection */
	if (in_error_recursion_trouble() && remote_connection_xact_is_transitioning(entry->conn))
		remote_connection_xact_transition_begin(entry->conn);

	if (!remote_connection_xact_is_transitioning(entry->conn))
	{
		StringInfoData sql;

		initStringInfo(&sql);
		/* Remember that a subtxn aborted so leftover prepared statements can
		 * be cleaned up at transaction end (see
		 * remote_txn_deallocate_prepared_stmts_if_needed()) */
		entry->have_subtxn_error = true;
		remote_connection_xact_transition_begin(entry->conn);

		/*
		 * If a command has been submitted to the data node by using an
		 * asynchronous execution function, the command might not have yet
		 * completed. Check to see if a command is still being processed by the
		 * data node, and if so, request cancellation of the command.
		 */
		if (PQtransactionStatus(pg_conn) == PQTRANS_ACTIVE &&
			!remote_connection_cancel_query(entry->conn))
			success = false;
		else
		{
			/* Rollback all remote subtransactions during abort */
			appendStringInfo(&sql, "ROLLBACK TO SAVEPOINT s%d", curlevel);
			success = exec_cleanup_command(entry->conn, sql.data);

			if (success)
			{
				resetStringInfo(&sql);
				appendStringInfo(&sql, "RELEASE SAVEPOINT s%d", curlevel);
				success = exec_cleanup_command(entry->conn, sql.data);
			}
		}

		if (success)
			remote_connection_xact_transition_end(entry->conn);
	}

	Assert(remote_connection_xact_depth_get(entry->conn) > 0);

	return success;
}
624 
625 bool
remote_txn_is_at_sub_txn_level(RemoteTxn * entry,int curlevel)626 remote_txn_is_at_sub_txn_level(RemoteTxn *entry, int curlevel)
627 {
628 	int xact_depth;
629 
630 	/*
631 	 * We only care about connections with open remote subtransactions of the
632 	 * current level.
633 	 */
634 	Assert(entry->conn != NULL);
635 
636 	xact_depth = remote_connection_xact_depth_get(entry->conn);
637 
638 	if (xact_depth < curlevel)
639 		return false;
640 
641 	if (xact_depth > curlevel)
642 		elog(ERROR, "missed cleaning up remote subtransaction at level %d", xact_depth);
643 
644 	Assert(xact_depth == curlevel);
645 
646 	return true;
647 }
648 
649 void
remote_txn_sub_txn_pre_commit(RemoteTxn * entry,int curlevel)650 remote_txn_sub_txn_pre_commit(RemoteTxn *entry, int curlevel)
651 {
652 	Assert(remote_connection_xact_depth_get(entry->conn) == curlevel);
653 	Assert(remote_connection_xact_depth_get(entry->conn) > 0);
654 	Assert(!remote_connection_xact_is_transitioning(entry->conn));
655 
656 	remote_connection_xact_transition_begin(entry->conn);
657 	remote_connection_cmdf_ok(entry->conn, "RELEASE SAVEPOINT s%d", curlevel);
658 	remote_connection_xact_transition_end(entry->conn);
659 }
660 
661 /*
662  * Functions for storing a persistent transaction records for two-phase
663  * commit.
664  */
665 static int
persistent_record_pkey_scan(const RemoteTxnId * id,tuple_found_func tuple_found,LOCKMODE lock_mode)666 persistent_record_pkey_scan(const RemoteTxnId *id, tuple_found_func tuple_found, LOCKMODE lock_mode)
667 {
668 	Catalog *catalog = ts_catalog_get();
669 	ScanKeyData scankey[1];
670 	ScannerCtx scanctx = {
671 		.table = catalog->tables[REMOTE_TXN].id,
672 		.index = catalog_get_index(catalog, REMOTE_TXN, REMOTE_TXN_PKEY_IDX),
673 		.nkeys = 1,
674 		.scankey = scankey,
675 		.tuple_found = tuple_found,
676 		.lockmode = lock_mode,
677 		.limit = 1,
678 		.scandirection = ForwardScanDirection,
679 	};
680 
681 	ScanKeyInit(&scankey[0],
682 				Anum_remote_txn_pkey_idx_remote_transaction_id,
683 				BTEqualStrategyNumber,
684 				F_TEXTEQ,
685 				CStringGetTextDatum(remote_txn_id_out(id)));
686 
687 	return ts_scanner_scan(&scanctx);
688 }
689 
690 bool
remote_txn_persistent_record_exists(const RemoteTxnId * parsed)691 remote_txn_persistent_record_exists(const RemoteTxnId *parsed)
692 {
693 	return persistent_record_pkey_scan(parsed, NULL, AccessShareLock) > 0;
694 }
695 
/* Scanner callback: delete the current tuple and continue the scan */
static ScanTupleResult
persistent_record_tuple_delete(TupleInfo *ti, void *data)
{
	ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
	return SCAN_CONTINUE;
}
702 
703 int
remote_txn_persistent_record_delete_for_data_node(Oid foreign_server_oid)704 remote_txn_persistent_record_delete_for_data_node(Oid foreign_server_oid)
705 {
706 	Catalog *catalog = ts_catalog_get();
707 	ScanKeyData scankey[1];
708 	ScannerCtx scanctx;
709 	ForeignServer *server = GetForeignServer(foreign_server_oid);
710 
711 	ScanKeyInit(&scankey[0],
712 				Anum_remote_txn_data_node_name_idx_data_node_name,
713 				BTEqualStrategyNumber,
714 				F_NAMEEQ,
715 				CStringGetDatum(server->servername));
716 
717 	scanctx = (ScannerCtx){
718 		.table = catalog->tables[REMOTE_TXN].id,
719 		.index = catalog_get_index(catalog, REMOTE_TXN, REMOTE_TXN_DATA_NODE_NAME_IDX),
720 		.nkeys = 1,
721 		.scankey = scankey,
722 		.tuple_found = persistent_record_tuple_delete,
723 		.lockmode = RowExclusiveLock,
724 		.scandirection = ForwardScanDirection,
725 	};
726 
727 	return ts_scanner_scan(&scanctx);
728 }
729 
/*
 * Insert a two-phase commit record into the remote_txn catalog table,
 * temporarily assuming catalog-owner privileges for the write.
 */
static void
persistent_record_insert_relation(Relation rel, RemoteTxnId *id)
{
	TupleDesc desc = RelationGetDescr(rel);
	Datum values[Natts_remote_txn];
	bool nulls[Natts_remote_txn] = { false };
	CatalogSecurityContext sec_ctx;
	ForeignServer *server = GetForeignServer(id->id.server_id);

	values[AttrNumberGetAttrOffset(Anum_remote_txn_data_node_name)] =
		DirectFunctionCall1(namein, CStringGetDatum(server->servername));
	values[AttrNumberGetAttrOffset(Anum_remote_txn_remote_transaction_id)] =
		CStringGetTextDatum(remote_txn_id_out(id));

	/* Catalog writes require elevated privileges; restore the user after */
	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
	ts_catalog_insert_values(rel, desc, values, nulls);
	ts_catalog_restore_user(&sec_ctx);
}
748 
749 /*
750  * Add a commit record to catalog.
751  */
RemoteTxnId *
remote_txn_persistent_record_write(TSConnectionId cid)
{
	/* The record is keyed by the top-level local transaction ID plus the
	 * connection id (data node/user pair) */
	RemoteTxnId *id = remote_txn_id_create(GetTopTransactionId(), cid);
	Catalog *catalog = ts_catalog_get();
	Relation rel;

	rel = table_open(catalog->tables[REMOTE_TXN].id, RowExclusiveLock);
	persistent_record_insert_relation(rel, id);

	/* Keep the table lock until transaction completes in order to
	 * synchronize with distributed restore point creation */
	table_close(rel, NoLock);
	return id;
}
767