1 /*-------------------------------------------------------------------------
2  *
3  * parallel.c
4  *	  Infrastructure for launching parallel workers
5  *
6  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *	  src/backend/access/transam/parallel.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/parallel.h"
18 #include "access/xact.h"
19 #include "access/xlog.h"
20 #include "catalog/namespace.h"
21 #include "commands/async.h"
22 #include "executor/execParallel.h"
23 #include "libpq/libpq.h"
24 #include "libpq/pqformat.h"
25 #include "libpq/pqmq.h"
26 #include "miscadmin.h"
27 #include "optimizer/planmain.h"
28 #include "pgstat.h"
29 #include "storage/ipc.h"
30 #include "storage/sinval.h"
31 #include "storage/spin.h"
32 #include "tcop/tcopprot.h"
33 #include "utils/combocid.h"
34 #include "utils/guc.h"
35 #include "utils/inval.h"
36 #include "utils/memutils.h"
37 #include "utils/resowner.h"
38 #include "utils/snapmgr.h"
39 
40 
/*
 * We don't want to waste a lot of memory on an error queue which, most of
 * the time, will process only a handful of small messages.  However, it is
 * desirable to make it large enough that a typical ErrorResponse can be sent
 * without blocking.  That way, a worker that errors out can write the whole
 * message into the queue and terminate without waiting for the user backend.
 *
 * The per-worker queues are carved out of one contiguous chunk at strides of
 * this size, and InitializeParallelDSM statically asserts that the value is
 * already BUFFERALIGN'ed.
 */
#define PARALLEL_ERROR_QUEUE_SIZE			16384

/* Magic number for parallel context TOC. */
#define PARALLEL_MAGIC						0x50477c7c

/*
 * Magic numbers for parallel state sharing.  Higher-level code should use
 * smaller values, leaving these very large ones for use by this module.
 *
 * Each key names one chunk inserted into the context's shm_toc by
 * InitializeParallelDSM.  PARALLEL_KEY_FIXED is always present; the others
 * are only inserted when the context budgets for at least one worker, and
 * PARALLEL_KEY_TRANSACTION_SNAPSHOT additionally only when the isolation
 * level uses a transaction snapshot.
 */
#define PARALLEL_KEY_FIXED					UINT64CONST(0xFFFFFFFFFFFF0001)
#define PARALLEL_KEY_ERROR_QUEUE			UINT64CONST(0xFFFFFFFFFFFF0002)
#define PARALLEL_KEY_LIBRARY				UINT64CONST(0xFFFFFFFFFFFF0003)
#define PARALLEL_KEY_GUC					UINT64CONST(0xFFFFFFFFFFFF0004)
#define PARALLEL_KEY_COMBO_CID				UINT64CONST(0xFFFFFFFFFFFF0005)
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT	UINT64CONST(0xFFFFFFFFFFFF0006)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT		UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE		UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_ENTRYPOINT				UINT64CONST(0xFFFFFFFFFFFF0009)
66 
/*
 * Fixed-size parallel state.
 *
 * One instance is stored under PARALLEL_KEY_FIXED in every parallel
 * context's table of contents (in the DSM segment, or in backend-private
 * memory when no workers are used).  InitializeParallelDSM fills it in from
 * the leader's backend state.
 */
typedef struct FixedParallelState
{
	/* Fixed-size state that workers must restore. */
	Oid			database_id;
	Oid			authenticated_user_id;
	Oid			current_user_id;
	Oid			outer_user_id;
	Oid			temp_namespace_id;
	Oid			temp_toast_namespace_id;
	int			sec_context;
	bool		is_superuser;
	/* Identity of the leader backend (its MyProc, MyProcPid, MyBackendId). */
	PGPROC	   *parallel_master_pgproc;
	pid_t		parallel_master_pid;
	BackendId	parallel_master_backend_id;
	/* Leader's transaction and statement start timestamps. */
	TimestampTz xact_ts;
	TimestampTz stmt_ts;

	/* Mutex protects remaining fields. */
	slock_t		mutex;

	/*
	 * Maximum XactLastRecEnd of any worker.  Cleared to 0 by
	 * InitializeParallelDSM and ReinitializeParallelDSM; the leader folds it
	 * into its own XactLastRecEnd in WaitForParallelWorkersToFinish.
	 */
	XLogRecPtr	last_xlog_end;
} FixedParallelState;
91 
/*
 * Our parallel worker number.  We initialize this to -1, meaning that we are
 * not a parallel worker.  In parallel workers, it will be set to a value >= 0
 * and < the number of workers before any user code is invoked; each parallel
 * worker will get a different parallel worker number.
 */
int			ParallelWorkerNumber = -1;

/*
 * Is there a parallel message pending which we need to receive?
 *
 * Set by HandleParallelMessageInterrupt, which may run inside a signal
 * handler (hence volatile), and cleared by HandleParallelMessages.
 */
volatile bool ParallelMessagePending = false;

/* Are we initializing a parallel worker? */
bool		InitializingParallelWorker = false;

/*
 * Pointer to our fixed parallel state.  NOTE(review): not referenced in the
 * leader-side code visible here; presumably set in worker processes.
 */
static FixedParallelState *MyFixedParallelState;

/*
 * List of active parallel contexts.  CreateParallelContext pushes onto it,
 * DestroyParallelContext removes from it, and HandleParallelMessages walks
 * it to drain every context's error queues.
 */
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);

/* Backend-local copy of data from FixedParallelState. */
static pid_t ParallelMasterPid;
114 
/*
 * List of internal parallel worker entry points.  We need this for
 * reasons explained in LookupParallelWorkerFunction(), below.
 *
 * Entry points travel between processes by name rather than by address
 * (see the entrypoint serialization in InitializeParallelDSM), so core
 * backend functions are resolved through this name -> address table.
 */
static const struct
{
	const char *fn_name;
	parallel_worker_main_type fn_addr;
}			InternalParallelWorkers[] =

{
	{
		"ParallelQueryMain", ParallelQueryMain
	}
};

