1 /*-------------------------------------------------------------------------
2 *
3 * parallel.c
4 *
5 * Parallel support for pg_dump and pg_restore
6 *
7 * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
9 *
10 * IDENTIFICATION
11 * src/bin/pg_dump/parallel.c
12 *
13 *-------------------------------------------------------------------------
14 */
15
16 /*
17 * Parallel operation works like this:
18 *
19 * The original, master process calls ParallelBackupStart(), which forks off
20 * the desired number of worker processes, which each enter WaitForCommands().
21 *
22 * The master process dispatches an individual work item to one of the worker
23 * processes in DispatchJobForTocEntry(). That calls
24 * AH->MasterStartParallelItemPtr, a routine of the output format. This
 * function's arguments are the parent's archive handle AH (containing the full
26 * catalog information), the TocEntry that the worker should work on and a
27 * T_Action value indicating whether this is a backup or a restore task. The
28 * function simply converts the TocEntry assignment into a command string that
29 * is then sent over to the worker process. In the simplest case that would be
30 * something like "DUMP 1234", with 1234 being the TocEntry id.
31 *
32 * The worker process receives and decodes the command and passes it to the
33 * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
34 * which are routines of the current archive format. That routine performs
35 * the required action (dump or restore) and returns a malloc'd status string.
36 * The status string is passed back to the master where it is interpreted by
37 * AH->MasterEndParallelItemPtr, another format-specific routine. That
38 * function can update state or catalog information on the master's side,
39 * depending on the reply from the worker process. In the end it returns a
40 * status code, which is 0 for successful execution.
41 *
42 * Remember that we have forked off the workers only after we have read in
43 * the catalog. That's why our worker processes can also access the catalog
44 * information. (In the Windows case, the workers are threads in the same
45 * process. To avoid problems, they work with cloned copies of the Archive
46 * data structure; see RunWorker().)
47 *
48 * In the master process, the workerStatus field for each worker has one of
49 * the following values:
50 * WRKR_IDLE: it's waiting for a command
51 * WRKR_WORKING: it's been sent a command
52 * WRKR_FINISHED: it's returned a result
53 * WRKR_TERMINATED: process ended (or not started yet)
54 * The FINISHED state indicates that the worker is idle, but we've not yet
55 * dealt with the status code it returned from the prior command.
56 * ReapWorkerStatus() extracts the unhandled command status value and sets
57 * the workerStatus back to WRKR_IDLE.
58 */
59
60 #include "postgres_fe.h"
61
62 #ifdef HAVE_SYS_SELECT_H
63 #include <sys/select.h>
64 #endif
65
66 #include "parallel.h"
67 #include "pg_backup_utils.h"
68 #include "fe_utils/string_utils.h"
69
70 #ifndef WIN32
71 #include <sys/types.h>
72 #include <sys/wait.h>
73 #include "signal.h"
74 #include <unistd.h>
75 #include <fcntl.h>
76 #endif
77
78 /* Mnemonic macros for indexing the fd array returned by pipe(2) */
79 #define PIPE_READ 0
80 #define PIPE_WRITE 1
81
82 #ifdef WIN32
83
/*
 * Structure to hold info passed by _beginthreadex() to the function it calls
 * via its single allowed argument.
 *
 * ParallelBackupStart() allocates one of these per worker thread and
 * init_spawned_worker_win32() frees it as soon as it has copied the fields.
 */
typedef struct
{
    ArchiveHandle *AH;          /* master's archive handle; the worker clones
                                 * it in RunWorker() */
    ParallelSlot *slot;         /* this worker's parallel slot */
} WorkerInfo;
93
94 /* Windows implementation of pipe access */
95 static int pgpipe(int handles[2]);
96 static int piperead(int s, char *buf, int len);
97 #define pipewrite(a,b,c) send(a,b,c,0)
98
99 #else /* !WIN32 */
100
101 /* Non-Windows implementation of pipe access */
102 #define pgpipe(a) pipe(a)
103 #define piperead(a,b,c) read(a,b,c)
104 #define pipewrite(a,b,c) write(a,b,c)
105
106 #endif /* WIN32 */
107
/*
 * State info for archive_close_connection() shutdown callback.
 *
 * A pointer to the single static instance below is registered with
 * on_exit_nicely() by on_exit_close_archive().
 */
typedef struct ShutdownInformation
{
    ParallelState *pstate;      /* set by ParallelBackupStart() while workers
                                 * exist, reset to NULL by ParallelBackupEnd();
                                 * NULL means non-parallel operation */
    Archive    *AHX;            /* archive whose DB connection the callback
                                 * closes; set by on_exit_close_archive() */
} ShutdownInformation;
116
117 static ShutdownInformation shutdown_info;
118
/*
 * State info for signal handling.
 * We assume signal_info initializes to zeroes.
 *
 * On Unix, myAH is the master DB connection in the master process, and the
 * worker's own connection in worker processes.  On Windows, we have only one
 * instance of signal_info, so myAH is the master connection and the worker
 * connections must be dug out of pstate->parallelSlot[].
 *
 * pstate is installed by set_cancel_pstate() only after all workers have
 * been started, and handler_set is flipped by setup_cancel_handler() the
 * first time it runs.
 */
typedef struct DumpSignalInformation
{
    ArchiveHandle *myAH;        /* database connection to issue cancel for */
    ParallelState *pstate;      /* parallel state, if any */
    bool        handler_set;    /* signal handler set up in this process? */
#ifndef WIN32
    bool        am_worker;      /* am I a worker process?  (set in the child
                                 * right after fork) */
#endif
} DumpSignalInformation;
137
138 static volatile DumpSignalInformation signal_info;
139
140 #ifdef WIN32
141 static CRITICAL_SECTION signal_info_lock;
142 #endif
143
/*
 * Write a simple string to stderr --- must be safe in a signal handler.
 * We ignore the write() result since there's not much we could do about it.
 * Certain compilers make that harder than it ought to be.
 *
 * The argument is evaluated exactly once (it is copied into str_ first).
 * The result of write() is stored in rc_ and then explicitly voided, which
 * is what keeps "unused result" warnings quiet.
 */
#define write_stderr(str) \
    do { \
        const char *str_ = (str); \
        int     rc_; \
        rc_ = write(fileno(stderr), str_, strlen(str_)); \
        (void) rc_; \
    } while (0)
156
157
158 #ifdef WIN32
159 /* file-scope variables */
160 static DWORD tls_index;
161
162 /* globally visible variables (needed by exit_nicely) */
163 bool parallel_init_done = false;
164 DWORD mainThreadId;
165 #endif /* WIN32 */
166
167 static const char *modulename = gettext_noop("parallel archiver");
168
169 /* Local function prototypes */
170 static ParallelSlot *GetMyPSlot(ParallelState *pstate);
171 static void archive_close_connection(int code, void *arg);
172 static void ShutdownWorkersHard(ParallelState *pstate);
173 static void WaitForTerminatingWorkers(ParallelState *pstate);
174 static void setup_cancel_handler(void);
175 static void set_cancel_pstate(ParallelState *pstate);
176 static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
177 static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
178 static bool HasEveryWorkerTerminated(ParallelState *pstate);
179 static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
180 static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
181 static char *getMessageFromMaster(int pipefd[2]);
182 static void sendMessageToMaster(int pipefd[2], const char *str);
183 static int select_loop(int maxFd, fd_set *workerset);
184 static char *getMessageFromWorker(ParallelState *pstate,
185 bool do_wait, int *worker);
186 static void sendMessageToWorker(ParallelState *pstate,
187 int worker, const char *str);
188 static char *readMessageFromPipe(int fd);
189
/*
 * Helpers for matching protocol messages.
 *
 * messageStartsWith: true if the string msg begins with the string prefix
 * (an empty prefix always matches).  Note that prefix is evaluated twice.
 * messageEquals: true if msg and pattern are identical strings.
 */
