1 /*
2 * This file and its contents are licensed under the Timescale License.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-TIMESCALE for a copy of the license.
5 */
6 #include <postgres.h>
7
8 #include <access/htup_details.h>
9 #include <access/xact.h>
10 #include <catalog.h>
11 #include <catalog/namespace.h>
12 #include <catalog/pg_database.h>
13 #include <catalog/pg_foreign_server.h>
14 #include <catalog/pg_inherits.h>
15 #include <catalog/pg_namespace.h>
16 #include <chunk_data_node.h>
17 #include <commands/dbcommands.h>
18 #include <commands/defrem.h>
19 #include <commands/event_trigger.h>
20 #include <compat/compat.h>
21 #include <extension.h>
22 #include <funcapi.h>
23 #include <hypertable_data_node.h>
24 #include <libpq/crypt.h>
25 #include <miscadmin.h>
26 #include <nodes/makefuncs.h>
27 #include <nodes/parsenodes.h>
28 #include <utils/acl.h>
29 #include <utils/builtins.h>
30 #include <utils/builtins.h>
31 #include <utils/guc.h>
32 #include <utils/inval.h>
33 #include <utils/syscache.h>
34
35 #include "config.h"
36 #include "extension.h"
37 #include "fdw/fdw.h"
38 #include "remote/async.h"
39 #include "remote/connection.h"
40 #include "remote/connection_cache.h"
41 #include "data_node.h"
42 #include "remote/utils.h"
43 #include "hypertable_cache.h"
44 #include "errors.h"
45 #include "dist_util.h"
46 #include "utils/uuid.h"
47 #include "mb/pg_wchar.h"
48 #include "chunk.h"
49
/* Default PostgreSQL port number and host name. */
#define TS_DEFAULT_POSTGRES_PORT 5432
#define TS_DEFAULT_POSTGRES_HOST "localhost"

/* SQLSTATE codes in string form, suitable for comparing against the
 * PG_DIAG_SQLSTATE field of a remote result (see the duplicate-schema check
 * in data_node_bootstrap_extension()). */
#define ERRCODE_DUPLICATE_DATABASE_STR "42P04"
#define ERRCODE_DUPLICATE_SCHEMA_STR "42P06"
55
/*
 * Properties of a database: its name, encoding, LC_CTYPE and collation.
 *
 * Used both to create a database on a data node and to validate that an
 * already existing database on a data node matches the expected settings.
 */
typedef struct DbInfo
{
	NameData name;		/* database name */
	int32 encoding;		/* encoding ID, as stored in pg_database.encoding */
	NameData chartype;	/* LC_CTYPE, as stored in pg_database.datctype */
	NameData collation; /* LC_COLLATE, as stored in pg_database.datcollate */
} DbInfo;
63
/* A list of databases we try to connect to, in the order given, when
 * bootstrapping a data node */
static const char *bootstrap_databases[] = { "postgres", "template1", "defaultdb" };

/* Forward declaration: data_node_bootstrap_database() calls this function
 * before its definition appears in the file. */
static bool data_node_validate_database(TSConnection *conn, const DbInfo *database);
68
69 /*
70 * get_database_info - given a database OID, look up info about the database
71 *
72 * Returns:
73 * True if a record for the OID was found, false otherwise.
74 */
75 static bool
get_database_info(Oid dbid,DbInfo * database)76 get_database_info(Oid dbid, DbInfo *database)
77 {
78 HeapTuple dbtuple;
79 Form_pg_database dbrecord;
80
81 dbtuple = SearchSysCache1(DATABASEOID, ObjectIdGetDatum(dbid));
82
83 if (!HeapTupleIsValid(dbtuple))
84 return false;
85
86 dbrecord = (Form_pg_database) GETSTRUCT(dbtuple);
87
88 database->encoding = dbrecord->encoding;
89 database->collation = dbrecord->datcollate;
90 database->chartype = dbrecord->datctype;
91
92 ReleaseSysCache(dbtuple);
93 return true;
94 }
95
96 /*
97 * Verify that server is TimescaleDB data node and perform optional ACL check.
98 *
99 * The function returns true iif the server is valid TimescaleDB data node and
100 * the ACL check succeeds. Otherwise, false is returned, or, an error is thrown
101 * if fail_on_aclcheck is set to true.
102 */
103 static bool
validate_foreign_server(const ForeignServer * server,AclMode const mode,bool fail_on_aclcheck)104 validate_foreign_server(const ForeignServer *server, AclMode const mode, bool fail_on_aclcheck)
105 {
106 Oid const fdwid = get_foreign_data_wrapper_oid(EXTENSION_FDW_NAME, false);
107 Oid curuserid = GetUserId();
108 AclResult aclresult;
109 bool valid;
110
111 Assert(NULL != server);
112 if (server->fdwid != fdwid)
113 ereport(ERROR,
114 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
115 errmsg("data node \"%s\" is not a TimescaleDB server", server->servername)));
116
117 if (mode == ACL_NO_CHECK)
118 return true;
119
120 /* Must have permissions on the server object */
121 aclresult = pg_foreign_server_aclcheck(server->serverid, curuserid, mode);
122
123 valid = (aclresult == ACLCHECK_OK);
124
125 if (!valid && fail_on_aclcheck)
126 aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
127
128 return valid;
129 }
130
131 /*
132 * Lookup the foreign server by name
133 */
134 ForeignServer *
data_node_get_foreign_server(const char * node_name,AclMode mode,bool fail_on_aclcheck,bool missing_ok)135 data_node_get_foreign_server(const char *node_name, AclMode mode, bool fail_on_aclcheck,
136 bool missing_ok)
137 {
138 ForeignServer *server;
139 bool valid;
140
141 if (node_name == NULL)
142 ereport(ERROR,
143 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
144 errmsg("data node name cannot be NULL")));
145
146 server = GetForeignServerByName(node_name, missing_ok);
147 if (NULL == server)
148 return NULL;
149
150 valid = validate_foreign_server(server, mode, fail_on_aclcheck);
151
152 if (mode != ACL_NO_CHECK && !valid)
153 return NULL;
154
155 return server;
156 }
157
158 ForeignServer *
data_node_get_foreign_server_by_oid(Oid server_oid,AclMode mode)159 data_node_get_foreign_server_by_oid(Oid server_oid, AclMode mode)
160 {
161 ForeignServer *server = GetForeignServer(server_oid);
162 bool PG_USED_FOR_ASSERTS_ONLY valid = validate_foreign_server(server, mode, true);
163 Assert(valid); /* Should always be valid since we should see error otherwise */
164 return server;
165 }
166
167 /*
168 * Create a foreign server.
169 *
170 * Returns true if the server was created and set the `oid` to the server oid.
171 */
172 static bool
create_foreign_server(const char * const node_name,const char * const host,int32 port,const char * const dbname,bool if_not_exists)173 create_foreign_server(const char *const node_name, const char *const host, int32 port,
174 const char *const dbname, bool if_not_exists)
175 {
176 ForeignServer *server = NULL;
177 ObjectAddress objaddr;
178 CreateForeignServerStmt stmt = {
179 .type = T_CreateForeignServerStmt,
180 .servername = (char *) node_name,
181 .fdwname = EXTENSION_FDW_NAME,
182 .options = list_make3(makeDefElem("host", (Node *) makeString(pstrdup(host)), -1),
183 makeDefElem("port", (Node *) makeInteger(port), -1),
184 makeDefElem("dbname", (Node *) makeString(pstrdup(dbname)), -1)),
185 .if_not_exists = if_not_exists,
186 };
187
188 if (NULL == host)
189 ereport(ERROR,
190 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
191 (errmsg("invalid host"),
192 (errhint("A hostname or IP address must be specified when "
193 "a data node does not already exist.")))));
194
195 if (if_not_exists)
196 {
197 server = data_node_get_foreign_server(node_name, ACL_NO_CHECK, false, true);
198
199 if (NULL != server)
200 {
201 ereport(NOTICE,
202 (errcode(ERRCODE_DUPLICATE_OBJECT),
203 errmsg("data node \"%s\" already exists, skipping", node_name)));
204 return false;
205 }
206 }
207
208 /* Permissions checks done in CreateForeignServer() */
209 objaddr = CreateForeignServer(&stmt);
210
211 /* CreateForeignServer returns InvalidOid if server already exists */
212 if (!OidIsValid(objaddr.objectId))
213 {
214 Assert(if_not_exists);
215 return false;
216 }
217
218 return true;
219 }
220
221 TSConnection *
data_node_get_connection(const char * const data_node,RemoteTxnPrepStmtOption const ps_opt,bool transactional)222 data_node_get_connection(const char *const data_node, RemoteTxnPrepStmtOption const ps_opt,
223 bool transactional)
224 {
225 const ForeignServer *server;
226 TSConnectionId id;
227
228 Assert(data_node != NULL);
229 server = data_node_get_foreign_server(data_node, ACL_NO_CHECK, false, false);
230 id = remote_connection_id(server->serverid, GetUserId());
231
232 if (transactional)
233 return remote_dist_txn_get_connection(id, ps_opt);
234
235 return remote_connection_cache_get_connection(id);
236 }
237
/*
 * Attribute numbers for the datum built by create_data_node_datum().
 *
 * Attribute numbers start at 1. Use AttrNumberGetAttrOffset() to convert an
 * attribute number to an index into the values array.
 */
