1 /*------------------------------------------------------------------------- 2 * tablesync.c 3 * PostgreSQL logical replication 4 * 5 * Copyright (c) 2012-2020, PostgreSQL Global Development Group 6 * 7 * IDENTIFICATION 8 * src/backend/replication/logical/tablesync.c 9 * 10 * NOTES 11 * This file contains code for initial table data synchronization for 12 * logical replication. 13 * 14 * The initial data synchronization is done separately for each table, 15 * in a separate apply worker that only fetches the initial snapshot data 16 * from the publisher and then synchronizes the position in the stream with 17 * the main apply worker. 18 * 19 * There are several reasons for doing the synchronization this way: 20 * - It allows us to parallelize the initial data synchronization 21 * which lowers the time needed for it to happen. 22 * - The initial synchronization does not have to hold the xid and LSN 23 * for the time it takes to copy data of all tables, causing less 24 * bloat and lower disk consumption compared to doing the 25 * synchronization in a single process for the whole database. 26 * - It allows us to synchronize any tables added after the initial 27 * synchronization has finished. 28 * 29 * The stream position synchronization works in multiple steps. 30 * - Sync finishes copy and sets worker state as SYNCWAIT and waits for 31 * state to change in a loop. 32 * - Apply periodically checks tables that are synchronizing for SYNCWAIT. 33 * When the desired state appears, it will set the worker state to 34 * CATCHUP and starts loop-waiting until either the table state is set 35 * to SYNCDONE or the sync worker exits. 36 * - After the sync worker has seen the state change to CATCHUP, it will 37 * read the stream and apply changes (acting like an apply worker) until 38 * it catches up to the specified stream position. Then it sets the 39 * state to SYNCDONE. 
There might be zero changes applied between 40 * CATCHUP and SYNCDONE, because the sync worker might be ahead of the 41 * apply worker. 42 * - Once the state was set to SYNCDONE, the apply will continue tracking 43 * the table until it reaches the SYNCDONE stream position, at which 44 * point it sets state to READY and stops tracking. Again, there might 45 * be zero changes in between. 46 * 47 * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP -> 48 * SYNCDONE -> READY. 49 * 50 * The catalog pg_subscription_rel is used to keep information about 51 * subscribed tables and their state. Some transient state during data 52 * synchronization is kept in shared memory. The states SYNCWAIT and 53 * CATCHUP only appear in memory. 54 * 55 * Example flows look like this: 56 * - Apply is in front: 57 * sync:8 58 * -> set in memory SYNCWAIT 59 * apply:10 60 * -> set in memory CATCHUP 61 * -> enter wait-loop 62 * sync:10 63 * -> set in catalog SYNCDONE 64 * -> exit 65 * apply:10 66 * -> exit wait-loop 67 * -> continue rep 68 * apply:11 69 * -> set in catalog READY 70 * - Sync in front: 71 * sync:10 72 * -> set in memory SYNCWAIT 73 * apply:8 74 * -> set in memory CATCHUP 75 * -> continue per-table filtering 76 * sync:10 77 * -> set in catalog SYNCDONE 78 * -> exit 79 * apply:10 80 * -> set in catalog READY 81 * -> stop per-table filtering 82 * -> continue rep 83 *------------------------------------------------------------------------- 84 */ 85 86 #include "postgres.h" 87 88 #include "access/table.h" 89 #include "access/xact.h" 90 #include "catalog/pg_subscription_rel.h" 91 #include "catalog/pg_type.h" 92 #include "commands/copy.h" 93 #include "miscadmin.h" 94 #include "parser/parse_relation.h" 95 #include "pgstat.h" 96 #include "replication/logicallauncher.h" 97 #include "replication/logicalrelation.h" 98 #include "replication/walreceiver.h" 99 #include "replication/worker_internal.h" 100 #include "storage/ipc.h" 101 #include "utils/builtins.h" 102 
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"

/* Set false by syscache invalidation; forces a refresh of cached states. */
static bool table_states_valid = false;

