1 /*
2 * This file and its contents are licensed under the Timescale License.
3 * Please see the included NOTICE for copyright information and
4 * LICENSE-TIMESCALE for a copy of the license.
5 */
6 #include <postgres.h>
7 #include <foreign/foreign.h>
8 #include <catalog/pg_foreign_server.h>
9 #include <catalog/pg_foreign_table.h>
10 #include <catalog/dependency.h>
11 #include <catalog/namespace.h>
12 #include <access/htup_details.h>
13 #include <access/xact.h>
14 #include <nodes/makefuncs.h>
15 #include <utils/acl.h>
16 #include <utils/builtins.h>
17 #include <utils/syscache.h>
18 #include <utils/inval.h>
19 #include <utils/tuplestore.h>
20 #include <utils/palloc.h>
21 #include <utils/memutils.h>
22 #include <utils/snapmgr.h>
23 #include <executor/executor.h>
24 #include <parser/parse_func.h>
25 #include <funcapi.h>
26 #include <miscadmin.h>
27 #include <fmgr.h>
28 #include <executor/spi.h>
29
30 #if USE_ASSERT_CHECKING
31 #include <funcapi.h>
32 #endif
33
34 #include <compat/compat.h>
35 #include <chunk_data_node.h>
36 #include <extension.h>
37 #include <errors.h>
38 #include <error_utils.h>
39 #include <hypertable_cache.h>
40
41 #include "chunk.h"
42 #include "chunk_api.h"
43 #include "chunk_copy.h"
44 #include "data_node.h"
45 #include "debug_point.h"
46 #include "remote/dist_commands.h"
47 #include "dist_util.h"
48
/*
 * Names of the chunk copy/move stages. These strings are persisted in the
 * "completed_stage" column of the chunk_copy_operation catalog table and are
 * matched against ChunkCopyStage.name during cleanup, so they must stay
 * stable across releases.
 */
#define CCS_INIT "init"
#define CCS_CREATE_EMPTY_CHUNK "create_empty_chunk"
#define CCS_CREATE_PUBLICATION "create_publication"
#define CCS_CREATE_REPLICATION_SLOT "create_replication_slot"
#define CCS_CREATE_SUBSCRIPTION "create_subscription"
#define CCS_SYNC_START "sync_start"
#define CCS_SYNC "sync"
#define CCS_DROP_PUBLICATION "drop_publication"
#define CCS_DROP_SUBSCRIPTION "drop_subscription"
#define CCS_ATTACH_CHUNK "attach_chunk"
#define CCS_DELETE_CHUNK "delete_chunk"
60
typedef struct ChunkCopyStage ChunkCopyStage;
typedef struct ChunkCopy ChunkCopy;

/* Signature shared by stage execution and stage cleanup callbacks */
typedef void (*chunk_copy_stage_func)(ChunkCopy *);

/*
 * One step of the chunk copy/move state machine. Stages run in order (see
 * chunk_copy_stages[]); on failure, the cleanup callbacks are run in reverse
 * order starting from the last completed stage.
 */
struct ChunkCopyStage
{
	/* Stage name as recorded in the catalog (one of the CCS_* macros) */
	const char *name;
	/* Performs the stage's work; runs in its own transaction */
	chunk_copy_stage_func function;
	/* Undoes the stage's work during rollback; may be NULL if nothing to undo */
	chunk_copy_stage_func function_cleanup;
};
72
/* To track a chunk move or copy activity */
struct ChunkCopy
{
	/* catalog data; mirrors a row of the chunk_copy_operation table */
	FormData_chunk_copy_operation fd;
	/* current stage being executed (NULL before execution starts) */
	const ChunkCopyStage *stage;
	/* chunk to copy */
	Chunk *chunk;
	/* from/to foreign servers */
	ForeignServer *src_server;
	ForeignServer *dst_server;
	/*
	 * Temporary memory context that outlives the per-stage transactions so
	 * the chunk and server data above stay valid across commits.
	 */
	MemoryContext mcxt;
};
88
89 static HeapTuple
chunk_copy_operation_make_tuple(const FormData_chunk_copy_operation * fd,TupleDesc desc)90 chunk_copy_operation_make_tuple(const FormData_chunk_copy_operation *fd, TupleDesc desc)
91 {
92 Datum values[Natts_chunk_copy_operation];
93 bool nulls[Natts_chunk_copy_operation] = { false };
94 memset(values, 0, sizeof(values));
95 values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_operation_id)] =
96 NameGetDatum(&fd->operation_id);
97 values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_backend_pid)] =
98 Int32GetDatum(fd->backend_pid);
99 values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_completed_stage)] =
100 NameGetDatum(&fd->completed_stage);
101 values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_time_start)] =
102 TimestampTzGetDatum(fd->time_start);
103 values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_chunk_id)] =
104 Int32GetDatum(fd->chunk_id);
105 values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_source_node_name)] =
106 NameGetDatum(&fd->source_node_name);
107 values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_dest_node_name)] =
108 NameGetDatum(&fd->dest_node_name);
109 values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_delete_on_src_node)] =
110 BoolGetDatum(fd->delete_on_src_node);
111 return heap_form_tuple(desc, values, nulls);
112 }
113
114 static void
chunk_copy_operation_insert_rel(Relation rel,const FormData_chunk_copy_operation * fd)115 chunk_copy_operation_insert_rel(Relation rel, const FormData_chunk_copy_operation *fd)
116 {
117 CatalogSecurityContext sec_ctx;
118 HeapTuple new_tuple;
119
120 new_tuple = chunk_copy_operation_make_tuple(fd, RelationGetDescr(rel));
121
122 ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
123 ts_catalog_insert(rel, new_tuple);
124 ts_catalog_restore_user(&sec_ctx);
125 heap_freetuple(new_tuple);
126 }
127
128 static void
chunk_copy_operation_insert(const FormData_chunk_copy_operation * fd)129 chunk_copy_operation_insert(const FormData_chunk_copy_operation *fd)
130 {
131 Catalog *catalog;
132 Relation rel;
133
134 catalog = ts_catalog_get();
135 rel = table_open(catalog_get_table_id(catalog, CHUNK_COPY_OPERATION), RowExclusiveLock);
136
137 chunk_copy_operation_insert_rel(rel, fd);
138 table_close(rel, RowExclusiveLock);
139 }
140
/*
 * Scanner callback: rewrite the matched chunk_copy_operation tuple so that
 * its "completed_stage" column records the stage currently held in the
 * ChunkCopy passed via "data". All other columns are preserved as-is.
 */
