1 /*
2  * This file and its contents are licensed under the Timescale License.
3  * Please see the included NOTICE for copyright information and
4  * LICENSE-TIMESCALE for a copy of the license.
5  */
6 #include <postgres.h>
7 #include <foreign/foreign.h>
8 #include <catalog/pg_foreign_server.h>
9 #include <catalog/pg_foreign_table.h>
10 #include <catalog/dependency.h>
11 #include <catalog/namespace.h>
12 #include <access/htup_details.h>
13 #include <access/xact.h>
14 #include <nodes/makefuncs.h>
15 #include <utils/acl.h>
16 #include <utils/builtins.h>
17 #include <utils/syscache.h>
18 #include <utils/inval.h>
19 #include <utils/tuplestore.h>
20 #include <utils/palloc.h>
21 #include <utils/memutils.h>
22 #include <utils/snapmgr.h>
23 #include <executor/executor.h>
24 #include <parser/parse_func.h>
25 #include <funcapi.h>
26 #include <miscadmin.h>
27 #include <fmgr.h>
28 #include <executor/spi.h>
29 
30 #if USE_ASSERT_CHECKING
31 #include <funcapi.h>
32 #endif
33 
34 #include <compat/compat.h>
35 #include <chunk_data_node.h>
36 #include <extension.h>
37 #include <errors.h>
38 #include <error_utils.h>
39 #include <hypertable_cache.h>
40 
41 #include "chunk.h"
42 #include "chunk_api.h"
43 #include "chunk_copy.h"
44 #include "data_node.h"
45 #include "debug_point.h"
46 #include "remote/dist_commands.h"
47 #include "dist_util.h"
48 
49 #define CCS_INIT "init"
50 #define CCS_CREATE_EMPTY_CHUNK "create_empty_chunk"
51 #define CCS_CREATE_PUBLICATION "create_publication"
52 #define CCS_CREATE_REPLICATION_SLOT "create_replication_slot"
53 #define CCS_CREATE_SUBSCRIPTION "create_subscription"
54 #define CCS_SYNC_START "sync_start"
55 #define CCS_SYNC "sync"
56 #define CCS_DROP_PUBLICATION "drop_publication"
57 #define CCS_DROP_SUBSCRIPTION "drop_subscription"
58 #define CCS_ATTACH_CHUNK "attach_chunk"
59 #define CCS_DELETE_CHUNK "delete_chunk"
60 
61 typedef struct ChunkCopyStage ChunkCopyStage;
62 typedef struct ChunkCopy ChunkCopy;
63 
64 typedef void (*chunk_copy_stage_func)(ChunkCopy *);
65 
66 struct ChunkCopyStage
67 {
68 	const char *name;
69 	chunk_copy_stage_func function;
70 	chunk_copy_stage_func function_cleanup;
71 };
72 
73 /* To track a chunk move or copy activity */
74 struct ChunkCopy
75 {
76 	/* catalog data */
77 	FormData_chunk_copy_operation fd;
78 	/* current stage being executed */
79 	const ChunkCopyStage *stage;
80 	/* chunk to copy */
81 	Chunk *chunk;
82 	/* from/to foreign servers */
83 	ForeignServer *src_server;
84 	ForeignServer *dst_server;
85 	/* temporary memory context */
86 	MemoryContext mcxt;
87 };
88 
89 static HeapTuple
chunk_copy_operation_make_tuple(const FormData_chunk_copy_operation * fd,TupleDesc desc)90 chunk_copy_operation_make_tuple(const FormData_chunk_copy_operation *fd, TupleDesc desc)
91 {
92 	Datum values[Natts_chunk_copy_operation];
93 	bool nulls[Natts_chunk_copy_operation] = { false };
94 	memset(values, 0, sizeof(values));
95 	values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_operation_id)] =
96 		NameGetDatum(&fd->operation_id);
97 	values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_backend_pid)] =
98 		Int32GetDatum(fd->backend_pid);
99 	values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_completed_stage)] =
100 		NameGetDatum(&fd->completed_stage);
101 	values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_time_start)] =
102 		TimestampTzGetDatum(fd->time_start);
103 	values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_chunk_id)] =
104 		Int32GetDatum(fd->chunk_id);
105 	values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_source_node_name)] =
106 		NameGetDatum(&fd->source_node_name);
107 	values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_dest_node_name)] =
108 		NameGetDatum(&fd->dest_node_name);
109 	values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_delete_on_src_node)] =
110 		BoolGetDatum(fd->delete_on_src_node);
111 	return heap_form_tuple(desc, values, nulls);
112 }
113 
114 static void
chunk_copy_operation_insert_rel(Relation rel,const FormData_chunk_copy_operation * fd)115 chunk_copy_operation_insert_rel(Relation rel, const FormData_chunk_copy_operation *fd)
116 {
117 	CatalogSecurityContext sec_ctx;
118 	HeapTuple new_tuple;
119 
120 	new_tuple = chunk_copy_operation_make_tuple(fd, RelationGetDescr(rel));
121 
122 	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
123 	ts_catalog_insert(rel, new_tuple);
124 	ts_catalog_restore_user(&sec_ctx);
125 	heap_freetuple(new_tuple);
126 }
127 
128 static void
chunk_copy_operation_insert(const FormData_chunk_copy_operation * fd)129 chunk_copy_operation_insert(const FormData_chunk_copy_operation *fd)
130 {
131 	Catalog *catalog;
132 	Relation rel;
133 
134 	catalog = ts_catalog_get();
135 	rel = table_open(catalog_get_table_id(catalog, CHUNK_COPY_OPERATION), RowExclusiveLock);
136 
137 	chunk_copy_operation_insert_rel(rel, fd);
138 	table_close(rel, RowExclusiveLock);
139 }
140 
/*
 * Scanner callback: overwrite the "completed_stage" column of the scanned
 * chunk_copy_operation row with the stage currently tracked in the ChunkCopy
 * passed via "data". All other columns are carried over unchanged.
 *
 * Returns SCAN_DONE so at most one row is updated.
 */
