1 /*
2  * This file and its contents are licensed under the Timescale License.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-TIMESCALE for a copy of the license.
5  */
6 #include <postgres.h>
7 
8 #include <access/htup_details.h>
9 #include <access/xact.h>
10 #include <catalog.h>
11 #include <catalog/namespace.h>
12 #include <catalog/pg_database.h>
13 #include <catalog/pg_foreign_server.h>
14 #include <catalog/pg_inherits.h>
15 #include <catalog/pg_namespace.h>
16 #include <chunk_data_node.h>
17 #include <commands/dbcommands.h>
18 #include <commands/defrem.h>
19 #include <commands/event_trigger.h>
20 #include <compat/compat.h>
21 #include <extension.h>
22 #include <funcapi.h>
23 #include <hypertable_data_node.h>
24 #include <libpq/crypt.h>
25 #include <miscadmin.h>
26 #include <nodes/makefuncs.h>
27 #include <nodes/parsenodes.h>
28 #include <utils/acl.h>
29 #include <utils/builtins.h>
30 #include <utils/builtins.h>
31 #include <utils/guc.h>
32 #include <utils/inval.h>
33 #include <utils/syscache.h>
34 
35 #include "config.h"
36 #include "extension.h"
37 #include "fdw/fdw.h"
38 #include "remote/async.h"
39 #include "remote/connection.h"
40 #include "remote/connection_cache.h"
41 #include "data_node.h"
42 #include "remote/utils.h"
43 #include "hypertable_cache.h"
44 #include "errors.h"
45 #include "dist_util.h"
46 #include "utils/uuid.h"
47 #include "mb/pg_wchar.h"
48 #include "chunk.h"
49 
/* Default connection parameters used when none are given by the user */
#define TS_DEFAULT_POSTGRES_PORT 5432
#define TS_DEFAULT_POSTGRES_HOST "localhost"

/* SQLSTATE strings compared against error fields returned by remote nodes */
#define ERRCODE_DUPLICATE_DATABASE_STR "42P04"
#define ERRCODE_DUPLICATE_SCHEMA_STR "42P06"

/* Properties of the local database that a data node's database is expected
 * to match (compared in data_node_validate_database()). */
typedef struct DbInfo
{
	NameData name;		/* database name */
	int32 encoding;		/* server encoding id (see pg_encoding_to_char) */
	NameData chartype;	/* LC_CTYPE setting (datctype) */
	NameData collation; /* LC_COLLATE setting (datcollate) */
} DbInfo;

/* A list of databases we try to connect to when bootstrapping a data node */
static const char *bootstrap_databases[] = { "postgres", "template1", "defaultdb" };

static bool data_node_validate_database(TSConnection *conn, const DbInfo *database);
68 
69 /*
70  * get_database_info - given a database OID, look up info about the database
71  *
72  * Returns:
73  *  True if a record for the OID was found, false otherwise.
74  */
75 static bool
get_database_info(Oid dbid,DbInfo * database)76 get_database_info(Oid dbid, DbInfo *database)
77 {
78 	HeapTuple dbtuple;
79 	Form_pg_database dbrecord;
80 
81 	dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
82 
83 	if (!HeapTupleIsValid(dbtuple))
84 		return false;
85 
86 	dbrecord = (Form_pg_database) GETSTRUCT(dbtuple);
87 
88 	database->encoding = dbrecord->encoding;
89 	database->collation = dbrecord->datcollate;
90 	database->chartype = dbrecord->datctype;
91 
92 	ReleaseSysCache(dbtuple);
93 	return true;
94 }
95 
96 /*
97  * Verify that server is TimescaleDB data node and perform optional ACL check.
98  *
99  * The function returns true iif the server is valid TimescaleDB data node and
100  * the ACL check succeeds. Otherwise, false is returned, or, an error is thrown
101  * if fail_on_aclcheck is set to true.
102  */
103 static bool
validate_foreign_server(const ForeignServer * server,AclMode const mode,bool fail_on_aclcheck)104 validate_foreign_server(const ForeignServer *server, AclMode const mode, bool fail_on_aclcheck)
105 {
106 	Oid const fdwid = get_foreign_data_wrapper_oid(EXTENSION_FDW_NAME, false);
107 	Oid curuserid = GetUserId();
108 	AclResult aclresult;
109 	bool valid;
110 
111 	Assert(NULL != server);
112 	if (server->fdwid != fdwid)
113 		ereport(ERROR,
114 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
115 				 errmsg("data node \"%s\" is not a TimescaleDB server", server->servername)));
116 
117 	if (mode == ACL_NO_CHECK)
118 		return true;
119 
120 	/* Must have permissions on the server object */
121 	aclresult = pg_foreign_server_aclcheck(server->serverid, curuserid, mode);
122 
123 	valid = (aclresult == ACLCHECK_OK);
124 
125 	if (!valid && fail_on_aclcheck)
126 		aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
127 
128 	return valid;
129 }
130 
131 /*
132  * Lookup the foreign server by name
133  */
134 ForeignServer *
data_node_get_foreign_server(const char * node_name,AclMode mode,bool fail_on_aclcheck,bool missing_ok)135 data_node_get_foreign_server(const char *node_name, AclMode mode, bool fail_on_aclcheck,
136 							 bool missing_ok)
137 {
138 	ForeignServer *server;
139 	bool valid;
140 
141 	if (node_name == NULL)
142 		ereport(ERROR,
143 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
144 				 errmsg("data node name cannot be NULL")));
145 
146 	server = GetForeignServerByName(node_name, missing_ok);
147 	if (NULL == server)
148 		return NULL;
149 
150 	valid = validate_foreign_server(server, mode, fail_on_aclcheck);
151 
152 	if (mode != ACL_NO_CHECK && !valid)
153 		return NULL;
154 
155 	return server;
156 }
157 
158 ForeignServer *
data_node_get_foreign_server_by_oid(Oid server_oid,AclMode mode)159 data_node_get_foreign_server_by_oid(Oid server_oid, AclMode mode)
160 {
161 	ForeignServer *server = GetForeignServer(server_oid);
162 	bool PG_USED_FOR_ASSERTS_ONLY valid = validate_foreign_server(server, mode, true);
163 	Assert(valid); /* Should always be valid since we should see error otherwise */
164 	return server;
165 }
166 
167 /*
168  * Create a foreign server.
169  *
170  * Returns true if the server was created and set the `oid` to the server oid.
171  */
172 static bool
create_foreign_server(const char * const node_name,const char * const host,int32 port,const char * const dbname,bool if_not_exists)173 create_foreign_server(const char *const node_name, const char *const host, int32 port,
174 					  const char *const dbname, bool if_not_exists)
175 {
176 	ForeignServer *server = NULL;
177 	ObjectAddress objaddr;
178 	CreateForeignServerStmt stmt = {
179 		.type = T_CreateForeignServerStmt,
180 		.servername = (char *) node_name,
181 		.fdwname = EXTENSION_FDW_NAME,
182 		.options = list_make3(makeDefElem("host", (Node *) makeString(pstrdup(host)), -1),
183 							  makeDefElem("port", (Node *) makeInteger(port), -1),
184 							  makeDefElem("dbname", (Node *) makeString(pstrdup(dbname)), -1)),
185 		.if_not_exists = if_not_exists,
186 	};
187 
188 	if (NULL == host)
189 		ereport(ERROR,
190 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
191 				 (errmsg("invalid host"),
192 				  (errhint("A hostname or IP address must be specified when "
193 						   "a data node does not already exist.")))));
194 
195 	if (if_not_exists)
196 	{
197 		server = data_node_get_foreign_server(node_name, ACL_NO_CHECK, false, true);
198 
199 		if (NULL != server)
200 		{
201 			ereport(NOTICE,
202 					(errcode(ERRCODE_DUPLICATE_OBJECT),
203 					 errmsg("data node \"%s\" already exists, skipping", node_name)));
204 			return false;
205 		}
206 	}
207 
208 	/* Permissions checks done in CreateForeignServer() */
209 	objaddr = CreateForeignServer(&stmt);
210 
211 	/* CreateForeignServer returns InvalidOid if server already exists */
212 	if (!OidIsValid(objaddr.objectId))
213 	{
214 		Assert(if_not_exists);
215 		return false;
216 	}
217 
218 	return true;
219 }
220 
221 TSConnection *
data_node_get_connection(const char * const data_node,RemoteTxnPrepStmtOption const ps_opt,bool transactional)222 data_node_get_connection(const char *const data_node, RemoteTxnPrepStmtOption const ps_opt,
223 						 bool transactional)
224 {
225 	const ForeignServer *server;
226 	TSConnectionId id;
227 
228 	Assert(data_node != NULL);
229 	server = data_node_get_foreign_server(data_node, ACL_NO_CHECK, false, false);
230 	id = remote_connection_id(server->serverid, GetUserId());
231 
232 	if (transactional)
233 		return remote_dist_txn_get_connection(id, ps_opt);
234 
235 	return remote_connection_cache_get_connection(id);
236 }
237 
/* Attribute numbers for datum returned by create_data_node() */
enum Anum_create_data_node
{
	Anum_create_data_node_name = 1, /* attribute numbers are 1-based */
	Anum_create_data_node_host,
	Anum_create_data_node_port,
	Anum_create_data_node_dbname,
	Anum_create_data_node_node_created,
	Anum_create_data_node_database_created,
	Anum_create_data_node_extension_created,
	_Anum_create_data_node_max, /* sentinel; one past the last attribute */
};