enum Anum_create_data_node
{
	Anum_create_data_node_name = 1,
	Anum_create_data_node_host,
	Anum_create_data_node_port,
	Anum_create_data_node_dbname,
	Anum_create_data_node_node_created,
	Anum_create_data_node_database_created,
	Anum_create_data_node_extension_created,
	/* Sentinel: one more than the last attribute number */
	_Anum_create_data_node_max,
};

/* Number of attributes in the datum */
#define Natts_create_data_node (_Anum_create_data_node_max - 1)
252
253 static Datum
create_data_node_datum(FunctionCallInfo fcinfo,const char * node_name,const char * host,int32 port,const char * dbname,bool node_created,bool database_created,bool extension_created)254 create_data_node_datum(FunctionCallInfo fcinfo, const char *node_name, const char *host, int32 port,
255 const char *dbname, bool node_created, bool database_created,
256 bool extension_created)
257 {
258 TupleDesc tupdesc;
259 Datum values[_Anum_create_data_node_max];
260 bool nulls[_Anum_create_data_node_max] = { false };
261 HeapTuple tuple;
262
263 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
264 ereport(ERROR,
265 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
266 errmsg("function returning record called in "
267 "context that cannot accept type record")));
268
269 tupdesc = BlessTupleDesc(tupdesc);
270 values[AttrNumberGetAttrOffset(Anum_create_data_node_name)] = CStringGetDatum(node_name);
271 values[AttrNumberGetAttrOffset(Anum_create_data_node_host)] = CStringGetTextDatum(host);
272 values[AttrNumberGetAttrOffset(Anum_create_data_node_port)] = Int32GetDatum(port);
273 values[AttrNumberGetAttrOffset(Anum_create_data_node_dbname)] = CStringGetDatum(dbname);
274 values[AttrNumberGetAttrOffset(Anum_create_data_node_node_created)] =
275 BoolGetDatum(node_created);
276 values[AttrNumberGetAttrOffset(Anum_create_data_node_database_created)] =
277 BoolGetDatum(database_created);
278 values[AttrNumberGetAttrOffset(Anum_create_data_node_extension_created)] =
279 BoolGetDatum(extension_created);
280 tuple = heap_form_tuple(tupdesc, values, nulls);
281
282 return HeapTupleGetDatum(tuple);
283 }
284
285 static Datum
create_hypertable_data_node_datum(FunctionCallInfo fcinfo,HypertableDataNode * node)286 create_hypertable_data_node_datum(FunctionCallInfo fcinfo, HypertableDataNode *node)
287 {
288 TupleDesc tupdesc;
289 Datum values[Natts_hypertable_data_node];
290 bool nulls[Natts_hypertable_data_node] = { false };
291 HeapTuple tuple;
292
293 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
294 ereport(ERROR,
295 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
296 errmsg("function returning record called in "
297 "context that cannot accept type record")));
298
299 tupdesc = BlessTupleDesc(tupdesc);
300 values[AttrNumberGetAttrOffset(Anum_hypertable_data_node_hypertable_id)] =
301 Int32GetDatum(node->fd.hypertable_id);
302 values[AttrNumberGetAttrOffset(Anum_hypertable_data_node_node_hypertable_id)] =
303 Int32GetDatum(node->fd.node_hypertable_id);
304 values[AttrNumberGetAttrOffset(Anum_hypertable_data_node_node_name)] =
305 NameGetDatum(&node->fd.node_name);
306 tuple = heap_form_tuple(tupdesc, values, nulls);
307
308 return HeapTupleGetDatum(tuple);
309 }
310
311 static List *
create_data_node_options(const char * host,int32 port,const char * dbname,const char * user,const char * password)312 create_data_node_options(const char *host, int32 port, const char *dbname, const char *user,
313 const char *password)
314 {
315 DefElem *host_elm = makeDefElem("host", (Node *) makeString(pstrdup(host)), -1);
316 DefElem *port_elm = makeDefElem("port", (Node *) makeInteger(port), -1);
317 DefElem *dbname_elm = makeDefElem("dbname", (Node *) makeString(pstrdup(dbname)), -1);
318 DefElem *user_elm = makeDefElem("user", (Node *) makeString(pstrdup(user)), -1);
319
320 if (NULL != password)
321 {
322 DefElem *password_elm = makeDefElem("password", (Node *) makeString(pstrdup(password)), -1);
323 return list_make5(host_elm, port_elm, dbname_elm, user_elm, password_elm);
324 }
325
326 return list_make4(host_elm, port_elm, dbname_elm, user_elm);
327 }
328
329 /* Returns 'true' if the database was created. */
330 static bool
data_node_bootstrap_database(TSConnection * conn,const DbInfo * database)331 data_node_bootstrap_database(TSConnection *conn, const DbInfo *database)
332 {
333 const char *const username = PQuser(remote_connection_get_pg_conn(conn));
334
335 Assert(database);
336
337 if (data_node_validate_database(conn, database))
338 {
339 /* If the database already existed on the remote node, we will log a
340 * notice and proceed since it is not an error if the database already
341 * existed on the remote node. */
342 elog(NOTICE,
343 "database \"%s\" already exists on data node, skipping",
344 NameStr(database->name));
345 return false;
346 }
347
348 /* Create the database with the user as owner. There is no need to
349 * validate the database after this command since it should be created
350 * correctly. */
351 PGresult *res =
352 remote_connection_execf(conn,
353 "CREATE DATABASE %s ENCODING %s LC_COLLATE %s LC_CTYPE %s "
354 "TEMPLATE template0 OWNER %s",
355 quote_identifier(NameStr(database->name)),
356 quote_identifier(pg_encoding_to_char(database->encoding)),
357 quote_literal_cstr(NameStr(database->collation)),
358 quote_literal_cstr(NameStr(database->chartype)),
359 quote_identifier(username));
360 if (PQresultStatus(res) != PGRES_COMMAND_OK)
361 remote_result_elog(res, ERROR);
362 return true;
363 }
364
365 /* Validate the database.
366 *
367 * Errors:
368 * Will abort with errors if the database exists but is not correctly set
369 * up.
370 * Returns:
371 * true if the database exists and is valid
372 * false if it does not exist.
373 */
374 static bool
data_node_validate_database(TSConnection * conn,const DbInfo * database)375 data_node_validate_database(TSConnection *conn, const DbInfo *database)
376 {
377 PGresult *res;
378 uint32 actual_encoding;
379 const char *actual_chartype;
380 const char *actual_collation;
381
382 res = remote_connection_execf(conn,
383 "SELECT encoding, datcollate, datctype "
384 "FROM pg_database WHERE datname = %s",
385 quote_literal_cstr(NameStr(database->name)));
386
387 if (PQresultStatus(res) != PGRES_TUPLES_OK)
388 ereport(ERROR,
389 (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));
390
391 if (PQntuples(res) == 0)
392 return false;
393
394 Assert(PQnfields(res) > 2);
395
396 actual_encoding = atoi(PQgetvalue(res, 0, 0));
397 if (actual_encoding != database->encoding)
398 ereport(ERROR,
399 (errcode(ERRCODE_TS_DATA_NODE_INVALID_CONFIG),
400 errmsg("database exists but has wrong encoding"),
401 errdetail("Expected database encoding to be \"%s\" (%u) but it was \"%s\" (%u).",
402 pg_encoding_to_char(database->encoding),
403 database->encoding,
404 pg_encoding_to_char(actual_encoding),
405 actual_encoding)));
406
407 actual_collation = PQgetvalue(res, 0, 1);
408 Assert(actual_collation != NULL);
409 if (strcmp(actual_collation, NameStr(database->collation)) != 0)
410 ereport(ERROR,
411 (errcode(ERRCODE_TS_DATA_NODE_INVALID_CONFIG),
412 errmsg("database exists but has wrong collation"),
413 errdetail("Expected collation \"%s\" but it was \"%s\".",
414 NameStr(database->collation),
415 actual_collation)));
416
417 actual_chartype = PQgetvalue(res, 0, 2);
418 Assert(actual_chartype != NULL);
419 if (strcmp(actual_chartype, NameStr(database->chartype)) != 0)
420 ereport(ERROR,
421 (errcode(ERRCODE_TS_DATA_NODE_INVALID_CONFIG),
422 errmsg("database exists but has wrong LC_CTYPE"),
423 errdetail("Expected LC_CTYPE \"%s\" but it was \"%s\".",
424 NameStr(database->chartype),
425 actual_chartype)));
426 return true;
427 }
428
429 static void
data_node_validate_extension(TSConnection * conn)430 data_node_validate_extension(TSConnection *conn)
431 {
432 const char *const dbname = PQdb(remote_connection_get_pg_conn(conn));
433 const char *const host = PQhost(remote_connection_get_pg_conn(conn));
434 const char *const port = PQport(remote_connection_get_pg_conn(conn));
435
436 if (!remote_connection_check_extension(conn))
437 ereport(ERROR,
438 (errcode(ERRCODE_TS_DATA_NODE_INVALID_CONFIG),
439 errmsg("database does not have TimescaleDB extension loaded"),
440 errdetail("The TimescaleDB extension is not loaded in database %s on node at "
441 "%s:%s.",
442 dbname,
443 host,
444 port)));
445 }
446
447 static void
data_node_validate_as_data_node(TSConnection * conn)448 data_node_validate_as_data_node(TSConnection *conn)
449 {
450 PGresult *res =
451 remote_connection_exec(conn, "SELECT _timescaledb_internal.validate_as_data_node()");
452
453 if (PQresultStatus(res) != PGRES_TUPLES_OK)
454 ereport(ERROR,
455 (errcode(ERRCODE_TS_DATA_NODE_INVALID_CONFIG),
456 (errmsg("cannot add \"%s\" as a data node", remote_connection_node_name(conn)),
457 errdetail("%s", PQresultErrorMessage(res)))));
458
459 remote_result_close(res);
460 }
461
462 /*
463 * Bootstrap the extension and associated objects.
464 */
465 static bool
data_node_bootstrap_extension(TSConnection * conn)466 data_node_bootstrap_extension(TSConnection *conn)
467 {
468 const char *const username = PQuser(remote_connection_get_pg_conn(conn));
469 const char *schema_name = ts_extension_schema_name();
470 const char *schema_name_quoted = quote_identifier(schema_name);
471 Oid schema_oid = get_namespace_oid(schema_name, true);
472
473 /* We only count the number of tuples in the code below, but having the
474 * name and version are useful for debugging purposes. */
475 PGresult *res =
476 remote_connection_execf(conn,
477 "SELECT extname, extversion FROM pg_extension WHERE extname = %s",
478 quote_literal_cstr(EXTENSION_NAME));
479
480 if (PQresultStatus(res) != PGRES_TUPLES_OK)
481 ereport(ERROR,
482 (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));
483
484 if (PQntuples(res) == 0)
485 {
486 if (schema_oid != PG_PUBLIC_NAMESPACE)
487 {
488 PGresult *res = remote_connection_execf(conn,
489 "CREATE SCHEMA %s AUTHORIZATION %s",
490 schema_name_quoted,
491 quote_identifier(username));
492 if (PQresultStatus(res) != PGRES_COMMAND_OK)
493 {
494 const char *const sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
495 bool schema_exists =
496 (sqlstate && strcmp(sqlstate, ERRCODE_DUPLICATE_SCHEMA_STR) == 0);
497 if (!schema_exists)
498 remote_result_elog(res, ERROR);
499 /* If the schema already existed on the remote node, we got a
500 * duplicate schema error and the schema was not created. In
501 * that case, we log an error with a hint on how to fix the
502 * issue. */
503 ereport(ERROR,
504 (errcode(ERRCODE_DUPLICATE_SCHEMA),
505 errmsg("schema \"%s\" already exists in database, aborting", schema_name),
506 errhint("Make sure that the data node does not contain any "
507 "existing objects prior to adding it.")));
508 }
509 }
510
511 remote_connection_cmdf_ok(conn,
512 "CREATE EXTENSION " EXTENSION_NAME
513 " WITH SCHEMA %s VERSION %s CASCADE",
514 schema_name_quoted,
515 quote_literal_cstr(ts_extension_get_version()));
516 return true;
517 }
518 else
519 {
520 ereport(NOTICE,
521 (errmsg("extension \"%s\" already exists on data node, skipping",
522 PQgetvalue(res, 0, 0)),
523 errdetail("TimescaleDB extension version on %s:%s was %s.",
524 PQhost(remote_connection_get_pg_conn(conn)),
525 PQport(remote_connection_get_pg_conn(conn)),
526 PQgetvalue(res, 0, 1))));
527 data_node_validate_extension(conn);
528 return false;
529 }
530 }
531
532 /* Add dist_uuid on the remote node.
533 *
534 * If the remote node is set to use the current database, `set_dist_id` will report an error and not
535 * set it. */
536 static void
add_distributed_id_to_data_node(TSConnection * conn)537 add_distributed_id_to_data_node(TSConnection *conn)
538 {
539 Datum id_string = DirectFunctionCall1(uuid_out, dist_util_get_id());
540 PGresult *res = remote_connection_queryf_ok(conn,
541 "SELECT _timescaledb_internal.set_dist_id('%s')",
542 DatumGetCString(id_string));
543 remote_result_close(res);
544 }
545
546 /*
547 * Connect to do bootstrapping.
548 *
549 * We iterate through the list of databases and try to connect to so we can
550 * bootstrap the data node.
551 */
552 static TSConnection *
connect_for_bootstrapping(const char * node_name,const char * const host,int32 port,const char * username,const char * password)553 connect_for_bootstrapping(const char *node_name, const char *const host, int32 port,
554 const char *username, const char *password)
555 {
556 TSConnection *conn = NULL;
557 char *err = NULL;
558 int i;
559
560 for (i = 0; i < lengthof(bootstrap_databases); i++)
561 {
562 List *node_options =
563 create_data_node_options(host, port, bootstrap_databases[i], username, password);
564 conn = remote_connection_open_with_options_nothrow(node_name, node_options, &err);
565
566 if (conn)
567 return conn;
568 }
569
570 ereport(ERROR,
571 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
572 errmsg("could not connect to \"%s\"", node_name),
573 err == NULL ? 0 : errdetail("%s", err)));
574
575 pg_unreachable();
576
577 return NULL;
578 }
579
580 /*
581 * Validate that compatible extension is available on the data node.
582 *
583 * We check all available extension versions. Since we are connected to
584 * template DB when performing this check, it means we can't
585 * really tell if a compatible extension is installed in the database we
586 * are trying to add to the cluster. However we can make sure that a user
587 * will be able to manually upgrade the extension on the data node if needed.
588 *
589 * Will abort with error if there is no compatible version available, otherwise do nothing.
590 */
591 static void
data_node_validate_extension_availability(TSConnection * conn)592 data_node_validate_extension_availability(TSConnection *conn)
593 {
594 StringInfo concat_versions = makeStringInfo();
595 bool compatible = false;
596 PGresult *res;
597 int i;
598
599 res =
600 remote_connection_execf(conn,
601 "SELECT version FROM pg_available_extension_versions WHERE name = "
602 "%s AND version ~ '\\d+.\\d+.\\d+.*' ORDER BY version DESC",
603 quote_literal_cstr(EXTENSION_NAME));
604
605 if (PQresultStatus(res) != PGRES_TUPLES_OK)
606 ereport(ERROR,
607 (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));
608
609 if (PQntuples(res) == 0)
610 ereport(ERROR,
611 (errcode(ERRCODE_TS_DATA_NODE_INVALID_CONFIG),
612 errmsg("TimescaleDB extension not available on remote PostgreSQL instance"),
613 errhint("Install the TimescaleDB extension on the remote PostgresSQL instance.")));
614
615 Assert(PQnfields(res) == 1);
616
617 for (i = 0; i < PQntuples(res); i++)
618 {
619 bool old_version = false;
620
621 appendStringInfo(concat_versions, "%s, ", PQgetvalue(res, i, 0));
622 compatible = dist_util_is_compatible_version(PQgetvalue(res, i, 0),
623 TIMESCALEDB_VERSION,
624 &old_version);
625 if (compatible)
626 break;
627 }
628
629 if (!compatible)
630 ereport(ERROR,
631 (errcode(ERRCODE_TS_DATA_NODE_INVALID_CONFIG),
632 errmsg("remote PostgreSQL instance has an incompatible timescaledb extension "
633 "version"),
634 errdetail_internal("Access node version: %s, available remote versions: %s.",
635 TIMESCALEDB_VERSION_MOD,
636 concat_versions->data)));
637 }
638
639 /**
640 * Get the configured server port for the server as an integer.
641 *
642 * Returns:
643 * Port number if a port is configured, -1 if it is not able to get
644 * the port number.
645 *
646 * Note:
647 * We cannot use `inet_server_port()` since that will return NULL if
648 * connecting to a server on localhost since a UNIX socket will be
649 * used. This is the case even if explicitly using a port when
650 * connecting. Regardless of how the user connected, we want to use the same
651 * port as the one that the server listens on.
652 */
653 static int32
get_server_port()654 get_server_port()
655 {
656 const char *const portstr =
657 GetConfigOption("port", /* missing_ok= */ false, /* restrict_privileged= */ false);
658 return pg_atoi(portstr, sizeof(int32), 0);
659 }
660
/*
 * Shared implementation for adding a data node.
 *
 * Arguments read from fcinfo:
 *   0: node name (cstring); must not be NULL
 *   1: host (text); must not be NULL
 *   2: database name (cstring); defaults to the name of the current database
 *   3: port (int32); defaults to the port configured for the local server
 *   4: if_not_exists (bool); defaults to false
 *   5: bootstrap (bool); defaults to true
 *   6: password (text); optional
 *
 * The function creates a foreign server for the data node and, if the server
 * was created, connects to the node to optionally bootstrap the database and
 * extension, validate the node, and set the distributed ID. If the foreign
 * server already exists (and if_not_exists is true), no remote work is done.
 *
 * Returns the record built by create_data_node_datum(), which tells if the
 * node, database and extension were created.
 *
 * set_distid may need to be false for some otherwise invalid configurations
 * that are useful for testing
 */
