/*-------------------------------------------------------------------------
 *
 * parallel.c
 *
 *	Parallel support for pg_dump and pg_restore
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *		src/bin/pg_dump/parallel.c
 *
 *-------------------------------------------------------------------------
 */

/*
 * Parallel operation works like this:
 *
 * The original, master process calls ParallelBackupStart(), which forks off
 * the desired number of worker processes, which each enter WaitForCommands().
 *
 * The master process dispatches an individual work item to one of the worker
 * processes in DispatchJobForTocEntry().  We send a command string such as
 * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
 * The worker process receives and decodes the command and passes it to the
 * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
 * which are routines of the current archive format.  That routine performs
 * the required action (dump or restore) and returns an integer status code.
 * This is passed back to the master, where we pass it to the
 * ParallelCompletionPtr callback function that was passed to
 * DispatchJobForTocEntry().  The callback function does state updating
 * for the master control logic in pg_backup_archiver.c.
 *
 * In principle additional archive-format-specific information might be needed
 * in commands or worker status responses, but so far that hasn't proved
 * necessary, since workers have full copies of the ArchiveHandle/TocEntry
 * data structures.  Remember that we have forked off the workers only after
 * we have read in the catalog.  That's why our worker processes can also
 * access the catalog information.  (In the Windows case, the workers are
 * threads in the same process.
To avoid problems, they work with cloned 41 * copies of the Archive data structure; see RunWorker().) 42 * 43 * In the master process, the workerStatus field for each worker has one of 44 * the following values: 45 * WRKR_NOT_STARTED: we've not yet forked this worker 46 * WRKR_IDLE: it's waiting for a command 47 * WRKR_WORKING: it's working on a command 48 * WRKR_TERMINATED: process ended 49 * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING 50 * state, and must be NULL in other states. 51 */ 52 53 #include "postgres_fe.h" 54 55 #ifndef WIN32 56 #include <sys/wait.h> 57 #include <signal.h> 58 #include <unistd.h> 59 #include <fcntl.h> 60 #endif 61 #ifdef HAVE_SYS_SELECT_H 62 #include <sys/select.h> 63 #endif 64 65 #include "parallel.h" 66 #include "pg_backup_utils.h" 67 68 #include "fe_utils/string_utils.h" 69 #include "port/pg_bswap.h" 70 71 /* Mnemonic macros for indexing the fd array returned by pipe(2) */ 72 #define PIPE_READ 0 73 #define PIPE_WRITE 1 74 75 #define NO_SLOT (-1) /* Failure result for GetIdleWorker() */ 76 77 /* Worker process statuses */ 78 typedef enum 79 { 80 WRKR_NOT_STARTED = 0, 81 WRKR_IDLE, 82 WRKR_WORKING, 83 WRKR_TERMINATED 84 } T_WorkerStatus; 85 86 #define WORKER_IS_RUNNING(workerStatus) \ 87 ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING) 88 89 /* 90 * Private per-parallel-worker state (typedef for this is in parallel.h). 91 * 92 * Much of this is valid only in the master process (or, on Windows, should 93 * be touched only by the master thread). But the AH field should be touched 94 * only by workers. The pipe descriptors are valid everywhere. 
95 */ 96 struct ParallelSlot 97 { 98 T_WorkerStatus workerStatus; /* see enum above */ 99 100 /* These fields are valid if workerStatus == WRKR_WORKING: */ 101 ParallelCompletionPtr callback; /* function to call on completion */ 102 void *callback_data; /* passthrough data for it */ 103 104 ArchiveHandle *AH; /* Archive data worker is using */ 105 106 int pipeRead; /* master's end of the pipes */ 107 int pipeWrite; 108 int pipeRevRead; /* child's end of the pipes */ 109 int pipeRevWrite; 110 111 /* Child process/thread identity info: */ 112 #ifdef WIN32 113 uintptr_t hThread; 114 unsigned int threadId; 115 #else 116 pid_t pid; 117 #endif 118 }; 119 120 #ifdef WIN32 121 122 /* 123 * Structure to hold info passed by _beginthreadex() to the function it calls 124 * via its single allowed argument. 125 */ 126 typedef struct 127 { 128 ArchiveHandle *AH; /* master database connection */ 129 ParallelSlot *slot; /* this worker's parallel slot */ 130 } WorkerInfo; 131 132 /* Windows implementation of pipe access */ 133 static int pgpipe(int handles[2]); 134 static int piperead(int s, char *buf, int len); 135 #define pipewrite(a,b,c) send(a,b,c,0) 136 137 #else /* !WIN32 */ 138 139 /* Non-Windows implementation of pipe access */ 140 #define pgpipe(a) pipe(a) 141 #define piperead(a,b,c) read(a,b,c) 142 #define pipewrite(a,b,c) write(a,b,c) 143 144 #endif /* WIN32 */ 145 146 /* 147 * State info for archive_close_connection() shutdown callback. 148 */ 149 typedef struct ShutdownInformation 150 { 151 ParallelState *pstate; 152 Archive *AHX; 153 } ShutdownInformation; 154 155 static ShutdownInformation shutdown_info; 156 157 /* 158 * State info for signal handling. 159 * We assume signal_info initializes to zeroes. 160 * 161 * On Unix, myAH is the master DB connection in the master process, and the 162 * worker's own connection in worker processes. 
On Windows, we have only one 163 * instance of signal_info, so myAH is the master connection and the worker 164 * connections must be dug out of pstate->parallelSlot[]. 165 */ 166 typedef struct DumpSignalInformation 167 { 168 ArchiveHandle *myAH; /* database connection to issue cancel for */ 169 ParallelState *pstate; /* parallel state, if any */ 170 bool handler_set; /* signal handler set up in this process? */ 171 #ifndef WIN32 172 bool am_worker; /* am I a worker process? */ 173 #endif 174 } DumpSignalInformation; 175 176 static volatile DumpSignalInformation signal_info; 177 178 #ifdef WIN32 179 static CRITICAL_SECTION signal_info_lock; 180 #endif 181 182 /* 183 * Write a simple string to stderr --- must be safe in a signal handler. 184 * We ignore the write() result since there's not much we could do about it. 185 * Certain compilers make that harder than it ought to be. 186 */ 187 #define write_stderr(str) \ 188 do { \ 189 const char *str_ = (str); \ 190 int rc_; \ 191 rc_ = write(fileno(stderr), str_, strlen(str_)); \ 192 (void) rc_; \ 193 } while (0) 194 195 196 #ifdef WIN32 197 /* file-scope variables */ 198 static DWORD tls_index; 199 200 /* globally visible variables (needed by exit_nicely) */ 201 bool parallel_init_done = false; 202 DWORD mainThreadId; 203 #endif /* WIN32 */ 204 205 /* Local function prototypes */ 206 static ParallelSlot *GetMyPSlot(ParallelState *pstate); 207 static void archive_close_connection(int code, void *arg); 208 static void ShutdownWorkersHard(ParallelState *pstate); 209 static void WaitForTerminatingWorkers(ParallelState *pstate); 210 static void setup_cancel_handler(void); 211 static void set_cancel_pstate(ParallelState *pstate); 212 static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH); 213 static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot); 214 static int GetIdleWorker(ParallelState *pstate); 215 static bool HasEveryWorkerTerminated(ParallelState *pstate); 216 static void 
lockTableForWorker(ArchiveHandle *AH, TocEntry *te); 217 static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]); 218 static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, 219 bool do_wait); 220 static char *getMessageFromMaster(int pipefd[2]); 221 static void sendMessageToMaster(int pipefd[2], const char *str); 222 static int select_loop(int maxFd, fd_set *workerset); 223 static char *getMessageFromWorker(ParallelState *pstate, 224 bool do_wait, int *worker); 225 static void sendMessageToWorker(ParallelState *pstate, 226 int worker, const char *str); 227 static char *readMessageFromPipe(int fd); 228 229 #define messageStartsWith(msg, prefix) \ 230 (strncmp(msg, prefix, strlen(prefix)) == 0) 231 232 233 /* 234 * Initialize parallel dump support --- should be called early in process 235 * startup. (Currently, this is called whether or not we intend parallel 236 * activity.) 237 */ 238 void 239 init_parallel_dump_utils(void) 240 { 241 #ifdef WIN32 242 if (!parallel_init_done) 243 { 244 WSADATA wsaData; 245 int err; 246 247 /* Prepare for threaded operation */ 248 tls_index = TlsAlloc(); 249 mainThreadId = GetCurrentThreadId(); 250 251 /* Initialize socket access */ 252 err = WSAStartup(MAKEWORD(2, 2), &wsaData); 253 if (err != 0) 254 { 255 pg_log_error("WSAStartup failed: %d", err); 256 exit_nicely(1); 257 } 258 259 parallel_init_done = true; 260 } 261 #endif 262 } 263 264 /* 265 * Find the ParallelSlot for the current worker process or thread. 266 * 267 * Returns NULL if no matching slot is found (this implies we're the master). 
268 */ 269 static ParallelSlot * 270 GetMyPSlot(ParallelState *pstate) 271 { 272 int i; 273 274 for (i = 0; i < pstate->numWorkers; i++) 275 { 276 #ifdef WIN32 277 if (pstate->parallelSlot[i].threadId == GetCurrentThreadId()) 278 #else 279 if (pstate->parallelSlot[i].pid == getpid()) 280 #endif 281 return &(pstate->parallelSlot[i]); 282 } 283 284 return NULL; 285 } 286 287 /* 288 * A thread-local version of getLocalPQExpBuffer(). 289 * 290 * Non-reentrant but reduces memory leakage: we'll consume one buffer per 291 * thread, which is much better than one per fmtId/fmtQualifiedId call. 292 */ 293 #ifdef WIN32 294 static PQExpBuffer 295 getThreadLocalPQExpBuffer(void) 296 { 297 /* 298 * The Tls code goes awry if we use a static var, so we provide for both 299 * static and auto, and omit any use of the static var when using Tls. We 300 * rely on TlsGetValue() to return 0 if the value is not yet set. 301 */ 302 static PQExpBuffer s_id_return = NULL; 303 PQExpBuffer id_return; 304 305 if (parallel_init_done) 306 id_return = (PQExpBuffer) TlsGetValue(tls_index); 307 else 308 id_return = s_id_return; 309 310 if (id_return) /* first time through? */ 311 { 312 /* same buffer, just wipe contents */ 313 resetPQExpBuffer(id_return); 314 } 315 else 316 { 317 /* new buffer */ 318 id_return = createPQExpBuffer(); 319 if (parallel_init_done) 320 TlsSetValue(tls_index, id_return); 321 else 322 s_id_return = id_return; 323 } 324 325 return id_return; 326 } 327 #endif /* WIN32 */ 328 329 /* 330 * pg_dump and pg_restore call this to register the cleanup handler 331 * as soon as they've created the ArchiveHandle. 332 */ 333 void 334 on_exit_close_archive(Archive *AHX) 335 { 336 shutdown_info.AHX = AHX; 337 on_exit_nicely(archive_close_connection, &shutdown_info); 338 } 339 340 /* 341 * on_exit_nicely handler for shutting down database connections and 342 * worker processes cleanly. 
343 */ 344 static void 345 archive_close_connection(int code, void *arg) 346 { 347 ShutdownInformation *si = (ShutdownInformation *) arg; 348 349 if (si->pstate) 350 { 351 /* In parallel mode, must figure out who we are */ 352 ParallelSlot *slot = GetMyPSlot(si->pstate); 353 354 if (!slot) 355 { 356 /* 357 * We're the master. Forcibly shut down workers, then close our 358 * own database connection, if any. 359 */ 360 ShutdownWorkersHard(si->pstate); 361 362 if (si->AHX) 363 DisconnectDatabase(si->AHX); 364 } 365 else 366 { 367 /* 368 * We're a worker. Shut down our own DB connection if any. On 369 * Windows, we also have to close our communication sockets, to 370 * emulate what will happen on Unix when the worker process exits. 371 * (Without this, if this is a premature exit, the master would 372 * fail to detect it because there would be no EOF condition on 373 * the other end of the pipe.) 374 */ 375 if (slot->AH) 376 DisconnectDatabase(&(slot->AH->public)); 377 378 #ifdef WIN32 379 closesocket(slot->pipeRevRead); 380 closesocket(slot->pipeRevWrite); 381 #endif 382 } 383 } 384 else 385 { 386 /* Non-parallel operation: just kill the master DB connection */ 387 if (si->AHX) 388 DisconnectDatabase(si->AHX); 389 } 390 } 391 392 /* 393 * Forcibly shut down any remaining workers, waiting for them to finish. 394 * 395 * Note that we don't expect to come here during normal exit (the workers 396 * should be long gone, and the ParallelState too). We're only here in a 397 * fatal() situation, so intervening to cancel active commands is 398 * appropriate. 399 */ 400 static void 401 ShutdownWorkersHard(ParallelState *pstate) 402 { 403 int i; 404 405 /* 406 * Close our write end of the sockets so that any workers waiting for 407 * commands know they can exit. (Note: some of the pipeWrite fields might 408 * still be zero, if we failed to initialize all the workers. Hence, just 409 * ignore errors here.) 
410 */ 411 for (i = 0; i < pstate->numWorkers; i++) 412 closesocket(pstate->parallelSlot[i].pipeWrite); 413 414 /* 415 * Force early termination of any commands currently in progress. 416 */ 417 #ifndef WIN32 418 /* On non-Windows, send SIGTERM to each worker process. */ 419 for (i = 0; i < pstate->numWorkers; i++) 420 { 421 pid_t pid = pstate->parallelSlot[i].pid; 422 423 if (pid != 0) 424 kill(pid, SIGTERM); 425 } 426 #else 427 428 /* 429 * On Windows, send query cancels directly to the workers' backends. Use 430 * a critical section to ensure worker threads don't change state. 431 */ 432 EnterCriticalSection(&signal_info_lock); 433 for (i = 0; i < pstate->numWorkers; i++) 434 { 435 ArchiveHandle *AH = pstate->parallelSlot[i].AH; 436 char errbuf[1]; 437 438 if (AH != NULL && AH->connCancel != NULL) 439 (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf)); 440 } 441 LeaveCriticalSection(&signal_info_lock); 442 #endif 443 444 /* Now wait for them to terminate. */ 445 WaitForTerminatingWorkers(pstate); 446 } 447 448 /* 449 * Wait for all workers to terminate. 
450 */ 451 static void 452 WaitForTerminatingWorkers(ParallelState *pstate) 453 { 454 while (!HasEveryWorkerTerminated(pstate)) 455 { 456 ParallelSlot *slot = NULL; 457 int j; 458 459 #ifndef WIN32 460 /* On non-Windows, use wait() to wait for next worker to end */ 461 int status; 462 pid_t pid = wait(&status); 463 464 /* Find dead worker's slot, and clear the PID field */ 465 for (j = 0; j < pstate->numWorkers; j++) 466 { 467 slot = &(pstate->parallelSlot[j]); 468 if (slot->pid == pid) 469 { 470 slot->pid = 0; 471 break; 472 } 473 } 474 #else /* WIN32 */ 475 /* On Windows, we must use WaitForMultipleObjects() */ 476 HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers); 477 int nrun = 0; 478 DWORD ret; 479 uintptr_t hThread; 480 481 for (j = 0; j < pstate->numWorkers; j++) 482 { 483 if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus)) 484 { 485 lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread; 486 nrun++; 487 } 488 } 489 ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE); 490 Assert(ret != WAIT_FAILED); 491 hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0]; 492 free(lpHandles); 493 494 /* Find dead worker's slot, and clear the hThread field */ 495 for (j = 0; j < pstate->numWorkers; j++) 496 { 497 slot = &(pstate->parallelSlot[j]); 498 if (slot->hThread == hThread) 499 { 500 /* For cleanliness, close handles for dead threads */ 501 CloseHandle((HANDLE) slot->hThread); 502 slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE; 503 break; 504 } 505 } 506 #endif /* WIN32 */ 507 508 /* On all platforms, update workerStatus and te[] as well */ 509 Assert(j < pstate->numWorkers); 510 slot->workerStatus = WRKR_TERMINATED; 511 pstate->te[j] = NULL; 512 } 513 } 514 515 516 /* 517 * Code for responding to cancel interrupts (SIGINT, control-C, etc) 518 * 519 * This doesn't quite belong in this module, but it needs access to the 520 * ParallelState data, so there's not really a better place either. 
521 * 522 * When we get a cancel interrupt, we could just die, but in pg_restore that 523 * could leave a SQL command (e.g., CREATE INDEX on a large table) running 524 * for a long time. Instead, we try to send a cancel request and then die. 525 * pg_dump probably doesn't really need this, but we might as well use it 526 * there too. Note that sending the cancel directly from the signal handler 527 * is safe because PQcancel() is written to make it so. 528 * 529 * In parallel operation on Unix, each process is responsible for canceling 530 * its own connection (this must be so because nobody else has access to it). 531 * Furthermore, the master process should attempt to forward its signal to 532 * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't 533 * needed because typing control-C at the console would deliver SIGINT to 534 * every member of the terminal process group --- but in other scenarios it 535 * might be that only the master gets signaled. 536 * 537 * On Windows, the cancel handler runs in a separate thread, because that's 538 * how SetConsoleCtrlHandler works. We make it stop worker threads, send 539 * cancels on all active connections, and then return FALSE, which will allow 540 * the process to die. For safety's sake, we use a critical section to 541 * protect the PGcancel structures against being changed while the signal 542 * thread runs. 543 */ 544 545 #ifndef WIN32 546 547 /* 548 * Signal handler (Unix only) 549 */ 550 static void 551 sigTermHandler(SIGNAL_ARGS) 552 { 553 int i; 554 char errbuf[1]; 555 556 /* 557 * Some platforms allow delivery of new signals to interrupt an active 558 * signal handler. That could muck up our attempt to send PQcancel, so 559 * disable the signals that setup_cancel_handler enabled. 560 */ 561 pqsignal(SIGINT, SIG_IGN); 562 pqsignal(SIGTERM, SIG_IGN); 563 pqsignal(SIGQUIT, SIG_IGN); 564 565 /* 566 * If we're in the master, forward signal to all workers. 
(It seems best 567 * to do this before PQcancel; killing the master transaction will result 568 * in invalid-snapshot errors from active workers, which maybe we can 569 * quiet by killing workers first.) Ignore any errors. 570 */ 571 if (signal_info.pstate != NULL) 572 { 573 for (i = 0; i < signal_info.pstate->numWorkers; i++) 574 { 575 pid_t pid = signal_info.pstate->parallelSlot[i].pid; 576 577 if (pid != 0) 578 kill(pid, SIGTERM); 579 } 580 } 581 582 /* 583 * Send QueryCancel if we have a connection to send to. Ignore errors, 584 * there's not much we can do about them anyway. 585 */ 586 if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL) 587 (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf)); 588 589 /* 590 * Report we're quitting, using nothing more complicated than write(2). 591 * When in parallel operation, only the master process should do this. 592 */ 593 if (!signal_info.am_worker) 594 { 595 if (progname) 596 { 597 write_stderr(progname); 598 write_stderr(": "); 599 } 600 write_stderr("terminated by user\n"); 601 } 602 603 /* 604 * And die, using _exit() not exit() because the latter will invoke atexit 605 * handlers that can fail if we interrupted related code. 606 */ 607 _exit(1); 608 } 609 610 /* 611 * Enable cancel interrupt handler, if not already done. 612 */ 613 static void 614 setup_cancel_handler(void) 615 { 616 /* 617 * When forking, signal_info.handler_set will propagate into the new 618 * process, but that's fine because the signal handler state does too. 619 */ 620 if (!signal_info.handler_set) 621 { 622 signal_info.handler_set = true; 623 624 pqsignal(SIGINT, sigTermHandler); 625 pqsignal(SIGTERM, sigTermHandler); 626 pqsignal(SIGQUIT, sigTermHandler); 627 } 628 } 629 630 #else /* WIN32 */ 631 632 /* 633 * Console interrupt handler --- runs in a newly-started thread. 
634 * 635 * After stopping other threads and sending cancel requests on all open 636 * connections, we return FALSE which will allow the default ExitProcess() 637 * action to be taken. 638 */ 639 static BOOL WINAPI 640 consoleHandler(DWORD dwCtrlType) 641 { 642 int i; 643 char errbuf[1]; 644 645 if (dwCtrlType == CTRL_C_EVENT || 646 dwCtrlType == CTRL_BREAK_EVENT) 647 { 648 /* Critical section prevents changing data we look at here */ 649 EnterCriticalSection(&signal_info_lock); 650 651 /* 652 * If in parallel mode, stop worker threads and send QueryCancel to 653 * their connected backends. The main point of stopping the worker 654 * threads is to keep them from reporting the query cancels as errors, 655 * which would clutter the user's screen. We needn't stop the master 656 * thread since it won't be doing much anyway. Do this before 657 * canceling the main transaction, else we might get invalid-snapshot 658 * errors reported before we can stop the workers. Ignore errors, 659 * there's not much we can do about them anyway. 660 */ 661 if (signal_info.pstate != NULL) 662 { 663 for (i = 0; i < signal_info.pstate->numWorkers; i++) 664 { 665 ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]); 666 ArchiveHandle *AH = slot->AH; 667 HANDLE hThread = (HANDLE) slot->hThread; 668 669 /* 670 * Using TerminateThread here may leave some resources leaked, 671 * but it doesn't matter since we're about to end the whole 672 * process. 673 */ 674 if (hThread != INVALID_HANDLE_VALUE) 675 TerminateThread(hThread, 0); 676 677 if (AH != NULL && AH->connCancel != NULL) 678 (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf)); 679 } 680 } 681 682 /* 683 * Send QueryCancel to master connection, if enabled. Ignore errors, 684 * there's not much we can do about them anyway. 
685 */ 686 if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL) 687 (void) PQcancel(signal_info.myAH->connCancel, 688 errbuf, sizeof(errbuf)); 689 690 LeaveCriticalSection(&signal_info_lock); 691 692 /* 693 * Report we're quitting, using nothing more complicated than 694 * write(2). (We might be able to get away with using pg_log_*() 695 * here, but since we terminated other threads uncleanly above, it 696 * seems better to assume as little as possible.) 697 */ 698 if (progname) 699 { 700 write_stderr(progname); 701 write_stderr(": "); 702 } 703 write_stderr("terminated by user\n"); 704 } 705 706 /* Always return FALSE to allow signal handling to continue */ 707 return FALSE; 708 } 709 710 /* 711 * Enable cancel interrupt handler, if not already done. 712 */ 713 static void 714 setup_cancel_handler(void) 715 { 716 if (!signal_info.handler_set) 717 { 718 signal_info.handler_set = true; 719 720 InitializeCriticalSection(&signal_info_lock); 721 722 SetConsoleCtrlHandler(consoleHandler, TRUE); 723 } 724 } 725 726 #endif /* WIN32 */ 727 728 729 /* 730 * set_archive_cancel_info 731 * 732 * Fill AH->connCancel with cancellation info for the specified database 733 * connection; or clear it if conn is NULL. 734 */ 735 void 736 set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn) 737 { 738 PGcancel *oldConnCancel; 739 740 /* 741 * Activate the interrupt handler if we didn't yet in this process. On 742 * Windows, this also initializes signal_info_lock; therefore it's 743 * important that this happen at least once before we fork off any 744 * threads. 745 */ 746 setup_cancel_handler(); 747 748 /* 749 * On Unix, we assume that storing a pointer value is atomic with respect 750 * to any possible signal interrupt. On Windows, use a critical section. 
751 */ 752 753 #ifdef WIN32 754 EnterCriticalSection(&signal_info_lock); 755 #endif 756 757 /* Free the old one if we have one */ 758 oldConnCancel = AH->connCancel; 759 /* be sure interrupt handler doesn't use pointer while freeing */ 760 AH->connCancel = NULL; 761 762 if (oldConnCancel != NULL) 763 PQfreeCancel(oldConnCancel); 764 765 /* Set the new one if specified */ 766 if (conn) 767 AH->connCancel = PQgetCancel(conn); 768 769 /* 770 * On Unix, there's only ever one active ArchiveHandle per process, so we 771 * can just set signal_info.myAH unconditionally. On Windows, do that 772 * only in the main thread; worker threads have to make sure their 773 * ArchiveHandle appears in the pstate data, which is dealt with in 774 * RunWorker(). 775 */ 776 #ifndef WIN32 777 signal_info.myAH = AH; 778 #else 779 if (mainThreadId == GetCurrentThreadId()) 780 signal_info.myAH = AH; 781 #endif 782 783 #ifdef WIN32 784 LeaveCriticalSection(&signal_info_lock); 785 #endif 786 } 787 788 /* 789 * set_cancel_pstate 790 * 791 * Set signal_info.pstate to point to the specified ParallelState, if any. 792 * We need this mainly to have an interlock against Windows signal thread. 793 */ 794 static void 795 set_cancel_pstate(ParallelState *pstate) 796 { 797 #ifdef WIN32 798 EnterCriticalSection(&signal_info_lock); 799 #endif 800 801 signal_info.pstate = pstate; 802 803 #ifdef WIN32 804 LeaveCriticalSection(&signal_info_lock); 805 #endif 806 } 807 808 /* 809 * set_cancel_slot_archive 810 * 811 * Set ParallelSlot's AH field to point to the specified archive, if any. 812 * We need this mainly to have an interlock against Windows signal thread. 
813 */ 814 static void 815 set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH) 816 { 817 #ifdef WIN32 818 EnterCriticalSection(&signal_info_lock); 819 #endif 820 821 slot->AH = AH; 822 823 #ifdef WIN32 824 LeaveCriticalSection(&signal_info_lock); 825 #endif 826 } 827 828 829 /* 830 * This function is called by both Unix and Windows variants to set up 831 * and run a worker process. Caller should exit the process (or thread) 832 * upon return. 833 */ 834 static void 835 RunWorker(ArchiveHandle *AH, ParallelSlot *slot) 836 { 837 int pipefd[2]; 838 839 /* fetch child ends of pipes */ 840 pipefd[PIPE_READ] = slot->pipeRevRead; 841 pipefd[PIPE_WRITE] = slot->pipeRevWrite; 842 843 /* 844 * Clone the archive so that we have our own state to work with, and in 845 * particular our own database connection. 846 * 847 * We clone on Unix as well as Windows, even though technically we don't 848 * need to because fork() gives us a copy in our own address space 849 * already. But CloneArchive resets the state information and also clones 850 * the database connection which both seem kinda helpful. 851 */ 852 AH = CloneArchive(AH); 853 854 /* Remember cloned archive where signal handler can find it */ 855 set_cancel_slot_archive(slot, AH); 856 857 /* 858 * Call the setup worker function that's defined in the ArchiveHandle. 859 */ 860 (AH->SetupWorkerPtr) ((Archive *) AH); 861 862 /* 863 * Execute commands until done. 864 */ 865 WaitForCommands(AH, pipefd); 866 867 /* 868 * Disconnect from database and clean up. 869 */ 870 set_cancel_slot_archive(slot, NULL); 871 DisconnectDatabase(&(AH->public)); 872 DeCloneArchive(AH); 873 } 874 875 /* 876 * Thread base function for Windows 877 */ 878 #ifdef WIN32 879 static unsigned __stdcall 880 init_spawned_worker_win32(WorkerInfo *wi) 881 { 882 ArchiveHandle *AH = wi->AH; 883 ParallelSlot *slot = wi->slot; 884 885 /* Don't need WorkerInfo anymore */ 886 free(wi); 887 888 /* Run the worker ... 
*/ 889 RunWorker(AH, slot); 890 891 /* Exit the thread */ 892 _endthreadex(0); 893 return 0; 894 } 895 #endif /* WIN32 */ 896 897 /* 898 * This function starts a parallel dump or restore by spawning off the worker 899 * processes. For Windows, it creates a number of threads; on Unix the 900 * workers are created with fork(). 901 */ 902 ParallelState * 903 ParallelBackupStart(ArchiveHandle *AH) 904 { 905 ParallelState *pstate; 906 int i; 907 908 Assert(AH->public.numWorkers > 0); 909 910 pstate = (ParallelState *) pg_malloc(sizeof(ParallelState)); 911 912 pstate->numWorkers = AH->public.numWorkers; 913 pstate->te = NULL; 914 pstate->parallelSlot = NULL; 915 916 if (AH->public.numWorkers == 1) 917 return pstate; 918 919 /* Create status arrays, being sure to initialize all fields to 0 */ 920 pstate->te = (TocEntry **) 921 pg_malloc0(pstate->numWorkers * sizeof(TocEntry *)); 922 pstate->parallelSlot = (ParallelSlot *) 923 pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot)); 924 925 #ifdef WIN32 926 /* Make fmtId() and fmtQualifiedId() use thread-local storage */ 927 getLocalPQExpBuffer = getThreadLocalPQExpBuffer; 928 #endif 929 930 /* 931 * Set the pstate in shutdown_info, to tell the exit handler that it must 932 * clean up workers as well as the main database connection. But we don't 933 * set this in signal_info yet, because we don't want child processes to 934 * inherit non-NULL signal_info.pstate. 935 */ 936 shutdown_info.pstate = pstate; 937 938 /* 939 * Temporarily disable query cancellation on the master connection. This 940 * ensures that child processes won't inherit valid AH->connCancel 941 * settings and thus won't try to issue cancels against the master's 942 * connection. No harm is done if we fail while it's disabled, because 943 * the master connection is idle at this point anyway. 
944 */ 945 set_archive_cancel_info(AH, NULL); 946 947 /* Ensure stdio state is quiesced before forking */ 948 fflush(NULL); 949 950 /* Create desired number of workers */ 951 for (i = 0; i < pstate->numWorkers; i++) 952 { 953 #ifdef WIN32 954 WorkerInfo *wi; 955 uintptr_t handle; 956 #else 957 pid_t pid; 958 #endif 959 ParallelSlot *slot = &(pstate->parallelSlot[i]); 960 int pipeMW[2], 961 pipeWM[2]; 962 963 /* Create communication pipes for this worker */ 964 if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0) 965 fatal("could not create communication channels: %m"); 966 967 /* master's ends of the pipes */ 968 slot->pipeRead = pipeWM[PIPE_READ]; 969 slot->pipeWrite = pipeMW[PIPE_WRITE]; 970 /* child's ends of the pipes */ 971 slot->pipeRevRead = pipeMW[PIPE_READ]; 972 slot->pipeRevWrite = pipeWM[PIPE_WRITE]; 973 974 #ifdef WIN32 975 /* Create transient structure to pass args to worker function */ 976 wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo)); 977 978 wi->AH = AH; 979 wi->slot = slot; 980 981 handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32, 982 wi, 0, &(slot->threadId)); 983 slot->hThread = handle; 984 slot->workerStatus = WRKR_IDLE; 985 #else /* !WIN32 */ 986 pid = fork(); 987 if (pid == 0) 988 { 989 /* we are the worker */ 990 int j; 991 992 /* this is needed for GetMyPSlot() */ 993 slot->pid = getpid(); 994 995 /* instruct signal handler that we're in a worker now */ 996 signal_info.am_worker = true; 997 998 /* close read end of Worker -> Master */ 999 closesocket(pipeWM[PIPE_READ]); 1000 /* close write end of Master -> Worker */ 1001 closesocket(pipeMW[PIPE_WRITE]); 1002 1003 /* 1004 * Close all inherited fds for communication of the master with 1005 * previously-forked workers. 1006 */ 1007 for (j = 0; j < i; j++) 1008 { 1009 closesocket(pstate->parallelSlot[j].pipeRead); 1010 closesocket(pstate->parallelSlot[j].pipeWrite); 1011 } 1012 1013 /* Run the worker ... 
*/ 1014 RunWorker(AH, slot); 1015 1016 /* We can just exit(0) when done */ 1017 exit(0); 1018 } 1019 else if (pid < 0) 1020 { 1021 /* fork failed */ 1022 fatal("could not create worker process: %m"); 1023 } 1024 1025 /* In Master after successful fork */ 1026 slot->pid = pid; 1027 slot->workerStatus = WRKR_IDLE; 1028 1029 /* close read end of Master -> Worker */ 1030 closesocket(pipeMW[PIPE_READ]); 1031 /* close write end of Worker -> Master */ 1032 closesocket(pipeWM[PIPE_WRITE]); 1033 #endif /* WIN32 */ 1034 } 1035 1036 /* 1037 * Having forked off the workers, disable SIGPIPE so that master isn't 1038 * killed if it tries to send a command to a dead worker. We don't want 1039 * the workers to inherit this setting, though. 1040 */ 1041 #ifndef WIN32 1042 pqsignal(SIGPIPE, SIG_IGN); 1043 #endif 1044 1045 /* 1046 * Re-establish query cancellation on the master connection. 1047 */ 1048 set_archive_cancel_info(AH, AH->connection); 1049 1050 /* 1051 * Tell the cancel signal handler to forward signals to worker processes, 1052 * too. (As with query cancel, we did not need this earlier because the 1053 * workers have not yet been given anything to do; if we die before this 1054 * point, any already-started workers will see EOF and quit promptly.) 1055 */ 1056 set_cancel_pstate(pstate); 1057 1058 return pstate; 1059 } 1060 1061 /* 1062 * Close down a parallel dump or restore. 
1063 */ 1064 void 1065 ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate) 1066 { 1067 int i; 1068 1069 /* No work if non-parallel */ 1070 if (pstate->numWorkers == 1) 1071 return; 1072 1073 /* There should not be any unfinished jobs */ 1074 Assert(IsEveryWorkerIdle(pstate)); 1075 1076 /* Close the sockets so that the workers know they can exit */ 1077 for (i = 0; i < pstate->numWorkers; i++) 1078 { 1079 closesocket(pstate->parallelSlot[i].pipeRead); 1080 closesocket(pstate->parallelSlot[i].pipeWrite); 1081 } 1082 1083 /* Wait for them to exit */ 1084 WaitForTerminatingWorkers(pstate); 1085 1086 /* 1087 * Unlink pstate from shutdown_info, so the exit handler will not try to 1088 * use it; and likewise unlink from signal_info. 1089 */ 1090 shutdown_info.pstate = NULL; 1091 set_cancel_pstate(NULL); 1092 1093 /* Release state (mere neatnik-ism, since we're about to terminate) */ 1094 free(pstate->te); 1095 free(pstate->parallelSlot); 1096 free(pstate); 1097 } 1098 1099 /* 1100 * These next four functions handle construction and parsing of the command 1101 * strings and response strings for parallel workers. 1102 * 1103 * Currently, these can be the same regardless of which archive format we are 1104 * processing. In future, we might want to let format modules override these 1105 * functions to add format-specific data to a command or response. 1106 */ 1107 1108 /* 1109 * buildWorkerCommand: format a command string to send to a worker. 1110 * 1111 * The string is built in the caller-supplied buffer of size buflen. 1112 */ 1113 static void 1114 buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act, 1115 char *buf, int buflen) 1116 { 1117 if (act == ACT_DUMP) 1118 snprintf(buf, buflen, "DUMP %d", te->dumpId); 1119 else if (act == ACT_RESTORE) 1120 snprintf(buf, buflen, "RESTORE %d", te->dumpId); 1121 else 1122 Assert(false); 1123 } 1124 1125 /* 1126 * parseWorkerCommand: interpret a command string in a worker. 
1127 */ 1128 static void 1129 parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act, 1130 const char *msg) 1131 { 1132 DumpId dumpId; 1133 int nBytes; 1134 1135 if (messageStartsWith(msg, "DUMP ")) 1136 { 1137 *act = ACT_DUMP; 1138 sscanf(msg, "DUMP %d%n", &dumpId, &nBytes); 1139 Assert(nBytes == strlen(msg)); 1140 *te = getTocEntryByDumpId(AH, dumpId); 1141 Assert(*te != NULL); 1142 } 1143 else if (messageStartsWith(msg, "RESTORE ")) 1144 { 1145 *act = ACT_RESTORE; 1146 sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes); 1147 Assert(nBytes == strlen(msg)); 1148 *te = getTocEntryByDumpId(AH, dumpId); 1149 Assert(*te != NULL); 1150 } 1151 else 1152 fatal("unrecognized command received from master: \"%s\"", 1153 msg); 1154 } 1155 1156 /* 1157 * buildWorkerResponse: format a response string to send to the master. 1158 * 1159 * The string is built in the caller-supplied buffer of size buflen. 1160 */ 1161 static void 1162 buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status, 1163 char *buf, int buflen) 1164 { 1165 snprintf(buf, buflen, "OK %d %d %d", 1166 te->dumpId, 1167 status, 1168 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0); 1169 } 1170 1171 /* 1172 * parseWorkerResponse: parse the status message returned by a worker. 1173 * 1174 * Returns the integer status code, and may update fields of AH and/or te. 1175 */ 1176 static int 1177 parseWorkerResponse(ArchiveHandle *AH, TocEntry *te, 1178 const char *msg) 1179 { 1180 DumpId dumpId; 1181 int nBytes, 1182 n_errors; 1183 int status = 0; 1184 1185 if (messageStartsWith(msg, "OK ")) 1186 { 1187 sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes); 1188 1189 Assert(dumpId == te->dumpId); 1190 Assert(nBytes == strlen(msg)); 1191 1192 AH->public.n_errors += n_errors; 1193 } 1194 else 1195 fatal("invalid message received from worker: \"%s\"", 1196 msg); 1197 1198 return status; 1199 } 1200 1201 /* 1202 * Dispatch a job to some free worker. 
1203 * 1204 * te is the TocEntry to be processed, act is the action to be taken on it. 1205 * callback is the function to call on completion of the job. 1206 * 1207 * If no worker is currently available, this will block, and previously 1208 * registered callback functions may be called. 1209 */ 1210 void 1211 DispatchJobForTocEntry(ArchiveHandle *AH, 1212 ParallelState *pstate, 1213 TocEntry *te, 1214 T_Action act, 1215 ParallelCompletionPtr callback, 1216 void *callback_data) 1217 { 1218 int worker; 1219 char buf[256]; 1220 1221 /* Get a worker, waiting if none are idle */ 1222 while ((worker = GetIdleWorker(pstate)) == NO_SLOT) 1223 WaitForWorkers(AH, pstate, WFW_ONE_IDLE); 1224 1225 /* Construct and send command string */ 1226 buildWorkerCommand(AH, te, act, buf, sizeof(buf)); 1227 1228 sendMessageToWorker(pstate, worker, buf); 1229 1230 /* Remember worker is busy, and which TocEntry it's working on */ 1231 pstate->parallelSlot[worker].workerStatus = WRKR_WORKING; 1232 pstate->parallelSlot[worker].callback = callback; 1233 pstate->parallelSlot[worker].callback_data = callback_data; 1234 pstate->te[worker] = te; 1235 } 1236 1237 /* 1238 * Find an idle worker and return its slot number. 1239 * Return NO_SLOT if none are idle. 1240 */ 1241 static int 1242 GetIdleWorker(ParallelState *pstate) 1243 { 1244 int i; 1245 1246 for (i = 0; i < pstate->numWorkers; i++) 1247 { 1248 if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE) 1249 return i; 1250 } 1251 return NO_SLOT; 1252 } 1253 1254 /* 1255 * Return true iff no worker is running. 1256 */ 1257 static bool 1258 HasEveryWorkerTerminated(ParallelState *pstate) 1259 { 1260 int i; 1261 1262 for (i = 0; i < pstate->numWorkers; i++) 1263 { 1264 if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus)) 1265 return false; 1266 } 1267 return true; 1268 } 1269 1270 /* 1271 * Return true iff every worker is in the WRKR_IDLE state. 
1272 */ 1273 bool 1274 IsEveryWorkerIdle(ParallelState *pstate) 1275 { 1276 int i; 1277 1278 for (i = 0; i < pstate->numWorkers; i++) 1279 { 1280 if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE) 1281 return false; 1282 } 1283 return true; 1284 } 1285 1286 /* 1287 * Acquire lock on a table to be dumped by a worker process. 1288 * 1289 * The master process is already holding an ACCESS SHARE lock. Ordinarily 1290 * it's no problem for a worker to get one too, but if anything else besides 1291 * pg_dump is running, there's a possible deadlock: 1292 * 1293 * 1) Master dumps the schema and locks all tables in ACCESS SHARE mode. 1294 * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted 1295 * because the master holds a conflicting ACCESS SHARE lock). 1296 * 3) A worker process also requests an ACCESS SHARE lock to read the table. 1297 * The worker is enqueued behind the ACCESS EXCLUSIVE lock request. 1298 * 4) Now we have a deadlock, since the master is effectively waiting for 1299 * the worker. The server cannot detect that, however. 1300 * 1301 * To prevent an infinite wait, prior to touching a table in a worker, request 1302 * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock, 1303 * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and 1304 * so we have a deadlock. We must fail the backup in that case. 
1305 */ 1306 static void 1307 lockTableForWorker(ArchiveHandle *AH, TocEntry *te) 1308 { 1309 const char *qualId; 1310 PQExpBuffer query; 1311 PGresult *res; 1312 1313 /* Nothing to do for BLOBS */ 1314 if (strcmp(te->desc, "BLOBS") == 0) 1315 return; 1316 1317 query = createPQExpBuffer(); 1318 1319 qualId = fmtQualifiedId(te->namespace, te->tag); 1320 1321 appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT", 1322 qualId); 1323 1324 res = PQexec(AH->connection, query->data); 1325 1326 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK) 1327 fatal("could not obtain lock on relation \"%s\"\n" 1328 "This usually means that someone requested an ACCESS EXCLUSIVE lock " 1329 "on the table after the pg_dump parent process had gotten the " 1330 "initial ACCESS SHARE lock on the table.", qualId); 1331 1332 PQclear(res); 1333 destroyPQExpBuffer(query); 1334 } 1335 1336 /* 1337 * WaitForCommands: main routine for a worker process. 1338 * 1339 * Read and execute commands from the master until we see EOF on the pipe. 
1340 */ 1341 static void 1342 WaitForCommands(ArchiveHandle *AH, int pipefd[2]) 1343 { 1344 char *command; 1345 TocEntry *te; 1346 T_Action act; 1347 int status = 0; 1348 char buf[256]; 1349 1350 for (;;) 1351 { 1352 if (!(command = getMessageFromMaster(pipefd))) 1353 { 1354 /* EOF, so done */ 1355 return; 1356 } 1357 1358 /* Decode the command */ 1359 parseWorkerCommand(AH, &te, &act, command); 1360 1361 if (act == ACT_DUMP) 1362 { 1363 /* Acquire lock on this table within the worker's session */ 1364 lockTableForWorker(AH, te); 1365 1366 /* Perform the dump command */ 1367 status = (AH->WorkerJobDumpPtr) (AH, te); 1368 } 1369 else if (act == ACT_RESTORE) 1370 { 1371 /* Perform the restore command */ 1372 status = (AH->WorkerJobRestorePtr) (AH, te); 1373 } 1374 else 1375 Assert(false); 1376 1377 /* Return status to master */ 1378 buildWorkerResponse(AH, te, act, status, buf, sizeof(buf)); 1379 1380 sendMessageToMaster(pipefd, buf); 1381 1382 /* command was pg_malloc'd and we are responsible for free()ing it. */ 1383 free(command); 1384 } 1385 } 1386 1387 /* 1388 * Check for status messages from workers. 1389 * 1390 * If do_wait is true, wait to get a status message; otherwise, just return 1391 * immediately if there is none available. 1392 * 1393 * When we get a status message, we pass the status code to the callback 1394 * function that was specified to DispatchJobForTocEntry, then reset the 1395 * worker status to IDLE. 1396 * 1397 * Returns true if we collected a status message, else false. 1398 * 1399 * XXX is it worth checking for more than one status message per call? 1400 * It seems somewhat unlikely that multiple workers would finish at exactly 1401 * the same time. 
1402 */ 1403 static bool 1404 ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) 1405 { 1406 int worker; 1407 char *msg; 1408 1409 /* Try to collect a status message */ 1410 msg = getMessageFromWorker(pstate, do_wait, &worker); 1411 1412 if (!msg) 1413 { 1414 /* If do_wait is true, we must have detected EOF on some socket */ 1415 if (do_wait) 1416 fatal("a worker process died unexpectedly"); 1417 return false; 1418 } 1419 1420 /* Process it and update our idea of the worker's status */ 1421 if (messageStartsWith(msg, "OK ")) 1422 { 1423 ParallelSlot *slot = &pstate->parallelSlot[worker]; 1424 TocEntry *te = pstate->te[worker]; 1425 int status; 1426 1427 status = parseWorkerResponse(AH, te, msg); 1428 slot->callback(AH, te, status, slot->callback_data); 1429 slot->workerStatus = WRKR_IDLE; 1430 pstate->te[worker] = NULL; 1431 } 1432 else 1433 fatal("invalid message received from worker: \"%s\"", 1434 msg); 1435 1436 /* Free the string returned from getMessageFromWorker */ 1437 free(msg); 1438 1439 return true; 1440 } 1441 1442 /* 1443 * Check for status results from workers, waiting if necessary. 1444 * 1445 * Available wait modes are: 1446 * WFW_NO_WAIT: reap any available status, but don't block 1447 * WFW_GOT_STATUS: wait for at least one more worker to finish 1448 * WFW_ONE_IDLE: wait for at least one worker to be idle 1449 * WFW_ALL_IDLE: wait for all workers to be idle 1450 * 1451 * Any received results are passed to the callback specified to 1452 * DispatchJobForTocEntry. 1453 * 1454 * This function is executed in the master process. 1455 */ 1456 void 1457 WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode) 1458 { 1459 bool do_wait = false; 1460 1461 /* 1462 * In GOT_STATUS mode, always block waiting for a message, since we can't 1463 * return till we get something. In other modes, we don't block the first 1464 * time through the loop. 
1465 */ 1466 if (mode == WFW_GOT_STATUS) 1467 { 1468 /* Assert that caller knows what it's doing */ 1469 Assert(!IsEveryWorkerIdle(pstate)); 1470 do_wait = true; 1471 } 1472 1473 for (;;) 1474 { 1475 /* 1476 * Check for status messages, even if we don't need to block. We do 1477 * not try very hard to reap all available messages, though, since 1478 * there's unlikely to be more than one. 1479 */ 1480 if (ListenToWorkers(AH, pstate, do_wait)) 1481 { 1482 /* 1483 * If we got a message, we are done by definition for GOT_STATUS 1484 * mode, and we can also be certain that there's at least one idle 1485 * worker. So we're done in all but ALL_IDLE mode. 1486 */ 1487 if (mode != WFW_ALL_IDLE) 1488 return; 1489 } 1490 1491 /* Check whether we must wait for new status messages */ 1492 switch (mode) 1493 { 1494 case WFW_NO_WAIT: 1495 return; /* never wait */ 1496 case WFW_GOT_STATUS: 1497 Assert(false); /* can't get here, because we waited */ 1498 break; 1499 case WFW_ONE_IDLE: 1500 if (GetIdleWorker(pstate) != NO_SLOT) 1501 return; 1502 break; 1503 case WFW_ALL_IDLE: 1504 if (IsEveryWorkerIdle(pstate)) 1505 return; 1506 break; 1507 } 1508 1509 /* Loop back, and this time wait for something to happen */ 1510 do_wait = true; 1511 } 1512 } 1513 1514 /* 1515 * Read one command message from the master, blocking if necessary 1516 * until one is available, and return it as a malloc'd string. 1517 * On EOF, return NULL. 1518 * 1519 * This function is executed in worker processes. 1520 */ 1521 static char * 1522 getMessageFromMaster(int pipefd[2]) 1523 { 1524 return readMessageFromPipe(pipefd[PIPE_READ]); 1525 } 1526 1527 /* 1528 * Send a status message to the master. 1529 * 1530 * This function is executed in worker processes. 
1531 */ 1532 static void 1533 sendMessageToMaster(int pipefd[2], const char *str) 1534 { 1535 int len = strlen(str) + 1; 1536 1537 if (pipewrite(pipefd[PIPE_WRITE], str, len) != len) 1538 fatal("could not write to the communication channel: %m"); 1539 } 1540 1541 /* 1542 * Wait until some descriptor in "workerset" becomes readable. 1543 * Returns -1 on error, else the number of readable descriptors. 1544 */ 1545 static int 1546 select_loop(int maxFd, fd_set *workerset) 1547 { 1548 int i; 1549 fd_set saveSet = *workerset; 1550 1551 for (;;) 1552 { 1553 *workerset = saveSet; 1554 i = select(maxFd + 1, workerset, NULL, NULL, NULL); 1555 1556 #ifndef WIN32 1557 if (i < 0 && errno == EINTR) 1558 continue; 1559 #else 1560 if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR) 1561 continue; 1562 #endif 1563 break; 1564 } 1565 1566 return i; 1567 } 1568 1569 1570 /* 1571 * Check for messages from worker processes. 1572 * 1573 * If a message is available, return it as a malloc'd string, and put the 1574 * index of the sending worker in *worker. 1575 * 1576 * If nothing is available, wait if "do_wait" is true, else return NULL. 1577 * 1578 * If we detect EOF on any socket, we'll return NULL. It's not great that 1579 * that's hard to distinguish from the no-data-available case, but for now 1580 * our one caller is okay with that. 1581 * 1582 * This function is executed in the master process. 
1583 */ 1584 static char * 1585 getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker) 1586 { 1587 int i; 1588 fd_set workerset; 1589 int maxFd = -1; 1590 struct timeval nowait = {0, 0}; 1591 1592 /* construct bitmap of socket descriptors for select() */ 1593 FD_ZERO(&workerset); 1594 for (i = 0; i < pstate->numWorkers; i++) 1595 { 1596 if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus)) 1597 continue; 1598 FD_SET(pstate->parallelSlot[i].pipeRead, &workerset); 1599 if (pstate->parallelSlot[i].pipeRead > maxFd) 1600 maxFd = pstate->parallelSlot[i].pipeRead; 1601 } 1602 1603 if (do_wait) 1604 { 1605 i = select_loop(maxFd, &workerset); 1606 Assert(i != 0); 1607 } 1608 else 1609 { 1610 if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0) 1611 return NULL; 1612 } 1613 1614 if (i < 0) 1615 fatal("select() failed: %m"); 1616 1617 for (i = 0; i < pstate->numWorkers; i++) 1618 { 1619 char *msg; 1620 1621 if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus)) 1622 continue; 1623 if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset)) 1624 continue; 1625 1626 /* 1627 * Read the message if any. If the socket is ready because of EOF, 1628 * we'll return NULL instead (and the socket will stay ready, so the 1629 * condition will persist). 1630 * 1631 * Note: because this is a blocking read, we'll wait if only part of 1632 * the message is available. Waiting a long time would be bad, but 1633 * since worker status messages are short and are always sent in one 1634 * operation, it shouldn't be a problem in practice. 1635 */ 1636 msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead); 1637 *worker = i; 1638 return msg; 1639 } 1640 Assert(false); 1641 return NULL; 1642 } 1643 1644 /* 1645 * Send a command message to the specified worker process. 1646 * 1647 * This function is executed in the master process. 
1648 */ 1649 static void 1650 sendMessageToWorker(ParallelState *pstate, int worker, const char *str) 1651 { 1652 int len = strlen(str) + 1; 1653 1654 if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len) 1655 { 1656 fatal("could not write to the communication channel: %m"); 1657 } 1658 } 1659 1660 /* 1661 * Read one message from the specified pipe (fd), blocking if necessary 1662 * until one is available, and return it as a malloc'd string. 1663 * On EOF, return NULL. 1664 * 1665 * A "message" on the channel is just a null-terminated string. 1666 */ 1667 static char * 1668 readMessageFromPipe(int fd) 1669 { 1670 char *msg; 1671 int msgsize, 1672 bufsize; 1673 int ret; 1674 1675 /* 1676 * In theory, if we let piperead() read multiple bytes, it might give us 1677 * back fragments of multiple messages. (That can't actually occur, since 1678 * neither master nor workers send more than one message without waiting 1679 * for a reply, but we don't wish to assume that here.) For simplicity, 1680 * read a byte at a time until we get the terminating '\0'. This method 1681 * is a bit inefficient, but since this is only used for relatively short 1682 * command and status strings, it shouldn't matter. 
1683 */ 1684 bufsize = 64; /* could be any number */ 1685 msg = (char *) pg_malloc(bufsize); 1686 msgsize = 0; 1687 for (;;) 1688 { 1689 Assert(msgsize < bufsize); 1690 ret = piperead(fd, msg + msgsize, 1); 1691 if (ret <= 0) 1692 break; /* error or connection closure */ 1693 1694 Assert(ret == 1); 1695 1696 if (msg[msgsize] == '\0') 1697 return msg; /* collected whole message */ 1698 1699 msgsize++; 1700 if (msgsize == bufsize) /* enlarge buffer if needed */ 1701 { 1702 bufsize += 16; /* could be any number */ 1703 msg = (char *) pg_realloc(msg, bufsize); 1704 } 1705 } 1706 1707 /* Other end has closed the connection */ 1708 pg_free(msg); 1709 return NULL; 1710 } 1711 1712 #ifdef WIN32 1713 1714 /* 1715 * This is a replacement version of pipe(2) for Windows which allows the pipe 1716 * handles to be used in select(). 1717 * 1718 * Reads and writes on the pipe must go through piperead()/pipewrite(). 1719 * 1720 * For consistency with Unix we declare the returned handles as "int". 1721 * This is okay even on WIN64 because system handles are not more than 1722 * 32 bits wide, but we do have to do some casting. 1723 */ 1724 static int 1725 pgpipe(int handles[2]) 1726 { 1727 pgsocket s, 1728 tmp_sock; 1729 struct sockaddr_in serv_addr; 1730 int len = sizeof(serv_addr); 1731 1732 /* We have to use the Unix socket invalid file descriptor value here. 
*/ 1733 handles[0] = handles[1] = -1; 1734 1735 /* 1736 * setup listen socket 1737 */ 1738 if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET) 1739 { 1740 pg_log_error("pgpipe: could not create socket: error code %d", 1741 WSAGetLastError()); 1742 return -1; 1743 } 1744 1745 memset((void *) &serv_addr, 0, sizeof(serv_addr)); 1746 serv_addr.sin_family = AF_INET; 1747 serv_addr.sin_port = pg_hton16(0); 1748 serv_addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK); 1749 if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR) 1750 { 1751 pg_log_error("pgpipe: could not bind: error code %d", 1752 WSAGetLastError()); 1753 closesocket(s); 1754 return -1; 1755 } 1756 if (listen(s, 1) == SOCKET_ERROR) 1757 { 1758 pg_log_error("pgpipe: could not listen: error code %d", 1759 WSAGetLastError()); 1760 closesocket(s); 1761 return -1; 1762 } 1763 if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR) 1764 { 1765 pg_log_error("pgpipe: getsockname() failed: error code %d", 1766 WSAGetLastError()); 1767 closesocket(s); 1768 return -1; 1769 } 1770 1771 /* 1772 * setup pipe handles 1773 */ 1774 if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET) 1775 { 1776 pg_log_error("pgpipe: could not create second socket: error code %d", 1777 WSAGetLastError()); 1778 closesocket(s); 1779 return -1; 1780 } 1781 handles[1] = (int) tmp_sock; 1782 1783 if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR) 1784 { 1785 pg_log_error("pgpipe: could not connect socket: error code %d", 1786 WSAGetLastError()); 1787 closesocket(handles[1]); 1788 handles[1] = -1; 1789 closesocket(s); 1790 return -1; 1791 } 1792 if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET) 1793 { 1794 pg_log_error("pgpipe: could not accept connection: error code %d", 1795 WSAGetLastError()); 1796 closesocket(handles[1]); 1797 handles[1] = -1; 1798 closesocket(s); 1799 return -1; 1800 } 1801 handles[0] = (int) tmp_sock; 1802 1803 closesocket(s); 1804 
return 0; 1805 } 1806 1807 /* 1808 * Windows implementation of reading from a pipe. 1809 */ 1810 static int 1811 piperead(int s, char *buf, int len) 1812 { 1813 int ret = recv(s, buf, len, 0); 1814 1815 if (ret < 0 && WSAGetLastError() == WSAECONNRESET) 1816 { 1817 /* EOF on the pipe! */ 1818 ret = 0; 1819 } 1820 return ret; 1821 } 1822 1823 #endif /* WIN32 */ 1824