1 /*-------------------------------------------------------------------------
2 *
3 * parallel.c
4 *
5 * Parallel support for pg_dump and pg_restore
6 *
7 * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/bin/pg_dump/parallel.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16 /*
17 * Parallel operation works like this:
18 *
19 * The original, master process calls ParallelBackupStart(), which forks off
20 * the desired number of worker processes, which each enter WaitForCommands().
21 *
22 * The master process dispatches an individual work item to one of the worker
23 * processes in DispatchJobForTocEntry(). We send a command string such as
24 * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
25 * The worker process receives and decodes the command and passes it to the
26 * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
27 * which are routines of the current archive format. That routine performs
28 * the required action (dump or restore) and returns an integer status code.
29 * This is passed back to the master where we pass it to the
30 * ParallelCompletionPtr callback function that was passed to
31 * DispatchJobForTocEntry(). The callback function does state updating
32 * for the master control logic in pg_backup_archiver.c.
33 *
34 * In principle additional archive-format-specific information might be needed
35 * in commands or worker status responses, but so far that hasn't proved
36 * necessary, since workers have full copies of the ArchiveHandle/TocEntry
37 * data structures. Remember that we have forked off the workers only after
38 * we have read in the catalog. That's why our worker processes can also
39 * access the catalog information. (In the Windows case, the workers are
40 * threads in the same process. To avoid problems, they work with cloned
41 * copies of the Archive data structure; see RunWorker().)
42 *
43 * In the master process, the workerStatus field for each worker has one of
44 * the following values:
45 * WRKR_NOT_STARTED: we've not yet forked this worker
46 * WRKR_IDLE: it's waiting for a command
47 * WRKR_WORKING: it's working on a command
48 * WRKR_TERMINATED: process ended
49 * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
50 * state, and must be NULL in other states.
51 */
52
53 #include "postgres_fe.h"
54
55 #ifndef WIN32
56 #include <sys/wait.h>
57 #include <signal.h>
58 #include <unistd.h>
59 #include <fcntl.h>
60 #endif
61 #ifdef HAVE_SYS_SELECT_H
62 #include <sys/select.h>
63 #endif
64
65 #include "parallel.h"
66 #include "pg_backup_utils.h"
67 #include "fe_utils/string_utils.h"
68
/* Mnemonic macros for indexing the fd array returned by pipe(2) */
#define PIPE_READ							0
#define PIPE_WRITE							1

#define NO_SLOT (-1)			/* Failure result for GetIdleWorker() */

/*
 * Worker process statuses, as tracked by the master for each ParallelSlot.
 *
 * WRKR_NOT_STARTED is 0 so that a zero-filled slot array (see
 * ParallelBackupStart) starts out with every worker in that state.
 */
typedef enum
{
	WRKR_NOT_STARTED = 0,
	WRKR_IDLE,
	WRKR_WORKING,
	WRKR_TERMINATED
} T_WorkerStatus;

/* True if the worker has been started and not yet marked WRKR_TERMINATED */
#define WORKER_IS_RUNNING(workerStatus) \
	((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
86
/*
 * Private per-parallel-worker state (typedef for this is in parallel.h).
 *
 * Much of this is valid only in the master process (or, on Windows, should
 * be touched only by the master thread).  But the AH field should be touched
 * only by workers.  The pipe descriptors are valid everywhere.
 */
struct ParallelSlot
{
	T_WorkerStatus workerStatus;	/* see enum above */

	/* These fields are valid if workerStatus == WRKR_WORKING: */
	ParallelCompletionPtr callback; /* function to call on completion */
	void	   *callback_data;	/* passthrough data for it */

	/*
	 * Archive data worker is using: the worker's cloned ArchiveHandle, set
	 * via set_cancel_slot_archive() in RunWorker() and reset to NULL there
	 * before the worker finishes.
	 */
	ArchiveHandle *AH;

	/*
	 * master's end of the pipes: pipeRead receives worker -> master traffic,
	 * pipeWrite sends master -> worker traffic.
	 */
	int			pipeRead;
	int			pipeWrite;

	/*
	 * child's end of the pipes: pipeRevRead receives master -> worker
	 * traffic, pipeRevWrite sends worker -> master traffic.
	 */
	int			pipeRevRead;
	int			pipeRevWrite;

	/* Child process/thread identity info: */
#ifdef WIN32
	uintptr_t	hThread;		/* INVALID_HANDLE_VALUE once thread is reaped */
	unsigned int threadId;
#else
	pid_t		pid;			/* 0 if never forked or already reaped */
#endif
};
117
#ifdef WIN32

/*
 * Structure to hold info passed by _beginthreadex() to the function it calls
 * via its single allowed argument.  Allocated by ParallelBackupStart() and
 * freed by the worker thread in init_spawned_worker_win32().
 */
typedef struct
{
	ArchiveHandle *AH;			/* master database connection */
	ParallelSlot *slot;			/* this worker's parallel slot */
} WorkerInfo;

/*
 * Windows implementation of pipe access.  pgpipe() and piperead() are static,
 * so they are defined later in this file; writes go through send().
 */
static int	pgpipe(int handles[2]);
static int	piperead(int s, char *buf, int len);
#define pipewrite(a,b,c)	send(a,b,c,0)

#else							/* !WIN32 */

/* Non-Windows implementation of pipe access: plain pipe(2)/read/write */
#define pgpipe(a)			pipe(a)
#define piperead(a,b,c)		read(a,b,c)
#define pipewrite(a,b,c)	write(a,b,c)

#endif							/* WIN32 */
143
/*
 * State info for archive_close_connection() shutdown callback.
 */
typedef struct ShutdownInformation
{
	/* non-NULL only between worker startup and ParallelBackupEnd() */
	ParallelState *pstate;
	/* master's archive, registered by on_exit_close_archive() */
	Archive    *AHX;
} ShutdownInformation;

static ShutdownInformation shutdown_info;
154
/*
 * State info for signal handling.
 * We assume signal_info initializes to zeroes.
 *
 * On Unix, myAH is the master DB connection in the master process, and the
 * worker's own connection in worker processes.  On Windows, we have only one
 * instance of signal_info, so myAH is the master connection and the worker
 * connections must be dug out of pstate->parallelSlot[].
 *
 * signal_info is volatile because it is read asynchronously: by
 * sigTermHandler() on Unix and by the console-handler thread on Windows.
 */
typedef struct DumpSignalInformation
{
	ArchiveHandle *myAH;		/* database connection to issue cancel for */
	ParallelState *pstate;		/* parallel state, if any */
	bool		handler_set;	/* signal handler set up in this process? */
#ifndef WIN32
	bool		am_worker;		/* am I a worker process? */
#endif
} DumpSignalInformation;

static volatile DumpSignalInformation signal_info;

#ifdef WIN32
/*
 * Guards signal_info and the slots' AH/connCancel fields against the
 * console-handler thread.  Initialized in setup_cancel_handler().
 */
static CRITICAL_SECTION signal_info_lock;
#endif
179
/*
 * Write a simple string to stderr --- must be safe in a signal handler.
 * We ignore the write() result since there's not much we could do about it.
 * Certain compilers make that harder than it ought to be, hence the
 * assign-then-cast-to-void dance with rc_.  The argument is evaluated once.
 */
#define write_stderr(str) \
	do { \
		const char *str_ = (str); \
		int		rc_; \
		rc_ = write(fileno(stderr), str_, strlen(str_)); \
		(void) rc_; \
	} while (0)
192
193
#ifdef WIN32
/* file-scope variables */
static DWORD tls_index;			/* TLS slot used by getThreadLocalPQExpBuffer() */

/* globally visible variables (needed by exit_nicely) */
bool		parallel_init_done = false; /* set by init_parallel_dump_utils() */
DWORD		mainThreadId;		/* thread that called init_parallel_dump_utils() */
#endif							/* WIN32 */

/* module name passed to exit_horribly() for errors raised in this file */
static const char *modulename = gettext_noop("parallel archiver");
204
/* Local function prototypes */

/* process identification and shutdown */
static ParallelSlot *GetMyPSlot(ParallelState *pstate);
static void archive_close_connection(int code, void *arg);
static void ShutdownWorkersHard(ParallelState *pstate);
static void WaitForTerminatingWorkers(ParallelState *pstate);

/* cancel/signal handling */
static void setup_cancel_handler(void);
static void set_cancel_pstate(ParallelState *pstate);
static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);

/* worker lifecycle and job dispatch */
static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
static int	GetIdleWorker(ParallelState *pstate);
static bool HasEveryWorkerTerminated(ParallelState *pstate);
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
				bool do_wait);