static Datum
data_node_add_internal(PG_FUNCTION_ARGS, bool set_distid)
{
	Oid userid = GetUserId();
	const char *username = GetUserNameFromId(userid, false);
	const char *node_name = PG_ARGISNULL(0) ? NULL : PG_GETARG_CSTRING(0);
	const char *host = PG_ARGISNULL(1) ? NULL : TextDatumGetCString(PG_GETARG_DATUM(1));
	const char *dbname = PG_ARGISNULL(2) ? get_database_name(MyDatabaseId) : PG_GETARG_CSTRING(2);
	int32 port = PG_ARGISNULL(3) ? get_server_port() : PG_GETARG_INT32(3);
	bool if_not_exists = PG_ARGISNULL(4) ? false : PG_GETARG_BOOL(4);
	bool bootstrap = PG_ARGISNULL(5) ? true : PG_GETARG_BOOL(5);
	const char *password = PG_ARGISNULL(6) ? NULL : TextDatumGetCString(PG_GETARG_DATUM(6));
	bool server_created = false;
	bool database_created = false;
	bool extension_created = false;
	bool PG_USED_FOR_ASSERTS_ONLY result;
	DbInfo database;

	TS_PREVENT_FUNC_IF_READ_ONLY();

	/* The remote database is named after the dbname argument; the remaining
	 * fields are filled in from the local database further down. */
	namestrcpy(&database.name, dbname);

	if (host == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 (errmsg("a host needs to be specified"),
				  errhint("Provide a host name or IP address of a data node to add."))));

	/* A database that is itself a data node cannot have data nodes added */
	if (set_distid && dist_util_membership() == DIST_MEMBER_DATA_NODE)
		ereport(ERROR,
				(errcode(ERRCODE_TS_DATA_NODE_ASSIGNMENT_ALREADY_EXISTS),
				 (errmsg("unable to assign data nodes from an existing distributed database"))));

	if (NULL == node_name)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 (errmsg("data node name cannot be NULL"))));

	if (port < 1 || port > PG_UINT16_MAX)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 (errmsg("invalid port number %d", port),
				  errhint("The port number must be between 1 and %u.", PG_UINT16_MAX))));

	/* Encoding, collation and LC_CTYPE are taken from the current (local)
	 * database and are what the remote database is expected to match. */
	result = get_database_info(MyDatabaseId, &database);
	Assert(result);

	/*
	 * Since this function creates databases on remote nodes, and CREATE DATABASE
	 * cannot run in a transaction block, we cannot run the function in a
	 * transaction block either.
	 */
	TS_PREVENT_IN_TRANSACTION_BLOCK(true);

	/* Try to create the foreign server, or get the existing one in case of
	 * if_not_exists true. */
	if (create_foreign_server(node_name, host, port, dbname, if_not_exists))
	{
		List *node_options;
		TSConnection *conn;

		server_created = true;

		/* Make the foreign server visible in current transaction. */
		CommandCounterIncrement();

		/* If bootstrapping, we check the extension availability here and
		 * abort if the extension is not available. We should not start
		 * creating databases and other cruft on the datanode unless we know
		 * that the extension is installed.
		 *
		 * We ensure that there is a database if we are bootstrapping. This is
		 * done using a separate connection since the database that is going
		 * to be used for the data node does not exist yet, so we cannot
		 * connect to it. */
		if (bootstrap)
		{
			/* Note: this connection is local to the block and shadows the
			 * outer `conn`; it is closed before the block ends. */
			TSConnection *conn =
				connect_for_bootstrapping(node_name, host, port, username, password);
			Assert(NULL != conn);
			data_node_validate_extension_availability(conn);
			database_created = data_node_bootstrap_database(conn, &database);
			remote_connection_close(conn);
		}

		/* Connect to the database we are bootstrapping and either install the
		 * extension or validate that the extension is installed. The
		 * following statements are executed inside a transaction so that they
		 * can be rolled back in the event of a failure.
		 *
		 * We could use `remote_dist_txn_get_connection` here, but it is
		 * comparably heavy and make the code more complicated than
		 * necessary. Instead using a more straightforward approach here since
		 * we do not need 2PC support. */
		node_options = create_data_node_options(host, port, dbname, username, password);
		conn = remote_connection_open_with_options(node_name, node_options, false);
		Assert(NULL != conn);
		remote_connection_cmd_ok(conn, "BEGIN");

		if (bootstrap)
			extension_created = data_node_bootstrap_extension(conn);

		/* A database that was not created by us must be validated */
		if (!database_created)
		{
			data_node_validate_database(conn, &database);
			data_node_validate_as_data_node(conn);
		}

		/* An extension that was not created by us must be validated */
		if (!extension_created)
			data_node_validate_extension(conn);

		/* After the node is verified or bootstrapped, we set the `dist_uuid`
		 * using the same connection. We skip this if clustering checks are
		 * disabled, which means that the `dist_uuid` is neither set nor
		 * checked.
		 *
		 * This is done inside a transaction so that we can roll it back if
		 * there are any failures. Note that any failure at this point will
		 * not rollback the creates above. */
		if (set_distid)
		{
			if (dist_util_membership() != DIST_MEMBER_ACCESS_NODE)
				dist_util_set_as_access_node();
			add_distributed_id_to_data_node(conn);
		}

		/* If there were an error before, we will not reach this point so the
		 * transaction will be aborted when the connection is closed. */
		remote_connection_cmd_ok(conn, "COMMIT");
		remote_connection_close(conn);
	}

	PG_RETURN_DATUM(create_data_node_datum(fcinfo,
										   node_name,
										   host,
										   port,
										   dbname,
										   server_created,
										   database_created,
										   extension_created));
}
804
805 Datum
data_node_add(PG_FUNCTION_ARGS)806 data_node_add(PG_FUNCTION_ARGS)
807 {
808 return data_node_add_internal(fcinfo, true);
809 }
810
811 Datum
data_node_add_without_dist_id(PG_FUNCTION_ARGS)812 data_node_add_without_dist_id(PG_FUNCTION_ARGS)
813 {
814 return data_node_add_internal(fcinfo, false);
815 }
816
/*
 * Attach a data node to a distributed hypertable.
 *
 * Arguments read from fcinfo:
 *   0: node name (cstring)
 *   1: hypertable relation OID; must not be NULL
 *   2: if_not_attached (bool); defaults to false
 *   3: repartition (bool); defaults to false
 *
 * Raises an error if the hypertable is not distributed, if the data node is
 * already attached (unless if_not_attached is true, in which case a notice
 * is logged instead), or if attaching would exceed the maximum number of
 * data nodes for a hypertable.
 *
 * Returns the record built by create_hypertable_data_node_datum() for the
 * attached (or already attached) data node.
 */
