1 /*-------------------------------------------------------------------------
2  *
3  * parallel.c
4  *
5  *	Parallel support for pg_dump and pg_restore
6  *
7  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *		src/bin/pg_dump/parallel.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 /*
17  * Parallel operation works like this:
18  *
19  * The original, master process calls ParallelBackupStart(), which forks off
20  * the desired number of worker processes, which each enter WaitForCommands().
21  *
22  * The master process dispatches an individual work item to one of the worker
23  * processes in DispatchJobForTocEntry().  We send a command string such as
24  * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
25  * The worker process receives and decodes the command and passes it to the
26  * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
27  * which are routines of the current archive format.  That routine performs
28  * the required action (dump or restore) and returns an integer status code.
29  * This is passed back to the master where we pass it to the
30  * ParallelCompletionPtr callback function that was passed to
31  * DispatchJobForTocEntry().  The callback function does state updating
32  * for the master control logic in pg_backup_archiver.c.
33  *
34  * In principle additional archive-format-specific information might be needed
35  * in commands or worker status responses, but so far that hasn't proved
36  * necessary, since workers have full copies of the ArchiveHandle/TocEntry
37  * data structures.  Remember that we have forked off the workers only after
38  * we have read in the catalog.  That's why our worker processes can also
39  * access the catalog information.  (In the Windows case, the workers are
40  * threads in the same process.  To avoid problems, they work with cloned
41  * copies of the Archive data structure; see RunWorker().)
42  *
43  * In the master process, the workerStatus field for each worker has one of
44  * the following values:
45  *		WRKR_NOT_STARTED: we've not yet forked this worker
46  *		WRKR_IDLE: it's waiting for a command
47  *		WRKR_WORKING: it's working on a command
48  *		WRKR_TERMINATED: process ended
49  * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
50  * state, and must be NULL in other states.
51  */
52 
53 #include "postgres_fe.h"
54 
55 #ifndef WIN32
56 #include <sys/wait.h>
57 #include <signal.h>
58 #include <unistd.h>
59 #include <fcntl.h>
60 #endif
61 #ifdef HAVE_SYS_SELECT_H
62 #include <sys/select.h>
63 #endif
64 
65 #include "parallel.h"
66 #include "pg_backup_utils.h"
67 #include "fe_utils/string_utils.h"
68 
/* Mnemonic macros for indexing the fd array returned by pipe(2) */
#define PIPE_READ							0
#define PIPE_WRITE							1

#define NO_SLOT (-1)			/* Failure result for GetIdleWorker() */

/*
 * Worker process statuses.  WRKR_NOT_STARTED is 0, so a slot allocated with
 * pg_malloc0() (as ParallelBackupStart() does) starts out in that state.
 */
typedef enum
{
	WRKR_NOT_STARTED = 0,
	WRKR_IDLE,
	WRKR_WORKING,
	WRKR_TERMINATED
} T_WorkerStatus;

/*
 * True if the worker has been started and has not yet been seen to exit,
 * i.e. it is either waiting for a command or executing one.
 */
#define WORKER_IS_RUNNING(workerStatus) \
	((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)

/*
 * Private per-parallel-worker state (typedef for this is in parallel.h).
 *
 * Much of this is valid only in the master process (or, on Windows, should
 * be touched only by the master thread).  But the AH field should be touched
 * only by workers.  The pipe descriptors are valid everywhere.
 */
struct ParallelSlot
{
	T_WorkerStatus workerStatus;	/* see enum above */

	/* These fields are valid if workerStatus == WRKR_WORKING: */
	ParallelCompletionPtr callback; /* function to call on completion */
	void	   *callback_data;	/* passthrough data for it */

	ArchiveHandle *AH;			/* Archive data worker is using */

	int			pipeRead;		/* master's end of the pipes: reads worker replies */
	int			pipeWrite;		/* master's end: writes commands to the worker */
	int			pipeRevRead;	/* child's end of the pipes: reads commands */
	int			pipeRevWrite;	/* child's end: writes replies to the master */

	/* Child process/thread identity info: */
#ifdef WIN32
	uintptr_t	hThread;		/* handle returned by _beginthreadex() */
	unsigned int threadId;		/* thread ID filled in by _beginthreadex() */
#else
	pid_t		pid;			/* worker PID; 0 if not started or already reaped */
#endif
};
117 
#ifdef WIN32

/*
 * Structure to hold info passed by _beginthreadex() to the function it calls
 * via its single allowed argument.  The master allocates it; the worker
 * thread frees it in init_spawned_worker_win32().
 */
typedef struct
{
	ArchiveHandle *AH;			/* master database connection */
	ParallelSlot *slot;			/* this worker's parallel slot */
} WorkerInfo;

/*
 * Windows implementation of pipe access.  Here the "pipes" are sockets:
 * writes go through send() and the ends are closed with closesocket().
 */
static int	pgpipe(int handles[2]);
static int	piperead(int s, char *buf, int len);
#define pipewrite(a,b,c)	send(a,b,c,0)

#else							/* !WIN32 */

/* Non-Windows implementation of pipe access: plain pipe(2)/read(2)/write(2) */
#define pgpipe(a)			pipe(a)
#define piperead(a,b,c)		read(a,b,c)
#define pipewrite(a,b,c)	write(a,b,c)

#endif							/* WIN32 */
143 
/*
 * State info for archive_close_connection() shutdown callback.
 */
typedef struct ShutdownInformation
{
	ParallelState *pstate;		/* set by ParallelBackupStart(); NULL when not parallel */
	Archive    *AHX;			/* archive registered by on_exit_close_archive() */
} ShutdownInformation;

static ShutdownInformation shutdown_info;

/*
 * State info for signal handling.
 * We assume signal_info initializes to zeroes.
 *
 * On Unix, myAH is the master DB connection in the master process, and the
 * worker's own connection in worker processes.  On Windows, we have only one
 * instance of signal_info, so myAH is the master connection and the worker
 * connections must be dug out of pstate->parallelSlot[].
 */
typedef struct DumpSignalInformation
{
	ArchiveHandle *myAH;		/* database connection to issue cancel for */
	ParallelState *pstate;		/* parallel state, if any */
	bool		handler_set;	/* signal handler set up in this process? */
#ifndef WIN32
	bool		am_worker;		/* am I a worker process? */
#endif
} DumpSignalInformation;

/*
 * Read asynchronously by sigTermHandler() (Unix) and consoleHandler()
 * (Windows); the volatile qualifier reflects that.
 */
static volatile DumpSignalInformation signal_info;

#ifdef WIN32
/*
 * Guards signal_info and the slots' AH/connCancel fields against concurrent
 * access from the console handler thread.  Initialized by
 * setup_cancel_handler().
 */
static CRITICAL_SECTION signal_info_lock;
#endif

/*
 * Write a simple string to stderr --- must be safe in a signal handler.
 * We ignore the write() result since there's not much we could do about it.
 * Certain compilers make that harder than it ought to be.
 */
#define write_stderr(str) \
	do { \
		const char *str_ = (str); \
		int		rc_; \
		rc_ = write(fileno(stderr), str_, strlen(str_)); \
		(void) rc_; \
	} while (0)


#ifdef WIN32
/* file-scope variables */
static DWORD tls_index;			/* TLS slot used by getThreadLocalPQExpBuffer() */

/* globally visible variables (needed by exit_nicely) */
bool		parallel_init_done = false;	/* set by init_parallel_dump_utils() */
DWORD		mainThreadId;		/* ID of the thread that called init_parallel_dump_utils() */
#endif							/* WIN32 */

/* Module name passed to exit_horribly() for errors raised in this file */
static const char *modulename = gettext_noop("parallel archiver");
204 
/* Local function prototypes */
static ParallelSlot *GetMyPSlot(ParallelState *pstate);
static void archive_close_connection(int code, void *arg);
static void ShutdownWorkersHard(ParallelState *pstate);
static void WaitForTerminatingWorkers(ParallelState *pstate);
static void setup_cancel_handler(void);
static void set_cancel_pstate(ParallelState *pstate);
static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
static int	GetIdleWorker(ParallelState *pstate);
static bool HasEveryWorkerTerminated(ParallelState *pstate);
static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
				bool do_wait);