/* Private functions. */
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
static void ParallelWorkerShutdown(int code, Datum arg);
136 
137 
138 /*
139  * Establish a new parallel context.  This should be done after entering
140  * parallel mode, and (unless there is an error) the context should be
141  * destroyed before exiting the current subtransaction.
142  */
143 ParallelContext *
CreateParallelContext(const char * library_name,const char * function_name,int nworkers)144 CreateParallelContext(const char *library_name, const char *function_name,
145 					  int nworkers)
146 {
147 	MemoryContext oldcontext;
148 	ParallelContext *pcxt;
149 
150 	/* It is unsafe to create a parallel context if not in parallel mode. */
151 	Assert(IsInParallelMode());
152 
153 	/* Number of workers should be non-negative. */
154 	Assert(nworkers >= 0);
155 
156 	/*
157 	 * If dynamic shared memory is not available, we won't be able to use
158 	 * background workers.
159 	 */
160 	if (dynamic_shared_memory_type == DSM_IMPL_NONE)
161 		nworkers = 0;
162 
163 	/*
164 	 * If we are running under serializable isolation, we can't use parallel
165 	 * workers, at least not until somebody enhances that mechanism to be
166 	 * parallel-aware.
167 	 */
168 	if (IsolationIsSerializable())
169 		nworkers = 0;
170 
171 	/* We might be running in a short-lived memory context. */
172 	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
173 
174 	/* Initialize a new ParallelContext. */
175 	pcxt = palloc0(sizeof(ParallelContext));
176 	pcxt->subid = GetCurrentSubTransactionId();
177 	pcxt->nworkers = nworkers;
178 	pcxt->library_name = pstrdup(library_name);
179 	pcxt->function_name = pstrdup(function_name);
180 	pcxt->error_context_stack = error_context_stack;
181 	shm_toc_initialize_estimator(&pcxt->estimator);
182 	dlist_push_head(&pcxt_list, &pcxt->node);
183 
184 	/* Restore previous memory context. */
185 	MemoryContextSwitchTo(oldcontext);
186 
187 	return pcxt;
188 }
189 
/*
 * Establish the dynamic shared memory segment for a parallel context and
 * copy state and other bookkeeping information that will be needed by
 * parallel workers into it.
 *
 * Works in two phases that must stay in sync: first every chunk and key is
 * added to pcxt->estimator, then the segment (or, with no workers, a
 * backend-private buffer of the same size) is created and each chunk is
 * allocated, filled, and inserted into pcxt->toc.  If DSM segment creation
 * fails because the segment limit is reached, pcxt->nworkers is reset to 0
 * and only the fixed-size state is stored.
 */