Datum
data_node_attach(PG_FUNCTION_ARGS)
{
	const char *node_name = PG_ARGISNULL(0) ? NULL : PG_GETARG_CSTRING(0);
	Oid table_id = PG_GETARG_OID(1);
	bool if_not_attached = PG_ARGISNULL(2) ? false : PG_GETARG_BOOL(2);
	bool repartition = PG_ARGISNULL(3) ? false : PG_GETARG_BOOL(3);
	ForeignServer *fserver;
	HypertableDataNode *node;
	Cache *hcache;
	Hypertable *ht;
	Dimension *dim;
	List *result;
	int num_nodes;
	ListCell *lc;

	TS_PREVENT_FUNC_IF_READ_ONLY();

	if (PG_ARGISNULL(1))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("hypertable cannot be NULL")));
	Assert(get_rel_name(table_id));

	/* Pins the hypertable cache; every exit path below that returns
	 * normally releases it with ts_cache_release(). */
	ht = ts_hypertable_cache_get_cache_and_entry(table_id, CACHE_FLAG_NONE, &hcache);
	Assert(ht != NULL);

	if (!hypertable_is_distributed(ht))
		ereport(ERROR,
				(errcode(ERRCODE_TS_HYPERTABLE_NOT_DISTRIBUTED),
				 errmsg("hypertable \"%s\" is not distributed", get_rel_name(table_id))));

	/* Must have owner permissions on the hypertable to attach a new data
	   node. Must also have USAGE on the foreign server. */
	ts_hypertable_permissions_check(table_id, GetUserId());
	fserver = data_node_get_foreign_server(node_name, ACL_USAGE, true, false);

	Assert(NULL != fserver);

	/* Check if the data node is already attached to the hypertable */
	foreach (lc, ht->data_nodes)
	{
		node = lfirst(lc);

		if (node->foreign_server_oid == fserver->serverid)
		{
			ts_cache_release(hcache);

			if (if_not_attached)
			{
				ereport(NOTICE,
						(errcode(ERRCODE_TS_DATA_NODE_ALREADY_ATTACHED),
						 errmsg("data node \"%s\" is already attached to hypertable \"%s\", "
								"skipping",
								node_name,
								get_rel_name(table_id))));
				PG_RETURN_DATUM(create_hypertable_data_node_datum(fcinfo, node));
			}
			else
				ereport(ERROR,
						(errcode(ERRCODE_TS_DATA_NODE_ALREADY_ATTACHED),
						 errmsg("data node \"%s\" is already attached to hypertable \"%s\"",
								node_name,
								get_rel_name(table_id))));
		}
	}

	/* Assign the single new data node; the result list holds its entry */
	result = hypertable_assign_data_nodes(ht->fd.id, list_make1((char *) node_name));
	Assert(result->length == 1);

	/* Get the first closed (space) dimension, which is the one along which we
	 * partition across data nodes. */
	dim = ts_hyperspace_get_mutable_dimension(ht->space, DIMENSION_TYPE_CLOSED, 0);

	/* ht->data_nodes is the list from before the assignment above, so add
	 * one for the node that was just attached */
	num_nodes = list_length(ht->data_nodes) + 1;

	if (num_nodes > MAX_NUM_HYPERTABLE_DATA_NODES)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("max number of data nodes already attached"),
				 errdetail("The number of data nodes in a hypertable cannot exceed %d.",
						   MAX_NUM_HYPERTABLE_DATA_NODES)));

	/* If there are less slices (partitions) in the space dimension than there
	 * are data nodes, we'd like to expand the number of slices to be able to
	 * make use of the new data node. */
	if (NULL != dim && num_nodes > dim->fd.num_slices)
	{
		if (repartition)
		{
			/* Only the low 16 bits of the node count are passed on */
			ts_dimension_set_number_of_slices(dim, num_nodes & 0xFFFF);

			ereport(NOTICE,
					(errmsg("the number of partitions in dimension \"%s\" was increased to %u",
							NameStr(dim->fd.column_name),
							num_nodes),
					 errdetail("To make use of all attached data nodes, a distributed "
							   "hypertable needs at least as many partitions in the first "
							   "closed (space) dimension as there are attached data nodes.")));
		}
		else
		{
			/* Raise a warning if the number of partitions are too few to make
			 * use of all data nodes. Need to refresh cache first to get the
			 * updated data node list. */
			int dimension_id = dim->fd.id;

			ts_cache_release(hcache);
			hcache = ts_hypertable_cache_pin();
			ht = ts_hypertable_cache_get_entry(hcache, table_id, CACHE_FLAG_NONE);
			ts_hypertable_check_partitioning(ht, dimension_id);
		}
	}

	node = linitial(result);
	ts_cache_release(hcache);

	PG_RETURN_DATUM(create_hypertable_data_node_datum(fcinfo, node));
}
933
/*
 * Kind of operation being applied to a data node.
 *
 * Used when generating the proper error message (e.g., "deleted" versus
 * "detached"). Within this file it also selects the code path taken in
 * data_node_modify_hypertable_data_nodes(): OP_DETACH and OP_DELETE remove
 * the data node mappings, while any other value updates the "block new
 * chunks" setting.
 */
