1 /*
2  * This file and its contents are licensed under the Timescale License.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-TIMESCALE for a copy of the license.
5  */
6 #include <postgres.h>
7 #include <utils/fmgrprotos.h>
8 #include <utils/snapmgr.h>
9 #include <utils/fmgroids.h>
10 #include <access/xact.h>
11 #include <access/transam.h>
12 #include <miscadmin.h>
13 
14 #include "txn_resolve.h"
15 #include "connection.h"
16 #include "txn.h"
17 
18 RemoteTxnResolution
remote_txn_resolution(Oid foreign_server,const RemoteTxnId * transaction_id)19 remote_txn_resolution(Oid foreign_server, const RemoteTxnId *transaction_id)
20 {
21 	if (remote_txn_is_still_in_progress(transaction_id->xid))
22 		/* transaction still ongoing; don't know it's state */
23 		return REMOTE_TXN_RESOLUTION_UNKNOWN;
24 
25 	if (remote_txn_persistent_record_exists(transaction_id))
26 		return REMOTE_TXN_RESOLUTION_COMMT;
27 
28 	return REMOTE_TXN_RESOLUTION_ABORT;
29 }
30 
31 /*
32  * Resolve any unresolved 2-pc transaction on a data node.
33  * Since the remote_txn log can be long, and most txn there
34  * will have been resolved, do not iterate that list.
35  *
36  * Instead query the data node for the list of unresolved txns
37  * via the pg_prepared_xacts view. Using that list, then check
38  * remote_txn. Use this as an opportunity to clean up remote_txn
39  * as well.
40  *
41  * Note that pg_prepared_xacts shared across other databases which
42  * also could be distributed. Right now we interested only in
43  * the current one.
44  */
45 #define GET_PREPARED_XACT_SQL                                                                      \
46 	"SELECT gid FROM pg_prepared_xacts WHERE database = current_database()"
47 
48 Datum
remote_txn_heal_data_node(PG_FUNCTION_ARGS)49 remote_txn_heal_data_node(PG_FUNCTION_ARGS)
50 {
51 	Oid foreign_server_oid = PG_GETARG_OID(0);
52 	TSConnection *conn = remote_connection_open(foreign_server_oid, GetUserId());
53 	int resolved = 0;
54 
55 	/*
56 	 * Use a raw connection since you need to be out of transaction to do
57 	 * COMMIT/ROLLBACK PREPARED
58 	 */
59 	PGresult *res;
60 	int row;
61 	List *unknown_txn_gid = NIL;
62 	int non_ts_txns = 0;
63 
64 	/*
65 	 * This function cannot be called inside a transaction block since effects
66 	 * cannot be rolled back
67 	 */
68 	PreventInTransactionBlock(true, "remote_txn_heal_data_node");
69 
70 	res = remote_connection_query_ok(conn, GET_PREPARED_XACT_SQL);
71 
72 	Assert(1 == PQnfields(res));
73 	for (row = 0; row < PQntuples(res); row++)
74 	{
75 		const char *id_string = PQgetvalue(res, row, 0);
76 		RemoteTxnId *tpc_gid;
77 		RemoteTxnResolution resolution;
78 
79 		if (!remote_txn_id_matches_prepared_txn(id_string))
80 		{
81 			non_ts_txns++;
82 			continue;
83 		}
84 
85 		tpc_gid = remote_txn_id_in(id_string);
86 		resolution = remote_txn_resolution(foreign_server_oid, tpc_gid);
87 
88 		switch (resolution)
89 		{
90 			case REMOTE_TXN_RESOLUTION_COMMT:
91 				remote_connection_cmd_ok(conn, remote_txn_id_commit_prepared_sql(tpc_gid));
92 				resolved++;
93 				break;
94 			case REMOTE_TXN_RESOLUTION_ABORT:
95 				remote_connection_cmd_ok(conn, remote_txn_id_rollback_prepared_sql(tpc_gid));
96 				resolved++;
97 				break;
98 			case REMOTE_TXN_RESOLUTION_UNKNOWN:
99 				unknown_txn_gid = lappend(unknown_txn_gid, tpc_gid);
100 				break;
101 		}
102 	}
103 
104 	if (non_ts_txns > 0)
105 		elog(NOTICE, "skipping %d non-TimescaleDB prepared transaction", non_ts_txns);
106 
107 	remote_result_close(res);
108 
109 	/*
110 	 * Perform cleanup of all records if there are no unknown txns.
111 	 */
112 	if (list_length(unknown_txn_gid) == 0)
113 		remote_txn_persistent_record_delete_for_data_node(foreign_server_oid);
114 
115 	remote_connection_close(conn);
116 	PG_RETURN_INT32(resolved);
117 }
118