#define messageStartsWith(msg, prefix) \
    (strncmp((msg), (prefix), strlen((prefix))) == 0)
#define messageEquals(msg, pattern) \
    (strcmp((msg), (pattern)) == 0)
194
195
/*
 * One-time, process-level setup for parallel dump/restore.
 *
 * Meant to be called early in startup; at present it is called whether or
 * not parallel operation will actually be used.
 *
 * Only Windows has anything to do here: allocate the thread-local-storage
 * index, remember the main thread's id, and start up Winsock.  If Winsock
 * cannot be started we report the error code and exit.  Repeated calls are
 * harmless, and on other platforms the function is a no-op.
 */
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
    WSADATA     wsaData;
    int         rc;

    /* Everything below must happen only once per process */
    if (parallel_init_done)
        return;

    /* Prepare for threaded operation */
    tls_index = TlsAlloc();
    mainThreadId = GetCurrentThreadId();

    /* Initialize socket access */
    rc = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (rc != 0)
    {
        fprintf(stderr, _("%s: WSAStartup failed: %d\n"), progname, rc);
        exit_nicely(1);
    }

    parallel_init_done = true;
#endif
}
226
227 /*
228 * Find the ParallelSlot for the current worker process or thread.
229 *
230 * Returns NULL if no matching slot is found (this implies we're the master).
231 */
232 static ParallelSlot *
GetMyPSlot(ParallelState * pstate)233 GetMyPSlot(ParallelState *pstate)
234 {
235 int i;
236
237 for (i = 0; i < pstate->numWorkers; i++)
238 {
239 #ifdef WIN32
240 if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
241 #else
242 if (pstate->parallelSlot[i].pid == getpid())
243 #endif
244 return &(pstate->parallelSlot[i]);
245 }
246
247 return NULL;
248 }
249
/*
 * A thread-local version of getLocalPQExpBuffer().
 *
 * Hands back an empty PQExpBuffer that is reused on every call made by the
 * same thread.  Non-reentrant, but it keeps leakage down to one buffer per
 * thread rather than one per fmtId/fmtQualifiedId call.
 */
#ifdef WIN32
static PQExpBuffer
getThreadLocalPQExpBuffer(void)
{
    /*
     * Until init_parallel_dump_utils() has run there is no TLS index to use,
     * so a plain static holds the buffer.  Once TLS is available the static
     * is never touched again (mixing the two goes awry).  We rely on
     * TlsGetValue() returning 0 for a slot that has not been set yet.
     */
    static PQExpBuffer fallback_buf = NULL;
    PQExpBuffer buf;

    buf = parallel_init_done ?
        (PQExpBuffer) TlsGetValue(tls_index) : fallback_buf;

    if (buf != NULL)
    {
        /* Not the first call here: recycle the buffer, wiping its contents */
        resetPQExpBuffer(buf);
        return buf;
    }

    /* First call: make a buffer and remember it for next time */
    buf = createPQExpBuffer();
    if (parallel_init_done)
        TlsSetValue(tls_index, buf);
    else
        fallback_buf = buf;

    return buf;
}
#endif   /* WIN32 */
291
/*
 * pg_dump and pg_restore call this to register the cleanup handler
 * as soon as they've created the ArchiveHandle.
 *
 * AHX is remembered in the file-level shutdown_info, and
 * archive_close_connection() is registered with on_exit_nicely() with a
 * pointer to shutdown_info as its argument.
 */