static ScanTupleResult
chunk_copy_operation_tuple_update(TupleInfo *ti, void *data)
{
	ChunkCopy *cc = data;
	Datum values[Natts_chunk_copy_operation];
	bool nulls[Natts_chunk_copy_operation];
	CatalogSecurityContext sec_ctx;
	bool should_free;
	HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
	HeapTuple new_tuple;

	/* Deform the existing row so only one attribute needs to change */
	heap_deform_tuple(tuple, ts_scanner_get_tupledesc(ti), values, nulls);

	/* We only update the "completed_stage" field */
	Assert(NULL != cc->stage);
	values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_completed_stage)] =
		DirectFunctionCall1(namein, CStringGetDatum((cc->stage->name)));

	new_tuple = heap_form_tuple(ts_scanner_get_tupledesc(ti), values, nulls);
	/* Catalog writes must run with catalog-owner privileges */
	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
	ts_catalog_update_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti), new_tuple);
	ts_catalog_restore_user(&sec_ctx);
	heap_freetuple(new_tuple);

	if (should_free)
		heap_freetuple(tuple);

	return SCAN_DONE;
}
170 
171 static int
chunk_copy_operation_scan_update_by_id(const char * operation_id,tuple_found_func tuple_found,void * data,LOCKMODE lockmode)172 chunk_copy_operation_scan_update_by_id(const char *operation_id, tuple_found_func tuple_found,
173 									   void *data, LOCKMODE lockmode)
174 {
175 	Catalog *catalog = ts_catalog_get();
176 	ScanKeyData scankey[1];
177 	ScannerCtx scanctx = {
178 		.table = catalog_get_table_id(catalog, CHUNK_COPY_OPERATION),
179 		.index = catalog_get_index(catalog, CHUNK_COPY_OPERATION, CHUNK_COPY_OPERATION_PKEY_IDX),
180 		.nkeys = 1,
181 		.limit = 1,
182 		.scankey = scankey,
183 		.data = data,
184 		.tuple_found = tuple_found,
185 		.lockmode = lockmode,
186 		.scandirection = ForwardScanDirection,
187 	};
188 
189 	ScanKeyInit(&scankey[0],
190 				Anum_chunk_copy_operation_idx_operation_id,
191 				BTEqualStrategyNumber,
192 				F_NAMEEQ,
193 				CStringGetDatum(operation_id));
194 
195 	return ts_scanner_scan(&scanctx);
196 }
197 
198 static void
chunk_copy_operation_update(ChunkCopy * cc)199 chunk_copy_operation_update(ChunkCopy *cc)
200 {
201 	NameData application_name;
202 
203 	snprintf(application_name.data,
204 			 sizeof(application_name.data),
205 			 "%s:%s",
206 			 cc->fd.operation_id.data,
207 			 cc->stage->name);
208 
209 	pgstat_report_appname(application_name.data);
210 
211 	chunk_copy_operation_scan_update_by_id(NameStr(cc->fd.operation_id),
212 										   chunk_copy_operation_tuple_update,
213 										   cc,
214 										   RowExclusiveLock);
215 }
216 
217 static ScanTupleResult
chunk_copy_operation_tuple_delete(TupleInfo * ti,void * data)218 chunk_copy_operation_tuple_delete(TupleInfo *ti, void *data)
219 {
220 	CatalogSecurityContext sec_ctx;
221 
222 	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
223 	ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
224 	ts_catalog_restore_user(&sec_ctx);
225 
226 	return SCAN_CONTINUE;
227 }
228 
229 static int
chunk_copy_operation_delete_by_id(const char * operation_id)230 chunk_copy_operation_delete_by_id(const char *operation_id)
231 {
232 	Catalog *catalog = ts_catalog_get();
233 	ScanKeyData scankey[1];
234 	ScannerCtx scanctx = {
235 		.table = catalog_get_table_id(catalog, CHUNK_COPY_OPERATION),
236 		.index = catalog_get_index(catalog, CHUNK_COPY_OPERATION, CHUNK_COPY_OPERATION_PKEY_IDX),
237 		.nkeys = 1,
238 		.limit = 1,
239 		.scankey = scankey,
240 		.data = NULL,
241 		.tuple_found = chunk_copy_operation_tuple_delete,
242 		.lockmode = RowExclusiveLock,
243 		.scandirection = ForwardScanDirection,
244 	};
245 
246 	ScanKeyInit(&scankey[0],
247 				Anum_chunk_copy_operation_idx_operation_id,
248 				BTEqualStrategyNumber,
249 				F_NAMEEQ,
250 				CStringGetDatum(operation_id));
251 
252 	return ts_scanner_scan(&scanctx);
253 }
254 
/*
 * Validate the request and populate the ChunkCopy structure for a chunk
 * copy/move between two data nodes.
 *
 * Checks performed: caller is superuser, we are on the access node, the
 * chunk is an uncompressed foreign-table chunk of a distributed hypertable,
 * source != destination, the chunk exists on the source node and does not
 * yet exist on the destination node.
 *
 * On return, cc is filled in (except fd.operation_id, which is assigned by
 * chunk_copy_stage_init) and the starting transaction has been committed so
 * each stage can run in its own transaction.
 */