/* Number of attributes in the create_data_node() result tuple */
#define Natts_create_data_node (_Anum_create_data_node_max - 1)
252 
253 static Datum
create_data_node_datum(FunctionCallInfo fcinfo,const char * node_name,const char * host,int32 port,const char * dbname,bool node_created,bool database_created,bool extension_created)254 create_data_node_datum(FunctionCallInfo fcinfo, const char *node_name, const char *host, int32 port,
255 					   const char *dbname, bool node_created, bool database_created,
256 					   bool extension_created)
257 {
258 	TupleDesc tupdesc;
259 	Datum values[_Anum_create_data_node_max];
260 	bool nulls[_Anum_create_data_node_max] = { false };
261 	HeapTuple tuple;
262 
263 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
264 		ereport(ERROR,
265 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
266 				 errmsg("function returning record called in "
267 						"context that cannot accept type record")));
268 
269 	tupdesc = BlessTupleDesc(tupdesc);
270 	values[AttrNumberGetAttrOffset(Anum_create_data_node_name)] = CStringGetDatum(node_name);
271 	values[AttrNumberGetAttrOffset(Anum_create_data_node_host)] = CStringGetTextDatum(host);
272 	values[AttrNumberGetAttrOffset(Anum_create_data_node_port)] = Int32GetDatum(port);
273 	values[AttrNumberGetAttrOffset(Anum_create_data_node_dbname)] = CStringGetDatum(dbname);
274 	values[AttrNumberGetAttrOffset(Anum_create_data_node_node_created)] =
275 		BoolGetDatum(node_created);
276 	values[AttrNumberGetAttrOffset(Anum_create_data_node_database_created)] =
277 		BoolGetDatum(database_created);
278 	values[AttrNumberGetAttrOffset(Anum_create_data_node_extension_created)] =
279 		BoolGetDatum(extension_created);
280 	tuple = heap_form_tuple(tupdesc, values, nulls);
281 
282 	return HeapTupleGetDatum(tuple);
283 }
284 
285 static Datum
create_hypertable_data_node_datum(FunctionCallInfo fcinfo,HypertableDataNode * node)286 create_hypertable_data_node_datum(FunctionCallInfo fcinfo, HypertableDataNode *node)
287 {
288 	TupleDesc tupdesc;
289 	Datum values[Natts_hypertable_data_node];
290 	bool nulls[Natts_hypertable_data_node] = { false };
291 	HeapTuple tuple;
292 
293 	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
294 		ereport(ERROR,
295 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
296 				 errmsg("function returning record called in "
297 						"context that cannot accept type record")));
298 
299 	tupdesc = BlessTupleDesc(tupdesc);
300 	values[AttrNumberGetAttrOffset(Anum_hypertable_data_node_hypertable_id)] =
301 		Int32GetDatum(node->fd.hypertable_id);
302 	values[AttrNumberGetAttrOffset(Anum_hypertable_data_node_node_hypertable_id)] =
303 		Int32GetDatum(node->fd.node_hypertable_id);
304 	values[AttrNumberGetAttrOffset(Anum_hypertable_data_node_node_name)] =
305 		NameGetDatum(&node->fd.node_name);
306 	tuple = heap_form_tuple(tupdesc, values, nulls);
307 
308 	return HeapTupleGetDatum(tuple);
309 }
310 
311 static List *
create_data_node_options(const char * host,int32 port,const char * dbname,const char * user,const char * password)312 create_data_node_options(const char *host, int32 port, const char *dbname, const char *user,
313 						 const char *password)
314 {
315 	DefElem *host_elm = makeDefElem("host", (Node *) makeString(pstrdup(host)), -1);
316 	DefElem *port_elm = makeDefElem("port", (Node *) makeInteger(port), -1);
317 	DefElem *dbname_elm = makeDefElem("dbname", (Node *) makeString(pstrdup(dbname)), -1);
318 	DefElem *user_elm = makeDefElem("user", (Node *) makeString(pstrdup(user)), -1);
319 
320 	if (NULL != password)
321 	{
322 		DefElem *password_elm = makeDefElem("password", (Node *) makeString(pstrdup(password)), -1);
323 		return list_make5(host_elm, port_elm, dbname_elm, user_elm, password_elm);
324 	}
325 
326 	return list_make4(host_elm, port_elm, dbname_elm, user_elm);
327 }
328 
329 /* Returns 'true' if the database was created. */
330 static bool
data_node_bootstrap_database(TSConnection * conn,const DbInfo * database)331 data_node_bootstrap_database(TSConnection *conn, const DbInfo *database)
332 {
333 	const char *const username = PQuser(remote_connection_get_pg_conn(conn));
334 
335 	Assert(database);
336 
337 	if (data_node_validate_database(conn, database))
338 	{
339 		/* If the database already existed on the remote node, we will log a
340 		 * notice and proceed since it is not an error if the database already
341 		 * existed on the remote node. */
342 		elog(NOTICE,
343 			 "database \"%s\" already exists on data node, skipping",
344 			 NameStr(database->name));
345 		return false;
346 	}
347 
348 	/* Create the database with the user as owner. There is no need to
349 	 * validate the database after this command since it should be created
350 	 * correctly. */
351 	PGresult *res =
352 		remote_connection_execf(conn,
353 								"CREATE DATABASE %s ENCODING %s LC_COLLATE %s LC_CTYPE %s "
354 								"TEMPLATE template0 OWNER %s",
355 								quote_identifier(NameStr(database->name)),
356 								quote_identifier(pg_encoding_to_char(database->encoding)),
357 								quote_literal_cstr(NameStr(database->collation)),
358 								quote_literal_cstr(NameStr(database->chartype)),
359 								quote_identifier(username));
360 	if (PQresultStatus(res) != PGRES_COMMAND_OK)
361 		remote_result_elog(res, ERROR);
362 	return true;
363 }
364 
365 /* Validate the database.
366  *
367  * Errors:
368  *   Will abort with errors if the database exists but is not correctly set
369  *   up.
370  * Returns:
371  *   true if the database exists and is valid
372  *   false if it does not exist.
373  */
374 static bool
data_node_validate_database(TSConnection * conn,const DbInfo * database)375 data_node_validate_database(TSConnection *conn, const DbInfo *database)
376 {
377 	PGresult *res;
378 	uint32 actual_encoding;
379 	const char *actual_chartype;
380 	const char *actual_collation;
381 
382 	res = remote_connection_execf(conn,
383 								  "SELECT encoding, datcollate, datctype "
384 								  "FROM pg_database WHERE datname = %s",
385 								  quote_literal_cstr(NameStr(database->name)));
386 
387 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
388 		ereport(ERROR,
389 				(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));
390 
391 	if (PQntuples(res) == 0)
392 		return false;
393 
394 	Assert(PQnfields(res) > 2);
395 
396 	actual_encoding = atoi(PQgetvalue(res, 0, 0));
397 	if (actual_encoding != database->encoding)
398 		ereport(ERROR,
399 				(errcode(ERRCODE_TS_DATA_NODE_INVALID_CONFIG),
400 				 errmsg("database exists but has wrong encoding"),
401 				 errdetail("Expected database encoding to be \"%s\" (%u) but it was \"%s\" (%u).",
402 						   pg_encoding_to_char(database->encoding),
403 						   database->encoding,
404 						   pg_encoding_to_char(actual_encoding),
405 						   actual_encoding)));
406 
407 	actual_collation = PQgetvalue(res, 0, 1);
408 	Assert(actual_collation != NULL);
409 	if (strcmp(actual_collation, NameStr(database->collation)) != 0)
410 		ereport(ERROR,
411 				(errcode(ERRCODE_TS_DATA_NODE_INVALID_CONFIG),
412 				 errmsg("database exists but has wrong collation"),
413 				 errdetail("Expected collation \"%s\" but it was \"%s\".",
414 						   NameStr(database->collation),
415 						   actual_collation)));
416 
417 	actual_chartype = PQgetvalue(res, 0, 2);
418 	Assert(actual_chartype != NULL);
419 	if (strcmp(actual_chartype, NameStr(database->chartype)) != 0)
420 		ereport(ERROR,
421 				(errcode(ERRCODE_TS_DATA_NODE_INVALID_CONFIG),
422 				 errmsg("database exists but has wrong LC_CTYPE"),
423 				 errdetail("Expected LC_CTYPE \"%s\" but it was \"%s\".",
424 						   NameStr(database->chartype),
425 						   actual_chartype)));
426 	return true;
427 }
428 
429 static void
data_node_validate_extension(TSConnection * conn)430 data_node_validate_extension(TSConnection *conn)
431 {
432 	const char *const dbname = PQdb(remote_connection_get_pg_conn(conn));
433 	const char *const host = PQhost(remote_connection_get_pg_conn(conn));
434 	const char *const port = PQport(remote_connection_get_pg_conn(conn));
435 
436 	if (!remote_connection_check_extension(conn))
437 		ereport(ERROR,
438 				(errcode(ERRCODE_TS_DATA_NODE_INVALID_CONFIG),
439 				 errmsg("database does not have TimescaleDB extension loaded"),
440 				 errdetail("The TimescaleDB extension is not loaded in database %s on node at "
441 						   "%s:%s.",
442 						   dbname,
443 						   host,
444 						   port)));
445 }
446 
447 static void
data_node_validate_as_data_node(TSConnection * conn)448 data_node_validate_as_data_node(TSConnection *conn)
449 {
450 	PGresult *res =
451 		remote_connection_exec(conn, "SELECT _timescaledb_internal.validate_as_data_node()");
452 
453 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
454 		ereport(ERROR,
455 				(errcode(ERRCODE_TS_DATA_NODE_INVALID_CONFIG),
456 				 (errmsg("cannot add \"%s\" as a data node", remote_connection_node_name(conn)),
457 				  errdetail("%s", PQresultErrorMessage(res)))));
458 
459 	remote_result_close(res);
460 }
461 
462 /*
463  * Bootstrap the extension and associated objects.
464  */
465 static bool
data_node_bootstrap_extension(TSConnection * conn)466 data_node_bootstrap_extension(TSConnection *conn)
467 {
468 	const char *const username = PQuser(remote_connection_get_pg_conn(conn));
469 	const char *schema_name = ts_extension_schema_name();
470 	const char *schema_name_quoted = quote_identifier(schema_name);
471 	Oid schema_oid = get_namespace_oid(schema_name, true);
472 
473 	/* We only count the number of tuples in the code below, but having the
474 	 * name and version are useful for debugging purposes. */
475 	PGresult *res =
476 		remote_connection_execf(conn,
477 								"SELECT extname, extversion FROM pg_extension WHERE extname = %s",
478 								quote_literal_cstr(EXTENSION_NAME));
479 
480 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
481 		ereport(ERROR,
482 				(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));
483 
484 	if (PQntuples(res) == 0)
485 	{
486 		if (schema_oid != PG_PUBLIC_NAMESPACE)
487 		{
488 			PGresult *res = remote_connection_execf(conn,
489 													"CREATE SCHEMA %s AUTHORIZATION %s",
490 													schema_name_quoted,
491 													quote_identifier(username));
492 			if (PQresultStatus(res) != PGRES_COMMAND_OK)
493 			{
494 				const char *const sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
495 				bool schema_exists =
496 					(sqlstate && strcmp(sqlstate, ERRCODE_DUPLICATE_SCHEMA_STR) == 0);
497 				if (!schema_exists)
498 					remote_result_elog(res, ERROR);
499 				/* If the schema already existed on the remote node, we got a
500 				 * duplicate schema error and the schema was not created. In
501 				 * that case, we log an error with a hint on how to fix the
502 				 * issue. */
503 				ereport(ERROR,
504 						(errcode(ERRCODE_DUPLICATE_SCHEMA),
505 						 errmsg("schema \"%s\" already exists in database, aborting", schema_name),
506 						 errhint("Make sure that the data node does not contain any "
507 								 "existing objects prior to adding it.")));
508 			}
509 		}
510 
511 		remote_connection_cmdf_ok(conn,
512 								  "CREATE EXTENSION " EXTENSION_NAME
513 								  " WITH SCHEMA %s VERSION %s CASCADE",
514 								  schema_name_quoted,
515 								  quote_literal_cstr(ts_extension_get_version()));
516 		return true;
517 	}
518 	else
519 	{
520 		ereport(NOTICE,
521 				(errmsg("extension \"%s\" already exists on data node, skipping",
522 						PQgetvalue(res, 0, 0)),
523 				 errdetail("TimescaleDB extension version on %s:%s was %s.",
524 						   PQhost(remote_connection_get_pg_conn(conn)),
525 						   PQport(remote_connection_get_pg_conn(conn)),
526 						   PQgetvalue(res, 0, 1))));
527 		data_node_validate_extension(conn);
528 		return false;
529 	}
530 }
531 
532 /* Add dist_uuid on the remote node.
533  *
534  * If the remote node is set to use the current database, `set_dist_id` will report an error and not
535  * set it. */
536 static void
add_distributed_id_to_data_node(TSConnection * conn)537 add_distributed_id_to_data_node(TSConnection *conn)
538 {
539 	Datum id_string = DirectFunctionCall1(uuid_out, dist_util_get_id());
540 	PGresult *res = remote_connection_queryf_ok(conn,
541 												"SELECT _timescaledb_internal.set_dist_id('%s')",
542 												DatumGetCString(id_string));
543 	remote_result_close(res);
544 }
545 
546 /*
547  * Connect to do bootstrapping.
548  *
549  * We iterate through the list of databases and try to connect to so we can
550  * bootstrap the data node.
551  */
552 static TSConnection *
connect_for_bootstrapping(const char * node_name,const char * const host,int32 port,const char * username,const char * password)553 connect_for_bootstrapping(const char *node_name, const char *const host, int32 port,
554 						  const char *username, const char *password)
555 {
556 	TSConnection *conn = NULL;
557 	char *err = NULL;
558 	int i;
559 
560 	for (i = 0; i < lengthof(bootstrap_databases); i++)
561 	{
562 		List *node_options =
563 			create_data_node_options(host, port, bootstrap_databases[i], username, password);
564 		conn = remote_connection_open_with_options_nothrow(node_name, node_options, &err);
565 
566 		if (conn)
567 			return conn;
568 	}
569 
570 	ereport(ERROR,
571 			(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
572 			 errmsg("could not connect to \"%s\"", node_name),
573 			 err == NULL ? 0 : errdetail("%s", err)));
574 
575 	pg_unreachable();
576 
577 	return NULL;
578 }
579 
580 /*
581  * Validate that compatible extension is available on the data node.
582  *
583  * We check all available extension versions. Since we are connected to
584  * template DB when performing this check, it means we can't
585  * really tell if a compatible extension is installed in the database we
586  * are trying to add to the cluster. However we can make sure that a user
587  * will be able to manually upgrade the extension on the data node if needed.
588  *
589  * Will abort with error if there is no compatible version available, otherwise do nothing.
590  */
591 static void
data_node_validate_extension_availability(TSConnection * conn)592 data_node_validate_extension_availability(TSConnection *conn)
593 {
594 	StringInfo concat_versions = makeStringInfo();
595 	bool compatible = false;
596 	PGresult *res;
597 	int i;
598 
599 	res =
600 		remote_connection_execf(conn,
601 								"SELECT version FROM pg_available_extension_versions WHERE name = "
602 								"%s AND version ~ '\\d+.\\d+.\\d+.*' ORDER BY version DESC",
603 								quote_literal_cstr(EXTENSION_NAME));
604 
605 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
606 		ereport(ERROR,
607 				(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));
608 
609 	if (PQntuples(res) == 0)
610 		ereport(ERROR,
611 				(errcode(ERRCODE_TS_DATA_NODE_INVALID_CONFIG),
612 				 errmsg("TimescaleDB extension not available on remote PostgreSQL instance"),
613 				 errhint("Install the TimescaleDB extension on the remote PostgresSQL instance.")));
614 
615 	Assert(PQnfields(res) == 1);
616 
617 	for (i = 0; i < PQntuples(res); i++)
618 	{
619 		bool old_version = false;
620 
621 		appendStringInfo(concat_versions, "%s, ", PQgetvalue(res, i, 0));
622 		compatible = dist_util_is_compatible_version(PQgetvalue(res, i, 0),
623 													 TIMESCALEDB_VERSION,
624 													 &old_version);
625 		if (compatible)
626 			break;
627 	}
628 
629 	if (!compatible)
630 		ereport(ERROR,
631 				(errcode(ERRCODE_TS_DATA_NODE_INVALID_CONFIG),
632 				 errmsg("remote PostgreSQL instance has an incompatible timescaledb extension "
633 						"version"),
634 				 errdetail_internal("Access node version: %s, available remote versions: %s.",
635 									TIMESCALEDB_VERSION_MOD,
636 									concat_versions->data)));
637 }
638 
639 /**
640  * Get the configured server port for the server as an integer.
641  *
642  * Returns:
643  *   Port number if a port is configured, -1 if it is not able to get
644  *   the port number.
645  *
646  * Note:
647  *   We cannot use `inet_server_port()` since that will return NULL if
648  *   connecting to a server on localhost since a UNIX socket will be
649  *   used. This is the case even if explicitly using a port when
650  *   connecting. Regardless of how the user connected, we want to use the same
651  *   port as the one that the server listens on.
652  */
653 static int32
get_server_port()654 get_server_port()
655 {
656 	const char *const portstr =
657 		GetConfigOption("port", /* missing_ok= */ false, /* restrict_privileged= */ false);
658 	return pg_atoi(portstr, sizeof(int32), 0);
659 }
660 
661 /* set_distid may need to be false for some otherwise invalid configurations
662  * that are useful for testing */
663 static Datum
data_node_add_internal(PG_FUNCTION_ARGS,bool set_distid)664 data_node_add_internal(PG_FUNCTION_ARGS, bool set_distid)
665 {
666 	Oid userid = GetUserId();
667 	const char *username = GetUserNameFromId(userid, false);
668 	const char *node_name = PG_ARGISNULL(0) ? NULL : PG_GETARG_CSTRING(0);
669 	const char *host = PG_ARGISNULL(1) ? NULL : TextDatumGetCString(PG_GETARG_DATUM(1));
670 	const char *dbname = PG_ARGISNULL(2) ? get_database_name(MyDatabaseId) : PG_GETARG_CSTRING(2);
671 	int32 port = PG_ARGISNULL(3) ? get_server_port() : PG_GETARG_INT32(3);
672 	bool if_not_exists = PG_ARGISNULL(4) ? false : PG_GETARG_BOOL(4);
673 	bool bootstrap = PG_ARGISNULL(5) ? true : PG_GETARG_BOOL(5);
674 	const char *password = PG_ARGISNULL(6) ? NULL : TextDatumGetCString(PG_GETARG_DATUM(6));
675 	bool server_created = false;
676 	bool database_created = false;
677 	bool extension_created = false;
678 	bool PG_USED_FOR_ASSERTS_ONLY result;
679 	DbInfo database;
680 
681 	TS_PREVENT_FUNC_IF_READ_ONLY();
682 
683 	namestrcpy(&database.name, dbname);
684 
685 	if (host == NULL)
686 		ereport(ERROR,
687 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
688 				 (errmsg("a host needs to be specified"),
689 				  errhint("Provide a host name or IP address of a data node to add."))));
690 
691 	if (set_distid && dist_util_membership() == DIST_MEMBER_DATA_NODE)
692 		ereport(ERROR,
693 				(errcode(ERRCODE_TS_DATA_NODE_ASSIGNMENT_ALREADY_EXISTS),
694 				 (errmsg("unable to assign data nodes from an existing distributed database"))));
695 
696 	if (NULL == node_name)
697 		ereport(ERROR,
698 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
699 				 (errmsg("data node name cannot be NULL"))));
700 
701 	if (port < 1 || port > PG_UINT16_MAX)
702 		ereport(ERROR,
703 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
704 				 (errmsg("invalid port number %d", port),
705 				  errhint("The port number must be between 1 and %u.", PG_UINT16_MAX))));
706 
707 	result = get_database_info(MyDatabaseId, &database);
708 	Assert(result);
709 
710 	/*
711 	 * Since this function creates databases on remote nodes, and CREATE DATABASE
712 	 * cannot run in a transaction block, we cannot run the function in a
713 	 * transaction block either.
714 	 */
715 	TS_PREVENT_IN_TRANSACTION_BLOCK(true);
716 
717 	/* Try to create the foreign server, or get the existing one in case of
718 	 * if_not_exists true. */
719 	if (create_foreign_server(node_name, host, port, dbname, if_not_exists))
720 	{
721 		List *node_options;
722 		TSConnection *conn;
723 
724 		server_created = true;
725 
726 		/* Make the foreign server visible in current transaction. */
727 		CommandCounterIncrement();
728 
729 		/* If bootstrapping, we check the extension availability here and
730 		 * abort if the extension is not available. We should not start
731 		 * creating databases and other cruft on the datanode unless we know
732 		 * that the extension is installed.
733 		 *
734 		 * We ensure that there is a database if we are bootstrapping. This is
735 		 * done using a separate connection since the database that is going
736 		 * to be used for the data node does not exist yet, so we cannot
737 		 * connect to it. */
738 		if (bootstrap)
739 		{
740 			TSConnection *conn =
741 				connect_for_bootstrapping(node_name, host, port, username, password);
742 			Assert(NULL != conn);
743 			data_node_validate_extension_availability(conn);
744 			database_created = data_node_bootstrap_database(conn, &database);
745 			remote_connection_close(conn);
746 		}
747 
748 		/* Connect to the database we are bootstrapping and either install the
749 		 * extension or validate that the extension is installed. The
750 		 * following statements are executed inside a transaction so that they
751 		 * can be rolled back in the event of a failure.
752 		 *
753 		 * We could use `remote_dist_txn_get_connection` here, but it is
754 		 * comparably heavy and make the code more complicated than
755 		 * necessary. Instead using a more straightforward approach here since
756 		 * we do not need 2PC support. */
757 		node_options = create_data_node_options(host, port, dbname, username, password);
758 		conn = remote_connection_open_with_options(node_name, node_options, false);
759 		Assert(NULL != conn);
760 		remote_connection_cmd_ok(conn, "BEGIN");
761 
762 		if (bootstrap)
763 			extension_created = data_node_bootstrap_extension(conn);
764 
765 		if (!database_created)
766 		{
767 			data_node_validate_database(conn, &database);
768 			data_node_validate_as_data_node(conn);
769 		}
770 
771 		if (!extension_created)
772 			data_node_validate_extension(conn);
773 
774 		/* After the node is verified or bootstrapped, we set the `dist_uuid`
775 		 * using the same connection. We skip this if clustering checks are
776 		 * disabled, which means that the `dist_uuid` is neither set nor
777 		 * checked.
778 		 *
779 		 * This is done inside a transaction so that we can roll it back if
780 		 * there are any failures. Note that any failure at this point will
781 		 * not rollback the creates above. */
782 		if (set_distid)
783 		{
784 			if (dist_util_membership() != DIST_MEMBER_ACCESS_NODE)
785 				dist_util_set_as_access_node();
786 			add_distributed_id_to_data_node(conn);
787 		}
788 
789 		/* If there were an error before, we will not reach this point to the
790 		 * transaction will be aborted when the connection is closed. */
791 		remote_connection_cmd_ok(conn, "COMMIT");
792 		remote_connection_close(conn);
793 	}
794 
795 	PG_RETURN_DATUM(create_data_node_datum(fcinfo,
796 										   node_name,
797 										   host,
798 										   port,
799 										   dbname,
800 										   server_created,
801 										   database_created,
802 										   extension_created));
803 }
804 
/* SQL-callable entry point: add a data node and set its distributed ID. */
Datum
data_node_add(PG_FUNCTION_ARGS)
{
	return data_node_add_internal(fcinfo, true);
}
810 
/* SQL-callable entry point: add a data node without setting the distributed
 * ID (useful for otherwise-invalid configurations used in testing). */
