/*-------------------------------------------------------------------------
 * tablesync.c
 *	  PostgreSQL logical replication
 *
 * Copyright (c) 2012-2018, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/logical/tablesync.c
 *
 * NOTES
 *	  This file contains code for initial table data synchronization for
 *	  logical replication.
 *
 *	  The initial data synchronization is done separately for each table,
 *	  in a separate apply worker that only fetches the initial snapshot data
 *	  from the publisher and then synchronizes the position in the stream with
 *	  the main apply worker.
 *
 *	  There are several reasons for doing the synchronization this way:
 *	   - It allows us to parallelize the initial data synchronization,
 *		 which lowers the time needed for it to happen.
 *	   - The initial synchronization does not have to hold the xid and LSN
 *		 for the time it takes to copy data of all tables, causing less
 *		 bloat and lower disk consumption compared to doing the
 *		 synchronization in a single process for the whole database.
 *	   - It allows us to synchronize any tables added after the initial
 *		 synchronization has finished.
 *
 *	  The stream position synchronization works in multiple steps.
 *	   - Sync finishes the copy, sets the worker state to SYNCWAIT and waits
 *		 in a loop for the state to change.
 *	   - Apply periodically checks the tables that are synchronizing for
 *		 SYNCWAIT.  When the desired state appears, it will set the worker
 *		 state to CATCHUP and start loop-waiting until either the table state
 *		 is set to SYNCDONE or the sync worker exits.
 *	   - After the sync worker has seen the state change to CATCHUP, it will
 *		 read the stream and apply changes (acting like an apply worker) until
 *		 it catches up to the specified stream position.  Then it sets the
 *		 state to SYNCDONE.
There might be zero changes applied between 40 * CATCHUP and SYNCDONE, because the sync worker might be ahead of the 41 * apply worker. 42 * - Once the state was set to SYNCDONE, the apply will continue tracking 43 * the table until it reaches the SYNCDONE stream position, at which 44 * point it sets state to READY and stops tracking. Again, there might 45 * be zero changes in between. 46 * 47 * So the state progression is always: INIT -> DATASYNC -> SYNCWAIT -> CATCHUP -> 48 * SYNCDONE -> READY. 49 * 50 * The catalog pg_subscription_rel is used to keep information about 51 * subscribed tables and their state. Some transient state during data 52 * synchronization is kept in shared memory. The states SYNCWAIT and 53 * CATCHUP only appear in memory. 54 * 55 * Example flows look like this: 56 * - Apply is in front: 57 * sync:8 58 * -> set in memory SYNCWAIT 59 * apply:10 60 * -> set in memory CATCHUP 61 * -> enter wait-loop 62 * sync:10 63 * -> set in catalog SYNCDONE 64 * -> exit 65 * apply:10 66 * -> exit wait-loop 67 * -> continue rep 68 * apply:11 69 * -> set in catalog READY 70 * - Sync in front: 71 * sync:10 72 * -> set in memory SYNCWAIT 73 * apply:8 74 * -> set in memory CATCHUP 75 * -> continue per-table filtering 76 * sync:10 77 * -> set in catalog SYNCDONE 78 * -> exit 79 * apply:10 80 * -> set in catalog READY 81 * -> stop per-table filtering 82 * -> continue rep 83 *------------------------------------------------------------------------- 84 */ 85 86 #include "postgres.h" 87 88 #include "miscadmin.h" 89 #include "pgstat.h" 90 91 #include "access/xact.h" 92 93 #include "catalog/pg_subscription_rel.h" 94 #include "catalog/pg_type.h" 95 96 #include "commands/copy.h" 97 98 #include "parser/parse_relation.h" 99 100 #include "replication/logicallauncher.h" 101 #include "replication/logicalrelation.h" 102 #include "replication/walreceiver.h" 103 #include "replication/worker_internal.h" 104 105 #include "utils/snapmgr.h" 106 #include "storage/ipc.h" 107 108 
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"

/*
 * Whether the apply worker's cached list of not-yet-ready tables is current.
 * Cleared by invalidate_syncing_table_states() and set again once
 * process_syncing_tables_for_apply() has rebuilt the list.
 */