/* master <-> worker message passing */
static char *getMessageFromMaster(int pipefd[2]);
static void sendMessageToMaster(int pipefd[2], const char *str);
static int	select_loop(int maxFd, fd_set *workerset);
static char *getMessageFromWorker(ParallelState *pstate,
					 bool do_wait, int *worker);
static void sendMessageToWorker(ParallelState *pstate,
					int worker, const char *str);
static char *readMessageFromPipe(int fd);
228
/*
 * Helpers for matching command/status messages exchanged over the pipes.
 *
 * These are static inline functions rather than function-like macros so
 * that every argument is evaluated exactly once and is type-checked as a
 * C string.
 */

/* Does msg begin with prefix? */
static inline bool
messageStartsWith(const char *msg, const char *prefix)
{
	return strncmp(msg, prefix, strlen(prefix)) == 0;
}

/* Is msg exactly equal to pattern? */
static inline bool
messageEquals(const char *msg, const char *pattern)
{
	return strcmp(msg, pattern) == 0;
}
233
234
/*
 * Initialize parallel dump support --- should be called early in process
 * startup.  (Currently, this is called whether or not we intend parallel
 * activity.)
 *
 * On Windows, the first call allocates the TLS index used by
 * getThreadLocalPQExpBuffer(), records the calling thread as the main
 * thread, and starts Winsock; later calls do nothing.  Failure of any of
 * these steps is fatal.  On other platforms there is nothing to do.
 */
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
	if (!parallel_init_done)
	{
		WSADATA		wsaData;
		int			err;

		/* Prepare for threaded operation */
		tls_index = TlsAlloc();
		if (tls_index == TLS_OUT_OF_INDEXES)
		{
			/* without a TLS slot, TlsGetValue/TlsSetValue would misbehave */
			fprintf(stderr, _("%s: could not allocate thread-local storage index\n"),
					progname);
			exit_nicely(1);
		}
		mainThreadId = GetCurrentThreadId();

		/* Initialize socket access */
		err = WSAStartup(MAKEWORD(2, 2), &wsaData);
		if (err != 0)
		{
			fprintf(stderr, _("%s: WSAStartup failed: %d\n"), progname, err);
			exit_nicely(1);
		}

		parallel_init_done = true;
	}
#endif
}
265
266 /*
267 * Find the ParallelSlot for the current worker process or thread.
268 *
269 * Returns NULL if no matching slot is found (this implies we're the master).
270 */
271 static ParallelSlot *
GetMyPSlot(ParallelState * pstate)272 GetMyPSlot(ParallelState *pstate)
273 {
274 int i;
275
276 for (i = 0; i < pstate->numWorkers; i++)
277 {
278 #ifdef WIN32
279 if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
280 #else
281 if (pstate->parallelSlot[i].pid == getpid())
282 #endif
283 return &(pstate->parallelSlot[i]);
284 }
285
286 return NULL;
287 }
288
/*
 * A thread-local version of getLocalPQExpBuffer().
 *
 * Non-reentrant but reduces memory leakage: we'll consume one buffer per
 * thread, which is much better than one per fmtId/fmtQualifiedId call.
 * Until init_parallel_dump_utils() has set parallel_init_done, the buffer
 * lives in a function-local static instead of the TLS slot.
 */