static ScanTupleResult
chunk_copy_operation_tuple_update(TupleInfo *ti, void *data)
{
	ChunkCopy *cc = data;
	Datum values[Natts_chunk_copy_operation];
	bool nulls[Natts_chunk_copy_operation];
	CatalogSecurityContext sec_ctx;
	bool should_free;
	HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
	HeapTuple new_tuple;

	heap_deform_tuple(tuple, ts_scanner_get_tupledesc(ti), values, nulls);

	/* We only update the "completed_stage" field */
	Assert(NULL != cc->stage);
	values[AttrNumberGetAttrOffset(Anum_chunk_copy_operation_completed_stage)] =
		DirectFunctionCall1(namein, CStringGetDatum((cc->stage->name)));

	/* Replace the old tuple in place, using catalog-owner privileges */
	new_tuple = heap_form_tuple(ts_scanner_get_tupledesc(ti), values, nulls);
	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
	ts_catalog_update_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti), new_tuple);
	ts_catalog_restore_user(&sec_ctx);
	heap_freetuple(new_tuple);

	if (should_free)
		heap_freetuple(tuple);

	/* operation_id is the primary key, so at most one row matches */
	return SCAN_DONE;
}
170
171 static int
chunk_copy_operation_scan_update_by_id(const char * operation_id,tuple_found_func tuple_found,void * data,LOCKMODE lockmode)172 chunk_copy_operation_scan_update_by_id(const char *operation_id, tuple_found_func tuple_found,
173 void *data, LOCKMODE lockmode)
174 {
175 Catalog *catalog = ts_catalog_get();
176 ScanKeyData scankey[1];
177 ScannerCtx scanctx = {
178 .table = catalog_get_table_id(catalog, CHUNK_COPY_OPERATION),
179 .index = catalog_get_index(catalog, CHUNK_COPY_OPERATION, CHUNK_COPY_OPERATION_PKEY_IDX),
180 .nkeys = 1,
181 .limit = 1,
182 .scankey = scankey,
183 .data = data,
184 .tuple_found = tuple_found,
185 .lockmode = lockmode,
186 .scandirection = ForwardScanDirection,
187 };
188
189 ScanKeyInit(&scankey[0],
190 Anum_chunk_copy_operation_idx_operation_id,
191 BTEqualStrategyNumber,
192 F_NAMEEQ,
193 CStringGetDatum(operation_id));
194
195 return ts_scanner_scan(&scanctx);
196 }
197
/*
 * Record in the catalog that the current stage of this operation completed,
 * and publish "<operation_id>:<stage>" as this backend's application_name so
 * progress is visible in pg_stat_activity.
 */
static void
chunk_copy_operation_update(ChunkCopy *cc)
{
	NameData application_name;

	/* NameData is fixed-size; snprintf truncates if the combination is too long */
	snprintf(application_name.data,
			 sizeof(application_name.data),
			 "%s:%s",
			 cc->fd.operation_id.data,
			 cc->stage->name);

	pgstat_report_appname(application_name.data);

	chunk_copy_operation_scan_update_by_id(NameStr(cc->fd.operation_id),
										   chunk_copy_operation_tuple_update,
										   cc,
										   RowExclusiveLock);
}
216
/*
 * Scanner callback: delete the matched chunk_copy_operation tuple, running
 * with catalog-owner privileges. "data" is unused.
 */
