1 /*-------------------------------------------------------------------------
2  *
3  * parallel.c
4  *
5  *	Parallel support for pg_dump and pg_restore
6  *
7  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * IDENTIFICATION
11  *		src/bin/pg_dump/parallel.c
12  *
13  *-------------------------------------------------------------------------
14  */
15 
16 /*
17  * Parallel operation works like this:
18  *
19  * The original, leader process calls ParallelBackupStart(), which forks off
20  * the desired number of worker processes, which each enter WaitForCommands().
21  *
22  * The leader process dispatches an individual work item to one of the worker
23  * processes in DispatchJobForTocEntry().  We send a command string such as
24  * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
25  * The worker process receives and decodes the command and passes it to the
26  * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
27  * which are routines of the current archive format.  That routine performs
28  * the required action (dump or restore) and returns an integer status code.
29  * This is passed back to the leader where we pass it to the
30  * ParallelCompletionPtr callback function that was passed to
31  * DispatchJobForTocEntry().  The callback function does state updating
32  * for the leader control logic in pg_backup_archiver.c.
33  *
34  * In principle additional archive-format-specific information might be needed
35  * in commands or worker status responses, but so far that hasn't proved
36  * necessary, since workers have full copies of the ArchiveHandle/TocEntry
37  * data structures.  Remember that we have forked off the workers only after
38  * we have read in the catalog.  That's why our worker processes can also
39  * access the catalog information.  (In the Windows case, the workers are
40  * threads in the same process.  To avoid problems, they work with cloned
41  * copies of the Archive data structure; see RunWorker().)
42  *
43  * In the leader process, the workerStatus field for each worker has one of
44  * the following values:
45  *		WRKR_NOT_STARTED: we've not yet forked this worker
46  *		WRKR_IDLE: it's waiting for a command
47  *		WRKR_WORKING: it's working on a command
48  *		WRKR_TERMINATED: process ended
49  * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
50  * state, and must be NULL in other states.
51  */
52 
53 #include "postgres_fe.h"
54 
55 #ifndef WIN32
56 #include <sys/wait.h>
57 #include <signal.h>
58 #include <unistd.h>
59 #include <fcntl.h>
60 #endif
61 #ifdef HAVE_SYS_SELECT_H
62 #include <sys/select.h>
63 #endif
64 
65 #include "fe_utils/string_utils.h"
66 #include "parallel.h"
67 #include "pg_backup_utils.h"
68 #include "port/pg_bswap.h"
69 
/* Indexes into the two-element descriptor array that pipe(2) fills in */
#define PIPE_READ							0
#define PIPE_WRITE							1

/* Sentinel returned by GetIdleWorker() when no worker is available */
#define NO_SLOT (-1)

/*
 * Life-cycle states of a worker as seen by the leader.  WRKR_NOT_STARTED is
 * pinned to zero so that a zero-filled slot array starts in that state.
 */
typedef enum
{
	WRKR_NOT_STARTED = 0,		/* not yet forked / spawned */
	WRKR_IDLE,					/* waiting for a command */
	WRKR_WORKING,				/* busy with a command */
	WRKR_TERMINATED				/* process / thread has ended */
} T_WorkerStatus;

/* True for the states in which the worker process/thread exists */
#define WORKER_IS_RUNNING(workerStatus) \
	((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
87 
88 /*
89  * Private per-parallel-worker state (typedef for this is in parallel.h).
90  *
91  * Much of this is valid only in the leader process (or, on Windows, should
92  * be touched only by the leader thread).  But the AH field should be touched
93  * only by workers.  The pipe descriptors are valid everywhere.
94  */
95 struct ParallelSlot
96 {
97 	T_WorkerStatus workerStatus;	/* see enum above */
98 
99 	/* These fields are valid if workerStatus == WRKR_WORKING: */
100 	ParallelCompletionPtr callback; /* function to call on completion */
101 	void	   *callback_data;	/* passthrough data for it */
102 
103 	ArchiveHandle *AH;			/* Archive data worker is using */
104 
105 	int			pipeRead;		/* leader's end of the pipes */
106 	int			pipeWrite;
107 	int			pipeRevRead;	/* child's end of the pipes */
108 	int			pipeRevWrite;
109 
110 	/* Child process/thread identity info: */
111 #ifdef WIN32
112 	uintptr_t	hThread;
113 	unsigned int threadId;
114 #else
115 	pid_t		pid;
116 #endif
117 };
118 
#ifdef WIN32

/*
 * Argument bundle for a worker thread: _beginthreadex() passes exactly one
 * pointer to the thread function, so everything it needs goes in here.
 */
typedef struct
{
	ArchiveHandle *AH;			/* leader database connection */
	ParallelSlot *slot;			/* this worker's parallel slot */
} WorkerInfo;

/* Windows: "pipes" are read and written with recv()/send() */
static int	pgpipe(int handles[2]);
#define piperead(a,b,c)		recv(a,b,c,0)
#define pipewrite(a,b,c)	send(a,b,c,0)

#else							/* !WIN32 */

/* Everywhere else: genuine pipes with plain read()/write() */
#define pgpipe(a)			pipe(a)
#define piperead(a,b,c)		read(a,b,c)
#define pipewrite(a,b,c)	write(a,b,c)

#endif							/* WIN32 */
144 
145 /*
146  * State info for archive_close_connection() shutdown callback.
147  */
148 typedef struct ShutdownInformation
149 {
150 	ParallelState *pstate;
151 	Archive    *AHX;
152 } ShutdownInformation;
153 
154 static ShutdownInformation shutdown_info;
155 
156 /*
157  * State info for signal handling.
158  * We assume signal_info initializes to zeroes.
159  *
160  * On Unix, myAH is the leader DB connection in the leader process, and the
161  * worker's own connection in worker processes.  On Windows, we have only one
162  * instance of signal_info, so myAH is the leader connection and the worker
163  * connections must be dug out of pstate->parallelSlot[].
164  */
165 typedef struct DumpSignalInformation
166 {
167 	ArchiveHandle *myAH;		/* database connection to issue cancel for */
168 	ParallelState *pstate;		/* parallel state, if any */
169 	bool		handler_set;	/* signal handler set up in this process? */
170 #ifndef WIN32
171 	bool		am_worker;		/* am I a worker process? */
172 #endif
173 } DumpSignalInformation;
174 
175 static volatile DumpSignalInformation signal_info;
176 
177 #ifdef WIN32
178 static CRITICAL_SECTION signal_info_lock;
179 #endif
180 
/*
 * Emit a plain string on stderr using nothing but write(2), so it can be
 * used from a signal handler.  The write() result is deliberately thrown
 * away (nothing useful can be done on failure), but some compilers warn
 * about an ignored result, hence the dummy variable.
 */
#define write_stderr(str) \
	do { \
		const char *wstr_ = (str); \
		int			wrc_; \
		wrc_ = write(fileno(stderr), wstr_, strlen(wstr_)); \
		(void) wrc_; \
	} while (0)
193 
194 
#ifdef WIN32
/* TLS slot used by getThreadLocalPQExpBuffer(); private to this file */
static DWORD tls_index;

/* Exported on purpose -- exit_nicely() needs to see these */
bool		parallel_init_done = false;
DWORD		mainThreadId;
#endif							/* WIN32 */
203 
204 /* Local function prototypes */
205 static ParallelSlot *GetMyPSlot(ParallelState *pstate);
206 static void archive_close_connection(int code, void *arg);
207 static void ShutdownWorkersHard(ParallelState *pstate);
208 static void WaitForTerminatingWorkers(ParallelState *pstate);
209 static void setup_cancel_handler(void);
210 static void set_cancel_pstate(ParallelState *pstate);
211 static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
212 static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
213 static int	GetIdleWorker(ParallelState *pstate);
214 static bool HasEveryWorkerTerminated(ParallelState *pstate);
215 static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
216 static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
217 static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
218 							bool do_wait);
219 static char *getMessageFromLeader(int pipefd[2]);
220 static void sendMessageToLeader(int pipefd[2], const char *str);
221 static int	select_loop(int maxFd, fd_set *workerset);
222 static char *getMessageFromWorker(ParallelState *pstate,
223 								  bool do_wait, int *worker);
224 static void sendMessageToWorker(ParallelState *pstate,
225 								int worker, const char *str);
226 static char *readMessageFromPipe(int fd);
227 
228 #define messageStartsWith(msg, prefix) \
229 	(strncmp(msg, prefix, strlen(prefix)) == 0)
230 
231 
/*
 * Initialize parallel dump support --- should be called early in process
 * startup.  (Currently, this is called whether or not we intend parallel
 * activity.)
 *
 * On Windows this allocates the TLS slot used by getThreadLocalPQExpBuffer(),
 * records the main thread's ID, and starts up Winsock.  Failure of either
 * TlsAlloc() or WSAStartup() is fatal.  Repeated calls are no-ops.  On other
 * platforms there is nothing to do.
 */
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
	if (!parallel_init_done)
	{
		WSADATA		wsaData;
		int			err;

		/* Prepare for threaded operation */
		tls_index = TlsAlloc();

		/*
		 * Without a valid TLS index, every TlsGetValue() would report "unset"
		 * and each fmtId() call would allocate (and leak) a new buffer, so
		 * fail loudly instead.
		 */
		if (tls_index == TLS_OUT_OF_INDEXES)
		{
			pg_log_error("%s() failed: error code %lu", "TlsAlloc",
						 GetLastError());
			exit_nicely(1);
		}
		mainThreadId = GetCurrentThreadId();

		/* Initialize socket access */
		err = WSAStartup(MAKEWORD(2, 2), &wsaData);
		if (err != 0)
		{
			pg_log_error("%s() failed: error code %d", "WSAStartup", err);
			exit_nicely(1);
		}

		parallel_init_done = true;
	}
#endif
}
262 
263 /*
264  * Find the ParallelSlot for the current worker process or thread.
265  *
266  * Returns NULL if no matching slot is found (this implies we're the leader).
267  */
268 static ParallelSlot *
GetMyPSlot(ParallelState * pstate)269 GetMyPSlot(ParallelState *pstate)
270 {
271 	int			i;
272 
273 	for (i = 0; i < pstate->numWorkers; i++)
274 	{
275 #ifdef WIN32
276 		if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
277 #else
278 		if (pstate->parallelSlot[i].pid == getpid())
279 #endif
280 			return &(pstate->parallelSlot[i]);
281 	}
282 
283 	return NULL;
284 }
285 
/*
 * A thread-local version of getLocalPQExpBuffer().
 *
 * Non-reentrant but reduces memory leakage: we'll consume one buffer per
 * thread, which is much better than one per fmtId/fmtQualifiedId call.
 *
 * Returns an emptied buffer: the calling thread's existing buffer if it has
 * one, otherwise a newly created one that is remembered for next time (in
 * the TLS slot once parallel_init_done is set, else in a static variable).
 */
