1 /*-------------------------------------------------------------------------
2 *
3 * parallel.c
4 *
5 * Parallel support for pg_dump and pg_restore
6 *
7 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/bin/pg_dump/parallel.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16 /*
17 * Parallel operation works like this:
18 *
19 * The original, leader process calls ParallelBackupStart(), which forks off
20 * the desired number of worker processes, which each enter WaitForCommands().
21 *
22 * The leader process dispatches an individual work item to one of the worker
23 * processes in DispatchJobForTocEntry(). We send a command string such as
24 * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
25 * The worker process receives and decodes the command and passes it to the
26 * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
27 * which are routines of the current archive format. That routine performs
28 * the required action (dump or restore) and returns an integer status code.
29 * This is passed back to the leader where we pass it to the
30 * ParallelCompletionPtr callback function that was passed to
31 * DispatchJobForTocEntry(). The callback function does state updating
32 * for the leader control logic in pg_backup_archiver.c.
33 *
34 * In principle additional archive-format-specific information might be needed
35 * in commands or worker status responses, but so far that hasn't proved
36 * necessary, since workers have full copies of the ArchiveHandle/TocEntry
37 * data structures. Remember that we have forked off the workers only after
38 * we have read in the catalog. That's why our worker processes can also
39 * access the catalog information. (In the Windows case, the workers are
40 * threads in the same process. To avoid problems, they work with cloned
41 * copies of the Archive data structure; see RunWorker().)
42 *
43 * In the leader process, the workerStatus field for each worker has one of
44 * the following values:
45 * WRKR_NOT_STARTED: we've not yet forked this worker
46 * WRKR_IDLE: it's waiting for a command
47 * WRKR_WORKING: it's working on a command
48 * WRKR_TERMINATED: process ended
49 * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
50 * state, and must be NULL in other states.
51 */
52
53 #include "postgres_fe.h"
54
55 #ifndef WIN32
56 #include <sys/wait.h>
57 #include <signal.h>
58 #include <unistd.h>
59 #include <fcntl.h>
60 #endif
61 #ifdef HAVE_SYS_SELECT_H
62 #include <sys/select.h>
63 #endif
64
65 #include "fe_utils/string_utils.h"
66 #include "parallel.h"
67 #include "pg_backup_utils.h"
68 #include "port/pg_bswap.h"
69
70 /* Mnemonic macros for indexing the fd array returned by pipe(2) */
71 #define PIPE_READ 0
72 #define PIPE_WRITE 1
73
74 #define NO_SLOT (-1) /* Failure result for GetIdleWorker() */
75
76 /* Worker process statuses */
77 typedef enum
78 {
79 WRKR_NOT_STARTED = 0,
80 WRKR_IDLE,
81 WRKR_WORKING,
82 WRKR_TERMINATED
83 } T_WorkerStatus;
84
85 #define WORKER_IS_RUNNING(workerStatus) \
86 ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
87
/*
 * Private per-parallel-worker state (typedef for this is in parallel.h).
 *
 * Much of this is valid only in the leader process (or, on Windows, should
 * be touched only by the leader thread).  But the AH field should be touched
 * only by workers.  The pipe descriptors are valid everywhere.
 *
 * ParallelBackupStart() allocates the slot array with pg_malloc0(), so every
 * field starts out zero (workerStatus == WRKR_NOT_STARTED, pid == 0).
 */
struct ParallelSlot
{
	T_WorkerStatus workerStatus;	/* see enum above */

	/* These fields are valid if workerStatus == WRKR_WORKING: */
	ParallelCompletionPtr callback; /* function to call on completion */
	void	   *callback_data;	/* passthrough data for it */

	/*
	 * Archive data worker is using: the worker's clone of the leader's
	 * ArchiveHandle.  RunWorker() sets it after cloning and resets it to
	 * NULL before disconnecting (via set_cancel_slot_archive()).
	 */
	ArchiveHandle *AH;

	int			pipeRead;		/* leader's end of the pipes */
	int			pipeWrite;
	int			pipeRevRead;	/* child's end of the pipes */
	int			pipeRevWrite;

	/* Child process/thread identity info: */
#ifdef WIN32
	uintptr_t	hThread;		/* set to INVALID_HANDLE_VALUE once reaped */
	unsigned int threadId;		/* matched by GetMyPSlot() */
#else
	pid_t		pid;			/* 0 if never forked or already reaped */
#endif
};
118
#ifdef WIN32

/*
 * Structure to hold info passed by _beginthreadex() to the function it calls
 * via its single allowed argument.  ParallelBackupStart() allocates one per
 * worker thread; init_spawned_worker_win32() frees it after reading it.
 */
typedef struct
{
	ArchiveHandle *AH;			/* leader database connection */
	ParallelSlot *slot;			/* this worker's parallel slot */
} WorkerInfo;

/*
 * Windows implementation of pipe access.  The leader/worker channels are
 * sockets here (created by pgpipe(), which is defined later in this file),
 * so they are accessed with recv()/send().
 */
static int	pgpipe(int handles[2]);
#define piperead(a,b,c)		recv(a,b,c,0)
#define pipewrite(a,b,c)	send(a,b,c,0)

#else							/* !WIN32 */

/* Non-Windows implementation of pipe access: plain pipe(2)/read(2)/write(2) */
#define pgpipe(a)			pipe(a)
#define piperead(a,b,c)		read(a,b,c)
#define pipewrite(a,b,c)	write(a,b,c)

#endif							/* WIN32 */
144
/*
 * State info for archive_close_connection() shutdown callback.
 */
typedef struct ShutdownInformation
{
	ParallelState *pstate;		/* set by ParallelBackupStart() when workers
								 * are started, reset by ParallelBackupEnd() */
	Archive    *AHX;			/* set by on_exit_close_archive() */
} ShutdownInformation;

static ShutdownInformation shutdown_info;

/*
 * State info for signal handling.
 * We assume signal_info initializes to zeroes (it has static storage
 * duration, so it does).
 *
 * On Unix, myAH is the leader DB connection in the leader process, and the
 * worker's own connection in worker processes.  On Windows, we have only one
 * instance of signal_info, so myAH is the leader connection and the worker
 * connections must be dug out of pstate->parallelSlot[].
 */
typedef struct DumpSignalInformation
{
	ArchiveHandle *myAH;		/* database connection to issue cancel for */
	ParallelState *pstate;		/* parallel state, if any */
	bool		handler_set;	/* signal handler set up in this process? */
#ifndef WIN32
	bool		am_worker;		/* am I a worker process? */
#endif
} DumpSignalInformation;

/*
 * Read asynchronously by sigTermHandler() (Unix) or by consoleHandler(),
 * which runs in its own thread (Windows).
 */
static volatile DumpSignalInformation signal_info;

#ifdef WIN32
/* Serializes access to signal_info and ParallelSlot.AH with consoleHandler() */
static CRITICAL_SECTION signal_info_lock;
#endif
180
/*
 * Write a simple string to stderr --- must be safe in a signal handler.
 * We ignore the write() result since there's not much we could do about it.
 * Certain compilers make that harder than it ought to be (they warn when the
 * result is discarded), hence the dummy rc_ variable.
 */
#define write_stderr(str) \
	do { \
		const char *str_ = (str); \
		int		rc_; \
		rc_ = write(fileno(stderr), str_, strlen(str_)); \
		(void) rc_; \
	} while (0)