static ScanTupleResult
chunk_copy_operation_tuple_delete(TupleInfo *ti, void *data)
{
	CatalogSecurityContext sec_ctx;

	ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
	ts_catalog_delete_tid(ti->scanrel, ts_scanner_get_tuple_tid(ti));
	ts_catalog_restore_user(&sec_ctx);

	return SCAN_CONTINUE;
}
228
229 static int
chunk_copy_operation_delete_by_id(const char * operation_id)230 chunk_copy_operation_delete_by_id(const char *operation_id)
231 {
232 Catalog *catalog = ts_catalog_get();
233 ScanKeyData scankey[1];
234 ScannerCtx scanctx = {
235 .table = catalog_get_table_id(catalog, CHUNK_COPY_OPERATION),
236 .index = catalog_get_index(catalog, CHUNK_COPY_OPERATION, CHUNK_COPY_OPERATION_PKEY_IDX),
237 .nkeys = 1,
238 .limit = 1,
239 .scankey = scankey,
240 .data = NULL,
241 .tuple_found = chunk_copy_operation_tuple_delete,
242 .lockmode = RowExclusiveLock,
243 .scandirection = ForwardScanDirection,
244 };
245
246 ScanKeyInit(&scankey[0],
247 Anum_chunk_copy_operation_idx_operation_id,
248 BTEqualStrategyNumber,
249 F_NAMEEQ,
250 CStringGetDatum(operation_id));
251
252 return ts_scanner_scan(&scanctx);
253 }
254
/*
 * Validate the requested chunk copy/move and populate the ChunkCopy struct.
 *
 * Checks performed: caller is superuser, this node is the access node, the
 * chunk is a remote (foreign-table) uncompressed chunk of a distributed
 * hypertable, the caller has permissions on the hypertable, the source and
 * destination data nodes are distinct, the chunk exists on the source and
 * does not yet exist on the destination.
 *
 * On success the chunk and server objects live in a dedicated memory
 * context (cc->mcxt) so they survive the per-stage transactions, and the
 * starting transaction is committed via SPI so each stage can run in its
 * own transaction. Errors are reported with ereport(ERROR).
 */
static void
chunk_copy_setup(ChunkCopy *cc, Oid chunk_relid, const char *src_node, const char *dst_node,
				 bool delete_on_src_node)
{
	Hypertable *ht;
	Cache *hcache;
	MemoryContext old, mcxt;

	if (!superuser())
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 (errmsg("must be superuser to copy/move chunk to data node"))));

	if (dist_util_membership() != DIST_MEMBER_ACCESS_NODE)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("function must be run on the access node only")));

	/*
	 * The chunk and foreign server info needs to be on a memory context
	 * that will survive moving to a new transaction for each stage
	 */
	mcxt = AllocSetContextCreate(PortalContext, "chunk move activity", ALLOCSET_DEFAULT_SIZES);
	old = MemoryContextSwitchTo(mcxt);
	cc->mcxt = mcxt;
	cc->chunk = ts_chunk_get_by_relid(chunk_relid, true);
	cc->stage = NULL;

	/* It has to be a foreign table chunk */
	if (cc->chunk->relkind != RELKIND_FOREIGN_TABLE)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("\"%s\" is not a valid remote chunk", get_rel_name(chunk_relid))));

	/* It has to be an uncompressed chunk, we query the status field on the AN for this */
	if (ts_chunk_is_compressed(cc->chunk))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("\"%s\" is a compressed remote chunk. Chunk copy/move not supported"
						" currently on compressed chunks",
						get_rel_name(chunk_relid))));

	ht = ts_hypertable_cache_get_cache_and_entry(cc->chunk->hypertable_relid,
												 CACHE_FLAG_NONE,
												 &hcache);

	ts_hypertable_permissions_check(ht->main_table_relid, GetUserId());

	if (!hypertable_is_distributed(ht))
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("hypertable \"%s\" is not distributed",
						get_rel_name(ht->main_table_relid))));

	/* Both lookups error out on a missing server (missing_ok = false) */
	cc->src_server = data_node_get_foreign_server(src_node, ACL_USAGE, true, false);
	Assert(NULL != cc->src_server);

	cc->dst_server = data_node_get_foreign_server(dst_node, ACL_USAGE, true, false);
	Assert(NULL != cc->dst_server);

	/* Ensure that source and destination data nodes are not the same */
	if (cc->src_server == cc->dst_server)
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("source and destination data node match")));

	/* Check that src_node is a valid DN and that chunk exists on it */
	if (!ts_chunk_has_data_node(cc->chunk, src_node))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("chunk \"%s\" does not exist on source data node \"%s\"",
						get_rel_name(chunk_relid),
						src_node)));

	/* Check that dst_node is a valid DN and that chunk does not exist on it */
	if (ts_chunk_has_data_node(cc->chunk, dst_node))
		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("chunk \"%s\" already exists on destination data node \"%s\"",
						get_rel_name(chunk_relid),
						dst_node)));

	/*
	 * Populate the FormData_chunk_copy_operation structure for use by various stages
	 *
	 * The operation_id will be populated in the chunk_copy_stage_init function.
	 */
	cc->fd.backend_pid = MyProcPid;
	namestrcpy(&cc->fd.completed_stage, CCS_INIT);
	cc->fd.time_start = GetCurrentTimestamp();
	cc->fd.chunk_id = cc->chunk->fd.id;
	namestrcpy(&cc->fd.source_node_name, src_node);
	namestrcpy(&cc->fd.dest_node_name, dst_node);
	cc->fd.delete_on_src_node = delete_on_src_node;

	ts_cache_release(hcache);
	MemoryContextSwitchTo(old);

	/* Commit to get out of starting transaction. This will also pop active
	 * snapshots. */
	SPI_commit();
}
357
/*
 * Tear down after a completed copy/move (or cleanup): release the long-lived
 * memory context and open a new transaction so the caller returns inside a
 * valid transaction, matching the SPI_commit() done in chunk_copy_setup().
 */
