1 /*-------------------------------------------------------------------------
2  *
3  * parallel.c
4  *
5  *	Parallel support for pg_dump and pg_restore
6  *
7  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *		src/bin/pg_dump/parallel.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 /*
17  * Parallel operation works like this:
18  *
19  * The original, master process calls ParallelBackupStart(), which forks off
20  * the desired number of worker processes, which each enter WaitForCommands().
21  *
22  * The master process dispatches an individual work item to one of the worker
23  * processes in DispatchJobForTocEntry().  That calls
24  * AH->MasterStartParallelItemPtr, a routine of the output format.  This
 * function's arguments are the parent's archive handle AH (containing the full
26  * catalog information), the TocEntry that the worker should work on and a
27  * T_Action value indicating whether this is a backup or a restore task.  The
28  * function simply converts the TocEntry assignment into a command string that
29  * is then sent over to the worker process. In the simplest case that would be
30  * something like "DUMP 1234", with 1234 being the TocEntry id.
31  *
32  * The worker process receives and decodes the command and passes it to the
33  * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
34  * which are routines of the current archive format.  That routine performs
35  * the required action (dump or restore) and returns a malloc'd status string.
36  * The status string is passed back to the master where it is interpreted by
37  * AH->MasterEndParallelItemPtr, another format-specific routine.  That
38  * function can update state or catalog information on the master's side,
39  * depending on the reply from the worker process.  In the end it returns a
40  * status code, which is 0 for successful execution.
41  *
42  * Remember that we have forked off the workers only after we have read in
43  * the catalog.  That's why our worker processes can also access the catalog
44  * information.  (In the Windows case, the workers are threads in the same
45  * process.  To avoid problems, they work with cloned copies of the Archive
46  * data structure; see RunWorker().)
47  *
48  * In the master process, the workerStatus field for each worker has one of
49  * the following values:
50  *		WRKR_IDLE: it's waiting for a command
51  *		WRKR_WORKING: it's been sent a command
52  *		WRKR_FINISHED: it's returned a result
53  *		WRKR_TERMINATED: process ended (or not started yet)
54  * The FINISHED state indicates that the worker is idle, but we've not yet
55  * dealt with the status code it returned from the prior command.
56  * ReapWorkerStatus() extracts the unhandled command status value and sets
57  * the workerStatus back to WRKR_IDLE.
58  */
59 
60 #include "postgres_fe.h"
61 
62 #ifdef HAVE_SYS_SELECT_H
63 #include <sys/select.h>
64 #endif
65 
66 #include "parallel.h"
67 #include "pg_backup_utils.h"
68 #include "fe_utils/string_utils.h"
69 
70 #ifndef WIN32
71 #include <sys/types.h>
72 #include <sys/wait.h>
73 #include "signal.h"
74 #include <unistd.h>
75 #include <fcntl.h>
76 #endif
77 
/*
 * Mnemonic macros for indexing the fd array returned by pipe(2):
 * element 0 is the read end, element 1 is the write end.
 */
#define PIPE_READ							0
#define PIPE_WRITE							1
81 
82 #ifdef WIN32
83 
/*
 * Structure to hold info passed by _beginthreadex() to the function it calls
 * via its single allowed argument.
 *
 * ParallelBackupStart() allocates one of these per worker thread;
 * init_spawned_worker_win32() copies the fields out and frees it.
 */
typedef struct
{
	ArchiveHandle *AH;			/* master's archive handle (worker clones it) */
	ParallelSlot *slot;			/* this worker's parallel slot */
} WorkerInfo;
93 
/*
 * Windows implementation of pipe access: pgpipe() and piperead() are
 * functions declared here, while pipewrite() maps onto send().
 */
static int	pgpipe(int handles[2]);
static int	piperead(int s, char *buf, int len);
#define pipewrite(a,b,c)	send(a,b,c,0)

#else							/* !WIN32 */

/* Non-Windows implementation of pipe access: plain pipe(2)/read(2)/write(2) */
#define pgpipe(a)			pipe(a)
#define piperead(a,b,c)		read(a,b,c)
#define pipewrite(a,b,c)	write(a,b,c)
105 
106 #endif   /* WIN32 */
107 
/*
 * State info for archive_close_connection() shutdown callback.
 *
 * AHX is filled in by on_exit_close_archive(); pstate is filled in by
 * ParallelBackupStart() once parallel operation begins.
 */
typedef struct ShutdownInformation
{
	ParallelState *pstate;		/* parallel state, or NULL if not parallel */
	Archive    *AHX;			/* archive whose connection must be closed */
} ShutdownInformation;

/* Single instance handed to on_exit_nicely() as the callback argument */
static ShutdownInformation shutdown_info;
118 
/*
 * State info for signal handling.
 * We assume signal_info initializes to zeroes.
 *
 * On Unix, myAH is the master DB connection in the master process, and the
 * worker's own connection in worker processes.  On Windows, we have only one
 * instance of signal_info, so myAH is the master connection and the worker
 * connections must be dug out of pstate->parallelSlot[].
 *
 * signal_info is read by the cancel handlers: sigTermHandler() on Unix and
 * consoleHandler() on Windows.
 */
typedef struct DumpSignalInformation
{
	ArchiveHandle *myAH;		/* database connection to issue cancel for */
	ParallelState *pstate;		/* parallel state, if any */
	bool		handler_set;	/* signal handler set up in this process? */
#ifndef WIN32
	bool		am_worker;		/* am I a worker process? */
#endif
} DumpSignalInformation;

static volatile DumpSignalInformation signal_info;

#ifdef WIN32
/* Guards cancel-related state against the console handler thread */
static CRITICAL_SECTION signal_info_lock;
#endif
143 
/*
 * Write a simple string to stderr --- must be safe in a signal handler.
 * We ignore the write() result since there's not much we could do about it.
 * Certain compilers make that harder than it ought to be, which is why the
 * result is stored in a local and then explicitly cast to void.
 *
 * The argument is evaluated exactly once (it is copied into str_).
 */
#define write_stderr(str) \
	do { \
		const char *str_ = (str); \
		int		rc_; \
		rc_ = write(fileno(stderr), str_, strlen(str_)); \
		(void) rc_; \
	} while (0)
156 
157 
#ifdef WIN32
/* file-scope variables */
static DWORD tls_index;			/* TLS slot allocated in init_parallel_dump_utils() */

/* globally visible variables (needed by exit_nicely) */
bool		parallel_init_done = false; /* set by init_parallel_dump_utils() */
DWORD		mainThreadId;		/* thread that ran init_parallel_dump_utils() */
#endif   /* WIN32 */

/* Module name passed to exit_horribly() for messages from this file */
static const char *modulename = gettext_noop("parallel archiver");
168 
/* Local function prototypes */
static ParallelSlot *GetMyPSlot(ParallelState *pstate);
static void archive_close_connection(int code, void *arg);
static void ShutdownWorkersHard(ParallelState *pstate);
static void WaitForTerminatingWorkers(ParallelState *pstate);
static void setup_cancel_handler(void);
static void set_cancel_pstate(ParallelState *pstate);
static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
static bool HasEveryWorkerTerminated(ParallelState *pstate);
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
static char *getMessageFromMaster(int pipefd[2]);
static void sendMessageToMaster(int pipefd[2], const char *str);
static int	select_loop(int maxFd, fd_set *workerset);
static char *getMessageFromWorker(ParallelState *pstate,
					 bool do_wait, int *worker);
static void sendMessageToWorker(ParallelState *pstate,
					int worker, const char *str);
static char *readMessageFromPipe(int fd);

