1 /*-------------------------------------------------------------------------
2  *
3  * parallel.c
4  *
5  *	Parallel support for pg_dump and pg_restore
6  *
7  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *		src/bin/pg_dump/parallel.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 /*
17  * Parallel operation works like this:
18  *
19  * The original, master process calls ParallelBackupStart(), which forks off
20  * the desired number of worker processes, which each enter WaitForCommands().
21  *
22  * The master process dispatches an individual work item to one of the worker
23  * processes in DispatchJobForTocEntry().  We send a command string such as
24  * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
25  * The worker process receives and decodes the command and passes it to the
26  * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
27  * which are routines of the current archive format.  That routine performs
28  * the required action (dump or restore) and returns an integer status code.
29  * This is passed back to the master where we pass it to the
30  * ParallelCompletionPtr callback function that was passed to
31  * DispatchJobForTocEntry().  The callback function does state updating
32  * for the master control logic in pg_backup_archiver.c.
33  *
34  * In principle additional archive-format-specific information might be needed
35  * in commands or worker status responses, but so far that hasn't proved
36  * necessary, since workers have full copies of the ArchiveHandle/TocEntry
37  * data structures.  Remember that we have forked off the workers only after
38  * we have read in the catalog.  That's why our worker processes can also
39  * access the catalog information.  (In the Windows case, the workers are
40  * threads in the same process.  To avoid problems, they work with cloned
41  * copies of the Archive data structure; see RunWorker().)
42  *
43  * In the master process, the workerStatus field for each worker has one of
44  * the following values:
45  *		WRKR_NOT_STARTED: we've not yet forked this worker
46  *		WRKR_IDLE: it's waiting for a command
47  *		WRKR_WORKING: it's working on a command
48  *		WRKR_TERMINATED: process ended
49  * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
50  * state, and must be NULL in other states.
51  */
52 
53 #include "postgres_fe.h"
54 
55 #ifndef WIN32
56 #include <sys/wait.h>
57 #include <signal.h>
58 #include <unistd.h>
59 #include <fcntl.h>
60 #endif
61 #ifdef HAVE_SYS_SELECT_H
62 #include <sys/select.h>
63 #endif
64 
65 #include "parallel.h"
66 #include "pg_backup_utils.h"
67 
68 #include "fe_utils/string_utils.h"
69 #include "port/pg_bswap.h"
70 
71 /* Mnemonic macros for indexing the fd array returned by pipe(2) */
72 #define PIPE_READ							0
73 #define PIPE_WRITE							1
74 
75 #define NO_SLOT (-1)			/* Failure result for GetIdleWorker() */
76 
77 /* Worker process statuses */
typedef enum
{
	/* not yet forked; must stay 0 so that zero-filled slots start here */
	WRKR_NOT_STARTED = 0,
	/* worker is waiting for a command */
	WRKR_IDLE,
	/* worker is executing a command */
	WRKR_WORKING,
	/* worker process has ended */
	WRKR_TERMINATED
} T_WorkerStatus;
85 
/* True for the two states in which the worker is alive: idle or working */
#define WORKER_IS_RUNNING(st) \
	((st) == WRKR_IDLE || (st) == WRKR_WORKING)
88 
89 /*
90  * Private per-parallel-worker state (typedef for this is in parallel.h).
91  *
92  * Much of this is valid only in the master process (or, on Windows, should
93  * be touched only by the master thread).  But the AH field should be touched
94  * only by workers.  The pipe descriptors are valid everywhere.
95  */
96 struct ParallelSlot
97 {
98 	T_WorkerStatus workerStatus;	/* see enum above */
99 
100 	/* These fields are valid if workerStatus == WRKR_WORKING: */
101 	ParallelCompletionPtr callback; /* function to call on completion */
102 	void	   *callback_data;	/* passthrough data for it */
103 
104 	ArchiveHandle *AH;			/* Archive data worker is using */
105 
106 	int			pipeRead;		/* master's end of the pipes */
107 	int			pipeWrite;
108 	int			pipeRevRead;	/* child's end of the pipes */
109 	int			pipeRevWrite;
110 
111 	/* Child process/thread identity info: */
112 #ifdef WIN32
113 	uintptr_t	hThread;
114 	unsigned int threadId;
115 #else
116 	pid_t		pid;
117 #endif
118 };
119 
120 #ifdef WIN32
121 
122 /*
123  * Structure to hold info passed by _beginthreadex() to the function it calls
124  * via its single allowed argument.
125  */
typedef struct
{
	ArchiveHandle *AH;			/* master's archive handle (and DB connection) */
	ParallelSlot *slot;			/* the parallel slot assigned to this worker */
} WorkerInfo;
131 
132 /* Windows implementation of pipe access */
133 static int	pgpipe(int handles[2]);
134 static int	piperead(int s, char *buf, int len);
135 #define pipewrite(a,b,c)	send(a,b,c,0)
136 
137 #else							/* !WIN32 */
138 
139 /* Non-Windows implementation of pipe access */
140 #define pgpipe(a)			pipe(a)
141 #define piperead(a,b,c)		read(a,b,c)
142 #define pipewrite(a,b,c)	write(a,b,c)
143 
144 #endif							/* WIN32 */
145 
146 /*
147  * State info for archive_close_connection() shutdown callback.
148  */
typedef struct ShutdownInformation
{
	ParallelState *pstate;		/* parallel state; NULL when not in parallel
								 * mode */
	Archive    *AHX;			/* archive whose DB connection gets closed at
								 * exit */
} ShutdownInformation;
154 
155 static ShutdownInformation shutdown_info;
156 
157 /*
158  * State info for signal handling.
159  * We assume signal_info initializes to zeroes.
160  *
161  * On Unix, myAH is the master DB connection in the master process, and the
162  * worker's own connection in worker processes.  On Windows, we have only one
163  * instance of signal_info, so myAH is the master connection and the worker
164  * connections must be dug out of pstate->parallelSlot[].
165  */
typedef struct DumpSignalInformation
{
	ArchiveHandle *myAH;		/* database connection to issue cancel for */
	ParallelState *pstate;		/* parallel state, if any */
	bool		handler_set;	/* signal handler set up in this process? */
#ifndef WIN32
	bool		am_worker;		/* am I a worker process?  (Unix only: set in
								 * the child right after fork) */
#endif
} DumpSignalInformation;
175 
176 static volatile DumpSignalInformation signal_info;
177 
178 #ifdef WIN32
179 static CRITICAL_SECTION signal_info_lock;
180 #endif
181 
182 /*
183  * Write a simple string to stderr --- must be safe in a signal handler.
184  * We ignore the write() result since there's not much we could do about it.
185  * Certain compilers make that harder than it ought to be.
186  */
#define write_stderr(str) \
	do { \
		const char *msg_ = (str); \
		int		ignored_ = write(fileno(stderr), msg_, strlen(msg_)); \
		(void) ignored_;	/* result deliberately discarded */ \
	} while (0)