static void
chunk_copy_setup(ChunkCopy *cc, Oid chunk_relid, const char *src_node, const char *dst_node,
				 bool delete_on_src_node)
{
	Hypertable *ht;
	Cache *hcache;
	MemoryContext old, mcxt;

	if (!superuser())
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 (errmsg("must be superuser to copy/move chunk to data node"))));

	if (dist_util_membership() != DIST_MEMBER_ACCESS_NODE)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("function must be run on the access node only")));

	/*
	 * The chunk and foreign server info needs to be on a memory context
	 * that will survive moving to a new transaction for each stage
	 */
	mcxt = AllocSetContextCreate(PortalContext, "chunk move activity", ALLOCSET_DEFAULT_SIZES);
	old = MemoryContextSwitchTo(mcxt);
	cc->mcxt = mcxt;
	cc->chunk = ts_chunk_get_by_relid(chunk_relid, true);
	cc->stage = NULL;

	/* It has to be a foreign table chunk */
	if (cc->chunk->relkind != RELKIND_FOREIGN_TABLE)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("\"%s\" is not a valid remote chunk", get_rel_name(chunk_relid))));

	/* It has to be an uncompressed chunk, we query the status field on the AN for this */
	if (ts_chunk_is_compressed(cc->chunk))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("\"%s\" is a compressed remote chunk. Chunk copy/move not supported"
						" currently on compressed chunks",
						get_rel_name(chunk_relid))));

	ht = ts_hypertable_cache_get_cache_and_entry(cc->chunk->hypertable_relid,
												 CACHE_FLAG_NONE,
												 &hcache);

	ts_hypertable_permissions_check(ht->main_table_relid, GetUserId());

	if (!hypertable_is_distributed(ht))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("hypertable \"%s\" is not distributed",
						get_rel_name(ht->main_table_relid))));

	cc->src_server = data_node_get_foreign_server(src_node, ACL_USAGE, true, false);
	Assert(NULL != cc->src_server);

	cc->dst_server = data_node_get_foreign_server(dst_node, ACL_USAGE, true, false);
	Assert(NULL != cc->dst_server);

	/* Ensure that source and destination data nodes are not the same */
	if (cc->src_server == cc->dst_server)
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("source and destination data node match")));

	/* Check that src_node is a valid DN and that chunk exists on it */
	if (!ts_chunk_has_data_node(cc->chunk, src_node))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("chunk \"%s\" does not exist on source data node \"%s\"",
						get_rel_name(chunk_relid),
						src_node)));

	/* Check that dst_node is a valid DN and that chunk does not exist on it */
	if (ts_chunk_has_data_node(cc->chunk, dst_node))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("chunk \"%s\" already exists on destination data node \"%s\"",
						get_rel_name(chunk_relid),
						dst_node)));

	/*
	 * Populate the FormData_chunk_copy_operation structure for use by various stages
	 *
	 * The operation_id will be populated in the chunk_copy_stage_init function.
	 */
	cc->fd.backend_pid = MyProcPid;
	namestrcpy(&cc->fd.completed_stage, CCS_INIT);
	cc->fd.time_start = GetCurrentTimestamp();
	cc->fd.chunk_id = cc->chunk->fd.id;
	namestrcpy(&cc->fd.source_node_name, src_node);
	namestrcpy(&cc->fd.dest_node_name, dst_node);
	cc->fd.delete_on_src_node = delete_on_src_node;

	ts_cache_release(hcache);
	MemoryContextSwitchTo(old);

	/* Commit to get out of starting transaction. This will also pop active
	 * snapshots. */
	SPI_commit();
}
357 
358 static void
chunk_copy_finish(ChunkCopy * cc)359 chunk_copy_finish(ChunkCopy *cc)
360 {
361 	/* Done using this long lived memory context */
362 	MemoryContextDelete(cc->mcxt);
363 
364 	/* Start a transaction for the final outer transaction */
365 	SPI_start_transaction();
366 }
367 
368 static void
chunk_copy_stage_init(ChunkCopy * cc)369 chunk_copy_stage_init(ChunkCopy *cc)
370 {
371 	int32 id;
372 
373 	/*
374 	 * Get the operation id for this chunk move/copy activity. The naming
375 	 * convention is "ts_copy_seq-id_chunk-id".
376 	 */
377 	id = ts_catalog_table_next_seq_id(ts_catalog_get(), CHUNK_COPY_OPERATION);
378 	snprintf(cc->fd.operation_id.data,
379 			 sizeof(cc->fd.operation_id.data),
380 			 "ts_copy_%d_%d",
381 			 id,
382 			 cc->chunk->fd.id);
383 
384 	/* Persist the Formdata entry in the catalog */
385 	chunk_copy_operation_insert(&cc->fd);
386 }
387 
388 static void
chunk_copy_stage_init_cleanup(ChunkCopy * cc)389 chunk_copy_stage_init_cleanup(ChunkCopy *cc)
390 {
391 	/* Failure in initial stages, delete this entry from the catalog */
392 	chunk_copy_operation_delete_by_id(NameStr(cc->fd.operation_id));
393 }
394 
395 static void
chunk_copy_stage_create_empty_chunk(ChunkCopy * cc)396 chunk_copy_stage_create_empty_chunk(ChunkCopy *cc)
397 {
398 	/* Create an empty chunk table on the dst_node */
399 	Cache *hcache;
400 	Hypertable *ht;
401 
402 	ht = ts_hypertable_cache_get_cache_and_entry(cc->chunk->hypertable_relid,
403 												 CACHE_FLAG_NONE,
404 												 &hcache);
405 
406 	chunk_api_call_create_empty_chunk_table(ht, cc->chunk, NameStr(cc->fd.dest_node_name));
407 
408 	ts_cache_release(hcache);
409 }
410 
411 static void
chunk_copy_stage_create_empty_chunk_cleanup(ChunkCopy * cc)412 chunk_copy_stage_create_empty_chunk_cleanup(ChunkCopy *cc)
413 {
414 	/*
415 	 * Drop the chunk table on the dst_node. We use the API instead of just
416 	 * "DROP TABLE" because some metadata cleanup might also be needed
417 	 */
418 	chunk_api_call_chunk_drop_replica(cc->chunk,
419 									  NameStr(cc->fd.dest_node_name),
420 									  cc->dst_server->serverid);
421 }
422 
423 static void
chunk_copy_stage_create_publication(ChunkCopy * cc)424 chunk_copy_stage_create_publication(ChunkCopy *cc)
425 {
426 	const char *cmd;
427 
428 	/* Create publication on the source data node */
429 	cmd = psprintf("CREATE PUBLICATION %s FOR TABLE %s",
430 				   NameStr(cc->fd.operation_id),
431 				   quote_qualified_identifier(NameStr(cc->chunk->fd.schema_name),
432 											  NameStr(cc->chunk->fd.table_name)));
433 
434 	/* Create the publication */
435 	ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
436 }
437 
438 static void
chunk_copy_stage_create_replication_slot(ChunkCopy * cc)439 chunk_copy_stage_create_replication_slot(ChunkCopy *cc)
440 {
441 	const char *cmd;
442 
443 	/*
444 	 * CREATE SUBSCRIPTION from a database within the same database cluster will hang,
445 	 * create the replication slot separately before creating the subscription
446 	 */
447 	cmd = psprintf("SELECT pg_create_logical_replication_slot('%s', 'pgoutput')",
448 				   NameStr(cc->fd.operation_id));
449 
450 	ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
451 }
452 
/*
 * Cleanup counterpart of chunk_copy_stage_create_replication_slot: drop the
 * replication slot (named after the operation id) on the source data node,
 * but only if it actually exists — the failed stage may not have created it.
 */