#ifdef WIN32
static PQExpBuffer
getThreadLocalPQExpBuffer(void)
{
	/*
	 * The Tls code goes awry if we use a static var, so we provide for both
	 * static and auto, and omit any use of the static var when using Tls.  We
	 * rely on TlsGetValue() to return 0 if the value is not yet set.
	 */
	static PQExpBuffer s_id_return = NULL;
	PQExpBuffer buf;

	buf = parallel_init_done ? (PQExpBuffer) TlsGetValue(tls_index) : s_id_return;

	if (buf == NULL)
	{
		/* First call in this thread (or before TLS setup): make a buffer */
		buf = createPQExpBuffer();
		if (parallel_init_done)
			TlsSetValue(tls_index, buf);
		else
			s_id_return = buf;
	}
	else
	{
		/* Reuse this thread's buffer, just wiping its contents */
		resetPQExpBuffer(buf);
	}

	return buf;
}
#endif							/* WIN32 */
330
331 /*
332 * pg_dump and pg_restore call this to register the cleanup handler
333 * as soon as they've created the ArchiveHandle.
334 */
335 void
on_exit_close_archive(Archive * AHX)336 on_exit_close_archive(Archive *AHX)
337 {
338 shutdown_info.AHX = AHX;
339 on_exit_nicely(archive_close_connection, &shutdown_info);
340 }
341
342 /*
343 * on_exit_nicely handler for shutting down database connections and
344 * worker processes cleanly.
345 */
346 static void
archive_close_connection(int code,void * arg)347 archive_close_connection(int code, void *arg)
348 {
349 ShutdownInformation *si = (ShutdownInformation *) arg;
350
351 if (si->pstate)
352 {
353 /* In parallel mode, must figure out who we are */
354 ParallelSlot *slot = GetMyPSlot(si->pstate);
355
356 if (!slot)
357 {
358 /*
359 * We're the master. Forcibly shut down workers, then close our
360 * own database connection, if any.
361 */
362 ShutdownWorkersHard(si->pstate);
363
364 if (si->AHX)
365 DisconnectDatabase(si->AHX);
366 }
367 else
368 {
369 /*
370 * We're a worker. Shut down our own DB connection if any. On
371 * Windows, we also have to close our communication sockets, to
372 * emulate what will happen on Unix when the worker process exits.
373 * (Without this, if this is a premature exit, the master would
374 * fail to detect it because there would be no EOF condition on
375 * the other end of the pipe.)
376 */
377 if (slot->AH)
378 DisconnectDatabase(&(slot->AH->public));
379
380 #ifdef WIN32
381 closesocket(slot->pipeRevRead);
382 closesocket(slot->pipeRevWrite);
383 #endif
384 }
385 }
386 else
387 {
388 /* Non-parallel operation: just kill the master DB connection */
389 if (si->AHX)
390 DisconnectDatabase(si->AHX);
391 }
392 }
393
394 /*
395 * Forcibly shut down any remaining workers, waiting for them to finish.
396 *
397 * Note that we don't expect to come here during normal exit (the workers
398 * should be long gone, and the ParallelState too). We're only here in an
399 * exit_horribly() situation, so intervening to cancel active commands is
400 * appropriate.
401 */
402 static void
ShutdownWorkersHard(ParallelState * pstate)403 ShutdownWorkersHard(ParallelState *pstate)
404 {
405 int i;
406
407 /*
408 * Close our write end of the sockets so that any workers waiting for
409 * commands know they can exit. (Note: some of the pipeWrite fields might
410 * still be zero, if we failed to initialize all the workers. Hence, just
411 * ignore errors here.)
412 */
413 for (i = 0; i < pstate->numWorkers; i++)
414 closesocket(pstate->parallelSlot[i].pipeWrite);
415
416 /*
417 * Force early termination of any commands currently in progress.
418 */
419 #ifndef WIN32
420 /* On non-Windows, send SIGTERM to each worker process. */
421 for (i = 0; i < pstate->numWorkers; i++)
422 {
423 pid_t pid = pstate->parallelSlot[i].pid;
424
425 if (pid != 0)
426 kill(pid, SIGTERM);
427 }
428 #else
429
430 /*
431 * On Windows, send query cancels directly to the workers' backends. Use
432 * a critical section to ensure worker threads don't change state.
433 */
434 EnterCriticalSection(&signal_info_lock);
435 for (i = 0; i < pstate->numWorkers; i++)
436 {
437 ArchiveHandle *AH = pstate->parallelSlot[i].AH;
438 char errbuf[1];
439
440 if (AH != NULL && AH->connCancel != NULL)
441 (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
442 }
443 LeaveCriticalSection(&signal_info_lock);
444 #endif
445
446 /* Now wait for them to terminate. */
447 WaitForTerminatingWorkers(pstate);
448 }
449
450 /*
451 * Wait for all workers to terminate.
452 */
453 static void
WaitForTerminatingWorkers(ParallelState * pstate)454 WaitForTerminatingWorkers(ParallelState *pstate)
455 {
456 while (!HasEveryWorkerTerminated(pstate))
457 {
458 ParallelSlot *slot = NULL;
459 int j;
460
461 #ifndef WIN32
462 /* On non-Windows, use wait() to wait for next worker to end */
463 int status;
464 pid_t pid = wait(&status);
465
466 /* Find dead worker's slot, and clear the PID field */
467 for (j = 0; j < pstate->numWorkers; j++)
468 {
469 slot = &(pstate->parallelSlot[j]);
470 if (slot->pid == pid)
471 {
472 slot->pid = 0;
473 break;
474 }
475 }
476 #else /* WIN32 */
477 /* On Windows, we must use WaitForMultipleObjects() */
478 HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
479 int nrun = 0;
480 DWORD ret;
481 uintptr_t hThread;
482
483 for (j = 0; j < pstate->numWorkers; j++)
484 {
485 if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
486 {
487 lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
488 nrun++;
489 }
490 }
491 ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
492 Assert(ret != WAIT_FAILED);
493 hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
494 free(lpHandles);
495
496 /* Find dead worker's slot, and clear the hThread field */
497 for (j = 0; j < pstate->numWorkers; j++)
498 {
499 slot = &(pstate->parallelSlot[j]);
500 if (slot->hThread == hThread)
501 {
502 /* For cleanliness, close handles for dead threads */
503 CloseHandle((HANDLE) slot->hThread);
504 slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
505 break;
506 }
507 }
508 #endif /* WIN32 */
509
510 /* On all platforms, update workerStatus and te[] as well */
511 Assert(j < pstate->numWorkers);
512 slot->workerStatus = WRKR_TERMINATED;
513 pstate->te[j] = NULL;
514 }
515 }
516
517
518 /*
519 * Code for responding to cancel interrupts (SIGINT, control-C, etc)
520 *
521 * This doesn't quite belong in this module, but it needs access to the
522 * ParallelState data, so there's not really a better place either.
523 *
524 * When we get a cancel interrupt, we could just die, but in pg_restore that
525 * could leave a SQL command (e.g., CREATE INDEX on a large table) running
526 * for a long time. Instead, we try to send a cancel request and then die.
527 * pg_dump probably doesn't really need this, but we might as well use it
528 * there too. Note that sending the cancel directly from the signal handler
529 * is safe because PQcancel() is written to make it so.
530 *
531 * In parallel operation on Unix, each process is responsible for canceling
532 * its own connection (this must be so because nobody else has access to it).
533 * Furthermore, the master process should attempt to forward its signal to
534 * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't
535 * needed because typing control-C at the console would deliver SIGINT to
536 * every member of the terminal process group --- but in other scenarios it
537 * might be that only the master gets signaled.
538 *
539 * On Windows, the cancel handler runs in a separate thread, because that's
540 * how SetConsoleCtrlHandler works. We make it stop worker threads, send
541 * cancels on all active connections, and then return FALSE, which will allow
542 * the process to die. For safety's sake, we use a critical section to
543 * protect the PGcancel structures against being changed while the signal
544 * thread runs.
545 */
546
547 #ifndef WIN32
548
/*
 * Signal handler (Unix only), installed by setup_cancel_handler() for
 * SIGINT, SIGTERM and SIGQUIT.
 *
 * The handler works in this order:
 *  - forwards SIGTERM to every worker listed in signal_info.pstate, which
 *    is non-NULL only in the master once workers exist;
 *  - sends a query cancel on signal_info.myAH's connection, if one is
 *    registered;
 *  - reports termination (master only);
 *  - exits with status 1 via _exit().
 * It sticks to PQcancel(), kill(), write(2) and _exit() so as to stay safe
 * inside a signal handler.
 */
static void
sigTermHandler(SIGNAL_ARGS)
{
	int			i;
	char		errbuf[1];		/* PQcancel() error text is ignored */

	/*
	 * Some platforms allow delivery of new signals to interrupt an active
	 * signal handler.  That could muck up our attempt to send PQcancel, so
	 * disable the signals that setup_cancel_handler enabled.
	 */
	pqsignal(SIGINT, SIG_IGN);
	pqsignal(SIGTERM, SIG_IGN);
	pqsignal(SIGQUIT, SIG_IGN);

	/*
	 * If we're in the master, forward signal to all workers.  (It seems best
	 * to do this before PQcancel; killing the master transaction will result
	 * in invalid-snapshot errors from active workers, which maybe we can
	 * quiet by killing workers first.)  Ignore any errors.
	 */
	if (signal_info.pstate != NULL)
	{
		for (i = 0; i < signal_info.pstate->numWorkers; i++)
		{
			pid_t		pid = signal_info.pstate->parallelSlot[i].pid;

			/* pid is 0 for workers never forked or already reaped */
			if (pid != 0)
				kill(pid, SIGTERM);
		}
	}

	/*
	 * Send QueryCancel if we have a connection to send to.  Ignore errors,
	 * there's not much we can do about them anyway.
	 */
	if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
		(void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));

	/*
	 * Report we're quitting, using nothing more complicated than write(2).
	 * When in parallel operation, only the master process should do this.
	 */
	if (!signal_info.am_worker)
	{
		if (progname)
		{
			write_stderr(progname);
			write_stderr(": ");
		}
		write_stderr("terminated by user\n");
	}

	/*
	 * And die, using _exit() not exit() because the latter will invoke atexit
	 * handlers that can fail if we interrupted related code.
	 */
	_exit(1);
}
611
612 /*
613 * Enable cancel interrupt handler, if not already done.
614 */
615 static void
setup_cancel_handler(void)616 setup_cancel_handler(void)
617 {
618 /*
619 * When forking, signal_info.handler_set will propagate into the new
620 * process, but that's fine because the signal handler state does too.
621 */
622 if (!signal_info.handler_set)
623 {
624 signal_info.handler_set = true;
625
626 pqsignal(SIGINT, sigTermHandler);
627 pqsignal(SIGTERM, sigTermHandler);
628 pqsignal(SIGQUIT, sigTermHandler);
629 }
630 }
631
632 #else /* WIN32 */
633
/*
 * Console interrupt handler --- runs in a newly-started thread.
 *
 * After stopping other threads and sending cancel requests on all open
 * connections, we return FALSE which will allow the default ExitProcess()
 * action to be taken.  Only CTRL_C_EVENT and CTRL_BREAK_EVENT are acted on;
 * any other control event just returns FALSE.
 */