void
InitializeParallelDSM(ParallelContext *pcxt)
{
	MemoryContext oldcontext;
	Size		library_len = 0;
	Size		guc_len = 0;
	Size		combocidlen = 0;
	Size		tsnaplen = 0;
	Size		asnaplen = 0;
	Size		tstatelen = 0;
	Size		segsize = 0;
	int			i;
	FixedParallelState *fps;
	Snapshot	transaction_snapshot = GetTransactionSnapshot();
	Snapshot	active_snapshot = GetActiveSnapshot();

	/* We might be running in a very short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/* Allow space to store the fixed-size parallel state. */
	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * Normally, the user will have requested at least one worker process, but
	 * if by chance they have not, we can skip a bunch of things here.
	 */
	if (pcxt->nworkers > 0)
	{
		/* Estimate space for various kinds of state sharing. */
		library_len = EstimateLibraryStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, library_len);
		guc_len = EstimateGUCStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
		combocidlen = EstimateComboCIDStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
		if (IsolationUsesXactSnapshot())
		{
			tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
			shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
		}
		asnaplen = EstimateSnapshotSpace(active_snapshot);
		shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
		tstatelen = EstimateTransactionStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
		/* If you add more chunks here, you probably need to add keys. */
		/*
		 * Six keys: library, GUC, combo CID, transaction snapshot (counted
		 * even when that chunk is not used), active snapshot, and
		 * transaction state.
		 */
		shm_toc_estimate_keys(&pcxt->estimator, 6);

		/* Estimate space need for error queues. */
		StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
						 PARALLEL_ERROR_QUEUE_SIZE,
						 "parallel error queue size not buffer-aligned");
		shm_toc_estimate_chunk(&pcxt->estimator,
							   mul_size(PARALLEL_ERROR_QUEUE_SIZE,
										pcxt->nworkers));
		shm_toc_estimate_keys(&pcxt->estimator, 1);

		/* Estimate how much we'll need for the entrypoint info. */
		/* Two NUL-terminated strings: library name, then function name. */
		shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
							   strlen(pcxt->function_name) + 2);
		shm_toc_estimate_keys(&pcxt->estimator, 1);
	}

	/*
	 * Create DSM and initialize with new table of contents.  But if the user
	 * didn't request any workers, then don't bother creating a dynamic shared
	 * memory segment; instead, just use backend-private memory.
	 *
	 * Also, if we can't create a dynamic shared memory segment because the
	 * maximum number of segments have already been created, then fall back to
	 * backend-private memory, and plan not to use any workers.  We hope this
	 * won't happen very often, but it's better to abandon the use of
	 * parallelism than to fail outright.
	 */
	segsize = shm_toc_estimate(&pcxt->estimator);
	if (pcxt->nworkers > 0)
		pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
	if (pcxt->seg != NULL)
		pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
								   dsm_segment_address(pcxt->seg),
								   segsize);
	else
	{
		/*
		 * Private fallback lives in TopMemoryContext; DestroyParallelContext
		 * pfrees it.
		 */
		pcxt->nworkers = 0;
		pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
		pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
								   segsize);
	}

	/* Initialize fixed-size state in shared memory. */
	fps = (FixedParallelState *)
		shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
	fps->database_id = MyDatabaseId;
	fps->authenticated_user_id = GetAuthenticatedUserId();
	fps->outer_user_id = GetCurrentRoleId();
	fps->is_superuser = session_auth_is_superuser;
	GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
	GetTempNamespaceState(&fps->temp_namespace_id,
						  &fps->temp_toast_namespace_id);
	fps->parallel_master_pgproc = MyProc;
	fps->parallel_master_pid = MyProcPid;
	fps->parallel_master_backend_id = MyBackendId;
	fps->xact_ts = GetCurrentTransactionStartTimestamp();
	fps->stmt_ts = GetCurrentStatementStartTimestamp();
	SpinLockInit(&fps->mutex);
	fps->last_xlog_end = 0;
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);

	/* We can skip the rest of this if we're not budgeting for any workers. */
	if (pcxt->nworkers > 0)
	{
		char	   *libraryspace;
		char	   *gucspace;
		char	   *combocidspace;
		char	   *tsnapspace;
		char	   *asnapspace;
		char	   *tstatespace;
		char	   *error_queue_space;
		char	   *entrypointstate;
		Size		lnamelen;

		/* Serialize shared libraries we have loaded. */
		libraryspace = shm_toc_allocate(pcxt->toc, library_len);
		SerializeLibraryState(library_len, libraryspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);

		/* Serialize GUC settings. */
		gucspace = shm_toc_allocate(pcxt->toc, guc_len);
		SerializeGUCState(guc_len, gucspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);

		/* Serialize combo CID state. */
		combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
		SerializeComboCIDState(combocidlen, combocidspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);

		/*
		 * Serialize the transaction snapshot if the transaction
		 * isolation-level uses a transaction snapshot.
		 */
		if (IsolationUsesXactSnapshot())
		{
			tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
			SerializeSnapshot(transaction_snapshot, tsnapspace);
			shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
						   tsnapspace);
		}

		/* Serialize the active snapshot. */
		asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
		SerializeSnapshot(active_snapshot, asnapspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);

		/* Serialize transaction state. */
		tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
		SerializeTransactionState(tstatelen, tstatespace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);

		/* Allocate space for worker information. */
		pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);

		/*
		 * Establish error queues in dynamic shared memory.
		 *
		 * These queues should be used only for transmitting ErrorResponse,
		 * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
		 * should be transmitted via separate (possibly larger?) queues.
		 *
		 * Worker i's queue starts at offset i * PARALLEL_ERROR_QUEUE_SIZE
		 * within one contiguous chunk; this backend is the receiver.
		 */
		error_queue_space =
			shm_toc_allocate(pcxt->toc,
							 mul_size(PARALLEL_ERROR_QUEUE_SIZE,
									  pcxt->nworkers));
		for (i = 0; i < pcxt->nworkers; ++i)
		{
			char	   *start;
			shm_mq	   *mq;

			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
			shm_mq_set_receiver(mq, MyProc);
			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
		}
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);

		/*
		 * Serialize entrypoint information.  It's unsafe to pass function
		 * pointers across processes, as the function pointer may be different
		 * in each process in EXEC_BACKEND builds, so we always pass library
		 * and function name.  (We use library name "postgres" for functions
		 * in the core backend.)
		 *
		 * Layout: "<library>\0<function>\0".
		 */
		lnamelen = strlen(pcxt->library_name);
		entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
										   strlen(pcxt->function_name) + 2);
		strcpy(entrypointstate, pcxt->library_name);
		strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
	}

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);
}
397 
398 /*
399  * Reinitialize the dynamic shared memory segment for a parallel context such
400  * that we could launch workers for it again.
401  */
402 void
ReinitializeParallelDSM(ParallelContext * pcxt)403 ReinitializeParallelDSM(ParallelContext *pcxt)
404 {
405 	FixedParallelState *fps;
406 
407 	/* Wait for any old workers to exit. */
408 	if (pcxt->nworkers_launched > 0)
409 	{
410 		WaitForParallelWorkersToFinish(pcxt);
411 		WaitForParallelWorkersToExit(pcxt);
412 		pcxt->nworkers_launched = 0;
413 		if (pcxt->any_message_received)
414 		{
415 			pfree(pcxt->any_message_received);
416 			pcxt->any_message_received = NULL;
417 		}
418 	}
419 
420 	/* Reset a few bits of fixed parallel state to a clean state. */
421 	fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
422 	fps->last_xlog_end = 0;
423 
424 	/* Recreate error queues (if they exist). */
425 	if (pcxt->nworkers > 0)
426 	{
427 		char	   *error_queue_space;
428 		int			i;
429 
430 		error_queue_space =
431 			shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
432 		for (i = 0; i < pcxt->nworkers; ++i)
433 		{
434 			char	   *start;
435 			shm_mq	   *mq;
436 
437 			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
438 			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
439 			shm_mq_set_receiver(mq, MyProc);
440 			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
441 		}
442 	}
443 }
444 
445 /*
446  * Launch parallel workers.
447  */
448 void
LaunchParallelWorkers(ParallelContext * pcxt)449 LaunchParallelWorkers(ParallelContext *pcxt)
450 {
451 	MemoryContext oldcontext;
452 	BackgroundWorker worker;
453 	int			i;
454 	bool		any_registrations_failed = false;
455 
456 	/* Skip this if we have no workers. */
457 	if (pcxt->nworkers == 0)
458 		return;
459 
460 	/* We need to be a lock group leader. */
461 	BecomeLockGroupLeader();
462 
463 	/* If we do have workers, we'd better have a DSM segment. */
464 	Assert(pcxt->seg != NULL);
465 
466 	/* We might be running in a short-lived memory context. */
467 	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
468 
469 	/* Configure a worker. */
470 	memset(&worker, 0, sizeof(worker));
471 	snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
472 			 MyProcPid);
473 	worker.bgw_flags =
474 		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
475 		| BGWORKER_CLASS_PARALLEL;
476 	worker.bgw_start_time = BgWorkerStart_ConsistentState;
477 	worker.bgw_restart_time = BGW_NEVER_RESTART;
478 	sprintf(worker.bgw_library_name, "postgres");
479 	sprintf(worker.bgw_function_name, "ParallelWorkerMain");
480 	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
481 	worker.bgw_notify_pid = MyProcPid;
482 
483 	/*
484 	 * Start workers.
485 	 *
486 	 * The caller must be able to tolerate ending up with fewer workers than
487 	 * expected, so there is no need to throw an error here if registration
488 	 * fails.  It wouldn't help much anyway, because registering the worker in
489 	 * no way guarantees that it will start up and initialize successfully.
490 	 */
491 	for (i = 0; i < pcxt->nworkers; ++i)
492 	{
493 		memcpy(worker.bgw_extra, &i, sizeof(int));
494 		if (!any_registrations_failed &&
495 			RegisterDynamicBackgroundWorker(&worker,
496 											&pcxt->worker[i].bgwhandle))
497 		{
498 			shm_mq_set_handle(pcxt->worker[i].error_mqh,
499 							  pcxt->worker[i].bgwhandle);
500 			pcxt->nworkers_launched++;
501 		}
502 		else
503 		{
504 			/*
505 			 * If we weren't able to register the worker, then we've bumped up
506 			 * against the max_worker_processes limit, and future
507 			 * registrations will probably fail too, so arrange to skip them.
508 			 * But we still have to execute this code for the remaining slots
509 			 * to make sure that we forget about the error queues we budgeted
510 			 * for those workers.  Otherwise, we'll wait for them to start,
511 			 * but they never will.
512 			 */
513 			any_registrations_failed = true;
514 			pcxt->worker[i].bgwhandle = NULL;
515 			shm_mq_detach(pcxt->worker[i].error_mqh);
516 			pcxt->worker[i].error_mqh = NULL;
517 		}
518 	}
519 
520 	/*
521 	 * Now that nworkers_launched has taken its final value, we can initialize
522 	 * any_message_received.
523 	 */
524 	if (pcxt->nworkers_launched > 0)
525 		pcxt->any_message_received =
526 			palloc0(sizeof(bool) * pcxt->nworkers_launched);
527 
528 	/* Restore previous memory context. */
529 	MemoryContextSwitchTo(oldcontext);
530 }
531 
/*
 * Wait for all workers to finish computing.
 *
 * Even if the parallel operation seems to have completed successfully, it's
 * important to call this function afterwards.  We must not miss any errors
 * the workers may have thrown during the parallel operation, or any that they
 * may yet throw while shutting down.
 *
 * Also, we want to update our notion of XactLastRecEnd based on worker
 * feedback.
 *
 * A worker counts as finished once its error_mqh has been cleared to NULL.
 * Throws ERROR if a worker is found stopped without ever having attached to
 * its error queue.
 */