Datum
data_node_add_without_dist_id(PG_FUNCTION_ARGS)
{
	return data_node_add_internal(fcinfo, false);
}
816 
/*
 * Attach a data node to an existing distributed hypertable (SQL function).
 *
 * Arguments (from fcinfo):
 *   0: node_name       - name of the data node's foreign server
 *   1: table_id        - OID of the hypertable (must not be NULL)
 *   2: if_not_attached - if true, emit NOTICE instead of ERROR when the node
 *                        is already attached
 *   3: repartition     - if true, grow the number of slices in the space
 *                        dimension to match the new number of data nodes
 *
 * Returns a composite datum describing the new hypertable/data node mapping.
 * Raises an error if the table is not a distributed hypertable, if the
 * caller lacks ownership of the hypertable or USAGE on the foreign server,
 * or if the maximum number of data nodes would be exceeded.
 */
Datum
data_node_attach(PG_FUNCTION_ARGS)
{
	const char *node_name = PG_ARGISNULL(0) ? NULL : PG_GETARG_CSTRING(0);
	Oid table_id = PG_GETARG_OID(1);
	bool if_not_attached = PG_ARGISNULL(2) ? false : PG_GETARG_BOOL(2);
	bool repartition = PG_ARGISNULL(3) ? false : PG_GETARG_BOOL(3);
	ForeignServer *fserver;
	HypertableDataNode *node;
	Cache *hcache;
	Hypertable *ht;
	Dimension *dim;
	List *result;
	int num_nodes;
	ListCell *lc;

	TS_PREVENT_FUNC_IF_READ_ONLY();

	if (PG_ARGISNULL(1))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("hypertable cannot be NULL")));
	Assert(get_rel_name(table_id));

	/* Errors out if table_id is not a hypertable */
	ht = ts_hypertable_cache_get_cache_and_entry(table_id, CACHE_FLAG_NONE, &hcache);
	Assert(ht != NULL);

	if (!hypertable_is_distributed(ht))
		ereport(ERROR,
				(errcode(ERRCODE_TS_HYPERTABLE_NOT_DISTRIBUTED),
				 errmsg("hypertable \"%s\" is not distributed", get_rel_name(table_id))));

	/* Must have owner permissions on the hypertable to attach a new data
	   node. Must also have USAGE on the foreign server.  */
	ts_hypertable_permissions_check(table_id, GetUserId());
	fserver = data_node_get_foreign_server(node_name, ACL_USAGE, true, false);

	Assert(NULL != fserver);

	/* Check if the node is already attached; behavior on a duplicate depends
	 * on if_not_attached (NOTICE + return existing mapping vs. ERROR). */
	foreach (lc, ht->data_nodes)
	{
		node = lfirst(lc);

		if (node->foreign_server_oid == fserver->serverid)
		{
			ts_cache_release(hcache);

			if (if_not_attached)
			{
				ereport(NOTICE,
						(errcode(ERRCODE_TS_DATA_NODE_ALREADY_ATTACHED),
						 errmsg("data node \"%s\" is already attached to hypertable \"%s\", "
								"skipping",
								node_name,
								get_rel_name(table_id))));
				PG_RETURN_DATUM(create_hypertable_data_node_datum(fcinfo, node));
			}
			else
				ereport(ERROR,
						(errcode(ERRCODE_TS_DATA_NODE_ALREADY_ATTACHED),
						 errmsg("data node \"%s\" is already attached to hypertable \"%s\"",
								node_name,
								get_rel_name(table_id))));
		}
	}

	result = hypertable_assign_data_nodes(ht->fd.id, list_make1((char *) node_name));
	Assert(result->length == 1);

	/* Get the first closed (space) dimension, which is the one along which we
	 * partition across data nodes. */
	dim = ts_hyperspace_get_mutable_dimension(ht->space, DIMENSION_TYPE_CLOSED, 0);
	num_nodes = list_length(ht->data_nodes) + 1;

	if (num_nodes > MAX_NUM_HYPERTABLE_DATA_NODES)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("max number of data nodes already attached"),
				 errdetail("The number of data nodes in a hypertable cannot exceed %d.",
						   MAX_NUM_HYPERTABLE_DATA_NODES)));

	/* If there are less slices (partitions) in the space dimension than there
	 * are data nodes, we'd like to expand the number of slices to be able to
	 * make use of the new data node. */
	if (NULL != dim && num_nodes > dim->fd.num_slices)
	{
		if (repartition)
		{
			/* num_nodes is bounded by MAX_NUM_HYPERTABLE_DATA_NODES above;
			 * the mask narrows it to the 16-bit slice count. */
			ts_dimension_set_number_of_slices(dim, num_nodes & 0xFFFF);

			ereport(NOTICE,
					(errmsg("the number of partitions in dimension \"%s\" was increased to %u",
							NameStr(dim->fd.column_name),
							num_nodes),
					 errdetail("To make use of all attached data nodes, a distributed "
							   "hypertable needs at least as many partitions in the first "
							   "closed (space) dimension as there are attached data nodes.")));
		}
		else
		{
			/* Raise a warning if the number of partitions are too few to make
			 * use of all data nodes. Need to refresh cache first to get the
			 * updated data node list. */
			int dimension_id = dim->fd.id;

			ts_cache_release(hcache);
			hcache = ts_hypertable_cache_pin();
			ht = ts_hypertable_cache_get_entry(hcache, table_id, CACHE_FLAG_NONE);
			ts_hypertable_check_partitioning(ht, dimension_id);
		}
	}

	node = linitial(result);
	ts_cache_release(hcache);

	PG_RETURN_DATUM(create_hypertable_data_node_datum(fcinfo, node));
}
933 
934 /* Only used for generating proper error message */
935 typedef enum OperationType
936 {
937 	OP_BLOCK,
938 	OP_DETACH,
939 	OP_DELETE
940 } OperationType;
941 
942 static void
check_replication_for_new_data(const char * node_name,Hypertable * ht,bool force,OperationType op_type)943 check_replication_for_new_data(const char *node_name, Hypertable *ht, bool force,
944 							   OperationType op_type)
945 {
946 	List *available_nodes = ts_hypertable_get_available_data_nodes(ht, false);
947 
948 	if (ht->fd.replication_factor < list_length(available_nodes))
949 		return;
950 
951 	ereport(force ? WARNING : ERROR,
952 			(errcode(ERRCODE_TS_INSUFFICIENT_NUM_DATA_NODES),
953 			 errmsg("insufficient number of data nodes for distributed hypertable \"%s\"",
954 					NameStr(ht->fd.table_name)),
955 			 errdetail("Reducing the number of available data nodes on distributed"
956 					   " hypertable \"%s\" prevents full replication of new chunks.",
957 					   NameStr(ht->fd.table_name)),
958 			 force ? 0 : errhint("Use force => true to force this operation.")));
959 }
960 
961 static bool
data_node_contains_non_replicated_chunks(List * chunk_data_nodes)962 data_node_contains_non_replicated_chunks(List *chunk_data_nodes)
963 {
964 	ListCell *lc;
965 
966 	foreach (lc, chunk_data_nodes)
967 	{
968 		ChunkDataNode *cdn = lfirst(lc);
969 		List *replicas =
970 			ts_chunk_data_node_scan_by_chunk_id(cdn->fd.chunk_id, CurrentMemoryContext);
971 
972 		if (list_length(replicas) < 2)
973 			return true;
974 	}
975 
976 	return false;
977 }
978 
/*
 * Validate that a data node can be detached from (or deleted together with)
 * a hypertable without losing data.
 *
 * Fails with an ERROR if any chunk on the data node has no other replica
 * (data would be lost). If the data node still holds replicated chunks, the
 * operation fails unless force is set, in which case only a WARNING about
 * under-replication is emitted. Finally delegates to
 * check_replication_for_new_data() to validate replication of future chunks.
 *
 * Returns the list of ChunkDataNode mappings for this node and hypertable,
 * which the caller uses to reassign/delete the chunk mappings.
 */