194 
195 
196 #ifdef WIN32
197 /* file-scope variables */
198 static DWORD tls_index;
199 
200 /* globally visible variables (needed by exit_nicely) */
201 bool		parallel_init_done = false;
202 DWORD		mainThreadId;
203 #endif							/* WIN32 */
204 
205 static const char *modulename = gettext_noop("parallel archiver");
206 
207 /* Local function prototypes */
208 static ParallelSlot *GetMyPSlot(ParallelState *pstate);
209 static void archive_close_connection(int code, void *arg);
210 static void ShutdownWorkersHard(ParallelState *pstate);
211 static void WaitForTerminatingWorkers(ParallelState *pstate);
212 static void setup_cancel_handler(void);
213 static void set_cancel_pstate(ParallelState *pstate);
214 static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
215 static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
216 static int	GetIdleWorker(ParallelState *pstate);
217 static bool HasEveryWorkerTerminated(ParallelState *pstate);
218 static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
219 static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
220 static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
221 				bool do_wait);
222 static char *getMessageFromMaster(int pipefd[2]);
223 static void sendMessageToMaster(int pipefd[2], const char *str);
224 static int	select_loop(int maxFd, fd_set *workerset);
225 static char *getMessageFromWorker(ParallelState *pstate,
226 					 bool do_wait, int *worker);
227 static void sendMessageToWorker(ParallelState *pstate,
228 					int worker, const char *str);
229 static char *readMessageFromPipe(int fd);
230 
231 #define messageStartsWith(msg, prefix) \
232 	(strncmp(msg, prefix, strlen(prefix)) == 0)
233 #define messageEquals(msg, pattern) \
234 	(strcmp(msg, pattern) == 0)
235 
236 
237 /*
238  * Initialize parallel dump support --- should be called early in process
239  * startup.  (Currently, this is called whether or not we intend parallel
240  * activity.)
241  */
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
	WSADATA		wsaData;
	int			rc;

	/* Nothing to do if an earlier call already completed the setup */
	if (parallel_init_done)
		return;

	/* Thread support: reserve a TLS slot and remember the main thread */
	tls_index = TlsAlloc();
	mainThreadId = GetCurrentThreadId();

	/* Socket support: bring up Winsock 2.2 */
	rc = WSAStartup(MAKEWORD(2, 2), &wsaData);
	if (rc != 0)
	{
		fprintf(stderr, _("%s: WSAStartup failed: %d\n"), progname, rc);
		exit_nicely(1);
	}

	parallel_init_done = true;
