1 /*
2 * This file and its contents are licensed under the Timescale License.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-TIMESCALE for a copy of the license.
5 */
6 #include <postgres.h>
7 #include <funcapi.h>
8 #include <utils/builtins.h>
9 #include <utils/pg_lsn.h>
10 #include <utils/guc.h>
11 #include <access/xlog_internal.h>
12 #include <access/xlog.h>
13 #include <access/xact.h>
14 #include <catalog/pg_foreign_server.h>
15 #include <storage/lmgr.h>
16 #include <miscadmin.h>
17
18 #include "errors.h"
19 #include "guc.h"
20 #include "catalog.h"
21 #include "debug_point.h"
22 #include "dist_util.h"
23 #include "remote/dist_commands.h"
24 #include "dist_backup.h"
25
/* Values reported in the node_type column of each result row. */
#define TS_ACCESS_NODE_TYPE "access_node"
#define TS_DATA_NODE_TYPE "data_node"

/*
 * 1-based attribute numbers of the columns in the row returned by
 * create_distributed_restore_point(): (node_name, node_type, lsn).
 *
 * _Anum_restore_point_max is one past the last attribute number; it is used
 * to size the per-column values/nulls arrays (one slot larger than strictly
 * needed, since attribute numbers are converted to 0-based offsets).
 */