static List *
data_node_detach_or_delete_validate(const char *node_name, Hypertable *ht, bool force,
									OperationType op_type)
{
	List *chunk_data_nodes =
		ts_chunk_data_node_scan_by_node_name_and_hypertable_id(node_name,
															   ht->fd.id,
															   CurrentMemoryContext);
	bool has_non_replicated_chunks = data_node_contains_non_replicated_chunks(chunk_data_nodes);

	/* Only detach and delete are validated here; blocking is handled
	 * separately. */
	Assert(op_type == OP_DELETE || op_type == OP_DETACH);

	if (has_non_replicated_chunks)
		ereport(ERROR,
				(errcode(ERRCODE_TS_INSUFFICIENT_NUM_DATA_NODES),
				 errmsg("insufficient number of data nodes"),
				 errdetail("Distributed hypertable \"%s\" would lose data if"
						   " data node \"%s\" is %s.",
						   NameStr(ht->fd.table_name),
						   node_name,
						   (op_type == OP_DELETE) ? "deleted" : "detached"),
				 errhint("Ensure all chunks on the data node are fully replicated before %s it.",
						 (op_type == OP_DELETE) ? "deleting" : "detaching")));

	if (list_length(chunk_data_nodes) > 0)
	{
		if (force)
			ereport(WARNING,
					(errcode(ERRCODE_TS_INSUFFICIENT_NUM_DATA_NODES),
					 errmsg("distributed hypertable \"%s\" is under-replicated",
							NameStr(ht->fd.table_name)),
					 errdetail("Some chunks no longer meet the replication target"
							   " after %s data node \"%s\".",
							   (op_type == OP_DELETE) ? "deleting" : "detaching",
							   node_name)));
		else
			ereport(ERROR,
					(errcode(ERRCODE_TS_DATA_NODE_IN_USE),
					 errmsg("data node \"%s\" still holds data for distributed hypertable \"%s\"",
							node_name,
							NameStr(ht->fd.table_name))));
	}

	check_replication_for_new_data(node_name, ht, force, op_type);

	return chunk_data_nodes;
}
1026 
/*
 * Apply a block, detach, or delete operation to a data node's mappings on a
 * set of hypertables.
 *
 * Parameters:
 *   node_name             - name of the data node being modified
 *   hypertable_data_nodes - list of HypertableDataNode mappings to process
 *   all_hypertables       - true when the operation targets every hypertable
 *                           (permission failures become NOTICEs for
 *                           block/detach, see below)
 *   op_type               - OP_BLOCK, OP_DETACH, or OP_DELETE
 *   block_chunks          - for OP_BLOCK: new value of the block flag
 *   force                 - downgrade replication errors to warnings
 *   repartition           - shrink the space dimension to the new node count
 *                           on detach/delete
 *
 * Returns the number of mappings removed (detach/delete) or updated (block).
 */