static char *getMessageFromMaster(int pipefd[2]);
static void sendMessageToMaster(int pipefd[2], const char *str);
static int	select_loop(int maxFd, fd_set *workerset);
static char *getMessageFromWorker(ParallelState *pstate,
					 bool do_wait, int *worker);
static void sendMessageToWorker(ParallelState *pstate,
					int worker, const char *str);
static char *readMessageFromPipe(int fd);

/*
 * Helpers for parsing protocol messages: messageStartsWith() tests for a
 * prefix match, messageEquals() for an exact match.
 */
#define messageStartsWith(msg, prefix) \
	(strncmp(msg, prefix, strlen(prefix)) == 0)
#define messageEquals(msg, pattern) \
	(strcmp(msg, pattern) == 0)
233 
234 
/*
 * Initialize parallel dump support.  Call this early in process startup; at
 * present it is called whether or not any parallel activity is planned.
 *
 * Only Windows has work to do here: allocate the TLS slot, record the main
 * thread's ID, and start Winsock.  Calls after the first are no-ops.
 */
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
	WSADATA		wsaData;
	int			rc;

	if (parallel_init_done)
		return;

	/* Get ready for threaded operation */
	tls_index = TlsAlloc();
	mainThreadId = GetCurrentThreadId();

	/* Bring up the socket library */
	rc = WSAStartup(MAKEWORD(2, 2), &wsaData);
	if (rc != 0)
	{
		fprintf(stderr, _("%s: WSAStartup failed: %d\n"), progname, rc);
		exit_nicely(1);
	}

	parallel_init_done = true;
#endif
}
265 
266 /*
267  * Find the ParallelSlot for the current worker process or thread.
268  *
269  * Returns NULL if no matching slot is found (this implies we're the master).
270  */
271 static ParallelSlot *
GetMyPSlot(ParallelState * pstate)272 GetMyPSlot(ParallelState *pstate)
273 {
274 	int			i;
275 
276 	for (i = 0; i < pstate->numWorkers; i++)
277 	{
278 #ifdef WIN32
279 		if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
280 #else
281 		if (pstate->parallelSlot[i].pid == getpid())
282 #endif
283 			return &(pstate->parallelSlot[i]);
284 	}
285 
286 	return NULL;
287 }
288 
/*
 * Thread-local replacement for getLocalPQExpBuffer().
 *
 * Not reentrant, but it limits memory leakage to one buffer per thread
 * instead of one per fmtId/fmtQualifiedId call.
 */