void
on_exit_close_archive(Archive *AHX)
{
    shutdown_info.AHX = AHX;
    on_exit_nicely(archive_close_connection, &shutdown_info);
}
302
303 /*
304 * on_exit_nicely handler for shutting down database connections and
305 * worker processes cleanly.
306 */
307 static void
archive_close_connection(int code,void * arg)308 archive_close_connection(int code, void *arg)
309 {
310 ShutdownInformation *si = (ShutdownInformation *) arg;
311
312 if (si->pstate)
313 {
314 /* In parallel mode, must figure out who we are */
315 ParallelSlot *slot = GetMyPSlot(si->pstate);
316
317 if (!slot)
318 {
319 /*
320 * We're the master. Forcibly shut down workers, then close our
321 * own database connection, if any.
322 */
323 ShutdownWorkersHard(si->pstate);
324
325 if (si->AHX)
326 DisconnectDatabase(si->AHX);
327 }
328 else
329 {
330 /*
331 * We're a worker. Shut down our own DB connection if any. On
332 * Windows, we also have to close our communication sockets, to
333 * emulate what will happen on Unix when the worker process exits.
334 * (Without this, if this is a premature exit, the master would
335 * fail to detect it because there would be no EOF condition on
336 * the other end of the pipe.)
337 */
338 if (slot->args->AH)
339 DisconnectDatabase(&(slot->args->AH->public));
340
341 #ifdef WIN32
342 closesocket(slot->pipeRevRead);
343 closesocket(slot->pipeRevWrite);
344 #endif
345 }
346 }
347 else
348 {
349 /* Non-parallel operation: just kill the master DB connection */
350 if (si->AHX)
351 DisconnectDatabase(si->AHX);
352 }
353 }
354
355 /*
356 * Forcibly shut down any remaining workers, waiting for them to finish.
357 *
358 * Note that we don't expect to come here during normal exit (the workers
359 * should be long gone, and the ParallelState too). We're only here in an
360 * exit_horribly() situation, so intervening to cancel active commands is
361 * appropriate.
362 */
363 static void
ShutdownWorkersHard(ParallelState * pstate)364 ShutdownWorkersHard(ParallelState *pstate)
365 {
366 int i;
367
368 /*
369 * Close our write end of the sockets so that any workers waiting for
370 * commands know they can exit. (Note: some of the pipeWrite fields might
371 * still be zero, if we failed to initialize all the workers. Hence, just
372 * ignore errors here.)
373 */
374 for (i = 0; i < pstate->numWorkers; i++)
375 closesocket(pstate->parallelSlot[i].pipeWrite);
376
377 /*
378 * Force early termination of any commands currently in progress.
379 */
380 #ifndef WIN32
381 /* On non-Windows, send SIGTERM to each worker process. */
382 for (i = 0; i < pstate->numWorkers; i++)
383 {
384 pid_t pid = pstate->parallelSlot[i].pid;
385
386 if (pid != 0)
387 kill(pid, SIGTERM);
388 }
389 #else
390
391 /*
392 * On Windows, send query cancels directly to the workers' backends. Use
393 * a critical section to ensure worker threads don't change state.
394 */
395 EnterCriticalSection(&signal_info_lock);
396 for (i = 0; i < pstate->numWorkers; i++)
397 {
398 ArchiveHandle *AH = pstate->parallelSlot[i].args->AH;
399 char errbuf[1];
400
401 if (AH != NULL && AH->connCancel != NULL)
402 (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
403 }
404 LeaveCriticalSection(&signal_info_lock);
405 #endif
406
407 /* Now wait for them to terminate. */
408 WaitForTerminatingWorkers(pstate);
409 }
410
411 /*
412 * Wait for all workers to terminate.
413 */
414 static void
WaitForTerminatingWorkers(ParallelState * pstate)415 WaitForTerminatingWorkers(ParallelState *pstate)
416 {
417 while (!HasEveryWorkerTerminated(pstate))
418 {
419 ParallelSlot *slot = NULL;
420 int j;
421
422 #ifndef WIN32
423 /* On non-Windows, use wait() to wait for next worker to end */
424 int status;
425 pid_t pid = wait(&status);
426
427 /* Find dead worker's slot, and clear the PID field */
428 for (j = 0; j < pstate->numWorkers; j++)
429 {
430 slot = &(pstate->parallelSlot[j]);
431 if (slot->pid == pid)
432 {
433 slot->pid = 0;
434 break;
435 }
436 }
437 #else /* WIN32 */
438 /* On Windows, we must use WaitForMultipleObjects() */
439 HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
440 int nrun = 0;
441 DWORD ret;
442 uintptr_t hThread;
443
444 for (j = 0; j < pstate->numWorkers; j++)
445 {
446 if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
447 {
448 lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
449 nrun++;
450 }
451 }
452 ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
453 Assert(ret != WAIT_FAILED);
454 hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
455 free(lpHandles);
456
457 /* Find dead worker's slot, and clear the hThread field */
458 for (j = 0; j < pstate->numWorkers; j++)
459 {
460 slot = &(pstate->parallelSlot[j]);
461 if (slot->hThread == hThread)
462 {
463 /* For cleanliness, close handles for dead threads */
464 CloseHandle((HANDLE) slot->hThread);
465 slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
466 break;
467 }
468 }
469 #endif /* WIN32 */
470
471 /* On all platforms, update workerStatus as well */
472 Assert(j < pstate->numWorkers);
473 slot->workerStatus = WRKR_TERMINATED;
474 }
475 }
476
477
478 /*
479 * Code for responding to cancel interrupts (SIGINT, control-C, etc)
480 *
481 * This doesn't quite belong in this module, but it needs access to the
482 * ParallelState data, so there's not really a better place either.
483 *
484 * When we get a cancel interrupt, we could just die, but in pg_restore that
485 * could leave a SQL command (e.g., CREATE INDEX on a large table) running
486 * for a long time. Instead, we try to send a cancel request and then die.
487 * pg_dump probably doesn't really need this, but we might as well use it
488 * there too. Note that sending the cancel directly from the signal handler
489 * is safe because PQcancel() is written to make it so.
490 *
491 * In parallel operation on Unix, each process is responsible for canceling
492 * its own connection (this must be so because nobody else has access to it).
493 * Furthermore, the master process should attempt to forward its signal to
494 * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't
495 * needed because typing control-C at the console would deliver SIGINT to
496 * every member of the terminal process group --- but in other scenarios it
497 * might be that only the master gets signaled.
498 *
499 * On Windows, the cancel handler runs in a separate thread, because that's
500 * how SetConsoleCtrlHandler works. We make it stop worker threads, send
501 * cancels on all active connections, and then return FALSE, which will allow
502 * the process to die. For safety's sake, we use a critical section to
503 * protect the PGcancel structures against being changed while the signal
504 * thread runs.
505 */
506
507 #ifndef WIN32
508
/*
 * Signal handler (Unix only)
 *
 * Installed for SIGINT, SIGTERM and SIGQUIT by setup_cancel_handler().  In
 * order, it: ignores further deliveries of those three signals; forwards
 * SIGTERM to every worker recorded in signal_info.pstate, if that is set;
 * sends a query cancel on the connection in signal_info.myAH, if any;
 * prints a short notice unless this process is a worker; and ends the
 * process with _exit(1).  It never returns to the interrupted code.
 */
static void
sigTermHandler(SIGNAL_ARGS)
{
    int         i;
    char        errbuf[1];      /* PQcancel's error text is discarded */

    /*
     * Some platforms allow delivery of new signals to interrupt an active
     * signal handler.  That could muck up our attempt to send PQcancel, so
     * disable the signals that setup_cancel_handler enabled.
     */
    pqsignal(SIGINT, SIG_IGN);
    pqsignal(SIGTERM, SIG_IGN);
    pqsignal(SIGQUIT, SIG_IGN);

    /*
     * If we're in the master, forward signal to all workers.  (It seems best
     * to do this before PQcancel; killing the master transaction will result
     * in invalid-snapshot errors from active workers, which maybe we can
     * quiet by killing workers first.)  Ignore any errors.
     */
    if (signal_info.pstate != NULL)
    {
        for (i = 0; i < signal_info.pstate->numWorkers; i++)
        {
            pid_t       pid = signal_info.pstate->parallelSlot[i].pid;

            /* a zero pid means there is no live worker in this slot */
            if (pid != 0)
                kill(pid, SIGTERM);
        }
    }

    /*
     * Send QueryCancel if we have a connection to send to.  Ignore errors,
     * there's not much we can do about them anyway.
     */
    if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
        (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));

    /*
     * Report we're quitting, using nothing more complicated than write(2).
     * When in parallel operation, only the master process should do this.
     */
    if (!signal_info.am_worker)
    {
        if (progname)
        {
            write_stderr(progname);
            write_stderr(": ");
        }
        write_stderr("terminated by user\n");
    }

    /*
     * And die, using _exit() not exit() because the latter will invoke atexit
     * handlers that can fail if we interrupted related code.
     */
    _exit(1);
}
571
572 /*
573 * Enable cancel interrupt handler, if not already done.
574 */
575 static void
setup_cancel_handler(void)576 setup_cancel_handler(void)
577 {
578 /*
579 * When forking, signal_info.handler_set will propagate into the new
580 * process, but that's fine because the signal handler state does too.
581 */
582 if (!signal_info.handler_set)
583 {
584 signal_info.handler_set = true;
585
586 pqsignal(SIGINT, sigTermHandler);
587 pqsignal(SIGTERM, sigTermHandler);
588 pqsignal(SIGQUIT, sigTermHandler);
589 }
590 }
591
592 #else /* WIN32 */
593
/*
 * Console interrupt handler --- runs in a newly-started thread.
 *
 * After stopping other threads and sending cancel requests on all open
 * connections, we return FALSE which will allow the default ExitProcess()
 * action to be taken.
 *
 * dwCtrlType is the console event being delivered; only CTRL_C_EVENT and
 * CTRL_BREAK_EVENT trigger any work.  For every other event type the
 * function does nothing and simply returns FALSE.
 */