static BOOL WINAPI
consoleHandler(DWORD dwCtrlType)
{
	int			i;
	char		errbuf[1];		/* PQcancel() error text is ignored */

	if (dwCtrlType == CTRL_C_EVENT ||
		dwCtrlType == CTRL_BREAK_EVENT)
	{
		/* Critical section prevents changing data we look at here */
		EnterCriticalSection(&signal_info_lock);

		/*
		 * If in parallel mode, stop worker threads and send QueryCancel to
		 * their connected backends.  The main point of stopping the worker
		 * threads is to keep them from reporting the query cancels as errors,
		 * which would clutter the user's screen.  We needn't stop the master
		 * thread since it won't be doing much anyway.  Do this before
		 * canceling the main transaction, else we might get invalid-snapshot
		 * errors reported before we can stop the workers.  Ignore errors,
		 * there's not much we can do about them anyway.
		 */
		if (signal_info.pstate != NULL)
		{
			for (i = 0; i < signal_info.pstate->numWorkers; i++)
			{
				ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
				ArchiveHandle *AH = slot->AH;
				HANDLE		hThread = (HANDLE) slot->hThread;

				/*
				 * Using TerminateThread here may leave some resources leaked,
				 * but it doesn't matter since we're about to end the whole
				 * process.  Threads already reaped by
				 * WaitForTerminatingWorkers() have INVALID_HANDLE_VALUE here.
				 */
				if (hThread != INVALID_HANDLE_VALUE)
					TerminateThread(hThread, 0);

				if (AH != NULL && AH->connCancel != NULL)
					(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
			}
		}

		/*
		 * Send QueryCancel to master connection, if enabled.  Ignore errors,
		 * there's not much we can do about them anyway.
		 */
		if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
			(void) PQcancel(signal_info.myAH->connCancel,
							errbuf, sizeof(errbuf));

		LeaveCriticalSection(&signal_info_lock);

		/*
		 * Report we're quitting, using nothing more complicated than
		 * write(2).  (We might be able to get away with using write_msg()
		 * here, but since we terminated other threads uncleanly above, it
		 * seems better to assume as little as possible.)
		 */
		if (progname)
		{
			write_stderr(progname);
			write_stderr(": ");
		}
		write_stderr("terminated by user\n");
	}

	/* Always return FALSE to allow signal handling to continue */
	return FALSE;
}
711
712 /*
713 * Enable cancel interrupt handler, if not already done.
714 */
715 static void
setup_cancel_handler(void)716 setup_cancel_handler(void)
717 {
718 if (!signal_info.handler_set)
719 {
720 signal_info.handler_set = true;
721
722 InitializeCriticalSection(&signal_info_lock);
723
724 SetConsoleCtrlHandler(consoleHandler, TRUE);
725 }
726 }
727
728 #endif /* WIN32 */
729
730
/*
 * set_archive_cancel_info
 *
 * Fill AH->connCancel with cancellation info for the specified database
 * connection; or clear it if conn is NULL.
 *
 * Also installs the cancel handler if needed.  It records AH in
 * signal_info.myAH: always on Unix, and only when called from the main
 * thread on Windows.
 */
void
set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
{
	PGcancel   *oldConnCancel;

	/*
	 * Activate the interrupt handler if we didn't yet in this process.  On
	 * Windows, this also initializes signal_info_lock; therefore it's
	 * important that this happen at least once before we fork off any
	 * threads.
	 */
	setup_cancel_handler();

	/*
	 * On Unix, we assume that storing a pointer value is atomic with respect
	 * to any possible signal interrupt.  On Windows, use a critical section.
	 */

#ifdef WIN32
	EnterCriticalSection(&signal_info_lock);
#endif

	/* Free the old one if we have one */
	oldConnCancel = AH->connCancel;
	/* be sure interrupt handler doesn't use pointer while freeing */
	AH->connCancel = NULL;

	if (oldConnCancel != NULL)
		PQfreeCancel(oldConnCancel);

	/* Set the new one if specified */
	if (conn)
		AH->connCancel = PQgetCancel(conn);

	/*
	 * On Unix, there's only ever one active ArchiveHandle per process, so we
	 * can just set signal_info.myAH unconditionally.  On Windows, do that
	 * only in the main thread; worker threads have to make sure their
	 * ArchiveHandle appears in the pstate data, which is dealt with in
	 * RunWorker().
	 */
#ifndef WIN32
	signal_info.myAH = AH;
#else
	if (mainThreadId == GetCurrentThreadId())
		signal_info.myAH = AH;
#endif

#ifdef WIN32
	LeaveCriticalSection(&signal_info_lock);
#endif
}
789
790 /*
791 * set_cancel_pstate
792 *
793 * Set signal_info.pstate to point to the specified ParallelState, if any.
794 * We need this mainly to have an interlock against Windows signal thread.
795 */
796 static void
set_cancel_pstate(ParallelState * pstate)797 set_cancel_pstate(ParallelState *pstate)
798 {
799 #ifdef WIN32
800 EnterCriticalSection(&signal_info_lock);
801 #endif
802
803 signal_info.pstate = pstate;
804
805 #ifdef WIN32
806 LeaveCriticalSection(&signal_info_lock);
807 #endif
808 }
809
810 /*
811 * set_cancel_slot_archive
812 *
813 * Set ParallelSlot's AH field to point to the specified archive, if any.
814 * We need this mainly to have an interlock against Windows signal thread.
815 */
816 static void
set_cancel_slot_archive(ParallelSlot * slot,ArchiveHandle * AH)817 set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
818 {
819 #ifdef WIN32
820 EnterCriticalSection(&signal_info_lock);
821 #endif
822
823 slot->AH = AH;
824
825 #ifdef WIN32
826 LeaveCriticalSection(&signal_info_lock);
827 #endif
828 }
829
830
831 /*
832 * This function is called by both Unix and Windows variants to set up
833 * and run a worker process. Caller should exit the process (or thread)
834 * upon return.
835 */
836 static void
RunWorker(ArchiveHandle * AH,ParallelSlot * slot)837 RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
838 {
839 int pipefd[2];
840
841 /* fetch child ends of pipes */
842 pipefd[PIPE_READ] = slot->pipeRevRead;
843 pipefd[PIPE_WRITE] = slot->pipeRevWrite;
844
845 /*
846 * Clone the archive so that we have our own state to work with, and in
847 * particular our own database connection.
848 *
849 * We clone on Unix as well as Windows, even though technically we don't
850 * need to because fork() gives us a copy in our own address space
851 * already. But CloneArchive resets the state information and also clones
852 * the database connection which both seem kinda helpful.
853 */
854 AH = CloneArchive(AH);
855
856 /* Remember cloned archive where signal handler can find it */
857 set_cancel_slot_archive(slot, AH);
858
859 /*
860 * Call the setup worker function that's defined in the ArchiveHandle.
861 */
862 (AH->SetupWorkerPtr) ((Archive *) AH);
863
864 /*
865 * Execute commands until done.
866 */
867 WaitForCommands(AH, pipefd);
868
869 /*
870 * Disconnect from database and clean up.
871 */
872 set_cancel_slot_archive(slot, NULL);
873 DisconnectDatabase(&(AH->public));
874 DeCloneArchive(AH);
875 }
876
/*
 * Thread entry point for Windows workers, started by _beginthreadex() in
 * ParallelBackupStart().  Frees the WorkerInfo it is handed.
 */
#ifdef WIN32
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
	ArchiveHandle *masterAH;
	ParallelSlot *mySlot;

	/* Copy out what we need, then release the transient argument struct */
	masterAH = wi->AH;
	mySlot = wi->slot;
	free(wi);

	RunWorker(masterAH, mySlot);

	/* Exit the thread */
	_endthreadex(0);
	return 0;
}
#endif							/* WIN32 */
898
899 /*
900 * This function starts a parallel dump or restore by spawning off the worker
901 * processes. For Windows, it creates a number of threads; on Unix the
902 * workers are created with fork().
903 */
904 ParallelState *
ParallelBackupStart(ArchiveHandle * AH)905 ParallelBackupStart(ArchiveHandle *AH)
906 {
907 ParallelState *pstate;
908 int i;
909
910 Assert(AH->public.numWorkers > 0);
911
912 pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
913
914 pstate->numWorkers = AH->public.numWorkers;
915 pstate->te = NULL;
916 pstate->parallelSlot = NULL;
917
918 if (AH->public.numWorkers == 1)
919 return pstate;
920
921 /* Create status arrays, being sure to initialize all fields to 0 */
922 pstate->te = (TocEntry **)
923 pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
924 pstate->parallelSlot = (ParallelSlot *)
925 pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
926
927 #ifdef WIN32
928 /* Make fmtId() and fmtQualifiedId() use thread-local storage */
929 getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
930 #endif
931
932 /*
933 * Set the pstate in shutdown_info, to tell the exit handler that it must
934 * clean up workers as well as the main database connection. But we don't
935 * set this in signal_info yet, because we don't want child processes to
936 * inherit non-NULL signal_info.pstate.
937 */
938 shutdown_info.pstate = pstate;
939
940 /*
941 * Temporarily disable query cancellation on the master connection. This
942 * ensures that child processes won't inherit valid AH->connCancel
943 * settings and thus won't try to issue cancels against the master's
944 * connection. No harm is done if we fail while it's disabled, because
945 * the master connection is idle at this point anyway.
946 */
947 set_archive_cancel_info(AH, NULL);
948
949 /* Ensure stdio state is quiesced before forking */
950 fflush(NULL);
951
952 /* Create desired number of workers */
953 for (i = 0; i < pstate->numWorkers; i++)
954 {
955 #ifdef WIN32
956 WorkerInfo *wi;
957 uintptr_t handle;
958 #else
959 pid_t pid;
960 #endif
961 ParallelSlot *slot = &(pstate->parallelSlot[i]);
962 int pipeMW[2],
963 pipeWM[2];
964
965 /* Create communication pipes for this worker */
966 if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
967 exit_horribly(modulename,
968 "could not create communication channels: %s\n",
969 strerror(errno));
970
971 /* master's ends of the pipes */
972 slot->pipeRead = pipeWM[PIPE_READ];
973 slot->pipeWrite = pipeMW[PIPE_WRITE];
974 /* child's ends of the pipes */
975 slot->pipeRevRead = pipeMW[PIPE_READ];
976 slot->pipeRevWrite = pipeWM[PIPE_WRITE];
977
978 #ifdef WIN32
979 /* Create transient structure to pass args to worker function */
980 wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
981
982 wi->AH = AH;
983 wi->slot = slot;
984
985 handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
986 wi, 0, &(slot->threadId));
987 slot->hThread = handle;
988 slot->workerStatus = WRKR_IDLE;
989 #else /* !WIN32 */
990 pid = fork();
991 if (pid == 0)
992 {
993 /* we are the worker */
994 int j;
995
996 /* this is needed for GetMyPSlot() */
997 slot->pid = getpid();
998
999 /* instruct signal handler that we're in a worker now */
1000 signal_info.am_worker = true;
1001
1002 /* close read end of Worker -> Master */
1003 closesocket(pipeWM[PIPE_READ]);
1004 /* close write end of Master -> Worker */
1005 closesocket(pipeMW[PIPE_WRITE]);
1006
1007 /*
1008 * Close all inherited fds for communication of the master with
1009 * previously-forked workers.
1010 */
1011 for (j = 0; j < i; j++)
1012 {
1013 closesocket(pstate->parallelSlot[j].pipeRead);
1014 closesocket(pstate->parallelSlot[j].pipeWrite);
1015 }
1016
1017 /* Run the worker ... */
1018 RunWorker(AH, slot);
1019
1020 /* We can just exit(0) when done */
1021 exit(0);
1022 }
1023 else if (pid < 0)
1024 {
1025 /* fork failed */
1026 exit_horribly(modulename,
1027 "could not create worker process: %s\n",
1028 strerror(errno));
1029 }
1030
1031 /* In Master after successful fork */
1032 slot->pid = pid;
1033 slot->workerStatus = WRKR_IDLE;
1034
1035 /* close read end of Master -> Worker */
1036 closesocket(pipeMW[PIPE_READ]);
1037 /* close write end of Worker -> Master */
1038 closesocket(pipeWM[PIPE_WRITE]);
1039 #endif /* WIN32 */
1040 }
1041
1042 /*
1043 * Having forked off the workers, disable SIGPIPE so that master isn't
1044 * killed if it tries to send a command to a dead worker. We don't want
1045 * the workers to inherit this setting, though.
1046 */
1047 #ifndef WIN32
1048 pqsignal(SIGPIPE, SIG_IGN);
1049 #endif
1050
1051 /*
1052 * Re-establish query cancellation on the master connection.
1053 */
1054 set_archive_cancel_info(AH, AH->connection);
1055
1056 /*
1057 * Tell the cancel signal handler to forward signals to worker processes,
1058 * too. (As with query cancel, we did not need this earlier because the
1059 * workers have not yet been given anything to do; if we die before this
1060 * point, any already-started workers will see EOF and quit promptly.)
1061 */
1062 set_cancel_pstate(pstate);
1063
1064 return pstate;
1065 }
1066
1067 /*
1068 * Close down a parallel dump or restore.
1069 */
1070 void
ParallelBackupEnd(ArchiveHandle * AH,ParallelState * pstate)1071 ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
1072 {
1073 int i;
1074
1075 /* No work if non-parallel */
1076 if (pstate->numWorkers == 1)
1077 return;
1078
1079 /* There should not be any unfinished jobs */
1080 Assert(IsEveryWorkerIdle(pstate));
1081
1082 /* Close the sockets so that the workers know they can exit */
1083 for (i = 0; i < pstate->numWorkers; i++)
1084 {
1085 closesocket(pstate->parallelSlot[i].pipeRead);
1086 closesocket(pstate->parallelSlot[i].pipeWrite);
1087 }
1088
1089 /* Wait for them to exit */
1090 WaitForTerminatingWorkers(pstate);
1091
1092 /*
1093 * Unlink pstate from shutdown_info, so the exit handler will not try to
1094 * use it; and likewise unlink from signal_info.
1095 */
1096 shutdown_info.pstate = NULL;
1097 set_cancel_pstate(NULL);
1098
1099 /* Release state (mere neatnik-ism, since we're about to terminate) */
1100 free(pstate->te);
1101 free(pstate->parallelSlot);
1102 free(pstate);
1103 }
1104
1105 /*
1106 * These next four functions handle construction and parsing of the command
1107 * strings and response strings for parallel workers.
1108 *
 * Currently, these can be the same regardless of which archive format we are
 * processing.  In the future, we might want to let format modules override
 * these functions to add format-specific data to a command or response.
1112 */
1113
1114 /*
1115 * buildWorkerCommand: format a command string to send to a worker.
1116 *
1117 * The string is built in the caller-supplied buffer of size buflen.
1118 */
1119 static void
buildWorkerCommand(ArchiveHandle * AH,TocEntry * te,T_Action act,char * buf,int buflen)1120 buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
1121 char *buf, int buflen)
1122 {
1123 if (act == ACT_DUMP)
1124 snprintf(buf, buflen, "DUMP %d", te->dumpId);
1125 else if (act == ACT_RESTORE)
1126 snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1127 else
1128 Assert(false);
1129 }
1130
1131 /*
1132 * parseWorkerCommand: interpret a command string in a worker.
1133 */
1134 static void
parseWorkerCommand(ArchiveHandle * AH,TocEntry ** te,T_Action * act,const char * msg)1135 parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
1136 const char *msg)
1137 {
1138 DumpId dumpId;
1139 int nBytes;
1140
1141 if (messageStartsWith(msg, "DUMP "))
1142 {
1143 *act = ACT_DUMP;
1144 sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1145 Assert(nBytes == strlen(msg));
1146 *te = getTocEntryByDumpId(AH, dumpId);
1147 Assert(*te != NULL);
1148 }
1149 else if (messageStartsWith(msg, "RESTORE "))
1150 {
1151 *act = ACT_RESTORE;
1152 sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1153 Assert(nBytes == strlen(msg));
1154 *te = getTocEntryByDumpId(AH, dumpId);
1155 Assert(*te != NULL);
1156 }
1157 else
1158 exit_horribly(modulename,
1159 "unrecognized command received from master: \"%s\"\n",
1160 msg);
1161 }
1162
1163 /*
1164 * buildWorkerResponse: format a response string to send to the master.
1165 *
1166 * The string is built in the caller-supplied buffer of size buflen.
1167 */
1168 static void
buildWorkerResponse(ArchiveHandle * AH,TocEntry * te,T_Action act,int status,char * buf,int buflen)1169 buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
1170 char *buf, int buflen)
1171 {
1172 snprintf(buf, buflen, "OK %d %d %d",
1173 te->dumpId,
1174 status,
1175 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1176 }
1177
1178 /*
1179 * parseWorkerResponse: parse the status message returned by a worker.
1180 *
1181 * Returns the integer status code, and may update fields of AH and/or te.
1182 */
1183 static int
parseWorkerResponse(ArchiveHandle * AH,TocEntry * te,const char * msg)1184 parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
1185 const char *msg)
1186 {
1187 DumpId dumpId;
1188 int nBytes,
1189 n_errors;
1190 int status = 0;
1191
1192 if (messageStartsWith(msg, "OK "))
1193 {
1194 sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1195
1196 Assert(dumpId == te->dumpId);
1197 Assert(nBytes == strlen(msg));
1198
1199 AH->public.n_errors += n_errors;
1200 }
1201 else
1202 exit_horribly(modulename,
1203 "invalid message received from worker: \"%s\"\n",
1204 msg);
1205
1206 return status;
1207 }
1208
1209 /*
1210 * Dispatch a job to some free worker.
1211 *
1212 * te is the TocEntry to be processed, act is the action to be taken on it.
1213 * callback is the function to call on completion of the job.
1214 *
1215 * If no worker is currently available, this will block, and previously
1216 * registered callback functions may be called.
1217 */
1218 void
DispatchJobForTocEntry(ArchiveHandle * AH,ParallelState * pstate,TocEntry * te,T_Action act,ParallelCompletionPtr callback,void * callback_data)1219 DispatchJobForTocEntry(ArchiveHandle *AH,
1220 ParallelState *pstate,
1221 TocEntry *te,
1222 T_Action act,
1223 ParallelCompletionPtr callback,
1224 void *callback_data)
1225 {
1226 int worker;
1227 char buf[256];
1228
1229 /* Get a worker, waiting if none are idle */
1230 while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1231 WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1232
1233 /* Construct and send command string */
1234 buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1235
1236 sendMessageToWorker(pstate, worker, buf);
1237
1238 /* Remember worker is busy, and which TocEntry it's working on */
1239 pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1240 pstate->parallelSlot[worker].callback = callback;
1241 pstate->parallelSlot[worker].callback_data = callback_data;
1242 pstate->te[worker] = te;
1243 }
1244
1245 /*
1246 * Find an idle worker and return its slot number.
1247 * Return NO_SLOT if none are idle.
1248 */
1249 static int
GetIdleWorker(ParallelState * pstate)1250 GetIdleWorker(ParallelState *pstate)
1251 {
1252 int i;
1253
1254 for (i = 0; i < pstate->numWorkers; i++)
1255 {
1256 if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1257 return i;
1258 }
1259 return NO_SLOT;
1260 }
1261
1262 /*
1263 * Return true iff no worker is running.
1264 */
1265 static bool
HasEveryWorkerTerminated(ParallelState * pstate)1266 HasEveryWorkerTerminated(ParallelState *pstate)
1267 {
1268 int i;
1269
1270 for (i = 0; i < pstate->numWorkers; i++)
1271 {
1272 if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1273 return false;
1274 }
1275 return true;
1276 }
1277
1278 /*
1279 * Return true iff every worker is in the WRKR_IDLE state.
1280 */
1281 bool
IsEveryWorkerIdle(ParallelState * pstate)1282 IsEveryWorkerIdle(ParallelState *pstate)
1283 {
1284 int i;
1285
1286 for (i = 0; i < pstate->numWorkers; i++)
1287 {
1288 if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1289 return false;
1290 }
1291 return true;
1292 }
1293
1294 /*
1295 * Acquire lock on a table to be dumped by a worker process.
1296 *
1297 * The master process is already holding an ACCESS SHARE lock. Ordinarily
1298 * it's no problem for a worker to get one too, but if anything else besides
1299 * pg_dump is running, there's a possible deadlock:
1300 *
1301 * 1) Master dumps the schema and locks all tables in ACCESS SHARE mode.
1302 * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1303 * because the master holds a conflicting ACCESS SHARE lock).
1304 * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1305 * The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1306 * 4) Now we have a deadlock, since the master is effectively waiting for
1307 * the worker. The server cannot detect that, however.
1308 *
1309 * To prevent an infinite wait, prior to touching a table in a worker, request
1310 * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock,
1311 * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1312 * so we have a deadlock. We must fail the backup in that case.
1313 */
1314 static void
lockTableForWorker(ArchiveHandle * AH,TocEntry * te)1315 lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
1316 {
1317 const char *qualId;
1318 PQExpBuffer query;
1319 PGresult *res;
1320
1321 /* Nothing to do for BLOBS */
1322 if (strcmp(te->desc, "BLOBS") == 0)
1323 return;
1324
1325 query = createPQExpBuffer();
1326
1327 qualId = fmtQualifiedId(AH->public.remoteVersion, te->namespace, te->tag);
1328
1329 appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1330 qualId);
1331
1332 res = PQexec(AH->connection, query->data);
1333
1334 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1335 exit_horribly(modulename,
1336 "could not obtain lock on relation \"%s\"\n"
1337 "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1338 "on the table after the pg_dump parent process had gotten the "
1339 "initial ACCESS SHARE lock on the table.\n", qualId);
1340
1341 PQclear(res);
1342 destroyPQExpBuffer(query);
1343 }
1344
1345 /*
1346 * WaitForCommands: main routine for a worker process.
1347 *
1348 * Read and execute commands from the master until we see EOF on the pipe.
1349 */
1350 static void
WaitForCommands(ArchiveHandle * AH,int pipefd[2])1351 WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1352 {
1353 char *command;
1354 TocEntry *te;
1355 T_Action act;
1356 int status = 0;
1357 char buf[256];
1358
1359 for (;;)
1360 {
1361 if (!(command = getMessageFromMaster(pipefd)))
1362 {
1363 /* EOF, so done */
1364 return;
1365 }
1366
1367 /* Decode the command */
1368 parseWorkerCommand(AH, &te, &act, command);
1369
1370 if (act == ACT_DUMP)
1371 {
1372 /* Acquire lock on this table within the worker's session */
1373 lockTableForWorker(AH, te);
1374
1375 /* Perform the dump command */
1376 status = (AH->WorkerJobDumpPtr) (AH, te);
1377 }
1378 else if (act == ACT_RESTORE)
1379 {
1380 /* Perform the restore command */
1381 status = (AH->WorkerJobRestorePtr) (AH, te);
1382 }
1383 else
1384 Assert(false);
1385
1386 /* Return status to master */
1387 buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1388
1389 sendMessageToMaster(pipefd, buf);
1390
1391 /* command was pg_malloc'd and we are responsible for free()ing it. */
1392 free(command);
1393 }
1394 }
1395
1396 /*
1397 * Check for status messages from workers.
1398 *
1399 * If do_wait is true, wait to get a status message; otherwise, just return
1400 * immediately if there is none available.
1401 *
1402 * When we get a status message, we pass the status code to the callback
1403 * function that was specified to DispatchJobForTocEntry, then reset the
1404 * worker status to IDLE.
1405 *
1406 * Returns true if we collected a status message, else false.
1407 *
1408 * XXX is it worth checking for more than one status message per call?
1409 * It seems somewhat unlikely that multiple workers would finish at exactly
1410 * the same time.
1411 */
1412 static bool
ListenToWorkers(ArchiveHandle * AH,ParallelState * pstate,bool do_wait)1413 ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1414 {
1415 int worker;
1416 char *msg;
1417
1418 /* Try to collect a status message */
1419 msg = getMessageFromWorker(pstate, do_wait, &worker);
1420
1421 if (!msg)
1422 {
1423 /* If do_wait is true, we must have detected EOF on some socket */
1424 if (do_wait)
1425 exit_horribly(modulename, "a worker process died unexpectedly\n");
1426 return false;
1427 }
1428
1429 /* Process it and update our idea of the worker's status */
1430 if (messageStartsWith(msg, "OK "))
1431 {
1432 ParallelSlot *slot = &pstate->parallelSlot[worker];
1433 TocEntry *te = pstate->te[worker];
1434 int status;
1435
1436 status = parseWorkerResponse(AH, te, msg);
1437 slot->callback(AH, te, status, slot->callback_data);
1438 slot->workerStatus = WRKR_IDLE;
1439 pstate->te[worker] = NULL;
1440 }
1441 else
1442 exit_horribly(modulename,
1443 "invalid message received from worker: \"%s\"\n",
1444 msg);
1445
1446 /* Free the string returned from getMessageFromWorker */
1447 free(msg);
1448
1449 return true;
1450 }
1451
1452 /*
1453 * Check for status results from workers, waiting if necessary.
1454 *
1455 * Available wait modes are:
1456 * WFW_NO_WAIT: reap any available status, but don't block
1457 * WFW_GOT_STATUS: wait for at least one more worker to finish
1458 * WFW_ONE_IDLE: wait for at least one worker to be idle
1459 * WFW_ALL_IDLE: wait for all workers to be idle
1460 *
1461 * Any received results are passed to the callback specified to
1462 * DispatchJobForTocEntry.
1463 *
1464 * This function is executed in the master process.
1465 */
1466 void
WaitForWorkers(ArchiveHandle * AH,ParallelState * pstate,WFW_WaitOption mode)1467 WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
1468 {
1469 bool do_wait = false;
1470
1471 /*
1472 * In GOT_STATUS mode, always block waiting for a message, since we can't
1473 * return till we get something. In other modes, we don't block the first
1474 * time through the loop.
1475 */
1476 if (mode == WFW_GOT_STATUS)
1477 {
1478 /* Assert that caller knows what it's doing */
1479 Assert(!IsEveryWorkerIdle(pstate));
1480 do_wait = true;
1481 }
1482
1483 for (;;)
1484 {
1485 /*
1486 * Check for status messages, even if we don't need to block. We do
1487 * not try very hard to reap all available messages, though, since
1488 * there's unlikely to be more than one.
1489 */
1490 if (ListenToWorkers(AH, pstate, do_wait))
1491 {
1492 /*
1493 * If we got a message, we are done by definition for GOT_STATUS
1494 * mode, and we can also be certain that there's at least one idle
1495 * worker. So we're done in all but ALL_IDLE mode.
1496 */
1497 if (mode != WFW_ALL_IDLE)
1498 return;
1499 }
1500
1501 /* Check whether we must wait for new status messages */
1502 switch (mode)
1503 {
1504 case WFW_NO_WAIT:
1505 return; /* never wait */
1506 case WFW_GOT_STATUS:
1507 Assert(false); /* can't get here, because we waited */
1508 break;
1509 case WFW_ONE_IDLE:
1510 if (GetIdleWorker(pstate) != NO_SLOT)
1511 return;
1512 break;
1513 case WFW_ALL_IDLE:
1514 if (IsEveryWorkerIdle(pstate))
1515 return;
1516 break;
1517 }
1518
1519 /* Loop back, and this time wait for something to happen */
1520 do_wait = true;
1521 }
1522 }
1523
1524 /*
1525 * Read one command message from the master, blocking if necessary
1526 * until one is available, and return it as a malloc'd string.
1527 * On EOF, return NULL.
1528 *
1529 * This function is executed in worker processes.
1530 */
1531 static char *
getMessageFromMaster(int pipefd[2])1532 getMessageFromMaster(int pipefd[2])
1533 {
1534 return readMessageFromPipe(pipefd[PIPE_READ]);
1535 }
1536
1537 /*
1538 * Send a status message to the master.
1539 *
1540 * This function is executed in worker processes.
1541 */
1542 static void
sendMessageToMaster(int pipefd[2],const char * str)1543 sendMessageToMaster(int pipefd[2], const char *str)
1544 {
1545 int len = strlen(str) + 1;
1546
1547 if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1548 exit_horribly(modulename,
1549 "could not write to the communication channel: %s\n",
1550 strerror(errno));
1551 }
1552
/*
 * Block until at least one descriptor in *workerset is readable, retrying
 * whenever select() is interrupted by a signal.
 *
 * maxFd is the highest descriptor present in the set.  On return,
 * *workerset holds just the ready descriptors.  Returns the number of ready
 * descriptors, or -1 on error.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
	const fd_set wanted = *workerset;
	int			nready;
	bool		interrupted;

	do
	{
		/* select() overwrites the set, so start from a fresh copy each time */
		*workerset = wanted;
		nready = select(maxFd + 1, workerset, NULL, NULL, NULL);
