1 /*
2 * This file and its contents are licensed under the Timescale License.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-TIMESCALE for a copy of the license.
5 */
6 #include <postgres.h>
7 #include <utils/fmgrprotos.h>
8 #include <utils/snapmgr.h>
9 #include <utils/fmgroids.h>
10 #include <access/xact.h>
11 #include <access/transam.h>
12 #include <miscadmin.h>
13
14 #include "txn_resolve.h"
15 #include "connection.h"
16 #include "txn.h"
17
18 RemoteTxnResolution
remote_txn_resolution(Oid foreign_server,const RemoteTxnId * transaction_id)19 remote_txn_resolution(Oid foreign_server, const RemoteTxnId *transaction_id)
20 {
21 if (remote_txn_is_still_in_progress(transaction_id->xid))
22 /* transaction still ongoing; don't know it's state */
23 return REMOTE_TXN_RESOLUTION_UNKNOWN;
24
25 if (remote_txn_persistent_record_exists(transaction_id))
26 return REMOTE_TXN_RESOLUTION_COMMT;
27
28 return REMOTE_TXN_RESOLUTION_ABORT;
29 }
30
31 /*
32 * Resolve any unresolved 2-pc transaction on a data node.
33 * Since the remote_txn log can be long, and most txn there
34 * will have been resolved, do not iterate that list.
35 *
36 * Instead query the data node for the list of unresolved txns
37 * via the pg_prepared_xacts view. Using that list, then check
38 * remote_txn. Use this as an opportunity to clean up remote_txn
39 * as well.
40 *
41 * Note that pg_prepared_xacts shared across other databases which
42 * also could be distributed. Right now we interested only in
43 * the current one.
44 */
45 #define GET_PREPARED_XACT_SQL \
46 "SELECT gid FROM pg_prepared_xacts WHERE database = current_database()"
47
48 Datum
remote_txn_heal_data_node(PG_FUNCTION_ARGS)49 remote_txn_heal_data_node(PG_FUNCTION_ARGS)
50 {
51 Oid foreign_server_oid = PG_GETARG_OID(0);
52 TSConnection *conn = remote_connection_open(foreign_server_oid, GetUserId());
53 int resolved = 0;
54
55 /*
56 * Use a raw connection since you need to be out of transaction to do
57 * COMMIT/ROLLBACK PREPARED
58 */
59 PGresult *res;
60 int row;
61 List *unknown_txn_gid = NIL;
62 int non_ts_txns = 0;
63
64 /*
65 * This function cannot be called inside a transaction block since effects
66 * cannot be rolled back
67 */
68 PreventInTransactionBlock(true, "remote_txn_heal_data_node");
69
70 res = remote_connection_query_ok(conn, GET_PREPARED_XACT_SQL);
71
72 Assert(1 == PQnfields(res));
73 for (row = 0; row < PQntuples(res); row++)
74 {
75 const char *id_string = PQgetvalue(res, row, 0);
76 RemoteTxnId *tpc_gid;
77 RemoteTxnResolution resolution;
78
79 if (!remote_txn_id_matches_prepared_txn(id_string))
80 {
81 non_ts_txns++;
82 continue;
83 }
84
85 tpc_gid = remote_txn_id_in(id_string);
86 resolution = remote_txn_resolution(foreign_server_oid, tpc_gid);
87
88 switch (resolution)
89 {
90 case REMOTE_TXN_RESOLUTION_COMMT:
91 remote_connection_cmd_ok(conn, remote_txn_id_commit_prepared_sql(tpc_gid));
92 resolved++;
93 break;
94 case REMOTE_TXN_RESOLUTION_ABORT:
95 remote_connection_cmd_ok(conn, remote_txn_id_rollback_prepared_sql(tpc_gid));
96 resolved++;
97 break;
98 case REMOTE_TXN_RESOLUTION_UNKNOWN:
99 unknown_txn_gid = lappend(unknown_txn_gid, tpc_gid);
100 break;
101 }
102 }
103
104 if (non_ts_txns > 0)
105 elog(NOTICE, "skipping %d non-TimescaleDB prepared transaction", non_ts_txns);
106
107 remote_result_close(res);
108
109 /*
110 * Perform cleanup of all records if there are no unknown txns.
111 */
112 if (list_length(unknown_txn_gid) == 0)
113 remote_txn_persistent_record_delete_for_data_node(foreign_server_oid);
114
115 remote_connection_close(conn);
116 PG_RETURN_INT32(resolved);
117 }
118