static int
data_node_modify_hypertable_data_nodes(const char *node_name, List *hypertable_data_nodes,
									   bool all_hypertables, OperationType op_type,
									   bool block_chunks, bool force, bool repartition)
{
	Cache *hcache = ts_hypertable_cache_pin();
	ListCell *lc;
	int removed = 0;

	foreach (lc, hypertable_data_nodes)
	{
		HypertableDataNode *node = lfirst(lc);
		Oid relid = ts_hypertable_id_to_relid(node->fd.hypertable_id);
		Hypertable *ht = ts_hypertable_cache_get_entry_by_id(hcache, node->fd.hypertable_id);
		bool has_privs = ts_hypertable_has_privs_of(relid, GetUserId());

		Assert(ht != NULL);

		if (!has_privs)
		{
			/* If the operation is OP_DELETE, we MUST be able to detach the data
			 * node from ALL tables since the foreign server object will be
			 * deleted. Therefore, we fail the operation if we find a table
			 * that we don't have owner permissions on in this case. */
			if (all_hypertables && op_type != OP_DELETE)
				ereport(NOTICE,
						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
						 errmsg("skipping hypertable \"%s\" due to missing permissions",
								get_rel_name(relid))));
			else
				ereport(ERROR,
						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
						 errmsg("permission denied for hypertable \"%s\"", get_rel_name(relid)),
						 errdetail("The data node is attached to hypertables that the current "
								   "user lacks permissions for.")));
		}
		else if (op_type == OP_DETACH || op_type == OP_DELETE)
		{
			/* we have permissions to detach */
			List *chunk_data_nodes =
				data_node_detach_or_delete_validate(NameStr(node->fd.node_name),
													ht,
													force,
													op_type);
			ListCell *cs_lc;

			/* update chunk foreign table server and delete chunk mapping */
			foreach (cs_lc, chunk_data_nodes)
			{
				ChunkDataNode *cdn = lfirst(cs_lc);

				chunk_update_foreign_server_if_needed(cdn->fd.chunk_id, cdn->foreign_server_oid);
				ts_chunk_data_node_delete_by_chunk_id_and_node_name(cdn->fd.chunk_id,
																	NameStr(cdn->fd.node_name));
			}

			/* delete hypertable mapping */
			removed +=
				ts_hypertable_data_node_delete_by_node_name_and_hypertable_id(node_name, ht->fd.id);

			if (repartition)
			{
				/* Shrink the space dimension so the slice count matches the
				 * reduced number of data nodes, when it would otherwise
				 * exceed it. */
				Dimension *dim =
					ts_hyperspace_get_mutable_dimension(ht->space, DIMENSION_TYPE_CLOSED, 0);
				int num_nodes = list_length(ht->data_nodes) - 1;

				if (dim != NULL && num_nodes < dim->fd.num_slices && num_nodes > 0)
				{
					ts_dimension_set_number_of_slices(dim, num_nodes & 0xFFFF);

					ereport(NOTICE,
							(errmsg("the number of partitions in dimension \"%s\" was decreased to "
									"%u",
									NameStr(dim->fd.column_name),
									num_nodes),
							 errdetail(
								 "To make efficient use of all attached data nodes, the number of "
								 "space partitions was set to match the number of data nodes.")));
				}
			}
		}
		else
		{
			/*  set block new chunks */
			if (block_chunks)
			{
				/* Already blocked: nothing to update for this hypertable */
				if (node->fd.block_chunks)
				{
					elog(NOTICE,
						 "new chunks already blocked on data node \"%s\" for"
						 " hypertable \"%s\"",
						 NameStr(node->fd.node_name),
						 get_rel_name(relid));
					continue;
				}

				check_replication_for_new_data(node_name, ht, force, OP_BLOCK);
			}
			node->fd.block_chunks = block_chunks;
			removed += ts_hypertable_data_node_update(node);
		}
	}
	ts_cache_release(hcache);
	return removed;
}
1132 
/*
 * Block or allow new chunks on a data node for a list of hypertables.
 *
 * Thin wrapper around data_node_modify_hypertable_data_nodes() with
 * op_type = OP_BLOCK and repartition disabled. Returns the number of
 * updated hypertable/data-node mappings.
 */