static void
chunk_copy_stage_create_replication_slot_cleanup(ChunkCopy *cc)
{
	char *cmd;
	DistCmdResult *dist_res;
	PGresult *res;

	/* Check if the slot exists on the source data node */
	cmd = psprintf("SELECT 1 FROM pg_catalog.pg_replication_slots WHERE slot_name = '%s'",
				   NameStr(cc->fd.operation_id));
	dist_res =
		ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
	res = ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.source_node_name));

	/* Propagate remote failures as connection errors */
	if (PQresultStatus(res) != PGRES_TUPLES_OK)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));

	/* Drop replication slot on the source data node only if it exists */
	if (PQntuples(res) != 0)
	{
		/* NOTE(review): cmd is reassigned without pfree — presumably fine
		 * since it is palloc'd in a short-lived context; confirm */
		cmd = psprintf("SELECT pg_drop_replication_slot('%s')", NameStr(cc->fd.operation_id));
		ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
	}

	ts_dist_cmd_close_response(dist_res);
}
480 
/*
 * Cleanup counterpart of chunk_copy_stage_create_publication: drop the
 * publication (named after the operation id) on the source data node if it
 * exists. Also invokes the replication-slot cleanup first, since PG might
 * have created the slot even if the create_replication_slot stage rolled
 * back.
 */