#ifdef WIN32
/* file-scope variables */
static DWORD tls_index;			/* TLS slot used by getThreadLocalPQExpBuffer() */

/* globally visible variables (needed by exit_nicely) */
bool		parallel_init_done = false; /* set by init_parallel_dump_utils() */
DWORD		mainThreadId;		/* thread that ran init_parallel_dump_utils() */
#endif							/* WIN32 */
203
/* Local function prototypes */
static ParallelSlot *GetMyPSlot(ParallelState *pstate);
static void archive_close_connection(int code, void *arg);
static void ShutdownWorkersHard(ParallelState *pstate);
static void WaitForTerminatingWorkers(ParallelState *pstate);
static void setup_cancel_handler(void);
static void set_cancel_pstate(ParallelState *pstate);
static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
static int	GetIdleWorker(ParallelState *pstate);
static bool HasEveryWorkerTerminated(ParallelState *pstate);
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
							bool do_wait);
static char *getMessageFromLeader(int pipefd[2]);
static void sendMessageToLeader(int pipefd[2], const char *str);
static int	select_loop(int maxFd, fd_set *workerset);
static char *getMessageFromWorker(ParallelState *pstate,
								  bool do_wait, int *worker);
static void sendMessageToWorker(ParallelState *pstate,
								int worker, const char *str);
static char *readMessageFromPipe(int fd);

/* True if the string msg begins with the string prefix */
#define messageStartsWith(msg, prefix) \
	(strncmp(msg, prefix, strlen(prefix)) == 0)
230
231
/*
 * Initialize parallel dump support.  Call this early in process startup;
 * it is currently called whether or not parallel activity is intended.
 *
 * On Windows this allocates the TLS slot used by getThreadLocalPQExpBuffer(),
 * records the main thread's ID, and starts Winsock (the leader/worker
 * channels are sockets there).  Only the first call does anything.  On other
 * platforms this is a no-op.
 */
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
	WSADATA		wsaData;
	int			err;

	/* Already done by an earlier call */
	if (parallel_init_done)
		return;

	/* Get ready for threaded operation */
	tls_index = TlsAlloc();
	mainThreadId = GetCurrentThreadId();

	/* Bring up socket support */
	err = WSAStartup(MAKEWORD(2, 2), &wsaData);
	if (err != 0)
	{
		pg_log_error("%s() failed: error code %d", "WSAStartup", err);
		exit_nicely(1);
	}

	parallel_init_done = true;
#endif
}
262
263 /*
264 * Find the ParallelSlot for the current worker process or thread.
265 *
266 * Returns NULL if no matching slot is found (this implies we're the leader).
267 */
268 static ParallelSlot *
GetMyPSlot(ParallelState * pstate)269 GetMyPSlot(ParallelState *pstate)
270 {
271 int i;
272
273 for (i = 0; i < pstate->numWorkers; i++)
274 {
275 #ifdef WIN32
276 if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
277 #else
278 if (pstate->parallelSlot[i].pid == getpid())
279 #endif
280 return &(pstate->parallelSlot[i]);
281 }
282
283 return NULL;
284 }
285
#ifdef WIN32
/*
 * Thread-local replacement for getLocalPQExpBuffer(), installed by
 * ParallelBackupStart() on Windows.
 *
 * Each thread keeps a single buffer and reuses it, clearing it on every
 * call.  That costs one buffer per thread, rather than one per
 * fmtId()/fmtQualifiedId() call.  The result is overwritten by the next call
 * in the same thread.
 */