/* Buffer shared between copy_read_data() calls during a table copy. */
StringInfo	copybuf = NULL;

/*
 * Exit routine for synchronization worker.
 *
 * Commits any open transaction, flushes WAL, logs completion, wakes the
 * main apply worker, and exits the process.  Never returns.
 */
static void
pg_attribute_noreturn()
finish_sync_worker(void)
{
	/*
	 * Commit any outstanding transaction. This is the usual case, unless
	 * there was nothing to do for the table.
	 */
	if (IsTransactionState())
	{
		CommitTransactionCommand();
		pgstat_report_stat(false);
	}

	/* And flush all writes. */
	XLogFlush(GetXLogWriteRecPtr());

	StartTransactionCommand();
	ereport(LOG,
			(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
					MySubscription->name,
					get_rel_name(MyLogicalRepWorker->relid))));
	CommitTransactionCommand();

	/* Find the main apply worker and signal it. */
	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);

	/* Stop gracefully */
	proc_exit(0);
}

/*
 * Wait until the relation synchronization state is set in the catalog to the
 * expected one.
 *
 * Used when transitioning from CATCHUP state to SYNCDONE.
 *
 * Returns false if the synchronization worker has disappeared or the table
 * state has been reset.
 */
static bool
wait_for_relation_state_change(Oid relid, char expected_state)
{
	char		state;

	for (;;)
	{
		LogicalRepWorker *worker;
		XLogRecPtr	statelsn;

		CHECK_FOR_INTERRUPTS();

		/* XXX use cache invalidation here to improve performance? */
		PushActiveSnapshot(GetLatestSnapshot());
		state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
										relid, &statelsn, true);
		PopActiveSnapshot();

		if (state == SUBREL_STATE_UNKNOWN)
			return false;

		if (state == expected_state)
			return true;

		/* Check if the opposite worker is still running and bail if not. */
		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
										am_tablesync_worker() ? InvalidOid : relid,
										false);
		LWLockRelease(LogicalRepWorkerLock);
		if (!worker)
			return false;

		/* Poll with a timeout so a dead peer can't leave us stuck forever. */
		(void) WaitLatch(MyLatch,
						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
						 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);

		ResetLatch(MyLatch);
	}

	return false;				/* not reached */
}

/*
 * Wait until the apply worker changes the state of our synchronization
 * worker to the expected one.
 *
 * Used when transitioning from SYNCWAIT state to CATCHUP.
 *
 * Returns false if the apply worker has disappeared.
 */