/*
 * String-matching helpers for messages.  messageStartsWith() is true when
 * msg begins with prefix (note that prefix is evaluated twice);
 * messageEquals() is true when msg and pattern are identical strings.
 */
#define messageStartsWith(msg, prefix) \
	(strncmp(msg, prefix, strlen(prefix)) == 0)
#define messageEquals(msg, pattern) \
	(strcmp(msg, pattern) == 0)
194 
195 
/*
 * One-time setup for parallel dump support.  Call this early in process
 * startup; it is invoked regardless of whether parallel operation will
 * actually be used.
 *
 * On Windows this allocates the thread-local-storage index, remembers the
 * main thread's id and starts Winsock (exiting if that fails).  Repeated
 * calls are harmless.  On other platforms there is nothing to do.
 */
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
	WSADATA		winsockInfo;
	int			rc;

	/* Already initialized?  Then we're done. */
	if (parallel_init_done)
		return;

	/* Set up what threaded operation needs */
	tls_index = TlsAlloc();
	mainThreadId = GetCurrentThreadId();

	/* Bring up socket support */
	rc = WSAStartup(MAKEWORD(2, 2), &winsockInfo);
	if (rc != 0)
	{
		fprintf(stderr, _("%s: WSAStartup failed: %d\n"), progname, rc);
		exit_nicely(1);
	}

	parallel_init_done = true;
#endif
}
226 
227 /*
228  * Find the ParallelSlot for the current worker process or thread.
229  *
230  * Returns NULL if no matching slot is found (this implies we're the master).
231  */
232 static ParallelSlot *
GetMyPSlot(ParallelState * pstate)233 GetMyPSlot(ParallelState *pstate)
234 {
235 	int			i;
236 
237 	for (i = 0; i < pstate->numWorkers; i++)
238 	{
239 #ifdef WIN32
240 		if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
241 #else
242 		if (pstate->parallelSlot[i].pid == getpid())
243 #endif
244 			return &(pstate->parallelSlot[i]);
245 	}
246 
247 	return NULL;
248 }
249 
/*
 * Thread-aware replacement for getLocalPQExpBuffer().
 *
 * Each call hands back the same buffer (emptied) for the calling thread, so
 * the result is not reentrant; in exchange only one buffer is ever allocated
 * per thread rather than one per fmtId/fmtQualifiedId call.
 */