static void
chunk_copy_stage_create_publication_cleanup(ChunkCopy *cc)
{
	char *cmd;
	DistCmdResult *dist_res;
	PGresult *res;

	/*
	 * Check if the replication slot exists and clean it up if so. This might
	 * happen if there's a failure in the create_replication_slot stage but
	 * PG might end up creating the slot even though we issued a ROLLBACK
	 */
	chunk_copy_stage_create_replication_slot_cleanup(cc);

	/* Check if the publication exists on the source data node */
	cmd = psprintf("SELECT 1 FROM pg_catalog.pg_publication WHERE pubname = '%s'",
				   NameStr(cc->fd.operation_id));
	dist_res =
		ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
	res = ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.source_node_name));

	/* Propagate remote failures as connection errors */
	if (PQresultStatus(res) != PGRES_TUPLES_OK)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));

	/* Drop publication on the source node only if it exists */
	if (PQntuples(res) != 0)
	{
		cmd = psprintf("DROP PUBLICATION %s", NameStr(cc->fd.operation_id));

		/* Drop the publication */
		ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
	}

	ts_dist_cmd_close_response(dist_res);
}
517 
518 static void
chunk_copy_stage_create_subscription(ChunkCopy * cc)519 chunk_copy_stage_create_subscription(ChunkCopy *cc)
520 {
521 	const char *cmd;
522 	const char *connection_string;
523 
524 	/* Prepare connection string to the source node */
525 	connection_string = remote_connection_get_connstr(NameStr(cc->fd.source_node_name));
526 
527 	cmd = psprintf("CREATE SUBSCRIPTION %s CONNECTION '%s' PUBLICATION %s"
528 				   " WITH (create_slot = false, enabled = false)",
529 				   NameStr(cc->fd.operation_id),
530 				   connection_string,
531 				   NameStr(cc->fd.operation_id));
532 	ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
533 }
534 
/*
 * Cleanup counterpart of chunk_copy_stage_create_subscription: drop the
 * subscription (named after the operation id) on the destination node if it
 * exists. The subscription is first detached from its replication slot so
 * that DROP SUBSCRIPTION does not try to remove the remote slot (slot
 * cleanup is handled on the source-node side).
 */
static void
chunk_copy_stage_create_subscription_cleanup(ChunkCopy *cc)
{
	char *cmd;
	DistCmdResult *dist_res;
	PGresult *res;

	/* Check if the subscription exists on the destination data node */
	cmd = psprintf("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = '%s'",
				   NameStr(cc->fd.operation_id));
	dist_res =
		ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
	res = ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.dest_node_name));

	/* Propagate remote failures as connection errors */
	if (PQresultStatus(res) != PGRES_TUPLES_OK)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));

	/* Cleanup only if the subscription exists */
	if (PQntuples(res) != 0)
	{
		List *nodes = list_make1(NameStr(cc->fd.dest_node_name));

		/* Disassociate the subscription from the replication slot first */
		cmd =
			psprintf("ALTER SUBSCRIPTION %s SET (slot_name = NONE)", NameStr(cc->fd.operation_id));
		ts_dist_cmd_run_on_data_nodes(cmd, nodes, true);

		/* Drop the subscription now */
		pfree(cmd);
		cmd = psprintf("DROP SUBSCRIPTION %s", NameStr(cc->fd.operation_id));
		ts_dist_cmd_run_on_data_nodes(cmd, nodes, true);
	}

	ts_dist_cmd_close_response(dist_res);
}
571 
572 static void
chunk_copy_stage_sync_start(ChunkCopy * cc)573 chunk_copy_stage_sync_start(ChunkCopy *cc)
574 {
575 	const char *cmd;
576 
577 	/* Start data transfer on the destination node */
578 	cmd = psprintf("ALTER SUBSCRIPTION %s ENABLE", NameStr(cc->fd.operation_id));
579 	ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
580 }
581 
/*
 * Cleanup counterpart of chunk_copy_stage_sync_start: disable the
 * subscription on the destination node (if it still exists) so that earlier
 * stages' cleanup can drop the subscription/publication cleanly.
 */
