1 /*-------------------------------------------------------------------------
2 *
3 * parallel.c
4 *
5 * Parallel support for pg_dump and pg_restore
6 *
7 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/bin/pg_dump/parallel.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16 /*
17 * Parallel operation works like this:
18 *
19 * The original, master process calls ParallelBackupStart(), which forks off
20 * the desired number of worker processes, which each enter WaitForCommands().
21 *
22 * The master process dispatches an individual work item to one of the worker
23 * processes in DispatchJobForTocEntry(). We send a command string such as
24 * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
25 * The worker process receives and decodes the command and passes it to the
26 * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
27 * which are routines of the current archive format. That routine performs
28 * the required action (dump or restore) and returns an integer status code.
29 * This is passed back to the master where we pass it to the
30 * ParallelCompletionPtr callback function that was passed to
31 * DispatchJobForTocEntry(). The callback function does state updating
32 * for the master control logic in pg_backup_archiver.c.
33 *
34 * In principle additional archive-format-specific information might be needed
35 * in commands or worker status responses, but so far that hasn't proved
36 * necessary, since workers have full copies of the ArchiveHandle/TocEntry
37 * data structures. Remember that we have forked off the workers only after
38 * we have read in the catalog. That's why our worker processes can also
39 * access the catalog information. (In the Windows case, the workers are
40 * threads in the same process. To avoid problems, they work with cloned
41 * copies of the Archive data structure; see RunWorker().)
42 *
43 * In the master process, the workerStatus field for each worker has one of
44 * the following values:
45 * WRKR_NOT_STARTED: we've not yet forked this worker
46 * WRKR_IDLE: it's waiting for a command
47 * WRKR_WORKING: it's working on a command
48 * WRKR_TERMINATED: process ended
49 * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
50 * state, and must be NULL in other states.
51 */
52
53 #include "postgres_fe.h"
54
55 #ifndef WIN32
56 #include <sys/wait.h>
57 #include <signal.h>
58 #include <unistd.h>
59 #include <fcntl.h>
60 #endif
61 #ifdef HAVE_SYS_SELECT_H
62 #include <sys/select.h>
63 #endif
64
65 #include "parallel.h"
66 #include "pg_backup_utils.h"
67
68 #include "fe_utils/string_utils.h"
69 #include "port/pg_bswap.h"
70
/* Indexes into the two-element fd array filled in by pipe(2) */
#define PIPE_READ	0
#define PIPE_WRITE	1

/* Failure result of GetIdleWorker() */
#define NO_SLOT		(-1)

/*
 * Lifecycle state of one worker, as tracked by the master.
 */
typedef enum
{
	WRKR_NOT_STARTED = 0,		/* not yet forked/spawned */
	WRKR_IDLE,					/* waiting for a command */
	WRKR_WORKING,				/* busy with a command */
	WRKR_TERMINATED				/* process/thread has ended */
} T_WorkerStatus;

/* True for a live worker, whether it is busy or waiting */
#define WORKER_IS_RUNNING(workerStatus) \
	((workerStatus) == WRKR_WORKING || (workerStatus) == WRKR_IDLE)
88
89 /*
90 * Private per-parallel-worker state (typedef for this is in parallel.h).
91 *
92 * Much of this is valid only in the master process (or, on Windows, should
93 * be touched only by the master thread). But the AH field should be touched
94 * only by workers. The pipe descriptors are valid everywhere.
95 */
96 struct ParallelSlot
97 {
98 T_WorkerStatus workerStatus; /* see enum above */
99
100 /* These fields are valid if workerStatus == WRKR_WORKING: */
101 ParallelCompletionPtr callback; /* function to call on completion */
102 void *callback_data; /* passthrough data for it */
103
104 ArchiveHandle *AH; /* Archive data worker is using */
105
106 int pipeRead; /* master's end of the pipes */
107 int pipeWrite;
108 int pipeRevRead; /* child's end of the pipes */
109 int pipeRevWrite;
110
111 /* Child process/thread identity info: */
112 #ifdef WIN32
113 uintptr_t hThread;
114 unsigned int threadId;
115 #else
116 pid_t pid;
117 #endif
118 };
119
#ifdef WIN32

/*
 * _beginthreadex() hands its thread function a single pointer, so the
 * arguments a new worker thread needs are bundled in this struct.
 */
typedef struct
{
	ArchiveHandle *AH;			/* master's archive handle */
	ParallelSlot *slot;			/* slot assigned to this worker */
} WorkerInfo;

/* On Windows the "pipes" are socket pairs, accessed via recv/send */
static int	pgpipe(int handles[2]);
static int	piperead(int s, char *buf, int len);
#define pipewrite(a,b,c)	send(a,b,c,0)

#else							/* !WIN32 */

/* Elsewhere, plain pipes with read/write are enough */
#define pgpipe(a)			pipe(a)
#define piperead(a,b,c)		read(a,b,c)
#define pipewrite(a,b,c)	write(a,b,c)

#endif							/* WIN32 */
145
146 /*
147 * State info for archive_close_connection() shutdown callback.
148 */
149 typedef struct ShutdownInformation
150 {
151 ParallelState *pstate;
152 Archive *AHX;
153 } ShutdownInformation;
154
155 static ShutdownInformation shutdown_info;
156
157 /*
158 * State info for signal handling.
159 * We assume signal_info initializes to zeroes.
160 *
161 * On Unix, myAH is the master DB connection in the master process, and the
162 * worker's own connection in worker processes. On Windows, we have only one
163 * instance of signal_info, so myAH is the master connection and the worker
164 * connections must be dug out of pstate->parallelSlot[].
165 */
166 typedef struct DumpSignalInformation
167 {
168 ArchiveHandle *myAH; /* database connection to issue cancel for */
169 ParallelState *pstate; /* parallel state, if any */
170 bool handler_set; /* signal handler set up in this process? */
171 #ifndef WIN32
172 bool am_worker; /* am I a worker process? */
173 #endif
174 } DumpSignalInformation;
175
176 static volatile DumpSignalInformation signal_info;
177
178 #ifdef WIN32
179 static CRITICAL_SECTION signal_info_lock;
180 #endif
181
/*
 * Put a plain string on stderr using nothing but write(2), which keeps this
 * usable inside a signal handler.  Whatever write() returns is deliberately
 * discarded; the throwaway variable keeps compilers that insist on using the
 * result quiet.
 */
#define write_stderr(str) \
	do { \
		const char *wmsg_ = (str); \
		int			wrc_; \
		wrc_ = write(fileno(stderr), wmsg_, strlen(wmsg_)); \
		(void) wrc_; \
	} while (0)