static void
chunk_copy_finish(ChunkCopy *cc)
{
	/* Done using this long lived memory context */
	MemoryContextDelete(cc->mcxt);

	/* Start a transaction for the final outer transaction */
	SPI_start_transaction();
}
367
368 static void
chunk_copy_stage_init(ChunkCopy * cc)369 chunk_copy_stage_init(ChunkCopy *cc)
370 {
371 int32 id;
372
373 /*
374 * Get the operation id for this chunk move/copy activity. The naming
375 * convention is "ts_copy_seq-id_chunk-id".
376 */
377 id = ts_catalog_table_next_seq_id(ts_catalog_get(), CHUNK_COPY_OPERATION);
378 snprintf(cc->fd.operation_id.data,
379 sizeof(cc->fd.operation_id.data),
380 "ts_copy_%d_%d",
381 id,
382 cc->chunk->fd.id);
383
384 /* Persist the Formdata entry in the catalog */
385 chunk_copy_operation_insert(&cc->fd);
386 }
387
/*
 * Cleanup for the "init" stage: remove the tracking row, since nothing else
 * has been created yet for this operation.
 */
static void
chunk_copy_stage_init_cleanup(ChunkCopy *cc)
{
	/* Failure in initial stages, delete this entry from the catalog */
	chunk_copy_operation_delete_by_id(NameStr(cc->fd.operation_id));
}
394
395 static void
chunk_copy_stage_create_empty_chunk(ChunkCopy * cc)396 chunk_copy_stage_create_empty_chunk(ChunkCopy *cc)
397 {
398 /* Create an empty chunk table on the dst_node */
399 Cache *hcache;
400 Hypertable *ht;
401
402 ht = ts_hypertable_cache_get_cache_and_entry(cc->chunk->hypertable_relid,
403 CACHE_FLAG_NONE,
404 &hcache);
405
406 chunk_api_call_create_empty_chunk_table(ht, cc->chunk, NameStr(cc->fd.dest_node_name));
407
408 ts_cache_release(hcache);
409 }
410
/*
 * Cleanup for "create_empty_chunk": remove the (still empty) chunk table
 * from the destination data node.
 */
static void
chunk_copy_stage_create_empty_chunk_cleanup(ChunkCopy *cc)
{
	/*
	 * Drop the chunk table on the dst_node. We use the API instead of just
	 * "DROP TABLE" because some metadata cleanup might also be needed
	 */
	chunk_api_call_chunk_drop_replica(cc->chunk,
									  NameStr(cc->fd.dest_node_name),
									  cc->dst_server->serverid);
}
422
423 static void
chunk_copy_stage_create_publication(ChunkCopy * cc)424 chunk_copy_stage_create_publication(ChunkCopy *cc)
425 {
426 const char *cmd;
427
428 /* Create publication on the source data node */
429 cmd = psprintf("CREATE PUBLICATION %s FOR TABLE %s",
430 NameStr(cc->fd.operation_id),
431 quote_qualified_identifier(NameStr(cc->chunk->fd.schema_name),
432 NameStr(cc->chunk->fd.table_name)));
433
434 /* Create the publication */
435 ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
436 }
437
438 static void
chunk_copy_stage_create_replication_slot(ChunkCopy * cc)439 chunk_copy_stage_create_replication_slot(ChunkCopy *cc)
440 {
441 const char *cmd;
442
443 /*
444 * CREATE SUBSCRIPTION from a database within the same database cluster will hang,
445 * create the replication slot separately before creating the subscription
446 */
447 cmd = psprintf("SELECT pg_create_logical_replication_slot('%s', 'pgoutput')",
448 NameStr(cc->fd.operation_id));
449
450 ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
451 }
452
/*
 * Cleanup for "create_replication_slot": drop the slot on the source data
 * node, but only if it actually exists — pg_drop_replication_slot() errors
 * out on a missing slot, so existence is checked first.
 */
static void
chunk_copy_stage_create_replication_slot_cleanup(ChunkCopy *cc)
{
	char *cmd;
	DistCmdResult *dist_res;
	PGresult *res;

	/* Check if the slot exists on the source data node */
	cmd = psprintf("SELECT 1 FROM pg_catalog.pg_replication_slots WHERE slot_name = '%s'",
				   NameStr(cc->fd.operation_id));
	dist_res =
		ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
	res = ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.source_node_name));

	if (PQresultStatus(res) != PGRES_TUPLES_OK)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));

	/* Drop replication slot on the source data node only if it exists */
	if (PQntuples(res) != 0)
	{
		cmd = psprintf("SELECT pg_drop_replication_slot('%s')", NameStr(cc->fd.operation_id));
		ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
	}

	/* Releases "res" as well */
	ts_dist_cmd_close_response(dist_res);
}
480
/*
 * Cleanup for "create_publication": drop the publication on the source data
 * node if it exists. Also invokes the replication-slot cleanup first, since
 * a failed create_replication_slot stage can leave a stray slot behind.
 */