static BOOL WINAPI
consoleHandler(DWORD dwCtrlType)
{
    int         i;
    char        errbuf[1];      /* PQcancel's error text is discarded */

    if (dwCtrlType == CTRL_C_EVENT ||
        dwCtrlType == CTRL_BREAK_EVENT)
    {
        /* Critical section prevents changing data we look at here */
        EnterCriticalSection(&signal_info_lock);

        /*
         * If in parallel mode, stop worker threads and send QueryCancel to
         * their connected backends.  The main point of stopping the worker
         * threads is to keep them from reporting the query cancels as errors,
         * which would clutter the user's screen.  We needn't stop the master
         * thread since it won't be doing much anyway.  Do this before
         * canceling the main transaction, else we might get invalid-snapshot
         * errors reported before we can stop the workers.  Ignore errors,
         * there's not much we can do about them anyway.
         */
        if (signal_info.pstate != NULL)
        {
            for (i = 0; i < signal_info.pstate->numWorkers; i++)
            {
                ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
                ArchiveHandle *AH = slot->args->AH;
                HANDLE      hThread = (HANDLE) slot->hThread;

                /*
                 * Using TerminateThread here may leave some resources leaked,
                 * but it doesn't matter since we're about to end the whole
                 * process.
                 */
                if (hThread != INVALID_HANDLE_VALUE)
                    TerminateThread(hThread, 0);

                /* AH is NULL while the worker has no cloned archive */
                if (AH != NULL && AH->connCancel != NULL)
                    (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
            }
        }

        /*
         * Send QueryCancel to master connection, if enabled.  Ignore errors,
         * there's not much we can do about them anyway.
         */
        if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
            (void) PQcancel(signal_info.myAH->connCancel,
                            errbuf, sizeof(errbuf));

        LeaveCriticalSection(&signal_info_lock);

        /*
         * Report we're quitting, using nothing more complicated than
         * write(2).  (We might be able to get away with using write_msg()
         * here, but since we terminated other threads uncleanly above, it
         * seems better to assume as little as possible.)
         */
        if (progname)
        {
            write_stderr(progname);
            write_stderr(": ");
        }
        write_stderr("terminated by user\n");
    }

    /* Always return FALSE to allow signal handling to continue */
    return FALSE;
}
671
672 /*
673 * Enable cancel interrupt handler, if not already done.
674 */
675 static void
setup_cancel_handler(void)676 setup_cancel_handler(void)
677 {
678 if (!signal_info.handler_set)
679 {
680 signal_info.handler_set = true;
681
682 InitializeCriticalSection(&signal_info_lock);
683
684 SetConsoleCtrlHandler(consoleHandler, TRUE);
685 }
686 }
687
688 #endif /* WIN32 */
689
690
691 /*
692 * set_archive_cancel_info
693 *
694 * Fill AH->connCancel with cancellation info for the specified database
695 * connection; or clear it if conn is NULL.
696 */
697 void
set_archive_cancel_info(ArchiveHandle * AH,PGconn * conn)698 set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
699 {
700 PGcancel *oldConnCancel;
701
702 /*
703 * Activate the interrupt handler if we didn't yet in this process. On
704 * Windows, this also initializes signal_info_lock; therefore it's
705 * important that this happen at least once before we fork off any
706 * threads.
707 */
708 setup_cancel_handler();
709
710 /*
711 * On Unix, we assume that storing a pointer value is atomic with respect
712 * to any possible signal interrupt. On Windows, use a critical section.
713 */
714
715 #ifdef WIN32
716 EnterCriticalSection(&signal_info_lock);
717 #endif
718
719 /* Free the old one if we have one */
720 oldConnCancel = AH->connCancel;
721 /* be sure interrupt handler doesn't use pointer while freeing */
722 AH->connCancel = NULL;
723
724 if (oldConnCancel != NULL)
725 PQfreeCancel(oldConnCancel);
726
727 /* Set the new one if specified */
728 if (conn)
729 AH->connCancel = PQgetCancel(conn);
730
731 /*
732 * On Unix, there's only ever one active ArchiveHandle per process, so we
733 * can just set signal_info.myAH unconditionally. On Windows, do that
734 * only in the main thread; worker threads have to make sure their
735 * ArchiveHandle appears in the pstate data, which is dealt with in
736 * RunWorker().
737 */
738 #ifndef WIN32
739 signal_info.myAH = AH;
740 #else
741 if (mainThreadId == GetCurrentThreadId())
742 signal_info.myAH = AH;
743 #endif
744
745 #ifdef WIN32
746 LeaveCriticalSection(&signal_info_lock);
747 #endif
748 }
749
750 /*
751 * set_cancel_pstate
752 *
753 * Set signal_info.pstate to point to the specified ParallelState, if any.
754 * We need this mainly to have an interlock against Windows signal thread.
755 */
756 static void
set_cancel_pstate(ParallelState * pstate)757 set_cancel_pstate(ParallelState *pstate)
758 {
759 #ifdef WIN32
760 EnterCriticalSection(&signal_info_lock);
761 #endif
762
763 signal_info.pstate = pstate;
764
765 #ifdef WIN32
766 LeaveCriticalSection(&signal_info_lock);
767 #endif
768 }
769
770 /*
771 * set_cancel_slot_archive
772 *
773 * Set ParallelSlot's AH field to point to the specified archive, if any.
774 * We need this mainly to have an interlock against Windows signal thread.
775 */
776 static void
set_cancel_slot_archive(ParallelSlot * slot,ArchiveHandle * AH)777 set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
778 {
779 #ifdef WIN32
780 EnterCriticalSection(&signal_info_lock);
781 #endif
782
783 slot->args->AH = AH;
784
785 #ifdef WIN32
786 LeaveCriticalSection(&signal_info_lock);
787 #endif
788 }
789
790
791 /*
792 * This function is called by both Unix and Windows variants to set up
793 * and run a worker process. Caller should exit the process (or thread)
794 * upon return.
795 */
796 static void
RunWorker(ArchiveHandle * AH,ParallelSlot * slot)797 RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
798 {
799 int pipefd[2];
800
801 /* fetch child ends of pipes */
802 pipefd[PIPE_READ] = slot->pipeRevRead;
803 pipefd[PIPE_WRITE] = slot->pipeRevWrite;
804
805 /*
806 * Clone the archive so that we have our own state to work with, and in
807 * particular our own database connection.
808 *
809 * We clone on Unix as well as Windows, even though technically we don't
810 * need to because fork() gives us a copy in our own address space
811 * already. But CloneArchive resets the state information and also clones
812 * the database connection which both seem kinda helpful.
813 */
814 AH = CloneArchive(AH);
815
816 /* Remember cloned archive where signal handler can find it */
817 set_cancel_slot_archive(slot, AH);
818
819 /*
820 * Call the setup worker function that's defined in the ArchiveHandle.
821 */
822 (AH->SetupWorkerPtr) ((Archive *) AH);
823
824 /*
825 * Execute commands until done.
826 */
827 WaitForCommands(AH, pipefd);
828
829 /*
830 * Disconnect from database and clean up.
831 */
832 set_cancel_slot_archive(slot, NULL);
833 DisconnectDatabase(&(AH->public));
834 DeCloneArchive(AH);
835 }
836
/*
 * Thread base function for Windows
 *
 * Entry point handed to _beginthreadex() by ParallelBackupStart().  wi is
 * the heap-allocated argument block built there; ownership passes to this
 * function, which frees it once the two fields have been copied out.
 * Runs the worker to completion and then ends the thread with exit code 0.
 */
#ifdef WIN32
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
    ArchiveHandle *AH = wi->AH;
    ParallelSlot *slot = wi->slot;

    /* Don't need WorkerInfo anymore */
    free(wi);

    /* Run the worker ... */
    RunWorker(AH, slot);

    /* Exit the thread */
    _endthreadex(0);
    return 0;                   /* placed after the _endthreadex() call above */
}
#endif   /* WIN32 */
858
859 /*
860 * This function starts a parallel dump or restore by spawning off the worker
861 * processes. For Windows, it creates a number of threads; on Unix the
862 * workers are created with fork().
863 */
864 ParallelState *
ParallelBackupStart(ArchiveHandle * AH)865 ParallelBackupStart(ArchiveHandle *AH)
866 {
867 ParallelState *pstate;
868 int i;
869 const size_t slotSize = AH->public.numWorkers * sizeof(ParallelSlot);
870
871 Assert(AH->public.numWorkers > 0);
872
873 pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
874
875 pstate->numWorkers = AH->public.numWorkers;
876 pstate->parallelSlot = NULL;
877
878 if (AH->public.numWorkers == 1)
879 return pstate;
880
881 /* Create status array, being sure to initialize all fields to 0 */
882 pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
883 memset((void *) pstate->parallelSlot, 0, slotSize);
884
885 #ifdef WIN32
886 /* Make fmtId() and fmtQualifiedId() use thread-local storage */
887 getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
888 #endif
889
890 /*
891 * Set the pstate in shutdown_info, to tell the exit handler that it must
892 * clean up workers as well as the main database connection. But we don't
893 * set this in signal_info yet, because we don't want child processes to
894 * inherit non-NULL signal_info.pstate.
895 */
896 shutdown_info.pstate = pstate;
897
898 /*
899 * Temporarily disable query cancellation on the master connection. This
900 * ensures that child processes won't inherit valid AH->connCancel
901 * settings and thus won't try to issue cancels against the master's
902 * connection. No harm is done if we fail while it's disabled, because
903 * the master connection is idle at this point anyway.
904 */
905 set_archive_cancel_info(AH, NULL);
906
907 /* Ensure stdio state is quiesced before forking */
908 fflush(NULL);
909
910 /* Create desired number of workers */
911 for (i = 0; i < pstate->numWorkers; i++)
912 {
913 #ifdef WIN32
914 WorkerInfo *wi;
915 uintptr_t handle;
916 #else
917 pid_t pid;
918 #endif
919 ParallelSlot *slot = &(pstate->parallelSlot[i]);
920 int pipeMW[2],
921 pipeWM[2];
922
923 slot->args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
924 slot->args->AH = NULL;
925 slot->args->te = NULL;
926
927 /* Create communication pipes for this worker */
928 if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
929 exit_horribly(modulename,
930 "could not create communication channels: %s\n",
931 strerror(errno));
932
933 /* master's ends of the pipes */
934 slot->pipeRead = pipeWM[PIPE_READ];
935 slot->pipeWrite = pipeMW[PIPE_WRITE];
936 /* child's ends of the pipes */
937 slot->pipeRevRead = pipeMW[PIPE_READ];
938 slot->pipeRevWrite = pipeWM[PIPE_WRITE];
939
940 #ifdef WIN32
941 /* Create transient structure to pass args to worker function */
942 wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
943
944 wi->AH = AH;
945 wi->slot = slot;
946
947 handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
948 wi, 0, &(slot->threadId));
949 slot->hThread = handle;
950 slot->workerStatus = WRKR_IDLE;
951 #else /* !WIN32 */
952 pid = fork();
953 if (pid == 0)
954 {
955 /* we are the worker */
956 int j;
957
958 /* this is needed for GetMyPSlot() */
959 slot->pid = getpid();
960
961 /* instruct signal handler that we're in a worker now */
962 signal_info.am_worker = true;
963
964 /* close read end of Worker -> Master */
965 closesocket(pipeWM[PIPE_READ]);
966 /* close write end of Master -> Worker */
967 closesocket(pipeMW[PIPE_WRITE]);
968
969 /*
970 * Close all inherited fds for communication of the master with
971 * previously-forked workers.
972 */
973 for (j = 0; j < i; j++)
974 {
975 closesocket(pstate->parallelSlot[j].pipeRead);
976 closesocket(pstate->parallelSlot[j].pipeWrite);
977 }
978
979 /* Run the worker ... */
980 RunWorker(AH, slot);
981
982 /* We can just exit(0) when done */
983 exit(0);
984 }
985 else if (pid < 0)
986 {
987 /* fork failed */
988 exit_horribly(modulename,
989 "could not create worker process: %s\n",
990 strerror(errno));
991 }
992
993 /* In Master after successful fork */
994 slot->pid = pid;
995 slot->workerStatus = WRKR_IDLE;
996
997 /* close read end of Master -> Worker */
998 closesocket(pipeMW[PIPE_READ]);
999 /* close write end of Worker -> Master */
1000 closesocket(pipeWM[PIPE_WRITE]);
1001 #endif /* WIN32 */
1002 }
1003
1004 /*
1005 * Having forked off the workers, disable SIGPIPE so that master isn't
1006 * killed if it tries to send a command to a dead worker. We don't want
1007 * the workers to inherit this setting, though.
1008 */
1009 #ifndef WIN32
1010 pqsignal(SIGPIPE, SIG_IGN);
1011 #endif
1012
1013 /*
1014 * Re-establish query cancellation on the master connection.
1015 */
1016 set_archive_cancel_info(AH, AH->connection);
1017
1018 /*
1019 * Tell the cancel signal handler to forward signals to worker processes,
1020 * too. (As with query cancel, we did not need this earlier because the
1021 * workers have not yet been given anything to do; if we die before this
1022 * point, any already-started workers will see EOF and quit promptly.)
1023 */
1024 set_cancel_pstate(pstate);
1025
1026 return pstate;
1027 }
1028
1029 /*
1030 * Close down a parallel dump or restore.
1031 */
1032 void
ParallelBackupEnd(ArchiveHandle * AH,ParallelState * pstate)1033 ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
1034 {
1035 int i;
1036
1037 /* No work if non-parallel */
1038 if (pstate->numWorkers == 1)
1039 return;
1040
1041 /* There should not be any unfinished jobs */
1042 Assert(IsEveryWorkerIdle(pstate));
1043
1044 /* Close the sockets so that the workers know they can exit */
1045 for (i = 0; i < pstate->numWorkers; i++)
1046 {
1047 closesocket(pstate->parallelSlot[i].pipeRead);
1048 closesocket(pstate->parallelSlot[i].pipeWrite);
1049 }
1050
1051 /* Wait for them to exit */
1052 WaitForTerminatingWorkers(pstate);
1053
1054 /*
1055 * Unlink pstate from shutdown_info, so the exit handler will not try to
1056 * use it; and likewise unlink from signal_info.
1057 */
1058 shutdown_info.pstate = NULL;
1059 set_cancel_pstate(NULL);
1060
1061 /* Release state (mere neatnik-ism, since we're about to terminate) */
1062 free(pstate->parallelSlot);
1063 free(pstate);
1064 }
1065
1066 /*
1067 * Dispatch a job to some free worker (caller must ensure there is one!)
1068 *
1069 * te is the TocEntry to be processed, act is the action to be taken on it.
1070 */
1071 void
DispatchJobForTocEntry(ArchiveHandle * AH,ParallelState * pstate,TocEntry * te,T_Action act)1072 DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te,
1073 T_Action act)
1074 {
1075 int worker;
1076 char *arg;
1077
1078 /* our caller makes sure that at least one worker is idle */
1079 worker = GetIdleWorker(pstate);
1080 Assert(worker != NO_SLOT);
1081
1082 /* Construct and send command string */
1083 arg = (AH->MasterStartParallelItemPtr) (AH, te, act);
1084
1085 sendMessageToWorker(pstate, worker, arg);
1086
1087 /* XXX aren't we leaking string here? (no, because it's static. Ick.) */
1088
1089 /* Remember worker is busy, and which TocEntry it's working on */
1090 pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1091 pstate->parallelSlot[worker].args->te = te;
1092 }
1093
1094 /*
1095 * Find an idle worker and return its slot number.
1096 * Return NO_SLOT if none are idle.
1097 */
1098 int
GetIdleWorker(ParallelState * pstate)1099 GetIdleWorker(ParallelState *pstate)
1100 {
1101 int i;
1102
1103 for (i = 0; i < pstate->numWorkers; i++)
1104 {
1105 if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1106 return i;
1107 }
1108 return NO_SLOT;
1109 }
1110
1111 /*
1112 * Return true iff no worker is running.
1113 */
1114 static bool
HasEveryWorkerTerminated(ParallelState * pstate)1115 HasEveryWorkerTerminated(ParallelState *pstate)
1116 {
1117 int i;
1118
1119 for (i = 0; i < pstate->numWorkers; i++)
1120 {
1121 if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1122 return false;
1123 }
1124 return true;
1125 }
1126
1127 /*
1128 * Return true iff every worker is in the WRKR_IDLE state.
1129 */
1130 bool
IsEveryWorkerIdle(ParallelState * pstate)1131 IsEveryWorkerIdle(ParallelState *pstate)
1132 {
1133 int i;
1134
1135 for (i = 0; i < pstate->numWorkers; i++)
1136 {
1137 if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1138 return false;
1139 }
1140 return true;
1141 }
1142
1143 /*
1144 * Acquire lock on a table to be dumped by a worker process.
1145 *
1146 * The master process is already holding an ACCESS SHARE lock. Ordinarily
1147 * it's no problem for a worker to get one too, but if anything else besides
1148 * pg_dump is running, there's a possible deadlock:
1149 *
1150 * 1) Master dumps the schema and locks all tables in ACCESS SHARE mode.
1151 * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1152 * because the master holds a conflicting ACCESS SHARE lock).
1153 * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1154 * The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1155 * 4) Now we have a deadlock, since the master is effectively waiting for
1156 * the worker. The server cannot detect that, however.
1157 *
1158 * To prevent an infinite wait, prior to touching a table in a worker, request
1159 * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock,
1160 * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1161 * so we have a deadlock. We must fail the backup in that case.
1162 */
1163 static void
lockTableForWorker(ArchiveHandle * AH,TocEntry * te)1164 lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
1165 {
1166 const char *qualId;
1167 PQExpBuffer query;
1168 PGresult *res;
1169
1170 /* Nothing to do for BLOBS */
1171 if (strcmp(te->desc, "BLOBS") == 0)
1172 return;
1173
1174 query = createPQExpBuffer();
1175
1176 qualId = fmtQualifiedId(AH->public.remoteVersion, te->namespace, te->tag);
1177
1178 appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1179 qualId);
1180
1181 res = PQexec(AH->connection, query->data);
1182
1183 if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1184 exit_horribly(modulename,
1185 "could not obtain lock on relation \"%s\"\n"
1186 "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1187 "on the table after the pg_dump parent process had gotten the "
1188 "initial ACCESS SHARE lock on the table.\n", qualId);
1189
1190 PQclear(res);
1191 destroyPQExpBuffer(query);
1192 }
1193
1194 /*
1195 * WaitForCommands: main routine for a worker process.
1196 *
1197 * Read and execute commands from the master until we see EOF on the pipe.
1198 */
1199 static void
WaitForCommands(ArchiveHandle * AH,int pipefd[2])1200 WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1201 {
1202 char *command;
1203 DumpId dumpId;
1204 int nBytes;
1205 char *str;
1206 TocEntry *te;
1207
1208 for (;;)
1209 {
1210 if (!(command = getMessageFromMaster(pipefd)))
1211 {
1212 /* EOF, so done */
1213 return;
1214 }
1215
1216 if (messageStartsWith(command, "DUMP "))
1217 {
1218 /* Decode the command */
1219 sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes);
1220 Assert(nBytes == strlen(command) - strlen("DUMP "));
1221 te = getTocEntryByDumpId(AH, dumpId);
1222 Assert(te != NULL);
1223
1224 /* Acquire lock on this table within the worker's session */
1225 lockTableForWorker(AH, te);
1226
1227 /* Perform the dump command */
1228 str = (AH->WorkerJobDumpPtr) (AH, te);
1229
1230 /* Return status to master */
1231 sendMessageToMaster(pipefd, str);
1232
1233 /* we are responsible for freeing the status string */
1234 free(str);
1235 }
1236 else if (messageStartsWith(command, "RESTORE "))
1237 {
1238 /* Decode the command */
1239 sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes);
1240 Assert(nBytes == strlen(command) - strlen("RESTORE "));
1241 te = getTocEntryByDumpId(AH, dumpId);
1242 Assert(te != NULL);
1243
1244 /* Perform the restore command */
1245 str = (AH->WorkerJobRestorePtr) (AH, te);
1246
1247 /* Return status to master */
1248 sendMessageToMaster(pipefd, str);
1249
1250 /* we are responsible for freeing the status string */
1251 free(str);
1252 }
1253 else
1254 exit_horribly(modulename,
1255 "unrecognized command received from master: \"%s\"\n",
1256 command);
1257
1258 /* command was pg_malloc'd and we are responsible for free()ing it. */
1259 free(command);
1260 }
1261 }
1262
1263 /*
1264 * Check for status messages from workers.
1265 *
1266 * If do_wait is true, wait to get a status message; otherwise, just return
1267 * immediately if there is none available.
1268 *
1269 * When we get a status message, we let MasterEndParallelItemPtr process it,
1270 * then save the resulting status code and switch the worker's state to
1271 * WRKR_FINISHED. Later, caller must call ReapWorkerStatus() to verify
1272 * that the status was "OK" and push the worker back to IDLE state.
1273 *
1274 * XXX Rube Goldberg would be proud of this API, but no one else should be.
1275 *
1276 * XXX is it worth checking for more than one status message per call?
1277 * It seems somewhat unlikely that multiple workers would finish at exactly
1278 * the same time.
1279 */
1280 void
ListenToWorkers(ArchiveHandle * AH,ParallelState * pstate,bool do_wait)1281 ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1282 {
1283 int worker;
1284 char *msg;
1285
1286 /* Try to collect a status message */
1287 msg = getMessageFromWorker(pstate, do_wait, &worker);
1288
1289 if (!msg)
1290 {
1291 /* If do_wait is true, we must have detected EOF on some socket */
1292 if (do_wait)
1293 exit_horribly(modulename, "a worker process died unexpectedly\n");
1294 return;
1295 }
1296
1297 /* Process it and update our idea of the worker's status */
1298 if (messageStartsWith(msg, "OK "))
1299 {
1300 TocEntry *te = pstate->parallelSlot[worker].args->te;
1301 char *statusString;
1302
1303 if (messageStartsWith(msg, "OK RESTORE "))
1304 {
1305 statusString = msg + strlen("OK RESTORE ");
1306 pstate->parallelSlot[worker].status =
1307 (AH->MasterEndParallelItemPtr)
1308 (AH, te, statusString, ACT_RESTORE);
1309 }
1310 else if (messageStartsWith(msg, "OK DUMP "))
1311 {
1312 statusString = msg + strlen("OK DUMP ");
1313 pstate->parallelSlot[worker].status =
1314 (AH->MasterEndParallelItemPtr)
1315 (AH, te, statusString, ACT_DUMP);
1316 }
1317 else
1318 exit_horribly(modulename,
1319 "invalid message received from worker: \"%s\"\n",
1320 msg);
1321 pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED;
1322 }
1323 else
1324 exit_horribly(modulename,
1325 "invalid message received from worker: \"%s\"\n",
1326 msg);
1327
1328 /* Free the string returned from getMessageFromWorker */
1329 free(msg);
1330 }
1331
1332 /*
1333 * Check to see if any worker is in WRKR_FINISHED state. If so,
1334 * return its command status code into *status, reset it to IDLE state,
1335 * and return its slot number. Otherwise return NO_SLOT.
1336 *
1337 * This function is executed in the master process.
1338 */
1339 int
ReapWorkerStatus(ParallelState * pstate,int * status)1340 ReapWorkerStatus(ParallelState *pstate, int *status)
1341 {
1342 int i;
1343
1344 for (i = 0; i < pstate->numWorkers; i++)
1345 {
1346 if (pstate->parallelSlot[i].workerStatus == WRKR_FINISHED)
1347 {
1348 *status = pstate->parallelSlot[i].status;
1349 pstate->parallelSlot[i].status = 0;
1350 pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
1351 return i;
1352 }
1353 }
1354 return NO_SLOT;
1355 }
1356
1357 /*
1358 * Wait, if necessary, until we have at least one idle worker.
1359 * Reap worker status as necessary to move FINISHED workers to IDLE state.
1360 *
1361 * We assume that no extra processing is required when reaping a finished
1362 * command, except for checking that the status was OK (zero).
1363 * Caution: that assumption means that this function can only be used in
1364 * parallel dump, not parallel restore, because the latter has a more
1365 * complex set of rules about handling status.
1366 *
1367 * This function is executed in the master process.
1368 */
1369 void
EnsureIdleWorker(ArchiveHandle * AH,ParallelState * pstate)1370 EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate)
1371 {
1372 int ret_worker;
1373 int work_status;
1374
1375 for (;;)
1376 {
1377 int nTerm = 0;
1378
1379 while ((ret_worker = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
1380 {
1381 if (work_status != 0)
1382 exit_horribly(modulename, "error processing a parallel work item\n");
1383
1384 nTerm++;
1385 }
1386
1387 /*
1388 * We need to make sure that we have an idle worker before dispatching
1389 * the next item. If nTerm > 0 we already have that (quick check).
1390 */
1391 if (nTerm > 0)
1392 return;
1393
1394 /* explicit check for an idle worker */
1395 if (GetIdleWorker(pstate) != NO_SLOT)
1396 return;
1397
1398 /*
1399 * If we have no idle worker, read the result of one or more workers
1400 * and loop the loop to call ReapWorkerStatus() on them
1401 */
1402 ListenToWorkers(AH, pstate, true);
1403 }
1404 }
1405
1406 /*
1407 * Wait for all workers to be idle.
1408 * Reap worker status as necessary to move FINISHED workers to IDLE state.
1409 *
1410 * We assume that no extra processing is required when reaping a finished
1411 * command, except for checking that the status was OK (zero).
1412 * Caution: that assumption means that this function can only be used in
1413 * parallel dump, not parallel restore, because the latter has a more
1414 * complex set of rules about handling status.
1415 *
1416 * This function is executed in the master process.
1417 */
1418 void
EnsureWorkersFinished(ArchiveHandle * AH,ParallelState * pstate)1419 EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate)
1420 {
1421 int work_status;
1422
1423 if (!pstate || pstate->numWorkers == 1)
1424 return;
1425
1426 /* Waiting for the remaining worker processes to finish */
1427 while (!IsEveryWorkerIdle(pstate))
1428 {
1429 if (ReapWorkerStatus(pstate, &work_status) == NO_SLOT)
1430 ListenToWorkers(AH, pstate, true);
1431 else if (work_status != 0)
1432 exit_horribly(modulename,
1433 "error processing a parallel work item\n");
1434 }
1435 }
1436
1437 /*
1438 * Read one command message from the master, blocking if necessary
1439 * until one is available, and return it as a malloc'd string.
1440 * On EOF, return NULL.
1441 *
1442 * This function is executed in worker processes.
1443 */
1444 static char *
getMessageFromMaster(int pipefd[2])1445 getMessageFromMaster(int pipefd[2])
1446 {
1447 return readMessageFromPipe(pipefd[PIPE_READ]);
1448 }
1449
1450 /*
1451 * Send a status message to the master.
1452 *
1453 * This function is executed in worker processes.
1454 */
1455 static void
sendMessageToMaster(int pipefd[2],const char * str)1456 sendMessageToMaster(int pipefd[2], const char *str)
1457 {
1458 int len = strlen(str) + 1;
1459
1460 if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1461 exit_horribly(modulename,
1462 "could not write to the communication channel: %s\n",
1463 strerror(errno));
1464 }
1465
/*
 * Block in select() until at least one descriptor in "workerset" is
 * readable, retrying whenever the call is merely interrupted.
 *
 * maxFd is the highest descriptor number present in the set.  On return
 * *workerset holds the set as left by the final select() call.
 *
 * Returns the value of that final select() call: the number of readable
 * descriptors, or a negative value on error.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
	const fd_set wanted = *workerset;
	int			nready;
	bool		interrupted;

	do
	{
		/* select() overwrites its argument, so start from a fresh copy */
		*workerset = wanted;
		nready = select(maxFd + 1, workerset, NULL, NULL, NULL);

#ifndef WIN32
		interrupted = (nready < 0 && errno == EINTR);
#else
		interrupted = (nready == SOCKET_ERROR && WSAGetLastError() == WSAEINTR);
#endif
	} while (interrupted);

	return nready;
}
1493
1494
1495 /*
1496 * Check for messages from worker processes.
1497 *
1498 * If a message is available, return it as a malloc'd string, and put the
1499 * index of the sending worker in *worker.
1500 *
1501 * If nothing is available, wait if "do_wait" is true, else return NULL.
1502 *
1503 * If we detect EOF on any socket, we'll return NULL. It's not great that
1504 * that's hard to distinguish from the no-data-available case, but for now
1505 * our one caller is okay with that.
1506 *
1507 * This function is executed in the master process.
1508 */
1509 static char *
getMessageFromWorker(ParallelState * pstate,bool do_wait,int * worker)1510 getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1511 {
1512 int i;
1513 fd_set workerset;
1514 int maxFd = -1;
1515 struct timeval nowait = {0, 0};
1516
1517 /* construct bitmap of socket descriptors for select() */
1518 FD_ZERO(&workerset);
1519 for (i = 0; i < pstate->numWorkers; i++)
1520 {
1521 if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1522 continue;
1523 FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1524 if (pstate->parallelSlot[i].pipeRead > maxFd)
1525 maxFd = pstate->parallelSlot[i].pipeRead;
1526 }
1527
1528 if (do_wait)
1529 {
1530 i = select_loop(maxFd, &workerset);
1531 Assert(i != 0);
1532 }
1533 else
1534 {
1535 if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1536 return NULL;
1537 }
1538
1539 if (i < 0)
1540 exit_horribly(modulename, "select() failed: %s\n", strerror(errno));
1541
1542 for (i = 0; i < pstate->numWorkers; i++)
1543 {
1544 char *msg;
1545
1546 if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1547 continue;
1548 if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1549 continue;
1550
1551 /*
1552 * Read the message if any. If the socket is ready because of EOF,
1553 * we'll return NULL instead (and the socket will stay ready, so the
1554 * condition will persist).
1555 *
1556 * Note: because this is a blocking read, we'll wait if only part of
1557 * the message is available. Waiting a long time would be bad, but
1558 * since worker status messages are short and are always sent in one
1559 * operation, it shouldn't be a problem in practice.
1560 */
1561 msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1562 *worker = i;
1563 return msg;
1564 }
1565 Assert(false);
1566 return NULL;
1567 }
1568
1569 /*
1570 * Send a command message to the specified worker process.
1571 *
1572 * This function is executed in the master process.
1573 */
1574 static void
sendMessageToWorker(ParallelState * pstate,int worker,const char * str)1575 sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1576 {
1577 int len = strlen(str) + 1;
1578
1579 if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1580 {
1581 exit_horribly(modulename,
1582 "could not write to the communication channel: %s\n",
1583 strerror(errno));
1584 }
1585 }
1586
/*
 * Read a single message from descriptor fd, blocking until all of it has
 * arrived, and return it as a malloc'd string.  Returns NULL if the read
 * fails or reports end of input before a complete message was received.
 *
 * On this channel a "message" is simply a null-terminated string.
 */