static PQExpBuffer
getThreadLocalPQExpBuffer(void)
{
	/*
	 * Until init_parallel_dump_utils() has set up TLS we fall back on a plain
	 * static variable.  Once TLS is available the static is never touched,
	 * because mixing the two goes awry.  TlsGetValue() yields 0 for a slot
	 * that has not been set yet.
	 */
	static PQExpBuffer s_id_return = NULL;
	PQExpBuffer buf;

	buf = parallel_init_done ? (PQExpBuffer) TlsGetValue(tls_index) : s_id_return;

	if (buf == NULL)
	{
		/* First call in this thread: make a buffer and remember it */
		buf = createPQExpBuffer();
		if (parallel_init_done)
			TlsSetValue(tls_index, buf);
		else
			s_id_return = buf;
	}
	else
	{
		/* Reuse this thread's buffer, discarding its previous contents */
		resetPQExpBuffer(buf);
	}

	return buf;
}
#endif							/* WIN32 */
327
328 /*
329 * pg_dump and pg_restore call this to register the cleanup handler
330 * as soon as they've created the ArchiveHandle.
331 */
332 void
on_exit_close_archive(Archive * AHX)333 on_exit_close_archive(Archive *AHX)
334 {
335 shutdown_info.AHX = AHX;
336 on_exit_nicely(archive_close_connection, &shutdown_info);
337 }
338
339 /*
340 * on_exit_nicely handler for shutting down database connections and
341 * worker processes cleanly.
342 */
343 static void
archive_close_connection(int code,void * arg)344 archive_close_connection(int code, void *arg)
345 {
346 ShutdownInformation *si = (ShutdownInformation *) arg;
347
348 if (si->pstate)
349 {
350 /* In parallel mode, must figure out who we are */
351 ParallelSlot *slot = GetMyPSlot(si->pstate);
352
353 if (!slot)
354 {
355 /*
356 * We're the leader. Forcibly shut down workers, then close our
357 * own database connection, if any.
358 */
359 ShutdownWorkersHard(si->pstate);
360
361 if (si->AHX)
362 DisconnectDatabase(si->AHX);
363 }
364 else
365 {
366 /*
367 * We're a worker. Shut down our own DB connection if any. On
368 * Windows, we also have to close our communication sockets, to
369 * emulate what will happen on Unix when the worker process exits.
370 * (Without this, if this is a premature exit, the leader would
371 * fail to detect it because there would be no EOF condition on
372 * the other end of the pipe.)
373 */
374 if (slot->AH)
375 DisconnectDatabase(&(slot->AH->public));
376
377 #ifdef WIN32
378 closesocket(slot->pipeRevRead);
379 closesocket(slot->pipeRevWrite);
380 #endif
381 }
382 }
383 else
384 {
385 /* Non-parallel operation: just kill the leader DB connection */
386 if (si->AHX)
387 DisconnectDatabase(si->AHX);
388 }
389 }
390
391 /*
392 * Forcibly shut down any remaining workers, waiting for them to finish.
393 *
394 * Note that we don't expect to come here during normal exit (the workers
395 * should be long gone, and the ParallelState too). We're only here in a
396 * fatal() situation, so intervening to cancel active commands is
397 * appropriate.
398 */
399 static void
ShutdownWorkersHard(ParallelState * pstate)400 ShutdownWorkersHard(ParallelState *pstate)
401 {
402 int i;
403
404 /*
405 * Close our write end of the sockets so that any workers waiting for
406 * commands know they can exit. (Note: some of the pipeWrite fields might
407 * still be zero, if we failed to initialize all the workers. Hence, just
408 * ignore errors here.)
409 */
410 for (i = 0; i < pstate->numWorkers; i++)
411 closesocket(pstate->parallelSlot[i].pipeWrite);
412
413 /*
414 * Force early termination of any commands currently in progress.
415 */
416 #ifndef WIN32
417 /* On non-Windows, send SIGTERM to each worker process. */
418 for (i = 0; i < pstate->numWorkers; i++)
419 {
420 pid_t pid = pstate->parallelSlot[i].pid;
421
422 if (pid != 0)
423 kill(pid, SIGTERM);
424 }
425 #else
426
427 /*
428 * On Windows, send query cancels directly to the workers' backends. Use
429 * a critical section to ensure worker threads don't change state.
430 */
431 EnterCriticalSection(&signal_info_lock);
432 for (i = 0; i < pstate->numWorkers; i++)
433 {
434 ArchiveHandle *AH = pstate->parallelSlot[i].AH;
435 char errbuf[1];
436
437 if (AH != NULL && AH->connCancel != NULL)
438 (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
439 }
440 LeaveCriticalSection(&signal_info_lock);
441 #endif
442
443 /* Now wait for them to terminate. */
444 WaitForTerminatingWorkers(pstate);
445 }
446
447 /*
448 * Wait for all workers to terminate.
449 */
450 static void
WaitForTerminatingWorkers(ParallelState * pstate)451 WaitForTerminatingWorkers(ParallelState *pstate)
452 {
453 while (!HasEveryWorkerTerminated(pstate))
454 {
455 ParallelSlot *slot = NULL;
456 int j;
457
458 #ifndef WIN32
459 /* On non-Windows, use wait() to wait for next worker to end */
460 int status;
461 pid_t pid = wait(&status);
462
463 /* Find dead worker's slot, and clear the PID field */
464 for (j = 0; j < pstate->numWorkers; j++)
465 {
466 slot = &(pstate->parallelSlot[j]);
467 if (slot->pid == pid)
468 {
469 slot->pid = 0;
470 break;
471 }
472 }
473 #else /* WIN32 */
474 /* On Windows, we must use WaitForMultipleObjects() */
475 HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
476 int nrun = 0;
477 DWORD ret;
478 uintptr_t hThread;
479
480 for (j = 0; j < pstate->numWorkers; j++)
481 {
482 if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
483 {
484 lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
485 nrun++;
486 }
487 }
488 ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
489 Assert(ret != WAIT_FAILED);
490 hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
491 free(lpHandles);
492
493 /* Find dead worker's slot, and clear the hThread field */
494 for (j = 0; j < pstate->numWorkers; j++)
495 {
496 slot = &(pstate->parallelSlot[j]);
497 if (slot->hThread == hThread)
498 {
499 /* For cleanliness, close handles for dead threads */
500 CloseHandle((HANDLE) slot->hThread);
501 slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
502 break;
503 }
504 }
505 #endif /* WIN32 */
506
507 /* On all platforms, update workerStatus and te[] as well */
508 Assert(j < pstate->numWorkers);
509 slot->workerStatus = WRKR_TERMINATED;
510 pstate->te[j] = NULL;
511 }
512 }
513
514
515 /*
516 * Code for responding to cancel interrupts (SIGINT, control-C, etc)
517 *
518 * This doesn't quite belong in this module, but it needs access to the
519 * ParallelState data, so there's not really a better place either.
520 *
521 * When we get a cancel interrupt, we could just die, but in pg_restore that
522 * could leave a SQL command (e.g., CREATE INDEX on a large table) running
523 * for a long time. Instead, we try to send a cancel request and then die.
524 * pg_dump probably doesn't really need this, but we might as well use it
525 * there too. Note that sending the cancel directly from the signal handler
526 * is safe because PQcancel() is written to make it so.
527 *
528 * In parallel operation on Unix, each process is responsible for canceling
529 * its own connection (this must be so because nobody else has access to it).
530 * Furthermore, the leader process should attempt to forward its signal to
531 * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't
532 * needed because typing control-C at the console would deliver SIGINT to
533 * every member of the terminal process group --- but in other scenarios it
534 * might be that only the leader gets signaled.
535 *
536 * On Windows, the cancel handler runs in a separate thread, because that's
537 * how SetConsoleCtrlHandler works. We make it stop worker threads, send
538 * cancels on all active connections, and then return FALSE, which will allow
539 * the process to die. For safety's sake, we use a critical section to
540 * protect the PGcancel structures against being changed while the signal
541 * thread runs.
542 */
543
544 #ifndef WIN32
545
546 /*
547 * Signal handler (Unix only)
548 */
549 static void
sigTermHandler(SIGNAL_ARGS)550 sigTermHandler(SIGNAL_ARGS)
551 {
552 int i;
553 char errbuf[1];
554
555 /*
556 * Some platforms allow delivery of new signals to interrupt an active
557 * signal handler. That could muck up our attempt to send PQcancel, so
558 * disable the signals that setup_cancel_handler enabled.
559 */
560 pqsignal(SIGINT, SIG_IGN);
561 pqsignal(SIGTERM, SIG_IGN);
562 pqsignal(SIGQUIT, SIG_IGN);
563
564 /*
565 * If we're in the leader, forward signal to all workers. (It seems best
566 * to do this before PQcancel; killing the leader transaction will result
567 * in invalid-snapshot errors from active workers, which maybe we can
568 * quiet by killing workers first.) Ignore any errors.
569 */
570 if (signal_info.pstate != NULL)
571 {
572 for (i = 0; i < signal_info.pstate->numWorkers; i++)
573 {
574 pid_t pid = signal_info.pstate->parallelSlot[i].pid;
575
576 if (pid != 0)
577 kill(pid, SIGTERM);
578 }
579 }
580
581 /*
582 * Send QueryCancel if we have a connection to send to. Ignore errors,
583 * there's not much we can do about them anyway.
584 */
585 if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
586 (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
587
588 /*
589 * Report we're quitting, using nothing more complicated than write(2).
590 * When in parallel operation, only the leader process should do this.
591 */
592 if (!signal_info.am_worker)
593 {
594 if (progname)
595 {
596 write_stderr(progname);
597 write_stderr(": ");
598 }
599 write_stderr("terminated by user\n");
600 }
601
602 /*
603 * And die, using _exit() not exit() because the latter will invoke atexit
604 * handlers that can fail if we interrupted related code.
605 */
606 _exit(1);
607 }
608
609 /*
610 * Enable cancel interrupt handler, if not already done.
611 */
612 static void
setup_cancel_handler(void)613 setup_cancel_handler(void)
614 {
615 /*
616 * When forking, signal_info.handler_set will propagate into the new
617 * process, but that's fine because the signal handler state does too.
618 */
619 if (!signal_info.handler_set)
620 {
621 signal_info.handler_set = true;
622
623 pqsignal(SIGINT, sigTermHandler);
624 pqsignal(SIGTERM, sigTermHandler);
625 pqsignal(SIGQUIT, sigTermHandler);
626 }
627 }
628
629 #else /* WIN32 */
630
631 /*
632 * Console interrupt handler --- runs in a newly-started thread.
633 *
634 * After stopping other threads and sending cancel requests on all open
635 * connections, we return FALSE which will allow the default ExitProcess()
636 * action to be taken.
637 */
638 static BOOL WINAPI
consoleHandler(DWORD dwCtrlType)639 consoleHandler(DWORD dwCtrlType)
640 {
641 int i;
642 char errbuf[1];
643
644 if (dwCtrlType == CTRL_C_EVENT ||
645 dwCtrlType == CTRL_BREAK_EVENT)
646 {
647 /* Critical section prevents changing data we look at here */
648 EnterCriticalSection(&signal_info_lock);
649
650 /*
651 * If in parallel mode, stop worker threads and send QueryCancel to
652 * their connected backends. The main point of stopping the worker
653 * threads is to keep them from reporting the query cancels as errors,
654 * which would clutter the user's screen. We needn't stop the leader
655 * thread since it won't be doing much anyway. Do this before
656 * canceling the main transaction, else we might get invalid-snapshot
657 * errors reported before we can stop the workers. Ignore errors,
658 * there's not much we can do about them anyway.
659 */
660 if (signal_info.pstate != NULL)
661 {
662 for (i = 0; i < signal_info.pstate->numWorkers; i++)
663 {
664 ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
665 ArchiveHandle *AH = slot->AH;
666 HANDLE hThread = (HANDLE) slot->hThread;
667
668 /*
669 * Using TerminateThread here may leave some resources leaked,
670 * but it doesn't matter since we're about to end the whole
671 * process.
672 */
673 if (hThread != INVALID_HANDLE_VALUE)
674 TerminateThread(hThread, 0);
675
676 if (AH != NULL && AH->connCancel != NULL)
677 (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
678 }
679 }
680
681 /*
682 * Send QueryCancel to leader connection, if enabled. Ignore errors,
683 * there's not much we can do about them anyway.
684 */
685 if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
686 (void) PQcancel(signal_info.myAH->connCancel,
687 errbuf, sizeof(errbuf));
688
689 LeaveCriticalSection(&signal_info_lock);
690
691 /*
692 * Report we're quitting, using nothing more complicated than
693 * write(2). (We might be able to get away with using pg_log_*()
694 * here, but since we terminated other threads uncleanly above, it
695 * seems better to assume as little as possible.)
696 */
697 if (progname)
698 {
699 write_stderr(progname);
700 write_stderr(": ");
701 }
702 write_stderr("terminated by user\n");
703 }
704
705 /* Always return FALSE to allow signal handling to continue */
706 return FALSE;
707 }
708
709 /*
710 * Enable cancel interrupt handler, if not already done.
711 */
712 static void
setup_cancel_handler(void)713 setup_cancel_handler(void)
714 {
715 if (!signal_info.handler_set)
716 {
717 signal_info.handler_set = true;
718
719 InitializeCriticalSection(&signal_info_lock);
720
721 SetConsoleCtrlHandler(consoleHandler, TRUE);
722 }
723 }
724
725 #endif /* WIN32 */
726
727
728 /*
729 * set_archive_cancel_info
730 *
731 * Fill AH->connCancel with cancellation info for the specified database
732 * connection; or clear it if conn is NULL.
733 */
734 void
set_archive_cancel_info(ArchiveHandle * AH,PGconn * conn)735 set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
736 {
737 PGcancel *oldConnCancel;
738
739 /*
740 * Activate the interrupt handler if we didn't yet in this process. On
741 * Windows, this also initializes signal_info_lock; therefore it's
742 * important that this happen at least once before we fork off any
743 * threads.
744 */
745 setup_cancel_handler();
746
747 /*
748 * On Unix, we assume that storing a pointer value is atomic with respect
749 * to any possible signal interrupt. On Windows, use a critical section.
750 */
751
752 #ifdef WIN32
753 EnterCriticalSection(&signal_info_lock);
754 #endif
755
756 /* Free the old one if we have one */
757 oldConnCancel = AH->connCancel;
758 /* be sure interrupt handler doesn't use pointer while freeing */
759 AH->connCancel = NULL;
760
761 if (oldConnCancel != NULL)
762 PQfreeCancel(oldConnCancel);
763
764 /* Set the new one if specified */
765 if (conn)
766 AH->connCancel = PQgetCancel(conn);
767
768 /*
769 * On Unix, there's only ever one active ArchiveHandle per process, so we
770 * can just set signal_info.myAH unconditionally. On Windows, do that
771 * only in the main thread; worker threads have to make sure their
772 * ArchiveHandle appears in the pstate data, which is dealt with in
773 * RunWorker().
774 */
775 #ifndef WIN32
776 signal_info.myAH = AH;
777 #else
778 if (mainThreadId == GetCurrentThreadId())
779 signal_info.myAH = AH;
780 #endif
781
782 #ifdef WIN32
783 LeaveCriticalSection(&signal_info_lock);
784 #endif
785 }
786
787 /*
788 * set_cancel_pstate
789 *
790 * Set signal_info.pstate to point to the specified ParallelState, if any.
791 * We need this mainly to have an interlock against Windows signal thread.
792 */
793 static void
set_cancel_pstate(ParallelState * pstate)794 set_cancel_pstate(ParallelState *pstate)
795 {
796 #ifdef WIN32
797 EnterCriticalSection(&signal_info_lock);
798 #endif
799
800 signal_info.pstate = pstate;
801
802 #ifdef WIN32
803 LeaveCriticalSection(&signal_info_lock);
804 #endif
805 }
806
807 /*
808 * set_cancel_slot_archive
809 *
810 * Set ParallelSlot's AH field to point to the specified archive, if any.
811 * We need this mainly to have an interlock against Windows signal thread.
812 */
813 static void
set_cancel_slot_archive(ParallelSlot * slot,ArchiveHandle * AH)814 set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
815 {
816 #ifdef WIN32
817 EnterCriticalSection(&signal_info_lock);
818 #endif
819
820 slot->AH = AH;
821
822 #ifdef WIN32
823 LeaveCriticalSection(&signal_info_lock);
824 #endif
825 }
826
827
828 /*
829 * This function is called by both Unix and Windows variants to set up
830 * and run a worker process. Caller should exit the process (or thread)
831 * upon return.
832 */
833 static void
RunWorker(ArchiveHandle * AH,ParallelSlot * slot)834 RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
835 {
836 int pipefd[2];
837
838 /* fetch child ends of pipes */
839 pipefd[PIPE_READ] = slot->pipeRevRead;
840 pipefd[PIPE_WRITE] = slot->pipeRevWrite;
841
842 /*
843 * Clone the archive so that we have our own state to work with, and in
844 * particular our own database connection.
845 *
846 * We clone on Unix as well as Windows, even though technically we don't
847 * need to because fork() gives us a copy in our own address space
848 * already. But CloneArchive resets the state information and also clones
849 * the database connection which both seem kinda helpful.
850 */
851 AH = CloneArchive(AH);
852
853 /* Remember cloned archive where signal handler can find it */
854 set_cancel_slot_archive(slot, AH);
855
856 /*
857 * Call the setup worker function that's defined in the ArchiveHandle.
858 */
859 (AH->SetupWorkerPtr) ((Archive *) AH);
860
861 /*
862 * Execute commands until done.
863 */
864 WaitForCommands(AH, pipefd);
865
866 /*
867 * Disconnect from database and clean up.
868 */
869 set_cancel_slot_archive(slot, NULL);
870 DisconnectDatabase(&(AH->public));
871 DeCloneArchive(AH);
872 }
873
/*
 * Thread entry point for Windows workers.  wi was allocated by
 * ParallelBackupStart() solely for this thread, so it is released as soon as
 * its contents have been copied out.
 */
#ifdef WIN32
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
	WorkerInfo	args = *wi;

	free(wi);

	RunWorker(args.AH, args.slot);

	_endthreadex(0);
	return 0;
}
#endif							/* WIN32 */
895
896 /*
897 * This function starts a parallel dump or restore by spawning off the worker
898 * processes. For Windows, it creates a number of threads; on Unix the
899 * workers are created with fork().
900 */
901 ParallelState *
ParallelBackupStart(ArchiveHandle * AH)902 ParallelBackupStart(ArchiveHandle *AH)
903 {
904 ParallelState *pstate;
905 int i;
906
907 Assert(AH->public.numWorkers > 0);
908
909 pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
910
911 pstate->numWorkers = AH->public.numWorkers;
912 pstate->te = NULL;
913 pstate->parallelSlot = NULL;
914
915 if (AH->public.numWorkers == 1)
916 return pstate;
917
918 /* Create status arrays, being sure to initialize all fields to 0 */
919 pstate->te = (TocEntry **)
920 pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
921 pstate->parallelSlot = (ParallelSlot *)
922 pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
923
924 #ifdef WIN32
925 /* Make fmtId() and fmtQualifiedId() use thread-local storage */
926 getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
927 #endif
928
929 /*
930 * Set the pstate in shutdown_info, to tell the exit handler that it must
931 * clean up workers as well as the main database connection. But we don't
932 * set this in signal_info yet, because we don't want child processes to
933 * inherit non-NULL signal_info.pstate.
934 */
935 shutdown_info.pstate = pstate;
936
937 /*
938 * Temporarily disable query cancellation on the leader connection. This
939 * ensures that child processes won't inherit valid AH->connCancel
940 * settings and thus won't try to issue cancels against the leader's
941 * connection. No harm is done if we fail while it's disabled, because
942 * the leader connection is idle at this point anyway.
943 */
944 set_archive_cancel_info(AH, NULL);
945
946 /* Ensure stdio state is quiesced before forking */
947 fflush(NULL);
948
949 /* Create desired number of workers */
950 for (i = 0; i < pstate->numWorkers; i++)
951 {
952 #ifdef WIN32
953 WorkerInfo *wi;
954 uintptr_t handle;
955 #else
956 pid_t pid;
957 #endif
958 ParallelSlot *slot = &(pstate->parallelSlot[i]);
959 int pipeMW[2],
960 pipeWM[2];
961
962 /* Create communication pipes for this worker */
963 if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
964 fatal("could not create communication channels: %m");
965
966 /* leader's ends of the pipes */
967 slot->pipeRead = pipeWM[PIPE_READ];
968 slot->pipeWrite = pipeMW[PIPE_WRITE];
969 /* child's ends of the pipes */
970 slot->pipeRevRead = pipeMW[PIPE_READ];
971 slot->pipeRevWrite = pipeWM[PIPE_WRITE];
972
973 #ifdef WIN32
974 /* Create transient structure to pass args to worker function */
975 wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
976
977 wi->AH = AH;
978 wi->slot = slot;
979
980 handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
981 wi, 0, &(slot->threadId));
982 slot->hThread = handle;
983 slot->workerStatus = WRKR_IDLE;
984 #else /* !WIN32 */
985 pid = fork();
986 if (pid == 0)
987 {
988 /* we are the worker */
989 int j;
990
991 /* this is needed for GetMyPSlot() */
992 slot->pid = getpid();
993
994 /* instruct signal handler that we're in a worker now */
995 signal_info.am_worker = true;
996
997 /* close read end of Worker -> Leader */
998 closesocket(pipeWM[PIPE_READ]);
999 /* close write end of Leader -> Worker */
1000 closesocket(pipeMW[PIPE_WRITE]);
1001
1002 /*
1003 * Close all inherited fds for communication of the leader with
1004 * previously-forked workers.
1005 */
1006 for (j = 0; j < i; j++)
1007 {
1008 closesocket(pstate->parallelSlot[j].pipeRead);
1009 closesocket(pstate->parallelSlot[j].pipeWrite);
1010 }
1011
1012 /* Run the worker ... */
1013 RunWorker(AH, slot);
1014
1015 /* We can just exit(0) when done */
1016 exit(0);
1017 }
1018 else if (pid < 0)
1019 {
1020 /* fork failed */
1021 fatal("could not create worker process: %m");
1022 }
1023
1024 /* In Leader after successful fork */
1025 slot->pid = pid;
1026 slot->workerStatus = WRKR_IDLE;
1027
1028 /* close read end of Leader -> Worker */
1029 closesocket(pipeMW[PIPE_READ]);
1030 /* close write end of Worker -> Leader */
1031 closesocket(pipeWM[PIPE_WRITE]);
1032 #endif /* WIN32 */
1033 }
1034
1035 /*
1036 * Having forked off the workers, disable SIGPIPE so that leader isn't
1037 * killed if it tries to send a command to a dead worker. We don't want
1038 * the workers to inherit this setting, though.
1039 */
1040 #ifndef WIN32
1041 pqsignal(SIGPIPE, SIG_IGN);
1042 #endif
1043
1044 /*
1045 * Re-establish query cancellation on the leader connection.
1046 */
1047 set_archive_cancel_info(AH, AH->connection);
1048
1049 /*
1050 * Tell the cancel signal handler to forward signals to worker processes,
1051 * too. (As with query cancel, we did not need this earlier because the
1052 * workers have not yet been given anything to do; if we die before this
1053 * point, any already-started workers will see EOF and quit promptly.)
1054 */
1055 set_cancel_pstate(pstate);
1056
1057 return pstate;
1058 }
1059
1060 /*
1061 * Close down a parallel dump or restore.
1062 */
1063 void
ParallelBackupEnd(ArchiveHandle * AH,ParallelState * pstate)1064 ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
1065 {
1066 int i;
1067
1068 /* No work if non-parallel */
1069 if (pstate->numWorkers == 1)
1070 return;
1071
1072 /* There should not be any unfinished jobs */
1073 Assert(IsEveryWorkerIdle(pstate));
1074
1075 /* Close the sockets so that the workers know they can exit */
1076 for (i = 0; i < pstate->numWorkers; i++)
1077 {
1078 closesocket(pstate->parallelSlot[i].pipeRead);
1079 closesocket(pstate->parallelSlot[i].pipeWrite);
1080 }
1081
1082 /* Wait for them to exit */
1083 WaitForTerminatingWorkers(pstate);
1084
1085 /*
1086 * Unlink pstate from shutdown_info, so the exit handler will not try to
1087 * use it; and likewise unlink from signal_info.
1088 */
1089 shutdown_info.pstate = NULL;
1090 set_cancel_pstate(NULL);
1091
1092 /* Release state (mere neatnik-ism, since we're about to terminate) */
1093 free(pstate->te);
1094 free(pstate->parallelSlot);
1095 free(pstate);
1096 }
1097
1098 /*
1099 * These next four functions handle construction and parsing of the command
1100 * strings and response strings for parallel workers.
1101 *
1102 * Currently, these can be the same regardless of which archive format we are
1103 * processing. In future, we might want to let format modules override these
1104 * functions to add format-specific data to a command or response.
1105 */
1106
1107 /*
1108 * buildWorkerCommand: format a command string to send to a worker.
1109 *
1110 * The string is built in the caller-supplied buffer of size buflen.
1111 */
1112 static void
buildWorkerCommand(ArchiveHandle * AH,TocEntry * te,T_Action act,char * buf,int buflen)1113 buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
1114 char *buf, int buflen)
1115 {
1116 if (act == ACT_DUMP)
1117 snprintf(buf, buflen, "DUMP %d", te->dumpId);
1118 else if (act == ACT_RESTORE)
1119 snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1120 else
1121 Assert(false);
1122 }
1123
1124 /*
1125 * parseWorkerCommand: interpret a command string in a worker.
1126 */
1127 static void
parseWorkerCommand(ArchiveHandle * AH,TocEntry ** te,T_Action * act,const char * msg)1128 parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
1129 const char *msg)
1130 {
1131 DumpId dumpId;
1132 int nBytes;
1133
1134 if (messageStartsWith(msg, "DUMP "))
1135 {
1136 *act = ACT_DUMP;
1137 sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1138 Assert(nBytes == strlen(msg));
1139 *te = getTocEntryByDumpId(AH, dumpId);
1140 Assert(*te != NULL);
1141 }
1142 else if (messageStartsWith(msg, "RESTORE "))
1143 {
1144 *act = ACT_RESTORE;
1145 sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1146 Assert(nBytes == strlen(msg));
1147 *te = getTocEntryByDumpId(AH, dumpId);
1148 Assert(*te != NULL);
1149 }
1150 else
1151 fatal("unrecognized command received from leader: \"%s\"",
1152 msg);
1153 }
1154
1155 /*
1156 * buildWorkerResponse: format a response string to send to the leader.
1157 *
1158 * The string is built in the caller-supplied buffer of size buflen.
1159 */
1160 static void
buildWorkerResponse(ArchiveHandle * AH,TocEntry * te,T_Action act,int status,char * buf,int buflen)1161 buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
1162 char *buf, int buflen)
1163 {
1164 snprintf(buf, buflen, "OK %d %d %d",
1165 te->dumpId,
1166 status,
1167 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1168 }
1169
1170 /*
1171 * parseWorkerResponse: parse the status message returned by a worker.
1172 *
1173 * Returns the integer status code, and may update fields of AH and/or te.
1174 */
1175 static int
parseWorkerResponse(ArchiveHandle * AH,TocEntry * te,const char * msg)1176 parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
1177 const char *msg)
1178 {
1179 DumpId dumpId;
1180 int nBytes,
1181 n_errors;
1182 int status = 0;
1183
1184 if (messageStartsWith(msg, "OK "))
1185 {
1186 sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1187
1188 Assert(dumpId == te->dumpId);
1189 Assert(nBytes == strlen(msg));
1190
1191 AH->public.n_errors += n_errors;
1192 }
1193 else
1194 fatal("invalid message received from worker: \"%s\"",
1195 msg);
1196
1197 return status;
1198 }
1199
1200 /*
1201 * Dispatch a job to some free worker.
1202 *
1203 * te is the TocEntry to be processed, act is the action to be taken on it.
1204 * callback is the function to call on completion of the job.
1205 *
1206 * If no worker is currently available, this will block, and previously
1207 * registered callback functions may be called.
1208 */
1209 void
DispatchJobForTocEntry(ArchiveHandle * AH,ParallelState * pstate,TocEntry * te,T_Action act,ParallelCompletionPtr callback,void * callback_data)1210 DispatchJobForTocEntry(ArchiveHandle *AH,
1211 ParallelState *pstate,
1212 TocEntry *te,
1213 T_Action act,
1214 ParallelCompletionPtr callback,
1215 void *callback_data)
1216 {
1217 int worker;
1218 char buf[256];
1219
1220 /* Get a worker, waiting if none are idle */
1221 while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1222 WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1223
1224 /* Construct and send command string */
1225 buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1226
1227 sendMessageToWorker(pstate, worker, buf);
1228
1229 /* Remember worker is busy, and which TocEntry it's working on */
1230 pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1231 pstate->parallelSlot[worker].callback = callback;
1232 pstate->parallelSlot[worker].callback_data = callback_data;
1233 pstate->te[worker] = te;
1234 }
1235
1236 /*
1237 * Find an idle worker and return its slot number.
1238 * Return NO_SLOT if none are idle.
1239 */
1240 static int
GetIdleWorker(ParallelState * pstate)1241 GetIdleWorker(ParallelState *pstate)
1242 {
1243 int i;
1244
1245 for (i = 0; i < pstate->numWorkers; i++)
1246 {
1247 if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1248 return i;
1249 }
1250 return NO_SLOT;
1251 }
1252
1253 /*
1254 * Return true iff no worker is running.
1255 */
1256 static bool
HasEveryWorkerTerminated(ParallelState * pstate)1257 HasEveryWorkerTerminated(ParallelState *pstate)
1258 {
1259 int i;
1260
1261 for (i = 0; i < pstate->numWorkers; i++)
1262 {
1263 if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1264 return false;
1265 }
1266 return true;
1267 }
1268
1269 /*
1270 * Return true iff every worker is in the WRKR_IDLE state.
1271 */
1272 bool
IsEveryWorkerIdle(ParallelState * pstate)1273 IsEveryWorkerIdle(ParallelState *pstate)
1274 {
1275 int i;
1276
1277 for (i = 0; i < pstate->numWorkers; i++)
1278 {
1279 if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1280 return false;
1281 }
1282 return true;
1283 }
1284
1285 /*
1286 * Acquire lock on a table to be dumped by a worker process.
1287 *
1288 * The leader process is already holding an ACCESS SHARE lock. Ordinarily
1289 * it's no problem for a worker to get one too, but if anything else besides
1290 * pg_dump is running, there's a possible deadlock:
1291 *
1292 * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode.
1293 * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1294 * because the leader holds a conflicting ACCESS SHARE lock).
1295 * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1296 * The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1297 * 4) Now we have a deadlock, since the leader is effectively waiting for
1298 * the worker. The server cannot detect that, however.
1299 *
1300 * To prevent an infinite wait, prior to touching a table in a worker, request
1301 * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock,
1302 * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1303 * so we have a deadlock. We must fail the backup in that case.
1304 */
1305 static void
lockTableForWorker(ArchiveHandle * AH,TocEntry * te)1306 lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
1307 {
1308 const char *qualId;
1309 PQExpBuffer query;
1310 PGresult *res;
1311
1312 /* Nothing to do for BLOBS */
1313 if (strcmp(te->desc, "BLOBS") == 0)
1314 return;
1315
1316 query = createPQExpBuffer();
1317
1318 qualId = fmtQualifiedId(te->namespace, te->tag);
1319
1320 appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1321 qualId);
1322
1323 res = PQexec(AH->connection, query->data);
1324
1325 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1326 fatal("could not obtain lock on relation \"%s\"\n"
1327 "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1328 "on the table after the pg_dump parent process had gotten the "
1329 "initial ACCESS SHARE lock on the table.", qualId);
1330
1331 PQclear(res);
1332 destroyPQExpBuffer(query);
1333 }
1334
1335 /*
1336 * WaitForCommands: main routine for a worker process.
1337 *
1338 * Read and execute commands from the leader until we see EOF on the pipe.
1339 */
1340 static void
WaitForCommands(ArchiveHandle * AH,int pipefd[2])1341 WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1342 {
1343 char *command;
1344 TocEntry *te;
1345 T_Action act;
1346 int status = 0;
1347 char buf[256];
1348
1349 for (;;)
1350 {
1351 if (!(command = getMessageFromLeader(pipefd)))
1352 {
1353 /* EOF, so done */
1354 return;
1355 }
1356
1357 /* Decode the command */
1358 parseWorkerCommand(AH, &te, &act, command);
1359
1360 if (act == ACT_DUMP)
1361 {
1362 /* Acquire lock on this table within the worker's session */
1363 lockTableForWorker(AH, te);
1364
1365 /* Perform the dump command */
1366 status = (AH->WorkerJobDumpPtr) (AH, te);
1367 }
1368 else if (act == ACT_RESTORE)
1369 {
1370 /* Perform the restore command */
1371 status = (AH->WorkerJobRestorePtr) (AH, te);
1372 }
1373 else
1374 Assert(false);
1375
1376 /* Return status to leader */
1377 buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1378
1379 sendMessageToLeader(pipefd, buf);
1380
1381 /* command was pg_malloc'd and we are responsible for free()ing it. */
1382 free(command);
1383 }
1384 }
1385
1386 /*
1387 * Check for status messages from workers.
1388 *
1389 * If do_wait is true, wait to get a status message; otherwise, just return
1390 * immediately if there is none available.
1391 *
1392 * When we get a status message, we pass the status code to the callback
1393 * function that was specified to DispatchJobForTocEntry, then reset the
1394 * worker status to IDLE.
1395 *
1396 * Returns true if we collected a status message, else false.
1397 *
1398 * XXX is it worth checking for more than one status message per call?
1399 * It seems somewhat unlikely that multiple workers would finish at exactly
1400 * the same time.
1401 */
1402 static bool
ListenToWorkers(ArchiveHandle * AH,ParallelState * pstate,bool do_wait)1403 ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1404 {
1405 int worker;
1406 char *msg;
1407
1408 /* Try to collect a status message */
1409 msg = getMessageFromWorker(pstate, do_wait, &worker);
1410
1411 if (!msg)
1412 {
1413 /* If do_wait is true, we must have detected EOF on some socket */
1414 if (do_wait)
1415 fatal("a worker process died unexpectedly");
1416 return false;
1417 }
1418
1419 /* Process it and update our idea of the worker's status */
1420 if (messageStartsWith(msg, "OK "))
1421 {
1422 ParallelSlot *slot = &pstate->parallelSlot[worker];
1423 TocEntry *te = pstate->te[worker];
1424 int status;
1425
1426 status = parseWorkerResponse(AH, te, msg);
1427 slot->callback(AH, te, status, slot->callback_data);
1428 slot->workerStatus = WRKR_IDLE;
1429 pstate->te[worker] = NULL;
1430 }
1431 else
1432 fatal("invalid message received from worker: \"%s\"",
1433 msg);
1434
1435 /* Free the string returned from getMessageFromWorker */
1436 free(msg);
1437
1438 return true;
1439 }
1440
1441 /*
1442 * Check for status results from workers, waiting if necessary.
1443 *
1444 * Available wait modes are:
1445 * WFW_NO_WAIT: reap any available status, but don't block
1446 * WFW_GOT_STATUS: wait for at least one more worker to finish
1447 * WFW_ONE_IDLE: wait for at least one worker to be idle
1448 * WFW_ALL_IDLE: wait for all workers to be idle
1449 *
1450 * Any received results are passed to the callback specified to
1451 * DispatchJobForTocEntry.
1452 *
1453 * This function is executed in the leader process.
1454 */
1455 void
WaitForWorkers(ArchiveHandle * AH,ParallelState * pstate,WFW_WaitOption mode)1456 WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
1457 {
1458 bool do_wait = false;
1459
1460 /*
1461 * In GOT_STATUS mode, always block waiting for a message, since we can't
1462 * return till we get something. In other modes, we don't block the first
1463 * time through the loop.
1464 */
1465 if (mode == WFW_GOT_STATUS)
1466 {
1467 /* Assert that caller knows what it's doing */
1468 Assert(!IsEveryWorkerIdle(pstate));
1469 do_wait = true;
1470 }
1471
1472 for (;;)
1473 {
1474 /*
1475 * Check for status messages, even if we don't need to block. We do
1476 * not try very hard to reap all available messages, though, since
1477 * there's unlikely to be more than one.
1478 */
1479 if (ListenToWorkers(AH, pstate, do_wait))
1480 {
1481 /*
1482 * If we got a message, we are done by definition for GOT_STATUS
1483 * mode, and we can also be certain that there's at least one idle
1484 * worker. So we're done in all but ALL_IDLE mode.
1485 */
1486 if (mode != WFW_ALL_IDLE)
1487 return;
1488 }
1489
1490 /* Check whether we must wait for new status messages */
1491 switch (mode)
1492 {
1493 case WFW_NO_WAIT:
1494 return; /* never wait */
1495 case WFW_GOT_STATUS:
1496 Assert(false); /* can't get here, because we waited */
1497 break;
1498 case WFW_ONE_IDLE:
1499 if (GetIdleWorker(pstate) != NO_SLOT)
1500 return;
1501 break;
1502 case WFW_ALL_IDLE:
1503 if (IsEveryWorkerIdle(pstate))
1504 return;
1505 break;
1506 }
1507
1508 /* Loop back, and this time wait for something to happen */
1509 do_wait = true;
1510 }
1511 }
1512
1513 /*
1514 * Read one command message from the leader, blocking if necessary
1515 * until one is available, and return it as a malloc'd string.
1516 * On EOF, return NULL.
1517 *
1518 * This function is executed in worker processes.
1519 */
1520 static char *
getMessageFromLeader(int pipefd[2])1521 getMessageFromLeader(int pipefd[2])
1522 {
1523 return readMessageFromPipe(pipefd[PIPE_READ]);
1524 }
1525
1526 /*
1527 * Send a status message to the leader.
1528 *
1529 * This function is executed in worker processes.
1530 */
1531 static void
sendMessageToLeader(int pipefd[2],const char * str)1532 sendMessageToLeader(int pipefd[2], const char *str)
1533 {
1534 int len = strlen(str) + 1;
1535
1536 if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1537 fatal("could not write to the communication channel: %m");
1538 }
1539
/*
 * Block until at least one descriptor in "workerset" is readable, retrying
 * if select() is interrupted by a signal.
 *
 * On return *workerset holds the ready descriptors.  The result is
 * select()'s: -1 on error, otherwise the number of ready descriptors.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
	const fd_set wanted = *workerset;	/* select() overwrites its input */
	int			nready;
	bool		interrupted;

	do
	{
		*workerset = wanted;
		nready = select(maxFd + 1, workerset, NULL, NULL, NULL);
#ifndef WIN32
		interrupted = (nready < 0 && errno == EINTR);
#else
		interrupted = (nready == SOCKET_ERROR && WSAGetLastError() == WSAEINTR);
#endif
	} while (interrupted);

	return nready;
}
1567
1568
1569 /*
1570 * Check for messages from worker processes.
1571 *
1572 * If a message is available, return it as a malloc'd string, and put the
1573 * index of the sending worker in *worker.
1574 *
1575 * If nothing is available, wait if "do_wait" is true, else return NULL.
1576 *
1577 * If we detect EOF on any socket, we'll return NULL. It's not great that
1578 * that's hard to distinguish from the no-data-available case, but for now
1579 * our one caller is okay with that.
1580 *
1581 * This function is executed in the leader process.
1582 */
1583 static char *
getMessageFromWorker(ParallelState * pstate,bool do_wait,int * worker)1584 getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1585 {
1586 int i;
1587 fd_set workerset;
1588 int maxFd = -1;
1589 struct timeval nowait = {0, 0};
1590
1591 /* construct bitmap of socket descriptors for select() */
1592 FD_ZERO(&workerset);
1593 for (i = 0; i < pstate->numWorkers; i++)
1594 {
1595 if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1596 continue;
1597 FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1598 if (pstate->parallelSlot[i].pipeRead > maxFd)
1599 maxFd = pstate->parallelSlot[i].pipeRead;
1600 }
1601
1602 if (do_wait)
1603 {
1604 i = select_loop(maxFd, &workerset);
1605 Assert(i != 0);
1606 }
1607 else
1608 {
1609 if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1610 return NULL;
1611 }
1612
1613 if (i < 0)
1614 fatal("%s() failed: %m", "select");
1615
1616 for (i = 0; i < pstate->numWorkers; i++)
1617 {
1618 char *msg;
1619
1620 if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1621 continue;
1622 if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1623 continue;
1624
1625 /*
1626 * Read the message if any. If the socket is ready because of EOF,
1627 * we'll return NULL instead (and the socket will stay ready, so the
1628 * condition will persist).
1629 *
1630 * Note: because this is a blocking read, we'll wait if only part of
1631 * the message is available. Waiting a long time would be bad, but
1632 * since worker status messages are short and are always sent in one
1633 * operation, it shouldn't be a problem in practice.
1634 */
1635 msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1636 *worker = i;
1637 return msg;
1638 }
1639 Assert(false);
1640 return NULL;
1641 }
1642
1643 /*
1644 * Send a command message to the specified worker process.
1645 *
1646 * This function is executed in the leader process.
1647 */
1648 static void
sendMessageToWorker(ParallelState * pstate,int worker,const char * str)1649 sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1650 {
1651 int len = strlen(str) + 1;
1652
1653 if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1654 {
1655 fatal("could not write to the communication channel: %m");
1656 }
1657 }
1658
1659 /*
1660 * Read one message from the specified pipe (fd), blocking if necessary
1661 * until one is available, and return it as a malloc'd string.
1662 * On EOF, return NULL.
1663 *
1664 * A "message" on the channel is just a null-terminated string.
1665 */
1666 static char *
readMessageFromPipe(int fd)1667 readMessageFromPipe(int fd)
1668 {
1669 char *msg;
1670 int msgsize,
1671 bufsize;
1672 int ret;
1673
1674 /*
1675 * In theory, if we let piperead() read multiple bytes, it might give us
1676 * back fragments of multiple messages. (That can't actually occur, since
1677 * neither leader nor workers send more than one message without waiting
1678 * for a reply, but we don't wish to assume that here.) For simplicity,
1679 * read a byte at a time until we get the terminating '\0'. This method
1680 * is a bit inefficient, but since this is only used for relatively short
1681 * command and status strings, it shouldn't matter.
1682 */
1683 bufsize = 64; /* could be any number */
1684 msg = (char *) pg_malloc(bufsize);
1685 msgsize = 0;
1686 for (;;)
1687 {
1688 Assert(msgsize < bufsize);
1689 ret = piperead(fd, msg + msgsize, 1);
1690 if (ret <= 0)
1691 break; /* error or connection closure */
1692
1693 Assert(ret == 1);
1694
1695 if (msg[msgsize] == '\0')
1696 return msg; /* collected whole message */
1697
1698 msgsize++;
1699 if (msgsize == bufsize) /* enlarge buffer if needed */
1700 {
1701 bufsize += 16; /* could be any number */
1702 msg = (char *) pg_realloc(msg, bufsize);
1703 }
1704 }
1705
1706 /* Other end has closed the connection */
1707 pg_free(msg);
1708 return NULL;
1709 }
1710
1711 #ifdef WIN32
1712
1713 /*
1714 * This is a replacement version of pipe(2) for Windows which allows the pipe
1715 * handles to be used in select().
1716 *
1717 * Reads and writes on the pipe must go through piperead()/pipewrite().
1718 *
1719 * For consistency with Unix we declare the returned handles as "int".
1720 * This is okay even on WIN64 because system handles are not more than
1721 * 32 bits wide, but we do have to do some casting.
1722 */
1723 static int
pgpipe(int handles[2])1724 pgpipe(int handles[2])
1725 {
1726 pgsocket s,
1727 tmp_sock;
1728 struct sockaddr_in serv_addr;
1729 int len = sizeof(serv_addr);
1730
1731 /* We have to use the Unix socket invalid file descriptor value here. */
1732 handles[0] = handles[1] = -1;
1733
1734 /*
1735 * setup listen socket
1736 */
1737 if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1738 {
1739 pg_log_error("pgpipe: could not create socket: error code %d",
1740 WSAGetLastError());
1741 return -1;
1742 }
1743
1744 memset((void *) &serv_addr, 0, sizeof(serv_addr));
1745 serv_addr.sin_family = AF_INET;
1746 serv_addr.sin_port = pg_hton16(0);
1747 serv_addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);
1748 if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1749 {
1750 pg_log_error("pgpipe: could not bind: error code %d",
1751 WSAGetLastError());
1752 closesocket(s);
1753 return -1;
1754 }
1755 if (listen(s, 1) == SOCKET_ERROR)
1756 {
1757 pg_log_error("pgpipe: could not listen: error code %d",
1758 WSAGetLastError());
1759 closesocket(s);
1760 return -1;
1761 }
1762 if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
1763 {
1764 pg_log_error("pgpipe: %s() failed: error code %d", "getsockname",
1765 WSAGetLastError());
1766 closesocket(s);
1767 return -1;
1768 }
1769
1770 /*
1771 * setup pipe handles
1772 */
1773 if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1774 {
1775 pg_log_error("pgpipe: could not create second socket: error code %d",
1776 WSAGetLastError());
1777 closesocket(s);
1778 return -1;
1779 }
1780 handles[1] = (int) tmp_sock;
1781
1782 if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1783 {
1784 pg_log_error("pgpipe: could not connect socket: error code %d",
1785 WSAGetLastError());
1786 closesocket(handles[1]);
1787 handles[1] = -1;
1788 closesocket(s);
1789 return -1;
1790 }
1791 if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
1792 {
1793 pg_log_error("pgpipe: could not accept connection: error code %d",
1794 WSAGetLastError());
1795 closesocket(handles[1]);
1796 handles[1] = -1;
1797 closesocket(s);
1798 return -1;
1799 }
1800 handles[0] = (int) tmp_sock;
1801
1802 closesocket(s);
1803 return 0;
1804 }
1805
1806 #endif /* WIN32 */
1807