static void
chunk_copy_stage_create_publication_cleanup(ChunkCopy *cc)
{
	char *cmd;
	DistCmdResult *dist_res;
	PGresult *res;

	/*
	 * Check if the replication slot exists and clean it up if so. This might
	 * happen if there's a failure in the create_replication_slot stage but
	 * PG might end up creating the slot even though we issued a ROLLBACK
	 */
	chunk_copy_stage_create_replication_slot_cleanup(cc);

	/* Check if the publication exists on the source data node */
	cmd = psprintf("SELECT 1 FROM pg_catalog.pg_publication WHERE pubname = '%s'",
				   NameStr(cc->fd.operation_id));
	dist_res =
		ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
	res = ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.source_node_name));

	if (PQresultStatus(res) != PGRES_TUPLES_OK)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));

	/* Drop publication on the source node only if it exists */
	if (PQntuples(res) != 0)
	{
		cmd = psprintf("DROP PUBLICATION %s", NameStr(cc->fd.operation_id));

		/* Drop the publication */
		ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
	}

	ts_dist_cmd_close_response(dist_res);
}
517
518 static void
chunk_copy_stage_create_subscription(ChunkCopy * cc)519 chunk_copy_stage_create_subscription(ChunkCopy *cc)
520 {
521 const char *cmd;
522 const char *connection_string;
523
524 /* Prepare connection string to the source node */
525 connection_string = remote_connection_get_connstr(NameStr(cc->fd.source_node_name));
526
527 cmd = psprintf("CREATE SUBSCRIPTION %s CONNECTION '%s' PUBLICATION %s"
528 " WITH (create_slot = false, enabled = false)",
529 NameStr(cc->fd.operation_id),
530 connection_string,
531 NameStr(cc->fd.operation_id));
532 ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
533 }
534
/*
 * Cleanup for "create_subscription": drop the subscription on the
 * destination data node if it exists. The subscription is detached from its
 * replication slot first so DROP SUBSCRIPTION does not try to drop the slot
 * on the source node (the slot is cleaned up separately).
 */
static void
chunk_copy_stage_create_subscription_cleanup(ChunkCopy *cc)
{
	char *cmd;
	DistCmdResult *dist_res;
	PGresult *res;

	/* Check if the subscription exists on the destination data node */
	cmd = psprintf("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = '%s'",
				   NameStr(cc->fd.operation_id));
	dist_res =
		ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
	res = ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.dest_node_name));

	if (PQresultStatus(res) != PGRES_TUPLES_OK)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));

	/* Cleanup only if the subscription exists */
	if (PQntuples(res) != 0)
	{
		List *nodes = list_make1(NameStr(cc->fd.dest_node_name));

		/* Disassociate the subscription from the replication slot first */
		cmd =
			psprintf("ALTER SUBSCRIPTION %s SET (slot_name = NONE)", NameStr(cc->fd.operation_id));
		ts_dist_cmd_run_on_data_nodes(cmd, nodes, true);

		/* Drop the subscription now */
		pfree(cmd);
		cmd = psprintf("DROP SUBSCRIPTION %s", NameStr(cc->fd.operation_id));
		ts_dist_cmd_run_on_data_nodes(cmd, nodes, true);
	}

	ts_dist_cmd_close_response(dist_res);
}
571
572 static void
chunk_copy_stage_sync_start(ChunkCopy * cc)573 chunk_copy_stage_sync_start(ChunkCopy *cc)
574 {
575 const char *cmd;
576
577 /* Start data transfer on the destination node */
578 cmd = psprintf("ALTER SUBSCRIPTION %s ENABLE", NameStr(cc->fd.operation_id));
579 ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
580 }
581
/*
 * Cleanup for "sync_start": disable the subscription on the destination data
 * node (if it exists) so that later cleanup steps can drop the subscription
 * and publication cleanly.
 */
static void
chunk_copy_stage_sync_start_cleanup(ChunkCopy *cc)
{
	char *cmd;
	DistCmdResult *dist_res;
	PGresult *res;

	/* Check if the subscription exists on the destination data node */
	cmd = psprintf("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = '%s'",
				   NameStr(cc->fd.operation_id));
	dist_res =
		ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
	res = ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.dest_node_name));

	if (PQresultStatus(res) != PGRES_TUPLES_OK)
		ereport(ERROR,
				(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));

	/* Alter subscription only if it exists */
	if (PQntuples(res) != 0)
	{
		/* Stop data transfer on the destination node */
		cmd = psprintf("ALTER SUBSCRIPTION %s DISABLE", NameStr(cc->fd.operation_id));
		ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
	}

	ts_dist_cmd_close_response(dist_res);
}
610
/*
 * Stage "sync": block until the subscription on the destination data node
 * has finished copying the chunk's data.
 */