void
WaitForParallelWorkersToFinish(ParallelContext *pcxt)
{
	for (;;)
	{
		bool		anyone_alive = false;
		int			nfinished = 0;
		int			i;

		/*
		 * This will process any parallel messages that are pending, which may
		 * change the outcome of the loop that follows.  It may also throw an
		 * error propagated from a worker.
		 */
		CHECK_FOR_INTERRUPTS();

		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			/*
			 * If error_mqh is NULL, then the worker has already exited
			 * cleanly.  If we have received a message through error_mqh from
			 * the worker, we know it started up cleanly, and therefore we're
			 * certain to be notified when it exits.
			 */
			if (pcxt->worker[i].error_mqh == NULL)
				++nfinished;
			else if (pcxt->any_message_received[i])
			{
				anyone_alive = true;
				break;
			}
		}

		if (!anyone_alive)
		{
			/* If all workers are known to have finished, we're done. */
			if (nfinished >= pcxt->nworkers_launched)
			{
				Assert(nfinished == pcxt->nworkers_launched);
				break;
			}

			/*
			 * We didn't detect any living workers, but not all workers are
			 * known to have exited cleanly.  Either not all workers have
			 * launched yet, or maybe some of them failed to start or
			 * terminated abnormally.
			 */
			for (i = 0; i < pcxt->nworkers_launched; ++i)
			{
				pid_t		pid;
				shm_mq	   *mq;

				/*
				 * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
				 * should just keep waiting.  If it is BGWH_STOPPED, then
				 * further investigation is needed.
				 */
				if (pcxt->worker[i].error_mqh == NULL ||
					pcxt->worker[i].bgwhandle == NULL ||
					GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
										   &pid) != BGWH_STOPPED)
					continue;

				/*
				 * Check whether the worker ended up stopped without ever
				 * attaching to the error queue.  If so, the postmaster was
				 * unable to fork the worker or it exited without initializing
				 * properly.  We must throw an error, since the caller may
				 * have been expecting the worker to do some work before
				 * exiting.
				 */
				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
				if (shm_mq_get_sender(mq) == NULL)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							 errmsg("parallel worker failed to initialize"),
							 errhint("More details may be available in the server log.")));

				/*
				 * The worker is stopped, but is attached to the error queue.
				 * Unless there's a bug somewhere, this will only happen when
				 * the worker writes messages and terminates after the
				 * CHECK_FOR_INTERRUPTS() near the top of this function and
				 * before the call to GetBackgroundWorkerPid().  In that case,
				 * our latch should have been set as well and the right things
				 * will happen on the next pass through the loop.
				 */
			}
		}

		/* Sleep until something (e.g. a worker message) sets our latch. */
		WaitLatch(MyLatch, WL_LATCH_SET, -1,
				  WAIT_EVENT_PARALLEL_FINISH);
		ResetLatch(MyLatch);
	}

	/* Fold the workers' maximum WAL end position into our own. */
	if (pcxt->toc != NULL)
	{
		FixedParallelState *fps;

		fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
		if (fps->last_xlog_end > XactLastRecEnd)
			XactLastRecEnd = fps->last_xlog_end;
	}
}
648 
649 /*
650  * Wait for all workers to exit.
651  *
652  * This function ensures that workers have been completely shutdown.  The
653  * difference between WaitForParallelWorkersToFinish and this function is
654  * that former just ensures that last message sent by worker backend is
655  * received by master backend whereas this ensures the complete shutdown.
656  */
657 static void
WaitForParallelWorkersToExit(ParallelContext * pcxt)658 WaitForParallelWorkersToExit(ParallelContext *pcxt)
659 {
660 	int			i;
661 
662 	/* Wait until the workers actually die. */
663 	for (i = 0; i < pcxt->nworkers_launched; ++i)
664 	{
665 		BgwHandleStatus status;
666 
667 		if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
668 			continue;
669 
670 		status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
671 
672 		/*
673 		 * If the postmaster kicked the bucket, we have no chance of cleaning
674 		 * up safely -- we won't be able to tell when our workers are actually
675 		 * dead.  This doesn't necessitate a PANIC since they will all abort
676 		 * eventually, but we can't safely continue this session.
677 		 */
678 		if (status == BGWH_POSTMASTER_DIED)
679 			ereport(FATAL,
680 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
681 					 errmsg("postmaster exited during a parallel transaction")));
682 
683 		/* Release memory. */
684 		pfree(pcxt->worker[i].bgwhandle);
685 		pcxt->worker[i].bgwhandle = NULL;
686 	}
687 }
688 
689 /*
690  * Destroy a parallel context.
691  *
692  * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
693  * first, before calling this function.  When this function is invoked, any
694  * remaining workers are forcibly killed; the dynamic shared memory segment
695  * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
696  */
697 void
DestroyParallelContext(ParallelContext * pcxt)698 DestroyParallelContext(ParallelContext *pcxt)
699 {
700 	int			i;
701 
702 	/*
703 	 * Be careful about order of operations here!  We remove the parallel
704 	 * context from the list before we do anything else; otherwise, if an
705 	 * error occurs during a subsequent step, we might try to nuke it again
706 	 * from AtEOXact_Parallel or AtEOSubXact_Parallel.
707 	 */
708 	dlist_delete(&pcxt->node);
709 
710 	/* Kill each worker in turn, and forget their error queues. */
711 	if (pcxt->worker != NULL)
712 	{
713 		for (i = 0; i < pcxt->nworkers_launched; ++i)
714 		{
715 			if (pcxt->worker[i].error_mqh != NULL)
716 			{
717 				TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
718 
719 				shm_mq_detach(pcxt->worker[i].error_mqh);
720 				pcxt->worker[i].error_mqh = NULL;
721 			}
722 		}
723 	}
724 
725 	/*
726 	 * If we have allocated a shared memory segment, detach it.  This will
727 	 * implicitly detach the error queues, and any other shared memory queues,
728 	 * stored there.
729 	 */
730 	if (pcxt->seg != NULL)
731 	{
732 		dsm_detach(pcxt->seg);
733 		pcxt->seg = NULL;
734 	}
735 
736 	/*
737 	 * If this parallel context is actually in backend-private memory rather
738 	 * than shared memory, free that memory instead.
739 	 */
740 	if (pcxt->private_memory != NULL)
741 	{
742 		pfree(pcxt->private_memory);
743 		pcxt->private_memory = NULL;
744 	}
745 
746 	/*
747 	 * We can't finish transaction commit or abort until all of the workers
748 	 * have exited.  This means, in particular, that we can't respond to
749 	 * interrupts at this stage.
750 	 */
751 	HOLD_INTERRUPTS();
752 	WaitForParallelWorkersToExit(pcxt);
753 	RESUME_INTERRUPTS();
754 
755 	/* Free the worker array itself. */
756 	if (pcxt->worker != NULL)
757 	{
758 		pfree(pcxt->worker);
759 		pcxt->worker = NULL;
760 	}
761 
762 	/* Free memory. */
763 	pfree(pcxt->library_name);
764 	pfree(pcxt->function_name);
765 	pfree(pcxt);
766 }
767 
768 /*
769  * Are there any parallel contexts currently active?
770  */
771 bool
ParallelContextActive(void)772 ParallelContextActive(void)
773 {
774 	return !dlist_is_empty(&pcxt_list);
775 }
776 
777 /*
778  * Handle receipt of an interrupt indicating a parallel worker message.
779  *
780  * Note: this is called within a signal handler!  All we can do is set
781  * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
782  * HandleParallelMessages().
783  */
784 void
HandleParallelMessageInterrupt(void)785 HandleParallelMessageInterrupt(void)
786 {
787 	InterruptPending = true;
788 	ParallelMessagePending = true;
789 	SetLatch(MyLatch);
790 }
791 
792 /*
793  * Handle any queued protocol messages received from parallel workers.
794  */
795 void
HandleParallelMessages(void)796 HandleParallelMessages(void)
797 {
798 	dlist_iter	iter;
799 	MemoryContext oldcontext;
800 
801 	static MemoryContext hpm_context = NULL;
802 
803 	/*
804 	 * This is invoked from ProcessInterrupts(), and since some of the
805 	 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
806 	 * for recursive calls if more signals are received while this runs.  It's
807 	 * unclear that recursive entry would be safe, and it doesn't seem useful
808 	 * even if it is safe, so let's block interrupts until done.
809 	 */
810 	HOLD_INTERRUPTS();
811 
812 	/*
813 	 * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
814 	 * don't want to risk leaking data into long-lived contexts, so let's do
815 	 * our work here in a private context that we can reset on each use.
816 	 */
817 	if (hpm_context == NULL)	/* first time through? */
818 		hpm_context = AllocSetContextCreate(TopMemoryContext,
819 											"HandleParallelMessages",
820 											ALLOCSET_DEFAULT_SIZES);
821 	else
822 		MemoryContextReset(hpm_context);
823 
824 	oldcontext = MemoryContextSwitchTo(hpm_context);
825 
826 	/* OK to process messages.  Reset the flag saying there are more to do. */
827 	ParallelMessagePending = false;
828 
829 	dlist_foreach(iter, &pcxt_list)
830 	{
831 		ParallelContext *pcxt;
832 		int			i;
833 
834 		pcxt = dlist_container(ParallelContext, node, iter.cur);
835 		if (pcxt->worker == NULL)
836 			continue;
837 
838 		for (i = 0; i < pcxt->nworkers_launched; ++i)
839 		{
840 			/*
841 			 * Read as many messages as we can from each worker, but stop when
842 			 * either (1) the worker's error queue goes away, which can happen
843 			 * if we receive a Terminate message from the worker; or (2) no
844 			 * more messages can be read from the worker without blocking.
845 			 */
846 			while (pcxt->worker[i].error_mqh != NULL)
847 			{
848 				shm_mq_result res;
849 				Size		nbytes;
850 				void	   *data;
851 
852 				res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
853 									 &data, true);
854 				if (res == SHM_MQ_WOULD_BLOCK)
855 					break;
856 				else if (res == SHM_MQ_SUCCESS)
857 				{
858 					StringInfoData msg;
859 
860 					initStringInfo(&msg);
861 					appendBinaryStringInfo(&msg, data, nbytes);
862 					HandleParallelMessage(pcxt, i, &msg);
863 					pfree(msg.data);
864 				}
865 				else
866 					ereport(ERROR,
867 							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
868 							 errmsg("lost connection to parallel worker")));
869 			}
870 		}
871 	}
872 
873 	MemoryContextSwitchTo(oldcontext);
874 
875 	/* Might as well clear the context on our way out */
876 	MemoryContextReset(hpm_context);
877 
878 	RESUME_INTERRUPTS();
879 }
880 
881 /*
882  * Handle a single protocol message received from a single parallel worker.
883  */
884 static void
HandleParallelMessage(ParallelContext * pcxt,int i,StringInfo msg)885 HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
886 {
887 	char		msgtype;
888 
889 	if (pcxt->any_message_received != NULL)
890 		pcxt->any_message_received[i] = true;
891 
892 	msgtype = pq_getmsgbyte(msg);
893 
894 	switch (msgtype)
895 	{
896 		case 'K':				/* BackendKeyData */
897 			{
898 				int32		pid = pq_getmsgint(msg, 4);
899 
900 				(void) pq_getmsgint(msg, 4);	/* discard cancel key */
901 				(void) pq_getmsgend(msg);
902 				pcxt->worker[i].pid = pid;
903 				break;
904 			}
905 
906 		case 'E':				/* ErrorResponse */
907 		case 'N':				/* NoticeResponse */
908 			{
909 				ErrorData	edata;
910 				ErrorContextCallback *save_error_context_stack;
911 
912 				/* Parse ErrorResponse or NoticeResponse. */
913 				pq_parse_errornotice(msg, &edata);
914 
915 				/* Death of a worker isn't enough justification for suicide. */
916 				edata.elevel = Min(edata.elevel, ERROR);
917 
918 				/*
919 				 * If desired, add a context line to show that this is a
920 				 * message propagated from a parallel worker.  Otherwise, it
921 				 * can sometimes be confusing to understand what actually
922 				 * happened.  (We don't do this in FORCE_PARALLEL_REGRESS mode
923 				 * because it causes test-result instability depending on
924 				 * whether a parallel worker is actually used or not.)
925 				 */
926 				if (force_parallel_mode != FORCE_PARALLEL_REGRESS)
927 				{
928 					if (edata.context)
929 						edata.context = psprintf("%s\n%s", edata.context,
930 												 _("parallel worker"));
931 					else
932 						edata.context = pstrdup(_("parallel worker"));
933 				}
934 
935 				/*
936 				 * Context beyond that should use the error context callbacks
937 				 * that were in effect when the ParallelContext was created,
938 				 * not the current ones.
939 				 */
940 				save_error_context_stack = error_context_stack;
941 				error_context_stack = pcxt->error_context_stack;
942 
943 				/* Rethrow error or print notice. */
944 				ThrowErrorData(&edata);
945 
946 				/* Not an error, so restore previous context stack. */
947 				error_context_stack = save_error_context_stack;
948 
949 				break;
950 			}
951 
952 		case 'A':				/* NotifyResponse */
953 			{
954 				/* Propagate NotifyResponse. */
955 				int32		pid;
956 				const char *channel;
957 				const char *payload;
958 
959 				pid = pq_getmsgint(msg, 4);
960 				channel = pq_getmsgrawstring(msg);
961 				payload = pq_getmsgrawstring(msg);
962 				pq_endmessage(msg);
963 
964 				NotifyMyFrontEnd(channel, payload, pid);
965 
966 				break;
967 			}
968 
969 		case 'X':				/* Terminate, indicating clean exit */
970 			{
971 				shm_mq_detach(pcxt->worker[i].error_mqh);
972 				pcxt->worker[i].error_mqh = NULL;
973 				break;
974 			}
975 
976 		default:
977 			{
978 				elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
979 					 msgtype, msg->len);
980 			}
981 	}
982 }
983 
984 /*
985  * End-of-subtransaction cleanup for parallel contexts.
986  *
987  * Currently, it's forbidden to enter or leave a subtransaction while
988  * parallel mode is in effect, so we could just blow away everything.  But
989  * we may want to relax that restriction in the future, so this code
990  * contemplates that there may be multiple subtransaction IDs in pcxt_list.
991  */
992 void
AtEOSubXact_Parallel(bool isCommit,SubTransactionId mySubId)993 AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
994 {
995 	while (!dlist_is_empty(&pcxt_list))
996 	{
997 		ParallelContext *pcxt;
998 
999 		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1000 		if (pcxt->subid != mySubId)
1001 			break;
1002 		if (isCommit)
1003 			elog(WARNING, "leaked parallel context");
1004 		DestroyParallelContext(pcxt);
1005 	}
1006 }
1007 
1008 /*
1009  * End-of-transaction cleanup for parallel contexts.
1010  */
1011 void
AtEOXact_Parallel(bool isCommit)1012 AtEOXact_Parallel(bool isCommit)
1013 {
1014 	while (!dlist_is_empty(&pcxt_list))
1015 	{
1016 		ParallelContext *pcxt;
1017 
1018 		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1019 		if (isCommit)
1020 			elog(WARNING, "leaked parallel context");
1021 		DestroyParallelContext(pcxt);
1022 	}
1023 }
1024 
1025 /*
1026  * Main entrypoint for parallel workers.
1027  */
/*
 * Main entrypoint for parallel workers.
 *
 * main_arg carries the handle of the dynamic shared memory segment that
 * describes this parallel operation; it is passed to dsm_attach() below.
 * The worker:
 *	1. attaches to that segment and its table of contents;
 *	2. redirects its protocol output into its own error queue, so errors and
 *	   notices reach the leader;
 *	3. rebuilds the leader's backend-local state (lock group, timestamps,
 *	   libraries, GUCs, transaction state, combo CIDs, snapshots, user/role,
 *	   temp namespace);
 *	4. calls the entry point named in the segment.
 * On normal completion it sends a Terminate ('X') message to report a clean
 * exit.  If the leader has already gone away (the lock group cannot be
 * joined), it returns early without running the entry point.
 */