typedef enum OperationType
{
	/* Block or allow new chunks on the data node */
	OP_BLOCK,
	/* Detach the data node from one or more hypertables */
	OP_DETACH,
	/* Delete the data node; it must be detached from all hypertables */
	OP_DELETE
} OperationType;
941
942 static void
check_replication_for_new_data(const char * node_name,Hypertable * ht,bool force,OperationType op_type)943 check_replication_for_new_data(const char *node_name, Hypertable *ht, bool force,
944 OperationType op_type)
945 {
946 List *available_nodes = ts_hypertable_get_available_data_nodes(ht, false);
947
948 if (ht->fd.replication_factor < list_length(available_nodes))
949 return;
950
951 ereport(force ? WARNING : ERROR,
952 (errcode(ERRCODE_TS_INSUFFICIENT_NUM_DATA_NODES),
953 errmsg("insufficient number of data nodes for distributed hypertable \"%s\"",
954 NameStr(ht->fd.table_name)),
955 errdetail("Reducing the number of available data nodes on distributed"
956 " hypertable \"%s\" prevents full replication of new chunks.",
957 NameStr(ht->fd.table_name)),
958 force ? 0 : errhint("Use force => true to force this operation.")));
959 }
960
961 static bool
data_node_contains_non_replicated_chunks(List * chunk_data_nodes)962 data_node_contains_non_replicated_chunks(List *chunk_data_nodes)
963 {
964 ListCell *lc;
965
966 foreach (lc, chunk_data_nodes)
967 {
968 ChunkDataNode *cdn = lfirst(lc);
969 List *replicas =
970 ts_chunk_data_node_scan_by_chunk_id(cdn->fd.chunk_id, CurrentMemoryContext);
971
972 if (list_length(replicas) < 2)
973 return true;
974 }
975
976 return false;
977 }
978
979 static List *
data_node_detach_or_delete_validate(const char * node_name,Hypertable * ht,bool force,OperationType op_type)980 data_node_detach_or_delete_validate(const char *node_name, Hypertable *ht, bool force,
981 OperationType op_type)
982 {
983 List *chunk_data_nodes =
984 ts_chunk_data_node_scan_by_node_name_and_hypertable_id(node_name,
985 ht->fd.id,
986 CurrentMemoryContext);
987 bool has_non_replicated_chunks = data_node_contains_non_replicated_chunks(chunk_data_nodes);
988
989 Assert(op_type == OP_DELETE || op_type == OP_DETACH);
990
991 if (has_non_replicated_chunks)
992 ereport(ERROR,
993 (errcode(ERRCODE_TS_INSUFFICIENT_NUM_DATA_NODES),
994 errmsg("insufficient number of data nodes"),
995 errdetail("Distributed hypertable \"%s\" would lose data if"
996 " data node \"%s\" is %s.",
997 NameStr(ht->fd.table_name),
998 node_name,
999 (op_type == OP_DELETE) ? "deleted" : "detached"),
1000 errhint("Ensure all chunks on the data node are fully replicated before %s it.",
1001 (op_type == OP_DELETE) ? "deleting" : "detaching")));
1002
1003 if (list_length(chunk_data_nodes) > 0)
1004 {
1005 if (force)
1006 ereport(WARNING,
1007 (errcode(ERRCODE_TS_INSUFFICIENT_NUM_DATA_NODES),
1008 errmsg("distributed hypertable \"%s\" is under-replicated",
1009 NameStr(ht->fd.table_name)),
1010 errdetail("Some chunks no longer meet the replication target"
1011 " after %s data node \"%s\".",
1012 (op_type == OP_DELETE) ? "deleting" : "detaching",
1013 node_name)));
1014 else
1015 ereport(ERROR,
1016 (errcode(ERRCODE_TS_DATA_NODE_IN_USE),
1017 errmsg("data node \"%s\" still holds data for distributed hypertable \"%s\"",
1018 node_name,
1019 NameStr(ht->fd.table_name))));
1020 }
1021
1022 check_replication_for_new_data(node_name, ht, force, op_type);
1023
1024 return chunk_data_nodes;
1025 }
1026
1027 static int
data_node_modify_hypertable_data_nodes(const char * node_name,List * hypertable_data_nodes,bool all_hypertables,OperationType op_type,bool block_chunks,bool force,bool repartition)1028 data_node_modify_hypertable_data_nodes(const char *node_name, List *hypertable_data_nodes,
1029 bool all_hypertables, OperationType op_type,
1030 bool block_chunks, bool force, bool repartition)
1031 {
1032 Cache *hcache = ts_hypertable_cache_pin();
1033 ListCell *lc;
1034 int removed = 0;
1035
1036 foreach (lc, hypertable_data_nodes)
1037 {
1038 HypertableDataNode *node = lfirst(lc);
1039 Oid relid = ts_hypertable_id_to_relid(node->fd.hypertable_id);
1040 Hypertable *ht = ts_hypertable_cache_get_entry_by_id(hcache, node->fd.hypertable_id);
1041 bool has_privs = ts_hypertable_has_privs_of(relid, GetUserId());
1042
1043 Assert(ht != NULL);
1044
1045 if (!has_privs)
1046 {
1047 /* If the operation is OP_DELETE, we MUST be able to detach the data
1048 * node from ALL tables since the foreign server object will be
1049 * deleted. Therefore, we fail the operation if we find a table
1050 * that we don't have owner permissions on in this case. */
1051 if (all_hypertables && op_type != OP_DELETE)
1052 ereport(NOTICE,
1053 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1054 errmsg("skipping hypertable \"%s\" due to missing permissions",
1055 get_rel_name(relid))));
1056 else
1057 ereport(ERROR,
1058 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1059 errmsg("permission denied for hypertable \"%s\"", get_rel_name(relid)),
1060 errdetail("The data node is attached to hypertables that the current "
1061 "user lacks permissions for.")));
1062 }
1063 else if (op_type == OP_DETACH || op_type == OP_DELETE)
1064 {
1065 /* we have permissions to detach */
1066 List *chunk_data_nodes =
1067 data_node_detach_or_delete_validate(NameStr(node->fd.node_name),
1068 ht,
1069 force,
1070 op_type);
1071 ListCell *cs_lc;
1072
1073 /* update chunk foreign table server and delete chunk mapping */
1074 foreach (cs_lc, chunk_data_nodes)
1075 {
1076 ChunkDataNode *cdn = lfirst(cs_lc);
1077
1078 chunk_update_foreign_server_if_needed(cdn->fd.chunk_id, cdn->foreign_server_oid);
1079 ts_chunk_data_node_delete_by_chunk_id_and_node_name(cdn->fd.chunk_id,
1080 NameStr(cdn->fd.node_name));
1081 }
1082
1083 /* delete hypertable mapping */
1084 removed +=
1085 ts_hypertable_data_node_delete_by_node_name_and_hypertable_id(node_name, ht->fd.id);
1086
1087 if (repartition)
1088 {
1089 Dimension *dim =
1090 ts_hyperspace_get_mutable_dimension(ht->space, DIMENSION_TYPE_CLOSED, 0);
1091 int num_nodes = list_length(ht->data_nodes) - 1;
1092
1093 if (dim != NULL && num_nodes < dim->fd.num_slices && num_nodes > 0)
1094 {
1095 ts_dimension_set_number_of_slices(dim, num_nodes & 0xFFFF);
1096
1097 ereport(NOTICE,
1098 (errmsg("the number of partitions in dimension \"%s\" was decreased to "
1099 "%u",
1100 NameStr(dim->fd.column_name),
1101 num_nodes),
1102 errdetail(
1103 "To make efficient use of all attached data nodes, the number of "
1104 "space partitions was set to match the number of data nodes.")));
1105 }
1106 }
1107 }
1108 else
1109 {
1110 /* set block new chunks */
1111 if (block_chunks)
1112 {
1113 if (node->fd.block_chunks)
1114 {
1115 elog(NOTICE,
1116 "new chunks already blocked on data node \"%s\" for"
1117 " hypertable \"%s\"",
1118 NameStr(node->fd.node_name),
1119 get_rel_name(relid));
1120 continue;
1121 }
1122
1123 check_replication_for_new_data(node_name, ht, force, OP_BLOCK);
1124 }
1125 node->fd.block_chunks = block_chunks;
1126 removed += ts_hypertable_data_node_update(node);
1127 }
1128 }
1129 ts_cache_release(hcache);
1130 return removed;
1131 }
1132
1133 static int
data_node_block_hypertable_data_nodes(const char * node_name,List * hypertable_data_nodes,bool all_hypertables,bool block_chunks,bool force)1134 data_node_block_hypertable_data_nodes(const char *node_name, List *hypertable_data_nodes,
1135 bool all_hypertables, bool block_chunks, bool force)
1136 {
1137 return data_node_modify_hypertable_data_nodes(node_name,
1138 hypertable_data_nodes,
1139 all_hypertables,
1140 OP_BLOCK,
1141 block_chunks,
1142 force,
1143 false);
1144 }
1145
1146 static int
data_node_detach_hypertable_data_nodes(const char * node_name,List * hypertable_data_nodes,bool all_hypertables,bool force,bool repartition,OperationType op_type)1147 data_node_detach_hypertable_data_nodes(const char *node_name, List *hypertable_data_nodes,
1148 bool all_hypertables, bool force, bool repartition,
1149 OperationType op_type)
1150 {
1151 return data_node_modify_hypertable_data_nodes(node_name,
1152 hypertable_data_nodes,
1153 all_hypertables,
1154 op_type,
1155 false,
1156 force,
1157 repartition);
1158 }
1159
1160 HypertableDataNode *
data_node_hypertable_get_by_node_name(const Hypertable * ht,const char * node_name,bool attach_check)1161 data_node_hypertable_get_by_node_name(const Hypertable *ht, const char *node_name,
1162 bool attach_check)
1163 {
1164 HypertableDataNode *hdn = NULL;
1165 ListCell *lc;
1166
1167 if (!hypertable_is_distributed(ht))
1168 ereport(ERROR,
1169 (errcode(ERRCODE_TS_HYPERTABLE_NOT_DISTRIBUTED),
1170 errmsg("hypertable \"%s\" is not distributed",
1171 get_rel_name(ht->main_table_relid))));
1172
1173 foreach (lc, ht->data_nodes)
1174 {
1175 hdn = lfirst(lc);
1176 if (namestrcmp(&hdn->fd.node_name, node_name) == 0)
1177 break;
1178 else
1179 hdn = NULL;
1180 }
1181
1182 if (hdn == NULL)
1183 {
1184 if (attach_check)
1185 ereport(ERROR,
1186 (errcode(ERRCODE_TS_DATA_NODE_NOT_ATTACHED),
1187 errmsg("data node \"%s\" is not attached to hypertable \"%s\"",
1188 node_name,
1189 get_rel_name(ht->main_table_relid))));
1190 else
1191 ereport(NOTICE,
1192 (errcode(ERRCODE_TS_DATA_NODE_NOT_ATTACHED),
1193 errmsg("data node \"%s\" is not attached to hypertable \"%s\", "
1194 "skipping",
1195 node_name,
1196 get_rel_name(ht->main_table_relid))));
1197 }
1198
1199 return hdn;
1200 }
1201
1202 static HypertableDataNode *
get_hypertable_data_node(Oid table_id,const char * node_name,bool owner_check,bool attach_check)1203 get_hypertable_data_node(Oid table_id, const char *node_name, bool owner_check, bool attach_check)
1204 {
1205 HypertableDataNode *hdn = NULL;
1206 Cache *hcache = ts_hypertable_cache_pin();
1207 const Hypertable *ht = ts_hypertable_cache_get_entry(hcache, table_id, CACHE_FLAG_NONE);
1208
1209 if (owner_check)
1210 ts_hypertable_permissions_check(table_id, GetUserId());
1211
1212 hdn = data_node_hypertable_get_by_node_name(ht, node_name, attach_check);
1213
1214 ts_cache_release(hcache);
1215
1216 return hdn;
1217 }
1218
1219 static Datum
data_node_block_or_allow_new_chunks(const char * node_name,Oid const table_id,bool force,bool block_chunks)1220 data_node_block_or_allow_new_chunks(const char *node_name, Oid const table_id, bool force,
1221 bool block_chunks)
1222 {
1223 int affected = 0;
1224 bool all_hypertables = table_id == InvalidOid ? true : false;
1225 List *hypertable_data_nodes = NIL;
1226 ForeignServer *server = data_node_get_foreign_server(node_name, ACL_USAGE, true, false);
1227
1228 Assert(NULL != server);
1229
1230 if (OidIsValid(table_id))
1231 {
1232 /* Early abort on missing hypertable permissions */
1233 ts_hypertable_permissions_check(table_id, GetUserId());
1234 hypertable_data_nodes =
1235 list_make1(get_hypertable_data_node(table_id, server->servername, true, true));
1236 }
1237 else
1238 {
1239 /* block or allow for all hypertables */
1240 hypertable_data_nodes =
1241 ts_hypertable_data_node_scan_by_node_name(server->servername, CurrentMemoryContext);
1242 }
1243
1244 affected = data_node_block_hypertable_data_nodes(server->servername,
1245 hypertable_data_nodes,
1246 all_hypertables,
1247 block_chunks,
1248 force);
1249 return Int32GetDatum(affected);
1250 }
1251
1252 Datum
data_node_allow_new_chunks(PG_FUNCTION_ARGS)1253 data_node_allow_new_chunks(PG_FUNCTION_ARGS)
1254 {
1255 const char *node_name = PG_ARGISNULL(0) ? NULL : NameStr(*PG_GETARG_NAME(0));
1256 Oid table_id = PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
1257
1258 TS_PREVENT_FUNC_IF_READ_ONLY();
1259
1260 return data_node_block_or_allow_new_chunks(node_name, table_id, false, false);
1261 }
1262
1263 Datum
data_node_block_new_chunks(PG_FUNCTION_ARGS)1264 data_node_block_new_chunks(PG_FUNCTION_ARGS)
1265 {
1266 const char *node_name = PG_ARGISNULL(0) ? NULL : NameStr(*PG_GETARG_NAME(0));
1267 Oid table_id = PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
1268 bool force = PG_ARGISNULL(2) ? false : PG_GETARG_BOOL(2);
1269
1270 TS_PREVENT_FUNC_IF_READ_ONLY();
1271
1272 return data_node_block_or_allow_new_chunks(node_name, table_id, force, true);
1273 }
1274
1275 Datum
data_node_detach(PG_FUNCTION_ARGS)1276 data_node_detach(PG_FUNCTION_ARGS)
1277 {
1278 const char *node_name = PG_ARGISNULL(0) ? NULL : NameStr(*PG_GETARG_NAME(0));
1279 Oid table_id = PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1);
1280 bool all_hypertables = PG_ARGISNULL(1);
1281 bool if_attached = PG_ARGISNULL(2) ? false : PG_GETARG_BOOL(2);
1282 bool force = PG_ARGISNULL(3) ? InvalidOid : PG_GETARG_OID(3);
1283 bool repartition = PG_ARGISNULL(4) ? false : PG_GETARG_BOOL(4);
1284 int removed = 0;
1285 List *hypertable_data_nodes = NIL;
1286 ForeignServer *server;
1287
1288 TS_PREVENT_FUNC_IF_READ_ONLY();
1289
1290 server = data_node_get_foreign_server(node_name, ACL_USAGE, true, false);
1291 Assert(NULL != server);
1292
1293 if (OidIsValid(table_id))
1294 {
1295 HypertableDataNode *node;
1296
1297 /* Early abort on missing hypertable permissions */
1298 ts_hypertable_permissions_check(table_id, GetUserId());
1299
1300 node = get_hypertable_data_node(table_id, server->servername, true, !if_attached);
1301 if (node)
1302 hypertable_data_nodes = list_make1(node);
1303 }
1304 else
1305 {
1306 /* Detach data node for all hypertables where user has
1307 * permissions. Permissions checks done in
1308 * data_node_detach_hypertable_data_nodes(). */
1309 hypertable_data_nodes =
1310 ts_hypertable_data_node_scan_by_node_name(server->servername, CurrentMemoryContext);
1311 }
1312
1313 removed = data_node_detach_hypertable_data_nodes(server->servername,
1314 hypertable_data_nodes,
1315 all_hypertables,
1316 force,
1317 repartition,
1318 OP_DETACH);
1319
1320 PG_RETURN_INT32(removed);
1321 }
1322
/*
 * SQL-callable function: delete a data node.
 *
 * Arguments:
 *   0: data node name
 *   1: if_exists (NULL means false); when true, a missing data node only
 *      produces a NOTICE and a false return value
 *   2: force (NULL means false)
 *   3: repartition (NULL means false)
 *
 * The data node is first detached from all hypertables (as an OP_DELETE
 * operation), then its persistent transaction records are removed, and
 * finally the foreign server object is dropped via a DROP statement that is
 * executed with event trigger support.
 *
 * Returns true if the data node was deleted, false if it did not exist.
 */