static void
chunk_copy_stage_sync(ChunkCopy *cc)
{
	char *cmd;

	/*
	 * Transaction blocks run in REPEATABLE READ mode in the connection pool.
	 * However this wait_subscription_sync procedure needs to refresh the subscription
	 * sync status data and hence needs a READ COMMITTED transaction isolation
	 * level for that.
	 */
	cmd = psprintf("SET transaction_isolation TO 'READ COMMITTED'");
	ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
	pfree(cmd);

	/* Wait until data transfer finishes in its own transaction */
	cmd = psprintf("CALL _timescaledb_internal.wait_subscription_sync(%s, %s)",
				   quote_literal_cstr(NameStr(cc->chunk->fd.schema_name)),
				   quote_literal_cstr(NameStr(cc->chunk->fd.table_name)));

	ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
	pfree(cmd);
}
634
635 static void
chunk_copy_stage_drop_subscription(ChunkCopy * cc)636 chunk_copy_stage_drop_subscription(ChunkCopy *cc)
637 {
638 char *cmd;
639
640 /* Stop data transfer on the destination node */
641 cmd = psprintf("ALTER SUBSCRIPTION %s DISABLE", NameStr(cc->fd.operation_id));
642 ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
643 pfree(cmd);
644
645 /* Disassociate the subscription from the replication slot first */
646 cmd = psprintf("ALTER SUBSCRIPTION %s SET (slot_name = NONE)", NameStr(cc->fd.operation_id));
647 ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
648 pfree(cmd);
649
650 /* Drop the subscription now */
651 cmd = psprintf("DROP SUBSCRIPTION %s", NameStr(cc->fd.operation_id));
652 ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
653 pfree(cmd);
654 }
655
656 static void
chunk_copy_stage_drop_publication(ChunkCopy * cc)657 chunk_copy_stage_drop_publication(ChunkCopy *cc)
658 {
659 char *cmd;
660
661 cmd = psprintf("SELECT pg_drop_replication_slot('%s')", NameStr(cc->fd.operation_id));
662 ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
663
664 cmd = psprintf("DROP PUBLICATION %s", NameStr(cc->fd.operation_id));
665 ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
666 }
667
/*
 * Stage "attach_chunk": register the (now fully synced) chunk table on the
 * destination data node as a replica of the chunk, both on the remote node
 * and in the access node's metadata.
 */
static void
chunk_copy_stage_attach_chunk(ChunkCopy *cc)
{
	Cache *hcache;
	Hypertable *ht;
	ChunkDataNode *chunk_data_node;
	const char *remote_chunk_name;
	Chunk *chunk = cc->chunk;

	ht = ts_hypertable_cache_get_cache_and_entry(chunk->hypertable_relid, CACHE_FLAG_NONE, &hcache);

	/* Check that the hypertable is already attached to this data node */
	data_node_hypertable_get_by_node_name(ht, cc->dst_server->servername, true);

	chunk_data_node = palloc0(sizeof(ChunkDataNode));

	chunk_data_node->fd.chunk_id = chunk->fd.id;
	chunk_data_node->fd.node_chunk_id = -1; /* below API will fill it up */
	namestrcpy(&chunk_data_node->fd.node_name, cc->dst_server->servername);
	chunk_data_node->foreign_server_oid = cc->dst_server->serverid;

	remote_chunk_name = psprintf("%s.%s",
								 quote_identifier(chunk->fd.schema_name.data),
								 quote_identifier(chunk->fd.table_name.data));

	/* Register the existing remote table as a chunk on the data node side */
	chunk_api_create_on_data_nodes(chunk, ht, remote_chunk_name, list_make1(chunk_data_node));

	/* All ok, update the AN chunk metadata to add this data node to it */
	chunk->data_nodes = lappend(chunk->data_nodes, chunk_data_node);

	/* persist this association in the metadata */
	ts_chunk_data_node_insert(chunk_data_node);

	ts_cache_release(hcache);
}
703
704 static void
chunk_copy_stage_delete_chunk(ChunkCopy * cc)705 chunk_copy_stage_delete_chunk(ChunkCopy *cc)
706 {
707 if (!cc->fd.delete_on_src_node)
708 return;
709
710 chunk_api_call_chunk_drop_replica(cc->chunk,
711 NameStr(cc->fd.source_node_name),
712 cc->src_server->serverid);
713 }
714
/*
 * Ordered list of stages executed by chunk_copy_execute(); terminated by a
 * NULL marker. chunk_copy_cleanup() walks this table backwards from the last
 * completed stage, invoking each stage's cleanup callback.
 */