static void
chunk_copy_stage_sync_start_cleanup(ChunkCopy *cc)
{
	char *cmd;
	DistCmdResult *dist_res;
	PGresult *res;

	/* Check if the subscription exists on the destination data node */
	cmd = psprintf("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = '%s'",
				   NameStr(cc->fd.operation_id));
	dist_res =
		ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
	res = ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.dest_node_name));

	/* Propagate remote failures as connection errors */
	if (PQresultStatus(res) != PGRES_TUPLES_OK)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));

	/* Alter subscription only if it exists */
	if (PQntuples(res) != 0)
	{
		/* Stop data transfer on the destination node */
		cmd = psprintf("ALTER SUBSCRIPTION %s DISABLE", NameStr(cc->fd.operation_id));
		ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
	}

	ts_dist_cmd_close_response(dist_res);
}
610 
611 static void
chunk_copy_stage_sync(ChunkCopy * cc)612 chunk_copy_stage_sync(ChunkCopy *cc)
613 {
614 	char *cmd;
615 
616 	/*
617 	 * Transaction blocks run in REPEATABLE READ mode in the connection pool.
618 	 * However this wait_subscription_sync procedure needs to refresh the subcription
619 	 * sync status data and hence needs a READ COMMITTED transaction isolation
620 	 * level for that.
621 	 */
622 	cmd = psprintf("SET transaction_isolation TO 'READ COMMITTED'");
623 	ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
624 	pfree(cmd);
625 
626 	/* Wait until data transfer finishes in its own transaction */
627 	cmd = psprintf("CALL _timescaledb_internal.wait_subscription_sync(%s, %s)",
628 				   quote_literal_cstr(NameStr(cc->chunk->fd.schema_name)),
629 				   quote_literal_cstr(NameStr(cc->chunk->fd.table_name)));
630 
631 	ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
632 	pfree(cmd);
633 }
634 
635 static void
chunk_copy_stage_drop_subscription(ChunkCopy * cc)636 chunk_copy_stage_drop_subscription(ChunkCopy *cc)
637 {
638 	char *cmd;
639 
640 	/* Stop data transfer on the destination node */
641 	cmd = psprintf("ALTER SUBSCRIPTION %s DISABLE", NameStr(cc->fd.operation_id));
642 	ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
643 	pfree(cmd);
644 
645 	/* Disassociate the subscription from the replication slot first */
646 	cmd = psprintf("ALTER SUBSCRIPTION %s SET (slot_name = NONE)", NameStr(cc->fd.operation_id));
647 	ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
648 	pfree(cmd);
649 
650 	/* Drop the subscription now */
651 	cmd = psprintf("DROP SUBSCRIPTION %s", NameStr(cc->fd.operation_id));
652 	ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
653 	pfree(cmd);
654 }
655 
656 static void
chunk_copy_stage_drop_publication(ChunkCopy * cc)657 chunk_copy_stage_drop_publication(ChunkCopy *cc)
658 {
659 	char *cmd;
660 
661 	cmd = psprintf("SELECT pg_drop_replication_slot('%s')", NameStr(cc->fd.operation_id));
662 	ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
663 
664 	cmd = psprintf("DROP PUBLICATION %s", NameStr(cc->fd.operation_id));
665 	ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
666 }
667 
/*
 * Attach the freshly-copied chunk to the hypertable on the destination data
 * node and record the new chunk/data-node association in the access node's
 * metadata.
 */
static void
chunk_copy_stage_attach_chunk(ChunkCopy *cc)
{
	Cache *hcache;
	Hypertable *ht;
	ChunkDataNode *chunk_data_node;
	const char *remote_chunk_name;
	Chunk *chunk = cc->chunk;

	ht = ts_hypertable_cache_get_cache_and_entry(chunk->hypertable_relid, CACHE_FLAG_NONE, &hcache);

	/* Check that the hypertable is already attached to this data node */
	data_node_hypertable_get_by_node_name(ht, cc->dst_server->servername, true);

	chunk_data_node = palloc0(sizeof(ChunkDataNode));

	chunk_data_node->fd.chunk_id = chunk->fd.id;
	chunk_data_node->fd.node_chunk_id = -1; /* below API will fill it up */
	namestrcpy(&chunk_data_node->fd.node_name, cc->dst_server->servername);
	chunk_data_node->foreign_server_oid = cc->dst_server->serverid;

	/* Schema-qualified chunk name as it appears on the remote node */
	remote_chunk_name = psprintf("%s.%s",
								 quote_identifier(chunk->fd.schema_name.data),
								 quote_identifier(chunk->fd.table_name.data));

	chunk_api_create_on_data_nodes(chunk, ht, remote_chunk_name, list_make1(chunk_data_node));

	/* All ok, update the AN chunk metadata to add this data node to it */
	chunk->data_nodes = lappend(chunk->data_nodes, chunk_data_node);

	/* persist this association in the metadata */
	ts_chunk_data_node_insert(chunk_data_node);

	ts_cache_release(hcache);
}
703 
704 static void
chunk_copy_stage_delete_chunk(ChunkCopy * cc)705 chunk_copy_stage_delete_chunk(ChunkCopy *cc)
706 {
707 	if (!cc->fd.delete_on_src_node)
708 		return;
709 
710 	chunk_api_call_chunk_drop_replica(cc->chunk,
711 									  NameStr(cc->fd.source_node_name),
712 									  cc->src_server->serverid);
713 }
714 
/*
 * Ordered list of stages executed for a chunk copy/move. Execution walks the
 * array forward (chunk_copy_execute); cleanup after a failure walks it
 * backward from the failed stage, invoking each stage's function_cleanup
 * (chunk_copy_cleanup_internal), so the order of entries is significant.
 */