static bool
wait_for_worker_state_change(char expected_state)
{
	int			rc;

	for (;;)
	{
		LogicalRepWorker *worker;

		CHECK_FOR_INTERRUPTS();

		/*
		 * Done if already in correct state. (We assume this fetch is atomic
		 * enough to not give a misleading answer if we do it with no lock.)
		 */
		if (MyLogicalRepWorker->relstate == expected_state)
			return true;

		/*
		 * Bail out if the apply worker has died, else signal it we're
		 * waiting.
		 */
		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
										InvalidOid, false);
		if (worker && worker->proc)
			logicalrep_worker_wakeup_ptr(worker);
		LWLockRelease(LogicalRepWorkerLock);
		if (!worker)
			break;

		/*
		 * Wait.  We expect to get a latch signal back from the apply worker,
		 * but use a timeout in case it dies without sending one.
		 */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
					   1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);

		if (rc & WL_LATCH_SET)
			ResetLatch(MyLatch);
	}

	return false;
}
We expect to get a latch signal back from the apply worker, 239 * but use a timeout in case it dies without sending one. 240 */ 241 rc = WaitLatch(MyLatch, 242 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, 243 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE); 244 245 if (rc & WL_LATCH_SET) 246 ResetLatch(MyLatch); 247 } 248 249 return false; 250 } 251 252 /* 253 * Callback from syscache invalidation. 254 */ 255 void 256 invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) 257 { 258 table_states_valid = false; 259 } 260 261 /* 262 * Handle table synchronization cooperation from the synchronization 263 * worker. 264 * 265 * If the sync worker is in CATCHUP state and reached (or passed) the 266 * predetermined synchronization point in the WAL stream, mark the table as 267 * SYNCDONE and finish. 268 */ 269 static void 270 process_syncing_tables_for_sync(XLogRecPtr current_lsn) 271 { 272 Assert(IsTransactionState()); 273 274 SpinLockAcquire(&MyLogicalRepWorker->relmutex); 275 276 if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP && 277 current_lsn >= MyLogicalRepWorker->relstate_lsn) 278 { 279 TimeLineID tli; 280 281 MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; 282 MyLogicalRepWorker->relstate_lsn = current_lsn; 283 284 SpinLockRelease(&MyLogicalRepWorker->relmutex); 285 286 UpdateSubscriptionRelState(MyLogicalRepWorker->subid, 287 MyLogicalRepWorker->relid, 288 MyLogicalRepWorker->relstate, 289 MyLogicalRepWorker->relstate_lsn); 290 291 walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); 292 finish_sync_worker(); 293 } 294 else 295 SpinLockRelease(&MyLogicalRepWorker->relmutex); 296 } 297 298 /* 299 * Handle table synchronization cooperation from the apply worker. 300 * 301 * Walk over all subscription tables that are individually tracked by the 302 * apply process (currently, all that have state other than 303 * SUBREL_STATE_READY) and manage synchronization for them. 
304 * 305 * If there are tables that need synchronizing and are not being synchronized 306 * yet, start sync workers for them (if there are free slots for sync 307 * workers). To prevent starting the sync worker for the same relation at a 308 * high frequency after a failure, we store its last start time with each sync 309 * state info. We start the sync worker for the same relation after waiting 310 * at least wal_retrieve_retry_interval. 311 * 312 * For tables that are being synchronized already, check if sync workers 313 * either need action from the apply worker or have finished. This is the 314 * SYNCWAIT to CATCHUP transition. 315 * 316 * If the synchronization position is reached (SYNCDONE), then the table can 317 * be marked as READY and is no longer tracked. 318 */ 319 static void 320 process_syncing_tables_for_apply(XLogRecPtr current_lsn) 321 { 322 struct tablesync_start_time_mapping 323 { 324 Oid relid; 325 TimestampTz last_start_time; 326 }; 327 static List *table_states = NIL; 328 static HTAB *last_start_times = NULL; 329 ListCell *lc; 330 bool started_tx = false; 331 332 Assert(!IsTransactionState()); 333 334 /* We need up-to-date sync state info for subscription tables here. */ 335 if (!table_states_valid) 336 { 337 MemoryContext oldctx; 338 List *rstates; 339 ListCell *lc; 340 SubscriptionRelState *rstate; 341 342 /* Clean the old list. */ 343 list_free_deep(table_states); 344 table_states = NIL; 345 346 StartTransactionCommand(); 347 started_tx = true; 348 349 /* Fetch all non-ready tables. */ 350 rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); 351 352 /* Allocate the tracking info in a permanent memory context. 
*/ 353 oldctx = MemoryContextSwitchTo(CacheMemoryContext); 354 foreach(lc, rstates) 355 { 356 rstate = palloc(sizeof(SubscriptionRelState)); 357 memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); 358 table_states = lappend(table_states, rstate); 359 } 360 MemoryContextSwitchTo(oldctx); 361 362 table_states_valid = true; 363 } 364 365 /* 366 * Prepare a hash table for tracking last start times of workers, to avoid 367 * immediate restarts. We don't need it if there are no tables that need 368 * syncing. 369 */ 370 if (table_states && !last_start_times) 371 { 372 HASHCTL ctl; 373 374 memset(&ctl, 0, sizeof(ctl)); 375 ctl.keysize = sizeof(Oid); 376 ctl.entrysize = sizeof(struct tablesync_start_time_mapping); 377 last_start_times = hash_create("Logical replication table sync worker start times", 378 256, &ctl, HASH_ELEM | HASH_BLOBS); 379 } 380 381 /* 382 * Clean up the hash table when we're done with all tables (just to 383 * release the bit of memory). 384 */ 385 else if (!table_states && last_start_times) 386 { 387 hash_destroy(last_start_times); 388 last_start_times = NULL; 389 } 390 391 /* 392 * Process all tables that are being synchronized. 393 */ 394 foreach(lc, table_states) 395 { 396 SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); 397 398 if (rstate->state == SUBREL_STATE_SYNCDONE) 399 { 400 /* 401 * Apply has caught up to the position where the table sync has 402 * finished. Mark the table as ready so that the apply will just 403 * continue to replicate it normally. 404 */ 405 if (current_lsn >= rstate->lsn) 406 { 407 rstate->state = SUBREL_STATE_READY; 408 rstate->lsn = current_lsn; 409 if (!started_tx) 410 { 411 StartTransactionCommand(); 412 started_tx = true; 413 } 414 415 UpdateSubscriptionRelState(MyLogicalRepWorker->subid, 416 rstate->relid, rstate->state, 417 rstate->lsn); 418 } 419 } 420 else 421 { 422 LogicalRepWorker *syncworker; 423 424 /* 425 * Look for a sync worker for this relation. 
426 */ 427 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); 428 429 syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid, 430 rstate->relid, false); 431 432 if (syncworker) 433 { 434 /* Found one, update our copy of its state */ 435 SpinLockAcquire(&syncworker->relmutex); 436 rstate->state = syncworker->relstate; 437 rstate->lsn = syncworker->relstate_lsn; 438 if (rstate->state == SUBREL_STATE_SYNCWAIT) 439 { 440 /* 441 * Sync worker is waiting for apply. Tell sync worker it 442 * can catchup now. 443 */ 444 syncworker->relstate = SUBREL_STATE_CATCHUP; 445 syncworker->relstate_lsn = 446 Max(syncworker->relstate_lsn, current_lsn); 447 } 448 SpinLockRelease(&syncworker->relmutex); 449 450 /* If we told worker to catch up, wait for it. */ 451 if (rstate->state == SUBREL_STATE_SYNCWAIT) 452 { 453 /* Signal the sync worker, as it may be waiting for us. */ 454 if (syncworker->proc) 455 logicalrep_worker_wakeup_ptr(syncworker); 456 457 /* Now safe to release the LWLock */ 458 LWLockRelease(LogicalRepWorkerLock); 459 460 /* 461 * Enter busy loop and wait for synchronization worker to 462 * reach expected state (or die trying). 463 */ 464 if (!started_tx) 465 { 466 StartTransactionCommand(); 467 started_tx = true; 468 } 469 470 wait_for_relation_state_change(rstate->relid, 471 SUBREL_STATE_SYNCDONE); 472 } 473 else 474 LWLockRelease(LogicalRepWorkerLock); 475 } 476 else 477 { 478 /* 479 * If there is no sync worker for this table yet, count 480 * running sync workers for this subscription, while we have 481 * the lock. 482 */ 483 int nsyncworkers = 484 logicalrep_sync_worker_count(MyLogicalRepWorker->subid); 485 486 /* Now safe to release the LWLock */ 487 LWLockRelease(LogicalRepWorkerLock); 488 489 /* 490 * If there are free sync worker slot(s), start a new sync 491 * worker for the table. 
492 */ 493 if (nsyncworkers < max_sync_workers_per_subscription) 494 { 495 TimestampTz now = GetCurrentTimestamp(); 496 struct tablesync_start_time_mapping *hentry; 497 bool found; 498 499 hentry = hash_search(last_start_times, &rstate->relid, 500 HASH_ENTER, &found); 501 502 if (!found || 503 TimestampDifferenceExceeds(hentry->last_start_time, now, 504 wal_retrieve_retry_interval)) 505 { 506 logicalrep_worker_launch(MyLogicalRepWorker->dbid, 507 MySubscription->oid, 508 MySubscription->name, 509 MyLogicalRepWorker->userid, 510 rstate->relid); 511 hentry->last_start_time = now; 512 } 513 } 514 } 515 } 516 } 517 518 if (started_tx) 519 { 520 CommitTransactionCommand(); 521 pgstat_report_stat(false); 522 } 523 } 524 525 /* 526 * Process possible state change(s) of tables that are being synchronized. 527 */ 528 void 529 process_syncing_tables(XLogRecPtr current_lsn) 530 { 531 if (am_tablesync_worker()) 532 process_syncing_tables_for_sync(current_lsn); 533 else 534 process_syncing_tables_for_apply(current_lsn); 535 } 536 537 /* 538 * Create list of columns for COPY based on logical relation mapping. 539 */ 540 static List * 541 make_copy_attnamelist(LogicalRepRelMapEntry *rel) 542 { 543 List *attnamelist = NIL; 544 int i; 545 546 for (i = 0; i < rel->remoterel.natts; i++) 547 { 548 attnamelist = lappend(attnamelist, 549 makeString(rel->remoterel.attnames[i])); 550 } 551 552 553 return attnamelist; 554 } 555 556 /* 557 * Data source callback for the COPY FROM, which reads from the remote 558 * connection and passes the data back to our local COPY. 559 */ 560 static int 561 copy_read_data(void *outbuf, int minread, int maxread) 562 { 563 int bytesread = 0; 564 int avail; 565 566 /* If there are some leftover data from previous read, use it. 
*/ 567 avail = copybuf->len - copybuf->cursor; 568 if (avail) 569 { 570 if (avail > maxread) 571 avail = maxread; 572 memcpy(outbuf, ©buf->data[copybuf->cursor], avail); 573 copybuf->cursor += avail; 574 maxread -= avail; 575 bytesread += avail; 576 } 577 578 while (maxread > 0 && bytesread < minread) 579 { 580 pgsocket fd = PGINVALID_SOCKET; 581 int len; 582 char *buf = NULL; 583 584 for (;;) 585 { 586 /* Try read the data. */ 587 len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); 588 589 CHECK_FOR_INTERRUPTS(); 590 591 if (len == 0) 592 break; 593 else if (len < 0) 594 return bytesread; 595 else 596 { 597 /* Process the data */ 598 copybuf->data = buf; 599 copybuf->len = len; 600 copybuf->cursor = 0; 601 602 avail = copybuf->len - copybuf->cursor; 603 if (avail > maxread) 604 avail = maxread; 605 memcpy(outbuf, ©buf->data[copybuf->cursor], avail); 606 outbuf = (void *) ((char *) outbuf + avail); 607 copybuf->cursor += avail; 608 maxread -= avail; 609 bytesread += avail; 610 } 611 612 if (maxread <= 0 || bytesread >= minread) 613 return bytesread; 614 } 615 616 /* 617 * Wait for more data or latch. 618 */ 619 (void) WaitLatchOrSocket(MyLatch, 620 WL_SOCKET_READABLE | WL_LATCH_SET | 621 WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, 622 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA); 623 624 ResetLatch(MyLatch); 625 } 626 627 return bytesread; 628 } 629 630 631 /* 632 * Get information about remote relation in similar fashion the RELATION 633 * message provides during replication. 634 */ 635 static void 636 fetch_remote_table_info(char *nspname, char *relname, 637 LogicalRepRelation *lrel) 638 { 639 WalRcvExecResult *res; 640 StringInfoData cmd; 641 TupleTableSlot *slot; 642 Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; 643 Oid attrRow[] = {TEXTOID, OIDOID, INT4OID, BOOLOID}; 644 bool isnull; 645 int natt; 646 647 lrel->nspname = nspname; 648 lrel->relname = relname; 649 650 /* First fetch Oid and replica identity. 
*/ 651 initStringInfo(&cmd); 652 appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind" 653 " FROM pg_catalog.pg_class c" 654 " INNER JOIN pg_catalog.pg_namespace n" 655 " ON (c.relnamespace = n.oid)" 656 " WHERE n.nspname = %s" 657 " AND c.relname = %s", 658 quote_literal_cstr(nspname), 659 quote_literal_cstr(relname)); 660 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 661 lengthof(tableRow), tableRow); 662 663 if (res->status != WALRCV_OK_TUPLES) 664 ereport(ERROR, 665 (errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s", 666 nspname, relname, res->err))); 667 668 slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); 669 if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) 670 ereport(ERROR, 671 (errmsg("table \"%s.%s\" not found on publisher", 672 nspname, relname))); 673 674 lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull)); 675 Assert(!isnull); 676 lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull)); 677 Assert(!isnull); 678 lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); 679 Assert(!isnull); 680 681 ExecDropSingleTupleTableSlot(slot); 682 walrcv_clear_result(res); 683 684 /* Now fetch columns. */ 685 resetStringInfo(&cmd); 686 appendStringInfo(&cmd, 687 "SELECT a.attname," 688 " a.atttypid," 689 " a.atttypmod," 690 " a.attnum = ANY(i.indkey)" 691 " FROM pg_catalog.pg_attribute a" 692 " LEFT JOIN pg_catalog.pg_index i" 693 " ON (i.indexrelid = pg_get_replica_identity_index(%u))" 694 " WHERE a.attnum > 0::pg_catalog.int2" 695 " AND NOT a.attisdropped %s" 696 " AND a.attrelid = %u" 697 " ORDER BY a.attnum", 698 lrel->remoteid, 699 (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ? 
700 "AND a.attgenerated = ''" : ""), 701 lrel->remoteid); 702 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 703 lengthof(attrRow), attrRow); 704 705 if (res->status != WALRCV_OK_TUPLES) 706 ereport(ERROR, 707 (errmsg("could not fetch table info for table \"%s.%s\": %s", 708 nspname, relname, res->err))); 709 710 /* We don't know the number of rows coming, so allocate enough space. */ 711 lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *)); 712 lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid)); 713 lrel->attkeys = NULL; 714 715 natt = 0; 716 slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); 717 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) 718 { 719 lrel->attnames[natt] = 720 TextDatumGetCString(slot_getattr(slot, 1, &isnull)); 721 Assert(!isnull); 722 lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull)); 723 Assert(!isnull); 724 if (DatumGetBool(slot_getattr(slot, 4, &isnull))) 725 lrel->attkeys = bms_add_member(lrel->attkeys, natt); 726 727 /* Should never happen. */ 728 if (++natt >= MaxTupleAttributeNumber) 729 elog(ERROR, "too many columns in remote table \"%s.%s\"", 730 nspname, relname); 731 732 ExecClearTuple(slot); 733 } 734 ExecDropSingleTupleTableSlot(slot); 735 736 lrel->natts = natt; 737 738 walrcv_clear_result(res); 739 pfree(cmd.data); 740 } 741 742 /* 743 * Copy existing data of a table from publisher. 744 * 745 * Caller is responsible for locking the local relation. 746 */ 747 static void 748 copy_table(Relation rel) 749 { 750 LogicalRepRelMapEntry *relmapentry; 751 LogicalRepRelation lrel; 752 WalRcvExecResult *res; 753 StringInfoData cmd; 754 CopyState cstate; 755 List *attnamelist; 756 ParseState *pstate; 757 758 /* Get the publisher relation info. */ 759 fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), 760 RelationGetRelationName(rel), &lrel); 761 762 /* Put the relation into relmap. 
*/ 763 logicalrep_relmap_update(&lrel); 764 765 /* Map the publisher relation to local one. */ 766 relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); 767 Assert(rel == relmapentry->localrel); 768 769 /* Start copy on the publisher. */ 770 initStringInfo(&cmd); 771 if (lrel.relkind == RELKIND_RELATION) 772 appendStringInfo(&cmd, "COPY %s TO STDOUT", 773 quote_qualified_identifier(lrel.nspname, lrel.relname)); 774 else 775 { 776 /* 777 * For non-tables, we need to do COPY (SELECT ...), but we can't just 778 * do SELECT * because we need to not copy generated columns. 779 */ 780 appendStringInfo(&cmd, "COPY (SELECT "); 781 for (int i = 0; i < lrel.natts; i++) 782 { 783 appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); 784 if (i < lrel.natts - 1) 785 appendStringInfoString(&cmd, ", "); 786 } 787 appendStringInfo(&cmd, " FROM %s) TO STDOUT", 788 quote_qualified_identifier(lrel.nspname, lrel.relname)); 789 } 790 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL); 791 pfree(cmd.data); 792 if (res->status != WALRCV_OK_COPY_OUT) 793 ereport(ERROR, 794 (errmsg("could not start initial contents copy for table \"%s.%s\": %s", 795 lrel.nspname, lrel.relname, res->err))); 796 walrcv_clear_result(res); 797 798 copybuf = makeStringInfo(); 799 800 pstate = make_parsestate(NULL); 801 (void) addRangeTableEntryForRelation(pstate, rel, AccessShareLock, 802 NULL, false, false); 803 804 attnamelist = make_copy_attnamelist(relmapentry); 805 cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL); 806 807 /* Do the copy */ 808 (void) CopyFrom(cstate); 809 810 logicalrep_rel_close(relmapentry, NoLock); 811 } 812 813 /* 814 * Start syncing the table in the sync worker. 815 * 816 * The returned slot name is palloc'ed in current memory context. 
817 */ 818 char * 819 LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) 820 { 821 char *slotname; 822 char *err; 823 char relstate; 824 XLogRecPtr relstate_lsn; 825 826 /* Check the state of the table synchronization. */ 827 StartTransactionCommand(); 828 relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid, 829 MyLogicalRepWorker->relid, 830 &relstate_lsn, true); 831 CommitTransactionCommand(); 832 833 SpinLockAcquire(&MyLogicalRepWorker->relmutex); 834 MyLogicalRepWorker->relstate = relstate; 835 MyLogicalRepWorker->relstate_lsn = relstate_lsn; 836 SpinLockRelease(&MyLogicalRepWorker->relmutex); 837 838 /* 839 * To build a slot name for the sync work, we are limited to NAMEDATALEN - 840 * 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars 841 * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0'). (It's actually the 842 * NAMEDATALEN on the remote that matters, but this scheme will also work 843 * reasonably if that is different.) 844 */ 845 StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */ 846 slotname = psprintf("%.*s_%u_sync_%u", 847 NAMEDATALEN - 28, 848 MySubscription->slotname, 849 MySubscription->oid, 850 MyLogicalRepWorker->relid); 851 852 /* 853 * Here we use the slot name instead of the subscription name as the 854 * application_name, so that it is different from the main apply worker, 855 * so that synchronous replication can distinguish them. 
856 */ 857 LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, 858 slotname, &err); 859 if (LogRepWorkerWalRcvConn == NULL) 860 ereport(ERROR, 861 (errmsg("could not connect to the publisher: %s", err))); 862 863 switch (MyLogicalRepWorker->relstate) 864 { 865 case SUBREL_STATE_INIT: 866 case SUBREL_STATE_DATASYNC: 867 { 868 Relation rel; 869 WalRcvExecResult *res; 870 871 SpinLockAcquire(&MyLogicalRepWorker->relmutex); 872 MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; 873 MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr; 874 SpinLockRelease(&MyLogicalRepWorker->relmutex); 875 876 /* Update the state and make it visible to others. */ 877 StartTransactionCommand(); 878 UpdateSubscriptionRelState(MyLogicalRepWorker->subid, 879 MyLogicalRepWorker->relid, 880 MyLogicalRepWorker->relstate, 881 MyLogicalRepWorker->relstate_lsn); 882 CommitTransactionCommand(); 883 pgstat_report_stat(false); 884 885 /* 886 * We want to do the table data sync in a single transaction. 887 */ 888 StartTransactionCommand(); 889 890 /* 891 * Use a standard write lock here. It might be better to 892 * disallow access to the table while it's being synchronized. 893 * But we don't want to block the main apply process from 894 * working and it has to open the relation in RowExclusiveLock 895 * when remapping remote relation id to local one. 896 */ 897 rel = table_open(MyLogicalRepWorker->relid, RowExclusiveLock); 898 899 /* 900 * Create a temporary slot for the sync process. We do this 901 * inside the transaction so that we can use the snapshot made 902 * by the slot to get existing data. 
903 */ 904 res = walrcv_exec(LogRepWorkerWalRcvConn, 905 "BEGIN READ ONLY ISOLATION LEVEL " 906 "REPEATABLE READ", 0, NULL); 907 if (res->status != WALRCV_OK_COMMAND) 908 ereport(ERROR, 909 (errmsg("table copy could not start transaction on publisher"), 910 errdetail("The error was: %s", res->err))); 911 walrcv_clear_result(res); 912 913 /* 914 * Create new temporary logical decoding slot. 915 * 916 * We'll use slot for data copy so make sure the snapshot is 917 * used for the transaction; that way the COPY will get data 918 * that is consistent with the lsn used by the slot to start 919 * decoding. 920 */ 921 walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, true, 922 CRS_USE_SNAPSHOT, origin_startpos); 923 924 PushActiveSnapshot(GetTransactionSnapshot()); 925 copy_table(rel); 926 PopActiveSnapshot(); 927 928 res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL); 929 if (res->status != WALRCV_OK_COMMAND) 930 ereport(ERROR, 931 (errmsg("table copy could not finish transaction on publisher"), 932 errdetail("The error was: %s", res->err))); 933 walrcv_clear_result(res); 934 935 table_close(rel, NoLock); 936 937 /* Make the copy visible. */ 938 CommandCounterIncrement(); 939 940 /* 941 * We are done with the initial data synchronization, update 942 * the state. 943 */ 944 SpinLockAcquire(&MyLogicalRepWorker->relmutex); 945 MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT; 946 MyLogicalRepWorker->relstate_lsn = *origin_startpos; 947 SpinLockRelease(&MyLogicalRepWorker->relmutex); 948 949 /* Wait for main apply worker to tell us to catchup. */ 950 wait_for_worker_state_change(SUBREL_STATE_CATCHUP); 951 952 /*---------- 953 * There are now two possible states here: 954 * a) Sync is behind the apply. If that's the case we need to 955 * catch up with it by consuming the logical replication 956 * stream up to the relstate_lsn. For that, we exit this 957 * function and continue in ApplyWorkerMain(). 958 * b) Sync is caught up with the apply. 
So it can just set 959 * the state to SYNCDONE and finish. 960 *---------- 961 */ 962 if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn) 963 { 964 /* 965 * Update the new state in catalog. No need to bother 966 * with the shmem state as we are exiting for good. 967 */ 968 UpdateSubscriptionRelState(MyLogicalRepWorker->subid, 969 MyLogicalRepWorker->relid, 970 SUBREL_STATE_SYNCDONE, 971 *origin_startpos); 972 finish_sync_worker(); 973 } 974 break; 975 } 976 case SUBREL_STATE_SYNCDONE: 977 case SUBREL_STATE_READY: 978 case SUBREL_STATE_UNKNOWN: 979 980 /* 981 * Nothing to do here but finish. (UNKNOWN means the relation was 982 * removed from pg_subscription_rel before the sync worker could 983 * start.) 984 */ 985 finish_sync_worker(); 986 break; 987 default: 988 elog(ERROR, "unknown relation state \"%c\"", 989 MyLogicalRepWorker->relstate); 990 } 991 992 return slotname; 993 } 994