static const ChunkCopyStage chunk_copy_stages[] = {
	/* Initial Marker */
	{ CCS_INIT, chunk_copy_stage_init, chunk_copy_stage_init_cleanup },

	/*
	 * Create empty chunk table on the dst node.
	 * The corresponding cleanup function should just delete this empty chunk.
	 */
	{ CCS_CREATE_EMPTY_CHUNK,
	  chunk_copy_stage_create_empty_chunk,
	  chunk_copy_stage_create_empty_chunk_cleanup },

	/*
	 * Setup logical replication between nodes.
	 * The corresponding cleanup functions should drop the subscription and
	 * remove the replication slot followed by dropping of the publication on
	 * the source data node.
	 */
	{ CCS_CREATE_PUBLICATION,
	  chunk_copy_stage_create_publication,
	  chunk_copy_stage_create_publication_cleanup },
	{ CCS_CREATE_REPLICATION_SLOT,
	  chunk_copy_stage_create_replication_slot,
	  chunk_copy_stage_create_replication_slot_cleanup },
	{ CCS_CREATE_SUBSCRIPTION,
	  chunk_copy_stage_create_subscription,
	  chunk_copy_stage_create_subscription_cleanup },

	/*
	 * Begin data transfer and wait for completion.
	 * The corresponding cleanup function should just disable the subscription so
	 * that earlier steps above can drop the subscription/publication cleanly.
	 */
	{ CCS_SYNC_START, chunk_copy_stage_sync_start, chunk_copy_stage_sync_start_cleanup },
	{ CCS_SYNC, chunk_copy_stage_sync, NULL },

	/*
	 * Cleanup. Nothing else required via the cleanup functions.
	 */
	{ CCS_DROP_SUBSCRIPTION, chunk_copy_stage_drop_subscription, NULL },
	{ CCS_DROP_PUBLICATION, chunk_copy_stage_drop_publication, NULL },

	/*
	 * Attach chunk to the hypertable on the dst_node.
	 * The operation has succeeded from the destination data node perspective.
	 * No cleanup required here.
	 */
	{ CCS_ATTACH_CHUNK, chunk_copy_stage_attach_chunk, NULL },

	/*
	 * Maybe delete chunk from the src_node (move operation).
	 * Again, everything ok, so no cleanup required, we probably shouldn't be
	 * seeing this entry in the catalog table because the operation has succeeded.
	 */
	{ CCS_DELETE_CHUNK, chunk_copy_stage_delete_chunk, NULL },

	/* Done Marker */
	{ NULL, NULL, NULL }
};
774
/*
 * Run every stage of the copy/move in order, each inside its own
 * transaction, persisting progress in the catalog after each stage so a
 * failure can be rolled back later via chunk_copy_cleanup().
 */
static void
chunk_copy_execute(ChunkCopy *cc)
{
	const ChunkCopyStage *stage;

	/*
	 * Execute each copy stage in a separate transaction. The below will employ
	 * 2PC by default. This can be later optimized to use 1PC since only one
	 * datanode is involved in most of the stages.
	 */
	for (stage = &chunk_copy_stages[0]; stage->name != NULL; stage++)
	{
		SPI_start_transaction();

		cc->stage = stage;
		cc->stage->function(cc);

		/* Mark current stage as completed and update the catalog */
		chunk_copy_operation_update(cc);

		/* Hook for tests to inject a failure after a specific stage */
		DEBUG_ERROR_INJECTION(stage->name);

		SPI_commit();
	}
}
800
/*
 * Public entry point: copy (or move, when delete_on_src_node is true) the
 * chunk identified by chunk_relid from src_node to dst_node.
 *
 * Must be called as superuser on the access node, from within an SPI
 * context, since the stages commit and start transactions via SPI. On
 * failure the error is re-thrown with the operation id attached so the user
 * can invoke the cleanup function for it.
 */
void
chunk_copy(Oid chunk_relid, const char *src_node, const char *dst_node, bool delete_on_src_node)
{
	ChunkCopy cc;
	/* Remember the caller's context for error reporting from PG_CATCH */
	const MemoryContext oldcontext = CurrentMemoryContext;

	/* Populate copy structure */
	chunk_copy_setup(&cc, chunk_relid, src_node, dst_node, delete_on_src_node);

	/* Execute chunk copy in separate stages */
	PG_TRY();
	{
		chunk_copy_execute(&cc);
	}
	PG_CATCH();
	{
		/* Include chunk copy id to the error message */
		ErrorData *edata;
		MemoryContextSwitchTo(oldcontext);
		edata = CopyErrorData();
		edata->detail = psprintf("Chunk copy operation id: %s.", NameStr(cc.fd.operation_id));
		FlushErrorState();
		ReThrowError(edata);
	}
	PG_END_TRY();

	/* Finish up and delete the catalog entry */
	chunk_copy_finish(&cc);
}
830
/*
 * Scanner callback: materialize the matched catalog row into a freshly
 * allocated ChunkCopy (only the embedded form data is filled in here) and
 * hand it back through the ChunkCopy** passed as "data".
 */
static ScanTupleResult
chunk_copy_operation_tuple_found(TupleInfo *ti, void *const data)
{
	ChunkCopy **cc = data;

	/* Allocated in ti->mctx, which the caller set up as the long-lived context */
	*cc = STRUCT_FROM_SLOT(ti->slot, ti->mctx, ChunkCopy, FormData_chunk_copy_operation);
	return SCAN_CONTINUE;
}
839
/*
 * Load an existing chunk copy operation from the catalog by its operation
 * id, rebuilding the full ChunkCopy state (chunk and foreign servers) in a
 * long-lived memory context. Returns NULL when no such operation exists;
 * errors out when operation_id is NULL.
 */