#ifdef WIN32
static PQExpBuffer
getThreadLocalPQExpBuffer(void)
{
	/*
	 * The Tls code goes awry if we use a static var, so we provide for both
	 * static and auto, and omit any use of the static var when using Tls. We
	 * rely on TlsGetValue() to return 0 if the value is not yet set.
	 */
	static PQExpBuffer s_id_return = NULL;
	PQExpBuffer id_return;

	if (parallel_init_done)
		id_return = (PQExpBuffer) TlsGetValue(tls_index);
	else
		id_return = s_id_return;

	if (id_return)
	{
		/* Not the first call: reuse the same buffer, just wipe contents */
		resetPQExpBuffer(id_return);
	}
	else
	{
		/* First call (in this thread): create and remember a new buffer */
		id_return = createPQExpBuffer();
		if (parallel_init_done)
			TlsSetValue(tls_index, id_return);
		else
			s_id_return = id_return;
	}

	return id_return;
}
#endif							/* WIN32 */
327 
328 /*
329  * pg_dump and pg_restore call this to register the cleanup handler
330  * as soon as they've created the ArchiveHandle.
331  */
332 void
on_exit_close_archive(Archive * AHX)333 on_exit_close_archive(Archive *AHX)
334 {
335 	shutdown_info.AHX = AHX;
336 	on_exit_nicely(archive_close_connection, &shutdown_info);
337 }
338 
339 /*
340  * on_exit_nicely handler for shutting down database connections and
341  * worker processes cleanly.
342  */
343 static void
archive_close_connection(int code,void * arg)344 archive_close_connection(int code, void *arg)
345 {
346 	ShutdownInformation *si = (ShutdownInformation *) arg;
347 
348 	if (si->pstate)
349 	{
350 		/* In parallel mode, must figure out who we are */
351 		ParallelSlot *slot = GetMyPSlot(si->pstate);
352 
353 		if (!slot)
354 		{
355 			/*
356 			 * We're the leader.  Forcibly shut down workers, then close our
357 			 * own database connection, if any.
358 			 */
359 			ShutdownWorkersHard(si->pstate);
360 
361 			if (si->AHX)
362 				DisconnectDatabase(si->AHX);
363 		}
364 		else
365 		{
366 			/*
367 			 * We're a worker.  Shut down our own DB connection if any.  On
368 			 * Windows, we also have to close our communication sockets, to
369 			 * emulate what will happen on Unix when the worker process exits.
370 			 * (Without this, if this is a premature exit, the leader would
371 			 * fail to detect it because there would be no EOF condition on
372 			 * the other end of the pipe.)
373 			 */
374 			if (slot->AH)
375 				DisconnectDatabase(&(slot->AH->public));
376 
377 #ifdef WIN32
378 			closesocket(slot->pipeRevRead);
379 			closesocket(slot->pipeRevWrite);
380 #endif
381 		}
382 	}
383 	else
384 	{
385 		/* Non-parallel operation: just kill the leader DB connection */
386 		if (si->AHX)
387 			DisconnectDatabase(si->AHX);
388 	}
389 }
390 
391 /*
392  * Forcibly shut down any remaining workers, waiting for them to finish.
393  *
394  * Note that we don't expect to come here during normal exit (the workers
395  * should be long gone, and the ParallelState too).  We're only here in a
396  * fatal() situation, so intervening to cancel active commands is
397  * appropriate.
398  */
399 static void
ShutdownWorkersHard(ParallelState * pstate)400 ShutdownWorkersHard(ParallelState *pstate)
401 {
402 	int			i;
403 
404 	/*
405 	 * Close our write end of the sockets so that any workers waiting for
406 	 * commands know they can exit.  (Note: some of the pipeWrite fields might
407 	 * still be zero, if we failed to initialize all the workers.  Hence, just
408 	 * ignore errors here.)
409 	 */
410 	for (i = 0; i < pstate->numWorkers; i++)
411 		closesocket(pstate->parallelSlot[i].pipeWrite);
412 
413 	/*
414 	 * Force early termination of any commands currently in progress.
415 	 */
416 #ifndef WIN32
417 	/* On non-Windows, send SIGTERM to each worker process. */
418 	for (i = 0; i < pstate->numWorkers; i++)
419 	{
420 		pid_t		pid = pstate->parallelSlot[i].pid;
421 
422 		if (pid != 0)
423 			kill(pid, SIGTERM);
424 	}
425 #else
426 
427 	/*
428 	 * On Windows, send query cancels directly to the workers' backends.  Use
429 	 * a critical section to ensure worker threads don't change state.
430 	 */
431 	EnterCriticalSection(&signal_info_lock);
432 	for (i = 0; i < pstate->numWorkers; i++)
433 	{
434 		ArchiveHandle *AH = pstate->parallelSlot[i].AH;
435 		char		errbuf[1];
436 
437 		if (AH != NULL && AH->connCancel != NULL)
438 			(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
439 	}
440 	LeaveCriticalSection(&signal_info_lock);
441 #endif
442 
443 	/* Now wait for them to terminate. */
444 	WaitForTerminatingWorkers(pstate);
445 }
446 
447 /*
448  * Wait for all workers to terminate.
449  */
450 static void
WaitForTerminatingWorkers(ParallelState * pstate)451 WaitForTerminatingWorkers(ParallelState *pstate)
452 {
453 	while (!HasEveryWorkerTerminated(pstate))
454 	{
455 		ParallelSlot *slot = NULL;
456 		int			j;
457 
458 #ifndef WIN32
459 		/* On non-Windows, use wait() to wait for next worker to end */
460 		int			status;
461 		pid_t		pid = wait(&status);
462 
463 		/* Find dead worker's slot, and clear the PID field */
464 		for (j = 0; j < pstate->numWorkers; j++)
465 		{
466 			slot = &(pstate->parallelSlot[j]);
467 			if (slot->pid == pid)
468 			{
469 				slot->pid = 0;
470 				break;
471 			}
472 		}
473 #else							/* WIN32 */
474 		/* On Windows, we must use WaitForMultipleObjects() */
475 		HANDLE	   *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
476 		int			nrun = 0;
477 		DWORD		ret;
478 		uintptr_t	hThread;
479 
480 		for (j = 0; j < pstate->numWorkers; j++)
481 		{
482 			if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
483 			{
484 				lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
485 				nrun++;
486 			}
487 		}
488 		ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
489 		Assert(ret != WAIT_FAILED);
490 		hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
491 		free(lpHandles);
492 
493 		/* Find dead worker's slot, and clear the hThread field */
494 		for (j = 0; j < pstate->numWorkers; j++)
495 		{
496 			slot = &(pstate->parallelSlot[j]);
497 			if (slot->hThread == hThread)
498 			{
499 				/* For cleanliness, close handles for dead threads */
500 				CloseHandle((HANDLE) slot->hThread);
501 				slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
502 				break;
503 			}
504 		}
505 #endif							/* WIN32 */
506 
507 		/* On all platforms, update workerStatus and te[] as well */
508 		Assert(j < pstate->numWorkers);
509 		slot->workerStatus = WRKR_TERMINATED;
510 		pstate->te[j] = NULL;
511 	}
512 }
513 
514 
515 /*
516  * Code for responding to cancel interrupts (SIGINT, control-C, etc)
517  *
518  * This doesn't quite belong in this module, but it needs access to the
519  * ParallelState data, so there's not really a better place either.
520  *
521  * When we get a cancel interrupt, we could just die, but in pg_restore that
522  * could leave a SQL command (e.g., CREATE INDEX on a large table) running
523  * for a long time.  Instead, we try to send a cancel request and then die.
524  * pg_dump probably doesn't really need this, but we might as well use it
525  * there too.  Note that sending the cancel directly from the signal handler
526  * is safe because PQcancel() is written to make it so.
527  *
528  * In parallel operation on Unix, each process is responsible for canceling
529  * its own connection (this must be so because nobody else has access to it).
530  * Furthermore, the leader process should attempt to forward its signal to
531  * each child.  In simple manual use of pg_dump/pg_restore, forwarding isn't
532  * needed because typing control-C at the console would deliver SIGINT to
533  * every member of the terminal process group --- but in other scenarios it
534  * might be that only the leader gets signaled.
535  *
536  * On Windows, the cancel handler runs in a separate thread, because that's
537  * how SetConsoleCtrlHandler works.  We make it stop worker threads, send
538  * cancels on all active connections, and then return FALSE, which will allow
539  * the process to die.  For safety's sake, we use a critical section to
540  * protect the PGcancel structures against being changed while the signal
541  * thread runs.
542  */
543 
544 #ifndef WIN32
545 
546 /*
547  * Signal handler (Unix only)
548  */
549 static void
sigTermHandler(SIGNAL_ARGS)550 sigTermHandler(SIGNAL_ARGS)
551 {
552 	int			i;
553 	char		errbuf[1];
554 
555 	/*
556 	 * Some platforms allow delivery of new signals to interrupt an active
557 	 * signal handler.  That could muck up our attempt to send PQcancel, so
558 	 * disable the signals that setup_cancel_handler enabled.
559 	 */
560 	pqsignal(SIGINT, SIG_IGN);
561 	pqsignal(SIGTERM, SIG_IGN);
562 	pqsignal(SIGQUIT, SIG_IGN);
563 
564 	/*
565 	 * If we're in the leader, forward signal to all workers.  (It seems best
566 	 * to do this before PQcancel; killing the leader transaction will result
567 	 * in invalid-snapshot errors from active workers, which maybe we can
568 	 * quiet by killing workers first.)  Ignore any errors.
569 	 */
570 	if (signal_info.pstate != NULL)
571 	{
572 		for (i = 0; i < signal_info.pstate->numWorkers; i++)
573 		{
574 			pid_t		pid = signal_info.pstate->parallelSlot[i].pid;
575 
576 			if (pid != 0)
577 				kill(pid, SIGTERM);
578 		}
579 	}
580 
581 	/*
582 	 * Send QueryCancel if we have a connection to send to.  Ignore errors,
583 	 * there's not much we can do about them anyway.
584 	 */
585 	if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
586 		(void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
587 
588 	/*
589 	 * Report we're quitting, using nothing more complicated than write(2).
590 	 * When in parallel operation, only the leader process should do this.
591 	 */
592 	if (!signal_info.am_worker)
593 	{
594 		if (progname)
595 		{
596 			write_stderr(progname);
597 			write_stderr(": ");
598 		}
599 		write_stderr("terminated by user\n");
600 	}
601 
602 	/*
603 	 * And die, using _exit() not exit() because the latter will invoke atexit
604 	 * handlers that can fail if we interrupted related code.
605 	 */
606 	_exit(1);
607 }
608 
609 /*
610  * Enable cancel interrupt handler, if not already done.
611  */
612 static void
setup_cancel_handler(void)613 setup_cancel_handler(void)
614 {
615 	/*
616 	 * When forking, signal_info.handler_set will propagate into the new
617 	 * process, but that's fine because the signal handler state does too.
618 	 */
619 	if (!signal_info.handler_set)
620 	{
621 		signal_info.handler_set = true;
622 
623 		pqsignal(SIGINT, sigTermHandler);
624 		pqsignal(SIGTERM, sigTermHandler);
625 		pqsignal(SIGQUIT, sigTermHandler);
626 	}
627 }
628 
629 #else							/* WIN32 */
630 
631 /*
632  * Console interrupt handler --- runs in a newly-started thread.
633  *
634  * After stopping other threads and sending cancel requests on all open
635  * connections, we return FALSE which will allow the default ExitProcess()
636  * action to be taken.
637  */
638 static BOOL WINAPI
consoleHandler(DWORD dwCtrlType)639 consoleHandler(DWORD dwCtrlType)
640 {
641 	int			i;
642 	char		errbuf[1];
643 
644 	if (dwCtrlType == CTRL_C_EVENT ||
645 		dwCtrlType == CTRL_BREAK_EVENT)
646 	{
647 		/* Critical section prevents changing data we look at here */
648 		EnterCriticalSection(&signal_info_lock);
649 
650 		/*
651 		 * If in parallel mode, stop worker threads and send QueryCancel to
652 		 * their connected backends.  The main point of stopping the worker
653 		 * threads is to keep them from reporting the query cancels as errors,
654 		 * which would clutter the user's screen.  We needn't stop the leader
655 		 * thread since it won't be doing much anyway.  Do this before
656 		 * canceling the main transaction, else we might get invalid-snapshot
657 		 * errors reported before we can stop the workers.  Ignore errors,
658 		 * there's not much we can do about them anyway.
659 		 */
660 		if (signal_info.pstate != NULL)
661 		{
662 			for (i = 0; i < signal_info.pstate->numWorkers; i++)
663 			{
664 				ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
665 				ArchiveHandle *AH = slot->AH;
666 				HANDLE		hThread = (HANDLE) slot->hThread;
667 
668 				/*
669 				 * Using TerminateThread here may leave some resources leaked,
670 				 * but it doesn't matter since we're about to end the whole
671 				 * process.
672 				 */
673 				if (hThread != INVALID_HANDLE_VALUE)
674 					TerminateThread(hThread, 0);
675 
676 				if (AH != NULL && AH->connCancel != NULL)
677 					(void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
678 			}
679 		}
680 
681 		/*
682 		 * Send QueryCancel to leader connection, if enabled.  Ignore errors,
683 		 * there's not much we can do about them anyway.
684 		 */
685 		if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
686 			(void) PQcancel(signal_info.myAH->connCancel,
687 							errbuf, sizeof(errbuf));
688 
689 		LeaveCriticalSection(&signal_info_lock);
690 
691 		/*
692 		 * Report we're quitting, using nothing more complicated than
693 		 * write(2).  (We might be able to get away with using pg_log_*()
694 		 * here, but since we terminated other threads uncleanly above, it
695 		 * seems better to assume as little as possible.)
696 		 */
697 		if (progname)
698 		{
699 			write_stderr(progname);
700 			write_stderr(": ");
701 		}
702 		write_stderr("terminated by user\n");
703 	}
704 
705 	/* Always return FALSE to allow signal handling to continue */
706 	return FALSE;
707 }
708 
709 /*
710  * Enable cancel interrupt handler, if not already done.
711  */
712 static void
setup_cancel_handler(void)713 setup_cancel_handler(void)
714 {
715 	if (!signal_info.handler_set)
716 	{
717 		signal_info.handler_set = true;
718 
719 		InitializeCriticalSection(&signal_info_lock);
720 
721 		SetConsoleCtrlHandler(consoleHandler, TRUE);
722 	}
723 }
724 
725 #endif							/* WIN32 */
726 
727 
728 /*
729  * set_archive_cancel_info
730  *
731  * Fill AH->connCancel with cancellation info for the specified database
732  * connection; or clear it if conn is NULL.
733  */
734 void
set_archive_cancel_info(ArchiveHandle * AH,PGconn * conn)735 set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
736 {
737 	PGcancel   *oldConnCancel;
738 
739 	/*
740 	 * Activate the interrupt handler if we didn't yet in this process.  On
741 	 * Windows, this also initializes signal_info_lock; therefore it's
742 	 * important that this happen at least once before we fork off any
743 	 * threads.
744 	 */
745 	setup_cancel_handler();
746 
747 	/*
748 	 * On Unix, we assume that storing a pointer value is atomic with respect
749 	 * to any possible signal interrupt.  On Windows, use a critical section.
750 	 */
751 
752 #ifdef WIN32
753 	EnterCriticalSection(&signal_info_lock);
754 #endif
755 
756 	/* Free the old one if we have one */
757 	oldConnCancel = AH->connCancel;
758 	/* be sure interrupt handler doesn't use pointer while freeing */
759 	AH->connCancel = NULL;
760 
761 	if (oldConnCancel != NULL)
762 		PQfreeCancel(oldConnCancel);
763 
764 	/* Set the new one if specified */
765 	if (conn)
766 		AH->connCancel = PQgetCancel(conn);
767 
768 	/*
769 	 * On Unix, there's only ever one active ArchiveHandle per process, so we
770 	 * can just set signal_info.myAH unconditionally.  On Windows, do that
771 	 * only in the main thread; worker threads have to make sure their
772 	 * ArchiveHandle appears in the pstate data, which is dealt with in
773 	 * RunWorker().
774 	 */
775 #ifndef WIN32
776 	signal_info.myAH = AH;
777 #else
778 	if (mainThreadId == GetCurrentThreadId())
779 		signal_info.myAH = AH;
780 #endif
781 
782 #ifdef WIN32
783 	LeaveCriticalSection(&signal_info_lock);
784 #endif
785 }
786 
787 /*
788  * set_cancel_pstate
789  *
790  * Set signal_info.pstate to point to the specified ParallelState, if any.
791  * We need this mainly to have an interlock against Windows signal thread.
792  */
793 static void
set_cancel_pstate(ParallelState * pstate)794 set_cancel_pstate(ParallelState *pstate)
795 {
796 #ifdef WIN32
797 	EnterCriticalSection(&signal_info_lock);
798 #endif
799 
800 	signal_info.pstate = pstate;
801 
802 #ifdef WIN32
803 	LeaveCriticalSection(&signal_info_lock);
804 #endif
805 }
806 
807 /*
808  * set_cancel_slot_archive
809  *
810  * Set ParallelSlot's AH field to point to the specified archive, if any.
811  * We need this mainly to have an interlock against Windows signal thread.
812  */
813 static void
set_cancel_slot_archive(ParallelSlot * slot,ArchiveHandle * AH)814 set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
815 {
816 #ifdef WIN32
817 	EnterCriticalSection(&signal_info_lock);
818 #endif
819 
820 	slot->AH = AH;
821 
822 #ifdef WIN32
823 	LeaveCriticalSection(&signal_info_lock);
824 #endif
825 }
826 
827 
828 /*
829  * This function is called by both Unix and Windows variants to set up
830  * and run a worker process.  Caller should exit the process (or thread)
831  * upon return.
832  */
833 static void
RunWorker(ArchiveHandle * AH,ParallelSlot * slot)834 RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
835 {
836 	int			pipefd[2];
837 
838 	/* fetch child ends of pipes */
839 	pipefd[PIPE_READ] = slot->pipeRevRead;
840 	pipefd[PIPE_WRITE] = slot->pipeRevWrite;
841 
842 	/*
843 	 * Clone the archive so that we have our own state to work with, and in
844 	 * particular our own database connection.
845 	 *
846 	 * We clone on Unix as well as Windows, even though technically we don't
847 	 * need to because fork() gives us a copy in our own address space
848 	 * already.  But CloneArchive resets the state information and also clones
849 	 * the database connection which both seem kinda helpful.
850 	 */
851 	AH = CloneArchive(AH);
852 
853 	/* Remember cloned archive where signal handler can find it */
854 	set_cancel_slot_archive(slot, AH);
855 
856 	/*
857 	 * Call the setup worker function that's defined in the ArchiveHandle.
858 	 */
859 	(AH->SetupWorkerPtr) ((Archive *) AH);
860 
861 	/*
862 	 * Execute commands until done.
863 	 */
864 	WaitForCommands(AH, pipefd);
865 
866 	/*
867 	 * Disconnect from database and clean up.
868 	 */
869 	set_cancel_slot_archive(slot, NULL);
870 	DisconnectDatabase(&(AH->public));
871 	DeCloneArchive(AH);
872 }
873 
/*
 * Thread entry point for Windows workers
 */
#ifdef WIN32
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
	/*
	 * Copy the arguments out, then release the WorkerInfo the leader
	 * allocated for us; it is not needed any longer.
	 */
	WorkerInfo	args = *wi;

	free(wi);

	/* Do the actual work, then end the thread */
	RunWorker(args.AH, args.slot);

	_endthreadex(0);
	return 0;
}
#endif							/* WIN32 */
895 
896 /*
897  * This function starts a parallel dump or restore by spawning off the worker
898  * processes.  For Windows, it creates a number of threads; on Unix the
899  * workers are created with fork().
900  */
901 ParallelState *
ParallelBackupStart(ArchiveHandle * AH)902 ParallelBackupStart(ArchiveHandle *AH)
903 {
904 	ParallelState *pstate;
905 	int			i;
906 
907 	Assert(AH->public.numWorkers > 0);
908 
909 	pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
910 
911 	pstate->numWorkers = AH->public.numWorkers;
912 	pstate->te = NULL;
913 	pstate->parallelSlot = NULL;
914 
915 	if (AH->public.numWorkers == 1)
916 		return pstate;
917 
918 	/* Create status arrays, being sure to initialize all fields to 0 */
919 	pstate->te = (TocEntry **)
920 		pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
921 	pstate->parallelSlot = (ParallelSlot *)
922 		pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
923 
924 #ifdef WIN32
925 	/* Make fmtId() and fmtQualifiedId() use thread-local storage */
926 	getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
927 #endif
928 
929 	/*
930 	 * Set the pstate in shutdown_info, to tell the exit handler that it must
931 	 * clean up workers as well as the main database connection.  But we don't
932 	 * set this in signal_info yet, because we don't want child processes to
933 	 * inherit non-NULL signal_info.pstate.
934 	 */
935 	shutdown_info.pstate = pstate;
936 
937 	/*
938 	 * Temporarily disable query cancellation on the leader connection.  This
939 	 * ensures that child processes won't inherit valid AH->connCancel
940 	 * settings and thus won't try to issue cancels against the leader's
941 	 * connection.  No harm is done if we fail while it's disabled, because
942 	 * the leader connection is idle at this point anyway.
943 	 */
944 	set_archive_cancel_info(AH, NULL);
945 
946 	/* Ensure stdio state is quiesced before forking */
947 	fflush(NULL);
948 
949 	/* Create desired number of workers */
950 	for (i = 0; i < pstate->numWorkers; i++)
951 	{
952 #ifdef WIN32
953 		WorkerInfo *wi;
954 		uintptr_t	handle;
955 #else
956 		pid_t		pid;
957 #endif
958 		ParallelSlot *slot = &(pstate->parallelSlot[i]);
959 		int			pipeMW[2],
960 					pipeWM[2];
961 
962 		/* Create communication pipes for this worker */
963 		if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
964 			fatal("could not create communication channels: %m");
965 
966 		/* leader's ends of the pipes */
967 		slot->pipeRead = pipeWM[PIPE_READ];
968 		slot->pipeWrite = pipeMW[PIPE_WRITE];
969 		/* child's ends of the pipes */
970 		slot->pipeRevRead = pipeMW[PIPE_READ];
971 		slot->pipeRevWrite = pipeWM[PIPE_WRITE];
972 
973 #ifdef WIN32
974 		/* Create transient structure to pass args to worker function */
975 		wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
976 
977 		wi->AH = AH;
978 		wi->slot = slot;
979 
980 		handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
981 								wi, 0, &(slot->threadId));
982 		slot->hThread = handle;
983 		slot->workerStatus = WRKR_IDLE;
984 #else							/* !WIN32 */
985 		pid = fork();
986 		if (pid == 0)
987 		{
988 			/* we are the worker */
989 			int			j;
990 
991 			/* this is needed for GetMyPSlot() */
992 			slot->pid = getpid();
993 
994 			/* instruct signal handler that we're in a worker now */
995 			signal_info.am_worker = true;
996 
997 			/* close read end of Worker -> Leader */
998 			closesocket(pipeWM[PIPE_READ]);
999 			/* close write end of Leader -> Worker */
1000 			closesocket(pipeMW[PIPE_WRITE]);
1001 
1002 			/*
1003 			 * Close all inherited fds for communication of the leader with
1004 			 * previously-forked workers.
1005 			 */
1006 			for (j = 0; j < i; j++)
1007 			{
1008 				closesocket(pstate->parallelSlot[j].pipeRead);
1009 				closesocket(pstate->parallelSlot[j].pipeWrite);
1010 			}
1011 
1012 			/* Run the worker ... */
1013 			RunWorker(AH, slot);
1014 
1015 			/* We can just exit(0) when done */
1016 			exit(0);
1017 		}
1018 		else if (pid < 0)
1019 		{
1020 			/* fork failed */
1021 			fatal("could not create worker process: %m");
1022 		}
1023 
1024 		/* In Leader after successful fork */
1025 		slot->pid = pid;
1026 		slot->workerStatus = WRKR_IDLE;
1027 
1028 		/* close read end of Leader -> Worker */
1029 		closesocket(pipeMW[PIPE_READ]);
1030 		/* close write end of Worker -> Leader */
1031 		closesocket(pipeWM[PIPE_WRITE]);
1032 #endif							/* WIN32 */
1033 	}
1034 
1035 	/*
1036 	 * Having forked off the workers, disable SIGPIPE so that leader isn't
1037 	 * killed if it tries to send a command to a dead worker.  We don't want
1038 	 * the workers to inherit this setting, though.
1039 	 */
1040 #ifndef WIN32
1041 	pqsignal(SIGPIPE, SIG_IGN);
1042 #endif
1043 
1044 	/*
1045 	 * Re-establish query cancellation on the leader connection.
1046 	 */
1047 	set_archive_cancel_info(AH, AH->connection);
1048 
1049 	/*
1050 	 * Tell the cancel signal handler to forward signals to worker processes,
1051 	 * too.  (As with query cancel, we did not need this earlier because the
1052 	 * workers have not yet been given anything to do; if we die before this
1053 	 * point, any already-started workers will see EOF and quit promptly.)
1054 	 */
1055 	set_cancel_pstate(pstate);
1056 
1057 	return pstate;
1058 }
1059 
1060 /*
1061  * Close down a parallel dump or restore.
1062  */
1063 void
ParallelBackupEnd(ArchiveHandle * AH,ParallelState * pstate)1064 ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
1065 {
1066 	int			i;
1067 
1068 	/* No work if non-parallel */
1069 	if (pstate->numWorkers == 1)
1070 		return;
1071 
1072 	/* There should not be any unfinished jobs */
1073 	Assert(IsEveryWorkerIdle(pstate));
1074 
1075 	/* Close the sockets so that the workers know they can exit */
1076 	for (i = 0; i < pstate->numWorkers; i++)
1077 	{
1078 		closesocket(pstate->parallelSlot[i].pipeRead);
1079 		closesocket(pstate->parallelSlot[i].pipeWrite);
1080 	}
1081 
1082 	/* Wait for them to exit */
1083 	WaitForTerminatingWorkers(pstate);
1084 
1085 	/*
1086 	 * Unlink pstate from shutdown_info, so the exit handler will not try to
1087 	 * use it; and likewise unlink from signal_info.
1088 	 */
1089 	shutdown_info.pstate = NULL;
1090 	set_cancel_pstate(NULL);
1091 
1092 	/* Release state (mere neatnik-ism, since we're about to terminate) */
1093 	free(pstate->te);
1094 	free(pstate->parallelSlot);
1095 	free(pstate);
1096 }
1097 
1098 /*
1099  * These next four functions handle construction and parsing of the command
1100  * strings and response strings for parallel workers.
1101  *
1102  * Currently, these can be the same regardless of which archive format we are
1103  * processing.  In future, we might want to let format modules override these
1104  * functions to add format-specific data to a command or response.
1105  */
1106 
1107 /*
1108  * buildWorkerCommand: format a command string to send to a worker.
1109  *
1110  * The string is built in the caller-supplied buffer of size buflen.
1111  */
1112 static void
buildWorkerCommand(ArchiveHandle * AH,TocEntry * te,T_Action act,char * buf,int buflen)1113 buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
1114 				   char *buf, int buflen)
1115 {
1116 	if (act == ACT_DUMP)
1117 		snprintf(buf, buflen, "DUMP %d", te->dumpId);
1118 	else if (act == ACT_RESTORE)
1119 		snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1120 	else
1121 		Assert(false);
1122 }
1123 
1124 /*
1125  * parseWorkerCommand: interpret a command string in a worker.
1126  */
1127 static void
parseWorkerCommand(ArchiveHandle * AH,TocEntry ** te,T_Action * act,const char * msg)1128 parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
1129 				   const char *msg)
1130 {
1131 	DumpId		dumpId;
1132 	int			nBytes;
1133 
1134 	if (messageStartsWith(msg, "DUMP "))
1135 	{
1136 		*act = ACT_DUMP;
1137 		sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1138 		Assert(nBytes == strlen(msg));
1139 		*te = getTocEntryByDumpId(AH, dumpId);
1140 		Assert(*te != NULL);
1141 	}
1142 	else if (messageStartsWith(msg, "RESTORE "))
1143 	{
1144 		*act = ACT_RESTORE;
1145 		sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1146 		Assert(nBytes == strlen(msg));
1147 		*te = getTocEntryByDumpId(AH, dumpId);
1148 		Assert(*te != NULL);
1149 	}
1150 	else
1151 		fatal("unrecognized command received from leader: \"%s\"",
1152 			  msg);
1153 }
1154 
1155 /*
1156  * buildWorkerResponse: format a response string to send to the leader.
1157  *
1158  * The string is built in the caller-supplied buffer of size buflen.
1159  */
1160 static void
buildWorkerResponse(ArchiveHandle * AH,TocEntry * te,T_Action act,int status,char * buf,int buflen)1161 buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
1162 					char *buf, int buflen)
1163 {
1164 	snprintf(buf, buflen, "OK %d %d %d",
1165 			 te->dumpId,
1166 			 status,
1167 			 status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1168 }
1169 
1170 /*
1171  * parseWorkerResponse: parse the status message returned by a worker.
1172  *
1173  * Returns the integer status code, and may update fields of AH and/or te.
1174  */
1175 static int
parseWorkerResponse(ArchiveHandle * AH,TocEntry * te,const char * msg)1176 parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
1177 					const char *msg)
1178 {
1179 	DumpId		dumpId;
1180 	int			nBytes,
1181 				n_errors;
1182 	int			status = 0;
1183 
1184 	if (messageStartsWith(msg, "OK "))
1185 	{
1186 		sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1187 
1188 		Assert(dumpId == te->dumpId);
1189 		Assert(nBytes == strlen(msg));
1190 
1191 		AH->public.n_errors += n_errors;
1192 	}
1193 	else
1194 		fatal("invalid message received from worker: \"%s\"",
1195 			  msg);
1196 
1197 	return status;
1198 }
1199 
1200 /*
1201  * Dispatch a job to some free worker.
1202  *
1203  * te is the TocEntry to be processed, act is the action to be taken on it.
1204  * callback is the function to call on completion of the job.
1205  *
1206  * If no worker is currently available, this will block, and previously
1207  * registered callback functions may be called.
1208  */
1209 void
DispatchJobForTocEntry(ArchiveHandle * AH,ParallelState * pstate,TocEntry * te,T_Action act,ParallelCompletionPtr callback,void * callback_data)1210 DispatchJobForTocEntry(ArchiveHandle *AH,
1211 					   ParallelState *pstate,
1212 					   TocEntry *te,
1213 					   T_Action act,
1214 					   ParallelCompletionPtr callback,
1215 					   void *callback_data)
1216 {
1217 	int			worker;
1218 	char		buf[256];
1219 
1220 	/* Get a worker, waiting if none are idle */
1221 	while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1222 		WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1223 
1224 	/* Construct and send command string */
1225 	buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1226 
1227 	sendMessageToWorker(pstate, worker, buf);
1228 
1229 	/* Remember worker is busy, and which TocEntry it's working on */
1230 	pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1231 	pstate->parallelSlot[worker].callback = callback;
1232 	pstate->parallelSlot[worker].callback_data = callback_data;
1233 	pstate->te[worker] = te;
1234 }
1235 
1236 /*
1237  * Find an idle worker and return its slot number.
1238  * Return NO_SLOT if none are idle.
1239  */
1240 static int
GetIdleWorker(ParallelState * pstate)1241 GetIdleWorker(ParallelState *pstate)
1242 {
1243 	int			i;
1244 
1245 	for (i = 0; i < pstate->numWorkers; i++)
1246 	{
1247 		if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1248 			return i;
1249 	}
1250 	return NO_SLOT;
1251 }
1252 
1253 /*
1254  * Return true iff no worker is running.
1255  */
1256 static bool
HasEveryWorkerTerminated(ParallelState * pstate)1257 HasEveryWorkerTerminated(ParallelState *pstate)
1258 {
1259 	int			i;
1260 
1261 	for (i = 0; i < pstate->numWorkers; i++)
1262 	{
1263 		if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1264 			return false;
1265 	}
1266 	return true;
1267 }
1268 
1269 /*
1270  * Return true iff every worker is in the WRKR_IDLE state.
1271  */
1272 bool
IsEveryWorkerIdle(ParallelState * pstate)1273 IsEveryWorkerIdle(ParallelState *pstate)
1274 {
1275 	int			i;
1276 
1277 	for (i = 0; i < pstate->numWorkers; i++)
1278 	{
1279 		if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1280 			return false;
1281 	}
1282 	return true;
1283 }
1284 
1285 /*
1286  * Acquire lock on a table to be dumped by a worker process.
1287  *
1288  * The leader process is already holding an ACCESS SHARE lock.  Ordinarily
1289  * it's no problem for a worker to get one too, but if anything else besides
1290  * pg_dump is running, there's a possible deadlock:
1291  *
1292  * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode.
1293  * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1294  *	  because the leader holds a conflicting ACCESS SHARE lock).
1295  * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1296  *	  The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1297  * 4) Now we have a deadlock, since the leader is effectively waiting for
1298  *	  the worker.  The server cannot detect that, however.
1299  *
1300  * To prevent an infinite wait, prior to touching a table in a worker, request
1301  * a lock in ACCESS SHARE mode but with NOWAIT.  If we don't get the lock,
1302  * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1303  * so we have a deadlock.  We must fail the backup in that case.
1304  */
1305 static void
lockTableForWorker(ArchiveHandle * AH,TocEntry * te)1306 lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
1307 {
1308 	const char *qualId;
1309 	PQExpBuffer query;
1310 	PGresult   *res;
1311 
1312 	/* Nothing to do for BLOBS */
1313 	if (strcmp(te->desc, "BLOBS") == 0)
1314 		return;
1315 
1316 	query = createPQExpBuffer();
1317 
1318 	qualId = fmtQualifiedId(te->namespace, te->tag);
1319 
1320 	appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1321 					  qualId);
1322 
1323 	res = PQexec(AH->connection, query->data);
1324 
1325 	if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1326 		fatal("could not obtain lock on relation \"%s\"\n"
1327 			  "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1328 			  "on the table after the pg_dump parent process had gotten the "
1329 			  "initial ACCESS SHARE lock on the table.", qualId);
1330 
1331 	PQclear(res);
1332 	destroyPQExpBuffer(query);
1333 }
1334 
1335 /*
1336  * WaitForCommands: main routine for a worker process.
1337  *
1338  * Read and execute commands from the leader until we see EOF on the pipe.
1339  */
1340 static void
WaitForCommands(ArchiveHandle * AH,int pipefd[2])1341 WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1342 {
1343 	char	   *command;
1344 	TocEntry   *te;
1345 	T_Action	act;
1346 	int			status = 0;
1347 	char		buf[256];
1348 
1349 	for (;;)
1350 	{
1351 		if (!(command = getMessageFromLeader(pipefd)))
1352 		{
1353 			/* EOF, so done */
1354 			return;
1355 		}
1356 
1357 		/* Decode the command */
1358 		parseWorkerCommand(AH, &te, &act, command);
1359 
1360 		if (act == ACT_DUMP)
1361 		{
1362 			/* Acquire lock on this table within the worker's session */
1363 			lockTableForWorker(AH, te);
1364 
1365 			/* Perform the dump command */
1366 			status = (AH->WorkerJobDumpPtr) (AH, te);
1367 		}
1368 		else if (act == ACT_RESTORE)
1369 		{
1370 			/* Perform the restore command */
1371 			status = (AH->WorkerJobRestorePtr) (AH, te);
1372 		}
1373 		else
1374 			Assert(false);
1375 
1376 		/* Return status to leader */
1377 		buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1378 
1379 		sendMessageToLeader(pipefd, buf);
1380 
1381 		/* command was pg_malloc'd and we are responsible for free()ing it. */
1382 		free(command);
1383 	}
1384 }
1385 
1386 /*
1387  * Check for status messages from workers.
1388  *
1389  * If do_wait is true, wait to get a status message; otherwise, just return
1390  * immediately if there is none available.
1391  *
1392  * When we get a status message, we pass the status code to the callback
1393  * function that was specified to DispatchJobForTocEntry, then reset the
1394  * worker status to IDLE.
1395  *
1396  * Returns true if we collected a status message, else false.
1397  *
1398  * XXX is it worth checking for more than one status message per call?
1399  * It seems somewhat unlikely that multiple workers would finish at exactly
1400  * the same time.
1401  */
1402 static bool
ListenToWorkers(ArchiveHandle * AH,ParallelState * pstate,bool do_wait)1403 ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1404 {
1405 	int			worker;
1406 	char	   *msg;
1407 
1408 	/* Try to collect a status message */
1409 	msg = getMessageFromWorker(pstate, do_wait, &worker);
1410 
1411 	if (!msg)
1412 	{
1413 		/* If do_wait is true, we must have detected EOF on some socket */
1414 		if (do_wait)
1415 			fatal("a worker process died unexpectedly");
1416 		return false;
1417 	}
1418 
1419 	/* Process it and update our idea of the worker's status */
1420 	if (messageStartsWith(msg, "OK "))
1421 	{
1422 		ParallelSlot *slot = &pstate->parallelSlot[worker];
1423 		TocEntry   *te = pstate->te[worker];
1424 		int			status;
1425 
1426 		status = parseWorkerResponse(AH, te, msg);
1427 		slot->callback(AH, te, status, slot->callback_data);
1428 		slot->workerStatus = WRKR_IDLE;
1429 		pstate->te[worker] = NULL;
1430 	}
1431 	else
1432 		fatal("invalid message received from worker: \"%s\"",
1433 			  msg);
1434 
1435 	/* Free the string returned from getMessageFromWorker */
1436 	free(msg);
1437 
1438 	return true;
1439 }
1440 
1441 /*
1442  * Check for status results from workers, waiting if necessary.
1443  *
1444  * Available wait modes are:
1445  * WFW_NO_WAIT: reap any available status, but don't block
1446  * WFW_GOT_STATUS: wait for at least one more worker to finish
1447  * WFW_ONE_IDLE: wait for at least one worker to be idle
1448  * WFW_ALL_IDLE: wait for all workers to be idle
1449  *
1450  * Any received results are passed to the callback specified to
1451  * DispatchJobForTocEntry.
1452  *
1453  * This function is executed in the leader process.
1454  */
1455 void
WaitForWorkers(ArchiveHandle * AH,ParallelState * pstate,WFW_WaitOption mode)1456 WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
1457 {
1458 	bool		do_wait = false;
1459 
1460 	/*
1461 	 * In GOT_STATUS mode, always block waiting for a message, since we can't
1462 	 * return till we get something.  In other modes, we don't block the first
1463 	 * time through the loop.
1464 	 */
1465 	if (mode == WFW_GOT_STATUS)
1466 	{
1467 		/* Assert that caller knows what it's doing */
1468 		Assert(!IsEveryWorkerIdle(pstate));
1469 		do_wait = true;
1470 	}
1471 
1472 	for (;;)
1473 	{
1474 		/*
1475 		 * Check for status messages, even if we don't need to block.  We do
1476 		 * not try very hard to reap all available messages, though, since
1477 		 * there's unlikely to be more than one.
1478 		 */
1479 		if (ListenToWorkers(AH, pstate, do_wait))
1480 		{
1481 			/*
1482 			 * If we got a message, we are done by definition for GOT_STATUS
1483 			 * mode, and we can also be certain that there's at least one idle
1484 			 * worker.  So we're done in all but ALL_IDLE mode.
1485 			 */
1486 			if (mode != WFW_ALL_IDLE)
1487 				return;
1488 		}
1489 
1490 		/* Check whether we must wait for new status messages */
1491 		switch (mode)
1492 		{
1493 			case WFW_NO_WAIT:
1494 				return;			/* never wait */
1495 			case WFW_GOT_STATUS:
1496 				Assert(false);	/* can't get here, because we waited */
1497 				break;
1498 			case WFW_ONE_IDLE:
1499 				if (GetIdleWorker(pstate) != NO_SLOT)
1500 					return;
1501 				break;
1502 			case WFW_ALL_IDLE:
1503 				if (IsEveryWorkerIdle(pstate))
1504 					return;
1505 				break;
1506 		}
1507 
1508 		/* Loop back, and this time wait for something to happen */
1509 		do_wait = true;
1510 	}
1511 }
1512 
1513 /*
1514  * Read one command message from the leader, blocking if necessary
1515  * until one is available, and return it as a malloc'd string.
1516  * On EOF, return NULL.
1517  *
1518  * This function is executed in worker processes.
1519  */
1520 static char *
getMessageFromLeader(int pipefd[2])1521 getMessageFromLeader(int pipefd[2])
1522 {
1523 	return readMessageFromPipe(pipefd[PIPE_READ]);
1524 }
1525 
1526 /*
1527  * Send a status message to the leader.
1528  *
1529  * This function is executed in worker processes.
1530  */
1531 static void
sendMessageToLeader(int pipefd[2],const char * str)1532 sendMessageToLeader(int pipefd[2], const char *str)
1533 {
1534 	int			len = strlen(str) + 1;
1535 
1536 	if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1537 		fatal("could not write to the communication channel: %m");
1538 }
1539 
1540 /*
1541  * Wait until some descriptor in "workerset" becomes readable.
1542  * Returns -1 on error, else the number of readable descriptors.
1543  */
1544 static int
select_loop(int maxFd,fd_set * workerset)1545 select_loop(int maxFd, fd_set *workerset)
1546 {
1547 	int			i;
1548 	fd_set		saveSet = *workerset;
1549 
1550 	for (;;)
1551 	{
1552 		*workerset = saveSet;
1553 		i = select(maxFd + 1, workerset, NULL, NULL, NULL);
1554 
1555 #ifndef WIN32
1556 		if (i < 0 && errno == EINTR)
1557 			continue;
1558 #else
1559 		if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
1560 			continue;
1561 #endif
1562 		break;
1563 	}
1564 
1565 	return i;
1566 }
1567 
1568 
1569 /*
1570  * Check for messages from worker processes.
1571  *
1572  * If a message is available, return it as a malloc'd string, and put the
1573  * index of the sending worker in *worker.
1574  *
1575  * If nothing is available, wait if "do_wait" is true, else return NULL.
1576  *
1577  * If we detect EOF on any socket, we'll return NULL.  It's not great that
1578  * that's hard to distinguish from the no-data-available case, but for now
1579  * our one caller is okay with that.
1580  *
1581  * This function is executed in the leader process.
1582  */
1583 static char *
getMessageFromWorker(ParallelState * pstate,bool do_wait,int * worker)1584 getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1585 {
1586 	int			i;
1587 	fd_set		workerset;
1588 	int			maxFd = -1;
1589 	struct timeval nowait = {0, 0};
1590 
1591 	/* construct bitmap of socket descriptors for select() */
1592 	FD_ZERO(&workerset);
1593 	for (i = 0; i < pstate->numWorkers; i++)
1594 	{
1595 		if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1596 			continue;
1597 		FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1598 		if (pstate->parallelSlot[i].pipeRead > maxFd)
1599 			maxFd = pstate->parallelSlot[i].pipeRead;
1600 	}
1601 
1602 	if (do_wait)
1603 	{
1604 		i = select_loop(maxFd, &workerset);
1605 		Assert(i != 0);
1606 	}
1607 	else
1608 	{
1609 		if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1610 			return NULL;
1611 	}
1612 
1613 	if (i < 0)
1614 		fatal("%s() failed: %m", "select");
1615 
1616 	for (i = 0; i < pstate->numWorkers; i++)
1617 	{
1618 		char	   *msg;
1619 
1620 		if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1621 			continue;
1622 		if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1623 			continue;
1624 
1625 		/*
1626 		 * Read the message if any.  If the socket is ready because of EOF,
1627 		 * we'll return NULL instead (and the socket will stay ready, so the
1628 		 * condition will persist).
1629 		 *
1630 		 * Note: because this is a blocking read, we'll wait if only part of
1631 		 * the message is available.  Waiting a long time would be bad, but
1632 		 * since worker status messages are short and are always sent in one
1633 		 * operation, it shouldn't be a problem in practice.
1634 		 */
1635 		msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1636 		*worker = i;
1637 		return msg;
1638 	}
1639 	Assert(false);
1640 	return NULL;
1641 }
1642 
1643 /*
1644  * Send a command message to the specified worker process.
1645  *
1646  * This function is executed in the leader process.
1647  */
1648 static void
sendMessageToWorker(ParallelState * pstate,int worker,const char * str)1649 sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1650 {
1651 	int			len = strlen(str) + 1;
1652 
1653 	if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1654 	{
1655 		fatal("could not write to the communication channel: %m");
1656 	}
1657 }
1658 
1659 /*
1660  * Read one message from the specified pipe (fd), blocking if necessary
1661  * until one is available, and return it as a malloc'd string.
1662  * On EOF, return NULL.
1663  *
1664  * A "message" on the channel is just a null-terminated string.
1665  */
1666 static char *
readMessageFromPipe(int fd)1667 readMessageFromPipe(int fd)
1668 {
1669 	char	   *msg;
1670 	int			msgsize,
1671 				bufsize;
1672 	int			ret;
1673 
1674 	/*
1675 	 * In theory, if we let piperead() read multiple bytes, it might give us
1676 	 * back fragments of multiple messages.  (That can't actually occur, since
1677 	 * neither leader nor workers send more than one message without waiting
1678 	 * for a reply, but we don't wish to assume that here.)  For simplicity,
1679 	 * read a byte at a time until we get the terminating '\0'.  This method
1680 	 * is a bit inefficient, but since this is only used for relatively short
1681 	 * command and status strings, it shouldn't matter.
1682 	 */
1683 	bufsize = 64;				/* could be any number */
1684 	msg = (char *) pg_malloc(bufsize);
1685 	msgsize = 0;
1686 	for (;;)
1687 	{
1688 		Assert(msgsize < bufsize);
1689 		ret = piperead(fd, msg + msgsize, 1);
1690 		if (ret <= 0)
1691 			break;				/* error or connection closure */
1692 
1693 		Assert(ret == 1);
1694 
1695 		if (msg[msgsize] == '\0')
1696 			return msg;			/* collected whole message */
1697 
1698 		msgsize++;
1699 		if (msgsize == bufsize) /* enlarge buffer if needed */
1700 		{
1701 			bufsize += 16;		/* could be any number */
1702 			msg = (char *) pg_realloc(msg, bufsize);
1703 		}
1704 	}
1705 
1706 	/* Other end has closed the connection */
1707 	pg_free(msg);
1708 	return NULL;
1709 }
1710 
1711 #ifdef WIN32
1712 
1713 /*
1714  * This is a replacement version of pipe(2) for Windows which allows the pipe
1715  * handles to be used in select().
1716  *
1717  * Reads and writes on the pipe must go through piperead()/pipewrite().
1718  *
1719  * For consistency with Unix we declare the returned handles as "int".
1720  * This is okay even on WIN64 because system handles are not more than
1721  * 32 bits wide, but we do have to do some casting.
1722  */
1723 static int
pgpipe(int handles[2])1724 pgpipe(int handles[2])
1725 {
1726 	pgsocket	s,
1727 				tmp_sock;
1728 	struct sockaddr_in serv_addr;
1729 	int			len = sizeof(serv_addr);
1730 
1731 	/* We have to use the Unix socket invalid file descriptor value here. */
1732 	handles[0] = handles[1] = -1;
1733 
1734 	/*
1735 	 * setup listen socket
1736 	 */
1737 	if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1738 	{
1739 		pg_log_error("pgpipe: could not create socket: error code %d",
1740 					 WSAGetLastError());
1741 		return -1;
1742 	}
1743 
1744 	memset((void *) &serv_addr, 0, sizeof(serv_addr));
1745 	serv_addr.sin_family = AF_INET;
1746 	serv_addr.sin_port = pg_hton16(0);
1747 	serv_addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);
1748 	if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1749 	{
1750 		pg_log_error("pgpipe: could not bind: error code %d",
1751 					 WSAGetLastError());
1752 		closesocket(s);
1753 		return -1;
1754 	}
1755 	if (listen(s, 1) == SOCKET_ERROR)
1756 	{
1757 		pg_log_error("pgpipe: could not listen: error code %d",
1758 					 WSAGetLastError());
1759 		closesocket(s);
1760 		return -1;
1761 	}
1762 	if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
1763 	{
1764 		pg_log_error("pgpipe: %s() failed: error code %d", "getsockname",
1765 					 WSAGetLastError());
1766 		closesocket(s);
1767 		return -1;
1768 	}
1769 
1770 	/*
1771 	 * setup pipe handles
1772 	 */
1773 	if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1774 	{
1775 		pg_log_error("pgpipe: could not create second socket: error code %d",
1776 					 WSAGetLastError());
1777 		closesocket(s);
1778 		return -1;
1779 	}
1780 	handles[1] = (int) tmp_sock;
1781 
1782 	if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1783 	{
1784 		pg_log_error("pgpipe: could not connect socket: error code %d",
1785 					 WSAGetLastError());
1786 		closesocket(handles[1]);
1787 		handles[1] = -1;
1788 		closesocket(s);
1789 		return -1;
1790 	}
1791 	if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
1792 	{
1793 		pg_log_error("pgpipe: could not accept connection: error code %d",
1794 					 WSAGetLastError());
1795 		closesocket(handles[1]);
1796 		handles[1] = -1;
1797 		closesocket(s);
1798 		return -1;
1799 	}
1800 	handles[0] = (int) tmp_sock;
1801 
1802 	closesocket(s);
1803 	return 0;
1804 }
1805 
1806 #endif							/* WIN32 */
1807