static const ChunkCopyStage chunk_copy_stages[] = {
	/* Initial Marker */
	{ CCS_INIT, chunk_copy_stage_init, chunk_copy_stage_init_cleanup },

	/*
	 * Create empty chunk table on the dst node.
	 * The corresponding cleanup function should just delete this empty chunk.
	 */
	{ CCS_CREATE_EMPTY_CHUNK,
	  chunk_copy_stage_create_empty_chunk,
	  chunk_copy_stage_create_empty_chunk_cleanup },

	/*
	 * Setup logical replication between nodes.
	 * The corresponding cleanup functions should drop the subscription and
	 * remove the replication slot followed by dropping of the publication on
	 * the source data node.
	 */
	{ CCS_CREATE_PUBLICATION,
	  chunk_copy_stage_create_publication,
	  chunk_copy_stage_create_publication_cleanup },
	{ CCS_CREATE_REPLICATION_SLOT,
	  chunk_copy_stage_create_replication_slot,
	  chunk_copy_stage_create_replication_slot_cleanup },
	{ CCS_CREATE_SUBSCRIPTION,
	  chunk_copy_stage_create_subscription,
	  chunk_copy_stage_create_subscription_cleanup },

	/*
	 * Begin data transfer and wait for completion.
	 * The corresponding cleanup function should just disable the subscription so
	 * that earlier steps above can drop the subcription/publication cleanly.
	 */
	{ CCS_SYNC_START, chunk_copy_stage_sync_start, chunk_copy_stage_sync_start_cleanup },
	{ CCS_SYNC, chunk_copy_stage_sync, NULL },

	/*
	 * Cleanup. Nothing else required via the cleanup functions.
	 */
	{ CCS_DROP_SUBSCRIPTION, chunk_copy_stage_drop_subscription, NULL },
	{ CCS_DROP_PUBLICATION, chunk_copy_stage_drop_publication, NULL },

	/*
	 * Attach chunk to the hypertable on the dst_node.
	 * The operation has succeeded from the destination data node perspective.
	 * No cleanup required here.
	 */
	{ CCS_ATTACH_CHUNK, chunk_copy_stage_attach_chunk, NULL },

	/*
	 * Maybe delete chunk from the src_node (move operation).
	 * Again, everything ok, so no cleanup required, we probably shouldn't be
	 * seeing this entry in the catalog table because the operation has succeeded.
	 */
	{ CCS_DELETE_CHUNK, chunk_copy_stage_delete_chunk, NULL },

	/* Done Marker */
	{ NULL, NULL, NULL }
};
774 
/*
 * Run every stage of the chunk copy/move in order, each inside its own
 * transaction, recording the completed stage in the catalog after each one.
 * If a stage errors out, the error propagates to chunk_copy(), which adds
 * the operation id to the error detail; the catalog row then records the
 * last stage that completed so the operation can be cleaned up later.
 */
static void
chunk_copy_execute(ChunkCopy *cc)
{
	const ChunkCopyStage *stage;

	/*
	 * Execute each copy stage in a separate transaction. The below will employ
	 * 2PC by default. This can be later optimized to use 1PC since only one
	 * datanode is involved in most of the stages.
	 */
	for (stage = &chunk_copy_stages[0]; stage->name != NULL; stage++)
	{
		SPI_start_transaction();

		cc->stage = stage;
		cc->stage->function(cc);

		/* Mark current stage as completed and update the catalog */
		chunk_copy_operation_update(cc);

		/* Test hook: allows injecting an error at this named point */
		DEBUG_ERROR_INJECTION(stage->name);

		SPI_commit();
	}
}
800 
/*
 * Copy (or, when delete_on_src_node is set, move) a chunk from one data node
 * to another.
 *
 * Runs chunk_copy_setup() for validation/initialization and then executes
 * the stage machine. Any error raised during execution is re-thrown with the
 * operation id attached as error detail, so the user can later clean up the
 * partially-completed operation by that id.
 */
void
chunk_copy(Oid chunk_relid, const char *src_node, const char *dst_node, bool delete_on_src_node)
{
	ChunkCopy cc;
	/* Saved so the error can be copied outside the (possibly reset) context */
	const MemoryContext oldcontext = CurrentMemoryContext;

	/* Populate copy structure */
	chunk_copy_setup(&cc, chunk_relid, src_node, dst_node, delete_on_src_node);

	/* Execute chunk copy in separate stages */
	PG_TRY();
	{
		chunk_copy_execute(&cc);
	}
	PG_CATCH();
	{
		/* Include chunk copy id to the error message */
		ErrorData *edata;
		MemoryContextSwitchTo(oldcontext);
		edata = CopyErrorData();
		edata->detail = psprintf("Chunk copy operation id: %s.", NameStr(cc.fd.operation_id));
		FlushErrorState();
		ReThrowError(edata);
	}
	PG_END_TRY();

	/* Finish up and delete the catalog entry */
	chunk_copy_finish(&cc);
}
830 
831 static ScanTupleResult
chunk_copy_operation_tuple_found(TupleInfo * ti,void * const data)832 chunk_copy_operation_tuple_found(TupleInfo *ti, void *const data)
833 {
834 	ChunkCopy **cc = data;
835 
836 	*cc = STRUCT_FROM_SLOT(ti->slot, ti->mctx, ChunkCopy, FormData_chunk_copy_operation);
837 	return SCAN_CONTINUE;
838 }
839 
/*
 * Load an existing chunk copy operation from the catalog by operation id and
 * reconstruct a fully-populated ChunkCopy (chunk, source/destination foreign
 * servers) in a long-lived memory context.
 *
 * Returns NULL if no matching catalog entry exists; errors out on a NULL
 * operation_id.
 */