static int
data_node_block_hypertable_data_nodes(const char *node_name, List *hypertable_data_nodes,
									  bool all_hypertables, bool block_chunks, bool force)
{
	return data_node_modify_hypertable_data_nodes(node_name,
												  hypertable_data_nodes,
												  all_hypertables,
												  OP_BLOCK,
												  block_chunks,
												  force,
												  false);
}
1145 
/*
 * Detach (or, as part of a delete, remove) a data node from a list of
 * hypertables.
 *
 * Thin wrapper around data_node_modify_hypertable_data_nodes() with
 * block_chunks unused (false). op_type is OP_DETACH or OP_DELETE depending
 * on the caller. Returns the number of removed mappings.
 */
static int
data_node_detach_hypertable_data_nodes(const char *node_name, List *hypertable_data_nodes,
									   bool all_hypertables, bool force, bool repartition,
									   OperationType op_type)
{
	return data_node_modify_hypertable_data_nodes(node_name,
												  hypertable_data_nodes,
												  all_hypertables,
												  op_type,
												  false,
												  force,
												  repartition);
}
1159 
1160 HypertableDataNode *
data_node_hypertable_get_by_node_name(const Hypertable * ht,const char * node_name,bool attach_check)1161 data_node_hypertable_get_by_node_name(const Hypertable *ht, const char *node_name,
1162 									  bool attach_check)
1163 {
1164 	HypertableDataNode *hdn = NULL;
1165 	ListCell *lc;
1166 
1167 	if (!hypertable_is_distributed(ht))
1168 		ereport(ERROR,
1169 				(errcode(ERRCODE_TS_HYPERTABLE_NOT_DISTRIBUTED),
1170 				 errmsg("hypertable \"%s\" is not distributed",
1171 						get_rel_name(ht->main_table_relid))));
1172 
1173 	foreach (lc, ht->data_nodes)
1174 	{
1175 		hdn = lfirst(lc);
1176 		if (namestrcmp(&hdn->fd.node_name, node_name) == 0)
1177 			break;
1178 		else
1179 			hdn = NULL;
1180 	}
1181 
1182 	if (hdn == NULL)
1183 	{
1184 		if (attach_check)
1185 			ereport(ERROR,
1186 					(errcode(ERRCODE_TS_DATA_NODE_NOT_ATTACHED),
1187 					 errmsg("data node \"%s\" is not attached to hypertable \"%s\"",
1188 							node_name,
1189 							get_rel_name(ht->main_table_relid))));
1190 		else
1191 			ereport(NOTICE,
1192 					(errcode(ERRCODE_TS_DATA_NODE_NOT_ATTACHED),
1193 					 errmsg("data node \"%s\" is not attached to hypertable \"%s\", "
1194 							"skipping",
1195 							node_name,
1196 							get_rel_name(ht->main_table_relid))));
1197 	}
1198 
1199 	return hdn;
1200 }
1201 
1202 static HypertableDataNode *
get_hypertable_data_node(Oid table_id,const char * node_name,bool owner_check,bool attach_check)1203 get_hypertable_data_node(Oid table_id, const char *node_name, bool owner_check, bool attach_check)
1204 {
1205 	HypertableDataNode *hdn = NULL;
1206 	Cache *hcache = ts_hypertable_cache_pin();
1207 	const Hypertable *ht = ts_hypertable_cache_get_entry(hcache, table_id, CACHE_FLAG_NONE);
1208 
1209 	if (owner_check)
1210 		ts_hypertable_permissions_check(table_id, GetUserId());
1211 
1212 	hdn = data_node_hypertable_get_by_node_name(ht, node_name, attach_check);
1213 
1214 	ts_cache_release(hcache);
1215 
1216 	return hdn;
1217 }
1218 
1219 static Datum
data_node_block_or_allow_new_chunks(const char * node_name,Oid const table_id,bool force,bool block_chunks)1220 data_node_block_or_allow_new_chunks(const char *node_name, Oid const table_id, bool force,
1221 									bool block_chunks)
1222 {
1223 	int affected = 0;
1224 	bool all_hypertables = table_id == InvalidOid ? true : false;
1225 	List *hypertable_data_nodes = NIL;
1226 	ForeignServer *server = data_node_get_foreign_server(node_name, ACL_USAGE, true, false);
1227 
1228 	Assert(NULL != server);
1229 
1230 	if (OidIsValid(table_id))
1231 	{
1232 		/* Early abort on missing hypertable permissions */
1233 		ts_hypertable_permissions_check(table_id, GetUserId());
1234 		hypertable_data_nodes =
1235 			list_make1(get_hypertable_data_node(table_id, server->servername, true, true));
1236 	}
1237 	else
1238 	{
1239 		/* block or allow for all hypertables */
1240 		hypertable_data_nodes =
1241 			ts_hypertable_data_node_scan_by_node_name(server->servername, CurrentMemoryContext);
1242 	}
1243 
1244 	affected = data_node_block_hypertable_data_nodes(server->servername,
1245 													 hypertable_data_nodes,
1246 													 all_hypertables,
1247 													 block_chunks,
1248 													 force);
1249 	return Int32GetDatum(affected);
1250 }
1251 
/*
 * SQL-callable function to allow new chunks on a data node, optionally
 * scoped to a single hypertable (arg 1); a NULL table OID applies the
 * change to all hypertables on the node. Returns the number of affected
 * mappings.
 */