#ifdef WIN32
static PQExpBuffer
getThreadLocalPQExpBuffer(void)
{
	/*
	 * The Tls code goes awry if a static var is used alongside it, so keep a
	 * static only as a fallback for the period before parallel_init_done is
	 * set, and never touch it once TLS is in use.  TlsGetValue() is relied
	 * on to return 0 for a slot that has not been set yet.
	 */
	static PQExpBuffer s_id_return = NULL;
	PQExpBuffer buf;

	buf = parallel_init_done ?
		(PQExpBuffer) TlsGetValue(tls_index) : s_id_return;

	if (buf == NULL)
	{
		/* First call in this thread (or before init): make a new buffer */
		buf = createPQExpBuffer();
		if (parallel_init_done)
			TlsSetValue(tls_index, buf);
		else
			s_id_return = buf;
	}
	else
	{
		/* Reuse the existing buffer, discarding its previous contents */
		resetPQExpBuffer(buf);
	}

	return buf;
}
#endif							/* WIN32 */
330 
331 /*
332  * pg_dump and pg_restore call this to register the cleanup handler
333  * as soon as they've created the ArchiveHandle.
334  */
335 void
on_exit_close_archive(Archive * AHX)336 on_exit_close_archive(Archive *AHX)
337 {
338 	shutdown_info.AHX = AHX;
339 	on_exit_nicely(archive_close_connection, &shutdown_info);
340 }
341 
342 /*
343  * on_exit_nicely handler for shutting down database connections and
344  * worker processes cleanly.
345  */
346 static void
archive_close_connection(int code,void * arg)347 archive_close_connection(int code, void *arg)
348 {
349 	ShutdownInformation *si = (ShutdownInformation *) arg;
350 
351 	if (si->pstate)
352 	{
353 		/* In parallel mode, must figure out who we are */
354 		ParallelSlot *slot = GetMyPSlot(si->pstate);
355 
356 		if (!slot)
357 		{
358 			/*
359 			 * We're the master.  Forcibly shut down workers, then close our
360 			 * own database connection, if any.
361 			 */
362 			ShutdownWorkersHard(si->pstate);
363 
364 			if (si->AHX)
365 				DisconnectDatabase(si->AHX);
366 		}
367 		else
368 		{
369 			/*
370 			 * We're a worker.  Shut down our own DB connection if any.  On
371 			 * Windows, we also have to close our communication sockets, to
372 			 * emulate what will happen on Unix when the worker process exits.
373 			 * (Without this, if this is a premature exit, the master would
374 			 * fail to detect it because there would be no EOF condition on
375 			 * the other end of the pipe.)
376 			 */
377 			if (slot->AH)
378 				DisconnectDatabase(&(slot->AH->public));
379 
380 #ifdef WIN32
381 			closesocket(slot->pipeRevRead);
382 			closesocket(slot->pipeRevWrite);
383 #endif
384 		}
385 	}
386 	else
387 	{
388 		/* Non-parallel operation: just kill the master DB connection */
389 		if (si->AHX)
390 			DisconnectDatabase(si->AHX);
391 	}
392 }
393 
394 /*
395  * Forcibly shut down any remaining workers, waiting for them to finish.
396  *
397  * Note that we don't expect to come here during normal exit (the workers
398  * should be long gone, and the ParallelState too).  We're only here in an
399  * exit_horribly() situation, so intervening to cancel active commands is
400  * appropriate.
401  */
402 static void
ShutdownWorkersHard(ParallelState * pstate)403 ShutdownWorkersHard(ParallelState *pstate)
404 {
405 	int			i;
406 
407 	/*
408 	 * Close our write end of the sockets so that any workers waiting for
409 	 * commands know they can exit.  (Note: some of the pipeWrite fields might
410 	 * still be zero, if we failed to initialize all the workers.  Hence, just
411 	 * ignore errors here.)
412 	 */
413 	for (i = 0; i < pstate->numWorkers; i++)
414 		closesocket(pstate->parallelSlot[i].pipeWrite);
415 
416 	/*
417 	 * Force early termination of any commands currently in progress.
418 	 */
419 #ifndef WIN32
420 	/* On non-Windows, send SIGTERM to each worker process. */
421 	for (i = 0; i < pstate->numWorkers; i++)
422 	{
423 		pid_t		pid = pstate->parallelSlot[i].pid;
424 
425 		if (pid != 0)
426 			kill(pid, SIGTERM);
427 	}
428 #else
429 
430 	/*
431 	 * On Windows, send query cancels directly to the workers' backends.  Use
432 	 * a critical section to ensure worker threads don't change state.
433 	 */
434 	EnterCriticalSection(&signal_info_lock);
435 	for (i = 0; i < pstate->numWorkers; i++)
436 	{
437 		ArchiveHandle *AH = pstate->parallelSlot[i].AH;
438 		char		errbuf[1];
439 
440 		if (AH != NULL && AH->connCancel != NULL)
441 			(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
442 	}
443 	LeaveCriticalSection(&signal_info_lock);
444 #endif
445 
446 	/* Now wait for them to terminate. */
447 	WaitForTerminatingWorkers(pstate);
448 }
449 
450 /*
451  * Wait for all workers to terminate.
452  */
453 static void
WaitForTerminatingWorkers(ParallelState * pstate)454 WaitForTerminatingWorkers(ParallelState *pstate)
455 {
456 	while (!HasEveryWorkerTerminated(pstate))
457 	{
458 		ParallelSlot *slot = NULL;
459 		int			j;
460 
461 #ifndef WIN32
462 		/* On non-Windows, use wait() to wait for next worker to end */
463 		int			status;
464 		pid_t		pid = wait(&status);
465 
466 		/* Find dead worker's slot, and clear the PID field */
467 		for (j = 0; j < pstate->numWorkers; j++)
468 		{
469 			slot = &(pstate->parallelSlot[j]);
470 			if (slot->pid == pid)
471 			{
472 				slot->pid = 0;
473 				break;
474 			}
475 		}
476 #else							/* WIN32 */
477 		/* On Windows, we must use WaitForMultipleObjects() */
478 		HANDLE	   *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
479 		int			nrun = 0;
480 		DWORD		ret;
481 		uintptr_t	hThread;
482 
483 		for (j = 0; j < pstate->numWorkers; j++)
484 		{
485 			if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
486 			{
487 				lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
488 				nrun++;
489 			}
490 		}
491 		ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
492 		Assert(ret != WAIT_FAILED);
493 		hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
494 		free(lpHandles);
495 
496 		/* Find dead worker's slot, and clear the hThread field */
497 		for (j = 0; j < pstate->numWorkers; j++)
498 		{
499 			slot = &(pstate->parallelSlot[j]);
500 			if (slot->hThread == hThread)
501 			{
502 				/* For cleanliness, close handles for dead threads */
503 				CloseHandle((HANDLE) slot->hThread);
504 				slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
505 				break;
506 			}
507 		}
508 #endif							/* WIN32 */
509 
510 		/* On all platforms, update workerStatus and te[] as well */
511 		Assert(j < pstate->numWorkers);
512 		slot->workerStatus = WRKR_TERMINATED;
513 		pstate->te[j] = NULL;
514 	}
515 }
516 
517 
518 /*
519  * Code for responding to cancel interrupts (SIGINT, control-C, etc)
520  *
521  * This doesn't quite belong in this module, but it needs access to the
522  * ParallelState data, so there's not really a better place either.
523  *
524  * When we get a cancel interrupt, we could just die, but in pg_restore that
525  * could leave a SQL command (e.g., CREATE INDEX on a large table) running
526  * for a long time.  Instead, we try to send a cancel request and then die.
527  * pg_dump probably doesn't really need this, but we might as well use it
528  * there too.  Note that sending the cancel directly from the signal handler
529  * is safe because PQcancel() is written to make it so.
530  *
531  * In parallel operation on Unix, each process is responsible for canceling
532  * its own connection (this must be so because nobody else has access to it).
533  * Furthermore, the master process should attempt to forward its signal to
534  * each child.  In simple manual use of pg_dump/pg_restore, forwarding isn't
535  * needed because typing control-C at the console would deliver SIGINT to
536  * every member of the terminal process group --- but in other scenarios it
537  * might be that only the master gets signaled.
538  *
539  * On Windows, the cancel handler runs in a separate thread, because that's
540  * how SetConsoleCtrlHandler works.  We make it stop worker threads, send
541  * cancels on all active connections, and then return FALSE, which will allow
542  * the process to die.  For safety's sake, we use a critical section to
543  * protect the PGcancel structures against being changed while the signal
544  * thread runs.
545  */
546 
547 #ifndef WIN32
548 
/*
 * Signal handler (Unix only)
 *
 * Installed for SIGINT, SIGTERM and SIGQUIT by setup_cancel_handler().  In
 * order, it: blocks further delivery of those signals, forwards SIGTERM to
 * every started worker (only when signal_info.pstate is set, i.e. in the
 * master), sends a query cancel on this process's registered connection,
 * prints "terminated by user" (master only), and exits via _exit(1).  Only
 * operations usable from a signal handler are used: kill(2), PQcancel(),
 * write(2) and _exit(2).
 */
static void
sigTermHandler(SIGNAL_ARGS)
{
	int			i;
	char		errbuf[1];		/* PQcancel error text is discarded anyway */

	/*
	 * Some platforms allow delivery of new signals to interrupt an active
	 * signal handler.  That could muck up our attempt to send PQcancel, so
	 * disable the signals that setup_cancel_handler enabled.
	 */
	pqsignal(SIGINT, SIG_IGN);
	pqsignal(SIGTERM, SIG_IGN);
	pqsignal(SIGQUIT, SIG_IGN);

	/*
	 * If we're in the master, forward signal to all workers.  (It seems best
	 * to do this before PQcancel; killing the master transaction will result
	 * in invalid-snapshot errors from active workers, which maybe we can
	 * quiet by killing workers first.)  Ignore any errors.
	 */
	if (signal_info.pstate != NULL)
	{
		for (i = 0; i < signal_info.pstate->numWorkers; i++)
		{
			pid_t		pid = signal_info.pstate->parallelSlot[i].pid;

			/* pid is 0 for slots with no live worker */
			if (pid != 0)
				kill(pid, SIGTERM);
		}
	}

	/*
	 * Send QueryCancel if we have a connection to send to.  Ignore errors,
	 * there's not much we can do about them anyway.
	 */
	if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
		(void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));

	/*
	 * Report we're quitting, using nothing more complicated than write(2).
	 * When in parallel operation, only the master process should do this.
	 */
	if (!signal_info.am_worker)
	{
		if (progname)
		{
			write_stderr(progname);
			write_stderr(": ");
		}
		write_stderr("terminated by user\n");
	}

	/*
	 * And die, using _exit() not exit() because the latter will invoke atexit
	 * handlers that can fail if we interrupted related code.
	 */
	_exit(1);
}
611 
612 /*
613  * Enable cancel interrupt handler, if not already done.
614  */
615 static void
setup_cancel_handler(void)616 setup_cancel_handler(void)
617 {
618 	/*
619 	 * When forking, signal_info.handler_set will propagate into the new
620 	 * process, but that's fine because the signal handler state does too.
621 	 */
622 	if (!signal_info.handler_set)
623 	{
624 		signal_info.handler_set = true;
625 
626 		pqsignal(SIGINT, sigTermHandler);
627 		pqsignal(SIGTERM, sigTermHandler);
628 		pqsignal(SIGQUIT, sigTermHandler);
629 	}
630 }
631 
632 #else							/* WIN32 */
633 
/*
 * Console interrupt handler --- runs in a newly-started thread.
 *
 * Registered via SetConsoleCtrlHandler() in setup_cancel_handler().  For
 * CTRL_C_EVENT and CTRL_BREAK_EVENT it stops other threads and sends cancel
 * requests on all open connections while holding signal_info_lock, then
 * prints "terminated by user".  It always returns FALSE, which allows the
 * default ExitProcess() action to be taken.
 */