#endif
}
267 
268 /*
269  * Find the ParallelSlot for the current worker process or thread.
270  *
271  * Returns NULL if no matching slot is found (this implies we're the master).
272  */
273 static ParallelSlot *
GetMyPSlot(ParallelState * pstate)274 GetMyPSlot(ParallelState *pstate)
275 {
276 	int			i;
277 
278 	for (i = 0; i < pstate->numWorkers; i++)
279 	{
280 #ifdef WIN32
281 		if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
282 #else
283 		if (pstate->parallelSlot[i].pid == getpid())
284 #endif
285 			return &(pstate->parallelSlot[i]);
286 	}
287 
288 	return NULL;
289 }
290 
291 /*
292  * A thread-local version of getLocalPQExpBuffer().
293  *
294  * Non-reentrant but reduces memory leakage: we'll consume one buffer per
295  * thread, which is much better than one per fmtId/fmtQualifiedId call.
296  */
297 #ifdef WIN32
298 static PQExpBuffer
getThreadLocalPQExpBuffer(void)299 getThreadLocalPQExpBuffer(void)
300 {
301 	/*
302 	 * The Tls code goes awry if we use a static var, so we provide for both
303 	 * static and auto, and omit any use of the static var when using Tls. We
304 	 * rely on TlsGetValue() to return 0 if the value is not yet set.
305 	 */
306 	static PQExpBuffer s_id_return = NULL;
307 	PQExpBuffer id_return;
308 
309 	if (parallel_init_done)
310 		id_return = (PQExpBuffer) TlsGetValue(tls_index);
311 	else
312 		id_return = s_id_return;
313 
314 	if (id_return)				/* first time through? */
315 	{
316 		/* same buffer, just wipe contents */
317 		resetPQExpBuffer(id_return);
318 	}
319 	else
320 	{
321 		/* new buffer */
322 		id_return = createPQExpBuffer();
323 		if (parallel_init_done)
324 			TlsSetValue(tls_index, id_return);
325 		else
326 			s_id_return = id_return;
327 	}
328 
329 	return id_return;
330 }
331 #endif							/* WIN32 */
332 
333 /*
334  * pg_dump and pg_restore call this to register the cleanup handler
335  * as soon as they've created the ArchiveHandle.
336  */
337 void
on_exit_close_archive(Archive * AHX)338 on_exit_close_archive(Archive *AHX)
339 {
340 	shutdown_info.AHX = AHX;
341 	on_exit_nicely(archive_close_connection, &shutdown_info);
342 }
343 
344 /*
345  * on_exit_nicely handler for shutting down database connections and
346  * worker processes cleanly.
347  */
348 static void
archive_close_connection(int code,void * arg)349 archive_close_connection(int code, void *arg)
350 {
351 	ShutdownInformation *si = (ShutdownInformation *) arg;
352 
353 	if (si->pstate)
354 	{
355 		/* In parallel mode, must figure out who we are */
356 		ParallelSlot *slot = GetMyPSlot(si->pstate);
357 
358 		if (!slot)
359 		{
360 			/*
361 			 * We're the master.  Forcibly shut down workers, then close our
362 			 * own database connection, if any.
363 			 */
364 			ShutdownWorkersHard(si->pstate);
365 
366 			if (si->AHX)
367 				DisconnectDatabase(si->AHX);
368 		}
369 		else
370 		{
371 			/*
372 			 * We're a worker.  Shut down our own DB connection if any.  On
373 			 * Windows, we also have to close our communication sockets, to
374 			 * emulate what will happen on Unix when the worker process exits.
375 			 * (Without this, if this is a premature exit, the master would
376 			 * fail to detect it because there would be no EOF condition on
377 			 * the other end of the pipe.)
378 			 */
379 			if (slot->AH)
380 				DisconnectDatabase(&(slot->AH->public));
381 
382 #ifdef WIN32
383 			closesocket(slot->pipeRevRead);
384 			closesocket(slot->pipeRevWrite);
385 #endif
386 		}
387 	}
388 	else
389 	{
390 		/* Non-parallel operation: just kill the master DB connection */
391 		if (si->AHX)
392 			DisconnectDatabase(si->AHX);
393 	}
394 }
395 
396 /*
397  * Forcibly shut down any remaining workers, waiting for them to finish.
398  *
399  * Note that we don't expect to come here during normal exit (the workers
400  * should be long gone, and the ParallelState too).  We're only here in an
401  * exit_horribly() situation, so intervening to cancel active commands is
402  * appropriate.
403  */
404 static void
ShutdownWorkersHard(ParallelState * pstate)405 ShutdownWorkersHard(ParallelState *pstate)
406 {
407 	int			i;
408 
409 	/*
410 	 * Close our write end of the sockets so that any workers waiting for
411 	 * commands know they can exit.  (Note: some of the pipeWrite fields might
412 	 * still be zero, if we failed to initialize all the workers.  Hence, just
413 	 * ignore errors here.)
414 	 */
415 	for (i = 0; i < pstate->numWorkers; i++)
416 		closesocket(pstate->parallelSlot[i].pipeWrite);
417 
418 	/*
419 	 * Force early termination of any commands currently in progress.
420 	 */
421 #ifndef WIN32
422 	/* On non-Windows, send SIGTERM to each worker process. */
423 	for (i = 0; i < pstate->numWorkers; i++)
424 	{
425 		pid_t		pid = pstate->parallelSlot[i].pid;
426 
427 		if (pid != 0)
428 			kill(pid, SIGTERM);
429 	}
430 #else
431 
432 	/*
433 	 * On Windows, send query cancels directly to the workers' backends.  Use
434 	 * a critical section to ensure worker threads don't change state.
435 	 */
436 	EnterCriticalSection(&signal_info_lock);
437 	for (i = 0; i < pstate->numWorkers; i++)
438 	{
439 		ArchiveHandle *AH = pstate->parallelSlot[i].AH;
440 		char		errbuf[1];
441 
442 		if (AH != NULL && AH->connCancel != NULL)
443 			(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
444 	}
445 	LeaveCriticalSection(&signal_info_lock);
446 #endif
447 
448 	/* Now wait for them to terminate. */
449 	WaitForTerminatingWorkers(pstate);
450 }
451 
452 /*
453  * Wait for all workers to terminate.
454  */
455 static void
WaitForTerminatingWorkers(ParallelState * pstate)456 WaitForTerminatingWorkers(ParallelState *pstate)
457 {
458 	while (!HasEveryWorkerTerminated(pstate))
459 	{
460 		ParallelSlot *slot = NULL;
461 		int			j;
462 
463 #ifndef WIN32
464 		/* On non-Windows, use wait() to wait for next worker to end */
465 		int			status;
466 		pid_t		pid = wait(&status);
467 
468 		/* Find dead worker's slot, and clear the PID field */
469 		for (j = 0; j < pstate->numWorkers; j++)
470 		{
471 			slot = &(pstate->parallelSlot[j]);
472 			if (slot->pid == pid)
473 			{
474 				slot->pid = 0;
475 				break;
476 			}
477 		}
478 #else							/* WIN32 */
479 		/* On Windows, we must use WaitForMultipleObjects() */
480 		HANDLE	   *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
481 		int			nrun = 0;
482 		DWORD		ret;
483 		uintptr_t	hThread;
484 
485 		for (j = 0; j < pstate->numWorkers; j++)
486 		{
487 			if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
488 			{
489 				lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
490 				nrun++;
491 			}
492 		}
493 		ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
494 		Assert(ret != WAIT_FAILED);
495 		hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
496 		free(lpHandles);
497 
498 		/* Find dead worker's slot, and clear the hThread field */
499 		for (j = 0; j < pstate->numWorkers; j++)
500 		{
501 			slot = &(pstate->parallelSlot[j]);
502 			if (slot->hThread == hThread)
503 			{
504 				/* For cleanliness, close handles for dead threads */
505 				CloseHandle((HANDLE) slot->hThread);
506 				slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
507 				break;
508 			}
509 		}
510 #endif							/* WIN32 */
511 
512 		/* On all platforms, update workerStatus and te[] as well */
513 		Assert(j < pstate->numWorkers);
514 		slot->workerStatus = WRKR_TERMINATED;
515 		pstate->te[j] = NULL;
516 	}
517 }
518 
519 
520 /*
521  * Code for responding to cancel interrupts (SIGINT, control-C, etc)
522  *
523  * This doesn't quite belong in this module, but it needs access to the
524  * ParallelState data, so there's not really a better place either.
525  *
526  * When we get a cancel interrupt, we could just die, but in pg_restore that
527  * could leave a SQL command (e.g., CREATE INDEX on a large table) running
528  * for a long time.  Instead, we try to send a cancel request and then die.
529  * pg_dump probably doesn't really need this, but we might as well use it
530  * there too.  Note that sending the cancel directly from the signal handler
531  * is safe because PQcancel() is written to make it so.
532  *
533  * In parallel operation on Unix, each process is responsible for canceling
534  * its own connection (this must be so because nobody else has access to it).
535  * Furthermore, the master process should attempt to forward its signal to
536  * each child.  In simple manual use of pg_dump/pg_restore, forwarding isn't
537  * needed because typing control-C at the console would deliver SIGINT to
538  * every member of the terminal process group --- but in other scenarios it
539  * might be that only the master gets signaled.
540  *
541  * On Windows, the cancel handler runs in a separate thread, because that's
542  * how SetConsoleCtrlHandler works.  We make it stop worker threads, send
543  * cancels on all active connections, and then return FALSE, which will allow
544  * the process to die.  For safety's sake, we use a critical section to
545  * protect the PGcancel structures against being changed while the signal
546  * thread runs.
547  */
548 
549 #ifndef WIN32
550 
551 /*
552  * Signal handler (Unix only)
553  */
554 static void
sigTermHandler(SIGNAL_ARGS)555 sigTermHandler(SIGNAL_ARGS)
556 {
557 	int			i;
558 	char		errbuf[1];
559 
560 	/*
561 	 * Some platforms allow delivery of new signals to interrupt an active
562 	 * signal handler.  That could muck up our attempt to send PQcancel, so
563 	 * disable the signals that setup_cancel_handler enabled.
564 	 */
565 	pqsignal(SIGINT, SIG_IGN);
566 	pqsignal(SIGTERM, SIG_IGN);
567 	pqsignal(SIGQUIT, SIG_IGN);
568 
569 	/*
570 	 * If we're in the master, forward signal to all workers.  (It seems best
571 	 * to do this before PQcancel; killing the master transaction will result
572 	 * in invalid-snapshot errors from active workers, which maybe we can
573 	 * quiet by killing workers first.)  Ignore any errors.
574 	 */
575 	if (signal_info.pstate != NULL)
576 	{
577 		for (i = 0; i < signal_info.pstate->numWorkers; i++)
578 		{
579 			pid_t		pid = signal_info.pstate->parallelSlot[i].pid;
580 
581 			if (pid != 0)
582 				kill(pid, SIGTERM);
583 		}
584 	}
585 
586 	/*
587 	 * Send QueryCancel if we have a connection to send to.  Ignore errors,
588 	 * there's not much we can do about them anyway.
589 	 */
590 	if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
591 		(void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
592 
593 	/*
594 	 * Report we're quitting, using nothing more complicated than write(2).
595 	 * When in parallel operation, only the master process should do this.
596 	 */
597 	if (!signal_info.am_worker)
598 	{
599 		if (progname)
600 		{
601 			write_stderr(progname);
602 			write_stderr(": ");
603 		}
604 		write_stderr("terminated by user\n");
605 	}
606 
607 	/*
608 	 * And die, using _exit() not exit() because the latter will invoke atexit
609 	 * handlers that can fail if we interrupted related code.
610 	 */
611 	_exit(1);
612 }
613 
614 /*
615  * Enable cancel interrupt handler, if not already done.
616  */
617 static void
setup_cancel_handler(void)618 setup_cancel_handler(void)
619 {
620 	/*
621 	 * When forking, signal_info.handler_set will propagate into the new
622 	 * process, but that's fine because the signal handler state does too.
623 	 */
624 	if (!signal_info.handler_set)
625 	{
626 		signal_info.handler_set = true;
627 
628 		pqsignal(SIGINT, sigTermHandler);
629 		pqsignal(SIGTERM, sigTermHandler);
630 		pqsignal(SIGQUIT, sigTermHandler);
631 	}
632 }
633 
634 #else							/* WIN32 */
635 
636 /*
637  * Console interrupt handler --- runs in a newly-started thread.
638  *
639  * After stopping other threads and sending cancel requests on all open
640  * connections, we return FALSE which will allow the default ExitProcess()
641  * action to be taken.
642  */
643 static BOOL WINAPI
consoleHandler(DWORD dwCtrlType)644 consoleHandler(DWORD dwCtrlType)
645 {
646 	int			i;
647 	char		errbuf[1];
648 
649 	if (dwCtrlType == CTRL_C_EVENT ||
650 		dwCtrlType == CTRL_BREAK_EVENT)
651 	{
652 		/* Critical section prevents changing data we look at here */
653 		EnterCriticalSection(&signal_info_lock);
654 
655 		/*
656 		 * If in parallel mode, stop worker threads and send QueryCancel to
657 		 * their connected backends.  The main point of stopping the worker
658 		 * threads is to keep them from reporting the query cancels as errors,
659 		 * which would clutter the user's screen.  We needn't stop the master
660 		 * thread since it won't be doing much anyway.  Do this before
661 		 * canceling the main transaction, else we might get invalid-snapshot
662 		 * errors reported before we can stop the workers.  Ignore errors,
663 		 * there's not much we can do about them anyway.
664 		 */
665 		if (signal_info.pstate != NULL)
666 		{
667 			for (i = 0; i < signal_info.pstate->numWorkers; i++)
668 			{
669 				ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
670 				ArchiveHandle *AH = slot->AH;
671 				HANDLE		hThread = (HANDLE) slot->hThread;
672 
673 				/*
674 				 * Using TerminateThread here may leave some resources leaked,
675 				 * but it doesn't matter since we're about to end the whole
676 				 * process.
677 				 */
678 				if (hThread != INVALID_HANDLE_VALUE)
679 					TerminateThread(hThread, 0);
680 
681 				if (AH != NULL && AH->connCancel != NULL)
682 					(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
683 			}
684 		}
685 
686 		/*
687 		 * Send QueryCancel to master connection, if enabled.  Ignore errors,
688 		 * there's not much we can do about them anyway.
689 		 */
690 		if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
691 			(void) PQcancel(signal_info.myAH->connCancel,
692 							errbuf, sizeof(errbuf));
693 
694 		LeaveCriticalSection(&signal_info_lock);
695 
696 		/*
697 		 * Report we're quitting, using nothing more complicated than
698 		 * write(2).  (We might be able to get away with using write_msg()
699 		 * here, but since we terminated other threads uncleanly above, it
700 		 * seems better to assume as little as possible.)
701 		 */
702 		if (progname)
703 		{
704 			write_stderr(progname);
705 			write_stderr(": ");
706 		}
707 		write_stderr("terminated by user\n");
708 	}
709 
710 	/* Always return FALSE to allow signal handling to continue */
711 	return FALSE;
712 }
713 
714 /*
715  * Enable cancel interrupt handler, if not already done.
716  */
717 static void
setup_cancel_handler(void)718 setup_cancel_handler(void)
719 {
720 	if (!signal_info.handler_set)
721 	{
722 		signal_info.handler_set = true;
723 
724 		InitializeCriticalSection(&signal_info_lock);
725 
726 		SetConsoleCtrlHandler(consoleHandler, TRUE);
727 	}
728 }
729 
730 #endif							/* WIN32 */
731 
732 
733 /*
734  * set_archive_cancel_info
735  *
736  * Fill AH->connCancel with cancellation info for the specified database
737  * connection; or clear it if conn is NULL.
738  */
739 void
set_archive_cancel_info(ArchiveHandle * AH,PGconn * conn)740 set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
741 {
742 	PGcancel   *oldConnCancel;
743 
744 	/*
745 	 * Activate the interrupt handler if we didn't yet in this process.  On
746 	 * Windows, this also initializes signal_info_lock; therefore it's
747 	 * important that this happen at least once before we fork off any
748 	 * threads.
749 	 */
750 	setup_cancel_handler();
751 
752 	/*
753 	 * On Unix, we assume that storing a pointer value is atomic with respect
754 	 * to any possible signal interrupt.  On Windows, use a critical section.
755 	 */
756 
757 #ifdef WIN32
758 	EnterCriticalSection(&signal_info_lock);
759 #endif
760 
761 	/* Free the old one if we have one */
762 	oldConnCancel = AH->connCancel;
763 	/* be sure interrupt handler doesn't use pointer while freeing */
764 	AH->connCancel = NULL;
765 
766 	if (oldConnCancel != NULL)
767 		PQfreeCancel(oldConnCancel);
768 
769 	/* Set the new one if specified */
770 	if (conn)
771 		AH->connCancel = PQgetCancel(conn);
772 
773 	/*
774 	 * On Unix, there's only ever one active ArchiveHandle per process, so we
775 	 * can just set signal_info.myAH unconditionally.  On Windows, do that
776 	 * only in the main thread; worker threads have to make sure their
777 	 * ArchiveHandle appears in the pstate data, which is dealt with in
778 	 * RunWorker().
779 	 */
780 #ifndef WIN32
781 	signal_info.myAH = AH;
782 #else
783 	if (mainThreadId == GetCurrentThreadId())
784 		signal_info.myAH = AH;
785 #endif
786 
787 #ifdef WIN32
788 	LeaveCriticalSection(&signal_info_lock);
789 #endif
790 }
791 
792 /*
793  * set_cancel_pstate
794  *
795  * Set signal_info.pstate to point to the specified ParallelState, if any.
796  * We need this mainly to have an interlock against Windows signal thread.
797  */
798 static void
set_cancel_pstate(ParallelState * pstate)799 set_cancel_pstate(ParallelState *pstate)
800 {
801 #ifdef WIN32
802 	EnterCriticalSection(&signal_info_lock);
803 #endif
804 
805 	signal_info.pstate = pstate;
806 
807 #ifdef WIN32
808 	LeaveCriticalSection(&signal_info_lock);
809 #endif
810 }
811 
812 /*
813  * set_cancel_slot_archive
814  *
815  * Set ParallelSlot's AH field to point to the specified archive, if any.
816  * We need this mainly to have an interlock against Windows signal thread.
817  */
818 static void
set_cancel_slot_archive(ParallelSlot * slot,ArchiveHandle * AH)819 set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
820 {
821 #ifdef WIN32
822 	EnterCriticalSection(&signal_info_lock);
823 #endif
824 
825 	slot->AH = AH;
826 
827 #ifdef WIN32
828 	LeaveCriticalSection(&signal_info_lock);
829 #endif
830 }
831 
832 
833 /*
834  * This function is called by both Unix and Windows variants to set up
835  * and run a worker process.  Caller should exit the process (or thread)
836  * upon return.
837  */
838 static void
RunWorker(ArchiveHandle * AH,ParallelSlot * slot)839 RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
840 {
841 	int			pipefd[2];
842 
843 	/* fetch child ends of pipes */
844 	pipefd[PIPE_READ] = slot->pipeRevRead;
845 	pipefd[PIPE_WRITE] = slot->pipeRevWrite;
846 
847 	/*
848 	 * Clone the archive so that we have our own state to work with, and in
849 	 * particular our own database connection.
850 	 *
851 	 * We clone on Unix as well as Windows, even though technically we don't
852 	 * need to because fork() gives us a copy in our own address space
853 	 * already.  But CloneArchive resets the state information and also clones
854 	 * the database connection which both seem kinda helpful.
855 	 */
856 	AH = CloneArchive(AH);
857 
858 	/* Remember cloned archive where signal handler can find it */
859 	set_cancel_slot_archive(slot, AH);
860 
861 	/*
862 	 * Call the setup worker function that's defined in the ArchiveHandle.
863 	 */
864 	(AH->SetupWorkerPtr) ((Archive *) AH);
865 
866 	/*
867 	 * Execute commands until done.
868 	 */
869 	WaitForCommands(AH, pipefd);
870 
871 	/*
872 	 * Disconnect from database and clean up.
873 	 */
874 	set_cancel_slot_archive(slot, NULL);
875 	DisconnectDatabase(&(AH->public));
876 	DeCloneArchive(AH);
877 }
878 
879 /*
880  * Thread base function for Windows
881  */
882 #ifdef WIN32
883 static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo * wi)884 init_spawned_worker_win32(WorkerInfo *wi)
885 {
886 	ArchiveHandle *AH = wi->AH;
887 	ParallelSlot *slot = wi->slot;
888 
889 	/* Don't need WorkerInfo anymore */
890 	free(wi);
891 
892 	/* Run the worker ... */
893 	RunWorker(AH, slot);
894 
895 	/* Exit the thread */
896 	_endthreadex(0);
897 	return 0;
898 }
899 #endif							/* WIN32 */
900 
901 /*
902  * This function starts a parallel dump or restore by spawning off the worker
903  * processes.  For Windows, it creates a number of threads; on Unix the
904  * workers are created with fork().
905  */
906 ParallelState *
ParallelBackupStart(ArchiveHandle * AH)907 ParallelBackupStart(ArchiveHandle *AH)
908 {
909 	ParallelState *pstate;
910 	int			i;
911 
912 	Assert(AH->public.numWorkers > 0);
913 
914 	pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
915 
916 	pstate->numWorkers = AH->public.numWorkers;
917 	pstate->te = NULL;
918 	pstate->parallelSlot = NULL;
919 
920 	if (AH->public.numWorkers == 1)
921 		return pstate;
922 
923 	/* Create status arrays, being sure to initialize all fields to 0 */
924 	pstate->te = (TocEntry **)
925 		pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
926 	pstate->parallelSlot = (ParallelSlot *)
927 		pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
928 
929 #ifdef WIN32
930 	/* Make fmtId() and fmtQualifiedId() use thread-local storage */
931 	getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
932 #endif
933 
934 	/*
935 	 * Set the pstate in shutdown_info, to tell the exit handler that it must
936 	 * clean up workers as well as the main database connection.  But we don't
937 	 * set this in signal_info yet, because we don't want child processes to
938 	 * inherit non-NULL signal_info.pstate.
939 	 */
940 	shutdown_info.pstate = pstate;
941 
942 	/*
943 	 * Temporarily disable query cancellation on the master connection.  This
944 	 * ensures that child processes won't inherit valid AH->connCancel
945 	 * settings and thus won't try to issue cancels against the master's
946 	 * connection.  No harm is done if we fail while it's disabled, because
947 	 * the master connection is idle at this point anyway.
948 	 */
949 	set_archive_cancel_info(AH, NULL);
950 
951 	/* Ensure stdio state is quiesced before forking */
952 	fflush(NULL);
953 
954 	/* Create desired number of workers */
955 	for (i = 0; i < pstate->numWorkers; i++)
956 	{
957 #ifdef WIN32
958 		WorkerInfo *wi;
959 		uintptr_t	handle;
960 #else
961 		pid_t		pid;
962 #endif
963 		ParallelSlot *slot = &(pstate->parallelSlot[i]);
964 		int			pipeMW[2],
965 					pipeWM[2];
966 
967 		/* Create communication pipes for this worker */
968 		if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
969 			exit_horribly(modulename,
970 						  "could not create communication channels: %s\n",
971 						  strerror(errno));
972 
973 		/* master's ends of the pipes */
974 		slot->pipeRead = pipeWM[PIPE_READ];
975 		slot->pipeWrite = pipeMW[PIPE_WRITE];
976 		/* child's ends of the pipes */
977 		slot->pipeRevRead = pipeMW[PIPE_READ];
978 		slot->pipeRevWrite = pipeWM[PIPE_WRITE];
979 
980 #ifdef WIN32
981 		/* Create transient structure to pass args to worker function */
982 		wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
983 
984 		wi->AH = AH;
985 		wi->slot = slot;
986 
987 		handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
988 								wi, 0, &(slot->threadId));
989 		slot->hThread = handle;
990 		slot->workerStatus = WRKR_IDLE;
991 #else							/* !WIN32 */
992 		pid = fork();
993 		if (pid == 0)
994 		{
995 			/* we are the worker */
996 			int			j;
997 
998 			/* this is needed for GetMyPSlot() */
999 			slot->pid = getpid();
1000 
1001 			/* instruct signal handler that we're in a worker now */
1002 			signal_info.am_worker = true;
1003 
1004 			/* close read end of Worker -> Master */
1005 			closesocket(pipeWM[PIPE_READ]);
1006 			/* close write end of Master -> Worker */
1007 			closesocket(pipeMW[PIPE_WRITE]);
1008 
1009 			/*
1010 			 * Close all inherited fds for communication of the master with
1011 			 * previously-forked workers.
1012 			 */
1013 			for (j = 0; j < i; j++)
1014 			{
1015 				closesocket(pstate->parallelSlot[j].pipeRead);
1016 				closesocket(pstate->parallelSlot[j].pipeWrite);
1017 			}
1018 
1019 			/* Run the worker ... */
1020 			RunWorker(AH, slot);
1021 
1022 			/* We can just exit(0) when done */
1023 			exit(0);
1024 		}
1025 		else if (pid < 0)
1026 		{
1027 			/* fork failed */
1028 			exit_horribly(modulename,
1029 						  "could not create worker process: %s\n",
1030 						  strerror(errno));
1031 		}
1032 
1033 		/* In Master after successful fork */
1034 		slot->pid = pid;
1035 		slot->workerStatus = WRKR_IDLE;
1036 
1037 		/* close read end of Master -> Worker */
1038 		closesocket(pipeMW[PIPE_READ]);
1039 		/* close write end of Worker -> Master */
1040 		closesocket(pipeWM[PIPE_WRITE]);
1041 #endif							/* WIN32 */
1042 	}
1043 
1044 	/*
1045 	 * Having forked off the workers, disable SIGPIPE so that master isn't
1046 	 * killed if it tries to send a command to a dead worker.  We don't want
1047 	 * the workers to inherit this setting, though.
1048 	 */
1049 #ifndef WIN32
1050 	pqsignal(SIGPIPE, SIG_IGN);
1051 #endif
1052 
1053 	/*
1054 	 * Re-establish query cancellation on the master connection.
1055 	 */
1056 	set_archive_cancel_info(AH, AH->connection);
1057 
1058 	/*
1059 	 * Tell the cancel signal handler to forward signals to worker processes,
1060 	 * too.  (As with query cancel, we did not need this earlier because the
1061 	 * workers have not yet been given anything to do; if we die before this
1062 	 * point, any already-started workers will see EOF and quit promptly.)
1063 	 */
1064 	set_cancel_pstate(pstate);
1065 
1066 	return pstate;
1067 }
1068 
1069 /*
1070  * Close down a parallel dump or restore.
1071  */
1072 void
ParallelBackupEnd(ArchiveHandle * AH,ParallelState * pstate)1073 ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
1074 {
1075 	int			i;
1076 
1077 	/* No work if non-parallel */
1078 	if (pstate->numWorkers == 1)
1079 		return;
1080 
1081 	/* There should not be any unfinished jobs */
1082 	Assert(IsEveryWorkerIdle(pstate));
1083 
1084 	/* Close the sockets so that the workers know they can exit */
1085 	for (i = 0; i < pstate->numWorkers; i++)
1086 	{
1087 		closesocket(pstate->parallelSlot[i].pipeRead);
1088 		closesocket(pstate->parallelSlot[i].pipeWrite);
1089 	}
1090 
1091 	/* Wait for them to exit */
1092 	WaitForTerminatingWorkers(pstate);
1093 
1094 	/*
1095 	 * Unlink pstate from shutdown_info, so the exit handler will not try to
1096 	 * use it; and likewise unlink from signal_info.
1097 	 */
1098 	shutdown_info.pstate = NULL;
1099 	set_cancel_pstate(NULL);
1100 
1101 	/* Release state (mere neatnik-ism, since we're about to terminate) */
1102 	free(pstate->te);
1103 	free(pstate->parallelSlot);
1104 	free(pstate);
1105 }
1106 
1107 /*
1108  * These next four functions handle construction and parsing of the command
1109  * strings and response strings for parallel workers.
1110  *
1111  * Currently, these can be the same regardless of which archive format we are
1112  * processing.  In future, we might want to let format modules override these
1113  * functions to add format-specific data to a command or response.
1114  */
1115 
1116 /*
1117  * buildWorkerCommand: format a command string to send to a worker.
1118  *
1119  * The string is built in the caller-supplied buffer of size buflen.
1120  */
1121 static void
buildWorkerCommand(ArchiveHandle * AH,TocEntry * te,T_Action act,char * buf,int buflen)1122 buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
1123 				   char *buf, int buflen)
1124 {
1125 	if (act == ACT_DUMP)
1126 		snprintf(buf, buflen, "DUMP %d", te->dumpId);
1127 	else if (act == ACT_RESTORE)
1128 		snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1129 	else
1130 		Assert(false);
1131 }
1132 
1133 /*
1134  * parseWorkerCommand: interpret a command string in a worker.
1135  */
1136 static void
parseWorkerCommand(ArchiveHandle * AH,TocEntry ** te,T_Action * act,const char * msg)1137 parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
1138 				   const char *msg)
1139 {
1140 	DumpId		dumpId;
1141 	int			nBytes;
1142 
1143 	if (messageStartsWith(msg, "DUMP "))
1144 	{
1145 		*act = ACT_DUMP;
1146 		sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1147 		Assert(nBytes == strlen(msg));
1148 		*te = getTocEntryByDumpId(AH, dumpId);
1149 		Assert(*te != NULL);
1150 	}
1151 	else if (messageStartsWith(msg, "RESTORE "))
1152 	{
1153 		*act = ACT_RESTORE;
1154 		sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1155 		Assert(nBytes == strlen(msg));
1156 		*te = getTocEntryByDumpId(AH, dumpId);
1157 		Assert(*te != NULL);
1158 	}
1159 	else
1160 		exit_horribly(modulename,
1161 					  "unrecognized command received from master: \"%s\"\n",
1162 					  msg);
1163 }
1164 
1165 /*
1166  * buildWorkerResponse: format a response string to send to the master.
1167  *
1168  * The string is built in the caller-supplied buffer of size buflen.
1169  */
1170 static void
buildWorkerResponse(ArchiveHandle * AH,TocEntry * te,T_Action act,int status,char * buf,int buflen)1171 buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
1172 					char *buf, int buflen)
1173 {
1174 	snprintf(buf, buflen, "OK %d %d %d",
1175 			 te->dumpId,
1176 			 status,
1177 			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1178 }
1179 
1180 /*
1181  * parseWorkerResponse: parse the status message returned by a worker.
1182  *
1183  * Returns the integer status code, and may update fields of AH and/or te.
1184  */
1185 static int
parseWorkerResponse(ArchiveHandle * AH,TocEntry * te,const char * msg)1186 parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
1187 					const char *msg)
1188 {
1189 	DumpId		dumpId;
1190 	int			nBytes,
1191 				n_errors;
1192 	int			status = 0;
1193 
1194 	if (messageStartsWith(msg, "OK "))
1195 	{
1196 		sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1197 
1198 		Assert(dumpId == te->dumpId);
1199 		Assert(nBytes == strlen(msg));
1200 
1201 		AH->public.n_errors += n_errors;
1202 	}
1203 	else
1204 		exit_horribly(modulename,
1205 					  "invalid message received from worker: \"%s\"\n",
1206 					  msg);
1207 
1208 	return status;
1209 }
1210 
1211 /*
1212  * Dispatch a job to some free worker.
1213  *
1214  * te is the TocEntry to be processed, act is the action to be taken on it.
1215  * callback is the function to call on completion of the job.
1216  *
1217  * If no worker is currently available, this will block, and previously
1218  * registered callback functions may be called.
1219  */
1220 void
DispatchJobForTocEntry(ArchiveHandle * AH,ParallelState * pstate,TocEntry * te,T_Action act,ParallelCompletionPtr callback,void * callback_data)1221 DispatchJobForTocEntry(ArchiveHandle *AH,
1222 					   ParallelState *pstate,
1223 					   TocEntry *te,
1224 					   T_Action act,
1225 					   ParallelCompletionPtr callback,
1226 					   void *callback_data)
1227 {
1228 	int			worker;
1229 	char		buf[256];
1230 
1231 	/* Get a worker, waiting if none are idle */
1232 	while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1233 		WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1234 
1235 	/* Construct and send command string */
1236 	buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1237 
1238 	sendMessageToWorker(pstate, worker, buf);
1239 
1240 	/* Remember worker is busy, and which TocEntry it's working on */
1241 	pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1242 	pstate->parallelSlot[worker].callback = callback;
1243 	pstate->parallelSlot[worker].callback_data = callback_data;
1244 	pstate->te[worker] = te;
1245 }
1246 
1247 /*
1248  * Find an idle worker and return its slot number.
1249  * Return NO_SLOT if none are idle.
1250  */
1251 static int
GetIdleWorker(ParallelState * pstate)1252 GetIdleWorker(ParallelState *pstate)
1253 {
1254 	int			i;
1255 
1256 	for (i = 0; i < pstate->numWorkers; i++)
1257 	{
1258 		if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1259 			return i;
1260 	}
1261 	return NO_SLOT;
1262 }
1263 
1264 /*
1265  * Return true iff no worker is running.
1266  */
1267 static bool
HasEveryWorkerTerminated(ParallelState * pstate)1268 HasEveryWorkerTerminated(ParallelState *pstate)
1269 {
1270 	int			i;
1271 
1272 	for (i = 0; i < pstate->numWorkers; i++)
1273 	{
1274 		if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1275 			return false;
1276 	}
1277 	return true;
1278 }
1279 
1280 /*
1281  * Return true iff every worker is in the WRKR_IDLE state.
1282  */
1283 bool
IsEveryWorkerIdle(ParallelState * pstate)1284 IsEveryWorkerIdle(ParallelState *pstate)
1285 {
1286 	int			i;
1287 
1288 	for (i = 0; i < pstate->numWorkers; i++)
1289 	{
1290 		if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1291 			return false;
1292 	}
1293 	return true;
1294 }
1295 
1296 /*
1297  * Acquire lock on a table to be dumped by a worker process.
1298  *
1299  * The master process is already holding an ACCESS SHARE lock.  Ordinarily
1300  * it's no problem for a worker to get one too, but if anything else besides
1301  * pg_dump is running, there's a possible deadlock:
1302  *
1303  * 1) Master dumps the schema and locks all tables in ACCESS SHARE mode.
1304  * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1305  *	  because the master holds a conflicting ACCESS SHARE lock).
1306  * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1307  *	  The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1308  * 4) Now we have a deadlock, since the master is effectively waiting for
1309  *	  the worker.  The server cannot detect that, however.
1310  *
1311  * To prevent an infinite wait, prior to touching a table in a worker, request
1312  * a lock in ACCESS SHARE mode but with NOWAIT.  If we don't get the lock,
1313  * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1314  * so we have a deadlock.  We must fail the backup in that case.
1315  */
1316 static void
lockTableForWorker(ArchiveHandle * AH,TocEntry * te)1317 lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
1318 {
1319 	const char *qualId;
1320 	PQExpBuffer query;
1321 	PGresult   *res;
1322 
1323 	/* Nothing to do for BLOBS */
1324 	if (strcmp(te->desc, "BLOBS") == 0)
1325 		return;
1326 
1327 	query = createPQExpBuffer();
1328 
1329 	qualId = fmtQualifiedId(te->namespace, te->tag);
1330 
1331 	appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1332 					  qualId);
1333 
1334 	res = PQexec(AH->connection, query->data);
1335 
1336 	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1337 		exit_horribly(modulename,
1338 					  "could not obtain lock on relation \"%s\"\n"
1339 					  "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1340 					  "on the table after the pg_dump parent process had gotten the "
1341 					  "initial ACCESS SHARE lock on the table.\n", qualId);
1342 
1343 	PQclear(res);
1344 	destroyPQExpBuffer(query);
1345 }
1346 
1347 /*
1348  * WaitForCommands: main routine for a worker process.
1349  *
1350  * Read and execute commands from the master until we see EOF on the pipe.
1351  */
1352 static void
WaitForCommands(ArchiveHandle * AH,int pipefd[2])1353 WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1354 {
1355 	char	   *command;
1356 	TocEntry   *te;
1357 	T_Action	act;
1358 	int			status = 0;
1359 	char		buf[256];
1360 
1361 	for (;;)
1362 	{
1363 		if (!(command = getMessageFromMaster(pipefd)))
1364 		{
1365 			/* EOF, so done */
1366 			return;
1367 		}
1368 
1369 		/* Decode the command */
1370 		parseWorkerCommand(AH, &te, &act, command);
1371 
1372 		if (act == ACT_DUMP)
1373 		{
1374 			/* Acquire lock on this table within the worker's session */
1375 			lockTableForWorker(AH, te);
1376 
1377 			/* Perform the dump command */
1378 			status = (AH->WorkerJobDumpPtr) (AH, te);
1379 		}
1380 		else if (act == ACT_RESTORE)
1381 		{
1382 			/* Perform the restore command */
1383 			status = (AH->WorkerJobRestorePtr) (AH, te);
1384 		}
1385 		else
1386 			Assert(false);
1387 
1388 		/* Return status to master */
1389 		buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1390 
1391 		sendMessageToMaster(pipefd, buf);
1392 
1393 		/* command was pg_malloc'd and we are responsible for free()ing it. */
1394 		free(command);
1395 	}
1396 }
1397 
1398 /*
1399  * Check for status messages from workers.
1400  *
1401  * If do_wait is true, wait to get a status message; otherwise, just return
1402  * immediately if there is none available.
1403  *
1404  * When we get a status message, we pass the status code to the callback
1405  * function that was specified to DispatchJobForTocEntry, then reset the
1406  * worker status to IDLE.
1407  *
1408  * Returns true if we collected a status message, else false.
1409  *
1410  * XXX is it worth checking for more than one status message per call?
1411  * It seems somewhat unlikely that multiple workers would finish at exactly
1412  * the same time.
1413  */
1414 static bool
ListenToWorkers(ArchiveHandle * AH,ParallelState * pstate,bool do_wait)1415 ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1416 {
1417 	int			worker;
1418 	char	   *msg;
1419 
1420 	/* Try to collect a status message */
1421 	msg = getMessageFromWorker(pstate, do_wait, &worker);
1422 
1423 	if (!msg)
1424 	{
1425 		/* If do_wait is true, we must have detected EOF on some socket */
1426 		if (do_wait)
1427 			exit_horribly(modulename, "a worker process died unexpectedly\n");
1428 		return false;
1429 	}
1430 
1431 	/* Process it and update our idea of the worker's status */
1432 	if (messageStartsWith(msg, "OK "))
1433 	{
1434 		ParallelSlot *slot = &pstate->parallelSlot[worker];
1435 		TocEntry   *te = pstate->te[worker];
1436 		int			status;
1437 
1438 		status = parseWorkerResponse(AH, te, msg);
1439 		slot->callback(AH, te, status, slot->callback_data);
1440 		slot->workerStatus = WRKR_IDLE;
1441 		pstate->te[worker] = NULL;
1442 	}
1443 	else
1444 		exit_horribly(modulename,
1445 					  "invalid message received from worker: \"%s\"\n",
1446 					  msg);
1447 
1448 	/* Free the string returned from getMessageFromWorker */
1449 	free(msg);
1450 
1451 	return true;
1452 }
1453 
1454 /*
1455  * Check for status results from workers, waiting if necessary.
1456  *
1457  * Available wait modes are:
1458  * WFW_NO_WAIT: reap any available status, but don't block
1459  * WFW_GOT_STATUS: wait for at least one more worker to finish
1460  * WFW_ONE_IDLE: wait for at least one worker to be idle
1461  * WFW_ALL_IDLE: wait for all workers to be idle
1462  *
1463  * Any received results are passed to the callback specified to
1464  * DispatchJobForTocEntry.
1465  *
1466  * This function is executed in the master process.
1467  */
1468 void
WaitForWorkers(ArchiveHandle * AH,ParallelState * pstate,WFW_WaitOption mode)1469 WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
1470 {
1471 	bool		do_wait = false;
1472 
1473 	/*
1474 	 * In GOT_STATUS mode, always block waiting for a message, since we can't
1475 	 * return till we get something.  In other modes, we don't block the first
1476 	 * time through the loop.
1477 	 */
1478 	if (mode == WFW_GOT_STATUS)
1479 	{
1480 		/* Assert that caller knows what it's doing */
1481 		Assert(!IsEveryWorkerIdle(pstate));
1482 		do_wait = true;
1483 	}
1484 
1485 	for (;;)
1486 	{
1487 		/*
1488 		 * Check for status messages, even if we don't need to block.  We do
1489 		 * not try very hard to reap all available messages, though, since
1490 		 * there's unlikely to be more than one.
1491 		 */
1492 		if (ListenToWorkers(AH, pstate, do_wait))
1493 		{
1494 			/*
1495 			 * If we got a message, we are done by definition for GOT_STATUS
1496 			 * mode, and we can also be certain that there's at least one idle
1497 			 * worker.  So we're done in all but ALL_IDLE mode.
1498 			 */
1499 			if (mode != WFW_ALL_IDLE)
1500 				return;
1501 		}
1502 
1503 		/* Check whether we must wait for new status messages */
1504 		switch (mode)
1505 		{
1506 			case WFW_NO_WAIT:
1507 				return;			/* never wait */
1508 			case WFW_GOT_STATUS:
1509 				Assert(false);	/* can't get here, because we waited */
1510 				break;
1511 			case WFW_ONE_IDLE:
1512 				if (GetIdleWorker(pstate) != NO_SLOT)
1513 					return;
1514 				break;
1515 			case WFW_ALL_IDLE:
1516 				if (IsEveryWorkerIdle(pstate))
1517 					return;
1518 				break;
1519 		}
1520 
1521 		/* Loop back, and this time wait for something to happen */
1522 		do_wait = true;
1523 	}
1524 }
1525 
1526 /*
1527  * Read one command message from the master, blocking if necessary
1528  * until one is available, and return it as a malloc'd string.
1529  * On EOF, return NULL.
1530  *
1531  * This function is executed in worker processes.
1532  */
1533 static char *
getMessageFromMaster(int pipefd[2])1534 getMessageFromMaster(int pipefd[2])
1535 {
1536 	return readMessageFromPipe(pipefd[PIPE_READ]);
1537 }
1538 
1539 /*
1540  * Send a status message to the master.
1541  *
1542  * This function is executed in worker processes.
1543  */
1544 static void
sendMessageToMaster(int pipefd[2],const char * str)1545 sendMessageToMaster(int pipefd[2], const char *str)
1546 {
1547 	int			len = strlen(str) + 1;
1548 
1549 	if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1550 		exit_horribly(modulename,
1551 					  "could not write to the communication channel: %s\n",
1552 					  strerror(errno));
1553 }
1554 
/*
 * select_loop: block until a descriptor in "workerset" is readable.
 *
 * maxFd is the highest descriptor number contained in the set.  The call is
 * simply retried whenever select() is interrupted (EINTR, or WSAEINTR on
 * Windows); the caller's original set is restored before every attempt.
 *
 * Returns whatever the final select() call returned: the number of readable
 * descriptors, or a negative value on error.  On return *workerset holds the
 * set as left by that final call.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
	/* select() overwrites its argument, so keep a pristine copy */
	const fd_set wanted = *workerset;

	for (;;)
	{
		int			nready;

		*workerset = wanted;
		nready = select(maxFd + 1, workerset, NULL, NULL, NULL);

#ifndef WIN32
		if (nready < 0 && errno == EINTR)
			continue;
#else
		if (nready == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
			continue;
#endif

		return nready;
	}
}
1582 
1583 
1584 /*
1585  * Check for messages from worker processes.
1586  *
1587  * If a message is available, return it as a malloc'd string, and put the
1588  * index of the sending worker in *worker.
1589  *
1590  * If nothing is available, wait if "do_wait" is true, else return NULL.
1591  *
1592  * If we detect EOF on any socket, we'll return NULL.  It's not great that
1593  * that's hard to distinguish from the no-data-available case, but for now
1594  * our one caller is okay with that.
1595  *
1596  * This function is executed in the master process.
1597  */
1598 static char *
getMessageFromWorker(ParallelState * pstate,bool do_wait,int * worker)1599 getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1600 {
1601 	int			i;
1602 	fd_set		workerset;
1603 	int			maxFd = -1;
1604 	struct timeval nowait = {0, 0};
1605 
1606 	/* construct bitmap of socket descriptors for select() */
1607 	FD_ZERO(&workerset);
1608 	for (i = 0; i < pstate->numWorkers; i++)
1609 	{
1610 		if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1611 			continue;
1612 		FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1613 		if (pstate->parallelSlot[i].pipeRead > maxFd)
1614 			maxFd = pstate->parallelSlot[i].pipeRead;
1615 	}
1616 
1617 	if (do_wait)
1618 	{
1619 		i = select_loop(maxFd, &workerset);
1620 		Assert(i != 0);
1621 	}
1622 	else
1623 	{
1624 		if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1625 			return NULL;
1626 	}
1627 
1628 	if (i < 0)
1629 		exit_horribly(modulename, "select() failed: %s\n", strerror(errno));
1630 
1631 	for (i = 0; i < pstate->numWorkers; i++)
1632 	{
1633 		char	   *msg;
1634 
1635 		if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1636 			continue;
1637 		if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1638 			continue;
1639 
1640 		/*
1641 		 * Read the message if any.  If the socket is ready because of EOF,
1642 		 * we'll return NULL instead (and the socket will stay ready, so the
1643 		 * condition will persist).
1644 		 *
1645 		 * Note: because this is a blocking read, we'll wait if only part of
1646 		 * the message is available.  Waiting a long time would be bad, but
1647 		 * since worker status messages are short and are always sent in one
1648 		 * operation, it shouldn't be a problem in practice.
1649 		 */
1650 		msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1651 		*worker = i;
1652 		return msg;
1653 	}
1654 	Assert(false);
1655 	return NULL;
1656 }
1657 
1658 /*
1659  * Send a command message to the specified worker process.
1660  *
1661  * This function is executed in the master process.
1662  */
1663 static void
sendMessageToWorker(ParallelState * pstate,int worker,const char * str)1664 sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1665 {
1666 	int			len = strlen(str) + 1;
1667 
1668 	if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1669 	{
1670 		exit_horribly(modulename,
1671 					  "could not write to the communication channel: %s\n",
1672 					  strerror(errno));
1673 	}
1674 }
1675 
/*
 * readMessageFromPipe: read one message from the given descriptor.
 *
 * A message on the channel is simply a null-terminated string.  We block
 * until a complete one has arrived and return it in malloc'd storage.  NULL
 * is returned if piperead() reports an error or EOF before the terminator
 * is seen; any partial data is discarded in that case.
 */