Datum
data_node_allow_new_chunks(PG_FUNCTION_ARGS)
{
	const char *node_name = PG_ARGISNULL(0) ? NULL : NameStr(*PG_GETARG_NAME(0));
	Oid table_id = PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);

	TS_PREVENT_FUNC_IF_READ_ONLY();

	return data_node_block_or_allow_new_chunks(node_name, table_id, false, false);
}
1262 
/*
 * SQL-callable function to block new chunks on a data node, optionally
 * scoped to a single hypertable (arg 1). The force flag (arg 2) downgrades
 * replication-related errors to warnings. Returns the number of affected
 * mappings.
 */
Datum
data_node_block_new_chunks(PG_FUNCTION_ARGS)
{
	const char *node_name = PG_ARGISNULL(0) ? NULL : NameStr(*PG_GETARG_NAME(0));
	Oid table_id = PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
	bool force = PG_ARGISNULL(2) ? false : PG_GETARG_BOOL(2);

	TS_PREVENT_FUNC_IF_READ_ONLY();

	return data_node_block_or_allow_new_chunks(node_name, table_id, force, true);
}
1274 
1275 Datum
data_node_detach(PG_FUNCTION_ARGS)1276 data_node_detach(PG_FUNCTION_ARGS)
1277 {
1278 	const char *node_name = PG_ARGISNULL(0) ? NULL : NameStr(*PG_GETARG_NAME(0));
1279 	Oid table_id = PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
1280 	bool all_hypertables = PG_ARGISNULL(1);
1281 	bool if_attached = PG_ARGISNULL(2) ? false : PG_GETARG_BOOL(2);
1282 	bool force = PG_ARGISNULL(3) ? InvalidOid : PG_GETARG_OID(3);
1283 	bool repartition = PG_ARGISNULL(4) ? false : PG_GETARG_BOOL(4);
1284 	int removed = 0;
1285 	List *hypertable_data_nodes = NIL;
1286 	ForeignServer *server;
1287 
1288 	TS_PREVENT_FUNC_IF_READ_ONLY();
1289 
1290 	server = data_node_get_foreign_server(node_name, ACL_USAGE, true, false);
1291 	Assert(NULL != server);
1292 
1293 	if (OidIsValid(table_id))
1294 	{
1295 		HypertableDataNode *node;
1296 
1297 		/* Early abort on missing hypertable permissions */
1298 		ts_hypertable_permissions_check(table_id, GetUserId());
1299 
1300 		node = get_hypertable_data_node(table_id, server->servername, true, !if_attached);
1301 		if (node)
1302 			hypertable_data_nodes = list_make1(node);
1303 	}
1304 	else
1305 	{
1306 		/* Detach data node for all hypertables where user has
1307 		 * permissions. Permissions checks done in
1308 		 * data_node_detach_hypertable_data_nodes().  */
1309 		hypertable_data_nodes =
1310 			ts_hypertable_data_node_scan_by_node_name(server->servername, CurrentMemoryContext);
1311 	}
1312 
1313 	removed = data_node_detach_hypertable_data_nodes(server->servername,
1314 													 hypertable_data_nodes,
1315 													 all_hypertables,
1316 													 force,
1317 													 repartition,
1318 													 OP_DETACH);
1319 
1320 	PG_RETURN_INT32(removed);
1321 }
1322 
/*
 * SQL-callable function to delete a data node entirely.
 *
 * Detaches the node from all hypertables (failing if data would be lost
 * unless force is set), removes persistent transaction records, and drops
 * the node's foreign server object while firing the usual event triggers so
 * dependent objects are collected. If this was the last data node, the
 * access node also removes itself from the distributed database.
 *
 * Arguments (from fcinfo):
 *   0: node_name   - name of the data node's foreign server
 *   1: if_exists   - skip (NOTICE, return false) when the node is missing
 *   2: force       - tolerate under-replication caused by the removal
 *   3: repartition - shrink the space dimension to the new node count
 *
 * Returns true if the node was deleted, false when skipped via if_exists.
 */