static ChunkCopy *
chunk_copy_operation_get(const char *operation_id)
{
	ScanKeyData scankeys[1];
	ChunkCopy *cc = NULL;
	int indexid;
	MemoryContext old, mcxt;

	/* Objects need to be in long lived context */
	mcxt =
		AllocSetContextCreate(PortalContext, "chunk copy cleanup activity", ALLOCSET_DEFAULT_SIZES);
	old = MemoryContextSwitchTo(mcxt);

	if (operation_id != NULL)
	{
		ScanKeyInit(&scankeys[0],
					Anum_chunk_copy_operation_idx_operation_id,
					BTEqualStrategyNumber,
					F_NAMEEQ,
					CStringGetDatum(operation_id));
		indexid = CHUNK_COPY_OPERATION_PKEY_IDX;
	}
	else
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid chunk copy operation identifier")));

	ts_catalog_scan_one(CHUNK_COPY_OPERATION,
						indexid,
						scankeys,
						1,
						chunk_copy_operation_tuple_found,
						AccessShareLock,
						CHUNK_COPY_OPERATION_TABLE_NAME,
						&cc);

	/*
	 * If a valid entry is returned then fill up the rest of the fields in the
	 * ChunkCopy structure
	 */
	if (cc)
	{
		cc->mcxt = mcxt;
		cc->chunk = ts_chunk_get_by_id(cc->fd.chunk_id, true);
		cc->stage = NULL;

		/* No other sanity checks need to be performed since they were done earlier */

		/* Setup the src_node */
		cc->src_server =
			data_node_get_foreign_server(NameStr(cc->fd.source_node_name), ACL_USAGE, true, false);
		Assert(NULL != cc->src_server);

		/* Setup the dst_node */
		cc->dst_server =
			data_node_get_foreign_server(NameStr(cc->fd.dest_node_name), ACL_USAGE, true, false);
		Assert(NULL != cc->dst_server);
	}

	MemoryContextSwitchTo(old);

	if (cc == NULL)
		/* No entry found, long lived context not required */
		MemoryContextDelete(mcxt);

	return cc;
}
907 
/*
 * Roll back a failed chunk copy operation by running the cleanup callback of
 * every stage from the last completed one (stage_idx) back down to stage 0.
 * Each stage's cleanup runs in its own transaction so partial progress is
 * durable.
 */
static void
chunk_copy_cleanup_internal(ChunkCopy *cc, int stage_idx)
{
	bool first = true;

	/* Cleanup each copy stage in a separate transaction */
	do
	{
		SPI_start_transaction();

		cc->stage = &chunk_copy_stages[stage_idx];
		if (cc->stage->function_cleanup)
			cc->stage->function_cleanup(cc);

		/* Mark stage as cleaned up and update the catalog. Skipped on the
		 * first iteration (the catalog already records this stage as the
		 * last completed one) and at stage 0 — presumably because the whole
		 * catalog row is deleted once cleanup finishes; TODO confirm. */
		if (!first && stage_idx != 0)
			chunk_copy_operation_update(cc);
		else
			first = false;

		SPI_commit();
	} while (--stage_idx >= 0);
}
931 
/*
 * Clean up a previously failed chunk copy operation identified by
 * operation_id. Must be run by a superuser on the access node. Locates the
 * catalog entry, determines the last completed stage, and unwinds each
 * stage's cleanup in per-stage transactions before deleting the entry.
 */
void
chunk_copy_cleanup(const char *operation_id)
{
	ChunkCopy *cc;
	/* Caller's context, needed to safely copy error data in PG_CATCH */
	const MemoryContext oldcontext = CurrentMemoryContext;
	const ChunkCopyStage *stage;
	bool found = false;
	int stage_idx;

	if (!superuser())
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 (errmsg("must be superuser to cleanup a chunk copy operation"))));

	if (dist_util_membership() != DIST_MEMBER_ACCESS_NODE)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("function must be run on the access node only")));

	cc = chunk_copy_operation_get(operation_id);

	if (cc == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid chunk copy operation identifier. Entry not found")));

	/* Identify the last completed stage for this activity. */
	stage_idx = 0;
	for (stage = &chunk_copy_stages[stage_idx]; stage->name != NULL;
		 stage = &chunk_copy_stages[++stage_idx])
	{
		if (namestrcmp(&cc->fd.completed_stage, stage->name) == 0)
		{
			found = true;
			break;
		}
	}

	/* should always find an entry, add ereport to quell compiler warning */
	Assert(found == true);
	if (!found)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("stage '%s' not found for copy chunk cleanup",
						NameStr(cc->fd.completed_stage))));

	/* Commit to get out of starting transaction, this will also pop active
	 * snapshots. */
	SPI_commit();

	/* Run the corresponding cleanup steps to roll back the activity. */
	PG_TRY();
	{
		chunk_copy_cleanup_internal(cc, stage_idx);
	}
	PG_CATCH();
	{
		/* Include chunk copy id to the error message */
		ErrorData *edata;
		/* Switch out of the erroring context before copying the error */
		MemoryContextSwitchTo(oldcontext);
		edata = CopyErrorData();
		edata->detail = psprintf("While cleaning up chunk copy operation id: %s.",
								 NameStr(cc->fd.operation_id));
		FlushErrorState();
		ReThrowError(edata);
	}
	PG_END_TRY();

	/* Finish up and delete the catalog entry */
	chunk_copy_finish(cc);
}
1003