static char *
readMessageFromPipe(int fd)
{
	int			capacity = 64;	/* initial size; could be any number */
	int			used = 0;
	char	   *buf = (char *) pg_malloc(capacity);
	int			nread;

	/*
	 * We fetch one byte per piperead() call and stop at the first '\0'.
	 * Reading in larger pieces could in principle pick up the start of a
	 * following message.  That cannot actually happen, because neither side
	 * sends a second message before it has received a reply, but we prefer
	 * not to depend on that here.  The byte-wise approach is a little slow,
	 * which is acceptable for the short command and status strings that
	 * travel over this channel.
	 */
	for (;;)
	{
		Assert(used < capacity);

		nread = piperead(fd, &buf[used], 1);
		if (nread <= 0)
		{
			/* Read error, or the other end has closed the connection */
			pg_free(buf);
			return NULL;
		}

		Assert(nread == 1);

		/* The terminator completes the message */
		if (buf[used] == '\0')
			return buf;

		/* Keep the byte and make room for the next one if we are full */
		if (++used == capacity)
		{
			capacity += 16;		/* growth step; could be any number */
			buf = (char *) pg_realloc(buf, capacity);
		}
	}
}
1638
1639 #ifdef WIN32
1640
/*
 * This is a replacement version of pipe(2) for Windows which allows the pipe
 * handles to be used in select().
 *
 * The "pipe" is a connected pair of TCP sockets on the loopback interface:
 * a temporary listening socket is created, a second socket connects to it,
 * and the connection is accepted.  handles[1] receives the connecting
 * socket and handles[0] the accepted one.
 * NOTE(review): by analogy with pipe(2), handles[0] is presumably the read
 * end and handles[1] the write end -- confirm against PIPE_READ/PIPE_WRITE.
 *
 * Returns 0 on success.  On failure, a message containing the Winsock error
 * code is written with write_msg(), every socket created so far is closed,
 * both handles are left as -1, and -1 is returned.
 *
 * Reads and writes on the pipe must go through piperead()/pipewrite().
 *
 * For consistency with Unix we declare the returned handles as "int".
 * This is okay even on WIN64 because system handles are not more than
 * 32 bits wide, but we do have to do some casting.
 */