#ifndef WIN32
		interrupted = (nready < 0 && errno == EINTR);
#else
		interrupted = (nready == SOCKET_ERROR && WSAGetLastError() == WSAEINTR);
#endif
	} while (interrupted);

	return nready;
}
1580
1581
1582 /*
1583 * Check for messages from worker processes.
1584 *
1585 * If a message is available, return it as a malloc'd string, and put the
1586 * index of the sending worker in *worker.
1587 *
1588 * If nothing is available, wait if "do_wait" is true, else return NULL.
1589 *
1590 * If we detect EOF on any socket, we'll return NULL. It's not great that
1591 * that's hard to distinguish from the no-data-available case, but for now
1592 * our one caller is okay with that.
1593 *
1594 * This function is executed in the master process.
1595 */
1596 static char *
getMessageFromWorker(ParallelState * pstate,bool do_wait,int * worker)1597 getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1598 {
1599 int i;
1600 fd_set workerset;
1601 int maxFd = -1;
1602 struct timeval nowait = {0, 0};
1603
1604 /* construct bitmap of socket descriptors for select() */
1605 FD_ZERO(&workerset);
1606 for (i = 0; i < pstate->numWorkers; i++)
1607 {
1608 if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1609 continue;
1610 FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1611 if (pstate->parallelSlot[i].pipeRead > maxFd)
1612 maxFd = pstate->parallelSlot[i].pipeRead;
1613 }
1614
1615 if (do_wait)
1616 {
1617 i = select_loop(maxFd, &workerset);
1618 Assert(i != 0);
1619 }
1620 else
1621 {
1622 if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1623 return NULL;
1624 }
1625
1626 if (i < 0)
1627 exit_horribly(modulename, "select() failed: %s\n", strerror(errno));
1628
1629 for (i = 0; i < pstate->numWorkers; i++)
1630 {
1631 char *msg;
1632
1633 if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1634 continue;
1635 if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1636 continue;
1637
1638 /*
1639 * Read the message if any. If the socket is ready because of EOF,
1640 * we'll return NULL instead (and the socket will stay ready, so the
1641 * condition will persist).
1642 *
1643 * Note: because this is a blocking read, we'll wait if only part of
1644 * the message is available. Waiting a long time would be bad, but
1645 * since worker status messages are short and are always sent in one
1646 * operation, it shouldn't be a problem in practice.
1647 */
1648 msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1649 *worker = i;
1650 return msg;
1651 }
1652 Assert(false);
1653 return NULL;
1654 }
1655
1656 /*
1657 * Send a command message to the specified worker process.
1658 *
1659 * This function is executed in the master process.
1660 */
1661 static void
sendMessageToWorker(ParallelState * pstate,int worker,const char * str)1662 sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1663 {
1664 int len = strlen(str) + 1;
1665
1666 if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1667 {
1668 exit_horribly(modulename,
1669 "could not write to the communication channel: %s\n",
1670 strerror(errno));
1671 }
1672 }
1673
1674 /*
1675 * Read one message from the specified pipe (fd), blocking if necessary
1676 * until one is available, and return it as a malloc'd string.
1677 * On EOF, return NULL.
1678 *
1679 * A "message" on the channel is just a null-terminated string.
1680 */
1681 static char *
readMessageFromPipe(int fd)1682 readMessageFromPipe(int fd)
1683 {
1684 char *msg;
1685 int msgsize,
1686 bufsize;
1687 int ret;
1688
1689 /*
1690 * In theory, if we let piperead() read multiple bytes, it might give us
1691 * back fragments of multiple messages. (That can't actually occur, since
1692 * neither master nor workers send more than one message without waiting
1693 * for a reply, but we don't wish to assume that here.) For simplicity,
1694 * read a byte at a time until we get the terminating '\0'. This method
1695 * is a bit inefficient, but since this is only used for relatively short
1696 * command and status strings, it shouldn't matter.
1697 */
1698 bufsize = 64; /* could be any number */
1699 msg = (char *) pg_malloc(bufsize);
1700 msgsize = 0;
1701 for (;;)
1702 {
1703 Assert(msgsize < bufsize);
1704 ret = piperead(fd, msg + msgsize, 1);
1705 if (ret <= 0)
1706 break; /* error or connection closure */
1707
1708 Assert(ret == 1);
1709
1710 if (msg[msgsize] == '\0')
1711 return msg; /* collected whole message */
1712
1713 msgsize++;
1714 if (msgsize == bufsize) /* enlarge buffer if needed */
1715 {
1716 bufsize += 16; /* could be any number */
1717 msg = (char *) pg_realloc(msg, bufsize);
1718 }
1719 }
1720
1721 /* Other end has closed the connection */
1722 pg_free(msg);
1723 return NULL;
1724 }
1725
1726 #ifdef WIN32
1727
/*
 * This is a replacement version of pipe(2) for Windows which allows the pipe
 * handles to be used in select().
 *
 * The "pipe" is a connected pair of TCP sockets on the loopback interface.
 * A temporary listening socket is bound to port 0 (the actual port is read
 * back with getsockname()).  A second socket connects to it, and the
 * connection accepted on the listener completes the pair.  handles[1] is
 * set to the connecting socket and handles[0] to the accepted one.
 *
 * Reads and writes on the pipe must go through piperead()/pipewrite().
 *
 * For consistency with Unix we declare the returned handles as "int".
 * This is okay even on WIN64 because system handles are not more than
 * 32 bits wide, but we do have to do some casting.
 *
 * Returns 0 on success.  On failure, reports the Winsock error code via
 * write_msg(), closes every socket it created, leaves both handles set
 * to -1, and returns -1.
 */