static char *
readMessageFromPipe(int fd)
{
	int			capacity = 64;	/* initial size, could be any number */
	int			used = 0;
	char	   *buf = (char *) pg_malloc(capacity);

	/*
	 * Reading several bytes at once could in theory hand us pieces of more
	 * than one message.  (In practice neither side sends a second message
	 * before getting a reply, but we prefer not to depend on that.)  So we
	 * keep it simple and fetch one byte at a time until the '\0' shows up.
	 * That is not fast, but the command and status strings are short.
	 */
	for (;;)
	{
		int			nread;

		Assert(used < capacity);
		nread = piperead(fd, &buf[used], 1);

		if (nread <= 0)
		{
			/* error, or the other end has closed the connection */
			pg_free(buf);
			return NULL;
		}

		Assert(nread == 1);

		/* the terminator completes the message */
		if (buf[used] == '\0')
			return buf;

		/* keep the byte, and make room for the next one if we are full */
		if (++used == capacity)
		{
			capacity += 16;		/* increment could be any number */
			buf = (char *) pg_realloc(buf, capacity);
		}
	}
}
1727 
1728 #ifdef WIN32
1729 
1730 /*
1731  * This is a replacement version of pipe(2) for Windows which allows the pipe
1732  * handles to be used in select().
1733  *
1734  * Reads and writes on the pipe must go through piperead()/pipewrite().
1735  *
1736  * For consistency with Unix we declare the returned handles as "int".
1737  * This is okay even on WIN64 because system handles are not more than
1738  * 32 bits wide, but we do have to do some casting.
1739  */
1740 static int
pgpipe(int handles[2])1741 pgpipe(int handles[2])
1742 {
1743 	pgsocket	s,
1744 				tmp_sock;
1745 	struct sockaddr_in serv_addr;
1746 	int			len = sizeof(serv_addr);
1747 
1748 	/* We have to use the Unix socket invalid file descriptor value here. */
1749 	handles[0] = handles[1] = -1;
1750 
1751 	/*
1752 	 * setup listen socket
1753 	 */
1754 	if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1755 	{
1756 		write_msg(modulename, "pgpipe: could not create socket: error code %d\n",
1757 				  WSAGetLastError());
1758 		return -1;
1759 	}
1760 
1761 	memset((void *) &serv_addr, 0, sizeof(serv_addr));
1762 	serv_addr.sin_family = AF_INET;
1763 	serv_addr.sin_port = pg_hton16(0);
1764 	serv_addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);
1765 	if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1766 	{
1767 		write_msg(modulename, "pgpipe: could not bind: error code %d\n",
1768 				  WSAGetLastError());
1769 		closesocket(s);
1770 		return -1;
1771 	}
1772 	if (listen(s, 1) == SOCKET_ERROR)
1773 	{
1774 		write_msg(modulename, "pgpipe: could not listen: error code %d\n",
1775 				  WSAGetLastError());
1776 		closesocket(s);
1777 		return -1;
1778 	}
1779 	if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
1780 	{
1781 		write_msg(modulename, "pgpipe: getsockname() failed: error code %d\n",
1782 				  WSAGetLastError());
1783 		closesocket(s);
1784 		return -1;
1785 	}
1786 
1787 	/*
1788 	 * setup pipe handles
1789 	 */
1790 	if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1791 	{
1792 		write_msg(modulename, "pgpipe: could not create second socket: error code %d\n",
1793 				  WSAGetLastError());
1794 		closesocket(s);
1795 		return -1;
1796 	}
1797 	handles[1] = (int) tmp_sock;
1798 
1799 	if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1800 	{
1801 		write_msg(modulename, "pgpipe: could not connect socket: error code %d\n",
1802 				  WSAGetLastError());
1803 		closesocket(handles[1]);
1804 		handles[1] = -1;
1805 		closesocket(s);
1806 		return -1;
1807 	}
1808 	if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
1809 	{
1810 		write_msg(modulename, "pgpipe: could not accept connection: error code %d\n",
1811 				  WSAGetLastError());
1812 		closesocket(handles[1]);
1813 		handles[1] = -1;
1814 		closesocket(s);
1815 		return -1;
1816 	}
1817 	handles[0] = (int) tmp_sock;
1818 
1819 	closesocket(s);
1820 	return 0;
1821 }
1822 
1823 /*
1824  * Windows implementation of reading from a pipe.
1825  */
1826 static int
piperead(int s,char * buf,int len)1827 piperead(int s, char *buf, int len)
1828 {
1829 	int			ret = recv(s, buf, len, 0);
1830 
1831 	if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
1832 	{
1833 		/* EOF on the pipe! */
1834 		ret = 0;
1835 	}
1836 	return ret;
1837 }
1838 
1839 #endif							/* WIN32 */
1840