static int
pgpipe(int handles[2])
{
	pgsocket	s,
				tmp_sock;
	struct sockaddr_in serv_addr;
	int			len = sizeof(serv_addr);

	/* We have to use the Unix socket invalid file descriptor value here. */
	handles[0] = handles[1] = -1;

	/*
	 * Set up the listening socket.  It is bound to the loopback address
	 * with port 0 (presumably so that the system assigns a free port), and
	 * getsockname() then fetches the address that was actually bound so
	 * that the second socket can connect to it.
	 */
	if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
	{
		write_msg(modulename, "pgpipe: could not create socket: error code %d\n",
				  WSAGetLastError());
		return -1;
	}

	memset((void *) &serv_addr, 0, sizeof(serv_addr));
	serv_addr.sin_family = AF_INET;
	serv_addr.sin_port = htons(0);
	serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
	{
		write_msg(modulename, "pgpipe: could not bind: error code %d\n",
				  WSAGetLastError());
		closesocket(s);
		return -1;
	}
	/* A backlog of one suffices: exactly one connection will be made */
	if (listen(s, 1) == SOCKET_ERROR)
	{
		write_msg(modulename, "pgpipe: could not listen: error code %d\n",
				  WSAGetLastError());
		closesocket(s);
		return -1;
	}
	if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
	{
		write_msg(modulename, "pgpipe: getsockname() failed: error code %d\n",
				  WSAGetLastError());
		closesocket(s);
		return -1;
	}

	/*
	 * Set up the pipe handles: create the connecting socket (handles[1]),
	 * connect it to the listener, and accept the connection to obtain the
	 * other end (handles[0]).
	 */
	if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
	{
		write_msg(modulename, "pgpipe: could not create second socket: error code %d\n",
				  WSAGetLastError());
		closesocket(s);
		return -1;
	}
	handles[1] = (int) tmp_sock;

	if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
	{
		write_msg(modulename, "pgpipe: could not connect socket: error code %d\n",
				  WSAGetLastError());
		/* undo the partial setup so the caller sees both handles as -1 */
		closesocket(handles[1]);
		handles[1] = -1;
		closesocket(s);
		return -1;
	}
	if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
	{
		write_msg(modulename, "pgpipe: could not accept connection: error code %d\n",
				  WSAGetLastError());
		/* undo the partial setup so the caller sees both handles as -1 */
		closesocket(handles[1]);
		handles[1] = -1;
		closesocket(s);
		return -1;
	}
	handles[0] = (int) tmp_sock;

	/* The listening socket has served its purpose */
	closesocket(s);
	return 0;
}
1733
1734 /*
1735 * Windows implementation of reading from a pipe.
1736 */
1737 static int
piperead(int s,char * buf,int len)1738 piperead(int s, char *buf, int len)
1739 {
1740 int ret = recv(s, buf, len, 0);
1741
1742 if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
1743 {
1744 /* EOF on the pipe! */
1745 ret = 0;
1746 }
1747 return ret;
1748 }
1749
1750 #endif /* WIN32 */
1751