static int
pgpipe(int handles[2])
{
	pgsocket	s,
				tmp_sock;
	struct sockaddr_in serv_addr;
	int			len = sizeof(serv_addr);

	/* We have to use the Unix socket invalid file descriptor value here. */
	handles[0] = handles[1] = -1;

	/*
	 * setup listen socket
	 *
	 * In every error branch below, WSAGetLastError() is read (inside the
	 * write_msg() call) before any closesocket(), so the reported code is the
	 * one from the failing call.
	 */
	if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
	{
		write_msg(modulename, "pgpipe: could not create socket: error code %d\n",
				  WSAGetLastError());
		return -1;
	}

	/* Loopback address, port 0 so the system picks a free port */
	memset((void *) &serv_addr, 0, sizeof(serv_addr));
	serv_addr.sin_family = AF_INET;
	serv_addr.sin_port = htons(0);
	serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
	{
		write_msg(modulename, "pgpipe: could not bind: error code %d\n",
				  WSAGetLastError());
		closesocket(s);
		return -1;
	}
	/* Backlog of 1: we only ever expect our own connect() below */
	if (listen(s, 1) == SOCKET_ERROR)
	{
		write_msg(modulename, "pgpipe: could not listen: error code %d\n",
				  WSAGetLastError());
		closesocket(s);
		return -1;
	}
	/* Read back the address actually bound, including the chosen port */
	if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
	{
		write_msg(modulename, "pgpipe: getsockname() failed: error code %d\n",
				  WSAGetLastError());
		closesocket(s);
		return -1;
	}

	/*
	 * setup pipe handles
	 */
	if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
	{
		write_msg(modulename, "pgpipe: could not create second socket: error code %d\n",
				  WSAGetLastError());
		closesocket(s);
		return -1;
	}
	handles[1] = (int) tmp_sock;

	if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
	{
		write_msg(modulename, "pgpipe: could not connect socket: error code %d\n",
				  WSAGetLastError());
		closesocket(handles[1]);
		handles[1] = -1;
		closesocket(s);
		return -1;
	}

	/*
	 * NOTE(review): accept() takes whichever connection is first in the
	 * listen queue.  Presumably another local process could connect to this
	 * port before our connect() does; verifying that the accepted peer is
	 * handles[1] may be worth considering -- TODO evaluate.
	 */
	if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
	{
		write_msg(modulename, "pgpipe: could not accept connection: error code %d\n",
				  WSAGetLastError());
		closesocket(handles[1]);
		handles[1] = -1;
		closesocket(s);
		return -1;
	}
	handles[0] = (int) tmp_sock;

	/* The listening socket has served its purpose */
	closesocket(s);
	return 0;
}
1820
1821 /*
1822 * Windows implementation of reading from a pipe.
1823 */
1824 static int
piperead(int s,char * buf,int len)1825 piperead(int s, char *buf, int len)
1826 {
1827 int ret = recv(s, buf, len, 0);
1828
1829 if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
1830 {
1831 /* EOF on the pipe! */
1832 ret = 0;
1833 }
1834 return ret;
1835 }
1836
1837 #endif /* WIN32 */
1838