#ifdef WIN32
static PQExpBuffer
getThreadLocalPQExpBuffer(void)
{
	/*
	 * Before init_parallel_dump_utils() has run we keep the buffer in a
	 * static variable; afterwards we use the TLS slot exclusively and never
	 * touch the static, since mixing the two goes awry.  TlsGetValue() is
	 * relied upon to yield 0 for a slot that has not been set yet.
	 */
	static PQExpBuffer s_id_return = NULL;
	PQExpBuffer buf;

	buf = parallel_init_done ? (PQExpBuffer) TlsGetValue(tls_index) : s_id_return;

	if (buf != NULL)
	{
		/* Not the first call: recycle the existing buffer */
		resetPQExpBuffer(buf);
		return buf;
	}

	/* First call: make a buffer and remember it in the proper place */
	buf = createPQExpBuffer();
	if (parallel_init_done)
		TlsSetValue(tls_index, buf);
	else
		s_id_return = buf;

	return buf;
}
#endif   /* WIN32 */
291 
292 /*
293  * pg_dump and pg_restore call this to register the cleanup handler
294  * as soon as they've created the ArchiveHandle.
295  */
296 void
on_exit_close_archive(Archive * AHX)297 on_exit_close_archive(Archive *AHX)
298 {
299 	shutdown_info.AHX = AHX;
300 	on_exit_nicely(archive_close_connection, &shutdown_info);
301 }
302 
303 /*
304  * on_exit_nicely handler for shutting down database connections and
305  * worker processes cleanly.
306  */
307 static void
archive_close_connection(int code,void * arg)308 archive_close_connection(int code, void *arg)
309 {
310 	ShutdownInformation *si = (ShutdownInformation *) arg;
311 
312 	if (si->pstate)
313 	{
314 		/* In parallel mode, must figure out who we are */
315 		ParallelSlot *slot = GetMyPSlot(si->pstate);
316 
317 		if (!slot)
318 		{
319 			/*
320 			 * We're the master.  Forcibly shut down workers, then close our
321 			 * own database connection, if any.
322 			 */
323 			ShutdownWorkersHard(si->pstate);
324 
325 			if (si->AHX)
326 				DisconnectDatabase(si->AHX);
327 		}
328 		else
329 		{
330 			/*
331 			 * We're a worker.  Shut down our own DB connection if any.  On
332 			 * Windows, we also have to close our communication sockets, to
333 			 * emulate what will happen on Unix when the worker process exits.
334 			 * (Without this, if this is a premature exit, the master would
335 			 * fail to detect it because there would be no EOF condition on
336 			 * the other end of the pipe.)
337 			 */
338 			if (slot->args->AH)
339 				DisconnectDatabase(&(slot->args->AH->public));
340 
341 #ifdef WIN32
342 			closesocket(slot->pipeRevRead);
343 			closesocket(slot->pipeRevWrite);
344 #endif
345 		}
346 	}
347 	else
348 	{
349 		/* Non-parallel operation: just kill the master DB connection */
350 		if (si->AHX)
351 			DisconnectDatabase(si->AHX);
352 	}
353 }
354 
355 /*
356  * Forcibly shut down any remaining workers, waiting for them to finish.
357  *
358  * Note that we don't expect to come here during normal exit (the workers
359  * should be long gone, and the ParallelState too).  We're only here in an
360  * exit_horribly() situation, so intervening to cancel active commands is
361  * appropriate.
362  */
363 static void
ShutdownWorkersHard(ParallelState * pstate)364 ShutdownWorkersHard(ParallelState *pstate)
365 {
366 	int			i;
367 
368 	/*
369 	 * Close our write end of the sockets so that any workers waiting for
370 	 * commands know they can exit.  (Note: some of the pipeWrite fields might
371 	 * still be zero, if we failed to initialize all the workers.  Hence, just
372 	 * ignore errors here.)
373 	 */
374 	for (i = 0; i < pstate->numWorkers; i++)
375 		closesocket(pstate->parallelSlot[i].pipeWrite);
376 
377 	/*
378 	 * Force early termination of any commands currently in progress.
379 	 */
380 #ifndef WIN32
381 	/* On non-Windows, send SIGTERM to each worker process. */
382 	for (i = 0; i < pstate->numWorkers; i++)
383 	{
384 		pid_t		pid = pstate->parallelSlot[i].pid;
385 
386 		if (pid != 0)
387 			kill(pid, SIGTERM);
388 	}
389 #else
390 
391 	/*
392 	 * On Windows, send query cancels directly to the workers' backends.  Use
393 	 * a critical section to ensure worker threads don't change state.
394 	 */
395 	EnterCriticalSection(&signal_info_lock);
396 	for (i = 0; i < pstate->numWorkers; i++)
397 	{
398 		ArchiveHandle *AH = pstate->parallelSlot[i].args->AH;
399 		char		errbuf[1];
400 
401 		if (AH != NULL && AH->connCancel != NULL)
402 			(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
403 	}
404 	LeaveCriticalSection(&signal_info_lock);
405 #endif
406 
407 	/* Now wait for them to terminate. */
408 	WaitForTerminatingWorkers(pstate);
409 }
410 
411 /*
412  * Wait for all workers to terminate.
413  */
414 static void
WaitForTerminatingWorkers(ParallelState * pstate)415 WaitForTerminatingWorkers(ParallelState *pstate)
416 {
417 	while (!HasEveryWorkerTerminated(pstate))
418 	{
419 		ParallelSlot *slot = NULL;
420 		int			j;
421 
422 #ifndef WIN32
423 		/* On non-Windows, use wait() to wait for next worker to end */
424 		int			status;
425 		pid_t		pid = wait(&status);
426 
427 		/* Find dead worker's slot, and clear the PID field */
428 		for (j = 0; j < pstate->numWorkers; j++)
429 		{
430 			slot = &(pstate->parallelSlot[j]);
431 			if (slot->pid == pid)
432 			{
433 				slot->pid = 0;
434 				break;
435 			}
436 		}
437 #else							/* WIN32 */
438 		/* On Windows, we must use WaitForMultipleObjects() */
439 		HANDLE	   *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
440 		int			nrun = 0;
441 		DWORD		ret;
442 		uintptr_t	hThread;
443 
444 		for (j = 0; j < pstate->numWorkers; j++)
445 		{
446 			if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
447 			{
448 				lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
449 				nrun++;
450 			}
451 		}
452 		ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
453 		Assert(ret != WAIT_FAILED);
454 		hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
455 		free(lpHandles);
456 
457 		/* Find dead worker's slot, and clear the hThread field */
458 		for (j = 0; j < pstate->numWorkers; j++)
459 		{
460 			slot = &(pstate->parallelSlot[j]);
461 			if (slot->hThread == hThread)
462 			{
463 				/* For cleanliness, close handles for dead threads */
464 				CloseHandle((HANDLE) slot->hThread);
465 				slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
466 				break;
467 			}
468 		}
469 #endif   /* WIN32 */
470 
471 		/* On all platforms, update workerStatus as well */
472 		Assert(j < pstate->numWorkers);
473 		slot->workerStatus = WRKR_TERMINATED;
474 	}
475 }
476 
477 
478 /*
479  * Code for responding to cancel interrupts (SIGINT, control-C, etc)
480  *
481  * This doesn't quite belong in this module, but it needs access to the
482  * ParallelState data, so there's not really a better place either.
483  *
484  * When we get a cancel interrupt, we could just die, but in pg_restore that
485  * could leave a SQL command (e.g., CREATE INDEX on a large table) running
486  * for a long time.  Instead, we try to send a cancel request and then die.
487  * pg_dump probably doesn't really need this, but we might as well use it
488  * there too.  Note that sending the cancel directly from the signal handler
489  * is safe because PQcancel() is written to make it so.
490  *
491  * In parallel operation on Unix, each process is responsible for canceling
492  * its own connection (this must be so because nobody else has access to it).
493  * Furthermore, the master process should attempt to forward its signal to
494  * each child.  In simple manual use of pg_dump/pg_restore, forwarding isn't
495  * needed because typing control-C at the console would deliver SIGINT to
496  * every member of the terminal process group --- but in other scenarios it
497  * might be that only the master gets signaled.
498  *
499  * On Windows, the cancel handler runs in a separate thread, because that's
500  * how SetConsoleCtrlHandler works.  We make it stop worker threads, send
501  * cancels on all active connections, and then return FALSE, which will allow
502  * the process to die.  For safety's sake, we use a critical section to
503  * protect the PGcancel structures against being changed while the signal
504  * thread runs.
505  */
506 
507 #ifndef WIN32
508 
509 /*
510  * Signal handler (Unix only)
511  */
512 static void
sigTermHandler(SIGNAL_ARGS)513 sigTermHandler(SIGNAL_ARGS)
514 {
515 	int			i;
516 	char		errbuf[1];
517 
518 	/*
519 	 * Some platforms allow delivery of new signals to interrupt an active
520 	 * signal handler.  That could muck up our attempt to send PQcancel, so
521 	 * disable the signals that setup_cancel_handler enabled.
522 	 */
523 	pqsignal(SIGINT, SIG_IGN);
524 	pqsignal(SIGTERM, SIG_IGN);
525 	pqsignal(SIGQUIT, SIG_IGN);
526 
527 	/*
528 	 * If we're in the master, forward signal to all workers.  (It seems best
529 	 * to do this before PQcancel; killing the master transaction will result
530 	 * in invalid-snapshot errors from active workers, which maybe we can
531 	 * quiet by killing workers first.)  Ignore any errors.
532 	 */
533 	if (signal_info.pstate != NULL)
534 	{
535 		for (i = 0; i < signal_info.pstate->numWorkers; i++)
536 		{
537 			pid_t		pid = signal_info.pstate->parallelSlot[i].pid;
538 
539 			if (pid != 0)
540 				kill(pid, SIGTERM);
541 		}
542 	}
543 
544 	/*
545 	 * Send QueryCancel if we have a connection to send to.  Ignore errors,
546 	 * there's not much we can do about them anyway.
547 	 */
548 	if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
549 		(void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
550 
551 	/*
552 	 * Report we're quitting, using nothing more complicated than write(2).
553 	 * When in parallel operation, only the master process should do this.
554 	 */
555 	if (!signal_info.am_worker)
556 	{
557 		if (progname)
558 		{
559 			write_stderr(progname);
560 			write_stderr(": ");
561 		}
562 		write_stderr("terminated by user\n");
563 	}
564 
565 	/*
566 	 * And die, using _exit() not exit() because the latter will invoke atexit
567 	 * handlers that can fail if we interrupted related code.
568 	 */
569 	_exit(1);
570 }
571 
572 /*
573  * Enable cancel interrupt handler, if not already done.
574  */
575 static void
setup_cancel_handler(void)576 setup_cancel_handler(void)
577 {
578 	/*
579 	 * When forking, signal_info.handler_set will propagate into the new
580 	 * process, but that's fine because the signal handler state does too.
581 	 */
582 	if (!signal_info.handler_set)
583 	{
584 		signal_info.handler_set = true;
585 
586 		pqsignal(SIGINT, sigTermHandler);
587 		pqsignal(SIGTERM, sigTermHandler);
588 		pqsignal(SIGQUIT, sigTermHandler);
589 	}
590 }
591 
592 #else							/* WIN32 */
593 
594 /*
595  * Console interrupt handler --- runs in a newly-started thread.
596  *
597  * After stopping other threads and sending cancel requests on all open
598  * connections, we return FALSE which will allow the default ExitProcess()
599  * action to be taken.
600  */
601 static BOOL WINAPI
consoleHandler(DWORD dwCtrlType)602 consoleHandler(DWORD dwCtrlType)
603 {
604 	int			i;
605 	char		errbuf[1];
606 
607 	if (dwCtrlType == CTRL_C_EVENT ||
608 		dwCtrlType == CTRL_BREAK_EVENT)
609 	{
610 		/* Critical section prevents changing data we look at here */
611 		EnterCriticalSection(&signal_info_lock);
612 
613 		/*
614 		 * If in parallel mode, stop worker threads and send QueryCancel to
615 		 * their connected backends.  The main point of stopping the worker
616 		 * threads is to keep them from reporting the query cancels as errors,
617 		 * which would clutter the user's screen.  We needn't stop the master
618 		 * thread since it won't be doing much anyway.  Do this before
619 		 * canceling the main transaction, else we might get invalid-snapshot
620 		 * errors reported before we can stop the workers.  Ignore errors,
621 		 * there's not much we can do about them anyway.
622 		 */
623 		if (signal_info.pstate != NULL)
624 		{
625 			for (i = 0; i < signal_info.pstate->numWorkers; i++)
626 			{
627 				ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
628 				ArchiveHandle *AH = slot->args->AH;
629 				HANDLE		hThread = (HANDLE) slot->hThread;
630 
631 				/*
632 				 * Using TerminateThread here may leave some resources leaked,
633 				 * but it doesn't matter since we're about to end the whole
634 				 * process.
635 				 */
636 				if (hThread != INVALID_HANDLE_VALUE)
637 					TerminateThread(hThread, 0);
638 
639 				if (AH != NULL && AH->connCancel != NULL)
640 					(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
641 			}
642 		}
643 
644 		/*
645 		 * Send QueryCancel to master connection, if enabled.  Ignore errors,
646 		 * there's not much we can do about them anyway.
647 		 */
648 		if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
649 			(void) PQcancel(signal_info.myAH->connCancel,
650 							errbuf, sizeof(errbuf));
651 
652 		LeaveCriticalSection(&signal_info_lock);
653 
654 		/*
655 		 * Report we're quitting, using nothing more complicated than
656 		 * write(2).  (We might be able to get away with using write_msg()
657 		 * here, but since we terminated other threads uncleanly above, it
658 		 * seems better to assume as little as possible.)
659 		 */
660 		if (progname)
661 		{
662 			write_stderr(progname);
663 			write_stderr(": ");
664 		}
665 		write_stderr("terminated by user\n");
666 	}
667 
668 	/* Always return FALSE to allow signal handling to continue */
669 	return FALSE;
670 }
671 
672 /*
673  * Enable cancel interrupt handler, if not already done.
674  */
675 static void
setup_cancel_handler(void)676 setup_cancel_handler(void)
677 {
678 	if (!signal_info.handler_set)
679 	{
680 		signal_info.handler_set = true;
681 
682 		InitializeCriticalSection(&signal_info_lock);
683 
684 		SetConsoleCtrlHandler(consoleHandler, TRUE);
685 	}
686 }
687 
688 #endif   /* WIN32 */
689 
690 
691 /*
692  * set_archive_cancel_info
693  *
694  * Fill AH->connCancel with cancellation info for the specified database
695  * connection; or clear it if conn is NULL.
696  */
697 void
set_archive_cancel_info(ArchiveHandle * AH,PGconn * conn)698 set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
699 {
700 	PGcancel   *oldConnCancel;
701 
702 	/*
703 	 * Activate the interrupt handler if we didn't yet in this process.  On
704 	 * Windows, this also initializes signal_info_lock; therefore it's
705 	 * important that this happen at least once before we fork off any
706 	 * threads.
707 	 */
708 	setup_cancel_handler();
709 
710 	/*
711 	 * On Unix, we assume that storing a pointer value is atomic with respect
712 	 * to any possible signal interrupt.  On Windows, use a critical section.
713 	 */
714 
715 #ifdef WIN32
716 	EnterCriticalSection(&signal_info_lock);
717 #endif
718 
719 	/* Free the old one if we have one */
720 	oldConnCancel = AH->connCancel;
721 	/* be sure interrupt handler doesn't use pointer while freeing */
722 	AH->connCancel = NULL;
723 
724 	if (oldConnCancel != NULL)
725 		PQfreeCancel(oldConnCancel);
726 
727 	/* Set the new one if specified */
728 	if (conn)
729 		AH->connCancel = PQgetCancel(conn);
730 
731 	/*
732 	 * On Unix, there's only ever one active ArchiveHandle per process, so we
733 	 * can just set signal_info.myAH unconditionally.  On Windows, do that
734 	 * only in the main thread; worker threads have to make sure their
735 	 * ArchiveHandle appears in the pstate data, which is dealt with in
736 	 * RunWorker().
737 	 */
738 #ifndef WIN32
739 	signal_info.myAH = AH;
740 #else
741 	if (mainThreadId == GetCurrentThreadId())
742 		signal_info.myAH = AH;
743 #endif
744 
745 #ifdef WIN32
746 	LeaveCriticalSection(&signal_info_lock);
747 #endif
748 }
749 
750 /*
751  * set_cancel_pstate
752  *
753  * Set signal_info.pstate to point to the specified ParallelState, if any.
754  * We need this mainly to have an interlock against Windows signal thread.
755  */
756 static void
set_cancel_pstate(ParallelState * pstate)757 set_cancel_pstate(ParallelState *pstate)
758 {
759 #ifdef WIN32
760 	EnterCriticalSection(&signal_info_lock);
761 #endif
762 
763 	signal_info.pstate = pstate;
764 
765 #ifdef WIN32
766 	LeaveCriticalSection(&signal_info_lock);
767 #endif
768 }
769 
770 /*
771  * set_cancel_slot_archive
772  *
773  * Set ParallelSlot's AH field to point to the specified archive, if any.
774  * We need this mainly to have an interlock against Windows signal thread.
775  */
776 static void
set_cancel_slot_archive(ParallelSlot * slot,ArchiveHandle * AH)777 set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
778 {
779 #ifdef WIN32
780 	EnterCriticalSection(&signal_info_lock);
781 #endif
782 
783 	slot->args->AH = AH;
784 
785 #ifdef WIN32
786 	LeaveCriticalSection(&signal_info_lock);
787 #endif
788 }
789 
790 
791 /*
792  * This function is called by both Unix and Windows variants to set up
793  * and run a worker process.  Caller should exit the process (or thread)
794  * upon return.
795  */
796 static void
RunWorker(ArchiveHandle * AH,ParallelSlot * slot)797 RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
798 {
799 	int			pipefd[2];
800 
801 	/* fetch child ends of pipes */
802 	pipefd[PIPE_READ] = slot->pipeRevRead;
803 	pipefd[PIPE_WRITE] = slot->pipeRevWrite;
804 
805 	/*
806 	 * Clone the archive so that we have our own state to work with, and in
807 	 * particular our own database connection.
808 	 *
809 	 * We clone on Unix as well as Windows, even though technically we don't
810 	 * need to because fork() gives us a copy in our own address space
811 	 * already.  But CloneArchive resets the state information and also clones
812 	 * the database connection which both seem kinda helpful.
813 	 */
814 	AH = CloneArchive(AH);
815 
816 	/* Remember cloned archive where signal handler can find it */
817 	set_cancel_slot_archive(slot, AH);
818 
819 	/*
820 	 * Call the setup worker function that's defined in the ArchiveHandle.
821 	 */
822 	(AH->SetupWorkerPtr) ((Archive *) AH);
823 
824 	/*
825 	 * Execute commands until done.
826 	 */
827 	WaitForCommands(AH, pipefd);
828 
829 	/*
830 	 * Disconnect from database and clean up.
831 	 */
832 	set_cancel_slot_archive(slot, NULL);
833 	DisconnectDatabase(&(AH->public));
834 	DeCloneArchive(AH);
835 }
836 
/*
 * Entry point of a worker thread on Windows.
 *
 * wi is the malloc'd WorkerInfo prepared by ParallelBackupStart(); it is
 * freed here once its contents have been copied out.
 */
#ifdef WIN32
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
	ArchiveHandle *masterAH = wi->AH;
	ParallelSlot *mySlot = wi->slot;

	/* The fields are saved, so the WorkerInfo can go */
	free(wi);

	/* Do the actual work */
	RunWorker(masterAH, mySlot);

	/* Finish the thread */
	_endthreadex(0);
	return 0;
}
#endif   /* WIN32 */
858 
859 /*
860  * This function starts a parallel dump or restore by spawning off the worker
861  * processes.  For Windows, it creates a number of threads; on Unix the
862  * workers are created with fork().
863  */
864 ParallelState *
ParallelBackupStart(ArchiveHandle * AH)865 ParallelBackupStart(ArchiveHandle *AH)
866 {
867 	ParallelState *pstate;
868 	int			i;
869 	const size_t slotSize = AH->public.numWorkers * sizeof(ParallelSlot);
870 
871 	Assert(AH->public.numWorkers > 0);
872 
873 	pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
874 
875 	pstate->numWorkers = AH->public.numWorkers;
876 	pstate->parallelSlot = NULL;
877 
878 	if (AH->public.numWorkers == 1)
879 		return pstate;
880 
881 	/* Create status array, being sure to initialize all fields to 0 */
882 	pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
883 	memset((void *) pstate->parallelSlot, 0, slotSize);
884 
885 #ifdef WIN32
886 	/* Make fmtId() and fmtQualifiedId() use thread-local storage */
887 	getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
888 #endif
889 
890 	/*
891 	 * Set the pstate in shutdown_info, to tell the exit handler that it must
892 	 * clean up workers as well as the main database connection.  But we don't
893 	 * set this in signal_info yet, because we don't want child processes to
894 	 * inherit non-NULL signal_info.pstate.
895 	 */
896 	shutdown_info.pstate = pstate;
897 
898 	/*
899 	 * Temporarily disable query cancellation on the master connection.  This
900 	 * ensures that child processes won't inherit valid AH->connCancel
901 	 * settings and thus won't try to issue cancels against the master's
902 	 * connection.  No harm is done if we fail while it's disabled, because
903 	 * the master connection is idle at this point anyway.
904 	 */
905 	set_archive_cancel_info(AH, NULL);
906 
907 	/* Ensure stdio state is quiesced before forking */
908 	fflush(NULL);
909 
910 	/* Create desired number of workers */
911 	for (i = 0; i < pstate->numWorkers; i++)
912 	{
913 #ifdef WIN32
914 		WorkerInfo *wi;
915 		uintptr_t	handle;
916 #else
917 		pid_t		pid;
918 #endif
919 		ParallelSlot *slot = &(pstate->parallelSlot[i]);
920 		int			pipeMW[2],
921 					pipeWM[2];
922 
923 		slot->args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
924 		slot->args->AH = NULL;
925 		slot->args->te = NULL;
926 
927 		/* Create communication pipes for this worker */
928 		if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
929 			exit_horribly(modulename,
930 						  "could not create communication channels: %s\n",
931 						  strerror(errno));
932 
933 		/* master's ends of the pipes */
934 		slot->pipeRead = pipeWM[PIPE_READ];
935 		slot->pipeWrite = pipeMW[PIPE_WRITE];
936 		/* child's ends of the pipes */
937 		slot->pipeRevRead = pipeMW[PIPE_READ];
938 		slot->pipeRevWrite = pipeWM[PIPE_WRITE];
939 
940 #ifdef WIN32
941 		/* Create transient structure to pass args to worker function */
942 		wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
943 
944 		wi->AH = AH;
945 		wi->slot = slot;
946 
947 		handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
948 								wi, 0, &(slot->threadId));
949 		slot->hThread = handle;
950 		slot->workerStatus = WRKR_IDLE;
951 #else							/* !WIN32 */
952 		pid = fork();
953 		if (pid == 0)
954 		{
955 			/* we are the worker */
956 			int			j;
957 
958 			/* this is needed for GetMyPSlot() */
959 			slot->pid = getpid();
960 
961 			/* instruct signal handler that we're in a worker now */
962 			signal_info.am_worker = true;
963 
964 			/* close read end of Worker -> Master */
965 			closesocket(pipeWM[PIPE_READ]);
966 			/* close write end of Master -> Worker */
967 			closesocket(pipeMW[PIPE_WRITE]);
968 
969 			/*
970 			 * Close all inherited fds for communication of the master with
971 			 * previously-forked workers.
972 			 */
973 			for (j = 0; j < i; j++)
974 			{
975 				closesocket(pstate->parallelSlot[j].pipeRead);
976 				closesocket(pstate->parallelSlot[j].pipeWrite);
977 			}
978 
979 			/* Run the worker ... */
980 			RunWorker(AH, slot);
981 
982 			/* We can just exit(0) when done */
983 			exit(0);
984 		}
985 		else if (pid < 0)
986 		{
987 			/* fork failed */
988 			exit_horribly(modulename,
989 						  "could not create worker process: %s\n",
990 						  strerror(errno));
991 		}
992 
993 		/* In Master after successful fork */
994 		slot->pid = pid;
995 		slot->workerStatus = WRKR_IDLE;
996 
997 		/* close read end of Master -> Worker */
998 		closesocket(pipeMW[PIPE_READ]);
999 		/* close write end of Worker -> Master */
1000 		closesocket(pipeWM[PIPE_WRITE]);
1001 #endif   /* WIN32 */
1002 	}
1003 
1004 	/*
1005 	 * Having forked off the workers, disable SIGPIPE so that master isn't
1006 	 * killed if it tries to send a command to a dead worker.  We don't want
1007 	 * the workers to inherit this setting, though.
1008 	 */
1009 #ifndef WIN32
1010 	pqsignal(SIGPIPE, SIG_IGN);
1011 #endif
1012 
1013 	/*
1014 	 * Re-establish query cancellation on the master connection.
1015 	 */
1016 	set_archive_cancel_info(AH, AH->connection);
1017 
1018 	/*
1019 	 * Tell the cancel signal handler to forward signals to worker processes,
1020 	 * too.  (As with query cancel, we did not need this earlier because the
1021 	 * workers have not yet been given anything to do; if we die before this
1022 	 * point, any already-started workers will see EOF and quit promptly.)
1023 	 */
1024 	set_cancel_pstate(pstate);
1025 
1026 	return pstate;
1027 }
1028 
1029 /*
1030  * Close down a parallel dump or restore.
1031  */
1032 void
ParallelBackupEnd(ArchiveHandle * AH,ParallelState * pstate)1033 ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
1034 {
1035 	int			i;
1036 
1037 	/* No work if non-parallel */
1038 	if (pstate->numWorkers == 1)
1039 		return;
1040 
1041 	/* There should not be any unfinished jobs */
1042 	Assert(IsEveryWorkerIdle(pstate));
1043 
1044 	/* Close the sockets so that the workers know they can exit */
1045 	for (i = 0; i < pstate->numWorkers; i++)
1046 	{
1047 		closesocket(pstate->parallelSlot[i].pipeRead);
1048 		closesocket(pstate->parallelSlot[i].pipeWrite);
1049 	}
1050 
1051 	/* Wait for them to exit */
1052 	WaitForTerminatingWorkers(pstate);
1053 
1054 	/*
1055 	 * Unlink pstate from shutdown_info, so the exit handler will not try to
1056 	 * use it; and likewise unlink from signal_info.
1057 	 */
1058 	shutdown_info.pstate = NULL;
1059 	set_cancel_pstate(NULL);
1060 
1061 	/* Release state (mere neatnik-ism, since we're about to terminate) */
1062 	free(pstate->parallelSlot);
1063 	free(pstate);
1064 }
1065 
1066 /*
1067  * Dispatch a job to some free worker (caller must ensure there is one!)
1068  *
1069  * te is the TocEntry to be processed, act is the action to be taken on it.
1070  */
1071 void
DispatchJobForTocEntry(ArchiveHandle * AH,ParallelState * pstate,TocEntry * te,T_Action act)1072 DispatchJobForTocEntry(ArchiveHandle *AH, ParallelState *pstate, TocEntry *te,
1073 					   T_Action act)
1074 {
1075 	int			worker;
1076 	char	   *arg;
1077 
1078 	/* our caller makes sure that at least one worker is idle */
1079 	worker = GetIdleWorker(pstate);
1080 	Assert(worker != NO_SLOT);
1081 
1082 	/* Construct and send command string */
1083 	arg = (AH->MasterStartParallelItemPtr) (AH, te, act);
1084 
1085 	sendMessageToWorker(pstate, worker, arg);
1086 
1087 	/* XXX aren't we leaking string here? (no, because it's static. Ick.) */
1088 
1089 	/* Remember worker is busy, and which TocEntry it's working on */
1090 	pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1091 	pstate->parallelSlot[worker].args->te = te;
1092 }
1093 
1094 /*
1095  * Find an idle worker and return its slot number.
1096  * Return NO_SLOT if none are idle.
1097  */
1098 int
GetIdleWorker(ParallelState * pstate)1099 GetIdleWorker(ParallelState *pstate)
1100 {
1101 	int			i;
1102 
1103 	for (i = 0; i < pstate->numWorkers; i++)
1104 	{
1105 		if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1106 			return i;
1107 	}
1108 	return NO_SLOT;
1109 }
1110 
1111 /*
1112  * Return true iff no worker is running.
1113  */
1114 static bool
HasEveryWorkerTerminated(ParallelState * pstate)1115 HasEveryWorkerTerminated(ParallelState *pstate)
1116 {
1117 	int			i;
1118 
1119 	for (i = 0; i < pstate->numWorkers; i++)
1120 	{
1121 		if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1122 			return false;
1123 	}
1124 	return true;
1125 }
1126 
1127 /*
1128  * Return true iff every worker is in the WRKR_IDLE state.
1129  */
1130 bool
IsEveryWorkerIdle(ParallelState * pstate)1131 IsEveryWorkerIdle(ParallelState *pstate)
1132 {
1133 	int			i;
1134 
1135 	for (i = 0; i < pstate->numWorkers; i++)
1136 	{
1137 		if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1138 			return false;
1139 	}
1140 	return true;
1141 }
1142 
1143 /*
1144  * Acquire lock on a table to be dumped by a worker process.
1145  *
1146  * The master process is already holding an ACCESS SHARE lock.  Ordinarily
1147  * it's no problem for a worker to get one too, but if anything else besides
1148  * pg_dump is running, there's a possible deadlock:
1149  *
1150  * 1) Master dumps the schema and locks all tables in ACCESS SHARE mode.
1151  * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1152  *	  because the master holds a conflicting ACCESS SHARE lock).
1153  * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1154  *	  The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1155  * 4) Now we have a deadlock, since the master is effectively waiting for
1156  *	  the worker.  The server cannot detect that, however.
1157  *
1158  * To prevent an infinite wait, prior to touching a table in a worker, request
1159  * a lock in ACCESS SHARE mode but with NOWAIT.  If we don't get the lock,
1160  * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1161  * so we have a deadlock.  We must fail the backup in that case.
1162  */
1163 static void
lockTableForWorker(ArchiveHandle * AH,TocEntry * te)1164 lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
1165 {
1166 	const char *qualId;
1167 	PQExpBuffer query;
1168 	PGresult   *res;
1169 
1170 	/* Nothing to do for BLOBS */
1171 	if (strcmp(te->desc, "BLOBS") == 0)
1172 		return;
1173 
1174 	query = createPQExpBuffer();
1175 
1176 	qualId = fmtQualifiedId(AH->public.remoteVersion, te->namespace, te->tag);
1177 
1178 	appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1179 					  qualId);
1180 
1181 	res = PQexec(AH->connection, query->data);
1182 
1183 	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1184 		exit_horribly(modulename,
1185 					  "could not obtain lock on relation \"%s\"\n"
1186 		"This usually means that someone requested an ACCESS EXCLUSIVE lock "
1187 			  "on the table after the pg_dump parent process had gotten the "
1188 					  "initial ACCESS SHARE lock on the table.\n", qualId);
1189 
1190 	PQclear(res);
1191 	destroyPQExpBuffer(query);
1192 }
1193 
1194 /*
1195  * WaitForCommands: main routine for a worker process.
1196  *
1197  * Read and execute commands from the master until we see EOF on the pipe.
1198  */
1199 static void
WaitForCommands(ArchiveHandle * AH,int pipefd[2])1200 WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1201 {
1202 	char	   *command;
1203 	DumpId		dumpId;
1204 	int			nBytes;
1205 	char	   *str;
1206 	TocEntry   *te;
1207 
1208 	for (;;)
1209 	{
1210 		if (!(command = getMessageFromMaster(pipefd)))
1211 		{
1212 			/* EOF, so done */
1213 			return;
1214 		}
1215 
1216 		if (messageStartsWith(command, "DUMP "))
1217 		{
1218 			/* Decode the command */
1219 			sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes);
1220 			Assert(nBytes == strlen(command) - strlen("DUMP "));
1221 			te = getTocEntryByDumpId(AH, dumpId);
1222 			Assert(te != NULL);
1223 
1224 			/* Acquire lock on this table within the worker's session */
1225 			lockTableForWorker(AH, te);
1226 
1227 			/* Perform the dump command */
1228 			str = (AH->WorkerJobDumpPtr) (AH, te);
1229 
1230 			/* Return status to master */
1231 			sendMessageToMaster(pipefd, str);
1232 
1233 			/* we are responsible for freeing the status string */
1234 			free(str);
1235 		}
1236 		else if (messageStartsWith(command, "RESTORE "))
1237 		{
1238 			/* Decode the command */
1239 			sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes);
1240 			Assert(nBytes == strlen(command) - strlen("RESTORE "));
1241 			te = getTocEntryByDumpId(AH, dumpId);
1242 			Assert(te != NULL);
1243 
1244 			/* Perform the restore command */
1245 			str = (AH->WorkerJobRestorePtr) (AH, te);
1246 
1247 			/* Return status to master */
1248 			sendMessageToMaster(pipefd, str);
1249 
1250 			/* we are responsible for freeing the status string */
1251 			free(str);
1252 		}
1253 		else
1254 			exit_horribly(modulename,
1255 					   "unrecognized command received from master: \"%s\"\n",
1256 						  command);
1257 
1258 		/* command was pg_malloc'd and we are responsible for free()ing it. */
1259 		free(command);
1260 	}
1261 }
1262 
1263 /*
1264  * Check for status messages from workers.
1265  *
1266  * If do_wait is true, wait to get a status message; otherwise, just return
1267  * immediately if there is none available.
1268  *
1269  * When we get a status message, we let MasterEndParallelItemPtr process it,
1270  * then save the resulting status code and switch the worker's state to
1271  * WRKR_FINISHED.  Later, caller must call ReapWorkerStatus() to verify
1272  * that the status was "OK" and push the worker back to IDLE state.
1273  *
1274  * XXX Rube Goldberg would be proud of this API, but no one else should be.
1275  *
1276  * XXX is it worth checking for more than one status message per call?
1277  * It seems somewhat unlikely that multiple workers would finish at exactly
1278  * the same time.
1279  */
1280 void
ListenToWorkers(ArchiveHandle * AH,ParallelState * pstate,bool do_wait)1281 ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1282 {
1283 	int			worker;
1284 	char	   *msg;
1285 
1286 	/* Try to collect a status message */
1287 	msg = getMessageFromWorker(pstate, do_wait, &worker);
1288 
1289 	if (!msg)
1290 	{
1291 		/* If do_wait is true, we must have detected EOF on some socket */
1292 		if (do_wait)
1293 			exit_horribly(modulename, "a worker process died unexpectedly\n");
1294 		return;
1295 	}
1296 
1297 	/* Process it and update our idea of the worker's status */
1298 	if (messageStartsWith(msg, "OK "))
1299 	{
1300 		TocEntry   *te = pstate->parallelSlot[worker].args->te;
1301 		char	   *statusString;
1302 
1303 		if (messageStartsWith(msg, "OK RESTORE "))
1304 		{
1305 			statusString = msg + strlen("OK RESTORE ");
1306 			pstate->parallelSlot[worker].status =
1307 				(AH->MasterEndParallelItemPtr)
1308 				(AH, te, statusString, ACT_RESTORE);
1309 		}
1310 		else if (messageStartsWith(msg, "OK DUMP "))
1311 		{
1312 			statusString = msg + strlen("OK DUMP ");
1313 			pstate->parallelSlot[worker].status =
1314 				(AH->MasterEndParallelItemPtr)
1315 				(AH, te, statusString, ACT_DUMP);
1316 		}
1317 		else
1318 			exit_horribly(modulename,
1319 						  "invalid message received from worker: \"%s\"\n",
1320 						  msg);
1321 		pstate->parallelSlot[worker].workerStatus = WRKR_FINISHED;
1322 	}
1323 	else
1324 		exit_horribly(modulename,
1325 					  "invalid message received from worker: \"%s\"\n",
1326 					  msg);
1327 
1328 	/* Free the string returned from getMessageFromWorker */
1329 	free(msg);
1330 }
1331 
1332 /*
1333  * Check to see if any worker is in WRKR_FINISHED state.  If so,
1334  * return its command status code into *status, reset it to IDLE state,
1335  * and return its slot number.  Otherwise return NO_SLOT.
1336  *
1337  * This function is executed in the master process.
1338  */
1339 int
ReapWorkerStatus(ParallelState * pstate,int * status)1340 ReapWorkerStatus(ParallelState *pstate, int *status)
1341 {
1342 	int			i;
1343 
1344 	for (i = 0; i < pstate->numWorkers; i++)
1345 	{
1346 		if (pstate->parallelSlot[i].workerStatus == WRKR_FINISHED)
1347 		{
1348 			*status = pstate->parallelSlot[i].status;
1349 			pstate->parallelSlot[i].status = 0;
1350 			pstate->parallelSlot[i].workerStatus = WRKR_IDLE;
1351 			return i;
1352 		}
1353 	}
1354 	return NO_SLOT;
1355 }
1356 
1357 /*
1358  * Wait, if necessary, until we have at least one idle worker.
1359  * Reap worker status as necessary to move FINISHED workers to IDLE state.
1360  *
1361  * We assume that no extra processing is required when reaping a finished
1362  * command, except for checking that the status was OK (zero).
1363  * Caution: that assumption means that this function can only be used in
1364  * parallel dump, not parallel restore, because the latter has a more
1365  * complex set of rules about handling status.
1366  *
1367  * This function is executed in the master process.
1368  */
1369 void
EnsureIdleWorker(ArchiveHandle * AH,ParallelState * pstate)1370 EnsureIdleWorker(ArchiveHandle *AH, ParallelState *pstate)
1371 {
1372 	int			ret_worker;
1373 	int			work_status;
1374 
1375 	for (;;)
1376 	{
1377 		int			nTerm = 0;
1378 
1379 		while ((ret_worker = ReapWorkerStatus(pstate, &work_status)) != NO_SLOT)
1380 		{
1381 			if (work_status != 0)
1382 				exit_horribly(modulename, "error processing a parallel work item\n");
1383 
1384 			nTerm++;
1385 		}
1386 
1387 		/*
1388 		 * We need to make sure that we have an idle worker before dispatching
1389 		 * the next item. If nTerm > 0 we already have that (quick check).
1390 		 */
1391 		if (nTerm > 0)
1392 			return;
1393 
1394 		/* explicit check for an idle worker */
1395 		if (GetIdleWorker(pstate) != NO_SLOT)
1396 			return;
1397 
1398 		/*
1399 		 * If we have no idle worker, read the result of one or more workers
1400 		 * and loop the loop to call ReapWorkerStatus() on them
1401 		 */
1402 		ListenToWorkers(AH, pstate, true);
1403 	}
1404 }
1405 
1406 /*
1407  * Wait for all workers to be idle.
1408  * Reap worker status as necessary to move FINISHED workers to IDLE state.
1409  *
1410  * We assume that no extra processing is required when reaping a finished
1411  * command, except for checking that the status was OK (zero).
1412  * Caution: that assumption means that this function can only be used in
1413  * parallel dump, not parallel restore, because the latter has a more
1414  * complex set of rules about handling status.
1415  *
1416  * This function is executed in the master process.
1417  */
1418 void
EnsureWorkersFinished(ArchiveHandle * AH,ParallelState * pstate)1419 EnsureWorkersFinished(ArchiveHandle *AH, ParallelState *pstate)
1420 {
1421 	int			work_status;
1422 
1423 	if (!pstate || pstate->numWorkers == 1)
1424 		return;
1425 
1426 	/* Waiting for the remaining worker processes to finish */
1427 	while (!IsEveryWorkerIdle(pstate))
1428 	{
1429 		if (ReapWorkerStatus(pstate, &work_status) == NO_SLOT)
1430 			ListenToWorkers(AH, pstate, true);
1431 		else if (work_status != 0)
1432 			exit_horribly(modulename,
1433 						  "error processing a parallel work item\n");
1434 	}
1435 }
1436 
1437 /*
1438  * Read one command message from the master, blocking if necessary
1439  * until one is available, and return it as a malloc'd string.
1440  * On EOF, return NULL.
1441  *
1442  * This function is executed in worker processes.
1443  */
1444 static char *
getMessageFromMaster(int pipefd[2])1445 getMessageFromMaster(int pipefd[2])
1446 {
1447 	return readMessageFromPipe(pipefd[PIPE_READ]);
1448 }
1449 
1450 /*
1451  * Send a status message to the master.
1452  *
1453  * This function is executed in worker processes.
1454  */
1455 static void
sendMessageToMaster(int pipefd[2],const char * str)1456 sendMessageToMaster(int pipefd[2], const char *str)
1457 {
1458 	int			len = strlen(str) + 1;
1459 
1460 	if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1461 		exit_horribly(modulename,
1462 					  "could not write to the communication channel: %s\n",
1463 					  strerror(errno));
1464 }
1465 
/*
 * Block in select() until some descriptor in "workerset" is readable,
 * retrying whenever the call is interrupted.
 *
 * On return, *workerset holds the set as updated by the final select()
 * call.  Returns -1 on error, else the number of readable descriptors.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
	const fd_set wanted = *workerset;
	int			nready;
	bool		interrupted;

	do
	{
		/* select() overwrites the set, so start from a fresh copy each time */
		*workerset = wanted;
		nready = select(maxFd + 1, workerset, NULL, NULL, NULL);