194
195
196 #ifdef WIN32
197 /* file-scope variables */
198 static DWORD tls_index;
199
200 /* globally visible variables (needed by exit_nicely) */
201 bool parallel_init_done = false;
202 DWORD mainThreadId;
203 #endif /* WIN32 */
204
205 static const char *modulename = gettext_noop("parallel archiver");
206
207 /* Local function prototypes */
208 static ParallelSlot *GetMyPSlot(ParallelState *pstate);
209 static void archive_close_connection(int code, void *arg);
210 static void ShutdownWorkersHard(ParallelState *pstate);
211 static void WaitForTerminatingWorkers(ParallelState *pstate);
212 static void setup_cancel_handler(void);
213 static void set_cancel_pstate(ParallelState *pstate);
214 static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
215 static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
216 static int GetIdleWorker(ParallelState *pstate);
217 static bool HasEveryWorkerTerminated(ParallelState *pstate);
218 static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
219 static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
220 static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
221 bool do_wait);
222 static char *getMessageFromMaster(int pipefd[2]);
223 static void sendMessageToMaster(int pipefd[2], const char *str);
224 static int select_loop(int maxFd, fd_set *workerset);
225 static char *getMessageFromWorker(ParallelState *pstate,
226 bool do_wait, int *worker);
227 static void sendMessageToWorker(ParallelState *pstate,
228 int worker, const char *str);
229 static char *readMessageFromPipe(int fd);
230
231 #define messageStartsWith(msg, prefix) \
232 (strncmp(msg, prefix, strlen(prefix)) == 0)
233 #define messageEquals(msg, pattern) \
234 (strcmp(msg, pattern) == 0)
235
236
/*
 * One-time setup for parallel dump/restore support.  Call this early in
 * program startup; it is currently called whether or not parallelism will
 * be used.
 *
 * Only Windows has anything to do here: allocate a TLS index, remember the
 * main thread's id and start up Winsock.  Once that has succeeded, further
 * calls do nothing.
 */
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
	WSADATA		wsaData;
	int			wsaerr;

	if (parallel_init_done)
		return;

	/* Bookkeeping for threaded operation */
	tls_index = TlsAlloc();
	mainThreadId = GetCurrentThreadId();

	/* Winsock has to be up before our socket-based "pipes" can be used */
	wsaerr = WSAStartup(MAKEWORD(2, 2), &wsaData);
	if (wsaerr != 0)
	{
		fprintf(stderr, _("%s: WSAStartup failed: %d\n"), progname, wsaerr);
		exit_nicely(1);
	}

	parallel_init_done = true;
#endif
}
267
268 /*
269 * Find the ParallelSlot for the current worker process or thread.
270 *
271 * Returns NULL if no matching slot is found (this implies we're the master).
272 */
273 static ParallelSlot *
GetMyPSlot(ParallelState * pstate)274 GetMyPSlot(ParallelState *pstate)
275 {
276 int i;
277
278 for (i = 0; i < pstate->numWorkers; i++)
279 {
280 #ifdef WIN32
281 if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
282 #else
283 if (pstate->parallelSlot[i].pid == getpid())
284 #endif
285 return &(pstate->parallelSlot[i]);
286 }
287
288 return NULL;
289 }
290
/*
 * Windows-only replacement for getLocalPQExpBuffer() that keeps one buffer
 * per thread in thread-local storage.
 *
 * It is not reentrant: each call wipes and hands back the buffer returned
 * by the previous call in the same thread.  In exchange we use one buffer
 * per thread instead of one per fmtId/fmtQualifiedId call.
 */