Datum
data_node_delete(PG_FUNCTION_ARGS)
{
	const char *node_name = PG_ARGISNULL(0) ? NULL : PG_GETARG_CSTRING(0);
	bool if_exists = PG_ARGISNULL(1) ? false : PG_GETARG_BOOL(1);
	bool force = PG_ARGISNULL(2) ? false : PG_GETARG_BOOL(2);
	bool repartition = PG_ARGISNULL(3) ? false : PG_GETARG_BOOL(3);
	List *hypertable_data_nodes = NIL;
	DropStmt stmt;
	ObjectAddress address;
	ObjectAddress secondary_object = {
		.classId = InvalidOid,
		.objectId = InvalidOid,
		.objectSubId = 0,
	};
	Node *parsetree = NULL;
	TSConnectionId cid;
	ForeignServer *server;

	TS_PREVENT_FUNC_IF_READ_ONLY();

	/* Need USAGE to detach. Further owner check done when executing the DROP
	 * statement. */
	server = data_node_get_foreign_server(node_name, ACL_USAGE, true, if_exists);

	Assert(server == NULL ? if_exists : true);

	if (NULL == server)
	{
		elog(NOTICE, "data node \"%s\" does not exist, skipping", node_name);
		PG_RETURN_BOOL(false);
	}

	/* close any pending connections */
	remote_connection_id_set(&cid, server->serverid, GetUserId());
	remote_connection_cache_remove(cid);

	/* detach data node */
	hypertable_data_nodes =
		ts_hypertable_data_node_scan_by_node_name(node_name, CurrentMemoryContext);

	data_node_detach_hypertable_data_nodes(node_name,
										   hypertable_data_nodes,
										   true,
										   force,
										   repartition,
										   OP_DELETE);

	/* clean up persistent transaction records */
	remote_txn_persistent_record_delete_for_data_node(server->serverid);

	stmt = (DropStmt){
		.type = T_DropStmt,
		.objects = list_make1(makeString(pstrdup(node_name))),
		.removeType = OBJECT_FOREIGN_SERVER,
		.behavior = DROP_RESTRICT,
		.missing_ok = if_exists,
	};

	parsetree = (Node *) &stmt;

	/* Make sure event triggers are invoked so that all dropped objects
	 * are collected during a cascading drop. This ensures all dependent
	 * objects get cleaned up. */
	EventTriggerBeginCompleteQuery();

	PG_TRY();
	{
		ObjectAddressSet(address, ForeignServerRelationId, server->serverid);
		EventTriggerDDLCommandStart(parsetree);
		RemoveObjects(&stmt);
		EventTriggerCollectSimpleCommand(address, secondary_object, parsetree);
		EventTriggerSQLDrop(parsetree);
		EventTriggerDDLCommandEnd(parsetree);
	}
	PG_CATCH();
	{
		/* End the event-trigger query before re-throwing so state is not
		 * left dangling */
		EventTriggerEndCompleteQuery();
		PG_RE_THROW();
	}
	PG_END_TRY();

	/* Remove self from dist db if no longer have data_nodes */
	if (data_node_get_node_name_list() == NIL)
		dist_util_remove_from_db();

	EventTriggerEndCompleteQuery();
	CommandCounterIncrement();
	CacheInvalidateRelcacheByRelid(ForeignServerRelationId);

	PG_RETURN_BOOL(true);
}
1415 
1416 /*
1417  * Get server list, performing an ACL check on each of them in the process.
1418  */
1419 List *
data_node_get_node_name_list_with_aclcheck(AclMode mode,bool fail_on_aclcheck)1420 data_node_get_node_name_list_with_aclcheck(AclMode mode, bool fail_on_aclcheck)
1421 {
1422 	HeapTuple tuple;
1423 	ScanKeyData scankey[1];
1424 	SysScanDesc scandesc;
1425 	Relation rel;
1426 	ForeignDataWrapper *fdw = GetForeignDataWrapperByName(EXTENSION_FDW_NAME, false);
1427 	List *nodes = NIL;
1428 
1429 	rel = table_open(ForeignServerRelationId, AccessShareLock);
1430 
1431 	ScanKeyInit(&scankey[0],
1432 				Anum_pg_foreign_server_srvfdw,
1433 				BTEqualStrategyNumber,
1434 				F_OIDEQ,
1435 				ObjectIdGetDatum(fdw->fdwid));
1436 
1437 	scandesc = systable_beginscan(rel, InvalidOid, false, NULL, 1, scankey);
1438 
1439 	while (HeapTupleIsValid(tuple = systable_getnext(scandesc)))
1440 	{
1441 		Form_pg_foreign_server form = (Form_pg_foreign_server) GETSTRUCT(tuple);
1442 		ForeignServer *server;
1443 
1444 		server =
1445 			data_node_get_foreign_server(NameStr(form->srvname), mode, fail_on_aclcheck, false);
1446 
1447 		if (server != NULL)
1448 			nodes = lappend(nodes, pstrdup(NameStr(form->srvname)));
1449 	}
1450 
1451 	systable_endscan(scandesc);
1452 	table_close(rel, AccessShareLock);
1453 
1454 	return nodes;
1455 }
1456 
1457 /*
1458  * Get server list with optional ACL check.
1459  *
1460  * Returns:
1461  *
1462  * If nodearr is NULL, returns all system-configured data nodes that fulfill
1463  * the ACL check.
1464  *
1465  * If nodearr is non-NULL, returns all the data nodes in the specified array
1466  * subject to ACL checks.
1467  */
1468 List *
data_node_get_filtered_node_name_list(ArrayType * nodearr,AclMode mode,bool fail_on_aclcheck)1469 data_node_get_filtered_node_name_list(ArrayType *nodearr, AclMode mode, bool fail_on_aclcheck)
1470 {
1471 	ArrayIterator it;
1472 	Datum node_datum;
1473 	bool isnull;
1474 	List *nodes = NIL;
1475 
1476 	if (NULL == nodearr)
1477 		return data_node_get_node_name_list_with_aclcheck(mode, fail_on_aclcheck);
1478 
1479 	it = array_create_iterator(nodearr, 0, NULL);
1480 
1481 	while (array_iterate(it, &node_datum, &isnull))
1482 	{
1483 		if (!isnull)
1484 		{
1485 			const char *node_name = DatumGetCString(node_datum);
1486 			ForeignServer *server =
1487 				data_node_get_foreign_server(node_name, mode, fail_on_aclcheck, false);
1488 
1489 			if (NULL != server)
1490 				nodes = lappend(nodes, server->servername);
1491 		}
1492 	}
1493 
1494 	array_free_iterator(it);
1495 
1496 	return nodes;
1497 }
1498 
/*
 * Return the names of all configured data nodes without any ACL check.
 */
List *
data_node_get_node_name_list(void)
{
	return data_node_get_node_name_list_with_aclcheck(ACL_NO_CHECK, false);
}
1504 
1505 /*
1506  * Turn an array of data nodes into a list of names.
1507  *
1508  * The function will verify that all the servers in the list belong to the
1509  * TimescaleDB foreign data wrapper. Optionally, perform ACL check on each
1510  * data node's foreign server. Checks are skipped when specificing
1511  * ACL_NO_CHECK. If fail_on_aclcheck is false, then no errors will be thrown
1512  * on ACL check failures. Instead, data nodes that fail ACL checks will simply
1513  * be filtered.
1514  */
/*
 * Convert an array of data node names into a List, with optional ACL check.
 *
 * A NULL array yields NIL. The array must be at most one-dimensional.
 * Validation and ACL filtering are delegated to
 * data_node_get_filtered_node_name_list().
 */
List *
data_node_array_to_node_name_list_with_aclcheck(ArrayType *nodearr, AclMode mode,
												bool fail_on_aclcheck)
{
	if (NULL == nodearr)
		return NIL;

	Assert(ARR_NDIM(nodearr) <= 1);

	return data_node_get_filtered_node_name_list(nodearr, mode, fail_on_aclcheck);
}
1526 
/*
 * Convert an array of data node names into a List, skipping ACL checks.
 */
List *
data_node_array_to_node_name_list(ArrayType *nodearr)
{
	return data_node_array_to_node_name_list_with_aclcheck(nodearr, ACL_NO_CHECK, false);
}
1532 
1533 Datum
data_node_ping(PG_FUNCTION_ARGS)1534 data_node_ping(PG_FUNCTION_ARGS)
1535 {
1536 	const char *node_name = PG_ARGISNULL(0) ? NULL : PG_GETARG_CSTRING(0);
1537 	/* Allow anyone to ping a data node. Otherwise the
1538 	 * timescaledb_information.data_node view won't work for those users. */
1539 	ForeignServer *server = data_node_get_foreign_server(node_name, ACL_NO_CHECK, false, false);
1540 	bool success;
1541 
1542 	Assert(NULL != server);
1543 
1544 	success = remote_connection_ping(server->servername);
1545 
1546 	PG_RETURN_DATUM(BoolGetDatum(success));
1547 }
1548 
1549 List *
data_node_oids_to_node_name_list(List * data_node_oids,AclMode mode)1550 data_node_oids_to_node_name_list(List *data_node_oids, AclMode mode)
1551 {
1552 	List *node_names = NIL;
1553 	ListCell *lc;
1554 	ForeignServer *fs;
1555 
1556 	foreach (lc, data_node_oids)
1557 	{
1558 		Oid foreign_server_oid = lfirst_oid(lc);
1559 		fs = data_node_get_foreign_server_by_oid(foreign_server_oid, mode);
1560 		node_names = lappend(node_names, pstrdup(fs->servername));
1561 	}
1562 
1563 	return node_names;
1564 }
1565 
1566 void
data_node_name_list_check_acl(List * data_node_names,AclMode mode)1567 data_node_name_list_check_acl(List *data_node_names, AclMode mode)
1568 {
1569 	AclResult aclresult;
1570 	Oid curuserid;
1571 	ListCell *lc;
1572 
1573 	if (data_node_names == NIL)
1574 		return;
1575 
1576 	curuserid = GetUserId();
1577 
1578 	foreach (lc, data_node_names)
1579 	{
1580 		/* Validate the servers, but privilege check is optional */
1581 		ForeignServer *server = GetForeignServerByName(lfirst(lc), false);
1582 
1583 		if (mode != ACL_NO_CHECK)
1584 		{
1585 			/* Must have permissions on the server object */
1586 			aclresult = pg_foreign_server_aclcheck(server->serverid, curuserid, mode);
1587 
1588 			if (aclresult != ACLCHECK_OK)
1589 				aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
1590 		}
1591 	}
1592 }
1593