enum
{
	Anum_restore_point_node_name = 1,
	Anum_restore_point_node_type = 2,
	Anum_restore_point_lsn = 3,
	_Anum_restore_point_max
};
36
37 static Datum
create_restore_point_datum(TupleDesc tupdesc,const char * node_name,XLogRecPtr lsn)38 create_restore_point_datum(TupleDesc tupdesc, const char *node_name, XLogRecPtr lsn)
39 {
40 Datum values[_Anum_restore_point_max] = { 0 };
41 bool nulls[_Anum_restore_point_max] = { false };
42 HeapTuple tuple;
43 NameData node_name_nd;
44
45 tupdesc = BlessTupleDesc(tupdesc);
46 if (node_name == NULL)
47 {
48 nulls[AttrNumberGetAttrOffset(Anum_restore_point_node_name)] = true;
49 values[AttrNumberGetAttrOffset(Anum_restore_point_node_type)] =
50 CStringGetTextDatum(TS_ACCESS_NODE_TYPE);
51 }
52 else
53 {
54 namestrcpy(&node_name_nd, node_name);
55 values[AttrNumberGetAttrOffset(Anum_restore_point_node_name)] = NameGetDatum(&node_name_nd);
56 values[AttrNumberGetAttrOffset(Anum_restore_point_node_type)] =
57 CStringGetTextDatum(TS_DATA_NODE_TYPE);
58 }
59 values[AttrNumberGetAttrOffset(Anum_restore_point_lsn)] = LSNGetDatum(lsn);
60 tuple = heap_form_tuple(tupdesc, values, nulls);
61 return HeapTupleGetDatum(tuple);
62 }
63
64 Datum
create_distributed_restore_point(PG_FUNCTION_ARGS)65 create_distributed_restore_point(PG_FUNCTION_ARGS)
66 {
67 const char *name = TextDatumGetCString(PG_GETARG_DATUM(0));
68 DistCmdResult *result_cmd;
69 FuncCallContext *funcctx;
70 XLogRecPtr lsn;
71
72 if (SRF_IS_FIRSTCALL())
73 {
74 int name_len = strlen(name);
75 MemoryContext oldctx;
76 TupleDesc tupdesc;
77 char *sql;
78
79 if (name_len >= MAXFNAMELEN)
80 ereport(ERROR,
81 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
82 errmsg("restore point name is too long"),
83 errdetail("Maximum length is %d, while provided name has %d chars.",
84 MAXFNAMELEN - 1,
85 name_len)));
86
87 if (RecoveryInProgress())
88 ereport(ERROR,
89 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
90 (errmsg("recovery is in progress"),
91 errdetail("WAL control functions cannot be executed during recovery."))));
92
93 if (!XLogIsNeeded())
94 ereport(ERROR,
95 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
96 errmsg("WAL level '%s' is not sufficient for creating a restore point",
97 GetConfigOptionByName("wal_level", NULL, false)),
98 errhint("Set wal_level to \"replica\" or \"logical\" at server start.")));
99
100 if (!superuser())
101 ereport(ERROR,
102 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
103 errmsg("must be superuser to create restore point")));
104
105 if (!ts_guc_enable_2pc)
106 ereport(ERROR,
107 (errcode(ERRCODE_TS_OPERATION_NOT_SUPPORTED),
108 errmsg("two-phase commit transactions are not enabled"),
109 errhint("Set timescaledb.enable_2pc to TRUE.")));
110
111 if (dist_util_membership() != DIST_MEMBER_ACCESS_NODE)
112 ereport(ERROR,
113 (errcode(ERRCODE_TS_OPERATION_NOT_SUPPORTED),
114 errmsg("distributed restore point must be created on the access node"),
115 errhint("Connect to the access node and create the distributed restore point "
116 "from there.")));
117
118 /*
119 * In order to achieve synchronization across the multinode cluster,
120 * we must ensure that the restore point created on the access node is
121 * synchronized with each data node.
122 *
123 * We must ensure that no concurrent prepared transactions are
124 * committed (COMMIT PREPARED) while we create the restore point.
125 * Otherwise, the distributed restore point might include prepared transactions
126 * that have committed on some data nodes but not others, leading to an
127 * inconsistent state when the distributed database is restored from a backup
128 * using the restore point.
129 *
130 * To do that we take an exclusive lock on the remote transaction
131 * table, which will force any concurrent transaction
132 * wait during their PREPARE phase.
133 */
134 LockRelationOid(ts_catalog_get()->tables[REMOTE_TXN].id, ExclusiveLock);
135
136 /* Prevent situation when new data node added during the execution */
137 LockRelationOid(ForeignServerRelationId, ExclusiveLock);
138
139 DEBUG_WAITPOINT("create_distributed_restore_point_lock");
140
141 funcctx = SRF_FIRSTCALL_INIT();
142 oldctx = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
143
144 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
145 ereport(ERROR,
146 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
147 errmsg("function returning record called in context "
148 "that cannot accept type record")));
149
150 /* Create local restore point and on each data node */
151 lsn = XLogRestorePoint(name);
152
153 sql = psprintf("SELECT pg_create_restore_point AS lsn "
154 "FROM "
155 "pg_catalog.pg_create_restore_point(%s)",
156 quote_literal_cstr(name));
157
158 result_cmd = ts_dist_cmd_invoke_on_all_data_nodes(sql);
159
160 funcctx->attinmeta = TupleDescGetAttInMetadata(tupdesc);
161 funcctx->user_fctx = result_cmd;
162
163 MemoryContextSwitchTo(oldctx);
164
165 /* Return access node restore point first */
166 SRF_RETURN_NEXT(funcctx, create_restore_point_datum(tupdesc, NULL, lsn));
167 }
168
169 funcctx = SRF_PERCALL_SETUP();
170 result_cmd = funcctx->user_fctx;
171
172 /* Return data node restore point data */
173 if (result_cmd)
174 {
175 int result_index = funcctx->call_cntr - 1;
176
177 if (result_index < ts_dist_cmd_response_count(result_cmd))
178 {
179 const char *node_name;
180 PGresult *result =
181 ts_dist_cmd_get_result_by_index(result_cmd, result_index, &node_name);
182 AttInMetadata *attinmeta = funcctx->attinmeta;
183 const int lsn_attr_pos = AttrNumberGetAttrOffset(Anum_restore_point_lsn);
184
185 lsn = DatumGetLSN(InputFunctionCall(&attinmeta->attinfuncs[lsn_attr_pos],
186 PQgetvalue(result, 0, 0),
187 attinmeta->attioparams[lsn_attr_pos],
188 attinmeta->atttypmods[lsn_attr_pos]));
189
190 SRF_RETURN_NEXT(funcctx,
191 create_restore_point_datum(attinmeta->tupdesc, node_name, lsn));
192 }
193
194 ts_dist_cmd_close_response(result_cmd);
195 }
196
197 SRF_RETURN_DONE(funcctx);
198 }
199