void
ParallelWorkerMain(Datum main_arg)
{
	dsm_segment *seg;
	shm_toc    *toc;
	FixedParallelState *fps;
	char	   *error_queue_space;
	shm_mq	   *mq;
	shm_mq_handle *mqh;
	char	   *libraryspace;
	char	   *entrypointstate;
	char	   *library_name;
	char	   *function_name;
	parallel_worker_main_type entrypt;
	char	   *gucspace;
	char	   *combocidspace;
	char	   *tsnapspace;
	char	   *asnapspace;
	char	   *tstatespace;
	StringInfoData msgbuf;
	Snapshot	tsnapshot;
	Snapshot	asnapshot;

	/* Set flag to indicate that we're initializing a parallel worker. */
	InitializingParallelWorker = true;

	/* Establish signal handlers. */
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/*
	 * Determine and set our parallel worker number.  It is stored in the
	 * first sizeof(int) bytes of our background worker entry's bgw_extra.
	 */
	Assert(ParallelWorkerNumber == -1);
	memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));

	/* Set up a memory context and resource owner. */
	Assert(CurrentResourceOwner == NULL);
	CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel");
	CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
												 "Parallel worker",
												 ALLOCSET_DEFAULT_SIZES);

	/*
	 * Now that we have a resource owner, we can attach to the dynamic shared
	 * memory segment and read the table of contents.
	 */
	seg = dsm_attach(DatumGetUInt32(main_arg));
	if (seg == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("could not map dynamic shared memory segment")));
	toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
	if (toc == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("invalid magic number in dynamic shared memory segment")));

	/* Look up fixed parallel state. */
	fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
	MyFixedParallelState = fps;

	/*
	 * Arrange to signal the leader if we exit.  ParallelWorkerShutdown sends
	 * PROCSIG_PARALLEL_MESSAGE to the leader identified by these values.
	 */
	ParallelMasterPid = fps->parallel_master_pid;
	ParallelMasterBackendId = fps->parallel_master_backend_id;
	on_shmem_exit(ParallelWorkerShutdown, (Datum) 0);

	/*
	 * Now we can find and attach to the error queue provided for us.  That's
	 * good, because until we do that, any errors that happen here will not be
	 * reported back to the process that requested that this worker be
	 * launched.
	 *
	 * The error-queue area is an array of PARALLEL_ERROR_QUEUE_SIZE-byte
	 * queues, one per worker, indexed by ParallelWorkerNumber.
	 */
	error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
	mq = (shm_mq *) (error_queue_space +
					 ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
	shm_mq_set_sender(mq, MyProc);
	mqh = shm_mq_attach(mq, seg, NULL);
	pq_redirect_to_shm_mq(seg, mqh);
	pq_set_parallel_master(fps->parallel_master_pid,
						   fps->parallel_master_backend_id);

	/*
	 * Send a BackendKeyData message to the process that initiated parallelism
	 * so that it has access to our PID before it receives any other messages
	 * from us.  Our cancel key is sent, too, since that's the way the
	 * protocol message is defined, but it won't actually be used for anything
	 * in this case.
	 */
	pq_beginmessage(&msgbuf, 'K');
	pq_sendint(&msgbuf, (int32) MyProcPid, sizeof(int32));
	pq_sendint(&msgbuf, (int32) MyCancelKey, sizeof(int32));
	pq_endmessage(&msgbuf);

	/*
	 * Hooray! Primary initialization is complete.  Now, we need to set up our
	 * backend-local state to match the original backend.
	 */

	/*
	 * Join locking group.  We must do this before anything that could try to
	 * acquire a heavyweight lock, because any heavyweight locks acquired to
	 * this point could block either directly against the parallel group
	 * leader or against some process which in turn waits for a lock that
	 * conflicts with the parallel group leader, causing an undetected
	 * deadlock.  (If we can't join the lock group, the leader has gone away,
	 * so just exit quietly.)
	 */
	if (!BecomeLockGroupMember(fps->parallel_master_pgproc,
							   fps->parallel_master_pid))
		return;

	/*
	 * Restore transaction and statement start-time timestamps.  This must
	 * happen before anything that would start a transaction, else asserts in
	 * xact.c will fire.
	 */
	SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);

	/*
	 * Identify the entry point to be called.  In theory this could result in
	 * loading an additional library, though most likely the entry point is in
	 * the core backend or in a library we just loaded.
	 *
	 * The entry-point area holds two consecutive NUL-terminated strings: the
	 * library name followed by the function name.
	 */
	entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
	library_name = entrypointstate;
	function_name = entrypointstate + strlen(library_name) + 1;

	entrypt = LookupParallelWorkerFunction(library_name, function_name);

	/* Restore database connection. */
	BackgroundWorkerInitializeConnectionByOid(fps->database_id,
											  fps->authenticated_user_id);

	/*
	 * Set the client encoding to the database encoding, since that is what
	 * the leader will expect.
	 */
	SetClientEncoding(GetDatabaseEncoding());

	/*
	 * Load libraries that were loaded by original backend.  We want to do
	 * this before restoring GUCs, because the libraries might define custom
	 * variables.  Library loading and GUC restoration both run inside one
	 * short transaction that is committed before the worker's real
	 * transaction state is set up below.
	 */
	libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
	StartTransactionCommand();
	RestoreLibraryState(libraryspace);

	/* Restore GUC values from launching backend. */
	gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
	RestoreGUCState(gucspace);
	CommitTransactionCommand();

	/* Crank up a transaction state appropriate to a parallel worker. */
	tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
	StartParallelWorkerTransaction(tstatespace);

	/* Restore combo CID state. */
	combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
	RestoreComboCIDState(combocidspace);

	/*
	 * If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
	 * the leader has serialized the transaction snapshot and we must restore
	 * it. At lower isolation levels, there is no transaction-lifetime
	 * snapshot, but we need TransactionXmin to get set to a value which is
	 * less than or equal to the xmin of every snapshot that will be used by
	 * this worker. The easiest way to accomplish that is to install the
	 * active snapshot as the transaction snapshot. Code running in this
	 * parallel worker might take new snapshots via GetTransactionSnapshot()
	 * or GetLatestSnapshot(), but it shouldn't have any way of acquiring a
	 * snapshot older than the active snapshot.
	 *
	 * The transaction-snapshot key is looked up with missing_ok = true, so
	 * its absence falls back to the active snapshot.
	 */
	asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
	tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
	asnapshot = RestoreSnapshot(asnapspace);
	tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
	RestoreTransactionSnapshot(tsnapshot,
							   fps->parallel_master_pgproc);
	PushActiveSnapshot(asnapshot);

	/*
	 * We've changed which tuples we can see, and must therefore invalidate
	 * system caches.
	 */
	InvalidateSystemCaches();

	/*
	 * Restore current role id.  Skip verifying whether session user is
	 * allowed to become this role and blindly restore the leader's state for
	 * current role.
	 */
	SetCurrentRoleId(fps->outer_user_id, fps->is_superuser);

	/* Restore user ID and security context. */
	SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);

	/* Restore temp-namespace state to ensure search path matches leader's. */
	SetTempNamespaceState(fps->temp_namespace_id,
						  fps->temp_toast_namespace_id);

	/*
	 * We've initialized all of our state now; nothing should change
	 * hereafter.
	 */
	InitializingParallelWorker = false;
	EnterParallelMode();

	/*
	 * Time to do the real work: invoke the caller-supplied code.
	 */
	entrypt(seg, toc);

	/* Must exit parallel mode to pop active snapshot. */
	ExitParallelMode();

	/* Must pop active snapshot so resowner.c doesn't complain. */
	PopActiveSnapshot();

	/* Shut down the parallel-worker transaction. */
	EndParallelWorkerTransaction();

	/*
	 * Report success: the leader treats 'X' (Terminate) as a clean exit and
	 * detaches from our error queue.
	 */
	pq_putmessage('X', NULL, 0);
}
1252 
1253 /*
1254  * Update shared memory with the ending location of the last WAL record we
1255  * wrote, if it's greater than the value already stored there.
1256  */
1257 void
ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)1258 ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
1259 {
1260 	FixedParallelState *fps = MyFixedParallelState;
1261 
1262 	Assert(fps != NULL);
1263 	SpinLockAcquire(&fps->mutex);
1264 	if (fps->last_xlog_end < last_xlog_end)
1265 		fps->last_xlog_end = last_xlog_end;
1266 	SpinLockRelease(&fps->mutex);
1267 }
1268 
1269 /*
1270  * Make sure the leader tries to read from our error queue one more time.
1271  * This guards against the case where we exit uncleanly without sending an
1272  * ErrorResponse to the leader, for example because some code calls proc_exit
1273  * directly.
1274  */
1275 static void
ParallelWorkerShutdown(int code,Datum arg)1276 ParallelWorkerShutdown(int code, Datum arg)
1277 {
1278 	SendProcSignal(ParallelMasterPid,
1279 				   PROCSIG_PARALLEL_MESSAGE,
1280 				   ParallelMasterBackendId);
1281 }
1282 
1283 /*
1284  * Look up (and possibly load) a parallel worker entry point function.
1285  *
1286  * For functions contained in the core code, we use library name "postgres"
1287  * and consult the InternalParallelWorkers array.  External functions are
1288  * looked up, and loaded if necessary, using load_external_function().
1289  *
1290  * The point of this is to pass function names as strings across process
1291  * boundaries.  We can't pass actual function addresses because of the
1292  * possibility that the function has been loaded at a different address
1293  * in a different process.  This is obviously a hazard for functions in
1294  * loadable libraries, but it can happen even for functions in the core code
1295  * on platforms using EXEC_BACKEND (e.g., Windows).
1296  *
1297  * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
1298  * in favor of applying load_external_function() for core functions too;
1299  * but that raises portability issues that are not worth addressing now.
1300  */
1301 static parallel_worker_main_type
LookupParallelWorkerFunction(const char * libraryname,const char * funcname)1302 LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
1303 {
1304 	/*
1305 	 * If the function is to be loaded from postgres itself, search the
1306 	 * InternalParallelWorkers array.
1307 	 */
1308 	if (strcmp(libraryname, "postgres") == 0)
1309 	{
1310 		int			i;
1311 
1312 		for (i = 0; i < lengthof(InternalParallelWorkers); i++)
1313 		{
1314 			if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
1315 				return InternalParallelWorkers[i].fn_addr;
1316 		}
1317 
1318 		/* We can only reach this by programming error. */
1319 		elog(ERROR, "internal function \"%s\" not found", funcname);
1320 	}
1321 
1322 	/* Otherwise load from external library. */
1323 	return (parallel_worker_main_type)
1324 		load_external_function(libraryname, funcname, true, NULL);
1325 }
1326