Datum
data_node_delete(PG_FUNCTION_ARGS)
{
	const char *node_name = PG_ARGISNULL(0) ? NULL : PG_GETARG_CSTRING(0);
	bool if_exists = PG_ARGISNULL(1) ? false : PG_GETARG_BOOL(1);
	bool force = PG_ARGISNULL(2) ? false : PG_GETARG_BOOL(2);
	bool repartition = PG_ARGISNULL(3) ? false : PG_GETARG_BOOL(3);
	List *hypertable_data_nodes = NIL;
	DropStmt stmt;
	ObjectAddress address;
	/* No secondary object is involved in the drop */
	ObjectAddress secondary_object = {
		.classId = InvalidOid,
		.objectId = InvalidOid,
		.objectSubId = 0,
	};
	Node *parsetree = NULL;
	TSConnectionId cid;
	ForeignServer *server;

	TS_PREVENT_FUNC_IF_READ_ONLY();

	/* Need USAGE to detach. Further owner check done when executing the DROP
	 * statement. */
	server = data_node_get_foreign_server(node_name, ACL_USAGE, true, if_exists);

	/* A NULL server is only expected when if_exists was given */
	Assert(server == NULL ? if_exists : true);

	if (NULL == server)
	{
		elog(NOTICE, "data node \"%s\" does not exist, skipping", node_name);
		PG_RETURN_BOOL(false);
	}

	/* close any pending connections */
	remote_connection_id_set(&cid, server->serverid, GetUserId());
	remote_connection_cache_remove(cid);

	/* detach data node */
	hypertable_data_nodes =
		ts_hypertable_data_node_scan_by_node_name(node_name, CurrentMemoryContext);

	/* Operate on all hypertables (all_hypertables = true). The returned
	 * count is not needed here. */
	data_node_detach_hypertable_data_nodes(node_name,
										   hypertable_data_nodes,
										   true,
										   force,
										   repartition,
										   OP_DELETE);

	/* clean up persistent transaction records */
	remote_txn_persistent_record_delete_for_data_node(server->serverid);

	/* Build the equivalent of a DROP statement (with RESTRICT behavior) for
	 * the foreign server */
	stmt = (DropStmt){
		.type = T_DropStmt,
		.objects = list_make1(makeString(pstrdup(node_name))),
		.removeType = OBJECT_FOREIGN_SERVER,
		.behavior = DROP_RESTRICT,
		.missing_ok = if_exists,
	};

	parsetree = (Node *) &stmt;

	/* Make sure event triggers are invoked so that all dropped objects
	 * are collected during a cascading drop. This ensures all dependent
	 * objects get cleaned up. */
	EventTriggerBeginCompleteQuery();

	PG_TRY();
	{
		ObjectAddressSet(address, ForeignServerRelationId, server->serverid);
		EventTriggerDDLCommandStart(parsetree);
		RemoveObjects(&stmt);
		EventTriggerCollectSimpleCommand(address, secondary_object, parsetree);
		EventTriggerSQLDrop(parsetree);
		EventTriggerDDLCommandEnd(parsetree);
	}
	PG_CATCH();
	{
		/* End the event trigger query context before propagating the error */
		EventTriggerEndCompleteQuery();
		PG_RE_THROW();
	}
	PG_END_TRY();

	/* Remove self from dist db if no longer have data_nodes */
	if (data_node_get_node_name_list() == NIL)
		dist_util_remove_from_db();

	EventTriggerEndCompleteQuery();
	CommandCounterIncrement();
	CacheInvalidateRelcacheByRelid(ForeignServerRelationId);

	PG_RETURN_BOOL(true);
}
1415
1416 /*
1417 * Get server list, performing an ACL check on each of them in the process.
1418 */
1419 List *
data_node_get_node_name_list_with_aclcheck(AclMode mode,bool fail_on_aclcheck)1420 data_node_get_node_name_list_with_aclcheck(AclMode mode, bool fail_on_aclcheck)
1421 {
1422 HeapTuple tuple;
1423 ScanKeyData scankey[1];
1424 SysScanDesc scandesc;
1425 Relation rel;
1426 ForeignDataWrapper *fdw = GetForeignDataWrapperByName(EXTENSION_FDW_NAME, false);
1427 List *nodes = NIL;
1428
1429 rel = table_open(ForeignServerRelationId, AccessShareLock);
1430
1431 ScanKeyInit(&scankey[0],
1432 Anum_pg_foreign_server_srvfdw,
1433 BTEqualStrategyNumber,
1434 F_OIDEQ,
1435 ObjectIdGetDatum(fdw->fdwid));
1436
1437 scandesc = systable_beginscan(rel, InvalidOid, false, NULL, 1, scankey);
1438
1439 while (HeapTupleIsValid(tuple = systable_getnext(scandesc)))
1440 {
1441 Form_pg_foreign_server form = (Form_pg_foreign_server) GETSTRUCT(tuple);
1442 ForeignServer *server;
1443
1444 server =
1445 data_node_get_foreign_server(NameStr(form->srvname), mode, fail_on_aclcheck, false);
1446
1447 if (server != NULL)
1448 nodes = lappend(nodes, pstrdup(NameStr(form->srvname)));
1449 }
1450
1451 systable_endscan(scandesc);
1452 table_close(rel, AccessShareLock);
1453
1454 return nodes;
1455 }
1456
1457 /*
1458 * Get server list with optional ACL check.
1459 *
1460 * Returns:
1461 *
1462 * If nodearr is NULL, returns all system-configured data nodes that fulfill
1463 * the ACL check.
1464 *
1465 * If nodearr is non-NULL, returns all the data nodes in the specified array
1466 * subject to ACL checks.
1467 */
1468 List *
data_node_get_filtered_node_name_list(ArrayType * nodearr,AclMode mode,bool fail_on_aclcheck)1469 data_node_get_filtered_node_name_list(ArrayType *nodearr, AclMode mode, bool fail_on_aclcheck)
1470 {
1471 ArrayIterator it;
1472 Datum node_datum;
1473 bool isnull;
1474 List *nodes = NIL;
1475
1476 if (NULL == nodearr)
1477 return data_node_get_node_name_list_with_aclcheck(mode, fail_on_aclcheck);
1478
1479 it = array_create_iterator(nodearr, 0, NULL);
1480
1481 while (array_iterate(it, &node_datum, &isnull))
1482 {
1483 if (!isnull)
1484 {
1485 const char *node_name = DatumGetCString(node_datum);
1486 ForeignServer *server =
1487 data_node_get_foreign_server(node_name, mode, fail_on_aclcheck, false);
1488
1489 if (NULL != server)
1490 nodes = lappend(nodes, server->servername);
1491 }
1492 }
1493
1494 array_free_iterator(it);
1495
1496 return nodes;
1497 }
1498
1499 List *
data_node_get_node_name_list(void)1500 data_node_get_node_name_list(void)
1501 {
1502 return data_node_get_node_name_list_with_aclcheck(ACL_NO_CHECK, false);
1503 }
1504
1505 /*
1506 * Turn an array of data nodes into a list of names.
1507 *
1508 * The function will verify that all the servers in the list belong to the
1509 * TimescaleDB foreign data wrapper. Optionally, perform ACL check on each
1510 * data node's foreign server. Checks are skipped when specificing
1511 * ACL_NO_CHECK. If fail_on_aclcheck is false, then no errors will be thrown
1512 * on ACL check failures. Instead, data nodes that fail ACL checks will simply
1513 * be filtered.
1514 */
1515 List *
data_node_array_to_node_name_list_with_aclcheck(ArrayType * nodearr,AclMode mode,bool fail_on_aclcheck)1516 data_node_array_to_node_name_list_with_aclcheck(ArrayType *nodearr, AclMode mode,
1517 bool fail_on_aclcheck)
1518 {
1519 if (NULL == nodearr)
1520 return NIL;
1521
1522 Assert(ARR_NDIM(nodearr) <= 1);
1523
1524 return data_node_get_filtered_node_name_list(nodearr, mode, fail_on_aclcheck);
1525 }
1526
1527 List *
data_node_array_to_node_name_list(ArrayType * nodearr)1528 data_node_array_to_node_name_list(ArrayType *nodearr)
1529 {
1530 return data_node_array_to_node_name_list_with_aclcheck(nodearr, ACL_NO_CHECK, false);
1531 }
1532
1533 Datum
data_node_ping(PG_FUNCTION_ARGS)1534 data_node_ping(PG_FUNCTION_ARGS)
1535 {
1536 const char *node_name = PG_ARGISNULL(0) ? NULL : PG_GETARG_CSTRING(0);
1537 /* Allow anyone to ping a data node. Otherwise the
1538 * timescaledb_information.data_node view won't work for those users. */
1539 ForeignServer *server = data_node_get_foreign_server(node_name, ACL_NO_CHECK, false, false);
1540 bool success;
1541
1542 Assert(NULL != server);
1543
1544 success = remote_connection_ping(server->servername);
1545
1546 PG_RETURN_DATUM(BoolGetDatum(success));
1547 }
1548
1549 List *
data_node_oids_to_node_name_list(List * data_node_oids,AclMode mode)1550 data_node_oids_to_node_name_list(List *data_node_oids, AclMode mode)
1551 {
1552 List *node_names = NIL;
1553 ListCell *lc;
1554 ForeignServer *fs;
1555
1556 foreach (lc, data_node_oids)
1557 {
1558 Oid foreign_server_oid = lfirst_oid(lc);
1559 fs = data_node_get_foreign_server_by_oid(foreign_server_oid, mode);
1560 node_names = lappend(node_names, pstrdup(fs->servername));
1561 }
1562
1563 return node_names;
1564 }
1565
1566 void
data_node_name_list_check_acl(List * data_node_names,AclMode mode)1567 data_node_name_list_check_acl(List *data_node_names, AclMode mode)
1568 {
1569 AclResult aclresult;
1570 Oid curuserid;
1571 ListCell *lc;
1572
1573 if (data_node_names == NIL)
1574 return;
1575
1576 curuserid = GetUserId();
1577
1578 foreach (lc, data_node_names)
1579 {
1580 /* Validate the servers, but privilege check is optional */
1581 ForeignServer *server = GetForeignServerByName(lfirst(lc), false);
1582
1583 if (mode != ACL_NO_CHECK)
1584 {
1585 /* Must have permissions on the server object */
1586 aclresult = pg_foreign_server_aclcheck(server->serverid, curuserid, mode);
1587
1588 if (aclresult != ACLCHECK_OK)
1589 aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
1590 }
1591 }
1592 }
1593