static BOOL WINAPI
consoleHandler(DWORD dwCtrlType)
{
	int			i;
	char		errbuf[1];		/* PQcancel error text is discarded anyway */

	if (dwCtrlType == CTRL_C_EVENT ||
		dwCtrlType == CTRL_BREAK_EVENT)
	{
		/* Critical section prevents changing data we look at here */
		EnterCriticalSection(&signal_info_lock);

		/*
		 * If in parallel mode, stop worker threads and send QueryCancel to
		 * their connected backends.  The main point of stopping the worker
		 * threads is to keep them from reporting the query cancels as errors,
		 * which would clutter the user's screen.  We needn't stop the master
		 * thread since it won't be doing much anyway.  Do this before
		 * canceling the main transaction, else we might get invalid-snapshot
		 * errors reported before we can stop the workers.  Ignore errors,
		 * there's not much we can do about them anyway.
		 */
		if (signal_info.pstate != NULL)
		{
			for (i = 0; i < signal_info.pstate->numWorkers; i++)
			{
				ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
				ArchiveHandle *AH = slot->AH;
				HANDLE		hThread = (HANDLE) slot->hThread;

				/*
				 * Using TerminateThread here may leave some resources leaked,
				 * but it doesn't matter since we're about to end the whole
				 * process.
				 *
				 * NOTE(review): a slot whose thread was never created still
				 * has the zero-filled hThread of 0, not INVALID_HANDLE_VALUE,
				 * so TerminateThread() is attempted on it too; presumably it
				 * just fails harmlessly -- confirm.
				 */
				if (hThread != INVALID_HANDLE_VALUE)
					TerminateThread(hThread, 0);

				if (AH != NULL && AH->connCancel != NULL)
					(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
			}
		}

		/*
		 * Send QueryCancel to master connection, if enabled.  Ignore errors,
		 * there's not much we can do about them anyway.
		 */
		if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
			(void) PQcancel(signal_info.myAH->connCancel,
							errbuf, sizeof(errbuf));

		LeaveCriticalSection(&signal_info_lock);

		/*
		 * Report we're quitting, using nothing more complicated than
		 * write(2).  (We might be able to get away with using write_msg()
		 * here, but since we terminated other threads uncleanly above, it
		 * seems better to assume as little as possible.)
		 */
		if (progname)
		{
			write_stderr(progname);
			write_stderr(": ");
		}
		write_stderr("terminated by user\n");
	}

	/* Always return FALSE to allow signal handling to continue */
	return FALSE;
}
711 
712 /*
713  * Enable cancel interrupt handler, if not already done.
714  */
715 static void
setup_cancel_handler(void)716 setup_cancel_handler(void)
717 {
718 	if (!signal_info.handler_set)
719 	{
720 		signal_info.handler_set = true;
721 
722 		InitializeCriticalSection(&signal_info_lock);
723 
724 		SetConsoleCtrlHandler(consoleHandler, TRUE);
725 	}
726 }
727 
728 #endif							/* WIN32 */
729 
730 
731 /*
732  * set_archive_cancel_info
733  *
734  * Fill AH->connCancel with cancellation info for the specified database
735  * connection; or clear it if conn is NULL.
736  */
737 void
set_archive_cancel_info(ArchiveHandle * AH,PGconn * conn)738 set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
739 {
740 	PGcancel   *oldConnCancel;
741 
742 	/*
743 	 * Activate the interrupt handler if we didn't yet in this process.  On
744 	 * Windows, this also initializes signal_info_lock; therefore it's
745 	 * important that this happen at least once before we fork off any
746 	 * threads.
747 	 */
748 	setup_cancel_handler();
749 
750 	/*
751 	 * On Unix, we assume that storing a pointer value is atomic with respect
752 	 * to any possible signal interrupt.  On Windows, use a critical section.
753 	 */
754 
755 #ifdef WIN32
756 	EnterCriticalSection(&signal_info_lock);
757 #endif
758 
759 	/* Free the old one if we have one */
760 	oldConnCancel = AH->connCancel;
761 	/* be sure interrupt handler doesn't use pointer while freeing */
762 	AH->connCancel = NULL;
763 
764 	if (oldConnCancel != NULL)
765 		PQfreeCancel(oldConnCancel);
766 
767 	/* Set the new one if specified */
768 	if (conn)
769 		AH->connCancel = PQgetCancel(conn);
770 
771 	/*
772 	 * On Unix, there's only ever one active ArchiveHandle per process, so we
773 	 * can just set signal_info.myAH unconditionally.  On Windows, do that
774 	 * only in the main thread; worker threads have to make sure their
775 	 * ArchiveHandle appears in the pstate data, which is dealt with in
776 	 * RunWorker().
777 	 */
778 #ifndef WIN32
779 	signal_info.myAH = AH;
780 #else
781 	if (mainThreadId == GetCurrentThreadId())
782 		signal_info.myAH = AH;
783 #endif
784 
785 #ifdef WIN32
786 	LeaveCriticalSection(&signal_info_lock);
787 #endif
788 }
789 
790 /*
791  * set_cancel_pstate
792  *
793  * Set signal_info.pstate to point to the specified ParallelState, if any.
794  * We need this mainly to have an interlock against Windows signal thread.
795  */
796 static void
set_cancel_pstate(ParallelState * pstate)797 set_cancel_pstate(ParallelState *pstate)
798 {
799 #ifdef WIN32
800 	EnterCriticalSection(&signal_info_lock);
801 #endif
802 
803 	signal_info.pstate = pstate;
804 
805 #ifdef WIN32
806 	LeaveCriticalSection(&signal_info_lock);
807 #endif
808 }
809 
810 /*
811  * set_cancel_slot_archive
812  *
813  * Set ParallelSlot's AH field to point to the specified archive, if any.
814  * We need this mainly to have an interlock against Windows signal thread.
815  */
816 static void
set_cancel_slot_archive(ParallelSlot * slot,ArchiveHandle * AH)817 set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
818 {
819 #ifdef WIN32
820 	EnterCriticalSection(&signal_info_lock);
821 #endif
822 
823 	slot->AH = AH;
824 
825 #ifdef WIN32
826 	LeaveCriticalSection(&signal_info_lock);
827 #endif
828 }
829 
830 
831 /*
832  * This function is called by both Unix and Windows variants to set up
833  * and run a worker process.  Caller should exit the process (or thread)
834  * upon return.
835  */
836 static void
RunWorker(ArchiveHandle * AH,ParallelSlot * slot)837 RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
838 {
839 	int			pipefd[2];
840 
841 	/* fetch child ends of pipes */
842 	pipefd[PIPE_READ] = slot->pipeRevRead;
843 	pipefd[PIPE_WRITE] = slot->pipeRevWrite;
844 
845 	/*
846 	 * Clone the archive so that we have our own state to work with, and in
847 	 * particular our own database connection.
848 	 *
849 	 * We clone on Unix as well as Windows, even though technically we don't
850 	 * need to because fork() gives us a copy in our own address space
851 	 * already.  But CloneArchive resets the state information and also clones
852 	 * the database connection which both seem kinda helpful.
853 	 */
854 	AH = CloneArchive(AH);
855 
856 	/* Remember cloned archive where signal handler can find it */
857 	set_cancel_slot_archive(slot, AH);
858 
859 	/*
860 	 * Call the setup worker function that's defined in the ArchiveHandle.
861 	 */
862 	(AH->SetupWorkerPtr) ((Archive *) AH);
863 
864 	/*
865 	 * Execute commands until done.
866 	 */
867 	WaitForCommands(AH, pipefd);
868 
869 	/*
870 	 * Disconnect from database and clean up.
871 	 */
872 	set_cancel_slot_archive(slot, NULL);
873 	DisconnectDatabase(&(AH->public));
874 	DeCloneArchive(AH);
875 }
876 
/*
 * Thread entry point for Windows workers, started via _beginthreadex() with
 * a WorkerInfo as its single argument.
 */
#ifdef WIN32
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
	ParallelSlot *mySlot = wi->slot;
	ArchiveHandle *masterAH = wi->AH;

	/* The transient WorkerInfo has served its purpose */
	free(wi);

	/* Do the worker's job, then end this thread */
	RunWorker(masterAH, mySlot);

	_endthreadex(0);
	return 0;
}
#endif							/* WIN32 */
898 
899 /*
900  * This function starts a parallel dump or restore by spawning off the worker
901  * processes.  For Windows, it creates a number of threads; on Unix the
902  * workers are created with fork().
903  */
904 ParallelState *
ParallelBackupStart(ArchiveHandle * AH)905 ParallelBackupStart(ArchiveHandle *AH)
906 {
907 	ParallelState *pstate;
908 	int			i;
909 
910 	Assert(AH->public.numWorkers > 0);
911 
912 	pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
913 
914 	pstate->numWorkers = AH->public.numWorkers;
915 	pstate->te = NULL;
916 	pstate->parallelSlot = NULL;
917 
918 	if (AH->public.numWorkers == 1)
919 		return pstate;
920 
921 	/* Create status arrays, being sure to initialize all fields to 0 */
922 	pstate->te = (TocEntry **)
923 		pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
924 	pstate->parallelSlot = (ParallelSlot *)
925 		pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
926 
927 #ifdef WIN32
928 	/* Make fmtId() and fmtQualifiedId() use thread-local storage */
929 	getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
930 #endif
931 
932 	/*
933 	 * Set the pstate in shutdown_info, to tell the exit handler that it must
934 	 * clean up workers as well as the main database connection.  But we don't
935 	 * set this in signal_info yet, because we don't want child processes to
936 	 * inherit non-NULL signal_info.pstate.
937 	 */
938 	shutdown_info.pstate = pstate;
939 
940 	/*
941 	 * Temporarily disable query cancellation on the master connection.  This
942 	 * ensures that child processes won't inherit valid AH->connCancel
943 	 * settings and thus won't try to issue cancels against the master's
944 	 * connection.  No harm is done if we fail while it's disabled, because
945 	 * the master connection is idle at this point anyway.
946 	 */
947 	set_archive_cancel_info(AH, NULL);
948 
949 	/* Ensure stdio state is quiesced before forking */
950 	fflush(NULL);
951 
952 	/* Create desired number of workers */
953 	for (i = 0; i < pstate->numWorkers; i++)
954 	{
955 #ifdef WIN32
956 		WorkerInfo *wi;
957 		uintptr_t	handle;
958 #else
959 		pid_t		pid;
960 #endif
961 		ParallelSlot *slot = &(pstate->parallelSlot[i]);
962 		int			pipeMW[2],
963 					pipeWM[2];
964 
965 		/* Create communication pipes for this worker */
966 		if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
967 			exit_horribly(modulename,
968 						  "could not create communication channels: %s\n",
969 						  strerror(errno));
970 
971 		/* master's ends of the pipes */
972 		slot->pipeRead = pipeWM[PIPE_READ];
973 		slot->pipeWrite = pipeMW[PIPE_WRITE];
974 		/* child's ends of the pipes */
975 		slot->pipeRevRead = pipeMW[PIPE_READ];
976 		slot->pipeRevWrite = pipeWM[PIPE_WRITE];
977 
978 #ifdef WIN32
979 		/* Create transient structure to pass args to worker function */
980 		wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
981 
982 		wi->AH = AH;
983 		wi->slot = slot;
984 
985 		handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
986 								wi, 0, &(slot->threadId));
987 		slot->hThread = handle;
988 		slot->workerStatus = WRKR_IDLE;
989 #else							/* !WIN32 */
990 		pid = fork();
991 		if (pid == 0)
992 		{
993 			/* we are the worker */
994 			int			j;
995 
996 			/* this is needed for GetMyPSlot() */
997 			slot->pid = getpid();
998 
999 			/* instruct signal handler that we're in a worker now */
1000 			signal_info.am_worker = true;
1001 
1002 			/* close read end of Worker -> Master */
1003 			closesocket(pipeWM[PIPE_READ]);
1004 			/* close write end of Master -> Worker */
1005 			closesocket(pipeMW[PIPE_WRITE]);
1006 
1007 			/*
1008 			 * Close all inherited fds for communication of the master with
1009 			 * previously-forked workers.
1010 			 */
1011 			for (j = 0; j < i; j++)
1012 			{
1013 				closesocket(pstate->parallelSlot[j].pipeRead);
1014 				closesocket(pstate->parallelSlot[j].pipeWrite);
1015 			}
1016 
1017 			/* Run the worker ... */
1018 			RunWorker(AH, slot);
1019 
1020 			/* We can just exit(0) when done */
1021 			exit(0);
1022 		}
1023 		else if (pid < 0)
1024 		{
1025 			/* fork failed */
1026 			exit_horribly(modulename,
1027 						  "could not create worker process: %s\n",
1028 						  strerror(errno));
1029 		}
1030 
1031 		/* In Master after successful fork */
1032 		slot->pid = pid;
1033 		slot->workerStatus = WRKR_IDLE;
1034 
1035 		/* close read end of Master -> Worker */
1036 		closesocket(pipeMW[PIPE_READ]);
1037 		/* close write end of Worker -> Master */
1038 		closesocket(pipeWM[PIPE_WRITE]);
1039 #endif							/* WIN32 */
1040 	}
1041 
1042 	/*
1043 	 * Having forked off the workers, disable SIGPIPE so that master isn't
1044 	 * killed if it tries to send a command to a dead worker.  We don't want
1045 	 * the workers to inherit this setting, though.
1046 	 */
1047 #ifndef WIN32
1048 	pqsignal(SIGPIPE, SIG_IGN);
1049 #endif
1050 
1051 	/*
1052 	 * Re-establish query cancellation on the master connection.
1053 	 */
1054 	set_archive_cancel_info(AH, AH->connection);
1055 
1056 	/*
1057 	 * Tell the cancel signal handler to forward signals to worker processes,
1058 	 * too.  (As with query cancel, we did not need this earlier because the
1059 	 * workers have not yet been given anything to do; if we die before this
1060 	 * point, any already-started workers will see EOF and quit promptly.)
1061 	 */
1062 	set_cancel_pstate(pstate);
1063 
1064 	return pstate;
1065 }
1066 
1067 /*
1068  * Close down a parallel dump or restore.
1069  */
1070 void
ParallelBackupEnd(ArchiveHandle * AH,ParallelState * pstate)1071 ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
1072 {
1073 	int			i;
1074 
1075 	/* No work if non-parallel */
1076 	if (pstate->numWorkers == 1)
1077 		return;
1078 
1079 	/* There should not be any unfinished jobs */
1080 	Assert(IsEveryWorkerIdle(pstate));
1081 
1082 	/* Close the sockets so that the workers know they can exit */
1083 	for (i = 0; i < pstate->numWorkers; i++)
1084 	{
1085 		closesocket(pstate->parallelSlot[i].pipeRead);
1086 		closesocket(pstate->parallelSlot[i].pipeWrite);
1087 	}
1088 
1089 	/* Wait for them to exit */
1090 	WaitForTerminatingWorkers(pstate);
1091 
1092 	/*
1093 	 * Unlink pstate from shutdown_info, so the exit handler will not try to
1094 	 * use it; and likewise unlink from signal_info.
1095 	 */
1096 	shutdown_info.pstate = NULL;
1097 	set_cancel_pstate(NULL);
1098 
1099 	/* Release state (mere neatnik-ism, since we're about to terminate) */
1100 	free(pstate->te);
1101 	free(pstate->parallelSlot);
1102 	free(pstate);
1103 }
1104 
1105 /*
1106  * These next four functions handle construction and parsing of the command
1107  * strings and response strings for parallel workers.
1108  *
 * Currently, these can be the same regardless of which archive format we are
 * processing.  In the future, we might want to let format modules override
 * these functions to add format-specific data to a command or response.
1112  */
1113 
1114 /*
1115  * buildWorkerCommand: format a command string to send to a worker.
1116  *
1117  * The string is built in the caller-supplied buffer of size buflen.
1118  */
1119 static void
buildWorkerCommand(ArchiveHandle * AH,TocEntry * te,T_Action act,char * buf,int buflen)1120 buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
1121 				   char *buf, int buflen)
1122 {
1123 	if (act == ACT_DUMP)
1124 		snprintf(buf, buflen, "DUMP %d", te->dumpId);
1125 	else if (act == ACT_RESTORE)
1126 		snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1127 	else
1128 		Assert(false);
1129 }
1130 
1131 /*
1132  * parseWorkerCommand: interpret a command string in a worker.
1133  */
1134 static void
parseWorkerCommand(ArchiveHandle * AH,TocEntry ** te,T_Action * act,const char * msg)1135 parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
1136 				   const char *msg)
1137 {
1138 	DumpId		dumpId;
1139 	int			nBytes;
1140 
1141 	if (messageStartsWith(msg, "DUMP "))
1142 	{
1143 		*act = ACT_DUMP;
1144 		sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1145 		Assert(nBytes == strlen(msg));
1146 		*te = getTocEntryByDumpId(AH, dumpId);
1147 		Assert(*te != NULL);
1148 	}
1149 	else if (messageStartsWith(msg, "RESTORE "))
1150 	{
1151 		*act = ACT_RESTORE;
1152 		sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1153 		Assert(nBytes == strlen(msg));
1154 		*te = getTocEntryByDumpId(AH, dumpId);
1155 		Assert(*te != NULL);
1156 	}
1157 	else
1158 		exit_horribly(modulename,
1159 					  "unrecognized command received from master: \"%s\"\n",
1160 					  msg);
1161 }
1162 
1163 /*
1164  * buildWorkerResponse: format a response string to send to the master.
1165  *
1166  * The string is built in the caller-supplied buffer of size buflen.
1167  */
1168 static void
buildWorkerResponse(ArchiveHandle * AH,TocEntry * te,T_Action act,int status,char * buf,int buflen)1169 buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
1170 					char *buf, int buflen)
1171 {
1172 	snprintf(buf, buflen, "OK %d %d %d",
1173 			 te->dumpId,
1174 			 status,
1175 			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1176 }
1177 
1178 /*
1179  * parseWorkerResponse: parse the status message returned by a worker.
1180  *
1181  * Returns the integer status code, and may update fields of AH and/or te.
1182  */
1183 static int
parseWorkerResponse(ArchiveHandle * AH,TocEntry * te,const char * msg)1184 parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
1185 					const char *msg)
1186 {
1187 	DumpId		dumpId;
1188 	int			nBytes,
1189 				n_errors;
1190 	int			status = 0;
1191 
1192 	if (messageStartsWith(msg, "OK "))
1193 	{
1194 		sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1195 
1196 		Assert(dumpId == te->dumpId);
1197 		Assert(nBytes == strlen(msg));
1198 
1199 		AH->public.n_errors += n_errors;
1200 	}
1201 	else
1202 		exit_horribly(modulename,
1203 					  "invalid message received from worker: \"%s\"\n",
1204 					  msg);
1205 
1206 	return status;
1207 }
1208 
1209 /*
1210  * Dispatch a job to some free worker.
1211  *
1212  * te is the TocEntry to be processed, act is the action to be taken on it.
1213  * callback is the function to call on completion of the job.
1214  *
1215  * If no worker is currently available, this will block, and previously
1216  * registered callback functions may be called.
1217  */
1218 void
DispatchJobForTocEntry(ArchiveHandle * AH,ParallelState * pstate,TocEntry * te,T_Action act,ParallelCompletionPtr callback,void * callback_data)1219 DispatchJobForTocEntry(ArchiveHandle *AH,
1220 					   ParallelState *pstate,
1221 					   TocEntry *te,
1222 					   T_Action act,
1223 					   ParallelCompletionPtr callback,
1224 					   void *callback_data)
1225 {
1226 	int			worker;
1227 	char		buf[256];
1228 
1229 	/* Get a worker, waiting if none are idle */
1230 	while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1231 		WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1232 
1233 	/* Construct and send command string */
1234 	buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1235 
1236 	sendMessageToWorker(pstate, worker, buf);
1237 
1238 	/* Remember worker is busy, and which TocEntry it's working on */
1239 	pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1240 	pstate->parallelSlot[worker].callback = callback;
1241 	pstate->parallelSlot[worker].callback_data = callback_data;
1242 	pstate->te[worker] = te;
1243 }
1244 
1245 /*
1246  * Find an idle worker and return its slot number.
1247  * Return NO_SLOT if none are idle.
1248  */
1249 static int
GetIdleWorker(ParallelState * pstate)1250 GetIdleWorker(ParallelState *pstate)
1251 {
1252 	int			i;
1253 
1254 	for (i = 0; i < pstate->numWorkers; i++)
1255 	{
1256 		if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1257 			return i;
1258 	}
1259 	return NO_SLOT;
1260 }
1261 
1262 /*
1263  * Return true iff no worker is running.
1264  */
1265 static bool
HasEveryWorkerTerminated(ParallelState * pstate)1266 HasEveryWorkerTerminated(ParallelState *pstate)
1267 {
1268 	int			i;
1269 
1270 	for (i = 0; i < pstate->numWorkers; i++)
1271 	{
1272 		if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1273 			return false;
1274 	}
1275 	return true;
1276 }
1277 
1278 /*
1279  * Return true iff every worker is in the WRKR_IDLE state.
1280  */
1281 bool
IsEveryWorkerIdle(ParallelState * pstate)1282 IsEveryWorkerIdle(ParallelState *pstate)
1283 {
1284 	int			i;
1285 
1286 	for (i = 0; i < pstate->numWorkers; i++)
1287 	{
1288 		if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1289 			return false;
1290 	}
1291 	return true;
1292 }
1293 
1294 /*
1295  * Acquire lock on a table to be dumped by a worker process.
1296  *
1297  * The master process is already holding an ACCESS SHARE lock.  Ordinarily
1298  * it's no problem for a worker to get one too, but if anything else besides
1299  * pg_dump is running, there's a possible deadlock:
1300  *
1301  * 1) Master dumps the schema and locks all tables in ACCESS SHARE mode.
1302  * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1303  *	  because the master holds a conflicting ACCESS SHARE lock).
1304  * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1305  *	  The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1306  * 4) Now we have a deadlock, since the master is effectively waiting for
1307  *	  the worker.  The server cannot detect that, however.
1308  *
1309  * To prevent an infinite wait, prior to touching a table in a worker, request
1310  * a lock in ACCESS SHARE mode but with NOWAIT.  If we don't get the lock,
1311  * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1312  * so we have a deadlock.  We must fail the backup in that case.
1313  */
1314 static void
lockTableForWorker(ArchiveHandle * AH,TocEntry * te)1315 lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
1316 {
1317 	const char *qualId;
1318 	PQExpBuffer query;
1319 	PGresult   *res;
1320 
1321 	/* Nothing to do for BLOBS */
1322 	if (strcmp(te->desc, "BLOBS") == 0)
1323 		return;
1324 
1325 	query = createPQExpBuffer();
1326 
1327 	qualId = fmtQualifiedId(AH->public.remoteVersion, te->namespace, te->tag);
1328 
1329 	appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1330 					  qualId);
1331 
1332 	res = PQexec(AH->connection, query->data);
1333 
1334 	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1335 		exit_horribly(modulename,
1336 					  "could not obtain lock on relation \"%s\"\n"
1337 					  "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1338 					  "on the table after the pg_dump parent process had gotten the "
1339 					  "initial ACCESS SHARE lock on the table.\n", qualId);
1340 
1341 	PQclear(res);
1342 	destroyPQExpBuffer(query);
1343 }
1344 
1345 /*
1346  * WaitForCommands: main routine for a worker process.
1347  *
1348  * Read and execute commands from the master until we see EOF on the pipe.
1349  */
1350 static void
WaitForCommands(ArchiveHandle * AH,int pipefd[2])1351 WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1352 {
1353 	char	   *command;
1354 	TocEntry   *te;
1355 	T_Action	act;
1356 	int			status = 0;
1357 	char		buf[256];
1358 
1359 	for (;;)
1360 	{
1361 		if (!(command = getMessageFromMaster(pipefd)))
1362 		{
1363 			/* EOF, so done */
1364 			return;
1365 		}
1366 
1367 		/* Decode the command */
1368 		parseWorkerCommand(AH, &te, &act, command);
1369 
1370 		if (act == ACT_DUMP)
1371 		{
1372 			/* Acquire lock on this table within the worker's session */
1373 			lockTableForWorker(AH, te);
1374 
1375 			/* Perform the dump command */
1376 			status = (AH->WorkerJobDumpPtr) (AH, te);
1377 		}
1378 		else if (act == ACT_RESTORE)
1379 		{
1380 			/* Perform the restore command */
1381 			status = (AH->WorkerJobRestorePtr) (AH, te);
1382 		}
1383 		else
1384 			Assert(false);
1385 
1386 		/* Return status to master */
1387 		buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1388 
1389 		sendMessageToMaster(pipefd, buf);
1390 
1391 		/* command was pg_malloc'd and we are responsible for free()ing it. */
1392 		free(command);
1393 	}
1394 }
1395 
1396 /*
1397  * Check for status messages from workers.
1398  *
1399  * If do_wait is true, wait to get a status message; otherwise, just return
1400  * immediately if there is none available.
1401  *
1402  * When we get a status message, we pass the status code to the callback
1403  * function that was specified to DispatchJobForTocEntry, then reset the
1404  * worker status to IDLE.
1405  *
1406  * Returns true if we collected a status message, else false.
1407  *
1408  * XXX is it worth checking for more than one status message per call?
1409  * It seems somewhat unlikely that multiple workers would finish at exactly
1410  * the same time.
1411  */
1412 static bool
ListenToWorkers(ArchiveHandle * AH,ParallelState * pstate,bool do_wait)1413 ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1414 {
1415 	int			worker;
1416 	char	   *msg;
1417 
1418 	/* Try to collect a status message */
1419 	msg = getMessageFromWorker(pstate, do_wait, &worker);
1420 
1421 	if (!msg)
1422 	{
1423 		/* If do_wait is true, we must have detected EOF on some socket */
1424 		if (do_wait)
1425 			exit_horribly(modulename, "a worker process died unexpectedly\n");
1426 		return false;
1427 	}
1428 
1429 	/* Process it and update our idea of the worker's status */
1430 	if (messageStartsWith(msg, "OK "))
1431 	{
1432 		ParallelSlot *slot = &pstate->parallelSlot[worker];
1433 		TocEntry   *te = pstate->te[worker];
1434 		int			status;
1435 
1436 		status = parseWorkerResponse(AH, te, msg);
1437 		slot->callback(AH, te, status, slot->callback_data);
1438 		slot->workerStatus = WRKR_IDLE;
1439 		pstate->te[worker] = NULL;
1440 	}
1441 	else
1442 		exit_horribly(modulename,
1443 					  "invalid message received from worker: \"%s\"\n",
1444 					  msg);
1445 
1446 	/* Free the string returned from getMessageFromWorker */
1447 	free(msg);
1448 
1449 	return true;
1450 }
1451 
1452 /*
1453  * Check for status results from workers, waiting if necessary.
1454  *
1455  * Available wait modes are:
1456  * WFW_NO_WAIT: reap any available status, but don't block
1457  * WFW_GOT_STATUS: wait for at least one more worker to finish
1458  * WFW_ONE_IDLE: wait for at least one worker to be idle
1459  * WFW_ALL_IDLE: wait for all workers to be idle
1460  *
1461  * Any received results are passed to the callback specified to
1462  * DispatchJobForTocEntry.
1463  *
1464  * This function is executed in the master process.
1465  */
1466 void
WaitForWorkers(ArchiveHandle * AH,ParallelState * pstate,WFW_WaitOption mode)1467 WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
1468 {
1469 	bool		do_wait = false;
1470 
1471 	/*
1472 	 * In GOT_STATUS mode, always block waiting for a message, since we can't
1473 	 * return till we get something.  In other modes, we don't block the first
1474 	 * time through the loop.
1475 	 */
1476 	if (mode == WFW_GOT_STATUS)
1477 	{
1478 		/* Assert that caller knows what it's doing */
1479 		Assert(!IsEveryWorkerIdle(pstate));
1480 		do_wait = true;
1481 	}
1482 
1483 	for (;;)
1484 	{
1485 		/*
1486 		 * Check for status messages, even if we don't need to block.  We do
1487 		 * not try very hard to reap all available messages, though, since
1488 		 * there's unlikely to be more than one.
1489 		 */
1490 		if (ListenToWorkers(AH, pstate, do_wait))
1491 		{
1492 			/*
1493 			 * If we got a message, we are done by definition for GOT_STATUS
1494 			 * mode, and we can also be certain that there's at least one idle
1495 			 * worker.  So we're done in all but ALL_IDLE mode.
1496 			 */
1497 			if (mode != WFW_ALL_IDLE)
1498 				return;
1499 		}
1500 
1501 		/* Check whether we must wait for new status messages */
1502 		switch (mode)
1503 		{
1504 			case WFW_NO_WAIT:
1505 				return;			/* never wait */
1506 			case WFW_GOT_STATUS:
1507 				Assert(false);	/* can't get here, because we waited */
1508 				break;
1509 			case WFW_ONE_IDLE:
1510 				if (GetIdleWorker(pstate) != NO_SLOT)
1511 					return;
1512 				break;
1513 			case WFW_ALL_IDLE:
1514 				if (IsEveryWorkerIdle(pstate))
1515 					return;
1516 				break;
1517 		}
1518 
1519 		/* Loop back, and this time wait for something to happen */
1520 		do_wait = true;
1521 	}
1522 }
1523 
1524 /*
1525  * Read one command message from the master, blocking if necessary
1526  * until one is available, and return it as a malloc'd string.
1527  * On EOF, return NULL.
1528  *
1529  * This function is executed in worker processes.
1530  */
1531 static char *
getMessageFromMaster(int pipefd[2])1532 getMessageFromMaster(int pipefd[2])
1533 {
1534 	return readMessageFromPipe(pipefd[PIPE_READ]);
1535 }
1536 
1537 /*
1538  * Send a status message to the master.
1539  *
1540  * This function is executed in worker processes.
1541  */
1542 static void
sendMessageToMaster(int pipefd[2],const char * str)1543 sendMessageToMaster(int pipefd[2], const char *str)
1544 {
1545 	int			len = strlen(str) + 1;
1546 
1547 	if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1548 		exit_horribly(modulename,
1549 					  "could not write to the communication channel: %s\n",
1550 					  strerror(errno));
1551 }
1552 
1553 /*
1554  * Wait until some descriptor in "workerset" becomes readable.
1555  * Returns -1 on error, else the number of readable descriptors.
1556  */
1557 static int
select_loop(int maxFd,fd_set * workerset)1558 select_loop(int maxFd, fd_set *workerset)
1559 {
1560 	int			i;
1561 	fd_set		saveSet = *workerset;
1562 
1563 	for (;;)
1564 	{
1565 		*workerset = saveSet;
1566 		i = select(maxFd + 1, workerset, NULL, NULL, NULL);
1567 
1568 #ifndef WIN32
1569 		if (i < 0 && errno == EINTR)
1570 			continue;
1571 #else
1572 		if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
1573 			continue;
1574 #endif
1575 		break;
1576 	}
1577 
1578 	return i;
1579 }
1580 
1581 
1582 /*
1583  * Check for messages from worker processes.
1584  *
1585  * If a message is available, return it as a malloc'd string, and put the
1586  * index of the sending worker in *worker.
1587  *
1588  * If nothing is available, wait if "do_wait" is true, else return NULL.
1589  *
1590  * If we detect EOF on any socket, we'll return NULL.  It's not great that
1591  * that's hard to distinguish from the no-data-available case, but for now
1592  * our one caller is okay with that.
1593  *
1594  * This function is executed in the master process.
1595  */
1596 static char *
getMessageFromWorker(ParallelState * pstate,bool do_wait,int * worker)1597 getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1598 {
1599 	int			i;
1600 	fd_set		workerset;
1601 	int			maxFd = -1;
1602 	struct timeval nowait = {0, 0};
1603 
1604 	/* construct bitmap of socket descriptors for select() */
1605 	FD_ZERO(&workerset);
1606 	for (i = 0; i < pstate->numWorkers; i++)
1607 	{
1608 		if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1609 			continue;
1610 		FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1611 		if (pstate->parallelSlot[i].pipeRead > maxFd)
1612 			maxFd = pstate->parallelSlot[i].pipeRead;
1613 	}
1614 
1615 	if (do_wait)
1616 	{
1617 		i = select_loop(maxFd, &workerset);
1618 		Assert(i != 0);
1619 	}
1620 	else
1621 	{
1622 		if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1623 			return NULL;
1624 	}
1625 
1626 	if (i < 0)
1627 		exit_horribly(modulename, "select() failed: %s\n", strerror(errno));
1628 
1629 	for (i = 0; i < pstate->numWorkers; i++)
1630 	{
1631 		char	   *msg;
1632 
1633 		if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1634 			continue;
1635 		if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1636 			continue;
1637 
1638 		/*
1639 		 * Read the message if any.  If the socket is ready because of EOF,
1640 		 * we'll return NULL instead (and the socket will stay ready, so the
1641 		 * condition will persist).
1642 		 *
1643 		 * Note: because this is a blocking read, we'll wait if only part of
1644 		 * the message is available.  Waiting a long time would be bad, but
1645 		 * since worker status messages are short and are always sent in one
1646 		 * operation, it shouldn't be a problem in practice.
1647 		 */
1648 		msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1649 		*worker = i;
1650 		return msg;
1651 	}
1652 	Assert(false);
1653 	return NULL;
1654 }
1655 
1656 /*
1657  * Send a command message to the specified worker process.
1658  *
1659  * This function is executed in the master process.
1660  */
1661 static void
sendMessageToWorker(ParallelState * pstate,int worker,const char * str)1662 sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1663 {
1664 	int			len = strlen(str) + 1;
1665 
1666 	if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1667 	{
1668 		exit_horribly(modulename,
1669 					  "could not write to the communication channel: %s\n",
1670 					  strerror(errno));
1671 	}
1672 }
1673 
1674 /*
1675  * Read one message from the specified pipe (fd), blocking if necessary
1676  * until one is available, and return it as a malloc'd string.
1677  * On EOF, return NULL.
1678  *
1679  * A "message" on the channel is just a null-terminated string.
1680  */
1681 static char *
readMessageFromPipe(int fd)1682 readMessageFromPipe(int fd)
1683 {
1684 	char	   *msg;
1685 	int			msgsize,
1686 				bufsize;
1687 	int			ret;
1688 
1689 	/*
1690 	 * In theory, if we let piperead() read multiple bytes, it might give us
1691 	 * back fragments of multiple messages.  (That can't actually occur, since
1692 	 * neither master nor workers send more than one message without waiting
1693 	 * for a reply, but we don't wish to assume that here.)  For simplicity,
1694 	 * read a byte at a time until we get the terminating '\0'.  This method
1695 	 * is a bit inefficient, but since this is only used for relatively short
1696 	 * command and status strings, it shouldn't matter.
1697 	 */
1698 	bufsize = 64;				/* could be any number */
1699 	msg = (char *) pg_malloc(bufsize);
1700 	msgsize = 0;
1701 	for (;;)
1702 	{
1703 		Assert(msgsize < bufsize);
1704 		ret = piperead(fd, msg + msgsize, 1);
1705 		if (ret <= 0)
1706 			break;				/* error or connection closure */
1707 
1708 		Assert(ret == 1);
1709 
1710 		if (msg[msgsize] == '\0')
1711 			return msg;			/* collected whole message */
1712 
1713 		msgsize++;
1714 		if (msgsize == bufsize) /* enlarge buffer if needed */
1715 		{
1716 			bufsize += 16;		/* could be any number */
1717 			msg = (char *) pg_realloc(msg, bufsize);
1718 		}
1719 	}
1720 
1721 	/* Other end has closed the connection */
1722 	pg_free(msg);
1723 	return NULL;
1724 }
1725 
1726 #ifdef WIN32
1727 
1728 /*
1729  * This is a replacement version of pipe(2) for Windows which allows the pipe
1730  * handles to be used in select().
1731  *
1732  * Reads and writes on the pipe must go through piperead()/pipewrite().
1733  *
1734  * For consistency with Unix we declare the returned handles as "int".
1735  * This is okay even on WIN64 because system handles are not more than
1736  * 32 bits wide, but we do have to do some casting.
1737  */
1738 static int
pgpipe(int handles[2])1739 pgpipe(int handles[2])
1740 {
1741 	pgsocket	s,
1742 				tmp_sock;
1743 	struct sockaddr_in serv_addr;
1744 	int			len = sizeof(serv_addr);
1745 
1746 	/* We have to use the Unix socket invalid file descriptor value here. */
1747 	handles[0] = handles[1] = -1;
1748 
1749 	/*
1750 	 * setup listen socket
1751 	 */
1752 	if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1753 	{
1754 		write_msg(modulename, "pgpipe: could not create socket: error code %d\n",
1755 				  WSAGetLastError());
1756 		return -1;
1757 	}
1758 
1759 	memset((void *) &serv_addr, 0, sizeof(serv_addr));
1760 	serv_addr.sin_family = AF_INET;
1761 	serv_addr.sin_port = htons(0);
1762 	serv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
1763 	if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1764 	{
1765 		write_msg(modulename, "pgpipe: could not bind: error code %d\n",
1766 				  WSAGetLastError());
1767 		closesocket(s);
1768 		return -1;
1769 	}
1770 	if (listen(s, 1) == SOCKET_ERROR)
1771 	{
1772 		write_msg(modulename, "pgpipe: could not listen: error code %d\n",
1773 				  WSAGetLastError());
1774 		closesocket(s);
1775 		return -1;
1776 	}
1777 	if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
1778 	{
1779 		write_msg(modulename, "pgpipe: getsockname() failed: error code %d\n",
1780 				  WSAGetLastError());
1781 		closesocket(s);
1782 		return -1;
1783 	}
1784 
1785 	/*
1786 	 * setup pipe handles
1787 	 */
1788 	if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1789 	{
1790 		write_msg(modulename, "pgpipe: could not create second socket: error code %d\n",
1791 				  WSAGetLastError());
1792 		closesocket(s);
1793 		return -1;
1794 	}
1795 	handles[1] = (int) tmp_sock;
1796 
1797 	if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1798 	{
1799 		write_msg(modulename, "pgpipe: could not connect socket: error code %d\n",
1800 				  WSAGetLastError());
1801 		closesocket(handles[1]);
1802 		handles[1] = -1;
1803 		closesocket(s);
1804 		return -1;
1805 	}
1806 	if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
1807 	{
1808 		write_msg(modulename, "pgpipe: could not accept connection: error code %d\n",
1809 				  WSAGetLastError());
1810 		closesocket(handles[1]);
1811 		handles[1] = -1;
1812 		closesocket(s);
1813 		return -1;
1814 	}
1815 	handles[0] = (int) tmp_sock;
1816 
1817 	closesocket(s);
1818 	return 0;
1819 }
1820 
1821 /*
1822  * Windows implementation of reading from a pipe.
1823  */
1824 static int
piperead(int s,char * buf,int len)1825 piperead(int s, char *buf, int len)
1826 {
1827 	int			ret = recv(s, buf, len, 0);
1828 
1829 	if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
1830 	{
1831 		/* EOF on the pipe! */
1832 		ret = 0;
1833 	}
1834 	return ret;
1835 }
1836 
1837 #endif							/* WIN32 */
1838