static ChunkCopy *
chunk_copy_operation_get(const char *operation_id)
{
	ScanKeyData scankeys[1];
	ChunkCopy *cc = NULL;
	int indexid;
	MemoryContext old, mcxt;

	/* Objects need to be in long lived context */
	mcxt =
		AllocSetContextCreate(PortalContext, "chunk copy cleanup activity", ALLOCSET_DEFAULT_SIZES);
	old = MemoryContextSwitchTo(mcxt);

	if (operation_id != NULL)
	{
		ScanKeyInit(&scankeys[0],
					Anum_chunk_copy_operation_idx_operation_id,
					BTEqualStrategyNumber,
					F_NAMEEQ,
					CStringGetDatum(operation_id));
		indexid = CHUNK_COPY_OPERATION_PKEY_IDX;
	}
	else
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid chunk copy operation identifier")));

	ts_catalog_scan_one(CHUNK_COPY_OPERATION,
						indexid,
						scankeys,
						1,
						chunk_copy_operation_tuple_found,
						AccessShareLock,
						CHUNK_COPY_OPERATION_TABLE_NAME,
						&cc);

	/*
	 * If a valid entry is returned then fill up the rest of the fields in the
	 * ChunkCopy structure
	 */
	if (cc)
	{
		cc->mcxt = mcxt;
		cc->chunk = ts_chunk_get_by_id(cc->fd.chunk_id, true);
		cc->stage = NULL;

		/* No other sanity checks need to be performed since they were done earlier */

		/* Setup the src_node */
		cc->src_server =
			data_node_get_foreign_server(NameStr(cc->fd.source_node_name), ACL_USAGE, true, false);
		Assert(NULL != cc->src_server);

		/* Setup the dst_node */
		cc->dst_server =
			data_node_get_foreign_server(NameStr(cc->fd.dest_node_name), ACL_USAGE, true, false);
		Assert(NULL != cc->dst_server);
	}

	MemoryContextSwitchTo(old);

	if (cc == NULL)
		/* No entry found, long lived context not required */
		MemoryContextDelete(mcxt);

	return cc;
}
907
/*
 * Walk the stage table backwards from stage_idx down to 0, running each
 * stage's cleanup callback in its own transaction. After each cleanup the
 * catalog row is updated to record progress — except for the very first
 * iteration (that stage's cleanup was just run, there is no later stage to
 * record) and for stage 0 ("init"), whose cleanup deletes the row itself.
 */
static void
chunk_copy_cleanup_internal(ChunkCopy *cc, int stage_idx)
{
	bool first = true;

	/* Cleanup each copy stage in a separate transaction */
	do
	{
		SPI_start_transaction();

		cc->stage = &chunk_copy_stages[stage_idx];
		if (cc->stage->function_cleanup)
			cc->stage->function_cleanup(cc);

		/* Mark stage as cleaned up and update the catalog */
		if (!first && stage_idx != 0)
			chunk_copy_operation_update(cc);
		else
			first = false;

		SPI_commit();
	} while (--stage_idx >= 0);
}
931
/*
 * Public entry point: roll back a failed chunk copy/move identified by its
 * operation id. Looks up the operation in the catalog, finds the last
 * completed stage, and runs the cleanup callbacks in reverse order.
 *
 * Must be called as superuser on the access node, from within an SPI
 * context (the per-stage cleanups commit and start transactions via SPI).
 */
void
chunk_copy_cleanup(const char *operation_id)
{
	ChunkCopy *cc;
	/* Remember the caller's context for error reporting from PG_CATCH */
	const MemoryContext oldcontext = CurrentMemoryContext;
	const ChunkCopyStage *stage;
	bool found = false;
	int stage_idx;

	if (!superuser())
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 (errmsg("must be superuser to cleanup a chunk copy operation"))));

	if (dist_util_membership() != DIST_MEMBER_ACCESS_NODE)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("function must be run on the access node only")));

	cc = chunk_copy_operation_get(operation_id);

	if (cc == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid chunk copy operation identifier. Entry not found")));

	/* Identify the last completed stage for this activity. */
	stage_idx = 0;
	for (stage = &chunk_copy_stages[stage_idx]; stage->name != NULL;
		 stage = &chunk_copy_stages[++stage_idx])
	{
		if (namestrcmp(&cc->fd.completed_stage, stage->name) == 0)
		{
			found = true;
			break;
		}
	}

	/* should always find an entry, add ereport to quell compiler warning */
	Assert(found == true);
	if (!found)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("stage '%s' not found for copy chunk cleanup",
						NameStr(cc->fd.completed_stage))));

	/* Commit to get out of starting transaction, this will also pop active
	 * snapshots. */
	SPI_commit();

	/* Run the corresponding cleanup steps to roll back the activity. */
	PG_TRY();
	{
		chunk_copy_cleanup_internal(cc, stage_idx);
	}
	PG_CATCH();
	{
		/* Include chunk copy id to the error message */
		ErrorData *edata;
		MemoryContextSwitchTo(oldcontext);
		edata = CopyErrorData();
		edata->detail = psprintf("While cleaning up chunk copy operation id: %s.",
								 NameStr(cc->fd.operation_id));
		FlushErrorState();
		ReThrowError(edata);
	}
	PG_END_TRY();

	/* Finish up and delete the catalog entry */
	chunk_copy_finish(cc);
}
1003