static bool table_states_valid = false;

/*
 * Data received from the publisher's COPY stream that copy_read_data() has
 * not yet handed to the local COPY FROM.  Allocated by copy_table().  It is
 * only used by this file, so it has internal linkage.
 */
static StringInfo copybuf = NULL;

/*
 * Exit routine for synchronization worker.
 *
 * Commits any open transaction (reporting statistics), flushes the WAL
 * written so far, logs that the table synchronization has finished, wakes
 * the main apply worker of the subscription and exits the process with
 * status 0.  Never returns.
 */
static void
pg_attribute_noreturn()
finish_sync_worker(void)
{
	/*
	 * Commit any outstanding transaction. This is the usual case, unless
	 * there was nothing to do for the table.
	 */
	if (IsTransactionState())
	{
		CommitTransactionCommand();
		pgstat_report_stat(false);
	}

	/* And flush all writes. */
	XLogFlush(GetXLogWriteRecPtr());

	/* get_rel_name() needs catalog access, hence the short transaction. */
	StartTransactionCommand();
	ereport(LOG,
			(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has finished",
					MySubscription->name,
					get_rel_name(MyLogicalRepWorker->relid))));
	CommitTransactionCommand();

	/* Find the main apply worker and signal it. */
	logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);

	/* Stop gracefully */
	proc_exit(0);
}

/*
 * Wait until the relation synchronization state is set in the catalog to the
 * expected one.
 *
 * Used when transitioning from CATCHUP state to SYNCDONE.  The state is read
 * from the catalog, so the caller must be inside a transaction.
 *
 * Returns true once the catalog shows expected_state.  Returns false if the
 * table's state is reported as SUBREL_STATE_UNKNOWN, or if the "opposite"
 * worker has disappeared: the sync worker for relid when called from the
 * apply worker, or the main apply worker when called from a sync worker.
 */
static bool
wait_for_relation_state_change(Oid relid, char expected_state)
{
	int			rc;
	char		state;

	for (;;)
	{
		LogicalRepWorker *worker;
		XLogRecPtr	statelsn;

		CHECK_FOR_INTERRUPTS();

		/* XXX use cache invalidation here to improve performance? */
		PushActiveSnapshot(GetLatestSnapshot());
		state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
										relid, &statelsn, true);
		PopActiveSnapshot();

		if (state == SUBREL_STATE_UNKNOWN)
			return false;

		if (state == expected_state)
			return true;

		/*
		 * Check if the opposite worker is still running and bail if not.  A
		 * sync worker looks for the main apply worker (relid InvalidOid); the
		 * apply worker looks for the sync worker handling relid.
		 */
		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
										am_tablesync_worker() ? InvalidOid : relid,
										false);
		LWLockRelease(LogicalRepWorkerLock);
		if (!worker)
			return false;

		/* Re-check at least once a second, or sooner if our latch is set. */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
					   1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);

		/* emergency bailout if postmaster has died */
		if (rc & WL_POSTMASTER_DEATH)
			proc_exit(1);

		ResetLatch(MyLatch);
	}

	/* Not reached; keeps compilers that don't see the infinite loop quiet. */
	return false;
}

/*
 * Wait until the apply worker changes the state of our synchronization
 * worker to the expected one.
 *
 * Used when transitioning from SYNCWAIT state to CATCHUP.
 *
 * Returns true once our shared-memory relstate equals expected_state, false
 * if the main apply worker of the subscription has disappeared.
 */
static bool
wait_for_worker_state_change(char expected_state)
{
	int			rc;

	for (;;)
	{
		LogicalRepWorker *worker;

		CHECK_FOR_INTERRUPTS();

		/*
		 * Done if already in correct state.  (We assume this fetch is atomic
		 * enough to not give a misleading answer if we do it with no lock.)
		 */
		if (MyLogicalRepWorker->relstate == expected_state)
			return true;

		/*
		 * Bail out if the apply worker has died, else signal it we're
		 * waiting.
		 */
		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
		worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
										InvalidOid, false);
		if (worker && worker->proc)
			logicalrep_worker_wakeup_ptr(worker);
		LWLockRelease(LogicalRepWorkerLock);
		if (!worker)
			break;

		/*
		 * Wait.  We expect to get a latch signal back from the apply worker,
		 * but use a timeout in case it dies without sending one.
		 */
		rc = WaitLatch(MyLatch,
					   WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
					   1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);

		/* emergency bailout if postmaster has died */
		if (rc & WL_POSTMASTER_DEATH)
			proc_exit(1);

		if (rc & WL_LATCH_SET)
			ResetLatch(MyLatch);
	}

	return false;
}