#ifdef WIN32
static PQExpBuffer
getThreadLocalPQExpBuffer(void)
{
	/*
	 * Mixing a static variable with Tls misbehaves, so the static is used
	 * only until parallel_init_done is set; from then on the buffer pointer
	 * lives in the TLS slot.  TlsGetValue() is relied on to return 0 for a
	 * slot this thread has never set.
	 */
	static PQExpBuffer s_id_return = NULL;
	PQExpBuffer buf;

	buf = parallel_init_done ? (PQExpBuffer) TlsGetValue(tls_index) : s_id_return;

	if (buf == NULL)
	{
		/* first call (per thread, once TLS is in use): make a new buffer */
		buf = createPQExpBuffer();
		if (parallel_init_done)
			TlsSetValue(tls_index, buf);
		else
			s_id_return = buf;
	}
	else
	{
		/* buffer already exists: just discard its old contents */
		resetPQExpBuffer(buf);
	}

	return buf;
}
#endif							/* WIN32 */
332
333 /*
334 * pg_dump and pg_restore call this to register the cleanup handler
335 * as soon as they've created the ArchiveHandle.
336 */
337 void
on_exit_close_archive(Archive * AHX)338 on_exit_close_archive(Archive *AHX)
339 {
340 shutdown_info.AHX = AHX;
341 on_exit_nicely(archive_close_connection, &shutdown_info);
342 }
343
344 /*
345 * on_exit_nicely handler for shutting down database connections and
346 * worker processes cleanly.
347 */
348 static void
archive_close_connection(int code,void * arg)349 archive_close_connection(int code, void *arg)
350 {
351 ShutdownInformation *si = (ShutdownInformation *) arg;
352
353 if (si->pstate)
354 {
355 /* In parallel mode, must figure out who we are */
356 ParallelSlot *slot = GetMyPSlot(si->pstate);
357
358 if (!slot)
359 {
360 /*
361 * We're the master. Forcibly shut down workers, then close our
362 * own database connection, if any.
363 */
364 ShutdownWorkersHard(si->pstate);
365
366 if (si->AHX)
367 DisconnectDatabase(si->AHX);
368 }
369 else
370 {
371 /*
372 * We're a worker. Shut down our own DB connection if any. On
373 * Windows, we also have to close our communication sockets, to
374 * emulate what will happen on Unix when the worker process exits.
375 * (Without this, if this is a premature exit, the master would
376 * fail to detect it because there would be no EOF condition on
377 * the other end of the pipe.)
378 */
379 if (slot->AH)
380 DisconnectDatabase(&(slot->AH->public));
381
382 #ifdef WIN32
383 closesocket(slot->pipeRevRead);
384 closesocket(slot->pipeRevWrite);
385 #endif
386 }
387 }
388 else
389 {
390 /* Non-parallel operation: just kill the master DB connection */
391 if (si->AHX)
392 DisconnectDatabase(si->AHX);
393 }
394 }
395
396 /*
397 * Forcibly shut down any remaining workers, waiting for them to finish.
398 *
399 * Note that we don't expect to come here during normal exit (the workers
400 * should be long gone, and the ParallelState too). We're only here in an
401 * exit_horribly() situation, so intervening to cancel active commands is
402 * appropriate.
403 */
404 static void
ShutdownWorkersHard(ParallelState * pstate)405 ShutdownWorkersHard(ParallelState *pstate)
406 {
407 int i;
408
409 /*
410 * Close our write end of the sockets so that any workers waiting for
411 * commands know they can exit. (Note: some of the pipeWrite fields might
412 * still be zero, if we failed to initialize all the workers. Hence, just
413 * ignore errors here.)
414 */
415 for (i = 0; i < pstate->numWorkers; i++)
416 closesocket(pstate->parallelSlot[i].pipeWrite);
417
418 /*
419 * Force early termination of any commands currently in progress.
420 */
421 #ifndef WIN32
422 /* On non-Windows, send SIGTERM to each worker process. */
423 for (i = 0; i < pstate->numWorkers; i++)
424 {
425 pid_t pid = pstate->parallelSlot[i].pid;
426
427 if (pid != 0)
428 kill(pid, SIGTERM);
429 }
430 #else
431
432 /*
433 * On Windows, send query cancels directly to the workers' backends. Use
434 * a critical section to ensure worker threads don't change state.
435 */
436 EnterCriticalSection(&signal_info_lock);
437 for (i = 0; i < pstate->numWorkers; i++)
438 {
439 ArchiveHandle *AH = pstate->parallelSlot[i].AH;
440 char errbuf[1];
441
442 if (AH != NULL && AH->connCancel != NULL)
443 (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
444 }
445 LeaveCriticalSection(&signal_info_lock);
446 #endif
447
448 /* Now wait for them to terminate. */
449 WaitForTerminatingWorkers(pstate);
450 }
451
452 /*
453 * Wait for all workers to terminate.
454 */
455 static void
WaitForTerminatingWorkers(ParallelState * pstate)456 WaitForTerminatingWorkers(ParallelState *pstate)
457 {
458 while (!HasEveryWorkerTerminated(pstate))
459 {
460 ParallelSlot *slot = NULL;
461 int j;
462
463 #ifndef WIN32
464 /* On non-Windows, use wait() to wait for next worker to end */
465 int status;
466 pid_t pid = wait(&status);
467
468 /* Find dead worker's slot, and clear the PID field */
469 for (j = 0; j < pstate->numWorkers; j++)
470 {
471 slot = &(pstate->parallelSlot[j]);
472 if (slot->pid == pid)
473 {
474 slot->pid = 0;
475 break;
476 }
477 }
478 #else /* WIN32 */
479 /* On Windows, we must use WaitForMultipleObjects() */
480 HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
481 int nrun = 0;
482 DWORD ret;
483 uintptr_t hThread;
484
485 for (j = 0; j < pstate->numWorkers; j++)
486 {
487 if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
488 {
489 lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
490 nrun++;
491 }
492 }
493 ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
494 Assert(ret != WAIT_FAILED);
495 hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
496 free(lpHandles);
497
498 /* Find dead worker's slot, and clear the hThread field */
499 for (j = 0; j < pstate->numWorkers; j++)
500 {
501 slot = &(pstate->parallelSlot[j]);
502 if (slot->hThread == hThread)
503 {
504 /* For cleanliness, close handles for dead threads */
505 CloseHandle((HANDLE) slot->hThread);
506 slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
507 break;
508 }
509 }
510 #endif /* WIN32 */
511
512 /* On all platforms, update workerStatus and te[] as well */
513 Assert(j < pstate->numWorkers);
514 slot->workerStatus = WRKR_TERMINATED;
515 pstate->te[j] = NULL;
516 }
517 }
518
519
520 /*
521 * Code for responding to cancel interrupts (SIGINT, control-C, etc)
522 *
523 * This doesn't quite belong in this module, but it needs access to the
524 * ParallelState data, so there's not really a better place either.
525 *
526 * When we get a cancel interrupt, we could just die, but in pg_restore that
527 * could leave a SQL command (e.g., CREATE INDEX on a large table) running
528 * for a long time. Instead, we try to send a cancel request and then die.
529 * pg_dump probably doesn't really need this, but we might as well use it
530 * there too. Note that sending the cancel directly from the signal handler
531 * is safe because PQcancel() is written to make it so.
532 *
533 * In parallel operation on Unix, each process is responsible for canceling
534 * its own connection (this must be so because nobody else has access to it).
535 * Furthermore, the master process should attempt to forward its signal to
536 * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't
537 * needed because typing control-C at the console would deliver SIGINT to
538 * every member of the terminal process group --- but in other scenarios it
539 * might be that only the master gets signaled.
540 *
541 * On Windows, the cancel handler runs in a separate thread, because that's
542 * how SetConsoleCtrlHandler works. We make it stop worker threads, send
543 * cancels on all active connections, and then return FALSE, which will allow
544 * the process to die. For safety's sake, we use a critical section to
545 * protect the PGcancel structures against being changed while the signal
546 * thread runs.
547 */
548
549 #ifndef WIN32
550
551 /*
552 * Signal handler (Unix only)
553 */
554 static void
sigTermHandler(SIGNAL_ARGS)555 sigTermHandler(SIGNAL_ARGS)
556 {
557 int i;
558 char errbuf[1];
559
560 /*
561 * Some platforms allow delivery of new signals to interrupt an active
562 * signal handler. That could muck up our attempt to send PQcancel, so
563 * disable the signals that setup_cancel_handler enabled.
564 */
565 pqsignal(SIGINT, SIG_IGN);
566 pqsignal(SIGTERM, SIG_IGN);
567 pqsignal(SIGQUIT, SIG_IGN);
568
569 /*
570 * If we're in the master, forward signal to all workers. (It seems best
571 * to do this before PQcancel; killing the master transaction will result
572 * in invalid-snapshot errors from active workers, which maybe we can
573 * quiet by killing workers first.) Ignore any errors.
574 */
575 if (signal_info.pstate != NULL)
576 {
577 for (i = 0; i < signal_info.pstate->numWorkers; i++)
578 {
579 pid_t pid = signal_info.pstate->parallelSlot[i].pid;
580
581 if (pid != 0)
582 kill(pid, SIGTERM);
583 }
584 }
585
586 /*
587 * Send QueryCancel if we have a connection to send to. Ignore errors,
588 * there's not much we can do about them anyway.
589 */
590 if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
591 (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
592
593 /*
594 * Report we're quitting, using nothing more complicated than write(2).
595 * When in parallel operation, only the master process should do this.
596 */
597 if (!signal_info.am_worker)
598 {
599 if (progname)
600 {
601 write_stderr(progname);
602 write_stderr(": ");
603 }
604 write_stderr("terminated by user\n");
605 }
606
607 /*
608 * And die, using _exit() not exit() because the latter will invoke atexit
609 * handlers that can fail if we interrupted related code.
610 */
611 _exit(1);
612 }
613
614 /*
615 * Enable cancel interrupt handler, if not already done.
616 */
617 static void
setup_cancel_handler(void)618 setup_cancel_handler(void)
619 {
620 /*
621 * When forking, signal_info.handler_set will propagate into the new
622 * process, but that's fine because the signal handler state does too.
623 */
624 if (!signal_info.handler_set)
625 {
626 signal_info.handler_set = true;
627
628 pqsignal(SIGINT, sigTermHandler);
629 pqsignal(SIGTERM, sigTermHandler);
630 pqsignal(SIGQUIT, sigTermHandler);
631 }
632 }
633
634 #else /* WIN32 */
635
636 /*
637 * Console interrupt handler --- runs in a newly-started thread.
638 *
639 * After stopping other threads and sending cancel requests on all open
640 * connections, we return FALSE which will allow the default ExitProcess()
641 * action to be taken.
642 */
643 static BOOL WINAPI
consoleHandler(DWORD dwCtrlType)644 consoleHandler(DWORD dwCtrlType)
645 {
646 int i;
647 char errbuf[1];
648
649 if (dwCtrlType == CTRL_C_EVENT ||
650 dwCtrlType == CTRL_BREAK_EVENT)
651 {
652 /* Critical section prevents changing data we look at here */
653 EnterCriticalSection(&signal_info_lock);
654
655 /*
656 * If in parallel mode, stop worker threads and send QueryCancel to
657 * their connected backends. The main point of stopping the worker
658 * threads is to keep them from reporting the query cancels as errors,
659 * which would clutter the user's screen. We needn't stop the master
660 * thread since it won't be doing much anyway. Do this before
661 * canceling the main transaction, else we might get invalid-snapshot
662 * errors reported before we can stop the workers. Ignore errors,
663 * there's not much we can do about them anyway.
664 */
665 if (signal_info.pstate != NULL)
666 {
667 for (i = 0; i < signal_info.pstate->numWorkers; i++)
668 {
669 ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
670 ArchiveHandle *AH = slot->AH;
671 HANDLE hThread = (HANDLE) slot->hThread;
672
673 /*
674 * Using TerminateThread here may leave some resources leaked,
675 * but it doesn't matter since we're about to end the whole
676 * process.
677 */
678 if (hThread != INVALID_HANDLE_VALUE)
679 TerminateThread(hThread, 0);
680
681 if (AH != NULL && AH->connCancel != NULL)
682 (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
683 }
684 }
685
686 /*
687 * Send QueryCancel to master connection, if enabled. Ignore errors,
688 * there's not much we can do about them anyway.
689 */
690 if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
691 (void) PQcancel(signal_info.myAH->connCancel,
692 errbuf, sizeof(errbuf));
693
694 LeaveCriticalSection(&signal_info_lock);
695
696 /*
697 * Report we're quitting, using nothing more complicated than
698 * write(2). (We might be able to get away with using write_msg()
699 * here, but since we terminated other threads uncleanly above, it
700 * seems better to assume as little as possible.)
701 */
702 if (progname)
703 {
704 write_stderr(progname);
705 write_stderr(": ");
706 }
707 write_stderr("terminated by user\n");
708 }
709
710 /* Always return FALSE to allow signal handling to continue */
711 return FALSE;
712 }
713
714 /*
715 * Enable cancel interrupt handler, if not already done.
716 */
717 static void
setup_cancel_handler(void)718 setup_cancel_handler(void)
719 {
720 if (!signal_info.handler_set)
721 {
722 signal_info.handler_set = true;
723
724 InitializeCriticalSection(&signal_info_lock);
725
726 SetConsoleCtrlHandler(consoleHandler, TRUE);
727 }
728 }
729
730 #endif /* WIN32 */
731
732
733 /*
734 * set_archive_cancel_info
735 *
736 * Fill AH->connCancel with cancellation info for the specified database
737 * connection; or clear it if conn is NULL.
738 */
739 void
set_archive_cancel_info(ArchiveHandle * AH,PGconn * conn)740 set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
741 {
742 PGcancel *oldConnCancel;
743
744 /*
745 * Activate the interrupt handler if we didn't yet in this process. On
746 * Windows, this also initializes signal_info_lock; therefore it's
747 * important that this happen at least once before we fork off any
748 * threads.
749 */
750 setup_cancel_handler();
751
752 /*
753 * On Unix, we assume that storing a pointer value is atomic with respect
754 * to any possible signal interrupt. On Windows, use a critical section.
755 */
756
757 #ifdef WIN32
758 EnterCriticalSection(&signal_info_lock);
759 #endif
760
761 /* Free the old one if we have one */
762 oldConnCancel = AH->connCancel;
763 /* be sure interrupt handler doesn't use pointer while freeing */
764 AH->connCancel = NULL;
765
766 if (oldConnCancel != NULL)
767 PQfreeCancel(oldConnCancel);
768
769 /* Set the new one if specified */
770 if (conn)
771 AH->connCancel = PQgetCancel(conn);
772
773 /*
774 * On Unix, there's only ever one active ArchiveHandle per process, so we
775 * can just set signal_info.myAH unconditionally. On Windows, do that
776 * only in the main thread; worker threads have to make sure their
777 * ArchiveHandle appears in the pstate data, which is dealt with in
778 * RunWorker().
779 */
780 #ifndef WIN32
781 signal_info.myAH = AH;
782 #else
783 if (mainThreadId == GetCurrentThreadId())
784 signal_info.myAH = AH;
785 #endif
786
787 #ifdef WIN32
788 LeaveCriticalSection(&signal_info_lock);
789 #endif
790 }
791
792 /*
793 * set_cancel_pstate
794 *
795 * Set signal_info.pstate to point to the specified ParallelState, if any.
796 * We need this mainly to have an interlock against Windows signal thread.
797 */
798 static void
set_cancel_pstate(ParallelState * pstate)799 set_cancel_pstate(ParallelState *pstate)
800 {
801 #ifdef WIN32
802 EnterCriticalSection(&signal_info_lock);
803 #endif
804
805 signal_info.pstate = pstate;
806
807 #ifdef WIN32
808 LeaveCriticalSection(&signal_info_lock);
809 #endif
810 }
811
812 /*
813 * set_cancel_slot_archive
814 *
815 * Set ParallelSlot's AH field to point to the specified archive, if any.
816 * We need this mainly to have an interlock against Windows signal thread.
817 */
818 static void
set_cancel_slot_archive(ParallelSlot * slot,ArchiveHandle * AH)819 set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
820 {
821 #ifdef WIN32
822 EnterCriticalSection(&signal_info_lock);
823 #endif
824
825 slot->AH = AH;
826
827 #ifdef WIN32
828 LeaveCriticalSection(&signal_info_lock);
829 #endif
830 }
831
832
833 /*
834 * This function is called by both Unix and Windows variants to set up
835 * and run a worker process. Caller should exit the process (or thread)
836 * upon return.
837 */
838 static void
RunWorker(ArchiveHandle * AH,ParallelSlot * slot)839 RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
840 {
841 int pipefd[2];
842
843 /* fetch child ends of pipes */
844 pipefd[PIPE_READ] = slot->pipeRevRead;
845 pipefd[PIPE_WRITE] = slot->pipeRevWrite;
846
847 /*
848 * Clone the archive so that we have our own state to work with, and in
849 * particular our own database connection.
850 *
851 * We clone on Unix as well as Windows, even though technically we don't
852 * need to because fork() gives us a copy in our own address space
853 * already. But CloneArchive resets the state information and also clones
854 * the database connection which both seem kinda helpful.
855 */
856 AH = CloneArchive(AH);
857
858 /* Remember cloned archive where signal handler can find it */
859 set_cancel_slot_archive(slot, AH);
860
861 /*
862 * Call the setup worker function that's defined in the ArchiveHandle.
863 */
864 (AH->SetupWorkerPtr) ((Archive *) AH);
865
866 /*
867 * Execute commands until done.
868 */
869 WaitForCommands(AH, pipefd);
870
871 /*
872 * Disconnect from database and clean up.
873 */
874 set_cancel_slot_archive(slot, NULL);
875 DisconnectDatabase(&(AH->public));
876 DeCloneArchive(AH);
877 }
878
/*
 * Windows thread entry point passed to _beginthreadex().  It unpacks and
 * frees the WorkerInfo, runs the worker and then ends the thread.
 */
#ifdef WIN32
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
	ArchiveHandle *masterAH = wi->AH;
	ParallelSlot *myslot = wi->slot;

	/* The argument bundle has served its purpose */
	free(wi);

	RunWorker(masterAH, myslot);

	_endthreadex(0);
	return 0;
}
#endif							/* WIN32 */
900
901 /*
902 * This function starts a parallel dump or restore by spawning off the worker
903 * processes. For Windows, it creates a number of threads; on Unix the
904 * workers are created with fork().
905 */
906 ParallelState *
ParallelBackupStart(ArchiveHandle * AH)907 ParallelBackupStart(ArchiveHandle *AH)
908 {
909 ParallelState *pstate;
910 int i;
911
912 Assert(AH->public.numWorkers > 0);
913
914 pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
915
916 pstate->numWorkers = AH->public.numWorkers;
917 pstate->te = NULL;
918 pstate->parallelSlot = NULL;
919
920 if (AH->public.numWorkers == 1)
921 return pstate;
922
923 /* Create status arrays, being sure to initialize all fields to 0 */
924 pstate->te = (TocEntry **)
925 pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
926 pstate->parallelSlot = (ParallelSlot *)
927 pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
928
929 #ifdef WIN32
930 /* Make fmtId() and fmtQualifiedId() use thread-local storage */
931 getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
932 #endif
933
934 /*
935 * Set the pstate in shutdown_info, to tell the exit handler that it must
936 * clean up workers as well as the main database connection. But we don't
937 * set this in signal_info yet, because we don't want child processes to
938 * inherit non-NULL signal_info.pstate.
939 */
940 shutdown_info.pstate = pstate;
941
942 /*
943 * Temporarily disable query cancellation on the master connection. This
944 * ensures that child processes won't inherit valid AH->connCancel
945 * settings and thus won't try to issue cancels against the master's
946 * connection. No harm is done if we fail while it's disabled, because
947 * the master connection is idle at this point anyway.
948 */
949 set_archive_cancel_info(AH, NULL);
950
951 /* Ensure stdio state is quiesced before forking */
952 fflush(NULL);
953
954 /* Create desired number of workers */
955 for (i = 0; i < pstate->numWorkers; i++)
956 {
957 #ifdef WIN32
958 WorkerInfo *wi;
959 uintptr_t handle;
960 #else
961 pid_t pid;
962 #endif
963 ParallelSlot *slot = &(pstate->parallelSlot[i]);
964 int pipeMW[2],
965 pipeWM[2];
966
967 /* Create communication pipes for this worker */
968 if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
969 exit_horribly(modulename,
970 "could not create communication channels: %s\n",
971 strerror(errno));
972
973 /* master's ends of the pipes */
974 slot->pipeRead = pipeWM[PIPE_READ];
975 slot->pipeWrite = pipeMW[PIPE_WRITE];
976 /* child's ends of the pipes */
977 slot->pipeRevRead = pipeMW[PIPE_READ];
978 slot->pipeRevWrite = pipeWM[PIPE_WRITE];
979
980 #ifdef WIN32
981 /* Create transient structure to pass args to worker function */
982 wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
983
984 wi->AH = AH;
985 wi->slot = slot;
986
987 handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
988 wi, 0, &(slot->threadId));
989 slot->hThread = handle;
990 slot->workerStatus = WRKR_IDLE;
991 #else /* !WIN32 */
992 pid = fork();
993 if (pid == 0)
994 {
995 /* we are the worker */
996 int j;
997
998 /* this is needed for GetMyPSlot() */
999 slot->pid = getpid();
1000
1001 /* instruct signal handler that we're in a worker now */
1002 signal_info.am_worker = true;
1003
1004 /* close read end of Worker -> Master */
1005 closesocket(pipeWM[PIPE_READ]);
1006 /* close write end of Master -> Worker */
1007 closesocket(pipeMW[PIPE_WRITE]);
1008
1009 /*
1010 * Close all inherited fds for communication of the master with
1011 * previously-forked workers.
1012 */
1013 for (j = 0; j < i; j++)
1014 {
1015 closesocket(pstate->parallelSlot[j].pipeRead);
1016 closesocket(pstate->parallelSlot[j].pipeWrite);
1017 }
1018
1019 /* Run the worker ... */
1020 RunWorker(AH, slot);
1021
1022 /* We can just exit(0) when done */
1023 exit(0);
1024 }
1025 else if (pid < 0)
1026 {
1027 /* fork failed */
1028 exit_horribly(modulename,
1029 "could not create worker process: %s\n",
1030 strerror(errno));
1031 }
1032
1033 /* In Master after successful fork */
1034 slot->pid = pid;
1035 slot->workerStatus = WRKR_IDLE;
1036
1037 /* close read end of Master -> Worker */
1038 closesocket(pipeMW[PIPE_READ]);
1039 /* close write end of Worker -> Master */
1040 closesocket(pipeWM[PIPE_WRITE]);
1041 #endif /* WIN32 */
1042 }
1043
1044 /*
1045 * Having forked off the workers, disable SIGPIPE so that master isn't
1046 * killed if it tries to send a command to a dead worker. We don't want
1047 * the workers to inherit this setting, though.
1048 */
1049 #ifndef WIN32
1050 pqsignal(SIGPIPE, SIG_IGN);
1051 #endif
1052
1053 /*
1054 * Re-establish query cancellation on the master connection.
1055 */
1056 set_archive_cancel_info(AH, AH->connection);
1057
1058 /*
1059 * Tell the cancel signal handler to forward signals to worker processes,
1060 * too. (As with query cancel, we did not need this earlier because the
1061 * workers have not yet been given anything to do; if we die before this
1062 * point, any already-started workers will see EOF and quit promptly.)
1063 */
1064 set_cancel_pstate(pstate);
1065
1066 return pstate;
1067 }
1068
1069 /*
1070 * Close down a parallel dump or restore.
1071 */
1072 void
ParallelBackupEnd(ArchiveHandle * AH,ParallelState * pstate)1073 ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
1074 {
1075 int i;
1076
1077 /* No work if non-parallel */
1078 if (pstate->numWorkers == 1)
1079 return;
1080
1081 /* There should not be any unfinished jobs */
1082 Assert(IsEveryWorkerIdle(pstate));
1083
1084 /* Close the sockets so that the workers know they can exit */
1085 for (i = 0; i < pstate->numWorkers; i++)
1086 {
1087 closesocket(pstate->parallelSlot[i].pipeRead);
1088 closesocket(pstate->parallelSlot[i].pipeWrite);
1089 }
1090
1091 /* Wait for them to exit */
1092 WaitForTerminatingWorkers(pstate);
1093
1094 /*
1095 * Unlink pstate from shutdown_info, so the exit handler will not try to
1096 * use it; and likewise unlink from signal_info.
1097 */
1098 shutdown_info.pstate = NULL;
1099 set_cancel_pstate(NULL);
1100
1101 /* Release state (mere neatnik-ism, since we're about to terminate) */
1102 free(pstate->te);
1103 free(pstate->parallelSlot);
1104 free(pstate);
1105 }
1106
1107 /*
1108 * These next four functions handle construction and parsing of the command
1109 * strings and response strings for parallel workers.
1110 *
1111 * Currently, these can be the same regardless of which archive format we are
1112 * processing. In future, we might want to let format modules override these
1113 * functions to add format-specific data to a command or response.
1114 */
1115
1116 /*
1117 * buildWorkerCommand: format a command string to send to a worker.
1118 *
1119 * The string is built in the caller-supplied buffer of size buflen.
1120 */
1121 static void
buildWorkerCommand(ArchiveHandle * AH,TocEntry * te,T_Action act,char * buf,int buflen)1122 buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
1123 char *buf, int buflen)
1124 {
1125 if (act == ACT_DUMP)
1126 snprintf(buf, buflen, "DUMP %d", te->dumpId);
1127 else if (act == ACT_RESTORE)
1128 snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1129 else
1130 Assert(false);
1131 }
1132
1133 /*
1134 * parseWorkerCommand: interpret a command string in a worker.
1135 */
1136 static void
parseWorkerCommand(ArchiveHandle * AH,TocEntry ** te,T_Action * act,const char * msg)1137 parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
1138 const char *msg)
1139 {
1140 DumpId dumpId;
1141 int nBytes;
1142
1143 if (messageStartsWith(msg, "DUMP "))
1144 {
1145 *act = ACT_DUMP;
1146 sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1147 Assert(nBytes == strlen(msg));
1148 *te = getTocEntryByDumpId(AH, dumpId);
1149 Assert(*te != NULL);
1150 }
1151 else if (messageStartsWith(msg, "RESTORE "))
1152 {
1153 *act = ACT_RESTORE;
1154 sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1155 Assert(nBytes == strlen(msg));
1156 *te = getTocEntryByDumpId(AH, dumpId);
1157 Assert(*te != NULL);
1158 }
1159 else
1160 exit_horribly(modulename,
1161 "unrecognized command received from master: \"%s\"\n",
1162 msg);
1163 }
1164
1165 /*
1166 * buildWorkerResponse: format a response string to send to the master.
1167 *
1168 * The string is built in the caller-supplied buffer of size buflen.
1169 */
1170 static void
buildWorkerResponse(ArchiveHandle * AH,TocEntry * te,T_Action act,int status,char * buf,int buflen)1171 buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
1172 char *buf, int buflen)
1173 {
1174 snprintf(buf, buflen, "OK %d %d %d",
1175 te->dumpId,
1176 status,
1177 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1178 }
1179
1180 /*
1181 * parseWorkerResponse: parse the status message returned by a worker.
1182 *
1183 * Returns the integer status code, and may update fields of AH and/or te.
1184 */
1185 static int
parseWorkerResponse(ArchiveHandle * AH,TocEntry * te,const char * msg)1186 parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
1187 const char *msg)
1188 {
1189 DumpId dumpId;
1190 int nBytes,
1191 n_errors;
1192 int status = 0;
1193
1194 if (messageStartsWith(msg, "OK "))
1195 {
1196 sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1197
1198 Assert(dumpId == te->dumpId);
1199 Assert(nBytes == strlen(msg));
1200
1201 AH->public.n_errors += n_errors;
1202 }
1203 else
1204 exit_horribly(modulename,
1205 "invalid message received from worker: \"%s\"\n",
1206 msg);
1207
1208 return status;
1209 }
1210
1211 /*
1212 * Dispatch a job to some free worker.
1213 *
1214 * te is the TocEntry to be processed, act is the action to be taken on it.
1215 * callback is the function to call on completion of the job.
1216 *
1217 * If no worker is currently available, this will block, and previously
1218 * registered callback functions may be called.
1219 */
1220 void
DispatchJobForTocEntry(ArchiveHandle * AH,ParallelState * pstate,TocEntry * te,T_Action act,ParallelCompletionPtr callback,void * callback_data)1221 DispatchJobForTocEntry(ArchiveHandle *AH,
1222 ParallelState *pstate,
1223 TocEntry *te,
1224 T_Action act,
1225 ParallelCompletionPtr callback,
1226 void *callback_data)
1227 {
1228 int worker;
1229 char buf[256];
1230
1231 /* Get a worker, waiting if none are idle */
1232 while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1233 WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1234
1235 /* Construct and send command string */
1236 buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1237
1238 sendMessageToWorker(pstate, worker, buf);
1239
1240 /* Remember worker is busy, and which TocEntry it's working on */
1241 pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1242 pstate->parallelSlot[worker].callback = callback;
1243 pstate->parallelSlot[worker].callback_data = callback_data;
1244 pstate->te[worker] = te;
1245 }
1246
1247 /*
1248 * Find an idle worker and return its slot number.
1249 * Return NO_SLOT if none are idle.
1250 */
1251 static int
GetIdleWorker(ParallelState * pstate)1252 GetIdleWorker(ParallelState *pstate)
1253 {
1254 int i;
1255
1256 for (i = 0; i < pstate->numWorkers; i++)
1257 {
1258 if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1259 return i;
1260 }
1261 return NO_SLOT;
1262 }
1263
1264 /*
1265 * Return true iff no worker is running.
1266 */
1267 static bool
HasEveryWorkerTerminated(ParallelState * pstate)1268 HasEveryWorkerTerminated(ParallelState *pstate)
1269 {
1270 int i;
1271
1272 for (i = 0; i < pstate->numWorkers; i++)
1273 {
1274 if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1275 return false;
1276 }
1277 return true;
1278 }
1279
1280 /*
1281 * Return true iff every worker is in the WRKR_IDLE state.
1282 */
1283 bool
IsEveryWorkerIdle(ParallelState * pstate)1284 IsEveryWorkerIdle(ParallelState *pstate)
1285 {
1286 int i;
1287
1288 for (i = 0; i < pstate->numWorkers; i++)
1289 {
1290 if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1291 return false;
1292 }
1293 return true;
1294 }
1295
1296 /*
1297 * Acquire lock on a table to be dumped by a worker process.
1298 *
1299 * The master process is already holding an ACCESS SHARE lock. Ordinarily
1300 * it's no problem for a worker to get one too, but if anything else besides
1301 * pg_dump is running, there's a possible deadlock:
1302 *
1303 * 1) Master dumps the schema and locks all tables in ACCESS SHARE mode.
1304 * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1305 * because the master holds a conflicting ACCESS SHARE lock).
1306 * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1307 * The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1308 * 4) Now we have a deadlock, since the master is effectively waiting for
1309 * the worker. The server cannot detect that, however.
1310 *
1311 * To prevent an infinite wait, prior to touching a table in a worker, request
1312 * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock,
1313 * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1314 * so we have a deadlock. We must fail the backup in that case.
1315 */
1316 static void
lockTableForWorker(ArchiveHandle * AH,TocEntry * te)1317 lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
1318 {
1319 const char *qualId;
1320 PQExpBuffer query;
1321 PGresult *res;
1322
1323 /* Nothing to do for BLOBS */
1324 if (strcmp(te->desc, "BLOBS") == 0)
1325 return;
1326
1327 query = createPQExpBuffer();
1328
1329 qualId = fmtQualifiedId(te->namespace, te->tag);
1330
1331 appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1332 qualId);
1333
1334 res = PQexec(AH->connection, query->data);
1335
1336 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1337 exit_horribly(modulename,
1338 "could not obtain lock on relation \"%s\"\n"
1339 "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1340 "on the table after the pg_dump parent process had gotten the "
1341 "initial ACCESS SHARE lock on the table.\n", qualId);
1342
1343 PQclear(res);
1344 destroyPQExpBuffer(query);
1345 }
1346
1347 /*
1348 * WaitForCommands: main routine for a worker process.
1349 *
1350 * Read and execute commands from the master until we see EOF on the pipe.
1351 */
1352 static void
WaitForCommands(ArchiveHandle * AH,int pipefd[2])1353 WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1354 {
1355 char *command;
1356 TocEntry *te;
1357 T_Action act;
1358 int status = 0;
1359 char buf[256];
1360
1361 for (;;)
1362 {
1363 if (!(command = getMessageFromMaster(pipefd)))
1364 {
1365 /* EOF, so done */
1366 return;
1367 }
1368
1369 /* Decode the command */
1370 parseWorkerCommand(AH, &te, &act, command);
1371
1372 if (act == ACT_DUMP)
1373 {
1374 /* Acquire lock on this table within the worker's session */
1375 lockTableForWorker(AH, te);
1376
1377 /* Perform the dump command */
1378 status = (AH->WorkerJobDumpPtr) (AH, te);
1379 }
1380 else if (act == ACT_RESTORE)
1381 {
1382 /* Perform the restore command */
1383 status = (AH->WorkerJobRestorePtr) (AH, te);
1384 }
1385 else
1386 Assert(false);
1387
1388 /* Return status to master */
1389 buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1390
1391 sendMessageToMaster(pipefd, buf);
1392
1393 /* command was pg_malloc'd and we are responsible for free()ing it. */
1394 free(command);
1395 }
1396 }
1397
1398 /*
1399 * Check for status messages from workers.
1400 *
1401 * If do_wait is true, wait to get a status message; otherwise, just return
1402 * immediately if there is none available.
1403 *
1404 * When we get a status message, we pass the status code to the callback
1405 * function that was specified to DispatchJobForTocEntry, then reset the
1406 * worker status to IDLE.
1407 *
1408 * Returns true if we collected a status message, else false.
1409 *
1410 * XXX is it worth checking for more than one status message per call?
1411 * It seems somewhat unlikely that multiple workers would finish at exactly
1412 * the same time.
1413 */
1414 static bool
ListenToWorkers(ArchiveHandle * AH,ParallelState * pstate,bool do_wait)1415 ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1416 {
1417 int worker;
1418 char *msg;
1419
1420 /* Try to collect a status message */
1421 msg = getMessageFromWorker(pstate, do_wait, &worker);
1422
1423 if (!msg)
1424 {
1425 /* If do_wait is true, we must have detected EOF on some socket */
1426 if (do_wait)
1427 exit_horribly(modulename, "a worker process died unexpectedly\n");
1428 return false;
1429 }
1430
1431 /* Process it and update our idea of the worker's status */
1432 if (messageStartsWith(msg, "OK "))
1433 {
1434 ParallelSlot *slot = &pstate->parallelSlot[worker];
1435 TocEntry *te = pstate->te[worker];
1436 int status;
1437
1438 status = parseWorkerResponse(AH, te, msg);
1439 slot->callback(AH, te, status, slot->callback_data);
1440 slot->workerStatus = WRKR_IDLE;
1441 pstate->te[worker] = NULL;
1442 }
1443 else
1444 exit_horribly(modulename,
1445 "invalid message received from worker: \"%s\"\n",
1446 msg);
1447
1448 /* Free the string returned from getMessageFromWorker */
1449 free(msg);
1450
1451 return true;
1452 }
1453
1454 /*
1455 * Check for status results from workers, waiting if necessary.
1456 *
1457 * Available wait modes are:
1458 * WFW_NO_WAIT: reap any available status, but don't block
1459 * WFW_GOT_STATUS: wait for at least one more worker to finish
1460 * WFW_ONE_IDLE: wait for at least one worker to be idle
1461 * WFW_ALL_IDLE: wait for all workers to be idle
1462 *
1463 * Any received results are passed to the callback specified to
1464 * DispatchJobForTocEntry.
1465 *
1466 * This function is executed in the master process.
1467 */
1468 void
WaitForWorkers(ArchiveHandle * AH,ParallelState * pstate,WFW_WaitOption mode)1469 WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
1470 {
1471 bool do_wait = false;
1472
1473 /*
1474 * In GOT_STATUS mode, always block waiting for a message, since we can't
1475 * return till we get something. In other modes, we don't block the first
1476 * time through the loop.
1477 */
1478 if (mode == WFW_GOT_STATUS)
1479 {
1480 /* Assert that caller knows what it's doing */
1481 Assert(!IsEveryWorkerIdle(pstate));
1482 do_wait = true;
1483 }
1484
1485 for (;;)
1486 {
1487 /*
1488 * Check for status messages, even if we don't need to block. We do
1489 * not try very hard to reap all available messages, though, since
1490 * there's unlikely to be more than one.
1491 */
1492 if (ListenToWorkers(AH, pstate, do_wait))
1493 {
1494 /*
1495 * If we got a message, we are done by definition for GOT_STATUS
1496 * mode, and we can also be certain that there's at least one idle
1497 * worker. So we're done in all but ALL_IDLE mode.
1498 */
1499 if (mode != WFW_ALL_IDLE)
1500 return;
1501 }
1502
1503 /* Check whether we must wait for new status messages */
1504 switch (mode)
1505 {
1506 case WFW_NO_WAIT:
1507 return; /* never wait */
1508 case WFW_GOT_STATUS:
1509 Assert(false); /* can't get here, because we waited */
1510 break;
1511 case WFW_ONE_IDLE:
1512 if (GetIdleWorker(pstate) != NO_SLOT)
1513 return;
1514 break;
1515 case WFW_ALL_IDLE:
1516 if (IsEveryWorkerIdle(pstate))
1517 return;
1518 break;
1519 }
1520
1521 /* Loop back, and this time wait for something to happen */
1522 do_wait = true;
1523 }
1524 }
1525
1526 /*
1527 * Read one command message from the master, blocking if necessary
1528 * until one is available, and return it as a malloc'd string.
1529 * On EOF, return NULL.
1530 *
1531 * This function is executed in worker processes.
1532 */
1533 static char *
getMessageFromMaster(int pipefd[2])1534 getMessageFromMaster(int pipefd[2])
1535 {
1536 return readMessageFromPipe(pipefd[PIPE_READ]);
1537 }
1538
1539 /*
1540 * Send a status message to the master.
1541 *
1542 * This function is executed in worker processes.
1543 */
1544 static void
sendMessageToMaster(int pipefd[2],const char * str)1545 sendMessageToMaster(int pipefd[2], const char *str)
1546 {
1547 int len = strlen(str) + 1;
1548
1549 if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1550 exit_horribly(modulename,
1551 "could not write to the communication channel: %s\n",
1552 strerror(errno));
1553 }
1554
/*
 * Block until at least one descriptor in *workerset is readable.
 *
 * select() is retried if interrupted by a signal (EINTR, or WSAEINTR on
 * Windows); every attempt starts from the caller's original set.  On return
 * *workerset holds the readable descriptors.  Returns the select() result:
 * -1 on error, else the number of readable descriptors.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
	const fd_set watched = *workerset;
	int			nready;
	bool		interrupted;

	do
	{
		/* select() overwrites the set, so restore it before each attempt */
		*workerset = watched;
		nready = select(maxFd + 1, workerset, NULL, NULL, NULL);