#ifndef WIN32
		interrupted = (nready < 0 && errno == EINTR);
#else
		interrupted = (nready == SOCKET_ERROR && WSAGetLastError() == WSAEINTR);
#endif
	} while (interrupted);

	return nready;
}
1493 
1494 
1495 /*
1496  * Check for messages from worker processes.
1497  *
1498  * If a message is available, return it as a malloc'd string, and put the
1499  * index of the sending worker in *worker.
1500  *
1501  * If nothing is available, wait if "do_wait" is true, else return NULL.
1502  *
1503  * If we detect EOF on any socket, we'll return NULL.  It's not great that
1504  * that's hard to distinguish from the no-data-available case, but for now
1505  * our one caller is okay with that.
1506  *
1507  * This function is executed in the master process.
1508  */
1509 static char *
getMessageFromWorker(ParallelState * pstate,bool do_wait,int * worker)1510 getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1511 {
1512 	int			i;
1513 	fd_set		workerset;
1514 	int			maxFd = -1;
1515 	struct timeval nowait = {0, 0};
1516 
1517 	/* construct bitmap of socket descriptors for select() */
1518 	FD_ZERO(&workerset);
1519 	for (i = 0; i < pstate->numWorkers; i++)
1520 	{
1521 		if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1522 			continue;
1523 		FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1524 		if (pstate->parallelSlot[i].pipeRead > maxFd)
1525 			maxFd = pstate->parallelSlot[i].pipeRead;
1526 	}
1527 
1528 	if (do_wait)
1529 	{
1530 		i = select_loop(maxFd, &workerset);
1531 		Assert(i != 0);
1532 	}
1533 	else
1534 	{
1535 		if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1536 			return NULL;
1537 	}
1538 
1539 	if (i < 0)
1540 		exit_horribly(modulename, "select() failed: %s\n", strerror(errno));
1541 
1542 	for (i = 0; i < pstate->numWorkers; i++)
1543 	{
1544 		char	   *msg;
1545 
1546 		if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1547 			continue;
1548 		if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1549 			continue;
1550 
1551 		/*
1552 		 * Read the message if any.  If the socket is ready because of EOF,
1553 		 * we'll return NULL instead (and the socket will stay ready, so the
1554 		 * condition will persist).
1555 		 *
1556 		 * Note: because this is a blocking read, we'll wait if only part of
1557 		 * the message is available.  Waiting a long time would be bad, but
1558 		 * since worker status messages are short and are always sent in one
1559 		 * operation, it shouldn't be a problem in practice.
1560 		 */
1561 		msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1562 		*worker = i;
1563 		return msg;
1564 	}
1565 	Assert(false);
1566 	return NULL;
1567 }
1568 
1569 /*
1570  * Send a command message to the specified worker process.
1571  *
1572  * This function is executed in the master process.
1573  */
1574 static void
sendMessageToWorker(ParallelState * pstate,int worker,const char * str)1575 sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1576 {
1577 	int			len = strlen(str) + 1;
1578 
1579 	if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1580 	{
1581 		exit_horribly(modulename,
1582 					  "could not write to the communication channel: %s\n",
1583 					  strerror(errno));
1584 	}
1585 }
1586 
1587 /*
1588  * Read one message from the specified pipe (fd), blocking if necessary
1589  * until one is available, and return it as a malloc'd string.
1590  * On EOF, return NULL.
1591  *
1592  * A "message" on the channel is just a null-terminated string.
1593  */
1594 static char *
readMessageFromPipe(int fd)1595 readMessageFromPipe(int fd)
1596 {
1597 	char	   *msg;
1598 	int			msgsize,
1599 				bufsize;
1600 	int			ret;
1601 
1602 	/*
1603 	 * In theory, if we let piperead() read multiple bytes, it might give us
1604 	 * back fragments of multiple messages.  (That can't actually occur, since
1605 	 * neither master nor workers send more than one message without waiting
1606 	 * for a reply, but we don't wish to assume that here.)  For simplicity,
1607 	 * read a byte at a time until we get the terminating '\0'.  This method
1608 	 * is a bit inefficient, but since this is only used for relatively short
1609 	 * command and status strings, it shouldn't matter.
1610 	 */
1611 	bufsize = 64;				/* could be any number */
1612 	msg = (char *) pg_malloc(bufsize);
1613 	msgsize = 0;
1614 	for (;;)
1615 	{
1616 		Assert(msgsize < bufsize);
1617 		ret = piperead(fd, msg + msgsize, 1);
1618 		if (ret <= 0)
1619 			break;				/* error or connection closure */
1620 
1621 		Assert(ret == 1);
1622 
1623 		if (msg[msgsize] == '\0')
1624 			return msg;			/* collected whole message */
1625 
1626 		msgsize++;
1627 		if (msgsize == bufsize) /* enlarge buffer if needed */
1628 		{
1629 			bufsize += 16;		/* could be any number */
1630 			msg = (char *) pg_realloc(msg, bufsize);
1631 		}
1632 	}
1633 
1634 	/* Other end has closed the connection */
1635 	pg_free(msg);
1636 	return NULL;
1637 }
1638 
1639 #ifdef WIN32
1640 
1641 /*
1642  * This is a replacement version of pipe(2) for Windows which allows the pipe
1643  * handles to be used in select().
1644  *
1645  * Reads and writes on the pipe must go through piperead()/pipewrite().
1646  *
1647  * For consistency with Unix we declare the returned handles as "int".
1648  * This is okay even on WIN64 because system handles are not more than
1649  * 32 bits wide, but we do have to do some casting.
1650  */
1651 static int
pgpipe(int handles[2])1652 pgpipe(int handles[2])
1653 {
1654 	pgsocket	s,
1655 				tmp_sock;
1656 	struct sockaddr_in serv_addr;
1657 	int			len = sizeof(serv_addr);
1658 
1659 	/* We have to use the Unix socket invalid file descriptor value here. */
1660 	handles[0] = handles[1] = -1;
1661 
1662 	/*
1663 	 * setup listen socket
1664 	 */
1665 	if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1666 	{
1667 		write_msg(modulename, "pgpipe: could not create socket: error code %d\n",
1668 				  WSAGetLastError());
1669 		return -1;
1670 	}
1671 
1672 	memset((void *) &serv_addr, 0, sizeof(serv_addr));
1673 	serv_addr.sin_family = AF_INET;
1674 	serv_addr.sin_port = htons(0);
1675 	serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
1676 	if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1677 	{
1678 		write_msg(modulename, "pgpipe: could not bind: error code %d\n",
1679 				  WSAGetLastError());
1680 		closesocket(s);
1681 		return -1;
1682 	}
1683 	if (listen(s, 1) == SOCKET_ERROR)
1684 	{
1685 		write_msg(modulename, "pgpipe: could not listen: error code %d\n",
1686 				  WSAGetLastError());
1687 		closesocket(s);
1688 		return -1;
1689 	}
1690 	if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
1691 	{
1692 		write_msg(modulename, "pgpipe: getsockname() failed: error code %d\n",
1693 				  WSAGetLastError());
1694 		closesocket(s);
1695 		return -1;
1696 	}
1697 
1698 	/*
1699 	 * setup pipe handles
1700 	 */
1701 	if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1702 	{
1703 		write_msg(modulename, "pgpipe: could not create second socket: error code %d\n",
1704 				  WSAGetLastError());
1705 		closesocket(s);
1706 		return -1;
1707 	}
1708 	handles[1] = (int) tmp_sock;
1709 
1710 	if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1711 	{
1712 		write_msg(modulename, "pgpipe: could not connect socket: error code %d\n",
1713 				  WSAGetLastError());
1714 		closesocket(handles[1]);
1715 		handles[1] = -1;
1716 		closesocket(s);
1717 		return -1;
1718 	}
1719 	if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
1720 	{
1721 		write_msg(modulename, "pgpipe: could not accept connection: error code %d\n",
1722 				  WSAGetLastError());
1723 		closesocket(handles[1]);
1724 		handles[1] = -1;
1725 		closesocket(s);
1726 		return -1;
1727 	}
1728 	handles[0] = (int) tmp_sock;
1729 
1730 	closesocket(s);
1731 	return 0;
1732 }
1733 
1734 /*
1735  * Windows implementation of reading from a pipe.
1736  */
1737 static int
piperead(int s,char * buf,int len)1738 piperead(int s, char *buf, int len)
1739 {
1740 	int			ret = recv(s, buf, len, 0);
1741 
1742 	if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
1743 	{
1744 		/* EOF on the pipe! */
1745 		ret = 0;
1746 	}
1747 	return ret;
1748 }
1749 
1750 #endif   /* WIN32 */
1751