/*
 * Callback from syscache invalidation.
 *
 * Marks the apply worker's cached table states as stale, so that
 * process_syncing_tables_for_apply() re-reads them on its next call.  The
 * arguments are part of the callback signature and are not used.
 */
void
invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue)
{
	table_states_valid = false;
}

/*
 * Handle table synchronization cooperation from the synchronization
 * worker.
 *
 * If the sync worker is in CATCHUP state and reached (or passed) the
 * predetermined synchronization point in the WAL stream, mark the table as
 * SYNCDONE and finish.
283 */ 284 static void 285 process_syncing_tables_for_sync(XLogRecPtr current_lsn) 286 { 287 Assert(IsTransactionState()); 288 289 SpinLockAcquire(&MyLogicalRepWorker->relmutex); 290 291 if (MyLogicalRepWorker->relstate == SUBREL_STATE_CATCHUP && 292 current_lsn >= MyLogicalRepWorker->relstate_lsn) 293 { 294 TimeLineID tli; 295 296 MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE; 297 MyLogicalRepWorker->relstate_lsn = current_lsn; 298 299 SpinLockRelease(&MyLogicalRepWorker->relmutex); 300 301 UpdateSubscriptionRelState(MyLogicalRepWorker->subid, 302 MyLogicalRepWorker->relid, 303 MyLogicalRepWorker->relstate, 304 MyLogicalRepWorker->relstate_lsn); 305 306 walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli); 307 finish_sync_worker(); 308 } 309 else 310 SpinLockRelease(&MyLogicalRepWorker->relmutex); 311 } 312 313 /* 314 * Handle table synchronization cooperation from the apply worker. 315 * 316 * Walk over all subscription tables that are individually tracked by the 317 * apply process (currently, all that have state other than 318 * SUBREL_STATE_READY) and manage synchronization for them. 319 * 320 * If there are tables that need synchronizing and are not being synchronized 321 * yet, start sync workers for them (if there are free slots for sync 322 * workers). To prevent starting the sync worker for the same relation at a 323 * high frequency after a failure, we store its last start time with each sync 324 * state info. We start the sync worker for the same relation after waiting 325 * at least wal_retrieve_retry_interval. 326 * 327 * For tables that are being synchronized already, check if sync workers 328 * either need action from the apply worker or have finished. This is the 329 * SYNCWAIT to CATCHUP transition. 330 * 331 * If the synchronization position is reached (SYNCDONE), then the table can 332 * be marked as READY and is no longer tracked. 
333 */ 334 static void 335 process_syncing_tables_for_apply(XLogRecPtr current_lsn) 336 { 337 struct tablesync_start_time_mapping 338 { 339 Oid relid; 340 TimestampTz last_start_time; 341 }; 342 static List *table_states = NIL; 343 static HTAB *last_start_times = NULL; 344 ListCell *lc; 345 bool started_tx = false; 346 347 Assert(!IsTransactionState()); 348 349 /* We need up-to-date sync state info for subscription tables here. */ 350 if (!table_states_valid) 351 { 352 MemoryContext oldctx; 353 List *rstates; 354 ListCell *lc; 355 SubscriptionRelState *rstate; 356 357 /* Clean the old list. */ 358 list_free_deep(table_states); 359 table_states = NIL; 360 361 StartTransactionCommand(); 362 started_tx = true; 363 364 /* Fetch all non-ready tables. */ 365 rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); 366 367 /* Allocate the tracking info in a permanent memory context. */ 368 oldctx = MemoryContextSwitchTo(CacheMemoryContext); 369 foreach(lc, rstates) 370 { 371 rstate = palloc(sizeof(SubscriptionRelState)); 372 memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); 373 table_states = lappend(table_states, rstate); 374 } 375 MemoryContextSwitchTo(oldctx); 376 377 table_states_valid = true; 378 } 379 380 /* 381 * Prepare a hash table for tracking last start times of workers, to avoid 382 * immediate restarts. We don't need it if there are no tables that need 383 * syncing. 384 */ 385 if (table_states && !last_start_times) 386 { 387 HASHCTL ctl; 388 389 memset(&ctl, 0, sizeof(ctl)); 390 ctl.keysize = sizeof(Oid); 391 ctl.entrysize = sizeof(struct tablesync_start_time_mapping); 392 last_start_times = hash_create("Logical replication table sync worker start times", 393 256, &ctl, HASH_ELEM | HASH_BLOBS); 394 } 395 396 /* 397 * Clean up the hash table when we're done with all tables (just to 398 * release the bit of memory). 
399 */ 400 else if (!table_states && last_start_times) 401 { 402 hash_destroy(last_start_times); 403 last_start_times = NULL; 404 } 405 406 /* 407 * Process all tables that are being synchronized. 408 */ 409 foreach(lc, table_states) 410 { 411 SubscriptionRelState *rstate = (SubscriptionRelState *) lfirst(lc); 412 413 if (rstate->state == SUBREL_STATE_SYNCDONE) 414 { 415 /* 416 * Apply has caught up to the position where the table sync has 417 * finished. Mark the table as ready so that the apply will just 418 * continue to replicate it normally. 419 */ 420 if (current_lsn >= rstate->lsn) 421 { 422 rstate->state = SUBREL_STATE_READY; 423 rstate->lsn = current_lsn; 424 if (!started_tx) 425 { 426 StartTransactionCommand(); 427 started_tx = true; 428 } 429 430 UpdateSubscriptionRelState(MyLogicalRepWorker->subid, 431 rstate->relid, rstate->state, 432 rstate->lsn); 433 } 434 } 435 else 436 { 437 LogicalRepWorker *syncworker; 438 439 /* 440 * Look for a sync worker for this relation. 441 */ 442 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); 443 444 syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid, 445 rstate->relid, false); 446 447 if (syncworker) 448 { 449 /* Found one, update our copy of its state */ 450 SpinLockAcquire(&syncworker->relmutex); 451 rstate->state = syncworker->relstate; 452 rstate->lsn = syncworker->relstate_lsn; 453 if (rstate->state == SUBREL_STATE_SYNCWAIT) 454 { 455 /* 456 * Sync worker is waiting for apply. Tell sync worker it 457 * can catchup now. 458 */ 459 syncworker->relstate = SUBREL_STATE_CATCHUP; 460 syncworker->relstate_lsn = 461 Max(syncworker->relstate_lsn, current_lsn); 462 } 463 SpinLockRelease(&syncworker->relmutex); 464 465 /* If we told worker to catch up, wait for it. */ 466 if (rstate->state == SUBREL_STATE_SYNCWAIT) 467 { 468 /* Signal the sync worker, as it may be waiting for us. 
*/ 469 if (syncworker->proc) 470 logicalrep_worker_wakeup_ptr(syncworker); 471 472 /* Now safe to release the LWLock */ 473 LWLockRelease(LogicalRepWorkerLock); 474 475 /* 476 * Enter busy loop and wait for synchronization worker to 477 * reach expected state (or die trying). 478 */ 479 if (!started_tx) 480 { 481 StartTransactionCommand(); 482 started_tx = true; 483 } 484 485 wait_for_relation_state_change(rstate->relid, 486 SUBREL_STATE_SYNCDONE); 487 } 488 else 489 LWLockRelease(LogicalRepWorkerLock); 490 } 491 else 492 { 493 /* 494 * If there is no sync worker for this table yet, count 495 * running sync workers for this subscription, while we have 496 * the lock. 497 */ 498 int nsyncworkers = 499 logicalrep_sync_worker_count(MyLogicalRepWorker->subid); 500 501 /* Now safe to release the LWLock */ 502 LWLockRelease(LogicalRepWorkerLock); 503 504 /* 505 * If there are free sync worker slot(s), start a new sync 506 * worker for the table. 507 */ 508 if (nsyncworkers < max_sync_workers_per_subscription) 509 { 510 TimestampTz now = GetCurrentTimestamp(); 511 struct tablesync_start_time_mapping *hentry; 512 bool found; 513 514 hentry = hash_search(last_start_times, &rstate->relid, 515 HASH_ENTER, &found); 516 517 if (!found || 518 TimestampDifferenceExceeds(hentry->last_start_time, now, 519 wal_retrieve_retry_interval)) 520 { 521 logicalrep_worker_launch(MyLogicalRepWorker->dbid, 522 MySubscription->oid, 523 MySubscription->name, 524 MyLogicalRepWorker->userid, 525 rstate->relid); 526 hentry->last_start_time = now; 527 } 528 } 529 } 530 } 531 } 532 533 if (started_tx) 534 { 535 CommitTransactionCommand(); 536 pgstat_report_stat(false); 537 } 538 } 539 540 /* 541 * Process possible state change(s) of tables that are being synchronized. 
542 */ 543 void 544 process_syncing_tables(XLogRecPtr current_lsn) 545 { 546 if (am_tablesync_worker()) 547 process_syncing_tables_for_sync(current_lsn); 548 else 549 process_syncing_tables_for_apply(current_lsn); 550 } 551 552 /* 553 * Create list of columns for COPY based on logical relation mapping. 554 */ 555 static List * 556 make_copy_attnamelist(LogicalRepRelMapEntry *rel) 557 { 558 List *attnamelist = NIL; 559 int i; 560 561 for (i = 0; i < rel->remoterel.natts; i++) 562 { 563 attnamelist = lappend(attnamelist, 564 makeString(rel->remoterel.attnames[i])); 565 } 566 567 568 return attnamelist; 569 } 570 571 /* 572 * Data source callback for the COPY FROM, which reads from the remote 573 * connection and passes the data back to our local COPY. 574 */ 575 static int 576 copy_read_data(void *outbuf, int minread, int maxread) 577 { 578 int bytesread = 0; 579 int avail; 580 581 /* If there are some leftover data from previous read, use it. */ 582 avail = copybuf->len - copybuf->cursor; 583 if (avail) 584 { 585 if (avail > maxread) 586 avail = maxread; 587 memcpy(outbuf, ©buf->data[copybuf->cursor], avail); 588 copybuf->cursor += avail; 589 maxread -= avail; 590 bytesread += avail; 591 } 592 593 while (maxread > 0 && bytesread < minread) 594 { 595 pgsocket fd = PGINVALID_SOCKET; 596 int rc; 597 int len; 598 char *buf = NULL; 599 600 for (;;) 601 { 602 /* Try read the data. 
*/ 603 len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd); 604 605 CHECK_FOR_INTERRUPTS(); 606 607 if (len == 0) 608 break; 609 else if (len < 0) 610 return bytesread; 611 else 612 { 613 /* Process the data */ 614 copybuf->data = buf; 615 copybuf->len = len; 616 copybuf->cursor = 0; 617 618 avail = copybuf->len - copybuf->cursor; 619 if (avail > maxread) 620 avail = maxread; 621 memcpy(outbuf, ©buf->data[copybuf->cursor], avail); 622 outbuf = (void *) ((char *) outbuf + avail); 623 copybuf->cursor += avail; 624 maxread -= avail; 625 bytesread += avail; 626 } 627 628 if (maxread <= 0 || bytesread >= minread) 629 return bytesread; 630 } 631 632 /* 633 * Wait for more data or latch. 634 */ 635 rc = WaitLatchOrSocket(MyLatch, 636 WL_SOCKET_READABLE | WL_LATCH_SET | 637 WL_TIMEOUT | WL_POSTMASTER_DEATH, 638 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA); 639 640 /* Emergency bailout if postmaster has died */ 641 if (rc & WL_POSTMASTER_DEATH) 642 proc_exit(1); 643 644 ResetLatch(MyLatch); 645 } 646 647 return bytesread; 648 } 649 650 651 /* 652 * Get information about remote relation in similar fashion the RELATION 653 * message provides during replication. 654 */ 655 static void 656 fetch_remote_table_info(char *nspname, char *relname, 657 LogicalRepRelation *lrel) 658 { 659 WalRcvExecResult *res; 660 StringInfoData cmd; 661 TupleTableSlot *slot; 662 Oid tableRow[2] = {OIDOID, CHAROID}; 663 Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID}; 664 bool isnull; 665 int natt; 666 667 lrel->nspname = nspname; 668 lrel->relname = relname; 669 670 /* First fetch Oid and replica identity. 
*/ 671 initStringInfo(&cmd); 672 appendStringInfo(&cmd, "SELECT c.oid, c.relreplident" 673 " FROM pg_catalog.pg_class c" 674 " INNER JOIN pg_catalog.pg_namespace n" 675 " ON (c.relnamespace = n.oid)" 676 " WHERE n.nspname = %s" 677 " AND c.relname = %s" 678 " AND c.relkind = 'r'", 679 quote_literal_cstr(nspname), 680 quote_literal_cstr(relname)); 681 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 682 lengthof(tableRow), tableRow); 683 684 if (res->status != WALRCV_OK_TUPLES) 685 ereport(ERROR, 686 (errmsg("could not fetch table info for table \"%s.%s\" from publisher: %s", 687 nspname, relname, res->err))); 688 689 slot = MakeSingleTupleTableSlot(res->tupledesc); 690 if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) 691 ereport(ERROR, 692 (errmsg("table \"%s.%s\" not found on publisher", 693 nspname, relname))); 694 695 lrel->remoteid = DatumGetObjectId(slot_getattr(slot, 1, &isnull)); 696 Assert(!isnull); 697 lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull)); 698 Assert(!isnull); 699 700 ExecDropSingleTupleTableSlot(slot); 701 walrcv_clear_result(res); 702 703 /* Now fetch columns. */ 704 resetStringInfo(&cmd); 705 appendStringInfo(&cmd, 706 "SELECT a.attname," 707 " a.atttypid," 708 " a.atttypmod," 709 " a.attnum = ANY(i.indkey)" 710 " FROM pg_catalog.pg_attribute a" 711 " LEFT JOIN pg_catalog.pg_index i" 712 " ON (i.indexrelid = pg_get_replica_identity_index(%u))" 713 " WHERE a.attnum > 0::pg_catalog.int2" 714 " AND NOT a.attisdropped" 715 " AND a.attrelid = %u" 716 " ORDER BY a.attnum", 717 lrel->remoteid, lrel->remoteid); 718 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 719 lengthof(attrRow), attrRow); 720 721 if (res->status != WALRCV_OK_TUPLES) 722 ereport(ERROR, 723 (errmsg("could not fetch table info for table \"%s.%s\": %s", 724 nspname, relname, res->err))); 725 726 /* We don't know the number of rows coming, so allocate enough space. 
*/ 727 lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *)); 728 lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid)); 729 lrel->attkeys = NULL; 730 731 natt = 0; 732 slot = MakeSingleTupleTableSlot(res->tupledesc); 733 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) 734 { 735 lrel->attnames[natt] = 736 TextDatumGetCString(slot_getattr(slot, 1, &isnull)); 737 Assert(!isnull); 738 lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull)); 739 Assert(!isnull); 740 if (DatumGetBool(slot_getattr(slot, 4, &isnull))) 741 lrel->attkeys = bms_add_member(lrel->attkeys, natt); 742 743 /* Should never happen. */ 744 if (++natt >= MaxTupleAttributeNumber) 745 elog(ERROR, "too many columns in remote table \"%s.%s\"", 746 nspname, relname); 747 748 ExecClearTuple(slot); 749 } 750 ExecDropSingleTupleTableSlot(slot); 751 752 lrel->natts = natt; 753 754 walrcv_clear_result(res); 755 pfree(cmd.data); 756 } 757 758 /* 759 * Copy existing data of a table from publisher. 760 * 761 * Caller is responsible for locking the local relation. 762 */ 763 static void 764 copy_table(Relation rel) 765 { 766 LogicalRepRelMapEntry *relmapentry; 767 LogicalRepRelation lrel; 768 WalRcvExecResult *res; 769 StringInfoData cmd; 770 CopyState cstate; 771 List *attnamelist; 772 ParseState *pstate; 773 774 /* Get the publisher relation info. */ 775 fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), 776 RelationGetRelationName(rel), &lrel); 777 778 /* Put the relation into relmap. */ 779 logicalrep_relmap_update(&lrel); 780 781 /* Map the publisher relation to local one. */ 782 relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); 783 Assert(rel == relmapentry->localrel); 784 785 /* Start copy on the publisher. 
*/ 786 initStringInfo(&cmd); 787 appendStringInfo(&cmd, "COPY %s TO STDOUT", 788 quote_qualified_identifier(lrel.nspname, lrel.relname)); 789 res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL); 790 pfree(cmd.data); 791 if (res->status != WALRCV_OK_COPY_OUT) 792 ereport(ERROR, 793 (errmsg("could not start initial contents copy for table \"%s.%s\": %s", 794 lrel.nspname, lrel.relname, res->err))); 795 walrcv_clear_result(res); 796 797 copybuf = makeStringInfo(); 798 799 pstate = make_parsestate(NULL); 800 addRangeTableEntryForRelation(pstate, rel, NULL, false, false); 801 802 attnamelist = make_copy_attnamelist(relmapentry); 803 cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL); 804 805 /* Do the copy */ 806 (void) CopyFrom(cstate); 807 808 logicalrep_rel_close(relmapentry, NoLock); 809 } 810 811 /* 812 * Start syncing the table in the sync worker. 813 * 814 * The returned slot name is palloc'ed in current memory context. 815 */ 816 char * 817 LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) 818 { 819 char *slotname; 820 char *err; 821 char relstate; 822 XLogRecPtr relstate_lsn; 823 824 /* Check the state of the table synchronization. */ 825 StartTransactionCommand(); 826 relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid, 827 MyLogicalRepWorker->relid, 828 &relstate_lsn, true); 829 CommitTransactionCommand(); 830 831 SpinLockAcquire(&MyLogicalRepWorker->relmutex); 832 MyLogicalRepWorker->relstate = relstate; 833 MyLogicalRepWorker->relstate_lsn = relstate_lsn; 834 SpinLockRelease(&MyLogicalRepWorker->relmutex); 835 836 /* 837 * To build a slot name for the sync work, we are limited to NAMEDATALEN - 838 * 1 characters. We cut the original slot name to NAMEDATALEN - 28 chars 839 * and append _%u_sync_%u (1 + 10 + 6 + 10 + '\0'). (It's actually the 840 * NAMEDATALEN on the remote that matters, but this scheme will also work 841 * reasonably if that is different.) 
842 */ 843 StaticAssertStmt(NAMEDATALEN >= 32, "NAMEDATALEN too small"); /* for sanity */ 844 slotname = psprintf("%.*s_%u_sync_%u", 845 NAMEDATALEN - 28, 846 MySubscription->slotname, 847 MySubscription->oid, 848 MyLogicalRepWorker->relid); 849 850 /* 851 * Here we use the slot name instead of the subscription name as the 852 * application_name, so that it is different from the main apply worker, 853 * so that synchronous replication can distinguish them. 854 */ 855 LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true, 856 slotname, &err); 857 if (LogRepWorkerWalRcvConn == NULL) 858 ereport(ERROR, 859 (errmsg("could not connect to the publisher: %s", err))); 860 861 switch (MyLogicalRepWorker->relstate) 862 { 863 case SUBREL_STATE_INIT: 864 case SUBREL_STATE_DATASYNC: 865 { 866 Relation rel; 867 WalRcvExecResult *res; 868 869 SpinLockAcquire(&MyLogicalRepWorker->relmutex); 870 MyLogicalRepWorker->relstate = SUBREL_STATE_DATASYNC; 871 MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr; 872 SpinLockRelease(&MyLogicalRepWorker->relmutex); 873 874 /* Update the state and make it visible to others. */ 875 StartTransactionCommand(); 876 UpdateSubscriptionRelState(MyLogicalRepWorker->subid, 877 MyLogicalRepWorker->relid, 878 MyLogicalRepWorker->relstate, 879 MyLogicalRepWorker->relstate_lsn); 880 CommitTransactionCommand(); 881 pgstat_report_stat(false); 882 883 /* 884 * We want to do the table data sync in a single transaction. 885 */ 886 StartTransactionCommand(); 887 888 /* 889 * Use a standard write lock here. It might be better to 890 * disallow access to the table while it's being synchronized. 891 * But we don't want to block the main apply process from 892 * working and it has to open the relation in RowExclusiveLock 893 * when remapping remote relation id to local one. 894 */ 895 rel = heap_open(MyLogicalRepWorker->relid, RowExclusiveLock); 896 897 /* 898 * Create a temporary slot for the sync process. 
We do this 899 * inside the transaction so that we can use the snapshot made 900 * by the slot to get existing data. 901 */ 902 res = walrcv_exec(LogRepWorkerWalRcvConn, 903 "BEGIN READ ONLY ISOLATION LEVEL " 904 "REPEATABLE READ", 0, NULL); 905 if (res->status != WALRCV_OK_COMMAND) 906 ereport(ERROR, 907 (errmsg("table copy could not start transaction on publisher"), 908 errdetail("The error was: %s", res->err))); 909 walrcv_clear_result(res); 910 911 /* 912 * Create new temporary logical decoding slot. 913 * 914 * We'll use slot for data copy so make sure the snapshot is 915 * used for the transaction; that way the COPY will get data 916 * that is consistent with the lsn used by the slot to start 917 * decoding. 918 */ 919 walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, true, 920 CRS_USE_SNAPSHOT, origin_startpos); 921 922 PushActiveSnapshot(GetTransactionSnapshot()); 923 copy_table(rel); 924 PopActiveSnapshot(); 925 926 res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL); 927 if (res->status != WALRCV_OK_COMMAND) 928 ereport(ERROR, 929 (errmsg("table copy could not finish transaction on publisher"), 930 errdetail("The error was: %s", res->err))); 931 walrcv_clear_result(res); 932 933 heap_close(rel, NoLock); 934 935 /* Make the copy visible. */ 936 CommandCounterIncrement(); 937 938 /* 939 * We are done with the initial data synchronization, update 940 * the state. 941 */ 942 SpinLockAcquire(&MyLogicalRepWorker->relmutex); 943 MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCWAIT; 944 MyLogicalRepWorker->relstate_lsn = *origin_startpos; 945 SpinLockRelease(&MyLogicalRepWorker->relmutex); 946 947 /* Wait for main apply worker to tell us to catchup. */ 948 wait_for_worker_state_change(SUBREL_STATE_CATCHUP); 949 950 /*---------- 951 * There are now two possible states here: 952 * a) Sync is behind the apply. If that's the case we need to 953 * catch up with it by consuming the logical replication 954 * stream up to the relstate_lsn. 
For that, we exit this 955 * function and continue in ApplyWorkerMain(). 956 * b) Sync is caught up with the apply. So it can just set 957 * the state to SYNCDONE and finish. 958 *---------- 959 */ 960 if (*origin_startpos >= MyLogicalRepWorker->relstate_lsn) 961 { 962 /* 963 * Update the new state in catalog. No need to bother 964 * with the shmem state as we are exiting for good. 965 */ 966 UpdateSubscriptionRelState(MyLogicalRepWorker->subid, 967 MyLogicalRepWorker->relid, 968 SUBREL_STATE_SYNCDONE, 969 *origin_startpos); 970 finish_sync_worker(); 971 } 972 break; 973 } 974 case SUBREL_STATE_SYNCDONE: 975 case SUBREL_STATE_READY: 976 case SUBREL_STATE_UNKNOWN: 977 978 /* 979 * Nothing to do here but finish. (UNKNOWN means the relation was 980 * removed from pg_subscription_rel before the sync worker could 981 * start.) 982 */ 983 finish_sync_worker(); 984 break; 985 default: 986 elog(ERROR, "unknown relation state \"%c\"", 987 MyLogicalRepWorker->relstate); 988 } 989 990 return slotname; 991 } 992