1 /*
2  * This file and its contents are licensed under the Timescale License.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-TIMESCALE for a copy of the license.
5  */
6 #include <postgres.h>
7 #include <funcapi.h>
8 #include <utils/builtins.h>
9 #include <utils/pg_lsn.h>
10 #include <utils/guc.h>
11 #include <access/xlog_internal.h>
12 #include <access/xlog.h>
13 #include <access/xact.h>
14 #include <catalog/pg_foreign_server.h>
15 #include <storage/lmgr.h>
16 #include <miscadmin.h>
17 
18 #include "errors.h"
19 #include "guc.h"
20 #include "catalog.h"
21 #include "debug_point.h"
22 #include "dist_util.h"
23 #include "remote/dist_commands.h"
24 #include "dist_backup.h"
25 
/* Values reported in the node type column of each result row */
#define TS_ACCESS_NODE_TYPE "access_node"
#define TS_DATA_NODE_TYPE "data_node"

/*
 * 1-based attribute numbers of the result row columns: node name, node type
 * and LSN. _Anum_restore_point_max is one past the last attribute number and
 * is used to size the values/nulls arrays when a result row is built.
 */
enum
{
	Anum_restore_point_node_name = 1,
	Anum_restore_point_node_type,
	Anum_restore_point_lsn,
	_Anum_restore_point_max
};
36 
37 static Datum
create_restore_point_datum(TupleDesc tupdesc,const char * node_name,XLogRecPtr lsn)38 create_restore_point_datum(TupleDesc tupdesc, const char *node_name, XLogRecPtr lsn)
39 {
40 	Datum values[_Anum_restore_point_max] = { 0 };
41 	bool nulls[_Anum_restore_point_max] = { false };
42 	HeapTuple tuple;
43 	NameData node_name_nd;
44 
45 	tupdesc = BlessTupleDesc(tupdesc);
46 	if (node_name == NULL)
47 	{
48 		nulls[AttrNumberGetAttrOffset(Anum_restore_point_node_name)] = true;
49 		values[AttrNumberGetAttrOffset(Anum_restore_point_node_type)] =
50 			CStringGetTextDatum(TS_ACCESS_NODE_TYPE);
51 	}
52 	else
53 	{
54 		namestrcpy(&node_name_nd, node_name);
55 		values[AttrNumberGetAttrOffset(Anum_restore_point_node_name)] = NameGetDatum(&node_name_nd);
56 		values[AttrNumberGetAttrOffset(Anum_restore_point_node_type)] =
57 			CStringGetTextDatum(TS_DATA_NODE_TYPE);
58 	}
59 	values[AttrNumberGetAttrOffset(Anum_restore_point_lsn)] = LSNGetDatum(lsn);
60 	tuple = heap_form_tuple(tupdesc, values, nulls);
61 	return HeapTupleGetDatum(tuple);
62 }
63 
64 Datum
create_distributed_restore_point(PG_FUNCTION_ARGS)65 create_distributed_restore_point(PG_FUNCTION_ARGS)
66 {
67 	const char *name = TextDatumGetCString(PG_GETARG_DATUM(0));
68 	DistCmdResult *result_cmd;
69 	FuncCallContext *funcctx;
70 	XLogRecPtr lsn;
71 
72 	if (SRF_IS_FIRSTCALL())
73 	{
74 		int name_len = strlen(name);
75 		MemoryContext oldctx;
76 		TupleDesc tupdesc;
77 		char *sql;
78 
79 		if (name_len >= MAXFNAMELEN)
80 			ereport(ERROR,
81 					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
82 					 errmsg("restore point name is too long"),
83 					 errdetail("Maximum length is %d, while provided name has %d chars.",
84 							   MAXFNAMELEN - 1,
85 							   name_len)));
86 
87 		if (RecoveryInProgress())
88 			ereport(ERROR,
89 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
90 					 (errmsg("recovery is in progress"),
91 					  errdetail("WAL control functions cannot be executed during recovery."))));
92 
93 		if (!XLogIsNeeded())
94 			ereport(ERROR,
95 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
96 					 errmsg("WAL level '%s' is not sufficient for creating a restore point",
97 							GetConfigOptionByName("wal_level", NULL, false)),
98 					 errhint("Set wal_level to \"replica\" or \"logical\" at server start.")));
99 
100 		if (!superuser())
101 			ereport(ERROR,
102 					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
103 					 errmsg("must be superuser to create restore point")));
104 
105 		if (!ts_guc_enable_2pc)
106 			ereport(ERROR,
107 					(errcode(ERRCODE_TS_OPERATION_NOT_SUPPORTED),
108 					 errmsg("two-phase commit transactions are not enabled"),
109 					 errhint("Set timescaledb.enable_2pc to TRUE.")));
110 
111 		if (dist_util_membership() != DIST_MEMBER_ACCESS_NODE)
112 			ereport(ERROR,
113 					(errcode(ERRCODE_TS_OPERATION_NOT_SUPPORTED),
114 					 errmsg("distributed restore point must be created on the access node"),
115 					 errhint("Connect to the access node and create the distributed restore point "
116 							 "from there.")));
117 
118 		/*
119 		 * In order to achieve synchronization across the multinode cluster,
120 		 * we must ensure that the restore point created on the access node is
121 		 * synchronized with each data node.
122 		 *
123 		 * We must ensure that no concurrent prepared transactions are
124 		 * committed (COMMIT PREPARED) while we create the restore point.
125 		 * Otherwise, the distributed restore point might include prepared transactions
126 		 * that have committed on some data nodes but not others, leading to an
127 		 * inconsistent state when the distributed database is restored from a backup
128 		 * using the restore point.
129 		 *
130 		 * To do that we take an exclusive lock on the remote transaction
131 		 * table, which will force any concurrent transaction
132 		 * wait during their PREPARE phase.
133 		 */
134 		LockRelationOid(ts_catalog_get()->tables[REMOTE_TXN].id, ExclusiveLock);
135 
136 		/* Prevent situation when new data node added during the execution */
137 		LockRelationOid(ForeignServerRelationId, ExclusiveLock);
138 
139 		DEBUG_WAITPOINT("create_distributed_restore_point_lock");
140 
141 		funcctx = SRF_FIRSTCALL_INIT();
142 		oldctx = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
143 
144 		if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
145 			ereport(ERROR,
146 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
147 					 errmsg("function returning record called in context "
148 							"that cannot accept type record")));
149 
150 		/* Create local restore point and on each data node */
151 		lsn = XLogRestorePoint(name);
152 
153 		sql = psprintf("SELECT pg_create_restore_point AS lsn "
154 					   "FROM "
155 					   "pg_catalog.pg_create_restore_point(%s)",
156 					   quote_literal_cstr(name));
157 
158 		result_cmd = ts_dist_cmd_invoke_on_all_data_nodes(sql);
159 
160 		funcctx->attinmeta = TupleDescGetAttInMetadata(tupdesc);
161 		funcctx->user_fctx = result_cmd;
162 
163 		MemoryContextSwitchTo(oldctx);
164 
165 		/* Return access node restore point first */
166 		SRF_RETURN_NEXT(funcctx, create_restore_point_datum(tupdesc, NULL, lsn));
167 	}
168 
169 	funcctx = SRF_PERCALL_SETUP();
170 	result_cmd = funcctx->user_fctx;
171 
172 	/* Return data node restore point data */
173 	if (result_cmd)
174 	{
175 		int result_index = funcctx->call_cntr - 1;
176 
177 		if (result_index < ts_dist_cmd_response_count(result_cmd))
178 		{
179 			const char *node_name;
180 			PGresult *result =
181 				ts_dist_cmd_get_result_by_index(result_cmd, result_index, &node_name);
182 			AttInMetadata *attinmeta = funcctx->attinmeta;
183 			const int lsn_attr_pos = AttrNumberGetAttrOffset(Anum_restore_point_lsn);
184 
185 			lsn = DatumGetLSN(InputFunctionCall(&attinmeta->attinfuncs[lsn_attr_pos],
186 												PQgetvalue(result, 0, 0),
187 												attinmeta->attioparams[lsn_attr_pos],
188 												attinmeta->atttypmods[lsn_attr_pos]));
189 
190 			SRF_RETURN_NEXT(funcctx,
191 							create_restore_point_datum(attinmeta->tupdesc, node_name, lsn));
192 		}
193 
194 		ts_dist_cmd_close_response(result_cmd);
195 	}
196 
197 	SRF_RETURN_DONE(funcctx);
198 }
199