#ifndef WIN32
		interrupted = (nready < 0 && errno == EINTR);
#else
		interrupted = (nready == SOCKET_ERROR && WSAGetLastError() == WSAEINTR);
#endif
	} while (interrupted);

	return nready;
}
1582
1583
1584 /*
1585 * Check for messages from worker processes.
1586 *
1587 * If a message is available, return it as a malloc'd string, and put the
1588 * index of the sending worker in *worker.
1589 *
1590 * If nothing is available, wait if "do_wait" is true, else return NULL.
1591 *
1592 * If we detect EOF on any socket, we'll return NULL. It's not great that
1593 * that's hard to distinguish from the no-data-available case, but for now
1594 * our one caller is okay with that.
1595 *
1596 * This function is executed in the master process.
1597 */
1598 static char *
getMessageFromWorker(ParallelState * pstate,bool do_wait,int * worker)1599 getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1600 {
1601 int i;
1602 fd_set workerset;
1603 int maxFd = -1;
1604 struct timeval nowait = {0, 0};
1605
1606 /* construct bitmap of socket descriptors for select() */
1607 FD_ZERO(&workerset);
1608 for (i = 0; i < pstate->numWorkers; i++)
1609 {
1610 if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1611 continue;
1612 FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1613 if (pstate->parallelSlot[i].pipeRead > maxFd)
1614 maxFd = pstate->parallelSlot[i].pipeRead;
1615 }
1616
1617 if (do_wait)
1618 {
1619 i = select_loop(maxFd, &workerset);
1620 Assert(i != 0);
1621 }
1622 else
1623 {
1624 if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1625 return NULL;
1626 }
1627
1628 if (i < 0)
1629 exit_horribly(modulename, "select() failed: %s\n", strerror(errno));
1630
1631 for (i = 0; i < pstate->numWorkers; i++)
1632 {
1633 char *msg;
1634
1635 if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1636 continue;
1637 if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1638 continue;
1639
1640 /*
1641 * Read the message if any. If the socket is ready because of EOF,
1642 * we'll return NULL instead (and the socket will stay ready, so the
1643 * condition will persist).
1644 *
1645 * Note: because this is a blocking read, we'll wait if only part of
1646 * the message is available. Waiting a long time would be bad, but
1647 * since worker status messages are short and are always sent in one
1648 * operation, it shouldn't be a problem in practice.
1649 */
1650 msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1651 *worker = i;
1652 return msg;
1653 }
1654 Assert(false);
1655 return NULL;
1656 }
1657
1658 /*
1659 * Send a command message to the specified worker process.
1660 *
1661 * This function is executed in the master process.
1662 */
1663 static void
sendMessageToWorker(ParallelState * pstate,int worker,const char * str)1664 sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1665 {
1666 int len = strlen(str) + 1;
1667
1668 if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1669 {
1670 exit_horribly(modulename,
1671 "could not write to the communication channel: %s\n",
1672 strerror(errno));
1673 }
1674 }
1675
1676 /*
1677 * Read one message from the specified pipe (fd), blocking if necessary
1678 * until one is available, and return it as a malloc'd string.
1679 * On EOF, return NULL.
1680 *
1681 * A "message" on the channel is just a null-terminated string.
1682 */
1683 static char *
readMessageFromPipe(int fd)1684 readMessageFromPipe(int fd)
1685 {
1686 char *msg;
1687 int msgsize,
1688 bufsize;
1689 int ret;
1690
1691 /*
1692 * In theory, if we let piperead() read multiple bytes, it might give us
1693 * back fragments of multiple messages. (That can't actually occur, since
1694 * neither master nor workers send more than one message without waiting
1695 * for a reply, but we don't wish to assume that here.) For simplicity,
1696 * read a byte at a time until we get the terminating '\0'. This method
1697 * is a bit inefficient, but since this is only used for relatively short
1698 * command and status strings, it shouldn't matter.
1699 */
1700 bufsize = 64; /* could be any number */
1701 msg = (char *) pg_malloc(bufsize);
1702 msgsize = 0;
1703 for (;;)
1704 {
1705 Assert(msgsize < bufsize);
1706 ret = piperead(fd, msg + msgsize, 1);
1707 if (ret <= 0)
1708 break; /* error or connection closure */
1709
1710 Assert(ret == 1);
1711
1712 if (msg[msgsize] == '\0')
1713 return msg; /* collected whole message */
1714
1715 msgsize++;
1716 if (msgsize == bufsize) /* enlarge buffer if needed */
1717 {
1718 bufsize += 16; /* could be any number */
1719 msg = (char *) pg_realloc(msg, bufsize);
1720 }
1721 }
1722
1723 /* Other end has closed the connection */
1724 pg_free(msg);
1725 return NULL;
1726 }
1727
1728 #ifdef WIN32
1729
1730 /*
1731 * This is a replacement version of pipe(2) for Windows which allows the pipe
1732 * handles to be used in select().
1733 *
1734 * Reads and writes on the pipe must go through piperead()/pipewrite().
1735 *
1736 * For consistency with Unix we declare the returned handles as "int".
1737 * This is okay even on WIN64 because system handles are not more than
1738 * 32 bits wide, but we do have to do some casting.
1739 */
1740 static int
pgpipe(int handles[2])1741 pgpipe(int handles[2])
1742 {
1743 pgsocket s,
1744 tmp_sock;
1745 struct sockaddr_in serv_addr;
1746 int len = sizeof(serv_addr);
1747
1748 /* We have to use the Unix socket invalid file descriptor value here. */
1749 handles[0] = handles[1] = -1;
1750
1751 /*
1752 * setup listen socket
1753 */
1754 if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1755 {
1756 write_msg(modulename, "pgpipe: could not create socket: error code %d\n",
1757 WSAGetLastError());
1758 return -1;
1759 }
1760
1761 memset((void *) &serv_addr, 0, sizeof(serv_addr));
1762 serv_addr.sin_family = AF_INET;
1763 serv_addr.sin_port = pg_hton16(0);
1764 serv_addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);
1765 if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1766 {
1767 write_msg(modulename, "pgpipe: could not bind: error code %d\n",
1768 WSAGetLastError());
1769 closesocket(s);
1770 return -1;
1771 }
1772 if (listen(s, 1) == SOCKET_ERROR)
1773 {
1774 write_msg(modulename, "pgpipe: could not listen: error code %d\n",
1775 WSAGetLastError());
1776 closesocket(s);
1777 return -1;
1778 }
1779 if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
1780 {
1781 write_msg(modulename, "pgpipe: getsockname() failed: error code %d\n",
1782 WSAGetLastError());
1783 closesocket(s);
1784 return -1;
1785 }
1786
1787 /*
1788 * setup pipe handles
1789 */
1790 if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1791 {
1792 write_msg(modulename, "pgpipe: could not create second socket: error code %d\n",
1793 WSAGetLastError());
1794 closesocket(s);
1795 return -1;
1796 }
1797 handles[1] = (int) tmp_sock;
1798
1799 if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1800 {
1801 write_msg(modulename, "pgpipe: could not connect socket: error code %d\n",
1802 WSAGetLastError());
1803 closesocket(handles[1]);
1804 handles[1] = -1;
1805 closesocket(s);
1806 return -1;
1807 }
1808 if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
1809 {
1810 write_msg(modulename, "pgpipe: could not accept connection: error code %d\n",
1811 WSAGetLastError());
1812 closesocket(handles[1]);
1813 handles[1] = -1;
1814 closesocket(s);
1815 return -1;
1816 }
1817 handles[0] = (int) tmp_sock;
1818
1819 closesocket(s);
1820 return 0;
1821 }
1822
1823 /*
1824 * Windows implementation of reading from a pipe.
1825 */
1826 static int
piperead(int s,char * buf,int len)1827 piperead(int s, char *buf, int len)
1828 {
1829 int ret = recv(s, buf, len, 0);
1830
1831 if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
1832 {
1833 /* EOF on the pipe! */
1834 ret = 0;
1835 }
1836 return ret;
1837 }
1838
1839 #endif /* WIN32 */
1840